diff --git a/cmd/downloader/generator/commands/generate_body_snapshot.go b/cmd/downloader/generator/commands/generate_body_snapshot.go deleted file mode 100644 index ced7d4c7a172220aa88dcbdde5a434bf64fa99e8..0000000000000000000000000000000000000000 --- a/cmd/downloader/generator/commands/generate_body_snapshot.go +++ /dev/null @@ -1,89 +0,0 @@ -package commands - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/ledgerwatch/erigon-lib/kv" - kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/spf13/cobra" - - libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/log/v3" -) - -func init() { - withDatadir(generateBodiesSnapshotCmd) - withSnapshotFile(generateBodiesSnapshotCmd) - withBlock(generateBodiesSnapshotCmd) - rootCmd.AddCommand(generateBodiesSnapshotCmd) - -} - -var generateBodiesSnapshotCmd = &cobra.Command{ - Use: "bodies", - Short: "Generate bodies snapshot", - Example: "go run cmd/snapshots/generator/main.go bodies --block 11000000 --datadir /media/b00ris/nvme/snapshotsync/ --snapshotDir /media/b00ris/nvme/snapshotsync/tg/snapshots/ --snapshotMode \"hb\" --snapshot /media/b00ris/nvme/snapshots/bodies_test", - RunE: func(cmd *cobra.Command, args []string) error { - return BodySnapshot(cmd.Context(), log.New(), chaindata, snapshotFile, block, snapshotDir, snapshotMode) - }, -} - -func BodySnapshot(ctx context.Context, logger log.Logger, dbPath, snapshotPath string, toBlock uint64, snapshotDir string, snapshotMode string) error { - db := kv2.NewMDBX(logger).Path(dbPath).MustOpen() - snKV := kv2.NewMDBX(logger).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.BlockBody: kv.TableCfgItem{}, - } - }).Path(snapshotPath).MustOpen() - - tx, err := db.BeginRo(context.Background()) - if err != nil { - return err - } - defer tx.Rollback() - - logEvery := time.NewTicker(30 * time.Second) - defer logEvery.Stop() - - t := time.Now() - var hash common.Hash - if err := snKV.Update(ctx, func(sntx kv.RwTx) error { - for i := uint64(1); i <= toBlock; i++ { - if common.IsCanceled(ctx) { - return libcommon.ErrStopped - } - - hash, err = rawdb.ReadCanonicalHash(tx, i) - if err != nil { - return fmt.Errorf("getting canonical hash for block %d: %w", i, err) - } - body := rawdb.ReadBodyRLP(tx, hash, i) - if err = sntx.Put(kv.BlockBody, dbutils.BlockBodyKey(i, hash), body); err != nil { - return err - } - select { - case <-logEvery.C: - log.Info("progress", "bucket", kv.BlockBody, "block num", i) - default: - } - } - return nil - }); err != nil { - return err - } - snKV.Close() - err = os.Remove(snapshotPath + "/mdbx.lck") - if err != nil { - log.Warn("Remove lock", "err", err) - return err - } - - log.Info("Finished", "duration", time.Since(t)) - return nil -} diff --git a/cmd/downloader/generator/commands/generate_header_snapshot.go b/cmd/downloader/generator/commands/generate_header_snapshot.go deleted file mode 100644 index 4f15f42cc7aeeba5461bce57e2cdbaba299f511c..0000000000000000000000000000000000000000 --- a/cmd/downloader/generator/commands/generate_header_snapshot.go +++ /dev/null @@ -1,108 +0,0 @@ -package commands - -import ( - "context" - "errors" - "fmt" - "os" - "time" - - "github.com/ledgerwatch/erigon-lib/kv" - kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/spf13/cobra" - - libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/log/v3" -) - -func init() { - withDatadir(generateHeadersSnapshotCmd) - withSnapshotFile(generateHeadersSnapshotCmd) - withBlock(generateHeadersSnapshotCmd) - - rootCmd.AddCommand(generateHeadersSnapshotCmd) -} - -var generateHeadersSnapshotCmd = &cobra.Command{ - Use: "headers", - Short: "Generate headers snapshot", - Example: "go run cmd/snapshots/generator/main.go headers --block 11000000 --datadir /media/b00ris/nvme/snapshotsync/ --snapshotDir /media/b00ris/nvme/snapshotsync/tg/snapshots/ --snapshotMode \"hb\" --snapshot /media/b00ris/nvme/snapshots/headers_test", - RunE: func(cmd *cobra.Command, args []string) error { - return HeaderSnapshot(cmd.Context(), log.New(), chaindata, snapshotFile, block, snapshotDir, snapshotMode) - }, -} - -func HeaderSnapshot(ctx context.Context, logger log.Logger, dbPath, snapshotPath string, toBlock uint64, snapshotDir string, snapshotMode string) error { - if snapshotPath == "" { - return errors.New("empty snapshot path") - } - err := os.RemoveAll(snapshotPath) - if err != nil { - return err - } - db := kv2.NewMDBX(logger).Path(dbPath).MustOpen() - - snKV := kv2.NewMDBX(logger).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.Headers: kv.TableCfgItem{}, - } - }).Path(snapshotPath).MustOpen() - - tx, err := db.BeginRo(context.Background()) - if err != nil { - return err - } - defer tx.Rollback() - snTx, err := snKV.BeginRw(context.Background()) - if err != nil { - return err - } - defer snTx.Rollback() - - t := time.Now() - var hash common.Hash - var header []byte - c, err := snTx.RwCursor(kv.Headers) - if err != nil { - return err - } - defer c.Close() - for i := uint64(1); i <= toBlock; i++ { - if common.IsCanceled(ctx) { - return libcommon.ErrStopped - } - - hash, err = rawdb.ReadCanonicalHash(tx, i) - if err != nil { - return fmt.Errorf("getting canonical hash for block %d: %w", i, err) - } - header = rawdb.ReadHeaderRLP(tx, hash, i) - if len(header) == 0 { - return fmt.Errorf("empty header: %v", i) - } - if err = c.Append(dbutils.HeaderKey(i, hash), header); err != nil { - return err - } - if i%1000 == 0 { - log.Info("Committed", "block", i) - } - } - - if err = snTx.Commit(); err != nil { - return err - } - - snKV.Close() - - err = os.Remove(snapshotPath + "/lock.mdb") - if err != nil { - log.Warn("Remove lock", "err", err) - return err - } - log.Info("Finished", "duration", time.Since(t)) - - return nil -} diff --git a/cmd/downloader/generator/commands/verify_headers.go b/cmd/downloader/generator/commands/verify_headers.go deleted file mode 100644 index 07043204bdfd9bfcd8ab968852bda056257fccbe..0000000000000000000000000000000000000000 --- a/cmd/downloader/generator/commands/verify_headers.go +++ /dev/null @@ -1,109 +0,0 @@ -package commands - -import ( - "context" - "errors" - "sync/atomic" - "time" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/rlp" - "github.com/ledgerwatch/erigon/turbo/snapshotsync" - "github.com/ledgerwatch/log/v3" - "github.com/spf13/cobra" -) - -func init() { - withSnapshotFile(verifyHeadersSnapshotCmd) - withBlock(verifyHeadersSnapshotCmd) - rootCmd.AddCommand(verifyHeadersSnapshotCmd) - -} - -//go run cmd/snapshots/generator/main.go state_copy --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state -var verifyHeadersSnapshotCmd = &cobra.Command{ - Use: "verify_headers", - Short: "Copy from state snapshot", - Example: "go run cmd/snapshots/generator/main.go verify_headers --block 11000000 --snapshot /media/b00ris/nvme/snapshots/state", - RunE: func(cmd *cobra.Command, args []string) error { - return VerifyHeadersSnapshot(cmd.Context(), snapshotFile) - }, -} - -func VerifyHeadersSnapshot(ctx context.Context, snapshotPath string) error { - tt := time.Now() - log.Info("Start validation") - var prevHeader *types.Header - var lastHeader uint64 - - go func() { - for { - select { - case <-ctx.Done(): - return - default: - log.Info("Verifying", "t", time.Since(tt), "block", atomic.LoadUint64(&lastHeader)) - } - time.Sleep(time.Second * 10) - } - }() - snKV, err := snapshotsync.OpenHeadersSnapshot(snapshotPath) - if err != nil { - return err - } - err = snKV.View(ctx, func(tx kv.Tx) error { - c, err := tx.Cursor(kv.Headers) - if err != nil { - return err - } - k, v, innerErr := c.First() - for { - if len(k) == 0 && len(v) == 0 { - break - } - if innerErr != nil { - return innerErr - } - - header := new(types.Header) - innerErr := rlp.DecodeBytes(v, header) - if innerErr != nil { - return innerErr - } - - if prevHeader != nil { - if prevHeader.Number.Uint64()+1 != header.Number.Uint64() { - log.Error("invalid header number", "p", prevHeader.Number.Uint64(), "c", header.Number.Uint64()) - return errors.New("invalid header number") - } - if prevHeader.Hash() != header.ParentHash { - log.Error("invalid parent hash", "p", prevHeader.Hash(), "c", header.ParentHash) - return errors.New("invalid parent hash") - } - } - k, v, innerErr = c.Next() - if innerErr != nil { - return innerErr - } - - prevHeader = header - - atomic.StoreUint64(&lastHeader, header.Number.Uint64()) - } - if innerErr != nil { - return innerErr - } - return nil - }) - if err != nil { - return err - } - if block != 0 { - if lastHeader != block { - return errors.New("incorrect last block") - } - } - log.Info("Success", "t", time.Since(tt)) - return nil -} diff --git a/cmd/state/commands/verify_headers_snapshot.go b/cmd/state/commands/verify_headers_snapshot.go deleted file mode 100644 index 92395320b8a51a44a191a5804002cef72d49fb94..0000000000000000000000000000000000000000 --- a/cmd/state/commands/verify_headers_snapshot.go +++ /dev/null @@ -1,24 +0,0 @@ -package commands - -import ( - "github.com/ledgerwatch/erigon/cmd/state/verify" - "github.com/ledgerwatch/log/v3" - "github.com/spf13/cobra" -) - -func init() { - withDatadir(verifyHeadersSnapshotCmd) - rootCmd.AddCommand(verifyHeadersSnapshotCmd) -} - -var verifyHeadersSnapshotCmd = &cobra.Command{ - Use: "verifyHeadersSnapshot", - Short: "Verify headers snapshot", - RunE: func(cmd *cobra.Command, args []string) error { - if chaindata == "" && len(args) > 0 { - chaindata = args[0] - } - logger := log.New() - return verify.HeadersSnapshot(logger, chaindata) - }, -} diff --git a/cmd/state/verify/verify_headers_snapshot.go b/cmd/state/verify/verify_headers_snapshot.go deleted file mode 100644 index 0debde447b11c63573514616ef7ce35bf84d9ec6..0000000000000000000000000000000000000000 --- a/cmd/state/verify/verify_headers_snapshot.go +++ /dev/null @@ -1,57 +0,0 @@ -package verify - -import ( - "context" - "errors" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/rlp" - "github.com/ledgerwatch/log/v3" -) - -func HeadersSnapshot(logger log.Logger, snapshotPath string) error { - snKV := mdbx.NewMDBX(logger).Path(snapshotPath).Readonly().WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.Headers: kv.TableCfgItem{}, - } - }).MustOpen() - var prevHeader *types.Header - err := snKV.View(context.Background(), func(tx kv.Tx) error { - c, err := tx.Cursor(kv.Headers) - if err != nil { - return err - } - k, v, innerErr := c.First() - for { - if len(k) == 0 && len(v) == 0 { - break - } - if innerErr != nil { - return innerErr - } - - header := new(types.Header) - innerErr := rlp.DecodeBytes(v, header) - if innerErr != nil { - return innerErr - } - - if prevHeader != nil { - if prevHeader.Number.Uint64()+1 != header.Number.Uint64() { - log.Error("invalid header number", "p", prevHeader.Number.Uint64(), "c", header.Number.Uint64()) - return errors.New("invalid header number") - } - if prevHeader.Hash() != header.ParentHash { - log.Error("invalid parent hash", "p", prevHeader.Hash(), "c", header.ParentHash) - return errors.New("invalid parent hash") - } - } - k, v, innerErr = c.Next() //nolint - prevHeader = header - } - return nil - }) - return err -} diff --git a/eth/backend.go b/eth/backend.go index 73d927e48fffe16bc5b0e5c6954297eff1e1e2f2..975aabc9b80a0d203ed4a2305141a3a87f191199 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -239,20 +239,6 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } - // setting notifier to support streaming events to rpc daemon - var mg *snapshotsync.SnapshotMigrator - if config.Snapshot.Enabled { - currentSnapshotBlock, currentInfohash, err := snapshotsync.GetSnapshotInfo(chainKv) - if err != nil { - return nil, err - } - mg = snapshotsync.NewMigrator(config.Snapshot.Dir, currentSnapshotBlock, currentInfohash) - err = mg.RemoveNonCurrentSnapshots() - if err != nil { - log.Error("Remove non current snapshot", "err", err) - } - } - if len(stack.Config().P2P.SentryAddr) > 0 { for _, addr := range stack.Config().P2P.SentryAddr { sentryClient, err := download.GrpcSentryClient(backend.downloadCtx, addr) diff --git a/turbo/snapshotsync/bodies_snapshot.go b/turbo/snapshotsync/bodies_snapshot.go index 72fb77bde600e9f40e18815ffdee00b13da9ab48..adcd09ff16eedcb7652bf03d5168d2c57ca4e3ae 100644 --- a/turbo/snapshotsync/bodies_snapshot.go +++ b/turbo/snapshotsync/bodies_snapshot.go @@ -1,260 +1 @@ package snapshotsync - -import ( - "bytes" - "context" - "encoding/binary" - "errors" - "fmt" - "os" - - "github.com/ledgerwatch/erigon-lib/etl" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/snapshotdb" - "github.com/ledgerwatch/erigon/rlp" - "github.com/ledgerwatch/log/v3" -) - -func GenerateBodiesSnapshot(ctx context.Context, readTX kv.Tx, writeTX kv.RwTx, toBlock uint64) error { - readBodyCursor, err := readTX.Cursor(kv.BlockBody) - if err != nil { - return err - } - - writeBodyCursor, err := writeTX.RwCursor(kv.BlockBody) - if err != nil { - return err - } - writeEthTXCursor, err := writeTX.RwCursor(kv.EthTx) - if err != nil { - return err - } - readEthTXCursor, err := readTX.Cursor(kv.EthTx) - if err != nil { - return err - } - - var expectedBaseTxId uint64 - err = ethdb.Walk(readBodyCursor, []byte{}, 0, func(k, v []byte) (bool, error) { - if binary.BigEndian.Uint64(k) > toBlock { - return false, nil - } - - canonocalHash, err := readTX.GetOne(kv.HeaderCanonical, dbutils.EncodeBlockNumber(binary.BigEndian.Uint64(k))) - if err != nil { - return false, err - } - if !bytes.Equal(canonocalHash, k[8:]) { - return true, nil - } - bd := &types.BodyForStorage{} - err = rlp.DecodeBytes(v, bd) - if err != nil { - return false, fmt.Errorf("block %s decode err %w", common.Bytes2Hex(k), err) - } - baseTxId := bd.BaseTxId - amount := bd.TxAmount - - bd.BaseTxId = expectedBaseTxId - newV, err := rlp.EncodeToBytes(bd) - if err != nil { - return false, err - } - err = writeBodyCursor.Append(common.CopyBytes(k), newV) - if err != nil { - return false, err - } - - newExpectedTx := expectedBaseTxId - err = ethdb.Walk(readEthTXCursor, dbutils.EncodeBlockNumber(baseTxId), 0, func(k, v []byte) (bool, error) { - if newExpectedTx >= expectedBaseTxId+uint64(amount) { - return false, nil - } - err = writeEthTXCursor.Append(dbutils.EncodeBlockNumber(newExpectedTx), common.CopyBytes(v)) - if err != nil { - return false, err - } - newExpectedTx++ - return true, nil - }) - if err != nil { - return false, err - } - if newExpectedTx > expectedBaseTxId+uint64(amount) { - fmt.Println("newExpectedTx > expectedBaseTxId+amount", newExpectedTx, expectedBaseTxId, amount, "block", common.Bytes2Hex(k)) - return false, errors.New("newExpectedTx > expectedBaseTxId+amount") - } - expectedBaseTxId += uint64(amount) - return true, nil - }) - if err != nil { - return err - } - return nil -} - -func CreateBodySnapshot(readTx kv.Tx, logger log.Logger, lastBlock uint64, snapshotPath string) error { - db, err := mdbx.NewMDBX(logger).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.BlockBody: kv.ChaindataTablesCfg[kv.BlockBody], - kv.EthTx: kv.ChaindataTablesCfg[kv.EthTx], - } - }).Path(snapshotPath).Open() - if err != nil { - return err - } - - defer db.Close() - writeTX, err := db.BeginRw(context.Background()) - if err != nil { - return err - } - defer writeTX.Rollback() - err = GenerateBodiesSnapshot(context.TODO(), readTx, writeTX, lastBlock) - if err != nil { - return err - } - return writeTX.Commit() -} - -func OpenBodiesSnapshot(logger log.Logger, dbPath string) (kv.RoDB, error) { - return mdbx.NewMDBX(logger).Path(dbPath).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.BlockBody: kv.ChaindataTablesCfg[kv.BlockBody], - kv.EthTx: kv.ChaindataTablesCfg[kv.EthTx], - } - }).Readonly().Open() -} - -func RemoveBlocksData(db kv.RoDB, tx kv.RwTx, newSnapshot uint64) (err error) { - log.Info("Remove blocks data", "to", newSnapshot) - if _, ok := db.(snapshotdb.SnapshotUpdater); !ok { - return errors.New("db don't implement snapshotUpdater interface") - } - bodiesSnapshot := db.(snapshotdb.SnapshotUpdater).BodiesSnapshot() - if bodiesSnapshot == nil { - log.Info("bodiesSnapshot is empty") - return nil - } - blockBodySnapshotReadTX, err := bodiesSnapshot.BeginRo(context.Background()) - if err != nil { - return err - } - defer blockBodySnapshotReadTX.Rollback() - ethtxSnapshotReadTX, err := blockBodySnapshotReadTX.Cursor(kv.EthTx) - if err != nil { - return err - } - lastEthTXSnapshotKey, _, err := ethtxSnapshotReadTX.Last() - if err != nil { - return err - } - rewriteId := binary.BigEndian.Uint64(lastEthTXSnapshotKey) + 1 - - writeTX := tx.(snapshotdb.DBTX).DBTX() - blockBodyWriteCursor, err := writeTX.RwCursor(kv.BlockBody) - if err != nil { - return fmt.Errorf("get bodies cursor %w", err) - } - ethTXWriteCursor, err := writeTX.RwCursor(kv.EthTx) - if err != nil { - return fmt.Errorf("get ethtx cursor %w", err) - } - - logPrefix := "RemoveBlocksData" - bodiesCollector := etl.NewCollector(logPrefix, os.TempDir(), etl.NewSortableBuffer(etl.BufferOptimalSize)) - defer bodiesCollector.Close() - ethTXCollector := etl.NewCollector(logPrefix, os.TempDir(), etl.NewSortableBuffer(etl.BufferOptimalSize)) - defer ethTXCollector.Close() - err = ethdb.Walk(blockBodyWriteCursor, dbutils.BlockBodyKey(0, common.Hash{}), 0, func(k, v []byte) (bool, error) { - if binary.BigEndian.Uint64(k) > newSnapshot { - return false, nil - } - has, err := blockBodySnapshotReadTX.Has(kv.BlockBody, k) - if err != nil { - return false, err - } - bd := types.BodyForStorage{} - err = rlp.DecodeBytes(v, &bd) - if err != nil { - return false, err - } - - if has { - innerErr := blockBodyWriteCursor.Delete(k, nil) - if innerErr != nil { - return false, fmt.Errorf("remove %v err:%w", common.Bytes2Hex(k), innerErr) - } - for i := bd.BaseTxId; i < bd.BaseTxId+uint64(bd.TxAmount); i++ { - err = ethTXWriteCursor.Delete(dbutils.EncodeBlockNumber(i), nil) - if err != nil { - return false, err - } - } - } else { - collectKey := common.CopyBytes(k) - oldBaseTxId := bd.BaseTxId - bd.BaseTxId = rewriteId - bodyBytes, err := rlp.EncodeToBytes(bd) - if err != nil { - return false, err - } - - if bd.TxAmount > 0 { - txIdKey := make([]byte, 8) - binary.BigEndian.PutUint64(txIdKey, oldBaseTxId) - i := uint32(0) - - for k, v, err := ethTXWriteCursor.SeekExact(txIdKey); k != nil; k, v, err = ethTXWriteCursor.Next() { - if err != nil { - return false, err - } - - err = ethTXCollector.Collect(dbutils.EncodeBlockNumber(rewriteId+uint64(i)), common.CopyBytes(v)) - if err != nil { - return false, err - } - - i++ - if i >= bd.TxAmount { - break - } - } - } - //we need to remove it to use snapshot data instead - for i := oldBaseTxId; i < oldBaseTxId+uint64(bd.TxAmount); i++ { - err = ethTXWriteCursor.Delete(dbutils.EncodeBlockNumber(i), nil) - if err != nil { - return false, err - } - } - - rewriteId += uint64(bd.TxAmount) - - err = bodiesCollector.Collect(collectKey, bodyBytes) - if err != nil { - return false, err - } - } - - return true, nil - }) - if err != nil { - return err - } - err = bodiesCollector.Load(writeTX, kv.BlockBody, etl.IdentityLoadFunc, etl.TransformArgs{}) - if err != nil { - return err - } - - err = ethTXCollector.Load(writeTX, kv.EthTx, etl.IdentityLoadFunc, etl.TransformArgs{}) - if err != nil { - return err - } - return nil -} diff --git a/turbo/snapshotsync/headers_snapshot.go b/turbo/snapshotsync/headers_snapshot.go index a85bc84ec68ce34b4b3f4040a81cb171a4d1fd28..adcd09ff16eedcb7652bf03d5168d2c57ca4e3ae 100644 --- a/turbo/snapshotsync/headers_snapshot.go +++ b/turbo/snapshotsync/headers_snapshot.go @@ -1,129 +1 @@ package snapshotsync - -import ( - "context" - "errors" - "fmt" - "os" - "time" - - libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/snapshotdb" - "github.com/ledgerwatch/log/v3" -) - -func CreateHeadersSnapshot(ctx context.Context, readTX kv.Tx, toBlock uint64, snapshotPath string) error { - // remove created snapshot if it's not saved in main db(to avoid append error) - err := os.RemoveAll(snapshotPath) - if err != nil { - return err - } - - snKV, err := mdbx.NewMDBX(log.New()).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.Headers: kv.ChaindataTablesCfg[kv.Headers], - } - }).Path(snapshotPath).Open() - if err != nil { - return err - } - sntx, err := snKV.BeginRw(context.Background()) - if err != nil { - return fmt.Errorf("begin err: %w", err) - } - defer sntx.Rollback() - - err = GenerateHeadersSnapshot(ctx, readTX, sntx, toBlock) - if err != nil { - return fmt.Errorf("generate err: %w", err) - } - err = sntx.Commit() - if err != nil { - return fmt.Errorf("commit err: %w", err) - } - snKV.Close() - - return nil -} - -func GenerateHeadersSnapshot(ctx context.Context, db kv.Tx, sntx kv.RwTx, toBlock uint64) error { - headerCursor, err := sntx.RwCursor(kv.Headers) - if err != nil { - return err - } - var hash common.Hash - var header []byte - t := time.NewTicker(time.Second * 30) - defer t.Stop() - tt := time.Now() - for i := uint64(0); i <= toBlock; i++ { - if common.IsCanceled(ctx) { - return libcommon.ErrStopped - } - select { - case <-t.C: - log.Info("Headers snapshot generation", "t", time.Since(tt), "block", i) - default: - } - hash, err = rawdb.ReadCanonicalHash(db, i) - if err != nil { - return err - } - header = rawdb.ReadHeaderRLP(db, hash, i) - if len(header) < 2 { - return fmt.Errorf("header %d is empty, %v", i, header) - } - - err = headerCursor.Append(dbutils.HeaderKey(i, hash), header) - if err != nil { - return err - } - } - return nil -} - -func OpenHeadersSnapshot(dbPath string) (kv.RoDB, error) { - return mdbx.NewMDBX(log.New()).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { - return kv.TableCfg{ - kv.Headers: kv.ChaindataTablesCfg[kv.Headers], - } - }).Readonly().Path(dbPath).Open() -} - -func RemoveHeadersData(db kv.RoDB, tx kv.RwTx, currentSnapshot, newSnapshot uint64) (err error) { - log.Info("Remove data", "from", currentSnapshot, "to", newSnapshot) - if _, ok := db.(snapshotdb.SnapshotUpdater); !ok { - return errors.New("db don't implement snapshotUpdater interface") - } - headerSnapshot := db.(snapshotdb.SnapshotUpdater).HeadersSnapshot() - if headerSnapshot == nil { - return errors.New("empty headers snapshot") - } - writeTX := tx.(snapshotdb.DBTX).DBTX() - c, err := writeTX.RwCursor(kv.Headers) - if err != nil { - return fmt.Errorf("get headers cursor %w", err) - } - - return headerSnapshot.View(context.Background(), func(tx kv.Tx) error { - c2, err := tx.Cursor(kv.Headers) - if err != nil { - return err - } - defer c2.Close() - defer c2.Close() - return ethdb.Walk(c2, dbutils.EncodeBlockNumber(currentSnapshot), 0, func(k, v []byte) (bool, error) { - innerErr := c.Delete(k, nil) - if innerErr != nil { - return false, fmt.Errorf("remove %v err:%w", common.Bytes2Hex(k), innerErr) - } - return true, nil - }) - }) -} diff --git a/turbo/snapshotsync/snapshot_builder.go b/turbo/snapshotsync/snapshot_builder.go index b63c7464fe015f8adc1d2e9a8e9107c2c5309aff..33443659ee2e73406425fc9d6c755ca8a817cbe0 100644 --- a/turbo/snapshotsync/snapshot_builder.go +++ b/turbo/snapshotsync/snapshot_builder.go @@ -3,360 +3,12 @@ package snapshotsync import ( "context" "encoding/binary" - "errors" - "io/ioutil" - "os" "path/filepath" "strconv" - "strings" - "sync/atomic" - "time" - "github.com/anacrolix/torrent/metainfo" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/dbutils" - "github.com/ledgerwatch/erigon/common/debug" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/snapshotdb" - "github.com/ledgerwatch/erigon/params" - "github.com/ledgerwatch/log/v3" ) -func NewMigrator(snapshotDir string, currentSnapshotBlock uint64, currentSnapshotInfohash []byte) *SnapshotMigrator { - return &SnapshotMigrator{ - snapshotsDir: snapshotDir, - HeadersCurrentSnapshot: currentSnapshotBlock, - HeadersNewSnapshotInfohash: currentSnapshotInfohash, - replaceChan: make(chan struct{}), - } -} - -type SnapshotMigrator struct { - snapshotsDir string - HeadersCurrentSnapshot uint64 - HeadersNewSnapshot uint64 - BodiesCurrentSnapshot uint64 - BodiesNewSnapshot uint64 - HeadersNewSnapshotInfohash []byte - BodiesNewSnapshotInfohash []byte - snapshotType string - started uint64 - replaceChan chan struct{} - replaced uint64 -} - -func (sm *SnapshotMigrator) AsyncStages(migrateToBlock uint64, logger log.Logger, dbi kv.RwDB, rwTX kv.Tx, bittorrent *Client, async bool) error { - if atomic.LoadUint64(&sm.started) > 0 { - return nil - } - - var snapshotName string - var snapshotHashKey []byte - if sm.HeadersCurrentSnapshot < migrateToBlock && atomic.LoadUint64(&sm.HeadersNewSnapshot) < migrateToBlock { - snapshotName = "headers" - snapshotHashKey = kv.CurrentHeadersSnapshotHash - } else if sm.BodiesCurrentSnapshot < migrateToBlock && atomic.LoadUint64(&sm.BodiesNewSnapshot) < migrateToBlock { - snapshotName = "bodies" - snapshotHashKey = kv.CurrentBodiesSnapshotHash - } else { - return nil - } - atomic.StoreUint64(&sm.started, 1) - sm.snapshotType = snapshotName - snapshotPath := SnapshotName(sm.snapshotsDir, sm.snapshotType, migrateToBlock) - switch sm.snapshotType { - case "headers": - sm.HeadersNewSnapshot = migrateToBlock - case "bodies": - sm.BodiesNewSnapshot = migrateToBlock - } - atomic.StoreUint64(&sm.replaced, 0) - - var initialStages []func(db kv.RoDB, tx kv.Tx, toBlock uint64) error - switch sm.snapshotType { - case "headers": - initialStages = []func(db kv.RoDB, tx kv.Tx, toBlock uint64) error{ - func(db kv.RoDB, tx kv.Tx, toBlock uint64) error { - return CreateHeadersSnapshot(context.Background(), tx, toBlock, snapshotPath) - }, - func(db kv.RoDB, tx kv.Tx, toBlock uint64) error { - //replace snapshot - if _, ok := db.(snapshotdb.SnapshotUpdater); !ok { - return errors.New("db don't implement snapshotUpdater interface") - } - snapshotKV, err := OpenHeadersSnapshot(snapshotPath) - if err != nil { - return err - } - - db.(snapshotdb.SnapshotUpdater).UpdateSnapshots("headers", snapshotKV, sm.replaceChan) - return nil - }, - } - case "bodies": - initialStages = []func(db kv.RoDB, tx kv.Tx, toBlock uint64) error{ - func(db kv.RoDB, tx kv.Tx, toBlock uint64) error { - return CreateBodySnapshot(tx, logger, toBlock, snapshotPath) - }, - func(db kv.RoDB, tx kv.Tx, toBlock uint64) error { - //replace snapshot - if _, ok := db.(snapshotdb.SnapshotUpdater); !ok { - return errors.New("db don't implement snapshotUpdater interface") - } - snapshotKV, err := OpenBodiesSnapshot(logger, snapshotPath) - if err != nil { - return err - } - - db.(snapshotdb.SnapshotUpdater).UpdateSnapshots("bodies", snapshotKV, sm.replaceChan) - return nil - }, - } - } - - btStages := func(shapshotHashKey []byte) []func(db kv.RoDB, tx kv.Tx, toBlock uint64) error { - return []func(db kv.RoDB, tx kv.Tx, toBlock uint64) error{ - func(db kv.RoDB, tx kv.Tx, toBlock uint64) error { - //todo headers infohash - var infohash []byte - var err error - infohash, err = tx.GetOne(kv.BittorrentInfo, shapshotHashKey) - if err != nil && !errors.Is(err, ethdb.ErrKeyNotFound) { - log.Error("Get infohash", "err", err, "block", toBlock) - return err - } - - if len(infohash) == 20 { - var hash metainfo.Hash - copy(hash[:], infohash) - log.Info("Stop seeding snapshot", "type", snapshotName, "infohash", hash.String()) - err = bittorrent.StopSeeding(hash) - if err != nil { - log.Error("Stop seeding", "err", err, "block", toBlock) - return err - } - log.Info("Stopped seeding snapshot", "type", snapshotName, "infohash", hash.String()) - } else { - log.Warn("Hasn't stopped snapshot", "infohash", common.Bytes2Hex(infohash)) - } - return nil - }, - func(db kv.RoDB, tx kv.Tx, toBlock uint64) error { - log.Info("Start seeding snapshot", "type", snapshotName) - seedingInfoHash, err := bittorrent.SeedSnapshot(snapshotName, snapshotPath) - if err != nil { - log.Error("Seeding", "err", err) - return err - } - - switch snapshotName { - case "bodies": - sm.BodiesNewSnapshotInfohash = seedingInfoHash[:] - case "headers": - sm.HeadersNewSnapshotInfohash = seedingInfoHash[:] - } - - log.Info("Started seeding snapshot", "type", snapshotName, "infohash", seedingInfoHash.String()) - atomic.StoreUint64(&sm.started, 2) - return nil - }, - } - } - - stages := append(initialStages, btStages(snapshotHashKey)...) - - startStages := func(tx kv.Tx) (innerErr error) { - defer func() { - if innerErr != nil { - atomic.StoreUint64(&sm.started, 0) - switch snapshotName { - case "headers": - atomic.StoreUint64(&sm.HeadersNewSnapshot, atomic.LoadUint64(&sm.HeadersCurrentSnapshot)) - case "bodies": - atomic.StoreUint64(&sm.BodiesNewSnapshot, atomic.LoadUint64(&sm.BodiesCurrentSnapshot)) - } - - log.Error("Error on stage. Rollback", "type", snapshotName, "err", innerErr) - } - }() - for i := range stages { - log.Info("ID", "i", i) - innerErr = stages[i](dbi, tx, migrateToBlock) - if innerErr != nil { - return innerErr - } - } - return nil - } - if async { - go func() { - //@todo think about possibility that write tx has uncommited data that we don't have in readTXs - defer debug.LogPanic() - readTX, err := dbi.BeginRo(context.Background()) - if err != nil { - log.Error("begin", "err", err) - return - } - defer readTX.Rollback() - - innerErr := startStages(readTX) - if innerErr != nil { - log.Error("Async stages", "err", innerErr) - return - } - }() - } else { - return startStages(rwTX) - } - return nil -} - -func (sm *SnapshotMigrator) Replaced() bool { - select { - case <-sm.replaceChan: - log.Info("Snapshot replaced") - atomic.StoreUint64(&sm.replaced, 1) - default: - } - - return atomic.LoadUint64(&sm.replaced) == 1 -} - -func (sm *SnapshotMigrator) SyncStages(migrateToBlock uint64, dbi kv.RwDB, rwTX kv.RwTx) error { - log.Info("SyncStages", "started", atomic.LoadUint64(&sm.started)) - - if atomic.LoadUint64(&sm.started) == 2 && sm.Replaced() { - var syncStages []func(db kv.RoDB, tx kv.RwTx, toBlock uint64) error - switch sm.snapshotType { - case "bodies": - syncStages = []func(db kv.RoDB, tx kv.RwTx, toBlock uint64) error{ - func(db kv.RoDB, tx kv.RwTx, toBlock uint64) error { - log.Info("Prune db", "new", atomic.LoadUint64(&sm.BodiesNewSnapshot)) - return RemoveBlocksData(db, tx, atomic.LoadUint64(&sm.BodiesNewSnapshot)) - }, - func(db kv.RoDB, tx kv.RwTx, toBlock uint64) error { - log.Info("Save bodies snapshot", "new", common.Bytes2Hex(sm.HeadersNewSnapshotInfohash), "new", atomic.LoadUint64(&sm.HeadersNewSnapshot)) - c, err := tx.RwCursor(kv.BittorrentInfo) - if err != nil { - return err - } - if len(sm.BodiesNewSnapshotInfohash) == 20 { - err = c.Put(kv.CurrentBodiesSnapshotHash, sm.BodiesNewSnapshotInfohash) - if err != nil { - return err - } - } - return c.Put(kv.CurrentBodiesSnapshotBlock, dbutils.EncodeBlockNumber(atomic.LoadUint64(&sm.BodiesNewSnapshot))) - }, - } - case "headers": - syncStages = []func(db kv.RoDB, tx kv.RwTx, toBlock uint64) error{ - func(db kv.RoDB, tx kv.RwTx, toBlock uint64) error { - log.Info("Prune headers db", "current", sm.HeadersCurrentSnapshot, "new", atomic.LoadUint64(&sm.HeadersNewSnapshot)) - return RemoveHeadersData(db, tx, sm.HeadersCurrentSnapshot, atomic.LoadUint64(&sm.HeadersNewSnapshot)) - }, - func(db kv.RoDB, tx kv.RwTx, toBlock uint64) error { - log.Info("Save headers snapshot", "new", common.Bytes2Hex(sm.HeadersNewSnapshotInfohash), "new", atomic.LoadUint64(&sm.HeadersNewSnapshot)) - c, err := tx.RwCursor(kv.BittorrentInfo) - if err != nil { - return err - } - if len(sm.HeadersNewSnapshotInfohash) == 20 { - err = c.Put(kv.CurrentHeadersSnapshotHash, sm.HeadersNewSnapshotInfohash) - if err != nil { - return err - } - } - return c.Put(kv.CurrentHeadersSnapshotBlock, dbutils.EncodeBlockNumber(atomic.LoadUint64(&sm.HeadersNewSnapshot))) - }, - } - } - - for i := range syncStages { - innerErr := syncStages[i](dbi, rwTX, migrateToBlock) - if innerErr != nil { - return innerErr - } - } - atomic.StoreUint64(&sm.started, 3) - - } - return nil -} - -func (sm *SnapshotMigrator) Final(tx kv.Tx) error { - if atomic.LoadUint64(&sm.started) < 3 { - return nil - } - - v, err := tx.GetOne(kv.BittorrentInfo, kv.CurrentHeadersSnapshotBlock) - if errors.Is(err, ethdb.ErrKeyNotFound) { - return nil - } - if err != nil { - return err - } - - if len(v) != 8 { - log.Error("Incorrect length", "ln", len(v)) - return nil - } - - if sm.HeadersCurrentSnapshot < atomic.LoadUint64(&sm.HeadersNewSnapshot) && sm.HeadersCurrentSnapshot != 0 { - oldSnapshotPath := SnapshotName(sm.snapshotsDir, "headers", sm.HeadersCurrentSnapshot) - log.Info("Removing old snapshot", "path", oldSnapshotPath) - tt := time.Now() - err = os.RemoveAll(oldSnapshotPath) - if err != nil { - log.Error("Remove snapshot", "err", err) - return err - } - log.Info("Removed old snapshot", "path", oldSnapshotPath, "t", time.Since(tt)) - } - - if binary.BigEndian.Uint64(v) == atomic.LoadUint64(&sm.HeadersNewSnapshot) { - atomic.StoreUint64(&sm.HeadersCurrentSnapshot, sm.HeadersNewSnapshot) - atomic.StoreUint64(&sm.started, 0) - atomic.StoreUint64(&sm.replaced, 0) - log.Info("CurrentHeadersSnapshotBlock commited", "block", binary.BigEndian.Uint64(v)) - return nil - } - return nil -} - -func (sm *SnapshotMigrator) RemoveNonCurrentSnapshots() error { - files, err := ioutil.ReadDir(sm.snapshotsDir) - if err != nil { - return err - } - - for i := range files { - snapshotName := files[i].Name() - if files[i].IsDir() && strings.HasPrefix(snapshotName, "headers") { - snapshotBlock, innerErr := strconv.ParseUint(strings.TrimPrefix(snapshotName, "headers"), 10, 64) - if innerErr != nil { - log.Warn("unknown snapshot", "name", snapshotName, "err", innerErr) - continue - } - if snapshotBlock != sm.HeadersCurrentSnapshot { - snapshotPath := filepath.Join(sm.snapshotsDir, snapshotName) - innerErr = os.RemoveAll(snapshotPath) - if innerErr != nil { - log.Warn("useless snapshot has't removed", "path", snapshotPath, "err", innerErr) - } - log.Info("removed useless snapshot", "path", snapshotPath) - } - } - } - return nil -} - -//CalculateEpoch - returns latest available snapshot block that possible to create. -func CalculateEpoch(block, epochSize uint64) uint64 { - return block - (block+params.FullImmutabilityThreshold)%epochSize -} - func SnapshotName(baseDir, name string, blockNum uint64) string { return filepath.Join(baseDir, name) + strconv.FormatUint(blockNum, 10) } diff --git a/turbo/snapshotsync/wrapdb.go b/turbo/snapshotsync/wrapdb.go index 781cabd1af5263f6f7dd4787390d4eb33f8b91c3..bc4de4dc43a377fd01362dcd4d4f2a15dd0a64d1 100644 --- a/turbo/snapshotsync/wrapdb.go +++ b/turbo/snapshotsync/wrapdb.go @@ -2,8 +2,6 @@ package snapshotsync import ( "context" - "encoding/binary" - "errors" "time" "github.com/ledgerwatch/erigon-lib/gointerfaces/snapshotsync" @@ -36,12 +34,6 @@ var ( } ) -//nolint -func WrapBySnapshotsFromDir(kv kv.RwDB, snapshotDir string, mode SnapshotMode) (kv.RwDB, error) { - //todo remove it - return nil, errors.New("deprecated") //nolint -} - func WrapBySnapshotsFromDownloader(db kv.RwDB, snapshots map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo) (kv.RwDB, error) { snKV := snapshotdb.NewSnapshotKV().DB(db) for k, v := range snapshots { @@ -69,34 +61,6 @@ func WrapBySnapshotsFromDownloader(db kv.RwDB, snapshots map[snapshotsync.Snapsh return snKV.Open(), nil } -func WrapSnapshots(chainDb kv.RwDB, snapshotsDir string) (kv.RwDB, error) { - var snapshotBlock uint64 - var hasSnapshotBlock bool - if err := chainDb.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(kv.BittorrentInfo, kv.CurrentHeadersSnapshotBlock) - if err != nil { - return err - } - hasSnapshotBlock = len(v) == 8 - if hasSnapshotBlock { - snapshotBlock = binary.BigEndian.Uint64(v) - } - return nil - }); err != nil { - return chainDb, err - } - - snKVOpts := snapshotdb.NewSnapshotKV().DB(chainDb) - if hasSnapshotBlock { - snKV, innerErr := OpenHeadersSnapshot(SnapshotName(snapshotsDir, "headers", snapshotBlock)) - if innerErr != nil { - return chainDb, innerErr - } - snKVOpts = snKVOpts.HeadersSnapshot(snKV) - } - return snKVOpts.Open(), nil -} - func DownloadSnapshots(torrentClient *Client, ExternalSnapshotDownloaderAddr string, networkID uint64, snapshotMode SnapshotMode, chainDb ethdb.Database) error { var downloadedSnapshots map[snapshotsync.SnapshotType]*snapshotsync.SnapshotsInfo if ExternalSnapshotDownloaderAddr != "" {