From c113723fdb9d9fa4c8ac57777f9aecfe97391453 Mon Sep 17 00:00:00 2001
From: gary rong <garyrong0905@gmail.com>
Date: Wed, 8 May 2019 19:30:36 +0800
Subject: [PATCH] core: handle importing known blocks more gracefully (#19417)

* core: import known blocks if they can be inserted as canonical blocks

* core: insert knowns blocks

* core: remove useless

* core: doesn't process head block in reorg function
---
 core/blockchain.go      |  72 ++++++--
 core/blockchain_test.go | 356 +++++++++++++++++++++++++++-------------
 2 files changed, 303 insertions(+), 125 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 9fa5b09f9..8cbef7173 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -883,10 +883,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
 
 var lastWrite uint64
 
-// WriteBlockWithoutState writes only the block and its metadata to the database,
+// writeBlockWithoutState writes only the block and its metadata to the database,
 // but does not write any state. This is used to construct competing side forks
 // up to the point where they exceed the canonical total difficulty.
-func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) {
+func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) {
 	bc.wg.Add(1)
 	defer bc.wg.Done()
 
@@ -898,6 +898,26 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
 	return nil
 }
 
+// writeKnownBlock updates the head block flag with a known block
+// and introduces chain reorg if necessary.
+func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
+	bc.wg.Add(1)
+	defer bc.wg.Done()
+
+	current := bc.CurrentBlock()
+	if block.ParentHash() != current.Hash() {
+		if err := bc.reorg(current, block); err != nil {
+			return err
+		}
+	}
+	// Write the positional metadata for transaction/receipt lookups.
+	// Preimages here is empty, ignore it.
+	rawdb.WriteTxLookupEntries(bc.db, block)
+
+	bc.insert(block)
+	return nil
+}
+
 // WriteBlockWithState writes the block and all associated state to the database.
 func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
 	bc.chainmu.Lock()
@@ -1139,18 +1159,42 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 		//   2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot
 		// 	    from the canonical chain, which has not been verified.
 		// Skip all known blocks that are behind us
-		current := bc.CurrentBlock().NumberU64()
-		for block != nil && err == ErrKnownBlock && current >= block.NumberU64() {
+		var (
+			current  = bc.CurrentBlock()
+			localTd  = bc.GetTd(current.Hash(), current.NumberU64())
+			externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) // The first block can't be nil
+		)
+		for block != nil && err == ErrKnownBlock {
+			externTd = new(big.Int).Add(externTd, block.Difficulty())
+			if localTd.Cmp(externTd) < 0 {
+				break
+			}
 			stats.ignored++
 			block, err = it.next()
 		}
+		// The remaining blocks are still known blocks, the only scenario here is:
+		// During the fast sync, the pivot point is already submitted but rollback
+		// happens. Then node resets the head full block to a lower height via `rollback`
+		// and leaves a few known blocks in the database.
+		//
+		// When node runs a fast sync again, it can re-import a batch of known blocks via
+		// `insertChain` while a part of them have higher total difficulty than current
+		// head full block(new pivot point).
+		for block != nil && err == ErrKnownBlock {
+			if err := bc.writeKnownBlock(block); err != nil {
+				return it.index, nil, nil, err
+			}
+			lastCanon = block
+
+			block, err = it.next()
+		}
 		// Falls through to the block import
 	}
 
 	switch {
 	// First block is pruned, insert as sidechain and reorg only if TD grows enough
 	case err == consensus.ErrPrunedAncestor:
-		return bc.insertSidechain(block, it)
+		return bc.insertSideChain(block, it)
 
 	// First block is future, shove it (and all children) to the future queue (unknown ancestor)
 	case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())):
@@ -1313,13 +1357,13 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
 	return it.index, events, coalescedLogs, err
 }
 
-// insertSidechain is called when an import batch hits upon a pruned ancestor
+// insertSideChain is called when an import batch hits upon a pruned ancestor
 // error, which happens when a sidechain with a sufficiently old fork-block is
 // found.
 //
 // The method writes all (header-and-body-valid) blocks to disk, then tries to
 // switch over to the new chain if the TD exceeded the current chain.
