good morning!!!!

Skip to content
Snippets Groups Projects
Commit 385585a9 authored by Garet Halliday's avatar Garet Halliday
Browse files

work stealing working

parent c9efd6cc
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ type Scheduler struct {
sinks []*Sink
backorder []*Source
mu sync.Mutex
mu sync.RWMutex
}
func NewScheduler() *Scheduler {
......@@ -19,7 +19,7 @@ func NewScheduler() *Scheduler {
}
func (T *Scheduler) NewSink() rob.Sink {
sink := newSink()
sink := newSink(T)
T.mu.Lock()
defer T.mu.Unlock()
......@@ -49,4 +49,16 @@ func (T *Scheduler) NewSource() rob.Source {
return source
}
func (T *Scheduler) steal() *Source {
T.mu.RLock()
defer T.mu.RUnlock()
for _, sink := range T.sinks {
if source := sink.steal(); source != nil {
return source
}
}
return nil
}
var _ rob.Scheduler = (*Scheduler)(nil)
......@@ -126,3 +126,54 @@ func TestScheduler_Late(t *testing.T) {
i need to make these automatic, but it's easy enough to eyeball it
*/
}
func TestScheduler_StealBalanced(t *testing.T) {
var table ShareTable
sched := NewScheduler()
go testSink(sched, &table)
go testSink(sched, &table)
go testSource(sched, 0, 10*time.Millisecond)
go testSource(sched, 1, 10*time.Millisecond)
go testSource(sched, 2, 10*time.Millisecond)
go testSource(sched, 3, 10*time.Millisecond)
time.Sleep(20 * time.Second)
t0 := table.Get(0)
t1 := table.Get(1)
t2 := table.Get(2)
t3 := table.Get(3)
log.Println("share of 0:", t0)
log.Println("share of 1:", t1)
log.Println("share of 2:", t2)
log.Println("share of 3:", t3)
/*
Expectations:
- all users should get similar # of executions
*/
}
func TestScheduler_StealUnbalanced(t *testing.T) {
var table ShareTable
sched := NewScheduler()
go testSink(sched, &table)
go testSink(sched, &table)
go testSource(sched, 0, 10*time.Millisecond)
go testSource(sched, 1, 10*time.Millisecond)
go testSource(sched, 2, 10*time.Millisecond)
time.Sleep(20 * time.Second)
t0 := table.Get(0)
t1 := table.Get(1)
t2 := table.Get(2)
log.Println("share of 0:", t0)
log.Println("share of 1:", t1)
log.Println("share of 2:", t2)
/*
Expectations:
- all users should get similar # of executions
*/
}
......@@ -10,7 +10,14 @@ import (
"pggat2/lib/util/rbtree"
)
const (
// how often we should wake up to try and steal some work
stealPeriod = 100 * time.Millisecond
)
type Sink struct {
scheduler *Scheduler
runtime map[uuid.UUID]time.Duration
active *Source
......@@ -24,10 +31,11 @@ type Sink struct {
mu sync.Mutex
}
func newSink() *Sink {
func newSink(scheduler *Scheduler) *Sink {
return &Sink{
runtime: make(map[uuid.UUID]time.Duration),
ready: make(chan struct{}),
scheduler: scheduler,
runtime: make(map[uuid.UUID]time.Duration),
ready: make(chan struct{}),
}
}
......@@ -37,6 +45,19 @@ func (T *Sink) assign(source *Source) {
T.enqueue(source)
}
// steal a thread from this Sink. Note: you can only steal pending threads
func (T *Sink) steal() *Source {
T.mu.Lock()
defer T.mu.Unlock()
rt, source, ok := T.queue.Min()
if !ok {
return nil
}
T.queue.Delete(rt)
return source
}
func (T *Sink) enqueue(source *Source) {
T.mu.Lock()
defer T.mu.Unlock()
......@@ -100,9 +121,25 @@ func (T *Sink) Read() any {
runtime, T.active, ok = T.queue.Min()
if !ok {
T.mu.Unlock()
<-T.ready
// attempt to steal
source := T.scheduler.steal()
if source != nil {
T.assign(source)
}
T.mu.Lock()
continue
if runtime, T.active, ok = T.queue.Min(); ok {
// we have gotten it successfully
} else {
T.mu.Unlock()
select {
case <-T.ready:
case <-time.After(stealPeriod):
}
T.mu.Lock()
continue
}
}
T.queue.Delete(runtime)
T.floor = runtime
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment