From 525116dbff916825463931361f75e75e955c12e2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= <zsfelfoldi@gmail.com>
Date: Wed, 22 Mar 2017 20:44:22 +0100
Subject: [PATCH] les: implement request distributor, fix blocking issues
 (#3660)

* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
---
 core/blockchain.go         |   7 +-
 core/headerchain.go        |  13 +-
 les/backend.go             |   2 +
 les/distributor.go         | 259 +++++++++++++++++++++++++++++++++++++
 les/distributor_test.go    | 192 +++++++++++++++++++++++++++
 les/execqueue.go           |  71 ++++++++++
 les/fetcher.go             | 188 ++++++++++++++++-----------
 les/flowcontrol/control.go | 104 ++++-----------
 les/handler.go             |  66 ++++++++--
 les/helper_test.go         |   8 +-
 les/odr.go                 |  99 +++++++-------
 les/odr_requests.go        |  12 +-
 les/odr_test.go            |   3 +
 les/peer.go                |  35 +++--
 les/request_test.go        |   3 +
 les/serverpool.go          |  76 -----------
 les/txrelay.go             |  27 +++-
 light/lightchain.go        |  13 +-
 light/txpool.go            |  21 +--
 19 files changed, 877 insertions(+), 322 deletions(-)
 create mode 100644 les/distributor.go
 create mode 100644 les/distributor_test.go
 create mode 100644 les/execqueue.go

diff --git a/core/blockchain.go b/core/blockchain.go
index 207c21a65..a57832df0 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1313,6 +1313,11 @@ Error: %v
 // of the header retrieval mechanisms already need to verify nonces, as well as
 // because nonces can be verified sparsely, not needing to check each.
 func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
+	start := time.Now()
+	if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
+		return i, err
+	}
+
 	// Make sure only one thread manipulates the chain at once
 	self.chainmu.Lock()
 	defer self.chainmu.Unlock()
@@ -1328,7 +1333,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
 		return err
 	}
 
-	return self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
+	return self.hc.InsertHeaderChain(chain, whFunc, start)
 }
 
 // writeHeader writes a header into the local chain, given that its parent is
diff --git a/core/headerchain.go b/core/headerchain.go
index a3d622087..57da9771b 100644
--- a/core/headerchain.go
+++ b/core/headerchain.go
@@ -219,7 +219,8 @@ type WhCallback func(*types.Header) error
 // should be done or not. The reason behind the optional check is because some
 // of the header retrieval mechanisms already need to verfy nonces, as well as
 // because nonces can be verified sparsely, not needing to check each.
-func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, writeHeader WhCallback) (int, error) {
+
+func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
 	// Do a sanity check that the provided chain is actually ordered and linked
 	for i := 1; i < len(chain); i++ {
 		if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() {
@@ -231,9 +232,6 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
 				chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
 		}
 	}
-	// Collect some import statistics to report on
-	stats := struct{ processed, ignored int }{}
-	start := time.Now()
 
 	// Generate the list of headers that should be POW verified
 	verify := make([]bool, len(chain))
@@ -309,6 +307,13 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
 			}
 		}
 	}
+
+	return 0, nil
+}
+
+func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
+	// Collect some import statistics to report on
+	stats := struct{ processed, ignored int }{}
 	// All headers passed verification, import them into the database
 	for i, header := range chain {
 		// Short circuit insertion if shutting down
diff --git a/les/backend.go b/les/backend.go
index 404728c0e..3cab75f33 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -107,6 +107,8 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
 	if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.LightMode, config.NetworkId, eth.eventMux, eth.pow, eth.blockchain, nil, chainDb, odr, relay); err != nil {
 		return nil, err
 	}
+	relay.ps = eth.protocolManager.peers
+	relay.reqDist = eth.protocolManager.reqDist
 
 	eth.ApiBackend = &LesApiBackend{eth, nil}
 	eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend)
diff --git a/les/distributor.go b/les/distributor.go
new file mode 100644
index 000000000..c59b36146
--- /dev/null
+++ b/les/distributor.go
@@ -0,0 +1,259 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+	"container/list"
+	"errors"
+	"sync"
+	"time"
+)
+
+// ErrNoPeers is returned if no peers capable of serving a queued request are available
+var ErrNoPeers = errors.New("no suitable peers available")
+
+// requestDistributor implements a mechanism that distributes requests to
+// suitable peers, obeying flow control rules and prioritizing them in creation
+// order (even when a resend is necessary).
+type requestDistributor struct {
+	reqQueue         *list.List
+	lastReqOrder     uint64
+	stopChn, loopChn chan struct{}
+	loopNextSent     bool
+	lock             sync.Mutex
+
+	getAllPeers func() map[distPeer]struct{}
+}
+
+// distPeer is an LES server peer interface for the request distributor.
+// waitBefore returns either the necessary waiting time before sending a request
+// with the given upper estimated cost or the estimated remaining relative buffer
+// value after sending such a request (in which case the request can be sent
+// immediately). At least one of these values is always zero.
+type distPeer interface {
+	waitBefore(uint64) (time.Duration, float64)
+	canQueue() bool
+	queueSend(f func())
+}
+
+// distReq is the request abstraction used by the distributor. It is based on
+// three callback functions:
+// - getCost returns the upper estimate of the cost of sending the request to a given peer
+// - canSend tells if the server peer is suitable to serve the request
+// - request prepares sending the request to the given peer and returns a function that
+// does the actual sending. Request order should be preserved but the callback itself should not
+// block until it is sent because other peers might still be able to receive requests while
+// one of them is blocking. Instead, the returned function is put in the peer's send queue.
+type distReq struct {
+	getCost func(distPeer) uint64
+	canSend func(distPeer) bool
+	request func(distPeer) func()
+
+	reqOrder uint64
+	sentChn  chan distPeer
+	element  *list.Element
+}
+
+// newRequestDistributor creates a new request distributor
+func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
+	r := &requestDistributor{
+		reqQueue:    list.New(),
+		loopChn:     make(chan struct{}, 2),
+		stopChn:     stopChn,
+		getAllPeers: getAllPeers,
+	}
+	go r.loop()
+	return r
+}
+
+// distMaxWait is the maximum waiting time after which further necessary waiting
+// times are recalculated based on new feedback from the servers
+const distMaxWait = time.Millisecond * 10
+
+// main event loop
+func (d *requestDistributor) loop() {
+	for {
+		select {
+		case <-d.stopChn:
+			d.lock.Lock()
+			elem := d.reqQueue.Front()
+			for elem != nil {
+				close(elem.Value.(*distReq).sentChn)
+				elem = elem.Next()
+			}
+			d.lock.Unlock()
+			return
+		case <-d.loopChn:
+			d.lock.Lock()
+			d.loopNextSent = false
+		loop:
+			for {
+				peer, req, wait := d.nextRequest()
+				if req != nil && wait == 0 {
+					chn := req.sentChn // save sentChn because remove sets it to nil
+					d.remove(req)
+					send := req.request(peer)
+					if send != nil {
+						peer.queueSend(send)
+					}
+					chn <- peer
+					close(chn)
+				} else {
+					if wait == 0 {
+						// no request to send and nothing to wait for; the next
+						// queued request will wake up the loop
+						break loop
+					}
+					d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
+					if wait > distMaxWait {
+						// waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
+						wait = distMaxWait
+					}
+					go func() {
+						time.Sleep(wait)
+						d.loopChn <- struct{}{}
+					}()
+					break loop
+				}
+			}
+			d.lock.Unlock()
+		}
+	}
+}
+
+// selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
+type selectPeerItem struct {
+	peer   distPeer
+	req    *distReq
+	weight int64
+}
+
+// Weight implements wrsItem interface
+func (sp selectPeerItem) Weight() int64 {
+	return sp.weight
+}
+
+// nextRequest returns the next possible request from any peer, along with the
+// associated peer and necessary waiting time
+func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
+	peers := d.getAllPeers()
+
+	elem := d.reqQueue.Front()
+	var (
+		bestPeer distPeer
+		bestReq  *distReq
+		bestWait time.Duration
+		sel      *weightedRandomSelect
+	)
+
+	for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
+		req := elem.Value.(*distReq)
+		canSend := false
+		for peer, _ := range peers {
+			if peer.canQueue() && req.canSend(peer) {
+				canSend = true
+				cost := req.getCost(peer)
+				wait, bufRemain := peer.waitBefore(cost)
+				if wait == 0 {
+					if sel == nil {
+						sel = newWeightedRandomSelect()
+					}
+					sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
+				} else {
+					if bestReq == nil || wait < bestWait {
+						bestPeer = peer
+						bestReq = req
+						bestWait = wait
+					}
+				}
+				delete(peers, peer)
+			}
+		}
+		next := elem.Next()
+		if !canSend && elem == d.reqQueue.Front() {
+			close(req.sentChn)
+			d.remove(req)
+		}
+		elem = next
+	}
+
+	if sel != nil {
+		c := sel.choose().(selectPeerItem)
+		return c.peer, c.req, 0
+	}
+	return bestPeer, bestReq, bestWait
+}
+
+// queue adds a request to the distribution queue, returns a channel where the
+// receiving peer is sent once the request has been sent (request callback returned).
+// If the request is cancelled or timed out without suitable peers, the channel is
+// closed without sending any peer references to it.
+func (d *requestDistributor) queue(r *distReq) chan distPeer {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	if r.reqOrder == 0 {
+		d.lastReqOrder++
+		r.reqOrder = d.lastReqOrder
+	}
+
+	back := d.reqQueue.Back()
+	if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
+		r.element = d.reqQueue.PushBack(r)
+	} else {
+		before := d.reqQueue.Front()
+		for before.Value.(*distReq).reqOrder < r.reqOrder {
+			before = before.Next()
+		}
+		r.element = d.reqQueue.InsertBefore(r, before)
+	}
+
+	if !d.loopNextSent {
+		d.loopNextSent = true
+		d.loopChn <- struct{}{}
+	}
+
+	r.sentChn = make(chan distPeer, 1)
+	return r.sentChn
+}
+
+// cancel removes a request from the queue if it has not been sent yet (returns
+// false if it has been sent already). It is guaranteed that the callback functions
+// will not be called after cancel returns.
+func (d *requestDistributor) cancel(r *distReq) bool {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+
+	if r.sentChn == nil {
+		return false
+	}
+
+	close(r.sentChn)
+	d.remove(r)
+	return true
+}
+
+// remove removes a request from the queue
+func (d *requestDistributor) remove(r *distReq) {
+	r.sentChn = nil
+	if r.element != nil {
+		d.reqQueue.Remove(r.element)
+		r.element = nil
+	}
+}
diff --git a/les/distributor_test.go b/les/distributor_test.go
new file mode 100644
index 000000000..f2eb80729
--- /dev/null
+++ b/les/distributor_test.go
@@ -0,0 +1,192 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package light implements on-demand retrieval capable state and chain objects
+// for the Ethereum Light Client.
+package les
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+)
+
+type testDistReq struct {
+	cost, procTime, order uint64
+	canSendTo             map[*testDistPeer]struct{}
+}
+
+func (r *testDistReq) getCost(dp distPeer) uint64 {
+	return r.cost
+}
+
+func (r *testDistReq) canSend(dp distPeer) bool {
+	_, ok := r.canSendTo[dp.(*testDistPeer)]
+	return ok
+}
+
+func (r *testDistReq) request(dp distPeer) func() {
+	return func() { dp.(*testDistPeer).send(r) }
+}
+
+type testDistPeer struct {
+	sent    []*testDistReq
+	sumCost uint64
+	lock    sync.RWMutex
+}
+
+func (p *testDistPeer) send(r *testDistReq) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.sent = append(p.sent, r)
+	p.sumCost += r.cost
+}
+
+func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) {
+	var last uint64
+	for {
+		wait := time.Millisecond
+		p.lock.Lock()
+		if len(p.sent) > 0 {
+			rq := p.sent[0]
+			wait = time.Duration(rq.procTime)
+			p.sumCost -= rq.cost
+			if checkOrder {
+				if rq.order <= last {
+					t.Errorf("Requests processed in wrong order")
+				}
+				last = rq.order
+			}
+			p.sent = p.sent[1:]
+		}
+		p.lock.Unlock()
+		select {
+		case <-stop:
+			return
+		case <-time.After(wait):
+		}
+	}
+}
+
+const (
+	testDistBufLimit       = 10000000
+	testDistMaxCost        = 1000000
+	testDistPeerCount      = 5
+	testDistReqCount       = 50000
+	testDistMaxResendCount = 3
+)
+
+func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) {
+	p.lock.RLock()
+	sumCost := p.sumCost + cost
+	p.lock.RUnlock()
+	if sumCost < testDistBufLimit {
+		return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit)
+	} else {
+		return time.Duration(sumCost - testDistBufLimit), 0
+	}
+}
+
+func (p *testDistPeer) canQueue() bool {
+	return true
+}
+
+func (p *testDistPeer) queueSend(f func()) {
+	f()
+}
+
+func TestRequestDistributor(t *testing.T) {
+	testRequestDistributor(t, false)
+}
+
+func TestRequestDistributorResend(t *testing.T) {
+	testRequestDistributor(t, true)
+}
+
+func testRequestDistributor(t *testing.T, resend bool) {
+	stop := make(chan struct{})
+	defer close(stop)
+
+	var peers [testDistPeerCount]*testDistPeer
+	for i, _ := range peers {
+		peers[i] = &testDistPeer{}
+		go peers[i].worker(t, !resend, stop)
+	}
+
+	dist := newRequestDistributor(func() map[distPeer]struct{} {
+		m := make(map[distPeer]struct{})
+		for _, peer := range peers {
+			m[peer] = struct{}{}
+		}
+		return m
+	}, stop)
+
+	var wg sync.WaitGroup
+
+	for i := 1; i <= testDistReqCount; i++ {
+		cost := uint64(rand.Int63n(testDistMaxCost))
+		procTime := uint64(rand.Int63n(int64(cost + 1)))
+		rq := &testDistReq{
+			cost:      cost,
+			procTime:  procTime,
+			order:     uint64(i),
+			canSendTo: make(map[*testDistPeer]struct{}),
+		}
+		for _, peer := range peers {
+			if rand.Intn(2) != 0 {
+				rq.canSendTo[peer] = struct{}{}
+			}
+		}
+
+		wg.Add(1)
+		req := &distReq{
+			getCost: rq.getCost,
+			canSend: rq.canSend,
+			request: rq.request,
+		}
+		chn := dist.queue(req)
+		go func() {
+			cnt := 1
+			if resend && len(rq.canSendTo) != 0 {
+				cnt = rand.Intn(testDistMaxResendCount) + 1
+			}
+			for i := 0; i < cnt; i++ {
+				if i != 0 {
+					chn = dist.queue(req)
+				}
+				p := <-chn
+				if p == nil {
+					if len(rq.canSendTo) != 0 {
+						t.Errorf("Request that could have been sent was dropped")
+					}
+				} else {
+					peer := p.(*testDistPeer)
+					if _, ok := rq.canSendTo[peer]; !ok {
+						t.Errorf("Request sent to wrong peer")
+					}
+				}
+			}
+			wg.Done()
+		}()
+		if rand.Intn(1000) == 0 {
+			time.Sleep(time.Duration(rand.Intn(5000000)))
+		}
+	}
+
+	wg.Wait()
+}
diff --git a/les/execqueue.go b/les/execqueue.go
new file mode 100644
index 000000000..ac779003b
--- /dev/null
+++ b/les/execqueue.go
@@ -0,0 +1,71 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package les
+
+import (
+	"sync/atomic"
+)
+
+// ExecQueue implements a queue that executes function calls in a single thread,
+// in the same order as they have been queued.
+type execQueue struct {
+	chn                 chan func()
+	cnt, stop, capacity int32
+}
+
+// NewExecQueue creates a new execution queue.
+func newExecQueue(capacity int32) *execQueue {
+	q := &execQueue{
+		chn:      make(chan func(), capacity),
+		capacity: capacity,
+	}
+	go q.loop()
+	return q
+}
+
+func (q *execQueue) loop() {
+	for f := range q.chn {
+		atomic.AddInt32(&q.cnt, -1)
+		if atomic.LoadInt32(&q.stop) != 0 {
+			return
+		}
+		f()
+	}
+}
+
+// CanQueue returns true if more  function calls can be added to the execution queue.
+func (q *execQueue) canQueue() bool {
+	return atomic.LoadInt32(&q.stop) == 0 && atomic.LoadInt32(&q.cnt) < q.capacity
+}
+
+// Queue adds a function call to the execution queue. Returns true if successful.
+func (q *execQueue) queue(f func()) bool {
+	if atomic.LoadInt32(&q.stop) != 0 {
+		return false
+	}
+	if atomic.AddInt32(&q.cnt, 1) > q.capacity {
+		atomic.AddInt32(&q.cnt, -1)
+		return false
+	}
+	q.chn <- f
+	return true
+}
+
+// Stop stops the exec queue.
+func (q *execQueue) quit() {
+	atomic.StoreInt32(&q.stop, 1)
+}
diff --git a/les/fetcher.go b/les/fetcher.go
index f9e517d25..353e91932 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -135,35 +135,38 @@ func (f *lightFetcher) syncLoop() {
 			f.lock.Lock()
 			s := requesting
 			requesting = false
+			var (
+				rq    *distReq
+				reqID uint64
+			)
 			if !f.syncing && !(newAnnounce && s) {
-				reqID := getNextReqID()
-				if peer, node, amount, retry := f.nextRequest(reqID); node != nil {
-					requesting = true
-					if reqID, ok := f.request(peer, reqID, node, amount); ok {
-						go func() {
-							time.Sleep(softRequestTimeout)
-							f.reqMu.Lock()
-							req, ok := f.requested[reqID]
-							if ok {
-								req.timeout = true
-								f.requested[reqID] = req
-							}
-							f.reqMu.Unlock()
-							// keep starting new requests while possible
-							f.requestChn <- false
-						}()
-					}
-				} else {
-					if retry {
-						requesting = true
-						go func() {
-							time.Sleep(time.Millisecond * 100)
-							f.requestChn <- false
-						}()
-					}
-				}
+				rq, reqID = f.nextRequest()
 			}
+			syncing := f.syncing
 			f.lock.Unlock()
+
+			if rq != nil {
+				requesting = true
+				_, ok := <-f.pm.reqDist.queue(rq)
+				if !ok {
+					f.requestChn <- false
+				}
+
+				if !syncing {
+					go func() {
+						time.Sleep(softRequestTimeout)
+						f.reqMu.Lock()
+						req, ok := f.requested[reqID]
+						if ok {
+							req.timeout = true
+							f.requested[reqID] = req
+						}
+						f.reqMu.Unlock()
+						// keep starting new requests while possible
+						f.requestChn <- false
+					}()
+				}
+			}
 		case reqID := <-f.timeoutChn:
 			f.reqMu.Lock()
 			req, ok := f.requested[reqID]
@@ -334,6 +337,12 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
 	f.lock.Lock()
 	defer f.lock.Unlock()
 
+	if f.syncing {
+		// always return true when syncing
+		// false positives are acceptable, a more sophisticated condition can be implemented later
+		return true
+	}
+
 	fp := f.peers[p]
 	if fp == nil || fp.root == nil {
 		return false
@@ -346,43 +355,13 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
 	f.chain.LockChain()
 	defer f.chain.UnlockChain()
 	// if it's older than the peer's block tree root but it's in the same canonical chain
-	// than the root, we can still be sure the peer knows it
+	// as the root, we can still be sure the peer knows it
+	//
+	// when syncing, just check if it is part of the known chain, there is nothing better we
+	// can do since we do not know the most recent block hash yet
 	return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
 }
 
-// request initiates a header download request from a certain peer
-func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
-	fp := f.peers[p]
-	if fp == nil {
-		p.Log().Debug("Requesting from unknown peer")
-		p.fcServer.DeassignRequest(reqID)
-		return 0, false
-	}
-	if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
-		f.syncing = true
-		go func() {
-			p.Log().Debug("Synchronisation started")
-			f.pm.synchronise(p)
-			f.syncDone <- p
-		}()
-		p.fcServer.DeassignRequest(reqID)
-		return 0, false
-	}
-
-	n.requested = true
-	cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
-	p.fcServer.SendRequest(reqID, cost)
-	f.reqMu.Lock()
-	f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
-	f.reqMu.Unlock()
-	go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
-	go func() {
-		time.Sleep(hardRequestTimeout)
-		f.timeoutChn <- reqID
-	}()
-	return reqID, true
-}
-
 // requestAmount calculates the amount of headers to be downloaded starting
 // from a certain head backwards
 func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
@@ -408,12 +387,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
 
 // nextRequest selects the peer and announced head to be requested next, amount
 // to be downloaded starting from the head backwards is also returned
-func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) {
+func (f *lightFetcher) nextRequest() (*distReq, uint64) {
 	var (
 		bestHash   common.Hash
 		bestAmount uint64
 	)
 	bestTd := f.maxConfirmedTd
+	bestSyncing := false
 
 	for p, fp := range f.peers {
 		for hash, n := range fp.nodeByHash {
@@ -423,29 +403,83 @@ func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint6
 					bestHash = hash
 					bestAmount = amount
 					bestTd = n.td
+					bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root)
 				}
 			}
 		}
 	}
 	if bestTd == f.maxConfirmedTd {
-		return nil, nil, 0, false
-	}
-
-	peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) {
-		fp := f.peers[p]
-		if fp == nil || fp.nodeByHash[bestHash] == nil {
-			return false, 0
+		return nil, 0
+	}
+
+	f.syncing = bestSyncing
+
+	var rq *distReq
+	reqID := getNextReqID()
+	if f.syncing {
+		rq = &distReq{
+			getCost: func(dp distPeer) uint64 {
+				return 0
+			},
+			canSend: func(dp distPeer) bool {
+				p := dp.(*peer)
+				fp := f.peers[p]
+				return fp != nil && fp.nodeByHash[bestHash] != nil
+			},
+			request: func(dp distPeer) func() {
+				go func() {
+					p := dp.(*peer)
+					p.Log().Debug("Synchronisation started")
+					f.pm.synchronise(p)
+					f.syncDone <- p
+				}()
+				return nil
+			},
+		}
+	} else {
+		rq = &distReq{
+			getCost: func(dp distPeer) uint64 {
+				p := dp.(*peer)
+				return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
+			},
+			canSend: func(dp distPeer) bool {
+				p := dp.(*peer)
+				f.lock.Lock()
+				defer f.lock.Unlock()
+
+				fp := f.peers[p]
+				if fp == nil {
+					return false
+				}
+				n := fp.nodeByHash[bestHash]
+				return n != nil && !n.requested
+			},
+			request: func(dp distPeer) func() {
+				p := dp.(*peer)
+				f.lock.Lock()
+				fp := f.peers[p]
+				if fp != nil {
+					n := fp.nodeByHash[bestHash]
+					if n != nil {
+						n.requested = true
+					}
+				}
+				f.lock.Unlock()
+
+				cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
+				p.fcServer.QueueRequest(reqID, cost)
+				f.reqMu.Lock()
+				f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
+				f.reqMu.Unlock()
+				go func() {
+					time.Sleep(hardRequestTimeout)
+					f.timeoutChn <- reqID
+				}()
+				return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
+			},
 		}
-		return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
-	})
-	if !locked {
-		return nil, nil, 0, true
-	}
-	var node *fetcherTreeNode
-	if peer != nil {
-		node = f.peers[peer].nodeByHash[bestHash]
 	}
