Developing lightweight computation at the DSG edge

monitor-assign.go 27.6 KB
Newer Older
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
1
2
3
4
5
6
7
8
package main

import (
	"flag"
	"fmt"
	"math"
	"math/rand"
	"os"
9
	"os/signal"
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
10
11
12
	"strconv"
	"time"

13
	"uc-monitor-go-test/cnml"
14
	"uc-monitor-go-test/goclidote"
15
16

	"github.com/golang/glog"
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
17
18
)

19
20
21
22
23
var dbHost = "localhost"
var dbHostHelp = "The hostname or IP address where AntidoteDB is running"
var dbPort = 8087
var dbPortHelp = "The TCP port on which AntidoteDB is listening"

24
// The Timestamps struct contains a monitor's timestamp and a local timestamp
25
26
27
28
29
30
31
32
33
34
35
type Timestamps struct {
	monitor int64
	local   int64
}

// The MonitorTimestamp struct contains a monitor's id and timestamps
type MonitorTimestamp struct {
	id         string
	timestamps Timestamps
}

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
36
37
38
39
40
41
42
43
44
// Error levels

// FATAL output code
var FATAL = 1

// ERROR output code
var ERROR = 2

// WARNING output code
45
46
47
48
var WARNING = 3

// INFO output code
var INFO = 4
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
49
50
51
52
53

// DEFAULTFILEMODE for created files
var DEFAULTFILEMODE os.FileMode = 0755

//Default settings and descriptions
54

55
// monitorTimeout defines how long to wait before removing a crashed monitor from the database
56
var monitorTimeout int64 = 450
57
var monitorTimeoutHelp = "Time before deregistering crashed monitors or in network partition"
58

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
59
60
61
62
63
64
// idNum holds the monitor id, in numberic form
var idNum int

// ID represents the ID of the monitor, in alfanumeric form
var ID = "0"
var idHelp = "Numeric ID of this monitoring instance"
65

66
67
68
// IDts holds a timestamp
var IDts = "0_ts"

69
70
// minMons defines the minimum number of monitors a deviced must be assigned to
// so that it is considered to have enough redundancy
71
var minMons = 3
72
73
var minMonsHelp = "Minimum number of monitors to assign to each device"

74
75
76
77
78
// maxMons defines the maximum number of monitors a device must be assigned to,
// so that it is not monitored by more monitors than needed
var maxMons = minMons + 1
var maxMonsHelp = "Maximum number of monitors to assign to each device"

79
// maxDevs defines the default maximum number of nodes a monitor is assigned to
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
80
81
82
var maxDevs = 1000
var maxDevsHelp = "Maximum number of devices to monitor"

83
var devsCheckInterval = 150
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
84
var devsCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the devices list"
85

86
var fillingTime = 150
87
88
var fillingTimeHelp = "Interval (in seconds) for filling the local assignation list"

89
var reassignInterval = 150
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
90
var reassignIntervalHelp = "Interval (in seconds) in which to update the local devices assignation list"
91

92
var globalAssignCheckInterval = 150
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
93
94
var globalAssignCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the global devices assignation list"

95
96
97
var policy = 1
var policyHelp = "Network devices to monitoring servers assignation policy"

98
// Global data structures
99
100
101
var cnmlDevices []cnml.DeviceIpv4sGraphserver    //The global list with all the fetched devices and their IPv4 addresses
var cnmlDevicesChecksum string                   //The checksum of the last global devices list and IPv4 addresses fetched
var localAssign []cnml.DeviceMonitorAssignation  //The list of locally assigned devices and their IPv4 addresses
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
102
var globalAssign []cnml.DeviceMonitorAssignation //The global assignation list of devices<=>monitors
103
var globalTimestamps []MonitorTimestamp
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
104
105
106
107

func main() {
	//Initialize the service (read the configuration flags, etc.)
	initialize()
108

109
	// Handle interrupt signals in order to terminate gracefully
110
111
112
113
114
	signalChan := make(chan os.Signal, 1)
	cleanupDone := make(chan bool)
	signal.Notify(signalChan, os.Interrupt)
	go func() {
		for _ = range signalChan {
115
			fmt.Println("\nReceived an interrupt, deregistering from the monitors list and quitting...")
116
117

			deregisterMonitorInAntidote(ID)
118
119
120
121
			cleanupDone <- true
		}
	}()
	<-cleanupDone
122
	fmt.Println("\nBye!")
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
123
124
}

125
// The initialize() function manages the execution parameters, sets timers, etc.
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
126
127
func initialize() {

128
129
	fmt.Println("Initializing...")

130
131
132
	// Manage command line flags and execution settings
	initializeFlags()

133
	// Register as a monitor to Antidote's list of active Monitors
134
135
	registerMonitorInAntidote()

136
137
138
	// Publish the maxDev value for this monitor
	updateMonitorMaxDevsInAntidote()

139
	// Initialize periodic timers
140
141
	initializeTimers()

142
	// The application is now initialized; entering the loop
143
	fmt.Println("Initialization done. Entering infinite loop...")
144
145
146
}

func initializeTimers() {
147
148
	// Initialize cnmlDevices checksum changes check timer
	startcnmlDevicesChecksumTimer()
149

150
	// Initialize cnmlDevices checksum changes check timer
151
	startAssignTimer()
152
153
154
155
156
}

// The initializeFlags function initializes the command line flags
func initializeFlags() {

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
157
	//Define and parse command line flags
158
159
	dbHostPtr := flag.String("db_host", dbHost, dbHostHelp)
	dbPortPtr := flag.Int("db_port", dbPort, dbPortHelp)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
160
161
	idInt, _ := strconv.ParseInt(ID, 10, 32)
	idPtr := flag.Int("id", int(idInt), idHelp)
162
	minMonsPtr := flag.Int("minMons", minMons, minMonsHelp)
163
	maxMonsPtr := flag.Int("maxMons", maxMons, maxMonsHelp)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
164
165
	maxDevsPtr := flag.Int("maxDevs", maxDevs, maxDevsHelp)
	devsCheckIntervalPtr := flag.Int("devsCheckInterval", devsCheckInterval, devsCheckIntervalHelp)
166
	fillingTimePtr := flag.Int("fillingTime", fillingTime, fillingTimeHelp)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
167
168
	reassignIntervalPtr := flag.Int("reassignInterval", reassignInterval, reassignIntervalHelp)
	globalAssignCheckIntervalPtr := flag.Int("globalAssignCheckInterval", globalAssignCheckInterval, globalAssignCheckIntervalHelp)
169
	monitorTimeoutPtr := flag.Int("monitorTimeout", int(monitorTimeout), monitorTimeoutHelp)
170
	policyPtr := flag.Int("policy", int(policy), policyHelp)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
171
172
173
174
175
176
177

	flag.Parse()

	//Service id
	if *idPtr > 0 {
		idNum = *idPtr
		ID = fmt.Sprint(*idPtr)
178
		IDts = fmt.Sprintf("%s_ts", ID)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
179
180
181
182
183
		fmt.Println("Using ID", ID)
	} else {
		randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
		ID = fmt.Sprint("a", 100000+randsource.Intn(900000))
		fmt.Println("Using anonymous id", ID)
184
		IDts = fmt.Sprintf("%s_ts", ID)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
185
186
	}

187
188
189
	dbHost = *dbHostPtr
	dbPort = *dbPortPtr

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
190
191
192
193
194
	//Maximum devices to monitor
	if *maxDevsPtr > 0 {
		maxDevs = *maxDevsPtr
	}

195
	// Minimum and maximum monitors per device
196
197
198
	if *minMonsPtr > 0 {
		minMons = *minMonsPtr
	}
199
200
201
	if *maxMonsPtr >= minMons {
		maxMons = *maxMonsPtr
	}
202

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
203
204
205
206
	//Timers for tickers (intervals)
	if *devsCheckIntervalPtr > 0 {
		devsCheckInterval = *devsCheckIntervalPtr
	}
207
208
209
	if *fillingTimePtr > 0 {
		fillingTime = *fillingTimePtr
	}
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
210
211
212
213
214
215
	if *reassignIntervalPtr > 0 {
		reassignInterval = *reassignIntervalPtr
	}
	if *globalAssignCheckIntervalPtr > 0 {
		globalAssignCheckInterval = *globalAssignCheckIntervalPtr
	}
216
217
218
219
220
221
	if *monitorTimeoutPtr > 0 {
		monitorTimeout = int64(*monitorTimeoutPtr)
	}
	if monitorTimeout <= int64(globalAssignCheckInterval) || monitorTimeout <= int64(reassignInterval) {
		monitorTimeout = int64(math.Max(float64(globalAssignCheckInterval), float64(reassignInterval))) + 1
	}
222
223

	//Policy
224
	if *policyPtr >= 1 && *policyPtr <= 2 {
225
226
		policy = *policyPtr
	}
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
227
228
}

229
230
// Get the global devices list from AntidoteDB
func getCnmlDevicesList() []cnml.DeviceIpv4sGraphserver {
231
	return goclidote.ReadDevicesFromAntidote(dbHost, dbPort)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
232
233
}

234
// Update globalAssign from the current shared database status
235
func updateGlobalAssignationList() {
236
	fmt.Println("Updating globalAssign...")
237

238
	var newDevices []cnml.DeviceMonitorAssignation
239
240
	var currDevices []cnml.DeviceMonitorAssignation
	var oldDevices []cnml.DeviceMonitorAssignation
241

242
	// Iterate through all the devices known to the monitoring system
243
	for _, v := range cnmlDevices {
244
		if isDeviceInAssignationSlice(v.ID, globalAssign) {
245
			// We already know this device; update its monitors assignation from AntidoteDB
246
			globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID = goclidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", dbHost, dbPort)
247
		} else {
248
			// A new device from the cnmlDevices list; add it to a new devices list to append it below
249
			fmt.Printf("Adding device %d from cnmlDevices into globalAssign\n", v.ID)
250
			var newDev cnml.DeviceMonitorAssignation
251
252
			newDev.Device.ID = v.ID
			newDev.Device.Addresses = v.Addresses
253
			newDev.MonitorID = goclidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", dbHost, dbPort)
254
			newDevices = append(newDevices, newDev)
255
256
257
		}
	}

258
259
260
261
262
263
264
265
266
267
268
269
	// The globalAssign list may contain old devices, not in the CNML anymore; discard them
	for _, v := range globalAssign {
		if isDeviceInSliceByID(v.Device.ID, cnmlDevices) {
			currDevices = append(currDevices, v)
		} else {
			oldDevices = append(oldDevices, v)
		}
	}

	// Leave in globalAssign only current devices
	globalAssign = currDevices

270
271
272
273
274
	// Append the new devices to the globalAssign list
	for _, v := range newDevices {
		globalAssign = append(globalAssign, v)
	}

275
276
277
278
279
280
281
282
283
284
285
	fmt.Println("globalAssign updated!")
}

func updateLocalAssignationList() {

	var currlocalAssign []cnml.DeviceMonitorAssignation
	var oldlocalAssign []cnml.DeviceMonitorAssignation

	// The localAssign list may contain old devices, not in the CNML anymore;
	// discard them.
	for _, v := range localAssign {
286
		if isDeviceInSliceByID(v.Device.ID, cnmlDevices) {
287
288
289
			currlocalAssign = append(currlocalAssign, v)
		} else {
			oldlocalAssign = append(oldlocalAssign, v)
290
291
		}
	}
292

293
294
295
	// TODO: also look for devices in localAssign NOT assigned to this monitor, which should never have happened

	localAssign = currlocalAssign
296
297
}

298
func mergeGlobalAssignIntoLocalAssign() {
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
299

300
	var newlocalAssign []cnml.DeviceMonitorAssignation
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
301

302
303
304
	// Search for those devices in the globalAssign list that have this monitor
	// assigned. Check if they are already in the localAssign list, or add them
	// otherwise
305
306
307
	for _, v := range globalAssign {
		for _, w := range v.MonitorID {
			if w == ID {
308
309
310
311
312
313
314
315
316
317
				var inLocal = false
				for _, x := range localAssign {
					if v.Device.ID == x.Device.ID {
						inLocal = true
						break
					}
				}
				if !inLocal {
					newlocalAssign = append(newlocalAssign, v)
				}
318
			}
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
319
320
		}
	}
321

322
323
324
	for _, v := range newlocalAssign {
		localAssign = append(localAssign, v)
	}
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
325
326
}

