From b6f5523bdcded47c4f92b4cb5e6e23287bd6b60d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Thu, 29 Oct 2015 18:37:26 +0200
Subject: [PATCH] eth/downloader: fetch data proportionally to peer capacity

---
 eth/downloader/downloader.go | 178 +++++++++++----------------
 eth/downloader/peer.go       | 231 +++++++++++++++++++----------------
 eth/downloader/queue.go      |  95 ++++++++------
 3 files changed, 258 insertions(+), 246 deletions(-)

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 5fa18a2e3..c272d05af 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -45,16 +45,17 @@ var (
 	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
 	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
 
-	hashTTL        = 5 * time.Second    // [eth/61] Time it takes for a hash request to time out
-	blockSoftTTL   = 3 * time.Second    // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
-	blockHardTTL   = 3 * blockSoftTTL   // [eth/61] Maximum time allowance before a block request is considered expired
-	headerTTL      = 5 * time.Second    // [eth/62] Time it takes for a header request to time out
-	bodySoftTTL    = 3 * time.Second    // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
-	bodyHardTTL    = 3 * bodySoftTTL    // [eth/62] Maximum time allowance before a block body request is considered expired
-	receiptSoftTTL = 3 * time.Second    // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
-	receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
-	stateSoftTTL   = 2 * time.Second    // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
-	stateHardTTL   = 3 * stateSoftTTL   // [eth/63] Maximum time allowance before a node data request is considered expired
+	hashTTL        = 3 * time.Second     // [eth/61] Time it takes for a hash request to time out
+	blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
+	blockTTL       = 3 * blockTargetRTT  // [eth/61] Maximum time allowance before a block request is considered expired
+
+	headerTTL        = 3 * time.Second      // [eth/62] Time it takes for a header request to time out
+	bodyTargetRTT    = 3 * time.Second / 2  // [eth/62] Target time for completing a block body retrieval request
+	bodyTTL          = 3 * bodyTargetRTT    // [eth/62] Maximum time allowance before a block body request is considered expired
+	receiptTargetRTT = 3 * time.Second / 2  // [eth/63] Target time for completing a receipt retrieval request
+	receiptTTL       = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
+	stateTargetRTT   = 2 * time.Second / 2  // [eth/63] Target time for completing a state trie retrieval request
+	stateTTL         = 3 * stateTargetRTT   // [eth/63] Maximum time allowance before a node data request is considered expired
 
 	maxQueuedHashes   = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
 	maxQueuedHeaders  = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -486,7 +487,7 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
 	// Request the advertised remote head block and wait for the response
 	go p.getBlocks([]common.Hash{p.head})
 
-	timeout := time.After(blockSoftTTL)
+	timeout := time.After(hashTTL)
 	for {
 		select {
 		case <-d.cancelCh:
@@ -779,47 +780,27 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
-				// Deliver the received chunk of blocks, and demote in case of errors
 				blocks := packet.(*blockPack).blocks
-				err := d.queue.DeliverBlocks(peer.id, blocks)
-				switch err {
-				case nil:
-					// If no blocks were delivered, demote the peer (need the delivery above)
-					if len(blocks) == 0 {
-						peer.Demote()
-						peer.SetBlocksIdle()
-						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
-						break
-					}
-					// All was successful, promote the peer and potentially start processing
-					peer.Promote()
-					peer.SetBlocksIdle()
-					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
 
-				case errInvalidChain:
-					// The hash chain is invalid (blocks are not ordered properly), abort
+				// Deliver the received chunk of blocks and check chain validity
+				accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
+				if err == errInvalidChain {
 					return err
-
-				case errNoFetchesPending:
-					// Peer probably timed out with its delivery but came through
-					// in the end, demote, but allow to to pull from this peer.
-					peer.Demote()
-					peer.SetBlocksIdle()
-					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
-
-				case errStaleDelivery:
-					// Delivered something completely else than requested, usually
-					// caused by a timeout and delivery during a new sync cycle.
-					// Don't set it to idle as the original request should still be
-					// in flight.
-					peer.Demote()
-					glog.V(logger.Detail).Infof("%s: stale delivery", peer)
-
+				}
+				// Unless a peer delivered something completely else than requested (usually
+				// caused by a timed out request which came through in the end), set it to
+				// idle. If the delivery's stale, the peer should have already been idled.
+				if err != errStaleDelivery {
+					peer.SetBlocksIdle(accepted)
+				}
+				// Issue a log to the user to see what's going on
+				switch {
+				case err == nil && len(blocks) == 0:
+					glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+				case err == nil:
+					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
 				default:
-					// Peer did something semi-useful, demote but keep it around
-					peer.Demote()
-					peer.SetBlocksIdle()
-					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
+					glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
 				}
 			}
 			// Blocks arrived, try to update the progress
@@ -852,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
 				return errNoPeers
 			}
 			// Check for block request timeouts and demote the responsible peers
-			for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
+			for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
 				if peer := d.peers.Peer(pid); peer != nil {
-					peer.Demote()
-					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+					if fails > 1 {
+						glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+						peer.SetBlocksIdle(0)
+					} else {
+						glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
+						d.dropPeer(pid)
+					}
 				}
 			}
 			// If there's nothing more to fetch, wait or terminate
@@ -1281,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
 	glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
 
 	var (
-		deliver = func(packet dataPack) error {
+		deliver = func(packet dataPack) (int, error) {
 			pack := packet.(*bodyPack)
 			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
 		}
-		expire   = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
+		expire   = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
 		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
 		capacity = func(p *peer) int { return p.BlockCapacity() }
-		setIdle  = func(p *peer) { p.SetBodiesIdle() }
+		setIdle  = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
 		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -1305,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 	glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
 
 	var (
-		deliver = func(packet dataPack) error {
+		deliver = func(packet dataPack) (int, error) {
 			pack := packet.(*receiptPack)
 			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
 		}
-		expire   = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
+		expire   = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
 		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
 		capacity = func(p *peer) int { return p.ReceiptCapacity() }
-		setIdle  = func(p *peer) { p.SetReceiptsIdle() }
+		setIdle  = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
 		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -1329,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
 	glog.V(logger.Debug).Infof("Downloading node state data")
 
 	var (
-		deliver = func(packet dataPack) error {
+		deliver = func(packet dataPack) (int, error) {
 			start := time.Now()
 			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
 				if err != nil {
@@ -1352,14 +1338,14 @@ func (d *Downloader) fetchNodeData() error {
 				glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
 			})
 		}
-		expire   = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+		expire   = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
 		throttle = func() bool { return false }
 		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
 			return d.queue.ReserveNodeData(p, count), false, nil
 		}
 		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
 		capacity = func(p *peer) int { return p.NodeDataCapacity() }
-		setIdle  = func(p *peer) { p.SetNodeDataIdle() }
+		setIdle  = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
 	)
 	err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
 		d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@@ -1372,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error {
 // fetchParts iteratively downloads scheduled block parts, taking any available
 // peers, reserving a chunk of fetch requests for each, waiting for delivery and
 // also periodically checking for timeouts.
-func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
-	expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
 	fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
-	idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
+	idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
 
 	// Create a ticker to detect expired retrieval tasks
 	ticker := time.NewTicker(100 * time.Millisecond)
@@ -1394,45 +1380,25 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
-				// Deliver the received chunk of data, and demote in case of errors
-				switch err := deliver(packet); err {
-				case nil:
-					// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
-					if packet.Items() == 0 {
-						peer.Demote()
-						setIdle(peer)
-						glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
-						break
-					}
-					// All was successful, promote the peer and potentially start processing
-					peer.Promote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
-
-				case errInvalidChain:
-					// The hash chain is invalid (blocks are not ordered properly), abort
+				// Deliver the received chunk of data and check chain validity
+				accepted, err := deliver(packet)
+				if err == errInvalidChain {
 					return err
-
-				case errNoFetchesPending:
-					// Peer probably timed out with its delivery but came through
-					// in the end, demote, but allow to to pull from this peer.
-					peer.Demote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
-
-				case errStaleDelivery:
-					// Delivered something completely else than requested, usually
-					// caused by a timeout and delivery during a new sync cycle.
-					// Don't set it to idle as the original request should still be
-					// in flight.
-					peer.Demote()
-					glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
-
+				}
+				// Unless a peer delivered something completely else than requested (usually
+				// caused by a timed out request which came through in the end), set it to
+				// idle. If the delivery's stale, the peer should have already been idled.
+				if err != errStaleDelivery {
+					setIdle(peer, accepted)
+				}
+				// Issue a log to the user to see what's going on
+				switch {
+				case err == nil && packet.Items() == 0:
+					glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
+				case err == nil:
+					glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
 				default:
-					// Peer did something semi-useful, demote but keep it around
-					peer.Demote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
+					glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
 				}
 			}
 			// Blocks assembled, try to update the progress
@@ -1465,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
 				return errNoPeers
 			}
 			// Check for fetch request timeouts and demote the responsible peers
-			for _, pid := range expire() {
+			for pid, fails := range expire() {
 				if peer := d.peers.Peer(pid); peer != nil {
-					peer.Demote()
-					setIdle(peer)
-					glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+					if fails > 1 {
+						glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+						setIdle(peer, 0)
+					} else {
+						glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
+						d.dropPeer(pid)
+					}
 				}
 			}
 			// If there's nothing more to fetch, wait or terminate
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 9ba6dabbd..80f08b68f 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -30,8 +30,10 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 )
 
-// Maximum number of entries allowed on the list or lacking items.
-const maxLackingHashes = 4096
+const (
+	maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
+	throughputImpact = 0.1  // The impact a single measurement has on a peer's final throughput value.
+)
 
 // Hash and block fetchers belonging to eth/61 and below
 type relativeHashFetcherFn func(common.Hash) error
@@ -59,18 +61,16 @@ type peer struct {
 	blockIdle   int32 // Current block activity state of the peer (idle = 0, active = 1)
 	receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
 	stateIdle   int32 // Current node data activity state of the peer (idle = 0, active = 1)
-	rep         int32 // Simple peer reputation
 
-	blockCapacity   int32 // Number of blocks (bodies) allowed to fetch per request
-	receiptCapacity int32 // Number of receipts allowed to fetch per request
-	stateCapacity   int32 // Number of node data pieces allowed to fetch per request
+	blockThroughput   float64 // Number of blocks (bodies) measured to be retrievable per second
+	receiptThroughput float64 // Number of receipts measured to be retrievable per second
+	stateThroughput   float64 // Number of node data pieces measured to be retrievable per second
 
 	blockStarted   time.Time // Time instance when the last block (body)fetch was started
 	receiptStarted time.Time // Time instance when the last receipt fetch was started
 	stateStarted   time.Time // Time instance when the last node data fetch was started
 
-	lacking     map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
-	lackingLock sync.RWMutex             // Lock protecting the lacking hashes list
+	lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
 
 	getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
 	getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@@ -84,6 +84,7 @@ type peer struct {
 	getNodeData stateFetcherFn   // [eth/63] Method to retrieve a batch of state trie data
 
 	version int // Eth protocol version number to switch strategies
+	lock    sync.RWMutex
 }
 
 // newPeer create a new downloader peer, with specific hash and block retrieval
@@ -93,12 +94,9 @@ func newPeer(id string, version int, head common.Hash,
 	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
 	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
 	return &peer{
-		id:              id,
-		head:            head,
-		blockCapacity:   1,
-		receiptCapacity: 1,
-		stateCapacity:   1,
-		lacking:         make(map[common.Hash]struct{}),
+		id:      id,
+		head:    head,
+		lacking: make(map[common.Hash]struct{}),
 
 		getRelHashes: getRelHashes,
 		getAbsHashes: getAbsHashes,
@@ -117,15 +115,18 @@ func newPeer(id string, version int, head common.Hash,
 
 // Reset clears the internal state of a peer entity.
 func (p *peer) Reset() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
 	atomic.StoreInt32(&p.blockIdle, 0)
 	atomic.StoreInt32(&p.receiptIdle, 0)
-	atomic.StoreInt32(&p.blockCapacity, 1)
-	atomic.StoreInt32(&p.receiptCapacity, 1)
-	atomic.StoreInt32(&p.stateCapacity, 1)
+	atomic.StoreInt32(&p.stateIdle, 0)
+
+	p.blockThroughput = 0
+	p.receiptThroughput = 0
+	p.stateThroughput = 0
 
-	p.lackingLock.Lock()
 	p.lacking = make(map[common.Hash]struct{})
-	p.lackingLock.Unlock()
 }
 
 // Fetch61 sends a block retrieval request to the remote peer.
@@ -216,107 +217,86 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
 	return nil
 }
 
-// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBlocksIdle() {
-	p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
+// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
+// requests. Its estimated block retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBlocksIdle(delivered int) {
+	p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
 }
 
-// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block body retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBodiesIdle() {
-	p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle)
+// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
+// requests. Its estimated body retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBodiesIdle(delivered int) {
+	p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
 }
 
-// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its receipt retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetReceiptsIdle() {
-	p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
+// retrieval requests. Its estimated receipt retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetReceiptsIdle(delivered int) {
+	p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
 }
 
-// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
-// requests. Its node data retrieval allowance will also be updated either up- or
-// downwards, depending on whether the previous fetch completed in time.
-func (p *peer) SetNodeDataIdle() {
-	p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
+// data retrieval requests. Its estimated state retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetNodeDataIdle(delivered int) {
+	p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
 }
 
 // setIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its data retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
-	// Update the peer's download allowance based on previous performance
-	scale := 2.0
-	if time.Since(started) > softTTL {
-		scale = 0.5
-		if time.Since(started) > hardTTL {
-			scale = 1 / float64(maxFetch) // reduces capacity to 1
-		}
-	}
-	for {
-		// Calculate the new download bandwidth allowance
-		prev := atomic.LoadInt32(capacity)
-		next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
-
-		// Try to update the old value
-		if atomic.CompareAndSwapInt32(capacity, prev, next) {
-			// If we're having problems at 1 capacity, try to find better peers
-			if next == 1 {
-				p.Demote()
-			}
-			break
-		}
+// Its estimated retrieval throughput is updated with that measured just now.
+func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+	// Irrelevant of the scaling, make sure the peer ends up idle
+	defer atomic.StoreInt32(idle, 0)
+
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	// If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
+	if delivered == 0 {
+		*throughput = 0
+		return
 	}
-	// Set the peer to idle to allow further fetch requests
-	atomic.StoreInt32(idle, 0)
+	// Otherwise update the throughput with a new measurement
+	measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor
+	*throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
 }
 
 // BlockCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
+// previously discovered throughput.
 func (p *peer) BlockCapacity() int {
-	return int(atomic.LoadInt32(&p.blockCapacity))
-}
+	p.lock.RLock()
+	defer p.lock.RUnlock()
 
-// ReceiptCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
-func (p *peer) ReceiptCapacity() int {
-	return int(atomic.LoadInt32(&p.receiptCapacity))
+	return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch))))
 }
 
-// NodeDataCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
-func (p *peer) NodeDataCapacity() int {
-	return int(atomic.LoadInt32(&p.stateCapacity))
-}
+// ReceiptCapacity retrieves the peers receipt download allowance based on its
+// previously discovered throughput.
+func (p *peer) ReceiptCapacity() int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
 
-// Promote increases the peer's reputation.
-func (p *peer) Promote() {
-	atomic.AddInt32(&p.rep, 1)
+	return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch))))
 }
 
-// Demote decreases the peer's reputation or leaves it at 0.
-func (p *peer) Demote() {
-	for {
-		// Calculate the new reputation value
-		prev := atomic.LoadInt32(&p.rep)
-		next := prev / 2
+// NodeDataCapacity retrieves the peers state download allowance based on its
+// previously discovered throughput.
+func (p *peer) NodeDataCapacity() int {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
 
-		// Try to update the old value
-		if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
-			return
-		}
-	}
+	return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch))))
 }
 
 // MarkLacking appends a new entity to the set of items (blocks, receipts, states)
 // that a peer is known not to have (i.e. have been requested before). If the
 // set reaches its maximum allowed capacity, items are randomly dropped off.
 func (p *peer) MarkLacking(hash common.Hash) {
-	p.lackingLock.Lock()
-	defer p.lackingLock.Unlock()
+	p.lock.Lock()
+	defer p.lock.Unlock()
 
 	for len(p.lacking) >= maxLackingHashes {
 		for drop, _ := range p.lacking {
@@ -330,8 +310,8 @@ func (p *peer) MarkLacking(hash common.Hash) {
 // Lacks retrieves whether the hash of a blockchain item is on the peers lacking
 // list (i.e. whether we know that the peer does not have it).
 func (p *peer) Lacks(hash common.Hash) bool {
-	p.lackingLock.RLock()
-	defer p.lackingLock.RUnlock()
+	p.lock.RLock()
+	defer p.lock.RUnlock()
 
 	_, ok := p.lacking[hash]
 	return ok
@@ -339,13 +319,13 @@ func (p *peer) Lacks(hash common.Hash) bool {
 
 // String implements fmt.Stringer.
 func (p *peer) String() string {
-	p.lackingLock.RLock()
-	defer p.lackingLock.RUnlock()
+	p.lock.RLock()
+	defer p.lock.RUnlock()
 
 	return fmt.Sprintf("Peer %s [%s]", p.id,
-		fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
-			fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
-			fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
+		fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
+			fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
+			fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
 			fmt.Sprintf("lacking %4d", len(p.lacking)),
 	)
 }
@@ -377,6 +357,10 @@ func (ps *peerSet) Reset() {
 
 // Register injects a new peer into the working set, or returns an error if the
 // peer is already known.
+//
+// The method also sets the starting throughput values of the new peer to the
+// average of all existing peers, to give it a realistic change of being used
+// for data retrievals.
 func (ps *peerSet) Register(p *peer) error {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
@@ -384,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error {
 	if _, ok := ps.peers[p.id]; ok {
 		return errAlreadyRegistered
 	}
+	if len(ps.peers) > 0 {
+		p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
+
+		for _, peer := range ps.peers {
+			peer.lock.RLock()
+			p.blockThroughput += peer.blockThroughput
+			p.receiptThroughput += peer.receiptThroughput
+			p.stateThroughput += peer.stateThroughput
+			peer.lock.RUnlock()
+		}
+		p.blockThroughput /= float64(len(ps.peers))
+		p.receiptThroughput /= float64(len(ps.peers))
+		p.stateThroughput /= float64(len(ps.peers))
+	}
 	ps.peers[p.id] = p
 	return nil
 }
@@ -435,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
 	idle := func(p *peer) bool {
 		return atomic.LoadInt32(&p.blockIdle) == 0
 	}
-	return ps.idlePeers(61, 61, idle)
+	throughput := func(p *peer) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.blockThroughput
+	}
+	return ps.idlePeers(61, 61, idle, throughput)
 }
 
 // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@@ -444,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
 	idle := func(p *peer) bool {
 		return atomic.LoadInt32(&p.blockIdle) == 0
 	}
-	return ps.idlePeers(62, 64, idle)
+	throughput := func(p *peer) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.blockThroughput
+	}
+	return ps.idlePeers(62, 64, idle, throughput)
 }
 
 // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@@ -453,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
 	idle := func(p *peer) bool {
 		return atomic.LoadInt32(&p.receiptIdle) == 0
 	}
-	return ps.idlePeers(63, 64, idle)
+	throughput := func(p *peer) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.receiptThroughput
+	}
+	return ps.idlePeers(63, 64, idle, throughput)
 }
 
 // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@@ -462,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
 	idle := func(p *peer) bool {
 		return atomic.LoadInt32(&p.stateIdle) == 0
 	}
-	return ps.idlePeers(63, 64, idle)
+	throughput := func(p *peer) float64 {
+		p.lock.RLock()
+		defer p.lock.RUnlock()
+		return p.stateThroughput
+	}
+	return ps.idlePeers(63, 64, idle, throughput)
 }
 
 // idlePeers retrieves a flat list of all currently idle peers satisfying the
 // protocol version constraints, using the provided function to check idleness.
-func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
+// The resulting set of peers are sorted by their measure throughput.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
@@ -482,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
 	}
 	for i := 0; i < len(idle); i++ {
 		for j := i + 1; j < len(idle); j++ {
-			if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+			if throughput(idle[i]) < throughput(idle[j]) {
 				idle[i], idle[j] = idle[j], idle[i]
 			}
 		}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 584797d7b..1e55560db 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -703,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
 
 // ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
 // canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -712,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
 
 // ExpireBodies checks for in flight block body requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) []string {
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -721,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
 
 // ExpireReceipts checks for in flight receipt requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -730,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
 
 // ExpireNodeData checks for in flight node data requests that exceeded a timeout
 // allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -743,9 +743,9 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
 // Note, this method expects the queue lock to be already held. The
 // reason the lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
 	// Iterate over the expired requests and return each to the queue
-	peers := []string{}
+	expiries := make(map[string]int)
 	for id, request := range pendPool {
 		if time.Since(request.Time) > timeout {
 			// Update the metrics with the timeout
@@ -758,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
 			for _, header := range request.Headers {
 				taskQueue.Push(header, -float32(header.Number.Uint64()))
 			}
-			peers = append(peers, id)
+			// Add the peer to the expiry report along the the number of failed requests
+			expirations := len(request.Hashes)
+			if expirations < len(request.Headers) {
+				expirations = len(request.Headers)
+			}
+			expiries[id] = expirations
 		}
 	}
 	// Remove the expired requests from the pending pool
-	for _, id := range peers {
+	for id, _ := range expiries {
 		delete(pendPool, id)
 	}
-	return peers
+	return expiries
 }
 
-// DeliverBlocks injects a block retrieval response into the download queue.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a block retrieval response into the download queue. The
+// method returns the number of blocks accepted from the delivery and also wakes
+// any threads waiting for data delivery.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Short circuit if the blocks were never requested
 	request := q.blockPendPool[id]
 	if request == nil {
-		return errNoFetchesPending
+		return 0, errNoFetchesPending
 	}
 	blockReqTimer.UpdateSince(request.Time)
 	delete(q.blockPendPool, id)
@@ -788,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
 		}
 	}
 	// Iterate over the downloaded blocks and add each of them
