diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index fb00afe560234f94610eb386db128d36b7838bfc..dbfb11de12b688088e61ea870f83524d4b8e6cdd 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -416,13 +416,8 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, cfg SendersCfg, ctx con } } - blockFrom, blockTo, ok := snapshotsync.CanRetire(s.ForwardProgress, cfg.blockRetire.Snapshots()) - if !ok { - return nil - } - chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) - cfg.blockRetire.RetireBlocksInBackground(ctx, blockFrom, blockTo, *chainID, log.LvlInfo) + cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, *chainID, log.LvlInfo) return nil } diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index 54e58d95047aab0be1a8272a810189392bfbe098..0a0dbf9efc91fcb7374af07e53440a2e6bf912fc 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -260,7 +260,7 @@ func doRetireCommand(cliCtx *cli.Context) error { br := snapshotsync.NewBlockRetire(workers, tmpDir, snapshots, chainDB, nil, nil) for i := from; i < to; i += every { - br.RetireBlocksInBackground(ctx, i, i+every, *chainID, log.LvlInfo) + br.RetireBlocksInBackground(ctx, i, *chainID, log.LvlInfo) br.Wait() res := br.Result() if res.Err != nil { diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 1f723762ee2a2cce897e3ccd2a4edbc73c0db6c8..2858667d835b93b18431d2cfce757c773f7e99be 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -387,6 +387,11 @@ func (s *RoSnapshots) Reopen() error { if err != nil { return err } + var segmentsMax uint64 + var segmentsMaxSet bool + s.Bodies.segments = s.Bodies.segments[:0] + s.Headers.segments = s.Headers.segments[:0] + s.Txs.segments = s.Txs.segments[:0] for _, f := range files { { seg := &BodySegment{From: f.From, To: f.To} @@ -426,10 +431,14 @@ func (s *RoSnapshots) Reopen() error { } if f.To > 0 { - s.segmentsMax.Store(f.To - 1) + segmentsMax = f.To - 1 } else { - s.segmentsMax.Store(0) + segmentsMax = 0 } + segmentsMaxSet = true + } + if segmentsMaxSet { + s.segmentsMax.Store(segmentsMax) } s.segmentsReady.Store(true) @@ -473,6 +482,11 @@ func (s *RoSnapshots) ReopenSegments() error { if err != nil { return err } + s.Bodies.segments = s.Bodies.segments[:0] + s.Headers.segments = s.Headers.segments[:0] + s.Txs.segments = s.Txs.segments[:0] + var segmentsMax uint64 + var segmentsMaxSet bool for _, f := range files { { seg := &BodySegment{From: f.From, To: f.To} @@ -487,7 +501,6 @@ func (s *RoSnapshots) ReopenSegments() error { s.Bodies.segments = append(s.Bodies.segments, seg) } { - fmt.Printf("reopen segment: %d-%d\n", f.From, f.To) seg := &HeaderSegment{From: f.From, To: f.To} fileName := snap.SegmentFileName(f.From, f.To, snap.Headers) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) @@ -513,10 +526,14 @@ func (s *RoSnapshots) ReopenSegments() error { } if f.To > 0 { - s.segmentsMax.Store(f.To - 1) + segmentsMax = f.To - 1 } else { - s.segmentsMax.Store(0) + segmentsMax = 0 } + segmentsMaxSet = true + } + if segmentsMaxSet { + s.segmentsMax.Store(segmentsMax) } s.segmentsReady.Store(true) return nil @@ -897,7 +914,7 @@ func CanDeleteTo(curBlockNum uint64, snapshots *RoSnapshots) (blockTo uint64) { hardLimit := (curBlockNum/1_000)*1_000 - params.FullImmutabilityThreshold return cmp.Min(hardLimit, snapshots.BlocksAvailable()+1) } -func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, lvl log.Lvl) { +func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, chainID uint256.Int, lvl log.Lvl) { if br.working.Load() { // go-routine is still working return @@ -906,7 +923,6 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, // Prevent invocation for the same range twice, result needs to be cleared in the Result() function return } - br.result = nil br.wg.Add(1) go func() { @@ -914,6 +930,11 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, blockFrom, defer br.working.Store(false) defer br.wg.Done() + blockFrom, blockTo, ok := CanRetire(forwardProgress, br.Snapshots()) + if !ok { + return + } + err := retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier) br.result = &BlockRetireResult{ BlockFrom: blockFrom, @@ -933,8 +954,8 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 if err := DumpBlocks(ctx, blockFrom, blockTo, snap.DEFAULT_SEGMENT_SIZE, tmpDir, snapshots.Dir(), db, workers, lvl); err != nil { return fmt.Errorf("DumpBlocks: %w", err) } - if err := snapshots.ReopenSegments(); err != nil { - return fmt.Errorf("ReopenSegments: %w", err) + if err := snapshots.Reopen(); err != nil { + return fmt.Errorf("Reopen: %w", err) } idxWorkers := workers if idxWorkers > 4 { @@ -943,6 +964,9 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 if err := BuildIndices(ctx, snapshots, chainID, tmpDir, snapshots.IndicesMax(), idxWorkers, log.LvlInfo); err != nil { return err } + if err := snapshots.Reopen(); err != nil { + return fmt.Errorf("Reopen: %w", err) + } merger := NewMerger(tmpDir, workers, lvl, chainID, notifier) ranges := merger.FindMergeRanges(snapshots) if len(ranges) == 0 {