From 1a5388fd61d1193ec8e84d9fea28df933c167b75 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Sat, 16 Sep 2023 02:43:02 -0500 Subject: [PATCH] find where the job went --- lib/rob/schedulers/v2/scheduler.go | 43 +++++++++++++++++++++++++++--- lib/rob/schedulers/v2/sink/sink.go | 25 +++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 168e4e54..e503438a 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -2,13 +2,14 @@ package schedulers import ( "github.com/google/uuid" + "log" "pggat/lib/rob" "pggat/lib/rob/schedulers/v2/job" "pggat/lib/rob/schedulers/v2/sink" "pggat/lib/util/maps" "pggat/lib/util/pools" "sync" - "tuxpa.in/a/zlog/log" + "time" ) type Scheduler struct { @@ -143,7 +144,6 @@ func (T *Scheduler) Enqueue(j ...job.Stalled) { func (T *Scheduler) Acquire(user uuid.UUID, mode rob.SyncMode) uuid.UUID { switch mode { case rob.SyncModeNonBlocking: - defer log.Printf("%p non blocking acquire completed", T) return T.TryAcquire(job.Concurrent{ User: user, }) @@ -161,7 +161,44 @@ func (T *Scheduler) Acquire(user uuid.UUID, mode rob.SyncMode) uuid.UUID { Ready: ready, } T.Enqueue(j) - defer log.Printf("%p blocking acquire completed", T) + + done := make(chan struct{}) + defer func() { + time.Sleep(10 * time.Second) + select { + case <-done: + return + default: + } + + // try to find where this job ended up + T.mu.Lock() + defer T.mu.Unlock() + + for id, s := range T.sinks { + if s.IsScheduled(j.User) { + log.Printf("in %v scheduled", id) + return + } + if s.IsPending(j.User) { + log.Printf("in %v pending", id) + return + } + } + + T.bmu.Lock() + defer T.bmu.Unlock() + + for _, b := range T.backlog { + if b.User == j.User { + log.Printf("in backlog") + return + } + } + + log.Printf("NOWHERE TO BE FOUND") + }() + return <-ready case rob.SyncModeTryNonBlocking: if id := T.Acquire(user, rob.SyncModeNonBlocking); id != uuid.Nil { diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go index cca41fbb..a3987e5d 100644 --- a/lib/rob/schedulers/v2/sink/sink.go +++ b/lib/rob/schedulers/v2/sink/sink.go @@ -248,3 +248,28 @@ func (T *Sink) RemoveUser(user uuid.UUID) { delete(T.pending, user) delete(T.stride, user) } + +func (T *Sink) IsScheduled(user uuid.UUID) bool { + T.mu.Lock() + defer T.mu.Unlock() + + for s, j, ok := T.scheduled.Min(); ok; s, j, ok = T.scheduled.Next(s) { + if j.User == user { + return true + } + } + + return false +} + +func (T *Sink) IsPending(user uuid.UUID) bool { + T.mu.Lock() + defer T.mu.Unlock() + + p, ok := T.pending[user] + if !ok { + return false + } + + return p.Length() > 0 +} -- GitLab