-	return peer, node, bestAmount, false
+	return rq, reqID
 }
 
 // deliverHeaders delivers header download request responses for processing
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go
index e45537cf5..e40e69346 100644
--- a/les/flowcontrol/control.go
+++ b/les/flowcontrol/control.go
@@ -94,14 +94,12 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
 }
 
 type ServerNode struct {
-	bufEstimate     uint64
-	lastTime        mclock.AbsTime
-	params          *ServerParams
-	sumCost         uint64            // sum of req costs sent to this server
-	pending         map[uint64]uint64 // value = sumCost after sending the given req
-	assignedRequest uint64            // when != 0, only the request with the given ID can be sent to this peer
-	assignToken     chan struct{}     // send to this channel before assigning, read from it after deassigning
-	lock            sync.RWMutex
+	bufEstimate uint64
+	lastTime    mclock.AbsTime
+	params      *ServerParams
+	sumCost     uint64            // sum of req costs sent to this server
+	pending     map[uint64]uint64 // value = sumCost after sending the given req
+	lock        sync.RWMutex
 }
 
 func NewServerNode(params *ServerParams) *ServerNode {
@@ -110,7 +108,6 @@ func NewServerNode(params *ServerParams) *ServerNode {
 		lastTime:    mclock.Now(),
 		params:      params,
 		pending:     make(map[uint64]uint64),
-		assignToken: make(chan struct{}, 1),
 	}
 }
 
@@ -127,94 +124,37 @@ func (peer *ServerNode) recalcBLE(time mclock.AbsTime) {
 }
 
 // safetyMargin is added to the flow control waiting time when estimated buffer value is low
-const safetyMargin = time.Millisecond * 200
+const safetyMargin = time.Millisecond
 
