From b97ea0e447c24c0a85f63a7714a2eb221a7faccd Mon Sep 17 00:00:00 2001
From: obscuren <geffobscura@gmail.com>
Date: Wed, 20 Aug 2014 09:59:09 +0200
Subject: [PATCH] Added JSFilter type

---
 ethpipe/js_pipe.go | 86 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 83 insertions(+), 3 deletions(-)

diff --git a/ethpipe/js_pipe.go b/ethpipe/js_pipe.go
index a2b1a4551..0d0928fc3 100644
--- a/ethpipe/js_pipe.go
+++ b/ethpipe/js_pipe.go
@@ -2,10 +2,13 @@ package ethpipe
 
 import (
 	"encoding/json"
+	"fmt"
 	"sync/atomic"
 
 	"github.com/ethereum/eth-go/ethchain"
 	"github.com/ethereum/eth-go/ethcrypto"
+	"github.com/ethereum/eth-go/ethreact"
+	"github.com/ethereum/eth-go/ethstate"
 	"github.com/ethereum/eth-go/ethutil"
 )
 
@@ -74,7 +77,8 @@ func (self *JSPipe) NumberToHuman(balance string) string {
 }
 
 func (self *JSPipe) StorageAt(addr, storageAddr string) string {
-	return self.World().SafeGet(ethutil.Hex2Bytes(addr)).Storage(ethutil.Hex2Bytes(storageAddr)).Str()
+	storage := self.World().SafeGet(ethutil.Hex2Bytes(addr)).Storage(ethutil.Hex2Bytes(storageAddr))
+	return storage.BigInt().String()
 }
 
 func (self *JSPipe) TxCountAt(address string) int {
@@ -186,10 +190,45 @@ func (self *JSPipe) CompileMutan(code string) string {
 	return ethutil.Bytes2Hex(data)
 }
 
+func (self *JSPipe) Watch(object map[string]interface{}) *JSFilter {
+	return NewJSFilterFromMap(object, self.Pipe.obj)
+	/*} else if str, ok := object.(string); ok {
+	println("str")
+	return NewJSFilterFromString(str, self.Pipe.obj)
+	*/
+}
+
 func (self *JSPipe) Messages(object map[string]interface{}) string {
-	filter := ethchain.NewFilterFromMap(object, self.obj)
+	filter := self.Watch(object)
+
+	defer filter.Uninstall()
+
+	return filter.Messages()
+
+}
+
+type JSFilter struct {
+	eth ethchain.EthManager
+	*ethchain.Filter
+	quit chan bool
+
+	BlockCallback   func(*ethchain.Block)
+	MessageCallback func(ethstate.Messages)
+}
+
+func NewJSFilterFromMap(object map[string]interface{}, eth ethchain.EthManager) *JSFilter {
+	filter := &JSFilter{eth, ethchain.NewFilterFromMap(object, eth), make(chan bool), nil, nil}
+
+	go filter.mainLoop()
+
+	return filter
+}
 
-	messages := filter.Find()
+func NewJSFilterFromString(str string, eth ethchain.EthManager) *JSFilter {
+	return nil
+}
+
+func (self *JSFilter) MessagesToJson(messages ethstate.Messages) string {
 	var msgs []JSMessage
 	for _, m := range messages {
 		msgs = append(msgs, NewJSMessage(m))
@@ -202,3 +241,44 @@ func (self *JSPipe) Messages(object map[string]interface{}) string {
 
 	return string(b)
 }
+
+func (self *JSFilter) Messages() string {
+	return self.MessagesToJson(self.Find())
+}
+
+func (self *JSFilter) mainLoop() {
+	blockChan := make(chan ethreact.Event, 1)
+	messageChan := make(chan ethreact.Event, 1)
+	// Subscribe to events
+	reactor := self.eth.Reactor()
+	reactor.Subscribe("newBlock", blockChan)
+	reactor.Subscribe("messages", messageChan)
+out:
+	for {
+		select {
+		case <-self.quit:
+			break out
+		case block := <-blockChan:
+			if block, ok := block.Resource.(*ethchain.Block); ok {
+				if self.BlockCallback != nil {
+					self.BlockCallback(block)
+				}
+			}
+		case msg := <-messageChan:
+			if messages, ok := msg.Resource.(ethstate.Messages); ok {
+				if self.MessageCallback != nil {
+					msgs := self.FilterMessages(messages)
+					self.MessageCallback(msgs)
+				}
+			}
+		}
+	}
+}
+
+func (self *JSFilter) Changed(object interface{}) {
+	fmt.Printf("%T\n", object)
+}
+
+func (self *JSFilter) Uninstall() {
+	self.quit <- true
+}
-- 
GitLab