From 337d1bfb7c80c6f4e91841af638e76a7742dbe32 Mon Sep 17 00:00:00 2001 From: Jaynti Kanani <jdkanani@gmail.com> Date: Fri, 20 Nov 2020 14:25:19 +0530 Subject: [PATCH] new: add roothash and state sync filter apis --- accounts/abi/bind/backends/bor_simulated.go | 11 ++++++ consensus/bor/bor.go | 28 ++++++++++---- core/blockchain.go | 29 ++++++++++++++ core/bor_events.go | 10 +++++ core/types/state_data.go | 6 +-- eth/bor_api_backend.go | 35 +++++++++++++++++ eth/filters/bor_api.go | 43 +++++++++++++++++++++ eth/filters/bor_filter_system.go | 32 +++++++++++++++ eth/filters/filter.go | 2 + eth/filters/filter_system.go | 17 +++++++- eth/filters/filter_system_test.go | 6 +++ ethclient/bor_ethclient.go | 14 +++++++ interfaces.go | 6 +++ internal/ethapi/backend.go | 4 ++ internal/ethapi/bor_api.go | 14 +++++++ les/bor_api_backend.go | 19 +++++++++ light/lightchain.go | 6 +++ 17 files changed, 270 insertions(+), 12 deletions(-) create mode 100644 accounts/abi/bind/backends/bor_simulated.go create mode 100644 core/bor_events.go create mode 100644 eth/bor_api_backend.go create mode 100644 eth/filters/bor_api.go create mode 100644 eth/filters/bor_filter_system.go create mode 100644 ethclient/bor_ethclient.go create mode 100644 internal/ethapi/bor_api.go create mode 100644 les/bor_api_backend.go diff --git a/accounts/abi/bind/backends/bor_simulated.go b/accounts/abi/bind/backends/bor_simulated.go new file mode 100644 index 000000000..3ef162132 --- /dev/null +++ b/accounts/abi/bind/backends/bor_simulated.go @@ -0,0 +1,11 @@ +package backends + +import ( + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" +) + +// SubscribeStateSyncEvent subscribes to state sync events +func (fb *filterBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return fb.bc.SubscribeStateSyncEvent(ch) +} diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 6210a6207..0c3a840a8 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -224,8 +224,7 @@ type Bor struct { HeimdallClient IHeimdallClient WithoutHeimdall bool - stateSyncFeed event.Feed - scope event.SubscriptionScope + scope event.SubscriptionScope // The fields below are for testing only fakeDiff bool // Skip difficulty verifications } @@ -653,6 +652,8 @@ func (c *Bor) Prepare(chain consensus.ChainHeaderReader, header *types.Header) e // Finalize implements consensus.Engine, ensuring no uncles are set, nor block // rewards given. func (c *Bor) Finalize(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) { + stateSyncData := []*types.StateSyncData{} + var err error headerNumber := header.Number.Uint64() if headerNumber%c.config.Sprint == 0 { @@ -665,7 +666,7 @@ func (c *Bor) Finalize(chain consensus.ChainHeaderReader, header *types.Header, if !c.WithoutHeimdall { // commit statees - _, err = c.CommitStates(state, header, cx) + stateSyncData, err = c.CommitStates(state, header, cx) if err != nil { log.Error("Error while committing states", "error", err) return @@ -676,11 +677,17 @@ func (c *Bor) Finalize(chain consensus.ChainHeaderReader, header *types.Header, // No block rewards in PoA, so the state remains as is and uncles are dropped header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) header.UncleHash = types.CalcUncleHash(nil) + + // Set state sync data to blockchain + bc := chain.(*core.BlockChain) + bc.SetStateSync(stateSyncData) } // FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set, // nor block rewards given, and returns the final block. func (c *Bor) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { + stateSyncData := []*types.StateSyncData{} + headerNumber := header.Number.Uint64() if headerNumber%c.config.Sprint == 0 { cx := chainContext{Chain: chain, Bor: c} @@ -694,7 +701,7 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *typ if !c.WithoutHeimdall { // commit states - _, err = c.CommitStates(state, header, cx) + stateSyncData, err = c.CommitStates(state, header, cx) if err != nil { log.Error("Error while committing states", "error", err) return nil, err @@ -708,6 +715,11 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *typ // Assemble block block := types.NewBlock(header, txs, nil, receipts, new(trie.Trie)) + + // set state sync + bc := chain.(*core.BlockChain) + bc.SetStateSync(stateSyncData) + // return the final block for sealing return block, nil } @@ -1090,8 +1102,8 @@ func (c *Bor) CommitStates( state *state.StateDB, header *types.Header, chain chainContext, -) ([]*types.StateData, error) { - stateSyncs := make([]*types.StateData, 0) +) ([]*types.StateSyncData, error) { + stateSyncs := make([]*types.StateSyncData, 0) number := header.Number.Uint64() _lastStateID, err := c.GenesisContractsClient.LastStateId(number - 1) if err != nil { @@ -1116,8 +1128,8 @@ func (c *Bor) CommitStates( break } - stateData := types.StateData{ - Did: eventRecord.ID, + stateData := types.StateSyncData{ + ID: eventRecord.ID, Contract: eventRecord.Contract, Data: hex.EncodeToString(eventRecord.Data), TxHash: eventRecord.TxHash, diff --git a/core/blockchain.go b/core/blockchain.go index 898adf1a7..0396192cd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -211,6 +211,10 @@ type BlockChain struct { shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. writeLegacyJournal bool // Testing flag used to flush the snapshot journal in legacy format. + + // Bor related changes + stateSyncData []*types.StateSyncData // State sync data + stateSyncFeed event.Feed // State sync feed } // NewBlockChain returns a fully initialised block chain using information @@ -1630,6 +1634,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // event here. if emitHeadEvent { bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) + // BOR state sync feed related changes + for _, data := range bc.stateSyncData { + bc.stateSyncFeed.Send(StateSyncEvent{Data: data}) + } + // BOR } } else { bc.chainSideFeed.Send(ChainSideEvent{Block: block}) @@ -1887,6 +1896,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } + // BOR state sync feed related changes + for _, data := range bc.stateSyncData { + bc.stateSyncFeed.Send(StateSyncEvent{Data: data}) + } + // BOR + // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them @@ -2555,3 +2570,17 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +// +// Bor related changes +// + +// SetStateSync set sync data in state_data +func (bc *BlockChain) SetStateSync(stateData []*types.StateSyncData) { + bc.stateSyncData = stateData +} + +// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. +func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription { + return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch)) +} diff --git a/core/bor_events.go b/core/bor_events.go new file mode 100644 index 000000000..d8a5b38d0 --- /dev/null +++ b/core/bor_events.go @@ -0,0 +1,10 @@ +package core + +import ( + "github.com/ethereum/go-ethereum/core/types" +) + +// StateSyncEvent represents state sync events +type StateSyncEvent struct { + Data *types.StateSyncData +} diff --git a/core/types/state_data.go b/core/types/state_data.go index 38174461c..d04f91ec5 100644 --- a/core/types/state_data.go +++ b/core/types/state_data.go @@ -2,9 +2,9 @@ package types import "github.com/ethereum/go-ethereum/common" -// StateData represents state received from Ethereum Blockchain -type StateData struct { - Did uint64 +// StateSyncData represents state received from Ethereum Blockchain +type StateSyncData struct { + ID uint64 Contract common.Address Data string TxHash common.Hash diff --git a/eth/bor_api_backend.go b/eth/bor_api_backend.go new file mode 100644 index 000000000..860b99de5 --- /dev/null +++ b/eth/bor_api_backend.go @@ -0,0 +1,35 @@ +package eth + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/consensus/bor" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" +) + +// GetRootHash returns root hash for given start and end block +func (b *EthAPIBackend) GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) { + var api *bor.API + for _, _api := range b.eth.Engine().APIs(b.eth.BlockChain()) { + if _api.Namespace == "bor" { + api = _api.Service.(*bor.API) + } + } + + if api == nil { + return "", errors.New("Only available in Bor engine") + } + + root, err := api.GetRootHash(starBlockNr, endBlockNr) + if err != nil { + return "", err + } + return root, nil +} + +// SubscribeStateSyncEvent subscribes to state sync event +func (b *EthAPIBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return b.eth.BlockChain().SubscribeStateSyncEvent(ch) +} diff --git a/eth/filters/bor_api.go b/eth/filters/bor_api.go new file mode 100644 index 000000000..925a6feeb --- /dev/null +++ b/eth/filters/bor_api.go @@ -0,0 +1,43 @@ +package filters + +import ( + "bytes" + "context" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +// NewDeposits send a notification each time a new deposit received from bridge. +func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.StateSyncFilter) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + go func() { + stateSyncData := make(chan *types.StateSyncData) + stateSyncSub := api.events.SubscribeNewDeposits(stateSyncData) + + for { + select { + case h := <-stateSyncData: + if crit.ID == h.ID || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 || + (crit.ID == 0 && crit.Contract == common.Address{}) { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + stateSyncSub.Unsubscribe() + return + case <-notifier.Closed(): + stateSyncSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} diff --git a/eth/filters/bor_filter_system.go b/eth/filters/bor_filter_system.go new file mode 100644 index 000000000..c3308de27 --- /dev/null +++ b/eth/filters/bor_filter_system.go @@ -0,0 +1,32 @@ +package filters + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +func (es *EventSystem) handleStateSyncEvent(filters filterIndex, ev core.StateSyncEvent) { + for _, f := range filters[StateSyncSubscription] { + f.stateSyncData <- ev.Data + } +} + +// SubscribeNewDeposits creates a subscription that writes details about the new state sync events (from mainchain to Bor) +func (es *EventSystem) SubscribeNewDeposits(data chan *types.StateSyncData) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: StateSyncSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateSyncData: data, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 17635837a..03bb08bd0 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -45,6 +45,8 @@ type Backend interface { BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + + SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription } // Filter can be used to retrieve and filter logs. diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index a105ec51c..603d0f02a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -52,7 +52,9 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription - // LastSubscription keeps track of the last index + // StateSyncSubscription to listen main chain state + StateSyncSubscription + // LastIndexSubscription keeps track of the last index LastIndexSubscription ) @@ -66,6 +68,8 @@ const ( logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. chainEvChanSize = 10 + // stateEvChanSize is the size of channel listening to StateSyncEvent. + stateEvChanSize = 10 ) type subscription struct { @@ -78,6 +82,8 @@ type subscription struct { headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled + + stateSyncData chan *types.StateSyncData } // EventSystem creates subscriptions, processes events and broadcasts them to the @@ -102,6 +108,10 @@ type EventSystem struct { pendingLogsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event + + // Bor related subscription and channels + stateSyncSub event.Subscription // Subscription for new state event + stateSyncCh chan core.StateSyncEvent // Channel to receive deposit state change event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -121,6 +131,7 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem { rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), pendingLogsCh: make(chan []*types.Log, logsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), + stateSyncCh: make(chan core.StateSyncEvent, stateEvChanSize), } // Subscribe events @@ -129,6 +140,7 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem { m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) + m.stateSyncSub = m.backend.SubscribeStateSyncEvent(m.stateSyncCh) // Make sure none of the subscriptions are empty if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { @@ -448,6 +460,7 @@ func (es *EventSystem) eventLoop() { es.rmLogsSub.Unsubscribe() es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() + es.stateSyncSub.Unsubscribe() }() index := make(filterIndex) @@ -467,6 +480,8 @@ func (es *EventSystem) eventLoop() { es.handlePendingLogs(index, ev) case ev := <-es.chainCh: es.handleChainEvent(index, ev) + case ev := <-es.stateSyncCh: + es.handleStateSyncEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index c8d1d43ab..204ff9ffa 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -47,6 +47,8 @@ type testBackend struct { rmLogsFeed event.Feed pendingLogsFeed event.Feed chainFeed event.Feed + + stateSyncFeed event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -152,6 +154,10 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func (b *testBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return b.stateSyncFeed.Subscribe(ch) +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) diff --git a/ethclient/bor_ethclient.go b/ethclient/bor_ethclient.go new file mode 100644 index 000000000..f2758cc52 --- /dev/null +++ b/ethclient/bor_ethclient.go @@ -0,0 +1,14 @@ +package ethclient + +import ( + "context" +) + +// GetRootHash returns the merkle root of the block headers +func (ec *Client) GetRootHash(ctx context.Context, startBlockNumber uint64, endBlockNumber uint64) (string, error) { + var rootHash string + if err := ec.c.CallContext(ctx, &rootHash, "eth_getRootHash", startBlockNumber, endBlockNumber); err != nil { + return "", err + } + return rootHash, nil +} diff --git a/interfaces.go b/interfaces.go index 1ff31f96b..bbae0b3be 100644 --- a/interfaces.go +++ b/interfaces.go @@ -209,3 +209,9 @@ type GasEstimator interface { type PendingStateEventer interface { SubscribePendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (Subscription, error) } + +// StateSyncFilter state sync filter +type StateSyncFilter struct { + ID uint64 + Contract common.Address +} diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 10e716bf2..10251ace5 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -86,6 +86,10 @@ type Backend interface { SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + // Bor related APIs + SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription + GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) + ChainConfig() *params.ChainConfig Engine() consensus.Engine } diff --git a/internal/ethapi/bor_api.go b/internal/ethapi/bor_api.go new file mode 100644 index 000000000..6958c0c51 --- /dev/null +++ b/internal/ethapi/bor_api.go @@ -0,0 +1,14 @@ +package ethapi + +import ( + "context" +) + +// GetRootHash returns root hash for given start and end block +func (s *PublicBlockChainAPI) GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) { + root, err := s.b.GetRootHash(ctx, starBlockNr, endBlockNr) + if err != nil { + return "", err + } + return root, nil +} diff --git a/les/bor_api_backend.go b/les/bor_api_backend.go new file mode 100644 index 000000000..248246810 --- /dev/null +++ b/les/bor_api_backend.go @@ -0,0 +1,19 @@ +package les + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" +) + +// GetRootHash returns root hash for given start and end block +func (b *LesApiBackend) GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) { + return "", errors.New("Not implemented") +} + +// SubscribeStateSyncEvent subscribe state sync event +func (b *LesApiBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return b.eth.blockchain.SubscribeStateSyncEvent(ch) +} diff --git a/light/lightchain.go b/light/lightchain.go index 6fc321ae0..e6f962828 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -580,3 +580,9 @@ func (lc *LightChain) DisableCheckFreq() { func (lc *LightChain) EnableCheckFreq() { atomic.StoreInt32(&lc.disableCheckFreq, 0) } + +// SubscribeStateSyncEvent implements the interface of filters.Backend +// LightChain does not send core.NewStateChangeSyncEvent, so return an empty subscription. +func (lc *LightChain) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return lc.scope.Track(new(event.Feed).Subscribe(ch)) +} -- GitLab