// Package ping is an ICMP ping library seeking to emulate the unix "ping"
// command.
//
// Here is a very simple example that sends & receives 3 packets:
//
//	pinger, err := ping.NewPinger("www.google.com")
//	if err != nil {
//		panic(err)
//	}
//
//	pinger.Count = 3
//	pinger.Run() // blocks until finished
//	stats := pinger.Statistics() // get send/receive/rtt stats
//
// Here is an example that emulates the unix ping command:
//
//	pinger, err := ping.NewPinger("www.google.com")
//	if err != nil {
//		fmt.Printf("ERROR: %s\n", err.Error())
//		return
//	}
//
//	pinger.OnRecv = func(pkt *ping.Packet) {
//		fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v\n",
//			pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt)
//	}
//	pinger.OnFinish = func(stats *ping.Statistics) {
//		fmt.Printf("\n--- %s ping statistics ---\n", stats.Addr)
//		fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n",
//			stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
//		fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
//			stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
//	}
//
//	fmt.Printf("PING %s (%s):\n", pinger.Addr(), pinger.IPAddr())
//	pinger.Run()
//
// It sends ICMP packet(s) and waits for a response. If it receives a response,
// it calls the "receive" callback. When it's finished, it calls the "finish"
// callback.
//
// For a full ping example, see "cmd/ping/ping.go".
//
package ping

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
	"math/rand"
	"net"
	"sync"
	"syscall"
	"time"

	"golang.org/x/net/icmp"
	"golang.org/x/net/ipv4"
	"golang.org/x/net/ipv6"
)

const (
	// timeSliceLength is the number of leading payload bytes that hold the
	// send timestamp (see timeToBytes / bytesToTime).
	timeSliceLength = 8
	// trackerLength is the number of payload bytes, directly after the
	// timestamp, that hold the Pinger's Tracker value.
	trackerLength = 8
	// protocolICMP is the IANA protocol number for ICMPv4, passed to
	// icmp.ParseMessage.
	protocolICMP = 1
	// protocolIPv6ICMP is the IANA protocol number for ICMPv6, passed to
	// icmp.ParseMessage.
	protocolIPv6ICMP = 58
)

// ipv4Proto and ipv6Proto translate the Pinger's network mode ("ip" for
// privileged raw ICMP, "udp" for unprivileged datagram ICMP) into the network
// name handed to icmp.ListenPacket for the respective address family.
var (
	ipv4Proto = map[string]string{
		"ip":  "ip4:icmp",
		"udp": "udp4",
	}
	ipv6Proto = map[string]string{
		"ip":  "ip6:ipv6-icmp",
		"udp": "udp6",
	}
)

// NewPinger returns a new Pinger struct pointer
func NewPinger(addr string) (*Pinger, error) {
	ipaddr, err := net.ResolveIPAddr("ip", addr)
	if err != nil {
		return nil, err
	}

	var ipv4 bool
	if isIPv4(ipaddr.IP) {
		ipv4 = true
	} else if isIPv6(ipaddr.IP) {
		ipv4 = false
	}

88
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
89
90
91
92
93
94
	return &Pinger{
		ipaddr:   ipaddr,
		addr:     addr,
		Interval: time.Second,
		Timeout:  time.Second * 100000,
		Count:    -1,
95
		id:       r.Intn(math.MaxInt16),
96
97
		network:  "udp",
		ipv4:     ipv4,
98
		Size:     timeSliceLength,
99
		Tracker:  r.Int63n(math.MaxInt64),
100
		done:     make(chan bool),
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
	}, nil
}

// Pinger represents ICMP packet sender/receiver
type Pinger struct {
	// Interval is the wait time between each packet send. Default is 1s.
	Interval time.Duration

	// Timeout specifies a timeout before ping exits, regardless of how many
	// packets have been received.
	Timeout time.Duration

	// Count tells pinger to stop after sending (and receiving) Count echo
	// packets. If this option is not specified, pinger will operate until
	// interrupted.
	Count int

	// Debug runs in debug mode
	Debug bool

	// Number of packets sent
	PacketsSent int

	// Number of packets received
	PacketsRecv int

	// rtts is all of the Rtts
	rtts []time.Duration

130
131
132
	// ttls is all of the Ttls
	ttls []int

133
134
135
136
137
138
	// OnRecv is called when Pinger receives and processes a packet
	OnRecv func(*Packet)

	// OnFinish is called when Pinger exits
	OnFinish func(*Statistics)

139
140
141
	// Size of packet being sent
	Size int

142
143
144
	// Tracker: Used to uniquely identify packet when non-priviledged
	Tracker int64

145
146
147
	// Source is the source IP address
	Source string

148
149
150
151
152
153
	// stop chan bool
	done chan bool

	ipaddr *net.IPAddr
	addr   string

154
155
156
	ipv4     bool
	size     int
	id       int
157
158
159
160
161
162
163
	sequence int
	network  string
}

