diff --git a/cmd/headers/check/check.go b/cmd/headers/check/check.go index b35b19dc6f42b6328ddee709afb7af0021ef7aaa..ac666766ca246f72e4dd354df0c121dd19e9c219 100644 --- a/cmd/headers/check/check.go +++ b/cmd/headers/check/check.go @@ -11,6 +11,7 @@ import ( func Check(filesDir string) error { log.Info("Checking", "directory", filesDir) hd := headerdownload.NewHeaderDownload( + common.Hash{}, /* initialHash */ filesDir, 32*1024, /* bufferLimit */ 16*1024, /* tipLimit */ diff --git a/cmd/headers/commands/download.go b/cmd/headers/commands/download.go index cd0c61e80a58d43cd94bf71c3a9719bcb27d7e1e..360ee0a6f200e488f5f1415d60246d6466562850 100644 --- a/cmd/headers/commands/download.go +++ b/cmd/headers/commands/download.go @@ -14,6 +14,8 @@ func init() { downloadCmd.Flags().StringVar(&bufferSizeStr, "bufferSize", "512M", "size o the buffer") downloadCmd.Flags().StringVar(&sentryAddr, "sentryAddr", "localhost:9091", "sentry address <host>:<port>") downloadCmd.Flags().StringVar(&coreAddr, "coreAddr", "localhost:9092", "core address <host>:<port>") + withChaindata(downloadCmd) + withLmdbFlags(downloadCmd) rootCmd.AddCommand(downloadCmd) } @@ -21,6 +23,8 @@ var downloadCmd = &cobra.Command{ Use: "download", Short: "Download headers backwards", RunE: func(cmd *cobra.Command, args []string) error { - return download.Download(filesDir, bufferSizeStr, sentryAddr, coreAddr) + db := openDatabase(chaindata) + defer db.Close() + return download.Download(filesDir, bufferSizeStr, sentryAddr, coreAddr, db) }, } diff --git a/cmd/headers/commands/root.go b/cmd/headers/commands/root.go index 8ac2669ee2ee5c37e96f7cfcd637bafa46bd6067..95f8e70fbe97be5ffdc8bd13d6ec825cb66d92b2 100644 --- a/cmd/headers/commands/root.go +++ b/cmd/headers/commands/root.go @@ -7,16 +7,22 @@ import ( "os/signal" "syscall" + "github.com/c2h5oh/datasize" "github.com/ledgerwatch/turbo-geth/cmd/utils" + "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/internal/debug" "github.com/ledgerwatch/turbo-geth/log" "github.com/spf13/cobra" ) var ( - filesDir string // Directory when the files should be stored - sentryAddr string // Address of the sentry <host>:<port> - coreAddr string // Address of the core <host>:<port> + filesDir string // Directory when the files should be stored + sentryAddr string // Address of the sentry <host>:<port> + coreAddr string // Address of the core <host>:<port> + chaindata string // Path to chaindata + database string // Type of database (lmdb or mdbx) + mapSizeStr string // Map size for LMDB + freelistReuse int ) func init() { @@ -60,3 +66,58 @@ func Execute() { os.Exit(1) } } + +func must(err error) { + if err != nil { + panic(err) + } +} + +func withChaindata(cmd *cobra.Command) { + cmd.Flags().StringVar(&chaindata, "chaindata", "", "path to the db") + must(cmd.MarkFlagDirname("chaindata")) + must(cmd.MarkFlagRequired("chaindata")) + cmd.Flags().StringVar(&database, "database", "", "lmdb|mdbx") +} + +func withLmdbFlags(cmd *cobra.Command) { + cmd.Flags().StringVar(&mapSizeStr, "lmdb.mapSize", "", "map size for LMDB") + cmd.Flags().IntVar(&freelistReuse, "maxFreelistReuse", 0, "Find a big enough contiguous page range for large values in freelist is hard just allocate new pages and even don't try to search if value is bigger than this limit. Measured in pages.") +} + +func openDatabase(path string) *ethdb.ObjectDatabase { + db := ethdb.NewObjectDatabase(openKV(path, false)) + return db +} + +func openKV(path string, exclusive bool) ethdb.KV { + if database == "mdbx" { + opts := ethdb.NewMDBX().Path(path) + if exclusive { + opts = opts.Exclusive() + } + if mapSizeStr != "" { + var mapSize datasize.ByteSize + must(mapSize.UnmarshalText([]byte(mapSizeStr))) + opts = opts.MapSize(mapSize) + } + if freelistReuse > 0 { + opts = opts.MaxFreelistReuse(uint(freelistReuse)) + } + return opts.MustOpen() + } + + opts := ethdb.NewLMDB().Path(path) + if exclusive { + opts = opts.Exclusive() + } + if mapSizeStr != "" { + var mapSize datasize.ByteSize + must(mapSize.UnmarshalText([]byte(mapSizeStr))) + opts = opts.MapSize(mapSize) + } + if freelistReuse > 0 { + opts = opts.MaxFreelistReuse(uint(freelistReuse)) + } + return opts.MustOpen() +} diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go index cc5294d0368902935ed16f19ed99436e2a8ffb1b..5dbcbb7c0502d9acefe4d7822c24f119002099b3 100644 --- a/cmd/headers/download/downloader.go +++ b/cmd/headers/download/downloader.go @@ -24,10 +24,12 @@ import ( "github.com/ledgerwatch/turbo-geth/consensus/ethash" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/eth" + "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/metrics" "github.com/ledgerwatch/turbo-geth/params" "github.com/ledgerwatch/turbo-geth/rlp" + "github.com/ledgerwatch/turbo-geth/turbo/stages" "github.com/ledgerwatch/turbo-geth/turbo/stages/headerdownload" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -44,7 +46,10 @@ func (cr chainReader) GetHeader(hash common.Hash, number uint64) *types.Header { func (cr chainReader) GetHeaderByNumber(number uint64) *types.Header { panic("") } func (cr chainReader) GetHeaderByHash(hash common.Hash) *types.Header { panic("") } -func processSegment(hd *headerdownload.HeaderDownload, segment *headerdownload.ChainSegment) { +//nolint:interfacer +func processSegment(lock *sync.Mutex, hd *headerdownload.HeaderDownload, segment *headerdownload.ChainSegment) { + lock.Lock() + defer lock.Unlock() log.Info(hd.AnchorState()) log.Info("processSegment", "from", segment.Headers[0].Number.Uint64(), "to", segment.Headers[len(segment.Headers)-1].Number.Uint64()) foundAnchor, start, anchorParent, invalidAnchors := hd.FindAnchors(segment) @@ -101,9 +106,6 @@ func processSegment(hd *headerdownload.HeaderDownload, segment *headerdownload.C hd.AddSegmentToBuffer(segment, start, end) log.Info("Extended Up", "start", start, "end", end) } - if start == 0 || end > 0 { - hd.CheckInitiation(segment, params.MainnetGenesisHash) - } } } else { // NewAnchor @@ -116,7 +118,7 @@ func processSegment(hd *headerdownload.HeaderDownload, segment *headerdownload.C } } -func Download(filesDir string, bufferSizeStr string, sentryAddr string, coreAddr string) error { +func Download(filesDir string, bufferSizeStr string, sentryAddr string, coreAddr string, db ethdb.Database) error { ctx := rootContext() log.Info("Starting Core P2P server", "on", coreAddr, "connecting to sentry", coreAddr) @@ -179,7 +181,7 @@ func Download(filesDir string, bufferSizeStr string, sentryAddr string, coreAddr } var controlServer *ControlServerImpl - if controlServer, err = NewControlServer(filesDir, int(bufferSize), sentryClient); err != nil { + if controlServer, err = NewControlServer(db, filesDir, int(bufferSize), sentryClient); err != nil { return fmt.Errorf("create core P2P server: %w", err) } proto_core.RegisterControlServer(grpcServer, controlServer) @@ -195,19 +197,22 @@ func Download(filesDir string, bufferSizeStr string, sentryAddr string, coreAddr go controlServer.loop(ctx) - <-ctx.Done() + if err = stages.StageLoop(ctx, db, controlServer.hd); err != nil { + log.Error("Stage loop failure", "error", err) + } + return nil } type ControlServerImpl struct { proto_core.UnimplementedControlServer - hdLock sync.Mutex + lock sync.Mutex hd *headerdownload.HeaderDownload sentryClient proto_sentry.SentryClient requestWakeUp chan struct{} } -func NewControlServer(filesDir string, bufferSize int, sentryClient proto_sentry.SentryClient) (*ControlServerImpl, error) { +func NewControlServer(db ethdb.Database, filesDir string, bufferSize int, sentryClient proto_sentry.SentryClient) (*ControlServerImpl, error) { //config := eth.DefaultConfig.Ethash engine := ethash.New(ethash.Config{ CachesInMem: 1, @@ -225,6 +230,7 @@ func NewControlServer(filesDir string, bufferSize int, sentryClient proto_sentry return engine.VerifySeal(cr, header) } hd := headerdownload.NewHeaderDownload( + common.Hash{}, /* initialHash */ filesDir, bufferSize, /* bufferLimit */ 16*1024, /* tipLimit */ @@ -234,10 +240,14 @@ func NewControlServer(filesDir string, bufferSize int, sentryClient proto_sentry 3600, /* newAnchor future limit */ 3600, /* newAnchor past limit */ ) + dbRecovered, err := hd.RecoverFromDb(db, uint64(time.Now().Unix())) + if err != nil { + log.Error("Recovery from DB failed", "error", err) + } hardTips := headerdownload.InitHardCodedTips("hard-coded-headers.dat") - if recovered, err := hd.RecoverFromFiles(uint64(time.Now().Unix()), hardTips); err != nil || !recovered { - if err != nil { - log.Error("Recovery from file failed, will start from scratch", "error", err) + if recovered, err1 := hd.RecoverFromFiles(uint64(time.Now().Unix()), hardTips); err1 != nil || (!recovered && !dbRecovered) { + if err1 != nil { + log.Error("Recovery from file failed, will start from scratch", "error", err1) } hd.SetHardCodedTips(hardTips) // Insert hard-coded headers if present @@ -270,8 +280,6 @@ func NewControlServer(filesDir string, bufferSize int, sentryClient proto_sentry } func (cs *ControlServerImpl) newBlockHashes(ctx context.Context, inreq *proto_core.InboundMessage) (*empty.Empty, error) { - cs.hdLock.Lock() - defer cs.hdLock.Unlock() var request eth.NewBlockHashesData if err := rlp.DecodeBytes(inreq.Data, &request); err != nil { return nil, fmt.Errorf("decode NewBlockHashes: %v", err) @@ -305,8 +313,6 @@ func (cs *ControlServerImpl) newBlockHashes(ctx context.Context, inreq *proto_co } func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_core.InboundMessage) (*empty.Empty, error) { - cs.hdLock.Lock() - defer cs.hdLock.Unlock() var request []*types.Header if err := rlp.DecodeBytes(inreq.Data, &request); err != nil { return nil, fmt.Errorf("decode BlockHeaders: %v", err) @@ -314,7 +320,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_core if segments, penalty, err := cs.hd.SplitIntoSegments(request); err == nil { if penalty == headerdownload.NoPenalty { for _, segment := range segments { - processSegment(cs.hd, segment) + processSegment(&cs.lock, cs.hd, segment) } } else { outreq := proto_sentry.PenalizePeerRequest{ @@ -333,15 +339,13 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_core } func (cs *ControlServerImpl) newBlock(ctx context.Context, inreq *proto_core.InboundMessage) (*empty.Empty, error) { - cs.hdLock.Lock() - defer cs.hdLock.Unlock() var request eth.NewBlockData if err := rlp.DecodeBytes(inreq.Data, &request); err != nil { return nil, fmt.Errorf("decode NewBlockMsg: %v", err) } if segments, penalty, err := cs.hd.SingleHeaderAsSegment(request.Block.Header()); err == nil { if penalty == headerdownload.NoPenalty { - processSegment(cs.hd, segments[0]) // There is only one segment in this case + processSegment(&cs.lock, cs.hd, segments[0]) // There is only one segment in this case } else { outreq := proto_sentry.PenalizePeerRequest{ PeerId: inreq.PeerId, @@ -410,11 +414,7 @@ func (cs *ControlServerImpl) sendRequests(ctx context.Context, reqs []*headerdow } func (cs *ControlServerImpl) loop(ctx context.Context) { - var timer *time.Timer - cs.hdLock.Lock() - reqs := cs.hd.RequestMoreHeaders(uint64(time.Now().Unix()), 5 /*timeout */) - timer = cs.hd.RequestQueueTimer - cs.hdLock.Unlock() + reqs, timer := cs.hd.RequestMoreHeaders(uint64(time.Now().Unix()), 5 /*timeout */) cs.sendRequests(ctx, reqs) for { select { @@ -425,10 +425,7 @@ func (cs *ControlServerImpl) loop(ctx context.Context) { case <-cs.requestWakeUp: log.Info("Woken up by the incoming request") } - cs.hdLock.Lock() - reqs := cs.hd.RequestMoreHeaders(uint64(time.Now().Unix()), 5 /*timeout */) - timer = cs.hd.RequestQueueTimer - cs.hdLock.Unlock() + reqs, timer = cs.hd.RequestMoreHeaders(uint64(time.Now().Unix()), 5 /*timeout */) cs.sendRequests(ctx, reqs) } } diff --git a/turbo/stages/headerdownload/forward.go b/turbo/stages/headerdownload/forward.go new file mode 100644 index 0000000000000000000000000000000000000000..7499947b3657b9bac57e6979e989c1bcc2a655e5 --- /dev/null +++ b/turbo/stages/headerdownload/forward.go @@ -0,0 +1,186 @@ +package headerdownload + +import ( + "context" + "fmt" + "math/big" + "os" + "runtime" + "time" + + "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core/rawdb" + "github.com/ledgerwatch/turbo-geth/core/types" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/rlp" +) + +const ( + logInterval = 30 * time.Second +) + +// Forward progresses Headers stage in the forward direction +func Forward(logPrefix string, db ethdb.Database, files []string, buffer []byte) error { + count := 0 + var highest uint64 + log.Info("Processing headers...") + var tx ethdb.DbWithPendingMutations + var useExternalTx bool + if hasTx, ok := db.(ethdb.HasTx); ok && hasTx.Tx() != nil { + tx = db.(ethdb.DbWithPendingMutations) + useExternalTx = true + } else { + var err error + tx, err = db.Begin(context.Background(), ethdb.RW) + if err != nil { + return err + } + defer tx.Rollback() + } + batch := tx.NewBatch() + defer batch.Rollback() + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + var logBlock uint64 + + headHash := rawdb.ReadHeadHeaderHash(tx) + headNumber := rawdb.ReadHeaderNumber(tx, headHash) + localTd, err1 := rawdb.ReadTd(tx, headHash, *headNumber) + if err1 != nil { + return err1 + } + var parentDiffs = make(map[common.Hash]*big.Int) + var childDiffs = make(map[common.Hash]*big.Int) + var prevHash common.Hash // Hash of previously seen header - to filter out potential duplicates + var prevHeight uint64 + var newCanonical bool + var forkNumber uint64 + var canonicalBacktrack = make(map[common.Hash]common.Hash) + if _, _, err1 = ReadFilesAndBuffer(files, buffer, func(header *types.Header, blockHeight uint64) error { + hash := header.Hash() + if hash == prevHash { + // Skip duplicates + return nil + } + if ch, err := rawdb.ReadCanonicalHash(tx, blockHeight); err == nil { + if ch == hash { + // Already canonical, skip + return nil + } + } else { + return err + } + if blockHeight < prevHeight { + return fmt.Errorf("[%s] headers are unexpectedly unsorted, got %d after %d", logPrefix, blockHeight, prevHeight) + } + if forkNumber == 0 { + forkNumber = blockHeight + logBlock = blockHeight - 1 + } + if blockHeight > prevHeight { + // Clear out parent map and move childMap to its place + if blockHeight == prevHeight+1 { + parentDiffs = childDiffs + } else if prevHeight > 0 { + return fmt.Errorf("[%s] header chain break, from %d to %d", logPrefix, prevHeight, blockHeight) + } + childDiffs = make(map[common.Hash]*big.Int) + prevHeight = blockHeight + } + parentDiff, ok := parentDiffs[header.ParentHash] + if !ok { + var err error + if parentDiff, err = rawdb.ReadTd(tx, header.ParentHash, blockHeight-1); err != nil { + return fmt.Errorf("[%s] reading total difficulty of the parent header %d %x: %w", logPrefix, blockHeight-1, header.ParentHash, err) + } + } + cumulativeDiff := new(big.Int).Add(parentDiff, header.Difficulty) + childDiffs[hash] = cumulativeDiff + if !newCanonical && cumulativeDiff.Cmp(localTd) > 0 { + newCanonical = true + backHash := header.ParentHash + backNumber := blockHeight - 1 + for pHash, pOk := canonicalBacktrack[backHash]; pOk; pHash, pOk = canonicalBacktrack[backHash] { + if err := rawdb.WriteCanonicalHash(batch, backHash, backNumber); err != nil { + return fmt.Errorf("[%s] marking canonical header %d %x: %w", logPrefix, backNumber, backHash, err) + } + backHash = pHash + backNumber-- + } + canonicalBacktrack = nil + } + if !newCanonical { + canonicalBacktrack[hash] = header.ParentHash + } + if newCanonical { + if err := rawdb.WriteCanonicalHash(batch, hash, blockHeight); err != nil { + return fmt.Errorf("[%s] marking canonical header %d %x: %w", logPrefix, blockHeight, hash, err) + } + } + data, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("[%s] Failed to RLP encode header: %w", logPrefix, err) + } + if err = rawdb.WriteTd(batch, hash, blockHeight, cumulativeDiff); err != nil { + return fmt.Errorf("[%s] Failed to WriteTd: %w", logPrefix, err) + } + if err = batch.Put(dbutils.HeaderPrefix, dbutils.HeaderKey(blockHeight, hash), data); err != nil { + return fmt.Errorf("[%s] Failed to store header: %w", logPrefix, err) + } + prevHash = hash + count++ + if blockHeight > highest { + highest = blockHeight + } + if batch.BatchSize() >= batch.IdealBatchSize() { + if err = batch.CommitAndBegin(context.Background()); err != nil { + return err + } + if !useExternalTx { + if err = tx.CommitAndBegin(context.Background()); err != nil { + return err + } + } + } + select { + default: + case <-logEvery.C: + logBlock = logProgress(logPrefix, logBlock, blockHeight, batch) + } + return nil + }); err1 != nil { + return err1 + } + if _, err := batch.Commit(); err != nil { + return fmt.Errorf("%s: failed to write batch commit: %v", logPrefix, err) + } + if !useExternalTx { + if _, err := tx.Commit(); err != nil { + return err + } + } + log.Info("Processed", "headers", count, "highest", highest) + for _, file := range files { + if err := os.Remove(file); err != nil { + log.Error("Could not remove", "file", file, "error", err) + } + } + return nil +} + +func logProgress(logPrefix string, prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 { + speed := float64(now-prev) / float64(logInterval/time.Second) + var m runtime.MemStats + runtime.ReadMemStats(&m) + log.Info(fmt.Sprintf("[%s] Wrote blocks", logPrefix), + "number", now, + "blk/second", speed, + "batch", common.StorageSize(batch.BatchSize()), + "alloc", common.StorageSize(m.Alloc), + "sys", common.StorageSize(m.Sys), + "numGC", int(m.NumGC)) + + return now +} diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index 6883d766c1e1d3114b90532d4af15c0264df34d4..f1f911783f1100be15505ee6c53c5f881d999da6 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "container/heap" + "context" "encoding/binary" "errors" "fmt" @@ -18,8 +19,12 @@ import ( "github.com/holiman/uint256" "github.com/ledgerwatch/turbo-geth/common" + "github.com/ledgerwatch/turbo-geth/common/dbutils" + "github.com/ledgerwatch/turbo-geth/core/rawdb" "github.com/ledgerwatch/turbo-geth/core/types" + "github.com/ledgerwatch/turbo-geth/ethdb" "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/rlp" ) // Implements sort.Interface so we can sort the incoming header in the message by block height @@ -40,6 +45,8 @@ func (h HeadersByBlockHeight) Swap(i, j int) { // SplitIntoSegments converts message containing headers into a collection of chain segments func (hd *HeaderDownload) SplitIntoSegments(msg []*types.Header) ([]*ChainSegment, Penalty, error) { + hd.lock.RLock() + defer hd.lock.RUnlock() sort.Sort(HeadersByBlockHeight(msg)) // Now all headers are order from the highest block height to the lowest var segments []*ChainSegment // Segments being built @@ -94,6 +101,8 @@ func (hd *HeaderDownload) childParentValid(child, parent *types.Header) (bool, P // SingleHeaderAsSegment converts message containing 1 header into one singleton chain segment func (hd *HeaderDownload) SingleHeaderAsSegment(header *types.Header) ([]*ChainSegment, Penalty, error) { + hd.lock.RLock() + defer hd.lock.RUnlock() headerHash := header.Hash() if _, bad := hd.badHeaders[headerHash]; bad { return nil, BadBlockPenalty, nil @@ -103,6 +112,8 @@ func (hd *HeaderDownload) SingleHeaderAsSegment(header *types.Header) ([]*ChainS // FindAnchors attempts to find anchors to which given chain segment can be attached to func (hd *HeaderDownload) FindAnchors(segment *ChainSegment) (found bool, start int, anchorParent common.Hash, invalidAnchors []int) { + hd.lock.RLock() + defer hd.lock.RUnlock() // Walk the segment from children towards parents for i, header := range segment.Headers { // Check if the header can be attached to an anchor of a working tree @@ -121,6 +132,8 @@ func (hd *HeaderDownload) FindAnchors(segment *ChainSegment) (found bool, start // InvalidateAnchors removes trees with given anchor hashes (belonging to the given anchor parent) func (hd *HeaderDownload) InvalidateAnchors(anchorParent common.Hash, invalidAnchors []int) (tombstones []common.Hash, err error) { + hd.lock.Lock() + defer hd.lock.Unlock() if len(invalidAnchors) > 0 { if anchors, attaching := hd.anchors[anchorParent]; attaching { j := 0 @@ -154,6 +167,8 @@ func (hd *HeaderDownload) InvalidateAnchors(anchorParent common.Hash, invalidAnc // FindTip attempts to find tip of a tree that given chain segment can be attached to // the given chain segment may be found invalid relative to a working tree, in this case penalty for peer is returned func (hd *HeaderDownload) FindTip(segment *ChainSegment, start int) (found bool, end int, penalty Penalty) { + hd.lock.RLock() + defer hd.lock.RUnlock() if _, duplicate := hd.getTip(segment.Headers[start].Hash()); duplicate { return true, 0, NoPenalty } @@ -175,6 +190,8 @@ func (hd *HeaderDownload) FindTip(segment *ChainSegment, start int) (found bool, // It reports first verification error, or returns the powDepth that the anchor of this // chain segment should have, if created func (hd *HeaderDownload) VerifySeals(segment *ChainSegment, anchorFound, tipFound bool, start, end int, currentTime uint64) (powDepth int, err error) { + hd.lock.RLock() + defer hd.lock.RUnlock() if !anchorFound && !tipFound { anchorHeader := segment.Headers[end-1] if anchorHeader.Time > currentTime+hd.newAnchorFutureLimit { @@ -222,6 +239,8 @@ func (hd *HeaderDownload) VerifySeals(segment *ChainSegment, anchorFound, tipFou // ExtendUp extends a working tree up from the tip, using given chain segment func (hd *HeaderDownload) ExtendUp(segment *ChainSegment, start, end int, currentTime uint64) error { + hd.lock.Lock() + defer hd.lock.Unlock() // Find attachment tip again tipHeader := segment.Headers[end-1] if attachmentTip, attaching := hd.getTip(tipHeader.ParentHash); attaching { @@ -239,15 +258,32 @@ func (hd *HeaderDownload) ExtendUp(segment *ChainSegment, start, end int, curren return fmt.Errorf("extendUp addHeaderAsTip for %x: %v", header.Hash(), err) } } + if start == 0 || end > 0 { + // Check if the staged sync can start + if hd.checkInitiation(segment) { + hd.stageReady = true + // Signal at every opportunity to avoid deadlocks + select { + case hd.stageReadyCh <- struct{}{}: + default: + } + } + } } else { return fmt.Errorf("extendUp attachment tip not found for %x", tipHeader.ParentHash) } return nil } +func (hd *HeaderDownload) StageReadyChannel() chan struct{} { + return hd.stageReadyCh +} + // ExtendDown extends some working trees down from the anchor, using given chain segment // it creates a new anchor and collects all the tips from the attached anchors to it func (hd *HeaderDownload) ExtendDown(segment *ChainSegment, start, end int, powDepth int, currentTime uint64) error { + hd.lock.Lock() + defer hd.lock.Unlock() // Find attachement anchors again anchorHeader := segment.Headers[start] if anchors, attaching := hd.anchors[anchorHeader.Hash()]; attaching { @@ -260,6 +296,7 @@ func (hd *HeaderDownload) ExtendDown(segment *ChainSegment, start, end int, powD powDepth: powDepth, timestamp: newAnchorHeader.Time, difficulty: *diff, + parentHash: newAnchorHeader.ParentHash, hash: newAnchorHeader.Hash(), blockHeight: newAnchorHeader.Number.Uint64(), tipQueue: &AnchorTipQueue{}, @@ -316,6 +353,8 @@ func (hd *HeaderDownload) ExtendDown(segment *ChainSegment, start, end int, powD // Connect connects some working trees using anchors of some, and a tip of another func (hd *HeaderDownload) Connect(segment *ChainSegment, start, end int, currentTime uint64) error { + hd.lock.Lock() + defer hd.lock.Unlock() // Find attachment tip again tipHeader := segment.Headers[end-1] // Find attachement anchors again @@ -374,6 +413,8 @@ func (hd *HeaderDownload) Connect(segment *ChainSegment, start, end int, current } func (hd *HeaderDownload) NewAnchor(segment *ChainSegment, start, end int, currentTime uint64) error { + hd.lock.Lock() + defer hd.lock.Unlock() anchorHeader := segment.Headers[end-1] var anchor *Anchor var err error @@ -393,13 +434,15 @@ func (hd *HeaderDownload) NewAnchor(segment *ChainSegment, start, end int, curre return fmt.Errorf("newAnchor addHeaderAsTip for %x: %v", header.Hash(), err) } } - if anchorHeader.ParentHash != (common.Hash{}) { + if anchorHeader.ParentHash != hd.initialHash { hd.requestQueue.PushFront(RequestQueueItem{anchorParent: anchorHeader.ParentHash, waitUntil: currentTime}) } return nil } func (hd *HeaderDownload) HardCodedHeader(header *types.Header, currentTime uint64) error { + hd.lock.Lock() + defer hd.lock.Unlock() if anchor, err := hd.addHeaderAsAnchor(header, 0 /* powDepth */); err == nil { diff, overflow := uint256.FromBig(header.Difficulty) if overflow { @@ -434,6 +477,8 @@ func (hd *HeaderDownload) HardCodedHeader(header *types.Header, currentTime uint // AddSegmentToBuffer adds another segment to the buffer and return true if the buffer is now full func (hd *HeaderDownload) AddSegmentToBuffer(segment *ChainSegment, start, end int) { + hd.lock.Lock() + defer hd.lock.Unlock() if end > start { fmt.Printf("Adding segment [%d-%d] to the buffer\n", segment.Headers[end-1].Number.Uint64(), segment.Headers[start].Number.Uint64()) } @@ -445,6 +490,8 @@ func (hd *HeaderDownload) AddSegmentToBuffer(segment *ChainSegment, start, end i } func (hd *HeaderDownload) AddHeaderToBuffer(header *types.Header) { + hd.lock.Lock() + defer hd.lock.Unlock() fmt.Printf("Adding header %d to the buffer\n", header.Number.Uint64()) var serBuffer [HeaderSerLength]byte SerialiseHeader(header, serBuffer[:]) @@ -452,6 +499,8 @@ func (hd *HeaderDownload) AddHeaderToBuffer(header *types.Header) { } func (hd *HeaderDownload) AnchorState() string { + hd.lock.RLock() + defer hd.lock.RUnlock() //nolint:prealloc var ss []string for anchorParent, anchors := range hd.anchors { @@ -561,6 +610,8 @@ func (h *Heap) Pop() interface{} { const AnchorSerLen = 32 /* ParentHash */ + 8 /* powDepth */ + 8 /* maxTipHeight */ func (hd *HeaderDownload) CheckFiles() error { + hd.lock.RLock() + defer hd.lock.RUnlock() fileInfos, err := ioutil.ReadDir(hd.filesDir) if err != nil { return err @@ -626,48 +677,42 @@ func InitHardCodedTips(filename string) map[common.Hash]struct{} { } func (hd *HeaderDownload) SetHardCodedTips(hardTips map[common.Hash]struct{}) { + hd.lock.Lock() + defer hd.lock.Unlock() hd.hardTips = hardTips } -func (hd *HeaderDownload) RecoverFromFiles(currentTime uint64, hardTips map[common.Hash]struct{}) (bool, error) { - fileInfos, err := ioutil.ReadDir(hd.filesDir) - if err != nil { - return false, err - } - h := &Heap{} - heap.Init(h) - var buffer [HeaderSerLength]byte - var anchorBuf [AnchorSerLen]byte +func ReadFilesAndBuffer(files []string, headerBuf []byte, hf func(header *types.Header, blockHeight uint64) error) (map[common.Hash]*Anchor, uint32, error) { //nolint:prealloc var fs []*os.File //nolint:prealloc var rs []io.Reader + var anchorBuf [AnchorSerLen]byte + var lastAnchors map[common.Hash]*Anchor + var lastAnchorSequence uint32 // Open all files and only read anchor sequences to decide which one has the latest information about the anchors - hd.anchorSequence = 0 - var lastAnchors = make(map[common.Hash]*Anchor) - for _, fileInfo := range fileInfos { - f, err1 := os.Open(path.Join(hd.filesDir, fileInfo.Name())) + for _, filename := range files { + f, err1 := os.Open(filename) if err1 != nil { - return false, fmt.Errorf("open file %s: %v", fileInfo.Name(), err1) + return nil, 0, fmt.Errorf("open file %s: %v", filename, err1) } r := bufio.NewReader(f) - if _, err = io.ReadFull(r, anchorBuf[:8]); err != nil { + if _, err := io.ReadFull(r, anchorBuf[:8]); err != nil { fmt.Printf("reading anchor sequence and count from file: %v\n", err) continue } anchorSequence := binary.BigEndian.Uint32(anchorBuf[:]) anchorCount := int(binary.BigEndian.Uint32((anchorBuf[4:]))) var anchors = make(map[common.Hash]*Anchor) - if anchorSequence >= hd.anchorSequence { + if anchorSequence >= lastAnchorSequence { fmt.Printf("Reading anchor sequence %d, anchor count: %d\n", anchorSequence, anchorCount) } for i := 0; i < anchorCount; i++ { - if _, err = io.ReadFull(r, anchorBuf[:]); err != nil { + if _, err := io.ReadFull(r, anchorBuf[:]); err != nil { fmt.Printf("reading anchor %x from file: %v\n", i, err) } - if anchorSequence >= hd.anchorSequence { // Don't bother with parsing if we are not going to use this info - anchor := &Anchor{tipQueue: &AnchorTipQueue{}, anchorID: hd.nextAnchorID} - hd.nextAnchorID++ + if anchorSequence >= lastAnchorSequence { // Don't bother with parsing if we are not going to use this info + anchor := &Anchor{tipQueue: &AnchorTipQueue{}} heap.Init(anchor.tipQueue) pos := 0 copy(anchor.hash[:], anchorBuf[pos:]) @@ -679,19 +724,35 @@ func (hd *HeaderDownload) RecoverFromFiles(currentTime uint64, hardTips map[comm fmt.Printf("anchor: %x, powDepth: %d, maxTipHeight %d\n", anchor.hash, anchor.powDepth, anchor.maxTipHeight) } } - if anchorSequence >= hd.anchorSequence { - hd.anchorSequence = anchorSequence + 1 + if anchorSequence >= lastAnchorSequence { + lastAnchorSequence = anchorSequence + 1 lastAnchors = anchors } fs = append(fs, f) rs = append(rs, r) } + if headerBuf != nil { + sort.Sort(BufferSorter(headerBuf)) + fs = append(fs, nil) + rs = append(rs, bytes.NewReader(headerBuf)) + } + defer func() { + for _, f := range fs { + if f != nil { + //lint:noerrcheck + f.Close() + } + } + }() + h := &Heap{} + heap.Init(h) + var buffer [HeaderSerLength]byte for i, f := range fs { r := rs[i] var header types.Header - if _, err = io.ReadFull(r, buffer[:]); err != nil { + if _, err := io.ReadFull(r, buffer[:]); err != nil { if !errors.Is(err, io.EOF) { - fmt.Printf("reading header from file: %v\n", err) + return nil, 0, fmt.Errorf("reading header from file: %w", err) } continue } @@ -699,76 +760,13 @@ func (hd *HeaderDownload) RecoverFromFiles(currentTime uint64, hardTips map[comm he := HeapElem{file: f, reader: r, blockHeight: header.Number.Uint64(), hash: header.Hash(), header: &header} heap.Push(h, he) } - var prevHeight uint64 - var parentAnchors = make(map[common.Hash]*Anchor) - var parentDiffs = make(map[common.Hash]*uint256.Int) - var childAnchors = make(map[common.Hash]*Anchor) - var childDiffs = make(map[common.Hash]*uint256.Int) - var prevHash common.Hash // Hash of previously seen header - to filter out potential duplicates for h.Len() > 0 { he := (heap.Pop(h)).(HeapElem) - hash := he.header.Hash() - if hash != prevHash { - if he.blockHeight > prevHeight { - // Clear out parent map and move childMap to its place - parentAnchors = childAnchors - parentDiffs = childDiffs - childAnchors = make(map[common.Hash]*Anchor) - childDiffs = make(map[common.Hash]*uint256.Int) - if he.blockHeight != prevHeight+1 { - // Skipping the level, so no connection between grand-parents and grand-children - parentAnchors = make(map[common.Hash]*Anchor) - parentDiffs = make(map[common.Hash]*uint256.Int) - } - prevHeight = he.blockHeight - } - // Since this header has already been processed, we do not expect overflow - cumulativeDiff, overflow := uint256.FromBig(he.header.Difficulty) - if overflow { - return false, fmt.Errorf("overflow when converting header.Difficulty to uint256: %s", he.header.Difficulty) - } - parentHash := he.header.ParentHash - if parentAnchor, found := parentAnchors[parentHash]; found { - parentDiff := parentDiffs[parentHash] - cumulativeDiff.Add(cumulativeDiff, parentDiff) - if err = hd.addHeaderAsTip(he.header, parentAnchor, *cumulativeDiff, currentTime); err != nil { - return false, fmt.Errorf("add header as tip: %v", err) - } - childAnchors[hash] = parentAnchor - childDiffs[hash] = cumulativeDiff - } else { - anchor, anchorExisted := lastAnchors[hash] - if !anchorExisted { - anchor = &Anchor{powDepth: hd.initPowDepth, hash: hash, tipQueue: &AnchorTipQueue{}, anchorID: hd.nextAnchorID} - hd.nextAnchorID++ - heap.Init(anchor.tipQueue) - fmt.Printf("Undeclared anchor for hash %x, inserting as empty\n", hash) - } - diff, overflow := uint256.FromBig(he.header.Difficulty) - if overflow { - return false, fmt.Errorf("overflow when converting header.Difficulty to uint256: %s", he.header.Difficulty) - } - anchor.difficulty = *diff - anchor.timestamp = he.header.Time - anchor.blockHeight = he.header.Number.Uint64() - if err = hd.addHeaderAsTip(he.header, anchor, *cumulativeDiff, currentTime); err != nil { - return false, fmt.Errorf("add header as tip: %v", err) - } - if len(hd.anchors[parentHash]) == 0 { - if parentHash != (common.Hash{}) { - hd.requestQueue.PushFront(RequestQueueItem{anchorParent: parentHash, waitUntil: currentTime}) - } - } - hd.anchors[parentHash] = append(hd.anchors[parentHash], anchor) - childAnchors[hash] = anchor - childDiffs[hash] = cumulativeDiff - } - prevHash = hash - } else { - fmt.Printf("Duplicate header: %d %x\n", he.header.Number.Uint64(), hash) + if err := hf(he.header, he.blockHeight); err != nil { + return nil, 0, err } var header types.Header - if _, err = io.ReadFull(he.reader, buffer[:]); err == nil { + if _, err := io.ReadFull(he.reader, buffer[:]); err == nil { DeserialiseHeader(&header, buffer[:]) he.blockHeight = header.Number.Uint64() he.hash = header.Hash() @@ -776,26 +774,170 @@ func (hd *HeaderDownload) RecoverFromFiles(currentTime uint64, hardTips map[comm heap.Push(h, he) } else { if !errors.Is(err, io.EOF) { - fmt.Printf("reading header from file: %v\n", err) + return nil, 0, fmt.Errorf("reading header from file: %w", err) + } + if he.file != nil { + if err = he.file.Close(); err != nil { + return nil, 0, fmt.Errorf("closing file: %w", err) + } + } + } + } + return lastAnchors, lastAnchorSequence, nil +} + +func (hd *HeaderDownload) RecoverFromDb(db ethdb.Database, currentTime uint64) (bool, error) { + var anchor *Anchor + err := db.(ethdb.HasKV).KV().View(context.Background(), func(tx ethdb.Tx) error { + c := tx.Cursor(dbutils.HeaderPrefix) + var anchorH types.Header + for k, v, err := c.First(); k != nil; k, v, err = c.Next() { + if err != nil { + return err + } + if len(k) == 40 { + // This is header record + if err = rlp.DecodeBytes(v, &anchorH); err != nil { + return err + } + break + } + } + for k, v, err := c.Last(); k != nil && hd.tipCount < hd.tipLimit; k, v, err = c.Prev() { + if err != nil { + return err + } + if len(k) != 40 { + continue + } + var h types.Header + if err = rlp.DecodeBytes(v, &h); err != nil { + return err + } + var td *big.Int + if td, err = rawdb.ReadTd(db, h.Hash(), h.Number.Uint64()); err != nil { + return err + } + cumulativeDiff, overflow := uint256.FromBig(td) + if overflow { + return fmt.Errorf("overflow of difficulty: %d", td) + } + if anchor == nil { + if anchor, err = hd.addHeaderAsAnchor(&anchorH, 0); err != nil { + return err + } } - if err = he.file.Close(); err != nil { - fmt.Printf("closing file: %v\n", err) + if err = hd.addHeaderAsTip(&h, anchor, *cumulativeDiff, currentTime); err != nil { + return err } } + return nil + }) + return anchor != nil && anchor.maxTipHeight > anchor.blockHeight, err +} + +func (hd *HeaderDownload) RecoverFromFiles(currentTime uint64, hardTips map[common.Hash]struct{}) (bool, error) { + hd.lock.Lock() + defer hd.lock.Unlock() + fileInfos, err := ioutil.ReadDir(hd.filesDir) + if err != nil { + return false, err + } + var lastAnchors map[common.Hash]*Anchor + var files = make([]string, len(fileInfos)) + for i, fileInfo := range fileInfos { + files[i] = path.Join(hd.filesDir, fileInfo.Name()) + } + var prevHeight uint64 + var parentAnchors = make(map[common.Hash]*Anchor) + var parentDiffs = make(map[common.Hash]*uint256.Int) + var childAnchors = make(map[common.Hash]*Anchor) + var childDiffs = make(map[common.Hash]*uint256.Int) + var prevHash common.Hash // Hash of previously seen header - to filter out potential duplicates + if lastAnchors, hd.anchorSequence, err = ReadFilesAndBuffer(files, nil, + func(header *types.Header, blockHeight uint64) error { + hash := header.Hash() + if hash != prevHash { + if blockHeight > prevHeight { + // Clear out parent map and move childMap to its place + parentAnchors = childAnchors + parentDiffs = childDiffs + childAnchors = make(map[common.Hash]*Anchor) + childDiffs = make(map[common.Hash]*uint256.Int) + if blockHeight != prevHeight+1 { + // Skipping the level, so no connection between grand-parents and grand-children + parentAnchors = make(map[common.Hash]*Anchor) + parentDiffs = make(map[common.Hash]*uint256.Int) + } + prevHeight = blockHeight + } + // Since this header has already been processed, we do not expect overflow + cumulativeDiff, overflow := uint256.FromBig(header.Difficulty) + if overflow { + return fmt.Errorf("overflow when converting header.Difficulty to uint256: %s", header.Difficulty) + } + parentHash := header.ParentHash + if parentAnchor, found := parentAnchors[parentHash]; found { + parentDiff := parentDiffs[parentHash] + cumulativeDiff.Add(cumulativeDiff, parentDiff) + if err = hd.addHeaderAsTip(header, parentAnchor, *cumulativeDiff, currentTime); err != nil { + return fmt.Errorf("add header as tip: %v", err) + } + childAnchors[hash] = parentAnchor + childDiffs[hash] = cumulativeDiff + } else { + anchor, anchorExisted := lastAnchors[hash] + if !anchorExisted { + anchor = &Anchor{powDepth: hd.initPowDepth, hash: hash, tipQueue: &AnchorTipQueue{}, anchorID: hd.nextAnchorID} + hd.nextAnchorID++ + heap.Init(anchor.tipQueue) + fmt.Printf("Undeclared anchor for hash %x, inserting as empty\n", hash) + } + diff, overflow := uint256.FromBig(header.Difficulty) + if overflow { + return fmt.Errorf("overflow when converting header.Difficulty to uint256: %s", header.Difficulty) + } + anchor.difficulty = *diff + anchor.timestamp = header.Time + anchor.blockHeight = header.Number.Uint64() + if err = hd.addHeaderAsTip(header, anchor, *cumulativeDiff, currentTime); err != nil { + return fmt.Errorf("add header as tip: %v", err) + } + if len(hd.anchors[parentHash]) == 0 { + if parentHash != (common.Hash{}) { + hd.requestQueue.PushFront(RequestQueueItem{anchorParent: parentHash, waitUntil: currentTime}) + } + } + hd.anchors[parentHash] = append(hd.anchors[parentHash], anchor) + childAnchors[hash] = anchor + childDiffs[hash] = cumulativeDiff + } + prevHash = hash + } else { + fmt.Printf("Duplicate header: %d %x\n", header.Number.Uint64(), hash) + } + return nil + }); err != nil { + return false, err } // Based on the last anchors, set the hardTips for _, anchor := range lastAnchors { - if _, ok := hardTips[anchor.hash]; ok { + anchor.anchorID = hd.nextAnchorID + hd.nextAnchorID++ + if _, ok := hardTips[anchor.hash]; ok && anchor.maxTipHeight == anchor.blockHeight { hd.hardTips[anchor.hash] = struct{}{} fmt.Printf("Adding %d %x to hard-coded tips\n", anchor.blockHeight, anchor.hash) } } + hd.files = files return hd.anchorSequence > 0, nil } -func (hd *HeaderDownload) RequestMoreHeaders(currentTime, timeout uint64) []*HeaderRequest { +func (hd *HeaderDownload) RequestMoreHeaders(currentTime, timeout uint64) ([]*HeaderRequest, *time.Timer) { + hd.lock.Lock() + defer hd.lock.Unlock() if hd.requestQueue.Len() == 0 { - return nil + return nil, hd.RequestQueueTimer } var prevTopTime uint64 = hd.requestQueue.Front().Value.(RequestQueueItem).waitUntil var requests []*HeaderRequest @@ -809,7 +951,7 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime, timeout uint64) []*Hea } } hd.resetRequestQueueTimer(prevTopTime, currentTime) - return requests + return requests, hd.RequestQueueTimer } func (hd *HeaderDownload) resetRequestQueueTimer(prevTopTime, currentTime uint64) { @@ -829,6 +971,8 @@ func (hd *HeaderDownload) resetRequestQueueTimer(prevTopTime, currentTime uint64 } func (hd *HeaderDownload) FlushBuffer() error { + hd.lock.Lock() + defer hd.lock.Unlock() if len(hd.buffer) < hd.bufferLimit { // Not flushing the buffer unless it is full return nil @@ -871,6 +1015,7 @@ func (hd *HeaderDownload) FlushBuffer() error { } hd.buffer = hd.buffer[:0] hd.anchorSequence++ + hd.files = append(hd.files, bufferFile.Name()) } else { return err } @@ -878,22 +1023,36 @@ func (hd *HeaderDownload) FlushBuffer() error { return nil } +func (hd *HeaderDownload) PrepareStageData() (files []string, buffer []byte) { + hd.lock.Lock() + defer hd.lock.Unlock() + if !hd.stageReady { + return nil, nil + } + files = hd.files + hd.files = nil + buffer = hd.buffer + hd.buffer, hd.anotherBuffer = hd.anotherBuffer[:0], hd.buffer + hd.stageReady = false + return +} + // CheckInitiation looks at the first header in the given segment, and assuming // that it has been added as a tip, checks whether the anchor parent hash // associated with this tip equals to pre-set value (0x00..00 for genesis) -func (hd *HeaderDownload) CheckInitiation(segment *ChainSegment, initialHash common.Hash) bool { +func (hd *HeaderDownload) checkInitiation(segment *ChainSegment) bool { tipHash := segment.Headers[0].Hash() tip, exists := hd.getTip(tipHash) if !exists { return false } - if tip.anchor.hash != initialHash { + if tip.anchor.parentHash != hd.initialHash { return false } fmt.Printf("Tip %d %x has total difficulty %d, highest %d, len(hd.hardTips) %d\n", tip.blockHeight, tipHash, tip.cumulativeDifficulty.ToBig(), hd.highestTotalDifficulty.ToBig(), len(hd.hardTips)) if tip.cumulativeDifficulty.Gt(&hd.highestTotalDifficulty) { hd.highestTotalDifficulty.Set(&tip.cumulativeDifficulty) - return true + return len(hd.hardTips) == 0 } return false } @@ -912,6 +1071,8 @@ func (hd *HeaderDownload) childTipValid(child *types.Header, tipHash common.Hash } func (hd *HeaderDownload) HasTip(tipHash common.Hash) bool { + hd.lock.RLock() + defer hd.lock.RUnlock() if _, ok := hd.getTip(tipHash); ok { return true } @@ -973,6 +1134,7 @@ func (hd *HeaderDownload) addHeaderAsAnchor(header *types.Header, powDepth int) powDepth: powDepth, difficulty: *diff, timestamp: header.Time, + parentHash: header.ParentHash, hash: header.Hash(), blockHeight: header.Number.Uint64(), tipQueue: &AnchorTipQueue{}, diff --git a/turbo/stages/headerdownload/header_data_struct.go b/turbo/stages/headerdownload/header_data_struct.go index e7b91d3dc75d839ff273320099cc5eb26cf7ce55..524b840b4a40676273c4419fed98361d833af0c6 100644 --- a/turbo/stages/headerdownload/header_data_struct.go +++ b/turbo/stages/headerdownload/header_data_struct.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" "math/big" + "sync" "time" "github.com/holiman/uint256" @@ -56,6 +57,7 @@ type Anchor struct { powDepth int tipQueue *AnchorTipQueue difficulty uint256.Int + parentHash common.Hash hash common.Hash blockHeight uint64 timestamp uint64 @@ -127,9 +129,12 @@ type VerifySealFunc func(header *types.Header) error type CalcDifficultyFunc func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int type HeaderDownload struct { + lock sync.RWMutex buffer []byte + anotherBuffer []byte bufferLimit int filesDir string + files []string anchorSequence uint32 // Sequence number to be used for recording anchors next time the buffer is flushed badHeaders map[common.Hash]struct{} anchors map[common.Hash][]*Anchor // Mapping from parentHash to collection of anchors @@ -147,6 +152,9 @@ type HeaderDownload struct { calcDifficultyFunc CalcDifficultyFunc verifySealFunc VerifySealFunc RequestQueueTimer *time.Timer + initialHash common.Hash + stageReady bool + stageReadyCh chan struct{} } type TipQueueItem struct { @@ -187,13 +195,16 @@ func (rq *RequestQueue) Pop() interface{} { return x } -func NewHeaderDownload(filesDir string, +func NewHeaderDownload( + initialHash common.Hash, + filesDir string, bufferLimit, tipLimit, initPowDepth int, calcDifficultyFunc CalcDifficultyFunc, verifySealFunc VerifySealFunc, newAnchorFutureLimit, newAnchorPastLimit uint64, ) *HeaderDownload { hd := &HeaderDownload{ + initialHash: initialHash, filesDir: filesDir, bufferLimit: bufferLimit, badHeaders: make(map[common.Hash]struct{}), @@ -208,6 +219,7 @@ func NewHeaderDownload(filesDir string, newAnchorPastLimit: newAnchorPastLimit, hardTips: make(map[common.Hash]struct{}), tips: make(map[common.Hash]*Tip), + stageReadyCh: make(chan struct{}), } hd.RequestQueueTimer = time.NewTimer(time.Hour) return hd diff --git a/turbo/stages/headerdownload/header_test.go b/turbo/stages/headerdownload/header_test.go index 270e94165d852e4cc36dad85f64e1670cb7fda10..565f3f42b2be08ba01ea79cc0d849bed54a06299 100644 --- a/turbo/stages/headerdownload/header_test.go +++ b/turbo/stages/headerdownload/header_test.go @@ -16,7 +16,7 @@ const TestTipLimit = 10 const TestInitPowDepth = 16 func TestSplitIntoSegments(t *testing.T) { - hd := NewHeaderDownload("", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { + hd := NewHeaderDownload(common.Hash{}, "", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { // To get child difficulty, we just add 1000 to the parent difficulty return big.NewInt(0).Add(parentDifficulty, big.NewInt(1000)) }, nil, 60, 60) @@ -179,7 +179,7 @@ func TestSplitIntoSegments(t *testing.T) { } func TestSingleHeaderAsSegment(t *testing.T) { - hd := NewHeaderDownload("", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { + hd := NewHeaderDownload(common.Hash{}, "", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { // To get child difficulty, we just add 1000 to the parent difficulty return big.NewInt(0).Add(parentDifficulty, big.NewInt(1000)) }, nil, 60, 60) @@ -288,7 +288,7 @@ func TestFindTip(t *testing.T) { } func TestExtendUp(t *testing.T) { - hd := NewHeaderDownload("", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { + hd := NewHeaderDownload(common.Hash{}, "", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { // To get child difficulty, we just add 1000 to the parent difficulty return big.NewInt(0).Add(parentDifficulty, big.NewInt(1000)) }, func(header *types.Header) error { @@ -424,7 +424,7 @@ func TestExtendUp(t *testing.T) { } func TestExtendDown(t *testing.T) { - hd := NewHeaderDownload("", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { + hd := NewHeaderDownload(common.Hash{}, "", TestBufferLimit, TestTipLimit, TestInitPowDepth, func(childTimestamp uint64, parentTime uint64, parentDifficulty, parentNumber *big.Int, parentHash, parentUncleHash common.Hash) *big.Int { // To get child difficulty, we just add 1000 to the parent difficulty return big.NewInt(0).Add(parentDifficulty, big.NewInt(1000)) }, func(header *types.Header) error { diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go new file mode 100644 index 0000000000000000000000000000000000000000..5744b75798008093c1c640372d4c3d359a3373bc --- /dev/null +++ b/turbo/stages/stageloop.go @@ -0,0 +1,32 @@ +package stages + +import ( + "context" + "fmt" + + "github.com/ledgerwatch/turbo-geth/core" + "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/log" + "github.com/ledgerwatch/turbo-geth/turbo/stages/headerdownload" +) + +// StageLoop runs the continuous loop of staged sync +func StageLoop(ctx context.Context, db ethdb.Database, hd *headerdownload.HeaderDownload) error { + if _, _, _, err := core.SetupGenesisBlock(db, core.DefaultGenesisBlock(), false, false /* overwrite */); err != nil { + return fmt.Errorf("setup genesis block: %w", err) + } + files, buffer := hd.PrepareStageData() + for { + if len(files) > 0 || len(buffer) > 0 { + if err := headerdownload.Forward("1/14 Headers", db, files, buffer); err != nil { + log.Error("header download forward failed", "error", err) + } + } + select { + case <-ctx.Done(): + return nil + case <-hd.StageReadyChannel(): + } + files, buffer = hd.PrepareStageData() + } +}