From f7a71996fbbe9cea4445600ffa3c232a6cf42803 Mon Sep 17 00:00:00 2001
From: Jeffrey Wilcke <geffobscura@gmail.com>
Date: Sun, 30 Aug 2015 10:04:59 +0200
Subject: [PATCH] core, event/filter, xeth: refactored filter system

Moved the filtering system from `event` to `eth/filters` package and
removed the `core.Filter` object. The `filters.Filter` object now
requires a `common.Database` rather than a `eth.Backend` and invokes the
`core.GetBlockByX` directly rather than thru a "manager".
---
 {core => eth/filters}/filter.go |  29 ++++---
 eth/filters/filter_system.go    | 133 ++++++++++++++++++++++++++++++++
 event/filter/eth_filter.go      | 127 ------------------------------
 xeth/xeth.go                    |  31 ++++----
 4 files changed, 162 insertions(+), 158 deletions(-)
 rename {core => eth/filters}/filter.go (89%)
 create mode 100644 eth/filters/filter_system.go
 delete mode 100644 event/filter/eth_filter.go

diff --git a/core/filter.go b/eth/filters/filter.go
similarity index 89%
rename from core/filter.go
rename to eth/filters/filter.go
index b328ffff3..b7f795607 100644
--- a/core/filter.go
+++ b/eth/filters/filter.go
@@ -14,16 +14,15 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 
-package core
+package filters
 
 import (
 	"math"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/logger"
-	"github.com/ethereum/go-ethereum/logger/glog"
 )
 
 type AccountChange struct {
@@ -32,7 +31,7 @@ type AccountChange struct {
 
 // Filtering interface
 type Filter struct {
-	eth      Backend
+	db       common.Database
 	earliest int64
 	latest   int64
 	skip     int
@@ -47,8 +46,8 @@ type Filter struct {
 
 // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
 // is interesting or not.
-func NewFilter(eth Backend) *Filter {
-	return &Filter{eth: eth}
+func New(db common.Database) *Filter {
+	return &Filter{db: db}
 }
 
 // Set the earliest and latest block for filtering.
@@ -80,7 +79,7 @@ func (self *Filter) SetSkip(skip int) {
 
 // Run filters logs with the current parameters set
 func (self *Filter) Find() state.Logs {
-	earliestBlock := self.eth.ChainManager().CurrentBlock()
+	earliestBlock := core.GetCurrentBlock(self.db)
 	var earliestBlockNo uint64 = uint64(self.earliest)
 	if self.earliest == -1 {
 		earliestBlockNo = earliestBlock.NumberU64()
@@ -92,7 +91,7 @@ func (self *Filter) Find() state.Logs {
 
 	var (
 		logs  state.Logs
-		block = self.eth.ChainManager().GetBlockByNumber(latestBlockNo)
+		block = core.GetBlockByNumber(self.db, latestBlockNo)
 	)
 
 done:
@@ -111,17 +110,17 @@ done:
 		// current parameters
 		if self.bloomFilter(block) {
 			// Get the logs of the block
-			unfiltered, err := self.eth.BlockProcessor().GetLogs(block)
-			if err != nil {
-				glog.V(logger.Warn).Infoln("err: filter get logs ", err)
-
-				break
+			var (
+				receipts   = core.GetBlockReceipts(self.db, block.Hash())
+				unfiltered state.Logs
+			)
+			for _, receipt := range receipts {
+				unfiltered = append(unfiltered, receipt.Logs()...)
 			}
-
 			logs = append(logs, self.FilterLogs(unfiltered)...)
 		}
 
-		block = self.eth.ChainManager().GetBlock(block.ParentHash())
+		block = core.GetBlockByHash(self.db, block.ParentHash())
 	}
 
 	skip := int(math.Min(float64(len(logs)), float64(self.skip)))
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
new file mode 100644
index 000000000..9ad73a896
--- /dev/null
+++ b/eth/filters/filter_system.go
@@ -0,0 +1,133 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// package filters implements an ethereum filtering system for block,
+// transactions and log events.
+package filters
+
+import (
+	"sync"
+
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/event"
+)
+
+// FilterSystem manages filters that filter specific events such as
+// block, transaction and log events. The Filtering system can be used to listen
+// for specific LOG events fires by the EVM (Ethereum Virtual Machine).
+type FilterSystem struct {
+	eventMux *event.TypeMux
+
+	filterMu sync.RWMutex
+	filterId int
+	filters  map[int]*Filter
+
+	quit chan struct{}
+}
+
+// NewFilterSystem returns a newly allocated filter manager
+func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
+	fs := &FilterSystem{
+		eventMux: mux,
+		filters:  make(map[int]*Filter),
+	}
+	go fs.filterLoop()
+	return fs
+}
+
+// Stop quits the filter loop required for polling events
+func (fs *FilterSystem) Stop() {
+	close(fs.quit)
+}
+
+// Add adds a filter to the filter manager
+func (fs *FilterSystem) Add(filter *Filter) (id int) {
+	fs.filterMu.Lock()
+	defer fs.filterMu.Unlock()
+	id = fs.filterId
+	fs.filters[id] = filter
+	fs.filterId++
+
+	return id
+}
+
+// Remove removes a filter by filter id
+func (fs *FilterSystem) Remove(id int) {
+	fs.filterMu.Lock()
+	defer fs.filterMu.Unlock()
+	if _, ok := fs.filters[id]; ok {
+		delete(fs.filters, id)
+	}
+}
+
+// Get retrieves a filter installed using Add The filter may not be modified.
+func (fs *FilterSystem) Get(id int) *Filter {
+	fs.filterMu.RLock()
+	defer fs.filterMu.RUnlock()
+	return fs.filters[id]
+}
+
+// filterLoop waits for specific events from ethereum and fires their handlers
+// when the filter matches the requirements.
+func (fs *FilterSystem) filterLoop() {
+	// Subscribe to events
+	events := fs.eventMux.Subscribe(
+		//core.PendingBlockEvent{},
+		core.ChainEvent{},
+		core.TxPreEvent{},
+		state.Logs(nil))
+
+out:
+	for {
+		select {
+		case <-fs.quit:
+			break out
+		case event := <-events.Chan():
+			switch event := event.(type) {
+			case core.ChainEvent:
+				fs.filterMu.RLock()
+				for _, filter := range fs.filters {
+					if filter.BlockCallback != nil {
+						filter.BlockCallback(event.Block, event.Logs)
+					}
+				}
+				fs.filterMu.RUnlock()
+
+			case core.TxPreEvent:
+				fs.filterMu.RLock()
+				for _, filter := range fs.filters {
+					if filter.TransactionCallback != nil {
+						filter.TransactionCallback(event.Tx)
+					}
+				}
+				fs.filterMu.RUnlock()
+
+			case state.Logs:
+				fs.filterMu.RLock()
+				for _, filter := range fs.filters {
+					if filter.LogsCallback != nil {
+						msgs := filter.FilterLogs(event)
+						if len(msgs) > 0 {
+							filter.LogsCallback(msgs)
+						}
+					}
+				}
+				fs.filterMu.RUnlock()
+			}
+		}
+	}
+}
diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go
deleted file mode 100644
index 6f61e2b60..000000000
--- a/event/filter/eth_filter.go
+++ /dev/null
@@ -1,127 +0,0 @@
-// Copyright 2014 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package filter
-
-// TODO make use of the generic filtering system
-
-import (
-	"sync"
-
-	"github.com/ethereum/go-ethereum/core"
-	"github.com/ethereum/go-ethereum/core/state"
-	"github.com/ethereum/go-ethereum/event"
-)
-
-type FilterManager struct {
-	eventMux *event.TypeMux
-
-	filterMu sync.RWMutex
-	filterId int
-	filters  map[int]*core.Filter
-
-	quit chan struct{}
-}
-
-func NewFilterManager(mux *event.TypeMux) *FilterManager {
-	return &FilterManager{
-		eventMux: mux,
-		filters:  make(map[int]*core.Filter),
-	}
-}
-
-func (self *FilterManager) Start() {
-	go self.filterLoop()
-}
-
-func (self *FilterManager) Stop() {
-	close(self.quit)
-}
-
-func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) {
-	self.filterMu.Lock()
-	defer self.filterMu.Unlock()
-	id = self.filterId
-	self.filters[id] = filter
-	self.filterId++
-
-	return id
-}
-
-func (self *FilterManager) UninstallFilter(id int) {
-	self.filterMu.Lock()
-	defer self.filterMu.Unlock()
-	if _, ok := self.filters[id]; ok {
-		delete(self.filters, id)
-	}
-}
-
-// GetFilter retrieves a filter installed using InstallFilter.
-// The filter may not be modified.
-func (self *FilterManager) GetFilter(id int) *core.Filter {
-	self.filterMu.RLock()
-	defer self.filterMu.RUnlock()
-	return self.filters[id]
-}
-
-func (self *FilterManager) filterLoop() {
-	// Subscribe to events
-	events := self.eventMux.Subscribe(
-		//core.PendingBlockEvent{},
-		core.ChainEvent{},
-		core.TxPreEvent{},
-		state.Logs(nil))
-
-out:
-	for {
-		select {
-		case <-self.quit:
-			break out
-		case event := <-events.Chan():
-			switch event := event.(type) {
-			case core.ChainEvent:
-				self.filterMu.RLock()
-				for _, filter := range self.filters {
-					if filter.BlockCallback != nil {
-						filter.BlockCallback(event.Block, event.Logs)
-					}
-				}
-				self.filterMu.RUnlock()
-
-			case core.TxPreEvent:
-				self.filterMu.RLock()
-				for _, filter := range self.filters {
-					if filter.TransactionCallback != nil {
-						filter.TransactionCallback(event.Tx)
-					}
-				}
-				self.filterMu.RUnlock()
-
-			case state.Logs:
-				self.filterMu.RLock()
-				for _, filter := range self.filters {
-					if filter.LogsCallback != nil {
-						msgs := filter.FilterLogs(event)
-						if len(msgs) > 0 {
-							filter.LogsCallback(msgs)
-						}
-					}
-				}
-				self.filterMu.RUnlock()
-			}
-		}
-	}
-}
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 623b3a963..0494342a3 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -35,7 +35,7 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/eth"
-	"github.com/ethereum/go-ethereum/event/filter"
+	"github.com/ethereum/go-ethereum/eth/filters"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
 	"github.com/ethereum/go-ethereum/miner"
@@ -75,7 +75,7 @@ type XEth struct {
 	whisper *Whisper
 
 	quit          chan struct{}
-	filterManager *filter.FilterManager
+	filterManager *filters.FilterSystem
 
 	logMu    sync.RWMutex
 	logQueue map[int]*logQueue
@@ -111,7 +111,7 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
 		backend:          ethereum,
 		frontend:         frontend,
 		quit:             make(chan struct{}),
-		filterManager:    filter.NewFilterManager(ethereum.EventMux()),
+		filterManager:    filters.NewFilterSystem(ethereum.EventMux()),
 		logQueue:         make(map[int]*logQueue),
 		blockQueue:       make(map[int]*hashQueue),
 		transactionQueue: make(map[int]*hashQueue),
@@ -128,7 +128,6 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
 	xeth.state = NewState(xeth, xeth.backend.ChainManager().State())
 
 	go xeth.start()
-	go xeth.filterManager.Start()
 
 	return xeth
 }
@@ -142,7 +141,7 @@ done:
 			self.logMu.Lock()
 			for id, filter := range self.logQueue {
 				if time.Since(filter.timeout) > filterTickerTime {
-					self.filterManager.UninstallFilter(id)
+					self.filterManager.Remove(id)
 					delete(self.logQueue, id)
 				}
 			}
@@ -151,7 +150,7 @@ done:
 			self.blockMu.Lock()
 			for id, filter := range self.blockQueue {
 				if time.Since(filter.timeout) > filterTickerTime {
-					self.filterManager.UninstallFilter(id)
+					self.filterManager.Remove(id)
 					delete(self.blockQueue, id)
 				}
 			}
@@ -160,7 +159,7 @@ done:
 			self.transactionMu.Lock()
 			for id, filter := range self.transactionQueue {
 				if time.Since(filter.timeout) > filterTickerTime {
-					self.filterManager.UninstallFilter(id)
+					self.filterManager.Remove(id)
 					delete(self.transactionQueue, id)
 				}
 			}
@@ -504,7 +503,7 @@ func (self *XEth) IsContract(address string) bool {
 }
 
 func (self *XEth) UninstallFilter(id int) bool {
-	defer self.filterManager.UninstallFilter(id)
+	defer self.filterManager.Remove(id)
 
 	if _, ok := self.logQueue[id]; ok {
 		self.logMu.Lock()
@@ -532,8 +531,8 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
 	self.logMu.Lock()
 	defer self.logMu.Unlock()
 
-	filter := core.NewFilter(self.backend)
-	id := self.filterManager.InstallFilter(filter)
+	filter := filters.New(self.backend.ChainDb())
+	id := self.filterManager.Add(filter)
 	self.logQueue[id] = &logQueue{timeout: time.Now()}
 
 	filter.SetEarliestBlock(earliest)
@@ -558,8 +557,8 @@ func (self *XEth) NewTransactionFilter() int {
 	self.transactionMu.Lock()
 	defer self.transactionMu.Unlock()
 
-	filter := core.NewFilter(self.backend)
-	id := self.filterManager.InstallFilter(filter)
+	filter := filters.New(self.backend.ChainDb())
+	id := self.filterManager.Add(filter)
 	self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
 
 	filter.TransactionCallback = func(tx *types.Transaction) {
@@ -577,8 +576,8 @@ func (self *XEth) NewBlockFilter() int {
 	self.blockMu.Lock()
 	defer self.blockMu.Unlock()
 
-	filter := core.NewFilter(self.backend)
-	id := self.filterManager.InstallFilter(filter)
+	filter := filters.New(self.backend.ChainDb())
+	id := self.filterManager.Add(filter)
 	self.blockQueue[id] = &hashQueue{timeout: time.Now()}
 
 	filter.BlockCallback = func(block *types.Block, logs state.Logs) {
@@ -635,7 +634,7 @@ func (self *XEth) TransactionFilterChanged(id int) []common.Hash {
 }
 
 func (self *XEth) Logs(id int) state.Logs {
-	filter := self.filterManager.GetFilter(id)
+	filter := self.filterManager.Get(id)
 	if filter != nil {
 		return filter.Find()
 	}
@@ -644,7 +643,7 @@ func (self *XEth) Logs(id int) state.Logs {
 }
 
 func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) state.Logs {
-	filter := core.NewFilter(self.backend)
+	filter := filters.New(self.backend.ChainDb())
 	filter.SetEarliestBlock(earliest)
 	filter.SetLatestBlock(latest)
 	filter.SetSkip(skip)
-- 
GitLab