Developing lightweight computation at the DSG edge

Commit 0129ca2b authored by Mathias Weber's avatar Mathias Weber
Browse files

type for keys

parent 4fece62f
...@@ -20,7 +20,7 @@ func TestSimple(t *testing.T) { ...@@ -20,7 +20,7 @@ func TestSimple(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
bucket := Bucket{[]byte("bucket")} bucket := Bucket{[]byte("bucket")}
key := []byte("keyCounter") key := Key("keyCounter")
_, err = bucket.Update(tx, CounterInc(key, 1)) _, err = bucket.Update(tx, CounterInc(key, 1))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -53,9 +53,9 @@ func TestSetUpdate(t *testing.T) { ...@@ -53,9 +53,9 @@ func TestSetUpdate(t *testing.T) {
} }
bucket := Bucket{[]byte("bucket")} bucket := Bucket{[]byte("bucket")}
key := []byte("keySet") key := Key("keySet")
_, err = bucket.Update(tx, SetAdd(key, []byte("test2"))) _, err = bucket.Update(tx, SetAdd(key, []byte("test1")))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -85,7 +85,7 @@ func TestManyUpdates(t *testing.T) { ...@@ -85,7 +85,7 @@ func TestManyUpdates(t *testing.T) {
defer client.Close() defer client.Close()
bucket := Bucket{[]byte("bucket")} bucket := Bucket{[]byte("bucket")}
key := []byte("keyMany") key := Key("keyMany")
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(10) wg.Add(10)
...@@ -104,10 +104,13 @@ func TestManyUpdates(t *testing.T) { ...@@ -104,10 +104,13 @@ func TestManyUpdates(t *testing.T) {
if !(*updateResp.Success) { if !(*updateResp.Success) {
fmt.Printf("Update #%d not successful\n", i) fmt.Printf("Update #%d not successful\n", i)
} }
_, err = tx.Commit() commitResp, err := tx.Commit()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if !(*commitResp.Success) {
fmt.Printf("Commit #%d not successful\n", i)
}
if i%1000 == 0 { if i%1000 == 0 {
fmt.Println(i) fmt.Println(i)
} }
...@@ -133,7 +136,7 @@ func TestReadMany(t *testing.T) { ...@@ -133,7 +136,7 @@ func TestReadMany(t *testing.T) {
defer client.Close() defer client.Close()
bucket := Bucket{[]byte("bucket")} bucket := Bucket{[]byte("bucket")}
key := []byte("keyMany") key := Key("keyMany")
tx := client.CreateStaticTransaction() tx := client.CreateStaticTransaction()
counterVal, err := bucket.ReadCounter(tx, key) counterVal, err := bucket.ReadCounter(tx, key)
if err != nil { if err != nil {
...@@ -149,7 +152,7 @@ func TestStatic(t *testing.T) { ...@@ -149,7 +152,7 @@ func TestStatic(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
bucket := Bucket{[]byte("bucket")} bucket := Bucket{[]byte("bucket")}
key := []byte("keyStatic") key := Key("keyStatic")
tx := client.CreateStaticTransaction() tx := client.CreateStaticTransaction()
......
...@@ -14,12 +14,14 @@ type Transaction interface { ...@@ -14,12 +14,14 @@ type Transaction interface {
Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error)
} }
type Key []byte
type CRDTReader interface { type CRDTReader interface {
ReadSet(tx Transaction, key []byte) (val [][]byte, err error) ReadSet(tx Transaction, key Key) (val [][]byte, err error)
ReadReg(tx Transaction, key []byte) (val []byte, err error) ReadReg(tx Transaction, key Key) (val []byte, err error)
ReadMap(tx Transaction, key []byte) (val *MapReadResult, err error) ReadMap(tx Transaction, key Key) (val *MapReadResult, err error)
ReadMVReg(tx Transaction, key []byte) (val [][]byte, err error) ReadMVReg(tx Transaction, key Key) (val [][]byte, err error)
ReadCounter(tx Transaction, key []byte) (val int32, err error) ReadCounter(tx Transaction, key Key) (val int32, err error)
} }
type InteractiveTransaction struct { type InteractiveTransaction struct {
...@@ -124,7 +126,7 @@ func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObje ...@@ -124,7 +126,7 @@ func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObje
} }
func (bucket *Bucket) ReadSet(tx Transaction, key []byte) (val [][]byte, err error) { func (bucket *Bucket) ReadSet(tx Transaction, key Key) (val [][]byte, err error) {
crdtType := CRDTType_ORSET crdtType := CRDTType_ORSET
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType}) resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil { if err != nil {
...@@ -134,7 +136,7 @@ func (bucket *Bucket) ReadSet(tx Transaction, key []byte) (val [][]byte, err err ...@@ -134,7 +136,7 @@ func (bucket *Bucket) ReadSet(tx Transaction, key []byte) (val [][]byte, err err
return return
} }
func (bucket *Bucket) ReadReg(tx Transaction, key []byte) (val []byte, err error) { func (bucket *Bucket) ReadReg(tx Transaction, key Key) (val []byte, err error) {
crdtType := CRDTType_LWWREG crdtType := CRDTType_LWWREG
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType}) resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil { if err != nil {
...@@ -144,7 +146,7 @@ func (bucket *Bucket) ReadReg(tx Transaction, key []byte) (val []byte, err error ...@@ -144,7 +146,7 @@ func (bucket *Bucket) ReadReg(tx Transaction, key []byte) (val []byte, err error
return return
} }
func (bucket *Bucket) ReadMap(tx Transaction, key []byte) (val *MapReadResult, err error) { func (bucket *Bucket) ReadMap(tx Transaction, key Key) (val *MapReadResult, err error) {
crdtType := CRDTType_AWMAP crdtType := CRDTType_AWMAP
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType}) resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil { if err != nil {
...@@ -154,7 +156,7 @@ func (bucket *Bucket) ReadMap(tx Transaction, key []byte) (val *MapReadResult, e ...@@ -154,7 +156,7 @@ func (bucket *Bucket) ReadMap(tx Transaction, key []byte) (val *MapReadResult, e
return return
} }
func (bucket *Bucket) ReadMVReg(tx Transaction, key []byte) (val [][]byte, err error) { func (bucket *Bucket) ReadMVReg(tx Transaction, key Key) (val [][]byte, err error) {
crdtType := CRDTType_MVREG crdtType := CRDTType_MVREG
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType}) resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil { if err != nil {
...@@ -164,7 +166,7 @@ func (bucket *Bucket) ReadMVReg(tx Transaction, key []byte) (val [][]byte, err e ...@@ -164,7 +166,7 @@ func (bucket *Bucket) ReadMVReg(tx Transaction, key []byte) (val [][]byte, err e
return return
} }
func (bucket *Bucket) ReadCounter(tx Transaction, key []byte) (val int32, err error) { func (bucket *Bucket) ReadCounter(tx Transaction, key Key) (val int32, err error) {
crdtType := CRDTType_COUNTER crdtType := CRDTType_COUNTER
resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType}) resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
if err != nil { if err != nil {
...@@ -174,7 +176,7 @@ func (bucket *Bucket) ReadCounter(tx Transaction, key []byte) (val int32, err er ...@@ -174,7 +176,7 @@ func (bucket *Bucket) ReadCounter(tx Transaction, key []byte) (val int32, err er
return return
} }
func (mrr *MapReadResult) Set(key []byte) (val [][]byte, err error) { func (mrr *MapReadResult) Set(key Key) (val [][]byte, err error) {
for _, me := range mrr.mapResp.Entries { for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_ORSET && bytes.Equal(me.Key.Key, key) { if *me.Key.Type == CRDTType_ORSET && bytes.Equal(me.Key.Key, key) {
return me.Value.Set.Value, nil return me.Value.Set.Value, nil
...@@ -189,7 +191,7 @@ type MapReadResult struct { ...@@ -189,7 +191,7 @@ type MapReadResult struct {
mapResp *ApbGetMapResp mapResp *ApbGetMapResp
} }
func (mrr *MapReadResult) Reg(key []byte) (val []byte, err error) { func (mrr *MapReadResult) Reg(key Key) (val []byte, err error) {
for _, me := range mrr.mapResp.Entries { for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_LWWREG && bytes.Equal(me.Key.Key, key) { if *me.Key.Type == CRDTType_LWWREG && bytes.Equal(me.Key.Key, key) {
return me.Value.Reg.Value, nil return me.Value.Reg.Value, nil
...@@ -198,7 +200,7 @@ func (mrr *MapReadResult) Reg(key []byte) (val []byte, err error) { ...@@ -198,7 +200,7 @@ func (mrr *MapReadResult) Reg(key []byte) (val []byte, err error) {
return nil, fmt.Errorf("register entry with key '%s' not found", key) return nil, fmt.Errorf("register entry with key '%s' not found", key)
} }
func (mrr *MapReadResult) Map(key []byte) (val *MapReadResult, err error) { func (mrr *MapReadResult) Map(key Key) (val *MapReadResult, err error) {
for _, me := range mrr.mapResp.Entries { for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_AWMAP && bytes.Equal(me.Key.Key, key) { if *me.Key.Type == CRDTType_AWMAP && bytes.Equal(me.Key.Key, key) {
return &MapReadResult{mapResp: me.Value.Map}, nil return &MapReadResult{mapResp: me.Value.Map}, nil
...@@ -207,7 +209,7 @@ func (mrr *MapReadResult) Map(key []byte) (val *MapReadResult, err error) { ...@@ -207,7 +209,7 @@ func (mrr *MapReadResult) Map(key []byte) (val *MapReadResult, err error) {
return nil, fmt.Errorf("map entry with key '%s' not found", key) return nil, fmt.Errorf("map entry with key '%s' not found", key)
} }
func (mrr *MapReadResult) MVReg(key []byte) (val [][]byte, err error) { func (mrr *MapReadResult) MVReg(key Key) (val [][]byte, err error) {
for _, me := range mrr.mapResp.Entries { for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_MVREG && bytes.Equal(me.Key.Key, key) { if *me.Key.Type == CRDTType_MVREG && bytes.Equal(me.Key.Key, key) {
return me.Value.Mvreg.Values, nil return me.Value.Mvreg.Values, nil
...@@ -216,7 +218,7 @@ func (mrr *MapReadResult) MVReg(key []byte) (val [][]byte, err error) { ...@@ -216,7 +218,7 @@ func (mrr *MapReadResult) MVReg(key []byte) (val [][]byte, err error) {
return nil, fmt.Errorf("map entry with key '%s' not found", key) return nil, fmt.Errorf("map entry with key '%s' not found", key)
} }
func (mrr *MapReadResult) Counter(key []byte) (val int32, err error) { func (mrr *MapReadResult) Counter(key Key) (val int32, err error) {
for _, me := range mrr.mapResp.Entries { for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_COUNTER && bytes.Equal(me.Key.Key, key) { if *me.Key.Type == CRDTType_COUNTER && bytes.Equal(me.Key.Key, key) {
return *me.Value.Counter.Value, nil return *me.Value.Counter.Value, nil
...@@ -265,7 +267,7 @@ func (update *CRDTUpdate) ConvertToNested() *ApbMapNestedUpdate { ...@@ -265,7 +267,7 @@ func (update *CRDTUpdate) ConvertToNested() *ApbMapNestedUpdate {
// CRDT update operations // CRDT update operations
func SetAdd(key []byte, elems ... []byte) *CRDTUpdate { func SetAdd(key Key, elems ... []byte) *CRDTUpdate {
optype := ApbSetUpdate_ADD optype := ApbSetUpdate_ADD
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
...@@ -276,7 +278,7 @@ func SetAdd(key []byte, elems ... []byte) *CRDTUpdate { ...@@ -276,7 +278,7 @@ func SetAdd(key []byte, elems ... []byte) *CRDTUpdate {
} }
} }
func SetRemove(key []byte, elems ... []byte) *CRDTUpdate { func SetRemove(key Key, elems ... []byte) *CRDTUpdate {
optype := ApbSetUpdate_REMOVE optype := ApbSetUpdate_REMOVE
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
...@@ -287,7 +289,7 @@ func SetRemove(key []byte, elems ... []byte) *CRDTUpdate { ...@@ -287,7 +289,7 @@ func SetRemove(key []byte, elems ... []byte) *CRDTUpdate {
} }
} }
func CounterInc(key []byte, inc int64) *CRDTUpdate { func CounterInc(key Key, inc int64) *CRDTUpdate {
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
Type: CRDTType_COUNTER, Type: CRDTType_COUNTER,
...@@ -297,7 +299,7 @@ func CounterInc(key []byte, inc int64) *CRDTUpdate { ...@@ -297,7 +299,7 @@ func CounterInc(key []byte, inc int64) *CRDTUpdate {
} }
} }
func IntegerInc(key []byte, inc int64) *CRDTUpdate { func IntegerInc(key Key, inc int64) *CRDTUpdate {
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
Type: CRDTType_ORSET, Type: CRDTType_ORSET,
...@@ -307,7 +309,7 @@ func IntegerInc(key []byte, inc int64) *CRDTUpdate { ...@@ -307,7 +309,7 @@ func IntegerInc(key []byte, inc int64) *CRDTUpdate {
} }
} }
func IntegerSet(key []byte, value int64) *CRDTUpdate { func IntegerSet(key Key, value int64) *CRDTUpdate {
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
Type: CRDTType_ORSET, Type: CRDTType_ORSET,
...@@ -317,7 +319,7 @@ func IntegerSet(key []byte, value int64) *CRDTUpdate { ...@@ -317,7 +319,7 @@ func IntegerSet(key []byte, value int64) *CRDTUpdate {
} }
} }
func RegPut(key []byte, value []byte) *CRDTUpdate { func RegPut(key Key, value []byte) *CRDTUpdate {
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
Type: CRDTType_LWWREG, Type: CRDTType_LWWREG,
...@@ -327,7 +329,7 @@ func RegPut(key []byte, value []byte) *CRDTUpdate { ...@@ -327,7 +329,7 @@ func RegPut(key []byte, value []byte) *CRDTUpdate {
} }
} }
func MVRegPut(key []byte, value []byte) *CRDTUpdate { func MVRegPut(key Key, value []byte) *CRDTUpdate {
return &CRDTUpdate{ return &CRDTUpdate{
Key: key, Key: key,
Type: CRDTType_MVREG, Type: CRDTType_MVREG,
...@@ -337,7 +339,7 @@ func MVRegPut(key []byte, value []byte) *CRDTUpdate { ...@@ -337,7 +339,7 @@ func MVRegPut(key []byte, value []byte) *CRDTUpdate {
} }
} }
func MapUpdate(key []byte, updates ... *CRDTUpdate) *CRDTUpdate { func MapUpdate(key Key, updates ... *CRDTUpdate) *CRDTUpdate {
nupdates := make([]*ApbMapNestedUpdate, len(updates)) nupdates := make([]*ApbMapNestedUpdate, len(updates))
for i,v := range updates { for i,v := range updates {
nupdates[i] = v.ConvertToNested() nupdates[i] = v.ConvertToNested()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment