diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 153427ee48e38c4addbb170757f1b0644c9e8d6a..5fa18a2e308d3ca6f168ddbe1045505fc4b053a4 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -74,7 +74,6 @@ var (
 	errBadPeer            = errors.New("action from bad peer ignored")
 	errStallingPeer       = errors.New("peer is stalling")
 	errNoPeers            = errors.New("no peers to keep download active")
-	errPendingQueue       = errors.New("pending items in queue")
 	errTimeout            = errors.New("timeout")
 	errEmptyHashSet       = errors.New("empty hash set by peer")
 	errEmptyHeaderSet     = errors.New("empty header set by peer")
@@ -90,6 +89,7 @@ var (
 	errCancelBodyFetch    = errors.New("block body download canceled (requested)")
 	errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
 	errCancelStateFetch   = errors.New("state data download canceled (requested)")
+	errCancelProcessing   = errors.New("processing canceled (requested)")
 	errNoSyncActive       = errors.New("no sync active")
 )
 
@@ -129,7 +129,6 @@ type Downloader struct {
 	// Status
 	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
 	synchronising   int32
-	processing      int32
 	notified        int32
 
 	// Channels
@@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
 
 // Synchronising returns whether the downloader is currently retrieving blocks.
 func (d *Downloader) Synchronising() bool {
-	return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0
+	return atomic.LoadInt32(&d.synchronising) > 0
 }
 
 // RegisterPeer injects a new download peer into the set of block source to be
@@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
 		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
 		d.dropPeer(id)
 
-	case errPendingQueue:
-		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
-
 	default:
 		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
 	}
@@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
 		glog.V(logger.Info).Infoln("Block synchronisation started")
 	}
-	// Abort if the queue still contains some leftover data
-	if d.queue.GetHeadResult() != nil {
-		return errPendingQueue
-	}
 	// Reset the queue, peer set and wake channels to clean any internal leftover state
 	d.queue.Reset()
 	d.peers.Reset()
@@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 	defer func() {
 		// reset on error
 		if err != nil {
-			d.cancel()
 			d.mux.Post(FailedEvent{err})
 		} else {
 			d.mux.Post(DoneEvent{})
@@ -365,23 +356,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		d.syncStatsChainHeight = latest
 		d.syncStatsLock.Unlock()
 
-		// Initiate the sync using a  concurrent hash and block retrieval algorithm
+		// Initiate the sync using a concurrent hash and block retrieval algorithm
+		d.queue.Prepare(origin+1, d.mode, 0)
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
-		d.queue.Prepare(origin+1, d.mode, 0)
-
-		errc := make(chan error, 2)
-		go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
-		go func() { errc <- d.fetchBlocks61(origin + 1) }()
-
-		// If any fetcher fails, cancel the other
-		if err := <-errc; err != nil {
-			d.cancel()
-			<-errc
-			return err
-		}
-		return <-errc
+		return d.spawnSync(
+			func() error { return d.fetchHashes61(p, td, origin+1) },
+			func() error { return d.fetchBlocks61(origin + 1) },
+		)
 
 	case p.version >= 62:
 		// Look up the sync boundaries: the common ancestor and the target block
@@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 		switch d.mode {
 		case LightSync:
 			pivot = latest
-
 		case FastSync:
 			// Calculate the new fast/slow sync pivot point
 			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
 			glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
 		}
 		d.queue.Prepare(origin+1, d.mode, pivot)
-
 		if d.syncInitHook != nil {
 			d.syncInitHook(origin, latest)
 		}
-		errc := make(chan error, 4)
-		go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
-		go func() { errc <- d.fetchBodies(origin + 1) }()       // Bodies are retrieved during normal and fast sync
-		go func() { errc <- d.fetchReceipts(origin + 1) }()     // Receipts are retrieved during fast sync
-		go func() { errc <- d.fetchNodeData() }()               // Node state data is retrieved during fast sync
-
-		// If any fetcher fails, cancel the others
-		var fail error
-		for i := 0; i < cap(errc); i++ {
-			if err := <-errc; err != nil {
-				if fail == nil {
-					fail = err
-					d.cancel()
-				}
-			}
-		}
-		return fail
+		return d.spawnSync(
+			func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
+			func() error { return d.fetchBodies(origin + 1) },       // Bodies are retrieved during normal and fast sync
+			func() error { return d.fetchReceipts(origin + 1) },     // Receipts are retrieved during fast sync
+			func() error { return d.fetchNodeData() },               // Node state data is retrieved during fast sync
+		)
 
 	default:
 		// Something very wrong, stop right here
 		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
 		return errBadPeer
 	}
-	return nil
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers ...func() error) error {
+	var wg sync.WaitGroup
+	errc := make(chan error, len(fetchers)+1)
+	wg.Add(len(fetchers) + 1)
+	go func() { defer wg.Done(); errc <- d.process() }()
+	for _, fn := range fetchers {
+		fn := fn
+		go func() { defer wg.Done(); errc <- fn() }()
+	}
+	// Wait for the first error, then terminate the others.
+	var err error
+	for i := 0; i < len(fetchers)+1; i++ {
+		if i == len(fetchers) {
+			// Close the queue when all fetchers have exited.
+			// This will cause the block processor to end when
+			// it has processed the queue.
+			d.queue.Close()
+		}
+		if err = <-errc; err != nil {
+			break
+		}
+	}
+	d.queue.Close()
+	d.cancel()
+	wg.Wait()
+	return err
 }
 
 // cancel cancels all of the operations and resets the queue. It returns true
