Developing lightweight computation at the DSG edge

Commit 776530da authored by Mathias Weber's avatar Mathias Weber
Browse files

update to new CRDT types

parent b9d1b368
{
"spellright.language": "en",
"spellright.documentTypes": [
"markdown",
"latex",
"plaintext"
]
}
\ No newline at end of file
# Antidote Go Client Library
## How to install
Get the library using Go:
```
go get github.com/AntidoteDB/antidote-go-client
```
Then import the library in your code:
```
import (
antidote "github.com/AntidoteDB/antidote-go-client"
)
```
## How to use
To connect to a running Antidote instance, you have to create a client using ``client, err := antidote.NewClient(...)```.
The function takes one or more host definitions consisting of a host name and the protocol buffer port.
To connect to an Antidote instance running on the same machine with default port, you pass `Host{"127.0.0.1", 8087}` to the `NewClient` function.
Do not forget to defer the close method `defer client.Close()`.
The client manages a connection pool and picks a random connection to a random host whenever a connection is required.
Operations are executed on the data store using a `Bucket` object.
```
bucket := antidote.Bucket{[]byte("test")}
```
The bucket name is given as byte-slice.
A bucket object includes functions for reading and updating persistent objects in the data store.
These functions take as parameter a transaction, which can either be an `InteractiveTransaction` or a `StaticTransaction`.
Interactive transactions all to combine multiple read- and update-operations into an atomic transaction.
Updates issued in the context of an interactive transaction are visible to read operations issued in the same context after the updates.
Interactive transactions have to be committed in order to make the updates visible to subsequent transactions.
```
tx, err := client.StartTransaction()
if err != nil {
... // handle error
}
... //use transaction
err = tx.Commit()
```
Static transactions can be seen as one-shot transactions for executing a set of updates or a read operation.
Static transactions do not have to be committed or closed and are mainly handled by the Antidote server.
```
tx := client.CreateStaticTransaction()
```
The `Bucket.Update(...)` function takes, in addition to a transaction, a list of CRDT updates.
These update objects are created using the following functions:
- SetAdd(key Key, elems ...[]byte)
- SetRemove(key Key, elems ...[]byte)
- CounterInc(key Key, inc int64)
- RegPut(key Key, value []byte)
- MVRegPut(key Key, value []byte)
- MapUpdate(key Key, updates ...*CRDTUpdate)
The first 5 updates are straight forward updates of sets, counters, registers and multi-value registers.
The map update is more complex in that it takes update of the keys inside the map as parameter.
To update the key `key1` in map `map1` referring to a counter, the following update is created:
```
antidote.MapUpdate([]byte("map1"),
antidote.CounterInc([]byte("key1"), 1)
)
```
These updates are executed in the context of a transaction using the `Update` function of the `Bucket`.
This diff is collapsed.
syntax = "proto2"; syntax = "proto2";
// Java package specifiers // Java package specifiers
option java_package = "eu.antidotedb.antidotepb"; option java_package = "com.basho.riak.protobuf";
option java_outer_classname = "AntidotePB"; option java_outer_classname = "AntidotePB";
option go_package = "antidoteclient"; option go_package = "antidoteclient";
...@@ -10,23 +10,15 @@ enum CRDT_type { ...@@ -10,23 +10,15 @@ enum CRDT_type {
ORSET = 4; ORSET = 4;
LWWREG = 5; LWWREG = 5;
MVREG = 6; MVREG = 6;
INTEGER = 7;
GMAP = 8; GMAP = 8;
AWMAP = 9;
RWSET = 10; RWSET = 10;
RRMAP = 11; RRMAP = 11;
FATCOUNTER = 12; FATCOUNTER = 12;
POLICY = 13; FLAG_EW = 13;
FLAG_DW = 14;
} }
//------------------
// Error messages
message ApbErrorResp {
required bytes errmsg = 1;
required uint32 errcode = 2;
}
//------------------ //------------------
// Counter // Counter
...@@ -85,33 +77,6 @@ message ApbGetMVRegResp { ...@@ -85,33 +77,6 @@ message ApbGetMVRegResp {
repeated bytes values = 1; repeated bytes values = 1;
} }
//------------------
// Policy
message ApbPolicyUpdate {
repeated bytes permissions = 1;
}
message ApbGetPolicyResp {
repeated bytes permissions = 1;
}
//------------------
// Integer
message ApbIntegerUpdate {
// choose one of the following:
// increment the integer
optional sint64 inc = 1;
// set the integer to a number
optional sint64 set = 2;
}
message ApbGetIntegerResp {
required sint64 value = 1;
}
//------------------ //------------------
// Map // Map
...@@ -140,6 +105,18 @@ message ApbMapEntry { ...@@ -140,6 +105,18 @@ message ApbMapEntry {
required ApbReadObjectResp value = 2; required ApbReadObjectResp value = 2;
} }
//-------------------
// Flags
message ApbFlagUpdate {
required bool value = 1;
}
message ApbGetFlagResp {
required bool value = 1;
}
// General reset operation // General reset operation
message ApbCrdtReset { message ApbCrdtReset {
...@@ -184,10 +161,9 @@ message ApbUpdateOperation { // TODO use this above ...@@ -184,10 +161,9 @@ message ApbUpdateOperation { // TODO use this above
optional ApbCounterUpdate counterop = 1; optional ApbCounterUpdate counterop = 1;
optional ApbSetUpdate setop = 2; optional ApbSetUpdate setop = 2;
optional ApbRegUpdate regop = 3; optional ApbRegUpdate regop = 3;
optional ApbIntegerUpdate integerop = 4;
optional ApbMapUpdate mapop = 5; optional ApbMapUpdate mapop = 5;
optional ApbCrdtReset resetop = 6; optional ApbCrdtReset resetop = 6;
optional ApbPolicyUpdate policyop = 7; optional ApbFlagUpdate flagop = 7;
} }
// Objects to be updated // Objects to be updated
...@@ -237,9 +213,8 @@ message ApbReadObjectResp { ...@@ -237,9 +213,8 @@ message ApbReadObjectResp {
optional ApbGetSetResp set = 2; optional ApbGetSetResp set = 2;
optional ApbGetRegResp reg = 3; optional ApbGetRegResp reg = 3;
optional ApbGetMVRegResp mvreg = 4; optional ApbGetMVRegResp mvreg = 4;
optional ApbGetIntegerResp int = 5;
optional ApbGetMapResp map = 6; optional ApbGetMapResp map = 6;
optional ApbGetPolicyResp policy = 7; optional ApbGetFlagResp flag = 7;
} }
message ApbReadObjectsResp { message ApbReadObjectsResp {
required bool success = 1; required bool success = 1;
......
...@@ -2,10 +2,11 @@ package antidoteclient ...@@ -2,10 +2,11 @@ package antidoteclient
import ( import (
"fmt" "fmt"
"gopkg.in/fatih/pool.v2"
"math/rand" "math/rand"
"net" "net"
"time" "time"
"gopkg.in/fatih/pool.v2"
) )
const INITIAL_POOL_SIZE = 1 const INITIAL_POOL_SIZE = 1
...@@ -73,7 +74,6 @@ type connection struct { ...@@ -73,7 +74,6 @@ type connection struct {
pool pool.Pool pool pool.Pool
} }
// Starts an interactive transaction and registers it on the Antidote server. // Starts an interactive transaction and registers it on the Antidote server.
// The connection used to issue reads and updates is sticky; // The connection used to issue reads and updates is sticky;
// interactive transactions are only valid local to the server they are started on. // interactive transactions are only valid local to the server they are started on.
......
package antidoteclient package antidoteclient
import ( import (
"testing" "bytes"
"fmt" "fmt"
"sync" "sync"
"bytes" "testing"
"time" "time"
) )
...@@ -39,7 +39,6 @@ func TestSimple(t *testing.T) { ...@@ -39,7 +39,6 @@ func TestSimple(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if counterVal != 1 { if counterVal != 1 {
t.Fatalf("Counter value should be 1 but is %d", counterVal) t.Fatalf("Counter value should be 1 but is %d", counterVal)
} }
...@@ -67,7 +66,6 @@ func TestSetUpdate(t *testing.T) { ...@@ -67,7 +66,6 @@ func TestSetUpdate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
setVal, err := bucket.ReadSet(tx, key) setVal, err := bucket.ReadSet(tx, key)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -78,10 +76,9 @@ func TestSetUpdate(t *testing.T) { ...@@ -78,10 +76,9 @@ func TestSetUpdate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
for _, expected := range []string{"test1", "value2", "inset3"} {
for _,expected := range []string{"test1", "value2", "inset3"} {
found := false found := false
for _,val := range setVal { for _, val := range setVal {
if string(val) == expected { if string(val) == expected {
found = true found = true
break break
...@@ -124,16 +121,16 @@ func TestMap(t *testing.T) { ...@@ -124,16 +121,16 @@ func TestMap(t *testing.T) {
if v, e := mapVal.Counter(Key("counter")); e != nil || v != 13 { if v, e := mapVal.Counter(Key("counter")); e != nil || v != 13 {
t.Fatalf("Wrong counter value: %d", v) t.Fatalf("Wrong counter value: %d", v)
} }
if v,e := mapVal.Reg(Key("reg")); e != nil || !bytes.Equal(v, []byte("Hello World")) { if v, e := mapVal.Reg(Key("reg")); e != nil || !bytes.Equal(v, []byte("Hello World")) {
t.Fatalf("Wrong reg value: %p", v) t.Fatalf("Wrong reg value: %p", v)
} }
v,_ := mapVal.Set(Key("set")) v, _ := mapVal.Set(Key("set"))
if len(v) != 2 { if len(v) != 2 {
t.Fatal("Wrong number of elements in set") t.Fatal("Wrong number of elements in set")
} }
for _,expected := range []string{"A", "B"} { for _, expected := range []string{"A", "B"} {
found := false found := false
for _,val := range v { for _, val := range v {
if string(val) == expected { if string(val) == expected {
found = true found = true
break break
...@@ -172,7 +169,6 @@ func TestStatic(t *testing.T) { ...@@ -172,7 +169,6 @@ func TestStatic(t *testing.T) {
} }
} }
// tests for many updates, not enabled // tests for many updates, not enabled
// this is a bit faster than the sequential one, if number of threads in configured correctly // this is a bit faster than the sequential one, if number of threads in configured correctly
...@@ -188,13 +184,13 @@ func testManyUpdates(t *testing.T) { ...@@ -188,13 +184,13 @@ func testManyUpdates(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
numThreads := 3 numThreads := 5
wg.Add(numThreads) wg.Add(numThreads)
for k:=0; k<numThreads; k++ { for k := 0; k < numThreads; k++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
for i := 0; i < 10000; i++ { for i := 0; i < 6000; i++ {
tx, err := client.StartTransaction() tx, err := client.StartTransaction()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
......
...@@ -3,8 +3,9 @@ package antidoteclient ...@@ -3,8 +3,9 @@ package antidoteclient
import ( import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"github.com/golang/protobuf/proto"
"io" "io"
"github.com/golang/protobuf/proto"
) )
func readMsgRaw(reader io.Reader) (data []byte, err error) { func readMsgRaw(reader io.Reader) (data []byte, err error) {
...@@ -82,10 +83,6 @@ func decodeOperationResp(reader io.Reader) (op *ApbOperationResp, err error) { ...@@ -82,10 +83,6 @@ func decodeOperationResp(reader io.Reader) (op *ApbOperationResp, err error) {
return return
} }
switch data[0] { switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 111: case 111:
// transaction response // transaction response
resp := &ApbOperationResp{} resp := &ApbOperationResp{}
...@@ -106,10 +103,6 @@ func decodeStartTransactionResp(reader io.Reader) (op *ApbStartTransactionResp, ...@@ -106,10 +103,6 @@ func decodeStartTransactionResp(reader io.Reader) (op *ApbStartTransactionResp,
return return
} }
switch data[0] { switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 124: case 124:
// transaction response // transaction response
resp := &ApbStartTransactionResp{} resp := &ApbStartTransactionResp{}
...@@ -130,10 +123,6 @@ func decodeReadObjectsResp(reader io.Reader) (op *ApbReadObjectsResp, err error) ...@@ -130,10 +123,6 @@ func decodeReadObjectsResp(reader io.Reader) (op *ApbReadObjectsResp, err error)
return return
} }
switch data[0] { switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 126: case 126:
// transaction response // transaction response
resp := &ApbReadObjectsResp{} resp := &ApbReadObjectsResp{}
...@@ -154,10 +143,6 @@ func decodeCommitResp(reader io.Reader) (op *ApbCommitResp, err error) { ...@@ -154,10 +143,6 @@ func decodeCommitResp(reader io.Reader) (op *ApbCommitResp, err error) {
return return
} }
switch data[0] { switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 127: case 127:
// transaction response // transaction response
resp := &ApbCommitResp{} resp := &ApbCommitResp{}
...@@ -178,10 +163,6 @@ func decodeStaticReadObjectsResp(reader io.Reader) (op *ApbStaticReadObjectsResp ...@@ -178,10 +163,6 @@ func decodeStaticReadObjectsResp(reader io.Reader) (op *ApbStaticReadObjectsResp
return return
} }
switch data[0] { switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 128: case 128:
// transaction response // transaction response
resp := &ApbStaticReadObjectsResp{} resp := &ApbStaticReadObjectsResp{}
...@@ -195,13 +176,3 @@ func decodeStaticReadObjectsResp(reader io.Reader) (op *ApbStaticReadObjectsResp ...@@ -195,13 +176,3 @@ func decodeStaticReadObjectsResp(reader io.Reader) (op *ApbStaticReadObjectsResp
err = fmt.Errorf("invalid message code: %d", data[0]) err = fmt.Errorf("invalid message code: %d", data[0])
return return
} }
func decodeError(data []byte) (err error) {
resp := &ApbErrorResp{}
err = proto.Unmarshal(data, resp)
if err != nil {
return
}
err = fmt.Errorf("antidote error code %d, %s", resp.Errcode, string(resp.Errmsg))
return
}
...@@ -121,7 +121,6 @@ func (tx *InteractiveTransaction) Abort() error { ...@@ -121,7 +121,6 @@ func (tx *InteractiveTransaction) Abort() error {
return nil return nil
} }
// Pseudo transaction to issue reads and updated without starting an interactive transaction. // Pseudo transaction to issue reads and updated without starting an interactive transaction.
// Can be interpreted as starting a transaction for each read or update and directly committing it. // Can be interpreted as starting a transaction for each read or update and directly committing it.
type StaticTransaction struct { type StaticTransaction struct {
...@@ -152,7 +151,6 @@ func (tx *StaticTransaction) Update(updates ...*ApbUpdateOp) error { ...@@ -152,7 +151,6 @@ func (tx *StaticTransaction) Update(updates ...*ApbUpdateOp) error {
return nil return nil
} }
func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) { func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
apbRead := &ApbStaticReadObjects{ apbRead := &ApbStaticReadObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}}, Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
...@@ -174,7 +172,6 @@ func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObje ...@@ -174,7 +172,6 @@ func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObje
return sresp.Objects, nil return sresp.Objects, nil
} }
func (bucket *Bucket) ReadSet(tx Transaction, key Key) (val [][]byte, err error) { func (bucket *Bucket) ReadSet(tx Transaction, key Key) (val [][]byte, err error) {
crdtType := CRDTType_ORSET crdtType := CRDTType_ORSET
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType}) resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
...@@ -196,7 +193,7 @@ func (bucket *Bucket) ReadReg(tx Transaction, key Key) (val []byte, err error) { ...@@ -196,7 +193,7 @@ func (bucket *Bucket) ReadReg(tx Transaction, key Key) (val []byte, err error) {
} }
func (bucket *Bucket) ReadMap(tx Transaction, key Key) (val *MapReadResult, err error) { func (bucket *Bucket) ReadMap(tx Transaction, key Key) (val *MapReadResult, err error) {
crdtType := CRDTType_AWMAP crdtType := CRDTType_RRMAP
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType}) resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil { if err != nil {
return return
...@@ -225,7 +222,6 @@ func (bucket *Bucket) ReadCounter(tx Transaction, key Key) (val int32, err error ...@@ -225,7 +222,6 @@ func (bucket *Bucket) ReadCounter(tx Transaction, key Key) (val int32, err error
return return
} }
// Represents the result of reading from a map object. // Represents the result of reading from a map object.
// Grants access to the keys of the map to access values of the nested CRDTs. // Grants access to the keys of the map to access values of the nested CRDTs.
type MapReadResult struct { type MapReadResult struct {
...@@ -255,7 +251,7 @@ func (mrr *MapReadResult) Reg(key Key) (val []byte, err error) { ...@@ -255,7 +251,7 @@ func (mrr *MapReadResult) Reg(key Key) (val []byte, err error) {
// Access the value of the nested add-wins map under the given key // Access the value of the nested add-wins map under the given key
func (mrr *MapReadResult) Map(key Key) (val *MapReadResult, err error) { func (mrr *MapReadResult) Map(key Key) (val *MapReadResult, err error) {
for _, me := range mrr.mapResp.Entries { for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_AWMAP && bytes.Equal(me.Key.Key, key) { if *me.Key.Type == CRDTType_RRMAP && bytes.Equal(me.Key.Key, key) {
return &MapReadResult{mapResp: me.Value.Map}, nil return &MapReadResult{mapResp: me.Value.Map}, nil
} }
} }
...@@ -282,7 +278,6 @@ func (mrr *MapReadResult) Counter(key Key) (val int32, err error) { ...@@ -282,7 +278,6 @@ func (mrr *MapReadResult) Counter(key Key) (val int32, err error) {
return 0, fmt.Errorf("counter entry with key '%s' not found", key) return 0, fmt.Errorf("counter entry with key '%s' not found", key)
} }
// Represents updates that can be converted to top-level updates applicable to a bucket // Represents updates that can be converted to top-level updates applicable to a bucket
// or nested updates applicable to a map // or nested updates applicable to a map
type UpdateConverter interface { type UpdateConverter interface {
...@@ -300,12 +295,12 @@ type CRDTUpdate struct { ...@@ -300,12 +295,12 @@ type CRDTUpdate struct {
// A CRDTUpdater allows to apply updates in the context of a transaction. // A CRDTUpdater allows to apply updates in the context of a transaction.
type CRDTUpdater interface { type CRDTUpdater interface {
Update(tx Transaction, updates... *CRDTUpdate) error Update(tx Transaction, updates ...*CRDTUpdate) error
} }
func (bucket *Bucket) Update(tx Transaction, updates... *CRDTUpdate) error { func (bucket *Bucket) Update(tx Transaction, updates ...*CRDTUpdate) error {
updateOps := make([]*ApbUpdateOp, len(updates)) updateOps := make([]*ApbUpdateOp, len(updates))
for i,v := range updates { for i, v := range updates {
updateOps[i] = v.ConvertToToplevel(bucket.Bucket) updateOps[i] = v.ConvertToToplevel(bucket.Bucket)
} }
return tx.Update(updateOps...) return tx.Update(updateOps...)
...@@ -328,7 +323,7 @@ func (update *CRDTUpdate) ConvertToNested() *ApbMapNestedUpdate { ...@@ -328,7 +323,7 @@ func (update *CRDTUpdate) ConvertToNested() *ApbMapNestedUpdate {
// CRDT update operations // CRDT update operations
// Represents the update to add an element to an add-wins set // Represents the update to add an element to an add-wins set
func SetAdd(key Key, elems ... []byte) *CRDTUpdate { func SetAdd(key Key, elems ...[]byte) *CRDTUpdate {
optype := ApbSetUpdate_ADD optype := ApbSetUpdate_ADD
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
...@@ -340,7 +335,7 @@ func SetAdd(key Key, elems ... []byte) *CRDTUpdate { ...@@ -340,7 +335,7 @@ func SetAdd(key Key, elems ... []byte) *CRDTUpdate {
} }
// Represents the update to remove an element from an add-wins set // Represents the update to remove an element from an add-wins set
func SetRemove(key Key, elems ... []byte) *CRDTUpdate { func SetRemove(key Key, elems ...[]byte) *CRDTUpdate {
optype := ApbSetUpdate_REMOVE optype := ApbSetUpdate_REMOVE
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
...@@ -385,14 +380,14 @@ func MVRegPut(key Key, value []byte) *CRDTUpdate { ...@@ -385,14 +380,14 @@ func MVRegPut(key Key, value []byte) *CRDTUpdate {