327
func manageMonitorsList() {
328
	fmt.Println("")
329
330
331
	fmt.Println("Managing the monitors list...")

	// Fetch the current monitors list
332
	currentMonitors := goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort)
333
334
335
336
337
338
339
340
341
342
	fmt.Println(len(currentMonitors), "monitors registered in the database:")
	var monitors = ""
	for _, v := range currentMonitors {
		if monitors == "" {
			monitors = v
		} else {
			monitors = fmt.Sprintf("%s, %s", monitors, v)
		}
	}
	fmt.Println(monitors)
343
344
345
346
347
348
349

	// Create an array and store the timestamps fetched from the database
	var currentTimestamps []MonitorTimestamp

	for _, v := range currentMonitors {
		var thisMonitor MonitorTimestamp

350
		mts, err := strconv.ParseInt(goclidote.AntidoteReadRegisterInBucket("guifi", fmt.Sprintf("%s_ts", v), dbHost, dbPort), 10, 64)
351
352
353
354
355
356
357
358
359
360
361
362
		errCheck(err, WARNING, fmt.Sprintf("Unable to read the timestamp from Antidote for monitor %s", v))

		thisMonitor.id = v
		thisMonitor.timestamps.monitor = mts
		thisMonitor.timestamps.local = time.Now().Unix()

		currentTimestamps = append(currentTimestamps, thisMonitor)
	}

	var nextGlobalTimestamps []MonitorTimestamp

	// Iterate the fetched timestamps list and process it
363
	for _, v := range currentTimestamps {
364
365
		var knownMonitor = false

366
367
368
369
		// Iterate the local timestamps list
		for _, w := range globalTimestamps {

			// Look for matching IDs in both lists
370
371
372
			if v.id == w.id {
				knownMonitor = true

373
374
				// Check if the timestamps are the same (i.e., the monitor has not
				// reported itself to the database recently)
375
376
				if v.timestamps.monitor == w.timestamps.monitor {
					// The monitor is not updating its info to the database, check local timestamps
377
					if v.timestamps.local-w.timestamps.local > monitorTimeout {
378
						// Deregister the monitor
379
380
						fmt.Println("Deregistering crashed monitor", v.id)
						deregisterMonitorInAntidote(v.id)
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
					} else {
						nextGlobalTimestamps = append(nextGlobalTimestamps, w)
					}
					// The monitor is updating its info to the database, keep the latest info
				} else {
					nextGlobalTimestamps = append(nextGlobalTimestamps, v)
				}
			}
		}

		// This monitor is new, save it for the next iteration
		if knownMonitor == false {
			nextGlobalTimestamps = append(nextGlobalTimestamps, v)
		}
	}

	globalTimestamps = nextGlobalTimestamps
}

400
401
402
403
404
405
406
// The sanitizeAssignationsInDatabase function gets the current list of monitors
// from the database, the current nodes<->monitors assignations, and sanitizes
// it (unassigns crashed monitors, etc.)
func sanitizeAssignationsInDatabase() {
	fmt.Println("")
	fmt.Println("Sanitizing the assignation list...")

407
408
409
	// TODO: This list of monitors should be the same as the one just asked for
	// in the manageMonitors function. Try to minimize calls by doing it only once

410
	// Update the current monitors list
411
	fmt.Println("Getting the current monitors list...")
412
413
414
	currentMonitors := getCurrentMonitorsList()

	// Update the current devices list
415
	fmt.Println("Updating the current cnml...")
416
417
418
419
	cnmlDevices = getCnmlDevicesList()

	for _, v := range globalAssign {
		// fmt.Println("############")
420
		// fmt.Println(k, ": device ID:", v.Device.ID)
421
422
423
424
425
426
		// fmt.Println("Monitors:", v.MonitorID)
		for _, w := range v.MonitorID {
			var monitorFound = false
			for _, x := range currentMonitors {
				if w == x {
					monitorFound = true
427
					fmt.Println(fmt.Sprintf("Monitor %s found, keeping it for device %d", w, v.Device.ID))
428
429
430
				}
			}
			if monitorFound == false {
431
432
				fmt.Println(fmt.Sprintf("Monitor %s not found, unassigning it from device %d", w, v.Device.ID))
				unassignMonitorFromDevice(w, v.Device.ID)
433
434
435
			}
		}
	}
436
	fmt.Println("Ended assignation list sanitization...")
437
438
439
}

