Developing lightweight computation at the DSG edge

goclidote.go 8.85 KB
Newer Older
1
2
3
4
5
6
7
8
package goclidote

import (
	"fmt"
	"strconv"

	"uc-monitor-go-test/cnml"

9
	"github.com/AntidoteDB/antidote-go-client"
10
	"github.com/golang/glog"
11
12
13
14
15
16
17
18
)

// Connection settings used whenever this package creates an AntidoteDB client.
var (
	// DatabaseHost is the name of the host where the AntidoteDB instance to
	// connect to is running.
	DatabaseHost = "localhost"

	// DatabasePort is the port, on DatabaseHost, of the AntidoteDB instance to
	// connect to.
	DatabasePort = 8087
)

19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Error levels accepted by errCheck to select the logging severity.
var (
	// FATAL output code
	FATAL = 1

	// ERROR output code
	ERROR = 2

	// WARNING output code
	WARNING = 3

	// INFO output code
	INFO = 4
)

33
34
35
36
// AntidoteAddItemToSetInBucket adds itemValue to the set setName in the bucket
// bucketName and reports whether the item belongs to the set afterwards.
//
// A new AntidoteDB client is created for the call (using DatabaseHost and
// DatabasePort) and closed before returning. The update and a read-back of the
// set are performed inside a single transaction.
//
// It returns true if the transaction was committed and the read-back set
// contains itemValue (either it has been added now, or it was there already).
// Every failure is logged through errCheck at ERROR level and makes the
// function return false.
func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue string) bool {
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
	if err != nil {
		// errCheck only logs at this level, so stop here: the client must not
		// be used when NewClient reported an error.
		errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
		return false
	}
	defer client.Close()

	tx, err := client.StartTransaction()
	if err != nil {
		errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
		return false
	}

	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	if err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue))); err != nil {
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
		// Do not leave the transaction open; errCheck ignores a nil error.
		errCheck(tx.Abort(), WARNING, "[AntidoteAddItemToSetInBucket]: Unable to abort transaction")
		return false
	}

	setVal, err := bucket.ReadSet(tx, key)
	if err != nil {
		errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
		errCheck(tx.Abort(), WARNING, "[AntidoteAddItemToSetInBucket]: Unable to abort transaction")
		return false
	}

	if err = tx.Commit(); err != nil {
		// Without a successful commit the update cannot be reported as done.
		errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
		return false
	}

	// Return true if the value belongs now to the set (either it has been added now, or was there already)
	for _, v := range setVal {
		if string(v) == itemValue {
			return true
		}
	}

	// Return false otherwise
	return false
}

// AntidoteReadRegisterInBucket read a LWWW register  in a bucket
67
func AntidoteReadRegisterInBucket(bucketName string, registerName string) string {
68

69
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
70
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
71
	defer client.Close()
72

73
	tx, err := client.StartTransaction()
74
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client")
75

76
77
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
78

79
	setVal, err := bucket.ReadReg(tx, key)
80
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
81

82
	err = tx.Commit()
83
	errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
84
85

	return string(setVal[:])
86
87
88
}

// AntidoteSetRegisterInBucket set a LWWW register in a bucket
89
90
91
func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemValue string) bool {

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
92
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client")
93
	defer client.Close()
94

95
	tx, err := client.StartTransaction()
96
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client")
97

98
99
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
100

101
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
102
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
103

104
	setVal, err := bucket.ReadReg(tx, key)
105
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
106

107
	err = tx.Commit()
108
	errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
109

110
	// Return true if the register value is itemValue
111
	if string(setVal[:]) == itemValue {
112
		return true
113
	}
114

115
	// Return false otherwise
116
117
118
119
	return false
}

// AntidoteRemoveRegisterInBucket remove (set null) a LWWW register in a bucket
120
121
122
func AntidoteRemoveRegisterInBucket(bucketName string, registerName string) bool {

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
123
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
124
	defer client.Close()
125

126
	tx, err := client.StartTransaction()
127
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
128

129
130
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(registerName)
131

132
	err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
133
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
134

135
	setVal, err := bucket.ReadReg(tx, key)
136
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
137

138
	err = tx.Commit()
139
	errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
140

141
	// Return true if the register value is null (0 length)
142
	if len(setVal) == 0 {
143
		return true
144
	}
145

146
	// Return false otherwise
147
148
149
150
	return false
}

// AntidoteReadItemsFromSetInBucket read items from a set in a bucket
151
func AntidoteReadItemsFromSetInBucket(bucketName string, setName string) []string {
152
153
154

	var items []string

155
	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
156
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client")
157
	defer client.Close()
158

159
	tx, err := client.StartTransaction()
160
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client")
161

162
163
164
165
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)

	setVal, err := bucket.ReadSet(tx, key)
166
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
167

168
	err = tx.Commit()
169
	errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
170

171
172
	for _, v := range setVal {
		items = append(items, string(v))
173
174
175
176
177
178
	}

	return items
}

// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
179
180
181
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string) bool {

	client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
182
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
183
	defer client.Close()
184

185
	tx, err := client.StartTransaction()
186
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
187

188
189
	bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
	key := antidoteclient.Key(setName)
190

191
	err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
192
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
193
194

	setVal, err := bucket.ReadSet(tx, key)
195
	errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
196

197
	err = tx.Commit()
198
	errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
199

200
201
	itemNotInSet := true

202
	// Return false if the item remains in the set
203
	for _, v := range setVal {
204
		if string(v[:]) == itemValue {
205
			itemNotInSet = false
206
207
208
		}
	}

209
	// Return true otherwise (even if it had never been there)
210
	return itemNotInSet
211
212
}

213
// ReadDevicesFromAntidote read all the devices from Antidote using the Go client
214
215
216
217
218
func ReadDevicesFromAntidote() []cnml.DeviceIpv4sGraphserver {
	var devices []cnml.DeviceIpv4sGraphserver

	for _, v := range AntidoteReadItemsFromSetInBucket("guifi", "devices") {
		thisDevID, err := strconv.Atoi(v)
219
		errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
220
221
222
		var thisDev cnml.DeviceIpv4sGraphserver
		thisDev.ID = thisDevID
		thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver"))
223
224
225
		if err != nil {
			thisDev.GraphServer = 0
		}
226
227
228
229
230
231
232
		thisDev.Addresses = AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", thisDevID), "ipv4s")
		devices = append(devices, thisDev)
	}

	return devices
}

233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// errCheck logs message followed by e through glog, at the severity selected
// by level: FATAL uses glog.Fatalln, ERROR uses glog.Errorln, WARNING uses
// glog.Warningln and any other value (INFO included) uses glog.Infoln.
//
// Nothing is logged when e is nil. An empty message is replaced by
// "Unknown error".
func errCheck(e error, level int, message string) {
	// No error, nothing to report.
	if e == nil {
		return
	}

	text := message
	if text == "" {
		text = "Unknown error"
	}

	switch {
	case level == FATAL:
		glog.Fatalln(text, e)
	case level == ERROR:
		glog.Errorln(text, e)
	case level == WARNING:
		glog.Warningln(text, e)
	default:
		glog.Infoln(text, e)
	}
}