-func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
+func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (int, []interface{}, []*types.Log, error) {
 	var (
 		externTd *big.Int
 		current  = bc.CurrentBlock()
@@ -1360,7 +1404,7 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
 
 		if !bc.HasBlock(block.Hash(), block.NumberU64()) {
 			start := time.Now()
-			if err := bc.WriteBlockWithoutState(block, externTd); err != nil {
+			if err := bc.writeBlockWithoutState(block, externTd); err != nil {
 				return it.index, nil, nil, err
 			}
 			log.Debug("Injected sidechain block", "number", block.Number(), "hash", block.Hash(),
@@ -1524,15 +1568,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	} else {
 		log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
 	}
-	// Insert the new chain, taking care of the proper incremental order
-	for i := len(newChain) - 1; i >= 0; i-- {
+	// Insert the new chain(except the head block(reverse order)),
+	// taking care of the proper incremental order.
+	for i := len(newChain) - 1; i >= 1; i-- {
 		// Insert the block in the canonical way, re-writing history
 		bc.insert(newChain[i])
 
-		// Collect reborn logs due to chain reorg (except head block (reverse order))
-		if i != 0 {
-			collectLogs(newChain[i].Hash(), false)
-		}
+		// Collect reborn logs due to chain reorg
+		collectLogs(newChain[i].Hash(), false)
+
 		// Write lookup entries for hash based transaction/receipt searches
 		rawdb.WriteTxLookupEntries(bc.db, newChain[i])
 		addedTxs = append(addedTxs, newChain[i].Transactions()...)
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 80a949d90..70e3207f5 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -1564,117 +1564,6 @@ func TestLargeReorgTrieGC(t *testing.T) {
 	}
 }
 
-// Benchmarks large blocks with value transfers to non-existing accounts
-func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) {
-	var (
-		signer          = types.HomesteadSigner{}
-		testBankKey, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-		testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
-		bankFunds       = big.NewInt(100000000000000000)
-		gspec           = Genesis{
-			Config: params.TestChainConfig,
-			Alloc: GenesisAlloc{
-				testBankAddress: {Balance: bankFunds},
-				common.HexToAddress("0xc0de"): {
-					Code:    []byte{0x60, 0x01, 0x50},
-					Balance: big.NewInt(0),
-				}, // push 1, pop
-			},
-			GasLimit: 100e6, // 100 M
-		}
-	)
-	// Generate the original common chain segment and the two competing forks
-	engine := ethash.NewFaker()
-	db := rawdb.NewMemoryDatabase()
-	genesis := gspec.MustCommit(db)
-
-	blockGenerator := func(i int, block *BlockGen) {
-		block.SetCoinbase(common.Address{1})
-		for txi := 0; txi < numTxs; txi++ {
-			uniq := uint64(i*numTxs + txi)
-			recipient := recipientFn(uniq)
-			//recipient := common.BigToAddress(big.NewInt(0).SetUint64(1337 + uniq))
-			tx, err := types.SignTx(types.NewTransaction(uniq, recipient, big.NewInt(1), params.TxGas, big.NewInt(1), nil), signer, testBankKey)
-			if err != nil {
-				b.Error(err)
-			}
-			block.AddTx(tx)
-		}
-	}
-
-	shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, numBlocks, blockGenerator)
-	b.StopTimer()
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		// Import the shared chain and the original canonical one
-		diskdb := rawdb.NewMemoryDatabase()
-		gspec.MustCommit(diskdb)
-
-		chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
-		if err != nil {
-			b.Fatalf("failed to create tester chain: %v", err)
-		}
-		b.StartTimer()
-		if _, err := chain.InsertChain(shared); err != nil {
-			b.Fatalf("failed to insert shared chain: %v", err)
-		}
-		b.StopTimer()
-		if got := chain.CurrentBlock().Transactions().Len(); got != numTxs*numBlocks {
-			b.Fatalf("Transactions were not included, expected %d, got %d", numTxs*numBlocks, got)
-
-		}
-	}
-}
-func BenchmarkBlockChain_1x1000ValueTransferToNonexisting(b *testing.B) {
-	var (
-		numTxs    = 1000
-		numBlocks = 1
-	)
-
-	recipientFn := func(nonce uint64) common.Address {
-		return common.BigToAddress(big.NewInt(0).SetUint64(1337 + nonce))
-	}
-	dataFn := func(nonce uint64) []byte {
-		return nil
-	}
-
-	benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
-}
-func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {
-	var (
-		numTxs    = 1000
-		numBlocks = 1
-	)
-	b.StopTimer()
-	b.ResetTimer()
-
-	recipientFn := func(nonce uint64) common.Address {
-		return common.BigToAddress(big.NewInt(0).SetUint64(1337))
-	}
-	dataFn := func(nonce uint64) []byte {
-		return nil
-	}
-
-	benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
-}
-func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
-	var (
-		numTxs    = 1000
-		numBlocks = 1
-	)
-	b.StopTimer()
-	b.ResetTimer()
-
-	recipientFn := func(nonce uint64) common.Address {
-		return common.BigToAddress(big.NewInt(0).SetUint64(0xc0de))
-	}
-	dataFn := func(nonce uint64) []byte {
-		return nil
-	}
-
-	benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
-}
-
 // Tests that importing a very large side fork, which is larger than the canon chain,
 // but where the difficulty per block is kept low: this means that it will not
 // overtake the 'canon' chain until after it's passed canon by about 200 blocks.
@@ -1812,6 +1701,138 @@ func TestPrunedImportSide(t *testing.T) {
 	testSideImport(t, 1, -10)
 }
 
+func TestInsertKnownHeaders(t *testing.T)      { testInsertKnownChainData(t, "headers") }
+func TestInsertKnownReceiptChain(t *testing.T) { testInsertKnownChainData(t, "receipts") }
+func TestInsertKnownBlocks(t *testing.T)       { testInsertKnownChainData(t, "blocks") }
+
+func testInsertKnownChainData(t *testing.T, typ string) {
+	engine := ethash.NewFaker()
+
+	db := rawdb.NewMemoryDatabase()
+	genesis := new(Genesis).MustCommit(db)
+
+	blocks, receipts := GenerateChain(params.TestChainConfig, genesis, engine, db, 32, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) })
+	// A longer chain but total difficulty is lower.
+	blocks2, receipts2 := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, db, 65, func(i int, b *BlockGen) { b.SetCoinbase(common.Address{1}) })
+	// A shorter chain but total difficulty is higher.
+	blocks3, receipts3 := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, db, 64, func(i int, b *BlockGen) {
+		b.SetCoinbase(common.Address{1})
+		b.OffsetTime(-9) // A higher difficulty
+	})
+
+	// Import the shared chain and the original canonical one
+	chaindb := rawdb.NewMemoryDatabase()
+	new(Genesis).MustCommit(chaindb)
+
+	chain, err := NewBlockChain(chaindb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
+	if err != nil {
+		t.Fatalf("failed to create tester chain: %v", err)
+	}
+
+	var (
+		inserter func(blocks []*types.Block, receipts []types.Receipts) error
+		asserter func(t *testing.T, block *types.Block)
+	)
+	headers, headers2 := make([]*types.Header, 0, len(blocks)), make([]*types.Header, 0, len(blocks2))
+	for _, block := range blocks {
+		headers = append(headers, block.Header())
+	}
+	for _, block := range blocks2 {
+		headers2 = append(headers2, block.Header())
+	}
+	if typ == "headers" {
+		inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
+			headers := make([]*types.Header, 0, len(blocks))
+			for _, block := range blocks {
+				headers = append(headers, block.Header())
+			}
+			_, err := chain.InsertHeaderChain(headers, 1)
+			return err
+		}
+		asserter = func(t *testing.T, block *types.Block) {
+			if chain.CurrentHeader().Hash() != block.Hash() {
+				t.Fatalf("current head header mismatch, have %v, want %v", chain.CurrentHeader().Hash().Hex(), block.Hash().Hex())
+			}
+		}
+	} else if typ == "receipts" {
+		inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
+			headers := make([]*types.Header, 0, len(blocks))
+			for _, block := range blocks {
+				headers = append(headers, block.Header())
+			}
+			_, err := chain.InsertHeaderChain(headers, 1)
+			if err != nil {
+				return err
+			}
+			_, err = chain.InsertReceiptChain(blocks, receipts)
+			return err
+		}
+		asserter = func(t *testing.T, block *types.Block) {
+			if chain.CurrentFastBlock().Hash() != block.Hash() {
+				t.Fatalf("current head fast block mismatch, have %v, want %v", chain.CurrentFastBlock().Hash().Hex(), block.Hash().Hex())
+			}
+		}
+	} else {
+		inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
+			_, err := chain.InsertChain(blocks)
+			return err
+		}
+		asserter = func(t *testing.T, block *types.Block) {
+			if chain.CurrentBlock().Hash() != block.Hash() {
+				t.Fatalf("current head block mismatch, have %v, want %v", chain.CurrentBlock().Hash().Hex(), block.Hash().Hex())
+			}
+		}
+	}
+
+	if err := inserter(blocks, receipts); err != nil {
+		t.Fatalf("failed to insert chain data: %v", err)
+	}
+
+	// Reimport the chain data again. All the imported
+	// chain data are regarded "known" data.
+	if err := inserter(blocks, receipts); err != nil {
+		t.Fatalf("failed to insert chain data: %v", err)
+	}
+	asserter(t, blocks[len(blocks)-1])
+
+	// Import a long canonical chain with some known data as prefix.
+	var rollback []common.Hash
+	for i := len(blocks) / 2; i < len(blocks); i++ {
+		rollback = append(rollback, blocks[i].Hash())
+	}
+	chain.Rollback(rollback)
+	if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
+		t.Fatalf("failed to insert chain data: %v", err)
+	}
+	asserter(t, blocks2[len(blocks2)-1])
+
+	// Import a heavier shorter but higher total difficulty chain with some known data as prefix.
+	if err := inserter(append(blocks, blocks3...), append(receipts, receipts3...)); err != nil {
+		t.Fatalf("failed to insert chain data: %v", err)
+	}
+	asserter(t, blocks3[len(blocks3)-1])
+
+	// Import a longer but lower total difficulty chain with some known data as prefix.
+	if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
+		t.Fatalf("failed to insert chain data: %v", err)
+	}
+	// The head shouldn't change.
+	asserter(t, blocks3[len(blocks3)-1])
+
+	if typ != "headers" {
+		// Rollback the heavier chain and re-insert the longer chain again
+		for i := 0; i < len(blocks3); i++ {
+			rollback = append(rollback, blocks3[i].Hash())
+		}
+		chain.Rollback(rollback)
+
+		if err := inserter(append(blocks, blocks2...), append(receipts, receipts2...)); err != nil {
+			t.Fatalf("failed to insert chain data: %v", err)
+		}
+		asserter(t, blocks2[len(blocks2)-1])
+	}
+}
+
 // getLongAndShortChains returns two chains,
 // A is longer, B is heavier
 func getLongAndShortChains() (*BlockChain, []*types.Block, []*types.Block, error) {
@@ -1931,3 +1952,116 @@ func TestReorgToShorterRemovesCanonMappingHeaderChain(t *testing.T) {
 		t.Errorf("expected header to be gone: %v", headerByNum.Number.Uint64())
 	}
 }
