From 8d9be18b296afb8302249dcc96651aabb0975e26 Mon Sep 17 00:00:00 2001
From: obscuren <geffobscura@gmail.com>
Date: Fri, 6 Mar 2015 15:50:44 +0100
Subject: [PATCH] Queued approach to delivering chain events

---
 core/chain_manager.go | 107 ++++++++++++++++++++++++++++--------------
 core/events.go        |  10 ++++
 miner/miner.go        |   1 +
 miner/worker.go       |   4 +-
 4 files changed, 86 insertions(+), 36 deletions(-)

diff --git a/core/chain_manager.go b/core/chain_manager.go
index 75d2f6bad..20a1737ad 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -19,11 +19,6 @@ var (
 	jsonlogger  = logger.NewJsonLogger()
 )
 
-type ChainEvent struct {
-	Block *types.Block
-	Td    *big.Int
-}
-
 type StateQuery interface {
 	GetAccount(addr []byte) *state.StateObject
 }
@@ -93,13 +88,16 @@ type ChainManager struct {
 
 	transState *state.StateDB
 	txState    *state.StateDB
+
+	quit chan struct{}
 }
 
 func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager {
-	bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux}
+	bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux, quit: make(chan struct{})}
 	bc.setLastBlock()
 	bc.transState = bc.State().Copy()
 	bc.txState = bc.State().Copy()
+	go bc.update()
 
 	return bc
 }
@@ -388,16 +386,24 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
 }
 
 func (bc *ChainManager) Stop() {
-	if bc.CurrentBlock != nil {
-		chainlogger.Infoln("Stopped")
-	}
+	close(bc.quit)
+}
+
+type queueEvent struct {
+	queue          []interface{}
+	canonicalCount int
+	sideCount      int
+	splitCount     int
 }
 
 func (self *ChainManager) InsertChain(chain types.Blocks) error {
-	self.tsmu.Lock()
-	defer self.tsmu.Unlock()
+	//self.tsmu.Lock()
+	//defer self.tsmu.Unlock()
 
-	for _, block := range chain {
+	// A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
+	var queue = make([]interface{}, len(chain))
+	var queueEvent = queueEvent{queue: queue}
+	for i, block := range chain {
 		// Call in to the block processor and check for errors. It's likely that if one block fails
 		// all others will fail too (unless a known block is returned).
 		td, err := self.processor.Process(block)
@@ -414,7 +420,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
 		}
 		block.Td = td
 
-		var canonical, split bool
 		self.mu.Lock()
 		cblock := self.currentBlock
 		{
@@ -426,41 +431,75 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
 			if td.Cmp(self.td) > 0 {
 				if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 {
 					chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td)
-					split = true
+
+					queue[i] = ChainSplitEvent{block}
+					queueEvent.splitCount++
 				}
 
 				self.setTotalDifficulty(td)
 				self.insert(block)
 
-				canonical = true
+				/*
+					jsonlogger.LogJson(&logger.EthChainNewHead{
+						BlockHash:     ethutil.Bytes2Hex(block.Hash()),
+						BlockNumber:   block.Number(),
+						ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()),
+						BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()),
+					})
+				*/
+
+				self.setTransState(state.New(block.Root(), self.db))
+				queue[i] = ChainEvent{block}
+				queueEvent.canonicalCount++
+			} else {
+				queue[i] = ChainSideEvent{block}
+				queueEvent.sideCount++
 			}
 		}
 		self.mu.Unlock()
 
-		if canonical {
-			/*
-				jsonlogger.LogJson(&logger.EthChainNewHead{
-					BlockHash:     ethutil.Bytes2Hex(block.Hash()),
-					BlockNumber:   block.Number(),
-					ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()),
-					BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()),
-				})
-			*/
-			self.setTransState(state.New(block.Root(), self.db))
-			self.eventMux.Post(ChainEvent{block, td})
-		} else {
-			//self.eventMux.
-		}
-
-		if split {
-			self.setTxState(state.New(block.Root(), self.db))
-			self.eventMux.Post(ChainSplitEvent{block})
-		}
 	}
 
+	// XXX put this in a goroutine?
+	go self.eventMux.Post(queueEvent)
+
 	return nil
 }
 
+func (self *ChainManager) update() {
+	events := self.eventMux.Subscribe(queueEvent{})
+
+out:
+	for {
+		select {
+		case ev := <-events.Chan():
+			switch ev := ev.(type) {
+			case queueEvent:
+				for i, event := range ev.queue {
+					switch event := event.(type) {
+					case ChainEvent:
+						// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
+						// and in most cases isn't even necessary.
+						if i == ev.canonicalCount {
+							self.eventMux.Post(ChainHeadEvent{event.Block})
+						}
+					case ChainSplitEvent:
+						// On chain splits we need to reset the transaction state. We can't be sure whether the actual
+						// state of the accounts are still valid.
+						if i == ev.splitCount {
+							self.setTxState(state.New(event.Block.Root(), self.db))
+						}
+					}
+
+					self.eventMux.Post(event)
+				}
+			}
+		case <-self.quit:
+			break out
+		}
+	}
+}
+
 // Satisfy state query interface
 func (self *ChainManager) GetAccount(addr []byte) *state.StateObject {
 	return self.State().GetAccount(addr)
diff --git a/core/events.go b/core/events.go
index 4cbbc609c..23678ef60 100644
--- a/core/events.go
+++ b/core/events.go
@@ -16,3 +16,13 @@ type NewMinedBlockEvent struct{ Block *types.Block }
 
 // ChainSplit is posted when a new head is detected
 type ChainSplitEvent struct{ Block *types.Block }
+
+type ChainEvent struct{ Block *types.Block }
+
+type ChainSideEvent struct{ Block *types.Block }
+
+type ChainHeadEvent struct{ Block *types.Block }
+
+// Mining operation events
+type StartMining struct{}
+type TopMining struct{}
diff --git a/miner/miner.go b/miner/miner.go
index 490296431..d3b1f578a 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -30,6 +30,7 @@ func New(coinbase []byte, eth core.Backend, pow pow.PoW, minerThreads int) *Mine
 		pow:      pow,
 	}
 
+	minerThreads = 1
 	for i := 0; i < minerThreads; i++ {
 		miner.worker.register(NewCpuMiner(i, miner.pow))
 	}
diff --git a/miner/worker.go b/miner/worker.go
index 29992b327..cd105fa73 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -116,7 +116,7 @@ func (self *worker) register(agent Agent) {
 }
 
 func (self *worker) update() {
-	events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{})
+	events := self.mux.Subscribe(core.ChainHeadEvent{}, core.NewMinedBlockEvent{})
 
 	timer := time.NewTicker(2 * time.Second)
 
@@ -125,7 +125,7 @@ out:
 		select {
 		case event := <-events.Chan():
 			switch ev := event.(type) {
-			case core.ChainEvent:
+			case core.ChainHeadEvent:
 				if self.current.block != ev.Block {
 					self.commitNewWork()
 				}
-- 
GitLab