package main import ( "flag" "fmt" "math" "math/rand" "os" "os/signal" "strconv" "time" "uc-monitor-go-test/cnml" "uc-monitor-go-test/goclidote" "github.com/golang/glog" ) var dbHost = "localhost" var dbHostHelp = "The hostname or IP address where AntidoteDB is running" var dbPort = 8087 var dbPortHelp = "The TCP port on which AntidoteDB is listening" // The Timestamps struct contains a monitor's timestamp and a local timestamp type Timestamps struct { monitor int64 local int64 } // The MonitorTimestamp struct contains a monitor's id and timestamps type MonitorTimestamp struct { id string timestamps Timestamps } // Error levels // FATAL output code var FATAL = 1 // ERROR output code var ERROR = 2 // WARNING output code var WARNING = 3 // INFO output code var INFO = 4 // DEFAULTFILEMODE for created files var DEFAULTFILEMODE os.FileMode = 0755 //Default settings and descriptions // monitorTimeout defines how long to wait before removing a crashed monitor from the database var monitorTimeout int64 = 450 var monitorTimeoutHelp = "Time before deregistering crashed monitors or in network partition" // idNum holds the monitor id, in numberic form var idNum int // ID represents the ID of the monitor, in alfanumeric form var ID = "0" var idHelp = "Numeric ID of this monitoring instance" // IDts holds a timestamp var IDts = "0_ts" // minMons defines the minimum number of monitors a deviced must be assigned to // so that it is considered to have enough redundancy var minMons = 3 var minMonsHelp = "Minimum number of monitors to assign to each device" // maxMons defines the maximum number of monitors a device must be assigned to, // so that it is not monitored by more monitors than needed var maxMons = minMons + 1 var maxMonsHelp = "Maximum number of monitors to assign to each device" // maxDevs defines the default maximum number of nodes a monitor is assigned to var maxDevs = 1000 var maxDevsHelp = "Maximum number of devices to monitor" var devsCheckInterval = 150 var 
devsCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the devices list" var fillingTime = 150 var fillingTimeHelp = "Interval (in seconds) for filling the local assignation list" var reassignInterval = 150 var reassignIntervalHelp = "Interval (in seconds) in which to update the local devices assignation list" var globalAssignCheckInterval = 150 var globalAssignCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the global devices assignation list" var policy = 1 var policyHelp = "Network devices to monitoring servers assignation policy" // Global data structures var cnmlDevices []cnml.DeviceIpv4sGraphserver //The global list with all the fetched devices and their IPv4 addresses var cnmlDevicesChecksum string //The checksum of the last global devices list and IPv4 addresses fetched var localAssign []cnml.DeviceMonitorAssignation //The list of locally assigned devices and their IPv4 addresses var globalAssign []cnml.DeviceMonitorAssignation //The global assignation list of devices<=>monitors var globalTimestamps []MonitorTimestamp func main() { //Initialize the service (read the configuration flags, etc.) initialize() // Handle interrupt signals in order to terminate gracefully signalChan := make(chan os.Signal, 1) cleanupDone := make(chan bool) signal.Notify(signalChan, os.Interrupt) go func() { for _ = range signalChan { fmt.Println("\nReceived an interrupt, deregistering from the monitors list and quitting...") deregisterMonitorInAntidote(ID) cleanupDone <- true } }() <-cleanupDone fmt.Println("\nBye!") } // The initialize() function manages the execution parameters, sets timers, etc. 
func initialize() { fmt.Println("Initializing...") // Manage command line flags and execution settings initializeFlags() // Register as a monitor to Antidote's list of active Monitors registerMonitorInAntidote() // Publish the maxDev value for this monitor updateMonitorMaxDevsInAntidote() // Initialize periodic timers initializeTimers() // The application is now initialized; entering the loop fmt.Println("Initialization done. Entering infinite loop...") } func initializeTimers() { // Initialize cnmlDevices checksum changes check timer startcnmlDevicesChecksumTimer() // Initialize cnmlDevices checksum changes check timer startAssignTimer() } // The initializeFlags function initializes the command line flags func initializeFlags() { //Define and parse command line flags dbHostPtr := flag.String("db_host", dbHost, dbHostHelp) dbPortPtr := flag.Int("db_port", dbPort, dbPortHelp) idInt, _ := strconv.ParseInt(ID, 10, 32) idPtr := flag.Int("id", int(idInt), idHelp) minMonsPtr := flag.Int("minMons", minMons, minMonsHelp) maxMonsPtr := flag.Int("maxMons", maxMons, maxMonsHelp) maxDevsPtr := flag.Int("maxDevs", maxDevs, maxDevsHelp) devsCheckIntervalPtr := flag.Int("devsCheckInterval", devsCheckInterval, devsCheckIntervalHelp) fillingTimePtr := flag.Int("fillingTime", fillingTime, fillingTimeHelp) reassignIntervalPtr := flag.Int("reassignInterval", reassignInterval, reassignIntervalHelp) globalAssignCheckIntervalPtr := flag.Int("globalAssignCheckInterval", globalAssignCheckInterval, globalAssignCheckIntervalHelp) monitorTimeoutPtr := flag.Int("monitorTimeout", int(monitorTimeout), monitorTimeoutHelp) policyPtr := flag.Int("policy", int(policy), policyHelp) flag.Parse() //Service id if *idPtr > 0 { idNum = *idPtr ID = fmt.Sprint(*idPtr) IDts = fmt.Sprintf("%s_ts", ID) fmt.Println("Using ID", ID) } else { randsource := rand.New(rand.NewSource(time.Now().UnixNano())) ID = fmt.Sprint("a", 100000+randsource.Intn(900000)) fmt.Println("Using anonymous id", ID) IDts = 
fmt.Sprintf("%s_ts", ID) } dbHost = *dbHostPtr dbPort = *dbPortPtr //Maximum devices to monitor if *maxDevsPtr > 0 { maxDevs = *maxDevsPtr } // Minimum and maximum monitors per device if *minMonsPtr > 0 { minMons = *minMonsPtr } if *maxMonsPtr >= minMons { maxMons = *maxMonsPtr } //Timers for tickers (intervals) if *devsCheckIntervalPtr > 0 { devsCheckInterval = *devsCheckIntervalPtr } if *fillingTimePtr > 0 { fillingTime = *fillingTimePtr } if *reassignIntervalPtr > 0 { reassignInterval = *reassignIntervalPtr } if *globalAssignCheckIntervalPtr > 0 { globalAssignCheckInterval = *globalAssignCheckIntervalPtr } if *monitorTimeoutPtr > 0 { monitorTimeout = int64(*monitorTimeoutPtr) } if monitorTimeout <= int64(globalAssignCheckInterval) || monitorTimeout <= int64(reassignInterval) { monitorTimeout = int64(math.Max(float64(globalAssignCheckInterval), float64(reassignInterval))) + 1 } //Policy if *policyPtr >= 1 && *policyPtr <= 2 { policy = *policyPtr } } // Get the global devices list from AntidoteDB func getCnmlDevicesList() []cnml.DeviceIpv4sGraphserver { return goclidote.ReadDevicesFromAntidote(dbHost, dbPort) } // Update globalAssign from the current shared database status func updateGlobalAssignationList() { fmt.Println("Updating globalAssign...") var newDevices []cnml.DeviceMonitorAssignation var currDevices []cnml.DeviceMonitorAssignation var oldDevices []cnml.DeviceMonitorAssignation // Iterate through all the devices known to the monitoring system for _, v := range cnmlDevices { if isDeviceInAssignationSlice(v.ID, globalAssign) { // We already know this device; update its monitors assignation from AntidoteDB globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID = goclidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", dbHost, dbPort) } else { // A new device from the cnmlDevices list; add it to a new devices list to append it below fmt.Printf("Adding device %d from cnmlDevices into globalAssign\n", v.ID) var 
newDev cnml.DeviceMonitorAssignation newDev.Device.ID = v.ID newDev.Device.Addresses = v.Addresses newDev.MonitorID = goclidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", dbHost, dbPort) newDevices = append(newDevices, newDev) } } // The globalAssign list may contain old devices, not in the CNML anymore; discard them for _, v := range globalAssign { if isDeviceInSliceByID(v.Device.ID, cnmlDevices) { currDevices = append(currDevices, v) } else { oldDevices = append(oldDevices, v) } } // Leave in globalAssign only current devices globalAssign = currDevices // Append the new devices to the globalAssign list for _, v := range newDevices { globalAssign = append(globalAssign, v) } fmt.Println("globalAssign updated!") } func updateLocalAssignationList() { var currlocalAssign []cnml.DeviceMonitorAssignation var oldlocalAssign []cnml.DeviceMonitorAssignation // The localAssign list may contain old devices, not in the CNML anymore; // discard them. for _, v := range localAssign { if isDeviceInSliceByID(v.Device.ID, cnmlDevices) { currlocalAssign = append(currlocalAssign, v) } else { oldlocalAssign = append(oldlocalAssign, v) } } // TODO: also look for devices in localAssign NOT assigned to this monitor, which should never have happened localAssign = currlocalAssign } func mergeGlobalAssignIntoLocalAssign() { var newlocalAssign []cnml.DeviceMonitorAssignation // Search for those devices in the globalAssign list that have this monitor // assigned. 
Check if they are already in the localAssign list, or add them // otherwise for _, v := range globalAssign { for _, w := range v.MonitorID { if w == ID { var inLocal = false for _, x := range localAssign { if v.Device.ID == x.Device.ID { inLocal = true break } } if !inLocal { newlocalAssign = append(newlocalAssign, v) } } } } for _, v := range newlocalAssign { localAssign = append(localAssign, v) } } func manageMonitorsList() { fmt.Println("") fmt.Println("Managing the monitors list...") // Fetch the current monitors list currentMonitors := goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort) fmt.Println(len(currentMonitors), "monitors registered in the database:") var monitors = "" for _, v := range currentMonitors { if monitors == "" { monitors = v } else { monitors = fmt.Sprintf("%s, %s", monitors, v) } } fmt.Println(monitors) // Create an array and store the timestamps fetched from the database var currentTimestamps []MonitorTimestamp for _, v := range currentMonitors { var thisMonitor MonitorTimestamp mts, err := strconv.ParseInt(goclidote.AntidoteReadRegisterInBucket("guifi", fmt.Sprintf("%s_ts", v), dbHost, dbPort), 10, 64) errCheck(err, WARNING, fmt.Sprintf("Unable to read the timestamp from Antidote for monitor %s", v)) thisMonitor.id = v thisMonitor.timestamps.monitor = mts thisMonitor.timestamps.local = time.Now().Unix() currentTimestamps = append(currentTimestamps, thisMonitor) } var nextGlobalTimestamps []MonitorTimestamp // Iterate the fetched timestamps list and process it for _, v := range currentTimestamps { var knownMonitor = false // Iterate the local timestamps list for _, w := range globalTimestamps { // Look for matching IDs in both lists if v.id == w.id { knownMonitor = true // Check if the timestamps are the same (i.e., the monitor has not // reported itself to the database recently) if v.timestamps.monitor == w.timestamps.monitor { // The monitor is not updating its info to the database, check local timestamps if 
v.timestamps.local-w.timestamps.local > monitorTimeout { // Deregister the monitor fmt.Println("Deregistering crashed monitor", v.id) deregisterMonitorInAntidote(v.id) } else { nextGlobalTimestamps = append(nextGlobalTimestamps, w) } // The monitor is updating its info to the database, keep the latest info } else { nextGlobalTimestamps = append(nextGlobalTimestamps, v) } } } // This monitor is new, save it for the next iteration if knownMonitor == false { nextGlobalTimestamps = append(nextGlobalTimestamps, v) } } globalTimestamps = nextGlobalTimestamps } // The sanitizeAssignationsInDatabase function gets the current list of monitors // from the database, the current nodes<->monitors assignations, and sanitizes // it (unassigns crashed monitors, etc.) func sanitizeAssignationsInDatabase() { fmt.Println("") fmt.Println("Sanitizing the assignation list...") // TODO: This list of monitors should be the same as the one just asked for // in the manageMonitors function. Try to minimize calls by doing it only once // Update the current monitors list fmt.Println("Getting the current monitors list...") currentMonitors := getCurrentMonitorsList() // Update the current devices list fmt.Println("Updating the current cnml...") cnmlDevices = getCnmlDevicesList() for _, v := range globalAssign { // fmt.Println("############") // fmt.Println(k, ": device ID:", v.Device.ID) // fmt.Println("Monitors:", v.MonitorID) for _, w := range v.MonitorID { var monitorFound = false for _, x := range currentMonitors { if w == x { monitorFound = true fmt.Println(fmt.Sprintf("Monitor %s found, keeping it for device %d", w, v.Device.ID)) } } if monitorFound == false { fmt.Println(fmt.Sprintf("Monitor %s not found, unassigning it from device %d", w, v.Device.ID)) unassignMonitorFromDevice(w, v.Device.ID) } } } fmt.Println("Ended assignation list sanitization...") } func getCurrentMonitorsList() []string { return goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort) } func 
assignMonitorFromDevice(monitorID string, deviceID int) bool {
	// Adds monitorID to the device's "monitors" set in AntidoteDB and reports
	// success. (Naming note: despite the "From", this function ASSIGNS the
	// monitor to the device; the name is kept for caller compatibility.)
	return goclidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", deviceID), "monitors", monitorID, dbHost, dbPort)
}

// unassignMonitorFromDevice removes monitorID from the device's "monitors"
// set in AntidoteDB and reports success.
func unassignMonitorFromDevice(monitorID string, deviceID int) bool {
	return goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", deviceID), "monitors", monitorID, dbHost, dbPort)
}

// calcToAssign computes how many new devices this monitor may self-assign in
// this round, bounded by the filling rate, the remaining unassigned devices,
// and the monitor's own capacity.
func calcToAssign() int {
	capacity := math.Min(float64(len(cnmlDevices)), float64(maxDevs))
	// Fill gradually: a fraction of capacity per check interval, plus one
	fillStep := capacity*float64(devsCheckInterval)/float64(fillingTime) + 1
	remaining := float64(len(cnmlDevices) - len(localAssign))
	// The inner int() truncation is kept to reproduce the original rounding
	return int(math.Min(float64(int(math.Min(fillStep, remaining))), float64(maxDevs-len(localAssign))))
}

// selfAssignRandomDevices picks up to toassign devices at random from the
// candidates, appends them to localAssign, and reports whether any device was
// actually assigned.
func selfAssignRandomDevices(toassign int, candidates []cnml.DeviceIpv4sGraphserver) bool {
	changed := false
	// Seed once per call: re-seeding with UnixNano on every pick (as an
	// earlier revision did) can repeat values when clock granularity is coarse
	randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
	for i := 0; i < toassign; i++ {
		// Check if unassigned devices remain
		if len(candidates) == 0 {
			fmt.Println(toassign-i, "more devices were to be assigned, but no unassigned devices remain")
			break
		}
		// Pick a random device from the unassigned list
		j := randsource.Intn(len(candidates))
		// Save it to the assigned devices list
		var newAssignDev cnml.DeviceMonitorAssignation
		newAssignDev.Device.ID = candidates[j].ID
		newAssignDev.Device.Addresses = candidates[j].Addresses
		newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID)
		localAssign = append(localAssign, newAssignDev)
		changed = true
		// Remove it from the unassigned devices list
		candidates = rmDeviceFromSlice(candidates[j], candidates)
	}
	return changed
}

// reassignDevs self-assigns devices to this monitor according to the selected
// policy and, when changes were made, exports the local assignation list to
// the shared database.
func reassignDevs() {
	fmt.Println("")
	fmt.Println("Reassignation of devices")
	fmt.Println(len(localAssign), "devices currently assigned to this monitor (maximum:", maxDevs, "devices)")

	// Track changes in the assignation list
	assignChange := false

	switch policy {
	case 1:
		// Policy #1: The monitoring server self-assigns those network devices
		// that have less monitoring devices assigned than the minimum number
		// specified by the minMons parameter. Devices are picked randomly
		// until the server reaches its maximum number of monitored devices,
		// specified by the maxDevs parameter.

		// Check if more nodes can be assigned, and if there are any devices
		// not being monitored by this server that could be assigned
		if (len(localAssign) < maxDevs) && (len(localAssign) < len(cnmlDevices)) {
			toassign := calcToAssign()
			fmt.Println("Assigning up to", toassign, "new devices")

			// Candidates: devices not assigned locally that either are absent
			// from the global list or have fewer than minMons monitors
			var unassignedDevices []cnml.DeviceIpv4sGraphserver
			for _, v := range cnmlDevices {
				if !isDeviceInAssignationSlice(v.ID, localAssign) && (!isDeviceInAssignationSlice(v.ID, globalAssign) || len(globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID) < minMons) {
					unassignedDevices = append(unassignedDevices, v)
				}
			}
			fmt.Println(len(unassignedDevices), "devices unassigned")
			fmt.Println("Picking", toassign, "devices randomly")

			if selfAssignRandomDevices(toassign, unassignedDevices) {
				assignChange = true
			}
			fmt.Println(len(localAssign), "devices currently assigned to this monitor")
		} else {
			fmt.Println("Not assigning any new device.")
		}
	case 2:
		// Policy #2: The monitoring server self-assigns those network devices
		// that have the least monitoring devices already assigned. Devices are
		// picked randomly, from those with less monitors, until the server
		// reaches its maximum number of monitored devices specified by the
		// maxDevs parameter.

		// Check if more nodes can be assigned, and if there are any devices
		// not being monitored by this server that could be assigned
		if (len(localAssign) < maxDevs) && (len(localAssign) < len(cnmlDevices)) {
			toassign := calcToAssign()
			fmt.Println("Assigning up to", toassign, "new devices")

			// Raise the monitors-per-device threshold until candidates appear
			leastMons := 0
			var unassignedDevices []cnml.DeviceIpv4sGraphserver
			for leastMons < maxMons && len(unassignedDevices) == 0 {
				for _, v := range cnmlDevices {
					if !isDeviceInAssignationSlice(v.ID, globalAssign) || (!isDeviceInAssignationSlice(v.ID, localAssign) && len(globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID) < leastMons) {
						unassignedDevices = append(unassignedDevices, v)
					}
				}
				leastMons = leastMons + 1
			}
			fmt.Println(len(unassignedDevices), "devices unassigned")
			fmt.Println("Picking up to", toassign, "devices randomly")

			if selfAssignRandomDevices(toassign, unassignedDevices) {
				assignChange = true
			}
			fmt.Println(len(localAssign), "devices currently assigned to this monitor")
		} else {
			fmt.Println("Not assigning any new device.")
		}
		// NOTE(review): the average load is only reported under policy 2 in
		// the original code; kept as-is to preserve behavior.
		fmt.Println("Global average load:", calculateGlobalAverageLoad())
	}

	// Update the new assigned devices to the global shared list
	if assignChange {
		fmt.Println("Exporting the new assigned devices list")
		for _, v := range localAssign {
			goclidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort)
		}
		// localLoad := math.min(1, float64(len(localAssign))/float64(maxDevs))
	}
}

// // Check for devices assigned locally that are no longer in the global devices list
// func checkOutdatedLocalDevices() {
//
// assignChange := false // Track changes in the assignation list
// var deldevs []cnml.DeviceMonitorAssignation // Storage for the deleted devices
// newlocalAssign := localAssign // Duplicate the local list of assigned devices
//
// for _, v := range localAssign {
// if !isDeviceInSliceByID(v.Device.ID, cnmlDevices) {
// fmt.Println("Removing old device ", v.Device.ID, " from the local assigned devices list")
// deldevs = append(deldevs, v)
// newlocalAssign = rmDeviceFromAssignationList(v.Device.ID, newlocalAssign)
// assignChange = true
// }
// }
//
// // Save the new local assigned devices, if changes have been made
// if assignChange {
// localAssign = newlocalAssign
// for _, v := range deldevs {
// goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort)
// }
// }
// }

// addDevicesAssignedByGraphserver makes sure every device assigned locally
// also lists this monitor in its globalAssign entry.
func addDevicesAssignedByGraphserver() {
	for _, v := range cnmlDevices {
		if isDeviceInAssignationSlice(v.ID, localAssign) {
			// BUG FIX: the original indexed globalAssign with the device's
			// position in localAssign; the two lists are ordered differently,
			// so the wrong entry (or an out-of-range index) could be used.
			pos := posDeviceInAssignationSlice(v.ID, globalAssign)
			if pos < 0 {
				// Device not (yet) present in the global list; nothing to patch
				continue
			}
			deviceMonitors := globalAssign[pos].MonitorID
			graphserver := false
			for _, w := range deviceMonitors {
				if w == ID {
					graphserver = true
					break
				}
			}
			if !graphserver {
				globalAssign[pos].MonitorID = append(deviceMonitors, ID)
			}
		}
	}
}

// posDeviceInSlice returns the index of device in slice, or -1 if absent.
func posDeviceInSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) int {
	for k, v := range slice {
		if device.ID == v.ID {
			return k
		}
	}
	return -1
}

// posDeviceInSliceByID returns the index of the device with deviceID in
// slice, or -1 if absent.
func posDeviceInSliceByID(deviceID int, slice []cnml.DeviceIpv4sGraphserver) int {
	for k, v := range slice {
		if deviceID == v.ID {
			return k
		}
	}
	return -1
}

// isDeviceInSlice reports whether device is present in slice.
func isDeviceInSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) bool {
	return posDeviceInSlice(device, slice) >= 0
}

// isDeviceInSliceByID reports whether a device with deviceID is in slice.
func isDeviceInSliceByID(deviceID int, slice []cnml.DeviceIpv4sGraphserver) bool {
	return posDeviceInSliceByID(deviceID, slice) >= 0
}

// posDeviceInAssignationSlice returns the index of the assignation entry for
// the given device id, or -1 if absent.
func posDeviceInAssignationSlice(device int, slice []cnml.DeviceMonitorAssignation) int {
	for k, v := range slice {
		if device == v.Device.ID {
			return k
		}
	}
	return -1
}

// isDeviceInAssignationSlice reports whether an assignation entry exists for
// the given device id.
func isDeviceInAssignationSlice(device int, slice []cnml.DeviceMonitorAssignation) bool {
	return posDeviceInAssignationSlice(device, slice) >= 0
}

// posMonitorInSlice returns the index of the checksum entry for monitor id,
// or -1 if absent.
func posMonitorInSlice(id string, slice []cnml.AssignChecksum) int {
	for k, v := range slice {
		// BUG FIX: the original compared against the package-level ID instead
		// of the id parameter, so lookups for other monitors were wrong.
		if id == v.MonitorID {
			return k
		}
	}
	return -1
}

// isMonitorInSlice reports whether a checksum entry exists for monitor id.
func isMonitorInSlice(id string, slice []cnml.AssignChecksum) bool {
	return posMonitorInSlice(id, slice) >= 0
}

// rmDeviceFromSlice returns slice with device removed (order of the remaining
// elements is preserved; the backing array is reused).
func rmDeviceFromSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) []cnml.DeviceIpv4sGraphserver {
	if pos := posDeviceInSlice(device, slice); pos >= 0 {
		return append(slice[:pos], slice[pos+1:]...)
	}
	return slice
}

// rmDeviceFromAssignationList returns slice with the entry for DeviceID
// removed (order preserved; backing array reused).
func rmDeviceFromAssignationList(DeviceID int, slice []cnml.DeviceMonitorAssignation) []cnml.DeviceMonitorAssignation {
	if pos := posDeviceInAssignationSlice(DeviceID, slice); pos >= 0 {
		return append(slice[:pos], slice[pos+1:]...)
	}
	return slice
}

// startcnmlDevicesChecksumTimer starts the periodic CNML checksum check.
// The ticker lives for the whole process lifetime, so it is never stopped.
func startcnmlDevicesChecksumTimer() {
	cnmlDevicesChecksumTicker := time.NewTicker(time.Duration(devsCheckInterval) * time.Second)
	go func() {
		for range cnmlDevicesChecksumTicker.C {
			// TODO: global checksum via AntidoteDB must be implemented in fetch app
			// checkcnmlDevicesChecksum()
		}
	}()
}

// startAssignTimer starts the periodic (re)assignation cycle: refresh lists,
// reassign devices, and sanitize the shared database. The ticker lives for
// the whole process lifetime, so it is never stopped.
func startAssignTimer() {
	localAssignTicker := time.NewTicker(time.Duration(reassignInterval) * time.Second)
	go func() {
		for range localAssignTicker.C {
			// Register this monitor, or update the timestamp
			registerMonitorInAntidote()
			// Update CNML devices list
			cnmlDevices = getCnmlDevicesList()
			// Update the global assignations list from AntidoteDB
			updateGlobalAssignationList()
			// Update the local assignations list to check for outdated devices
			updateLocalAssignationList()
			// Merge globalAssign into localAssign (local = local OR global)
			mergeGlobalAssignIntoLocalAssign()
			// Do the actual reassignation of devices to this monitor
			reassignDevs()
			// Manage the monitors list
			manageMonitorsList()
			// Sanitize the global assignations in the shared database
			sanitizeAssignationsInDatabase()
		}
	}()
}

// Register this monitor in the shared database (idempotent: always refreshes
// the timestamp, only adds the monitor to the set when missing).
func registerMonitorInAntidote() {
	// Update its timestamp
	updateMonitorTimestampInAntidote()
	// Get the current monitors list
	monitors := goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort)
	// If this monitor is already there, return
	for _, v := range monitors {
		if v == ID {
			return
		}
	}
	// Write the monitor to the shared database otherwise
	goclidote.AntidoteAddItemToSetInBucket("guifi", "monitors", ID, dbHost, dbPort)
}

// updateMonitorTimestampInAntidote writes the current Unix time under this
// monitor's timestamp key.
func updateMonitorTimestampInAntidote() {
	timestamp := time.Now().Unix()
	goclidote.AntidoteSetRegisterInBucket("guifi", IDts, fmt.Sprintf("%d", timestamp), dbHost, dbPort)
	fmt.Println("Setting timestamp to", timestamp)
}

// updateMonitorMaxDevsInAntidote publishes this monitor's capacity (maxDevs).
func updateMonitorMaxDevsInAntidote() {
	goclidote.AntidoteSetRegisterInBucket("guifi", fmt.Sprintf("%s_maxdevs", ID), fmt.Sprintf("%d", maxDevs), dbHost, dbPort)
	fmt.Println("Setting capacity to", maxDevs)
}

// Deregister a monitor from AntidoteDB (e.g., on exit, or on crash)
func deregisterMonitorInAntidote(monitorID string) {
	// Try up to 3 times
	for i := 0; i < 3; i++ {
		if goclidote.AntidoteRemoveItemFromSetInBucket("guifi", "monitors", monitorID, dbHost, dbPort) {
			fmt.Println("Deregistration successful")
			break
		}
		fmt.Println("Deregistration unsuccessful")
	}
	// Remove the timestamp register regardless of the set removal outcome
	goclidote.AntidoteRemoveRegisterInBucket("guifi", fmt.Sprintf("%s_ts", monitorID), dbHost, dbPort)
}

// calculateGlobalAverageLoad returns assigned/capacity over all registered
// monitors: 0.5 when no capacity is reported at all, capped at 1 when
// assignations exceed reported capacity.
func calculateGlobalAverageLoad() float64 {
	var monitors = getCurrentMonitorsList()
	var monitorsMaxDevs []int
	monitorAssignedDevs := make([]int, len(monitors))

	// Fetch each monitor's published capacity; unparsable or non-positive
	// values count as 0 (and raise a warning)
	for _, v := range monitors {
		capacity, err := strconv.ParseInt(goclidote.AntidoteReadRegisterInBucket("guifi", fmt.Sprintf("%s_maxdevs", v), dbHost, dbPort), 10, 64)
		errCheck(err, WARNING, fmt.Sprintf("Monitor %s has an invalid maxDev", v))
		if err == nil && capacity > 0 {
			monitorsMaxDevs = append(monitorsMaxDevs, int(capacity))
		} else {
			monitorsMaxDevs = append(monitorsMaxDevs, 0)
		}
	}

	// Count how many devices each monitor has assigned
	fmt.Println("len(globalAssign):", len(globalAssign))
	for _, v := range globalAssign {
		fmt.Println(v)
		for _, w := range v.MonitorID {
			fmt.Println("len(monitors):", len(monitors))
			for m, x := range monitors {
				if w == x {
					monitorAssignedDevs[m] = monitorAssignedDevs[m] + 1
				}
			}
		}
	}

	var globalMaxDevs uint64
	var globalAssignedDevs uint64
	for _, v := range monitorsMaxDevs {
		globalMaxDevs = globalMaxDevs + uint64(v)
	}
	for _, v := range monitorAssignedDevs {
		globalAssignedDevs = globalAssignedDevs + uint64(v)
	}
	fmt.Println(monitors)
	fmt.Println(monitorsMaxDevs)
	fmt.Println(monitorAssignedDevs)

	// (The original also guarded globalAssignedDevs >= 0, which is always
	// true for an unsigned value; that dead branch has been removed.)
	if globalMaxDevs == 0 {
		// Corner case when no monitors are registered, or not reporting
		// properly; return a compromise 50% load
		return 0.5
	}
	if globalAssignedDevs > globalMaxDevs {
		// Overload case, or monitors not reporting properly maxDevs
		return 1
	}
	// Standard case
	return float64(globalAssignedDevs) / float64(globalMaxDevs)
}

// errCheck logs a non-nil error through glog at the requested level (FATAL,
// ERROR, WARNING; anything else logs as info). A nil error is a no-op.
func errCheck(e error, level int, message string) {
	if message == "" {
		message = "Unknown error"
	}
	if e == nil {
		return
	}
	switch level {
	case FATAL:
		glog.Fatalln(message, e)
	case ERROR:
		glog.Errorln(message, e)
	case WARNING:
		glog.Warningln(message, e)
	default:
		glog.Infoln(message, e)
	}
}