diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index ff18e1f487da53d02fce24564cce803365c5ed45..e31d69fd54ec50b09c42e2415186cc4f0885ba01 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -289,7 +289,7 @@ func (T *Pool) acquireServer(client *Client) *Server { return default: } - log.Printf("still waiting after %d in pool %p", start, T) + log.Printf("still waiting after %d in pool %p", time.Since(start), T) } }() serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking) diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 23421be1992551aae7bbffb0cdf08d4dc57afdba..4609055908f7d4072759bc850abda354bae0ce2f 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -36,11 +36,16 @@ func (T *Scheduler) NewWorker() uuid.UUID { } T.sinks[worker] = s - if len(T.backlog) > 0 { - for _, v := range T.backlog { - s.Enqueue(v) + if func() bool { + T.bmu.Lock() + defer T.bmu.Unlock() + if len(T.backlog) > 0 { + s.Enqueue(T.backlog...) + T.backlog = T.backlog[:0] + return true } - T.backlog = T.backlog[:0] + return false + }() { return worker } diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go index 70e34356eb1315c5d088087103e66cd5f1ee0167..8445c6d25baec59473d68d45c2e94d106177edb5 100644 --- a/lib/rob/schedulers/v2/sink/sink.go +++ b/lib/rob/schedulers/v2/sink/sink.go @@ -50,28 +50,35 @@ func (T *Sink) schedule(j job.Stalled) bool { T.stride[j.User] = stride } else if stride < T.floor { stride = T.floor - if T.stride == nil { - T.stride = make(map[uuid.UUID]time.Duration) - } T.stride[j.User] = stride } for { // find unique stride to schedule on - if s, ok := T.scheduled.Get(stride); ok { - if s.User == j.User { - return false - } - stride += 1 - continue + s, ok := T.scheduled.Get(stride) + if !ok { + break } - T.scheduled.Set(stride, j) - return true + if s.User == j.User { + return false + } + stride += 1 + continue } + + T.scheduled.Set(stride, j) + return true } func (T *Sink) enqueue(j job.Stalled) { + if T.active == uuid.Nil { + // run it now + T.acquire(j.User) + j.Ready <- T.id + return + } + if T.schedule(j) { return } @@ -90,20 +97,14 @@ func (T *Sink) enqueue(j job.Stalled) { p.PushBack(j) } -func (T *Sink) Enqueue(j job.Stalled) { +func (T *Sink) Enqueue(j ...job.Stalled) { // enqueue job T.mu.Lock() defer T.mu.Unlock() - if T.active == uuid.Nil { - // run it now - T.acquire(j.User) - j.Ready <- T.id - return + for _, jj := range j { + T.enqueue(jj) } - - // enqueue for later - T.enqueue(j) } func (T *Sink) acquire(user uuid.UUID) { @@ -229,11 +230,13 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID { T.mu.Unlock() - rhs.Enqueue(j) + rhs.mu.Lock() + defer rhs.mu.Unlock() + rhs.enqueue(j) if pending != nil { for j, ok = pending.PopFront(); ok; j, ok = pending.PopFront() { - rhs.Enqueue(j) + rhs.enqueue(j) } }