From fc7e0fe6c75f5c0015935eb49bde677a53be52be Mon Sep 17 00:00:00 2001
From: gary rong <garyrong0905@gmail.com>
Date: Fri, 29 Nov 2019 21:22:08 +0800
Subject: [PATCH] core, miner: remove PostChainEvents (#19396)

This change:

- removes the PostChainEvents method on core.BlockChain.
- sorts 'removed log' events by block number.
- fire the NewChainHead event if we inject a canonical block into the chain
  even if the entire insertion is not successful.
- guarantees correct event ordering in all cases.
---
 core/blockchain.go      | 179 ++++++++++++++++++++--------------------
 core/blockchain_test.go | 107 ++++++++++++------------
 miner/worker.go         |  20 ++---
 miner/worker_test.go    |  12 ++-
 4 files changed, 160 insertions(+), 158 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 73b6ee65d..ebab7a3ab 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1260,16 +1260,16 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
 }
 
 // WriteBlockWithState writes the block and all associated state to the database.
-func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
+func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
 	bc.chainmu.Lock()
 	defer bc.chainmu.Unlock()
 
-	return bc.writeBlockWithState(block, receipts, state)
+	return bc.writeBlockWithState(block, receipts, logs, state, emitHeadEvent)
 }
 
 // writeBlockWithState writes the block and all associated state to the database,
 // but is expects the chain mutex to be held.
-func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
+func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
 	bc.wg.Add(1)
 	defer bc.wg.Done()
 
@@ -1394,6 +1394,23 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 		bc.insert(block)
 	}
 	bc.futureBlocks.Remove(block.Hash())
+
+	if status == CanonStatTy {
+		bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
+		if len(logs) > 0 {
+			bc.logsFeed.Send(logs)
+		}
+		// In theory we should fire a ChainHeadEvent when we inject
+		// a canonical block, but sometimes we can insert a batch of
+		// canonicial blocks. Avoid firing too much ChainHeadEvents,
+		// we will fire an accumulated ChainHeadEvent and disable fire
+		// event here.
+		if emitHeadEvent {
+			bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
+		}
+	} else {
+		bc.chainSideFeed.Send(ChainSideEvent{Block: block})
+	}
 	return status, nil
 }
 
@@ -1444,11 +1461,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 	// Pre-checks passed, start the full block imports
 	bc.wg.Add(1)
 	bc.chainmu.Lock()
-	n, events, logs, err := bc.insertChain(chain, true)
+	n, err := bc.insertChain(chain, true)
 	bc.chainmu.Unlock()
 	bc.wg.Done()
 
-	bc.PostChainEvents(events, logs)
 	return n, err
 }
 
@@ -1460,23 +1476,24 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 // racey behaviour. If a sidechain import is in progress, and the historic state
 // is imported, but then new canon-head is added before the actual sidechain
 // completes, then the historic state could be pruned again
-func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
+func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error) {
 	// If the chain is terminating, don't even bother starting up
 	if atomic.LoadInt32(&bc.procInterrupt) == 1 {
-		return 0, nil, nil, nil
+		return 0, nil
 	}
 	// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
 	senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
 
-	// A queued approach to delivering events. This is generally
-	// faster than direct delivery and requires much less mutex
-	// acquiring.
 	var (
-		stats         = insertStats{startTime: mclock.Now()}
-		events        = make([]interface{}, 0, len(chain))
-		lastCanon     *types.Block
-		coalescedLogs []*types.Log
+		stats     = insertStats{startTime: mclock.Now()}
+		lastCanon *types.Block
 	)
+	// Fire a single chain head event if we've progressed the chain
+	defer func() {
+		if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
+			bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})
+		}
+	}()
 	// Start the parallel header verifier
 	headers := make([]*types.Header, len(chain))
 	seals := make([]bool, len(chain))
