From 7bf6b759474dd250d7f0bf9130c6cae0f7e45ee7 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Tue, 16 May 2023 13:09:28 -0500
Subject: [PATCH] reverse iteration on rbtree

---
 cmd/cgat/main.go                   |  5 ++--
 lib/rob/schedulers/v0/sink/sink.go | 29 +++++++++++++++++------
 lib/rob/schedulers/v1/scheduler.go | 18 ++++++++++++++
 lib/util/rbtree/rbtree.go          | 35 +++++++++++++++++++++++++++
 lib/util/rbtree/rbtree_test.go     | 38 ++++++++++++++++++++++++++++++
 5 files changed, 116 insertions(+), 9 deletions(-)
 create mode 100644 lib/rob/schedulers/v1/scheduler.go

diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go
index 44a36239..fc3c49c8 100644
--- a/cmd/cgat/main.go
+++ b/cmd/cgat/main.go
@@ -64,8 +64,9 @@ func main() {
 	log.Println("Starting pggat...")
 
 	r := schedulers.MakeScheduler()
-	go testServer(&r)
-	go testServer(&r)
+	for i := 0; i < 5; i++ {
+		go testServer(&r)
+	}
 
 	listener, err := net.Listen("tcp", "0.0.0.0:6432") // TODO(garet) make this configurable
 	if err != nil {
diff --git a/lib/rob/schedulers/v0/sink/sink.go b/lib/rob/schedulers/v0/sink/sink.go
index 9283b314..cd8abcd1 100644
--- a/lib/rob/schedulers/v0/sink/sink.go
+++ b/lib/rob/schedulers/v0/sink/sink.go
@@ -200,21 +200,36 @@ func (T *Sink) StealFor(rhs *Sink) (uuid.UUID, bool) {
 		if rhs.constraints.Satisfies(work.Constraints) {
 			source := work.Source
 
-			rhs.mu.Lock()
-			defer rhs.mu.Unlock()
-
-			// steal it
+			// take jobs from T
 			T.scheduled.Delete(stride)
 
-			rhs.addStalled(work)
-
-			// steal pending
 			pending, _ := T.pending[work.Source]
+			delete(T.pending, work.Source)
+
+			// we have to unlock to prevent deadlock
+			// if T tries to steal from rhs at the same time rhs tries to steal from T, we could deadlock otherwise
+			// (speaking from experience)
+			T.mu.Unlock()
+
+			// add to rhs
+			rhs.mu.Lock()
+
+			rhs.addStalled(work)
 
 			for work, ok = pending.PopFront(); ok; work, ok = pending.PopFront() {
 				rhs.addStalled(work)
 			}
 
+			rhs.mu.Unlock()
+
+			// try to return buffer to T (if fails, it's not a big deal)
+
+			T.mu.Lock()
+
+			if _, ok = T.pending[work.Source]; !ok {
+				T.pending[work.Source] = pending
+			}
+
 			return source, true
 		}
 	}
diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go
new file mode 100644
index 00000000..4cf12a29
--- /dev/null
+++ b/lib/rob/schedulers/v1/scheduler.go
@@ -0,0 +1,18 @@
+package schedulers
+
+import "pggat2/lib/rob"
+
+type Scheduler struct {
+}
+
+func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (T *Scheduler) NewSource() rob.Worker {
+	// TODO implement me
+	panic("implement me")
+}
+
+var _ rob.Scheduler = (*Scheduler)(nil)
diff --git a/lib/util/rbtree/rbtree.go b/lib/util/rbtree/rbtree.go
index f22265cb..873d23d2 100644
--- a/lib/util/rbtree/rbtree.go
+++ b/lib/util/rbtree/rbtree.go
@@ -132,6 +132,14 @@ func (T *RBTree[K, V]) Min() (K, V, bool) {
 	return m.key, m.value, true
 }
 
+func (T *RBTree[K, V]) Max() (K, V, bool) {
+	m := T.max(T.root)
+	if m == nil {
+		return *new(K), *new(V), false
+	}
+	return m.key, m.value, true
+}
+
 func (T *RBTree[K, V]) rotateRight(n *node[K, V]) *node[K, V] {
 	if n == nil || n.left.getColor() == black {
 		panic("assertion failed")
@@ -242,6 +250,33 @@ func (T *RBTree[K, V]) Next(key K) (K, V, bool) {
 	return n.key, n.value, true
 }
 
+func (T *RBTree[K, V]) prev(n *node[K, V], key K) *node[K, V] {
+	if n == nil {
+		return nil
+	}
+	if n.key == key {
+		return T.max(n.left)
+	}
+	if n.key > key {
+		return T.prev(n.left, key)
+	}
+
+	// n.key < key
+	prev := T.prev(n.right, key)
+	if prev == nil {
+		return n
+	}
+	return prev
+}
+
+func (T *RBTree[K, V]) Prev(key K) (K, V, bool) {
+	n := T.prev(T.root, key)
+	if n == nil {
+		return *new(K), *new(V), false
+	}
+	return n.key, n.value, true
+}
+
 type color bool
 
 const (
diff --git a/lib/util/rbtree/rbtree_test.go b/lib/util/rbtree/rbtree_test.go
index 4878b46d..9bddbe83 100644
--- a/lib/util/rbtree/rbtree_test.go
+++ b/lib/util/rbtree/rbtree_test.go
@@ -35,6 +35,16 @@ func assertMin[K order, V comparable](t *testing.T, tree *RBTree[K, V], key K, v
 	}
 }
 
+func assertMax[K order, V comparable](t *testing.T, tree *RBTree[K, V], key K, value V) {
+	k, v, ok := tree.Max()
+	if !ok {
+		t.Error("expected tree to have values")
+	}
+	if k != key || v != value {
+		t.Error("expected key, value to be", key, value, "but got", k, v)
+	}
+}
+
 func assertNextSome[K order, V comparable](t *testing.T, tree *RBTree[K, V], after K, key K, value V) {
 	k, v, ok := tree.Next(after)
 	if !ok {
@@ -52,6 +62,23 @@ func assertNextNone[K order, V comparable](t *testing.T, tree *RBTree[K, V], aft
 	}
 }
 
+func assertPrevSome[K order, V comparable](t *testing.T, tree *RBTree[K, V], before K, key K, value V) {
+	k, v, ok := tree.Prev(before)
+	if !ok {
+		t.Error("expected tree to have another value")
+	}
+	if k != key || v != value {
+		t.Error("expected key, value to be", key, value, "but got", k, v)
+	}
+}
+
+func assertPrevNone[K order, V comparable](t *testing.T, tree *RBTree[K, V], before K) {
+	k, v, ok := tree.Prev(before)
+	if ok {
+		t.Error("expected tree to have no more values but got", k, v)
+	}
+}
+
 func TestRBTree_Insert(t *testing.T) {
 	tree := new(RBTree[int, int])
 	tree.Set(1, 2)
@@ -96,6 +123,17 @@ func TestRBTree_Iter(t *testing.T) {
 	assertNextNone(t, tree, 5)
 }
 
+func TestRBTree_IterRev(t *testing.T) {
+	tree := new(RBTree[int, int])
+	tree.Set(1, 2)
+	tree.Set(5, 6)
+	tree.Set(3, 4)
+	assertMax(t, tree, 5, 6)
+	assertPrevSome(t, tree, 5, 3, 4)
+	assertPrevSome(t, tree, 3, 1, 2)
+	assertPrevNone(t, tree, 1)
+}
+
 func TestRBTree_Stress(t *testing.T) {
 	const n = 1000000
 
-- 
GitLab