diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index d14e61141f48061141a66969c24ee968c804c97b..f081fce994b99aa604c2448c14d8e4976ae11cec 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1091,6 +1091,9 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if workers < 1 { workers = 1 } + if workers > 4 { + workers = 4 + } if err := snapshotsync.BuildIndices(ctx, cfg.snapshots, cfg.snapshotDir, *chainID, cfg.tmpdir, cfg.snapshots.IndicesAvailable(), workers, log.LvlInfo); err != nil { return err } diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go index 26875003429e8c112f454e84ae080b8d02f138f9..5733aae49d9cdfa20b30c32f891193d73e83e54a 100644 --- a/eth/stagedsync/stage_txlookup.go +++ b/eth/stagedsync/stage_txlookup.go @@ -1,7 +1,6 @@ package stagedsync import ( - "bytes" "context" "encoding/binary" "fmt" @@ -12,10 +11,8 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/rawdb" - "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/ethdb/prune" - "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/snapshotsync" ) @@ -85,7 +82,7 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c startBlock++ } startKey := dbutils.EncodeBlockNumber(startBlock) - if err = TxLookupTransform(logPrefix, tx, startKey, dbutils.EncodeBlockNumber(endBlock), quitCh, cfg); err != nil { + if err = txnLookupTransform(logPrefix, tx, startKey, dbutils.EncodeBlockNumber(endBlock), quitCh, cfg); err != nil { return err } if err = s.Update(tx, endBlock); err != nil { @@ -100,9 +97,9 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c return nil } -func TxLookupTransform(logPrefix string, tx kv.RwTx, startKey, endKey []byte, quitCh <-chan struct{}, cfg TxLookupCfg) error { +func txnLookupTransform(logPrefix string, tx kv.RwTx, startKey, endKey []byte, quitCh <-chan struct{}, cfg TxLookupCfg) error { bigNum := new(big.Int) - return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k []byte, v []byte, next etl.ExtractNextFunc) error { + return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error { blocknum := binary.BigEndian.Uint64(k) blockHash := common.BytesToHash(v) body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blocknum) @@ -118,7 +115,7 @@ func TxLookupTransform(logPrefix string, tx kv.RwTx, startKey, endKey []byte, qu if cfg.isBor { borPrefix := []byte("matic-bor-receipt-") - if err := next(k, crypto.Keccak256(append(borPrefix, append(k, blockHash[:]...)...)), bigNum.SetUint64(blocknum).Bytes()); err != nil { + if err := next(k, crypto.Keccak256(append(append(borPrefix, k...), v...)), bigNum.SetUint64(blocknum).Bytes()); err != nil { return err } } @@ -135,7 +132,6 @@ func TxLookupTransform(logPrefix string, tx kv.RwTx, startKey, endKey []byte, qu } func UnwindTxLookup(u *UnwindState, s *StageState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Context) (err error) { - quitCh := ctx.Done() if s.BlockNumber <= u.UnwindPoint { return nil } @@ -148,7 +144,9 @@ func UnwindTxLookup(u *UnwindState, s *StageState, tx kv.RwTx, cfg TxLookupCfg, defer tx.Rollback() } - if err := unwindTxLookup(u, s, tx, cfg, quitCh); err != nil { + // end key needs to be s.BlockNumber + 1 and not s.BlockNumber, because + // the keys in BlockBody table always have hash after the block number + if err := deleteTxLookupRange(tx, s.LogPrefix(), u.UnwindPoint+1, s.BlockNumber+1, ctx, cfg); err != nil { return err } if err := u.Done(tx); err != nil { @@ -162,45 +160,6 @@ func UnwindTxLookup(u *UnwindState, s *StageState, tx kv.RwTx, cfg TxLookupCfg, return nil } -func unwindTxLookup(u *UnwindState, s *StageState, tx kv.RwTx, cfg TxLookupCfg, quitCh <-chan struct{}) error { - reader := bytes.NewReader(nil) - logPrefix := s.LogPrefix() - return etl.Transform(logPrefix, tx, kv.BlockBody, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error { - body := new(types.BodyForStorage) - reader.Reset(v) - if err := rlp.Decode(reader, body); err != nil { - return fmt.Errorf("rlp decode err: %w", err) - } - - txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId+1, body.TxAmount-2) - if err != nil { - return err - } - for _, txn := range txs { - if err = next(k, txn.Hash().Bytes(), nil); err != nil { - return err - } - } - - if cfg.isBor { - borPrefix := []byte("matic-bor-receipt-") - if err := next(k, crypto.Keccak256(append(borPrefix, k...)), nil); err != nil { - return err - } - } - return nil - }, etl.IdentityLoadFunc, etl.TransformArgs{ - Quit: quitCh, - ExtractStartKey: dbutils.EncodeBlockNumber(u.UnwindPoint + 1), - // end key needs to be s.BlockNumber + 1 and not s.BlockNumber, because - // the keys in BlockBody table always have hash after the block number - ExtractEndKey: dbutils.EncodeBlockNumber(s.BlockNumber + 1), - LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) { - return []interface{}{"block", binary.BigEndian.Uint64(k)} - }, - }) -} - func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Context) (err error) { logPrefix := s.LogPrefix() useExternalTx := tx != nil @@ -217,7 +176,7 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte if cfg.prune.TxIndex.Enabled() { to := cfg.prune.TxIndex.PruneTo(s.ForwardProgress) if blockFrom < to { - if err = pruneTxLookup(tx, logPrefix, cfg.tmpdir, blockFrom, to, ctx, cfg); err != nil { + if err = deleteTxLookupRange(tx, logPrefix, blockFrom, to, ctx, cfg); err != nil { return err } } @@ -227,7 +186,7 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte } else if cfg.snapshots != nil && cfg.snapshots.Cfg().Enabled { to := snapshotsync.CanDeleteTo(s.ForwardProgress, cfg.snapshots) if blockFrom < to { - if err = pruneTxLookup(tx, logPrefix, cfg.tmpdir, blockFrom, to, ctx, cfg); err != nil { + if err = deleteTxLookupRange(tx, logPrefix, blockFrom, to, ctx, cfg); err != nil { return err } } @@ -244,27 +203,24 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte return nil } -func pruneTxLookup(tx kv.RwTx, logPrefix, tmpDir string, pruneFrom, pruneTo uint64, ctx context.Context, cfg TxLookupCfg) error { - reader := bytes.NewReader(nil) - return etl.Transform(logPrefix, tx, kv.BlockBody, kv.TxLookup, tmpDir, func(k, v []byte, next etl.ExtractNextFunc) error { - body := new(types.BodyForStorage) - reader.Reset(v) - if err := rlp.Decode(reader, body); err != nil { - return fmt.Errorf("rlp decode: %w", err) +// deleteTxLookupRange [from,to) +func deleteTxLookupRange(tx kv.RwTx, logPrefix string, blockFrom, blockTo uint64, ctx context.Context, cfg TxLookupCfg) error { + return etl.Transform(logPrefix, tx, kv.HeaderCanonical, kv.TxLookup, cfg.tmpdir, func(k, v []byte, next etl.ExtractNextFunc) error { + blocknum := binary.BigEndian.Uint64(k) + blockHash := common.BytesToHash(v) + body := rawdb.ReadCanonicalBodyWithTransactions(tx, blockHash, blocknum) + if body == nil { + return fmt.Errorf("empty block body %d, hash %x", blocknum, v) } - txs, err := rawdb.CanonicalTransactions(tx, body.BaseTxId+1, body.TxAmount-2) - if err != nil { - return err - } - for _, txn := range txs { + for _, txn := range body.Transactions { if err := next(k, txn.Hash().Bytes(), nil); err != nil { return err } } if cfg.isBor { borPrefix := []byte("matic-bor-receipt-") - if err := next(k, crypto.Keccak256(append(borPrefix, k...)), nil); err != nil { + if err := next(k, crypto.Keccak256(append(append(borPrefix, k...), v...)), nil); err != nil { return err } } @@ -272,8 +228,8 @@ func pruneTxLookup(tx kv.RwTx, logPrefix, tmpDir string, pruneFrom, pruneTo uint return nil }, etl.IdentityLoadFunc, etl.TransformArgs{ Quit: ctx.Done(), - ExtractStartKey: dbutils.EncodeBlockNumber(pruneFrom), - ExtractEndKey: dbutils.EncodeBlockNumber(pruneTo), + ExtractStartKey: dbutils.EncodeBlockNumber(blockFrom), + ExtractEndKey: dbutils.EncodeBlockNumber(blockTo), LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) { return []interface{}{"block", binary.BigEndian.Uint64(k)} }, diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 3d74f847e31b7698ca87de9d3963664d6d4bfbca..b21aff1e525890f7bc0bdbb1c2bb468c2b89a454 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -135,6 +135,9 @@ func doIndicesCommand(cliCtx *cli.Context) error { if workers < 1 { workers = 1 } + if workers > 4 { + workers = 4 + } if err := rebuildIndices(ctx, chainDB, cfg, rwSnapshotDir, tmpDir, from, workers); err != nil { log.Error("Error", "err", err) } diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 5613bdea9a721a7ea6fe47377dab1ab33602d145..8f6f986eae205d77642fae16d75c2d95ed42d152 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -1129,7 +1129,11 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 if err := snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenSegments: %w", err) } - if err := BuildIndices(ctx, snapshots, rwSnapshotDir, chainID, tmpDir, snapshots.IndicesAvailable(), workers, log.LvlInfo); err != nil { + idxWorkers := workers + if idxWorkers > 4 { + idxWorkers = 4 + } + if err := BuildIndices(ctx, snapshots, rwSnapshotDir, chainID, tmpDir, snapshots.IndicesAvailable(), idxWorkers, log.LvlInfo); err != nil { return err } merger := NewMerger(tmpDir, workers, lvl, chainID, notifier)