From e08f0e734b2731bb3b52d49283800ef8d7f63a01 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Sat, 16 Sep 2023 02:11:18 -0500 Subject: [PATCH] one day this will work --- lib/gat/pool/pool.go | 2 -- lib/rob/schedulers/v2/scheduler.go | 10 ---------- lib/rob/schedulers/v2/sink/sink.go | 18 +++++++++++++----- lib/util/rbtree/rbtree_test.go | 30 ++++++++++++++++++++++++++++++ 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 759260e9..f5223b2d 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -233,8 +233,6 @@ func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) error { return errors.New("recipe was removed") } - log.Printf("added server to %p", T) - id := T.options.Pooler.NewServer() server := NewServer( T.options, diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 05adb9b2..2fe72f8f 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -8,7 +8,6 @@ import ( "pggat/lib/util/maps" "pggat/lib/util/pools" "sync" - "tuxpa.in/a/zlog/log" ) type Scheduler struct { @@ -38,7 +37,6 @@ func (T *Scheduler) NewWorker() uuid.UUID { T.sinks[worker] = s if len(T.backlog) > 0 { - log.Printf("%p adding %d jobs from backlog", T, len(T.backlog)) s.Enqueue(T.backlog...) T.backlog = T.backlog[:0] return worker @@ -94,9 +92,6 @@ func (T *Scheduler) tryAcquire(j job.Concurrent) uuid.UUID { } for id, v := range T.sinks { - if id == affinity { - continue - } if v.Acquire(j) { // set affinity T.affinity.Store(j.User, id) @@ -124,10 +119,6 @@ func (T *Scheduler) enqueue(j job.Stalled) { } for id, v := range T.sinks { - if id == affinity { - continue - } - v.Enqueue(j) T.affinity.Store(j.User, id) return @@ -136,7 +127,6 @@ func (T *Scheduler) enqueue(j job.Stalled) { // add to backlog T.bmu.Lock() defer T.bmu.Unlock() - log.Printf("%p adding to backlog", T) T.backlog = append(T.backlog, j) } diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go index 8445c6d2..7b533f64 100644 --- a/lib/rob/schedulers/v2/sink/sink.go +++ b/lib/rob/schedulers/v2/sink/sink.go @@ -3,6 +3,7 @@ package sink import ( "sync" "time" + "tuxpa.in/a/zlog/log" "github.com/google/uuid" @@ -18,10 +19,11 @@ type Sink struct { active uuid.UUID start time.Time - floor time.Duration - stride map[uuid.UUID]time.Duration - pending map[uuid.UUID]*ring.Ring[job.Stalled] - scheduled rbtree.RBTree[time.Duration, job.Stalled] + floor time.Duration + stride map[uuid.UUID]time.Duration + pending map[uuid.UUID]*ring.Ring[job.Stalled] + scheduled rbtree.RBTree[time.Duration, job.Stalled] + scheduledLen int mu sync.Mutex } @@ -64,10 +66,10 @@ func (T *Sink) schedule(j job.Stalled) bool { return false } stride += 1 - continue } T.scheduled.Set(stride, j) + T.scheduledLen++ return true } @@ -162,9 +164,13 @@ func (T *Sink) next() bool { stride, j, ok := T.scheduled.Min() if !ok { + if T.scheduledLen != 0 { + log.Println("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA SCHEDULED LEN WAS NOT ZERO BUT FOUND NO MORE JOBS") + } return false } T.scheduled.Delete(stride) + T.scheduledLen-- if stride > T.floor { T.floor = stride } @@ -195,6 +201,7 @@ func (T *Sink) StealAll() []job.Stalled { break } } + T.scheduledLen = 0 for _, value := range T.pending { for { @@ -222,6 +229,7 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID { return uuid.Nil } T.scheduled.Delete(stride) + T.scheduledLen-- user := j.User diff --git a/lib/util/rbtree/rbtree_test.go b/lib/util/rbtree/rbtree_test.go index 9bddbe83..929e4a39 100644 --- a/lib/util/rbtree/rbtree_test.go +++ b/lib/util/rbtree/rbtree_test.go @@ -173,3 +173,33 @@ func TestRBTree_Stress(t *testing.T) { _ = v } } + +func TestRBTree_Min2(t *testing.T) { + tree := new(RBTree[int, struct{}]) + + const n = 100000 + + for i := 0; i < n; i++ { + tree.Set(i, struct{}{}) + } + + for i := 0; i < n; i++ { + k, _, ok := tree.Min() + if !ok { + t.Error("expected tree to have min value") + return + } + + if k != i { + t.Error("out of order") + return + } + + tree.Delete(k) + } + + _, _, ok := tree.Min() + if ok { + t.Error("expected no more values") + } +} -- GitLab