From 797b0812ab742b2a24e3c9277fa6564ff9eb9094 Mon Sep 17 00:00:00 2001
From: Martin Holst Swende <martin@swende.se>
Date: Mon, 25 Jan 2021 07:17:05 +0100
Subject: [PATCH] eth/protocols/snap: snap sync testing (#22179)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* eth/protocols/snap: make timeout configurable

* eth/protocols/snap: snap sync testing

* eth/protocols/snap: test to trigger panic

* eth/protocols/snap: fix race condition on timeouts

* eth/protocols/snap: return error on cancelled sync

* squashme: updates + test causing panic + properly serve accounts in order

* eth/protocols/snap: revert failing storage response

* eth/protocols/snap: revert on bad responses (storage, code)

* eth/protocols/snap: fix account handling stall

* eth/protocols/snap: fix remaining revertal-issues

* eth/protocols/snap: timeouthandler for bytecode requests

* eth/protocols/snap: debugging + fix log message

* eth/protocols/snap: fix misspelliings in docs

* eth/protocols/snap: fix race in bytecode handling

* eth/protocols/snap: undo deduplication of storage roots

* synctests: refactor + minify panic testcase

* eth/protocols/snap: minor polishes

* eth: minor polishes to make logs more useful

* eth/protocols/snap: remove excessive logs from the test runs

* eth/protocols/snap: stress tests with concurrency

* eth/protocols/snap: further fixes to test cancel channel handling

* eth/protocols/snap: extend test timeouts on CI

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
---
 eth/downloader/downloader.go    |    4 +-
 eth/handler.go                  |   16 +-
 eth/protocols/snap/peer.go      |    5 +
 eth/protocols/snap/protocol.go  |    1 +
 eth/protocols/snap/sync.go      |  349 ++++++-----
 eth/protocols/snap/sync_test.go | 1020 +++++++++++++++++++++++++++++++
 6 files changed, 1248 insertions(+), 147 deletions(-)

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 31c1cb47c..421803876 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -298,7 +298,7 @@ func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
 		// Tests use short IDs, don't choke on them
 		logger = log.New("peer", id)
 	} else {
-		logger = log.New("peer", id[:16])
+		logger = log.New("peer", id[:8])
 	}
 	logger.Trace("Registering sync peer")
 	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
