diff --git a/lib/gat/handlers/pool/poolers/lifo/pooler.go b/lib/gat/handlers/pool/poolers/lifo/pooler.go index 90d259ad2a1a9ba8062355817cb83f5d639aa5d0..50da4a3f54c1d3adc831b973f0c5418a7ba5b12b 100644 --- a/lib/gat/handlers/pool/poolers/lifo/pooler.go +++ b/lib/gat/handlers/pool/poolers/lifo/pooler.go @@ -6,24 +6,24 @@ import ( "github.com/google/uuid" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool" + "gfx.cafe/gfx/pggat/lib/util/pools" + "gfx.cafe/gfx/pggat/lib/util/ring" "gfx.cafe/gfx/pggat/lib/util/slices" ) type Pooler struct { waiting chan struct{} - queue []uuid.UUID servers map[uuid.UUID]struct{} - waiters int - ready sync.Cond + queue []uuid.UUID + waiters ring.Ring[chan<- uuid.UUID] + pool pools.Pool[chan uuid.UUID] closed bool mu sync.Mutex } func NewPooler() *Pooler { - return &Pooler{ - waiting: make(chan struct{}, 1), - } + return new(Pooler) } func (*Pooler) AddClient(_ uuid.UUID) {} @@ -36,17 +36,12 @@ func (T *Pooler) AddServer(server uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() - T.queue = append(T.queue, server) - if T.servers == nil { T.servers = make(map[uuid.UUID]struct{}) } T.servers[server] = struct{}{} - if T.ready.L == nil { - T.ready.L = &T.mu - } - T.ready.Signal() + T.Release(server) } func (T *Pooler) DeleteServer(server uuid.UUID) { @@ -59,52 +54,67 @@ func (T *Pooler) DeleteServer(server uuid.UUID) { delete(T.servers, server) } -func (T *Pooler) AcquireBlocking() uuid.UUID { - T.mu.Lock() - defer T.mu.Unlock() +func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID { + v, c := func() (uuid.UUID, chan uuid.UUID) { + T.mu.Lock() + defer T.mu.Unlock() - if T.closed { - return uuid.Nil - } + if T.closed { + return uuid.Nil, nil + } + + if len(T.queue) > 0 { + worker := T.queue[len(T.queue)-1] + T.queue = T.queue[:len(T.queue)-1] - for len(T.queue) == 0 { - if T.ready.L == nil { - T.ready.L = &T.mu + return worker, nil } - T.waiters++ - select { - case T.waiting <- struct{}{}: - default: + + ready, _ := T.pool.Get() + if ready == nil { + ready = make(chan uuid.UUID, 1) } - T.ready.Wait() - T.waiters-- - } + T.waiters.PushBack(ready) - if T.closed { - return uuid.Nil + return uuid.Nil, ready + }() + + if v != uuid.Nil { + return v } - server := T.queue[len(T.queue)-1] - T.queue = T.queue[:len(T.queue)-1] - return server -} + if c != nil { + var ok bool + v, ok = <-c + if ok { + T.pool.Put(c) + } + } -func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID { - return T.AcquireBlocking() + return v } -func (T *Pooler) Release(server uuid.UUID) { - T.mu.Lock() - defer T.mu.Unlock() - +func (T *Pooler) release(server uuid.UUID) { // check if server was removed if _, ok := T.servers[server]; !ok { return } + if c, ok := T.waiters.PopFront(); ok { + c <- server + return + } + T.queue = append(T.queue, server) } +func (T *Pooler) Release(server uuid.UUID) { + T.mu.Lock() + defer T.mu.Unlock() + + T.release(server) +} + func (T *Pooler) Waiting() <-chan struct{} { return T.waiting } @@ -113,7 +123,7 @@ func (T *Pooler) Waiters() int { T.mu.Lock() defer T.mu.Unlock() - return T.waiters + return T.waiters.Length() } func (T *Pooler) Close() { @@ -121,10 +131,11 @@ func (T *Pooler) Close() { defer T.mu.Unlock() T.closed = true - if T.ready.L == nil { - T.ready.L = &T.mu + clear(T.servers) + T.queue = T.queue[:0] + for c, ok := T.waiters.PopFront(); ok; c, ok = T.waiters.PopFront() { + close(c) } - T.ready.Broadcast() } var _ pool.Pooler = (*Pooler)(nil)