diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go
index e30572e0a111bdb8888691aed11f2a499e18c42d..e625a4286845236bb9190f94dac35f54e8296a6d 100644
--- a/accounts/abi/bind/backends/simulated.go
+++ b/accounts/abi/bind/backends/simulated.go
@@ -76,7 +76,7 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
 		database:   database,
 		blockchain: blockchain,
 		config:     genesis.Config,
-		events:     filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
+		events:     filters.NewEventSystem(&filterBackend{database, blockchain}, false),
 	}
 	backend.rollback()
 	return backend
@@ -502,22 +502,34 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
 }
 
 func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
-	return event.NewSubscription(func(quit <-chan struct{}) error {
-		<-quit
-		return nil
-	})
+	return nullSubscription()
 }
+
 func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
 	return fb.bc.SubscribeChainEvent(ch)
 }
+
 func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
 	return fb.bc.SubscribeRemovedLogsEvent(ch)
 }
+
 func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
 	return fb.bc.SubscribeLogsEvent(ch)
 }
 
+func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return nullSubscription()
+}
+
 func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
+
 func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
 	panic("not supported")
 }
+
+func nullSubscription() event.Subscription {
+	return event.NewSubscription(func(quit <-chan struct{}) error {
+		<-quit
+		return nil
+	})
+}
diff --git a/core/events.go b/core/events.go
index 710bdb589485e0768d25e1fb2c5bf5d16ba85ec1..ac935a137f5f6ae5459e783802ca60c83c09937e 100644
--- a/core/events.go
+++ b/core/events.go
@@ -24,11 +24,6 @@ import (
 // NewTxsEvent is posted when a batch of transactions enter the transaction pool.
 type NewTxsEvent struct{ Txs []*types.Transaction }
 
-// PendingLogsEvent is posted pre mining and notifies of pending logs.
-type PendingLogsEvent struct {
-	Logs []*types.Log
-}
-
 // NewMinedBlockEvent is posted when a block has been imported.
 type NewMinedBlockEvent struct{ Block *types.Block }
 
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 4b74ccff519d4694caf991c0bf06e17ad6be3927..0ad2d57fd2e28858b6707ed4f4678d2ded406cf8 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -202,6 +202,10 @@ func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
 	return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
 }
 
+func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return b.eth.miner.SubscribePendingLogs(ch)
+}
+
 func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
 	return b.eth.BlockChain().SubscribeChainEvent(ch)
 }
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 5ed80a887563fe2ddd1dd4cb8e4a2af354baad38..46c9f44bb0e7e0516ba9f55f5e2e4128686a966f 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -65,9 +65,8 @@ type PublicFilterAPI struct {
 func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
 	api := &PublicFilterAPI{
 		backend: backend,
-		mux:     backend.EventMux(),
 		chainDb: backend.ChainDb(),
-		events:  NewEventSystem(backend.EventMux(), backend, lightMode),
+		events:  NewEventSystem(backend, lightMode),
 		filters: make(map[rpc.ID]*filter),
 	}
 	go api.timeoutLoop()
@@ -428,7 +427,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
 			hashes := f.hashes
 			f.hashes = nil
 			return returnHashes(hashes), nil
-		case LogsSubscription:
+		case LogsSubscription, MinedAndPendingLogsSubscription:
 			logs := f.logs
 			f.logs = nil
 			return returnLogs(logs), nil
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
index fc7e6f5273ce083a86949347b6a957cea9e93a7d..584ea23d02c0a77f03d42912dad3415bbd09d999 100644
--- a/eth/filters/bench_test.go
+++ b/eth/filters/bench_test.go
@@ -28,7 +28,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/node"
 )
 
@@ -122,14 +121,13 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
 
 	b.Log("Running filter benchmarks...")
 	start = time.Now()
-	mux := new(event.TypeMux)
 	var backend *testBackend
 
 	for i := 0; i < benchFilterCnt; i++ {
 		if i%20 == 0 {
 			db.Close()
 			db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
-			backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+			backend = &testBackend{db: db, sections: cnt}
 		}
 		var addr common.Address
 		addr[0] = byte(i)
@@ -173,8 +171,7 @@ func BenchmarkNoBloomBits(b *testing.B) {
 
 	b.Log("Running filter benchmarks...")
 	start := time.Now()
-	mux := new(event.TypeMux)
-	backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+	backend := &testBackend{db: db}
 	filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
 	filter.Logs(context.Background())
 	d := time.Since(start)
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 071613ad7a158d739bea4282a6d153ad77cde804..21840920716891dead7ea7bf0e50654b60154dfe 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -32,7 +32,6 @@ import (
 
 type Backend interface {
 	ChainDb() ethdb.Database
-	EventMux() *event.TypeMux
 	HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
 	HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
 	GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
@@ -42,6 +41,7 @@ type Backend interface {
 	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
 	SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
 	SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+	SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
 
 	BloomStatus() (uint64, uint64)
 	ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 70139c1a96ece84df6b75282179ce0e1d6ed7b72..a105ec51c350d2111125cb545b693fd18b8683ac 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -20,7 +20,6 @@ package filters
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"sync"
 	"time"
@@ -58,7 +57,6 @@ const (
 )
 
 const (
-
 	// txChanSize is the size of channel listening to NewTxsEvent.
 	// The number is referenced from the size of tx pool.
 	txChanSize = 4096
@@ -70,10 +68,6 @@ const (
 	chainEvChanSize = 10
 )
 
-var (
-	ErrInvalidSubscriptionID = errors.New("invalid id")
-)
-
 type subscription struct {
 	id        rpc.ID
 	typ       Type
@@ -89,25 +83,25 @@ type subscription struct {
 // EventSystem creates subscriptions, processes events and broadcasts them to the
 // subscription which match the subscription criteria.
 type EventSystem struct {
-	mux       *event.TypeMux
 	backend   Backend
 	lightMode bool
 	lastHead  *types.Header
 
 	// Subscriptions
-	txsSub        event.Subscription         // Subscription for new transaction event
-	logsSub       event.Subscription         // Subscription for new log event
-	rmLogsSub     event.Subscription         // Subscription for removed log event
-	chainSub      event.Subscription         // Subscription for new chain event
-	pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
+	txsSub         event.Subscription // Subscription for new transaction event
+	logsSub        event.Subscription // Subscription for new log event
+	rmLogsSub      event.Subscription // Subscription for removed log event
+	pendingLogsSub event.Subscription // Subscription for pending log event
+	chainSub       event.Subscription // Subscription for new chain event
 
 	// Channels
-	install   chan *subscription         // install filter for event notification
-	uninstall chan *subscription         // remove filter for event notification
-	txsCh     chan core.NewTxsEvent      // Channel to receive new transactions event
-	logsCh    chan []*types.Log          // Channel to receive new log event
-	rmLogsCh  chan core.RemovedLogsEvent // Channel to receive removed log event
-	chainCh   chan core.ChainEvent       // Channel to receive new chain event
+	install       chan *subscription         // install filter for event notification
+	uninstall     chan *subscription         // remove filter for event notification
+	txsCh         chan core.NewTxsEvent      // Channel to receive new transactions event
+	logsCh        chan []*types.Log          // Channel to receive new log event
+	pendingLogsCh chan []*types.Log          // Channel to receive new log event
+	rmLogsCh      chan core.RemovedLogsEvent // Channel to receive removed log event
+	chainCh       chan core.ChainEvent       // Channel to receive new chain event
 }
 
 // NewEventSystem creates a new manager that listens for event on the given mux,
@@ -116,17 +110,17 @@ type EventSystem struct {
 //
 // The returned manager has a loop that needs to be stopped with the Stop function
 // or by stopping the given mux.
-func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
+func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
 	m := &EventSystem{
-		mux:       mux,
-		backend:   backend,
-		lightMode: lightMode,
-		install:   make(chan *subscription),
-		uninstall: make(chan *subscription),
-		txsCh:     make(chan core.NewTxsEvent, txChanSize),
-		logsCh:    make(chan []*types.Log, logsChanSize),
-		rmLogsCh:  make(chan core.RemovedLogsEvent, rmLogsChanSize),
-		chainCh:   make(chan core.ChainEvent, chainEvChanSize),
+		backend:       backend,
+		lightMode:     lightMode,
+		install:       make(chan *subscription),
+		uninstall:     make(chan *subscription),
+		txsCh:         make(chan core.NewTxsEvent, txChanSize),
+		logsCh:        make(chan []*types.Log, logsChanSize),
+		rmLogsCh:      make(chan core.RemovedLogsEvent, rmLogsChanSize),
+		pendingLogsCh: make(chan []*types.Log, logsChanSize),
+		chainCh:       make(chan core.ChainEvent, chainEvChanSize),
 	}
 
 	// Subscribe events
@@ -134,12 +128,10 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
 	m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
 	m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
 	m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
-	// TODO(rjl493456442): use feed to subscribe pending log event
-	m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
+	m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
 
 	// Make sure none of the subscriptions are empty
-	if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
-		m.pendingLogSub.Closed() {
+	if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
 		log.Crit("Subscribe for event system failed")
 	}
 
@@ -316,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
 
 type filterIndex map[Type]map[rpc.ID]*subscription
 
-// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
-	if ev == nil {
+func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
+	if len(ev) == 0 {
 		return
 	}
+	for _, f := range filters[LogsSubscription] {
+		matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+		if len(matchedLogs) > 0 {
+			f.logs <- matchedLogs
+		}
+	}
+}
 
-	switch e := ev.(type) {
-	case []*types.Log:
-		if len(e) > 0 {
-			for _, f := range filters[LogsSubscription] {
-				if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
-					f.logs <- matchedLogs
-				}
-			}
+func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
+	if len(ev) == 0 {
+		return
+	}
+	for _, f := range filters[PendingLogsSubscription] {
+		matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+		if len(matchedLogs) > 0 {
+			f.logs <- matchedLogs
 		}
-	case core.RemovedLogsEvent:
-		for _, f := range filters[LogsSubscription] {
-			if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
-				f.logs <- matchedLogs
-			}
+	}
+}
+
+func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
+	for _, f := range filters[LogsSubscription] {
+		matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+		if len(matchedLogs) > 0 {
+			f.logs <- matchedLogs
 		}
-	case *event.TypeMuxEvent:
-		if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
-			for _, f := range filters[PendingLogsSubscription] {
-				if e.Time.After(f.created) {
-					if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
-						f.logs <- matchedLogs
-					}
+	}
+}
+
+func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
+	hashes := make([]common.Hash, 0, len(ev.Txs))
+	for _, tx := range ev.Txs {
+		hashes = append(hashes, tx.Hash())
+	}
+	for _, f := range filters[PendingTransactionsSubscription] {
+		f.hashes <- hashes
+	}
+}
+
+func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
+	for _, f := range filters[BlocksSubscription] {
+		f.headers <- ev.Block.Header()
+	}
+	if es.lightMode && len(filters[LogsSubscription]) > 0 {
+		es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
+			for _, f := range filters[LogsSubscription] {
+				if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+					f.logs <- matchedLogs
 				}
 			}
-		}
-	case core.NewTxsEvent:
-		hashes := make([]common.Hash, 0, len(e.Txs))
-		for _, tx := range e.Txs {
-			hashes = append(hashes, tx.Hash())
-		}
-		for _, f := range filters[PendingTransactionsSubscription] {
-			f.hashes <- hashes
-		}
-	case core.ChainEvent:
-		for _, f := range filters[BlocksSubscription] {
-			f.headers <- e.Block.Header()
-		}
-		if es.lightMode && len(filters[LogsSubscription]) > 0 {
-			es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
-				for _, f := range filters[LogsSubscription] {
-					if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
-						f.logs <- matchedLogs
-					}
-				}
-			})
-		}
+		})
 	}
 }
 
@@ -448,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
 func (es *EventSystem) eventLoop() {
 	// Ensure all subscriptions get cleaned up
 	defer func() {
-		es.pendingLogSub.Unsubscribe()
 		es.txsSub.Unsubscribe()
 		es.logsSub.Unsubscribe()
 		es.rmLogsSub.Unsubscribe()
+		es.pendingLogsSub.Unsubscribe()
 		es.chainSub.Unsubscribe()
 	}()
 
@@ -462,20 +457,16 @@ func (es *EventSystem) eventLoop() {
 
 	for {
 		select {
-		// Handle subscribed events
 		case ev := <-es.txsCh:
-			es.broadcast(index, ev)
+			es.handleTxsEvent(index, ev)
 		case ev := <-es.logsCh:
-			es.broadcast(index, ev)
+			es.handleLogs(index, ev)
 		case ev := <-es.rmLogsCh:
-			es.broadcast(index, ev)
+			es.handleRemovedLogs(index, ev)
+		case ev := <-es.pendingLogsCh:
+			es.handlePendingLogs(index, ev)
 		case ev := <-es.chainCh:
-			es.broadcast(index, ev)
-		case ev, active := <-es.pendingLogSub.Chan():
-			if !active { // system stopped
-				return
-			}
-			es.broadcast(index, ev)
+			es.handleChainEvent(index, ev)
 
 		case f := <-es.install:
 			if f.typ == MinedAndPendingLogsSubscription {
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 93cb43123f70cb82bfeffbcecdabbb1323074bb9..c8d1d43abbd81a99dd23e3d167e020be3326be63 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -39,23 +39,20 @@ import (
 )
 
 type testBackend struct {
-	mux        *event.TypeMux
-	db         ethdb.Database
-	sections   uint64
-	txFeed     *event.Feed
-	rmLogsFeed *event.Feed
-	logsFeed   *event.Feed
-	chainFeed  *event.Feed
+	mux             *event.TypeMux
+	db              ethdb.Database
+	sections        uint64
+	txFeed          event.Feed
+	logsFeed        event.Feed
+	rmLogsFeed      event.Feed
+	pendingLogsFeed event.Feed
+	chainFeed       event.Feed
 }
 
 func (b *testBackend) ChainDb() ethdb.Database {
 	return b.db
 }
 
-func (b *testBackend) EventMux() *event.TypeMux {
-	return b.mux
-}
-
 func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
 	var (
 		hash common.Hash
@@ -116,6 +113,10 @@ func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
 	return b.logsFeed.Subscribe(ch)
 }
 
+func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return b.pendingLogsFeed.Subscribe(ch)
+}
+
 func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
 	return b.chainFeed.Subscribe(ch)
 }
@@ -160,13 +161,8 @@ func TestBlockSubscription(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux         = new(event.TypeMux)
 		db          = rawdb.NewMemoryDatabase()
-		txFeed      = new(event.Feed)
-		rmLogsFeed  = new(event.Feed)
-		logsFeed    = new(event.Feed)
-		chainFeed   = new(event.Feed)
-		backend     = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		backend     = &testBackend{db: db}
 		api         = NewPublicFilterAPI(backend, false)
 		genesis     = new(core.Genesis).MustCommit(db)
 		chain, _    = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
@@ -205,7 +201,7 @@ func TestBlockSubscription(t *testing.T) {
 
 	time.Sleep(1 * time.Second)
 	for _, e := range chainEvents {
-		chainFeed.Send(e)
+		backend.chainFeed.Send(e)
 	}
 
 	<-sub0.Err()
@@ -217,14 +213,9 @@ func TestPendingTxFilter(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux        = new(event.TypeMux)
-		db         = rawdb.NewMemoryDatabase()
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		api        = NewPublicFilterAPI(backend, false)
+		db      = rawdb.NewMemoryDatabase()
+		backend = &testBackend{db: db}
+		api     = NewPublicFilterAPI(backend, false)
 
 		transactions = []*types.Transaction{
 			types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@@ -240,7 +231,7 @@ func TestPendingTxFilter(t *testing.T) {
 	fid0 := api.NewPendingTransactionFilter()
 
 	time.Sleep(1 * time.Second)
-	txFeed.Send(core.NewTxsEvent{Txs: transactions})
+	backend.txFeed.Send(core.NewTxsEvent{Txs: transactions})
 
 	timeout := time.Now().Add(1 * time.Second)
 	for {
@@ -277,14 +268,9 @@ func TestPendingTxFilter(t *testing.T) {
 // If not it must return an error.
 func TestLogFilterCreation(t *testing.T) {
 	var (
-		mux        = new(event.TypeMux)
-		db         = rawdb.NewMemoryDatabase()
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		api        = NewPublicFilterAPI(backend, false)
+		db      = rawdb.NewMemoryDatabase()
+		backend = &testBackend{db: db}
+		api     = NewPublicFilterAPI(backend, false)
 
 		testCases = []struct {
 			crit    FilterCriteria
@@ -326,14 +312,9 @@ func TestInvalidLogFilterCreation(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux        = new(event.TypeMux)
-		db         = rawdb.NewMemoryDatabase()
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		api        = NewPublicFilterAPI(backend, false)
+		db      = rawdb.NewMemoryDatabase()
+		backend = &testBackend{db: db}
+		api     = NewPublicFilterAPI(backend, false)
 	)
 
 	// different situations where log filter creation should fail.
@@ -353,15 +334,10 @@ func TestInvalidLogFilterCreation(t *testing.T) {
 
 func TestInvalidGetLogsRequest(t *testing.T) {
 	var (
-		mux        = new(event.TypeMux)
-		db         = rawdb.NewMemoryDatabase()
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		api        = NewPublicFilterAPI(backend, false)
-		blockHash  = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
+		db        = rawdb.NewMemoryDatabase()
+		backend   = &testBackend{db: db}
+		api       = NewPublicFilterAPI(backend, false)
+		blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
 	)
 
 	// Reason: Cannot specify both BlockHash and FromBlock/ToBlock)
@@ -383,14 +359,9 @@ func TestLogFilter(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux        = new(event.TypeMux)
-		db         = rawdb.NewMemoryDatabase()
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		api        = NewPublicFilterAPI(backend, false)
+		db      = rawdb.NewMemoryDatabase()
+		backend = &testBackend{db: db}
+		api     = NewPublicFilterAPI(backend, false)
 
 		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
 		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -400,7 +371,7 @@ func TestLogFilter(t *testing.T) {
 		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
 		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
 
-		// posted twice, once as vm.Logs and once as core.PendingLogsEvent
+		// posted twice, once as regular logs and once as pending logs.
 		allLogs = []*types.Log{
 			{Address: firstAddr},
 			{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1},
@@ -453,11 +424,11 @@ func TestLogFilter(t *testing.T) {
 
 	// raise events
 	time.Sleep(1 * time.Second)
-	if nsend := logsFeed.Send(allLogs); nsend == 0 {
-		t.Fatal("Shoud have at least one subscription")
+	if nsend := backend.logsFeed.Send(allLogs); nsend == 0 {
+		t.Fatal("Logs event not delivered")
 	}
-	if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
-		t.Fatal(err)
+	if nsend := backend.pendingLogsFeed.Send(allLogs); nsend == 0 {
+		t.Fatal("Pending logs event not delivered")
 	}
 
 	for i, tt := range testCases {
@@ -502,14 +473,9 @@ func TestPendingLogsSubscription(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux        = new(event.TypeMux)
-		db         = rawdb.NewMemoryDatabase()
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		api        = NewPublicFilterAPI(backend, false)
+		db      = rawdb.NewMemoryDatabase()
+		backend = &testBackend{db: db}
+		api     = NewPublicFilterAPI(backend, false)
 
 		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
 		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -521,26 +487,18 @@ func TestPendingLogsSubscription(t *testing.T) {
 		fourthTopic    = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
 		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
 
-		allLogs = []core.PendingLogsEvent{
-			{Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}},
-			{Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}},
-			{Logs: []*types.Log{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}},
-			{Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}},
-			{Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}},
-			{Logs: []*types.Log{
+		allLogs = [][]*types.Log{
+			{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}},
+			{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}},
+			{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}},
+			{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}},
+			{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}},
+			{
 				{Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
 				{Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
 				{Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
 				{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
-			}},
-		}
-
-		convertLogs = func(pl []core.PendingLogsEvent) []*types.Log {
-			var logs []*types.Log
-			for _, l := range pl {
-				logs = append(logs, l.Logs...)
-			}
-			return logs
+			},
 		}
 
 		testCases = []struct {
@@ -550,21 +508,52 @@ func TestPendingLogsSubscription(t *testing.T) {
 			sub      *Subscription
 		}{
 			// match all
-			{ethereum.FilterQuery{}, convertLogs(allLogs), nil, nil},
+			{
+				ethereum.FilterQuery{}, flattenLogs(allLogs),
+				nil, nil,
+			},
 			// match none due to no matching addresses
-			{ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil},
+			{
+				ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}},
+				nil,
+				nil, nil,
+			},
 			// match logs based on addresses, ignore topics
-			{ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
+			{
+				ethereum.FilterQuery{Addresses: []common.Address{firstAddr}},
+				append(flattenLogs(allLogs[:2]), allLogs[5][3]),
+				nil, nil,
+			},
 			// match none due to no matching topics (match with address)
-			{ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, nil, nil},
+			{
+				ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}},
+				nil, nil, nil,
+			},
 			// match logs based on addresses and topics
-			{ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil},
+			{
+				ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}},
+				append(flattenLogs(allLogs[3:5]), allLogs[5][0]),
+				nil, nil,
+			},
 			// match logs based on multiple addresses and "or" topics
-			{ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil},
+			{
+				ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}},
+				append(flattenLogs(allLogs[2:5]), allLogs[5][0]),
+				nil,
+				nil,
+			},
 			// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
-			{ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
+			{
+				ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)},
+				append(flattenLogs(allLogs[:2]), allLogs[5][3]),
+				nil, nil,
+			},
 			// multiple pending logs, should match only 2 topics from the logs in block 5
-			{ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
+			{
+				ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}},
+				[]*types.Log{allLogs[5][0], allLogs[5][2]},
+				nil, nil,
+			},
 		}
 	)
 
@@ -607,10 +596,15 @@ func TestPendingLogsSubscription(t *testing.T) {
 
 	// raise events
 	time.Sleep(1 * time.Second)
-	// allLogs are type of core.PendingLogsEvent
-	for _, l := range allLogs {
-		if err := mux.Post(l); err != nil {
-			t.Fatal(err)
-		}
+	for _, ev := range allLogs {
+		backend.pendingLogsFeed.Send(ev)
+	}
+}
+
+func flattenLogs(pl [][]*types.Log) []*types.Log {
+	var logs []*types.Log
+	for _, l := range pl {
+		logs = append(logs, l...)
 	}
+	return logs
 }
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index eafa19cdfa22797d1581469808a0c39525a15709..f45720d5a90fe7e86214408215861fdf9eb045e1 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -29,7 +29,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -50,18 +49,13 @@ func BenchmarkFilters(b *testing.B) {
 	defer os.RemoveAll(dir)
 
 	var (
-		db, _      = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
-		mux        = new(event.TypeMux)
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		key1, _    = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-		addr1      = crypto.PubkeyToAddress(key1.PublicKey)
-		addr2      = common.BytesToAddress([]byte("jeff"))
-		addr3      = common.BytesToAddress([]byte("ethereum"))
-		addr4      = common.BytesToAddress([]byte("random addresses please"))
+		db, _   = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
+		backend = &testBackend{db: db}
+		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
+		addr2   = common.BytesToAddress([]byte("jeff"))
+		addr3   = common.BytesToAddress([]byte("ethereum"))
+		addr4   = common.BytesToAddress([]byte("random addresses please"))
 	)
 	defer db.Close()
 
@@ -109,15 +103,10 @@ func TestFilters(t *testing.T) {
 	defer os.RemoveAll(dir)
 
 	var (
-		db, _      = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
-		mux        = new(event.TypeMux)
-		txFeed     = new(event.Feed)
-		rmLogsFeed = new(event.Feed)
-		logsFeed   = new(event.Feed)
-		chainFeed  = new(event.Feed)
-		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
-		key1, _    = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-		addr       = crypto.PubkeyToAddress(key1.PublicKey)
+		db, _   = rawdb.NewLevelDBDatabase(dir, 0, 0, "")
+		backend = &testBackend{db: db}
+		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		addr    = crypto.PubkeyToAddress(key1.PublicKey)
 
 		hash1 = common.BytesToHash([]byte("topic1"))
 		hash2 = common.BytesToHash([]byte("topic2"))
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index 73b6c89ceaafcb86f7b6ada46edb2201f4a8a53c..245091df35a37af5e358a8d75f782ee570d07dae 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -43,7 +43,6 @@ type Backend interface {
 	ProtocolVersion() int
 	SuggestPrice(ctx context.Context) (*big.Int, error)
 	ChainDb() ethdb.Database
-	EventMux() *event.TypeMux
 	AccountManager() *accounts.Manager
 	ExtRPCEnabled() bool
 	RPCGasCap() *big.Int // global gas cap for eth_call over rpc: DoS protection
@@ -80,6 +79,7 @@ type Backend interface {
 	GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
 	ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
 	SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+	SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
 	SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
 
 	ChainConfig() *params.ChainConfig
diff --git a/les/api_backend.go b/les/api_backend.go
index e01e1be98bd2e83957600bbeb4c7cd780d80b439..5627c34d6a32a65fbb24351aa3ad2f5f8a81d360 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -225,6 +225,13 @@ func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
 	return b.eth.blockchain.SubscribeLogsEvent(ch)
 }
 
+func (b *LesApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return event.NewSubscription(func(quit <-chan struct{}) error {
+		<-quit
+		return nil
+	})
+}
+
 func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
 	return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
 }
@@ -245,10 +252,6 @@ func (b *LesApiBackend) ChainDb() ethdb.Database {
 	return b.eth.chainDb
 }
 
-func (b *LesApiBackend) EventMux() *event.TypeMux {
-	return b.eth.eventMux
-}
-
 func (b *LesApiBackend) AccountManager() *accounts.Manager {
 	return b.eth.accountManager
 }
diff --git a/miner/miner.go b/miner/miner.go
index bc9af0f060596652ff2a47b2fb5903144bf7a30b..b968b3a92a1f689987a525efc6587bea5c0c05d7 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -182,3 +182,9 @@ func (miner *Miner) SetEtherbase(addr common.Address) {
 	miner.coinbase = addr
 	miner.worker.setEtherbase(addr)
 }
+
+// SubscribePendingLogs starts delivering logs from pending transactions
+// to the given channel.
+func (self *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
+	return self.worker.pendingLogsFeed.Subscribe(ch)
+}
diff --git a/miner/worker.go b/miner/worker.go
index c95b32042f1450475b0a67e9b493642c7a19a61a..d3cd10ed26d4afe986c54b1624c01268c42106c4 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -128,6 +128,9 @@ type worker struct {
 	eth         Backend
 	chain       *core.BlockChain
 
+	// Feeds
+	pendingLogsFeed event.Feed
+
 	// Subscriptions
 	mux          *event.TypeMux
 	txsCh        chan core.NewTxsEvent
@@ -809,7 +812,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
 			cpy[i] = new(types.Log)
 			*cpy[i] = *l
 		}
-		go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
+		w.pendingLogsFeed.Send(cpy)
 	}
 	// Notify resubmit loop to decrease resubmitting interval if current interval is larger
 	// than the user-specified one.