From 10b3f97c9dcc6f3711aa2d3b1bb43e67eb921223 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Mon, 11 Sep 2017 13:13:05 +0300
Subject: [PATCH] core: only fire one chain head per batch (#15123)

* core: only fire one chain head per batch

* miner: announce chan events synchronously
---
 core/blockchain.go      | 53 +++++++++++++++++------------
 core/blockchain_test.go |  2 +-
 miner/worker.go         | 75 +++++++++++++++++------------------------
 3 files changed, 63 insertions(+), 67 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 25be8d762..235c1f3da 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -854,9 +854,22 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R
 	return status, nil
 }
 
-// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. If an error is returned
-// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
+// InsertChain attempts to insert the given batch of blocks in to the canonical
+// chain or, otherwise, create a fork. If an error is returned it will return
+// the index number of the failing block as well an error describing what went
+// wrong.
+//
+// After insertion is done, all accumulated events will be fired.
 func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
+	n, events, logs, err := bc.insertChain(chain)
+	bc.PostChainEvents(events, logs)
+	return n, err
+}
+
+// insertChain will execute the actual chain insertion and event aggregation. The
+// only reason this method exists as a separate one is to make locking cleaner
+// with deferred statements.
+func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
 	// Do a sanity check that the provided chain is actually ordered and linked
 	for i := 1; i < len(chain); i++ {
 		if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
@@ -864,7 +877,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 			log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
 				"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
 
-			return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
+			return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
 				chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
 		}
 	}
@@ -881,6 +894,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 	var (
 		stats         = insertStats{startTime: mclock.Now()}
 		events        = make([]interface{}, 0, len(chain))
+		lastCanon     *types.Block
 		coalescedLogs []*types.Log
 	)
 	// Start the parallel header verifier
@@ -904,7 +918,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 		// If the header is a banned one, straight out abort
 		if BadHashes[block.Hash()] {
 			bc.reportBlock(block, nil, ErrBlacklistedHash)
-			return i, ErrBlacklistedHash
+			return i, events, coalescedLogs, ErrBlacklistedHash
 		}
 		// Wait for the block's verification to complete
 		bstart := time.Now()
@@ -925,7 +939,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 				// if given.
 				max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
 				if block.Time().Cmp(max) > 0 {
-					return i, fmt.Errorf("future block: %v > %v", block.Time(), max)
+					return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
 				}
 				bc.futureBlocks.Add(block.Hash(), block)
 				stats.queued++
@@ -939,7 +953,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 			}
 
 			bc.reportBlock(block, nil, err)
-			return i, err
+			return i, events, coalescedLogs, err
 		}
 		// Create a new statedb using the parent block and report an
 		// error if it fails.
@@ -951,40 +965,35 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 		}
 		state, err := state.New(parent.Root(), bc.stateCache)
 		if err != nil {
-			return i, err
+			return i, events, coalescedLogs, err
 		}
 		// Process block using the parent state as reference point.
 		receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
 		if err != nil {
 			bc.reportBlock(block, receipts, err)
-			return i, err
+			return i, events, coalescedLogs, err
 		}
 		// Validate the state using the default validator
 		err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
 		if err != nil {
 			bc.reportBlock(block, receipts, err)
-			return i, err
+			return i, events, coalescedLogs, err
 		}
-
 		// Write the block to the chain and get the status.
 		status, err := bc.WriteBlockAndState(block, receipts, state)
 		if err != nil {
-			return i, err
+			return i, events, coalescedLogs, err
 		}
 		switch status {
 		case CanonStatTy:
 			log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
 				"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
-			// coalesce logs for later processing
+
 			coalescedLogs = append(coalescedLogs, logs...)
 			blockInsertTimer.UpdateSince(bstart)
 			events = append(events, ChainEvent{block, block.Hash(), logs})
-			// We need some control over the mining operation. Acquiring locks and waiting
-			// for the miner to create new block takes too long and in most cases isn't
-			// even necessary.
-			if bc.LastBlockHash() == block.Hash() {
-				events = append(events, ChainHeadEvent{block})
-			}
+			lastCanon = block
+
 		case SideStatTy:
 			log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
 				common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
@@ -996,9 +1005,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 		stats.usedGas += usedGas.Uint64()
 		stats.report(chain, i)
 	}
-	go bc.PostChainEvents(events, coalescedLogs)
-
-	return 0, nil
+	// Append a single chain head event if we've progressed the chain
+	if lastCanon != nil && bc.LastBlockHash() == lastCanon.Hash() {
+		events = append(events, ChainHeadEvent{lastCanon})
+	}
+	return 0, events, coalescedLogs, nil
 }
 
 // insertStats tracks and reports on block insertion.
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 470974a0a..700a30a16 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -935,7 +935,7 @@ func TestReorgSideEvent(t *testing.T) {
 		}
 		gen.AddTx(tx)
 	})
-	chainSideCh := make(chan ChainSideEvent)
+	chainSideCh := make(chan ChainSideEvent, 64)
 	blockchain.SubscribeChainSideEvent(chainSideCh)
 	if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
 		t.Fatalf("failed to insert chain: %v", err)
diff --git a/miner/worker.go b/miner/worker.go
index b48db2a30..bf24970f5 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -125,8 +125,6 @@ type worker struct {
 	// atomic status counters
 	mining int32
 	atWork int32
-
-	fullValidation bool
 }
 
 func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
@@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
 		coinbase:       coinbase,
 		agents:         make(map[Agent]struct{}),
 		unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
-		fullValidation: false,
 	}
 	// Subscribe TxPreEvent for tx pool
 	worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
@@ -297,50 +294,38 @@ func (self *worker) wait() {
 			block := result.Block
 			work := result.Work
 
-			if self.fullValidation {
-				if _, err := self.chain.InsertChain(types.Blocks{block}); err != nil {
-					log.Error("Mined invalid block", "err", err)
-					continue
-				}
-				go self.mux.Post(core.NewMinedBlockEvent{Block: block})
-			} else {
-				// Update the block hash in all logs since it is now available and not when the
-				// receipt/log of individual transactions were created.
-				for _, r := range work.receipts {
-					for _, l := range r.Logs {
-						l.BlockHash = block.Hash()
-					}
-				}
-				for _, log := range work.state.Logs() {
-					log.BlockHash = block.Hash()
-				}
-				stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
-				if err != nil {
-					log.Error("Failed writing block to chain", "err", err)
-					continue
-				}
-
-				// check if canon block and write transactions
-				if stat == core.CanonStatTy {
-					// implicit by posting ChainHeadEvent
-					mustCommitNewWork = false
+			// Update the block hash in all logs since it is now available and not when the
+			// receipt/log of individual transactions were created.
+			for _, r := range work.receipts {
+				for _, l := range r.Logs {
+					l.BlockHash = block.Hash()
 				}
-				// broadcast before waiting for validation
-				go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) {
-					self.mux.Post(core.NewMinedBlockEvent{Block: block})
-					var (
-						events        []interface{}
-						coalescedLogs []*types.Log
-					)
-					events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
-					if stat == core.CanonStatTy {
-						events = append(events, core.ChainHeadEvent{Block: block})
-						coalescedLogs = logs
-					}
-					// post blockchain events
-					self.chain.PostChainEvents(events, coalescedLogs)
-				}(block, work.state.Logs(), work.receipts)
 			}
+			for _, log := range work.state.Logs() {
+				log.BlockHash = block.Hash()
+			}
+			stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
+			if err != nil {
+				log.Error("Failed writing block to chain", "err", err)
+				continue
+			}
+			// check if canon block and write transactions
+			if stat == core.CanonStatTy {
+				// implicit by posting ChainHeadEvent
+				mustCommitNewWork = false
+			}
+			// Broadcast the block and announce chain insertion event
+			self.mux.Post(core.NewMinedBlockEvent{Block: block})
+			var (
+				events []interface{}
+				logs   = work.state.Logs()
+			)
+			events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
+			if stat == core.CanonStatTy {
+				events = append(events, core.ChainHeadEvent{Block: block})
+			}
+			self.chain.PostChainEvents(events, logs)
+
 			// Insert the block into the set of pending ones to wait for confirmations
 			self.unconfirmed.Insert(block.NumberU64(), block.Hash())
 
-- 
GitLab