diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go
index 14bf7bd7523801a72fdeddb319ad78baf5ba2ae1..e0ee06a0a18daa93ebf00961485389e1782dbd39 100644
--- a/accounts/abi/bind/backends/simulated.go
+++ b/accounts/abi/bind/backends/simulated.go
@@ -33,7 +33,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -61,7 +60,7 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
 	database, _ := ethdb.NewMemDatabase()
 	genesis := core.Genesis{Config: params.AllProtocolChanges, Alloc: alloc}
 	genesis.MustCommit(database)
-	blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), vm.Config{})
 	backend := &SimulatedBackend{database: database, blockchain: blockchain, config: genesis.Config}
 	backend.rollback()
 	return backend
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 04728b5c62ac66aff83dfe0516e6e5a9c8d97a7c..b2570847194f90f4ff5a4e587129f75dd6acec89 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -41,7 +41,6 @@ import (
 	"github.com/ethereum/go-ethereum/eth/gasprice"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/ethstats"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/les"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
@@ -1103,7 +1102,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
 		Fatalf("%v", err)
 	}
 	vmcfg := vm.Config{EnablePreimageRecording: ctx.GlobalBool(VMEnableDebugFlag.Name)}
-	chain, err = core.NewBlockChain(chainDb, config, engine, new(event.TypeMux), vmcfg)
+	chain, err = core.NewBlockChain(chainDb, config, engine, vmcfg)
 	if err != nil {
 		Fatalf("Can't create BlockChain: %v", err)
 	}
diff --git a/core/bench_test.go b/core/bench_test.go
index b9250f7d39396f34ab677384b220bf594ec362aa..ab25c27d3913b271cee886a9b199c72b128af1d5 100644
--- a/core/bench_test.go
+++ b/core/bench_test.go
@@ -30,7 +30,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -175,8 +174,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
 
 	// Time the insertion of the new chain.
 	// State and blocks are stored in the same DB.
-	evmux := new(event.TypeMux)
-	chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{})
+	chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer chainman.Stop()
 	b.ReportAllocs()
 	b.ResetTimer()
@@ -286,7 +284,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
 		if err != nil {
 			b.Fatalf("error opening database at %v: %v", dir, err)
 		}
-		chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+		chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), vm.Config{})
 		if err != nil {
 			b.Fatalf("error creating chain: %v", err)
 		}
diff --git a/core/block_validator_test.go b/core/block_validator_test.go
index c0afc2955fe8c62369ffb374f3483180816d64ed..6d54c2b9326349c5c6338eff24ad67b74831a975 100644
--- a/core/block_validator_test.go
+++ b/core/block_validator_test.go
@@ -25,7 +25,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -43,7 +42,7 @@ func TestHeaderVerification(t *testing.T) {
 		headers[i] = block.Header()
 	}
 	// Run the header checker for blocks one-by-one, checking for both valid and invalid nonces
-	chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{})
 	defer chain.Stop()
 
 	for i := 0; i < len(blocks); i++ {
@@ -107,11 +106,11 @@ func testHeaderConcurrentVerification(t *testing.T, threads int) {
 		var results <-chan error
 
 		if valid {
-			chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+			chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{})
 			_, results = chain.engine.VerifyHeaders(chain, headers, seals)
 			chain.Stop()
 		} else {
-			chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), new(event.TypeMux), vm.Config{})
+			chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{})
 			_, results = chain.engine.VerifyHeaders(chain, headers, seals)
 			chain.Stop()
 		}
@@ -174,7 +173,7 @@ func testHeaderConcurrentAbortion(t *testing.T, threads int) {
 	defer runtime.GOMAXPROCS(old)
 
 	// Start the verifications and immediately abort
-	chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), new(event.TypeMux), vm.Config{})
+	chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{})
 	defer chain.Stop()
 
 	abort, results := chain.engine.VerifyHeaders(chain, headers, seals)
diff --git a/core/blockchain.go b/core/blockchain.go
index 3fb8be15f45a9f6ded432dc0dd7c11ff25e6817c..f3ca4e08cf1bf115c25eedc4d6e5ffe9287ea9a1 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -79,10 +79,16 @@ const (
 type BlockChain struct {
 	config *params.ChainConfig // chain & network configuration
 
-	hc           *HeaderChain
-	chainDb      ethdb.Database
-	eventMux     *event.TypeMux
-	genesisBlock *types.Block
+	hc            *HeaderChain
+	chainDb       ethdb.Database
+	rmTxFeed      event.Feed
+	rmLogsFeed    event.Feed
+	chainFeed     event.Feed
+	chainSideFeed event.Feed
+	chainHeadFeed event.Feed
+	logsFeed      event.Feed
+	scope         event.SubscriptionScope
+	genesisBlock  *types.Block
 
 	mu      sync.RWMutex // global mutex for locking chain operations
 	chainmu sync.RWMutex // blockchain insertion lock
@@ -115,7 +121,7 @@ type BlockChain struct {
 // NewBlockChain returns a fully initialised block chain using information
 // available in the database. It initialises the default Ethereum Validator and
 // Processor.
-func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux, vmConfig vm.Config) (*BlockChain, error) {
+func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
 	bodyCache, _ := lru.New(bodyCacheLimit)
 	bodyRLPCache, _ := lru.New(bodyCacheLimit)
 	blockCache, _ := lru.New(blockCacheLimit)
@@ -126,7 +132,6 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co
 		config:       config,
 		chainDb:      chainDb,
 		stateCache:   state.NewDatabase(chainDb),
-		eventMux:     mux,
 		quit:         make(chan struct{}),
 		bodyCache:    bodyCache,
 		bodyRLPCache: bodyRLPCache,
@@ -594,6 +599,8 @@ func (bc *BlockChain) Stop() {
 	if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
 		return
 	}
+	// Unsubscribe all subscriptions registered from blockchain
+	bc.scope.Close()
 	close(bc.quit)
 	atomic.StoreInt32(&bc.procInterrupt, 1)
 
@@ -1000,6 +1007,12 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 
 			blockInsertTimer.UpdateSince(bstart)
 			events = append(events, ChainEvent{block, block.Hash(), logs})
+			// We need some control over the mining operation. Acquiring locks and waiting
+			// for the miner to create new block takes too long and in most cases isn't
+			// even necessary.
+			if bc.LastBlockHash() == block.Hash() {
+				events = append(events, ChainHeadEvent{block})
+			}
 
 			// Write the positional metadata for transaction and receipt lookups
 			if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
@@ -1024,7 +1037,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
 		stats.usedGas += usedGas.Uint64()
 		stats.report(chain, i)
 	}
-	go bc.postChainEvents(events, coalescedLogs)
+	go bc.PostChainEvents(events, coalescedLogs)
 
 	return 0, nil
 }
@@ -1184,16 +1197,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	// Must be posted in a goroutine because of the transaction pool trying
 	// to acquire the chain manager lock
 	if len(diff) > 0 {
-		go bc.eventMux.Post(RemovedTransactionEvent{diff})
+		go bc.rmTxFeed.Send(RemovedTransactionEvent{diff})
 	}
 	if len(deletedLogs) > 0 {
-		go bc.eventMux.Post(RemovedLogsEvent{deletedLogs})
+		go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
 	}
 
 	if len(oldChain) > 0 {
 		go func() {
 			for _, block := range oldChain {
-				bc.eventMux.Post(ChainSideEvent{Block: block})
+				bc.chainSideFeed.Send(ChainSideEvent{Block: block})
 			}
 		}()
 	}
@@ -1201,22 +1214,25 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
 	return nil
 }
 
-// postChainEvents iterates over the events generated by a chain insertion and
-// posts them into the event mux.
-func (bc *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) {
+// PostChainEvents iterates over the events generated by a chain insertion and
+// posts them into the event feed.
+// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
+func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
 	// post event logs for further processing
-	bc.eventMux.Post(logs)
+	if logs != nil {
+		bc.logsFeed.Send(logs)
+	}
 	for _, event := range events {
-		if event, ok := event.(ChainEvent); ok {
-			// We need some control over the mining operation. Acquiring locks and waiting
-			// for the miner to create new block takes too long and in most cases isn't
-			// even necessary.
-			if bc.LastBlockHash() == event.Hash {
-				bc.eventMux.Post(ChainHeadEvent{event.Block})
-			}
+		switch ev := event.(type) {
+		case ChainEvent:
+			bc.chainFeed.Send(ev)
+
+		case ChainHeadEvent:
+			bc.chainHeadFeed.Send(ev)
+
+		case ChainSideEvent:
+			bc.chainSideFeed.Send(ev)
 		}
-		// Fire the insertion events individually too
-		bc.eventMux.Post(event)
 	}
 }
 
@@ -1384,3 +1400,33 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config }
 
 // Engine retrieves the blockchain's consensus engine.
 func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }
+
+// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent.
+func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
+	return bc.scope.Track(bc.rmTxFeed.Subscribe(ch))
+}
+
+// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
+func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
+	return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
+}
+
+// SubscribeChainEvent registers a subscription of ChainEvent.
+func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
+	return bc.scope.Track(bc.chainFeed.Subscribe(ch))
+}
+
+// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
+func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
+	return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
+}
+
+// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
+func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
+	return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
+}
+
+// SubscribeLogsEvent registers a subscription of []*types.Log.
+func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return bc.scope.Track(bc.logsFeed.Subscribe(ch))
+}
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 4a0f449408a5e94ab0fd22179eb66f77842cbaef..470974a0a76684fff33f5101a51970de57d43dcd 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -31,7 +31,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -47,7 +46,7 @@ func newTestBlockChain(fake bool) *BlockChain {
 	if !fake {
 		engine = ethash.NewTester()
 	}
-	blockchain, err := NewBlockChain(db, gspec.Config, engine, new(event.TypeMux), vm.Config{})
+	blockchain, err := NewBlockChain(db, gspec.Config, engine, vm.Config{})
 	if err != nil {
 		panic(err)
 	}
@@ -497,7 +496,7 @@ func testReorgBadHashes(t *testing.T, full bool) {
 	}
 
 	// Create a new BlockChain and check that it rolled back the state.
-	ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	ncm, err := NewBlockChain(bc.chainDb, bc.config, ethash.NewFaker(), vm.Config{})
 	if err != nil {
 		t.Fatalf("failed to create new chain manager: %v", err)
 	}
@@ -610,7 +609,7 @@ func TestFastVsFullChains(t *testing.T) {
 	// Import the chain as an archive node for the comparison baseline
 	archiveDb, _ := ethdb.NewMemDatabase()
 	gspec.MustCommit(archiveDb)
-	archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer archive.Stop()
 
 	if n, err := archive.InsertChain(blocks); err != nil {
@@ -619,7 +618,7 @@ func TestFastVsFullChains(t *testing.T) {
 	// Fast import the chain as a non-archive node to test
 	fastDb, _ := ethdb.NewMemDatabase()
 	gspec.MustCommit(fastDb)
-	fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer fast.Stop()
 
 	headers := make([]*types.Header, len(blocks))
@@ -697,7 +696,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
 	archiveDb, _ := ethdb.NewMemDatabase()
 	gspec.MustCommit(archiveDb)
 
-	archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	archive, _ := NewBlockChain(archiveDb, gspec.Config, ethash.NewFaker(), vm.Config{})
 	if n, err := archive.InsertChain(blocks); err != nil {
 		t.Fatalf("failed to process block %d: %v", n, err)
 	}
@@ -710,7 +709,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
 	// Import the chain as a non-archive node and ensure all pointers are updated
 	fastDb, _ := ethdb.NewMemDatabase()
 	gspec.MustCommit(fastDb)
-	fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	fast, _ := NewBlockChain(fastDb, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer fast.Stop()
 
 	headers := make([]*types.Header, len(blocks))
@@ -731,7 +730,7 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
 	lightDb, _ := ethdb.NewMemDatabase()
 	gspec.MustCommit(lightDb)
 
-	light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	light, _ := NewBlockChain(lightDb, gspec.Config, ethash.NewFaker(), vm.Config{})
 	if n, err := light.InsertHeaderChain(headers, 1); err != nil {
 		t.Fatalf("failed to insert header %d: %v", n, err)
 	}
@@ -800,8 +799,7 @@ func TestChainTxReorgs(t *testing.T) {
 		}
 	})
 	// Import the chain. This runs all block validation rules.
-	evmux := &event.TypeMux{}
-	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{})
+	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
 	if i, err := blockchain.InsertChain(chain); err != nil {
 		t.Fatalf("failed to insert original chain[%d]: %v", i, err)
 	}
@@ -872,11 +870,11 @@ func TestLogReorgs(t *testing.T) {
 		signer  = types.NewEIP155Signer(gspec.Config.ChainId)
 	)
 
-	var evmux event.TypeMux
-	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &evmux, vm.Config{})
+	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer blockchain.Stop()
 
-	subs := evmux.Subscribe(RemovedLogsEvent{})
+	rmLogsCh := make(chan RemovedLogsEvent)
+	blockchain.SubscribeRemovedLogsEvent(rmLogsCh)
 	chain, _ := GenerateChain(params.TestChainConfig, genesis, db, 2, func(i int, gen *BlockGen) {
 		if i == 1 {
 			tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), big.NewInt(1000000), new(big.Int), code), signer, key1)
@@ -895,9 +893,14 @@ func TestLogReorgs(t *testing.T) {
 		t.Fatalf("failed to insert forked chain: %v", err)
 	}
 
-	ev := <-subs.Chan()
-	if len(ev.Data.(RemovedLogsEvent).Logs) == 0 {
-		t.Error("expected logs")
+	timeout := time.NewTimer(1 * time.Second)
+	select {
+	case ev := <-rmLogsCh:
+		if len(ev.Logs) == 0 {
+			t.Error("expected logs")
+		}
+	case <-timeout.C:
+		t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
 	}
 }
 
@@ -914,8 +917,7 @@ func TestReorgSideEvent(t *testing.T) {
 		signer  = types.NewEIP155Signer(gspec.Config.ChainId)
 	)
 
-	evmux := &event.TypeMux{}
-	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{})
+	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer blockchain.Stop()
 
 	chain, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, gen *BlockGen) {})
@@ -933,7 +935,8 @@ func TestReorgSideEvent(t *testing.T) {
 		}
 		gen.AddTx(tx)
 	})
-	subs := evmux.Subscribe(ChainSideEvent{})
+	chainSideCh := make(chan ChainSideEvent)
+	blockchain.SubscribeChainSideEvent(chainSideCh)
 	if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
 		t.Fatalf("failed to insert chain: %v", err)
 	}
@@ -956,8 +959,8 @@ func TestReorgSideEvent(t *testing.T) {
 done:
 	for {
 		select {
-		case ev := <-subs.Chan():
-			block := ev.Data.(ChainSideEvent).Block
+		case ev := <-chainSideCh:
+			block := ev.Block
 			if _, ok := expectedSideHashes[block.Hash()]; !ok {
 				t.Errorf("%d: didn't expect %x to be in side chain", i, block.Hash())
 			}
@@ -977,7 +980,7 @@ done:
 
 	// make sure no more events are fired
 	select {
-	case e := <-subs.Chan():
+	case e := <-chainSideCh:
 		t.Errorf("unexpected event fired: %v", e)
 	case <-time.After(250 * time.Millisecond):
 	}
@@ -1038,10 +1041,9 @@ func TestEIP155Transition(t *testing.T) {
 			Alloc:  GenesisAlloc{address: {Balance: funds}, deleteAddr: {Balance: new(big.Int)}},
 		}
 		genesis = gspec.MustCommit(db)
-		mux     event.TypeMux
 	)
 
-	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{})
+	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer blockchain.Stop()
 
 	blocks, _ := GenerateChain(gspec.Config, genesis, db, 4, func(i int, block *BlockGen) {
@@ -1148,9 +1150,8 @@ func TestEIP161AccountRemoval(t *testing.T) {
 			Alloc: GenesisAlloc{address: {Balance: funds}},
 		}
 		genesis = gspec.MustCommit(db)
-		mux     event.TypeMux
 	)
-	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), &mux, vm.Config{})
+	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer blockchain.Stop()
 
 	blocks, _ := GenerateChain(gspec.Config, genesis, db, 3, func(i int, block *BlockGen) {
diff --git a/core/chain_makers.go b/core/chain_makers.go
index 976a8114d3af4f39c05928339663da54f4676cf6..cb5825d18797c02e232ca31f4813856010050e2b 100644
--- a/core/chain_makers.go
+++ b/core/chain_makers.go
@@ -27,7 +27,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -236,7 +235,7 @@ func newCanonical(n int, full bool) (ethdb.Database, *BlockChain, error) {
 	db, _ := ethdb.NewMemDatabase()
 	genesis := gspec.MustCommit(db)
 
-	blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	blockchain, _ := NewBlockChain(db, params.AllProtocolChanges, ethash.NewFaker(), vm.Config{})
 	// Create and inject the requested chain
 	if n == 0 {
 		return db, blockchain, nil
diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go
index 28eb76c6342dddf3233bf60aa50b1c04190be33a..2260c62fbda9b6c90b720dd452ed93a5592ce21e 100644
--- a/core/chain_makers_test.go
+++ b/core/chain_makers_test.go
@@ -25,7 +25,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -80,9 +79,7 @@ func ExampleGenerateChain() {
 	})
 
 	// Import the chain. This runs all block validation rules.
-	evmux := &event.TypeMux{}
-
-	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{})
+	blockchain, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
 	defer blockchain.Stop()
 
 	if i, err := blockchain.InsertChain(chain); err != nil {
diff --git a/core/dao_test.go b/core/dao_test.go
index 99bf1ecaef8f5d37ad9de69107b6ea002d26cbde..d6e11d78ae8b55e62e2fbaafd89542ae890ec8d4 100644
--- a/core/dao_test.go
+++ b/core/dao_test.go
@@ -23,7 +23,6 @@ import (
 	"github.com/ethereum/go-ethereum/consensus/ethash"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -42,13 +41,13 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 	proDb, _ := ethdb.NewMemDatabase()
 	gspec.MustCommit(proDb)
 	proConf := &params.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: true}
-	proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	proBc, _ := NewBlockChain(proDb, proConf, ethash.NewFaker(), vm.Config{})
 	defer proBc.Stop()
 
 	conDb, _ := ethdb.NewMemDatabase()
 	gspec.MustCommit(conDb)
 	conConf := &params.ChainConfig{HomesteadBlock: big.NewInt(0), DAOForkBlock: forkBlock, DAOForkSupport: false}
-	conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	conBc, _ := NewBlockChain(conDb, conConf, ethash.NewFaker(), vm.Config{})
 	defer conBc.Stop()
 
 	if _, err := proBc.InsertChain(prefix); err != nil {
@@ -62,8 +61,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 		// Create a pro-fork block, and try to feed into the no-fork chain
 		db, _ = ethdb.NewMemDatabase()
 		gspec.MustCommit(db)
-
-		bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+		bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{})
 		defer bc.Stop()
 
 		blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
@@ -85,8 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 		// Create a no-fork block, and try to feed into the pro-fork chain
 		db, _ = ethdb.NewMemDatabase()
 		gspec.MustCommit(db)
-
-		bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+		bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{})
 		defer bc.Stop()
 
 		blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
@@ -109,8 +106,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 	// Verify that contra-forkers accept pro-fork extra-datas after forking finishes
 	db, _ = ethdb.NewMemDatabase()
 	gspec.MustCommit(db)
-
-	bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), vm.Config{})
 	defer bc.Stop()
 
 	blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
@@ -127,8 +123,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 	// Verify that pro-forkers accept contra-fork extra-datas after forking finishes
 	db, _ = ethdb.NewMemDatabase()
 	gspec.MustCommit(db)
-
-	bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
+	bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), vm.Config{})
 	defer bc.Stop()
 
 	blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
diff --git a/core/genesis_test.go b/core/genesis_test.go
index 8b193759fc74f49c75f69d46802f4814d2d2a5f3..482f86190435a450a276a50694a411655abe8b28 100644
--- a/core/genesis_test.go
+++ b/core/genesis_test.go
@@ -26,7 +26,6 @@ import (
 	"github.com/ethereum/go-ethereum/consensus/ethash"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -119,9 +118,8 @@ func TestSetupGenesis(t *testing.T) {
 				// Commit the 'old' genesis block with Homestead transition at #2.
 				// Advance to block #4, past the homestead transition block of customg.
 				genesis := oldcustomg.MustCommit(db)
-				bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{})
+				bc, _ := NewBlockChain(db, oldcustomg.Config, ethash.NewFullFaker(), vm.Config{})
 				defer bc.Stop()
-
 				bc.SetValidator(bproc{})
 				bc.InsertChain(makeBlockChainWithDiff(genesis, []int{2, 3, 4, 5}, 0))
 				bc.CurrentBlock()
diff --git a/core/tx_pool.go b/core/tx_pool.go
index b0c251f921cbb50a5776b6a49fa18ea67f7e5522..d835c94d1a8dc8f08aa9d0e1ee63e6ede16c80c4 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -34,6 +34,13 @@ import (
 	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
 )
 
+const (
+	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+	chainHeadChanSize = 10
+	// rmTxChanSize is the size of channel listening to RemovedTransactionEvent.
+	rmTxChanSize = 10
+)
+
 var (
 	// ErrInvalidSender is returned if the transaction contains an invalid signature.
 	ErrInvalidSender = errors.New("invalid sender")
@@ -95,7 +102,14 @@ var (
 	underpricedTxCounter = metrics.NewCounter("txpool/underpriced")
 )
 
-type stateFn func() (*state.StateDB, error)
+// blockChain provides the state of blockchain and current gas limit to do
+// some pre checks in tx pool and event subscribers.
+type blockChain interface {
+	State() (*state.StateDB, error)
+	GasLimit() *big.Int
+	SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
+	SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription
+}
 
 // TxPoolConfig are the configuration parameters of the transaction pool.
 type TxPoolConfig struct {
@@ -160,12 +174,15 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
 type TxPool struct {
 	config       TxPoolConfig
 	chainconfig  *params.ChainConfig
-	currentState stateFn // The state function which will allow us to do some pre checks
+	blockChain   blockChain
 	pendingState *state.ManagedState
-	gasLimit     func() *big.Int // The current gas limit function callback
 	gasPrice     *big.Int
-	eventMux     *event.TypeMux
-	events       *event.TypeMuxSubscription
+	txFeed       event.Feed
+	scope        event.SubscriptionScope
+	chainHeadCh  chan ChainHeadEvent
+	chainHeadSub event.Subscription
+	rmTxCh       chan RemovedTransactionEvent
+	rmTxSub      event.Subscription
 	signer       types.Signer
 	mu           sync.RWMutex
 
@@ -185,7 +202,7 @@ type TxPool struct {
 
 // NewTxPool creates a new transaction pool to gather, sort and filter inbound
 // trnsactions from the network.
-func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
+func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool {
 	// Sanitize the input to ensure no vulnerable gas prices are set
 	config = (&config).sanitize()
 
@@ -193,17 +210,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e
 	pool := &TxPool{
 		config:       config,
 		chainconfig:  chainconfig,
+		blockChain:   blockChain,
 		signer:       types.NewEIP155Signer(chainconfig.ChainId),
 		pending:      make(map[common.Address]*txList),
 		queue:        make(map[common.Address]*txList),
 		beats:        make(map[common.Address]time.Time),
 		all:          make(map[common.Hash]*types.Transaction),
-		eventMux:     eventMux,
-		currentState: currentStateFn,
-		gasLimit:     gasLimitFn,
+		chainHeadCh:  make(chan ChainHeadEvent, chainHeadChanSize),
+		rmTxCh:       make(chan RemovedTransactionEvent, rmTxChanSize),
 		gasPrice:     new(big.Int).SetUint64(config.PriceLimit),
 		pendingState: nil,
-		events:       eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}),
 	}
 	pool.locals = newAccountSet(pool.signer)
 	pool.priced = newTxPricedList(&pool.all)
@@ -220,6 +236,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e
 			log.Warn("Failed to rotate transaction journal", "err", err)
 		}
 	}
+	// Subscribe events from blockchain
+	pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh)
+	pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh)
 	// Start the event loop and return
 	pool.wg.Add(1)
 	go pool.loop()
@@ -248,25 +267,27 @@ func (pool *TxPool) loop() {
 	// Keep waiting for and reacting to the various events
 	for {
 		select {
-		// Handle any events fired by the system
-		case ev, ok := <-pool.events.Chan():
-			if !ok {
-				return
-			}
-			switch ev := ev.Data.(type) {
-			case ChainHeadEvent:
-				pool.mu.Lock()
-				if ev.Block != nil {
-					if pool.chainconfig.IsHomestead(ev.Block.Number()) {
-						pool.homestead = true
-					}
+		// Handle ChainHeadEvent
+		case ev := <-pool.chainHeadCh:
+			pool.mu.Lock()
+			if ev.Block != nil {
+				if pool.chainconfig.IsHomestead(ev.Block.Number()) {
+					pool.homestead = true
 				}
-				pool.reset()
-				pool.mu.Unlock()
 
-			case RemovedTransactionEvent:
-				pool.addTxs(ev.Txs, false)
 			}
+			pool.reset()
+			pool.mu.Unlock()
+		// Be unsubscribed due to system stopped
+		case <-pool.chainHeadSub.Err():
+			return
+
+		// Handle RemovedTransactionEvent
+		case ev := <-pool.rmTxCh:
+			pool.addTxs(ev.Txs, false)
+		// Be unsubscribed due to system stopped
+		case <-pool.rmTxSub.Err():
+			return
 
 		// Handle stats reporting ticks
 		case <-report.C:
@@ -322,7 +343,7 @@ func (pool *TxPool) lockedReset() {
 // reset retrieves the current state of the blockchain and ensures the content
 // of the transaction pool is valid with regard to the chain state.
 func (pool *TxPool) reset() {
-	currentState, err := pool.currentState()
+	currentState, err := pool.blockChain.State()
 	if err != nil {
 		log.Error("Failed reset txpool state", "err", err)
 		return
@@ -347,7 +368,11 @@ func (pool *TxPool) reset() {
 
 // Stop terminates the transaction pool.
 func (pool *TxPool) Stop() {
-	pool.events.Unsubscribe()
+	// Unsubscribe all subscriptions registered from txpool
+	pool.scope.Close()
+	// Unsubscribe subscriptions registered from blockchain
+	pool.chainHeadSub.Unsubscribe()
+	pool.rmTxSub.Unsubscribe()
 	pool.wg.Wait()
 
 	if pool.journal != nil {
@@ -356,6 +381,12 @@ func (pool *TxPool) Stop() {
 	log.Info("Transaction pool stopped")
 }
 
+// SubscribeTxPreEvent registers a subscription of TxPreEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription {
+	return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
 // GasPrice returns the current gas price enforced by the transaction pool.
 func (pool *TxPool) GasPrice() *big.Int {
 	pool.mu.RLock()
@@ -468,7 +499,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
 		return ErrNegativeValue
 	}
 	// Ensure the transaction doesn't exceed the current block limit gas.
-	if pool.gasLimit().Cmp(tx.Gas()) < 0 {
+	if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 {
 		return ErrGasLimit
 	}
 	// Make sure the transaction is signed properly
@@ -482,7 +513,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
 		return ErrUnderpriced
 	}
 	// Ensure the transaction adheres to nonce ordering
-	currentState, err := pool.currentState()
+	currentState, err := pool.blockChain.State()
 	if err != nil {
 		return err
 	}
@@ -647,7 +678,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
 	// Set the potentially new pending nonce and notify any subsystems of the new tx
 	pool.beats[addr] = time.Now()
 	pool.pendingState.SetNonce(addr, tx.Nonce()+1)
-	go pool.eventMux.Post(TxPreEvent{tx})
+	go pool.txFeed.Send(TxPreEvent{tx})
 }
 
 // AddLocal enqueues a single transaction into the pool if it is valid, marking
@@ -690,7 +721,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
 	}
 	// If we added a new transaction, run promotion checks and return
 	if !replace {
-		state, err := pool.currentState()
+		state, err := pool.blockChain.State()
 		if err != nil {
 			return err
 		}
@@ -717,7 +748,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
 	}
 	// Only reprocess the internal state if something was actually added
 	if len(dirty) > 0 {
-		state, err := pool.currentState()
+		state, err := pool.blockChain.State()
 		if err != nil {
 			return err
 		}
@@ -804,7 +835,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
 // future queue to the set of pending transactions. During this process, all
 // invalidated transactions (low nonce, low balance) are deleted.
 func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
-	gaslimit := pool.gasLimit()
+	gaslimit := pool.blockChain.GasLimit()
 
 	// Gather all the accounts potentially needing updates
 	if accounts == nil {
@@ -973,7 +1004,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
 // executable/pending queue and any subsequent transactions that become unexecutable
 // are moved back into the future queue.
 func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
-	gaslimit := pool.gasLimit()
+	gaslimit := pool.blockChain.GasLimit()
 
 	// Iterate over all accounts and demote any non-executable transactions
 	for addr, list := range pool.pending {
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index fcb330051c84f193ee83c88fd7cdd20d40e0cd9d..ee1ddd4bbc795f3efa62974c017222e3ddc6f565 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -44,6 +44,29 @@ func init() {
 	testTxPoolConfig.Journal = ""
 }
 
+type testBlockChain struct {
+	statedb       *state.StateDB
+	gasLimit      *big.Int
+	chainHeadFeed *event.Feed
+	rmTxFeed      *event.Feed
+}
+
+func (bc *testBlockChain) State() (*state.StateDB, error) {
+	return bc.statedb, nil
+}
+
+func (bc *testBlockChain) GasLimit() *big.Int {
+	return new(big.Int).Set(bc.gasLimit)
+}
+
+func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
+	return bc.chainHeadFeed.Subscribe(ch)
+}
+
+func (bc *testBlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
+	return bc.rmTxFeed.Subscribe(ch)
+}
+
 func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
 	return pricedTransaction(nonce, gaslimit, big.NewInt(1), key)
 }
@@ -56,9 +79,10 @@ func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.Pri
 func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	key, _ := crypto.GenerateKey()
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 
 	return pool, key
 }
@@ -96,6 +120,31 @@ func deriveSender(tx *types.Transaction) (common.Address, error) {
 	return types.Sender(types.HomesteadSigner{}, tx)
 }
 
+type testChain struct {
+	*testBlockChain
+	address common.Address
+	trigger *bool
+}
+
+// testChain.State() is used multiple times to reset the pending state.
+// when simulate is true it will create a state that indicates
+// that tx0 and tx1 are included in the chain.
+func (c *testChain) State() (*state.StateDB, error) {
+	// delay "state change" by one. The tx pool fetches the
+	// state multiple times and by delaying it a bit we simulate
+	// a state change between those fetches.
+	stdb := c.statedb
+	if *c.trigger {
+		db, _ := ethdb.NewMemDatabase()
+		c.statedb, _ = state.New(common.Hash{}, state.NewDatabase(db))
+		// simulate that the new head block included tx0 and tx1
+		c.statedb.SetNonce(c.address, 2)
+		c.statedb.SetBalance(c.address, new(big.Int).SetUint64(params.Ether))
+		*c.trigger = false
+	}
+	return stdb, nil
+}
+
 // This test simulates a scenario where a new block is imported during a
 // state reset and tests whether the pending state is in sync with the
 // block head event that initiated the resetState().
@@ -104,38 +153,18 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
 		db, _      = ethdb.NewMemDatabase()
 		key, _     = crypto.GenerateKey()
 		address    = crypto.PubkeyToAddress(key.PublicKey)
-		mux        = new(event.TypeMux)
 		statedb, _ = state.New(common.Hash{}, state.NewDatabase(db))
 		trigger    = false
 	)
 
 	// setup pool with 2 transaction in it
 	statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether))
+	blockchain := &testChain{&testBlockChain{statedb, big.NewInt(1000000000), new(event.Feed), new(event.Feed)}, address, &trigger}
 
 	tx0 := transaction(0, big.NewInt(100000), key)
 	tx1 := transaction(1, big.NewInt(100000), key)
 
-	// stateFunc is used multiple times to reset the pending state.
-	// when simulate is true it will create a state that indicates
-	// that tx0 and tx1 are included in the chain.
-	stateFunc := func() (*state.StateDB, error) {
-		// delay "state change" by one. The tx pool fetches the
-		// state multiple times and by delaying it a bit we simulate
-		// a state change between those fetches.
-		stdb := statedb
-		if trigger {
-			statedb, _ = state.New(common.Hash{}, state.NewDatabase(db))
-			// simulate that the new head block included tx0 and tx1
-			statedb.SetNonce(address, 2)
-			statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether))
-			trigger = false
-		}
-		return stdb, nil
-	}
-
-	gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) }
-
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc)
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	nonce := pool.State().GetNonce(address)
@@ -176,7 +205,7 @@ func TestInvalidTransactions(t *testing.T) {
 
 	tx := transaction(0, big.NewInt(100), key)
 	from, _ := deriveSender(tx)
-	currentState, _ := pool.currentState()
+	currentState, _ := pool.blockChain.State()
 	currentState.AddBalance(from, big.NewInt(1))
 	if err := pool.AddRemote(tx); err != ErrInsufficientFunds {
 		t.Error("expected", ErrInsufficientFunds)
@@ -211,7 +240,7 @@ func TestTransactionQueue(t *testing.T) {
 
 	tx := transaction(0, big.NewInt(100), key)
 	from, _ := deriveSender(tx)
-	currentState, _ := pool.currentState()
+	currentState, _ := pool.blockChain.State()
 	currentState.AddBalance(from, big.NewInt(1000))
 	pool.lockedReset()
 	pool.enqueueTx(tx.Hash(), tx)
@@ -241,7 +270,7 @@ func TestTransactionQueue(t *testing.T) {
 	tx2 := transaction(10, big.NewInt(100), key)
 	tx3 := transaction(11, big.NewInt(100), key)
 	from, _ = deriveSender(tx1)
-	currentState, _ = pool.currentState()
+	currentState, _ = pool.blockChain.State()
 	currentState.AddBalance(from, big.NewInt(1000))
 	pool.lockedReset()
 
@@ -264,7 +293,7 @@ func TestRemoveTx(t *testing.T) {
 	defer pool.Stop()
 
 	addr := crypto.PubkeyToAddress(key.PublicKey)
-	currentState, _ := pool.currentState()
+	currentState, _ := pool.blockChain.State()
 	currentState.AddBalance(addr, big.NewInt(1))
 
 	tx1 := transaction(0, big.NewInt(100), key)
@@ -296,7 +325,7 @@ func TestNegativeValue(t *testing.T) {
 
 	tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key)
 	from, _ := deriveSender(tx)
-	currentState, _ := pool.currentState()
+	currentState, _ := pool.blockChain.State()
 	currentState.AddBalance(from, big.NewInt(1))
 	if err := pool.AddRemote(tx); err != ErrNegativeValue {
 		t.Error("expected", ErrNegativeValue, "got", err)
@@ -311,8 +340,8 @@ func TestTransactionChainFork(t *testing.T) {
 	resetState := func() {
 		db, _ := ethdb.NewMemDatabase()
 		statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-		pool.currentState = func() (*state.StateDB, error) { return statedb, nil }
-		currentState, _ := pool.currentState()
+		pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+		currentState, _ := pool.blockChain.State()
 		currentState.AddBalance(addr, big.NewInt(100000000000000))
 		pool.lockedReset()
 	}
@@ -339,8 +368,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
 	resetState := func() {
 		db, _ := ethdb.NewMemDatabase()
 		statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
-		pool.currentState = func() (*state.StateDB, error) { return statedb, nil }
-		currentState, _ := pool.currentState()
+		pool.blockChain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+		currentState, _ := pool.blockChain.State()
 		currentState.AddBalance(addr, big.NewInt(100000000000000))
 		pool.lockedReset()
 	}
@@ -358,7 +387,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
 	if replace, err := pool.add(tx2, false); err != nil || !replace {
 		t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
 	}
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	pool.promoteExecutables(state, []common.Address{addr})
 	if pool.pending[addr].Len() != 1 {
 		t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
@@ -386,7 +415,7 @@ func TestMissingNonce(t *testing.T) {
 	defer pool.Stop()
 
 	addr := crypto.PubkeyToAddress(key.PublicKey)
-	currentState, _ := pool.currentState()
+	currentState, _ := pool.blockChain.State()
 	currentState.AddBalance(addr, big.NewInt(100000000000000))
 	tx := transaction(1, big.NewInt(100000), key)
 	if _, err := pool.add(tx, false); err != nil {
@@ -409,7 +438,7 @@ func TestNonceRecovery(t *testing.T) {
 	defer pool.Stop()
 
 	addr := crypto.PubkeyToAddress(key.PublicKey)
-	currentState, _ := pool.currentState()
+	currentState, _ := pool.blockChain.State()
 	currentState.SetNonce(addr, n)
 	currentState.AddBalance(addr, big.NewInt(100000000000000))
 	pool.lockedReset()
@@ -431,11 +460,14 @@ func TestRemovedTxEvent(t *testing.T) {
 
 	tx := transaction(0, big.NewInt(1000000), key)
 	from, _ := deriveSender(tx)
-	currentState, _ := pool.currentState()
+	currentState, _ := pool.blockChain.State()
 	currentState.AddBalance(from, big.NewInt(1000000000000))
 	pool.lockedReset()
-	pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}})
-	pool.eventMux.Post(ChainHeadEvent{nil})
+	blockChain, _ := pool.blockChain.(*testBlockChain)
+	blockChain.rmTxFeed.Send(RemovedTransactionEvent{types.Transactions{tx}})
+	blockChain.chainHeadFeed.Send(ChainHeadEvent{nil})
+	// wait for handling events
+	<-time.After(500 * time.Millisecond)
 	if pool.pending[from].Len() != 1 {
 		t.Error("expected 1 pending tx, got", pool.pending[from].Len())
 	}
@@ -453,7 +485,7 @@ func TestTransactionDropping(t *testing.T) {
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
 
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000))
 
 	// Add some pending and some queued transactions
@@ -518,7 +550,7 @@ func TestTransactionDropping(t *testing.T) {
 		t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
 	}
 	// Reduce the block gas limit, check that invalidated transactions are dropped
-	pool.gasLimit = func() *big.Int { return big.NewInt(100) }
+	pool.blockChain.(*testBlockChain).gasLimit = big.NewInt(100)
 	pool.lockedReset()
 
 	if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
@@ -548,7 +580,7 @@ func TestTransactionPostponing(t *testing.T) {
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
 
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000))
 
 	// Add a batch consecutive pending transactions for validation
@@ -624,7 +656,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
 
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000000))
 	pool.lockedReset()
 
@@ -667,16 +699,17 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.NoLocals = nolocals
 	config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
 
-	pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them (last one will be the local)
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 
 	keys := make([]*ecdsa.PrivateKey, 5)
 	for i := 0; i < len(keys); i++ {
@@ -757,19 +790,20 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
 	// Create the pool to test the non-expiration enforcement
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.Lifetime = time.Second
 	config.NoLocals = nolocals
 
-	pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create two test accounts to ensure remotes expire but locals do not
 	local, _ := crypto.GenerateKey()
 	remote, _ := crypto.GenerateKey()
 
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
 	state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
 
@@ -821,7 +855,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
 
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000000))
 	pool.lockedReset()
 
@@ -853,7 +887,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
 	defer pool1.Stop()
 
 	account1, _ := deriveSender(transaction(0, big.NewInt(0), key1))
-	state1, _ := pool1.currentState()
+	state1, _ := pool1.blockChain.State()
 	state1.AddBalance(account1, big.NewInt(1000000))
 
 	for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
@@ -866,7 +900,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
 	defer pool2.Stop()
 
 	account2, _ := deriveSender(transaction(0, big.NewInt(0), key2))
-	state2, _ := pool2.currentState()
+	state2, _ := pool2.blockChain.State()
 	state2.AddBalance(account2, big.NewInt(1000000))
 
 	txns := []*types.Transaction{}
@@ -900,15 +934,16 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.GlobalSlots = config.AccountSlots * 10
 
-	pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 
 	keys := make([]*ecdsa.PrivateKey, 5)
 	for i := 0; i < len(keys); i++ {
@@ -946,17 +981,18 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.AccountSlots = 2
 	config.AccountQueue = 2
 	config.GlobalSlots = 8
 
-	pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 
 	key, _ := crypto.GenerateKey()
 	addr := crypto.PubkeyToAddress(key.PublicKey)
@@ -980,15 +1016,16 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
 	// Create the pool to test the limit enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.GlobalSlots = 0
 
-	pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 
 	keys := make([]*ecdsa.PrivateKey, 5)
 	for i := 0; i < len(keys); i++ {
@@ -1028,12 +1065,13 @@ func TestTransactionPoolRepricing(t *testing.T) {
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 
 	keys := make([]*ecdsa.PrivateKey, 3)
 	for i := 0; i < len(keys); i++ {
@@ -1112,16 +1150,17 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.GlobalSlots = 2
 	config.GlobalQueue = 2
 
-	pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a number of test accounts and fund them
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 
 	keys := make([]*ecdsa.PrivateKey, 3)
 	for i := 0; i < len(keys); i++ {
@@ -1199,14 +1238,15 @@ func TestTransactionReplacement(t *testing.T) {
 	// Create the pool to test the pricing enforcement with
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
-	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
 	defer pool.Stop()
 
 	// Create a test account to add transactions with
 	key, _ := crypto.GenerateKey()
 
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
 
 	// Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
@@ -1278,19 +1318,20 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	// Create the original pool to inject transaction into the journal
 	db, _ := ethdb.NewMemDatabase()
 	statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
+	blockchain := &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
 
 	config := testTxPoolConfig
 	config.NoLocals = nolocals
 	config.Journal = journal
 	config.Rejournal = time.Second
 
-	pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	pool := NewTxPool(config, params.TestChainConfig, blockchain)
 
 	// Create two test accounts to ensure remotes expire but locals do not
 	local, _ := crypto.GenerateKey()
 	remote, _ := crypto.GenerateKey()
 
-	statedb, _ = pool.currentState()
+	statedb, _ = pool.blockChain.State()
 	statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
 	statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
 
@@ -1320,7 +1361,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
 	pool.Stop()
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
-	pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	pool = NewTxPool(config, params.TestChainConfig, blockchain)
 
 	pending, queued = pool.Stats()
 	if queued != 0 {
@@ -1344,7 +1386,8 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
 	time.Sleep(2 * config.Rejournal)
 	pool.Stop()
 	statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
-	pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+	blockchain = &testBlockChain{statedb, big.NewInt(1000000), new(event.Feed), new(event.Feed)}
+	pool = NewTxPool(config, params.TestChainConfig, blockchain)
 
 	pending, queued = pool.Stats()
 	if pending != 0 {
@@ -1377,7 +1420,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000000))
 
 	for i := 0; i < size; i++ {
@@ -1403,7 +1446,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000000))
 
 	for i := 0; i < size; i++ {
@@ -1424,7 +1467,7 @@ func BenchmarkPoolInsert(b *testing.B) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000000))
 
 	txs := make(types.Transactions, b.N)
@@ -1449,7 +1492,7 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) {
 	defer pool.Stop()
 
 	account, _ := deriveSender(transaction(0, big.NewInt(0), key))
-	state, _ := pool.currentState()
+	state, _ := pool.blockChain.State()
 	state.AddBalance(account, big.NewInt(1000000))
 
 	batches := make([]types.Transactions, b.N)
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 7ef7c030ddf898beeb82003a25a130991ff8518a..abf52326b6813396bf39b86c17643e33b1f5d09f 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -115,6 +115,30 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta
 	return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil
 }
 
+func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription {
+	return b.eth.BlockChain().SubscribeRemovedTxEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+	return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+	return b.eth.BlockChain().SubscribeChainEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+	return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+	return b.eth.BlockChain().SubscribeChainSideEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return b.eth.BlockChain().SubscribeLogsEvent(ch)
+}
+
 func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
 	return b.eth.txPool.AddLocal(signedTx)
 }
@@ -151,6 +175,10 @@ func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
 	return b.eth.TxPool().Content()
 }
 
+func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+	return b.eth.TxPool().SubscribeTxPreEvent(ch)
+}
+
 func (b *EthApiBackend) Downloader() *downloader.Downloader {
 	return b.eth.Downloader()
 }
diff --git a/eth/backend.go b/eth/backend.go
index 8a837f7b882397256b99ca8a0d73c20fae8fa8fd..5f4f2097a7fff2f9133b92eb1771f90c31dd3d25 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -137,7 +137,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 	}
 
 	vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
-	eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig)
+	eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig)
 	if err != nil {
 		return nil, err
 	}
@@ -151,7 +151,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
 	if config.TxPool.Journal != "" {
 		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
 	}
-	eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit)
+	eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
 
 	maxPeers := config.MaxPeers
 	if config.LightServ > 0 {
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index f27b76929e512cbabd840b1306c757ddc06807e7..f848bc6af9307adb23139fd00e940f895ca0071a 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -34,6 +34,10 @@ type Backend interface {
 	EventMux() *event.TypeMux
 	HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
 	GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+	SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+	SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+	SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
 }
 
 // Filter can be used to retrieve and filter logs.
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ab0b7473ea0d8023dce88ef25b2af91987f87174..00ade0ffb251a0f771ac4ab2fb536f278ef4ba30 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -54,6 +54,19 @@ const (
 	LastIndexSubscription
 )
 
+const (
+
+	// txChanSize is the size of channel listening to TxPreEvent.
+	// The number is referenced from the size of tx pool.
+	txChanSize = 4096
+	// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+	rmLogsChanSize = 10
+	// logsChanSize is the size of channel listening to LogsEvent.
+	logsChanSize = 10
+	// chainEvChanSize is the size of channel listening to ChainEvent.
+	chainEvChanSize = 10
+)
+
 var (
 	ErrInvalidSubscriptionID = errors.New("invalid id")
 )
@@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
 type filterIndex map[Type]map[rpc.ID]*subscription
 
 // broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
 	if ev == nil {
 		return
 	}
 
-	switch e := ev.Data.(type) {
+	switch e := ev.(type) {
 	case []*types.Log:
 		if len(e) > 0 {
 			for _, f := range filters[LogsSubscription] {
-				if ev.Time.After(f.created) {
-					if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
-						f.logs <- matchedLogs
-					}
+				if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+					f.logs <- matchedLogs
 				}
 			}
 		}
 	case core.RemovedLogsEvent:
 		for _, f := range filters[LogsSubscription] {
-			if ev.Time.After(f.created) {
-				if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
-					f.logs <- matchedLogs
-				}
+			if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+				f.logs <- matchedLogs
 			}
 		}
-	case core.PendingLogsEvent:
-		for _, f := range filters[PendingLogsSubscription] {
-			if ev.Time.After(f.created) {
-				if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
-					f.logs <- matchedLogs
+	case *event.TypeMuxEvent:
+		switch muxe := e.Data.(type) {
+		case core.PendingLogsEvent:
+			for _, f := range filters[PendingLogsSubscription] {
+				if e.Time.After(f.created) {
+					if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+						f.logs <- matchedLogs
+					}
 				}
 			}
 		}
 	case core.TxPreEvent:
 		for _, f := range filters[PendingTransactionsSubscription] {
-			if ev.Time.After(f.created) {
-				f.hashes <- e.Tx.Hash()
-			}
+			f.hashes <- e.Tx.Hash()
 		}
 	case core.ChainEvent:
 		for _, f := range filters[BlocksSubscription] {
-			if ev.Time.After(f.created) {
-				f.headers <- e.Block.Header()
-			}
+			f.headers <- e.Block.Header()
 		}
 		if es.lightMode && len(filters[LogsSubscription]) > 0 {
 			es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
 				for _, f := range filters[LogsSubscription] {
-					if ev.Time.After(f.created) {
-						if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
-							f.logs <- matchedLogs
-						}
+					if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+						f.logs <- matchedLogs
 					}
 				}
 			})
@@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
 func (es *EventSystem) eventLoop() {
 	var (
 		index = make(filterIndex)
-		sub   = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{})
+		sub   = es.mux.Subscribe(core.PendingLogsEvent{})
+		// Subscribe TxPreEvent form txpool
+		txCh  = make(chan core.TxPreEvent, txChanSize)
+		txSub = es.backend.SubscribeTxPreEvent(txCh)
+		// Subscribe RemovedLogsEvent
+		rmLogsCh  = make(chan core.RemovedLogsEvent, rmLogsChanSize)
+		rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
+		// Subscribe []*types.Log
+		logsCh  = make(chan []*types.Log, logsChanSize)
+		logsSub = es.backend.SubscribeLogsEvent(logsCh)
+		// Subscribe ChainEvent
+		chainEvCh  = make(chan core.ChainEvent, chainEvChanSize)
+		chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
 	)
 
+	// Unsubscribe all events
+	defer sub.Unsubscribe()
+	defer txSub.Unsubscribe()
+	defer rmLogsSub.Unsubscribe()
+	defer logsSub.Unsubscribe()
+	defer chainEvSub.Unsubscribe()
+
 	for i := UnknownSubscription; i < LastIndexSubscription; i++ {
 		index[i] = make(map[rpc.ID]*subscription)
 	}
@@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() {
 				return
 			}
 			es.broadcast(index, ev)
+
+		// Handle subscribed events
+		case ev := <-txCh:
+			es.broadcast(index, ev)
+		case ev := <-rmLogsCh:
+			es.broadcast(index, ev)
+		case ev := <-logsCh:
+			es.broadcast(index, ev)
+		case ev := <-chainEvCh:
+			es.broadcast(index, ev)
+
 		case f := <-es.install:
 			if f.typ == MinedAndPendingLogsSubscription {
 				// the type are logs and pending logs subscriptions
@@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() {
 				delete(index[f.typ], f.id)
 			}
 			close(f.err)
+
+		// System stopped
+		case <-txSub.Err():
+			return
+		case <-rmLogsSub.Err():
+			return
+		case <-logsSub.Err():
+			return
+		case <-chainEvSub.Err():
+			return
 		}
 	}
 }
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 23e6d66e1816be9661a6d48f1ee447a9f8a45327..fcc888b8cf380fb66e28eb46cff8ee81acab78b6 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -34,8 +34,12 @@ import (
 )
 
 type testBackend struct {
-	mux *event.TypeMux
-	db  ethdb.Database
+	mux        *event.TypeMux
+	db         ethdb.Database
+	txFeed     *event.Feed
+	rmLogsFeed *event.Feed
+	logsFeed   *event.Feed
+	chainFeed  *event.Feed
 }
 
 func (b *testBackend) ChainDb() ethdb.Database {
@@ -64,6 +68,22 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t
 	return core.GetBlockReceipts(b.db, blockHash, num), nil
 }
 
+func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+	return b.txFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+	return b.rmLogsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return b.logsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+	return b.chainFeed.Subscribe(ch)
+}
+
 // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
 // It creates multiple subscriptions:
 // - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -75,7 +95,11 @@ func TestBlockSubscription(t *testing.T) {
 	var (
 		mux         = new(event.TypeMux)
 		db, _       = ethdb.NewMemDatabase()
-		backend     = &testBackend{mux, db}
+		txFeed      = new(event.Feed)
+		rmLogsFeed  = new(event.Feed)
+		logsFeed    = new(event.Feed)
+		chainFeed   = new(event.Feed)
+		backend     = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
 		api         = NewPublicFilterAPI(backend, false)
 		genesis     = new(core.Genesis).MustCommit(db)
 		chain, _    = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@@ -114,7 +138,7 @@ func TestBlockSubscription(t *testing.T) {
 
 	time.Sleep(1 * time.Second)
 	for _, e := range chainEvents {
-		mux.Post(e)
+		chainFeed.Send(e)
 	}
 
 	<-sub0.Err()
@@ -126,10 +150,14 @@ func TestPendingTxFilter(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux     = new(event.TypeMux)
-		db, _   = ethdb.NewMemDatabase()
-		backend = &testBackend{mux, db}
-		api     = NewPublicFilterAPI(backend, false)
+		mux        = new(event.TypeMux)
+		db, _      = ethdb.NewMemDatabase()
+		txFeed     = new(event.Feed)
+		rmLogsFeed = new(event.Feed)
+		logsFeed   = new(event.Feed)
+		chainFeed  = new(event.Feed)
+		backend    = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		api        = NewPublicFilterAPI(backend, false)
 
 		transactions = []*types.Transaction{
 			types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -147,9 +175,10 @@ func TestPendingTxFilter(t *testing.T) {
 	time.Sleep(1 * time.Second)
 	for _, tx := range transactions {
 		ev := core.TxPreEvent{Tx: tx}
-		mux.Post(ev)
+		txFeed.Send(ev)
 	}
 
+	timeout := time.Now().Add(1 * time.Second)
 	for {
 		results, err := api.GetFilterChanges(fid0)
 		if err != nil {
@@ -161,10 +190,18 @@ func TestPendingTxFilter(t *testing.T) {
 		if len(hashes) >= len(transactions) {
 			break
 		}
+		// check timeout
+		if time.Now().After(timeout) {
+			break
+		}
 
 		time.Sleep(100 * time.Millisecond)
 	}
 
+	if len(hashes) != len(transactions) {
+		t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
+		return
+	}
 	for i := range hashes {
 		if hashes[i] != transactions[i].Hash() {
 			t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
@@ -176,10 +213,14 @@ func TestPendingTxFilter(t *testing.T) {
 // If not it must return an error.
 func TestLogFilterCreation(t *testing.T) {
 	var (
-		mux     = new(event.TypeMux)
-		db, _   = ethdb.NewMemDatabase()
-		backend = &testBackend{mux, db}
-		api     = NewPublicFilterAPI(backend, false)
+		mux        = new(event.TypeMux)
+		db, _      = ethdb.NewMemDatabase()
+		txFeed     = new(event.Feed)
+		rmLogsFeed = new(event.Feed)
+		logsFeed   = new(event.Feed)
+		chainFeed  = new(event.Feed)
+		backend    = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		api        = NewPublicFilterAPI(backend, false)
 
 		testCases = []struct {
 			crit    FilterCriteria
@@ -221,10 +262,14 @@ func TestInvalidLogFilterCreation(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux     = new(event.TypeMux)
-		db, _   = ethdb.NewMemDatabase()
-		backend = &testBackend{mux, db}
-		api     = NewPublicFilterAPI(backend, false)
+		mux        = new(event.TypeMux)
+		db, _      = ethdb.NewMemDatabase()
+		txFeed     = new(event.Feed)
+		rmLogsFeed = new(event.Feed)
+		logsFeed   = new(event.Feed)
+		chainFeed  = new(event.Feed)
+		backend    = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		api        = NewPublicFilterAPI(backend, false)
 	)
 
 	// different situations where log filter creation should fail.
@@ -242,15 +287,19 @@ func TestInvalidLogFilterCreation(t *testing.T) {
 	}
 }
 
-// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
 func TestLogFilter(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux     = new(event.TypeMux)
-		db, _   = ethdb.NewMemDatabase()
-		backend = &testBackend{mux, db}
-		api     = NewPublicFilterAPI(backend, false)
+		mux        = new(event.TypeMux)
+		db, _      = ethdb.NewMemDatabase()
+		txFeed     = new(event.Feed)
+		rmLogsFeed = new(event.Feed)
+		logsFeed   = new(event.Feed)
+		chainFeed  = new(event.Feed)
+		backend    = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		api        = NewPublicFilterAPI(backend, false)
 
 		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
 		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -311,8 +360,8 @@ func TestLogFilter(t *testing.T) {
 
 	// raise events
 	time.Sleep(1 * time.Second)
-	if err := mux.Post(allLogs); err != nil {
-		t.Fatal(err)
+	if nsend := logsFeed.Send(allLogs); nsend == 0 {
+		t.Fatal("Shoud have at least one subscription")
 	}
 	if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
 		t.Fatal(err)
@@ -320,6 +369,7 @@ func TestLogFilter(t *testing.T) {
 
 	for i, tt := range testCases {
 		var fetched []*types.Log
+		timeout := time.Now().Add(1 * time.Second)
 		for { // fetch all expected logs
 			results, err := api.GetFilterChanges(tt.id)
 			if err != nil {
@@ -330,6 +380,10 @@ func TestLogFilter(t *testing.T) {
 			if len(fetched) >= len(tt.expected) {
 				break
 			}
+			// check timeout
+			if time.Now().After(timeout) {
+				break
+			}
 
 			time.Sleep(100 * time.Millisecond)
 		}
@@ -350,15 +404,19 @@ func TestLogFilter(t *testing.T) {
 	}
 }
 
-// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
 func TestPendingLogsSubscription(t *testing.T) {
 	t.Parallel()
 
 	var (
-		mux     = new(event.TypeMux)
-		db, _   = ethdb.NewMemDatabase()
-		backend = &testBackend{mux, db}
-		api     = NewPublicFilterAPI(backend, false)
+		mux        = new(event.TypeMux)
+		db, _      = ethdb.NewMemDatabase()
+		txFeed     = new(event.Feed)
+		rmLogsFeed = new(event.Feed)
+		logsFeed   = new(event.Feed)
+		chainFeed  = new(event.Feed)
+		backend    = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		api        = NewPublicFilterAPI(backend, false)
 
 		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
 		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -456,6 +514,7 @@ func TestPendingLogsSubscription(t *testing.T) {
 
 	// raise events
 	time.Sleep(1 * time.Second)
+	// allLogs are type of core.PendingLogsEvent
 	for _, l := range allLogs {
 		if err := mux.Post(l); err != nil {
 			t.Fatal(err)
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index b6cfd4bbc3dcf9279147ebc8547832b2bccf9db6..3244c04d74b53e012f4baf23b723e2bcbcf2082f 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -49,14 +49,18 @@ func BenchmarkMipmaps(b *testing.B) {
 	defer os.RemoveAll(dir)
 
 	var (
-		db, _   = ethdb.NewLDBDatabase(dir, 0, 0)
-		mux     = new(event.TypeMux)
-		backend = &testBackend{mux, db}
-		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
-		addr2   = common.BytesToAddress([]byte("jeff"))
-		addr3   = common.BytesToAddress([]byte("ethereum"))
-		addr4   = common.BytesToAddress([]byte("random addresses please"))
+		db, _      = ethdb.NewLDBDatabase(dir, 0, 0)
+		mux        = new(event.TypeMux)
+		txFeed     = new(event.Feed)
+		rmLogsFeed = new(event.Feed)
+		logsFeed   = new(event.Feed)
+		chainFeed  = new(event.Feed)
+		backend    = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		key1, _    = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		addr1      = crypto.PubkeyToAddress(key1.PublicKey)
+		addr2      = common.BytesToAddress([]byte("jeff"))
+		addr3      = common.BytesToAddress([]byte("ethereum"))
+		addr4      = common.BytesToAddress([]byte("random addresses please"))
 	)
 	defer db.Close()
 
@@ -119,11 +123,15 @@ func TestFilters(t *testing.T) {
 	defer os.RemoveAll(dir)
 
 	var (
-		db, _   = ethdb.NewLDBDatabase(dir, 0, 0)
-		mux     = new(event.TypeMux)
-		backend = &testBackend{mux, db}
-		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
-		addr    = crypto.PubkeyToAddress(key1.PublicKey)
+		db, _      = ethdb.NewLDBDatabase(dir, 0, 0)
+		mux        = new(event.TypeMux)
+		txFeed     = new(event.Feed)
+		rmLogsFeed = new(event.Feed)
+		logsFeed   = new(event.Feed)
+		chainFeed  = new(event.Feed)
+		backend    = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+		key1, _    = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		addr       = crypto.PubkeyToAddress(key1.PublicKey)
 
 		hash1 = common.BytesToHash([]byte("topic1"))
 		hash2 = common.BytesToHash([]byte("topic2"))
diff --git a/eth/handler.go b/eth/handler.go
index e6a9c86d7945f3e4d03c922d930e67df048bb1a8..9d230a4ad65a3ba68fb479ea22d74a1818203c60 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,6 +45,10 @@ import (
 const (
 	softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
 	estHeaderRlpSize  = 500             // Approximate size of an RLP encoded block header
+
+	// txChanSize is the size of channel listening to TxPreEvent.
+	// The number is referenced from the size of tx pool.
+	txChanSize = 4096
 )
 
 var (
@@ -78,7 +82,8 @@ type ProtocolManager struct {
 	SubProtocols []p2p.Protocol
 
 	eventMux      *event.TypeMux
-	txSub         *event.TypeMuxSubscription
+	txCh          chan core.TxPreEvent
+	txSub         event.Subscription
 	minedBlockSub *event.TypeMuxSubscription
 
 	// channels for fetcher, syncer, txsyncLoop
@@ -200,7 +205,8 @@ func (pm *ProtocolManager) removePeer(id string) {
 
 func (pm *ProtocolManager) Start() {
 	// broadcast transactions
-	pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
+	pm.txCh = make(chan core.TxPreEvent, txChanSize)
+	pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
 	go pm.txBroadcastLoop()
 	// broadcast mined blocks
 	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
@@ -724,10 +730,15 @@ func (self *ProtocolManager) minedBroadcastLoop() {
 }
 
 func (self *ProtocolManager) txBroadcastLoop() {
-	// automatically stops if unsubscribe
-	for obj := range self.txSub.Chan() {
-		event := obj.Data.(core.TxPreEvent)
-		self.BroadcastTx(event.Tx.Hash(), event.Tx)
+	for {
+		select {
+		case event := <-self.txCh:
+			self.BroadcastTx(event.Tx.Hash(), event.Tx)
+
+		// Err() channel will be closed when unsubscribing.
+		case <-self.txSub.Err():
+			return
+		}
 	}
 }
 
diff --git a/eth/handler_test.go b/eth/handler_test.go
index ca9c9e1b40cfe53d3b3691d64f160ed666a19141..aba2774444017a2bb7a4d52dbca995c9fc488e0d 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -474,7 +474,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
 		config        = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
 		gspec         = &core.Genesis{Config: config}
 		genesis       = gspec.MustCommit(db)
-		blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{})
+		blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{})
 	)
 	pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db)
 	if err != nil {
diff --git a/eth/helper_test.go b/eth/helper_test.go
index 546478a3eecb47b892db6b2c51f78c50377e60cb..f1dab95283ee048b4b76196145bb1428596f015b 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -59,7 +59,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
 			Alloc:  core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}},
 		}
 		genesis       = gspec.MustCommit(db)
-		blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{})
+		blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{})
 	)
 	chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator)
 	if _, err := blockchain.InsertChain(chain); err != nil {
@@ -88,8 +88,9 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i
 
 // testTxPool is a fake, helper transaction pool for testing purposes
 type testTxPool struct {
-	pool  []*types.Transaction        // Collection of all transactions
-	added chan<- []*types.Transaction // Notification channel for new transactions
+	txFeed event.Feed
+	pool   []*types.Transaction        // Collection of all transactions
+	added  chan<- []*types.Transaction // Notification channel for new transactions
 
 	lock sync.RWMutex // Protects the transaction pool
 }
@@ -124,6 +125,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
 	return batches, nil
 }
 
+func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+	return p.txFeed.Subscribe(ch)
+}
+
 // newTestTransaction create a new dummy transaction.
 func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
 	tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize))
diff --git a/eth/protocol.go b/eth/protocol.go
index 376e4663e3a5454d37fe2f8145c921da0759ddd1..2c41376fa8daa68886edd590d71c6dc8a3b66e0e 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -22,7 +22,9 @@ import (
 	"math/big"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/rlp"
 )
 
@@ -100,6 +102,10 @@ type txPool interface {
 	// Pending should return pending transactions.
 	// The slice should be modifiable by the caller.
 	Pending() (map[common.Address]types.Transactions, error)
+
+	// SubscribeTxPreEvent should return an event subscription of
+	// TxPreEvent and send events to the given channel.
+	SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
 }
 
 // statusData is the network packet for the status message.
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go
index b75c5e6da4a73b652a007c3d44aad08f26117ad0..bb03dc72b0c78787180e48d315cf906bb88fa33e 100644
--- a/ethstats/ethstats.go
+++ b/ethstats/ethstats.go
@@ -44,9 +44,27 @@ import (
 	"golang.org/x/net/websocket"
 )
 
-// historyUpdateRange is the number of blocks a node should report upon login or
-// history request.
-const historyUpdateRange = 50
+const (
+	// historyUpdateRange is the number of blocks a node should report upon login or
+	// history request.
+	historyUpdateRange = 50
+
+	// txChanSize is the size of channel listening to TxPreEvent.
+	// The number is referenced from the size of tx pool.
+	txChanSize = 4096
+	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+	chainHeadChanSize = 10
+)
+
+type txPool interface {
+	// SubscribeTxPreEvent should return an event subscription of
+	// TxPreEvent and send events to the given channel.
+	SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+}
+
+type blockChain interface {
+	SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
+}
 
 // Service implements an Ethereum netstats reporting daemon that pushes local
 // chain statistics up to a monitoring server.
@@ -118,16 +136,22 @@ func (s *Service) Stop() error {
 // until termination.
 func (s *Service) loop() {
 	// Subscribe to chain events to execute updates on
-	var emux *event.TypeMux
+	var blockchain blockChain
+	var txpool txPool
 	if s.eth != nil {
-		emux = s.eth.EventMux()
+		blockchain = s.eth.BlockChain()
+		txpool = s.eth.TxPool()
 	} else {
-		emux = s.les.EventMux()
+		blockchain = s.les.BlockChain()
+		txpool = s.les.TxPool()
 	}
-	headSub := emux.Subscribe(core.ChainHeadEvent{})
+
+	chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
+	headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh)
 	defer headSub.Unsubscribe()
 
-	txSub := emux.Subscribe(core.TxPreEvent{})
+	txEventCh := make(chan core.TxPreEvent, txChanSize)
+	txSub := txpool.SubscribeTxPreEvent(txEventCh)
 	defer txSub.Unsubscribe()
 
 	// Start a goroutine that exhausts the subsciptions to avoid events piling up
@@ -139,25 +163,18 @@ func (s *Service) loop() {
 	go func() {
 		var lastTx mclock.AbsTime
 
+	HandleLoop:
 		for {
 			select {
 			// Notify of chain head events, but drop if too frequent
-			case head, ok := <-headSub.Chan():
-				if !ok { // node stopped
-					close(quitCh)
-					return
-				}
+			case head := <-chainHeadCh:
 				select {
-				case headCh <- head.Data.(core.ChainHeadEvent).Block:
+				case headCh <- head.Block:
 				default:
 				}
 
 			// Notify of new transaction events, but drop if too frequent
-			case _, ok := <-txSub.Chan():
-				if !ok { // node stopped
-					close(quitCh)
-					return
-				}
+			case <-txEventCh:
 				if time.Duration(mclock.Now()-lastTx) < time.Second {
 					continue
 				}
@@ -167,8 +184,16 @@ func (s *Service) loop() {
 				case txCh <- struct{}{}:
 				default:
 				}
+
+			// node stopped
+			case <-txSub.Err():
+				break HandleLoop
+			case <-headSub.Err():
+				break HandleLoop
 			}
 		}
+		close(quitCh)
+		return
 	}()
 	// Loop reporting until termination
 	for {
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index d122b7915b0e9efb9bd7fc924e9f7e647bdfd922..be17ffeae4a5200537ce9dd976de7b078c81293d 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -53,6 +53,9 @@ type Backend interface {
 	GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
 	GetTd(blockHash common.Hash) *big.Int
 	GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header, vmCfg vm.Config) (*vm.EVM, func() error, error)
+	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+	SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
+	SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
 
 	// TxPool API
 	SendTx(ctx context.Context, signedTx *types.Transaction) error
@@ -62,6 +65,7 @@ type Backend interface {
 	GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
 	Stats() (pending int, queued int)
 	TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions)
+	SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
 
 	ChainConfig() *params.ChainConfig
 	CurrentBlock() *types.Block
diff --git a/les/api_backend.go b/les/api_backend.go
index 7a3c2447c900e5416e454b3b84fdb2c5ed533ca0..1323e88644c183c9c0c432939690ac61096fd747 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -124,6 +124,30 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
 	return b.eth.txPool.Content()
 }
 
+func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+	return b.eth.txPool.SubscribeTxPreEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+	return b.eth.blockchain.SubscribeChainEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+	return b.eth.blockchain.SubscribeChainHeadEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+	return b.eth.blockchain.SubscribeChainSideEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return b.eth.blockchain.SubscribeLogsEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+	return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
+}
+
 func (b *LesApiBackend) Downloader() *downloader.Downloader {
 	return b.eth.Downloader()
 }
diff --git a/les/backend.go b/les/backend.go
index a4e7726717859b46a86c3657b2977eaa156b45c4..4c33417c03ec5ee53317ee6eac77075b0e1c5206 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
 	eth.serverPool = newServerPool(chainDb, quitSync, &eth.wg)
 	eth.retriever = newRetrieveManager(peers, eth.reqDist, eth.serverPool)
 	eth.odr = NewLesOdr(chainDb, eth.retriever)
-	if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil {
+	if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine); err != nil {
 		return nil, err
 	}
 	// Rewind the chain in case of an incompatible config upgrade.
@@ -113,7 +113,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
 		core.WriteChainConfig(chainDb, genesisHash, chainConfig)
 	}
 
-	eth.txPool = light.NewTxPool(eth.chainConfig, eth.eventMux, eth.blockchain, eth.relay)
+	eth.txPool = light.NewTxPool(eth.chainConfig, eth.blockchain, eth.relay)
 	if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, true, config.NetworkId, eth.eventMux, eth.engine, eth.peers, eth.blockchain, nil, chainDb, eth.odr, eth.relay, quitSync, &eth.wg); err != nil {
 		return nil, err
 	}
diff --git a/les/handler.go b/les/handler.go
index 234b6e998b7bb81d81a827bfa153029de16e63a7..1a75cd369d6a3ac08425a3a81453f5d98c91195e 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -82,6 +82,7 @@ type BlockChain interface {
 	GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
 	LastBlockHash() common.Hash
 	Genesis() *types.Block
+	SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
 }
 
 type txPool interface {
diff --git a/les/helper_test.go b/les/helper_test.go
index 7dccfc458237c970aacc8c7fb0d059c48f6d5c2a..b33454e1d0c218d5abaf6d118a408a69b4b907c2 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -146,9 +146,9 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
 	}
 
 	if lightSync {
-		chain, _ = light.NewLightChain(odr, gspec.Config, engine, evmux)
+		chain, _ = light.NewLightChain(odr, gspec.Config, engine)
 	} else {
-		blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{})
+		blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, vm.Config{})
 		gchain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator)
 		if _, err := blockchain.InsertChain(gchain); err != nil {
 			panic(err)
diff --git a/les/server.go b/les/server.go
index 39ef0efffe5830a7e1a35928a9e7c4d5390fd828..8b27307141ac9527c747c2af4d5e285290de6577 100644
--- a/les/server.go
+++ b/les/server.go
@@ -271,7 +271,8 @@ func (s *requestCostStats) update(msgCode, reqCnt, cost uint64) {
 
 func (pm *ProtocolManager) blockLoop() {
 	pm.wg.Add(1)
-	sub := pm.eventMux.Subscribe(core.ChainHeadEvent{})
+	headCh := make(chan core.ChainHeadEvent, 10)
+	headSub := pm.blockchain.SubscribeChainHeadEvent(headCh)
 	newCht := make(chan struct{}, 10)
 	newCht <- struct{}{}
 	go func() {
@@ -280,10 +281,10 @@ func (pm *ProtocolManager) blockLoop() {
 		lastBroadcastTd := common.Big0
 		for {
 			select {
-			case ev := <-sub.Chan():
+			case ev := <-headCh:
 				peers := pm.peers.AllPeers()
 				if len(peers) > 0 {
-					header := ev.Data.(core.ChainHeadEvent).Block.Header()
+					header := ev.Block.Header()
 					hash := header.Hash()
 					number := header.Number.Uint64()
 					td := core.GetTd(pm.chainDb, hash, number)
@@ -319,7 +320,7 @@ func (pm *ProtocolManager) blockLoop() {
 					}
 				}()
 			case <-pm.quitSync:
-				sub.Unsubscribe()
+				headSub.Unsubscribe()
 				pm.wg.Done()
 				return
 			}
diff --git a/light/lightchain.go b/light/lightchain.go
index a51043975f8e0a6cc0caca20abb15c552bb87637..df194ecadf97c0278abcb8d2a5c418e7fc4cd64a 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -44,11 +44,14 @@ var (
 // headers, downloading block bodies and receipts on demand through an ODR
 // interface. It only does header validation during chain insertion.
 type LightChain struct {
-	hc           *core.HeaderChain
-	chainDb      ethdb.Database
-	odr          OdrBackend
-	eventMux     *event.TypeMux
-	genesisBlock *types.Block
+	hc            *core.HeaderChain
+	chainDb       ethdb.Database
+	odr           OdrBackend
+	chainFeed     event.Feed
+	chainSideFeed event.Feed
+	chainHeadFeed event.Feed
+	scope         event.SubscriptionScope
+	genesisBlock  *types.Block
 
 	mu      sync.RWMutex
 	chainmu sync.RWMutex
@@ -69,7 +72,7 @@ type LightChain struct {
 // NewLightChain returns a fully initialised light chain using information
 // available in the database. It initialises the default Ethereum header
 // validator.
-func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux) (*LightChain, error) {
+func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine) (*LightChain, error) {
 	bodyCache, _ := lru.New(bodyCacheLimit)
 	bodyRLPCache, _ := lru.New(bodyCacheLimit)
 	blockCache, _ := lru.New(blockCacheLimit)
@@ -77,7 +80,6 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
 	bc := &LightChain{
 		chainDb:      odr.Database(),
 		odr:          odr,
-		eventMux:     mux,
 		quit:         make(chan struct{}),
 		bodyCache:    bodyCache,
 		bodyRLPCache: bodyRLPCache,
@@ -316,16 +318,18 @@ func (self *LightChain) Rollback(chain []common.Hash) {
 }
 
 // postChainEvents iterates over the events generated by a chain insertion and
-// posts them into the event mux.
+// posts them into the event feed.
 func (self *LightChain) postChainEvents(events []interface{}) {
 	for _, event := range events {
-		if event, ok := event.(core.ChainEvent); ok {
-			if self.LastBlockHash() == event.Hash {
-				self.eventMux.Post(core.ChainHeadEvent{Block: event.Block})
+		switch ev := event.(type) {
+		case core.ChainEvent:
+			if self.LastBlockHash() == ev.Hash {
+				self.chainHeadFeed.Send(core.ChainHeadEvent{Block: ev.Block})
 			}
+			self.chainFeed.Send(ev)
+		case core.ChainSideEvent:
+			self.chainSideFeed.Send(ev)
 		}
-		// Fire the insertion events individually too
-		self.eventMux.Post(event)
 	}
 }
 
@@ -467,3 +471,30 @@ func (self *LightChain) LockChain() {
 func (self *LightChain) UnlockChain() {
 	self.chainmu.RUnlock()
 }
+
+// SubscribeChainEvent registers a subscription of ChainEvent.
+func (self *LightChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+	return self.scope.Track(self.chainFeed.Subscribe(ch))
+}
+
+// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
+func (self *LightChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+	return self.scope.Track(self.chainHeadFeed.Subscribe(ch))
+}
+
+// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
+func (self *LightChain) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+	return self.scope.Track(self.chainSideFeed.Subscribe(ch))
+}
+
+// SubscribeLogsEvent implements the interface of filters.Backend
+// LightChain does not send logs events, so return an empty subscription.
+func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+	return self.scope.Track(new(event.Feed).Subscribe(ch))
+}
+
+// SubscribeRemovedLogsEvent implements the interface of filters.Backend
+// LightChain does not send core.RemovedLogsEvent, so return an empty subscription.
+func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+	return self.scope.Track(new(event.Feed).Subscribe(ch))
+}
diff --git a/light/lightchain_test.go b/light/lightchain_test.go
index 0ad64052562eb14942f0578d4cdd650a219ac843..40a4d396a83d9168a8481757d32415cca49a4dff 100644
--- a/light/lightchain_test.go
+++ b/light/lightchain_test.go
@@ -26,7 +26,6 @@ import (
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -55,7 +54,7 @@ func newCanonical(n int) (ethdb.Database, *LightChain, error) {
 	db, _ := ethdb.NewMemDatabase()
 	gspec := core.Genesis{Config: params.TestChainConfig}
 	genesis := gspec.MustCommit(db)
-	blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker(), new(event.TypeMux))
+	blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker())
 
 	// Create and inject the requested chain
 	if n == 0 {
@@ -75,7 +74,7 @@ func newTestLightChain() *LightChain {
 		Config:     params.TestChainConfig,
 	}
 	gspec.MustCommit(db)
-	lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker(), new(event.TypeMux))
+	lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker())
 	if err != nil {
 		panic(err)
 	}
@@ -339,7 +338,7 @@ func TestReorgBadHeaderHashes(t *testing.T) {
 	defer func() { delete(core.BadHashes, headers[3].Hash()) }()
 
 	// Create a new LightChain and check that it rolled back the state.
-	ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux))
+	ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker())
 	if err != nil {
 		t.Fatalf("failed to create new chain manager: %v", err)
 	}
diff --git a/light/odr_test.go b/light/odr_test.go
index 544b64effc801a9c4ea9b1cfb55f4cf5f0e11fb5..bd1e976e81db1d5b86a61615f4d2e63b81454adb 100644
--- a/light/odr_test.go
+++ b/light/odr_test.go
@@ -33,7 +33,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/trie"
@@ -233,7 +232,6 @@ func testChainGen(i int, block *core.BlockGen) {
 
 func testChainOdr(t *testing.T, protocol int, fn odrTestFn) {
 	var (
-		evmux   = new(event.TypeMux)
 		sdb, _  = ethdb.NewMemDatabase()
 		ldb, _  = ethdb.NewMemDatabase()
 		gspec   = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}}
@@ -241,14 +239,14 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) {
 	)
 	gspec.MustCommit(ldb)
 	// Assemble the test environment
-	blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{})
+	blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{})
 	gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, 4, testChainGen)
 	if _, err := blockchain.InsertChain(gchain); err != nil {
 		t.Fatal(err)
 	}
 
 	odr := &testOdr{sdb: sdb, ldb: ldb}
-	lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux)
+	lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker())
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/light/trie_test.go b/light/trie_test.go
index 9b2cf7c2b12328ac086d07344d4ebbd5b20f4c29..5f45c01af9ab07e1e338c21baee120468e40c72a 100644
--- a/light/trie_test.go
+++ b/light/trie_test.go
@@ -28,7 +28,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/trie"
 )
@@ -41,7 +40,7 @@ func TestNodeIterator(t *testing.T) {
 		genesis    = gspec.MustCommit(fulldb)
 	)
 	gspec.MustCommit(lightdb)
-	blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{})
+	blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{})
 	gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, fulldb, 4, testChainGen)
 	if _, err := blockchain.InsertChain(gchain); err != nil {
 		panic(err)
diff --git a/light/txpool.go b/light/txpool.go
index 7cbb991e85894a6e71f20bf5c143a7a58380d56b..bd215b99288351baf24d1362f845e9c6b56e6e31 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -33,6 +33,11 @@ import (
 	"github.com/ethereum/go-ethereum/rlp"
 )
 
+const (
+	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+	chainHeadChanSize = 10
+)
+
 // txPermanent is the number of mined blocks after a mined transaction is
 // considered permanent and no rollback is expected
 var txPermanent = uint64(500)
@@ -43,21 +48,23 @@ var txPermanent = uint64(500)
 // always receive all locally signed transactions in the same order as they are
 // created.
 type TxPool struct {
-	config   *params.ChainConfig
-	signer   types.Signer
-	quit     chan bool
-	eventMux *event.TypeMux
-	events   *event.TypeMuxSubscription
-	mu       sync.RWMutex
-	chain    *LightChain
-	odr      OdrBackend
-	chainDb  ethdb.Database
-	relay    TxRelayBackend
-	head     common.Hash
-	nonce    map[common.Address]uint64            // "pending" nonce
-	pending  map[common.Hash]*types.Transaction   // pending transactions by tx hash
-	mined    map[common.Hash][]*types.Transaction // mined transactions by block hash
-	clearIdx uint64                               // earliest block nr that can contain mined tx info
+	config       *params.ChainConfig
+	signer       types.Signer
+	quit         chan bool
+	txFeed       event.Feed
+	scope        event.SubscriptionScope
+	chainHeadCh  chan core.ChainHeadEvent
+	chainHeadSub event.Subscription
+	mu           sync.RWMutex
+	chain        *LightChain
+	odr          OdrBackend
+	chainDb      ethdb.Database
+	relay        TxRelayBackend
+	head         common.Hash
+	nonce        map[common.Address]uint64            // "pending" nonce
+	pending      map[common.Hash]*types.Transaction   // pending transactions by tx hash
+	mined        map[common.Hash][]*types.Transaction // mined transactions by block hash
+	clearIdx     uint64                               // earliest block nr that can contain mined tx info
 
 	homestead bool
 }
@@ -78,23 +85,24 @@ type TxRelayBackend interface {
 }
 
 // NewTxPool creates a new light transaction pool
-func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool {
+func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool {
 	pool := &TxPool{
-		config:   config,
-		signer:   types.NewEIP155Signer(config.ChainId),
-		nonce:    make(map[common.Address]uint64),
-		pending:  make(map[common.Hash]*types.Transaction),
-		mined:    make(map[common.Hash][]*types.Transaction),
-		quit:     make(chan bool),
-		eventMux: eventMux,
-		events:   eventMux.Subscribe(core.ChainHeadEvent{}),
-		chain:    chain,
-		relay:    relay,
-		odr:      chain.Odr(),
-		chainDb:  chain.Odr().Database(),
-		head:     chain.CurrentHeader().Hash(),
-		clearIdx: chain.CurrentHeader().Number.Uint64(),
-	}
+		config:      config,
+		signer:      types.NewEIP155Signer(config.ChainId),
+		nonce:       make(map[common.Address]uint64),
+		pending:     make(map[common.Hash]*types.Transaction),
+		mined:       make(map[common.Hash][]*types.Transaction),
+		quit:        make(chan bool),
+		chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
+		chain:       chain,
+		relay:       relay,
+		odr:         chain.Odr(),
+		chainDb:     chain.Odr().Database(),
+		head:        chain.CurrentHeader().Hash(),
+		clearIdx:    chain.CurrentHeader().Number.Uint64(),
+	}
+	// Subscribe events from blockchain
+	pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
 	go pool.eventLoop()
 
 	return pool
@@ -274,13 +282,17 @@ const blockCheckTimeout = time.Second * 3
 // eventLoop processes chain head events and also notifies the tx relay backend
 // about the new head hash and tx state changes
 func (pool *TxPool) eventLoop() {
-	for ev := range pool.events.Chan() {
-		switch ev.Data.(type) {
-		case core.ChainHeadEvent:
-			pool.setNewHead(ev.Data.(core.ChainHeadEvent).Block.Header())
+	for {
+		select {
+		case ev := <-pool.chainHeadCh:
+			pool.setNewHead(ev.Block.Header())
 			// hack in order to avoid hogging the lock; this part will
 			// be replaced by a subsequent PR.
 			time.Sleep(time.Millisecond)
+
+		// System stopped
+		case <-pool.chainHeadSub.Err():
+			return
 		}
 	}
 }
@@ -301,11 +313,20 @@ func (pool *TxPool) setNewHead(head *types.Header) {
 
 // Stop stops the light transaction pool
 func (pool *TxPool) Stop() {
+	// Unsubscribe all subscriptions registered from txpool
+	pool.scope.Close()
+	// Unsubscribe subscriptions registered from blockchain
+	pool.chainHeadSub.Unsubscribe()
 	close(pool.quit)
-	pool.events.Unsubscribe()
 	log.Info("Transaction pool stopped")
 }
 
+// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+	return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
 // Stats returns the number of currently pending (locally created) transactions
 func (pool *TxPool) Stats() (pending int) {
 	pool.mu.RLock()
@@ -388,7 +409,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error {
 		// Notify the subscribers. This event is posted in a goroutine
 		// because it's possible that somewhere during the post "Remove transaction"
 		// gets called which will then wait for the global tx pool lock and deadlock.
-		go self.eventMux.Post(core.TxPreEvent{Tx: tx})
+		go self.txFeed.Send(core.TxPreEvent{Tx: tx})
 	}
 
 	// Print a log message if low enough level is set
diff --git a/light/txpool_test.go b/light/txpool_test.go
index f23832a4174580bb442a463c36c018b1b9bbb3c1..fe7936ac28a16003996cdc4b45f72553b633872c 100644
--- a/light/txpool_test.go
+++ b/light/txpool_test.go
@@ -29,7 +29,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 )
 
@@ -82,7 +81,6 @@ func TestTxPool(t *testing.T) {
 	}
 
 	var (
-		evmux   = new(event.TypeMux)
 		sdb, _  = ethdb.NewMemDatabase()
 		ldb, _  = ethdb.NewMemDatabase()
 		gspec   = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}}
@@ -90,7 +88,7 @@ func TestTxPool(t *testing.T) {
 	)
 	gspec.MustCommit(ldb)
 	// Assemble the test environment
-	blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{})
+	blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{})
 	gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, poolTestBlocks, txPoolTestChainGen)
 	if _, err := blockchain.InsertChain(gchain); err != nil {
 		panic(err)
@@ -102,9 +100,9 @@ func TestTxPool(t *testing.T) {
 		discard: make(chan int, 1),
 		mined:   make(chan int, 1),
 	}
-	lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux)
+	lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker())
 	txPermanent = 50
-	pool := NewTxPool(params.TestChainConfig, evmux, lightchain, relay)
+	pool := NewTxPool(params.TestChainConfig, lightchain, relay)
 	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
 	defer cancel()
 
diff --git a/miner/worker.go b/miner/worker.go
index dab192c24f932d57552ae9946c724df0741c096c..24e03be60af22ce37f9ebc0e0183e0f8c2d26ad3 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -41,6 +41,14 @@ import (
 const (
 	resultQueueSize  = 10
 	miningLogAtDepth = 5
+
+	// txChanSize is the size of channel listening to TxPreEvent.
+	// The number is referenced from the size of tx pool.
+	txChanSize = 4096
+	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+	chainHeadChanSize = 10
+	// chainSideChanSize is the size of channel listening to ChainSideEvent.
+	chainSideChanSize = 10
 )
 
 // Agent can register themself with the worker
@@ -87,9 +95,14 @@ type worker struct {
 	mu sync.Mutex
 
 	// update loop
-	mux    *event.TypeMux
-	events *event.TypeMuxSubscription
-	wg     sync.WaitGroup
+	mux          *event.TypeMux
+	txCh         chan core.TxPreEvent
+	txSub        event.Subscription
+	chainHeadCh  chan core.ChainHeadEvent
+	chainHeadSub event.Subscription
+	chainSideCh  chan core.ChainSideEvent
+	chainSideSub event.Subscription
+	wg           sync.WaitGroup
 
 	agents map[Agent]struct{}
 	recv   chan *Result
@@ -123,6 +136,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
 		engine:         engine,
 		eth:            eth,
 		mux:            mux,
+		txCh:           make(chan core.TxPreEvent, txChanSize),
+		chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
+		chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),
 		chainDb:        eth.ChainDb(),
 		recv:           make(chan *Result, resultQueueSize),
 		chain:          eth.BlockChain(),
@@ -133,7 +149,11 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
 		unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
 		fullValidation: false,
 	}
-	worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
+	// Subscribe TxPreEvent for tx pool
+	worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
+	// Subscribe events for blockchain
+	worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
+	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
 	go worker.update()
 
 	go worker.wait()
@@ -225,20 +245,28 @@ func (self *worker) unregister(agent Agent) {
 }
 
 func (self *worker) update() {
-	for event := range self.events.Chan() {
+	defer self.txSub.Unsubscribe()
+	defer self.chainHeadSub.Unsubscribe()
+	defer self.chainSideSub.Unsubscribe()
+
+	for {
 		// A real event arrived, process interesting content
-		switch ev := event.Data.(type) {
-		case core.ChainHeadEvent:
+		select {
+		// Handle ChainHeadEvent
+		case <-self.chainHeadCh:
 			self.commitNewWork()
-		case core.ChainSideEvent:
+
+		// Handle ChainSideEvent
+		case ev := <-self.chainSideCh:
 			self.uncleMu.Lock()
 			self.possibleUncles[ev.Block.Hash()] = ev.Block
 			self.uncleMu.Unlock()
-		case core.TxPreEvent:
+
+		// Handle TxPreEvent
+		case ev := <-self.txCh:
 			// Apply transaction to the pending state if we're not mining
 			if atomic.LoadInt32(&self.mining) == 0 {
 				self.currentMu.Lock()
-
 				acc, _ := types.Sender(self.current.signer, ev.Tx)
 				txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
 				txset := types.NewTransactionsByPriceAndNonce(txs)
@@ -246,6 +274,14 @@ func (self *worker) update() {
 				self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
 				self.currentMu.Unlock()
 			}
+
+		// System stopped
+		case <-self.txSub.Err():
+			return
+		case <-self.chainHeadSub.Err():
+			return
+		case <-self.chainSideSub.Err():
+			return
 		}
 	}
 }
@@ -298,12 +334,18 @@ func (self *worker) wait() {
 				// broadcast before waiting for validation
 				go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) {
 					self.mux.Post(core.NewMinedBlockEvent{Block: block})
-					self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
+					var (
+						events        []interface{}
+						coalescedLogs []*types.Log
+					)
+					events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
 
 					if stat == core.CanonStatTy {
-						self.mux.Post(core.ChainHeadEvent{Block: block})
-						self.mux.Post(logs)
+						events = append(events, core.ChainHeadEvent{Block: block})
+						coalescedLogs = logs
 					}
+					// post blockchain events
+					self.chain.PostChainEvents(events, coalescedLogs)
 					if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
 						log.Warn("Failed writing block receipts", "err", err)
 					}
diff --git a/tests/block_test_util.go b/tests/block_test_util.go
index c1d433ff39dc07e4334e9cf6b6a8389fdc242584..a789e6d8873d262a411c7ffce57abe35f2dea2b1 100644
--- a/tests/block_test_util.go
+++ b/tests/block_test_util.go
@@ -33,7 +33,6 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
 )
@@ -111,7 +110,7 @@ func (t *BlockTest) Run() error {
 		return fmt.Errorf("genesis block state root does not match test: computed=%x, test=%x", gblock.Root().Bytes()[:6], t.json.Genesis.StateRoot[:6])
 	}
 
-	chain, err := core.NewBlockChain(db, config, ethash.NewShared(), new(event.TypeMux), vm.Config{})
+	chain, err := core.NewBlockChain(db, config, ethash.NewShared(), vm.Config{})
 	if err != nil {
 		return err
 	}