From 5413a4db91b229abc86bc5461f665a898f9cc866 Mon Sep 17 00:00:00 2001 From: ledgerwatch <akhounov@gmail.com> Date: Fri, 10 Dec 2021 07:04:04 +0000 Subject: [PATCH] Header download simplifications (#3106) * Header download simplifications * Fixes * Reuse headerRaw for inserting into DB * Fix tests * Fix test * Revert to the loop in RequestMoreHeaders * Change Warn to Debug Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local> Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> --- cmd/sentry/download/downloader.go | 13 +- eth/stagedsync/stage_headers.go | 14 +- .../stages/headerdownload/header_algo_test.go | 7 +- turbo/stages/headerdownload/header_algos.go | 288 +++++++----------- .../headerdownload/header_data_struct.go | 2 + 5 files changed, 139 insertions(+), 185 deletions(-) diff --git a/cmd/sentry/download/downloader.go b/cmd/sentry/download/downloader.go index 85d65c74fb..2a2c958d2b 100644 --- a/cmd/sentry/download/downloader.go +++ b/cmd/sentry/download/downloader.go @@ -471,8 +471,8 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead }) } - if segments, penalty, err := cs.Hd.SplitIntoSegments(csHeaders); err == nil { - if penalty == headerdownload.NoPenalty { + if segments, penaltyKind, err := cs.Hd.SplitIntoSegments(csHeaders); err == nil { + if penaltyKind == headerdownload.NoPenalty { var canRequestMore bool for _, segment := range segments { requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID)) @@ -486,12 +486,15 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead currentTime := uint64(time.Now().Unix()) req, penalties := cs.Hd.RequestMoreHeaders(currentTime) if req != nil { - if _, ok := cs.SendHeaderRequest(ctx, req); ok { - cs.Hd.SentRequest(req, currentTime, 5 /* timeout */) + if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer { + // If request was actually sent to a peer, we update retry time to be 5 seconds in the future + cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */) log.Trace("Sent request", "height", req.Number) } } - cs.Penalize(ctx, penalties) + if len(penalties) > 0 { + cs.Penalize(ctx, penalties) + } } } else { outreq := proto_sentry.PenalizePeerRequest{ diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index f77f340758..b5f698f619 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -365,22 +365,28 @@ Loop: if req != nil { _, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { - cfg.hd.SentRequest(req, currentTime, 5 /* timeout */) + // If request was actually sent to a peer, we update retry time to be 5 seconds in the future + cfg.hd.UpdateRetryTime(req, currentTime, 5 /* timeout */) log.Trace("Sent request", "height", req.Number) } } - cfg.penalize(ctx, penalties) + if len(penalties) > 0 { + cfg.penalize(ctx, penalties) + } maxRequests := 64 // Limit number of requests sent per round to let some headers to be inserted into the database for req != nil && sentToPeer && maxRequests > 0 { req, penalties = cfg.hd.RequestMoreHeaders(currentTime) if req != nil { _, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { - cfg.hd.SentRequest(req, currentTime, 5 /*timeout */) + // If request was actually sent to a peer, we update retry time to be 5 seconds in the future + cfg.hd.UpdateRetryTime(req, currentTime, 5 /*timeout */) log.Trace("Sent request", "height", req.Number) } } - cfg.penalize(ctx, penalties) + if len(penalties) > 0 { + cfg.penalize(ctx, penalties) + } maxRequests-- } diff --git a/turbo/stages/headerdownload/header_algo_test.go b/turbo/stages/headerdownload/header_algo_test.go index 167e21cf9b..31f7767f15 100644 --- a/turbo/stages/headerdownload/header_algo_test.go +++ b/turbo/stages/headerdownload/header_algo_test.go @@ -11,6 +11,7 @@ import ( "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/params" + "github.com/ledgerwatch/erigon/rlp" "github.com/ledgerwatch/erigon/turbo/snapshotsync" ) @@ -49,10 +50,12 @@ func TestInserter1(t *testing.T) { ParentHash: h1Hash, } h2Hash := h2.Hash() - if _, err = hi.FeedHeader(tx, snapshotsync.NewBlockReader(), &h1, h1Hash, 1, nil); err != nil { + data1, _ := rlp.EncodeToBytes(&h1) + if _, err = hi.FeedHeader(tx, snapshotsync.NewBlockReader(), &h1, data1, h1Hash, 1, nil); err != nil { t.Errorf("feed empty header 1: %v", err) } - if _, err = hi.FeedHeader(tx, snapshotsync.NewBlockReader(), &h2, h2Hash, 2, nil); err != nil { + data2, _ := rlp.EncodeToBytes(&h2) + if _, err = hi.FeedHeader(tx, snapshotsync.NewBlockReader(), &h2, data2, h2Hash, 2, nil); err != nil { t.Errorf("feed empty header 2: %v", err) } } diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 6fc5f440f6..9fe0594ef6 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -137,31 +137,31 @@ func (hd *HeaderDownload) IsBadHeader(headerHash common.Hash) bool { return ok } -// FindAnchors attempts to find anchors to which given chain segment can be attached to -func (hd *HeaderDownload) findAnchors(segment ChainSegment) (found bool, start int) { +// findAnchor attempts to find anchor to which given chain segment can be attached to +func (hd *HeaderDownload) findAnchor(segment ChainSegment) (found bool, anchor *Anchor, start int) { // Walk the segment from children towards parents for i, h := range segment { // Check if the header can be attached to an anchor of a working tree - if _, attaching := hd.anchors[h.Hash]; attaching { - return true, i + if anchor, attaching := hd.anchors[h.Hash]; attaching { + return true, anchor, i } } - return false, 0 + return false, nil, 0 } // FindLink attempts to find a non-persisted link that given chain segment can be attached to. -func (hd *HeaderDownload) findLink(segment ChainSegment, start int) (found bool, end int) { +func (hd *HeaderDownload) findLink(segment ChainSegment, start int) (found bool, link *Link, end int) { if _, duplicate := hd.getLink(segment[start].Hash); duplicate { - return false, 0 + return false, nil, 0 } // Walk the segment from children towards parents for i, h := range segment[start:] { // Check if the header can be attached to any links - if _, attaching := hd.getLink(h.Header.ParentHash); attaching { - return true, start + i + 1 + if link, attaching := hd.getLink(h.Header.ParentHash); attaching { + return true, link, start + i + 1 } } - return false, len(segment) + return false, nil, len(segment) } func (hd *HeaderDownload) removeUpwards(toRemove []*Link) { @@ -183,110 +183,77 @@ func (hd *HeaderDownload) markPreverified(link *Link) { } // ExtendUp extends a working tree up from the link, using given chain segment -func (hd *HeaderDownload) extendUp(segment ChainSegment) error { - // Find attachment link again - linkH := segment[len(segment)-1] - attachmentLink, attaching := hd.getLink(linkH.Header.ParentHash) - if !attaching { - return fmt.Errorf("extendUp attachment link not found for %x", linkH.Header.ParentHash) - } - if attachmentLink.preverified && len(attachmentLink.next) > 0 { - return fmt.Errorf("cannot extendUp from preverified link %d with children", attachmentLink.blockHeight) - } +func (hd *HeaderDownload) extendUp(segment ChainSegment, attachmentLink *Link) { // Iterate over headers backwards (from parents towards children) prevLink := attachmentLink for i := len(segment) - 1; i >= 0; i-- { link := hd.addHeaderAsLink(segment[i], false /* persisted */) + if prevLink.persisted { + // If we are attching to already persisted link, schedule for insertion (persistence) + hd.insertList = append(hd.insertList, link) + } prevLink.next = append(prevLink.next, link) prevLink = link if _, ok := hd.preverifiedHashes[link.hash]; ok { hd.markPreverified(link) } } - - if _, bad := hd.badHeaders[attachmentLink.hash]; !bad && attachmentLink.persisted { - link := hd.links[linkH.Hash] - hd.insertList = append(hd.insertList, link) - } - return nil } // ExtendDown extends some working trees down from the anchor, using given chain segment // it creates a new anchor and collects all the links from the attached anchors to it -func (hd *HeaderDownload) extendDown(segment ChainSegment) (bool, error) { +func (hd *HeaderDownload) extendDown(segment ChainSegment, anchor *Anchor) bool { // Find attachment anchor again - anchorHash := segment[0].Hash - if anchor, attaching := hd.anchors[anchorHash]; attaching { - anchorPreverified := false - for _, link := range anchor.links { - if link.preverified { - anchorPreverified = true - break - } + + anchorPreverified := false + for _, link := range anchor.links { + if link.preverified { + anchorPreverified = true + break } - newAnchorH := segment[len(segment)-1] - newAnchorHeader := newAnchorH.Header - var newAnchor *Anchor - newAnchor, preExisting := hd.anchors[newAnchorHeader.ParentHash] - if !preExisting { - newAnchor = &Anchor{ - parentHash: newAnchorHeader.ParentHash, - nextRetryTime: 0, // Will ensure this anchor will be top priority - peerID: anchor.peerID, - blockHeight: newAnchorH.Number, - } - if newAnchor.blockHeight > 0 { - hd.anchors[newAnchorHeader.ParentHash] = newAnchor - heap.Push(hd.anchorQueue, newAnchor) - } + } + newAnchorH := segment[len(segment)-1] + newAnchorHeader := newAnchorH.Header + var newAnchor *Anchor + newAnchor, preExisting := hd.anchors[newAnchorHeader.ParentHash] + if !preExisting { + newAnchor = &Anchor{ + parentHash: newAnchorHeader.ParentHash, + nextRetryTime: 0, // Will ensure this anchor will be top priority + peerID: anchor.peerID, + blockHeight: newAnchorH.Number, } - - // Anchor is removed from the map, but not from the anchorQueue - // This is because it is hard to find the index under which the anchor is stored in the anchorQueue - // But removal will happen anyway, in th function RequestMoreHeaders, if it disapppears from the map - delete(hd.anchors, anchor.parentHash) - // Add all headers in the segments as links to this anchor - var prevLink *Link - for i := len(segment) - 1; i >= 0; i-- { - link := hd.addHeaderAsLink(segment[i], false /* pesisted */) - if prevLink == nil { - newAnchor.links = append(newAnchor.links, link) - } else { - prevLink.next = append(prevLink.next, link) - } - prevLink = link - if _, ok := hd.preverifiedHashes[link.hash]; ok { - hd.markPreverified(link) - } + if newAnchor.blockHeight > 0 { + hd.anchors[newAnchorHeader.ParentHash] = newAnchor + heap.Push(hd.anchorQueue, newAnchor) + } + } + hd.removeAnchor(anchor) + // Add all headers in the segments as links to this anchor + var prevLink *Link + for i := len(segment) - 1; i >= 0; i-- { + link := hd.addHeaderAsLink(segment[i], false /* pesisted */) + if prevLink == nil { + newAnchor.links = append(newAnchor.links, link) + } else { + prevLink.next = append(prevLink.next, link) } - prevLink.next = anchor.links - anchor.links = nil - if anchorPreverified { - // Mark the entire segment as preverified - hd.markPreverified(prevLink) + prevLink = link + if _, ok := hd.preverifiedHashes[link.hash]; ok { + hd.markPreverified(link) } - return !preExisting, nil } - return false, fmt.Errorf("extendDown attachment anchors not found for %x", anchorHash) + prevLink.next = anchor.links + anchor.links = nil + if anchorPreverified { + // Mark the entire segment as preverified + hd.markPreverified(prevLink) + } + return !preExisting } // Connect connects some working trees using anchors of some, and a link of another -func (hd *HeaderDownload) connect(segment ChainSegment) ([]PenaltyItem, error) { - // Find attachment link again - linkH := segment[len(segment)-1] - // Find attachement anchors again - anchorHash := segment[0].Hash - attachmentLink, ok1 := hd.getLink(linkH.Header.ParentHash) - if !ok1 { - return nil, fmt.Errorf("connect attachment link not found for %x", linkH.Header.ParentHash) - } - if attachmentLink.preverified && len(attachmentLink.next) > 0 { - return nil, fmt.Errorf("cannot connect to preverified link %d with children", attachmentLink.blockHeight) - } - anchor, ok2 := hd.anchors[anchorHash] - if !ok2 { - return nil, fmt.Errorf("connect attachment anchors not found for %x", anchorHash) - } +func (hd *HeaderDownload) connect(segment ChainSegment, attachmentLink *Link, anchor *Anchor) []PenaltyItem { anchorPreverified := false for _, link := range anchor.links { if link.preverified { @@ -294,14 +261,15 @@ func (hd *HeaderDownload) connect(segment ChainSegment) ([]PenaltyItem, error) { break } } - // Anchor is removed from the map, but not from the anchorQueue - // This is because it is hard to find the index under which the anchor is stored in the anchorQueue - // But removal will happen anyway, in th function RequestMoreHeaders, if it disapppears from the map - delete(hd.anchors, anchor.parentHash) + hd.removeAnchor(anchor) // Iterate over headers backwards (from parents towards children) prevLink := attachmentLink for i := len(segment) - 1; i >= 0; i-- { link := hd.addHeaderAsLink(segment[i], false /* persisted */) + // If we attach to already persisted link, mark this one for insertion + if prevLink.persisted { + hd.insertList = append(hd.insertList, link) + } prevLink.next = append(prevLink.next, link) prevLink = link if _, ok := hd.preverifiedHashes[link.hash]; ok { @@ -316,30 +284,20 @@ func (hd *HeaderDownload) connect(segment ChainSegment) ([]PenaltyItem, error) { } var penalties []PenaltyItem if _, bad := hd.badHeaders[attachmentLink.hash]; bad { - hd.invalidateAnchor(anchor) + hd.invalidateAnchor(anchor, "descendant of a known bad block") penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}) - } else if attachmentLink.persisted { - link := hd.links[linkH.Hash] - hd.insertList = append(hd.insertList, link) } - return penalties, nil + return penalties } -func (hd *HeaderDownload) removeAnchor(anchorHash common.Hash) error { - // Find attachement anchors again - anchor, ok := hd.anchors[anchorHash] - if !ok { - return fmt.Errorf("connect attachment anchors not found for %x", anchorHash) - } - // Anchor is removed from the map, but not from the anchorQueue - // This is because it is hard to find the index under which the anchor is stored in the anchorQueue - // But removal will happen anyway, in th function RequestMoreHeaders, if it disapppears from the map +func (hd *HeaderDownload) removeAnchor(anchor *Anchor) { + // Anchor is removed from the map, and from the priority queue delete(hd.anchors, anchor.parentHash) - return nil + heap.Remove(hd.anchorQueue, anchor.idx) } // if anchor will be abandoned - given peerID will get Penalty -func (hd *HeaderDownload) newAnchor(segment ChainSegment, peerID enode.ID) (bool, error) { +func (hd *HeaderDownload) newAnchor(segment ChainSegment, peerID enode.ID) bool { anchorH := segment[len(segment)-1] anchorHeader := anchorH.Header @@ -347,10 +305,12 @@ func (hd *HeaderDownload) newAnchor(segment ChainSegment, peerID enode.ID) (bool anchor, preExisting := hd.anchors[anchorHeader.ParentHash] if !preExisting { if anchorH.Number < hd.highestInDb { - return false, fmt.Errorf("new anchor too far in the past: %d, latest header in db: %d", anchorH.Number, hd.highestInDb) + log.Debug(fmt.Sprintf("new anchor too far in the past: %d, latest header in db: %d", anchorH.Number, hd.highestInDb)) + return false } if len(hd.anchors) >= hd.anchorLimit { - return false, fmt.Errorf("too many anchors: %d, limit %d", len(hd.anchors), hd.anchorLimit) + log.Debug(fmt.Sprintf("too many anchors: %d, limit %d", len(hd.anchors), hd.anchorLimit)) + return false } anchor = &Anchor{ parentHash: anchorHeader.ParentHash, @@ -375,7 +335,7 @@ func (hd *HeaderDownload) newAnchor(segment ChainSegment, peerID enode.ID) (bool hd.markPreverified(link) } } - return !preExisting, nil + return !preExisting } func (hd *HeaderDownload) pruneLinkQueue() { @@ -573,9 +533,9 @@ func (hd *HeaderDownload) ReadProgressFromDb(tx kv.RwTx) (err error) { return nil } -func (hd *HeaderDownload) invalidateAnchor(anchor *Anchor) { - log.Warn("Invalidating anchor for suspected unavailability", "height", anchor.blockHeight) - delete(hd.anchors, anchor.parentHash) +func (hd *HeaderDownload) invalidateAnchor(anchor *Anchor, reason string) { + log.Warn("Invalidating anchor", "height", anchor.blockHeight, "hash", anchor.parentHash, "reason", reason) + hd.removeAnchor(anchor) hd.removeUpwards(anchor.links) } @@ -589,35 +549,34 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime uint64) (*HeaderRequest } for hd.anchorQueue.Len() > 0 { anchor := (*hd.anchorQueue)[0] - if _, ok := hd.anchors[anchor.parentHash]; ok { - if anchor.nextRetryTime > currentTime { - // Anchor not ready for re-request yet - return nil, penalties - } - if anchor.timeouts < 10 { - return &HeaderRequest{Hash: anchor.parentHash, Number: anchor.blockHeight - 1, Length: 192, Skip: 0, Reverse: true}, penalties - } else { - // Ancestors of this anchor seem to be unavailable, invalidate and move on - hd.invalidateAnchor(anchor) - penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}) - } + // Only process the anchors for which the nextRetryTime has already come + if anchor.nextRetryTime > currentTime { + return nil, penalties + } + if anchor.timeouts < 10 { + // Produce a header request that would extend this anchor (add parent, parent of parent, etc.) + return &HeaderRequest{ + Anchor: anchor, + Hash: anchor.parentHash, + Number: anchor.blockHeight - 1, + Length: 192, + Skip: 0, + Reverse: true, + }, penalties } - // Anchor disappeared or unavailable, pop from the queue and move on - heap.Remove(hd.anchorQueue, 0) + // Ancestors of this anchor seem to be unavailable, invalidate and move on + hd.invalidateAnchor(anchor, "suspected unavailability") + penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID}) } return nil, penalties } -func (hd *HeaderDownload) SentRequest(req *HeaderRequest, currentTime, timeout uint64) { +func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime, timeout uint64) { hd.lock.Lock() defer hd.lock.Unlock() - anchor, ok := hd.anchors[req.Hash] - if !ok { - return - } - anchor.timeouts++ - anchor.nextRetryTime = currentTime + timeout - heap.Fix(hd.anchorQueue, anchor.idx) + req.Anchor.timeouts++ + req.Anchor.nextRetryTime = currentTime + timeout + heap.Fix(hd.anchorQueue, req.Anchor.idx) } func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { @@ -645,7 +604,7 @@ func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { // InsertHeaders attempts to insert headers into the database, verifying them first // It returns true in the first return value if the system is "in sync" -func (hd *HeaderDownload) InsertHeaders(hf func(header *types.Header, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (bool, error), terminalTotalDifficulty *big.Int, logPrefix string, logChannel <-chan time.Time) (bool, error) { +func (hd *HeaderDownload) InsertHeaders(hf func(header *types.Header, headerRaw []byte, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (bool, error), terminalTotalDifficulty *big.Int, logPrefix string, logChannel <-chan time.Time) (bool, error) { hd.lock.Lock() defer hd.lock.Unlock() @@ -691,7 +650,7 @@ func (hd *HeaderDownload) InsertHeaders(hf func(header *types.Header, hash commo continue } - isTrans, err := hf(link.header, link.hash, link.blockHeight, terminalTotalDifficulty) + isTrans, err := hf(link.header, link.headerRaw, link.hash, link.blockHeight, terminalTotalDifficulty) if err != nil { return false, err } @@ -706,6 +665,7 @@ func (hd *HeaderDownload) InsertHeaders(hf func(header *types.Header, hash commo } link.persisted = true link.header = nil // Drop header reference to free memory, as we won't need it anymore + link.headerRaw = nil heap.Push(hd.persistedLinkQueue, link) if len(link.next) > 0 { hd.insertList = append(hd.insertList, link.next...) @@ -767,6 +727,7 @@ func (hd *HeaderDownload) addHeaderAsLink(h ChainSegmentHeader, persisted bool) blockHeight: h.Number, hash: h.Hash, header: h.Header, + headerRaw: h.HeaderRaw, persisted: persisted, } hd.links[h.Hash] = link @@ -778,13 +739,13 @@ func (hd *HeaderDownload) addHeaderAsLink(h ChainSegmentHeader, persisted bool) return link } -func (hi *HeaderInserter) FeedHeaderFunc(db kv.StatelessRwTx, headerReader interfaces.HeaderReader) func(header *types.Header, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (bool, error) { - return func(header *types.Header, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (bool, error) { - return hi.FeedHeader(db, headerReader, header, hash, blockHeight, terminalTotalDifficulty) +func (hi *HeaderInserter) FeedHeaderFunc(db kv.StatelessRwTx, headerReader interfaces.HeaderReader) func(header *types.Header, headerRaw []byte, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (bool, error) { + return func(header *types.Header, headerRaw []byte, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (bool, error) { + return hi.FeedHeader(db, headerReader, header, headerRaw, hash, blockHeight, terminalTotalDifficulty) } } -func (hi *HeaderInserter) FeedHeader(db kv.StatelessRwTx, headerReader interfaces.HeaderReader, header *types.Header, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (isTrans bool, err error) { +func (hi *HeaderInserter) FeedHeader(db kv.StatelessRwTx, headerReader interfaces.HeaderReader, header *types.Header, headerRaw []byte, hash common.Hash, blockHeight uint64, terminalTotalDifficulty *big.Int) (isTrans bool, err error) { if hash == hi.prevHash { // Skip duplicates return false, nil @@ -870,15 +831,11 @@ func (hi *HeaderInserter) FeedHeader(db kv.StatelessRwTx, headerReader interface // This makes sure we end up chosing the chain with the max total difficulty hi.localTd.Set(td) } - data, err2 := rlp.EncodeToBytes(header) - if err2 != nil { - return false, fmt.Errorf("[%s] failed to RLP encode header: %w", hi.logPrefix, err2) - } if err = rawdb.WriteTd(db, hash, blockHeight, td); err != nil { return false, fmt.Errorf("[%s] failed to WriteTd: %w", hi.logPrefix, err) } - if err = db.Put(kv.Headers, dbutils.HeaderKey(blockHeight, hash), data); err != nil { + if err = db.Put(kv.Headers, dbutils.HeaderKey(blockHeight, hash), headerRaw); err != nil { return false, fmt.Errorf("[%s] failed to store header: %w", hi.logPrefix, err) } @@ -930,16 +887,14 @@ func (hd *HeaderDownload) ProcessSegment(segment ChainSegment, newBlock bool, pe log.Trace("processSegment", "from", lowestNum, "to", highestNum) hd.lock.Lock() defer hd.lock.Unlock() - foundAnchor, start := hd.findAnchors(segment) - foundTip, end := hd.findLink(segment, start) // We ignore penalty because we will check it as part of PoW check + foundAnchor, anchor, start := hd.findAnchor(segment) + foundTip, link, end := hd.findLink(segment, start) if end == 0 { log.Trace("Duplicate segment") if foundAnchor { // If duplicate segment is extending from the anchor, the anchor needs to be deleted, // otherwise it will keep producing requests that will be found duplicate - if err := hd.removeAnchor(segment[start].Hash); err != nil { - log.Warn("removal of anchor failed", "error", err) - } + hd.removeAnchor(anchor) } return } @@ -956,35 +911,20 @@ func (hd *HeaderDownload) ProcessSegment(segment ChainSegment, newBlock bool, pe if foundAnchor { if foundTip { // Connect - var err error - if penalties, err = hd.connect(subSegment); err != nil { - log.Debug("Connect failed", "error", err) - return - } + penalties = hd.connect(subSegment, link, anchor) log.Trace("Connected", "start", startNum, "end", endNum) } else { // ExtendDown - var err error - if requestMore, err = hd.extendDown(subSegment); err != nil { - log.Debug("ExtendDown failed", "error", err) - return - } + requestMore = hd.extendDown(subSegment, anchor) log.Trace("Extended Down", "start", startNum, "end", endNum) } } else if foundTip { // ExtendUp - if err := hd.extendUp(subSegment); err != nil { - log.Debug("ExtendUp failed", "error", err) - return - } + hd.extendUp(subSegment, link) log.Trace("Extended Up", "start", startNum, "end", endNum) } else { // NewAnchor - var err error - if requestMore, err = hd.newAnchor(subSegment, peerID); err != nil { - log.Debug("NewAnchor failed", "error", err) - return - } + requestMore = hd.newAnchor(subSegment, peerID) log.Trace("NewAnchor", "start", startNum, "end", endNum) } //log.Info(hd.anchorState()) diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index a6c12094ab..81e75db02e 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -21,6 +21,7 @@ import ( // present to allow potential reorgs type Link struct { header *types.Header + headerRaw []byte next []*Link // Allows iteration over links in ascending block height order hash common.Hash // Hash of the header blockHeight uint64 @@ -179,6 +180,7 @@ type HeaderRequest struct { Length uint64 Skip uint64 Reverse bool + Anchor *Anchor } type PenaltyItem struct { -- GitLab