From f314222180a218bab80178884b1ed990d0ad4f76 Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Fri, 18 Mar 2022 11:12:18 +0700
Subject: [PATCH] Snapshots: start seeding new large .seg files (#3724)

---
 cmd/downloader/downloader/downloader.go       | 37 ++++++----
 .../downloader/{server.go => grpc_server.go}  | 15 +++-
 cmd/downloader/downloader/util.go             | 32 +++++----
 cmd/downloader/trackers/trackerslist          |  2 +-
 cmd/integration/commands/stages.go            |  3 +-
 cmd/state/commands/erigon2.go                 |  7 --
 eth/backend.go                                | 51 --------------
 eth/stagedsync/stage_headers.go               | 20 ++----
 eth/stagedsync/stage_senders.go               |  9 +--
 eth/stagedsync/stage_senders_test.go          |  2 +-
 turbo/app/snapshots.go                        |  2 +-
 turbo/snapshotsync/block_snapshots.go         | 69 +++++++++++++++----
 turbo/snapshotsync/block_snapshots_test.go    |  3 +
 turbo/snapshotsync/snapshothashes/embed.go    |  5 +-
 turbo/stages/mock_sentry.go                   |  2 +-
 turbo/stages/stageloop.go                     |  2 +-
 16 files changed, 130 insertions(+), 131 deletions(-)
 rename cmd/downloader/downloader/{server.go => grpc_server.go} (87%)

diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go
index d8593e1623..a0f5c7ea6a 100644
--- a/cmd/downloader/downloader/downloader.go
+++ b/cmd/downloader/downloader/downloader.go
@@ -204,6 +204,25 @@ func CalcStats(prevStats AggStats, interval time.Duration, client *torrent.Clien
 	return result
 }
 
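+// AddTorrentFile - adds a single .torrent file to torrentClient (and checks its hash).
+// If the file is added for the first time, pieces verification starts (disk-IO heavy),
+// hence the slow-call log below.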
+func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient *torrent.Client) (mi *metainfo.MetaInfo, err error) {
+	mi, err = metainfo.LoadFromFile(torrentFilePath)
+	if err != nil {
+		return nil, err
+	}
+	mi.AnnounceList = Trackers
+
+	t := time.Now()
+	_, err = torrentClient.AddTorrent(mi)
+	if err != nil {
+		return mi, err
+	}
+	took := time.Since(t)
+	if took > 3*time.Second {
+		log.Info("[torrent] Check validity", "file", torrentFilePath, "took", took)
+	}
+	return mi, nil
+}
+
 // AddTorrentFiles - adding .torrent files to torrentClient (and checking their hashes), if .torrent file
 // added first time - pieces verification process will start (disk IO heavy) - Progress
 // kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again.
@@ -214,27 +233,15 @@ func AddTorrentFiles(ctx context.Context, snapshotsDir *dir.Rw, torrentClient *t
 		return err
 	}
 	for _, torrentFilePath := range files {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
 		}
+		if _, err := AddTorrentFile(ctx, torrentFilePath, torrentClient); err != nil {
+			return err
+		}
 
-		mi, err := metainfo.LoadFromFile(torrentFilePath)
-		if err != nil {
-			return err
-		}
-		mi.AnnounceList = Trackers
-
-		t := time.Now()
-		_, err = torrentClient.AddTorrent(mi)
-		if err != nil {
-			return err
-		}
-		took := time.Since(t)
-		if took > 3*time.Second {
-			log.Info("[torrent] Check validity", "file", torrentFilePath, "took", took)
-		}
 	}
 
 	return nil
diff --git a/cmd/downloader/downloader/server.go b/cmd/downloader/downloader/grpc_server.go
similarity index 87%
rename from cmd/downloader/downloader/server.go
rename to cmd/downloader/downloader/grpc_server.go
index dfb86f425c..f3fa18defe 100644
--- a/cmd/downloader/downloader/server.go
+++ b/cmd/downloader/downloader/grpc_server.go
@@ -3,6 +3,7 @@ package downloader
 import (
 	"context"
 	"errors"
+	"path/filepath"
 
 	"github.com/anacrolix/torrent"
 	"github.com/anacrolix/torrent/metainfo"
@@ -58,8 +59,18 @@ type GrpcServer struct {
 func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
 	infoHashes := make([]metainfo.Hash, len(request.Items))
 	for i, it := range request.Items {
-		//TODO: if hash is empty - create .torrent file from path file (if it exists)
-		infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash)
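+		// no TorrentHash means the file already exists locally (e.g. a freshly retired
+		// .seg file): build the .torrent file if need and add it to start seeding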
+		if it.TorrentHash == nil {
+			if err := BuildTorrentFileIfNeed(ctx, it.Path, s.snapshotDir); err != nil {
+				return nil, err
+			}
+			metaInfo, err := AddTorrentFile(ctx, filepath.Join(s.snapshotDir.Path, it.Path+".torrent"), s.t.TorrentClient)
+			if err != nil {
+				return nil, err
+			}
+			infoHashes[i] = metaInfo.HashInfoBytes()
+		} else {
+			infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash)
+		}
 	}
 	if err := ResolveAbsentTorrents(ctx, s.t.TorrentClient, infoHashes, s.snapshotDir, s.silent); err != nil {
 		return nil, err
diff --git a/cmd/downloader/downloader/util.go b/cmd/downloader/downloader/util.go
index ad055ff874..57e2b709a7 100644
--- a/cmd/downloader/downloader/util.go
+++ b/cmd/downloader/downloader/util.go
@@ -85,6 +85,24 @@ func allSegmentFiles(dir string) ([]string, error) {
 	return res, nil
 }
 
+// BuildTorrentFileIfNeed - create .torrent file for one .seg file (big IO) - if the .seg file was added manually
+func BuildTorrentFileIfNeed(ctx context.Context, originalFileName string, root *dir.Rw) (err error) {
+	torrentFilePath := filepath.Join(root.Path, originalFileName+".torrent")
+	if _, err := os.Stat(torrentFilePath); err != nil {
+		if !errors.Is(err, os.ErrNotExist) {
+			return err
+		}
+		info, err := BuildInfoBytesForFile(root.Path, originalFileName)
+		if err != nil {
+			return err
+		}
+		if err := CreateTorrentFile(root, info, nil); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
 func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error {
 	logEvery := time.NewTicker(20 * time.Second)
@@ -95,18 +113,8 @@ func BuildTorrentFilesIfNeed(ctx context.Context, root *dir.Rw) error {
 		return err
 	}
 	for i, f := range files {
-		torrentFileName := filepath.Join(root.Path, f+".torrent")
-		if _, err := os.Stat(torrentFileName); err != nil {
-			if !errors.Is(err, os.ErrNotExist) {
-				return err
-			}
-			info, err := BuildInfoBytesForFile(root.Path, f)
-			if err != nil {
-				return err
-			}
-			if err := CreateTorrentFile(root, info, nil); err != nil {
-				return err
-			}
+		if err := BuildTorrentFileIfNeed(ctx, f, root); err != nil {
+			return err
 		}
 
 		select {
diff --git a/cmd/downloader/trackers/trackerslist b/cmd/downloader/trackers/trackerslist
index c378dbeac7..0a2affcd41 160000
--- a/cmd/downloader/trackers/trackerslist
+++ b/cmd/downloader/trackers/trackerslist
@@ -1 +1 @@
-Subproject commit c378dbeac796719b7b0210d4a97d575a6e8dc66f
+Subproject commit 0a2affcd4120a141e0d0ac8c7c43e2eaa7fee894
diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index ac0acf5df0..05af41ff76 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -579,7 +579,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db))
+	cfg := stagedsync.StageSendersCfg(db, chainConfig, tmpdir, pm, snapshotsync.NewBlockRetire(runtime.NumCPU(), tmpdir, allSnapshots(chainConfig), db, nil))
 	if unwind > 0 {
 		u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber)
 		if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
@@ -1058,6 +1058,7 @@ func allSnapshots(cc *params.ChainConfig) *snapshotsync.RoSnapshots {
 	openSnapshotOnce.Do(func() {
 		if enableSnapshot {
 			snapshotCfg := ethconfig.NewSnapshotCfg(enableSnapshot, true)
+			dir.MustExist(filepath.Join(datadir, "snapshots"))
 			_allSnapshotsSingleton = snapshotsync.NewRoSnapshots(snapshotCfg, filepath.Join(datadir, "snapshots"))
 			if err := _allSnapshotsSingleton.ReopenSegments(); err != nil {
 				panic(err)
diff --git a/cmd/state/commands/erigon2.go b/cmd/state/commands/erigon2.go
index fd7e1baf75..f92e2322b2 100644
--- a/cmd/state/commands/erigon2.go
+++ b/cmd/state/commands/erigon2.go
@@ -33,11 +33,9 @@ import (
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/core/types/accounts"
 	"github.com/ledgerwatch/erigon/core/vm"
-	"github.com/ledgerwatch/erigon/eth"
 	"github.com/ledgerwatch/erigon/eth/ethconfig"
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
-	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
 )
 
 const (
@@ -182,11 +180,6 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
 	engine := initConsensusEngine(chainConfig, logger)
 	var blockReader interfaces.FullBlockReader
 	if snapshotBlocks {
-		snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
-		snConfig.ExpectBlocks, err = eth.RestoreExpectedExternalSnapshot(historyDb, snConfig)
-		if err != nil {
-			return err
-		}
 		allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
 		defer allSnapshots.Close()
 		blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
diff --git a/eth/backend.go b/eth/backend.go
index f75d9e68e8..5538239c60 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -19,7 +19,6 @@ package eth
 
 import (
 	"context"
-	"encoding/binary"
 	"errors"
 	"fmt"
 	"math/big"
@@ -77,7 +76,6 @@ import (
 	"github.com/ledgerwatch/erigon/rpc"
 	"github.com/ledgerwatch/erigon/turbo/shards"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
-	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshotsynccli"
 	stages2 "github.com/ledgerwatch/erigon/turbo/stages"
 	"github.com/ledgerwatch/log/v3"
@@ -313,12 +311,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
 	var blockReader interfaces.FullBlockReader
 	var allSnapshots *snapshotsync.RoSnapshots
 	if config.Snapshot.Enabled {
-		snConfig := snapshothashes.KnownConfig(chainConfig.ChainName)
-		snConfig.ExpectBlocks, err = RestoreExpectedExternalSnapshot(chainKv, snConfig)
-		if err != nil {
-			return nil, err
-		}
-
 		allSnapshots = snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path)
 		allSnapshots.AsyncOpenAll(ctx)
 		blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
@@ -564,49 +556,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
 	return backend, nil
 }
 
-func RestoreExpectedExternalSnapshot(db kv.RwDB, snConfig *snapshothashes.Config) (uint64, error) {
-	const SyncedWithSnapshot = "synced_with_snapshot"
-	var snapshotToBlockInDB *uint64
-	// Check if we have an already initialized chain and fall back to
-	// that if so. Otherwise we need to generate a new genesis spec.
-	if err := db.View(context.Background(), func(tx kv.Tx) error {
-		v, err := tx.GetOne(kv.DatabaseInfo, []byte(SyncedWithSnapshot))
-		if err != nil {
-			return err
-		}
-		if v != nil {
-			valueInDB := binary.BigEndian.Uint64(v)
-			snapshotToBlockInDB = &valueInDB
-			return nil
-		}
-
-		return nil
-	}); err != nil {
-		return 0, err
-	}
-
-	if snapshotToBlockInDB != nil {
-		if *snapshotToBlockInDB != snConfig.ExpectBlocks {
-			return *snapshotToBlockInDB, nil //
-			//log.Warn(fmt.Sprintf("'incremental snapshots feature' not implemented yet. New snapshots available up to block %d, but this node was synced to snapshot %d and will keep other blocks in db. (it's safe, re-sync may reduce db size)", snapshotToBlockInDB, snConfig.ExpectBlocks))
-			//snConfig.ExpectBlocks = *snapshotToBlockInDB
-		}
-	}
-
-	if err := db.Update(context.Background(), func(tx kv.RwTx) error {
-		num := make([]byte, 8)
-		binary.BigEndian.PutUint64(num, snConfig.ExpectBlocks)
-		if err := tx.Put(kv.DatabaseInfo, []byte(SyncedWithSnapshot), num); err != nil {
-			return err
-		}
-		return nil
-	}); err != nil {
-		return 0, err
-	}
-
-	return snConfig.ExpectBlocks, nil
-}
-
 func (s *Ethereum) APIs() []rpc.API {
 	return []rpc.API{}
 }
diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go
index c069c0676f..0d0ba79168 100644
--- a/eth/stagedsync/stage_headers.go
+++ b/eth/stagedsync/stage_headers.go
@@ -1009,7 +1009,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
 			expect := cfg.snapshotHashesCfg.ExpectBlocks
 			if headers >= expect && bodies >= expect && txs >= expect {
 				if err := cfg.snapshots.ReopenSegments(); err != nil {
-					return err
+					return fmt.Errorf("ReopenSegments: %w", err)
 				}
 				if expect > cfg.snapshots.BlocksAvailable() {
 					return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.BlocksAvailable())
@@ -1031,31 +1031,21 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
 	}
 
 	// Create .idx files
-	if !cfg.snapshots.IndicesReady() {
+	if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
 		if !cfg.snapshots.SegmentsReady() {
 			return fmt.Errorf("not all snapshot segments are available")
 		}
 
 		// wait for Downloader service to download all expected snapshots
-		logEvery := time.NewTicker(logInterval)
-		defer logEvery.Stop()
-		headers, bodies, txs, err := cfg.snapshots.IdxAvailability()
-		if err != nil {
-			return err
-		}
-		expect := cfg.snapshotHashesCfg.ExpectBlocks
-		if headers < expect || bodies < expect || txs < expect {
+		if cfg.snapshots.IndicesAvailable() < cfg.snapshots.SegmentsAvailable() {
 			chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
-			if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, 0, log.LvlInfo); err != nil {
+			if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), log.LvlInfo); err != nil {
 				return err
 			}
 		}
 
 		if err := cfg.snapshots.ReopenIndices(); err != nil {
-			return err
-		}
-		if expect > cfg.snapshots.IndicesAvailable() {
-			return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.BlocksAvailable())
+			return fmt.Errorf("ReopenIndices: %w", err)
 		}
 	}
 
diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go
index fcede85ba0..fc972862b0 100644
--- a/eth/stagedsync/stage_senders.go
+++ b/eth/stagedsync/stage_senders.go
@@ -401,16 +401,17 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context
 		}
 	}
 	if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks {
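+		// before deleting blocks from the DB, make sure the expected snapshot blocks are available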
+		// TODO: remove this check for the release
+		if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
+			return err
+		}
+
 		canDeleteTo := cfg.blockRetire.CanDeleteTo(s.ForwardProgress)
 		if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil {
 			return nil
 		}
 	}
 
-	// TODO: remove this check for the release
-	if err := cfg.blockRetire.Snapshots().EnsureExpectedBlocksAreAvailable(cfg.snapshotHashesCfg); err != nil {
-		return err
-	}
 	blockFrom, blockTo, ok := cfg.blockRetire.CanRetire(s.ForwardProgress)
 	if !ok {
 		return nil
diff --git a/eth/stagedsync/stage_senders_test.go b/eth/stagedsync/stage_senders_test.go
index d549449ca8..36288852a2 100644
--- a/eth/stagedsync/stage_senders_test.go
+++ b/eth/stagedsync/stage_senders_test.go
@@ -109,7 +109,7 @@ func TestSenders(t *testing.T) {
 
 	require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
 
-	cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db))
+	cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db, nil))
 	err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx)
 	assert.NoError(t, err)
 
diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go
index c9eb05b1aa..c3bf37690f 100644
--- a/turbo/app/snapshots.go
+++ b/turbo/app/snapshots.go
@@ -157,7 +157,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
 	snapshots := snapshotsync.NewRoSnapshots(cfg, snapshotDir)
 	snapshots.ReopenSegments()
 
-	br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB)
+	br := snapshotsync.NewBlockRetire(runtime.NumCPU()/2, tmpDir, snapshots, chainDB, nil)
 
 	for i := from; i < to; i += every {
 		br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo)
diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go
index a5e916d86f..cc2a1d8a55 100644
--- a/turbo/snapshotsync/block_snapshots.go
+++ b/turbo/snapshotsync/block_snapshots.go
@@ -22,6 +22,7 @@ import (
 	common2 "github.com/ledgerwatch/erigon-lib/common"
 	"github.com/ledgerwatch/erigon-lib/common/dir"
 	"github.com/ledgerwatch/erigon-lib/compress"
+	proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon-lib/recsplit"
 	"github.com/ledgerwatch/erigon-lib/txpool"
@@ -346,12 +347,15 @@ func NewRoSnapshots(cfg ethconfig.Snapshot, snapshotDir string) *RoSnapshots {
 	return &RoSnapshots{dir: snapshotDir, cfg: cfg, Headers: &headerSegments{}, Bodies: &bodySegments{}, Txs: &txnSegments{}}
 }
 
-func (s *RoSnapshots) Cfg() ethconfig.Snapshot  { return s.cfg }
-func (s *RoSnapshots) Dir() string              { return s.dir }
-func (s *RoSnapshots) SegmentsReady() bool      { return s.segmentsReady.Load() }
-func (s *RoSnapshots) BlocksAvailable() uint64  { return s.segmentsAvailable.Load() }
-func (s *RoSnapshots) IndicesReady() bool       { return s.indicesReady.Load() }
-func (s *RoSnapshots) IndicesAvailable() uint64 { return s.idxAvailable.Load() }
+func (s *RoSnapshots) Cfg() ethconfig.Snapshot   { return s.cfg }
+func (s *RoSnapshots) Dir() string               { return s.dir }
+func (s *RoSnapshots) SegmentsReady() bool       { return s.segmentsReady.Load() }
+func (s *RoSnapshots) IndicesReady() bool        { return s.indicesReady.Load() }
+func (s *RoSnapshots) IndicesAvailable() uint64  { return s.idxAvailable.Load() }
+func (s *RoSnapshots) SegmentsAvailable() uint64 { return s.segmentsAvailable.Load() }
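+// BlocksAvailable - a block is fully available only when both its segment and its
+// indices exist, hence the min of the two watermarks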
+func (s *RoSnapshots) BlocksAvailable() uint64 {
+	return min(s.segmentsAvailable.Load(), s.idxAvailable.Load())
+}
 
 func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error {
 	if s.BlocksAvailable() < cfg.ExpectBlocks {
@@ -396,18 +400,28 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) {
 	defer s.Bodies.lock.Unlock()
 	s.Txs.lock.Lock()
 	defer s.Txs.lock.Unlock()
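+	// missing .idx files are not an error here - they may not be built yet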
+Loop:
 	for _, t := range types {
 		switch t {
 		case Headers:
 			if err := s.Headers.reopen(s.dir); err != nil {
+				if errors.Is(err, os.ErrNotExist) {
+					break Loop
+				}
 				return err
 			}
 		case Bodies:
 			if err := s.Bodies.reopen(s.dir); err != nil {
+				if errors.Is(err, os.ErrNotExist) {
+					break Loop
+				}
 				return err
 			}
 		case Transactions:
 			if err := s.Txs.reopen(s.dir); err != nil {
+				if errors.Is(err, os.ErrNotExist) {
+					break Loop
+				}
 				return err
 			}
 		default:
@@ -530,19 +544,19 @@ func (s *RoSnapshots) closeSegmentsLocked() {
 	}
 }
 func (s *RoSnapshots) ViewHeaders(blockNum uint64, f func(sn *HeaderSegment) error) (found bool, err error) {
-	if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
+	if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() {
 		return false, nil
 	}
 	return s.Headers.ViewSegment(blockNum, f)
 }
 func (s *RoSnapshots) ViewBodies(blockNum uint64, f func(sn *BodySegment) error) (found bool, err error) {
-	if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
+	if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() {
 		return false, nil
 	}
 	return s.Bodies.ViewSegment(blockNum, f)
 }
 func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (found bool, err error) {
-	if !s.indicesReady.Load() || blockNum > s.segmentsAvailable.Load() {
+	if !s.indicesReady.Load() || blockNum > s.BlocksAvailable() {
 		return false, nil
 	}
 	return s.Txs.ViewSegment(blockNum, f)
@@ -701,7 +715,7 @@ func noGaps(in []FileInfo) (out []FileInfo, err error) {
 			continue
 		}
 		if f.From != prevTo { // no gaps
-			return nil, fmt.Errorf("[open snapshots] snapshot missed: from %d to %d", prevTo, f.From)
+			return nil, fmt.Errorf("snapshot missed: from %d to %d", prevTo, f.From)
 		}
 		prevTo = f.To
 		out = append(out, f)
@@ -884,6 +898,8 @@ type BlockRetire struct {
 	tmpDir    string
 	snapshots *RoSnapshots
 	db        kv.RoDB
+
+	snapshotDownloader proto_downloader.DownloaderClient
 }
 
 type BlockRetireResult struct {
@@ -891,8 +907,8 @@ type BlockRetireResult struct {
 	Err                error
 }
 
-func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB) *BlockRetire {
-	return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db}
+func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, snapshotDownloader proto_downloader.DownloaderClient) *BlockRetire {
+	return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db, snapshotDownloader: snapshotDownloader}
 }
 func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots }
 func (br *BlockRetire) Working() bool           { return br.working.Load() }
@@ -909,7 +925,16 @@ func (br *BlockRetire) CanRetire(curBlockNum uint64) (blockFrom, blockTo uint64,
 func canRetire(from, to uint64) (blockFrom, blockTo uint64, can bool) {
 	blockFrom = (from / 1_000) * 1_000
 	roundedTo1K := (to / 1_000) * 1_000
-	jump := roundedTo1K - blockFrom
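+	// cap the new segment size by the alignment of its first block: a segment of
+	// size N is allowed only when blockFrom is a multiple of N, so segments never overlap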
+	var maxJump uint64 = 1_000
+	if blockFrom%500_000 == 0 {
+		maxJump = 500_000
+	} else if blockFrom%100_000 == 0 {
+		maxJump = 100_000
+	} else if blockFrom%10_000 == 0 {
+		maxJump = 10_000
+	}
+	jump := min(maxJump, roundedTo1K-blockFrom)
 	switch { // only next segment sizes are allowed
 	case jump >= 500_000:
 		blockTo = blockFrom + 500_000
@@ -940,7 +965,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom,
 		defer br.working.Store(false)
 		defer br.wg.Done()
 
-		err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, lvl)
+		err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.snapshotDownloader, lvl)
 		br.result = &BlockRetireResult{
 			BlockFrom: blockFrom,
 			BlockTo:   blockTo,
@@ -949,7 +974,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom,
 	}()
 }
 
-func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, lvl log.Lvl) error {
+func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, snapshotDownloader proto_downloader.DownloaderClient, lvl log.Lvl) error {
 	log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
 	// in future we will do it in background
 	if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil {
@@ -974,6 +999,20 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
 		return fmt.Errorf("ReopenIndices: %w", err)
 	}
 
+	// start seeding new .seg files, but only when they are of full segment size
+	if blockTo-blockFrom == DEFAULT_SEGMENT_SIZE {
+		req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, len(AllSnapshotTypes))}
+		for _, t := range AllSnapshotTypes {
+			req.Items = append(req.Items, &proto_downloader.DownloadItem{
+				Path: SegmentFileName(blockFrom, blockTo, t),
+			})
+		}
+		if snapshotDownloader != nil {
+			if _, err := snapshotDownloader.Download(ctx, req); err != nil {
+				return err
+			}
+		}
+	}
 	return nil
 }
 
diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go
index f7d429822b..272d624885 100644
--- a/turbo/snapshotsync/block_snapshots_test.go
+++ b/turbo/snapshotsync/block_snapshots_test.go
@@ -117,6 +117,7 @@ func TestCanRetire(t *testing.T) {
 		{1_000_000, 1_120_000, 1_000_000, 1_100_000, true},
 		{2_500_000, 4_100_000, 2_500_000, 3_000_000, true},
 		{2_500_000, 2_500_100, 2_500_000, 2_500_000, false},
+		{1_001_000, 2_000_000, 1_001_000, 1_002_000, true},
 	}
 	for _, tc := range cases {
 		from, to, can := canRetire(tc.inFrom, tc.inTo)
@@ -174,6 +175,8 @@ func TestOpenAllSnapshot(t *testing.T) {
 
 	err = s.ReopenSegments()
 	require.NoError(err)
+	err = s.ReopenIndices()
+	require.NoError(err)
 	s.indicesReady.Store(true)
 	require.Equal(2, len(s.Headers.segments))
 
diff --git a/turbo/snapshotsync/snapshothashes/embed.go b/turbo/snapshotsync/snapshothashes/embed.go
index c129cf3253..79e617627d 100644
--- a/turbo/snapshotsync/snapshothashes/embed.go
+++ b/turbo/snapshotsync/snapshothashes/embed.go
@@ -38,10 +38,7 @@ var (
 )
 
 func newConfig(preverified Preverified) *Config {
-	return &Config{
-		ExpectBlocks: maxBlockNum(preverified),
-		Preverified:  preverified,
-	}
+	return &Config{ExpectBlocks: maxBlockNum(preverified), Preverified: preverified}
 }
 
 func maxBlockNum(preverified Preverified) uint64 {
diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go
index efc7113126..75b43729fa 100644
--- a/turbo/stages/mock_sentry.go
+++ b/turbo/stages/mock_sentry.go
@@ -326,7 +326,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 				blockReader,
 			),
 			stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true),
-			stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB)),
+			stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB, snapshotsDownloader)),
 			stagedsync.StageExecuteBlocksCfg(
 				mock.DB,
 				prune,
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 15c266fe38..43a25a1227 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -302,7 +302,7 @@ func NewStagedSync(
 				blockReader,
 			),
 			stagedsync.StageIssuanceCfg(db, controlServer.ChainConfig, blockReader, cfg.EnabledIssuance),
-			stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db)),
+			stagedsync.StageSendersCfg(db, controlServer.ChainConfig, tmpdir, cfg.Prune, snapshotsync.NewBlockRetire(1, tmpdir, allSnapshots, db, snapshotDownloader)),
 			stagedsync.StageExecuteBlocksCfg(
 				db,
 				cfg.Prune,
-- 
GitLab