Developing lightweight computation at the DSG edge

goclidote.go 15.9 KB
Newer Older
1
2
3
4
5
package goclidote

import (
	"fmt"
	"strconv"
6
	"time"
7
8
9

	"uc-monitor-go-test/cnml"

10
	"github.com/AntidoteDB/antidote-go-client"
11
	"github.com/golang/glog"
12
13
14
)

// DatabaseHost defines the host where the AntidoteDB instance to connect to is running
15
// var DatabaseHost = "localhost"
16
17

// DatabasePort defines the port of the host where the AntidoteDB instance to connect to is running
18
// var DatabasePort = 8087
19

20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Severity levels accepted by errCheck. They are plain package-level
// variables; the numeric values are the ones listed below.
var (
	// FATAL is the level code for fatal conditions.
	FATAL = 1

	// ERROR is the level code for errors.
	ERROR = 2

	// WARNING is the level code for warnings.
	WARNING = 3

	// INFO is the level code for informational messages.
	INFO = 4
)

34
// AntidoteAddItemToSetInBucket save an item to a set in a bucket
35
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
36
37

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
38
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
39
40
41
	defer client.Close()

	tx, err := client.StartTransaction()
42
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
43
44
45
46
47

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
48
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
49
50

	setVal, err := bucket.ReadSet(tx, key)
51
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
52

53
	err = tx.Commit()
54
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
55

56
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
57
58
59
60
61
62
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

63
	// Return false otherwise
64
65
66
	return false
}

67
68
// AntidoteAddItemToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
		antidoteclient.SetAdd(setKey, []byte(itemValue))))
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

90
91
92
	err = tx.Commit()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to commit transaction")

93
94
95
96
97
98
99
100
101
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

	// Return false otherwise
	return false
102
103
}

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// AntidoteAddArrayToSetInMapInBucket adds every element of itemArray to the
// set setName stored inside the map mapName in the bucket bucketName, using a
// new client connected to DatabaseHost:DatabasePort.
//
// All updates and the read-back of the map are issued in the same
// transaction. The function returns true when the transaction committed and
// every element of itemArray is present in the set read back (whether added
// now or already there). An empty itemArray returns true without contacting
// the database. Every failure is logged through errCheck and makes the
// function return false.
func AntidoteAddArrayToSetInMapInBucket(bucketName string, mapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
	// Nothing to add: all (zero) items are trivially present.
	if len(itemArray) == 0 {
		return true
	}

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	if err != nil {
		// errCheck only logs; stop here rather than use a client that could not be created.
		errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
		return false
	}
	defer client.Close()

	tx, err := client.StartTransaction()
	if err != nil {
		errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to start a transaction using the go client")
		return false
	}

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	for _, v := range itemArray {
		err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
			antidoteclient.SetAdd(setKey, []byte(v))))
		if err != nil {
			errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", v, setName, bucketName))
			return false
		}
		// NOTE(review): one-millisecond pause between updates kept from the
		// original code; its purpose is not evident from this file.
		time.Sleep(time.Millisecond)
	}

	mapVal, err := bucket.ReadMap(tx, mapKey)
	if err != nil {
		// This step reads the map, so the message names the map rather than the set.
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read map %s in bucket %s", mapName, bucketName))
		return false
	}

	setVal, err := mapVal.Set(setKey)
	if err != nil {
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
		return false
	}

	// Updates that failed to commit must not be reported as a success.
	if err = tx.Commit(); err != nil {
		errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to commit transaction")
		return false
	}

	// Index the set once so each requested item is checked against the whole
	// set, not just against its first element.
	present := make(map[string]struct{}, len(setVal))
	for _, w := range setVal {
		present[string(w)] = struct{}{}
	}

	for _, v := range itemArray {
		if _, ok := present[v]; !ok {
			return false
		}
	}

	// All items were found.
	return true
}

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// AntidoteAddArrayToSetInMapInMapInBucket adds every element of itemArray to
// the set setName stored inside the map innerMapName, itself stored inside
// the map outerMapName, in the bucket bucketName. It uses a new client
// connected to DatabaseHost:DatabasePort.
//
// All updates and the read-back of the outer map are issued in the same
// transaction. The function returns true when the transaction committed and
// every element of itemArray is present in the set read back (whether added
// now or already there). An empty itemArray returns true without contacting
// the database. Every failure is logged through errCheck and makes the
// function return false.
func AntidoteAddArrayToSetInMapInMapInBucket(bucketName string, outerMapName string, innerMapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
	funcName := "AntidoteAddArrayToSetInMapInMapInBucket"

	// Nothing to add: all (zero) items are trivially present.
	if len(itemArray) == 0 {
		return true
	}

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	if err != nil {
		// errCheck only logs; stop here rather than use a client that could not be created.
		errCheck(err, ERROR, "["+funcName+"]: Unable to create a new AntidoteDB go client")
		return false
	}
	defer client.Close()

	tx, err := client.StartTransaction()
	if err != nil {
		errCheck(err, ERROR, "["+funcName+"]: Unable to start a transaction using the go client")
		return false
	}

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	outerMapKey := antidoteclient.Key(outerMapName)
	innerMapKey := antidoteclient.Key(innerMapName)
	setKey := antidoteclient.Key(setName)

	for _, v := range itemArray {
		err = bucket.Update(tx, antidoteclient.MapUpdate(outerMapKey,
			antidoteclient.MapUpdate(innerMapKey,
				antidoteclient.SetAdd(setKey, []byte(v)))))
		if err != nil {
			errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to add value %s to set %s in map %s in map %s in bucket %s", v, setName, innerMapName, outerMapName, bucketName))
			return false
		}
		// NOTE(review): one-millisecond pause between updates kept from the
		// original code; its purpose is not evident from this file.
		time.Sleep(time.Millisecond)
	}

	outerMapVal, err := bucket.ReadMap(tx, outerMapKey)
	if err != nil {
		errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in bucket %s", outerMapName, bucketName))
		return false
	}

	innerMapVal, err := outerMapVal.Map(innerMapKey)
	if err != nil {
		errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in  map %s in bucket %s", innerMapName, outerMapName, bucketName))
		return false
	}

	setVal, err := innerMapVal.Set(setKey)
	if err != nil {
		errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read set %s in bucket %s", setName, bucketName))
		return false
	}

	// Updates that failed to commit must not be reported as a success.
	if err = tx.Commit(); err != nil {
		errCheck(err, ERROR, "["+funcName+"]: Unable to commit transaction")
		return false
	}

	// Index the set once so each requested item is checked against the whole
	// set, not just against its first element.
	present := make(map[string]struct{}, len(setVal))
	for _, w := range setVal {
		present[string(w)] = struct{}{}
	}

	for _, v := range itemArray {
		if _, ok := present[v]; !ok {
			return false
		}
	}

	// All items were found.
	return true
}

209
// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
210
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
211

212
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
213
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
214
	defer client.Close()
215

216
	tx, err := client.StartTransaction()
217
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, DatabaseHost, DatabasePort))
218

219
220
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
221

222
	setVal, err := bucket.ReadReg(tx, key)
223
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
224

225
	err = tx.Commit()
226
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
227
228

	return string(setVal[:])
229
230
231
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
232
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
233
234

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
235
236
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))

237
	defer client.Close()
238

239
	tx, err := client.StartTransaction()
240
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))
241

242
243
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
244

245
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
246
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
247

248
	setVal, err := bucket.ReadReg(tx, key)
249
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
250

251
	err = tx.Commit()
252
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
253

254
	// Return true if the register value is itemValue
255
	if string(setVal[:]) == itemValue {
256
		return true
257
	}
258

259
	// Return false otherwise
260
261
262
263
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
264
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) bool {
265
266

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
267
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
268
	defer client.Close()
269

270
	tx, err := client.StartTransaction()
271
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
272

273
274
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
275

276
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
277
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
278

279
	setVal, err := bucket.ReadReg(tx, key)
280
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
281

282
	err = tx.Commit()
283
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
284

285
	// Return true if the register value is null (0 length)
286
	if len(setVal) == 0 {
287
		return true
288
	}
289

290
	// Return false otherwise
291
292
293
294
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
295
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, DatabaseHost string, DatabasePort int) []string {
296
297
298

	var items []string

299
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
300
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client. bucketName %s, setName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
301
	defer client.Close()
302

303
	tx, err := client.StartTransaction()
304
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client. bucketName: %s, setName: %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
305

306
307
308
309
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
310
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
311

312
	err = tx.Commit()
313
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
314

315
316
	for _, v := range setVal {
		items = append(items, string(v))
317
318
319
320
321
322
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
323
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
324
325

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
326
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
327
	defer client.Close()
328

329
	tx, err := client.StartTransaction()
330
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
331

332
333
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
334

335
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
336
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
337
338

	setVal, err := bucket.ReadSet(tx, key)
339
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
340

341
	err = tx.Commit()
342
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
343

344
345
	itemNotInSet := true

346
	// Return false if the item remains in the set
347
	for _, v := range setVal {
348
		if string(v[:]) == itemValue {
349
			itemNotInSet = false
350
351
352
		}
	}

353
	// Return true otherwise (even if it had never been there)
354
	return itemNotInSet
355
356
}

357
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
358
func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.DeviceIpv4sGraphserver {
359
360
	var devices []cnml.DeviceIpv4sGraphserver

361
	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices", DatabaseHost, DatabasePort) {
362
		time.Sleep(time.Millisecond)
363
		thisDevID, err := strconv.Atoi(v)
364
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
365
		//fmt.Printf("%s, ",v)
366
367
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
368
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
369
370
371
		if err != nil {
			thisDev.GraphServer = 0
		}
372
		time.Sleep(time.Millisecond)
373
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s", DatabaseHost, DatabasePort)
374
375
		time.Sleep(time.Millisecond)
		thisDev.SNMPNames = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "snmpnames", DatabaseHost, DatabasePort)
376
377
378
379
380
381
		devices = append(devices, thisDev)
	}

	return devices
}

382
383
384
385
386
387
388
389
390
// errCheck logs message together with e through glog when e is not nil, and
// does nothing when e is nil. An empty message is replaced by
// "Unknown error".
//
// level selects the glog call: ERROR uses Errorln, WARNING uses Warningln,
// and every other value, including FATAL, uses Infoln.
//
// NOTE(review): FATAL is logged at info level and does not stop the program;
// callers must handle the failure themselves.
func errCheck(e error, level int, message string) {
	if e == nil {
		return
	}

	text := message
	if text == "" {
		text = "Unknown error"
	}

	switch {
	case level == FATAL:
		glog.Infoln(text, e)
	case level == ERROR:
		glog.Errorln(text, e)
	case level == WARNING:
		glog.Warningln(text, e)
	default:
		glog.Infoln(text, e)
	}
}