From 2655d5728864e38db5b6b58d06613d5248315e28 Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Thu, 28 Apr 2022 12:13:30 +0700 Subject: [PATCH] reduce downloader deps (#4010) * reduce downloader deps * reduce downloader deps --- cmd/downloader/downloader/util.go | 10 +- cmd/integration/commands/stages.go | 11 +- eth/backend.go | 4 +- turbo/app/snapshots.go | 3 +- turbo/snapshotsync/block_snapshots.go | 308 ++++-------------- turbo/snapshotsync/block_snapshots_test.go | 55 ++-- turbo/snapshotsync/snap/files.go | 202 ++++++++++++ .../{snapshotsynccli => snap}/flags.go | 2 +- 8 files changed, 304 insertions(+), 291 deletions(-) create mode 100644 turbo/snapshotsync/snap/files.go rename turbo/snapshotsync/{snapshotsynccli => snap}/flags.go (97%) diff --git a/cmd/downloader/downloader/util.go b/cmd/downloader/downloader/util.go index e57add8279..644e57fa5d 100644 --- a/cmd/downloader/downloader/util.go +++ b/cmd/downloader/downloader/util.go @@ -21,7 +21,7 @@ import ( "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon/cmd/downloader/downloader/torrentcfg" "github.com/ledgerwatch/erigon/cmd/downloader/trackers" - "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/semaphore" ) @@ -54,7 +54,7 @@ func AllTorrentFiles(dir string) ([]string, error) { } var res []string for _, f := range files { - if !snapshotsync.IsCorrectFileName(f.Name()) { + if !snap.IsCorrectFileName(f.Name()) { continue } fileInfo, err := f.Info() @@ -78,7 +78,7 @@ func allSegmentFiles(dir string) ([]string, error) { } var res []string for _, f := range files { - if !snapshotsync.IsCorrectFileName(f.Name()) { + if !snap.IsCorrectFileName(f.Name()) { continue } fileInfo, err := f.Info() @@ -98,11 +98,11 @@ func allSegmentFiles(dir string) ([]string, error) { // BuildTorrentFileIfNeed - create .torrent files from .seg files (big IO) - if .seg 
files were added manually func BuildTorrentFileIfNeed(ctx context.Context, originalFileName string, root *dir.Rw) (ok bool, err error) { - f, err := snapshotsync.ParseFileName(root.Path, originalFileName) + f, err := snap.ParseFileName(root.Path, originalFileName) if err != nil { return false, err } - if f.To-f.From != snapshotsync.DEFAULT_SEGMENT_SIZE { + if f.To-f.From != snap.DEFAULT_SEGMENT_SIZE { return false, nil } torrentFilePath := filepath.Join(root.Path, originalFileName+".torrent") diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 8318260036..29e1400ae8 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -15,11 +15,6 @@ import ( "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshotsynccli" - "github.com/ledgerwatch/log/v3" - "github.com/ledgerwatch/secp256k1" - "github.com/spf13/cobra" - "github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces" "github.com/ledgerwatch/erigon/cmd/sentry/sentry" "github.com/ledgerwatch/erigon/common/dbutils" @@ -39,7 +34,11 @@ import ( "github.com/ledgerwatch/erigon/p2p" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" stages2 "github.com/ledgerwatch/erigon/turbo/stages" + "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/secp256k1" + "github.com/spf13/cobra" ) var cmdStageHeaders = &cobra.Command{ @@ -296,7 +295,7 @@ var cmdSetSnapshto = &cobra.Command{ snCfg = allSnapshots(chainConfig).Cfg() } if err := db.Update(context.Background(), func(tx kv.RwTx) error { - return snapshotsynccli.ForceSetFlags(tx, snCfg) + return snap.ForceSetFlags(tx, snCfg) }); err != nil { return err } diff --git a/eth/backend.go b/eth/backend.go index d2a665dc77..4f6e562862 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -79,7 
+79,7 @@ import ( "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" - "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshotsynccli" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" stages2 "github.com/ledgerwatch/erigon/turbo/stages" "github.com/ledgerwatch/log/v3" "google.golang.org/grpc" @@ -245,7 +245,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l if err != nil { return err } - if err := snapshotsynccli.EnsureNotChanged(tx, config.Snapshot); err != nil { + if err := snap.EnsureNotChanged(tx, config.Snapshot); err != nil { return err } log.Info("Effective", "prune_flags", config.Prune.String(), "snapshot_flags", config.Snapshot.String()) diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index b21aff1e52..384947150e 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -26,6 +26,7 @@ import ( "github.com/ledgerwatch/erigon/internal/debug" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/turbo/snapshotsync" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" "github.com/ledgerwatch/log/v3" "github.com/urfave/cli" ) @@ -106,7 +107,7 @@ var ( SnapshotSegmentSizeFlag = cli.Uint64Flag{ Name: "segment.size", Usage: "Amount of blocks in each segment", - Value: snapshotsync.DEFAULT_SEGMENT_SIZE, + Value: snap.DEFAULT_SEGMENT_SIZE, } SnapshotRebuildFlag = cli.BoolFlag{ Name: "rebuild", diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index fea15329ba..1e91885473 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -6,13 +6,11 @@ import ( "encoding/binary" "errors" "fmt" - "io/fs" "os" "path" "path/filepath" "runtime" "sort" - "strconv" "strings" "sync" "time" @@ -34,6 +32,7 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" 
+ "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" @@ -52,62 +51,6 @@ type BlocksSnapshot struct { From, To uint64 // [from,to) } -type Type int - -const ( - Headers Type = iota - Bodies - Transactions - NumberOfTypes -) - -func (ft Type) String() string { - switch ft { - case Headers: - return "headers" - case Bodies: - return "bodies" - case Transactions: - return "transactions" - default: - panic(fmt.Sprintf("unknown file type: %d", ft)) - } -} - -func ParseFileType(s string) (Type, bool) { - switch s { - case "headers": - return Headers, true - case "bodies": - return Bodies, true - case "transactions": - return Transactions, true - default: - return NumberOfTypes, false - } -} - -type IdxType string - -const ( - Transactions2Block IdxType = "transactions-to-block" -) - -func (it IdxType) String() string { return string(it) } - -var AllSnapshotTypes = []Type{Headers, Bodies, Transactions} - -var ( - ErrInvalidFileName = fmt.Errorf("invalid compressed file name") -) - -func FileName(from, to uint64, fileType string) string { - return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, fileType) -} -func SegmentFileName(from, to uint64, t Type) string { return FileName(from, to, t.String()) + ".seg" } -func DatFileName(from, to uint64, fType string) string { return FileName(from, to, fType) + ".dat" } -func IdxFileName(from, to uint64, fType string) string { return FileName(from, to, fType) + ".idx" } - func (s BlocksSnapshot) Has(block uint64) bool { return block >= s.From && block < s.To } type HeaderSegment struct { @@ -128,12 +71,12 @@ func (sn *HeaderSegment) close() { } func (sn *HeaderSegment) reopen(dir string) (err error) { sn.close() - fileName := SegmentFileName(sn.From, sn.To, Headers) + fileName := snap.SegmentFileName(sn.From, sn.To, snap.Headers) sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil 
{ return err } - sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Headers.String()))) + sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(dir, snap.IdxFileName(sn.From, sn.To, snap.Headers.String()))) if err != nil { return err } @@ -158,12 +101,12 @@ func (sn *BodySegment) close() { } func (sn *BodySegment) reopen(dir string) (err error) { sn.close() - fileName := SegmentFileName(sn.From, sn.To, Bodies) + fileName := snap.SegmentFileName(sn.From, sn.To, snap.Bodies) sn.seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil { return err } - sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Bodies.String()))) + sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(dir, snap.IdxFileName(sn.From, sn.To, snap.Bodies.String()))) if err != nil { return err } @@ -193,16 +136,16 @@ func (sn *TxnSegment) close() { } func (sn *TxnSegment) reopen(dir string) (err error) { sn.close() - fileName := SegmentFileName(sn.From, sn.To, Transactions) + fileName := snap.SegmentFileName(sn.From, sn.To, snap.Transactions) sn.Seg, err = compress.NewDecompressor(path.Join(dir, fileName)) if err != nil { return err } - sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Transactions.String()))) + sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(dir, snap.IdxFileName(sn.From, sn.To, snap.Transactions.String()))) if err != nil { return err } - sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(dir, IdxFileName(sn.From, sn.To, Transactions2Block.String()))) + sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(dir, snap.IdxFileName(sn.From, sn.To, snap.Transactions2Block.String()))) if err != nil { return err } @@ -394,10 +337,10 @@ func (s *RoSnapshots) idxAvailability() uint64 { } func (s *RoSnapshots) ReopenIndices() error { - return s.ReopenSomeIndices(AllSnapshotTypes...) + return s.ReopenSomeIndices(snap.AllSnapshotTypes...) 
} -func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { +func (s *RoSnapshots) ReopenSomeIndices(types ...snap.Type) (err error) { s.Headers.lock.Lock() defer s.Headers.lock.Unlock() s.Bodies.lock.Lock() @@ -407,15 +350,15 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...Type) (err error) { for _, t := range types { switch t { - case Headers: + case snap.Headers: if err := s.Headers.reopen(s.dir); err != nil { return err } - case Bodies: + case snap.Bodies: if err := s.Bodies.reopen(s.dir); err != nil { return err } - case Transactions: + case snap.Transactions: if err := s.Txs.reopen(s.dir); err != nil { return err } @@ -437,7 +380,7 @@ func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) { return default: } - if err := s.Reopen(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, ErrSnapshotMissed) { + if err := s.Reopen(); err != nil && !errors.Is(err, os.ErrNotExist) && !errors.Is(err, snap.ErrSnapshotMissed) { log.Error("AsyncOpenAll", "err", err) } time.Sleep(15 * time.Second) @@ -460,7 +403,7 @@ func (s *RoSnapshots) Reopen() error { for _, f := range files { { seg := &BodySegment{From: f.From, To: f.To} - fileName := SegmentFileName(f.From, f.To, Bodies) + fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { @@ -472,7 +415,7 @@ func (s *RoSnapshots) Reopen() error { } { seg := &HeaderSegment{From: f.From, To: f.To} - fileName := SegmentFileName(f.From, f.To, Headers) + fileName := snap.SegmentFileName(f.From, f.To, snap.Headers) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { @@ -484,7 +427,7 @@ func (s *RoSnapshots) Reopen() error { } { seg := &TxnSegment{From: f.From, To: f.To} - fileName := SegmentFileName(f.From, f.To, Transactions) + fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions) seg.Seg, err = 
compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { @@ -504,23 +447,23 @@ func (s *RoSnapshots) Reopen() error { s.segmentsReady.Store(true) for _, sn := range s.Headers.segments { - sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Headers.String()))) + sn.idxHeaderHash, err = recsplit.OpenIndex(path.Join(s.dir, snap.IdxFileName(sn.From, sn.To, snap.Headers.String()))) if err != nil && !errors.Is(err, os.ErrNotExist) { return err } } for _, sn := range s.Bodies.segments { - sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Bodies.String()))) + sn.idxBodyNumber, err = recsplit.OpenIndex(path.Join(s.dir, snap.IdxFileName(sn.From, sn.To, snap.Bodies.String()))) if err != nil && !errors.Is(err, os.ErrNotExist) { return err } } for _, sn := range s.Txs.segments { - sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Transactions.String()))) + sn.IdxTxnHash, err = recsplit.OpenIndex(path.Join(s.dir, snap.IdxFileName(sn.From, sn.To, snap.Transactions.String()))) if err != nil && !errors.Is(err, os.ErrNotExist) { return err } - sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(sn.From, sn.To, Transactions2Block.String()))) + sn.IdxTxnHash2BlockNum, err = recsplit.OpenIndex(path.Join(s.dir, snap.IdxFileName(sn.From, sn.To, snap.Transactions2Block.String()))) if err != nil && !errors.Is(err, os.ErrNotExist) { return err } @@ -546,7 +489,7 @@ func (s *RoSnapshots) ReopenSegments() error { for _, f := range files { { seg := &BodySegment{From: f.From, To: f.To} - fileName := SegmentFileName(f.From, f.To, Bodies) + fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { @@ -559,7 +502,7 @@ func (s *RoSnapshots) ReopenSegments() error { { fmt.Printf("reopen segment: 
%d-%d\n", f.From, f.To) seg := &HeaderSegment{From: f.From, To: f.To} - fileName := SegmentFileName(f.From, f.To, Headers) + fileName := snap.SegmentFileName(f.From, f.To, snap.Headers) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { @@ -571,7 +514,7 @@ func (s *RoSnapshots) ReopenSegments() error { } { seg := &TxnSegment{From: f.From, To: f.To} - fileName := SegmentFileName(f.From, f.To, Transactions) + fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions) seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { if errors.Is(err, os.ErrNotExist) { @@ -676,7 +619,7 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir *dir.Rw, chai <-workersCh }() - f := filepath.Join(snapshotDir.Path, SegmentFileName(blockFrom, blockTo, Headers)) + f := filepath.Join(snapshotDir.Path, snap.SegmentFileName(blockFrom, blockTo, snap.Headers)) errs <- HeadersIdx(ctx, f, blockFrom, tmpDir, lvl) select { case <-ctx.Done(): @@ -723,7 +666,7 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir *dir.Rw, chai <-workersCh }() - f := filepath.Join(snapshotDir.Path, SegmentFileName(blockFrom, blockTo, Bodies)) + f := filepath.Join(snapshotDir.Path, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies)) errs <- BodiesIdx(ctx, f, blockFrom, tmpDir, lvl) select { case <-ctx.Done(): @@ -753,7 +696,7 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir *dir.Rw, chai return err } // hack to read first block body - to get baseTxId from there - if err := s.ReopenSomeIndices(Headers, Bodies); err != nil { + if err := s.ReopenSomeIndices(snap.Headers, snap.Bodies); err != nil { return err } if err := s.Txs.View(func(segments []*TxnSegment) error { @@ -812,100 +755,29 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, snapshotDir *dir.Rw, chai return nil } -// FileInfo - parsed file metadata -type FileInfo struct { - _ fs.FileInfo - 
Version uint8 - From, To uint64 - Path, Ext string - T Type -} - -func IdxFiles(dir string) (res []FileInfo, err error) { return filesWithExt(dir, ".idx") } -func Segments(dir string) (res []FileInfo, err error) { return filesWithExt(dir, ".seg") } -func TmpFiles(dir string) (res []string, err error) { - files, err := os.ReadDir(dir) - if err != nil { - return nil, err - } - for _, f := range files { - if f.IsDir() || len(f.Name()) < 3 { - continue - } - if filepath.Ext(f.Name()) != ".tmp" { - continue - } - res = append(res, filepath.Join(dir, f.Name())) - } - return res, nil -} - -var ErrSnapshotMissed = fmt.Errorf("snapshot missed") - -func noGaps(in []FileInfo) (out []FileInfo, err error) { +func noGaps(in []snap.FileInfo) (out []snap.FileInfo, err error) { var prevTo uint64 for _, f := range in { if f.To <= prevTo { continue } if f.From != prevTo { // no gaps - return nil, fmt.Errorf("%w: from %d to %d", ErrSnapshotMissed, prevTo, f.From) + return nil, fmt.Errorf("%w: from %d to %d", snap.ErrSnapshotMissed, prevTo, f.From) } prevTo = f.To out = append(out, f) } return out, nil } -func parseDir(dir string) (res []FileInfo, err error) { - files, err := os.ReadDir(dir) - if err != nil { - return nil, err - } - for _, f := range files { - fileInfo, err := f.Info() - if err != nil { - return nil, err - } - if f.IsDir() || fileInfo.Size() == 0 || len(f.Name()) < 3 { - continue - } - - meta, err := ParseFileName(dir, f.Name()) - if err != nil { - if errors.Is(err, ErrInvalidFileName) { - continue - } - return nil, err - } - res = append(res, meta) - } - sort.Slice(res, func(i, j int) bool { - if res[i].Version != res[j].Version { - return res[i].Version < res[j].Version - } - if res[i].From != res[j].From { - return res[i].From < res[j].From - } - if res[i].To != res[j].To { - return res[i].To < res[j].To - } - if res[i].T != res[j].T { - return res[i].T < res[j].T - } - return res[i].Ext < res[j].Ext - }) - - return res, nil -} -func allTypeOfSegmentsMustExist(dir 
string, in []FileInfo) (res []FileInfo) { +func allTypeOfSegmentsMustExist(dir string, in []snap.FileInfo) (res []snap.FileInfo) { MainLoop: for _, f := range in { if f.From == f.To { continue } - for _, t := range AllSnapshotTypes { - p := filepath.Join(dir, SegmentFileName(f.From, f.To, t)) + for _, t := range snap.AllSnapshotTypes { + p := filepath.Join(dir, snap.SegmentFileName(f.From, f.To, t)) if _, err := os.Stat(p); err != nil { if errors.Is(err, os.ErrNotExist) { continue MainLoop @@ -919,7 +791,7 @@ MainLoop: } // noOverlaps - keep largest ranges and avoid overlap -func noOverlaps(in []FileInfo) (res []FileInfo) { +func noOverlaps(in []snap.FileInfo) (res []snap.FileInfo) { for i := range in { f := in[i] if f.From == f.To { @@ -943,13 +815,13 @@ func noOverlaps(in []FileInfo) (res []FileInfo) { return res } -func segments2(dir string) (res []FileInfo, err error) { - list, err := Segments(dir) +func segments2(dir string) (res []snap.FileInfo, err error) { + list, err := snap.Segments(dir) if err != nil { return nil, err } for _, f := range list { - if f.T != Headers { + if f.T != snap.Headers { continue } res = append(res, f) @@ -957,72 +829,10 @@ func segments2(dir string) (res []FileInfo, err error) { return noGaps(noOverlaps(allTypeOfSegmentsMustExist(dir, res))) } -func filterExt(in []FileInfo, expectExt string) (out []FileInfo) { - for _, f := range in { - if f.Ext != expectExt { // filter out only compressed files - continue - } - out = append(out, f) - } - return out -} -func filesWithExt(dir, expectExt string) ([]FileInfo, error) { - files, err := parseDir(dir) - if err != nil { - return nil, err - } - return filterExt(files, expectExt), nil -} - -func IsCorrectFileName(name string) bool { - parts := strings.Split(name, "-") - return len(parts) == 4 && parts[3] != "v1" -} - -func ParseFileName(dir, fileName string) (res FileInfo, err error) { - ext := filepath.Ext(fileName) - onlyName := fileName[:len(fileName)-len(ext)] - parts := 
strings.Split(onlyName, "-") - if len(parts) < 4 { - return res, fmt.Errorf("expected format: v1-001500-002000-bodies.seg got: %s. %w", fileName, ErrInvalidFileName) - } - if parts[0] != "v1" { - return res, fmt.Errorf("version: %s. %w", parts[0], ErrInvalidFileName) - } - from, err := strconv.ParseUint(parts[1], 10, 64) - if err != nil { - return - } - to, err := strconv.ParseUint(parts[2], 10, 64) - if err != nil { - return - } - var snapshotType Type - ft, ok := ParseFileType(parts[3]) - if !ok { - return res, fmt.Errorf("unexpected snapshot suffix: %s,%w", parts[2], ErrInvalidFileName) - } - switch ft { - case Headers: - snapshotType = Headers - case Bodies: - snapshotType = Bodies - case Transactions: - snapshotType = Transactions - default: - return res, fmt.Errorf("unexpected snapshot suffix: %s,%w", parts[2], ErrInvalidFileName) - } - return FileInfo{From: from * 1_000, To: to * 1_000, Path: filepath.Join(dir, fileName), T: snapshotType, Ext: ext}, nil -} - -const MERGE_THRESHOLD = 2 // don't trigger merge if have too small amount of partial segments -const DEFAULT_SEGMENT_SIZE = 500_000 -const MIN_SEGMENT_SIZE = 1_000 - func chooseSegmentEnd(from, to, blocksPerFile uint64) uint64 { next := (from/blocksPerFile + 1) * blocksPerFile to = min(next, to) - return to - (to % MIN_SEGMENT_SIZE) // round down to the nearest 1k + return to - (to % snap.MIN_SEGMENT_SIZE) // round down to the nearest 1k } func min(a, b uint64) uint64 { @@ -1126,7 +936,7 @@ type DBEventNotifier interface { func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, tmpDir string, snapshots *RoSnapshots, rwSnapshotDir *dir.Rw, db kv.RoDB, workers int, downloader proto_downloader.DownloaderClient, lvl log.Lvl, notifier DBEventNotifier) error { log.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000)) // in future we will do it in background - if err := DumpBlocks(ctx, blockFrom, blockTo, DEFAULT_SEGMENT_SIZE, 
tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { + if err := DumpBlocks(ctx, blockFrom, blockTo, snap.DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { return fmt.Errorf("DumpBlocks: %w", err) } if err := snapshots.Reopen(); err != nil { @@ -1149,14 +959,14 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 return err } // start seed large .seg of large size - req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(AllSnapshotTypes))} + req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(snap.AllSnapshotTypes))} for _, r := range ranges { - if r.to-r.from != DEFAULT_SEGMENT_SIZE { + if r.to-r.from != snap.DEFAULT_SEGMENT_SIZE { continue } - for _, t := range AllSnapshotTypes { + for _, t := range snap.AllSnapshotTypes { req.Items = append(req.Items, &proto_downloader.DownloadItem{ - Path: SegmentFileName(r.from, r.to, t), + Path: snap.SegmentFileName(r.from, r.to, t), }) } } @@ -1180,17 +990,17 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t return nil } func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapshotDir string, chainDB kv.RoDB, workers int, lvl log.Lvl) error { - segmentFile := filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Transactions)) + segmentFile := filepath.Join(snapshotDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions)) if _, err := DumpTxs(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil { return fmt.Errorf("DumpTxs: %w", err) } - segmentFile = filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Bodies)) + segmentFile = filepath.Join(snapshotDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies)) if err := DumpBodies(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil { return fmt.Errorf("DumpBodies: %w", err) } - segmentFile = 
filepath.Join(snapshotDir, SegmentFileName(blockFrom, blockTo, Headers)) + segmentFile = filepath.Join(snapshotDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers)) if err := DumpHeaders(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil { return fmt.Errorf("DumpHeaders: %w", err) } @@ -1493,7 +1303,7 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT var expectedCount, firstTxID uint64 firstBlockNum := blockFrom - bodySegmentPath := filepath.Join(snapshotDir.Path, SegmentFileName(blockFrom, blockTo, Bodies)) + bodySegmentPath := filepath.Join(snapshotDir.Path, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies)) bodiesSegment, err := compress.NewDecompressor(bodySegmentPath) if err != nil { return err @@ -1509,7 +1319,7 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT } firstTxID = firstBody.BaseTxId - bodyIdxPath := filepath.Join(snapshotDir.Path, IdxFileName(blockFrom, blockTo, Bodies.String())) + bodyIdxPath := filepath.Join(snapshotDir.Path, snap.IdxFileName(blockFrom, blockTo, snap.Bodies.String())) idx, err := recsplit.OpenIndex(bodyIdxPath) if err != nil { return err @@ -1529,7 +1339,7 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT idx.Close() } - segmentFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(blockFrom, blockTo, Transactions)) + segmentFilePath := filepath.Join(snapshotDir.Path, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions)) d, err := compress.NewDecompressor(segmentFilePath) if err != nil { return err @@ -1542,7 +1352,7 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(snapshotDir.Path, IdxFileName(blockFrom, blockTo, Transactions.String())), + IndexFile: filepath.Join(snapshotDir.Path, snap.IdxFileName(blockFrom, blockTo, snap.Transactions.String())), BaseDataID: firstTxID, 
}) if err != nil { @@ -1554,7 +1364,7 @@ func TransactionsIdx(ctx context.Context, chainID uint256.Int, blockFrom, blockT BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - IndexFile: filepath.Join(snapshotDir.Path, IdxFileName(blockFrom, blockTo, Transactions2Block.String())), + IndexFile: filepath.Join(snapshotDir.Path, snap.IdxFileName(blockFrom, blockTo, snap.Transactions2Block.String())), BaseDataID: firstBlockNum, }) if err != nil { @@ -1834,7 +1644,7 @@ func (r mergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []mergeRange) { for i := len(snapshots.Headers.segments) - 1; i > 0; i-- { sn := snapshots.Headers.segments[i] - if sn.To-sn.From >= DEFAULT_SEGMENT_SIZE { // is complete .seg + if sn.To-sn.From >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg continue } @@ -1894,7 +1704,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges return err } { - segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Bodies)) + segFilePath := filepath.Join(snapshotDir.Path, snap.SegmentFileName(r.from, r.to, snap.Bodies)) if err := m.merge(ctx, toMergeBodies, segFilePath, logEvery); err != nil { return fmt.Errorf("mergeByAppendSegments: %w", err) } @@ -1906,7 +1716,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges } { - segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Headers)) + segFilePath := filepath.Join(snapshotDir.Path, snap.SegmentFileName(r.from, r.to, snap.Headers)) if err := m.merge(ctx, toMergeHeaders, segFilePath, logEvery); err != nil { return fmt.Errorf("mergeByAppendSegments: %w", err) } @@ -1918,7 +1728,7 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges } { - segFilePath := filepath.Join(snapshotDir.Path, SegmentFileName(r.from, r.to, Transactions)) + segFilePath := filepath.Join(snapshotDir.Path, snap.SegmentFileName(r.from, 
r.to, snap.Transactions)) if err := m.merge(ctx, toMergeTxs, segFilePath, logEvery); err != nil { return fmt.Errorf("mergeByAppendSegments: %w", err) } @@ -2008,12 +1818,12 @@ func (m *Merger) removeOldFiles(toDel []string, snapshotsDir *dir.Rw) error { ext := filepath.Ext(f) withoutExt := f[:len(f)-len(ext)] _ = os.Remove(withoutExt + ".idx") - if strings.HasSuffix(withoutExt, Transactions.String()) { + if strings.HasSuffix(withoutExt, snap.Transactions.String()) { _ = os.Remove(withoutExt + "-to-block.idx") _ = os.Remove(withoutExt + "-id.idx") } } - tmpFiles, err := TmpFiles(snapshotsDir.Path) + tmpFiles, err := snap.TmpFiles(snapshotsDir.Path) if err != nil { return err } @@ -2030,11 +1840,11 @@ func assertAllSegments(blocks []*BlocksSnapshot, root string) { wg.Add(1) go func(sn *BlocksSnapshot) { defer wg.Done() - f := filepath.Join(root, SegmentFileName(sn.From, sn.To, Headers)) + f := filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Headers)) assertSegment(f) - f = filepath.Join(root, SegmentFileName(sn.From, sn.To, Bodies)) + f = filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Bodies)) assertSegment(f) - f = filepath.Join(root, SegmentFileName(sn.From, sn.To, Transactions)) + f = filepath.Join(root, snap.SegmentFileName(sn.From, sn.To, snap.Transactions)) assertSegment(f) fmt.Printf("done:%s\n", f) }(sn) diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go index 5f6e8122fc..2929f898c5 100644 --- a/turbo/snapshotsync/block_snapshots_test.go +++ b/turbo/snapshotsync/block_snapshots_test.go @@ -13,13 +13,14 @@ import ( "github.com/ledgerwatch/erigon/common/math" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/params/networkname" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/snap" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "github.com/ledgerwatch/log/v3" "github.com/stretchr/testify/require" ) -func createTestSegmentFile(t 
*testing.T, from, to uint64, name Type, dir string) { - c, err := compress.NewCompressor(context.Background(), "test", filepath.Join(dir, SegmentFileName(from, to, name)), dir, 100, 1, log.LvlDebug) +func createTestSegmentFile(t *testing.T, from, to uint64, name snap.Type, dir string) { + c, err := compress.NewCompressor(context.Background(), "test", filepath.Join(dir, snap.SegmentFileName(from, to, name)), dir, 100, 1, log.LvlDebug) require.NoError(t, err) defer c.Close() err = c.AddWord([]byte{1}) @@ -30,7 +31,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string) KeyCount: 1, BucketSize: 10, TmpDir: dir, - IndexFile: filepath.Join(dir, IdxFileName(from, to, name.String())), + IndexFile: filepath.Join(dir, snap.IdxFileName(from, to, name.String())), LeafSize: 8, }) require.NoError(t, err) @@ -39,12 +40,12 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string) require.NoError(t, err) err = idx.Build() require.NoError(t, err) - if name == Transactions { + if name == snap.Transactions { idx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: 1, BucketSize: 10, TmpDir: dir, - IndexFile: filepath.Join(dir, IdxFileName(from, to, Transactions2Block.String())), + IndexFile: filepath.Join(dir, snap.IdxFileName(from, to, snap.Transactions2Block.String())), LeafSize: 8, }) require.NoError(t, err) @@ -59,7 +60,7 @@ func createTestSegmentFile(t *testing.T, from, to uint64, name Type, dir string) func TestMergeSnapshots(t *testing.T) { dir, require := t.TempDir(), require.New(t) createFile := func(from, to uint64) { - for _, snT := range AllSnapshotTypes { + for _, snT := range snap.AllSnapshotTypes { createTestSegmentFile(t, from, to, snT, dir) } } @@ -82,7 +83,7 @@ func TestMergeSnapshots(t *testing.T) { require.NoError(err) } - expectedFileName := SegmentFileName(500_000, 1_000_000, Transactions) + expectedFileName := snap.SegmentFileName(500_000, 1_000_000, snap.Transactions) d, err := 
compress.NewDecompressor(filepath.Join(dir, expectedFileName)) require.NoError(err) defer d.Close() @@ -97,7 +98,7 @@ func TestMergeSnapshots(t *testing.T) { require.NoError(err) } - expectedFileName = SegmentFileName(1_100_000, 1_200_000, Transactions) + expectedFileName = snap.SegmentFileName(1_100_000, 1_200_000, snap.Transactions) d, err = compress.NewDecompressor(filepath.Join(dir, expectedFileName)) require.NoError(err) defer d.Close() @@ -129,7 +130,7 @@ func TestOpenAllSnapshot(t *testing.T) { chainSnapshotCfg := snapshothashes.KnownConfig(networkname.MainnetChainName) chainSnapshotCfg.ExpectBlocks = math.MaxUint64 cfg := ethconfig.Snapshot{Enabled: true} - createFile := func(from, to uint64, name Type) { createTestSegmentFile(t, from, to, name, dir) } + createFile := func(from, to uint64, name snap.Type) { createTestSegmentFile(t, from, to, name, dir) } s := NewRoSnapshots(cfg, dir) defer s.Close() err := s.Reopen() @@ -137,23 +138,23 @@ func TestOpenAllSnapshot(t *testing.T) { require.Equal(0, len(s.Headers.segments)) s.Close() - createFile(500_000, 1_000_000, Bodies) + createFile(500_000, 1_000_000, snap.Bodies) s = NewRoSnapshots(cfg, dir) defer s.Close() require.Equal(0, len(s.Bodies.segments)) //because, no headers and transactions snapshot files are created s.Close() - createFile(500_000, 1_000_000, Headers) - createFile(500_000, 1_000_000, Transactions) + createFile(500_000, 1_000_000, snap.Headers) + createFile(500_000, 1_000_000, snap.Transactions) s = NewRoSnapshots(cfg, dir) err = s.Reopen() require.Error(err) require.Equal(0, len(s.Headers.segments)) //because, no gaps are allowed (expect snapshots from block 0) s.Close() - createFile(0, 500_000, Bodies) - createFile(0, 500_000, Headers) - createFile(0, 500_000, Transactions) + createFile(0, 500_000, snap.Bodies) + createFile(0, 500_000, snap.Headers) + createFile(0, 500_000, snap.Transactions) s = NewRoSnapshots(cfg, dir) defer s.Close() @@ -190,9 +191,9 @@ func TestOpenAllSnapshot(t 
*testing.T) { defer s.Close() require.Equal(2, len(s.Headers.segments)) - createFile(500_000, 900_000, Headers) - createFile(500_000, 900_000, Bodies) - createFile(500_000, 900_000, Transactions) + createFile(500_000, 900_000, snap.Headers) + createFile(500_000, 900_000, snap.Bodies) + createFile(500_000, 900_000, snap.Transactions) chainSnapshotCfg.ExpectBlocks = math.MaxUint64 s = NewRoSnapshots(cfg, dir) defer s.Close() @@ -217,24 +218,24 @@ func TestParseCompressedFileName(t *testing.T) { require.NoError(err) return s.Name() } - _, err := ParseFileName("", stat("a")) + _, err := snap.ParseFileName("", stat("a")) require.Error(err) - _, err = ParseFileName("", stat("1-a")) + _, err = snap.ParseFileName("", stat("1-a")) require.Error(err) - _, err = ParseFileName("", stat("1-2-a")) + _, err = snap.ParseFileName("", stat("1-2-a")) require.Error(err) - _, err = ParseFileName("", stat("1-2-bodies.info")) + _, err = snap.ParseFileName("", stat("1-2-bodies.info")) require.Error(err) - _, err = ParseFileName("", stat("1-2-bodies.seg")) + _, err = snap.ParseFileName("", stat("1-2-bodies.seg")) require.Error(err) - _, err = ParseFileName("", stat("v2-1-2-bodies.seg")) + _, err = snap.ParseFileName("", stat("v2-1-2-bodies.seg")) require.Error(err) - _, err = ParseFileName("", stat("v0-1-2-bodies.seg")) + _, err = snap.ParseFileName("", stat("v0-1-2-bodies.seg")) require.Error(err) - f, err := ParseFileName("", stat("v1-1-2-bodies.seg")) + f, err := snap.ParseFileName("", stat("v1-1-2-bodies.seg")) require.NoError(err) - require.Equal(f.T, Bodies) + require.Equal(f.T, snap.Bodies) require.Equal(1_000, int(f.From)) require.Equal(2_000, int(f.To)) } diff --git a/turbo/snapshotsync/snap/files.go b/turbo/snapshotsync/snap/files.go new file mode 100644 index 0000000000..0e8f90a7f5 --- /dev/null +++ b/turbo/snapshotsync/snap/files.go @@ -0,0 +1,202 @@ +package snap + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "sort" + "strconv" + "strings" +) + +type Type 
int

const (
	Headers Type = iota
	Bodies
	Transactions
	NumberOfTypes
)

func (ft Type) String() string {
	switch ft {
	case Headers:
		return "headers"
	case Bodies:
		return "bodies"
	case Transactions:
		return "transactions"
	default:
		panic(fmt.Sprintf("unknown file type: %d", ft))
	}
}

func ParseFileType(s string) (Type, bool) {
	switch s {
	case "headers":
		return Headers, true
	case "bodies":
		return Bodies, true
	case "transactions":
		return Transactions, true
	default:
		return NumberOfTypes, false
	}
}

type IdxType string

const (
	Transactions2Block IdxType = "transactions-to-block"
)

func (it IdxType) String() string { return string(it) }

var AllSnapshotTypes = []Type{Headers, Bodies, Transactions}

var (
	ErrInvalidFileName = fmt.Errorf("invalid compressed file name")
)

func FileName(from, to uint64, fileType string) string {
	return fmt.Sprintf("v1-%06d-%06d-%s", from/1_000, to/1_000, fileType)
}
func SegmentFileName(from, to uint64, t Type) string { return FileName(from, to, t.String()) + ".seg" }
func DatFileName(from, to uint64, fType string) string { return FileName(from, to, fType) + ".dat" }
func IdxFileName(from, to uint64, fType string) string { return FileName(from, to, fType) + ".idx" }

func FilterExt(in []FileInfo, expectExt string) (out []FileInfo) {
	for _, f := range in {
		if f.Ext != expectExt { // keep only files carrying the expected extension
			continue
		}
		out = append(out, f)
	}
	return out
}
func FilesWithExt(dir, expectExt string) ([]FileInfo, error) {
	files, err := ParseDir(dir)
	if err != nil {
		return nil, err
	}
	return FilterExt(files, expectExt), nil
}

func IsCorrectFileName(name string) bool {
	parts := strings.Split(name, "-")
	return len(parts) == 4 && parts[3] != "v1" // NOTE(review): parts[3] is the type suffix, the version tag is parts[0] — confirm intent
}

func ParseFileName(dir, fileName string) (res FileInfo, err error) {
	ext := filepath.Ext(fileName)
	onlyName := fileName[:len(fileName)-len(ext)]
	parts := strings.Split(onlyName, "-")
	if len(parts) < 4 {
		return res, fmt.Errorf("expected format: v1-001500-002000-bodies.seg got: %s. %w", fileName, ErrInvalidFileName)
	}
	if parts[0] != "v1" {
		return res, fmt.Errorf("version: %s. %w", parts[0], ErrInvalidFileName)
	}
	from, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return
	}
	to, err := strconv.ParseUint(parts[2], 10, 64)
	if err != nil {
		return
	}
	var snapshotType Type
	ft, ok := ParseFileType(parts[3])
	if !ok {
		return res, fmt.Errorf("unexpected snapshot suffix: %s, %w", parts[3], ErrInvalidFileName)
	}
	switch ft {
	case Headers:
		snapshotType = Headers
	case Bodies:
		snapshotType = Bodies
	case Transactions:
		snapshotType = Transactions
	default:
		return res, fmt.Errorf("unexpected snapshot suffix: %s, %w", parts[3], ErrInvalidFileName)
	}
	return FileInfo{From: from * 1_000, To: to * 1_000, Path: filepath.Join(dir, fileName), T: snapshotType, Ext: ext}, nil
}

const MERGE_THRESHOLD = 2 // don't trigger merge if have too small amount of partial segments
const DEFAULT_SEGMENT_SIZE = 500_000
const MIN_SEGMENT_SIZE = 1_000

// FileInfo - parsed snapshot file metadata; From/To are absolute block numbers
type FileInfo struct {
	_ fs.FileInfo
	Version uint8
	From, To uint64
	Path, Ext string
	T Type
}

func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") }
func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") }
func TmpFiles(dir string) (res []string, err error) {
	files, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	for _, f := range files {
		if f.IsDir() || len(f.Name()) < 3 {
			continue
		}
		if filepath.Ext(f.Name()) != ".tmp" {
			continue
		}
		res = append(res, filepath.Join(dir, f.Name()))
	}
	return res, nil
}

var ErrSnapshotMissed = fmt.Errorf("snapshot missed")

func ParseDir(dir string) (res []FileInfo, err error) {
	files, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	for _, f := range files {
		fileInfo, err := f.Info()
		if err != nil {
			return nil, err
		}
		if f.IsDir() || fileInfo.Size() == 0 || len(f.Name()) < 3 {
			continue
		}

		meta, err := ParseFileName(dir, f.Name())
		if err != nil {
			if errors.Is(err, ErrInvalidFileName) {
				continue
			}
			return nil, err
		}
		res = append(res, meta)
	}
	sort.Slice(res, func(i, j int) bool {
		if res[i].Version != res[j].Version {
			return res[i].Version < res[j].Version
		}
		if res[i].From != res[j].From {
			return res[i].From < res[j].From
		}
		if res[i].To != res[j].To {
			return res[i].To < res[j].To
		}
		if res[i].T != res[j].T {
			return res[i].T < res[j].T
		}
		return res[i].Ext < res[j].Ext
	})

	return res, nil
}
diff --git a/turbo/snapshotsync/snapshotsynccli/flags.go b/turbo/snapshotsync/snap/flags.go
similarity index 97%
rename from turbo/snapshotsync/snapshotsynccli/flags.go
rename to turbo/snapshotsync/snap/flags.go
index eac39949f7..e0f23e9078 100644
--- a/turbo/snapshotsync/snapshotsynccli/flags.go
+++ b/turbo/snapshotsync/snap/flags.go
@@ -1,4 +1,4 @@
-package snapshotsynccli
+package snap
 
 import (
 	"fmt"
-- 
GitLab