diff --git a/core/types/block.go b/core/types/block.go
index 37b6f3ec17e86b93ff5276167c6bf4350d2c5386..59935924784334137c30bb184a5d0e9a5a90e3b8 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -114,6 +114,28 @@ func (h *Header) UnmarshalJSON(data []byte) error {
 	return nil
 }
 
+func (h *Header) MarshalJSON() ([]byte, error) {
+	fields := map[string]interface{}{
+		"hash":             h.Hash(),
+		"parentHash":       h.ParentHash,
+		"number":           fmt.Sprintf("%#x", h.Number),
+		"nonce":            h.Nonce,
+		"receiptRoot":      h.ReceiptHash,
+		"logsBloom":        h.Bloom,
+		"sha3Uncles":       h.UncleHash,
+		"stateRoot":        h.Root,
+		"miner":            h.Coinbase,
+		"difficulty":       fmt.Sprintf("%#x", h.Difficulty),
+		"extraData":        fmt.Sprintf("0x%x", h.Extra),
+		"gasLimit":         fmt.Sprintf("%#x", h.GasLimit),
+		"gasUsed":          fmt.Sprintf("%#x", h.GasUsed),
+		"timestamp":        fmt.Sprintf("%#x", h.Time),
+		"transactionsRoot": h.TxHash,
+	}
+
+	return json.Marshal(fields)
+}
+
 func rlpHash(x interface{}) (h common.Hash) {
 	hw := sha3.NewKeccak256()
 	rlp.Encode(hw, x)
diff --git a/eth/downloader/api.go b/eth/downloader/api.go
index 94cd6515f89b4d1f85cf1f03dbaaab2c121e419d..c36dfb7e07032c4623b4700365e86a8953445e00 100644
--- a/eth/downloader/api.go
+++ b/eth/downloader/api.go
@@ -19,53 +19,102 @@ package downloader
 import (
 	"sync"
 
-	"golang.org/x/net/context"
-
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/rpc"
+	"golang.org/x/net/context"
 )
 
 // PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
 // It offers only methods that operates on data that can be available to anyone without security risks.
 type PublicDownloaderAPI struct {
-	d                   *Downloader
-	mux                 *event.TypeMux
-	muSyncSubscriptions sync.Mutex
-	syncSubscriptions   map[string]rpc.Subscription
+	d                         *Downloader
+	mux                       *event.TypeMux
+	installSyncSubscription   chan chan interface{}
+	uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
 }
 
-// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
+// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that
+// listens for events from the downloader through the global event mux. In case it receives one of
+// these events it broadcasts it to all syncing subscriptions that are installed through the
+// installSyncSubscription channel.
 func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
-	api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)}
+	api := &PublicDownloaderAPI{
+		d:   d,
+		mux: m,
+		installSyncSubscription:   make(chan chan interface{}),
+		uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
+	}
 
-	go api.run()
+	go api.eventLoop()
 
 	return api
 }
 
-func (api *PublicDownloaderAPI) run() {
-	sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
-
-	for event := range sub.Chan() {
-		var notification interface{}
+// eventLoop runs an loop until the event mux closes. It will install and uninstall new
+// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions.
+func (api *PublicDownloaderAPI) eventLoop() {
+	var (
+		sub               = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
+		syncSubscriptions = make(map[chan interface{}]struct{})
+	)
+
+	for {
+		select {
+		case i := <-api.installSyncSubscription:
+			syncSubscriptions[i] = struct{}{}
+		case u := <-api.uninstallSyncSubscription:
+			delete(syncSubscriptions, u.c)
+			close(u.uninstalled)
+		case event := <-sub.Chan():
+			if event == nil {
+				return
+			}
 
-		switch event.Data.(type) {
-		case StartEvent:
-			result := &SyncingResult{Syncing: true}
-			result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
-			notification = result
-		case DoneEvent, FailedEvent:
-			notification = false
+			var notification interface{}
+			switch event.Data.(type) {
+			case StartEvent:
+				result := &SyncingResult{Syncing: true}
+				result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
+				notification = result
+			case DoneEvent, FailedEvent:
+				notification = false
+			}
+			// broadcast
+			for c := range syncSubscriptions {
+				c <- notification
+			}
 		}
+	}
+}
+
+// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
+func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) {
+	notifier, supported := rpc.NotifierFromContext(ctx)
+	if !supported {
+		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+	}
 
-		api.muSyncSubscriptions.Lock()
-		for id, sub := range api.syncSubscriptions {
-			if sub.Notify(notification) == rpc.ErrNotificationNotFound {
-				delete(api.syncSubscriptions, id)
+	rpcSub := notifier.CreateSubscription()
+
+	go func() {
+		statuses := make(chan interface{})
+		sub := api.SubscribeSyncStatus(statuses)
+
+		for {
+			select {
+			case status := <-statuses:
+				notifier.Notify(rpcSub.ID, status)
+			case <-rpcSub.Err():
+				sub.Unsubscribe()
+				return
+			case <-notifier.Closed():
+				sub.Unsubscribe()
+				return
 			}
 		}
-		api.muSyncSubscriptions.Unlock()
-	}
+	}()
+
+	return rpcSub, nil
 }
 
 // Progress gives progress indications when the node is synchronising with the Ethereum network.
@@ -83,26 +132,42 @@ type SyncingResult struct {
 	Status  Progress `json:"status"`
 }
 
-// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
-func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
-	notifier, supported := rpc.NotifierFromContext(ctx)
-	if !supported {
-		return nil, rpc.ErrNotificationsUnsupported
-	}
-
-	subscription, err := notifier.NewSubscription(func(id string) {
-		api.muSyncSubscriptions.Lock()
-		delete(api.syncSubscriptions, id)
-		api.muSyncSubscriptions.Unlock()
-	})
+// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop.
+type uninstallSyncSubscriptionRequest struct {
+	c           chan interface{}
+	uninstalled chan interface{}
+}
 
-	if err != nil {
-		return nil, err
-	}
+// SyncStatusSubscription represents a syncing subscription.
+type SyncStatusSubscription struct {
+	api       *PublicDownloaderAPI // register subscription in event loop of this api instance
+	c         chan interface{}     // channel where events are broadcasted to
+	unsubOnce sync.Once            // make sure unsubscribe logic is executed once
+}
 
-	api.muSyncSubscriptions.Lock()
-	api.syncSubscriptions[subscription.ID()] = subscription
-	api.muSyncSubscriptions.Unlock()
+// Unsubscribe uninstalls the subscription from the DownloadAPI event loop.
+// The status channel that was passed to subscribeSyncStatus isn't used anymore
+// after this method returns.
+func (s *SyncStatusSubscription) Unsubscribe() {
+	s.unsubOnce.Do(func() {
+		req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})}
+		s.api.uninstallSyncSubscription <- &req
+
+		for {
+			select {
+			case <-s.c:
+				// drop new status events until uninstall confirmation
+				continue
+			case <-req.uninstalled:
+				return
+			}
+		}
+	})
+}
 
-	return subscription, nil
+// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates.
+// The given channel must receive interface values, the result can either
+func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription {
+	api.installSyncSubscription <- status
+	return &SyncStatusSubscription{api: api, c: status}
 }
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 65c5b9380da304894fff6023a4b5fb3311fd69f2..3bc2203481fa16253b79a291abd9b36088d4c38d 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -17,292 +17,414 @@
 package filters
 
 import (
-	"crypto/rand"
 	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"math/big"
 	"sync"
 	"time"
 
+	"golang.org/x/net/context"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/rpc"
-
-	"golang.org/x/net/context"
 )
 
 var (
-	filterTickerTime = 5 * time.Minute
+	deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
 )
 
-// byte will be inferred
-const (
-	unknownFilterTy = iota
-	blockFilterTy
-	transactionFilterTy
-	logFilterTy
-)
+// filter is a helper struct that holds meta information over the filter type
+// and associated subscription in the event system.
+type filter struct {
+	typ      Type
+	deadline *time.Timer // filter is inactiv when deadline triggers
+	hashes   []common.Hash
+	crit     FilterCriteria
+	logs     []Log
+	s        *Subscription // associated subscription in event system
+}
 
 // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
 // information related to the Ethereum protocol such als blocks, transactions and logs.
 type PublicFilterAPI struct {
-	mux *event.TypeMux
-
-	quit    chan struct{}
-	chainDb ethdb.Database
-
-	filterManager *FilterSystem
-
-	filterMapMu   sync.RWMutex
-	filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
-
-	logMu    sync.RWMutex
-	logQueue map[int]*logQueue
-
-	blockMu    sync.RWMutex
-	blockQueue map[int]*hashQueue
-
-	transactionMu    sync.RWMutex
-	transactionQueue map[int]*hashQueue
+	mux       *event.TypeMux
+	quit      chan struct{}
+	chainDb   ethdb.Database
+	events    *EventSystem
+	filtersMu sync.Mutex
+	filters   map[rpc.ID]*filter
 }
 
 // NewPublicFilterAPI returns a new PublicFilterAPI instance.
 func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
-	svc := &PublicFilterAPI{
-		mux:              mux,
-		chainDb:          chainDb,
-		filterManager:    NewFilterSystem(mux),
-		filterMapping:    make(map[string]int),
-		logQueue:         make(map[int]*logQueue),
-		blockQueue:       make(map[int]*hashQueue),
-		transactionQueue: make(map[int]*hashQueue),
+	api := &PublicFilterAPI{
+		mux:     mux,
+		chainDb: chainDb,
+		events:  NewEventSystem(mux),
+		filters: make(map[rpc.ID]*filter),
 	}
-	go svc.start()
-	return svc
-}
 
-// Stop quits the work loop.
-func (s *PublicFilterAPI) Stop() {
-	close(s.quit)
+	go api.timeoutLoop()
+
+	return api
 }
 
-// start the work loop, wait and process events.
-func (s *PublicFilterAPI) start() {
-	timer := time.NewTicker(2 * time.Second)
-	defer timer.Stop()
-done:
+// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
+// Tt is started when the api is created.
+func (api *PublicFilterAPI) timeoutLoop() {
+	ticker := time.NewTicker(5 * time.Minute)
 	for {
-		select {
-		case <-timer.C:
-			s.filterManager.Lock() // lock order like filterLoop()
-			s.logMu.Lock()
-			for id, filter := range s.logQueue {
-				if time.Since(filter.timeout) > filterTickerTime {
-					s.filterManager.Remove(id)
-					delete(s.logQueue, id)
-				}
+		<-ticker.C
+		api.filtersMu.Lock()
+		for id, f := range api.filters {
+			select {
+			case <-f.deadline.C:
+				f.s.Unsubscribe()
+				delete(api.filters, id)
+			default:
+				continue
 			}
-			s.logMu.Unlock()
+		}
+		api.filtersMu.Unlock()
+	}
+}
 
-			s.blockMu.Lock()
-			for id, filter := range s.blockQueue {
-				if time.Since(filter.timeout) > filterTickerTime {
-					s.filterManager.Remove(id)
-					delete(s.blockQueue, id)
-				}
-			}
-			s.blockMu.Unlock()
+// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
+// as transactions enter the pending state.
+//
+// It is part of the filter package because this filter can be used throug the
+// `eth_getFilterChanges` polling method that is also used for log filters.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
+func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
+	var (
+		pendingTxs   = make(chan common.Hash)
+		pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs)
+	)
 
-			s.transactionMu.Lock()
-			for id, filter := range s.transactionQueue {
-				if time.Since(filter.timeout) > filterTickerTime {
-					s.filterManager.Remove(id)
-					delete(s.transactionQueue, id)
+	api.filtersMu.Lock()
+	api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
+	api.filtersMu.Unlock()
+
+	go func() {
+		for {
+			select {
+			case ph := <-pendingTxs:
+				api.filtersMu.Lock()
+				if f, found := api.filters[pendingTxSub.ID]; found {
+					f.hashes = append(f.hashes, ph)
 				}
+				api.filtersMu.Unlock()
+			case <-pendingTxSub.Err():
+				api.filtersMu.Lock()
+				delete(api.filters, pendingTxSub.ID)
+				api.filtersMu.Unlock()
+				return
 			}
-			s.transactionMu.Unlock()
-			s.filterManager.Unlock()
-		case <-s.quit:
-			break done
 		}
-	}
+	}()
 
+	return pendingTxSub.ID
 }
 
-// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
-func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
-	// protect filterManager.Add() and setting of filter fields
-	s.filterManager.Lock()
-	defer s.filterManager.Unlock()
-
-	externalId, err := newFilterId()
-	if err != nil {
-		return "", err
+// NewPendingTransactions creates a subscription that is triggered each time a transaction
+// enters the transaction pool and was signed from one of the transactions this nodes manages.
+func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
+	notifier, supported := rpc.NotifierFromContext(ctx)
+	if !supported {
+		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
 	}
 
-	filter := New(s.chainDb)
-	id, err := s.filterManager.Add(filter, ChainFilter)
-	if err != nil {
-		return "", err
-	}
+	rpcSub := notifier.CreateSubscription()
+
+	go func() {
+		txHashes := make(chan common.Hash)
+		pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)
+
+		for {
+			select {
+			case h := <-txHashes:
+				notifier.Notify(rpcSub.ID, h)
+			case <-rpcSub.Err():
+				pendingTxSub.Unsubscribe()
+				return
+			case <-notifier.Closed():
+				pendingTxSub.Unsubscribe()
+				return
+			}
+		}
+	}()
 
-	s.blockMu.Lock()
-	s.blockQueue[id] = &hashQueue{timeout: time.Now()}
-	s.blockMu.Unlock()
+	return rpcSub, nil
+}
 
-	filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
-		s.blockMu.Lock()
-		defer s.blockMu.Unlock()
+// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
+// It is part of the filter package since polling goes with eth_getFilterChanges.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
+func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
+	var (
+		headers   = make(chan *types.Header)
+		headerSub = api.events.SubscribeNewHeads(headers)
+	)
 
-		if queue := s.blockQueue[id]; queue != nil {
-			queue.add(block.Hash())
+	api.filtersMu.Lock()
+	api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
+	api.filtersMu.Unlock()
+
+	go func() {
+		for {
+			select {
+			case h := <-headers:
+				api.filtersMu.Lock()
+				if f, found := api.filters[headerSub.ID]; found {
+					f.hashes = append(f.hashes, h.Hash())
+				}
+				api.filtersMu.Unlock()
+			case <-headerSub.Err():
+				api.filtersMu.Lock()
+				delete(api.filters, headerSub.ID)
+				api.filtersMu.Unlock()
+				return
+			}
 		}
+	}()
+
+	return headerSub.ID
+}
+
+// NewHeads send a notification each time a new (header) block is appended to the chain.
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
+	notifier, supported := rpc.NotifierFromContext(ctx)
+	if !supported {
+		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
 	}
 
-	s.filterMapMu.Lock()
-	s.filterMapping[externalId] = id
-	s.filterMapMu.Unlock()
+	rpcSub := notifier.CreateSubscription()
 
-	return externalId, nil
-}
+	go func() {
+		headers := make(chan *types.Header)
+		headersSub := api.events.SubscribeNewHeads(headers)
 
-// NewPendingTransactionFilter creates a filter that returns new pending transactions.
-func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
-	// protect filterManager.Add() and setting of filter fields
-	s.filterManager.Lock()
-	defer s.filterManager.Unlock()
+		for {
+			select {
+			case h := <-headers:
+				notifier.Notify(rpcSub.ID, h)
+			case <-rpcSub.Err():
+				headersSub.Unsubscribe()
+				return
+			case <-notifier.Closed():
+				headersSub.Unsubscribe()
+				return
+			}
+		}
+	}()
 
-	externalId, err := newFilterId()
-	if err != nil {
-		return "", err
-	}
+	return rpcSub, nil
+}
 
-	filter := New(s.chainDb)
-	id, err := s.filterManager.Add(filter, PendingTxFilter)
-	if err != nil {
-		return "", err
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
+func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
+	notifier, supported := rpc.NotifierFromContext(ctx)
+	if !supported {
+		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
 	}
 
-	s.transactionMu.Lock()
-	s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
-	s.transactionMu.Unlock()
+	rpcSub := notifier.CreateSubscription()
 
-	filter.TransactionCallback = func(tx *types.Transaction) {
-		s.transactionMu.Lock()
-		defer s.transactionMu.Unlock()
+	go func() {
+		matchedLogs := make(chan []Log)
+		logsSub := api.events.SubscribeLogs(crit, matchedLogs)
 
-		if queue := s.transactionQueue[id]; queue != nil {
-			queue.add(tx.Hash())
+		for {
+			select {
+			case logs := <-matchedLogs:
+				for _, log := range logs {
+					notifier.Notify(rpcSub.ID, &log)
+				}
+			case <-rpcSub.Err(): // client send an unsubscribe request
+				logsSub.Unsubscribe()
+				return
+			case <-notifier.Closed(): // connection dropped
+				logsSub.Unsubscribe()
+				return
+			}
 		}
-	}
+	}()
 
-	s.filterMapMu.Lock()
-	s.filterMapping[externalId] = id
-	s.filterMapMu.Unlock()
+	return rpcSub, nil
+}
 
-	return externalId, nil
+// FilterCriteria represents a request to create a new filter.
+type FilterCriteria struct {
+	FromBlock *big.Int
+	ToBlock   *big.Int
+	Addresses []common.Address
+	Topics    [][]common.Hash
 }
 
-// newLogFilter creates a new log filter.
-func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
-	// protect filterManager.Add() and setting of filter fields
-	s.filterManager.Lock()
-	defer s.filterManager.Unlock()
+// NewFilter creates a new filter and returns the filter id. It can be
+// used to retrieve logs when the state changes. This method cannot be
+// used to fetch logs that are already stored in the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
+func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
+	var (
+		logs    = make(chan []Log)
+		logsSub = api.events.SubscribeLogs(crit, logs)
+	)
 
-	filter := New(s.chainDb)
-	id, err := s.filterManager.Add(filter, LogFilter)
-	if err != nil {
-		return 0, err
+	if crit.FromBlock == nil {
+		crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
+	}
+	if crit.ToBlock == nil {
+		crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
 	}
 
-	s.logMu.Lock()
-	s.logQueue[id] = &logQueue{timeout: time.Now()}
-	s.logMu.Unlock()
-
-	filter.SetBeginBlock(earliest)
-	filter.SetEndBlock(latest)
-	filter.SetAddresses(addresses)
-	filter.SetTopics(topics)
-	filter.LogCallback = func(log *vm.Log, removed bool) {
-		if callback != nil {
-			callback(log, removed)
-		} else {
-			s.logMu.Lock()
-			defer s.logMu.Unlock()
-			if queue := s.logQueue[id]; queue != nil {
-				queue.add(vmlog{log, removed})
+	api.filtersMu.Lock()
+	api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub}
+	api.filtersMu.Unlock()
+
+	go func() {
+		for {
+			select {
+			case l := <-logs:
+				api.filtersMu.Lock()
+				if f, found := api.filters[logsSub.ID]; found {
+					f.logs = append(f.logs, l...)
+				}
+				api.filtersMu.Unlock()
+			case <-logsSub.Err():
+				api.filtersMu.Lock()
+				delete(api.filters, logsSub.ID)
+				api.filtersMu.Unlock()
+				return
 			}
 		}
-	}
+	}()
 
-	return id, nil
+	return logsSub.ID
 }
 
-// Logs creates a subscription that fires for all new log that match the given filter criteria.
-func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
-	notifier, supported := rpc.NotifierFromContext(ctx)
-	if !supported {
-		return nil, rpc.ErrNotificationsUnsupported
+// GetLogs returns logs matching the given argument that are stored within the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
+func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log {
+	if crit.FromBlock == nil {
+		crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
+	}
+	if crit.ToBlock == nil {
+		crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
 	}
 
-	var (
-		externalId   string
-		subscription rpc.Subscription
-		err          error
-	)
+	filter := New(api.chainDb)
+	filter.SetBeginBlock(crit.FromBlock.Int64())
+	filter.SetEndBlock(crit.ToBlock.Int64())
+	filter.SetAddresses(crit.Addresses)
+	filter.SetTopics(crit.Topics)
 
-	if externalId, err = newFilterId(); err != nil {
-		return nil, err
-	}
+	return returnLogs(filter.Find())
+}
 
-	// uninstall filter when subscription is unsubscribed/cancelled
-	if subscription, err = notifier.NewSubscription(func(string) {
-		s.UninstallFilter(externalId)
-	}); err != nil {
-		return nil, err
+// UninstallFilter removes the filter with the given filter id.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
+func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
+	api.filtersMu.Lock()
+	f, found := api.filters[id]
+	if found {
+		delete(api.filters, id)
 	}
-
-	notifySubscriber := func(log *vm.Log, removed bool) {
-		rpcLog := toRPCLogs(vm.Logs{log}, removed)
-		if err := subscription.Notify(rpcLog); err != nil {
-			subscription.Cancel()
-		}
+	api.filtersMu.Unlock()
+	if found {
+		f.s.Unsubscribe()
 	}
 
-	// from and to block number are not used since subscriptions don't allow you to travel to "time"
-	var id int
-	if len(args.Addresses) > 0 {
-		id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
-	} else {
-		id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
+	return found
+}
+
+// GetFilterLogs returns the logs for the filter with the given id.
+// If the filter could not be found an empty array of logs is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
+func (api *PublicFilterAPI) GetFilterLogs(id rpc.ID) []Log {
+	api.filtersMu.Lock()
+	f, found := api.filters[id]
+	api.filtersMu.Unlock()
+
+	if !found || f.typ != LogsSubscription {
+		return []Log{}
 	}
 
-	if err != nil {
-		subscription.Cancel()
-		return nil, err
+	filter := New(api.chainDb)
+	filter.SetBeginBlock(f.crit.FromBlock.Int64())
+	filter.SetEndBlock(f.crit.ToBlock.Int64())
+	filter.SetAddresses(f.crit.Addresses)
+	filter.SetTopics(f.crit.Topics)
+
+	return returnLogs(filter.Find())
+}
+
+// GetFilterChanges returns the logs for the filter with the given id since
+// last time is was called. This can be used for polling.
+//
+// For pending transaction and block filters the result is []common.Hash.
+// (pending)Log filters return []Log. If the filter could not be found
+// []interface{}{} is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
+	api.filtersMu.Lock()
+	defer api.filtersMu.Unlock()
+
+	if f, found := api.filters[id]; found {
+		if !f.deadline.Stop() {
+			// timer expired but filter is not yet removed in timeout loop
+			// receive timer value and reset timer
+			<-f.deadline.C
+		}
+		f.deadline.Reset(deadline)
+
+		switch f.typ {
+		case PendingTransactionsSubscription, BlocksSubscription:
+			hashes := f.hashes
+			f.hashes = nil
+			return returnHashes(hashes)
+		case PendingLogsSubscription, LogsSubscription:
+			logs := f.logs
+			f.logs = nil
+			return returnLogs(logs)
+		}
 	}
 
-	s.filterMapMu.Lock()
-	s.filterMapping[externalId] = id
-	s.filterMapMu.Unlock()
+	return []interface{}{}
+}
 
-	return subscription, err
+// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
+// otherwise the given hashes array is returned.
+func returnHashes(hashes []common.Hash) []common.Hash {
+	if hashes == nil {
+		return []common.Hash{}
+	}
+	return hashes
 }
 
-// NewFilterArgs represents a request to create a new filter.
-type NewFilterArgs struct {
-	FromBlock rpc.BlockNumber
-	ToBlock   rpc.BlockNumber
-	Addresses []common.Address
-	Topics    [][]common.Hash
+// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
+// otherwise the given logs array is returned.
+func returnLogs(logs []Log) []Log {
+	if logs == nil {
+		return []Log{}
+	}
+	return logs
 }
 
 // UnmarshalJSON sets *args fields with given data.
-func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
+func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
 	type input struct {
 		From      *rpc.BlockNumber `json:"fromBlock"`
 		ToBlock   *rpc.BlockNumber `json:"toBlock"`
@@ -316,15 +438,15 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
 	}
 
 	if raw.From == nil || raw.From.Int64() < 0 {
-		args.FromBlock = rpc.LatestBlockNumber
+		args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
 	} else {
-		args.FromBlock = *raw.From
+		args.FromBlock = big.NewInt(raw.From.Int64())
 	}
 
 	if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
-		args.ToBlock = rpc.LatestBlockNumber
+		args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
 	} else {
-		args.ToBlock = *raw.ToBlock
+		args.ToBlock = big.NewInt(raw.ToBlock.Int64())
 	}
 
 	args.Addresses = []common.Address{}
@@ -414,255 +536,3 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
 
 	return nil
 }
-
-// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
-func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
-	externalId, err := newFilterId()
-	if err != nil {
-		return "", err
-	}
-
-	var id int
-	if len(args.Addresses) > 0 {
-		id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
-	} else {
-		id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
-	}
-	if err != nil {
-		return "", err
-	}
-
-	s.filterMapMu.Lock()
-	s.filterMapping[externalId] = id
-	s.filterMapMu.Unlock()
-
-	return externalId, nil
-}
-
-// GetLogs returns the logs matching the given argument.
-func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
-	filter := New(s.chainDb)
-	filter.SetBeginBlock(args.FromBlock.Int64())
-	filter.SetEndBlock(args.ToBlock.Int64())
-	filter.SetAddresses(args.Addresses)
-	filter.SetTopics(args.Topics)
-
-	return toRPCLogs(filter.Find(), false)
-}
-
-// UninstallFilter removes the filter with the given filter id.
-func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
-	s.filterManager.Lock()
-	defer s.filterManager.Unlock()
-
-	s.filterMapMu.Lock()
-	id, ok := s.filterMapping[filterId]
-	if !ok {
-		s.filterMapMu.Unlock()
-		return false
-	}
-	delete(s.filterMapping, filterId)
-	s.filterMapMu.Unlock()
-
-	s.filterManager.Remove(id)
-
-	s.logMu.Lock()
-	if _, ok := s.logQueue[id]; ok {
-		delete(s.logQueue, id)
-		s.logMu.Unlock()
-		return true
-	}
-	s.logMu.Unlock()
-
-	s.blockMu.Lock()
-	if _, ok := s.blockQueue[id]; ok {
-		delete(s.blockQueue, id)
-		s.blockMu.Unlock()
-		return true
-	}
-	s.blockMu.Unlock()
-
-	s.transactionMu.Lock()
-	if _, ok := s.transactionQueue[id]; ok {
-		delete(s.transactionQueue, id)
-		s.transactionMu.Unlock()
-		return true
-	}
-	s.transactionMu.Unlock()
-
-	return false
-}
-
-// getFilterType is a helper utility that determine the type of filter for the given filter id.
-func (s *PublicFilterAPI) getFilterType(id int) byte {
-	if _, ok := s.blockQueue[id]; ok {
-		return blockFilterTy
-	} else if _, ok := s.transactionQueue[id]; ok {
-		return transactionFilterTy
-	} else if _, ok := s.logQueue[id]; ok {
-		return logFilterTy
-	}
-
-	return unknownFilterTy
-}
-
-// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
-func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
-	s.blockMu.Lock()
-	defer s.blockMu.Unlock()
-
-	if s.blockQueue[id] != nil {
-		return s.blockQueue[id].get()
-	}
-	return nil
-}
-
-// transactionFilterChanged returns a collection of transaction hashes for the pending
-// transaction filter with the given id.
-func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
-	s.blockMu.Lock()
-	defer s.blockMu.Unlock()
-
-	if s.transactionQueue[id] != nil {
-		return s.transactionQueue[id].get()
-	}
-	return nil
-}
-
-// logFilterChanged returns a collection of logs for the log filter with the given id.
-func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
-	s.logMu.Lock()
-	defer s.logMu.Unlock()
-
-	if s.logQueue[id] != nil {
-		return s.logQueue[id].get()
-	}
-	return nil
-}
-
-// GetFilterLogs returns the logs for the filter with the given id.
-func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
-	s.filterMapMu.RLock()
-	id, ok := s.filterMapping[filterId]
-	s.filterMapMu.RUnlock()
-	if !ok {
-		return toRPCLogs(nil, false)
-	}
-
-	if filter := s.filterManager.Get(id); filter != nil {
-		return toRPCLogs(filter.Find(), false)
-	}
-
-	return toRPCLogs(nil, false)
-}
-
-// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
-// This can be used for polling.
-func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
-	s.filterMapMu.RLock()
-	id, ok := s.filterMapping[filterId]
-	s.filterMapMu.RUnlock()
-
-	if !ok { // filter not found
-		return []interface{}{}
-	}
-
-	switch s.getFilterType(id) {
-	case blockFilterTy:
-		return returnHashes(s.blockFilterChanged(id))
-	case transactionFilterTy:
-		return returnHashes(s.transactionFilterChanged(id))
-	case logFilterTy:
-		return s.logFilterChanged(id)
-	}
-
-	return []interface{}{}
-}
-
-type vmlog struct {
-	*vm.Log
-	Removed bool `json:"removed"`
-}
-
-type logQueue struct {
-	mu sync.Mutex
-
-	logs    []vmlog
-	timeout time.Time
-	id      int
-}
-
-func (l *logQueue) add(logs ...vmlog) {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-
-	l.logs = append(l.logs, logs...)
-}
-
-func (l *logQueue) get() []vmlog {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-
-	l.timeout = time.Now()
-	tmp := l.logs
-	l.logs = nil
-	return tmp
-}
-
-type hashQueue struct {
-	mu sync.Mutex
-
-	hashes  []common.Hash
-	timeout time.Time
-	id      int
-}
-
-func (l *hashQueue) add(hashes ...common.Hash) {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-
-	l.hashes = append(l.hashes, hashes...)
-}
-
-func (l *hashQueue) get() []common.Hash {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-
-	l.timeout = time.Now()
-	tmp := l.hashes
-	l.hashes = nil
-	return tmp
-}
-
-// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
-// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
-// causing the affected DApp to miss data.
-func newFilterId() (string, error) {
-	var subid [16]byte
-	n, _ := rand.Read(subid[:])
-	if n != 16 {
-		return "", errors.New("Unable to generate filter id")
-	}
-	return "0x" + hex.EncodeToString(subid[:]), nil
-}
-
-// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
-// can hold additional information about the logs such as whether it was deleted.
-// Additionally when nil is given it will by default instead create an empty slice
-// instead. This is required by the RPC specification.
-func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
-	convertedLogs := make([]vmlog, len(logs))
-	for i, log := range logs {
-		convertedLogs[i] = vmlog{Log: log, Removed: removed}
-	}
-	return convertedLogs
-}
-
-// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
-// return the given hashes. The RPC interfaces defines that always an array is returned.
-func returnHashes(hashes []common.Hash) []common.Hash {
-	if hashes == nil {
-		return []common.Hash{}
-	}
-	return hashes
-}
diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go
index 9e8edc2413681ecbd7a95e26a0b1213eddf11a79..98eb6cbaa2717a858051755c9bfb8d45b9d69352 100644
--- a/eth/filters/api_test.go
+++ b/eth/filters/api_test.go
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-package filters_test
+package filters
 
 import (
 	"encoding/json"
@@ -22,7 +22,6 @@ import (
 	"testing"
 
 	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/eth/filters"
 	"github.com/ethereum/go-ethereum/rpc"
 )
 
@@ -39,14 +38,14 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	)
 
 	// default values
-	var test0 filters.NewFilterArgs
+	var test0 FilterCriteria
 	if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
 		t.Fatal(err)
 	}
-	if test0.FromBlock != rpc.LatestBlockNumber {
+	if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() {
 		t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock)
 	}
-	if test0.ToBlock != rpc.LatestBlockNumber {
+	if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() {
 		t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock)
 	}
 	if len(test0.Addresses) != 0 {
@@ -57,20 +56,20 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	}
 
 	// from, to block number
-	var test1 filters.NewFilterArgs
+	var test1 FilterCriteria
 	vector := fmt.Sprintf(`{"fromBlock":"0x%x","toBlock":"0x%x"}`, fromBlock, toBlock)
 	if err := json.Unmarshal([]byte(vector), &test1); err != nil {
 		t.Fatal(err)
 	}
-	if test1.FromBlock != fromBlock {
+	if test1.FromBlock.Int64() != fromBlock.Int64() {
 		t.Fatalf("expected FromBlock %d, got %d", fromBlock, test1.FromBlock)
 	}
-	if test1.ToBlock != toBlock {
+	if test1.ToBlock.Int64() != toBlock.Int64() {
 		t.Fatalf("expected ToBlock %d, got %d", toBlock, test1.ToBlock)
 	}
 
 	// single address
-	var test2 filters.NewFilterArgs
+	var test2 FilterCriteria
 	vector = fmt.Sprintf(`{"address": "%s"}`, address0.Hex())
 	if err := json.Unmarshal([]byte(vector), &test2); err != nil {
 		t.Fatal(err)
@@ -83,7 +82,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	}
 
 	// multiple address
-	var test3 filters.NewFilterArgs
+	var test3 FilterCriteria
 	vector = fmt.Sprintf(`{"address": ["%s", "%s"]}`, address0.Hex(), address1.Hex())
 	if err := json.Unmarshal([]byte(vector), &test3); err != nil {
 		t.Fatal(err)
@@ -99,7 +98,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	}
 
 	// single topic
-	var test4 filters.NewFilterArgs
+	var test4 FilterCriteria
 	vector = fmt.Sprintf(`{"topics": ["%s"]}`, topic0.Hex())
 	if err := json.Unmarshal([]byte(vector), &test4); err != nil {
 		t.Fatal(err)
@@ -115,7 +114,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	}
 
 	// test multiple "AND" topics
-	var test5 filters.NewFilterArgs
+	var test5 FilterCriteria
 	vector = fmt.Sprintf(`{"topics": ["%s", "%s"]}`, topic0.Hex(), topic1.Hex())
 	if err := json.Unmarshal([]byte(vector), &test5); err != nil {
 		t.Fatal(err)
@@ -137,7 +136,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	}
 
 	// test optional topic
-	var test6 filters.NewFilterArgs
+	var test6 FilterCriteria
 	vector = fmt.Sprintf(`{"topics": ["%s", null, "%s"]}`, topic0.Hex(), topic2.Hex())
 	if err := json.Unmarshal([]byte(vector), &test6); err != nil {
 		t.Fatal(err)
@@ -165,7 +164,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
 	}
 
 	// test OR topics
-	var test7 filters.NewFilterArgs
+	var test7 FilterCriteria
 	vector = fmt.Sprintf(`{"topics": [["%s", "%s"], null, ["%s", null]]}`, topic0.Hex(), topic1.Hex(), topic2.Hex())
 	if err := json.Unmarshal([]byte(vector), &test7); err != nil {
 		t.Fatal(err)
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index fd739bf0ee8c02ed853ee9f4b5da220be929339f..4226620dc4e80ba55a0de33dd0350727eb81ec12 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -23,15 +23,10 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/ethdb"
 )
 
-type AccountChange struct {
-	Address, StateAddress []byte
-}
-
-// Filtering interface
+// Filter can be used to retrieve and filter logs
 type Filter struct {
 	created time.Time
 
@@ -39,70 +34,72 @@ type Filter struct {
 	begin, end int64
 	addresses  []common.Address
 	topics     [][]common.Hash
-
-	BlockCallback       func(*types.Block, vm.Logs)
-	TransactionCallback func(*types.Transaction)
-	LogCallback         func(*vm.Log, bool)
 }
 
-// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
-// is interesting or not.
+// New creates a new filter which uses a bloom filter on blocks to figure out whether
+// a particular block is interesting or not.
 func New(db ethdb.Database) *Filter {
 	return &Filter{db: db}
 }
 
-// Set the earliest and latest block for filtering.
+// SetBeginBlock sets the earliest block for filtering.
 // -1 = latest block (i.e., the current block)
 // hash = particular hash from-to
-func (self *Filter) SetBeginBlock(begin int64) {
-	self.begin = begin
+func (f *Filter) SetBeginBlock(begin int64) {
+	f.begin = begin
 }
 
-func (self *Filter) SetEndBlock(end int64) {
-	self.end = end
+// SetEndBlock sets the latest block for filtering.
+func (f *Filter) SetEndBlock(end int64) {
+	f.end = end
 }
 
-func (self *Filter) SetAddresses(addr []common.Address) {
-	self.addresses = addr
+// SetAddresses matches only logs that are generated from addresses that are included
+// in the given addresses.
+func (f *Filter) SetAddresses(addr []common.Address) {
+	f.addresses = addr
 }
 
-func (self *Filter) SetTopics(topics [][]common.Hash) {
-	self.topics = topics
+// SetTopics matches only logs that have topics matching the given topics.
+func (f *Filter) SetTopics(topics [][]common.Hash) {
+	f.topics = topics
 }
 
 // Run filters logs with the current parameters set
-func (self *Filter) Find() vm.Logs {
-	latestHash := core.GetHeadBlockHash(self.db)
-	latestBlock := core.GetBlock(self.db, latestHash, core.GetBlockNumber(self.db, latestHash))
+func (f *Filter) Find() []Log {
+	latestHash := core.GetHeadBlockHash(f.db)
+	latestBlock := core.GetBlock(f.db, latestHash, core.GetBlockNumber(f.db, latestHash))
 	if latestBlock == nil {
-		return vm.Logs{}
+		return []Log{}
 	}
-	var beginBlockNo uint64 = uint64(self.begin)
-	if self.begin == -1 {
+
+	var beginBlockNo uint64 = uint64(f.begin)
+	if f.begin == -1 {
 		beginBlockNo = latestBlock.NumberU64()
 	}
-	var endBlockNo uint64 = uint64(self.end)
-	if self.end == -1 {
+
+	endBlockNo := uint64(f.end)
+	if f.end == -1 {
 		endBlockNo = latestBlock.NumberU64()
 	}
 
 	// if no addresses are present we can't make use of fast search which
 	// uses the mipmap bloom filters to check for fast inclusion and uses
 	// higher range probability in order to ensure at least a false positive
-	if len(self.addresses) == 0 {
-		return self.getLogs(beginBlockNo, endBlockNo)
+	if len(f.addresses) == 0 {
+		return f.getLogs(beginBlockNo, endBlockNo)
 	}
-	return self.mipFind(beginBlockNo, endBlockNo, 0)
+	return f.mipFind(beginBlockNo, endBlockNo, 0)
 }
 
-func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
+func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) {
 	level := core.MIPMapLevels[depth]
 	// normalise numerator so we can work in level specific batches and
 	// work with the proper range checks
 	for num := start / level * level; num <= end; num += level {
 		// find addresses in bloom filters
-		bloom := core.GetMipmapBloom(self.db, num, level)
-		for _, addr := range self.addresses {
+		bloom := core.GetMipmapBloom(f.db, num, level)
+		for _, addr := range f.addresses {
 			if bloom.TestBytes(addr[:]) {
 				// range check normalised values and make sure that
 				// we're resolving the correct range instead of the
@@ -110,9 +107,9 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
 				start := uint64(math.Max(float64(num), float64(start)))
 				end := uint64(math.Min(float64(num+level-1), float64(end)))
 				if depth+1 == len(core.MIPMapLevels) {
-					logs = append(logs, self.getLogs(start, end)...)
+					logs = append(logs, f.getLogs(start, end)...)
 				} else {
-					logs = append(logs, self.mipFind(start, end, depth+1)...)
+					logs = append(logs, f.mipFind(start, end, depth+1)...)
 				}
 				// break so we don't check the same range for each
 				// possible address. Checks on multiple addresses
@@ -125,12 +122,15 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) {
 	return logs
 }
 
-func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) {
+func (f *Filter) getLogs(start, end uint64) (logs []Log) {
+	var block *types.Block
+
 	for i := start; i <= end; i++ {
-		var block *types.Block
-		hash := core.GetCanonicalHash(self.db, i)
+		hash := core.GetCanonicalHash(f.db, i)
 		if hash != (common.Hash{}) {
-			block = core.GetBlock(self.db, hash, i)
+			block = core.GetBlock(f.db, hash, i)
+		} else { // block not found
+			return logs
 		}
 		if block == nil { // block not found/written
 			return logs
@@ -138,16 +138,20 @@ func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) {
 
 		// Use bloom filtering to see if this block is interesting given the
 		// current parameters
-		if self.bloomFilter(block) {
+		if f.bloomFilter(block) {
 			// Get the logs of the block
 			var (
-				receipts   = core.GetBlockReceipts(self.db, block.Hash(), i)
-				unfiltered vm.Logs
+				receipts   = core.GetBlockReceipts(f.db, block.Hash(), i)
+				unfiltered []Log
 			)
 			for _, receipt := range receipts {
-				unfiltered = append(unfiltered, receipt.Logs...)
+				rl := make([]Log, len(receipt.Logs))
+				for i, l := range receipt.Logs {
+					rl[i] = Log{l, false}
+				}
+				unfiltered = append(unfiltered, rl...)
 			}
-			logs = append(logs, self.FilterLogs(unfiltered)...)
+			logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...)
 		}
 	}
 
@@ -164,26 +168,25 @@ func includes(addresses []common.Address, a common.Address) bool {
 	return false
 }
 
-func (self *Filter) FilterLogs(logs vm.Logs) vm.Logs {
-	var ret vm.Logs
+func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log {
+	var ret []Log
 
 	// Filter the logs for interesting stuff
 Logs:
 	for _, log := range logs {
-		if len(self.addresses) > 0 && !includes(self.addresses, log.Address) {
+		if len(addresses) > 0 && !includes(addresses, log.Address) {
 			continue
 		}
 
-		logTopics := make([]common.Hash, len(self.topics))
+		logTopics := make([]common.Hash, len(topics))
 		copy(logTopics, log.Topics)
 
-		// If the to filtered topics is greater than the amount of topics in
-		//  logs, skip.
-		if len(self.topics) > len(log.Topics) {
+		// If the to filtered topics is greater than the amount of topics in logs, skip.
+		if len(topics) > len(log.Topics) {
 			continue Logs
 		}
 
-		for i, topics := range self.topics {
+		for i, topics := range topics {
 			var match bool
 			for _, topic := range topics {
 				// common.Hash{} is a match all (wildcard)
@@ -196,7 +199,6 @@ Logs:
 			if !match {
 				continue Logs
 			}
-
 		}
 
 		ret = append(ret, log)
@@ -205,10 +207,10 @@ Logs:
 	return ret
 }
 
-func (self *Filter) bloomFilter(block *types.Block) bool {
-	if len(self.addresses) > 0 {
+func (f *Filter) bloomFilter(block *types.Block) bool {
+	if len(f.addresses) > 0 {
 		var included bool
-		for _, addr := range self.addresses {
+		for _, addr := range f.addresses {
 			if types.BloomLookup(block.Bloom(), addr) {
 				included = true
 				break
@@ -220,7 +222,7 @@ func (self *Filter) bloomFilter(block *types.Block) bool {
 		}
 	}
 
-	for _, sub := range self.topics {
+	for _, sub := range f.topics {
 		var included bool
 		for _, topic := range sub {
 			if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) {
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 2564642131cccb5c25e545ebd40e8007a9c2c95d..04a55fd0990f3b01405b077a4c76da76529e9c6b 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -14,179 +14,305 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-// package filters implements an ethereum filtering system for block,
+// Package filters implements an ethereum filtering system for block,
 // transactions and log events.
 package filters
 
 import (
+	"encoding/json"
+	"errors"
 	"fmt"
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/event"
+	"github.com/ethereum/go-ethereum/rpc"
 )
 
-// FilterType determines the type of filter and is used to put the filter in to
+// Type determines the kind of filter and is used to put the filter in to
 // the correct bucket when added.
-type FilterType byte
+type Type byte
 
 const (
-	ChainFilter      FilterType = iota // new block events filter
-	PendingTxFilter                    // pending transaction filter
-	LogFilter                          // new or removed log filter
-	PendingLogFilter                   // pending log filter
+	// UnknownSubscription indicates an unkown subscription type
+	UnknownSubscription Type = iota
+	// LogsSubscription queries for new or removed (chain reorg) logs
+	LogsSubscription
+	// PendingLogsSubscription queries for logs for the pending block
+	PendingLogsSubscription
+	// PendingTransactionsSubscription queries tx hashes for pending
+	// transactions entering the pending state
+	PendingTransactionsSubscription
+	// BlocksSubscription queries hashes for blocks that are imported
+	BlocksSubscription
 )
 
-// FilterSystem manages filters that filter specific events such as
-// block, transaction and log events. The Filtering system can be used to listen
-// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
-type FilterSystem struct {
-	filterMu sync.RWMutex
-	filterId int
+var (
+	ErrInvalidSubscriptionID = errors.New("invalid id")
+)
+
+// Log is a helper that can hold additional information about vm.Log
+// necessary for the RPC interface.
+type Log struct {
+	*vm.Log
+	Removed bool `json:"removed"`
+}
 
-	chainFilters      map[int]*Filter
-	pendingTxFilters  map[int]*Filter
-	logFilters        map[int]*Filter
-	pendingLogFilters map[int]*Filter
+func (l *Log) MarshalJSON() ([]byte, error) {
+	fields := map[string]interface{}{
+		"address":          l.Address,
+		"data":             fmt.Sprintf("0x%x", l.Data),
+		"blockNumber":      fmt.Sprintf("%#x", l.BlockNumber),
+		"logIndex":         fmt.Sprintf("%#x", l.Index),
+		"blockHash":        l.BlockHash,
+		"transactionHash":  l.TxHash,
+		"transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
+		"topics":           l.Topics,
+		"removed":          l.Removed,
+	}
 
-	// generic is an ugly hack for Get
-	generic map[int]*Filter
+	return json.Marshal(fields)
+}
 
-	sub event.Subscription
+type subscription struct {
+	id        rpc.ID
+	typ       Type
+	created   time.Time
+	logsCrit  FilterCriteria
+	logs      chan []Log
+	hashes    chan common.Hash
+	headers   chan *types.Header
+	installed chan struct{} // closed when the filter is installed
+	err       chan error    // closed when the filter is uninstalled
 }
 
-// NewFilterSystem returns a newly allocated filter manager
-func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
-	fs := &FilterSystem{
-		chainFilters:      make(map[int]*Filter),
-		pendingTxFilters:  make(map[int]*Filter),
-		logFilters:        make(map[int]*Filter),
-		pendingLogFilters: make(map[int]*Filter),
-		generic:           make(map[int]*Filter),
+// EventSystem creates subscriptions, processes events and broadcasts them to the
+// subscription which match the subscription criteria.
+type EventSystem struct {
+	mux       *event.TypeMux
+	sub       event.Subscription
+	install   chan *subscription // install filter for event notification
+	uninstall chan *subscription // remove filter for event notification
+}
+
+// NewEventSystem creates a new manager that listens for event on the given mux,
+// parses and filters them. It uses the all map to retrieve filter changes. The
+// work loop holds its own index that is used to forward events to filters.
+//
+// The returned manager has a loop that needs to be stopped with the Stop function
+// or by stopping the given mux.
+func NewEventSystem(mux *event.TypeMux) *EventSystem {
+	m := &EventSystem{
+		mux:       mux,
+		install:   make(chan *subscription),
+		uninstall: make(chan *subscription),
 	}
-	fs.sub = mux.Subscribe(
-		core.PendingLogsEvent{},
-		core.RemovedLogsEvent{},
-		core.ChainEvent{},
-		core.TxPreEvent{},
-		vm.Logs(nil),
-	)
-	go fs.filterLoop()
-	return fs
+
+	go m.eventLoop()
+
+	return m
 }
 
-// Stop quits the filter loop required for polling events
-func (fs *FilterSystem) Stop() {
-	fs.sub.Unsubscribe()
+// Subscription is created when the client registers itself for a particular event.
+type Subscription struct {
+	ID        rpc.ID
+	f         *subscription
+	es        *EventSystem
+	unsubOnce sync.Once
 }
 
-// Acquire filter system maps lock, required to force lock acquisition
-// sequence with filterMu acquired first to avoid deadlocks by callbacks
-func (fs *FilterSystem) Lock() {
-	fs.filterMu.Lock()
+// Err returns a channel that is closed when unsubscribed.
+func (sub *Subscription) Err() <-chan error {
+	return sub.f.err
 }
 
-// Release filter system maps lock
-func (fs *FilterSystem) Unlock() {
-	fs.filterMu.Unlock()
+// Unsubscribe uninstalls the subscription from the event broadcast loop.
+func (sub *Subscription) Unsubscribe() {
+	sub.unsubOnce.Do(func() {
+	uninstallLoop:
+		for {
+			// write uninstall request and consume logs/hashes. This prevents
+			// the eventLoop broadcast method to deadlock when writing to the
+			// filter event channel while the subscription loop is waiting for
+			// this method to return (and thus not reading these events).
+			select {
+			case sub.es.uninstall <- sub.f:
+				break uninstallLoop
+			case <-sub.f.logs:
+			case <-sub.f.hashes:
+			case <-sub.f.headers:
+			}
+		}
+
+		// wait for filter to be uninstalled in work loop before returning
+		// this ensures that the manager won't use the event channel which
+		// will probably be closed by the client asap after this method returns.
+		<-sub.Err()
+	})
 }
 
-// Add adds a filter to the filter manager
-// Expects filterMu to be locked.
-func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
-	id := fs.filterId
-	filter.created = time.Now()
+// subscribe installs the subscription in the event broadcast loop.
+func (es *EventSystem) subscribe(sub *subscription) *Subscription {
+	es.install <- sub
+	<-sub.installed
+	return &Subscription{ID: sub.id, f: sub, es: es}
+}
 
-	switch filterType {
-	case ChainFilter:
-		fs.chainFilters[id] = filter
-	case PendingTxFilter:
-		fs.pendingTxFilters[id] = filter
-	case LogFilter:
-		fs.logFilters[id] = filter
-	case PendingLogFilter:
-		fs.pendingLogFilters[id] = filter
-	default:
-		return 0, fmt.Errorf("unknown filter type %v", filterType)
+// SubscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+	sub := &subscription{
+		id:        rpc.NewID(),
+		typ:       LogsSubscription,
+		logsCrit:  crit,
+		created:   time.Now(),
+		logs:      logs,
+		hashes:    make(chan common.Hash),
+		headers:   make(chan *types.Header),
+		installed: make(chan struct{}),
+		err:       make(chan error),
 	}
-	fs.generic[id] = filter
 
-	fs.filterId++
+	return es.subscribe(sub)
+}
 
-	return id, nil
+// SubscribePendingLogs creates a subscription that will write pending logs matching the
+// given criteria to the given channel.
+func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+	sub := &subscription{
+		id:        rpc.NewID(),
+		typ:       PendingLogsSubscription,
+		logsCrit:  crit,
+		created:   time.Now(),
+		logs:      logs,
+		hashes:    make(chan common.Hash),
+		headers:   make(chan *types.Header),
+		installed: make(chan struct{}),
+		err:       make(chan error),
+	}
+
+	return es.subscribe(sub)
 }
 
-// Remove removes a filter by filter id
-// Expects filterMu to be locked.
-func (fs *FilterSystem) Remove(id int) {
-	delete(fs.chainFilters, id)
-	delete(fs.pendingTxFilters, id)
-	delete(fs.logFilters, id)
-	delete(fs.pendingLogFilters, id)
-	delete(fs.generic, id)
+// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+	sub := &subscription{
+		id:        rpc.NewID(),
+		typ:       PendingTransactionsSubscription,
+		created:   time.Now(),
+		logs:      make(chan []Log),
+		hashes:    hashes,
+		headers:   make(chan *types.Header),
+		installed: make(chan struct{}),
+		err:       make(chan error),
+	}
+
+	return es.subscribe(sub)
 }
 
-func (fs *FilterSystem) Get(id int) *Filter {
-	fs.filterMu.RLock()
-	defer fs.filterMu.RUnlock()
+// SubscribeNewHeads creates a subscription that writes the header of a block that is
+// imported in the chain.
+func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
+	sub := &subscription{
+		id:        rpc.NewID(),
+		typ:       BlocksSubscription,
+		created:   time.Now(),
+		logs:      make(chan []Log),
+		hashes:    make(chan common.Hash),
+		headers:   headers,
+		installed: make(chan struct{}),
+		err:       make(chan error),
+	}
 
-	return fs.generic[id]
+	return es.subscribe(sub)
 }
 
-// filterLoop waits for specific events from ethereum and fires their handlers
-// when the filter matches the requirements.
-func (fs *FilterSystem) filterLoop() {
-	for event := range fs.sub.Chan() {
-		switch ev := event.Data.(type) {
-		case core.ChainEvent:
-			fs.filterMu.RLock()
-			for _, filter := range fs.chainFilters {
-				if filter.BlockCallback != nil && !filter.created.After(event.Time) {
-					filter.BlockCallback(ev.Block, ev.Logs)
+type filterIndex map[Type]map[rpc.ID]*subscription
+
+// broadcast event to filters that match criteria.
+func broadcast(filters filterIndex, ev *event.Event) {
+	if ev == nil {
+		return
+	}
+
+	switch e := ev.Data.(type) {
+	case vm.Logs:
+		if len(e) > 0 {
+			for _, f := range filters[LogsSubscription] {
+				if ev.Time.After(f.created) {
+					if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+						f.logs <- matchedLogs
+					}
 				}
 			}
-			fs.filterMu.RUnlock()
-		case core.TxPreEvent:
-			fs.filterMu.RLock()
-			for _, filter := range fs.pendingTxFilters {
-				if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
-					filter.TransactionCallback(ev.Tx)
+		}
+	case core.RemovedLogsEvent:
+		for _, f := range filters[LogsSubscription] {
+			if ev.Time.After(f.created) {
+				if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+					f.logs <- matchedLogs
 				}
 			}
-			fs.filterMu.RUnlock()
-
-		case vm.Logs:
-			fs.filterMu.RLock()
-			for _, filter := range fs.logFilters {
-				if filter.LogCallback != nil && !filter.created.After(event.Time) {
-					for _, log := range filter.FilterLogs(ev) {
-						filter.LogCallback(log, false)
-					}
+		}
+	case core.PendingLogsEvent:
+		for _, f := range filters[PendingLogsSubscription] {
+			if ev.Time.After(f.created) {
+				if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+					f.logs <- matchedLogs
 				}
 			}
-			fs.filterMu.RUnlock()
-		case core.RemovedLogsEvent:
-			fs.filterMu.RLock()
-			for _, filter := range fs.logFilters {
-				if filter.LogCallback != nil && !filter.created.After(event.Time) {
-					for _, removedLog := range filter.FilterLogs(ev.Logs) {
-						filter.LogCallback(removedLog, true)
-					}
-				}
+		}
+	case core.TxPreEvent:
+		for _, f := range filters[PendingTransactionsSubscription] {
+			if ev.Time.After(f.created) {
+				f.hashes <- e.Tx.Hash()
 			}
-			fs.filterMu.RUnlock()
-		case core.PendingLogsEvent:
-			fs.filterMu.RLock()
-			for _, filter := range fs.pendingLogFilters {
-				if filter.LogCallback != nil && !filter.created.After(event.Time) {
-					for _, pendingLog := range ev.Logs {
-						filter.LogCallback(pendingLog, false)
-					}
-				}
+		}
+	case core.ChainEvent:
+		for _, f := range filters[BlocksSubscription] {
+			if ev.Time.After(f.created) {
+				f.headers <- e.Block.Header()
 			}
-			fs.filterMu.RUnlock()
 		}
 	}
 }
+
+// eventLoop (un)installs filters and processes mux events.
+func (es *EventSystem) eventLoop() {
+	var (
+		index = make(filterIndex)
+		sub   = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{})
+	)
+	for {
+		select {
+		case ev, active := <-sub.Chan():
+			if !active { // system stopped
+				return
+			}
+			broadcast(index, ev)
+		case f := <-es.install:
+			if _, found := index[f.typ]; !found {
+				index[f.typ] = make(map[rpc.ID]*subscription)
+			}
+			index[f.typ][f.id] = f
+			close(f.installed)
+		case f := <-es.uninstall:
+			delete(index[f.typ], f.id)
+			close(f.err)
+		}
+	}
+}
+
+// convertLogs is a helper utility that converts vm.Logs to []filter.Log.
+func convertLogs(in vm.Logs, removed bool) []Log {
+	logs := make([]Log, len(in))
+	for i, l := range in {
+		logs[i] = Log{l, removed}
+	}
+	return logs
+}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 72824cb0880a25c61fe9f549276688f38275be2e..9e6fde1c64a4553407c0e699a0f24294170be3a0 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -17,101 +17,310 @@
 package filters
 
 import (
+	"math/big"
+	"reflect"
 	"testing"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
+	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
+	"github.com/ethereum/go-ethereum/rpc"
 )
 
-func TestCallbacks(t *testing.T) {
+var (
+	mux   = new(event.TypeMux)
+	db, _ = ethdb.NewMemDatabase()
+	api   = NewPublicFilterAPI(db, mux)
+)
+
+// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
+// It creates multiple subscriptions:
+// - one at the start and should receive all posted chain events and a second (blockHashes)
+// - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2])
+// - one that is created after the second cutoff moment (blockHashes[cutoff2:])
+func TestBlockSubscription(t *testing.T) {
+	t.Parallel()
+
 	var (
-		mux            event.TypeMux
-		fs             = NewFilterSystem(&mux)
-		blockDone      = make(chan struct{})
-		txDone         = make(chan struct{})
-		logDone        = make(chan struct{})
-		removedLogDone = make(chan struct{})
-		pendingLogDone = make(chan struct{})
+		genesis     = core.WriteGenesisBlockForTesting(db)
+		chain, _    = core.GenerateChain(nil, genesis, db, 10, func(i int, gen *core.BlockGen) {})
+		chainEvents = []core.ChainEvent{}
 	)
 
-	blockFilter := &Filter{
-		BlockCallback: func(*types.Block, vm.Logs) {
-			close(blockDone)
-		},
-	}
-	txFilter := &Filter{
-		TransactionCallback: func(*types.Transaction) {
-			close(txDone)
-		},
+	for _, blk := range chain {
+		chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
 	}
-	logFilter := &Filter{
-		LogCallback: func(l *vm.Log, oob bool) {
-			if !oob {
-				close(logDone)
+
+	chan0 := make(chan *types.Header)
+	sub0 := api.events.SubscribeNewHeads(chan0)
+	chan1 := make(chan *types.Header)
+	sub1 := api.events.SubscribeNewHeads(chan1)
+
+	go func() { // simulate client
+		i1, i2 := 0, 0
+		for i1 != len(chainEvents) || i2 != len(chainEvents) {
+			select {
+			case header := <-chan0:
+				if chainEvents[i1].Hash != header.Hash() {
+					t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
+				}
+				i1++
+			case header := <-chan1:
+				if chainEvents[i2].Hash != header.Hash() {
+					t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
+				}
+				i2++
 			}
-		},
+		}
+
+		sub0.Unsubscribe()
+		sub1.Unsubscribe()
+	}()
+
+	time.Sleep(1 * time.Second)
+	for _, e := range chainEvents {
+		mux.Post(e)
 	}
-	removedLogFilter := &Filter{
-		LogCallback: func(l *vm.Log, oob bool) {
-			if oob {
-				close(removedLogDone)
-			}
-		},
+
+	<-sub0.Err()
+	<-sub1.Err()
+}
+
+// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
+func TestPendingTxFilter(t *testing.T) {
+	t.Parallel()
+
+	var (
+		transactions = []*types.Transaction{
+			types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+			types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+			types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+			types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+			types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
+		}
+
+		hashes []common.Hash
+	)
+
+	fid0 := api.NewPendingTransactionFilter()
+
+	time.Sleep(1 * time.Second)
+	for _, tx := range transactions {
+		ev := core.TxPreEvent{Tx: tx}
+		mux.Post(ev)
 	}
-	pendingLogFilter := &Filter{
-		LogCallback: func(*vm.Log, bool) {
-			close(pendingLogDone)
-		},
+
+	for {
+		h := api.GetFilterChanges(fid0).([]common.Hash)
+		hashes = append(hashes, h...)
+
+		if len(hashes) >= len(transactions) {
+			break
+		}
+
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	for i := range hashes {
+		if hashes[i] != transactions[i].Hash() {
+			t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
+		}
+	}
+}
+
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+func TestLogFilter(t *testing.T) {
+	t.Parallel()
+
+	var (
+		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
+		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
+		thirdAddress   = common.HexToAddress("0x3333333333333333333333333333333333333333")
+		notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
+		firstTopic     = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
+		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
+		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
+
+		allLogs = vm.Logs{
+			// Note, these are used for comparison of the test cases.
+			vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0),
+			vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1),
+			vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1),
+			vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 2),
+			vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3),
+		}
+
+		testCases = []struct {
+			crit     FilterCriteria
+			expected vm.Logs
+			id       rpc.ID
+		}{
+			// match all
+			{FilterCriteria{}, allLogs, ""},
+			// match none due to no matching addresses
+			{FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""},
+			// match logs based on addresses, ignore topics
+			{FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
+			// match none due to no matching topics (match with address)
+			{FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""},
+			// match logs based on addresses and topics
+			{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""},
+			// match logs based on multiple addresses and "or" topics
+			{FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""},
+			// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes
+			{FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""},
+		}
+
+		err error
+	)
+
+	// create all filters
+	for i := range testCases {
+		testCases[i].id = api.NewFilter(testCases[i].crit)
 	}
 
-	fs.Add(blockFilter, ChainFilter)
-	fs.Add(txFilter, PendingTxFilter)
-	fs.Add(logFilter, LogFilter)
-	fs.Add(removedLogFilter, LogFilter)
-	fs.Add(pendingLogFilter, PendingLogFilter)
-
-	mux.Post(core.ChainEvent{})
-	mux.Post(core.TxPreEvent{})
-	mux.Post(vm.Logs{&vm.Log{}})
-	mux.Post(core.RemovedLogsEvent{Logs: vm.Logs{&vm.Log{}}})
-	mux.Post(core.PendingLogsEvent{Logs: vm.Logs{&vm.Log{}}})
-
-	const dura = 5 * time.Second
-	failTimer := time.NewTimer(dura)
-	select {
-	case <-blockDone:
-	case <-failTimer.C:
-		t.Error("block filter failed to trigger (timeout)")
+	// raise events
+	time.Sleep(1 * time.Second)
+	if err = mux.Post(allLogs); err != nil {
+		t.Fatal(err)
 	}
 
-	failTimer.Reset(dura)
-	select {
-	case <-txDone:
-	case <-failTimer.C:
-		t.Error("transaction filter failed to trigger (timeout)")
+	for i, tt := range testCases {
+		var fetched []Log
+		for { // fetch all expected logs
+			fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...)
+			if len(fetched) >= len(tt.expected) {
+				break
+			}
+
+			time.Sleep(100 * time.Millisecond)
+		}
+
+		if len(fetched) != len(tt.expected) {
+			t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
+			return
+		}
+
+		for l := range fetched {
+			if fetched[l].Removed {
+				t.Errorf("expected log not to be removed for log %d in case %d", l, i)
+			}
+			if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
+				t.Errorf("invalid log on index %d for case %d", l, i)
+			}
+
+		}
 	}
+}
 
-	failTimer.Reset(dura)
-	select {
-	case <-logDone:
-	case <-failTimer.C:
-		t.Error("log filter failed to trigger (timeout)")
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+func TestPendingLogsSubscription(t *testing.T) {
+	t.Parallel()
+
+	var (
+		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
+		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
+		thirdAddress   = common.HexToAddress("0x3333333333333333333333333333333333333333")
+		notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
+		firstTopic     = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
+		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
+		thirdTopic     = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
+		forthTopic     = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
+		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
+
+		allLogs = []core.PendingLogsEvent{
+			core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0)}},
+			core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1)}},
+			core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 2)}},
+			core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3)}},
+			core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 4)}},
+			core.PendingLogsEvent{Logs: vm.Logs{
+				vm.NewLog(thirdAddress, []common.Hash{firstTopic}, []byte(""), 5),
+				vm.NewLog(thirdAddress, []common.Hash{thirdTopic}, []byte(""), 5),
+				vm.NewLog(thirdAddress, []common.Hash{forthTopic}, []byte(""), 5),
+				vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 5),
+			}},
+		}
+
+		convertLogs = func(pl []core.PendingLogsEvent) vm.Logs {
+			var logs vm.Logs
+			for _, l := range pl {
+				logs = append(logs, l.Logs...)
+			}
+			return logs
+		}
+
+		testCases = []struct {
+			crit     FilterCriteria
+			expected vm.Logs
+			c        chan []Log
+			sub      *Subscription
+		}{
+			// match all
+			{FilterCriteria{}, convertLogs(allLogs), nil, nil},
+			// match none due to no matching addresses
+			{FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{[]common.Hash{}}}, vm.Logs{}, nil, nil},
+			// match logs based on addresses, ignore topics
+			{FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
+			// match none due to no matching topics (match with address)
+			{FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, nil, nil},
+			// match logs based on addresses and topics
+			{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil},
+			// match logs based on multiple addresses and "or" topics
+			{FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil},
+			// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes
+			{FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
+			// multiple pending logs, should match only 2 topics from the logs in block 5
+			{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, forthTopic}}}, vm.Logs{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
+		}
+	)
+
+	// create all subscriptions, this ensures all subscriptions are created before the events are posted.
+	// on slow machines this could otherwise lead to missing events when the subscription is created after
+	// (some) events are posted.
+	for i := range testCases {
+		testCases[i].c = make(chan []Log)
+		testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c)
 	}
 
-	failTimer.Reset(dura)
-	select {
-	case <-removedLogDone:
-	case <-failTimer.C:
-		t.Error("removed log filter failed to trigger (timeout)")
+	for n, test := range testCases {
+		i := n
+		tt := test
+		go func() {
+			var fetched []Log
+		fetchLoop:
+			for {
+				logs := <-tt.c
+				fetched = append(fetched, logs...)
+				if len(fetched) >= len(tt.expected) {
+					break fetchLoop
+				}
+			}
+
+			if len(fetched) != len(tt.expected) {
+				t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
+			}
+
+			for l := range fetched {
+				if fetched[l].Removed {
+					t.Errorf("expected log not to be removed for log %d in case %d", l, i)
+				}
+				if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
+					t.Errorf("invalid log on index %d for case %d", l, i)
+				}
+			}
+		}()
 	}
 
-	failTimer.Reset(dura)
-	select {
-	case <-pendingLogDone:
-	case <-failTimer.C:
-		t.Error("pending log filter failed to trigger (timeout)")
+	// raise events
+	time.Sleep(1 * time.Second)
+	for _, l := range allLogs {
+		if err := mux.Post(l); err != nil {
+			t.Fatal(err)
+		}
 	}
 }
diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go
index 88bacc45bfa403f1045adcad4f685d5ffc6de284..e1729d1d29fae205442656e2116d4a57fe2771f1 100644
--- a/internal/ethapi/api.go
+++ b/internal/ethapi/api.go
@@ -24,7 +24,6 @@ import (
 	"fmt"
 	"math/big"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/ethereum/ethash"
@@ -345,37 +344,12 @@ func (s *PrivateAccountAPI) SignAndSendTransaction(ctx context.Context, args Sen
 // PublicBlockChainAPI provides an API to access the Ethereum blockchain.
 // It offers only methods that operate on public data that is freely available to anyone.
 type PublicBlockChainAPI struct {
-	b                       Backend
-	muNewBlockSubscriptions sync.Mutex                             // protects newBlocksSubscriptions
-	newBlockSubscriptions   map[string]func(core.ChainEvent) error // callbacks for new block subscriptions
+	b Backend
 }
 
 // NewPublicBlockChainAPI creates a new Etheruem blockchain API.
 func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI {
-	api := &PublicBlockChainAPI{
-		b: b,
-		newBlockSubscriptions: make(map[string]func(core.ChainEvent) error),
-	}
-
-	go api.subscriptionLoop()
-
-	return api
-}
-
-// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions.
-func (s *PublicBlockChainAPI) subscriptionLoop() {
-	sub := s.b.EventMux().Subscribe(core.ChainEvent{})
-	for event := range sub.Chan() {
-		if chainEvent, ok := event.Data.(core.ChainEvent); ok {
-			s.muNewBlockSubscriptions.Lock()
-			for id, notifyOf := range s.newBlockSubscriptions {
-				if notifyOf(chainEvent) == rpc.ErrNotificationNotFound {
-					delete(s.newBlockSubscriptions, id)
-				}
-			}
-			s.muNewBlockSubscriptions.Unlock()
-		}
-	}
+	return &PublicBlockChainAPI{b}
 }
 
 // BlockNumber returns the block number of the chain head.
@@ -470,45 +444,6 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(ctx context.Context, bloc
 	return nil
 }
 
-// NewBlocksArgs allows the user to specify if the returned block should include transactions and in which format.
-type NewBlocksArgs struct {
-	IncludeTransactions bool `json:"includeTransactions"`
-	TransactionDetails  bool `json:"transactionDetails"`
-}
-
-// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
-// the caller to specify whether the output should contain transactions and in what format.
-func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) {
-	notifier, supported := rpc.NotifierFromContext(ctx)
-	if !supported {
-		return nil, rpc.ErrNotificationsUnsupported
-	}
-
-	// create a subscription that will remove itself when unsubscribed/cancelled
-	subscription, err := notifier.NewSubscription(func(subId string) {
-		s.muNewBlockSubscriptions.Lock()
-		delete(s.newBlockSubscriptions, subId)
-		s.muNewBlockSubscriptions.Unlock()
-	})
-
-	if err != nil {
-		return nil, err
-	}
-
-	// add a callback that is called on chain events which will format the block and notify the client
-	s.muNewBlockSubscriptions.Lock()
-	s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error {
-		notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails)
-		if err == nil {
-			return subscription.Notify(notification)
-		}
-		glog.V(logger.Warn).Info("unable to format block %v\n", err)
-		return nil
-	}
-	s.muNewBlockSubscriptions.Unlock()
-	return subscription, nil
-}
-
 // GetCode returns the code stored at the given address in the state for the given block number.
 func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (string, error) {
 	state, _, err := s.b.StateAndHeaderByNumber(blockNr)
@@ -867,40 +802,12 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err
 
 // PublicTransactionPoolAPI exposes methods for the RPC interface
 type PublicTransactionPoolAPI struct {
-	b               Backend
-	muPendingTxSubs sync.Mutex
-	pendingTxSubs   map[string]rpc.Subscription
+	b Backend
 }
 
 // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
 func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI {
-	api := &PublicTransactionPoolAPI{
-		b:             b,
-		pendingTxSubs: make(map[string]rpc.Subscription),
-	}
-
-	go api.subscriptionLoop()
-
-	return api
-}
-
-// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions.
-func (s *PublicTransactionPoolAPI) subscriptionLoop() {
-	sub := s.b.EventMux().Subscribe(core.TxPreEvent{})
-	for event := range sub.Chan() {
-		tx := event.Data.(core.TxPreEvent)
-		if from, err := tx.Tx.FromFrontier(); err == nil {
-			if s.b.AccountManager().HasAddress(from) {
-				s.muPendingTxSubs.Lock()
-				for id, sub := range s.pendingTxSubs {
-					if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound {
-						delete(s.pendingTxSubs, id)
-					}
-				}
-				s.muPendingTxSubs.Unlock()
-			}
-		}
-	}
+	return &PublicTransactionPoolAPI{b}
 }
 
 func getTransaction(chainDb ethdb.Database, b Backend, txHash common.Hash) (*types.Transaction, bool, error) {
@@ -1353,31 +1260,6 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
 	return transactions
 }
 
-// NewPendingTransactions creates a subscription that is triggered each time a transaction enters the transaction pool
-// and is send from one of the transactions this nodes manages.
-func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) {
-	notifier, supported := rpc.NotifierFromContext(ctx)
-	if !supported {
-		return nil, rpc.ErrNotificationsUnsupported
-	}
-
-	subscription, err := notifier.NewSubscription(func(id string) {
-		s.muPendingTxSubs.Lock()
-		delete(s.pendingTxSubs, id)
-		s.muPendingTxSubs.Unlock()
-	})
-
-	if err != nil {
-		return nil, err
-	}
-
-	s.muPendingTxSubs.Lock()
-	s.pendingTxSubs[subscription.ID()] = subscription
-	s.muPendingTxSubs.Unlock()
-
-	return subscription, nil
-}
-
 // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the
 // pool and reinsert it with the new gas price and limit.
 func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx *Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) {
diff --git a/rpc/notification.go b/rpc/notification.go
deleted file mode 100644
index 8754330716d1b683152976795bad359f351f113f..0000000000000000000000000000000000000000
--- a/rpc/notification.go
+++ /dev/null
@@ -1,297 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package rpc
-
-import (
-	"errors"
-	"sync"
-	"time"
-
-	"github.com/ethereum/go-ethereum/logger"
-	"github.com/ethereum/go-ethereum/logger/glog"
-	"golang.org/x/net/context"
-)
-
-var (
-	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
-	ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
-
-	// ErrNotificationNotFound is returned when the notification for the given id is not found
-	ErrNotificationNotFound = errors.New("notification not found")
-
-	// errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed)
-	errNotifierStopped = errors.New("unable to send notification")
-
-	// errNotificationQueueFull is returns when there are too many notifications in the queue
-	errNotificationQueueFull = errors.New("too many pending notifications")
-)
-
-// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered
-// notifications that might be pending in the internal queue.
-var unsubSignal = new(struct{})
-
-// UnsubscribeCallback defines a callback that is called when a subcription ends.
-// It receives the subscription id as argument.
-type UnsubscribeCallback func(id string)
-
-// notification is a helper object that holds event data for a subscription
-type notification struct {
-	sub  *bufferedSubscription // subscription id
-	data interface{}           // event data
-}
-
-// A Notifier type describes the interface for objects that can send create subscriptions
-type Notifier interface {
-	// Create a new subscription. The given callback is called when this subscription
-	// is cancelled (e.g. client send an unsubscribe, connection closed).
-	NewSubscription(UnsubscribeCallback) (Subscription, error)
-	// Cancel subscription
-	Unsubscribe(id string) error
-}
-
-type notifierKey struct{}
-
-// NotifierFromContext returns the Notifier value stored in ctx, if any.
-func NotifierFromContext(ctx context.Context) (Notifier, bool) {
-	n, ok := ctx.Value(notifierKey{}).(Notifier)
-	return n, ok
-}
-
-// Subscription defines the interface for objects that can notify subscribers
-type Subscription interface {
-	// Inform client of an event
-	Notify(data interface{}) error
-	// Unique identifier
-	ID() string
-	// Cancel subscription
-	Cancel() error
-}
-
-// bufferedSubscription is a subscription that uses a bufferedNotifier to send
-// notifications to subscribers.
-type bufferedSubscription struct {
-	id               string
-	unsubOnce        sync.Once           // call unsub method once
-	unsub            UnsubscribeCallback // called on Unsubscribed
-	notifier         *bufferedNotifier   // forward notifications to
-	pending          chan interface{}    // closed when active
-	flushed          chan interface{}    // closed when all buffered notifications are send
-	lastNotification time.Time           // last time a notification was send
-}
-
-// ID returns the subscription identifier that the client uses to refer to this instance.
-func (s *bufferedSubscription) ID() string {
-	return s.id
-}
-
-// Cancel informs the notifier that this subscription is cancelled by the API
-func (s *bufferedSubscription) Cancel() error {
-	return s.notifier.Unsubscribe(s.id)
-}
-
-// Notify the subscriber of a particular event.
-func (s *bufferedSubscription) Notify(data interface{}) error {
-	return s.notifier.send(s.id, data)
-}
-
-// bufferedNotifier is a notifier that queues notifications in an internal queue and
-// send them as fast as possible to the client from this queue. It will stop if the
-// queue grows past a given size.
-type bufferedNotifier struct {
-	codec         ServerCodec                      // underlying connection
-	mu            sync.Mutex                       // guard internal state
-	subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec
-	queueSize     int                              // max number of items in queue
-	queue         chan *notification               // notification queue
-	stopped       bool                             // indication if this notifier is ordered to stop
-}
-
-// newBufferedNotifier returns a notifier that queues notifications in an internal queue
-// from which notifications are send as fast as possible to the client. If the queue size
-// limit is reached (client is unable to keep up) it will stop and closes the codec.
-func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier {
-	notifier := &bufferedNotifier{
-		codec:         codec,
-		subscriptions: make(map[string]*bufferedSubscription),
-		queue:         make(chan *notification, size),
-		queueSize:     size,
-	}
-
-	go notifier.run()
-
-	return notifier
-}
-
-// NewSubscription creates a new subscription that forwards events to this instance internal
-// queue. The given callback is called when the subscription is unsubscribed/cancelled.
-func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) {
-	id, err := newSubscriptionID()
-	if err != nil {
-		return nil, err
-	}
-
-	n.mu.Lock()
-	defer n.mu.Unlock()
-
-	if n.stopped {
-		return nil, errNotifierStopped
-	}
-
-	sub := &bufferedSubscription{
-		id:               id,
-		unsub:            callback,
-		notifier:         n,
-		pending:          make(chan interface{}),
-		flushed:          make(chan interface{}),
-		lastNotification: time.Now(),
-	}
-
-	n.subscriptions[id] = sub
-
-	return sub, nil
-}
-
-// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned.
-func (n *bufferedNotifier) Unsubscribe(subid string) error {
-	n.mu.Lock()
-	sub, found := n.subscriptions[subid]
-	n.mu.Unlock()
-
-	if found {
-		// send the unsubscribe signal, this will cause the notifier not to accept new events
-		// for this subscription and will close the flushed channel after the last (buffered)
-		// notification was send to the client.
-		if err := n.send(subid, unsubSignal); err != nil {
-			return err
-		}
-
-		// wait for confirmation that all (buffered) events are send for this subscription.
-		// this ensures that the unsubscribe method response is not send before all buffered
-		// events for this subscription are send.
-		<-sub.flushed
-
-		return nil
-	}
-
-	return ErrNotificationNotFound
-}
-
-// Send enques the given data for the subscription with public ID on the internal queue. t returns
-// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
-// will remove the subscription with the given id from the subscription collection.
-func (n *bufferedNotifier) send(id string, data interface{}) error {
-	n.mu.Lock()
-	defer n.mu.Unlock()
-
-	if n.stopped {
-		return errNotifierStopped
-	}
-
-	var (
-		subscription *bufferedSubscription
-		found        bool
-	)
-
-	// check if subscription is associated with this connection, it might be cancelled
-	// (subscribe/connection closed)
-	if subscription, found = n.subscriptions[id]; !found {
-		glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id)
-		return ErrNotificationNotFound
-	}
-
-	// received the unsubscribe signal. Add it to the queue to make sure any pending notifications
-	// for this subscription are send. When the run loop receives this singal it will signal that
-	// all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
-	// send to the user. Remove the subscriptions to make sure new notifications are not accepted.
-	if data == unsubSignal {
-		delete(n.subscriptions, id)
-		if subscription.unsub != nil {
-			subscription.unsubOnce.Do(func() { subscription.unsub(id) })
-		}
-	}
-
-	subscription.lastNotification = time.Now()
-
-	if len(n.queue) >= n.queueSize {
-		glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection")
-		n.codec.Close()
-		return errNotificationQueueFull
-	}
-
-	n.queue <- &notification{subscription, data}
-	return nil
-}
-
-// run reads notifications from the internal queue and sends them to the client. In case of an
-// error, or when the codec is closed it will cancel all active subscriptions and returns.
-func (n *bufferedNotifier) run() {
-	defer func() {
-		n.mu.Lock()
-		defer n.mu.Unlock()
-
-		n.stopped = true
-		close(n.queue)
-
-		// on exit call unsubscribe callback
-		for id, sub := range n.subscriptions {
-			if sub.unsub != nil {
-				sub.unsubOnce.Do(func() { sub.unsub(id) })
-			}
-			close(sub.flushed)
-			delete(n.subscriptions, id)
-		}
-	}()
-
-	for {
-		select {
-		case notification := <-n.queue:
-			// It can happen that an event is raised before the RPC server was able to send the sub
-			// id to the client. Therefore subscriptions are marked as pending until the sub id was
-			// send. The RPC server will activate the subscription by closing the pending chan.
-			<-notification.sub.pending
-
-			if notification.data == unsubSignal {
-				// unsubSignal is the last accepted message for this subscription. Raise the signal
-				// that all buffered notifications are sent by closing the flushed channel. This
-				// indicates that the response for the unsubscribe can be send to the client.
-				close(notification.sub.flushed)
-			} else {
-				msg := n.codec.CreateNotification(notification.sub.id, notification.data)
-				if err := n.codec.Write(msg); err != nil {
-					n.codec.Close()
-					// unable to send notification to client, unsubscribe all subscriptions
-					glog.V(logger.Warn).Infof("unable to send notification - %v\n", err)
-					return
-				}
-			}
-		case <-n.codec.Closed(): // connection was closed
-			glog.V(logger.Debug).Infoln("codec closed, stop subscriptions")
-			return
-		}
-	}
-}
-
-// Marks the subscription as active. This will causes the notifications for this subscription to be
-// forwarded to the client.
-func (n *bufferedNotifier) activate(subid string) {
-	n.mu.Lock()
-	defer n.mu.Unlock()
-
-	if sub, found := n.subscriptions[subid]; found {
-		close(sub.pending)
-	}
-}
diff --git a/rpc/server.go b/rpc/server.go
index 040805a5c2c3869a98d3f7704c7d93b5db1ffc75..996c6370046e8b2a703db41c6821e6b851283af2 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -166,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
 	// to send notification to clients. It is thight to the codec/connection. If the
 	// connection is closed the notifier will stop and cancels all active subscriptions.
 	if options&OptionSubscriptions == OptionSubscriptions {
-		ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize))
+		ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
 	}
 	s.codecsMu.Lock()
 	if atomic.LoadInt32(&s.run) != 1 { // server stopped
@@ -247,7 +247,7 @@ func (s *Server) Stop() {
 }
 
 // createSubscription will call the subscription callback and returns the subscription id or error.
-func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) {
+func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
 	// subscription have as first argument the context following optional arguments
 	args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
 	args = append(args, req.args...)
@@ -257,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser
 		return "", reply[1].Interface().(error)
 	}
 
-	return reply[0].Interface().(Subscription).ID(), nil
+	return reply[0].Interface().(*Subscription).ID, nil
 }
 
 // handle executes a request and returns the response from the callback.
@@ -273,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
 				return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
 			}
 
-			subid := req.args[0].String()
-			if err := notifier.Unsubscribe(subid); err != nil {
+			subid := ID(req.args[0].String())
+			if err := notifier.unsubscribe(subid); err != nil {
 				return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
 			}
 
@@ -292,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
 		// active the subscription after the sub id was successfully sent to the client
 		activateSub := func() {
 			notifier, _ := NotifierFromContext(ctx)
-			notifier.(*bufferedNotifier).activate(subid)
+			notifier.activate(subid)
 		}
 
 		return codec.CreateResponse(req.id, subid), activateSub
diff --git a/rpc/server_test.go b/rpc/server_test.go
index e6840bde4734d4bd4d9fe5d7969c9d5315d05ffa..c3c88fab755d4e93c9a7ef1b600fcd5212a1a7d5 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -72,7 +72,7 @@ func (s *Service) InvalidRets3() (string, string, error) {
 	return "", "", nil
 }
 
-func (s *Service) Subscription(ctx context.Context) (Subscription, error) {
+func (s *Service) Subscription(ctx context.Context) (*Subscription, error) {
 	return nil, nil
 }
 
diff --git a/rpc/subscription.go b/rpc/subscription.go
new file mode 100644
index 0000000000000000000000000000000000000000..863d34b20267cd0ba932c8496a3e4f301f34b0fe
--- /dev/null
+++ b/rpc/subscription.go
@@ -0,0 +1,135 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+	"errors"
+	"sync"
+
+	"golang.org/x/net/context"
+)
+
+var (
+	// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
+	ErrNotificationsUnsupported = errors.New("notifications not supported")
+	// ErrNotificationNotFound is returned when the notification for the given id is not found
+	ErrSubscriptionNotFound = errors.New("subscription not found")
+)
+
+// ID defines a psuedo random number that is used to identify RPC subscriptions.
+type ID string
+
+// a Subscription is created by a notifier and tight to that notifier. The client can use
+// this subscription to wait for an unsubscribe request for the client, see Err().
+type Subscription struct {
+	ID  ID
+	err chan error // closed on unsubscribe
+}
+
+// Err returns a channel that is closed when the client send an unsubscribe request.
+func (s *Subscription) Err() <-chan error {
+	return s.err
+}
+
+// notifierKey is used to store a notifier within the connection context.
+type notifierKey struct{}
+
+// Notifier is tight to a RPC connection that supports subscriptions.
+// Server callbacks use the notifier to send notifications.
+type Notifier struct {
+	codec    ServerCodec
+	subMu    sync.RWMutex // guards active and inactive maps
+	stopped  bool
+	active   map[ID]*Subscription
+	inactive map[ID]*Subscription
+}
+
+// newNotifier creates a new notifier that can be used to send subscription
+// notifications to the client.
+func newNotifier(codec ServerCodec) *Notifier {
+	return &Notifier{
+		codec:    codec,
+		active:   make(map[ID]*Subscription),
+		inactive: make(map[ID]*Subscription),
+	}
+}
+
+// NotifierFromContext returns the Notifier value stored in ctx, if any.
+func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
+	n, ok := ctx.Value(notifierKey{}).(*Notifier)
+	return n, ok
+}
+
+// CreateSubscription returns a new subscription that is coupled to the
+// RPC connection. By default subscriptions are inactive and notifications
+// are dropped until the subscription is marked as active. This is done
+// by the RPC server after the subscription ID is send to the client.
+func (n *Notifier) CreateSubscription() *Subscription {
+	s := &Subscription{NewID(), make(chan error)}
+	n.subMu.Lock()
+	n.inactive[s.ID] = s
+	n.subMu.Unlock()
+	return s
+}
+
+// Notify sends a notification to the client with the given data as payload.
+// If an error occurs the RPC connection is closed and the error is returned.
+func (n *Notifier) Notify(id ID, data interface{}) error {
+	n.subMu.RLock()
+	defer n.subMu.RUnlock()
+
+	_, active := n.active[id]
+	if active {
+		notification := n.codec.CreateNotification(string(id), data)
+		if err := n.codec.Write(notification); err != nil {
+			n.codec.Close()
+			return err
+		}
+	}
+	return nil
+}
+
+// Closed returns a channel that is closed when the RPC connection is closed.
+func (n *Notifier) Closed() <-chan interface{} {
+	return n.codec.Closed()
+}
+
+// unsubscribe a subscription.
+// If the subscription could not be found ErrSubscriptionNotFound is returned.
+func (n *Notifier) unsubscribe(id ID) error {
+	n.subMu.Lock()
+	defer n.subMu.Unlock()
+	if s, found := n.active[id]; found {
+		close(s.err)
+		delete(n.active, id)
+		return nil
+	}
+	return ErrSubscriptionNotFound
+}
+
+// activate enables a subscription. Until a subscription is enabled all
+// notifications are dropped. This method is called by the RPC server after
+// the subscription ID was sent to client. This prevents notifications being
+// send to the client before the subscription ID is send to the client.
+func (n *Notifier) activate(id ID) {
+	n.subMu.Lock()
+	defer n.subMu.Unlock()
+	if sub, found := n.inactive[id]; found {
+		n.active[id] = sub
+		delete(n.inactive, id)
+	}
+}
diff --git a/rpc/notification_test.go b/rpc/subscription_test.go
similarity index 84%
rename from rpc/notification_test.go
rename to rpc/subscription_test.go
index 52352848c4af915f388515da19b1e7857a17d46b..8bb3416947dfc52bf2c0e73b0e4c2337fc640986 100644
--- a/rpc/notification_test.go
+++ b/rpc/subscription_test.go
@@ -50,7 +50,7 @@ func (s *NotificationTestService) Unsubscribe(subid string) {
 	s.mu.Unlock()
 }
 
-func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
+func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) {
 	notifier, supported := NotifierFromContext(ctx)
 	if !supported {
 		return nil, ErrNotificationsUnsupported
@@ -59,17 +59,29 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
 	// by explicitly creating an subscription we make sure that the subscription id is send back to the client
 	// before the first subscription.Notify is called. Otherwise the events might be send before the response
 	// for the eth_subscribe method.
-	subscription, err := notifier.NewSubscription(s.Unsubscribe)
-	if err != nil {
-		return nil, err
-	}
+	subscription := notifier.CreateSubscription()
 
 	go func() {
+		// test expects n events, if we begin sending event immediatly some events
+		// will probably be dropped since the subscription ID might not be send to
+		// the client.
+		time.Sleep(5 * time.Second)
 		for i := 0; i < n; i++ {
-			if err := subscription.Notify(val + i); err != nil {
+			if err := notifier.Notify(subscription.ID, val+i); err != nil {
 				return
 			}
 		}
+
+		select {
+		case <-notifier.Closed():
+			s.mu.Lock()
+			s.unsubscribed = true
+			s.mu.Unlock()
+		case <-subscription.Err():
+			s.mu.Lock()
+			s.unsubscribed = true
+			s.mu.Unlock()
+		}
 	}()
 
 	return subscription, nil
@@ -77,7 +89,7 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
 
 // HangSubscription blocks on s.unblockHangSubscription before
 // sending anything.
-func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
+func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) {
 	notifier, supported := NotifierFromContext(ctx)
 	if !supported {
 		return nil, ErrNotificationsUnsupported
@@ -85,12 +97,10 @@ func (s *NotificationTestService) HangSubscription(ctx context.Context, val int)
 
 	s.gotHangSubscriptionReq <- struct{}{}
 	<-s.unblockHangSubscription
-	subscription, err := notifier.NewSubscription(s.Unsubscribe)
-	if err != nil {
-		return nil, err
-	}
+	subscription := notifier.CreateSubscription()
+
 	go func() {
-		subscription.Notify(val)
+		notifier.Notify(subscription.ID, val)
 	}()
 	return subscription, nil
 }
diff --git a/rpc/types.go b/rpc/types.go
index 2a7268ad8bb2e3a8445c50ccdad8f74d156542b8..89c5b5bc9b850d953bc7cfc01630d8246c8ebd0d 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -269,6 +269,6 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
 	return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber)
 }
 
-func (bn *BlockNumber) Int64() int64 {
-	return (int64)(*bn)
+func (bn BlockNumber) Int64() int64 {
+	return (int64)(bn)
 }
diff --git a/rpc/utils.go b/rpc/utils.go
index 1ac6698f568708cf59285309aa74d61bbdae3c0e..b590ba62f3777a6cf0125b3d6f4e76275ed0a294 100644
--- a/rpc/utils.go
+++ b/rpc/utils.go
@@ -17,17 +17,26 @@
 package rpc
 
 import (
-	"crypto/rand"
+	"bufio"
+	crand "crypto/rand"
+	"encoding/binary"
 	"encoding/hex"
-	"errors"
 	"math/big"
+	"math/rand"
 	"reflect"
+	"sync"
+	"time"
 	"unicode"
 	"unicode/utf8"
 
 	"golang.org/x/net/context"
 )
 
+var (
+	subscriptionIDGenMu sync.Mutex
+	subscriptionIDGen   = idGenerator()
+)
+
 // Is this an exported - upper case - name?
 func isExported(name string) bool {
 	rune, _ := utf8.DecodeRuneInString(name)
@@ -218,11 +227,28 @@ METHODS:
 	return callbacks, subscriptions
 }
 
-func newSubscriptionID() (string, error) {
-	var subid [16]byte
-	n, _ := rand.Read(subid[:])
-	if n != 16 {
-		return "", errors.New("Unable to generate subscription id")
+// idGenerator helper utility that generates a (pseudo) random sequence of
+// bytes that are used to generate identifiers.
+func idGenerator() *rand.Rand {
+	if seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)); err == nil {
+		return rand.New(rand.NewSource(seed))
+	}
+	return rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
+}
+
+// NewID generates a identifier that can be used as an identifier in the RPC interface.
+// e.g. filter and subscription identifier.
+func NewID() ID {
+	subscriptionIDGenMu.Lock()
+	defer subscriptionIDGenMu.Unlock()
+
+	id := make([]byte, 16)
+	for i := 0; i < len(id); i += 7 {
+		val := subscriptionIDGen.Int63()
+		for j := 0; i+j < len(id) && j < 7; j++ {
+			id[i+j] = byte(val)
+			val >>= 8
+		}
 	}
-	return "0x" + hex.EncodeToString(subid[:]), nil
+	return ID("0x" + hex.EncodeToString(id))
 }