Developing lightweight computation at the DSG edge

Commit 1f998b85 authored by Roger Pueyo Centelles's avatar Roger Pueyo Centelles
Browse files

[assign] AntidoteDB interaction (WiP)



 - Restructure workflow
Signed-off-by: default avatarRoger Pueyo Centelles <roger.pueyo@guifi.net>
parent 042e36df
......@@ -58,23 +58,24 @@ var globalAssignCheckInterval = 5 //60
var globalAssignCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the global devices assignation list"
// Global data structures
var allDevs []cnml.DeviceIpv4sGraphserver //The global list with all the fetched devices and their IPv4 addresses
var allDevsChecksum string //The checksum of the last global devices list and IPv4 addresses fetched
var localAssign []cnml.DeviceIpv4sGraphserver //The list of locally assigned devices and their IPv4 addresses
var cnmlDevices []cnml.DeviceIpv4sGraphserver //The global list with all the fetched devices and their IPv4 addresses
var cnmlDevicesChecksum string //The checksum of the last global devices list and IPv4 addresses fetched
var localAssign []cnml.DeviceMonitorAssignation //The list of locally assigned devices and their IPv4 addresses
var globalAssign []cnml.DeviceMonitorAssignation //The global assignation list of devices<=>monitors
// The main function
func main() {
//Initialize the service (read the configuration flags, etc.)
initialize()
// Handle interrupt signals and gracefully terminate
// Handle interrupt signals in order to terminate gracefully
signalChan := make(chan os.Signal, 1)
cleanupDone := make(chan bool)
signal.Notify(signalChan, os.Interrupt)
go func() {
for _ = range signalChan {
fmt.Println("\nReceived an interrupt, deregistering and quitting...")
// TODO: remove this monitor's ID from all nodes
// removeMonitorFromAllNodes()
deregisterMonitorInAntidote()
cleanupDone <- true
}
......@@ -83,40 +84,36 @@ func main() {
fmt.Println("\nDone!")
}
// The initialize function manages the execution parameters, sets timers, etc.
// The initialize() function manages the execution parameters, sets timers, etc.
func initialize() {
fmt.Println("Initializing...")
// Manage command line flags and execution settings
initializeFlags()
//Register as a monitor to Antidote's list of active Monitors
// Register as a monitor to Antidote's list of active Monitors
registerMonitorInAntidote()
//Import the devices and IPv4 addresses list from the JSON file, and the checksum
// Now from Antidote
//allDevs = readDevsaddsFile(fmt.Sprintf("%s/%s", tmpDir, devsFile), ERROR)
allDevs = apidote.ReadDevicesFromAntidote()
//Import the global devices assignation lists
// Import the global devices list and the global assignation lists
cnmlDevices = getCnmlDevicesList()
updateGlobalAssignationList()
// Initialize periodic timers
initializeTimers()
//Block forever
fmt.Println("Entering infinite loop...")
//TODO: What is this select{} call doing here???
//select {}
// The application is now ininitialized; entering the loop
fmt.Println("Initialization done. Entering infinite loop...")
}
func initializeTimers() {
//Initialize allDevs checksum changes check timer
startAllDevsChecksumTimer()
// Initialize cnmlDevices checksum changes check timer
startcnmlDevicesChecksumTimer()
//Initialize the local assignation check timer
// Initialize the local assignation check timer
startGlobalAssignTimer()
//Initialize allDevs checksum changes check timer
// Initialize cnmlDevices checksum changes check timer
startLocalAssignTimer()
}
......@@ -191,176 +188,103 @@ func readDevsaddsFile(filename string, errLevel int) []cnml.DeviceIpv4sGraphserv
return newdevs
}
func updateDevsaddsFile(filename string, currdevsadds []cnml.DeviceIpv4sGraphserver) []cnml.DeviceIpv4sGraphserver {
fdevs := readDevsaddsFile(filename, ERROR)
for _, v := range fdevs {
if !isDeviceInSlice(v, currdevsadds) {
currdevsadds = append(currdevsadds, v)
}
}
return currdevsadds
}
//Export the assigned devices and IPv4 addresses list to a file
func exportDevsaddrsFile(filename string) {
//afile, err := os.Create(fmt.Sprintf("%s%s.json", assignDir, id))
exportfile, err := os.Create(filename)
errCheck(err, INFO, "TODO")
defer exportfile.Close()
for _, v := range localAssign {
jda, err := json.Marshal(v)
errCheck(err, INFO, "TODO")
exportfile.WriteString(string(jda))
exportfile.WriteString("\n")
}
}
//Read a text file containing a SHA256 checksum on its first line
func readChecksumFile(filename string, errLevel int) string {
cfile, err := os.Open(filename)
errCheck(err, errLevel, fmt.Sprintf("Error opening file %s", filename))
defer cfile.Close()
var sha256 string
scanner := bufio.NewScanner(cfile)
scanner.Scan()
sha256 = scanner.Text()
return sha256
// Get the global devices list from AntidoteDB
func getCnmlDevicesList() []cnml.DeviceIpv4sGraphserver {
return apidote.ReadDevicesFromAntidote()
}
//Import all the monitor=>device assignations
func updateGlobalAssignationList() {
// Update alldevs
allDevs = apidote.ReadDevicesFromAntidote()
for _, v := range allDevs {
for _, v := range cnmlDevices {
if isDeviceInAssignationSlice(v.ID, globalAssign) {
globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID = apidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "mo")
globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID = apidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors")
} else {
var newDev cnml.DeviceMonitorAssignation
newDev.DeviceID = v.ID
newDev.MonitorID = apidote.AntidoteReadItemsFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors")
globalAssign = append(globalAssign, newDev)
}
}
newglobalAssign := globalAssign
for _, v := range globalAssign {
if !isDeviceInSliceByID(v.DeviceID, allDevs) {
if !isDeviceInSliceByID(v.DeviceID, cnmlDevices) {
rmDeviceFromAssignationList(v.DeviceID, newglobalAssign)
}
}
}
func assignDeviceMonitor(assgnlist []cnml.DeviceMonitorAssignation, deviceID int, monitorID string) []cnml.DeviceMonitorAssignation {
if !isDeviceInAssignationSlice(deviceID, assgnlist) {
var nassgn cnml.DeviceMonitorAssignation
nassgn.DeviceID = deviceID
nassgn.MonitorID = []string{monitorID}
assgnlist = append(assgnlist, nassgn)
} else {
var tassgn = assgnlist[posDeviceInAssignationSlice(deviceID, assgnlist)]
func updateLocalAssignationList() {
inMonList := false
for _, v := range tassgn.MonitorID {
if v == monitorID {
inMonList = true
}
}
var newlocalAssign []cnml.DeviceMonitorAssignation
if !inMonList {
//Fixme: Use pointers here
assgnlist[posDeviceInAssignationSlice(deviceID, assgnlist)].MonitorID = append(assgnlist[posDeviceInAssignationSlice(deviceID, assgnlist)].MonitorID, monitorID)
for _, v := range globalAssign {
for _, w := range v.MonitorID {
if w == ID {
newlocalAssign = append(newlocalAssign, v)
break
}
}
}
return assgnlist
localAssign = newlocalAssign
}
func reassignDevs() {
fmt.Println("\nReassignation of devices")
fmt.Println(cnmlDevices)
fmt.Println(globalAssign)
fmt.Println(localAssign)
fmt.Println(len(localAssign), "devices currently assigned (maximum:", maxDevs, "devices)")
assgnchanges := false // Track changes in the assignation list
var deldevs []cnml.DeviceIpv4sGraphserver // Storage for the deleted devices
newlocalAssign := localAssign // Duplicate the local list of assigned devices
// Check for devices assigned locally that are no longer in the global devices list
// TODO check this loop
for _, v := range localAssign {
if !isDeviceInSlice(v, allDevs) {
fmt.Println("Removing old device ", v.ID, " from the local assigned devices list")
deldevs = append(deldevs, v)
newlocalAssign = rmDeviceFromSlice(v, newlocalAssign)
assgnchanges = true
}
}
// Save the new local assigned devices, if changes have been made
if assgnchanges {
localAssign = newlocalAssign
// Update the CNML devices list from AntidoteDB
cnmlDevices = getCnmlDevicesList()
for _, v := range deldevs {
apidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", ID)
}
}
// Update the global assignations list from AntidoteDB
updateGlobalAssignationList()
// Update the local assignations list from AntidoteDB
updateLocalAssignationList()
// Add to the local assignation list those devices that are assigned via Graphserver
addDevicesAssignedByGraphserver()
assgnchanges := false // Track changes in the assignation list
// If less than maximum devices are assigned and more can be assigned,
// pick some at random, at a maximum rate so that the maximum is reached
// in 1 hour, until the maximum is reached
if (len(localAssign) < maxDevs) && (len(localAssign) < len(allDevs)) {
if (len(localAssign) < maxDevs) && (len(localAssign) < len(cnmlDevices)) {
// Assign new nodes
fillingTime := 60.0 // Add decimal to force float64 type // Use 60 instead of 3600; be quick and do it in 1 min
toassign := int(math.Min(float64(int(math.Min(math.Min(float64(len(allDevs)), float64(maxDevs))*float64(devsCheckInterval)/fillingTime+1, float64(len(allDevs)-len(localAssign))))), float64(maxDevs-len(localAssign))))
toassign := int(math.Min(float64(int(math.Min(math.Min(float64(len(cnmlDevices)), float64(maxDevs))*float64(devsCheckInterval)/fillingTime+1, float64(len(cnmlDevices)-len(localAssign))))), float64(maxDevs-len(localAssign))))
fmt.Println("Assigning", toassign, "new devices")
//Create a list with all the unassigned devices
var unassignedDevices []cnml.DeviceIpv4sGraphserver
for _, v := range allDevs {
if !isDeviceInSlice(v, localAssign) {
for _, v := range cnmlDevices {
if (!isDeviceInAssignationSlice(v.ID, globalAssign) || len(globalAssign[posDeviceInAssignationSlice(v.ID, globalAssign)].MonitorID) < 1) && !isDeviceInAssignationSlice(v.ID, localAssign) {
unassignedDevices = append(unassignedDevices, v)
}
}
//Create a list with all the unassigned devices that are assigned via web to this monitor
var assignGraphserver []cnml.DeviceIpv4sGraphserver
for _, v := range unassignedDevices {
if strconv.Itoa(v.GraphServer) == ID {
assignGraphserver = append(assignGraphserver, v)
}
}
fmt.Println(len(unassignedDevices), "devices unassigned, of which", len(assignGraphserver), "are assigned via web")
fmt.Println(len(unassignedDevices), "devices unassigned") //, of which", len(assignGraphserver), "are assigned via web")
fmt.Println("Picking", toassign, "nodes randomly")
//TODO: if the number of nodes nodes assigned via the web is greater than the maxDevs value, some of them will
// never be assigned
for i := 0; i < toassign; i++ {
if len(assignGraphserver) > 0 {
//Pick a random device from the assignweb list
randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
j := randsource.Intn(len(assignGraphserver))
//Save it to the assigned devices list
localAssign = append(localAssign, assignGraphserver[j])
assgnchanges = true
//Remove it from the unassigned devices list
unassignedDevices = rmDeviceFromSlice(assignGraphserver[j], unassignedDevices)
assignGraphserver = rmDeviceFromSlice(assignGraphserver[j], assignGraphserver)
} else if len(unassignedDevices) > 0 {
if len(unassignedDevices) > 0 {
//Pick a random device from the unassigned list
randsource := rand.New(rand.NewSource(time.Now().UnixNano()))
j := randsource.Intn(len(unassignedDevices))
//Save it to the assigned devices list
localAssign = append(localAssign, unassignedDevices[j])
var newAssignDev cnml.DeviceMonitorAssignation
newAssignDev.DeviceID = unassignedDevices[j].ID
newAssignDev.MonitorID = append(newAssignDev.MonitorID, ID)
localAssign = append(localAssign, newAssignDev)
assgnchanges = true
//Remove it from the unassigned devices list
unassignedDevices = rmDeviceFromSlice(unassignedDevices[j], unassignedDevices)
......@@ -377,7 +301,52 @@ func reassignDevs() {
if assgnchanges {
fmt.Println("Exporting the new assigned devices list")
for _, v := range localAssign {
apidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", ID)
apidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.DeviceID), "monitors", ID)
}
}
}
// // Check for devices assigned locally that are no longer in the global devices list
// func checkOutdatedLocalDevices() {
//
// assgnchanges := false // Track changes in the assignation list
// var deldevs []cnml.DeviceIpv4sGraphserver // Storage for the deleted devices
// newlocalAssign := localAssign // Duplicate the local list of assigned devices
//
// for _, v := range localAssign {
// if !isDeviceInSlice(v, cnmlDevices) {
// fmt.Println("Removing old device ", v.ID, " from the local assigned devices list")
// deldevs = append(deldevs, v)
// newlocalAssign = rmDeviceFromSlice(v, newlocalAssign)
// assgnchanges = true
// }
// }
//
// // Save the new local assigned devices, if changes have been made
// if assgnchanges {
// localAssign = newlocalAssign
// for _, v := range deldevs {
// apidote.AntidoteRemoveItemFromSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", ID)
// }
// }
// }
func addDevicesAssignedByGraphserver() {
for _, v := range cnmlDevices {
if isDeviceInAssignationSlice(v.ID, localAssign) {
graphserver := false
deviceMonitors := globalAssign[posDeviceInAssignationSlice(v.ID, localAssign)].MonitorID
for _, w := range deviceMonitors {
if w == ID {
graphserver = true
break
}
}
if !graphserver {
globalAssign[posDeviceInAssignationSlice(v.ID, localAssign)].MonitorID = append(deviceMonitors, ID)
}
}
}
}
......@@ -468,12 +437,12 @@ func rmDeviceFromAssignationList(DeviceID int, slice []cnml.DeviceMonitorAssigna
return slice
}
func startAllDevsChecksumTimer() {
allDevsChecksumTicker := time.NewTicker(time.Duration(devsCheckInterval) * time.Second)
func startcnmlDevicesChecksumTimer() {
cnmlDevicesChecksumTicker := time.NewTicker(time.Duration(devsCheckInterval) * time.Second)
go func() {
for range allDevsChecksumTicker.C {
for range cnmlDevicesChecksumTicker.C {
// TODO: global checksum via AntidoteDB must be implemented in fetch app
// checkAllDevsChecksum()
// checkcnmlDevicesChecksum()
}
}()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment