From 58e22c5bc32f7243dca2d979e3d405eb50ac0fed Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Sun, 18 Jul 2021 09:23:25 +0700 Subject: [PATCH] remove stage.Done() method (#2390) --- cmd/integration/commands/snapshot_check.go | 8 +- eth/stagedsync/stage.go | 43 +++++-- eth/stagedsync/stage_blockhashes.go | 3 +- eth/stagedsync/stage_bodies.go | 3 +- eth/stagedsync/stage_bodies_snapshot.go | 1 - eth/stagedsync/stage_call_traces.go | 3 +- eth/stagedsync/stage_execute.go | 3 - eth/stagedsync/stage_finish.go | 3 +- eth/stagedsync/stage_hashstate.go | 3 +- eth/stagedsync/stage_headers.go | 4 +- eth/stagedsync/stage_headers_snapshot.go | 5 +- eth/stagedsync/stage_indexes.go | 6 +- eth/stagedsync/stage_interhashes.go | 4 +- eth/stagedsync/stage_log_index.go | 3 +- eth/stagedsync/stage_mining_create_block.go | 1 - eth/stagedsync/stage_mining_exec.go | 2 - eth/stagedsync/stage_mining_finish.go | 2 - eth/stagedsync/stage_senders.go | 4 +- eth/stagedsync/stage_state_snapshot.go | 2 +- eth/stagedsync/stage_tevm.go | 3 - eth/stagedsync/stage_txlookup.go | 2 +- eth/stagedsync/stage_txpool.go | 4 +- eth/stagedsync/stagedsync.go | 1 - eth/stagedsync/{state.go => sync.go} | 2 + .../{state_test.go => sync_test.go} | 120 ++++-------------- eth/stagedsync/unwind.go | 42 ------ eth/stagedsync/unwind_test.go | 1 - 27 files changed, 76 insertions(+), 202 deletions(-) delete mode 100644 eth/stagedsync/stagedsync.go rename eth/stagedsync/{state.go => sync.go} (99%) rename eth/stagedsync/{state_test.go => sync_test.go} (86%) delete mode 100644 eth/stagedsync/unwind.go delete mode 100644 eth/stagedsync/unwind_test.go diff --git a/cmd/integration/commands/snapshot_check.go b/cmd/integration/commands/snapshot_check.go index c515a9242d..34a388847b 100644 --- a/cmd/integration/commands/snapshot_check.go +++ b/cmd/integration/commands/snapshot_check.go @@ -209,24 +209,24 @@ func snapshotCheck(ctx context.Context, db ethdb.RwKV, isNew bool, tmpDir string if isNew { stage3 := stage(sync, tx, stages.Senders) - err = stage3.DoneAndUpdate(tx, lastBlockHeaderNumber) + err = stage3.Update(tx, lastBlockHeaderNumber) if err != nil { return err } stage4 := stage(sync, tx, stages.Execution) - err = stage4.DoneAndUpdate(tx, snapshotBlock) + err = stage4.Update(tx, snapshotBlock) if err != nil { return err } stage5 := stage(sync, tx, stages.HashState) - err = stage5.DoneAndUpdate(tx, snapshotBlock) + err = stage5.Update(tx, snapshotBlock) if err != nil { return err } stage6 := stage(sync, tx, stages.IntermediateHashes) - err = stage6.DoneAndUpdate(tx, snapshotBlock) + err = stage6.Update(tx, snapshotBlock) if err != nil { return err } diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index ebc80f0746..f70830a417 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -1,6 +1,7 @@ package stagedsync import ( + "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb" ) @@ -50,22 +51,40 @@ func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error { return stages.SaveStageProgress(db, s.ID, newBlockNum) } -// Done makes sure that the stage execution is complete and proceeds to the next state. -// If Done() is not called and the stage `Forward` exits, then the same stage will be called again. -// This side effect is useful for something like block body download. -func (s *StageState) Done() { - s.state.NextStage() -} - // ExecutionAt gets the current state of the "Execution" stage, which block is currently executed. func (s *StageState) ExecutionAt(db ethdb.KVGetter) (uint64, error) { execution, err := stages.GetStageProgress(db, stages.Execution) return execution, err } -// DoneAndUpdate a convenience method combining both `Done()` and `Update()` calls together. -func (s *StageState) DoneAndUpdate(db ethdb.Putter, newBlockNum uint64) error { - err := stages.SaveStageProgress(db, s.ID, newBlockNum) - s.state.NextStage() - return err +// Unwinder allows the stage to cause an unwind. +type Unwinder interface { + // UnwindTo begins staged sync unwind to the specified block. + UnwindTo(unwindPoint uint64, badBlock common.Hash) +} + +// UnwindState contains the information about unwind. +type UnwindState struct { + ID stages.SyncStage + // UnwindPoint is the block to unwind to. + UnwindPoint uint64 + CurrentBlockNumber uint64 + // If unwind is caused by a bad block, this hash is not empty + BadBlock common.Hash + state *Sync } + +func (u *UnwindState) LogPrefix() string { return u.state.LogPrefix() } + +// Done updates the DB state of the stage. +func (u *UnwindState) Done(db ethdb.Putter) error { + return stages.SaveStageProgress(db, u.ID, u.UnwindPoint) +} + +type PruneState struct { + ID stages.SyncStage + PrunePoint uint64 // PrunePoint is the block to prune to. + state *Sync +} + +func (u *PruneState) LogPrefix() string { return u.state.LogPrefix() } diff --git a/eth/stagedsync/stage_blockhashes.go b/eth/stagedsync/stage_blockhashes.go index 3fa4c6440b..e77576b396 100644 --- a/eth/stagedsync/stage_blockhashes.go +++ b/eth/stagedsync/stage_blockhashes.go @@ -49,7 +49,6 @@ func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx c } headHash := rawdb.ReadHeaderByNumber(tx, headNumber).Hash() if s.BlockNumber == headNumber { - s.Done() return nil } @@ -75,7 +74,7 @@ func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx c ); err != nil { return err } - if err = s.DoneAndUpdate(tx, headNumber); err != nil { + if err = s.Update(tx, headNumber); err != nil { return err } if !useExternalTx { diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 3f7b637059..196c1d5e0a 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -79,7 +79,6 @@ func BodiesForward( } bodyProgress = s.BlockNumber if bodyProgress == headerProgress { - s.Done() return nil } logPrefix := s.LogPrefix() @@ -198,7 +197,7 @@ Loop: d6 += time.Since(start) stageBodiesGauge.Update(int64(bodyProgress)) } - if err := s.DoneAndUpdate(tx, bodyProgress); err != nil { + if err := s.Update(tx, bodyProgress); err != nil { return err } if !useExternalTx { diff --git a/eth/stagedsync/stage_bodies_snapshot.go b/eth/stagedsync/stage_bodies_snapshot.go index 9c5fbe042e..0a31a84e9a 100644 --- a/eth/stagedsync/stage_bodies_snapshot.go +++ b/eth/stagedsync/stage_bodies_snapshot.go @@ -27,7 +27,6 @@ func StageSnapshotBodiesCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, client * } func SpawnBodiesSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotBodiesCfg, ctx context.Context) error { - s.Done() return nil } diff --git a/eth/stagedsync/stage_call_traces.go b/eth/stagedsync/stage_call_traces.go index ea6d8465c7..4b0f3482a8 100644 --- a/eth/stagedsync/stage_call_traces.go +++ b/eth/stagedsync/stage_call_traces.go @@ -63,7 +63,6 @@ func SpawnCallTraces(s *StageState, tx ethdb.RwTx, cfg CallTracesCfg, ctx contex return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) } if endBlock == s.BlockNumber { - s.Done() return nil } @@ -71,7 +70,7 @@ func SpawnCallTraces(s *StageState, tx ethdb.RwTx, cfg CallTracesCfg, ctx contex return err } - if err := s.DoneAndUpdate(tx, endBlock); err != nil { + if err := s.Update(tx, endBlock); err != nil { return err } if !useExternalTx { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 5588400adf..1ea3d919d3 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -243,7 +243,6 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx ethdb.RwTx, toBlock u to = min(prevStageProgress, toBlock) } if to <= s.BlockNumber { - s.Done() return nil } logPrefix := s.LogPrefix() @@ -362,7 +361,6 @@ Loop: } log.Info(fmt.Sprintf("[%s] Completed on", logPrefix), "block", stageProgress) - s.Done() return stoppedErr } @@ -436,7 +434,6 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) { quit := ctx.Done() if u.UnwindPoint >= s.BlockNumber { - s.Done() return nil } useExternalTx := tx != nil diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index a914510b8b..19bd47744a 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -45,7 +45,6 @@ func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg) error { return err } if executionAt <= s.BlockNumber { - s.Done() return nil } @@ -63,7 +62,7 @@ func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg) error { } } rawdb.WriteHeadBlockHash(tx, rawdb.ReadHeadHeaderHash(tx)) - err = s.DoneAndUpdate(tx, executionAt) + err = s.Update(tx, executionAt) if err != nil { return err } diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go index 6c4959ee12..ac822df452 100644 --- a/eth/stagedsync/stage_hashstate.go +++ b/eth/stagedsync/stage_hashstate.go @@ -47,7 +47,6 @@ func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx con if s.BlockNumber == to { // we already did hash check for this block // we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally - s.Done() return nil } if s.BlockNumber > to { @@ -67,7 +66,7 @@ func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx con } } - if err = s.DoneAndUpdate(tx, to); err != nil { + if err = s.Update(tx, to); err != nil { return err } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 6f618e5ea3..02d539a26e 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -98,7 +98,6 @@ func HeadersForward( return err } } - s.Done() return nil } @@ -191,7 +190,6 @@ func HeadersForward( return fmt.Errorf("%s: failed to fix canonical chain: %w", logPrefix, err) } } - s.Done() if !useExternalTx { if err := tx.Commit(); err != nil { return err @@ -313,7 +311,7 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg) if err = rawdb.WriteHeadHeaderHash(tx, maxHash); err != nil { return err } - if err = s.DoneAndUpdate(tx, maxNum); err != nil { + if err = s.Update(tx, maxNum); err != nil { return err } } diff --git a/eth/stagedsync/stage_headers_snapshot.go b/eth/stagedsync/stage_headers_snapshot.go index fc3dae0d4e..af8c8773d9 100644 --- a/eth/stagedsync/stage_headers_snapshot.go +++ b/eth/stagedsync/stage_headers_snapshot.go @@ -32,7 +32,6 @@ func StageSnapshotHeadersCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, client func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotHeadersCfg, initial bool, ctx context.Context) error { //generate snapshot only on initial mode if !initial { - s.Done() return nil } @@ -49,7 +48,6 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snaps //it's too early for snapshot if to < snapshotsync.EpochSize { - s.Done() return nil } @@ -63,7 +61,6 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snaps //So we have to move headers to snapshot right after headers stage. //but we don't want to block not initial sync if snapshotBlock <= currentSnapshotBlock { - s.Done() return nil } @@ -88,7 +85,7 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snaps if err != nil { return err } - err = s.DoneAndUpdate(tx, snapshotBlock) + err = s.Update(tx, snapshotBlock) if err != nil { return err } diff --git a/eth/stagedsync/stage_indexes.go b/eth/stagedsync/stage_indexes.go index b9bbb285c8..8cbb66a77b 100644 --- a/eth/stagedsync/stage_indexes.go +++ b/eth/stagedsync/stage_indexes.go @@ -55,7 +55,6 @@ func SpawnAccountHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) } if executionAt <= s.BlockNumber { - s.Done() return nil } @@ -69,7 +68,7 @@ func SpawnAccountHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx return fmt.Errorf("[%s] %w", logPrefix, err) } - if err := s.DoneAndUpdate(tx, executionAt); err != nil { + if err := s.Update(tx, executionAt); err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) } @@ -99,7 +98,6 @@ func SpawnStorageHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx return fmt.Errorf("%s: logs index: getting last executed block: %w", logPrefix, err) } if executionAt <= s.BlockNumber { - s.Done() return nil } @@ -113,7 +111,7 @@ func SpawnStorageHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx return fmt.Errorf("[%s] %w", logPrefix, err) } - if err := s.DoneAndUpdate(tx, executionAt); err != nil { + if err := s.Update(tx, executionAt); err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) } diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index 3ee1557ae1..896ac6654e 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -57,7 +57,6 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx ethdb.RwTx, cfg if s.BlockNumber == to { // we already did hash check for this block // we don't do the obvious `if s.BlockNumber > to` to support reorgs more naturally - s.Done() return trie.EmptyRoot, nil } @@ -96,8 +95,7 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx ethdb.RwTx, cfg log.Warn("Unwinding due to incorrect root hash", "to", to-1) u.UnwindTo(to-1, headerHash) } - s.Done() - } else if err = s.DoneAndUpdate(tx, to); err != nil { + } else if err = s.Update(tx, to); err != nil { return trie.EmptyRoot, err } } else { diff --git a/eth/stagedsync/stage_log_index.go b/eth/stagedsync/stage_log_index.go index 436a59bb47..b05e20727d 100644 --- a/eth/stagedsync/stage_log_index.go +++ b/eth/stagedsync/stage_log_index.go @@ -59,7 +59,6 @@ func SpawnLogIndex(s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Co return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) } if endBlock == s.BlockNumber { - s.Done() return nil } @@ -72,7 +71,7 @@ func SpawnLogIndex(s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Co return err } - if err := s.DoneAndUpdate(tx, endBlock); err != nil { + if err := s.Update(tx, endBlock); err != nil { return err } if !useExternalTx { diff --git a/eth/stagedsync/stage_mining_create_block.go b/eth/stagedsync/stage_mining_create_block.go index 1eff9b0e61..53d64fcf03 100644 --- a/eth/stagedsync/stage_mining_create_block.go +++ b/eth/stagedsync/stage_mining_create_block.go @@ -286,7 +286,6 @@ func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateB current.LocalTxs = types.NewTransactionsByPriceAndNonce(*signer, localTxs) current.RemoteTxs = types.NewTransactionsByPriceAndNonce(*signer, remoteTxs) - s.Done() fmt.Printf("aa: %t, %t,%t\n", current == nil, cfg.miner.MiningBlock == nil, current.Header == nil) return nil } diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 4e58ff0373..319ebe31e4 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -68,7 +68,6 @@ func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, quit // sealing in advance without waiting block execution finished. if !noempty { log.Info("Commit an empty block", "number", current.Header.Number) - s.Done() return nil } @@ -141,7 +140,6 @@ func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, quit if err := stages.SaveStageProgress(tx, stages.Execution, current.Header.Number.Uint64()); err != nil { return err } - s.Done() return nil } diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index f7da6c125b..a1540d8a45 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -58,7 +58,6 @@ func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, cfg MiningFinishCfg, q // Tests may set pre-calculated nonce if block.Header().Nonce.Uint64() != 0 { cfg.miningState.MiningResultCh <- block - s.Done() return nil } @@ -77,6 +76,5 @@ func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, cfg MiningFinishCfg, q log.Warn("Block sealing failed", "err", err) } - s.Done() return nil } diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 0285b0c8b3..44d237a072 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -72,7 +72,6 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx ethd to = min(prevStageProgress, toBlock) } if to <= s.BlockNumber { - s.Done() return nil } logPrefix := s.LogPrefix() @@ -238,7 +237,6 @@ Loop: if to > s.BlockNumber { u.UnwindTo(minBlockNum-1, minBlockHash) } - s.Done() } else { if err := collectorSenders.Load(logPrefix, tx, dbutils.Senders, @@ -252,7 +250,7 @@ Loop: ); err != nil { return err } - if err = s.DoneAndUpdate(tx, to); err != nil { + if err = s.Update(tx, to); err != nil { return err } } diff --git a/eth/stagedsync/stage_state_snapshot.go b/eth/stagedsync/stage_state_snapshot.go index afe906f101..31fcccdbd2 100644 --- a/eth/stagedsync/stage_state_snapshot.go +++ b/eth/stagedsync/stage_state_snapshot.go @@ -36,7 +36,7 @@ func SpawnStateSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snapsho defer tx.Rollback() } - if err = s.DoneAndUpdate(tx, 0); err != nil { + if err = s.Update(tx, 0); err != nil { return err } if !useExternalTx { diff --git a/eth/stagedsync/stage_tevm.go b/eth/stagedsync/stage_tevm.go index 26b0630246..a3ff26db33 100644 --- a/eth/stagedsync/stage_tevm.go +++ b/eth/stagedsync/stage_tevm.go @@ -63,7 +63,6 @@ func SpawnTranspileStage(s *StageState, tx ethdb.RwTx, toBlock uint64, cfg Trans } if to <= s.BlockNumber { - s.Done() return nil } @@ -88,8 +87,6 @@ func SpawnTranspileStage(s *StageState, tx ethdb.RwTx, toBlock uint64, cfg Trans } } - s.Done() - if to > s.BlockNumber+16 { log.Info(fmt.Sprintf("[%s] Completed on", logPrefix), "block", toBlock) } diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go index 25e378feae..1cdc32224b 100644 --- a/eth/stagedsync/stage_txlookup.go +++ b/eth/stagedsync/stage_txlookup.go @@ -59,7 +59,7 @@ func SpawnTxLookup(s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, ctx context.Co if err = TxLookupTransform(logPrefix, tx, startKey, dbutils.EncodeBlockNumber(syncHeadNumber), quitCh, cfg); err != nil { return err } - if err = s.DoneAndUpdate(tx, syncHeadNumber); err != nil { + if err = s.Update(tx, syncHeadNumber); err != nil { return err } diff --git a/eth/stagedsync/stage_txpool.go b/eth/stagedsync/stage_txpool.go index 60d827026a..868c9cf450 100644 --- a/eth/stagedsync/stage_txpool.go +++ b/eth/stagedsync/stage_txpool.go @@ -44,7 +44,6 @@ func SpawnTxPool(s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, ctx context.Contex return err } if to == s.BlockNumber { - s.Done() return nil } @@ -73,7 +72,7 @@ func SpawnTxPool(s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, ctx context.Contex pending, queued := cfg.pool.Stats() log.Info(fmt.Sprintf("[%s] Transaction stats", logPrefix), "pending", pending, "queued", queued) } - if err := s.DoneAndUpdate(tx, to); err != nil { + if err := s.Update(tx, to); err != nil { return err } if !useExternalTx { @@ -151,7 +150,6 @@ func incrementalTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPoo func UnwindTxPool(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, ctx context.Context) (err error) { if u.UnwindPoint >= s.BlockNumber { - s.Done() return nil } useExternalTx := tx != nil diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go deleted file mode 100644 index a66ba101af..0000000000 --- a/eth/stagedsync/stagedsync.go +++ /dev/null @@ -1 +0,0 @@ -package stagedsync diff --git a/eth/stagedsync/state.go b/eth/stagedsync/sync.go similarity index 99% rename from eth/stagedsync/state.go rename to eth/stagedsync/sync.go index f2dbd892af..643eb9e1dd 100644 --- a/eth/stagedsync/state.go +++ b/eth/stagedsync/sync.go @@ -197,6 +197,8 @@ func (s *Sync) Run(db ethdb.RwKV, tx ethdb.RwTx, firstCycle bool) error { return err } timings = append(timings, string(stage.ID), time.Since(t)) + + s.NextStage() } if err := printLogs(tx, timings); err != nil { diff --git a/eth/stagedsync/state_test.go b/eth/stagedsync/sync_test.go similarity index 86% rename from eth/stagedsync/state_test.go rename to eth/stagedsync/sync_test.go index 19591fa166..56c79dcb29 100644 --- a/eth/stagedsync/state_test.go +++ b/eth/stagedsync/sync_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestStateStagesSuccess(t *testing.T) { +func TestStagesSuccess(t *testing.T) { flow := make([]stages.SyncStage, 0) s := []*Stage{ { @@ -19,7 +19,6 @@ func TestStateStagesSuccess(t *testing.T) { Description: "Downloading headers", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) - s.Done() return nil }, }, @@ -28,7 +27,6 @@ func TestStateStagesSuccess(t *testing.T) { Description: "Downloading block bodiess", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) - s.Done() return nil }, }, @@ -37,7 +35,6 @@ func TestStateStagesSuccess(t *testing.T) { Description: "Recovering senders from tx signatures", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) - s.Done() return nil }, }, @@ -53,7 +50,7 @@ func TestStateStagesSuccess(t *testing.T) { assert.Equal(t, expectedFlow, flow) } -func TestStateDisabledStages(t *testing.T) { +func TestDisabledStages(t *testing.T) { flow := make([]stages.SyncStage, 0) s := []*Stage{ { @@ -61,7 +58,6 @@ func TestStateDisabledStages(t *testing.T) { Description: "Downloading headers", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) - s.Done() return nil }, }, @@ -70,7 +66,6 @@ func TestStateDisabledStages(t *testing.T) { Description: "Downloading block bodiess", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) - s.Done() return nil }, Disabled: true, @@ -80,7 +75,6 @@ func TestStateDisabledStages(t *testing.T) { Description: "Recovering senders from tx signatures", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) - s.Done() return nil }, }, @@ -96,53 +90,7 @@ func TestStateDisabledStages(t *testing.T) { assert.Equal(t, expectedFlow, flow) } -func TestStateRepeatedStage(t *testing.T) { - repeatStageTwo := 2 - flow := make([]stages.SyncStage, 0) - s := []*Stage{ - { - ID: stages.Headers, - Description: "Downloading headers", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { - flow = append(flow, stages.Headers) - s.Done() - return nil - }, - }, - { - ID: stages.Bodies, - Description: "Downloading block bodiess", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { - flow = append(flow, stages.Bodies) - repeatStageTwo-- - if repeatStageTwo < 0 { - s.Done() - } - return nil - }, - }, - { - ID: stages.Senders, - Description: "Recovering senders from tx signatures", - Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { - flow = append(flow, stages.Senders) - s.Done() - return nil - }, - }, - } - state := New(s, nil) - db, tx := kv.NewTestTx(t) - err := state.Run(db, tx, true) - assert.NoError(t, err) - - expectedFlow := []stages.SyncStage{ - stages.Headers, stages.Bodies, stages.Bodies, stages.Bodies, stages.Senders, - } - assert.Equal(t, expectedFlow, flow) -} - -func TestStateErroredStage(t *testing.T) { +func TestErroredStage(t *testing.T) { flow := make([]stages.SyncStage, 0) expectedErr := errors.New("test error") s := []*Stage{ @@ -151,7 +99,6 @@ func TestStateErroredStage(t *testing.T) { Description: "Downloading headers", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) - s.Done() return nil }, }, @@ -160,7 +107,6 @@ func TestStateErroredStage(t *testing.T) { Description: "Downloading block bodiess", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) - s.Done() return expectedErr }, }, @@ -169,7 +115,6 @@ func TestStateErroredStage(t *testing.T) { Description: "Recovering senders from tx signatures", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) - s.Done() return nil }, }, @@ -185,7 +130,7 @@ func TestStateErroredStage(t *testing.T) { assert.Equal(t, expectedFlow, flow) } -func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { +func TestUnwindSomeStagesBehindUnwindPoint(t *testing.T) { flow := make([]stages.SyncStage, 0) unwound := false s := []*Stage{ @@ -195,9 +140,8 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -211,9 +155,8 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 1000) + return s.Update(tx, 1000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -236,7 +179,6 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { u.UnwindTo(1500, common.Hash{}) return nil } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -250,9 +192,8 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.IntermediateHashes) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -287,7 +228,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { assert.Equal(t, 1500, int(stageState.BlockNumber)) } -func TestStateUnwind(t *testing.T) { +func TestUnwind(t *testing.T) { flow := make([]stages.SyncStage, 0) unwound := false s := []*Stage{ @@ -297,9 +238,8 @@ func TestStateUnwind(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -313,9 +253,8 @@ func TestStateUnwind(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -331,9 +270,8 @@ func TestStateUnwind(t *testing.T) { if !unwound { unwound = true u.UnwindTo(500, common.Hash{}) - return s.DoneAndUpdate(tx, 3000) + return s.Update(tx, 3000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -347,9 +285,8 @@ func TestStateUnwind(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.IntermediateHashes) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -399,7 +336,7 @@ func TestStateUnwind(t *testing.T) { } -func TestStateUnwindEmptyUnwinder(t *testing.T) { +func TestUnwindEmptyUnwinder(t *testing.T) { flow := make([]stages.SyncStage, 0) unwound := false s := []*Stage{ @@ -409,9 +346,8 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -425,9 +361,8 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, }, @@ -439,9 +374,8 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { if !unwound { unwound = true u.UnwindTo(500, common.Hash{}) - return s.DoneAndUpdate(tx, 3000) + return s.Update(tx, 3000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -476,7 +410,7 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { assert.Equal(t, 500, int(stageState.BlockNumber)) } -func TestStateSyncDoTwice(t *testing.T) { +func TestSyncDoTwice(t *testing.T) { flow := make([]stages.SyncStage, 0) s := []*Stage{ @@ -485,7 +419,7 @@ func TestStateSyncDoTwice(t *testing.T) { Description: "Downloading headers", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) - return s.DoneAndUpdate(tx, s.BlockNumber+100) + return s.Update(tx, s.BlockNumber+100) }, }, { @@ -493,7 +427,7 @@ func TestStateSyncDoTwice(t *testing.T) { Description: "Downloading block bodiess", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) - return s.DoneAndUpdate(tx, s.BlockNumber+200) + return s.Update(tx, s.BlockNumber+200) }, }, { @@ -501,7 +435,7 @@ func TestStateSyncDoTwice(t *testing.T) { Description: "Recovering senders from tx signatures", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) - return s.DoneAndUpdate(tx, s.BlockNumber+300) + return s.Update(tx, s.BlockNumber+300) }, }, } @@ -543,7 +477,6 @@ func TestStateSyncInterruptRestart(t *testing.T) { Description: "Downloading headers", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) - s.Done() return nil }, }, @@ -552,7 +485,6 @@ func TestStateSyncInterruptRestart(t *testing.T) { Description: "Downloading block bodiess", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) - s.Done() return expectedErr }, }, @@ -561,7 +493,6 @@ func TestStateSyncInterruptRestart(t *testing.T) { Description: "Recovering senders from tx signatures", Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) - s.Done() return nil }, }, @@ -584,7 +515,7 @@ func TestStateSyncInterruptRestart(t *testing.T) { assert.Equal(t, expectedFlow, flow) } -func TestStateSyncInterruptLongUnwind(t *testing.T) { +func TestSyncInterruptLongUnwind(t *testing.T) { // interrupt a stage that is too big to fit in one batch, // so the db is in inconsitent state, so we have to restart with that flow := make([]stages.SyncStage, 0) @@ -599,9 +530,8 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -615,9 +545,8 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { - return s.DoneAndUpdate(tx, 2000) + return s.Update(tx, 2000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { @@ -633,9 +562,8 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { if !unwound { unwound = true u.UnwindTo(500, common.Hash{}) - return s.DoneAndUpdate(tx, 3000) + return s.Update(tx, 3000) } - s.Done() return nil }, Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { diff --git a/eth/stagedsync/unwind.go b/eth/stagedsync/unwind.go deleted file mode 100644 index 5095948e74..0000000000 --- a/eth/stagedsync/unwind.go +++ /dev/null @@ -1,42 +0,0 @@ -package stagedsync - -import ( - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/ethdb" -) - -// Unwinder allows the stage to cause an unwind. -type Unwinder interface { - // UnwindTo begins staged sync unwind to the specified block. - UnwindTo(unwindPoint uint64, badBlock common.Hash) -} - -// UnwindState contains the information about unwind. -type UnwindState struct { - ID stages.SyncStage - // UnwindPoint is the block to unwind to. - UnwindPoint uint64 - CurrentBlockNumber uint64 - // If unwind is caused by a bad block, this hash is not empty - BadBlock common.Hash - state *Sync -} - -func (u *UnwindState) LogPrefix() string { return u.state.LogPrefix() } - -// Done updates the DB state of the stage. -func (u *UnwindState) Done(db ethdb.Putter) error { - return stages.SaveStageProgress(db, u.ID, u.UnwindPoint) -} - -// Skip ignores the unwind -func (u *UnwindState) Skip() {} - -type PruneState struct { - ID stages.SyncStage - PrunePoint uint64 // PrunePoint is the block to prune to. - state *Sync -} - -func (u *PruneState) LogPrefix() string { return u.state.LogPrefix() } diff --git a/eth/stagedsync/unwind_test.go b/eth/stagedsync/unwind_test.go deleted file mode 100644 index a66ba101af..0000000000 --- a/eth/stagedsync/unwind_test.go +++ /dev/null @@ -1 +0,0 @@ -package stagedsync -- GitLab