Developing lightweight computation at the DSG edge

Commit 4fece62f authored by Mathias Weber's avatar Mathias Weber
Browse files

high-level interface

parent ec128966
......@@ -66,10 +66,7 @@ type Connection struct {
pool pool.Pool
}
type InteractiveTransaction struct {
txID []byte
con *Connection
}
func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error) {
con, err := client.getConnection()
......@@ -98,84 +95,6 @@ func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error)
return
}
func (tx *InteractiveTransaction) Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) {
apbUpdate := &ApbUpdateObjects{
Updates: updates,
TransactionDescriptor: tx.txID,
}
err = apbUpdate.encode(tx.con)
if err != nil {
return
}
return decodeOperationResp(tx.con)
}
func (client *Client) StaticUpdate(updates ...*ApbUpdateOp) (op *ApbCommitResp, err error) {
apbStaticUpdate := &ApbStaticUpdateObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Updates: updates,
}
con, err := client.getConnection()
if err != nil {
return
}
err = apbStaticUpdate.encode(con)
if err != nil {
return
}
op, err = decodeCommitResp(con)
con.Close()
return
}
func (tx *InteractiveTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
apbUpdate := &ApbReadObjects{
TransactionDescriptor: tx.txID,
Boundobjects: objects,
}
err = apbUpdate.encode(tx.con)
if err != nil {
return
}
return decodeReadObjectsResp(tx.con)
}
func (client *Client) StaticRead(objects ...*ApbBoundObject) (resp *ApbStaticReadObjectsResp, err error) {
apbRead := &ApbStaticReadObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Objects: objects,
}
con, err := client.getConnection()
if err != nil {
return
}
err = apbRead.encode(con)
if err != nil {
return
}
resp, err = decodeStaticReadObjectsResp(con)
con.Close()
return
}
func (tx *InteractiveTransaction) Commit() (op *ApbCommitResp, err error) {
msg := &ApbCommitTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con)
if err != nil {
return
}
op, err = decodeCommitResp(tx.con)
tx.con.Close()
return
}
func (tx *InteractiveTransaction) Abort() (op *ApbOperationResp, err error) {
msg := &ApbAbortTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con)
if err != nil {
return
}
op, err = decodeOperationResp(tx.con)
tx.con.Close()
return
func (client *Client) CreateStaticTransaction() *StaticTransaction {
return &StaticTransaction{client: client}
}
......@@ -13,34 +13,30 @@ func TestSimple(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()
tx, err := client.StartTransaction()
if err != nil {
t.Fatal(err)
}
crdtType := CRDTType_ORSET
key := &ApbBoundObject{
Bucket: []byte("bucket"),
Key: []byte("keySet"),
Type: &crdtType}
addType := ApbSetUpdate_ADD
tx.Update(&ApbUpdateOp{
Boundobject: key,
Operation: &ApbUpdateOperation{Setop: &ApbSetUpdate{Optype: &addType, Adds: [][]byte{[]byte("test1")}}},
})
resp, err := tx.Read(key)
bucket := Bucket{[]byte("bucket")}
key := []byte("keyCounter")
_, err = bucket.Update(tx, CounterInc(key, 1))
if err != nil {
t.Fatal(err)
}
fmt.Print(resp.Objects[0])
counterVal, err := bucket.ReadCounter(tx, key)
if err != nil {
t.Fatal(err)
}
fmt.Print(counterVal)
_, err = tx.Commit()
if err != nil {
t.Fatal(err)
}
client.Close()
}
func TestSetUpdate(t *testing.T) {
......@@ -49,27 +45,30 @@ func TestSetUpdate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()
tx, err := client.StartTransaction()
if err != nil {
t.Fatal(err)
}
crdtType := CRDTType_ORSET
key := &ApbBoundObject{
Bucket: []byte("bucket"),
Key: []byte("keySet"),
Type: &crdtType}
addType := ApbSetUpdate_ADD
tx.Update(&ApbUpdateOp{
Boundobject: key,
Operation: &ApbUpdateOperation{Setop: &ApbSetUpdate{Optype: &addType, Adds: [][]byte{[]byte("test1")}}},
})
resp, err := tx.Read(key)
bucket := Bucket{[]byte("bucket")}
key := []byte("keySet")
_, err = bucket.Update(tx, SetAdd(key, []byte("test2")))
if err != nil {
t.Fatal(err)
}
fmt.Print(resp.Objects[0])
setVal, err := bucket.ReadSet(tx, key)
if err != nil {
t.Fatal(err)
}
for _,v := range setVal {
fmt.Println(string(v))
}
_, err = tx.Commit()
if err != nil {
......@@ -83,13 +82,10 @@ func TestManyUpdates(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()
crdtType := CRDTType_COUNTER
key := &ApbBoundObject{
Bucket: []byte("bucket"),
Key: []byte("keyMany"),
Type: &crdtType}
one := int64(1)
bucket := Bucket{[]byte("bucket")}
key := []byte("keyMany")
wg := sync.WaitGroup{}
wg.Add(10)
......@@ -101,13 +97,13 @@ func TestManyUpdates(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, err = tx.Update(&ApbUpdateOp{
Boundobject: key,
Operation: &ApbUpdateOperation{Counterop: &ApbCounterUpdate{Inc: &one}},
})
updateResp, err := bucket.Update(tx, CounterInc(key, 1))
if err != nil {
t.Fatal(err)
}
if !(*updateResp.Success) {
fmt.Printf("Update #%d not successful\n", i)
}
_, err = tx.Commit()
if err != nil {
t.Fatal(err)
......@@ -120,12 +116,31 @@ func TestManyUpdates(t *testing.T) {
}
wg.Wait()
resp, err := client.StaticRead(key)
tx := client.CreateStaticTransaction()
counterVal, err := bucket.ReadCounter(tx, key)
if err != nil {
t.Fatal(err)
}
fmt.Print(resp.Objects.Objects[0])
fmt.Print(counterVal)
}
func TestReadMany(t *testing.T) {
client, err := NewClient(Host{"127.0.0.1", 8087})
if err != nil {
t.Fatal(err)
}
defer client.Close()
bucket := Bucket{[]byte("bucket")}
key := []byte("keyMany")
tx := client.CreateStaticTransaction()
counterVal, err := bucket.ReadCounter(tx, key)
if err != nil {
t.Fatal(err)
}
fmt.Print(counterVal)
}
func TestStatic(t *testing.T) {
......@@ -133,24 +148,19 @@ func TestStatic(t *testing.T) {
if err != nil {
t.Fatal(err)
}
bucket := Bucket{[]byte("bucket")}
key := []byte("keyStatic")
tx := client.CreateStaticTransaction()
crdtType := CRDTType_COUNTER
key := &ApbBoundObject{
Bucket: []byte("bucket"),
Key: []byte("keyStatic"),
Type: &crdtType}
one := int64(1)
_, err = client.StaticUpdate(&ApbUpdateOp{
Boundobject: key,
Operation: &ApbUpdateOperation{Counterop: &ApbCounterUpdate{Inc: &one}},
})
_, err = bucket.Update(tx, CounterInc(key, 1))
if err != nil {
t.Fatal(err)
}
resp, err := client.StaticRead(key)
counterVal, err := bucket.ReadCounter(tx, key)
if err != nil {
t.Fatal(err)
}
fmt.Print(resp.Objects.Objects[0])
fmt.Print(counterVal)
}
package antidoteclient
import (
"bytes"
"fmt"
)
type Bucket struct {
Bucket []byte
}
type Transaction interface {
Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error)
Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error)
}
type CRDTReader interface {
ReadSet(tx Transaction, key []byte) (val [][]byte, err error)
ReadReg(tx Transaction, key []byte) (val []byte, err error)
ReadMap(tx Transaction, key []byte) (val *MapReadResult, err error)
ReadMVReg(tx Transaction, key []byte) (val [][]byte, err error)
ReadCounter(tx Transaction, key []byte) (val int32, err error)
}
type InteractiveTransaction struct {
txID []byte
con *Connection
commited bool
}
func (tx *InteractiveTransaction) Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) {
apbUpdate := &ApbUpdateObjects{
Updates: updates,
TransactionDescriptor: tx.txID,
}
err = apbUpdate.encode(tx.con)
if err != nil {
return
}
return decodeOperationResp(tx.con)
}
func (tx *InteractiveTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
apbUpdate := &ApbReadObjects{
TransactionDescriptor: tx.txID,
Boundobjects: objects,
}
err = apbUpdate.encode(tx.con)
if err != nil {
return
}
return decodeReadObjectsResp(tx.con)
}
func (tx *InteractiveTransaction) Commit() (op *ApbCommitResp, err error) {
msg := &ApbCommitTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con)
if err != nil {
return
}
op, err = decodeCommitResp(tx.con)
tx.con.Close()
return
}
func (tx *InteractiveTransaction) Abort() (op *ApbOperationResp, err error) {
msg := &ApbAbortTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con)
if err != nil {
return
}
op, err = decodeOperationResp(tx.con)
tx.con.Close()
return
}
type StaticTransaction struct {
client *Client
}
func (tx *StaticTransaction) Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) {
apbStaticUpdate := &ApbStaticUpdateObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Updates: updates,
}
con, err := tx.client.getConnection()
if err != nil {
return
}
err = apbStaticUpdate.encode(con)
if err != nil {
return
}
resp, err := decodeCommitResp(con)
con.Close()
if err != nil {
return
}
return &ApbOperationResp{Success: resp.Success, Errorcode: resp.Errorcode}, nil
}
func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
apbRead := &ApbStaticReadObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Objects: objects,
}
con, err := tx.client.getConnection()
if err != nil {
return
}
err = apbRead.encode(con)
if err != nil {
return
}
sresp, err := decodeStaticReadObjectsResp(con)
con.Close()
if err != nil {
return
}
return sresp.Objects, nil
}
func (bucket *Bucket) ReadSet(tx Transaction, key []byte) (val [][]byte, err error) {
crdtType := CRDTType_ORSET
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil {
return
}
val = resp.Objects[0].Set.Value
return
}
func (bucket *Bucket) ReadReg(tx Transaction, key []byte) (val []byte, err error) {
crdtType := CRDTType_LWWREG
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil {
return
}
val = resp.Objects[0].Reg.Value
return
}
func (bucket *Bucket) ReadMap(tx Transaction, key []byte) (val *MapReadResult, err error) {
crdtType := CRDTType_AWMAP
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil {
return
}
val = &MapReadResult{mapResp: resp.Objects[0].Map}
return
}
func (bucket *Bucket) ReadMVReg(tx Transaction, key []byte) (val [][]byte, err error) {
crdtType := CRDTType_MVREG
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil {
return
}
val = resp.Objects[0].Mvreg.Values
return
}
func (bucket *Bucket) ReadCounter(tx Transaction, key []byte) (val int32, err error) {
crdtType := CRDTType_COUNTER
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil {
return
}
val = *resp.Objects[0].Counter.Value
return
}
func (mrr *MapReadResult) Set(key []byte) (val [][]byte, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_ORSET && bytes.Equal(me.Key.Key, key) {
return me.Value.Set.Value, nil
}
}
return nil, fmt.Errorf("set entry with key '%s' not found", key)
}
type MapReadResult struct {
mapResp *ApbGetMapResp
}
func (mrr *MapReadResult) Reg(key []byte) (val []byte, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_LWWREG && bytes.Equal(me.Key.Key, key) {
return me.Value.Reg.Value, nil
}
}
return nil, fmt.Errorf("register entry with key '%s' not found", key)
}
func (mrr *MapReadResult) Map(key []byte) (val *MapReadResult, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_AWMAP && bytes.Equal(me.Key.Key, key) {
return &MapReadResult{mapResp: me.Value.Map}, nil
}
}
return nil, fmt.Errorf("map entry with key '%s' not found", key)
}
func (mrr *MapReadResult) MVReg(key []byte) (val [][]byte, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_MVREG && bytes.Equal(me.Key.Key, key) {
return me.Value.Mvreg.Values, nil
}
}
return nil, fmt.Errorf("map entry with key '%s' not found", key)
}
func (mrr *MapReadResult) Counter(key []byte) (val int32, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_COUNTER && bytes.Equal(me.Key.Key, key) {
return *me.Value.Counter.Value, nil
}
}
return 0, fmt.Errorf("counter entry with key '%s' not found", key)
}
// Updates
type UpdateConverter interface {
ConvertToToplevel(bucket []byte) *ApbUpdateOp
ConvertToNested() *ApbMapNestedUpdate
}
type CRDTUpdate struct {
Update *ApbUpdateOperation
Key []byte
Type CRDTType
}
type CRDTUpdater interface {
Update(tx Transaction, updates... *CRDTUpdate) (resp *ApbOperationResp, err error)
}
func (bucket *Bucket) Update(tx Transaction, updates... *CRDTUpdate) (resp *ApbOperationResp, err error) {
updateOps := make([]*ApbUpdateOp, len(updates))
for i,v := range updates {
updateOps[i] = v.ConvertToToplevel(bucket.Bucket)
}
return tx.Update(updateOps...)
}
func (update *CRDTUpdate) ConvertToToplevel(bucket []byte) *ApbUpdateOp {
return &ApbUpdateOp{
Boundobject: &ApbBoundObject{Key: update.Key, Type: &update.Type, Bucket: bucket},
Operation: update.Update,
}
}
func (update *CRDTUpdate) ConvertToNested() *ApbMapNestedUpdate {
return &ApbMapNestedUpdate{
Key: &ApbMapKey{Key: update.Key, Type: &update.Type},
Update: update.Update,
}
}
// CRDT update operations
func SetAdd(key []byte, elems ... []byte) *CRDTUpdate {
optype := ApbSetUpdate_ADD
return &CRDTUpdate{
Key: key,
Type: CRDTType_ORSET,
Update: &ApbUpdateOperation{
Setop: &ApbSetUpdate{Adds: elems, Optype: &optype},
},
}
}
func SetRemove(key []byte, elems ... []byte) *CRDTUpdate {
optype := ApbSetUpdate_REMOVE
return &CRDTUpdate{
Key: key,
Type: CRDTType_ORSET,
Update: &ApbUpdateOperation{
Setop: &ApbSetUpdate{Adds: elems, Optype: &optype},
},
}
}
func CounterInc(key []byte, inc int64) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
Type: CRDTType_COUNTER,
Update: &ApbUpdateOperation{
Counterop: &ApbCounterUpdate{Inc: &inc},
},
}
}
func IntegerInc(key []byte, inc int64) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
Type: CRDTType_ORSET,
Update: &ApbUpdateOperation{
Integerop: &ApbIntegerUpdate{Inc: &inc},
},
}
}
func IntegerSet(key []byte, value int64) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
Type: CRDTType_ORSET,
Update: &ApbUpdateOperation{
Integerop: &ApbIntegerUpdate{Set: &value},
},
}
}
func RegPut(key []byte, value []byte) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
Type: CRDTType_LWWREG,
Update: &ApbUpdateOperation{
Regop: &ApbRegUpdate{Value: value},
},
}
}
func MVRegPut(key []byte, value []byte) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
Type: CRDTType_MVREG,