diff --git a/accounts/abi/bind/backends/bor_simulated.go b/accounts/abi/bind/backends/bor_simulated.go new file mode 100644 index 0000000000000000000000000000000000000000..3ef16213296d21f15abd963a8782a3f8cb3fda08 --- /dev/null +++ b/accounts/abi/bind/backends/bor_simulated.go @@ -0,0 +1,11 @@ +package backends + +import ( + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" +) + +// SubscribeStateSyncEvent subscribes to state sync events +func (fb *filterBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return fb.bc.SubscribeStateSyncEvent(ch) +} diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 6210a6207e3cec9ef81f5dce49d9f0245dea207c..0c3a840a8bc36c186713db03c1aa88933498aafd 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -224,8 +224,7 @@ type Bor struct { HeimdallClient IHeimdallClient WithoutHeimdall bool - stateSyncFeed event.Feed - scope event.SubscriptionScope + scope event.SubscriptionScope // The fields below are for testing only fakeDiff bool // Skip difficulty verifications } @@ -653,6 +652,8 @@ func (c *Bor) Prepare(chain consensus.ChainHeaderReader, header *types.Header) e // Finalize implements consensus.Engine, ensuring no uncles are set, nor block // rewards given. func (c *Bor) Finalize(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) { + stateSyncData := []*types.StateSyncData{} + var err error headerNumber := header.Number.Uint64() if headerNumber%c.config.Sprint == 0 { @@ -665,7 +666,7 @@ func (c *Bor) Finalize(chain consensus.ChainHeaderReader, header *types.Header, if !c.WithoutHeimdall { // commit statees - _, err = c.CommitStates(state, header, cx) + stateSyncData, err = c.CommitStates(state, header, cx) if err != nil { log.Error("Error while committing states", "error", err) return @@ -676,11 +677,17 @@ func (c *Bor) Finalize(chain consensus.ChainHeaderReader, header *types.Header, // No block rewards in PoA, so the state remains as is and uncles are dropped header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) header.UncleHash = types.CalcUncleHash(nil) + + // Set state sync data to blockchain + bc := chain.(*core.BlockChain) + bc.SetStateSync(stateSyncData) } // FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set, // nor block rewards given, and returns the final block. func (c *Bor) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { + stateSyncData := []*types.StateSyncData{} + headerNumber := header.Number.Uint64() if headerNumber%c.config.Sprint == 0 { cx := chainContext{Chain: chain, Bor: c} @@ -694,7 +701,7 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *typ if !c.WithoutHeimdall { // commit states - _, err = c.CommitStates(state, header, cx) + stateSyncData, err = c.CommitStates(state, header, cx) if err != nil { log.Error("Error while committing states", "error", err) return nil, err @@ -708,6 +715,11 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *typ // Assemble block block := types.NewBlock(header, txs, nil, receipts, new(trie.Trie)) + + // set state sync + bc := chain.(*core.BlockChain) + bc.SetStateSync(stateSyncData) + // return the final block for sealing return block, nil } @@ -1090,8 +1102,8 @@ func (c *Bor) CommitStates( state *state.StateDB, header *types.Header, chain chainContext, -) ([]*types.StateData, error) { - stateSyncs := make([]*types.StateData, 0) +) ([]*types.StateSyncData, error) { + stateSyncs := make([]*types.StateSyncData, 0) number := header.Number.Uint64() _lastStateID, err := c.GenesisContractsClient.LastStateId(number - 1) if err != nil { @@ -1116,8 +1128,8 @@ func (c *Bor) CommitStates( break } - stateData := types.StateData{ - Did: eventRecord.ID, + stateData := types.StateSyncData{ + ID: eventRecord.ID, Contract: eventRecord.Contract, Data: hex.EncodeToString(eventRecord.Data), TxHash: eventRecord.TxHash, diff --git a/core/blockchain.go b/core/blockchain.go index 898adf1a7e72af7036a0890b3e253260920433b9..0396192cd9a6607388b8135a46f98cd338c11481 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -211,6 +211,10 @@ type BlockChain struct { shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. writeLegacyJournal bool // Testing flag used to flush the snapshot journal in legacy format. + + // Bor related changes + stateSyncData []*types.StateSyncData // State sync data + stateSyncFeed event.Feed // State sync feed } // NewBlockChain returns a fully initialised block chain using information @@ -1630,6 +1634,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // event here. if emitHeadEvent { bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) + // BOR state sync feed related changes + for _, data := range bc.stateSyncData { + bc.stateSyncFeed.Send(StateSyncEvent{Data: data}) + } + // BOR } } else { bc.chainSideFeed.Send(ChainSideEvent{Block: block}) @@ -1887,6 +1896,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } + // BOR state sync feed related changes + for _, data := range bc.stateSyncData { + bc.stateSyncFeed.Send(StateSyncEvent{Data: data}) + } + // BOR + // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them @@ -2555,3 +2570,17 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +// +// Bor related changes +// + +// SetStateSync set sync data in state_data +func (bc *BlockChain) SetStateSync(stateData []*types.StateSyncData) { + bc.stateSyncData = stateData +} + +// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. +func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription { + return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch)) +} diff --git a/core/bor_events.go b/core/bor_events.go new file mode 100644 index 0000000000000000000000000000000000000000..d8a5b38d0e9d1ada4a01dc4a9c3e6b02362d10ea --- /dev/null +++ b/core/bor_events.go @@ -0,0 +1,10 @@ +package core + +import ( + "github.com/ethereum/go-ethereum/core/types" +) + +// StateSyncEvent represents state sync events +type StateSyncEvent struct { + Data *types.StateSyncData +} diff --git a/core/types/state_data.go b/core/types/state_data.go index 38174461c78d634009d1a55bb6d1c815b69080f4..d04f91ec53dd71cf94e45908afd73a8c9df87bbd 100644 --- a/core/types/state_data.go +++ b/core/types/state_data.go @@ -2,9 +2,9 @@ package types import "github.com/ethereum/go-ethereum/common" -// StateData represents state received from Ethereum Blockchain -type StateData struct { - Did uint64 +// StateSyncData represents state received from Ethereum Blockchain +type StateSyncData struct { + ID uint64 Contract common.Address Data string TxHash common.Hash diff --git a/eth/bor_api_backend.go b/eth/bor_api_backend.go new file mode 100644 index 0000000000000000000000000000000000000000..860b99de5bc102145d1a93f4c99de94faa75e712 --- /dev/null +++ b/eth/bor_api_backend.go @@ -0,0 +1,35 @@ +package eth + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/consensus/bor" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" +) + +// GetRootHash returns root hash for given start and end block +func (b *EthAPIBackend) GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) { + var api *bor.API + for _, _api := range b.eth.Engine().APIs(b.eth.BlockChain()) { + if _api.Namespace == "bor" { + api = _api.Service.(*bor.API) + } + } + + if api == nil { + return "", errors.New("Only available in Bor engine") + } + + root, err := api.GetRootHash(starBlockNr, endBlockNr) + if err != nil { + return "", err + } + return root, nil +} + +// SubscribeStateSyncEvent subscribes to state sync event +func (b *EthAPIBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return b.eth.BlockChain().SubscribeStateSyncEvent(ch) +} diff --git a/eth/filters/bor_api.go b/eth/filters/bor_api.go new file mode 100644 index 0000000000000000000000000000000000000000..925a6feebb012086e636b1a8850dfcbd4ef16195 --- /dev/null +++ b/eth/filters/bor_api.go @@ -0,0 +1,43 @@ +package filters + +import ( + "bytes" + "context" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +// NewDeposits send a notification each time a new deposit received from bridge. +func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.StateSyncFilter) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + go func() { + stateSyncData := make(chan *types.StateSyncData) + stateSyncSub := api.events.SubscribeNewDeposits(stateSyncData) + + for { + select { + case h := <-stateSyncData: + if crit.ID == h.ID || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 || + (crit.ID == 0 && crit.Contract == common.Address{}) { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + stateSyncSub.Unsubscribe() + return + case <-notifier.Closed(): + stateSyncSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} diff --git a/eth/filters/bor_filter_system.go b/eth/filters/bor_filter_system.go new file mode 100644 index 0000000000000000000000000000000000000000..c3308de27b2648fe649a782a0fb0da7c11970617 --- /dev/null +++ b/eth/filters/bor_filter_system.go @@ -0,0 +1,32 @@ +package filters + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +func (es *EventSystem) handleStateSyncEvent(filters filterIndex, ev core.StateSyncEvent) { + for _, f := range filters[StateSyncSubscription] { + f.stateSyncData <- ev.Data + } +} + +// SubscribeNewDeposits creates a subscription that writes details about the new state sync events (from mainchain to Bor) +func (es *EventSystem) SubscribeNewDeposits(data chan *types.StateSyncData) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: StateSyncSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + stateSyncData: data, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 17635837af8a04f8dbd63208dbea9121b4f3d8f7..03bb08bd03df19a8f0cfb8a1cb8f72a87ae7a088 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -45,6 +45,8 @@ type Backend interface { BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + + SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription } // Filter can be used to retrieve and filter logs. diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index a105ec51c350d2111125cb545b693fd18b8683ac..603d0f02abbbad8432a09bb1518a946c0e4b71bf 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -52,7 +52,9 @@ const ( PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription - // LastSubscription keeps track of the last index + // StateSyncSubscription to listen main chain state + StateSyncSubscription + // LastIndexSubscription keeps track of the last index LastIndexSubscription ) @@ -66,6 +68,8 @@ const ( logsChanSize = 10 // chainEvChanSize is the size of channel listening to ChainEvent. chainEvChanSize = 10 + // stateEvChanSize is the size of channel listening to StateSyncEvent. + stateEvChanSize = 10 ) type subscription struct { @@ -78,6 +82,8 @@ type subscription struct { headers chan *types.Header installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled + + stateSyncData chan *types.StateSyncData } // EventSystem creates subscriptions, processes events and broadcasts them to the @@ -102,6 +108,10 @@ type EventSystem struct { pendingLogsCh chan []*types.Log // Channel to receive new log event rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event chainCh chan core.ChainEvent // Channel to receive new chain event + + // Bor related subscription and channels + stateSyncSub event.Subscription // Subscription for new state event + stateSyncCh chan core.StateSyncEvent // Channel to receive deposit state change event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -121,6 +131,7 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem { rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), pendingLogsCh: make(chan []*types.Log, logsChanSize), chainCh: make(chan core.ChainEvent, chainEvChanSize), + stateSyncCh: make(chan core.StateSyncEvent, stateEvChanSize), } // Subscribe events @@ -129,6 +140,7 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem { m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) + m.stateSyncSub = m.backend.SubscribeStateSyncEvent(m.stateSyncCh) // Make sure none of the subscriptions are empty if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { @@ -448,6 +460,7 @@ func (es *EventSystem) eventLoop() { es.rmLogsSub.Unsubscribe() es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() + es.stateSyncSub.Unsubscribe() }() index := make(filterIndex) @@ -467,6 +480,8 @@ func (es *EventSystem) eventLoop() { es.handlePendingLogs(index, ev) case ev := <-es.chainCh: es.handleChainEvent(index, ev) + case ev := <-es.stateSyncCh: + es.handleStateSyncEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index c8d1d43abbd81a99dd23e3d167e020be3326be63..204ff9ffaa8316f58c85896d0b18a5df911cf6cb 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -47,6 +47,8 @@ type testBackend struct { rmLogsFeed event.Feed pendingLogsFeed event.Feed chainFeed event.Feed + + stateSyncFeed event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -152,6 +154,10 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func (b *testBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return b.stateSyncFeed.Subscribe(ch) +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) diff --git a/ethclient/bor_ethclient.go b/ethclient/bor_ethclient.go new file mode 100644 index 0000000000000000000000000000000000000000..f2758cc524c6e9f8dbff6dd2f4e58d44a1a66cab --- /dev/null +++ b/ethclient/bor_ethclient.go @@ -0,0 +1,14 @@ +package ethclient + +import ( + "context" +) + +// GetRootHash returns the merkle root of the block headers +func (ec *Client) GetRootHash(ctx context.Context, startBlockNumber uint64, endBlockNumber uint64) (string, error) { + var rootHash string + if err := ec.c.CallContext(ctx, &rootHash, "eth_getRootHash", startBlockNumber, endBlockNumber); err != nil { + return "", err + } + return rootHash, nil +} diff --git a/interfaces.go b/interfaces.go index 1ff31f96b6a6c2a8a4084e3628f06863503096e8..bbae0b3bee3d7236d3c6b6d49f8f3133c27aa91b 100644 --- a/interfaces.go +++ b/interfaces.go @@ -209,3 +209,9 @@ type GasEstimator interface { type PendingStateEventer interface { SubscribePendingTransactions(ctx context.Context, ch chan<- *types.Transaction) (Subscription, error) } + +// StateSyncFilter state sync filter +type StateSyncFilter struct { + ID uint64 + Contract common.Address +} diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 10e716bf200dadb4c4a2035fcf9c33dd53c3351d..10251ace52e2f95c91b8550236c9a3e18327a797 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -86,6 +86,10 @@ type Backend interface { SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + // Bor related APIs + SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription + GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) + ChainConfig() *params.ChainConfig Engine() consensus.Engine } diff --git a/internal/ethapi/bor_api.go b/internal/ethapi/bor_api.go new file mode 100644 index 0000000000000000000000000000000000000000..6958c0c513e11805260409d00305e1bbd96a53f4 --- /dev/null +++ b/internal/ethapi/bor_api.go @@ -0,0 +1,14 @@ +package ethapi + +import ( + "context" +) + +// GetRootHash returns root hash for given start and end block +func (s *PublicBlockChainAPI) GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) { + root, err := s.b.GetRootHash(ctx, starBlockNr, endBlockNr) + if err != nil { + return "", err + } + return root, nil +} diff --git a/internal/web3ext/bor_ext.go b/internal/web3ext/bor_ext.go new file mode 100644 index 0000000000000000000000000000000000000000..b7fe9b4886af4ebe3348f5aa23573cec97aad3a4 --- /dev/null +++ b/internal/web3ext/bor_ext.go @@ -0,0 +1,53 @@ +package web3ext + +// BorJs bor related apis +const BorJs = ` +web3._extend({ + property: 'bor', + methods: [ + new web3._extend.Method({ + name: 'getSnapshot', + call: 'bor_getSnapshot', + params: 1, + inputFormatter: [null] + }), + new web3._extend.Method({ + name: 'getAuthor', + call: 'bor_getAuthor', + params: 1, + inputFormatter: [null] + }), + new web3._extend.Method({ + name: 'getSnapshotAtHash', + call: 'bor_getSnapshotAtHash', + params: 1 + }), + new web3._extend.Method({ + name: 'getSigners', + call: 'bor_getSigners', + params: 1, + inputFormatter: [null] + }), + new web3._extend.Method({ + name: 'getSignersAtHash', + call: 'bor_getSignersAtHash', + params: 1 + }), + new web3._extend.Method({ + name: 'getCurrentProposer', + call: 'bor_getCurrentProposer', + params: 0 + }), + new web3._extend.Method({ + name: 'getCurrentValidators', + call: 'bor_getCurrentValidators', + params: 0 + }), + new web3._extend.Method({ + name: 'getRootHash', + call: 'bor_getRootHash', + params: 2, + }), + ] +}); +` diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 77954bbbf0011aeb30181f146b4887fe296158d0..682c262fc1bbcb97b6e19f8932e5a239c892d613 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -34,6 +34,9 @@ var Modules = map[string]string{ "txpool": TxpoolJs, "les": LESJs, "lespay": LESPayJs, + + // Bor related apis + "bor": BorJs, } const ChequebookJs = ` diff --git a/les/bor_api_backend.go b/les/bor_api_backend.go new file mode 100644 index 0000000000000000000000000000000000000000..248246810981480193b167306145613cad249773 --- /dev/null +++ b/les/bor_api_backend.go @@ -0,0 +1,19 @@ +package les + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" +) + +// GetRootHash returns root hash for given start and end block +func (b *LesApiBackend) GetRootHash(ctx context.Context, starBlockNr uint64, endBlockNr uint64) (string, error) { + return "", errors.New("Not implemented") +} + +// SubscribeStateSyncEvent subscribe state sync event +func (b *LesApiBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return b.eth.blockchain.SubscribeStateSyncEvent(ch) +} diff --git a/light/lightchain.go b/light/lightchain.go index 6fc321ae0b5717d182dc6a79a0a71e5ab06481b5..e6f9628282e0aa04bb4f742a762967ed911085d3 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -580,3 +580,9 @@ func (lc *LightChain) DisableCheckFreq() { func (lc *LightChain) EnableCheckFreq() { atomic.StoreInt32(&lc.disableCheckFreq, 0) } + +// SubscribeStateSyncEvent implements the interface of filters.Backend +// LightChain does not send core.NewStateChangeSyncEvent, so return an empty subscription. +func (lc *LightChain) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { + return lc.scope.Track(new(event.Feed).Subscribe(ch)) +}