good morning!!!!

Skip to content
Snippets Groups Projects
Unverified Commit cba78611 authored by Alex Sharov's avatar Alex Sharov Committed by GitHub
Browse files

Send raw tx test (#2234)

parent 6b21eab3
No related branches found
No related tags found
No related merge requests found
......@@ -9,11 +9,12 @@ import (
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/gointerfaces/txpool"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/stages"
"github.com/stretchr/testify/require"
)
func TestPendingBlock(t *testing.T) {
ctx, conn := createTestGrpcConn(t)
ctx, conn := createTestGrpcConn(t, stages.Mock(t))
mining := txpool.NewMiningClient(conn)
ff := filters.New(ctx, nil, nil, mining)
api := NewEthAPI(NewBaseApi(ff), nil, nil, nil, mining, 5000000)
......@@ -38,7 +39,7 @@ func TestPendingBlock(t *testing.T) {
}
func TestPendingLogs(t *testing.T) {
ctx, conn := createTestGrpcConn(t)
ctx, conn := createTestGrpcConn(t, stages.Mock(t))
mining := txpool.NewMiningClient(conn)
ff := filters.New(ctx, nil, nil, mining)
api := NewEthAPI(NewBaseApi(ff), nil, nil, nil, mining, 5000000)
......
......@@ -5,33 +5,75 @@ import (
"crypto/ecdsa"
"math/big"
"testing"
"time"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/filters"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/u256"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/erigon/gointerfaces/sentry"
"github.com/ledgerwatch/erigon/gointerfaces/txpool"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/stages"
"github.com/stretchr/testify/require"
)
func TestSendRawTransaction(t *testing.T) {
t.Skip("Flaky test")
db := createTestKV(t)
ctx, conn := createTestGrpcConn(t)
//t.Skip("Flaky test")
m, require := stages.Mock(t), require.New(t)
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, b *core.BlockGen) {
b.SetCoinbase(common.Address{1})
}, false /* intemediateHashes */)
require.NoError(err)
{ // Do 1 step to start txPool
// Send NewBlock message
b, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
Block: chain.TopBlock,
TD: big.NewInt(1), // This is ignored anyway
})
require.NoError(err)
m.ReceiveWg.Add(1)
for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) {
require.NoError(err)
}
// Send all the headers
b, err = rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
RequestId: 1,
BlockHeadersPacket: chain.Headers,
})
require.NoError(err)
m.ReceiveWg.Add(1)
for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_BLOCK_HEADERS_66, Data: b, PeerId: m.PeerId}) {
require.NoError(err)
}
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
notifier := &remotedbserver.Events{}
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
expectValue := uint64(1234)
txn, err := types.SignTx(types.NewTransaction(0, common.Address{1}, uint256.NewInt(expectValue), params.TxGas, u256.Num1, nil), *types.LatestSignerForChainID(m.ChainConfig.ChainID), m.Key)
require.NoError(err)
ctx, conn := createTestGrpcConn(t, m)
txPool := txpool.NewTxpoolClient(conn)
ff := filters.New(ctx, nil, txPool, txpool.NewMiningClient(conn))
api := NewEthAPI(NewBaseApi(ff), db, nil, txPool, nil, 5000000)
api := NewEthAPI(NewBaseApi(ff), m.DB, nil, txPool, nil, 5000000)
// Call GetTransactionReceipt for un-protected transaction
var testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
expect := uint64(40)
txn := transaction(expect, 1000000, testKey)
buf := bytes.NewBuffer(nil)
err := txn.MarshalBinary(buf)
require.NoError(t, err)
err = txn.MarshalBinary(buf)
require.NoError(err)
txsCh := make(chan []types.Transaction, 1)
defer close(txsCh)
......@@ -39,13 +81,15 @@ func TestSendRawTransaction(t *testing.T) {
defer api.filters.UnsubscribePendingTxs(id)
_, err = api.SendRawTransaction(ctx, buf.Bytes())
require.NoError(t, err)
select {
case got := <-txsCh:
require.Equal(t, expect, got[0].GetNonce())
case <-time.After(500 * time.Millisecond):
t.Fatalf("timeout waiting for expected notification")
}
require.NoError(err)
got := <-txsCh
require.Equal(expectValue, got[0].GetValue().Uint64())
//send same tx second time and expect error
_, err = api.SendRawTransaction(ctx, buf.Bytes())
require.NotNil(err)
require.Equal("ALREADY_EXISTS: already known", err.Error())
}
func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) types.Transaction {
......
......@@ -20,7 +20,6 @@ import (
"github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/erigon/gointerfaces/txpool"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/mock"
"github.com/ledgerwatch/erigon/turbo/stages"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
......@@ -204,12 +203,18 @@ type IsMiningMock struct{}
func (*IsMiningMock) IsMining() bool { return false }
func createTestGrpcConn(t *testing.T) (context.Context, *grpc.ClientConn) { //nolint
func createTestGrpcConn(t *testing.T, m *stages.MockSentry) (context.Context, *grpc.ClientConn) { //nolint
ctx, cancel := context.WithCancel(context.Background())
ethashApi := ethash.NewFaker().APIs(nil)[1].Service.(*ethash.API)
apis := m.Engine.APIs(nil)
if len(apis) < 1 {
t.Fatal("couldn't instantiate Engine api")
}
ethashApi := apis[1].Service.(*ethash.API)
server := grpc.NewServer()
txpool.RegisterTxpoolServer(server, remotedbserver.NewTxPoolServer(ctx, mock.NewTestTxPool()))
txpool.RegisterTxpoolServer(server, remotedbserver.NewTxPoolServer(ctx, m.TxPoolP2PServer.TxPool))
txpool.RegisterMiningServer(server, remotedbserver.NewMiningServer(ctx, &IsMiningMock{}, ethashApi))
listener := bufconn.Listen(1024 * 1024)
......
......@@ -2,36 +2,82 @@ package commands
import (
"bytes"
"math/big"
"testing"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/filters"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/common/u256"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/erigon/gointerfaces/sentry"
"github.com/ledgerwatch/erigon/gointerfaces/txpool"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/stages"
"github.com/stretchr/testify/require"
)
func TestTxPoolContent(t *testing.T) {
db := createTestKV(t)
ctx, conn := createTestGrpcConn(t)
m, require := stages.Mock(t), require.New(t)
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 1, func(i int, b *core.BlockGen) {
b.SetCoinbase(common.Address{1})
}, false /* intemediateHashes */)
require.NoError(err)
{ // Do 1 step to start txPool
// Send NewBlock message
b, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
Block: chain.TopBlock,
TD: big.NewInt(1), // This is ignored anyway
})
require.NoError(err)
m.ReceiveWg.Add(1)
for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) {
require.NoError(err)
}
// Send all the headers
b, err = rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
RequestId: 1,
BlockHeadersPacket: chain.Headers,
})
require.NoError(err)
m.ReceiveWg.Add(1)
for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_BLOCK_HEADERS_66, Data: b, PeerId: m.PeerId}) {
require.NoError(err)
}
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
notifier := &remotedbserver.Events{}
initialCycle := true
highestSeenHeader := chain.TopBlock.NumberU64()
if err := stages.StageLoopStep(m.Ctx, m.DB, m.Sync, highestSeenHeader, m.ChainConfig, notifier, initialCycle, nil, m.UpdateHead, nil); err != nil {
t.Fatal(err)
}
}
ctx, conn := createTestGrpcConn(t, m)
txPool := txpool.NewTxpoolClient(conn)
ff := filters.New(ctx, nil, txPool, txpool.NewMiningClient(conn))
api := NewTxPoolAPI(NewBaseApi(ff), db, txPool)
api := NewTxPoolAPI(NewBaseApi(ff), m.DB, txPool)
expectValue := uint64(1234)
txn, err := types.SignTx(types.NewTransaction(0, common.Address{1}, uint256.NewInt(expectValue), params.TxGas, u256.Num1, nil), *types.LatestSignerForChainID(m.ChainConfig.ChainID), m.Key)
require.NoError(err)
// Call GetTransactionReceipt for un-protected transaction
var testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
expect := uint64(40)
txn := transaction(expect, 1000000, testKey)
buf := bytes.NewBuffer(nil)
err := txn.MarshalBinary(buf)
require.NoError(t, err)
err = txn.MarshalBinary(buf)
require.NoError(err)
_, err = txPool.Add(ctx, &txpool.AddRequest{RlpTxs: [][]byte{buf.Bytes()}})
require.NoError(t, err)
require.NoError(err)
content, err := api.Content(ctx)
require.NoError(t, err)
require.NoError(err)
sender := (common.Address{}).String()
require.Equal(t, 1, len(content["pending"][sender]))
require.Equal(t, expect, uint64(content["pending"][sender]["40"].Nonce))
sender := m.Address.String()
require.Equal(1, len(content["pending"][sender]))
require.Equal(expectValue, content["pending"][sender]["0"].Value.ToInt().Uint64())
}
......@@ -41,28 +41,29 @@ import (
type MockSentry struct {
proto_sentry.UnimplementedSentryServer
Ctx context.Context
t *testing.T
cancel context.CancelFunc
DB ethdb.RwKV
tmpdir string
Engine consensus.Engine
ChainConfig *params.ChainConfig
Sync *stagedsync.StagedSync
MiningSync *stagedsync.StagedSync
PendingBlocks chan *types.Block
MinedBlocks chan *types.Block
downloader *download.ControlServerImpl
Key *ecdsa.PrivateKey
Genesis *types.Block
SentryClient remote.SentryClient
PeerId *ptypes.H512
UpdateHead func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int)
streams map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer
sentMessages []*proto_sentry.OutboundMessageData
StreamWg sync.WaitGroup
ReceiveWg sync.WaitGroup
Address common.Address
Ctx context.Context
t *testing.T
cancel context.CancelFunc
DB ethdb.RwKV
tmpdir string
Engine consensus.Engine
ChainConfig *params.ChainConfig
Sync *stagedsync.StagedSync
MiningSync *stagedsync.StagedSync
PendingBlocks chan *types.Block
MinedBlocks chan *types.Block
downloader *download.ControlServerImpl
Key *ecdsa.PrivateKey
Genesis *types.Block
SentryClient remote.SentryClient
PeerId *ptypes.H512
TxPoolP2PServer *txpool.P2PServer
UpdateHead func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int)
streams map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer
sentMessages []*proto_sentry.OutboundMessageData
StreamWg sync.WaitGroup
ReceiveWg sync.WaitGroup
Address common.Address
}
// Stream returns stream, waiting if necessary
......@@ -173,7 +174,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
txPoolConfig.StartOnInit = true
txPool := core.NewTxPool(txPoolConfig, mock.ChainConfig, mock.DB)
txSentryClient := remote.NewSentryClientDirect(eth.ETH66, mock)
txPoolP2PServer, err := txpool.NewP2PServer(mock.Ctx, []remote.SentryClient{txSentryClient}, txPool)
mock.TxPoolP2PServer, err = txpool.NewP2PServer(mock.Ctx, []remote.SentryClient{txSentryClient}, txPool)
if err != nil {
if t != nil {
t.Fatal(err)
......@@ -182,11 +183,11 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
}
}
fetchTx := func(PeerId string, hashes []common.Hash) error {
txPoolP2PServer.SendTxsRequest(context.TODO(), PeerId, hashes)
mock.TxPoolP2PServer.SendTxsRequest(context.TODO(), PeerId, hashes)
return nil
}
txPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(txPool.Has, txPool.AddRemotes, fetchTx)
mock.TxPoolP2PServer.TxFetcher = fetcher.NewTxFetcher(txPool.Has, txPool.AddRemotes, fetchTx)
// Committed genesis will be shared between download and mock sentry
_, mock.Genesis, err = core.CommitGenesisBlock(mock.DB, gspec, sm.History)
if _, ok := err.(*params.ConfigCompatError); err != nil && !ok {
......@@ -260,9 +261,9 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
stagedsync.StageTxLookupCfg(mock.DB, mock.tmpdir),
stagedsync.StageTxPoolCfg(mock.DB, txPool, func() {
mock.StreamWg.Add(1)
go txpool.RecvTxMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, txPoolP2PServer.HandleInboundMessage, &mock.ReceiveWg)
go txpool.RecvTxMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, mock.TxPoolP2PServer.HandleInboundMessage, &mock.ReceiveWg)
mock.StreamWg.Wait()
txPoolP2PServer.TxFetcher.Start()
mock.TxPoolP2PServer.TxFetcher.Start()
}),
stagedsync.StageFinishCfg(mock.DB, mock.tmpdir),
true, /* test */
......@@ -300,7 +301,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
t.Cleanup(func() {
mock.cancel()
txPool.Stop()
txPoolP2PServer.TxFetcher.Stop()
mock.TxPoolP2PServer.TxFetcher.Stop()
})
}
return mock
......
......@@ -30,7 +30,7 @@ import (
type P2PServer struct {
ctx context.Context
Sentries []remote.SentryClient
txPool *core.TxPool
TxPool *core.TxPool
TxFetcher *fetcher.TxFetcher
}
......@@ -38,7 +38,7 @@ func NewP2PServer(ctx context.Context, sentries []remote.SentryClient, txPool *c
cs := &P2PServer{
ctx: ctx,
Sentries: sentries,
txPool: txPool,
TxPool: txPool,
}
return cs, nil
......@@ -83,7 +83,7 @@ func (tp *P2PServer) transactions66(ctx context.Context, inreq *proto_sentry.Inb
}
func (tp *P2PServer) transactions65(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry remote.SentryClient) error {
if tp.txPool == nil {
if tp.TxPool == nil {
return nil
}
var query eth.TransactionsPacket
......@@ -94,14 +94,14 @@ func (tp *P2PServer) transactions65(ctx context.Context, inreq *proto_sentry.Inb
}
func (tp *P2PServer) getPooledTransactions66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry remote.SentryClient) error {
if tp.txPool == nil {
if tp.TxPool == nil {
return nil
}
var query eth.GetPooledTransactionsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding GetPooledTransactionsPacket66: %v, data: %x", err, inreq.Data)
}
_, txs := eth.AnswerGetPooledTransactions(tp.txPool, query.GetPooledTransactionsPacket)
_, txs := eth.AnswerGetPooledTransactions(tp.TxPool, query.GetPooledTransactionsPacket)
b, err := rlp.EncodeToBytes(&eth.PooledTransactionsRLPPacket66{
RequestId: query.RequestId,
PooledTransactionsRLPPacket: txs,
......@@ -122,14 +122,14 @@ func (tp *P2PServer) getPooledTransactions66(ctx context.Context, inreq *proto_s
}
func (tp *P2PServer) getPooledTransactions65(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry remote.SentryClient) error {
if tp.txPool == nil {
if tp.TxPool == nil {
return nil
}
var query eth.GetPooledTransactionsPacket
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getPooledTransactions65: %v, data: %x", err, inreq.Data)
}
_, txs := eth.AnswerGetPooledTransactions(tp.txPool, query)
_, txs := eth.AnswerGetPooledTransactions(tp.TxPool, query)
b, err := rlp.EncodeToBytes(eth.PooledTransactionsRLPPacket(txs))
if err != nil {
return fmt.Errorf("encode getPooledTransactions65 response: %v", err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment