From 43af5d42b8a7708d699a76086f5596e1a54956e5 Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Sat, 10 Jul 2021 16:43:58 +0700 Subject: [PATCH] simplify staged sync world (#2336) * simplify world * simplify world * simplify world --- cmd/integration/commands/snapshot_check.go | 4 +- cmd/integration/commands/stages.go | 20 +++--- cmd/integration/commands/state_stages.go | 10 +-- .../commands/send_transaction_test.go | 4 +- cmd/rpcdaemon/commands/txpool_api_test.go | 4 +- eth/backend.go | 46 +++++++------- eth/ethconfig/config.go | 5 +- eth/stagedsync/default_stages.go | 4 +- eth/stagedsync/stage_execute.go | 29 ++++++--- eth/stagedsync/stage_execute_test.go | 6 +- eth/stagedsync/stagebuilder.go | 7 ++- eth/stagedsync/stagedsync.go | 3 - turbo/shards/state_change_accumulator.go | 7 +++ turbo/stages/mock_sentry.go | 45 ++++++++----- turbo/stages/sentry_mock_test.go | 22 +++---- turbo/stages/stageloop.go | 63 +++++++++---------- 16 files changed, 150 insertions(+), 129 deletions(-) diff --git a/cmd/integration/commands/snapshot_check.go b/cmd/integration/commands/snapshot_check.go index e5f9082bc5..ada2d8af9f 100644 --- a/cmd/integration/commands/snapshot_check.go +++ b/cmd/integration/commands/snapshot_check.go @@ -247,8 +247,8 @@ func snapshotCheck(ctx context.Context, db ethdb.RwKV, isNew bool, tmpDir string log.Info("Stage4", "progress", stage4.BlockNumber) err = stagedsync.SpawnExecuteBlocksStage(stage4, sync, tx, blockNumber, ch, - stagedsync.StageExecuteBlocksCfg(db, false, false, false, 0, batchSize, nil, chainConfig, engine, vmConfig, tmpDir), nil, - ) + stagedsync.StageExecuteBlocksCfg(db, false, false, false, 0, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpDir), + false) if err != nil { return fmt.Errorf("execution err %w", err) } diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index bceb901ae9..cb4d9514b9 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -436,17 +436,17 @@ func stageExec(db ethdb.RwKV, ctx context.Context) error { log.Info("Stage4", "progress", execStage.BlockNumber) ch := ctx.Done() - cfg := stagedsync.StageExecuteBlocksCfg(db, sm.Receipts, sm.CallTraces, sm.TEVM, 0, batchSize, nil, chainConfig, engine, vmConfig, tmpDBPath) + cfg := stagedsync.StageExecuteBlocksCfg(db, sm.Receipts, sm.CallTraces, sm.TEVM, 0, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpDBPath) if unwind > 0 { u := &stagedsync.UnwindState{Stage: stages.Execution, UnwindPoint: execStage.BlockNumber - unwind} - err := stagedsync.UnwindExecutionStage(u, execStage, nil, ch, cfg, nil) + err := stagedsync.UnwindExecutionStage(u, execStage, nil, ch, cfg, false) if err != nil { return err } return nil } - err := stagedsync.SpawnExecuteBlocksStage(execStage, sync, nil, block, ch, cfg, nil) + err := stagedsync.SpawnExecuteBlocksStage(execStage, sync, nil, block, ch, cfg, false) if err != nil { return err } @@ -796,7 +796,6 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E var batchSize datasize.ByteSize must(batchSize.UnmarshalText([]byte(batchSizeStr))) - bodyDownloadTimeoutSeconds := 30 // TODO: convert to duration, make configurable blockDownloaderWindow := 65536 downloadServer, err := download.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow) @@ -814,14 +813,17 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E } txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(txPool.Has, txPool.AddRemotes, fetchTx) - st, err := stages2.NewStagedSync2(context.Background(), db, sm, batchSize, - bodyDownloadTimeoutSeconds, + + cfg := ethconfig.Defaults + cfg.StorageMode = sm + cfg.BatchSize = batchSize + + st, err := stages2.NewStagedSync2(context.Background(), db, cfg, downloadServer, tmpdir, - ethconfig.Snapshot{Enabled: false}, txPool, txPoolP2PServer, - nil, nil, + nil, nil, nil, ) if err != nil { panic(err) @@ -843,7 +845,7 @@ func newSync(ctx context.Context, db ethdb.RwKV) (ethdb.StorageMode, consensus.E var sync *stagedsync.State if err := db.View(context.Background(), func(tx ethdb.Tx) (err error) { - sync, err = st.Prepare(nil, tx, ctx.Done(), false, nil, nil) + sync, err = st.Prepare(nil, tx, ctx.Done(), false, nil) if err != nil { return nil } diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 90acd1f460..00df2fa051 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -179,11 +179,11 @@ func syncBySmallSteps(db ethdb.RwKV, miningConfig params.MiningConfig, ctx conte stages.TxPool, // TODO: enable TxPool stage stages.Finish) - execCfg := stagedsync.StageExecuteBlocksCfg(db, sm.Receipts, sm.CallTraces, sm.TEVM, 0, batchSize, changeSetHook, chainConfig, engine, vmConfig, tmpDir) + execCfg := stagedsync.StageExecuteBlocksCfg(db, sm.Receipts, sm.CallTraces, sm.TEVM, 0, batchSize, changeSetHook, chainConfig, engine, vmConfig, nil, false, tmpDir) execUntilFunc := func(execToBlock uint64) func(stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx ethdb.RwTx) error { return func(s *stagedsync.StageState, unwinder stagedsync.Unwinder, tx ethdb.RwTx) error { - if err := stagedsync.SpawnExecuteBlocksStage(s, unwinder, tx, execToBlock, quit, execCfg, nil); err != nil { + if err := stagedsync.SpawnExecuteBlocksStage(s, unwinder, tx, execToBlock, quit, execCfg, false); err != nil { return fmt.Errorf("spawnExecuteBlocksStage: %w", err) } return nil @@ -310,7 +310,7 @@ func syncBySmallSteps(db ethdb.RwKV, miningConfig params.MiningConfig, ctx conte miningConfig.Etherbase = nextBlock.Header().Coinbase miningConfig.ExtraData = nextBlock.Header().Extra - miningStages, err := mining.Prepare(db, tx, quit, false, miningWorld, nil) + miningStages, err := mining.Prepare(db, tx, quit, false, miningWorld) if err != nil { panic(err) } @@ -501,11 +501,11 @@ func loopExec(db ethdb.RwKV, ctx context.Context, unwind uint64) error { from := progress(tx, stages.Execution) to := from + unwind - cfg := stagedsync.StageExecuteBlocksCfg(db, true, false, false, 0, batchSize, nil, chainConfig, engine, vmConfig, tmpDBPath) + cfg := stagedsync.StageExecuteBlocksCfg(db, true, false, false, 0, batchSize, nil, chainConfig, engine, vmConfig, nil, false, tmpDBPath) // set block limit of execute stage sync.MockExecFunc(stages.Execution, func(stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, tx ethdb.RwTx) error { - if err = stagedsync.SpawnExecuteBlocksStage(stageState, sync, tx, to, ch, cfg, nil); err != nil { + if err = stagedsync.SpawnExecuteBlocksStage(stageState, sync, tx, to, ch, cfg, false); err != nil { return fmt.Errorf("spawnExecuteBlocksStage: %w", err) } return nil diff --git a/cmd/rpcdaemon/commands/send_transaction_test.go b/cmd/rpcdaemon/commands/send_transaction_test.go index 1c0e63949f..c2eec7fc44 100644 --- a/cmd/rpcdaemon/commands/send_transaction_test.go +++ b/cmd/rpcdaemon/commands/send_transaction_test.go @@ -17,7 +17,6 @@ import ( "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/protocols/eth" - "github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/stages" @@ -56,10 +55,9 @@ func TestSendRawTransaction(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - notifier := &remotedbserver.Events{} initialCycle := true highestSeenHeader := chain.TopBlock.NumberU64() - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } diff --git a/cmd/rpcdaemon/commands/txpool_api_test.go b/cmd/rpcdaemon/commands/txpool_api_test.go index 1cd3ce90fc..80c28c367b 100644 --- a/cmd/rpcdaemon/commands/txpool_api_test.go +++ b/cmd/rpcdaemon/commands/txpool_api_test.go @@ -15,7 +15,6 @@ import ( "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/protocols/eth" - "github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/stages" @@ -52,10 +51,9 @@ func TestTxPoolContent(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - notifier := &remotedbserver.Events{} initialCycle := true highestSeenHeader := chain.TopBlock.NumberU64() - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } diff --git a/eth/backend.go b/eth/backend.go index de11722cc6..5db6922286 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -61,6 +61,7 @@ import ( "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/remote" + "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" stages2 "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/erigon/turbo/stages/txpropagate" @@ -94,7 +95,6 @@ type Ethereum struct { torrentClient *snapshotsync.Client lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase) - events *remotedbserver.Events chainConfig *params.ChainConfig genesisHash common.Hash quitMining chan struct{} @@ -103,13 +103,16 @@ type Ethereum struct { minedBlocks chan *types.Block // downloader fields - downloadCtx context.Context - downloadCancel context.CancelFunc - downloadServer *download.ControlServerImpl - sentryServers []*download.SentryServerImpl - txPoolP2PServer *txpool.P2PServer - sentries []remote.SentryClient - stagedSync *stagedsync.StagedSync + downloadCtx context.Context + downloadCancel context.CancelFunc + downloadServer *download.ControlServerImpl + sentryServers []*download.SentryServerImpl + txPoolP2PServer *txpool.P2PServer + sentries []remote.SentryClient + stagedSync *stagedsync.StagedSync + + notifications *stagedsync.Notifications + waitForStageLoopStop chan struct{} waitForMiningStop chan struct{} } @@ -187,6 +190,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { waitForStageLoopStop: make(chan struct{}), waitForMiningStop: make(chan struct{}), sentries: []remote.SentryClient{}, + notifications: &stagedsync.Notifications{ + Events: remotedbserver.NewEvents(), + Accumulator: &shards.Accumulator{}, + }, } backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice) @@ -240,7 +247,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { backend.txPool = core.NewTxPool(config.TxPool, chainConfig, chainKv) // setting notifier to support streaming events to rpc daemon - backend.events = remotedbserver.NewEvents() var mg *snapshotsync.SnapshotMigrator if config.Snapshot.Enabled { currentSnapshotBlock, currentInfohash, err := snapshotsync.GetSnapshotInfo(chainKv) @@ -262,7 +268,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { mining := stagedsync.New( stagedsync.MiningStages( stagedsync.StageMiningCreateBlockCfg(backend.chainKV, backend.config.Miner, *backend.chainConfig, backend.engine, backend.txPool, tmpdir), - stagedsync.StageMiningExecCfg(backend.chainKV, backend.config.Miner, backend.events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), + stagedsync.StageMiningExecCfg(backend.chainKV, backend.config.Miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), stagedsync.StageHashStateCfg(backend.chainKV, tmpdir), stagedsync.StageTrieCfg(backend.chainKV, false, true, tmpdir), stagedsync.StageMiningFinishCfg(backend.chainKV, *backend.chainConfig, backend.engine, backend.pendingBlocks, backend.minedBlocks, backend.miningSealingQuit), @@ -274,7 +280,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } kvRPC := remotedbserver.NewKvServer(backend.chainKV) - ethBackendRPC := remotedbserver.NewEthBackendServer(backend, backend.events) + ethBackendRPC := remotedbserver.NewEthBackendServer(backend, backend.notifications.Events) txPoolRPC := remotedbserver.NewTxPoolServer(context.Background(), backend.txPool) miningRPC := remotedbserver.NewMiningServer(context.Background(), backend, ethashApi) @@ -413,21 +419,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } backend.txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(backend.txPool.Has, backend.txPool.AddRemotes, fetchTx) - bodyDownloadTimeoutSeconds := 30 // TODO: convert to duration, make configurable + config.BodyDownloadTimeoutSeconds = 30 backend.stagedSync, err = stages2.NewStagedSync2( backend.downloadCtx, backend.chainKV, - config.StorageMode, - config.BatchSize, - bodyDownloadTimeoutSeconds, + *config, backend.downloadServer, tmpdir, - config.Snapshot, backend.txPool, backend.txPoolP2PServer, - torrentClient, mg, + torrentClient, mg, backend.notifications.Accumulator, ) if err != nil { return nil, err @@ -655,7 +658,7 @@ func (s *Ethereum) Start() error { }(i) } - go Loop(s.downloadCtx, s.chainKV, s.stagedSync, s.downloadServer, s.events, s.config.StateStream, s.waitForStageLoopStop, s.config.SyncLoopThrottle) + go Loop(s.downloadCtx, s.chainKV, s.stagedSync, s.downloadServer, s.notifications, s.waitForStageLoopStop, s.config.SyncLoopThrottle) return nil } @@ -700,8 +703,7 @@ func Loop( ctx context.Context, db ethdb.RwKV, sync *stagedsync.StagedSync, controlServer *download.ControlServerImpl, - notifier stagedsync.ChainEventNotifier, - stateStream bool, + notifications *stagedsync.Notifications, waitForDone chan struct{}, loopMinTime time.Duration, ) { @@ -711,9 +713,7 @@ func Loop( db, sync, controlServer.Hd, - controlServer.ChainConfig, - notifier, - stateStream, + notifications, controlServer.UpdateHead, waitForDone, loopMinTime, diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 86e6ce70af..32ed8895fd 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -81,6 +81,8 @@ var Defaults = Config{ RPCGasCap: 25000000, GPO: FullNodeGPO, RPCTxFeeCap: 1, // 1 ether + + BodyDownloadTimeoutSeconds: 30, } func init() { @@ -184,7 +186,8 @@ type Config struct { // CheckpointOracle is the configuration for checkpoint oracle. CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` - StateStream bool + StateStream bool + BodyDownloadTimeoutSeconds int // TODO change to duration // SyncLoopThrottle sets a minimum time between staged loop iterations SyncLoopThrottle time.Duration diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index 63f2fa49c2..308cb9ef14 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -130,10 +130,10 @@ func DefaultStages(ctx context.Context, ID: stages.Execution, Description: "Execute blocks w/o hash checks", ExecFunc: func(s *StageState, u Unwinder, tx ethdb.RwTx) error { - return SpawnExecuteBlocksStage(s, u, tx, 0, ctx.Done(), exec, world.Accumulator) + return SpawnExecuteBlocksStage(s, u, tx, 0, ctx.Done(), exec, world.InitialCycle) }, UnwindFunc: func(u *UnwindState, s *StageState, tx ethdb.RwTx) error { - return UnwindExecutionStage(u, s, tx, ctx.Done(), exec, world.Accumulator) + return UnwindExecutionStage(u, s, tx, ctx.Done(), exec, world.InitialCycle) }, } }, diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 7d04688c72..d9a16f5b53 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -59,6 +59,8 @@ type ExecuteBlockCfg struct { writeCallTraces bool writeTEVM bool pruningDistance uint64 + stateStream bool + accumulator *shards.Accumulator } func StageExecuteBlocksCfg( @@ -72,6 +74,8 @@ func StageExecuteBlocksCfg( chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig *vm.Config, + accumulator *shards.Accumulator, + stateStream bool, tmpdir string, ) ExecuteBlockCfg { return ExecuteBlockCfg{ @@ -86,6 +90,8 @@ func StageExecuteBlocksCfg( engine: engine, vmConfig: vmConfig, tmpdir: tmpdir, + accumulator: accumulator, + stateStream: stateStream, } } @@ -104,11 +110,11 @@ func executeBlock( batch ethdb.Database, cfg ExecuteBlockCfg, writeChangesets bool, - accumulator *shards.Accumulator, checkTEVM func(contractHash common.Hash) (bool, error), + initialCycle bool, ) error { blockNum := block.NumberU64() - stateReader, stateWriter := newStateReaderWriter(batch, tx, blockNum, block.Hash(), writeChangesets, accumulator) + stateReader, stateWriter := newStateReaderWriter(batch, tx, blockNum, block.Hash(), writeChangesets, cfg.accumulator, initialCycle, cfg.stateStream) // where the magic happens getHeader := func(hash common.Hash, number uint64) *types.Header { return rawdb.ReadHeader(tx, hash, number) } @@ -198,6 +204,8 @@ func newStateReaderWriter( blockHash common.Hash, writeChangesets bool, accumulator *shards.Accumulator, + initialCycle bool, + stateStream bool, ) (state.StateReader, state.WriterWithChangeSets) { var stateReader state.StateReader @@ -205,8 +213,10 @@ func newStateReaderWriter( stateReader = state.NewPlainStateReader(batch) - if accumulator != nil { + if !initialCycle && stateStream { accumulator.StartChange(blockNum, blockHash, false) + } else { + accumulator = nil } if writeChangesets { stateWriter = state.NewPlainStateWriter(batch, tx, blockNum).SetAccumulator(accumulator) @@ -217,7 +227,7 @@ func newStateReaderWriter( return stateReader, stateWriter } -func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx ethdb.RwTx, toBlock uint64, quit <-chan struct{}, cfg ExecuteBlockCfg, accumulator *shards.Accumulator) error { +func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx ethdb.RwTx, toBlock uint64, quit <-chan struct{}, cfg ExecuteBlockCfg, initialCycle bool) error { useExternalTx := tx != nil if !useExternalTx { var err error @@ -290,7 +300,7 @@ Loop: checkTEVMCode = ethdb.GetCheckTEVM(tx) } - if err = executeBlock(block, tx, batch, cfg, writeChangesets, accumulator, checkTEVMCode); err != nil { + if err = executeBlock(block, tx, batch, cfg, writeChangesets, checkTEVMCode, initialCycle); err != nil { log.Error(fmt.Sprintf("[%s] Execution failed", logPrefix), "number", blockNum, "hash", block.Hash().String(), "error", err) if unwindErr := u.UnwindTo(blockNum-1, tx, block.Hash()); unwindErr != nil { return unwindErr @@ -429,7 +439,7 @@ func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, current return currentBlock, currentTx, currentTime } -func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg, accumulator *shards.Accumulator) error { +func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg, initialCycle bool) error { if u.UnwindPoint >= s.BlockNumber { s.Done() return nil @@ -446,7 +456,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-c logPrefix := s.state.LogPrefix() log.Info(fmt.Sprintf("[%s] Unwind Execution", logPrefix), "from", s.BlockNumber, "to", u.UnwindPoint) - if err := unwindExecutionStage(u, s, tx, quit, cfg, accumulator); err != nil { + if err := unwindExecutionStage(u, s, tx, quit, cfg, initialCycle); err != nil { return err } if err := u.Done(tx); err != nil { @@ -461,12 +471,13 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-c return nil } -func unwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg, accumulator *shards.Accumulator) error { +func unwindExecutionStage(u *UnwindState, s *StageState, tx ethdb.RwTx, quit <-chan struct{}, cfg ExecuteBlockCfg, initialCycle bool) error { logPrefix := s.state.LogPrefix() stateBucket := dbutils.PlainStateBucket storageKeyLength := common.AddressLength + common.IncarnationLength + common.HashLength - if accumulator != nil { + var accumulator *shards.Accumulator + if !initialCycle && cfg.stateStream { hash, err := rawdb.ReadCanonicalHash(tx, u.UnwindPoint) if err != nil { return fmt.Errorf("%s: reading canonical hash of unwind point: %v", logPrefix, err) diff --git a/eth/stagedsync/stage_execute_test.go b/eth/stagedsync/stage_execute_test.go index 12f30ac6b3..bc9da04ec4 100644 --- a/eth/stagedsync/stage_execute_test.go +++ b/eth/stagedsync/stage_execute_test.go @@ -21,7 +21,7 @@ func TestUnwindExecutionStagePlainStatic(t *testing.T) { } u := &UnwindState{Stage: stages.Execution, UnwindPoint: 50} s := &StageState{Stage: stages.Execution, BlockNumber: 100} - err = UnwindExecutionStage(u, s, tx2, nil, ExecuteBlockCfg{writeReceipts: true}, nil) + err = UnwindExecutionStage(u, s, tx2, nil, ExecuteBlockCfg{writeReceipts: true}, false) if err != nil { t.Errorf("error while unwinding state: %v", err) } @@ -42,7 +42,7 @@ func TestUnwindExecutionStagePlainWithIncarnationChanges(t *testing.T) { } u := &UnwindState{Stage: stages.Execution, UnwindPoint: 50} s := &StageState{Stage: stages.Execution, BlockNumber: 100} - err = UnwindExecutionStage(u, s, tx2, nil, ExecuteBlockCfg{writeReceipts: true}, nil) + err = UnwindExecutionStage(u, s, tx2, nil, ExecuteBlockCfg{writeReceipts: true}, false) if err != nil { t.Errorf("error while unwinding state: %v", err) } @@ -64,7 +64,7 @@ func TestUnwindExecutionStagePlainWithCodeChanges(t *testing.T) { } u := &UnwindState{Stage: stages.Execution, UnwindPoint: 50} s := &StageState{Stage: stages.Execution, BlockNumber: 100} - err = UnwindExecutionStage(u, s, tx2, nil, ExecuteBlockCfg{writeReceipts: true}, nil) + err = UnwindExecutionStage(u, s, tx2, nil, ExecuteBlockCfg{writeReceipts: true}, false) if err != nil { t.Errorf("error while unwinding state: %v", err) } diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index ebd9881102..2df0a8cc38 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -7,6 +7,7 @@ import ( "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb" + "github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver" "github.com/ledgerwatch/erigon/turbo/shards" ) @@ -15,6 +16,11 @@ type ChainEventNotifier interface { OnNewPendingLogs(types.Logs) } +type Notifications struct { + Events *remotedbserver.Events + Accumulator *shards.Accumulator +} + // StageParameters contains the stage that stages receives at runtime when initializes. // Then the stage can use it to receive different useful functions. type StageParameters struct { @@ -25,7 +31,6 @@ type StageParameters struct { mining *MiningCfg snapshotsDir string - Accumulator *shards.Accumulator // State change accumulator } type MiningCfg struct { diff --git a/eth/stagedsync/stagedsync.go b/eth/stagedsync/stagedsync.go index 93f1e5a0a3..1e5ee11d17 100644 --- a/eth/stagedsync/stagedsync.go +++ b/eth/stagedsync/stagedsync.go @@ -4,7 +4,6 @@ import ( "context" "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" ) @@ -48,7 +47,6 @@ func (stagedSync *StagedSync) Prepare( quitCh <-chan struct{}, initialCycle bool, miningConfig *MiningCfg, - accumulator *shards.Accumulator, ) (*State, error) { stages := stagedSync.stageBuilders.Build( StageParameters{ @@ -56,7 +54,6 @@ func (stagedSync *StagedSync) Prepare( InitialCycle: initialCycle, mining: miningConfig, snapshotsDir: stagedSync.params.SnapshotDir, - Accumulator: accumulator, }, ) state := NewState(stages) diff --git a/turbo/shards/state_change_accumulator.go b/turbo/shards/state_change_accumulator.go index 760e7692c9..9e0ae5246d 100644 --- a/turbo/shards/state_change_accumulator.go +++ b/turbo/shards/state_change_accumulator.go @@ -14,6 +14,13 @@ type Accumulator struct { storageChangeIndex map[common.Address]map[common.Hash]int } +func (a *Accumulator) Reset() { + a.changes = nil + a.latestChange = nil + a.accountChangeIndex = nil + a.storageChangeIndex = nil +} + // StartChanges begins accumulation of changes for a new block func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, unwind bool) { a.changes = append(a.changes, remote.StateChange{}) diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index a6fc1e8a9a..ccc337ae77 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -35,6 +35,7 @@ import ( "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/remote" + "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" "github.com/ledgerwatch/erigon/turbo/stages/txpropagate" @@ -67,6 +68,8 @@ type MockSentry struct { StreamWg sync.WaitGroup ReceiveWg sync.WaitGroup Address common.Address + + Notifications *stagedsync.Notifications } // Stream returns stream, waiting if necessary @@ -152,6 +155,13 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey Engine: engine, ChainConfig: gspec.Config, Key: key, + Notifications: &stagedsync.Notifications{ + Events: remotedbserver.NewEvents(), + Accumulator: &shards.Accumulator{}, + }, + UpdateHead: func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) { + }, + PeerId: gointerfaces.ConvertBytesToH512([]byte("12345")), } mock.Ctx, mock.cancel = context.WithCancel(context.Background()) mock.Address = crypto.PubkeyToAddress(mock.Key.PublicKey) @@ -163,18 +173,18 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey } penalize := func(context.Context, []headerdownload.PenaltyItem) { } - batchSize := 1 * datasize.MB + cfg := ethconfig.Defaults + cfg.BatchSize = 1 * datasize.MB + cfg.BodyDownloadTimeoutSeconds = 10 + cfg.TxPool.Journal = "" + cfg.TxPool.StartOnInit = true + txPoolConfig := cfg.TxPool + sendBodyRequest := func(context.Context, *bodydownload.BodyRequest) []byte { return nil } - mock.UpdateHead = func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) { - } blockPropagator := func(Ctx context.Context, block *types.Block, td *big.Int) { } - blockDowloadTimeout := 10 - txPoolConfig := core.DefaultTxPoolConfig - txPoolConfig.Journal = "" - txPoolConfig.StartOnInit = true txPool := core.NewTxPool(txPoolConfig, mock.ChainConfig, mock.DB) txSentryClient := remote.NewSentryClientDirect(eth.ETH66, mock) mock.TxPoolP2PServer, err = txpool.NewP2PServer(mock.Ctx, []remote.SentryClient{txSentryClient}, txPool) @@ -213,6 +223,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey panic(err) } } + mock.Sync = NewStagedSync(mock.Ctx, sm, stagedsync.StageHeadersCfg( mock.DB, @@ -221,7 +232,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey sendHeaderRequest, propagateNewBlockHashes, penalize, - batchSize, + cfg.BatchSize, ), stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir), stagedsync.StageSnapshotHeadersCfg(mock.DB, ethconfig.Snapshot{Enabled: false}, nil, nil), @@ -231,9 +242,9 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey sendBodyRequest, penalize, blockPropagator, - blockDowloadTimeout, + cfg.BodyDownloadTimeoutSeconds, *mock.ChainConfig, - batchSize, + cfg.BatchSize, ), stagedsync.StageSnapshotBodiesCfg( mock.DB, @@ -248,16 +259,18 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey sm.CallTraces, sm.TEVM, 0, - batchSize, + cfg.BatchSize, nil, mock.ChainConfig, mock.Engine, &vm.Config{NoReceipts: !sm.Receipts}, + nil, + cfg.StateStream, mock.tmpdir, ), stagedsync.StageTranspileCfg( mock.DB, - batchSize, + cfg.BatchSize, nil, nil, mock.ChainConfig, @@ -272,7 +285,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey stagedsync.StageTrieCfg(mock.DB, true, true, mock.tmpdir), stagedsync.StageHistoryCfg(mock.DB, mock.tmpdir), stagedsync.StageLogIndexCfg(mock.DB, mock.tmpdir), - stagedsync.StageCallTracesCfg(mock.DB, 0, batchSize, mock.tmpdir, mock.ChainConfig, mock.Engine), + stagedsync.StageCallTracesCfg(mock.DB, 0, cfg.BatchSize, mock.tmpdir, mock.ChainConfig, mock.Engine), stagedsync.StageTxLookupCfg(mock.DB, mock.tmpdir), stagedsync.StageTxPoolCfg(mock.DB, txPool, func() { mock.StreamWg.Add(1) @@ -285,7 +298,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey true, /* test */ ) - miningConfig := ethconfig.Defaults.Miner + miningConfig := cfg.Miner miningConfig.Enabled = true miningConfig.Noverify = false miningConfig.Etherbase = mock.Address @@ -306,7 +319,6 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey stagedsync.OptionalParameters{}, ) - mock.PeerId = gointerfaces.ConvertBytesToH512([]byte("12345")) mock.StreamWg.Add(1) go download.RecvMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, &mock.ReceiveWg) mock.StreamWg.Wait() @@ -393,10 +405,9 @@ func (ms *MockSentry) InsertChain(chain *core.ChainPack) error { } } ms.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - notifier := &remotedbserver.Events{} initialCycle := false highestSeenHeader := uint64(chain.TopBlock.NumberU64()) - if err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, notifier, initialCycle, nil, ms.UpdateHead, nil); err != nil { + if err := StageLoopStep(ms.Ctx, ms.DB, ms.Sync, highestSeenHeader, ms.Notifications, initialCycle, ms.UpdateHead, nil); err != nil { return err } // Check if the latest header was imported or rolled back diff --git a/turbo/stages/sentry_mock_test.go b/turbo/stages/sentry_mock_test.go index 8f2726a97f..cf6b3b5683 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/turbo/stages/sentry_mock_test.go @@ -12,7 +12,6 @@ import ( "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/protocols/eth" - "github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver" "github.com/ledgerwatch/erigon/log" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" @@ -57,10 +56,9 @@ func TestHeaderStep(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - notifier := &remotedbserver.Events{} initialCycle := true highestSeenHeader := uint64(chain.TopBlock.NumberU64()) - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -98,10 +96,9 @@ func TestMineBlockWith1Tx(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - notifier := &remotedbserver.Events{} initialCycle := true highestSeenHeader := uint64(chain.TopBlock.NumberU64()) - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -170,10 +167,9 @@ func TestReorg(t *testing.T) { } m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed - notifier := &remotedbserver.Events{} initialCycle := true highestSeenHeader := uint64(chain.TopBlock.NumberU64()) - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } @@ -227,7 +223,7 @@ func TestReorg(t *testing.T) { highestSeenHeader = uint64(short.TopBlock.NumberU64()) initialCycle = false - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } @@ -271,7 +267,7 @@ func TestReorg(t *testing.T) { // This is unwind step highestSeenHeader = uint64(long1.TopBlock.NumberU64()) - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } @@ -309,7 +305,7 @@ func TestReorg(t *testing.T) { highestSeenHeader = uint64(short2.TopBlock.NumberU64()) initialCycle = false - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -407,9 +403,8 @@ func TestAnchorReplace(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed highestSeenHeader := uint64(long.TopBlock.NumberU64()) - notifier := &remotedbserver.Events{} initialCycle := true - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } @@ -515,9 +510,8 @@ func TestAnchorReplace2(t *testing.T) { m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed highestSeenHeader := uint64(long.TopBlock.NumberU64()) - notifier := &remotedbserver.Events{} initialCycle := true - if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil { + if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.Notifications, initialCycle, m.UpdateHead, nil); err != nil { t.Fatal(err) } } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 1ad66fb014..44492ed387 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -8,7 +8,6 @@ import ( "math/big" "time" - "github.com/c2h5oh/datasize" "github.com/holiman/uint256" "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/common" @@ -64,9 +63,7 @@ func StageLoop( db ethdb.RwKV, sync *stagedsync.StagedSync, hd *headerdownload.HeaderDownload, - chainConfig *params.ChainConfig, - notifier stagedsync.ChainEventNotifier, - stateStream bool, + notifications *stagedsync.Notifications, updateHead func(ctx context.Context, head uint64, hash common.Hash, td *uint256.Int), waitForDone chan struct{}, loopMinTime time.Duration, @@ -85,11 +82,7 @@ func StageLoop( // Estimate the current top height seen from the peer height := hd.TopSeenHeight() - var accumulator *shards.Accumulator - if !initialCycle && stateStream { - accumulator = &shards.Accumulator{} - } - if err := StageLoopStep(ctx, db, sync, height, notifier, initialCycle, accumulator, updateHead, sync.GetSnapshotMigratorFinal()); err != nil { + if err := StageLoopStep(ctx, db, sync, height, notifications, initialCycle, updateHead, sync.GetSnapshotMigratorFinal()); err != nil { if errors.Is(err, common.ErrStopped) { return } @@ -122,9 +115,8 @@ func StageLoopStep( db ethdb.RwKV, sync *stagedsync.StagedSync, highestSeenHeader uint64, - notifier stagedsync.ChainEventNotifier, + notifications *stagedsync.Notifications, initialCycle bool, - accumulator *shards.Accumulator, updateHead func(ctx context.Context, head uint64, hash common.Hash, td *uint256.Int), snapshotMigratorFinal func(tx ethdb.Tx) error, ) (err error) { @@ -157,7 +149,10 @@ func StageLoopStep( return err } - st, err1 := sync.Prepare(db, nil, ctx.Done(), initialCycle, nil, accumulator) + if notifications != nil && notifications.Accumulator != nil { + notifications.Accumulator.Reset() + } + st, err1 := sync.Prepare(db, nil, ctx.Done(), initialCycle, nil) if err1 != nil { return fmt.Errorf("prepare staged sync: %w", err1) } @@ -219,7 +214,7 @@ func StageLoopStep( } updateHead(ctx, head, headHash, headTd256) - err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, unwindTo, notifier, db) + err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, unwindTo, notifications.Events, db) if err != nil { return err } @@ -241,7 +236,6 @@ func MiningStep(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.StagedSyn ctx.Done(), false, stagedsync.StageMiningCfg(true), - nil, ) if err != nil { return err @@ -256,23 +250,22 @@ func MiningStep(ctx context.Context, kv ethdb.RwKV, mining *stagedsync.StagedSyn func NewStagedSync2( ctx context.Context, db ethdb.RwKV, - sm ethdb.StorageMode, - batchSize datasize.ByteSize, - bodyDownloadTimeout int, + cfg ethconfig.Config, controlServer *download.ControlServerImpl, tmpdir string, - snapshotCfg ethconfig.Snapshot, txPool *core.TxPool, txPoolServer *txpool.P2PServer, - client *snapshotsync.Client, snapshotMigrator *snapshotsync.SnapshotMigrator, + client *snapshotsync.Client, + snapshotMigrator *snapshotsync.SnapshotMigrator, + accumulator *shards.Accumulator, ) (*stagedsync.StagedSync, error) { var pruningDistance uint64 - if !sm.History { + if !cfg.StorageMode.History { pruningDistance = params.FullImmutabilityThreshold } - return NewStagedSync(ctx, sm, + return NewStagedSync(ctx, cfg.StorageMode, stagedsync.StageHeadersCfg( db, controlServer.Hd, @@ -280,48 +273,50 @@ func NewStagedSync2( controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, - batchSize, + cfg.BatchSize, ), stagedsync.StageBlockHashesCfg(db, tmpdir), - stagedsync.StageSnapshotHeadersCfg(db, snapshotCfg, client, snapshotMigrator), + stagedsync.StageSnapshotHeadersCfg(db, cfg.Snapshot, client, snapshotMigrator), stagedsync.StageBodiesCfg( db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, - bodyDownloadTimeout, + cfg.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, - batchSize, + cfg.BatchSize, ), - stagedsync.StageSnapshotBodiesCfg(db, snapshotCfg, client, snapshotMigrator, tmpdir), + stagedsync.StageSnapshotBodiesCfg(db, cfg.Snapshot, client, snapshotMigrator, tmpdir), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir), stagedsync.StageExecuteBlocksCfg( db, - sm.Receipts, - sm.CallTraces, - sm.TEVM, + cfg.StorageMode.Receipts, + cfg.StorageMode.CallTraces, + cfg.StorageMode.TEVM, pruningDistance, - batchSize, + cfg.BatchSize, nil, controlServer.ChainConfig, controlServer.Engine, - &vm.Config{NoReceipts: !sm.Receipts, EnableTEMV: sm.TEVM}, + &vm.Config{NoReceipts: !cfg.StorageMode.Receipts, EnableTEMV: cfg.StorageMode.TEVM}, + accumulator, + cfg.StateStream, tmpdir, ), stagedsync.StageTranspileCfg( db, - batchSize, + cfg.BatchSize, nil, nil, controlServer.ChainConfig, ), - stagedsync.StageSnapshotStateCfg(db, snapshotCfg, tmpdir, client, snapshotMigrator), + stagedsync.StageSnapshotStateCfg(db, cfg.Snapshot, tmpdir, client, snapshotMigrator), stagedsync.StageHashStateCfg(db, tmpdir), stagedsync.StageTrieCfg(db, true, true, tmpdir), stagedsync.StageHistoryCfg(db, tmpdir), stagedsync.StageLogIndexCfg(db, tmpdir), - stagedsync.StageCallTracesCfg(db, 0, batchSize, tmpdir, controlServer.ChainConfig, controlServer.Engine), + stagedsync.StageCallTracesCfg(db, 0, cfg.BatchSize, tmpdir, controlServer.ChainConfig, controlServer.Engine), stagedsync.StageTxLookupCfg(db, tmpdir), stagedsync.StageTxPoolCfg(db, txPool, func() { for i := range txPoolServer.Sentries { -- GitLab