@@ -470,12 +469,10 @@ func (d *Downloader) cancel() {
 		}
 	}
 	d.cancelLock.Unlock()
-
-	// Reset the queue
-	d.queue.Reset()
 }
 
 // Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
 func (d *Downloader) Terminate() {
 	atomic.StoreInt32(&d.interrupt, 1)
 	d.cancel()
@@ -495,15 +492,6 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
 		case <-d.cancelCh:
 			return 0, errCancelBlockFetch
 
-		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
-		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
-		case <-d.hashCh:
-			// Out of bounds hashes received, ignore them
-
 		case packet := <-d.blockCh:
 			// Discard anything not from the origin peer
 			if packet.PeerId() != p.id {
@@ -521,6 +509,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
 		case <-timeout:
 			glog.V(logger.Debug).Infof("%v: head block timeout", p)
 			return 0, errTimeout
+
+		case <-d.hashCh:
+			// Out of bounds hashes received, ignore them
+
+		case <-d.headerCh:
+		case <-d.bodyCh:
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -571,18 +569,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
 				}
 			}
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
+			return 0, errTimeout
+
 		case <-d.blockCh:
 			// Out of bounds blocks received, ignore them
 
 		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
 		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
-			return 0, errTimeout
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 	// If the head fetch already found an ancestor, return
@@ -631,18 +630,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
 				}
 				start = check
 
+			case <-timeout:
+				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
+				return 0, errTimeout
+
 			case <-d.blockCh:
 				// Out of bounds blocks received, ignore them
 
 			case <-d.headerCh:
-				// Out of bounds eth/62 block headers received, ignore them
-
 			case <-d.bodyCh:
-				// Out of bounds eth/62 block bodies received, ignore them
-
-			case <-timeout:
-				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
-				return 0, errTimeout
+			case <-d.stateCh:
+			case <-d.receiptCh:
+				// Ignore eth/{62,63} packets because this is eth/61.
+				// These can arrive as a late delivery from a previous sync.
 			}
 		}
 	}
@@ -676,12 +676,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 		case <-d.cancelCh:
 			return errCancelHashFetch
 
-		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
-		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
 		case packet := <-d.hashCh:
 			// Make sure the active peer is giving us the hashes
 			if packet.PeerId() != p.id {
@@ -750,6 +744,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
 			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
 			hashTimeoutMeter.Mark(1)
 			return errTimeout
+
+		case <-d.headerCh:
+		case <-d.bodyCh:
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -774,12 +775,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 		case <-d.cancelCh:
 			return errCancelBlockFetch
 
-		case <-d.headerCh:
-			// Out of bounds eth/62 block headers received, ignore them
-
-		case <-d.bodyCh:
-			// Out of bounds eth/62 block bodies received, ignore them
-
 		case packet := <-d.blockCh:
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
@@ -800,7 +795,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 					peer.Promote()
 					peer.SetBlocksIdle()
 					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
-					go d.process()
 
 				case errInvalidChain:
 					// The hash chain is invalid (blocks are not ordered properly), abort
@@ -826,7 +820,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 					peer.Demote()
 					peer.SetBlocksIdle()
 					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
-					go d.process()
 				}
 			}
 			// Blocks arrived, try to update the progress
@@ -909,6 +902,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 			if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
 				return errPeersUnavailable
 			}
+
+		case <-d.headerCh:
+		case <-d.bodyCh:
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Ignore eth/{62,63} packets because this is eth/61.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -941,18 +941,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
 			}
 			return headers[0].Number.Uint64(), nil
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: head header timeout", p)
+			return 0, errTimeout
+
 		case <-d.bodyCh:
