From 260243cf884444b87b823a2b644bb3340f9dadd8 Mon Sep 17 00:00:00 2001
From: Sayli Patil <ptsayli@gmail.com>
Date: Mon, 6 Apr 2020 09:52:21 +0530
Subject: [PATCH] MAT-865:  Add subscribe deposit event filter (#21)

* add eth_depositById rpc method

* add sample depositById test

* change ethereum to bor

* add subscription for new deposit data

* create channel to listen new state change

* push state data to subcribed channel

* apply filter on deposit events sub

* remove unused methods

* Fix: no filter

* Remove unused method

* revert changes

* Fix: RPC port

* refactor and cleanup

* Fix: SubscribeStateEvent

* remove unused import

* Resolve comments

Co-authored-by: Arpit Agarwal <93arpit@gmail.com>
---
 consensus/bor/bor.go         | 20 +++++++++++++++
 core/events.go               |  4 +++
 core/types/transaction.go    | 12 +++++++++
 eth/api_backend.go           |  6 +++++
 eth/filters/api.go           | 33 +++++++++++++++++++++++-
 eth/filters/filter.go        |  1 +
 eth/filters/filter_system.go | 50 +++++++++++++++++++++++++++++++-----
 ethclient/ethclient.go       |  8 +++++-
 ethclient/ethclient_test.go  | 38 +++++++++++++--------------
 interfaces.go                |  5 ++++
 internal/ethapi/backend.go   |  1 +
 les/api_backend.go           |  6 +++++
 les/backend.go               |  2 ++
 13 files changed, 158 insertions(+), 28 deletions(-)

diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go
index 406989b6c..85f24f5b7 100644
--- a/consensus/bor/bor.go
+++ b/consensus/bor/bor.go
@@ -30,6 +30,7 @@ import (
 	"github.com/maticnetwork/bor/core/vm"
 	"github.com/maticnetwork/bor/crypto"
 	"github.com/maticnetwork/bor/ethdb"
+	"github.com/maticnetwork/bor/event"
 	"github.com/maticnetwork/bor/internal/ethapi"
 	"github.com/maticnetwork/bor/log"
 	"github.com/maticnetwork/bor/params"
@@ -245,6 +246,8 @@ type Bor struct {
 	stateReceiverABI abi.ABI
 	HeimdallClient   IHeimdallClient
 
+	stateDataFeed event.Feed
+	scope         event.SubscriptionScope
 	// The fields below are for testing only
 	fakeDiff bool // Skip difficulty verifications
 }
@@ -1158,6 +1161,7 @@ func (c *Bor) CommitStates(
 	header *types.Header,
 	chain core.ChainContext,
 ) error {
+	fmt.Println("comminting state")
 	// get pending state proposals
 	stateIds, err := c.GetPendingStateProposals(header.Number.Uint64() - 1)
 	if err != nil {
@@ -1201,6 +1205,16 @@ func (c *Bor) CommitStates(
 			"txHash", eventRecord.TxHash,
 			"chainID", eventRecord.ChainID,
 		)
+		stateData := types.StateData{
+			Did:      eventRecord.ID,
+			Contract: eventRecord.Contract,
+			Data:     hex.EncodeToString(eventRecord.Data),
+			TxHash:   eventRecord.TxHash,
+		}
+
+		go func() {
+			c.stateDataFeed.Send(core.NewStateChangeEvent{StateData: &stateData})
+		}()
 
 		recordBytes, err := rlp.EncodeToBytes(eventRecord)
 		if err != nil {
@@ -1226,6 +1240,12 @@ func (c *Bor) CommitStates(
 	return nil
 }
 
+
+// SubscribeStateEvent registers a subscription of ChainSideEvent.
+func (c *Bor) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
+	return c.scope.Track(c.stateDataFeed.Subscribe(ch))
+}
+
 func (c *Bor) SetHeimdallClient(h IHeimdallClient) {
 	c.HeimdallClient = h
 }
diff --git a/core/events.go b/core/events.go
index ba12738fa..71a231003 100644
--- a/core/events.go
+++ b/core/events.go
@@ -41,6 +41,10 @@ type ChainEvent struct {
 	Logs  []*types.Log
 }
 
+type NewStateChangeEvent struct {
+	StateData *types.StateData
+}
+
 type ChainSideEvent struct {
 	Block *types.Block
 }
diff --git a/core/types/transaction.go b/core/types/transaction.go
index b19901c8e..f44c3ece3 100644
--- a/core/types/transaction.go
+++ b/core/types/transaction.go
@@ -60,6 +60,14 @@ type txdata struct {
 	Hash *common.Hash `json:"hash" rlp:"-"`
 }
 
+// State represents state received from Ethereum Blockchain
+type StateData struct {
+	Did      uint64
+	Contract common.Address
+	Data     string
+	TxHash   common.Hash
+}
+
 type txdataMarshaling struct {
 	AccountNonce hexutil.Uint64
 	Price        *hexutil.Big
@@ -71,6 +79,10 @@ type txdataMarshaling struct {
 	S            *hexutil.Big
 }
 
+func (sd *StateData) StateData() *StateData {
+	return sd
+}
+
 func NewTransaction(nonce uint64, to common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction {
 	return newTransaction(nonce, &to, amount, gasLimit, gasPrice, data)
 }
diff --git a/eth/api_backend.go b/eth/api_backend.go
index dbb57207d..ff03d1a91 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -24,6 +24,7 @@ import (
 	"github.com/maticnetwork/bor/accounts"
 	"github.com/maticnetwork/bor/common"
 	"github.com/maticnetwork/bor/common/math"
+	"github.com/maticnetwork/bor/consensus/bor"
 	"github.com/maticnetwork/bor/core"
 	"github.com/maticnetwork/bor/core/bloombits"
 	"github.com/maticnetwork/bor/core/rawdb"
@@ -155,6 +156,11 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e
 	return b.eth.BlockChain().SubscribeChainSideEvent(ch)
 }
 
+func (b *EthAPIBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
+	engine := b.eth.Engine()
+	return engine.(*bor.Bor).SubscribeStateEvent(ch)
+}
+
 func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
 	return b.eth.BlockChain().SubscribeLogsEvent(ch)
 }
diff --git a/eth/filters/api.go b/eth/filters/api.go
index addbece2a..294be1cec 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -215,7 +215,6 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
 	go func() {
 		headers := make(chan *types.Header)
 		headersSub := api.events.SubscribeNewHeads(headers)
-
 		for {
 			select {
 			case h := <-headers:
@@ -233,6 +232,38 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
 	return rpcSub, nil
 }
 
+// NewDeposits send a notification each time a new deposit received from bridge.
+func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.FilterState) (*rpc.Subscription, error) {
+	notifier, supported := rpc.NotifierFromContext(ctx)
+	if !supported {
+		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+	}
+
+	rpcSub := notifier.CreateSubscription()
+	go func() {
+		stateData := make(chan *types.StateData)
+		stateDataSub := api.events.SubscribeNewDeposits(stateData)
+
+		for {
+			select {
+			case h := <-stateData:
+				if crit.Did == h.Did || crit.Contract == h.Contract ||
+					(crit.Did == 0 && crit.Contract == common.Address{}) {
+					notifier.Notify(rpcSub.ID, h)
+				}
+			case <-rpcSub.Err():
+				stateDataSub.Unsubscribe()
+				return
+			case <-notifier.Closed():
+				stateDataSub.Unsubscribe()
+				return
+			}
+		}
+	}()
+
+	return rpcSub, nil
+}
+
 // Logs creates a subscription that fires for all new log that match the given filter criteria.
 func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
 	notifier, supported := rpc.NotifierFromContext(ctx)
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index a32a902bd..547040eb3 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -40,6 +40,7 @@ type Backend interface {
 
 	SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
 	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+	SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription
 	SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
 	SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
 
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index fd947eb0d..35efa4c8e 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -53,6 +53,10 @@ const (
 	PendingTransactionsSubscription
 	// BlocksSubscription queries hashes for blocks that are imported
 	BlocksSubscription
+
+	//StateSubscription to listen main chain state
+	StateSubscription
+
 	// LastSubscription keeps track of the last index
 	LastIndexSubscription
 )
@@ -68,6 +72,8 @@ const (
 	logsChanSize = 10
 	// chainEvChanSize is the size of channel listening to ChainEvent.
 	chainEvChanSize = 10
+	// stateEvChanSize is the size of channel listening to ChainEvent.
+	stateEvChanSize = 10
 )
 
 var (
@@ -82,6 +88,7 @@ type subscription struct {
 	logs      chan []*types.Log
 	hashes    chan []common.Hash
 	headers   chan *types.Header
+	stateData chan *types.StateData
 	installed chan struct{} // closed when the filter is installed
 	err       chan error    // closed when the filter is uninstalled
 }
@@ -99,15 +106,18 @@ type EventSystem struct {
 	logsSub       event.Subscription         // Subscription for new log event
 	rmLogsSub     event.Subscription         // Subscription for removed log event
 	chainSub      event.Subscription         // Subscription for new chain event
+	stateSub      event.Subscription         // Subscription for new state change event
 	pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
 
 	// Channels
-	install   chan *subscription         // install filter for event notification
-	uninstall chan *subscription         // remove filter for event notification
-	txsCh     chan core.NewTxsEvent      // Channel to receive new transactions event
-	logsCh    chan []*types.Log          // Channel to receive new log event
-	rmLogsCh  chan core.RemovedLogsEvent // Channel to receive removed log event
-	chainCh   chan core.ChainEvent       // Channel to receive new chain event
+	install   chan *subscription            // install filter for event notification
+	uninstall chan *subscription            // remove filter for event notification
+	txsCh     chan core.NewTxsEvent         // Channel to receive new transactions event
+	logsCh    chan []*types.Log             // Channel to receive new log event
+	rmLogsCh  chan core.RemovedLogsEvent    // Channel to receive removed log event
+	chainCh   chan core.ChainEvent          // Channel to receive new chain event
+	stateCh   chan core.NewStateChangeEvent // Channel to receive deposit state change event
+
 }
 
 // NewEventSystem creates a new manager that listens for event on the given mux,
@@ -127,6 +137,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
 		logsCh:    make(chan []*types.Log, logsChanSize),
 		rmLogsCh:  make(chan core.RemovedLogsEvent, rmLogsChanSize),
 		chainCh:   make(chan core.ChainEvent, chainEvChanSize),
+		stateCh:   make(chan core.NewStateChangeEvent, stateEvChanSize),
 	}
 
 	// Subscribe events
@@ -134,12 +145,13 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
 	m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
 	m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
 	m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
+	m.stateSub = m.backend.SubscribeStateEvent(m.stateCh)
 	// TODO(rjl493456442): use feed to subscribe pending log event
 	m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
 
 	// Make sure none of the subscriptions are empty
 	if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
-		m.pendingLogSub.Closed() {
+		m.stateSub == nil || m.pendingLogSub.Closed() {
 		log.Crit("Subscribe for event system failed")
 	}
 
@@ -292,6 +304,24 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
 		logs:      make(chan []*types.Log),
 		hashes:    make(chan []common.Hash),
 		headers:   headers,
+		stateData: make(chan *types.StateData),
+		installed: make(chan struct{}),
+		err:       make(chan error),
+	}
+	return es.subscribe(sub)
+}
+
+// SubscribeNewHeads creates a subscription that writes the header of a block that is
+// imported in the chain.
+func (es *EventSystem) SubscribeNewDeposits(stateData chan *types.StateData) *Subscription {
+	sub := &subscription{
+		id:        rpc.NewID(),
+		typ:       StateSubscription,
+		created:   time.Now(),
+		logs:      make(chan []*types.Log),
+		hashes:    make(chan []common.Hash),
+		headers:   make(chan *types.Header),
+		stateData: stateData,
 		installed: make(chan struct{}),
 		err:       make(chan error),
 	}
@@ -355,6 +385,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
 		for _, f := range filters[PendingTransactionsSubscription] {
 			f.hashes <- hashes
 		}
+	case core.NewStateChangeEvent:
+		for _, f := range filters[StateSubscription] {
+			f.stateData <- e.StateData.StateData()
+		}
 	case core.ChainEvent:
 		for _, f := range filters[BlocksSubscription] {
 			f.headers <- e.Block.Header()
@@ -471,6 +505,8 @@ func (es *EventSystem) eventLoop() {
 			es.broadcast(index, ev)
 		case ev := <-es.chainCh:
 			es.broadcast(index, ev)
+		case ev := <-es.stateCh:
+			es.broadcast(index, ev)
 		case ev, active := <-es.pendingLogSub.Chan():
 			if !active { // system stopped
 				return
diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go
index ad8acaafa..ea55e9e7c 100644
--- a/ethclient/ethclient.go
+++ b/ethclient/ethclient.go
@@ -24,7 +24,7 @@ import (
 	"fmt"
 	"math/big"
 
-	"github.com/maticnetwork/bor"
+	ethereum "github.com/maticnetwork/bor"
 	"github.com/maticnetwork/bor/common"
 	"github.com/maticnetwork/bor/common/hexutil"
 	"github.com/maticnetwork/bor/core/types"
@@ -324,6 +324,12 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header)
 	return ec.c.EthSubscribe(ctx, ch, "newHeads")
 }
 
+// SubscribeNewHead subscribes to notifications about the current blockchain head
+// on the given channel.
+func (ec *Client) SubscribeNewDeposit(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
+	return ec.c.EthSubscribe(ctx, ch, "newDeposits")
+}
+
 // State Access
 
 // NetworkID returns the network ID (also known as the chain ID) for this chain.
diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go
index c9364e2d8..504a77a97 100644
--- a/ethclient/ethclient_test.go
+++ b/ethclient/ethclient_test.go
@@ -25,7 +25,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/maticnetwork/bor"
+	bor "github.com/maticnetwork/bor"
 	"github.com/maticnetwork/bor/common"
 	"github.com/maticnetwork/bor/consensus/ethash"
 	"github.com/maticnetwork/bor/core"
@@ -39,17 +39,17 @@ import (
 
 // Verify that Client implements the ethereum interfaces.
 var (
-	_ = ethereum.ChainReader(&Client{})
-	_ = ethereum.TransactionReader(&Client{})
-	_ = ethereum.ChainStateReader(&Client{})
-	_ = ethereum.ChainSyncReader(&Client{})
-	_ = ethereum.ContractCaller(&Client{})
-	_ = ethereum.GasEstimator(&Client{})
-	_ = ethereum.GasPricer(&Client{})
-	_ = ethereum.LogFilterer(&Client{})
-	_ = ethereum.PendingStateReader(&Client{})
-	// _ = ethereum.PendingStateEventer(&Client{})
-	_ = ethereum.PendingContractCaller(&Client{})
+	_ = bor.ChainReader(&Client{})
+	_ = bor.TransactionReader(&Client{})
+	_ = bor.ChainStateReader(&Client{})
+	_ = bor.ChainSyncReader(&Client{})
+	_ = bor.ContractCaller(&Client{})
+	_ = bor.GasEstimator(&Client{})
+	_ = bor.GasPricer(&Client{})
+	_ = bor.LogFilterer(&Client{})
+	_ = bor.PendingStateReader(&Client{})
+	// _ = bor.PendingStateEventer(&Client{})
+	_ = bor.PendingContractCaller(&Client{})
 )
 
 func TestToFilterArg(t *testing.T) {
@@ -63,13 +63,13 @@ func TestToFilterArg(t *testing.T) {
 
 	for _, testCase := range []struct {
 		name   string
-		input  ethereum.FilterQuery
+		input  bor.FilterQuery
 		output interface{}
 		err    error
 	}{
 		{
 			"without BlockHash",
-			ethereum.FilterQuery{
+			bor.FilterQuery{
 				Addresses: addresses,
 				FromBlock: big.NewInt(1),
 				ToBlock:   big.NewInt(2),
@@ -85,7 +85,7 @@ func TestToFilterArg(t *testing.T) {
 		},
 		{
 			"with nil fromBlock and nil toBlock",
-			ethereum.FilterQuery{
+			bor.FilterQuery{
 				Addresses: addresses,
 				Topics:    [][]common.Hash{},
 			},
@@ -99,7 +99,7 @@ func TestToFilterArg(t *testing.T) {
 		},
 		{
 			"with blockhash",
-			ethereum.FilterQuery{
+			bor.FilterQuery{
 				Addresses: addresses,
 				BlockHash: &blockHash,
 				Topics:    [][]common.Hash{},
@@ -113,7 +113,7 @@ func TestToFilterArg(t *testing.T) {
 		},
 		{
 			"with blockhash and from block",
-			ethereum.FilterQuery{
+			bor.FilterQuery{
 				Addresses: addresses,
 				BlockHash: &blockHash,
 				FromBlock: big.NewInt(1),
@@ -124,7 +124,7 @@ func TestToFilterArg(t *testing.T) {
 		},
 		{
 			"with blockhash and to block",
-			ethereum.FilterQuery{
+			bor.FilterQuery{
 				Addresses: addresses,
 				BlockHash: &blockHash,
 				ToBlock:   big.NewInt(1),
@@ -135,7 +135,7 @@ func TestToFilterArg(t *testing.T) {
 		},
 		{
 			"with blockhash and both from / to block",
-			ethereum.FilterQuery{
+			bor.FilterQuery{
 				Addresses: addresses,
 				BlockHash: &blockHash,
 				FromBlock: big.NewInt(1),
diff --git a/interfaces.go b/interfaces.go
index 3aaed80b8..4c8ce56d9 100644
--- a/interfaces.go
+++ b/interfaces.go
@@ -150,6 +150,11 @@ type FilterQuery struct {
 	Topics [][]common.Hash
 }
 
+type FilterState struct {
+	Did      uint64
+	Contract common.Address
+}
+
 // LogFilterer provides access to contract log events using a one-off query or continuous
 // event subscription.
 //
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index 986912b16..e0deeab81 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -59,6 +59,7 @@ type Backend interface {
 	GetTd(blockHash common.Hash) *big.Int
 	GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error)
 	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+	SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription
 	SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
 	SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
 
diff --git a/les/api_backend.go b/les/api_backend.go
index 8333fdd8c..5c8ec82d8 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -24,6 +24,7 @@ import (
 	"github.com/maticnetwork/bor/accounts"
 	"github.com/maticnetwork/bor/common"
 	"github.com/maticnetwork/bor/common/math"
+	"github.com/maticnetwork/bor/consensus/bor"
 	"github.com/maticnetwork/bor/core"
 	"github.com/maticnetwork/bor/core/bloombits"
 	"github.com/maticnetwork/bor/core/rawdb"
@@ -164,6 +165,11 @@ func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e
 	return b.eth.blockchain.SubscribeChainSideEvent(ch)
 }
 
+func (b *LesApiBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
+	engine := b.eth.Engine()
+	return engine.(*bor.Bor).SubscribeStateEvent(ch)
+}
+
 func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
 	return b.eth.blockchain.SubscribeLogsEvent(ch)
 }
diff --git a/les/backend.go b/les/backend.go
index 4481bd676..fc995dd58 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -28,6 +28,7 @@ import (
 	"github.com/maticnetwork/bor/common/hexutil"
 	"github.com/maticnetwork/bor/common/mclock"
 	"github.com/maticnetwork/bor/consensus"
+	"github.com/maticnetwork/bor/consensus/bor"
 	"github.com/maticnetwork/bor/core"
 	"github.com/maticnetwork/bor/core/bloombits"
 	"github.com/maticnetwork/bor/core/rawdb"
@@ -59,6 +60,7 @@ type LightEthereum struct {
 	peers      *peerSet
 	txPool     *light.TxPool
 	blockchain *light.LightChain
+	bor        *bor.Bor
 	serverPool *serverPool
 	reqDist    *requestDistributor
 	retriever  *retrieveManager
-- 
GitLab