From 19beb79f27200b9efdea8c04b8c0caa3bfced908 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Tue, 18 Jul 2023 19:17:06 -0500
Subject: [PATCH] removable

---
 cmd/cgat/main.go                        |  1 +
 lib/rob/scheduler.go                    |  5 ++-
 lib/rob/schedulers/v1/pool/pool.go      | 48 ++++++++++++++++++-------
 lib/rob/schedulers/v1/pool/sink/sink.go | 40 +++++++++++++++++----
 lib/rob/schedulers/v1/scheduler.go      | 10 ++++--
 lib/rob/schedulers/v1/scheduler_test.go | 48 +++++++++++++++++++++++--
 lib/rob/schedulers/v1/source/source.go  |  7 ++--
 7 files changed, 133 insertions(+), 26 deletions(-)

diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go
index 76671d6e..16ca9b11 100644
--- a/cmd/cgat/main.go
+++ b/cmd/cgat/main.go
@@ -35,6 +35,7 @@ func (T server) Do(_ rob.Constraints, w any) {
 	job.psc.SetServer(T.pss)
 	T.eqps.SetClient(job.eqpc)
 	bouncers.Bounce(job.rw, T.rw)
+	return
 }
 
 var _ rob.Worker = server{}
diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go
index dc19f198..fb621e9b 100644
--- a/lib/rob/scheduler.go
+++ b/lib/rob/scheduler.go
@@ -1,7 +1,10 @@
 package rob
 
+import "github.com/google/uuid"
+
 type Scheduler interface {
-	AddSink(Constraints, Worker)
+	AddSink(Constraints, Worker) uuid.UUID
+	RemoveSink(id uuid.UUID)
 
 	NewSource() Worker
 }
diff --git a/lib/rob/schedulers/v1/pool/pool.go b/lib/rob/schedulers/v1/pool/pool.go
index e7186643..d3172dd4 100644
--- a/lib/rob/schedulers/v1/pool/pool.go
+++ b/lib/rob/schedulers/v1/pool/pool.go
@@ -27,17 +27,17 @@ func MakePool() Pool {
 	}
 }
 
-func (T *Pool) DoConcurrent(j job.Concurrent) bool {
+func (T *Pool) ExecuteConcurrent(j job.Concurrent) bool {
 	affinity, _ := T.affinity.Load(j.Source)
 
-	// these can be unlocked and locked a bunch here because it is less bad if DoConcurrent misses a sink
+	// these can be unlocked and locked a bunch here because it is less bad if ExecuteConcurrent misses a sink
 	// (it will just stall the job and try again)
 	T.mu.RLock()
 
 	// try affinity first
 	if v, ok := T.sinks[affinity]; ok {
 		T.mu.RUnlock()
-		if done, hasMore := v.DoConcurrent(j); done {
+		if done, hasMore := v.ExecuteConcurrent(j); done {
 			if !hasMore {
 				T.stealFor(affinity)
 			}
@@ -51,7 +51,7 @@ func (T *Pool) DoConcurrent(j job.Concurrent) bool {
 			continue
 		}
 		T.mu.RUnlock()
-		if ok, hasMore := v.DoConcurrent(j); ok {
+		if ok, hasMore := v.ExecuteConcurrent(j); ok {
 			// set affinity
 			T.affinity.Store(j.Source, id)
 
@@ -68,7 +68,7 @@ func (T *Pool) DoConcurrent(j job.Concurrent) bool {
 	return false
 }
 
-func (T *Pool) DoStalled(j job.Stalled) {
+func (T *Pool) ExecuteStalled(j job.Stalled) {
 	affinity, _ := T.affinity.Load(j.Source)
 
 	T.mu.RLock()
@@ -76,7 +76,7 @@ func (T *Pool) DoStalled(j job.Stalled) {
 
 	// try affinity first
 	if v, ok := T.sinks[affinity]; ok {
-		if ok = v.DoStalled(j); ok {
+		if ok = v.ExecuteStalled(j); ok {
 			return
 		}
 	}
@@ -86,7 +86,7 @@ func (T *Pool) DoStalled(j job.Stalled) {
 			continue
 		}
 
-		if ok := v.DoStalled(j); ok {
+		if ok := v.ExecuteStalled(j); ok {
 			T.affinity.Store(j.Source, id)
 			return
 		}
@@ -98,7 +98,7 @@ func (T *Pool) DoStalled(j job.Stalled) {
 	T.backlog = append(T.backlog, j)
 }
 
-func (T *Pool) AddWorker(constraints rob.Constraints, worker rob.Worker) {
+func (T *Pool) AddWorker(constraints rob.Constraints, worker rob.Worker) uuid.UUID {
 	id := uuid.New()
 	s := sink.NewSink(id, constraints, worker)
 
@@ -108,18 +108,42 @@ func (T *Pool) AddWorker(constraints rob.Constraints, worker rob.Worker) {
 	T.sinks[id] = s
 	i := 0
 	for _, v := range T.backlog {
-		if ok := s.DoStalled(v); !ok {
+		if ok := s.ExecuteStalled(v); !ok {
 			T.backlog[i] = v
 			i++
 		}
 	}
 	T.backlog = T.backlog[:i]
+
+	return id
+}
+
+func (T *Pool) RemoveWorker(id uuid.UUID) {
+	T.mu.Lock()
+	s, ok := T.sinks[id]
+	if !ok {
+		T.mu.Unlock()
+		return
+	}
+	delete(T.sinks, id)
+	T.mu.Unlock()
+
+	// now we need to reschedule all the work that was scheduled to s (stalled only).
+	jobs := s.StealAll()
+
+	for _, j := range jobs {
+		T.ExecuteStalled(j)
+	}
 }
 
 func (T *Pool) stealFor(id uuid.UUID) {
 	T.mu.RLock()
 
-	s := T.sinks[id]
+	s, ok := T.sinks[id]
+	if !ok {
+		T.mu.RUnlock()
+		return
+	}
 
 	for _, v := range T.sinks {
 		if v == s {
@@ -137,12 +161,12 @@ func (T *Pool) stealFor(id uuid.UUID) {
 	T.mu.RUnlock()
 }
 
-func (T *Pool) Do(id uuid.UUID, constraints rob.Constraints, work any) {
+func (T *Pool) Execute(id uuid.UUID, constraints rob.Constraints, work any) {
 	T.mu.RLock()
 	s := T.sinks[id]
 	T.mu.RUnlock()
 
-	if !s.Do(constraints, work) {
+	if !s.Execute(constraints, work) {
 		// try to steal
 		T.stealFor(id)
 	}
diff --git a/lib/rob/schedulers/v1/pool/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go
index 52b67bdb..32fa8045 100644
--- a/lib/rob/schedulers/v1/pool/sink/sink.go
+++ b/lib/rob/schedulers/v1/pool/sink/sink.go
@@ -49,7 +49,7 @@ func (T *Sink) setActive(source uuid.UUID) {
 	T.start = time.Now()
 }
 
-func (T *Sink) DoConcurrent(j job.Concurrent) (ok, hasMore bool) {
+func (T *Sink) ExecuteConcurrent(j job.Concurrent) (ok, hasMore bool) {
 	if !T.constraints.Satisfies(j.Constraints) {
 		return false, false
 	}
@@ -66,7 +66,7 @@ func (T *Sink) DoConcurrent(j job.Concurrent) (ok, hasMore bool) {
 
 	T.mu.Unlock()
 
-	return true, T.Do(j.Constraints, j.Work)
+	return true, T.Execute(j.Constraints, j.Work)
 }
 
 func (T *Sink) trySchedule(j job.Stalled) bool {
@@ -119,7 +119,7 @@ func (T *Sink) enqueue(j job.Stalled) {
 	p.PushBack(j)
 }
 
-func (T *Sink) DoStalled(j job.Stalled) bool {
+func (T *Sink) ExecuteStalled(j job.Stalled) bool {
 	if !T.constraints.Satisfies(j.Constraints) {
 		return false
 	}
@@ -180,7 +180,7 @@ func (T *Sink) next() bool {
 	return true
 }
 
-func (T *Sink) Do(constraints rob.Constraints, work any) (hasMore bool) {
+func (T *Sink) Execute(constraints rob.Constraints, work any) (hasMore bool) {
 	T.worker.Do(constraints, work)
 
 	// queue next
@@ -210,10 +210,10 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID {
 
 			T.mu.Unlock()
 
-			rhs.DoStalled(j)
+			rhs.ExecuteStalled(j)
 
 			for j, ok = pending.PopFront(); ok; j, ok = pending.PopFront() {
-				rhs.DoStalled(j)
+				rhs.ExecuteStalled(j)
 			}
 
 			T.mu.Lock()
@@ -230,3 +230,31 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID {
 
 	return uuid.Nil
 }
+
+func (T *Sink) StealAll() []job.Stalled {
+	var all []job.Stalled
+
+	T.mu.Lock()
+	defer T.mu.Unlock()
+
+	for {
+		if k, j, ok := T.scheduled.Min(); ok {
+			T.scheduled.Delete(k)
+			all = append(all, j)
+		} else {
+			break
+		}
+	}
+
+	for _, value := range T.pending {
+		for {
+			if j, ok := value.PopFront(); ok {
+				all = append(all, j)
+			} else {
+				break
+			}
+		}
+	}
+
+	return all
+}
diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go
index bf6cb759..3087da87 100644
--- a/lib/rob/schedulers/v1/scheduler.go
+++ b/lib/rob/schedulers/v1/scheduler.go
@@ -1,6 +1,8 @@
 package schedulers
 
 import (
+	"github.com/google/uuid"
+
 	"pggat2/lib/rob"
 	"pggat2/lib/rob/schedulers/v1/pool"
 	"pggat2/lib/rob/schedulers/v1/source"
@@ -21,8 +23,12 @@ func NewScheduler() *Scheduler {
 	return &s
 }
 
-func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) {
-	T.pool.AddWorker(constraints, worker)
+func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) uuid.UUID {
+	return T.pool.AddWorker(constraints, worker)
+}
+
+func (T *Scheduler) RemoveSink(id uuid.UUID) {
+	T.pool.RemoveWorker(id)
 }
 
 func (T *Scheduler) NewSource() rob.Worker {
diff --git a/lib/rob/schedulers/v1/scheduler_test.go b/lib/rob/schedulers/v1/scheduler_test.go
index da5f41a7..4f146c5b 100644
--- a/lib/rob/schedulers/v1/scheduler_test.go
+++ b/lib/rob/schedulers/v1/scheduler_test.go
@@ -7,6 +7,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
+
 	"pggat2/lib/rob"
 )
 
@@ -61,8 +63,8 @@ func (T *TestSink) Do(constraints rob.Constraints, work any) {
 
 var _ rob.Worker = (*TestSink)(nil)
 
-func testSink(sched *Scheduler, table *ShareTable, constraints rob.Constraints) {
-	sched.AddSink(constraints, &TestSink{
+func testSink(sched *Scheduler, table *ShareTable, constraints rob.Constraints) uuid.UUID {
+	return sched.AddSink(constraints, &TestSink{
 		table:       table,
 		constraints: constraints,
 	})
@@ -401,3 +403,45 @@ func TestScheduler_Starve(t *testing.T) {
 		t.Error("expected all executions to be similar (is 0 starving?)")
 	}
 }
+
+func TestScheduler_RemoveSink(t *testing.T) {
+	var table ShareTable
+	sched := NewScheduler()
+	testSink(sched, &table, 0)
+	toRemove := testSink(sched, &table, 0)
+
+	go testSource(sched, 0, 10*time.Millisecond, 0)
+	go testSource(sched, 1, 10*time.Millisecond, 0)
+	go testSource(sched, 2, 10*time.Millisecond, 0)
+	go testSource(sched, 3, 10*time.Millisecond, 0)
+
+	time.Sleep(10 * time.Second)
+
+	sched.RemoveSink(toRemove)
+
+	time.Sleep(10 * time.Second)
+
+	t0 := table.Get(0)
+	t1 := table.Get(1)
+	t2 := table.Get(2)
+	t3 := table.Get(3)
+
+	/*
+		Expectations:
+		- all users should get similar # of executions
+	*/
+
+	t.Log("share of 0:", t0)
+	t.Log("share of 1:", t1)
+	t.Log("share of 2:", t2)
+	t.Log("share of 3:", t3)
+
+	if !similar(t0, t1, t2, t3) {
+		t.Error("expected all shares to be similar")
+	}
+
+	if t0 == 0 {
+		t.Error("expected executions on all sources (is there a race in the balancer??)")
+		t.Errorf("%s", allStacks())
+	}
+}
diff --git a/lib/rob/schedulers/v1/source/source.go b/lib/rob/schedulers/v1/source/source.go
index da6db406..8eda121c 100644
--- a/lib/rob/schedulers/v1/source/source.go
+++ b/lib/rob/schedulers/v1/source/source.go
@@ -28,7 +28,7 @@ func (T *Source) Do(constraints rob.Constraints, work any) {
 		Source:      T.id,
 		Constraints: constraints,
 	}
-	if T.pool.DoConcurrent(job.Concurrent{
+	if T.pool.ExecuteConcurrent(job.Concurrent{
 		Base: base,
 		Work: work,
 	}) {
@@ -40,12 +40,13 @@ func (T *Source) Do(constraints rob.Constraints, work any) {
 	}
 	defer T.stall.Put(out)
 
-	T.pool.DoStalled(job.Stalled{
+	T.pool.ExecuteStalled(job.Stalled{
 		Base:  base,
 		Ready: out,
 	})
 	worker := <-out
-	T.pool.Do(worker, constraints, work)
+	T.pool.Execute(worker, constraints, work)
+	return
 }
 
 var _ rob.Worker = (*Source)(nil)
-- 
GitLab