diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 9e65d9c8c67d37eb4313e7d3eacd7b6986cad9f4..c9e59050225e4133ee320b2a595966bf43b1a496 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -666,7 +666,8 @@ func (c *Bor) Finalize(chain consensus.ChainReader, header *types.Header, state if !c.WithoutHeimdall { // commit statees - if err := c.CommitStates(state, header, cx); err != nil { + _, err := c.CommitStates(state, header, cx) + if err != nil { log.Error("Error while committing states", "error", err) return } @@ -681,6 +682,7 @@ func (c *Bor) Finalize(chain consensus.ChainReader, header *types.Header, state // FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set, // nor block rewards given, and returns the final block. func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { + stateSyncData := &types.StateData{} headerNumber := header.Number.Uint64() if headerNumber%c.config.Sprint == 0 { cx := chainContext{Chain: chain, Bor: c} @@ -694,7 +696,8 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Hea if !c.WithoutHeimdall { // commit statees - if err := c.CommitStates(state, header, cx); err != nil { + stateSyncData, err = c.CommitStates(state, header, cx) + if err != nil { log.Error("Error while committing states", "error", err) return nil, err } @@ -704,9 +707,12 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Hea // No block rewards in PoA, so the state remains as is and uncles are dropped header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) header.UncleHash = types.CalcUncleHash(nil) + // Assemble block + block := types.NewBlock(header, txs, nil, receipts) - // Assemble and return the final block for sealing - return types.NewBlock(header, txs, nil, receipts), nil + block.SetStateSync(stateSyncData) + // return the final block for sealing + return block, nil } // Authorize injects a private key into the consensus engine to mint new blocks @@ -723,7 +729,6 @@ func (c *Bor) Authorize(signer common.Address, signFn SignerFn) { // the local signing credentials. func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { header := block.Header() - // Sealing the genesis block is not supported number := header.Number.Uint64() if number == 0 { @@ -1088,11 +1093,12 @@ func (c *Bor) CommitStates( state *state.StateDB, header *types.Header, chain chainContext, -) error { +) (*types.StateData, error) { + var stateData types.StateData number := header.Number.Uint64() _lastStateID, err := c.GenesisContractsClient.LastStateId(number - 1) if err != nil { - return err + return nil, err } to := time.Unix(int64(chain.Chain.GetHeaderByNumber(number-c.config.Sprint).Time), 0) @@ -1113,22 +1119,24 @@ func (c *Bor) CommitStates( break } - stateData := types.StateData{ + stateData = types.StateData{ Did: eventRecord.ID, Contract: eventRecord.Contract, Data: hex.EncodeToString(eventRecord.Data), TxHash: eventRecord.TxHash, } - go func() { - c.stateSyncFeed.Send(core.StateSyncEvent{StateData: &stateData}) - }() + fmt.Println("stateData", stateData) + // go func() { + // c.stateSyncFeed.Send(core.StateSyncEvent{StateData: &stateData}) + // }() if err := c.GenesisContractsClient.CommitState(eventRecord, state, header, chain); err != nil { - return err + return nil, err } lastStateID++ } - return nil + fmt.Println("retuning state Data", &stateData) + return &stateData, nil } func validateEventRecord(eventRecord *EventRecordWithTime, number uint64, to time.Time, lastStateID uint64, chainID string) error { @@ -1139,10 +1147,10 @@ func validateEventRecord(eventRecord *EventRecordWithTime, number uint64, to tim return nil } -// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. -func (c *Bor) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { - return c.scope.Track(c.stateSyncFeed.Subscribe(ch)) -} +// // SubscribeStateSyncEvent registers a subscription of StateSyncEvent. +// func (c *Bor) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { +// return c.scope.Track(c.stateSyncFeed.Subscribe(ch)) +// } func (c *Bor) SetHeimdallClient(h IHeimdallClient) { c.HeimdallClient = h diff --git a/core/blockchain.go b/core/blockchain.go index 5f0e31c79e3a9ef44a426e5b0e15f71c665f9a8b..faae6dff8811aca6a15b39a478a96ad28f0313cf 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1542,6 +1542,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. if emitHeadEvent { bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) } + syncData := block.StateSyncData() + // TODO: add emitStateSyncEvent flag check + if syncData.Did != 0 { + bc.stateSyncFeed.Send(StateSyncEvent{StateData: syncData}) + } } else { bc.chainSideFeed.Send(ChainSideEvent{Block: block}) } @@ -2444,6 +2449,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } +// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. +func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription { + return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch)) +} + // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) @@ -2454,11 +2464,6 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return bc.scope.Track(bc.logsFeed.Subscribe(ch)) } -// SubscribeStateSyncEvent registers a subscription of StateSyncEvent. -func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription { - return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch)) -} - // SubscribeBlockProcessingEvent registers a subscription of bool where true means // block processing has started while false means it has stopped. func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { diff --git a/core/types/block.go b/core/types/block.go index be31b1a60afb3e238d54eb643e1102ac1f5b8606..6235bb7eb53d4578355214cdf99d47938cdeb4aa 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -156,10 +156,10 @@ type Body struct { // Block represents an entire block in the Ethereum blockchain. type Block struct { - header *Header - uncles []*Header - transactions Transactions - + header *Header + uncles []*Header + transactions Transactions + stateSyncData *StateData // caches hash atomic.Value size atomic.Value @@ -266,6 +266,11 @@ func CopyHeader(h *Header) *Header { return &cpy } +// SetStateSync set sync data in block +func (b *Block) SetStateSync(stateData *StateData) { + b.stateSyncData = stateData +} + // DecodeRLP decodes the Ethereum func (b *Block) DecodeRLP(s *rlp.Stream) error { var eb extblock @@ -311,11 +316,12 @@ func (b *Block) Transaction(hash common.Hash) *Transaction { return nil } -func (b *Block) Number() *big.Int { return new(big.Int).Set(b.header.Number) } -func (b *Block) GasLimit() uint64 { return b.header.GasLimit } -func (b *Block) GasUsed() uint64 { return b.header.GasUsed } -func (b *Block) Difficulty() *big.Int { return new(big.Int).Set(b.header.Difficulty) } -func (b *Block) Time() uint64 { return b.header.Time } +func (b *Block) Number() *big.Int { return new(big.Int).Set(b.header.Number) } +func (b *Block) GasLimit() uint64 { return b.header.GasLimit } +func (b *Block) GasUsed() uint64 { return b.header.GasUsed } +func (b *Block) Difficulty() *big.Int { return new(big.Int).Set(b.header.Difficulty) } +func (b *Block) Time() uint64 { return b.header.Time } +func (b *Block) StateSyncData() *StateData { return b.stateSyncData } func (b *Block) NumberU64() uint64 { return b.header.Number.Uint64() } func (b *Block) MixDigest() common.Hash { return b.header.MixDigest } @@ -372,9 +378,10 @@ func (b *Block) WithSeal(header *Header) *Block { cpy := *header return &Block{ - header: &cpy, - transactions: b.transactions, - uncles: b.uncles, + header: &cpy, + transactions: b.transactions, + uncles: b.uncles, + stateSyncData: b.stateSyncData, } } diff --git a/eth/api_backend.go b/eth/api_backend.go index e7add79b1aeea8e8ce3e09033b4c26601ecc6638..6f50814ccdc02095ff56cc4044bba0a4bd4ca5a3 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -218,8 +218,7 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e } func (b *EthAPIBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription { - engine := b.eth.Engine() - return engine.(*bor.Bor).SubscribeStateSyncEvent(ch) + return b.eth.BlockChain().SubscribeStateSyncEvent(ch) } func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {