diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 3bf2462395acf89065633892ad57d57e3baa087e..f5223b2d72bbe8f1065700846d6650a1621a7c3d 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -146,25 +146,7 @@ func (T *Pool) removeRecipe(name string) { } } -var ( - numStarted atomic.Int64 - numSuccessful atomic.Int64 - numCancelled atomic.Int64 - numFailed atomic.Int64 -) - -func init() { - go func() { - for { - time.Sleep(1 * time.Second) - log.Printf("%d started, %d successful, %d cancelled, %d failed", numStarted.Load(), numSuccessful.Load(), numCancelled.Load(), numFailed.Load()) - } - }() -} - func (T *Pool) scaleUp() { - numStarted.Add(1) - backoff := T.options.ServerReconnectInitialTime backingOff := false @@ -177,14 +159,12 @@ func (T *Pool) scaleUp() { for { select { case <-T.closed: - numCancelled.Add(1) return default: } if !backingOff && T.backingOff.Load() { // already in backoff - numCancelled.Add(1) return } @@ -208,7 +188,6 @@ func (T *Pool) scaleUp() { if r != nil { err := T.scaleUpL1(name, r) if err == nil { - numSuccessful.Add(1) return } @@ -217,13 +196,11 @@ func (T *Pool) scaleUp() { if backoff == 0 { // no backoff - numFailed.Add(1) return } if !backingOff { if T.backingOff.Swap(true) { - numCancelled.Add(1) return } backingOff = true diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index d5efd37e03cbbee32305afa70b539df17261a3f2..08ca6d741e6cd5fd6f737e0541447c46ee2a2212 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -30,30 +30,22 @@ func (T *Scheduler) NewWorker() uuid.UUID { s := sink.NewSink(worker) - if func() bool { - T.mu.Lock() - defer T.mu.Unlock() - // if mu is locked, we don't need to lock bmu, because we are the only accessor - if T.sinks == nil { - T.sinks = make(map[uuid.UUID]*sink.Sink) - } - T.sinks[worker] = s - - if len(T.backlog) == 0 { - return false - } + T.mu.Lock() + defer T.mu.Unlock() + // if mu is locked, we don't need to lock bmu, because we are the only accessor + if T.sinks == nil { + T.sinks = make(map[uuid.UUID]*sink.Sink) + } + T.sinks[worker] = s + if len(T.backlog) > 0 { for _, v := range T.backlog { s.Enqueue(v) } T.backlog = T.backlog[:0] - return true - }() { return worker } - T.mu.RLock() - defer T.mu.RUnlock() T.stealFor(worker) return worker } @@ -96,33 +88,27 @@ func (T *Scheduler) DeleteUser(user uuid.UUID) { func (T *Scheduler) TryAcquire(j job.Concurrent) uuid.UUID { affinity, _ := T.affinity.Load(j.User) - // these can be unlocked and locked a bunch here because it is less bad if ExecuteConcurrent misses a sink - // (it will just stall the job and try again) T.mu.RLock() + defer T.mu.RUnlock() // try affinity first if v, ok := T.sinks[affinity]; ok { - T.mu.RUnlock() if v.Acquire(j) { return affinity } - T.mu.RLock() } for id, v := range T.sinks { if id == affinity { continue } - T.mu.RUnlock() if v.Acquire(j) { // set affinity T.affinity.Store(j.User, id) return id } - T.mu.RLock() } - T.mu.RUnlock() return uuid.Nil } @@ -200,7 +186,7 @@ func (T *Scheduler) Release(worker uuid.UUID) { } } -// stealFor will try to steal work for the specified worker. RLock Scheduler.mu before executing +// stealFor will try to steal work for the specified worker. Lock Scheduler.mu before executing func (T *Scheduler) stealFor(worker uuid.UUID) { s, ok := T.sinks[worker] if !ok { @@ -212,15 +198,8 @@ func (T *Scheduler) stealFor(worker uuid.UUID) { continue } - if func() bool { - T.mu.RUnlock() - defer T.mu.RLock() - if src := v.StealFor(s); src != uuid.Nil { - T.affinity.Store(src, worker) - return true - } - return false - }() { + if src := v.StealFor(s); src != uuid.Nil { + T.affinity.Store(src, worker) return } }