good morning!!!!

Skip to content
Snippets Groups Projects
Commit b74daa34 authored by Garet Halliday's avatar Garet Halliday
Browse files

don't use sync.Cond so we can have a timeout

parent 6096dbf7
No related branches found
No related tags found
No related merge requests found
Pipeline #71424 failed with stages
in 13 minutes and 56 seconds
......@@ -6,24 +6,24 @@ import (
"github.com/google/uuid"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
"gfx.cafe/gfx/pggat/lib/util/pools"
"gfx.cafe/gfx/pggat/lib/util/ring"
"gfx.cafe/gfx/pggat/lib/util/slices"
)
type Pooler struct {
waiting chan struct{}
queue []uuid.UUID
servers map[uuid.UUID]struct{}
waiters int
ready sync.Cond
queue []uuid.UUID
waiters ring.Ring[chan<- uuid.UUID]
pool pools.Pool[chan uuid.UUID]
closed bool
mu sync.Mutex
}
func NewPooler() *Pooler {
return &Pooler{
waiting: make(chan struct{}, 1),
}
return new(Pooler)
}
func (*Pooler) AddClient(_ uuid.UUID) {}
......@@ -36,17 +36,12 @@ func (T *Pooler) AddServer(server uuid.UUID) {
T.mu.Lock()
defer T.mu.Unlock()
T.queue = append(T.queue, server)
if T.servers == nil {
T.servers = make(map[uuid.UUID]struct{})
}
T.servers[server] = struct{}{}
if T.ready.L == nil {
T.ready.L = &T.mu
}
T.ready.Signal()
T.Release(server)
}
func (T *Pooler) DeleteServer(server uuid.UUID) {
......@@ -59,52 +54,67 @@ func (T *Pooler) DeleteServer(server uuid.UUID) {
delete(T.servers, server)
}
func (T *Pooler) AcquireBlocking() uuid.UUID {
T.mu.Lock()
defer T.mu.Unlock()
func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID {
v, c := func() (uuid.UUID, chan uuid.UUID) {
T.mu.Lock()
defer T.mu.Unlock()
if T.closed {
return uuid.Nil
}
if T.closed {
return uuid.Nil, nil
}
if len(T.queue) > 0 {
worker := T.queue[len(T.queue)-1]
T.queue = T.queue[:len(T.queue)-1]
for len(T.queue) == 0 {
if T.ready.L == nil {
T.ready.L = &T.mu
return worker, nil
}
T.waiters++
select {
case T.waiting <- struct{}{}:
default:
ready, _ := T.pool.Get()
if ready == nil {
ready = make(chan uuid.UUID, 1)
}
T.ready.Wait()
T.waiters--
}
T.waiters.PushBack(ready)
if T.closed {
return uuid.Nil
return uuid.Nil, ready
}()
if v != uuid.Nil {
return v
}
server := T.queue[len(T.queue)-1]
T.queue = T.queue[:len(T.queue)-1]
return server
}
if c != nil {
var ok bool
v, ok = <-c
if ok {
T.pool.Put(c)
}
}
func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID {
return T.AcquireBlocking()
return v
}
func (T *Pooler) Release(server uuid.UUID) {
T.mu.Lock()
defer T.mu.Unlock()
func (T *Pooler) release(server uuid.UUID) {
// check if server was removed
if _, ok := T.servers[server]; !ok {
return
}
if c, ok := T.waiters.PopFront(); ok {
c <- server
return
}
T.queue = append(T.queue, server)
}
func (T *Pooler) Release(server uuid.UUID) {
T.mu.Lock()
defer T.mu.Unlock()
T.release(server)
}
func (T *Pooler) Waiting() <-chan struct{} {
return T.waiting
}
......@@ -113,7 +123,7 @@ func (T *Pooler) Waiters() int {
T.mu.Lock()
defer T.mu.Unlock()
return T.waiters
return T.waiters.Length()
}
func (T *Pooler) Close() {
......@@ -121,10 +131,11 @@ func (T *Pooler) Close() {
defer T.mu.Unlock()
T.closed = true
if T.ready.L == nil {
T.ready.L = &T.mu
clear(T.servers)
T.queue = T.queue[:0]
for c, ok := T.waiters.PopFront(); ok; c, ok = T.waiters.PopFront() {
close(c)
}
T.ready.Broadcast()
}
var _ pool.Pooler = (*Pooler)(nil)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment