From 57880b8817c64dba20ac1c9756beda90289fb3d2 Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Thu, 20 May 2021 12:33:55 +0700
Subject: [PATCH] fixes for dv2 cancelation (#1968)

* fixes

* fixes

* fixes

* fixes

* fixes
---
 cmd/headers/download/downloader.go          |  3 +-
 eth/backend.go                              | 45 +++++++++++++--------
 eth/stagedsync/stage_headers_new.go         |  3 +-
 turbo/stages/headerdownload/header_algos.go |  2 +
 turbo/stages/stageloop.go                   |  8 ++++
 5 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go
index 4150b796a8..3f0709b72d 100644
--- a/cmd/headers/download/downloader.go
+++ b/cmd/headers/download/downloader.go
@@ -110,7 +110,7 @@ func RecvMessage(ctx context.Context, sentry proto_sentry.SentryClient, handleIn
 }
 
 //Deprecated - use stages.StageLoop
-func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, controlServer *ControlServerImpl, notifier stagedsync.ChainEventNotifier) {
+func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, controlServer *ControlServerImpl, notifier stagedsync.ChainEventNotifier, waitForDone chan struct{}) {
 	stages.StageLoop(
 		ctx,
 		db,
@@ -118,6 +118,7 @@ func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, c
 		controlServer.hd,
 		controlServer.chainConfig,
 		notifier,
+		waitForDone,
 	)
 }
 
diff --git a/eth/backend.go b/eth/backend.go
index 931a102ac0..d5a35be0d0 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -105,13 +105,15 @@ type Ethereum struct {
 	minedBlocks   chan *types.Block
 
 	// downloader v2 fields
-	downloadV2Ctx    context.Context
-	downloadV2Cancel context.CancelFunc
-	downloadServer   *download.ControlServerImpl
-	sentryServer     *download.SentryServerImpl
-	txPoolP2PServer  *eth.TxPoolServer
-	sentries         []proto_sentry.SentryClient
-	stagedSync2      *stagedsync.StagedSync
+	downloadV2Ctx        context.Context
+	downloadV2Cancel     context.CancelFunc
+	downloadServer       *download.ControlServerImpl
+	sentryServer         *download.SentryServerImpl
+	txPoolP2PServer      *eth.TxPoolServer
+	sentries             []proto_sentry.SentryClient
+	stagedSync2          *stagedsync.StagedSync
+	waitForStageLoopStop chan struct{}
+	waitForMiningStop    chan struct{}
 }
 
 // New creates a new Ethereum object (including the
@@ -171,15 +173,17 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu
 	log.Info("Initialised chain configuration", "config", chainConfig)
 
 	backend := &Ethereum{
-		config:        config,
-		chainDB:       chainDb,
-		chainKV:       chainDb.(ethdb.HasRwKV).RwKV(),
-		networkID:     config.NetworkID,
-		etherbase:     config.Miner.Etherbase,
-		p2pServer:     stack.Server(),
-		torrentClient: torrentClient,
-		chainConfig:   chainConfig,
-		genesisHash:   genesisHash,
+		config:               config,
+		chainDB:              chainDb,
+		chainKV:              chainDb.(ethdb.HasRwKV).RwKV(),
+		networkID:            config.NetworkID,
+		etherbase:            config.Miner.Etherbase,
+		p2pServer:            stack.Server(),
+		torrentClient:        torrentClient,
+		chainConfig:          chainConfig,
+		genesisHash:          genesisHash,
+		waitForStageLoopStop: make(chan struct{}),
+		waitForMiningStop:    make(chan struct{}),
 	}
 	backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice)
 
@@ -603,6 +607,7 @@ func (s *Ethereum) StartMining(kv ethdb.RwKV, pendingBlocksCh chan *types.Block,
 	}
 
 	go func() {
+		defer close(s.waitForMiningStop)
 		newTransactions := make(chan core.NewTxsEvent, txChanSize)
 		sub := s.txPool.SubscribeNewTxsEvent(newTransactions)
 		defer sub.Unsubscribe()
@@ -716,7 +721,7 @@ func (s *Ethereum) Start() error {
 	if s.config.EnableDownloadV2 {
 		go download.RecvMessage(s.downloadV2Ctx, s.sentries[0], s.downloadServer.HandleInboundMessage)
 		go download.RecvUploadMessage(s.downloadV2Ctx, s.sentries[0], s.downloadServer.HandleInboundMessage)
-		go download.Loop(s.downloadV2Ctx, s.chainDB, s.stagedSync2, s.downloadServer, s.events)
+		go download.Loop(s.downloadV2Ctx, s.chainDB, s.stagedSync2, s.downloadServer, s.events, s.waitForStageLoopStop)
 	} else {
 		eth.StartENRUpdater(s.chainConfig, s.genesisHash, s.events, s.p2pServer.LocalNode())
 		// Start the networking layer and the light server if requested
@@ -760,5 +765,11 @@ func (s *Ethereum) Stop() error {
 	if s.txPool != nil {
 		s.txPool.Stop()
 	}
+	if s.config.EnableDownloadV2 {
+		<-s.waitForStageLoopStop
+	}
+	if s.config.Miner.Enabled {
+		<-s.waitForMiningStop
+	}
 	return nil
 }
diff --git a/eth/stagedsync/stage_headers_new.go b/eth/stagedsync/stage_headers_new.go
index 9988d59e5a..cb306c3f10 100644
--- a/eth/stagedsync/stage_headers_new.go
+++ b/eth/stagedsync/stage_headers_new.go
@@ -240,7 +240,7 @@ func HeadersForward(
 	}
 	log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", headerInserter.GetHighest(), "age", common.PrettyAge(time.Unix(int64(headerInserter.GetHighestTimestamp()), 0)))
 	if stopped {
-		return fmt.Errorf("interrupted")
+		return common.ErrStopped
 	}
 	stageHeadersGauge.Update(int64(headerInserter.GetHighest()))
 	return nil
@@ -261,6 +261,7 @@ func fixCanonicalChain(logPrefix string, height uint64, hash common.Hash, tx eth
 		ancestor := rawdb.ReadHeader(tx, ancestorHash, ancestorHeight)
 		if ancestor == nil {
 			log.Error("ancestor nil", "height", ancestorHeight, "hash", ancestorHash)
+			return err
 		} else {
 			log.Debug("ancestor", "height", ancestorHeight, "hash", ancestorHash)
 		}
diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go
index 2f65facf9d..20676e089f 100644
--- a/turbo/stages/headerdownload/header_algos.go
+++ b/turbo/stages/headerdownload/header_algos.go
@@ -435,6 +435,8 @@ func (hd *HeaderDownload) SetPreverifiedHashes(preverifiedHashes map[common.Hash
 }
 
 func (hd *HeaderDownload) RecoverFromDb(db ethdb.Database) error {
+	hd.lock.Lock()
+	defer hd.lock.Unlock()
 	// Drain persistedLinksQueue and remove links
 	for hd.persistedLinkQueue.Len() > 0 {
 		link := heap.Pop(hd.persistedLinkQueue).(*Link)
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 05ca86a354..e0261c5fcf 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -3,12 +3,14 @@ package stages
 import (
 	"context"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"runtime/debug"
 	"strings"
 	"time"
 
 	"github.com/c2h5oh/datasize"
+	"github.com/ledgerwatch/turbo-geth/common"
 	"github.com/ledgerwatch/turbo-geth/common/dbutils"
 	"github.com/ledgerwatch/turbo-geth/core/vm"
 	"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
@@ -51,7 +53,9 @@ func StageLoop(
 	hd *headerdownload.HeaderDownload,
 	chainConfig *params.ChainConfig,
 	notifier stagedsync.ChainEventNotifier,
+	waitForDone chan struct{},
 ) {
+	defer close(waitForDone)
 	initialCycle := true
 
 	for {
@@ -64,6 +68,10 @@ func StageLoop(
 		// Estimate the current top height seen from the peer
 		height := hd.TopSeenHeight()
 		if err := StageLoopStep(ctx, db, sync, height, chainConfig, notifier, initialCycle); err != nil {
+			if errors.Is(err, common.ErrStopped) {
+				return
+			}
+
 			log.Error("Stage loop failure", "error", err)
 			if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil {
 				log.Error("Failed to recover header downoader", "error", recoveryErr)
-- 
GitLab