From c64852dbccd0c8eb57cab994aefd0243c65b351b Mon Sep 17 00:00:00 2001
From: obscuren <geffobscura@gmail.com>
Date: Thu, 5 Feb 2015 11:55:03 -0800
Subject: [PATCH] pending / chain event

---
 cmd/mist/assets/html/home.html | 10 +++++++++-
 core/block_processor.go        |  8 ++++++++
 core/filter.go                 |  5 +++--
 event/filter/old_filter.go     | 11 ++++++++++-
 rpc/message.go                 | 16 +++++++++++++++-
 rpc/packages.go                | 26 ++++++++++++++++++++++++++
 6 files changed, 71 insertions(+), 5 deletions(-)

diff --git a/cmd/mist/assets/html/home.html b/cmd/mist/assets/html/home.html
index 7116f5dde..531327c68 100644
--- a/cmd/mist/assets/html/home.html
+++ b/cmd/mist/assets/html/home.html
@@ -60,7 +60,7 @@
     var web3 = require('web3');
     var eth = web3.eth;
 
-    web3.setProvider(new web3.providers.HttpSyncProvider('http://localhost:8080'));
+    web3.setProvider(new web3.providers.HttpSyncProvider('http://localhost:8545'));
 
     document.querySelector("#number").innerHTML = eth.number;
     document.querySelector("#coinbase").innerHTML = eth.coinbase
@@ -69,6 +69,14 @@
     document.querySelector("#gas_price").innerHTML = eth.gasPrice;
     document.querySelector("#mining").innerHTML = eth.mining;
     document.querySelector("#listening").innerHTML = eth.listening;
+
+    eth.watch('pending').changed(function() {
+        console.log("pending changed");
+    });
+    eth.watch('chain').changed(function() {
+        console.log("chain changed");
+    });
+
 </script>
 
 </html>
diff --git a/core/block_processor.go b/core/block_processor.go
index d59d7feca..349de85e0 100644
--- a/core/block_processor.go
+++ b/core/block_processor.go
@@ -19,6 +19,10 @@ import (
 	"gopkg.in/fatih/set.v0"
 )
 
+type PendingBlockEvent struct {
+	Block *types.Block
+}
+
 var statelogger = logger.NewLogger("BLOCK")
 
 type EthManager interface {
@@ -154,6 +158,10 @@ done:
 	block.Reward = cumulativeSum
 	block.Header().GasUsed = totalUsedGas
 
+	if transientProcess {
+		go self.eventMux.Post(PendingBlockEvent{block})
+	}
+
 	return receipts, handled, unhandled, erroneous, err
 }
 
diff --git a/core/filter.go b/core/filter.go
index c22996d7e..24d1f5a4a 100644
--- a/core/filter.go
+++ b/core/filter.go
@@ -33,8 +33,9 @@ type Filter struct {
 	max      int
 	topics   [][]byte
 
-	BlockCallback func(*types.Block)
-	LogsCallback  func(state.Logs)
+	BlockCallback   func(*types.Block)
+	PendingCallback func(*types.Block)
+	LogsCallback    func(state.Logs)
 }
 
 // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go
index 4c01572db..ab0127ffb 100644
--- a/event/filter/old_filter.go
+++ b/event/filter/old_filter.go
@@ -59,7 +59,7 @@ func (self *FilterManager) GetFilter(id int) *core.Filter {
 
 func (self *FilterManager) filterLoop() {
 	// Subscribe to events
-	events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Logs(nil))
+	events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil))
 
 out:
 	for {
@@ -77,6 +77,15 @@ out:
 				}
 				self.filterMu.RUnlock()
 
+			case core.PendingBlockEvent:
+				self.filterMu.RLock()
+				for _, filter := range self.filters {
+					if filter.PendingCallback != nil {
+						filter.PendingCallback(event.Block)
+					}
+				}
+				self.filterMu.RUnlock()
+
 			case state.Logs:
 				self.filterMu.RLock()
 				for _, filter := range self.filters {
diff --git a/rpc/message.go b/rpc/message.go
index e110bdf3e..78dc6e2ff 100644
--- a/rpc/message.go
+++ b/rpc/message.go
@@ -205,7 +205,6 @@ func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) {
 	if len(req.Params) < 1 {
 		return nil, NewErrorResponse(ErrorArguments)
 	}
-	fmt.Println("FILTER PARAMS", string(req.Params[0]))
 
 	args := new(FilterOptions)
 	r := bytes.NewReader(req.Params[0])
@@ -217,6 +216,21 @@ func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) {
 	return args, nil
 }
 
+func (req *RpcRequest) ToFilterStringArgs() (string, error) {
+	if len(req.Params) < 1 {
+		return "", NewErrorResponse(ErrorArguments)
+	}
+
+	var args string
+	err := json.Unmarshal(req.Params[0], &args)
+	if err != nil {
+		return "", NewErrorResponse(ErrorDecodeArgs)
+	}
+
+	rpclogger.DebugDetailf("%T %v", args, args)
+	return args, nil
+}
+
 func (req *RpcRequest) ToFilterChangedArgs() (int, error) {
 	if len(req.Params) < 1 {
 		return 0, NewErrorResponse(ErrorArguments)
diff --git a/rpc/packages.go b/rpc/packages.go
index 047bbda9a..ac3127356 100644
--- a/rpc/packages.go
+++ b/rpc/packages.go
@@ -32,6 +32,7 @@ import (
 	"sync"
 
 	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/ethutil"
@@ -88,6 +89,25 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro
 	return nil
 }
 
+func (self *EthereumApi) NewFilterString(args string, reply *interface{}) error {
+	var id int
+	filter := core.NewFilter(self.xeth.Backend())
+
+	callback := func(block *types.Block) {
+		self.logs[id] = append(self.logs[id], &state.StateLog{})
+	}
+	if args == "pending" {
+		filter.PendingCallback = callback
+	} else if args == "chain" {
+		filter.BlockCallback = callback
+	}
+
+	id = self.filterManager.InstallFilter(filter)
+	*reply = id
+
+	return nil
+}
+
 func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
 	self.logMut.RLock()
 	defer self.logMut.RUnlock()
@@ -389,6 +409,12 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
 			return err
 		}
 		return p.NewFilter(args, reply)
+	case "eth_newFilterString":
+		args, err := req.ToFilterStringArgs()
+		if err != nil {
+			return err
+		}
+		return p.NewFilterString(args, reply)
 	case "eth_changed":
 		args, err := req.ToFilterChangedArgs()
 		if err != nil {
-- 
GitLab