Developing lightweight computation at the DSG edge

transactions.go 11.4 KB
Newer Older
Mathias Weber's avatar
Mathias Weber committed
1
2
3
4
5
6
7
package antidoteclient

import (
	"bytes"
	"fmt"
)

Mathias Weber's avatar
Mathias Weber committed
8
9
// Represents a bucket in the Antidote database.
// Offers a high-level interface to issue read and write operations on objects in the bucket.
Mathias Weber's avatar
Mathias Weber committed
10
11
12
13
type Bucket struct {
	// Bucket is the raw bucket identifier; the methods of this type pass it on
	// as the Bucket of every bound object they read or update.
	Bucket []byte
}

Mathias Weber's avatar
Mathias Weber committed
14
15
16
// A transaction object offers low-level mechanisms to send protocol-buffer messages to Antidote in the context of
// a highly-available transaction.
// Typical representatives are interactive transactions handled by Antidote and static transactions handled on the client side.
Mathias Weber's avatar
Mathias Weber committed
17
18
type Transaction interface {
	Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error)
Mathias Weber's avatar
Mathias Weber committed
19
	Update(updates ...*ApbUpdateOp) error
Mathias Weber's avatar
Mathias Weber committed
20
21
}

Mathias Weber's avatar
Mathias Weber committed
22
23
// Key is a named byte-slice type (a defined type, not a Go type alias).
// It is used to represent keys of objects in buckets and maps.
Mathias Weber's avatar
Mathias Weber committed
24
25
type Key []byte // raw key bytes; used as the Key of the bound objects and map keys built in this file

Mathias Weber's avatar
Mathias Weber committed
26
// A CRDTReader allows to read the value of objects identified by keys in the context of a transaction.
Mathias Weber's avatar
Mathias Weber committed
27
type CRDTReader interface {
Mathias Weber's avatar
Mathias Weber committed
28
	// Read the value of a add-wins set identified by the given key
Mathias Weber's avatar
Mathias Weber committed
29
	ReadSet(tx Transaction, key Key) (val [][]byte, err error)
Mathias Weber's avatar
Mathias Weber committed
30
	// Read the value of a last-writer-wins register identified by the given key
Mathias Weber's avatar
Mathias Weber committed
31
	ReadReg(tx Transaction, key Key) (val []byte, err error)
Mathias Weber's avatar
Mathias Weber committed
32
	// Read the value of a add-wins map identified by the given key
Mathias Weber's avatar
Mathias Weber committed
33
	ReadMap(tx Transaction, key Key) (val *MapReadResult, err error)
Mathias Weber's avatar
Mathias Weber committed
34
	// Read the value of a multi-value register identified by the given key
Mathias Weber's avatar
Mathias Weber committed
35
	ReadMVReg(tx Transaction, key Key) (val [][]byte, err error)
Mathias Weber's avatar
Mathias Weber committed
36
	// Read the value of a counter identified by the given key
Mathias Weber's avatar
Mathias Weber committed
37
	ReadCounter(tx Transaction, key Key) (val int32, err error)
Mathias Weber's avatar
Mathias Weber committed
38
39
}

Mathias Weber's avatar
Mathias Weber committed
40
41
42
43
// A transaction handled by Antidote on the server side.
// Interactive Transactions need to be started on the server and are kept open for their duration.
// Update operations are only visible to reads issued in the context of the same transaction or after committing the transaction.
// Always commit or abort interactive transactions to clean up the server side!
Mathias Weber's avatar
Mathias Weber committed
44
type InteractiveTransaction struct {
Mathias Weber's avatar
Mathias Weber committed
45
46
	txID      []byte
	con       *connection
Mathias Weber's avatar
Mathias Weber committed
47
	committed bool
Mathias Weber's avatar
Mathias Weber committed
48
49
}

Mathias Weber's avatar
Mathias Weber committed
50
func (tx *InteractiveTransaction) Update(updates ...*ApbUpdateOp) error {
Mathias Weber's avatar
Mathias Weber committed
51
52
53
54
	apbUpdate := &ApbUpdateObjects{
		Updates:               updates,
		TransactionDescriptor: tx.txID,
	}
Mathias Weber's avatar
Mathias Weber committed
55
	err := apbUpdate.encode(tx.con)
Mathias Weber's avatar
Mathias Weber committed
56
	if err != nil {
Mathias Weber's avatar
Mathias Weber committed
57
58
59
60
61
		return err
	}
	resp, err := decodeOperationResp(tx.con)
	if err != nil {
		return err
Mathias Weber's avatar
Mathias Weber committed
62
	}
Mathias Weber's avatar
Mathias Weber committed
63
64
65
66
	if !(*resp.Success) {
		return fmt.Errorf("operation not successful; error code %d", *resp.Errorcode)
	}
	return nil
Mathias Weber's avatar
Mathias Weber committed
67
68
69
70
71
72
73
74
75
76
77
78
79
80
}

