From 32bb280179a44b9ad1058766bf61cdbacea30a59 Mon Sep 17 00:00:00 2001
From: Felix Lange <fjl@twurst.com>
Date: Mon, 2 May 2016 17:01:13 +0200
Subject: [PATCH] p2p: improve readability of dial task scheduling code

---
 p2p/server.go      | 57 +++++++++++++++++++++++-----------------------
 p2p/server_test.go | 50 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 29 deletions(-)

diff --git a/p2p/server.go b/p2p/server.go
index 52d1be677..3b2f2b078 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -398,12 +398,11 @@ type dialer interface {
 func (srv *Server) run(dialstate dialer) {
 	defer srv.loopWG.Done()
 	var (
-		peers   = make(map[discover.NodeID]*Peer)
-		trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
-
-		tasks        []task
-		pendingTasks []task
+		peers        = make(map[discover.NodeID]*Peer)
+		trusted      = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
 		taskdone     = make(chan task, maxActiveDialTasks)
+		runningTasks []task
+		queuedTasks  []task // tasks that can't run yet
 	)
 	// Put trusted nodes into a map to speed up checks.
 	// Trusted peers are loaded on startup and cannot be
@@ -412,39 +411,39 @@ func (srv *Server) run(dialstate dialer) {
 		trusted[n.ID] = true
 	}
 
-	// Some task list helpers.
+	// delTask removes the first occurrence of t from runningTasks.
 	delTask := func(t task) {
-		for i := range tasks {
-			if tasks[i] == t {
-				tasks = append(tasks[:i], tasks[i+1:]...)
+		for i := range runningTasks {
+			if runningTasks[i] == t {
+				runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
 				break
 			}
 		}
 	}
-	scheduleTasks := func(new []task) {
-		pt := append(pendingTasks, new...)
-		start := maxActiveDialTasks - len(tasks)
-		if len(pt) < start {
-			start = len(pt)
+	// startTasks runs tasks from ts until maxActiveDialTasks are running and returns the rest.
+	startTasks := func(ts []task) (rest []task) {
+		i := 0
+		for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
+			t := ts[i]
+			glog.V(logger.Detail).Infoln("new task:", t)
+			go func() { t.Do(srv); taskdone <- t }()
+			runningTasks = append(runningTasks, t)
 		}
-		if start > 0 {
-			tasks = append(tasks, pt[:start]...)
-			for _, t := range pt[:start] {
-				t := t
-				glog.V(logger.Detail).Infoln("new task:", t)
-				go func() { t.Do(srv); taskdone <- t }()
-			}
-			copy(pt, pt[start:])
-			pendingTasks = pt[:len(pt)-start]
+		return ts[i:]
+	}
+	scheduleTasks := func() {
+		// Start from queue first.
+		queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
+		// Query dialer for new tasks and start as many as possible now.
+		if len(runningTasks) < maxActiveDialTasks {
+			nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
+			queuedTasks = append(queuedTasks, startTasks(nt)...)
 		}
 	}
 
 running:
 	for {
-		// Query the dialer for new tasks and launch them.
-		now := time.Now()
-		nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now)
-		scheduleTasks(nt)
+		scheduleTasks()
 
 		select {
 		case <-srv.quit:
@@ -466,7 +465,7 @@ running:
 			// can update its state and remove it from the active
 			// tasks list.
 			glog.V(logger.Detail).Infoln("<-taskdone:", t)
-			dialstate.taskDone(t, now)
+			dialstate.taskDone(t, time.Now())
 			delTask(t)
 		case c := <-srv.posthandshake:
 			// A connection has passed the encryption handshake so
@@ -513,7 +512,7 @@ running:
 	// Wait for peers to shut down. Pending connections and tasks are
 	// not handled here and will terminate soon-ish because srv.quit
 	// is closed.
-	glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks))
+	glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(runningTasks))
 	for len(peers) > 0 {
 		p := <-srv.delpeer
 		glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p)
diff --git a/p2p/server_test.go b/p2p/server_test.go
index 02d1c8e01..b437ac367 100644
--- a/p2p/server_test.go
+++ b/p2p/server_test.go
@@ -235,6 +235,56 @@ func TestServerTaskScheduling(t *testing.T) {
 	}
 }
 
+// TestServerManyTasks checks that Server doesn't drop tasks,
+// even if newTasks returns more than the maximum number of tasks.
+func TestServerManyTasks(t *testing.T) {
+	alltasks := make([]task, 300)
+	for i := range alltasks {
+		alltasks[i] = &testTask{index: i}
+	}
+
+	var (
+		srv        = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true}
+		done       = make(chan *testTask)
+		start, end = 0, 0 // bounds of the alltasks window handed out by the last newFunc call
+	)
+	defer srv.Stop()
+	srv.loopWG.Add(1) // srv.run calls loopWG.Done when it returns
+	go srv.run(taskgen{
+		newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
+			start, end = end, end+maxActiveDialTasks+10 // deliberately return more tasks than may run at once
+			if end > len(alltasks) {
+				end = len(alltasks)
+			}
+			return alltasks[start:end]
+		},
+		doneFunc: func(tt task) {
+			done <- tt.(*testTask)
+		},
+	})
+
+	doneset := make(map[int]bool) // indexes of tasks reported through doneFunc
+	timeout := time.After(2 * time.Second)
+	for len(doneset) < len(alltasks) {
+		select {
+		case tt := <-done:
+			if doneset[tt.index] {
+				t.Errorf("task %d got done more than once", tt.index)
+			} else {
+				doneset[tt.index] = true
+			}
+		case <-timeout:
+			t.Errorf("%d of %d tasks got done within 2s", len(doneset), len(alltasks))
+			for i := 0; i < len(alltasks); i++ { // list every task that was never reported done
+				if !doneset[i] {
+					t.Logf("task %d not done", i)
+				}
+			}
+			return
+		}
+	}
+}
+
 type taskgen struct {
 	newFunc  func(running int, peers map[discover.NodeID]*Peer) []task
 	doneFunc func(task)
-- 
GitLab