+
+// Benchmarks large blocks with value transfers to non-existing accounts
+func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) {
+	var (
+		signer          = types.HomesteadSigner{}
+		testBankKey, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
+		bankFunds       = big.NewInt(100000000000000000)
+		gspec           = Genesis{
+			Config: params.TestChainConfig,
+			Alloc: GenesisAlloc{
+				testBankAddress: {Balance: bankFunds},
+				common.HexToAddress("0xc0de"): {
+					Code:    []byte{0x60, 0x01, 0x50},
+					Balance: big.NewInt(0),
+				}, // push 1, pop
+			},
+			GasLimit: 100e6, // 100 M
+		}
+	)
+	// Generate the original common chain segment and the two competing forks
+	engine := ethash.NewFaker()
+	db := rawdb.NewMemoryDatabase()
+	genesis := gspec.MustCommit(db)
+
+	blockGenerator := func(i int, block *BlockGen) {
+		block.SetCoinbase(common.Address{1})
+		for txi := 0; txi < numTxs; txi++ {
+			uniq := uint64(i*numTxs + txi)
+			recipient := recipientFn(uniq)
+			tx, err := types.SignTx(types.NewTransaction(uniq, recipient, big.NewInt(1), params.TxGas, big.NewInt(1), nil), signer, testBankKey)
+			if err != nil {
+				b.Error(err)
+			}
+			block.AddTx(tx)
+		}
+	}
+
+	shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, numBlocks, blockGenerator)
+	b.StopTimer()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		// Import the shared chain and the original canonical one
+		diskdb := rawdb.NewMemoryDatabase()
+		gspec.MustCommit(diskdb)
+
+		chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil)
+		if err != nil {
+			b.Fatalf("failed to create tester chain: %v", err)
+		}
+		b.StartTimer()
+		if _, err := chain.InsertChain(shared); err != nil {
+			b.Fatalf("failed to insert shared chain: %v", err)
+		}
+		b.StopTimer()
+		if got := chain.CurrentBlock().Transactions().Len(); got != numTxs*numBlocks {
+			b.Fatalf("Transactions were not included, expected %d, got %d", numTxs*numBlocks, got)
+
+		}
+	}
+}
+
+func BenchmarkBlockChain_1x1000ValueTransferToNonexisting(b *testing.B) {
+	var (
+		numTxs    = 1000
+		numBlocks = 1
+	)
+
+	recipientFn := func(nonce uint64) common.Address {
+		return common.BigToAddress(big.NewInt(0).SetUint64(1337 + nonce))
+	}
+	dataFn := func(nonce uint64) []byte {
+		return nil
+	}
+
+	benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
+}
+
+func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {
+	var (
+		numTxs    = 1000
+		numBlocks = 1
+	)
+	b.StopTimer()
+	b.ResetTimer()
+
+	recipientFn := func(nonce uint64) common.Address {
+		return common.BigToAddress(big.NewInt(0).SetUint64(1337))
+	}
+	dataFn := func(nonce uint64) []byte {
+		return nil
+	}
+
+	benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
+}
+
+func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
+	var (
+		numTxs    = 1000
+		numBlocks = 1
+	)
+	b.StopTimer()
+	b.ResetTimer()
+
+	recipientFn := func(nonce uint64) common.Address {
+		return common.BigToAddress(big.NewInt(0).SetUint64(0xc0de))
+	}
+	dataFn := func(nonce uint64) []byte {
+		return nil
+	}
+
+	benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
+}
-- 
GitLab