From 8f6aa2f7ae27c11ff19484dc2d536f19f40e0999 Mon Sep 17 00:00:00 2001 From: Alex Sharov <AskAlexSharov@gmail.com> Date: Tue, 29 Jun 2021 17:00:22 +0700 Subject: [PATCH] propagate new tx to p2p (#2254) --- .../commands/corner_cases_support_test.go | 3 +- cmd/rpcdaemon/commands/debug_api_test.go | 5 +- cmd/rpcdaemon/commands/eth_api_test.go | 5 +- cmd/rpcdaemon/commands/eth_call_test.go | 5 +- cmd/rpcdaemon/commands/eth_ming_test.go | 18 ++--- .../commands/send_transaction_test.go | 18 +++-- cmd/rpcdaemon/commands/trace_adhoc_test.go | 9 +-- cmd/rpcdaemon/commands/txpool_api_test.go | 3 +- .../{commands => rpcdaemontest}/test_util.go | 6 +- cmd/sentry/download/broadcast.go | 65 +++++++++++++++++++ cmd/sentry/download/sentry.go | 6 +- eth/backend.go | 31 ++------- eth/stagedsync/stagebuilder.go | 2 - ethdb/remote/remotedbserver/events.go | 26 -------- turbo/stages/mock_sentry.go | 2 + turbo/stages/txpropagate/deprecated.go | 30 +++++++++ 16 files changed, 151 insertions(+), 83 deletions(-) rename cmd/rpcdaemon/{commands => rpcdaemontest}/test_util.go (98%) create mode 100644 turbo/stages/txpropagate/deprecated.go diff --git a/cmd/rpcdaemon/commands/corner_cases_support_test.go b/cmd/rpcdaemon/commands/corner_cases_support_test.go index bf9c910ede..36c678cbcd 100644 --- a/cmd/rpcdaemon/commands/corner_cases_support_test.go +++ b/cmd/rpcdaemon/commands/corner_cases_support_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/rpc" "github.com/stretchr/testify/require" @@ -13,7 +14,7 @@ import ( // see https://github.com/ledgerwatch/erigon/issues/1645 func TestNotFoundMustReturnNil(t *testing.T) { require := require.New(t) - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) defer db.Close() api := NewEthAPI(NewBaseApi(nil), db, nil, nil, nil, 5000000) ctx := context.Background() diff --git a/cmd/rpcdaemon/commands/debug_api_test.go b/cmd/rpcdaemon/commands/debug_api_test.go index 3bc9cc68ad..e2c3e816a3 100644 --- a/cmd/rpcdaemon/commands/debug_api_test.go +++ b/cmd/rpcdaemon/commands/debug_api_test.go @@ -7,6 +7,7 @@ import ( "testing" jsoniter "github.com/json-iterator/go" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/eth/tracers" "github.com/ledgerwatch/erigon/internal/ethapi" @@ -35,7 +36,7 @@ var debugTraceTransactionNoRefundTests = []struct { } func TestTraceTransaction(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewPrivateDebugAPI(NewBaseApi(nil), db, 0) for _, tt := range debugTraceTransactionTests { var buf bytes.Buffer @@ -64,7 +65,7 @@ func TestTraceTransaction(t *testing.T) { } func TestTraceTransactionNoRefund(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewPrivateDebugAPI(NewBaseApi(nil), db, 0) for _, tt := range debugTraceTransactionNoRefundTests { var buf bytes.Buffer diff --git a/cmd/rpcdaemon/commands/eth_api_test.go b/cmd/rpcdaemon/commands/eth_api_test.go index ad12d99021..92d3312636 100644 --- a/cmd/rpcdaemon/commands/eth_api_test.go +++ b/cmd/rpcdaemon/commands/eth_api_test.go @@ -4,11 +4,12 @@ import ( "context" "testing" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/common" ) func TestGetTransactionReceipt(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewEthAPI(NewBaseApi(nil), db, nil, nil, nil, 5000000) // Call GetTransactionReceipt for transaction which is not in the database if _, err := api.GetTransactionReceipt(context.Background(), common.Hash{}); err != nil { @@ -17,7 +18,7 @@ func TestGetTransactionReceipt(t *testing.T) { } func TestGetTransactionReceiptUnprotected(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewEthAPI(NewBaseApi(nil), db, nil, nil, nil, 5000000) // Call GetTransactionReceipt for un-protected transaction if _, err := api.GetTransactionReceipt(context.Background(), common.HexToHash("0x3f3cb8a0e13ed2481f97f53f7095b9cbc78b6ffb779f2d3e565146371a8830ea")); err != nil { diff --git a/cmd/rpcdaemon/commands/eth_call_test.go b/cmd/rpcdaemon/commands/eth_call_test.go index 979457528b..d276172f0a 100644 --- a/cmd/rpcdaemon/commands/eth_call_test.go +++ b/cmd/rpcdaemon/commands/eth_call_test.go @@ -5,13 +5,14 @@ import ( "fmt" "testing" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/internal/ethapi" "github.com/ledgerwatch/erigon/rpc" ) func TestEstimateGas(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewEthAPI(NewBaseApi(nil), db, nil, nil, nil, 5000000) var from = common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = common.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") @@ -24,7 +25,7 @@ func TestEstimateGas(t *testing.T) { } func TestEthCallNonCanonical(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewEthAPI(NewBaseApi(nil), db, nil, nil, nil, 5000000) var from = common.HexToAddress("0x71562b71999873db5b286df957af199ec94617f7") var to = common.HexToAddress("0x0d3ab14bbad3d99f4203bd7a11acb94882050e7e") diff --git a/cmd/rpcdaemon/commands/eth_ming_test.go b/cmd/rpcdaemon/commands/eth_ming_test.go index 7156984e4a..749738fb9a 100644 --- a/cmd/rpcdaemon/commands/eth_ming_test.go +++ b/cmd/rpcdaemon/commands/eth_ming_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ledgerwatch/erigon/cmd/rpcdaemon/filters" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/gointerfaces/txpool" "github.com/ledgerwatch/erigon/rlp" @@ -14,7 +15,7 @@ import ( ) func TestPendingBlock(t *testing.T) { - ctx, conn := createTestGrpcConn(t, stages.Mock(t)) + ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) ff := filters.New(ctx, nil, nil, mining) api := NewEthAPI(NewBaseApi(ff), nil, nil, nil, mining, 5000000) @@ -23,10 +24,10 @@ func TestPendingBlock(t *testing.T) { require.NoError(t, err) ch := make(chan *types.Block, 1) defer close(ch) - id := api.filters.SubscribePendingBlock(ch) - defer api.filters.UnsubscribePendingBlock(id) + id := ff.SubscribePendingBlock(ch) + defer ff.UnsubscribePendingBlock(id) - api.filters.HandlePendingBlock(&txpool.OnPendingBlockReply{RplBlock: b}) + ff.HandlePendingBlock(&txpool.OnPendingBlockReply{RplBlock: b}) block := api.pendingBlock() require.Equal(t, block.Number().Uint64(), expect) @@ -39,20 +40,19 @@ func TestPendingBlock(t *testing.T) { } func TestPendingLogs(t *testing.T) { - ctx, conn := createTestGrpcConn(t, stages.Mock(t)) + ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, stages.Mock(t)) mining := txpool.NewMiningClient(conn) ff := filters.New(ctx, nil, nil, mining) - api := NewEthAPI(NewBaseApi(ff), nil, nil, nil, mining, 5000000) expect := []byte{211} ch := make(chan types.Logs, 1) defer close(ch) - id := api.filters.SubscribePendingLogs(ch) - defer api.filters.UnsubscribePendingLogs(id) + id := ff.SubscribePendingLogs(ch) + defer ff.UnsubscribePendingLogs(id) b, err := rlp.EncodeToBytes([]*types.Log{{Data: expect}}) require.NoError(t, err) - api.filters.HandlePendingLogs(&txpool.OnPendingLogsReply{RplLogs: b}) + ff.HandlePendingLogs(&txpool.OnPendingLogsReply{RplLogs: b}) select { case logs := <-ch: require.Equal(t, expect, logs[0].Data) diff --git a/cmd/rpcdaemon/commands/send_transaction_test.go b/cmd/rpcdaemon/commands/send_transaction_test.go index 9b504a4f40..3be92dae2a 100644 --- a/cmd/rpcdaemon/commands/send_transaction_test.go +++ b/cmd/rpcdaemon/commands/send_transaction_test.go @@ -1,4 +1,4 @@ -package commands +package commands_test import ( "bytes" @@ -7,7 +7,9 @@ import ( "testing" "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/commands" "github.com/ledgerwatch/erigon/cmd/rpcdaemon/filters" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/u256" "github.com/ledgerwatch/erigon/core" @@ -66,10 +68,10 @@ func TestSendRawTransaction(t *testing.T) { txn, err := types.SignTx(types.NewTransaction(0, common.Address{1}, uint256.NewInt(expectValue), params.TxGas, u256.Num1, nil), *types.LatestSignerForChainID(m.ChainConfig.ChainID), m.Key) require.NoError(err) - ctx, conn := createTestGrpcConn(t, m) + ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) ff := filters.New(ctx, nil, txPool, txpool.NewMiningClient(conn)) - api := NewEthAPI(NewBaseApi(ff), m.DB, nil, txPool, nil, 5000000) + api := commands.NewEthAPI(commands.NewBaseApi(ff), m.DB, nil, txPool, nil, 5000000) buf := bytes.NewBuffer(nil) err = txn.MarshalBinary(buf) @@ -77,8 +79,8 @@ func TestSendRawTransaction(t *testing.T) { txsCh := make(chan []types.Transaction, 1) defer close(txsCh) - id := api.filters.SubscribePendingTxs(txsCh) - defer api.filters.UnsubscribePendingTxs(id) + id := ff.SubscribePendingTxs(txsCh) + defer ff.UnsubscribePendingTxs(id) _, err = api.SendRawTransaction(ctx, buf.Bytes()) require.NoError(err) @@ -90,6 +92,12 @@ func TestSendRawTransaction(t *testing.T) { _, err = api.SendRawTransaction(ctx, buf.Bytes()) require.NotNil(err) require.Equal("ALREADY_EXISTS: already known", err.Error()) + m.ReceiveWg.Wait() + + //TODO: make propagation easy to test - now race + //time.Sleep(time.Second) + //sent := m.SentMessage(0) + //require.Equal(eth.ToProto[m.SentryClient.Protocol()][eth.NewPooledTransactionHashesMsg], sent.Id) } func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) types.Transaction { diff --git a/cmd/rpcdaemon/commands/trace_adhoc_test.go b/cmd/rpcdaemon/commands/trace_adhoc_test.go index 3e49c18266..602d45a11c 100644 --- a/cmd/rpcdaemon/commands/trace_adhoc_test.go +++ b/cmd/rpcdaemon/commands/trace_adhoc_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/hexutil" "github.com/ledgerwatch/erigon/core/rawdb" @@ -15,7 +16,7 @@ import ( ) func TestEmptyQuery(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewTraceAPI(NewBaseApi(nil), db, &cli.Flags{}) // Call GetTransactionReceipt for transaction which is not in the database var latest = rpc.LatestBlockNumber @@ -31,7 +32,7 @@ func TestEmptyQuery(t *testing.T) { } } func TestCoinbaseBalance(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewTraceAPI(NewBaseApi(nil), db, &cli.Flags{}) // Call GetTransactionReceipt for transaction which is not in the database var latest = rpc.LatestBlockNumber @@ -57,7 +58,7 @@ func TestCoinbaseBalance(t *testing.T) { } func TestReplayTransaction(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewTraceAPI(NewBaseApi(nil), db, &cli.Flags{}) var txnHash common.Hash if err := db.View(context.Background(), func(tx ethdb.Tx) error { @@ -84,7 +85,7 @@ func TestReplayTransaction(t *testing.T) { } func TestReplayBlockTransactions(t *testing.T) { - db := createTestKV(t) + db := rpcdaemontest.CreateTestKV(t) api := NewTraceAPI(NewBaseApi(nil), db, &cli.Flags{}) // Call GetTransactionReceipt for transaction which is not in the database diff --git a/cmd/rpcdaemon/commands/txpool_api_test.go b/cmd/rpcdaemon/commands/txpool_api_test.go index 604ee36748..da7287fec7 100644 --- a/cmd/rpcdaemon/commands/txpool_api_test.go +++ b/cmd/rpcdaemon/commands/txpool_api_test.go @@ -7,6 +7,7 @@ import ( "github.com/holiman/uint256" "github.com/ledgerwatch/erigon/cmd/rpcdaemon/filters" + "github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/common/u256" "github.com/ledgerwatch/erigon/core" @@ -59,7 +60,7 @@ func TestTxPoolContent(t *testing.T) { } } - ctx, conn := createTestGrpcConn(t, m) + ctx, conn := rpcdaemontest.CreateTestGrpcConn(t, m) txPool := txpool.NewTxpoolClient(conn) ff := filters.New(ctx, nil, txPool, txpool.NewMiningClient(conn)) api := NewTxPoolAPI(NewBaseApi(ff), m.DB, txPool) diff --git a/cmd/rpcdaemon/commands/test_util.go b/cmd/rpcdaemon/rpcdaemontest/test_util.go similarity index 98% rename from cmd/rpcdaemon/commands/test_util.go rename to cmd/rpcdaemon/rpcdaemontest/test_util.go index 3c97b7317a..671bf62c21 100644 --- a/cmd/rpcdaemon/commands/test_util.go +++ b/cmd/rpcdaemon/rpcdaemontest/test_util.go @@ -1,4 +1,4 @@ -package commands +package rpcdaemontest import ( "context" @@ -25,7 +25,7 @@ import ( "google.golang.org/grpc/test/bufconn" ) -func createTestKV(t *testing.T) ethdb.RwKV { +func CreateTestKV(t *testing.T) ethdb.RwKV { // Configure and generate a sample block chain var ( key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") @@ -203,7 +203,7 @@ type IsMiningMock struct{} func (*IsMiningMock) IsMining() bool { return false } -func createTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *grpc.ClientConn) { //nolint +func CreateTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *grpc.ClientConn) { //nolint ctx, cancel := context.WithCancel(context.Background()) apis := m.Engine.APIs(nil) diff --git a/cmd/sentry/download/broadcast.go b/cmd/sentry/download/broadcast.go index 010cac86df..4e0e8af9a3 100644 --- a/cmd/sentry/download/broadcast.go +++ b/cmd/sentry/download/broadcast.go @@ -4,6 +4,7 @@ import ( "context" "math/big" + "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/protocols/eth" proto_sentry "github.com/ledgerwatch/erigon/gointerfaces/sentry" @@ -15,6 +16,12 @@ import ( // Methods of sentry called by Core +const ( + // This is the target size for the packs of transactions or announcements. A + // pack can get larger than this if a single transactions exceeds this size. + maxTxPacketSize = 100 * 1024 +) + func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) { cs.lock.RLock() defer cs.lock.RUnlock() @@ -114,3 +121,61 @@ func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types } } } + +func (cs *ControlServerImpl) BroadcastNewTxs(ctx context.Context, txs []types.Transaction) { + cs.lock.RLock() + defer cs.lock.RUnlock() + + for len(txs) > 0 { + pendingLen := maxTxPacketSize / common.HashLength + pending := make([]common.Hash, 0, pendingLen) + + for i := 0; i < pendingLen && i < len(txs); i++ { + pending = append(pending, txs[i].Hash()) + } + txs = txs[len(pending):] + + data, err := rlp.EncodeToBytes(eth.NewPooledTransactionHashesPacket(pending)) + if err != nil { + log.Error("broadcastNewBlock", "error", err) + } + var req66, req65 *proto_sentry.SendMessageToRandomPeersRequest + for _, sentry := range cs.sentries { + if !sentry.Ready() { + continue + } + + switch sentry.Protocol() { + case eth.ETH65: + if req65 == nil { + req65 = &proto_sentry.SendMessageToRandomPeersRequest{ + MaxPeers: 1024, + Data: &proto_sentry.OutboundMessageData{ + Id: proto_sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65, + Data: data, + }, + } + } + + if _, err = sentry.SendMessageToRandomPeers(ctx, req65, &grpc.EmptyCallOption{}); err != nil { + log.Error("broadcastNewBlock", "error", err) + } + + case eth.ETH66: + if req66 == nil { + req66 = &proto_sentry.SendMessageToRandomPeersRequest{ + MaxPeers: 1024, + Data: &proto_sentry.OutboundMessageData{ + Id: proto_sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, + Data: data, + }, + } + } + if _, err = sentry.SendMessageToRandomPeers(ctx, req66, &grpc.EmptyCallOption{}); err != nil { + log.Error("broadcastNewBlock", "error", err) + } + continue + } + } + } +} diff --git a/cmd/sentry/download/sentry.go b/cmd/sentry/download/sentry.go index 7c1410fc14..b6baebbf55 100644 --- a/cmd/sentry/download/sentry.go +++ b/cmd/sentry/download/sentry.go @@ -732,7 +732,9 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) { msgcode := eth.FromProto[ss.Protocol.Version][req.Data.Id] - if msgcode != eth.NewBlockMsg && msgcode != eth.NewBlockHashesMsg { + if msgcode != eth.NewBlockMsg && + msgcode != eth.NewBlockHashesMsg && + msgcode != eth.NewPooledTransactionHashesMsg { return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Data.Id) } @@ -775,7 +777,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) { msgcode := eth.FromProto[ss.Protocol.Version][req.Id] if msgcode != eth.NewBlockMsg && msgcode != eth.NewBlockHashesMsg { - return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Id) + return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageToAll not implemented for message Id: %s", req.Id) } var innerErr error diff --git a/eth/backend.go b/eth/backend.go index c3b6ee13cc..9bd8ba1974 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -62,6 +62,7 @@ import ( "github.com/ledgerwatch/erigon/turbo/remote" "github.com/ledgerwatch/erigon/turbo/snapshotsync" stages2 "github.com/ledgerwatch/erigon/turbo/stages" + "github.com/ledgerwatch/erigon/turbo/stages/txpropagate" "github.com/ledgerwatch/erigon/turbo/txpool" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -410,17 +411,20 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Info("Set torrent params", "snapshotsDir", snapshotsDir) } - go SendPendingTxsToRpcDaemon(backend.txPool, backend.events) + go txpropagate.BroadcastNewTxsToNetworks(backend.downloadV2Ctx, backend.txPool, backend.downloadServer) go func() { defer debug.LogPanic() for { select { case b := <-backend.minedBlocks: - // todo: broadcast p2p + //p2p + //backend.downloadServer.BroadcastNewBlock(context.Background(), b, b.Difficulty()) + //rpcdaemon if err := miningRPC.BroadcastMinedBlock(b); err != nil { log.Error("txpool rpc mined block broadcast", "err", err) } + case b := <-backend.pendingBlocks: if err := miningRPC.BroadcastPendingBlock(b); err != nil { log.Error("txpool rpc pending block broadcast", "err", err) @@ -448,27 +452,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return backend, nil } -const txChanSize int = 4096 - -func SendPendingTxsToRpcDaemon(txPool *core.TxPool, notifier *remotedbserver.Events) { - defer debug.LogPanic() - if notifier == nil { - return - } - - txsCh := make(chan core.NewTxsEvent, txChanSize) - txsSub := txPool.SubscribeNewTxsEvent(txsCh) - defer txsSub.Unsubscribe() - - for { - select { - case e := <-txsCh: - notifier.OnNewPendingTxs(e.Txs) - case <-txsSub.Err(): - return - } - } -} func (s *Ethereum) APIs() []rpc.API { return []rpc.API{} } @@ -568,7 +551,7 @@ func (s *Ethereum) StartMining(ctx context.Context, kv ethdb.RwKV, mining *stage go func() { defer debug.LogPanic() defer close(s.waitForMiningStop) - newTransactions := make(chan core.NewTxsEvent, txChanSize) + newTransactions := make(chan core.NewTxsEvent, 128) sub := s.txPool.SubscribeNewTxsEvent(newTransactions) defer sub.Unsubscribe() defer close(newTransactions) diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index b0c8fa7441..0debd8f796 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -14,8 +14,6 @@ import ( type ChainEventNotifier interface { OnNewHeader(*types.Header) OnNewPendingLogs(types.Logs) - OnNewPendingBlock(*types.Block) - OnNewPendingTxs([]types.Transaction) } // StageParameters contains the stage that stages receives at runtime when initializes. diff --git a/ethdb/remote/remotedbserver/events.go b/ethdb/remote/remotedbserver/events.go index 6e1fbda41c..46d46a5c6e 100644 --- a/ethdb/remote/remotedbserver/events.go +++ b/ethdb/remote/remotedbserver/events.go @@ -49,12 +49,6 @@ func (e *Events) AddPendingBlockSubscription(s PendingBlockSubscription) { e.pendingBlockSubscriptions[len(e.pendingBlockSubscriptions)] = s } -func (e *Events) AddPendingTxsSubscription(s PendingTxsSubscription) { - e.lock.Lock() - defer e.lock.Unlock() - e.pendingTxsSubscriptions[len(e.pendingTxsSubscriptions)] = s -} - func (e *Events) OnNewHeader(newHeader *types.Header) { e.lock.Lock() defer e.lock.Unlock() @@ -74,23 +68,3 @@ func (e *Events) OnNewPendingLogs(logs types.Logs) { } } } - -func (e *Events) OnNewPendingBlock(block *types.Block) { - e.lock.Lock() - defer e.lock.Unlock() - for i, sub := range e.pendingBlockSubscriptions { - if err := sub(block); err != nil { - delete(e.pendingBlockSubscriptions, i) - } - } -} - -func (e *Events) OnNewPendingTxs(txs []types.Transaction) { - e.lock.Lock() - defer e.lock.Unlock() - for i, sub := range e.pendingTxsSubscriptions { - if err := sub(txs); err != nil { - delete(e.pendingTxsSubscriptions, i) - } - } -} diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 8e68941681..9d7678b44b 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -35,6 +35,7 @@ import ( "github.com/ledgerwatch/erigon/turbo/remote" "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" + "github.com/ledgerwatch/erigon/turbo/stages/txpropagate" "github.com/ledgerwatch/erigon/turbo/txpool" "google.golang.org/protobuf/types/known/emptypb" ) @@ -262,6 +263,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey stagedsync.StageTxPoolCfg(mock.DB, txPool, func() { mock.StreamWg.Add(1) go txpool.RecvTxMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, mock.TxPoolP2PServer.HandleInboundMessage, &mock.ReceiveWg) + go txpropagate.BroadcastNewTxsToNetworks(mock.Ctx, txPool, mock.downloader) mock.StreamWg.Wait() mock.TxPoolP2PServer.TxFetcher.Start() }), diff --git a/turbo/stages/txpropagate/deprecated.go b/turbo/stages/txpropagate/deprecated.go new file mode 100644 index 0000000000..1d2542ab48 --- /dev/null +++ b/turbo/stages/txpropagate/deprecated.go @@ -0,0 +1,30 @@ +package txpropagate + +import ( + "context" + + "github.com/ledgerwatch/erigon/cmd/sentry/download" + "github.com/ledgerwatch/erigon/common/debug" + "github.com/ledgerwatch/erigon/core" +) + +const txChanSize int = 4096 + +func BroadcastNewTxsToNetworks(ctx context.Context, txPool *core.TxPool, s *download.ControlServerImpl) { + defer debug.LogPanic() + + txsCh := make(chan core.NewTxsEvent, txChanSize) + txsSub := txPool.SubscribeNewTxsEvent(txsCh) + defer txsSub.Unsubscribe() + + for { + select { + case e := <-txsCh: + s.BroadcastNewTxs(context.Background(), e.Txs) + case <-txsSub.Err(): + return + case <-ctx.Done(): + return + } + } +} -- GitLab