From cba7861164fc4a8253928697a469f18098198ddc Mon Sep 17 00:00:00 2001
From: Alex Sharov <AskAlexSharov@gmail.com>
Date: Sun, 27 Jun 2021 13:41:21 +0700
Subject: [PATCH] Send raw tx test (#2234)

---
 cmd/rpcdaemon/commands/eth_ming_test.go       |  5 +-
 .../commands/send_transaction_test.go         | 82 ++++++++++++++-----
 cmd/rpcdaemon/commands/test_util.go           | 13 ++-
 cmd/rpcdaemon/commands/txpool_api_test.go     | 76 +++++++++++++----
 turbo/stages/mock_sentry.go                   | 57 ++++++-------
 turbo/txpool/p2p.go                           | 14 ++--
 6 files changed, 172 insertions(+), 75 deletions(-)

diff --git a/cmd/rpcdaemon/commands/eth_ming_test.go b/cmd/rpcdaemon/commands/eth_ming_test.go
index 78089accd7..7156984e4a 100644
--- a/cmd/rpcdaemon/commands/eth_ming_test.go
+++ b/cmd/rpcdaemon/commands/eth_ming_test.go
@@ -9,11 +9,12 @@ import (
 	"github.com/ledgerwatch/erigon/core/types"
 	"github.com/ledgerwatch/erigon/gointerfaces/txpool"
 	"github.com/ledgerwatch/erigon/rlp"
+	"github.com/ledgerwatch/erigon/turbo/stages"
 	"github.com/stretchr/testify/require"
 )
 
 func TestPendingBlock(t *testing.T) {
-	ctx, conn := createTestGrpcConn(t)
+	ctx, conn := createTestGrpcConn(t, stages.Mock(t))
 	mining := txpool.NewMiningClient(conn)
 	ff := filters.New(ctx, nil, nil, mining)
 	api := NewEthAPI(NewBaseApi(ff), nil, nil, nil, mining, 5000000)
@@ -38,7 +39,7 @@ func TestPendingBlock(t *testing.T) {
 }
 
 func TestPendingLogs(t *testing.T) {
-	ctx, conn := createTestGrpcConn(t)
+	ctx, conn := createTestGrpcConn(t, stages.Mock(t))
 	mining := txpool.NewMiningClient(conn)
 	ff := filters.New(ctx, nil, nil, mining)
 	api := NewEthAPI(NewBaseApi(ff), nil, nil, nil, mining, 5000000)
diff --git a/cmd/rpcdaemon/commands/send_transaction_test.go b/cmd/rpcdaemon/commands/send_transaction_test.go
index 097d6f82ae..9b504a4f40 100644
--- a/cmd/rpcdaemon/commands/send_transaction_test.go
+++ b/cmd/rpcdaemon/commands/send_transaction_test.go
@@ -5,33 +5,75 @@ import (
 	"crypto/ecdsa"
 	"math/big"
 	"testing"
-	"time"
 
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/filters"
 	"github.com/ledgerwatch/erigon/common"
 	"github.com/ledgerwatch/erigon/common/u256"
+	"github.com/ledgerwatch/erigon/core"
 	"github.com/ledgerwatch/erigon/core/types"
-	"github.com/ledgerwatch/erigon/crypto"
+	"github.com/ledgerwatch/erigon/eth/protocols/eth"
+	"github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver"
+	"github.com/ledgerwatch/erigon/gointerfaces/sentry"
 	"github.com/ledgerwatch/erigon/gointerfaces/txpool"
+	"github.com/ledgerwatch/erigon/params"
+	"github.com/ledgerwatch/erigon/rlp"
+	"github.com/ledgerwatch/erigon/turbo/stages"
 	"github.com/stretchr/testify/require"
 )
 
 func TestSendRawTransaction(t *testing.T) {
-	t.Skip("Flaky test")
-	db := createTestKV(t)
-	ctx, conn := createTestGrpcConn(t)
+	//t.Skip("Flaky test")
+	m, require := stages.Mock(t), require.New(t)
+
+	chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, b *core.BlockGen) {
+		b.SetCoinbase(common.Address{1})
+	}, false /* intemediateHashes */)
+	require.NoError(err)
+	{ // Do 1 step to start txPool
+
+		// Send NewBlock message
+		b, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
+			Block: chain.TopBlock,
+			TD:    big.NewInt(1), // This is ignored anyway
+		})
+		require.NoError(err)
+		m.ReceiveWg.Add(1)
+		for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) {
+			require.NoError(err)
+		}
+		// Send all the headers
+		b, err = rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
+			RequestId:          1,
+			BlockHeadersPacket: chain.Headers,
+		})
+		require.NoError(err)
+		m.ReceiveWg.Add(1)
+		for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_BLOCK_HEADERS_66, Data: b, PeerId: m.PeerId}) {
+			require.NoError(err)
+		}
+		m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
+
+		notifier := &remotedbserver.Events{}
+		initialCycle := true
+		highestSeenHeader := chain.TopBlock.NumberU64()
+		if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	expectValue := uint64(1234)
+	txn, err := types.SignTx(types.NewTransaction(0, common.Address{1}, uint256.NewInt(expectValue), params.TxGas, u256.Num1, nil), *types.LatestSignerForChainID(m.ChainConfig.ChainID), m.Key)
+	require.NoError(err)
+
+	ctx, conn := createTestGrpcConn(t, m)
 	txPool := txpool.NewTxpoolClient(conn)
 	ff := filters.New(ctx, nil, txPool, txpool.NewMiningClient(conn))
-	api := NewEthAPI(NewBaseApi(ff), db, nil, txPool, nil, 5000000)
+	api := NewEthAPI(NewBaseApi(ff), m.DB, nil, txPool, nil, 5000000)
 
-	// Call GetTransactionReceipt for un-protected transaction
-	var testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-	expect := uint64(40)
-	txn := transaction(expect, 1000000, testKey)
 	buf := bytes.NewBuffer(nil)
-	err := txn.MarshalBinary(buf)
-	require.NoError(t, err)
+	err = txn.MarshalBinary(buf)
+	require.NoError(err)
 
 	txsCh := make(chan []types.Transaction, 1)
 	defer close(txsCh)
@@ -39,13 +81,15 @@ func TestSendRawTransaction(t *testing.T) {
 	defer api.filters.UnsubscribePendingTxs(id)
 
 	_, err = api.SendRawTransaction(ctx, buf.Bytes())
-	require.NoError(t, err)
-	select {
-	case got := <-txsCh:
-		require.Equal(t, expect, got[0].GetNonce())
-	case <-time.After(500 * time.Millisecond):
-		t.Fatalf("timeout waiting for  expected notification")
-	}
+	require.NoError(err)
+
+	got := <-txsCh
+	require.Equal(expectValue, got[0].GetValue().Uint64())
+
+	//send same tx second time and expect error
+	_, err = api.SendRawTransaction(ctx, buf.Bytes())
+	require.NotNil(err)
+	require.Equal("ALREADY_EXISTS: already known", err.Error())
 }
 
 func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) types.Transaction {
diff --git a/cmd/rpcdaemon/commands/test_util.go b/cmd/rpcdaemon/commands/test_util.go
index c702df5b3e..3c97b7317a 100644
--- a/cmd/rpcdaemon/commands/test_util.go
+++ b/cmd/rpcdaemon/commands/test_util.go
@@ -20,7 +20,6 @@ import (
 	"github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver"
 	"github.com/ledgerwatch/erigon/gointerfaces/txpool"
 	"github.com/ledgerwatch/erigon/params"
-	"github.com/ledgerwatch/erigon/turbo/mock"
 	"github.com/ledgerwatch/erigon/turbo/stages"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/test/bufconn"
@@ -204,12 +203,18 @@ type IsMiningMock struct{}
 
 func (*IsMiningMock) IsMining() bool { return false }
 
-func createTestGrpcConn(t *testing.T) (context.Context, *grpc.ClientConn) { //nolint
+func createTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *grpc.ClientConn) { //nolint
 	ctx, cancel := context.WithCancel(context.Background())
 
-	ethashApi := ethash.NewFaker().APIs(nil)[1].Service.(*ethash.API)
+	apis := m.Engine.APIs(nil)
+	if len(apis) < 1 {
+		t.Fatal("couldn't instantiate Engine api")
+	}
+
+	ethashApi := apis[1].Service.(*ethash.API)
 	server := grpc.NewServer()
-	txpool.RegisterTxpoolServer(server, remotedbserver.NewTxPoolServer(ctx, mock.NewTestTxPool()))
+
+	txpool.RegisterTxpoolServer(server, remotedbserver.NewTxPoolServer(ctx, m.TxPoolP2PServer.TxPool))
 	txpool.RegisterMiningServer(server, remotedbserver.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
 	listener := bufconn.Listen(1024 * 1024)
 
diff --git a/cmd/rpcdaemon/commands/txpool_api_test.go b/cmd/rpcdaemon/commands/txpool_api_test.go
index f95fa21007..604ee36748 100644
--- a/cmd/rpcdaemon/commands/txpool_api_test.go
+++ b/cmd/rpcdaemon/commands/txpool_api_test.go
@@ -2,36 +2,82 @@ package commands
 
 import (
 	"bytes"
+	"math/big"
 	"testing"
 
+	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon/cmd/rpcdaemon/filters"
 	"github.com/ledgerwatch/erigon/common"
-	"github.com/ledgerwatch/erigon/crypto"
+	"github.com/ledgerwatch/erigon/common/u256"
+	"github.com/ledgerwatch/erigon/core"
+	"github.com/ledgerwatch/erigon/core/types"
+	"github.com/ledgerwatch/erigon/eth/protocols/eth"
+	"github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver"
+	"github.com/ledgerwatch/erigon/gointerfaces/sentry"
 	"github.com/ledgerwatch/erigon/gointerfaces/txpool"
+	"github.com/ledgerwatch/erigon/params"
+	"github.com/ledgerwatch/erigon/rlp"
+	"github.com/ledgerwatch/erigon/turbo/stages"
 	"github.com/stretchr/testify/require"
 )
 
 func TestTxPoolContent(t *testing.T) {
-	db := createTestKV(t)
-	ctx, conn := createTestGrpcConn(t)
+	m, require := stages.Mock(t), require.New(t)
+	chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, b *core.BlockGen) {
+		b.SetCoinbase(common.Address{1})
+	}, false /* intemediateHashes */)
+	require.NoError(err)
+	{ // Do 1 step to start txPool
+
+		// Send NewBlock message
+		b, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
+			Block: chain.TopBlock,
+			TD:    big.NewInt(1), // This is ignored anyway
+		})
+		require.NoError(err)
+		m.ReceiveWg.Add(1)
+		for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) {
+			require.NoError(err)
+		}
+		// Send all the headers
+		b, err = rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
+			RequestId:          1,
+			BlockHeadersPacket: chain.Headers,
+		})
+		require.NoError(err)
+		m.ReceiveWg.Add(1)
+		for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_BLOCK_HEADERS_66, Data: b, PeerId: m.PeerId}) {
+			require.NoError(err)
+		}
+		m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
+
+		notifier := &remotedbserver.Events{}
+		initialCycle := true
+		highestSeenHeader := chain.TopBlock.NumberU64()
+		if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	ctx, conn := createTestGrpcConn(t, m)
 	txPool := txpool.NewTxpoolClient(conn)
 	ff := filters.New(ctx, nil, txPool, txpool.NewMiningClient(conn))
-	api := NewTxPoolAPI(NewBaseApi(ff), db, txPool)
+	api := NewTxPoolAPI(NewBaseApi(ff), m.DB, txPool)
+
+	expectValue := uint64(1234)
+	txn, err := types.SignTx(types.NewTransaction(0, common.Address{1}, uint256.NewInt(expectValue), params.TxGas, u256.Num1, nil), *types.LatestSignerForChainID(m.ChainConfig.ChainID), m.Key)
+	require.NoError(err)
 
-	// Call GetTransactionReceipt for un-protected transaction
-	var testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-	expect := uint64(40)
-	txn := transaction(expect, 1000000, testKey)
 	buf := bytes.NewBuffer(nil)
-	err := txn.MarshalBinary(buf)
-	require.NoError(t, err)
+	err = txn.MarshalBinary(buf)
+	require.NoError(err)
 
 	_, err = txPool.Add(ctx, &txpool.AddRequest{RlpTxs: [][]byte{buf.Bytes()}})
-	require.NoError(t, err)
+	require.NoError(err)
 	content, err := api.Content(ctx)
-	require.NoError(t, err)
+	require.NoError(err)
 
-	sender := (common.Address{}).String()
-	require.Equal(t, 1, len(content["pending"][sender]))
-	require.Equal(t, expect, uint64(content["pending"][sender]["40"].Nonce))
+	sender := m.Address.String()
+	require.Equal(1, len(content["pending"][sender]))
+	require.Equal(expectValue, content["pending"][sender]["0"].Value.ToInt().Uint64())
 }
diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go
index cece5992fc..8e68941681 100644
--- a/turbo/stages/mock_sentry.go
+++ b/turbo/stages/mock_sentry.go
@@ -41,28 +41,29 @@ import (
 
 type MockSentry struct {
 	proto_sentry.UnimplementedSentryServer
-	Ctx           context.Context
-	t             *testing.T
-	cancel        context.CancelFunc
-	DB            ethdb.RwKV
-	tmpdir        string
-	Engine        consensus.Engine
-	ChainConfig   *params.ChainConfig
-	Sync          *stagedsync.StagedSync
-	MiningSync    *stagedsync.StagedSync
-	PendingBlocks chan *types.Block
-	MinedBlocks   chan *types.Block
-	downloader    *download.ControlServerImpl
-	Key           *ecdsa.PrivateKey
-	Genesis       *types.Block
-	SentryClient  remote.SentryClient
-	PeerId        *ptypes.H512
-	UpdateHead    func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int)
-	streams       map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer
-	sentMessages  []*proto_sentry.OutboundMessageData
-	StreamWg      sync.WaitGroup
-	ReceiveWg     sync.WaitGroup
-	Address       common.Address
+	Ctx             context.Context
+	t               *testing.T
+	cancel          context.CancelFunc
+	DB              ethdb.RwKV
+	tmpdir          string
+	Engine          consensus.Engine
+	ChainConfig     *params.ChainConfig
+	Sync            *stagedsync.StagedSync
+	MiningSync      *stagedsync.StagedSync
+	PendingBlocks   chan *types.Block
+	MinedBlocks     chan *types.Block
+	downloader      *download.ControlServerImpl
+	Key             *ecdsa.PrivateKey
+	Genesis         *types.Block
+	SentryClient    remote.SentryClient
+	PeerId          *ptypes.H512
+	TxPoolP2PServer *txpool.P2PServer
+	UpdateHead      func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int)
+	streams         map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer
+	sentMessages    []*proto_sentry.OutboundMessageData
+	StreamWg        sync.WaitGroup
+	ReceiveWg       sync.WaitGroup
+	Address         common.Address
 }
 
 // Stream returns stream, waiting if necessary
@@ -173,7 +174,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 	txPoolConfig.StartOnInit = true
 	txPool := core.NewTxPool(txPoolConfig, mock.ChainConfig, mock.DB)
 	txSentryClient := remote.NewSentryClientDirect(eth.ETH66, mock)
-	txPoolP2PServer, err := txpool.NewP2PServer(mock.Ctx, []remote.SentryClient{txSentryClient}, txPool)
+	mock.TxPoolP2PServer, err = txpool.NewP2PServer(mock.Ctx, []remote.SentryClient{txSentryClient}, txPool)
 	if err != nil {
 		if t != nil {
 			t.Fatal(err)
@@ -182,11 +183,11 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 		}
 	}
 	fetchTx := func(PeerId string, hashes []common.Hash) error {
-		txPoolP2PServer.SendTxsRequest(context.TODO(), PeerId, hashes)
+		mock.TxPoolP2PServer.SendTxsRequest(context.TODO(), PeerId, hashes)
 		return nil
 	}
 