@@ -1526,7 +1543,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		for block != nil && err == ErrKnownBlock {
 			log.Debug("Writing previously known block", "number", block.Number(), "hash", block.Hash())
 			if err := bc.writeKnownBlock(block); err != nil {
-				return it.index, nil, nil, err
+				return it.index, err
 			}
 			lastCanon = block
 
@@ -1545,7 +1562,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
 			log.Debug("Future block, postponing import", "number", block.Number(), "hash", block.Hash())
 			if err := bc.addFutureBlock(block); err != nil {
-				return it.index, events, coalescedLogs, err
+				return it.index, err
 			}
 			block, err = it.next()
 		}
@@ -1553,14 +1570,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		stats.ignored += it.remaining()
 
 		// If there are any still remaining, mark as ignored
-		return it.index, events, coalescedLogs, err
+		return it.index, err
 
 	// Some other error occurred, abort
 	case err != nil:
 		bc.futureBlocks.Remove(block.Hash())
 		stats.ignored += len(it.chain)
 		bc.reportBlock(block, nil, err)
-		return it.index, events, coalescedLogs, err
+		return it.index, err
 	}
 	// No validation errors for the first block (or chain prefix skipped)
 	for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
@@ -1572,7 +1589,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		// If the header is a banned one, straight out abort
 		if BadHashes[block.Hash()] {
 			bc.reportBlock(block, nil, ErrBlacklistedHash)
-			return it.index, events, coalescedLogs, ErrBlacklistedHash
+			return it.index, ErrBlacklistedHash
 		}
 		// If the block is known (in the middle of the chain), it's a special case for
 		// Clique blocks where they can share state among each other, so importing an
@@ -1589,15 +1606,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 				"root", block.Root())
 
 			if err := bc.writeKnownBlock(block); err != nil {
-				return it.index, nil, nil, err
+				return it.index, err
 			}
 			stats.processed++
 
 			// We can assume that logs are empty here, since the only way for consecutive
 			// Clique blocks to have the same state is if there are no transactions.
-			events = append(events, ChainEvent{block, block.Hash(), nil})
 			lastCanon = block
-
 			continue
 		}
 		// Retrieve the parent block and it's state to execute on top
@@ -1609,7 +1624,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		}
 		statedb, err := state.New(parent.Root, bc.stateCache)
 		if err != nil {
-			return it.index, events, coalescedLogs, err
+			return it.index, err
 		}
 		// If we have a followup block, run that against the current state to pre-cache
 		// transactions and probabilistically some of the account/storage trie nodes.
@@ -1634,7 +1649,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		if err != nil {
 			bc.reportBlock(block, receipts, err)
 			atomic.StoreUint32(&followupInterrupt, 1)
-			return it.index, events, coalescedLogs, err
+			return it.index, err
 		}
 		// Update the metrics touched during block processing
 		accountReadTimer.Update(statedb.AccountReads)     // Account reads are complete, we can mark them
@@ -1653,7 +1668,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
 			bc.reportBlock(block, receipts, err)
 			atomic.StoreUint32(&followupInterrupt, 1)
-			return it.index, events, coalescedLogs, err
+			return it.index, err
 		}
 		proctime := time.Since(start)
 
@@ -1665,10 +1680,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 
 		// Write the block to the chain and get the status.
 		substart = time.Now()
-		status, err := bc.writeBlockWithState(block, receipts, statedb)
+		status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
 		if err != nil {
 			atomic.StoreUint32(&followupInterrupt, 1)
-			return it.index, events, coalescedLogs, err
+			return it.index, err
 		}
 		atomic.StoreUint32(&followupInterrupt, 1)
 
@@ -1686,8 +1701,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 				"elapsed", common.PrettyDuration(time.Since(start)),
 				"root", block.Root())
 
-			coalescedLogs = append(coalescedLogs, logs...)
-			events = append(events, ChainEvent{block, block.Hash(), logs})
 			lastCanon = block
 
 			// Only count canonical blocks for GC processing time
@@ -1698,7 +1711,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 				"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
 				"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
 				"root", block.Root())
-			events = append(events, ChainSideEvent{block})
 
 		default:
 			// This in theory is impossible, but lets be nice to our future selves and leave
@@ -1717,24 +1729,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 	// Any blocks remaining here? The only ones we care about are the future ones
 	if block != nil && err == consensus.ErrFutureBlock {
 		if err := bc.addFutureBlock(block); err != nil {
-			return it.index, events, coalescedLogs, err
+			return it.index, err
 		}
 		block, err = it.next()
 
 		for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
 			if err := bc.addFutureBlock(block); err != nil {
-				return it.index, events, coalescedLogs, err
+				return it.index, err
 			}
 			stats.queued++
 		}
 	}
 	stats.ignored += it.remaining()
 
-	// Append a single chain head event if we've progressed the chain
-	if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
-		events = append(events, ChainHeadEvent{lastCanon})
-	}
-	return it.index, events, coalescedLogs, err
+	return it.index, err
 }
 
 // insertSideChain is called when an import batch hits upon a pruned ancestor
@@ -1743,7 +1751,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 //
 // The method writes all (header-and-body-valid) blocks to disk, then tries to
 // switch over to the new chain if the TD exceeded the current chain.
-func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
+func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, error) {
 	var (
 		externTd *big.Int
 		current  = bc.CurrentBlock()
@@ -1779,7 +1787,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
 				// If someone legitimately side-mines blocks, they would still be imported as usual. However,
 				// we cannot risk writing unverified blocks to disk when they obviously target the pruning
 				// mechanism.
-				return it.index, nil, nil, errors.New("sidechain ghost-state attack")
+				return it.index, errors.New("sidechain ghost-state attack")
 			}
 		}
 		if externTd == nil {
@@ -1790,7 +1798,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
 		if !bc.HasBlock(block.Hash(), block.NumberU64()) {
 			start := time.Now()
 			if err := bc.writeBlockWithoutState(block, externTd); err != nil {
-				return it.index, nil, nil, err
+				return it.index, err
 			}
 			log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
 				"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
@@ -1807,7 +1815,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
 	localTd := bc.GetTd(current.Hash(), current.NumberU64())
 	if localTd.Cmp(externTd) > 0 {
 		log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().Number, "sidetd", externTd, "localtd", localTd)
-		return it.index, nil, nil, err
+		return it.index, err
 	}
 	// Gather all the sidechain hashes (full blocks may be memory heavy)
 	var (
@@ -1822,7 +1830,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
 		parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1)
 	}
 	if parent == nil {
-		return it.index, nil, nil, errors.New("missing parent")
+		return it.index, errors.New("missing parent")
 	}
 	// Import all the pruned blocks to make the state available
 	var (
@@ -1841,15 +1849,15 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
 		// memory here.
 		if len(blocks) >= 2048 || memory > 64*1024*1024 {
 			log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64())
-			if _, _, _, err := bc.insertChain(blocks, false); err != nil {
-				return 0, nil, nil, err
+			if _, err := bc.insertChain(blocks, false); err != nil {
+				return 0, err
 			}
 			blocks, memory = blocks[:0], 0
 
 			// If the chain is terminating, stop processing blocks
 			if atomic.LoadInt32(&bc.procInterrupt) == 1 {
 				log.Debug("Premature abort during blocks processing")
-				return 0, nil, nil, nil
+				return 0, nil
 			}
 		}
 	}
@@ -1857,7 +1865,7 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
 		log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64())
 		return bc.insertChain(blocks, false)
 	}
-	return 0, nil, nil, nil
+	return 0, nil
 }
 
 // reorg takes two blocks, an old chain and a new chain and will reconstruct the
@@ -1872,11 +1880,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 		deletedTxs types.Transactions
 		addedTxs   types.Transactions
 
-		deletedLogs []*types.Log
-		rebirthLogs []*types.Log
+		deletedLogs [][]*types.Log
+		rebirthLogs [][]*types.Log
 
-		// collectLogs collects the logs that were generated during the
-		// processing of the block that corresponds with the given hash.
+		// collectLogs collects the logs that were generated or removed during
+		// the processing of the block that corresponds with the given hash.
 		// These logs are later announced as deleted or reborn
 		collectLogs = func(hash common.Hash, removed bool) {
 			number := bc.hc.GetBlockNumber(hash)
@@ -1884,18 +1892,40 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 				return
 			}
 			receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)