func getCurrentMonitorsList() []string {
440
	return goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort)
441
442
}

443
444
445
446
func assignMonitorFromDevice(monitorID string, deviceID int) bool {
	return goclidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", deviceID), "monitors", monitorID, dbHost, dbPort)
}

447
func unassignMonitorFromDevice(monitorID string, deviceID int) bool {
448
	return goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", deviceID), "monitors", monitorID, dbHost, dbPort)
449
450
}

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
451
func reassignDevs() {
452
453
	fmt.Println("")
	fmt.Println("Reassignation of devices")
454
	fmt.Println(len(localAssign), "devices currently assigned to this monitor (maximum:", maxDevs, "devices)")
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
455

456
457
	// Track changes in the assignation list
	assignChange := false
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
458

459
460
461
462
463
464
465
466
467
468
469
470
471
472
	switch policy {
	case 1:
		// Policy #1: The monitoring server self-assigns those network devices that
		// have less monitoring devices assigned than the minimum number specified
		// by the minMons parameter. Devices are picked randomly until the server
		// reaches its maximum number of monitored devices, specified by the maxDevs
		// parameter.

		// Check if more nodes can be assigned, and if there are any devices not
		// being monitored by this server that could be assigned
		if (len(localAssign) < maxDevs) && (len(localAssign) < len(cnmlDevices)) {

			// Calculate how many new devices can be assigned
			toassign := int(math.Min(float64(int(math.Min(math.Min(float64(len(cnmlDevices)), float64(maxDevs))*float64(devsCheckInterval)/float64(fillingTime)+1, float64(len(cnmlDevices)-len(localAssign))))), float64(maxDevs-len(localAssign))))
473
			fmt.Println("Assigning up to", toassign, "new devices")
474
475
476
477

			// Create a list with all the unassigned devices
			var unassignedDevices []cnml.DeviceIpv4sGraphserver
			for _, v := range cnmlDevices {
478
				if !isDeviceInAssignationSlice(v.ID, localAssign) && (!isDeviceInAssignationSlice(v.ID, globalAssign) || len(globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID) < minMons) {
479
480
					unassignedDevices = append(unassignedDevices, v)
				}
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
481
			}
482
483
			fmt.Println(len(unassignedDevices), "devices unassigned")

484
			fmt.Println("Picking", toassign, "devices randomly")
485
486
487
488
489
490
491
492
493
494
495
496
497
498

			// Select some devices to self-assign, up to the previously calculated
			// value in the "toassign" variable
			for i := 0; i < toassign; i++ {
				// Check if unassigned devices remain
				if len(unassignedDevices) > 0 {
					// Pick a random device from the unassigned list
					randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
					j := randsource.Intn(len(unassignedDevices))
					// Save it to the new assigned devices list
					var newAssignDev cnml.DeviceMonitorAssignation
					newAssignDev.Device.ID = unassignedDevices[j].ID
					newAssignDev.Device.Addresses = unassignedDevices[j].Addresses
					newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID)
499
					localAssign = append(localAssign, newAssignDev)
500
					assignChange = true
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
					// Remove it from the unassigned devices list
					unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices)
				} else {
					fmt.Println(toassign-i, "more devices were to be assigned, but no unassigned devices remain")
					break
				}
			}
			fmt.Println(len(localAssign), "devices currently assigned to this monitor")
		} else {
			fmt.Println("Not assigning any new device.")
		}

	case 2:
		// Policy #2: The monitoring server self-assigns those network devices that
		// have the least monitoring devices already assigned. Devices are picked
		// randomly, from those with less monitors, until the server reaches its
		// maximum number of monitored devices specified by the maxDevs parameter.

		// Check if more nodes can be assigned, and if there are any devices not
		// being monitored by this server that could be assigned
		if (len(localAssign) < maxDevs) && (len(localAssign) < len(cnmlDevices)) {

			// Calculate how many new devices can be assigned
			toassign := int(math.Min(float64(int(math.Min(math.Min(float64(len(cnmlDevices)), float64(maxDevs))*float64(devsCheckInterval)/float64(fillingTime)+1, float64(len(cnmlDevices)-len(localAssign))))), float64(maxDevs-len(localAssign))))
525
			fmt.Println("Assigning up to", toassign, "new devices")
526
527
528
529
530

			// Calculate which is the number of least monitors a device has
			leastMons := 0

			var unassignedDevices []cnml.DeviceIpv4sGraphserver
531
532
533
534
535
536

			for leastMons < maxMons && len(unassignedDevices) == 0 {
				for _, v := range cnmlDevices {
					if !isDeviceInAssignationSlice(v.ID, globalAssign) || (!isDeviceInAssignationSlice(v.ID, localAssign) && len(globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID) < leastMons) {
						unassignedDevices = append(unassignedDevices, v)
					}
537
				}
538
				leastMons = leastMons + 1
539
540
541
			}
			fmt.Println(len(unassignedDevices), "devices unassigned")

542
			fmt.Println("Picking up to", toassign, "devices randomly")
543
544
545
546
547
548
549
550
551
552
553
554
555
556

			// Select some devices to self-assign, up to the previously calculated
			// value in the "toassign" variable
			for i := 0; i < toassign; i++ {
				// Check if unassigned devices remain
				if len(unassignedDevices) > 0 {
					// Pick a random device from the unassigned list
					randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
					j := randsource.Intn(len(unassignedDevices))
					// Save it to the new assigned devices list
					var newAssignDev cnml.DeviceMonitorAssignation
					newAssignDev.Device.ID = unassignedDevices[j].ID
					newAssignDev.Device.Addresses = unassignedDevices[j].Addresses
					newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID)
557
					localAssign = append(localAssign, newAssignDev)
558
					assignChange = true
559
560
561
562
563
564
					// Remove it from the unassigned devices list
					unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices)
				} else {
					fmt.Println(toassign-i, "more devices were to be assigned, but no unassigned devices remain")
					break
				}
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
565
			}