// packet is a raw datagram handed from the receive loop to processPacket.
type packet struct {
	// bytes is the read buffer; only the first nbytes bytes are meaningful.
	bytes []byte
	// nbytes is the number of bytes read into bytes.
	nbytes int
	// ttl is the TTL (IPv4) or hop limit (IPv6) taken from the control
	// message, or 0 when no control message was returned.
	ttl int
}

// Packet represents a received and processed ICMP echo packet.
type Packet struct {
	// Rtt is the round-trip time it took to ping.
	Rtt time.Duration

	// IPAddr is the address of the host being pinged.
	IPAddr *net.IPAddr

	// Addr is the string address of the host being pinged.
	Addr string

	// Nbytes is the number of bytes in the message.
	Nbytes int

	// Seq is the ICMP sequence number.
	Seq int

	// TTL is the Time To Live on the packet.
	TTL int
}

// Statistics represent the stats of a currently running or finished
// pinger operation.
type Statistics struct {
	// PacketsRecv is the number of packets received.
	PacketsRecv int

	// PacketsSent is the number of packets sent.
	PacketsSent int

	// PacketLoss is the percentage of packets lost.
	PacketLoss float64

	// IPAddr is the address of the host being pinged.
	IPAddr *net.IPAddr

	// Addr is the string address of the host being pinged.
	Addr string

	// Rtts is all of the round-trip times sent via this pinger.
	Rtts []time.Duration

	// MinRtt is the minimum round-trip time sent via this pinger.
	MinRtt time.Duration

	// MaxRtt is the maximum round-trip time sent via this pinger.
	MaxRtt time.Duration

	// AvgRtt is the average round-trip time sent via this pinger.
	AvgRtt time.Duration

	// StdDevRtt is the standard deviation of the round-trip times sent via
	// this pinger.
	StdDevRtt time.Duration

	// Ttls is all of the TTLs received via this pinger.
	Ttls []int
}

// SetIPAddr sets the ip address of the target host.
func (p *Pinger) SetIPAddr(ipaddr *net.IPAddr) {
Cameron Sparr's avatar
Cameron Sparr committed
228
229
230
231
232
233
234
	var ipv4 bool
	if isIPv4(ipaddr.IP) {
		ipv4 = true
	} else if isIPv6(ipaddr.IP) {
		ipv4 = false
	}

235
236
	p.ipaddr = ipaddr
	p.addr = ipaddr.String()
Cameron Sparr's avatar
Cameron Sparr committed
237
	p.ipv4 = ipv4
238
239
240
241
242
243
244
245
246
247
}

// IPAddr returns the resolved IP address of the target host, as last set by
// NewPinger, SetAddr or SetIPAddr.
func (p *Pinger) IPAddr() *net.IPAddr {
	return p.ipaddr
}

// SetAddr resolves and sets the ip address of the target host, addr can be a
// DNS name like "www.google.com" or IP like "127.0.0.1".
func (p *Pinger) SetAddr(addr string) error {
Cameron Sparr's avatar
Cameron Sparr committed
248
	ipaddr, err := net.ResolveIPAddr("ip", addr)
249
250
251
	if err != nil {
		return err
	}
Cameron Sparr's avatar
Cameron Sparr committed
252
253

	p.SetIPAddr(ipaddr)
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
	p.addr = addr
	return nil
}

// Addr returns the string address of the target host: the value given to
// NewPinger or SetAddr (which may be a host name), or the textual IP after a
// call to SetIPAddr.
func (p *Pinger) Addr() string {
	return p.addr
}

// SetPrivileged selects the kind of ping the pinger sends.
// Passing false selects an "unprivileged" UDP ping; passing true selects a
// "privileged" raw ICMP ping.
// NOTE: privileged mode requires the program to run with super-user
// privileges.
func (p *Pinger) SetPrivileged(privileged bool) {
	network := "udp"
	if privileged {
		network = "ip"
	}
	p.network = network
}

// Privileged reports whether the pinger is configured for privileged (raw
// ICMP) mode, i.e. whether SetPrivileged(true) was the last mode selected.
func (p *Pinger) Privileged() bool {
	return p.network == "ip"
}

// Run runs the pinger. This is a blocking function that returns when the
// pinger is done: once Count replies have been received (when Count > 0),
// when Timeout elapses, or when the pinger is stopped. With no Count set it
// keeps pinging until one of the latter two happens.
func (p *Pinger) Run() {
	p.run()
}

func (p *Pinger) run() {
	var conn *icmp.PacketConn
	if p.ipv4 {
290
		if conn = p.listen(ipv4Proto[p.network]); conn == nil {
291
292
			return
		}
Clint Armstrong's avatar
Clint Armstrong committed
293
		conn.IPv4PacketConn().SetControlMessage(ipv4.FlagTTL, true)
294
	} else {
295
		if conn = p.listen(ipv6Proto[p.network]); conn == nil {
296
297
			return
		}
Clint Armstrong's avatar
Clint Armstrong committed
298
		conn.IPv6PacketConn().SetControlMessage(ipv6.FlagHopLimit, true)
299
300
301
302
303
304
	}
	defer conn.Close()
	defer p.finish()

	var wg sync.WaitGroup
	recv := make(chan *packet, 5)
Ben Kochie's avatar
Ben Kochie committed
305
	defer close(recv)
306
307
308
309
310
311
312
313
314
	wg.Add(1)
	go p.recvICMP(conn, recv, &wg)

	err := p.sendICMP(conn)
	if err != nil {
		fmt.Println(err.Error())
	}

	timeout := time.NewTicker(p.Timeout)
Ben Kochie's avatar
Ben Kochie committed
315
	defer timeout.Stop()
316
	interval := time.NewTicker(p.Interval)
Ben Kochie's avatar
Ben Kochie committed
317
	defer interval.Stop()
318
319
320
321
322
323
324
325
326
327
328

	for {
		select {
		case <-p.done:
			wg.Wait()
			return
		case <-timeout.C:
			close(p.done)
			wg.Wait()
			return
		case <-interval.C:
Cameron Sparr's avatar
Cameron Sparr committed
329
330
331
			if p.Count > 0 && p.PacketsSent >= p.Count {
				continue
			}
332
333
334
335
336
337
338
339
340
			err = p.sendICMP(conn)
			if err != nil {
				fmt.Println("FATAL: ", err.Error())
			}
		case r := <-recv:
			err := p.processPacket(r)
			if err != nil {
				fmt.Println("FATAL: ", err.Error())
			}
Ben Kochie's avatar
Ben Kochie committed
341
342
343
344
345
		}
		if p.Count > 0 && p.PacketsRecv >= p.Count {
			close(p.done)
			wg.Wait()
			return
346
347
348
349
		}
	}
}

// Stop tells a running pinger to exit by closing its done channel, which the
// send and receive loops watch.
//
// Calling Stop on a pinger that has already stopped (because Stop was called
// before, or because it finished on its own) is a no-op rather than a
// "close of closed channel" panic. The check is not synchronized, so Stop
// must not be called from several goroutines at the same time.
func (p *Pinger) Stop() {
	select {
	case <-p.done:
		// Already closed; nothing to do.
	default:
		close(p.done)
	}
}

// finish hands a snapshot of the current statistics to the OnFinish callback,
// if one is registered; otherwise it does nothing.
func (p *Pinger) finish() {
	if onFinish := p.OnFinish; onFinish != nil {
		onFinish(p.Statistics())
	}
}

// Statistics returns the statistics of the pinger. This can be run while the
// pinger is running or after it is finished. OnFinish calls this function to
// get it's finished statistics.
func (p *Pinger) Statistics() *Statistics {
	loss := float64(p.PacketsSent-p.PacketsRecv) / float64(p.PacketsSent) * 100
	var min, max, total time.Duration
	if len(p.rtts) > 0 {
		min = p.rtts[0]
		max = p.rtts[0]
	}
	for _, rtt := range p.rtts {
		if rtt < min {
			min = rtt
		}
		if rtt > max {
			max = rtt
		}
		total += rtt
	}
	s := Statistics{
		PacketsSent: p.PacketsSent,
		PacketsRecv: p.PacketsRecv,
		PacketLoss:  loss,
		Rtts:        p.rtts,
		Addr:        p.addr,
		IPAddr:      p.ipaddr,
		MaxRtt:      max,
		MinRtt:      min,
390
		Ttls:        p.ttls,
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
	}
	if len(p.rtts) > 0 {
		s.AvgRtt = total / time.Duration(len(p.rtts))
		var sumsquares time.Duration
		for _, rtt := range p.rtts {
			sumsquares += (rtt - s.AvgRtt) * (rtt - s.AvgRtt)
		}
		s.StdDevRtt = time.Duration(math.Sqrt(
			float64(sumsquares / time.Duration(len(p.rtts)))))
	}
	return &s
}

