Developing lightweight computation at the DSG edge

goclidote.go 9.45 KB
Newer Older
1
2
3
4
5
6
7
8
package goclidote

import (
	"fmt"
	"strconv"

	"uc-monitor-go-test/cnml"

9
	"github.com/AntidoteDB/antidote-go-client"
10
	"github.com/golang/glog"
11
12
13
)

// DatabaseHost defines the host of the AntidoteDB instance to connect to is running
14
// var DatabaseHost = "localhost"
15
16

// DatabasePort defines the port of the host where the AntidoteDB instance to connect to is running
17
// var DatabasePort = 8087
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Error levels

// FATAL output code
var FATAL = 1

// ERROR output code
var ERROR = 2

// WARNING output code
var WARNING = 3

// INFO output code
var INFO = 4

33
// AntidoteAddItemToSetInBucket save an item to a set in a bucket
34
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
35
36

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
37
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
38
39
40
	defer client.Close()

	tx, err := client.StartTransaction()
41
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
42
43
44
45
46

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
47
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
48
49

	setVal, err := bucket.ReadSet(tx, key)
50
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
51

52
	err = tx.Commit()
53
	errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
54

55
	// Return true if the value belongs now to the set (either it has been added now, or was there already)
56
57
58
59
60
61
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

62
	// Return false otherwise
63
64
65
	return false
}

66
67
68
69
70
// AntidoteAddItemToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
	return true
}

71
// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
72
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
73

74
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
75
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
76
	defer client.Close()
77

78
	tx, err := client.StartTransaction()
79
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client")
80

81
82
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
83

84
	setVal, err := bucket.ReadReg(tx, key)
85
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
86

87
	err = tx.Commit()
88
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
89
90

	return string(setVal[:])
91
92
93
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
94
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
95
96

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
97
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client")
98
	defer client.Close()
99

100
	tx, err := client.StartTransaction()
101
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client")
102

103
104
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
105

106
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
107
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
108

109
	setVal, err := bucket.ReadReg(tx, key)
110
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
111

112
	err = tx.Commit()
113
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
114

115
	// Return true if the register value is itemValue
116
	if string(setVal[:]) == itemValue {
117
		return true
118
	}
119

120
	// Return false otherwise
121
122
123
124
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
125
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) bool {
126
127

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
128
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
129
	defer client.Close()
130

131
	tx, err := client.StartTransaction()
132
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
133

134
135
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
136

137
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
138
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
139

140
	setVal, err := bucket.ReadReg(tx, key)
141
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
142

143
	err = tx.Commit()
144
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
145

146
	// Return true if the register value is null (0 length)
147
	if len(setVal) == 0 {
148
		return true
149
	}
150

151
	// Return false otherwise
152
153
154
155
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
156
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, DatabaseHost string, DatabasePort int) []string {
157
158
159

	var items []string

160
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
161
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client")
162
	defer client.Close()
163

164
	tx, err := client.StartTransaction()
165
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client")
166

167
168
169
170
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
171
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
172

173
	err = tx.Commit()
174
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
175

176
177
	for _, v := range setVal {
		items = append(items, string(v))
178
179
180
181
182
183
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
184
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
185
186

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
187
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
188
	defer client.Close()
189

190
	tx, err := client.StartTransaction()
191
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
192

193
194
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
195

196
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
197
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
198
199

	setVal, err := bucket.ReadSet(tx, key)
200
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
201

202
	err = tx.Commit()
203
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
204

205
206
	itemNotInSet := true

207
	// Return false if the item remains in the set
208
	for _, v := range setVal {
209
		if string(v[:]) == itemValue {
210
			itemNotInSet = false
211
212
213
		}
	}

214
	// Return true otherwise (even if it had never been there)
215
	return itemNotInSet
216
217
}

218
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
219
func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.DeviceIpv4sGraphserver {
220
221
	var devices []cnml.DeviceIpv4sGraphserver

222
	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices", DatabaseHost, DatabasePort) {
223
		thisDevID, err := strconv.Atoi(v)
224
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
225
226
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
227
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
228
229
230
		if err != nil {
			thisDev.GraphServer = 0
		}
231
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s", DatabaseHost, DatabasePort)
232
233
234
235
236
237
		devices = append(devices, thisDev)
	}

	return devices
}

238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// The errCheck function gracefully manages errors, warnings, etc.
func errCheck(e error, level int, message string) {

	if message == "" {
		message = "Unknown error"
	}
	if e != nil {
		switch level {
		case FATAL:
			glog.Fatalln(message, e)
		case ERROR:
			glog.Errorln(message, e)
		case WARNING:
			glog.Warningln(message, e)
		default:
			glog.Infoln(message, e)
		}
255
256
	}
}