diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 759260e912e07a6d4e88a31766c55a1aedfc26d6..f5223b2d72bbe8f1065700846d6650a1621a7c3d 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -233,8 +233,6 @@ func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) error { return errors.New("recipe was removed") } - log.Printf("added server to %p", T) - id := T.options.Pooler.NewServer() server := NewServer( T.options, diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go index 05adb9b2d3692b8a6d8f7b49e85739d7fb96d656..2fe72f8f6ed987e3b9942d3f95de4ddeec559f12 100644 --- a/lib/rob/schedulers/v2/scheduler.go +++ b/lib/rob/schedulers/v2/scheduler.go @@ -8,7 +8,6 @@ import ( "pggat/lib/util/maps" "pggat/lib/util/pools" "sync" - "tuxpa.in/a/zlog/log" ) type Scheduler struct { @@ -38,7 +37,6 @@ func (T *Scheduler) NewWorker() uuid.UUID { T.sinks[worker] = s if len(T.backlog) > 0 { - log.Printf("%p adding %d jobs from backlog", T, len(T.backlog)) s.Enqueue(T.backlog...) T.backlog = T.backlog[:0] return worker @@ -94,9 +92,6 @@ func (T *Scheduler) tryAcquire(j job.Concurrent) uuid.UUID { } for id, v := range T.sinks { - if id == affinity { - continue - } if v.Acquire(j) { // set affinity T.affinity.Store(j.User, id) @@ -124,10 +119,6 @@ func (T *Scheduler) enqueue(j job.Stalled) { } for id, v := range T.sinks { - if id == affinity { - continue - } - v.Enqueue(j) T.affinity.Store(j.User, id) return @@ -136,7 +127,6 @@ func (T *Scheduler) enqueue(j job.Stalled) { // add to backlog T.bmu.Lock() defer T.bmu.Unlock() - log.Printf("%p adding to backlog", T) T.backlog = append(T.backlog, j) } diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go index 8445c6d25baec59473d68d45c2e94d106177edb5..7b533f64ee60ce1ace284c005bfb7b07e19eb2ce 100644 --- a/lib/rob/schedulers/v2/sink/sink.go +++ b/lib/rob/schedulers/v2/sink/sink.go @@ -3,6 +3,7 @@ package sink import ( "sync" "time" + "tuxpa.in/a/zlog/log" "github.com/google/uuid" @@ -18,10 +19,11 @@ type Sink struct { active uuid.UUID start time.Time - floor time.Duration - stride map[uuid.UUID]time.Duration - pending map[uuid.UUID]*ring.Ring[job.Stalled] - scheduled rbtree.RBTree[time.Duration, job.Stalled] + floor time.Duration + stride map[uuid.UUID]time.Duration + pending map[uuid.UUID]*ring.Ring[job.Stalled] + scheduled rbtree.RBTree[time.Duration, job.Stalled] + scheduledLen int mu sync.Mutex } @@ -64,10 +66,10 @@ func (T *Sink) schedule(j job.Stalled) bool { return false } stride += 1 - continue } T.scheduled.Set(stride, j) + T.scheduledLen++ return true } @@ -162,9 +164,13 @@ func (T *Sink) next() bool { stride, j, ok := T.scheduled.Min() if !ok { + if T.scheduledLen != 0 { + log.Println("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA SCHEDULED LEN WAS NOT ZERO BUT FOUND NO MORE JOBS") + } return false } T.scheduled.Delete(stride) + T.scheduledLen-- if stride > T.floor { T.floor = stride } @@ -195,6 +201,7 @@ func (T *Sink) StealAll() []job.Stalled { break } } + T.scheduledLen = 0 for _, value := range T.pending { for { @@ -222,6 +229,7 @@ func (T *Sink) StealFor(rhs *Sink) uuid.UUID { return uuid.Nil } T.scheduled.Delete(stride) + T.scheduledLen-- user := j.User diff --git a/lib/util/rbtree/rbtree_test.go b/lib/util/rbtree/rbtree_test.go index 9bddbe835e34237f42534ad9f2e00f64621bc934..929e4a39e20f13561d2dbc7390af1d620f812dd7 100644 --- a/lib/util/rbtree/rbtree_test.go +++ b/lib/util/rbtree/rbtree_test.go @@ -173,3 +173,33 @@ func TestRBTree_Stress(t *testing.T) { _ = v } } + +func TestRBTree_Min2(t *testing.T) { + tree := new(RBTree[int, struct{}]) + + const n = 100000 + + for i := 0; i < n; i++ { + tree.Set(i, struct{}{}) + } + + for i := 0; i < n; i++ { + k, _, ok := tree.Min() + if !ok { + t.Error("expected tree to have min value") + return + } + + if k != i { + t.Error("out of order") + return + } + + tree.Delete(k) + } + + _, _, ok := tree.Min() + if ok { + t.Error("expected no more values") + } +}