From 74a7d7c75a958b59d089ade4e348494bcad477ac Mon Sep 17 00:00:00 2001 From: Andrew Ashikhmin <34320705+yperbasis@users.noreply.github.com> Date: Tue, 22 Mar 2022 17:49:12 +0100 Subject: [PATCH] Refactor PoS downloader (#3717) * Remove blockHeight arg from FeedHeaderPoW * Revert "Remove blockHeight arg from FeedHeaderPoW" This reverts commit 9bf92921db11cd4e13386fcee29f30241d070fc6. * Move PayloadMessage & ForkChoiceMessage to package engineapi * RequestList instead of newPayloadCh & forkChoiceCh * Introduce request status * Send reply only when request status is New * Move BeaconRequestList to HeaderDownload * Don't SetFetching when PoS (Fetching means handling newBlockHashes) * Merge Syncing & Synced into DataWasMissing * Introduce SyncStatus * onlyNew arg in WaitForRequest * Move waitingForBeaconChain into RequestList * Interrupt instead of skipCycleHack * Introduce timeout for PoS headers * Split downloadMissingPoSHeaders * Move StartPoSDownloader into HeaderDownload * Move Stopping interrupt to StartPoSDownloader * Move stopping PayloadStatus back to HeadersPOS * cleanUpPoSDownload * Post-merge fix * TestBogusForkchoice * TestPoSDownloader * requestStatus in attemptPoSDownload * Broadcast in SetStatus * Cosmetics * attemptPoSDownload -> schedulePoSDownload * Demote some logs to Trace --- cmd/integration/commands/stages.go | 3 +- cmd/rpcdaemon/rpcdaemontest/test_util.go | 2 +- eth/backend.go | 17 +- eth/stagedsync/stage_headers.go | 346 ++++++++---------- ethdb/privateapi/engine_test.go | 33 +- ethdb/privateapi/ethbackend.go | 93 ++--- go.mod | 1 + go.sum | 2 + turbo/engineapi/request_list.go | 167 +++++++++ turbo/stages/headerdownload/header_algos.go | 180 +++++++-- .../headerdownload/header_data_struct.go | 27 +- turbo/stages/mock_sentry.go | 26 +- turbo/stages/sentry_mock_test.go | 111 +++++- turbo/stages/stageloop.go | 6 - 14 files changed, 667 insertions(+), 347 deletions(-) create mode 100644 turbo/engineapi/request_list.go diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 05af41ff76..525f67d0ba 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1168,8 +1168,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg, chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir, - nil, nil, nil, nil, nil, - allSn, + nil, nil, allSn, ) if err != nil { panic(err) diff --git a/cmd/rpcdaemon/rpcdaemontest/test_util.go b/cmd/rpcdaemon/rpcdaemontest/test_util.go index cd2c009818..b115076242 100644 --- a/cmd/rpcdaemon/rpcdaemontest/test_util.go +++ b/cmd/rpcdaemon/rpcdaemontest/test_util.go @@ -222,7 +222,7 @@ func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *g ethashApi := apis[1].Service.(*ethash.API) server := grpc.NewServer() - remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, nil, nil, nil, false)) + remote.RegisterETHBACKENDServer(server, privateapi.NewEthBackendServer(ctx, nil, m.DB, m.Notifications.Events, snapshotsync.NewBlockReader(), nil, nil, nil, nil, false)) txpool.RegisterTxpoolServer(server, m.TxPoolGrpcServer) txpool.RegisterMiningServer(server, privateapi.NewMiningServer(ctx, &IsMiningMock{}, ethashApi)) starknet.RegisterCAIROVMServer(server, &starknet.UnimplementedCAIROVMServer{}) diff --git a/eth/backend.go b/eth/backend.go index 0c563d3248..74b2db644b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -27,7 +27,6 @@ import ( "sort" "strconv" "sync" - "sync/atomic" "time" "github.com/holiman/uint256" @@ -134,11 +133,6 @@ type Ethereum struct { txPool2Send *txpool2.Send txPool2GrpcServer txpool_proto.TxpoolServer notifyMiningAboutNewTxs chan struct{} - // When we receive something here, it means that the beacon chain transitioned - // to proof-of-stake so we start reverse syncing from the block - newPayloadCh chan privateapi.PayloadMessage - forkChoiceCh chan privateapi.ForkChoiceMessage - waitingForBeaconChain uint32 // atomic boolean flag downloadProtocols *downloader.Protocols } @@ -373,8 +367,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l miner := stagedsync.NewMiningState(&config.Miner) backend.pendingBlocks = miner.PendingResultCh backend.minedBlocks = miner.MiningResultCh - backend.newPayloadCh = make(chan privateapi.PayloadMessage) - backend.forkChoiceCh = make(chan privateapi.ForkChoiceMessage) // proof-of-work mining mining := stagedsync.New( @@ -409,11 +401,11 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l block := <-miningStatePos.MiningResultPOSCh return block, nil } - atomic.StoreUint32(&backend.waitingForBeaconChain, 0) + // Initialize ethbackend ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, - blockReader, chainConfig, backend.newPayloadCh, backend.forkChoiceCh, backend.sentryControlServer.Hd.PayloadStatusCh, - &backend.waitingForBeaconChain, backend.sentryControlServer.Hd.SkipCycleHack, assembleBlockPOS, config.Miner.EnabledPOS) + blockReader, chainConfig, backend.sentryControlServer.Hd.BeaconRequestList, backend.sentryControlServer.Hd.PayloadStatusCh, + assembleBlockPOS, config.Miner.EnabledPOS) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) // If we enabled the proposer flag we initiates the block proposing thread if config.Miner.EnabledPOS && chainConfig.TerminalTotalDifficulty != nil { @@ -493,12 +485,13 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB, stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty, backend.sentryControlServer, tmpdir, backend.notifications.Accumulator, - backend.newPayloadCh, backend.forkChoiceCh, &backend.waitingForBeaconChain, backend.downloaderClient, allSnapshots) if err != nil { return nil, err } + backend.sentryControlServer.Hd.StartPoSDownloader(backend.sentryCtx, backend.sentryControlServer.SendHeaderRequest, backend.sentryControlServer.Penalize) + emptyBadHash := config.BadBlockHash == common.Hash{} if !emptyBadHash { var badBlockHeader *types.Header diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 8cf57ccc56..8e6a327775 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -7,7 +7,6 @@ import ( "fmt" "math/big" "runtime" - "sync/atomic" "time" "github.com/c2h5oh/datasize" @@ -29,6 +28,7 @@ import ( "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" @@ -41,20 +41,17 @@ import ( const ShortPoSReorgThresholdBlocks = 10 type HeadersCfg struct { - db kv.RwDB - hd *headerdownload.HeaderDownload - bodyDownload *bodydownload.BodyDownload - chainConfig params.ChainConfig - headerReqSend func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool) - announceNewHashes func(context.Context, []headerdownload.Announce) - penalize func(context.Context, []headerdownload.PenaltyItem) - batchSize datasize.ByteSize - noP2PDiscovery bool - tmpdir string - snapshotDir *dir.Rw - newPayloadCh chan privateapi.PayloadMessage - forkChoiceCh chan privateapi.ForkChoiceMessage - waitingForBeaconChain *uint32 // atomic boolean flag + db kv.RwDB + hd *headerdownload.HeaderDownload + bodyDownload *bodydownload.BodyDownload + chainConfig params.ChainConfig + headerReqSend func(context.Context, *headerdownload.HeaderRequest) (enode.ID, bool) + announceNewHashes func(context.Context, []headerdownload.Announce) + penalize func(context.Context, []headerdownload.PenaltyItem) + batchSize datasize.ByteSize + noP2PDiscovery bool + tmpdir string + snapshotDir *dir.Rw snapshots *snapshotsync.RoSnapshots snapshotHashesCfg *snapshothashes.Config @@ -72,9 +69,6 @@ func StageHeadersCfg( penalize func(context.Context, []headerdownload.PenaltyItem), batchSize datasize.ByteSize, noP2PDiscovery bool, - newPayloadCh chan privateapi.PayloadMessage, - forkChoiceCh chan privateapi.ForkChoiceMessage, - waitingForBeaconChain *uint32, // atomic boolean flag snapshots *snapshotsync.RoSnapshots, snapshotDownloader proto_downloader.DownloaderClient, blockReader interfaces.FullBlockReader, @@ -82,24 +76,21 @@ func StageHeadersCfg( snapshotDir *dir.Rw, ) HeadersCfg { return HeadersCfg{ - db: db, - hd: headerDownload, - bodyDownload: bodyDownload, - chainConfig: chainConfig, - headerReqSend: headerReqSend, - announceNewHashes: announceNewHashes, - penalize: penalize, - batchSize: batchSize, - tmpdir: tmpdir, - noP2PDiscovery: noP2PDiscovery, - newPayloadCh: newPayloadCh, - forkChoiceCh: forkChoiceCh, - waitingForBeaconChain: waitingForBeaconChain, - snapshots: snapshots, - snapshotDownloader: snapshotDownloader, - blockReader: blockReader, - snapshotHashesCfg: snapshothashes.KnownConfig(chainConfig.ChainName), - snapshotDir: snapshotDir, + db: db, + hd: headerDownload, + bodyDownload: bodyDownload, + chainConfig: chainConfig, + headerReqSend: headerReqSend, + announceNewHashes: announceNewHashes, + penalize: penalize, + batchSize: batchSize, + tmpdir: tmpdir, + noP2PDiscovery: noP2PDiscovery, + snapshots: snapshots, + snapshotDownloader: snapshotDownloader, + blockReader: blockReader, + snapshotHashesCfg: snapshothashes.KnownConfig(chainConfig.ChainName), + snapshotDir: snapshotDir, } } @@ -182,58 +173,55 @@ func HeadersPOS( ) error { log.Info(fmt.Sprintf("[%s] Waiting for Beacon Chain...", s.LogPrefix())) - atomic.StoreUint32(cfg.waitingForBeaconChain, 1) - defer atomic.StoreUint32(cfg.waitingForBeaconChain, 0) + onlyNewRequests := cfg.hd.PosStatus() == headerdownload.Syncing + interrupt, requestId, requestWithStatus := cfg.hd.BeaconRequestList.WaitForRequest(onlyNewRequests) - var payloadMessage privateapi.PayloadMessage - var forkChoiceMessage privateapi.ForkChoiceMessage + cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader}) + headerInserter := headerdownload.NewHeaderInserter(s.LogPrefix(), nil, s.BlockNumber, cfg.blockReader) - // Decide what kind of action we need to take place - forkChoiceInsteadOfNewPayload := false - select { - case <-ctx.Done(): - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: errors.New("server is stopping")} - if !useExternalTx { - return tx.Commit() + if interrupt != engineapi.None { + if interrupt == engineapi.Stopping { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: errors.New("server is stopping")} + } + if interrupt == engineapi.Synced { + verifyAndSaveDownloadedPoSHeaders(tx, cfg, headerInserter) } - return nil - case <-cfg.hd.SkipCycleHack: if !useExternalTx { return tx.Commit() } return nil - case forkChoiceMessage = <-cfg.forkChoiceCh: - forkChoiceInsteadOfNewPayload = true - case payloadMessage = <-cfg.newPayloadCh: } - atomic.StoreUint32(cfg.waitingForBeaconChain, 0) + request := requestWithStatus.Message + status := requestWithStatus.Status - cfg.hd.ClearPendingPayloadStatus() - - cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader}) + // Decide what kind of action we need to take place + var payloadMessage *engineapi.PayloadMessage + forkChoiceMessage, forkChoiceInsteadOfNewPayload := request.(*engineapi.ForkChoiceMessage) + if !forkChoiceInsteadOfNewPayload { + payloadMessage = request.(*engineapi.PayloadMessage) + } - headerInserter := headerdownload.NewHeaderInserter(s.LogPrefix(), nil, s.BlockNumber, cfg.blockReader) + cfg.hd.ClearPendingPayloadStatus() if forkChoiceInsteadOfNewPayload { - handleForkChoice(&forkChoiceMessage, s, u, ctx, tx, cfg, headerInserter) + handleForkChoice(forkChoiceMessage, status, requestId, s, u, ctx, tx, cfg, headerInserter) } else { - if err := handleNewPayload(&payloadMessage, s, ctx, tx, cfg, headerInserter); err != nil { + if err := handleNewPayload(payloadMessage, status, requestId, s, ctx, tx, cfg, headerInserter); err != nil { return err } } if !useExternalTx { - if err := tx.Commit(); err != nil { - return err - } + return tx.Commit() } - return nil } func handleForkChoice( - forkChoiceMessage *privateapi.ForkChoiceMessage, + forkChoiceMessage *engineapi.ForkChoiceMessage, + requestStatus engineapi.RequestStatus, + requestId int, s *StageState, u Unwinder, ctx context.Context, @@ -247,9 +235,12 @@ func handleForkChoice( currentHeadHash := rawdb.ReadHeadHeaderHash(tx) if currentHeadHash == headerHash { // no-op log.Info(fmt.Sprintf("[%s] Fork choice no-op", s.LogPrefix())) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ - Status: remote.EngineStatus_VALID, - LatestValidHash: currentHeadHash, + cfg.hd.BeaconRequestList.Remove(requestId) + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + Status: remote.EngineStatus_VALID, + LatestValidHash: currentHeadHash, + } } return nil } @@ -257,29 +248,23 @@ func handleForkChoice( header, err := rawdb.ReadHeaderByHash(tx, headerHash) if err != nil { log.Warn(fmt.Sprintf("[%s] Fork choice err", s.LogPrefix()), "err", err) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + cfg.hd.BeaconRequestList.Remove(requestId) + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + } return err } - repliedWithSyncStatus := false if header == nil { log.Info(fmt.Sprintf("[%s] Fork choice missing header", s.LogPrefix())) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} - repliedWithSyncStatus = true - hashToDownload := headerHash heighToDownload := cfg.hd.TopSeenHeight() // approximate - success, err := downloadMissingPoSHeaders(hashToDownload, heighToDownload, s, ctx, tx, cfg, headerInserter) - if err != nil { - return err - } - if !success { - return nil - } - - header = rawdb.ReadHeader(tx, headerHash, headerInserter.GetHighest()) + schedulePoSDownload(requestStatus, requestId, hashToDownload, heighToDownload, s, cfg) + return nil } + cfg.hd.BeaconRequestList.Remove(requestId) + headerNumber := header.Number.Uint64() cfg.hd.UpdateTopSeenHeightPoS(headerNumber) @@ -288,14 +273,14 @@ func handleForkChoice( parent := rawdb.ReadHeader(tx, header.ParentHash, headerNumber-1) forkingPoint, err = headerInserter.ForkingPoint(tx, header, parent) if err != nil { - if !repliedWithSyncStatus { + if requestStatus == engineapi.New { cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} } return err } } - if !repliedWithSyncStatus { + if requestStatus == engineapi.New { if headerNumber-forkingPoint <= ShortPoSReorgThresholdBlocks { log.Info(fmt.Sprintf("[%s] Short range re-org", s.LogPrefix()), "headerNumber", headerNumber, "forkingPoint", forkingPoint) // TODO(yperbasis): what if some bodies are missing and we have to download them? @@ -316,7 +301,9 @@ func handleForkChoice( } func handleNewPayload( - payloadMessage *privateapi.PayloadMessage, + payloadMessage *engineapi.PayloadMessage, + requestStatus engineapi.RequestStatus, + requestId int, s *StageState, ctx context.Context, tx kv.RwTx, @@ -334,60 +321,56 @@ func handleNewPayload( existingCanonicalHash, err := rawdb.ReadCanonicalHash(tx, headerNumber) if err != nil { log.Warn(fmt.Sprintf("[%s] New payload err", s.LogPrefix()), "err", err) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + cfg.hd.BeaconRequestList.Remove(requestId) + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + } return err } if existingCanonicalHash != (common.Hash{}) && headerHash == existingCanonicalHash { log.Info(fmt.Sprintf("[%s] New payload: previously received valid header", s.LogPrefix())) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ - Status: remote.EngineStatus_VALID, - LatestValidHash: headerHash, + cfg.hd.BeaconRequestList.Remove(requestId) + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + Status: remote.EngineStatus_VALID, + LatestValidHash: headerHash, + } } return nil } - // If we have the parent then we can move on with the stagedsync - parent := rawdb.ReadHeader(tx, header.ParentHash, headerNumber-1) - transactions, err := types.DecodeTransactions(payloadMessage.Body.Transactions) if err != nil { log.Warn("Error during Beacon transaction decoding", "err", err.Error()) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: header.ParentHash, // TODO(yperbasis): potentially wrong when parent is nil - ValidationError: err, + cfg.hd.BeaconRequestList.Remove(requestId) + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + Status: remote.EngineStatus_INVALID, + LatestValidHash: header.ParentHash, // TODO(yperbasis): potentially wrong when parent is nil + ValidationError: err, + } } return nil } - if parent != nil { - log.Info(fmt.Sprintf("[%s] New payload begin verification", s.LogPrefix())) - success, err := verifyAndSaveNewPoSHeader(s, tx, cfg, header, headerInserter) - log.Info(fmt.Sprintf("[%s] New payload verification ended", s.LogPrefix()), "success", success, "err", err) - if err != nil || !success { - return err - } - } else { - log.Info(fmt.Sprintf("[%s] New payload missing parent", s.LogPrefix())) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} + parent := rawdb.ReadHeader(tx, header.ParentHash, headerNumber-1) + if parent == nil { + log.Info(fmt.Sprintf("[%s] New payload missing parent", s.LogPrefix())) hashToDownload := header.ParentHash heightToDownload := headerNumber - 1 - success, err := downloadMissingPoSHeaders(hashToDownload, heightToDownload, s, ctx, tx, cfg, headerInserter) - if err != nil || !success { - return err - } + schedulePoSDownload(requestStatus, requestId, hashToDownload, heightToDownload, s, cfg) + return nil + } - if verificationErr := cfg.hd.VerifyHeader(header); verificationErr != nil { - log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "err", verificationErr) - return nil - } + cfg.hd.BeaconRequestList.Remove(requestId) - err = headerInserter.FeedHeaderPoS(tx, header, headerHash) - if err != nil { - return err - } + log.Info(fmt.Sprintf("[%s] New payload begin verification", s.LogPrefix())) + success, err := verifyAndSaveNewPoSHeader(requestStatus, s, tx, cfg, header, headerInserter) + log.Info(fmt.Sprintf("[%s] New payload verification ended", s.LogPrefix()), "success", success, "err", err) + if err != nil || !success { + return err } if cfg.bodyDownload != nil { @@ -399,6 +382,7 @@ func handleNewPayload( } func verifyAndSaveNewPoSHeader( + requestStatus engineapi.RequestStatus, s *StageState, tx kv.RwTx, cfg HeadersCfg, @@ -410,24 +394,30 @@ func verifyAndSaveNewPoSHeader( if verificationErr := cfg.hd.VerifyHeader(header); verificationErr != nil { log.Warn("Verification failed for header", "hash", headerHash, "height", headerNumber, "err", verificationErr) - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ - Status: remote.EngineStatus_INVALID, - LatestValidHash: header.ParentHash, - ValidationError: verificationErr, + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + Status: remote.EngineStatus_INVALID, + LatestValidHash: header.ParentHash, + ValidationError: verificationErr, + } } return } err = headerInserter.FeedHeaderPoS(tx, header, headerHash) if err != nil { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + } return } headBlockHash := rawdb.ReadHeadBlockHash(tx) if headBlockHash == header.ParentHash { // OK, we're on the canonical chain - cfg.hd.SetPendingPayloadStatus(headerHash) + if requestStatus == engineapi.New { + cfg.hd.SetPendingPayloadStatus(headerHash) + } logEvery := time.NewTicker(logInterval) defer logEvery.Stop() @@ -449,7 +439,11 @@ func verifyAndSaveNewPoSHeader( } } else { // Side chain or something weird - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED} + // TODO(yperbasis): considered non-canonical because some missing headers were donloaded but not canonized + // Or it's not a problem because forkChoice is updated frequently? + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_ACCEPTED} + } // No canonization, HeadHeaderHash & StageProgress are not updated } @@ -457,85 +451,40 @@ func verifyAndSaveNewPoSHeader( return } -func downloadMissingPoSHeaders( - hashToDownloadPoS common.Hash, +func schedulePoSDownload( + requestStatus engineapi.RequestStatus, + requestId int, + hashToDownload common.Hash, heightToDownload uint64, s *StageState, - ctx context.Context, - tx kv.RwTx, cfg HeadersCfg, - headerInserter *headerdownload.HeaderInserter, -) (success bool, err error) { - cfg.hd.SetPOSSync(true) - err = cfg.hd.ReadProgressFromDb(tx) - if err != nil { - return +) { + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{Status: remote.EngineStatus_SYNCING} } + cfg.hd.BeaconRequestList.SetStatus(requestId, engineapi.DataWasMissing) - cfg.hd.SetFetching(true) - cfg.hd.SetHashToDownloadPoS(hashToDownloadPoS) - cfg.hd.SetHeightToDownloadPoS(heightToDownload) + if cfg.hd.PosStatus() != headerdownload.Idle { + log.Trace(fmt.Sprintf("[%s] Postponing PoS download since another one is in progress", s.LogPrefix()), "height", heightToDownload, "hash", hashToDownload) + return + } - log.Info(fmt.Sprintf("[%s] Downloading PoS headers...", s.LogPrefix()), "height", heightToDownload, "hash", hashToDownloadPoS) + log.Info(fmt.Sprintf("[%s] Downloading PoS headers...", s.LogPrefix()), "height", heightToDownload, "hash", hashToDownload, "requestId", requestId) - stopped := false - prevProgress := uint64(0) + cfg.hd.SetPOSSync(true) + cfg.hd.SetRequestId(requestId) + cfg.hd.SetHeaderToDownloadPoS(hashToDownload, heightToDownload) + //nolint headerCollector := etl.NewCollector(s.LogPrefix(), cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize)) - defer headerCollector.Close() - cfg.hd.SetHeadersCollector(headerCollector) - // Cleanup after we finish backward sync - defer func() { - cfg.hd.SetHeadersCollector(nil) - cfg.hd.Unsync() - cfg.hd.SetFetching(false) - }() - - logEvery := time.NewTicker(logInterval) - defer logEvery.Stop() - - var req headerdownload.HeaderRequest - for !stopped { - sentToPeer := false - maxRequests := 4096 - for !sentToPeer && !stopped && maxRequests != 0 { - // TODO(yperbasis): handle the case when are not able to sync a chain - req = cfg.hd.RequestMoreHeadersForPOS() - _, sentToPeer = cfg.headerReqSend(ctx, &req) - maxRequests-- - } + // headerCollector is closed in verifyAndSaveDownloadedPoSHeaders, thus nolint - if cfg.hd.Synced() { - stopped = true - } - // Sleep and check for logs - timer := time.NewTimer(2 * time.Millisecond) - select { - case <-ctx.Done(): - stopped = true - case <-logEvery.C: - if prevProgress == 0 { - prevProgress = cfg.hd.Progress() - } else if cfg.hd.Progress() <= prevProgress { - diff := prevProgress - cfg.hd.Progress() - log.Info("Wrote Block Headers backwards", "now", cfg.hd.Progress(), - "blk/sec", float64(diff)/float64(logInterval/time.Second)) - prevProgress = cfg.hd.Progress() - } - case <-timer.C: - log.Trace("RequestQueueTime (header) ticked") - } - // Cleanup timer - timer.Stop() - } - - log.Info(fmt.Sprintf("[%s] Downloading PoS headers finished", s.LogPrefix())) + cfg.hd.SetHeadersCollector(headerCollector) - // If the user stopped it, we don't update anything - if !cfg.hd.Synced() { - return - } + cfg.hd.SetPosStatus(headerdownload.Syncing) +} +func verifyAndSaveDownloadedPoSHeaders(tx kv.RwTx, cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter) error { headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error { var h types.Header if err := rlp.DecodeBytes(value, &h); err != nil { @@ -548,17 +497,24 @@ func downloadMissingPoSHeaders( return headerInserter.FeedHeaderPoS(tx, &h, h.Hash()) } - err = headerCollector.Load(tx, kv.Headers, headerLoadFunc, etl.TransformArgs{ + err := cfg.hd.HeadersCollector().Load(tx, kv.Headers, headerLoadFunc, etl.TransformArgs{ LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) { return []interface{}{"block", binary.BigEndian.Uint64(k)} }, }) + if err != nil { - return + log.Warn("Removing beacon request due to", "err", err, "requestId", cfg.hd.RequestId()) + cfg.hd.BeaconRequestList.Remove(cfg.hd.RequestId()) + } else { + log.Info("PoS headers verified and saved", "requestId", cfg.hd.RequestId()) } - success = true - return + cfg.hd.HeadersCollector().Close() + cfg.hd.SetHeadersCollector(nil) + cfg.hd.SetPosStatus(headerdownload.Idle) + + return err } // HeadersPOW progresses Headers stage for Proof-of-Work headers diff --git a/ethdb/privateapi/engine_test.go b/ethdb/privateapi/engine_test.go index c98b6bebe4..42903819ad 100644 --- a/ethdb/privateapi/engine_test.go +++ b/ethdb/privateapi/engine_test.go @@ -2,7 +2,6 @@ package privateapi import ( "context" - "sync/atomic" "testing" "github.com/ledgerwatch/erigon-lib/gointerfaces" @@ -13,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/stretchr/testify/require" ) @@ -89,13 +89,11 @@ func TestMockDownloadRequest(t *testing.T) { require := require.New(t) makeTestDb(ctx, db) - newPayloadCh := make(chan PayloadMessage) - forkChoiceCh := make(chan ForkChoiceMessage) + beaconRequestList := engineapi.NewRequestList() statusCh := make(chan PayloadStatus) - waitingForHeaders := uint32(1) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, newPayloadCh, forkChoiceCh, statusCh, &waitingForHeaders, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) var err error var reply *remote.EnginePayloadStatus @@ -106,9 +104,8 @@ func TestMockDownloadRequest(t *testing.T) { done <- true }() - <-newPayloadCh + beaconRequestList.WaitForRequest(true) statusCh <- PayloadStatus{Status: remote.EngineStatus_SYNCING} - atomic.StoreUint32(&waitingForHeaders, 0) <-done require.NoError(err) require.Equal(reply.Status, remote.EngineStatus_SYNCING) @@ -151,13 +148,11 @@ func TestMockValidExecution(t *testing.T) { makeTestDb(ctx, db) - newPayloadCh := make(chan PayloadMessage) - forkChoiceCh := make(chan ForkChoiceMessage) + beaconRequestList := engineapi.NewRequestList() statusCh := make(chan PayloadStatus) - waitingForHeaders := uint32(1) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, newPayloadCh, forkChoiceCh, statusCh, &waitingForHeaders, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) var err error var reply *remote.EnginePayloadStatus @@ -168,7 +163,7 @@ func TestMockValidExecution(t *testing.T) { done <- true }() - <-newPayloadCh + beaconRequestList.WaitForRequest(true) statusCh <- PayloadStatus{ Status: remote.EngineStatus_VALID, @@ -189,13 +184,11 @@ func TestMockInvalidExecution(t *testing.T) { makeTestDb(ctx, db) - newPayloadCh := make(chan PayloadMessage) - forkChoiceCh := make(chan ForkChoiceMessage) + beaconRequestList := engineapi.NewRequestList() statusCh := make(chan PayloadStatus) - waitingForHeaders := uint32(1) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, newPayloadCh, forkChoiceCh, statusCh, &waitingForHeaders, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{TerminalTotalDifficulty: common.Big1}, beaconRequestList, statusCh, nil, false) var err error var reply *remote.EnginePayloadStatus @@ -206,7 +199,7 @@ func TestMockInvalidExecution(t *testing.T) { done <- true }() - <-newPayloadCh + beaconRequestList.WaitForRequest(true) // Simulate invalid status statusCh <- PayloadStatus{ Status: remote.EngineStatus_INVALID, @@ -227,13 +220,11 @@ func TestNoTTD(t *testing.T) { makeTestDb(ctx, db) - newPayloadCh := make(chan PayloadMessage) - forkChoiceCh := make(chan ForkChoiceMessage) + beaconRequestList := engineapi.NewRequestList() statusCh := make(chan PayloadStatus) - waitingForHeaders := uint32(1) events := NewEvents() - backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, newPayloadCh, forkChoiceCh, statusCh, &waitingForHeaders, nil, nil, false) + backend := NewEthBackendServer(ctx, nil, db, events, nil, ¶ms.ChainConfig{}, beaconRequestList, statusCh, nil, false) var err error diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go index 06c3a19e00..89d4e55c2f 100644 --- a/ethdb/privateapi/ethbackend.go +++ b/ethdb/privateapi/ethbackend.go @@ -7,7 +7,6 @@ import ( "math/big" "sort" "sync" - "sync/atomic" "time" "github.com/holiman/uint256" @@ -24,6 +23,7 @@ import ( "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/rpc" + "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/log/v3" "google.golang.org/protobuf/types/known/emptypb" ) @@ -54,20 +54,15 @@ type EthBackendServer struct { // Block proposing for proof-of-stake payloadId uint64 pendingPayloads map[uint64]*pendingPayload - // Send new Beacon Chain payloads to staged sync - newPayloadCh chan<- PayloadMessage - // Send Beacon Chain fork choice updates to staged sync - forkChoiceCh chan<- ForkChoiceMessage + // Send Beacon Chain requests to staged sync + requestList *engineapi.RequestList // Replies to newPayload & forkchoice requests - statusCh <-chan PayloadStatus - // Determines whether stageloop is processing a block or not - waitingForBeaconChain *uint32 // atomic boolean flag - skipCycleHack chan struct{} // with this channel we tell the stagedsync that we want to assemble a block - assemblePayloadPOS assemblePayloadPOSFunc - proposing bool - syncCond *sync.Cond // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time - shutdown bool - logsFilter *LogsFilterAggregator + statusCh <-chan PayloadStatus + assemblePayloadPOS assemblePayloadPOSFunc + proposing bool + syncCond *sync.Cond // Engine API is asynchronous, we want to avoid CL to call different APIs at the same time + shutdown bool + logsFilter *LogsFilterAggregator } type EthBackend interface { @@ -87,31 +82,17 @@ type PayloadStatus struct { CriticalError error } -// The message we are going to send to the stage sync in NewPayload -type PayloadMessage struct { - Header *types.Header - Body *types.RawBody -} - -// The message we are going to send to the stage sync in ForkchoiceUpdated -type ForkChoiceMessage struct { - HeadBlockHash common.Hash - SafeBlockHash common.Hash - FinalizedBlockHash common.Hash -} - type pendingPayload struct { block *types.Block built bool } func NewEthBackendServer(ctx context.Context, eth EthBackend, db kv.RwDB, events *Events, blockReader interfaces.BlockAndTxnReader, - config *params.ChainConfig, newPayloadCh chan<- PayloadMessage, forkChoiceCh chan<- ForkChoiceMessage, statusCh <-chan PayloadStatus, - waitingForBeaconChain *uint32, skipCycleHack chan struct{}, assemblePayloadPOS assemblePayloadPOSFunc, proposing bool, + config *params.ChainConfig, requestList *engineapi.RequestList, statusCh <-chan PayloadStatus, + assemblePayloadPOS assemblePayloadPOSFunc, proposing bool, ) *EthBackendServer { s := &EthBackendServer{ctx: ctx, eth: eth, events: events, db: db, blockReader: blockReader, config: config, - newPayloadCh: newPayloadCh, forkChoiceCh: forkChoiceCh, statusCh: statusCh, waitingForBeaconChain: waitingForBeaconChain, - pendingPayloads: make(map[uint64]*pendingPayload), skipCycleHack: skipCycleHack, + requestList: requestList, statusCh: statusCh, pendingPayloads: make(map[uint64]*pendingPayload), assemblePayloadPOS: assemblePayloadPOS, proposing: proposing, syncCond: sync.NewCond(&sync.Mutex{}), logsFilter: NewLogsFilterAggregator(), } @@ -251,27 +232,27 @@ func convertPayloadStatus(payloadStatus *PayloadStatus) *remote.EnginePayloadSta func (s *EthBackendServer) stageLoopIsBusy() bool { for i := 0; i < 20; i++ { - if atomic.LoadUint32(s.waitingForBeaconChain) == 0 { + if !s.requestList.IsWaiting() { // This might happen, for example, in the following scenario: // 1) CL sends NewPayload and immediately after that ForkChoiceUpdated. // 2) We happily process NewPayload and stage loop is at the end. // 3) We start processing ForkChoiceUpdated, // but the stage loop hasn't moved yet from the end to the beginning of HeadersPOS - // and thus waitingForBeaconChain is not set yet. + // and thus requestList.WaitForRequest() is not called yet. // TODO(yperbasis): find a more elegant solution time.Sleep(5 * time.Millisecond) } } - return atomic.LoadUint32(s.waitingForBeaconChain) == 0 + return !s.requestList.IsWaiting() } // EngineNewPayloadV1, validates and possibly executes payload func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.ExecutionPayload) (*remote.EnginePayloadStatus, error) { - log.Info("[NewPayload] acquiring lock") + log.Trace("[NewPayload] acquiring lock") s.syncCond.L.Lock() defer s.syncCond.L.Unlock() - log.Info("[NewPayload] lock acquired") + log.Trace("[NewPayload] lock acquired") if s.config.TerminalTotalDifficulty == nil { log.Error("[NewPayload] not a proof-of-stake chain") @@ -319,18 +300,18 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E // The process of validating a payload on the canonical chain MUST NOT be affected by an active sync process on a side branch of the block tree. // For example, if side branch B is SYNCING but the requisite data for validating a payload from canonical branch A is available, client software MUST initiate the validation process. // https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.6/src/engine/specification.md#payload-validation - log.Info("[NewPayload] stage loop is busy") + log.Trace("[NewPayload] stage loop is busy") return &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, nil } log.Info("[NewPayload] sending block", "height", header.Number, "hash", common.Hash(blockHash)) - s.newPayloadCh <- PayloadMessage{ + s.requestList.AddPayloadRequest(&engineapi.PayloadMessage{ Header: &header, Body: &types.RawBody{ Transactions: req.Transactions, Uncles: nil, }, - } + }) payloadStatus := <-s.statusCh log.Info("[NewPayload] got reply", "payloadStatus", payloadStatus) @@ -346,10 +327,10 @@ func (s *EthBackendServer) EngineNewPayloadV1(ctx context.Context, req *types2.E func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.EngineGetPayloadRequest) (*types2.ExecutionPayload, error) { // TODO(yperbasis): getPayload should stop block assembly if that's currently in fly - log.Info("[GetPayload] acquiring lock") + log.Trace("[GetPayload] acquiring lock") s.syncCond.L.Lock() defer s.syncCond.L.Unlock() - log.Info("[GetPayload] lock acquired") + log.Trace("[GetPayload] lock acquired") if !s.proposing { return nil, fmt.Errorf("execution layer not running as a proposer. enable proposer by taking out the --proposer.disable flag on startup") @@ -402,10 +383,10 @@ func (s *EthBackendServer) EngineGetPayloadV1(ctx context.Context, req *remote.E // EngineForkChoiceUpdatedV1, either states new block head or request the assembling of a new block func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *remote.EngineForkChoiceUpdatedRequest) (*remote.EngineForkChoiceUpdatedReply, error) { - log.Info("[ForkChoiceUpdated] acquiring lock") + log.Trace("[ForkChoiceUpdated] acquiring lock") s.syncCond.L.Lock() defer s.syncCond.L.Unlock() - log.Info("[ForkChoiceUpdated] lock acquired") + log.Trace("[ForkChoiceUpdated] lock acquired") if s.config.TerminalTotalDifficulty == nil { return nil, fmt.Errorf("not a proof-of-stake chain") @@ -417,20 +398,20 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r // https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.6/src/engine/specification.md#specification-1 if s.stageLoopIsBusy() { - log.Info("[ForkChoiceUpdated] stage loop is busy") + log.Trace("[ForkChoiceUpdated] stage loop is busy") return &remote.EngineForkChoiceUpdatedReply{ PayloadStatus: &remote.EnginePayloadStatus{Status: remote.EngineStatus_SYNCING}, }, nil } - forkChoiceMessage := ForkChoiceMessage{ + forkChoiceMessage := engineapi.ForkChoiceMessage{ HeadBlockHash: gointerfaces.ConvertH256ToHash(req.ForkchoiceState.HeadBlockHash), SafeBlockHash: gointerfaces.ConvertH256ToHash(req.ForkchoiceState.SafeBlockHash), FinalizedBlockHash: gointerfaces.ConvertH256ToHash(req.ForkchoiceState.FinalizedBlockHash), } log.Info("[ForkChoiceUpdated] sending forkChoiceMessage", "head", forkChoiceMessage.HeadBlockHash) - s.forkChoiceCh <- forkChoiceMessage + s.requestList.AddForkChoiceRequest(&forkChoiceMessage) payloadStatus := <-s.statusCh log.Info("[ForkChoiceUpdated] got reply", "payloadStatus", payloadStatus) @@ -472,7 +453,7 @@ func (s *EthBackendServer) EngineForkChoiceUpdatedV1(ctx context.Context, req *r s.pendingPayloads[s.payloadId] = &pendingPayload{block: types.NewBlock(emptyHeader, nil, nil, nil)} - log.Info("[ForkChoiceUpdated] unpause assemble process") + log.Trace("[ForkChoiceUpdated] unpause assemble process") s.syncCond.Broadcast() // successfully assembled the payload and assigned the correct id @@ -498,10 +479,10 @@ func (s *EthBackendServer) evictOldPendingPayloads() { func (s *EthBackendServer) StartProposer() { go func() { - log.Info("[Proposer] acquiring lock") + log.Trace("[Proposer] acquiring lock") s.syncCond.L.Lock() defer s.syncCond.L.Unlock() - log.Info("[Proposer] lock acquired") + log.Trace("[Proposer] lock acquired") for { var blockToBuild *types.Block @@ -529,13 +510,13 @@ func (s *EthBackendServer) StartProposer() { } } - log.Info("[Proposer] Wait until we have to process new payloads") + log.Trace("[Proposer] Wait until we have to process new payloads") s.syncCond.Wait() - log.Info("[Proposer] Wait finished") + log.Trace("[Proposer] Wait finished") } // Tell the stage headers to leave space for the write transaction for mining stages - s.skipCycleHack <- struct{}{} + s.requestList.Interrupt(engineapi.Yield) param := core.BlockProposerParametersPOS{ ParentHash: blockToBuild.ParentHash(), @@ -544,11 +525,11 @@ func (s *EthBackendServer) StartProposer() { SuggestedFeeRecipient: blockToBuild.Header().Coinbase, } - log.Info("[Proposer] starting assembling...") + log.Trace("[Proposer] starting assembling...") s.syncCond.L.Unlock() block, err := s.assemblePayloadPOS(¶m) s.syncCond.L.Lock() - log.Info("[Proposer] payload assembled") + log.Trace("[Proposer] payload assembled") if err != nil { log.Warn("Error during block assembling", "err", err.Error()) @@ -564,10 +545,10 @@ func (s *EthBackendServer) StartProposer() { } func (s *EthBackendServer) StopProposer() { - log.Info("[StopProposer] acquiring lock") + log.Trace("[StopProposer] acquiring lock") s.syncCond.L.Lock() defer s.syncCond.L.Unlock() - log.Info("[StopProposer] lock acquired") + log.Trace("[StopProposer] lock acquired") s.shutdown = true s.syncCond.Broadcast() diff --git a/go.mod b/go.mod index ee018dd7e7..a7f1a63055 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498 github.com/edsrzf/mmap-go v1.0.0 github.com/emicklei/dot v0.16.0 + github.com/emirpasic/gods v1.12.0 github.com/fatih/color v1.13.0 github.com/fjl/gencodec v0.0.0-20191126094850-e283372f291f github.com/fortytw2/leaktest v1.3.0 // indirect diff --git a/go.sum b/go.sum index 8861fcd7e0..9ec9c4468e 100644 --- a/go.sum +++ b/go.sum @@ -337,6 +337,8 @@ github.com/elliotchance/orderedmap v1.3.0/go.mod h1:8hdSl6jmveQw8ScByd3AaNHNk51R github.com/elliotchance/orderedmap v1.4.0/go.mod h1:wsDwEaX5jEoyhbs7x93zk2H/qv0zwuhg4inXhDkYqys= github.com/emicklei/dot v0.16.0 h1:7PseyizTgeQ/aSF1eo4LcEfWlQSlzamFZpzY/nMB9EY= github.com/emicklei/dot v0.16.0/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/turbo/engineapi/request_list.go b/turbo/engineapi/request_list.go new file mode 100644 index 0000000000..9e36560e1e --- /dev/null +++ b/turbo/engineapi/request_list.go @@ -0,0 +1,167 @@ +package engineapi + +import ( + "sync" + "sync/atomic" + + "github.com/emirpasic/gods/maps/treemap" + + "github.com/ledgerwatch/erigon/common" + "github.com/ledgerwatch/erigon/core/types" +) + +// The message we are going to send to the stage sync in NewPayload +type PayloadMessage struct { + Header *types.Header + Body *types.RawBody +} + +// The message we are going to send to the stage sync in ForkchoiceUpdated +type ForkChoiceMessage struct { + HeadBlockHash common.Hash + SafeBlockHash common.Hash + FinalizedBlockHash common.Hash +} + +type RequestStatus int + +const ( // RequestStatus values + New = iota + DataWasMissing +) + +type RequestWithStatus struct { + Message interface{} // *PayloadMessage or *ForkChoiceMessage + Status RequestStatus +} + +type Interrupt int + +const ( // Interrupt values + None = iota + Yield // e.g. yield RW transaction to block building + Synced + Stopping +) + +type RequestList struct { + requestId int + requests *treemap.Map // map[int]*RequestWithStatus + interrupt Interrupt + waiting uint32 + syncCond *sync.Cond +} + +func NewRequestList() *RequestList { + rl := &RequestList{ + requests: treemap.NewWithIntComparator(), + syncCond: sync.NewCond(&sync.Mutex{}), + } + return rl +} + +func (rl *RequestList) AddPayloadRequest(message *PayloadMessage) { + rl.syncCond.L.Lock() + defer rl.syncCond.L.Unlock() + + rl.requestId++ + + rl.requests.Put(rl.requestId, &RequestWithStatus{ + Message: message, + Status: New, + }) + + rl.syncCond.Broadcast() +} + +func (rl *RequestList) AddForkChoiceRequest(message *ForkChoiceMessage) { + rl.syncCond.L.Lock() + defer rl.syncCond.L.Unlock() + + rl.requestId++ + + // purge previous fork choices that are still syncing + rl.requests = rl.requests.Select(func(key interface{}, value interface{}) bool { + req := value.(*RequestWithStatus) + _, isForkChoice := req.Message.(*ForkChoiceMessage) + return req.Status == New || !isForkChoice + }) + // TODO(yperbasis): potentially purge some non-syncing old fork choices? + + rl.requests.Put(rl.requestId, &RequestWithStatus{ + Message: message, + Status: New, + }) + + rl.syncCond.Broadcast() +} + +func (rl *RequestList) firstRequest(onlyNew bool) (id int, request *RequestWithStatus) { + foundKey, foundValue := rl.requests.Min() + if onlyNew { + foundKey, foundValue = rl.requests.Find(func(key interface{}, value interface{}) bool { + return value.(*RequestWithStatus).Status == New + }) + } + if foundKey != nil { + return foundKey.(int), foundValue.(*RequestWithStatus) + } + return 0, nil +} + +func (rl *RequestList) WaitForRequest(onlyNew bool) (interrupt Interrupt, id int, request *RequestWithStatus) { + rl.syncCond.L.Lock() + defer rl.syncCond.L.Unlock() + + atomic.StoreUint32(&rl.waiting, 1) + defer atomic.StoreUint32(&rl.waiting, 0) + + for { + interrupt = rl.interrupt + if interrupt != None { + if interrupt != Stopping { + // clear the interrupt + rl.interrupt = None + } + return + } + id, request = rl.firstRequest(onlyNew) + if request != nil { + return + } + rl.syncCond.Wait() + } +} + +func (rl *RequestList) IsWaiting() bool { + return atomic.LoadUint32(&rl.waiting) != 0 +} + +func (rl *RequestList) Interrupt(kind Interrupt) { + rl.syncCond.L.Lock() + defer rl.syncCond.L.Unlock() + + rl.interrupt = kind + + rl.syncCond.Broadcast() +} + +func (rl *RequestList) Remove(id int) { + rl.syncCond.L.Lock() + defer rl.syncCond.L.Unlock() + + rl.requests.Remove(id) + // no need to broadcast +} + +func (rl *RequestList) SetStatus(id int, status RequestStatus) { + rl.syncCond.L.Lock() + defer rl.syncCond.L.Unlock() + + value, found := rl.requests.Get(id) + if found { + value.(*RequestWithStatus).Status = status + } + + rl.syncCond.Broadcast() +} diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 37197d8bf9..66152f4322 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -29,6 +29,7 @@ import ( "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params/networkname" "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/engineapi" ) // Implements sort.Interface so we can sort the incoming header in the message by block height @@ -623,17 +624,34 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime uint64) (*HeaderRequest return nil, penalties } -func (hd *HeaderDownload) RequestMoreHeadersForPOS() HeaderRequest { - hd.lock.RLock() - defer hd.lock.RUnlock() - // Assemble the request - return HeaderRequest{ - Hash: hd.hashToDownloadPoS, - Number: hd.heightToDownloadPoS, +func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime uint64) (timeout bool, request *HeaderRequest, penalties []PenaltyItem) { + anchor := hd.posAnchor + if anchor == nil { + log.Trace("No PoS anchor") + return + } + + // Only process the anchors for which the nextRetryTime has already come + if anchor.nextRetryTime > currentTime { + return + } + + timeout = anchor.timeouts >= 10 + if timeout { + penalties = []PenaltyItem{{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}} + return + } + + // Request ancestors + request = &HeaderRequest{ + Anchor: anchor, + Hash: anchor.parentHash, + Number: anchor.blockHeight - 1, Length: 192, Skip: 0, Reverse: true, } + return } func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime, timeout uint64) { @@ -792,10 +810,14 @@ func (hd *HeaderDownload) InsertHeaders(hf FeedHeaderFunc, terminalTotalDifficul return hd.highestInDb >= hd.preverifiedHeight && hd.topSeenHeightPoW > 0 && hd.highestInDb >= hd.topSeenHeightPoW, nil } -func (hd *HeaderDownload) SetHashToDownloadPoS(hash common.Hash) { +func (hd *HeaderDownload) SetHeaderToDownloadPoS(hash common.Hash, height uint64) { hd.lock.Lock() defer hd.lock.Unlock() - hd.hashToDownloadPoS = hash + + hd.posAnchor = &Anchor{ + parentHash: hash, + blockHeight: height + 1, + } } func (hd *HeaderDownload) ProcessSegmentPOS(segment ChainSegment, tx kv.Getter) error { @@ -811,27 +833,33 @@ func (hd *HeaderDownload) ProcessSegmentPOS(segment ChainSegment, tx kv.Getter) log.Trace("Collecting...", "from", segment[0].Number, "to", segment[len(segment)-1].Number, "len", len(segment)) for _, segmentFragment := range segment { header := segmentFragment.Header - if header.Hash() != hd.hashToDownloadPoS { - return fmt.Errorf("unexpected hash %x (expected %x)", header.Hash(), hd.hashToDownloadPoS) + if header.Hash() != hd.posAnchor.parentHash { + return fmt.Errorf("unexpected hash %x (expected %x)", header.Hash(), hd.posAnchor.parentHash) } - if err := hd.headersCollector.Collect(dbutils.HeaderKey(header.Number.Uint64(), header.Hash()), segmentFragment.HeaderRaw); err != nil { + headerNumber := header.Number.Uint64() + if err := hd.headersCollector.Collect(dbutils.HeaderKey(headerNumber, header.Hash()), segmentFragment.HeaderRaw); err != nil { return err } - hd.hashToDownloadPoS = header.ParentHash - hd.heightToDownloadPoS = header.Number.Uint64() - 1 - - hh, err := hd.headerReader.Header(context.Background(), tx, hd.hashToDownloadPoS, hd.heightToDownloadPoS) + hh, err := hd.headerReader.Header(context.Background(), tx, header.ParentHash, headerNumber-1) if err != nil { return err } if hh != nil { - hd.synced = true + log.Trace("Synced", "requestId", hd.requestId) + hd.posAnchor = nil + hd.posStatus = Synced + hd.BeaconRequestList.Interrupt(engineapi.Synced) return nil } - if hd.heightToDownloadPoS == 0 { + hd.posAnchor = &Anchor{ + parentHash: header.ParentHash, + blockHeight: headerNumber, + } + + if headerNumber <= 1 { return errors.New("wrong genesis in PoS sync") } } @@ -850,8 +878,8 @@ func (hd *HeaderDownload) GrabAnnounces() []Announce { func (hd *HeaderDownload) Progress() uint64 { hd.lock.RLock() defer hd.lock.RUnlock() - if hd.posSync { - return hd.heightToDownloadPoS + if hd.posSync && hd.posAnchor != nil { + return hd.posAnchor.blockHeight - 1 } else { return hd.highestInDb } @@ -1170,16 +1198,16 @@ func (hd *HeaderDownload) SetFetching(fetching bool) { hd.fetching = fetching } -func (hd *HeaderDownload) SetHeightToDownloadPoS(heightToDownloadPoS uint64) { +func (hd *HeaderDownload) SetPosStatus(status SyncStatus) { hd.lock.Lock() defer hd.lock.Unlock() - hd.heightToDownloadPoS = heightToDownloadPoS + hd.posStatus = status } -func (hd *HeaderDownload) Unsync() { - hd.lock.Lock() - defer hd.lock.Unlock() - hd.synced = false +func (hd *HeaderDownload) HeadersCollector() *etl.Collector { + hd.lock.RLock() + defer hd.lock.RUnlock() + return hd.headersCollector } func (hd *HeaderDownload) SetHeadersCollector(collector *etl.Collector) { @@ -1200,10 +1228,10 @@ func (hd *HeaderDownload) POSSync() bool { return hd.posSync } -func (hd *HeaderDownload) Synced() bool { +func (hd *HeaderDownload) PosStatus() SyncStatus { hd.lock.RLock() defer hd.lock.RUnlock() - return hd.synced + return hd.posStatus } func (hd *HeaderDownload) RequestChaining() bool { @@ -1256,6 +1284,18 @@ func (hd *HeaderDownload) ClearPendingHeader() { hd.pendingHeaderHeight = 0 } +func (hd *HeaderDownload) RequestId() int { + hd.lock.RLock() + defer hd.lock.RUnlock() + return hd.requestId +} + +func (hd *HeaderDownload) SetRequestId(requestId int) { + hd.lock.Lock() + defer hd.lock.Unlock() + hd.requestId = requestId +} + func (hd *HeaderDownload) AddMinedHeader(header *types.Header) error { buf := bytes.NewBuffer(nil) if err := header.EncodeRLP(buf); err != nil { @@ -1316,6 +1356,90 @@ func (hd *HeaderDownload) AddHeaderFromSnapshot(tx kv.Tx, n uint64, r interfaces return nil } +const ( + logInterval = 30 * time.Second +) + +func (hd *HeaderDownload) cleanUpPoSDownload() { + if hd.headersCollector != nil { + hd.headersCollector.Close() + hd.headersCollector = nil + } + hd.posStatus = Idle +} + +func (hd *HeaderDownload) StartPoSDownloader( + ctx context.Context, + headerReqSend func(context.Context, *HeaderRequest) (enode.ID, bool), + penalize func(context.Context, []PenaltyItem), +) { + go func() { + prevProgress := uint64(0) + + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + + for { + var req *HeaderRequest + var penalties []PenaltyItem + var currentTime uint64 + + hd.lock.Lock() + if hd.posStatus == Syncing { + currentTime = uint64(time.Now().Unix()) + var timeout bool + timeout, req, penalties = hd.requestMoreHeadersForPOS(currentTime) + if timeout { + log.Warn("Timeout", "requestId", hd.requestId) + hd.BeaconRequestList.Remove(hd.requestId) + hd.cleanUpPoSDownload() + } + } else { + prevProgress = 0 + } + hd.lock.Unlock() + + if req != nil { + _, sentToPeer := headerReqSend(ctx, req) + if sentToPeer { + // If request was actually sent to a peer, we update retry time to be 5 seconds in the future + hd.UpdateRetryTime(req, currentTime, 5 /* timeout */) + log.Trace("Sent request", "height", req.Number) + } + } + if len(penalties) > 0 { + penalize(ctx, penalties) + } + + // Sleep and check for logs + timer := time.NewTimer(2 * time.Millisecond) + select { + case <-ctx.Done(): + hd.lock.Lock() + hd.cleanUpPoSDownload() + hd.lock.Unlock() + hd.BeaconRequestList.Interrupt(engineapi.Stopping) + return + case <-logEvery.C: + if hd.PosStatus() == Syncing { + progress := hd.Progress() + if prevProgress == 0 { + prevProgress = progress + } else if progress <= prevProgress { + diff := prevProgress - progress + log.Info("Downloaded PoS Headers", "now", progress, + "blk/sec", float64(diff)/float64(logInterval/time.Second)) + prevProgress = progress + } + } + case <-timer.C: + } + // Cleanup timer + timer.Stop() + } + }() +} + func DecodeTips(encodings []string) (map[common.Hash]HeaderRecord, error) { hardTips := make(map[common.Hash]HeaderRecord, len(encodings)) diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index fc2147b6b9..4fed6e9d60 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -15,6 +15,7 @@ import ( "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/engineapi" ) type QueueID uint8 @@ -104,7 +105,7 @@ type Anchor struct { links []*Link // Links attached immediately to this anchor parentHash common.Hash // Hash of the header this anchor can be connected to (to disappear) blockHeight uint64 - nextRetryTime uint64 // Zero when anchor has just been created, otherwise time when anchor needs to be check to see if retry is neeeded + nextRetryTime uint64 // Zero when anchor has just been created, otherwise time when anchor needs to be check to see if retry is needed timeouts int // Number of timeout that this anchor has experiences - after certain threshold, it gets invalidated idx int // Index of the anchor in the queue to be able to modify specific items } @@ -119,7 +120,7 @@ type Anchor struct { // As anchors are moved around in the binary heap, they internally track their // position in the heap (using `idx` field). This feature allows updating // the heap (using `Fix` function) in situations when anchor is accessed not -// throught the priority queue, but through the map `anchor` in the +// through the priority queue, but through the map `anchor` in the // HeaderDownloader type. type AnchorQueue []*Anchor @@ -246,6 +247,14 @@ func (iq *InsertQueue) Pop() interface{} { return x } +type SyncStatus int + +const ( // SyncStatus values + Idle = iota + Syncing + Synced // if we found a canonical hash during backward sync, in this case our sync process is done +) + type HeaderDownload struct { badHeaders map[common.Hash]struct{} anchors map[common.Hash]*Anchor // Mapping from parentHash to collection of anchors @@ -274,14 +283,15 @@ type HeaderDownload struct { consensusHeaderReader consensus.ChainHeaderReader headerReader interfaces.HeaderReader - // proof-of-stake + // Proof of Stake (PoS) topSeenHeightPoS uint64 - heightToDownloadPoS uint64 - hashToDownloadPoS common.Hash - synced bool // if we found a canonical hash during backward sync, in this case our sync process is done - posSync bool // True if the chain is syncing backwards or not + requestId int + posAnchor *Anchor + posStatus SyncStatus + posSync bool // Whether the chain is syncing in the PoS mode headersCollector *etl.Collector // ETL collector for headers - PayloadStatusCh chan privateapi.PayloadStatus // Channel to report payload validation/execution status (engine_newPayloadV1/forkchoiceUpdatedV1 response) + BeaconRequestList *engineapi.RequestList // Requests from ethbackend to staged sync + PayloadStatusCh chan privateapi.PayloadStatus // Responses (validation/execution status) pendingPayloadStatus common.Hash // Header whose status we still should send to PayloadStatusCh pendingHeaderHeight uint64 // Header to process after unwind (height) pendingHeaderHash common.Hash // Header to process after unwind (hash) @@ -313,6 +323,7 @@ func NewHeaderDownload( seenAnnounces: NewSeenAnnounces(), DeliveryNotify: make(chan struct{}, 1), SkipCycleHack: make(chan struct{}), + BeaconRequestList: engineapi.NewRequestList(), PayloadStatusCh: make(chan privateapi.PayloadStatus, 1), headerReader: headerReader, } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 75b43729fa..5213ffe72f 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -41,6 +41,7 @@ import ( "github.com/ledgerwatch/erigon/p2p/enode" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/shards" "github.com/ledgerwatch/erigon/turbo/snapshotsync" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" @@ -84,11 +85,6 @@ type MockSentry struct { TxPoolGrpcServer *txpool.GrpcServer TxPool *txpool.TxPool txPoolDB kv.RwDB - - // Beacon Chain - NewPayloadCh chan privateapi.PayloadMessage - ForkChoiceCh chan privateapi.ForkChoiceMessage - waitingForBeaconChain uint32 } func (ms *MockSentry) Close() { @@ -287,9 +283,6 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey isBor := mock.ChainConfig.Bor != nil - mock.NewPayloadCh = make(chan privateapi.PayloadMessage) - mock.ForkChoiceCh = make(chan privateapi.ForkChoiceMessage) - mock.Sync = stagedsync.New( stagedsync.DefaultStages(mock.Ctx, prune, stagedsync.StageHeadersCfg( @@ -302,9 +295,6 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey penalize, cfg.BatchSize, false, - mock.NewPayloadCh, - mock.ForkChoiceCh, - &mock.waitingForBeaconChain, allSnapshots, snapshotsDownloader, blockReader, @@ -352,6 +342,8 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey stagedsync.DefaultPruneOrder, ) + mock.downloader.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize) + miningConfig := cfg.Miner miningConfig.Enabled = true miningConfig.Noverify = false @@ -518,3 +510,15 @@ func (ms *MockSentry) InsertChain(chain *core.ChainPack) error { } return nil } + +func (ms *MockSentry) SendPayloadRequest(message *engineapi.PayloadMessage) { + ms.downloader.Hd.BeaconRequestList.AddPayloadRequest(message) +} + +func (ms *MockSentry) SendForkChoiceRequest(message *engineapi.ForkChoiceMessage) { + ms.downloader.Hd.BeaconRequestList.AddForkChoiceRequest(message) +} + +func (ms *MockSentry) ReceivePayloadStatus() privateapi.PayloadStatus { + return <-ms.downloader.Hd.PayloadStatusCh +} diff --git a/turbo/stages/sentry_mock_test.go b/turbo/stages/sentry_mock_test.go index 07e23f4cfb..6a4e2df03e 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/turbo/stages/sentry_mock_test.go @@ -5,15 +5,16 @@ import ( "testing" "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/u256" "github.com/ledgerwatch/erigon/core" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/protocols/eth" - "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/params" "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/engineapi" "github.com/ledgerwatch/erigon/turbo/stages" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -511,20 +512,116 @@ func TestForkchoiceToGenesis(t *testing.T) { m := stages.MockWithZeroTTD(t) // Trivial forkChoice: everything points to genesis - forkChoiceMessage := privateapi.ForkChoiceMessage{ + forkChoiceMessage := engineapi.ForkChoiceMessage{ HeadBlockHash: m.Genesis.Hash(), SafeBlockHash: m.Genesis.Hash(), FinalizedBlockHash: m.Genesis.Hash(), } - - go func() { - m.ForkChoiceCh <- forkChoiceMessage - }() + m.SendForkChoiceRequest(&forkChoiceMessage) headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, true, m.UpdateHead, nil) if err != nil { t.Fatal(err) } - assert.Equal(t, headBlockHash, m.Genesis.Hash()) + assert.Equal(t, m.Genesis.Hash(), headBlockHash) + + payloadStatus := m.ReceivePayloadStatus() + assert.Equal(t, remote.EngineStatus_VALID, payloadStatus.Status) +} + +func TestBogusForkchoice(t *testing.T) { + m := stages.MockWithZeroTTD(t) + + // Bogus forkChoice: head points to rubbish + forkChoiceMessage := engineapi.ForkChoiceMessage{ + HeadBlockHash: common.HexToHash("11111111111111111111"), + SafeBlockHash: m.Genesis.Hash(), + FinalizedBlockHash: m.Genesis.Hash(), + } + m.SendForkChoiceRequest(&forkChoiceMessage) + + _, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, true, m.UpdateHead, nil) + if err != nil { + t.Fatal(err) + } + + payloadStatus := m.ReceivePayloadStatus() + assert.Equal(t, remote.EngineStatus_SYNCING, payloadStatus.Status) + + // Now send a correct forkChoice + forkChoiceMessage = engineapi.ForkChoiceMessage{ + HeadBlockHash: m.Genesis.Hash(), + SafeBlockHash: m.Genesis.Hash(), + FinalizedBlockHash: m.Genesis.Hash(), + } + m.SendForkChoiceRequest(&forkChoiceMessage) + + _, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) + if err != nil { + t.Fatal(err) + } + + payloadStatus = m.ReceivePayloadStatus() + assert.Equal(t, remote.EngineStatus_VALID, payloadStatus.Status) +} + +func TestPoSDownloader(t *testing.T) { + m := stages.MockWithZeroTTD(t) + + chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 2, func(i int, b *core.BlockGen) { + b.SetCoinbase(common.Address{1}) + }, false /* intermediateHashes */) + if err != nil { + t.Fatalf("generate blocks: %v", err) + } + + // Send a payload with missing parent + payloadMessage := engineapi.PayloadMessage{ + Header: chain.TopBlock.Header(), + Body: chain.TopBlock.RawBody(), + } + m.SendPayloadRequest(&payloadMessage) + _, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, true, m.UpdateHead, nil) + if err != nil { + t.Fatal(err) + } + payloadStatus := m.ReceivePayloadStatus() + assert.Equal(t, remote.EngineStatus_SYNCING, payloadStatus.Status) + + // Send the missing header + b, err := rlp.EncodeToBytes(ð.BlockHeadersPacket66{ + RequestId: 1, + BlockHeadersPacket: chain.Headers[0:1], + }) + require.NoError(t, err) + m.ReceiveWg.Add(1) + for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_BLOCK_HEADERS_66, Data: b, PeerId: m.PeerId}) { + require.NoError(t, err) + } + m.ReceiveWg.Wait() + + // First cycle: save the downloaded header + _, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) + if err != nil { + t.Fatal(err) + } + // Second cycle: process the previous beacon request + _, err = stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) + if err != nil { + t.Fatal(err) + } + + // Point forkChoice to the head + forkChoiceMessage := engineapi.ForkChoiceMessage{ + HeadBlockHash: chain.TopBlock.Hash(), + SafeBlockHash: chain.TopBlock.Hash(), + FinalizedBlockHash: chain.TopBlock.Hash(), + } + m.SendForkChoiceRequest(&forkChoiceMessage) + headBlockHash, err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, 0, m.Notifications, false, m.UpdateHead, nil) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, chain.TopBlock.Hash(), headBlockHash) } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 43a25a1227..e2c100ca77 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -249,9 +249,6 @@ func NewStagedSync( controlServer *sentry.ControlServerImpl, tmpdir string, accumulator *shards.Accumulator, - newPayloadCh chan privateapi.PayloadMessage, - forkChoiceCh chan privateapi.ForkChoiceMessage, - waitingForBeaconChain *uint32, snapshotDownloader proto_downloader.DownloaderClient, allSnapshots *snapshotsync.RoSnapshots, ) (*stagedsync.Sync, error) { @@ -278,9 +275,6 @@ func NewStagedSync( controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, - newPayloadCh, - forkChoiceCh, - waitingForBeaconChain, allSnapshots, snapshotDownloader, blockReader, -- GitLab