Developing lightweight computation at the DSG edge

goclidote.go 12.6 KB
Newer Older
1
2
3
4
5
6
7
8
package goclidote

import (
	"fmt"
	"strconv"

	"uc-monitor-go-test/cnml"

9
	"github.com/AntidoteDB/antidote-go-client"
10
	"github.com/golang/glog"
11
12
13
)

// DatabaseHost defines the host where the AntidoteDB instance to connect to is running
// var DatabaseHost = "localhost"
15
16

// DatabasePort defines the port of the host where the AntidoteDB instance to connect to is running
17
// var DatabasePort = 8087
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Error levels accepted as the level argument of errCheck. They are plain
// int variables, one per severity.
var (
	// FATAL output code
	FATAL = 1

	// ERROR output code
	ERROR = 2

	// WARNING output code
	WARNING = 3

	// INFO output code
	INFO = 4
)

33
// AntidoteAddItemToSetInBucket save an item to a set in a bucket
34
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
35
36

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
37
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
38
39
40
	defer client.Close()

	tx, err := client.StartTransaction()
41
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
42
43
44
45
46

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
47
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
48
49

	setVal, err := bucket.ReadSet(tx, key)
50
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
51

52
	err = tx.Commit()
53
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
54

55
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
56
57
58
59
60
61
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

62
	// Return false otherwise
63
64
65
	return false
}

66

67
68
// AntidoteAddItemToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
		antidoteclient.SetAdd(setKey, []byte(itemValue))))
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	// Return true if the value belongs now to the set (either it has been added now, or was there already)
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

	// Return false otherwise
	return false
99
100
}

101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// AntidoteAddArrayToSetInMapInBucket adds every element of itemArray to the
// set setName stored in the map mapName of the bucket bucketName, using a new
// client connected to DatabaseHost:DatabasePort. All additions happen in a
// single transaction.
//
// It returns true if, after the transaction has been committed, all the items
// belong to the set (either they have been added now, or were there already).
// It returns false if any item is missing or any step fails; every failure is
// logged through errCheck.
func AntidoteAddArrayToSetInMapInBucket(bucketName string, mapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	if err != nil {
		// errCheck only logs, so return here rather than using a nil client.
		errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
		return false
	}
	defer client.Close()

	tx, err := client.StartTransaction()
	if err != nil {
		errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to start a transaction using the go client")
		return false
	}

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	for _, v := range itemArray {
		err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
			antidoteclient.SetAdd(setKey, []byte(v))))
		if err != nil {
			errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", v, setName, bucketName))
			// Discard the partial batch instead of committing only some items.
			errCheck(tx.Abort(), WARNING, "[AntidoteAddArrayToSetInMapInBucket]: Unable to abort transaction")
			return false
		}
	}

	// Read the map back inside the same transaction to confirm the updates.
	mapVal, err := bucket.ReadMap(tx, mapKey)
	if err != nil {
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read map %s in bucket %s", mapName, bucketName))
		errCheck(tx.Abort(), WARNING, "[AntidoteAddArrayToSetInMapInBucket]: Unable to abort transaction")
		return false
	}

	setVal, err := mapVal.Set(setKey)
	if err != nil {
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
		errCheck(tx.Abort(), WARNING, "[AntidoteAddArrayToSetInMapInBucket]: Unable to abort transaction")
		return false
	}

	if err = tx.Commit(); err != nil {
		errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to commit transaction")
		return false
	}

	// Index the set contents so each requested item needs a single lookup.
	present := make(map[string]struct{}, len(setVal))
	for _, w := range setVal {
		present[string(w)] = struct{}{}
	}

	// Return false as soon as one requested item is not in the set. An item
	// is only missing after comparing it against the whole set, not just its
	// first element.
	for _, v := range itemArray {
		if _, ok := present[v]; !ok {
			return false
		}
	}

	// All items were found
	return true
}

149
// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
150
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
151

152
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
153
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
154
	defer client.Close()
155

156
	tx, err := client.StartTransaction()
157
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client")
158

159
160
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
161

162
	setVal, err := bucket.ReadReg(tx, key)
163
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
164

165
	err = tx.Commit()
166
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
167
168

	return string(setVal[:])
169
170
171
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
172
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
173
174

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
175
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client")
176
	defer client.Close()
177

178
	tx, err := client.StartTransaction()
179
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client")
180

181
182
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
183

184
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
185
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
186

187
	setVal, err := bucket.ReadReg(tx, key)
188
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
189

190
	err = tx.Commit()
191
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
192

193
	// Return true if the register value is itemValue
194
	if string(setVal[:]) == itemValue {
195
		return true
196
	}
197

198
	// Return false otherwise
199
200
201
202
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
203
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) bool {
204
205

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
206
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
207
	defer client.Close()
208

209
	tx, err := client.StartTransaction()
210
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
211

212
213
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
214

215
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
216
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
217

218
	setVal, err := bucket.ReadReg(tx, key)
219
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
220

221
	err = tx.Commit()
222
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
223

224
	// Return true if the register value is null (0 length)
225
	if len(setVal) == 0 {
226
		return true
227
	}
228

229
	// Return false otherwise
230
231
232
233
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
234
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, DatabaseHost string, DatabasePort int) []string {
235
236
237

	var items []string

238
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
239
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client")
240
	defer client.Close()
241

242
	tx, err := client.StartTransaction()
243
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client")
244

245
246
247
248
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
249
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
250

251
	err = tx.Commit()
252
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
253

254
255
	for _, v := range setVal {
		items = append(items, string(v))
256
257
258
259
260
261
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
262
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
263
264

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
265
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
266
	defer client.Close()
267

268
	tx, err := client.StartTransaction()
269
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
270

271
272
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
273

274
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
275
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
276
277

	setVal, err := bucket.ReadSet(tx, key)
278
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
279

280
	err = tx.Commit()
281
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
282

283
284
	itemNotInSet := true

285
	// Return false if the item remains in the set
286
	for _, v := range setVal {
287
		if string(v[:]) == itemValue {
288
			itemNotInSet = false
289
290
291
		}
	}

292
	// Return true otherwise (even if it had never been there)
293
	return itemNotInSet
294
295
}

296
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
297
func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.DeviceIpv4sGraphserver {
298
299
	var devices []cnml.DeviceIpv4sGraphserver

300
	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices", DatabaseHost, DatabasePort) {
301
		thisDevID, err := strconv.Atoi(v)
302
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
303
304
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
305
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
306
307
308
		if err != nil {
			thisDev.GraphServer = 0
		}
309
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s", DatabaseHost, DatabasePort)
310
311
312
313
314
315
		devices = append(devices, thisDev)
	}

	return devices
}

316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
// errCheck logs message followed by e through glog when e is not nil, at the
// severity selected by level: FATAL, ERROR or WARNING map to the matching glog
// call, and any other level is logged as info. An empty message is replaced
// by "Unknown error". It does nothing when e is nil.
func errCheck(e error, level int, message string) {
	// Nothing to report
	if e == nil {
		return
	}

	text := message
	if text == "" {
		text = "Unknown error"
	}

	switch {
	case level == FATAL:
		glog.Fatalln(text, e)
	case level == ERROR:
		glog.Errorln(text, e)
	case level == WARNING:
		glog.Warningln(text, e)
	default:
		glog.Infoln(text, e)
	}
}