// Read requests the given bound objects under this transaction's descriptor.
// The request is encoded onto the transaction's connection; on success the
// decoded read response (or the decode error) is returned as-is.
func (tx *InteractiveTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
	request := &ApbReadObjects{
		Boundobjects:          objects,
		TransactionDescriptor: tx.txID,
	}
	if err = request.encode(tx.con); err != nil {
		return nil, err
	}
	return decodeReadObjectsResp(tx.con)
}

Mathias Weber's avatar
Mathias Weber committed
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// commits the transaction, makes the updates issued under this transaction visible to subsequent transaction
// and cleans up the server side.
func (tx *InteractiveTransaction) Commit() error {
	if !tx.committed {
		msg := &ApbCommitTransaction{TransactionDescriptor: tx.txID}
		err := msg.encode(tx.con)
		if err != nil {
			return err
		}
		op, err := decodeCommitResp(tx.con)
		tx.con.Close()
		if err != nil {
			return err
		}
		if !(*op.Success) {
			return fmt.Errorf("operation not successful; error code %d", *op.Errorcode)
		}
Mathias Weber's avatar
Mathias Weber committed
98
	}
Mathias Weber's avatar
Mathias Weber committed
99
	return nil
Mathias Weber's avatar
Mathias Weber committed
100
101
}

Mathias Weber's avatar
Mathias Weber committed
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// aborts the transactions, discards updates issued under this transaction
// and cleans up the server side.
// WARNING: May not be supported by the current version of Antidote
func (tx *InteractiveTransaction) Abort() error {
	if !tx.committed {
		msg := &ApbAbortTransaction{TransactionDescriptor: tx.txID}
		err := msg.encode(tx.con)
		if err != nil {
			return err
		}
		op, err := decodeOperationResp(tx.con)
		tx.con.Close()
		if err != nil {
			return err
		}
		if !(*op.Success) {
			return fmt.Errorf("operation not successful; error code %d", *op.Errorcode)
		}
Mathias Weber's avatar
Mathias Weber committed
120
	}
Mathias Weber's avatar
Mathias Weber committed
121
	return nil
Mathias Weber's avatar
Mathias Weber committed
122
123
}

Mathias Weber's avatar
Mathias Weber committed
124
125
// Pseudo transaction to issue reads and updates without starting an interactive transaction.
// Can be interpreted as starting a transaction for each read or update and directly committing it.
Mathias Weber's avatar
Mathias Weber committed
126
127
128
129
type StaticTransaction struct {
	// client supplies a connection (via getConnection) for every Read and Update call.
	client *Client
}

Mathias Weber's avatar
Mathias Weber committed
130
func (tx *StaticTransaction) Update(updates ...*ApbUpdateOp) error {
Mathias Weber's avatar
Mathias Weber committed
131
132
	apbStaticUpdate := &ApbStaticUpdateObjects{
		Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Mathias Weber's avatar
Mathias Weber committed
133
		Updates:     updates,
Mathias Weber's avatar
Mathias Weber committed
134
135
136
	}
	con, err := tx.client.getConnection()
	if err != nil {
Mathias Weber's avatar
Mathias Weber committed
137
		return err
Mathias Weber's avatar
Mathias Weber committed
138
139
140
	}
	err = apbStaticUpdate.encode(con)
	if err != nil {
Mathias Weber's avatar
Mathias Weber committed
141
		return err
Mathias Weber's avatar
Mathias Weber committed
142
143
144
145
	}
	resp, err := decodeCommitResp(con)
	con.Close()
	if err != nil {
Mathias Weber's avatar
Mathias Weber committed
146
147
148
149
		return err
	}
	if !(*resp.Success) {
		return fmt.Errorf("operation not successful; error code %d", *resp.Errorcode)
Mathias Weber's avatar
Mathias Weber committed
150
	}
Mathias Weber's avatar
Mathias Weber committed
151
	return nil
Mathias Weber's avatar
Mathias Weber committed
152
153
154
155
156
}

func (tx *StaticTransaction) Read(objects ...*ApbBoundObject) (resp *ApbReadObjectsResp, err error) {
	apbRead := &ApbStaticReadObjects{
		Transaction: &ApbStartTransaction{Properties: &ApbTxnProperties{}},
Mathias Weber's avatar
Mathias Weber committed
157
		Objects:     objects,
Mathias Weber's avatar
Mathias Weber committed
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
	}
	con, err := tx.client.getConnection()
	if err != nil {
		return
	}
	err = apbRead.encode(con)
	if err != nil {
		return
	}
	sresp, err := decodeStaticReadObjectsResp(con)
	con.Close()
	if err != nil {
		return
	}
	return sresp.Objects, nil
}

Mathias Weber's avatar
Mathias Weber committed
175
func (bucket *Bucket) ReadSet(tx Transaction, key Key) (val [][]byte, err error) {
Mathias Weber's avatar
Mathias Weber committed
176
177
178
179
180
181
182
183
184
	crdtType := CRDTType_ORSET
	resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
	if err != nil {
		return
	}
	val = resp.Objects[0].Set.Value
	return
}

Mathias Weber's avatar
Mathias Weber committed
185
func (bucket *Bucket) ReadReg(tx Transaction, key Key) (val []byte, err error) {
Mathias Weber's avatar
Mathias Weber committed
186
187
188
189
190
191
192
193
194
	crdtType := CRDTType_LWWREG
	resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
	if err != nil {
		return
	}
	val = resp.Objects[0].Reg.Value
	return
}

Mathias Weber's avatar
Mathias Weber committed
195
func (bucket *Bucket) ReadMap(tx Transaction, key Key) (val *MapReadResult, err error) {
Mathias Weber's avatar
Mathias Weber committed
196
	crdtType := CRDTType_RRMAP
Mathias Weber's avatar
Mathias Weber committed
197
198
199
200
201
202
203
204
	resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
	if err != nil {
		return
	}
	val = &MapReadResult{mapResp: resp.Objects[0].Map}
	return
}

Mathias Weber's avatar
Mathias Weber committed
205
func (bucket *Bucket) ReadMVReg(tx Transaction, key Key) (val [][]byte, err error) {
Mathias Weber's avatar
Mathias Weber committed
206
207
208
209
210
211
212
213
214
	crdtType := CRDTType_MVREG
	resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
	if err != nil {
		return
	}
	val = resp.Objects[0].Mvreg.Values
	return
}

Mathias Weber's avatar
Mathias Weber committed
215
func (bucket *Bucket) ReadCounter(tx Transaction, key Key) (val int32, err error) {
Mathias Weber's avatar
Mathias Weber committed
216
217
218
219
220
221
222
223
224
	crdtType := CRDTType_COUNTER
	resp, err := tx.Read(&ApbBoundObject{Bucket: bucket.Bucket, Key: key, Type: &crdtType})
	if err != nil {
		return
	}
	val = *resp.Objects[0].Counter.Value
	return
}

Mathias Weber's avatar
Mathias Weber committed
225
226
227
228
229
230
231
// MapReadResult represents the result of reading from a map object.
// It grants access to the values of the nested CRDTs through typed accessor
// methods (Set, Reg, Map, MVReg, Counter) that look entries up by key.
type MapReadResult struct {
	// mapResp is the raw map response whose Entries the accessor methods scan.
	mapResp *ApbGetMapResp
}

// Access the value of the nested add-wins set under the given key
Mathias Weber's avatar
Mathias Weber committed
232
func (mrr *MapReadResult) Set(key Key) (val [][]byte, err error) {
Mathias Weber's avatar
Mathias Weber committed
233
234
235
236
237
238
239
240
	for _, me := range mrr.mapResp.Entries {
		if *me.Key.Type == CRDTType_ORSET && bytes.Equal(me.Key.Key, key) {
			return me.Value.Set.Value, nil
		}
	}
	return nil, fmt.Errorf("set entry with key '%s' not found", key)
}

Mathias Weber's avatar
Mathias Weber committed
241
// Access the value of the nested last-writer-wins register under the given key
Mathias Weber's avatar
Mathias Weber committed
242
func (mrr *MapReadResult) Reg(key Key) (val []byte, err error) {
Mathias Weber's avatar
Mathias Weber committed
243
244
245
246
247
248
249
250
	for _, me := range mrr.mapResp.Entries {
		if *me.Key.Type == CRDTType_LWWREG && bytes.Equal(me.Key.Key, key) {
			return me.Value.Reg.Value, nil
		}
	}
	return nil, fmt.Errorf("register entry with key '%s' not found", key)
}