+
+			var logs []*types.Log
 			for _, receipt := range receipts {
 				for _, log := range receipt.Logs {
 					l := *log
 					if removed {
 						l.Removed = true
-						deletedLogs = append(deletedLogs, &l)
 					} else {
-						rebirthLogs = append(rebirthLogs, &l)
 					}
+					logs = append(logs, &l)
+				}
+			}
+			if len(logs) > 0 {
+				if removed {
+					deletedLogs = append(deletedLogs, logs)
+				} else {
+					rebirthLogs = append(rebirthLogs, logs)
 				}
 			}
 		}
+		// mergeLogs returns a merged log slice with specified sort order.
+		mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log {
+			var ret []*types.Log
+			if reverse {
+				for i := len(logs) - 1; i >= 0; i-- {
+					ret = append(ret, logs[i]...)
+				}
+			} else {
+				for i := 0; i < len(logs); i++ {
+					ret = append(ret, logs[i]...)
+				}
+			}
+			return ret
+		}
 	)
 	// Reduce the longer chain to the same number as the shorter one
 	if oldBlock.NumberU64() > newBlock.NumberU64() {
@@ -1990,45 +2020,18 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	// this goroutine if there are no events to fire, but realistcally that only
 	// ever happens if we're reorging empty blocks, which will only happen on idle
 	// networks where performance is not an issue either way.
-	//
-	// TODO(karalabe): Can we get rid of the goroutine somehow to guarantee correct
-	// event ordering?
-	go func() {
-		if len(deletedLogs) > 0 {
-			bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
-		}
-		if len(rebirthLogs) > 0 {
-			bc.logsFeed.Send(rebirthLogs)
-		}
-		if len(oldChain) > 0 {
-			for _, block := range oldChain {
-				bc.chainSideFeed.Send(ChainSideEvent{Block: block})
-			}
-		}
-	}()
-	return nil
-}
-
-// PostChainEvents iterates over the events generated by a chain insertion and
-// posts them into the event feed.
-// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
-func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
-	// post event logs for further processing
-	if logs != nil {
-		bc.logsFeed.Send(logs)
+	if len(deletedLogs) > 0 {
+		bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
 	}
-	for _, event := range events {
-		switch ev := event.(type) {
-		case ChainEvent:
-			bc.chainFeed.Send(ev)
-
-		case ChainHeadEvent:
-			bc.chainHeadFeed.Send(ev)
-
-		case ChainSideEvent:
-			bc.chainSideFeed.Send(ev)
+	if len(rebirthLogs) > 0 {
+		bc.logsFeed.Send(mergeLogs(rebirthLogs, false))
+	}
+	if len(oldChain) > 0 {
+		for i := len(oldChain) - 1; i >= 0; i-- {
+			bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
 		}
 	}
+	return nil
 }
 
 func (bc *BlockChain) update() {
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 3c2bba569..de23ead21 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -22,6 +22,7 @@ import (
 	"math/big"
 	"math/rand"
 	"os"
+	"reflect"
 	"sync"
 	"testing"
 	"time"
@@ -960,16 +961,20 @@ func TestLogReorgs(t *testing.T) {
 	}
 
 	chain, _ = GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 3, func(i int, gen *BlockGen) {})
+	done := make(chan struct{})
+	go func() {
+		ev := <-rmLogsCh
+		if len(ev.Logs) == 0 {
+			t.Error("expected logs")
+		}
+		close(done)
+	}()
 	if _, err := blockchain.InsertChain(chain); err != nil {
 		t.Fatalf("failed to insert forked chain: %v", err)
 	}
-
 	timeout := time.NewTimer(1 * time.Second)
 	select {
-	case ev := <-rmLogsCh:
-		if len(ev.Logs) == 0 {
-			t.Error("expected logs")
-		}
+	case <-done:
 	case <-timeout.C:
 		t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
 	}
@@ -982,39 +987,47 @@ func TestLogRebirth(t *testing.T) {
 		db      = rawdb.NewMemoryDatabase()
 
 		// this code generates a log
-		code     = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
-		gspec    = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
-		genesis  = gspec.MustCommit(db)
-		signer   = types.NewEIP155Signer(gspec.Config.ChainID)
-		newLogCh = make(chan bool)
+		code        = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
+		gspec       = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
+		genesis     = gspec.MustCommit(db)
+		signer      = types.NewEIP155Signer(gspec.Config.ChainID)
+		newLogCh    = make(chan bool)
+		removeLogCh = make(chan bool)
 	)
 
-	// listenNewLog checks whether the received logs number is equal with expected.
-	listenNewLog := func(sink chan []*types.Log, expect int) {
+	// validateLogEvent checks whether the received logs number is equal with expected.
+	validateLogEvent := func(sink interface{}, result chan bool, expect int) {
+		chanval := reflect.ValueOf(sink)
+		chantyp := chanval.Type()
+		if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.RecvDir == 0 {
+			t.Fatalf("invalid channel, given type %v", chantyp)
+		}
 		cnt := 0
+		var recv []reflect.Value
+		timeout := time.After(1 * time.Second)
+		cases := []reflect.SelectCase{{Chan: chanval, Dir: reflect.SelectRecv}, {Chan: reflect.ValueOf(timeout), Dir: reflect.SelectRecv}}
 		for {
-			select {
-			case logs := <-sink:
-				cnt += len(logs)
-			case <-time.NewTimer(5 * time.Second).C:
-				// new logs timeout
-				newLogCh <- false
+			chose, v, _ := reflect.Select(cases)
+			if chose == 1 {
+				// Not enough event received
+				result <- false
 				return
 			}
+			cnt += 1
+			recv = append(recv, v)
 			if cnt == expect {
 				break
-			} else if cnt > expect {
-				// redundant logs received
-				newLogCh <- false
-				return
 			}
 		}
-		select {
-		case <-sink:
-			// redundant logs received
-			newLogCh <- false
-		case <-time.NewTimer(100 * time.Millisecond).C:
-			newLogCh <- true
+		done := time.After(50 * time.Millisecond)
+		cases = cases[:1]
+		cases = append(cases, reflect.SelectCase{Chan: reflect.ValueOf(done), Dir: reflect.SelectRecv})
+		chose, _, _ := reflect.Select(cases)
+		// If chose equal 0, it means receiving redundant events.
+		if chose == 1 {
+			result <- true
+		} else {
+			result <- false
 		}
 	}
 
@@ -1038,12 +1051,12 @@ func TestLogRebirth(t *testing.T) {
 	})
 
 	// Spawn a goroutine to receive log events
-	go listenNewLog(logsCh, 1)
+	go validateLogEvent(logsCh, newLogCh, 1)
 	if _, err := blockchain.InsertChain(chain); err != nil {
 		t.Fatalf("failed to insert chain: %v", err)
 	}
 	if !<-newLogCh {
-		t.Fatalf("failed to receive new log event")
+		t.Fatal("failed to receive new log event")
 	}
 
 	// Generate long reorg chain
@@ -1060,40 +1073,31 @@ func TestLogRebirth(t *testing.T) {
 	})
 
 	// Spawn a goroutine to receive log events
-	go listenNewLog(logsCh, 1)
+	go validateLogEvent(logsCh, newLogCh, 1)
+	go validateLogEvent(rmLogsCh, removeLogCh, 1)
 	if _, err := blockchain.InsertChain(forkChain); err != nil {
 		t.Fatalf("failed to insert forked chain: %v", err)
 	}
 	if !<-newLogCh {
-		t.Fatalf("failed to receive new log event")
+		t.Fatal("failed to receive new log event")
 	}
-	// Ensure removedLog events received
-	select {
-	case ev := <-rmLogsCh:
-		if len(ev.Logs) == 0 {
-			t.Error("expected logs")
-		}
-	case <-time.NewTimer(1 * time.Second).C:
-		t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
+	if !<-removeLogCh {
+		t.Fatal("failed to receive removed log event")
 	}
 
 	newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
-	go listenNewLog(logsCh, 1)
+	go validateLogEvent(logsCh, newLogCh, 1)
+	go validateLogEvent(rmLogsCh, removeLogCh, 1)
 	if _, err := blockchain.InsertChain(newBlocks); err != nil {
 		t.Fatalf("failed to insert forked chain: %v", err)
 	}
-	// Ensure removedLog events received
-	select {
-	case ev := <-rmLogsCh:
-		if len(ev.Logs) == 0 {
-			t.Error("expected logs")
-		}
-	case <-time.NewTimer(1 * time.Second).C:
-		t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
-	}
 	// Rebirth logs should omit a newLogEvent
 	if !<-newLogCh {
-		t.Fatalf("failed to receive new log event")
+		t.Fatal("failed to receive new log event")
+	}
+	// Ensure removedLog events received
+	if !<-removeLogCh {
+		t.Fatal("failed to receive removed log event")
 	}
 }
 
@@ -1145,7 +1149,6 @@ func TestSideLogRebirth(t *testing.T) {
 
 	logsCh := make(chan []*types.Log)
 	blockchain.SubscribeLogsEvent(logsCh)
-
 	chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
 		if i == 1 {
 			// Higher block difficulty
diff --git a/miner/worker.go b/miner/worker.go
index 52f8919f0..c95b32042 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -590,7 +590,7 @@ func (w *worker) resultLoop() {
 				logs = append(logs, receipt.Logs...)
 			}
 			// Commit block and state to database.
-			stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
+			_, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
 			if err != nil {
 				log.Error("Failed writing block to chain", "err", err)
 				continue
@@ -601,16 +601,6 @@ func (w *worker) resultLoop() {
 			// Broadcast the block and announce chain insertion event
 			w.mux.Post(core.NewMinedBlockEvent{Block: block})
 
-			var events []interface{}
-			switch stat {
-			case core.CanonStatTy:
-				events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
-				events = append(events, core.ChainHeadEvent{Block: block})
-			case core.SideStatTy:
-				events = append(events, core.ChainSideEvent{Block: block})
-			}
-			w.chain.PostChainEvents(events, logs)
-
 			// Insert the block into the set of pending ones to resultLoop for confirmations
 			w.unconfirmed.Insert(block.NumberU64(), block.Hash())
 
@@ -996,3 +986,11 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
 	}
 	return nil
 }
+
+// postSideBlock fires a side chain event, only use it for testing.
+func (w *worker) postSideBlock(event core.ChainSideEvent) {
+	select {
+	case w.chainSideCh <- event:
+	case <-w.exitCh:
+	}
+}
diff --git a/miner/worker_test.go b/miner/worker_test.go
index 95d5f64b1..81fb8f1b9 100644
--- a/miner/worker_test.go
+++ b/miner/worker_test.go
@@ -149,9 +149,6 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
 
 func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
 func (b *testWorkerBackend) TxPool() *core.TxPool         { return b.txPool }
-func (b *testWorkerBackend) PostChainEvents(events []interface{}) {
-	b.chain.PostChainEvents(events, nil)
-}
 
 func (b *testWorkerBackend) newRandomUncle() *types.Block {
 	var parent *types.Block
@@ -243,8 +240,8 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
 	for i := 0; i < 5; i++ {
 		b.txPool.AddLocal(b.newRandomTx(true))
 		b.txPool.AddLocal(b.newRandomTx(false))
-		b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.newRandomUncle()}})
-		b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.newRandomUncle()}})
+		w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
+		w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
 		select {
 		case e := <-loopErr:
 			t.Fatal(e)
@@ -295,7 +292,7 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens
 	}
 	w.skipSealHook = func(task *task) bool { return true }
 	w.fullTaskHook = func() {
-		// Aarch64 unit tests are running in a VM on travis, they must
+		// Arch64 unit tests are running in a VM on travis, they must
 		// be given more time to execute.
 		time.Sleep(time.Second)
 	}
@@ -351,7 +348,8 @@ func TestStreamUncleBlock(t *testing.T) {
 		}
 	}
 
-	b.PostChainEvents([]interface{}{core.ChainSideEvent{Block: b.uncleBlock}})
+	w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock})
+
 	select {
 	case <-taskCh:
 	case <-time.NewTimer(time.Second).C:
-- 
GitLab