From a1a5898ab48aff66d74382b3af99b118c89a8266 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Thu, 27 Jul 2023 13:18:28 -0600
Subject: [PATCH] scale up

---
 cmd/cgat/main.go                       |  1 -
 lib/gat/pool.go                        | 30 ++++++++++++++--
 lib/gat/pools/session/metrics.go       |  4 +++
 lib/gat/pools/session/pool.go          | 47 ++++++++++++++------------
 lib/gat/pools/transaction/pool.go      |  8 +++--
 lib/rob/context.go                     |  1 +
 lib/rob/schedulers/v1/source/source.go |  6 ++++
 lib/util/chans/try.go                  | 19 +++++++++++
 8 files changed, 88 insertions(+), 28 deletions(-)
 create mode 100644 lib/gat/pools/session/metrics.go
 create mode 100644 lib/util/chans/try.go

diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go
index 19e706a8..9e9b3bb8 100644
--- a/cmd/cgat/main.go
+++ b/cmd/cgat/main.go
@@ -36,7 +36,6 @@ func main() {
 		MinConnections: 0,
 		MaxConnections: 5,
 	})
-	pool.Scale(1)
 
 	go func() {
 		var metrics rob.Metrics
diff --git a/lib/gat/pool.go b/lib/gat/pool.go
index 4b4d662b..8d5a8300 100644
--- a/lib/gat/pool.go
+++ b/lib/gat/pool.go
@@ -11,8 +11,12 @@ import (
 	"pggat2/lib/zap"
 )
 
+type Context struct {
+	OnWait chan<- struct{}
+}
+
 type RawPool interface {
-	Serve(client zap.ReadWriter)
+	Serve(ctx *Context, client zap.ReadWriter)
 
 	AddServer(server zap.ReadWriter) uuid.UUID
 	GetServer(id uuid.UUID) zap.ReadWriter
@@ -152,17 +156,37 @@ type Pool struct {
 	recipes map[string]*recipeWithConns
 	mu      sync.Mutex
 
+	ctx Context
+
 	raw RawPool
 }
 
 func NewPool(rawPool RawPool) *Pool {
-	return &Pool{
+	onWait := make(chan struct{})
+
+	p := &Pool{
+		ctx: Context{
+			OnWait: onWait,
+		},
 		raw: rawPool,
 	}
+
+	go func() {
+		for {
+			_, ok := <-onWait
+			if !ok {
+				break
+			}
+
+			p.Scale(1)
+		}
+	}()
+
+	return p
 }
 
 func (T *Pool) Serve(client zap.ReadWriter) {
-	T.raw.Serve(client)
+	T.raw.Serve(&T.ctx, client)
 }
 
 func (T *Pool) CurrentScale() int {
diff --git a/lib/gat/pools/session/metrics.go b/lib/gat/pools/session/metrics.go
new file mode 100644
index 00000000..ee63fe94
--- /dev/null
+++ b/lib/gat/pools/session/metrics.go
@@ -0,0 +1,4 @@
+package session
+
+type Metrics struct {
+}
diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go
index 7e8659e3..8e7a5af9 100644
--- a/lib/gat/pools/session/pool.go
+++ b/lib/gat/pools/session/pool.go
@@ -7,6 +7,7 @@ import (
 
 	"pggat2/lib/bouncer/bouncers/v2"
 	"pggat2/lib/gat"
+	"pggat2/lib/util/chans"
 	"pggat2/lib/zap"
 )
 
@@ -14,7 +15,7 @@ type Pool struct {
 	// use slice lifo for better perf
 	queue []uuid.UUID
 	conns map[uuid.UUID]zap.ReadWriter
-	mu    sync.Mutex
+	qmu   sync.RWMutex
 
 	signal chan struct{}
 }
@@ -25,37 +26,37 @@ func NewPool() *Pool {
 	}
 }
 
-func (T *Pool) acquire() (uuid.UUID, zap.ReadWriter) {
+func (T *Pool) acquire(ctx *gat.Context) (uuid.UUID, zap.ReadWriter) {
 	for {
-		T.mu.Lock()
+		T.qmu.Lock()
 		if len(T.queue) > 0 {
 			id := T.queue[len(T.queue)-1]
 			T.queue = T.queue[:len(T.queue)-1]
 			conn, ok := T.conns[id]
-			T.mu.Unlock()
+			T.qmu.Unlock()
 			if !ok {
 				continue
 			}
 			return id, conn
 		}
-		T.mu.Unlock()
+		T.qmu.Unlock()
+		if ctx.OnWait != nil {
+			chans.TrySend(ctx.OnWait, struct{}{})
+		}
 		<-T.signal
 	}
 }
 
 func (T *Pool) release(id uuid.UUID) {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+	T.qmu.Lock()
+	defer T.qmu.Unlock()
 	T.queue = append(T.queue, id)
 
-	select {
-	case T.signal <- struct{}{}:
-	default:
-	}
+	chans.TrySend(T.signal, struct{}{})
 }
 
-func (T *Pool) Serve(client zap.ReadWriter) {
-	id, server := T.acquire()
+func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) {
+	id, server := T.acquire(ctx)
 	for {
 		clientErr, serverErr := bouncers.Bounce(client, server)
 		if clientErr != nil || serverErr != nil {
@@ -64,9 +65,9 @@ func (T *Pool) Serve(client zap.ReadWriter) {
 				T.release(id)
 			} else {
 				_ = server.Close()
-				T.mu.Lock()
+				T.qmu.Lock()
 				delete(T.conns, id)
-				T.mu.Unlock()
+				T.qmu.Unlock()
 			}
 			break
 		}
@@ -74,8 +75,8 @@ func (T *Pool) Serve(client zap.ReadWriter) {
 }
 
 func (T *Pool) AddServer(server zap.ReadWriter) uuid.UUID {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+	T.qmu.Lock()
+	defer T.qmu.Unlock()
 
 	id := uuid.New()
 	if T.conns == nil {
@@ -87,15 +88,15 @@ func (T *Pool) AddServer(server zap.ReadWriter) uuid.UUID {
 }
 
 func (T *Pool) GetServer(id uuid.UUID) zap.ReadWriter {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+	T.qmu.Lock()
+	defer T.qmu.Unlock()
 
 	return T.conns[id]
 }
 
 func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+	T.qmu.Lock()
+	defer T.qmu.Unlock()
 
 	conn, ok := T.conns[id]
 	if !ok {
@@ -105,4 +106,8 @@ func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter {
 	return conn
 }
 
+func (T *Pool) ReadMetrics(metrics *Metrics) {
+	// TODO(garet) metrics
+}
+
 var _ gat.RawPool = (*Pool)(nil)
diff --git a/lib/gat/pools/transaction/pool.go b/lib/gat/pools/transaction/pool.go
index 03ec7006..cfd3fe35 100644
--- a/lib/gat/pools/transaction/pool.go
+++ b/lib/gat/pools/transaction/pool.go
@@ -57,7 +57,7 @@ func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter {
 	return conn.(*Conn).rw
 }
 
-func (T *Pool) Serve(client zap.ReadWriter) {
+func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) {
 	source := T.s.NewSource()
 	eqpc := eqp.NewClient()
 	defer eqpc.Done()
@@ -69,12 +69,14 @@ func (T *Pool) Serve(client zap.ReadWriter) {
 	)
 	buffer := zapbuf.NewBuffer(client)
 	defer buffer.Done()
-	var ctx rob.Context
+	robCtx := rob.Context{
+		OnWait: ctx.OnWait,
+	}
 	for {
 		if err := buffer.Buffer(); err != nil {
 			break
 		}
-		source.Do(&ctx, Work{
+		source.Do(&robCtx, Work{
 			rw:  buffer,
 			eqp: eqpc,
 			ps:  psc,
diff --git a/lib/rob/context.go b/lib/rob/context.go
index af826f9d..30438e6f 100644
--- a/lib/rob/context.go
+++ b/lib/rob/context.go
@@ -1,6 +1,7 @@
 package rob
 
 type Context struct {
+	OnWait      chan<- struct{}
 	Constraints Constraints
 	Removed     bool
 }
diff --git a/lib/rob/schedulers/v1/source/source.go b/lib/rob/schedulers/v1/source/source.go
index 0d37e617..ae9e4b44 100644
--- a/lib/rob/schedulers/v1/source/source.go
+++ b/lib/rob/schedulers/v1/source/source.go
@@ -8,6 +8,7 @@ import (
 	"pggat2/lib/rob"
 	"pggat2/lib/rob/schedulers/v1/pool"
 	"pggat2/lib/rob/schedulers/v1/pool/job"
+	"pggat2/lib/util/chans"
 	"pggat2/lib/util/pools"
 )
 
@@ -40,6 +41,11 @@ func (T *Source) Do(ctx *rob.Context, work any) {
 	}) {
 		return
 	}
+
+	if ctx.OnWait != nil {
+		chans.TrySend(ctx.OnWait, struct{}{})
+	}
+
 	out, ok := T.stall.Get()
 	if !ok {
 		out = make(chan uuid.UUID, 1)
diff --git a/lib/util/chans/try.go b/lib/util/chans/try.go
new file mode 100644
index 00000000..3b88d3d6
--- /dev/null
+++ b/lib/util/chans/try.go
@@ -0,0 +1,19 @@
+package chans
+
+func TrySend[T any](ch chan<- T, value T) bool {
+	select {
+	case ch <- value:
+		return true
+	default:
+		return false
+	}
+}
+
+func TryRecv[T any](ch <-chan T) (T, bool) {
+	select {
+	case value, ok := <-ch:
+		return value, ok
+	default:
+		return *new(T), false
+	}
+}
-- 
GitLab