Developing lightweight computation at the DSG edge

goclidote.go 10.9 KB
Newer Older
1
2
3
4
5
6
7
8
package goclidote

import (
	"fmt"
	"strconv"

	"uc-monitor-go-test/cnml"

9
	"github.com/AntidoteDB/antidote-go-client"
10
	"github.com/golang/glog"
11
12
13
)

// DatabaseHost defines the host where the AntidoteDB instance to connect to is running
// var DatabaseHost = "localhost"

// DatabasePort defines the port of the host where the AntidoteDB instance to connect to is running
// var DatabasePort = 8087
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Error levels understood by errCheck. They are declared as variables (not
// constants) so existing code that takes their address or reassigns them
// keeps compiling.
var (
	// FATAL output code
	FATAL = 1

	// ERROR output code
	ERROR = 2

	// WARNING output code
	WARNING = 3

	// INFO output code
	INFO = 4
)

33
// AntidoteAddItemToSetInBucket save an item to a set in a bucket
34
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
35
36

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
37
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
38
39
40
	defer client.Close()

	tx, err := client.StartTransaction()
41
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
42
43
44
45
46

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
47
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
48
49

	setVal, err := bucket.ReadSet(tx, key)
50
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
51

52
	err = tx.Commit()
53
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
54

55
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
56
57
58
59
60
61
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

62
	// Return false otherwise
63
64
65
	return false
}

66
67
// AntidoteAddItemToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
	defer client.Close()

	//TODO: remove this line
	// fmt.Printf("About to write data \"%s\" in set \"%s\" in map \"%s\" in bucket \"%s\"", itemValue, setName, mapName, bucketName)
	tx, err := client.StartTransaction()
	errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	mapKey := antidoteclient.Key(mapName)
	setKey := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
		antidoteclient.SetAdd(setKey, []byte(itemValue))))
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))

	mapVal, err := bucket.ReadMap(tx, mapKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	setVal, err := mapVal.Set(setKey)
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))

	// Return true if the value belongs now to the set (either it has been added now, or was there already)
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}


	// Return false otherwise
	return false
101
102
}

103
// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
104
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
105

106
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
107
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
108
	defer client.Close()
109

110
	tx, err := client.StartTransaction()
111
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client")
112

113
114
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
115

116
	setVal, err := bucket.ReadReg(tx, key)
117
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
118

119
	err = tx.Commit()
120
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
121
122

	return string(setVal[:])
123
124
125
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
126
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
127
128

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
129
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client")
130
	defer client.Close()
131

132
	tx, err := client.StartTransaction()
133
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client")
134

135
136
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
137

138
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
139
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
140

141
	setVal, err := bucket.ReadReg(tx, key)
142
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
143

144
	err = tx.Commit()
145
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
146

147
	// Return true if the register value is itemValue
148
	if string(setVal[:]) == itemValue {
149
		return true
150
	}
151

152
	// Return false otherwise
153
154
155
156
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
157
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) bool {
158
159

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
160
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
161
	defer client.Close()
162

163
	tx, err := client.StartTransaction()
164
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
165

166
167
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
168

169
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
170
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
171

172
	setVal, err := bucket.ReadReg(tx, key)
173
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
174

175
	err = tx.Commit()
176
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
177

178
	// Return true if the register value is null (0 length)
179
	if len(setVal) == 0 {
180
		return true
181
	}
182

183
	// Return false otherwise
184
185
186
187
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
188
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, DatabaseHost string, DatabasePort int) []string {
189
190
191

	var items []string

192
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
193
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client")
194
	defer client.Close()
195

196
	tx, err := client.StartTransaction()
197
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client")
198

199
200
201
202
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
203
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
204

205
	err = tx.Commit()
206
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
207

208
209
	for _, v := range setVal {
		items = append(items, string(v))
210
211
212
213
214
215
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
216
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
217
218

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
219
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
220
	defer client.Close()
221

222
	tx, err := client.StartTransaction()
223
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
224

225
226
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
227

228
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
229
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
230
231

	setVal, err := bucket.ReadSet(tx, key)
232
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
233

234
	err = tx.Commit()
235
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
236

237
238
	itemNotInSet := true

239
	// Return false if the item remains in the set
240
	for _, v := range setVal {
241
		if string(v[:]) == itemValue {
242
			itemNotInSet = false
243
244
245
		}
	}

246
	// Return true otherwise (even if it had never been there)
247
	return itemNotInSet
248
249
}

250
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
251
func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.DeviceIpv4sGraphserver {
252
253
	var devices []cnml.DeviceIpv4sGraphserver

254
	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices", DatabaseHost, DatabasePort) {
255
		thisDevID, err := strconv.Atoi(v)
256
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
257
258
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
259
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
260
261
262
		if err != nil {
			thisDev.GraphServer = 0
		}
263
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s", DatabaseHost, DatabasePort)
264
265
266
267
268
269
		devices = append(devices, thisDev)
	}

	return devices
}

270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
// errCheck logs message followed by e through glog when e is not nil, and
// does nothing otherwise. The glog severity is selected by level: FATAL uses
// glog.Fatalln, ERROR uses glog.Errorln, WARNING uses glog.Warningln and any
// other value (including INFO) uses glog.Infoln. An empty message is replaced
// by "Unknown error".
func errCheck(e error, level int, message string) {
	// Nothing to report.
	if e == nil {
		return
	}

	text := message
	if text == "" {
		text = "Unknown error"
	}

	switch level {
	case FATAL:
		glog.Fatalln(text, e)
	case ERROR:
		glog.Errorln(text, e)
	case WARNING:
		glog.Warningln(text, e)
	default:
		glog.Infoln(text, e)
	}
}