From c5b1836481aaa2e17262b8bc430ceb15b8f59592 Mon Sep 17 00:00:00 2001
From: Giulio rebuffo <giulio.rebuffo@gmail.com>
Date: Fri, 22 Apr 2022 08:23:52 +0200
Subject: [PATCH] Added Ethstats service (#3931)

* somewhat there but not yet

* lol

* more efficient ethstats

* lint

* not die on no wifi
---
 cmd/integration/commands/stages.go |   2 +-
 cmd/utils/flags.go                 |   2 +
 core/rawdb/accessors_chain.go      |  13 +
 eth/backend.go                     |  15 +-
 eth/ethconfig/config.go            |   2 +
 eth/stagedsync/stage_finish.go     |  12 +-
 ethstats/ethstats.go               | 389 ++++++++++++-----------------
 turbo/cli/default_flags.go         |   1 +
 turbo/stages/mock_sentry.go        |   2 +-
 turbo/stages/stageloop.go          |   4 +-
 10 files changed, 202 insertions(+), 240 deletions(-)

diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 7567fb1ff2..569ccd494d 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -1176,7 +1176,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
 
 	sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
 		chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
-		&stagedsync.Notifications{}, nil, allSn, cfg.SnapshotDir,
+		&stagedsync.Notifications{}, nil, allSn, cfg.SnapshotDir, nil,
 	)
 	if err != nil {
 		panic(err)
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 1f64f32f6b..b2bf40fea2 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -297,6 +297,7 @@ var (
 	EthStatsURLFlag = cli.StringFlag{
 		Name:  "ethstats",
 		Usage: "Reporting URL of a ethstats service (nodename:secret@host:port)",
+		Value: "",
 	}
 	FakePoWFlag = cli.BoolFlag{
 		Name:  "fakepow",
@@ -1408,6 +1409,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf
 	setWhitelist(ctx, cfg)
 	setBorConfig(ctx, cfg)
 
+	cfg.Ethstats = ctx.GlobalString(EthStatsURLFlag.Name)
 	cfg.P2PEnabled = len(nodeConfig.P2P.SentryAddr) == 0
 	cfg.EnabledIssuance = ctx.GlobalIsSet(EnabledIssuance.Name)
 	if ctx.GlobalIsSet(NetworkIdFlag.Name) {
diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go
index f21cdbaed4..4c8efc4a31 100644
--- a/core/rawdb/accessors_chain.go
+++ b/core/rawdb/accessors_chain.go
@@ -32,6 +32,7 @@ import (
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/dbutils"
 	"github.com/ledgerwatch/erigon/core/types"
+	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/ethdb/cbor"
 	"github.com/ledgerwatch/erigon/rlp"
 )
@@ -259,6 +260,18 @@ func ReadCurrentBlock(db kv.Tx) *types.Block {
 	return ReadBlock(db, headHash, *headNumber)
 }
 
+func ReadLastBlockSynced(db kv.Tx) (*types.Block, error) {
+	headNumber, err := stages.GetStageProgress(db, stages.Execution)
+	if err != nil {
+		return nil, err
+	}
+	headHash, err := ReadCanonicalHash(db, headNumber)
+	if err != nil {
+		return nil, err
+	}
+	return ReadBlock(db, headHash, headNumber), nil
+}
+
 func ReadHeadersByNumber(db kv.Tx, number uint64) ([]*types.Header, error) {
 	var res []*types.Header
 	c, err := db.Cursor(kv.Headers)
diff --git a/eth/backend.go b/eth/backend.go
index 34f5598d5a..2b641ca6af 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -70,6 +70,7 @@ import (
 	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/ethdb/privateapi"
 	"github.com/ledgerwatch/erigon/ethdb/prune"
+	"github.com/ledgerwatch/erigon/ethstats"
 	"github.com/ledgerwatch/erigon/node"
 	"github.com/ledgerwatch/erigon/p2p"
 	"github.com/ledgerwatch/erigon/params"
@@ -486,10 +487,16 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
 	if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil {
 		return nil, err
 	}
+
+	var headCh chan *types.Block
+	if config.Ethstats != "" {
+		headCh = make(chan *types.Block, 1)
+	}
+
 	backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB,
 		stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
 		backend.sentryControlServer, tmpdir, backend.notifications,
-		backend.downloaderClient, allSnapshots, config.SnapshotDir)
+		backend.downloaderClient, allSnapshots, config.SnapshotDir, headCh)
 	if err != nil {
 		return nil, err
 	}
@@ -519,7 +526,11 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
 		gpoParams.Default = config.Miner.GasPrice
 	}
 	//eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
-
+	if config.Ethstats != "" {
+		if err := ethstats.New(stack, backend.sentryServers, chainKv, backend.engine, config.Ethstats, backend.networkID, ctx.Done(), headCh); err != nil {
+			return nil, err
+		}
+	}
 	// start HTTP API
 	httpRpcCfg := stack.Config().Http
 	if httpRpcCfg.Enabled {
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 68eee2e4ae..17b60641cb 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -224,6 +224,8 @@ type Config struct {
 
 	// No heimdall service
 	WithoutHeimdall bool
+	// Ethstats service
+	Ethstats string
 }
 
 func CreateConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger, config interface{}, notify []string, noverify bool, HeimdallURL string, WithoutHeimdall bool, datadir string) consensus.Engine {
diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go
index 06803a6326..b9e3bc174b 100644
--- a/eth/stagedsync/stage_finish.go
+++ b/eth/stagedsync/stage_finish.go
@@ -25,13 +25,15 @@ type FinishCfg struct {
 	db     kv.RwDB
 	tmpDir string
 	log    log.Logger
+	headCh chan *types.Block
 }
 
-func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger) FinishCfg {
+func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger, headCh chan *types.Block) FinishCfg {
 	return FinishCfg{
 		db:     db,
 		log:    logger,
 		tmpDir: tmpDir,
+		headCh: headCh,
 	}
 }
 
@@ -67,6 +69,14 @@ func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool)
 		}
 	}
 
