From 5f4bb9826470b01f55d086256d52b00dde6c6946 Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Mon, 21 Mar 2022 10:28:24 +0700 Subject: [PATCH] Snapshots: do indexing for older ranges if need (#3733) * save * save * save * save --- eth/stagedsync/stage_headers.go | 37 +++++---------------------- turbo/snapshotsync/block_snapshots.go | 7 ++--- 2 files changed, 10 insertions(+), 34 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 0d0ba79168..9ac44851a6 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -996,37 +996,12 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if err := WaitForDownloader(ctx, tx, cfg); err != nil { return err } - - logEvery := time.NewTicker(logInterval) - defer logEvery.Stop() - - // Open segments - for { - headers, bodies, txs, err := cfg.snapshots.SegmentsAvailability() - if err != nil { - return err - } - expect := cfg.snapshotHashesCfg.ExpectBlocks - if headers >= expect && bodies >= expect && txs >= expect { - if err := cfg.snapshots.ReopenSegments(); err != nil { - return fmt.Errorf("ReopenSegments: %w", err) - } - if expect > cfg.snapshots.BlocksAvailable() { - return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.BlocksAvailable()) - } - - break - } - log.Info(fmt.Sprintf("[%s] Waiting for snapshots up to block %d...", s.LogPrefix(), expect), "headers", headers, "bodies", bodies, "txs", txs) - time.Sleep(10 * time.Second) - - select { - case <-ctx.Done(): - return ctx.Err() - case <-logEvery.C: - log.Info(fmt.Sprintf("[%s] Waiting for snapshots up to block %d...", s.LogPrefix(), expect), "headers", headers, "bodies", bodies, "txs", txs) - default: - } + expect := cfg.snapshotHashesCfg.ExpectBlocks + if err := cfg.snapshots.ReopenSegments(); err != nil { + return fmt.Errorf("ReopenSegments: %w", err) + } + if cfg.snapshots.SegmentsAvailable() < expect { + return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.SegmentsAvailable()) } } diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 04e8d40c30..62901a815a 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -333,8 +333,8 @@ type RoSnapshots struct { Txs *txnSegments dir string - segmentsAvailable atomic.Uint64 - idxAvailable atomic.Uint64 + segmentsAvailable atomic.Uint64 // all types of .seg files are available - up to this number + idxAvailable atomic.Uint64 // all types of .idx files are available - up to this number cfg ethconfig.Snapshot } @@ -563,6 +563,7 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo } func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir *dir.Rw, chainID uint256.Int, tmpDir string, from uint64, lvl log.Lvl) error { + log.Log(lvl, "[snapshots] Build indices", "from", from) logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -995,7 +996,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 return err } log.Log(lvl, "[snapshots] Merge done. Indexing new segments", "from", ranges[0].from) - if err := BuildIndices(ctx, snapshots, &dir.Rw{Path: snapshots.Dir()}, chainID, tmpDir, ranges[0].from, lvl); err != nil { + if err := BuildIndices(ctx, snapshots, &dir.Rw{Path: snapshots.Dir()}, chainID, tmpDir, min(ranges[0].from, snapshots.IndicesAvailable()), lvl); err != nil { return fmt.Errorf("BuildIndices: %w", err) } if err := snapshots.ReopenIndices(); err != nil { -- GitLab