-			// Out of bounds block bodies received, ignore them
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Out of bounds delivery, ignore
 
 		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
 		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head header timeout", p)
-			return 0, errTimeout
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1008,18 +1009,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
 				}
 			}
 
+		case <-timeout:
+			glog.V(logger.Debug).Infof("%v: head header timeout", p)
+			return 0, errTimeout
+
 		case <-d.bodyCh:
-			// Out of bounds block bodies received, ignore them
+		case <-d.stateCh:
+		case <-d.receiptCh:
+			// Out of bounds delivery, ignore
 
 		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
 		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
-		case <-timeout:
-			glog.V(logger.Debug).Infof("%v: head header timeout", p)
-			return 0, errTimeout
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 	// If the head fetch already found an ancestor, return
@@ -1068,18 +1070,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
 				}
 				start = check
 
+			case <-timeout:
+				glog.V(logger.Debug).Infof("%v: search header timeout", p)
+				return 0, errTimeout
+
 			case <-d.bodyCh:
-				// Out of bounds block bodies received, ignore them
+			case <-d.stateCh:
+			case <-d.receiptCh:
+				// Out of bounds delivery, ignore
 
 			case <-d.hashCh:
-				// Out of bounds eth/61 hashes received, ignore them
-
 			case <-d.blockCh:
-				// Out of bounds eth/61 blocks received, ignore them
-
-			case <-timeout:
-				glog.V(logger.Debug).Infof("%v: search header timeout", p)
-				return 0, errTimeout
+				// Ignore eth/61 packets because this is eth/62+.
+				// These can arrive as a late delivery from a previous sync.
 			}
 		}
 	}
@@ -1141,12 +1144,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 		case <-d.cancelCh:
 			return errCancelHeaderFetch
 
-		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
-		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
 		case packet := <-d.headerCh:
 			// Make sure the active peer is giving us the headers
 			if packet.PeerId() != p.id {
@@ -1268,6 +1265,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
 				}
 			}
 			return nil
+
+		case <-d.hashCh:
+		case <-d.blockCh:
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
@@ -1336,10 +1338,8 @@ func (d *Downloader) fetchNodeData() error {
 					d.cancel()
 					return
 				}
-				// Processing succeeded, notify state fetcher and processor of continuation
-				if d.queue.PendingNodeData() == 0 {
-					go d.process()
-				} else {
+				// Processing succeeded, notify state fetcher of continuation
+				if d.queue.PendingNodeData() > 0 {
 					select {
 					case d.stateWakeCh <- true:
 					default:
@@ -1348,7 +1348,6 @@ func (d *Downloader) fetchNodeData() error {
 				// Log a message to the user and return
 				d.syncStatsLock.Lock()
 				defer d.syncStatsLock.Unlock()
-
 				d.syncStatsStateDone += uint64(delivered)
 				glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
 			})
@@ -1391,12 +1390,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 		case <-d.cancelCh:
 			return errCancel
 
-		case <-d.hashCh:
-			// Out of bounds eth/61 hashes received, ignore them
-
-		case <-d.blockCh:
-			// Out of bounds eth/61 blocks received, ignore them
-
 		case packet := <-deliveryCh:
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
@@ -1415,7 +1408,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 					peer.Promote()
 					setIdle(peer)
 					glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
-					go d.process()
 
 				case errInvalidChain:
 					// The hash chain is invalid (blocks are not ordered properly), abort
@@ -1441,7 +1433,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 					peer.Demote()
 					setIdle(peer)
 					glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
-					go d.process()
 				}
 			}
 			// Blocks assembled, try to update the progress
@@ -1508,7 +1499,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 				}
 				if progress {
 					progressed = true
-					go d.process()
 				}
 				if request == nil {
 					continue
@@ -1540,51 +1530,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
 				return errPeersUnavailable
 			}
+
+		case <-d.hashCh:
+		case <-d.blockCh:
+			// Ignore eth/61 packets because this is eth/62+.
+			// These can arrive as a late delivery from a previous sync.
 		}
 	}
 }
 
 // process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents:
-//  -
-//
-// The algorithmic flow is as follows:
-//  - The `processing` flag is swapped to 1 to ensure singleton access
-//  - The current `cancel` channel is retrieved to detect sync abortions
-//  - Blocks are iteratively taken from the cache and inserted into the chain
-//  - When the cache becomes empty, insertion stops
-//  - The `processing` flag is swapped back to 0
-//  - A post-exit check is made whether new blocks became available
-//     - This step is important: it handles a potential race condition between
-//       checking for no more work, and releasing the processing "mutex". In
-//       between these state changes, a block may have arrived, but a processing
-//       attempt denied, so we need to re-enter to ensure the block isn't left
-//       to idle in the cache.
-func (d *Downloader) process() {
-	// Make sure only one goroutine is ever allowed to process blocks at once
-	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
-		return
-	}
-	// If the processor just exited, but there are freshly pending items, try to
-	// reenter. This is needed because the goroutine spinned up for processing
-	// the fresh results might have been rejected entry to to this present thread
-	// not yet releasing the `processing` state.
-	defer func() {
-		if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
-			d.process()
-		}
-	}()
-	// Release the lock upon exit (note, before checking for reentry!)
-	// the import statistics to zero.
-	defer atomic.StoreInt32(&d.processing, 0)
-
-	// Repeat the processing as long as there are results to process
+// chain. The type of import operation will depend on the result contents.
+func (d *Downloader) process() error {
+	pivot := d.queue.FastSyncPivot()
 	for {
-		// Fetch the next batch of results
-		pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
-		results := d.queue.TakeResults()
+		results := d.queue.WaitResults()
 		if len(results) == 0 {
-			return
+			return nil // queue empty
 		}
 		if d.chainInsertHook != nil {
 			d.chainInsertHook(results)
@@ -1597,7 +1559,7 @@ func (d *Downloader) process() {
 		for len(results) != 0 {
 			// Check for any termination requests
 			if atomic.LoadInt32(&d.interrupt) == 1 {
-				return
+				return errCancelProcessing
 			}
 			// Retrieve the a batch of results to import
 			var (
@@ -1633,8 +1595,7 @@ func (d *Downloader) process() {
 			}
 			if err != nil {
 				glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
-				d.cancel()
-				return
+				return err
 			}
 			// Shift the results to the next batch
 			results = results[items:]
@@ -1685,19 +1646,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
 			dropMeter.Mark(int64(packet.Items()))
 		}
 	}()
-	// Make sure the downloader is active
-	if atomic.LoadInt32(&d.synchronising) == 0 {
-		return errNoSyncActive
-	}
 	// Deliver or abort if the sync is canceled while queuing
 	d.cancelLock.RLock()
 	cancel := d.cancelCh
 	d.cancelLock.RUnlock()
-
+	if cancel == nil {
+		return errNoSyncActive
+	}
 	select {
 	case destCh <- packet:
 		return nil
-
 	case <-cancel:
 		return errNoSyncActive
 	}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index ef6f74a6baf4a5322fa9b5256bda34a6b860e4eb..cfcc8a2ef92101ad90f348a2d0793d45c107f7a7 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
 		}
 	}
 	dl.lock.RUnlock()
-
-	err := dl.downloader.synchronise(id, hash, td, mode)
-	for {
-		// If the queue is empty and processing stopped, break
-		if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
-			break
-		}
-		// Otherwise sleep a bit and retry
-		time.Sleep(time.Millisecond)
-	}
-	return err
+	return dl.downloader.synchronise(id, hash, td, mode)
 }
 
 // hasHeader checks if a header is present in the testers canonical chain.
@@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T)  { testCanonicalSynchronis
 func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
 
 func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a small enough block chain to download
 	targetBlocks := blockCacheLimit - 15
 	hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
 func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
 
 func testThrottling(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a long block chain to download and the tester
 	targetBlocks := 8 * blockCacheLimit
 	hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
 		for start := time.Now(); time.Since(start) < time.Second; {
 			time.Sleep(25 * time.Millisecond)
 
-			tester.lock.RLock()
-			tester.downloader.queue.lock.RLock()
+			tester.lock.Lock()
+			tester.downloader.queue.lock.Lock()
 			cached = len(tester.downloader.queue.blockDonePool)
 			if mode == FastSync {
 				if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
@@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
 			}
 			frozen = int(atomic.LoadUint32(&blocked))
 			retrieved = len(tester.ownBlocks)
-			tester.downloader.queue.lock.RUnlock()
-			tester.lock.RUnlock()
+			tester.downloader.queue.lock.Unlock()
+			tester.lock.Unlock()
 
 			if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
 				break
@@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T)  { testForkedSynchronisation(
 func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) }
 
 func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a long enough forked chain
 	common, fork := MaxHashFetch, 2*MaxHashFetch
 	hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
 
 // Tests that an inactive downloader will not accept incoming hashes and blocks.
 func TestInactiveDownloader61(t *testing.T) {
+	t.Parallel()
 	tester := newTester()
 
 	// Check that neither hashes nor blocks are accepted
@@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) {
 // Tests that an inactive downloader will not accept incoming block headers and
 // bodies.
 func TestInactiveDownloader62(t *testing.T) {
+	t.Parallel()
 	tester := newTester()
 
 	// Check that neither block headers nor bodies are accepted
@@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) {
 // Tests that an inactive downloader will not accept incoming block headers,
 // bodies and receipts.
 func TestInactiveDownloader63(t *testing.T) {
+	t.Parallel()
 	tester := newTester()
 
 	// Check that neither block headers nor bodies are accepted
@@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T)  { testCancel(t, 64, FastSync) }
 func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
 
 func testCancel(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a small enough block chain to download and the tester
 	targetBlocks := blockCacheLimit - 15
 	if targetBlocks >= MaxHashFetch {
@@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T)  { testMultiSynchronisation(t,
 func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
 
 func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create various peers with various parts of the chain
 	targetPeers := 8
 	targetBlocks := targetPeers*blockCacheLimit - 15
@@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T)  { testMultiProtoSync(t,
 func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
 
 func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a small enough block chain to download
 	targetBlocks := blockCacheLimit - 15
 	hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T)  { testEmptyShortCircuit(t, 64, F
 func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
 
 func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a block chain to download
 	targetBlocks := 2*blockCacheLimit - 15
 	hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T)  { testMissingHeaderAttack(t, 6
 func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
 
 func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a small enough block chain to download
 	targetBlocks := blockCacheLimit - 15
 	hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T)  { testHighTDStarvationAttac
 func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
 
 func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	tester := newTester()
 	hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil)
 
@@ -1209,25 +1220,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
 		result error
 		drop   bool
 	}{
-		{nil, false},                  // Sync succeeded, all is well
-		{errBusy, false},              // Sync is already in progress, no problem
-		{errUnknownPeer, false},       // Peer is unknown, was already dropped, don't double drop
-		{errBadPeer, true},            // Peer was deemed bad for some reason, drop it
-		{errStallingPeer, true},       // Peer was detected to be stalling, drop it
-		{errNoPeers, false},           // No peers to download from, soft race, no issue
-		{errPendingQueue, false},      // There are blocks still cached, wait to exhaust, no issue
-		{errTimeout, true},            // No hashes received in due time, drop the peer
-		{errEmptyHashSet, true},       // No hashes were returned as a response, drop as it's a dead end
-		{errEmptyHeaderSet, true},     // No headers were returned as a response, drop as it's a dead end
-		{errPeersUnavailable, true},   // Nobody had the advertised blocks, drop the advertiser
-		{errInvalidChain, true},       // Hash chain was detected as invalid, definitely drop
-		{errInvalidBlock, false},      // A bad peer was detected, but not the sync origin
-		{errInvalidBody, false},       // A bad peer was detected, but not the sync origin
-		{errInvalidReceipt, false},    // A bad peer was detected, but not the sync origin
-		{errCancelHashFetch, false},   // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelBlockFetch, false},  // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
-		{errCancelBodyFetch, false},   // Synchronisation was canceled, origin may be innocent, don't drop
+		{nil, false},                   // Sync succeeded, all is well
+		{errBusy, false},               // Sync is already in progress, no problem
+		{errUnknownPeer, false},        // Peer is unknown, was already dropped, don't double drop
+		{errBadPeer, true},             // Peer was deemed bad for some reason, drop it
+		{errStallingPeer, true},        // Peer was detected to be stalling, drop it
+		{errNoPeers, false},            // No peers to download from, soft race, no issue
+		{errTimeout, true},             // No hashes received in due time, drop the peer
+		{errEmptyHashSet, true},        // No hashes were returned as a response, drop as it's a dead end
+		{errEmptyHeaderSet, true},      // No headers were returned as a response, drop as it's a dead end
+		{errPeersUnavailable, true},    // Nobody had the advertised blocks, drop the advertiser
+		{errInvalidChain, true},        // Hash chain was detected as invalid, definitely drop
+		{errInvalidBlock, false},       // A bad peer was detected, but not the sync origin
+		{errInvalidBody, false},        // A bad peer was detected, but not the sync origin
+		{errInvalidReceipt, false},     // A bad peer was detected, but not the sync origin
+		{errCancelHashFetch, false},    // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelBlockFetch, false},   // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelHeaderFetch, false},  // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelBodyFetch, false},    // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+		{errCancelProcessing, false},   // Synchronisation was canceled, origin may be innocent, don't drop
 	}
 	// Run the tests and check disconnection status
 	tester := newTester()
@@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T)  { testSyncProgress(t, 64, FastSync) }
 func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
 
 func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a small enough block chain to download
 	targetBlocks := blockCacheLimit - 15
 	hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T)  { testForkedSyncProgress(t, 64,
 func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
 
 func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a forked chain to simulate origin revertal
 	common, fork := MaxHashFetch, 2*MaxHashFetch
 	hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T)  { testFailedSyncProgress(t, 64,
 func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
 
 func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a small enough block chain to download
 	targetBlocks := blockCacheLimit - 15
 	hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T)  { testFakedSyncProgress(t, 64, F
 func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
 
 func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+
 	// Create a small block chain
 	targetBlocks := blockCacheLimit - 15
 	hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil)
@@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
 		t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks)
 	}
 }
+
+// This test reproduces an issue where unexpected deliveries would
+// block indefinitely if they arrived at the right time.
+func TestDeliverHeadersHang62(t *testing.T)      { testDeliverHeadersHang(t, 62, FullSync) }
+func TestDeliverHeadersHang63Full(t *testing.T)  { testDeliverHeadersHang(t, 63, FullSync) }
+func TestDeliverHeadersHang63Fast(t *testing.T)  { testDeliverHeadersHang(t, 63, FastSync) }
+func TestDeliverHeadersHang64Full(t *testing.T)  { testDeliverHeadersHang(t, 64, FullSync) }
+func TestDeliverHeadersHang64Fast(t *testing.T)  { testDeliverHeadersHang(t, 64, FastSync) }
+func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
+
+func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
+	t.Parallel()
+	hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil)
+	fakeHeads := []*types.Header{{}, {}, {}, {}}
+	for i := 0; i < 200; i++ {
+		tester := newTester()
+		tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
+		// Whenever the downloader requests headers, flood it with
+		// a lot of unrequested header deliveries.
+		tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
+			deliveriesDone := make(chan struct{}, 500)
+			for i := 0; i < cap(deliveriesDone); i++ {
+				peer := fmt.Sprintf("fake-peer%d", i)
+				go func() {
+					tester.downloader.DeliverHeaders(peer, fakeHeads)
+					deliveriesDone <- struct{}{}
+				}()
+			}
+			// Deliver the actual requested headers.
+			impl := tester.peerGetAbsHeadersFn("peer", 0)
+			go impl(from, count, skip, reverse)
+			// None of the extra deliveries should block.
+			timeout := time.After(5 * time.Second)
+			for i := 0; i < cap(deliveriesDone); i++ {
+				select {
+				case <-deliveriesDone:
+				case <-timeout:
+					panic("blocked")
+				}
+			}
+			return nil
+		}
+		if err := tester.sync("peer", nil, mode); err != nil {
+			t.Errorf("sync failed: %v", err)
+		}
+	}
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 1fb5b6e12ff0420d99941f2c3e8fa06631e3d38b..584797d7becd7803dbe61ce43e5efceaa82aa746 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -101,11 +101,14 @@ type queue struct {
 	resultCache  []*fetchResult // Downloaded but not yet delivered fetch results
 	resultOffset uint64         // Offset of the first cached fetch result in the block chain
 
-	lock sync.RWMutex
+	lock   *sync.Mutex
+	active *sync.Cond
+	closed bool
 }
 
 // newQueue creates a new download queue for scheduling block retrieval.
 func newQueue(stateDb ethdb.Database) *queue {
+	lock := new(sync.Mutex)
 	return &queue{
 		hashPool:         make(map[common.Hash]int),
 		hashQueue:        prque.New(),
@@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
 		statePendPool:    make(map[string]*fetchRequest),
 		stateDatabase:    stateDb,
 		resultCache:      make([]*fetchResult, blockCacheLimit),
+		active:           sync.NewCond(lock),
+		lock:             lock,
 	}
 }
 
@@ -133,6 +138,7 @@ func (q *queue) Reset() {
 	q.stateSchedLock.Lock()
 	defer q.stateSchedLock.Unlock()
 
+	q.closed = false
 	q.mode = FullSync
 	q.fastSyncPivot = 0
 
@@ -162,18 +168,27 @@ func (q *queue) Reset() {
 	q.resultOffset = 0
 }
 
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+	q.lock.Lock()
+	q.closed = true
+	q.lock.Unlock()
+	q.active.Broadcast()
+}
+
 // PendingBlocks retrieves the number of block (body) requests pending for retrieval.
 func (q *queue) PendingBlocks() int {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	return q.hashQueue.Size() + q.blockTaskQueue.Size()
 }
 
 // PendingReceipts retrieves the number of block receipts pending for retrieval.
 func (q *queue) PendingReceipts() int {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	return q.receiptTaskQueue.Size()
 }
@@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
 // InFlightBlocks retrieves whether there are block fetch requests currently in
 // flight.
 func (q *queue) InFlightBlocks() bool {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	return len(q.blockPendPool) > 0
 }
@@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
 // InFlightReceipts retrieves whether there are receipt fetch requests currently
 // in flight.
 func (q *queue) InFlightReceipts() bool {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	return len(q.receiptPendPool) > 0
 }
@@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
 // InFlightNodeData retrieves whether there are node data entry fetch requests
 // currently in flight.
 func (q *queue) InFlightNodeData() bool {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
 }
@@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
 // Idle returns if the queue is fully idle or has some data still inside. This
 // method is used by the tester to detect termination events.
 func (q *queue) Idle() bool {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
 	pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
 
 // FastSyncPivot retrieves the currently used fast sync pivot point.
 func (q *queue) FastSyncPivot() uint64 {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	return q.fastSyncPivot
 }
@@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
 // ShouldThrottleBlocks checks if the download should be throttled (active block (body)
 // fetches exceed block cache).
 func (q *queue) ShouldThrottleBlocks() bool {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	// Calculate the currently in-flight block (body) requests
 	pending := 0
@@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
 // ShouldThrottleReceipts checks if the download should be throttled (active receipt
 // fetches exceed block cache).
 func (q *queue) ShouldThrottleReceipts() bool {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
 	// Calculate the currently in-flight receipt requests
 	pending := 0
@@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
 	return inserts
 }
 
-// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
-// been downloaded yet (or simply non existent).
-func (q *queue) GetHeadResult() *fetchResult {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
+// WaitResults retrieves and permanently removes a batch of fetch
+// results from the cache. the result slice will be empty if the queue
+// has been closed.
+func (q *queue) WaitResults() []*fetchResult {
+	q.lock.Lock()
+	defer q.lock.Unlock()
 
-	// If there are no results pending, return nil
-	if len(q.resultCache) == 0 || q.resultCache[0] == nil {
-		return nil
-	}
-	// If the next result is still incomplete, return nil
-	if q.resultCache[0].Pending > 0 {
-		return nil
+	nproc := q.countProcessableItems()
+	for nproc == 0 && !q.closed {
+		q.active.Wait()
+		nproc = q.countProcessableItems()
 	}
-	// If the next result is the fast sync pivot...
-	if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
-		// If the pivot state trie is still being pulled, return nil
-		if len(q.stateTaskPool) > 0 {
-			return nil
+	results := make([]*fetchResult, nproc)
+	copy(results, q.resultCache[:nproc])
+	if len(results) > 0 {
+		// Mark results as done before dropping them from the cache.
+		for _, result := range results {
+			hash := result.Header.Hash()
+			delete(q.blockDonePool, hash)
+			delete(q.receiptDonePool, hash)
 		}
-		if q.PendingNodeData() > 0 {
-			return nil
-		}
-		// If the state is done, but not enough post-pivot headers were verified, stall...
-		for i := 0; i < fsHeaderForceVerify; i++ {
-			if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
-				return nil
-			}
+		// Delete the results from the cache and clear the tail.
+		copy(q.resultCache, q.resultCache[nproc:])
+		for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+			q.resultCache[i] = nil
 		}
+		// Advance the expected block number of the first cache entry.
+		q.resultOffset += uint64(nproc)
 	}
-	return q.resultCache[0]
+	return results
 }
 
-// TakeResults retrieves and permanently removes a batch of fetch results from
-// the cache.
-func (q *queue) TakeResults() []*fetchResult {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	// Accumulate all available results
-	results := []*fetchResult{}
+// countProcessableItems counts the processable items.
+func (q *queue) countProcessableItems() int {
 	for i, result := range q.resultCache {
-		// Stop if no more results are ready
+		// Don't process incomplete or unavailable items.
 		if result == nil || result.Pending > 0 {
-			break
+			return i
 		}
-		// The fast sync pivot block may only be processed after state fetch completes
-		if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
-			if len(q.stateTaskPool) > 0 {
-				break
-			}
-			if q.PendingNodeData() > 0 {
-				break
-			}
-			// Even is state fetch is done, ensure post-pivot headers passed verifications
-			safe := true
-			for j := 0; j < fsHeaderForceVerify; j++ {
-				if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
-					safe = false
+		// Special handling for the fast-sync pivot block:
+		if q.mode == FastSync {
+			bnum := result.Header.Number.Uint64()
+			if bnum == q.fastSyncPivot {
+				// If the state of the pivot block is not
+				// available yet, we cannot proceed and return 0.
+				//
+				// Stop before processing the pivot block to ensure that
+				// resultCache has space for fsHeaderForceVerify items. Not
+				// doing this could leave us unable to download the required
+				// amount of headers.
+				if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
+					return i
+				}
+				for j := 0; j < fsHeaderForceVerify; j++ {
+					if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
+						return i
+					}
 				}
 			}
-			if !safe {
-				break
+			// If we're just the fast sync pivot, stop as well
+			// because the following batch needs different insertion.
+			// This simplifies handling the switchover in d.process.
+			if bnum == q.fastSyncPivot+1 && i > 0 {
+				return i
 			}
 		}
-		// If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
-		if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
-			break
-		}
-		results = append(results, result)
-
-		hash := result.Header.Hash()
-		delete(q.blockDonePool, hash)
-		delete(q.receiptDonePool, hash)
 	}
-	// Delete the results from the slice and let them be garbage collected
-	// without this slice trick the results would stay in memory until nil
-	// would be assigned to them.
-	copy(q.resultCache, q.resultCache[len(results):])
-	for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
-		q.resultCache[k] = nil
-	}
-	q.resultOffset += uint64(len(results))
-
-	return results
+	return len(q.resultCache)
 }
 
 // ReserveBlocks reserves a set of block hashes for the given peer, skipping any
@@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
 		// If we're the first to request this task, initialise the result container
 		index := int(header.Number.Int64() - int64(q.resultOffset))
 		if index >= len(q.resultCache) || index < 0 {
+			common.Report("index allocation went beyond available resultCache space")
 			return nil, false, errInvalidChain
 		}
 		if q.resultCache[index] == nil {
@@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
 	for _, header := range skip {
 		taskQueue.Push(header, -float32(header.Number.Uint64()))
 	}
+	if progress {
+		// Wake WaitResults, resultCache was modified
+		q.active.Signal()
+	}
 	// Assemble and return the block download request
 	if len(send) == 0 {
 		return nil, progress, nil
@@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
 // expire is the generic check that move expired tasks from a pending pool back
 // into a task pool, returning all entities caught with expired tasks.
 //
-// Note, this method expects the queue lock to be already held for writing. The
+// Note, this method expects the queue lock to be already held. The
 // reason the lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
 func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
@@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
 	for hash, index := range request.Hashes {
 		q.hashQueue.Push(hash, float32(index))
 	}
+	// Wake up WaitResults
+	q.active.Signal()
 	// If none of the blocks were good, it's a stale delivery
 	switch {
 	case len(errs) == 0:
 		return nil
-
 	case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
 		return errs[0]
-
 	case len(errs) == len(blocks):
 		return errStaleDelivery
-
 	default:
 		return fmt.Errorf("multiple failures: %v", errs)
 	}
@@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 			taskQueue.Push(header, -float32(header.Number.Uint64()))
 		}
 	}
+	// Wake up WaitResults
+	q.active.Signal()
 	// If none of the data was good, it's a stale delivery
 	switch {
 	case failure == nil || failure == errInvalidChain:
 		return failure
-
 	case useful:
 		return fmt.Errorf("partial failure: %v", failure)
-
 	default:
 		return errStaleDelivery
 	}
@@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 	switch {
 	case len(errs) == 0:
 		return nil
-
 	case len(errs) == len(request.Hashes):
 		return errStaleDelivery
-
 	default:
 		return fmt.Errorf("multiple failures: %v", errs)
 	}
@@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 // deliverNodeData is the asynchronous node data processor that injects a batch
 // of sync results into the state scheduler.
 func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+	// Wake up WaitResults after the state has been written because it
+	// might be waiting for the pivot block state to get completed.
+	defer q.active.Signal()
+
 	// Process results one by one to permit task fetches in between
 	for i, result := range results {
 		q.stateSchedLock.Lock()
diff --git a/eth/sync.go b/eth/sync.go
index bbf2abc0486aea7fb6df7ac283ef07093cbc34bc..dd8aef8e43cd78b40e308a0e07f406f03da3231a 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -175,10 +175,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
 	}
 	// If fast sync was enabled, and we synced up, disable it
 	if pm.fastSync {
-		// Wait until all pending imports finish processing
-		for pm.downloader.Synchronising() {
-			time.Sleep(100 * time.Millisecond)
-		}
 		// Disable fast sync if we indeed have something in our chain
 		if pm.blockchain.CurrentBlock().NumberU64() > 0 {
 			glog.V(logger.Info).Infof("fast sync complete, auto disabling")