From 12cbf4d6fa2d2bac7542eea3ba72693df6716794 Mon Sep 17 00:00:00 2001 From: ledgerwatch <akhounov@gmail.com> Date: Wed, 1 Sep 2021 22:21:57 +0100 Subject: [PATCH] Ropsten to find correct chain (#2614) * Ropsten to find correct chain * Cleanup and compile fix * Compile fix * Print duration of the unwind, not the timestamp Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> --- cmd/integration/commands/state_stages.go | 8 +-- eth/stagedsync/default_stages.go | 41 +++++++------- eth/stagedsync/stage.go | 2 +- eth/stagedsync/stage_bodies.go | 11 ++++ eth/stagedsync/stage_headers.go | 69 ++++++++++++++---------- eth/stagedsync/stage_interhashes.go | 5 +- eth/stagedsync/stagebuilder.go | 10 ++-- eth/stagedsync/sync.go | 12 +++-- eth/stagedsync/sync_test.go | 58 ++++++++++---------- 9 files changed, 123 insertions(+), 93 deletions(-) diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index d98d4293c3..6428486b46 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -186,8 +186,8 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context. execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, tmpDir) - execUntilFunc := func(execToBlock uint64) func(firstCycle bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { - return func(firstCycle bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { + execUntilFunc := func(execToBlock uint64) func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { + return func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { if err := stagedsync.SpawnExecuteBlocksStage(s, unwinder, tx, execToBlock, ctx, execCfg, firstCycle); err != nil { return fmt.Errorf("spawnExecuteBlocksStage: %w", err) } @@ -310,7 +310,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context. if miner.MiningConfig.Enabled && nextBlock != nil && nextBlock.Header().Coinbase != (common.Address{}) { miner.MiningConfig.Etherbase = nextBlock.Header().Coinbase miner.MiningConfig.ExtraData = nextBlock.Header().Extra - miningStages.MockExecFunc(stages.MiningCreateBlock, func(firstCycle bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx) error { + miningStages.MockExecFunc(stages.MiningCreateBlock, func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx) error { err = stagedsync.SpawnMiningCreateBlockStage(s, tx, stagedsync.StageMiningCreateBlockCfg(db, miner, @@ -501,7 +501,7 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error { cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpDBPath) // set block limit of execute stage - sync.MockExecFunc(stages.Execution, func(firstCycle bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { + sync.MockExecFunc(stages.Execution, func(firstCycle bool, badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx kv.RwTx) error { if err = stagedsync.SpawnExecuteBlocksStage(stageState, sync, tx, to, ctx, cfg, false); err != nil { return fmt.Errorf("spawnExecuteBlocksStage: %w", err) } diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index ce8876d75d..a4d3272319 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -33,11 +33,14 @@ func DefaultStages(ctx context.Context, { ID: stages.Headers, Description: "Download headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { + if badBlockUnwind { + return nil + } return HeadersForward(s, u, ctx, tx, headers, firstCycle, test) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { - return HeadersUnwind(u, s, tx, headers) + return HeadersUnwind(u, s, tx, headers, test) }, Prune: func(firstCycle bool, p *PruneState, tx kv.RwTx) error { return HeadersPrune(p, tx, headers, ctx) @@ -46,7 +49,7 @@ func DefaultStages(ctx context.Context, { ID: stages.BlockHashes, Description: "Write block hashes", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnBlockHashStage(s, tx, blockHashCfg, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -61,7 +64,7 @@ func DefaultStages(ctx context.Context, Description: "Create headers snapshot", Disabled: true, DisabledDescription: "Enable by --snapshot.layout", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnHeadersSnapshotGenerationStage(s, tx, snapshotHeaders, firstCycle, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -74,7 +77,7 @@ func DefaultStages(ctx context.Context, { ID: stages.Bodies, Description: "Download block bodies", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return BodiesForward(s, u, ctx, tx, bodies, test) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -89,7 +92,7 @@ func DefaultStages(ctx context.Context, Description: "Create bodies snapshot", Disabled: true, DisabledDescription: "Enable by --snapshot.layout", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnBodiesSnapshotGenerationStage(s, tx, snapshotBodies, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -102,7 +105,7 @@ func DefaultStages(ctx context.Context, { ID: stages.Senders, Description: "Recover senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnRecoverSendersStage(senders, s, u, tx, 0, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -115,7 +118,7 @@ func DefaultStages(ctx context.Context, { ID: stages.Execution, Description: "Execute blocks w/o hash checks", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnExecuteBlocksStage(s, u, tx, 0, ctx, exec, firstCycle) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -130,7 +133,7 @@ func DefaultStages(ctx context.Context, Description: "Transpile marked EVM contracts to TEVM", Disabled: !sm.Experiments.TEVM, DisabledDescription: "Enable by adding `tevm` to --experiments", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnTranspileStage(s, tx, 0, trans, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -145,7 +148,7 @@ func DefaultStages(ctx context.Context, Description: "Create state snapshot", Disabled: true, DisabledDescription: "Enable by --snapshot.layout", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnStateSnapshotGenerationStage(s, tx, snapshotState, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -158,7 +161,7 @@ func DefaultStages(ctx context.Context, { ID: stages.HashState, Description: "Hash the key in the state", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnHashStateStage(s, tx, hashState, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -171,7 +174,7 @@ func DefaultStages(ctx context.Context, { ID: stages.IntermediateHashes, Description: "Generate intermediate hashes and computing state root", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { _, err := SpawnIntermediateHashesStage(s, u, tx, trieCfg, ctx) return err }, @@ -186,7 +189,7 @@ func DefaultStages(ctx context.Context, ID: stages.CallTraces, Description: "Generate call traces index", DisabledDescription: "Work In Progress", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnCallTraces(s, tx, callTraces, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -199,7 +202,7 @@ func DefaultStages(ctx context.Context, { ID: stages.AccountHistoryIndex, Description: "Generate account history index", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnAccountHistoryIndex(s, tx, history, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -212,7 +215,7 @@ func DefaultStages(ctx context.Context, { ID: stages.StorageHistoryIndex, Description: "Generate storage history index", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnStorageHistoryIndex(s, tx, history, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -225,7 +228,7 @@ func DefaultStages(ctx context.Context, { ID: stages.LogIndex, Description: "Generate receipt logs index", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnLogIndex(s, tx, logIndex, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -238,7 +241,7 @@ func DefaultStages(ctx context.Context, { ID: stages.TxLookup, Description: "Generate tx lookup index", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnTxLookup(s, tx, txLookup, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -251,7 +254,7 @@ func DefaultStages(ctx context.Context, { ID: stages.TxPool, Description: "Update transaction pool", - Forward: func(firstCycle bool, s *StageState, _ Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, _ Unwinder, tx kv.RwTx) error { return SpawnTxPool(s, tx, txPool, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { @@ -264,7 +267,7 @@ func DefaultStages(ctx context.Context, { ID: stages.Finish, Description: "Final: update current block for the RPC API", - Forward: func(firstCycle bool, s *StageState, _ Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, _ Unwinder, tx kv.RwTx) error { return FinishForward(s, tx, finish) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index 20102b7c9b..7abdd1acee 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -9,7 +9,7 @@ import ( // ExecFunc is the execution function for the stage to move forward. // * state - is the current state of the stage and contains stage data. // * unwinder - if the stage needs to cause unwinding, `unwinder` methods can be used. -type ExecFunc func(firstCycle bool, s *StageState, unwinder Unwinder, tx kv.RwTx) error +type ExecFunc func(firstCycle bool, badBlockUnwind bool, s *StageState, unwinder Unwinder, tx kv.RwTx) error // UnwindFunc is the unwinding logic of the stage. // * unwindState - contains information about the unwind itself. diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 353753c0b8..24dd9e7a3d 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -94,6 +94,8 @@ func BodiesForward( var req *bodydownload.BodyRequest var peer []byte stopped := false + prevProgress := bodyProgress + noProgressCount := 0 // How many time the progress was printed without actual progress Loop: for !stopped { // TODO: this is incorrect use @@ -174,6 +176,9 @@ Loop: stopped = true break } + if s.BlockNumber > 0 && noProgressCount >= 5 { + break + } timer.Stop() timer = time.NewTimer(1 * time.Second) select { @@ -181,7 +186,13 @@ Loop: stopped = true case <-logEvery.C: deliveredCount, wastedCount := cfg.bd.DeliveryCounts() + if prevProgress == bodyProgress { + noProgressCount++ + } else { + noProgressCount = 0 // Reset, there was progress + } logProgressBodies(logPrefix, bodyProgress, prevDeliveredCount, deliveredCount, prevWastedCount, wastedCount) + prevProgress = bodyProgress prevDeliveredCount = deliveredCount prevWastedCount = wastedCount //log.Info("Timings", "d1", d1, "d2", d2, "d3", d3, "d4", d4, "d5", d5, "d6", d6) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index c941a63b5f..8e10ed0035 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -118,6 +118,7 @@ func HeadersForward( var peer []byte stopped := false prevProgress := headerProgress + noProgressCount := 0 // How many time the progress was printed without actual progress for !stopped { currentTime := uint64(time.Now().Unix()) req, penalties := cfg.hd.RequestMoreHeaders(currentTime) @@ -160,6 +161,9 @@ func HeadersForward( if len(announces) > 0 { cfg.announceNewHashes(ctx, announces) } + if s.BlockNumber > 0 && noProgressCount >= 5 { + break + } if headerInserter.BestHeaderChanged() { // We do not break unless there best header changed if !initialCycle { // if this is not an initial cycle, we need to react quickly when new headers are coming in @@ -179,6 +183,11 @@ func HeadersForward( stopped = true case <-logEvery.C: progress := cfg.hd.Progress() + if prevProgress == progress { + noProgressCount++ + } else { + noProgressCount = 0 // Reset, there was progress + } logProgressHeaders(logPrefix, prevProgress, progress) prevProgress = progress case <-timer.C: @@ -240,7 +249,7 @@ func fixCanonicalChain(logPrefix string, logEvery *time.Ticker, height uint64, h return nil } -func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg) (err error) { +func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg, test bool) (err error) { useExternalTx := tx != nil if !useExternalTx { tx, err = cfg.db.BeginRw(context.Background()) @@ -284,42 +293,44 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx kv.RwTx, cfg HeadersCfg) (e } } if badBlock { - // Find header with biggest TD - tdCursor, cErr := tx.Cursor(kv.HeaderTD) - if cErr != nil { - return cErr - } - defer tdCursor.Close() - var k, v []byte - k, v, err = tdCursor.Last() - if err != nil { - return err - } var maxTd big.Int var maxHash common.Hash var maxNum uint64 = 0 - for ; err == nil && k != nil; k, v, err = tdCursor.Prev() { - if len(k) != 40 { - return fmt.Errorf("key in TD table has to be 40 bytes long: %x", k) + if test { // If we are not in the test, we can do searching for the heaviest chain in the next cycle + // Find header with biggest TD + tdCursor, cErr := tx.Cursor(kv.HeaderTD) + if cErr != nil { + return cErr } - var hash common.Hash - copy(hash[:], k[8:]) - if cfg.hd.IsBadHeader(hash) { - continue - } - var td big.Int - if err = rlp.DecodeBytes(v, &td); err != nil { + defer tdCursor.Close() + var k, v []byte + k, v, err = tdCursor.Last() + if err != nil { return err } - if td.Cmp(&maxTd) > 0 { - maxTd.Set(&td) - copy(maxHash[:], k[8:]) - maxNum = binary.BigEndian.Uint64(k[:8]) + for ; err == nil && k != nil; k, v, err = tdCursor.Prev() { + if len(k) != 40 { + return fmt.Errorf("key in TD table has to be 40 bytes long: %x", k) + } + var hash common.Hash + copy(hash[:], k[8:]) + if cfg.hd.IsBadHeader(hash) { + continue + } + var td big.Int + if err = rlp.DecodeBytes(v, &td); err != nil { + return err + } + if td.Cmp(&maxTd) > 0 { + maxTd.Set(&td) + copy(maxHash[:], k[8:]) + maxNum = binary.BigEndian.Uint64(k[:8]) + } + } + if err != nil { + return err } } - if err != nil { - return err - } if maxNum == 0 { maxNum = u.UnwindPoint if maxHash, err = rawdb.ReadCanonicalHash(tx, maxNum); err != nil { diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index b1b6fcefe2..597967c6be 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -91,8 +91,9 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx kv.RwTx, cfg Tri if cfg.checkRoot && root != expectedRootHash { log.Error(fmt.Sprintf("[%s] Wrong trie root of block %d: %x, expected (from header): %x. Block hash: %x", logPrefix, to, root, expectedRootHash, headerHash)) if to > s.BlockNumber { - log.Warn("Unwinding due to incorrect root hash", "to", to-1) - u.UnwindTo(to-1, headerHash) + unwindTo := (to + s.BlockNumber) / 2 // Binary search for the correct block, biased to the lower numbers + log.Warn("Unwinding due to incorrect root hash", "to", unwindTo) + u.UnwindTo(unwindTo, headerHash) } } else if err = s.Update(tx, to); err != nil { return trie.EmptyRoot, err diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index 3b4a5494e5..3a2155ffcf 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -33,7 +33,7 @@ func MiningStages( { ID: stages.MiningCreateBlock, Description: "Mining: construct new block from tx pool", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnMiningCreateBlockStage(s, tx, createBlockCfg, ctx.Done()) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { return nil }, @@ -42,7 +42,7 @@ func MiningStages( { ID: stages.MiningExecution, Description: "Mining: construct new block from tx pool", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnMiningExecStage(s, tx, execCfg, ctx.Done()) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { return nil }, @@ -51,7 +51,7 @@ func MiningStages( { ID: stages.HashState, Description: "Hash the key in the state", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnHashStateStage(s, tx, hashStateCfg, ctx) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { return nil }, @@ -60,7 +60,7 @@ func MiningStages( { ID: stages.IntermediateHashes, Description: "Generate intermediate hashes and computing state root", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { stateRoot, err := SpawnIntermediateHashesStage(s, u, tx, trieCfg, ctx) if err != nil { return err @@ -74,7 +74,7 @@ func MiningStages( { ID: stages.MiningFinish, Description: "Mining: create and propagate valid block", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { return SpawnMiningFinishStage(s, tx, finish, ctx.Done()) }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx kv.RwTx) error { return nil }, diff --git a/eth/stagedsync/sync.go b/eth/stagedsync/sync.go index 96276aa068..ad4c188aad 100644 --- a/eth/stagedsync/sync.go +++ b/eth/stagedsync/sync.go @@ -192,6 +192,7 @@ func (s *Sync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error { s.prevUnwindPoint = nil s.timings = s.timings[:0] for !s.IsDone() { + var badBlockUnwind bool if s.unwindPoint != nil { for j := 0; j < len(s.unwindOrder); j++ { if s.unwindOrder[j] == nil || s.unwindOrder[j].Disabled || s.unwindOrder[j].Unwind == nil { @@ -203,6 +204,9 @@ func (s *Sync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error { } s.prevUnwindPoint = s.unwindPoint s.unwindPoint = nil + if s.badBlock != (common.Hash{}) { + badBlockUnwind = true + } s.badBlock = common.Hash{} if err := s.SetCurrentStage(s.stages[0].ID); err != nil { return err @@ -226,7 +230,7 @@ func (s *Sync) Run(db kv.RwDB, tx kv.RwTx, firstCycle bool) error { continue } - if err := s.runStage(stage, db, tx, firstCycle); err != nil { + if err := s.runStage(stage, db, tx, firstCycle, badBlockUnwind); err != nil { return err } @@ -302,14 +306,14 @@ func printLogs(tx kv.RwTx, timings []Timing) error { return nil } -func (s *Sync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstCycle bool) (err error) { +func (s *Sync) runStage(stage *Stage, db kv.RwDB, tx kv.RwTx, firstCycle bool, badBlockUnwind bool) (err error) { start := time.Now() stageState, err := s.StageState(stage.ID, tx, db) if err != nil { return err } - if err = stage.Forward(firstCycle, stageState, s, tx); err != nil { + if err = stage.Forward(firstCycle, badBlockUnwind, stageState, s, tx); err != nil { return fmt.Errorf("[%s] %w", s.LogPrefix(), err) } @@ -348,7 +352,7 @@ func (s *Sync) unwindStage(firstCycle bool, stage *Stage, db kv.RwDB, tx kv.RwTx took := time.Since(t) if took > 60*time.Second { logPrefix := s.LogPrefix() - log.Info(fmt.Sprintf("[%s] Unwind done", logPrefix), "in", t) + log.Info(fmt.Sprintf("[%s] Unwind done", logPrefix), "in", took) } s.timings = append(s.timings, Timing{isUnwind: true, stage: stage.ID, took: time.Since(t)}) return nil diff --git a/eth/stagedsync/sync_test.go b/eth/stagedsync/sync_test.go index 0b9f1e3481..2e5358d9fb 100644 --- a/eth/stagedsync/sync_test.go +++ b/eth/stagedsync/sync_test.go @@ -18,7 +18,7 @@ func TestStagesSuccess(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) return nil }, @@ -26,7 +26,7 @@ func TestStagesSuccess(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) return nil }, @@ -34,7 +34,7 @@ func TestStagesSuccess(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) return nil }, @@ -57,7 +57,7 @@ func TestDisabledStages(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) return nil }, @@ -65,7 +65,7 @@ func TestDisabledStages(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) return nil }, @@ -74,7 +74,7 @@ func TestDisabledStages(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) return nil }, @@ -98,7 +98,7 @@ func TestErroredStage(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) return nil }, @@ -106,7 +106,7 @@ func TestErroredStage(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) return expectedErr }, @@ -114,7 +114,7 @@ func TestErroredStage(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) return nil }, @@ -138,7 +138,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -153,7 +153,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.Update(tx, 1000) @@ -168,7 +168,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { if s.BlockNumber == 0 { if err := s.Update(tx, 1700); err != nil { return err @@ -190,7 +190,7 @@ func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.IntermediateHashes, Disabled: true, - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.IntermediateHashes) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -236,7 +236,7 @@ func TestUnwind(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -251,7 +251,7 @@ func TestUnwind(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -266,7 +266,7 @@ func TestUnwind(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) if !unwound { unwound = true @@ -283,7 +283,7 @@ func TestUnwind(t *testing.T) { { ID: stages.IntermediateHashes, Disabled: true, - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.IntermediateHashes) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -344,7 +344,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -359,7 +359,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -370,7 +370,7 @@ func TestUnwindEmptyUnwinder(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) if !unwound { unwound = true @@ -418,7 +418,7 @@ func TestSyncDoTwice(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) return s.Update(tx, s.BlockNumber+100) }, @@ -426,7 +426,7 @@ func TestSyncDoTwice(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) return s.Update(tx, s.BlockNumber+200) }, @@ -434,7 +434,7 @@ func TestSyncDoTwice(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) return s.Update(tx, s.BlockNumber+300) }, @@ -476,7 +476,7 @@ func TestStateSyncInterruptRestart(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) return nil }, @@ -484,7 +484,7 @@ func TestStateSyncInterruptRestart(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) return expectedErr }, @@ -492,7 +492,7 @@ func TestStateSyncInterruptRestart(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) return nil }, @@ -528,7 +528,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -543,7 +543,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.Update(tx, 2000) @@ -558,7 +558,7 @@ func TestSyncInterruptLongUnwind(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx kv.RwTx) error { + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, tx kv.RwTx) error { flow = append(flow, stages.Senders) if !unwound { unwound = true -- GitLab