From b27589517a5bf7f88944603b201fa1f7c0b33abf Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@twurst.com>
Date: Wed, 12 Apr 2017 18:48:49 +0200
Subject: [PATCH] consensus/ethash: simplify concurrency in VerifyHeaders

This change removes a convoluted use of sync/atomic from VerifyHeaders.
It also fixes the annoying error about future blocks.
---
 consensus/ethash/consensus.go | 129 +++++++++++++---------------------
 1 file changed, 48 insertions(+), 81 deletions(-)

diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go
index 4b6e779d5..4f1ab8702 100644
--- a/consensus/ethash/consensus.go
+++ b/consensus/ethash/consensus.go
@@ -22,7 +22,6 @@ import (
 	"fmt"
 	"math/big"
 	"runtime"
-	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -46,7 +45,6 @@ var (
 // codebase, inherently breaking if the engine is swapped out. Please put common
 // error types into the consensus package.
 var (
-	errInvalidChain      = errors.New("invalid header chain")
 	errLargeBlockTime    = errors.New("timestamp too big")
 	errZeroBlockTime     = errors.New("timestamp equals parent's")
 	errTooManyUncles     = errors.New("too many uncles")
@@ -90,111 +88,80 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He
 // a results channel to retrieve the async verifications.
 func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
 	// If we're running a full engine faking, accept any input as valid
-	if ethash.fakeFull {
+	if ethash.fakeFull || len(headers) == 0 {
 		abort, results := make(chan struct{}), make(chan error, len(headers))
 		for i := 0; i < len(headers); i++ {
 			results <- nil
 		}
 		return abort, results
 	}
+
 	// Spawn as many workers as allowed threads
 	workers := runtime.GOMAXPROCS(0)
 	if len(headers) < workers {
 		workers = len(headers)
 	}
-	// Create a task channel and spawn the verifiers
-	type result struct {
-		index int
-		err   error
-	}
-	inputs := make(chan int, workers)
-	outputs := make(chan result, len(headers))
 
-	var badblock uint64
+	// Create a task channel and spawn the verifiers
+	var (
+		inputs = make(chan int)
+		done   = make(chan int, workers)
+		errors = make([]error, len(headers))
+		abort  = make(chan struct{})
+	)
 	for i := 0; i < workers; i++ {
 		go func() {
 			for index := range inputs {
-				// If we've found a bad block already before this, stop validating
-				if bad := atomic.LoadUint64(&badblock); bad != 0 && bad <= headers[index].Number.Uint64() {
-					outputs <- result{index: index, err: errInvalidChain}
-					continue
-				}
-				// We need to look up the first parent
-				var parent *types.Header
-				if index == 0 {
-					parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
-				} else if headers[index-1].Hash() == headers[index].ParentHash {
-					parent = headers[index-1]
-				}
-				// Ensure the validation is useful and execute it
-				var failure error
-				switch {
-				case chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()-1) != nil:
-					outputs <- result{index: index, err: nil}
-				case parent == nil:
-					failure = consensus.ErrUnknownAncestor
-					outputs <- result{index: index, err: failure}
-				default:
-					failure = ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
-					outputs <- result{index: index, err: failure}
-				}
-				// If a validation failure occurred, mark subsequent blocks invalid
-				if failure != nil {
-					number := headers[index].Number.Uint64()
-					if prev := atomic.LoadUint64(&badblock); prev == 0 || prev > number {
-						// This two step atomic op isn't thread-safe in that `badblock` might end
-						// up slightly higher than the block number of the first failure (if many
-						// workers try to write at the same time), but it's fine as we're mostly
-						// interested to avoid large useless work, we don't care about 1-2 extra
-						// runs. Doing "full thread safety" would involve mutexes, which would be
-						// a noticeable sync overhead on the fast spinning worker routines.
-						atomic.StoreUint64(&badblock, number)
-					}
-				}
+				errors[index] = ethash.verifyHeaderWorker(chain, headers, seals, index)
+				done <- index
 			}
 		}()
 	}
-	// Feed item indices to the workers until done, sorting and feeding the results to the caller
-	dones := make([]bool, len(headers))
-	errors := make([]error, len(headers))
-
-	abort := make(chan struct{})
-	returns := make(chan error, len(headers))
 
+	errorsOut := make(chan error, len(headers))
 	go func() {
 		defer close(inputs)
-
-		input, output := 0, 0
-		for i := 0; i < len(headers)*2; i++ {
-			var res result
-
-			// If there are tasks left, push to workers
-			if input < len(headers) {
-				select {
-				case inputs <- input:
-					input++
-					continue
-				case <-abort:
-					return
-				case res = <-outputs:
+		var (
+			in, out = 0, 0
+			checked = make([]bool, len(headers))
+			inputs  = inputs
+		)
+		for {
+			select {
+			case inputs <- in:
+				if in++; in == len(headers) {
+					// Reached end of headers. Stop sending to workers.
+					inputs = nil
 				}
-			} else {
-				// Otherwise keep waiting for results
-				select {
-				case <-abort:
-					return
-				case res = <-outputs:
+			case index := <-done:
+				for checked[index] = true; checked[out]; out++ {
+					errorsOut <- errors[out]
+					if out == len(headers)-1 {
+						return
+					}
 				}
-			}
-			// A result arrived, save and propagate if next
-			dones[res.index], errors[res.index] = true, res.err
-			for output < len(headers) && dones[output] {
-				returns <- errors[output]
-				output++
+			case <-abort:
+				return
 			}
 		}
 	}()
-	return abort, returns
+	return abort, errorsOut
+}
+
+func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []*types.Header, seals []bool, index int) error {
+	var parent *types.Header
+	if index == 0 {
+		parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
+	} else if headers[index-1].Hash() == headers[index].ParentHash {
+		parent = headers[index-1]
+	}
+	if parent == nil {
+		return consensus.ErrUnknownAncestor
+	}
+	if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
+		return nil // known block
+	}
+	return ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
 }
 
 // VerifyUncles verifies that the given block's uncles conform to the consensus
-- 
GitLab