From 7fdb549e7b61eeca2e88111e45045e0fdce8b4e6 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Mon, 23 Oct 2023 16:26:59 -0500 Subject: [PATCH] prevent pool from double scaling up, reduce latency --- lib/gat/handlers/pool/pooler.go | 14 +++-- lib/gat/handlers/pool/poolers/lifo/factory.go | 2 +- lib/gat/handlers/pool/poolers/lifo/pooler.go | 54 ++++++++++--------- lib/gat/handlers/pool/poolers/rob/factory.go | 2 +- lib/gat/handlers/pool/poolers/rob/pooler.go | 25 +++++---- lib/gat/handlers/pool/spool/pool.go | 26 ++------- lib/rob/schedulers/v3/scheduler.go | 32 ++++++++++- 7 files changed, 87 insertions(+), 68 deletions(-) diff --git a/lib/gat/handlers/pool/pooler.go b/lib/gat/handlers/pool/pooler.go index 96728601..ac3c34ea 100644 --- a/lib/gat/handlers/pool/pooler.go +++ b/lib/gat/handlers/pool/pooler.go @@ -2,13 +2,6 @@ package pool import "github.com/google/uuid" -type SyncMode int - -const ( - SyncModeNonBlocking SyncMode = iota - SyncModeBlocking -) - type Pooler interface { AddClient(id uuid.UUID) DeleteClient(client uuid.UUID) @@ -16,9 +9,14 @@ type Pooler interface { AddServer(id uuid.UUID) DeleteServer(server uuid.UUID) - Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID) + Acquire(client uuid.UUID) (server uuid.UUID) Release(server uuid.UUID) + // Waiting is signalled when a client begins waiting + Waiting() <-chan struct{} + // Waiters returns the number of waiters + Waiters() int + Close() } diff --git a/lib/gat/handlers/pool/poolers/lifo/factory.go b/lib/gat/handlers/pool/poolers/lifo/factory.go index 27af21dc..a4d2655f 100644 --- a/lib/gat/handlers/pool/poolers/lifo/factory.go +++ b/lib/gat/handlers/pool/poolers/lifo/factory.go @@ -23,7 +23,7 @@ func (T *Factory) CaddyModule() caddy.ModuleInfo { } func (T *Factory) NewPooler() pool.Pooler { - return new(Pooler) + return NewPooler() } var _ pool.PoolerFactory = (*Factory)(nil) diff --git a/lib/gat/handlers/pool/poolers/lifo/pooler.go b/lib/gat/handlers/pool/poolers/lifo/pooler.go index 1096a48f..90d259ad 100644 --- a/lib/gat/handlers/pool/poolers/lifo/pooler.go +++ b/lib/gat/handlers/pool/poolers/lifo/pooler.go @@ -10,13 +10,22 @@ import ( ) type Pooler struct { + waiting chan struct{} + queue []uuid.UUID servers map[uuid.UUID]struct{} + waiters int ready sync.Cond closed bool mu sync.Mutex } +func NewPooler() *Pooler { + return &Pooler{ + waiting: make(chan struct{}, 1), + } +} + func (*Pooler) AddClient(_ uuid.UUID) {} func (*Pooler) DeleteClient(_ uuid.UUID) { @@ -50,23 +59,6 @@ func (T *Pooler) DeleteServer(server uuid.UUID) { delete(T.servers, server) } -func (T *Pooler) TryAcquire() uuid.UUID { - T.mu.Lock() - defer T.mu.Unlock() - - if T.closed { - return uuid.Nil - } - - if len(T.queue) == 0 { - return uuid.Nil - } - - server := T.queue[len(T.queue)-1] - T.queue = T.queue[:len(T.queue)-1] - return server -} - func (T *Pooler) AcquireBlocking() uuid.UUID { T.mu.Lock() defer T.mu.Unlock() @@ -79,7 +71,13 @@ func (T *Pooler) AcquireBlocking() uuid.UUID { if T.ready.L == nil { T.ready.L = &T.mu } + T.waiters++ + select { + case T.waiting <- struct{}{}: + default: + } T.ready.Wait() + T.waiters-- } if T.closed { @@ -91,15 +89,8 @@ func (T *Pooler) AcquireBlocking() uuid.UUID { return server } -func (T *Pooler) Acquire(_ uuid.UUID, mode pool.SyncMode) uuid.UUID { - switch mode { - case pool.SyncModeNonBlocking: - return T.TryAcquire() - case pool.SyncModeBlocking: - return T.AcquireBlocking() - default: - return uuid.Nil - } +func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID { + return T.AcquireBlocking() } func (T *Pooler) Release(server uuid.UUID) { @@ -114,6 +105,17 @@ func (T *Pooler) Release(server uuid.UUID) { T.queue = append(T.queue, server) } +func (T *Pooler) Waiting() <-chan struct{} { + return T.waiting +} + +func (T *Pooler) Waiters() int { + T.mu.Lock() + defer T.mu.Unlock() + + return T.waiters +} + func (T *Pooler) Close() { T.mu.Lock() defer T.mu.Unlock() diff --git a/lib/gat/handlers/pool/poolers/rob/factory.go b/lib/gat/handlers/pool/poolers/rob/factory.go index cd3a6269..d3e74531 100644 --- a/lib/gat/handlers/pool/poolers/rob/factory.go +++ b/lib/gat/handlers/pool/poolers/rob/factory.go @@ -23,7 +23,7 @@ func (T *Factory) CaddyModule() caddy.ModuleInfo { } func (T *Factory) NewPooler() pool.Pooler { - return new(Pooler) + return NewPooler() } var _ pool.PoolerFactory = (*Factory)(nil) diff --git a/lib/gat/handlers/pool/poolers/rob/pooler.go b/lib/gat/handlers/pool/poolers/rob/pooler.go index fcb3fdbc..28bd52b2 100644 --- a/lib/gat/handlers/pool/poolers/rob/pooler.go +++ b/lib/gat/handlers/pool/poolers/rob/pooler.go @@ -12,6 +12,12 @@ type Pooler struct { s schedulers.Scheduler } +func NewPooler() *Pooler { + return &Pooler{ + s: schedulers.MakeScheduler(), + } +} + func (T *Pooler) AddClient(id uuid.UUID) { T.s.AddUser(id) } @@ -28,21 +34,22 @@ func (T *Pooler) DeleteServer(server uuid.UUID) { T.s.DeleteWorker(server) } -func (T *Pooler) Acquire(client uuid.UUID, sync pool.SyncMode) (server uuid.UUID) { - switch sync { - case pool.SyncModeBlocking: - return T.s.Acquire(client, rob.SyncModeBlocking) - case pool.SyncModeNonBlocking: - return T.s.Acquire(client, rob.SyncModeNonBlocking) - default: - panic("unreachable") - } +func (T *Pooler) Acquire(client uuid.UUID) (server uuid.UUID) { + return T.s.Acquire(client, rob.SyncModeTryNonBlocking) } func (T *Pooler) Release(server uuid.UUID) { T.s.Release(server) } +func (T *Pooler) Waiting() <-chan struct{} { + return T.s.Waiting() +} + +func (T *Pooler) Waiters() int { + return T.s.Waiters() +} + func (T *Pooler) Close() { T.s.Close() } diff --git a/lib/gat/handlers/pool/spool/pool.go b/lib/gat/handlers/pool/spool/pool.go index 2a4bc782..08120df8 100644 --- a/lib/gat/handlers/pool/spool/pool.go +++ b/lib/gat/handlers/pool/spool/pool.go @@ -3,7 +3,6 @@ package spool import ( "sort" "sync" - "sync/atomic" "time" "github.com/google/uuid" @@ -23,9 +22,6 @@ type Pool struct { closed chan struct{} - pendingCount atomic.Int64 - pending chan struct{} - recipes map[string]*Recipe recipeScaleOrder []*Recipe servers map[uuid.UUID]*Server @@ -40,8 +36,6 @@ func MakePool(config Config) Pool { pooler: pooler, closed: make(chan struct{}), - - pending: make(chan struct{}, 1), } } @@ -230,7 +224,7 @@ func (T *Pool) ScaleLoop() { for { var pending <-chan struct{} if backoffNext == 0 { - pending = T.pending + pending = T.pooler.Waiting() } select { @@ -251,7 +245,7 @@ func (T *Pool) ScaleLoop() { case <-pending: // scale up ok := true - for T.pendingCount.Load() > 0 { + for T.pooler.Waiters() > 0 { if !T.ScaleUp() { ok = false break @@ -288,21 +282,9 @@ func (T *Pool) RemoveClient(client uuid.UUID) { func (T *Pool) Acquire(client uuid.UUID) *Server { for { - serverID := T.pooler.Acquire(client, pool.SyncModeNonBlocking) + serverID := T.pooler.Acquire(client) if serverID == uuid.Nil { - T.pendingCount.Add(1) - select { - case T.pending <- struct{}{}: - default: - } - - serverID = T.pooler.Acquire(client, pool.SyncModeBlocking) - - T.pendingCount.Add(-1) - - if serverID == uuid.Nil { - return nil - } + return nil } T.mu.RLock() diff --git a/lib/rob/schedulers/v3/scheduler.go b/lib/rob/schedulers/v3/scheduler.go index 83dca598..f96e1e78 100644 --- a/lib/rob/schedulers/v3/scheduler.go +++ b/lib/rob/schedulers/v3/scheduler.go @@ -13,7 +13,8 @@ import ( ) type Scheduler struct { - cc pools.Locked[chan uuid.UUID] + cc pools.Locked[chan uuid.UUID] + waiting chan struct{} closed bool @@ -27,6 +28,12 @@ type Scheduler struct { mu sync.Mutex } +func MakeScheduler() Scheduler { + return Scheduler{ + waiting: make(chan struct{}, 1), + } +} + func (T *Scheduler) AddWorker(id uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() @@ -156,6 +163,10 @@ func (T *Scheduler) Acquire(user uuid.UUID, sync rob.SyncMode) uuid.UUID { } T.schedule.Set(u.Stride, job) u.Scheduled = true + select { + case T.waiting <- struct{}{}: + default: + } return uuid.Nil, ready }() @@ -223,6 +234,25 @@ func (T *Scheduler) Release(worker uuid.UUID) { T.release(w) } +func (T *Scheduler) Waiting() <-chan struct{} { + return T.waiting +} + +func (T *Scheduler) Waiters() int { + T.mu.Lock() + defer T.mu.Unlock() + + num := 0 + + for _, user := range T.users { + if user.Scheduled { + num++ + } + } + + return num +} + func (T *Scheduler) Close() { T.mu.Lock() defer T.mu.Unlock() -- GitLab