diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go index 4150b796a87977a6b93908cd1ecd2b153cea2e87..3f0709b72d69309fbf6dd32e10d677a66a1026d2 100644 --- a/cmd/headers/download/downloader.go +++ b/cmd/headers/download/downloader.go @@ -110,7 +110,7 @@ func RecvMessage(ctx context.Context, sentry proto_sentry.SentryClient, handleIn } //Deprecated - use stages.StageLoop -func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, controlServer *ControlServerImpl, notifier stagedsync.ChainEventNotifier) { +func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, controlServer *ControlServerImpl, notifier stagedsync.ChainEventNotifier, waitForDone chan struct{}) { stages.StageLoop( ctx, db, @@ -118,6 +118,7 @@ func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, c controlServer.hd, controlServer.chainConfig, notifier, + waitForDone, ) } diff --git a/eth/backend.go b/eth/backend.go index 931a102ac08bfebfa7727d4fa91ee2dcb0b93f0b..d5a35be0d03ce53684e696d04207cc85b53aa4c4 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -105,13 +105,15 @@ type Ethereum struct { minedBlocks chan *types.Block // downloader v2 fields - downloadV2Ctx context.Context - downloadV2Cancel context.CancelFunc - downloadServer *download.ControlServerImpl - sentryServer *download.SentryServerImpl - txPoolP2PServer *eth.TxPoolServer - sentries []proto_sentry.SentryClient - stagedSync2 *stagedsync.StagedSync + downloadV2Ctx context.Context + downloadV2Cancel context.CancelFunc + downloadServer *download.ControlServerImpl + sentryServer *download.SentryServerImpl + txPoolP2PServer *eth.TxPoolServer + sentries []proto_sentry.SentryClient + stagedSync2 *stagedsync.StagedSync + waitForStageLoopStop chan struct{} + waitForMiningStop chan struct{} } // New creates a new Ethereum object (including the @@ -171,15 +173,17 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu log.Info("Initialised chain configuration", "config", chainConfig) backend := &Ethereum{ - config: config, - chainDB: chainDb, - chainKV: chainDb.(ethdb.HasRwKV).RwKV(), - networkID: config.NetworkID, - etherbase: config.Miner.Etherbase, - p2pServer: stack.Server(), - torrentClient: torrentClient, - chainConfig: chainConfig, - genesisHash: genesisHash, + config: config, + chainDB: chainDb, + chainKV: chainDb.(ethdb.HasRwKV).RwKV(), + networkID: config.NetworkID, + etherbase: config.Miner.Etherbase, + p2pServer: stack.Server(), + torrentClient: torrentClient, + chainConfig: chainConfig, + genesisHash: genesisHash, + waitForStageLoopStop: make(chan struct{}), + waitForMiningStop: make(chan struct{}), } backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice) @@ -603,6 +607,7 @@ func (s *Ethereum) StartMining(kv ethdb.RwKV, pendingBlocksCh chan *types.Block, } go func() { + defer close(s.waitForMiningStop) newTransactions := make(chan core.NewTxsEvent, txChanSize) sub := s.txPool.SubscribeNewTxsEvent(newTransactions) defer sub.Unsubscribe() @@ -716,7 +721,7 @@ func (s *Ethereum) Start() error { if s.config.EnableDownloadV2 { go download.RecvMessage(s.downloadV2Ctx, s.sentries[0], s.downloadServer.HandleInboundMessage) go download.RecvUploadMessage(s.downloadV2Ctx, s.sentries[0], s.downloadServer.HandleInboundMessage) - go download.Loop(s.downloadV2Ctx, s.chainDB, s.stagedSync2, s.downloadServer, s.events) + go download.Loop(s.downloadV2Ctx, s.chainDB, s.stagedSync2, s.downloadServer, s.events, s.waitForStageLoopStop) } else { eth.StartENRUpdater(s.chainConfig, s.genesisHash, s.events, s.p2pServer.LocalNode()) // Start the networking layer and the light server if requested @@ -760,5 +765,11 @@ func (s *Ethereum) Stop() error { if s.txPool != nil { s.txPool.Stop() } + if s.config.EnableDownloadV2 { + <-s.waitForStageLoopStop + } + if s.config.Miner.Enabled { + <-s.waitForMiningStop + } return nil } diff --git a/eth/stagedsync/stage_headers_new.go b/eth/stagedsync/stage_headers_new.go index 9988d59e5a10eb5333e33536b4ebc9403b544c45..cb306c3f101ddc2957f89c45c1246bd781700b4d 100644 --- a/eth/stagedsync/stage_headers_new.go +++ b/eth/stagedsync/stage_headers_new.go @@ -240,7 +240,7 @@ func HeadersForward( } log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", headerInserter.GetHighest(), "age", common.PrettyAge(time.Unix(int64(headerInserter.GetHighestTimestamp()), 0))) if stopped { - return fmt.Errorf("interrupted") + return common.ErrStopped } stageHeadersGauge.Update(int64(headerInserter.GetHighest())) return nil @@ -261,6 +261,7 @@ func fixCanonicalChain(logPrefix string, height uint64, hash common.Hash, tx eth ancestor := rawdb.ReadHeader(tx, ancestorHash, ancestorHeight) if ancestor == nil { log.Error("ancestor nil", "height", ancestorHeight, "hash", ancestorHash) + return err } else { log.Debug("ancestor", "height", ancestorHeight, "hash", ancestorHash) } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 2f65facf9daff20bc02933fcab5d3b9b993d2c1b..20676e089fadd3ff976df70ac53f414bbbba2da3 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -435,6 +435,8 @@ func (hd *HeaderDownload) SetPreverifiedHashes(preverifiedHashes map[common.Hash } func (hd *HeaderDownload) RecoverFromDb(db ethdb.Database) error { + hd.lock.Lock() + defer hd.lock.Unlock() // Drain persistedLinksQueue and remove links for hd.persistedLinkQueue.Len() > 0 { link := heap.Pop(hd.persistedLinkQueue).(*Link) diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 05ca86a354a075ca2ae15fd1e0c6d0a351639aa5..e0261c5fcfe163ab895ea03b835a252c39c95077 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -3,12 +3,14 @@ package stages import ( "context" "encoding/binary" + "errors" "fmt" "runtime/debug" "strings" "time" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/core/vm" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" @@ -51,7 +53,9 @@ func StageLoop( hd *headerdownload.HeaderDownload, chainConfig *params.ChainConfig, notifier stagedsync.ChainEventNotifier, + waitForDone chan struct{}, ) { + defer close(waitForDone) initialCycle := true for { @@ -64,6 +68,10 @@ func StageLoop( // Estimate the current top height seen from the peer height := hd.TopSeenHeight() if err := StageLoopStep(ctx, db, sync, height, chainConfig, notifier, initialCycle); err != nil { + if errors.Is(err, common.ErrStopped) { + return + } + log.Error("Stage loop failure", "error", err) if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil { log.Error("Failed to recover header downoader", "error", recoveryErr)