@@ -325,7 +325,7 @@ func (d *Downloader) UnregisterPeer(id string) error {
 		// Tests use short IDs, don't choke on them
 		logger = log.New("peer", id)
 	} else {
-		logger = log.New("peer", id[:16])
+		logger = log.New("peer", id[:8])
 	}
 	logger.Trace("Unregistering sync peer")
 	if err := d.peers.Unregister(id); err != nil {
diff --git a/eth/handler.go b/eth/handler.go
index a9506c499..f6366d9af 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -326,24 +326,32 @@ func (h *handler) runSnapPeer(peer *snap.Peer, handler snap.Handler) error {
 }
 
 func (h *handler) removePeer(id string) {
+	// Create a custom logger to avoid printing the entire id
+	var logger log.Logger
+	if len(id) < 16 {
+		// Tests use short IDs, don't choke on them
+		logger = log.New("peer", id)
+	} else {
+		logger = log.New("peer", id[:8])
+	}
 	// Remove the eth peer if it exists
 	eth := h.peers.ethPeer(id)
 	if eth != nil {
-		log.Debug("Removing Ethereum peer", "peer", id)
+		logger.Debug("Removing Ethereum peer")
 		h.downloader.UnregisterPeer(id)
 		h.txFetcher.Drop(id)
 
 		if err := h.peers.unregisterEthPeer(id); err != nil {
-			log.Error("Peer removal failed", "peer", id, "err", err)
+			logger.Error("Ethereum peer removal failed", "err", err)
 		}
 	}
 	// Remove the snap peer if it exists
 	snap := h.peers.snapPeer(id)
 	if snap != nil {
-		log.Debug("Removing Snapshot peer", "peer", id)
+		logger.Debug("Removing Snapshot peer")
 		h.downloader.SnapSyncer.Unregister(id)
 		if err := h.peers.unregisterSnapPeer(id); err != nil {
-			log.Error("Peer removal failed", "peer", id, "err", err)
+			logger.Error("Snapshot peer removel failed", "err", err)
 		}
 	}
 	// Hard disconnect at the networking layer
diff --git a/eth/protocols/snap/peer.go b/eth/protocols/snap/peer.go
index 73eaaadd0..4f3d550f1 100644
--- a/eth/protocols/snap/peer.go
+++ b/eth/protocols/snap/peer.go
@@ -56,6 +56,11 @@ func (p *Peer) Version() uint {
 	return p.version
 }
 
+// Log overrides the P2P logget with the higher level one containing only the id.
+func (p *Peer) Log() log.Logger {
+	return p.logger
+}
+
 // RequestAccountRange fetches a batch of accounts rooted in a specific account
 // trie, starting with the origin.
 func (p *Peer) RequestAccountRange(id uint64, root common.Hash, origin, limit common.Hash, bytes uint64) error {
diff --git a/eth/protocols/snap/protocol.go b/eth/protocols/snap/protocol.go
index a1e434969..f1a25a206 100644
--- a/eth/protocols/snap/protocol.go
+++ b/eth/protocols/snap/protocol.go
@@ -61,6 +61,7 @@ var (
 	errDecode         = errors.New("invalid message")
 	errInvalidMsgCode = errors.New("invalid message code")
 	errBadRequest     = errors.New("bad request")
+	errCancelled      = errors.New("sync cancelled")
 )
 
 // Packet represents a p2p message in the `snap` protocol.
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index d6f0eb547..e7720026b 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -73,10 +73,6 @@ const (
 	// waste bandwidth.
 	maxTrieRequestCount = 512
 
-	// requestTimeout is the maximum time a peer is allowed to spend on serving
-	// a single network request.
-	requestTimeout = 10 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?
-
 	// accountConcurrency is the number of chunks to split the account trie into
 	// to allow concurrent retrievals.
 	accountConcurrency = 16
@@ -86,6 +82,12 @@ const (
 	storageConcurrency = 16
 )
 
+var (
+	// requestTimeout is the maximum time a peer is allowed to spend on serving
+	// a single network request.
+	requestTimeout = 10 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync?
+)
+
 // accountRequest tracks a pending account range request to ensure responses are
 // to actual requests and to validate any security constraints.
 //
@@ -331,6 +333,33 @@ type syncProgress struct {
 	BytecodeHealNops   uint64             // Number of bytecodes not requested
 }
 
+// SyncPeer abstracts out the methods required for a peer to be synced against
+// with the goal of allowing the construction of mock peers without the full
+// blown networking.
+type SyncPeer interface {
+	// ID retrieves the peer's unique identifier.
+	ID() string
+
+	// RequestAccountRange fetches a batch of accounts rooted in a specific account
+	// trie, starting with the origin.
+	RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error
+
+	// RequestStorageRange fetches a batch of storage slots belonging to one or
+	// more accounts. If slots from only one accout is requested, an origin marker
+	// may also be used to retrieve from there.
+	RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error
+
+	// RequestByteCodes fetches a batch of bytecodes by hash.
+	RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error
+
+	// RequestTrieNodes fetches a batch of account or storage trie nodes rooted in
+	// a specificstate trie.
+	RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error
+
+	// Log retrieves the peer's own contextual logger.
+	Log() log.Logger
+}
+
 // Syncer is an Ethereum account and storage trie syncer based on snapshots and
 // the  snap protocol. It's purpose is to download all the accounts and storage
 // slots from remote peers and reassemble chunks of the state trie, on top of
@@ -346,14 +375,15 @@ type Syncer struct {
 	db    ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
 	bloom *trie.SyncBloom     // Bloom filter to deduplicate nodes for state fixup
 
-	root   common.Hash    // Current state trie root being synced
-	tasks  []*accountTask // Current account task set being synced
-	healer *healTask      // Current state healing task being executed
-	update chan struct{}  // Notification channel for possible sync progression
+	root    common.Hash    // Current state trie root being synced
+	tasks   []*accountTask // Current account task set being synced
+	snapped bool           // Flag to signal that snap phase is done
+	healer  *healTask      // Current state healing task being executed
+	update  chan struct{}  // Notification channel for possible sync progression
 
-	peers    map[string]*Peer // Currently active peers to download from
-	peerJoin *event.Feed      // Event feed to react to peers joining
-	peerDrop *event.Feed      // Event feed to react to peers dropping
+	peers    map[string]SyncPeer // Currently active peers to download from
+	peerJoin *event.Feed         // Event feed to react to peers joining
+	peerDrop *event.Feed         // Event feed to react to peers dropping
 
 	// Request tracking during syncing phase
 	statelessPeers map[string]struct{} // Peers that failed to deliver state data
@@ -410,12 +440,14 @@ type Syncer struct {
 	lock sync.RWMutex   // Protects fields that can change outside of sync (peers, reqs, root)
 }
 
+// NewSyncer creates a new snapshot syncer to download the Ethereum state over the
+// snap protocol.
 func NewSyncer(db ethdb.KeyValueStore, bloom *trie.SyncBloom) *Syncer {
 	return &Syncer{
 		db:    db,
 		bloom: bloom,
 
-		peers:    make(map[string]*Peer),
+		peers:    make(map[string]SyncPeer),
 		peerJoin: new(event.Feed),
 		peerDrop: new(event.Feed),
 		update:   make(chan struct{}, 1),
@@ -447,27 +479,29 @@ func NewSyncer(db ethdb.KeyValueStore, bloom *trie.SyncBloom) *Syncer {
 }
 
 // Register injects a new data source into the syncer's peerset.
-func (s *Syncer) Register(peer *Peer) error {
+func (s *Syncer) Register(peer SyncPeer) error {
 	// Make sure the peer is not registered yet
+	id := peer.ID()
+
 	s.lock.Lock()
-	if _, ok := s.peers[peer.id]; ok {
-		log.Error("Snap peer already registered", "id", peer.id)
+	if _, ok := s.peers[id]; ok {
+		log.Error("Snap peer already registered", "id", id)
 
 		s.lock.Unlock()
 		return errors.New("already registered")
 	}
-	s.peers[peer.id] = peer
+	s.peers[id] = peer
 
 	// Mark the peer as idle, even if no sync is running
-	s.accountIdlers[peer.id] = struct{}{}
-	s.storageIdlers[peer.id] = struct{}{}
-	s.bytecodeIdlers[peer.id] = struct{}{}
-	s.trienodeHealIdlers[peer.id] = struct{}{}
-	s.bytecodeHealIdlers[peer.id] = struct{}{}
+	s.accountIdlers[id] = struct{}{}
+	s.storageIdlers[id] = struct{}{}
+	s.bytecodeIdlers[id] = struct{}{}
+	s.trienodeHealIdlers[id] = struct{}{}
+	s.bytecodeHealIdlers[id] = struct{}{}
 	s.lock.Unlock()
 
 	// Notify any active syncs that a new peer can be assigned data
-	s.peerJoin.Send(peer.id)
+	s.peerJoin.Send(id)
 	return nil
 }
 
@@ -566,6 +600,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 		s.assignAccountTasks(cancel)
 		s.assignBytecodeTasks(cancel)
 		s.assignStorageTasks(cancel)
+
 		if len(s.tasks) == 0 {
 			// Sync phase done, run heal phase
 			s.assignTrienodeHealTasks(cancel)
@@ -580,7 +615,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 		case id := <-peerDrop:
 			s.revertRequests(id)
 		case <-cancel:
-			return nil
+			return errCancelled
 
 		case req := <-s.accountReqFails:
 			s.revertAccountRequest(req)
@@ -622,6 +657,7 @@ func (s *Syncer) loadSyncStatus() {
 				log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
 			}
 			s.tasks = progress.Tasks
+			s.snapped = len(s.tasks) == 0
 
 			s.accountSynced = progress.AccountSynced
 			s.accountBytes = progress.AccountBytes
@@ -701,6 +737,11 @@ func (s *Syncer) cleanAccountTasks() {
 			i--
 		}
 	}
+	if len(s.tasks) == 0 {
+		s.lock.Lock()
+		s.snapped = true
+		s.lock.Unlock()
+	}
 }
 
 // cleanStorageTasks iterates over all the account tasks and storage sub-tasks
@@ -798,7 +839,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) {
 		delete(s.accountIdlers, idle)
 
 		s.pend.Add(1)
-		go func(peer *Peer, root common.Hash) {
+		go func(peer SyncPeer, root common.Hash) {
 			defer s.pend.Done()
 
 			// Attempt to send the remote request and revert if it fails
@@ -885,7 +926,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) {
 		delete(s.bytecodeIdlers, idle)
 
 		s.pend.Add(1)
-		go func(peer *Peer) {
+		go func(peer SyncPeer) {
 			defer s.pend.Done()
 
 			// Attempt to send the remote request and revert if it fails
@@ -962,7 +1003,6 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
 				// Found an incomplete storage chunk, schedule it
 				accounts = append(accounts, account)
 				roots = append(roots, st.root)
-
 				subtask = st
 				break // Large contract chunks are downloaded individually
 			}
@@ -1010,7 +1050,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) {
 		delete(s.storageIdlers, idle)
 
 		s.pend.Add(1)
-		go func(peer *Peer, root common.Hash) {
+		go func(peer SyncPeer, root common.Hash) {
 			defer s.pend.Done()
 
 			// Attempt to send the remote request and revert if it fails
@@ -1125,7 +1165,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) {
 		delete(s.trienodeHealIdlers, idle)
 
 		s.pend.Add(1)
-		go func(peer *Peer, root common.Hash) {
+		go func(peer SyncPeer, root common.Hash) {
 			defer s.pend.Done()
 
 			// Attempt to send the remote request and revert if it fails
@@ -1223,7 +1263,7 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) {
 		delete(s.bytecodeHealIdlers, idle)
 
 		s.pend.Add(1)
-		go func(peer *Peer) {
+		go func(peer SyncPeer) {
 			defer s.pend.Done()
 
 			// Attempt to send the remote request and revert if it fails
@@ -1522,7 +1562,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
 			break
 		}
 	}
-	// Itereate over all the accounts and assemble which ones need further sub-
+	// Iterate over all the accounts and assemble which ones need further sub-
 	// filling before the entire account range can be persisted.
 	res.task.needCode = make([]bool, len(res.accounts))
 	res.task.needState = make([]bool, len(res.accounts))
@@ -1566,7 +1606,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
 		}
 	}
 	// Delete any subtasks that have been aborted but not resumed. This may undo
-	// some progress if a newpeer gives us less accounts than an old one, but for
+	// some progress if a new peer gives us less accounts than an old one, but for
 	// now we have to live with that.
 	for hash := range res.task.SubTasks {
 		if _, ok := resumed[hash]; !ok {
@@ -1650,95 +1690,92 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
 	)
 	// Iterate over all the accounts and reconstruct their storage tries from the
 	// delivered slots
-	delivered := make(map[common.Hash]bool)
-	for i := 0; i < len(res.hashes); i++ {
-		delivered[res.roots[i]] = true
-	}
 	for i, account := range res.accounts {
 		// If the account was not delivered, reschedule it
 		if i >= len(res.hashes) {
-			if !delivered[res.roots[i]] {
-				res.mainTask.stateTasks[account] = res.roots[i]
-			}
+			res.mainTask.stateTasks[account] = res.roots[i]
 			continue
 		}
 		// State was delivered, if complete mark as not needed any more, otherwise
 		// mark the account as needing healing
-		for j, acc := range res.mainTask.res.accounts {
-			if res.roots[i] == acc.Root {
-				// If the packet contains multiple contract storage slots, all
-				// but the last are surely complete. The last contract may be
-				// chunked, so check it's continuation flag.
-				if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
-					res.mainTask.needState[j] = false
-					res.mainTask.pend--
-				}
-				// If the last contract was chunked, mark it as needing healing
-				// to avoid writing it out to disk prematurely.
-				if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
-					res.mainTask.needHeal[j] = true
-				}
-				// If the last contract was chunked, we need to switch to large
-				// contract handling mode
-				if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
-					// If we haven't yet started a large-contract retrieval, create
-					// the subtasks for it within the main account task
-					if tasks, ok := res.mainTask.SubTasks[account]; !ok {
-						var (
-							next common.Hash
-						)
-						step := new(big.Int).Sub(
-							new(big.Int).Div(
-								new(big.Int).Exp(common.Big2, common.Big256, nil),
-								big.NewInt(storageConcurrency),
-							), common.Big1,
-						)
-						for k := 0; k < storageConcurrency; k++ {
-							last := common.BigToHash(new(big.Int).Add(next.Big(), step))
-							if k == storageConcurrency-1 {
-								// Make sure we don't overflow if the step is not a proper divisor
-								last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
-							}
-							tasks = append(tasks, &storageTask{
-								Next: next,
-								Last: last,
-								root: acc.Root,
-							})
-							log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last)
-							next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
+		for j, hash := range res.mainTask.res.hashes {
+			if account != hash {
+				continue
+			}
+			acc := res.mainTask.res.accounts[j]
+
+			// If the packet contains multiple contract storage slots, all
+			// but the last are surely complete. The last contract may be
+			// chunked, so check it's continuation flag.
+			if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
+				res.mainTask.needState[j] = false
+				res.mainTask.pend--
+			}
+			// If the last contract was chunked, mark it as needing healing
+			// to avoid writing it out to disk prematurely.
+			if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
+				res.mainTask.needHeal[j] = true
+			}
+			// If the last contract was chunked, we need to switch to large
+			// contract handling mode
+			if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
+				// If we haven't yet started a large-contract retrieval, create
+				// the subtasks for it within the main account task
+				if tasks, ok := res.mainTask.SubTasks[account]; !ok {
+					var (
+						next common.Hash
+					)
+					step := new(big.Int).Sub(
+						new(big.Int).Div(
+							new(big.Int).Exp(common.Big2, common.Big256, nil),
+							big.NewInt(storageConcurrency),
+						), common.Big1,
+					)
+					for k := 0; k < storageConcurrency; k++ {
+						last := common.BigToHash(new(big.Int).Add(next.Big(), step))
+						if k == storageConcurrency-1 {
+							// Make sure we don't overflow if the step is not a proper divisor
+							last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
 						}
-						res.mainTask.SubTasks[account] = tasks
-
-						// Since we've just created the sub-tasks, this response
-						// is surely for the first one (zero origin)
-						res.subTask = tasks[0]
+						tasks = append(tasks, &storageTask{
+							Next: next,
+							Last: last,
+							root: acc.Root,
+						})
+						log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last)
+						next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
 					}
+					res.mainTask.SubTasks[account] = tasks
+
+					// Since we've just created the sub-tasks, this response
+					// is surely for the first one (zero origin)
+					res.subTask = tasks[0]
 				}
-				// If we're in large contract delivery mode, forward the subtask
-				if res.subTask != nil {
-					// Ensure the response doesn't overflow into the subsequent task
-					last := res.subTask.Last.Big()
-					for k, hash := range res.hashes[i] {
-						if hash.Big().Cmp(last) > 0 {
-							// Chunk overflown, cut off excess, but also update the boundary
-							for l := k; l < len(res.hashes[i]); l++ {
-								if err := res.tries[i].Prove(res.hashes[i][l][:], 0, res.overflow); err != nil {
-									panic(err) // Account range was already proven, what happened
-								}
+			}
+			// If we're in large contract delivery mode, forward the subtask
+			if res.subTask != nil {
+				// Ensure the response doesn't overflow into the subsequent task
+				last := res.subTask.Last.Big()
+				for k, hash := range res.hashes[i] {
+					if hash.Big().Cmp(last) > 0 {
+						// Chunk overflown, cut off excess, but also update the boundary
+						for l := k; l < len(res.hashes[i]); l++ {
+							if err := res.tries[i].Prove(res.hashes[i][l][:], 0, res.overflow); err != nil {
+								panic(err) // Account range was already proven, what happened
 							}
-							res.hashes[i] = res.hashes[i][:k]
-							res.slots[i] = res.slots[i][:k]
-							res.cont = false // Mark range completed
-							break
 						}
-					}
-					// Forward the relevant storage chunk (even if created just now)
-					if res.cont {
-						res.subTask.Next = common.BigToHash(new(big.Int).Add(res.hashes[i][len(res.hashes[i])-1].Big(), big.NewInt(1)))
-					} else {
-						res.subTask.done = true
+						res.hashes[i] = res.hashes[i][:k]
+						res.slots[i] = res.slots[i][:k]
+						res.cont = false // Mark range completed
+						break
 					}
 				}
+				// Forward the relevant storage chunk (even if created just now)
+				if res.cont {
+					res.subTask.Next = common.BigToHash(new(big.Int).Add(res.hashes[i][len(res.hashes[i])-1].Big(), big.NewInt(1)))
+				} else {
+					res.subTask.done = true
+				}
 			}
 		}
 		// Iterate over all the reconstructed trie nodes and push them to disk
@@ -1941,7 +1978,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
 
 // OnAccounts is a callback method to invoke when a range of accounts are
 // received from a remote peer.
-func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
+func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
 	size := common.StorageSize(len(hashes) * common.HashLength)
 	for _, account := range accounts {
 		size += common.StorageSize(len(account))
@@ -1949,15 +1986,15 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account
 	for _, node := range proof {
 		size += common.StorageSize(len(node))
 	}
-	logger := peer.logger.New("reqid", id)
+	logger := peer.Log().New("reqid", id)
 	logger.Trace("Delivering range of accounts", "hashes", len(hashes), "accounts", len(accounts), "proofs", len(proof), "bytes", size)
 
 	// Whether or not the response is valid, we can mark the peer as idle and
 	// notify the scheduler to assign a new task. If the response is invalid,
 	// we'll drop the peer in a bit.
 	s.lock.Lock()
-	if _, ok := s.peers[peer.id]; ok {
-		s.accountIdlers[peer.id] = struct{}{}
+	if _, ok := s.peers[peer.ID()]; ok {
+		s.accountIdlers[peer.ID()] = struct{}{}
 	}
 	select {
 	case s.update <- struct{}{}:
@@ -1975,7 +2012,11 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account
 
 	// Clean up the request timeout timer, we'll see how to proceed further based
 	// on the actual delivered content
-	req.timeout.Stop()
+	if !req.timeout.Stop() {
+		// The timeout is already triggered, and this request will be reverted+rescheduled
+		s.lock.Unlock()
+		return nil
+	}
 
 	// Response is valid, but check if peer is signalling that it does not have
 	// the requested data. For account range queries that means the state being
@@ -1983,7 +2024,7 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account
 	// synced to our head.
 	if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 {
 		logger.Debug("Peer rejected account range request", "root", s.root)
-		s.statelessPeers[peer.id] = struct{}{}
+		s.statelessPeers[peer.ID()] = struct{}{}
 		s.lock.Unlock()
 
 		// Signal this request as failed, and ready for rescheduling
@@ -2011,6 +2052,8 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account
 	db, tr, notary, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
 	if err != nil {
 		logger.Warn("Account range failed proof", "err", err)
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertAccountRequest(req)
 		return err
 	}
 	// Partial trie reconstructed, send it to the scheduler for storage filling
@@ -2050,9 +2093,9 @@ func (s *Syncer) OnAccounts(peer *Peer, id uint64, hashes []common.Hash, account
 
 // OnByteCodes is a callback method to invoke when a batch of contract
 // bytes codes are received from a remote peer.
-func (s *Syncer) OnByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
+func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
 	s.lock.RLock()
-	syncing := len(s.tasks) > 0
+	syncing := !s.snapped
 	s.lock.RUnlock()
 
 	if syncing {
@@ -2063,20 +2106,20 @@ func (s *Syncer) OnByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
 
 // onByteCodes is a callback method to invoke when a batch of contract
 // bytes codes are received from a remote peer in the syncing phase.
-func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
+func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
 	var size common.StorageSize
 	for _, code := range bytecodes {
 		size += common.StorageSize(len(code))
 	}
-	logger := peer.logger.New("reqid", id)
+	logger := peer.Log().New("reqid", id)
 	logger.Trace("Delivering set of bytecodes", "bytecodes", len(bytecodes), "bytes", size)
 
 	// Whether or not the response is valid, we can mark the peer as idle and
 	// notify the scheduler to assign a new task. If the response is invalid,
 	// we'll drop the peer in a bit.
 	s.lock.Lock()
-	if _, ok := s.peers[peer.id]; ok {
-		s.bytecodeIdlers[peer.id] = struct{}{}
+	if _, ok := s.peers[peer.ID()]; ok {
+		s.bytecodeIdlers[peer.ID()] = struct{}{}
 	}
 	select {
 	case s.update <- struct{}{}:
@@ -2094,14 +2137,18 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
 
 	// Clean up the request timeout timer, we'll see how to proceed further based
 	// on the actual delivered content
-	req.timeout.Stop()
+	if !req.timeout.Stop() {
+		// The timeout is already triggered, and this request will be reverted+rescheduled
+		s.lock.Unlock()
+		return nil
+	}
 
 	// Response is valid, but check if peer is signalling that it does not have
 	// the requested data. For bytecode range queries that means the peer is not
 	// yet synced.
 	if len(bytecodes) == 0 {
 		logger.Debug("Peer rejected bytecode request")
-		s.statelessPeers[peer.id] = struct{}{}
+		s.statelessPeers[peer.ID()] = struct{}{}
 		s.lock.Unlock()
 
 		// Signal this request as failed, and ready for rescheduling
@@ -2132,6 +2179,8 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
 		}
 		// We've either ran out of hashes, or got unrequested data
 		logger.Warn("Unexpected bytecodes", "count", len(bytecodes)-i)
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertBytecodeRequest(req)
 		return errors.New("unexpected bytecode")
 	}
 	// Response validated, send it to the scheduler for filling
@@ -2150,7 +2199,7 @@ func (s *Syncer) onByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
 
 // OnStorage is a callback method to invoke when ranges of storage slots
 // are received from a remote peer.
-func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
+func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
 	// Gather some trace stats to aid in debugging issues
 	var (
 		hashCount int
@@ -2170,15 +2219,15 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
 	for _, node := range proof {
 		size += common.StorageSize(len(node))
 	}
-	logger := peer.logger.New("reqid", id)
+	logger := peer.Log().New("reqid", id)
 	logger.Trace("Delivering ranges of storage slots", "accounts", len(hashes), "hashes", hashCount, "slots", slotCount, "proofs", len(proof), "size", size)
 
 	// Whether or not the response is valid, we can mark the peer as idle and
 	// notify the scheduler to assign a new task. If the response is invalid,
 	// we'll drop the peer in a bit.
 	s.lock.Lock()
-	if _, ok := s.peers[peer.id]; ok {
-		s.storageIdlers[peer.id] = struct{}{}
+	if _, ok := s.peers[peer.ID()]; ok {
+		s.storageIdlers[peer.ID()] = struct{}{}
 	}
 	select {
 	case s.update <- struct{}{}:
@@ -2196,17 +2245,23 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
 
 	// Clean up the request timeout timer, we'll see how to proceed further based
 	// on the actual delivered content
-	req.timeout.Stop()
+	if !req.timeout.Stop() {
+		// The timeout is already triggered, and this request will be reverted+rescheduled
+		s.lock.Unlock()
+		return nil
+	}
 
 	// Reject the response if the hash sets and slot sets don't match, or if the
 	// peer sent more data than requested.
 	if len(hashes) != len(slots) {
 		s.lock.Unlock()
+		s.scheduleRevertStorageRequest(req) // reschedule request
 		logger.Warn("Hash and slot set size mismatch", "hashset", len(hashes), "slotset", len(slots))
 		return errors.New("hash and slot set size mismatch")
 	}
 	if len(hashes) > len(req.accounts) {
 		s.lock.Unlock()
+		s.scheduleRevertStorageRequest(req) // reschedule request
 		logger.Warn("Hash set larger than requested", "hashset", len(hashes), "requested", len(req.accounts))
 		return errors.New("hash set larger than requested")
 	}
@@ -2216,11 +2271,9 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
 	// synced to our head.
 	if len(hashes) == 0 {
 		logger.Debug("Peer rejected storage request")
-		s.statelessPeers[peer.id] = struct{}{}
+		s.statelessPeers[peer.ID()] = struct{}{}
 		s.lock.Unlock()
-
-		// Signal this request as failed, and ready for rescheduling
-		s.scheduleRevertStorageRequest(req)
+		s.scheduleRevertStorageRequest(req) // reschedule request
 		return nil
 	}
 	s.lock.Unlock()
@@ -2250,6 +2303,7 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
 			// space and hash to the origin root.
 			dbs[i], tries[i], _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
 			if err != nil {
+				s.scheduleRevertStorageRequest(req) // reschedule request
 				logger.Warn("Storage slots failed proof", "err", err)
 				return err
 			}
@@ -2264,6 +2318,7 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
 			}
 			dbs[i], tries[i], notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
 			if err != nil {
+				s.scheduleRevertStorageRequest(req) // reschedule request
 				logger.Warn("Storage range failed proof", "err", err)
 				return err
 			}
@@ -2302,20 +2357,20 @@ func (s *Syncer) OnStorage(peer *Peer, id uint64, hashes [][]common.Hash, slots
 
 // OnTrieNodes is a callback method to invoke when a batch of trie nodes
 // are received from a remote peer.
-func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
+func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
 	var size common.StorageSize
 	for _, node := range trienodes {
 		size += common.StorageSize(len(node))
 	}
-	logger := peer.logger.New("reqid", id)
+	logger := peer.Log().New("reqid", id)
 	logger.Trace("Delivering set of healing trienodes", "trienodes", len(trienodes), "bytes", size)
 
 	// Whether or not the response is valid, we can mark the peer as idle and
 	// notify the scheduler to assign a new task. If the response is invalid,
 	// we'll drop the peer in a bit.
 	s.lock.Lock()
-	if _, ok := s.peers[peer.id]; ok {
-		s.trienodeHealIdlers[peer.id] = struct{}{}
+	if _, ok := s.peers[peer.ID()]; ok {
+		s.trienodeHealIdlers[peer.ID()] = struct{}{}
 	}
 	select {
 	case s.update <- struct{}{}:
@@ -2333,14 +2388,18 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
 
 	// Clean up the request timeout timer, we'll see how to proceed further based
 	// on the actual delivered content
-	req.timeout.Stop()
+	if !req.timeout.Stop() {
+		// The timeout is already triggered, and this request will be reverted+rescheduled
+		s.lock.Unlock()
+		return nil
+	}
 
 	// Response is valid, but check if peer is signalling that it does not have
 	// the requested data. For bytecode range queries that means the peer is not
 	// yet synced.
 	if len(trienodes) == 0 {
 		logger.Debug("Peer rejected trienode heal request")
-		s.statelessPeers[peer.id] = struct{}{}
+		s.statelessPeers[peer.ID()] = struct{}{}
 		s.lock.Unlock()
 
 		// Signal this request as failed, and ready for rescheduling
@@ -2371,6 +2430,8 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
 		}
 		// We've either ran out of hashes, or got unrequested data
 		logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertTrienodeHealRequest(req)
 		return errors.New("unexpected healing trienode")
 	}
 	// Response validated, send it to the scheduler for filling
@@ -2390,20 +2451,20 @@ func (s *Syncer) OnTrieNodes(peer *Peer, id uint64, trienodes [][]byte) error {
 
 // onHealByteCodes is a callback method to invoke when a batch of contract
 // bytes codes are received from a remote peer in the healing phase.
-func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) error {
+func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
 	var size common.StorageSize
 	for _, code := range bytecodes {
 		size += common.StorageSize(len(code))
 	}
-	logger := peer.logger.New("reqid", id)
+	logger := peer.Log().New("reqid", id)
 	logger.Trace("Delivering set of healing bytecodes", "bytecodes", len(bytecodes), "bytes", size)
 
 	// Whether or not the response is valid, we can mark the peer as idle and
 	// notify the scheduler to assign a new task. If the response is invalid,
 	// we'll drop the peer in a bit.
 	s.lock.Lock()
-	if _, ok := s.peers[peer.id]; ok {
-		s.bytecodeHealIdlers[peer.id] = struct{}{}
+	if _, ok := s.peers[peer.ID()]; ok {
+		s.bytecodeHealIdlers[peer.ID()] = struct{}{}
 	}
 	select {
 	case s.update <- struct{}{}:
@@ -2421,14 +2482,18 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) erro
 
 	// Clean up the request timeout timer, we'll see how to proceed further based
 	// on the actual delivered content
-	req.timeout.Stop()
+	if !req.timeout.Stop() {
+		// The timeout is already triggered, and this request will be reverted+rescheduled
+		s.lock.Unlock()
+		return nil
+	}
 
 	// Response is valid, but check if peer is signalling that it does not have
 	// the requested data. For bytecode range queries that means the peer is not
 	// yet synced.
 	if len(bytecodes) == 0 {
 		logger.Debug("Peer rejected bytecode heal request")
-		s.statelessPeers[peer.id] = struct{}{}
+		s.statelessPeers[peer.ID()] = struct{}{}
 		s.lock.Unlock()
 
 		// Signal this request as failed, and ready for rescheduling
@@ -2459,6 +2524,8 @@ func (s *Syncer) onHealByteCodes(peer *Peer, id uint64, bytecodes [][]byte) erro
 		}
 		// We've either ran out of hashes, or got unrequested data
 		logger.Warn("Unexpected healing bytecodes", "count", len(bytecodes)-i)
+		// Signal this request as failed, and ready for rescheduling
+		s.scheduleRevertBytecodeHealRequest(req)
 		return errors.New("unexpected healing bytecode")
 	}
 	// Response validated, send it to the scheduler for filling
diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go
index 4f28b99bf..0b048786e 100644
--- a/eth/protocols/snap/sync_test.go
+++ b/eth/protocols/snap/sync_test.go
@@ -17,15 +17,29 @@
 package snap
 
 import (
+	"bytes"
 	"crypto/rand"
+	"encoding/binary"
 	"fmt"
+	"math/big"
+	"sort"
 	"testing"
+	"time"
 
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/light"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/ethereum/go-ethereum/trie"
 	"golang.org/x/crypto/sha3"
 )
 
 func TestHashing(t *testing.T) {
+	t.Parallel()
+
 	var bytecodes = make([][]byte, 10)
 	for i := 0; i < len(bytecodes); i++ {
 		buf := make([]byte, 100)
@@ -96,3 +110,1009 @@ func BenchmarkHashing(b *testing.B) {
 		}
 	})
 }
+
+type storageHandlerFunc func(t *testPeer, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max uint64) error
+type accountHandlerFunc func(t *testPeer, requestId uint64, root common.Hash, origin common.Hash, cap uint64) error
+type trieHandlerFunc func(t *testPeer, requestId uint64, root common.Hash, paths []TrieNodePathSet, cap uint64) error
+type codeHandlerFunc func(t *testPeer, id uint64, hashes []common.Hash, max uint64) error
+
+type testPeer struct {
+	id            string
+	test          *testing.T
+	remote        *Syncer
+	logger        log.Logger
+	accountTrie   *trie.Trie
+	accountValues entrySlice
+	storageTries  map[common.Hash]*trie.Trie
+	storageValues map[common.Hash]entrySlice
+
+	accountRequestHandler accountHandlerFunc
+	storageRequestHandler storageHandlerFunc
+	trieRequestHandler    trieHandlerFunc
+	codeRequestHandler    codeHandlerFunc
+	cancelCh              chan struct{}
+}
+
+func newTestPeer(id string, t *testing.T, cancelCh chan struct{}) *testPeer {
+	peer := &testPeer{
+		id:                    id,
+		test:                  t,
+		logger:                log.New("id", id),
+		accountRequestHandler: defaultAccountRequestHandler,
+		trieRequestHandler:    defaultTrieRequestHandler,
+		storageRequestHandler: defaultStorageRequestHandler,
+		codeRequestHandler:    defaultCodeRequestHandler,
+		cancelCh:              cancelCh,
+	}
+	//stderrHandler := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
+	//peer.logger.SetHandler(stderrHandler)
+	return peer
+
+}
+
+func (t *testPeer) ID() string      { return t.id }
+func (t *testPeer) Log() log.Logger { return t.logger }
+
+func (t *testPeer) RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error {
+	t.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes))
+	go t.accountRequestHandler(t, id, root, origin, bytes)
+	return nil
+}
+
+func (t *testPeer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error {
+	t.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes))
+	go t.trieRequestHandler(t, id, root, paths, bytes)
+	return nil
+}
+
+func (t *testPeer) RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error {
+	if len(accounts) == 1 && origin != nil {
+		t.logger.Trace("Fetching range of large storage slots", "reqid", id, "root", root, "account", accounts[0], "origin", common.BytesToHash(origin), "limit", common.BytesToHash(limit), "bytes", common.StorageSize(bytes))
+	} else {
+		t.logger.Trace("Fetching ranges of small storage slots", "reqid", id, "root", root, "accounts", len(accounts), "first", accounts[0], "bytes", common.StorageSize(bytes))
+	}
+	go t.storageRequestHandler(t, id, root, accounts, origin, limit, bytes)
+	return nil
+}
+
+func (t *testPeer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error {
+	t.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes))
+	go t.codeRequestHandler(t, id, hashes, bytes)
+	return nil
+}
+
+// defaultTrieRequestHandler is a well-behaving handler for trie healing requests
+func defaultTrieRequestHandler(t *testPeer, requestId uint64, root common.Hash, paths []TrieNodePathSet, cap uint64) error {
+	// Pass the response
+	var nodes [][]byte
+	for _, pathset := range paths {
+		switch len(pathset) {
+		case 1:
+			blob, _, err := t.accountTrie.TryGetNode(pathset[0])
+			if err != nil {
+				t.logger.Info("Error handling req", "error", err)
+				break
+			}
+			nodes = append(nodes, blob)
+		default:
+			account := t.storageTries[(common.BytesToHash(pathset[0]))]
+			for _, path := range pathset[1:] {
+				blob, _, err := account.TryGetNode(path)
+				if err != nil {
+					t.logger.Info("Error handling req", "error", err)
+					break
+				}
+				nodes = append(nodes, blob)
+			}
+		}
+	}
+	t.remote.OnTrieNodes(t, requestId, nodes)
+	return nil
+}
+
+// defaultAccountRequestHandler is a well-behaving handler for AccountRangeRequests
+func defaultAccountRequestHandler(t *testPeer, id uint64, root common.Hash, origin common.Hash, cap uint64) error {
+	keys, vals, proofs := createAccountRequestResponse(t, root, origin, cap)
+	if err := t.remote.OnAccounts(t, id, keys, vals, proofs); err != nil {
+		t.logger.Error("remote error on delivery", "error", err)
+		t.test.Errorf("Remote side rejected our delivery: %v", err)
+		t.remote.Unregister(t.id)
+		close(t.cancelCh)
+		return err
+	}
+	return nil
+}
+
+func createAccountRequestResponse(t *testPeer, root common.Hash, origin common.Hash, cap uint64) (keys []common.Hash, vals [][]byte, proofs [][]byte) {
+	var size uint64
+	for _, entry := range t.accountValues {
+		if size > cap {
+			break
+		}
+		if bytes.Compare(origin[:], entry.k) <= 0 {
+			keys = append(keys, common.BytesToHash(entry.k))
+			vals = append(vals, entry.v)
+			size += uint64(32 + len(entry.v))
+		}
+	}
+	// Unless we send the entire trie, we need to supply proofs
+	// Actually, we need to supply proofs either way! This seems tob be an implementation
+	// quirk in go-ethereum
+	proof := light.NewNodeSet()
+	if err := t.accountTrie.Prove(origin[:], 0, proof); err != nil {
+		t.logger.Error("Could not prove inexistence of origin", "origin", origin,
+			"error", err)
+	}
+	if len(keys) > 0 {
+		lastK := (keys[len(keys)-1])[:]
+		if err := t.accountTrie.Prove(lastK, 0, proof); err != nil {
+			t.logger.Error("Could not prove last item",
+				"error", err)
+		}
+	}
+	for _, blob := range proof.NodeList() {
+		proofs = append(proofs, blob)
+	}
+	return keys, vals, proofs
+}
+
+// defaultStorageRequestHandler is a well-behaving storage request handler
+func defaultStorageRequestHandler(t *testPeer, requestId uint64, root common.Hash, accounts []common.Hash, bOrigin, bLimit []byte, max uint64) error {
+	hashes, slots, proofs := createStorageRequestResponse(t, root, accounts, bOrigin, bLimit, max)
+	if err := t.remote.OnStorage(t, requestId, hashes, slots, proofs); err != nil {
+		t.logger.Error("remote error on delivery", "error", err)
+		t.test.Errorf("Remote side rejected our delivery: %v", err)
+		close(t.cancelCh)
+	}
+	return nil
+}
+
+func defaultCodeRequestHandler(t *testPeer, id uint64, hashes []common.Hash, max uint64) error {
+	var bytecodes [][]byte
+	for _, h := range hashes {
+		bytecodes = append(bytecodes, getCode(h))
+	}
+	if err := t.remote.OnByteCodes(t, id, bytecodes); err != nil {
+		t.logger.Error("remote error on delivery", "error", err)
+		t.test.Errorf("Remote side rejected our delivery: %v", err)
+		close(t.cancelCh)
+	}
+	return nil
+}
+
+func createStorageRequestResponse(t *testPeer, root common.Hash, accounts []common.Hash, bOrigin, bLimit []byte, max uint64) (hashes [][]common.Hash, slots [][][]byte, proofs [][]byte) {
+	var (
+		size  uint64
+		limit = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
+	)
+	if len(bLimit) > 0 {
+		limit = common.BytesToHash(bLimit)
+	}
+	var origin common.Hash
+	if len(bOrigin) > 0 {
+		origin = common.BytesToHash(bOrigin)
+	}
+
+	var limitExceeded bool
+	var incomplete bool
+	for _, account := range accounts {
+
+		var keys []common.Hash
+		var vals [][]byte
+		for _, entry := range t.storageValues[account] {
+			if limitExceeded {
+				incomplete = true
+				break
+			}
+			if bytes.Compare(entry.k, origin[:]) < 0 {
+				incomplete = true
+				continue
+			}
+			keys = append(keys, common.BytesToHash(entry.k))
+			vals = append(vals, entry.v)
+			size += uint64(32 + len(entry.v))
+			if bytes.Compare(entry.k, limit[:]) >= 0 {
+				limitExceeded = true
+			}
+			if size > max {
+				limitExceeded = true
+			}
+		}
+		hashes = append(hashes, keys)
+		slots = append(slots, vals)
+
+		if incomplete {
+			// If we're aborting, we need to prove the first and last item
+			// This terminates the response (and thus the loop)
+			proof := light.NewNodeSet()
+			stTrie := t.storageTries[account]
+
+			// Here's a potential gotcha: when constructing the proof, we cannot
+			// use the 'origin' slice directly, but must use the full 32-byte
+			// hash form.
+			if err := stTrie.Prove(origin[:], 0, proof); err != nil {
+				t.logger.Error("Could not prove inexistence of origin", "origin", origin,
+					"error", err)
+			}
+			if len(keys) > 0 {
+				lastK := (keys[len(keys)-1])[:]
+				if err := stTrie.Prove(lastK, 0, proof); err != nil {
+					t.logger.Error("Could not prove last item", "error", err)
+				}
+			}
+			for _, blob := range proof.NodeList() {
+				proofs = append(proofs, blob)
+			}
+			break
+		}
+	}
+	return hashes, slots, proofs
+}
+
+// emptyRequestAccountRangeFn is a rejects AccountRangeRequests
+func emptyRequestAccountRangeFn(t *testPeer, requestId uint64, root common.Hash, origin common.Hash, cap uint64) error {
+	var proofs [][]byte
+	var keys []common.Hash
+	var vals [][]byte
+	t.remote.OnAccounts(t, requestId, keys, vals, proofs)
+	return nil
+}
+
+func nonResponsiveRequestAccountRangeFn(t *testPeer, requestId uint64, root common.Hash, origin common.Hash, cap uint64) error {
+	return nil
+}
+
+func emptyTrieRequestHandler(t *testPeer, requestId uint64, root common.Hash, paths []TrieNodePathSet, cap uint64) error {
+	var nodes [][]byte
+	t.remote.OnTrieNodes(t, requestId, nodes)
+	return nil
+}
+
+func nonResponsiveTrieRequestHandler(t *testPeer, requestId uint64, root common.Hash, paths []TrieNodePathSet, cap uint64) error {
+	return nil
+}
+
+func emptyStorageRequestHandler(t *testPeer, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max uint64) error {
+	var hashes [][]common.Hash
+	var slots [][][]byte
+	var proofs [][]byte
+	t.remote.OnStorage(t, requestId, hashes, slots, proofs)
+	return nil
+}
+
+func nonResponsiveStorageRequestHandler(t *testPeer, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max uint64) error {
+	return nil
+}
+
+//func emptyCodeRequestHandler(t *testPeer, id uint64, hashes []common.Hash, max uint64) error {
+//	var bytecodes [][]byte
+//	t.remote.OnByteCodes(t, id, bytecodes)
+//	return nil
+//}
+
+func corruptCodeRequestHandler(t *testPeer, id uint64, hashes []common.Hash, max uint64) error {
+	var bytecodes [][]byte
+	for _, h := range hashes {
+		// Send back the hashes
+		bytecodes = append(bytecodes, h[:])
+	}
+	if err := t.remote.OnByteCodes(t, id, bytecodes); err != nil {
+		t.logger.Error("remote error on delivery", "error", err)
+		// Mimic the real-life handler, which drops a peer on errors
+		t.remote.Unregister(t.id)
+	}
+	return nil
+}
+
+func cappedCodeRequestHandler(t *testPeer, id uint64, hashes []common.Hash, max uint64) error {
+	var bytecodes [][]byte
+	for _, h := range hashes[:1] {
+		bytecodes = append(bytecodes, getCode(h))
+	}
+	if err := t.remote.OnByteCodes(t, id, bytecodes); err != nil {
+		t.logger.Error("remote error on delivery", "error", err)
+		// Mimic the real-life handler, which drops a peer on errors
+		t.remote.Unregister(t.id)
+	}
+	return nil
+}
+
+// starvingStorageRequestHandler is somewhat well-behaving storage handler, but it caps the returned results to be very small
+func starvingStorageRequestHandler(t *testPeer, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max uint64) error {
+	return defaultStorageRequestHandler(t, requestId, root, accounts, origin, limit, 500)
+}
+
+func starvingAccountRequestHandler(t *testPeer, requestId uint64, root common.Hash, origin common.Hash, cap uint64) error {
+	return defaultAccountRequestHandler(t, requestId, root, origin, 500)
+}
+
+//func misdeliveringAccountRequestHandler(t *testPeer, requestId uint64, root common.Hash, origin common.Hash, cap uint64) error {
+//	return defaultAccountRequestHandler(t, requestId-1, root, origin, 500)
+//}
+
+func corruptAccountRequestHandler(t *testPeer, requestId uint64, root common.Hash, origin common.Hash, cap uint64) error {
+	hashes, accounts, proofs := createAccountRequestResponse(t, root, origin, cap)
+	if len(proofs) > 0 {
+		proofs = proofs[1:]
+	}
+	if err := t.remote.OnAccounts(t, requestId, hashes, accounts, proofs); err != nil {
+		t.logger.Info("remote error on delivery (as expected)", "error", err)
+		// Mimic the real-life handler, which drops a peer on errors
+		t.remote.Unregister(t.id)
+	}
+	return nil
+}
+
+// corruptStorageRequestHandler doesn't provide good proofs
+func corruptStorageRequestHandler(t *testPeer, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max uint64) error {
+	hashes, slots, proofs := createStorageRequestResponse(t, root, accounts, origin, limit, max)
+	if len(proofs) > 0 {
+		proofs = proofs[1:]
+	}
+	if err := t.remote.OnStorage(t, requestId, hashes, slots, proofs); err != nil {
+		t.logger.Info("remote error on delivery (as expected)", "error", err)
+		// Mimic the real-life handler, which drops a peer on errors
+		t.remote.Unregister(t.id)
+	}
+	return nil
+}
+
+func noProofStorageRequestHandler(t *testPeer, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max uint64) error {
+	hashes, slots, _ := createStorageRequestResponse(t, root, accounts, origin, limit, max)
+	if err := t.remote.OnStorage(t, requestId, hashes, slots, nil); err != nil {
+		t.logger.Info("remote error on delivery (as expected)", "error", err)
+		// Mimic the real-life handler, which drops a peer on errors
+		t.remote.Unregister(t.id)
+	}
+	return nil
+}
+
+// TestSyncBloatedProof tests a scenario where we provide only _one_ value, but
+// also ship the entire trie inside the proof. If the attack is successful,
+// the remote side does not do any follow-up requests
+func TestSyncBloatedProof(t *testing.T) {
+	t.Parallel()
+
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(100)
+	cancel := make(chan struct{})
+	source := newTestPeer("source", t, cancel)
+	source.accountTrie = sourceAccountTrie
+	source.accountValues = elems
+
+	source.accountRequestHandler = func(t *testPeer, requestId uint64, root common.Hash, origin common.Hash, cap uint64) error {
+		var proofs [][]byte
+		var keys []common.Hash
+		var vals [][]byte
+
+		// The values
+		for _, entry := range t.accountValues {
+			if bytes.Compare(origin[:], entry.k) <= 0 {
+				keys = append(keys, common.BytesToHash(entry.k))
+				vals = append(vals, entry.v)
+			}
+		}
+		// The proofs
+		proof := light.NewNodeSet()
+		if err := t.accountTrie.Prove(origin[:], 0, proof); err != nil {
+			t.logger.Error("Could not prove origin", "origin", origin, "error", err)
+		}
+		// The bloat: add proof of every single element
+		for _, entry := range t.accountValues {
+			if err := t.accountTrie.Prove(entry.k, 0, proof); err != nil {
+				t.logger.Error("Could not prove item", "error", err)
+			}
+		}
+		// And remove one item from the elements
+		if len(keys) > 2 {
+			keys = append(keys[:1], keys[2:]...)
+			vals = append(vals[:1], vals[2:]...)
+		}
+		for _, blob := range proof.NodeList() {
+			proofs = append(proofs, blob)
+		}
+		if err := t.remote.OnAccounts(t, requestId, keys, vals, proofs); err != nil {
+			t.logger.Info("remote error on delivery", "error", err)
+			// This is actually correct, signal to exit the test successfully
+			close(t.cancelCh)
+		}
+		return nil
+	}
+	syncer := setupSyncer(source)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err == nil {
+		t.Fatal("No error returned from incomplete/cancelled sync")
+	}
+}
+
+func setupSyncer(peers ...*testPeer) *Syncer {
+	stateDb := rawdb.NewMemoryDatabase()
+	syncer := NewSyncer(stateDb, trie.NewSyncBloom(1, stateDb))
+	for _, peer := range peers {
+		syncer.Register(peer)
+		peer.remote = syncer
+	}
+	return syncer
+}
+
+// TestSync tests a basic sync with one peer
+func TestSync(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(100)
+
+	mkSource := func(name string) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		return source
+	}
+
+	syncer := setupSyncer(mkSource("sourceA"))
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+}
+
+// TestSyncTinyTriePanic tests a basic sync with one peer, and a tiny trie. This caused a
+// panic within the prover
+func TestSyncTinyTriePanic(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(1)
+
+	mkSource := func(name string) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("nice-a"),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+}
+
+// TestMultiSync tests a basic sync with multiple peers
+func TestMultiSync(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(100)
+
+	mkSource := func(name string) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		return source
+	}
+
+	syncer := setupSyncer(mkSource("sourceA"), mkSource("sourceB"))
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+}
+
+// TestSyncWithStorage tests  basic sync using accounts + storage + code
+func TestSyncWithStorage(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+	sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(3, 3000, true)
+
+	mkSource := func(name string) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.storageTries = storageTries
+		source.storageValues = storageElems
+		return source
+	}
+	syncer := setupSyncer(mkSource("sourceA"))
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+}
+
+// TestMultiSyncManyUseless contains one good peer, and many which doesn't return anything valuable at all
+func TestMultiSyncManyUseless(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(100, 3000, true)
+
+	mkSource := func(name string, a, b, c bool) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.storageTries = storageTries
+		source.storageValues = storageElems
+
+		if !a {
+			source.accountRequestHandler = emptyRequestAccountRangeFn
+		}
+		if !b {
+			source.storageRequestHandler = emptyStorageRequestHandler
+		}
+		if !c {
+			source.trieRequestHandler = emptyTrieRequestHandler
+		}
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("full", true, true, true),
+		mkSource("noAccounts", false, true, true),
+		mkSource("noStorage", true, false, true),
+		mkSource("noTrie", true, true, false),
+	)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+}
+
+// TestMultiSyncManyUseless contains one good peer, and many which doesn't return anything valuable at all
+func TestMultiSyncManyUselessWithLowTimeout(t *testing.T) {
+	// We're setting the timeout to very low, to increase the chance of the timeout
+	// being triggered. This was previously a cause of panic, when a response
+	// arrived simultaneously as a timeout was triggered.
+	defer func(old time.Duration) { requestTimeout = old }(requestTimeout)
+	requestTimeout = time.Millisecond
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(100, 3000, true)
+
+	mkSource := func(name string, a, b, c bool) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.storageTries = storageTries
+		source.storageValues = storageElems
+
+		if !a {
+			source.accountRequestHandler = emptyRequestAccountRangeFn
+		}
+		if !b {
+			source.storageRequestHandler = emptyStorageRequestHandler
+		}
+		if !c {
+			source.trieRequestHandler = emptyTrieRequestHandler
+		}
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("full", true, true, true),
+		mkSource("noAccounts", false, true, true),
+		mkSource("noStorage", true, false, true),
+		mkSource("noTrie", true, true, false),
+	)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+}
+
+// TestMultiSyncManyUnresponsive contains one good peer, and many which doesn't respond at all
+func TestMultiSyncManyUnresponsive(t *testing.T) {
+	// We're setting the timeout to very low, to make the test run a bit faster
+	defer func(old time.Duration) { requestTimeout = old }(requestTimeout)
+	requestTimeout = time.Millisecond
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(100, 3000, true)
+
+	mkSource := func(name string, a, b, c bool) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.storageTries = storageTries
+		source.storageValues = storageElems
+
+		if !a {
+			source.accountRequestHandler = nonResponsiveRequestAccountRangeFn
+		}
+		if !b {
+			source.storageRequestHandler = nonResponsiveStorageRequestHandler
+		}
+		if !c {
+			source.trieRequestHandler = nonResponsiveTrieRequestHandler
+		}
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("full", true, true, true),
+		mkSource("noAccounts", false, true, true),
+		mkSource("noStorage", true, false, true),
+		mkSource("noTrie", true, true, false),
+	)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+}
+
+func checkStall(t *testing.T, cancel chan struct{}) chan struct{} {
+	testDone := make(chan struct{})
+	go func() {
+		select {
+		case <-time.After(time.Minute): // TODO(karalabe): Make tests smaller, this is too much
+			t.Log("Sync stalled")
+			close(cancel)
+		case <-testDone:
+			return
+		}
+	}()
+	return testDone
+}
+
+// TestSyncNoStorageAndOneCappedPeer tests sync using accounts and no storage, where one peer is
+// consistently returning very small results
+func TestSyncNoStorageAndOneCappedPeer(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(3000)
+
+	mkSource := func(name string, slow bool) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+
+		if slow {
+			source.accountRequestHandler = starvingAccountRequestHandler
+		}
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("nice-a", false),
+		mkSource("nice-b", false),
+		mkSource("nice-c", false),
+		mkSource("capped", true),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+}
+
+// TestSyncNoStorageAndOneCodeCorruptPeer has one peer which doesn't deliver
+// code requests properly.
+func TestSyncNoStorageAndOneCodeCorruptPeer(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(3000)
+
+	mkSource := func(name string, codeFn codeHandlerFunc) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.codeRequestHandler = codeFn
+		return source
+	}
+	// One is capped, one is corrupt. If we don't use a capped one, there's a 50%
+	// chance that the full set of codes requested are sent only to the
+	// non-corrupt peer, which delivers everything in one go, and makes the
+	// test moot
+	syncer := setupSyncer(
+		mkSource("capped", cappedCodeRequestHandler),
+		mkSource("corrupt", corruptCodeRequestHandler),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+}
+
+func TestSyncNoStorageAndOneAccountCorruptPeer(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(3000)
+
+	mkSource := func(name string, accFn accountHandlerFunc) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.accountRequestHandler = accFn
+		return source
+	}
+	// One is capped, one is corrupt. If we don't use a capped one, there's a 50%
+	// chance that the full set of codes requested are sent only to the
+	// non-corrupt peer, which delivers everything in one go, and makes the
+	// test moot
+	syncer := setupSyncer(
+		mkSource("capped", defaultAccountRequestHandler),
+		mkSource("corrupt", corruptAccountRequestHandler),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+}
+
+// TestSyncNoStorageAndOneCodeCappedPeer has one peer which delivers code hashes
+// one by one
+func TestSyncNoStorageAndOneCodeCappedPeer(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems := makeAccountTrieNoStorage(3000)
+
+	mkSource := func(name string, codeFn codeHandlerFunc) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.codeRequestHandler = codeFn
+		return source
+	}
+	// Count how many times it's invoked. Remember, there are only 8 unique hashes,
+	// so it shouldn't be more than that
+	var counter int
+	syncer := setupSyncer(
+		mkSource("capped", func(t *testPeer, id uint64, hashes []common.Hash, max uint64) error {
+			counter++
+			return cappedCodeRequestHandler(t, id, hashes, max)
+		}),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+	// There are only 8 unique hashes, and 3K accounts. However, the code
+	// deduplication is per request batch. If it were a perfect global dedup,
+	// we would expect only 8 requests. If there were no dedup, there would be
+	// 3k requests.
+	// We expect somewhere below 100 requests for these 8 unique hashes.
+	if threshold := 100; counter > threshold {
+		t.Fatalf("Error, expected < %d invocations, got %d", threshold, counter)
+	}
+}
+
+// TestSyncWithStorageAndOneCappedPeer tests sync using accounts + storage, where one peer is
+// consistently returning very small results
+func TestSyncWithStorageAndOneCappedPeer(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(300, 1000, false)
+
+	mkSource := func(name string, slow bool) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.storageTries = storageTries
+		source.storageValues = storageElems
+
+		if slow {
+			source.storageRequestHandler = starvingStorageRequestHandler
+		}
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("nice-a", false),
+		mkSource("slow", true),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+}
+
+// TestSyncWithStorageAndCorruptPeer tests sync using accounts + storage, where one peer is
+// sometimes sending bad proofs
+func TestSyncWithStorageAndCorruptPeer(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(100, 3000, true)
+
+	mkSource := func(name string, handler storageHandlerFunc) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.storageTries = storageTries
+		source.storageValues = storageElems
+		source.storageRequestHandler = handler
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("nice-a", defaultStorageRequestHandler),
+		mkSource("nice-b", defaultStorageRequestHandler),
+		mkSource("nice-c", defaultStorageRequestHandler),
+		mkSource("corrupt", corruptStorageRequestHandler),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+}
+
+func TestSyncWithStorageAndNonProvingPeer(t *testing.T) {
+	t.Parallel()
+
+	cancel := make(chan struct{})
+
+	sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(100, 3000, true)
+
+	mkSource := func(name string, handler storageHandlerFunc) *testPeer {
+		source := newTestPeer(name, t, cancel)
+		source.accountTrie = sourceAccountTrie
+		source.accountValues = elems
+		source.storageTries = storageTries
+		source.storageValues = storageElems
+		source.storageRequestHandler = handler
+		return source
+	}
+
+	syncer := setupSyncer(
+		mkSource("nice-a", defaultStorageRequestHandler),
+		mkSource("nice-b", defaultStorageRequestHandler),
+		mkSource("nice-c", defaultStorageRequestHandler),
+		mkSource("corrupt", noProofStorageRequestHandler),
+	)
+	done := checkStall(t, cancel)
+	if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+		t.Fatalf("sync failed: %v", err)
+	}
+	close(done)
+}
+
+type kv struct {
+	k, v []byte
+	t    bool
+}
+
+// Some helpers for sorting
+type entrySlice []*kv
+
+func (p entrySlice) Len() int           { return len(p) }
+func (p entrySlice) Less(i, j int) bool { return bytes.Compare(p[i].k, p[j].k) < 0 }
+func (p entrySlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+func key32(i uint64) []byte {
+	key := make([]byte, 32)
+	binary.LittleEndian.PutUint64(key, i)
+	return key
+}
+
+var (
+	codehashes = []common.Hash{
+		crypto.Keccak256Hash([]byte{0}),
+		crypto.Keccak256Hash([]byte{1}),
+		crypto.Keccak256Hash([]byte{2}),
+		crypto.Keccak256Hash([]byte{3}),
+		crypto.Keccak256Hash([]byte{4}),
+		crypto.Keccak256Hash([]byte{5}),
+		crypto.Keccak256Hash([]byte{6}),
+		crypto.Keccak256Hash([]byte{7}),
+	}
+)
+
+// getACodeHash returns a pseudo-random code hash
+func getACodeHash(i uint64) []byte {
+	h := codehashes[int(i)%len(codehashes)]
+	return common.CopyBytes(h[:])
+}
+
+// convenience function to lookup the code from the code hash
+func getCode(hash common.Hash) []byte {
+	if hash == emptyCode {
+		return nil
+	}
+	for i, h := range codehashes {
+		if h == hash {
+			return []byte{byte(i)}
+		}
+	}
+	return nil
+}
+
+// makeAccountTrieNoStorage spits out a trie, along with the leafs
+func makeAccountTrieNoStorage(n int) (*trie.Trie, entrySlice) {
+	db := trie.NewDatabase(rawdb.NewMemoryDatabase())
+	accTrie, _ := trie.New(common.Hash{}, db)
+	var entries entrySlice
+	for i := uint64(1); i <= uint64(n); i++ {
+		value, _ := rlp.EncodeToBytes(state.Account{
+			Nonce:    i,
+			Balance:  big.NewInt(int64(i)),
+			Root:     emptyRoot,
+			CodeHash: getACodeHash(i),
+		})
+		key := key32(i)
+		elem := &kv{key, value, false}
+		accTrie.Update(elem.k, elem.v)
+		entries = append(entries, elem)
+	}
+	sort.Sort(entries)
+	// Push to disk layer
+	accTrie.Commit(nil)
+	return accTrie, entries
+}
+
+// makeAccountTrieWithStorage spits out a trie, along with the leafs
+func makeAccountTrieWithStorage(accounts, slots int, code bool) (*trie.Trie, entrySlice,
+	map[common.Hash]*trie.Trie, map[common.Hash]entrySlice) {
+
+	var (
+		db             = trie.NewDatabase(rawdb.NewMemoryDatabase())
+		accTrie, _     = trie.New(common.Hash{}, db)
+		entries        entrySlice
+		storageTries   = make(map[common.Hash]*trie.Trie)
+		storageEntries = make(map[common.Hash]entrySlice)
+	)
+
+	// Make a storage trie which we reuse for the whole lot
+	stTrie, stEntries := makeStorageTrie(slots, db)
+	stRoot := stTrie.Hash()
+	// Create n accounts in the trie
+	for i := uint64(1); i <= uint64(accounts); i++ {
+		key := key32(i)
+		codehash := emptyCode[:]
+		if code {
+			codehash = getACodeHash(i)
+		}
+		value, _ := rlp.EncodeToBytes(state.Account{
+			Nonce:    i,
+			Balance:  big.NewInt(int64(i)),
+			Root:     stRoot,
+			CodeHash: codehash,
+		})
+		elem := &kv{key, value, false}
+		accTrie.Update(elem.k, elem.v)
+		entries = append(entries, elem)
+		// we reuse the same one for all accounts
+		storageTries[common.BytesToHash(key)] = stTrie
+		storageEntries[common.BytesToHash(key)] = stEntries
+	}
+	sort.Sort(entries)
+	stTrie.Commit(nil)
+	accTrie.Commit(nil)
+	return accTrie, entries, storageTries, storageEntries
+}
+
+// makeStorageTrie fills a storage trie with n items, returning the
+// not-yet-committed trie and the sorted entries
+func makeStorageTrie(n int, db *trie.Database) (*trie.Trie, entrySlice) {
+	trie, _ := trie.New(common.Hash{}, db)
+	var entries entrySlice
+	for i := uint64(1); i <= uint64(n); i++ {
+		// store 'i' at slot 'i'
+		slotValue := key32(i)
+		rlpSlotValue, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(slotValue[:]))
+
+		slotKey := key32(i)
+		key := crypto.Keccak256Hash(slotKey[:])
+
+		elem := &kv{key[:], rlpSlotValue, false}
+		trie.Update(elem.k, elem.v)
+		entries = append(entries, elem)
+	}
+	sort.Sort(entries)
+	return trie, entries
+}
-- 
GitLab