566
567
568
			fmt.Println(len(localAssign), "devices currently assigned to this monitor")
		} else {
			fmt.Println("Not assigning any new device.")
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
569
		}
570
		fmt.Println("Global average load:", calculateGlobalAverageLoad())
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
571
572
	}

573
	// Update the new assigned devices to the global shared list
574
	if assignChange {
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
575
		fmt.Println("Exporting the new assigned devices list")
576
		for _, v := range localAssign {
577
			goclidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort)
578
		}
579
		// localLoad := math.min(1, float64(len(localAssign))/float64(maxDevs))
580
581
582
583
584
585
	}
}

// // Check for devices assigned locally that are no longer in the global devices list
// func checkOutdatedLocalDevices() {
//
586
// 	assignChange := false                       // Track changes in the assignation list
587
588
// 	var deldevs []cnml.DeviceMonitorAssignation // Storage for the deleted devices
// 	newlocalAssign := localAssign               // Duplicate the local list of assigned devices
589
590
//
// 	for _, v := range localAssign {
591
592
// 		if !isDeviceInSliceByID(v.Device.ID, cnmlDevices) {
// 			fmt.Println("Removing old device ", v.Device.ID, " from the local assigned devices list")
593
// 			deldevs = append(deldevs, v)
594
// 			newlocalAssign = rmDeviceFromAssignationList(v.Device.ID, newlocalAssign)
595
// 			assignChange = true
596
597
598
599
// 		}
// 	}
//
// 	// Save the new local assigned devices, if changes have been made
600
// 	if assignChange {
601
602
// 		localAssign = newlocalAssign
// 		for _, v := range deldevs {
603
// 			goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort)
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
// 		}
// 	}
// }

func addDevicesAssignedByGraphserver() {
	for _, v := range cnmlDevices {
		if isDeviceInAssignationSlice(v.ID, localAssign) {
			graphserver := false
			deviceMonitors := globalAssign[posDeviceInAssignationSlice(v.ID, localAssign)].MonitorID

			for _, w := range deviceMonitors {
				if w == ID {
					graphserver = true
					break
				}
			}

			if !graphserver {
				globalAssign[posDeviceInAssignationSlice(v.ID, localAssign)].MonitorID = append(deviceMonitors, ID)
			}
624
		}
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
625
626
627
	}
}

628
func posDeviceInSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) int {
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
629
630
631
632
633
634
635
636
	for k, v := range slice {
		if device.ID == v.ID {
			return k
		}
	}
	return -1
}

637
638
639
640
641
642
643
644
645
func posDeviceInSliceByID(deviceID int, slice []cnml.DeviceIpv4sGraphserver) int {
	for k, v := range slice {
		if deviceID == v.ID {
			return k
		}
	}
	return -1
}

646
func isDeviceInSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) bool {
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
647
648
649
650
651
652
653
	if posDeviceInSlice(device, slice) >= 0 {
		return true
	}

	return false
}

654
655
656
657
658
659
660
661
func isDeviceInSliceByID(deviceID int, slice []cnml.DeviceIpv4sGraphserver) bool {
	if posDeviceInSliceByID(deviceID, slice) >= 0 {
		return true
	}

	return false
}

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
662
663
func posDeviceInAssignationSlice(device int, slice []cnml.DeviceMonitorAssignation) int {
	for k, v := range slice {
664
		if device == v.Device.ID {
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
			return k
		}
	}
	return -1
}

func isDeviceInAssignationSlice(device int, slice []cnml.DeviceMonitorAssignation) bool {
	if (posDeviceInAssignationSlice(device, slice)) >= 0 {
		return true
	}
	return false
}

func posMonitorInSlice(id string, slice []cnml.AssignChecksum) int {
	for k, v := range slice {
		if ID == v.MonitorID {
			return k
		}
	}
	return -1
}

func isMonitorInSlice(id string, slice []cnml.AssignChecksum) bool {
	if posMonitorInSlice(id, slice) >= 0 {
		return true
	}
	return false
}

694
func rmDeviceFromSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) []cnml.DeviceIpv4sGraphserver {
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
695
696
697
698
699
700
701
702
703

	if isDeviceInSlice(device, slice) {
		pos := posDeviceInSlice(device, slice)

		return append(slice[:pos], slice[pos+1:]...)
	}
	return slice
}

704
705
706
707
708
709
710
711
712
713
func rmDeviceFromAssignationList(DeviceID int, slice []cnml.DeviceMonitorAssignation) []cnml.DeviceMonitorAssignation {

	if isDeviceInAssignationSlice(DeviceID, slice) {
		pos := posDeviceInAssignationSlice(DeviceID, slice)

		return append(slice[:pos], slice[pos+1:]...)
	}
	return slice
}

714
715
func startcnmlDevicesChecksumTimer() {
	cnmlDevicesChecksumTicker := time.NewTicker(time.Duration(devsCheckInterval) * time.Second)
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
716
	go func() {
717
		for range cnmlDevicesChecksumTicker.C {
718
			// TODO: global checksum via AntidoteDB must be implemented in fetch app
719
			// checkcnmlDevicesChecksum()
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
720
721
722
723
		}
	}()
}

724
func startAssignTimer() {
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
725
726
727
	localAssignTicker := time.NewTicker(time.Duration(reassignInterval) * time.Second)
	go func() {
		for range localAssignTicker.C {
728
			// Register this monitor, or update the timestamp
729
			registerMonitorInAntidote()
730
731
732
733

			// Update CNML devices list
			cnmlDevices = getCnmlDevicesList()

734
735
			// Update the global assignations list from AntidoteDB
			updateGlobalAssignationList()
736
737

			// Update the local assignations list to check for outdated devices
738
			updateLocalAssignationList()
739
740
741
742
743
744
745
746
747
748
749
750

			// Merge globalAssign into localAssign (local = local OR global)
			mergeGlobalAssignIntoLocalAssign()

			// Do the actual reassignation of devices to this monitor
			reassignDevs()

			// Manage the monitors list
			manageMonitorsList()

			// Sanitize the global assignations in the shared database
			sanitizeAssignationsInDatabase()
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
751
752
753
754
		}
	}()
}

755
// Register this monitor in the shared database
756
func registerMonitorInAntidote() {
757
	// Update its timestamp
758
	updateMonitorTimestampInAntidote()
759
760
761
762
763
764
765
766
767
768
769
770

	// Get the current monitors list
	monitors := goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort)

	// If this monitor is already there, return
	for _, v := range monitors {
		if v == ID {
			return
		}
	}

	// Write the monitor to the shared database otherwise
