Developing lightweight computation at the DSG edge

coder.go 3.59 KB
Newer Older
Mathias Weber's avatar
Mathias Weber committed
1
2
3
package antidoteclient

import (
Peter Zeller's avatar
Peter Zeller committed
4
5
6
	"encoding/binary"
	"fmt"
	"io"
Mathias Weber's avatar
Mathias Weber committed
7
8

	"github.com/golang/protobuf/proto"
Mathias Weber's avatar
Mathias Weber committed
9
10
11
)

// readMsgRaw reads one length-prefixed frame from reader.
//
// A frame is a 4-byte big-endian unsigned length followed by exactly that
// many payload bytes. The payload is returned unmodified; the decode
// functions in this file interpret its first byte as the message code.
//
// On any error the returned data is nil. Errors come straight from
// io.ReadFull: io.EOF when the stream ends before any byte of the header (or
// of a non-empty payload) was read, io.ErrUnexpectedEOF when it ends part-way
// through either, and the reader's own error otherwise. A frame whose length
// field is 0 yields an empty, non-nil slice and a nil error.
func readMsgRaw(reader io.Reader) (data []byte, err error) {
	// io.ReadFull keeps reading until the buffer is full and, unlike a bare
	// Read loop that bails out on the first non-nil error, it keeps bytes
	// that a reader hands back together with an error such as io.EOF.
	var header [4]byte
	if _, err = io.ReadFull(reader, header[:]); err != nil {
		return nil, err
	}

	size := binary.BigEndian.Uint32(header[:])
	data = make([]byte, size)
	if _, err = io.ReadFull(reader, data); err != nil {
		return nil, err
	}
	return data, nil
}

38
func (op *ApbReadObjects) encode(writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
39
	return encodeMsg(op, 116, writer)
40
41
42
}

func (op *ApbUpdateObjects) encode(writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
43
	return encodeMsg(op, 118, writer)
44
45
}

Mathias Weber's avatar
Mathias Weber committed
46
func (op *ApbStartTransaction) encode(writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
47
	return encodeMsg(op, 119, writer)
48
49
50
}

func (op *ApbAbortTransaction) encode(writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
51
	return encodeMsg(op, 120, writer)
52
53
54
}

func (op *ApbCommitTransaction) encode(writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
55
	return encodeMsg(op, 121, writer)
56
57
58
}

func (op *ApbStaticUpdateObjects) encode(writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
59
	return encodeMsg(op, 122, writer)
60
61
62
}

func (op *ApbStaticReadObjects) encode(writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
63
	return encodeMsg(op, 123, writer)
64
65
66
}

func encodeMsg(message proto.Message, msgCode byte, writer io.Writer) (err error) {
Peter Zeller's avatar
Peter Zeller committed
67
68
69
70
71
72
	msg, err := proto.Marshal(message)
	if err != nil {
		return
	}
	msgsize := len(msg)
	buf := make([]byte, 5)
Mathias Weber's avatar
Mathias Weber committed
73
	binary.BigEndian.PutUint32(buf[0:4], uint32(msgsize+1))
Mathias Weber's avatar
Mathias Weber committed
74
	buf[4] = msgCode
Peter Zeller's avatar
Peter Zeller committed
75
76
77
	writer.Write(buf)
	writer.Write(msg)
	return nil
Mathias Weber's avatar
Mathias Weber committed
78
79
}

80
func decodeOperationResp(reader io.Reader) (op *ApbOperationResp, err error) {
Peter Zeller's avatar
Peter Zeller committed
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
	data, err := readMsgRaw(reader)
	if err != nil {
		return
	}
	switch data[0] {
	case 111:
		// transaction response
		resp := &ApbOperationResp{}
		err = proto.Unmarshal(data[1:], resp)
		if err != nil {
			return
		}
		op = resp
		return
	}
	err = fmt.Errorf("invalid message code: %d", data[0])
	return
98
99
}

Mathias Weber's avatar
Mathias Weber committed
100
func decodeStartTransactionResp(reader io.Reader) (op *ApbStartTransactionResp, err error) {
Peter Zeller's avatar
Peter Zeller committed
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
	data, err := readMsgRaw(reader)
	if err != nil {
		return
	}
	switch data[0] {
	case 124:
		// transaction response
		resp := &ApbStartTransactionResp{}
		err = proto.Unmarshal(data[1:], resp)
		if err != nil {
			return
		}
		op = resp
		return
	}
	err = fmt.Errorf("invalid message code: %d", data[0])
	return
Mathias Weber's avatar
Mathias Weber committed
118
119
}

120
func decodeReadObjectsResp(reader io.Reader) (op *ApbReadObjectsResp, err error) {
Peter Zeller's avatar
Peter Zeller committed
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
	data, err := readMsgRaw(reader)
	if err != nil {
		return
	}
	switch data[0] {
	case 126:
		// transaction response
		resp := &ApbReadObjectsResp{}
		err = proto.Unmarshal(data[1:], resp)
		if err != nil {
			return
		}
		op = resp
		return
	}
	err = fmt.Errorf("invalid message code: %d", data[0])
	return
138
139
140
}

func decodeCommitResp(reader io.Reader) (op *ApbCommitResp, err error) {
Peter Zeller's avatar
Peter Zeller committed
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
	data, err := readMsgRaw(reader)
	if err != nil {
		return
	}
	switch data[0] {
	case 127:
		// transaction response
		resp := &ApbCommitResp{}
		err = proto.Unmarshal(data[1:], resp)
		if err != nil {
			return
		}
		op = resp
		return
	}
	err = fmt.Errorf("invalid message code: %d", data[0])
	return
158
159
160
}

func decodeStaticReadObjectsResp(reader io.Reader) (op *ApbStaticReadObjectsResp, err error) {
Peter Zeller's avatar
Peter Zeller committed
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
	data, err := readMsgRaw(reader)
	if err != nil {
		return
	}
	switch data[0] {
	case 128:
		// transaction response
		resp := &ApbStaticReadObjectsResp{}
		err = proto.Unmarshal(data[1:], resp)
		if err != nil {
			return
		}
		op = resp
		return
	}
	err = fmt.Errorf("invalid message code: %d", data[0])
	return
178
}