Mathias Weber's avatar
Mathias Weber committed
251
// Access the value of the nested add-wins map under the given key
Mathias Weber's avatar
Mathias Weber committed
252
func (mrr *MapReadResult) Map(key Key) (val *MapReadResult, err error) {
Mathias Weber's avatar
Mathias Weber committed
253
	for _, me := range mrr.mapResp.Entries {
Mathias Weber's avatar
Mathias Weber committed
254
		if *me.Key.Type == CRDTType_RRMAP && bytes.Equal(me.Key.Key, key) {
Mathias Weber's avatar
Mathias Weber committed
255
256
257
258
259
260
			return &MapReadResult{mapResp: me.Value.Map}, nil
		}
	}
	return nil, fmt.Errorf("map entry with key '%s' not found", key)
}

Mathias Weber's avatar
Mathias Weber committed
261
// Access the value of the nested multi-value register under the given key
Mathias Weber's avatar
Mathias Weber committed
262
func (mrr *MapReadResult) MVReg(key Key) (val [][]byte, err error) {
Mathias Weber's avatar
Mathias Weber committed
263
264
265
266
267
268
269
270
	for _, me := range mrr.mapResp.Entries {
		if *me.Key.Type == CRDTType_MVREG && bytes.Equal(me.Key.Key, key) {
			return me.Value.Mvreg.Values, nil
		}
	}
	return nil, fmt.Errorf("map entry with key '%s' not found", key)
}

Mathias Weber's avatar
Mathias Weber committed
271
// Access the value of the nested counter under the given key
Mathias Weber's avatar
Mathias Weber committed
272
func (mrr *MapReadResult) Counter(key Key) (val int32, err error) {
Mathias Weber's avatar
Mathias Weber committed
273
274
275
276
277
278
279
280
	for _, me := range mrr.mapResp.Entries {
		if *me.Key.Type == CRDTType_COUNTER && bytes.Equal(me.Key.Key, key) {
			return *me.Value.Counter.Value, nil
		}
	}
	return 0, fmt.Errorf("counter entry with key '%s' not found", key)
}

Mathias Weber's avatar
Mathias Weber committed
281
282
// Represents updates that can be converted to top-level updates applicable to a bucket
// or nested updates applicable to a map
Mathias Weber's avatar
Mathias Weber committed
283
284
285
286
287
type UpdateConverter interface {
	// ConvertToToplevel returns the update as an operation on an object in the given bucket.
	ConvertToToplevel(bucket []byte) *ApbUpdateOp
	// ConvertToNested returns the update as a nested update for use inside a map update.
	ConvertToNested() *ApbMapNestedUpdate
}

Mathias Weber's avatar
Mathias Weber committed
288
289
// Represents updates of a specific key of a specific type.
// Can be applied to either buckets or maps
Mathias Weber's avatar
Mathias Weber committed
290
291
type CRDTUpdate struct {
	Update *ApbUpdateOperation
Mathias Weber's avatar
Mathias Weber committed
292
	Key    Key
Mathias Weber's avatar
Mathias Weber committed
293
294
295
	Type   CRDTType
}

Mathias Weber's avatar
Mathias Weber committed
296
// A CRDTUpdater allows to apply updates in the context of a transaction.
Mathias Weber's avatar
Mathias Weber committed
297
type CRDTUpdater interface {
Mathias Weber's avatar
Mathias Weber committed
298
	Update(tx Transaction, updates ...*CRDTUpdate) error
Mathias Weber's avatar
Mathias Weber committed
299
300
}

Mathias Weber's avatar
Mathias Weber committed
301
func (bucket *Bucket) Update(tx Transaction, updates ...*CRDTUpdate) error {
Mathias Weber's avatar
Mathias Weber committed
302
	updateOps := make([]*ApbUpdateOp, len(updates))
Mathias Weber's avatar
Mathias Weber committed
303
	for i, v := range updates {
Mathias Weber's avatar
Mathias Weber committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
		updateOps[i] = v.ConvertToToplevel(bucket.Bucket)
	}
	return tx.Update(updateOps...)
}

// ConvertToToplevel turns the update into an operation on the object identified
// by this update's key and type inside the given bucket.
// The bound object's Type points at the update's own Type field.
func (update *CRDTUpdate) ConvertToToplevel(bucket []byte) *ApbUpdateOp {
	target := &ApbBoundObject{Bucket: bucket, Key: update.Key, Type: &update.Type}
	return &ApbUpdateOp{Boundobject: target, Operation: update.Update}
}

