Developing lightweight computation at the DSG edge

Commit 7683bbe1 authored by Mathias Weber's avatar Mathias Weber
Browse files

documentation

parent 0129ca2b
......@@ -11,15 +11,21 @@ import (
const INITIAL_POOL_SIZE = 1
const MAX_POOL_SIZE = 50
// Represents connections to the Antidote database.
// Allows to start/create transaction.
type Client struct {
pools []pool.Pool
}
// Represents an Antidote server.
// The port needs to be the port of the protocol-buffer interface (usually 8087)
type Host struct {
Name string
Port int
}
// Recreates a new Antidote client connected to the given Antidote servers.
// Remember to close the client to clean-up the connections in the connection pool
func NewClient(hosts ...Host) (client *Client, err error) {
pools := make([]pool.Pool, len(hosts))
for i, h := range hosts {
......@@ -35,13 +41,14 @@ func NewClient(hosts ...Host) (client *Client, err error) {
return
}
// Call close after using the client to clean up the connections int he connection pool and release resources.
func (client *Client) Close() {
for _, p := range client.pools {
p.Close()
}
}
func (client *Client) getConnection() (c *Connection, err error) {
func (client *Client) getConnection() (c *connection, err error) {
// maybe make this global?
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, i := range r.Perm(len(client.pools)) {
......@@ -50,7 +57,7 @@ func (client *Client) getConnection() (c *Connection, err error) {
if err != nil {
return nil, err
}
c = &Connection{
c = &connection{
Conn: con,
pool: p,
}
......@@ -61,13 +68,15 @@ func (client *Client) getConnection() (c *Connection, err error) {
}
// a close already puts the connection back into the right pool
type Connection struct {
type connection struct {
net.Conn
pool pool.Pool
}
// Starts an interactive transaction and registers it on the Antidote server.
// The connection used to issue reads and updates is sticky;
// interactive transactions are only valid local to the server they are started on.
func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error) {
con, err := client.getConnection()
if err != nil {
......@@ -95,6 +104,7 @@ func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error)
return
}
// Creates a static transaction object. Does not communicate with the Antidote server.
func (client *Client) CreateStaticTransaction() *StaticTransaction {
return &StaticTransaction{client: client}
}
......@@ -5,41 +5,65 @@ import (
"fmt"
)
// Represents a bucket in the Antidote database.
// Offers a high-level interface to issue read and write operations on objects in the bucket.
type Bucket struct {
Bucket []byte
}
// A transaction object offers low-level mechanisms to send protocol-buffer messages to Antidote in the context of
// a highly-available transaction.
// Typical representatives are interactive transactions handled by Antidote and static transactions handled on the client side.
type Transaction interface {
Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error)
Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error)
Update(updates ...*ApbUpdateOp) error
}
// Type alias for byte-slices.
// Used to represent keys of objects in buckets and maps
type Key []byte
// A CRDTReader allows to read the value of objects identified by keys in the context of a transaction.
type CRDTReader interface {
// Read the value of a add-wins set identified by the given key
ReadSet(tx Transaction, key Key) (val [][]byte, err error)
// Read the value of a last-writer-wins register identified by the given key
ReadReg(tx Transaction, key Key) (val []byte, err error)
// Read the value of a add-wins map identified by the given key
ReadMap(tx Transaction, key Key) (val *MapReadResult, err error)
// Read the value of a multi-value register identified by the given key
ReadMVReg(tx Transaction, key Key) (val [][]byte, err error)
// Read the value of a counter identified by the given key
ReadCounter(tx Transaction, key Key) (val int32, err error)
}
// A transaction handled by Antidote on the server side.
// Interactive Transactions need to be started on the server and are kept open for their duration.
// Update operations are only visible to reads issued in the context of the same transaction or after committing the transaction.
// Always commit or abort interactive transactions to clean up the server side!
type InteractiveTransaction struct {
txID []byte
con *Connection
commited bool
con *connection
committed bool
}
func (tx *InteractiveTransaction) Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) {
func (tx *InteractiveTransaction) Update(updates ...*ApbUpdateOp) error {
apbUpdate := &ApbUpdateObjects{
Updates: updates,
TransactionDescriptor: tx.txID,
}
err = apbUpdate.encode(tx.con)
err := apbUpdate.encode(tx.con)
if err != nil {
return
return err
}
resp, err := decodeOperationResp(tx.con)
if err != nil {
return err
}
return decodeOperationResp(tx.con)
if !(*resp.Success) {
return fmt.Errorf("operation not successful; error code %d", *resp.Errorcode)
}
return nil
}
func (tx *InteractiveTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
......@@ -54,53 +78,78 @@ func (tx *InteractiveTransaction) Read(objects ...*ApbBoundObject) (resp *ApbRea
return decodeReadObjectsResp(tx.con)
}
func (tx *InteractiveTransaction) Commit() (op *ApbCommitResp, err error) {
msg := &ApbCommitTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con)
if err != nil {
return
// commits the transaction, makes the updates issued under this transaction visible to subsequent transaction
// and cleans up the server side.
func (tx *InteractiveTransaction) Commit() error {
if !tx.committed {
msg := &ApbCommitTransaction{TransactionDescriptor: tx.txID}
err := msg.encode(tx.con)
if err != nil {
return err
}
op, err := decodeCommitResp(tx.con)
tx.con.Close()
if err != nil {
return err
}
if !(*op.Success) {
return fmt.Errorf("operation not successful; error code %d", *op.Errorcode)
}
}
op, err = decodeCommitResp(tx.con)
tx.con.Close()
return
return nil
}
func (tx *InteractiveTransaction) Abort() (op *ApbOperationResp, err error) {
msg := &ApbAbortTransaction{TransactionDescriptor: tx.txID}
err = msg.encode(tx.con)
if err != nil {
return
// aborts the transactions, discards updates issued under this transaction
// and cleans up the server side.
// WARNING: May not be supported by the current version of Antidote
func (tx *InteractiveTransaction) Abort() error {
if !tx.committed {
msg := &ApbAbortTransaction{TransactionDescriptor: tx.txID}
err := msg.encode(tx.con)
if err != nil {
return err
}
op, err := decodeOperationResp(tx.con)
tx.con.Close()
if err != nil {
return err
}
if !(*op.Success) {
return fmt.Errorf("operation not successful; error code %d", *op.Errorcode)
}
}
op, err = decodeOperationResp(tx.con)
tx.con.Close()
return
return nil
}
// Pseudo transaction to issue reads and updated without starting an interactive transaction.
// Can be interpreted as starting a transaction for each read or update and directly committing it.
type StaticTransaction struct {
client *Client
}
func (tx *StaticTransaction) Update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) {
func (tx *StaticTransaction) Update(updates ...*ApbUpdateOp) error {
apbStaticUpdate := &ApbStaticUpdateObjects{
Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Updates: updates,
}
con, err := tx.client.getConnection()
if err != nil {
return
return err
}
err = apbStaticUpdate.encode(con)
if err != nil {
return
return err
}
resp, err := decodeCommitResp(con)
con.Close()
if err != nil {
return
return err
}
if !(*resp.Success) {
return fmt.Errorf("operation not successful; error code %d", *resp.Errorcode)
}
return &ApbOperationResp{Success: resp.Success, Errorcode: resp.Errorcode}, nil
return nil
}
......@@ -176,6 +225,14 @@ func (bucket *Bucket) ReadCounter(tx Transaction, key Key) (val int32, err error
return
}
// Represents the result of reading from a map object.
// Grants access to the keys of the map to access values of the nested CRDTs.
type MapReadResult struct {
mapResp *ApbGetMapResp
}
// Access the value of the nested add-wins set under the given key
func (mrr *MapReadResult) Set(key Key) (val [][]byte, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_ORSET && bytes.Equal(me.Key.Key, key) {
......@@ -185,12 +242,7 @@ func (mrr *MapReadResult) Set(key Key) (val [][]byte, err error) {
return nil, fmt.Errorf("set entry with key '%s' not found", key)
}
type MapReadResult struct {
mapResp *ApbGetMapResp
}
// Access the value of the nested last-writer-wins register under the given key
func (mrr *MapReadResult) Reg(key Key) (val []byte, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_LWWREG && bytes.Equal(me.Key.Key, key) {
......@@ -200,6 +252,7 @@ func (mrr *MapReadResult) Reg(key Key) (val []byte, err error) {
return nil, fmt.Errorf("register entry with key '%s' not found", key)
}
// Access the value of the nested add-wins map under the given key
func (mrr *MapReadResult) Map(key Key) (val *MapReadResult, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_AWMAP && bytes.Equal(me.Key.Key, key) {
......@@ -209,6 +262,7 @@ func (mrr *MapReadResult) Map(key Key) (val *MapReadResult, err error) {
return nil, fmt.Errorf("map entry with key '%s' not found", key)
}
// Access the value of the nested multi-value register under the given key
func (mrr *MapReadResult) MVReg(key Key) (val [][]byte, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_MVREG && bytes.Equal(me.Key.Key, key) {
......@@ -218,6 +272,7 @@ func (mrr *MapReadResult) MVReg(key Key) (val [][]byte, err error) {
return nil, fmt.Errorf("map entry with key '%s' not found", key)
}
// Access the value of the nested counter under the given key
func (mrr *MapReadResult) Counter(key Key) (val int32, err error) {
for _, me := range mrr.mapResp.Entries {
if *me.Key.Type == CRDTType_COUNTER && bytes.Equal(me.Key.Key, key) {
......@@ -227,23 +282,28 @@ func (mrr *MapReadResult) Counter(key Key) (val int32, err error) {
return 0, fmt.Errorf("counter entry with key '%s' not found", key)
}
// Updates
// Represents updates that can be converted to top-level updates applicable to a bucket
// or nested updates applicable to a map
type UpdateConverter interface {
ConvertToToplevel(bucket []byte) *ApbUpdateOp
ConvertToNested() *ApbMapNestedUpdate
}
// Represents updates of a specific key of a specific type.
// Can be applied to either buckets or maps
type CRDTUpdate struct {
Update *ApbUpdateOperation
Key []byte
Key Key
Type CRDTType
}
// A CRDTUpdater allows to apply updates in the context of a transaction.
type CRDTUpdater interface {
Update(tx Transaction, updates... *CRDTUpdate) (resp *ApbOperationResp, err error)
Update(tx Transaction, updates... *CRDTUpdate) error
}
func (bucket *Bucket) Update(tx Transaction, updates... *CRDTUpdate) (resp *ApbOperationResp, err error) {
func (bucket *Bucket) Update(tx Transaction, updates... *CRDTUpdate) error {
updateOps := make([]*ApbUpdateOp, len(updates))
for i,v := range updates {
updateOps[i] = v.ConvertToToplevel(bucket.Bucket)
......@@ -267,6 +327,7 @@ func (update *CRDTUpdate) ConvertToNested() *ApbMapNestedUpdate {
// CRDT update operations
// Represents the update to add an element to an add-wins set
func SetAdd(key Key, elems ... []byte) *CRDTUpdate {
optype := ApbSetUpdate_ADD
return &CRDTUpdate{
......@@ -278,6 +339,7 @@ func SetAdd(key Key, elems ... []byte) *CRDTUpdate {
}
}
// Represents the update to remove an element from an add-wins set
func SetRemove(key Key, elems ... []byte) *CRDTUpdate {
optype := ApbSetUpdate_REMOVE
return &CRDTUpdate{
......@@ -289,6 +351,7 @@ func SetRemove(key Key, elems ... []byte) *CRDTUpdate {
}
}
// Represents the update to increment a counter
func CounterInc(key Key, inc int64) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
......@@ -299,26 +362,7 @@ func CounterInc(key Key, inc int64) *CRDTUpdate {
}
}
func IntegerInc(key Key, inc int64) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
Type: CRDTType_ORSET,
Update: &ApbUpdateOperation{
Integerop: &ApbIntegerUpdate{Inc: &inc},
},
}
}
func IntegerSet(key Key, value int64) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
Type: CRDTType_ORSET,
Update: &ApbUpdateOperation{
Integerop: &ApbIntegerUpdate{Set: &value},
},
}
}
// Represents the update to write a value into an last-writer-wins register
func RegPut(key Key, value []byte) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
......@@ -329,6 +373,7 @@ func RegPut(key Key, value []byte) *CRDTUpdate {
}
}
// Represents the update to write a value into an multi-value register
func MVRegPut(key Key, value []byte) *CRDTUpdate {
return &CRDTUpdate{
Key: key,
......@@ -339,6 +384,7 @@ func MVRegPut(key Key, value []byte) *CRDTUpdate {
}
}
// Represents the update to nested objects of an add-wins map
func MapUpdate(key Key, updates ... *CRDTUpdate) *CRDTUpdate {
nupdates := make([]*ApbMapNestedUpdate, len(updates))
for i,v := range updates {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment