diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go
index 2bf738fb960fa99b547cbc6fb97f1dd740d39390..cf7350f47a6d30afc42b767cc5e3f943ced604d0 100644
--- a/cmd/integration/commands/stages.go
+++ b/cmd/integration/commands/stages.go
@@ -940,6 +940,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
 	cfg := ethconfig.Defaults
 	cfg.Prune = pm
 	cfg.BatchSize = batchSize
+	cfg.TxPool.Disable = true
 	if miningConfig != nil {
 		cfg.Miner = *miningConfig
 	}
diff --git a/cmd/sentry/download/sentry.go b/cmd/sentry/download/sentry.go
index 110cc9d936617aab0dee47b374d573c168ed66e5..f824df48496d5ea86a1c017f16937432fce59eb3 100644
--- a/cmd/sentry/download/sentry.go
+++ b/cmd/sentry/download/sentry.go
@@ -17,8 +17,7 @@ import (
 	"time"
 
 	"github.com/golang/protobuf/ptypes/empty"
-	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
-	grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
+	"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
 	"github.com/ledgerwatch/log/v3"
 	"google.golang.org/protobuf/types/known/emptypb"
 
@@ -37,7 +36,6 @@ import (
 	"github.com/ledgerwatch/erigon/params"
 	"github.com/ledgerwatch/erigon/rlp"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/keepalive"
 )
 
 const (
@@ -430,39 +428,8 @@ func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerIm
 	if err != nil {
 		return nil, fmt.Errorf("could not create Sentry P2P listener: %w, addr=%s", err, sentryAddr)
 	}
-	var (
-		streamInterceptors []grpc.StreamServerInterceptor
-		unaryInterceptors  []grpc.UnaryServerInterceptor
-	)
-	streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
-	unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
-	//if metrics.Enabled {
-	//	streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
-	//	unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
-	//}
-
-	var grpcServer *grpc.Server
-	//cpus := uint32(runtime.GOMAXPROCS(-1))
-	opts := []grpc.ServerOption{
-		//grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
-		grpc.WriteBufferSize(1024), // reduce buffers to save mem
-		grpc.ReadBufferSize(1024),
-		grpc.MaxConcurrentStreams(100), // to force clients reduce concurrency level
-		// Don't drop the connection, settings accordign to this comment on GitHub
-		// https://github.com/grpc/grpc-go/issues/3171#issuecomment-552796779
-		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
-			MinTime:             10 * time.Second,
-			PermitWithoutStream: true,
-		}),
-		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
-		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
-	}
-	grpcServer = grpc.NewServer(opts...)
-
+	grpcServer := grpcutil.NewServer(100, nil)
 	proto_sentry.RegisterSentryServer(grpcServer, ss)
-	//if metrics.Enabled {
-	//	grpc_prometheus.Register(grpcServer)
-	//}
 	go func() {
 		if err1 := grpcServer.Serve(lis); err1 != nil {
 			log.Error("Sentry P2P server fail", "err", err1)
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 91430a52028a140fc5324205b80e64a6eb37de9f..881c4db001e773454d1b944f1929f142fec0be46 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -146,6 +146,10 @@ var (
 		Usage: "Lock memory maps for recent ethash mining DAGs",
 	}
 	// Transaction pool settings
+	TxPoolV2Flag = cli.BoolFlag{
+		Name:  "txpool.v2",
+		Usage: "Experimental external pool and block producer, see ./cmd/txpool/readme.md for more info. Disables internal txpool and block producer.",
+	}
 	TxPoolLocalsFlag = cli.StringFlag{
 		Name:  "txpool.locals",
 		Usage: "Comma separated accounts to treat as locals (no flush, priority inclusion)",
@@ -917,6 +921,9 @@ func setGPOCobra(f *pflag.FlagSet, cfg *gasprice.Config) {
 }
 
 func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
+	if ctx.GlobalIsSet(TxPoolV2Flag.Name) {
+		cfg.Disable = true
+	}
 	if ctx.GlobalIsSet(TxPoolLocalsFlag.Name) {
 		locals := strings.Split(ctx.GlobalString(TxPoolLocalsFlag.Name), ",")
 		for _, account := range locals {
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 1538f65c56c3037b4144a9e23ced19adbeb18ed9..2803a78f3a0c98e1707cc419919f11ee23cae0bd 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -133,6 +133,7 @@ const (
 
 // TxPoolConfig are the configuration parameters of the transaction pool.
 type TxPoolConfig struct {
+	Disable  bool
 	Locals   []common.Address // Addresses that should be treated by default as local
 	NoLocals bool             // Whether local transaction handling should be disabled
 	Journal  string           // Journal of local transactions to survive node restarts
diff --git a/eth/backend.go b/eth/backend.go
index 1f0102e5a1ff3381771013ce0b206ae6effe5c00..27914b1ace936ff58b0f5d172280996155ad4fe0 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -19,11 +19,8 @@ package eth
 
 import (
 	"context"
-	"crypto/tls"
-	"crypto/x509"
 	"errors"
 	"fmt"
-	"io/ioutil"
 	"math/big"
 	"os"
 	"path"
@@ -34,7 +31,9 @@ import (
 
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon-lib/direct"
+	"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
 	"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
+	txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon/cmd/sentry/download"
 	"github.com/ledgerwatch/erigon/common"
@@ -268,100 +267,48 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
 		}
 	}
 
-	backend.quitMining = make(chan struct{})
-	backend.miningSealingQuit = make(chan struct{})
-	backend.pendingBlocks = make(chan *types.Block, 1)
-	backend.minedBlocks = make(chan *types.Block, 1)
-
-	miner := stagedsync.NewMiningState(&config.Miner)
-	backend.pendingBlocks = miner.PendingResultCh
-	backend.minedBlocks = miner.MiningResultCh
-
-	mining := stagedsync.New(
-		stagedsync.MiningStages(backend.downloadCtx,
-			stagedsync.StageMiningCreateBlockCfg(backend.chainKV, miner, *backend.chainConfig, backend.engine, backend.txPool, tmpdir),
-			stagedsync.StageMiningExecCfg(backend.chainKV, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir),
-			stagedsync.StageHashStateCfg(backend.chainKV, tmpdir),
-			stagedsync.StageTrieCfg(backend.chainKV, false, true, tmpdir),
-			stagedsync.StageMiningFinishCfg(backend.chainKV, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit),
-		), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder)
-
 	var ethashApi *ethash.API
 	if casted, ok := backend.engine.(*ethash.Ethash); ok {
 		ethashApi = casted.APIs(nil)[1].Service.(*ethash.API)
 	}
 	ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.notifications.Events)
-	txPoolRPC := privateapi.NewTxPoolServer(ctx, backend.txPool)
-	miningRPC := privateapi.NewMiningServer(ctx, backend, ethashApi)
-
+	var txPoolRPC txpool_proto.TxpoolServer
+	var miningRPC txpool_proto.MiningServer
+	if !config.TxPool.Disable {
+		txPoolRPC = privateapi.NewTxPoolServer(ctx, backend.txPool)
+		miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
+	}
 	if stack.Config().PrivateApiAddr != "" {
-
+		var creds *credentials.TransportCredentials
 		if stack.Config().TLSConnection {
-			// load peer cert/key, ca cert
-			var creds credentials.TransportCredentials
-
-			if stack.Config().TLSCACert != "" {
-				var peerCert tls.Certificate
-				var caCert []byte
-				peerCert, err = tls.LoadX509KeyPair(stack.Config().TLSCertFile, stack.Config().TLSKeyFile)
-				if err != nil {
-					log.Error("load peer cert/key error:%v", err)
-					return nil, err
-				}
-				caCert, err = ioutil.ReadFile(stack.Config().TLSCACert)
-				if err != nil {
-					log.Error("read ca cert file error:%v", err)
-					return nil, err
-				}
-				caCertPool := x509.NewCertPool()
-				caCertPool.AppendCertsFromPEM(caCert)
-				creds = credentials.NewTLS(&tls.Config{
-					Certificates: []tls.Certificate{peerCert},
-					ClientCAs:    caCertPool,
-					ClientAuth:   tls.RequireAndVerifyClientCert,
-					MinVersion:   tls.VersionTLS12,
-				})
-			} else {
-				creds, err = credentials.NewServerTLSFromFile(stack.Config().TLSCertFile, stack.Config().TLSKeyFile)
-			}
-
-			if err != nil {
-				return nil, err
-			}
-			backend.privateAPI, err = privateapi.StartGrpc(
-				kvRPC,
-				ethBackendRPC,
-				txPoolRPC,
-				miningRPC,
-				stack.Config().PrivateApiAddr,
-				stack.Config().PrivateApiRateLimit,
-				&creds)
-			if err != nil {
-				return nil, err
-			}
-		} else {
-			backend.privateAPI, err = privateapi.StartGrpc(
-				kvRPC,
-				ethBackendRPC,
-				txPoolRPC,
-				miningRPC,
-				stack.Config().PrivateApiAddr,
-				stack.Config().PrivateApiRateLimit,
-				nil)
+			tlsCreds, err := grpcutil.TLS(stack.Config().TLSCACert, stack.Config().TLSCertFile, stack.Config().TLSKeyFile)
 			if err != nil {
 				return nil, err
 			}
+			creds = &tlsCreds
+		}
+		backend.privateAPI, err = privateapi.StartGrpc(
+			kvRPC,
+			ethBackendRPC,
+			txPoolRPC,
+			miningRPC,
+			stack.Config().PrivateApiAddr,
+			stack.Config().PrivateApiRateLimit,
+			creds)
+		if err != nil {
+			return nil, err
 		}
+
 	}
 
 	if len(stack.Config().P2P.SentryAddr) > 0 {
 		for _, addr := range stack.Config().P2P.SentryAddr {
-			sentry, err := download.GrpcSentryClient(backend.downloadCtx, addr)
+			sentryClient, err := download.GrpcSentryClient(backend.downloadCtx, addr)
 			if err != nil {
 				return nil, err
 			}
-			backend.sentries = append(backend.sentries, sentry)
+			backend.sentries = append(backend.sentries, sentryClient)
 		}
 	} else {
 		var readNodeInfo = func() *eth.NodeInfo {
@@ -418,36 +365,8 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
 	if err != nil {
 		return nil, err
 	}
-	backend.txPoolP2PServer, err = txpool.NewP2PServer(backend.downloadCtx, backend.sentries, backend.txPool)
-	if err != nil {
-		return nil, err
-	}
-
-	fetchTx := func(peerID string, hashes []common.Hash) error {
-		backend.txPoolP2PServer.SendTxsRequest(context.TODO(), peerID, hashes)
-		return nil
-	}
-
-	backend.txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(backend.txPool.Has, backend.txPool.AddRemotes, fetchTx)
 
 	config.BodyDownloadTimeoutSeconds = 30
-	backend.stagedSync, err = stages2.NewStagedSync(
-		backend.downloadCtx,
-		backend.logger,
-		backend.chainKV,
-		stack.Config().P2P,
-		*config,
-		backend.downloadServer,
-		tmpdir,
-		backend.txPool,
-		backend.txPoolP2PServer,
-
-		torrentClient, mg, backend.notifications.Accumulator,
-	)
-	if err != nil {
-		return nil, err
-	}
-
 	emptyBadHash := config.BadBlockHash == common.Hash{}
 	if !emptyBadHash {
 		var badBlockHeader *types.Header
@@ -465,31 +384,80 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
 		}
 	}
 
-	go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer)
+	if !config.TxPool.Disable {
+		backend.txPoolP2PServer, err = txpool.NewP2PServer(backend.downloadCtx, backend.sentries, backend.txPool)
+		if err != nil {
+			return nil, err
+		}
 
-	go func() {
-		defer debug.LogPanic()
-		for {
-			select {
-			case b := <-backend.minedBlocks:
-				//p2p
-				//backend.downloadServer.BroadcastNewBlock(context.Background(), b, b.Difficulty())
-				//rpcdaemon
-				if err := miningRPC.BroadcastMinedBlock(b); err != nil {
-					log.Error("txpool rpc mined block broadcast", "err", err)
-				}
+		fetchTx := func(peerID string, hashes []common.Hash) error {
+			backend.txPoolP2PServer.SendTxsRequest(context.TODO(), peerID, hashes)
+			return nil
+		}
+
+		backend.txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(backend.txPool.Has, backend.txPool.AddRemotes, fetchTx)
+
+		backend.quitMining = make(chan struct{})
+		backend.miningSealingQuit = make(chan struct{})
+		backend.pendingBlocks = make(chan *types.Block, 1)
+		backend.minedBlocks = make(chan *types.Block, 1)
+
+		miner := stagedsync.NewMiningState(&config.Miner)
+		backend.pendingBlocks = miner.PendingResultCh
+		backend.minedBlocks = miner.MiningResultCh
 
-			case b := <-backend.pendingBlocks:
-				if err := miningRPC.BroadcastPendingBlock(b); err != nil {
-					log.Error("txpool rpc pending block broadcast", "err", err)
+		mining := stagedsync.New(
+			stagedsync.MiningStages(backend.downloadCtx,
+				stagedsync.StageMiningCreateBlockCfg(backend.chainKV, miner, *backend.chainConfig, backend.engine, backend.txPool, tmpdir),
+				stagedsync.StageMiningExecCfg(backend.chainKV, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir),
+				stagedsync.StageHashStateCfg(backend.chainKV, tmpdir),
+				stagedsync.StageTrieCfg(backend.chainKV, false, true, tmpdir),
+				stagedsync.StageMiningFinishCfg(backend.chainKV, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit),
+			), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder)
+
+		go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer)
+
+		go func() {
+			defer debug.LogPanic()
+			for {
+				select {
+				case b := <-backend.minedBlocks:
+					//p2p
+					//backend.downloadServer.BroadcastNewBlock(context.Background(), b, b.Difficulty())
+					//rpcdaemon
+					if err := miningRPC.(*privateapi.MiningServer).BroadcastMinedBlock(b); err != nil {
+						log.Error("txpool rpc mined block broadcast", "err", err)
+					}
+
+				case b := <-backend.pendingBlocks:
+					if err := miningRPC.(*privateapi.MiningServer).BroadcastPendingBlock(b); err != nil {
+						log.Error("txpool rpc pending block broadcast", "err", err)
+					}
+				case <-backend.quitMining:
+					return
 				}
-			case <-backend.quitMining:
-				return
 			}
+		}()
+
+		if err := backend.StartMining(context.Background(), backend.chainKV, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil {
+			return nil, err
 		}
-	}()
+	}
+
+	backend.stagedSync, err = stages2.NewStagedSync(
+		backend.downloadCtx,
+		backend.logger,
+		backend.chainKV,
+		stack.Config().P2P,
+		*config,
+		backend.downloadServer,
+		tmpdir,
+		backend.txPool,
+		backend.txPoolP2PServer,
 
-	if err := backend.StartMining(context.Background(), backend.chainKV, mining, backend.config.Miner, backend.gasPrice, backend.quitMining); err != nil {
+		torrentClient, mg, backend.notifications.Accumulator,
+	)
+	if err != nil {
 		return nil, err
 	}
 
@@ -700,7 +668,9 @@ func (s *Ethereum) Start() error {
 func (s *Ethereum) Stop() error {
 	// Stop all the peer-related stuff first.
 	s.downloadCancel()
-	s.txPoolP2PServer.TxFetcher.Stop()
+	if s.txPoolP2PServer != nil {
+		s.txPoolP2PServer.TxFetcher.Stop()
+	}
 	if s.privateAPI != nil {
 		shutdownDone := make(chan bool)
 		go func() {
@@ -713,16 +683,15 @@ func (s *Ethereum) Stop() error {
 		case <-shutdownDone:
 		}
 	}
-	s.txPool.Stop()
+	if s.txPool != nil {
+		s.txPool.Stop()
+	}
 	if s.quitMining != nil {
 		close(s.quitMining)
 	}
 
 	//s.miner.Stop()
 	s.engine.Close()
-	if s.txPool != nil {
-		s.txPool.Stop()
-	}
 	<-s.waitForStageLoopStop
 	if s.config.Miner.Enabled {
 		<-s.waitForMiningStop
diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go
index a4d327231964938cdb5858b64238a61762fec2c4..87719322f27ff9cffc7b36e5a9b47fad4f49e637 100644
--- a/eth/stagedsync/default_stages.go
+++ b/eth/stagedsync/default_stages.go
@@ -254,6 +254,7 @@ func DefaultStages(ctx context.Context,
 		{
 			ID:          stages.TxPool,
 			Description: "Update transaction pool",
+			Disabled:    txPool.config.Disable,
 			Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, _ Unwinder, tx kv.RwTx) error {
 				return SpawnTxPool(s, tx, txPool, ctx)
 			},
diff --git a/eth/stagedsync/stage_txpool.go b/eth/stagedsync/stage_txpool.go
index f99e810cd0d3e9306bc496f540acde6c7e22df55..a45cc9bfa96e7bcde09ca1d6f512dead2cb92efa 100644
--- a/eth/stagedsync/stage_txpool.go
+++ b/eth/stagedsync/stage_txpool.go
@@ -17,13 +17,15 @@ import (
 type TxPoolCfg struct {
 	db        kv.RwDB
 	pool      *core.TxPool
+	config    core.TxPoolConfig
 	startFunc func()
 }
 
-func StageTxPoolCfg(db kv.RwDB, pool *core.TxPool, startFunc func()) TxPoolCfg {
+func StageTxPoolCfg(db kv.RwDB, pool *core.TxPool, config core.TxPoolConfig, startFunc func()) TxPoolCfg {
 	return TxPoolCfg{
 		db:        db,
 		pool:      pool,
+		config:    config,
 		startFunc: startFunc,
 	}
 }
diff --git a/ethdb/privateapi/all.go b/ethdb/privateapi/all.go
index 17641fbed6f949ab3eeb2306739b71bd650014b4..d3f4c2f111856fafa07201719efd840d50a1dea6 100644
--- a/ethdb/privateapi/all.go
+++ b/ethdb/privateapi/all.go
@@ -3,10 +3,8 @@ package privateapi
 import (
 	"fmt"
 	"net"
-	"time"
 
-	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
-	grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
+	"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
 	//grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
 	txpool_proto "github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
@@ -14,7 +12,6 @@ import (
 	"github.com/ledgerwatch/log/v3"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
-	"google.golang.org/grpc/keepalive"
 )
 
 func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txPoolServer txpool_proto.TxpoolServer, miningServer txpool_proto.MiningServer, addr string, rateLimit uint32, creds *credentials.TransportCredentials) (*grpc.Server, error) {
@@ -24,40 +21,7 @@ func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txP
 		return nil, fmt.Errorf("could not create listener: %w, addr=%s", err, addr)
 	}
 
-	var (
-		streamInterceptors []grpc.StreamServerInterceptor
-		unaryInterceptors  []grpc.UnaryServerInterceptor
-	)
-	streamInterceptors = append(streamInterceptors, grpc_recovery.StreamServerInterceptor())
-	unaryInterceptors = append(unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
-
-	//if metrics.Enabled {
-	//	streamInterceptors = append(streamInterceptors, grpc_prometheus.StreamServerInterceptor)
-	//	unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
-	//}
-
-	var grpcServer *grpc.Server
-	//cpus := uint32(runtime.GOMAXPROCS(-1))
-	opts := []grpc.ServerOption{
-		//grpc.NumStreamWorkers(cpus), // reduce amount of goroutines
-		grpc.WriteBufferSize(1024), // reduce buffers to save mem
-		grpc.ReadBufferSize(1024),
-		grpc.MaxConcurrentStreams(rateLimit), // to force clients reduce concurrency level
-		// Don't drop the connection, settings accordign to this comment on GitHub
-		// https://github.com/grpc/grpc-go/issues/3171#issuecomment-552796779
-		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
-			MinTime:             10 * time.Second,
-			PermitWithoutStream: true,
-		}),
-		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
-		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
-	}
-	if creds == nil {
-		// no specific opts
-	} else {
-		opts = append(opts, grpc.Creds(*creds))
-	}
-	grpcServer = grpc.NewServer(opts...)
+	grpcServer := grpcutil.NewServer(rateLimit, creds)
 	remote.RegisterETHBACKENDServer(grpcServer, ethBackendSrv)
 	if txPoolServer != nil {
 		txpool_proto.RegisterTxpoolServer(grpcServer, txPoolServer)
@@ -67,10 +31,6 @@ func StartGrpc(kv *remotedbserver.KvServer, ethBackendSrv *EthBackendServer, txP
 	}
 	remote.RegisterKVServer(grpcServer, kv)
 
-	//if metrics.Enabled {
-	//	grpc_prometheus.Register(grpcServer)
-	//}
-
 	go func() {
 		if err := grpcServer.Serve(lis); err != nil {
 			log.Error("private RPC server fail", "err", err)
diff --git a/go.mod b/go.mod
index ec8586f0d8d775018ee9d59e8f781695627f36fe..a66137368b51b49370e536731459967e26093496 100644
--- a/go.mod
+++ b/go.mod
@@ -37,7 +37,7 @@ require (
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/kevinburke/go-bindata v3.21.0+incompatible
 	github.com/kylelemons/godebug v1.1.0 // indirect
-	github.com/ledgerwatch/erigon-lib v0.0.0-20210902093323-b1435d367912
+	github.com/ledgerwatch/erigon-lib v0.0.0-20210902093736-c935d9ff654f
 	github.com/ledgerwatch/log/v3 v3.3.0
 	github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d
 	github.com/logrusorgru/aurora/v3 v3.0.0
diff --git a/go.sum b/go.sum
index 8b4264ea752c399c1b4b712d90f66267373acc54..f2723baf7706013962959d985c81d8c226dbe14a 100644
--- a/go.sum
+++ b/go.sum
@@ -492,8 +492,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
-github.com/ledgerwatch/erigon-lib v0.0.0-20210902093323-b1435d367912 h1:7671aVLMY7C2PyXON6xZjSMPIPd6zCgXfL653q9FwWA=
-github.com/ledgerwatch/erigon-lib v0.0.0-20210902093323-b1435d367912/go.mod h1:q846JoG0oCWU9xTunmQAysfywjyoUzxx/5tHPo/F0t0=
+github.com/ledgerwatch/erigon-lib v0.0.0-20210902093736-c935d9ff654f h1:iaaLM4kl29mWzQF9VtAzD+0uPvvgtzFAzgo5zlczNOU=
+github.com/ledgerwatch/erigon-lib v0.0.0-20210902093736-c935d9ff654f/go.mod h1:q846JoG0oCWU9xTunmQAysfywjyoUzxx/5tHPo/F0t0=
 github.com/ledgerwatch/log/v3 v3.3.0 h1:k8N/3NQLILr8CKCMyza261vLFKU7VA+nMNNb0wVyQSc=
 github.com/ledgerwatch/log/v3 v3.3.0/go.mod h1:J58eOHHrIYHxl7LKkRsb/0YibKwtLfauUryl5SLRGm0=
 github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno=
diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go
index 618e26f3a3fcd77b53e80dcd3745abd2c8c9c340..2ec02127da2a261122541a06fd3ed28019ee9412 100644
--- a/turbo/cli/default_flags.go
+++ b/turbo/cli/default_flags.go
@@ -10,6 +10,7 @@ import (
 var DefaultFlags = []cli.Flag{
 	utils.DataDirFlag,
 	utils.EthashDatasetDirFlag,
+	utils.TxPoolV2Flag,
 	utils.TxPoolLocalsFlag,
 	utils.TxPoolNoLocalsFlag,
 	utils.TxPoolJournalFlag,
diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go
index 456bd3559f06992aecc8d59195d9e58611a93d2f..8927d79d32ba76cf11ebd10cefc32cd789701f32 100644
--- a/turbo/stages/mock_sentry.go
+++ b/turbo/stages/mock_sentry.go
@@ -296,7 +296,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 		stagedsync.StageLogIndexCfg(mock.DB, prune, mock.tmpdir),
 		stagedsync.StageCallTracesCfg(mock.DB, prune, 0, mock.tmpdir),
 		stagedsync.StageTxLookupCfg(mock.DB, prune, mock.tmpdir),
-		stagedsync.StageTxPoolCfg(mock.DB, txPool, func() {
+		stagedsync.StageTxPoolCfg(mock.DB, txPool, cfg.TxPool, func() {
 			mock.StreamWg.Add(1)
 			go txpool.RecvTxMessageLoop(mock.Ctx, mock.SentryClient, mock.TxPoolP2PServer.HandleInboundMessage, &mock.ReceiveWg)
 			go txpropagate.BroadcastPendingTxsToNetwork(mock.Ctx, txPool, mock.TxPoolP2PServer.RecentPeers, mock.downloader)
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 0fbcc7da80ef4af38eabf0bf91a6f1751583258c..7f48db73267d15fad8e3ad570d18652bb0d53080 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -261,7 +261,7 @@ func NewStagedSync(
 		stagedsync.StageLogIndexCfg(db, cfg.Prune, tmpdir),
 		stagedsync.StageCallTracesCfg(db, cfg.Prune, 0, tmpdir),
 		stagedsync.StageTxLookupCfg(db, cfg.Prune, tmpdir),
-		stagedsync.StageTxPoolCfg(db, txPool, func() {
+		stagedsync.StageTxPoolCfg(db, txPool, cfg.TxPool, func() {
 			for i := range txPoolServer.Sentries {
 				go func(i int) {
 					txpool.RecvTxMessageLoop(ctx, txPoolServer.Sentries[i], txPoolServer.HandleInboundMessage, nil)
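
Note (not part of the patch): both gRPC server setups above now delegate to erigon-lib's grpcutil.NewServer, whose source is not included in this diff. The sketch below is a reconstruction from the options the patch deletes in cmd/sentry/download/sentry.go and ethdb/privateapi/all.go — panic-recovery interceptors, small read/write buffers, a concurrent-stream cap, the keepalive enforcement policy from grpc/grpc-go#3171, and optional TLS credentials. The signature matches the call sites above; the body is an assumption, not the actual erigon-lib code.

// Sketch only — assumed equivalent of erigon-lib's grpcutil.NewServer,
// reconstructed from the server options this patch deletes.
package grpcutil

import (
	"time"

	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/keepalive"
)

func NewServer(rateLimit uint32, creds *credentials.TransportCredentials) *grpc.Server {
	opts := []grpc.ServerOption{
		grpc.WriteBufferSize(1024), // reduce buffers to save mem
		grpc.ReadBufferSize(1024),
		grpc.MaxConcurrentStreams(rateLimit), // force clients to reduce concurrency level
		// Don't drop the connection; settings according to
		// https://github.com/grpc/grpc-go/issues/3171#issuecomment-552796779
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             10 * time.Second,
			PermitWithoutStream: true,
		}),
		// recover from panics in handlers instead of crashing the process
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(grpc_recovery.StreamServerInterceptor())),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(grpc_recovery.UnaryServerInterceptor())),
	}
	if creds != nil {
		opts = append(opts, grpc.Creds(*creds))
	}
	return grpc.NewServer(opts...)
}

The call sites pass the stream cap explicitly: a constant 100 for the sentry P2P server and PrivateApiRateLimit for the private API, which is why the helper takes it as a parameter.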
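Likewise, grpcutil.TLS replaces the hand-rolled TLS bootstrap removed from eth/backend.go. A minimal sketch, assuming it keeps the removed logic (mutual TLS when a CA certificate is configured, plain server TLS otherwise); again, the body is an assumption reconstructed from the deleted code:

// Sketch only — assumed equivalent of erigon-lib's grpcutil.TLS.
package grpcutil

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"

	"google.golang.org/grpc/credentials"
)

func TLS(caCert, certFile, keyFile string) (credentials.TransportCredentials, error) {
	if caCert == "" {
		// plain server-side TLS from the cert/key pair
		return credentials.NewServerTLSFromFile(certFile, keyFile)
	}
	// mutual TLS: verify client certificates against the configured CA
	peerCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}
	ca, err := ioutil.ReadFile(caCert)
	if err != nil {
		return nil, err
	}
	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM(ca)
	return credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{peerCert},
		ClientCAs:    caCertPool,
		ClientAuth:   tls.RequireAndVerifyClientCert,
		MinVersion:   tls.VersionTLS12,
	}), nil
}

Returning the credentials value (rather than taking *grpc.Server options) matches the call site in eth/backend.go, which stores its address in creds and hands it to privateapi.StartGrpc.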