-func (peer *ServerNode) canSend(maxCost uint64) time.Duration {
+func (peer *ServerNode) canSend(maxCost uint64) (time.Duration, float64) {
+	peer.recalcBLE(mclock.Now())
 	maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst)
 	if maxCost > peer.params.BufLimit {
 		maxCost = peer.params.BufLimit
 	}
 	if peer.bufEstimate >= maxCost {
-		return 0
+		return 0, float64(peer.bufEstimate-maxCost) / float64(peer.params.BufLimit)
 	}
-	return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge)
+	return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge), 0
 }
 
 // CanSend returns the minimum waiting time required before sending a request
-// with the given maximum estimated cost
-func (peer *ServerNode) CanSend(maxCost uint64) time.Duration {
+// with the given maximum estimated cost. Second return value is the relative
+// estimated buffer level after sending the request (divided by BufLimit).
+func (peer *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) {
 	peer.lock.RLock()
 	defer peer.lock.RUnlock()
 
 	return peer.canSend(maxCost)
 }
 
-// AssignRequest tries to assign the server node to the given request, guaranteeing
-// that once it returns true, no request will be sent to the node before this one
-func (peer *ServerNode) AssignRequest(reqID uint64) bool {
-	select {
-	case peer.assignToken <- struct{}{}:
-	default:
-		return false
-	}
-	peer.lock.Lock()
-	peer.assignedRequest = reqID
-	peer.lock.Unlock()
-	return true
-}
-
-// MustAssignRequest waits until the node can be assigned to the given request.
-// It is always guaranteed that assignments are released in a short amount of time.
-func (peer *ServerNode) MustAssignRequest(reqID uint64) {
-	peer.assignToken <- struct{}{}
-	peer.lock.Lock()
-	peer.assignedRequest = reqID
-	peer.lock.Unlock()
-}
-
-// DeassignRequest releases a request assignment in case the planned request
-// is not being sent.
-func (peer *ServerNode) DeassignRequest(reqID uint64) {
-	peer.lock.Lock()
-	if peer.assignedRequest == reqID {
-		peer.assignedRequest = 0
-		<-peer.assignToken
-	}
-	peer.lock.Unlock()
-}
-
-// IsAssigned returns true if the server node has already been assigned to a request
-// (note that this function returning false does not guarantee that you can assign a request
-// immediately afterwards, its only purpose is to help peer selection)
-func (peer *ServerNode) IsAssigned() bool {
-	peer.lock.RLock()
-	locked := peer.assignedRequest != 0
-	peer.lock.RUnlock()
-	return locked
-}
-
-// blocks until request can be sent
-func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
+// QueueRequest should be called when the request has been assigned to the given
+// server node, before putting it in the send queue. It is mandatory that requests
+// are sent in the same order as the QueueRequest calls are made.
+func (peer *ServerNode) QueueRequest(reqID, maxCost uint64) {
 	peer.lock.Lock()
 	defer peer.lock.Unlock()
 
-	if peer.assignedRequest != reqID {
-		peer.lock.Unlock()
-		peer.MustAssignRequest(reqID)
-		peer.lock.Lock()
-	}
-
-	peer.recalcBLE(mclock.Now())
-	wait := peer.canSend(maxCost)
-	for wait > 0 {
-		peer.lock.Unlock()
-		time.Sleep(wait)
-		peer.lock.Lock()
-		peer.recalcBLE(mclock.Now())
-		wait = peer.canSend(maxCost)
-	}
-	peer.assignedRequest = 0
-	<-peer.assignToken
 	peer.bufEstimate -= maxCost
 	peer.sumCost += maxCost
 	if reqID >= 0 {
@@ -222,6 +162,8 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
 	}
 }
 
+// GotReply adjusts estimated buffer value according to the value included in
+// the latest request reply.
 func (peer *ServerNode) GotReply(reqID, bv uint64) {
 
 	peer.lock.Lock()
@@ -235,6 +177,10 @@ func (peer *ServerNode) GotReply(reqID, bv uint64) {
 		return
 	}
 	delete(peer.pending, reqID)
-	peer.bufEstimate = bv - (peer.sumCost - sc)
+	cc := peer.sumCost - sc
+	peer.bufEstimate = 0
+	if bv > cc {
+		peer.bufEstimate = bv - cc
+	}
 	peer.lastTime = mclock.Now()
 }
diff --git a/les/handler.go b/les/handler.go
index 4271da8b8..ece2060ee 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -102,6 +102,7 @@ type ProtocolManager struct {
 	odr         *LesOdr
 	server      *LesServer
 	serverPool  *serverPool
+	reqDist     *requestDistributor
 
 	downloader *downloader.Downloader
 	fetcher    *lightFetcher
@@ -203,8 +204,17 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
 			blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
 	}
 
+	manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
+		m := make(map[distPeer]struct{})
+		peers := manager.peers.AllPeers()
+		for _, peer := range peers {
+			m[peer] = struct{}{}
+		}
+		return m
+	}, manager.quitSync)
 	if odr != nil {
 		odr.removePeer = removePeer
+		odr.reqDist = manager.reqDist
 	}
 
 	/*validator := func(block *types.Block, parent *types.Block) error {
@@ -334,17 +344,49 @@ func (pm *ProtocolManager) handle(p *peer) error {
 	if pm.lightSync {
 		requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
 			reqID := getNextReqID()
-			cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
-			p.fcServer.MustAssignRequest(reqID)
-			p.fcServer.SendRequest(reqID, cost)
-			return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
+			rq := &distReq{
+				getCost: func(dp distPeer) uint64 {
+					peer := dp.(*peer)
+					return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+				},
+				canSend: func(dp distPeer) bool {
+					return dp.(*peer) == p
+				},
+				request: func(dp distPeer) func() {
+					peer := dp.(*peer)
+					cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+					peer.fcServer.QueueRequest(reqID, cost)
+					return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+				},
+			}
+			_, ok := <-pm.reqDist.queue(rq)
+			if !ok {
+				return ErrNoPeers
+			}
+			return nil
 		}
 		requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
 			reqID := getNextReqID()
-			cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
-			p.fcServer.MustAssignRequest(reqID)
-			p.fcServer.SendRequest(reqID, cost)
-			return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
+			rq := &distReq{
+				getCost: func(dp distPeer) uint64 {
+					peer := dp.(*peer)
+					return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+				},
+				canSend: func(dp distPeer) bool {
+					return dp.(*peer) == p
+				},
+				request: func(dp distPeer) func() {
+					peer := dp.(*peer)
+					cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+					peer.fcServer.QueueRequest(reqID, cost)
+					return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+				},
+			}
+			_, ok := <-pm.reqDist.queue(rq)
+			if !ok {
+				return ErrNoPeers
+			}
+			return nil
 		}
 		if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
 			requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
@@ -884,7 +926,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 	}
 
 	if deliverMsg != nil {
-		return pm.odr.Deliver(p, deliverMsg)
+		err := pm.odr.Deliver(p, deliverMsg)
+		if err != nil {
+			p.responseErrors++
+			if p.responseErrors > maxResponseErrors {
+				return err
+			}
+		}
 	}
 	return nil
 }
diff --git a/les/helper_test.go b/les/helper_test.go
index f6293ad1a..3e8ce57b6 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -352,11 +352,15 @@ func (p *testServerPool) setPeer(peer *peer) {
 	p.peer = peer
 }
 
-func (p *testServerPool) selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer {
+func (p *testServerPool) getAllPeers() map[distPeer]struct{} {
 	p.lock.RLock()
 	defer p.lock.RUnlock()
 
-	return p.peer
+	m := make(map[distPeer]struct{})
+	if p.peer != nil {
+		m[p.peer] = struct{}{}
+	}
+	return m
 }
 
 func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
diff --git a/les/odr.go b/les/odr.go
index afc894ab5..06b44d318 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -32,14 +32,12 @@ import (
 var (
 	softRequestTimeout = time.Millisecond * 500
 	hardRequestTimeout = time.Second * 10
-	retryPeers         = time.Second * 1
 )
 
 // peerDropFn is a callback type for dropping a peer detected as malicious.
 type peerDropFn func(id string)
 
 type odrPeerSelector interface {
-	selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer
 	adjustResponseTime(*poolEntry, time.Duration, bool)
 }
 
@@ -51,6 +49,7 @@ type LesOdr struct {
 	mlock, clock sync.Mutex
 	sentReqs     map[uint64]*sentReq
 	serverPool   odrPeerSelector
+	reqDist      *requestDistributor
 }
 
 func NewLesOdr(db ethdb.Database) *LesOdr {
@@ -165,69 +164,81 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
 func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error {
 	answered := make(chan struct{})
 	req := &sentReq{
-		valFunc:  lreq.Valid,
+		valFunc:  lreq.Validate,
 		sentTo:   make(map[*peer]chan struct{}),
 		answered: answered, // reply delivered by any peer
 	}
-	reqID := getNextReqID()
-	self.mlock.Lock()
-	self.sentReqs[reqID] = req
-	self.mlock.Unlock()
+
+	exclude := make(map[*peer]struct{})
 
 	reqWg := new(sync.WaitGroup)
 	reqWg.Add(1)
 	defer reqWg.Done()
-	go func() {
-		reqWg.Wait()
-		self.mlock.Lock()
-		delete(self.sentReqs, reqID)
-		self.mlock.Unlock()
-	}()
 
-	exclude := make(map[*peer]struct{})
-	for {
-		var p *peer
-		if self.serverPool != nil {
-			p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) {
-				if _, ok := exclude[p]; ok || !lreq.CanSend(p) {
-					return false, 0
-				}
-				return true, p.fcServer.CanSend(lreq.GetCost(p))
-			}, ctx.Done())
-		}
-		if p == nil {
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case <-req.answered:
-				return nil
-			case <-time.After(retryPeers):
-			}
-		} else {
+	var timeout chan struct{}
+	reqID := getNextReqID()
+	rq := &distReq{
+		getCost: func(dp distPeer) uint64 {
+			return lreq.GetCost(dp.(*peer))
+		},
+		canSend: func(dp distPeer) bool {
+			p := dp.(*peer)
+			_, ok := exclude[p]
+			return !ok && lreq.CanSend(p)
+		},
+		request: func(dp distPeer) func() {
+			p := dp.(*peer)
 			exclude[p] = struct{}{}
 			delivered := make(chan struct{})
-			timeout := make(chan struct{})
+			timeout = make(chan struct{})
 			req.lock.Lock()
 			req.sentTo[p] = delivered
 			req.lock.Unlock()
 			reqWg.Add(1)
 			cost := lreq.GetCost(p)
-			p.fcServer.SendRequest(reqID, cost)
+			p.fcServer.QueueRequest(reqID, cost)
 			go self.requestPeer(req, p, delivered, timeout, reqWg)
-			lreq.Request(reqID, p)
-
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case <-answered:
-				return nil
-			case <-timeout:
+			return func() { lreq.Request(reqID, p) }
+		},
+	}
+
+	self.mlock.Lock()
+	self.sentReqs[reqID] = req
+	self.mlock.Unlock()
+
+	go func() {
+		reqWg.Wait()
+		self.mlock.Lock()
+		delete(self.sentReqs, reqID)
+		self.mlock.Unlock()
+	}()
+
+	for {
+		peerChn := self.reqDist.queue(rq)
+		select {
+		case <-ctx.Done():
+			self.reqDist.cancel(rq)
+			return ctx.Err()
+		case <-answered:
+			self.reqDist.cancel(rq)
+			return nil
+		case _, ok := <-peerChn:
+			if !ok {
+				return ErrNoPeers
 			}
 		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-answered:
+			return nil
+		case <-timeout:
+		}
 	}
 }
 
-// Retrieve tries to fetch an object from the local db, then from the LES network.
+// Retrieve tries to fetch an object from the LES network.
 // If the network retrieval was successful, it stores the object in local db.
 func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
 	lreq := LesRequest(req)
diff --git a/les/odr_requests.go b/les/odr_requests.go
index 53aced93c..1f853b341 100644
--- a/les/odr_requests.go
+++ b/les/odr_requests.go
@@ -49,7 +49,7 @@ type LesOdrRequest interface {
 	GetCost(*peer) uint64
 	CanSend(*peer) bool
 	Request(uint64, *peer) error
-	Valid(ethdb.Database, *Msg) error // if true, keeps the retrieved object
+	Validate(ethdb.Database, *Msg) error
 }
 
 func LesRequest(req light.OdrRequest) LesOdrRequest {
@@ -92,7 +92,7 @@ func (r *BlockRequest) Request(reqID uint64, peer *peer) error {
 // Valid processes an ODR request reply message from the LES network
 // returns true and stores results in memory if the message was a valid reply
 // to the request (implementation of LesOdrRequest)
-func (r *BlockRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error {
 	log.Debug("Validating block body", "hash", r.Hash)
 
 	// Ensure we have a correct message with a single block body
@@ -148,7 +148,7 @@ func (r *ReceiptsRequest) Request(reqID uint64, peer *peer) error {
 // Valid processes an ODR request reply message from the LES network
 // returns true and stores results in memory if the message was a valid reply
 // to the request (implementation of LesOdrRequest)
-func (r *ReceiptsRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *ReceiptsRequest) Validate(db ethdb.Database, msg *Msg) error {
 	log.Debug("Validating block receipts", "hash", r.Hash)
 
 	// Ensure we have a correct message with a single block receipt
@@ -208,7 +208,7 @@ func (r *TrieRequest) Request(reqID uint64, peer *peer) error {
 // Valid processes an ODR request reply message from the LES network
 // returns true and stores results in memory if the message was a valid reply
 // to the request (implementation of LesOdrRequest)
-func (r *TrieRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *TrieRequest) Validate(db ethdb.Database, msg *Msg) error {
 	log.Debug("Validating trie proof", "root", r.Id.Root, "key", r.Key)
 
 	// Ensure we have a correct message with a single proof
@@ -259,7 +259,7 @@ func (r *CodeRequest) Request(reqID uint64, peer *peer) error {
 // Valid processes an ODR request reply message from the LES network
 // returns true and stores results in memory if the message was a valid reply
 // to the request (implementation of LesOdrRequest)
-func (r *CodeRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *CodeRequest) Validate(db ethdb.Database, msg *Msg) error {
 	log.Debug("Validating code data", "hash", r.Hash)
 
 	// Ensure we have a correct message with a single code element
@@ -319,7 +319,7 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error {
 // Valid processes an ODR request reply message from the LES network
 // returns true and stores results in memory if the message was a valid reply
 // to the request (implementation of LesOdrRequest)
-func (r *ChtRequest) Valid(db ethdb.Database, msg *Msg) error {
+func (r *ChtRequest) Validate(db ethdb.Database, msg *Msg) error {
 	log.Debug("Validating CHT", "cht", r.ChtNum, "block", r.BlockNum)
 
 	// Ensure we have a correct message with a single proof element
diff --git a/les/odr_test.go b/les/odr_test.go
index 4f1fccb24..1b436b8e6 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -162,8 +162,11 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
 	lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
 	_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
 	pool := &testServerPool{}
+	lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
+	odr.reqDist = lpm.reqDist
 	pool.setPeer(lpeer)
 	odr.serverPool = pool
+	lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
 	select {
 	case <-time.After(time.Millisecond * 100):
 	case err := <-err1:
diff --git a/les/peer.go b/les/peer.go
index ef5f8a6ce..4793da296 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"math/big"
 	"sync"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -37,7 +38,10 @@ var (
 	errNotRegistered     = errors.New("peer is not registered")
 )
 
-const maxHeadInfoLen = 20
+const (
+	maxHeadInfoLen    = 20
+	maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
+)
 
 type peer struct {
 	*p2p.Peer
@@ -53,9 +57,11 @@ type peer struct {
 	lock     sync.RWMutex
 
 	announceChn chan announceData
+	sendQueue   *execQueue
 
-	poolEntry *poolEntry
-	hasBlock  func(common.Hash, uint64) bool
+	poolEntry      *poolEntry
+	hasBlock       func(common.Hash, uint64) bool
+	responseErrors int
 
 	fcClient       *flowcontrol.ClientNode // nil if the peer is server only
 	fcServer       *flowcontrol.ServerNode // nil if the peer is client only
@@ -76,6 +82,14 @@ func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
 	}
 }
 
+func (p *peer) canQueue() bool {
+	return p.sendQueue.canQueue()
+}
+
+func (p *peer) queueSend(f func()) {
+	p.sendQueue.queue(f)
+}
+
 // Info gathers and returns a collection of metadata known about a peer.
 func (p *peer) Info() *eth.PeerInfo {
 	return &eth.PeerInfo{
@@ -117,6 +131,11 @@ func (p *peer) Td() *big.Int {
 	return new(big.Int).Set(p.headInfo.Td)
 }
 
+// waitBefore implements distPeer interface
+func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
+	return p.fcServer.CanSend(maxCost)
+}
+
 func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
 	type req struct {
 		ReqID uint64
@@ -237,11 +256,8 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error {
 	return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
 }
 
-func (p *peer) SendTxs(cost uint64, txs types.Transactions) error {
+func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
 	p.Log().Debug("Fetching batch of transactions", "count", len(txs))
-	reqID := getNextReqID()
-	p.fcServer.MustAssignRequest(reqID)
-	p.fcServer.SendRequest(reqID, cost)
 	return p2p.Send(p.rw, SendTxMsg, txs)
 }
 
@@ -444,6 +460,7 @@ func (ps *peerSet) Register(p *peer) error {
 		return errAlreadyRegistered
 	}
 	ps.peers[p.id] = p
+	p.sendQueue = newExecQueue(100)
 	return nil
 }
 
@@ -453,8 +470,10 @@ func (ps *peerSet) Unregister(id string) error {
 	ps.lock.Lock()
 	defer ps.lock.Unlock()
 
-	if _, ok := ps.peers[id]; !ok {
+	if p, ok := ps.peers[id]; !ok {
 		return errNotRegistered
+	} else {
+		p.sendQueue.quit()
 	}
 	delete(ps.peers, id)
 	return nil
diff --git a/les/request_test.go b/les/request_test.go
index 10e9edf8b..bec6bf1bc 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -72,8 +72,11 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
 	lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
 	_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
 	pool := &testServerPool{}
+	lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
+	odr.reqDist = lpm.reqDist
 	pool.setPeer(lpeer)
 	odr.serverPool = pool
+	lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
 	select {
 	case <-time.After(time.Millisecond * 100):
 	case err := <-err1:
diff --git a/les/serverpool.go b/les/serverpool.go
index 55d481dbf..64fe991c6 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -268,82 +268,6 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration,
 	}
 }
 
-type selectPeerItem struct {
-	peer   *peer
-	weight int64
-	wait   time.Duration
-}
-
-func (sp selectPeerItem) Weight() int64 {
-	return sp.weight
-}
-
-// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request
-// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed
-// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time.
-func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
-	pool.lock.Lock()
-	type selectPeer struct {
-		peer         *peer
-		rstat, tstat float64
-	}
-	var list []selectPeer
-	sel := newWeightedRandomSelect()
-	for _, entry := range pool.entries {
-		if entry.state == psRegistered {
-			if !entry.peer.fcServer.IsAssigned() {
-				list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
-			}
-		}
-	}
-	pool.lock.Unlock()
-
-	for _, sp := range list {
-		ok, wait := canSend(sp.peer)
-		if ok {
-			w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow)))
-			sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait})
-		}
-	}
-	choice := sel.choose()
-	if choice == nil {
-		return nil, 0, false
-	}
-	peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait
-	locked := false
-	if wait < time.Millisecond*100 {
-		if peer.fcServer.AssignRequest(reqID) {
-			ok, w := canSend(peer)
-			wait = time.Duration(w)
-			if ok && wait < time.Millisecond*100 {
-				locked = true
-			} else {
-				peer.fcServer.DeassignRequest(reqID)
-				wait = time.Millisecond * 100
-			}
-		}
-	} else {
-		wait = time.Millisecond * 100
-	}
-	return peer, wait, locked
-}
-
-// selectPeer selects a suitable peer for a request, waiting until an assignment to
-// the request is guaranteed or the process is aborted.
-func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
-	for {
-		peer, wait, locked := pool.selectPeer(reqID, canSend)
-		if locked {
-			return peer
-		}
-		select {
-		case <-abort:
-			return nil
-		case <-time.After(wait):
-		}
-	}
-}
-
 // eventLoop handles pool events and mutex locking for all internal functions
 func (pool *serverPool) eventLoop() {
 	lookupCnt := 0
diff --git a/les/txrelay.go b/les/txrelay.go
index 76d416c57..1ca3467e4 100644
--- a/les/txrelay.go
+++ b/les/txrelay.go
@@ -35,13 +35,14 @@ type LesTxRelay struct {
 	peerList     []*peer
 	peerStartPos int
 	lock         sync.RWMutex
+
+	reqDist *requestDistributor
 }
 
 func NewLesTxRelay() *LesTxRelay {
 	return &LesTxRelay{
 		txSent:    make(map[common.Hash]*ltrInfo),
 		txPending: make(map[common.Hash]struct{}),
-		ps:        newPeerSet(),
 	}
 }
 
@@ -108,10 +109,26 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
 	}
 
 	for p, list := range sendTo {
-		cost := p.GetRequestCost(SendTxMsg, len(list))
-		go func(p *peer, list types.Transactions, cost uint64) {
-			p.SendTxs(cost, list)
-		}(p, list, cost)
+		pp := p
+		ll := list
+
+		reqID := getNextReqID()
+		rq := &distReq{
+			getCost: func(dp distPeer) uint64 {
+				peer := dp.(*peer)
+				return peer.GetRequestCost(SendTxMsg, len(ll))
+			},
+			canSend: func(dp distPeer) bool {
+				return dp.(*peer) == pp
+			},
+			request: func(dp distPeer) func() {
+				peer := dp.(*peer)
+				cost := peer.GetRequestCost(SendTxMsg, len(ll))
+				peer.fcServer.QueueRequest(reqID, cost)
+				return func() { peer.SendTxs(reqID, cost, ll) }
+			},
+		}
+		self.reqDist.queue(rq)
 	}
 }
 
diff --git a/light/lightchain.go b/light/lightchain.go
index 4370dc0fc..4715d47ab 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -20,6 +20,7 @@ import (
 	"math/big"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
@@ -369,9 +370,17 @@ func (self *LightChain) postChainEvents(events []interface{}) {
 // In the case of a light chain, InsertHeaderChain also creates and posts light
 // chain events when necessary.
 func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
+	start := time.Now()
+	if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
+		return i, err
+	}
+
 	// Make sure only one thread manipulates the chain at once
 	self.chainmu.Lock()
-	defer self.chainmu.Unlock()
+	defer func() {
+		self.chainmu.Unlock()
+		time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation
+	}()
 
 	self.wg.Add(1)
 	defer self.wg.Done()
@@ -397,7 +406,7 @@ func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
 		}
 		return err
 	}
-	i, err := self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
+	i, err := self.hc.InsertHeaderChain(chain, whFunc, start)
 	go self.postChainEvents(events)
 	return i, err
 }
diff --git a/light/txpool.go b/light/txpool.go
index 28c8d8ca5..5eb1ba801 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -276,15 +276,17 @@ func (pool *TxPool) setNewHead(ctx context.Context, newHeader *types.Header) (tx
 	// clear old mined tx entries of old blocks
 	if idx := newHeader.Number.Uint64(); idx > pool.clearIdx+txPermanent {
 		idx2 := idx - txPermanent
-		for i := pool.clearIdx; i < idx2; i++ {
-			hash := core.GetCanonicalHash(pool.chainDb, i)
-			if list, ok := pool.mined[hash]; ok {
-				hashes := make([]common.Hash, len(list))
-				for i, tx := range list {
-					hashes[i] = tx.Hash()
+		if len(pool.mined) > 0 {
+			for i := pool.clearIdx; i < idx2; i++ {
+				hash := core.GetCanonicalHash(pool.chainDb, i)
+				if list, ok := pool.mined[hash]; ok {
+					hashes := make([]common.Hash, len(list))
+					for i, tx := range list {
+						hashes[i] = tx.Hash()
+					}
+					pool.relay.Discard(hashes)
+					delete(pool.mined, hash)
 				}
-				pool.relay.Discard(hashes)
-				delete(pool.mined, hash)
 			}
 		}
 		pool.clearIdx = idx2
@@ -303,15 +305,16 @@ func (pool *TxPool) eventLoop() {
 	for ev := range pool.events.Chan() {
 		switch ev.Data.(type) {
 		case core.ChainHeadEvent:
+			head := pool.chain.CurrentHeader()
 			pool.mu.Lock()
 			ctx, _ := context.WithTimeout(context.Background(), blockCheckTimeout)
-			head := pool.chain.CurrentHeader()
 			txc, _ := pool.setNewHead(ctx, head)
 			m, r := txc.getLists()
 			pool.relay.NewHead(pool.head, m, r)
 			pool.homestead = pool.config.IsHomestead(head.Number)
 			pool.signer = types.MakeSigner(pool.config, head.Number)
 			pool.mu.Unlock()
+			time.Sleep(time.Millisecond) // hack in order to avoid hogging the lock; this part will be replaced by a subsequent PR
 		}
 	}
 }
-- 
GitLab