func (p *Pinger) recvICMP(
	conn *icmp.PacketConn,
	recv chan<- *packet,
	wg *sync.WaitGroup,
) {
	defer wg.Done()
	for {
		select {
		case <-p.done:
			return
		default:
			bytes := make([]byte, 512)
			conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
Clint Armstrong's avatar
Clint Armstrong committed
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
			var n, ttl int
			var err error
			if p.ipv4 {
				var cm *ipv4.ControlMessage
				n, cm, _, err = conn.IPv4PacketConn().ReadFrom(bytes)
				if cm != nil {
					ttl = cm.TTL
				}
			} else {
				var cm *ipv6.ControlMessage
				n, cm, _, err = conn.IPv6PacketConn().ReadFrom(bytes)
				if cm != nil {
					ttl = cm.HopLimit
				}
			}
432
433
434
435
436
437
438
439
440
441
442
443
			if err != nil {
				if neterr, ok := err.(*net.OpError); ok {
					if neterr.Timeout() {
						// Read timeout
						continue
					} else {
						close(p.done)
						return
					}
				}
			}

Clint Armstrong's avatar
Clint Armstrong committed
444
			recv <- &packet{bytes: bytes, nbytes: n, ttl: ttl}
445
446
447
448
449
		}
	}
}

func (p *Pinger) processPacket(recv *packet) error {
450
	receivedAt := time.Now()
451
452
453
454
	var bytes []byte
	var proto int
	if p.ipv4 {
		if p.network == "ip" {
Alirie Gray's avatar
Alirie Gray committed
455
			bytes = ipv4Payload(recv)
456
457
458
459
460
461
462
463
464
465
466
467
		} else {
			bytes = recv.bytes
		}
		proto = protocolICMP
	} else {
		bytes = recv.bytes
		proto = protocolIPv6ICMP
	}

	var m *icmp.Message
	var err error
	if m, err = icmp.ParseMessage(proto, bytes[:recv.nbytes]); err != nil {
468
		return fmt.Errorf("error parsing icmp message: %s", err.Error())
469
470
471
472
473
474
475
476
477
478
	}

	if m.Type != ipv4.ICMPTypeEchoReply && m.Type != ipv6.ICMPTypeEchoReply {
		// Not an echo reply, ignore it
		return nil
	}

	outPkt := &Packet{
		Nbytes: recv.nbytes,
		IPAddr: p.ipaddr,
479
		Addr:   p.addr,
Roger Pueyo Centelles's avatar
Roger Pueyo Centelles committed
480
		TTL:    recv.ttl,
481
482
483
484
	}

	switch pkt := m.Body.(type) {
	case *icmp.Echo:
485
486
487
488
489
490
491
492
493
494
495
496

		// If we are priviledged, we can match icmp.ID
		if p.network == "ip" {
			// Check if reply from same ID
			if pkt.ID != p.id {
				return nil
			}
		}

		if len(pkt.Data) < timeSliceLength+trackerLength {
			return fmt.Errorf("insufficient data received; got: %d %v",
				len(pkt.Data), pkt.Data)
497
		}
498
499
500
501
502
503
504
505
506

		tracker := bytesToInt(pkt.Data[timeSliceLength:])
		timestamp := bytesToTime(pkt.Data[:timeSliceLength])

		if tracker != p.Tracker {
			return nil
		}

		outPkt.Rtt = receivedAt.Sub(timestamp)
507
		outPkt.Seq = pkt.Seq
508
		p.PacketsRecv++
509
510
	default:
		// Very bad, not sure how this can happen
511
		return fmt.Errorf("invalid ICMP echo reply; type: '%T', '%v'", pkt, pkt)
512
513
514
	}

	p.rtts = append(p.rtts, outPkt.Rtt)
515
	p.ttls = append(p.ttls, outPkt.TTL)
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
	handler := p.OnRecv
	if handler != nil {
		handler(outPkt)
	}

	return nil
}