-	errs := make([]error, 0)
+	accepted, errs := 0, make([]error, 0)
 	for _, block := range blocks {
 		// Skip any blocks that were not requested
 		hash := block.Hash()
@@ -811,28 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
 
 		delete(request.Hashes, hash)
 		delete(q.hashPool, hash)
+		accepted++
 	}
 	// Return all failed or missing fetches to the queue
 	for hash, index := range request.Hashes {
 		q.hashQueue.Push(hash, float32(index))
 	}
 	// Wake up WaitResults
-	q.active.Signal()
+	if accepted > 0 {
+		q.active.Signal()
+	}
 	// If none of the blocks were good, it's a stale delivery
 	switch {
 	case len(errs) == 0:
-		return nil
+		return accepted, nil
 	case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
-		return errs[0]
+		return accepted, errs[0]
 	case len(errs) == len(blocks):
-		return errStaleDelivery
+		return accepted, errStaleDelivery
 	default:
-		return fmt.Errorf("multiple failures: %v", errs)
+		return accepted, fmt.Errorf("multiple failures: %v", errs)
 	}
 }
 
 // DeliverBodies injects a block body retrieval response into the results queue.
-func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -848,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
 }
 
 // DeliverReceipts injects a receipt retrieval response into the results queue.
-func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
@@ -867,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
 // Note, this method expects the queue lock to be already held for writing. The
 // reason the lock is not obtained in here is because the parameters already need
 // to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
