Developing lightweight computation at the DSG edge

Commit b029d8a1 authored by Mathias Weber's avatar Mathias Weber
Browse files

wip initial version

parents
# Created by .ignore support plugin (hsz.mobi)
### Go template
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
# ignore intellij project
.idea/
*.iml
\ No newline at end of file
get_protogen:
go get -u github.com/golang/protobuf/protoc-gen-go
protogen: get_protogen
protoc --go_out=$(shell pwd) antidote.proto
This diff is collapsed.
syntax = "proto2";
// Java package specifiers
option java_package = "eu.antidotedb.antidotepb";
option java_outer_classname = "AntidotePB";
option go_package = "antidoteclient";
enum CRDT_type {
COUNTER = 3;
ORSET = 4;
LWWREG = 5;
MVREG = 6;
INTEGER = 7;
GMAP = 8;
AWMAP = 9;
RWSET = 10;
RRMAP = 11;
FATCOUNTER = 12;
POLICY = 13;
}
//------------------
// Error messages
message ApbErrorResp {
required bytes errmsg = 1;
required uint32 errcode = 2;
}
//------------------
// Counter
// Counter increment requenst
message ApbCounterUpdate {
// inc indicates the value to be incremented. To decrement, use a negative value. If no value is given, it will be considered as an increment by 1
optional sint64 inc = 1;
}
// Response operation
message ApbGetCounterResp {
required sint32 value = 1;
}
//------------------
// Set
// Set updates request
message ApbSetUpdate{
enum SetOpType // TODO merge adds/removes
{
ADD = 1;
REMOVE = 2;
}
required SetOpType optype = 1;
repeated bytes adds = 2;
repeated bytes rems = 3;
}
// Get set request
message ApbGetSetResp {
repeated bytes value = 1;
}
//------------------
// LWW-register
// Register update
message ApbRegUpdate {
required bytes value = 1;
}
// Response operation
message ApbGetRegResp {
required bytes value = 1;
}
//------------------
// MV-register
// use ApbRegUpdate for updates
// response:
message ApbGetMVRegResp {
repeated bytes values = 1;
}
//------------------
// Policy
message ApbPolicyUpdate {
repeated bytes permissions = 1;
}
message ApbGetPolicyResp {
repeated bytes permissions = 1;
}
//------------------
// Integer
message ApbIntegerUpdate {
// choose one of the following:
// increment the integer
optional sint64 inc = 1;
// set the integer to a number
optional sint64 set = 2;
}
message ApbGetIntegerResp {
required sint64 value = 1;
}
//------------------
// Map
message ApbMapKey {
required bytes key = 1;
required CRDT_type type = 2;
}
message ApbMapUpdate {
repeated ApbMapNestedUpdate updates = 1;
repeated ApbMapKey removedKeys = 2;
}
message ApbMapNestedUpdate {
required ApbMapKey key = 1;
required ApbUpdateOperation update = 2;
}
message ApbGetMapResp {
repeated ApbMapEntry entries = 1;
}
message ApbMapEntry {
required ApbMapKey key = 1;
required ApbReadObjectResp value = 2;
}
// General reset operation
message ApbCrdtReset {
}
// Response operation
message ApbOperationResp {
required bool success = 1;
optional uint32 errorcode = 2;
}
//--------------------------------------------------------------
// Properties parameters of a transaction
message ApbTxnProperties {
optional uint32 read_write = 1 ; //default = 0 = read_write, 1 = read_only, 2 = write_only
optional uint32 red_blue = 2 ; // default = 0 = blue, 1 = red
}
// Object (Key) representation
message ApbBoundObject {
required bytes key = 1;
required CRDT_type type = 2;
required bytes bucket = 3;
}
// Objects to be read
message ApbReadObjects {
repeated ApbBoundObject boundobjects = 1;
required bytes transaction_descriptor = 2;
}
// An Object to be updated with specified operation
message ApbUpdateOp {
required ApbBoundObject boundobject = 1;
required ApbUpdateOperation operation = 2;
}
message ApbUpdateOperation { // TODO use this above
optional ApbCounterUpdate counterop = 1;
optional ApbSetUpdate setop = 2;
optional ApbRegUpdate regop = 3;
optional ApbIntegerUpdate integerop = 4;
optional ApbMapUpdate mapop = 5;
optional ApbCrdtReset resetop = 6;
optional ApbPolicyUpdate policyop = 7;
}
// Objects to be updated
message ApbUpdateObjects {
repeated ApbUpdateOp updates = 1;
required bytes transaction_descriptor = 2;
}
// Start Transaction
message ApbStartTransaction {
optional bytes timestamp = 1;
optional ApbTxnProperties properties = 2;
}
// Abort Transaction
message ApbAbortTransaction {
required bytes transaction_descriptor = 1;
}
// Commit Transaction
message ApbCommitTransaction {
required bytes transaction_descriptor = 1;
}
message ApbStaticUpdateObjects{
required ApbStartTransaction transaction = 1;
repeated ApbUpdateOp updates = 2;
}
message ApbStaticReadObjects{
required ApbStartTransaction transaction = 1;
repeated ApbBoundObject objects = 2;
}
//Start transaction response
message ApbStartTransactionResp {
required bool success = 1;
optional bytes transaction_descriptor = 2;
optional uint32 errorcode = 3;
}
//Read Objects Response
message ApbReadObjectResp {
// one of the following:
optional ApbGetCounterResp counter = 1;
optional ApbGetSetResp set = 2;
optional ApbGetRegResp reg = 3;
optional ApbGetMVRegResp mvreg = 4;
optional ApbGetIntegerResp int = 5;
optional ApbGetMapResp map = 6;
optional ApbGetPolicyResp policy = 7;
}
message ApbReadObjectsResp {
required bool success = 1;
repeated ApbReadObjectResp objects = 2;
optional uint32 errorcode = 3;
}
// Commit Response
message ApbCommitResp {
required bool success = 1;
optional bytes commit_time = 2;
optional uint32 errorcode = 3;
}
message ApbStaticReadObjectsResp {
required ApbReadObjectsResp objects = 1;
required ApbCommitResp committime = 2;
}
package antidoteclient
import (
"gopkg.in/fatih/pool.v2"
"net"
"fmt"
"math/rand"
"time"
"io"
"github.com/golang/protobuf/proto"
"encoding/binary"
)
const INITIAL_POOL_SIZE = 1
const MAX_POOL_SIZE = 50
type Client struct {
pools []pool.Pool
}
type Host struct {
Name string
Port int
}
func NewClient(hosts []Host) (client *Client, err error) {
pools := make([]pool.Pool, len(hosts))
for i, h := range hosts {
p, err := pool.NewChannelPool(INITIAL_POOL_SIZE, MAX_POOL_SIZE, func () (net.Conn, error) { return net.Dial("tcp", fmt.Sprint("{}:{}", h.Name, h.Port)) })
if err != nil {
return
}
pools[i] = p
}
client = &Client {
pools: pools,
}
return
}
func (client *Client) Close() {
for _, p := range client.pools {
p.Close()
}
}
func (client *Client) getConnection() (c *Connection, err error) {
// maybe make this global?
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, i := range r.Perm(len(client.pools)) {
p := client.pools[i]
con, err := p.Get()
if err != nil {
return
}
c = &Connection{
Conn: con,
pool: p,
}
return
}
err = fmt.Errorf("All connections dead")
return
}
// a close already puts the connection back into the right pool
type Connection struct {
net.Conn
pool pool.Pool
}
type InteractiveTransaction struct {
TxID []byte
con Connection
}
func (op *ApbStartTransaction) encode(writer io.Writer) (err error) {
msg, err := proto.Marshal(op)
if err != nil {
return
}
msgsize := len(msg)
buf := make([]byte, 5)
binary.BigEndian.PutUint32(buf[0:4], uint32(msgsize))
buf[5] = 119
writer.Write(buf)
writer.Write(msg)
return nil
}
func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error) {
con, err := client.getConnection()
if err != nil {
return
}
apbtxn := &ApbStartTransaction{
Properties: &ApbTxnProperties{},
}
apbtxn.encode(con)
// TODO get response, extract txid, return object
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment