diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 50d89d8594a8c3b7f4318608248c92b129e440c1..9b8d593e8b9272496bb12c29d63a62a4752332bb 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -8,22 +8,21 @@ import ( "gfx.cafe/gfx/pggat/lib/rob" "gfx.cafe/gfx/pggat/lib/rob/schedulers/v2/job" "gfx.cafe/gfx/pggat/lib/rob/schedulers/v2/sink" - "gfx.cafe/gfx/pggat/lib/util/maps" "gfx.cafe/gfx/pggat/lib/util/pools" ) type Scheduler struct { - affinity maps.RWLocked[uuid.UUID, uuid.UUID] - // resource pools ready pools.Locked[chan uuid.UUID] // backlog is the list of user - backlog []job.Stalled - bmu sync.Mutex - sinks map[uuid.UUID]*sink.Sink - closed bool - mu sync.RWMutex + affinity map[uuid.UUID]uuid.UUID + amu sync.RWMutex + backlog []job.Stalled + bmu sync.Mutex + sinks map[uuid.UUID]*sink.Sink + closed bool + mu sync.RWMutex } func (T *Scheduler) AddWorker(worker uuid.UUID) { @@ -71,17 +70,22 @@ func (T *Scheduler) DeleteWorker(worker uuid.UUID) { func (*Scheduler) AddUser(_ uuid.UUID) {} func (T *Scheduler) DeleteUser(user uuid.UUID) { - T.affinity.Delete(user) - T.mu.RLock() defer T.mu.RUnlock() + + T.amu.Lock() + delete(T.affinity, user) + T.amu.Unlock() + for _, v := range T.sinks { v.RemoveUser(user) } } func (T *Scheduler) tryAcquire(j job.Concurrent) uuid.UUID { - affinity, _ := T.affinity.Load(j.User) + T.amu.RLock() + affinity := T.affinity[j.User] + T.amu.RUnlock() // try affinity first if v, ok := T.sinks[affinity]; ok { @@ -93,7 +97,12 @@ func (T *Scheduler) tryAcquire(j job.Concurrent) uuid.UUID { for id, v := range T.sinks { if v.Acquire(j) { // set affinity - T.affinity.Store(j.User, id) + T.amu.Lock() + if T.affinity == nil { + T.affinity = make(map[uuid.UUID]uuid.UUID) + } + T.affinity[j.User] = id + T.amu.Unlock() return id } } @@ -113,7 +122,9 @@ func (T *Scheduler) TryAcquire(j job.Concurrent) uuid.UUID { } func (T *Scheduler) enqueue(j job.Stalled) { - affinity, _ := T.affinity.Load(j.User) + T.amu.RLock() + affinity := T.affinity[j.User] + T.amu.RUnlock() // try affinity first if v, ok := T.sinks[affinity]; ok { @@ -123,7 +134,12 @@ func (T *Scheduler) enqueue(j job.Stalled) { for id, v := range T.sinks { v.Enqueue(j) - T.affinity.Store(j.User, id) + T.amu.Lock() + if T.affinity == nil { + T.affinity = make(map[uuid.UUID]uuid.UUID) + } + T.affinity[j.User] = id + T.amu.Unlock() return } @@ -212,7 +228,12 @@ func (T *Scheduler) stealFor(worker uuid.UUID) { } if src := v.StealFor(s); src != uuid.Nil { - T.affinity.Store(src, worker) + T.amu.Lock() + if T.affinity == nil { + T.affinity = make(map[uuid.UUID]uuid.UUID) + } + T.affinity[src] = worker + T.amu.Unlock() return } }