+	if cfg.headCh != nil {
+		select {
+		case cfg.headCh <- rawdb.ReadCurrentBlock(tx):
+		default:
+		}
+
+	}
+
 	if !useExternalTx {
 		if err := tx.Commit(); err != nil {
 			return err
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index b8c32fd8af..7be659303e 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -17,7 +17,6 @@
 // Package ethstats implements the network stats reporting service.
 package ethstats
 
-/*
 import (
 	"context"
 	"encoding/json"
@@ -33,65 +32,38 @@ import (
 	"time"
 
 	"github.com/gorilla/websocket"
+	"github.com/ledgerwatch/erigon-lib/kv"
+	"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
 	"github.com/ledgerwatch/erigon/common"
-	"github.com/ledgerwatch/erigon/common/mclock"
 	"github.com/ledgerwatch/erigon/consensus"
-	"github.com/ledgerwatch/erigon/core"
+	"github.com/ledgerwatch/erigon/core/rawdb"
 	"github.com/ledgerwatch/erigon/core/types"
-	"github.com/ledgerwatch/erigon/eth"
-	"github.com/ledgerwatch/erigon/eth/downloader"
-	"github.com/ledgerwatch/erigon/event"
-	"github.com/ledgerwatch/log/v3"
-	"github.com/ledgerwatch/erigon/miner"
+	"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
 	"github.com/ledgerwatch/erigon/node"
-	"github.com/ledgerwatch/erigon/p2p"
-	"github.com/ledgerwatch/erigon/rpc"
+	"github.com/ledgerwatch/log/v3"
 )
 
 const (
 	// historyUpdateRange is the number of blocks a node should report upon login or
 	// history request.
 	historyUpdateRange = 50
-
-	// txChanSize is the size of channel listening to NewTxsEvent.
-	// The number is referenced from the size of tx pool.
-	txChanSize = 4096
-	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
-	chainHeadChanSize = 10
 )
 
-// backend encompasses the bare-minimum functionality needed for ethstats reporting
-type backend interface {
-	SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
-	SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription
-	CurrentHeader() *types.Header
-	HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
-	GetTd(ctx context.Context, hash common.Hash) *big.Int
-	Stats() (pending int, queued int)
-	Downloader() *downloader.Downloader
-}
-
-// fullNodeBackend encompasses the functionality necessary for a full node
-// reporting to ethstats
-type fullNodeBackend interface {
-	backend
-	Miner() *miner.Miner
-	BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
-	CurrentBlock() *types.Block
-	SuggestPrice(ctx context.Context) (*big.Int, error)
-}
-
 // Service implements an Ethereum netstats reporting daemon that pushes local
 // chain statistics up to a monitoring server.
 type Service struct {
-	server  *p2p.Server // Peer-to-peer server to retrieve networking infos
-	backend backend
-	engine  consensus.Engine // Consensus engine to retrieve variadic block fields
+	servers   []*sentry.SentryServerImpl // Peer-to-peer server to retrieve networking infos
+	chaindb   kv.RoDB
+	networkid uint64
+	engine    consensus.Engine // Consensus engine to retrieve variadic block fields
 
 	node string // Name of the node to display on the monitoring page
 	pass string // Password to authorize access to the monitoring page
 	host string // Remote address of the monitoring service
 
+	quitCh <-chan struct{}
+	headCh <-chan *types.Block
+
 	pongCh chan struct{} // Pong notifications are fed into this channel
 	histCh chan []uint64 // History request block numbers are fed into this channel
 
@@ -121,6 +93,9 @@ func newConnectionWrapper(conn *websocket.Conn) *connWrapper {
 
 // WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling
 func (w *connWrapper) WriteJSON(v interface{}) error {
+	if w.conn == nil {
+		return nil
+	}
 	w.wlock.Lock()
 	defer w.wlock.Unlock()
 
@@ -143,7 +118,7 @@ func (w *connWrapper) Close() error {
 }
 
 // New returns a monitoring service ready for stats reporting.
-func New(node *node.Node, backend backend, engine consensus.Engine, url string) error {
+func New(node *node.Node, servers []*sentry.SentryServerImpl, chainDB kv.RoDB, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan *types.Block) error {
 	// Parse the netstats connection url
 	re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)")
 	parts := re.FindStringSubmatch(url)
@@ -151,14 +126,17 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string)
 		return fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url)
 	}
 	ethstats := &Service{
-		backend: backend,
-		engine:  engine,
-		server:  node.Server(),
-		node:    parts[1],
-		pass:    parts[3],
-		host:    parts[4],
-		pongCh:  make(chan struct{}),
-		histCh:  make(chan []uint64, 1),
+		engine:    engine,
+		servers:   servers,
+		node:      parts[1],
+		pass:      parts[3],
+		host:      parts[4],
+		pongCh:    make(chan struct{}),
+		histCh:    make(chan []uint64, 1),
+		networkid: networkid,
+		chaindb:   chainDB,
+		headCh:    headCh,
+		quitCh:    quitCh,
 	}
 
 	node.RegisterLifecycle(ethstats)
@@ -182,56 +160,6 @@ func (s *Service) Stop() error {
 // loop keeps trying to connect to the netstats server, reporting chain events
 // until termination.
 func (s *Service) loop() {
-	// Subscribe to chain events to execute updates on
-	chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
-	headSub := s.backend.SubscribeChainHeadEvent(chainHeadCh)
-	defer headSub.Unsubscribe()
-
-	txEventCh := make(chan core.NewTxsEvent, txChanSize)
-	txSub := s.backend.SubscribeNewTxsEvent(txEventCh)
-	defer txSub.Unsubscribe()
-
-	// Start a goroutine that exhausts the subscriptions to avoid events piling up
-	var (
-		quitCh = make(chan struct{})
-		headCh = make(chan *types.Block, 1)
-		txCh   = make(chan struct{}, 1)
-	)
-	go func() {
-		var lastTx mclock.AbsTime
-
-	HandleLoop:
-		for {
-			select {
-			// Notify of chain head events, but drop if too frequent
-			case head := <-chainHeadCh:
-				select {
-				case headCh <- head.Block:
-				default:
-				}
-
-			// Notify of new transaction events, but drop if too frequent
-			case <-txEventCh:
-				if time.Duration(mclock.Now()-lastTx) < time.Second {
-					continue
-				}
-				lastTx = mclock.Now()
-
-				select {
-				case txCh <- struct{}{}:
-				default:
-				}
-
-			// node stopped
-			case <-txSub.Err():
-				break HandleLoop
-			case <-headSub.Err():
-				break HandleLoop
-			}
-		}
-		close(quitCh)
-	}()
-
 	// Resolve the URL, defaulting to TLS, but falling back to none too
 	path := fmt.Sprintf("%s/api", s.host)
 	urls := []string{path}
@@ -246,7 +174,7 @@ func (s *Service) loop() {
 	// Loop reporting until termination
 	for {
 		select {
-		case <-quitCh:
+		case <-s.quitCh:
 			return
 		case <-errTimer.C:
 			// Establish a websocket connection to the server on any supported URL
@@ -258,15 +186,15 @@ func (s *Service) loop() {
 			header := make(http.Header)
 			header.Set("origin", "http://localhost")
 			for _, url := range urls {
-				c, _, e := dialer.Dial(url, header)
-				err = e
+				//nolint
+				c, _, err := dialer.Dial(url, header)
 				if err == nil {
 					conn = newConnectionWrapper(c)
 					break
 				}
 			}
-			if err != nil {
-				log.Warn("Stats server unreachable", "err", err)
+			if err != nil || conn == nil {
+				log.Warn("Stats server unreachable")
 				errTimer.Reset(10 * time.Second)
 				continue
 			}
@@ -279,19 +207,12 @@ func (s *Service) loop() {
 			}
 			go s.readLoop(conn)
 
-			// Send the initial stats so our node looks decent from the get go
-			if err = s.report(conn); err != nil {
-				log.Warn("Initial stats report failed", "err", err)
-				conn.Close()
-				errTimer.Reset(0)
-				continue
-			}
 			// Keep sending status updates until the connection breaks
 			fullReport := time.NewTicker(15 * time.Second)
 
 			for err == nil {
 				select {
-				case <-quitCh:
+				case <-s.quitCh:
 					fullReport.Stop()
 					// Make sure the connection is closed
 					conn.Close()
@@ -305,17 +226,11 @@ func (s *Service) loop() {
 					if err = s.reportHistory(conn, list); err != nil {
 						log.Warn("Requested history report failed", "err", err)
 					}
-				case head := <-headCh:
+				case head := <-s.headCh:
 					if err = s.reportBlock(conn, head); err != nil {
 						log.Warn("Block stats report failed", "err", err)
 					}
-					if err = s.reportPending(conn); err != nil {
-						log.Warn("Post-block transaction stats report failed", "err", err)
-					}
-				case <-txCh:
-					if err = s.reportPending(conn); err != nil {
-						log.Warn("Transaction stats report failed", "err", err)
-					}
+
 				}
 			}
 			fullReport.Stop()
@@ -345,7 +260,7 @@ func (s *Service) readLoop(conn *connWrapper) {
 		// If the network packet is a system ping, respond to it directly
 		var ping string
 		if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") {
-			if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil {
+			if err := conn.WriteJSON(strings.ReplaceAll(ping, "ping", "pong")); err != nil {
 				log.Warn("Failed to respond to system ping message", "err", err)
 				return
 			}
@@ -442,23 +357,28 @@ type authMsg struct {
 // login tries to authorize the client at the remote server.
 func (s *Service) login(conn *connWrapper) error {
 	// Construct and send the login authentication
-	infos := s.server.NodeInfo()
+	// infos := s.server.NodeInfo()
 
 	var protocols []string
-	for _, proto := range s.server.Protocols {
-		protocols = append(protocols, fmt.Sprintf("%s/%d", proto.Name, proto.Version))
-	}
-	var network string
-	if info := infos.Protocols["eth"]; info != nil {
-		network = fmt.Sprintf("%d", info.(*ethproto.NodeInfo).Network)
+	for _, srv := range s.servers {
+		protocols = append(protocols, fmt.Sprintf("%s/%d", srv.Protocol.Name, srv.Protocol.Version))
+	}
+	nodeName := "Erigon"
+	if len(s.servers) > 0 {
+		nodeInfo, err := s.servers[0].NodeInfo(context.TODO(), nil)
+		if err != nil {
+			return err
+		}
+		nodeName = nodeInfo.Name
 	}
+
 	auth := &authMsg{
 		ID: s.node,
 		Info: nodeInfo{
 			Name:     s.node,
-			Node:     infos.Name,
-			Port:     infos.Ports.Listener,
-			Network:  network,
+			Node:     nodeName,
+			Port:     0,
+			Network:  fmt.Sprintf("%d", s.networkid),
 			Protocol: strings.Join(protocols, ", "),
 			API:      "No",
 			Os:       runtime.GOOS,
@@ -573,8 +493,25 @@ func (s uncleStats) MarshalJSON() ([]byte, error) {
 
 // reportBlock retrieves the current chain head and reports it to the stats server.
 func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
+	roTx, err := s.chaindb.BeginRo(context.Background())
+	if err != nil {
+		return err
+	}
+	defer roTx.Rollback()
+	if block == nil {
+		block, err = rawdb.ReadLastBlockSynced(roTx)
+		if err != nil {
+			return err
+		}
+	}
+
+	td, err := rawdb.ReadTd(roTx, block.Hash(), block.NumberU64())
+	if err != nil {
+		return err
+	}
+
 	// Gather the block details from the header or block chain
-	details := s.assembleBlockStats(block)
+	details := s.assembleBlockStats(block, td)
 
 	// Assemble the block report and send it to the server
 	log.Trace("Sending new block to ethstats", "number", details.Number, "hash", details.Hash)
@@ -591,54 +528,44 @@ func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
 
 // assembleBlockStats retrieves any required metadata to report a single block
 // and assembles the block stats. If block is nil, the current head is processed.
-func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
+func (s *Service) assembleBlockStats(block *types.Block, td *big.Int) *blockStats {
+	if td == nil {
+		td = common.Big0
+	}
 	// Gather the block infos from the local blockchain
 	var (
-		header *types.Header
-		td     *big.Int
-		txs    []txStats
-		uncles []*types.Header
+		txs []txStats
 	)
-
-	// check if backend is a full node
-	fullBackend, ok := s.backend.(fullNodeBackend)
-	if ok {
-		if block == nil {
-			block = fullBackend.CurrentBlock()
-		}
-		header = block.Header()
-		td = fullBackend.GetTd(context.Background(), header.Hash())
-
-		txs = make([]txStats, len(block.Transactions()))
-		for i, tx := range block.Transactions() {
-			txs[i].Hash = tx.Hash()
-		}
-		uncles = block.Uncles()
+	for _, tx := range block.Transactions() {
+		txs = append(txs, txStats{tx.Hash()})
 	}
 
-	// Assemble and return the block stats
-	author, _ := s.engine.Author(header)
-
 	return &blockStats{
-		Number:     header.Number,
-		Hash:       header.Hash(),
-		ParentHash: header.ParentHash,
-		Timestamp:  new(big.Int).SetUint64(header.Time),
-		Miner:      author,
-		GasUsed:    header.GasUsed,
-		GasLimit:   header.GasLimit,
-		Diff:       header.Difficulty.String(),
+		Number:     block.Header().Number,
+		Hash:       block.Hash(),
+		ParentHash: block.Header().ParentHash,
+		Timestamp:  new(big.Int).SetUint64(block.Header().Time),
+		Miner:      block.Header().Coinbase,
+		GasUsed:    block.Header().GasUsed,
+		GasLimit:   block.Header().GasLimit,
+		Diff:       block.Header().Difficulty.String(),
 		TotalDiff:  td.String(),
 		Txs:        txs,
-		TxHash:     header.TxHash,
-		Root:       header.Root,
-		Uncles:     uncles,
+		TxHash:     block.Header().TxHash,
+		Root:       block.Header().Root,
+		Uncles:     block.Uncles(),
 	}
 }
 
 // reportHistory retrieves the most recent batch of blocks and reports it to the
 // stats server.
 func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
+	roTx, err := s.chaindb.BeginRo(context.Background())
+	if err != nil {
+		return err
+	}
+	defer roTx.Rollback()
+
 	// Figure out the indexes that need reporting
 	indexes := make([]uint64, 0, historyUpdateRange)
 	if len(list) > 0 {
@@ -646,31 +573,34 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
 		indexes = append(indexes, list...)
 	} else {
 		// No indexes requested, send back the top ones
-		head := s.backend.CurrentHeader().Number.Int64()
-		start := head - historyUpdateRange + 1
+		headHash := rawdb.ReadHeadBlockHash(roTx)
+		headNumber := rawdb.ReadHeaderNumber(roTx, headHash)
+		if headNumber == nil {
+			return nil
+		}
+		start := int(*headNumber - historyUpdateRange + 1)
 		if start < 0 {
 			start = 0
 		}
-		for i := uint64(start); i <= uint64(head); i++ {
+		for i := uint64(start); i <= *headNumber; i++ {
 			indexes = append(indexes, i)
 		}
 	}
 	// Gather the batch of blocks to report
 	history := make([]*blockStats, len(indexes))
 	for i, number := range indexes {
-		fullBackend, ok := s.backend.(fullNodeBackend)
 		// Retrieve the next block if it's known to us
-		var block *types.Block
-		if ok {
-			block, _ = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(number)) // TODO ignore error here ?
-		} else {
-			if header, _ := s.backend.HeaderByNumber(context.Background(), rpc.BlockNumber(number)); header != nil {
-				block = types.NewBlockWithHeader(header)
-			}
+		block, err := rawdb.ReadBlockByNumber(roTx, number)
+		if err != nil {
+			return err
+		}
+		td, err := rawdb.ReadTd(roTx, block.Hash(), number)
+		if err != nil {
+			return err
 		}
 		// If we do have the block, add to the history and continue
 		if block != nil {
-			history[len(history)-1-i] = s.assembleBlockStats(block)
+			history[len(history)-1-i] = s.assembleBlockStats(block, td)
 			continue
 		}
 		// Ran out of blocks, cut the report short and send
@@ -693,77 +623,69 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
 	return conn.WriteJSON(report)
 }
 
-// pendStats is the information to report about pending transactions.
-type pendStats struct {
-	Pending int `json:"pending"`
-}
-
 // reportPending retrieves the current number of pending transactions and reports
 // it to the stats server.
 func (s *Service) reportPending(conn *connWrapper) error {
-	// Retrieve the pending count from the local blockchain
-	pending, _ := s.backend.Stats()
-	// Assemble the transaction stats and send it to the server
-	log.Trace("Sending pending transactions to ethstats", "count", pending)
-
-	stats := map[string]interface{}{
-		"id": s.node,
-		"stats": &pendStats{
-			Pending: pending,
-		},
-	}
-	report := map[string][]interface{}{
-		"emit": {"pending", stats},
-	}
-	return conn.WriteJSON(report)
+	/*	// Retrieve the pending count from the local blockchain
+		pending, _ := s.backend.Stats()
+		// Assemble the transaction stats and send it to the server
+		log.Trace("Sending pending transactions to ethstats", "count", pending)
+
+		stats := map[string]interface{}{
+			"id": s.node,
+			"stats": &pendStats{
+				Pending: pending,
+			},
+		}
+		report := map[string][]interface{}{
+			"emit": {"pending", stats},
+		}
+		return conn.WriteJSON(report)*/
+	return nil
 }
 
 // nodeStats is the information to report about the local node.
 type nodeStats struct {
-	Active   bool `json:"active"`
-	Syncing  bool `json:"syncing"`
-	Mining   bool `json:"mining"`
-	Hashrate int  `json:"hashrate"`
-	GoodPeers    int  `json:"peers"`
-	GasPrice int  `json:"gasPrice"`
-	Uptime   int  `json:"uptime"`
+	Active    bool `json:"active"`
+	Syncing   bool `json:"syncing"`
+	Mining    bool `json:"mining"`
+	Hashrate  int  `json:"hashrate"`
+	GoodPeers int  `json:"peers"`
+	GasPrice  int  `json:"gasPrice"`
+	Uptime    int  `json:"uptime"`
 }
 
 // reportStats retrieves various stats about the node at the networking and
 // mining layer and reports it to the stats server.
 func (s *Service) reportStats(conn *connWrapper) error {
-	// Gather the syncing and mining infos from the local miner instance
-	var (
-		mining   bool
-		hashrate int
-		syncing  bool
-		gasprice int
-	)
-	// check if backend is a full node
-	fullBackend, ok := s.backend.(fullNodeBackend)
-	if ok {
-		mining = fullBackend.Miner().Mining()
-		hashrate = int(fullBackend.Miner().HashRate())
-
-		sync := fullBackend.Downloader().Progress()
-		syncing = fullBackend.CurrentHeader().Number.Uint64() >= sync.HighestBlock
-
-		price, _ := fullBackend.SuggestPrice(context.Background())
-		gasprice = int(price.Uint64())
+	roTx, err := s.chaindb.BeginRo(context.Background())
+	if err != nil {
+		return err
+	}
+	defer roTx.Rollback()
+	sync, err := stages.GetStageProgress(roTx, stages.Execution)
+	if err != nil {
+		return err
+	}
+	finishSync, err := stages.GetStageProgress(roTx, stages.Finish)
+	if err != nil {
+		return err
+	}
+	// TODO(Giulio2002): peer tracking
+	peerCount := 0
+	for _, srv := range s.servers {
+		peerCount += srv.SimplePeerCount()
 	}
-	// Assemble the node stats and send it to the server
-	log.Trace("Sending node details to ethstats")
-
 	stats := map[string]interface{}{
 		"id": s.node,
 		"stats": &nodeStats{
-			Active:   true,
-			Mining:   mining,
-			Hashrate: hashrate,
-			GoodPeers:    s.server.PeerCount(),
-			GasPrice: gasprice,
-			Syncing:  syncing,
-			Uptime:   100,
+			Active:    true,
+			Mining:    false,
+			Hashrate:  0,
+			GoodPeers: peerCount,
+			GasPrice:  0,
+			Syncing:   sync != finishSync,
+			Uptime:    100,
 		},
 	}
 	report := map[string][]interface{}{
@@ -771,4 +693,3 @@ func (s *Service) reportStats(conn *connWrapper) error {
 	}
 	return conn.WriteJSON(report)
 }
-*/
diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go
index 1300e3f1d9..a252294ba9 100644
--- a/turbo/cli/default_flags.go
+++ b/turbo/cli/default_flags.go
@@ -122,4 +122,5 @@ var DefaultFlags = []cli.Flag{
 	HealthCheckFlag,
 	utils.HeimdallURLFlag,
 	utils.WithoutHeimdallFlag,
+	utils.EthStatsURLFlag,
 }
diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go
index 897f3bf426..5575624649 100644
--- a/turbo/stages/mock_sentry.go
+++ b/turbo/stages/mock_sentry.go
@@ -337,7 +337,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 			stagedsync.StageLogIndexCfg(mock.DB, prune, mock.tmpdir),
 			stagedsync.StageCallTracesCfg(mock.DB, prune, 0, mock.tmpdir),
 			stagedsync.StageTxLookupCfg(mock.DB, prune, mock.tmpdir, allSnapshots, isBor),
-			stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log), true),
+			stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log, nil), true),
 		stagedsync.DefaultUnwindOrder,
 		stagedsync.DefaultPruneOrder,
 	)
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 520e24086f..a2664a0f8c 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -19,6 +19,7 @@ import (
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/consensus/misc"
 	"github.com/ledgerwatch/erigon/core/rawdb"
+	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/core/vm"
 	"github.com/ledgerwatch/erigon/eth/ethconfig"
 	"github.com/ledgerwatch/erigon/eth/stagedsync"
@@ -253,6 +254,7 @@ func NewStagedSync(
 	snapshotDownloader proto_downloader.DownloaderClient,
 	snapshots *snapshotsync.RoSnapshots,
 	snapshotDir *dir.Rw,
+	headCh chan *types.Block,
 ) (*stagedsync.Sync, error) {
 	var blockReader interfaces.FullBlockReader
 	if cfg.Snapshot.Enabled {
@@ -320,7 +322,7 @@ func NewStagedSync(
 			stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir),
 			stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir),
 			stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, snapshots, isBor),
-			stagedsync.StageFinishCfg(db, tmpdir, logger), runInTestMode),
+			stagedsync.StageFinishCfg(db, tmpdir, logger, headCh), runInTestMode),
 		stagedsync.DefaultUnwindOrder,
 		stagedsync.DefaultPruneOrder,
 	), nil
-- 
GitLab