diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 406989b6cb672719af65cdee4d540eb0fac6ae71..85f24f5b78b709f996735ce4b2ddef24a6234ec9 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -30,6 +30,7 @@ import ( "github.com/maticnetwork/bor/core/vm" "github.com/maticnetwork/bor/crypto" "github.com/maticnetwork/bor/ethdb" + "github.com/maticnetwork/bor/event" "github.com/maticnetwork/bor/internal/ethapi" "github.com/maticnetwork/bor/log" "github.com/maticnetwork/bor/params" @@ -245,6 +246,8 @@ type Bor struct { stateReceiverABI abi.ABI HeimdallClient IHeimdallClient + stateDataFeed event.Feed + scope event.SubscriptionScope // The fields below are for testing only fakeDiff bool // Skip difficulty verifications } @@ -1158,6 +1161,7 @@ func (c *Bor) CommitStates( header *types.Header, chain core.ChainContext, ) error { + fmt.Println("comminting state") // get pending state proposals stateIds, err := c.GetPendingStateProposals(header.Number.Uint64() - 1) if err != nil { @@ -1201,6 +1205,16 @@ func (c *Bor) CommitStates( "txHash", eventRecord.TxHash, "chainID", eventRecord.ChainID, ) + stateData := types.StateData{ + Did: eventRecord.ID, + Contract: eventRecord.Contract, + Data: hex.EncodeToString(eventRecord.Data), + TxHash: eventRecord.TxHash, + } + + go func() { + c.stateDataFeed.Send(core.NewStateChangeEvent{StateData: &stateData}) + }() recordBytes, err := rlp.EncodeToBytes(eventRecord) if err != nil { @@ -1226,6 +1240,12 @@ func (c *Bor) CommitStates( return nil } + +// SubscribeStateEvent registers a subscription of ChainSideEvent. +func (c *Bor) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + return c.scope.Track(c.stateDataFeed.Subscribe(ch)) +} + func (c *Bor) SetHeimdallClient(h IHeimdallClient) { c.HeimdallClient = h } diff --git a/core/events.go b/core/events.go index ba12738fa1695cef401b7b5921e26f9eab2a8a10..71a23100393bbc75559d152529dee8bac6f7f399 100644 --- a/core/events.go +++ b/core/events.go @@ -41,6 +41,10 @@ type ChainEvent struct { Logs []*types.Log } +type NewStateChangeEvent struct { + StateData *types.StateData +} + type ChainSideEvent struct { Block *types.Block } diff --git a/core/types/transaction.go b/core/types/transaction.go index b19901c8ec2af9e23c9a86856ad79608648fc4ff..f44c3ece369f6dfc91ea842cfce0a628c3d3be21 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -60,6 +60,14 @@ type txdata struct { Hash *common.Hash `json:"hash" rlp:"-"` } +// State represents state received from Ethereum Blockchain +type StateData struct { + Did uint64 + Contract common.Address + Data string + TxHash common.Hash +} + type txdataMarshaling struct { AccountNonce hexutil.Uint64 Price *hexutil.Big @@ -71,6 +79,10 @@ type txdataMarshaling struct { S *hexutil.Big } +func (sd *StateData) StateData() *StateData { + return sd +} + func NewTransaction(nonce uint64, to common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction { return newTransaction(nonce, &to, amount, gasLimit, gasPrice, data) } diff --git a/eth/api_backend.go b/eth/api_backend.go index dbb57207d6b7a264154e83aac951e00ad679ab10..ff03d1a91e0537368b4c8ad9f10c07e916ef819a 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -24,6 +24,7 @@ import ( "github.com/maticnetwork/bor/accounts" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/math" + "github.com/maticnetwork/bor/consensus/bor" "github.com/maticnetwork/bor/core" "github.com/maticnetwork/bor/core/bloombits" "github.com/maticnetwork/bor/core/rawdb" @@ -155,6 +156,11 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e return b.eth.BlockChain().SubscribeChainSideEvent(ch) } +func (b *EthAPIBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + engine := b.eth.Engine() + return engine.(*bor.Bor).SubscribeStateEvent(ch) +} + func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return b.eth.BlockChain().SubscribeLogsEvent(ch) } diff --git a/eth/filters/api.go b/eth/filters/api.go index addbece2aa5152b4fb5dfeead0948a8a4d8b0f05..294be1cec3801512a6964514b55108faffd26a20 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -215,7 +215,6 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er go func() { headers := make(chan *types.Header) headersSub := api.events.SubscribeNewHeads(headers) - for { select { case h := <-headers: @@ -233,6 +232,38 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er return rpcSub, nil } +// NewDeposits send a notification each time a new deposit received from bridge. +func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.FilterState) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + go func() { + stateData := make(chan *types.StateData) + stateDataSub := api.events.SubscribeNewDeposits(stateData) + + for { + select { + case h := <-stateData: + if crit.Did == h.Did || crit.Contract == h.Contract || + (crit.Did == 0 && crit.Contract == common.Address{}) { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + stateDataSub.Unsubscribe() + return + case <-notifier.Closed(): + stateDataSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // Logs creates a subscription that fires for all new log that match the given filter criteria. func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index a32a902bd0a5ce3063328382c91fa5ff83c98735..547040eb35758bf46df56a52c51b8409807974b6 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -40,6 +40,7 @@ type Backend interface { SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index fd947eb0dce1546f53cc7c34212e7610a953d096..35efa4c8e7b93560c8b4a247b2fd99ed11e51ca1 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -53,6 +53,10 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + + //StateSubscription to listen main chain state + StateSubscription + // LastSubscription keeps track of the last index LastIndexSubscription ) @@ -68,6 +72,8 @@ const ( logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. chainEvChanSize = 10 + // stateEvChanSize is the size of channel listening to ChainEvent. + stateEvChanSize = 10 ) var ( @@ -82,6 +88,7 @@ type subscription struct { logs chan []*types.Log hashes chan []common.Hash headers chan *types.Header + stateData chan *types.StateData installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled } @@ -99,15 +106,18 @@ type EventSystem struct { logsSub event.Subscription // Subscription for new log event rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event + stateSub event.Subscription // Subscription for new state change event pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event + stateCh chan core.NewStateChangeEvent // Channel to receive deposit state change event + } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -127,6 +137,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS logsCh: make(chan []*types.Log, logsChanSize), rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), + stateCh: make(chan core.NewStateChangeEvent, stateEvChanSize), } // Subscribe events @@ -134,12 +145,13 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + m.stateSub = m.backend.SubscribeStateEvent(m.stateCh) // TODO(rjl493456442): use feed to subscribe pending log event m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) // Make sure none of the subscriptions are empty if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || - m.pendingLogSub.Closed() { + m.stateSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") } @@ -292,6 +304,24 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti logs: make(chan []*types.Log), hashes: make(chan []common.Hash), headers: headers, + stateData: make(chan *types.StateData), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewHeads creates a subscription that writes the header of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewDeposits(stateData chan *types.StateData) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: StateSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateData: stateData, installed: make(chan struct{}), err: make(chan error), } @@ -355,6 +385,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { for _, f := range filters[PendingTransactionsSubscription] { f.hashes <- hashes } + case core.NewStateChangeEvent: + for _, f := range filters[StateSubscription] { + f.stateData <- e.StateData.StateData() + } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { f.headers <- e.Block.Header() @@ -471,6 +505,8 @@ func (es *EventSystem) eventLoop() { es.broadcast(index, ev) case ev := <-es.chainCh: es.broadcast(index, ev) + case ev := <-es.stateCh: + es.broadcast(index, ev) case ev, active := <-es.pendingLogSub.Chan(): if !active { // system stopped return diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index ad8acaafa2d64709282c9fb1f4c0da71df9b6361..ea55e9e7c7eb19501a871538505714d4432f1439 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -24,7 +24,7 @@ import ( "fmt" "math/big" - "github.com/maticnetwork/bor" + ethereum "github.com/maticnetwork/bor" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/hexutil" "github.com/maticnetwork/bor/core/types" @@ -324,6 +324,12 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) return ec.c.EthSubscribe(ctx, ch, "newHeads") } +// SubscribeNewHead subscribes to notifications about the current blockchain head +// on the given channel. +func (ec *Client) SubscribeNewDeposit(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newDeposits") +} + // State Access // NetworkID returns the network ID (also known as the chain ID) for this chain. diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index c9364e2d8c43321fe1fa645a29a34afbfde20f2e..504a77a9706a01e1107fca90480b853cf14389eb 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -25,7 +25,7 @@ import ( "testing" "time" - "github.com/maticnetwork/bor" + bor "github.com/maticnetwork/bor" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/consensus/ethash" "github.com/maticnetwork/bor/core" @@ -39,17 +39,17 @@ import ( // Verify that Client implements the ethereum interfaces. var ( - _ = ethereum.ChainReader(&Client{}) - _ = ethereum.TransactionReader(&Client{}) - _ = ethereum.ChainStateReader(&Client{}) - _ = ethereum.ChainSyncReader(&Client{}) - _ = ethereum.ContractCaller(&Client{}) - _ = ethereum.GasEstimator(&Client{}) - _ = ethereum.GasPricer(&Client{}) - _ = ethereum.LogFilterer(&Client{}) - _ = ethereum.PendingStateReader(&Client{}) - // _ = ethereum.PendingStateEventer(&Client{}) - _ = ethereum.PendingContractCaller(&Client{}) + _ = bor.ChainReader(&Client{}) + _ = bor.TransactionReader(&Client{}) + _ = bor.ChainStateReader(&Client{}) + _ = bor.ChainSyncReader(&Client{}) + _ = bor.ContractCaller(&Client{}) + _ = bor.GasEstimator(&Client{}) + _ = bor.GasPricer(&Client{}) + _ = bor.LogFilterer(&Client{}) + _ = bor.PendingStateReader(&Client{}) + // _ = bor.PendingStateEventer(&Client{}) + _ = bor.PendingContractCaller(&Client{}) ) func TestToFilterArg(t *testing.T) { @@ -63,13 +63,13 @@ func TestToFilterArg(t *testing.T) { for _, testCase := range []struct { name string - input ethereum.FilterQuery + input bor.FilterQuery output interface{} err error }{ { "without BlockHash", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), @@ -85,7 +85,7 @@ func TestToFilterArg(t *testing.T) { }, { "with nil fromBlock and nil toBlock", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, Topics: [][]common.Hash{}, }, @@ -99,7 +99,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, Topics: [][]common.Hash{}, @@ -113,7 +113,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash and from block", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, FromBlock: big.NewInt(1), @@ -124,7 +124,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash and to block", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, ToBlock: big.NewInt(1), @@ -135,7 +135,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash and both from / to block", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, FromBlock: big.NewInt(1), diff --git a/interfaces.go b/interfaces.go index 3aaed80b88075ef69062f1c441b6ba012b580019..4c8ce56d9dfbc4a3135bbfda2aed46d8c4b962f3 100644 --- a/interfaces.go +++ b/interfaces.go @@ -150,6 +150,11 @@ type FilterQuery struct { Topics [][]common.Hash } +type FilterState struct { + Did uint64 + Contract common.Address +} + // LogFilterer provides access to contract log events using a one-off query or continuous // event subscription. // diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 986912b163862c84e6ecae18a0e85011d3ecf118..e0deeab81fdb42e2c907b223fe1469d085b52866 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -59,6 +59,7 @@ type Backend interface { GetTd(blockHash common.Hash) *big.Int GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription diff --git a/les/api_backend.go b/les/api_backend.go index 8333fdd8c37fca8669cda92723a0921fd9b4cbea..5c8ec82d8812f6c828364e2d4a44334ebb163371 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -24,6 +24,7 @@ import ( "github.com/maticnetwork/bor/accounts" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/math" + "github.com/maticnetwork/bor/consensus/bor" "github.com/maticnetwork/bor/core" "github.com/maticnetwork/bor/core/bloombits" "github.com/maticnetwork/bor/core/rawdb" @@ -164,6 +165,11 @@ func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e return b.eth.blockchain.SubscribeChainSideEvent(ch) } +func (b *LesApiBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + engine := b.eth.Engine() + return engine.(*bor.Bor).SubscribeStateEvent(ch) +} + func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return b.eth.blockchain.SubscribeLogsEvent(ch) } diff --git a/les/backend.go b/les/backend.go index 4481bd676ebfb18f3ff545dc619a1ca7deeed7e6..fc995dd58fc469237cf7dadd840e5343ca842f8f 100644 --- a/les/backend.go +++ b/les/backend.go @@ -28,6 +28,7 @@ import ( "github.com/maticnetwork/bor/common/hexutil" "github.com/maticnetwork/bor/common/mclock" "github.com/maticnetwork/bor/consensus" + "github.com/maticnetwork/bor/consensus/bor" "github.com/maticnetwork/bor/core" "github.com/maticnetwork/bor/core/bloombits" "github.com/maticnetwork/bor/core/rawdb" @@ -59,6 +60,7 @@ type LightEthereum struct { peers *peerSet txPool *light.TxPool blockchain *light.LightChain + bor *bor.Bor serverPool *serverPool reqDist *requestDistributor retriever *retrieveManager