From f8c8753e424f1b3fda752845a17c76d038f61608 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Tue, 25 Jul 2023 19:13:31 -0600
Subject: [PATCH] sched

---
 cmd/cgat/main.go                        |  2 +-
 contrib/discovery/k8s/k8s.go            |  3 +-
 go.mod                                  |  1 -
 go.sum                                  |  1 -
 lib/gat/pools/session/pool.go           | 41 +++-----------
 lib/gat/pools/transaction/conn.go       | 15 ++---
 lib/gat/pools/transaction/pool.go       | 28 ++++------
 lib/gat/recipe.go                       | 50 +++++++++++++++--
 lib/gat/recipebook/recipebook.go        | 50 -----------------
 lib/rob/context.go                      |  6 ++
 lib/rob/scheduler.go                    |  3 +-
 lib/rob/schedulers/v1/pool/job/base.go  |  8 +--
 lib/rob/schedulers/v1/pool/pool.go      | 29 +++++++++-
 lib/rob/schedulers/v1/pool/sink/sink.go | 19 +++++--
 lib/rob/schedulers/v1/scheduler.go      |  4 ++
 lib/rob/schedulers/v1/scheduler_test.go | 73 +++++++++++++++++++++++--
 lib/rob/schedulers/v1/source/source.go  | 12 ++--
 lib/rob/worker.go                       |  2 +-
 18 files changed, 203 insertions(+), 144 deletions(-)
 delete mode 100644 lib/gat/recipebook/recipebook.go
 create mode 100644 lib/rob/context.go

diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go
index 97537366..049b9f1b 100644
--- a/cmd/cgat/main.go
+++ b/cmd/cgat/main.go
@@ -27,7 +27,7 @@ func main() {
 	// create pool
 	pool := transaction.NewPool()
 	postgres.AddPool("uniswap", pool)
-	pool.AddRecipe("localhost", gat.Recipe{
+	pool.AddRecipe("localhost", gat.TCPRecipe{
 		Database:       "uniswap",
 		Address:        "localhost:5432",
 		User:           "postgres",
diff --git a/contrib/discovery/k8s/k8s.go b/contrib/discovery/k8s/k8s.go
index 512ebf75..d1fed911 100644
--- a/contrib/discovery/k8s/k8s.go
+++ b/contrib/discovery/k8s/k8s.go
@@ -3,6 +3,7 @@ package k8s
 import (
 	"context"
 	"fmt"
+
 	"pggat2/lib/gat"
 
 	"tuxpa.in/a/zlog/log"
@@ -15,7 +16,7 @@ import (
 )
 
 type PodWatcher struct {
-	BaseRecipe gat.Recipe
+	BaseRecipe gat.TCPRecipe
 
 	Namespace   string
 	ListOptions metav1.ListOptions
diff --git a/go.mod b/go.mod
index d4e104c0..0b32b1df 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.20
 require (
 	github.com/google/uuid v1.3.0
 	github.com/xdg-go/scram v1.1.2
-	golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
 	k8s.io/api v0.27.4
 	k8s.io/apimachinery v0.27.4
 	k8s.io/client-go v0.27.4
diff --git a/go.sum b/go.sum
index 0565953b..8d7e7b62 100644
--- a/go.sum
+++ b/go.sum
@@ -284,7 +284,6 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go
index 4261756f..bccc2388 100644
--- a/lib/gat/pools/session/pool.go
+++ b/lib/gat/pools/session/pool.go
@@ -2,13 +2,11 @@ package session
 
 import (
 	"log"
-	"net"
 	"sync"
 
 	"pggat2/lib/bouncer/backends/v0"
 	"pggat2/lib/bouncer/bouncers/v2"
 	"pggat2/lib/gat"
-	"pggat2/lib/gat/recipebook"
 	"pggat2/lib/zap"
 )
 
@@ -17,15 +15,12 @@ type Pool struct {
 	queue []zap.ReadWriter
 	mu    sync.RWMutex
 
-	book *recipebook.Book
-
 	signal chan struct{}
 }
 
 func NewPool() *Pool {
 	return &Pool{
 		signal: make(chan struct{}),
-		book:   recipebook.NewBook(),
 	}
 }
 
@@ -71,47 +66,27 @@ func (T *Pool) Serve(client zap.ReadWriter) {
 }
 
 func (T *Pool) AddRecipe(name string, recipe gat.Recipe) {
-	funcCh := make(chan func())
-	if !T.book.AddIfNew(name, recipe, func() {
-		fn := <-funcCh
-		if fn != nil {
-			fn()
-		}
-	}) {
-		return
-	}
-	conns := []zap.ReadWriter{}
-	for i := 0; i < recipe.MinConnections; i++ {
-		conn, err := net.Dial("tcp", recipe.Address)
+	for i := 0; i < recipe.GetMinConnections(); i++ {
+		rw, err := recipe.Connect()
 		if err != nil {
-			_ = conn.Close()
 			// TODO(garet) do something here
-			log.Printf("Failed to connect to %s: %v", recipe.Address, err)
+			log.Printf("Failed to connect: %v", err)
 			continue
 		}
-		rw := zap.WrapIOReadWriter(conn)
-		err2 := backends.Accept(rw, recipe.User, recipe.Password, recipe.Database)
+		err2 := backends.Accept(rw, recipe.GetUser(), recipe.GetPassword(), recipe.GetDatabase())
 		if err2 != nil {
-			_ = conn.Close()
+			_ = rw.Close()
 			// TODO(garet) do something here
-			log.Printf("Failed to connect to %s: %v", recipe.Address, err2)
+			log.Printf("Failed to connect: %v", err2)
 			continue
 		}
-		conns = append(conns, rw)
 		T.release(rw)
 	}
-	fn := func() {
-		for _, v := range conns {
-			v.Close()
-		}
-	}
-	funcCh <- fn
 }
 
 func (T *Pool) RemoveRecipe(name string) {
-	if !T.book.Remove(name) {
-		return
-	}
+	panic("TODO")
+	// TODO(garet)
 }
 
 var _ gat.Pool = (*Pool)(nil)
diff --git a/lib/gat/pools/transaction/conn.go b/lib/gat/pools/transaction/conn.go
index bedcefe9..b41d3955 100644
--- a/lib/gat/pools/transaction/conn.go
+++ b/lib/gat/pools/transaction/conn.go
@@ -1,8 +1,6 @@
 package transaction
 
 import (
-	"github.com/google/uuid"
-
 	"pggat2/lib/bouncer/bouncers/v2"
 	"pggat2/lib/middleware/middlewares/eqp"
 	"pggat2/lib/middleware/middlewares/ps"
@@ -11,14 +9,12 @@ import (
 )
 
 type Conn struct {
-	pool *Pool
-	id   uuid.UUID
-	rw   zap.ReadWriter
-	eqp  *eqp.Server
-	ps   *ps.Server
+	rw  zap.ReadWriter
+	eqp *eqp.Server
+	ps  *ps.Server
 }
 
-func (T *Conn) Do(_ rob.Constraints, work any) {
+func (T *Conn) Do(ctx *rob.Context, work any) {
 	job := work.(Work)
 	job.ps.SetServer(T.ps)
 	T.eqp.SetClient(job.eqp)
@@ -27,8 +23,7 @@ func (T *Conn) Do(_ rob.Constraints, work any) {
 		_ = job.rw.Close()
 		if serverErr != nil {
 			_ = T.rw.Close()
-			T.pool.remove(T.id)
-			panic(serverErr)
+			ctx.Removed = true
 		}
 	}
 	return
diff --git a/lib/gat/pools/transaction/pool.go b/lib/gat/pools/transaction/pool.go
index 60614b12..24e08b5e 100644
--- a/lib/gat/pools/transaction/pool.go
+++ b/lib/gat/pools/transaction/pool.go
@@ -2,7 +2,6 @@ package transaction
 
 import (
 	"log"
-	"net"
 
 	"github.com/google/uuid"
 
@@ -30,15 +29,13 @@ func NewPool() *Pool {
 }
 
 func (T *Pool) AddRecipe(name string, recipe gat.Recipe) {
-	for i := 0; i < recipe.MinConnections; i++ {
-		conn, err := net.Dial("tcp", recipe.Address)
+	for i := 0; i < recipe.GetMinConnections(); i++ {
+		rw, err := recipe.Connect()
 		if err != nil {
-			_ = conn.Close()
 			// TODO(garet) do something here
-			log.Printf("Failed to connect to %s: %v", recipe.Address, err)
+			log.Printf("Failed to connect: %v", err)
 			continue
 		}
-		rw := zap.WrapIOReadWriter(conn)
 		eqps := eqp.NewServer()
 		pss := ps.NewServer()
 		mw := interceptor.NewInterceptor(
@@ -46,21 +43,19 @@ func (T *Pool) AddRecipe(name string, recipe gat.Recipe) {
 			eqps,
 			pss,
 		)
-		err2 := backends.Accept(mw, recipe.User, recipe.Password, recipe.Database)
+		err2 := backends.Accept(mw, recipe.GetUser(), recipe.GetPassword(), recipe.GetDatabase())
 		if err2 != nil {
-			_ = conn.Close()
+			_ = rw.Close()
 			// TODO(garet) do something here
-			log.Printf("Failed to connect to %s: %v", recipe.Address, err2)
+			log.Printf("Failed to connect: %v", err2)
 			continue
 		}
 		sink := &Conn{
-			pool: T,
-			rw:   mw,
-			eqp:  eqps,
-			ps:   pss,
+			rw:  mw,
+			eqp: eqps,
+			ps:  pss,
 		}
-		id := T.s.AddSink(0, sink)
-		sink.id = id
+		T.s.AddSink(0, sink)
 	}
 }
 
@@ -85,12 +80,13 @@ func (T *Pool) Serve(client zap.ReadWriter) {
 	)
 	buffer := zapbuf.NewBuffer(client)
 	defer buffer.Done()
+	var ctx rob.Context
 	for {
 		if err := buffer.Buffer(); err != nil {
 			_ = client.Close()
 			break
 		}
-		source.Do(0, Work{
+		source.Do(&ctx, Work{
 			rw:  buffer,
 			eqp: eqpc,
 			ps:  psc,
diff --git a/lib/gat/recipe.go b/lib/gat/recipe.go
index f8d14f10..62ffebfa 100644
--- a/lib/gat/recipe.go
+++ b/lib/gat/recipe.go
@@ -1,19 +1,57 @@
 package gat
 
-import "reflect"
+import (
+	"net"
 
-type Recipe struct {
-	// Connection Parameters
+	"pggat2/lib/zap"
+)
+
+type Recipe interface {
+	Connect() (zap.ReadWriter, error)
+
+	GetDatabase() string
+	GetUser() string
+	GetPassword() string
+
+	GetMinConnections() int
+	GetMaxConnections() int
+}
+
+type TCPRecipe struct {
 	Database string
 	Address  string
 	User     string
 	Password string
 
-	// Config
 	MinConnections int
 	MaxConnections int
 }
 
-func RecipesEqual(a, b Recipe) bool {
-	return reflect.DeepEqual(a, b)
+func (T TCPRecipe) Connect() (zap.ReadWriter, error) {
+	conn, err := net.Dial("tcp", T.Address)
+	if err != nil {
+		return nil, err
+	}
+	rw := zap.WrapIOReadWriter(conn)
+	return rw, nil
+}
+
+func (T TCPRecipe) GetDatabase() string {
+	return T.Database
+}
+
+func (T TCPRecipe) GetUser() string {
+	return T.User
+}
+
+func (T TCPRecipe) GetPassword() string {
+	return T.Password
+}
+
+func (T TCPRecipe) GetMinConnections() int {
+	return T.MinConnections
+}
+
+func (T TCPRecipe) GetMaxConnections() int {
+	return T.MaxConnections
 }
diff --git a/lib/gat/recipebook/recipebook.go b/lib/gat/recipebook/recipebook.go
deleted file mode 100644
index 24aaabb2..00000000
--- a/lib/gat/recipebook/recipebook.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package recipebook
-
-import (
-	"pggat2/lib/gat"
-	"sync"
-)
-
-type Entry struct {
-	r       gat.Recipe
-	onEvict func()
-}
-
-type Book struct {
-	item map[string]Entry
-	mu   sync.RWMutex
-}
-
-func NewBook() *Book {
-	return &Book{
-		item: map[string]Entry{},
-	}
-}
-
-func (b *Book) Remove(name string) (found bool) {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	val, ok := b.item[name]
-	if !ok {
-		return false
-	}
-	val.onEvict()
-	delete(b.item, name)
-	return true
-}
-
-func (b *Book) AddIfNew(name string, recipe gat.Recipe, onEvict func()) (changed bool) {
-	e := Entry{r: recipe, onEvict: onEvict}
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	val, ok := b.item[name]
-	if !ok {
-		b.item[name] = e
-		return true
-	}
-	if !gat.RecipesEqual(val.r, recipe) {
-		b.item[name] = e
-		return true
-	}
-	return false
-}
diff --git a/lib/rob/context.go b/lib/rob/context.go
new file mode 100644
index 00000000..79ba0e57
--- /dev/null
+++ b/lib/rob/context.go
@@ -0,0 +1,6 @@
+package rob
+
+type Context struct {
+	Constraints Constraints
+	Removed     bool
+}
diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go
index 69316339..208b012f 100644
--- a/lib/rob/scheduler.go
+++ b/lib/rob/scheduler.go
@@ -3,7 +3,8 @@ package rob
 import "github.com/google/uuid"
 
 type Scheduler interface {
-	AddSink(Constraints, Worker) uuid.UUID
+	AddSink(constraints Constraints, worker Worker) uuid.UUID
+	GetSink(id uuid.UUID) Worker
 	RemoveSink(id uuid.UUID)
 
 	NewSource() Worker
diff --git a/lib/rob/schedulers/v1/pool/job/base.go b/lib/rob/schedulers/v1/pool/job/base.go
index bcc7ce43..c8a292df 100644
--- a/lib/rob/schedulers/v1/pool/job/base.go
+++ b/lib/rob/schedulers/v1/pool/job/base.go
@@ -9,8 +9,8 @@ import (
 )
 
 type Base struct {
-	Created     time.Time
-	ID          uuid.UUID
-	Source      uuid.UUID
-	Constraints rob.Constraints
+	Created time.Time
+	ID      uuid.UUID
+	Source  uuid.UUID
+	Context *rob.Context
 }
diff --git a/lib/rob/schedulers/v1/pool/pool.go b/lib/rob/schedulers/v1/pool/pool.go
index 102fe2ba..58319caa 100644
--- a/lib/rob/schedulers/v1/pool/pool.go
+++ b/lib/rob/schedulers/v1/pool/pool.go
@@ -38,6 +38,10 @@ func (T *Pool) ExecuteConcurrent(j job.Concurrent) bool {
 	if v, ok := T.sinks[affinity]; ok {
 		T.mu.RUnlock()
 		if done, hasMore := v.ExecuteConcurrent(j); done {
+			if j.Context.Removed {
+				T.RemoveWorker(affinity)
+				return true
+			}
 			if !hasMore {
 				T.stealFor(affinity)
 			}
@@ -52,6 +56,11 @@ func (T *Pool) ExecuteConcurrent(j job.Concurrent) bool {
 		}
 		T.mu.RUnlock()
 		if ok, hasMore := v.ExecuteConcurrent(j); ok {
+			if j.Context.Removed {
+				T.RemoveWorker(id)
+				return true
+			}
+
 			// set affinity
 			T.affinity.Store(j.Source, id)
 
@@ -118,6 +127,16 @@ func (T *Pool) AddWorker(constraints rob.Constraints, worker rob.Worker) uuid.UU
 	return id
 }
 
+func (T *Pool) GetWorker(id uuid.UUID) rob.Worker {
+	T.mu.RLock()
+	defer T.mu.RUnlock()
+	s, ok := T.sinks[id]
+	if !ok {
+		return nil
+	}
+	return s.GetWorker()
+}
+
 func (T *Pool) RemoveWorker(id uuid.UUID) {
 	T.mu.Lock()
 	s, ok := T.sinks[id]
@@ -161,12 +180,18 @@ func (T *Pool) stealFor(id uuid.UUID) {
 	T.mu.RUnlock()
 }
 
-func (T *Pool) Execute(id uuid.UUID, constraints rob.Constraints, work any) {
+func (T *Pool) Execute(id uuid.UUID, ctx *rob.Context, work any) {
 	T.mu.RLock()
 	s := T.sinks[id]
 	T.mu.RUnlock()
 
-	if !s.Execute(constraints, work) {
+	hasMore := s.Execute(ctx, work)
+	if ctx.Removed {
+		// remove
+		T.RemoveWorker(id)
+		return
+	}
+	if !hasMore {
 		// try to steal
 		T.stealFor(id)
 	}
diff --git a/lib/rob/schedulers/v1/pool/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go
index 5686972a..2895ebdc 100644
--- a/lib/rob/schedulers/v1/pool/sink/sink.go
+++ b/lib/rob/schedulers/v1/pool/sink/sink.go
@@ -50,6 +50,10 @@ func NewSink(id uuid.UUID, constraints rob.Constraints, worker rob.Worker) *Sink
 	}
 }
 
+func (T *Sink) GetWorker() rob.Worker {
+	return T.worker
+}
+
 func (T *Sink) setActive(source uuid.UUID) {
 	if T.active != uuid.Nil {
 		panic("set active called when another was active")
@@ -61,7 +65,7 @@ func (T *Sink) setActive(source uuid.UUID) {
 }
 
 func (T *Sink) ExecuteConcurrent(j job.Concurrent) (ok, hasMore bool) {
-	if !T.constraints.Satisfies(j.Constraints) {
+	if !T.constraints.Satisfies(j.Context.Constraints) {
 		return false, false
 	}
 
@@ -77,7 +81,7 @@ func (T *Sink) ExecuteConcurrent(j job.Concurrent) (ok, hasMore bool) {
 
 	T.mu.Unlock()
 
-	return true, T.Execute(j.Constraints, j.Work)
+	return true, T.Execute(j.Context, j.Work)
 }
 
 func (T *Sink) trySchedule(j job.Stalled) bool {
@@ -131,7 +135,7 @@ func (T *Sink) enqueue(j job.Stalled) {
 }
 
 func (T *Sink) ExecuteStalled(j job.Stalled) bool {
-	if !T.constraints.Satisfies(j.Constraints) {
+	if !T.constraints.Satisfies(j.Context.Constraints) {
 		return false
 	}
 
@@ -193,8 +197,11 @@ func (T *Sink) next() bool {
 	return true
 }
 
-func (T *Sink) Execute(constraints rob.Constraints, work any) (hasMore bool) {
-	T.worker.Do(constraints, work)
+func (T *Sink) Execute(ctx *rob.Context, work any) (hasMore bool) {
+	T.worker.Do(ctx, work)
+	if ctx.Removed {
+		return false
+	}
 
 	// queue next
 	T.mu.Lock()
@@ -212,7 +219,7 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID {
 	defer T.mu.Unlock()
 
 	for stride, j, ok := T.scheduled.Min(); ok; stride, j, ok = T.scheduled.Next(stride) {
-		if rhs.constraints.Satisfies(j.Constraints) {
+		if rhs.constraints.Satisfies(j.Context.Constraints) {
 			source := j.Source
 
 			// take jobs from T
diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go
index 85d644d2..1bb79173 100644
--- a/lib/rob/schedulers/v1/scheduler.go
+++ b/lib/rob/schedulers/v1/scheduler.go
@@ -27,6 +27,10 @@ func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) uuid
 	return T.pool.AddWorker(constraints, worker)
 }
 
+func (T *Scheduler) GetSink(id uuid.UUID) rob.Worker {
+	return T.pool.GetWorker(id)
+}
+
 func (T *Scheduler) RemoveSink(id uuid.UUID) {
 	T.pool.RemoveWorker(id)
 }
diff --git a/lib/rob/schedulers/v1/scheduler_test.go b/lib/rob/schedulers/v1/scheduler_test.go
index 4f146c5b..2f135dcc 100644
--- a/lib/rob/schedulers/v1/scheduler_test.go
+++ b/lib/rob/schedulers/v1/scheduler_test.go
@@ -44,14 +44,16 @@ type TestSink struct {
 	table       *ShareTable
 	constraints rob.Constraints
 	inuse       atomic.Bool
+	remove      atomic.Bool
+	removed     atomic.Bool
 }
 
-func (T *TestSink) Do(constraints rob.Constraints, work any) {
+func (T *TestSink) Do(ctx *rob.Context, work any) {
 	if T.inuse.Swap(true) {
 		panic("Sink was already inuse")
 	}
 	defer T.inuse.Store(false)
-	if !T.constraints.Satisfies(constraints) {
+	if !T.constraints.Satisfies(ctx.Constraints) {
 		panic("Scheduler did not obey constraints")
 	}
 	v := work.(Work)
@@ -59,6 +61,13 @@ func (T *TestSink) Do(constraints rob.Constraints, work any) {
 	for time.Since(start) < v.Duration {
 	}
 	T.table.Inc(v.Sender)
+	if T.remove.Load() {
+		removed := T.removed.Swap(true)
+		if removed {
+			panic("Scheduler did not remove when requested")
+		}
+		ctx.Removed = true
+	}
 }
 
 var _ rob.Worker = (*TestSink)(nil)
@@ -70,6 +79,18 @@ func testSink(sched *Scheduler, table *ShareTable, constraints rob.Constraints)
 	})
 }
 
+func testSinkRemoveAfter(sched *Scheduler, table *ShareTable, constraints rob.Constraints, removeAfter time.Duration) uuid.UUID {
+	sink := &TestSink{
+		table:       table,
+		constraints: constraints,
+	}
+	go func() {
+		time.Sleep(removeAfter)
+		sink.remove.Store(true)
+	}()
+	return sched.AddSink(constraints, sink)
+}
+
 func testSource(sched *Scheduler, id int, dur time.Duration, constraints rob.Constraints) {
 	source := sched.NewSource()
 	for {
@@ -77,7 +98,9 @@ func testSource(sched *Scheduler, id int, dur time.Duration, constraints rob.Con
 			Sender:   id,
 			Duration: dur,
 		}
-		source.Do(constraints, w)
+		source.Do(&rob.Context{
+			Constraints: constraints,
+		}, w)
 	}
 }
 
@@ -88,7 +111,9 @@ func testStarver(sched *Scheduler, id int, dur time.Duration, constraints rob.Co
 			Sender:   id,
 			Duration: dur,
 		}
-		source.Do(constraints, w)
+		source.Do(&rob.Context{
+			Constraints: constraints,
+		}, w)
 	}
 }
 
@@ -404,7 +429,7 @@ func TestScheduler_Starve(t *testing.T) {
 	}
 }
 
-func TestScheduler_RemoveSink(t *testing.T) {
+func TestScheduler_RemoveSinkOuter(t *testing.T) {
 	var table ShareTable
 	sched := NewScheduler()
 	testSink(sched, &table, 0)
@@ -445,3 +470,41 @@ func TestScheduler_RemoveSink(t *testing.T) {
 		t.Errorf("%s", allStacks())
 	}
 }
+
+func TestScheduler_RemoveSinkInner(t *testing.T) {
+	var table ShareTable
+	sched := NewScheduler()
+	testSink(sched, &table, 0)
+	testSinkRemoveAfter(sched, &table, 0, 10*time.Second)
+
+	go testSource(sched, 0, 10*time.Millisecond, 0)
+	go testSource(sched, 1, 10*time.Millisecond, 0)
+	go testSource(sched, 2, 10*time.Millisecond, 0)
+	go testSource(sched, 3, 10*time.Millisecond, 0)
+
+	time.Sleep(20 * time.Second)
+
+	t0 := table.Get(0)
+	t1 := table.Get(1)
+	t2 := table.Get(2)
+	t3 := table.Get(3)
+
+	/*
+		Expectations:
+		- all users should get similar # of executions
+	*/
+
+	t.Log("share of 0:", t0)
+	t.Log("share of 1:", t1)
+	t.Log("share of 2:", t2)
+	t.Log("share of 3:", t3)
+
+	if !similar(t0, t1, t2, t3) {
+		t.Error("expected all shares to be similar")
+	}
+
+	if t0 == 0 {
+		t.Error("expected executions on all sources (is there a race in the balancer??)")
+		t.Errorf("%s", allStacks())
+	}
+}
diff --git a/lib/rob/schedulers/v1/source/source.go b/lib/rob/schedulers/v1/source/source.go
index 194e6d52..d36ce378 100644
--- a/lib/rob/schedulers/v1/source/source.go
+++ b/lib/rob/schedulers/v1/source/source.go
@@ -25,12 +25,12 @@ func NewSource(p *pool.Pool) *Source {
 	}
 }
 
-func (T *Source) Do(constraints rob.Constraints, work any) {
+func (T *Source) Do(ctx *rob.Context, work any) {
 	base := job.Base{
-		Created:     time.Now(),
-		ID:          uuid.New(),
-		Source:      T.id,
-		Constraints: constraints,
+		Created: time.Now(),
+		ID:      uuid.New(),
+		Source:  T.id,
+		Context: ctx,
 	}
 	if T.pool.ExecuteConcurrent(job.Concurrent{
 		Base: base,
@@ -49,7 +49,7 @@ func (T *Source) Do(constraints rob.Constraints, work any) {
 		Ready: out,
 	})
 	worker := <-out
-	T.pool.Execute(worker, constraints, work)
+	T.pool.Execute(worker, ctx, work)
 	return
 }
 
diff --git a/lib/rob/worker.go b/lib/rob/worker.go
index 82bf3868..27823975 100644
--- a/lib/rob/worker.go
+++ b/lib/rob/worker.go
@@ -1,5 +1,5 @@
 package rob
 
 type Worker interface {
-	Do(constraints Constraints, work any)
+	Do(ctx *Context, work any)
 }
-- 
GitLab