Developing lightweight computation at the DSG edge

Commit f2783624 authored by Roger Pueyo Centelles's avatar Roger Pueyo Centelles
Browse files

[goclidote] Gracefully handle AntidoteDB communication errors



The goclidote library now correctly handles communication errors with
AntidoteDB and avoids continuing a transaction if an error has
occurred.
Signed-off-by: Roger Pueyo Centelles's avatarRoger Pueyo Centelles <rpueyo@ac.upc.edu>
parent f4e5deda
......@@ -7,7 +7,7 @@ import (
"uc-monitor-go-test/cnml"
"github.com/AntidoteDB/antidote-go-client"
antidoteclient "github.com/AntidoteDB/antidote-go-client"
"github.com/golang/glog"
)
......@@ -38,28 +38,36 @@ func AntidoteAddItemToSetInBucket(bucketName string, setName string, itemValue s
errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to create a new AntidoteDB go client")
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(setName)
err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
setVal, err := bucket.ReadSet(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to start a transaction using the go client")
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(setName)
// Return true if the value belongs now to the set (either it has been added now, or was there already)
for _, v := range setVal {
if string(v) == itemValue {
return true
err = bucket.Update(tx, antidoteclient.SetAdd(key, []byte(itemValue)))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
if err == nil {
setVal, err := bucket.ReadSet(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteAddItemToSetInBucket]: Unable to commit transaction")
if err == nil {
// Return true if the value belongs now to the set (either it has been added now, or was there already)
for _, v := range setVal {
if string(v) == itemValue {
return true
}
}
}
}
}
}
}
// Return false otherwise
return false
}
......@@ -70,138 +78,170 @@ func AntidoteAddItemToSetInMapInBucket(bucketName string, mapName string, setNam
errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
mapKey := antidoteclient.Key(mapName)
setKey := antidoteclient.Key(setName)
err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
antidoteclient.SetAdd(setKey, []byte(itemValue))))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
mapVal, err := bucket.ReadMap(tx, mapKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
setVal, err := mapVal.Set(setKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to commit transaction")
// Return true if the value belongs now to the set (either it has been added now, or was there already)
for _, v := range setVal {
if string(v) == itemValue {
return true
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to start a transaction using the go client")
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
mapKey := antidoteclient.Key(mapName)
setKey := antidoteclient.Key(setName)
err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
antidoteclient.SetAdd(setKey, []byte(itemValue))))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", itemValue, setName, bucketName))
if err == nil {
mapVal, err := bucket.ReadMap(tx, mapKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
setVal, err := mapVal.Set(setKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddItemToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteAddItemToSetInMapInBucket]: Unable to commit transaction")
if err == nil {
// Return true if the value belongs now to the set (either it has been added now, or was there already)
for _, v := range setVal {
if string(v) == itemValue {
return true
}
}
}
}
}
}
}
}
// Return false otherwise
return false
}
// AntidoteAddArrayToSetInMapInBucket save an item to a set in a map in a bucket
func AntidoteAddArrayToSetInMapInBucket(bucketName string, mapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
// Return true if all values belong now to the set (either they have been added now, or were there already)
anyMissing := false
client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to create a new AntidoteDB go client")
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to start a transaction using the go client")
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
mapKey := antidoteclient.Key(mapName)
setKey := antidoteclient.Key(setName)
for _, v := range itemArray {
err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
antidoteclient.SetAdd(setKey, []byte(v))))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", v, setName, bucketName))
time.Sleep(time.Millisecond)
}
mapVal, err := bucket.ReadMap(tx, mapKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
setVal, err := mapVal.Set(setKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to commit transaction")
// Return true if all values belong now to the set (either they have been added now, or were there already)
anyMissing := false
for _, v := range itemArray {
thisFound := false
for _, w := range setVal {
if v == string(w) {
thisFound = true
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to start a transaction using the go client")
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
mapKey := antidoteclient.Key(mapName)
setKey := antidoteclient.Key(setName)
for _, v := range itemArray {
err = bucket.Update(tx, antidoteclient.MapUpdate(mapKey,
antidoteclient.SetAdd(setKey, []byte(v))))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to add value %s to set %s in bucket %s", v, setName, bucketName))
if err == nil {
time.Sleep(time.Millisecond)
}
}
if !thisFound {
anyMissing = true
break
mapVal, err := bucket.ReadMap(tx, mapKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
setVal, err := mapVal.Set(setKey)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteAddArrayToSetInMapInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteAddArrayToSetInMapInBucket]: Unable to commit transaction")
if err == nil {
for _, v := range itemArray {
thisFound := false
for _, w := range setVal {
if v == string(w) {
thisFound = true
}
if !thisFound {
anyMissing = true
break
}
}
}
}
}
}
}
}
// Return if all itemswere found
return !anyMissing
}
// AntidoteAddArrayToSetInMapInMapInBucket save an item to a set in a map in a map in a bucket
func AntidoteAddArrayToSetInMapInMapInBucket(bucketName string, outerMapName string, innerMapName string, setName string, itemArray []string, DatabaseHost string, DatabasePort int) bool {
// Return true if all values belong now to the set (either they have been added now, or were there already)
anyMissing := false
funcName := "AntidoteAddArrayToSetInMapInMapInBucket"
client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
errCheck(err, ERROR, "["+funcName+"]: Unable to create a new AntidoteDB go client")
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, "["+funcName+"]: Unable to start a transaction using the go client")
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
outerMapKey := antidoteclient.Key(outerMapName)
innerMapKey := antidoteclient.Key(innerMapName)
setKey := antidoteclient.Key(setName)
for _, v := range itemArray {
err = bucket.Update(tx, antidoteclient.MapUpdate(outerMapKey,
antidoteclient.MapUpdate(innerMapKey,
antidoteclient.SetAdd(setKey, []byte(v)))))
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to add value %s to set %s in map %s in map %s in bucket %s", v, setName, innerMapName, outerMapName, bucketName))
time.Sleep(time.Millisecond)
}
outerMapVal, err := bucket.ReadMap(tx, outerMapKey)
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in bucket %s", outerMapName, bucketName))
innerMapVal, err := outerMapVal.Map(innerMapKey)
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in map %s in bucket %s", innerMapName, outerMapName, bucketName))
setVal, err := innerMapVal.Set(setKey)
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read set %s in bucket %s", setName, bucketName))
err = tx.Commit()
errCheck(err, ERROR, "["+funcName+"]: Unable to commit transaction")
// Return true if all values belong now to the set (either they have been added now, or were there already)
anyMissing := false
for _, v := range itemArray {
thisFound := false
for _, w := range setVal {
if v == string(w) {
thisFound = true
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, "["+funcName+"]: Unable to start a transaction using the go client")
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
outerMapKey := antidoteclient.Key(outerMapName)
innerMapKey := antidoteclient.Key(innerMapName)
setKey := antidoteclient.Key(setName)
for _, v := range itemArray {
err = bucket.Update(tx, antidoteclient.MapUpdate(outerMapKey,
antidoteclient.MapUpdate(innerMapKey,
antidoteclient.SetAdd(setKey, []byte(v)))))
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to add value %s to set %s in map %s in map %s in bucket %s", v, setName, innerMapName, outerMapName, bucketName))
if err == nil {
time.Sleep(time.Millisecond)
}
}
if !thisFound {
anyMissing = true
break
outerMapVal, err := bucket.ReadMap(tx, outerMapKey)
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in bucket %s", outerMapName, bucketName))
if err == nil {
innerMapVal, err := outerMapVal.Map(innerMapKey)
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read map %s in map %s in bucket %s", innerMapName, outerMapName, bucketName))
if err == nil {
setVal, err := innerMapVal.Set(setKey)
errCheck(err, ERROR, fmt.Sprintf("["+funcName+"]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "["+funcName+"]: Unable to commit transaction")
if err == nil {
for _, v := range itemArray {
thisFound := false
for _, w := range setVal {
if v == string(w) {
thisFound = true
}
if !thisFound {
anyMissing = true
break
}
}
}
}
}
}
}
}
}
// Return if all itemswere found
return !anyMissing
}
......@@ -209,22 +249,29 @@ func AntidoteAddArrayToSetInMapInMapInBucket(bucketName string, outerMapName str
// AntidoteReadRegisterInBucket read a LWWW register in a bucket
func AntidoteReadRegisterInBucket(bucketName string, registerName string, DatabaseHost string, DatabasePort int) string {
var setVal []byte
client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to create a new AntidoteDB go client")
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, DatabaseHost, DatabasePort))
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(registerName)
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, DatabaseHost, DatabasePort))
setVal, err := bucket.ReadReg(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(registerName)
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
if err == nil {
setVal, err = bucket.ReadReg(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteReadRegisterInBucket]: Unable to commit transaction")
}
}
}
return string(setVal[:])
}
......@@ -233,29 +280,37 @@ func AntidoteSetRegisterInBucket(bucketName string, registerName string, itemVal
client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to create a new AntidoteDB go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to start a transaction using the go client. bucketName: %s, registerName %s, itemValue %s, DatabaseHost %s, DatabasePort %d\n", bucketName, registerName, itemValue, DatabaseHost, DatabasePort))
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(registerName)
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(registerName)
err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
if err == nil {
err = bucket.Update(tx, antidoteclient.RegPut(key, []byte(itemValue)))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to set value %s to register %s in bucket %s", itemValue, registerName, bucketName))
setVal, err := bucket.ReadReg(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
if err == nil {
setVal, err := bucket.ReadReg(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteSetRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteSetRegisterInBucket]: Unable to commit transaction")
// Return true if the register value is itemValue
if string(setVal[:]) == itemValue {
return true
// Return true if the register value is itemValue
if string(setVal[:]) == itemValue {
return true
}
}
}
}
}
}
// Return false otherwise
return false
}
......@@ -267,26 +322,36 @@ func AntidoteRemoveRegisterInBucket(bucketName string, registerName string, Data
errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to create a new AntidoteDB go client")
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(registerName)
err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
regVal, err := bucket.ReadReg(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
// Return true if the register value is null (0 length)
if len(regVal) == 0 {
return true
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to start a transaction using the go client")
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(registerName)
if err == nil {
err = bucket.Update(tx, antidoteclient.RegPut(key, []byte("")))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to set a null value to register %s in bucket %s", registerName, bucketName))
if err == nil {
regVal, err := bucket.ReadReg(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveRegisterInBucket]: Unable to read register %s in bucket %s", registerName, bucketName))
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteRemoveRegisterInBucket]: Unable to commit transaction")
if err == nil {
// Return true if the register value is null (0 length)
if len(regVal) == 0 {
return true
}
}
}
}
}
}
}
// Return false otherwise
return false
}
......@@ -300,56 +365,69 @@ func AntidoteReadItemsFromSetInBucket(bucketName string, setName string, Databas
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to create a new AntidoteDB go client. bucketName %s, setName %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client. bucketName: %s, setName: %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to start a transaction using the go client. bucketName: %s, setName: %s, DatabaseHost %s, DatabasePort %d\n", bucketName, setName, DatabaseHost, DatabasePort))
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(setName)
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(setName)
setVal, err := bucket.ReadSet(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
setVal, err := bucket.ReadSet(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteReadItemsFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteReadItemsFromSetInBucket]: Unable to commit transaction")
for _, v := range setVal {
items = append(items, string(v))
for _, v := range setVal {
items = append(items, string(v))
}
}
}
}
return items
}
// AntidoteRemoveItemFromSetInBucket removes an item from a set in a bucket
func AntidoteRemoveItemFromSetInBucket(bucketName string, setName string, itemValue string, DatabaseHost string, DatabasePort int) bool {
itemNotInSet := true
client, err := antidoteclient.NewClient(antidoteclient.Host{Name: DatabaseHost, Port: DatabasePort})
errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to create a new AntidoteDB go client")
defer client.Close()
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(setName)
err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
setVal, err := bucket.ReadSet(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
itemNotInSet := true
// Return false if the item remains in the set
for _, v := range setVal {
if string(v[:]) == itemValue {
itemNotInSet = false
if err == nil {
tx, err := client.StartTransaction()
errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to start a transaction using the go client")
if err == nil {
bucket := antidoteclient.Bucket{Bucket: []byte(bucketName)}
key := antidoteclient.Key(setName)
if err == nil {
err = bucket.Update(tx, antidoteclient.SetRemove(key, []byte(itemValue)))
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to remove item %s from set %s in bucket %s", itemValue, setName, bucketName))
if err == nil {
setVal, err := bucket.ReadSet(tx, key)
errCheck(err, ERROR, fmt.Sprintf("[AntidoteRemoveItemFromSetInBucket]: Unable to read set %s in bucket %s", setName, bucketName))
if err == nil {
err = tx.Commit()
errCheck(err, ERROR, "[AntidoteRemoveItemFromSetInBucket]: Unable to commit transaction")
// Return false if the item remains in the set
for _, v := range setVal {
if string(v[:]) == itemValue {
itemNotInSet = false
}
}
}
}
}
}
}
// Return true otherwise (even if it had never been there)
return itemNotInSet
}
......@@ -363,19 +441,20 @@ func ReadDevicesFromAntidote(DatabaseHost string, DatabasePort int) []cnml.Devic
thisDevID, err := strconv.Atoi(v)
errCheck(err, WARNING, fmt.Sprintf("[ReadDevicesFromAntidote]: An error occurred when converting %s to int", v))
//fmt.Printf("%s, ",v)
var thisDev cnml.DeviceIpv4sGraphserver
thisDev.ID = thisDevID
thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
if err != nil {
thisDev.GraphServer = 0
if err == nil {
var thisDev cnml.DeviceIpv4sGraphserver
thisDev.ID = thisDevID
thisDev.GraphServer, err = strconv.Atoi(AntidoteReadRegisterInBucket(fmt.Sprintf("device-%d", thisDevID), "graphserver", DatabaseHost, DatabasePort))
if err != nil {
thisDev.GraphServer = 0
}
time.Sleep(time.Millisecond)