diff --git a/lib/rob/schedulers/v2/pool/pool.go b/lib/rob/schedulers/v2/pool/pool.go index 3c73b10968296c598b881508597296eee39e75c7..d5cf5db032e6a50d0f39c707054602eb1399ce94 100644 --- a/lib/rob/schedulers/v2/pool/pool.go +++ b/lib/rob/schedulers/v2/pool/pool.go @@ -7,12 +7,17 @@ import ( "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v2/job" - "pggat2/lib/rob/schedulers/v2/sink" + "pggat2/lib/rob/schedulers/v2/queue" ) +type constrainedQueue struct { + queue *queue.Queue + constraints rob.Constraints +} + type Pool struct { affinity map[uuid.UUID]uuid.UUID - sinks map[uuid.UUID]*sink.Sink + queues map[uuid.UUID]constrainedQueue backorder []job.Job mu sync.Mutex } @@ -20,25 +25,25 @@ type Pool struct { func MakePool() Pool { return Pool{ affinity: make(map[uuid.UUID]uuid.UUID), - sinks: make(map[uuid.UUID]*sink.Sink), + queues: make(map[uuid.UUID]constrainedQueue), } } -func (T *Pool) NewSink(constraints rob.Constraints) *sink.Sink { - id := uuid.New() - snk := sink.NewSink(constraints, func() { - T.stealFor(id) - }) +func (T *Pool) NewQueue(id uuid.UUID, constraints rob.Constraints) *queue.Queue { + q := queue.NewQueue() T.mu.Lock() defer T.mu.Unlock() - T.sinks[id] = snk + T.queues[id] = constrainedQueue{ + queue: q, + constraints: constraints, + } i := 0 for _, j := range T.backorder { if constraints.Satisfies(j.Constraints) { - snk.Queue(j) + q.Queue(j) } else { T.backorder[i] = j i++ @@ -46,34 +51,34 @@ func (T *Pool) NewSink(constraints rob.Constraints) *sink.Sink { } T.backorder = T.backorder[:i] - return snk + return q } func (T *Pool) Schedule(work job.Job) { T.mu.Lock() defer T.mu.Unlock() - if len(T.sinks) == 0 { + if len(T.queues) == 0 { T.backorder = append(T.backorder, work) return } - var snk *sink.Sink + var q constrainedQueue affinity, ok := T.affinity[work.Source] if ok { - snk = T.sinks[affinity] + q = T.queues[affinity] } - if !ok || !snk.Constraints().Satisfies(work.Constraints) || !snk.Idle() { + if !ok || !q.constraints.Satisfies(work.Constraints) || !q.queue.Idle() { // choose a new affinity that satisfies constraints ok = false - for id, s := range T.sinks { - if s.Constraints().Satisfies(work.Constraints) { + for id, s := range T.queues { + if s.constraints.Satisfies(work.Constraints) { current := id == affinity - snk = s + q = s affinity = id ok = true - if !current && s.Idle() { + if !current && s.queue.Idle() { // prefer idle core, if not idle try to see if we can find one that is break } @@ -87,25 +92,23 @@ func (T *Pool) Schedule(work job.Job) { } // yay, queued - snk.Queue(work) + q.queue.Queue(work) } -func (T *Pool) stealFor(id uuid.UUID) { +func (T *Pool) StealFor(id uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() - snk, ok := T.sinks[id] + q, ok := T.queues[id] if !ok { return } - constraints := snk.Constraints() - - for _, s := range T.sinks { - if s == snk { + for _, s := range T.queues { + if s == q { continue } - works, ok := s.Steal(constraints) + works, ok := s.queue.Steal(q.constraints) if !ok { continue } @@ -114,7 +117,7 @@ func (T *Pool) stealFor(id uuid.UUID) { T.affinity[source] = id } for _, work := range works { - snk.Queue(work) + q.queue.Queue(work) } break } diff --git a/lib/rob/schedulers/v2/queue/queue.go b/lib/rob/schedulers/v2/queue/queue.go new file mode 100644 index 0000000000000000000000000000000000000000..17bbd952bd848be15bee433246f41658987493fa --- /dev/null +++ b/lib/rob/schedulers/v2/queue/queue.go @@ -0,0 +1,175 @@ +package queue + +import ( + "sync" + "time" + + "github.com/google/uuid" + + "pggat2/lib/rob" + "pggat2/lib/rob/schedulers/v2/job" + "pggat2/lib/util/rbtree" + "pggat2/lib/util/ring" +) + +type Queue struct { + signal chan struct{} + + active uuid.UUID + start time.Time + + floor time.Duration + + stride map[uuid.UUID]time.Duration + pending map[uuid.UUID]*ring.Ring[job.Job] + scheduled rbtree.RBTree[time.Duration, job.Job] + mu sync.Mutex +} + +func NewQueue() *Queue { + return &Queue{ + stride: make(map[uuid.UUID]time.Duration), + pending: make(map[uuid.UUID]*ring.Ring[job.Job]), + signal: make(chan struct{}), + } +} + +func (T *Queue) Idle() bool { + T.mu.Lock() + defer T.mu.Unlock() + + return T.active == uuid.Nil +} + +func (T *Queue) HasWork() bool { + T.mu.Lock() + defer T.mu.Unlock() + + _, _, ok := T.scheduled.Min() + return ok +} + +func (T *Queue) Queue(work job.Job) { + T.mu.Lock() + defer T.mu.Unlock() + + // try to schedule right away + if ok := T.scheduleWork(work); ok { + return + } + + // add to pending queue + if _, ok := T.pending[work.Source]; !ok { + T.pending[work.Source] = new(ring.Ring[job.Job]) + } + + T.pending[work.Source].PushBack(work) +} + +// schedule the next work for source +func (T *Queue) schedule(source uuid.UUID) { + pending, ok := T.pending[source] + if !ok { + return + } + work, ok := pending.Get(0) + if !ok { + return + } + if ok = T.scheduleWork(work); !ok { + return + } + pending.PopFront() +} + +func (T *Queue) scheduleWork(work job.Job) bool { + if T.active == work.Source { + return false + } + + stride := T.stride[work.Source] + if stride < T.floor { + stride = T.floor + T.stride[work.Source] = stride + } + + for { + // find unique stride to schedule on + if j, ok := T.scheduled.Get(stride); ok { + if j.Source == work.Source { + return false + } + stride += 1 + continue + } + + T.scheduled.Set(stride, work) + break + } + + // signal that more work is available if someone is waiting + select { + case T.signal <- struct{}{}: + default: + } + + return true +} + +// Steal work from this Sink that is satisfied by constraints +func (T *Queue) Steal(constraints rob.Constraints) ([]job.Job, bool) { + T.mu.Lock() + defer T.mu.Unlock() + + iter := T.scheduled.Iter() + for stride, work, ok := iter(); ok; stride, work, ok = iter() { + if constraints.Satisfies(work.Constraints) { + // steal it + T.scheduled.Delete(stride) + + // steal pending + pending, _ := T.pending[work.Source] + + jobs := make([]job.Job, 0, pending.Length()+1) + jobs = append(jobs, work) + + for work, ok = pending.PopFront(); ok; work, ok = pending.PopFront() { + jobs = append(jobs, work) + } + + return jobs, true + } + } + + // no stealable work + return nil, false +} + +func (T *Queue) Ready() <-chan struct{} { + return T.signal +} + +func (T *Queue) Read() (job.Job, bool) { + T.mu.Lock() + defer T.mu.Unlock() + + if T.active != uuid.Nil { + source := T.active + dur := time.Since(T.start) + T.active = uuid.Nil + + T.stride[source] += dur + T.schedule(source) + } + + stride, j, ok := T.scheduled.Min() + if !ok { + return job.Job{}, false + } + T.scheduled.Delete(stride) + T.floor = stride + + T.active = j.Source + T.start = time.Now() + return j, true +} diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index acc0047756ebd9d524e661425948326fed715bb6..9f61c98afd7f1b2bbcbbf930da490ff03ce4fb45 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -1,8 +1,11 @@ package schedulers import ( + "github.com/google/uuid" + "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v2/pool" + "pggat2/lib/rob/schedulers/v2/sink" "pggat2/lib/rob/schedulers/v2/source" ) @@ -17,7 +20,9 @@ func NewScheduler() *Scheduler { } func (T *Scheduler) NewSink(fulfills rob.Constraints) rob.Sink { - return T.pool.NewSink(fulfills) + id := uuid.New() + q := T.pool.NewQueue(id, fulfills) + return sink.NewSink(id, &T.pool, q) } func (T *Scheduler) NewSource() rob.Source { diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go index 5aab359f7ed552862327385ba2768ff71aaff745..01d3cb6d7669ead5bb13f478d86228998f015d1d 100644 --- a/lib/rob/schedulers/v2/sink/sink.go +++ b/lib/rob/schedulers/v2/sink/sink.go @@ -1,188 +1,44 @@ package sink import ( - "sync" - "time" - "github.com/google/uuid" "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v2/job" - "pggat2/lib/util/rbtree" - "pggat2/lib/util/ring" + "pggat2/lib/rob/schedulers/v2/pool" + "pggat2/lib/rob/schedulers/v2/queue" ) type Sink struct { - constraints rob.Constraints - steal func() - - active uuid.UUID - start time.Time - - floor time.Duration - - stride map[uuid.UUID]time.Duration - pending map[uuid.UUID]*ring.Ring[job.Job] - scheduled rbtree.RBTree[time.Duration, job.Job] - signal chan struct{} - mu sync.Mutex + id uuid.UUID + pool *pool.Pool + queue *queue.Queue } -func NewSink(constraints rob.Constraints, steal func()) *Sink { +func NewSink(id uuid.UUID, p *pool.Pool, q *queue.Queue) *Sink { return &Sink{ - constraints: constraints, - steal: steal, - stride: make(map[uuid.UUID]time.Duration), - pending: make(map[uuid.UUID]*ring.Ring[job.Job]), - signal: make(chan struct{}), + id: id, + pool: p, + queue: q, } } -func (T *Sink) Constraints() rob.Constraints { - // no lock needed because these never change - return T.constraints -} - -func (T *Sink) Idle() bool { - T.mu.Lock() - defer T.mu.Unlock() - - return T.active == uuid.Nil -} - -func (T *Sink) Queue(work job.Job) { - T.mu.Lock() - defer T.mu.Unlock() - - // try to schedule right away - if ok := T.scheduleWork(work); ok { - return - } - - // add to pending queue - if _, ok := T.pending[work.Source]; !ok { - T.pending[work.Source] = new(ring.Ring[job.Job]) - } - - T.pending[work.Source].PushBack(work) -} - -// schedule the next work for source -func (T *Sink) schedule(source uuid.UUID) { - pending, ok := T.pending[source] - if !ok { - return - } - work, ok := pending.Get(0) - if !ok { - return - } - if ok = T.scheduleWork(work); !ok { - return - } - pending.PopFront() -} - -func (T *Sink) scheduleWork(work job.Job) bool { - if T.active == work.Source { - return false - } - - stride := T.stride[work.Source] - if stride < T.floor { - stride = T.floor - T.stride[work.Source] = stride - } - - for { - // find unique stride to schedule on - if j, ok := T.scheduled.Get(stride); ok { - if j.Source == work.Source { - return false - } - stride += 1 - continue - } - - T.scheduled.Set(stride, work) - break - } - - // signal that more work is available if someone is waiting - select { - case T.signal <- struct{}{}: - default: - } - - return true -} - -// Steal work from this Sink that is satisfied by constraints -func (T *Sink) Steal(constraints rob.Constraints) ([]job.Job, bool) { - T.mu.Lock() - defer T.mu.Unlock() - - iter := T.scheduled.Iter() - for stride, work, ok := iter(); ok; stride, work, ok = iter() { - if constraints.Satisfies(work.Constraints) { - // steal it - T.scheduled.Delete(stride) - - // steal pending - pending, _ := T.pending[work.Source] - - jobs := make([]job.Job, 0, pending.Length()+1) - jobs = append(jobs, work) - - for work, ok = pending.PopFront(); ok; work, ok = pending.PopFront() { - jobs = append(jobs, work) - } - - return jobs, true - } - } - - // no stealable work - return nil, false -} - func (T *Sink) findWork() { - T.mu.Unlock() - T.steal() - T.mu.Lock() - if _, _, ok := T.scheduled.Min(); ok { + T.pool.StealFor(T.id) + // see if we stole some work + if T.queue.HasWork() { return } - T.mu.Unlock() - <-T.signal - T.mu.Lock() + // there is no work to steal, wait until some more is scheduled + <-T.queue.Ready() } func (T *Sink) Read() any { - T.mu.Lock() - defer T.mu.Unlock() - - if T.active != uuid.Nil { - source := T.active - dur := time.Since(T.start) - T.active = uuid.Nil - - T.stride[source] += dur - T.schedule(source) - } - for { - stride, j, ok := T.scheduled.Min() - if !ok { - T.findWork() - continue + v, ok := T.queue.Read() + if ok { + return v.Work } - T.scheduled.Delete(stride) - T.floor = stride - - T.active = j.Source - T.start = time.Now() - return j.Work + T.findWork() } }