From a84e6ac7de28743808fe898dabd560ac13e43133 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Thu, 27 Apr 2023 19:22:26 -0500
Subject: [PATCH] hmmm

---
 .gitignore                              |   1 +
 lib/rob/schedulers/v0/scheduler.go      |  41 ++++----
 lib/rob/schedulers/v0/scheduler_test.go |   5 +
 lib/rob/schedulers/v0/sink.go           | 124 +++++++++---------------
 lib/rob/schedulers/v0/source.go         |  59 ++++++++---
 lib/rob/schedulers/v0/thread.go         |  26 -----
 lib/rob/schedulers/v0/work.go           |  13 ---
 7 files changed, 113 insertions(+), 156 deletions(-)
 create mode 100644 .gitignore
 delete mode 100644 lib/rob/schedulers/v0/thread.go
 delete mode 100644 lib/rob/schedulers/v0/work.go

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..723ef36f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
\ No newline at end of file
diff --git a/lib/rob/schedulers/v0/scheduler.go b/lib/rob/schedulers/v0/scheduler.go
index d0abb11b..71d5ffa1 100644
--- a/lib/rob/schedulers/v0/scheduler.go
+++ b/lib/rob/schedulers/v0/scheduler.go
@@ -2,15 +2,15 @@ package v0
 
 import (
 	"math/rand"
+	"sync"
 
 	"pggat2/lib/rob"
 )
 
 type Scheduler struct {
-	sinks   []*Sink
-	sources []*Source
-
-	backOrders []*work
+	sinks     []*Sink
+	backorder []*Source
+	mu        sync.Mutex
 }
 
 func NewScheduler() *Scheduler {
@@ -18,30 +18,29 @@ func NewScheduler() *Scheduler {
 }
 
 func (T *Scheduler) NewSink() rob.Sink {
-	sink := newSink(T)
-	T.sinks = append(T.sinks, sink)
-	for _, backOrder := range T.backOrders {
-		sink.enqueue(backOrder)
+	sink := newSink()
+	T.mu.Lock()
+	defer T.mu.Unlock()
+	for _, source := range T.backorder {
+		source.assign(sink)
 	}
-	T.backOrders = T.backOrders[:0]
+	T.backorder = T.backorder[:0]
+	T.sinks = append(T.sinks, sink)
 	return sink
 }
 
 func (T *Scheduler) NewSource() rob.Source {
-	source := newSource(T)
-	T.sources = append(T.sources, source)
-	return source
-}
-
-func (T *Scheduler) getSink() *Sink {
+	source := newSource()
+	T.mu.Lock()
+	defer T.mu.Unlock()
 	if len(T.sinks) == 0 {
-		return nil
+		T.backorder = append(T.backorder, source)
+	} else {
+		idx := rand.Intn(len(T.sinks))
+		sink := T.sinks[idx]
+		source.assign(sink)
 	}
-	return T.sinks[rand.Intn(len(T.sinks))]
-}
-
-func (T *Scheduler) backOrder(w *work) {
-	T.backOrders = append(T.backOrders, w)
+	return source
 }
 
 var _ rob.Scheduler = (*Scheduler)(nil)
diff --git a/lib/rob/schedulers/v0/scheduler_test.go b/lib/rob/schedulers/v0/scheduler_test.go
index 80cec375..e29357c9 100644
--- a/lib/rob/schedulers/v0/scheduler_test.go
+++ b/lib/rob/schedulers/v0/scheduler_test.go
@@ -113,4 +113,9 @@ func TestScheduler(t *testing.T) {
 	log.Println("share of 1:", t1)
 	log.Println("share of 2:", t2)
 	log.Println("share of 3:", t3)
+	log.Println("total:",
+		time.Duration((t1+t3)*10)*time.Millisecond+
+			time.Duration(t2*50)*time.Millisecond+
+			time.Duration(t1*100)*time.Millisecond,
+	)
 }
diff --git a/lib/rob/schedulers/v0/sink.go b/lib/rob/schedulers/v0/sink.go
index eba5f095..58b9c355 100644
--- a/lib/rob/schedulers/v0/sink.go
+++ b/lib/rob/schedulers/v0/sink.go
@@ -5,88 +5,70 @@ import (
 	"sync"
 	"time"
 
-	"github.com/google/uuid"
-
 	"pggat2/lib/rob"
 )
 
-const (
-	// tIdleTimeout is how long a thread can be idle before it is no longer runnable
-	tIdleTimeout = 5 * time.Millisecond
-)
-
 type Sink struct {
-	scheduler *Scheduler
+	// currently active Source
+	active *Source
+	// start time of the current thread
+	start time.Time
 
-	threads map[uuid.UUID]*thread
+	awake chan struct{}
 
-	active      *thread
-	activeStart time.Time
-
-	sigRunnable chan struct{}
 	// TODO(garet) change for red black tree
-	runnable []*thread
-
-	minRuntime time.Duration
+	// runnable queue
+	queue []*Source
 
 	mu sync.Mutex
 }
 
-func newSink(scheduler *Scheduler) *Sink {
+func newSink() *Sink {
 	sink := &Sink{
-		scheduler: scheduler,
-
-		threads: make(map[uuid.UUID]*thread),
-
-		sigRunnable: make(chan struct{}),
+		awake: make(chan struct{}),
 	}
 	return sink
 }
 
-func (T *Sink) newThread(source *Source) *thread {
-	t := &thread{
-		source: source,
+func (T *Sink) _runnable(t *Source) {
+	for _, q := range T.queue {
+		if t == q {
+			return
+		}
 	}
-	T.threads[source.id] = t
-	return t
-}
 
-func (T *Sink) getOrCreateThread(source *Source) *thread {
-	var t *thread
-	var ok bool
-	// get or create thread
-	if t, ok = T.threads[source.id]; !ok {
-		t = T.newThread(source)
+	T.queue = append(T.queue, t)
+	sort.Slice(
+		T.queue,
+		func(i, j int) bool {
+			return T.queue[i].runtime.Load() > T.queue[j].runtime.Load()
+		},
+	)
+
+	select {
+	case T.awake <- struct{}{}:
+	default:
 	}
-	return t
 }
 
-func (T *Sink) enqueueRunnable(t *thread) {
-	if len(t.queue) == 0 {
+func (T *Sink) runnable(t *Source) {
+	if t.idle() {
 		panic("tried to enqueue a stalled thread")
 	}
 
-	if t == T.active {
+	T.mu.Lock()
+	defer T.mu.Unlock()
+
+	if T.active == t {
 		return
 	}
 
-	T.runnable = append(T.runnable, t)
-	sort.Slice(
-		T.runnable,
-		func(i, j int) bool {
-			return T.runnable[i].runtime > T.runnable[j].runtime
-		},
-	)
-
-	select {
-	case T.sigRunnable <- struct{}{}:
-	default:
-	}
+	T._runnable(t)
 }
 
-func (T *Sink) popThread() *thread {
-	t := T.runnable[len(T.runnable)-1]
-	T.runnable = T.runnable[:len(T.runnable)-1]
+func (T *Sink) _next() *Source {
+	t := T.queue[len(T.queue)-1]
+	T.queue = T.queue[:len(T.queue)-1]
 	return t
 }
 
@@ -100,47 +82,29 @@ func (T *Sink) Read() any {
 		t := T.active
 		T.active = nil
 
-		t.finish = now
-		dur := now.Sub(T.activeStart)
-		t.runtime += dur
+		dur := now.Sub(T.start)
+		t.runtime.Add(dur.Nanoseconds())
 
 		// reschedule if thread has more work
-		if len(t.queue) > 0 {
-			T.enqueueRunnable(t)
+		if !t.idle() {
+			T._runnable(t)
 		}
 	}
 
-	for len(T.runnable) == 0 {
+	for len(T.queue) == 0 {
 		T.mu.Unlock()
-		<-T.sigRunnable
+		<-T.awake
 		T.mu.Lock()
 	}
 
 	// pop thread off
-	t := T.popThread()
+	t := T._next()
 
-	T.minRuntime = t.runtime
 	T.active = t
-	T.activeStart = now
+	T.start = now
 
 	// pop work from thread
-	w := t.popWork()
-
-	return w.payload
-}
-
-func (T *Sink) enqueue(w *work) {
-	T.mu.Lock()
-	defer T.mu.Unlock()
-
-	t := T.getOrCreateThread(w.source)
-
-	if len(t.queue) == 0 && time.Since(t.finish) > tIdleTimeout {
-		t.runtime = T.minRuntime
-	}
-
-	t.enqueue(w)
-	T.enqueueRunnable(t)
+	return t.pop()
 }
 
 var _ rob.Sink = (*Sink)(nil)
diff --git a/lib/rob/schedulers/v0/source.go b/lib/rob/schedulers/v0/source.go
index c177a150..e2c300a4 100644
--- a/lib/rob/schedulers/v0/source.go
+++ b/lib/rob/schedulers/v0/source.go
@@ -1,36 +1,63 @@
 package v0
 
 import (
-	"github.com/google/uuid"
+	"sync"
+	"sync/atomic"
 
 	"pggat2/lib/rob"
 )
 
 type Source struct {
-	scheduler *Scheduler
+	sink atomic.Pointer[Sink]
 
-	id        uuid.UUID
-	preferred *Sink
+	// vruntime in CFS
+	runtime atomic.Int64
+
+	queue []any
+	mu    sync.RWMutex
 }
 
-func newSource(scheduler *Scheduler) *Source {
-	return &Source{
-		scheduler: scheduler,
+func newSource() *Source {
+	return &Source{}
+}
 
-		id: uuid.New(),
+func (T *Source) Schedule(w any) {
+	T.mu.Lock()
+	wasEmpty := len(T.queue) == 0
+	T.queue = append(T.queue, w)
+	T.mu.Unlock()
+	if wasEmpty {
+		sink := T.sink.Load()
+		if sink != nil {
+			sink.runnable(T)
+		}
 	}
 }
 
-func (T *Source) Schedule(a any) {
-	w := newWork(T, a)
-	if T.preferred == nil {
-		T.preferred = T.scheduler.getSink()
+func (T *Source) idle() bool {
+	T.mu.RLock()
+	defer T.mu.RUnlock()
+	return len(T.queue) == 0
+}
+
+func (T *Source) pop() any {
+	T.mu.Lock()
+	defer T.mu.Unlock()
+
+	if len(T.queue) == 0 {
+		panic("pop on empty Source")
 	}
-	if T.preferred != nil {
-		T.preferred.enqueue(w)
-		return
+
+	w := T.queue[0]
+	for i := 1; i < len(T.queue)-1; i++ {
+		T.queue[i] = T.queue[i+1]
 	}
-	T.scheduler.backOrder(w)
+	T.queue = T.queue[:len(T.queue)-1]
+	return w
+}
+
+func (T *Source) assign(sink *Sink) {
+	T.sink.Store(sink)
 }
 
 var _ rob.Source = (*Source)(nil)
diff --git a/lib/rob/schedulers/v0/thread.go b/lib/rob/schedulers/v0/thread.go
deleted file mode 100644
index c2a9ceb7..00000000
--- a/lib/rob/schedulers/v0/thread.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package v0
-
-import "time"
-
-type thread struct {
-	source *Source
-	// runtime of thread (similar to vruntime in CFS)
-	runtime time.Duration
-
-	finish time.Time
-
-	queue []*work
-}
-
-func (T *thread) popWork() *work {
-	w := T.queue[0]
-	for i := 1; i < len(T.queue)-1; i++ {
-		T.queue[i] = T.queue[i+1]
-	}
-	T.queue = T.queue[:len(T.queue)-1]
-	return w
-}
-
-func (T *thread) enqueue(w *work) {
-	T.queue = append(T.queue, w)
-}
diff --git a/lib/rob/schedulers/v0/work.go b/lib/rob/schedulers/v0/work.go
deleted file mode 100644
index 1c74e298..00000000
--- a/lib/rob/schedulers/v0/work.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package v0
-
-type work struct {
-	source  *Source
-	payload any
-}
-
-func newWork(source *Source, payload any) *work {
-	return &work{
-		source:  source,
-		payload: payload,
-	}
-}
-- 
GitLab