Developing lightweight computation at the DSG edge

goclidote.go 12.7 KB
Newer Older
1
2
3
4
5
6
7
8
package goclidote

import (
	"fmt"
	"strconv"

	"uc-monitor-go-test/cnml"

9
	"github.com/AntidoteDB/antidote-go-client"
10
	"github.com/golang/glog"
11
12
13
)

// DatabaseHost defines the host of the AntidoteDB instance to connect to is running
14
// var DatabaseHost = "localhost"
15
16

// DatabasePort defines the port of the host where the AntidoteDB instance to connect to is running
17
// var DatabasePort = 8087
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Error levels

// FATAL output code
var FATAL = 1

// ERROR output code
var ERROR = 2

// WARNING output code
var WARNING = 3

// INFO output code
var INFO = 4

33
// AntidoteAddItemToSetInBucket save an item to a set in a bucket
34
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
35
36

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
37
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
38
39
40
	defer client.Close()

	tx, err := client.StartTransaction()
41
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
42
43
44
45
46

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
47
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
48
49

	setVal, err := bucket.ReadSet(tx, key)
50
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
51

52
	err = tx.Commit()
53
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
54

55
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
56
57
58
59
60
61
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

62
	// Return false otherwise
63
64
65
	return false
}

66

67
68
// AntidoteAddItemToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
		antidoteclient.SetAdd(setKey, []byte(itemValue))))
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

90
91
92
	err = tx.Commit()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to commit transaction")

93
94
95
96
97
98
99
100
101
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

	// Return false otherwise
	return false
102
103
}

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// AntidoteAddArrayToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddArrayToSetInMapInBucket(bucketName string, mapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	for _, v := range itemArray {
		err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
			antidoteclient.SetAdd(setKey, []byte(v))))
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", v, setName, bucketName))
	}

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	err = tx.Commit()
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to commit transaction")

	// Return true if all values belong now to the set (either they have been added now, or were there already)
	anyMissing := false

	for _,v := range itemArray {
		thisFound := false
		for _, w := range setVal {
			if v == string(w) {
				thisFound = true
			}
			if !thisFound {
				anyMissing = true
				break
			}
		}
	}

	// Return if all itemswere found
	return !anyMissing
}

152
// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
153
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
154

155
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
156
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
157
	defer client.Close()
158

159
	tx, err := client.StartTransaction()
160
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client")
161

162
163
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
164

165
	setVal, err := bucket.ReadReg(tx, key)
166
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
167

168
	err = tx.Commit()
169
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
170
171

	return string(setVal[:])
172
173
174
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
175
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
176
177

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
178
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client")
179
	defer client.Close()
180

181
	tx, err := client.StartTransaction()
182
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client")
183

184
185
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
186

187
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
188
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
189

190
	setVal, err := bucket.ReadReg(tx, key)
191
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
192

193
	err = tx.Commit()
194
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
195

196
	// Return true if the register value is itemValue
197
	if string(setVal[:]) == itemValue {
198
		return true
199
	}
200

201
	// Return false otherwise
202
203
204
205
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
206
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) bool {
207
208

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
209
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
210
	defer client.Close()
211

212
	tx, err := client.StartTransaction()
213
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
214

215
216
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
217

218
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
219
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
220

221
	setVal, err := bucket.ReadReg(tx, key)
222
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
223

224
	err = tx.Commit()
225
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
226

227
	// Return true if the register value is null (0 length)
228
	if len(setVal) == 0 {
229
		return true
230
	}
231

232
	// Return false otherwise
233
234
235
236
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
237
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, DatabaseHost string, DatabasePort int) []string {
238
239
240

	var items []string

241
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
242
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client")
243
	defer client.Close()
244

245
	tx, err := client.StartTransaction()
246
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client")
247

248
249
250
251
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
252
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
253

254
	err = tx.Commit()
255
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
256

257
258
	for _, v := range setVal {
		items = append(items, string(v))
259
260
261
262
263
264
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
265
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
266
267

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
268
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
269
	defer client.Close()
270

271
	tx, err := client.StartTransaction()
272
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
273

274
275
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
276

277
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
278
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
279
280

	setVal, err := bucket.ReadSet(tx, key)
281
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
282

283
	err = tx.Commit()
284
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
285

286
287
	itemNotInSet := true

288
	// Return false if the item remains in the set
289
	for _, v := range setVal {
290
		if string(v[:]) == itemValue {
291
			itemNotInSet = false
292
293
294
		}
	}

295
	// Return true otherwise (even if it had never been there)
296
	return itemNotInSet
297
298
}

299
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
300
func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.DeviceIpv4sGraphserver {
301
302
	var devices []cnml.DeviceIpv4sGraphserver

303
	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices", DatabaseHost, DatabasePort) {
304
		thisDevID, err := strconv.Atoi(v)
305
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
306
307
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
308
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
309
310
311
		if err != nil {
			thisDev.GraphServer = 0
		}
312
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s", DatabaseHost, DatabasePort)
313
314
315
316
317
318
		devices = append(devices, thisDev)
	}

	return devices
}

319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
// The errCheck function gracefully manages errors, warnings, etc.
func errCheck(e error, level int, message string) {

	if message == "" {
		message = "Unknown error"
	}
	if e != nil {
		switch level {
		case FATAL:
			glog.Fatalln(message, e)
		case ERROR:
			glog.Errorln(message, e)
		case WARNING:
			glog.Warningln(message, e)
		default:
			glog.Infoln(message, e)
		}
336
337
	}
}