diff --git a/cmd/sentry/download/downloader.go b/cmd/sentry/download/downloader.go
index 2a2c958d2b55023982cb9c8ee6b7fc7a299cf32f..6d487551744c692e39cee194dcb3a7fad89df991 100644
--- a/cmd/sentry/download/downloader.go
+++ b/cmd/sentry/download/downloader.go
@@ -353,6 +353,7 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf
 		1024*1024, /* linkLimit */
 		engine,
 	)
+
 	if err := hd.RecoverFromDb(db); err != nil {
 		return nil, fmt.Errorf("recovery from DB failed: %w", err)
 	}
@@ -470,30 +471,42 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
 			Number: number,
 		})
 	}
-
 	if segments, penaltyKind, err := cs.Hd.SplitIntoSegments(csHeaders); err == nil {
 		if penaltyKind == headerdownload.NoPenalty {
-			var canRequestMore bool
-			for _, segment := range segments {
-				requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
-				canRequestMore = canRequestMore || requestMore
-				if len(penalties) > 0 {
-					cs.Penalize(ctx, penalties)
+			if cs.Hd.POSSync() {
+				tx, err := cs.db.BeginRo(ctx)
+				if err != nil {
+					return err
 				}
-			}
-
-			if canRequestMore {
-				currentTime := uint64(time.Now().Unix())
-				req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
-				if req != nil {
-					if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
-						// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
-						cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
-						log.Trace("Sent request", "height", req.Number)
+				defer tx.Rollback()
+				for _, segment := range segments {
+					if err := cs.Hd.ProcessSegmentPOS(segment, tx); err != nil {
+						return err
+					}
+				}
+			} else {
+				var canRequestMore bool
+				for _, segment := range segments {
+					requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
+					canRequestMore = canRequestMore || requestMore
+					if len(penalties) > 0 {
+						cs.Penalize(ctx, penalties)
 					}
 				}
-				if len(penalties) > 0 {
-					cs.Penalize(ctx, penalties)
+
+				if canRequestMore {
+					currentTime := uint64(time.Now().Unix())
+					req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
+					if req != nil {
+						if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
+							// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
+							cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
+							log.Trace("Sent request", "height", req.Number)
+						}
+					}
+					if len(penalties) > 0 {
+						cs.Penalize(ctx, penalties)
+					}
 				}
 			}
 		}
 	} else {
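In the blockHeaders hunk above, ProcessSegmentPOS runs inside a read-only transaction. BeginRo returns a nil tx together with a non-nil error, so the error check must come before deferring tx.Rollback(); deferring first would panic with a nil-pointer dereference on the error path. A minimal standalone sketch of the pattern, assuming the erigon-lib kv interfaces (readWithTx is an illustrative name, not part of this change):

package main

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/kv"
)

// readWithTx sketches the read-only transaction pattern used by blockHeaders:
// check the BeginRo error first, then defer Rollback on the valid tx.
func readWithTx(ctx context.Context, db kv.RoDB) error {
	tx, err := db.BeginRo(ctx)
	if err != nil {
		return err // tx is nil here, so there is nothing to roll back
	}
	defer tx.Rollback()
	// ... read from tx ...
	return nil
}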
diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go
index b5f698f619c91f24efef6600a5de322d30c6cadd..90ab3acaf8e2e78aea0183db03f3af585c69482a 100644
--- a/eth/stagedsync/stage_headers.go
+++ b/eth/stagedsync/stage_headers.go
@@ -11,6 +11,7 @@ import (
 	"github.com/c2h5oh/datasize"
 	"github.com/holiman/uint256"
 	libcommon "github.com/ledgerwatch/erigon-lib/common"
+	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
 	"github.com/ledgerwatch/erigon/common"
@@ -38,6 +39,7 @@ type HeadersCfg struct {
 	penalize          func(context.Context, []headerdownload.PenaltyItem)
 	batchSize         datasize.ByteSize
 	noP2PDiscovery    bool
+	tmpdir            string
 	reverseDownloadCh chan types.Header
 	waitingPosHeaders *bool
 	snapshots         *snapshotsync.AllSnapshots
@@ -58,6 +60,7 @@ func StageHeadersCfg(
 	waitingPosHeaders *bool,
 	snapshots *snapshotsync.AllSnapshots,
 	blockReader interfaces.FullBlockReader,
+	tmpdir string,
 ) HeadersCfg {
 	return HeadersCfg{
 		db: db,
@@ -110,14 +113,14 @@ func SpawnStageHeaders(
 	}

 	if isTrans {
-		return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test)
+		return HeadersPOS(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
 	} else {
-		return HeadersForward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
+		return HeadersPOW(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
 	}
 }

-// HeadersDownward progresses Headers stage in the downward direction
-func HeadersDownward(
+// HeadersPOS progresses the Headers stage in the downward direction (proof-of-stake backward sync)
+func HeadersPOS(
 	s *StageState,
 	u Unwinder,
 	ctx context.Context,
@@ -125,6 +128,7 @@
 	cfg HeadersCfg,
 	initialCycle bool,
 	test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
+	useExternalTx bool,
 ) error {
 	*cfg.waitingPosHeaders = true
 	// Waiting for the beacon chain
@@ -132,8 +136,6 @@
 	header := <-cfg.reverseDownloadCh
 	*cfg.waitingPosHeaders = false

-	defer tx.Commit()
-
 	headerNumber := header.Number.Uint64()

 	blockHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
@@ -162,14 +164,124 @@
 		return err
 	}
 	if parent != nil && parent.Hash() == header.ParentHash {
-		return s.Update(tx, header.Number.Uint64())
+		if err := s.Update(tx, header.Number.Uint64()); err != nil {
+			return err
+		}
+		// For the sake of simplicity we can just assume it will be valid for now. (TODO: move to execution stage)
+		cfg.statusCh <- privateapi.ExecutionStatus{
+			Status:   privateapi.Valid,
+			HeadHash: header.Hash(),
+		}
+		return tx.Commit()
 	}
-	// Downward sync if we need to process more (TODO)
-	return s.Update(tx, header.Number.Uint64())
+	cfg.hd.SetPOSSync(true)
+	if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
+		return err
+	}
+	cfg.hd.SetProcessed(header.Number.Uint64())
+	cfg.hd.SetExpectedHash(header.ParentHash)
+	cfg.hd.SetFetching(true)
+	logPrefix := s.LogPrefix()
+
+	logEvery := time.NewTicker(logInterval)
+	defer logEvery.Stop()
+
+	// Allow other stages to run 1 cycle if no network available
+	if initialCycle && cfg.noP2PDiscovery {
+		return nil
+	}
+
+	cfg.statusCh <- privateapi.ExecutionStatus{
+		Status:   privateapi.Syncing,
+		HeadHash: rawdb.ReadHeadBlockHash(tx),
+	}
+	log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", header.Number.Uint64())
+
+	cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})
+
+	stopped := false
+	prevProgress := header.Number.Uint64()
+
+	headerCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+	canonicalHeadersCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+	cfg.hd.SetHeadersCollector(headerCollector)
+	cfg.hd.SetCanonicalHashesCollector(canonicalHeadersCollector)
+	// Cleanup after we finish backward sync
+	defer func() {
+		headerCollector.Close()
+		canonicalHeadersCollector.Close()
+		cfg.hd.SetHeadersCollector(nil)
+		cfg.hd.SetCanonicalHashesCollector(nil)
+		cfg.hd.Unsync()
+		cfg.hd.SetFetching(false)
+	}()
+
+	var req headerdownload.HeaderRequest
+	for !stopped {
+		sentToPeer := false
+		maxRequests := 4096
+		for !sentToPeer && !stopped && maxRequests != 0 {
+			req = cfg.hd.RequestMoreHeadersForPOS()
+			_, sentToPeer = cfg.headerReqSend(ctx, &req)
+			maxRequests--
+		}
+		// Drain and broadcast any pending announces
+		announces := cfg.hd.GrabAnnounces()
+		if len(announces) > 0 {
+			cfg.announceNewHashes(ctx, announces)
+		}
+		if cfg.hd.Synced() { // We only stop once the backward sync has linked up to a known canonical header
+			stopped = true
+		}
+		// Sleep and check for logs
+		timer := time.NewTimer(2 * time.Millisecond)
+		select {
+		case <-ctx.Done():
+			stopped = true
+		case <-logEvery.C:
+			diff := prevProgress - cfg.hd.Progress()
+			if cfg.hd.Progress() <= prevProgress {
+				log.Info("Wrote Block Headers backwards", "from", header.Number.Uint64(),
+					"now", cfg.hd.Progress(), "blk/sec", float64(diff)/float64(logInterval/time.Second))
+				prevProgress = cfg.hd.Progress()
+			}
+		case <-timer.C:
+			log.Trace("RequestQueueTime (header) ticked")
+		}
+		// Cleanup timer
+		timer.Stop()
+	}
+	// If the backward sync did not complete (e.g. the user stopped the node), we don't update anything
+	if !cfg.hd.Synced() {
+		return nil
+	}
+
+	if err := headerCollector.Load(tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{
+		LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+			return []interface{}{"block", binary.BigEndian.Uint64(k)}
+		},
+	}); err != nil {
+		return err
+	}
+	if err = canonicalHeadersCollector.Load(tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{
+		LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+			return []interface{}{"block", binary.BigEndian.Uint64(k)}
+		},
+	}); err != nil {
+		return err
+	}
+	if s.BlockNumber >= cfg.hd.Progress() {
+		u.UnwindTo(cfg.hd.Progress(), common.Hash{})
+	} else {
+		if err := s.Update(tx, header.Number.Uint64()); err != nil {
+			return err
+		}
+	}
+	return tx.Commit()
 }

-// HeadersForward progresses Headers stage in the forward direction
-func HeadersForward(
+// HeadersPOW progresses the Headers stage in the forward direction (proof-of-work sync)
+func HeadersPOW(
 	s *StageState,
 	u Unwinder,
 	ctx context.Context,
@@ -261,7 +373,6 @@
 	if header.Eip3675 {
 		return nil
 	}
-
 	if td.Cmp(cfg.terminalTotalDifficulty) > 0 {
 		return rawdb.MarkTransition(tx, blockNum)
 	}
@@ -305,6 +416,7 @@
 	if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
 		return err
 	}
+	cfg.hd.SetPOSSync(false)
 	cfg.hd.SetFetching(true)
 	defer cfg.hd.SetFetching(false)
 	headerProgress = cfg.hd.Progress()
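HeadersPOS above receives headers from the highest block number downwards, while the Headers and HeaderCanonical tables want ascending insertion. The two ETL collectors bridge that gap: Collect buffers key/value pairs sorted by key (spilling to tmpdir when the buffer fills), and a single Load afterwards writes everything in one ascending pass. A condensed sketch of this collect-then-load pattern, using the same etl calls as the stage (the bucket, key range, and value are illustrative):

package main

import (
	"encoding/binary"

	"github.com/ledgerwatch/erigon-lib/etl"
	"github.com/ledgerwatch/erigon-lib/kv"
)

// collectBackwards buffers entries arriving in descending block order,
// then loads them into the table in a single ascending pass.
func collectBackwards(tx kv.RwTx, tmpdir string) error {
	collector := etl.NewCollector("example", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
	defer collector.Close()
	for num := uint64(1000); num > 0; num-- { // descending, like the backward sync
		k := make([]byte, 8)
		binary.BigEndian.PutUint64(k, num) // big-endian keys sort numerically
		if err := collector.Collect(k, []byte("canonical-hash-placeholder")); err != nil {
			return err
		}
	}
	// Load drains the sorted buffer into the bucket in ascending key order
	return collector.Load(tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{})
}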
diff --git a/ethdb/privateapi/ethbackend.go b/ethdb/privateapi/ethbackend.go
index 399b4eff97da38c86d29bb499df53445b3a380f7..0e27654509eae1f68f30ae78dd1e4df45897d2d4 100644
--- a/ethdb/privateapi/ethbackend.go
+++ b/ethdb/privateapi/ethbackend.go
@@ -239,7 +239,6 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type
 	if header.Hash() != blockHash {
 		return nil, fmt.Errorf("invalid hash for payload. got: %s, wanted: %s", common.Bytes2Hex(blockHash[:]), common.Bytes2Hex(header.Hash().Bytes()))
 	}
-	log.Info("Received Payload from beacon-chain", "hash", blockHash)
 	// Send the block over
 	s.numberSent = req.BlockNumber
 	s.reverseDownloadCh <- header
diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go
index 9fe0594ef65eca05fa1df4696600b691f48fe53c..87ec09d99ab000d7f9cf6c66ae0dcec4431ab005 100644
--- a/turbo/stages/headerdownload/header_algos.go
+++ b/turbo/stages/headerdownload/header_algos.go
@@ -15,6 +15,7 @@ import (
 	"strings"
 	"time"

+	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
 	"github.com/ledgerwatch/erigon/common"
@@ -571,6 +572,19 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime uint64) (*HeaderRequest
 	return nil, penalties
 }

+func (hd *HeaderDownload) RequestMoreHeadersForPOS() HeaderRequest {
+	hd.lock.RLock()
+	defer hd.lock.RUnlock()
+	// Assemble the request: a fixed window walking backwards from the last processed header
+	return HeaderRequest{
+		Hash:    common.Hash{},
+		Number:  hd.lastProcessedPayload - 1,
+		Length:  192,
+		Skip:    0,
+		Reverse: true,
+	}
+}
+
 func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime, timeout uint64) {
 	hd.lock.Lock()
 	defer hd.lock.Unlock()
@@ -682,6 +696,49 @@ func (hd *HeaderDownload) InsertHeaders(hf func(header *types.Header, headerRaw
 	return hd.highestInDb >= hd.preverifiedHeight && hd.topSeenHeight > 0 && hd.highestInDb >= hd.topSeenHeight, nil
 }

+func (hd *HeaderDownload) SetExpectedHash(hash common.Hash) {
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
+	hd.expectedHash = hash
+}
+
+func (hd *HeaderDownload) ProcessSegmentPOS(segment ChainSegment, tx kv.Getter) error {
+	if len(segment) == 0 {
+		return nil
+	}
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
+	// Responses can still arrive after the collectors have been closed; ignore them
+	if hd.canonicalHashesCollector == nil || hd.headersCollector == nil {
+		return nil
+	}
+	log.Trace("Collecting...", "from", segment[0].Number, "to", segment[len(segment)-1].Number, "len", len(segment))
+	for _, segmentFragment := range segment {
+		header := segmentFragment.Header
+		// Dismiss the rest of the segment as soon as a header does not link up to the hash we expect
+		if header.Hash() != hd.expectedHash {
+			return nil
+		}
+		currentCanonical, err := rawdb.ReadCanonicalHash(tx, header.Number.Uint64())
+		if err != nil {
+			return err
+		}
+		if currentCanonical == hd.expectedHash || hd.lastProcessedPayload == 1 { // reached an already-canonical header (or block 1): backward sync is done
+			hd.synced = true
+			return nil
+		}
+		if err := hd.headersCollector.Collect(dbutils.HeaderKey(header.Number.Uint64(), header.Hash()), segmentFragment.HeaderRaw); err != nil {
+			return err
+		}
+		if err := hd.canonicalHashesCollector.Collect(dbutils.EncodeBlockNumber(header.Number.Uint64()), header.Hash().Bytes()); err != nil {
+			return err
+		}
+		hd.expectedHash = segmentFragment.Header.ParentHash
+		hd.lastProcessedPayload = header.Number.Uint64()
+	}
+	return nil
+}
+
 // GrabAnnounces - returns all available announces and forget them
 func (hd *HeaderDownload) GrabAnnounces() []Announce {
 	hd.lock.Lock()
@@ -694,7 +751,11 @@
 func (hd *HeaderDownload) Progress() uint64 {
 	hd.lock.RLock()
 	defer hd.lock.RUnlock()
-	return hd.highestInDb
+	if hd.posSync {
+		return hd.lastProcessedPayload
+	} else {
+		return hd.highestInDb
+	}
 }

 func (hd *HeaderDownload) HasLink(linkHash common.Hash) bool {
@@ -965,6 +1026,48 @@ func (hd *HeaderDownload) SetFetching(fetching bool) {
 	hd.fetching = fetching
 }

+func (hd *HeaderDownload) SetProcessed(lastProcessed uint64) {
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
+	hd.lastProcessedPayload = lastProcessed
+}
+
+func (hd *HeaderDownload) Unsync() {
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
+	hd.synced = false
+}
+
+func (hd *HeaderDownload) SetHeadersCollector(collector *etl.Collector) {
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
+	hd.headersCollector = collector
+}
+
+func (hd *HeaderDownload) SetCanonicalHashesCollector(collector *etl.Collector) {
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
+	hd.canonicalHashesCollector = collector
+}
+
+func (hd *HeaderDownload) SetPOSSync(posSync bool) {
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
+	hd.posSync = posSync
+}
+
+func (hd *HeaderDownload) POSSync() bool {
+	hd.lock.RLock()
+	defer hd.lock.RUnlock()
+	return hd.posSync
+}
+
+func (hd *HeaderDownload) Synced() bool {
+	hd.lock.RLock()
+	defer hd.lock.RUnlock()
+	return hd.synced
+}
+
 func (hd *HeaderDownload) RequestChaining() bool {
 	hd.lock.RLock()
 	defer hd.lock.RUnlock()
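RequestMoreHeadersForPOS above always asks for a fixed window of 192 headers, starting one block below lastProcessedPayload and walking towards genesis (Reverse: true); ProcessSegmentPOS then advances lastProcessedPayload as each response links up. A self-contained illustration of the window arithmetic (the request mirror and helper are hypothetical, not the erigon types):

package main

import "fmt"

// headerRequest mirrors the fields relevant to the backward request.
type headerRequest struct {
	Number  uint64 // highest block number in the window
	Length  uint64 // number of headers requested
	Reverse bool   // walk towards genesis
}

// nextPOSRequest reproduces the arithmetic of RequestMoreHeadersForPOS.
func nextPOSRequest(lastProcessedPayload uint64) headerRequest {
	return headerRequest{
		Number:  lastProcessedPayload - 1,
		Length:  192,
		Reverse: true,
	}
}

func main() {
	req := nextPOSRequest(1_000_000)
	// The window covers blocks 999808 through 999999, inclusive
	fmt.Printf("headers %d..%d (reverse)\n", req.Number-req.Length+1, req.Number)
}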
diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go
index 81e75db02e5546be12d2978cb223ebb6d63c96fd..422a5b41706a0e4ac0859d7192a9e5dcd85188f0 100644
--- a/turbo/stages/headerdownload/header_data_struct.go
+++ b/turbo/stages/headerdownload/header_data_struct.go
@@ -7,6 +7,7 @@ import (
 	"sync"

 	lru "github.com/hashicorp/golang-lru"
+	"github.com/ledgerwatch/erigon-lib/etl"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/consensus"
 	"github.com/ledgerwatch/erigon/core/types"
@@ -219,6 +220,13 @@ type HeaderDownload struct {
 	topSeenHeight   uint64
 	requestChaining bool // Whether the downloader is allowed to issue more requests when previous responses created or moved an anchor
 	fetching        bool // Set when the stage that is actively fetching the headers is in progress
+	// proof-of-stake
+	lastProcessedPayload     uint64         // Number of the last header inserted while processing the chain backwards
+	expectedHash             common.Hash    // ParentHash of the last header inserted; kept in memory so we do not re-read it from the database over and over
+	synced                   bool           // Set once the backward sync finds a canonical hash, at which point the sync process is done
+	posSync                  bool           // Whether the chain is being synced backwards (proof-of-stake mode)
+	headersCollector         *etl.Collector // ETL collector for headers
+	canonicalHashesCollector *etl.Collector // ETL collector for canonical hashes
 }

 // HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header
diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go
index 87856cbcac8524d0d3ee97c76152dc063ffa02ed..d2bad02470425497052e6ddb8291865d7454757d 100644
--- a/turbo/stages/mock_sentry.go
+++ b/turbo/stages/mock_sentry.go
@@ -286,6 +286,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 			nil,
 			allSnapshots,
 			blockReader,
+			mock.tmpdir,
 		), stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig), stagedsync.StageBodiesCfg(
 			mock.DB,
 			mock.downloader.Bd,
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 3dcf4d0bf1f05bd63dde7b111d5790e0097a1096..47122e46b9263a06717b90a34d148ee7553f3b70 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -255,6 +255,7 @@ func NewStagedSync(
 			waitingForPOSHeaders,
 			allSnapshots,
 			blockReader,
+			tmpdir,
 		), stagedsync.StageBlockHashesCfg(db, tmpdir, controlServer.ChainConfig), stagedsync.StageBodiesCfg(
 			db,
 			controlServer.Bd,
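Every new HeaderDownload field added above is accessed only under hd.lock, with write-lock setters and read-lock getters, matching the conventions of the rest of the struct. The same accessor pattern, reduced to a standalone sketch:

package main

import "sync"

type downloadState struct {
	lock    sync.RWMutex
	posSync bool
}

// The setter takes the exclusive write lock ...
func (s *downloadState) SetPOSSync(v bool) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.posSync = v
}

// ... while the getter takes the shared read lock, so concurrent readers
// (e.g. the sentry message handlers) do not serialize behind each other.
func (s *downloadState) POSSync() bool {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.posSync
}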