From fdccce781e94819ec9dc13ef6540a33efd3b26c6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Mon, 8 Jun 2015 19:24:56 +0300
Subject: [PATCH] eth: fetch announced hashes from origin, periodically

---
 eth/handler.go | 54 ++++++++++++++++++++++------------------------
 eth/sync.go    | 58 +++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 80 insertions(+), 32 deletions(-)

diff --git a/eth/handler.go b/eth/handler.go
index 63ebc4bdd..7e9ec593a 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -21,7 +21,8 @@ import (
 const (
 	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
 	blockProcCycle      = 500 * time.Millisecond // Time interval to check for new blocks to process
-	blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+	notifyCheckCycle    = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
+	notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
 	blockProcAmount     = 256
 )
@@ -57,6 +58,7 @@ type ProtocolManager struct {
 	minedBlockSub event.Subscription
 
 	newPeerCh chan *peer
+	newHashCh chan []*blockAnnounce
 	quitSync  chan struct{}
 	// wait group is used for graceful shutdowns during downloading
 	// and processing
@@ -74,6 +76,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
 		downloader: downloader,
 		peers:      newPeerSet(),
 		newPeerCh:  make(chan *peer, 1),
+		newHashCh:  make(chan []*blockAnnounce, 1),
 		quitSync:   make(chan struct{}),
 	}
 
@@ -121,7 +124,8 @@ func (pm *ProtocolManager) Start() {
 	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
 	go pm.minedBroadcastLoop()
 
-	go pm.update()
+	go pm.syncer()
+	go pm.fetcher()
 }
 
 func (pm *ProtocolManager) Stop() {
@@ -302,32 +306,24 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 			p.blockHashes.Add(hash)
 			p.recentHash = hash
 		}
-		// Wait a bit for potentially receiving the blocks, fetch if not
-		go func() {
-			time.Sleep(blockArrivalTimeout)
-
-			// Drop all the hashes that are already known
-			unknown := make([]common.Hash, 0, len(hashes))
-			for _, hash := range hashes {
-				if !self.chainman.HasBlock(hash) {
-					unknown = append(unknown, hash)
-				}
-			}
-			if len(unknown) == 0 {
-				return
-			}
-			// Retrieve all the unknown hashes
-			if err := p.requestBlocks(unknown); err != nil {
-				glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err)
+		// Schedule all the unknown hashes for retrieval
+		unknown := make([]common.Hash, 0, len(hashes))
+		for _, hash := range hashes {
+			if !self.chainman.HasBlock(hash) {
+				unknown = append(unknown, hash)
 			}
-			if glog.V(logger.Detail) {
-				hashes := make([]string, len(unknown))
-				for i, hash := range unknown {
-					hashes[i] = fmt.Sprintf("%x", hash[:4])
-				}
-				glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes)
+		}
+		announces := make([]*blockAnnounce, len(unknown))
+		for i, hash := range unknown {
+			announces[i] = &blockAnnounce{
+				hash: hash,
+				peer: p,
+				time: time.Now(),
 			}
-		}()
+		}
+		if len(announces) > 0 {
+			self.newHashCh <- announces
+		}
 
 	case NewBlockMsg:
 		var request newBlockMsgData
@@ -407,13 +403,13 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
 	split := int(math.Sqrt(float64(len(peers))))
 
 	transfer := peers[:split]
-	nofity := peers[split:]
+	notify := peers[split:]
 
 	// Send out the data transfers and the notifications
-	for _, peer := range nofity {
+	for _, peer := range notify {
 		peer.sendNewBlockHashes([]common.Hash{hash})
 	}
-	glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.")
+	glog.V(logger.Detail).Infoln("broadcast hash to", len(notify), "peers.")
 
 	for _, peer := range transfer {
 		peer.sendNewBlock(block)
diff --git a/eth/sync.go b/eth/sync.go
index 56084f2f0..1a1cbdb47 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -5,15 +5,67 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/eth/downloader"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
 )
 
-// update periodically tries to synchronise with the network, both downloading
-// hashes and blocks as well as retrieving cached ones.
-func (pm *ProtocolManager) update() {
+// blockAnnounce is the hash notification of the availability of a new block in
+// the network.
+type blockAnnounce struct {
+	hash common.Hash
+	peer *peer
+	time time.Time
+}
+
+// fetcher is responsible for collecting hash notifications, and periodically
+// checking all unknown ones and individually fetching them.
+func (pm *ProtocolManager) fetcher() {
+	announces := make(map[common.Hash]*blockAnnounce)
+	request := make(map[*peer][]common.Hash)
+	cycle := time.Tick(notifyCheckCycle)
+
+	// Iterate the block fetching until a quit is requested
+	for {
+		select {
+		case notifications := <-pm.newHashCh:
+			// A batch of hashes the notified, schedule them for retrieval
+			glog.V(logger.Detail).Infof("Scheduling %d hash announces from %s", len(notifications), notifications[0].peer.id)
+			for _, announce := range notifications {
+				announces[announce.hash] = announce
+			}
+
+		case <-cycle:
+			// Check if any notified blocks failed to arrive
+			for hash, announce := range announces {
+				if time.Since(announce.time) > notifyArriveTimeout {
+					if !pm.chainman.HasBlock(hash) {
+						request[announce.peer] = append(request[announce.peer], hash)
+					}
+					delete(announces, hash)
+				}
+			}
+			if len(request) == 0 {
+				break
+			}
+			// Send out all block requests
+			for peer, hashes := range request {
+				glog.V(logger.Detail).Infof("Fetching specific %d blocks from %s", len(hashes), peer.id)
+				peer.requestBlocks(hashes)
+			}
+			request = make(map[*peer][]common.Hash)
+
+		case <-pm.quitSync:
+			return
+		}
+	}
+}
+
+// syncer is responsible for periodically synchronising with the network, both
+// downloading hashes and blocks as well as retrieving cached ones.
+func (pm *ProtocolManager) syncer() {
 	forceSync := time.Tick(forceSyncCycle)
 	blockProc := time.Tick(blockProcCycle)
 	blockProcPend := int32(0)
-- 
GitLab