From 0e7d019e0eb3a4929c504ac5899fbe55c4227f7e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Fri, 20 Oct 2017 12:34:43 +0300
Subject: [PATCH] core: fire tx event on replace, expand tests

---
 core/bloombits/matcher_test.go |   2 +-
 core/tx_pool.go                |   5 +
 core/tx_pool_test.go           | 188 ++++++++++++++++++++++++++++++++-
 3 files changed, 191 insertions(+), 4 deletions(-)

diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go
index f95d0ea9e..2e15e7aac 100644
--- a/core/bloombits/matcher_test.go
+++ b/core/bloombits/matcher_test.go
@@ -85,7 +85,7 @@ func TestWildcardMatcher(t *testing.T) {
 }
 
 // makeRandomIndexes generates a random filter system, composed on multiple filter
-// criteria, each having one bloom list component for the address and arbitrarilly
+// criteria, each having one bloom list component for the address and arbitrarily
 // many topic bloom list components.
 func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
 	res := make([][]bloomIndexes, len(lengths))
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 0ad765179..a705e36d6 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -640,6 +640,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
 		pool.journalTx(from, tx)
 
 		log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
+
+		// We've directly injected a replacement transaction, notify subsystems
+		go pool.txFeed.Send(TxPreEvent{tx})
+
 		return old != nil, nil
 	}
 	// New transaction isn't replacing a pending one, push into queue
@@ -729,6 +733,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
 	// Set the potentially new pending nonce and notify any subsystems of the new tx
 	pool.beats[addr] = time.Now()
 	pool.pendingState.SetNonce(addr, tx.Nonce()+1)
+
 	go pool.txFeed.Send(TxPreEvent{tx})
 }
 
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 17d736877..eec128cba 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -117,6 +117,28 @@ func validateTxPoolInternals(pool *TxPool) error {
 	return nil
 }
 
+// validateEvents checks that the correct number of transaction addition events
+// were fired on the pool's event feed.
+func validateEvents(events chan TxPreEvent, count int) error {
+	for i := 0; i < count; i++ {
+		select {
+		case <-events:
+		case <-time.After(time.Second):
+			return fmt.Errorf("event #%d not fired", i)
+		}
+	}
+	select {
+	case tx := <-events:
+		return fmt.Errorf("more than %d events fired: %v", count, tx.Tx)
+
+	case <-time.After(50 * time.Millisecond):
+		// This branch should be "default", but it's a data race between goroutines,
+		// reading the event channel and pushng into it, so better wait a bit ensuring
+		// really nothing gets injected.
+	}
+	return nil
+}
+
 func deriveSender(tx *types.Transaction) (common.Address, error) {
 	return types.Sender(types.HomesteadSigner{}, tx)
 }
@@ -149,7 +171,9 @@ func (c *testChain) State() (*state.StateDB, error) {
 // This test simulates a scenario where a new block is imported during a
 // state reset and tests whether the pending state is in sync with the
 // block head event that initiated the resetState().
-func TestStateChangeDuringPoolReset(t *testing.T) {
+func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
+	t.Parallel()
+
 	var (
 		db, _      = ethdb.NewMemDatabase()
 		key, _     = crypto.GenerateKey()
@@ -201,6 +225,8 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
 }
 
 func TestInvalidTransactions(t *testing.T) {
+	t.Parallel()
+
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
@@ -236,6 +262,8 @@ func TestInvalidTransactions(t *testing.T) {
 }
 
 func TestTransactionQueue(t *testing.T) {
+	t.Parallel()
+
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
@@ -287,7 +315,9 @@ func TestTransactionQueue(t *testing.T) {
 	}
 }
 
-func TestNegativeValue(t *testing.T) {
+func TestTransactionNegativeValue(t *testing.T) {
+	t.Parallel()
+
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
@@ -300,6 +330,8 @@ func TestNegativeValue(t *testing.T) {
 }
 
 func TestTransactionChainFork(t *testing.T) {
+	t.Parallel()
+
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
@@ -328,6 +360,8 @@ func TestTransactionChainFork(t *testing.T) {
 }
 
 func TestTransactionDoubleNonce(t *testing.T) {
+	t.Parallel()
+
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
@@ -376,7 +410,9 @@ func TestTransactionDoubleNonce(t *testing.T) {
 	}
 }
 
-func TestMissingNonce(t *testing.T) {
+func TestTransactionMissingNonce(t *testing.T) {
+	t.Parallel()
+
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
@@ -398,6 +434,8 @@ func TestMissingNonce(t *testing.T) {
 }
 
 func TestTransactionNonceRecovery(t *testing.T) {
+	t.Parallel()
+
 	const n = 10
 	pool, key := setupTxPool()
 	defer pool.Stop()
@@ -422,6 +460,8 @@ func TestTransactionNonceRecovery(t *testing.T) {
 // Tests that if an account runs out of funds, any pending and queued transactions
 // are dropped.
 func TestTransactionDropping(t *testing.T) {
+	t.Parallel()
+
 	// Create a test account and fund it
 	pool, key := setupTxPool()
 	defer pool.Stop()
@@ -515,6 +555,8 @@ func TestTransactionDropping(t *testing.T) {
 // of fund), all consecutive (still valid, but not executable) transactions are
 // postponed back into the future queue to prevent broadcasting them.
 func TestTransactionPostponing(t *testing.T) {
+	t.Parallel()
+
 	// Create a test account and fund it
 	pool, key := setupTxPool()
 	defer pool.Stop()
@@ -586,9 +628,68 @@ func TestTransactionPostponing(t *testing.T) {
 	}
 }
 
+// Tests that if the transaction pool has both executable and non-executable
+// transactions from an origin account, filling the nonce gap moves all queued
+// ones into the pending pool.
+func TestTransactionGapFilling(t *testing.T) {
+	t.Parallel()
+
+	// Create a test account and fund it
+	pool, key := setupTxPool()
+	defer pool.Stop()
+
+	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
+
+	// Keep track of transaction events to ensure all executables get announced
+	events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
+	sub := pool.txFeed.Subscribe(events)
+	defer sub.Unsubscribe()
+
+	// Create a pending and a queued transaction with a nonce-gap in between
+	if err := pool.AddRemote(transaction(0, big.NewInt(100000), key)); err != nil {
+		t.Fatalf("failed to add pending transaction: %v", err)
+	}
+	if err := pool.AddRemote(transaction(2, big.NewInt(100000), key)); err != nil {
+		t.Fatalf("failed to add queued transaction: %v", err)
+	}
+	pending, queued := pool.Stats()
+	if pending != 1 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
+	}
+	if queued != 1 {
+		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
+	}
+	if err := validateEvents(events, 1); err != nil {
+		t.Fatalf("original event firing failed: %v", err)
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
+	// Fill the nonce gap and ensure all transactions become pending
+	if err := pool.AddRemote(transaction(1, big.NewInt(100000), key)); err != nil {
+		t.Fatalf("failed to add gapped transaction: %v", err)
+	}
+	pending, queued = pool.Stats()
+	if pending != 3 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
+	}
+	if queued != 0 {
+		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
+	}
+	if err := validateEvents(events, 2); err != nil {
+		t.Fatalf("gap-filling event firing failed: %v", err)
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
+}
+
 // Tests that if the transaction count belonging to a single account goes above
 // some threshold, the higher transactions are dropped to prevent DOS attacks.
 func TestTransactionQueueAccountLimiting(t *testing.T) {
+	t.Parallel()
+
 	// Create a test account and fund it
 	pool, key := setupTxPool()
 	defer pool.Stop()
@@ -632,6 +733,8 @@ func TestTransactionQueueGlobalLimitingNoLocals(t *testing.T) {
 }
 
 func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
+	t.Parallel()
+
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -782,6 +885,8 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 // above some threshold, as long as the transactions are executable, they are
 // accepted.
 func TestTransactionPendingLimiting(t *testing.T) {
+	t.Parallel()
+
 	// Create a test account and fund it
 	pool, key := setupTxPool()
 	defer pool.Stop()
@@ -789,6 +894,11 @@ func TestTransactionPendingLimiting(t *testing.T) {
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
 	pool.currentState.AddBalance(account, big.NewInt(1000000))
 
+	// Keep track of transaction events to ensure all executables get announced
+	events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
+	sub := pool.txFeed.Subscribe(events)
+	defer sub.Unsubscribe()
+
 	// Keep queuing up transactions and make sure all above a limit are dropped
 	for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
 		if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil {
@@ -804,6 +914,12 @@ func TestTransactionPendingLimiting(t *testing.T) {
 	if len(pool.all) != int(testTxPoolConfig.AccountQueue+5) {
 		t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5)
 	}
+	if err := validateEvents(events, int(testTxPoolConfig.AccountQueue+5)); err != nil {
+		t.Fatalf("event firing failed: %v", err)
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
 }
 
 // Tests that the transaction limits are enforced the same way irrelevant whether
@@ -812,6 +928,8 @@ func TestTransactionQueueLimitingEquivalency(t *testing.T)   { testTransactionLi
 func TestTransactionPendingLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 0) }
 
 func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
+	t.Parallel()
+
 	// Add a batch of transactions to a pool one by one
 	pool1, key1 := setupTxPool()
 	defer pool1.Stop()
@@ -859,6 +977,8 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
 // some hard threshold, the higher transactions are dropped to prevent DOS
 // attacks.
 func TestTransactionPendingGlobalLimiting(t *testing.T) {
+	t.Parallel()
+
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -904,6 +1024,8 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
 
 // Tests that if transactions start being capped, transactions are also removed from 'all'
 func TestTransactionCapClearsFromAll(t *testing.T) {
+	t.Parallel()
+
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -937,6 +1059,8 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
 // some hard threshold, if they are under the minimum guaranteed slot count then
 // the transactions are still kept.
 func TestTransactionPendingMinimumAllowance(t *testing.T) {
+	t.Parallel()
+
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -984,6 +1108,8 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
 //
 // Note, local transactions are never allowed to be dropped.
 func TestTransactionPoolRepricing(t *testing.T) {
+	t.Parallel()
+
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -992,6 +1118,11 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
+	// Keep track of transaction events to ensure all executables get announced
+	events := make(chan TxPreEvent, 32)
+	sub := pool.txFeed.Subscribe(events)
+	defer sub.Unsubscribe()
+
 	// Create a number of test accounts and fund them
 	keys := make([]*ecdsa.PrivateKey, 3)
 	for i := 0; i < len(keys); i++ {
@@ -1022,6 +1153,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	if queued != 3 {
 		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
 	}
+	if err := validateEvents(events, 4); err != nil {
+		t.Fatalf("original event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1035,6 +1169,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	if queued != 3 {
 		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
 	}
+	if err := validateEvents(events, 0); err != nil {
+		t.Fatalf("reprice event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1045,6 +1182,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced {
 		t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
 	}
+	if err := validateEvents(events, 0); err != nil {
+		t.Fatalf("post-reprice event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1056,6 +1196,9 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	if pending, _ = pool.Stats(); pending != 3 {
 		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
 	}
+	if err := validateEvents(events, 1); err != nil {
+		t.Fatalf("post-reprice local event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1064,6 +1207,8 @@ func TestTransactionPoolRepricing(t *testing.T) {
 // Tests that setting the transaction pool gas price to a higher value does not
 // remove local transactions.
 func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
+	t.Parallel()
+
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -1125,6 +1270,8 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
 //
 // Note, local transactions are never allowed to be dropped.
 func TestTransactionPoolUnderpricing(t *testing.T) {
+	t.Parallel()
+
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -1137,6 +1284,11 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
+	// Keep track of transaction events to ensure all executables get announced
+	events := make(chan TxPreEvent, 32)
+	sub := pool.txFeed.Subscribe(events)
+	defer sub.Unsubscribe()
+
 	// Create a number of test accounts and fund them
 	keys := make([]*ecdsa.PrivateKey, 3)
 	for i := 0; i < len(keys); i++ {
@@ -1164,6 +1316,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	if queued != 1 {
 		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
 	}
+	if err := validateEvents(events, 3); err != nil {
+		t.Fatalf("original event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1188,6 +1343,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	if queued != 2 {
 		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
 	}
+	if err := validateEvents(events, 2); err != nil {
+		t.Fatalf("additional event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1203,6 +1361,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	if queued != 2 {
 		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
 	}
+	if err := validateEvents(events, 1); err != nil {
+		t.Fatalf("local event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1211,6 +1372,8 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 // Tests that the pool rejects replacement transactions that don't meet the minimum
 // price bump required.
 func TestTransactionReplacement(t *testing.T) {
+	t.Parallel()
+
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
@@ -1219,6 +1382,11 @@ func TestTransactionReplacement(t *testing.T) {
 	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
+	// Keep track of transaction events to ensure all executables get announced
+	events := make(chan TxPreEvent, 32)
+	sub := pool.txFeed.Subscribe(events)
+	defer sub.Unsubscribe()
+
 	// Create a test account to add transactions with
 	key, _ := crypto.GenerateKey()
 	pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
@@ -1236,6 +1404,9 @@ func TestTransactionReplacement(t *testing.T) {
 	if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(2), key)); err != nil {
 		t.Fatalf("failed to replace original cheap pending transaction: %v", err)
 	}
+	if err := validateEvents(events, 2); err != nil {
+		t.Fatalf("cheap replacement event firing failed: %v", err)
+	}
 
 	if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(price), key)); err != nil {
 		t.Fatalf("failed to add original proper pending transaction: %v", err)
@@ -1246,6 +1417,9 @@ func TestTransactionReplacement(t *testing.T) {
 	if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil {
 		t.Fatalf("failed to replace original proper pending transaction: %v", err)
 	}
+	if err := validateEvents(events, 2); err != nil {
+		t.Fatalf("proper replacement event firing failed: %v", err)
+	}
 	// Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
 	if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), key)); err != nil {
 		t.Fatalf("failed to add original queued transaction: %v", err)
@@ -1266,6 +1440,10 @@ func TestTransactionReplacement(t *testing.T) {
 	if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil {
 		t.Fatalf("failed to replace original queued transaction: %v", err)
 	}
+
+	if err := validateEvents(events, 0); err != nil {
+		t.Fatalf("queued replacement event firing failed: %v", err)
+	}
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
@@ -1277,6 +1455,8 @@ func TestTransactionJournaling(t *testing.T)         { testTransactionJournaling
 func TestTransactionJournalingNoLocals(t *testing.T) { testTransactionJournaling(t, true) }
 
 func testTransactionJournaling(t *testing.T, nolocals bool) {
+	t.Parallel()
+
 	// Create a temporary file for the journal
 	file, err := ioutil.TempFile("", "")
 	if err != nil {
@@ -1335,6 +1515,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	pool.Stop()
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
 	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
+
 	pool = NewTxPool(config, params.TestChainConfig, blockchain)
 
 	pending, queued = pool.Stats()
@@ -1358,6 +1539,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	pool.lockedReset(nil, nil)
 	time.Sleep(2 * config.Rejournal)
 	pool.Stop()
+
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
 	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 	pool = NewTxPool(config, params.TestChainConfig, blockchain)
-- 
GitLab