good morning!!!!

Skip to content
Snippets Groups Projects
Commit 2fb57b2e authored by Jeffrey Wilcke's avatar Jeffrey Wilcke
Browse files

Reworked filters

parent 96cf6fc1
Branches
Tags
No related merge requests found
...@@ -23,6 +23,9 @@ type Filter struct { ...@@ -23,6 +23,9 @@ type Filter struct {
max int max int
altered []data altered []data
BlockCallback func(*Block)
MessageCallback func(ethstate.Messages)
} }
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire" "github.com/ethereum/eth-go/ethwire"
) )
...@@ -87,6 +88,8 @@ type Ethereum struct { ...@@ -87,6 +88,8 @@ type Ethereum struct {
clientIdentity ethwire.ClientIdentity clientIdentity ethwire.ClientIdentity
isUpToDate bool isUpToDate bool
filters map[int]*ethchain.Filter
} }
func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) { func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
...@@ -116,6 +119,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager ...@@ -116,6 +119,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
keyManager: keyManager, keyManager: keyManager,
clientIdentity: clientIdentity, clientIdentity: clientIdentity,
isUpToDate: true, isUpToDate: true,
filters: make(map[int]*ethchain.Filter),
} }
ethereum.reactor = ethreact.New() ethereum.reactor = ethreact.New()
...@@ -386,6 +390,7 @@ func (s *Ethereum) Start(seed bool) { ...@@ -386,6 +390,7 @@ func (s *Ethereum) Start(seed bool) {
// Start the reaping processes // Start the reaping processes
go s.ReapDeadPeerHandler() go s.ReapDeadPeerHandler()
go s.update() go s.update()
go s.filterLoop()
if seed { if seed {
s.Seed() s.Seed()
...@@ -536,6 +541,60 @@ out: ...@@ -536,6 +541,60 @@ out:
} }
} }
var filterId = 0
func (self *Ethereum) InstallFilter(object map[string]interface{}) (*ethchain.Filter, int) {
defer func() { filterId++ }()
filter := ethchain.NewFilterFromMap(object, self)
self.filters[filterId] = filter
return filter, filterId
}
func (self *Ethereum) UninstallFilter(id int) {
delete(self.filters, id)
}
func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
return self.filters[id]
}
func (self *Ethereum) filterLoop() {
blockChan := make(chan ethreact.Event, 5)
messageChan := make(chan ethreact.Event, 5)
// Subscribe to events
reactor := self.Reactor()
reactor.Subscribe("newBlock", blockChan)
reactor.Subscribe("messages", messageChan)
out:
for {
select {
case <-self.quit:
break out
case block := <-blockChan:
if block, ok := block.Resource.(*ethchain.Block); ok {
for _, filter := range self.filters {
if filter.BlockCallback != nil {
filter.BlockCallback(block)
}
}
}
case msg := <-messageChan:
if messages, ok := msg.Resource.(ethstate.Messages); ok {
for _, filter := range self.filters {
if filter.MessageCallback != nil {
msgs := filter.FilterMessages(messages)
if len(msgs) > 0 {
filter.MessageCallback(msgs)
}
}
}
}
}
}
}
func bootstrapDb(db ethutil.Database) { func bootstrapDb(db ethutil.Database) {
d, _ := db.Get([]byte("ProtocolVersion")) d, _ := db.Get([]byte("ProtocolVersion"))
protov := ethutil.NewValue(d).Uint() protov := ethutil.NewValue(d).Uint()
......
...@@ -3,12 +3,10 @@ package ethpipe ...@@ -3,12 +3,10 @@ package ethpipe
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"sync/atomic" "sync/atomic"
"github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
) )
...@@ -234,102 +232,11 @@ func (self *JSPipe) CompileMutan(code string) string { ...@@ -234,102 +232,11 @@ func (self *JSPipe) CompileMutan(code string) string {
return ethutil.Bytes2Hex(data) return ethutil.Bytes2Hex(data)
} }
func (self *JSPipe) Watch(object map[string]interface{}) *JSFilter { func ToJSMessages(messages ethstate.Messages) *ethutil.List {
return NewJSFilterFromMap(object, self.Pipe.obj)
/*} else if str, ok := object.(string); ok {
println("str")
return NewJSFilterFromString(str, self.Pipe.obj)
*/
}
func (self *JSPipe) Messages(object map[string]interface{}) string {
filter := self.Watch(object)
filter.Uninstall()
return filter.Messages()
}
type JSFilter struct {
eth ethchain.EthManager
*ethchain.Filter
quit chan bool
BlockCallback func(*ethchain.Block)
MessageCallback func(ethstate.Messages)
}
func NewJSFilterFromMap(object map[string]interface{}, eth ethchain.EthManager) *JSFilter {
filter := &JSFilter{eth, ethchain.NewFilterFromMap(object, eth), make(chan bool), nil, nil}
go filter.mainLoop()
return filter
}
func NewJSFilterFromString(str string, eth ethchain.EthManager) *JSFilter {
return nil
}
func (self *JSFilter) MessagesToJson(messages ethstate.Messages) string {
var msgs []JSMessage var msgs []JSMessage
for _, m := range messages { for _, m := range messages {
msgs = append(msgs, NewJSMessage(m)) msgs = append(msgs, NewJSMessage(m))
} }
// Return an empty array instead of "null" return ethutil.NewList(msgs)
if len(msgs) == 0 {
return "[]"
}
b, err := json.Marshal(msgs)
if err != nil {
return "{\"error\":" + err.Error() + "}"
}
return string(b)
}
func (self *JSFilter) Messages() string {
return self.MessagesToJson(self.Find())
}
func (self *JSFilter) mainLoop() {
blockChan := make(chan ethreact.Event, 5)
messageChan := make(chan ethreact.Event, 5)
// Subscribe to events
reactor := self.eth.Reactor()
reactor.Subscribe("newBlock", blockChan)
reactor.Subscribe("messages", messageChan)
out:
for {
select {
case <-self.quit:
break out
case block := <-blockChan:
if block, ok := block.Resource.(*ethchain.Block); ok {
if self.BlockCallback != nil {
self.BlockCallback(block)
}
}
case msg := <-messageChan:
if messages, ok := msg.Resource.(ethstate.Messages); ok {
if self.MessageCallback != nil {
println("messages!")
msgs := self.FilterMessages(messages)
if len(msgs) > 0 {
self.MessageCallback(msgs)
}
}
}
}
}
}
func (self *JSFilter) Changed(object interface{}) {
fmt.Printf("%T\n", object)
}
func (self *JSFilter) Uninstall() {
self.quit <- true
} }
...@@ -20,6 +20,10 @@ func NewList(t interface{}) *List { ...@@ -20,6 +20,10 @@ func NewList(t interface{}) *List {
return &List{list, list.Len()} return &List{list, list.Len()}
} }
func EmptyList() *List {
return NewList([]interface{}{})
}
// Get N element from the embedded slice. Returns nil if OOB. // Get N element from the embedded slice. Returns nil if OOB.
func (self *List) Get(i int) interface{} { func (self *List) Get(i int) interface{} {
if self.list.Len() > i { if self.list.Len() > i {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment