Developing lightweight computation at the DSG edge

Commit 0ce77862 authored by Roger Pueyo Centelles's avatar Roger Pueyo Centelles
Browse files

[assign] AntidoteDB interaction (WiP)



 - Register monitor to AntidoteDB
 - Unregister monitor on exit
 - Save monitor ID to assigned devices
Signed-off-by: default avatarRoger Pueyo Centelles <roger.pueyo@guifi.net>
parent 285ab6c8
......@@ -9,13 +9,15 @@ import (
"math"
"math/rand"
"os"
"os/signal"
"strconv"
"strings"
"time"
"github.com/golang/glog"
"uc-monitor-go-test/apidote"
"uc-monitor-go-test/cnml"
"github.com/golang/glog"
)
// Error levels
......@@ -36,57 +38,125 @@ var WARNING = 4
var DEFAULTFILEMODE os.FileMode = 0755
//Default settings and descriptions
// idNum holds the monitor id, in numberic form
var idNum int
// ID represents the ID of the monitor, in alfanumeric form
var ID = "0"
// idHelp usage description help
var idHelp = "Numeric ID of this monitoring instance"
// maxDevs defines the default maximum number of nodes a monitor is assigned to
var maxDevs = 1000
var maxDevsHelp = "Maximum number of devices to monitor"
var tmpDir = "/tmp/gmonitor2/"
// tmpDirHelp usage description help for tmpDir
// tmpDir sets the working temporary directory
var tmpDir = "/tmp/gmonitor2/"
var tmpDirHelp = "Temporary directory where to read and write files"
var devsFile = "devs.json"
// devsFileHelp usage description help for devsFile
// devsFile contains the filename of the file storing the devices
var devsFile = "devs.json"
var devsFileHelp = "Filename of the JSON file containing the list of devices to monitor and their IPv4 addresses"
var devsChecksumFile = "devs.checksum"
// idHelp usage description help
// devsChecksumFile contains the filename of the file containing the latest CNML checksum
var devsChecksumFile = "devs.checksum"
var devsChecksumFileHelp = "Filename of the text file containing the checksum of the list of devices to monitor"
var assignDir = fmt.Sprintf("%sassign/", tmpDir)
// idHelp usage description help
var assignDir = fmt.Sprintf("%sassign/", tmpDir)
var assignDirHelp = "Temporary directory where to write local and read foreign device<=>monitors assignation files"
var devsCheckInterval = 5 //60
// idHelp usage description help
var devsCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the devices list"
var reassignInterval = 5 //60
// idHelp usage description help
var reassignIntervalHelp = "Interval (in seconds) in which to update the local devices assignation list"
var globalAssignCheckInterval = 5 //60
// idHelp usage description help
var globalAssignCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the global devices assignation list"
//Global variables
var allDevs []cnml.DeviceIpv4Adddresses //The global list with all the fetched devices and their IPv4 addresses
// Global data structures
var allDevs []cnml.DeviceIpv4sGraphserver //The global list with all the fetched devices and their IPv4 addresses
var allDevsChecksum string //The checksum of the last global devices list and IPv4 addresses fetched
var localAssign []cnml.DeviceIpv4Adddresses //The list of locally assigned devices and their IPv4 addresses
var localAssign []cnml.DeviceIpv4sGraphserver //The list of locally assigned devices and their IPv4 addresses
var localAssignChecksum string //The checksum of the local assigned devices list
var globalAssign []cnml.DeviceMonitorAssignation //The global assignation list of devices<=>monitors
var globalAssignChecksums []cnml.AssignChecksum //The checksum of the foreign assigned devices list
// The main function
func main() {
//Initialize the service (read the configuration flags, etc.)
initialize()
// Handle interrupt signals and gracefully terminate
signalChan := make(chan os.Signal, 1)
cleanupDone := make(chan bool)
signal.Notify(signalChan, os.Interrupt)
go func() {
for _ = range signalChan {
fmt.Println("\nReceived an interrupt, deregistering and quitting...")
deregisterMonitorInAntidote()
cleanupDone <- true
}
}()
<-cleanupDone
fmt.Println("\nDone!")
}
// The initialize function manages the execution parameters, sets timers, etc.
func initialize() {
// Manage command line flags and execution settings
initializeFlags()
// Initialize temporary working directories
initializeDirs()
//Register as a monitor to Antidote's list of active Monitors
registerMonitorInAntidote()
//Import the devices and IPv4 addresses list from the JSON file, and the checksum
// Now from Antidote
//allDevs = readDevsaddsFile(fmt.Sprintf("%s/%s", tmpDir, devsFile), ERROR)
allDevs = apidote.ReadDevicesFromAntidote()
setCurrentAllDevsChecksum()
//Import the global devices assignation lists
updateGlobalImportAllAssignFiles()
initializeTimers()
//Block forever
fmt.Println("Entering infinite loop...")
//TODO: What is this select{} call doing here???
//select {}
}
func initializeDirs() {
//Create the temporary dir, along with any necessary parents
err := os.MkdirAll(tmpDir, DEFAULTFILEMODE)
errCheck(err, FATAL, fmt.Sprintf("Unable to create temporary directory %s", tmpDir))
//Create the assignation dir, along with any necessary parents
err = os.MkdirAll(assignDir, DEFAULTFILEMODE)
errCheck(err, FATAL, fmt.Sprintf("Unable to create temporary directory %s", assignDir))
}
func initializeTimers() {
//Initialize allDevs checksum changes check timer
startAllDevsChecksumTimer()
//Initialize the local assignation check timer
startGlobalAssignTimer()
//Initialize allDevs checksum changes check timer
startLocalAssignTimer()
}
// The initializeFlags function initializes the command line flags
func initializeFlags() {
//Define and parse command line flags
idInt, _ := strconv.ParseInt(ID, 10, 32)
idPtr := flag.Int("id", int(idInt), idHelp)
......@@ -120,7 +190,6 @@ func initialize() {
//Directories
tmpDir = *tmpDirPtr
assignDir = *assignDirPtr
initializeDirs()
//Devices and addresses list related files
devsFile = *devsFilePtr
......@@ -136,48 +205,11 @@ func initialize() {
if *globalAssignCheckIntervalPtr > 0 {
globalAssignCheckInterval = *globalAssignCheckIntervalPtr
}
//Import the devices and IPv4 addresses list from the JSON file, and the checksum
//allDevs = readDevsaddsFile(fmt.Sprintf("%s/%s", tmpDir, devsFile), ERROR)
allDevs = apidote.ReadDevicesFromAntidote()
setCurrentAllDevsChecksum()
//Import the global devices assignation lists
updateGlobalImportAllAssignFiles()
initializeTimers()
//Block forever
fmt.Println("Entering infinite loop...")
select {}
}
func initializeDirs() {
//Create the temporary dir, along with any necessary parents
err := os.MkdirAll(tmpDir, DEFAULTFILEMODE)
errCheck(err, FATAL, fmt.Sprintf("Unable to create temporary directory %s", tmpDir))
//Create the assignation dir, along with any necessary parents
err = os.MkdirAll(assignDir, DEFAULTFILEMODE)
errCheck(err, FATAL, fmt.Sprintf("Unable to create temporary directory %s", assignDir))
}
func initializeTimers() {
//Initialize allDevs checksum changes check timer
startAllDevsChecksumTimer()
//Initialize the local assignation check timer
startGlobalAssignTimer()
//Initialize allDevs checksum changes check timer
startLocalAssignTimer()
}
//Read a devices and IPv4 addresses list from a JSON file
func readDevsaddsFile(filename string, errLevel int) []cnml.DeviceIpv4Adddresses {
var newdevs []cnml.DeviceIpv4Adddresses
// readDevsaddsFile reads a devices and IPv4 addresses list from a JSON file
func readDevsaddsFile(filename string, errLevel int) []cnml.DeviceIpv4sGraphserver {
var newdevs []cnml.DeviceIpv4sGraphserver
jdevsaddrsFile, err := os.Open(filename)
errCheck(err, errLevel, fmt.Sprintf("Error opening file %s", filename))
......@@ -195,7 +227,7 @@ func readDevsaddsFile(filename string, errLevel int) []cnml.DeviceIpv4Adddresses
//Unmarshal the JSON objects to DeviceAddresses objects
for _, v := range lines {
var devaddr cnml.DeviceIpv4Adddresses
var devaddr cnml.DeviceIpv4sGraphserver
err := json.Unmarshal([]byte(v), &devaddr)
errCheck(err, INFO, "TODO")
newdevs = append(newdevs, devaddr)
......@@ -205,7 +237,7 @@ func readDevsaddsFile(filename string, errLevel int) []cnml.DeviceIpv4Adddresses
return newdevs
}
func updateDevsaddsFile(filename string, currdevsadds []cnml.DeviceIpv4Adddresses) []cnml.DeviceIpv4Adddresses {
func updateDevsaddsFile(filename string, currdevsadds []cnml.DeviceIpv4sGraphserver) []cnml.DeviceIpv4sGraphserver {
fdevs := readDevsaddsFile(filename, ERROR)
......@@ -391,22 +423,27 @@ func reassignDevs() {
fmt.Println("\nReassignation of devices")
fmt.Println(len(localAssign), "devices currently assigned (maximum:", maxDevs, "devices)")
assgnchanges := false
var deldevs []cnml.DeviceIpv4Adddresses
newassigndevs := localAssign
assgnchanges := false // Track changes in the assignation list
var deldevs []cnml.DeviceIpv4sGraphserver // Storage for the deleted devices
newlocalAssign := localAssign // Duplicate the local list of assigned devices
// Check if all the devices assigned are still in the global list
// Check for devices assigned locally that are no longer in the global devices list
// TODO check this loop
for _, v := range localAssign {
if !isDeviceInSlice(v, allDevs) {
//TODO check this loop
fmt.Println("Removing old device ", v.ID, " from the assigned devices list")
fmt.Println("Removing old device ", v.ID, " from the local assigned devices list")
deldevs = append(deldevs, v)
newassigndevs = rmDeviceFromSlice(v, newassigndevs)
newlocalAssign = rmDeviceFromSlice(v, newlocalAssign)
assgnchanges = true
}
}
// Save the new local assigned devices, if changes have been made
if assgnchanges {
localAssign = newassigndevs
localAssign = newlocalAssign
for _, v := range deldevs {
apidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", ID)
}
}
// If less than maximum devices are assigned and more can be assigned,
......@@ -414,56 +451,53 @@ func reassignDevs() {
// in 1 hour, until the maximum is reached
if (len(localAssign) < maxDevs) && (len(localAssign) < len(allDevs)) {
//Assign the following new nodes
// toassign := int(math.Min(float64(int(math.Min(math.Min(float64(len(allDevs)), float64(maxDevs))*float64(devsCheckInterval)/3600+1, float64(len(allDevs)-len(localAssign))))), float64(maxDevs-len(localAssign))))
//Be quick, do it in 1 min
toassign := int(math.Min(float64(int(math.Min(math.Min(float64(len(allDevs)), float64(maxDevs))*float64(devsCheckInterval)/60+1, float64(len(allDevs)-len(localAssign))))), float64(maxDevs-len(localAssign))))
// Assign new nodes
fillingTime := 60.0 // Add decimal to force float64 type // Use 60 instead of 3600; be quick and do it in 1 min
toassign := int(math.Min(float64(int(math.Min(math.Min(float64(len(allDevs)), float64(maxDevs))*float64(devsCheckInterval)/fillingTime+1, float64(len(allDevs)-len(localAssign))))), float64(maxDevs-len(localAssign))))
fmt.Println("Assigning", toassign, "new devices")
//Create a list with all the unassigned devices
var unassigned []cnml.DeviceIpv4Adddresses
var unassignedDevices []cnml.DeviceIpv4sGraphserver
for _, v := range allDevs {
if !isDeviceInSlice(v, localAssign) {
unassigned = append(unassigned, v)
unassignedDevices = append(unassignedDevices, v)
}
}
//Create a list with all the unassigned devices that are assigned via web to this monitor
var assignweb []cnml.DeviceIpv4sGraphserver
// for _, v := range unassigned {
// // if fmt.Sprint(v.GraphServer) == ID {
// // assignweb = append(assignweb, v)
// // } else {
// // }
//
// }
var assignGraphserver []cnml.DeviceIpv4sGraphserver
for _, v := range unassignedDevices {
if strconv.Itoa(v.GraphServer) == ID {
assignGraphserver = append(assignGraphserver, v)
}
}
fmt.Println(len(unassigned), "devices unassigned, of which", len(assignweb), "are assigned via web")
fmt.Println(len(unassignedDevices), "devices unassigned, of which", len(assignGraphserver), "are assigned via web")
fmt.Println("Picking", toassign, "nodes randomly")
//TODO: if the number of nodes nodes assigned via the web is greater than the maxDevs value, some of them will
// never be assigned
for i := 0; i < toassign; i++ {
if len(assignweb) > 0 {
if len(assignGraphserver) > 0 {
//Pick a random device from the assignweb list
randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
j := randsource.Intn(len(assignweb))
j := randsource.Intn(len(assignGraphserver))
//Save it to the assigned devices list
localAssign = append(localAssign, assignweb[j])
localAssign = append(localAssign, assignGraphserver[j])
assgnchanges = true
//Remove it from the unassigned devices list
unassigned = rmDeviceFromSlice(assignweb[j], unassigned)
assignweb = rmDeviceFromSlice(assignweb[j], assignweb)
} else if len(unassigned) > 0 {
unassignedDevices = rmDeviceFromSlice(assignGraphserver[j], unassignedDevices)
assignGraphserver = rmDeviceFromSlice(assignGraphserver[j], assignGraphserver)
} else if len(unassignedDevices) > 0 {
//Pick a random device from the unassigned list
randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
j := randsource.Intn(len(unassigned))
j := randsource.Intn(len(unassignedDevices))
//Save it to the assigned devices list
localAssign = append(localAssign, unassigned[j])
localAssign = append(localAssign, unassignedDevices[j])
assgnchanges = true
//Remove it from the unassigned devices list
unassigned = rmDeviceFromSlice(unassigned[j], unassigned)
unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices)
} else {
fmt.Println(toassign-i, "more devices were to be assigned, but no unassigned devices remain")
break
......@@ -476,11 +510,14 @@ func reassignDevs() {
if assgnchanges {
fmt.Println("Exporting the new assigned devices list")
for _, v := range localAssign {
apidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", ID)
}
exportDevsaddrsFile(fmt.Sprintf("%s/%s.json", assignDir, ID))
}
}
func posDeviceInSlice(device cnml.DeviceIpv4Adddresses, slice []cnml.DeviceIpv4Adddresses) int {
func posDeviceInSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) int {
for k, v := range slice {
if device.ID == v.ID {
return k
......@@ -489,7 +526,7 @@ func posDeviceInSlice(device cnml.DeviceIpv4Adddresses, slice []cnml.DeviceIpv4A
return -1
}
func isDeviceInSlice(device cnml.DeviceIpv4Adddresses, slice []cnml.DeviceIpv4Adddresses) bool {
func isDeviceInSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) bool {
if posDeviceInSlice(device, slice) >= 0 {
return true
}
......@@ -529,7 +566,7 @@ func isMonitorInSlice(id string, slice []cnml.AssignChecksum) bool {
return false
}
func rmDeviceFromSlice(device cnml.DeviceIpv4Adddresses, slice []cnml.DeviceIpv4Adddresses) []cnml.DeviceIpv4Adddresses {
func rmDeviceFromSlice(device cnml.DeviceIpv4sGraphserver, slice []cnml.DeviceIpv4sGraphserver) []cnml.DeviceIpv4sGraphserver {
if isDeviceInSlice(device, slice) {
pos := posDeviceInSlice(device, slice)
......@@ -566,6 +603,14 @@ func startLocalAssignTimer() {
}()
}
func registerMonitorInAntidote() {
apidote.AntidoteAddItemToSetInBucket("guifi", "monitors", ID)
}
func deregisterMonitorInAntidote() {
apidote.AntidoteRemoveItemFromSetInBucket("guifi", "monitors", ID)
}
func errCheck(e error, level int, message string) {
if message == "" {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment