diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index dc61302b983f7b55f68d7870c197373ac0e0d0dc..549dca13a95f96e58e5673ed2b752cc1f4609a83 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -139,72 +139,6 @@ func SpawnStageHeaders( } } -func finishHandlingForkChoice( - forkChoice *engineapi.ForkChoiceMessage, - headHeight uint64, - s *StageState, - tx kv.RwTx, - cfg HeadersCfg, - useExternalTx bool, -) error { - log.Info(fmt.Sprintf("[%s] Unsettled forkchoice after unwind", s.LogPrefix()), "height", headHeight, "forkchoice", forkChoice) - - logEvery := time.NewTicker(logInterval) - defer logEvery.Stop() - - if err := fixCanonicalChain(s.LogPrefix(), logEvery, headHeight, forkChoice.HeadBlockHash, tx, cfg.blockReader); err != nil { - return err - } - - if err := rawdb.WriteHeadHeaderHash(tx, forkChoice.HeadBlockHash); err != nil { - return err - } - - sendErrResponse := cfg.hd.GetPendingPayloadStatus() != (common.Hash{}) - - safeIsCanonical, err := rawdb.IsCanonicalHash(tx, forkChoice.SafeBlockHash) - if err != nil { - return err - } - if !safeIsCanonical { - log.Warn(fmt.Sprintf("[%s] Non-canonical SafeBlockHash", s.LogPrefix()), "forkChoice", forkChoice) - if sendErrResponse { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ - CriticalError: errors.New("safe block is not an ancestor of head block"), - } - cfg.hd.ClearPendingPayloadStatus() - sendErrResponse = false - } - } - - finalizedIsCanonical, err := rawdb.IsCanonicalHash(tx, forkChoice.FinalizedBlockHash) - if err != nil { - return err - } - if !finalizedIsCanonical { - log.Warn(fmt.Sprintf("[%s] Non-canonical FinalizedBlockHash", s.LogPrefix()), "forkChoice", forkChoice) - if sendErrResponse { - cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ - CriticalError: errors.New("finalized block is not an ancestor of head block"), - } - cfg.hd.ClearPendingPayloadStatus() - } - } - - if err := s.Update(tx, headHeight); err != nil { - return err - } - - if !useExternalTx { - if err := tx.Commit(); err != nil { - return err - } - } - - cfg.hd.ClearUnsettledForkChoice() - return nil -} - // HeadersPOS processes Proof-of-Stake requests (newPayload, forkchoiceUpdated) func HeadersPOS( s *StageState, @@ -263,8 +197,46 @@ func HeadersPOS( return nil } +func safeAndFinalizedBlocksAreCanonical( + forkChoice *engineapi.ForkChoiceMessage, + s *StageState, + tx kv.RwTx, + cfg HeadersCfg, + sendErrResponse bool, +) (bool, error) { + safeIsCanonical, err := rawdb.IsCanonicalHash(tx, forkChoice.SafeBlockHash) + if err != nil { + return false, err + } + if !safeIsCanonical { + log.Warn(fmt.Sprintf("[%s] Non-canonical SafeBlockHash", s.LogPrefix()), "forkChoice", forkChoice) + if sendErrResponse { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + CriticalError: errors.New("safe block is not an ancestor of head block"), + } + } + return false, nil + } + + finalizedIsCanonical, err := rawdb.IsCanonicalHash(tx, forkChoice.FinalizedBlockHash) + if err != nil { + return false, err + } + if !finalizedIsCanonical { + log.Warn(fmt.Sprintf("[%s] Non-canonical FinalizedBlockHash", s.LogPrefix()), "forkChoice", forkChoice) + if sendErrResponse { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ + CriticalError: errors.New("finalized block is not an ancestor of head block"), + } + } + return false, nil + } + + return true, nil +} + func startHandlingForkChoice( - forkChoiceMessage *engineapi.ForkChoiceMessage, + forkChoice *engineapi.ForkChoiceMessage, requestStatus engineapi.RequestStatus, requestId int, s *StageState, @@ -274,14 +246,22 @@ func startHandlingForkChoice( cfg HeadersCfg, headerInserter *headerdownload.HeaderInserter, ) error { - headerHash := forkChoiceMessage.HeadBlockHash + headerHash := forkChoice.HeadBlockHash log.Info(fmt.Sprintf("[%s] Handling fork choice", s.LogPrefix()), "headerHash", headerHash) currentHeadHash := rawdb.ReadHeadHeaderHash(tx) if currentHeadHash == headerHash { // no-op log.Info(fmt.Sprintf("[%s] Fork choice no-op", s.LogPrefix())) cfg.hd.BeaconRequestList.Remove(requestId) - if requestStatus == engineapi.New { + canonical, err := safeAndFinalizedBlocksAreCanonical(forkChoice, s, tx, cfg, requestStatus == engineapi.New) + if err != nil { + log.Warn(fmt.Sprintf("[%s] Fork choice err", s.LogPrefix()), "err", err) + if requestStatus == engineapi.New { + cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{CriticalError: err} + } + return err + } + if canonical && requestStatus == engineapi.New { cfg.hd.PayloadStatusCh <- privateapi.PayloadStatus{ Status: remote.EngineStatus_VALID, LatestValidHash: currentHeadHash, @@ -356,11 +336,55 @@ func startHandlingForkChoice( u.UnwindTo(forkingPoint, common.Hash{}) log.Trace(fmt.Sprintf("[%s] Fork choice unwind finished", s.LogPrefix())) - cfg.hd.SetUnsettledForkChoice(forkChoiceMessage, headerNumber) + cfg.hd.SetUnsettledForkChoice(forkChoice, headerNumber) return nil } +func finishHandlingForkChoice( + forkChoice *engineapi.ForkChoiceMessage, + headHeight uint64, + s *StageState, + tx kv.RwTx, + cfg HeadersCfg, + useExternalTx bool, +) error { + log.Info(fmt.Sprintf("[%s] Unsettled forkchoice after unwind", s.LogPrefix()), "height", headHeight, "forkchoice", forkChoice) + + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + + if err := fixCanonicalChain(s.LogPrefix(), logEvery, headHeight, forkChoice.HeadBlockHash, tx, cfg.blockReader); err != nil { + return err + } + + if err := rawdb.WriteHeadHeaderHash(tx, forkChoice.HeadBlockHash); err != nil { + return err + } + + sendErrResponse := cfg.hd.GetPendingPayloadStatus() != (common.Hash{}) + canonical, err := safeAndFinalizedBlocksAreCanonical(forkChoice, s, tx, cfg, sendErrResponse) + if err != nil { + return err + } + if !canonical { + cfg.hd.ClearPendingPayloadStatus() + } + + if err := s.Update(tx, headHeight); err != nil { + return err + } + + if !useExternalTx { + if err := tx.Commit(); err != nil { + return err + } + } + + cfg.hd.ClearUnsettledForkChoice() + return nil +} + func handleNewPayload( payloadMessage *engineapi.PayloadMessage, requestStatus engineapi.RequestStatus,