diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 48a1de2f1081722fd8a5ecae49d4a97fa6b0700f..dc41c631444bf36a4876e4bad1687ad52bae98c1 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -963,7 +963,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) miningSync := stagedsync.New( stagedsync.MiningStages(ctx, - stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, txPool, tmpdir), + stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, txPool, nil, nil, tmpdir), stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, tmpdir), stagedsync.StageHashStateCfg(db, tmpdir), stagedsync.StageTrieCfg(db, false, true, tmpdir), diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index 6428486b46fb5c86b7bb67421f94ef47fc345138..77990cf5dbdd95c614a4584902f08d1bae49486b 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -317,6 +317,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context. *chainConfig, engine, txPool, + nil, nil, tmpDir), quit) if err != nil { diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 28f2b82be0c92ba74b4ad2cd44297d97531b0c8e..bd739c75324b1d10b80405a9a12d44f3b6e38172 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -88,7 +88,7 @@ func RootCommand() (*cobra.Command, *Flags) { rootCmd.PersistentFlags().UintVar(&cfg.RpcBatchConcurrency, "rpc.batch.concurrency", 50, "Does limit amount of goroutines to process 1 batch request. Means 1 bach request can't overload server. 1 batch still can have unlimited amount of request") rootCmd.PersistentFlags().BoolVar(&cfg.TraceCompatibility, "trace.compat", false, "Bug for bug compatibility with OE for trace_ routines") rootCmd.PersistentFlags().BoolVar(&cfg.TxPoolV2, "txpool.v2", false, "experimental external txpool") - rootCmd.PersistentFlags().StringVar(&cfg.TxPoolApiAddr, "txpool.api.addr", "127.0.0.1:9094", "txpool api network address, for example: 127.0.0.1:9094") + rootCmd.PersistentFlags().StringVar(&cfg.TxPoolApiAddr, "txpool.api.addr", "127.0.0.1:9090", "txpool api network address, for example: 127.0.0.1:9090") if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil { panic(err) diff --git a/cmd/rpcdaemon/commands/txpool_api.go b/cmd/rpcdaemon/commands/txpool_api.go index a0002134b25b72922ce0ad51a3807ec5193fadf2..daf90abc2a4055a5918d68120cac3092d7931c03 100644 --- a/cmd/rpcdaemon/commands/txpool_api.go +++ b/cmd/rpcdaemon/commands/txpool_api.go @@ -47,6 +47,7 @@ func (api *TxPoolAPIImpl) Content(ctx context.Context) (map[string]map[string]ma } pending := make(map[common.Address][]types.Transaction, 8) + baseFee := make(map[common.Address][]types.Transaction, 8) queued := make(map[common.Address][]types.Transaction, 8) for i := range reply.Txs { stream := rlp.NewStream(bytes.NewReader(reply.Txs[i].RlpTx), 0) @@ -61,6 +62,11 @@ func (api *TxPoolAPIImpl) Content(ctx context.Context) (map[string]map[string]ma pending[addr] = make([]types.Transaction, 0, 4) } pending[addr] = append(pending[addr], txn) + case proto_txpool.AllReply_BASE_FEE: + if _, ok := baseFee[addr]; !ok { + baseFee[addr] = make([]types.Transaction, 0, 4) + } + baseFee[addr] = append(baseFee[addr], txn) case proto_txpool.AllReply_QUEUED: if _, ok := queued[addr]; !ok { queued[addr] = make([]types.Transaction, 0, 4) @@ -88,6 +94,14 @@ func (api *TxPoolAPIImpl) Content(ctx context.Context) (map[string]map[string]ma } content["pending"][account.Hex()] = dump } + // Flatten the baseFee transactions + for account, txs := range baseFee { + dump := make(map[string]*RPCTransaction) + for _, txn := range txs { + dump[fmt.Sprintf("%d", txn.GetNonce())] = newRPCPendingTransaction(txn, curHeader, cc) + } + content["baseFee"][account.Hex()] = dump + } // Flatten the queued transactions for account, txs := range queued { dump := make(map[string]*RPCTransaction) @@ -107,6 +121,7 @@ func (api *TxPoolAPIImpl) Status(ctx context.Context) (map[string]hexutil.Uint, } return map[string]hexutil.Uint{ "pending": hexutil.Uint(reply.PendingCount), + "baseFee": hexutil.Uint(reply.BaseFeeCount), "queued": hexutil.Uint(reply.QueuedCount), }, nil } diff --git a/cmd/rpcdaemon/commands/txpool_api_test.go b/cmd/rpcdaemon/commands/txpool_api_test.go index a5db23e4c5085424c5b82e94381b252d34ccb2d5..4162bb548359519c77ec1906d0357def1f3cf649 100644 --- a/cmd/rpcdaemon/commands/txpool_api_test.go +++ b/cmd/rpcdaemon/commands/txpool_api_test.go @@ -83,7 +83,7 @@ func TestTxPoolContent(t *testing.T) { status, err := api.Status(ctx) require.NoError(err) - require.Len(status, 2) + require.Len(status, 3) require.Equal(status["pending"], hexutil.Uint(1)) require.Equal(status["queued"], hexutil.Uint(0)) } diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go index ca5df26209d1f2d66d350b53fb752bc153e2f3c3..7fc59661c12c6d1acd97c31d2bd8ccfa37e6b912 100644 --- a/cmd/txpool/main.go +++ b/cmd/txpool/main.go @@ -4,16 +4,16 @@ import ( "fmt" "os" "path" + "time" "github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/remotedb" "github.com/ledgerwatch/erigon-lib/txpool" + "github.com/ledgerwatch/erigon-lib/txpool/txpooluitl" "github.com/ledgerwatch/erigon/cmd/utils" "github.com/ledgerwatch/erigon/common/paths" "github.com/ledgerwatch/erigon/ethdb/remotedbserver" @@ -58,6 +58,7 @@ var rootCmd = &cobra.Command{ debug.Exit() }, RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() creds, err := grpcutil.TLS(TLSCACert, TLSCertfile, TLSKeyFile) if err != nil { return fmt.Errorf("could not connect to remoteKv: %w", err) @@ -74,13 +75,8 @@ var rootCmd = &cobra.Command{ } log.Info("TxPool started", "db", path.Join(datadir, "txpool")) - txPoolDB, err := mdbx.NewMDBX(log.New()).Path(path.Join(datadir, "txpool")).WithTablessCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).Open() - if err != nil { - return err - } - sentryClients := make([]txpool.SentryClient, len(sentryAddr)) - sentryClientsCasted := make([]proto_sentry.SentryClient, len(sentryAddr)) + sentryClients := make([]direct.SentryClient, len(sentryAddr)) for i := range sentryAddr { creds, err := grpcutil.TLS(TLSCACert, TLSCertfile, TLSKeyFile) if err != nil { @@ -92,21 +88,22 @@ var rootCmd = &cobra.Command{ } sentryClients[i] = direct.NewSentryClientRemote(proto_sentry.NewSentryClient(sentryConn)) - sentryClientsCasted[i] = proto_sentry.SentryClient(sentryClients[i]) } + cfg := txpool.DefaultConfig + cfg.DBDir = path.Join(datadir, "txpool") + cfg.LogEvery = 5 * time.Minute + cfg.CommitEvery = 5 * time.Minute + newTxs := make(chan txpool.Hashes, 1024) - txPool, err := txpool.New(newTxs, txPoolDB, coreDB, txpool.DefaultConfig) + defer close(newTxs) + txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpooluitl.AllComponents(ctx, cfg, newTxs, coreDB, sentryClients, kvClient) if err != nil { return err } + fetch.ConnectCore() + fetch.ConnectSentries() - fetcher := txpool.NewFetch(cmd.Context(), sentryClientsCasted, txPool, kvClient, coreDB, txPoolDB) - fetcher.ConnectCore() - fetcher.ConnectSentries() - - send := txpool.NewSend(cmd.Context(), sentryClients, txPool) - txpoolGrpcServer := txpool.NewGrpcServer(cmd.Context(), txPool, txPoolDB) /* var ethashApi *ethash.API if casted, ok := backend.engine.(*ethash.Ethash); ok { @@ -120,7 +117,8 @@ var rootCmd = &cobra.Command{ return err } - txpool.MainLoop(cmd.Context(), txPoolDB, coreDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams) + notifyMiner := func() {} + txpool.MainLoop(cmd.Context(), txPoolDB, coreDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner) grpcServer.GracefulStop() return nil diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 0aaecb58f3b7490bd05b24abe383314c876653bc..e8a8f03a67bd8e2c6f89a38bdc69926d8566eec7 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -918,7 +918,7 @@ func setGPOCobra(f *pflag.FlagSet, cfg *gasprice.Config) { func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolV2Flag.Name) { - cfg.Disable = true + cfg.V2 = true } if ctx.GlobalIsSet(TxPoolLocalsFlag.Name) { locals := strings.Split(ctx.GlobalString(TxPoolLocalsFlag.Name), ",") diff --git a/core/tx_pool.go b/core/tx_pool.go index 2803a78f3a0c98e1707cc419919f11ee23cae0bd..4d861f4df31f36a62f22af288a4c0f3bb3b07ee2 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -134,6 +134,7 @@ const ( // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { Disable bool + V2 bool Locals []common.Address // Addresses that should be treated by default as local NoLocals bool // Whether local transaction handling should be disabled Journal string // Journal of local transactions to survive node restarts diff --git a/eth/backend.go b/eth/backend.go index 1c876ad6bebb9f5847ca9f6abcba6760fb2a7d16..203f33d1a05ab49eafed74f472e9b0af085d429e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -35,6 +35,8 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool" "github.com/ledgerwatch/erigon-lib/kv" + txpool2 "github.com/ledgerwatch/erigon-lib/txpool" + "github.com/ledgerwatch/erigon-lib/txpool/txpooluitl" "github.com/ledgerwatch/erigon/cmd/sentry/download" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" @@ -83,7 +85,7 @@ type Ethereum struct { txPool *core.TxPool // DB interfaces - chainKV kv.RwDB + chainDB kv.RwDB privateAPI *grpc.Server engine consensus.Engine @@ -116,6 +118,14 @@ type Ethereum struct { waitForStageLoopStop chan struct{} waitForMiningStop chan struct{} + + txPool2DB kv.RwDB + txPool2 *txpool2.TxPool + newTxs2 chan txpool2.Hashes + txPool2Fetch *txpool2.Fetch + txPool2Send *txpool2.Send + txPool2GrpcServer *txpool2.GrpcServer + notifyMiningAboutNewTxs chan struct{} } // New creates a new Ethereum object (including the @@ -201,7 +211,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere downloadCancel: ctxCancel, config: config, logger: logger, - chainKV: chainKv, + chainDB: chainKv, networkID: config.NetworkID, etherbase: config.Miner.Etherbase, torrentClient: torrentClient, @@ -281,40 +291,6 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere } } - var ethashApi *ethash.API - if casted, ok := backend.engine.(*ethash.Ethash); ok { - ethashApi = casted.APIs(nil)[1].Service.(*ethash.API) - } - - ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.notifications.Events) - var txPoolRPC txpool_proto.TxpoolServer - var miningRPC txpool_proto.MiningServer - if !config.TxPool.Disable { - txPoolRPC = privateapi.NewTxPoolServer(ctx, backend.txPool) - miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) - } - if stack.Config().PrivateApiAddr != "" { - var creds credentials.TransportCredentials - if stack.Config().TLSConnection { - creds, err = grpcutil.TLS(stack.Config().TLSCACert, stack.Config().TLSCertFile, stack.Config().TLSKeyFile) - if err != nil { - return nil, err - } - } - backend.privateAPI, err = privateapi.StartGrpc( - kvRPC, - ethBackendRPC, - txPoolRPC, - miningRPC, - stack.Config().PrivateApiAddr, - stack.Config().PrivateApiRateLimit, - creds) - if err != nil { - return nil, err - } - - } - if len(stack.Config().P2P.SentryAddr) > 0 { for _, addr := range stack.Config().P2P.SentryAddr { sentryClient, err := download.GrpcSentryClient(backend.downloadCtx, addr) @@ -326,7 +302,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere } else { var readNodeInfo = func() *eth.NodeInfo { var res *eth.NodeInfo - _ = backend.chainKV.View(context.Background(), func(tx kv.Tx) error { + _ = backend.chainDB.View(context.Background(), func(tx kv.Tx) error { res = eth.ReadNodeInfo(tx, backend.chainConfig, backend.genesisHash, backend.networkID) return nil }) @@ -397,7 +373,23 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere } } - if !config.TxPool.Disable { + var txPoolRPC txpool_proto.TxpoolServer + var miningRPC txpool_proto.MiningServer + if config.TxPool.V2 { + cfg := txpool2.DefaultConfig + cfg.DBDir = path.Join(stack.Config().DataDir, "txpool") + cfg.LogEvery = 5 * time.Minute + cfg.CommitEvery = 5 * time.Minute + + stateDiffClient := direct.NewStateDiffClientDirect(kvRPC) + backend.newTxs2 = make(chan txpool2.Hashes, 1024) + //defer close(newTxs) + backend.txPool2DB, backend.txPool2, backend.txPool2Fetch, backend.txPool2Send, backend.txPool2GrpcServer, err = txpooluitl.AllComponents(ctx, cfg, backend.newTxs2, backend.chainDB, backend.sentries, stateDiffClient) + if err != nil { + return nil, err + } + txPoolRPC = backend.txPool2GrpcServer + } else { backend.txPoolP2PServer, err = txpool.NewP2PServer(backend.downloadCtx, backend.sentries, backend.txPool) if err != nil { return nil, err @@ -409,58 +401,115 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere } backend.txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(backend.txPool.Has, backend.txPool.AddRemotes, fetchTx) + txPoolRPC = privateapi.NewTxPoolServer(ctx, backend.txPool) + } - backend.quitMining = make(chan struct{}) - backend.miningSealingQuit = make(chan struct{}) - backend.pendingBlocks = make(chan *types.Block, 1) - backend.minedBlocks = make(chan *types.Block, 1) + backend.notifyMiningAboutNewTxs = make(chan struct{}, 1) + backend.quitMining = make(chan struct{}) + backend.miningSealingQuit = make(chan struct{}) + backend.pendingBlocks = make(chan *types.Block, 1) + backend.minedBlocks = make(chan *types.Block, 1) - miner := stagedsync.NewMiningState(&config.Miner) - backend.pendingBlocks = miner.PendingResultCh - backend.minedBlocks = miner.MiningResultCh + miner := stagedsync.NewMiningState(&config.Miner) + backend.pendingBlocks = miner.PendingResultCh + backend.minedBlocks = miner.MiningResultCh - mining := stagedsync.New( - stagedsync.MiningStages(backend.downloadCtx, - stagedsync.StageMiningCreateBlockCfg(backend.chainKV, miner, *backend.chainConfig, backend.engine, backend.txPool, tmpdir), - stagedsync.StageMiningExecCfg(backend.chainKV, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), - stagedsync.StageHashStateCfg(backend.chainKV, tmpdir), - stagedsync.StageTrieCfg(backend.chainKV, false, true, tmpdir), - stagedsync.StageMiningFinishCfg(backend.chainKV, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit), - ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder) + mining := stagedsync.New( + stagedsync.MiningStages(backend.downloadCtx, + stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool, backend.txPool2, backend.txPool2DB, tmpdir), + stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir), + stagedsync.StageHashStateCfg(backend.chainDB, tmpdir), + stagedsync.StageTrieCfg(backend.chainDB, false, true, tmpdir), + stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit), + ), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder) - go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer) + var ethashApi *ethash.API + if casted, ok := backend.engine.(*ethash.Ethash); ok { + ethashApi = casted.APIs(nil)[1].Service.(*ethash.API) + } + + ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.notifications.Events) + miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) + if stack.Config().PrivateApiAddr != "" { + var creds credentials.TransportCredentials + if stack.Config().TLSConnection { + creds, err = grpcutil.TLS(stack.Config().TLSCACert, stack.Config().TLSCertFile, stack.Config().TLSKeyFile) + if err != nil { + return nil, err + } + } + backend.privateAPI, err = privateapi.StartGrpc( + kvRPC, + ethBackendRPC, + txPoolRPC, + miningRPC, + stack.Config().PrivateApiAddr, + stack.Config().PrivateApiRateLimit, + creds) + if err != nil { + return nil, err + } + } + if config.TxPool.V2 { + backend.txPool2Fetch.ConnectCore() + backend.txPool2Fetch.ConnectSentries() + go txpool2.MainLoop(backend.downloadCtx, backend.txPool2DB, backend.chainDB, backend.txPool2, backend.newTxs2, backend.txPool2Send, backend.txPool2GrpcServer.NewSlotsStreams, func() { + select { + case backend.notifyMiningAboutNewTxs <- struct{}{}: + default: + } + }) + } else { + go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer) go func() { - defer debug.LogPanic() + newTransactions := make(chan core.NewTxsEvent, 128) + sub := backend.txPool.SubscribeNewTxsEvent(newTransactions) + defer sub.Unsubscribe() + defer close(newTransactions) for { select { - case b := <-backend.minedBlocks: - //p2p - //backend.downloadServer.BroadcastNewBlock(context.Background(), b, b.Difficulty()) - //rpcdaemon - if err := miningRPC.(*privateapi.MiningServer).BroadcastMinedBlock(b); err != nil { - log.Error("txpool rpc mined block broadcast", "err", err) - } - - case b := <-backend.pendingBlocks: - if err := miningRPC.(*privateapi.MiningServer).BroadcastPendingBlock(b); err != nil { - log.Error("txpool rpc pending block broadcast", "err", err) - } - case <-backend.quitMining: + case <-ctx.Done(): return + case <-newTransactions: + select { + case backend.notifyMiningAboutNewTxs <- struct{}{}: + default: + } } } }() + } + go func() { + defer debug.LogPanic() + for { + select { + case b := <-backend.minedBlocks: + //p2p + //backend.downloadServer.BroadcastNewBlock(context.Background(), b, b.Difficulty()) + //rpcdaemon + if err := miningRPC.(*privateapi.MiningServer).BroadcastMinedBlock(b); err != nil { + log.Error("txpool rpc mined block broadcast", "err", err) + } - if err := backend.StartMining(context.Background(), backend.chainKV, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil { - return nil, err + case b := <-backend.pendingBlocks: + if err := miningRPC.(*privateapi.MiningServer).BroadcastPendingBlock(b); err != nil { + log.Error("txpool rpc pending block broadcast", "err", err) + } + case <-backend.quitMining: + return + } } + }() + + if err := backend.StartMining(context.Background(), backend.chainDB, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil { + return nil, err } backend.stagedSync, err = stages2.NewStagedSync( backend.downloadCtx, backend.logger, - backend.chainKV, + backend.chainDB, stack.Config().P2P, *config, backend.downloadServer, @@ -543,7 +592,7 @@ func (s *Ethereum) shouldPreserve(block *types.Block) bool { //nolint // StartMining starts the miner with the given number of CPU threads. If mining // is already running, this method adjust the number of threads allowed to use // and updates the minimum price required by the transaction pool. -func (s *Ethereum) StartMining(ctx context.Context, kv kv.RwDB, mining *stagedsync.Sync, cfg params.MiningConfig, gasPrice *uint256.Int, quitCh chan struct{}) error { +func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsync.Sync, cfg params.MiningConfig, gasPrice *uint256.Int, quitCh chan struct{}) error { if !cfg.Enabled { return nil } @@ -566,8 +615,8 @@ func (s *Ethereum) StartMining(ctx context.Context, kv kv.RwDB, mining *stagedsy }) } - if s.chainConfig.ChainID.Uint64() != params.MainnetChainConfig.ChainID.Uint64() { - tx, err := kv.BeginRo(context.Background()) + if s.chainConfig.ChainID.Uint64() != params.MainnetChainConfig.ChainID.Uint64() && !s.config.TxPool.Disable { + tx, err := db.BeginRo(context.Background()) if err != nil { return err } @@ -576,8 +625,20 @@ func (s *Ethereum) StartMining(ctx context.Context, kv kv.RwDB, mining *stagedsy hh := rawdb.ReadCurrentHeader(tx) tx.Rollback() if hh != nil { - if err := s.txPool.Start(hh.GasLimit, execution); err != nil { - return err + if s.config.TxPool.V2 { + if err := s.txPool2DB.View(context.Background(), func(tx kv.Tx) error { + var baseFee uint64 + if hh.BaseFee != nil { + baseFee = hh.BaseFee.Uint64() + } + return s.txPool2.OnNewBlock(tx, nil, txpool2.TxSlots{}, txpool2.TxSlots{}, baseFee, hh.Number.Uint64(), hh.Hash()) + }); err != nil { + return err + } + } else { + if err := s.txPool.Start(hh.GasLimit, execution); err != nil { + return err + } } } } @@ -595,10 +656,6 @@ func (s *Ethereum) StartMining(ctx context.Context, kv kv.RwDB, mining *stagedsy go func() { defer debug.LogPanic() defer close(s.waitForMiningStop) - newTransactions := make(chan core.NewTxsEvent, 128) - sub := s.txPool.SubscribeNewTxsEvent(newTransactions) - defer sub.Unsubscribe() - defer close(newTransactions) var works bool var hasWork bool @@ -606,7 +663,7 @@ func (s *Ethereum) StartMining(ctx context.Context, kv kv.RwDB, mining *stagedsy for { select { - case <-newTransactions: + case <-s.notifyMiningAboutNewTxs: hasWork = true case err := <-errc: works = false @@ -617,15 +674,13 @@ func (s *Ethereum) StartMining(ctx context.Context, kv kv.RwDB, mining *stagedsy if err != nil { log.Warn("mining", "err", err) } - case <-sub.Err(): - return case <-quitCh: return } if !works && hasWork { works = true - go func() { errc <- stages2.MiningStep(ctx, kv, mining) }() + go func() { errc <- stages2.MiningStep(ctx, db, mining) }() } } }() @@ -635,8 +690,7 @@ func (s *Ethereum) StartMining(ctx context.Context, kv kv.RwDB, mining *stagedsy func (s *Ethereum) IsMining() bool { return s.config.Miner.Enabled } -func (s *Ethereum) TxPool() *core.TxPool { return s.txPool } -func (s *Ethereum) ChainKV() kv.RwDB { return s.chainKV } +func (s *Ethereum) ChainKV() kv.RwDB { return s.chainDB } func (s *Ethereum) NetVersion() (uint64, error) { return s.networkID, nil } func (s *Ethereum) NetPeerCount() (uint64, error) { var sentryPc uint64 = 0 @@ -680,7 +734,7 @@ func (s *Ethereum) Start() error { }(i) } - go stages2.StageLoop(s.downloadCtx, s.chainKV, s.stagedSync, s.downloadServer.Hd, s.notifications, s.downloadServer.UpdateHead, s.waitForStageLoopStop, s.config.SyncLoopThrottle) + go stages2.StageLoop(s.downloadCtx, s.chainDB, s.stagedSync, s.downloadServer.Hd, s.notifications, s.downloadServer.UpdateHead, s.waitForStageLoopStop, s.config.SyncLoopThrottle) return nil } @@ -721,6 +775,9 @@ func (s *Ethereum) Stop() error { for _, sentryServer := range s.sentryServers { sentryServer.Close() } - s.chainKV.Close() + s.chainDB.Close() + if s.config.TxPool.V2 { + s.txPool2DB.Close() + } return nil } diff --git a/eth/stagedsync/stage_mining_create_block.go b/eth/stagedsync/stage_mining_create_block.go index 8832a711e065bb08325ca290f316160678b86480..37d315a32efd1ced076a4e69a380be8636f4a65d 100644 --- a/eth/stagedsync/stage_mining_create_block.go +++ b/eth/stagedsync/stage_mining_create_block.go @@ -2,6 +2,7 @@ package stagedsync import ( "bytes" + "context" "errors" "fmt" "math/big" @@ -9,6 +10,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/txpool" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/consensus" @@ -52,6 +54,8 @@ type MiningCreateBlockCfg struct { chainConfig params.ChainConfig engine consensus.Engine txPool *core.TxPool + txPool2 *txpool.TxPool + txPool2DB kv.RoDB tmpdir string } @@ -61,6 +65,8 @@ func StageMiningCreateBlockCfg( chainConfig params.ChainConfig, engine consensus.Engine, txPool *core.TxPool, + txPool2 *txpool.TxPool, + txPool2DB kv.RoDB, tmpdir string, ) MiningCreateBlockCfg { return MiningCreateBlockCfg{ @@ -69,6 +75,8 @@ func StageMiningCreateBlockCfg( chainConfig: chainConfig, engine: engine, txPool: txPool, + txPool2: txPool2, + txPool2DB: txPool2DB, tmpdir: tmpdir, } } @@ -76,14 +84,9 @@ func StageMiningCreateBlockCfg( // SpawnMiningCreateBlockStage //TODO: // - resubmitAdjustCh - variable is not implemented -func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBlockCfg, quit <-chan struct{}) error { - txPoolLocals := cfg.txPool.Locals() - pendingTxs, err := cfg.txPool.Pending() - if err != nil { - return err - } - +func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBlockCfg, quit <-chan struct{}) (err error) { current := cfg.miner.MiningBlock + txPoolLocals := cfg.txPool.Locals() coinbase := cfg.miner.MiningConfig.Etherbase const ( @@ -107,8 +110,62 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc log.Info(fmt.Sprintf("[%s] Start mine", logPrefix), "block", executionAt+1) blockNum := executionAt + 1 - signer := types.MakeSigner(&cfg.chainConfig, blockNum) + if cfg.txPool2 != nil { + txSlots := txpool.TxsRlp{} + if err = cfg.txPool2DB.View(context.Background(), func(tx kv.Tx) error { + if err := cfg.txPool2.Best(200, &txSlots, tx); err != nil { + return err + } + for i := 0; i < len(txSlots.Txs); i++ { + txSlots.Txs[i] = common.CopyBytes(txSlots.Txs[i]) // because we need this data outside of tx + } + return nil + }); err != nil { + return err + } + txs, err := types.UnmarshalTransactionsFromBinary(txSlots.Txs) + if err != nil { + return fmt.Errorf("decode rlp of pending txs: %w", err) + } + var sender common.Address + for i := range txs { + copy(sender[:], txSlots.Senders.At(i)) + txs[i].SetSender(sender) + } + current.RemoteTxs = types.NewTransactionsFixedOrder(txs) + // txpool v2 - doesn't prioritise local txs over remote + current.LocalTxs = types.NewTransactionsFixedOrder(nil) + } else { + pendingTxs, err := cfg.txPool.Pending() + if err != nil { + return err + } + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := types.TransactionsGroupedBySender{}, types.TransactionsGroupedBySender{} + signer := types.MakeSigner(&cfg.chainConfig, blockNum) + for _, txs := range pendingTxs { + if len(txs) == 0 { + continue + } + from, _ := txs[0].Sender(*signer) + isLocal := false + for _, local := range txPoolLocals { + if local == from { + isLocal = true + break + } + } + if isLocal { + localTxs = append(localTxs, txs) + } else { + remoteTxs = append(remoteTxs, txs) + } + } + + current.LocalTxs = types.NewTransactionsByPriceAndNonce(*signer, localTxs) + current.RemoteTxs = types.NewTransactionsByPriceAndNonce(*signer, remoteTxs) + } localUncles, remoteUncles, err := readNonCanonicalHeaders(tx, blockNum, cfg.engine, coinbase, txPoolLocals) if err != nil { return err @@ -261,31 +318,6 @@ func SpawnMiningCreateBlockStage(s *StageState, tx kv.RwTx, cfg MiningCreateBloc current.Header = header current.Uncles = makeUncles(env.uncles) - - // Split the pending transactions into locals and remotes - localTxs, remoteTxs := types.TransactionsGroupedBySender{}, types.TransactionsGroupedBySender{} - for _, txs := range pendingTxs { - if len(txs) == 0 { - continue - } - from, _ := txs[0].Sender(*signer) - isLocal := false - for _, local := range txPoolLocals { - if local == from { - isLocal = true - break - } - } - - if isLocal { - localTxs = append(localTxs, txs) - } else { - remoteTxs = append(remoteTxs, txs) - } - } - - current.LocalTxs = types.NewTransactionsByPriceAndNonce(*signer, localTxs) - current.RemoteTxs = types.NewTransactionsByPriceAndNonce(*signer, remoteTxs) return nil } diff --git a/eth/stagedsync/stage_mining_finish.go b/eth/stagedsync/stage_mining_finish.go index 385437f4ef6448ac0019cfdf2f89848ad462f357..a24b8136d4ff72b721848318712c579162770a47 100644 --- a/eth/stagedsync/stage_mining_finish.go +++ b/eth/stagedsync/stage_mining_finish.go @@ -36,6 +36,7 @@ func StageMiningFinishCfg( func SpawnMiningFinishStage(s *StageState, tx kv.RwTx, cfg MiningFinishCfg, quit <-chan struct{}) error { logPrefix := s.LogPrefix() + log.Info(fmt.Sprintf("[%s] start", logPrefix)) current := cfg.miningState.MiningBlock // Short circuit when receiving duplicate result caused by resubmitting. diff --git a/go.mod b/go.mod index 3a81356113b5dee2bfa32fe848e851f7a85f597e..b52d50cd0f4c7b320405e166bfc4df21b19f9311 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/RoaringBitmap/roaring v0.9.4 - github.com/VictoriaMetrics/fastcache v1.5.7 + github.com/VictoriaMetrics/fastcache v1.6.0 github.com/VictoriaMetrics/metrics v1.17.3 github.com/anacrolix/log v0.8.0 github.com/anacrolix/torrent v1.25.1 @@ -37,7 +37,7 @@ require ( github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible github.com/kylelemons/godebug v1.1.0 // indirect - github.com/ledgerwatch/erigon-lib v0.0.0-20210903061352-2c1e2b0ec467 + github.com/ledgerwatch/erigon-lib v0.0.0-20210908053037-c4efc0ea3a6c github.com/ledgerwatch/log/v3 v3.3.0 github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d github.com/logrusorgru/aurora/v3 v3.0.0 diff --git a/go.sum b/go.sum index f725d0c9969031bb0c7906e98e2980e57cb6284c..458193187315a140ea20abb89d39252ded18ce0d 100644 --- a/go.sum +++ b/go.sum @@ -65,8 +65,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= -github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw= -github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8= +github.com/VictoriaMetrics/fastcache v1.6.0 h1:C/3Oi3EiBCqufydp1neRZkqcwmEiuRT9c3fqvvgKm5o= +github.com/VictoriaMetrics/fastcache v1.6.0/go.mod h1:0qHz5QP0GMX4pfmMA/zt5RgfNuXJrTP0zS7DqpHGGTw= github.com/VictoriaMetrics/metrics v1.17.3 h1:QPUakR6JRy8BhL2C2kOgYKLuoPDwtJQ+7iKIZSjt1A4= github.com/VictoriaMetrics/metrics v1.17.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -492,8 +492,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20210903061352-2c1e2b0ec467 h1:OCM+wTC4hV3u8wVBjoC344ekfmpcEq6yfty87uB2ZCM= -github.com/ledgerwatch/erigon-lib v0.0.0-20210903061352-2c1e2b0ec467/go.mod h1:q846JoG0oCWU9xTunmQAysfywjyoUzxx/5tHPo/F0t0= +github.com/ledgerwatch/erigon-lib v0.0.0-20210908053037-c4efc0ea3a6c h1:/GDDVpBRcYJ+8KwRAfg42oGaB6Sluz1oHVgtJFw4nJ4= +github.com/ledgerwatch/erigon-lib v0.0.0-20210908053037-c4efc0ea3a6c/go.mod h1:q846JoG0oCWU9xTunmQAysfywjyoUzxx/5tHPo/F0t0= github.com/ledgerwatch/log/v3 v3.3.0 h1:k8N/3NQLILr8CKCMyza261vLFKU7VA+nMNNb0wVyQSc= github.com/ledgerwatch/log/v3 v3.3.0/go.mod h1:J58eOHHrIYHxl7LKkRsb/0YibKwtLfauUryl5SLRGm0= github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno= @@ -1014,6 +1014,7 @@ golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210326220804-49726bf1d181/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index cad94ac4b8905a0d8e2141f095c237855e888b8e..63c9613489e528032a19259f01c29b84a0b6de9f 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -322,7 +322,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.MiningSync = stagedsync.New( stagedsync.MiningStages(mock.Ctx, - stagedsync.StageMiningCreateBlockCfg(mock.DB, miner, *mock.ChainConfig, mock.Engine, txPool, mock.tmpdir), + stagedsync.StageMiningCreateBlockCfg(mock.DB, miner, *mock.ChainConfig, mock.Engine, txPool, nil, nil, mock.tmpdir), stagedsync.StageMiningExecCfg(mock.DB, miner, nil, *mock.ChainConfig, mock.Engine, &vm.Config{}, mock.tmpdir), stagedsync.StageHashStateCfg(mock.DB, mock.tmpdir), stagedsync.StageTrieCfg(mock.DB, false, true, mock.tmpdir), diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 110b2269fc35b688eb83de475741f612320b4921..c05fe92aa520d79abc0ad394d7349e37b1eb86c1 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -262,15 +262,18 @@ func NewStagedSync( stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir), stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir), stagedsync.StageTxPoolCfg(db, txPool, cfg.TxPool, func() { - for i := range txPoolServer.Sentries { - go func(i int) { - txpool.RecvTxMessageLoop(ctx, txPoolServer.Sentries[i], txPoolServer.HandleInboundMessage, nil) - }(i) - go func(i int) { - txpool.RecvPeersLoop(ctx, txPoolServer.Sentries[i], txPoolServer.RecentPeers, nil) - }(i) + if cfg.TxPool.V2 { + } else { + for i := range txPoolServer.Sentries { + go func(i int) { + txpool.RecvTxMessageLoop(ctx, txPoolServer.Sentries[i], txPoolServer.HandleInboundMessage, nil) + }(i) + go func(i int) { + txpool.RecvPeersLoop(ctx, txPoolServer.Sentries[i], txPoolServer.RecentPeers, nil) + }(i) + } + txPoolServer.TxFetcher.Start() } - txPoolServer.TxFetcher.Start() }), stagedsync.StageFinishCfg(db, tmpdir, client, snapshotMigrator, logger), false, /* test */