diff --git a/cmd/downloader/readme.md b/cmd/downloader/readme.md
index 6a504728a641cd0bda9fc945bc3474661302335a..9c9e6d468f3c746cd958e66aab3f9c1e1162fd23 100644
--- a/cmd/downloader/readme.md
+++ b/cmd/downloader/readme.md
@@ -24,6 +24,8 @@ downloader --downloader.api.addr=127.0.0.1:9093 --torrent.port=42068 --datadir=<
 erigon --experimental.snapshot --downloader.api.addr=127.0.0.1:9093 --datadir=<your_datadir>
 ```
 
+Use `--experimental.snapshot.keepblocks=true` to keep retired blocks in the DB instead of deleting them
+
 ## How to create new network or bootnode
 
 ```shell
diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go
index d1fdd0f3fab9970fa3a2c387397a7ab251939574..20d80cb8d5bc96e989784deb2fef82bf82daa1a0 100644
--- a/cmd/integration/commands/flags.go
+++ b/cmd/integration/commands/flags.go
@@ -90,7 +90,7 @@ func withBucket(cmd *cobra.Command) {
 }
 
 func withDataDir2(cmd *cobra.Command) {
-	cmd.Flags().String(utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage)
+	cmd.Flags().StringVar(&datadir, utils.DataDirFlag.Name, paths.DefaultDataDir(), utils.DataDirFlag.Usage)
 	must(cmd.MarkFlagDirname(utils.DataDirFlag.Name))
 	must(cmd.MarkFlagRequired(utils.DataDirFlag.Name))
 	cmd.Flags().IntVar(&databaseVerbosity, "database.verbosity", 2, "Enabling internal db logs. Very high verbosity levels may require recompile db. Default: 2, means warning.")
diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 30be72ed3da8ccc3c92b77aea999e50da6053ce9..94d239f8c7945a2f0c8c7d1e26a256151dcc709f 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -5,12 +5,14 @@ import (
 	"context"
 	"fmt"
 	"path/filepath"
+	"runtime"
 	"sort"
 	"strings"
 	"sync"
 
 	"github.com/c2h5oh/datasize"
 	common2 "github.com/ledgerwatch/erigon-lib/common"
+	"github.com/ledgerwatch/erigon-lib/common/dir"
 	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/log/v3"
@@ -551,20 +553,26 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, allSnapshots(chainConfig))
+	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db))
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
-		err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx)
+		if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
+			return err
+		}
+	} else if pruneTo > 0 {
+		p, err := sync.PruneStageState(stages.Senders, s.BlockNumber, tx, db)
 		if err != nil {
 			return err
 		}
+		if err = stagedsync.PruneSendersStage(p, tx, cfg, ctx); err != nil {
+			return err
+		}
+		return nil
 	} else {
-		err = stagedsync.SpawnRecoverSendersStage(cfg, s, sync, tx, block, ctx)
-		if err != nil {
+		if err = stagedsync.SpawnRecoverSendersStage(cfg, s, sync, tx, block, ctx); err != nil {
 			return err
 		}
 	}
-
 	return tx.Commit()
 }
 
@@ -1023,7 +1031,7 @@ var _allSnapshotsSingleton *snapshotsync.RoSnapshots
 func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots {
 	openSnapshotOnce.Do(func() {
 		if enableSnapshot {
-			snapshotCfg := ethconfig.NewSnapshotCfg(true, false)
+			snapshotCfg := ethconfig.NewSnapshotCfg(enableSnapshot, true)
 			_allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapshotCfg, filepath.Join(datadir, "snapshots"))
 			if err := _allSnapshotsSingleton.ReopenSegments(); err != nil {
 				panic(err)
@@ -1036,12 +1044,18 @@
func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots { return _allSnapshotsSingleton } +var openBlockReaderOnce sync.Once +var _blockReaderSingleton interfaces.FullBlockReader + func getBlockReader(cc *params.ChainConfig) (blockReader interfaces.FullBlockReader) { - blockReader = snapshotsync.NewBlockReader() - if sn := allSnapshots(cc); sn != nil { - blockReader = snapshotsync.NewBlockReaderWithSnapshots(sn) - } - return blockReader + openBlockReaderOnce.Do(func() { + _blockReaderSingleton = snapshotsync.NewBlockReader() + if sn := allSnapshots(cc); sn != nil { + x := snapshotsync.NewBlockReaderWithSnapshots(sn) + _blockReaderSingleton = x + } + }) + return _blockReaderSingleton } func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) (prune.Mode, consensus.Engine, *params.ChainConfig, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) { @@ -1099,8 +1113,9 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) var batchSize datasize.ByteSize must(batchSize.UnmarshalText([]byte(batchSizeStr))) + br := getBlockReader(chainConfig) blockDownloaderWindow := 65536 - sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, getBlockReader(chainConfig)) + sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br) if err != nil { panic(err) } @@ -1112,10 +1127,19 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) if miningConfig != nil { cfg.Miner = *miningConfig } + cfg.Snapshot = allSnapshots(chainConfig).Cfg() + if cfg.Snapshot.Enabled { + snDir, err := dir.OpenRw(filepath.Join(datadir, "snapshots")) + if err != nil { + panic(err) + } + cfg.SnapshotDir = snDir + } sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg, chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir, nil, nil, nil, nil, nil, + allSnapshots(chainConfig), ) if err != nil { panic(err) @@ -1127,7 +1151,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, tmpdir), stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir), stagedsync.StageHashStateCfg(db, tmpdir), - stagedsync.StageTrieCfg(db, false, true, tmpdir, getBlockReader(chainConfig)), + stagedsync.StageTrieCfg(db, false, true, tmpdir, br), stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, miner, ctx.Done()), ), stagedsync.MiningUnwindOrder, diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index c8ccb76b05513e60dd844f55d534d8ada4d0acd1..f0ab51e13e9dd6dcec9d0ebc232365bb3798ea2c 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -115,7 +115,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) { if cfg.Chaindata == "" { cfg.Chaindata = filepath.Join(cfg.DataDir, "chaindata") } - cfg.Snapshot = ethconfig.NewSnapshotCfg(cfg.Snapshot.Enabled, cfg.Snapshot.RetireEnabled) + cfg.Snapshot = ethconfig.NewSnapshotCfg(cfg.Snapshot.Enabled, cfg.Snapshot.KeepBlocks) } if cfg.TxPoolApiAddr == "" { cfg.TxPoolApiAddr = cfg.PrivateApiAddr diff --git a/cmd/state/commands/erigon2.go b/cmd/state/commands/erigon2.go index efc4862d2fd2b8c893600b0ecc393b3079b53f97..0b31d49b7b6d7a4a27c186f3408944a9247824bb 100644 --- a/cmd/state/commands/erigon2.go +++ 
b/cmd/state/commands/erigon2.go @@ -130,7 +130,7 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log. if err != nil { return err } - allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots")) + allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, true), path.Join(datadir, "snapshots")) defer allSnapshots.Close() blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) } else { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index a25b0aa4a9097f27c0f7144500aa2912ce3d62a9..cee78d80aca3dc4897e0313464ed0396277a0add 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -619,9 +619,9 @@ var ( Name: "experimental.snapshot", Usage: "Enabling experimental snapshot sync", } - SnapshotRetireFlag = cli.BoolFlag{ - Name: ethconfig.FlagSnapshotRetire, - Usage: "Delete(!) old blocks from DB, by moving them to snapshots", + SnapshotKeepBlocksFlag = cli.BoolFlag{ + Name: ethconfig.FlagSnapshotKeepBlocks, + Usage: "Keep ancient blocks in db (useful for debug)", } TorrentVerbosityFlag = cli.StringFlag{ Name: "torrent.verbosity", @@ -1333,9 +1333,7 @@ func CheckExclusive(ctx *cli.Context, args ...interface{}) { // SetEthConfig applies eth-related command line flags to the config. func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Config) { - if ctx.GlobalBool(SnapshotSyncFlag.Name) { - cfg.Snapshot.Enabled = true - } + cfg.Snapshot.Enabled = ctx.GlobalBool(SnapshotSyncFlag.Name) if cfg.Snapshot.Enabled { snDir, err := dir.OpenRw(filepath.Join(nodeConfig.DataDir, "snapshots")) if err != nil { @@ -1343,9 +1341,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf } cfg.SnapshotDir = snDir } - if ctx.GlobalBool(SnapshotRetireFlag.Name) { - cfg.Snapshot.RetireEnabled = true - } + cfg.Snapshot.KeepBlocks = ctx.GlobalBool(SnapshotKeepBlocksFlag.Name) torrentVerbosity := lg.Warning if ctx.GlobalIsSet(TorrentVerbosityFlag.Name) { torrentVerbosity = torrentcfg.String2LogLevel[ctx.GlobalString(TorrentVerbosityFlag.Name)] diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 6d72abcfeaf04a9e8f506a032162471d52d583a5..d17de0bb1766eec7ab93dbc336624754f367d2b7 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -1042,43 +1042,69 @@ func WriteBlock(db kv.RwTx, block *types.Block) error { return nil } +func min(a, b uint64) uint64 { + if a < b { + return a + } + return b +} + // DeleteAncientBlocks - delete old block after moving it to snapshots. 
[from, to) -func DeleteAncientBlocks(db kv.RwTx, blockFrom, blockTo uint64) error { - //doesn't delete Receipts - because Receipts are not in snapshots yet +// doesn't delete reciepts +func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) error { + c, err := db.Cursor(kv.Headers) + if err != nil { + return err + } + defer c.Close() + + var stopAtBlock uint64 + { + k, _, err := c.First() + if err != nil { + return err + } + firstBlock := binary.BigEndian.Uint64(k) + stopAtBlock = min(blockTo, firstBlock+uint64(blocksDeleteLimit)) + } + for k, _, err := c.First(); k != nil; k, _, err = c.Next() { + if err != nil { + return err + } + + n := binary.BigEndian.Uint64(k) + if n >= stopAtBlock { + break + } - for n := blockFrom; n < blockTo; n++ { canonicalHash, err := ReadCanonicalHash(db, n) if err != nil { return err } - if err := db.ForPrefix(kv.Headers, dbutils.EncodeBlockNumber(n), func(k, v []byte) error { - isCanonical := bytes.Equal(k[8:], canonicalHash[:]) - if err := db.Delete(kv.Headers, k, nil); err != nil { - return err - } - b, err := ReadBodyForStorageByKey(db, k) - if err != nil { - return err - } - txIDBytes := make([]byte, 8) - for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ { - binary.BigEndian.PutUint64(txIDBytes, txID) - bucket := kv.EthTx - if !isCanonical { - bucket = kv.NonCanonicalTxs - } - if err := db.Delete(bucket, txIDBytes, nil); err != nil { - return err - } - } - if err := db.Delete(kv.BlockBody, k, nil); err != nil { - return err + isCanonical := bytes.Equal(k[8:], canonicalHash[:]) + + b, err := ReadBodyForStorageByKey(db, k) + if err != nil { + return err + } + txIDBytes := make([]byte, 8) + for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ { + binary.BigEndian.PutUint64(txIDBytes, txID) + bucket := kv.EthTx + if !isCanonical { + bucket = kv.NonCanonicalTxs } - if err := db.Delete(kv.Senders, k, nil); err != nil { + if err := db.Delete(bucket, txIDBytes, nil); err != nil { return err } - return nil - }); err != nil { + } + if err := db.Delete(kv.Headers, k, nil); err != nil { + return err + } + if err := db.Delete(kv.BlockBody, k, nil); err != nil { + return err + } + if err := db.Delete(kv.Senders, k, nil); err != nil { return err } } diff --git a/eth/backend.go b/eth/backend.go index e4dc5e6bb97d00ba843cdf626be0977afd112f8b..f75d9e68e8f53c5930c62a100513e03b83fa358e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -311,6 +311,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l } var blockReader interfaces.FullBlockReader + var allSnapshots *snapshotsync.RoSnapshots if config.Snapshot.Enabled { snConfig := snapshothashes.KnownConfig(chainConfig.ChainName) snConfig.ExpectBlocks, err = RestoreExpectedExternalSnapshot(chainKv, snConfig) @@ -318,7 +319,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l return nil, err } - allSnapshots := snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path) + allSnapshots = snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path) allSnapshots.AsyncOpenAll(ctx) blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) @@ -502,7 +503,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty, backend.sentryControlServer, tmpdir, backend.notifications.Accumulator, backend.newPayloadCh, backend.forkChoiceCh, &backend.waitingForBeaconChain, - backend.downloaderClient) + 
backend.downloaderClient, allSnapshots) if err != nil { return nil, err } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index fa2eb03ac5310daf4b03bfac64339340cdd68394..8d22137666d08e886d9031a8dda27854704d6887 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -123,8 +123,8 @@ func init() { //go:generate gencodec -type Config -formats toml -out gen_config.go type Snapshot struct { - Enabled bool - RetireEnabled bool + Enabled bool + KeepBlocks bool } func (s Snapshot) String() string { @@ -132,19 +132,19 @@ func (s Snapshot) String() string { if s.Enabled { out = append(out, "--"+FlagSnapshot+"=true") } - if s.RetireEnabled { - out = append(out, "--"+FlagSnapshotRetire+"=true") + if s.KeepBlocks { + out = append(out, "--"+FlagSnapshotKeepBlocks+"=true") } return strings.Join(out, " ") } var ( - FlagSnapshot = "experimental.snapshot" - FlagSnapshotRetire = "experimental.snapshot.retire" + FlagSnapshot = "experimental.snapshot" + FlagSnapshotKeepBlocks = "experimental.snapshot.keepblocks" ) -func NewSnapshotCfg(enabled, retireEnabled bool) Snapshot { - return Snapshot{Enabled: enabled, RetireEnabled: retireEnabled} +func NewSnapshotCfg(enabled, keepBlocks bool) Snapshot { + return Snapshot{Enabled: enabled, KeepBlocks: keepBlocks} } // Config contains configuration options for ETH protocol. diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e3b544b8d4d5fbb476d04fc5b3d20f88f6d55526..ff5b672a5b2c7d583df550e2115d7b5181e8e0c9 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1082,16 +1082,20 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R return err } - sn, ok := cfg.snapshots.Blocks(cfg.snapshots.BlocksAvailable()) - if !ok { - return fmt.Errorf("snapshot not found for block: %d", cfg.snapshots.BlocksAvailable()) - } - // ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value) - lastTxnID := sn.TxnHashIdx.BaseDataID() + uint64(sn.Transactions.Count()) - if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID+1); err != nil { + ok, err := cfg.snapshots.ViewTxs(cfg.snapshots.BlocksAvailable(), func(sn *snapshotsync.TxnSegment) error { + lastTxnID := sn.IdxTxnHash.BaseDataID() + uint64(sn.Seg.Count()) + if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID+1); err != nil { + return err + } + return nil + }) + if err != nil { return err } + if !ok { + return fmt.Errorf("snapshot not found for block: %d", cfg.snapshots.BlocksAvailable()) + } } // Add last headers from snapshots to HeaderDownloader (as persistent links) diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index b01c2b5887b4d3a23b4f20824d6c5cfcb72e162e..fcede85ba05c4e70d6ea5aae57c87c1e1283f189 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -38,11 +38,11 @@ type SendersCfg struct { tmpdir string prune prune.Mode chainConfig *params.ChainConfig - snapshots *snapshotsync.RoSnapshots + blockRetire *snapshotsync.BlockRetire snapshotHashesCfg *snapshothashes.Config } -func StageSendersCfg(db kv.RwDB, chainCfg *params.ChainConfig, tmpdir string, prune prune.Mode, snapshots *snapshotsync.RoSnapshots) SendersCfg { +func StageSendersCfg(db kv.RwDB, chainCfg *params.ChainConfig, tmpdir string, prune prune.Mode, br *snapshotsync.BlockRetire) SendersCfg { const sendersBatchSize = 10000 const sendersBlockSize = 4096 @@ -56,7 +56,7 @@ func StageSendersCfg(db kv.RwDB, chainCfg *params.ChainConfig, tmpdir 
string, pr tmpdir: tmpdir, chainConfig: chainCfg, prune: prune, - snapshots: snapshots, + blockRetire: br, snapshotHashesCfg: snapshothashes.KnownConfig(chainCfg.ChainName), } } @@ -103,8 +103,8 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R defer canonicalC.Close() startFrom := s.BlockNumber + 1 - if cfg.snapshots != nil && startFrom < cfg.snapshots.BlocksAvailable() { - startFrom = cfg.snapshots.BlocksAvailable() + if cfg.blockRetire.Snapshots() != nil && startFrom < cfg.blockRetire.Snapshots().BlocksAvailable() { + startFrom = cfg.blockRetire.Snapshots().BlocksAvailable() } for k, v, err := canonicalC.Seek(dbutils.EncodeBlockNumber(startFrom)); k != nil; k, v, err = canonicalC.Next() { @@ -373,17 +373,16 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co defer tx.Rollback() } - if cfg.snapshots != nil && cfg.snapshots.Cfg().RetireEnabled { + if cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled { if err := retireBlocks(s, tx, cfg, ctx); err != nil { return fmt.Errorf("retireBlocks: %w", err) } - } - - if cfg.prune.TxIndex.Enabled() { + } else if cfg.prune.TxIndex.Enabled() { if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx); err != nil { return err } } + if !useExternalTx { if err = tx.Commit(); err != nil { return err @@ -393,43 +392,32 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co } func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) { - if err := cfg.snapshots.EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil { - return err - } - blockFrom := cfg.snapshots.BlocksAvailable() + 1 - blockTo := s.ForwardProgress - params.FullImmutabilityThreshold - if blockTo-blockFrom < 1000 { + if cfg.blockRetire.Working() { return nil } - //TODO: avoid too large deletes - - chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { // move to own goroutine, because in this goroutine already living RwTx - // in future we will do it in background - if err := snapshotsync.RetireBlocks(ctx, blockFrom, blockTo, *chainID, cfg.tmpdir, cfg.snapshots, cfg.db, 1, log.LvlDebug); err != nil { - panic(err) - //return err + if res := cfg.blockRetire.Result(); res != nil { + if res.Err != nil { + return fmt.Errorf("[%s] retire blocks last error: %w", s.LogPrefix(), res.Err) } - if err := cfg.snapshots.ReopenSegments(); err != nil { - panic(err) - //return err + } + if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks { + canDeleteTo := cfg.blockRetire.CanDeleteTo(s.ForwardProgress) + if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil { + return nil } - if err := cfg.snapshots.ReopenIndices(); err != nil { - panic(err) + } - //return err - } - // RoSnapshots must be atomic? Or we can create new instance? 
- // seed new 500K files + // TODO: remove this check for the release + if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil { + return err + } + blockFrom, blockTo, ok := cfg.blockRetire.CanRetire(s.ForwardProgress) + if !ok { + return nil + } - //if err := rawdb.DeleteAncientBlocks(tx, blockFrom, blockTo); err != nil { - // return nil - //} + chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) + cfg.blockRetire.RetireBlocksInBackground(ctx, blockFrom, blockTo, *chainID, log.LvlInfo) - defer wg.Done() - }() - wg.Wait() return nil } diff --git a/eth/stagedsync/stage_senders_test.go b/eth/stagedsync/stage_senders_test.go index addca9fa3e0af134e5211de93c5c7a7a3db76f53..d549449ca8a8e7c44dacce9a4301560a11958c7a 100644 --- a/eth/stagedsync/stage_senders_test.go +++ b/eth/stagedsync/stage_senders_test.go @@ -13,6 +13,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -108,7 +109,7 @@ func TestSenders(t *testing.T) { require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3)) - cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, nil) + cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db)) err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx) assert.NoError(t, err) diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 530d822b81851e266fb63ad0ee86680650650f44..c9eb05b1aa542201896d1188f6c423e80c82dd19 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -157,10 +157,14 @@ func doRetireCommand(cliCtx *cli.Context) error { snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir) snapshots.ReopenSegments() + br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB) + for i := from; i < to; i += every { - if err := snapshotsync.RetireBlocks(ctx, i, i+every, *chainID, tmpDir, snapshots, chainDB, runtime.NumCPU()/2, log.LvlInfo); err != nil { - panic(err) - //return err + br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo) + br.Wait() + res := br.Result() + if res.Err != nil { + panic(res.Err) } } return nil diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 81769c72290a5e807e3bbc17d048129a388fbfef..037dc7c24d8115efa213d2eba28904fcd882d572 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -70,7 +70,7 @@ var DefaultFlags = []cli.Flag{ utils.TraceMaxtracesFlag, utils.SnapshotSyncFlag, - utils.SnapshotRetireFlag, + utils.SnapshotKeepBlocksFlag, utils.DbPageSizeFlag, utils.TorrentPortFlag, utils.TorrentUploadRateFlag, diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index 7cf40190f6a7f0d99517028fe3aae845dfb3f629..810dae4d04dc3712e6edddf822a018ce21306bda 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -5,7 +5,6 @@ import ( "context" "encoding/binary" "fmt" - "sync" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" @@ -183,25 +182,33 @@ func (back *RemoteBlockReader) BodyRlp(ctx context.Context, tx kv.Getter, hash c // BlockReaderWithSnapshots can read blocks from db and snapshots type BlockReaderWithSnapshots struct { - sn *RoSnapshots - lock sync.RWMutex + sn 
*RoSnapshots } func NewBlockReaderWithSnapshots(snapshots *RoSnapshots) *BlockReaderWithSnapshots { return &BlockReaderWithSnapshots{sn: snapshots} } -func (back *BlockReaderWithSnapshots) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (*types.Header, error) { - sn, ok := back.sn.Blocks(blockHeight) - if !ok { - h := rawdb.ReadHeaderByNumber(tx, blockHeight) +func (back *BlockReaderWithSnapshots) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (h *types.Header, err error) { + ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { + h, err = back.headerFromSnapshot(blockHeight, segment, nil) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + if ok { return h, nil } - return back.headerFromSnapshot(blockHeight, sn, nil) + + return rawdb.ReadHeaderByNumber(tx, blockHeight), nil } // HeaderByHash - will search header in all snapshots starting from recent -func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (*types.Header, error) { - h, err := rawdb.ReadHeaderByHash(tx, hash) +func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Getter, hash common.Hash) (h *types.Header, err error) { + h, err = rawdb.ReadHeaderByHash(tx, hash) if err != nil { return nil, err } @@ -210,65 +217,109 @@ func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Ge } buf := make([]byte, 128) - for i := len(back.sn.blocks) - 1; i >= 0; i-- { - h, err := back.headerFromSnapshotByHash(hash, back.sn.blocks[i], buf) - if err != nil { - return nil, nil - } - if h != nil { - return h, nil + if err := back.sn.Headers.View(func(segments []*HeaderSegment) error { + for i := len(segments) - 1; i >= 0; i-- { + h, err = back.headerFromSnapshotByHash(hash, segments[i], buf) + if err != nil { + return err + } } + return nil + }); err != nil { + return nil, err } - return nil, nil + return h, nil } -func (back *BlockReaderWithSnapshots) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (common.Hash, error) { - sn, ok := back.sn.Blocks(blockHeight) - if !ok { - return rawdb.ReadCanonicalHash(tx, blockHeight) - } - - h, err := back.headerFromSnapshot(blockHeight, sn, nil) +func (back *BlockReaderWithSnapshots) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (h common.Hash, err error) { + ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { + header, err := back.headerFromSnapshot(blockHeight, segment, nil) + if err != nil { + return err + } + if header == nil { + return nil + } + h = header.Hash() + return nil + }) if err != nil { - return common.Hash{}, err + return h, err } - if h == nil { - return common.Hash{}, err + if ok { + return h, nil } - return h.Hash(), nil + + return rawdb.ReadCanonicalHash(tx, blockHeight) } -func (back *BlockReaderWithSnapshots) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error) { - sn, ok := back.sn.Blocks(blockHeight) - if !ok { - h := rawdb.ReadHeader(tx, hash, blockHeight) +func (back *BlockReaderWithSnapshots) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) { + ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { + h, err = back.headerFromSnapshot(blockHeight, segment, nil) + if err != nil { + return err + } + return nil + }) + if ok { return h, nil } - return 
back.headerFromSnapshot(blockHeight, sn, nil) + h = rawdb.ReadHeader(tx, hash, blockHeight) + return h, nil } -func (back *BlockReaderWithSnapshots) ReadHeaderByNumber(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (*types.Header, error) { - sn, ok := back.sn.Blocks(blockHeight) +func (back *BlockReaderWithSnapshots) ReadHeaderByNumber(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) { + ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { + h, err = back.headerFromSnapshot(blockHeight, segment, nil) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } if !ok { - h := rawdb.ReadHeader(tx, hash, blockHeight) return h, nil } - return back.headerFromSnapshot(blockHeight, sn, nil) + h = rawdb.ReadHeader(tx, hash, blockHeight) + return h, nil } func (back *BlockReaderWithSnapshots) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) { - sn, ok := back.sn.Blocks(blockHeight) - if !ok { - body, err := rawdb.ReadBodyWithTransactions(tx, hash, blockHeight) + var baseTxnID uint64 + var txsAmount uint32 + ok, err := back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error { + body, baseTxnID, txsAmount, err = back.bodyFromSnapshot(blockHeight, seg, nil) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + if ok { + ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error { + txs, senders, err := back.txsFromSnapshot(baseTxnID, txsAmount, seg, nil) + if err != nil { + return err + } + body.Transactions = txs + body.SendersToTxs(senders) + return nil + }) if err != nil { return nil, err } - return body, nil + if ok { + return body, nil + } } - body, _, _, _, err = back.bodyWithTransactionsFromSnapshot(blockHeight, sn, nil) + body, err = rawdb.ReadBodyWithTransactions(tx, hash, blockHeight) if err != nil { return nil, err } @@ -288,99 +339,127 @@ func (back *BlockReaderWithSnapshots) BodyRlp(ctx context.Context, tx kv.Getter, } func (back *BlockReaderWithSnapshots) Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) { - sn, ok := back.sn.Blocks(blockHeight) - if !ok { - body, _, _ := rawdb.ReadBody(tx, hash, blockHeight) - return body, nil - } - - body, _, _, err = back.bodyFromSnapshot(blockHeight, sn, nil) + ok, err := back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error { + body, _, _, err = back.bodyFromSnapshot(blockHeight, seg, nil) + if err != nil { + return err + } + return nil + }) if err != nil { return nil, err } + if ok { + return body, nil + } + body, _, _ = rawdb.ReadBody(tx, hash, blockHeight) return body, nil } func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (block *types.Block, senders []common.Address, err error) { - sn, ok := back.sn.Blocks(blockHeight) - if !ok { - canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) + var buf []byte + var h *types.Header + ok, err := back.sn.ViewHeaders(blockHeight, func(seg *HeaderSegment) error { + headerOffset := seg.idxHeaderHash.Lookup2(blockHeight - seg.idxHeaderHash.BaseDataID()) + gg := seg.seg.MakeGetter() + gg.Reset(headerOffset) + buf, _ = gg.Next(buf[:0]) + h = &types.Header{} + if err = rlp.DecodeBytes(buf[1:], h); err != nil { + return err + } + return nil + }) + if err != nil { + return + } + if ok { + var b 
*types.BodyForStorage + ok, err = back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error { + bodyOffset := seg.idxBodyNumber.Lookup2(blockHeight - seg.idxBodyNumber.BaseDataID()) + gg := seg.seg.MakeGetter() + gg.Reset(bodyOffset) + buf, _ = gg.Next(buf[:0]) + b = &types.BodyForStorage{} + reader := bytes.NewReader(buf) + if err = rlp.Decode(reader, b); err != nil { + return err + } + return nil + }) if err != nil { - return nil, nil, fmt.Errorf("requested non-canonical hash %x. canonical=%x", hash, canonicalHash) + return } - if canonicalHash == hash { - block, senders, err = rawdb.ReadBlockWithSenders(tx, hash, blockHeight) + if ok { + if b.TxAmount <= 2 { + block = types.NewBlockFromStorage(hash, h, nil, b.Uncles) + if len(senders) != block.Transactions().Len() { + return block, senders, nil // no senders is fine - will recover them on the fly + } + block.SendersToTxs(senders) + return block, senders, nil + } + reader := bytes.NewReader(nil) + txs := make([]types.Transaction, b.TxAmount-2) + senders = make([]common.Address, b.TxAmount-2) + ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error { + if b.BaseTxId < seg.IdxTxnHash.BaseDataID() { + return fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, seg.IdxTxnHash.BaseDataID(), seg.Seg.FilePath()) + } + r := recsplit.NewIndexReader(seg.IdxTxnId) + binary.BigEndian.PutUint64(buf[:8], b.BaseTxId-seg.IdxTxnId.BaseDataID()) + txnOffset := r.Lookup(buf[:8]) + gg := seg.Seg.MakeGetter() + gg.Reset(txnOffset) + stream := rlp.NewStream(reader, 0) + buf, _ = gg.Next(buf[:0]) //first system-tx + for i := uint32(0); i < b.TxAmount-2; i++ { + buf, _ = gg.Next(buf[:0]) + if len(buf) < 1+20 { + return fmt.Errorf("segment %s has too short record: len(buf)=%d < 21", seg.Seg.FilePath(), len(buf)) + } + senders[i].SetBytes(buf[1 : 1+20]) + txRlp := buf[1+20:] + reader.Reset(txRlp) + stream.Reset(reader, 0) + txs[i], err = types.DecodeTransaction(stream) + if err != nil { + return err + } + txs[i].SetSender(senders[i]) + } + return nil + }) if err != nil { return nil, nil, err } - return block, senders, nil + if ok { + block = types.NewBlockFromStorage(hash, h, txs, b.Uncles) + if len(senders) != block.Transactions().Len() { + return block, senders, nil // no senders is fine - will recover them on the fly + } + block.SendersToTxs(senders) + return block, senders, nil + } } - return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight) - } - - buf := make([]byte, 16) - - back.lock.Lock() - defer back.lock.Unlock() - - headerOffset := sn.HeaderHashIdx.Lookup2(blockHeight - sn.HeaderHashIdx.BaseDataID()) - bodyOffset := sn.BodyNumberIdx.Lookup2(blockHeight - sn.BodyNumberIdx.BaseDataID()) - - gg := sn.Headers.MakeGetter() - gg.Reset(headerOffset) - buf, _ = gg.Next(buf[:0]) - h := &types.Header{} - if err = rlp.DecodeBytes(buf[1:], h); err != nil { - return nil, nil, err } - - gg = sn.Bodies.MakeGetter() - gg.Reset(bodyOffset) - buf, _ = gg.Next(buf[:0]) - b := &types.BodyForStorage{} - reader := bytes.NewReader(buf) - if err = rlp.Decode(reader, b); err != nil { - return nil, nil, err - } - - if b.BaseTxId < sn.TxnHashIdx.BaseDataID() { - return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath()) + canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockHeight) + if err != nil { + return nil, nil, fmt.Errorf("requested non-canonical hash %x. 
canonical=%x", hash, canonicalHash) } - - txs := make([]types.Transaction, b.TxAmount-2) - senders = make([]common.Address, b.TxAmount-2) - if b.TxAmount > 2 { - r := recsplit.NewIndexReader(sn.TxnIdsIdx) - binary.BigEndian.PutUint64(buf[:8], b.BaseTxId-sn.TxnIdsIdx.BaseDataID()) - txnOffset := r.Lookup(buf[:8]) - gg = sn.Transactions.MakeGetter() - gg.Reset(txnOffset) - stream := rlp.NewStream(reader, 0) - buf, _ = gg.Next(buf[:0]) //first system-tx - for i := uint32(0); i < b.TxAmount-2; i++ { - buf, _ = gg.Next(buf[:0]) - senders[i].SetBytes(buf[1 : 1+20]) - txRlp := buf[1+20:] - reader.Reset(txRlp) - stream.Reset(reader, 0) - txs[i], err = types.DecodeTransaction(stream) - if err != nil { - return nil, nil, err - } + if canonicalHash == hash { + block, senders, err = rawdb.ReadBlockWithSenders(tx, hash, blockHeight) + if err != nil { + return nil, nil, err } + return block, senders, nil } - - block = types.NewBlockFromStorage(hash, h, txs, b.Uncles) - if len(senders) != block.Transactions().Len() { - return block, senders, nil // no senders is fine - will recover them on the fly - } - block.SendersToTxs(senders) - return block, senders, nil + return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight) } -func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Header, error) { - headerOffset := sn.HeaderHashIdx.Lookup2(blockHeight - sn.HeaderHashIdx.BaseDataID()) - gg := sn.Headers.MakeGetter() +func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn *HeaderSegment, buf []byte) (*types.Header, error) { + headerOffset := sn.idxHeaderHash.Lookup2(blockHeight - sn.idxHeaderHash.BaseDataID()) + gg := sn.seg.MakeGetter() gg.Reset(headerOffset) buf, _ = gg.Next(buf[:0]) h := &types.Header{} @@ -394,11 +473,11 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn // because HeaderByHash method will search header in all snapshots - and may request header which doesn't exists // but because our indices are based on PerfectHashMap, no way to know is given key exists or not, only way - // to make sure is to fetch it and compare hash -func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, sn *BlocksSnapshot, buf []byte) (*types.Header, error) { - reader := recsplit.NewIndexReader(sn.HeaderHashIdx) +func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, sn *HeaderSegment, buf []byte) (*types.Header, error) { + reader := recsplit.NewIndexReader(sn.idxHeaderHash) localID := reader.Lookup(hash[:]) - headerOffset := sn.HeaderHashIdx.Lookup2(localID) - gg := sn.Headers.MakeGetter() + headerOffset := sn.idxHeaderHash.Lookup2(localID) + gg := sn.seg.MakeGetter() gg.Reset(headerOffset) buf, _ = gg.Next(buf[:0]) if hash[0] != buf[0] { @@ -415,10 +494,10 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, return h, nil } -func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Body, uint64, uint32, error) { - bodyOffset := sn.BodyNumberIdx.Lookup2(blockHeight - sn.BodyNumberIdx.BaseDataID()) +func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.Body, uint64, uint32, error) { + bodyOffset := sn.idxBodyNumber.Lookup2(blockHeight - sn.idxBodyNumber.BaseDataID()) - gg := sn.Bodies.MakeGetter() + gg := sn.seg.MakeGetter() gg.Reset(bodyOffset) buf, _ = gg.Next(buf[:0]) b := &types.BodyForStorage{} @@ 
-427,8 +506,8 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B return nil, 0, 0, err } - if b.BaseTxId < sn.TxnHashIdx.BaseDataID() { - return nil, 0, 0, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath()) + if b.BaseTxId < sn.idxBodyNumber.BaseDataID() { + return nil, 0, 0, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.idxBodyNumber.BaseDataID(), sn.seg.FilePath()) } body := new(types.Body) @@ -436,19 +515,15 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B return body, b.BaseTxId + 1, b.TxAmount - 2, nil // empty txs in the beginning and end of block } -func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeight uint64, sn *BlocksSnapshot, buf []byte) (*types.Body, []common.Address, uint64, uint32, error) { - body, baseTxnID, txsAmount, err := back.bodyFromSnapshot(blockHeight, sn, buf) - if err != nil { - return nil, nil, 0, 0, err - } +func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) ([]types.Transaction, []common.Address, error) { txs := make([]types.Transaction, txsAmount) senders := make([]common.Address, txsAmount) reader := bytes.NewReader(buf) if txsAmount > 0 { - r := recsplit.NewIndexReader(sn.TxnIdsIdx) - binary.BigEndian.PutUint64(buf[:8], baseTxnID-sn.TxnIdsIdx.BaseDataID()) + r := recsplit.NewIndexReader(txsSeg.IdxTxnId) + binary.BigEndian.PutUint64(buf[:8], baseTxnID-txsSeg.IdxTxnId.BaseDataID()) txnOffset := r.Lookup(buf[:8]) - gg := sn.Transactions.MakeGetter() + gg := txsSeg.Seg.MakeGetter() gg.Reset(txnOffset) stream := rlp.NewStream(reader, 0) for i := uint32(0); i < txsAmount; i++ { @@ -460,30 +535,29 @@ func (back *BlockReaderWithSnapshots) bodyWithTransactionsFromSnapshot(blockHeig var err error txs[i], err = types.DecodeTransaction(stream) if err != nil { - return nil, nil, 0, 0, err + return nil, nil, err } } } - return body, senders, baseTxnID, txsAmount, nil + return txs, senders, nil } -func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) { - for i := len(back.sn.blocks) - 1; i >= 0; i-- { - sn := back.sn.blocks[i] +func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) { + for i := len(segments) - 1; i >= 0; i-- { + sn := segments[i] - reader := recsplit.NewIndexReader(sn.TxnHashIdx) + reader := recsplit.NewIndexReader(sn.IdxTxnHash) offset := reader.Lookup(txnHash[:]) - gg := sn.Transactions.MakeGetter() + gg := sn.Seg.MakeGetter() gg.Reset(offset) - //fmt.Printf("try: %d, %d, %d, %d\n", i, sn.From, localID, blockNum) buf, _ = gg.Next(buf[:0]) // first byte txnHash check - reducing false-positives 256 times. Allows don't store and don't calculate full hash of entity - when checking many snapshots. 
if txnHash[0] != buf[0] { continue } - reader2 := recsplit.NewIndexReader(sn.TxnHash2BlockNumIdx) + reader2 := recsplit.NewIndexReader(sn.IdxTxnHash2BlockNum) blockNum = reader2.Lookup(txnHash[:]) sender := buf[1 : 1+20] txn, err = types.DecodeTransaction(rlp.NewStream(bytes.NewReader(buf[1+20:]), uint64(len(buf)))) @@ -493,10 +567,8 @@ func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, buf []byte) txn.SetSender(common.BytesToAddress(sender)) // final txnHash check - completely avoid false-positives if txn.Hash() == txnHash { - //fmt.Printf("try_succeed: %d, %d, %d, %d\n", i, sn.From, localID, blockNum) return } - //fmt.Printf("try_failed: %x, %x\n", txn.Hash(), txnHash) } return } @@ -511,8 +583,18 @@ func (back *BlockReaderWithSnapshots) TxnLookup(ctx context.Context, tx kv.Gette return *n, true, nil } - txn, blockNum, _, err := back.txnByHash(txnHash, nil) - if err != nil { + var txn types.Transaction + var blockNum uint64 + if err := back.sn.Txs.View(func(segments []*TxnSegment) error { + txn, blockNum, _, err = back.txnByHash(txnHash, segments, nil) + if err != nil { + return err + } + if txn == nil { + return nil + } + return nil + }); err != nil { return 0, false, err } if txn == nil { diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 2c9d35fba3354ae4b0f8fc283b0c679392cb0191..a5e916d86fda086cfafa82d2166af1e4e921d620 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -31,6 +31,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/ethconfig" + "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "github.com/ledgerwatch/log/v3" @@ -109,13 +110,230 @@ func IdxFileName(from, to uint64, fType string) string { return FileName(from, t func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To } +type HeaderSegment struct { + seg *compress.Decompressor // value: first_byte_of_header_hash + header_rlp + idxHeaderHash *recsplit.Index // header_hash -> headers_segment_offset + From, To uint64 +} + +func (sn *HeaderSegment) close() { + if sn.seg != nil { + sn.seg.Close() + sn.seg = nil + } + if sn.idxHeaderHash != nil { + sn.idxHeaderHash.Close() + sn.idxHeaderHash = nil + } +} +func (sn *HeaderSegment) reopen(dir string) (err error) { + sn.close() + fileName := SegmentFileName(sn.From, sn.To, Headers) + sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) + if err != nil { + return err + } + sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Headers.String()))) + if err != nil { + return err + } + return nil +} + +type BodySegment struct { + seg *compress.Decompressor // value: rlp(types.BodyForStorage) + idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset + From, To uint64 +} + +func (sn *BodySegment) close() { + if sn.seg != nil { + sn.seg.Close() + sn.seg = nil + } + if sn.idxBodyNumber != nil { + sn.idxBodyNumber.Close() + sn.idxBodyNumber = nil + } +} +func (sn *BodySegment) reopen(dir string) (err error) { + sn.close() + fileName := SegmentFileName(sn.From, sn.To, Bodies) + sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) + if err != nil { + return err + } + sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Bodies.String()))) + if err != nil { + return err + } + 
return nil +} + +type TxnSegment struct { + Seg *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp + IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset + IdxTxnId *recsplit.Index // transaction_id -> transactions_segment_offset + IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number + From, To uint64 +} + +func (sn *TxnSegment) close() { + if sn.Seg != nil { + sn.Seg.Close() + sn.Seg = nil + } + if sn.IdxTxnHash != nil { + sn.IdxTxnHash.Close() + sn.IdxTxnHash = nil + } + if sn.IdxTxnId != nil { + sn.IdxTxnId.Close() + sn.IdxTxnId = nil + } + if sn.IdxTxnHash2BlockNum != nil { + sn.IdxTxnHash2BlockNum.Close() + sn.IdxTxnHash2BlockNum = nil + } +} +func (sn *TxnSegment) reopen(dir string) (err error) { + sn.close() + fileName := SegmentFileName(sn.From, sn.To, Transactions) + sn.Seg, err = compress.NewDecompressor(path.Join(dir, fileName)) + if err != nil { + return err + } + sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Transactions.String()))) + if err != nil { + return err + } + sn.IdxTxnId, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId.String()))) + if err != nil { + return err + } + sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block.String()))) + if err != nil { + return err + } + return nil +} + +type headerSegments struct { + lock sync.RWMutex + segments []*HeaderSegment +} + +func (s *headerSegments) closeLocked() { + for i := range s.segments { + s.segments[i].close() + } +} +func (s *headerSegments) reopen(dir string) error { + for i := range s.segments { + if err := s.segments[i].reopen(dir); err != nil { + return err + } + } + return nil +} +func (s *headerSegments) View(f func(segments []*HeaderSegment) error) error { + s.lock.RLock() + defer s.lock.RUnlock() + return f(s.segments) +} +func (s *headerSegments) ViewSegment(blockNum uint64, f func(sn *HeaderSegment) error) (found bool, err error) { + s.lock.RLock() + defer s.lock.RUnlock() + for _, seg := range s.segments { + if !(blockNum >= seg.From && blockNum < seg.To) { + continue + } + return true, f(seg) + } + return false, nil +} + +type bodySegments struct { + lock sync.RWMutex + segments []*BodySegment +} + +func (s *bodySegments) closeLocked() { + for i := range s.segments { + s.segments[i].close() + } +} +func (s *bodySegments) reopen(dir string) error { + for i := range s.segments { + if err := s.segments[i].reopen(dir); err != nil { + return err + } + } + return nil +} +func (s *bodySegments) View(f func([]*BodySegment) error) error { + s.lock.RLock() + defer s.lock.RUnlock() + return f(s.segments) +} +func (s *bodySegments) ViewSegment(blockNum uint64, f func(*BodySegment) error) (found bool, err error) { + s.lock.RLock() + defer s.lock.RUnlock() + for _, seg := range s.segments { + if !(blockNum >= seg.From && blockNum < seg.To) { + continue + } + return true, f(seg) + } + return false, nil +} + +type txnSegments struct { + lock sync.RWMutex + segments []*TxnSegment +} + +func (s *txnSegments) closeLocked() { + for i := range s.segments { + s.segments[i].close() + } +} +func (s *txnSegments) reopen(dir string) error { + for i := range s.segments { + if err := s.segments[i].reopen(dir); err != nil { + return err + } + } + return nil +} +func (s *txnSegments) View(f func([]*TxnSegment) error) error { + s.lock.RLock() + defer s.lock.RUnlock() + return f(s.segments) +} +func (s *txnSegments) ViewSegment(blockNum 
uint64, f func(*TxnSegment) error) (found bool, err error) { + s.lock.RLock() + defer s.lock.RUnlock() + for _, seg := range s.segments { + if !(blockNum >= seg.From && blockNum < seg.To) { + continue + } + return true, f(seg) + } + return false, nil +} + type RoSnapshots struct { - indicesReady atomic.Bool - segmentsReady atomic.Bool - blocks []*BlocksSnapshot + indicesReady atomic.Bool + segmentsReady atomic.Bool + + Headers *headerSegments + Bodies *bodySegments + Txs *txnSegments + dir string - segmentsAvailable uint64 - idxAvailable uint64 + segmentsAvailable atomic.Uint64 + idxAvailable atomic.Uint64 cfg ethconfig.Snapshot } @@ -125,15 +343,15 @@ type RoSnapshots struct { // - gaps are not allowed // - segment have [from:to) semantic func NewRoSnapshots(cfg ethconfig.Snapshot, snapshotDir string) *RoSnapshots { - return &RoSnapshots{dir: snapshotDir, cfg: cfg} + return &RoSnapshots{dir: snapshotDir, cfg: cfg, Headers: &headerSegments{}, Bodies: &bodySegments{}, Txs: &txnSegments{}} } func (s *RoSnapshots) Cfg() ethconfig.Snapshot { return s.cfg } func (s *RoSnapshots) Dir() string { return s.dir } func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() } -func (s *RoSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable } +func (s *RoSnapshots) BlocksAvailable() uint64 { return s.segmentsAvailable.Load() } func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() } -func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable } +func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() } func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error { if s.BlocksAvailable() < cfg.ExpectBlocks { @@ -168,70 +386,43 @@ func (s *RoSnapshots) IdxAvailability() (headers, bodies, txs uint64, err error) } func (s *RoSnapshots) ReopenIndices() error { - s.closeIndices() return s.ReopenSomeIndices(AllSnapshotTypes...) 
} func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { - for _, bs := range s.blocks { - for _, snapshotType := range types { - switch snapshotType { - case Headers: - if bs.HeaderHashIdx != nil { - bs.HeaderHashIdx.Close() - bs.HeaderHashIdx = nil - } - bs.HeaderHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers.String()))) - if err != nil { - return err - } - case Bodies: - if bs.BodyNumberIdx != nil { - bs.BodyNumberIdx.Close() - bs.BodyNumberIdx = nil - } - bs.BodyNumberIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies.String()))) - if err != nil { - return err - } - case Transactions: - if bs.TxnHashIdx != nil { - bs.TxnHashIdx.Close() - bs.TxnHashIdx = nil - } - bs.TxnHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions.String()))) - if err != nil { - return err - } - - if bs.TxnIdsIdx != nil { - bs.TxnIdsIdx.Close() - bs.TxnIdsIdx = nil - } - bs.TxnIdsIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, TransactionsId.String()))) - if err != nil { - return err - } - - if bs.TxnHash2BlockNumIdx != nil { - bs.TxnHash2BlockNumIdx.Close() - bs.TxnHash2BlockNumIdx = nil - } - bs.TxnHash2BlockNumIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions2Block.String()))) - if err != nil { - return err - } - default: - panic(fmt.Sprintf("unknown snapshot type: %s", snapshotType)) + s.Headers.lock.Lock() + defer s.Headers.lock.Unlock() + s.Bodies.lock.Lock() + defer s.Bodies.lock.Unlock() + s.Txs.lock.Lock() + defer s.Txs.lock.Unlock() + for _, t := range types { + switch t { + case Headers: + if err := s.Headers.reopen(s.dir); err != nil { + return err + } + case Bodies: + if err := s.Bodies.reopen(s.dir); err != nil { + return err } + case Transactions: + if err := s.Txs.reopen(s.dir); err != nil { + return err + } + default: + panic(fmt.Sprintf("unknown snapshot type: %s", t)) } + } - if bs.To > 0 { - s.idxAvailable = bs.To - 1 - } else { - s.idxAvailable = 0 - } + //TODO: make calculatable? 
+ segments := s.Headers.segments + if len(segments) > 0 && segments[len(segments)-1].To > 0 { + s.idxAvailable.Store(segments[len(segments)-1].To - 1) + } else { + s.idxAvailable.Store(0) } + s.indicesReady.Store(true) return nil } @@ -256,51 +447,59 @@ func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) { } func (s *RoSnapshots) ReopenSegments() error { - s.closeSegements() - s.closeIndices() - s.blocks = nil + s.Headers.lock.Lock() + defer s.Headers.lock.Unlock() + s.Bodies.lock.Lock() + defer s.Bodies.lock.Unlock() + s.Txs.lock.Lock() + defer s.Txs.lock.Unlock() + s.closeSegmentsLocked() files, err := segmentsOfType(s.dir, Headers) if err != nil { return err } for _, f := range files { - blocksSnapshot := &BlocksSnapshot{From: f.From, To: f.To} { + seg := &BodySegment{From: f.From, To: f.To} fileName := SegmentFileName(f.From, f.To, Bodies) - blocksSnapshot.Bodies, err = compress.NewDecompressor(path.Join(s.dir, fileName)) + seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { break } return err } + s.Bodies.segments = append(s.Bodies.segments, seg) } { + seg := &HeaderSegment{From: f.From, To: f.To} fileName := SegmentFileName(f.From, f.To, Headers) - blocksSnapshot.Headers, err = compress.NewDecompressor(path.Join(s.dir, fileName)) + seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { break } return err } + s.Headers.segments = append(s.Headers.segments, seg) } { + seg := &TxnSegment{From: f.From, To: f.To} fileName := SegmentFileName(f.From, f.To, Transactions) - blocksSnapshot.Transactions, err = compress.NewDecompressor(path.Join(s.dir, fileName)) + seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { break } return err } + s.Txs.segments = append(s.Txs.segments, seg) } - s.blocks = append(s.blocks, blocksSnapshot) - if blocksSnapshot.To > 0 { - s.segmentsAvailable = blocksSnapshot.To - 1 + if f.To > 0 { + s.segmentsAvailable.Store(f.To - 1) } else { - s.segmentsAvailable = 0 + s.segmentsAvailable.Store(0) } } s.segmentsReady.Store(true) @@ -308,116 +507,127 @@ func (s *RoSnapshots) ReopenSegments() error { } func (s *RoSnapshots) Close() { - s.closeSegements() - s.closeIndices() - s.blocks = nil + s.Headers.lock.Lock() + defer s.Headers.lock.Unlock() + s.Bodies.lock.Lock() + defer s.Bodies.lock.Unlock() + s.Txs.lock.Lock() + defer s.Txs.lock.Unlock() + s.closeSegmentsLocked() } - -func (s *RoSnapshots) closeSegements() { - for _, s := range s.blocks { - if s.Headers != nil { - s.Headers.Close() - } - if s.Bodies != nil { - s.Bodies.Close() - } - if s.Transactions != nil { - s.Transactions.Close() - } +func (s *RoSnapshots) closeSegmentsLocked() { + if s.Headers != nil { + s.Headers.closeLocked() + s.Headers.segments = nil } -} -func (s *RoSnapshots) closeIndices() { - for _, s := range s.blocks { - if s.HeaderHashIdx != nil { - s.HeaderHashIdx.Close() - } - if s.BodyNumberIdx != nil { - s.BodyNumberIdx.Close() - } - if s.TxnHashIdx != nil { - s.TxnHashIdx.Close() - } - if s.TxnIdsIdx != nil { - s.TxnIdsIdx.Close() - } - if s.TxnHash2BlockNumIdx != nil { - s.TxnHash2BlockNumIdx.Close() - } + if s.Bodies != nil { + s.Bodies.closeLocked() + s.Bodies.segments = nil + } + if s.Txs != nil { + s.Txs.closeLocked() + s.Txs.segments = nil } } - -func (s *RoSnapshots) Blocks(blockNumber uint64) (snapshot *BlocksSnapshot, found bool) { - if !s.indicesReady.Load() { - return nil, false +func 
(s *RoSnapshots) ViewHeaders(blockNum uint64, f func(sn *HeaderSegment) error) (found bool, err error) {
+	if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
+		return false, nil
 	}
-
-	if blockNumber > s.segmentsAvailable {
-		return snapshot, false
+	return s.Headers.ViewSegment(blockNum, f)
+}
+func (s *RoSnapshots) ViewBodies(blockNum uint64, f func(sn *BodySegment) error) (found bool, err error) {
+	if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
+		return false, nil
 	}
-	for _, blocksSnapshot := range s.blocks {
-		if blocksSnapshot.Has(blockNumber) {
-			return blocksSnapshot, true
-		}
+	return s.Bodies.ViewSegment(blockNum, f)
+}
+func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (found bool, err error) {
+	if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
+		return false, nil
 	}
-	return snapshot, false
+	return s.Txs.ViewSegment(blockNum, f)
 }
 
 func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir *dir.Rw, chainID uint256.Int, tmpDir string, from uint64, lvl log.Lvl) error {
 	logEvery := time.NewTicker(20 * time.Second)
 	defer logEvery.Stop()
 
-	for _, sn := range s.blocks {
-		if sn.From < from {
-			continue
-		}
-		f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Headers))
-		if err := HeadersHashIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
-			return err
+	if err := s.Headers.View(func(segments []*HeaderSegment) error {
+		for _, sn := range segments {
+			if sn.From < from {
+				continue
+			}
+			f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Headers))
+			if err := HeadersHashIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
+				return err
+			}
 		}
+		return nil
+	}); err != nil {
+		return err
 	}
 
