From c374447401f3c33653fa4327fa9633042f09091c Mon Sep 17 00:00:00 2001
From: villanuevawill <hello@willvillanueva.com>
Date: Fri, 22 May 2020 21:15:01 -0400
Subject: [PATCH] core: fix queued transaction eviction

Solves issue#20582. Non-executable transactions should not be evicted on each tick if there are no promote transactions or if a pending/reset empties the pending list. Tests and logging expanded to handle these cases in the future.

core/tx_pool: use a ts for each tx in the queue, but only update the heartbeat on promotion or pending replaced

queuedTs proper naming
---
 core/tx_pool.go      |  36 ++++++++----
 core/tx_pool_test.go | 133 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 157 insertions(+), 12 deletions(-)

diff --git a/core/tx_pool.go b/core/tx_pool.go
index 2a4a994d4..168f2671e 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -98,6 +98,7 @@ var (
 	queuedReplaceMeter   = metrics.NewRegisteredMeter("txpool/queued/replace", nil)
 	queuedRateLimitMeter = metrics.NewRegisteredMeter("txpool/queued/ratelimit", nil) // Dropped due to rate limiting
 	queuedNofundsMeter   = metrics.NewRegisteredMeter("txpool/queued/nofunds", nil)   // Dropped due to out-of-funds
+	queuedEvictionMeter  = metrics.NewRegisteredMeter("txpool/queued/eviction", nil)  // Dropped due to lifetime
 
 	// General tx metrics
 	knownTxMeter       = metrics.NewRegisteredMeter("txpool/known", nil)
@@ -231,11 +232,12 @@ type TxPool struct {
 	locals  *accountSet // Set of local transaction to exempt from eviction rules
 	journal *txJournal  // Journal of local transaction to back up to disk
 
-	pending map[common.Address]*txList   // All currently processable transactions
-	queue   map[common.Address]*txList   // Queued but non-processable transactions
-	beats   map[common.Address]time.Time // Last heartbeat from each known account
-	all     *txLookup                    // All transactions to allow lookups
-	priced  *txPricedList                // All transactions sorted by price
+	pending  map[common.Address]*txList   // All currently processable transactions
+	queue    map[common.Address]*txList   // Queued but non-processable transactions
+	beats    map[common.Address]time.Time // Last heartbeat from each known account
+	queuedTs map[common.Hash]time.Time    // Timestamp for when queued transactions were added
+	all      *txLookup                    // All transactions to allow lookups
+	priced   *txPricedList                // All transactions sorted by price
 
 	chainHeadCh     chan ChainHeadEvent
 	chainHeadSub    event.Subscription
@@ -266,6 +268,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
 		pending:         make(map[common.Address]*txList),
 		queue:           make(map[common.Address]*txList),
 		beats:           make(map[common.Address]time.Time),
+		queuedTs:        make(map[common.Hash]time.Time),
 		all:             newTxLookup(),
 		chainHeadCh:     make(chan ChainHeadEvent, chainHeadChanSize),
 		reqResetCh:      make(chan *txpoolResetRequest),
@@ -363,7 +366,10 @@ func (pool *TxPool) loop() {
 				// Any non-locals old enough should be removed
 				if time.Since(pool.beats[addr]) > pool.config.Lifetime {
 					for _, tx := range pool.queue[addr].Flatten() {
-						pool.removeTx(tx.Hash(), true)
+						if time.Since(pool.queuedTs[tx.Hash()]) > pool.config.Lifetime {
+							queuedEvictionMeter.Mark(1)
+							pool.removeTx(tx.Hash(), true)
+						}
 					}
 				}
 			}
@@ -616,6 +622,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
 		pool.all.Add(tx)
 		pool.priced.Put(tx)
 		pool.journalTx(from, tx)
+		pool.beats[from] = time.Now()
 		pool.queueTxEvent(tx)
 		log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
 		return old != nil, nil
@@ -658,16 +665,20 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
 	}
 	// Discard any previous transaction and mark this
 	if old != nil {
-		pool.all.Remove(old.Hash())
+		old_hash := old.Hash()
+		pool.all.Remove(old_hash)
 		pool.priced.Removed(1)
+		delete(pool.queuedTs, old_hash)
 		queuedReplaceMeter.Mark(1)
 	} else {
 		// Nothing was replaced, bump the queued counter
 		queuedGauge.Inc(1)
+		pool.queuedTs[hash] = time.Now()
 	}
 	if pool.all.Get(hash) == nil {
 		pool.all.Add(tx)
 		pool.priced.Put(tx)
+		pool.queuedTs[hash] = time.Now()
 	}
 	return old != nil, nil
 }
@@ -700,7 +711,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
 		// An older transaction was better, discard this
 		pool.all.Remove(hash)
 		pool.priced.Removed(1)
-
+		delete(pool.queuedTs, hash)
 		pendingDiscardMeter.Mark(1)
 		return false
 	}
@@ -708,7 +719,6 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
 	if old != nil {
 		pool.all.Remove(old.Hash())
 		pool.priced.Removed(1)
-
 		pendingReplaceMeter.Mark(1)
 	} else {
 		// Nothing was replaced, bump the pending counter
@@ -721,6 +731,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
 	}
 	// Set the potentially new pending nonce and notify any subsystems of the new tx
 	pool.beats[addr] = time.Now()
+	delete(pool.queuedTs, hash)
 	pool.pendingNonces.set(addr, tx.Nonce()+1)
 
 	return true
@@ -895,7 +906,6 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
 			// If no more pending transactions are left, remove the list
 			if pending.Empty() {
 				delete(pool.pending, addr)
-				delete(pool.beats, addr)
 			}
 			// Postpone any invalidated transactions
 			for _, tx := range invalids {
@@ -913,6 +923,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
 		if removed, _ := future.Remove(tx); removed {
 			// Reduce the queued counter
 			queuedGauge.Dec(1)
+			delete(pool.queuedTs, hash)
 		}
 		if future.Empty() {
 			delete(pool.queue, addr)
@@ -1191,6 +1202,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
 		for _, tx := range forwards {
 			hash := tx.Hash()
 			pool.all.Remove(hash)
+			delete(pool.queuedTs, hash)
 		}
 		log.Trace("Removed old queued transactions", "count", len(forwards))
 		// Drop all transactions that are too costly (low balance or out of gas)
@@ -1198,6 +1210,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
 		for _, tx := range drops {
 			hash := tx.Hash()
 			pool.all.Remove(hash)
+			delete(pool.queuedTs, hash)
 		}
 		log.Trace("Removed unpayable queued transactions", "count", len(drops))
 		queuedNofundsMeter.Mark(int64(len(drops)))
@@ -1220,6 +1233,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
 			for _, tx := range caps {
 				hash := tx.Hash()
 				pool.all.Remove(hash)
+				delete(pool.queuedTs, hash)
 				log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
 			}
 			queuedRateLimitMeter.Mark(int64(len(caps)))
@@ -1414,7 +1428,7 @@ func (pool *TxPool) demoteUnexecutables() {
 			}
 			pendingGauge.Dec(int64(len(gapped)))
 		}
-		// Delete the entire queue entry if it became empty.
+		// Delete the entire pending entry if it became empty.
 		if list.Empty() {
 			delete(pool.pending, addr)
 			delete(pool.beats, addr)
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index c436a309f..3f48ce6ce 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -109,6 +109,10 @@ func validateTxPoolInternals(pool *TxPool) error {
 	if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
 		return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued)
 	}
+	if queued != len(pool.queuedTs) {
+		return fmt.Errorf("total queued transaction count %d != %d queuedTs length", queued, len(pool.queuedTs))
+	}
+
 	// Ensure the next nonce to assign is the correct one
 	for addr, txs := range pool.pending {
 		// Find the last transaction
@@ -868,7 +872,7 @@ func TestTransactionQueueTimeLimitingNoLocals(t *testing.T) {
 func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 	// Reduce the eviction interval to a testable amount
 	defer func(old time.Duration) { evictionInterval = old }(evictionInterval)
-	evictionInterval = time.Second
+	evictionInterval = time.Millisecond * 100
 
 	// Create the pool to test the non-expiration enforcement
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
@@ -905,6 +909,22 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
+
+	// Allow the eviction interval to run
+	time.Sleep(2 * evictionInterval)
+
+	// Transactions should not be evicted from the queue yet since lifetime duration has not passed
+	pending, queued = pool.Stats()
+	if pending != 0 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
+	}
+	if queued != 2 {
+		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
+
 	// Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains
 	time.Sleep(2 * config.Lifetime)
 
@@ -924,6 +944,117 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 	if err := validateTxPoolInternals(pool); err != nil {
 		t.Fatalf("pool internal state corrupted: %v", err)
 	}
+
+	// remove current transactions and increase nonce to prepare for a reset and cleanup
+	statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2)
+	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
+
+	<-pool.requestReset(nil, nil)
+
+	// make sure queue, pending are cleared
+	pending, queued = pool.Stats()
+	if pending != 0 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
+	}
+	if queued != 0 {
+		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
+
+	if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
+		t.Fatalf("failed to add remote transaction: %v", err)
+	}
+	if err := pool.AddLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil {
+		t.Fatalf("failed to add remote transaction: %v", err)
+	}
+	if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil {
+		t.Fatalf("failed to add remote transaction: %v", err)
+	}
+	if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil {
+		t.Fatalf("failed to add remote transaction: %v", err)
+	}
+
+	// wait a short amount of time to add an additional future queued item to test proper eviction when
+	// pending is removed
+	time.Sleep(2 * evictionInterval)
+	if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(1), remote)); err != nil {
+		t.Fatalf("failed to add remote transaction: %v", err)
+	}
+
+	// Make sure future queue and pending have transactions
+	pending, queued = pool.Stats()
+	if pending != 2 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
+	}
+	if queued != 3 {
+		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
+
+	// Trigger a reset to make sure queued items are not evicted
+	statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 3)
+	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 3)
+	<-pool.requestReset(nil, nil)
+
+	// Wait for eviction to run
+	time.Sleep(evictionInterval * 2)
+
+	// a pool reset, empty pending list, or demotion of pending transactions should maintain
+	// queued transactions for non locals and locals alike if the lifetime duration has not passed yet
+	pending, queued = pool.Stats()
+	if pending != 0 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
+	}
+	if queued != 3 {
+		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
+
+	// Wait for the lifetime to run for all transactions except the one that was added later
+	time.Sleep(evictionInterval * 7)
+	pending, queued = pool.Stats()
+	if pending != 0 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
+	}
+	if nolocals {
+		if queued != 1 {
+			t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
+		}
+	} else {
+		if queued != 2 {
+			t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
+		}
+	}
+
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
+
+	// lifetime should pass for the final transaction
+	time.Sleep(evictionInterval * 2)
+
+	pending, queued = pool.Stats()
+	if pending != 0 {
+		t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
+	}
+	if nolocals {
+		if queued != 0 {
+			t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
+		}
+	} else {
+		if queued != 1 {
+			t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
+		}
+	}
+	if err := validateTxPoolInternals(pool); err != nil {
+		t.Fatalf("pool internal state corrupted: %v", err)
+	}
 }
 
 // Tests that even if the transaction count belonging to a single account goes
-- 
GitLab