diff --git a/lib/gat/handlers/pool/pooler.go b/lib/gat/handlers/pool/pooler.go index ac3c34eab1502bae685ef1d3b82b693fddabf974..4cba15365814da8a822e210169bb0c675b0d58fb 100644 --- a/lib/gat/handlers/pool/pooler.go +++ b/lib/gat/handlers/pool/pooler.go @@ -1,6 +1,10 @@ package pool -import "github.com/google/uuid" +import ( + "time" + + "github.com/google/uuid" +) type Pooler interface { AddClient(id uuid.UUID) @@ -9,7 +13,7 @@ type Pooler interface { AddServer(id uuid.UUID) DeleteServer(server uuid.UUID) - Acquire(client uuid.UUID) (server uuid.UUID) + Acquire(client uuid.UUID, timeout time.Duration) (server uuid.UUID) Release(server uuid.UUID) // Waiting is signalled when a client begins waiting diff --git a/lib/gat/handlers/pool/poolers/lifo/pooler.go b/lib/gat/handlers/pool/poolers/lifo/pooler.go index 50da4a3f54c1d3adc831b973f0c5418a7ba5b12b..2c801f9dcec48658ededf809e6ba83d75d324092 100644 --- a/lib/gat/handlers/pool/poolers/lifo/pooler.go +++ b/lib/gat/handlers/pool/poolers/lifo/pooler.go @@ -2,6 +2,7 @@ package lifo import ( "sync" + "time" "github.com/google/uuid" @@ -23,7 +24,9 @@ type Pooler struct { } func NewPooler() *Pooler { - return new(Pooler) + return &Pooler{ + waiting: make(chan struct{}), + } } func (*Pooler) AddClient(_ uuid.UUID) {} @@ -54,7 +57,7 @@ func (T *Pooler) DeleteServer(server uuid.UUID) { delete(T.servers, server) } -func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID { +func (T *Pooler) Acquire(_ uuid.UUID, timeout time.Duration) uuid.UUID { v, c := func() (uuid.UUID, chan uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() @@ -76,6 +79,11 @@ func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID { } T.waiters.PushBack(ready) + select { + case T.waiting <- struct{}{}: + default: + } + return uuid.Nil, ready }() @@ -84,10 +92,44 @@ func (T *Pooler) Acquire(_ uuid.UUID) uuid.UUID { } if c != nil { + var timeoutC <-chan time.Time + if timeout != 0 { + timer := time.NewTimer(timeout) + defer timer.Stop() + timeoutC = timer.C + } + var ok bool - v, ok = <-c - if ok { - T.pool.Put(c) + select { + case v, ok = <-c: + if ok { + T.pool.Put(c) + } + case <-timeoutC: + T.mu.Lock() + defer T.mu.Unlock() + + // try to remove the channel from the queue, we might've lost the race though + waitCount := T.waiters.Length() + var found bool + for i := 0; i < waitCount; i++ { + cc, _ := T.waiters.PopFront() + if c == cc { + found = true + break + } + T.waiters.PushBack(cc) + } + + if found { + T.pool.Put(c) + } else { + // we lost the race :(, we have a worker though + v, ok = <-c + if ok { + T.pool.Put(c) + } + } } } diff --git a/lib/gat/handlers/pool/poolers/rob/pooler.go b/lib/gat/handlers/pool/poolers/rob/pooler.go index 28bd52b2ff2a86daafef558f059ac4e54777223f..e4b58b3b08cc7c057180f3716db95225680e5edc 100644 --- a/lib/gat/handlers/pool/poolers/rob/pooler.go +++ b/lib/gat/handlers/pool/poolers/rob/pooler.go @@ -1,10 +1,11 @@ package rob import ( + "time" + "github.com/google/uuid" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool" - "gfx.cafe/gfx/pggat/lib/rob" "gfx.cafe/gfx/pggat/lib/rob/schedulers/v3" ) @@ -34,8 +35,8 @@ func (T *Pooler) DeleteServer(server uuid.UUID) { T.s.DeleteWorker(server) } -func (T *Pooler) Acquire(client uuid.UUID) (server uuid.UUID) { - return T.s.Acquire(client, rob.SyncModeTryNonBlocking) +func (T *Pooler) Acquire(client uuid.UUID, timeout time.Duration) (server uuid.UUID) { + return T.s.Acquire(client, timeout) } func (T *Pooler) Release(server uuid.UUID) { diff --git a/lib/gat/handlers/pool/spool/pool.go b/lib/gat/handlers/pool/spool/pool.go index 3815bbf97e738a9571af5d98bc7555aec573fbd4..54f7d4742b6dec0ac69b9834417fac58769f14b1 100644 --- a/lib/gat/handlers/pool/spool/pool.go +++ b/lib/gat/handlers/pool/spool/pool.go @@ -252,7 +252,7 @@ func (T *Pool) RemoveClient(client uuid.UUID) { func (T *Pool) Acquire(client uuid.UUID) *Server { for { - serverID := T.pooler.Acquire(client) + serverID := T.pooler.Acquire(client, 0) // TODO(garet) timeout in config if serverID == uuid.Nil { return nil } diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go index 97fcadd721142da07b537c7ac78b358bc0376ab1..86800a53503c9503a69dffc3b998ae60551e9ff4 100644 --- a/lib/rob/scheduler.go +++ b/lib/rob/scheduler.go @@ -1,20 +1,13 @@ package rob import ( + "time" + "github.com/google/uuid" ) type SyncMode int -const ( - // SyncModeNonBlocking will attempt to acquire a worker without blocking - SyncModeNonBlocking SyncMode = iota - // SyncModeBlocking will block to acquire a worker - SyncModeBlocking - // SyncModeTryNonBlocking will attempt to acquire without blocking first, then fallback to blocking if none were available - SyncModeTryNonBlocking -) - type Scheduler interface { AddWorker(id uuid.UUID) DeleteWorker(worker uuid.UUID) @@ -22,8 +15,8 @@ type Scheduler interface { AddUser(id uuid.UUID) DeleteUser(user uuid.UUID) - // Acquire will acquire a worker with the desired SyncMode - Acquire(user uuid.UUID, sync SyncMode) uuid.UUID + // Acquire will acquire a worker with timeout + Acquire(user uuid.UUID, timeout time.Duration) uuid.UUID // Release will release a worker. // This should be called after acquire unless the worker is removed with RemoveWorker diff --git a/lib/rob/schedulers/v3/scheduler.go b/lib/rob/schedulers/v3/scheduler.go index f96e1e7844935d616f5bf6bbfee14b2d74248787..a1624a41c9a167775ef4d36a527ec4bdc5e26c74 100644 --- a/lib/rob/schedulers/v3/scheduler.go +++ b/lib/rob/schedulers/v3/scheduler.go @@ -50,7 +50,7 @@ func (T *Scheduler) AddWorker(id uuid.UUID) { } T.workers[id] = worker - T.release(worker) + T.releaseWorker(worker) } func (T *Scheduler) DeleteWorker(worker uuid.UUID) { @@ -116,7 +116,7 @@ func (T *Scheduler) DeleteUser(user uuid.UUID) { } } -func (T *Scheduler) Acquire(user uuid.UUID, sync rob.SyncMode) uuid.UUID { +func (T *Scheduler) Acquire(user uuid.UUID, timeout time.Duration) uuid.UUID { v, c := func() (uuid.UUID, chan uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() @@ -141,10 +141,6 @@ func (T *Scheduler) Acquire(user uuid.UUID, sync rob.SyncMode) uuid.UUID { return worker.ID, nil } - if sync == rob.SyncModeNonBlocking { - return uuid.Nil, nil - } - ready, _ := T.cc.Get() if ready == nil { ready = make(chan uuid.UUID, 1) @@ -176,17 +172,64 @@ func (T *Scheduler) Acquire(user uuid.UUID, sync rob.SyncMode) uuid.UUID { } if c != nil { + var timeoutC <-chan time.Time + if timeout != 0 { + timer := time.NewTimer(timeout) + defer timer.Stop() + timeoutC = timer.C + } + var ok bool - v, ok = <-c - if ok { - T.cc.Put(c) + select { + case v, ok = <-c: + if ok { + T.cc.Put(c) + } + case <-timeoutC: + T.mu.Lock() + defer T.mu.Unlock() + + // try to remove the job from the queue, we might've lost the race though + var u *User + u, ok = T.users[user] + if !ok { + // we were removed? probably fine + select { + case v, ok = <-c: + // we got a job but we're removed so let's just give it back + if ok { + T.cc.Put(c) + } + + if v != uuid.Nil { + T.release(v) + } + return uuid.Nil + default: + // we were removed before we got a job + return uuid.Nil + } + } + + _, ok = T.schedule.Get(u.Stride) + if ok { + u.Scheduled = false + T.schedule.Delete(u.Stride) + T.cc.Put(c) + } else { + // we lost the race, but we got a worker + v, ok = <-c + if ok { + T.cc.Put(c) + } + } } } return v } -func (T *Scheduler) release(worker *Worker) { +func (T *Scheduler) releaseWorker(worker *Worker) { now := time.Now() // update prev user and state @@ -218,10 +261,7 @@ func (T *Scheduler) release(worker *Worker) { job.Ready <- worker.ID } -func (T *Scheduler) Release(worker uuid.UUID) { - T.mu.Lock() - defer T.mu.Unlock() - +func (T *Scheduler) release(worker uuid.UUID) { if T.closed { return } @@ -231,7 +271,14 @@ func (T *Scheduler) Release(worker uuid.UUID) { return } - T.release(w) + T.releaseWorker(w) +} + +func (T *Scheduler) Release(worker uuid.UUID) { + T.mu.Lock() + defer T.mu.Unlock() + + T.release(worker) } func (T *Scheduler) Waiting() <-chan struct{} {