Developing lightweight computation at the DSG edge

Commit 3da01f78 authored by Roger Pueyo Centelles's avatar Roger Pueyo Centelles
Browse files

[monitor-assign] Refactor code to reduce database calls (I)


Signed-off-by: Roger Pueyo Centelles's avatarRoger Pueyo Centelles <rpueyo@ac.upc.edu>
parent f43a74c1
...@@ -133,10 +133,6 @@ func initialize() { ...@@ -133,10 +133,6 @@ func initialize() {
// Register as a monitor to Antidote's list of active Monitors // Register as a monitor to Antidote's list of active Monitors
registerMonitorInAntidote() registerMonitorInAntidote()
// Import the global devices list and the global assignation lists
cnmlDevices = getCnmlDevicesList()
updateGlobalAssignationList()
// Publish the maxDev value for this monitor // Publish the maxDev value for this monitor
updateMonitorMaxDevsInAntidote() updateMonitorMaxDevsInAntidote()
...@@ -152,7 +148,7 @@ func initializeTimers() { ...@@ -152,7 +148,7 @@ func initializeTimers() {
startcnmlDevicesChecksumTimer() startcnmlDevicesChecksumTimer()
// Initialize cnmlDevices checksum changes check timer // Initialize cnmlDevices checksum changes check timer
startLocalAssignTimer() startAssignTimer()
} }
// The initializeFlags function initializes the command line flags // The initializeFlags function initializes the command line flags
...@@ -225,10 +221,9 @@ func initializeFlags() { ...@@ -225,10 +221,9 @@ func initializeFlags() {
} }
//Policy //Policy
if *policyPtr >= 1 && *policyPtr <= 3 { if *policyPtr >= 1 && *policyPtr <= 2 {
policy = *policyPtr policy = *policyPtr
} }
} }
// Get the global devices list from AntidoteDB // Get the global devices list from AntidoteDB
...@@ -236,17 +231,21 @@ func getCnmlDevicesList() []cnml.DeviceIpv4sGraphserver { ...@@ -236,17 +231,21 @@ func getCnmlDevicesList() []cnml.DeviceIpv4sGraphserver {
return goclidote.ReadDevicesFromAntidote(dbHost, dbPort) return goclidote.ReadDevicesFromAntidote(dbHost, dbPort)
} }
//Import all the monitor=>device assignations // Update globalAssign from the current shared database status
func updateGlobalAssignationList() { func updateGlobalAssignationList() {
fmt.Println("Updating globalAssign...") fmt.Println("Updating globalAssign...")
var newDevices []cnml.DeviceMonitorAssignation var newDevices []cnml.DeviceMonitorAssignation
var currDevices []cnml.DeviceMonitorAssignation
var oldDevices []cnml.DeviceMonitorAssignation
// Iterate through all the devices known to the monitoring system
for _, v := range cnmlDevices { for _, v := range cnmlDevices {
if isDeviceInAssignationSlice(v.ID, globalAssign) { if isDeviceInAssignationSlice(v.ID, globalAssign) {
// We already know this device; update its monitors assignation from AntidoteDB // We already know this device; update its monitors assignation from AntidoteDB
globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID = goclidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", dbHost, dbPort) globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID = goclidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", dbHost, dbPort)
} else { } else {
// A new device from the cnmlDevices list; let's add it to our global assignation list // A new device from the cnmlDevices list; add it to a new devices list to append it below
fmt.Printf("Adding device %d from cnmlDevices into globalAssign\n", v.ID) fmt.Printf("Adding device %d from cnmlDevices into globalAssign\n", v.ID)
var newDev cnml.DeviceMonitorAssignation var newDev cnml.DeviceMonitorAssignation
newDev.Device.ID = v.ID newDev.Device.ID = v.ID
...@@ -256,43 +255,78 @@ func updateGlobalAssignationList() { ...@@ -256,43 +255,78 @@ func updateGlobalAssignationList() {
} }
} }
// The globalAssign list may contain old devices, not in the CNML anymore; discard them
for _, v := range globalAssign {
if isDeviceInSliceByID(v.Device.ID, cnmlDevices) {
currDevices = append(currDevices, v)
} else {
oldDevices = append(oldDevices, v)
}
}
// Leave in globalAssign only current devices
globalAssign = currDevices
// Append the new devices to the globalAssign list // Append the new devices to the globalAssign list
for _, v := range newDevices { for _, v := range newDevices {
globalAssign = append(globalAssign, v) globalAssign = append(globalAssign, v)
} }
// The globalAssign list may contain old devices, not in the CNML anymore; discard them fmt.Println("globalAssign updated!")
newDevices = nil }
for _, v := range globalAssign {
func updateLocalAssignationList() {
var currlocalAssign []cnml.DeviceMonitorAssignation
var oldlocalAssign []cnml.DeviceMonitorAssignation
// The localAssign list may contain old devices, not in the CNML anymore;
// discard them.
for _, v := range localAssign {
if isDeviceInSliceByID(v.Device.ID, cnmlDevices) { if isDeviceInSliceByID(v.Device.ID, cnmlDevices) {
newDevices = append(newDevices, v) currlocalAssign = append(currlocalAssign, v)
} else {
oldlocalAssign = append(oldlocalAssign, v)
} }
} }
globalAssign = newDevices
fmt.Println("globalAssign updated!") // TODO: also look for devices in localAssign NOT assigned to this monitor, which should never have happened
localAssign = currlocalAssign
} }
func updateLocalAssignationList() { func mergeGlobalAssignIntoLocalAssign() {
var newlocalAssign []cnml.DeviceMonitorAssignation var newlocalAssign []cnml.DeviceMonitorAssignation
// Search for those devices in the globalAssign list that have this monitor
// assigned. Check if they are already in the localAssign list, or add them
// otherwise
for _, v := range globalAssign { for _, v := range globalAssign {
for _, w := range v.MonitorID { for _, w := range v.MonitorID {
if w == ID { if w == ID {
newlocalAssign = append(newlocalAssign, v) var inLocal = false
for _, x := range localAssign {
if v.Device.ID == x.Device.ID {
inLocal = true
break break
} }
} }
if !inLocal {
newlocalAssign = append(newlocalAssign, v)
}
}
}
} }
localAssign = newlocalAssign for _, v := range newlocalAssign {
localAssign = append(localAssign, v)
}
} }
func manageMonitorsList() { func manageMonitorsList() {
fmt.Println("") fmt.Println("")
fmt.Println("Managing the monitors list...") fmt.Println("Managing the monitors list...")
fmt.Println("I am monitor", ID)
// Fetch the current monitors list // Fetch the current monitors list
currentMonitors := goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort) currentMonitors := goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort)
...@@ -370,6 +404,9 @@ func sanitizeAssignationsInDatabase() { ...@@ -370,6 +404,9 @@ func sanitizeAssignationsInDatabase() {
fmt.Println("") fmt.Println("")
fmt.Println("Sanitizing the assignation list...") fmt.Println("Sanitizing the assignation list...")
// TODO: This list of monitors should be the same as the one just asked for
// in the manageMonitors function. Try to minimize calls by doing it only once
// Update the current monitors list // Update the current monitors list
fmt.Println("Getting the current monitors list...") fmt.Println("Getting the current monitors list...")
currentMonitors := getCurrentMonitorsList() currentMonitors := getCurrentMonitorsList()
...@@ -403,6 +440,10 @@ func getCurrentMonitorsList() []string { ...@@ -403,6 +440,10 @@ func getCurrentMonitorsList() []string {
return goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort) return goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort)
} }
func assignMonitorFromDevice(monitorID string, deviceID int) bool {
return goclidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", deviceID), "monitors", monitorID, dbHost, dbPort)
}
func unassignMonitorFromDevice(monitorID string, deviceID int) bool { func unassignMonitorFromDevice(monitorID string, deviceID int) bool {
return goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", deviceID), "monitors", monitorID, dbHost, dbPort) return goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", deviceID), "monitors", monitorID, dbHost, dbPort)
} }
...@@ -412,21 +453,8 @@ func reassignDevs() { ...@@ -412,21 +453,8 @@ func reassignDevs() {
fmt.Println("Reassignation of devices") fmt.Println("Reassignation of devices")
fmt.Println(len(localAssign), "devices currently assigned to this monitor (maximum:", maxDevs, "devices)") fmt.Println(len(localAssign), "devices currently assigned to this monitor (maximum:", maxDevs, "devices)")
// fmt.Println(cnmlDevices) // Track changes in the assignation list
// fmt.Println(globalAssign) assignChange := false
// fmt.Println(localAssign)
// Update the CNML devices list from AntidoteDB
fmt.Println("Updating the current cnml...")
cnmlDevices = getCnmlDevicesList()
// Update the local assignations list from AntidoteDB
updateLocalAssignationList()
// Add to the local assignation list those devices that are assigned via Graphserver
addDevicesAssignedByGraphserver()
assgnchanges := false // Track changes in the assignation list
switch policy { switch policy {
case 1: case 1:
...@@ -469,7 +497,7 @@ func reassignDevs() { ...@@ -469,7 +497,7 @@ func reassignDevs() {
newAssignDev.Device.Addresses = unassignedDevices[j].Addresses newAssignDev.Device.Addresses = unassignedDevices[j].Addresses
newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID) newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID)
localAssign = append(localAssign, newAssignDev) localAssign = append(localAssign, newAssignDev)
assgnchanges = true assignChange = true
// Remove it from the unassigned devices list // Remove it from the unassigned devices list
unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices) unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices)
} else { } else {
...@@ -527,7 +555,7 @@ func reassignDevs() { ...@@ -527,7 +555,7 @@ func reassignDevs() {
newAssignDev.Device.Addresses = unassignedDevices[j].Addresses newAssignDev.Device.Addresses = unassignedDevices[j].Addresses
newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID) newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID)
localAssign = append(localAssign, newAssignDev) localAssign = append(localAssign, newAssignDev)
assgnchanges = true assignChange = true
// Remove it from the unassigned devices list // Remove it from the unassigned devices list
unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices) unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices)
} else { } else {
...@@ -543,7 +571,7 @@ func reassignDevs() { ...@@ -543,7 +571,7 @@ func reassignDevs() {
} }
// Update the new assigned devices to the global shared list // Update the new assigned devices to the global shared list
if assgnchanges { if assignChange {
fmt.Println("Exporting the new assigned devices list") fmt.Println("Exporting the new assigned devices list")
for _, v := range localAssign { for _, v := range localAssign {
goclidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort) goclidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort)
...@@ -555,7 +583,7 @@ func reassignDevs() { ...@@ -555,7 +583,7 @@ func reassignDevs() {
// // Check for devices assigned locally that are no longer in the global devices list // // Check for devices assigned locally that are no longer in the global devices list
// func checkOutdatedLocalDevices() { // func checkOutdatedLocalDevices() {
// //
// assgnchanges := false // Track changes in the assignation list // assignChange := false // Track changes in the assignation list
// var deldevs []cnml.DeviceMonitorAssignation // Storage for the deleted devices // var deldevs []cnml.DeviceMonitorAssignation // Storage for the deleted devices
// newlocalAssign := localAssign // Duplicate the local list of assigned devices // newlocalAssign := localAssign // Duplicate the local list of assigned devices
// //
...@@ -564,12 +592,12 @@ func reassignDevs() { ...@@ -564,12 +592,12 @@ func reassignDevs() {
// fmt.Println("Removing old device ", v.Device.ID, " from the local assigned devices list") // fmt.Println("Removing old device ", v.Device.ID, " from the local assigned devices list")
// deldevs = append(deldevs, v) // deldevs = append(deldevs, v)
// newlocalAssign = rmDeviceFromAssignationList(v.Device.ID, newlocalAssign) // newlocalAssign = rmDeviceFromAssignationList(v.Device.ID, newlocalAssign)
// assgnchanges = true // assignChange = true
// } // }
// } // }
// //
// // Save the new local assigned devices, if changes have been made // // Save the new local assigned devices, if changes have been made
// if assgnchanges { // if assignChange {
// localAssign = newlocalAssign // localAssign = newlocalAssign
// for _, v := range deldevs { // for _, v := range deldevs {
// goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort) // goclidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", v.Device.ID), "monitors", ID, dbHost, dbPort)
...@@ -693,24 +721,53 @@ func startcnmlDevicesChecksumTimer() { ...@@ -693,24 +721,53 @@ func startcnmlDevicesChecksumTimer() {
}() }()
} }
func startLocalAssignTimer() { func startAssignTimer() {
localAssignTicker := time.NewTicker(time.Duration(reassignInterval) * time.Second) localAssignTicker := time.NewTicker(time.Duration(reassignInterval) * time.Second)
go func() { go func() {
for range localAssignTicker.C { for range localAssignTicker.C {
// Register this monitor, or update the timestamp
registerMonitorInAntidote() registerMonitorInAntidote()
manageMonitorsList()
// Update CNML devices list
cnmlDevices = getCnmlDevicesList()
// Update the global assignations list from AntidoteDB // Update the global assignations list from AntidoteDB
updateGlobalAssignationList() updateGlobalAssignationList()
sanitizeAssignationsInDatabase()
reassignDevs() // Update the local assignations list to check for outdated devices
updateGlobalAssignationList()
updateLocalAssignationList() updateLocalAssignationList()
// Merge globalAssign into localAssign (local = local OR global)
mergeGlobalAssignIntoLocalAssign()
// Do the actual reassignation of devices to this monitor
reassignDevs()
// Manage the monitors list
manageMonitorsList()
// Sanitize the global assignations in the shared database
sanitizeAssignationsInDatabase()
} }
}() }()
} }
// Register this monitor in the shared database
func registerMonitorInAntidote() { func registerMonitorInAntidote() {
// Update its timestamp
updateMonitorTimestampInAntidote() updateMonitorTimestampInAntidote()
// Get the current monitors list
monitors := goclidote.AntidoteReadItemsFromSetInBucket("guifi", "monitors", dbHost, dbPort)
// If this monitor is already there, return
for _, v := range monitors {
if v == ID {
return
}
}
// Write the monitor to the shared database otherwise
goclidote.AntidoteAddItemToSetInBucket("guifi", "monitors", ID, dbHost, dbPort) goclidote.AntidoteAddItemToSetInBucket("guifi", "monitors", ID, dbHost, dbPort)
} }
...@@ -721,7 +778,7 @@ func updateMonitorTimestampInAntidote() { ...@@ -721,7 +778,7 @@ func updateMonitorTimestampInAntidote() {
} }
func updateMonitorMaxDevsInAntidote() { func updateMonitorMaxDevsInAntidote() {
goclidote.AntidoteSetRegisterInBucket("guifi", fmt.Sprintf("%d_maxdevs", maxDevs), fmt.Sprintf("%d", maxDevs), dbHost, dbPort) goclidote.AntidoteSetRegisterInBucket("guifi", fmt.Sprintf("%s_maxdevs", ID), fmt.Sprintf("%d", maxDevs), dbHost, dbPort)
fmt.Println("Setting capacity to", maxDevs) fmt.Println("Setting capacity to", maxDevs)
} }
...@@ -748,7 +805,7 @@ func calculateGlobalAverageLoad() float64 { ...@@ -748,7 +805,7 @@ func calculateGlobalAverageLoad() float64 {
var monitorAssignedDevs []int var monitorAssignedDevs []int
for _, v := range monitors { for _, v := range monitors {
capacity, err := strconv.ParseInt(goclidote.AntidoteReadRegisterInBucket("guifi", fmt.Sprintf("%d_maxdevs", maxDevs), dbHost, dbPort), 10, 64) capacity, err := strconv.ParseInt(goclidote.AntidoteReadRegisterInBucket("guifi", fmt.Sprintf("%s_maxdevs", v), dbHost, dbPort), 10, 64)
errCheck(err, WARNING, fmt.Sprintf("Monitor %s has an invalid maxDev", v)) errCheck(err, WARNING, fmt.Sprintf("Monitor %s has an invalid maxDev", v))
if err == nil && capacity > 0 { if err == nil && capacity > 0 {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment