From 996f90787b5df4c9e7e348ceb27831c23e1ae866 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Tue, 16 May 2023 15:37:52 -0500
Subject: [PATCH] a new rob a day keeps the doctor away

---
 cmd/cgat/main.go                              |   2 +-
 lib/rob/schedulers/v0/job/job.go              |  19 --
 lib/rob/schedulers/v0/pool/pool.go            | 121 -----------
 lib/rob/schedulers/v0/scheduler.go            |  33 ---
 lib/rob/schedulers/v1/pool/job/base.go        |  12 ++
 lib/rob/schedulers/v1/pool/job/concurrent.go  |   6 +
 lib/rob/schedulers/v1/pool/job/stalled.go     |   8 +
 lib/rob/schedulers/v1/pool/pool.go            | 142 +++++++++++++
 .../schedulers/{v0 => v1/pool}/sink/sink.go   | 194 +++++++++---------
 lib/rob/schedulers/v1/scheduler.go            |  24 ++-
 .../schedulers/{v0 => v1}/scheduler_test.go   |   0
 .../schedulers/{v0 => v1}/source/source.go    |  27 ++-
 lib/util/maps/rwlocked.go                     |  12 --
 13 files changed, 293 insertions(+), 307 deletions(-)
 delete mode 100644 lib/rob/schedulers/v0/job/job.go
 delete mode 100644 lib/rob/schedulers/v0/pool/pool.go
 delete mode 100644 lib/rob/schedulers/v0/scheduler.go
 create mode 100644 lib/rob/schedulers/v1/pool/job/base.go
 create mode 100644 lib/rob/schedulers/v1/pool/job/concurrent.go
 create mode 100644 lib/rob/schedulers/v1/pool/job/stalled.go
 create mode 100644 lib/rob/schedulers/v1/pool/pool.go
 rename lib/rob/schedulers/{v0 => v1/pool}/sink/sink.go (53%)
 rename lib/rob/schedulers/{v0 => v1}/scheduler_test.go (100%)
 rename lib/rob/schedulers/{v0 => v1}/source/source.go (60%)

diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go
index fc3c49c8..8785296f 100644
--- a/cmd/cgat/main.go
+++ b/cmd/cgat/main.go
@@ -7,7 +7,7 @@ import (
 	_ "net/http/pprof"
 
 	"pggat2/lib/middleware/middlewares/eqp"
-	"pggat2/lib/rob/schedulers/v0"
+	"pggat2/lib/rob/schedulers/v1"
 	"pggat2/lib/zap/onebuffer"
 
 	"pggat2/lib/bouncer/backends/v0"
diff --git a/lib/rob/schedulers/v0/job/job.go b/lib/rob/schedulers/v0/job/job.go
deleted file mode 100644
index 12778185..00000000
--- a/lib/rob/schedulers/v0/job/job.go
+++ /dev/null
@@ -1,19 +0,0 @@
-package job
-
-import (
-	"github.com/google/uuid"
-
-	"pggat2/lib/rob"
-)
-
-type Concurrent struct {
-	Source      uuid.UUID
-	Constraints rob.Constraints
-	Work        any
-}
-
-type Stalled struct {
-	Source      uuid.UUID
-	Constraints rob.Constraints
-	Out         chan<- any
-}
diff --git a/lib/rob/schedulers/v0/pool/pool.go b/lib/rob/schedulers/v0/pool/pool.go
deleted file mode 100644
index 4b8635c9..00000000
--- a/lib/rob/schedulers/v0/pool/pool.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package pool
-
-import (
-	"sync"
-
-	"github.com/google/uuid"
-
-	"pggat2/lib/rob/schedulers/v0/job"
-	"pggat2/lib/rob/schedulers/v0/sink"
-	"pggat2/lib/util/maps"
-)
-
-type Pool struct {
-	affinity maps.RWLocked[uuid.UUID, uuid.UUID]
-
-	backlog []job.Stalled
-	bmu     sync.Mutex
-
-	sinks maps.RWLocked[uuid.UUID, *sink.Sink]
-}
-
-func MakePool() Pool {
-	return Pool{}
-}
-
-// DoConcurrent attempts to do the work now.
-// Returns true if the work was done, otherwise the sender should stall the work.
-func (T *Pool) DoConcurrent(j job.Concurrent) (done bool) {
-	affinity, _ := T.affinity.Load(j.Source)
-
-	// try affinity first
-	if v, ok := T.sinks.Load(affinity); ok {
-		if done, _ = v.DoConcurrent(j); done {
-			return
-		}
-	}
-
-	T.sinks.Range(func(id uuid.UUID, v *sink.Sink) bool {
-		if id == affinity {
-			return true
-		}
-		var hasMore bool
-		if done, hasMore = v.DoConcurrent(j); done {
-			// set affinity
-			T.affinity.Store(j.Source, id)
-
-			if !hasMore {
-				T.StealFor(v)
-			}
-
-			return false
-		}
-		return true
-	})
-	if done {
-		return
-	}
-
-	return false
-}
-
-// DoStalled queues a job to be done eventually
-func (T *Pool) DoStalled(j job.Stalled) {
-	affinity, _ := T.affinity.Load(j.Source)
-
-	// try affinity first
-	if v, ok := T.sinks.Load(affinity); ok {
-		if ok = v.DoStalled(j); ok {
-			return
-		}
-	}
-
-	var ok bool
-	T.sinks.Range(func(id uuid.UUID, v *sink.Sink) bool {
-		if id == affinity {
-			return true
-		}
-		if ok = v.DoStalled(j); ok {
-			// set affinity
-			T.affinity.Store(j.Source, id)
-			return false
-		}
-		return true
-	})
-	if ok {
-		return
-	}
-
-	// add to backlog
-	T.bmu.Lock()
-	defer T.bmu.Unlock()
-	T.backlog = append(T.backlog, j)
-}
-
-func (T *Pool) AddSink(s *sink.Sink) {
-	T.sinks.Store(s.ID(), s)
-
-	T.bmu.Lock()
-	defer T.bmu.Unlock()
-	i := 0
-	for _, v := range T.backlog {
-		if ok := s.DoStalled(v); !ok {
-			T.backlog[i] = v
-			i++
-		}
-	}
-	T.backlog = T.backlog[:i]
-}
-
-func (T *Pool) StealFor(q *sink.Sink) {
-	T.sinks.Range(func(_ uuid.UUID, s *sink.Sink) bool {
-		if s == q {
-			return true
-		}
-		if source, ok := s.StealFor(q); ok {
-			T.affinity.Store(source, q.ID())
-			return false
-		}
-		return true
-	})
-}
diff --git a/lib/rob/schedulers/v0/scheduler.go b/lib/rob/schedulers/v0/scheduler.go
deleted file mode 100644
index 780a7d61..00000000
--- a/lib/rob/schedulers/v0/scheduler.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package schedulers
-
-import (
-	"pggat2/lib/rob"
-	"pggat2/lib/rob/schedulers/v0/pool"
-	"pggat2/lib/rob/schedulers/v0/sink"
-	"pggat2/lib/rob/schedulers/v0/source"
-)
-
-type Scheduler struct {
-	pool pool.Pool
-}
-
-func MakeScheduler() Scheduler {
-	return Scheduler{
-		pool: pool.MakePool(),
-	}
-}
-
-func NewScheduler() *Scheduler {
-	s := MakeScheduler()
-	return &s
-}
-
-func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) {
-	T.pool.AddSink(sink.NewSink(constraints, worker))
-}
-
-func (T *Scheduler) NewSource() rob.Worker {
-	return source.NewSource(&T.pool)
-}
-
-var _ rob.Scheduler = (*Scheduler)(nil)
diff --git a/lib/rob/schedulers/v1/pool/job/base.go b/lib/rob/schedulers/v1/pool/job/base.go
new file mode 100644
index 00000000..2bffd2f8
--- /dev/null
+++ b/lib/rob/schedulers/v1/pool/job/base.go
@@ -0,0 +1,12 @@
+package job
+
+import (
+	"github.com/google/uuid"
+
+	"pggat2/lib/rob"
+)
+
+type Base struct {
+	Source      uuid.UUID
+	Constraints rob.Constraints
+}
diff --git a/lib/rob/schedulers/v1/pool/job/concurrent.go b/lib/rob/schedulers/v1/pool/job/concurrent.go
new file mode 100644
index 00000000..71c42263
--- /dev/null
+++ b/lib/rob/schedulers/v1/pool/job/concurrent.go
@@ -0,0 +1,6 @@
+package job
+
+type Concurrent struct {
+	Base
+	Work any
+}
diff --git a/lib/rob/schedulers/v1/pool/job/stalled.go b/lib/rob/schedulers/v1/pool/job/stalled.go
new file mode 100644
index 00000000..c7aa6a34
--- /dev/null
+++ b/lib/rob/schedulers/v1/pool/job/stalled.go
@@ -0,0 +1,8 @@
+package job
+
+import "github.com/google/uuid"
+
+type Stalled struct {
+	Base
+	Ready chan uuid.UUID
+}
diff --git a/lib/rob/schedulers/v1/pool/pool.go b/lib/rob/schedulers/v1/pool/pool.go
new file mode 100644
index 00000000..a01fcb8f
--- /dev/null
+++ b/lib/rob/schedulers/v1/pool/pool.go
@@ -0,0 +1,142 @@
+package pool
+
+import (
+	"sync"
+
+	"github.com/google/uuid"
+
+	"pggat2/lib/rob"
+	"pggat2/lib/rob/schedulers/v1/pool/job"
+	"pggat2/lib/rob/schedulers/v1/pool/sink"
+	"pggat2/lib/util/maps"
+)
+
+type Pool struct {
+	affinity maps.RWLocked[uuid.UUID, uuid.UUID]
+
+	// backlog should only be accessed when mu is Locked in some way or another
+	backlog []job.Stalled
+	bmu     sync.Mutex
+	sinks   map[uuid.UUID]*sink.Sink
+	mu      sync.RWMutex
+}
+
+func MakePool() Pool {
+	return Pool{
+		sinks: make(map[uuid.UUID]*sink.Sink),
+	}
+}
+
+func (T *Pool) DoConcurrent(j job.Concurrent) bool {
+	affinity, _ := T.affinity.Load(j.Source)
+
+	// these can be unlocked and locked a bunch here because it is less bad if DoConcurrent misses a sink
+	// (it will just stall the job and try again)
+	T.mu.RLock()
+
+	// try affinity first
+	if v, ok := T.sinks[affinity]; ok {
+		T.mu.RUnlock()
+		if ok = v.DoConcurrent(j); ok {
+			return true
+		}
+		T.mu.RLock()
+	}
+
+	for id, v := range T.sinks {
+		if id == affinity {
+			continue
+		}
+		T.mu.RUnlock()
+		if ok := v.DoConcurrent(j); ok {
+			// set affinity
+			T.affinity.Store(j.Source, id)
+
+			return true
+		}
+		T.mu.RLock()
+	}
+
+	T.mu.RUnlock()
+	return false
+}
+
+func (T *Pool) DoStalled(j job.Stalled) {
+	affinity, _ := T.affinity.Load(j.Source)
+
+	T.mu.RLock()
+	defer T.mu.RUnlock()
+
+	// try affinity first
+	if v, ok := T.sinks[affinity]; ok {
+		if ok = v.DoStalled(j); ok {
+			return
+		}
+	}
+
+	for id, v := range T.sinks {
+		if id == affinity {
+			continue
+		}
+
+		if ok := v.DoStalled(j); ok {
+			T.affinity.Store(j.Source, id)
+			return
+		}
+	}
+
+	// add to backlog
+	T.bmu.Lock()
+	defer T.bmu.Unlock()
+	T.backlog = append(T.backlog, j)
+}
+
+func (T *Pool) AddWorker(constraints rob.Constraints, worker rob.Worker) {
+	id := uuid.New()
+	s := sink.NewSink(id, constraints, worker)
+
+	T.mu.Lock()
+	defer T.mu.Unlock()
+	// if mu is locked, we don't need to lock bmu, because we are the only accessor
+	T.sinks[id] = s
+	i := 0
+	for _, v := range T.backlog {
+		if ok := s.DoStalled(v); !ok {
+			T.backlog[i] = v
+			i++
+		}
+	}
+	T.backlog = T.backlog[:i]
+}
+
+func (T *Pool) stealFor(id uuid.UUID) {
+	T.mu.RLock()
+
+	s := T.sinks[id]
+
+	for _, v := range T.sinks {
+		if v == s {
+			continue
+		}
+
+		T.mu.RUnlock()
+		if src := v.StealFor(s); src != uuid.Nil {
+			T.affinity.Store(src, id)
+			return
+		}
+		T.mu.RLock()
+	}
+
+	T.mu.RUnlock()
+}
+
+func (T *Pool) Do(id uuid.UUID, constraints rob.Constraints, work any) {
+	T.mu.RLock()
+	s := T.sinks[id]
+	T.mu.RUnlock()
+
+	if !s.Do(constraints, work) {
+		// try to steal
+		T.stealFor(id)
+	}
+}
diff --git a/lib/rob/schedulers/v0/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go
similarity index 53%
rename from lib/rob/schedulers/v0/sink/sink.go
rename to lib/rob/schedulers/v1/pool/sink/sink.go
index cd8abcd1..7b8a4328 100644
--- a/lib/rob/schedulers/v0/sink/sink.go
+++ b/lib/rob/schedulers/v1/pool/sink/sink.go
@@ -7,7 +7,7 @@ import (
 	"github.com/google/uuid"
 
 	"pggat2/lib/rob"
-	"pggat2/lib/rob/schedulers/v0/job"
+	"pggat2/lib/rob/schedulers/v1/pool/job"
 	"pggat2/lib/util/rbtree"
 	"pggat2/lib/util/ring"
 )
