From 4ba1f03efe0828455ecfe342c76e35f4a29da900 Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Tue, 10 May 2022 09:29:44 +0700 Subject: [PATCH] Snapshots: atomic dir, step 3 (#4103) --- cmd/downloader/downloader/downloader.go | 319 +++++++++++------- cmd/downloader/downloader/grpc_server.go | 11 +- .../downloader/torrentcfg/torrentcfg.go | 21 +- cmd/downloader/downloader/util.go | 8 +- cmd/downloader/main.go | 28 +- cmd/integration/commands/stages.go | 2 +- cmd/rpcdaemon/commands/eth_subscribe_test.go | 2 +- cmd/utils/flags.go | 11 - eth/backend.go | 17 +- eth/stagedsync/stage_headers.go | 13 +- eth/stagedsync/stage_senders_test.go | 2 +- node/node.go | 6 - turbo/app/snapshots.go | 4 +- turbo/snapshotsync/block_reader.go | 174 +++++----- turbo/snapshotsync/block_snapshots.go | 30 +- turbo/stages/mock_sentry.go | 20 +- turbo/stages/stageloop.go | 20 +- 17 files changed, 340 insertions(+), 348 deletions(-) diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go index c4511e198e..6664b6896c 100644 --- a/cmd/downloader/downloader/downloader.go +++ b/cmd/downloader/downloader/downloader.go @@ -3,27 +3,39 @@ package downloader import ( "context" "fmt" + "os" + "path/filepath" "runtime" + "strings" "sync" "time" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/metainfo" + "github.com/anacrolix/torrent/storage" common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" + "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/log/v3" + mdbx2 "github.com/torquem-ch/mdbx-go/mdbx" "golang.org/x/sync/semaphore" ) +// Downloader - component which downloading historical files. Can use BitTorrent, or other protocols type Downloader struct { - torrentClient *torrent.Client - db kv.RwDB - cfg *torrentcfg.Cfg + db kv.RwDB + pieceCompletionDB storage.PieceCompletion + torrentClient *torrent.Client + clientLock *sync.RWMutex - statsLock *sync.RWMutex - stats AggStats - snapshotDir string + cfg *torrentcfg.Cfg + + statsLock *sync.RWMutex + stats AggStats + + folder storage.ClientImplCloser } type AggStats struct { @@ -40,125 +52,50 @@ type AggStats struct { UploadRate, DownloadRate uint64 } -func New(cfg *torrentcfg.Cfg, snapshotDir string) (*Downloader, error) { +func New(cfg *torrentcfg.Cfg) (*Downloader, error) { if err := portMustBeTCPAndUDPOpen(cfg.ListenPort); err != nil { return nil, err } - peerID, err := readPeerID(cfg.DB) + // Application must never see partially-downloaded files + // To provide such consistent view - downloader does: + // add suffix _tmp to <datadir>/snapshots - then method .onComplete will remove this suffix + // and App only work with <datadir>/snapshots folder + if !common.FileExist(filepath.Join(cfg.DataDir, "db")) { + cfg.DataDir += "_tmp" + } + db, c, m, torrentClient, err := openClient(cfg.ClientConfig) if err != nil { - return nil, fmt.Errorf("get peer id: %w", err) + return nil, fmt.Errorf("openClient: %w", err) } - cfg.PeerID = string(peerID) - torrentClient, err := torrent.NewClient(cfg.ClientConfig) + + peerID, err := readPeerID(db) if err != nil { - return nil, fmt.Errorf("fail to start torrent client: %w", err) + return nil, fmt.Errorf("get peer id: %w", err) } + cfg.PeerID = string(peerID) if len(peerID) == 0 { - if err = savePeerID(cfg.DB, torrentClient.PeerID()); err != nil { + if err = savePeerID(db, torrentClient.PeerID()); err != nil { return nil, fmt.Errorf("save peer id: %w", err) } } return &Downloader{ - cfg: cfg, - torrentClient: torrentClient, - db: cfg.DB, - statsLock: &sync.RWMutex{}, - snapshotDir: snapshotDir, + cfg: cfg, + db: db, + pieceCompletionDB: c, + folder: m, + torrentClient: torrentClient, + clientLock: &sync.RWMutex{}, + + statsLock: &sync.RWMutex{}, }, nil } -func (d *Downloader) Start(ctx context.Context, silent bool) error { - if err := BuildTorrentsAndAdd(ctx, d.snapshotDir, d.torrentClient); err != nil { - return fmt.Errorf("BuildTorrentsAndAdd: %w", err) - } - - var sem = semaphore.NewWeighted(int64(d.cfg.DownloadSlots)) - - go func() { - for { - torrents := d.Torrent().Torrents() - for _, t := range torrents { - <-t.GotInfo() - if t.Complete.Bool() { - continue - } - if err := sem.Acquire(ctx, 1); err != nil { - return - } - t.AllowDataDownload() - t.DownloadAll() - go func(t *torrent.Torrent) { - //r := t.NewReader() - //r.SetReadahead(t.Length()) - //_, _ = io.Copy(io.Discard, r) // enable streaming - it will prioritize sequential download - - <-t.Complete.On() - sem.Release(1) - }(t) - } - time.Sleep(30 * time.Second) - } - }() - - go func() { - var m runtime.MemStats - logEvery := time.NewTicker(20 * time.Second) - defer logEvery.Stop() - - interval := 10 * time.Second - statEvery := time.NewTicker(interval) - defer statEvery.Stop() - for { - select { - case <-ctx.Done(): - return - case <-statEvery.C: - d.ReCalcStats(interval) - - case <-logEvery.C: - if silent { - continue - } - - stats := d.Stats() - - if stats.MetadataReady < stats.FilesTotal { - log.Info(fmt.Sprintf("[Snapshots] Waiting for torrents metadata: %d/%d", stats.MetadataReady, stats.FilesTotal)) - continue - } - - runtime.ReadMemStats(&m) - if stats.Completed { - log.Info("[Snapshots] Seeding", - "up", common2.ByteCount(stats.UploadRate)+"/s", - "peers", stats.PeersUnique, - "connections", stats.ConnectionsTotal, - "files", stats.FilesTotal, - "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) - continue - } - - log.Info("[Snapshots] Downloading", - "progress", fmt.Sprintf("%.2f%% %s/%s", stats.Progress, common2.ByteCount(stats.BytesCompleted), common2.ByteCount(stats.BytesTotal)), - "download", common2.ByteCount(stats.DownloadRate)+"/s", - "upload", common2.ByteCount(stats.UploadRate)+"/s", - "peers", stats.PeersUnique, - "connections", stats.ConnectionsTotal, - "files", stats.FilesTotal, - "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) - if stats.PeersUnique == 0 { - ips := d.Torrent().BadPeerIPs() - if len(ips) > 0 { - log.Info("[Snapshots] Stats", "banned", ips) - } - } - } - } - }() - - return nil +func (d *Downloader) SnapshotsDir() string { + d.clientLock.RLock() + defer d.clientLock.RUnlock() + return d.cfg.DataDir } func (d *Downloader) ReCalcStats(interval time.Duration) { @@ -205,9 +142,47 @@ func (d *Downloader) ReCalcStats(interval time.Duration) { stats.PeersUnique = int32(len(peers)) stats.FilesTotal = int32(len(torrents)) + if prevStats.Completed == false && stats.Completed == true { + d.onComplete() + } + d.stats = stats } +// onComplete - only once - after download of all files fully done: +// - closing torrent client, closing downloader db +// - removing _tmp suffix from snapshotDir +// - open new torrentClient and db +func (d *Downloader) onComplete() { + if !strings.HasSuffix(d.cfg.DataDir, "_tmp") { + return + } + + d.clientLock.Lock() + defer d.clientLock.Unlock() + + d.torrentClient.Close() + d.folder.Close() + d.pieceCompletionDB.Close() + d.db.Close() + + // rename _tmp folder + snapshotDir := strings.TrimSuffix(d.cfg.DataDir, "_tmp") + if err := os.Rename(d.cfg.DataDir, snapshotDir); err != nil { + panic(err) + } + d.cfg.DataDir = snapshotDir + + db, c, m, torrentClient, err := openClient(d.cfg.ClientConfig) + if err != nil { + panic(err) + } + d.db = db + d.pieceCompletionDB = c + d.folder = m + d.torrentClient = torrentClient +} + func (d *Downloader) Stats() AggStats { d.statsLock.RLock() defer d.statsLock.RUnlock() @@ -216,12 +191,13 @@ func (d *Downloader) Stats() AggStats { func (d *Downloader) Close() { d.torrentClient.Close() - d.db.Close() - if d.cfg.CompletionCloser != nil { - if err := d.cfg.CompletionCloser.Close(); err != nil { - log.Warn("[Snapshots] CompletionCloser", "err", err) - } + if err := d.folder.Close(); err != nil { + log.Warn("[Snapshots] folder.close", "err", err) + } + if err := d.pieceCompletionDB.Close(); err != nil { + log.Warn("[Snapshots] pieceCompletionDB.close", "err", err) } + d.db.Close() } func (d *Downloader) PeerID() []byte { @@ -241,5 +217,122 @@ func (d *Downloader) StopSeeding(hash metainfo.Hash) error { } func (d *Downloader) Torrent() *torrent.Client { + d.clientLock.RLock() + defer d.clientLock.RUnlock() return d.torrentClient } + +func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) { + snapshotDir := cfg.DataDir + db, err = mdbx.NewMDBX(log.New()). + Flags(func(f uint) uint { return f | mdbx2.SafeNoSync }). + Label(kv.DownloaderDB). + WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }). + SyncPeriod(15 * time.Second). + Path(filepath.Join(snapshotDir, "db")). + Open() + if err != nil { + return nil, nil, nil, nil, err + } + c, err = torrentcfg.NewMdbxPieceCompletion(db) + if err != nil { + return nil, nil, nil, nil, err + } + m = storage.NewMMapWithCompletion(snapshotDir, c) + torrentClient, err = torrent.NewClient(cfg) + if err != nil { + return nil, nil, nil, nil, err + } + + if err := BuildTorrentsAndAdd(context.Background(), snapshotDir, torrentClient); err != nil { + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("BuildTorrentsAndAdd: %w", err) + } + } + + return db, c, m, torrentClient, nil +} + +func MainLoop(ctx context.Context, d *Downloader, silent bool) { + var sem = semaphore.NewWeighted(int64(d.cfg.DownloadSlots)) + + go func() { + for { + torrents := d.Torrent().Torrents() + for _, t := range torrents { + <-t.GotInfo() + if t.Complete.Bool() { + continue + } + if err := sem.Acquire(ctx, 1); err != nil { + return + } + t.AllowDataDownload() + t.DownloadAll() + go func(t *torrent.Torrent) { + //r := t.NewReader() + //r.SetReadahead(t.Length()) + //_, _ = io.Copy(io.Discard, r) // enable streaming - it will prioritize sequential download + + <-t.Complete.On() + sem.Release(1) + }(t) + } + time.Sleep(30 * time.Second) + } + }() + + var m runtime.MemStats + logEvery := time.NewTicker(20 * time.Second) + defer logEvery.Stop() + + interval := 10 * time.Second + statEvery := time.NewTicker(interval) + defer statEvery.Stop() + for { + select { + case <-ctx.Done(): + return + case <-statEvery.C: + d.ReCalcStats(interval) + + case <-logEvery.C: + if silent { + continue + } + + stats := d.Stats() + + if stats.MetadataReady < stats.FilesTotal { + log.Info(fmt.Sprintf("[Snapshots] Waiting for torrents metadata: %d/%d", stats.MetadataReady, stats.FilesTotal)) + continue + } + + runtime.ReadMemStats(&m) + if stats.Completed { + log.Info("[Snapshots] Seeding", + "up", common2.ByteCount(stats.UploadRate)+"/s", + "peers", stats.PeersUnique, + "connections", stats.ConnectionsTotal, + "files", stats.FilesTotal, + "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) + continue + } + + log.Info("[Snapshots] Downloading", + "progress", fmt.Sprintf("%.2f%% %s/%s", stats.Progress, common2.ByteCount(stats.BytesCompleted), common2.ByteCount(stats.BytesTotal)), + "download", common2.ByteCount(stats.DownloadRate)+"/s", + "upload", common2.ByteCount(stats.UploadRate)+"/s", + "peers", stats.PeersUnique, + "connections", stats.ConnectionsTotal, + "files", stats.FilesTotal, + "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) + if stats.PeersUnique == 0 { + ips := d.Torrent().BadPeerIPs() + if len(ips) > 0 { + log.Info("[Snapshots] Stats", "banned", ips) + } + } + } + } +} diff --git a/cmd/downloader/downloader/grpc_server.go b/cmd/downloader/downloader/grpc_server.go index 14a4326fb9..f7b27441b6 100644 --- a/cmd/downloader/downloader/grpc_server.go +++ b/cmd/downloader/downloader/grpc_server.go @@ -15,14 +15,13 @@ var ( _ proto_downloader.DownloaderServer = &GrpcServer{} ) -func NewGrpcServer(d *Downloader, snapshotDir string) (*GrpcServer, error) { - return &GrpcServer{d: d, snapshotDir: snapshotDir}, nil +func NewGrpcServer(d *Downloader) (*GrpcServer, error) { + return &GrpcServer{d: d}, nil } type GrpcServer struct { proto_downloader.UnimplementedDownloaderServer - d *Downloader - snapshotDir string + d *Downloader } func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) { @@ -30,7 +29,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow mi := &metainfo.MetaInfo{AnnounceList: Trackers} for _, it := range request.Items { if it.TorrentHash == nil { - err := BuildTorrentAndAdd(ctx, it.Path, s.snapshotDir, torrentClient) + err := BuildTorrentAndAdd(ctx, it.Path, s.d.SnapshotsDir(), torrentClient) if err != nil { return nil, err } @@ -53,7 +52,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow t.AllowDataUpload() <-t.GotInfo() mi := t.Metainfo() - if err := CreateTorrentFileIfNotExists(s.snapshotDir, t.Info(), &mi); err != nil { + if err := CreateTorrentFileIfNotExists(s.d.SnapshotsDir(), t.Info(), &mi); err != nil { log.Warn("[downloader] create torrent file", "err", err) return } diff --git a/cmd/downloader/downloader/torrentcfg/torrentcfg.go b/cmd/downloader/downloader/torrentcfg/torrentcfg.go index f4b9f21343..81eb01f0ad 100644 --- a/cmd/downloader/downloader/torrentcfg/torrentcfg.go +++ b/cmd/downloader/downloader/torrentcfg/torrentcfg.go @@ -1,15 +1,11 @@ package torrentcfg import ( - "fmt" - "io" "time" lg "github.com/anacrolix/log" "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/storage" "github.com/c2h5oh/datasize" - "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/p2p/nat" "github.com/ledgerwatch/log/v3" "golang.org/x/time/rate" @@ -27,9 +23,9 @@ const DefaultNetworkChunkSize = DefaultPieceSize type Cfg struct { *torrent.ClientConfig - DB kv.RwDB - CompletionCloser io.Closer - DownloadSlots int + //DB kv.RwDB + //CompletionCloser io.Closer + DownloadSlots int } func Default() *torrent.ClientConfig { @@ -49,7 +45,7 @@ func Default() *torrent.ClientConfig { return torrentConfig } -func New(snapshotsDir string, verbosity lg.Level, natif nat.Interface, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile int, db kv.RwDB, downloadSlots int) (*Cfg, error) { +func New(snapshotsDir string, verbosity lg.Level, natif nat.Interface, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile int, downloadSlots int) (*Cfg, error) { torrentConfig := Default() // We would-like to reduce amount of goroutines in Erigon, so reducing next params torrentConfig.EstablishedConnsPerTorrent = connsPerFile // default: 50 @@ -102,12 +98,5 @@ func New(snapshotsDir string, verbosity lg.Level, natif nat.Interface, downloadR torrentConfig.Logger = lg.Default.FilterLevel(verbosity) torrentConfig.Logger.Handlers = []lg.Handler{adapterHandler{}} - c, err := NewMdbxPieceCompletion(db) - if err != nil { - return nil, fmt.Errorf("NewBoltPieceCompletion: %w", err) - } - m := storage.NewMMapWithCompletion(snapshotsDir, c) - //m := storage.NewFileWithCompletion(snapshotsDir, c) - torrentConfig.DefaultStorage = m - return &Cfg{ClientConfig: torrentConfig, DB: db, CompletionCloser: m, DownloadSlots: downloadSlots}, nil + return &Cfg{ClientConfig: torrentConfig, DownloadSlots: downloadSlots}, nil } diff --git a/cmd/downloader/downloader/util.go b/cmd/downloader/downloader/util.go index de06d1beaf..91c696928c 100644 --- a/cmd/downloader/downloader/util.go +++ b/cmd/downloader/downloader/util.go @@ -331,7 +331,13 @@ func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient * if err != nil { return nil, err } - ts.ChunkSize = torrentcfg.DefaultNetworkChunkSize + + if _, ok := torrentClient.Torrent(ts.InfoHash); !ok { // can set ChunkSize only for new torrents + ts.ChunkSize = torrentcfg.DefaultNetworkChunkSize + } else { + ts.ChunkSize = 0 + } + t, _, err := torrentClient.AddTorrentSpec(ts) if err != nil { return nil, err diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index e299fc78ac..fa9c7a6ba9 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -14,8 +14,6 @@ import ( grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "github.com/ledgerwatch/erigon-lib/common" proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/cmd/downloader/downloader" "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" "github.com/ledgerwatch/erigon/cmd/utils" @@ -25,7 +23,6 @@ import ( "github.com/ledgerwatch/log/v3" "github.com/pelletier/go-toml/v2" "github.com/spf13/cobra" - mdbx2 "github.com/torquem-ch/mdbx-go/mdbx" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" @@ -135,36 +132,20 @@ func Downloader(ctx context.Context) error { return fmt.Errorf("invalid nat option %s: %w", natSetting, err) } - db, err := mdbx.NewMDBX(log.New()). - Flags(func(f uint) uint { return f | mdbx2.SafeNoSync }). - Label(kv.DownloaderDB). - WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.DownloaderTablesCfg - }). - SyncPeriod(15 * time.Second). - Path(filepath.Join(snapshotDir, "db")). - Open() + cfg, err := torrentcfg.New(snapshotDir, torrentLogLevel, natif, downloadRate, uploadRate, torrentPort, torrentConnsPerFile, torrentDownloadSlots) if err != nil { return err } - cfg, err := torrentcfg.New(snapshotDir, torrentLogLevel, natif, downloadRate, uploadRate, torrentPort, torrentConnsPerFile, db, torrentDownloadSlots) - if err != nil { - return err - } - defer cfg.CompletionCloser.Close() - - d, err := downloader.New(cfg, snapshotDir) + d, err := downloader.New(cfg) if err != nil { return err } defer d.Close() log.Info("[torrent] Start", "my peerID", fmt.Sprintf("%x", d.Torrent().PeerID())) - if err := d.Start(ctx, false); err != nil { - return err - } + go downloader.MainLoop(ctx, d, false) - bittorrentServer, err := downloader.NewGrpcServer(d, snapshotDir) + bittorrentServer, err := downloader.NewGrpcServer(d) if err != nil { return fmt.Errorf("new server: %w", err) } @@ -192,7 +173,6 @@ var printTorrentHashes = &cobra.Command{ if forceRebuild { // remove and create .torrent files (will re-read all snapshots) removePieceCompletionStorage(snapshotDir) - files, err := downloader.AllTorrentPaths(snapshotDir) if err != nil { return err diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index b70dca35e3..fa0f84df82 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -611,7 +611,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error { if workers < 1 { workers = 1 } - br = snapshotsync.NewBlockRetire(workers, tmpdir, snapshots, snapshots.Dir(), db, nil, nil) + br = snapshotsync.NewBlockRetire(workers, tmpdir, snapshots, db, nil, nil) } pm, err := prune.Get(tx) diff --git a/cmd/rpcdaemon/commands/eth_subscribe_test.go b/cmd/rpcdaemon/commands/eth_subscribe_test.go index d0c06cd560..7527216c3c 100644 --- a/cmd/rpcdaemon/commands/eth_subscribe_test.go +++ b/cmd/rpcdaemon/commands/eth_subscribe_test.go @@ -20,7 +20,7 @@ import ( func TestEthSubscribe(t *testing.T) { m, require := stages.Mock(t), require.New(t) - chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 42, func(i int, b *core.BlockGen) { + chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 21, func(i int, b *core.BlockGen) { b.SetCoinbase(common.Address{1}) }, false /* intermediateHashes */) require.NoError(err) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 488c53aa0d..633fe9993e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -28,13 +28,11 @@ import ( "strings" "text/tabwriter" "text/template" - "time" lg "github.com/anacrolix/log" "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/kvcache" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/txpool" "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" "github.com/ledgerwatch/log/v3" @@ -59,7 +57,6 @@ import ( "github.com/ledgerwatch/erigon/p2p/nat" "github.com/ledgerwatch/erigon/p2p/netutil" "github.com/ledgerwatch/erigon/params" - mdbx2 "github.com/torquem-ch/mdbx-go/mdbx" ) func init() { @@ -1392,13 +1389,6 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf panic(err) } - db := mdbx.NewMDBX(log.New()). - Flags(func(f uint) uint { return f | mdbx2.SafeNoSync }). - Label(kv.DownloaderDB). - WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.DownloaderTablesCfg }). - SyncPeriod(15 * time.Second). - Path(filepath.Join(cfg.SnapshotDir, "db")). - MustOpen() var err error cfg.Torrent, err = torrentcfg.New(cfg.SnapshotDir, torrentcfg.String2LogLevel[ctx.GlobalString(TorrentVerbosityFlag.Name)], @@ -1406,7 +1396,6 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf downloadRate, uploadRate, ctx.GlobalInt(TorrentPortFlag.Name), ctx.GlobalInt(TorrentConnsPerFileFlag.Name), - db, ctx.GlobalInt(TorrentDownloadSlotsFlag.Name), ) if err != nil { diff --git a/eth/backend.go b/eth/backend.go index 17cdb2120c..5a1b7ff960 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -139,7 +139,7 @@ type Ethereum struct { txPool2GrpcServer txpool_proto.TxpoolServer notifyMiningAboutNewTxs chan struct{} - downloadProtocols *downloader.Downloader + downloader *downloader.Downloader } // New creates a new Ethereum object (including the @@ -284,15 +284,12 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l backend.downloaderClient, err = downloadergrpc.NewClient(ctx, stack.Config().DownloaderAddr) } else { // start embedded Downloader - backend.downloadProtocols, err = downloader.New(config.Torrent, config.SnapshotDir) + backend.downloader, err = downloader.New(config.Torrent) if err != nil { return nil, err } - if err := backend.downloadProtocols.Start(ctx, true); err != nil { - return nil, fmt.Errorf("downloadProtocols start: %w", err) - } - - bittorrentServer, err := downloader.NewGrpcServer(backend.downloadProtocols, config.SnapshotDir) + go downloader.MainLoop(ctx, backend.downloader, true) + bittorrentServer, err := downloader.NewGrpcServer(backend.downloader) if err != nil { return nil, fmt.Errorf("new server: %w", err) } @@ -439,7 +436,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l creds, stack.Config().HealthCheck) if err != nil { - return nil, err + return nil, fmt.Errorf("private api: %w", err) } } @@ -822,8 +819,8 @@ func (s *Ethereum) Start() error { func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.sentryCancel() - if s.downloadProtocols != nil { - s.downloadProtocols.Close() + if s.downloader != nil { + s.downloader.Close() } if s.privateAPI != nil { shutdownDone := make(chan bool) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 2df6b7020a..ba3a9ffcdd 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -48,10 +48,8 @@ type HeadersCfg struct { batchSize datasize.ByteSize noP2PDiscovery bool tmpdir string - snapshotDir string snapshots *snapshotsync.RoSnapshots - snapshotHashesCfg *snapshothashes.Config snapshotDownloader proto_downloader.DownloaderClient blockReader interfaces.FullBlockReader dbEventNotifier snapshotsync.DBEventNotifier @@ -71,9 +69,7 @@ func StageHeadersCfg( snapshotDownloader proto_downloader.DownloaderClient, blockReader interfaces.FullBlockReader, tmpdir string, - snapshotDir string, - dbEventNotifier snapshotsync.DBEventNotifier, -) HeadersCfg { + dbEventNotifier snapshotsync.DBEventNotifier) HeadersCfg { return HeadersCfg{ db: db, hd: headerDownload, @@ -88,8 +84,6 @@ func StageHeadersCfg( snapshots: snapshots, snapshotDownloader: snapshotDownloader, blockReader: blockReader, - snapshotHashesCfg: snapshothashes.KnownConfig(chainConfig.ChainName), - snapshotDir: snapshotDir, dbEventNotifier: dbEventNotifier, } } @@ -1069,7 +1063,8 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if err := cfg.snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenSegments: %w", err) } - expect := cfg.snapshotHashesCfg.ExpectBlocks + + expect := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).ExpectBlocks if cfg.snapshots.SegmentsAvailable() < expect { c, err := tx.Cursor(kv.Headers) if err != nil { @@ -1108,7 +1103,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if workers > 2 { workers = 2 // 4 workers get killed on 16Gb RAM } - if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), workers, log.LvlInfo); err != nil { + if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), workers, log.LvlInfo); err != nil { return err } } diff --git a/eth/stagedsync/stage_senders_test.go b/eth/stagedsync/stage_senders_test.go index b592c3a18c..c6384bc871 100644 --- a/eth/stagedsync/stage_senders_test.go +++ b/eth/stagedsync/stage_senders_test.go @@ -109,7 +109,7 @@ func TestSenders(t *testing.T) { require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3)) - cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, "", db, nil, nil)) + cfg := StageSendersCfg(db, params.TestChainConfig, "", prune.Mode{}, snapshotsync.NewBlockRetire(1, "", nil, db, nil, nil)) err := SpawnRecoverSendersStage(cfg, &StageState{ID: stages.Senders}, nil, tx, 3, ctx) assert.NoError(t, err) diff --git a/node/node.go b/node/node.go index 465e11fce0..97cd9ed3d8 100644 --- a/node/node.go +++ b/node/node.go @@ -472,16 +472,10 @@ func (n *Node) Server() *p2p.Server { } // DataDir retrieves the current datadir used by the protocol stack. -// Deprecated: No files should be stored in this directory, use InstanceDir instead. func (n *Node) DataDir() string { return n.config.DataDir } -// InstanceDir retrieves the instance directory used by the protocol stack. -func (n *Node) InstanceDir() string { - return n.config.DataDir -} - // HTTPEndpoint returns the URL of the HTTP server. Note that this URL does not // contain the JSON-RPC path prefix set by HTTPPathPrefix. func (n *Node) HTTPEndpoint() string { diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index e5665c369f..b09a363248 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -256,7 +256,7 @@ func doRetireCommand(cliCtx *cli.Context) error { if workers < 1 { workers = 1 } - br := snapshotsync.NewBlockRetire(workers, tmpDir, snapshots, snapshotDir, chainDB, nil, nil) + br := snapshotsync.NewBlockRetire(workers, tmpDir, snapshots, chainDB, nil, nil) for i := from; i < to; i += every { br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo) @@ -300,7 +300,7 @@ func rebuildIndices(ctx context.Context, chainDB kv.RoDB, cfg ethconfig.Snapshot if err := allSnapshots.Reopen(); err != nil { return err } - if err := snapshotsync.BuildIndices(ctx, allSnapshots, snapshotDir, *chainID, tmpDir, from, workers, log.LvlInfo); err != nil { + if err := snapshotsync.BuildIndices(ctx, allSnapshots, *chainID, tmpDir, from, workers, log.LvlInfo); err != nil { return err } return nil diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index c5f8bf1a00..25f1c8354d 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -187,19 +187,10 @@ type BlockReaderWithSnapshots struct { func NewBlockReaderWithSnapshots(snapshots *RoSnapshots) *BlockReaderWithSnapshots { return &BlockReaderWithSnapshots{sn: snapshots} } + func (back *BlockReaderWithSnapshots) HeaderByNumber(ctx context.Context, tx kv.Getter, blockHeight uint64) (h *types.Header, err error) { ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { - if segment.idxHeaderHash == nil { - fmt.Printf("why? %d, %d, %d, %d, %d\n", blockHeight, segment.From, segment.To, back.sn.segmentsAvailable.Load(), back.sn.idxAvailable.Load()) - back.sn.PrintDebug() - for _, sn := range back.sn.Headers.segments { - if sn.idxHeaderHash == nil { - fmt.Printf("seg with nil idx: %d,%d\n", segment.From, segment.To) - } - } - fmt.Printf("==== end debug print ====\n") - } - h, err = back.headerFromSnapshot(blockHeight, segment, nil) + h, _, err = back.headerFromSnapshot(blockHeight, segment, nil) if err != nil { return err } @@ -245,7 +236,7 @@ func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Ge func (back *BlockReaderWithSnapshots) CanonicalHash(ctx context.Context, tx kv.Getter, blockHeight uint64) (h common.Hash, err error) { ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { - header, err := back.headerFromSnapshot(blockHeight, segment, nil) + header, _, err := back.headerFromSnapshot(blockHeight, segment, nil) if err != nil { return err } @@ -267,7 +258,7 @@ func (back *BlockReaderWithSnapshots) CanonicalHash(ctx context.Context, tx kv.G func (back *BlockReaderWithSnapshots) Header(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) { ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { - h, err = back.headerFromSnapshot(blockHeight, segment, nil) + h, _, err = back.headerFromSnapshot(blockHeight, segment, nil) if err != nil { return err } @@ -283,7 +274,7 @@ func (back *BlockReaderWithSnapshots) Header(ctx context.Context, tx kv.Getter, func (back *BlockReaderWithSnapshots) ReadHeaderByNumber(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (h *types.Header, err error) { ok, err := back.sn.ViewHeaders(blockHeight, func(segment *HeaderSegment) error { - h, err = back.headerFromSnapshot(blockHeight, segment, nil) + h, _, err = back.headerFromSnapshot(blockHeight, segment, nil) if err != nil { return err } @@ -303,8 +294,9 @@ func (back *BlockReaderWithSnapshots) ReadHeaderByNumber(ctx context.Context, tx func (back *BlockReaderWithSnapshots) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) { var baseTxnID uint64 var txsAmount uint32 + var buf []byte ok, err := back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error { - body, baseTxnID, txsAmount, err = back.bodyFromSnapshot(blockHeight, seg, nil) + body, baseTxnID, txsAmount, buf, err = back.bodyFromSnapshot(blockHeight, seg, buf) if err != nil { return err } @@ -315,10 +307,13 @@ func (back *BlockReaderWithSnapshots) BodyWithTransactions(ctx context.Context, } if ok { ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error { - txs, senders, err := back.txsFromSnapshot(baseTxnID, txsAmount, seg, nil) + txs, senders, err := back.txsFromSnapshot(baseTxnID, txsAmount, seg, buf) if err != nil { return err } + if txs == nil { + return nil + } body.Transactions = txs body.SendersToTxs(senders) return nil @@ -352,7 +347,7 @@ func (back *BlockReaderWithSnapshots) BodyRlp(ctx context.Context, tx kv.Getter, func (back *BlockReaderWithSnapshots) Body(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) { ok, err := back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error { - body, _, _, err = back.bodyFromSnapshot(blockHeight, seg, nil) + body, _, _, _, err = back.bodyFromSnapshot(blockHeight, seg, nil) if err != nil { return err } @@ -372,15 +367,8 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k var buf []byte var h *types.Header ok, err := back.sn.ViewHeaders(blockHeight, func(seg *HeaderSegment) error { - headerOffset := seg.idxHeaderHash.Lookup2(blockHeight - seg.idxHeaderHash.BaseDataID()) - gg := seg.seg.MakeGetter() - gg.Reset(headerOffset) - buf, _ = gg.Next(buf[:0]) - if len(buf) == 0 { - return nil - } - h = &types.Header{} - if err = rlp.DecodeBytes(buf[1:], h); err != nil { + h, buf, err = back.headerFromSnapshot(blockHeight, seg, buf) + if err != nil { return err } return nil @@ -388,19 +376,13 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k if err != nil { return } - if ok { - var b *types.BodyForStorage + if ok && h != nil { + var b *types.Body + var baseTxnId uint64 + var txsAmount uint32 ok, err = back.sn.ViewBodies(blockHeight, func(seg *BodySegment) error { - bodyOffset := seg.idxBodyNumber.Lookup2(blockHeight - seg.idxBodyNumber.BaseDataID()) - gg := seg.seg.MakeGetter() - gg.Reset(bodyOffset) - buf, _ = gg.Next(buf[:0]) - if len(buf) == 0 { - return nil - } - b = &types.BodyForStorage{} - reader := bytes.NewReader(buf) - if err = rlp.Decode(reader, b); err != nil { + b, baseTxnId, txsAmount, buf, err = back.bodyFromSnapshot(blockHeight, seg, buf) + if err != nil { return err } return nil @@ -408,8 +390,8 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k if err != nil { return } - if ok { - if b.TxAmount <= 2 { + if ok && b != nil { + if txsAmount == 0 { block = types.NewBlockFromStorage(hash, h, nil, b.Uncles) if len(senders) != block.Transactions().Len() { return block, senders, nil // no senders is fine - will recover them on the fly @@ -417,33 +399,12 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k block.SendersToTxs(senders) return block, senders, nil } - reader := bytes.NewReader(nil) - txs := make([]types.Transaction, b.TxAmount-2) - senders = make([]common.Address, b.TxAmount-2) + var txs []types.Transaction + var senders []common.Address ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error { - if b.BaseTxId < seg.IdxTxnHash.BaseDataID() { - return fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, seg.IdxTxnHash.BaseDataID(), seg.Seg.FilePath()) - } - - txnOffset := seg.IdxTxnHash.Lookup2(b.BaseTxId - seg.IdxTxnHash.BaseDataID()) - gg := seg.Seg.MakeGetter() - gg.Reset(txnOffset) - stream := rlp.NewStream(reader, 0) - buf, _ = gg.Next(buf[:0]) //first system-tx - for i := uint32(0); i < b.TxAmount-2; i++ { - buf, _ = gg.Next(buf[:0]) - if len(buf) < 1+20 { - return fmt.Errorf("segment %s has too short record: len(buf)=%d < 21", seg.Seg.FilePath(), len(buf)) - } - senders[i].SetBytes(buf[1 : 1+20]) - txRlp := buf[1+20:] - reader.Reset(txRlp) - stream.Reset(reader, 0) - txs[i], err = types.DecodeTransaction(stream) - if err != nil { - return err - } - txs[i].SetSender(senders[i]) + txs, senders, err = back.txsFromSnapshot(baseTxnId, txsAmount, seg, buf) + if err != nil { + return err } return nil }) @@ -474,19 +435,22 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight) } -func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn *HeaderSegment, buf []byte) (*types.Header, error) { +func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn *HeaderSegment, buf []byte) (*types.Header, []byte, error) { + if sn.idxHeaderHash == nil { + return nil, buf, nil + } headerOffset := sn.idxHeaderHash.Lookup2(blockHeight - sn.idxHeaderHash.BaseDataID()) gg := sn.seg.MakeGetter() gg.Reset(headerOffset) buf, _ = gg.Next(buf[:0]) if len(buf) == 0 { - return nil, nil + return nil, buf, nil } h := &types.Header{} if err := rlp.DecodeBytes(buf[1:], h); err != nil { - return nil, err + return nil, buf, err } - return h, nil + return h, buf, nil } // headerFromSnapshotByHash - getting header by hash AND ensure that it has correct hash @@ -494,6 +458,9 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn // but because our indices are based on PerfectHashMap, no way to know is given key exists or not, only way - // to make sure is to fetch it and compare hash func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, sn *HeaderSegment, buf []byte) (*types.Header, error) { + if sn.idxHeaderHash == nil { + return nil, nil + } reader := recsplit.NewIndexReader(sn.idxHeaderHash) localID := reader.Lookup(hash[:]) headerOffset := sn.idxHeaderHash.Lookup2(localID) @@ -514,58 +481,75 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, return h, nil } -func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.Body, uint64, uint32, error) { +func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BodySegment, buf []byte) (*types.Body, uint64, uint32, []byte, error) { + if sn.idxBodyNumber == nil { + return nil, 0, 0, buf, nil + } bodyOffset := sn.idxBodyNumber.Lookup2(blockHeight - sn.idxBodyNumber.BaseDataID()) gg := sn.seg.MakeGetter() gg.Reset(bodyOffset) buf, _ = gg.Next(buf[:0]) if len(buf) == 0 { - return nil, 0, 0, nil + return nil, 0, 0, buf, nil } b := &types.BodyForStorage{} reader := bytes.NewReader(buf) if err := rlp.Decode(reader, b); err != nil { - return nil, 0, 0, err + return nil, 0, 0, buf, err } if b.BaseTxId < sn.idxBodyNumber.BaseDataID() { - return nil, 0, 0, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.idxBodyNumber.BaseDataID(), sn.seg.FilePath()) + return nil, 0, 0, buf, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.idxBodyNumber.BaseDataID(), sn.seg.FilePath()) } body := new(types.Body) body.Uncles = b.Uncles - return body, b.BaseTxId + 1, b.TxAmount - 2, nil // empty txs in the beginning and end of block + var txsAmount uint32 + if b.TxAmount >= 2 { + txsAmount = b.TxAmount - 2 + } + return body, b.BaseTxId + 1, txsAmount, buf, nil // empty txs in the beginning and end of block } -func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) ([]types.Transaction, []common.Address, error) { - txs := make([]types.Transaction, txsAmount) - senders := make([]common.Address, txsAmount) +func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmount uint32, txsSeg *TxnSegment, buf []byte) (txs []types.Transaction, senders []common.Address, err error) { + if txsSeg.IdxTxnHash == nil { + return nil, nil, nil + } + if baseTxnID < txsSeg.IdxTxnHash.BaseDataID() { + return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", baseTxnID, txsSeg.IdxTxnHash.BaseDataID(), txsSeg.Seg.FilePath()) + } + + txs = make([]types.Transaction, txsAmount) + senders = make([]common.Address, txsAmount) reader := bytes.NewReader(buf) - if txsAmount > 0 { - txnOffset := txsSeg.IdxTxnHash.Lookup2(baseTxnID - txsSeg.IdxTxnHash.BaseDataID()) - gg := txsSeg.Seg.MakeGetter() - gg.Reset(txnOffset) - stream := rlp.NewStream(reader, 0) - for i := uint32(0); i < txsAmount; i++ { - buf, _ = gg.Next(buf[:0]) - senders[i].SetBytes(buf[1 : 1+20]) - txRlp := buf[1+20:] - reader.Reset(txRlp) - stream.Reset(reader, 0) - var err error - txs[i], err = types.DecodeTransaction(stream) - if err != nil { - return nil, nil, err - } + if txsAmount == 0 { + return txs, senders, nil + } + txnOffset := txsSeg.IdxTxnHash.Lookup2(baseTxnID - txsSeg.IdxTxnHash.BaseDataID()) + gg := txsSeg.Seg.MakeGetter() + gg.Reset(txnOffset) + stream := rlp.NewStream(reader, 0) + for i := uint32(0); i < txsAmount; i++ { + buf, _ = gg.Next(buf[:0]) + if len(buf) < 1+20 { + return nil, nil, fmt.Errorf("segment %s has too short record: len(buf)=%d < 21", txsSeg.Seg.FilePath(), len(buf)) } + senders[i].SetBytes(buf[1 : 1+20]) + txRlp := buf[1+20:] + reader.Reset(txRlp) + stream.Reset(reader, 0) + txs[i], err = types.DecodeTransaction(stream) + if err != nil { + return nil, nil, err + } + txs[i].SetSender(senders[i]) } return txs, senders, nil } func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) { - for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] if sn.IdxTxnHash == nil || sn.IdxTxnHash2BlockNum == nil { diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 3bd2cb2b6c..c4b01056a5 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -297,7 +297,6 @@ func (s *RoSnapshots) SegmentsAvailable() uint64 { return s.segmentsAvailable.Lo func (s *RoSnapshots) BlocksAvailable() uint64 { return min(s.segmentsAvailable.Load(), s.idxAvailable.Load()) } - func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapshothashes.Config) error { if s.BlocksAvailable() < cfg.ExpectBlocks { return fmt.Errorf("app must wait until all expected snapshots are available. Expected: %d, Available: %d", cfg.ExpectBlocks, s.BlocksAvailable()) @@ -597,7 +596,7 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo return s.Txs.ViewSegment(blockNum, f) } -func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir string, chainID uint256.Int, tmpDir string, from uint64, workers int, lvl log.Lvl) error { +func BuildIndices(ctx context.Context, s *RoSnapshots, chainID uint256.Int, tmpDir string, from uint64, workers int, lvl log.Lvl) error { log.Log(lvl, "[snapshots] Build indices", "from", from) logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -618,7 +617,7 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir string, chain <-workersCh }() - f := filepath.Join(snapshotDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers)) + f := filepath.Join(s.Dir(), snap.SegmentFileName(blockFrom, blockTo, snap.Headers)) errs <- HeadersIdx(ctx, f, blockFrom, tmpDir, lvl) select { case <-ctx.Done(): @@ -665,7 +664,7 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir string, chain <-workersCh }() - f := filepath.Join(snapshotDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies)) + f := filepath.Join(s.Dir(), snap.SegmentFileName(blockFrom, blockTo, snap.Bodies)) errs <- BodiesIdx(ctx, f, blockFrom, tmpDir, lvl) select { case <-ctx.Done(): @@ -720,7 +719,7 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir string, chain wg.Done() <-workersCh }() - errs <- TransactionsIdx(ctx, chainID, blockFrom, blockTo, snapshotDir, tmpDir, lvl) + errs <- TransactionsIdx(ctx, chainID, blockFrom, blockTo, s.Dir(), tmpDir, lvl) select { case <-ctx.Done(): errs <- ctx.Err() @@ -846,11 +845,10 @@ type BlockRetire struct { wg *sync.WaitGroup result *BlockRetireResult - workers int - tmpDir string - snapshots *RoSnapshots - snapshotDir string - db kv.RoDB + workers int + tmpDir string + snapshots *RoSnapshots + db kv.RoDB downloader proto_downloader.DownloaderClient notifier DBEventNotifier @@ -861,8 +859,8 @@ type BlockRetireResult struct { Err error } -func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, snapshotDir string, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier) *BlockRetire { - return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, snapshotDir: snapshotDir, wg: &sync.WaitGroup{}, db: db, downloader: downloader, notifier: notifier} +func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, downloader proto_downloader.DownloaderClient, notifier DBEventNotifier) *BlockRetire { + return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, wg: &sync.WaitGroup{}, db: db, downloader: downloader, notifier: notifier} } func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots } func (br *BlockRetire) Working() bool { return br.working.Load() } @@ -919,7 +917,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, defer br.working.Store(false) defer br.wg.Done() - err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.snapshotDir, br.db, br.workers, br.downloader, lvl, br.notifier) + err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier) br.result = &BlockRetireResult{ BlockFrom: blockFrom, BlockTo: blockTo, @@ -932,7 +930,7 @@ type DBEventNotifier interface { OnNewSnapshot() } -func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, snapshotDir string, db kv.RoDB, workers int, downloader proto_downloader.DownloaderClient, lvl log.Lvl, notifier DBEventNotifier) error { +func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, db kv.RoDB, workers int, downloader proto_downloader.DownloaderClient, lvl log.Lvl, notifier DBEventNotifier) error { log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) // in future we will do it in background if err := DumpBlocks(ctx, blockFrom, blockTo, snap.DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { @@ -945,7 +943,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 if idxWorkers > 4 { idxWorkers = 4 } - if err := BuildIndices(ctx, snapshots, snapshotDir, chainID, tmpDir, snapshots.IndicesAvailable(), idxWorkers, log.LvlInfo); err != nil { + if err := BuildIndices(ctx, snapshots, chainID, tmpDir, snapshots.IndicesAvailable(), idxWorkers, log.LvlInfo); err != nil { return err } merger := NewMerger(tmpDir, workers, lvl, chainID, notifier) @@ -953,7 +951,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 if len(ranges) == 0 { return nil } - err := merger.Merge(ctx, snapshots, ranges, snapshotDir, true) + err := merger.Merge(ctx, snapshots, ranges, snapshots.Dir(), true) if err != nil { return err } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index ad665aedab..618b2320ee 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -283,23 +283,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.Sync = stagedsync.New( stagedsync.DefaultStages(mock.Ctx, prune, - stagedsync.StageHeadersCfg( - mock.DB, - mock.downloader.Hd, - mock.downloader.Bd, - *mock.ChainConfig, - sendHeaderRequest, - propagateNewBlockHashes, - penalize, - cfg.BatchSize, - false, - allSnapshots, - snapshotsDownloader, - blockReader, - mock.tmpdir, - mock.snapshotDir, - mock.Notifications.Events, - ), + stagedsync.StageHeadersCfg(mock.DB, mock.downloader.Hd, mock.downloader.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events), stagedsync.StageCumulativeIndexCfg(mock.DB), stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig), stagedsync.StageBodiesCfg( @@ -315,7 +299,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey blockReader, ), stagedsync.StageIssuanceCfg(mock.DB, mock.ChainConfig, blockReader, true), - stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, snapshotDir, mock.DB, snapshotsDownloader, mock.Notifications.Events)), + stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, mock.tmpdir, prune, snapshotsync.NewBlockRetire(1, mock.tmpdir, allSnapshots, mock.DB, snapshotsDownloader, mock.Notifications.Events)), stagedsync.StageExecuteBlocksCfg( mock.DB, prune, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index c23f63a0cc..49bd108476 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -261,7 +261,7 @@ func NewStagedSync( } else { blockReader = snapshotsync.NewBlockReader() } - blockRetire := snapshotsync.NewBlockRetire(1, tmpdir, snapshots, snapshotDir, db, snapshotDownloader, notifications.Events) + blockRetire := snapshotsync.NewBlockRetire(1, tmpdir, snapshots, db, snapshotDownloader, notifications.Events) // During Import we don't want other services like header requests, body requests etc. to be running. // Hence we run it in the test mode. @@ -269,23 +269,7 @@ func NewStagedSync( isBor := controlServer.ChainConfig.Bor != nil return stagedsync.New( stagedsync.DefaultStages(ctx, cfg.Prune, - stagedsync.StageHeadersCfg( - db, - controlServer.Hd, - controlServer.Bd, - *controlServer.ChainConfig, - controlServer.SendHeaderRequest, - controlServer.PropagateNewBlockHashes, - controlServer.Penalize, - cfg.BatchSize, - p2pCfg.NoDiscovery, - snapshots, - snapshotDownloader, - blockReader, - tmpdir, - cfg.SnapshotDir, - notifications.Events, - ), + stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, snapshots, snapshotDownloader, blockReader, tmpdir, notifications.Events), stagedsync.StageCumulativeIndexCfg(db), stagedsync.StageBlockHashesCfg(db, tmpdir, controlServer.ChainConfig), stagedsync.StageBodiesCfg( -- GitLab