Developing lightweight computation at the DSG edge

Commit 2a059878 authored by Mathias Weber's avatar Mathias Weber
Browse files

implemented static updates/reads

parent 9167a5da
...@@ -98,7 +98,7 @@ func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error) ...@@ -98,7 +98,7 @@ func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error)
return return
} }
func (tx *InteractiveTransaction) update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) { func (tx *InteractiveTransaction) Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) {
apbUpdate := &ApbUpdateObjects{ apbUpdate := &ApbUpdateObjects{
Updates: updates, Updates: updates,
TransactionDescriptor: tx.txID, TransactionDescriptor: tx.txID,
...@@ -110,7 +110,25 @@ func (tx *InteractiveTransaction) update(updates ...*ApbUpdateOp) (op *ApbOperat ...@@ -110,7 +110,25 @@ func (tx *InteractiveTransaction) update(updates ...*ApbUpdateOp) (op *ApbOperat
return decodeOperationResp(tx.con) return decodeOperationResp(tx.con)
} }
func (tx *InteractiveTransaction) read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) { func (client *Client) StaticUpdate(updates ...*ApbUpdateOp) (op *ApbCommitResp, err error) {
apbStaticUpdate := &ApbStaticUpdateObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Updates: updates,
}
con, err := client.getConnection()
if err != nil {
return
}
err = apbStaticUpdate.encode(con)
if err != nil {
return
}
op, err = decodeCommitResp(con)
con.Close()
return
}
func (tx *InteractiveTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
apbUpdate := &ApbReadObjects{ apbUpdate := &ApbReadObjects{
TransactionDescriptor: tx.txID, TransactionDescriptor: tx.txID,
Boundobjects: objects, Boundobjects: objects,
...@@ -122,20 +140,42 @@ func (tx *InteractiveTransaction) read(objects ...*ApbBoundObject) (resp *ApbRea ...@@ -122,20 +140,42 @@ func (tx *InteractiveTransaction) read(objects ...*ApbBoundObject) (resp *ApbRea
return decodeReadObjectsResp(tx.con) return decodeReadObjectsResp(tx.con)
} }
func (tx *InteractiveTransaction) commit() (op *ApbCommitResp, err error) { func (client *Client) StaticRead(objects ...*ApbBoundObject) (resp *ApbStaticReadObjectsResp, err error) {
apbRead := &ApbStaticReadObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Objects: objects,
}
con, err := client.getConnection()
if err != nil {
return
}
err = apbRead.encode(con)
if err != nil {
return
}
resp, err = decodeStaticReadObjectsResp(con)
con.Close()
return
}
func (tx *InteractiveTransaction) Commit() (op *ApbCommitResp, err error) {
msg := &ApbCommitTransaction{TransactionDescriptor: tx.txID} msg := &ApbCommitTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con) err = msg.encode(tx.con)
if err != nil { if err != nil {
return return
} }
return decodeCommitResp(tx.con) op, err = decodeCommitResp(tx.con)
tx.con.Close()
return
} }
func (tx *InteractiveTransaction) abort() (op *ApbOperationResp, err error) { func (tx *InteractiveTransaction) Abort() (op *ApbOperationResp, err error) {
msg := &ApbAbortTransaction{TransactionDescriptor: tx.txID} msg := &ApbAbortTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con) err = msg.encode(tx.con)
if err != nil { if err != nil {
return return
} }
return decodeOperationResp(tx.con) op, err = decodeOperationResp(tx.con)
tx.con.Close()
return
} }
...@@ -23,20 +23,47 @@ func TestSimple(t *testing.T) { ...@@ -23,20 +23,47 @@ func TestSimple(t *testing.T) {
Key: []byte("key"), Key: []byte("key"),
Type: &crdtType} Type: &crdtType}
one := int64(1) one := int64(1)
tx.update(&ApbUpdateOp{ tx.Update(&ApbUpdateOp{
Boundobject: key, Boundobject: key,
Operation: &ApbUpdateOperation{Counterop: &ApbCounterUpdate{Inc: &one}}, Operation: &ApbUpdateOperation{Counterop: &ApbCounterUpdate{Inc: &one}},
}) })
resp, err := tx.read(key) resp, err := tx.Read(key)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
fmt.Print(resp.Objects[0]) fmt.Print(resp.Objects[0])
_, err = tx.commit() _, err = tx.Commit()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
func TestStatic(t *testing.T) {
client, err := NewClient(Host{"127.0.0.1", 8087})
if err != nil {
t.Fatal(err)
}
crdtType := CRDTType_COUNTER
key := &ApbBoundObject{
Bucket: []byte("bucket"),
Key: []byte("keyStatic"),
Type: &crdtType}
one := int64(1)
_, err = client.StaticUpdate(&ApbUpdateOp{
Boundobject: key,
Operation: &ApbUpdateOperation{Counterop: &ApbCounterUpdate{Inc: &one}},
})
if err != nil {
t.Fatal(err)
}
resp, err := client.StaticRead(key)
if err != nil {
t.Fatal(err)
}
fmt.Print(resp.Objects.Objects[0])
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment