Developing lightweight computation at the DSG edge

goclidote.go 13.5 KB
Newer Older
1
2
3
4
5
package goclidote

import (
	"fmt"
	"strconv"
6
	"time"
7
8
9

	"uc-monitor-go-test/cnml"

10
	"github.com/AntidoteDB/antidote-go-client"
11
	"github.com/golang/glog"
12
13
14
)

// DatabaseHost defines the host where the AntidoteDB instance to connect to is running
// var DatabaseHost = "localhost"

// DatabasePort defines the port of the host where the AntidoteDB instance to connect to is running
// var DatabasePort = 8087
19

20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Error levels passed as the level argument of errCheck.
var (
	// FATAL output code
	FATAL = 1

	// ERROR output code
	ERROR = 2

	// WARNING output code
	WARNING = 3

	// INFO output code
	INFO = 4
)

34
// AntidoteAddItemToSetInBucket save an item to a set in a bucket
35
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
36
37

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
38
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
39
40
41
	defer client.Close()

	tx, err := client.StartTransaction()
42
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
43
44
45
46
47

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
48
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
49
50

	setVal, err := bucket.ReadSet(tx, key)
51
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
52

53
	err = tx.Commit()
54
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
55

56
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
57
58
59
60
61
62
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

63
	// Return false otherwise
64
65
66
	return false
}

67
68
// AntidoteAddItemToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
		antidoteclient.SetAdd(setKey, []byte(itemValue))))
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

90
91
92
	err = tx.Commit()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to commit transaction")

93
94
95
96
97
98
99
100
101
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

	// Return false otherwise
	return false
102
103
}

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// AntidoteAddArrayToSetInMapInBucket adds every string of itemArray to the set
// setName stored inside the map mapName in the bucket bucketName of the
// AntidoteDB instance reachable at DatabaseHost:DatabasePort. All updates are
// issued one by one inside a single transaction.
//
// It returns true when every element of itemArray is present in the set read
// back inside the same transaction (either added now, or there already) and
// the transaction committed; an empty itemArray therefore yields true. It
// returns false when the client or the transaction cannot be created, when the
// map or the set cannot be read while items were expected, when the commit
// fails, or when at least one item is missing. Every failure is logged through
// errCheck.
func AntidoteAddArrayToSetInMapInBucket(bucketName string, mapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	if err != nil {
		// errCheck only logs; stop here instead of using an unusable client.
		return false
	}
	defer client.Close()

	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to start a transaction using the go client")
	if err != nil {
		// No transaction was opened, so there is nothing to update or commit.
		return false
	}

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	for _, v := range itemArray {
		// A failed update is only logged; the final membership check decides
		// the result.
		err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
			antidoteclient.SetAdd(setKey, []byte(v))))
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", v, setName, bucketName))
		// NOTE(review): the 1 ms pause presumably throttles requests to the
		// database - confirm it is still needed.
		time.Sleep(time.Millisecond)
	}

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	// Only look the set up when the map itself was read; otherwise setVal
	// stays empty and every requested item counts as missing.
	var setVal [][]byte
	if err == nil {
		setVal, err = mapVal.Set(setKey)
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
	}

	err = tx.Commit()
	errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to commit transaction")
	if err != nil {
		// Do not report success for updates that were not committed.
		return false
	}

	// Index the stored values once so that each requested item needs a single
	// lookup instead of a scan over the whole set.
	present := make(map[string]struct{}, len(setVal))
	for _, w := range setVal {
		present[string(w)] = struct{}{}
	}

	// Return false as soon as one requested item is not in the set
	for _, v := range itemArray {
		if _, found := present[v]; !found {
			return false
		}
	}

	// All items were found
	return true
}

153
// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
154
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
155

156
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
157
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
158
	defer client.Close()
159

160
	tx, err := client.StartTransaction()
161
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, DatabaseHost, DatabasePort))
162

163
164
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
165

166
	setVal, err := bucket.ReadReg(tx, key)
167
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
168

169
	err = tx.Commit()
170
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
171
172

	return string(setVal[:])
173
174
175
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
176
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
177
178

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
179
180
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))

181
	defer client.Close()
182

183
	tx, err := client.StartTransaction()
184
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))
185

186
187
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
188

189
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
190
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
191

192
	setVal, err := bucket.ReadReg(tx, key)
193
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
194

195
	err = tx.Commit()
196
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
197

198
	// Return true if the register value is itemValue
199
	if string(setVal[:]) == itemValue {
200
		return true
201
	}
202

203
	// Return false otherwise
204
205
206
207
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
208
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) bool {
209
210

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
211
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
212
	defer client.Close()
213

214
	tx, err := client.StartTransaction()
215
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
216

217
218
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
219

220
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
221
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
222

223
	setVal, err := bucket.ReadReg(tx, key)
224
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
225

226
	err = tx.Commit()
227
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
228

229
	// Return true if the register value is null (0 length)
230
	if len(setVal) == 0 {
231
		return true
232
	}
233

234
	// Return false otherwise
235
236
237
238
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
239
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, DatabaseHost string, DatabasePort int) []string {
240
241
242

	var items []string

243
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
244
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client. bucketName %s, setName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
245
	defer client.Close()
246

247
	tx, err := client.StartTransaction()
248
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client. bucketName: %s, setName: %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
249

250
251
252
253
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
254
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
255

256
	err = tx.Commit()
257
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
258

259
260
	for _, v := range setVal {
		items = append(items, string(v))
261
262
263
264
265
266
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
267
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
268
269

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
270
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
271
	defer client.Close()
272

273
	tx, err := client.StartTransaction()
274
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
275

276
277
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
278

279
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
280
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
281
282

	setVal, err := bucket.ReadSet(tx, key)
283
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
284

285
	err = tx.Commit()
286
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
287

288
289
	itemNotInSet := true

290
	// Return false if the item remains in the set
291
	for _, v := range setVal {
292
		if string(v[:]) == itemValue {
293
			itemNotInSet = false
294
295
296
		}
	}

297
	// Return true otherwise (even if it had never been there)
298
	return itemNotInSet
299
300
}

301
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
302
func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.DeviceIpv4sGraphserver {
303
304
	var devices []cnml.DeviceIpv4sGraphserver

305
	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices", DatabaseHost, DatabasePort) {
306
		time.Sleep(time.Millisecond)
307
		thisDevID, err := strconv.Atoi(v)
308
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
309
		//fmt.Printf("%s, ",v)
310
311
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
312
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
313
314
315
		if err != nil {
			thisDev.GraphServer = 0
		}
316
		time.Sleep(time.Millisecond)
317
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s", DatabaseHost, DatabasePort)
318
319
320
321
322
323
		devices = append(devices, thisDev)
	}

	return devices
}

324
325
326
327
328
329
330
331
332
// errCheck reports a non-nil error e through glog and does nothing when e is
// nil. The message (or "Unknown error" when message is empty) is logged
// followed by e, at the severity selected by level: ERROR uses glog.Errorln,
// WARNING uses glog.Warningln, and every other level, including FATAL and
// INFO, uses glog.Infoln. It never stops the caller; execution always
// continues after the call.
func errCheck(e error, level int, message string) {
	// Nothing to report
	if e == nil {
		return
	}

	text := message
	if text == "" {
		text = "Unknown error"
	}

	switch level {
	case FATAL:
		// NOTE(review): FATAL is logged at info severity and does not
		// terminate the program - confirm this is intended.
		glog.Infoln(text, e)
	case ERROR:
		glog.Errorln(text, e)
	case WARNING:
		glog.Warningln(text, e)
	default:
		glog.Infoln(text, e)
	}
}