From e33e57684fcd364900a896aa3e759bf4821c02e1 Mon Sep 17 00:00:00 2001
From: Zsolt Felfoldi <zsfelfoldi@gmail.com>
Date: Sat, 12 Nov 2016 21:02:02 +0100
Subject: [PATCH] p2p/discv5: fix bootnode connection issues

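Rework the discv5 refresh and topic search logic so that connecting
through the bootnodes works more reliably:

- refresh a random bucket periodically (bucketRefreshInterval) from the
  main event loop instead of relying on the exported BucketFill call,
  using the new chooseBucketRefreshTarget to pick targets
- queue topic search requests that arrive while a table refresh is in
  progress and replay them once the refresh has finished
- update the recorded endpoint of known nodes from incoming packets
- never ping a node that already has a ping pending, never ping
  ourselves, and never add our own node to the table
- send topic queries only to nodes in the known state (unknown nodes
  are pinged instead) and skip registering queries that were not sent
- log the LES topic being registered and searched for
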
---
 les/handler.go       |  5 ++-
 p2p/discv5/net.go    | 90 ++++++++++++++++++++++++++------------------
 p2p/discv5/table.go  | 76 +++++++++++++++++++++++--------------
 p2p/discv5/ticket.go |  4 +-
 p2p/discv5/udp.go    |  4 ++
 5 files changed, 110 insertions(+), 69 deletions(-)

diff --git a/les/handler.go b/les/handler.go
index a51358676..83d73666f 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -240,6 +240,7 @@ func (pm *ProtocolManager) findServers() {
 	if pm.p2pServer == nil || pm.topicDisc == nil {
 		return
 	}
+	glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
 	enodes := make(chan string, 100)
 	stop := make(chan struct{})
 	go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
@@ -280,9 +281,9 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
 	} else {
 		if pm.topicDisc != nil {
 			go func() {
-				glog.V(logger.Debug).Infoln("Starting topic register")
+				glog.V(logger.Debug).Infoln("Starting topic registration for", string(pm.lesTopic))
 				pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
-				glog.V(logger.Debug).Infoln("Stopped topic register")
+				glog.V(logger.Debug).Infoln("Stopped topic registration for", string(pm.lesTopic))
 			}()
 		}
 		go func() {
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go
index b08cd2bc7..5aedb39a0 100644
--- a/p2p/discv5/net.go
+++ b/p2p/discv5/net.go
@@ -41,9 +41,10 @@ var (
 )
 
 const (
-	autoRefreshInterval = 1 * time.Hour
-	seedCount           = 30
-	seedMaxAge          = 5 * 24 * time.Hour
+	autoRefreshInterval   = 1 * time.Hour
+	bucketRefreshInterval = 1 * time.Minute
+	seedCount             = 30
+	seedMaxAge            = 5 * 24 * time.Hour
 )
 
 const testTopic = "foo"
@@ -82,7 +83,6 @@ type Network struct {
 	tableOpResp      chan struct{}
 	topicRegisterReq chan topicRegisterReq
 	topicSearchReq   chan topicSearchReq
-	bucketFillChn    chan chan struct{}
 
 	// State of the main loop.
 	tab           *Table
@@ -169,7 +169,6 @@ func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, d
 		queryReq:         make(chan *findnodeQuery),
 		topicRegisterReq: make(chan topicRegisterReq),
 		topicSearchReq:   make(chan topicSearchReq),
-		bucketFillChn:    make(chan chan struct{}, 1),
 		nodes:            make(map[NodeID]*Node),
 	}
 	go net.loop()
@@ -353,8 +352,9 @@ func (net *Network) reqTableOp(f func()) (called bool) {
 
 func (net *Network) loop() {
 	var (
-		refreshTimer = time.NewTicker(autoRefreshInterval)
-		refreshDone  chan struct{} // closed when the 'refresh' lookup has ended
+		refreshTimer       = time.NewTicker(autoRefreshInterval)
+		bucketRefreshTimer = time.NewTimer(bucketRefreshInterval)
+		refreshDone        chan struct{} // closed when the 'refresh' lookup has ended
 	)
 
 	// Tracking the next ticket to register.
@@ -389,6 +389,7 @@ func (net *Network) loop() {
 		topicRegisterLookupDone   chan []*Node
 		topicRegisterLookupTick   = time.NewTimer(0)
 		topicSearchLookupTarget   lookupInfo
+		searchReqWhenRefreshDone  []topicSearchReq
 	)
 	topicSearchLookupDone := make(chan []*Node, 1)
 	<-topicRegisterLookupTick.C
@@ -406,6 +407,7 @@ loop:
 
 		// Ingress packet handling.
 		case pkt := <-net.read:
+			//fmt.Println("read", pkt.ev)
 			debugLog("<-net.read")
 			n := net.internNode(&pkt)
 			prestate := n.state
@@ -503,14 +505,18 @@ loop:
 			net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
 
 		case req := <-net.topicSearchReq:
-			debugLog("<-net.topicSearchReq")
-			if req.found == nil {
-				net.ticketStore.removeSearchTopic(req.topic)
-				continue
-			}
-			net.ticketStore.addSearchTopic(req.topic, req.found)
-			if (topicSearchLookupTarget.target == common.Hash{}) {
-				topicSearchLookupDone <- nil
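+			// while a table refresh is in progress, queue incoming search requests
+			// and replay them once the refresh has finished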
+			if refreshDone == nil {
+				debugLog("<-net.topicSearchReq")
+				if req.found == nil {
+					net.ticketStore.removeSearchTopic(req.topic)
+					continue
+				}
+				net.ticketStore.addSearchTopic(req.topic, req.found)
+				if (topicSearchLookupTarget.target == common.Hash{}) {
+					topicSearchLookupDone <- nil
+				}
+			} else {
+				searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
 			}
 
 		case nodes := <-topicSearchLookupDone:
@@ -519,7 +525,14 @@ loop:
 				net.ping(n, n.addr())
 				return n.pingEcho
 			}, func(n *Node, topic Topic) []byte {
-				return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
+				if n.state == known {
+					return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
+				} else {
+					if n.state == unknown {
+						net.ping(n, n.addr())
+					}
+					return nil
+				}
 			})
 			topicSearchLookupTarget = net.ticketStore.nextSearchLookup()
 			target := topicSearchLookupTarget.target
@@ -564,9 +577,12 @@ loop:
 				refreshDone = make(chan struct{})
 				net.refresh(refreshDone)
 			}
-		case doneChn := <-net.bucketFillChn:
-			debugLog("bucketFill")
-			net.bucketFill(doneChn)
+		case <-bucketRefreshTimer.C:
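+			// pick a random refresh target and look it up in the background;
+			// the timer is re-armed only after the lookup has finished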
+			target := net.tab.chooseBucketRefreshTarget()
+			go func() {
+				net.lookup(target, false)
+				bucketRefreshTimer.Reset(bucketRefreshInterval)
+			}()
 		case newNursery := <-net.refreshReq:
 			debugLog("<-net.refreshReq")
 			if newNursery != nil {
@@ -580,6 +596,13 @@ loop:
 		case <-refreshDone:
 			debugLog("<-net.refreshDone")
 			refreshDone = nil
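+			// replay topic search requests that were queued during the refresh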
+			list := searchReqWhenRefreshDone
+			searchReqWhenRefreshDone = nil
+			go func() {
+				for _, req := range list {
+					net.topicSearchReq <- req
+				}
+			}()
 		}
 	}
 	debugLog("loop stopped")
@@ -643,28 +666,13 @@ func (net *Network) refresh(done chan<- struct{}) {
 	}()
 }
 
-func (net *Network) bucketFill(done chan<- struct{}) {
-	target := net.tab.chooseBucketFillTarget()
-	go func() {
-		net.lookup(target, false)
-		close(done)
-	}()
-}
-
-func (net *Network) BucketFill() {
-	done := make(chan struct{})
-	select {
-	case net.bucketFillChn <- done:
-		<-done
-	case <-net.closed:
-		close(done)
-	}
-}
-
 // Node Interning.
 
 func (net *Network) internNode(pkt *ingressPacket) *Node {
 	if n := net.nodes[pkt.remoteID]; n != nil {
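+		// update the endpoint of an already known node from the packet's source address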
+		n.IP = pkt.remoteAddr.IP
+		n.UDP = uint16(pkt.remoteAddr.Port)
+		n.TCP = uint16(pkt.remoteAddr.Port)
 		return n
 	}
 	n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
@@ -967,8 +975,10 @@ func init() {
 
 // handle processes packets sent by n and events related to n.
 func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
+	//fmt.Println("handle", n.addr().String(), n.state, ev)
 	if pkt != nil {
 		if err := net.checkPacket(n, ev, pkt); err != nil {
+			//fmt.Println("check err:", err)
 			return err
 		}
 		// Start the background expiration goroutine after the first
@@ -985,6 +995,7 @@ func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
 	}
 	next, err := n.state.handle(net, n, ev, pkt)
 	net.transition(n, next)
+	//fmt.Println("new state:", n.state)
 	return err
 }
 
@@ -1040,6 +1051,11 @@ func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
 }
 
 func (net *Network) ping(n *Node, addr *net.UDPAddr) {
+	//fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
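+	// skip if a ping to this node is already pending or if the node is ourselves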
+	if n.pingEcho != nil || n.ID == net.tab.self.ID {
+		//fmt.Println(" not sent")
+		return
+	}
 	debugLog(fmt.Sprintf("ping(node = %x)", n.ID[:8]))
 	n.pingTopics = net.ticketStore.regTopicSet()
 	n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
diff --git a/p2p/discv5/table.go b/p2p/discv5/table.go
index 5c8c50706..2cf05009c 100644
--- a/p2p/discv5/table.go
+++ b/p2p/discv5/table.go
@@ -25,6 +25,7 @@ package discv5
 import (
 	"crypto/rand"
 	"encoding/binary"
+	"fmt"
 	"net"
 	"sort"
 
@@ -64,42 +65,54 @@ func newTable(ourID NodeID, ourAddr *net.UDPAddr) *Table {
 	return tab
 }
 
-func (tab *Table) chooseBucketFillTarget() common.Hash {
-	bucketCount := nBuckets
-	for bucketCount > 0 && len(tab.buckets[nBuckets-bucketCount].entries) == 0 {
-		bucketCount--
+const printTable = false
+
+// chooseBucketRefreshTarget selects random refresh targets to keep all Kademlia
+// buckets filled with live connections and keep the network topology healthy.
+// This requires selecting addresses closer to our own with a higher probability
+// in order to refresh closer buckets too.
+//
+// This algorithm approximates the distance distribution of existing nodes in the
+// table by choosing a random node from the table and then selecting a target
+// address whose distance from us is less than twice that of the chosen node.
+// The algorithm will be improved later to specifically target the least recently
+// used buckets.
+func (tab *Table) chooseBucketRefreshTarget() common.Hash {
+	entries := 0
+	if printTable {
+		fmt.Println()
 	}
-	var bucket int
-	for {
-		// select a target hash that could go into a certain randomly selected bucket
-		// buckets are chosen with an even chance out of the existing ones that contain
-		// less that bucketSize entries, plus a potential new one beyond these
-		bucket = nBuckets - 1 - int(randUint(uint32(bucketCount+1)))
-		if bucket == bucketCount || len(tab.buckets[bucket].entries) < bucketSize {
-			break
+	for i, b := range tab.buckets {
+		entries += len(b.entries)
+		if printTable {
+			for _, e := range b.entries {
+				fmt.Println(i, e.state, e.addr().String(), e.ID.String(), e.sha.Hex())
+			}
 		}
 	}
 
-	// calculate target that has the desired log distance from our own address hash
-	target := tab.self.sha.Bytes()
-	prefix := binary.BigEndian.Uint64(target[0:8])
-	shift := uint(nBuckets - 1 - bucket)
-	if bucket != bucketCount {
-		shift++
+	prefix := binary.BigEndian.Uint64(tab.self.sha[0:8])
+	dist := ^uint64(0)
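+	// pick a random table entry; with probability 1/(entries+1) no entry is
+	// chosen, leaving dist at its maximum so the target is effectively random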
+	entry := int(randUint(uint32(entries + 1)))
+	for _, b := range tab.buckets {
+		if entry < len(b.entries) {
+			n := b.entries[entry]
+			dist = binary.BigEndian.Uint64(n.sha[0:8]) ^ prefix
+			break
+		}
+		entry -= len(b.entries)
 	}
-	var b [8]byte
-	rand.Read(b[:])
-	rnd := binary.BigEndian.Uint64(b[:])
-	rndMask := (^uint64(0)) >> shift
-	addrMask := ^rndMask
-	xorMask := uint64(0)
-	if bucket != bucketCount {
-		xorMask = rndMask + 1
+
+	ddist := ^uint64(0)
+	if dist+dist > dist {
+		ddist = dist
 	}
-	prefix = (prefix&addrMask ^ xorMask) | (rnd & rndMask)
-	binary.BigEndian.PutUint64(target[0:8], prefix)
+	targetPrefix := prefix ^ randUint64n(ddist)
+
+	var target common.Hash
+	binary.BigEndian.PutUint64(target[0:8], targetPrefix)
 	rand.Read(target[8:])
-	return common.BytesToHash(target)
+	return target
 }
 
 // readRandomNodes fills the given slice with random nodes from the
@@ -175,6 +188,10 @@ func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
 // bucket has space available, adding the node succeeds immediately.
 // Otherwise, the node is added to the replacement cache for the bucket.
 func (tab *Table) add(n *Node) (contested *Node) {
+	//fmt.Println("add", n.addr().String(), n.ID.String(), n.sha.Hex())
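+	// never add our own node to the table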
+	if n.ID == tab.self.ID {
+		return
+	}
 	b := tab.buckets[logdist(tab.self.sha, n.sha)]
 	switch {
 	case b.bump(n):
@@ -228,6 +245,7 @@ outer:
 // delete removes an entry from the node table (used to evacuate
 // failed/non-bonded discovery peers).
 func (tab *Table) delete(node *Node) {
+	//fmt.Println("delete", node.addr().String(), node.ID.String(), node.sha.Hex())
 	bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
 	for i := range bucket.entries {
 		if bucket.entries[i].ID == node.ID {
diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go
index 3ee2f7fc4..202504314 100644
--- a/p2p/discv5/ticket.go
+++ b/p2p/discv5/ticket.go
@@ -525,7 +525,9 @@ func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, ping fu
 			} // else {
 			if s.canQueryTopic(n, lookup.topic) {
 				hash := query(n, lookup.topic)
-				s.addTopicQuery(common.BytesToHash(hash), n, lookup)
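+				// query returns nil when no query packet was actually sent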
+				if hash != nil {
+					s.addTopicQuery(common.BytesToHash(hash), n, lookup)
+				}
 			}
 			//}
 		}
diff --git a/p2p/discv5/udp.go b/p2p/discv5/udp.go
index af961984c..46d3200bf 100644
--- a/p2p/discv5/udp.go
+++ b/p2p/discv5/udp.go
@@ -336,14 +336,17 @@ func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node)
 }
 
 func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) {
+	//fmt.Println("sendPacket", nodeEvent(ptype), toaddr.String(), toid.String())
 	packet, hash, err := encodePacket(t.priv, ptype, req)
 	if err != nil {
+		//fmt.Println(err)
 		return hash, err
 	}
 	glog.V(logger.Detail).Infof(">>> %v to %x@%v\n", nodeEvent(ptype), toid[:8], toaddr)
 	if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
 		glog.V(logger.Detail).Infoln("UDP send failed:", err)
 	}
+	//fmt.Println(err)
 	return hash, err
 }
 
@@ -406,6 +409,7 @@ func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
 	pkt := ingressPacket{remoteAddr: from}
 	if err := decodePacket(buf, &pkt); err != nil {
 		glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err)
+		//fmt.Println("bad packet", err)
 		return err
 	}
 	t.net.reqReadPacket(pkt)
-- 
GitLab