@@ -22,8 +22,7 @@ type Sink struct {
 	active uuid.UUID
 	start  time.Time
 
-	floor time.Duration
-
+	floor     time.Duration
 	stride    map[uuid.UUID]time.Duration
 	pending   map[uuid.UUID]*ring.Ring[job.Stalled]
 	scheduled rbtree.RBTree[time.Duration, job.Stalled]
@@ -31,9 +30,9 @@ type Sink struct {
 	mu sync.Mutex
 }
 
-func NewSink(constraints rob.Constraints, worker rob.Worker) *Sink {
+func NewSink(id uuid.UUID, constraints rob.Constraints, worker rob.Worker) *Sink {
 	return &Sink{
-		id:          uuid.New(),
+		id:          id,
 		constraints: constraints,
 		worker:      worker,
 
@@ -42,60 +41,38 @@ func NewSink(constraints rob.Constraints, worker rob.Worker) *Sink {
 	}
 }
 
-func (T *Sink) ID() uuid.UUID {
-	return T.id
-}
-
-func (T *Sink) setNext(source uuid.UUID) {
+func (T *Sink) setActive(source uuid.UUID) {
+	if T.active != uuid.Nil {
+		panic("set active called when another was active")
+	}
 	T.active = source
 	T.start = time.Now()
 }
 
-func (T *Sink) addStalled(j job.Stalled) {
-	if T.active != uuid.Nil {
-		// sink is in use, add to queue
-		T.scheduleStalled(j)
-	} else {
-		// sink is open, do now
-		T.setNext(j.Source)
-		j.Out <- T
+func (T *Sink) DoConcurrent(j job.Concurrent) bool {
+	if !T.constraints.Satisfies(j.Constraints) {
+		return false
 	}
-}
 
-func (T *Sink) scheduleStalled(j job.Stalled) {
-	// try to schedule right away
-	if ok := T.tryScheduleStalled(j); ok {
-		return
-	}
+	T.mu.Lock()
 
-	// add to pending queue
-	if _, ok := T.pending[j.Source]; !ok {
-		r := ring.NewRing[job.Stalled](0, 1)
-		r.PushBack(j)
-		T.pending[j.Source] = r
-		return
+	if T.active != uuid.Nil {
+		// this Sink is in use
+		T.mu.Unlock()
+		return false
 	}
 
-	T.pending[j.Source].PushBack(j)
-}
+	T.setActive(j.Source)
 
-func (T *Sink) schedulePending(source uuid.UUID) {
-	pending, ok := T.pending[source]
-	if !ok {
-		return
-	}
-	work, ok := pending.Get(0)
-	if !ok {
-		return
-	}
-	if ok = T.tryScheduleStalled(work); !ok {
-		return
-	}
-	pending.PopFront()
+	T.mu.Unlock()
+
+	T.Do(j.Constraints, j.Work)
+	return true
 }
 
-func (T *Sink) tryScheduleStalled(j job.Stalled) bool {
+func (T *Sink) trySchedule(j job.Stalled) bool {
 	if T.active == j.Source {
+		// shouldn't be scheduled yet
 		return false
 	}
 
@@ -122,75 +99,96 @@ func (T *Sink) tryScheduleStalled(j job.Stalled) bool {
 	return true
 }
 
-func (T *Sink) next() bool {
-	if T.active != uuid.Nil {
-		source := T.active
-		dur := time.Since(T.start)
-		T.active = uuid.Nil
-
-		T.stride[source] += dur
-
-		T.schedulePending(source)
+func (T *Sink) enqueue(j job.Stalled) {
+	if T.trySchedule(j) {
+		return
 	}
 
-	stride, j, ok := T.scheduled.Min()
+	p, ok := T.pending[j.Source]
+
+	// add to pending queue
 	if !ok {
-		return false
+		p = ring.NewRing[job.Stalled](0, 1)
+		T.pending[j.Source] = p
 	}
-	T.scheduled.Delete(stride)
-	T.floor = stride
 
-	T.setNext(j.Source)
-	j.Out <- T
-	return true
+	p.PushBack(j)
 }
 
-func (T *Sink) DoConcurrent(j job.Concurrent) (done, hasMore bool) {
+func (T *Sink) DoStalled(j job.Stalled) bool {
 	if !T.constraints.Satisfies(j.Constraints) {
-		return false, true
+		return false
 	}
 
+	// enqueue job
 	T.mu.Lock()
+	defer T.mu.Unlock()
 
-	if T.active != uuid.Nil {
-		T.mu.Unlock()
-		// this Sink is in use
-		return false, true
+	if T.active == uuid.Nil {
+		// run it now
+		T.setActive(j.Source)
+		j.Ready <- T.id
+		return true
 	}
 
-	T.setNext(j.Source)
-	T.mu.Unlock()
-
-	return true, T.Do(j.Constraints, j.Work)
+	// enqueue for running later
+	T.enqueue(j)
+	return true
 }
 
-func (T *Sink) DoStalled(j job.Stalled) (ok bool) {
-	if !T.constraints.Satisfies(j.Constraints) {
-		return false
+func (T *Sink) enqueueNextFor(source uuid.UUID) {
+	pending, ok := T.pending[source]
+	if !ok {
+		return
 	}
+	j, ok := pending.Get(0)
+	if !ok {
+		return
+	}
+	if ok = T.trySchedule(j); !ok {
+		return
+	}
+	pending.PopFront()
+}
 
-	T.mu.Lock()
-	defer T.mu.Unlock()
+func (T *Sink) next() bool {
+	if T.active != uuid.Nil {
+		source := T.active
+		dur := time.Since(T.start)
+		T.active = uuid.Nil
+
+		T.stride[source] += dur
+
+		T.enqueueNextFor(source)
+	}
 
-	T.addStalled(j)
+	stride, j, ok := T.scheduled.Min()
+	if !ok {
+		return false
+	}
+	T.scheduled.Delete(stride)
+	if stride > T.floor {
+		T.floor = stride
+	}
 
+	T.setActive(j.Source)
+	j.Ready <- T.id
 	return true
 }
 
-func (T *Sink) Do(constraints rob.Constraints, work any) bool {
-	if !T.constraints.Satisfies(constraints) {
-		panic("Do called on sink with non satisfied constraints")
-	}
+func (T *Sink) Do(constraints rob.Constraints, work any) (hasMore bool) {
 	T.worker.Do(constraints, work)
+
+	// queue next
 	T.mu.Lock()
 	defer T.mu.Unlock()
 	return T.next()
 }
 
-func (T *Sink) StealFor(rhs *Sink) (uuid.UUID, bool) {
+func (T *Sink) StealFor(rhs *Sink) uuid.UUID {
 	if T == rhs {
 		// cannot steal from ourselves
-		return uuid.Nil, false
+		return uuid.Nil
 	}
 
 	T.mu.Lock()
@@ -203,36 +201,28 @@ func (T *Sink) StealFor(rhs *Sink) (uuid.UUID, bool) {
 			// take jobs from T
 			T.scheduled.Delete(stride)
 
-			pending, _ := T.pending[work.Source]
-			delete(T.pending, work.Source)
+			pending, _ := T.pending[source]
+			delete(T.pending, source)
 
-			// we have to unlock to prevent deadlock
-			// if T tries to steal from rhs at the same time rhs tries to steal from T, we could deadlock otherwise
-			// (speaking from experience)
 			T.mu.Unlock()
 
-			// add to rhs
-			rhs.mu.Lock()
-
-			rhs.addStalled(work)
+			rhs.DoStalled(work)
 
 			for work, ok = pending.PopFront(); ok; work, ok = pending.PopFront() {
-				rhs.addStalled(work)
+				rhs.DoStalled(work)
 			}
 
-			rhs.mu.Unlock()
-
-			// try to return buffer to T (if fails, it's not a big deal)
-
 			T.mu.Lock()
 
-			if _, ok = T.pending[work.Source]; !ok {
-				T.pending[work.Source] = pending
+			if pending != nil {
+				if _, ok = T.pending[source]; !ok {
+					T.pending[source] = pending
+				}
 			}
 
-			return source, true
+			return source
 		}
 	}
 
-	return uuid.Nil, false
+	return uuid.Nil
 }
diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go
index 4cf12a29..bf6cb759 100644
--- a/lib/rob/schedulers/v1/scheduler.go
+++ b/lib/rob/schedulers/v1/scheduler.go
@@ -1,18 +1,32 @@
 package schedulers
 
-import "pggat2/lib/rob"
+import (
+	"pggat2/lib/rob"
+	"pggat2/lib/rob/schedulers/v1/pool"
+	"pggat2/lib/rob/schedulers/v1/source"
+)
 
 type Scheduler struct {
+	pool pool.Pool
+}
+
+func MakeScheduler() Scheduler {
+	return Scheduler{
+		pool: pool.MakePool(),
+	}
+}
+
+func NewScheduler() *Scheduler {
+	s := MakeScheduler()
+	return &s
 }
 
 func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) {
-	// TODO implement me
-	panic("implement me")
+	T.pool.AddWorker(constraints, worker)
 }
 
 func (T *Scheduler) NewSource() rob.Worker {
-	// TODO implement me
-	panic("implement me")
+	return source.NewSource(&T.pool)
 }
 
 var _ rob.Scheduler = (*Scheduler)(nil)
diff --git a/lib/rob/schedulers/v0/scheduler_test.go b/lib/rob/schedulers/v1/scheduler_test.go
similarity index 100%
rename from lib/rob/schedulers/v0/scheduler_test.go
rename to lib/rob/schedulers/v1/scheduler_test.go
diff --git a/lib/rob/schedulers/v0/source/source.go b/lib/rob/schedulers/v1/source/source.go
similarity index 60%
rename from lib/rob/schedulers/v0/source/source.go
rename to lib/rob/schedulers/v1/source/source.go
index f6c82772..373f663e 100644
--- a/lib/rob/schedulers/v0/source/source.go
+++ b/lib/rob/schedulers/v1/source/source.go
@@ -4,9 +4,8 @@ import (
 	"github.com/google/uuid"
 
 	"pggat2/lib/rob"
-	"pggat2/lib/rob/schedulers/v0/job"
-	"pggat2/lib/rob/schedulers/v0/pool"
-	"pggat2/lib/rob/schedulers/v0/sink"
+	"pggat2/lib/rob/schedulers/v1/pool"
+	"pggat2/lib/rob/schedulers/v1/pool/job"
 	"pggat2/lib/util/pools"
 )
 
@@ -14,7 +13,7 @@ type Source struct {
 	id   uuid.UUID
 	pool *pool.Pool
 
-	stall pools.Locked[chan any]
+	stall pools.Locked[chan uuid.UUID]
 }
 
 func NewSource(p *pool.Pool) *Source {
@@ -25,28 +24,28 @@ func NewSource(p *pool.Pool) *Source {
 }
 
 func (T *Source) Do(constraints rob.Constraints, work any) {
-	if T.pool.DoConcurrent(job.Concurrent{
+	base := job.Base{
 		Source:      T.id,
 		Constraints: constraints,
-		Work:        work,
+	}
+	if T.pool.DoConcurrent(job.Concurrent{
+		Base: base,
+		Work: work,
 	}) {
 		return
 	}
 	out, ok := T.stall.Get()
 	if !ok {
-		out = make(chan any)
+		out = make(chan uuid.UUID)
 	}
 	defer T.stall.Put(out)
 
 	T.pool.DoStalled(job.Stalled{
-		Source:      T.id,
-		Constraints: constraints,
-		Out:         out,
+		Base:  base,
+		Ready: out,
 	})
-	worker := (<-out).(*sink.Sink)
-	if hasMore := worker.Do(constraints, work); !hasMore {
-		T.pool.StealFor(worker)
-	}
+	worker := <-out
+	T.pool.Do(worker, constraints, work)
 }
 
 var _ rob.Worker = (*Source)(nil)
diff --git a/lib/util/maps/rwlocked.go b/lib/util/maps/rwlocked.go
index 530724a2..a51239a2 100644
--- a/lib/util/maps/rwlocked.go
+++ b/lib/util/maps/rwlocked.go
@@ -61,15 +61,3 @@ func (T *RWLocked[K, V]) Swap(key K, value V) (previous V, loaded bool) {
 	T.inner[key] = value
 	return
 }
-
-func (T *RWLocked[K, V]) Range(f func(key K, value V) bool) {
-	T.mu.RLock()
-	for key, value := range T.inner {
-		T.mu.RUnlock()
-		if !f(key, value) {
-			return
-		}
-		T.mu.RLock()
-	}
-	T.mu.RUnlock()
-}
-- 
GitLab