diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 83b04738cb6ffcb29bcc75c4a783823e7697647c..b4269596c9217cced0d001b684d4abd84c20d79d 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -390,7 +390,7 @@ func stageSenders(db ethdb.RwKV, ctx context.Context) error { cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir) if unwind > 0 { u := &stagedsync.UnwindState{ID: stages.Senders, UnwindPoint: stage3.BlockNumber - unwind} - err = stagedsync.UnwindSendersStage(u, stage3, tx, cfg) + err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx) if err != nil { return err } diff --git a/common/dbutils/bucket.go b/common/dbutils/bucket.go index 12c144e30af3aaac90a92b3aaf3310f023f05d98..c5efd2c3176da088f504736663dbdbf4395fc4e9 100644 --- a/common/dbutils/bucket.go +++ b/common/dbutils/bucket.go @@ -236,6 +236,8 @@ const ( SyncStageProgress = "SyncStage" // Position to where to unwind sync stages: stageName -> stageData SyncStageUnwind = "SyncStageUnwind" + // Position to where to prune sync stages: stageName -> stageData + SyncStagePrune = "SyncStagePrune" CliqueBucket = "Clique" CliqueSeparateBucket = "CliqueSeparate" diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index e4d3449673000232785755b29e5214f33c6428a6..ba70cc041436c225936d2a4f833a5bc698d61b6b 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -36,21 +36,27 @@ func DefaultStages(ctx context.Context, { ID: stages.Headers, Description: "Download headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return HeadersForward(s, u, ctx, tx, headers, firstCycle, test) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return HeadersUnwind(u, s, tx, headers) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return HeadersPrune(p, tx, headers, ctx) + }, }, { ID: stages.BlockHashes, Description: "Write block hashes", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnBlockHashStage(s, tx, blockHashCfg, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindBlockHashStage(u, s, tx, blockHashCfg, ctx) + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + return UnwindBlockHashStage(u, tx, blockHashCfg, ctx) + }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneBlockHashStage(p, tx, blockHashCfg, ctx) }, }, { @@ -58,21 +64,27 @@ func DefaultStages(ctx context.Context, Description: "Create headers snapshot", Disabled: true, DisabledDescription: "Enable by --snapshot.layout", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnHeadersSnapshotGenerationStage(s, tx, snapshotHeaders, firstCycle, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindHeadersSnapshotGenerationStage(u, s, tx, snapshotHeaders, ctx) + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + return UnwindHeadersSnapshotGenerationStage(u, tx, snapshotHeaders, ctx) + }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneHeadersSnapshotGenerationStage(p, tx, snapshotHeaders, ctx) }, }, { ID: stages.Bodies, Description: "Download block bodies", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return BodiesForward(s, u, ctx, tx, bodies, test) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindBodiesStage(u, s, tx, bodies) + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + return UnwindBodiesStage(u, tx, bodies, ctx) + }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneBodiesStage(p, tx, bodies, ctx) }, }, { @@ -80,161 +92,249 @@ func DefaultStages(ctx context.Context, Description: "Create bodies snapshot", Disabled: true, DisabledDescription: "Enable by --snapshot.layout", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnBodiesSnapshotGenerationStage(s, tx, snapshotBodies, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindBodiesSnapshotGenerationStage(u, s, tx, snapshotBodies, ctx) + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + return UnwindBodiesSnapshotGenerationStage(u, tx, snapshotBodies, ctx) + }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneBodiesSnapshotGenerationStage(p, tx, snapshotBodies, ctx) }, }, { ID: stages.Senders, Description: "Recover senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnRecoverSendersStage(senders, s, u, tx, 0, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindSendersStage(u, s, tx, senders) + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + return UnwindSendersStage(u, tx, senders, ctx) + }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneSendersStage(p, tx, senders, ctx) }, }, { ID: stages.Execution, Description: "Execute blocks w/o hash checks", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnExecuteBlocksStage(s, u, tx, 0, ctx, exec, firstCycle) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindExecutionStage(u, s, tx, ctx, exec, firstCycle) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneExecutionStage(p, tx, exec, ctx, firstCycle) + }, }, { ID: stages.Translation, Description: "Transpile marked EVM contracts to TEVM", Disabled: !sm.TEVM, DisabledDescription: "Enable by adding `e` to --storage-mode", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnTranspileStage(s, tx, 0, trans, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindTranspileStage(u, s, tx, trans, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneTranspileStage(p, tx, trans, firstCycle, ctx) + }, }, { ID: stages.CreateStateSnapshot, Description: "Create state snapshot", Disabled: true, DisabledDescription: "Enable by --snapshot.layout", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { - return SpawnStateSnapshotGenerationStage(s, tx, snapshotState, ctx.Done()) + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + return SpawnStateSnapshotGenerationStage(s, tx, snapshotState, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindStateSnapshotGenerationStage(u, s, tx, snapshotState, ctx.Done()) + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + return UnwindStateSnapshotGenerationStage(u, tx, snapshotState, ctx) + }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneStateSnapshotGenerationStage(p, tx, snapshotState, ctx) }, }, { ID: stages.HashState, Description: "Hash the key in the state", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnHashStateStage(s, tx, hashState, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindHashStateStage(u, s, tx, hashState, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneHashStateStage(p, tx, hashState, ctx) + }, }, { ID: stages.IntermediateHashes, Description: "Generate intermediate hashes and computing state root", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { _, err := SpawnIntermediateHashesStage(s, u, tx, trieCfg, ctx) return err }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindIntermediateHashesStage(u, s, tx, trieCfg, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneIntermediateHashesStage(p, tx, trieCfg, ctx) + }, }, { ID: stages.CallTraces, Description: "Generate call traces index", DisabledDescription: "Work In Progress", Disabled: !sm.CallTraces, - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnCallTraces(s, tx, callTraces, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindCallTraces(u, s, tx, callTraces, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneCallTraces(p, tx, callTraces, ctx) + }, }, { ID: stages.AccountHistoryIndex, Description: "Generate account history index", Disabled: !sm.History, DisabledDescription: "Enable by adding `h` to --storage-mode", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnAccountHistoryIndex(s, tx, history, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindAccountHistoryIndex(u, s, tx, history, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneAccountHistoryIndex(p, tx, history, ctx) + }, }, { ID: stages.StorageHistoryIndex, Description: "Generate storage history index", Disabled: !sm.History, DisabledDescription: "Enable by adding `h` to --storage-mode", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnStorageHistoryIndex(s, tx, history, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindStorageHistoryIndex(u, s, tx, history, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneStorageHistoryIndex(p, tx, history, ctx) + }, }, { ID: stages.LogIndex, Description: "Generate receipt logs index", Disabled: !sm.Receipts, DisabledDescription: "Enable by adding `r` to --storage-mode", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnLogIndex(s, tx, logIndex, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindLogIndex(u, s, tx, logIndex, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneLogIndex(p, tx, logIndex, ctx) + }, }, { ID: stages.TxLookup, Description: "Generate tx lookup index", Disabled: !sm.TxIndex, DisabledDescription: "Enable by adding `t` to --storage-mode", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnTxLookup(s, tx, txLookup, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindTxLookup(u, s, tx, txLookup, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneTxLookup(p, tx, txLookup, ctx) + }, }, { ID: stages.TxPool, Description: "Update transaction pool", - ExecFunc: func(firstCycle bool, s *StageState, _ Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, _ Unwinder, tx ethdb.RwTx) error { return SpawnTxPool(s, tx, txPool, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return UnwindTxPool(u, s, tx, txPool, ctx) }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneTxPool(p, tx, txPool, ctx) + }, }, { ID: stages.Finish, Description: "Final: update current block for the RPC API", - ExecFunc: func(firstCycle bool, s *StageState, _ Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, _ Unwinder, tx ethdb.RwTx) error { return FinishForward(s, tx, finish) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindFinish(u, s, tx, finish) + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + return UnwindFinish(u, tx, finish, ctx) + }, + Prune: func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error { + return PruneFinish(p, tx, finish, ctx) }, }, } } +func DefaultForwardOrder() UnwindOrder { + return []stages.SyncStage{ + stages.Headers, + stages.BlockHashes, + stages.CreateHeadersSnapshot, + stages.Bodies, + stages.CreateBodiesSnapshot, + stages.Senders, + stages.Execution, + stages.Translation, + stages.CreateStateSnapshot, + stages.HashState, + stages.IntermediateHashes, + stages.CallTraces, + stages.AccountHistoryIndex, + stages.StorageHistoryIndex, + stages.LogIndex, + stages.TxLookup, + stages.TxPool, + stages.Finish, + } +} + +func DefaultPruningOrder() UnwindOrder { + return []stages.SyncStage{ + stages.Headers, + stages.BlockHashes, + stages.CreateHeadersSnapshot, + stages.Bodies, + stages.CreateBodiesSnapshot, + stages.Senders, + stages.Execution, + stages.Translation, + stages.CreateStateSnapshot, + stages.HashState, + stages.IntermediateHashes, + stages.CallTraces, + stages.AccountHistoryIndex, + stages.StorageHistoryIndex, + stages.LogIndex, + stages.TxLookup, + stages.TxPool, + stages.Finish, + } +} + func DefaultUnwindOrder() UnwindOrder { return []stages.SyncStage{ stages.Headers, diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index 5022dd58ff3113f6da319023974d1cf63ea00cc5..ca56254c764d93869aff20bbf47980af6d91b6ac 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -8,12 +8,16 @@ import ( // ExecFunc is the execution function for the stage to move forward. // * state - is the current state of the stage and contains stage data. // * unwinder - if the stage needs to cause unwinding, `unwinder` methods can be used. -type ExecFunc func(firstCycle bool, state *StageState, unwinder Unwinder, tx ethdb.RwTx) error +type ExecFunc func(firstCycle bool, s *StageState, unwinder Unwinder, tx ethdb.RwTx) error // UnwindFunc is the unwinding logic of the stage. // * unwindState - contains information about the unwind itself. // * stageState - represents the state of this stage at the beginning of unwind. -type UnwindFunc func(firstCycle bool, unwindState *UnwindState, state *StageState, tx ethdb.RwTx) error +type UnwindFunc func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error + +// PruneFunc is the execution function for the stage to prune old data. +// * state - is the current state of the stage and contains stage data. +type PruneFunc func(firstCycle bool, p *PruneState, tx ethdb.RwTx) error // Stage is a single sync stage in staged sync. type Stage struct { @@ -21,10 +25,11 @@ type Stage struct { Description string // DisabledDescription shows in the log with a message if the stage is disabled. Here, you can show which command line flags should be provided to enable the page. DisabledDescription string - // ExecFunc is called when the stage is executed. The main logic of the stage should be here. Should always end with `s.Done()` to allow going to the next stage. MUST NOT be nil! - ExecFunc ExecFunc - // UnwindFunc is called when the stage should be unwound. The unwind logic should be there. MUST NOT be nil! - UnwindFunc UnwindFunc + // Forward is called when the stage is executed. The main logic of the stage should be here. Should always end with `s.Done()` to allow going to the next stage. MUST NOT be nil! + Forward ExecFunc + // Unwind is called when the stage should be unwound. The unwind logic should be there. MUST NOT be nil! + Unwind UnwindFunc + Prune PruneFunc // ID of the sync stage. Should not be empty and should be unique. It is recommended to prefix it with reverse domain to avoid clashes (`com.example.my-stage`). ID stages.SyncStage // Disabled defines if the stage is disabled. It sets up when the stage is build by its `StageBuilder`. @@ -38,9 +43,7 @@ type StageState struct { BlockNumber uint64 // BlockNumber is the current block number of the stage at the beginning of the state execution. } -func (s *StageState) LogPrefix() string { - return s.state.LogPrefix() -} +func (s *StageState) LogPrefix() string { return s.state.LogPrefix() } // Update updates the stage state (current block number) in the database. Can be called multiple times during stage execution. func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error { @@ -48,7 +51,7 @@ func (s *StageState) Update(db ethdb.Putter, newBlockNum uint64) error { } // Done makes sure that the stage execution is complete and proceeds to the next state. -// If Done() is not called and the stage `ExecFunc` exits, then the same stage will be called again. +// If Done() is not called and the stage `Forward` exits, then the same stage will be called again. // This side effect is useful for something like block body download. func (s *StageState) Done() { s.state.NextStage() diff --git a/eth/stagedsync/stage_blockhashes.go b/eth/stagedsync/stage_blockhashes.go index f0b45c659f71e38c5f9323e8fdc65ce35d03cef7..1fed112472b0f8455f0c7056fb6dc4847745c5ea 100644 --- a/eth/stagedsync/stage_blockhashes.go +++ b/eth/stagedsync/stage_blockhashes.go @@ -33,10 +33,9 @@ func StageBlockHashesCfg(db ethdb.RwKV, tmpDir string) BlockHashesCfg { } } -func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) error { +func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -59,7 +58,7 @@ func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx c endKey := dbutils.HeaderKey(headNumber, headHash) // Make sure we stop at head //todo do we need non canonical headers ? - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err := etl.Transform( logPrefix, tx, @@ -76,21 +75,20 @@ func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx c ); err != nil { return err } - if err := s.DoneAndUpdate(tx, headNumber); err != nil { + if err = s.DoneAndUpdate(tx, headNumber); err != nil { return err } if !useExternalTx { - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { return err } } return nil } -func UnwindBlockHashStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) error { +func UnwindBlockHashStage(u *UnwindState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -98,14 +96,35 @@ func UnwindBlockHashStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg Bloc defer tx.Rollback() } - err := u.Done(tx) - logPrefix := s.state.LogPrefix() - if err != nil { + logPrefix := u.LogPrefix() + + if err = u.Done(tx); err != nil { return fmt.Errorf("%s: reset: %v", logPrefix, err) } if !useExternalTx { - err = tx.Commit() + if err = tx.Commit(); err != nil { + return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) + } + } + return nil +} + +func PruneBlockHashStage(p *PruneState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) if err != nil { + return err + } + defer tx.Rollback() + } + + logPrefix := p.LogPrefix() + if err = p.Done(tx); err != nil { + return fmt.Errorf("%s: reset: %v", logPrefix, err) + } + if !useExternalTx { + if err = tx.Commit(); err != nil { return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) } } diff --git a/eth/stagedsync/stage_bodies.go b/eth/stagedsync/stage_bodies.go index 31fa15e9f1e1ddda60c87a5e40804075e1abb7c6..0af239239f436814cc605ad6bce86f4e3d6db7b6 100644 --- a/eth/stagedsync/stage_bodies.go +++ b/eth/stagedsync/stage_bodies.go @@ -228,25 +228,45 @@ func logProgressBodies(logPrefix string, committed uint64, prevDeliveredCount, d "sys", common.StorageSize(m.Sys)) } -func UnwindBodiesStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg BodiesCfg) error { +func UnwindBodiesStage(u *UnwindState, tx ethdb.RwTx, cfg BodiesCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } defer tx.Rollback() } - err := u.Done(tx) - logPrefix := s.state.LogPrefix() - if err != nil { - return fmt.Errorf("%s: reset: %v", logPrefix, err) + + logPrefix := u.LogPrefix() + if err = u.Done(tx); err != nil { + return fmt.Errorf("[%s]: reset: %v", logPrefix, err) } if !useExternalTx { - err = tx.Commit() + if err = tx.Commit(); err != nil { + return fmt.Errorf("[%s]: failed to write db commit: %v", logPrefix, err) + } + } + return nil +} + +func PruneBodiesStage(s *PruneState, tx ethdb.RwTx, cfg BodiesCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) if err != nil { - return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) + return err + } + defer tx.Rollback() + } + + logPrefix := s.LogPrefix() + if err = s.Done(tx); err != nil { + return fmt.Errorf("[%s]: reset: %v", logPrefix, err) + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return fmt.Errorf("[%s]: failed to write db commit: %v", logPrefix, err) } } return nil diff --git a/eth/stagedsync/stage_bodies_snapshot.go b/eth/stagedsync/stage_bodies_snapshot.go index 85b72423c91d2373b779951ae4934fe7a481287d..a1aca4de2387b9bbccd822820bcb95ea34b1bbb3 100644 --- a/eth/stagedsync/stage_bodies_snapshot.go +++ b/eth/stagedsync/stage_bodies_snapshot.go @@ -31,10 +31,9 @@ func SpawnBodiesSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snapsh return nil } -func UnwindBodiesSnapshotGenerationStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SnapshotBodiesCfg, ctx context.Context) error { +func UnwindBodiesSnapshotGenerationStage(s *UnwindState, tx ethdb.RwTx, cfg SnapshotBodiesCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -42,7 +41,28 @@ func UnwindBodiesSnapshotGenerationStage(u *UnwindState, s *StageState, tx ethdb defer tx.Rollback() } - if err := u.Done(tx); err != nil { + if err := s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err := tx.Commit(); err != nil { + return err + } + } + return nil +} + +func PruneBodiesSnapshotGenerationStage(s *PruneState, tx ethdb.RwTx, cfg SnapshotBodiesCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err := s.Done(tx); err != nil { return err } if !useExternalTx { diff --git a/eth/stagedsync/stage_call_traces.go b/eth/stagedsync/stage_call_traces.go index 5715e5338dae9e9cf119e60bea2c40e76e43dcf5..2193aa9e0f5174da2902dc86b57fd26de19a5c3f 100644 --- a/eth/stagedsync/stage_call_traces.go +++ b/eth/stagedsync/stage_call_traces.go @@ -68,7 +68,7 @@ func SpawnCallTraces(s *StageState, tx ethdb.RwTx, cfg CallTracesCfg, ctx contex if cfg.ToBlock > 0 && cfg.ToBlock < endBlock { endBlock = cfg.ToBlock } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) } @@ -266,14 +266,13 @@ func finaliseCallTraces(collectorFrom, collectorTo *etl.Collector, logPrefix str return nil } -func UnwindCallTraces(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg CallTracesCfg, ctx context.Context) error { +func UnwindCallTraces(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg CallTracesCfg, ctx context.Context) (err error) { if s.BlockNumber <= u.UnwindPoint { return nil } useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } @@ -281,7 +280,7 @@ func UnwindCallTraces(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg CallTrac } quitCh := ctx.Done() - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err := unwindCallTraces(logPrefix, tx, s.BlockNumber, u.UnwindPoint, quitCh, cfg); err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) } @@ -410,3 +409,24 @@ func (ct *CallTracer) CaptureAccountRead(account common.Address) error { func (ct *CallTracer) CaptureAccountWrite(account common.Address) error { return nil } + +func PruneCallTraces(s *PruneState, tx ethdb.RwTx, cfg CallTracesCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index ce12a49a271da7043b0b6d087d8172be3e34bae1..c4f464652bf97d5c8f7ac2835d724072ceba1338 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -227,11 +227,10 @@ func newStateReaderWriter( return stateReader, stateWriter } -func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx ethdb.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) error { +func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx ethdb.RwTx, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) { quit := ctx.Done() useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(context.Background()) if err != nil { return err @@ -251,7 +250,7 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx ethdb.RwTx, toBlock u s.Done() return nil } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if to > s.BlockNumber+16 { log.Info(fmt.Sprintf("[%s] Blocks execution", logPrefix), "from", s.BlockNumber, "to", to) } @@ -346,24 +345,24 @@ Loop: } } - if err := s.Update(batch, stageProgress); err != nil { + if err = s.Update(batch, stageProgress); err != nil { return err } - if err := batch.Commit(); err != nil { + if err = batch.Commit(); err != nil { return fmt.Errorf("%s: failed to write batch commit: %v", logPrefix, err) } // Prune changesets if needed if cfg.pruningDistance > 0 { - if err := pruneDupSortedBucket(tx, logPrefix, "account changesets", dbutils.AccountChangeSetBucket, to, cfg.pruningDistance, logEvery.C); err != nil { + if err = pruneDupSortedBucket(tx, logPrefix, "account changesets", dbutils.AccountChangeSetBucket, to, cfg.pruningDistance, logEvery.C); err != nil { return err } - if err := pruneDupSortedBucket(tx, logPrefix, "storage changesets", dbutils.StorageChangeSetBucket, to, cfg.pruningDistance, logEvery.C); err != nil { + if err = pruneDupSortedBucket(tx, logPrefix, "storage changesets", dbutils.StorageChangeSetBucket, to, cfg.pruningDistance, logEvery.C); err != nil { return err } } if !useExternalTx { - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { return err } } @@ -440,7 +439,7 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current return currentBlock, currentTx, currentTime } -func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) error { +func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool) (err error) { quit := ctx.Done() if u.UnwindPoint >= s.BlockNumber { s.Done() @@ -448,25 +447,24 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, ctx cont } useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(context.Background()) if err != nil { return err } defer tx.Rollback() } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() log.Info(fmt.Sprintf("[%s] Unwind Execution", logPrefix), "from", s.BlockNumber, "to", u.UnwindPoint) - if err := unwindExecutionStage(u, s, tx, quit, cfg, initialCycle); err != nil { + if err = unwindExecutionStage(u, s, tx, quit, cfg, initialCycle); err != nil { return err } - if err := u.Done(tx); err != nil { + if err = u.Done(tx); err != nil { return fmt.Errorf("%s: reset: %v", logPrefix, err) } if !useExternalTx { - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { return err } } @@ -474,7 +472,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, ctx cont } func unwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg, initialCycle bool) error { - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() stateBucket := dbutils.PlainStateBucket storageKeyLength := common.AddressLength + common.IncarnationLength + common.HashLength @@ -614,3 +612,25 @@ func min(a, b uint64) uint64 { } return b } + +func PruneExecutionStage(p *PruneState, tx ethdb.RwTx, cfg ExecuteBlockCfg, ctx context.Context, initialCycle bool) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + logPrefix := p.LogPrefix() + if err = p.Done(tx); err != nil { + return fmt.Errorf("%s: reset: %v", logPrefix, err) + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) + } + } + return nil +} diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index c4dee99015afe94918eeedbb0bf9b9af25972a9a..8980b5f06d5544a8f44236aa010e606e1728d995 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -75,22 +75,42 @@ func FinishForward(s *StageState, tx ethdb.RwTx, cfg FinishCfg) error { return nil } -func UnwindFinish(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg FinishCfg) error { +func UnwindFinish(u *UnwindState, tx ethdb.RwTx, cfg FinishCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } defer tx.Rollback() } - err := u.Done(tx) - if err != nil { + + if err = u.Done(tx); err != nil { return err } if !useExternalTx { - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + +func PruneFinish(u *PruneState, tx ethdb.RwTx, cfg FinishCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = u.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { return err } } diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go index 7454340e6cb798475be7524b7e5c70b842333e26..0b74bb9837e111309e4c82f51e126a48a2feb9a7 100644 --- a/eth/stagedsync/stage_hashstate.go +++ b/eth/stagedsync/stage_hashstate.go @@ -38,7 +38,7 @@ func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx con defer tx.Rollback() } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() to, err := s.ExecutionAt(tx) if err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) @@ -79,26 +79,25 @@ func SpawnHashStateStage(s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx con return nil } -func UnwindHashStateStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx context.Context) error { +func UnwindHashStateStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HashStateCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } defer tx.Rollback() } - logPrefix := s.state.LogPrefix() - if err := unwindHashStateStageImpl(logPrefix, u, s, tx, cfg, ctx.Done()); err != nil { + logPrefix := s.LogPrefix() + if err = unwindHashStateStageImpl(logPrefix, u, s, tx, cfg, ctx.Done()); err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) } - if err := u.Done(tx); err != nil { + if err = u.Done(tx); err != nil { return fmt.Errorf("%s: reset: %v", logPrefix, err) } if !useExternalTx { - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { return err } } @@ -522,3 +521,24 @@ func promoteHashedStateIncrementally(logPrefix string, s *StageState, from, to u } return nil } + +func PruneHashStateStage(s *PruneState, tx ethdb.RwTx, cfg HashStateCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e89410914a465d5f13a5b8a4d34edcc0ba52d4eb..39661e6d349c0041967c4d01a64295fc7e010e8c 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -240,8 +240,7 @@ func fixCanonicalChain(logPrefix string, logEvery *time.Ticker, height uint64, h return nil } -func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg) error { - var err error +func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg) (err error) { useExternalTx := tx != nil if !useExternalTx { tx, err = cfg.db.BeginRw(context.Background()) @@ -371,3 +370,25 @@ func (cr epochReader) GetEpoch(hash common.Hash, number uint64) ([]byte, error) func (cr epochReader) PutEpoch(hash common.Hash, number uint64, proof []byte) error { return rawdb.WriteEpoch(cr.tx, number, hash, proof) } + +func HeadersPrune(p *PruneState, tx ethdb.RwTx, cfg HeadersCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + logPrefix := p.LogPrefix() + if err = p.Done(tx); err != nil { + return fmt.Errorf("%s: reset: %v", logPrefix, err) + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) + } + } + return nil +} diff --git a/eth/stagedsync/stage_headers_snapshot.go b/eth/stagedsync/stage_headers_snapshot.go index 9a69d4933c88b88815dd4a7079e59bfd705ed5d5..3f8aa154f856fe449269a96a6521f876af89a3d2 100644 --- a/eth/stagedsync/stage_headers_snapshot.go +++ b/eth/stagedsync/stage_headers_snapshot.go @@ -122,10 +122,30 @@ func SpawnHeadersSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snaps return nil } -func UnwindHeadersSnapshotGenerationStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SnapshotHeadersCfg, ctx context.Context) error { +func UnwindHeadersSnapshotGenerationStage(u *UnwindState, tx ethdb.RwTx, cfg SnapshotHeadersCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err := u.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err := tx.Commit(); err != nil { + return err + } + } + return nil +} + +func PruneHeadersSnapshotGenerationStage(u *PruneState, tx ethdb.RwTx, cfg SnapshotHeadersCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err diff --git a/eth/stagedsync/stage_indexes.go b/eth/stagedsync/stage_indexes.go index 368e900cef744db7b0b255f2cfdd4ded9f8747b7..2d695ed77546c01a44fc2dc22505d32b56100e01 100644 --- a/eth/stagedsync/stage_indexes.go +++ b/eth/stagedsync/stage_indexes.go @@ -50,7 +50,7 @@ func SpawnAccountHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx quitCh := ctx.Done() executionAt, err := s.ExecutionAt(tx) - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) } @@ -94,7 +94,7 @@ func SpawnStorageHistoryIndex(s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx quitCh := ctx.Done() executionAt, err := s.ExecutionAt(tx) - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf("%s: logs index: getting last executed block: %w", logPrefix, err) } @@ -222,10 +222,9 @@ func promoteHistory(logPrefix string, tx ethdb.RwTx, changesetBucket string, sta return nil } -func UnwindAccountHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx context.Context) error { +func UnwindAccountHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -234,7 +233,7 @@ func UnwindAccountHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg } quitCh := ctx.Done() - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err := unwindHistory(logPrefix, tx, dbutils.AccountChangeSetBucket, u.UnwindPoint, cfg, quitCh); err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) } @@ -251,11 +250,11 @@ func UnwindAccountHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg return nil } -func UnwindStorageHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx context.Context) error { +func UnwindStorageHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HistoryCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } @@ -263,7 +262,7 @@ func UnwindStorageHistoryIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg } quitCh := ctx.Done() - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err := unwindHistory(logPrefix, tx, dbutils.StorageChangeSetBucket, u.UnwindPoint, cfg, quitCh); err != nil { return fmt.Errorf("[%s] %w", logPrefix, err) } @@ -349,3 +348,45 @@ func truncateBitmaps64(tx ethdb.RwTx, bucket string, inMem map[string]struct{}, return nil } + +func PruneAccountHistoryIndex(s *PruneState, tx ethdb.RwTx, cfg HistoryCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + +func PruneStorageHistoryIndex(s *PruneState, tx ethdb.RwTx, cfg HistoryCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index 2ad58440156747cacddc81d0f96ec9c56f56722e..fb15ec4e5a0fec21961484dd50797a48f13deae9 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -74,7 +74,7 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx ethdb.RwTx, cfg headerHash = syncHeadHeader.Hash() } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if to > s.BlockNumber+16 { log.Info(fmt.Sprintf("[%s] Generating intermediate hashes", logPrefix), "from", s.BlockNumber, "to", to) } @@ -391,12 +391,11 @@ func incrementIntermediateHashes(logPrefix string, s *StageState, db ethdb.RwTx, return hash, nil } -func UnwindIntermediateHashesStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TrieCfg, ctx context.Context) error { +func UnwindIntermediateHashesStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TrieCfg, ctx context.Context) (err error) { quit := ctx.Done() useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } @@ -417,7 +416,7 @@ func UnwindIntermediateHashesStage(u *UnwindState, s *StageState, tx ethdb.RwTx, // } // } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err := unwindIntermediateHashesStageImpl(logPrefix, u, s, tx, cfg, expectedRootHash, quit); err != nil { return err } @@ -558,3 +557,24 @@ func storageTrieCollector(collector *etl.Collector) trie.StorageHashCollector2 { return collector.Collect(newK, newV) } } + +func PruneIntermediateHashesStage(s *PruneState, tx ethdb.RwTx, cfg TrieCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/eth/stagedsync/stage_log_index.go b/eth/stagedsync/stage_log_index.go index 8c6483e6087077e292cabb909f5d7a37d3582bfb..fb15a13090cc686cddf1027da34723ed63cdcc1b 100644 --- a/eth/stagedsync/stage_log_index.go +++ b/eth/stagedsync/stage_log_index.go @@ -54,7 +54,7 @@ func SpawnLogIndex(s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Co } endBlock, err := s.ExecutionAt(tx) - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err != nil { return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) } @@ -217,11 +217,10 @@ func promoteLogIndex(logPrefix string, tx ethdb.RwTx, start uint64, cfg LogIndex return nil } -func UnwindLogIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Context) error { +func UnwindLogIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Context) (err error) { quitCh := ctx.Done() useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -229,7 +228,7 @@ func UnwindLogIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg LogIndexCf defer tx.Rollback() } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if err := unwindLogIndex(logPrefix, tx, u.UnwindPoint, cfg, quitCh); err != nil { return err } @@ -237,13 +236,11 @@ func UnwindLogIndex(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg LogIndexCf if err := u.Done(tx); err != nil { return fmt.Errorf("%s: %w", logPrefix, err) } - if !useExternalTx { if err := tx.Commit(); err != nil { return err } } - return nil } @@ -326,3 +323,24 @@ func truncateBitmaps(tx ethdb.RwTx, bucket string, inMem map[string]struct{}, to return nil } + +func PruneLogIndex(s *PruneState, tx ethdb.RwTx, cfg LogIndexCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/eth/stagedsync/stage_mining_create_block.go b/eth/stagedsync/stage_mining_create_block.go index a85ebb6a0412d6f96bd94619d92da93d004fc416..1eff9b0e61e64ed39831a99606d12271a422add5 100644 --- a/eth/stagedsync/stage_mining_create_block.go +++ b/eth/stagedsync/stage_mining_create_block.go @@ -96,7 +96,7 @@ func SpawnMiningCreateBlockStage(s *StageState, tx ethdb.RwTx, cfg MiningCreateB return fmt.Errorf("refusing to mine without etherbase") } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() executionAt, err := s.ExecutionAt(tx) if err != nil { return fmt.Errorf("%s: getting last executed block: %w", logPrefix, err) diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 88c10cc51eec1a21b6a81dd0125864b097e6d91b..4e58ff03739fec2bb44f2b72870b65d3cce92d29 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -52,7 +52,7 @@ func StageMiningExecCfg( // - resubmitAdjustCh - variable is not implemented func SpawnMiningExecStage(s *StageState, tx ethdb.RwTx, cfg MiningExecCfg, quit <-chan struct{}) error { cfg.vmConfig.NoReceipts = false - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() current := cfg.miningState.MiningBlock localTxs := current.LocalTxs remoteTxs := current.RemoteTxs diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index 62627a8f3c4a7c11507b53ecd47e81163cb9b0a2..f7da6c125b1c9d04f044a3288d34e453e2675dac 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -36,7 +36,7 @@ func StageMiningFinishCfg( } func SpawnMiningFinishStage(s *StageState, tx ethdb.RwTx, cfg MiningFinishCfg, quit <-chan struct{}) error { - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() current := cfg.miningState.MiningBlock // Short circuit when receiving duplicate result caused by resubmitting. diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 259cd7d61b69f78fd955ae662b18a220af854e18..6b013a566c0ff36a046e9c2971e32a640dfd55fe 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -75,7 +75,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx ethd s.Done() return nil } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if to > s.BlockNumber+16 { log.Info(fmt.Sprintf("[%s] Started", logPrefix), "from", s.BlockNumber, "to", to) } @@ -327,25 +327,44 @@ func recoverSenders(ctx context.Context, logPrefix string, cryptoContext *secp25 } } -func UnwindSendersStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SendersCfg) error { +func UnwindSendersStage(s *UnwindState, tx ethdb.RwTx, cfg SendersCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } defer tx.Rollback() } - err := u.Done(tx) - logPrefix := s.state.LogPrefix() - if err != nil { + logPrefix := s.LogPrefix() + if err = s.Done(tx); err != nil { return fmt.Errorf("%s: reset: %v", logPrefix, err) } if !useExternalTx { - err = tx.Commit() + if err = tx.Commit(); err != nil { + return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) + } + } + return nil +} + +func PruneSendersStage(s *PruneState, tx ethdb.RwTx, cfg SendersCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) if err != nil { + return err + } + defer tx.Rollback() + } + + logPrefix := s.LogPrefix() + if err = s.Done(tx); err != nil { + return fmt.Errorf("%s: reset: %v", logPrefix, err) + } + if !useExternalTx { + if err = tx.Commit(); err != nil { return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) } } diff --git a/eth/stagedsync/stage_state_snapshot.go b/eth/stagedsync/stage_state_snapshot.go index b2a9e024f0d00395d37741b2e7a3345a10a3f835..6883ef39cc3717860c0d5e1295555438781b20da 100644 --- a/eth/stagedsync/stage_state_snapshot.go +++ b/eth/stagedsync/stage_state_snapshot.go @@ -26,20 +26,17 @@ func StageSnapshotStateCfg(db ethdb.RwKV, snapshot ethconfig.Snapshot, tmpDir st } } -func SpawnStateSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotStateCfg, quit <-chan struct{}) error { +func SpawnStateSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg SnapshotStateCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } defer tx.Rollback() } - err := s.DoneAndUpdate(tx, 0) - - if err != nil { + if err = s.DoneAndUpdate(tx, 0); err != nil { return err } if !useExternalTx { @@ -50,22 +47,42 @@ func SpawnStateSnapshotGenerationStage(s *StageState, tx ethdb.RwTx, cfg Snapsho return nil } -func UnwindStateSnapshotGenerationStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg SnapshotStateCfg, quit <-chan struct{}) error { +func UnwindStateSnapshotGenerationStage(s *UnwindState, tx ethdb.RwTx, cfg SnapshotStateCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } defer tx.Rollback() } - if err := u.Done(tx); err != nil { + if err = s.Done(tx); err != nil { return err } if !useExternalTx { - if err := tx.Commit(); err != nil { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + +func PruneStateSnapshotGenerationStage(s *PruneState, tx ethdb.RwTx, cfg SnapshotStateCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { return err } } diff --git a/eth/stagedsync/stage_tevm.go b/eth/stagedsync/stage_tevm.go index 864350e3504cc2b385c96f62aa6a5d317908ca01..a23f4fa140ec11883c79d26fd4d2b7030d1387ed 100644 --- a/eth/stagedsync/stage_tevm.go +++ b/eth/stagedsync/stage_tevm.go @@ -74,7 +74,7 @@ func SpawnTranspileStage(s *StageState, tx ethdb.RwTx, toBlock uint64, cfg Trans } stageProgress := uint64(0) - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if to > s.BlockNumber+16 { log.Info(fmt.Sprintf("[%s] Contract translation", logPrefix), "from", s.BlockNumber, "to", to) } @@ -277,10 +277,9 @@ func logTEVMProgress(logPrefix string, prevContract uint64, prevTime time.Time, return currentContract, currentTime } -func UnwindTranspileStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TranspileCfg, ctx context.Context) error { +func UnwindTranspileStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TranspileCfg, ctx context.Context) (err error) { useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -351,14 +350,12 @@ func UnwindTranspileStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg Tran } } - err = u.Done(tx) - logPrefix := s.state.LogPrefix() - if err != nil { + logPrefix := s.LogPrefix() + if err = u.Done(tx); err != nil { return fmt.Errorf("%s: reset: %v", logPrefix, err) } if !useExternalTx { - err = tx.Commit() - if err != nil { + if err = tx.Commit(); err != nil { return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) } } @@ -369,3 +366,25 @@ func UnwindTranspileStage(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg Tran func transpileCode(code []byte) ([]byte, error) { return append(make([]byte, 0, len(code)), code...), nil } + +func PruneTranspileStage(p *PruneState, tx ethdb.RwTx, cfg TranspileCfg, initialCycle bool, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + logPrefix := p.LogPrefix() + if err = p.Done(tx); err != nil { + return fmt.Errorf("%s: reset: %v", logPrefix, err) + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err) + } + } + return nil +} diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go index d6bc988ae17ebc84c9e021836b2756df3b7838c4..13e75f995a2012463bc9747ef330376b9a916f1f 100644 --- a/eth/stagedsync/stage_txlookup.go +++ b/eth/stagedsync/stage_txlookup.go @@ -31,11 +31,10 @@ func StageTxLookupCfg( } } -func SpawnTxLookup(s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, ctx context.Context) error { +func SpawnTxLookup(s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, ctx context.Context) (err error) { quitCh := ctx.Done() useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -55,7 +54,7 @@ func SpawnTxLookup(s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, ctx context.Co return err } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() startKey = dbutils.EncodeBlockNumber(blockNum) if err = TxLookupTransform(logPrefix, tx, startKey, dbutils.EncodeBlockNumber(syncHeadNumber), quitCh, cfg); err != nil { return err @@ -98,14 +97,13 @@ func TxLookupTransform(logPrefix string, tx ethdb.RwTx, startKey, endKey []byte, }) } -func UnwindTxLookup(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, ctx context.Context) error { +func UnwindTxLookup(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxLookupCfg, ctx context.Context) (err error) { quitCh := ctx.Done() if s.BlockNumber <= u.UnwindPoint { return nil } useExternalTx := tx != nil if !useExternalTx { - var err error tx, err = cfg.db.BeginRw(ctx) if err != nil { return err @@ -119,7 +117,6 @@ func UnwindTxLookup(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxLookupCf if err := u.Done(tx); err != nil { return err } - if !useExternalTx { if err := tx.Commit(); err != nil { return err @@ -132,7 +129,7 @@ func unwindTxLookup(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxLookupCf collector := etl.NewCollector(cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize)) defer collector.Close("TxLookup") - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() c, err := tx.Cursor(dbutils.BlockBodyPrefix) if err != nil { return err @@ -174,3 +171,24 @@ func unwindTxLookup(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxLookupCf } return nil } + +func PruneTxLookup(s *PruneState, tx ethdb.RwTx, cfg TxLookupCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/eth/stagedsync/stage_txpool.go b/eth/stagedsync/stage_txpool.go index aaa90fc16f8daa4e18d0f952cecfc47278543106..fa8e6d218143de91dc1c1b297a4cbf654456a7d2 100644 --- a/eth/stagedsync/stage_txpool.go +++ b/eth/stagedsync/stage_txpool.go @@ -48,7 +48,7 @@ func SpawnTxPool(s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, ctx context.Contex return nil } - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if to < s.BlockNumber { return fmt.Errorf("%s: to (%d) < from (%d)", logPrefix, to, s.BlockNumber) } @@ -149,15 +149,14 @@ func incrementalTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPoo return nil } -func UnwindTxPool(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, ctx context.Context) error { +func UnwindTxPool(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, ctx context.Context) (err error) { if u.UnwindPoint >= s.BlockNumber { s.Done() return nil } useExternalTx := tx != nil if !useExternalTx { - var err error - tx, err = cfg.db.BeginRw(context.Background()) + tx, err = cfg.db.BeginRw(ctx) if err != nil { return err } @@ -165,7 +164,7 @@ func UnwindTxPool(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, c } quitCh := ctx.Done() - logPrefix := s.state.LogPrefix() + logPrefix := s.LogPrefix() if cfg.pool != nil && cfg.pool.IsStarted() { if err := unwindTxPoolUpdate(logPrefix, u.UnwindPoint, s.BlockNumber, cfg.pool, tx, quitCh); err != nil { return fmt.Errorf("[%s]: %w", logPrefix, err) @@ -280,3 +279,24 @@ func unwindTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPool, tx log.Info(fmt.Sprintf("[%s] Injection complete", logPrefix)) return nil } + +func PruneTxPool(s *PruneState, tx ethdb.RwTx, cfg TxPoolCfg, ctx context.Context) (err error) { + useExternalTx := tx != nil + if !useExternalTx { + tx, err = cfg.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = s.Done(tx); err != nil { + return err + } + if !useExternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index f3dc4558d8f3c8d31385ff12376aa2ffe1da24c2..79de385eefa66f09bc4dd5c7eebb295397b598da 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -32,31 +32,34 @@ func MiningStages( { ID: stages.MiningCreateBlock, Description: "Mining: construct new block from tx pool", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnMiningCreateBlockStage(s, tx, createBlockCfg, ctx.Done()) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Prune: func(firstCycle bool, u *PruneState, tx ethdb.RwTx) error { return nil }, }, { ID: stages.MiningExecution, Description: "Mining: construct new block from tx pool", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnMiningExecStage(s, tx, execCfg, ctx.Done()) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Prune: func(firstCycle bool, u *PruneState, tx ethdb.RwTx) error { return nil }, }, { ID: stages.HashState, Description: "Hash the key in the state", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnHashStateStage(s, tx, hashStateCfg, ctx) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Prune: func(firstCycle bool, u *PruneState, tx ethdb.RwTx) error { return nil }, }, { ID: stages.IntermediateHashes, Description: "Generate intermediate hashes and computing state root", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { stateRoot, err := SpawnIntermediateHashesStage(s, u, tx, trieCfg, ctx) if err != nil { return err @@ -64,15 +67,17 @@ func MiningStages( createBlockCfg.miner.MiningBlock.Header.Root = stateRoot return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Prune: func(firstCycle bool, u *PruneState, tx ethdb.RwTx) error { return nil }, }, { ID: stages.MiningFinish, Description: "Mining: create and propagate valid block", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { return SpawnMiningFinishStage(s, tx, finish, ctx.Done()) }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { return nil }, + Prune: func(firstCycle bool, u *PruneState, tx ethdb.RwTx) error { return nil }, }, } } diff --git a/eth/stagedsync/stages/stages.go b/eth/stagedsync/stages/stages.go index ac4c6ac519f5ea962dd1d82a962d6ff3f54f3291..3efd57c485ebd55a2a384e328266a1a032c01f6d 100644 --- a/eth/stagedsync/stages/stages.go +++ b/eth/stagedsync/stages/stages.go @@ -102,6 +102,11 @@ func SaveStageUnwind(db ethdb.Putter, stage SyncStage, invalidation uint64) erro return db.Put(dbutils.SyncStageUnwind, []byte(stage), marshalData(invalidation)) } +// SaveStagePrune saves the progress of the given stage pruning in the database +func SaveStagePrune(db ethdb.Putter, stage SyncStage, invalidation uint64) error { + return db.Put(dbutils.SyncStageUnwind, []byte(stage), marshalData(invalidation)) +} + func marshalData(blockNumber uint64) []byte { return encodeBigEndian(blockNumber) } diff --git a/eth/stagedsync/state.go b/eth/stagedsync/state.go index a1f1283b0cd93dc332e425192ffa59e5882b9a1b..19c62ed5160318a684dec354994221ad2acf35bd 100644 --- a/eth/stagedsync/state.go +++ b/eth/stagedsync/state.go @@ -78,7 +78,7 @@ func (s *State) UnwindTo(blockNumber uint64, tx ethdb.RwTx, badBlock common.Hash if stage.Disabled { continue } - if err := s.unwindStack.Add(UnwindState{stage.ID, blockNumber, badBlock}, tx); err != nil { + if err := s.unwindStack.Add(UnwindState{stage.ID, blockNumber, badBlock, s}, tx); err != nil { return err } } @@ -124,11 +124,12 @@ func (s *State) StageByID(id stages.SyncStage) (*Stage, error) { } func NewState(stagesList []*Stage) *State { - return &State{ + st := &State{ stages: stagesList, currentStage: 0, - unwindStack: NewPersistentUnwindStack(), } + st.unwindStack = NewPersistentUnwindStack(st) + return st } func (s *State) LoadUnwindInfo(db ethdb.KVGetter) error { @@ -257,7 +258,7 @@ func (s *State) runStage(stage *Stage, db ethdb.RwKV, tx ethdb.RwTx, firstCycle start := time.Now() logPrefix := s.LogPrefix() - if err = stage.ExecFunc(firstCycle, stageState, s, tx); err != nil { + if err = stage.Forward(firstCycle, stageState, s, tx); err != nil { return err } @@ -284,7 +285,7 @@ func (s *State) unwindStage(firstCycle bool, unwind *UnwindState, db ethdb.RwKV, if err != nil { return err } - if stage.UnwindFunc == nil { + if stage.Unwind == nil { return nil } var stageState *StageState @@ -304,7 +305,7 @@ func (s *State) unwindStage(firstCycle bool, unwind *UnwindState, db ethdb.RwKV, tx = nil } - err = stage.UnwindFunc(firstCycle, unwind, stageState, tx) + err = stage.Unwind(firstCycle, unwind, stageState, tx) if err != nil { return err } @@ -346,7 +347,7 @@ func (s *State) EnableStages(ids ...stages.SyncStage) { func (s *State) MockExecFunc(id stages.SyncStage, f ExecFunc) { for i := range s.stages { if s.stages[i].ID == id { - s.stages[i].ExecFunc = f + s.stages[i].Forward = f } } } diff --git a/eth/stagedsync/state_test.go b/eth/stagedsync/state_test.go index 44de5c0ac500b3e51e2636137680f008347100bd..8d765dfdc2748251e14dce2f209e2cbf71f688a2 100644 --- a/eth/stagedsync/state_test.go +++ b/eth/stagedsync/state_test.go @@ -17,7 +17,7 @@ func TestStateStagesSuccess(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) s.Done() return nil @@ -26,7 +26,7 @@ func TestStateStagesSuccess(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) s.Done() return nil @@ -35,7 +35,7 @@ func TestStateStagesSuccess(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) s.Done() return nil @@ -59,7 +59,7 @@ func TestStateDisabledStages(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) s.Done() return nil @@ -68,7 +68,7 @@ func TestStateDisabledStages(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) s.Done() return nil @@ -78,7 +78,7 @@ func TestStateDisabledStages(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) s.Done() return nil @@ -103,7 +103,7 @@ func TestStateRepeatedStage(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) s.Done() return nil @@ -112,7 +112,7 @@ func TestStateRepeatedStage(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) repeatStageTwo-- if repeatStageTwo < 0 { @@ -124,7 +124,7 @@ func TestStateRepeatedStage(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) s.Done() return nil @@ -149,7 +149,7 @@ func TestStateErroredStage(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) s.Done() return nil @@ -158,7 +158,7 @@ func TestStateErroredStage(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) s.Done() return expectedErr @@ -167,7 +167,7 @@ func TestStateErroredStage(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) s.Done() return nil @@ -193,7 +193,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -201,7 +201,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Headers)) return u.Done(tx) }, @@ -209,7 +209,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 1000) @@ -217,7 +217,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Bodies)) return u.Done(tx) }, @@ -225,7 +225,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { if s.BlockNumber == 0 { if err := s.Update(tx, 1700); err != nil { return err @@ -239,7 +239,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Senders)) return u.Done(tx) }, @@ -247,7 +247,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { { ID: stages.IntermediateHashes, Disabled: true, - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.IntermediateHashes) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -255,7 +255,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.IntermediateHashes)) return u.Done(tx) }, @@ -295,7 +295,7 @@ func TestStateUnwind(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -303,7 +303,7 @@ func TestStateUnwind(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Headers)) return u.Done(tx) }, @@ -311,7 +311,7 @@ func TestStateUnwind(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -319,7 +319,7 @@ func TestStateUnwind(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Bodies)) return u.Done(tx) }, @@ -327,7 +327,7 @@ func TestStateUnwind(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) if !unwound { unwound = true @@ -340,7 +340,7 @@ func TestStateUnwind(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Senders)) return u.Done(tx) }, @@ -348,7 +348,7 @@ func TestStateUnwind(t *testing.T) { { ID: stages.IntermediateHashes, Disabled: true, - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.IntermediateHashes) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -356,7 +356,7 @@ func TestStateUnwind(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.IntermediateHashes)) return u.Done(tx) }, @@ -397,7 +397,7 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -405,7 +405,7 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Headers)) return u.Done(tx) }, @@ -413,7 +413,7 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -425,7 +425,7 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) if !unwound { unwound = true @@ -438,7 +438,7 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Senders)) return u.Done(tx) }, @@ -478,7 +478,7 @@ func TestStateSyncDoTwice(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) return s.DoneAndUpdate(tx, s.BlockNumber+100) }, @@ -486,7 +486,7 @@ func TestStateSyncDoTwice(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) return s.DoneAndUpdate(tx, s.BlockNumber+200) }, @@ -494,7 +494,7 @@ func TestStateSyncDoTwice(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) return s.DoneAndUpdate(tx, s.BlockNumber+300) }, @@ -536,7 +536,7 @@ func TestStateSyncInterruptRestart(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) s.Done() return nil @@ -545,7 +545,7 @@ func TestStateSyncInterruptRestart(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) s.Done() return expectedErr @@ -554,7 +554,7 @@ func TestStateSyncInterruptRestart(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) s.Done() return nil @@ -591,7 +591,7 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { { ID: stages.Headers, Description: "Downloading headers", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Headers) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -599,7 +599,7 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Headers)) return u.Done(tx) }, @@ -607,7 +607,7 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { { ID: stages.Bodies, Description: "Downloading block bodiess", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Bodies) if s.BlockNumber == 0 { return s.DoneAndUpdate(tx, 2000) @@ -615,7 +615,7 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Bodies)) return u.Done(tx) }, @@ -623,7 +623,7 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { { ID: stages.Senders, Description: "Recovering senders from tx signatures", - ExecFunc: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { + Forward: func(firstCycle bool, s *StageState, u Unwinder, tx ethdb.RwTx) error { flow = append(flow, stages.Senders) if !unwound { unwound = true @@ -636,7 +636,7 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { s.Done() return nil }, - UnwindFunc: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, tx ethdb.RwTx) error { flow = append(flow, unwindOf(stages.Senders)) if !interrupted { interrupted = true diff --git a/eth/stagedsync/unwind.go b/eth/stagedsync/unwind.go index 9cd759415d06954a57ea3d40fb25098df73c878c..79ca0c3722f1f16d98378590e6807b854a8040c2 100644 --- a/eth/stagedsync/unwind.go +++ b/eth/stagedsync/unwind.go @@ -19,9 +19,12 @@ type UnwindState struct { UnwindPoint uint64 // If unwind is caused by a bad block, this hash is not empty BadBlock common.Hash + state *State } -// Done() updates the DB state of the stage. +func (u *UnwindState) LogPrefix() string { return u.state.LogPrefix() } + +// Done updates the DB state of the stage. func (u *UnwindState) Done(db ethdb.Putter) error { err := stages.SaveStageProgress(db, u.ID, u.UnwindPoint) if err != nil { @@ -30,17 +33,18 @@ func (u *UnwindState) Done(db ethdb.Putter) error { return stages.SaveStageUnwind(db, u.ID, 0) } -// Skip() ignores the unwind +// Skip ignores the unwind func (u *UnwindState) Skip(db ethdb.Putter) error { return stages.SaveStageUnwind(db, u.ID, 0) } type PersistentUnwindStack struct { unwindStack []UnwindState + state *State } -func NewPersistentUnwindStack() *PersistentUnwindStack { - return &PersistentUnwindStack{make([]UnwindState, 0)} +func NewPersistentUnwindStack(state *State) *PersistentUnwindStack { + return &PersistentUnwindStack{make([]UnwindState, 0), state} } func (s *PersistentUnwindStack) AddFromDB(db ethdb.KVGetter, stageID stages.SyncStage) error { @@ -62,7 +66,7 @@ func (s *PersistentUnwindStack) LoadFromDB(db ethdb.KVGetter, stageID stages.Syn return nil, err } if unwindPoint > 0 { - return &UnwindState{stageID, unwindPoint, common.Hash{}}, nil + return &UnwindState{stageID, unwindPoint, common.Hash{}, s.state}, nil } return nil, nil } @@ -91,3 +95,22 @@ func (s *PersistentUnwindStack) Pop() *UnwindState { s.unwindStack = s.unwindStack[:len(s.unwindStack)-1] return &unwind } + +// PruneState contains the information about unwind. +type PruneState struct { + ID stages.SyncStage + PrunePoint uint64 // PrunePoint is the block to prune to. + state *State +} + +func (u *PruneState) LogPrefix() string { return u.state.LogPrefix() } + +// Done updates the DB state of the stage. +func (u *PruneState) Done(db ethdb.Putter) error { + return stages.SaveStagePrune(db, u.ID, 0) +} + +// Skip ignores the prune +func (u *PruneState) Skip(db ethdb.Putter) error { + return stages.SaveStagePrune(db, u.ID, 0) +} diff --git a/eth/stagedsync/unwind_test.go b/eth/stagedsync/unwind_test.go index aa4a14098dba523c3f6a3fd81c494059ffad1aa2..f7467deec99e8857b6b49eb584a0abb0dcc7dbf6 100644 --- a/eth/stagedsync/unwind_test.go +++ b/eth/stagedsync/unwind_test.go @@ -12,16 +12,16 @@ import ( func TestUnwindStackLoadFromDb(t *testing.T) { _, tx := kv.NewTestTx(t) - stack := NewPersistentUnwindStack() + stack := NewPersistentUnwindStack(nil) stages := []stages.SyncStage{stages.Bodies, stages.Headers, stages.Execution} points := []uint64{10, 20, 30} for i := range stages { - err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}, nil}, tx) assert.NoError(t, err) } - stack2 := NewPersistentUnwindStack() + stack2 := NewPersistentUnwindStack(nil) for i := range stages { err := stack2.AddFromDB(tx, stages[i]) assert.NoError(t, err) @@ -34,12 +34,12 @@ func TestUnwindStackLoadFromDb(t *testing.T) { func TestUnwindStackLoadFromDbAfterDone(t *testing.T) { _, tx := kv.NewTestTx(t) - stack := NewPersistentUnwindStack() + stack := NewPersistentUnwindStack(nil) stages := []stages.SyncStage{stages.Bodies, stages.Headers, stages.Execution} points := []uint64{10, 20, 30} for i := range stages { - err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}, nil}, tx) assert.NoError(t, err) } @@ -48,7 +48,7 @@ func TestUnwindStackLoadFromDbAfterDone(t *testing.T) { err := u.Done(tx) assert.NoError(t, err) - stack2 := NewPersistentUnwindStack() + stack2 := NewPersistentUnwindStack(nil) for i := range stages { err := stack2.AddFromDB(tx, stages[i]) assert.NoError(t, err) @@ -61,19 +61,19 @@ func TestUnwindStackLoadFromDbAfterDone(t *testing.T) { func TestUnwindStackLoadFromDbNoDone(t *testing.T) { _, tx := kv.NewTestTx(t) - stack := NewPersistentUnwindStack() + stack := NewPersistentUnwindStack(nil) stages := []stages.SyncStage{stages.Bodies, stages.Headers, stages.Execution} points := []uint64{10, 20, 30} for i := range stages { - err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}, nil}, tx) assert.NoError(t, err) } u := stack.Pop() assert.NotNil(t, u) - stack2 := NewPersistentUnwindStack() + stack2 := NewPersistentUnwindStack(nil) for i := range stages { err := stack2.AddFromDB(tx, stages[i]) assert.NoError(t, err) @@ -86,14 +86,14 @@ func TestUnwindStackLoadFromDbNoDone(t *testing.T) { func TestUnwindStackPopAndEmpty(t *testing.T) { _, tx := kv.NewTestTx(t) - stack := NewPersistentUnwindStack() + stack := NewPersistentUnwindStack(nil) stages := []stages.SyncStage{stages.Bodies, stages.Headers, stages.Execution} points := []uint64{10, 20, 30} for i := range stages { - err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}, nil}, tx) assert.NoError(t, err) } @@ -119,20 +119,20 @@ func TestUnwindStackPopAndEmpty(t *testing.T) { func TestUnwindOverrideWithLower(t *testing.T) { _, tx := kv.NewTestTx(t) - stack := NewPersistentUnwindStack() + stack := NewPersistentUnwindStack(nil) stages := []stages.SyncStage{stages.Bodies, stages.Headers, stages.Execution} points := []uint64{10, 20, 30} for i := range stages { - err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}, nil}, tx) assert.NoError(t, err) } assert.Equal(t, 3, len(stack.unwindStack)) - err := stack.Add(UnwindState{stages[0], 5, common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[0], 5, common.Hash{}, nil}, tx) assert.NoError(t, err) // we append if the next unwind is to the lower block @@ -142,20 +142,20 @@ func TestUnwindOverrideWithLower(t *testing.T) { func TestUnwindOverrideWithHigher(t *testing.T) { _, tx := kv.NewTestTx(t) - stack := NewPersistentUnwindStack() + stack := NewPersistentUnwindStack(nil) stages := []stages.SyncStage{stages.Bodies, stages.Headers, stages.Execution} points := []uint64{10, 20, 30} for i := range stages { - err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}, nil}, tx) assert.NoError(t, err) } assert.Equal(t, 3, len(stack.unwindStack)) - err := stack.Add(UnwindState{stages[0], 105, common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[0], 105, common.Hash{}, nil}, tx) assert.NoError(t, err) // we ignore if next unwind is to the higher block @@ -165,18 +165,18 @@ func TestUnwindOverrideWithHigher(t *testing.T) { func TestUnwindOverrideWithTheSame(t *testing.T) { _, tx := kv.NewTestTx(t) - stack := NewPersistentUnwindStack() + stack := NewPersistentUnwindStack(nil) stages := []stages.SyncStage{stages.Bodies, stages.Headers, stages.Execution} points := []uint64{10, 20, 30} for i := range stages { - err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[i], points[i], common.Hash{}, nil}, tx) assert.NoError(t, err) } assert.Equal(t, 3, len(stack.unwindStack)) - err := stack.Add(UnwindState{stages[0], 10, common.Hash{}}, tx) + err := stack.Add(UnwindState{stages[0], 10, common.Hash{}, nil}, tx) assert.NoError(t, err) // we ignore if next unwind is to the higher block