diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 7567fb1ff213954c904c96e4230037dcddb219d8..569ccd494dc7e4e1231c6f4c4743be9cce4f5b9d 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1176,7 +1176,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg, chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir, - &stagedsync.Notifications{}, nil, allSn, cfg.SnapshotDir, + &stagedsync.Notifications{}, nil, allSn, cfg.SnapshotDir, nil, ) if err != nil { panic(err) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1f64f32f6ba760942d13282379c581e57324962d..b2bf40fea26569375d48594ec67921a2ba2c3eb6 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -297,6 +297,7 @@ var ( EthStatsURLFlag = cli.StringFlag{ Name: "ethstats", Usage: "Reporting URL of a ethstats service (nodename:secret@host:port)", + Value: "", } FakePoWFlag = cli.BoolFlag{ Name: "fakepow", @@ -1408,6 +1409,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *node.Config, cfg *ethconfig.Conf setWhitelist(ctx, cfg) setBorConfig(ctx, cfg) + cfg.Ethstats = ctx.GlobalString(EthStatsURLFlag.Name) cfg.P2PEnabled = len(nodeConfig.P2P.SentryAddr) == 0 cfg.EnabledIssuance = ctx.GlobalIsSet(EnabledIssuance.Name) if ctx.GlobalIsSet(NetworkIdFlag.Name) { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index f21cdbaed40f0c1e0d75c5bafc444bb023f5fed1..4c8efc4a31886c0dd597482404e447a46881b859 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -32,6 +32,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/dbutils" "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb/cbor" "github.com/ledgerwatch/erigon/rlp" ) @@ -259,6 +260,18 @@ func ReadCurrentBlock(db kv.Tx) *types.Block { return ReadBlock(db, headHash, *headNumber) } +func ReadLastBlockSynced(db kv.Tx) (*types.Block, error) { + headNumber, err := stages.GetStageProgress(db, stages.Execution) + if err != nil { + return nil, err + } + headHash, err := ReadCanonicalHash(db, headNumber) + if err != nil { + return nil, err + } + return ReadBlock(db, headHash, headNumber), nil +} + func ReadHeadersByNumber(db kv.Tx, number uint64) ([]*types.Header, error) { var res []*types.Header c, err := db.Cursor(kv.Headers) diff --git a/eth/backend.go b/eth/backend.go index 34f5598d5ac828098ad4b7ab5f829f35d5f5b462..2b641ca6afc4fee26b0ad02e59426304d3d652f7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -70,6 +70,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/ethdb/privateapi" "github.com/ledgerwatch/erigon/ethdb/prune" + "github.com/ledgerwatch/erigon/ethstats" "github.com/ledgerwatch/erigon/node" "github.com/ledgerwatch/erigon/p2p" "github.com/ledgerwatch/erigon/params" @@ -486,10 +487,16 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil { return nil, err } + + var headCh chan *types.Block + if config.Ethstats != "" { + headCh = make(chan *types.Block, 1) + } + backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB, stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty, backend.sentryControlServer, tmpdir, backend.notifications, - backend.downloaderClient, allSnapshots, config.SnapshotDir) + backend.downloaderClient, allSnapshots, config.SnapshotDir, headCh) if err != nil { return nil, err } @@ -519,7 +526,11 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l gpoParams.Default = config.Miner.GasPrice } //eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) - + if config.Ethstats != "" { + if err := ethstats.New(stack, backend.sentryServers, chainKv, backend.engine, config.Ethstats, backend.networkID, ctx.Done(), headCh); err != nil { + return nil, err + } + } // start HTTP API httpRpcCfg := stack.Config().Http if httpRpcCfg.Enabled { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 68eee2e4aefd6d494cbde04301efd2e23df65a1f..17b60641cbe677146ed4bec63d829501888f37b0 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -224,6 +224,8 @@ type Config struct { // No heimdall service WithoutHeimdall bool + // Ethstats service + Ethstats string } func CreateConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger, config interface{}, notify []string, noverify bool, HeimdallURL string, WithoutHeimdall bool, datadir string) consensus.Engine { diff --git a/eth/stagedsync/stage_finish.go b/eth/stagedsync/stage_finish.go index 06803a6326cd1b58677a3880a32f760a7e7af3ef..b9e3bc174b9f5f1fcb1c3e4f79d43a53c54d9df7 100644 --- a/eth/stagedsync/stage_finish.go +++ b/eth/stagedsync/stage_finish.go @@ -25,13 +25,15 @@ type FinishCfg struct { db kv.RwDB tmpDir string log log.Logger + headCh chan *types.Block } -func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger) FinishCfg { +func StageFinishCfg(db kv.RwDB, tmpDir string, logger log.Logger, headCh chan *types.Block) FinishCfg { return FinishCfg{ db: db, log: logger, tmpDir: tmpDir, + headCh: headCh, } } @@ -67,6 +69,14 @@ func FinishForward(s *StageState, tx kv.RwTx, cfg FinishCfg, initialCycle bool) } } + if cfg.headCh != nil { + select { + case cfg.headCh <- rawdb.ReadCurrentBlock(tx): + default: + } + + } + if !useExternalTx { if err := tx.Commit(); err != nil { return err diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index b8c32fd8afb44d600219a95d8fd6195527064823..7be659303e7b94e24515beb4a64c9c6c281ff791 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -17,7 +17,6 @@ // Package ethstats implements the network stats reporting service. package ethstats -/* import ( "context" "encoding/json" @@ -33,65 +32,38 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/cmd/sentry/sentry" "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/common/mclock" "github.com/ledgerwatch/erigon/consensus" - "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" - "github.com/ledgerwatch/erigon/eth" - "github.com/ledgerwatch/erigon/eth/downloader" - "github.com/ledgerwatch/erigon/event" - "github.com/ledgerwatch/log/v3" - "github.com/ledgerwatch/erigon/miner" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/node" - "github.com/ledgerwatch/erigon/p2p" - "github.com/ledgerwatch/erigon/rpc" + "github.com/ledgerwatch/log/v3" ) const ( // historyUpdateRange is the number of blocks a node should report upon login or // history request. historyUpdateRange = 50 - - // txChanSize is the size of channel listening to NewTxsEvent. - // The number is referenced from the size of tx pool. - txChanSize = 4096 - // chainHeadChanSize is the size of channel listening to ChainHeadEvent. - chainHeadChanSize = 10 ) -// backend encompasses the bare-minimum functionality needed for ethstats reporting -type backend interface { - SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription - SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription - CurrentHeader() *types.Header - HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) - GetTd(ctx context.Context, hash common.Hash) *big.Int - Stats() (pending int, queued int) - Downloader() *downloader.Downloader -} - -// fullNodeBackend encompasses the functionality necessary for a full node -// reporting to ethstats -type fullNodeBackend interface { - backend - Miner() *miner.Miner - BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) - CurrentBlock() *types.Block - SuggestPrice(ctx context.Context) (*big.Int, error) -} - // Service implements an Ethereum netstats reporting daemon that pushes local // chain statistics up to a monitoring server. type Service struct { - server *p2p.Server // Peer-to-peer server to retrieve networking infos - backend backend - engine consensus.Engine // Consensus engine to retrieve variadic block fields + servers []*sentry.SentryServerImpl // Peer-to-peer server to retrieve networking infos + chaindb kv.RoDB + networkid uint64 + engine consensus.Engine // Consensus engine to retrieve variadic block fields node string // Name of the node to display on the monitoring page pass string // Password to authorize access to the monitoring page host string // Remote address of the monitoring service + quitCh <-chan struct{} + headCh <-chan *types.Block + pongCh chan struct{} // Pong notifications are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel @@ -121,6 +93,9 @@ func newConnectionWrapper(conn *websocket.Conn) *connWrapper { // WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling func (w *connWrapper) WriteJSON(v interface{}) error { + if w.conn == nil { + return nil + } w.wlock.Lock() defer w.wlock.Unlock() @@ -143,7 +118,7 @@ func (w *connWrapper) Close() error { } // New returns a monitoring service ready for stats reporting. -func New(node *node.Node, backend backend, engine consensus.Engine, url string) error { +func New(node *node.Node, servers []*sentry.SentryServerImpl, chainDB kv.RoDB, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan *types.Block) error { // Parse the netstats connection url re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)") parts := re.FindStringSubmatch(url) @@ -151,14 +126,17 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string) return fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url) } ethstats := &Service{ - backend: backend, - engine: engine, - server: node.Server(), - node: parts[1], - pass: parts[3], - host: parts[4], - pongCh: make(chan struct{}), - histCh: make(chan []uint64, 1), + engine: engine, + servers: servers, + node: parts[1], + pass: parts[3], + host: parts[4], + pongCh: make(chan struct{}), + histCh: make(chan []uint64, 1), + networkid: networkid, + chaindb: chainDB, + headCh: headCh, + quitCh: quitCh, } node.RegisterLifecycle(ethstats) @@ -182,56 +160,6 @@ func (s *Service) Stop() error { // loop keeps trying to connect to the netstats server, reporting chain events // until termination. func (s *Service) loop() { - // Subscribe to chain events to execute updates on - chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) - headSub := s.backend.SubscribeChainHeadEvent(chainHeadCh) - defer headSub.Unsubscribe() - - txEventCh := make(chan core.NewTxsEvent, txChanSize) - txSub := s.backend.SubscribeNewTxsEvent(txEventCh) - defer txSub.Unsubscribe() - - // Start a goroutine that exhausts the subscriptions to avoid events piling up - var ( - quitCh = make(chan struct{}) - headCh = make(chan *types.Block, 1) - txCh = make(chan struct{}, 1) - ) - go func() { - var lastTx mclock.AbsTime - - HandleLoop: - for { - select { - // Notify of chain head events, but drop if too frequent - case head := <-chainHeadCh: - select { - case headCh <- head.Block: - default: - } - - // Notify of new transaction events, but drop if too frequent - case <-txEventCh: - if time.Duration(mclock.Now()-lastTx) < time.Second { - continue - } - lastTx = mclock.Now() - - select { - case txCh <- struct{}{}: - default: - } - - // node stopped - case <-txSub.Err(): - break HandleLoop - case <-headSub.Err(): - break HandleLoop - } - } - close(quitCh) - }() - // Resolve the URL, defaulting to TLS, but falling back to none too path := fmt.Sprintf("%s/api", s.host) urls := []string{path} @@ -246,7 +174,7 @@ func (s *Service) loop() { // Loop reporting until termination for { select { - case <-quitCh: + case <-s.quitCh: return case <-errTimer.C: // Establish a websocket connection to the server on any supported URL @@ -258,15 +186,15 @@ func (s *Service) loop() { header := make(http.Header) header.Set("origin", "http://localhost") for _, url := range urls { - c, _, e := dialer.Dial(url, header) - err = e + //nolint + c, _, err := dialer.Dial(url, header) if err == nil { conn = newConnectionWrapper(c) break } } - if err != nil { - log.Warn("Stats server unreachable", "err", err) + if err != nil || conn == nil { + log.Warn("Stats server unreachable") errTimer.Reset(10 * time.Second) continue } @@ -279,19 +207,12 @@ func (s *Service) loop() { } go s.readLoop(conn) - // Send the initial stats so our node looks decent from the get go - if err = s.report(conn); err != nil { - log.Warn("Initial stats report failed", "err", err) - conn.Close() - errTimer.Reset(0) - continue - } // Keep sending status updates until the connection breaks fullReport := time.NewTicker(15 * time.Second) for err == nil { select { - case <-quitCh: + case <-s.quitCh: fullReport.Stop() // Make sure the connection is closed conn.Close() @@ -305,17 +226,11 @@ func (s *Service) loop() { if err = s.reportHistory(conn, list); err != nil { log.Warn("Requested history report failed", "err", err) } - case head := <-headCh: + case head := <-s.headCh: if err = s.reportBlock(conn, head); err != nil { log.Warn("Block stats report failed", "err", err) } - if err = s.reportPending(conn); err != nil { - log.Warn("Post-block transaction stats report failed", "err", err) - } - case <-txCh: - if err = s.reportPending(conn); err != nil { - log.Warn("Transaction stats report failed", "err", err) - } + } } fullReport.Stop() @@ -345,7 +260,7 @@ func (s *Service) readLoop(conn *connWrapper) { // If the network packet is a system ping, respond to it directly var ping string if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { - if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { + if err := conn.WriteJSON(strings.ReplaceAll(ping, "ping", "pong")); err != nil { log.Warn("Failed to respond to system ping message", "err", err) return } @@ -442,23 +357,28 @@ type authMsg struct { // login tries to authorize the client at the remote server. func (s *Service) login(conn *connWrapper) error { // Construct and send the login authentication - infos := s.server.NodeInfo() + // infos := s.server.NodeInfo() var protocols []string - for _, proto := range s.server.Protocols { - protocols = append(protocols, fmt.Sprintf("%s/%d", proto.Name, proto.Version)) - } - var network string - if info := infos.Protocols["eth"]; info != nil { - network = fmt.Sprintf("%d", info.(*ethproto.NodeInfo).Network) + for _, srv := range s.servers { + protocols = append(protocols, fmt.Sprintf("%s/%d", srv.Protocol.Name, srv.Protocol.Version)) + } + nodeName := "Erigon" + if len(s.servers) > 0 { + nodeInfo, err := s.servers[0].NodeInfo(context.TODO(), nil) + if err != nil { + return err + } + nodeName = nodeInfo.Name } + auth := &authMsg{ ID: s.node, Info: nodeInfo{ Name: s.node, - Node: infos.Name, - Port: infos.Ports.Listener, - Network: network, + Node: nodeName, + Port: 0, + Network: fmt.Sprintf("%d", s.networkid), Protocol: strings.Join(protocols, ", "), API: "No", Os: runtime.GOOS, @@ -573,8 +493,25 @@ func (s uncleStats) MarshalJSON() ([]byte, error) { // reportBlock retrieves the current chain head and reports it to the stats server. func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error { + roTx, err := s.chaindb.BeginRo(context.Background()) + if err != nil { + return err + } + defer roTx.Rollback() + if block == nil { + block, err = rawdb.ReadLastBlockSynced(roTx) + if err != nil { + return err + } + } + + td, err := rawdb.ReadTd(roTx, block.Hash(), block.NumberU64()) + if err != nil { + return err + } + // Gather the block details from the header or block chain - details := s.assembleBlockStats(block) + details := s.assembleBlockStats(block, td) // Assemble the block report and send it to the server log.Trace("Sending new block to ethstats", "number", details.Number, "hash", details.Hash) @@ -591,54 +528,44 @@ func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error { // assembleBlockStats retrieves any required metadata to report a single block // and assembles the block stats. If block is nil, the current head is processed. -func (s *Service) assembleBlockStats(block *types.Block) *blockStats { +func (s *Service) assembleBlockStats(block *types.Block, td *big.Int) *blockStats { + if td == nil { + td = common.Big0 + } // Gather the block infos from the local blockchain var ( - header *types.Header - td *big.Int - txs []txStats - uncles []*types.Header + txs []txStats ) - - // check if backend is a full node - fullBackend, ok := s.backend.(fullNodeBackend) - if ok { - if block == nil { - block = fullBackend.CurrentBlock() - } - header = block.Header() - td = fullBackend.GetTd(context.Background(), header.Hash()) - - txs = make([]txStats, len(block.Transactions())) - for i, tx := range block.Transactions() { - txs[i].Hash = tx.Hash() - } - uncles = block.Uncles() + for _, tx := range block.Transactions() { + txs = append(txs, txStats{tx.Hash()}) } - // Assemble and return the block stats - author, _ := s.engine.Author(header) - return &blockStats{ - Number: header.Number, - Hash: header.Hash(), - ParentHash: header.ParentHash, - Timestamp: new(big.Int).SetUint64(header.Time), - Miner: author, - GasUsed: header.GasUsed, - GasLimit: header.GasLimit, - Diff: header.Difficulty.String(), + Number: block.Header().Number, + Hash: block.Hash(), + ParentHash: block.Header().ParentHash, + Timestamp: new(big.Int).SetUint64(block.Header().Time), + Miner: block.Header().Coinbase, + GasUsed: block.Header().GasUsed, + GasLimit: block.Header().GasLimit, + Diff: block.Header().Difficulty.String(), TotalDiff: td.String(), Txs: txs, - TxHash: header.TxHash, - Root: header.Root, - Uncles: uncles, + TxHash: block.Header().TxHash, + Root: block.Header().Root, + Uncles: block.Uncles(), } } // reportHistory retrieves the most recent batch of blocks and reports it to the // stats server. func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { + roTx, err := s.chaindb.BeginRo(context.Background()) + if err != nil { + return err + } + defer roTx.Rollback() + // Figure out the indexes that need reporting indexes := make([]uint64, 0, historyUpdateRange) if len(list) > 0 { @@ -646,31 +573,34 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { indexes = append(indexes, list...) } else { // No indexes requested, send back the top ones - head := s.backend.CurrentHeader().Number.Int64() - start := head - historyUpdateRange + 1 + headHash := rawdb.ReadHeadBlockHash(roTx) + headNumber := rawdb.ReadHeaderNumber(roTx, headHash) + if headNumber == nil { + return nil + } + start := int(*headNumber - historyUpdateRange + 1) if start < 0 { start = 0 } - for i := uint64(start); i <= uint64(head); i++ { + for i := uint64(start); i <= *headNumber; i++ { indexes = append(indexes, i) } } // Gather the batch of blocks to report history := make([]*blockStats, len(indexes)) for i, number := range indexes { - fullBackend, ok := s.backend.(fullNodeBackend) // Retrieve the next block if it's known to us - var block *types.Block - if ok { - block, _ = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(number)) // TODO ignore error here ? - } else { - if header, _ := s.backend.HeaderByNumber(context.Background(), rpc.BlockNumber(number)); header != nil { - block = types.NewBlockWithHeader(header) - } + block, err := rawdb.ReadBlockByNumber(roTx, number) + if err != nil { + return err + } + td, err := rawdb.ReadTd(roTx, block.Hash(), number) + if err != nil { + return err } // If we do have the block, add to the history and continue if block != nil { - history[len(history)-1-i] = s.assembleBlockStats(block) + history[len(history)-1-i] = s.assembleBlockStats(block, td) continue } // Ran out of blocks, cut the report short and send @@ -693,77 +623,69 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error { return conn.WriteJSON(report) } -// pendStats is the information to report about pending transactions. -type pendStats struct { - Pending int `json:"pending"` -} - // reportPending retrieves the current number of pending transactions and reports // it to the stats server. func (s *Service) reportPending(conn *connWrapper) error { - // Retrieve the pending count from the local blockchain - pending, _ := s.backend.Stats() - // Assemble the transaction stats and send it to the server - log.Trace("Sending pending transactions to ethstats", "count", pending) - - stats := map[string]interface{}{ - "id": s.node, - "stats": &pendStats{ - Pending: pending, - }, - } - report := map[string][]interface{}{ - "emit": {"pending", stats}, - } - return conn.WriteJSON(report) + /* // Retrieve the pending count from the local blockchain + pending, _ := s.backend.Stats() + // Assemble the transaction stats and send it to the server + log.Trace("Sending pending transactions to ethstats", "count", pending) + + stats := map[string]interface{}{ + "id": s.node, + "stats": &pendStats{ + Pending: pending, + }, + } + report := map[string][]interface{}{ + "emit": {"pending", stats}, + } + return conn.WriteJSON(report)*/ + return nil } // nodeStats is the information to report about the local node. type nodeStats struct { - Active bool `json:"active"` - Syncing bool `json:"syncing"` - Mining bool `json:"mining"` - Hashrate int `json:"hashrate"` - GoodPeers int `json:"peers"` - GasPrice int `json:"gasPrice"` - Uptime int `json:"uptime"` + Active bool `json:"active"` + Syncing bool `json:"syncing"` + Mining bool `json:"mining"` + Hashrate int `json:"hashrate"` + GoodPeers int `json:"peers"` + GasPrice int `json:"gasPrice"` + Uptime int `json:"uptime"` } // reportStats retrieves various stats about the node at the networking and // mining layer and reports it to the stats server. func (s *Service) reportStats(conn *connWrapper) error { - // Gather the syncing and mining infos from the local miner instance - var ( - mining bool - hashrate int - syncing bool - gasprice int - ) - // check if backend is a full node - fullBackend, ok := s.backend.(fullNodeBackend) - if ok { - mining = fullBackend.Miner().Mining() - hashrate = int(fullBackend.Miner().HashRate()) - - sync := fullBackend.Downloader().Progress() - syncing = fullBackend.CurrentHeader().Number.Uint64() >= sync.HighestBlock - - price, _ := fullBackend.SuggestPrice(context.Background()) - gasprice = int(price.Uint64()) + roTx, err := s.chaindb.BeginRo(context.Background()) + if err != nil { + return err + } + defer roTx.Rollback() + sync, err := stages.GetStageProgress(roTx, stages.Execution) + if err != nil { + return err + } + finishSync, err := stages.GetStageProgress(roTx, stages.Finish) + if err != nil { + return err + } + // TODO(Giulio2002): peer tracking + peerCount := 0 + for _, srv := range s.servers { + peerCount += srv.SimplePeerCount() } - // Assemble the node stats and send it to the server - log.Trace("Sending node details to ethstats") - stats := map[string]interface{}{ "id": s.node, "stats": &nodeStats{ - Active: true, - Mining: mining, - Hashrate: hashrate, - GoodPeers: s.server.PeerCount(), - GasPrice: gasprice, - Syncing: syncing, - Uptime: 100, + Active: true, + Mining: false, + Hashrate: 0, + GoodPeers: peerCount, + GasPrice: 0, + Syncing: sync != finishSync, + Uptime: 100, }, } report := map[string][]interface{}{ @@ -771,4 +693,3 @@ func (s *Service) reportStats(conn *connWrapper) error { } return conn.WriteJSON(report) } -*/ diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 1300e3f1d9c60a8d35b4815735311aff0b2983fe..a252294ba9fcf3c2a9d7222416d7431195ccc83c 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -122,4 +122,5 @@ var DefaultFlags = []cli.Flag{ HealthCheckFlag, utils.HeimdallURLFlag, utils.WithoutHeimdallFlag, + utils.EthStatsURLFlag, } diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 897f3bf4265cfaf69dfedd5ad8205d2ef0d994a0..55756246497e073e7abf67135818011b229b67a1 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -337,7 +337,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey stagedsync.StageLogIndexCfg(mock.DB, prune, mock.tmpdir), stagedsync.StageCallTracesCfg(mock.DB, prune, 0, mock.tmpdir), stagedsync.StageTxLookupCfg(mock.DB, prune, mock.tmpdir, allSnapshots, isBor), - stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log), true), + stagedsync.StageFinishCfg(mock.DB, mock.tmpdir, mock.Log, nil), true), stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, ) diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 520e24086f7722b1e4fa0755bc9b863180296116..a2664a0f8c87e430b80e1e142df91b03d91c71b0 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -19,6 +19,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/consensus/misc" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/core/vm" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/eth/stagedsync" @@ -253,6 +254,7 @@ func NewStagedSync( snapshotDownloader proto_downloader.DownloaderClient, snapshots *snapshotsync.RoSnapshots, snapshotDir *dir.Rw, + headCh chan *types.Block, ) (*stagedsync.Sync, error) { var blockReader interfaces.FullBlockReader if cfg.Snapshot.Enabled { @@ -320,7 +322,7 @@ func NewStagedSync( stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir), stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir), stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir, snapshots, isBor), - stagedsync.StageFinishCfg(db, tmpdir, logger), runInTestMode), + stagedsync.StageFinishCfg(db, tmpdir, logger, headCh), runInTestMode), stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, ), nil