diff --git a/lib/rob/schedulers/v1/notifier.go b/lib/rob/schedulers/v1/notifier.go new file mode 100644 index 0000000000000000000000000000000000000000..c3ed59d0ba229101d9db0112652a906d72bdcf70 --- /dev/null +++ b/lib/rob/schedulers/v1/notifier.go @@ -0,0 +1,5 @@ +package schedulers + +type notifier interface { + notify(which *Source) +} diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go index cae0b5b8706d2264ead9a812799de6d483cb37b2..0386e1cc2f34f80f61a955cb98ae57cbe2660276 100644 --- a/lib/rob/schedulers/v1/scheduler.go +++ b/lib/rob/schedulers/v1/scheduler.go @@ -49,11 +49,14 @@ func (T *Scheduler) NewSource() rob.Source { return source } -func (T *Scheduler) steal() *Source { +func (T *Scheduler) steal(ignore *Sink) *Source { T.mu.RLock() defer T.mu.RUnlock() for _, sink := range T.sinks { + if sink == ignore { + continue + } if source := sink.steal(); source != nil { return source } @@ -62,3 +65,4 @@ func (T *Scheduler) steal() *Source { } var _ rob.Scheduler = (*Scheduler)(nil) +var _ stealer = (*Scheduler)(nil) diff --git a/lib/rob/schedulers/v1/scheduler_test.go b/lib/rob/schedulers/v1/scheduler_test.go index b03eb83a391b79ca04663f0b5f9f6f4c96132f94..92f72048f3532ffcdabb21ffce2dccf1c39e3782 100644 --- a/lib/rob/schedulers/v1/scheduler_test.go +++ b/lib/rob/schedulers/v1/scheduler_test.go @@ -1,7 +1,6 @@ package schedulers import ( - "log" "sync" "testing" "time" @@ -66,6 +65,34 @@ func testSource(sched *Scheduler, id int, dur time.Duration) { } } +func similar(v0, v1 int, vn ...int) bool { + const margin = 0.05 // 5% margin of error + + min := v0 + max := v0 + + if v1 < min { + min = v1 + } + if v1 > max { + max = v1 + } + + for _, v := range vn { + if v < min { + min = v + } + if v > max { + max = v + } + } + + if (float64(max-min) / float64(max)) > margin { + return false + } + return true +} + func TestScheduler(t *testing.T) { var table ShareTable sched := NewScheduler() @@ -81,16 +108,29 @@ func TestScheduler(t *testing.T) { t1 := table.Get(1) t2 := table.Get(2) t3 := table.Get(3) - log.Println("share of 0:", t0) - log.Println("share of 1:", t1) - log.Println("share of 2:", t2) - log.Println("share of 3:", t3) /* Expectations: - 0 and 1 should be similar and have roughly 10x of 3 - 2 should have about twice as many executions as 3 */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + + if !similar(t0, t1) { + t.Error("expected s0 and s1 to be similar") + } + + if !similar(t0, t3*10) { + t.Error("expected s0 and s3*10 to be similar") + } + + if !similar(t2, t3*2) { + t.Error("expected s2 and s3*2 to be similar") + } } func TestScheduler_Late(t *testing.T) { @@ -111,20 +151,30 @@ func TestScheduler_Late(t *testing.T) { t1 := table.Get(1) t2 := table.Get(2) t3 := table.Get(3) - log.Println("share of 0:", t0) - log.Println("share of 1:", t1) - log.Println("share of 2:", t2) - log.Println("share of 3:", t3) /* Expectations: - 0 and 1 should be similar - 2 and 3 should be similar - 0 and 1 should have roughly three times as many executions as 2 and 3 - - IF THEY ARE ROUGHLY SIMILAR, THIS TEST IS A FAIL!!!! 0 AND 1 SHOULD NOT STALL WHEN 2 AND 3 ARE INTRODUCED - i need to make these automatic, but it's easy enough to eyeball it */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + + if !similar(t0, t1) { + t.Error("expected s0 and s1 to be similar") + } + + if !similar(t2, t3) { + t.Error("expected s2 and s3 to be similar") + } + + if !similar(t0, 3*t2) { + t.Error("expected s0 and s2*3 to be similar") + } } func TestScheduler_StealBalanced(t *testing.T) { @@ -143,15 +193,20 @@ func TestScheduler_StealBalanced(t *testing.T) { t1 := table.Get(1) t2 := table.Get(2) t3 := table.Get(3) - log.Println("share of 0:", t0) - log.Println("share of 1:", t1) - log.Println("share of 2:", t2) - log.Println("share of 3:", t3) /* Expectations: - all users should get similar # of executions */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + t.Log("share of 3:", t3) + + if !similar(t0, t1, t2, t3) { + t.Error("expected all shares to be similar") + } } func TestScheduler_StealUnbalanced(t *testing.T) { @@ -168,12 +223,17 @@ func TestScheduler_StealUnbalanced(t *testing.T) { t0 := table.Get(0) t1 := table.Get(1) t2 := table.Get(2) - log.Println("share of 0:", t0) - log.Println("share of 1:", t1) - log.Println("share of 2:", t2) /* Expectations: - all users should get similar # of executions */ + + t.Log("share of 0:", t0) + t.Log("share of 1:", t1) + t.Log("share of 2:", t2) + + if !similar(t0, t1, t2) { + t.Error("expected all shares to be similar") + } } diff --git a/lib/rob/schedulers/v1/sink.go b/lib/rob/schedulers/v1/sink.go index 744018ca9c09d53402b37e32caf322cc8f729338..93b9ecfb87abf9a0f3b208a1751cc8d27e42393d 100644 --- a/lib/rob/schedulers/v1/sink.go +++ b/lib/rob/schedulers/v1/sink.go @@ -16,7 +16,7 @@ const ( ) type Sink struct { - scheduler *Scheduler + stealer stealer constraints rob.Constraints runtime map[uuid.UUID]time.Duration @@ -32,9 +32,9 @@ type Sink struct { mu sync.Mutex } -func newSink(scheduler *Scheduler, constraints rob.Constraints) *Sink { +func newSink(stealer stealer, constraints rob.Constraints) *Sink { return &Sink{ - scheduler: scheduler, + stealer: stealer, constraints: constraints, runtime: make(map[uuid.UUID]time.Duration), ready: make(chan struct{}), @@ -42,9 +42,16 @@ func newSink(scheduler *Scheduler, constraints rob.Constraints) *Sink { } func (T *Sink) assign(source *Source) { - source.setNotifier(T.enqueue) + T.mu.Lock() + defer T.mu.Unlock() - T.enqueue(source) + T._assign(source) +} + +func (T *Sink) _assign(source *Source) { + source.setNotifier(T) + + T._enqueue(source) } // steal a thread from this Sink. Note: you can only steal pending threads @@ -60,7 +67,7 @@ func (T *Sink) steal() *Source { return source } -func (T *Sink) enqueue(source *Source) { +func (T *Sink) notify(source *Source) { T.mu.Lock() defer T.mu.Unlock() T._enqueue(source) @@ -100,6 +107,31 @@ func (T *Sink) _enqueue(source *Source) { } } +func (T *Sink) _next() *Source { + for { + runtime, source, ok := T.queue.Min() + if !ok { + // attempt to steal + source = T.stealer.steal(T) + if source != nil { + T._assign(source) + } else { + // unlock to allow work to be added to queue while we wait + T.mu.Unlock() + select { + case <-T.ready: + case <-time.After(stealPeriod): + } + T.mu.Lock() + } + continue + } + T.queue.Delete(runtime) + T.floor = runtime + return source + } +} + func (T *Sink) Read() any { T.mu.Lock() defer T.mu.Unlock() @@ -117,36 +149,7 @@ func (T *Sink) Read() any { for { // get next runnable thread - for { - var runtime time.Duration - var ok bool - runtime, T.active, ok = T.queue.Min() - if !ok { - T.mu.Unlock() - - // attempt to steal - source := T.scheduler.steal() - if source != nil { - T.assign(source) - } - - T.mu.Lock() - if runtime, T.active, ok = T.queue.Min(); ok { - // we have gotten it successfully - } else { - T.mu.Unlock() - select { - case <-T.ready: - case <-time.After(stealPeriod): - } - T.mu.Lock() - continue - } - } - T.queue.Delete(runtime) - T.floor = runtime - break - } + T.active = T._next() T.start = time.Now() @@ -162,3 +165,4 @@ func (T *Sink) Read() any { } var _ rob.Sink = (*Sink)(nil) +var _ notifier = (*Sink)(nil) diff --git a/lib/rob/schedulers/v1/source.go b/lib/rob/schedulers/v1/source.go index ff6046f539fe0f411c6925cf0d35e37f6ecfef43..8d5e13caf70fc4a27b6e131f127987931368651a 100644 --- a/lib/rob/schedulers/v1/source.go +++ b/lib/rob/schedulers/v1/source.go @@ -12,9 +12,9 @@ import ( type Source struct { id uuid.UUID - queue ring.Ring[any] - notifier func(*Source) - mu sync.Mutex + queue ring.Ring[any] + notify notifier + mu sync.Mutex } func newSource() *Source { @@ -26,11 +26,11 @@ func newSource() *Source { func (T *Source) Schedule(w any, constraints rob.Constraints) { T.mu.Lock() T.queue.PushBack(w) - notifier := T.notifier + notify := T.notify T.mu.Unlock() - if notifier != nil { - notifier(T) + if notify != nil { + notify.notify(T) } } @@ -42,10 +42,10 @@ func (T *Source) pop() (next any, ok, hasMore bool) { return } -func (T *Source) setNotifier(notifier func(*Source)) { +func (T *Source) setNotifier(notify notifier) { T.mu.Lock() defer T.mu.Unlock() - T.notifier = notifier + T.notify = notify } var _ rob.Source = (*Source)(nil) diff --git a/lib/rob/schedulers/v1/stealer.go b/lib/rob/schedulers/v1/stealer.go new file mode 100644 index 0000000000000000000000000000000000000000..e6d859b331fa41ec7aa4ded1e7b6eabdb8d89e4b --- /dev/null +++ b/lib/rob/schedulers/v1/stealer.go @@ -0,0 +1,5 @@ +package schedulers + +type stealer interface { + steal(ignore *Sink) *Source +}