diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index 19e706a8c70effc2b8b6e4da53f575312ac89588..9e9b3bb8f5fbd3f146e29b71234c682f5ead6612 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -36,7 +36,6 @@ func main() { MinConnections: 0, MaxConnections: 5, }) - pool.Scale(1) go func() { var metrics rob.Metrics diff --git a/lib/gat/pool.go b/lib/gat/pool.go index 4b4d662bd465c8392dd68cc6f9a6b4f4632751d2..8d5a830087a2347c5b817f6213f2fbfd4dd4eb1b 100644 --- a/lib/gat/pool.go +++ b/lib/gat/pool.go @@ -11,8 +11,12 @@ import ( "pggat2/lib/zap" ) +type Context struct { + OnWait chan<- struct{} +} + type RawPool interface { - Serve(client zap.ReadWriter) + Serve(ctx *Context, client zap.ReadWriter) AddServer(server zap.ReadWriter) uuid.UUID GetServer(id uuid.UUID) zap.ReadWriter @@ -152,17 +156,37 @@ type Pool struct { recipes map[string]*recipeWithConns mu sync.Mutex + ctx Context + raw RawPool } func NewPool(rawPool RawPool) *Pool { - return &Pool{ + onWait := make(chan struct{}) + + p := &Pool{ + ctx: Context{ + OnWait: onWait, + }, raw: rawPool, } + + go func() { + for { + _, ok := <-onWait + if !ok { + break + } + + p.Scale(1) + } + }() + + return p } func (T *Pool) Serve(client zap.ReadWriter) { - T.raw.Serve(client) + T.raw.Serve(&T.ctx, client) } func (T *Pool) CurrentScale() int { diff --git a/lib/gat/pools/session/metrics.go b/lib/gat/pools/session/metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..ee63fe94433044f85e3822872ed9148ec2350860 --- /dev/null +++ b/lib/gat/pools/session/metrics.go @@ -0,0 +1,4 @@ +package session + +type Metrics struct { +} diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go index 7e8659e331bb0ee5420a67d2c002fae0a7bb7782..8e7a5af9cdacc3a2c335bd42ce13598b628f7273 100644 --- a/lib/gat/pools/session/pool.go +++ b/lib/gat/pools/session/pool.go @@ -7,6 +7,7 @@ import ( "pggat2/lib/bouncer/bouncers/v2" "pggat2/lib/gat" + "pggat2/lib/util/chans" "pggat2/lib/zap" ) @@ -14,7 +15,7 @@ type Pool struct { // use slice lifo for better perf queue []uuid.UUID conns map[uuid.UUID]zap.ReadWriter - mu sync.Mutex + qmu sync.RWMutex signal chan struct{} } @@ -25,37 +26,37 @@ func NewPool() *Pool { } } -func (T *Pool) acquire() (uuid.UUID, zap.ReadWriter) { +func (T *Pool) acquire(ctx *gat.Context) (uuid.UUID, zap.ReadWriter) { for { - T.mu.Lock() + T.qmu.Lock() if len(T.queue) > 0 { id := T.queue[len(T.queue)-1] T.queue = T.queue[:len(T.queue)-1] conn, ok := T.conns[id] - T.mu.Unlock() + T.qmu.Unlock() if !ok { continue } return id, conn } - T.mu.Unlock() + T.qmu.Unlock() + if ctx.OnWait != nil { + chans.TrySend(ctx.OnWait, struct{}{}) + } <-T.signal } } func (T *Pool) release(id uuid.UUID) { - T.mu.Lock() - defer T.mu.Unlock() + T.qmu.Lock() + defer T.qmu.Unlock() T.queue = append(T.queue, id) - select { - case T.signal <- struct{}{}: - default: - } + chans.TrySend(T.signal, struct{}{}) } -func (T *Pool) Serve(client zap.ReadWriter) { - id, server := T.acquire() +func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) { + id, server := T.acquire(ctx) for { clientErr, serverErr := bouncers.Bounce(client, server) if clientErr != nil || serverErr != nil { @@ -64,9 +65,9 @@ func (T *Pool) Serve(client zap.ReadWriter) { T.release(id) } else { _ = server.Close() - T.mu.Lock() + T.qmu.Lock() delete(T.conns, id) - T.mu.Unlock() + T.qmu.Unlock() } break } @@ -74,8 +75,8 @@ func (T *Pool) Serve(client zap.ReadWriter) { } func (T *Pool) AddServer(server zap.ReadWriter) uuid.UUID { - T.mu.Lock() - defer T.mu.Unlock() + T.qmu.Lock() + defer T.qmu.Unlock() id := uuid.New() if T.conns == nil { @@ -87,15 +88,15 @@ func (T *Pool) AddServer(server zap.ReadWriter) uuid.UUID { } func (T *Pool) GetServer(id uuid.UUID) zap.ReadWriter { - T.mu.Lock() - defer T.mu.Unlock() + T.qmu.Lock() + defer T.qmu.Unlock() return T.conns[id] } func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter { - T.mu.Lock() - defer T.mu.Unlock() + T.qmu.Lock() + defer T.qmu.Unlock() conn, ok := T.conns[id] if !ok { @@ -105,4 +106,8 @@ func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter { return conn } +func (T *Pool) ReadMetrics(metrics *Metrics) { + // TODO(garet) metrics +} + var _ gat.RawPool = (*Pool)(nil) diff --git a/lib/gat/pools/transaction/pool.go b/lib/gat/pools/transaction/pool.go index 03ec700637f60c690b30365d166e09bd8a40d382..cfd3fe357583ef1bfd519fd97f28c33b7a6c8196 100644 --- a/lib/gat/pools/transaction/pool.go +++ b/lib/gat/pools/transaction/pool.go @@ -57,7 +57,7 @@ func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter { return conn.(*Conn).rw } -func (T *Pool) Serve(client zap.ReadWriter) { +func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) { source := T.s.NewSource() eqpc := eqp.NewClient() defer eqpc.Done() @@ -69,12 +69,14 @@ func (T *Pool) Serve(client zap.ReadWriter) { ) buffer := zapbuf.NewBuffer(client) defer buffer.Done() - var ctx rob.Context + robCtx := rob.Context{ + OnWait: ctx.OnWait, + } for { if err := buffer.Buffer(); err != nil { break } - source.Do(&ctx, Work{ + source.Do(&robCtx, Work{ rw: buffer, eqp: eqpc, ps: psc, diff --git a/lib/rob/context.go b/lib/rob/context.go index af826f9dff5e3915bf361b1c8dbf351134f17ae6..30438e6f12c8e3eed57319bef3a1f02df3437348 100644 --- a/lib/rob/context.go +++ b/lib/rob/context.go @@ -1,6 +1,7 @@ package rob type Context struct { + OnWait chan<- struct{} Constraints Constraints Removed bool } diff --git a/lib/rob/schedulers/v1/source/source.go b/lib/rob/schedulers/v1/source/source.go index 0d37e61783db1d65fbfacf6f2122e55590ccdb12..ae9e4b44161a0bf16c3cd9779d2543adecf2995f 100644 --- a/lib/rob/schedulers/v1/source/source.go +++ b/lib/rob/schedulers/v1/source/source.go @@ -8,6 +8,7 @@ import ( "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v1/pool" "pggat2/lib/rob/schedulers/v1/pool/job" + "pggat2/lib/util/chans" "pggat2/lib/util/pools" ) @@ -40,6 +41,11 @@ func (T *Source) Do(ctx *rob.Context, work any) { }) { return } + + if ctx.OnWait != nil { + chans.TrySend(ctx.OnWait, struct{}{}) + } + out, ok := T.stall.Get() if !ok { out = make(chan uuid.UUID, 1) diff --git a/lib/util/chans/try.go b/lib/util/chans/try.go new file mode 100644 index 0000000000000000000000000000000000000000..3b88d3d698643fb5c419cc7caa309ef1913e14ca --- /dev/null +++ b/lib/util/chans/try.go @@ -0,0 +1,19 @@ +package chans + +func TrySend[T any](ch chan<- T, value T) bool { + select { + case ch <- value: + return true + default: + return false + } +} + +func TryRecv[T any](ch <-chan T) (T, bool) { + select { + case value, ok := <-ch: + return value, ok + default: + return *new(T), false + } +}