Developing lightweight computation at the DSG edge

Commit f0bea33a authored by Roger Pueyo Centelles's avatar Roger Pueyo Centelles
Browse files

[assign] AntidoteDB interaction (WiP)



Debris cleanup
Signed-off-by: default avatarRoger Pueyo Centelles <roger.pueyo@guifi.net>
parent 74917e3f
......@@ -5,13 +5,11 @@ import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"os/signal"
"strconv"
"strings"
"time"
"uc-monitor-go-test/apidote"
......@@ -50,21 +48,6 @@ var idHelp = "Numeric ID of this monitoring instance"
var maxDevs = 1000
var maxDevsHelp = "Maximum number of devices to monitor"
// tmpDir sets the working temporary directory
var tmpDir = "/tmp/gmonitor2/"
var tmpDirHelp = "Temporary directory where to read and write files"
// devsFile contains the filename of the file storing the devices
var devsFile = "devs.json"
var devsFileHelp = "Filename of the JSON file containing the list of devices to monitor and their IPv4 addresses"
// devsChecksumFile contains the filename of the file containing the latest CNML checksum
var devsChecksumFile = "devs.checksum"
var devsChecksumFileHelp = "Filename of the text file containing the checksum of the list of devices to monitor"
var assignDir = fmt.Sprintf("%sassign/", tmpDir)
var assignDirHelp = "Temporary directory where to write local and read foreign device<=>monitors assignation files"
var devsCheckInterval = 5 //60
var devsCheckIntervalHelp = "Interval (in seconds) in which to check for changes in the devices list"
......@@ -78,9 +61,7 @@ var globalAssignCheckIntervalHelp = "Interval (in seconds) in which to check for
var allDevs []cnml.DeviceIpv4sGraphserver //The global list with all the fetched devices and their IPv4 addresses
var allDevsChecksum string //The checksum of the last global devices list and IPv4 addresses fetched
var localAssign []cnml.DeviceIpv4sGraphserver //The list of locally assigned devices and their IPv4 addresses
var localAssignChecksum string //The checksum of the local assigned devices list
var globalAssign []cnml.DeviceMonitorAssignation //The global assignation list of devices<=>monitors
var globalAssignChecksums []cnml.AssignChecksum //The checksum of the foreign assigned devices list
// The main function
func main() {
......@@ -108,9 +89,6 @@ func initialize() {
// Manage command line flags and execution settings
initializeFlags()
// Initialize temporary working directories
initializeDirs()
//Register as a monitor to Antidote's list of active Monitors
registerMonitorInAntidote()
......@@ -119,8 +97,6 @@ func initialize() {
//allDevs = readDevsaddsFile(fmt.Sprintf("%s/%s", tmpDir, devsFile), ERROR)
allDevs = apidote.ReadDevicesFromAntidote()
setCurrentAllDevsChecksum()
//Import the global devices assignation lists
updateGlobalAssignationList()
......@@ -133,16 +109,6 @@ func initialize() {
//select {}
}
func initializeDirs() {
//Create the temporary dir, along with any necessary parents
err := os.MkdirAll(tmpDir, DEFAULTFILEMODE)
errCheck(err, FATAL, fmt.Sprintf("Unable to create temporary directory %s", tmpDir))
//Create the assignation dir, along with any necessary parents
err = os.MkdirAll(assignDir, DEFAULTFILEMODE)
errCheck(err, FATAL, fmt.Sprintf("Unable to create temporary directory %s", assignDir))
}
func initializeTimers() {
//Initialize allDevs checksum changes check timer
startAllDevsChecksumTimer()
......@@ -161,10 +127,6 @@ func initializeFlags() {
idInt, _ := strconv.ParseInt(ID, 10, 32)
idPtr := flag.Int("id", int(idInt), idHelp)
maxDevsPtr := flag.Int("maxDevs", maxDevs, maxDevsHelp)
tmpDirPtr := flag.String("tmpDir", tmpDir, tmpDirHelp)
devsFilePtr := flag.String("devsFile", devsFile, devsFileHelp)
devsChecksumFilePtr := flag.String("devsChecksumFile", devsChecksumFile, devsChecksumFileHelp)
assignDirPtr := flag.String("assignDir", assignDir, assignDirHelp)
devsCheckIntervalPtr := flag.Int("devsCheckInterval", devsCheckInterval, devsCheckIntervalHelp)
reassignIntervalPtr := flag.Int("reassignInterval", reassignInterval, reassignIntervalHelp)
globalAssignCheckIntervalPtr := flag.Int("globalAssignCheckInterval", globalAssignCheckInterval, globalAssignCheckIntervalHelp)
......@@ -187,14 +149,6 @@ func initializeFlags() {
maxDevs = *maxDevsPtr
}
//Directories
tmpDir = *tmpDirPtr
assignDir = *assignDirPtr
//Devices and addresses list related files
devsFile = *devsFilePtr
devsChecksumFile = *devsChecksumFilePtr
//Timers for tickers (intervals)
if *devsCheckIntervalPtr > 0 {
devsCheckInterval = *devsCheckIntervalPtr
......@@ -282,118 +236,6 @@ func readChecksumFile(filename string, errLevel int) string {
return sha256
}
//Read the last SHA256 checksum of the devices and IPv4 addresses GO struct
func setCurrentAllDevsChecksum() {
allDevsChecksum = readChecksumFile(fmt.Sprintf("%s/%s", tmpDir, devsChecksumFile), ERROR)
fmt.Println("Current all devices list checksum set to", allDevsChecksum)
}
func checkAllDevsChecksum() {
if allDevsChecksum != readChecksumFile(fmt.Sprintf("%s/%s", tmpDir, devsChecksumFile), ERROR) {
fmt.Println("SHA256 has changed since last check")
setCurrentAllDevsChecksum()
//allDevs = readDevsaddsFile(fmt.Sprintf("%s/%s", tmpDir, devsFile), ERROR)
allDevs = apidote.ReadDevicesFromAntidote()
return
}
fmt.Println("SHA256 has not changed since last check")
}
// //DEPRECATED: Import all the monitor=>device assignations
// func import_all_assign_files() {
//
// //List all the JSON files in the assignations dir
// json_files, err := ioutil.ReadDir(fmt.Sprintf("%s", assignDir))
// errCheck(err, INFO, "TODO")
//
// for _, v := range json_files {
// if v.Name() != fmt.Sprintf("%s.json", id) {
// fmt.Println("Importing assignation file", v.Name())
// }
//
// monitorID := strings.Replace(v.Name(), ".json", "", -1)
//
// //Import all json files except the current instance's one
// if monitorID != ID {
// fmt.Println("Adding assignation list from monitor", monitorID, "to global assignation list")
// adevsadds := readDevsaddsFile(fmt.Sprintf("%s/%s", assignDir, v.Name()), ERROR)
// for _, w := range adevsadds {
// globalAssign = assignDeviceMonitor(globalAssign, w.Id, monitorID)
// }
// }
// }
// }
//Import all the monitor=>device assignations
func updateGlobalImportAllAssignFiles() {
//List all the assignation and checksum files in the assignations dir
assignDirFiles, err := ioutil.ReadDir(fmt.Sprintf("%s", assignDir))
errCheck(err, WARNING, fmt.Sprintf("The global assignation dir %s can not be scanned for files", assignDir))
//Iterate all the files to get all the monitors' info
for _, jfile := range assignDirFiles {
//Get the monitor ID from the filename
monitorID := strings.Replace(jfile.Name(), ".json", "", -1)
//But only pick only .json files, not .checksum, and exclude the local instance
if monitorID != ID && !jfile.IsDir() && jfile.Name() != monitorID {
//Find the corresponding .checksum file
for _, cfile := range assignDirFiles {
//We have the json file and the checksum file in jfile and cfile
if !cfile.IsDir() && cfile.Name() == fmt.Sprintf("%s.checksum", monitorID) {
//Read the checksum from the file
monitorChecksum := readChecksumFile(fmt.Sprintf("%s/%s", assignDir, cfile.Name()), ERROR)
//If we got something as a checksum (TODO: check it is an actual checksum)
if monitorChecksum != "" {
//Check if the monitor is already known
if isMonitorInSlice(monitorID, globalAssignChecksums) {
//Check if it must be updated
if monitorChecksum != globalAssignChecksums[posMonitorInSlice(monitorID, globalAssignChecksums)].Checksum {
//Update it
fmt.Println("Updating assignation list from monitor", monitorID)
monitorDevsAssign := readDevsaddsFile(fmt.Sprintf("%s/%s", assignDir, jfile.Name()), ERROR)
//Iterate all the assigned devices
for _, v := range monitorDevsAssign {
//Add them to the global list
globalAssign = assignDeviceMonitor(globalAssign, v.ID, monitorID)
}
//Save the new checksum
globalAssignChecksums[posMonitorInSlice(monitorID, globalAssignChecksums)].Checksum = monitorChecksum
}
} else {
//Add new monitor to the list
fmt.Println("Importing assignation list from monitor", monitorID)
thisChecksum := cnml.AssignChecksum{MonitorID: monitorID, Checksum: monitorChecksum}
globalAssignChecksums = append(globalAssignChecksums, thisChecksum)
monitorDevsAssign := readDevsaddsFile(fmt.Sprintf("%s/%s", assignDir, jfile.Name()), ERROR)
//Iterate all the assigned devices
for _, v := range monitorDevsAssign {
//Add them to the global list
globalAssign = assignDeviceMonitor(globalAssign, v.ID, monitorID)
}
}
}
}
}
}
// monitorID := strings.Replace(v.Name(), ".json", "", -1)
// if v.Name() != fmt.Sprintf("%s.json", id) {
// fmt.Println("Importing assignation file", v.Name())
// }
//
// monitorID := strings.Replace(v.Name(), ".json", "", -1)
//
// //Import all json files except the current instance's one
// if monitorID != ID {
// fmt.Println("Adding assignation list from monitor", monitorID, "to global assignation list")
// adevsadds := readDevsaddsFile(fmt.Sprintf("%s/%s", assignDir, v.Name()), ERROR)
// for _, w := range adevsadds {
// globalAssign = assignDeviceMonitor(globalAssign, w.Id, monitorID)
// }
// }
}
}
//Import all the monitor=>device assignations
func updateGlobalAssignationList() {
......@@ -537,7 +379,6 @@ func reassignDevs() {
for _, v := range localAssign {
apidote.AntidoteAddItemToSetInBucket(fmt.Sprintf("device-%d", v.ID), "monitors", ID)
}
exportDevsaddrsFile(fmt.Sprintf("%s/%s.json", assignDir, ID))
}
}
......@@ -631,7 +472,8 @@ func startAllDevsChecksumTimer() {
allDevsChecksumTicker := time.NewTicker(time.Duration(devsCheckInterval) * time.Second)
go func() {
for range allDevsChecksumTicker.C {
checkAllDevsChecksum()
// TODO: global checksum via AntidoteDB must be implemented in fetch app
// checkAllDevsChecksum()
}
}()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment