diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 9e65d9c8c67d37eb4313e7d3eacd7b6986cad9f4..35e3bc64a02d9464812f3d6d3cc904876b0b7830 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -666,7 +666,8 @@ func (c *Bor) Finalize(chain consensus.ChainReader, header *types.Header, state if !c.WithoutHeimdall { // commit statees - if err := c.CommitStates(state, header, cx); err != nil { + _, err := c.CommitStates(state, header, cx) + if err != nil { log.Error("Error while committing states", "error", err) return } @@ -681,6 +682,7 @@ func (c *Bor) Finalize(chain consensus.ChainReader, header *types.Header, state // FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set, // nor block rewards given, and returns the final block. func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { + stateSyncData := []*types.StateData{} headerNumber := header.Number.Uint64() if headerNumber%c.config.Sprint == 0 { cx := chainContext{Chain: chain, Bor: c} @@ -694,7 +696,8 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Hea if !c.WithoutHeimdall { // commit statees - if err := c.CommitStates(state, header, cx); err != nil { + stateSyncData, err = c.CommitStates(state, header, cx) + if err != nil { log.Error("Error while committing states", "error", err) return nil, err } @@ -704,9 +707,12 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Hea // No block rewards in PoA, so the state remains as is and uncles are dropped header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) header.UncleHash = types.CalcUncleHash(nil) + // Assemble block + block := types.NewBlock(header, txs, nil, receipts) - // Assemble and return the final block for sealing - return types.NewBlock(header, txs, nil, receipts), nil + block.SetStateSync(stateSyncData) + // return the final block for sealing + return block, nil } // Authorize injects a private key into the consensus engine to mint new blocks @@ -723,7 +729,6 @@ func (c *Bor) Authorize(signer common.Address, signFn SignerFn) { // the local signing credentials. func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { header := block.Header() - // Sealing the genesis block is not supported number := header.Number.Uint64() if number == 0 { @@ -1088,11 +1093,12 @@ func (c *Bor) CommitStates( state *state.StateDB, header *types.Header, chain chainContext, -) error { +) ([]*types.StateData, error) { + stateSyncs := make([]*types.StateData, 0) number := header.Number.Uint64() _lastStateID, err := c.GenesisContractsClient.LastStateId(number - 1) if err != nil { - return err + return nil, err } to := time.Unix(int64(chain.Chain.GetHeaderByNumber(number-c.config.Sprint).Time), 0) @@ -1119,16 +1125,14 @@ func (c *Bor) CommitStates( Data: hex.EncodeToString(eventRecord.Data), TxHash: eventRecord.TxHash, } - go func() { - c.stateSyncFeed.Send(core.StateSyncEvent{StateData: &stateData}) - }() + stateSyncs = append(stateSyncs, &stateData) if err := c.GenesisContractsClient.CommitState(eventRecord, state, header, chain); err != nil { - return err + return nil, err } lastStateID++ } - return nil + return stateSyncs, nil } func validateEventRecord(eventRecord *EventRecordWithTime, number uint64, to time.Time, lastStateID uint64, chainID string) error { @@ -1139,11 +1143,6 @@ func validateEventRecord(eventRecord *EventRecordWithTime, number uint64, to tim return nil } -// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. -func (c *Bor) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { - return c.scope.Track(c.stateSyncFeed.Subscribe(ch)) -} - func (c *Bor) SetHeimdallClient(h IHeimdallClient) { c.HeimdallClient = h } diff --git a/core/blockchain.go b/core/blockchain.go index 5f0e31c79e3a9ef44a426e5b0e15f71c665f9a8b..5a70de3fe843930cdb1fb210975319bb745ca2dc 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1542,6 +1542,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. if emitHeadEvent { bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) } + syncData := block.StateSyncData() + for _, data := range syncData { + bc.stateSyncFeed.Send(StateSyncEvent{StateData: data}) + } } else { bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } @@ -2444,6 +2448,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } +// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. +func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription { + return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch)) +} + // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) @@ -2454,11 +2463,6 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return bc.scope.Track(bc.logsFeed.Subscribe(ch)) } -// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. -func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription { - return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch)) -} - // SubscribeBlockProcessingEvent registers a subscription of bool where true means // block processing has started while false means it has stopped. func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { diff --git a/core/types/block.go b/core/types/block.go index be31b1a60afb3e238d54eb643e1102ac1f5b8606..e00b5e0d2be57e80af5d433af033d905bd45485e 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -156,10 +156,10 @@ type Body struct { // Block represents an entire block in the Ethereum blockchain. type Block struct { - header *Header - uncles []*Header - transactions Transactions - + header *Header + uncles []*Header + transactions Transactions + stateSyncData []*StateData // caches hash atomic.Value size atomic.Value @@ -266,6 +266,11 @@ func CopyHeader(h *Header) *Header { return &cpy } +// SetStateSync set sync data in block +func (b *Block) SetStateSync(stateData []*StateData) { + b.stateSyncData = stateData +} + // DecodeRLP decodes the Ethereum func (b *Block) DecodeRLP(s *rlp.Stream) error { var eb extblock @@ -311,11 +316,12 @@ func (b *Block) Transaction(hash common.Hash) *Transaction { return nil } -func (b *Block) Number() *big.Int { return new(big.Int).Set(b.header.Number) } -func (b *Block) GasLimit() uint64 { return b.header.GasLimit } -func (b *Block) GasUsed() uint64 { return b.header.GasUsed } -func (b *Block) Difficulty() *big.Int { return new(big.Int).Set(b.header.Difficulty) } -func (b *Block) Time() uint64 { return b.header.Time } +func (b *Block) Number() *big.Int { return new(big.Int).Set(b.header.Number) } +func (b *Block) GasLimit() uint64 { return b.header.GasLimit } +func (b *Block) GasUsed() uint64 { return b.header.GasUsed } +func (b *Block) Difficulty() *big.Int { return new(big.Int).Set(b.header.Difficulty) } +func (b *Block) Time() uint64 { return b.header.Time } +func (b *Block) StateSyncData() []*StateData { return b.stateSyncData } func (b *Block) NumberU64() uint64 { return b.header.Number.Uint64() } func (b *Block) MixDigest() common.Hash { return b.header.MixDigest } @@ -372,9 +378,10 @@ func (b *Block) WithSeal(header *Header) *Block { cpy := *header return &Block{ - header: &cpy, - transactions: b.transactions, - uncles: b.uncles, + header: &cpy, + transactions: b.transactions, + uncles: b.uncles, + stateSyncData: b.stateSyncData, } } diff --git a/eth/api_backend.go b/eth/api_backend.go index e7add79b1aeea8e8ce3e09033b4c26601ecc6638..6f50814ccdc02095ff56cc4044bba0a4bd4ca5a3 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -218,8 +218,7 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e } func (b *EthAPIBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { - engine := b.eth.Engine() - return engine.(*bor.Bor).SubscribeStateSyncEvent(ch) + return b.eth.BlockChain().SubscribeStateSyncEvent(ch) } func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {