diff --git a/core/types/block.go b/core/types/block.go
index f6f5f14903a65905220e42d4d89b12129d9cb453..8316cd7f3a28cf1ef56a8a5539141f75debe4649 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -147,6 +147,17 @@ func rlpHash(x interface{}) (h common.Hash) {
 	return h
 }
 
+// EmptyBody returns true if there is no additional 'body' to complete the header,
+// that is: no transactions and no uncles.
+func (h *Header) EmptyBody() bool {
+	return h.TxHash == EmptyRootHash && h.UncleHash == EmptyUncleHash
+}
+
+// EmptyReceipts returns true if there are no receipts for this header/block.
+func (h *Header) EmptyReceipts() bool {
+	return h.ReceiptHash == EmptyRootHash
+}
+
 // Body is a simple (mutable, non-safe) data container for storing and moving
 // a block's data contents (transactions and uncles) together.
 type Body struct {
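
A minimal sketch of how the two new predicates are meant to be consumed; `partsToFetch` is a hypothetical helper, but it mirrors exactly the decision that `newFetchResult` makes later in this diff (eth/downloader/queue.go):

```go
package sketch

import "github.com/ethereum/go-ethereum/core/types"

// partsToFetch decides which parts of a block still need downloading: an
// empty body (no transactions, no uncles) needs no body fetch, and receipts
// only matter during fast sync.
func partsToFetch(h *types.Header, fastSync bool) (needBody, needReceipts bool) {
	needBody = !h.EmptyBody()
	needReceipts = fastSync && !h.EmptyReceipts()
	return needBody, needReceipts
}
```
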
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 3a289a9c7fe9ae282e5f799b9f158444989d6aff..f1f0c4ea30f893489ef6f4da075ded4d6245d315 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
 		stateBloom:     stateBloom,
 		mux:            mux,
 		checkpoint:     checkpoint,
-		queue:          newQueue(),
+		queue:          newQueue(blockCacheItems),
 		peers:          newPeerSet(),
 		rttEstimate:    uint64(rttMaxEstimate),
 		rttConfidence:  uint64(1000000),
@@ -370,7 +370,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 		d.stateBloom.Close()
 	}
 	// Reset the queue, peer set and wake channels to clean any internal leftover state
-	d.queue.Reset()
+	d.queue.Reset(blockCacheItems)
 	d.peers.Reset()
 
 	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
@@ -597,6 +597,9 @@ func (d *Downloader) Terminate() {
 	default:
 		close(d.quitCh)
 	}
+	if d.stateBloom != nil {
+		d.stateBloom.Close()
+	}
 	d.quitLock.Unlock()
 
 	// Cancel any pending download requests
@@ -629,7 +632,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
 			// Make sure the peer actually gave something valid
 			headers := packet.(*headerPack).headers
 			if len(headers) != 1 {
-				p.log.Debug("Multiple headers for single request", "headers", len(headers))
+				p.log.Warn("Multiple headers for single request", "headers", len(headers))
 				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			head := headers[0]
@@ -866,7 +869,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				// Make sure the peer actually gave something valid
 				headers := packer.(*headerPack).headers
 				if len(headers) != 1 {
-					p.log.Debug("Multiple headers for single request", "headers", len(headers))
+					p.log.Warn("Multiple headers for single request", "headers", len(headers))
 					return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 				}
 				arrived = true
@@ -890,7 +893,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				}
 				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
 				if header.Number.Uint64() != check {
-					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
+					p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
 					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
 				}
 				start = check
@@ -1106,17 +1109,18 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
 			pack := packet.(*headerPack)
 			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
 		}
-		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
-		throttle = func() bool { return false }
-		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
-			return d.queue.ReserveHeaders(p, count), false, nil
+		expire  = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
+		reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
+			return d.queue.ReserveHeaders(p, count), false, false
 		}
 		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
 		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
-		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
+		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
+			p.SetHeadersIdle(accepted, deliveryTime)
+		}
 	)
 	err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
-		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
+		d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
 		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
 
 	log.Debug("Skeleton fill terminated", "err", err)
@@ -1139,10 +1143,10 @@ func (d *Downloader) fetchBodies(from uint64) error {
 		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
 		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
 		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
-		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
+		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
 	)
 	err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
-		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
+		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
 		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
 
 	log.Debug("Block body download terminated", "err", err)
@@ -1163,10 +1167,12 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
 		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
 		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
-		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
+		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
+			p.SetReceiptsIdle(accepted, deliveryTime)
+		}
 	)
 	err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
-		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
+		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
 		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
 
 	log.Debug("Transaction receipt download terminated", "err", err)
@@ -1199,9 +1205,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 //  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
 //  - kind:        textual label of the type being downloaded to display in log messages
 func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
-	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
+	expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
 	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
-	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
+	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
 
 	// Create a ticker to detect expired retrieval tasks
 	ticker := time.NewTicker(100 * time.Millisecond)
@@ -1217,6 +1223,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 			return errCanceled
 
 		case packet := <-deliveryCh:
+			deliveryTime := time.Now()
 			// If the peer was previously banned and failed to deliver its pack
 			// in a reasonable time frame, ignore its message.
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
@@ -1229,7 +1236,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 				// caused by a timed out request which came through in the end), set it to
 				// idle. If the delivery's stale, the peer should have already been idled.
 				if !errors.Is(err, errStaleDelivery) {
-					setIdle(peer, accepted)
+					setIdle(peer, accepted, deliveryTime)
 				}
 				// Issue a log to the user to see what's going on
 				switch {
@@ -1282,7 +1289,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
 					if fails > 2 {
 						peer.log.Trace("Data delivery timed out", "type", kind)
-						setIdle(peer, 0)
+						setIdle(peer, 0, time.Now())
 					} else {
 						peer.log.Debug("Stalling delivery, dropping", "type", kind)
 
@@ -1317,27 +1324,27 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 			// Send a download request to all idle peers, until throttled
 			progressed, throttled, running := false, false, inFlight()
 			idles, total := idle()
-
+			pendCount := pending()
 			for _, peer := range idles {
 				// Short circuit if throttling activated
-				if throttle() {
-					throttled = true
+				if throttled {
 					break
 				}
 				// Short circuit if there is no more available task.
-				if pending() == 0 {
+				if pendCount = pending(); pendCount == 0 {
 					break
 				}
 				// Reserve a chunk of fetches for a peer. A nil can mean either that
 				// no more headers are available, or that the peer is known not to
 				// have them.
-				request, progress, err := reserve(peer, capacity(peer))
-				if err != nil {
-					return err
-				}
+				request, progress, throttle := reserve(peer, capacity(peer))
 				if progress {
 					progressed = true
 				}
+				if throttle {
+					throttled = true
+					throttleCounter.Inc(1)
+				}
 				if request == nil {
 					continue
 				}
@@ -1362,7 +1369,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 			}
 			// Make sure that we have peers available for fetching. If all peers have been tried
 			// and all failed throw an error
-			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
+			if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
 				return errPeersUnavailable
 			}
 		}
@@ -1374,8 +1381,11 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 // queue until the stream ends or a failure occurs.
 func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
 	// Keep a count of uncertain headers to roll back
-	var rollback []*types.Header
-	mode := d.getMode()
+	var (
+		rollback    []*types.Header
+		rollbackErr error
+		mode        = d.getMode()
+	)
 	defer func() {
 		if len(rollback) > 0 {
 			// Flatten the headers and roll them back
@@ -1397,7 +1407,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 			log.Warn("Rolled back headers", "count", len(hashes),
 				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
 				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
-				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
+				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
 		}
 	}()
 
@@ -1407,6 +1417,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 	for {
 		select {
 		case <-d.cancelCh:
+			rollbackErr = errCanceled
 			return errCanceled
 
 		case headers := <-d.headerProcCh:
@@ -1460,6 +1471,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 				// Terminate if something failed in between processing chunks
 				select {
 				case <-d.cancelCh:
+					rollbackErr = errCanceled
 					return errCanceled
 				default:
 				}
@@ -1484,11 +1496,12 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 						frequency = 1
 					}
 					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
+						rollbackErr = err
 						// If some headers were inserted, add them too to the rollback list
 						if n > 0 {
 							rollback = append(rollback, chunk[:n]...)
 						}
-						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
+						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
 						return fmt.Errorf("%w: %v", errInvalidChain, err)
 					}
 					// All verifications passed, store newly found uncertain headers
@@ -1503,6 +1516,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
 						select {
 						case <-d.cancelCh:
+							rollbackErr = errCanceled
 							return errCanceled
 						case <-time.After(time.Second):
 						}
@@ -1510,7 +1524,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 					// Otherwise insert the headers for content retrieval
 					inserts := d.queue.Schedule(chunk, origin)
 					if len(inserts) != len(chunk) {
-						log.Debug("Stale headers")
+						rollbackErr = fmt.Errorf("stale headers: len(inserts) %v, len(chunk) %v", len(inserts), len(chunk))
 						return fmt.Errorf("%w: stale headers", errBadPeer)
 					}
 				}
@@ -1680,6 +1694,14 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
 }
 
 func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+	if len(results) == 0 {
+		return nil, nil, nil
+	}
+	if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
+		// The pivot is somewhere in the future
+		return nil, results, nil
+	}
+	// This can also be optimized, but it only happens very rarely
 	for _, result := range results {
 		num := result.Header.Number.Uint64()
 		switch {
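
The two early returns added above short-circuit the common cases: no results at all, and the pivot still lying ahead of everything downloaded. A self-contained sketch of the partition semantics, operating on plain block numbers instead of `*fetchResult` (a hypothetical simplification):

```go
package sketch

// splitNumbers models the partition splitAroundPivot performs. Results
// arrive ordered by block number, which is what makes the early exits valid.
func splitNumbers(pivot uint64, nums []uint64) (p *uint64, before, after []uint64) {
	if len(nums) == 0 {
		return nil, nil, nil
	}
	// Fast path from the diff: if even the last (highest) number precedes
	// the pivot, the pivot is in the future and everything goes to 'before'.
	if nums[len(nums)-1] < pivot {
		return nil, nums, nil
	}
	for i, n := range nums {
		switch {
		case n < pivot:
			before = append(before, n)
		case n == pivot:
			p = &nums[i]
		default:
			after = append(after, n)
		}
	}
	return p, before, after
}
```
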
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index f9092175cb7162cd80b8373b06bdaa03b0f8c0f4..5786a37448e0de315d917365d01ffd6d35a8045c 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -297,14 +297,13 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {
 		} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
 			return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err)
 		}
-		if _, ok := dl.ownHeaders[block.Hash()]; !ok {
+		if hdr := dl.getHeaderByHash(block.Hash()); hdr == nil {
 			dl.ownHashes = append(dl.ownHashes, block.Hash())
 			dl.ownHeaders[block.Hash()] = block.Header()
 		}
 		dl.ownBlocks[block.Hash()] = block
 		dl.ownReceipts[block.Hash()] = make(types.Receipts, 0)
 		dl.stateDb.Put(block.Root().Bytes(), []byte{0x00})
-
 		td := dl.getTd(block.ParentHash())
 		dl.ownChainTd[block.Hash()] = new(big.Int).Add(td, block.Difficulty())
 	}
@@ -538,7 +537,6 @@ func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
 func testThrottling(t *testing.T, protocol int, mode SyncMode) {
 	t.Parallel()
 	tester := newTester()
-	defer tester.terminate()
 
 	// Create a long block chain to download and the tester
 	targetBlocks := testChainBase.len() - 1
@@ -570,31 +568,32 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
 			time.Sleep(25 * time.Millisecond)
 
 			tester.lock.Lock()
-			tester.downloader.queue.lock.Lock()
-			cached = len(tester.downloader.queue.blockDonePool)
-			if mode == FastSync {
-				if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
-					cached = receipts
-				}
+			{
+				tester.downloader.queue.resultCache.lock.Lock()
+				cached = tester.downloader.queue.resultCache.countCompleted()
+				tester.downloader.queue.resultCache.lock.Unlock()
+				frozen = int(atomic.LoadUint32(&blocked))
+				retrieved = len(tester.ownBlocks)
 			}
-			frozen = int(atomic.LoadUint32(&blocked))
-			retrieved = len(tester.ownBlocks)
-			tester.downloader.queue.lock.Unlock()
 			tester.lock.Unlock()
 
-			if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
+			if cached == blockCacheItems ||
+				cached == blockCacheItems-reorgProtHeaderDelay ||
+				retrieved+cached+frozen == targetBlocks+1 ||
+				retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
 				break
 			}
 		}
 		// Make sure we filled up the cache, then exhaust it
 		time.Sleep(25 * time.Millisecond) // give it a chance to screw up
-
 		tester.lock.RLock()
 		retrieved = len(tester.ownBlocks)
 		tester.lock.RUnlock()
 		if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
 			t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
 		}
+
 		// Permit the blocked blocks to import
 		if atomic.LoadUint32(&blocked) > 0 {
 			atomic.StoreUint32(&blocked, uint32(0))
@@ -606,6 +605,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
 	if err := <-errc; err != nil {
 		t.Fatalf("block synchronization failed: %v", err)
 	}
+	tester.terminate()
 }
 
 // Tests that simple synchronization against a forked chain works correctly. In
@@ -628,7 +629,6 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
 	chainB := testChainForkLightB.shorten(testChainBase.len() + 80)
 	tester.newPeer("fork A", protocol, chainA)
 	tester.newPeer("fork B", protocol, chainB)
-
 	// Synchronise with the peer and make sure all blocks were retrieved
 	if err := tester.sync("fork A", nil, mode); err != nil {
 		t.Fatalf("failed to synchronise blocks: %v", err)
@@ -720,15 +720,12 @@ func TestBoundedHeavyForkedSync64Light(t *testing.T) { testBoundedHeavyForkedSyn
 
 func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
 	t.Parallel()
-
 	tester := newTester()
-	defer tester.terminate()
 
 	// Create a long enough forked chain
 	chainA := testChainForkLightA
 	chainB := testChainForkHeavy
 	tester.newPeer("original", protocol, chainA)
-	tester.newPeer("heavy-rewriter", protocol, chainB)
 
 	// Synchronise with the peer and make sure all blocks were retrieved
 	if err := tester.sync("original", nil, mode); err != nil {
@@ -736,10 +733,12 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
 	}
 	assertOwnChain(t, tester, chainA.len())
 
+	tester.newPeer("heavy-rewriter", protocol, chainB)
 	// Synchronise with the second peer and ensure that the fork is rejected to being too old
 	if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor {
 		t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor)
 	}
+	tester.terminate()
 }
 
 // Tests that an inactive downloader will not accept incoming block headers and
@@ -1007,7 +1006,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
 	t.Parallel()
 
 	tester := newTester()
-	defer tester.terminate()
 
 	// Create a small enough block chain to download
 	targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
@@ -1087,6 +1085,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
 			t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len())
 		}
 	}
+	tester.terminate()
 }
 
 // Tests that a peer advertising a high TD doesn't get to stall the downloader
@@ -1102,13 +1101,13 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
 	t.Parallel()
 
 	tester := newTester()
-	defer tester.terminate()
 
 	chain := testChainBase.shorten(1)
 	tester.newPeer("attack", protocol, chain)
 	if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer {
 		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
 	}
+	tester.terminate()
 }
 
 // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go
index d4eb33794628467716f60073da21932abf069c67..c38732043aa20e020f25bad52b076ee089551396 100644
--- a/eth/downloader/metrics.go
+++ b/eth/downloader/metrics.go
@@ -40,4 +40,6 @@ var (
 
 	stateInMeter   = metrics.NewRegisteredMeter("eth/downloader/states/in", nil)
 	stateDropMeter = metrics.NewRegisteredMeter("eth/downloader/states/drop", nil)
+
+	throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil)
 )
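
For context, a counter registered this way is incremented at the point of interest and exported by the metrics subsystem; a nil registry selects the package-global default. A minimal sketch of the pattern (the identifier and metric name here are illustrative, not part of the diff):

```go
package sketch

import "github.com/ethereum/go-ethereum/metrics"

// exampleThrottles mirrors how throttleCounter above is registered: the
// name becomes the metric's path, nil means the default global registry.
var exampleThrottles = metrics.NewRegisteredCounter("example/downloader/throttle", nil)

func onThrottle() {
	// The same call the downloader now makes when reserveHeaders signals
	// that the result cache is full: throttleCounter.Inc(1).
	exampleThrottles.Inc(1)
}
```
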
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 852c250dc2a4c73a8313c036de81c3b2703b4c0d..96982fba2825ed0fbe9efd857c8640acdab33473 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -117,9 +117,7 @@ func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *pe
 	return &peerConnection{
 		id:      id,
 		lacking: make(map[common.Hash]struct{}),
-
-		peer: peer,
-
+		peer:    peer,
 		version: version,
 		log:     logger,
 	}
@@ -173,12 +171,14 @@ func (p *peerConnection) FetchBodies(request *fetchRequest) error {
 	}
 	p.blockStarted = time.Now()
 
-	// Convert the header set to a retrievable slice
-	hashes := make([]common.Hash, 0, len(request.Headers))
-	for _, header := range request.Headers {
-		hashes = append(hashes, header.Hash())
-	}
-	go p.peer.RequestBodies(hashes)
+	go func() {
+		// Convert the header set to a retrievable slice
+		hashes := make([]common.Hash, 0, len(request.Headers))
+		for _, header := range request.Headers {
+			hashes = append(hashes, header.Hash())
+		}
+		p.peer.RequestBodies(hashes)
+	}()
 
 	return nil
 }
@@ -195,12 +195,14 @@ func (p *peerConnection) FetchReceipts(request *fetchRequest) error {
 	}
 	p.receiptStarted = time.Now()
 
-	// Convert the header set to a retrievable slice
-	hashes := make([]common.Hash, 0, len(request.Headers))
-	for _, header := range request.Headers {
-		hashes = append(hashes, header.Hash())
-	}
-	go p.peer.RequestReceipts(hashes)
+	go func() {
+		// Convert the header set to a retrievable slice
+		hashes := make([]common.Hash, 0, len(request.Headers))
+		for _, header := range request.Headers {
+			hashes = append(hashes, header.Hash())
+		}
+		p.peer.RequestReceipts(hashes)
+	}()
 
 	return nil
 }
@@ -225,34 +227,34 @@ func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
 // SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
 // requests. Its estimated header retrieval throughput is updated with that measured
 // just now.
-func (p *peerConnection) SetHeadersIdle(delivered int) {
-	p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
+func (p *peerConnection) SetHeadersIdle(delivered int, deliveryTime time.Time) {
+	p.setIdle(deliveryTime.Sub(p.headerStarted), delivered, &p.headerThroughput, &p.headerIdle)
 }
 
 // SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
 // requests. Its estimated body retrieval throughput is updated with that measured
 // just now.
-func (p *peerConnection) SetBodiesIdle(delivered int) {
-	p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
+func (p *peerConnection) SetBodiesIdle(delivered int, deliveryTime time.Time) {
+	p.setIdle(deliveryTime.Sub(p.blockStarted), delivered, &p.blockThroughput, &p.blockIdle)
 }
 
 // SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
 // retrieval requests. Its estimated receipt retrieval throughput is updated
 // with that measured just now.
-func (p *peerConnection) SetReceiptsIdle(delivered int) {
-	p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
+func (p *peerConnection) SetReceiptsIdle(delivered int, deliveryTime time.Time) {
+	p.setIdle(deliveryTime.Sub(p.receiptStarted), delivered, &p.receiptThroughput, &p.receiptIdle)
 }
 
 // SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
 // data retrieval requests. Its estimated state retrieval throughput is updated
 // with that measured just now.
-func (p *peerConnection) SetNodeDataIdle(delivered int) {
-	p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
+func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) {
+	p.setIdle(deliveryTime.Sub(p.stateStarted), delivered, &p.stateThroughput, &p.stateIdle)
 }
 
 // setIdle sets the peer to idle, allowing it to execute new retrieval requests.
 // Its estimated retrieval throughput is updated with that measured just now.
-func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+func (p *peerConnection) setIdle(elapsed time.Duration, delivered int, throughput *float64, idle *int32) {
 	// Irrelevant of the scaling, make sure the peer ends up idle
 	defer atomic.StoreInt32(idle, 0)
 
@@ -265,7 +267,9 @@ func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *f
 		return
 	}
 	// Otherwise update the throughput with a new measurement
-	elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor
+	if elapsed <= 0 {
+		elapsed = 1 // +1 (ns) to ensure non-zero divisor
+	}
 	measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
 
 	*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
@@ -523,22 +527,20 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerC
 	defer ps.lock.RUnlock()
 
 	idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
+	tps := make([]float64, 0, len(ps.peers))
 	for _, p := range ps.peers {
 		if p.version >= minProtocol && p.version <= maxProtocol {
 			if idleCheck(p) {
 				idle = append(idle, p)
+				tps = append(tps, throughput(p))
 			}
 			total++
 		}
 	}
-	for i := 0; i < len(idle); i++ {
-		for j := i + 1; j < len(idle); j++ {
-			if throughput(idle[i]) < throughput(idle[j]) {
-				idle[i], idle[j] = idle[j], idle[i]
-			}
-		}
-	}
-	return idle, total
+	// And sort them
+	sortPeers := &peerThroughputSort{idle, tps}
+	sort.Sort(sortPeers)
+	return sortPeers.p, total
 }
 
 // medianRTT returns the median RTT of the peerset, considering only the tuning
@@ -571,3 +573,24 @@ func (ps *peerSet) medianRTT() time.Duration {
 	}
 	return median
 }
+
+// peerThroughputSort implements the Sort interface, and allows for
+// sorting a set of peers by their throughput
+// The sorted data is with the _highest_ throughput first
+type peerThroughputSort struct {
+	p  []*peerConnection
+	tp []float64
+}
+
+func (ps *peerThroughputSort) Len() int {
+	return len(ps.p)
+}
+
+func (ps *peerThroughputSort) Less(i, j int) bool {
+	return ps.tp[i] > ps.tp[j]
+}
+
+func (ps *peerThroughputSort) Swap(i, j int) {
+	ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
+	ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
+}
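
The `setIdle` change above only swaps a `time.Time` for a pre-computed duration; the throughput estimate itself is still an exponential moving average. A standalone sketch of that update (the value of `measurementImpact` is an assumption here; the real constant is defined in eth/downloader/peer.go):

```go
package sketch

import "time"

// measurementImpact is the EMA smoothing factor; 0.1 is an assumed value
// for this sketch.
const measurementImpact = 0.1

// updateThroughput reproduces the moving-average update setIdle performs:
// items per second from the latest delivery, blended with the previous
// estimate so a single outlier cannot swing the value too far.
func updateThroughput(prev float64, delivered int, elapsed time.Duration) float64 {
	if elapsed <= 0 {
		elapsed = 1 // non-zero divisor, as in the diff above
	}
	measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
	return (1-measurementImpact)*prev + measurementImpact*measured
}
```
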
diff --git a/eth/downloader/peer_test.go b/eth/downloader/peer_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..4bf0e200bb12a3361e018b21064d0cd0518f0108
--- /dev/null
+++ b/eth/downloader/peer_test.go
@@ -0,0 +1,53 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of go-ethereum.
+//
+// go-ethereum is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// go-ethereum is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"sort"
+	"testing"
+)
+
+func TestPeerThroughputSorting(t *testing.T) {
+	a := &peerConnection{
+		id:               "a",
+		headerThroughput: 1.25,
+	}
+	b := &peerConnection{
+		id:               "b",
+		headerThroughput: 1.21,
+	}
+	c := &peerConnection{
+		id:               "c",
+		headerThroughput: 1.23,
+	}
+
+	peers := []*peerConnection{a, b, c}
+	tps := []float64{a.headerThroughput,
+		b.headerThroughput, c.headerThroughput}
+	sortPeers := &peerThroughputSort{peers, tps}
+	sort.Sort(sortPeers)
+	if got, exp := sortPeers.p[0].id, "a"; got != exp {
+		t.Errorf("sort fail, got %v exp %v", got, exp)
+	}
+	if got, exp := sortPeers.p[1].id, "c"; got != exp {
+		t.Errorf("sort fail, got %v exp %v", got, exp)
+	}
+	if got, exp := sortPeers.p[2].id, "b"; got != exp {
+		t.Errorf("sort fail, got %v exp %v", got, exp)
+	}
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 9aea102539246793610ad1d4bd72178b2f394496..87225cb6254678dfc30ab46c889f58a4f0d9e92d 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -32,6 +33,11 @@ import (
 	"github.com/ethereum/go-ethereum/metrics"
 )
 
+const (
+	bodyType    = uint(0)
+	receiptType = uint(1)
+)
+
 var (
 	blockCacheItems      = 8192             // Maximum number of blocks to cache before throttling the download
 	blockCacheMemory     = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
@@ -54,8 +60,7 @@ type fetchRequest struct {
 // fetchResult is a struct collecting partial results from data fetchers until
 // all outstanding pieces complete and the result as a whole can be processed.
 type fetchResult struct {
-	Pending int         // Number of data fetches still pending
-	Hash    common.Hash // Hash of the header to prevent recalculating
+	pending int32 // Flag telling what deliveries are outstanding
 
 	Header       *types.Header
 	Uncles       []*types.Header
@@ -63,6 +68,44 @@ type fetchResult struct {
 	Receipts     types.Receipts
 }
 
+func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
+	item := &fetchResult{
+		Header: header,
+	}
+	if !header.EmptyBody() {
+		item.pending |= (1 << bodyType)
+	}
+	if fastSync && !header.EmptyReceipts() {
+		item.pending |= (1 << receiptType)
+	}
+	return item
+}
+
+// SetBodyDone flags the body as finished.
+func (f *fetchResult) SetBodyDone() {
+	if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 {
+		atomic.AddInt32(&f.pending, -1)
+	}
+}
+
+// AllDone checks if item is done.
+func (f *fetchResult) AllDone() bool {
+	return atomic.LoadInt32(&f.pending) == 0
+}
+
+// SetReceiptsDone flags the receipts as finished.
+func (f *fetchResult) SetReceiptsDone() {
+	if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 {
+		atomic.AddInt32(&f.pending, -2)
+	}
+}
+
+// Done checks if the given kind of delivery is done already
+func (f *fetchResult) Done(kind uint) bool {
+	v := atomic.LoadInt32(&f.pending)
+	return v&(1<<kind) == 0
+}
+
 // queue represents hashes that are either need fetching or are being fetched
 type queue struct {
 	mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
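
A sketch of the intended lifecycle of the bit-flagged fetchResult, written as if inside package downloader; all calls exist in the hunk above, only the scenario is hypothetical:

```go
package downloader

import "github.com/ethereum/go-ethereum/core/types"

// exampleLifecycle walks a fetchResult from creation to completion. In fast
// sync a result may wait on both a body and a receipt delivery; the two bits
// are cleared independently, possibly by different peers.
func exampleLifecycle(header *types.Header) bool {
	res := newFetchResult(header, true) // fastSync=true: receipts may be pending too
	if !res.Done(bodyType) {
		// ...a peer delivered the transactions and uncles...
		res.SetBodyDone()
	}
	if !res.Done(receiptType) {
		// ...a (possibly different) peer delivered the receipts...
		res.SetReceiptsDone()
	}
	return res.AllDone() // true once no deliveries remain outstanding
}
```
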
@@ -82,44 +125,37 @@ type queue struct {
 	blockTaskPool  map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
 	blockTaskQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
 	blockPendPool  map[string]*fetchRequest      // [eth/62] Currently pending block (body) retrieval operations
-	blockDonePool  map[common.Hash]struct{}      // [eth/62] Set of the completed block (body) fetches
 
 	receiptTaskPool  map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
 	receiptTaskQueue *prque.Prque                  // [eth/63] Priority queue of the headers to fetch the receipts for
 	receiptPendPool  map[string]*fetchRequest      // [eth/63] Currently pending receipt retrieval operations
-	receiptDonePool  map[common.Hash]struct{}      // [eth/63] Set of the completed receipt fetches
 
-	resultCache  []*fetchResult     // Downloaded but not yet delivered fetch results
-	resultOffset uint64             // Offset of the first cached fetch result in the block chain
-	resultSize   common.StorageSize // Approximate size of a block (exponential moving average)
+	resultCache *resultStore       // Downloaded but not yet delivered fetch results
+	resultSize  common.StorageSize // Approximate size of a block (exponential moving average)
 
-	lock   *sync.Mutex
+	lock   *sync.RWMutex
 	active *sync.Cond
 	closed bool
+
+	lastStatLog time.Time
 }
 
 // newQueue creates a new download queue for scheduling block retrieval.
-func newQueue() *queue {
-	lock := new(sync.Mutex)
-	return &queue{
-		headerPendPool:   make(map[string]*fetchRequest),
+func newQueue(blockCacheLimit int) *queue {
+	lock := new(sync.RWMutex)
+	q := &queue{
 		headerContCh:     make(chan bool),
-		blockTaskPool:    make(map[common.Hash]*types.Header),
 		blockTaskQueue:   prque.New(nil),
-		blockPendPool:    make(map[string]*fetchRequest),
-		blockDonePool:    make(map[common.Hash]struct{}),
-		receiptTaskPool:  make(map[common.Hash]*types.Header),
 		receiptTaskQueue: prque.New(nil),
-		receiptPendPool:  make(map[string]*fetchRequest),
-		receiptDonePool:  make(map[common.Hash]struct{}),
-		resultCache:      make([]*fetchResult, blockCacheItems),
 		active:           sync.NewCond(lock),
 		lock:             lock,
 	}
+	q.Reset(blockCacheLimit)
+	return q
 }
 
 // Reset clears out the queue contents.
-func (q *queue) Reset() {
+func (q *queue) Reset(blockCacheLimit int) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -132,15 +168,12 @@ func (q *queue) Reset() {
 	q.blockTaskPool = make(map[common.Hash]*types.Header)
 	q.blockTaskQueue.Reset()
 	q.blockPendPool = make(map[string]*fetchRequest)
-	q.blockDonePool = make(map[common.Hash]struct{})
 
 	q.receiptTaskPool = make(map[common.Hash]*types.Header)
 	q.receiptTaskQueue.Reset()
 	q.receiptPendPool = make(map[string]*fetchRequest)
-	q.receiptDonePool = make(map[common.Hash]struct{})
 
-	q.resultCache = make([]*fetchResult, blockCacheItems)
-	q.resultOffset = 0
+	q.resultCache = newResultStore(blockCacheLimit)
 }
 
 // Close marks the end of the sync, unblocking Results.
@@ -148,8 +181,8 @@ func (q *queue) Reset() {
 func (q *queue) Close() {
 	q.lock.Lock()
 	q.closed = true
+	q.active.Signal()
 	q.lock.Unlock()
-	q.active.Broadcast()
 }
 
 // PendingHeaders retrieves the number of header requests pending for retrieval.
@@ -210,58 +243,8 @@ func (q *queue) Idle() bool {
 
 	queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
 	pending := len(q.blockPendPool) + len(q.receiptPendPool)
-	cached := len(q.blockDonePool) + len(q.receiptDonePool)
-
-	return (queued + pending + cached) == 0
-}
-
-// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
-// fetches exceed block cache).
-func (q *queue) ShouldThrottleBlocks() bool {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	return q.resultSlots(q.blockPendPool, q.blockDonePool) <= 0
-}
-
-// ShouldThrottleReceipts checks if the download should be throttled (active receipt
-// fetches exceed block cache).
-func (q *queue) ShouldThrottleReceipts() bool {
-	q.lock.Lock()
-	defer q.lock.Unlock()
 
-	return q.resultSlots(q.receiptPendPool, q.receiptDonePool) <= 0
-}
-
-// resultSlots calculates the number of results slots available for requests
-// whilst adhering to both the item and the memory limits of the result cache.
-func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
-	// Calculate the maximum length capped by the memory limit
-	limit := len(q.resultCache)
-	if common.StorageSize(len(q.resultCache))*q.resultSize > common.StorageSize(blockCacheMemory) {
-		limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
-	}
-	// Calculate the number of slots already finished
-	finished := 0
-	for _, result := range q.resultCache[:limit] {
-		if result == nil {
-			break
-		}
-		if _, ok := donePool[result.Hash]; ok {
-			finished++
-		}
-	}
-	// Calculate the number of slots currently downloading
-	pending := 0
-	for _, request := range pendPool {
-		for _, header := range request.Headers {
-			if header.Number.Uint64() < q.resultOffset+uint64(limit) {
-				pending++
-			}
-		}
-	}
-	// Return the free slots to distribute
-	return limit - finished - pending
+	return (queued + pending) == 0
 }
 
 // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
@@ -323,21 +306,22 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
 			break
 		}
 		// Make sure no duplicate requests are executed
+		// We cannot skip this, even if the block is empty, since this is
+		// what triggers the fetchResult creation.
 		if _, ok := q.blockTaskPool[hash]; ok {
 			log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
-			continue
-		}
-		if _, ok := q.receiptTaskPool[hash]; ok {
-			log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
-			continue
+		} else {
+			q.blockTaskPool[hash] = header
+			q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
 		}
-		// Queue the header for content retrieval
-		q.blockTaskPool[hash] = header
-		q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
-
-		if q.mode == FastSync {
-			q.receiptTaskPool[hash] = header
-			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+		// Queue for receipt retrieval
+		if q.mode == FastSync && !header.EmptyReceipts() {
+			if _, ok := q.receiptTaskPool[hash]; ok {
+				log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
+			} else {
+				q.receiptTaskPool[hash] = header
+				q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
+			}
 		}
 		inserts = append(inserts, header)
 		q.headerHead = hash
@@ -347,67 +331,78 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
 }
 
 // Results retrieves and permanently removes a batch of fetch results from
-// the cache. The result slice will be empty if the queue has been closed.
+// the cache. The result slice will be empty if the queue has been closed.
+// Results can be called concurrently with Deliver and Schedule,
+// but assumes that there is never more than one caller to Results at a time.
 func (q *queue) Results(block bool) []*fetchResult {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	// Count the number of items available for processing
-	nproc := q.countProcessableItems()
-	for nproc == 0 && !q.closed {
-		if !block {
-			return nil
+	// Abort early if there are no items and non-blocking mode was requested
+	if !block && !q.resultCache.HasCompletedItems() {
+		return nil
+	}
+	closed := false
+	for !closed && !q.resultCache.HasCompletedItems() {
+		// In order to wait on 'active', we need to obtain the lock.
+		// That may take a while, if someone is delivering at the same
+		// time, so after obtaining the lock, we check again if there
+		// are any results to fetch.
+		// Also, between asking for the lock and actually obtaining it,
+		// someone may have closed the queue. In that case, we should
+		// return the available results and stop blocking.
+		q.lock.Lock()
+		if q.resultCache.HasCompletedItems() || q.closed {
+			q.lock.Unlock()
+			break
 		}
+		// No items available, and not closed
 		q.active.Wait()
-		nproc = q.countProcessableItems()
-	}
-	// Since we have a batch limit, don't pull more into "dangling" memory
-	if nproc > maxResultsProcess {
-		nproc = maxResultsProcess
-	}
-	results := make([]*fetchResult, nproc)
-	copy(results, q.resultCache[:nproc])
-	if len(results) > 0 {
-		// Mark results as done before dropping them from the cache.
-		for _, result := range results {
-			hash := result.Header.Hash()
-			delete(q.blockDonePool, hash)
-			delete(q.receiptDonePool, hash)
+		closed = q.closed
+		q.lock.Unlock()
+	}
+	// Regardless if closed or not, we can still deliver whatever we have
+	results := q.resultCache.GetCompleted(maxResultsProcess)
+	for _, result := range results {
+		// Recalculate the result item weights to prevent memory exhaustion
+		size := result.Header.Size()
+		for _, uncle := range result.Uncles {
+			size += uncle.Size()
 		}
-		// Delete the results from the cache and clear the tail.
-		copy(q.resultCache, q.resultCache[nproc:])
-		for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
-			q.resultCache[i] = nil
+		for _, receipt := range result.Receipts {
+			size += receipt.Size()
 		}
-		// Advance the expected block number of the first cache entry.
-		q.resultOffset += uint64(nproc)
-
-		// Recalculate the result item weights to prevent memory exhaustion
-		for _, result := range results {
-			size := result.Header.Size()
-			for _, uncle := range result.Uncles {
-				size += uncle.Size()
-			}
-			for _, receipt := range result.Receipts {
-				size += receipt.Size()
-			}
-			for _, tx := range result.Transactions {
-				size += tx.Size()
-			}
-			q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
+		for _, tx := range result.Transactions {
+			size += tx.Size()
 		}
+		q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
+			(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
+	}
+	// Using the newly calibrated result size, figure out the new throttle limit
+	// on the result cache
+	throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
+	throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)
+
+	// Log some queue statistics at most once every ten seconds
+	if time.Since(q.lastStatLog) > 10*time.Second {
+		q.lastStatLog = time.Now()
+		info := q.Stats()
+		info = append(info, "throttle", throttleThreshold)
+		log.Info("Downloader queue stats", info...)
 	}
 	return results
 }
 
-// countProcessableItems counts the processable items.
-func (q *queue) countProcessableItems() int {
-	for i, result := range q.resultCache {
-		if result == nil || result.Pending > 0 {
-			return i
-		}
+func (q *queue) Stats() []interface{} {
+	q.lock.RLock()
+	defer q.lock.RUnlock()
+
+	return q.stats()
+}
+
+func (q *queue) stats() []interface{} {
+	return []interface{}{
+		"receiptTasks", q.receiptTaskQueue.Size(),
+		"blockTasks", q.blockTaskQueue.Size(),
+		"itemSize", q.resultSize,
 	}
-	return len(q.resultCache)
 }
 
 // ReserveHeaders reserves a set of headers for the given peer, skipping any
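
The wait loop in Results above is the standard condition-variable idiom: take the lock, re-check the predicate, Wait (which atomically releases and reacquires the lock), and check again on wakeup, because a Signal only means "state changed", not "your condition now holds". A self-contained model of the same shape:

```go
package sketch

import "sync"

// box models the queue's Results/Deliver interaction: consumers hold the
// lock, loop on the predicate, and Wait releases/reacquires the lock.
type box struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  int
	closed bool
}

func newBox() *box {
	b := new(box)
	b.cond = sync.NewCond(&b.mu)
	return b
}

// take blocks until items are available or the box is closed, re-checking
// the predicate after every wakeup.
func (b *box) take() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.items == 0 && !b.closed {
		b.cond.Wait()
	}
	n := b.items
	b.items = 0
	return n
}

// put adds items and wakes one waiter, mirroring q.active.Signal() above.
func (b *box) put(n int) {
	b.mu.Lock()
	b.items += n
	b.cond.Signal()
	b.mu.Unlock()
}
```
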
@@ -453,27 +448,21 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
 // ReserveBodies reserves a set of body fetches for the given peer, skipping any
 // previously failed downloads. Beside the next batch of needed fetches, it also
 // returns a flag whether empty blocks were queued requiring processing.
-func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
-	isNoop := func(header *types.Header) bool {
-		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
-	}
+func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, bool) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
+	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, bodyType)
 }
 
 // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
 // any previously failed downloads. Beside the next batch of needed fetches, it
 // also returns a flag whether empty receipts were queued requiring importing.
-func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
-	isNoop := func(header *types.Header) bool {
-		return header.ReceiptHash == types.EmptyRootHash
-	}
+func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, bool) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
+	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType)
 }
 
 // reserveHeaders reserves a set of data download operations for a given peer,
@@ -483,57 +472,71 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
 // Note, this method expects the queue lock to be already held for writing. The
 // reason the lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
+//
+// Returns:
+//   item     - the fetchRequest
+//   progress - whether any progress was made
+//   throttle - whether the caller should throttle for a while
 func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
-	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
+	pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
 	// Short circuit if the pool has been depleted, or if the peer's already
 	// downloading something (sanity check not to corrupt state)
 	if taskQueue.Empty() {
-		return nil, false, nil
+		return nil, false, true
 	}
 	if _, ok := pendPool[p.id]; ok {
-		return nil, false, nil
+		return nil, false, false
 	}
-	// Calculate an upper limit on the items we might fetch (i.e. throttling)
-	space := q.resultSlots(pendPool, donePool)
-
 	// Retrieve a batch of tasks, skipping previously failed ones
 	send := make([]*types.Header, 0, count)
 	skip := make([]*types.Header, 0)
-
 	progress := false
-	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
-		header := taskQueue.PopItem().(*types.Header)
-		hash := header.Hash()
-
-		// If we're the first to request this task, initialise the result container
-		index := int(header.Number.Int64() - int64(q.resultOffset))
-		if index >= len(q.resultCache) || index < 0 {
-			common.Report("index allocation went beyond available resultCache space")
-			return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain)
+	throttled := false
+	for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
+		// The task queue will pop items in order, so the highest priority block
+		// is also the one with the lowest block number.
+		h, _ := taskQueue.Peek()
+		header := h.(*types.Header)
+		// We can ask the result cache if this header is within the
+		// "prioritized" segment of blocks. If it is not, we need to throttle.
+
+		stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync)
+		if stale {
+			// Don't put back in the task queue, this item has already been
+			// delivered upstream
+			taskQueue.PopItem()
+			progress = true
+			delete(taskPool, header.Hash())
+			proc = proc - 1
+			log.Error("Fetch reservation already delivered", "number", header.Number.Uint64())
+			continue
 		}
-		if q.resultCache[index] == nil {
-			components := 1
-			if q.mode == FastSync {
-				components = 2
-			}
-			q.resultCache[index] = &fetchResult{
-				Pending: components,
-				Hash:    hash,
-				Header:  header,
-			}
+		if throttle {
+			// There are no result slots available; leave it in the task queue.
+			// However, if there are any headers left as 'skipped', we should not
+			// tell the caller to throttle, since we still want some other
+			// peer to fetch those for us.
+			throttled = len(skip) == 0
+			break
 		}
-		// If this fetch task is a noop, skip this fetch operation
-		if isNoop(header) {
-			donePool[hash] = struct{}{}
-			delete(taskPool, hash)
-
-			space, proc = space-1, proc-1
-			q.resultCache[index].Pending--
+		if err != nil {
+			// This most definitely should _not_ happen.
+			log.Warn("Failed to reserve headers", "err", err)
+			// Leave the header in the task queue; it will be retried later.
+			break
+		}
+		if item.Done(kind) {
+			// If it's a noop, we can skip this task
+			delete(taskPool, header.Hash())
+			taskQueue.PopItem()
+			proc = proc - 1
 			progress = true
 			continue
 		}
+		// Remove it from the task queue
+		taskQueue.PopItem()
 		// Otherwise unless the peer is known not to have the data, add to the retrieve list
-		if p.Lacks(hash) {
+		if p.Lacks(header.Hash()) {
 			skip = append(skip, header)
 		} else {
 			send = append(send, header)
@@ -543,13 +546,13 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 	for _, header := range skip {
 		taskQueue.Push(header, -int64(header.Number.Uint64()))
 	}
-	if progress {
+	if q.resultCache.HasCompletedItems() {
 		// Wake Results, resultCache was modified
 		q.active.Signal()
 	}
 	// Assemble and return the block download request
 	if len(send) == 0 {
-		return nil, progress, nil
+		return nil, progress, throttled
 	}
 	request := &fetchRequest{
 		Peer:    p,
@@ -557,8 +560,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
 		Time:    time.Now(),
 	}
 	pendPool[p.id] = request
-
-	return request, progress, nil
+	return request, progress, throttled
 }
 
 // CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
@@ -768,16 +770,23 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
 func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-
-	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
-		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
+	validate := func(index int, header *types.Header) error {
+		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash {
 			return errInvalidBody
 		}
+		if types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
+			return errInvalidBody
+		}
+		return nil
+	}
+
+	reconstruct := func(index int, result *fetchResult) {
 		result.Transactions = txLists[index]
 		result.Uncles = uncleLists[index]
-		return nil
+		result.SetBodyDone()
 	}
-	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
+	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
+		bodyReqTimer, len(txLists), validate, reconstruct)
 }
 
 // DeliverReceipts injects a receipt retrieval response into the results queue.
@@ -786,25 +795,29 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
 func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-
-	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+	validate := func(index int, header *types.Header) error {
 		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
 			return errInvalidReceipt
 		}
-		result.Receipts = receiptList[index]
 		return nil
 	}
-	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
+	reconstruct := func(index int, result *fetchResult) {
+		result.Receipts = receiptList[index]
+		result.SetReceiptsDone()
+	}
+	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
+		receiptReqTimer, len(receiptList), validate, reconstruct)
 }
 
 // deliver injects a data retrieval response into the results queue.
 //
 // Note, this method expects the queue lock to be already held for writing. The
-// reason the lock is not obtained in here is because the parameters already need
+// reason this lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
-	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
-	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
+	taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer,
+	results int, validate func(index int, header *types.Header) error,
+	reconstruct func(index int, result *fetchResult)) (int, error) {
 
 	// Short circuit if the data was never requested
 	request := pendPool[id]
@@ -824,52 +837,53 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 	var (
 		accepted int
 		failure  error
-		useful   bool
+		i        int
+		hashes   []common.Hash
 	)
-	for i, header := range request.Headers {
+	for _, header := range request.Headers {
 		// Short circuit assembly if no more fetch results are found
 		if i >= results {
 			break
 		}
-		// Reconstruct the next result if contents match up
-		index := int(header.Number.Int64() - int64(q.resultOffset))
-		if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
-			failure = errInvalidChain
-			break
-		}
-		if err := reconstruct(header, i, q.resultCache[index]); err != nil {
+		// Validate the fields
+		if err := validate(i, header); err != nil {
 			failure = err
 			break
 		}
-		hash := header.Hash()
-
-		donePool[hash] = struct{}{}
-		q.resultCache[index].Pending--
-		useful = true
-		accepted++
+		hashes = append(hashes, header.Hash())
+		i++
+	}
 
+	for _, header := range request.Headers[:i] {
+		if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil {
+			reconstruct(accepted, res)
+		} else {
+			// else: between here and above, some other peer filled this result,
+			// or it was indeed a no-op. This should not happen, but if it does
+			// it is not something to panic about.
+			log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err)
+			failure = errStaleDelivery
+		}
 		// Clean up a successful fetch
-		request.Headers[i] = nil
-		delete(taskPool, hash)
+		delete(taskPool, hashes[accepted])
+		accepted++
 	}
 	// Return all failed or missing fetches to the queue
-	for _, header := range request.Headers {
-		if header != nil {
-			taskQueue.Push(header, -int64(header.Number.Uint64()))
-		}
+	for _, header := range request.Headers[accepted:] {
+		taskQueue.Push(header, -int64(header.Number.Uint64()))
 	}
 	// Wake up Results
 	if accepted > 0 {
 		q.active.Signal()
 	}
-	// If none of the data was good, it's a stale delivery
 	if failure == nil {
 		return accepted, nil
 	}
+	// If none of the data was good, it's a stale delivery
 	if errors.Is(failure, errInvalidChain) {
 		return accepted, failure
 	}
-	if useful {
+	if accepted > 0 {
 		return accepted, fmt.Errorf("partial failure: %v", failure)
 	}
 	return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
@@ -882,8 +896,6 @@ func (q *queue) Prepare(offset uint64, mode SyncMode) {
 	defer q.lock.Unlock()
 
 	// Prepare the queue for sync results
-	if q.resultOffset < offset {
-		q.resultOffset = offset
-	}
+	q.resultCache.Prepare(offset)
 	q.mode = mode
 }
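
queue.go now delegates all slot accounting to a resultStore whose implementation file is not part of this excerpt. To make the contract the queue relies on concrete, here is a hypothetical, much-simplified model; the method shapes match their call sites above, but the real resultstore.go also provides GetDeliverySlot, SetThrottleThreshold, countCompleted and memory-aware throttling:

```go
package downloader

import (
	"sync"

	"github.com/ethereum/go-ethereum/core/types"
)

// resultStoreSketch is a deliberately minimal stand-in for the real
// resultStore: a sliding window of results indexed by block number.
type resultStoreSketch struct {
	lock     sync.RWMutex
	items    []*fetchResult // results, indexed by block number minus offset
	offset   uint64         // block number corresponding to items[0]
	throttle uint64         // only the first 'throttle' slots may be reserved
}

func newResultStoreSketch(size int) *resultStoreSketch {
	return &resultStoreSketch{items: make([]*fetchResult, size), throttle: uint64(size)}
}

// Prepare positions the window at the first block the sync will deliver.
func (r *resultStoreSketch) Prepare(offset uint64) {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.offset < offset {
		r.offset = offset
	}
}

// AddFetch returns the result slot for a header, creating it on first use.
// stale: the block was already delivered downstream; throttle: the slot lies
// beyond the prioritized window, so the caller should back off for a while.
func (r *resultStoreSketch) AddFetch(h *types.Header, fastSync bool) (stale, throttle bool, item *fetchResult, err error) {
	r.lock.Lock()
	defer r.lock.Unlock()
	num := h.Number.Uint64()
	if num < r.offset {
		return true, false, nil, nil
	}
	idx := num - r.offset
	if idx >= r.throttle || idx >= uint64(len(r.items)) {
		return false, true, nil, nil
	}
	if r.items[idx] == nil {
		r.items[idx] = newFetchResult(h, fastSync)
	}
	return false, false, r.items[idx], nil
}

// HasCompletedItems reports whether the very first slot is fully delivered,
// i.e. whether Results would have something to hand to the importer.
func (r *resultStoreSketch) HasCompletedItems() bool {
	r.lock.RLock()
	defer r.lock.RUnlock()
	return len(r.items) > 0 && r.items[0] != nil && r.items[0].AllDone()
}

// GetCompleted pops the contiguous run of finished results from the front
// and slides the window forward so new headers can be scheduled behind it.
func (r *resultStoreSketch) GetCompleted(limit int) []*fetchResult {
	r.lock.Lock()
	defer r.lock.Unlock()
	var done []*fetchResult
	for len(done) < limit && len(done) < len(r.items) {
		res := r.items[len(done)]
		if res == nil || !res.AllDone() {
			break
		}
		done = append(done, res)
	}
	copy(r.items, r.items[len(done):])
	for i := len(r.items) - len(done); i < len(r.items); i++ {
		r.items[i] = nil
	}
	r.offset += uint64(len(done))
	return done
}
```
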
diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..07862390f942abf73dd4c43a04a812722eea3917
--- /dev/null
+++ b/eth/downloader/queue_test.go
@@ -0,0 +1,426 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"fmt"
+	"math/big"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/consensus/ethash"
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/params"
+)
+
+var (
+	testdb  = rawdb.NewMemoryDatabase()
+	genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
+)
+
+// makeChain creates a chain of n blocks starting at and including parent.
+// The returned hash chain is ordered head->parent. In addition, every second
+// block contains a transaction to allow testing correct block reassembly.
+func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
+	blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
+		block.SetCoinbase(common.Address{seed})
+		// Add one tx to every second block
+		if !empty && i%2 == 0 {
+			signer := types.MakeSigner(params.TestChainConfig, block.Number())
+			tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
+			if err != nil {
+				panic(err)
+			}
+			block.AddTx(tx)
+		}
+	})
+	return blocks, receipts
+}
+
+type chainData struct {
+	blocks []*types.Block
+	offset int
+}
+
+var chain *chainData
+var emptyChain *chainData
+
+func init() {
+	// Create a chain of blocks to import
+	targetBlocks := 128
+	blocks, _ := makeChain(targetBlocks, 0, genesis, false)
+	chain = &chainData{blocks, 0}
+
+	blocks, _ = makeChain(targetBlocks, 0, genesis, true)
+	emptyChain = &chainData{blocks, 0}
+}
+
+func (chain *chainData) headers() []*types.Header {
+	hdrs := make([]*types.Header, len(chain.blocks))
+	for i, b := range chain.blocks {
+		hdrs[i] = b.Header()
+	}
+	return hdrs
+}
+
+func (chain *chainData) Len() int {
+	return len(chain.blocks)
+}
+
+func dummyPeer(id string) *peerConnection {
+	p := &peerConnection{
+		id:      id,
+		lacking: make(map[common.Hash]struct{}),
+	}
+	return p
+}
+
+func TestBasics(t *testing.T) {
+	q := newQueue(10)
+	if !q.Idle() {
+		t.Errorf("new queue should be idle")
+	}
+	q.Prepare(1, FastSync)
+	if res := q.Results(false); len(res) != 0 {
+		t.Fatal("new queue should have 0 results")
+	}
+
+	// Schedule a batch of headers
+	q.Schedule(chain.headers(), 1)
+	if q.Idle() {
+		t.Errorf("queue should not be idle")
+	}
+	if got, exp := q.PendingBlocks(), chain.Len(); got != exp {
+		t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
+	}
+	// Only blocks with non-empty receipts get added to the receipt task queue
+	if got, exp := q.PendingReceipts(), 64; got != exp {
+		t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
+	}
+	// Items are now queued for downloading; the next step is to tell the
+	// queue that a certain peer will deliver them for us
+	{
+		peer := dummyPeer("peer-1")
+		fetchReq, _, throttle := q.ReserveBodies(peer, 50)
+		if !throttle {
+			// queue size is only 10, so throttling should occur
+			t.Fatal("should throttle")
+		}
+		// But we should still get the first things to fetch
+		if got, exp := len(fetchReq.Headers), 5; got != exp {
+			t.Fatalf("expected %d requests, got %d", exp, got)
+		}
+		if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
+			t.Fatalf("expected header %d, got %d", exp, got)
+		}
+	}
+	{
+		peer := dummyPeer("peer-2")
+		fetchReq, _, throttle := q.ReserveBodies(peer, 50)
+
+		// The second peer should hit throttling
+		if !throttle {
+			t.Fatalf("should not throttle")
+		}
+		// And not get any fetches at all, since it was throttled to begin with
+		if fetchReq != nil {
+			t.Fatalf("should have no fetches, got %d", len(fetchReq.Headers))
+		}
+	}
+	//fmt.Printf("blockTaskQueue len: %d\n", q.blockTaskQueue.Size())
+	//fmt.Printf("receiptTaskQueue len: %d\n", q.receiptTaskQueue.Size())
+	{
+		// The receipt delivering peer should not be affected
+		// by the throttling of body deliveries
+		peer := dummyPeer("peer-3")
+		fetchReq, _, throttle := q.ReserveReceipts(peer, 50)
+		if !throttle {
+			// queue size is only 10, so throttling should occur
+			t.Fatal("should throttle")
+		}
+		// But we should still get the first things to fetch
+		if got, exp := len(fetchReq.Headers), 5; got != exp {
+			t.Fatalf("expected %d requests, got %d", exp, got)
+		}
+		if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp {
+			t.Fatalf("expected header %d, got %d", exp, got)
+		}
+	}
+	//fmt.Printf("blockTaskQueue len: %d\n", q.blockTaskQueue.Size())
+	//fmt.Printf("receiptTaskQueue len: %d\n", q.receiptTaskQueue.Size())
+	//fmt.Printf("processable: %d\n", q.resultCache.countCompleted())
+}
+
+func TestEmptyBlocks(t *testing.T) {
+	q := newQueue(10)
+
+	q.Prepare(1, FastSync)
+	// Schedule a batch of headers
+	q.Schedule(emptyChain.headers(), 1)
+	if q.Idle() {
+		t.Errorf("queue should not be idle")
+	}
+	if got, exp := q.PendingBlocks(), len(emptyChain.blocks); got != exp {
+		t.Errorf("wrong pending block count, got %d, exp %d", got, exp)
+	}
+	if got, exp := q.PendingReceipts(), 0; got != exp {
+		t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp)
+	}
+	// They won't be processable, because the fetchResults haven't been
+	// created yet
+	if got, exp := q.resultCache.countCompleted(), 0; got != exp {
+		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
+	}
+
+	// Items are now queued for downloading; the next step is to tell the
+	// queue that a certain peer will deliver them for us.
+	// That should trigger all of them to suddenly become 'done'.
+	{
+		// Reserve blocks
+		peer := dummyPeer("peer-1")
+		fetchReq, _, _ := q.ReserveBodies(peer, 50)
+
+		// there should be nothing to fetch, blocks are empty
+		if fetchReq != nil {
+			t.Fatal("there should be no body fetch tasks remaining")
+		}
+	}
+	if q.blockTaskQueue.Size() != len(emptyChain.blocks)-10 {
+		t.Errorf("expected block task queue to be 0, got %d", q.blockTaskQueue.Size())
+	}
+	if q.receiptTaskQueue.Size() != 0 {
+		t.Errorf("expected receipt task queue to be 0, got %d", q.receiptTaskQueue.Size())
+	}
+	//fmt.Printf("receiptTaskQueue len: %d\n", q.receiptTaskQueue.Size())
+	{
+		peer := dummyPeer("peer-3")
+		fetchReq, _, _ := q.ReserveReceipts(peer, 50)
+
+		// there should be nothing to fetch, blocks are empty
+		if fetchReq != nil {
+			t.Fatal("there should be no body fetch tasks remaining")
+		}
+	}
+	if got, exp := q.resultCache.countCompleted(), 10; got != exp {
+		t.Errorf("wrong processable count, got %d, exp %d", got, exp)
+	}
+}
+
+// XTestDelivery does some more extensive testing of events that happen,
+// blocks that become known and peers that make reservations and deliveries.
+// Disabled since it's not really a unit test, but it can be executed manually
+// to exercise some more advanced scenarios.
+func XTestDelivery(t *testing.T) {
+	// the outside network, holding blocks
+	blo, rec := makeChain(128, 0, genesis, false)
+	world := newNetwork()
+	world.receipts = rec
+	world.chain = blo
+	world.progress(10)
+	if false {
+		log.Root().SetHandler(log.StdoutHandler)
+	}
+	q := newQueue(10)
+	var wg sync.WaitGroup
+	q.Prepare(1, FastSync)
+	wg.Add(1)
+	go func() {
+		// deliver headers
+		defer wg.Done()
+		c := 1
+		for {
+			//fmt.Printf("getting headers from %d\n", c)
+			hdrs := world.headers(c)
+			l := len(hdrs)
+			//fmt.Printf("scheduling %d headers, first %d last %d\n",
+			//	l, hdrs[0].Number.Uint64(), hdrs[len(hdrs)-1].Number.Uint64())
+			q.Schedule(hdrs, uint64(c))
+			c += l
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		// collect results
+		defer wg.Done()
+		tot := 0
+		for {
+			res := q.Results(true)
+			tot += len(res)
+			fmt.Printf("got %d results, %d tot\n", len(res), tot)
+			// Now we can forget about these
+			world.forget(res[len(res)-1].Header.Number.Uint64())
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// reserve body fetch
+		i := 4
+		for {
+			peer := dummyPeer(fmt.Sprintf("peer-%d", i))
+			f, _, _ := q.ReserveBodies(peer, rand.Intn(30))
+			if f != nil {
+				var emptyList []*types.Header
+				var txs [][]*types.Transaction
+				var uncles [][]*types.Header
+				numToSkip := rand.Intn(len(f.Headers))
+				for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] {
+					txs = append(txs, world.getTransactions(hdr.Number.Uint64()))
+					uncles = append(uncles, emptyList)
+				}
+				time.Sleep(100 * time.Millisecond)
+				_, err := q.DeliverBodies(peer.id, txs, uncles)
+				if err != nil {
+					fmt.Printf("delivered %d bodies %v\n", len(txs), err)
+				}
+			} else {
+				i++
+				time.Sleep(200 * time.Millisecond)
+			}
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		// reserve receipt fetch
+		peer := dummyPeer("peer-3")
+		for {
+			f, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
+			if f != nil {
+				var rcs [][]*types.Receipt
+				for _, hdr := range f.Headers {
+					rcs = append(rcs, world.getReceipts(hdr.Number.Uint64()))
+				}
+				_, err := q.DeliverReceipts(peer.id, rcs)
+				if err != nil {
+					fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
+				}
+				time.Sleep(100 * time.Millisecond)
+			} else {
+				time.Sleep(200 * time.Millisecond)
+			}
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 50; i++ {
+			time.Sleep(300 * time.Millisecond)
+			//world.tick()
+			//fmt.Printf("trying to progress\n")
+			world.progress(rand.Intn(100))
+		}
+		for i := 0; i < 50; i++ {
+			time.Sleep(2990 * time.Millisecond)
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			time.Sleep(990 * time.Millisecond)
+			fmt.Printf("world block tip is %d\n",
+				world.chain[len(world.chain)-1].Header().Number.Uint64())
+			fmt.Println(q.Stats())
+		}
+	}()
+	wg.Wait()
+}
+
+func newNetwork() *network {
+	var l sync.RWMutex
+	return &network{
+		cond:   sync.NewCond(&l),
+		offset: 1, // block 1 is at blocks[0]
+	}
+}
+
+// network represents the outside network, holding the chain of blocks and receipts
+type network struct {
+	offset   int
+	chain    []*types.Block
+	receipts []types.Receipts
+	lock     sync.RWMutex
+	cond     *sync.Cond
+}
+
+func (n *network) getTransactions(blocknum uint64) types.Transactions {
+	index := blocknum - uint64(n.offset)
+	return n.chain[index].Transactions()
+}
+func (n *network) getReceipts(blocknum uint64) types.Receipts {
+	index := blocknum - uint64(n.offset)
+	if got := n.chain[index].Header().Number.Uint64(); got != blocknum {
+		fmt.Printf("Err, got %d exp %d\n", got, blocknum)
+		panic("sd")
+	}
+	return n.receipts[index]
+}
+
+func (n *network) forget(blocknum uint64) {
+	index := blocknum - uint64(n.offset)
+	n.chain = n.chain[index:]
+	n.receipts = n.receipts[index:]
+	n.offset = int(blocknum)
+}
+
+func (n *network) progress(numBlocks int) {
+	n.lock.Lock()
+	defer n.lock.Unlock()
+	//fmt.Printf("progressing...\n")
+	newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
+	n.chain = append(n.chain, newBlocks...)
+	n.receipts = append(n.receipts, newR...)
+	n.cond.Broadcast()
+}
+
+func (n *network) headers(from int) []*types.Header {
+	numHeaders := 128
+	var hdrs []*types.Header
+	index := from - n.offset
+
+	for index >= len(n.chain) {
+		// wait for progress
+		n.cond.L.Lock()
+		//fmt.Printf("header going into wait\n")
+		n.cond.Wait()
+		index = from - n.offset
+		n.cond.L.Unlock()
+	}
+	n.lock.RLock()
+	defer n.lock.RUnlock()
+	for _, b := range n.chain[index:] {
+		hdrs = append(hdrs, b.Header())
+		if len(hdrs) >= numHeaders {
+			break
+		}
+	}
+	return hdrs
+}
diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go
new file mode 100644
index 0000000000000000000000000000000000000000..21928c2a00baf0a8cbc095ce429eecdc21274d3e
--- /dev/null
+++ b/eth/downloader/resultstore.go
@@ -0,0 +1,194 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"github.com/ethereum/go-ethereum/core/types"
+)
+
+// resultStore implements a structure for maintaining fetchResults, tracking their
+// download-progress and delivering (finished) results.
+type resultStore struct {
+	items        []*fetchResult // Downloaded but not yet delivered fetch results
+	resultOffset uint64         // Offset of the first cached fetch result in the block chain
+
+	// Internal index of the first non-completed entry, updated atomically when
+	// needed. If all items are complete, this will equal len(items), so it is
+	// *not* safe to use for indexing without first checking against the length.
+	indexIncomplete int32 // atomic access
+
+	// throttleThreshold is the limit up to which we _want_ to fill the
+	// results. If blocks are large, we want to limit the results to less
+	// than the number of available slots, and maybe only fill 1024 out of
+	// 8192 possible places. The queue will, at certain times, recalibrate
+	// this index.
+	throttleThreshold uint64
+
+	lock sync.RWMutex
+}
+
+func newResultStore(size int) *resultStore {
+	return &resultStore{
+		resultOffset:      0,
+		items:             make([]*fetchResult, size),
+		throttleThreshold: uint64(size),
+	}
+}
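+
+// A rough lifecycle sketch (names are from this file; the arguments shown are
+// hypothetical placeholders):
+//
+//	store := newResultStore(blockCacheItems)
+//	store.Prepare(startBlock)                         // anchor the first slot
+//	_, throttled, item, _ := store.AddFetch(header, fastSync)
+//	// ...download bodies/receipts into item...
+//	done := store.GetCompleted(limit)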
+
+// SetThrottleThreshold updates the throttling threshold based on the requested
+// limit and the total queue capacity. It returns the (possibly capped) threshold
+func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	limit := uint64(len(r.items))
+	if threshold >= limit {
+		threshold = limit
+	}
+	r.throttleThreshold = threshold
+	return r.throttleThreshold
+}
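+
+// For example, reusing the numbers from the throttleThreshold field comment:
+// with 8192 slots but unusually large blocks, the queue might call
+// SetThrottleThreshold(1024) so that only the first 1024 slots are handed out
+// for fetching until completed results start draining.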
+
+// AddFetch adds a header for body/receipt fetching. This is used when the queue
+// wants to reserve headers for fetching.
+//
+// It returns the following:
+//   stale     - if true, this item is already passed, and should not be requested again
+//   throttled - if true, the store is at capacity and this particular header is not a priority right now
+//   item      - the result to store data into
+//   err       - any error that occurred
+func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	var index int
+	item, index, stale, throttled, err = r.getFetchResult(header.Number.Uint64())
+	if err != nil || stale || throttled {
+		return stale, throttled, item, err
+	}
+	if item == nil {
+		item = newFetchResult(header, fastSync)
+		r.items[index] = item
+	}
+	return stale, throttled, item, err
+}
+
+// GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag
+// is true, that means the header has already been delivered 'upstream'. This method
+// does not bubble up the 'throttle' flag, since it's moot at the point in time when
+// the item is downloaded and ready for delivery
+func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	res, _, stale, _, err := r.getFetchResult(headerNumber)
+	return res, stale, err
+}
+
+// getFetchResult returns the fetchResult corresponding to the given item, and
+// the index where the result is stored.
+func (r *resultStore) getFetchResult(headerNumber uint64) (item *fetchResult, index int, stale, throttle bool, err error) {
+	index = int(int64(headerNumber) - int64(r.resultOffset))
+	throttle = index >= int(r.throttleThreshold)
+	stale = index < 0
+
+	if index >= len(r.items) {
+		err = fmt.Errorf("%w: index allocation went beyond available resultStore space "+
+			"(index [%d] = header [%d] - resultOffset [%d], len(resultStore) = %d", errInvalidChain,
+			index, headerNumber, r.resultOffset, len(r.items))
+		return nil, index, stale, throttle, err
+	}
+	if stale {
+		return nil, index, stale, throttle, nil
+	}
+	item = r.items[index]
+	return item, index, stale, throttle, nil
+}
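+
+// Illustration (hypothetical numbers): with resultOffset = 100, 16 item slots
+// and throttleThreshold = 8, header 99 is stale (index -1), header 100 maps to
+// slot 0, header 108 is throttled (index 8 >= 8), and header 116 trips the
+// errInvalidChain overflow check above (index 16 >= len(items)).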
+
+// HasCompletedItems returns true if there are processable items available.
+// This method is cheaper than countCompleted.
+func (r *resultStore) HasCompletedItems() bool {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	if len(r.items) == 0 {
+		return false
+	}
+	if item := r.items[0]; item != nil && item.AllDone() {
+		return true
+	}
+	return false
+}
+
+// countCompleted returns the number of items ready for delivery, stopping at
+// the first non-complete item.
+//
+// The method assumes (at least) the read lock is held.
+func (r *resultStore) countCompleted() int {
+	// We iterate from the already known complete point, and see
+	// if any more has completed since last count
+	index := atomic.LoadInt32(&r.indexIncomplete)
+	for ; ; index++ {
+		if index >= int32(len(r.items)) {
+			break
+		}
+		result := r.items[index]
+		if result == nil || !result.AllDone() {
+			break
+		}
+	}
+	atomic.StoreInt32(&r.indexIncomplete, index)
+	return int(index)
+}
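+
+// Note on the cursor: indexIncomplete only grows while counting and is wound
+// back by GetCompleted when the window shifts, so re-scanning from the stored
+// value never skips an incomplete slot.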
+
+// GetCompleted returns the next batch of completed fetchResults
+func (r *resultStore) GetCompleted(limit int) []*fetchResult {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	completed := r.countCompleted()
+	if limit > completed {
+		limit = completed
+	}
+	results := make([]*fetchResult, limit)
+	copy(results, r.items[:limit])
+
+	// Delete the results from the cache and clear the tail.
+	copy(r.items, r.items[limit:])
+	for i := len(r.items) - limit; i < len(r.items); i++ {
+		r.items[i] = nil
+	}
+	// Advance the expected block number of the first cache entry
+	r.resultOffset += uint64(limit)
+	atomic.AddInt32(&r.indexIncomplete, int32(-limit))
+
+	return results
+}
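+
+// To illustrate the shift above (assuming 6 slots, completed results r0 and r1
+// at the front, and limit = 2):
+//
+//	before: [r0 r1 r2 __ r4 __]   resultOffset = n
+//	after:  [r2 __ r4 __ __ __]   resultOffset = n+2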
+
+// Prepare initialises the offset with the given block number
+func (r *resultStore) Prepare(offset uint64) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	if r.resultOffset < offset {
+		r.resultOffset = offset
+	}
+}
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index b022617bbc3f9e5de9c62c54d88bf4bdc02c96f7..6a16114c26bf6e742d9d588643fc578deb8bc89a 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -34,7 +34,7 @@ import (
 // stateReq represents a batch of state fetch requests grouped together into
 // a single data retrieval network packet.
 type stateReq struct {
-	items    []common.Hash              // Hashes of the state items to download
+	nItems   uint16                     // Number of items requested for download (max is 384, so uint16 is sufficient)
 	tasks    map[common.Hash]*stateTask // Download tasks to track previous attempts
 	timeout  time.Duration              // Maximum round trip time for this to complete
 	timer    *time.Timer                // Timer to fire when the RTT timeout expires
@@ -99,7 +99,6 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
 		finished []*stateReq                  // Completed or failed requests
 		timeout  = make(chan *stateReq)       // Timed out active requests
 	)
-
 	// Run the state sync.
 	log.Trace("State sync starting", "root", s.root)
 	go s.run()
@@ -235,16 +234,16 @@ func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []*
 		if req == nil {
 			continue
 		}
-		req.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(req.items), "reason", reason)
+		req.peer.log.Trace("State peer marked idle (spindown)", "req.items", int(req.nItems), "reason", reason)
 		req.timer.Stop()
 		delete(active, req.peer.id)
-		req.peer.SetNodeDataIdle(len(req.items))
+		req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
 	}
 	// The 'finished' set contains deliveries that we were going to pass to processing.
 	// Those are now moot, but we still need to set those peers as idle, which would
 	// otherwise have been done after processing
 	for _, req := range finished {
-		req.peer.SetNodeDataIdle(len(req.items))
+		req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
 	}
 }
 
@@ -350,9 +349,10 @@ func (s *stateSync) loop() (err error) {
 			return errCanceled
 
 		case req := <-s.deliver:
+			deliveryTime := time.Now()
 			// Response, disconnect or timeout triggered, drop the peer if stalling
 			log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
-			if len(req.items) <= 2 && !req.dropped && req.timedOut() {
+			if req.nItems <= 2 && !req.dropped && req.timedOut() {
 				// 2 items are the minimum requested, if even that times out, we've no use of
 				// this peer at the moment.
 				log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
@@ -376,7 +376,7 @@ func (s *stateSync) loop() (err error) {
 			}
 			// Process all the received blobs and check for stale delivery
 			delivered, err := s.process(req)
-			req.peer.SetNodeDataIdle(delivered)
+			req.peer.SetNodeDataIdle(delivered, deliveryTime)
 			if err != nil {
 				log.Warn("Node data write error", "err", err)
 				return err
@@ -413,14 +413,14 @@ func (s *stateSync) assignTasks() {
 		// Assign a batch of fetches proportional to the estimated latency/bandwidth
 		cap := p.NodeDataCapacity(s.d.requestRTT())
 		req := &stateReq{peer: p, timeout: s.d.requestTTL()}
-		s.fillTasks(cap, req)
+		items := s.fillTasks(cap, req)
 
 		// If the peer was assigned tasks to fetch, send the network request
-		if len(req.items) > 0 {
-			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root)
+		if len(items) > 0 {
+			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(items), "root", s.root)
 			select {
 			case s.d.trackStateReq <- req:
-				req.peer.FetchNodeData(req.items)
+				req.peer.FetchNodeData(items)
 			case <-s.cancel:
 			case <-s.d.cancelCh:
 			}
@@ -430,7 +430,7 @@ func (s *stateSync) assignTasks() {
 
-// fillTasks fills the given request object with a maximum of n state download
-// tasks to send to the remote peer.
+// fillTasks fills the given request object with a maximum of n state download
+// tasks to send to the remote peer, and returns the hashes of the assigned
+// tasks so the caller can fire off the actual network request.
-func (s *stateSync) fillTasks(n int, req *stateReq) {
+func (s *stateSync) fillTasks(n int, req *stateReq) []common.Hash {
 	// Refill available tasks from the scheduler.
 	if len(s.tasks) < n {
 		new := s.sched.Missing(n - len(s.tasks))
@@ -439,11 +439,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
 		}
 	}
 	// Find tasks that haven't been tried with the request's peer.
-	req.items = make([]common.Hash, 0, n)
+	items := make([]common.Hash, 0, n)
 	req.tasks = make(map[common.Hash]*stateTask, n)
 	for hash, t := range s.tasks {
 		// Stop when we've gathered enough requests
-		if len(req.items) == n {
+		if len(items) == n {
 			break
 		}
 		// Skip any requests we've already tried from this peer
@@ -452,10 +452,12 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
 		}
 		// Assign the request to this peer
 		t.attempts[req.peer.id] = struct{}{}
-		req.items = append(req.items, hash)
+		items = append(items, hash)
 		req.tasks[hash] = t
 		delete(s.tasks, hash)
 	}
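+	// Only the item count is retained on the request itself; the hash slice is
+	// handed back to the caller for the network send, so it need not stay
+	// pinned in memory for the lifetime of the in-flight request.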
+	req.nItems = uint16(len(items))
+	return items
 }
 
 // process iterates over a batch of delivered state data, injecting each item
diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go
index 7690a538624e54a07fad5226030e70546b312504..b7aa47e5a1e8acf32562cb872dc1540f397d112e 100644
--- a/eth/fetcher/block_fetcher.go
+++ b/eth/fetcher/block_fetcher.go
@@ -538,40 +538,51 @@ func (f *BlockFetcher) loop() {
 				return
 			}
 			bodyFilterInMeter.Mark(int64(len(task.transactions)))
-
 			blocks := []*types.Block{}
-			for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
-				// Match up a body to any possible completion request
-				matched := false
-
-				for hash, announce := range f.completing {
-					if f.queued[hash] == nil {
-						txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
-						uncleHash := types.CalcUncleHash(task.uncles[i])
-
-						if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
-							// Mark the body matched, reassemble if still unknown
-							matched = true
-
-							if f.getBlock(hash) == nil {
-								block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
-								block.ReceivedAt = task.time
-
-								blocks = append(blocks, block)
-							} else {
-								f.forgetHash(hash)
-							}
+			// abort early if there's nothing explicitly requested
+			if len(f.completing) > 0 {
+				for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
+					// Match up a body to any possible completion request
+					var (
+						matched   = false
+						uncleHash common.Hash // calculated lazily and reused
+						txnHash   common.Hash // calculated lazily and reused
+					)
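+					// The zero hash doubles as the "not yet computed" marker:
+					// DeriveSha and CalcUncleHash never yield common.Hash{} in
+					// practice (even an empty list hashes to the non-zero
+					// EmptyRootHash / EmptyUncleHash), so lazy reuse across
+					// the completion requests is safe.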
+					for hash, announce := range f.completing {
+						if f.queued[hash] != nil || announce.origin != task.peer {
+							continue
+						}
+						if uncleHash == (common.Hash{}) {
+							uncleHash = types.CalcUncleHash(task.uncles[i])
+						}
+						if uncleHash != announce.header.UncleHash {
+							continue
+						}
+						if txnHash == (common.Hash{}) {
+							txnHash = types.DeriveSha(types.Transactions(task.transactions[i]))
+						}
+						if txnHash != announce.header.TxHash {
+							continue
 						}
+						// Mark the body matched, reassemble if still unknown
+						matched = true
+						if f.getBlock(hash) == nil {
+							block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
+							block.ReceivedAt = task.time
+							blocks = append(blocks, block)
+						} else {
+							f.forgetHash(hash)
+						}
+					}
+					if matched {
+						task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
+						task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
+						i--
+						continue
 					}
-				}
-				if matched {
-					task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
-					task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
-					i--
-					continue
 				}
 			}
-
 			bodyFilterOutMeter.Mark(int64(len(task.transactions)))
 			select {
 			case filter <- task: