From 88c696240dc7dfd99588d9e2ef0b04f03a06d1a5 Mon Sep 17 00:00:00 2001
From: gary rong <garyrong0905@gmail.com>
Date: Fri, 11 Dec 2020 17:44:57 +0800
Subject: [PATCH] core/txpool: remove "local" notion from the txpool price heap
 (#21478)

* core: separate the local notion from the pricedHeap

* core: add benchmarks

* core: improve tests

* core: address comments

* core: degrade the panic to error message

* core: fix typo

* core: address comments

* core: address comment

* core: use PEAK instead of POP

* core: address comments
---
 core/tx_list.go      | 152 +++++++++++++++------------------
 core/tx_pool.go      | 195 +++++++++++++++++++++++++++++++++----------
 core/tx_pool_test.go |  69 ++++++++++++---
 3 files changed, 271 insertions(+), 145 deletions(-)

diff --git a/core/tx_list.go b/core/tx_list.go
index cdd3df14c..894640d57 100644
--- a/core/tx_list.go
+++ b/core/tx_list.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/log"
 )
 
 // nonceHeap is a heap.Interface implementation over 64bit unsigned integers for
@@ -439,24 +438,29 @@ func (h *priceHeap) Pop() interface{} {
 }
 
 // txPricedList is a price-sorted heap to allow operating on transactions pool
-// contents in a price-incrementing way.
+// contents in a price-incrementing way. It's built opon the all transactions
+// in txpool but only interested in the remote part. It means only remote transactions
+// will be considered for tracking, sorting, eviction, etc.
 type txPricedList struct {
-	all    *txLookup  // Pointer to the map of all transactions
-	items  *priceHeap // Heap of prices of all the stored transactions
-	stales int        // Number of stale price points to (re-heap trigger)
+	all     *txLookup  // Pointer to the map of all transactions
+	remotes *priceHeap // Heap of prices of all the stored **remote** transactions
+	stales  int        // Number of stale price points to (re-heap trigger)
 }
 
 // newTxPricedList creates a new price-sorted transaction heap.
 func newTxPricedList(all *txLookup) *txPricedList {
 	return &txPricedList{
-		all:   all,
-		items: new(priceHeap),
+		all:     all,
+		remotes: new(priceHeap),
 	}
 }
 
 // Put inserts a new transaction into the heap.
-func (l *txPricedList) Put(tx *types.Transaction) {
-	heap.Push(l.items, tx)
+func (l *txPricedList) Put(tx *types.Transaction, local bool) {
+	if local {
+		return
+	}
+	heap.Push(l.remotes, tx)
 }
 
 // Removed notifies the prices transaction list that an old transaction dropped
@@ -465,121 +469,95 @@ func (l *txPricedList) Put(tx *types.Transaction) {
 func (l *txPricedList) Removed(count int) {
 	// Bump the stale counter, but exit if still too low (< 25%)
 	l.stales += count
-	if l.stales <= len(*l.items)/4 {
+	if l.stales <= len(*l.remotes)/4 {
 		return
 	}
 	// Seems we've reached a critical number of stale transactions, reheap
-	reheap := make(priceHeap, 0, l.all.Count())
-
-	l.stales, l.items = 0, &reheap
-	l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
-		*l.items = append(*l.items, tx)
-		return true
-	})
-	heap.Init(l.items)
+	l.Reheap()
 }
 
 // Cap finds all the transactions below the given price threshold, drops them
 // from the priced list and returns them for further removal from the entire pool.
-func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions {
+//
+// Note: only remote transactions will be considered for eviction.
+func (l *txPricedList) Cap(threshold *big.Int) types.Transactions {
 	drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
-	save := make(types.Transactions, 0, 64)  // Local underpriced transactions to keep
-
-	for len(*l.items) > 0 {
+	for len(*l.remotes) > 0 {
 		// Discard stale transactions if found during cleanup
-		tx := heap.Pop(l.items).(*types.Transaction)
-		if l.all.Get(tx.Hash()) == nil {
+		cheapest := (*l.remotes)[0]
+		if l.all.GetRemote(cheapest.Hash()) == nil { // Removed or migrated
+			heap.Pop(l.remotes)
 			l.stales--
 			continue
 		}
 		// Stop the discards if we've reached the threshold
-		if tx.GasPriceIntCmp(threshold) >= 0 {
-			save = append(save, tx)
+		if cheapest.GasPriceIntCmp(threshold) >= 0 {
 			break
 		}
-		// Non stale transaction found, discard unless local
-		if local.containsTx(tx) {
-			save = append(save, tx)
-		} else {
-			drop = append(drop, tx)
-		}
-	}
-	for _, tx := range save {
-		heap.Push(l.items, tx)
+		heap.Pop(l.remotes)
+		drop = append(drop, cheapest)
 	}
 	return drop
 }
 
 // Underpriced checks whether a transaction is cheaper than (or as cheap as) the
-// lowest priced transaction currently being tracked.
-func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool {
-	// Local transactions cannot be underpriced
-	if local.containsTx(tx) {
-		return false
-	}
+// lowest priced (remote) transaction currently being tracked.
+func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
 	// Discard stale price points if found at the heap start
-	for len(*l.items) > 0 {
-		head := []*types.Transaction(*l.items)[0]
-		if l.all.Get(head.Hash()) == nil {
+	for len(*l.remotes) > 0 {
+		head := []*types.Transaction(*l.remotes)[0]
+		if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
 			l.stales--
-			heap.Pop(l.items)
+			heap.Pop(l.remotes)
 			continue
 		}
 		break
 	}
 	// Check if the transaction is underpriced or not
-	if len(*l.items) == 0 {
-		log.Error("Pricing query for empty pool") // This cannot happen, print to catch programming errors
-		return false
+	if len(*l.remotes) == 0 {
+		return false // There is no remote transaction at all.
 	}
-	cheapest := []*types.Transaction(*l.items)[0]
+	// If the remote transaction is even cheaper than the
+	// cheapest one tracked locally, reject it.
+	cheapest := []*types.Transaction(*l.remotes)[0]
 	return cheapest.GasPriceCmp(tx) >= 0
 }
 
 // Discard finds a number of most underpriced transactions, removes them from the
 // priced list and returns them for further removal from the entire pool.
-func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions {
-	// If we have some local accountset, those will not be discarded
-	if !local.empty() {
-		// In case the list is filled to the brim with 'local' txs, we do this
-		// little check to avoid unpacking / repacking the heap later on, which
-		// is very expensive
-		discardable := 0
-		for _, tx := range *l.items {
-			if !local.containsTx(tx) {
-				discardable++
-			}
-			if discardable >= slots {
-				break
-			}
-		}
-		if slots > discardable {
-			slots = discardable
-		}
-	}
-	if slots == 0 {
-		return nil
-	}
-	drop := make(types.Transactions, 0, slots)               // Remote underpriced transactions to drop
-	save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep
-
-	for len(*l.items) > 0 && slots > 0 {
+//
+// Note local transaction won't be considered for eviction.
+func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool) {
+	drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
+	for len(*l.remotes) > 0 && slots > 0 {
 		// Discard stale transactions if found during cleanup
-		tx := heap.Pop(l.items).(*types.Transaction)
-		if l.all.Get(tx.Hash()) == nil {
+		tx := heap.Pop(l.remotes).(*types.Transaction)
+		if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
 			l.stales--
 			continue
 		}
-		// Non stale transaction found, discard unless local
-		if local.containsTx(tx) {
-			save = append(save, tx)
-		} else {
-			drop = append(drop, tx)
-			slots -= numSlots(tx)
+		// Non stale transaction found, discard it
+		drop = append(drop, tx)
+		slots -= numSlots(tx)
+	}
+	// If we still can't make enough room for the new transaction
+	if slots > 0 && !force {
+		for _, tx := range drop {
+			heap.Push(l.remotes, tx)
 		}
+		return nil, false
 	}
-	for _, tx := range save {
-		heap.Push(l.items, tx)
-	}
-	return drop
+	return drop, true
+}
+
+// Reheap forcibly rebuilds the heap based on the current remote transaction set.
+func (l *txPricedList) Reheap() {
+	reheap := make(priceHeap, 0, l.all.RemoteCount())
+
+	l.stales, l.remotes = 0, &reheap
+	l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
+		*l.remotes = append(*l.remotes, tx)
+		return true
+	}, false, true) // Only iterate remotes
+	heap.Init(l.remotes)
 }
diff --git a/core/tx_pool.go b/core/tx_pool.go
index e3ffe103c..4a17c31ca 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -63,6 +63,10 @@ var (
 	// configured for the transaction pool.
 	ErrUnderpriced = errors.New("transaction underpriced")
 
+	// ErrTxPoolOverflow is returned if the transaction pool is full and can't accpet
+	// another remote transaction.
+	ErrTxPoolOverflow = errors.New("txpool is full")
+
 	// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
 	// with a different one without the required price bump.
 	ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
@@ -105,6 +109,7 @@ var (
 	validTxMeter       = metrics.NewRegisteredMeter("txpool/valid", nil)
 	invalidTxMeter     = metrics.NewRegisteredMeter("txpool/invalid", nil)
 	underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
+	overflowedTxMeter  = metrics.NewRegisteredMeter("txpool/overflowed", nil)
 
 	pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
 	queuedGauge  = metrics.NewRegisteredGauge("txpool/queued", nil)
@@ -421,7 +426,7 @@ func (pool *TxPool) SetGasPrice(price *big.Int) {
 	defer pool.mu.Unlock()
 
 	pool.gasPrice = price
-	for _, tx := range pool.priced.Cap(price, pool.locals) {
+	for _, tx := range pool.priced.Cap(price) {
 		pool.removeTx(tx.Hash(), false)
 	}
 	log.Info("Transaction pool price threshold updated", "price", price)
@@ -536,7 +541,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
 		return ErrInvalidSender
 	}
 	// Drop non-local transactions under our own minimal accepted gas price
-	local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
 	if !local && tx.GasPriceIntCmp(pool.gasPrice) < 0 {
 		return ErrUnderpriced
 	}
@@ -575,22 +579,36 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
 		knownTxMeter.Mark(1)
 		return false, ErrAlreadyKnown
 	}
+	// Make the local flag. If it's from local source or it's from the network but
+	// the sender is marked as local previously, treat it as the local transaction.
+	isLocal := local || pool.locals.containsTx(tx)
+
 	// If the transaction fails basic validation, discard it
-	if err := pool.validateTx(tx, local); err != nil {
+	if err := pool.validateTx(tx, isLocal); err != nil {
 		log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
 		invalidTxMeter.Mark(1)
 		return false, err
 	}
 	// If the transaction pool is full, discard underpriced transactions
-	if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
+	if uint64(pool.all.Count()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
 		// If the new transaction is underpriced, don't accept it
-		if !local && pool.priced.Underpriced(tx, pool.locals) {
+		if !isLocal && pool.priced.Underpriced(tx) {
 			log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
 			underpricedTxMeter.Mark(1)
 			return false, ErrUnderpriced
 		}
-		// New transaction is better than our worse ones, make room for it
-		drop := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), pool.locals)
+		// New transaction is better than our worse ones, make room for it.
+		// If it's a local transaction, forcibly discard all available transactions.
+		// Otherwise if we can't make enough room for new one, abort the operation.
+		drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
+
+		// Special case, we still can't make the room for the new remote one.
+		if !isLocal && !success {
+			log.Trace("Discarding overflown transaction", "hash", hash)
+			overflowedTxMeter.Mark(1)
+			return false, ErrTxPoolOverflow
+		}
+		// Kick out the underpriced remote transactions.
 		for _, tx := range drop {
 			log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
 			underpricedTxMeter.Mark(1)
@@ -612,8 +630,8 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
 			pool.priced.Removed(1)
 			pendingReplaceMeter.Mark(1)
 		}
-		pool.all.Add(tx)
-		pool.priced.Put(tx)
+		pool.all.Add(tx, isLocal)
+		pool.priced.Put(tx, isLocal)
 		pool.journalTx(from, tx)
 		pool.queueTxEvent(tx)
 		log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
@@ -623,18 +641,17 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
 		return old != nil, nil
 	}
 	// New transaction isn't replacing a pending one, push into queue
-	replaced, err = pool.enqueueTx(hash, tx)
+	replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
 	if err != nil {
 		return false, err
 	}
 	// Mark local addresses and journal local transactions
-	if local {
-		if !pool.locals.contains(from) {
-			log.Info("Setting new local account", "address", from)
-			pool.locals.add(from)
-		}
+	if local && !pool.locals.contains(from) {
+		log.Info("Setting new local account", "address", from)
+		pool.locals.add(from)
+		pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
 	}
-	if local || pool.locals.contains(from) {
+	if isLocal {
 		localGauge.Inc(1)
 	}
 	pool.journalTx(from, tx)
@@ -646,7 +663,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
 // enqueueTx inserts a new transaction into the non-executable transaction queue.
 //
 // Note, this method assumes the pool lock is held!
-func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
+func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
 	// Try to insert the transaction into the future queue
 	from, _ := types.Sender(pool.signer, tx) // already validated
 	if pool.queue[from] == nil {
@@ -667,9 +684,14 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
 		// Nothing was replaced, bump the queued counter
 		queuedGauge.Inc(1)
 	}
-	if pool.all.Get(hash) == nil {
-		pool.all.Add(tx)
-		pool.priced.Put(tx)
+	// If the transaction isn't in lookup set but it's expected to be there,
+	// show the error log.
+	if pool.all.Get(hash) == nil && !addAll {
+		log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
+	}
+	if addAll {
+		pool.all.Add(tx, local)
+		pool.priced.Put(tx, local)
 	}
 	// If we never record the heartbeat, do it right now.
 	if _, exist := pool.beats[from]; !exist {
@@ -718,11 +740,6 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
 		// Nothing was replaced, bump the pending counter
 		pendingGauge.Inc(1)
 	}
-	// Failsafe to work around direct pending inserts (tests)
-	if pool.all.Get(hash) == nil {
-		pool.all.Add(tx)
-		pool.priced.Put(tx)
-	}
 	// Set the potentially new pending nonce and notify any subsystems of the new tx
 	pool.pendingNonces.set(addr, tx.Nonce()+1)
 
@@ -904,7 +921,8 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
 			}
 			// Postpone any invalidated transactions
 			for _, tx := range invalids {
-				pool.enqueueTx(tx.Hash(), tx)
+				// Internal shuffle shouldn't touch the lookup set.
+				pool.enqueueTx(tx.Hash(), tx, false, false)
 			}
 			// Update the account nonce if needed
 			pool.pendingNonces.setIfLower(addr, tx.Nonce())
@@ -1408,7 +1426,9 @@ func (pool *TxPool) demoteUnexecutables() {
 		for _, tx := range invalids {
 			hash := tx.Hash()
 			log.Trace("Demoting pending transaction", "hash", hash)
-			pool.enqueueTx(hash, tx)
+
+			// Internal shuffle shouldn't touch the lookup set.
+			pool.enqueueTx(hash, tx, false, false)
 		}
 		pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
 		if pool.locals.contains(addr) {
@@ -1420,7 +1440,9 @@ func (pool *TxPool) demoteUnexecutables() {
 			for _, tx := range gapped {
 				hash := tx.Hash()
 				log.Error("Demoting invalidated transaction", "hash", hash)
-				pool.enqueueTx(hash, tx)
+
+				// Internal shuffle shouldn't touch the lookup set.
+				pool.enqueueTx(hash, tx, false, false)
 			}
 			pendingGauge.Dec(int64(len(gapped)))
 			// This might happen in a reorg, so log it to the metering
@@ -1519,8 +1541,8 @@ func (as *accountSet) merge(other *accountSet) {
 	as.cache = nil
 }
 
-// txLookup is used internally by TxPool to track transactions while allowing lookup without
-// mutex contention.
+// txLookup is used internally by TxPool to track transactions while allowing
+// lookup without mutex contention.
 //
 // Note, although this type is properly protected against concurrent access, it
 // is **not** a type that should ever be mutated or even exposed outside of the
@@ -1528,27 +1550,43 @@ func (as *accountSet) merge(other *accountSet) {
 // internal mechanisms. The sole purpose of the type is to permit out-of-bound
 // peeking into the pool in TxPool.Get without having to acquire the widely scoped
 // TxPool.mu mutex.
+//
+// This lookup set combines the notion of "local transactions", which is useful
+// to build upper-level structure.
 type txLookup struct {
-	all   map[common.Hash]*types.Transaction
-	slots int
-	lock  sync.RWMutex
+	slots   int
+	lock    sync.RWMutex
+	locals  map[common.Hash]*types.Transaction
+	remotes map[common.Hash]*types.Transaction
 }
 
 // newTxLookup returns a new txLookup structure.
 func newTxLookup() *txLookup {
 	return &txLookup{
-		all: make(map[common.Hash]*types.Transaction),
+		locals:  make(map[common.Hash]*types.Transaction),
+		remotes: make(map[common.Hash]*types.Transaction),
 	}
 }
 
-// Range calls f on each key and value present in the map.
-func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
+// Range calls f on each key and value present in the map. The callback passed
+// should return the indicator whether the iteration needs to be continued.
+// Callers need to specify which set (or both) to be iterated.
+func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 
-	for key, value := range t.all {
-		if !f(key, value) {
-			break
+	if local {
+		for key, value := range t.locals {
+			if !f(key, value, true) {
+				return
+			}
+		}
+	}
+	if remote {
+		for key, value := range t.remotes {
+			if !f(key, value, false) {
+				return
+			}
 		}
 	}
 }
@@ -1558,15 +1596,50 @@ func (t *txLookup) Get(hash common.Hash) *types.Transaction {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 
-	return t.all[hash]
+	if tx := t.locals[hash]; tx != nil {
+		return tx
+	}
+	return t.remotes[hash]
 }
 
-// Count returns the current number of items in the lookup.
+// GetLocal returns a transaction if it exists in the lookup, or nil if not found.
+func (t *txLookup) GetLocal(hash common.Hash) *types.Transaction {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return t.locals[hash]
+}
+
+// GetRemote returns a transaction if it exists in the lookup, or nil if not found.
+func (t *txLookup) GetRemote(hash common.Hash) *types.Transaction {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return t.remotes[hash]
+}
+
+// Count returns the current number of transactions in the lookup.
 func (t *txLookup) Count() int {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 
-	return len(t.all)
+	return len(t.locals) + len(t.remotes)
+}
+
+// LocalCount returns the current number of local transactions in the lookup.
+func (t *txLookup) LocalCount() int {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return len(t.locals)
+}
+
+// RemoteCount returns the current number of remote transactions in the lookup.
+func (t *txLookup) RemoteCount() int {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	return len(t.remotes)
 }
 
 // Slots returns the current number of slots used in the lookup.
@@ -1578,14 +1651,18 @@ func (t *txLookup) Slots() int {
 }
 
 // Add adds a transaction to the lookup.
-func (t *txLookup) Add(tx *types.Transaction) {
+func (t *txLookup) Add(tx *types.Transaction, local bool) {
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
 	t.slots += numSlots(tx)
 	slotsGauge.Update(int64(t.slots))
 
-	t.all[tx.Hash()] = tx
+	if local {
+		t.locals[tx.Hash()] = tx
+	} else {
+		t.remotes[tx.Hash()] = tx
+	}
 }
 
 // Remove removes a transaction from the lookup.
@@ -1593,10 +1670,36 @@ func (t *txLookup) Remove(hash common.Hash) {
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
-	t.slots -= numSlots(t.all[hash])
+	tx, ok := t.locals[hash]
+	if !ok {
+		tx, ok = t.remotes[hash]
+	}
+	if !ok {
+		log.Error("No transaction found to be deleted", "hash", hash)
+		return
+	}
+	t.slots -= numSlots(tx)
 	slotsGauge.Update(int64(t.slots))
 
-	delete(t.all, hash)
+	delete(t.locals, hash)
+	delete(t.remotes, hash)
+}
+
+// RemoteToLocals migrates the transactions belongs to the given locals to locals
+// set. The assumption is held the locals set is thread-safe to be used.
+func (t *txLookup) RemoteToLocals(locals *accountSet) int {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	var migrated int
+	for hash, tx := range t.remotes {
+		if locals.containsTx(tx) {
+			t.locals[hash] = tx
+			delete(t.remotes, hash)
+			migrated += 1
+		}
+	}
+	return migrated
 }
 
 // numSlots calculates the number of slots needed for a single transaction.
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 246b3977d..47d3830b0 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -107,10 +107,11 @@ func validateTxPoolInternals(pool *TxPool) error {
 	if total := pool.all.Count(); total != pending+queued {
 		return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued)
 	}
-	if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
-		return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued)
+	pool.priced.Reheap()
+	priced, remote := pool.priced.remotes.Len(), pool.all.RemoteCount()
+	if priced != remote {
+		return fmt.Errorf("total priced transaction count %d != %d", priced, remote)
 	}
-
 	// Ensure the next nonce to assign is the correct one
 	for addr, txs := range pool.pending {
 		// Find the last transaction
@@ -280,7 +281,7 @@ func TestTransactionQueue(t *testing.T) {
 	pool.currentState.AddBalance(from, big.NewInt(1000))
 	<-pool.requestReset(nil, nil)
 
-	pool.enqueueTx(tx.Hash(), tx)
+	pool.enqueueTx(tx.Hash(), tx, false, true)
 	<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
 	if len(pool.pending) != 1 {
 		t.Error("expected valid txs to be 1 is", len(pool.pending))
@@ -289,7 +290,7 @@ func TestTransactionQueue(t *testing.T) {
 	tx = transaction(1, 100, key)
 	from, _ = deriveSender(tx)
 	pool.currentState.SetNonce(from, 2)
-	pool.enqueueTx(tx.Hash(), tx)
+	pool.enqueueTx(tx.Hash(), tx, false, true)
 
 	<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
 	if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
@@ -313,9 +314,9 @@ func TestTransactionQueue2(t *testing.T) {
 	pool.currentState.AddBalance(from, big.NewInt(1000))
 	pool.reset(nil, nil)
 
-	pool.enqueueTx(tx1.Hash(), tx1)
-	pool.enqueueTx(tx2.Hash(), tx2)
-	pool.enqueueTx(tx3.Hash(), tx3)
+	pool.enqueueTx(tx1.Hash(), tx1, false, true)
+	pool.enqueueTx(tx2.Hash(), tx2, false, true)
+	pool.enqueueTx(tx3.Hash(), tx3, false, true)
 
 	pool.promoteExecutables([]common.Address{from})
 	if len(pool.pending) != 1 {
@@ -488,12 +489,21 @@ func TestTransactionDropping(t *testing.T) {
 		tx11 = transaction(11, 200, key)
 		tx12 = transaction(12, 300, key)
 	)
+	pool.all.Add(tx0, false)
+	pool.priced.Put(tx0, false)
 	pool.promoteTx(account, tx0.Hash(), tx0)
+
+	pool.all.Add(tx1, false)
+	pool.priced.Put(tx1, false)
 	pool.promoteTx(account, tx1.Hash(), tx1)
+
+	pool.all.Add(tx2, false)
+	pool.priced.Put(tx2, false)
 	pool.promoteTx(account, tx2.Hash(), tx2)
-	pool.enqueueTx(tx10.Hash(), tx10)
-	pool.enqueueTx(tx11.Hash(), tx11)
-	pool.enqueueTx(tx12.Hash(), tx12)
+
+	pool.enqueueTx(tx10.Hash(), tx10, false, true)
+	pool.enqueueTx(tx11.Hash(), tx11, false, true)
+	pool.enqueueTx(tx12.Hash(), tx12, false, true)
 
 	// Check that pre and post validations leave the pool as is
 	if pool.pending[account].Len() != 3 {
@@ -1964,7 +1974,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
 
 	for i := 0; i < size; i++ {
 		tx := transaction(uint64(1+i), 100000, key)
-		pool.enqueueTx(tx.Hash(), tx)
+		pool.enqueueTx(tx.Hash(), tx, false, true)
 	}
 	// Benchmark the speed of pool validation
 	b.ResetTimer()
@@ -2007,3 +2017,38 @@ func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
 		}
 	}
 }
+
+func BenchmarkInsertRemoteWithAllLocals(b *testing.B) {
+	// Allocate keys for testing
+	key, _ := crypto.GenerateKey()
+	account := crypto.PubkeyToAddress(key.PublicKey)
+
+	remoteKey, _ := crypto.GenerateKey()
+	remoteAddr := crypto.PubkeyToAddress(remoteKey.PublicKey)
+
+	locals := make([]*types.Transaction, 4096+1024) // Occupy all slots
+	for i := 0; i < len(locals); i++ {
+		locals[i] = transaction(uint64(i), 100000, key)
+	}
+	remotes := make([]*types.Transaction, 1000)
+	for i := 0; i < len(remotes); i++ {
+		remotes[i] = pricedTransaction(uint64(i), 100000, big.NewInt(2), remoteKey) // Higher gasprice
+	}
+	// Benchmark importing the transactions into the queue
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		pool, _ := setupTxPool()
+		pool.currentState.AddBalance(account, big.NewInt(100000000))
+		for _, local := range locals {
+			pool.AddLocal(local)
+		}
+		b.StartTimer()
+		// Assign a high enough balance for testing
+		pool.currentState.AddBalance(remoteAddr, big.NewInt(100000000))
+		for i := 0; i < len(remotes); i++ {
+			pool.AddRemotes([]*types.Transaction{remotes[i]})
+		}
+		pool.Stop()
+	}
+}
-- 
GitLab