From 80bd44fce58572089fa29e774f15cb1a29db3cad Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Tue, 5 Apr 2022 16:22:11 +0700
Subject: [PATCH] Snapshots: build indices on retire blocks (#3823)

* grpc up

* grpc up

* grpc up
---
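Usage sketch (illustration only, not part of the applied diff): NewBlockRetire now
takes a writable snapshot dir (*dir.Rw) so indices can be built when blocks are
retired. Assuming snapshots (*snapshotsync.RoSnapshots), chainDB, tmpDir, downloader
and notifier are set up as in the hunks below:

    // Open the snapshot dir for writing so index files can be created there.
    rwSnapshotDir, err := dir.OpenRw(snapshots.Dir())
    if err != nil {
        return err
    }
    br := snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpDir, snapshots, rwSnapshotDir, chainDB, downloader, notifier)
    // Retire a block range in the background; indices are built after the segments are dumped.
    br.RetireBlocksInBackground(ctx, blockFrom, blockTo, chainID, log.LvlInfo)
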
 cmd/integration/commands/stages.go    | 11 +++++++++--
 eth/backend.go                        |  8 ++++++--
 eth/stagedsync/stage_senders_test.go  |  2 +-
 turbo/app/snapshots.go                |  2 +-
 turbo/snapshotsync/block_snapshots.go | 25 +++++++++++++------------
 turbo/stages/mock_sentry.go           |  2 +-
 turbo/stages/stageloop.go             | 15 +++++++++------
 7 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 4a6d0c534c..44d9001564 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -576,11 +576,18 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
 	s := stage(sync, tx, nil, stages.Senders)
 	log.Info("Stage", "name", s.ID, "progress", s.BlockNumber)
 
+	snapshots := allSnapshots(chainConfig)
+	d, err := dir.OpenRw(snapshots.Dir())
+	if err != nil {
+		return err
+	}
+
 	pm, err := prune.Get(tx)
 	if err != nil {
 		return err
 	}
-	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db, nil, nil))
+
+	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, snapshots, d, db, nil, nil))
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
 		if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
@@ -1166,7 +1173,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
 
 	sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
 		chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
-		&stagedsync.Notifications{}, nil, allSn,
+		&stagedsync.Notifications{}, nil, allSn, cfg.SnapshotDir,
 	)
 	if err != nil {
 		panic(err)
diff --git a/eth/backend.go b/eth/backend.go
index 8c83053314..b36e6a63c3 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/holiman/uint256"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
+	"github.com/ledgerwatch/erigon-lib/common/dir"
 	"github.com/ledgerwatch/erigon-lib/direct"
 	"github.com/ledgerwatch/erigon-lib/etl"
 	proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
@@ -481,11 +482,14 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
 	if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil {
 		return nil, err
 	}
-
+	d, err := dir.OpenRw(allSnapshots.Dir())
+	if err != nil {
+		return nil, err
+	}
 	backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB,
 		stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
 		backend.sentryControlServer, tmpdir, backend.notifications,
-		backend.downloaderClient, allSnapshots)
+		backend.downloaderClient, allSnapshots, d)
 	if err != nil {
 		return nil, err
 	}
diff --git a/eth/stagedsync/stage_senders_test.go b/eth/stagedsync/stage_senders_test.go
index c6384bc871..18638c9b1f 100644
--- a/eth/stagedsync/stage_senders_test.go
+++ b/eth/stagedsync/stage_senders_test.go
@@ -109,7 +109,7 @@ func TestSenders(t *testing.T) {
 
 	require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
 
-	cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db, nil, nil))
+	cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, nil, db, nil, nil))
 	err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx)
 	assert.NoError(t, err)
 
diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go
index 8c42ae3736..8d2b0b96c9 100644
--- a/turbo/app/snapshots.go
+++ b/turbo/app/snapshots.go
@@ -245,7 +245,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir)
 	snapshots.Reopen()
 
-	br := snapshotsync.NewBlockRetire(runtime.NumCPU()-1, tmpDir, snapshots, chainDB, nil, nil)
+	br := snapshotsync.NewBlockRetire(runtime.NumCPU()-1, tmpDir, snapshots, rwSnapshotDir, chainDB, nil, nil)
 
 	for i := from; i < to; i += every {
 		br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo)
diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go
index bc98f0b007..7bfed5f23f 100644
--- a/turbo/snapshotsync/block_snapshots.go
+++ b/turbo/snapshotsync/block_snapshots.go
@@ -953,10 +953,11 @@ type BlockRetire struct {
 	wg      *sync.WaitGroup
 	result  *BlockRetireResult
 
-	workers   int
-	tmpDir    string
-	snapshots *RoSnapshots
-	db        kv.RoDB
+	workers     int
+	tmpDir      string
+	snapshots   *RoSnapshots
+	snapshotDir *dir.Rw
+	db          kv.RoDB
 
 	downloader proto_downloader.DownloaderClient
 	notifier   DBEventNotifier
@@ -967,8 +968,8 @@ type BlockRetireResult struct {
 	Err                error
 }
 
-func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier) *BlockRetire {
-	return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db, downloader: downloader, notifier: notifier}
+func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, snapshotDir *dir.Rw, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier) *BlockRetire {
+	return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, snapshotDir: snapshotDir, wg: &sync.WaitGroup{}, db: db, downloader: downloader, notifier: notifier}
 }
 func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots }
 func (br *BlockRetire) Working() bool           { return br.working.Load() }
@@ -1025,7 +1026,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom,
 		defer br.working.Store(false)
 		defer br.wg.Done()
 
-		err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier)
+		err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.snapshotDir, br.db, br.workers, br.downloader, lvl, br.notifier)
 		br.result = &BlockRetireResult{
 			BlockFrom: blockFrom,
 			BlockTo:   blockTo,
@@ -1038,7 +1039,7 @@ type DBEventNotifier interface {
 	OnNewSnapshot()
 }
 
-func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, downloader proto_downloader.DownloaderClient, lvl log.Lvl, notifier DBEventNotifier) error {
+func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, rwSnapshotDir *dir.Rw, db kv.RoDB, workers int, downloader proto_downloader.DownloaderClient, lvl log.Lvl, notifier DBEventNotifier) error {
 	log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
 	// in future we will do it in background
 	if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil {
@@ -1047,18 +1048,18 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
 	if err := snapshots.Reopen(); err != nil {
 		return fmt.Errorf("ReopenSegments: %w", err)
 	}
+	if err := BuildIndices(ctx, snapshots, rwSnapshotDir, chainID, tmpDir, snapshots.IndicesAvailable(), log.LvlInfo); err != nil {
+		return err
+	}
 	merger := NewMerger(tmpDir, workers, lvl, chainID)
 	ranges := merger.FindMergeRanges(snapshots)
 	if len(ranges) == 0 {
 		return nil
 	}
-	err := merger.Merge(ctx, snapshots, ranges, &dir.Rw{Path: snapshots.Dir()}, true)
+	err := merger.Merge(ctx, snapshots, ranges, rwSnapshotDir, true)
 	if err != nil {
 		return err
 	}
-	if err := snapshots.Reopen(); err != nil {
-		return fmt.Errorf("ReopenSegments: %w", err)
-	}
 	if notifier != nil { // notify about new snapshots of any size
 		notifier.OnNewSnapshot()
 	}
diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go
index ae46007d98..31c2f0c886 100644
--- a/turbo/stages/mock_sentry.go
+++ b/turbo/stages/mock_sentry.go
@@ -316,7 +316,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 				blockReader,
 			),
 			stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true),
-			stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB, snapshotsDownloader, mock.Notifications.Events)),
+			stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, snapshotDir, mock.DB, snapshotsDownloader, mock.Notifications.Events)),
 			stagedsync.StageExecuteBlocksCfg(
 				mock.DB,
 				prune,
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 3f6e4a4ee9..27b81c5c84 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -10,6 +10,7 @@ import (
 	"github.com/holiman/uint256"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/common/dbg"
+	"github.com/ledgerwatch/erigon-lib/common/dir"
 	proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
 	"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
 	"github.com/ledgerwatch/erigon-lib/kv"
@@ -249,14 +250,16 @@ func NewStagedSync(
 	tmpdir string,
 	notifications *stagedsync.Notifications,
 	snapshotDownloader proto_downloader.DownloaderClient,
-	allSnapshots *snapshotsync.RoSnapshots,
+	snapshots *snapshotsync.RoSnapshots,
+	snapshotDir *dir.Rw,
 ) (*stagedsync.Sync, error) {
 	var blockReader interfaces.FullBlockReader
 	if cfg.Snapshot.Enabled {
-		blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
+		blockReader = snapshotsync.NewBlockReaderWithSnapshots(snapshots)
 	} else {
 		blockReader = snapshotsync.NewBlockReader()
 	}
+	blockRetire := snapshotsync.NewBlockRetire(1, tmpdir, snapshots, snapshotDir, db, snapshotDownloader, notifications.Events)
 
 	// During Import we don't want other services like header requests, body requests etc. to be running.
 	// Hence we run it in the test mode.
@@ -274,7 +277,7 @@ func NewStagedSync(
 				controlServer.Penalize,
 				cfg.BatchSize,
 				p2pCfg.NoDiscovery,
-				allSnapshots,
+				snapshots,
 				snapshotDownloader,
 				blockReader,
 				tmpdir,
@@ -291,11 +294,11 @@ func NewStagedSync(
 				cfg.BodyDownloadTimeoutSeconds,
 				*controlServer.ChainConfig,
 				cfg.BatchSize,
-				allSnapshots,
+				snapshots,
 				blockReader,
 			),
 			stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance),
-			stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db, snapshotDownloader, notifications.Events)),
+			stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, blockRetire),
 			stagedsync.StageExecuteBlocksCfg(
 				db,
 				cfg.Prune,
@@ -315,7 +318,7 @@ func NewStagedSync(
 			stagedsync.StageHistoryCfg(db, cfg.Prune, tmpdir),
 			stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir),
 			stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir),
-			stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, allSnapshots, isBor),
+			stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, snapshots, isBor),
 			stagedsync.StageFinishCfg(db, tmpdir, logger), runInTestMode),
 		stagedsync.DefaultUnwindOrder,
 		stagedsync.DefaultPruneOrder,
-- 
GitLab