-	for _, sn := range s.blocks {
-		if sn.From < from {
-			continue
-		}
-		f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Bodies))
-		if err := BodiesIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
-			return err
+	if err := s.Bodies.View(func(segments []*BodySegment) error {
+		for _, sn := range segments {
+			if sn.From < from {
+				continue
+			}
+			f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Bodies))
+			if err := BodiesIdx(ctx, f, sn.From, tmpDir, logEvery, lvl); err != nil {
+				return err
+			}
 		}
+		return nil
+	}); err != nil {
+		return err
 	}
 
 	// hack to read first block body - to get baseTxId from there
 	if err := s.ReopenSomeIndices(Headers, Bodies); err != nil {
 		return err
 	}
-	for _, sn := range s.blocks {
-		if sn.From < from {
-			continue
-		}
-		// build txs idx
-		gg := sn.Bodies.MakeGetter()
-		buf, _ := gg.Next(nil)
-		firstBody := &types.BodyForStorage{}
-		if err := rlp.DecodeBytes(buf, firstBody); err != nil {
-			return err
-		}
+	if err := s.Txs.View(func(segments []*TxnSegment) error {
+		for i, sn := range segments {
+			if sn.From < from {
+				continue
+			}
 
-		var expectedTxsAmount uint64
-		{
-			off := sn.BodyNumberIdx.Lookup2(sn.To - 1 - sn.From)
-			gg.Reset(off)
+			if err := s.Bodies.View(func(bodySegments []*BodySegment) error {
+				// build txs idx
+				gg := bodySegments[i].seg.MakeGetter()
+				buf, _ := gg.Next(nil)
+				firstBody := &types.BodyForStorage{}
+				if err := rlp.DecodeBytes(buf, firstBody); err != nil {
+					return err
+				}
 
-			buf, _ = gg.Next(buf[:0])
-			lastBody := new(types.BodyForStorage)
-			err := rlp.DecodeBytes(buf, lastBody)
-			if err != nil {
-				return err
+				var expectedTxsAmount uint64
+				{
+					off := bodySegments[i].idxBodyNumber.Lookup2(sn.To - 1 - sn.From)
+					gg.Reset(off)
+
+					buf, _ = gg.Next(buf[:0])
+					lastBody := new(types.BodyForStorage)
+					err := rlp.DecodeBytes(buf, lastBody)
+					if err != nil {
+						return err
+					}
+					expectedTxsAmount = lastBody.BaseTxId + uint64(lastBody.TxAmount) - firstBody.BaseTxId
+				}
+
+				f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Transactions))
+				if err := TransactionsHashIdx(ctx, chainID, sn, bodySegments[i], firstBody.BaseTxId, sn.From, expectedTxsAmount, f, tmpDir, logEvery, lvl); err != nil {
+					return err
+				}
+
+				return nil
+			}); err != nil {
+				return err
 			}
-			expectedTxsAmount = lastBody.BaseTxId + uint64(lastBody.TxAmount) - firstBody.BaseTxId
-		}
-		f := filepath.Join(snapshotDir.Path, SegmentFileName(sn.From, sn.To, Transactions))
-		if err := TransactionsHashIdx(ctx, chainID, sn, firstBody.BaseTxId, sn.From, expectedTxsAmount, f, tmpDir, logEvery, lvl); err != nil {
-			return err
+		}
+		return nil
+	}); err != nil {
+		return err
 	}
 
 	return nil
@@ -665,7 +875,81 @@ func min(a, b uint64) uint64 {
 	return b
 }
 
-func RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error {
+type BlockRetire struct {
+	working atomic.Bool
+	wg      *sync.WaitGroup
+	result  *BlockRetireResult
+
+	workers   int
+	tmpDir    string
+	snapshots *RoSnapshots
+	db        kv.RoDB
+}
+
+type BlockRetireResult struct {
+	BlockFrom, BlockTo uint64
+	Err                error
+}
+
+func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB) *BlockRetire {
+	return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db}
+}
+func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots }
+func (br *BlockRetire) Working() bool           { return br.working.Load() }
+func (br *BlockRetire) Wait()                   { br.wg.Wait() }
+func (br *BlockRetire) Result() *BlockRetireResult {
+	r := br.result
+	br.result = nil
+	return r
+}
+func (br *BlockRetire) CanRetire(curBlockNum uint64) (blockFrom, blockTo uint64, can bool) {
+	blockFrom = br.snapshots.BlocksAvailable() + 1
+	return canRetire(blockFrom, curBlockNum-params.FullImmutabilityThreshold)
+}
+func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) {
+	blockFrom = (from / 1_000) * 1_000
+	roundedTo1K := (to / 1_000) * 1_000
+	jump := roundedTo1K - blockFrom
+	switch { // only next segment sizes are allowed
+	case jump >= 500_000:
+		blockTo = blockFrom + 500_000
+	case jump >= 100_000:
+		blockTo = blockFrom + 100_000
+	case jump >= 10_000:
+		blockTo = blockFrom + 10_000
+	case jump >= 1_000:
+		blockTo = blockFrom + 1_000
+	default:
+		blockTo = blockFrom
+	}
+	return blockFrom, blockTo, blockTo-blockFrom >= 1_000
+}
+func (br *BlockRetire) CanDeleteTo(curBlockNum uint64) (blockTo uint64) {
+	hardLimit := (curBlockNum/1_000)*1_000 - params.FullImmutabilityThreshold
+	return min(hardLimit, br.snapshots.BlocksAvailable()+1)
+}
+func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, lvl log.Lvl) {
+	br.result = nil
+	if br.working.Load() {
+		return
+	}
+
+	br.wg.Add(1)
+	go func() {
+		br.working.Store(true)
+		defer br.working.Store(false)
+		defer br.wg.Done()
+
+		err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, lvl)
+		br.result = &BlockRetireResult{
+			BlockFrom: blockFrom,
+			BlockTo:   blockTo,
+			Err:       err,
+		}
+	}()
+}
+
+func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error {
log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) // in future we will do it in background if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { @@ -1001,7 +1285,7 @@ func DumpBodies(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string, var EmptyTxHash = common.Hash{} -func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSnapshot, firstTxID, firstBlockNum, expectedCount uint64, segmentFilePath, tmpDir string, logEvery *time.Ticker, lvl log.Lvl) error { +func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, txsSegment *TxnSegment, bodiesSegment *BodySegment, firstTxID, firstBlockNum, expectedCount uint64, segmentFilePath, tmpDir string, logEvery *time.Ticker, lvl log.Lvl) error { dir, _ := filepath.Split(segmentFilePath) d, err := compress.NewDecompressor(segmentFilePath) @@ -1018,7 +1302,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions.String())), + IndexFile: filepath.Join(dir, IdxFileName(txsSegment.From, txsSegment.To, Transactions.String())), BaseDataID: firstTxID, }) if err != nil { @@ -1030,7 +1314,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, TransactionsId.String())), + IndexFile: filepath.Join(dir, IdxFileName(txsSegment.From, txsSegment.To, TransactionsId.String())), BaseDataID: firstTxID, }) if err != nil { @@ -1042,7 +1326,7 @@ func TransactionsHashIdx(ctx context.Context, chainID uint256.Int, sn *BlocksSna BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block.String())), + IndexFile: filepath.Join(dir, IdxFileName(txsSegment.From, txsSegment.To, Transactions2Block.String())), BaseDataID: firstBlockNum, }) if err != nil { @@ -1149,8 +1433,8 @@ RETRY: defer wg.Done() blockNum := firstBlockNum body := &types.BodyForStorage{} - if err := sn.Bodies.WithReadAhead(func() error { - bodyGetter := sn.Bodies.MakeGetter() + if err := bodiesSegment.seg.WithReadAhead(func() error { + bodyGetter := bodiesSegment.seg.MakeGetter() bodyGetter.Reset(0) buf, _ = bodyGetter.Next(buf[:0]) if err := rlp.DecodeBytes(buf, body); err != nil { @@ -1364,23 +1648,30 @@ RETRY: func ForEachHeader(ctx context.Context, s *RoSnapshots, walker func(header *types.Header) error) error { r := bytes.NewReader(nil) - for _, sn := range s.blocks { - ch := forEachAsync(ctx, sn.Headers) - for it := range ch { - if it.err != nil { - return nil - } + err := s.Headers.View(func(snapshots []*HeaderSegment) error { + for _, sn := range snapshots { + ch := forEachAsync(ctx, sn.seg) + for it := range ch { + if it.err != nil { + return nil + } - header := new(types.Header) - r.Reset(it.word[1:]) - if err := rlp.Decode(r, header); err != nil { - return err - } - if err := walker(header); err != nil { - return err + header := new(types.Header) + r.Reset(it.word[1:]) + if err := rlp.Decode(r, header); err != nil { + return err + } + if err := walker(header); err != nil { + return err + } } } + return nil + }) + if err != nil { + return err } + return nil } @@ -1394,35 +1685,6 @@ func NewMerger(tmpDir string, workers int, lvl log.Lvl) *Merger { return &Merger{tmpDir: tmpDir, workers: 
workers, lvl: lvl} } -/* - a.fileLocks[fType].RLock() - defer a.fileLocks[fType].RUnlock() - var maxEndBlock uint64 - a.files[fType].Ascend(func(i btree.Item) bool { - item := i.(*byEndBlockItem) - if item.decompressor == nil { - return true // Skip B-tree based items - } - pre = append(pre, item) - if aggTo == 0 { - var doubleEnd uint64 - nextDouble := item.endBlock - for nextDouble <= maxEndBlock && nextDouble-item.startBlock < maxSpan { - doubleEnd = nextDouble - nextDouble = doubleEnd + (doubleEnd - item.startBlock) + 1 - } - if doubleEnd != item.endBlock { - aggFrom = item.startBlock - aggTo = doubleEnd - } else { - post = append(post, item) - return true - } - } - toAggregate = append(toAggregate, item) - return item.endBlock < aggTo - }) -*/ type mergeRange struct { from, to uint64 } @@ -1430,8 +1692,8 @@ type mergeRange struct { func (r mergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) } func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) { - for i := len(snapshots.blocks) - 1; i > 0; i-- { - sn := snapshots.blocks[i] + for i := len(snapshots.Headers.segments) - 1; i > 0; i-- { + sn := snapshots.Headers.segments[i] if sn.To-sn.From >= DEFAULT_SEGMENT_SIZE { // is complete .seg continue } @@ -1445,7 +1707,7 @@ func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) { } aggFrom := sn.To - span res = append(res, mergeRange{from: aggFrom, to: sn.To}) - for snapshots.blocks[i].From > aggFrom { + for snapshots.Headers.segments[i].From > aggFrom { i-- } break @@ -1455,17 +1717,27 @@ func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) { return res } func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string) { - for _, sn := range snapshots.blocks { - if sn.From < from { - continue - } - if sn.To > to { - break - } + if err := snapshots.Headers.View(func(hSegments []*HeaderSegment) error { + return snapshots.Bodies.View(func(bSegments []*BodySegment) error { + return snapshots.Txs.View(func(tSegments []*TxnSegment) error { + for i, sn := range hSegments { + if sn.From < from { + continue + } + if sn.To > to { + break + } + + toMergeHeaders = append(toMergeHeaders, hSegments[i].seg.FilePath()) + toMergeBodies = append(toMergeBodies, bSegments[i].seg.FilePath()) + toMergeTxs = append(toMergeTxs, tSegments[i].Seg.FilePath()) + } - toMergeBodies = append(toMergeBodies, sn.Bodies.FilePath()) - toMergeHeaders = append(toMergeHeaders, sn.Headers.FilePath()) - toMergeTxs = append(toMergeTxs, sn.Transactions.FilePath()) + return nil + }) + }) + }); err != nil { + panic(err) } return } diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go index eb3f099482fef00386edc0454731647d72ababac..f7d429822b56ee63a62d894656ce4148f08c5e7e 100644 --- a/turbo/snapshotsync/block_snapshots_test.go +++ b/turbo/snapshotsync/block_snapshots_test.go @@ -107,6 +107,24 @@ func TestMergeSnapshots(t *testing.T) { require.Equal(1, a) } +func TestCanRetire(t *testing.T) { + require := require.New(t) + cases := []struct { + inFrom, inTo, outFrom, outTo uint64 + can bool + }{ + {0, 1234, 0, 1000, true}, + {1_000_000, 1_120_000, 1_000_000, 1_100_000, true}, + {2_500_000, 4_100_000, 2_500_000, 3_000_000, true}, + {2_500_000, 2_500_100, 2_500_000, 2_500_000, false}, + } + for _, tc := range cases { + from, to, can := canRetire(tc.inFrom, tc.inTo) + require.Equal(int(tc.outFrom), int(from)) + require.Equal(int(tc.outTo), 
int(to)) + require.Equal(tc.can, can, tc.inFrom, tc.inTo) + } +} func TestRecompress(t *testing.T) { dir, require := t.TempDir(), require.New(t) createFile := func(from, to uint64) { createTestSegmentFile(t, from, to, Headers, dir) } @@ -131,13 +149,13 @@ func TestOpenAllSnapshot(t *testing.T) { defer s.Close() err := s.ReopenSegments() require.NoError(err) - require.Equal(0, len(s.blocks)) + require.Equal(0, len(s.Headers.segments)) s.Close() createFile(500_000, 1_000_000, Bodies) s = NewRoSnapshots(cfg, dir) defer s.Close() - require.Equal(0, len(s.blocks)) //because, no headers and transactions snapshot files are created + require.Equal(0, len(s.Bodies.segments)) //because, no headers and transactions snapshot files are created s.Close() createFile(500_000, 1_000_000, Headers) @@ -145,7 +163,7 @@ func TestOpenAllSnapshot(t *testing.T) { s = NewRoSnapshots(cfg, dir) err = s.ReopenSegments() require.Error(err) - require.Equal(0, len(s.blocks)) //because, no gaps are allowed (expect snapshots from block 0) + require.Equal(0, len(s.Headers.segments)) //because, no gaps are allowed (expect snapshots from block 0) s.Close() createFile(0, 500_000, Bodies) @@ -157,17 +175,26 @@ func TestOpenAllSnapshot(t *testing.T) { err = s.ReopenSegments() require.NoError(err) s.indicesReady.Store(true) - require.Equal(2, len(s.blocks)) + require.Equal(2, len(s.Headers.segments)) - sn, ok := s.Blocks(10) + ok, err := s.ViewTxs(10, func(sn *TxnSegment) error { + require.Equal(int(sn.To), 500_000) + return nil + }) + require.NoError(err) require.True(ok) - require.Equal(int(sn.To), 500_000) - sn, ok = s.Blocks(500_000) + ok, err = s.ViewTxs(500_000, func(sn *TxnSegment) error { + require.Equal(int(sn.To), 1_000_000) // [from:to) + return nil + }) + require.NoError(err) require.True(ok) - require.Equal(int(sn.To), 1_000_000) // [from:to) - _, ok = s.Blocks(1_000_000) + ok, err = s.ViewTxs(1_000_000, func(sn *TxnSegment) error { + return nil + }) + require.NoError(err) require.False(ok) // Erigon may create new snapshots by itself - with high bigger than hardcoded ExpectedBlocks @@ -177,7 +204,7 @@ func TestOpenAllSnapshot(t *testing.T) { err = s.ReopenSegments() require.NoError(err) defer s.Close() - require.Equal(2, len(s.blocks)) + require.Equal(2, len(s.Headers.segments)) createFile(500_000, 900_000, Headers) createFile(500_000, 900_000, Bodies) diff --git a/turbo/snapshotsync/snapshotsynccli/flags.go b/turbo/snapshotsync/snapshotsynccli/flags.go index 24029d2b986f564115e77686275075d7a1b57fc7..94cdf6ebf4d74b75ae1ff9240aaebc2f7c25913c 100644 --- a/turbo/snapshotsync/snapshotsynccli/flags.go +++ b/turbo/snapshotsync/snapshotsynccli/flags.go @@ -8,8 +8,7 @@ import ( ) var ( - blockSnapshotEnabledKey = []byte("blocksSnapshotEnabled") - blockSnapshotRetireEnabledKey = []byte("blocksSnapshotRetireEnabled") + blockSnapshotEnabledKey = []byte("blocksSnapshotEnabled") ) func EnsureNotChanged(tx kv.GetPut, cfg ethconfig.Snapshot) error { @@ -20,12 +19,5 @@ func EnsureNotChanged(tx kv.GetPut, cfg ethconfig.Snapshot) error { if !ok { return fmt.Errorf("node was started with --%s=%v, can't change it", ethconfig.FlagSnapshot, v) } - ok, v, err = kv.EnsureNotChangedBool(tx, kv.DatabaseInfo, blockSnapshotRetireEnabledKey, cfg.RetireEnabled) - if err != nil { - return err - } - if !ok { - return fmt.Errorf("node was started with --%s=%v, can't change it", ethconfig.FlagSnapshotRetire, v) - } return nil } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 
7f270656432cde5625acf2b4db7097fffb8a56ed..efc7113126a3e9341cc37187f1eaa13e9626f341 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -326,7 +326,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey blockReader, ), stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true), - stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, allSnapshots), + stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB)), stagedsync.StageExecuteBlocksCfg( mock.DB, prune, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index d5a4931a37d4c9f4ee5104883b51c4823e606779..15c266fe38ba356778acf0e13f4e5d7a5ff63ac3 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -253,11 +253,10 @@ func NewStagedSync( forkChoiceCh chan privateapi.ForkChoiceMessage, waitingForBeaconChain *uint32, snapshotDownloader proto_downloader.DownloaderClient, + allSnapshots *snapshotsync.RoSnapshots, ) (*stagedsync.Sync, error) { var blockReader interfaces.FullBlockReader - var allSnapshots *snapshotsync.RoSnapshots if cfg.Snapshot.Enabled { - allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snapshot, cfg.SnapshotDir.Path) blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) } else { blockReader = snapshotsync.NewBlockReader() @@ -303,7 +302,7 @@ func NewStagedSync( blockReader, ), stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance), - stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, allSnapshots), + stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db)), stagedsync.StageExecuteBlocksCfg( db, cfg.Prune,