-	donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
 	// Short circuit if the data was never requested
 	request := pendPool[id]
 	if request == nil {
-		return errNoFetchesPending
+		return 0, errNoFetchesPending
 	}
 	reqTimer.UpdateSince(request.Time)
 	delete(pendPool, id)
@@ -885,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 	}
 	// Assemble each of the results with their headers and retrieved data parts
 	var (
-		failure error
-		useful  bool
+		accepted int
+		failure  error
+		useful   bool
 	)
 	for i, header := range request.Headers {
 		// Short circuit assembly if no more fetch results are found
@@ -906,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 		donePool[header.Hash()] = struct{}{}
 		q.resultCache[index].Pending--
 		useful = true
+		accepted++
 
 		// Clean up a successful fetch
 		request.Headers[i] = nil
@@ -918,27 +936,31 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
 		}
 	}
 	// Wake up WaitResults
-	q.active.Signal()
+	if accepted > 0 {
+		q.active.Signal()
+	}
 	// If none of the data was good, it's a stale delivery
 	switch {
 	case failure == nil || failure == errInvalidChain:
-		return failure
+		return accepted, failure
 	case useful:
-		return fmt.Errorf("partial failure: %v", failure)
+		return accepted, fmt.Errorf("partial failure: %v", failure)
 	default:
-		return errStaleDelivery
+		return accepted, errStaleDelivery
 	}
 }
 
 // DeliverNodeData injects a node state data retrieval response into the queue.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
+// The method returns the number of node state entries originally requested, and
+// the number of them actually accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Short circuit if the data was never requested
 	request := q.statePendPool[id]
 	if request == nil {
-		return errNoFetchesPending
+		return 0, errNoFetchesPending
 	}
 	stateReqTimer.UpdateSince(request.Time)
 	delete(q.statePendPool, id)
@@ -950,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 		}
 	}
 	// Iterate over the downloaded data and verify each of them
-	errs := make([]error, 0)
+	accepted, errs := 0, make([]error, 0)
 	process := []trie.SyncResult{}
 	for _, blob := range data {
-		// Skip any blocks that were not requested
+		// Skip any state trie entires that were not requested
 		hash := common.BytesToHash(crypto.Sha3(blob))
 		if _, ok := request.Hashes[hash]; !ok {
 			errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@@ -961,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 		}
 		// Inject the next state trie item into the processing queue
 		process = append(process, trie.SyncResult{hash, blob})
+		accepted++
 
 		delete(request.Hashes, hash)
 		delete(q.stateTaskPool, hash)
@@ -978,11 +1001,11 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
 	// If none of the data items were good, it's a stale delivery
 	switch {
 	case len(errs) == 0:
-		return nil
+		return accepted, nil
 	case len(errs) == len(request.Hashes):
-		return errStaleDelivery
+		return accepted, errStaleDelivery
 	default:
-		return fmt.Errorf("multiple failures: %v", errs)
+		return accepted, fmt.Errorf("multiple failures: %v", errs)
 	}
 }
 
-- 
GitLab