From aa4273c49816a75679da317aab41ac74298579a7 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Sat, 16 Sep 2023 01:47:36 -0500 Subject: [PATCH] hmm --- lib/gat/pool/pool.go | 14 -------- lib/rob/schedulers/v2/scheduler.go | 58 ++++++++++++++++-------------- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 01609369..759260e9 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -281,21 +281,7 @@ func (T *Pool) acquireServer(client *Client) *Server { if serverID == uuid.Nil { // TODO(garet) can this be run on same thread and only create a goroutine if scaling is possible? go T.scaleUp() - done := make(chan struct{}) - go func() { - start := time.Now() - for { - time.Sleep(1 * time.Second) - select { - case <-done: - return - default: - } - log.Printf("still waiting after %d in pool %p", time.Since(start), T) - } - }() serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking) - close(done) } T.mu.RLock() diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 46090559..05adb9b2 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -8,6 +8,7 @@ import ( "pggat/lib/util/maps" "pggat/lib/util/pools" "sync" + "tuxpa.in/a/zlog/log" ) type Scheduler struct { @@ -36,16 +37,10 @@ func (T *Scheduler) NewWorker() uuid.UUID { } T.sinks[worker] = s - if func() bool { - T.bmu.Lock() - defer T.bmu.Unlock() - if len(T.backlog) > 0 { - s.Enqueue(T.backlog...) - T.backlog = T.backlog[:0] - return true - } - return false - }() { + if len(T.backlog) > 0 { + log.Printf("%p adding %d jobs from backlog", T, len(T.backlog)) + s.Enqueue(T.backlog...) + T.backlog = T.backlog[:0] return worker } @@ -54,14 +49,10 @@ func (T *Scheduler) NewWorker() uuid.UUID { } func (T *Scheduler) DeleteWorker(worker uuid.UUID) { - var s *sink.Sink - var ok bool - func() { - T.mu.Lock() - defer T.mu.Unlock() - s, ok = T.sinks[worker] - delete(T.sinks, worker) - }() + T.mu.Lock() + defer T.mu.Unlock() + s, ok := T.sinks[worker] + delete(T.sinks, worker) if !ok { return } @@ -70,7 +61,11 @@ func (T *Scheduler) DeleteWorker(worker uuid.UUID) { jobs := s.StealAll() for _, j := range jobs { - T.Enqueue(j) + if id := T.tryAcquire(j.Concurrent); id != uuid.Nil { + j.Ready <- id + continue + } + T.enqueue(j) } } @@ -88,12 +83,9 @@ func (T *Scheduler) DeleteUser(user uuid.UUID) { } } -func (T *Scheduler) TryAcquire(j job.Concurrent) uuid.UUID { +func (T *Scheduler) tryAcquire(j job.Concurrent) uuid.UUID { affinity, _ := T.affinity.Load(j.User) - T.mu.RLock() - defer T.mu.RUnlock() - // try affinity first if v, ok := T.sinks[affinity]; ok { if v.Acquire(j) { @@ -115,12 +107,16 @@ func (T *Scheduler) TryAcquire(j job.Concurrent) uuid.UUID { return uuid.Nil } -func (T *Scheduler) Enqueue(j job.Stalled) { - affinity, _ := T.affinity.Load(j.User) - +func (T *Scheduler) TryAcquire(j job.Concurrent) uuid.UUID { T.mu.RLock() defer T.mu.RUnlock() + return T.tryAcquire(j) +} + +func (T *Scheduler) enqueue(j job.Stalled) { + affinity, _ := T.affinity.Load(j.User) + // try affinity first if v, ok := T.sinks[affinity]; ok { v.Enqueue(j) @@ -140,9 +136,19 @@ func (T *Scheduler) Enqueue(j job.Stalled) { // add to backlog T.bmu.Lock() defer T.bmu.Unlock() + log.Printf("%p adding to backlog", T) T.backlog = append(T.backlog, j) } +func (T *Scheduler) Enqueue(j ...job.Stalled) { + T.mu.RLock() + defer T.mu.RUnlock() + + for _, jj := range j { + T.enqueue(jj) + } +} + func (T *Scheduler) Acquire(user uuid.UUID, mode rob.SyncMode) uuid.UUID { switch mode { case rob.SyncModeNonBlocking: -- GitLab