771
	goclidote.AntidoteAddItemToSetInBucket("guifi", "monitors", ID, dbHost, dbPort)
772
773
}

774
775
func updateMonitorTimestampInAntidote() {
	timestamp := time.Now().Unix()
776
	goclidote.AntidoteSetRegisterInBucket("guifi", IDts, fmt.Sprintf("%d", timestamp), dbHost, dbPort)
777
778
779
	fmt.Println("Setting timestamp to", timestamp)
}

780
func updateMonitorMaxDevsInAntidote() {
781
	goclidote.AntidoteSetRegisterInBucket("guifi", fmt.Sprintf("%s_maxdevs", ID), fmt.Sprintf("%d", maxDevs), dbHost, dbPort)
782
783
784
	fmt.Println("Setting capacity to", maxDevs)
}

785
786
787
788
789
// Deregister a monitor from AntidoteDB (e.g., on exit, or on crash)
func deregisterMonitorInAntidote(monitorID string) {

	// Try up to 3 times
	for i := 0; i < 3; i++ {
790
		if goclidote.AntidoteRemoveItemFromSetInBucket("guifi", "monitors", monitorID, dbHost, dbPort) {
791
792
793
794
795
			fmt.Println("Deregistration successful")
			break
		} else {
			fmt.Println("Deregistration unsuccessful")
		}
796
797
	}

798
	goclidote.AntidoteRemoveRegisterInBucket("guifi", fmt.Sprintf("%s_ts", monitorID), dbHost, dbPort)
799
800
}

801
802
803
804
805
806
807
func calculateGlobalAverageLoad() float64 {

	var monitors = getCurrentMonitorsList()
	var monitorsMaxDevs []int
	var monitorAssignedDevs []int

	for _, v := range monitors {
808
		capacity, err := strconv.ParseInt(goclidote.AntidoteReadRegisterInBucket("guifi", fmt.Sprintf("%s_maxdevs", v), dbHost, dbPort), 10, 64)
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
		errCheck(err, WARNING, fmt.Sprintf("Monitor %s has an invalid maxDev", v))

		if err == nil && capacity > 0 {
			monitorsMaxDevs = append(monitorsMaxDevs, int(capacity))
		} else {
			monitorsMaxDevs = append(monitorsMaxDevs, 0)
		}
		monitorAssignedDevs = append(monitorAssignedDevs, 0)
	}

	fmt.Println("len(globalAssign):", len(globalAssign))
	for _, v := range globalAssign {
		fmt.Println(v)
		for _, w := range v.MonitorID {
			fmt.Println("len(monitors):", len(monitors))
			for m, x := range monitors {
				if w == x {
					monitorAssignedDevs[m] = monitorAssignedDevs[m] + 1
				}
			}
		}
	}

	var globalMaxDevs uint64
	var globalAssignedDevs uint64

	for _, v := range monitorsMaxDevs {
		globalMaxDevs = globalMaxDevs + uint64(v)
	}
	for _, v := range monitorAssignedDevs {
		globalAssignedDevs = globalAssignedDevs + uint64(v)
	}

	fmt.Println(monitors)
	fmt.Println(monitorsMaxDevs)
	fmt.Println(monitorAssignedDevs)

	if globalMaxDevs > 0 {
		if globalAssignedDevs >= 0 {
			if globalAssignedDevs <= globalMaxDevs {
				// Standard case
				return float64(globalAssignedDevs) / float64(globalMaxDevs)
			}
			// Overload case, or monitors not reporting properly maxDevs
			return 1

		}
		// Weird case with negative assigned devices
		return 0
	}
	// Corner case when no monitors are registered, or not reporting properly;
	// return a compromise 50% load
	return 0.5
}

Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
func errCheck(e error, level int, message string) {

	if message == "" {
		message = "Unknown error"
	}
	if e != nil {
		switch level {
		case FATAL:
			glog.Fatalln(message, e)
		case ERROR:
			glog.Errorln(message, e)
		case WARNING:
			glog.Warningln(message, e)
		default:
			glog.Infoln(message, e)
		}
	}
}