From f459a3f0ae43eac29b597427be6602970f10334c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Wed, 23 Sep 2015 12:39:17 +0300
Subject: [PATCH] eth/downloader: always send termination wakes, clean leftover

---
 eth/downloader/downloader.go | 56 +++++++++++++++++++++++-------------
 1 file changed, 36 insertions(+), 20 deletions(-)

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index f038e24e4..d1a716c5f 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -154,7 +154,7 @@ type Downloader struct {
 	blockCh   chan blockPack  // [eth/61] Channel receiving inbound blocks
 	headerCh  chan headerPack // [eth/62] Channel receiving inbound block headers
 	bodyCh    chan bodyPack   // [eth/62] Channel receiving inbound block bodies
-	processCh chan bool       // Channel to signal the block fetcher of new or finished work
+	wakeCh    chan bool       // Channel to signal the block/body fetcher of new tasks
 
 	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
 	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
@@ -188,7 +188,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
 		blockCh:     make(chan blockPack, 1),
 		headerCh:    make(chan headerPack, 1),
 		bodyCh:      make(chan bodyPack, 1),
-		processCh:   make(chan bool, 1),
+		wakeCh:      make(chan bool, 1),
 	}
 }
 
@@ -282,6 +282,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
 	d.queue.Reset()
 	d.peers.Reset()
 
+	select {
+	case <-d.wakeCh:
+	default:
+	}
 	// Create cancel channel for aborting mid-flight
 	d.cancelLock.Lock()
 	d.cancelCh = make(chan struct{})
@@ -633,7 +637,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 				glog.V(logger.Debug).Infof("%v: no available hashes", p)
 
 				select {
-				case d.processCh <- false:
+				case d.wakeCh <- false:
 				case <-d.cancelCh:
 				}
 				// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
@@ -664,12 +668,18 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 				return errBadPeer
 			}
 			// Notify the block fetcher of new hashes, but stop if queue is full
-			cont := d.queue.Pending() < maxQueuedHashes
-			select {
-			case d.processCh <- cont:
-			default:
-			}
-			if !cont {
+			if d.queue.Pending() < maxQueuedHashes {
+				// We still have hashes to fetch, send continuation wake signal (potential)
+				select {
+				case d.wakeCh <- true:
+				default:
+				}
+			} else {
+				// Hash limit reached, send a termination wake signal (enforced)
+				select {
+				case d.wakeCh <- false:
+				case <-d.cancelCh:
+				}
 				return nil
 			}
 			// Queue not yet full, fetch the next batch
@@ -766,7 +776,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 			default:
 			}
 
-		case cont := <-d.processCh:
+		case cont := <-d.wakeCh:
 			// The hash fetcher sent a continuation flag, check if it's done
 			if !cont {
 				finished = true
@@ -1053,7 +1063,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 				glog.V(logger.Debug).Infof("%v: no available headers", p)
 
 				select {
-				case d.processCh <- false:
+				case d.wakeCh <- false:
 				case <-d.cancelCh:
 				}
 				// If no headers were retrieved at all, the peer violated it's TD promise that it had a
@@ -1084,12 +1094,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 				return errBadPeer
 			}
 			// Notify the block fetcher of new headers, but stop if queue is full
-			cont := d.queue.Pending() < maxQueuedHeaders
-			select {
-			case d.processCh <- cont:
-			default:
-			}
-			if !cont {
+			if d.queue.Pending() < maxQueuedHeaders {
+				// We still have headers to fetch, send continuation wake signal (potential)
+				select {
+				case d.wakeCh <- true:
+				default:
+				}
+			} else {
+				// Header limit reached, send a termination wake signal (enforced)
+				select {
+				case d.wakeCh <- false:
+				case <-d.cancelCh:
+				}
 				return nil
 			}
 			// Queue not yet full, fetch the next batch
@@ -1104,8 +1120,8 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 
 			// Finish the sync gracefully instead of dumping the gathered data though
 			select {
-			case d.processCh <- false:
-			default:
+			case d.wakeCh <- false:
+			case <-d.cancelCh:
 			}
 			return nil
 		}
@@ -1199,7 +1215,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
 			default:
 			}
 
-		case cont := <-d.processCh:
+		case cont := <-d.wakeCh:
 			// The header fetcher sent a continuation flag, check if it's done
 			if !cont {
 				finished = true
-- 
GitLab