From 394c321624a1056e37fcb34ea38443706461344b Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Mon, 15 May 2023 15:24:11 -0500 Subject: [PATCH] rob v3 (renamed to v0 because of new api) fixes #1 --- lib/rob/schedulers/v0/job/job.go | 9 +- lib/rob/schedulers/v0/pool/pool.go | 97 ++++++- lib/rob/schedulers/v0/scheduler.go | 13 +- lib/rob/schedulers/v0/scheduler_test.go | 356 ++++++++++++++++++++++++ lib/rob/schedulers/v0/sink/sink.go | 156 ++++++++++- lib/rob/schedulers/v0/source/source.go | 34 ++- lib/util/maps/rwlocked.go | 75 +++++ lib/util/pools/locked.go | 25 ++ 8 files changed, 728 insertions(+), 37 deletions(-) create mode 100644 lib/rob/schedulers/v0/scheduler_test.go create mode 100644 lib/util/maps/rwlocked.go create mode 100644 lib/util/pools/locked.go diff --git a/lib/rob/schedulers/v0/job/job.go b/lib/rob/schedulers/v0/job/job.go index 81424f14..b05773ac 100644 --- a/lib/rob/schedulers/v0/job/job.go +++ b/lib/rob/schedulers/v0/job/job.go @@ -2,11 +2,18 @@ package job import ( "github.com/google/uuid" + "pggat2/lib/rob" ) -type Job struct { +type Concurrent struct { Source uuid.UUID Constraints rob.Constraints Work any } + +type Stalled struct { + Source uuid.UUID + Constraints rob.Constraints + Out chan<- rob.Worker +} diff --git a/lib/rob/schedulers/v0/pool/pool.go b/lib/rob/schedulers/v0/pool/pool.go index 7c342e0a..1da00e02 100644 --- a/lib/rob/schedulers/v0/pool/pool.go +++ b/lib/rob/schedulers/v0/pool/pool.go @@ -1,35 +1,102 @@ package pool import ( + "sync" + "github.com/google/uuid" + "pggat2/lib/rob/schedulers/v0/job" "pggat2/lib/rob/schedulers/v0/sink" - "sync" + "pggat2/lib/util/maps" ) type Pool struct { - sinks map[uuid.UUID]*sink.Sink - mu sync.RWMutex + affinity maps.RWLocked[uuid.UUID, uuid.UUID] + + backlog []job.Stalled + bmu sync.Mutex + + sinks maps.RWLocked[uuid.UUID, *sink.Sink] +} + +func MakePool() Pool { + return Pool{} +} + +// DoConcurrent attempts to do the work now. +// Returns true if the work was done, otherwise the sender should stall the work. +func (T *Pool) DoConcurrent(j job.Concurrent) (done bool) { + affinity, _ := T.affinity.Load(j.Source) + + // try affinity first + if v, ok := T.sinks.Load(affinity); ok { + if done = v.DoConcurrent(j); done { + return + } + } + + T.sinks.Range(func(id uuid.UUID, v *sink.Sink) bool { + if id == affinity { + return true + } + if done = v.DoConcurrent(j); done { + // set affinity + T.affinity.Store(j.Source, id) + return false + } + return true + }) + if done { + return + } + + return false } -// Do attempts to do the work. 
-// Returns true if the work was done, otherwise the sender should wait on their stall chan for the next node -func (T *Pool) Do(j job.Job) bool { - T.mu.RLock() - defer T.mu.RUnlock() +// DoStalled queues a job to be done eventually +func (T *Pool) DoStalled(j job.Stalled) { + affinity, _ := T.affinity.Load(j.Source) - // TODO(garet) choose affinity, prefer idle nodes - for _, v := range T.sinks { - if v.DoIfIdle(j) { + // try affinity first + if v, ok := T.sinks.Load(affinity); ok { + if ok = v.DoStalled(j); ok { return } } - panic("no available sinks") + var ok bool + T.sinks.Range(func(id uuid.UUID, v *sink.Sink) bool { + if id == affinity { + return true + } + if ok = v.DoStalled(j); ok { + // set affinity + T.affinity.Store(j.Source, id) + return false + } + return true + }) + if ok { + return + } + + // add to backlog + T.bmu.Lock() + defer T.bmu.Unlock() + T.backlog = append(T.backlog, j) } func (T *Pool) AddSink(s *sink.Sink) { - T.mu.Lock() - defer T.mu.Unlock() - T.sinks[s.ID()] = s + T.sinks.Store(s.ID(), s) + + T.bmu.Lock() + defer T.bmu.Unlock() + i := 0 + for _, v := range T.backlog { + if ok := s.DoStalled(v); !ok { + T.backlog[i] = v + i++ + } + } + T.backlog = T.backlog[:i] } diff --git a/lib/rob/schedulers/v0/scheduler.go b/lib/rob/schedulers/v0/scheduler.go index 14c0ae35..780a7d61 100644 --- a/lib/rob/schedulers/v0/scheduler.go +++ b/lib/rob/schedulers/v0/scheduler.go @@ -11,12 +11,23 @@ type Scheduler struct { pool pool.Pool } +func MakeScheduler() Scheduler { + return Scheduler{ + pool: pool.MakePool(), + } +} + +func NewScheduler() *Scheduler { + s := MakeScheduler() + return &s +} + func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) { T.pool.AddSink(sink.NewSink(constraints, worker)) } func (T *Scheduler) NewSource() rob.Worker { - return source.MakeSource(&T.pool) + return source.NewSource(&T.pool) } var _ rob.Scheduler = (*Scheduler)(nil) diff --git a/lib/rob/schedulers/v0/scheduler_test.go b/lib/rob/schedulers/v0/scheduler_test.go new file mode 100644 index 00000000..b2b0c60f --- /dev/null +++ b/lib/rob/schedulers/v0/scheduler_test.go @@ -0,0 +1,356 @@ +package schedulers + +import ( + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "pggat2/lib/rob" +) + +type Work struct { + Sender int + Duration time.Duration +} + +type ShareTable struct { + table map[int]int + mu sync.RWMutex +} + +func (T *ShareTable) Inc(user int) { + T.mu.Lock() + defer T.mu.Unlock() + + if T.table == nil { + T.table = make(map[int]int) + } + T.table[user]++ +} + +func (T *ShareTable) Get(user int) int { + T.mu.RLock() + defer T.mu.RUnlock() + + v, _ := T.table[user] + return v +} + +type TestSink struct { + table *ShareTable + constraints rob.Constraints + inuse atomic.Bool +} + +func (T *TestSink) Do(constraints rob.Constraints, work any) { + if T.inuse.Swap(true) { + panic("Sink was already inuse") + } + defer T.inuse.Store(false) + if !T.constraints.Satisfies(constraints) { + panic("Scheduler did not obey constraints") + } + switch v := work.(type) { + case Work: + start := time.Now() + for time.Since(start) < v.Duration { + } + T.table.Inc(v.Sender) + } +} + +var _ rob.Worker = (*TestSink)(nil) + +func testSink(sched *Scheduler, table *ShareTable, constraints rob.Constraints) { + sched.AddSink(constraints, &TestSink{ + table: table, + constraints: constraints, + }) +} + +func testSource(sched *Scheduler, id int, dur time.Duration, constraints rob.Constraints) { + source := sched.NewSource() + for { + w := Work{ + Sender: id, + Duration: dur, + } + 
source.Do(constraints, w) + } +} + +func similar(v0, v1 int, vn ...int) bool { + const margin = 0.05 // 5% margin of error + + min := v0 + max := v0 + + if v1 < min { + min = v1 + } + if v1 > max { + max = v1 + } + + for _, v := range vn { + if v < min { + min = v + } + if v > max { + max = v + } + } + + if (float64(max-min) / float64(max)) > margin { + return false + } + return true +} + +// like debug.Stack but gets all stacks +func allStacks() []byte { + buf := make([]byte, 1024) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} + +func TestScheduler(t *testing.T) { + var table ShareTable + sched := NewScheduler() + testSink(sched, &table, 0) + + go testSource(sched, 0, 10*time.Millisecond, 0) + go testSource(sched, 1, 10*time.Millisecond, 0) + go testSource(sched, 2, 50*time.Millisecond, 0) + go testSource(sched, 3, 100*time.Millisecond, 0) + + time.Sleep(20 * time.Second) + t0 := table.Get(0) + t1 := table.Get(1) + t2 := table.Get(2) + t3 := table.Get(3) + + /* + Expectations: + - 0 and 1 should be similar and have roughly 10x of 3 + - 2 should have about twice as many executions as 3 + */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + + if !similar(t0, t1) { + t.Error("expected s0 and s1 to be similar") + } + + if !similar(t0, t3*10) { + t.Error("expected s0 and s3*10 to be similar") + } + + if !similar(t2, t3*2) { + t.Error("expected s2 and s3*2 to be similar") + } +} + +func TestScheduler_Late(t *testing.T) { + var table ShareTable + sched := NewScheduler() + testSink(sched, &table, 0) + + go testSource(sched, 0, 10*time.Millisecond, 0) + go testSource(sched, 1, 10*time.Millisecond, 0) + + time.Sleep(10 * time.Second) + + go testSource(sched, 2, 10*time.Millisecond, 0) + go testSource(sched, 3, 10*time.Millisecond, 0) + + time.Sleep(10 * time.Second) + t0 := table.Get(0) + t1 := table.Get(1) + t2 := table.Get(2) + t3 := table.Get(3) + + /* + Expectations: + - 0 and 1 should be similar + - 2 and 3 should be similar + - 0 and 1 should have roughly three times as many executions as 2 and 3 + */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + + if !similar(t0, t1) { + t.Error("expected s0 and s1 to be similar") + } + + if !similar(t2, t3) { + t.Error("expected s2 and s3 to be similar") + } + + if !similar(t0, 3*t2) { + t.Error("expected s0 and s2*3 to be similar") + } +} + +func TestScheduler_StealBalanced(t *testing.T) { + var table ShareTable + sched := NewScheduler() + testSink(sched, &table, 0) + testSink(sched, &table, 0) + + go testSource(sched, 0, 10*time.Millisecond, 0) + go testSource(sched, 1, 10*time.Millisecond, 0) + go testSource(sched, 2, 10*time.Millisecond, 0) + go testSource(sched, 3, 10*time.Millisecond, 0) + go testSource(sched, 4, 10*time.Millisecond, 0) + go testSource(sched, 5, 10*time.Millisecond, 0) + + time.Sleep(20 * time.Second) + t0 := table.Get(0) + t1 := table.Get(1) + t2 := table.Get(2) + t3 := table.Get(3) + t4 := table.Get(4) + t5 := table.Get(5) + + /* + Expectations: + - all users should get similar # of executions + */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + t.Log("share of 4:", t4) + t.Log("share of 5:", t5) + + if !similar(t0, t1, t2, t3, t4, t5) { + t.Error("expected all shares to be similar") + } + + if t0 == 0 { + t.Error("expected executions on all sources (is there a race in 
the balancer??)") + t.Errorf("%s", allStacks()) + } +} + +func TestScheduler_StealUnbalanced(t *testing.T) { + var table ShareTable + sched := NewScheduler() + testSink(sched, &table, 0) + testSink(sched, &table, 0) + + go testSource(sched, 0, 10*time.Millisecond, 0) + go testSource(sched, 1, 10*time.Millisecond, 0) + go testSource(sched, 2, 10*time.Millisecond, 0) + go testSource(sched, 3, 10*time.Millisecond, 0) + go testSource(sched, 4, 10*time.Millisecond, 0) + + time.Sleep(20 * time.Second) + t0 := table.Get(0) + t1 := table.Get(1) + t2 := table.Get(2) + t3 := table.Get(3) + t4 := table.Get(4) + + /* + Expectations: + - all users should get similar # of executions + */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + t.Log("share of 4:", t4) + + if !similar(t0, t1, t2, t3, t4) { + t.Error("expected all shares to be similar") + } + + if t0 == 0 { + t.Error("expected executions on all sources (is there a race in the balancer??)") + t.Errorf("%s", allStacks()) + } +} + +func TestScheduler_Constraints(t *testing.T) { + const ( + ConstraintA rob.Constraints = 1 << iota + ConstraintB + ) + + var table ShareTable + sched := NewScheduler() + + testSink(sched, &table, rob.Constraints.All(ConstraintA, ConstraintB)) + testSink(sched, &table, ConstraintA) + testSink(sched, &table, ConstraintB) + + go testSource(sched, 0, 10*time.Millisecond, rob.Constraints.All(ConstraintA, ConstraintB)) + go testSource(sched, 1, 10*time.Millisecond, rob.Constraints.All(ConstraintA, ConstraintB)) + go testSource(sched, 2, 10*time.Millisecond, ConstraintA) + go testSource(sched, 3, 10*time.Millisecond, ConstraintA) + go testSource(sched, 4, 10*time.Millisecond, ConstraintB) + go testSource(sched, 5, 10*time.Millisecond, ConstraintB) + + time.Sleep(20 * time.Second) + t0 := table.Get(0) + t1 := table.Get(1) + t2 := table.Get(2) + t3 := table.Get(3) + t4 := table.Get(4) + t5 := table.Get(5) + + /* + Expectations: + - all users should get similar # of executions (shares of 0 and 1 may be less because they have less sinks they can use: 1 vs 2) + - all constraints should be honored + */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + t.Log("share of 4:", t4) + t.Log("share of 5:", t5) +} + +func TestScheduler_IdleWake(t *testing.T) { + var table ShareTable + sched := NewScheduler() + + testSink(sched, &table, 0) + + time.Sleep(10 * time.Second) + + go testSource(sched, 0, 10*time.Millisecond, 0) + + time.Sleep(10 * time.Second) + t0 := table.Get(0) + + /* + Expectations: + - 0 should have some executions + */ + + if t0 == 0 { + t.Error("expected executions to be greater than 0 (is idle waking broken?)") + } + + t.Log("share of 0:", t0) +} diff --git a/lib/rob/schedulers/v0/sink/sink.go b/lib/rob/schedulers/v0/sink/sink.go index 61f6d05b..6c6110d5 100644 --- a/lib/rob/schedulers/v0/sink/sink.go +++ b/lib/rob/schedulers/v0/sink/sink.go @@ -1,15 +1,34 @@ package sink import ( + "sync" + "time" + "github.com/google/uuid" + "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v0/job" + "pggat2/lib/util/rbtree" + "pggat2/lib/util/ring" ) type Sink struct { id uuid.UUID constraints rob.Constraints worker rob.Worker + + // non final + + active uuid.UUID + start time.Time + + floor time.Duration + + stride map[uuid.UUID]time.Duration + pending map[uuid.UUID]*ring.Ring[job.Stalled] + scheduled rbtree.RBTree[time.Duration, job.Stalled] + + mu sync.Mutex } func NewSink(constraints rob.Constraints, worker 
rob.Worker) *Sink { @@ -17,6 +36,9 @@ func NewSink(constraints rob.Constraints, worker rob.Worker) *Sink { id: uuid.New(), constraints: constraints, worker: worker, + + stride: make(map[uuid.UUID]time.Duration), + pending: make(map[uuid.UUID]*ring.Ring[job.Stalled]), } } @@ -24,32 +46,142 @@ func (T *Sink) ID() uuid.UUID { return T.id } -func (T *Sink) Constraints() rob.Constraints { - return T.constraints +func (T *Sink) setNext(source uuid.UUID) { + T.active = source + T.start = time.Now() +} + +func (T *Sink) addStalled(j job.Stalled) { + // try to schedule right away + if ok := T.tryScheduleStalled(j); ok { + return + } + + // add to pending queue + if _, ok := T.pending[j.Source]; !ok { + r := ring.NewRing[job.Stalled](0, 1) + r.PushBack(j) + T.pending[j.Source] = r + return + } + + T.pending[j.Source].PushBack(j) +} + +func (T *Sink) schedulePending(source uuid.UUID) { + pending, ok := T.pending[source] + if !ok { + return + } + work, ok := pending.Get(0) + if !ok { + return + } + if ok = T.tryScheduleStalled(work); !ok { + return + } + pending.PopFront() +} + +func (T *Sink) tryScheduleStalled(j job.Stalled) bool { + if T.active == j.Source { + return false + } + + stride := T.stride[j.Source] + if stride < T.floor { + stride = T.floor + T.stride[j.Source] = stride + } + + for { + // find unique stride to schedule on + if s, ok := T.scheduled.Get(stride); ok { + if s.Source == j.Source { + return false + } + stride += 1 + continue + } + + T.scheduled.Set(stride, j) + break + } + + return true +} + +func (T *Sink) next() bool { + if T.active != uuid.Nil { + source := T.active + dur := time.Since(T.start) + T.active = uuid.Nil + + T.stride[source] += dur + + T.schedulePending(source) + } + + stride, j, ok := T.scheduled.Min() + if !ok { + return false + } + T.scheduled.Delete(stride) + T.floor = stride + + T.setNext(j.Source) + j.Out <- T + return true } -// DoIfIdle will call Do if the target Sink is idle. 
-// Returns true if the job is complete -func (T *Sink) DoIfIdle(j job.Job) bool { +func (T *Sink) DoConcurrent(j job.Concurrent) (done bool) { if !T.constraints.Satisfies(j.Constraints) { return false } - // TODO(garet) check if idle + T.mu.Lock() + + if T.active != uuid.Nil { + T.mu.Unlock() + // this Sink is in use + return false + } + + T.setNext(j.Source) + T.mu.Unlock() + T.Do(j.Constraints, j.Work) - T.Do(j) return true } -// Do will do the work if the constraints match -// Returns true if the job is complete -func (T *Sink) Do(j job.Job) bool { +func (T *Sink) DoStalled(j job.Stalled) (ok bool) { if !T.constraints.Satisfies(j.Constraints) { return false } - // TODO(garet) queue if we are too busy + T.mu.Lock() + defer T.mu.Unlock() + + if T.active != uuid.Nil { + // sink is in use, add to queue + T.addStalled(j) + } else { + // sink is open, do now + T.setNext(j.Source) + j.Out <- T + } - T.worker.Do(j.Constraints, j.Work) return true } + +func (T *Sink) Do(constraints rob.Constraints, work any) { + if !T.constraints.Satisfies(constraints) { + panic("Do called on sink with non satisfied constraints") + } + T.worker.Do(constraints, work) + T.mu.Lock() + defer T.mu.Unlock() + T.next() +} + +var _ rob.Worker = (*Sink)(nil) diff --git a/lib/rob/schedulers/v0/source/source.go b/lib/rob/schedulers/v0/source/source.go index 445daca7..936a16ac 100644 --- a/lib/rob/schedulers/v0/source/source.go +++ b/lib/rob/schedulers/v0/source/source.go @@ -2,30 +2,48 @@ package source import ( "github.com/google/uuid" + "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v0/job" "pggat2/lib/rob/schedulers/v0/pool" + "pggat2/lib/util/pools" ) type Source struct { - id uuid.UUID - stall chan rob.Worker - pool *pool.Pool + id uuid.UUID + pool *pool.Pool + + stall pools.Locked[chan rob.Worker] } -func MakeSource(p *pool.Pool) Source { - return Source{ +func NewSource(p *pool.Pool) *Source { + return &Source{ id: uuid.New(), pool: p, } } -func (T Source) Do(constraints rob.Constraints, work any) { - T.pool.Do(job.Job{ +func (T *Source) Do(constraints rob.Constraints, work any) { + if T.pool.DoConcurrent(job.Concurrent{ Source: T.id, Constraints: constraints, Work: work, + }) { + return + } + out, ok := T.stall.Get() + if !ok { + out = make(chan rob.Worker) + } + defer T.stall.Put(out) + + T.pool.DoStalled(job.Stalled{ + Source: T.id, + Constraints: constraints, + Out: out, }) + worker := <-out + worker.Do(constraints, work) } -var _ rob.Worker = Source{} +var _ rob.Worker = (*Source)(nil) diff --git a/lib/util/maps/rwlocked.go b/lib/util/maps/rwlocked.go new file mode 100644 index 00000000..530724a2 --- /dev/null +++ b/lib/util/maps/rwlocked.go @@ -0,0 +1,75 @@ +package maps + +import "sync" + +type RWLocked[K comparable, V any] struct { + inner map[K]V + mu sync.RWMutex +} + +func (T *RWLocked[K, V]) Delete(key K) { + T.mu.Lock() + defer T.mu.Unlock() + delete(T.inner, key) +} + +func (T *RWLocked[K, V]) Load(key K) (value V, ok bool) { + T.mu.RLock() + defer T.mu.RUnlock() + value, ok = T.inner[key] + return +} + +func (T *RWLocked[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + T.mu.Lock() + defer T.mu.Unlock() + value, loaded = T.inner[key] + delete(T.inner, key) + return +} + +func (T *RWLocked[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + T.mu.Lock() + defer T.mu.Unlock() + actual, loaded = T.inner[key] + if !loaded { + if T.inner == nil { + T.inner = make(map[K]V) + } + T.inner[key] = value + actual = value + } + return +} + +func (T *RWLocked[K, V]) Store(key K, value V) { + 
T.mu.Lock() + defer T.mu.Unlock() + if T.inner == nil { + T.inner = make(map[K]V) + } + T.inner[key] = value +} + +func (T *RWLocked[K, V]) Swap(key K, value V) (previous V, loaded bool) { + T.mu.Lock() + defer T.mu.Unlock() + previous, loaded = T.inner[key] + if T.inner == nil { + T.inner = make(map[K]V) + } + T.inner[key] = value + return +} + +func (T *RWLocked[K, V]) Range(f func(key K, value V) bool) { + T.mu.RLock() + for key, value := range T.inner { + T.mu.RUnlock() + if !f(key, value) { + return + } + T.mu.RLock() + } + T.mu.RUnlock() +} diff --git a/lib/util/pools/locked.go b/lib/util/pools/locked.go new file mode 100644 index 00000000..122aa742 --- /dev/null +++ b/lib/util/pools/locked.go @@ -0,0 +1,25 @@ +package pools + +import "sync" + +type Locked[T any] struct { + inner []T + mu sync.Mutex +} + +func (L *Locked[T]) Get() (T, bool) { + L.mu.Lock() + defer L.mu.Unlock() + if len(L.inner) == 0 { + return *new(T), false + } + v := L.inner[len(L.inner)-1] + L.inner = L.inner[:len(L.inner)-1] + return v, true +} + +func (L *Locked[T]) Put(v T) { + L.mu.Lock() + defer L.mu.Unlock() + L.inner = append(L.inner, v) +} -- GitLab
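
Usage sketch of the new v0 scheduler API introduced by this patch, mirroring what scheduler_test.go exercises. The EchoWorker type, the zero Constraints value, and the import alias are illustrative assumptions for this sketch, not part of the change itself.

    package main

    import (
        "fmt"

        "pggat2/lib/rob"
        schedulers "pggat2/lib/rob/schedulers/v0"
    )

    // EchoWorker is a stand-in rob.Worker that simply prints the work it is handed.
    type EchoWorker struct{}

    func (EchoWorker) Do(_ rob.Constraints, work any) {
        fmt.Println("did work:", work)
    }

    var _ rob.Worker = EchoWorker{}

    func main() {
        sched := schedulers.NewScheduler()

        // Sinks are the executors; zero constraints means this sink accepts any job.
        sched.AddSink(0, EchoWorker{})

        // Each client owns a Source. Do first tries DoConcurrent against an idle sink,
        // otherwise it parks on a stall channel until a sink hands itself back.
        source := sched.NewSource()
        source.Do(0, "hello")
    }

With several sinks and sources, stalled jobs are queued per source and ordered by the stride/virtual-time bookkeeping inside each sink (sink.go), while the pool records the last sink that accepted a source's work as its affinity so repeat work from that source tends to land on the same worker.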