func (p *Pinger) sendICMP(conn *icmp.PacketConn) error {
	var typ icmp.Type
	if p.ipv4 {
		typ = ipv4.ICMPTypeEcho
	} else {
		typ = ipv6.ICMPTypeEchoRequest
	}

	var dst net.Addr = p.ipaddr
	if p.network == "udp" {
		dst = &net.UDPAddr{IP: p.ipaddr.IP, Zone: p.ipaddr.Zone}
	}

537
538
539
	t := append(timeToBytes(time.Now()), intToBytes(p.Tracker)...)
	if remainSize := p.Size - timeSliceLength - trackerLength; remainSize > 0 {
		t = append(t, bytes.Repeat([]byte{1}, remainSize)...)
540
	}
541
542
543
544

	body := &icmp.Echo{
		ID:   p.id,
		Seq:  p.sequence,
545
		Data: t,
546
	}
547

548
549
550
551
552
	msg := &icmp.Message{
		Type: typ,
		Code: 0,
		Body: body,
	}
553
554

	msgBytes, err := msg.Marshal(nil)
555
556
557
558
559
	if err != nil {
		return err
	}

	for {
560
		if _, err := conn.WriteTo(msgBytes, dst); err != nil {
561
562
563
564
565
566
			if neterr, ok := err.(*net.OpError); ok {
				if neterr.Err == syscall.ENOBUFS {
					continue
				}
			}
		}
567
568
		p.PacketsSent++
		p.sequence++
569
570
		break
	}
571

572
573
574
	return nil
}

// listen opens an ICMP packet connection for netProto, bound to the pinger's
// Source address. On failure it prints the error, closes the done channel and
// returns nil.
func (p *Pinger) listen(netProto string) *icmp.PacketConn {
	conn, err := icmp.ListenPacket(netProto, p.Source)
	if err != nil {
		fmt.Printf("Error listening for ICMP packets: %s\n", err.Error())
		close(p.done)
		return nil
	}
	return conn
}

func ipv4Payload(recv *packet) []byte {
	b := recv.bytes
587
588
589
590
	if len(b) < ipv4.HeaderLen {
		return b
	}
	hdrlen := int(b[0]&0x0f) << 2
Alirie Gray's avatar
Alirie Gray committed
591
	recv.nbytes -= hdrlen
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
	return b[hdrlen:]
}

// bytesToTime decodes the first eight bytes of b as a big-endian count of
// nanoseconds since the Unix epoch and returns the corresponding time. It
// panics if b holds fewer than eight bytes.
func bytesToTime(b []byte) time.Time {
	var nanos int64
	for i := 0; i < 8; i++ {
		// Shift in one byte at a time, most significant first.
		nanos = nanos<<8 | int64(b[i])
	}
	const perSecond = int64(time.Second)
	return time.Unix(nanos/perSecond, nanos%perSecond)
}

// isIPv4 reports whether ip can be expressed as a 4-byte IPv4 address, which
// includes 16-byte IPv4-mapped addresses.
func isIPv4(ip net.IP) bool {
	// To4 yields either nil or a slice of exactly net.IPv4len bytes.
	return ip.To4() != nil
}

// isIPv6 reports whether ip is stored in 16-byte form. This is a length check
// only, so a 16-byte IPv4-mapped address satisfies it as well; the callers in
// this file therefore test isIPv4 first.
func isIPv6(ip net.IP) bool {
	return len(ip) == net.IPv6len
}

// timeToBytes encodes t as eight bytes holding its Unix time in nanoseconds,
// most significant byte first.
func timeToBytes(t time.Time) []byte {
	nanos := t.UnixNano()
	out := make([]byte, 8)
	// Fill from the least significant byte backwards.
	for i := len(out) - 1; i >= 0; i-- {
		out[i] = byte(nanos)
		nanos >>= 8
	}
	return out
}

// bytesToInt decodes the first eight bytes of b as a big-endian 64-bit value
// and returns it as a signed integer. It panics if b holds fewer than eight
// bytes.
func bytesToInt(b []byte) int64 {
	raw := binary.BigEndian.Uint64(b)
	return int64(raw)
}

// intToBytes encodes tracker as eight bytes in big-endian order.
func intToBytes(tracker int64) []byte {
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], uint64(tracker))
	return encoded[:]
}