From 9d188f73b58ee1fe4bda00a9536bda4056755f2c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= <peterke@gmail.com>
Date: Thu, 7 May 2015 21:07:20 +0300
Subject: [PATCH] eth, eth/downloader: make synchronize thread safe

---
 eth/downloader/downloader.go      | 72 +++++++------------------------
 eth/downloader/downloader_test.go |  2 +-
 eth/downloader/queue.go           | 10 -----
 eth/handler.go                    |  4 +-
 eth/sync.go                       | 16 ++-----
 5 files changed, 22 insertions(+), 82 deletions(-)
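
Note: the whole change boils down to a single compare-and-swap guard replacing the
old fetchingHashes/downloadingBlocks status flags. A minimal, self-contained sketch
of that pattern (illustrative names only, not the real Downloader types):

```go
package sketch

import "sync/atomic"

// syncer stands in for the Downloader: one int32 status flag replaces the
// previous separate fetchingHashes / downloadingBlocks fields.
type syncer struct {
	synchronizing int32 // 0 = idle, 1 = a sync is in progress
}

// Synchronize lets exactly one goroutine past the guard at a time.
// Concurrent callers return nil immediately (duplicate attempts are simply
// dropped, where the old code returned errBusy), and the flag is cleared
// again once the winning call finishes.
func (s *syncer) Synchronize(id string) error {
	if !atomic.CompareAndSwapInt32(&s.synchronizing, 0, 1) {
		return nil // another sync is already running
	}
	defer atomic.StoreInt32(&s.synchronizing, 0)

	return s.runSync(id) // placeholder for the actual hash/block download
}

// runSync is a stand-in for the real work (startFetchingHashes and
// startFetchingBlocks in the patched code).
func (s *syncer) runSync(id string) error {
	_ = id
	return nil
}
```

Returning nil instead of errBusy means callers no longer need a separate IsBusy
pre-check, which is exactly what the eth/sync.go hunk below removes.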

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 25b251112..ef2a193ff 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -68,8 +68,7 @@ type Downloader struct {
 	getBlock getBlockFn
 
 	// Status
-	fetchingHashes    int32
-	downloadingBlocks int32
+	synchronizing int32
 
 	// Channels
 	newPeerCh chan *peer
@@ -120,43 +119,26 @@ func (d *Downloader) UnregisterPeer(id string) {
 	delete(d.peers, id)
 }
 
-// SynchroniseWithPeer will select the peer and use it for synchronizing. If an empty string is given
+// Synchronize will select the peer and use it for synchronizing. If an empty string is given
 // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
 // checks fail, an error will be returned. This method is synchronous.
-func (d *Downloader) Synchronise(id string, hash common.Hash) error {
-	// Make sure it's doing neither. Once done we can restart the
-	// downloading process if the TD is higher. For now just get on
-	// with whatever is going on. This prevents unnecessary switching.
-	if d.isBusy() {
-		return errBusy
+func (d *Downloader) Synchronize(id string, hash common.Hash) error {
+	// Make sure only one goroutine is ever allowed past this point at once
+	if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) {
+		return nil
 	}
+	defer atomic.StoreInt32(&d.synchronizing, 0)
 
-	// When a synchronization attempt is made while the queue still
-	// contains items we abort the sync attempt
-	if done, pend := d.queue.Size(); done+pend > 0 {
+	// Abort if the queue still contains some leftover data
+	if _, cached := d.queue.Size(); cached > 0 {
 		return errPendingQueue
 	}
-
-	// Fetch the peer using the id or throw an error if the peer couldn't be found
+	// Retrieve the origin peer and initiate the downloading process
 	p := d.peers[id]
 	if p == nil {
 		return errUnknownPeer
 	}
-
-	// Get the hash from the peer and initiate the downloading progress.
-	err := d.getFromPeer(p, hash, false)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-// Done lets the downloader know that whatever previous hashes were taken
-// are processed. If the block count reaches zero and done is called
-// we reset the queue for the next batch of incoming hashes and blocks.
-func (d *Downloader) Done() {
-	d.queue.Done()
+	return d.getFromPeer(p, hash, false)
 }
 
 // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
@@ -176,6 +158,7 @@ func (d *Downloader) Has(hash common.Hash) bool {
 }
 
 func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
+
 	d.activePeer = p.id
 	defer func() {
 		// reset on error
@@ -184,7 +167,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
 		}
 	}()
 
-	glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
+	glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
 	// Start the fetcher. This will block the update entirely;
 	// interrupts need to be sent to the appropriate channels
 	// respectively.
@@ -200,20 +183,13 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
 		return err
 	}
 
-	glog.V(logger.Detail).Infoln("Sync completed")
+	glog.V(logger.Debug).Infoln("Synchronization completed")
 
 	return nil
 }
 
 // XXX Make synchronous
 func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
-	atomic.StoreInt32(&d.fetchingHashes, 1)
-	defer atomic.StoreInt32(&d.fetchingHashes, 0)
-
-	if d.queue.Has(h) { // TODO: Is this possible? Shouldn't queue be empty for startFetchingHashes to be even called?
-		return errAlreadyInPool
-	}
-
 	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
 
 	start := time.Now()
@@ -312,10 +288,8 @@ out:
 }
 
 func (d *Downloader) startFetchingBlocks(p *peer) error {
-	glog.V(logger.Detail).Infoln("Downloading", d.queue.Pending(), "block(s)")
+	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
 
-	atomic.StoreInt32(&d.downloadingBlocks, 1)
-	defer atomic.StoreInt32(&d.downloadingBlocks, 0)
 	// Defer the peer reset. This will empty the peer requested set
 	// and make sure there are no lingering peers with an incorrect
 	// state.
@@ -439,19 +413,3 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
 
 	return nil
 }
-
-func (d *Downloader) isFetchingHashes() bool {
-	return atomic.LoadInt32(&d.fetchingHashes) == 1
-}
-
-func (d *Downloader) isDownloadingBlocks() bool {
-	return atomic.LoadInt32(&d.downloadingBlocks) == 1
-}
-
-func (d *Downloader) isBusy() bool {
-	return d.isFetchingHashes() || d.isDownloadingBlocks()
-}
-
-func (d *Downloader) IsBusy() bool {
-	return d.isBusy()
-}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index bd439d96a..f3402794b 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
 
 func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
 	dl.activePeerId = peerId
-	return dl.downloader.Synchronise(peerId, hash)
+	return dl.downloader.Synchronize(peerId, hash)
 }
 
 func (dl *downloadTester) hasBlock(hash common.Hash) bool {
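
A hypothetical way to exercise the new guarantee (not part of this patch, and using
a standalone guard rather than the real downloadTester) would be to fire several
concurrent sync attempts and check that only one gets past the flag:

```go
package sketch

import (
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// TestSingleFlight is purely illustrative: many goroutines race to sync,
// but only one should make it past the compare-and-swap guard while it is
// held, mirroring Downloader.Synchronize after this patch.
func TestSingleFlight(t *testing.T) {
	var (
		flag    int32 // the guard, as in Downloader.synchronizing
		entered int32 // counts callers that actually got past the guard
		wg      sync.WaitGroup
	)
	synchronize := func() error {
		if !atomic.CompareAndSwapInt32(&flag, 0, 1) {
			return nil // duplicate attempt, silently dropped
		}
		defer atomic.StoreInt32(&flag, 0)

		atomic.AddInt32(&entered, 1)
		time.Sleep(100 * time.Millisecond) // hold the guard while "syncing"
		return nil
	}

	start := make(chan struct{})
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start // release all attempts at roughly the same time
			synchronize()
		}()
	}
	close(start)
	wg.Wait()

	if n := atomic.LoadInt32(&entered); n != 1 {
		t.Fatalf("expected exactly one active sync, got %d", n)
	}
}
```
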
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index d849d4d68..515440bca 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -63,16 +63,6 @@ func (q *queue) Reset() {
 	q.blockCache = nil
 }
 
-// Done checks if all the downloads have been retrieved, wiping the queue.
-func (q *queue) Done() {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	if len(q.blockCache) == 0 {
-		q.Reset()
-	}
-}
-
 // Size retrieves the number of hashes in the queue, returning separately for
 // pending and already downloaded.
 func (q *queue) Size() (int, int) {
diff --git a/eth/handler.go b/eth/handler.go
index 1e0663816..b2018f336 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -307,7 +307,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 
 		// Attempt to insert the newly received block by checking if the parent exists.
 		// If the parent exists we process the block and propagate to our peers,
-		// otherwise synchronise with the peer
+		// otherwise synchronize with the peer
 		if self.chainman.HasBlock(request.Block.ParentHash()) {
 			if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
 				glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
@@ -324,7 +324,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 			}
 			self.BroadcastBlock(hash, request.Block)
 		} else {
-			go self.synchronise(p)
+			go self.synchronize(p)
 		}
 	default:
 		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
diff --git a/eth/sync.go b/eth/sync.go
index 9e8b21a7c..b259c1d47 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -32,14 +32,14 @@ func (pm *ProtocolManager) update() {
 			}
 
 			itimer.Stop()
-			go pm.synchronise(peer)
+			go pm.synchronize(peer)
 		case <-itimer.C:
 			// The timer will make sure that the downloader keeps an active state
 			// in which it attempts to always check the network for the highest TD peers.
 			// Either select the peer or restart the timer if no peers could
 			// be selected.
 			if peer := getBestPeer(pm.peers); peer != nil {
-				go pm.synchronise(peer)
+				go pm.synchronize(peer)
 			} else {
 				itimer.Reset(5 * time.Second)
 			}
@@ -63,7 +63,6 @@ func (pm *ProtocolManager) processBlocks() error {
 	if len(blocks) == 0 {
 		return nil
 	}
-	defer pm.downloader.Done()
 
 	glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
 
@@ -78,26 +77,19 @@ func (pm *ProtocolManager) processBlocks() error {
 	return nil
 }
 
-func (pm *ProtocolManager) synchronise(peer *peer) {
+func (pm *ProtocolManager) synchronize(peer *peer) {
 	// Make sure the peer's TD is higher than our own. If not, drop.
 	if peer.td.Cmp(pm.chainman.Td()) <= 0 {
 		return
 	}
-	// Check downloader if it's busy so it doesn't show the sync message
-	// for every attempty
-	if pm.downloader.IsBusy() {
-		return
-	}
-
 	// FIXME if we have the hash in our chain and the TD of the peer is
 	// much higher than ours, something is wrong with us or the peer.
 	// Check if the hash is on our own chain
 	if pm.chainman.HasBlock(peer.recentHash) {
 		return
 	}
-
 	// Get the hashes from the peer (synchronously)
-	err := pm.downloader.Synchronise(peer.id, peer.recentHash)
+	err := pm.downloader.Synchronize(peer.id, peer.recentHash)
 	if err != nil && err == downloader.ErrBadPeer {
 		glog.V(logger.Debug).Infoln("removed peer from peer set due to bad action")
 		pm.removePeer(peer)
-- 
GitLab