diff --git a/lib/gat/handlers/pool/pooler.go b/lib/gat/handlers/pool/pooler.go index 967286017daeb3c550c4af376558b0e1cac43b05..ac3c34eab1502bae685ef1d3b82b693fddabf974 100644 --- a/lib/gat/handlers/pool/pooler.go +++ b/lib/gat/handlers/pool/pooler.go @@ -2,13 +2,6 @@ package pool import "github.com/google/uuid" -type SyncMode int - -const ( - SyncModeNonBlocking SyncMode = iota - SyncModeBlocking -) - type Pooler interface { AddClient(id uuid.UUID) DeleteClient(client uuid.UUID) @@ -16,9 +9,14 @@ type Pooler interface { AddServer(id uuid.UUID) DeleteServer(server uuid.UUID) - Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID) + Acquire(client uuid.UUID) (server uuid.UUID) Release(server uuid.UUID) + // Waiting is signalled when a client begins waiting + Waiting() <-chan struct{} + // Waiters returns the number of waiters + Waiters() int + Close() } diff --git a/lib/gat/handlers/pool/poolers/lifo/factory.go b/lib/gat/handlers/pool/poolers/lifo/factory.go index 27af21dc549336a6dd21ee8ed7b2c0a83e4b3ce2..a4d2655fdbea7908e23ac112b8c6017e70b1f873 100644 --- a/lib/gat/handlers/pool/poolers/lifo/factory.go +++ b/lib/gat/handlers/pool/poolers/lifo/factory.go @@ -23,7 +23,7 @@ func (T *Factory) CaddyModule() caddy.ModuleInfo { } func (T *Factory) NewPooler() pool.Pooler { - return new(Pooler) + return NewPooler() } var _ pool.PoolerFactory = (*Factory)(nil) diff --git a/lib/gat/handlers/pool/poolers/lifo/pooler.go b/lib/gat/handlers/pool/poolers/lifo/pooler.go index 1096a48f0c251c45a1f04dfa0f8893991ff1ea4a..90d259ad2a1a9ba8062355817cb83f5d639aa5d0 100644 --- a/lib/gat/handlers/pool/poolers/lifo/pooler.go +++ b/lib/gat/handlers/pool/poolers/lifo/pooler.go @@ -10,13 +10,22 @@ import ( ) type Pooler struct { + waiting chan struct{} + queue []uuid.UUID servers map[uuid.UUID]struct{} + waiters int ready sync.Cond closed bool mu sync.Mutex } +func NewPooler() *Pooler { + return &Pooler{ + waiting: make(chan struct{}, 1), + } +} + func (*Pooler) AddClient(_ uuid.UUID) {} func (*Pooler) DeleteClient(_ uuid.UUID) { @@ -50,23 +59,6 @@ func (T *Pooler) DeleteServer(server uuid.UUID) { delete(T.servers, server) } -func (T *Pooler) TryAcquire() uuid.UUID { - T.mu.Lock() - defer T.mu.Unlock() - - if T.closed { - return uuid.Nil - } - - if len(T.queue) == 0 { - return uuid.Nil - } - - server := T.queue[len(T.queue)-1] - T.queue = T.queue[:len(T.queue)-1] - return server -} - func (T *Pooler) AcquireBlocking() uuid.UUID { T.mu.Lock() defer T.mu.Unlock() @@ -79,7 +71,13 @@ func (T *Pooler) AcquireBlocking() uuid.UUID { if T.ready.L == nil { T.ready.L = &T.mu } + T.waiters++ + select { + case T.waiting <- struct{}{}: + default: + } T.ready.Wait() + T.waiters-- } if T.closed { @@ -91,15 +89,8 @@ func (T *Pooler) AcquireBlocking() uuid.UUID { return server } -func (T *Pooler) Acquire(_ uuid.UUID, mode pool.SyncMode) uuid.UUID { - switch mode { - case pool.SyncModeNonBlocking: - return T.TryAcquire() - case pool.SyncModeBlocking: - return T.AcquireBlocking() - default: - return uuid.Nil - } +func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID { + return T.AcquireBlocking() } func (T *Pooler) Release(server uuid.UUID) { @@ -114,6 +105,17 @@ func (T *Pooler) Release(server uuid.UUID) { T.queue = append(T.queue, server) } +func (T *Pooler) Waiting() <-chan struct{} { + return T.waiting +} + +func (T *Pooler) Waiters() int { + T.mu.Lock() + defer T.mu.Unlock() + + return T.waiters +} + func (T *Pooler) Close() { T.mu.Lock() defer T.mu.Unlock() diff --git a/lib/gat/handlers/pool/poolers/rob/factory.go b/lib/gat/handlers/pool/poolers/rob/factory.go index cd3a62693825cd70269b7385d7a9ed46de141cf1..d3e7453107cc61c0c533b4c91de69d4e05ab0ec8 100644 --- a/lib/gat/handlers/pool/poolers/rob/factory.go +++ b/lib/gat/handlers/pool/poolers/rob/factory.go @@ -23,7 +23,7 @@ func (T *Factory) CaddyModule() caddy.ModuleInfo { } func (T *Factory) NewPooler() pool.Pooler { - return new(Pooler) + return NewPooler() } var _ pool.PoolerFactory = (*Factory)(nil) diff --git a/lib/gat/handlers/pool/poolers/rob/pooler.go b/lib/gat/handlers/pool/poolers/rob/pooler.go index fcb3fdbca8a108347aedf7e9bdff38fcd92fe940..28bd52b2ff2a86daafef558f059ac4e54777223f 100644 --- a/lib/gat/handlers/pool/poolers/rob/pooler.go +++ b/lib/gat/handlers/pool/poolers/rob/pooler.go @@ -12,6 +12,12 @@ type Pooler struct { s schedulers.Scheduler } +func NewPooler() *Pooler { + return &Pooler{ + s: schedulers.MakeScheduler(), + } +} + func (T *Pooler) AddClient(id uuid.UUID) { T.s.AddUser(id) } @@ -28,21 +34,22 @@ func (T *Pooler) DeleteServer(server uuid.UUID) { T.s.DeleteWorker(server) } -func (T *Pooler) Acquire(client uuid.UUID, sync pool.SyncMode) (server uuid.UUID) { - switch sync { - case pool.SyncModeBlocking: - return T.s.Acquire(client, rob.SyncModeBlocking) - case pool.SyncModeNonBlocking: - return T.s.Acquire(client, rob.SyncModeNonBlocking) - default: - panic("unreachable") - } +func (T *Pooler) Acquire(client uuid.UUID) (server uuid.UUID) { + return T.s.Acquire(client, rob.SyncModeTryNonBlocking) } func (T *Pooler) Release(server uuid.UUID) { T.s.Release(server) } +func (T *Pooler) Waiting() <-chan struct{} { + return T.s.Waiting() +} + +func (T *Pooler) Waiters() int { + return T.s.Waiters() +} + func (T *Pooler) Close() { T.s.Close() } diff --git a/lib/gat/handlers/pool/spool/pool.go b/lib/gat/handlers/pool/spool/pool.go index 2a4bc7828923030114d0627fde616fcf0ed73f79..08120df8897c98af3a6b77dd274a2a481557facf 100644 --- a/lib/gat/handlers/pool/spool/pool.go +++ b/lib/gat/handlers/pool/spool/pool.go @@ -3,7 +3,6 @@ package spool import ( "sort" "sync" - "sync/atomic" "time" "github.com/google/uuid" @@ -23,9 +22,6 @@ type Pool struct { closed chan struct{} - pendingCount atomic.Int64 - pending chan struct{} - recipes map[string]*Recipe recipeScaleOrder []*Recipe servers map[uuid.UUID]*Server @@ -40,8 +36,6 @@ func MakePool(config Config) Pool { pooler: pooler, closed: make(chan struct{}), - - pending: make(chan struct{}, 1), } } @@ -230,7 +224,7 @@ func (T *Pool) ScaleLoop() { for { var pending <-chan struct{} if backoffNext == 0 { - pending = T.pending + pending = T.pooler.Waiting() } select { @@ -251,7 +245,7 @@ func (T *Pool) ScaleLoop() { case <-pending: // scale up ok := true - for T.pendingCount.Load() > 0 { + for T.pooler.Waiters() > 0 { if !T.ScaleUp() { ok = false break @@ -288,21 +282,9 @@ func (T *Pool) RemoveClient(client uuid.UUID) { func (T *Pool) Acquire(client uuid.UUID) *Server { for { - serverID := T.pooler.Acquire(client, pool.SyncModeNonBlocking) + serverID := T.pooler.Acquire(client) if serverID == uuid.Nil { - T.pendingCount.Add(1) - select { - case T.pending <- struct{}{}: - default: - } - - serverID = T.pooler.Acquire(client, pool.SyncModeBlocking) - - T.pendingCount.Add(-1) - - if serverID == uuid.Nil { - return nil - } + return nil } T.mu.RLock() diff --git a/lib/rob/schedulers/v3/scheduler.go b/lib/rob/schedulers/v3/scheduler.go index 83dca5988a4a9459e66b437d5519fd92dbb25de4..f96e1e7844935d616f5bf6bbfee14b2d74248787 100644 --- a/lib/rob/schedulers/v3/scheduler.go +++ b/lib/rob/schedulers/v3/scheduler.go @@ -13,7 +13,8 @@ import ( ) type Scheduler struct { - cc pools.Locked[chan uuid.UUID] + cc pools.Locked[chan uuid.UUID] + waiting chan struct{} closed bool @@ -27,6 +28,12 @@ type Scheduler struct { mu sync.Mutex } +func MakeScheduler() Scheduler { + return Scheduler{ + waiting: make(chan struct{}, 1), + } +} + func (T *Scheduler) AddWorker(id uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() @@ -156,6 +163,10 @@ func (T *Scheduler) Acquire(user uuid.UUID, sync rob.SyncMode) uuid.UUID { } T.schedule.Set(u.Stride, job) u.Scheduled = true + select { + case T.waiting <- struct{}{}: + default: + } return uuid.Nil, ready }() @@ -223,6 +234,25 @@ func (T *Scheduler) Release(worker uuid.UUID) { T.release(w) } +func (T *Scheduler) Waiting() <-chan struct{} { + return T.waiting +} + +func (T *Scheduler) Waiters() int { + T.mu.Lock() + defer T.mu.Unlock() + + num := 0 + + for _, user := range T.users { + if user.Scheduled { + num++ + } + } + + return num +} + func (T *Scheduler) Close() { T.mu.Lock() defer T.mu.Unlock()