From da7d57e07c04dcbb7cc20b35f6606ef3f4c400e3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Mon, 4 Sep 2017 22:35:00 +0300
Subject: [PATCH] core: make txpool operate on immutable state

---
 core/blockchain.go         |  12 --
 core/error.go              |   4 +
 core/state_transition.go   |   8 +-
 core/tx_list.go            |   1 +
 core/tx_pool.go            | 203 +++++++++++++++------------
 core/tx_pool_test.go       | 281 +++++++++++++------------------------
 eth/api_backend.go         |   8 --
 internal/ethapi/api.go     |   1 -
 internal/ethapi/backend.go |   1 -
 miner/worker.go            |  21 ++-
 10 files changed, 233 insertions(+), 307 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index f3ca4e08c..0bb12fc19 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -81,7 +81,6 @@ type BlockChain struct {
 
 	hc            *HeaderChain
 	chainDb       ethdb.Database
-	rmTxFeed      event.Feed
 	rmLogsFeed    event.Feed
 	chainFeed     event.Feed
 	chainSideFeed event.Feed
@@ -1194,15 +1193,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	for _, tx := range diff {
 		DeleteTxLookupEntry(bc.chainDb, tx.Hash())
 	}
-	// Must be posted in a goroutine because of the transaction pool trying
-	// to acquire the chain manager lock
-	if len(diff) > 0 {
-		go bc.rmTxFeed.Send(RemovedTransactionEvent{diff})
-	}
 	if len(deletedLogs) > 0 {
 		go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
 	}
-
 	if len(oldChain) > 0 {
 		go func() {
 			for _, block := range oldChain {
@@ -1401,11 +1394,6 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config }
 // Engine retrieves the blockchain's consensus engine.
 func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }
 
-// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent.
-func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
-	return bc.scope.Track(bc.rmTxFeed.Subscribe(ch))
-}
-
 // SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
 func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
 	return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
diff --git a/core/error.go b/core/error.go
index 9ac4fff51..410eca1e1 100644
--- a/core/error.go
+++ b/core/error.go
@@ -28,4 +28,8 @@ var (
 
 	// ErrBlacklistedHash is returned if a block to import is on the blacklist.
 	ErrBlacklistedHash = errors.New("blacklisted hash")
+
+	// ErrNonceTooHigh is returned if the nonce of a transaction is higher than the
+	// next one expected based on the local chain.
+	ErrNonceTooHigh = errors.New("nonce too high")
 )
diff --git a/core/state_transition.go b/core/state_transition.go
index bab4540be..e7a068589 100644
--- a/core/state_transition.go
+++ b/core/state_transition.go
@@ -18,7 +18,6 @@ package core
 
 import (
 	"errors"
-	"fmt"
 	"math/big"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -197,8 +196,11 @@ func (st *StateTransition) preCheck() error {
 
 	// Make sure this transaction's nonce is correct
 	if msg.CheckNonce() {
-		if n := st.state.GetNonce(sender.Address()); n != msg.Nonce() {
-			return fmt.Errorf("invalid nonce: have %d, expected %d", msg.Nonce(), n)
+		nonce := st.state.GetNonce(sender.Address())
+		if nonce < msg.Nonce() {
+			return ErrNonceTooHigh
+		} else if nonce > msg.Nonce() {
+			return ErrNonceTooLow
 		}
 	}
 	return st.buyGas()
diff --git a/core/tx_list.go b/core/tx_list.go
index 1087970fa..2935929d7 100644
--- a/core/tx_list.go
+++ b/core/tx_list.go
@@ -298,6 +298,7 @@ func (l *txList) Filter(costLimit, gasLimit *big.Int) (types.Transactions, types
 
 	// If the list was strict, filter anything above the lowest nonce
 	var invalids types.Transactions
+
 	if l.strict && len(removed) > 0 {
 		lowest := uint64(math.MaxUint64)
 		for _, tx := range removed {
diff --git a/core/tx_pool.go b/core/tx_pool.go
index d835c94d1..f41fbe069 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -105,10 +105,11 @@ var (
 // blockChain provides the state of blockchain and current gas limit to do
 // some pre checks in tx pool and event subscribers.
 type blockChain interface {
-	State() (*state.StateDB, error)
-	GasLimit() *big.Int
+	CurrentHeader() *types.Header
 	SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
-	SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription
+
+	GetBlock(hash common.Hash, number uint64) *types.Block
+	StateAt(root common.Hash) (*state.StateDB, error)
 }
 
 // TxPoolConfig are the configuration parameters of the transaction pool.
@@ -174,18 +175,19 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
 type TxPool struct {
 	config       TxPoolConfig
 	chainconfig  *params.ChainConfig
-	blockChain   blockChain
-	pendingState *state.ManagedState
+	chain        blockChain
 	gasPrice     *big.Int
 	txFeed       event.Feed
 	scope        event.SubscriptionScope
 	chainHeadCh  chan ChainHeadEvent
 	chainHeadSub event.Subscription
-	rmTxCh       chan RemovedTransactionEvent
-	rmTxSub      event.Subscription
 	signer       types.Signer
 	mu           sync.RWMutex
 
+	currentState  *state.StateDB      // Current state in the blockchain head
+	pendingState  *state.ManagedState // Pending state tracking virtual nonces
+	currentMaxGas *big.Int            // Current gas limit for transaction caps
+
 	locals  *accountSet // Set of local transaction to exepmt from evicion rules
 	journal *txJournal  // Journal of local transaction to back up to disk
 
@@ -202,28 +204,26 @@ type TxPool struct {
 
 // NewTxPool creates a new transaction pool to gather, sort and filter inbound
 // trnsactions from the network.
-func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool {
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
 	// Sanitize the input to ensure no vulnerable gas prices are set
 	config = (&config).sanitize()
 
 	// Create the transaction pool with its initial settings
 	pool := &TxPool{
-		config:       config,
-		chainconfig:  chainconfig,
-		blockChain:   blockChain,
-		signer:       types.NewEIP155Signer(chainconfig.ChainId),
-		pending:      make(map[common.Address]*txList),
-		queue:        make(map[common.Address]*txList),
-		beats:        make(map[common.Address]time.Time),
-		all:          make(map[common.Hash]*types.Transaction),
-		chainHeadCh:  make(chan ChainHeadEvent, chainHeadChanSize),
-		rmTxCh:       make(chan RemovedTransactionEvent, rmTxChanSize),
-		gasPrice:     new(big.Int).SetUint64(config.PriceLimit),
-		pendingState: nil,
+		config:      config,
+		chainconfig: chainconfig,
+		chain:       chain,
+		signer:      types.NewEIP155Signer(chainconfig.ChainId),
+		pending:     make(map[common.Address]*txList),
+		queue:       make(map[common.Address]*txList),
+		beats:       make(map[common.Address]time.Time),
+		all:         make(map[common.Hash]*types.Transaction),
+		chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
+		gasPrice:    new(big.Int).SetUint64(config.PriceLimit),
 	}
 	pool.locals = newAccountSet(pool.signer)
 	pool.priced = newTxPricedList(&pool.all)
-	pool.reset()
+	pool.reset(nil, chain.CurrentHeader())
 
 	// If local transactions and journaling is enabled, load from disk
 	if !config.NoLocals && config.Journal != "" {
@@ -237,8 +237,8 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain
 		}
 	}
 	// Subscribe events from blockchain
-	pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh)
-	pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh)
+	pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
+
 	// Start the event loop and return
 	pool.wg.Add(1)
 	go pool.loop()
@@ -264,31 +264,28 @@ func (pool *TxPool) loop() {
 	journal := time.NewTicker(pool.config.Rejournal)
 	defer journal.Stop()
 
+	// Track the previous head headers for transaction reorgs
+	head := pool.chain.CurrentHeader()
+
 	// Keep waiting for and reacting to the various events
 	for {
 		select {
 		// Handle ChainHeadEvent
 		case ev := <-pool.chainHeadCh:
-			pool.mu.Lock()
 			if ev.Block != nil {
+				pool.mu.Lock()
 				if pool.chainconfig.IsHomestead(ev.Block.Number()) {
 					pool.homestead = true
 				}
+				pool.reset(head, ev.Block.Header())
+				head = ev.Block.Header()
 
+				pool.mu.Unlock()
 			}
-			pool.reset()
-			pool.mu.Unlock()
 		// Be unsubscribed due to system stopped
 		case <-pool.chainHeadSub.Err():
 			return
 
-		// Handle RemovedTransactionEvent
-		case ev := <-pool.rmTxCh:
-			pool.addTxs(ev.Txs, false)
-		// Be unsubscribed due to system stopped
-		case <-pool.rmTxSub.Err():
-			return
-
 		// Handle stats reporting ticks
 		case <-report.C:
 			pool.mu.RLock()
@@ -333,28 +330,76 @@ func (pool *TxPool) loop() {
 
 // lockedReset is a wrapper around reset to allow calling it in a thread safe
 // manner. This method is only ever used in the tester!
-func (pool *TxPool) lockedReset() {
+func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 
-	pool.reset()
+	pool.reset(oldHead, newHead)
 }
 
 // reset retrieves the current state of the blockchain and ensures the content
 // of the transaction pool is valid with regard to the chain state.
-func (pool *TxPool) reset() {
-	currentState, err := pool.blockChain.State()
+func (pool *TxPool) reset(oldHead, newHead *types.Header) {
+	// If we're reorging an old state, reinject all dropped transactions
+	var reinject types.Transactions
+
+	if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
+		var discarded, included types.Transactions
+
+		var (
+			rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
+			add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
+		)
+		for rem.NumberU64() > add.NumberU64() {
+			discarded = append(discarded, rem.Transactions()...)
+			if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+				log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+				return
+			}
+		}
+		for add.NumberU64() > rem.NumberU64() {
+			included = append(included, add.Transactions()...)
+			if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+				log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+				return
+			}
+		}
+		for rem.Hash() != add.Hash() {
+			discarded = append(discarded, rem.Transactions()...)
+			if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
+				log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
+				return
+			}
+			included = append(included, add.Transactions()...)
+			if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
+				log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
+				return
+			}
+		}
+		reinject = types.TxDifference(discarded, included)
+	}
+	// Initialize the internal state to the current head
+	if newHead == nil {
+		newHead = pool.chain.CurrentHeader() // Special case during testing
+	}
+	statedb, err := pool.chain.StateAt(newHead.Root)
 	if err != nil {
-		log.Error("Failed reset txpool state", "err", err)
+		log.Error("Failed to reset txpool state", "err", err)
 		return
 	}
-	pool.pendingState = state.ManageState(currentState)
+	pool.currentState = statedb
+	pool.pendingState = state.ManageState(statedb)
+	pool.currentMaxGas = newHead.GasLimit
+
+	// Inject any transactions discarded due to reorgs
+	log.Debug("Reinjecting stale transactions", "count", len(reinject))
+	pool.addTxsLocked(reinject, false)
 
 	// validate the pool of pending transactions, this will remove
 	// any transactions that have been included in the block or
 	// have been invalidated because of another transaction (e.g.
 	// higher gas price)
-	pool.demoteUnexecutables(currentState)
+	pool.demoteUnexecutables()
 
 	// Update all accounts to the latest known pending nonce
 	for addr, list := range pool.pending {
@@ -363,16 +408,16 @@ func (pool *TxPool) reset() {
 	}
 	// Check the queue and move transactions over to the pending if possible
 	// or remove those that have become invalid
-	pool.promoteExecutables(currentState, nil)
+	pool.promoteExecutables(nil)
 }
 
 // Stop terminates the transaction pool.
 func (pool *TxPool) Stop() {
 	// Unsubscribe all subscriptions registered from txpool
 	pool.scope.Close()
+
 	// Unsubscribe subscriptions registered from blockchain
 	pool.chainHeadSub.Unsubscribe()
-	pool.rmTxSub.Unsubscribe()
 	pool.wg.Wait()
 
 	if pool.journal != nil {
@@ -442,8 +487,8 @@ func (pool *TxPool) stats() (int, int) {
 // Content retrieves the data content of the transaction pool, returning all the
 // pending as well as queued transactions, grouped by account and sorted by nonce.
 func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
-	pool.mu.RLock()
-	defer pool.mu.RUnlock()
+	pool.mu.Lock()
+	defer pool.mu.Unlock()
 
 	pending := make(map[common.Address]types.Transactions)
 	for addr, list := range pool.pending {
@@ -499,7 +544,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
 		return ErrNegativeValue
 	}
 	// Ensure the transaction doesn't exceed the current block limit gas.
-	if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 {
+	if pool.currentMaxGas.Cmp(tx.Gas()) < 0 {
 		return ErrGasLimit
 	}
 	// Make sure the transaction is signed properly
@@ -513,16 +558,12 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
 		return ErrUnderpriced
 	}
 	// Ensure the transaction adheres to nonce ordering
-	currentState, err := pool.blockChain.State()
-	if err != nil {
-		return err
-	}
-	if currentState.GetNonce(from) > tx.Nonce() {
+	if pool.currentState.GetNonce(from) > tx.Nonce() {
 		return ErrNonceTooLow
 	}
 	// Transactor should have enough funds to cover the costs
 	// cost == V + GP * GL
-	if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
+	if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
 		return ErrInsufficientFunds
 	}
 	intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
@@ -721,12 +762,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
 	}
 	// If we added a new transaction, run promotion checks and return
 	if !replace {
-		state, err := pool.blockChain.State()
-		if err != nil {
-			return err
-		}
 		from, _ := types.Sender(pool.signer, tx) // already validated
-		pool.promoteExecutables(state, []common.Address{from})
+		pool.promoteExecutables([]common.Address{from})
 	}
 	return nil
 }
@@ -736,6 +773,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 
+	return pool.addTxsLocked(txs, local)
+}
+
+// addTxsLocked attempts to queue a batch of transactions if they are valid,
+// whilst assuming the transaction pool lock is already held.
+func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
 	// Add the batch of transaction, tracking the accepted ones
 	dirty := make(map[common.Address]struct{})
 	for _, tx := range txs {
@@ -748,15 +791,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
 	}
 	// Only reprocess the internal state if something was actually added
 	if len(dirty) > 0 {
-		state, err := pool.blockChain.State()
-		if err != nil {
-			return err
-		}
 		addrs := make([]common.Address, 0, len(dirty))
 		for addr, _ := range dirty {
 			addrs = append(addrs, addr)
 		}
-		pool.promoteExecutables(state, addrs)
+		pool.promoteExecutables(addrs)
 	}
 	return nil
 }
@@ -770,24 +809,6 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
 	return pool.all[hash]
 }
 
-// Remove removes the transaction with the given hash from the pool.
-func (pool *TxPool) Remove(hash common.Hash) {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
-
-	pool.removeTx(hash)
-}
-
-// RemoveBatch removes all given transactions from the pool.
-func (pool *TxPool) RemoveBatch(txs types.Transactions) {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
-
-	for _, tx := range txs {
-		pool.removeTx(tx.Hash())
-	}
-}
-
 // removeTx removes a single transaction from the queue, moving all subsequent
 // transactions back to the future queue.
 func (pool *TxPool) removeTx(hash common.Hash) {
@@ -834,9 +855,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
 // promoteExecutables moves transactions that have become processable from the
 // future queue to the set of pending transactions. During this process, all
 // invalidated transactions (low nonce, low balance) are deleted.
-func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
-	gaslimit := pool.blockChain.GasLimit()
-
+func (pool *TxPool) promoteExecutables(accounts []common.Address) {
 	// Gather all the accounts potentially needing updates
 	if accounts == nil {
 		accounts = make([]common.Address, 0, len(pool.queue))
@@ -851,14 +870,14 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
 			continue // Just in case someone calls with a non existing account
 		}
 		// Drop all transactions that are deemed too old (low nonce)
-		for _, tx := range list.Forward(state.GetNonce(addr)) {
+		for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
 			hash := tx.Hash()
 			log.Trace("Removed old queued transaction", "hash", hash)
 			delete(pool.all, hash)
 			pool.priced.Removed()
 		}
 		// Drop all transactions that are too costly (low balance or out of gas)
-		drops, _ := list.Filter(state.GetBalance(addr), gaslimit)
+		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
 		for _, tx := range drops {
 			hash := tx.Hash()
 			log.Trace("Removed unpayable queued transaction", "hash", hash)
@@ -1003,12 +1022,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
 // demoteUnexecutables removes invalid and processed transactions from the pools
 // executable/pending queue and any subsequent transactions that become unexecutable
 // are moved back into the future queue.
-func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
-	gaslimit := pool.blockChain.GasLimit()
-
+func (pool *TxPool) demoteUnexecutables() {
 	// Iterate over all accounts and demote any non-executable transactions
 	for addr, list := range pool.pending {
-		nonce := state.GetNonce(addr)
+		nonce := pool.currentState.GetNonce(addr)
 
 		// Drop all transactions that are deemed too old (low nonce)
 		for _, tx := range list.Forward(nonce) {
@@ -1018,7 +1035,7 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
 			pool.priced.Removed()
 		}
 		// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
-		drops, invalids := list.Filter(state.GetBalance(addr), gaslimit)
+		drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
 		for _, tx := range drops {
 			hash := tx.Hash()
 			log.Trace("Removed unpayable pending transaction", "hash", hash)
@@ -1031,6 +1048,14 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
 			log.Trace("Demoting pending transaction", "hash", hash)
 			pool.enqueueTx(hash, tx)
 		}
+		// If there's a gap in front, warn (should never happen) and postpone all transactions
+		if list.Len() > 0 && list.txs.Get(nonce) == nil {
+			for _, tx := range list.Cap(0) {
+				hash := tx.Hash()
+				log.Error("Demoting invalidated transaction", "hash", hash)
+				pool.enqueueTx(hash, tx)
+			}
+		}
 		// Delete the entire queue entry if it became empty.
 		if list.Empty() {
 			delete(pool.pending, addr)
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index c1bcb1b2d..cdd45b4b1 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -48,23 +48,24 @@ type testBlockChain struct {
 	statedb       *state.StateDB
 	gasLimit      *big.Int
 	chainHeadFeed *event.Feed
-	rmTxFeed      *event.Feed
 }
 
-func (bc *testBlockChain) State() (*state.StateDB, error) {
-	return bc.statedb, nil
-}
-
-func (bc *testBlockChain) GasLimit() *big.Int {
-	return new(big.Int).Set(bc.gasLimit)
+func (bc *testBlockChain) CurrentHeader() *types.Header {
+	return &types.Header{
+		GasLimit: bc.gasLimit,
+	}
 }
 
 func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
 	return bc.chainHeadFeed.Subscribe(ch)
 }
 
-func (bc *testBlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
-	return bc.rmTxFeed.Subscribe(ch)
+func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
+	return types.NewBlock(bc.CurrentHeader(), nil, nil, nil)
+}
+
+func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
+	return bc.statedb, nil
 }
 
 func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
@@ -79,7 +80,7 @@ func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.Pri
 func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	key, _ := crypto.GenerateKey()
 	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
@@ -159,7 +160,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
 
 	// setup pool with 2 transaction in it
 	statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether))
-	blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed), new(event.Feed)}, address, &trigger}
+	blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed)}, address, &trigger}
 
 	tx0 := transaction(0, big.NewInt(100000), key)
 	tx1 := transaction(1, big.NewInt(100000), key)
@@ -182,7 +183,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
 	// trigger state change in the background
 	trigger = true
 
-	pool.lockedReset()
+	pool.lockedReset(nil, nil)
 
 	pendingTx, err := pool.Pending()
 	if err != nil {
@@ -205,20 +206,20 @@ func TestInvalidTransactions(t *testing.T) {
 
 	tx := transaction(0, big.NewInt(100), key)
 	from, _ := deriveSender(tx)
-	currentState, _ := pool.blockChain.State()
-	currentState.AddBalance(from, big.NewInt(1))
+
+	pool.currentState.AddBalance(from, big.NewInt(1))
 	if err := pool.AddRemote(tx); err != ErrInsufficientFunds {
 		t.Error("expected", ErrInsufficientFunds)
 	}
 
 	balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice()))
-	currentState.AddBalance(from, balance)
+	pool.currentState.AddBalance(from, balance)
 	if err := pool.AddRemote(tx); err != ErrIntrinsicGas {
 		t.Error("expected", ErrIntrinsicGas, "got", err)
 	}
 
-	currentState.SetNonce(from, 1)
-	currentState.AddBalance(from, big.NewInt(0xffffffffffffff))
+	pool.currentState.SetNonce(from, 1)
+	pool.currentState.AddBalance(from, big.NewInt(0xffffffffffffff))
 	tx = transaction(0, big.NewInt(100000), key)
 	if err := pool.AddRemote(tx); err != ErrNonceTooLow {
 		t.Error("expected", ErrNonceTooLow)
@@ -240,21 +241,20 @@ func TestTransactionQueue(t *testing.T) {
 
 	tx := transaction(0, big.NewInt(100), key)
 	from, _ := deriveSender(tx)
-	currentState, _ := pool.blockChain.State()
-	currentState.AddBalance(from, big.NewInt(1000))
-	pool.lockedReset()
+	pool.currentState.AddBalance(from, big.NewInt(1000))
+	pool.lockedReset(nil, nil)
 	pool.enqueueTx(tx.Hash(), tx)
 
-	pool.promoteExecutables(currentState, []common.Address{from})
+	pool.promoteExecutables([]common.Address{from})
 	if len(pool.pending) != 1 {
 		t.Error("expected valid txs to be 1 is", len(pool.pending))
 	}
 
 	tx = transaction(1, big.NewInt(100), key)
 	from, _ = deriveSender(tx)
-	currentState.SetNonce(from, 2)
+	pool.currentState.SetNonce(from, 2)
 	pool.enqueueTx(tx.Hash(), tx)
-	pool.promoteExecutables(currentState, []common.Address{from})
+	pool.promoteExecutables([]common.Address{from})
 	if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
 		t.Error("expected transaction to be in tx pool")
 	}
@@ -270,15 +270,14 @@ func TestTransactionQueue(t *testing.T) {
 	tx2 := transaction(10, big.NewInt(100), key)
 	tx3 := transaction(11, big.NewInt(100), key)
 	from, _ = deriveSender(tx1)
-	currentState, _ = pool.blockChain.State()
-	currentState.AddBalance(from, big.NewInt(1000))
-	pool.lockedReset()
+	pool.currentState.AddBalance(from, big.NewInt(1000))
+	pool.lockedReset(nil, nil)
 
 	pool.enqueueTx(tx1.Hash(), tx1)
 	pool.enqueueTx(tx2.Hash(), tx2)
 	pool.enqueueTx(tx3.Hash(), tx3)
 
-	pool.promoteExecutables(currentState, []common.Address{from})
+	pool.promoteExecutables([]common.Address{from})
 
 	if len(pool.pending) != 1 {
 		t.Error("expected tx pool to be 1, got", len(pool.pending))
@@ -288,45 +287,13 @@ func TestTransactionQueue(t *testing.T) {
 	}
 }
 
-func TestRemoveTx(t *testing.T) {
-	pool, key := setupTxPool()
-	defer pool.Stop()
-
-	addr := crypto.PubkeyToAddress(key.PublicKey)
-	currentState, _ := pool.blockChain.State()
-	currentState.AddBalance(addr, big.NewInt(1))
-
-	tx1 := transaction(0, big.NewInt(100), key)
-	tx2 := transaction(2, big.NewInt(100), key)
-
-	pool.promoteTx(addr, tx1.Hash(), tx1)
-	pool.enqueueTx(tx2.Hash(), tx2)
-
-	if len(pool.queue) != 1 {
-		t.Error("expected queue to be 1, got", len(pool.queue))
-	}
-	if len(pool.pending) != 1 {
-		t.Error("expected pending to be 1, got", len(pool.pending))
-	}
-	pool.Remove(tx1.Hash())
-	pool.Remove(tx2.Hash())
-
-	if len(pool.queue) > 0 {
-		t.Error("expected queue to be 0, got", len(pool.queue))
-	}
-	if len(pool.pending) > 0 {
-		t.Error("expected pending to be 0, got", len(pool.pending))
-	}
-}
-
 func TestNegativeValue(t *testing.T) {
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
 	tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key)
 	from, _ := deriveSender(tx)
-	currentState, _ := pool.blockChain.State()
-	currentState.AddBalance(from, big.NewInt(1))
+	pool.currentState.AddBalance(from, big.NewInt(1))
 	if err := pool.AddRemote(tx); err != ErrNegativeValue {
 		t.Error("expected", ErrNegativeValue, "got", err)
 	}
@@ -340,10 +307,10 @@ func TestTransactionChainFork(t *testing.T) {
 	resetState := func() {
 		db, _ := ethdb.NewMemDatabase()
 		statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-		pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
-		currentState, _ := pool.blockChain.State()
-		currentState.AddBalance(addr, big.NewInt(100000000000000))
-		pool.lockedReset()
+		statedb.AddBalance(addr, big.NewInt(100000000000000))
+
+		pool.chain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
+		pool.lockedReset(nil, nil)
 	}
 	resetState()
 
@@ -351,7 +318,7 @@ func TestTransactionChainFork(t *testing.T) {
 	if _, err := pool.add(tx, false); err != nil {
 		t.Error("didn't expect error", err)
 	}
-	pool.RemoveBatch([]*types.Transaction{tx})
+	pool.removeTx(tx.Hash())
 
 	// reset the pool's internal state
 	resetState()
@@ -368,10 +335,10 @@ func TestTransactionDoubleNonce(t *testing.T) {
 	resetState := func() {
 		db, _ := ethdb.NewMemDatabase()
 		statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-		pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
-		currentState, _ := pool.blockChain.State()
-		currentState.AddBalance(addr, big.NewInt(100000000000000))
-		pool.lockedReset()
+		statedb.AddBalance(addr, big.NewInt(100000000000000))
+
+		pool.chain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
+		pool.lockedReset(nil, nil)
 	}
 	resetState()
 
@@ -387,8 +354,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
 	if replace, err := pool.add(tx2, false); err != nil || !replace {
 		t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
 	}
-	state, _ := pool.blockChain.State()
-	pool.promoteExecutables(state, []common.Address{addr})
+	pool.promoteExecutables([]common.Address{addr})
 	if pool.pending[addr].Len() != 1 {
 		t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
 	}
@@ -397,7 +363,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
 	}
 	// Add the third transaction and ensure it's not saved (smaller price)
 	pool.add(tx3, false)
-	pool.promoteExecutables(state, []common.Address{addr})
+	pool.promoteExecutables([]common.Address{addr})
 	if pool.pending[addr].Len() != 1 {
 		t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
 	}
@@ -415,8 +381,7 @@ func TestMissingNonce(t *testing.T) {
 	defer pool.Stop()
 
 	addr := crypto.PubkeyToAddress(key.PublicKey)
-	currentState, _ := pool.blockChain.State()
-	currentState.AddBalance(addr, big.NewInt(100000000000000))
+	pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
 	tx := transaction(1, big.NewInt(100000), key)
 	if _, err := pool.add(tx, false); err != nil {
 		t.Error("didn't expect error", err)
@@ -432,47 +397,25 @@ func TestMissingNonce(t *testing.T) {
 	}
 }
 
-func TestNonceRecovery(t *testing.T) {
+func TestTransactionNonceRecovery(t *testing.T) {
 	const n = 10
 	pool, key := setupTxPool()
 	defer pool.Stop()
 
 	addr := crypto.PubkeyToAddress(key.PublicKey)
-	currentState, _ := pool.blockChain.State()
-	currentState.SetNonce(addr, n)
-	currentState.AddBalance(addr, big.NewInt(100000000000000))
-	pool.lockedReset()
+	pool.currentState.SetNonce(addr, n)
+	pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
+	pool.lockedReset(nil, nil)
+
 	tx := transaction(n, big.NewInt(100000), key)
 	if err := pool.AddRemote(tx); err != nil {
 		t.Error(err)
 	}
 	// simulate some weird re-order of transactions and missing nonce(s)
-	currentState.SetNonce(addr, n-1)
-	pool.lockedReset()
-	if fn := pool.pendingState.GetNonce(addr); fn != n+1 {
-		t.Errorf("expected nonce to be %d, got %d", n+1, fn)
-	}
-}
-
-func TestRemovedTxEvent(t *testing.T) {
-	pool, key := setupTxPool()
-	defer pool.Stop()
-
-	tx := transaction(0, big.NewInt(1000000), key)
-	from, _ := deriveSender(tx)
-	currentState, _ := pool.blockChain.State()
-	currentState.AddBalance(from, big.NewInt(1000000000000))
-	pool.lockedReset()
-	blockChain, _ := pool.blockChain.(*testBlockChain)
-	blockChain.rmTxFeed.Send(RemovedTransactionEvent{types.Transactions{tx}})
-	blockChain.chainHeadFeed.Send(ChainHeadEvent{nil})
-	// wait for handling events
-	<-time.After(500 * time.Millisecond)
-	if pool.pending[from].Len() != 1 {
-		t.Error("expected 1 pending tx, got", pool.pending[from].Len())
-	}
-	if len(pool.all) != 1 {
-		t.Error("expected 1 total transactions, got", len(pool.all))
+	pool.currentState.SetNonce(addr, n-1)
+	pool.lockedReset(nil, nil)
+	if fn := pool.pendingState.GetNonce(addr); fn != n-1 {
+		t.Errorf("expected nonce to be %d, got %d", n-1, fn)
 	}
 }
 
@@ -484,9 +427,7 @@ func TestTransactionDropping(t *testing.T) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000))
+	pool.currentState.AddBalance(account, big.NewInt(1000))
 
 	// Add some pending and some queued transactions
 	var (
@@ -514,7 +455,7 @@ func TestTransactionDropping(t *testing.T) {
 	if len(pool.all) != 6 {
 		t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
 	}
-	pool.lockedReset()
+	pool.lockedReset(nil, nil)
 	if pool.pending[account].Len() != 3 {
 		t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
 	}
@@ -525,8 +466,8 @@ func TestTransactionDropping(t *testing.T) {
 		t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
 	}
 	// Reduce the balance of the account, and check that invalidated transactions are dropped
-	state.AddBalance(account, big.NewInt(-650))
-	pool.lockedReset()
+	pool.currentState.AddBalance(account, big.NewInt(-650))
+	pool.lockedReset(nil, nil)
 
 	if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
 		t.Errorf("funded pending transaction missing: %v", tx0)
@@ -550,8 +491,8 @@ func TestTransactionDropping(t *testing.T) {
 		t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
 	}
 	// Reduce the block gas limit, check that invalidated transactions are dropped
-	pool.blockChain.(*testBlockChain).gasLimit = big.NewInt(100)
-	pool.lockedReset()
+	pool.chain.(*testBlockChain).gasLimit = big.NewInt(100)
+	pool.lockedReset(nil, nil)
 
 	if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
 		t.Errorf("funded pending transaction missing: %v", tx0)
@@ -579,9 +520,7 @@ func TestTransactionPostponing(t *testing.T) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000))
+	pool.currentState.AddBalance(account, big.NewInt(1000))
 
 	// Add a batch consecutive pending transactions for validation
 	txns := []*types.Transaction{}
@@ -605,7 +544,7 @@ func TestTransactionPostponing(t *testing.T) {
 	if len(pool.all) != len(txns) {
 		t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns))
 	}
-	pool.lockedReset()
+	pool.lockedReset(nil, nil)
 	if pool.pending[account].Len() != len(txns) {
 		t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns))
 	}
@@ -616,8 +555,8 @@ func TestTransactionPostponing(t *testing.T) {
 		t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns))
 	}
 	// Reduce the balance of the account, and check that transactions are reorganised
-	state.AddBalance(account, big.NewInt(-750))
-	pool.lockedReset()
+	pool.currentState.AddBalance(account, big.NewInt(-750))
+	pool.lockedReset(nil, nil)
 
 	if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok {
 		t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0])
@@ -655,10 +594,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000000))
-	pool.lockedReset()
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
 
 	// Keep queuing up transactions and make sure all above a limit are dropped
 	for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ {
@@ -699,7 +635,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.NoLocals = nolocals
@@ -709,12 +645,10 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them (last one will be the local)
-	state, _ := pool.blockChain.State()
-
 	keys := make([]*ecdsa.PrivateKey, 5)
 	for i := 0; i < len(keys); i++ {
 		keys[i], _ = crypto.GenerateKey()
-		state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+		pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
 	}
 	local := keys[len(keys)-1]
 
@@ -790,7 +724,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 	// Create the pool to test the non-expiration enforcement
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.Lifetime = time.Second
@@ -803,9 +737,8 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 	local, _ := crypto.GenerateKey()
 	remote, _ := crypto.GenerateKey()
 
-	state, _ := pool.blockChain.State()
-	state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
-	state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
+	pool.currentState.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
+	pool.currentState.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
 
 	// Add the two transactions and ensure they both are queued up
 	if err := pool.AddLocal(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), local)); err != nil {
@@ -854,10 +787,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000000))
-	pool.lockedReset()
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
 
 	// Keep queuing up transactions and make sure all above a limit are dropped
 	for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
@@ -887,8 +817,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
 	defer pool1.Stop()
 
 	account1, _ := deriveSender(transaction(0, big.NewInt(0), key1))
-	state1, _ := pool1.blockChain.State()
-	state1.AddBalance(account1, big.NewInt(1000000))
+	pool1.currentState.AddBalance(account1, big.NewInt(1000000))
 
 	for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
 		if err := pool1.AddRemote(transaction(origin+i, big.NewInt(100000), key1)); err != nil {
@@ -900,8 +829,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
 	defer pool2.Stop()
 
 	account2, _ := deriveSender(transaction(0, big.NewInt(0), key2))
-	state2, _ := pool2.blockChain.State()
-	state2.AddBalance(account2, big.NewInt(1000000))
+	pool2.currentState.AddBalance(account2, big.NewInt(1000000))
 
 	txns := []*types.Transaction{}
 	for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
@@ -934,7 +862,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.GlobalSlots = config.AccountSlots * 10
@@ -943,12 +871,10 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.blockChain.State()
-
 	keys := make([]*ecdsa.PrivateKey, 5)
 	for i := 0; i < len(keys); i++ {
 		keys[i], _ = crypto.GenerateKey()
-		state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+		pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
 	}
 	// Generate and queue a batch of transactions
 	nonces := make(map[common.Address]uint64)
@@ -981,7 +907,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.AccountSlots = 2
@@ -992,11 +918,9 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.blockChain.State()
-
 	key, _ := crypto.GenerateKey()
 	addr := crypto.PubkeyToAddress(key.PublicKey)
-	state.AddBalance(addr, big.NewInt(1000000))
+	pool.currentState.AddBalance(addr, big.NewInt(1000000))
 
 	txs := types.Transactions{}
 	for j := 0; j < int(config.GlobalSlots)*2; j++ {
@@ -1016,7 +940,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.GlobalSlots = 0
@@ -1025,12 +949,10 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.blockChain.State()
-
 	keys := make([]*ecdsa.PrivateKey, 5)
 	for i := 0; i < len(keys); i++ {
 		keys[i], _ = crypto.GenerateKey()
-		state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+		pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
 	}
 	// Generate and queue a batch of transactions
 	nonces := make(map[common.Address]uint64)
@@ -1065,18 +987,16 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.blockChain.State()
-
 	keys := make([]*ecdsa.PrivateKey, 3)
 	for i := 0; i < len(keys); i++ {
 		keys[i], _ = crypto.GenerateKey()
-		state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+		pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
 	}
 	// Generate and queue a batch of transactions, both pending and queued
 	txs := types.Transactions{}
@@ -1147,18 +1067,16 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.blockChain.State()
-
 	keys := make([]*ecdsa.PrivateKey, 3)
 	for i := 0; i < len(keys); i++ {
 		keys[i], _ = crypto.GenerateKey()
-		state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000))
+		pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000*1000000))
 	}
 	// Create transaction (both pending and queued) with a linearly growing gasprice
 	for i := uint64(0); i < 500; i++ {
@@ -1189,11 +1107,11 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
 		}
 	}
 	validate()
-	
+
 	// Reprice the pool and check that nothing is dropped
 	pool.SetGasPrice(big.NewInt(2))
 	validate()
-	
+
 	pool.SetGasPrice(big.NewInt(2))
 	pool.SetGasPrice(big.NewInt(4))
 	pool.SetGasPrice(big.NewInt(8))
@@ -1210,7 +1128,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.GlobalSlots = 2
@@ -1220,12 +1138,10 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.blockChain.State()
-
 	keys := make([]*ecdsa.PrivateKey, 3)
 	for i := 0; i < len(keys); i++ {
 		keys[i], _ = crypto.GenerateKey()
-		state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+		pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
 	}
 	// Generate and queue a batch of transactions, both pending and queued
 	txs := types.Transactions{}
@@ -1298,16 +1214,14 @@ func TestTransactionReplacement(t *testing.T) {
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a test account to add transactions with
 	key, _ := crypto.GenerateKey()
-
-	state, _ := pool.blockChain.State()
-	state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
+	pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
 
 	// Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
 	price := int64(100)
@@ -1378,7 +1292,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	// Create the original pool to inject transaction into the journal
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.NoLocals = nolocals
@@ -1391,9 +1305,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	local, _ := crypto.GenerateKey()
 	remote, _ := crypto.GenerateKey()
 
-	statedb, _ = pool.blockChain.State()
-	statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
-	statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
+	pool.currentState.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
+	pool.currentState.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
 
 	// Add three local and a remote transactions and ensure they are queued up
 	if err := pool.AddLocal(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), local)); err != nil {
@@ -1421,7 +1334,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
 	pool.Stop()
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
-	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 	pool = NewTxPool(config, params.TestChainConfig, blockchain)
 
 	pending, queued = pool.Stats()
@@ -1442,11 +1355,11 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	}
 	// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
-	pool.lockedReset()
+	pool.lockedReset(nil, nil)
 	time.Sleep(2 * config.Rejournal)
 	pool.Stop()
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
-	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed)}
 	pool = NewTxPool(config, params.TestChainConfig, blockchain)
 
 	pending, queued = pool.Stats()
@@ -1480,8 +1393,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000000))
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
 
 	for i := 0; i < size; i++ {
 		tx := transaction(uint64(i), big.NewInt(100000), key)
@@ -1490,7 +1402,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
 	// Benchmark the speed of pool validation
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		pool.demoteUnexecutables(state)
+		pool.demoteUnexecutables()
 	}
 }
 
@@ -1506,8 +1418,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000000))
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
 
 	for i := 0; i < size; i++ {
 		tx := transaction(uint64(1+i), big.NewInt(100000), key)
@@ -1516,7 +1427,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
 	// Benchmark the speed of pool validation
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		pool.promoteExecutables(state, nil)
+		pool.promoteExecutables(nil)
 	}
 }
 
@@ -1527,8 +1438,7 @@ func BenchmarkPoolInsert(b *testing.B) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000000))
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
 
 	txs := make(types.Transactions, b.N)
 	for i := 0; i < b.N; i++ {
@@ -1552,8 +1462,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.blockChain.State()
-	state.AddBalance(account, big.NewInt(1000000))
+	pool.currentState.AddBalance(account, big.NewInt(1000000))
 
 	batches := make([]types.Transactions, b.N)
 	for i := 0; i < b.N; i++ {
diff --git a/eth/api_backend.go b/eth/api_backend.go
index abf52326b..19ef79f23 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -115,10 +115,6 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta
 	return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil
 }
 
-func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription {
-	return b.eth.BlockChain().SubscribeRemovedTxEvent(ch)
-}
-
 func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
 	return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
 }
@@ -143,10 +139,6 @@ func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
 	return b.eth.txPool.AddLocal(signedTx)
 }
 
-func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
-	b.eth.txPool.Remove(txHash)
-}
-
 func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
 	pending, err := b.eth.txPool.Pending()
 	if err != nil {
diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go
index 30710aaab..0775749e7 100644
--- a/internal/ethapi/api.go
+++ b/internal/ethapi/api.go
@@ -1265,7 +1265,6 @@ func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, sendArgs SendTxAr
 			if err != nil {
 				return common.Hash{}, err
 			}
-			s.b.RemoveTx(p.Hash())
 			if err = s.b.SendTx(ctx, signedTx); err != nil {
 				return common.Hash{}, err
 			}
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index be17ffeae..368fa4872 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -59,7 +59,6 @@ type Backend interface {
 
 	// TxPool API
 	SendTx(ctx context.Context, signedTx *types.Transaction) error
-	RemoveTx(txHash common.Hash)
 	GetPoolTransactions() (types.Transactions, error)
 	GetPoolTransaction(txHash common.Hash) *types.Transaction
 	GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
diff --git a/miner/worker.go b/miner/worker.go
index 24e03be60..5bac5d6e8 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -71,7 +71,6 @@ type Work struct {
 	family    *set.Set       // family set (used for checking uncle invalidity)
 	uncles    *set.Set       // uncle set
 	tcount    int            // tx count in cycle
-	failedTxs types.Transactions
 
 	Block *types.Block // the new block
 
@@ -477,8 +476,6 @@ func (self *worker) commitNewWork() {
 	txs := types.NewTransactionsByPriceAndNonce(pending)
 	work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
 
-	self.eth.TxPool().RemoveBatch(work.failedTxs)
-
 	// compute uncles for the new block.
 	var (
 		uncles    []*types.Header
@@ -563,6 +560,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
 			log.Trace("Gas limit exceeded for current block", "sender", from)
 			txs.Pop()
 
+		case core.ErrNonceTooLow:
+			// New head notification data race between the transaction pool and miner, shift
+			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
+			txs.Shift()
+
+		case core.ErrNonceTooHigh:
+			// Reorg notification data race between the transaction pool and miner, skip account =
+			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
+			txs.Pop()
+
 		case nil:
 			// Everything ok, collect the logs and shift in the next transaction from the same account
 			coalescedLogs = append(coalescedLogs, logs...)
@@ -570,10 +577,10 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
 			txs.Shift()
 
 		default:
-			// Pop the current failed transaction without shifting in the next from the account
-			log.Trace("Transaction failed, will be removed", "hash", tx.Hash(), "err", err)
-			env.failedTxs = append(env.failedTxs, tx)
-			txs.Pop()
+			// Strange error, discard the transaction and get the next in line (note, the
+			// nonce-too-high clause will prevent us from executing in vain).
+			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
+			txs.Shift()
 		}
 	}
 
-- 
GitLab