From 766655205f24875e43e84d2f9271af2aefef641a Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Sat, 17 Jul 2021 23:07:09 +0700 Subject: [PATCH] Remove sync.Prepare() func (#2389) --- cmd/integration/commands/stages.go | 24 ++---------- eth/backend.go | 6 +-- eth/stagedsync/stage.go | 2 +- eth/stagedsync/stagedsync.go | 30 --------------- eth/stagedsync/state.go | 61 ++++++++++++++++++------------ eth/stagedsync/state_test.go | 29 ++++++-------- eth/stagedsync/unwind.go | 4 +- turbo/stages/mock_sentry.go | 4 +- turbo/stages/stageloop.go | 24 ++++-------- 9 files changed, 69 insertions(+), 115 deletions(-) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 4e501203d5..43f058a4b1 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -743,7 +743,7 @@ func byChain() (*core.Genesis, *params.ChainConfig) { return genesis, chainConfig } -func newSync(ctx context.Context, db ethdb.RwKV, miningConfig *params.MiningConfig) (ethdb.StorageMode, consensus.Engine, *params.ChainConfig, *vm.Config, *core.TxPool, *stagedsync.State, *stagedsync.State, stagedsync.MiningState) { +func newSync(ctx context.Context, db ethdb.RwKV, miningConfig *params.MiningConfig) (ethdb.StorageMode, consensus.Engine, *params.ChainConfig, *vm.Config, *core.TxPool, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) { tmpdir := path.Join(datadir, etl.TmpDirName) snapshotDir = path.Join(datadir, "erigon", "snapshot") @@ -806,7 +806,7 @@ func newSync(ctx context.Context, db ethdb.RwKV, miningConfig *params.MiningConf cfg.Miner = *miningConfig } - st, err := stages2.NewStagedSync2(context.Background(), db, cfg, + sync, err := stages2.NewStagedSync2(context.Background(), db, cfg, downloadServer, tmpdir, txPool, @@ -818,7 +818,7 @@ func newSync(ctx context.Context, db ethdb.RwKV, miningConfig *params.MiningConf } miner := stagedsync.NewMiningState(&cfg.Miner) - stMining := stagedsync.New( + miningSync := stagedsync.New( stagedsync.MiningStages(ctx, stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, txPool, tmpdir), stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir), @@ -829,22 +829,6 @@ func newSync(ctx context.Context, db ethdb.RwKV, miningConfig *params.MiningConf stagedsync.MiningUnwindOrder(), ) - var sync *stagedsync.State - var miningSync *stagedsync.State - if err := db.View(context.Background(), func(tx ethdb.Tx) (err error) { - sync, err = st.Prepare() - if err != nil { - return nil - } - miningSync, err = stMining.Prepare() - if err != nil { - return nil - } - return nil - }); err != nil { - panic(err) - } - return sm, engine, chainConfig, vmConfig, txPool, sync, miningSync, miner } @@ -856,7 +840,7 @@ func progress(tx ethdb.KVGetter, stage stages.SyncStage) uint64 { return res } -func stage(st *stagedsync.State, db ethdb.KVGetter, stage stages.SyncStage) *stagedsync.StageState { +func stage(st *stagedsync.Sync, db ethdb.KVGetter, stage stages.SyncStage) *stagedsync.StageState { res, err := st.StageState(stage, db) if err != nil { panic(err) diff --git a/eth/backend.go b/eth/backend.go index 5fc9221f59..46fe263a3e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -109,7 +109,7 @@ type Ethereum struct { sentryServers []*download.SentryServerImpl txPoolP2PServer *txpool.P2PServer sentries []remote.SentryClient - stagedSync *stagedsync.StagedSync + stagedSync *stagedsync.Sync notifications *stagedsync.Notifications @@ -537,7 +537,7 @@ func (s *Ethereum) shouldPreserve(block *types.Block) bool { //nolint // StartMining starts the miner with the given number of CPU threads. If mining // is already running, this method adjust the number of threads allowed to use // and updates the minimum price required by the transaction pool. -func (s *Ethereum) StartMining(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.StagedSync, cfg params.MiningConfig, gasPrice *uint256.Int, quitCh chan struct{}) error { +func (s *Ethereum) StartMining(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.Sync, cfg params.MiningConfig, gasPrice *uint256.Int, quitCh chan struct{}) error { if !cfg.Enabled { return nil } @@ -705,7 +705,7 @@ func (s *Ethereum) Stop() error { //Deprecated - use stages.StageLoop func Loop( ctx context.Context, - db ethdb.RwKV, sync *stagedsync.StagedSync, + db ethdb.RwKV, sync *stagedsync.Sync, controlServer *download.ControlServerImpl, notifications *stagedsync.Notifications, waitForDone chan struct{}, diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index ca56254c76..ebc80f0746 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -38,7 +38,7 @@ type Stage struct { // StageState is the state of the stage. type StageState struct { - state *State + state *Sync ID stages.SyncStage BlockNumber uint64 // BlockNumber is the current block number of the stage at the beginning of the state execution. } diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go index b58c37db2d..a66ba101af 100644 --- a/eth/stagedsync/stagedsync.go +++ b/eth/stagedsync/stagedsync.go @@ -1,31 +1 @@ package stagedsync - -type StagedSync struct { - stages []*Stage - unwindOrder UnwindOrder - Notifier ChainEventNotifier -} - -func New(stages []*Stage, unwindOrder UnwindOrder) *StagedSync { - return &StagedSync{ - stages: stages, - unwindOrder: unwindOrder, - } -} - -func (stagedSync *StagedSync) Prepare() (*State, error) { - state := NewState(stagedSync.stages) - - state.unwindOrder = make([]*Stage, len(stagedSync.unwindOrder)) - - for i, stageIndex := range stagedSync.unwindOrder { - for _, s := range stagedSync.stages { - if s.ID == stageIndex { - state.unwindOrder[i] = s - break - } - } - } - - return state, nil -} diff --git a/eth/stagedsync/state.go b/eth/stagedsync/state.go index 690a9abcba..f2dbd892af 100644 --- a/eth/stagedsync/state.go +++ b/eth/stagedsync/state.go @@ -15,7 +15,7 @@ import ( "github.com/ledgerwatch/erigon/log" ) -type State struct { +type Sync struct { unwindPoint *uint64 // used to run stages prevUnwindPoint *uint64 // used to get value from outside of staged sync after cycle (for example to notify RPCDaemon) badBlock common.Hash @@ -25,14 +25,14 @@ type State struct { currentStage uint } -func (s *State) Len() int { return len(s.stages) } -func (s *State) PrevUnwindPoint() *uint64 { return s.prevUnwindPoint } +func (s *Sync) Len() int { return len(s.stages) } +func (s *Sync) PrevUnwindPoint() *uint64 { return s.prevUnwindPoint } -func (s *State) NewUnwindState(id stages.SyncStage, unwindPoint, currentProgress uint64) *UnwindState { +func (s *Sync) NewUnwindState(id stages.SyncStage, unwindPoint, currentProgress uint64) *UnwindState { return &UnwindState{id, unwindPoint, currentProgress, common.Hash{}, s} } -func (s *State) NextStage() { +func (s *Sync) NextStage() { if s == nil { return } @@ -40,7 +40,7 @@ func (s *State) NextStage() { } // IsBefore returns true if stage1 goes before stage2 in staged sync -func (s *State) IsBefore(stage1, stage2 stages.SyncStage) bool { +func (s *Sync) IsBefore(stage1, stage2 stages.SyncStage) bool { idx1 := -1 idx2 := -1 for i, stage := range s.stages { @@ -57,7 +57,7 @@ func (s *State) IsBefore(stage1, stage2 stages.SyncStage) bool { } // IsAfter returns true if stage1 goes after stage2 in staged sync -func (s *State) IsAfter(stage1, stage2 stages.SyncStage) bool { +func (s *Sync) IsAfter(stage1, stage2 stages.SyncStage) bool { idx1 := -1 idx2 := -1 for i, stage := range s.stages { @@ -73,33 +73,33 @@ func (s *State) IsAfter(stage1, stage2 stages.SyncStage) bool { return idx1 > idx2 } -func (s *State) GetLocalHeight(db ethdb.KVGetter) (uint64, error) { +func (s *Sync) GetLocalHeight(db ethdb.KVGetter) (uint64, error) { state, err := s.StageState(stages.Headers, db) return state.BlockNumber, err } -func (s *State) UnwindTo(unwindPoint uint64, badBlock common.Hash) { +func (s *Sync) UnwindTo(unwindPoint uint64, badBlock common.Hash) { log.Info("UnwindTo", "block", unwindPoint, "bad_block_hash", badBlock.String()) s.unwindPoint = &unwindPoint s.badBlock = badBlock } -func (s *State) IsDone() bool { +func (s *Sync) IsDone() bool { return s.currentStage >= uint(len(s.stages)) && s.unwindPoint == nil } -func (s *State) CurrentStage() (uint, *Stage) { +func (s *Sync) CurrentStage() (uint, *Stage) { return s.currentStage, s.stages[s.currentStage] } -func (s *State) LogPrefix() string { +func (s *Sync) LogPrefix() string { if s == nil { return "" } return fmt.Sprintf("%d/%d %s", s.currentStage+1, s.Len(), s.stages[s.currentStage].ID) } -func (s *State) SetCurrentStage(id stages.SyncStage) error { +func (s *Sync) SetCurrentStage(id stages.SyncStage) error { for i, stage := range s.stages { if stage.ID == id { s.currentStage = uint(i) @@ -109,7 +109,7 @@ func (s *State) SetCurrentStage(id stages.SyncStage) error { return fmt.Errorf("stage not found with id: %v", id) } -func (s *State) StageByID(id stages.SyncStage) (*Stage, error) { +func (s *Sync) StageByID(id stages.SyncStage) (*Stage, error) { for _, stage := range s.stages { if stage.ID == id { return stage, nil @@ -118,16 +118,28 @@ func (s *State) StageByID(id stages.SyncStage) (*Stage, error) { return nil, fmt.Errorf("stage not found with id: %v", id) } -func NewState(stagesList []*Stage) *State { - st := &State{ +func New(stagesList []*Stage, unwindOrder []stages.SyncStage) *Sync { + unwindStages := make([]*Stage, len(stagesList)) + + for i, stageIndex := range unwindOrder { + for _, s := range stagesList { + if s.ID == stageIndex { + unwindStages[i] = s + break + } + } + } + + st := &Sync{ stages: stagesList, currentStage: 0, + unwindOrder: unwindStages, } return st } -func (s *State) StageState(stage stages.SyncStage, db ethdb.KVGetter) (*StageState, error) { +func (s *Sync) StageState(stage stages.SyncStage, db ethdb.KVGetter) (*StageState, error) { blockNum, err := stages.GetStageProgress(db, stage) if err != nil { return nil, err @@ -135,7 +147,7 @@ func (s *State) StageState(stage stages.SyncStage, db ethdb.KVGetter) (*StageSta return &StageState{s, stage, blockNum}, nil } -func (s *State) Run(db ethdb.RwKV, tx ethdb.RwTx, firstCycle bool) error { +func (s *Sync) Run(db ethdb.RwKV, tx ethdb.RwTx, firstCycle bool) error { var timings []interface{} for !s.IsDone() { if s.unwindPoint != nil { @@ -190,6 +202,7 @@ func (s *State) Run(db ethdb.RwKV, tx ethdb.RwTx, firstCycle bool) error { if err := printLogs(tx, timings); err != nil { return err } + s.currentStage = 0 return nil } @@ -227,7 +240,7 @@ func printLogs(tx ethdb.RwTx, timings []interface{}) error { return nil } -func (s *State) runStage(stage *Stage, db ethdb.RwKV, tx ethdb.RwTx, firstCycle bool) error { +func (s *Sync) runStage(stage *Stage, db ethdb.RwKV, tx ethdb.RwTx, firstCycle bool) error { useExternalTx := tx != nil if !useExternalTx { var err error @@ -260,7 +273,7 @@ func (s *State) runStage(stage *Stage, db ethdb.RwKV, tx ethdb.RwTx, firstCycle return nil } -func (s *State) unwindStage(firstCycle bool, stageID stages.SyncStage, db ethdb.RwKV, tx ethdb.RwTx) error { +func (s *Sync) unwindStage(firstCycle bool, stageID stages.SyncStage, db ethdb.RwKV, tx ethdb.RwTx) error { useExternalTx := tx != nil if !useExternalTx { var err error @@ -307,13 +320,13 @@ func (s *State) unwindStage(firstCycle bool, stageID stages.SyncStage, db ethdb. return nil } -func (s *State) DisableAllStages() { +func (s *Sync) DisableAllStages() { for i := range s.stages { s.stages[i].Disabled = true } } -func (s *State) DisableStages(ids ...stages.SyncStage) { +func (s *Sync) DisableStages(ids ...stages.SyncStage) { for i := range s.stages { for _, id := range ids { if s.stages[i].ID != id { @@ -324,7 +337,7 @@ func (s *State) DisableStages(ids ...stages.SyncStage) { } } -func (s *State) EnableStages(ids ...stages.SyncStage) { +func (s *Sync) EnableStages(ids ...stages.SyncStage) { for i := range s.stages { for _, id := range ids { if s.stages[i].ID != id { @@ -335,7 +348,7 @@ func (s *State) EnableStages(ids ...stages.SyncStage) { } } -func (s *State) MockExecFunc(id stages.SyncStage, f ExecFunc) { +func (s *Sync) MockExecFunc(id stages.SyncStage, f ExecFunc) { for i := range s.stages { if s.stages[i].ID == id { s.stages[i].Forward = f diff --git a/eth/stagedsync/state_test.go b/eth/stagedsync/state_test.go index d5a9a31220..19591fa166 100644 --- a/eth/stagedsync/state_test.go +++ b/eth/stagedsync/state_test.go @@ -42,7 +42,7 @@ func TestStateStagesSuccess(t *testing.T) { }, }, } - state := NewState(s) + state := New(s, nil) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.NoError(t, err) @@ -85,7 +85,7 @@ func TestStateDisabledStages(t *testing.T) { }, }, } - state := NewState(s) + state := New(s, nil) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.NoError(t, err) @@ -131,7 +131,7 @@ func TestStateRepeatedStage(t *testing.T) { }, }, } - state := NewState(s) + state := New(s, nil) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.NoError(t, err) @@ -174,8 +174,7 @@ func TestStateErroredStage(t *testing.T) { }, }, } - state := NewState(s) - state.unwindOrder = []*Stage{s[0], s[1], s[2]} + state := New(s, []stages.SyncStage{s[0].ID, s[1].ID, s[2].ID}) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.Equal(t, expectedErr, err) @@ -262,8 +261,7 @@ func TestStateUnwindSomeStagesBehindUnwindPoint(t *testing.T) { }, }, } - state := NewState(s) - state.unwindOrder = []*Stage{s[0], s[1], s[2], s[3]} + state := New(s, []stages.SyncStage{s[0].ID, s[1].ID, s[2].ID, s[3].ID}) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.NoError(t, err) @@ -360,8 +358,7 @@ func TestStateUnwind(t *testing.T) { }, }, } - state := NewState(s) - state.unwindOrder = []*Stage{s[0], s[1], s[2], s[3]} + state := New(s, []stages.SyncStage{s[0].ID, s[1].ID, s[2].ID, s[3].ID}) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.NoError(t, err) @@ -453,8 +450,7 @@ func TestStateUnwindEmptyUnwinder(t *testing.T) { }, }, } - state := NewState(s) - state.unwindOrder = []*Stage{s[0], s[1], s[2]} + state := New(s, []stages.SyncStage{s[0].ID, s[1].ID, s[2].ID}) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.NoError(t, err) @@ -510,12 +506,12 @@ func TestStateSyncDoTwice(t *testing.T) { }, } - state := NewState(s) + state := New(s, nil) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.NoError(t, err) - state = NewState(s) + state = New(s, nil) err = state.Run(db, tx, true) assert.NoError(t, err) @@ -571,14 +567,14 @@ func TestStateSyncInterruptRestart(t *testing.T) { }, } - state := NewState(s) + state := New(s, nil) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.Equal(t, expectedErr, err) expectedErr = nil - state = NewState(s) + state = New(s, nil) err = state.Run(db, tx, true) assert.NoError(t, err) @@ -653,8 +649,7 @@ func TestStateSyncInterruptLongUnwind(t *testing.T) { }, }, } - state := NewState(s) - state.unwindOrder = []*Stage{s[0], s[1], s[2]} + state := New(s, []stages.SyncStage{s[0].ID, s[1].ID, s[2].ID}) db, tx := kv.NewTestTx(t) err := state.Run(db, tx, true) assert.Error(t, errInterrupted, err) diff --git a/eth/stagedsync/unwind.go b/eth/stagedsync/unwind.go index 360bfad642..5095948e74 100644 --- a/eth/stagedsync/unwind.go +++ b/eth/stagedsync/unwind.go @@ -20,7 +20,7 @@ type UnwindState struct { CurrentBlockNumber uint64 // If unwind is caused by a bad block, this hash is not empty BadBlock common.Hash - state *State + state *Sync } func (u *UnwindState) LogPrefix() string { return u.state.LogPrefix() } @@ -36,7 +36,7 @@ func (u *UnwindState) Skip() {} type PruneState struct { ID stages.SyncStage PrunePoint uint64 // PrunePoint is the block to prune to. - state *State + state *Sync } func (u *PruneState) LogPrefix() string { return u.state.LogPrefix() } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 38f3b94bd8..c8585cc17d 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -52,8 +52,8 @@ type MockSentry struct { tmpdir string Engine consensus.Engine ChainConfig *params.ChainConfig - Sync *stagedsync.StagedSync - MiningSync *stagedsync.StagedSync + Sync *stagedsync.Sync + MiningSync *stagedsync.Sync PendingBlocks chan *types.Block MinedBlocks chan *types.Block downloader *download.ControlServerImpl diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 853479cf1f..5e0ea5893e 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -47,7 +47,7 @@ func NewStagedSync( txPool stagedsync.TxPoolCfg, finish stagedsync.FinishCfg, test bool, -) *stagedsync.StagedSync { +) *stagedsync.Sync { return stagedsync.New( stagedsync.DefaultStages(ctx, sm, headers, blockHashes, snapshotHeader, bodies, snapshotBodies, senders, exec, trans, snapshotState, hashState, trieCfg, history, logIndex, callTraces, txLookup, txPool, finish, test), stagedsync.DefaultUnwindOrder(), @@ -58,7 +58,7 @@ func NewStagedSync( func StageLoop( ctx context.Context, db ethdb.RwKV, - sync *stagedsync.StagedSync, + sync *stagedsync.Sync, hd *headerdownload.HeaderDownload, notifications *stagedsync.Notifications, updateHead func(ctx context.Context, head uint64, hash common.Hash, td *uint256.Int), @@ -110,7 +110,7 @@ func StageLoop( func StageLoopStep( ctx context.Context, db ethdb.RwKV, - sync *stagedsync.StagedSync, + sync *stagedsync.Sync, highestSeenHeader uint64, notifications *stagedsync.Notifications, initialCycle bool, @@ -140,10 +140,6 @@ func StageLoopStep( if notifications != nil && notifications.Accumulator != nil { notifications.Accumulator.Reset() } - st, err1 := sync.Prepare() - if err1 != nil { - return fmt.Errorf("prepare staged sync: %w", err1) - } canRunCycleInOneTransaction := !initialCycle && highestSeenHeader-origin < 1024 && highestSeenHeader-hashStateStageProgress < 1024 @@ -156,7 +152,7 @@ func StageLoopStep( defer tx.Rollback() } - err = st.Run(db, tx, initialCycle) + err = sync.Run(db, tx, initialCycle) if err != nil { return err } @@ -202,7 +198,7 @@ func StageLoopStep( } updateHead(ctx, head, headHash, headTd256) - err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, st.PrevUnwindPoint(), notifications.Events, db) + err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, sync.PrevUnwindPoint(), notifications.Events, db) if err != nil { return err } @@ -210,7 +206,7 @@ func StageLoopStep( return nil } -func MiningStep(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.StagedSync) (err error) { +func MiningStep(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.Sync) (err error) { defer func() { err = debug.ReportPanicAndRecover() }() // avoid crash because Erigon's core does many things - tx, err := kv.BeginRw(ctx) @@ -218,11 +214,7 @@ func MiningStep(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.StagedSyn return err } defer tx.Rollback() - miningState, err := mining.Prepare() - if err != nil { - return err - } - if err = miningState.Run(nil, tx, false); err != nil { + if err = mining.Run(nil, tx, false); err != nil { return err } tx.Rollback() @@ -241,7 +233,7 @@ func NewStagedSync2( client *snapshotsync.Client, snapshotMigrator *snapshotsync.SnapshotMigrator, accumulator *shards.Accumulator, -) (*stagedsync.StagedSync, error) { +) (*stagedsync.Sync, error) { var pruningDistance uint64 if !cfg.StorageMode.History { pruningDistance = params.FullImmutabilityThreshold -- GitLab