Developing lightweight computation at the DSG edge

goclidote.go 15.9 KB
Newer Older
1
2
3
4
5
package goclidote

import (
	"fmt"
	"strconv"
6
	"time"
7
8
9

	"uc-monitor-go-test/cnml"

10
	"github.com/AntidoteDB/antidote-go-client"
11
	"github.com/golang/glog"
12
13
14
)

// DatabaseHost defines the host of the AntidoteDB instance to connect to is running
15
// var DatabaseHost = "localhost"
16
17

// DatabasePort defines the port of the host where the AntidoteDB instance to connect to is running
18
// var DatabasePort = 8087
19

20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Error levels

// FATAL output code
var FATAL = 1

// ERROR output code
var ERROR = 2

// WARNING output code
var WARNING = 3

// INFO output code
var INFO = 4

34
// AntidoteAddItemToSetInBucket save an item to a set in a bucket
35
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
36
37

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
38
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
39
40
41
	defer client.Close()

	tx, err := client.StartTransaction()
42
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
43
44
45
46
47

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
48
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
49
50

	setVal, err := bucket.ReadSet(tx, key)
51
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
52

53
	err = tx.Commit()
54
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
55

56
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
57
58
59
60
61
62
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

63
	// Return false otherwise
64
65
66
	return false
}

67
68
// AntidoteAddItemToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
		antidoteclient.SetAdd(setKey, []byte(itemValue))))
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

90
91
92
	err = tx.Commit()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to commit transaction")

93
94
95
96
97
98
99
100
101
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

	// Return false otherwise
	return false
102
103
}

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// AntidoteAddArrayToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddArrayToSetInMapInBucket(bucketName string, mapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	for _, v := range itemArray {
		err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
			antidoteclient.SetAdd(setKey, []byte(v))))
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", v, setName, bucketName))
121
		time.Sleep(time.Millisecond)
122
123
124
125
126
127
128
129
130
131
132
133
134
135
	}

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	err = tx.Commit()
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to commit transaction")

	// Return true if all values belong now to the set (either they have been added now, or were there already)
	anyMissing := false

136
	for _, v := range itemArray {
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
		thisFound := false
		for _, w := range setVal {
			if v == string(w) {
				thisFound = true
			}
			if !thisFound {
				anyMissing = true
				break
			}
		}
	}

	// Return if all itemswere found
	return !anyMissing
}

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// AntidoteAddArrayToSetInMapInMapInBucket save an item to a set in a map in a map in a bucket
func AntidoteAddArrayToSetInMapInMapInBucket(bucketName string, outerMapName string, innerMapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
	funcName := "AntidoteAddArrayToSetInMapInMapInBucket"

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "["+funcName+"]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "["+funcName+"]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	outerMapKey := antidoteclient.Key(outerMapName)
	innerMapKey := antidoteclient.Key(innerMapName)
	setKey := antidoteclient.Key(setName)

	for _, v := range itemArray {
		err = bucket.Update(tx, antidoteclient.MapUpdate(outerMapKey,
			antidoteclient.MapUpdate(innerMapKey,
				antidoteclient.SetAdd(setKey, []byte(v)))))
		errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to add value %s to set %s in map %s in map %s in bucket %s", v, setName, innerMapName, outerMapName, bucketName))
		time.Sleep(time.Millisecond)
	}

	outerMapVal, err := bucket.ReadMap(tx, outerMapKey)
	errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in bucket %s", outerMapName, bucketName))

	innerMapVal, err := outerMapVal.Map(innerMapKey)
181
	errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in map %s in bucket %s", innerMapName, outerMapName, bucketName))
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208

	setVal, err := innerMapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read set %s in bucket %s", setName, bucketName))

	err = tx.Commit()
	errCheck(err, ERROR, "["+funcName+"]: Unable to commit transaction")

	// Return true if all values belong now to the set (either they have been added now, or were there already)
	anyMissing := false

	for _, v := range itemArray {
		thisFound := false
		for _, w := range setVal {
			if v == string(w) {
				thisFound = true
			}
			if !thisFound {
				anyMissing = true
				break
			}
		}
	}

	// Return if all itemswere found
	return !anyMissing
}

209
// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
210
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
211

212
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
213
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
214
	defer client.Close()
215

216
	tx, err := client.StartTransaction()
217
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, DatabaseHost, DatabasePort))
218

219
220
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
221

222
	setVal, err := bucket.ReadReg(tx, key)
223
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
224

225
	err = tx.Commit()
226
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
227
228

	return string(setVal[:])
229
230
231
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
232
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
233
234

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
235
236
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))

237
	defer client.Close()
238

239
	tx, err := client.StartTransaction()
240
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))
241

242
243
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
244

245
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
246
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
247

248
	setVal, err := bucket.ReadReg(tx, key)
249
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
250

251
	err = tx.Commit()
252
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
253

254
	// Return true if the register value is itemValue
255
	if string(setVal[:]) == itemValue {
256
		return true
257
	}
258

259
	// Return false otherwise
260
261
262
263
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
264
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) bool {
265
266

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
267
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
268
	defer client.Close()
269

270
	tx, err := client.StartTransaction()
271
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
272

273
274
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
275

276
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
277
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
278

279
	setVal, err := bucket.ReadReg(tx, key)
280
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
281

282
	err = tx.Commit()
283
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
284

285
	// Return true if the register value is null (0 length)
286
	if len(setVal) == 0 {
287
		return true
288
	}
289

290
	// Return false otherwise
291
292
293
294
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
295
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, DatabaseHost string, DatabasePort int) []string {
296
297
298

	var items []string

299
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
300
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client. bucketName %s, setName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
301
	defer client.Close()
302

303
	tx, err := client.StartTransaction()
304
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client. bucketName: %s, setName: %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
305

306
307
308
309
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
310
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
311

312
	err = tx.Commit()
313
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
314

315
316
	for _, v := range setVal {
		items = append(items, string(v))
317
318
319
320
321
322
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
323
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
324
325

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
326
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
327
	defer client.Close()
328

329
	tx, err := client.StartTransaction()
330
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
331

332
333
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
334

335
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
336
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
337
338

	setVal, err := bucket.ReadSet(tx, key)
339
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
340

341
	err = tx.Commit()
342
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
343

344
345
	itemNotInSet := true

346
	// Return false if the item remains in the set
347
	for _, v := range setVal {
348
		if string(v[:]) == itemValue {
349
			itemNotInSet = false
350
351
352
		}
	}

353
	// Return true otherwise (even if it had never been there)
354
	return itemNotInSet
355
356
}

357
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
358
func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.DeviceIpv4sGraphserver {
359
360
	var devices []cnml.DeviceIpv4sGraphserver

361
	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices", DatabaseHost, DatabasePort) {
362
		time.Sleep(time.Millisecond)
363
		thisDevID, err := strconv.Atoi(v)
364
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
365
		//fmt.Printf("%s, ",v)
366
367
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
368
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
369
370
371
		if err != nil {
			thisDev.GraphServer = 0
		}
372
		time.Sleep(time.Millisecond)
373
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s", DatabaseHost, DatabasePort)
374
375
		time.Sleep(time.Millisecond)
		thisDev.SNMPNames = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "snmpnames", DatabaseHost, DatabasePort)
376
377
378
379
380
381
		devices = append(devices, thisDev)
	}

	return devices
}

382
383
384
385
386
387
388
389
390
// The errCheck function gracefully manages errors, warnings, etc.
func errCheck(e error, level int, message string) {

	if message == "" {
		message = "Unknown error"
	}
	if e != nil {
		switch level {
		case FATAL:
391
			glog.Infoln(message, e)
392
393
394
395
396
397
398
		case ERROR:
			glog.Errorln(message, e)
		case WARNING:
			glog.Warningln(message, e)
		default:
			glog.Infoln(message, e)
		}
399
400
	}
}