From 0a552f5f782fbe8f38db65d0a4cd377960e91aa6 Mon Sep 17 00:00:00 2001
From: "ptsayli@gmail.com" <ptsayli@gmail.com>
Date: Wed, 29 Jul 2020 21:09:10 +0530
Subject: [PATCH] chg: add and broadcast statesync event from block

---
 consensus/bor/bor.go | 42 +++++++++++++++++++++++++-----------------
 core/blockchain.go   | 15 ++++++++++-----
 core/types/block.go  | 31 +++++++++++++++++++------------
 eth/api_backend.go   |  3 +--
 4 files changed, 55 insertions(+), 36 deletions(-)

diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go
index 9e65d9c8c..c9e590502 100644
--- a/consensus/bor/bor.go
+++ b/consensus/bor/bor.go
@@ -666,7 +666,8 @@ func (c *Bor) Finalize(chain consensus.ChainReader, header *types.Header, state
 
 		if !c.WithoutHeimdall {
 			// commit statees
-			if err := c.CommitStates(state, header, cx); err != nil {
+			_, err := c.CommitStates(state, header, cx)
+			if err != nil {
 				log.Error("Error while committing states", "error", err)
 				return
 			}
@@ -681,6 +682,7 @@ func (c *Bor) Finalize(chain consensus.ChainReader, header *types.Header, state
 // FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set,
 // nor block rewards given, and returns the final block.
 func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) {
+	stateSyncData := &types.StateData{}
 	headerNumber := header.Number.Uint64()
 	if headerNumber%c.config.Sprint == 0 {
 		cx := chainContext{Chain: chain, Bor: c}
@@ -694,7 +696,8 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Hea
 
 		if !c.WithoutHeimdall {
 			// commit statees
-			if err := c.CommitStates(state, header, cx); err != nil {
+			stateSyncData, err = c.CommitStates(state, header, cx)
+			if err != nil {
 				log.Error("Error while committing states", "error", err)
 				return nil, err
 			}
@@ -704,9 +707,12 @@ func (c *Bor) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Hea
 	// No block rewards in PoA, so the state remains as is and uncles are dropped
 	header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
 	header.UncleHash = types.CalcUncleHash(nil)
+	// Assemble block
+	block := types.NewBlock(header, txs, nil, receipts)
 
-	// Assemble and return the final block for sealing
-	return types.NewBlock(header, txs, nil, receipts), nil
+	block.SetStateSync(stateSyncData)
+	// return the final block for sealing
+	return block, nil
 }
 
 // Authorize injects a private key into the consensus engine to mint new blocks
@@ -723,7 +729,6 @@ func (c *Bor) Authorize(signer common.Address, signFn SignerFn) {
 // the local signing credentials.
 func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
 	header := block.Header()
-
 	// Sealing the genesis block is not supported
 	number := header.Number.Uint64()
 	if number == 0 {
@@ -1088,11 +1093,12 @@ func (c *Bor) CommitStates(
 	state *state.StateDB,
 	header *types.Header,
 	chain chainContext,
-) error {
+) (*types.StateData, error) {
+	var stateData types.StateData
 	number := header.Number.Uint64()
 	_lastStateID, err := c.GenesisContractsClient.LastStateId(number - 1)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	to := time.Unix(int64(chain.Chain.GetHeaderByNumber(number-c.config.Sprint).Time), 0)
@@ -1113,22 +1119,24 @@ func (c *Bor) CommitStates(
 			break
 		}
 
-		stateData := types.StateData{
+		stateData = types.StateData{
 			Did:      eventRecord.ID,
 			Contract: eventRecord.Contract,
 			Data:     hex.EncodeToString(eventRecord.Data),
 			TxHash:   eventRecord.TxHash,
 		}
-		go func() {
-			c.stateSyncFeed.Send(core.StateSyncEvent{StateData: &stateData})
-		}()
+		fmt.Println("stateData", stateData)
+		// go func() {
+		// 	c.stateSyncFeed.Send(core.StateSyncEvent{StateData: &stateData})
+		// }()
 
 		if err := c.GenesisContractsClient.CommitState(eventRecord, state, header, chain); err != nil {
-			return err
+			return nil, err
 		}
 		lastStateID++
 	}
-	return nil
+	fmt.Println("retuning state Data", &stateData)
+	return &stateData, nil
 }
 
 func validateEventRecord(eventRecord *EventRecordWithTime, number uint64, to time.Time, lastStateID uint64, chainID string) error {
@@ -1139,10 +1147,10 @@ func validateEventRecord(eventRecord *EventRecordWithTime, number uint64, to tim
 	return nil
 }
 
-// SubscribeStateSyncEvent registers a subscription of StateSyncEvent.
-func (c *Bor) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription {
-	return c.scope.Track(c.stateSyncFeed.Subscribe(ch))
-}
+// // SubscribeStateSyncEvent registers a subscription of StateSyncEvent.
+// func (c *Bor) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription {
+// 	return c.scope.Track(c.stateSyncFeed.Subscribe(ch))
+// }
 
 func (c *Bor) SetHeimdallClient(h IHeimdallClient) {
 	c.HeimdallClient = h
diff --git a/core/blockchain.go b/core/blockchain.go
index 5f0e31c79..faae6dff8 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1542,6 +1542,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 		if emitHeadEvent {
 			bc.chainHeadFeed.Send(ChainHeadEvent{Block: block})
 		}
+		syncData := block.StateSyncData()
+		// TODO: add emitStateSyncEvent flag check
+		if syncData.Did != 0 {
+			bc.stateSyncFeed.Send(StateSyncEvent{StateData: syncData})
+		}
 	} else {
 		bc.chainSideFeed.Send(ChainSideEvent{Block: block})
 	}
@@ -2444,6 +2449,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
 	return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
 }
 
+// SubscribeStateSyncEvent registers a subscription of StateSyncEvent.
+func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription {
+	return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch))
+}
+
 // SubscribeChainSideEvent registers a subscription of ChainSideEvent.
 func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
 	return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
@@ -2454,11 +2464,6 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
 	return bc.scope.Track(bc.logsFeed.Subscribe(ch))
 }
 
-// SubscribeStateSyncEvent registers a subscription of StateSyncEvent.
-func (bc *BlockChain) SubscribeStateSyncEvent(ch chan<- StateSyncEvent) event.Subscription {
-	return bc.scope.Track(bc.stateSyncFeed.Subscribe(ch))
-}
-
 // SubscribeBlockProcessingEvent registers a subscription of bool where true means
 // block processing has started while false means it has stopped.
 func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
diff --git a/core/types/block.go b/core/types/block.go
index be31b1a60..6235bb7eb 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -156,10 +156,10 @@ type Body struct {
 
 // Block represents an entire block in the Ethereum blockchain.
 type Block struct {
-	header       *Header
-	uncles       []*Header
-	transactions Transactions
-
+	header        *Header
+	uncles        []*Header
+	transactions  Transactions
+	stateSyncData *StateData
 	// caches
 	hash atomic.Value
 	size atomic.Value
@@ -266,6 +266,11 @@ func CopyHeader(h *Header) *Header {
 	return &cpy
 }
 
+// SetStateSync set sync data in block
+func (b *Block) SetStateSync(stateData *StateData) {
+	b.stateSyncData = stateData
+}
+
 // DecodeRLP decodes the Ethereum
 func (b *Block) DecodeRLP(s *rlp.Stream) error {
 	var eb extblock
@@ -311,11 +316,12 @@ func (b *Block) Transaction(hash common.Hash) *Transaction {
 	return nil
 }
 
-func (b *Block) Number() *big.Int     { return new(big.Int).Set(b.header.Number) }
-func (b *Block) GasLimit() uint64     { return b.header.GasLimit }
-func (b *Block) GasUsed() uint64      { return b.header.GasUsed }
-func (b *Block) Difficulty() *big.Int { return new(big.Int).Set(b.header.Difficulty) }
-func (b *Block) Time() uint64         { return b.header.Time }
+func (b *Block) Number() *big.Int          { return new(big.Int).Set(b.header.Number) }
+func (b *Block) GasLimit() uint64          { return b.header.GasLimit }
+func (b *Block) GasUsed() uint64           { return b.header.GasUsed }
+func (b *Block) Difficulty() *big.Int      { return new(big.Int).Set(b.header.Difficulty) }
+func (b *Block) Time() uint64              { return b.header.Time }
+func (b *Block) StateSyncData() *StateData { return b.stateSyncData }
 
 func (b *Block) NumberU64() uint64        { return b.header.Number.Uint64() }
 func (b *Block) MixDigest() common.Hash   { return b.header.MixDigest }
@@ -372,9 +378,10 @@ func (b *Block) WithSeal(header *Header) *Block {
 	cpy := *header
 
 	return &Block{
-		header:       &cpy,
-		transactions: b.transactions,
-		uncles:       b.uncles,
+		header:        &cpy,
+		transactions:  b.transactions,
+		uncles:        b.uncles,
+		stateSyncData: b.stateSyncData,
 	}
 }
 
diff --git a/eth/api_backend.go b/eth/api_backend.go
index e7add79b1..6f50814cc 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -218,8 +218,7 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e
 }
 
 func (b *EthAPIBackend) SubscribeStateSyncEvent(ch chan<- core.StateSyncEvent) event.Subscription {
-	engine := b.eth.Engine()
-	return engine.(*bor.Bor).SubscribeStateSyncEvent(ch)
+	return b.eth.BlockChain().SubscribeStateSyncEvent(ch)
 }
 
 func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
-- 
GitLab