From 5043c431da12d04d659de362aa858ec764b8d23d Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Tue, 25 Jul 2023 22:27:49 -0600
Subject: [PATCH] very bad experimental auto scaling

---
 cmd/cgat/main.go                        |   2 +-
 lib/gat/pools/transaction/pool.go       | 101 ++++++++++++++++++------
 lib/rob/scheduler.go                    |   7 +-
 lib/rob/schedulers/v1/pool/pool.go      |   6 ++
 lib/rob/schedulers/v1/scheduler.go      |  10 ++-
 lib/rob/schedulers/v1/scheduler_test.go |   6 +-
 6 files changed, 98 insertions(+), 34 deletions(-)

diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go
index 049b9f1b..f8965cc0 100644
--- a/cmd/cgat/main.go
+++ b/cmd/cgat/main.go
@@ -32,7 +32,7 @@ func main() {
 		Address:        "localhost:5432",
 		User:           "postgres",
 		Password:       "password",
-		MinConnections: 5,
+		MinConnections: 0,
 		MaxConnections: 5,
 	})
 
diff --git a/lib/gat/pools/transaction/pool.go b/lib/gat/pools/transaction/pool.go
index 215e74aa..92c56e43 100644
--- a/lib/gat/pools/transaction/pool.go
+++ b/lib/gat/pools/transaction/pool.go
@@ -2,6 +2,7 @@ package transaction
 
 import (
 	"log"
+	"sync"
 
 	"github.com/google/uuid"
 
@@ -12,7 +13,6 @@ import (
 	"pggat2/lib/middleware/middlewares/ps"
 	"pggat2/lib/rob"
 	"pggat2/lib/rob/schedulers/v1"
-	"pggat2/lib/util/maps"
 	"pggat2/lib/util/maths"
 	"pggat2/lib/zap"
 	"pggat2/lib/zap/zapbuf"
@@ -21,7 +21,8 @@ import (
 type Pool struct {
 	s schedulers.Scheduler
 
-	recipes maps.RWLocked[string, *Recipe]
+	recipes map[string]*Recipe
+	mu      sync.RWMutex
 }
 
 func NewPool() *Pool {
@@ -32,12 +33,16 @@ func NewPool() *Pool {
 	return pool
 }
 
-func (T *Pool) openOne(r *Recipe) {
+func (T *Pool) scaleUpRecipe(r *Recipe) bool {
+	if len(r.open) >= r.recipe.GetMaxConnections() {
+		return false
+	}
+
 	rw, err := r.recipe.Connect()
 	if err != nil {
 		// TODO(garet) do something here
 		log.Printf("Failed to connect: %v", err)
-		return
+		return false
 	}
 	eqps := eqp.NewServer()
 	pss := ps.NewServer()
@@ -51,33 +56,39 @@ func (T *Pool) openOne(r *Recipe) {
 		_ = rw.Close()
 		// TODO(garet) do something here
 		log.Printf("Failed to connect: %v", err2)
-		return
+		return false
 	}
 	sink := &Conn{
 		rw:  mw,
 		eqp: eqps,
 		ps:  pss,
 	}
-	id := T.s.AddSink(0, sink)
+	id := T.s.AddWorker(0, sink)
 	r.open = append(r.open, id)
+	return true
 }
 
-func (T *Pool) closeOne(r *Recipe) {
+func (T *Pool) scaleDownRecipe(r *Recipe) bool {
+	if len(r.open) <= r.recipe.GetMinConnections() {
+		return false
+	}
+
 	if len(r.open) == 0 {
 		// none to close
-		return
+		return false
 	}
 
 	id := r.open[len(r.open)-1]
-	conn := T.s.RemoveSink(id).(*Conn)
+	conn := T.s.RemoveWorker(id).(*Conn)
 	_ = conn.rw.Close()
 	r.open = r.open[:len(r.open)-1]
+	return true
 }
 
-func (T *Pool) openCount(r *Recipe) int {
+func (T *Pool) recipeOpenCount(r *Recipe) int {
 	j := 0
 	for i := 0; i < len(r.open); i++ {
-		if T.s.GetSink(r.open[i]) != nil {
+		if T.s.GetWorker(r.open[i]) != nil {
 			r.open[j] = r.open[i]
 			j++
 		}
@@ -87,27 +98,45 @@ func (T *Pool) openCount(r *Recipe) int {
 	return j
 }
 
-func (T *Pool) scale(r *Recipe, target int) {
-	target = maths.Clamp(target, r.recipe.GetMinConnections(), r.recipe.GetMaxConnections())
-
-	target -= T.openCount(r)
-
-	if target > 0 {
-		for i := 0; i < target; i++ {
-			T.openOne(r)
-		}
-	} else if target < 0 {
-		for i := 0; i > target; i-- {
-			T.closeOne(r)
-		}
-	}
+// scaleRecipe opens (target > 0) or closes (target < 0) up to |target|
+// connections of r, stopping early once scaleUpRecipe or scaleDownRecipe
+// reports that no further step is possible. It returns the part of target
+// that was not applied; 0 means the whole request was satisfied. The
+// caller is expected to hold r.mu.
+func (T *Pool) scaleRecipe(r *Recipe, target int) (remaining int) {
+	// target is its own loop counter: stepping a separate index as well
+	// would meet it halfway and apply only about half of the request.
+	for target > 0 {
+		if !T.scaleUpRecipe(r) {
+			break
+		}
+		target--
+	}
+	for target < 0 {
+		if !T.scaleDownRecipe(r) {
+			break
+		}
+		target++
+	}
+	return target
+}
+
+// scaleRecipeTo scales r towards an absolute connection count: target is
+// clamped to the recipe's min/max and compared with the connections still
+// open. It reports whether the clamped target was reached.
+func (T *Pool) scaleRecipeTo(r *Recipe, target int) bool {
+	target = maths.Clamp(target, r.recipe.GetMinConnections(), r.recipe.GetMaxConnections())
+	target -= T.recipeOpenCount(r)
+	return T.scaleRecipe(r, target) == 0
 }
 
 func (T *Pool) addRecipe(r *Recipe) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
-	T.scale(r, 0)
+	// Go through scaleRecipeTo so the 0 is clamped up to the minimum
+	// connection count; scaleRecipe(r, 0) would be a no-op.
+	T.scaleRecipeTo(r, 0)
 }
 
 func (T *Pool) removeRecipe(r *Recipe) {
@@ -115,28 +135,73 @@ func (T *Pool) removeRecipe(r *Recipe) {
 	defer r.mu.Unlock()
 
 	for len(r.open) > 0 {
-		T.closeOne(r)
+		// Close unconditionally: scaleDownRecipe refuses to go below
+		// GetMinConnections, which would spin this loop forever.
+		id := r.open[len(r.open)-1]
+		r.open = r.open[:len(r.open)-1]
+		// The worker may already be gone from the scheduler (see remove).
+		if conn, ok := T.s.RemoveWorker(id).(*Conn); ok {
+			_ = conn.rw.Close()
+		}
 	}
 }
 
+// AddRecipe registers recipe under name, replacing and closing any recipe
+// previously stored under that name.
 func (T *Pool) AddRecipe(name string, recipe gat.Recipe) {
 	r := NewRecipe(recipe)
 	T.addRecipe(r)
-	if old, ok := T.recipes.Swap(name, r); ok {
+	T.mu.Lock()
+	old, ok := T.recipes[name]
+	if T.recipes == nil {
+		T.recipes = make(map[string]*Recipe)
+	}
+	T.recipes[name] = r
+	T.mu.Unlock()
+	if ok {
 		T.removeRecipe(old)
 	}
 }
 
 func (T *Pool) remove(id uuid.UUID) {
-	T.s.RemoveSink(id)
+	T.s.RemoveWorker(id)
 }
 
+// RemoveRecipe forgets the recipe stored under name, if any, and closes its
+// connections.
 func (T *Pool) RemoveRecipe(name string) {
-	if r, ok := T.recipes.LoadAndDelete(name); ok {
+	T.mu.Lock()
+	r, ok := T.recipes[name]
+	// Drop the entry so scale no longer sees the recipe.
+	delete(T.recipes, name)
+	T.mu.Unlock()
+	if ok {
 		T.removeRecipe(r)
 	}
 }
 
+// scale applies amount (positive opens, negative closes connections) across
+// the registered recipes until it is used up, and reports whether all of it
+// was applied. T.mu is released around each scaleRecipe call and re-taken
+// before the map iteration advances.
+func (T *Pool) scale(amount int) bool {
+	T.mu.RLock()
+	for _, r := range T.recipes {
+		T.mu.RUnlock()
+		// r.open is guarded by r.mu everywhere else (addRecipe, removeRecipe).
+		r.mu.Lock()
+		amount = T.scaleRecipe(r, amount)
+		r.mu.Unlock()
+		T.mu.RLock()
+		if amount == 0 {
+			break
+		}
+	}
+	T.mu.RUnlock()
+
+	return amount == 0
+}
+
 func (T *Pool) Serve(client zap.ReadWriter) {
 	source := T.s.NewSource()
 	eqpc := eqp.NewClient()
@@ -152,7 +197,6 @@ func (T *Pool) Serve(client zap.ReadWriter) {
 	var ctx rob.Context
 	for {
 		if err := buffer.Buffer(); err != nil {
-			_ = client.Close()
 			break
 		}
 		source.Do(&ctx, Work{
@@ -161,10 +205,19 @@ func (T *Pool) Serve(client zap.ReadWriter) {
 			ps:  psc,
 		})
 	}
+	_ = client.Close()
 }
 
 func (T *Pool) ReadSchedulerMetrics(metrics *rob.Metrics) {
 	T.s.ReadMetrics(metrics)
+	avgUtil := metrics.AverageWorkerUtilization()
+	workerCount := len(metrics.Workers)
+	if avgUtil > 0.7 {
+		T.scale(1)
+	}
+	if avgUtil < 0.7 && ((float64(workerCount)*avgUtil)/(float64(workerCount-1)*100.0) < 0.7 || avgUtil == 0) {
+		T.scale(-1)
+	}
 }
 
 var _ gat.Pool = (*Pool)(nil)
diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go
index 0dd77cba..22f6c2c9 100644
--- a/lib/rob/scheduler.go
+++ b/lib/rob/scheduler.go
@@ -3,9 +3,10 @@ package rob
 import "github.com/google/uuid"
 
 type Scheduler interface {
-	AddSink(constraints Constraints, worker Worker) uuid.UUID
-	GetSink(id uuid.UUID) Worker
-	RemoveSink(id uuid.UUID) Worker
+	AddWorker(constraints Constraints, worker Worker) uuid.UUID
+	GetWorker(id uuid.UUID) Worker
+	RemoveWorker(id uuid.UUID) Worker
+	WorkerCount() int
 
 	NewSource() Worker
 
diff --git a/lib/rob/schedulers/v1/pool/pool.go b/lib/rob/schedulers/v1/pool/pool.go
index 6fb64868..42200abb 100644
--- a/lib/rob/schedulers/v1/pool/pool.go
+++ b/lib/rob/schedulers/v1/pool/pool.go
@@ -157,6 +157,12 @@ func (T *Pool) RemoveWorker(id uuid.UUID) rob.Worker {
 	return s.GetWorker()
 }
 
+func (T *Pool) WorkerCount() int {
+	T.mu.RLock()
+	defer T.mu.RUnlock()
+	return len(T.sinks)
+}
+
 func (T *Pool) stealFor(id uuid.UUID) {
 	T.mu.RLock()
 
diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go
index a59fb480..608704d8 100644
--- a/lib/rob/schedulers/v1/scheduler.go
+++ b/lib/rob/schedulers/v1/scheduler.go
@@ -23,18 +23,22 @@ func NewScheduler() *Scheduler {
 	return &s
 }
 
-func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) uuid.UUID {
+func (T *Scheduler) AddWorker(constraints rob.Constraints, worker rob.Worker) uuid.UUID {
 	return T.pool.AddWorker(constraints, worker)
 }
 
-func (T *Scheduler) GetSink(id uuid.UUID) rob.Worker {
+func (T *Scheduler) GetWorker(id uuid.UUID) rob.Worker {
 	return T.pool.GetWorker(id)
 }
 
-func (T *Scheduler) RemoveSink(id uuid.UUID) rob.Worker {
+func (T *Scheduler) RemoveWorker(id uuid.UUID) rob.Worker {
 	return T.pool.RemoveWorker(id)
 }
 
+func (T *Scheduler) WorkerCount() int {
+	return T.pool.WorkerCount()
+}
+
 func (T *Scheduler) NewSource() rob.Worker {
 	return source.NewSource(&T.pool)
 }
diff --git a/lib/rob/schedulers/v1/scheduler_test.go b/lib/rob/schedulers/v1/scheduler_test.go
index aaecd2e7..43f22dd6 100644
--- a/lib/rob/schedulers/v1/scheduler_test.go
+++ b/lib/rob/schedulers/v1/scheduler_test.go
@@ -73,7 +73,7 @@ func (T *TestSink) Do(ctx *rob.Context, work any) {
 var _ rob.Worker = (*TestSink)(nil)
 
 func testSink(sched *Scheduler, table *ShareTable, constraints rob.Constraints) uuid.UUID {
-	return sched.AddSink(constraints, &TestSink{
+	return sched.AddWorker(constraints, &TestSink{
 		table:       table,
 		constraints: constraints,
 	})
@@ -88,7 +88,7 @@ func testSinkRemoveAfter(sched *Scheduler, table *ShareTable, constraints rob.Co
 		time.Sleep(removeAfter)
 		sink.remove.Store(true)
 	}()
-	return sched.AddSink(constraints, sink)
+	return sched.AddWorker(constraints, sink)
 }
 
 func testSource(sched *Scheduler, id int, dur time.Duration, constraints rob.Constraints) {
@@ -442,7 +442,7 @@ func TestScheduler_RemoveSinkOuter(t *testing.T) {
 
 	time.Sleep(10 * time.Second)
 
-	sched.RemoveSink(toRemove)
+	sched.RemoveWorker(toRemove)
 
 	time.Sleep(10 * time.Second)
 
-- 
GitLab