// ConvertToNested turns the update into a nested map update addressed by this
// update's key and type.
// The map key's Type points at the update's own Type field.
func (update *CRDTUpdate) ConvertToNested() *ApbMapNestedUpdate {
	mapKey := &ApbMapKey{Key: update.Key, Type: &update.Type}
	return &ApbMapNestedUpdate{Key: mapKey, Update: update.Update}
}

// CRDT update operations

Mathias Weber's avatar
Mathias Weber committed
325
// Represents the update to add an element to an add-wins set
Mathias Weber's avatar
Mathias Weber committed
326
func SetAdd(key Key, elems ...[]byte) *CRDTUpdate {
Mathias Weber's avatar
Mathias Weber committed
327
328
329
330
331
332
333
334
335
336
	optype := ApbSetUpdate_ADD
	return &CRDTUpdate{
		Key:  key,
		Type: CRDTType_ORSET,
		Update: &ApbUpdateOperation{
			Setop: &ApbSetUpdate{Adds: elems, Optype: &optype},
		},
	}
}

Mathias Weber's avatar
Mathias Weber committed
337
// Represents the update to remove an element from an add-wins set
Mathias Weber's avatar
Mathias Weber committed
338
func SetRemove(key Key, elems ...[]byte) *CRDTUpdate {
Mathias Weber's avatar
Mathias Weber committed
339
340
341
342
343
344
345
346
347
348
	optype := ApbSetUpdate_REMOVE
	return &CRDTUpdate{
		Key:  key,
		Type: CRDTType_ORSET,
		Update: &ApbUpdateOperation{
			Setop: &ApbSetUpdate{Adds: elems, Optype: &optype},
		},
	}
}

Mathias Weber's avatar
Mathias Weber committed
349
// Represents the update to increment a counter
Mathias Weber's avatar
Mathias Weber committed
350
func CounterInc(key Key, inc int64) *CRDTUpdate {
Mathias Weber's avatar
Mathias Weber committed
351
352
353
354
355
356
357
358
359
	return &CRDTUpdate{
		Key:  key,
		Type: CRDTType_COUNTER,
		Update: &ApbUpdateOperation{
			Counterop: &ApbCounterUpdate{Inc: &inc},
		},
	}
}

Mathias Weber's avatar
Mathias Weber committed
360
// Represents the update to write a value into an last-writer-wins register
Mathias Weber's avatar
Mathias Weber committed
361
func RegPut(key Key, value []byte) *CRDTUpdate {
Mathias Weber's avatar
Mathias Weber committed
362
363
364
365
366
367
368
369
370
	return &CRDTUpdate{
		Key:  key,
		Type: CRDTType_LWWREG,
		Update: &ApbUpdateOperation{
			Regop: &ApbRegUpdate{Value: value},
		},
	}
}

Mathias Weber's avatar
Mathias Weber committed
371
// Represents the update to write a value into an multi-value register
Mathias Weber's avatar
Mathias Weber committed
372
func MVRegPut(key Key, value []byte) *CRDTUpdate {
Mathias Weber's avatar
Mathias Weber committed
373
374
375
376
377
378
379
380
381
	return &CRDTUpdate{
		Key:  key,
		Type: CRDTType_MVREG,
		Update: &ApbUpdateOperation{
			Regop: &ApbRegUpdate{Value: value},
		},
	}
}

Mathias Weber's avatar
Mathias Weber committed
382
// Represents the update to nested objects of an add-wins map
Mathias Weber's avatar
Mathias Weber committed
383
func MapUpdate(key Key, updates ...*CRDTUpdate) *CRDTUpdate {
Mathias Weber's avatar
Mathias Weber committed
384
	nupdates := make([]*ApbMapNestedUpdate, len(updates))
Mathias Weber's avatar
Mathias Weber committed
385
	for i, v := range updates {
Mathias Weber's avatar
Mathias Weber committed
386
387
388
389
		nupdates[i] = v.ConvertToNested()
	}
	return &CRDTUpdate{
		Key:  key,
Mathias Weber's avatar
Mathias Weber committed
390
		Type: CRDTType_RRMAP,
Mathias Weber's avatar
Mathias Weber committed
391
392
393
394
395
		Update: &ApbUpdateOperation{
			Mapop: &ApbMapUpdate{Updates: nupdates},
		},
	}
}