-	txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(txPool.Has, txPool.AddRemotes, fetchTx)
+	mock.TxPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(txPool.Has, txPool.AddRemotes, fetchTx)
 	// Committed genesis will be shared between download and mock sentry
 	_, mock.Genesis, err = core.CommitGenesisBlock(mock.DB, gspec, sm.History)
 	if _, ok := err.(*params.ConfigCompatError); err != nil && !ok {
@@ -260,9 +261,9 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 		stagedsync.StageTxLookupCfg(mock.DB, mock.tmpdir),
 		stagedsync.StageTxPoolCfg(mock.DB, txPool, func() {
 			mock.StreamWg.Add(1)
-			go txpool.RecvTxMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, txPoolP2PServer.HandleInboundMessage, &mock.ReceiveWg)
+			go txpool.RecvTxMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, mock.TxPoolP2PServer.HandleInboundMessage, &mock.ReceiveWg)
 			mock.StreamWg.Wait()
-			txPoolP2PServer.TxFetcher.Start()
+			mock.TxPoolP2PServer.TxFetcher.Start()
 		}),
 		stagedsync.StageFinishCfg(mock.DB, mock.tmpdir),
 		true, /* test */
@@ -300,7 +301,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
 		t.Cleanup(func() {
 			mock.cancel()
 			txPool.Stop()
-			txPoolP2PServer.TxFetcher.Stop()
+			mock.TxPoolP2PServer.TxFetcher.Stop()
 		})
 	}
 	return mock
diff --git a/turbo/txpool/p2p.go b/turbo/txpool/p2p.go
index 3390029e03..3a8aff55c9 100644
--- a/turbo/txpool/p2p.go
+++ b/turbo/txpool/p2p.go
@@ -30,7 +30,7 @@ import (
 type P2PServer struct {
 	ctx       context.Context
 	Sentries  []remote.SentryClient
-	txPool    *core.TxPool
+	TxPool    *core.TxPool
 	TxFetcher *fetcher.TxFetcher
 }
 
@@ -38,7 +38,7 @@ func NewP2PServer(ctx context.Context, sentries []remote.SentryClient, txPool *c
 	cs := &P2PServer{
 		ctx:      ctx,
 		Sentries: sentries,
-		txPool:   txPool,
+		TxPool:   txPool,
 	}
 
 	return cs, nil
@@ -83,7 +83,7 @@ func (tp *P2PServer) transactions66(ctx context.Context, inreq *proto_sentry.Inb
 }
 
 func (tp *P2PServer) transactions65(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry remote.SentryClient) error {
-	if tp.txPool == nil {
+	if tp.TxPool == nil {
 		return nil
 	}
 	var query eth.TransactionsPacket
@@ -94,14 +94,14 @@ func (tp *P2PServer) transactions65(ctx context.Context, inreq *proto_sentry.Inb
 }
 
 func (tp *P2PServer) getPooledTransactions66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry remote.SentryClient) error {
-	if tp.txPool == nil {
+	if tp.TxPool == nil {
 		return nil
 	}
 	var query eth.GetPooledTransactionsPacket66
 	if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
 		return fmt.Errorf("decoding GetPooledTransactionsPacket66: %v, data: %x", err, inreq.Data)
 	}
-	_, txs := eth.AnswerGetPooledTransactions(tp.txPool, query.GetPooledTransactionsPacket)
+	_, txs := eth.AnswerGetPooledTransactions(tp.TxPool, query.GetPooledTransactionsPacket)
 	b, err := rlp.EncodeToBytes(&eth.PooledTransactionsRLPPacket66{
 		RequestId:                   query.RequestId,
 		PooledTransactionsRLPPacket: txs,
@@ -122,14 +122,14 @@ func (tp *P2PServer) getPooledTransactions66(ctx context.Context, inreq *proto_s
 }
 
 func (tp *P2PServer) getPooledTransactions65(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry remote.SentryClient) error {
-	if tp.txPool == nil {
+	if tp.TxPool == nil {
 		return nil
 	}
 	var query eth.GetPooledTransactionsPacket
 	if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
 		return fmt.Errorf("decoding getPooledTransactions65: %v, data: %x", err, inreq.Data)
 	}
-	_, txs := eth.AnswerGetPooledTransactions(tp.txPool, query)
+	_, txs := eth.AnswerGetPooledTransactions(tp.TxPool, query)
 	b, err := rlp.EncodeToBytes(eth.PooledTransactionsRLPPacket(txs))
 	if err != nil {
 		return fmt.Errorf("encode getPooledTransactions65 response: %v", err)
-- 
GitLab