Developing lightweight computation at the DSG edge

antidoteclient.go 2.63 KB
Newer Older
Mathias Weber's avatar
Mathias Weber committed
1
2
3
4
5
package antidoteclient

import (
	"fmt"
	"math/rand"
Peter Zeller's avatar
Peter Zeller committed
6
	"net"
Mathias Weber's avatar
Mathias Weber committed
7
	"time"
Mathias Weber's avatar
Mathias Weber committed
8
9

	"gopkg.in/fatih/pool.v2"
Mathias Weber's avatar
Mathias Weber committed
10
11
12
13
14
)

const INITIAL_POOL_SIZE = 1
const MAX_POOL_SIZE = 50

Mathias Weber's avatar
Mathias Weber committed
15
16
// Represents connections to the Antidote database.
// Allows to start/create transaction.
Mathias Weber's avatar
Mathias Weber committed
17
18
19
20
type Client struct {
	pools []pool.Pool
}

Mathias Weber's avatar
Mathias Weber committed
21
22
// Represents an Antidote server.
// The port needs to be the port of the protocol-buffer interface (usually 8087)
Mathias Weber's avatar
Mathias Weber committed
23
24
25
26
27
type Host struct {
	Name string
	Port int
}

Mathias Weber's avatar
Mathias Weber committed
28
29
// Recreates a new Antidote client connected to the given Antidote servers.
// Remember to close the client to clean-up the connections in the connection pool
Peter Zeller's avatar
Peter Zeller committed
30
func NewClient(hosts ...Host) (client *Client, err error) {
Mathias Weber's avatar
Mathias Weber committed
31
32
	pools := make([]pool.Pool, len(hosts))
	for i, h := range hosts {
Peter Zeller's avatar
Peter Zeller committed
33
		p, err := pool.NewChannelPool(INITIAL_POOL_SIZE, MAX_POOL_SIZE, func() (net.Conn, error) { return net.Dial("tcp", fmt.Sprintf("%s:%d", h.Name, h.Port)) })
Mathias Weber's avatar
Mathias Weber committed
34
		if err != nil {
Peter Zeller's avatar
Peter Zeller committed
35
			return nil, err
Mathias Weber's avatar
Mathias Weber committed
36
37
38
		}
		pools[i] = p
	}
Peter Zeller's avatar
Peter Zeller committed
39
	client = &Client{
Mathias Weber's avatar
Mathias Weber committed
40
41
42
43
44
		pools: pools,
	}
	return
}

Mathias Weber's avatar
Mathias Weber committed
45
// Call close after using the client to clean up the connections int he connection pool and release resources.
Mathias Weber's avatar
Mathias Weber committed
46
47
48
49
50
51
func (client *Client) Close() {
	for _, p := range client.pools {
		p.Close()
	}
}

Mathias Weber's avatar
Mathias Weber committed
52
func (client *Client) getConnection() (c *connection, err error) {
Mathias Weber's avatar
Mathias Weber committed
53
54
	// maybe make this global?
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
Peter Zeller's avatar
Peter Zeller committed
55
	for _, i := range r.Perm(len(client.pools)) {
Mathias Weber's avatar
Mathias Weber committed
56
57
58
		p := client.pools[i]
		con, err := p.Get()
		if err != nil {
Peter Zeller's avatar
Peter Zeller committed
59
			return nil, err
Mathias Weber's avatar
Mathias Weber committed
60
		}
Mathias Weber's avatar
Mathias Weber committed
61
		c = &connection{
Mathias Weber's avatar
Mathias Weber committed
62
63
64
			Conn: con,
			pool: p,
		}
Peter Zeller's avatar
Peter Zeller committed
65
		return c, nil
Mathias Weber's avatar
Mathias Weber committed
66
67
68
69
70
71
	}
	err = fmt.Errorf("All connections dead")
	return
}

// a close already puts the connection back into the right pool
Mathias Weber's avatar
Mathias Weber committed
72
type connection struct {
Mathias Weber's avatar
Mathias Weber committed
73
74
75
76
	net.Conn
	pool pool.Pool
}

Mathias Weber's avatar
Mathias Weber committed
77
78
79
// Starts an interactive transaction and registers it on the Antidote server.
// The connection used to issue reads and updates is sticky;
// interactive transactions are only valid local to the server they are started on.
Mathias Weber's avatar
Mathias Weber committed
80
81
82
83
84
func (client *Client) StartTransaction() (tx *InteractiveTransaction, err error) {
	con, err := client.getConnection()
	if err != nil {
		return
	}
Mathias Weber's avatar
Mathias Weber committed
85
86
	readwrite := uint32(0)
	blue := uint32(0)
Mathias Weber's avatar
Mathias Weber committed
87
	apbtxn := &ApbStartTransaction{
Mathias Weber's avatar
Mathias Weber committed
88
		Properties: &ApbTxnProperties{ReadWrite: &readwrite, RedBlue: &blue},
Mathias Weber's avatar
Mathias Weber committed
89
	}
Mathias Weber's avatar
Mathias Weber committed
90
91
92
93
94
95
96
97
98
99
100
	err = apbtxn.encode(con)
	if err != nil {
		return
	}

	apbtxnresp, err := decodeStartTransactionResp(con)
	if err != nil {
		return
	}
	txndesc := apbtxnresp.TransactionDescriptor
	tx = &InteractiveTransaction{
Mathias Weber's avatar
Mathias Weber committed
101
102
		con:  con,
		txID: txndesc,
Mathias Weber's avatar
Mathias Weber committed
103
104
	}
	return
Mathias Weber's avatar
Mathias Weber committed
105
}
Peter Zeller's avatar
Peter Zeller committed
106

Mathias Weber's avatar
Mathias Weber committed
107
// Creates a static transaction object. Does not communicate with the Antidote server.
Mathias Weber's avatar
Mathias Weber committed
108
109
func (client *Client) CreateStaticTransaction() *StaticTransaction {
	return &StaticTransaction{client: client}
110
}