From 272f546bc0f9104bdecb48209c58be89dcb7c6cb Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Sat, 16 Sep 2023 01:13:42 -0500
Subject: [PATCH] maybe

---
 lib/gat/pool/pool.go               |  2 +-
 lib/rob/schedulers/v2/scheduler.go | 13 ++++++---
 lib/rob/schedulers/v2/sink/sink.go | 47 ++++++++++++++++--------------
 3 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go
index ff18e1f4..e31d69fd 100644
--- a/lib/gat/pool/pool.go
+++ b/lib/gat/pool/pool.go
@@ -289,7 +289,7 @@ func (T *Pool) acquireServer(client *Client) *Server {
 						return
 					default:
 					}
-					log.Printf("still waiting after %d in pool %p", start, T)
+					log.Printf("still waiting after %d in pool %p", time.Since(start), T)
 				}
 			}()
 			serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking)
diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go
index 23421be1..46090559 100644
--- a/lib/rob/schedulers/v2/scheduler.go
+++ b/lib/rob/schedulers/v2/scheduler.go
@@ -36,11 +36,16 @@ func (T *Scheduler) NewWorker() uuid.UUID {
 	}
 	T.sinks[worker] = s
 
-	if len(T.backlog) > 0 {
-		for _, v := range T.backlog {
-			s.Enqueue(v)
+	if func() bool {
+		T.bmu.Lock()
+		defer T.bmu.Unlock()
+		if len(T.backlog) > 0 {
+			s.Enqueue(T.backlog...)
+			T.backlog = T.backlog[:0]
+			return true
 		}
-		T.backlog = T.backlog[:0]
+		return false
+	}() {
 		return worker
 	}
 
diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go
index 70e34356..8445c6d2 100644
--- a/lib/rob/schedulers/v2/sink/sink.go
+++ b/lib/rob/schedulers/v2/sink/sink.go
@@ -50,28 +50,35 @@ func (T *Sink) schedule(j job.Stalled) bool {
 		T.stride[j.User] = stride
 	} else if stride < T.floor {
 		stride = T.floor
-		if T.stride == nil {
-			T.stride = make(map[uuid.UUID]time.Duration)
-		}
 		T.stride[j.User] = stride
 	}
 
 	for {
 		// find unique stride to schedule on
-		if s, ok := T.scheduled.Get(stride); ok {
-			if s.User == j.User {
-				return false
-			}
-			stride += 1
-			continue
+		s, ok := T.scheduled.Get(stride)
+		if !ok {
+			break
 		}
 
-		T.scheduled.Set(stride, j)
-		return true
+		if s.User == j.User {
+			return false
+		}
+		stride += 1
+		continue
 	}
+
+	T.scheduled.Set(stride, j)
+	return true
 }
 
 func (T *Sink) enqueue(j job.Stalled) {
+	if T.active == uuid.Nil {
+		// run it now
+		T.acquire(j.User)
+		j.Ready <- T.id
+		return
+	}
+
 	if T.schedule(j) {
 		return
 	}
@@ -90,20 +97,14 @@ func (T *Sink) enqueue(j job.Stalled) {
 	p.PushBack(j)
 }
 
-func (T *Sink) Enqueue(j job.Stalled) {
+func (T *Sink) Enqueue(j ...job.Stalled) {
 	// enqueue job
 	T.mu.Lock()
 	defer T.mu.Unlock()
 
-	if T.active == uuid.Nil {
-		// run it now
-		T.acquire(j.User)
-		j.Ready <- T.id
-		return
+	for _, jj := range j {
+		T.enqueue(jj)
 	}
-
-	// enqueue for later
-	T.enqueue(j)
 }
 
 func (T *Sink) acquire(user uuid.UUID) {
@@ -229,11 +230,13 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID {
 
 	T.mu.Unlock()
 
-	rhs.Enqueue(j)
+	rhs.mu.Lock()
+	defer rhs.mu.Unlock()
+	rhs.enqueue(j)
 
 	if pending != nil {
 		for j, ok = pending.PopFront(); ok; j, ok = pending.PopFront() {
-			rhs.Enqueue(j)
+			rhs.enqueue(j)
 		}
 	}
 
-- 
GitLab