Developing lightweight computation at the DSG edge

Commit 9621aabf authored by Peter Zeller's avatar Peter Zeller
Browse files

some functions for Go interface

parent 1cd2c860
package antidoteclient
import (
"gopkg.in/fatih/pool.v2"
"net"
"fmt"
"gopkg.in/fatih/pool.v2"
"math/rand"
"net"
"time"
)
......@@ -20,16 +20,16 @@ type Host struct {
Port int
}
func NewClient(hosts []Host) (client *Client, err error) {
func NewClient(hosts ...Host) (client *Client, err error) {
pools := make([]pool.Pool, len(hosts))
for i, h := range hosts {
p, err := pool.NewChannelPool(INITIAL_POOL_SIZE, MAX_POOL_SIZE, func () (net.Conn, error) { return net.Dial("tcp", fmt.Sprint("{}:{}", h.Name, h.Port)) })
p, err := pool.NewChannelPool(INITIAL_POOL_SIZE, MAX_POOL_SIZE, func() (net.Conn, error) { return net.Dial("tcp", fmt.Sprintf("%s:%d", h.Name, h.Port)) })
if err != nil {
return
return nil, err
}
pools[i] = p
}
client = &Client {
client = &Client{
pools: pools,
}
return
......@@ -44,17 +44,17 @@ func (client *Client) Close() {
func (client *Client) getConnection() (c *Connection, err error) {
// maybe make this global?
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, i := range r.Perm(len(client.pools)) {
for _, i := range r.Perm(len(client.pools)) {
p := client.pools[i]
con, err := p.Get()
if err != nil {
return
return nil, err
}
c = &Connection{
Conn: con,
pool: p,
}
return
return c, nil
}
err = fmt.Errorf("All connections dead")
return
......@@ -95,3 +95,36 @@ func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error)
}
return
}
func (tx *InteractiveTransaction) update(updates ...*ApbUpdateOp) (op *ApbOperationResp, err error) {
apbUpdate := &ApbUpdateObjects{
Updates: updates,
TransactionDescriptor: tx.txID,
}
err = apbUpdate.encode(tx.con)
if err != nil {
return
}
return decodeOperationResp(tx.con)
}
func (tx *InteractiveTransaction) read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
apbUpdate := &ApbReadObjects{
TransactionDescriptor: tx.txID,
Boundobjects: objects,
}
err = apbUpdate.encode(tx.con)
if err != nil {
return
}
return decodeReadObjectsResp(tx.con)
}
func (tx *InteractiveTransaction) commit() (op *ApbCommitResp, err error) {
msg := &ApbCommitTransaction{}
err = msg.encode(tx.con)
if err != nil {
return
}
return decodeCommitResp(tx.con)
}
package antidoteclient
//import "fmt"
import (
"testing"
"fmt"
)
func TestSimple(t *testing.T) {
client, err := NewClient(Host{"127.0.0.1", 8087})
if err != nil {
t.Fatal(err)
}
tx, err := client.StartTransaction()
if err != nil {
t.Fatal(err)
}
crdtType := CRDTType_COUNTER
key := &ApbBoundObject{
Bucket: []byte("bucket"),
Key: []byte("key"),
Type: &crdtType}
one := int64(1)
tx.update(&ApbUpdateOp{
Boundobject: key,
Operation: &ApbUpdateOperation{Counterop: &ApbCounterUpdate{Inc: &one}},
})
resp, err := tx.read(key)
if err != nil {
t.Fatal(err)
}
fmt.Print(resp.Objects[0])
_, err = tx.commit()
if err != nil {
t.Fatal(err)
}
}
package antidoteclient
import (
"encoding/binary"
"io"
"github.com/golang/protobuf/proto"
"fmt"
"encoding/binary"
"fmt"
"github.com/golang/protobuf/proto"
"io"
)
func readMsgRaw(reader io.Reader) (data []byte, err error) {
sizeB := make([]byte, 4)
var count uint32
// read the size of the message
count = 0
for count != 4 {
n, err := reader.Read(sizeB[count:])
if err != nil {
return
}
count += uint32(n)
}
sizeI := binary.BigEndian.Uint32(sizeB)
data = make([]byte, sizeI)
// read data
count = 0
for count != sizeI {
n, err := reader.Read(data[count:])
if err != nil {
data = nil
return
}
count += uint32(n)
}
return
sizeB := make([]byte, 4)
var count uint32
// read the size of the message
count = 0
for count != 4 {
n, err := reader.Read(sizeB[count:])
if err != nil {
return nil, err
}
count += uint32(n)
}
sizeI := binary.BigEndian.Uint32(sizeB)
data = make([]byte, sizeI)
// read data
count = 0
for count != sizeI {
n, err := reader.Read(data[count:])
if err != nil {
data = nil
return nil, err
}
count += uint32(n)
}
return
}
func (op *ApbReadObjects) encode(writer io.Writer) (err error) {
return encodeMsg(op, 116, writer)
return encodeMsg(op, 116, writer)
}
func (op *ApbUpdateObjects) encode(writer io.Writer) (err error) {
return encodeMsg(op, 118, writer)
return encodeMsg(op, 118, writer)
}
func (op *ApbStartTransaction) encode(writer io.Writer) (err error) {
return encodeMsg(op, 119, writer)
return encodeMsg(op, 119, writer)
}
func (op *ApbAbortTransaction) encode(writer io.Writer) (err error) {
return encodeMsg(op, 120, writer)
return encodeMsg(op, 120, writer)
}
func (op *ApbCommitTransaction) encode(writer io.Writer) (err error) {
return encodeMsg(op, 121, writer)
return encodeMsg(op, 121, writer)
}
func (op *ApbStaticUpdateObjects) encode(writer io.Writer) (err error) {
return encodeMsg(op, 122, writer)
return encodeMsg(op, 122, writer)
}
func (op *ApbStaticReadObjects) encode(writer io.Writer) (err error) {
return encodeMsg(op, 123, writer)
return encodeMsg(op, 123, writer)
}
func encodeMsg(message proto.Message, msgCode byte, writer io.Writer) (err error) {
msg, err := proto.Marshal(message)
if err != nil {
return
}
msgsize := len(msg)
buf := make([]byte, 5)
binary.BigEndian.PutUint32(buf[0:4], uint32(msgsize))
buf[5] = msgCode
writer.Write(buf)
writer.Write(msg)
return nil
msg, err := proto.Marshal(message)
if err != nil {
return
}
msgsize := len(msg)
buf := make([]byte, 5)
binary.BigEndian.PutUint32(buf[0:4], uint32(msgsize))
buf[5] = msgCode
writer.Write(buf)
writer.Write(msg)
return nil
}
func decodeOperationResp(reader io.Reader) (op *ApbOperationResp, err error) {
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 111:
// transaction response
resp := &ApbOperationResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 111:
// transaction response
resp := &ApbOperationResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
}
func decodeStartTransactionResp(reader io.Reader) (op *ApbStartTransactionResp, err error) {
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 124:
// transaction response
resp := &ApbStartTransactionResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 124:
// transaction response
resp := &ApbStartTransactionResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
}
func decodeReadObjectsResp(reader io.Reader) (op *ApbReadObjectsResp, err error) {
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 126:
// transaction response
resp := &ApbReadObjectsResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 126:
// transaction response
resp := &ApbReadObjectsResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
}
func decodeCommitResp(reader io.Reader) (op *ApbCommitResp, err error) {
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 127:
// transaction response
resp := &ApbCommitResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 127:
// transaction response
resp := &ApbCommitResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
}
func decodeStaticReadObjectsResp(reader io.Reader) (op *ApbStaticReadObjectsResp, err error) {
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 128:
// transaction response
resp := &ApbStaticReadObjectsResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
data, err := readMsgRaw(reader)
if err != nil {
return
}
switch data[0] {
case 0:
// error
err = decodeError(data[1:])
return
case 128:
// transaction response
resp := &ApbStaticReadObjectsResp{}
err = proto.Unmarshal(data[1:], resp)
if err != nil {
return
}
op = resp
return
}
err = fmt.Errorf("invalid message code: %d", data[0])
return
}
func decodeError(data []byte) (err error) {
resp := &ApbErrorResp{}
err = proto.Unmarshal(data, resp)
if err != nil {
return
}
err = fmt.Errorf("antidote error code %d, %s", resp.Errcode, string(resp.Errmsg))
return
resp := &ApbErrorResp{}
err = proto.Unmarshal(data, resp)
if err != nil {
return
}
err = fmt.Errorf("antidote error code %d, %s", resp.Errcode, string(resp.Errmsg))
return
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment