diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 58c2c4a44d95d06e167e80ced9a36ea1a1ef3349..a161fb0c9501bafa97c6e2743986e62e726d70c8 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -510,6 +510,9 @@ func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEve func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return fb.bc.SubscribeLogsEvent(ch) } +func (fb *filterBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + return fb.bc.SubscribeStateEvent(ch) +} func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 } func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) { diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 406989b6cb672719af65cdee4d540eb0fac6ae71..bd55a745e7c9530ecb116275309975f84722bd44 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -30,6 +30,7 @@ import ( "github.com/maticnetwork/bor/core/vm" "github.com/maticnetwork/bor/crypto" "github.com/maticnetwork/bor/ethdb" + "github.com/maticnetwork/bor/event" "github.com/maticnetwork/bor/internal/ethapi" "github.com/maticnetwork/bor/log" "github.com/maticnetwork/bor/params" @@ -245,6 +246,8 @@ type Bor struct { stateReceiverABI abi.ABI HeimdallClient IHeimdallClient + stateDataFeed event.Feed + scope event.SubscriptionScope // The fields below are for testing only fakeDiff bool // Skip difficulty verifications } @@ -1201,6 +1204,16 @@ func (c *Bor) CommitStates( "txHash", eventRecord.TxHash, "chainID", eventRecord.ChainID, ) + stateData := types.StateData{ + Did: eventRecord.ID, + Contract: eventRecord.Contract, + Data: hex.EncodeToString(eventRecord.Data), + TxHash: eventRecord.TxHash, + } + + go func() { + c.stateDataFeed.Send(core.NewStateChangeEvent{StateData: &stateData}) + }() recordBytes, err := rlp.EncodeToBytes(eventRecord) if err != nil { @@ -1226,6 +1239,11 @@ func (c *Bor) CommitStates( return nil } +// SubscribeStateEvent registers a subscription of ChainSideEvent. +func (c *Bor) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + return c.scope.Track(c.stateDataFeed.Subscribe(ch)) +} + func (c *Bor) SetHeimdallClient(h IHeimdallClient) { c.HeimdallClient = h } diff --git a/core/blockchain.go b/core/blockchain.go index 34bbd400efde468d81aba58b97bde525d1a8e019..e90ffbb30eac7c53fb032e225ccc53b1f3b040b3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -27,6 +27,7 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/golang-lru" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/mclock" "github.com/maticnetwork/bor/common/prque" @@ -42,7 +43,6 @@ import ( "github.com/maticnetwork/bor/params" "github.com/maticnetwork/bor/rlp" "github.com/maticnetwork/bor/trie" - "github.com/hashicorp/golang-lru" ) var ( @@ -142,6 +142,7 @@ type BlockChain struct { chainHeadFeed event.Feed logsFeed event.Feed blockProcFeed event.Feed + stateDataFeed event.Feed scope event.SubscriptionScope genesisBlock *types.Block @@ -2177,6 +2178,11 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return bc.scope.Track(bc.logsFeed.Subscribe(ch)) } +// SubscribeStateEvent registers a subscription of ChainSideEvent. +func (bc *BlockChain) SubscribeStateEvent(ch chan<- NewStateChangeEvent) event.Subscription { + return bc.scope.Track(bc.stateDataFeed.Subscribe(ch)) +} + // SubscribeBlockProcessingEvent registers a subscription of bool where true means // block processing has started while false means it has stopped. func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { diff --git a/core/events.go b/core/events.go index ba12738fa1695cef401b7b5921e26f9eab2a8a10..71a23100393bbc75559d152529dee8bac6f7f399 100644 --- a/core/events.go +++ b/core/events.go @@ -41,6 +41,10 @@ type ChainEvent struct { Logs []*types.Log } +type NewStateChangeEvent struct { + StateData *types.StateData +} + type ChainSideEvent struct { Block *types.Block } diff --git a/core/types/transaction.go b/core/types/transaction.go index b19901c8ec2af9e23c9a86856ad79608648fc4ff..1481229b5ca2a56c8f8f108aba080886e97416c5 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -60,6 +60,14 @@ type txdata struct { Hash *common.Hash `json:"hash" rlp:"-"` } +// State represents state received from Ethereum Blockchain +type StateData struct { + Did uint64 + Contract common.Address + Data string + TxHash common.Hash +} + type txdataMarshaling struct { AccountNonce hexutil.Uint64 Price *hexutil.Big diff --git a/eth/api_backend.go b/eth/api_backend.go index dbb57207d6b7a264154e83aac951e00ad679ab10..ff03d1a91e0537368b4c8ad9f10c07e916ef819a 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -24,6 +24,7 @@ import ( "github.com/maticnetwork/bor/accounts" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/math" + "github.com/maticnetwork/bor/consensus/bor" "github.com/maticnetwork/bor/core" "github.com/maticnetwork/bor/core/bloombits" "github.com/maticnetwork/bor/core/rawdb" @@ -155,6 +156,11 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e return b.eth.BlockChain().SubscribeChainSideEvent(ch) } +func (b *EthAPIBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + engine := b.eth.Engine() + return engine.(*bor.Bor).SubscribeStateEvent(ch) +} + func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return b.eth.BlockChain().SubscribeLogsEvent(ch) } diff --git a/eth/filters/api.go b/eth/filters/api.go index addbece2aa5152b4fb5dfeead0948a8a4d8b0f05..d7d382d9e8a8ab416f0d26ea9bf294288b091555 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -17,6 +17,7 @@ package filters import ( + "bytes" "context" "encoding/json" "errors" @@ -215,7 +216,6 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er go func() { headers := make(chan *types.Header) headersSub := api.events.SubscribeNewHeads(headers) - for { select { case h := <-headers: @@ -233,6 +233,38 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er return rpcSub, nil } +// NewDeposits send a notification each time a new deposit received from bridge. +func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.FilterState) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + go func() { + stateData := make(chan *types.StateData) + stateDataSub := api.events.SubscribeNewDeposits(stateData) + + for { + select { + case h := <-stateData: + if crit.Did == h.Did || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 || + (crit.Did == 0 && crit.Contract == common.Address{}) { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + stateDataSub.Unsubscribe() + return + case <-notifier.Closed(): + stateDataSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // Logs creates a subscription that fires for all new log that match the given filter criteria. func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index a32a902bd0a5ce3063328382c91fa5ff83c98735..547040eb35758bf46df56a52c51b8409807974b6 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -40,6 +40,7 @@ type Backend interface { SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index fd947eb0dce1546f53cc7c34212e7610a953d096..deff4faea078a82eef7dd087018d195a9779b8d8 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -53,6 +53,10 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + + //StateSubscription to listen main chain state + StateSubscription + // LastSubscription keeps track of the last index LastIndexSubscription ) @@ -68,6 +72,8 @@ const ( logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. chainEvChanSize = 10 + // stateEvChanSize is the size of channel listening to ChainEvent. + stateEvChanSize = 10 ) var ( @@ -82,6 +88,7 @@ type subscription struct { logs chan []*types.Log hashes chan []common.Hash headers chan *types.Header + stateData chan *types.StateData installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled } @@ -99,15 +106,18 @@ type EventSystem struct { logsSub event.Subscription // Subscription for new log event rmLogsSub event.Subscription // Subscription for removed log event chainSub event.Subscription // Subscription for new chain event + stateSub event.Subscription // Subscription for new state change event pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event + stateCh chan core.NewStateChangeEvent // Channel to receive deposit state change event + } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -127,6 +137,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS logsCh: make(chan []*types.Log, logsChanSize), rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), + stateCh: make(chan core.NewStateChangeEvent, stateEvChanSize), } // Subscribe events @@ -134,12 +145,13 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + m.stateSub = m.backend.SubscribeStateEvent(m.stateCh) // TODO(rjl493456442): use feed to subscribe pending log event m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) // Make sure none of the subscriptions are empty if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || - m.pendingLogSub.Closed() { + m.stateSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") } @@ -292,6 +304,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti logs: make(chan []*types.Log), hashes: make(chan []common.Hash), headers: headers, + stateData: make(chan *types.StateData), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewDeposits creates a subscription that writes details about the new state sync events (from mainchain to Bor) +func (es *EventSystem) SubscribeNewDeposits(stateData chan *types.StateData) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: StateSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateData: stateData, installed: make(chan struct{}), err: make(chan error), } @@ -355,6 +384,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { for _, f := range filters[PendingTransactionsSubscription] { f.hashes <- hashes } + case core.NewStateChangeEvent: + for _, f := range filters[StateSubscription] { + f.stateData <- e.StateData + } case core.ChainEvent: for _, f := range filters[BlocksSubscription] { f.headers <- e.Block.Header() @@ -471,6 +504,8 @@ func (es *EventSystem) eventLoop() { es.broadcast(index, ev) case ev := <-es.chainCh: es.broadcast(index, ev) + case ev := <-es.stateCh: + es.broadcast(index, ev) case ev, active := <-es.pendingLogSub.Chan(): if !active { // system stopped return diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index ad8acaafa2d64709282c9fb1f4c0da71df9b6361..5d349a289b86f8875b4e59d4b38cdf22778a2f6f 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -24,7 +24,7 @@ import ( "fmt" "math/big" - "github.com/maticnetwork/bor" + ethereum "github.com/maticnetwork/bor" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/hexutil" "github.com/maticnetwork/bor/core/types" @@ -324,6 +324,11 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) return ec.c.EthSubscribe(ctx, ch, "newHeads") } +// SubscribeNewDeposit subscribes to new state sync events +func (ec *Client) SubscribeNewDeposit(ctx context.Context, ch chan<- *types.StateData) (ethereum.Subscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newDeposits", nil) +} + // State Access // NetworkID returns the network ID (also known as the chain ID) for this chain. diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index c9364e2d8c43321fe1fa645a29a34afbfde20f2e..504a77a9706a01e1107fca90480b853cf14389eb 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -25,7 +25,7 @@ import ( "testing" "time" - "github.com/maticnetwork/bor" + bor "github.com/maticnetwork/bor" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/consensus/ethash" "github.com/maticnetwork/bor/core" @@ -39,17 +39,17 @@ import ( // Verify that Client implements the ethereum interfaces. var ( - _ = ethereum.ChainReader(&Client{}) - _ = ethereum.TransactionReader(&Client{}) - _ = ethereum.ChainStateReader(&Client{}) - _ = ethereum.ChainSyncReader(&Client{}) - _ = ethereum.ContractCaller(&Client{}) - _ = ethereum.GasEstimator(&Client{}) - _ = ethereum.GasPricer(&Client{}) - _ = ethereum.LogFilterer(&Client{}) - _ = ethereum.PendingStateReader(&Client{}) - // _ = ethereum.PendingStateEventer(&Client{}) - _ = ethereum.PendingContractCaller(&Client{}) + _ = bor.ChainReader(&Client{}) + _ = bor.TransactionReader(&Client{}) + _ = bor.ChainStateReader(&Client{}) + _ = bor.ChainSyncReader(&Client{}) + _ = bor.ContractCaller(&Client{}) + _ = bor.GasEstimator(&Client{}) + _ = bor.GasPricer(&Client{}) + _ = bor.LogFilterer(&Client{}) + _ = bor.PendingStateReader(&Client{}) + // _ = bor.PendingStateEventer(&Client{}) + _ = bor.PendingContractCaller(&Client{}) ) func TestToFilterArg(t *testing.T) { @@ -63,13 +63,13 @@ func TestToFilterArg(t *testing.T) { for _, testCase := range []struct { name string - input ethereum.FilterQuery + input bor.FilterQuery output interface{} err error }{ { "without BlockHash", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), @@ -85,7 +85,7 @@ func TestToFilterArg(t *testing.T) { }, { "with nil fromBlock and nil toBlock", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, Topics: [][]common.Hash{}, }, @@ -99,7 +99,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, Topics: [][]common.Hash{}, @@ -113,7 +113,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash and from block", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, FromBlock: big.NewInt(1), @@ -124,7 +124,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash and to block", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, ToBlock: big.NewInt(1), @@ -135,7 +135,7 @@ func TestToFilterArg(t *testing.T) { }, { "with blockhash and both from / to block", - ethereum.FilterQuery{ + bor.FilterQuery{ Addresses: addresses, BlockHash: &blockHash, FromBlock: big.NewInt(1), diff --git a/interfaces.go b/interfaces.go index 3aaed80b88075ef69062f1c441b6ba012b580019..4c8ce56d9dfbc4a3135bbfda2aed46d8c4b962f3 100644 --- a/interfaces.go +++ b/interfaces.go @@ -150,6 +150,11 @@ type FilterQuery struct { Topics [][]common.Hash } +type FilterState struct { + Did uint64 + Contract common.Address +} + // LogFilterer provides access to contract log events using a one-off query or continuous // event subscription. // diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 986912b163862c84e6ecae18a0e85011d3ecf118..e0deeab81fdb42e2c907b223fe1469d085b52866 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -59,6 +59,7 @@ type Backend interface { GetTd(blockHash common.Hash) *big.Int GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription diff --git a/les/api_backend.go b/les/api_backend.go index 8333fdd8c37fca8669cda92723a0921fd9b4cbea..5c8ec82d8812f6c828364e2d4a44334ebb163371 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -24,6 +24,7 @@ import ( "github.com/maticnetwork/bor/accounts" "github.com/maticnetwork/bor/common" "github.com/maticnetwork/bor/common/math" + "github.com/maticnetwork/bor/consensus/bor" "github.com/maticnetwork/bor/core" "github.com/maticnetwork/bor/core/bloombits" "github.com/maticnetwork/bor/core/rawdb" @@ -164,6 +165,11 @@ func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e return b.eth.blockchain.SubscribeChainSideEvent(ch) } +func (b *LesApiBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription { + engine := b.eth.Engine() + return engine.(*bor.Bor).SubscribeStateEvent(ch) +} + func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return b.eth.blockchain.SubscribeLogsEvent(ch) } diff --git a/les/backend.go b/les/backend.go index 4481bd676ebfb18f3ff545dc619a1ca7deeed7e6..fc995dd58fc469237cf7dadd840e5343ca842f8f 100644 --- a/les/backend.go +++ b/les/backend.go @@ -28,6 +28,7 @@ import ( "github.com/maticnetwork/bor/common/hexutil" "github.com/maticnetwork/bor/common/mclock" "github.com/maticnetwork/bor/consensus" + "github.com/maticnetwork/bor/consensus/bor" "github.com/maticnetwork/bor/core" "github.com/maticnetwork/bor/core/bloombits" "github.com/maticnetwork/bor/core/rawdb" @@ -59,6 +60,7 @@ type LightEthereum struct { peers *peerSet txPool *light.TxPool blockchain *light.LightChain + bor *bor.Bor serverPool *serverPool reqDist *requestDistributor retriever *retrieveManager