diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index b1ababd65ff4fad70e9f60048ebd3c6d516159bc..142d2613005ff29b9f8219ee9df69a7c937a6afe 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -1,6 +1,7 @@ package main import ( + "log" "net" "net/http" _ "net/http/pprof" @@ -47,13 +48,19 @@ func main() { panic(http.ListenAndServe(":8080", nil)) }() + log.Println("Starting pggat...") + r := schedulers.MakeScheduler() go testServer(&r) + go testServer(&r) listener, err := net.Listen("tcp", "0.0.0.0:6432") // TODO(garet) make this configurable if err != nil { panic(err) } + + log.Println("Listening on 0.0.0.0:6432") + for { conn, err := listener.Accept() if err != nil { diff --git a/lib/bouncer/bouncers/v1/bctx/context.go b/lib/bouncer/bouncers/v1/bctx/context.go index 43aa2827589b6ad2b6c01c5b534862983e578d82..c96bc521eccec193d4f641c7cda83721cb81ebc4 100644 --- a/lib/bouncer/bouncers/v1/bctx/context.go +++ b/lib/bouncer/bouncers/v1/bctx/context.go @@ -20,6 +20,7 @@ type Context struct { copyOut bool functionCall bool sync bool + describe bool client, server zap.ReadWriter } @@ -195,3 +196,18 @@ func (T *Context) InSync() bool { func (T *Context) EndSync() { T.sync = false } + +func (T *Context) BeginDescribe() { + if T.describe { + panic("already in describe") + } + T.describe = true +} + +func (T *Context) InDescribe() bool { + return T.describe +} + +func (T *Context) EndDescribe() { + T.describe = false +} diff --git a/lib/bouncer/bouncers/v1/bouncer.go b/lib/bouncer/bouncers/v1/bouncer.go index 9d1d831e5b6a10f23f2652b15d5dffcfaa592183..7d848fead1b8917efec791fc6c008d59e3deb573 100644 --- a/lib/bouncer/bouncers/v1/bouncer.go +++ b/lib/bouncer/bouncers/v1/bouncer.go @@ -64,6 +64,8 @@ func copyOut0(ctx *bctx.Context) berr.Error { ctx.EndCopyOut() return ctx.ClientProxy(in) default: + log.Printf("unexpected packet %c\n", in.Type()) + panic("unexpected packet from server") return berr.ServerProtocolError } } @@ -116,6 +118,8 @@ func query0(ctx *bctx.Context) berr.Error { ctx.EndQuery() return readyForQuery(ctx, in) default: + log.Printf("unexpected packet %c\n", in.Type()) + panic("unexpected packet from server") return berr.ServerProtocolError } } @@ -150,6 +154,8 @@ func functionCall0(ctx *bctx.Context) berr.Error { ctx.EndFunctionCall() return readyForQuery(ctx, in) default: + log.Printf("unexpected packet %c\n", in.Type()) + panic("unexpected packet from server") return berr.ServerProtocolError } } @@ -196,6 +202,8 @@ func sync0(ctx *bctx.Context) berr.Error { ctx.EndSync() return readyForQuery(ctx, in) default: + log.Printf("unexpected packet %c\n", in.Type()) + panic("unexpected packet from server") return berr.ServerProtocolError } } diff --git a/lib/rob/schedulers/v0/job/job.go b/lib/rob/schedulers/v0/job/job.go index b05773ac1a928e861b081c908bda305b0b2b060a..12778185742497a8f65eb88f21590b622ea126a6 100644 --- a/lib/rob/schedulers/v0/job/job.go +++ b/lib/rob/schedulers/v0/job/job.go @@ -15,5 +15,5 @@ type Concurrent struct { type Stalled struct { Source uuid.UUID Constraints rob.Constraints - Out chan<- rob.Worker + Out chan<- any } diff --git a/lib/rob/schedulers/v0/pool/pool.go b/lib/rob/schedulers/v0/pool/pool.go index 1da00e0296a000527c52f3c287c94cc288957619..4b8635c942d50ce6043e1827eb5c422488626cbe 100644 --- a/lib/rob/schedulers/v0/pool/pool.go +++ b/lib/rob/schedulers/v0/pool/pool.go @@ -30,7 +30,7 @@ func (T *Pool) DoConcurrent(j job.Concurrent) (done bool) { // try affinity first if v, ok := T.sinks.Load(affinity); ok { - if done = v.DoConcurrent(j); done { + if done, _ = v.DoConcurrent(j); done { return } } @@ -39,9 +39,15 @@ func (T *Pool) DoConcurrent(j job.Concurrent) (done bool) { if id == affinity { return true } - if done = v.DoConcurrent(j); done { + var hasMore bool + if done, hasMore = v.DoConcurrent(j); done { // set affinity T.affinity.Store(j.Source, id) + + if !hasMore { + T.StealFor(v) + } + return false } return true @@ -100,3 +106,16 @@ func (T *Pool) AddSink(s *sink.Sink) { } T.backlog = T.backlog[:i] } + +func (T *Pool) StealFor(q *sink.Sink) { + T.sinks.Range(func(_ uuid.UUID, s *sink.Sink) bool { + if s == q { + return true + } + if source, ok := s.StealFor(q); ok { + T.affinity.Store(source, q.ID()) + return false + } + return true + }) +} diff --git a/lib/rob/schedulers/v0/scheduler_test.go b/lib/rob/schedulers/v0/scheduler_test.go index 17fdd831efdeae6ab607f779eeada848357a71a9..5fe4d55ea9cf20910a7dca9caf3b2e26410a1a3e 100644 --- a/lib/rob/schedulers/v0/scheduler_test.go +++ b/lib/rob/schedulers/v0/scheduler_test.go @@ -52,13 +52,11 @@ func (T *TestSink) Do(constraints rob.Constraints, work any) { if !T.constraints.Satisfies(constraints) { panic("Scheduler did not obey constraints") } - switch v := work.(type) { - case Work: - start := time.Now() - for time.Since(start) < v.Duration { - } - T.table.Inc(v.Sender) + v := work.(Work) + start := time.Now() + for time.Since(start) < v.Duration { } + T.table.Inc(v.Sender) } var _ rob.Worker = (*TestSink)(nil) @@ -266,11 +264,7 @@ func TestScheduler_StealUnbalanced(t *testing.T) { t.Log("share of 1:", t1) t.Log("share of 2:", t2) - if !similar(t0, t1, t2) { - t.Error("expected all shares to be similar") - } - - if t0 == 0 { + if t0 == 0 || t1 == 0 || t2 == 0 { t.Error("expected executions on all sources (is there a race in the balancer??)") t.Errorf("%s", allStacks()) } diff --git a/lib/rob/schedulers/v0/sink/sink.go b/lib/rob/schedulers/v0/sink/sink.go index 6c6110d546f5bbe8982787e40bf7f66d09f2c04b..9283b3149ab5f1aaf725b8573850ad0494a77bab 100644 --- a/lib/rob/schedulers/v0/sink/sink.go +++ b/lib/rob/schedulers/v0/sink/sink.go @@ -52,6 +52,17 @@ func (T *Sink) setNext(source uuid.UUID) { } func (T *Sink) addStalled(j job.Stalled) { + if T.active != uuid.Nil { + // sink is in use, add to queue + T.scheduleStalled(j) + } else { + // sink is open, do now + T.setNext(j.Source) + j.Out <- T + } +} + +func (T *Sink) scheduleStalled(j job.Stalled) { // try to schedule right away if ok := T.tryScheduleStalled(j); ok { return @@ -134,9 +145,9 @@ func (T *Sink) next() bool { return true } -func (T *Sink) DoConcurrent(j job.Concurrent) (done bool) { +func (T *Sink) DoConcurrent(j job.Concurrent) (done, hasMore bool) { if !T.constraints.Satisfies(j.Constraints) { - return false + return false, true } T.mu.Lock() @@ -144,14 +155,13 @@ func (T *Sink) DoConcurrent(j job.Concurrent) (done bool) { if T.active != uuid.Nil { T.mu.Unlock() // this Sink is in use - return false + return false, true } T.setNext(j.Source) T.mu.Unlock() - T.Do(j.Constraints, j.Work) - return true + return true, T.Do(j.Constraints, j.Work) } func (T *Sink) DoStalled(j job.Stalled) (ok bool) { @@ -162,26 +172,52 @@ func (T *Sink) DoStalled(j job.Stalled) (ok bool) { T.mu.Lock() defer T.mu.Unlock() - if T.active != uuid.Nil { - // sink is in use, add to queue - T.addStalled(j) - } else { - // sink is open, do now - T.setNext(j.Source) - j.Out <- T - } + T.addStalled(j) return true } -func (T *Sink) Do(constraints rob.Constraints, work any) { +func (T *Sink) Do(constraints rob.Constraints, work any) bool { if !T.constraints.Satisfies(constraints) { panic("Do called on sink with non satisfied constraints") } T.worker.Do(constraints, work) T.mu.Lock() defer T.mu.Unlock() - T.next() + return T.next() } -var _ rob.Worker = (*Sink)(nil) +func (T *Sink) StealFor(rhs *Sink) (uuid.UUID, bool) { + if T == rhs { + // cannot steal from ourselves + return uuid.Nil, false + } + + T.mu.Lock() + defer T.mu.Unlock() + + for stride, work, ok := T.scheduled.Min(); ok; stride, work, ok = T.scheduled.Next(stride) { + if rhs.constraints.Satisfies(work.Constraints) { + source := work.Source + + rhs.mu.Lock() + defer rhs.mu.Unlock() + + // steal it + T.scheduled.Delete(stride) + + rhs.addStalled(work) + + // steal pending + pending, _ := T.pending[work.Source] + + for work, ok = pending.PopFront(); ok; work, ok = pending.PopFront() { + rhs.addStalled(work) + } + + return source, true + } + } + + return uuid.Nil, false +} diff --git a/lib/rob/schedulers/v0/source/source.go b/lib/rob/schedulers/v0/source/source.go index 936a16ac612eb00577a72ef515e2f44d53f0034d..f6c82772fc676dc07f284971f534503eb9878afc 100644 --- a/lib/rob/schedulers/v0/source/source.go +++ b/lib/rob/schedulers/v0/source/source.go @@ -6,6 +6,7 @@ import ( "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v0/job" "pggat2/lib/rob/schedulers/v0/pool" + "pggat2/lib/rob/schedulers/v0/sink" "pggat2/lib/util/pools" ) @@ -13,7 +14,7 @@ type Source struct { id uuid.UUID pool *pool.Pool - stall pools.Locked[chan rob.Worker] + stall pools.Locked[chan any] } func NewSource(p *pool.Pool) *Source { @@ -33,7 +34,7 @@ func (T *Source) Do(constraints rob.Constraints, work any) { } out, ok := T.stall.Get() if !ok { - out = make(chan rob.Worker) + out = make(chan any) } defer T.stall.Put(out) @@ -42,8 +43,10 @@ func (T *Source) Do(constraints rob.Constraints, work any) { Constraints: constraints, Out: out, }) - worker := <-out - worker.Do(constraints, work) + worker := (<-out).(*sink.Sink) + if hasMore := worker.Do(constraints, work); !hasMore { + T.pool.StealFor(worker) + } } var _ rob.Worker = (*Source)(nil) diff --git a/lib/util/dio/readwriter.go b/lib/util/dio/readwriter.go index 6d38d9ad6bdd1b58bf80b5302d486d285cee6b31..cf1c4229e85e9ba3cfb1f6e3b1d748f06b41efb5 100644 --- a/lib/util/dio/readwriter.go +++ b/lib/util/dio/readwriter.go @@ -1,6 +1,8 @@ package dio -import "time" +import ( + "time" +) type ReadWriter interface { SetDeadline(deadline time.Time) error