diff --git a/cmd/headers/commands/download.go b/cmd/headers/commands/download.go deleted file mode 100644 index 9168d5fb2d9d7185d36afecf7d2b5f87e3c5282d..0000000000000000000000000000000000000000 --- a/cmd/headers/commands/download.go +++ /dev/null @@ -1,59 +0,0 @@ -package commands - -import ( - "path" - - "github.com/ledgerwatch/turbo-geth/cmd/headers/download" - "github.com/ledgerwatch/turbo-geth/common/etl" - "github.com/ledgerwatch/turbo-geth/turbo/node" - "github.com/spf13/cobra" -) - -var ( - combined bool // Whether downloader also includes sentry - timeout int // Timeout for delivery requests - window int // Size of sliding window for downloading block bodies - chain string // Name of the network to connect to -) - -var ( - // gitCommit is injected through the build flags (see Makefile) - gitCommit string - gitBranch string -) - -func init() { - downloadCmd.Flags().StringSliceVar(&sentryAddrs, "sentry.api.addr", []string{"localhost:9091"}, "comma separated sentry addresses '<host>:<port>,<host>:<port>'") - downloadCmd.Flags().BoolVar(&combined, "combined", false, "run downloader and sentry in the same process") - downloadCmd.Flags().IntVar(&timeout, "timeout", 30, "timeout for devp2p delivery requests, in seconds") - downloadCmd.Flags().IntVar(&window, "window", 65536, "size of sliding window for downloading block bodies, block") - downloadCmd.Flags().StringVar(&chain, "chain", "mainnet", "Name of the network (mainnet, testnets) to connect to") - - // Options below are only used in the combined mode - downloadCmd.Flags().StringVar(&natSetting, "nat", "any", "NAT port mapping mechanism (any|none|upnp|pmp|extip:<IP>)") - downloadCmd.Flags().IntVar(&port, "port", 30303, "p2p port number") - downloadCmd.Flags().StringSliceVar(&staticPeers, "staticpeers", []string{}, "static peer list [enode]") - downloadCmd.Flags().BoolVar(&discovery, "discovery", true, "discovery mode") - downloadCmd.Flags().StringVar(&netRestrict, "netrestrict", "", "CIDR range to accept peers from <CIDR>") - - withDatadir(downloadCmd) - rootCmd.AddCommand(downloadCmd) -} - -var downloadCmd = &cobra.Command{ - Use: "download", - Short: "Download headers backwards", - RunE: func(cmd *cobra.Command, args []string) error { - db := openDatabase(chaindata) - defer db.Close() - nodeConfig := node.NewNodeConfig(node.Params{GitCommit: gitCommit, GitBranch: gitBranch}) - nodeName := nodeConfig.NodeName() - tmpdir := path.Join(nodeConfig.DataDir, etl.TmpDirName) - - if combined { - return download.Combined(nodeConfig.DataDir, natSetting, port, staticPeers, discovery, netRestrict, db, timeout, window, chain, nodeName, tmpdir) - } - - return download.Download(sentryAddrs, db, timeout, window, chain, nodeName, tmpdir) - }, -} diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go index 6917837b6234a2154f8198229702ba654b9b8636..cd8a81cc9fdc83793baa6d7cedf8dd6240a7fc55 100644 --- a/cmd/headers/download/downloader.go +++ b/cmd/headers/download/downloader.go @@ -23,7 +23,6 @@ import ( "github.com/ledgerwatch/turbo-geth/eth/protocols/eth" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" "github.com/ledgerwatch/turbo-geth/ethdb" - "github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver" "github.com/ledgerwatch/turbo-geth/gointerfaces" proto_sentry "github.com/ledgerwatch/turbo-geth/gointerfaces/sentry" "github.com/ledgerwatch/turbo-geth/log" @@ -56,95 +55,6 @@ func GrpcSentryClient(ctx context.Context, sentryAddr string) (proto_sentry.Sent return proto_sentry.NewSentryClient(conn), nil } -// Download creates and starts standalone downloader -func Download(sentryAddrs []string, db ethdb.Database, timeout, window int, chain string, nodeName string, tmpdir string) error { - ctx := rootContext() - - log.Info("Starting Sentry client", "connecting to sentry", sentryAddrs) - sentries := make([]proto_sentry.SentryClient, len(sentryAddrs)) - for i, addr := range sentryAddrs { - sentry, err := GrpcSentryClient(ctx, addr) - if err != nil { - return err - } - sentries[i] = sentry - } - - chainConfig, genesisHash, engine, networkID := cfg(db, chain) - controlServer, err1 := NewControlServer(db, nodeName, chainConfig, genesisHash, engine, networkID, sentries, window) - if err1 != nil { - return fmt.Errorf("create core P2P server: %w", err1) - } - - // TODO: Make a reconnection loop - statusMsg := makeStatusData(controlServer) - - for _, sentry := range sentries { - if _, err := sentry.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil { - return fmt.Errorf("setting initial status message: %w", err) - } - } - - for _, sentry := range sentries { - go func(sentry proto_sentry.SentryClient) { - for { - select { - case <-ctx.Done(): - return - default: - } - RecvMessage(ctx, sentry, controlServer.HandleInboundMessage) - // Wait before trying to reconnect to prevent log flooding - time.Sleep(2 * time.Second) - } - }(sentry) - } - for _, sentry := range sentries { - go func(sentry proto_sentry.SentryClient) { - for { - select { - case <-ctx.Done(): - return - default: - } - RecvUploadMessage(ctx, sentry, controlServer.HandleInboundMessage) - // Wait before trying to reconnect to prevent log flooding - time.Sleep(2 * time.Second) - } - }(sentry) - } - - sm, err := ethdb.GetStorageModeFromDB(db) - if err != nil { - return err - } - - batchSize := 512 * datasize.MB - sync, err := NewStagedSync( - ctx, - db.(ethdb.HasRwKV).RwKV(), - sm, - batchSize, - timeout, - controlServer, - tmpdir, - nil, - ) - if err != nil { - return err - } - - stages.StageLoop( - ctx, - db, - sync, - controlServer.hd, - controlServer.chainConfig, - remotedbserver.NewEvents(), - ) - return nil -} - func RecvUploadMessage(ctx context.Context, sentry proto_sentry.SentryClient, handleInboundMessage func(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error) { streamCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -201,71 +111,6 @@ func RecvMessage(ctx context.Context, sentry proto_sentry.SentryClient, handleIn } } -// Combined creates and starts sentry and downloader in the same process -func Combined(datadir string, natSetting string, port int, staticPeers []string, discovery bool, netRestrict string, db ethdb.Database, timeout, window int, chain string, nodeName string, tmpdir string) error { - ctx := rootContext() - - sentryServer := NewSentryServer(ctx, datadir) - sentry := &SentryClientDirect{} - sentry.SetServer(sentryServer) - chainConfig, genesisHash, engine, networkID := cfg(db, chain) - sentries := []proto_sentry.SentryClient{sentry} - controlServer, err := NewControlServer(db, nodeName, chainConfig, genesisHash, engine, networkID, sentries, window) - if err != nil { - return fmt.Errorf("create core P2P server: %w", err) - } - - var readNodeInfo = func() *eth.NodeInfo { - var res *eth.NodeInfo - _ = db.(ethdb.HasRwKV).RwKV().View(context.Background(), func(tx ethdb.Tx) error { - res = eth.ReadNodeInfo(db, controlServer.chainConfig, controlServer.genesisHash, controlServer.networkId) - return nil - }) - return res - } - - sentryServer.P2pServer, err = p2pServer(ctx, datadir, sentryServer.nodeName, readNodeInfo, sentryServer, natSetting, port, staticPeers, discovery, netRestrict, controlServer.genesisHash) - if err != nil { - return err - } - - if err = SetSentryStatus(ctx, sentries, controlServer); err != nil { - log.Error("failed to set sentry status", "error", err) - return nil - } - sm, err := ethdb.GetStorageModeFromDB(db) - if err != nil { - return err - } - batchSize := 512 * datasize.MB - sync, err := NewStagedSync( - ctx, - db.(ethdb.HasRwKV).RwKV(), - sm, - batchSize, - timeout, - controlServer, - tmpdir, - nil, - ) - if err != nil { - return err - } - - go RecvMessage(ctx, sentry, controlServer.HandleInboundMessage) - go RecvUploadMessage(ctx, sentry, controlServer.HandleInboundMessage) - - stages.StageLoop( - ctx, - db, - sync, - controlServer.hd, - controlServer.chainConfig, - remotedbserver.NewEvents(), - ) - return nil -} - //Deprecated - use stages.StageLoop func Loop(ctx context.Context, db ethdb.Database, sync *stagedsync.StagedSync, controlServer *ControlServerImpl, notifier stagedsync.ChainEventNotifier) { stages.StageLoop( @@ -296,6 +141,7 @@ func NewStagedSync( controlServer *ControlServerImpl, tmpdir string, txPool *core.TxPool, + txPoolServer *eth.TxPoolServer, ) (*stagedsync.StagedSync, error) { var increment uint64 if sm.Pruning { @@ -345,7 +191,10 @@ func NewStagedSync( stagedsync.StageLogIndexCfg(db, tmpdir), stagedsync.StageCallTracesCfg(db, 0, batchSize, tmpdir, controlServer.chainConfig, controlServer.engine), stagedsync.StageTxLookupCfg(db, tmpdir), - stagedsync.StageTxPoolCfg(db, txPool), + stagedsync.StageTxPoolCfg(db, txPool, func() { + txPoolServer.Start() + txPoolServer.TxFetcher.Start() + }), stagedsync.StageFinishCfg(db, tmpdir), ), nil } diff --git a/cmd/headers/download/sentry.go b/cmd/headers/download/sentry.go index bcada8e79dd9ea86bf5f1395946e82f9e0debe15..9ef7bf444ac1342c579babafc6eb8427f96763d7 100644 --- a/cmd/headers/download/sentry.go +++ b/cmd/headers/download/sentry.go @@ -920,7 +920,7 @@ func trySend(ch chan<- StreamMsg, msg *StreamMsg) { select { case ch <- *msg: default: - log.Warn("Dropped stream message", "type", msg.msgName) + //log.Warn("Dropped stream message", "type", msg.msgName) } } diff --git a/turbo/stages/sentry_mock_test.go b/cmd/headers/download/sentry_mock_test.go similarity index 59% rename from turbo/stages/sentry_mock_test.go rename to cmd/headers/download/sentry_mock_test.go index 2a21b9427da921fb411f2849969d5ed847987b49..85c150da12f026e416e4ffb6f76c23b4acc613c7 100644 --- a/turbo/stages/sentry_mock_test.go +++ b/cmd/headers/download/sentry_mock_test.go @@ -1,4 +1,4 @@ -package stages +package download import ( "context" @@ -15,15 +15,54 @@ import ( "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/core/types" "github.com/ledgerwatch/turbo-geth/core/vm" + "github.com/ledgerwatch/turbo-geth/eth/protocols/eth" "github.com/ledgerwatch/turbo-geth/eth/stagedsync" "github.com/ledgerwatch/turbo-geth/ethdb" + "github.com/ledgerwatch/turbo-geth/gointerfaces/sentry" "github.com/ledgerwatch/turbo-geth/params" + "github.com/ledgerwatch/turbo-geth/turbo/stages" "github.com/ledgerwatch/turbo-geth/turbo/stages/bodydownload" "github.com/ledgerwatch/turbo-geth/turbo/stages/headerdownload" + "google.golang.org/protobuf/types/known/emptypb" ) +type MockSentry struct { + sentry.UnimplementedSentryServer +} + +func (ms *MockSentry) PenalizePeer(context.Context, *sentry.PenalizePeerRequest) (*emptypb.Empty, error) { + return nil, nil +} +func (ms *MockSentry) PeerMinBlock(context.Context, *sentry.PeerMinBlockRequest) (*emptypb.Empty, error) { + return nil, nil +} +func (ms *MockSentry) SendMessageByMinBlock(context.Context, *sentry.SendMessageByMinBlockRequest) (*sentry.SentPeers, error) { + return nil, nil +} +func (ms *MockSentry) SendMessageById(context.Context, *sentry.SendMessageByIdRequest) (*sentry.SentPeers, error) { + return nil, nil +} +func (ms *MockSentry) SendMessageToRandomPeers(context.Context, *sentry.SendMessageToRandomPeersRequest) (*sentry.SentPeers, error) { + return nil, nil +} +func (ms *MockSentry) SendMessageToAll(context.Context, *sentry.OutboundMessageData) (*sentry.SentPeers, error) { + return nil, nil +} +func (ms *MockSentry) SetStatus(context.Context, *sentry.StatusData) (*emptypb.Empty, error) { + return nil, nil +} +func (ms *MockSentry) ReceiveMessages(*emptypb.Empty, sentry.Sentry_ReceiveMessagesServer) error { + return nil +} +func (ms *MockSentry) ReceiveUploadMessages(*emptypb.Empty, sentry.Sentry_ReceiveUploadMessagesServer) error { + return nil +} +func (ms *MockSentry) ReceiveTxMessages(*emptypb.Empty, sentry.Sentry_ReceiveTxMessagesServer) error { + return nil +} + // passing tmpdir because it is renponsibility of the caller to clean it up -func testStagedSync(tmpdir string) *stagedsync.StagedSync { +func testStagedSync(t *testing.T, tmpdir string) *stagedsync.StagedSync { ctx := context.Background() memDb := ethdb.NewMemDatabase() defer memDb.Close() @@ -55,7 +94,14 @@ func testStagedSync(tmpdir string) *stagedsync.StagedSync { txPoolConfig.Journal = "" txPoolConfig.StartOnInit = true txPool := core.NewTxPool(txPoolConfig, chainConfig, memDb, txCacher) - return NewStagedSync(ctx, sm, + txSentryClient := &SentryClientDirect{} + txSentry := &MockSentry{} + txSentryClient.SetServer(txSentry) + txPoolServer, err := eth.NewTxPoolServer(ctx, []sentry.SentryClient{txSentryClient}, txPool) + if err != nil { + t.Fatal(err) + } + return stages.NewStagedSync(ctx, sm, stagedsync.StageHeadersCfg( db, hd, @@ -98,7 +144,10 @@ func testStagedSync(tmpdir string) *stagedsync.StagedSync { stagedsync.StageLogIndexCfg(db, tmpdir), stagedsync.StageCallTracesCfg(db, 0, batchSize, tmpdir, chainConfig, engine), stagedsync.StageTxLookupCfg(db, tmpdir), - stagedsync.StageTxPoolCfg(db, txPool), + stagedsync.StageTxPoolCfg(db, txPool, func() { + txPoolServer.Start() + txPoolServer.TxFetcher.Start() + }), stagedsync.StageFinishCfg(db, tmpdir), ) } @@ -110,5 +159,5 @@ func TestEmptyStageSync(t *testing.T) { } defer os.RemoveAll(tmpdir) // clean up - testStagedSync(tmpdir) + testStagedSync(t, tmpdir) } diff --git a/eth/backend.go b/eth/backend.go index bc2046b395dd6e53485411c98509bc7887de91b4..f3cd7c826ce5bf9717ede93b0f1176a4143cd768 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -108,7 +108,7 @@ type Ethereum struct { downloadV2Cancel context.CancelFunc downloadServer *download.ControlServerImpl sentryServer *download.SentryServerImpl - txPoolServer *download.TxPoolServer + txPoolServer *eth.TxPoolServer sentries []proto_sentry.SentryClient stagedSync2 *stagedsync.StagedSync } @@ -169,7 +169,7 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu } log.Info("Initialised chain configuration", "config", chainConfig) - eth := &Ethereum{ + backend := &Ethereum{ config: config, chainDB: chainDb, chainKV: chainDb.(ethdb.HasRwKV).RwKV(), @@ -180,7 +180,7 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu chainConfig: chainConfig, genesisHash: genesisHash, } - eth.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice) + backend.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice) var consensusConfig interface{} @@ -190,7 +190,7 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu consensusConfig = &config.Ethash } - eth.engine = ethconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify) + backend.engine = ethconfig.CreateConsensusEngine(chainConfig, consensusConfig, config.Miner.Notify, config.Miner.Noverify) log.Info("Initialising Ethereum protocol", "network", config.NetworkID) @@ -210,9 +210,9 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu } } - if sm.Pruning && !eth.config.EnableDownloadV2 { + if sm.Pruning && !backend.config.EnableDownloadV2 { log.Info("Pruning is on, switching to new downloader") - eth.config.EnableDownloadV2 = true + backend.config.EnableDownloadV2 = true } if err = stagedsync.UpdateMetrics(chainDb); err != nil { @@ -226,12 +226,12 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } - eth.txPool = core.NewTxPool(config.TxPool, chainConfig, chainDb, txCacher) + backend.txPool = core.NewTxPool(config.TxPool, chainConfig, chainDb, txCacher) stagedSync := config.StagedSync // setting notifier to support streaming events to rpc daemon - eth.events = remotedbserver.NewEvents() + backend.events = remotedbserver.NewEvents() var mg *snapshotsync.SnapshotMigrator if config.SnapshotLayout { currentSnapshotBlock, currentInfohash, err := snapshotsync.GetSnapshotInfo(chainDb) @@ -247,14 +247,14 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu if stagedSync == nil { // if there is not stagedsync, we create one with the custom notifier if config.SnapshotLayout { - stagedSync = stagedsync.New(stagedsync.WithSnapshotsStages(), stagedsync.UnwindOrderWithSnapshots(), stagedsync.OptionalParameters{Notifier: eth.events, SnapshotDir: snapshotsDir, TorrnetClient: torrentClient, SnapshotMigrator: mg}) + stagedSync = stagedsync.New(stagedsync.WithSnapshotsStages(), stagedsync.UnwindOrderWithSnapshots(), stagedsync.OptionalParameters{Notifier: backend.events, SnapshotDir: snapshotsDir, TorrnetClient: torrentClient, SnapshotMigrator: mg}) } else { - stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: eth.events}) + stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: backend.events}) } } else { // otherwise we add one if needed if stagedSync.Notifier == nil { - stagedSync.Notifier = eth.events + stagedSync.Notifier = backend.events } if config.SnapshotLayout { stagedSync.SetTorrentParams(torrentClient, snapshotsDir, mg) @@ -265,7 +265,7 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu mining := stagedsync.New(stagedsync.MiningStages(), stagedsync.MiningUnwindOrder(), stagedsync.OptionalParameters{}) var ethashApi *ethash.API - if casted, ok := eth.engine.(*ethash.Ethash); ok { + if casted, ok := backend.engine.(*ethash.Ethash); ok { ethashApi = casted.APIs(nil)[1].Service.(*ethash.API) } if stack.Config().PrivateApiAddr != "" { @@ -301,29 +301,29 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu if err != nil { return nil, err } - eth.privateAPI, err = remotedbserver.StartGrpc( + backend.privateAPI, err = remotedbserver.StartGrpc( chainDb.(ethdb.HasRwKV).RwKV(), - eth, - eth.txPool, + backend, + backend.txPool, ethashApi, stack.Config().PrivateApiAddr, stack.Config().PrivateApiRateLimit, &creds, - eth.events, + backend.events, gitCommit) if err != nil { return nil, err } } else { - eth.privateAPI, err = remotedbserver.StartGrpc( + backend.privateAPI, err = remotedbserver.StartGrpc( chainDb.(ethdb.HasRwKV).RwKV(), - eth, - eth.txPool, + backend, + backend.txPool, ethashApi, stack.Config().PrivateApiAddr, stack.Config().PrivateApiRateLimit, nil, - eth.events, + backend.events, gitCommit) if err != nil { return nil, err @@ -332,55 +332,54 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu } checkpoint := config.Checkpoint - if eth.config.EnableDownloadV2 { - eth.downloadV2Ctx, eth.downloadV2Cancel = context.WithCancel(context.Background()) + if backend.config.EnableDownloadV2 { + backend.downloadV2Ctx, backend.downloadV2Cancel = context.WithCancel(context.Background()) if len(stack.Config().P2P.SentryAddr) > 0 { for _, addr := range stack.Config().P2P.SentryAddr { - sentry, err := download.GrpcSentryClient(eth.downloadV2Ctx, addr) + sentry, err := download.GrpcSentryClient(backend.downloadV2Ctx, addr) if err != nil { return nil, err } - eth.sentries = append(eth.sentries, sentry) + backend.sentries = append(backend.sentries, sentry) } } else { - eth.sentryServer = download.NewSentryServer(eth.downloadV2Ctx, stack.Config().DataDir) + backend.sentryServer = download.NewSentryServer(backend.downloadV2Ctx, stack.Config().DataDir) sentry := &download.SentryClientDirect{} - eth.sentryServer.P2pServer = eth.p2pServer - sentry.SetServer(eth.sentryServer) - eth.sentries = []proto_sentry.SentryClient{sentry} + backend.sentryServer.P2pServer = backend.p2pServer + sentry.SetServer(backend.sentryServer) + backend.sentries = []proto_sentry.SentryClient{sentry} } blockDownloaderWindow := 65536 - eth.downloadServer, err = download.NewControlServer(chainDb, stack.Config().NodeName(), chainConfig, genesisHash, eth.engine, eth.config.NetworkID, eth.sentries, blockDownloaderWindow) + backend.downloadServer, err = download.NewControlServer(chainDb, stack.Config().NodeName(), chainConfig, genesisHash, backend.engine, backend.config.NetworkID, backend.sentries, blockDownloaderWindow) if err != nil { return nil, err } - if err = download.SetSentryStatus(eth.downloadV2Ctx, eth.sentries, eth.downloadServer); err != nil { + if err = download.SetSentryStatus(backend.downloadV2Ctx, backend.sentries, backend.downloadServer); err != nil { return nil, err } - eth.txPoolServer, err = download.NewTxPoolServer(eth.sentries, eth.txPool) + backend.txPoolServer, err = eth.NewTxPoolServer(backend.downloadV2Ctx, backend.sentries, backend.txPool) if err != nil { return nil, err } fetchTx := func(peerID string, hashes []common.Hash) error { - eth.txPoolServer.SendTxsRequest(context.TODO(), peerID, hashes) + backend.txPoolServer.SendTxsRequest(context.TODO(), peerID, hashes) return nil } - eth.txPoolServer.TxFetcher = fetcher.NewTxFetcher(eth.txPool.Has, eth.txPool.AddRemotes, fetchTx) - eth.txPoolServer.TxFetcher.Start() - eth.txPool.Start(0, 0) // Start tx pool to avoid deadlocks in the initial stages (before TxPool stage starts working) + backend.txPoolServer.TxFetcher = fetcher.NewTxFetcher(backend.txPool.Has, backend.txPool.AddRemotes, fetchTx) bodyDownloadTimeoutSeconds := 30 // TODO: convert to duration, make configurable - eth.stagedSync2, err = download.NewStagedSync( - eth.downloadV2Ctx, - eth.chainKV, + backend.stagedSync2, err = download.NewStagedSync( + backend.downloadV2Ctx, + backend.chainKV, sm, config.BatchSize, bodyDownloadTimeoutSeconds, - eth.downloadServer, + backend.downloadServer, tmpdir, - eth.txPool, + backend.txPool, + backend.txPoolServer, ) if err != nil { return nil, err @@ -392,13 +391,13 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu return nil, core.ErrNoGenesis } - if eth.handler, err = newHandler(&handlerConfig{ + if backend.handler, err = newHandler(&handlerConfig{ Database: chainDb, ChainConfig: chainConfig, genesis: genesisBlock, vmConfig: &vmConfig, - engine: eth.engine, - TxPool: eth.txPool, + engine: backend.engine, + TxPool: backend.txPool, Network: config.NetworkID, Checkpoint: checkpoint, @@ -407,14 +406,14 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu return nil, err } - eth.handler.SetTmpDir(tmpdir) - eth.handler.SetBatchSize(config.BatchSize) - eth.handler.SetStagedSync(stagedSync) + backend.handler.SetTmpDir(tmpdir) + backend.handler.SetBatchSize(config.BatchSize) + backend.handler.SetStagedSync(stagedSync) } - go SendPendingTxsToRpcDaemon(eth.txPool, eth.events) + go SendPendingTxsToRpcDaemon(backend.txPool, backend.events) - if err := eth.StartMining(mining, tmpdir); err != nil { + if err := backend.StartMining(mining, tmpdir); err != nil { return nil, err } @@ -424,25 +423,24 @@ func New(stack *node.Node, config *ethconfig.Config, gitCommit string) (*Ethereu gpoParams.Default = config.Miner.GasPrice } //eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) - eth.ethDialCandidates, err = setupDiscovery(eth.config.EthDiscoveryURLs) + backend.ethDialCandidates, err = setupDiscovery(backend.config.EthDiscoveryURLs) if err != nil { return nil, err } - eth.ethDialCandidates, err = setupDiscovery(eth.config.EthDiscoveryURLs) + backend.ethDialCandidates, err = setupDiscovery(backend.config.EthDiscoveryURLs) if err != nil { return nil, err } // Register the backend on the node - stack.RegisterAPIs(eth.APIs()) - if eth.config.P2PEnabled { - stack.RegisterProtocols(eth.Protocols()) + stack.RegisterAPIs(backend.APIs()) + if backend.config.P2PEnabled { + stack.RegisterProtocols(backend.Protocols()) } - stack.RegisterLifecycle(eth) - // Check for unclean shutdown - return eth, nil + stack.RegisterLifecycle(backend) + return backend, nil } func SendPendingTxsToRpcDaemon(txPool *core.TxPool, notifier *remotedbserver.Events) { @@ -474,95 +472,10 @@ func BlockchainRuntimeConfig(config *ethconfig.Config) vm.Config { return vmConfig } -// func makeExtraData(extra []byte) []byte { -// if len(extra) == 0 { -// // create default extradata -// extra, _ = rlp.EncodeToBytes([]interface{}{ -// uint(params.VersionMajor<<16 | params.VersionMinor<<8 | params.VersionMicro), -// "turbo-geth", -// runtime.GOOS, -// }) -// } -// if uint64(len(extra)) > params.MaximumExtraDataSize { -// log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize) -// extra = nil -// } -// return extra -// } - func (s *Ethereum) APIs() []rpc.API { return []rpc.API{} } -/* -// APIs return the collection of RPC services the ethereum package offers. -// NOTE, some of these services probably need to be moved to somewhere else. -func (s *Ethereum) APIs() []rpc.API { - if s.APIBackend == nil { - return []rpc.API{} - } - apis := ethapi.GetAPIs(s.APIBackend) - - // Append any APIs exposed explicitly by the consensus engine - //apis = append(apis, s.engine.APIs(s.BlockChain())...) - - // Append all the local APIs and return - return append(apis, []rpc.API{ - //{ - // Namespace: "eth", - // Version: "1.0", - // Service: NewPublicEthereumAPI(s), - // Public: true, - //}, - //{ - // Namespace: "eth", - // Version: "1.0", - // Service: NewPublicMinerAPI(s), - // Public: true, - //}, - //{ - // Namespace: "eth", - // Version: "1.0", - // Service: downloader.NewPublicDownloaderAPI(s.handler.downloader, s.eventMux), - // Public: true, - //}, - //{ - // Namespace: "miner", - // Version: "1.0", - // Service: NewPrivateMinerAPI(s), - // Public: false, - //}, - //{ - // Namespace: "eth", - // Version: "1.0", - // Service: filters.NewPublicFilterAPI(s.APIBackend, 5*time.Minute), - // Public: true, - //}, - //{ - // Namespace: "admin", - // Version: "1.0", - // Service: NewPrivateAdminAPI(s), - //}, - //{ - // Namespace: "debug", - // Version: "1.0", - // Service: NewPublicDebugAPI(s), - // Public: true, - //}, { - // Namespace: "debug", - // Version: "1.0", - // Service: NewPrivateDebugAPI(s), - //}, - { - Namespace: "net", - Version: "1.0", - Service: s.netRPCService, - Public: true, - }, - }...) -} -*/ - func (s *Ethereum) Etherbase() (eb common.Address, err error) { s.lock.RLock() etherbase := s.etherbase @@ -793,7 +706,6 @@ func (s *Ethereum) Start() error { if s.config.EnableDownloadV2 { go download.RecvMessage(s.downloadV2Ctx, s.sentries[0], s.downloadServer.HandleInboundMessage) go download.RecvUploadMessage(s.downloadV2Ctx, s.sentries[0], s.downloadServer.HandleInboundMessage) - go download.RecvTxMessage(s.downloadV2Ctx, s.sentries[0], s.txPoolServer.HandleInboundMessage) go download.Loop(s.downloadV2Ctx, s.chainDB, s.stagedSync2, s.downloadServer, s.events) } else { // Start the networking layer and the light server if requested diff --git a/cmd/headers/download/tx_pool.go b/eth/protocols/eth/tx_pool.go similarity index 89% rename from cmd/headers/download/tx_pool.go rename to eth/protocols/eth/tx_pool.go index 38882bf4e7b33e3ba6c52403f68f80f60879498b..4a75554f82dbafeb41f71f1968bd5047dead5fd5 100644 --- a/cmd/headers/download/tx_pool.go +++ b/eth/protocols/eth/tx_pool.go @@ -1,4 +1,4 @@ -package download +package eth import ( "context" @@ -11,7 +11,6 @@ import ( "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/core" "github.com/ledgerwatch/turbo-geth/eth/fetcher" - "github.com/ledgerwatch/turbo-geth/eth/protocols/eth" "github.com/ledgerwatch/turbo-geth/gointerfaces" proto_sentry "github.com/ledgerwatch/turbo-geth/gointerfaces/sentry" "github.com/ledgerwatch/turbo-geth/log" @@ -20,13 +19,15 @@ import ( ) type TxPoolServer struct { + ctx context.Context sentries []proto_sentry.SentryClient txPool *core.TxPool TxFetcher *fetcher.TxFetcher } -func NewTxPoolServer(sentries []proto_sentry.SentryClient, txPool *core.TxPool) (*TxPoolServer, error) { +func NewTxPoolServer(ctx context.Context, sentries []proto_sentry.SentryClient, txPool *core.TxPool) (*TxPoolServer, error) { cs := &TxPoolServer{ + ctx: ctx, sentries: sentries, txPool: txPool, } @@ -34,8 +35,12 @@ func NewTxPoolServer(sentries []proto_sentry.SentryClient, txPool *core.TxPool) return cs, nil } +func (tp *TxPoolServer) Start() { + go RecvTxMessage(tp.ctx, tp.sentries[0], tp.HandleInboundMessage) +} + func (tp *TxPoolServer) newPooledTransactionHashes(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error { - var query eth.NewPooledTransactionHashesPacket + var query NewPooledTransactionHashesPacket if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding NewPooledTransactionHashesPacket: %v, data: %x", err, inreq.Data) } @@ -43,7 +48,7 @@ func (tp *TxPoolServer) newPooledTransactionHashes(ctx context.Context, inreq *p } func (tp *TxPoolServer) pooledTransactions(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error { - var query eth.PooledTransactionsPacket66 + var query PooledTransactionsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding PooledTransactionsPacket66: %v, data: %x", err, inreq.Data) } @@ -55,7 +60,7 @@ func (tp *TxPoolServer) transactions(ctx context.Context, inreq *proto_sentry.In if tp.txPool == nil { return nil } - var query eth.TransactionsPacket + var query TransactionsPacket if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding TransactionsPacket: %v, data: %x", err, inreq.Data) } @@ -66,12 +71,12 @@ func (tp *TxPoolServer) getPooledTransactions(ctx context.Context, inreq *proto_ if tp.txPool == nil { return nil } - var query eth.GetPooledTransactionsPacket66 + var query GetPooledTransactionsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding GetPooledTransactionsPacket66: %v, data: %x", err, inreq.Data) } - _, txs := eth.AnswerGetPooledTransactions(tp.txPool, query.GetPooledTransactionsPacket) - b, err := rlp.EncodeToBytes(ð.PooledTransactionsRLPPacket66{ + _, txs := AnswerGetPooledTransactions(tp.txPool, query.GetPooledTransactionsPacket) + b, err := rlp.EncodeToBytes(&PooledTransactionsRLPPacket66{ RequestId: query.RequestId, PooledTransactionsRLPPacket: txs, }) @@ -91,7 +96,7 @@ func (tp *TxPoolServer) getPooledTransactions(ctx context.Context, inreq *proto_ } func (tp *TxPoolServer) SendTxsRequest(ctx context.Context, peerID string, hashes []common.Hash) []byte { - bytes, err := rlp.EncodeToBytes(ð.GetPooledTransactionsPacket66{ + bytes, err := rlp.EncodeToBytes(&GetPooledTransactionsPacket66{ RequestId: rand.Uint64(), //nolint:gosec GetPooledTransactionsPacket: hashes, }) diff --git a/eth/stagedsync/stage_txpool.go b/eth/stagedsync/stage_txpool.go index 5881554d2068de08e451d95428b588d4b80a1945..da7e83fd3bcd5cc1e58606f7f5f5852341f4c2f0 100644 --- a/eth/stagedsync/stage_txpool.go +++ b/eth/stagedsync/stage_txpool.go @@ -15,14 +15,16 @@ import ( ) type TxPoolCfg struct { - db ethdb.RwKV - pool *core.TxPool + db ethdb.RwKV + pool *core.TxPool + startFunc func() } -func StageTxPoolCfg(db ethdb.RwKV, pool *core.TxPool) TxPoolCfg { +func StageTxPoolCfg(db ethdb.RwKV, pool *core.TxPool, startFunc func()) TxPoolCfg { return TxPoolCfg{ - db: db, - pool: pool, + db: db, + pool: pool, + startFunc: startFunc, } } @@ -59,6 +61,9 @@ func SpawnTxPool(s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, quitCh <-chan stru if err := cfg.pool.Start(headHeader.GasLimit, to); err != nil { return fmt.Errorf("%s: start pool phase 1: %w", logPrefix, err) } + if cfg.startFunc != nil { + cfg.startFunc() + } } if cfg.pool != nil && cfg.pool.IsStarted() && s.BlockNumber > 0 { if err := incrementalTxPoolUpdate(logPrefix, s.BlockNumber, to, cfg.pool, tx, quitCh); err != nil { diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index c289c238a3af051929a40ec79b094e6d3adab531..a5c192451d30ff77c231b66d18b43694c4f39bce 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -343,7 +343,7 @@ func DefaultStages() StageBuilders { { ID: stages.TxPool, Build: func(world StageParameters) *Stage { - txPoolCfg := StageTxPoolCfg(world.DB.RwKV(), world.txPool) + txPoolCfg := StageTxPoolCfg(world.DB.RwKV(), world.txPool, nil /* startFunc */) return &Stage{ ID: stages.TxPool, Description: "Update transaction pool",