diff --git a/cmd/downloader/downloader/torrentcfg/torrentcfg.go b/cmd/downloader/downloader/torrentcfg/torrentcfg.go index 59e176fd4ac273c9406d7da2e6f584e79f4d4ab1..255221ebca5823c47f9b9e46074431bd1aa7c5fc 100644 --- a/cmd/downloader/downloader/torrentcfg/torrentcfg.go +++ b/cmd/downloader/downloader/torrentcfg/torrentcfg.go @@ -10,7 +10,6 @@ import ( "github.com/anacrolix/torrent/storage" "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon-lib/common/dir" - "golang.org/x/time/rate" ) // DefaultPieceSize - Erigon serves many big files, bigger pieces will reduce @@ -49,8 +48,8 @@ func New(snapshotsDir *dir.Rw, verbosity lg.Level, downloadRate, uploadRate data torrentConfig.UpnpID = torrentConfig.UpnpID + "leecher" // rates are divided by 2 - I don't know why it works, maybe bug inside torrent lib accounting - torrentConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited - torrentConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited + //torrentConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRate.Bytes()/2), 2*16384) // default: unlimited + //torrentConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited // debug if lg.Debug == verbosity { diff --git a/eth/backend.go b/eth/backend.go index 5538239c608470771310ad92efdeb3e5e2ba6498..0c563d3248d725d61eedc9498c0b3aadeff47a94 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -312,7 +312,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l var allSnapshots *snapshotsync.RoSnapshots if config.Snapshot.Enabled { allSnapshots = snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path) - allSnapshots.AsyncOpenAll(ctx) blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots) if len(stack.Config().DownloaderAddr) > 0 { diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index a32440a559aaaca713c1a3366fb15d7028c6bdbe..f4957b3ff8d176a58656d54cbae5de93af668098 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -572,7 +572,7 @@ func HeadersPOW( test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely useExternalTx bool, ) error { - if err := DownloadAndIndexSnapshotsIfNeed(s, ctx, tx, cfg); err != nil { + if err := DownloadAndIndexSnapshotsIfNeed(s, ctx, tx, cfg, initialCycle); err != nil { return err } @@ -987,22 +987,25 @@ func HeadersPrune(p *PruneState, tx kv.RwTx, cfg HeadersCfg, ctx context.Context return nil } -func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.RwTx, cfg HeadersCfg) error { +func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.RwTx, cfg HeadersCfg, initialCycle bool) error { if cfg.snapshots == nil { return nil } - if !cfg.snapshots.SegmentsReady() || cfg.snapshots.SegmentsAvailable() < cfg.snapshotHashesCfg.ExpectBlocks { + if !initialCycle { if err := WaitForDownloader(ctx, tx, cfg); err != nil { return err } - expect := cfg.snapshotHashesCfg.ExpectBlocks if err := cfg.snapshots.ReopenSegments(); err != nil { return fmt.Errorf("ReopenSegments: %w", err) } + expect := cfg.snapshotHashesCfg.ExpectBlocks if cfg.snapshots.SegmentsAvailable() < expect { return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.SegmentsAvailable()) } + if err := cfg.snapshots.ReopenIndices(); err != nil { + return fmt.Errorf("ReopenIndices: %w", err) + } } // Create .idx files @@ -1127,25 +1130,34 @@ func WaitForDownloader(ctx context.Context, tx kv.RwTx, cfg HeadersCfg) error { } break } + var prevBytesCompleted uint64 + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() // Print download progress until all segments are available +Loop: for { select { case <-ctx.Done(): return ctx.Err() - default: - } - if reply, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil { - log.Warn("Error while waiting for snapshots progress", "err", err) - } else if int(reply.Torrents) < len(snapshotsCfg.Preverified) { - log.Warn("Downloader has not enough snapshots (yet)") - } else if reply.Completed { - break - } else { - readiness := 100 * (float64(reply.BytesCompleted) / float64(reply.BytesTotal)) - log.Info("[Snapshots] download", "progress", fmt.Sprintf("%.2f%%", readiness)) + case <-logEvery.C: + if reply, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil { + log.Warn("Error while waiting for snapshots progress", "err", err) + } else if int(reply.Torrents) < len(snapshotsCfg.Preverified) { + log.Warn("Downloader has not enough snapshots (yet)") + } else if reply.Completed { + break Loop + } else { + readBytesPerSec := (reply.BytesCompleted - prevBytesCompleted) / uint64(logInterval.Seconds()) + //result.writeBytesPerSec += (result.bytesWritten - prevStats.bytesWritten) / int64(interval.Seconds()) + + readiness := 100 * (float64(reply.BytesCompleted) / float64(reply.BytesTotal)) + log.Info("[Snapshots] download", "progress", fmt.Sprintf("%.2f%%", readiness), + "download", libcommon.ByteCount(readBytesPerSec)+"/s", + ) + prevBytesCompleted = reply.BytesCompleted + } } - time.Sleep(10 * time.Second) } if err := tx.Put(kv.DatabaseInfo, []byte(readyKey), []byte{1}); err != nil { diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index fc972862b0d61e4d1501c6d9069e44c0c03d8170..905349e2f82358562c57e2fb0bbdb3c39bf8925b 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -397,7 +397,7 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context } if res := cfg.blockRetire.Result(); res != nil { if res.Err != nil { - return fmt.Errorf("[%s] retire blocks last error: %w", s.LogPrefix(), res.Err) + return fmt.Errorf("[%s] retire blocks last error: %w, fromBlock=%d, toBlock=%d", s.LogPrefix(), res.Err, res.BlockFrom, res.BlockTo) } } if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks { diff --git a/migrations/txs_begin_end.go b/migrations/txs_begin_end.go index 713c60322fc6a9f6fc5c5926914bfb5ef9b77a4e..8be749d241938365abc6b0f9406ea2cbb708fd96 100644 --- a/migrations/txs_begin_end.go +++ b/migrations/txs_begin_end.go @@ -238,7 +238,7 @@ func writeRawBodyDeprecated(db kv.StatelessRwTx, hash common.Hash, number uint64 return fmt.Errorf("failed to write body: %w", err) } if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId); err != nil { - return fmt.Errorf("failed to WriteRawTransactions: %w", err) + return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number) } return nil } diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index 810dae4d04dc3712e6edddf822a018ce21306bda..575a174ea65bc5c3decdeb55685a12f49a73a670 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -219,6 +219,10 @@ func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Ge buf := make([]byte, 128) if err := back.sn.Headers.View(func(segments []*HeaderSegment) error { for i := len(segments) - 1; i >= 0; i-- { + if segments[i].idxHeaderHash == nil { + continue + } + h, err = back.headerFromSnapshotByHash(hash, segments[i], buf) if err != nil { return err @@ -546,6 +550,9 @@ func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmoun func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) { for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] + if sn.IdxTxnId == nil || sn.IdxTxnHash == nil || sn.IdxTxnHash2BlockNum == nil { + continue + } reader := recsplit.NewIndexReader(sn.IdxTxnHash) offset := reader.Lookup(txnHash[:])