diff --git a/core/blockchain.go b/core/blockchain.go
index 1caebae78c5256afe4bc2cc182fed0bfa4303b33..b7dce001bb44f138649736e542044fe20795999d 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1392,7 +1392,9 @@ func (bc *BlockChain) InsertBodyChain(ctx context.Context, chain types.Blocks) (
if len(chain) == 0 {
return 0, nil
}
-
+ log.Info("Inserting chain",
+ "start", chain[0].NumberU64(), "end", chain[len(chain)-1].NumberU64(),
+ "current", bc.CurrentBlock().Number().Uint64(), "currentHeader", bc.CurrentHeader().Number.Uint64())
// Remove already known canon-blocks
var (
block, prev *types.Block
@@ -2156,106 +2158,6 @@ func (bc *BlockChain) rollbackBadBlock(block *types.Block, receipts types.Receip
}
}
-func (bc *BlockChain) insertHeaderChainStaged(chain []*types.Header) (int, int, int, bool, uint64, error) {
- // The function below will insert headers, and track the lowest block
- // number that have replace the canonical chain. This number will be
- // used to trigger invalidation of further sync stages
- var newCanonical bool
- var lowestCanonicalNumber uint64
- whFunc := func(header *types.Header) error {
- status, err := bc.hc.WriteHeader(context.Background(), header)
- if err != nil {
- return err
- }
- if status == CanonStatTy {
- number := header.Number.Uint64()
- if !newCanonical || number < lowestCanonicalNumber {
- lowestCanonicalNumber = number
- newCanonical = true
- }
- }
- return nil
- }
- // Collect some import statistics to report on
- stats := struct{ processed, ignored int }{}
- // All headers passed verification, import them into the database
- for i, header := range chain {
- // Short circuit insertion if shutting down
- if bc.hc.procInterrupt() {
- log.Debug("Premature abort during headers import")
- return i, stats.processed, stats.ignored, newCanonical, lowestCanonicalNumber, errors.New("aborted")
- }
- // If the header's already known, skip it, otherwise store
- hash := header.Hash()
- if bc.hc.HasHeader(hash, header.Number.Uint64()) {
- externTd := bc.hc.GetTd(bc.hc.chainDb, hash, header.Number.Uint64())
- localTd := bc.hc.GetTd(bc.hc.chainDb, bc.hc.currentHeaderHash, bc.hc.CurrentHeader().Number.Uint64())
- if externTd == nil || externTd.Cmp(localTd) <= 0 {
- stats.ignored++
- continue
- }
- }
- if err := whFunc(header); err != nil {
- return i, stats.processed, stats.ignored, newCanonical, lowestCanonicalNumber, err
- }
- stats.processed++
- }
- // Everything processed without errors
- return len(chain), stats.processed, stats.ignored, newCanonical, lowestCanonicalNumber, nil
-}
-
-// InsertHeaderChainStaged attempts to add the chunk of headers to the headerchain
-// It return the following values
-// 1. (type int) number of items in the input chain that have been successfully processed and committed
-// 2. (type bool) whether the insertion of this chunk has changed the canonical chain
-// 3. (type uint64) the lowest block number that has been displaced from the canonical chain (this is to be used to invalidate further sync stages)
-// 4. (type error) error happed during processing
-func (bc *BlockChain) InsertHeaderChainStaged(chain []*types.Header, checkFreq int) (int, bool, uint64, error) {
- start := time.Now()
- if i, err := bc.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
- return i, false, 0, err
- }
-
- // Make sure only one thread manipulates the chain at once
- bc.chainmu.Lock()
- defer bc.chainmu.Unlock()
-
- if err := bc.addJob(); err != nil {
- return 0, false, 0, err
- }
- defer bc.doneJob()
-
- n, processed, ignored, newCanonical, lowestCanonicalNumber, err := bc.insertHeaderChainStaged(chain)
-
- // Report some public statistics so the user has a clue what's going on
- last := chain[len(chain)-1]
-
- ctx := []interface{}{
- "count", processed, "elapsed", common.PrettyDuration(time.Since(start)),
- "number", last.Number, "hash", last.Hash(),
- }
- if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
- ctx = append(ctx, []interface{}{"age", common.PrettyAge(timestamp)}...)
- }
- if ignored > 0 {
- ctx = append(ctx, []interface{}{"ignored", ignored}...)
- }
- log.Info("Imported new block headers", ctx...)
-
- var written uint64
- var err1 error
- if written, err1 = bc.db.Commit(); err1 != nil {
- log.Error("Could not commit chainDb", "error", err1)
- return 0, false, 0, err1
- }
- size, err := bc.db.(ethdb.HasStats).DiskSize(context.Background())
- if err != nil {
- return 0, false, 0, err
- }
- log.Info("Database", "size", size, "written", written)
- return n, newCanonical, lowestCanonicalNumber, err
-}
-
// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index f3cf936c6e4ff1a2b657ab2f637faa1c48aa14f8..43e1972243aa29f2401c627b67f7a7e07370b01f 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -630,7 +630,7 @@ func testInsertNonceError(t *testing.T, full bool) {
blockchain.engine = ethash.NewFakeFailer(failNum)
blockchain.hc.engine = blockchain.engine
- failRes, _, _, err = blockchain.InsertHeaderChainStaged(headers, 1)
+ failRes, err = blockchain.InsertHeaderChain(headers, 1)
}
// Check that the returned error indicates the failure
if failRes != failAt {
diff --git a/core/generate_index.go b/core/generate_index.go
index bde91b0289e4af90f146107450e1f2dac8c662db..be1512d40542fef5ed1694322f251083a0719bd7 100644
--- a/core/generate_index.go
+++ b/core/generate_index.go
@@ -87,9 +87,9 @@ func (ig *IndexGenerator) GenerateIndex(startBlock, endBlock uint64, changeSetBu
if !ok {
return errors.New("unknown bucket type")
}
- log.Info("Index generation started", "from", startBlock, "csbucket", string(changeSetBucket))
+ log.Info("Index generation started", "from", startBlock, "to", endBlock, "csbucket", string(changeSetBucket))
if endBlock < startBlock && endBlock != 0 {
- return errors.New("endblock greater start block")
+ return fmt.Errorf("generateIndex %s: endBlock %d smaller than startBlock %d", changeSetBucket, endBlock, startBlock)
}
var (
endBlockKey []byte
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 5b5d5632bb015e7f0709f412cf02dbab04ba7b99..ca8afe014e573846c2b61d5f5c01a5400246d347 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -218,9 +218,6 @@ type BlockChain interface {
// InsertBodyChain inserts a batch of blocks into the local chain, without executing them.
InsertBodyChain(context.Context, types.Blocks) (int, error)
- // InsertHeaderChainStaged inserts a batch of headers into the local chain.
- InsertHeaderChainStaged([]*types.Header, int) (int, bool, uint64, error)
-
// InsertReceiptChain inserts a batch of receipts into the local chain.
InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
@@ -749,7 +746,6 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
floor = int64(-1)
localHeight uint64
remoteHeight = remoteHeader.Number.Uint64()
- err error
)
switch d.mode {
case FullSync:
@@ -757,10 +753,9 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
case FastSync:
localHeight = d.blockchain.CurrentFastBlock().NumberU64()
case StagedSync:
- localHeight, err = d.stagedSync.GetLocalHeight(d.stateDB)
- if err != nil {
- return 0, err
- }
+ headHash := rawdb.ReadHeadHeaderHash(d.stateDB)
+ headNumber := rawdb.ReadHeaderNumber(d.stateDB, headHash)
+ localHeight = *headNumber
default:
localHeight = d.lightchain.CurrentHeader().Number.Uint64()
}
@@ -968,10 +963,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
defer p.log.Debug("Header download terminated")
// Create a timeout timer, and the associated header fetcher
- skeleton := true // Skeleton assembly phase or finishing up
- request := time.Now() // time of the last skeleton fetch request
- timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
- <-timeout.C // timeout channel should be initially empty
+ skeleton := d.mode != StagedSync // Skeleton assembly phase or finishing up
+ request := time.Now() // time of the last skeleton fetch request
+ timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
+ <-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
var ttl time.Duration
@@ -1053,8 +1048,12 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
if n := len(headers); n > 0 {
// Retrieve the current head we're at
var head uint64
- if d.mode == LightSync || d.mode == StagedSync {
+ if d.mode == LightSync {
head = d.lightchain.CurrentHeader().Number.Uint64()
+ } else if d.mode == StagedSync {
+ headHash := rawdb.ReadHeadHeaderHash(d.stateDB)
+ headNumber := rawdb.ReadHeaderNumber(d.stateDB, headHash)
+ head = *headNumber
} else {
head = d.blockchain.CurrentFastBlock().NumberU64()
if full := d.blockchain.CurrentBlock().NumberU64(); head < full {
@@ -1486,11 +1485,17 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
- if d.mode == FastSync || d.mode == LightSync || d.mode == StagedSync {
+ if d.mode == FastSync || d.mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
}
+ } else if d.mode == StagedSync {
+ headHash := rawdb.ReadHeadHeaderHash(d.stateDB)
+ headNumber := rawdb.ReadHeaderNumber(d.stateDB, headHash)
+ if td.Cmp(d.lightchain.GetTd(headHash, *headNumber)) > 0 {
+ return errStallingPeer
+ }
}
// Disable any rollback and return
rollback = nil
@@ -1526,13 +1531,13 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
var n int
var err error
if d.mode == StagedSync {
- var newCanonical bool
- var lowestCanonicalNumber uint64
- n, newCanonical, lowestCanonicalNumber, err = d.blockchain.InsertHeaderChainStaged(chunk, frequency)
- if newCanonical && d.headersUnwinder != nil {
+ var reorg bool
+ var forkBlockNumber uint64
+ n, reorg, forkBlockNumber, err = stagedsync.InsertHeaderChain(d.stateDB, chunk, d.blockchain.Config(), d.blockchain.Engine(), frequency)
+ if reorg && d.headersUnwinder != nil {
// Need to unwind further stages
- if err1 := d.headersUnwinder.UnwindTo(lowestCanonicalNumber, d.stateDB); err1 != nil {
- return fmt.Errorf("unwinding all stages to %d: %v", lowestCanonicalNumber, err1)
+ if err1 := d.headersUnwinder.UnwindTo(forkBlockNumber, d.stateDB); err1 != nil {
+ return fmt.Errorf("unwinding all stages to %d: %v", forkBlockNumber, err1)
}
}
} else {
@@ -1548,7 +1553,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
if n > 0 {
rollback = append(rollback, chunk[:n]...)
}
- log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
+ log.Error("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
return errInvalidChain
}
diff --git a/eth/downloader/downloader_stagedsync.go b/eth/downloader/downloader_stagedsync.go
index 9504c685d716a864c3d05818f016d04ff9a5fddd..c7193d01122bc8cba1a66a56c475ba4868054e09 100644
--- a/eth/downloader/downloader_stagedsync.go
+++ b/eth/downloader/downloader_stagedsync.go
@@ -44,7 +44,7 @@ func (d *Downloader) SpawnBodyDownloadStage(id string, s *stagedsync.StageState,
}
// Skip non relevant records
- if len(k) == 8+len(dbutils.HeaderHashSuffix) && bytes.Equal(k[8:], dbutils.HeaderHashSuffix) {
+ if dbutils.CheckCanonicalKey(k) {
// This is how we learn about canonical chain
blockNumber := binary.BigEndian.Uint64(k[:8])
if blockNumber != currentNumber {
@@ -57,7 +57,7 @@ func (d *Downloader) SpawnBodyDownloadStage(id string, s *stagedsync.StageState,
copy(hashes[hashCount][:], v)
}
hashCount++
- if hashCount > len(hashes) { // We allow hashCount to go +1 over what it should be, to let headers to be read
+ if hashCount >= len(hashes) { // We allow hashCount to go +1 over what it should be, to let headers to be read
return false, nil
}
return true, nil
@@ -84,14 +84,14 @@ func (d *Downloader) SpawnBodyDownloadStage(id string, s *stagedsync.StageState,
return false, nil
}
d.queue.Reset()
- if hashCount <= 1 {
+ if hashCount == 0 {
// No more bodies to download
return false, nil
}
from := origin + 1
d.queue.Prepare(from, d.mode)
- d.queue.ScheduleBodies(from, hashes[:hashCount-1], headers)
- to := from + uint64(hashCount-1)
+ d.queue.ScheduleBodies(from, hashes[:hashCount], headers)
+ to := from + uint64(hashCount)
select {
case d.bodyWakeCh <- true:
@@ -107,7 +107,7 @@ func (d *Downloader) SpawnBodyDownloadStage(id string, s *stagedsync.StageState,
}
if err := d.spawnSync(fetchers); err == nil {
- return true, nil
+ return true, s.Update(d.stateDB, to)
}
log.Error("Trying to rollback 1 block due to error")
return true, s.Update(d.stateDB, origin-1)
@@ -145,11 +145,19 @@ func (d *Downloader) SpawnHeaderDownloadStage(
s *stagedsync.StageState,
u stagedsync.Unwinder,
) error {
+ d.headersState = s
+ d.headersUnwinder = u
d.bodiesState = s
d.bodiesUnwinder = u
defer func() {
+ d.headersState = nil
+ d.headersUnwinder = nil
d.bodiesState = nil
d.bodiesUnwinder = nil
}()
+ d.cancelLock.Lock()
+ d.cancelCh = make(chan struct{})
+ d.cancelLock.Unlock()
+ defer d.Cancel() // No matter what, we can't leave the cancel channel open
return d.spawnSync(fetchers)
}
diff --git a/eth/downloader/downloader_stagedsync_test.go b/eth/downloader/downloader_stagedsync_test.go
index 00126e47d21cdb6f9dc433ebde35ba19027f06ea..23de358e884af6b79a789b0a62d2272c6cd9c6ba 100644
--- a/eth/downloader/downloader_stagedsync_test.go
+++ b/eth/downloader/downloader_stagedsync_test.go
@@ -2,7 +2,6 @@ package downloader
import (
"context"
- "errors"
"fmt"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/consensus"
@@ -29,7 +28,6 @@ type stagedSyncTester struct {
db ethdb.Database
peers map[string]*stagedSyncTesterPeer
genesis *types.Block
- currentHeader *types.Header
lock sync.RWMutex
}
@@ -43,7 +41,6 @@ func newStagedSyncTester() *stagedSyncTester {
tester.genesis = core.GenesisBlockForTesting(tester.db, testAddress, big.NewInt(1000000000))
rawdb.WriteTd(tester.db, tester.genesis.Hash(), tester.genesis.NumberU64(), tester.genesis.Difficulty())
rawdb.WriteBlock(context.Background(), tester.db, testGenesis)
- tester.currentHeader = tester.genesis.Header()
tester.downloader = New(uint64(StagedSync), tester.db, trie.NewSyncBloom(1, tester.db), new(event.TypeMux), tester, nil, tester.dropPeer, ethdb.DefaultStorageMode)
return tester
}
@@ -86,10 +83,9 @@ func (st *stagedSyncTester) CurrentFastBlock() *types.Block {
// CurrentHeader is part of the implementation of BlockChain interface defined in downloader.go
func (st *stagedSyncTester) CurrentHeader() *types.Header {
- st.lock.RLock()
- defer st.lock.RUnlock()
-
- return st.currentHeader
+ hash := rawdb.ReadHeadHeaderHash(st.db)
+ number := rawdb.ReadHeaderNumber(st.db, hash)
+ return rawdb.ReadHeader(st.db, hash, *number)
}
// ExecuteBlockEphemerally is part of the implementation of BlockChain interface defined in downloader.go
@@ -156,74 +152,6 @@ func (st *stagedSyncTester) InsertChain(_ context.Context, blocks types.Blocks)
panic("")
}
-// InsertHeaderChainStaged is part of the implementation of BlockChain interface defined in downloader.go
-func (st *stagedSyncTester) InsertHeaderChainStaged(headers []*types.Header, checkFreq int) (int, bool, uint64, error) {
- st.lock.Lock()
- defer st.lock.Unlock()
-
- if rawdb.ReadHeaderNumber(st.db, headers[0].ParentHash) == nil {
- return 0, false, 0, errors.New("unknown parent")
- }
- for i := 1; i < len(headers); i++ {
- if headers[i].ParentHash != headers[i-1].Hash() {
- return i, false, 0, errors.New("unknown parent")
- }
- }
- var newCanonical bool
- var lowestCanonicalNumber uint64
- // Do a full insert if pre-checks passed
- for i, header := range headers {
- if rawdb.ReadHeaderNumber(st.db, header.Hash()) != nil {
- continue
- }
- if rawdb.ReadHeaderNumber(st.db, header.ParentHash) == nil {
- return i, newCanonical, lowestCanonicalNumber, fmt.Errorf("unknown parent %x", header.ParentHash)
- }
- number := header.Number.Uint64()
- ptd := rawdb.ReadTd(st.db, header.ParentHash, number-1)
- externTd := ptd.Add(ptd, header.Difficulty)
- localTd := rawdb.ReadTd(st.db, st.currentHeader.Hash(), st.currentHeader.Number.Uint64())
- if externTd.Cmp(localTd) > 0 {
- batch := st.db.NewBatch()
- // Delete any canonical number assignments above the new head
- for i := number + 1; ; i++ {
- hash := rawdb.ReadCanonicalHash(st.db, i)
- if hash == (common.Hash{}) {
- break
- }
- rawdb.DeleteCanonicalHash(batch, i)
- }
- // Overwrite any stale canonical number assignments
- var (
- headHash = header.ParentHash
- headNumber = number - 1
- headHeader = rawdb.ReadHeader(st.db, headHash, headNumber)
- )
- for rawdb.ReadCanonicalHash(st.db, headNumber) != headHash {
- rawdb.WriteCanonicalHash(batch, headHash, headNumber)
-
- headHash = headHeader.ParentHash
- headNumber--
- headHeader = rawdb.ReadHeader(st.db, headHash, headNumber)
- }
- if _, err := batch.Commit(); err != nil {
- return i, newCanonical, lowestCanonicalNumber, fmt.Errorf("write header markers into disk: %v", err)
- }
- // Last step update all in-memory head header markers
- st.currentHeader = types.CopyHeader(header)
- if !newCanonical || number < lowestCanonicalNumber {
- lowestCanonicalNumber = number
- newCanonical = true
- }
- }
- rawdb.WriteTd(st.db, header.Hash(), header.Number.Uint64(), externTd)
- rawdb.WriteHeader(context.Background(), st.db, header)
- rawdb.WriteCanonicalHash(st.db, header.Hash(), header.Number.Uint64())
- st.currentHeader = header
- }
- return len(headers), newCanonical, lowestCanonicalNumber, nil
-}
-
// InsertHeaderChain is part of the implementation of BlockChain interface defined in downloader.go
func (st *stagedSyncTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
panic("")
@@ -324,7 +252,29 @@ func (stp *stagedSyncTesterPeer) RequestReceipts(hashes []common.Hash) error {
panic("")
}
+func TestStagedBase(t *testing.T) {
+ core.UsePlainStateExecution = true // Stage5 unwinds do not support hashed state
+ // Same as testChainForkLightA but much shorter
+ //testChainBasePlus1 := testChainBase.makeFork(1, false, 1)
+ log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
+ tester := newStagedSyncTester()
+ if err := tester.newPeer("peer", 65, testChainBase); err != nil {
+ t.Fatal(err)
+ }
+ if err := tester.sync("peer", nil); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestCompareChains(t *testing.T) {
+ for i := 8388; i < 8395; i++ {
+ fmt.Printf("testChainForkLightA[%d]=%x\n", i, testChainForkLightA.chain[i])
+ fmt.Printf("testChainForkHeavy[%d]=%x\n", i, testChainForkHeavy.chain[i])
+ }
+}
+
func TestUnwind(t *testing.T) {
+ core.UsePlainStateExecution = true // Stage5 unwinds do not support hashed state
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
tester := newStagedSyncTester()
if err := tester.newPeer("peer", 65, testChainForkLightA); err != nil {
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 24658092342c285ad8d0f60dde337b1bedef8745..7a40b3ac0d94f4b62d4d08164ccea4c7a7502d6e 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -243,10 +243,6 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
return dl.ownChainTd[hash]
}
-func (dl *downloadTester) InsertHeaderChainStaged([]*types.Header, int) (int, bool, uint64, error) {
- return 0, false, 0, nil
-}
-
// InsertHeaderChain injects a new batch of headers into the simulated chain.
func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
dl.lock.Lock()
diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go
index 7f40764b3eee84d5f74487e1f5062d7f3e61728a..ded6af5344b539c52dc59bb41db7d3dbbe41a395 100644
--- a/eth/downloader/testchain_test.go
+++ b/eth/downloader/testchain_test.go
@@ -55,7 +55,7 @@ func TestMain(m *testing.M) {
wg.Add(3)
go func() { testChainForkLightA = testChainBase.makeFork(forkLen, false, 1); wg.Done() }()
go func() { testChainForkLightB = testChainBase.makeFork(forkLen, false, 2); wg.Done() }()
- go func() { testChainForkHeavy = testChainBase.makeFork(forkLen, true, 3); wg.Done() }()
+ go func() { testChainForkHeavy = testChainBase.makeFork(forkLen+1, true, 3); wg.Done() }()
wg.Wait()
result := m.Run()
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index 688b546af6aa25b66ec2639f10806f82807b8abc..14fabee2c01a5d6c5bec1e2cf606cd03db6573e7 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -77,14 +77,9 @@ func (l *progressLogger) Stop() {
}
func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain BlockChain, limit uint64, quit chan struct{}, dests vm.Cache, writeReceipts bool) error {
- lastProcessedBlockNumber := s.BlockNumber
-
- nextBlockNumber := uint64(0)
-
- atomic.StoreUint64(&nextBlockNumber, lastProcessedBlockNumber+1)
- profileNumber := atomic.LoadUint64(&nextBlockNumber)
+ nextBlockNumber := s.BlockNumber
if prof {
- f, err := os.Create(fmt.Sprintf("cpu-%d.prof", profileNumber))
+ f, err := os.Create(fmt.Sprintf("cpu-%d.prof", s.BlockNumber))
if err != nil {
log.Error("could not create CPU profile", "error", err)
return err
@@ -109,20 +104,23 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain B
chainConfig := blockchain.Config()
engine := blockchain.Engine()
vmConfig := blockchain.GetVMConfig()
+ log.Info("Attempting to start execution from", "block", atomic.LoadUint64(&nextBlockNumber)+1)
for {
if err := common.Stopped(quit); err != nil {
return err
}
- blockNum := atomic.LoadUint64(&nextBlockNumber)
+ blockNum := atomic.LoadUint64(&nextBlockNumber) + 1
if limit > 0 && blockNum >= limit {
break
}
- block := blockchain.GetBlockByNumber(blockNum)
+ blockHash := rawdb.ReadCanonicalHash(stateDB, blockNum)
+ block := rawdb.ReadBlock(stateDB, blockHash, blockNum)
if block == nil {
break
}
+ atomic.StoreUint64(&nextBlockNumber, blockNum)
type cacheSetter interface {
SetAccountCache(cache *fastcache.Cache)
@@ -166,13 +164,10 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain B
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
}
- if err = s.Update(batch, blockNum); err != nil {
- return err
- }
-
- atomic.AddUint64(&nextBlockNumber, 1)
-
if batch.BatchSize() >= stateDB.IdealBatchSize() {
+ if err = s.Update(batch, blockNum); err != nil {
+ return err
+ }
start := time.Now()
if _, err = batch.Commit(); err != nil {
return err
@@ -181,12 +176,12 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain B
}
if prof {
- if blockNum-profileNumber == 100000 {
+ if blockNum-s.BlockNumber == 100000 {
// Flush the CPU profiler
pprof.StopCPUProfile()
// And the memory profiler
- f, _ := os.Create(fmt.Sprintf("mem-%d.prof", profileNumber))
+ f, _ := os.Create(fmt.Sprintf("mem-%d.prof", s.BlockNumber))
defer f.Close()
runtime.GC()
if err = pprof.WriteHeapProfile(f); err != nil {
@@ -196,10 +191,13 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, blockchain B
}
}
- _, err := batch.Commit()
- if err != nil {
+ if err := s.Update(batch, atomic.LoadUint64(&nextBlockNumber)); err != nil {
+ return err
+ }
+ if _, err := batch.Commit(); err != nil {
return fmt.Errorf("sync Execute: failed to write batch commit: %v", err)
}
+ log.Info("Completed on", "block", atomic.LoadUint64(&nextBlockNumber))
s.Done()
return nil
}
@@ -277,7 +275,6 @@ func unwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database)
if err != nil {
return fmt.Errorf("unwind Execute: failed to write db commit: %v", err)
}
-
return nil
}
diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go
index 2883021a665ba55ea695457f01b4b7cd08f86a87..790ff9cfaeb54cb0d94e58d271d61ec5cfe4ae2d 100644
--- a/eth/stagedsync/stage_hashstate.go
+++ b/eth/stagedsync/stage_hashstate.go
@@ -49,14 +49,17 @@ func SpawnHashStateStage(s *StageState, stateDB ethdb.Database, datadir string,
return err
}
}
+ if err := verifyRootHash(stateDB, syncHeadNumber); err != nil {
+ return err
+ }
+ return s.DoneAndUpdate(stateDB, syncHeadNumber)
+}
+func verifyRootHash(stateDB ethdb.Database, syncHeadNumber uint64) error {
hash := rawdb.ReadCanonicalHash(stateDB, syncHeadNumber)
- syncHeadBlock := rawdb.ReadBlock(stateDB, hash, syncHeadNumber)
-
- blockNr := syncHeadBlock.Header().Number.Uint64()
-
- log.Info("Validating root hash", "block", blockNr, "blockRoot", syncHeadBlock.Root().Hex())
- loader := trie.NewSubTrieLoader(blockNr)
+ syncHeadHeader := rawdb.ReadHeader(stateDB, hash, syncHeadNumber)
+ log.Info("Validating root hash", "block", syncHeadNumber, "blockRoot", syncHeadHeader.Root.Hex())
+ loader := trie.NewSubTrieLoader(syncHeadNumber)
rl := trie.NewRetainList(0)
subTries, err1 := loader.LoadFromFlatDB(stateDB, rl, nil /*HashCollector*/, [][]byte{nil}, []int{0}, false)
if err1 != nil {
@@ -65,14 +68,26 @@ func SpawnHashStateStage(s *StageState, stateDB ethdb.Database, datadir string,
if len(subTries.Hashes) != 1 {
return fmt.Errorf("expected 1 hash, got %d", len(subTries.Hashes))
}
- if subTries.Hashes[0] != syncHeadBlock.Root() {
- return fmt.Errorf("wrong trie root: %x, expected (from header): %x", subTries.Hashes[0], syncHeadBlock.Root())
+ if subTries.Hashes[0] != syncHeadHeader.Root {
+ return fmt.Errorf("wrong trie root: %x, expected (from header): %x", subTries.Hashes[0], syncHeadHeader.Root)
}
-
- return s.DoneAndUpdate(stateDB, blockNr)
+ return nil
}
func unwindHashStateStage(u *UnwindState, s *StageState, stateDB ethdb.Database, datadir string, quit chan struct{}) error {
+ if err := unwindHashStateStageImpl(u, s, stateDB, datadir, quit); err != nil {
+ return err
+ }
+ if err := verifyRootHash(stateDB, u.UnwindPoint); err != nil {
+ return err
+ }
+ if err := u.Done(stateDB); err != nil {
+ return fmt.Errorf("unwind HashState: reset: %v", err)
+ }
+ return nil
+}
+
+func unwindHashStateStageImpl(u *UnwindState, s *StageState, stateDB ethdb.Database, datadir string, quit chan struct{}) error {
// Currently it does not require unwinding because it does not create any Intemediate Hash records
// and recomputes the state root from scratch
prom := NewPromoter(stateDB, quit)
@@ -83,9 +98,6 @@ func unwindHashStateStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
if err := prom.Unwind(s.BlockNumber, u.UnwindPoint, dbutils.PlainStorageChangeSetBucket); err != nil {
return err
}
- if err := u.Done(stateDB); err != nil {
- return fmt.Errorf("unwind HashCheck: reset: %v", err)
- }
return nil
}
@@ -217,6 +229,20 @@ var promoterMapper = map[string]struct {
KeySize: common.AddressLength + common.IncarnationLength + common.HashLength,
Template: "st-prom-",
},
+ string(dbutils.AccountChangeSetBucket): {
+ WalkerAdapter: func(v []byte) changeset.Walker {
+ return changeset.AccountChangeSetBytes(v)
+ },
+ KeySize: common.HashLength,
+ Template: "acc-prom-",
+ },
+ string(dbutils.StorageChangeSetBucket): {
+ WalkerAdapter: func(v []byte) changeset.Walker {
+ return changeset.StorageChangeSetBytes(v)
+ },
+ KeySize: common.HashLength + common.IncarnationLength + common.HashLength,
+ Template: "st-prom-",
+ },
}
func (p *Promoter) fillChangeSetBuffer(bucket []byte, blockNum, to uint64, changesets []byte, offsets []int) (bool, uint64, []int, error) {
diff --git a/eth/stagedsync/stage_hashstate_test.go b/eth/stagedsync/stage_hashstate_test.go
index 217ab73d3247400d7016d979b5f4a9d82ecff0e9..90ee6107e9c2d0ebacffef506a774d3fb1832063 100644
--- a/eth/stagedsync/stage_hashstate_test.go
+++ b/eth/stagedsync/stage_hashstate_test.go
@@ -112,7 +112,7 @@ func TestUnwindHashed(t *testing.T) {
}
u := &UnwindState{UnwindPoint: 50}
s := &StageState{BlockNumber: 100}
- err = unwindHashStateStage(u, s, db2, getDataDir(), nil)
+ err = unwindHashStateStageImpl(u, s, db2, getDataDir(), nil)
if err != nil {
t.Errorf("error while unwind state: %v", err)
}
diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go
index 6ebddeb00b34336c6546b670f52f89e0060eaf9e..765f8d5a4a07d6e8879926c261a7ffc4d6d2802d 100644
--- a/eth/stagedsync/stage_headers.go
+++ b/eth/stagedsync/stage_headers.go
@@ -1,10 +1,21 @@
package stagedsync
import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/ledgerwatch/turbo-geth/common"
+ "github.com/ledgerwatch/turbo-geth/consensus"
+ "github.com/ledgerwatch/turbo-geth/core/rawdb"
+ "github.com/ledgerwatch/turbo-geth/core/types"
+ "github.com/ledgerwatch/turbo-geth/ethdb"
+ "github.com/ledgerwatch/turbo-geth/log"
+ "github.com/ledgerwatch/turbo-geth/params"
+ "math/big"
+ mrand "math/rand"
"os"
"runtime/pprof"
-
- "github.com/ledgerwatch/turbo-geth/log"
+ "time"
)
func SpawnHeaderDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, headersFetchers []func() error) error {
@@ -28,3 +39,161 @@ func SpawnHeaderDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, heade
}
return err
}
+
+// Implements consensus.ChainReader
+type ChainReader struct {
+ config *params.ChainConfig
+ db ethdb.Database
+}
+
+// Config retrieves the blockchain's chain configuration.
+func (cr ChainReader) Config() *params.ChainConfig {
+ return cr.config
+}
+
+// CurrentHeader retrieves the current header from the local chain.
+func (cr ChainReader) CurrentHeader() *types.Header {
+ hash := rawdb.ReadHeadHeaderHash(cr.db)
+ number := rawdb.ReadHeaderNumber(cr.db, hash)
+ return rawdb.ReadHeader(cr.db, hash, *number)
+}
+
+// GetHeader retrieves a block header from the database by hash and number.
+func (cr ChainReader) GetHeader(hash common.Hash, number uint64) *types.Header {
+ return rawdb.ReadHeader(cr.db, hash, number)
+}
+
+// GetHeaderByNumber retrieves a block header from the database by number.
+func (cr ChainReader) GetHeaderByNumber(number uint64) *types.Header {
+ hash := rawdb.ReadCanonicalHash(cr.db, number)
+ return rawdb.ReadHeader(cr.db, hash, number)
+}
+
+// GetHeaderByHash retrieves a block header from the database by its hash.
+func (cr ChainReader) GetHeaderByHash(hash common.Hash) *types.Header {
+ number := rawdb.ReadHeaderNumber(cr.db, hash)
+ return rawdb.ReadHeader(cr.db, hash, *number)
+}
+
+// GetBlock retrieves a block from the database by hash and number.
+func (cr ChainReader) GetBlock(hash common.Hash, number uint64) *types.Block {
+ return rawdb.ReadBlock(cr.db, hash, number)
+}
+
+func InsertHeaderChain(db ethdb.Database, headers []*types.Header, config *params.ChainConfig, engine consensus.Engine, checkFreq int) (int, bool, uint64, error) {
+ start := time.Now()
+ if rawdb.ReadHeaderNumber(db, headers[0].ParentHash) == nil {
+ return 0, false, 0, errors.New("unknown parent")
+ }
+ parentTd := rawdb.ReadTd(db, headers[0].ParentHash, headers[0].Number.Uint64()-1)
+ externTd := new(big.Int).Set(parentTd)
+ for i, header := range headers {
+ if i > 0 {
+ if header.ParentHash != headers[i-1].Hash() {
+ return i, false, 0, errors.New("unknown parent")
+ }
+ }
+ externTd = externTd.Add(externTd, header.Difficulty)
+ }
+ // Generate the list of seal verification requests, and start the parallel verifier
+ seals := make([]bool, len(headers))
+ if checkFreq != 0 {
+ // In case of checkFreq == 0 all seals are left false.
+ for i := 0; i < len(seals)/checkFreq; i++ {
+ index := i * checkFreq
+ if index >= len(seals) {
+ index = len(seals) - 1
+ }
+ seals[index] = true
+ }
+ // Last should always be verified to avoid junk.
+ seals[len(seals)-1] = true
+ }
+
+ abort, results := engine.VerifyHeaders(ChainReader{config, db}, headers, seals)
+ defer close(abort)
+
+ // Iterate over the headers and ensure they all check out
+ for i := range headers {
+ // Otherwise wait for headers checks and ensure they pass
+ if err := <-results; err != nil {
+ return i, false, 0, err
+ }
+ }
+ headHash := rawdb.ReadHeadHeaderHash(db)
+ headNumber := rawdb.ReadHeaderNumber(db, headHash)
+ localTd := rawdb.ReadTd(db, headHash, *headNumber)
+ lastHeader := headers[len(headers)-1]
+ // If the total difficulty is higher than our known, add it to the canonical chain
+ // Second clause in the if statement reduces the vulnerability to selfish mining.
+ // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
+ reorg := externTd.Cmp(localTd) > 0
+ if !reorg && externTd.Cmp(localTd) == 0 {
+ if lastHeader.Number.Uint64() < *headNumber {
+ reorg = true
+ } else if lastHeader.Number.Uint64() == *headNumber {
+ reorg = mrand.Float64() < 0.5
+ }
+ }
+
+ var deepFork bool // Whether the forkBlock is outside this header chain segment
+ if reorg && headers[0].ParentHash != rawdb.ReadCanonicalHash(db, headers[0].Number.Uint64()-1) {
+ deepFork = true
+ }
+ var forkBlockNumber uint64
+ ignored := 0
+ batch := db.NewBatch()
+ // Do a full insert if pre-checks passed
+ td := new(big.Int).Set(parentTd)
+ for _, header := range headers {
+ if rawdb.ReadHeaderNumber(batch, header.Hash()) != nil {
+ ignored++
+ continue
+ }
+ number := header.Number.Uint64()
+ if reorg && !deepFork && forkBlockNumber == 0 && header.Hash() != rawdb.ReadCanonicalHash(batch, number) {
+ forkBlockNumber = number - 1
+ }
+ if reorg {
+ rawdb.WriteCanonicalHash(batch, header.Hash(), header.Number.Uint64())
+ }
+ td = td.Add(td, header.Difficulty)
+ rawdb.WriteTd(batch, header.Hash(), header.Number.Uint64(), td)
+ rawdb.WriteHeader(context.Background(), batch, header)
+ }
+ if deepFork {
+ forkHeader := rawdb.ReadHeader(batch, headers[0].ParentHash, headers[0].Number.Uint64()-1)
+ forkBlockNumber = forkHeader.Number.Uint64() - 1
+ forkHash := forkHeader.ParentHash
+ for forkHash != rawdb.ReadCanonicalHash(batch, forkBlockNumber) {
+ rawdb.WriteCanonicalHash(batch, forkHash, forkBlockNumber)
+ forkHeader = rawdb.ReadHeader(batch, forkHash, forkBlockNumber)
+ forkBlockNumber = forkHeader.Number.Uint64() - 1
+ forkHash = forkHeader.ParentHash
+ }
+ rawdb.WriteCanonicalHash(batch, headers[0].ParentHash, headers[0].Number.Uint64()-1)
+ }
+ if reorg {
+ // Delete any canonical number assignments above the new head
+ for i := lastHeader.Number.Uint64() + 1; i <= *headNumber; i++ {
+ rawdb.DeleteCanonicalHash(batch, i)
+ }
+ rawdb.WriteHeadHeaderHash(batch, lastHeader.Hash())
+ }
+ if _, err := batch.Commit(); err != nil {
+ return 0, false, 0, fmt.Errorf("write header markers into disk: %w", err)
+ }
+ // Report some public statistics so the user has a clue what's going on
+ ctx := []interface{}{
+ "count", len(headers), "elapsed", common.PrettyDuration(time.Since(start)),
+ "number", lastHeader.Number, "hash", lastHeader.Hash(),
+ }
+ if timestamp := time.Unix(int64(lastHeader.Time), 0); time.Since(timestamp) > time.Minute {
+ ctx = append(ctx, []interface{}{"age", common.PrettyAge(timestamp)}...)
+ }
+ if ignored > 0 {
+ ctx = append(ctx, []interface{}{"ignored", ignored}...)
+ }
+ log.Info("Imported new block headers", ctx...)
+ return len(headers), reorg, forkBlockNumber, nil
+}
diff --git a/eth/stagedsync/stage_indexes.go b/eth/stagedsync/stage_indexes.go
index a9b678c5b236cca35aef0eb30b9530652b68d795..fd274c8a6b678605c380f9e70e9c17842c729000 100644
--- a/eth/stagedsync/stage_indexes.go
+++ b/eth/stagedsync/stage_indexes.go
@@ -1,6 +1,7 @@
package stagedsync
import (
+ "fmt"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/ethdb"
@@ -29,7 +30,7 @@ func spawnAccountHistoryIndex(s *StageState, db ethdb.Database, datadir string,
return err
}
- return s.DoneAndUpdate(db, blockNum)
+ return s.DoneAndUpdate(db, endBlock)
}
func spawnStorageHistoryIndex(s *StageState, db ethdb.Database, datadir string, plainState bool, quitCh chan struct{}) error {
@@ -44,7 +45,6 @@ func spawnStorageHistoryIndex(s *StageState, db ethdb.Database, datadir string,
if err != nil {
log.Warn("Execution block error is empty")
}
-
if plainState {
err = ig.GenerateIndex(blockNum, endBlock, dbutils.PlainStorageChangeSetBucket)
} else {
@@ -54,21 +54,39 @@ func spawnStorageHistoryIndex(s *StageState, db ethdb.Database, datadir string,
return err
}
- return s.DoneAndUpdate(db, blockNum)
+ return s.DoneAndUpdate(db, endBlock)
}
-func unwindAccountHistoryIndex(unwindPoint uint64, db ethdb.Database, plainState bool, quitCh chan struct{}) error {
+func unwindAccountHistoryIndex(u *UnwindState, db ethdb.Database, plainState bool, quitCh chan struct{}) error {
ig := core.NewIndexGenerator(db, quitCh)
if plainState {
- return ig.Truncate(unwindPoint, dbutils.PlainAccountChangeSetBucket)
+ if err := ig.Truncate(u.UnwindPoint, dbutils.PlainAccountChangeSetBucket); err != nil {
+ return err
+ }
+ } else {
+ if err := ig.Truncate(u.UnwindPoint, dbutils.AccountChangeSetBucket); err != nil {
+ return err
+ }
+ }
+ if err := u.Done(db); err != nil {
+ return fmt.Errorf("unwind AccountHistorytIndex: %w", err)
}
- return ig.Truncate(unwindPoint, dbutils.AccountChangeSetBucket)
+ return nil
}
-func unwindStorageHistoryIndex(unwindPoint uint64, db ethdb.Database, plainState bool, quitCh chan struct{}) error {
+func unwindStorageHistoryIndex(u *UnwindState, db ethdb.Database, plainState bool, quitCh chan struct{}) error {
ig := core.NewIndexGenerator(db, quitCh)
if plainState {
- return ig.Truncate(unwindPoint, dbutils.PlainStorageChangeSetBucket)
+ if err := ig.Truncate(u.UnwindPoint, dbutils.PlainStorageChangeSetBucket); err != nil {
+ return err
+ }
+ } else {
+ if err := ig.Truncate(u.UnwindPoint, dbutils.StorageChangeSetBucket); err != nil {
+ return err
+ }
+ }
+ if err := u.Done(db); err != nil {
+ return fmt.Errorf("unwind StorageHistorytIndex: %w", err)
}
- return ig.Truncate(unwindPoint, dbutils.StorageChangeSetBucket)
+ return nil
}
diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go
index 73def2bb4ed2a669eb84cd0eca67b0775fc0b903..6f4f48d7a8fa7dde917bae391b9076bcfc06e16b 100644
--- a/eth/stagedsync/stage_senders.go
+++ b/eth/stagedsync/stage_senders.go
@@ -51,8 +51,7 @@ func spawnRecoverSendersStage(s *StageState, stateDB ethdb.Database, config *par
defer pprof.StopCPUProfile()
}
- lastProcessedBlockNumber := s.BlockNumber
- nextBlockNumber := lastProcessedBlockNumber + 1
+ nextBlockNumber := s.BlockNumber
mutation := stateDB.NewBatch()
defer func() {
@@ -91,23 +90,22 @@ func spawnRecoverSendersStage(s *StageState, stateDB ethdb.Database, config *par
written := 0
for i := 0; i < batchSize; i++ {
- hash := rawdb.ReadCanonicalHash(mutation, nextBlockNumber)
+ hash := rawdb.ReadCanonicalHash(mutation, nextBlockNumber+1)
if hash == emptyHash {
needExit = true
break
}
- body := rawdb.ReadBody(mutation, hash, nextBlockNumber)
+ body := rawdb.ReadBody(mutation, hash, nextBlockNumber+1)
if body == nil {
needExit = true
break
}
+ nextBlockNumber++
blockNumber.SetUint64(nextBlockNumber)
s := types.MakeSigner(config, &blockNumber)
jobs <- &senderRecoveryJob{s, body, hash, nextBlockNumber, nil}
written++
-
- nextBlockNumber++
}
for i := 0; i < written; i++ {
diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go
index 6ce22c340a8416d55525a495c451e99365e0ec55..85c5db809971bf7c88e56691fcd6f0bc28f403cc 100644
--- a/eth/stagedsync/stage_txlookup.go
+++ b/eth/stagedsync/stage_txlookup.go
@@ -1,13 +1,18 @@
package stagedsync
import (
+ "bytes"
"encoding/binary"
+ "fmt"
+ "github.com/golang/snappy"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
+ "github.com/ledgerwatch/turbo-geth/common/debug"
"github.com/ledgerwatch/turbo-geth/common/etl"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
+ "github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
- "github.com/ledgerwatch/turbo-geth/log"
+ "github.com/ledgerwatch/turbo-geth/rlp"
"math/big"
"runtime"
)
@@ -32,7 +37,7 @@ func spawnTxLookup(s *StageState, db ethdb.Database, dataDir string, quitCh chan
return err
}
- return s.DoneAndUpdate(db, blockNum)
+ return s.DoneAndUpdate(db, syncHeadNumber)
}
func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh chan struct{}, datadir string, chunks [][]byte) error {
@@ -44,15 +49,12 @@ func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh chan s
blockHash := common.BytesToHash(v)
body := rawdb.ReadBody(db, blockHash, blocknum)
if body == nil {
- log.Error("empty body", "blocknum", blocknum, "hash", common.BytesToHash(v))
- return nil
- //return fmt.Errorf("empty block %v", blocknum)
+ return fmt.Errorf("tx lookup generation, empty block body %d, hash %x", blocknum, v)
}
blockNumBytes := new(big.Int).SetUint64(blocknum).Bytes()
for _, tx := range body.Transactions {
- err := next(k, tx.Hash().Bytes(), blockNumBytes)
- if err != nil {
+ if err := next(k, tx.Hash().Bytes(), blockNumBytes); err != nil {
return err
}
}
@@ -65,35 +67,42 @@ func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh chan s
})
}
-func unwindTxLookup(unwindPoint uint64, db ethdb.Database, quitCh chan struct{}) error {
- var blocksToRemove [][]byte
- err := db.Walk(dbutils.HeaderHashKey(unwindPoint), dbutils.HeaderHashKey(unwindPoint), 0, func(k, v []byte) (b bool, e error) {
+func unwindTxLookup(u *UnwindState, db ethdb.Database, quitCh chan struct{}) error {
+ var txsToRemove [][]byte
+ // Remove lookup entries for all blocks above unwindPoint
+ if err := db.Walk(dbutils.BlockBodyPrefix, dbutils.EncodeBlockNumber(u.UnwindPoint+1), 0, func(k, v []byte) (b bool, e error) {
if err := common.Stopped(quitCh); err != nil {
return false, err
}
- if !dbutils.CheckCanonicalKey(k) {
- return true, nil
+ data := v
+ if debug.IsBlockCompressionEnabled() && len(data) > 0 {
+ var err1 error
+ data, err1 = snappy.Decode(nil, v)
+ if err1 != nil {
+ return false, fmt.Errorf("unwindTxLookup, snappy err: %w", err1)
+ }
}
- blocknum := binary.BigEndian.Uint64(k)
- body := rawdb.ReadBody(db, common.BytesToHash(v), blocknum)
- if body == nil {
- log.Error("empty body", "blocknum", blocknum, "hash", common.BytesToHash(v))
- return true, nil
+ body := new(types.Body)
+ if err := rlp.Decode(bytes.NewReader(data), body); err != nil {
+ return false, fmt.Errorf("unwindTxLookup, rlp decode err: %w", err)
}
for _, tx := range body.Transactions {
- blocksToRemove = append(blocksToRemove, tx.Hash().Bytes())
+ txsToRemove = append(txsToRemove, tx.Hash().Bytes())
}
return true, nil
- })
- if err != nil {
+ }); err != nil {
return err
}
- for _, v := range blocksToRemove {
- if err = db.Delete(dbutils.TxLookupPrefix, v); err != nil {
+ // TODO: Do it in a batcn and update the progress
+ for _, v := range txsToRemove {
+ if err := db.Delete(dbutils.TxLookupPrefix, v); err != nil {
return err
}
}
+ if err := u.Done(db); err != nil {
+ return fmt.Errorf("unwind TxLookup: %w", err)
+ }
return nil
}
diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go
index 72d038b8806b467758bfed8bec7283e7e32b2b9d..3c92812b9782f5bc14de7d6395d1b5f9bab1bee8 100644
--- a/eth/stagedsync/stagedsync.go
+++ b/eth/stagedsync/stagedsync.go
@@ -93,7 +93,7 @@ func PrepareStagedSync(
return spawnAccountHistoryIndex(s, stateDB, datadir, core.UsePlainStateExecution, quitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
- return unwindAccountHistoryIndex(u.UnwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
+ return unwindAccountHistoryIndex(u, stateDB, core.UsePlainStateExecution, quitCh)
},
},
{
@@ -105,7 +105,7 @@ func PrepareStagedSync(
return spawnStorageHistoryIndex(s, stateDB, datadir, core.UsePlainStateExecution, quitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
- return unwindStorageHistoryIndex(u.UnwindPoint, stateDB, core.UsePlainStateExecution, quitCh)
+ return unwindStorageHistoryIndex(u, stateDB, core.UsePlainStateExecution, quitCh)
},
},
{
@@ -117,7 +117,7 @@ func PrepareStagedSync(
return spawnTxLookup(s, stateDB, datadir, quitCh)
},
UnwindFunc: func(u *UnwindState, s *StageState) error {
- return unwindTxLookup(u.UnwindPoint, stateDB, quitCh)
+ return unwindTxLookup(u, stateDB, quitCh)
},
},
}