From 95fe34c43a8ecc62bd0cadcde4b88e6d0a78df94 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Mon, 15 May 2023 00:19:19 -0500 Subject: [PATCH] so much fun --- cmd/cgat/main.go | 11 +- lib/{mw2 => middleware}/context.go | 2 +- .../interceptor/context.go | 4 +- .../interceptor/interceptor.go | 6 +- lib/{mw2 => middleware}/middleware.go | 2 +- .../middlewares/eqp/client.go | 8 +- .../middlewares/eqp/portal.go | 0 .../middlewares/eqp/preparedStatement.go | 0 .../middlewares/eqp/server.go | 20 +- .../middlewares/unterminate/unterminate.go | 8 +- lib/{mw2 => middleware}/nil.go | 2 +- lib/rob/constraints.go | 2 +- lib/rob/scheduler.go | 6 +- lib/rob/schedulers/{v2 => v0}/job/job.go | 3 +- lib/rob/schedulers/v0/pool/pool.go | 35 ++ lib/rob/schedulers/v0/scheduler.go | 22 ++ lib/rob/schedulers/v0/sink/sink.go | 55 +++ lib/rob/schedulers/v0/source/source.go | 31 ++ lib/rob/schedulers/v2/pool/pool.go | 116 ------ lib/rob/schedulers/v2/queue/queue.go | 187 ---------- lib/rob/schedulers/v2/scheduler.go | 37 -- lib/rob/schedulers/v2/scheduler_test.go | 336 ------------------ lib/rob/schedulers/v2/sink/sink.go | 45 --- lib/rob/schedulers/v2/source/source.go | 31 -- lib/rob/sink.go | 5 - lib/rob/source.go | 6 - lib/rob/worker.go | 5 + .../onebuffer/onebuffer.go | 0 28 files changed, 184 insertions(+), 801 deletions(-) rename lib/{mw2 => middleware}/context.go (88%) rename lib/{mw2 => middleware}/interceptor/context.go (83%) rename lib/{mw2 => middleware}/interceptor/interceptor.go (89%) rename lib/{mw2 => middleware}/middleware.go (87%) rename lib/{mw2 => middleware}/middlewares/eqp/client.go (94%) rename lib/{mw2 => middleware}/middlewares/eqp/portal.go (100%) rename lib/{mw2 => middleware}/middlewares/eqp/preparedStatement.go (100%) rename lib/{mw2 => middleware}/middlewares/eqp/server.go (86%) rename lib/{mw2 => middleware}/middlewares/unterminate/unterminate.go (61%) rename lib/{mw2 => middleware}/nil.go (90%) rename lib/rob/schedulers/{v2 => v0}/job/job.go (99%) create mode 100644 lib/rob/schedulers/v0/pool/pool.go create mode 100644 lib/rob/schedulers/v0/scheduler.go create mode 100644 lib/rob/schedulers/v0/sink/sink.go create mode 100644 lib/rob/schedulers/v0/source/source.go delete mode 100644 lib/rob/schedulers/v2/pool/pool.go delete mode 100644 lib/rob/schedulers/v2/queue/queue.go delete mode 100644 lib/rob/schedulers/v2/scheduler.go delete mode 100644 lib/rob/schedulers/v2/scheduler_test.go delete mode 100644 lib/rob/schedulers/v2/sink/sink.go delete mode 100644 lib/rob/schedulers/v2/source/source.go delete mode 100644 lib/rob/sink.go delete mode 100644 lib/rob/source.go create mode 100644 lib/rob/worker.go rename lib/{middleware/middlewares => zap}/onebuffer/onebuffer.go (100%) diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index ebda2277..b663dbff 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -4,16 +4,15 @@ import ( "net" "net/http" _ "net/http/pprof" + "pggat2/lib/zap/onebuffer" "pggat2/lib/bouncer/backends/v0" "pggat2/lib/bouncer/bouncers/v1" "pggat2/lib/bouncer/frontends/v0" - "pggat2/lib/middleware/middlewares/onebuffer" - "pggat2/lib/mw2" - "pggat2/lib/mw2/interceptor" - "pggat2/lib/mw2/middlewares/unterminate" + "pggat2/lib/middleware" + "pggat2/lib/middleware/interceptor" + "pggat2/lib/middleware/middlewares/unterminate" "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v2" "pggat2/lib/zap" "pggat2/lib/zap/zio" ) @@ -62,7 +61,7 @@ func main() { source := r.NewSource() client := zio.MakeReadWriter(conn) ob := onebuffer.MakeOnebuffer(&client) - mw := interceptor.MakeInterceptor(&ob, []mw2.Middleware{ + mw := interceptor.MakeInterceptor(&ob, []middleware.Middleware{ unterminate.Unterminate, }) frontends.Accept(&mw) diff --git a/lib/mw2/context.go b/lib/middleware/context.go similarity index 88% rename from lib/mw2/context.go rename to lib/middleware/context.go index 06acdbd8..ba696bc9 100644 --- a/lib/mw2/context.go +++ b/lib/middleware/context.go @@ -1,4 +1,4 @@ -package mw2 +package middleware import "pggat2/lib/zap" diff --git a/lib/mw2/interceptor/context.go b/lib/middleware/interceptor/context.go similarity index 83% rename from lib/mw2/interceptor/context.go rename to lib/middleware/interceptor/context.go index 6d4d53bd..3df2fe1f 100644 --- a/lib/mw2/interceptor/context.go +++ b/lib/middleware/interceptor/context.go @@ -1,7 +1,7 @@ package interceptor import ( - "pggat2/lib/mw2" + "pggat2/lib/middleware" "pggat2/lib/util/decorator" "pggat2/lib/zap" ) @@ -27,4 +27,4 @@ func (T *Context) Cancel() { T.cancelled = true } -var _ mw2.Context = (*Context)(nil) +var _ middleware.Context = (*Context)(nil) diff --git a/lib/mw2/interceptor/interceptor.go b/lib/middleware/interceptor/interceptor.go similarity index 89% rename from lib/mw2/interceptor/interceptor.go rename to lib/middleware/interceptor/interceptor.go index c543d482..167029ab 100644 --- a/lib/mw2/interceptor/interceptor.go +++ b/lib/middleware/interceptor/interceptor.go @@ -1,16 +1,16 @@ package interceptor import ( - "pggat2/lib/mw2" + "pggat2/lib/middleware" "pggat2/lib/zap" ) type Interceptor struct { - middlewares []mw2.Middleware + middlewares []middleware.Middleware Context } -func MakeInterceptor(rw zap.ReadWriter, middlewares []mw2.Middleware) Interceptor { +func MakeInterceptor(rw zap.ReadWriter, middlewares []middleware.Middleware) Interceptor { return Interceptor{ middlewares: middlewares, Context: makeContext(rw), diff --git a/lib/mw2/middleware.go b/lib/middleware/middleware.go similarity index 87% rename from lib/mw2/middleware.go rename to lib/middleware/middleware.go index f1afde8e..dac5eb76 100644 --- a/lib/mw2/middleware.go +++ b/lib/middleware/middleware.go @@ -1,4 +1,4 @@ -package mw2 +package middleware import "pggat2/lib/zap" diff --git a/lib/mw2/middlewares/eqp/client.go b/lib/middleware/middlewares/eqp/client.go similarity index 94% rename from lib/mw2/middlewares/eqp/client.go rename to lib/middleware/middlewares/eqp/client.go index 70abce46..927fb6fe 100644 --- a/lib/mw2/middlewares/eqp/client.go +++ b/lib/middleware/middlewares/eqp/client.go @@ -3,7 +3,7 @@ package eqp import ( "errors" - "pggat2/lib/mw2" + "pggat2/lib/middleware" "pggat2/lib/zap" packets "pggat2/lib/zap/packets/v3.0" ) @@ -20,7 +20,7 @@ func MakeClient() Client { } } -func (T *Client) Send(_ mw2.Context, out zap.Out) error { +func (T *Client) Send(_ middleware.Context, out zap.Out) error { in := zap.OutToIn(out) switch in.Type() { case packets.ReadyForQuery: @@ -34,7 +34,7 @@ func (T *Client) Send(_ mw2.Context, out zap.Out) error { return nil } -func (T *Client) Read(ctx mw2.Context, in zap.In) error { +func (T *Client) Read(ctx middleware.Context, in zap.In) error { switch in.Type() { case packets.Query: // clobber unnamed portal and unnamed prepared statement @@ -146,4 +146,4 @@ func (T *Client) Read(ctx mw2.Context, in zap.In) error { return nil } -var _ mw2.Middleware = (*Client)(nil) +var _ middleware.Middleware = (*Client)(nil) diff --git a/lib/mw2/middlewares/eqp/portal.go b/lib/middleware/middlewares/eqp/portal.go similarity index 100% rename from lib/mw2/middlewares/eqp/portal.go rename to lib/middleware/middlewares/eqp/portal.go diff --git a/lib/mw2/middlewares/eqp/preparedStatement.go b/lib/middleware/middlewares/eqp/preparedStatement.go similarity index 100% rename from lib/mw2/middlewares/eqp/preparedStatement.go rename to lib/middleware/middlewares/eqp/preparedStatement.go diff --git a/lib/mw2/middlewares/eqp/server.go b/lib/middleware/middlewares/eqp/server.go similarity index 86% rename from lib/mw2/middlewares/eqp/server.go rename to lib/middleware/middlewares/eqp/server.go index 108f105f..8f5aec86 100644 --- a/lib/mw2/middlewares/eqp/server.go +++ b/lib/middleware/middlewares/eqp/server.go @@ -3,7 +3,7 @@ package eqp import ( "errors" - "pggat2/lib/mw2" + "pggat2/lib/middleware" "pggat2/lib/util/ring" "pggat2/lib/zap" packets "pggat2/lib/zap/packets/v3.0" @@ -50,7 +50,7 @@ func (T *Server) SetClient(client *Client) { T.peer = client } -func (T *Server) closePreparedStatement(ctx mw2.Context, target string) error { +func (T *Server) closePreparedStatement(ctx middleware.Context, target string) error { out := T.buf.Write() packets.WriteClose(out, 'S', target) err := ctx.Send(out) @@ -67,7 +67,7 @@ func (T *Server) closePreparedStatement(ctx mw2.Context, target string) error { return nil } -func (T *Server) closePortal(ctx mw2.Context, target string) error { +func (T *Server) closePortal(ctx middleware.Context, target string) error { out := T.buf.Write() packets.WriteClose(out, 'P', target) err := ctx.Send(out) @@ -84,7 +84,7 @@ func (T *Server) closePortal(ctx mw2.Context, target string) error { return nil } -func (T *Server) bindPreparedStatement(ctx mw2.Context, target string, preparedStatement PreparedStatement) error { +func (T *Server) bindPreparedStatement(ctx middleware.Context, target string, preparedStatement PreparedStatement) error { if target != "" { if _, ok := T.preparedStatements[target]; ok { err := T.closePreparedStatement(ctx, target) @@ -106,7 +106,7 @@ func (T *Server) bindPreparedStatement(ctx mw2.Context, target string, preparedS return nil } -func (T *Server) bindPortal(ctx mw2.Context, target string, portal Portal) error { +func (T *Server) bindPortal(ctx middleware.Context, target string, portal Portal) error { if target != "" { if _, ok := T.portals[target]; ok { err := T.closePortal(ctx, target) @@ -128,7 +128,7 @@ func (T *Server) bindPortal(ctx mw2.Context, target string, portal Portal) error return nil } -func (T *Server) syncPreparedStatement(ctx mw2.Context, target string) error { +func (T *Server) syncPreparedStatement(ctx middleware.Context, target string) error { // we can assume client has the prepared statement because it should be checked by eqp.Client expected := T.peer.preparedStatements[target] actual, ok := T.preparedStatements[target] @@ -138,7 +138,7 @@ func (T *Server) syncPreparedStatement(ctx mw2.Context, target string) error { return nil } -func (T *Server) syncPortal(ctx mw2.Context, target string) error { +func (T *Server) syncPortal(ctx middleware.Context, target string) error { expected := T.peer.portals[target] actual, ok := T.portals[target] if !ok || !expected.Equals(actual) { @@ -147,7 +147,7 @@ func (T *Server) syncPortal(ctx mw2.Context, target string) error { return nil } -func (T *Server) Send(ctx mw2.Context, out zap.Out) error { +func (T *Server) Send(ctx middleware.Context, out zap.Out) error { in := zap.OutToIn(out) switch in.Type() { case packets.Query: @@ -194,7 +194,7 @@ func (T *Server) Send(ctx mw2.Context, out zap.Out) error { return nil } -func (T *Server) Read(ctx mw2.Context, in zap.In) error { +func (T *Server) Read(ctx middleware.Context, in zap.In) error { switch in.Type() { case packets.ParseComplete: ctx.Cancel() @@ -233,4 +233,4 @@ func (T *Server) Read(ctx mw2.Context, in zap.In) error { return nil } -var _ mw2.Middleware = (*Server)(nil) +var _ middleware.Middleware = (*Server)(nil) diff --git a/lib/mw2/middlewares/unterminate/unterminate.go b/lib/middleware/middlewares/unterminate/unterminate.go similarity index 61% rename from lib/mw2/middlewares/unterminate/unterminate.go rename to lib/middleware/middlewares/unterminate/unterminate.go index 4fe9524e..0f3e0b7c 100644 --- a/lib/mw2/middlewares/unterminate/unterminate.go +++ b/lib/middleware/middlewares/unterminate/unterminate.go @@ -3,7 +3,7 @@ package unterminate import ( "io" - "pggat2/lib/mw2" + "pggat2/lib/middleware" "pggat2/lib/zap" packets "pggat2/lib/zap/packets/v3.0" ) @@ -11,14 +11,14 @@ import ( var Unterminate = unterm{} type unterm struct { - mw2.Nil + middleware.Nil } -func (unterm) Read(_ mw2.Context, in zap.In) error { +func (unterm) Read(_ middleware.Context, in zap.In) error { if in.Type() == packets.Terminate { return io.EOF } return nil } -var _ mw2.Middleware = unterm{} +var _ middleware.Middleware = unterm{} diff --git a/lib/mw2/nil.go b/lib/middleware/nil.go similarity index 90% rename from lib/mw2/nil.go rename to lib/middleware/nil.go index 1d3d5fde..f749bcd7 100644 --- a/lib/mw2/nil.go +++ b/lib/middleware/nil.go @@ -1,4 +1,4 @@ -package mw2 +package middleware import "pggat2/lib/zap" diff --git a/lib/rob/constraints.go b/lib/rob/constraints.go index a86179e5..4cdc4afa 100644 --- a/lib/rob/constraints.go +++ b/lib/rob/constraints.go @@ -1,6 +1,6 @@ package rob -// Constraints is a bitfield used to control which Sink a job runs on. +// Constraints is a bitfield used to control which Worker a job runs on. // They can be declared by using const ... rob.Constraints = 1 << iota. // Because Constraints is an int64, you may have a maximum of 64 constraints // diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go index 2b97c43b..dc19f198 100644 --- a/lib/rob/scheduler.go +++ b/lib/rob/scheduler.go @@ -1,7 +1,7 @@ package rob type Scheduler interface { - // NewSink creates a new sink that fulfills input constraints - NewSink(fulfills Constraints) Sink - NewSource() Source + AddSink(Constraints, Worker) + + NewSource() Worker } diff --git a/lib/rob/schedulers/v2/job/job.go b/lib/rob/schedulers/v0/job/job.go similarity index 99% rename from lib/rob/schedulers/v2/job/job.go rename to lib/rob/schedulers/v0/job/job.go index 2d5dae90..81424f14 100644 --- a/lib/rob/schedulers/v2/job/job.go +++ b/lib/rob/schedulers/v0/job/job.go @@ -2,12 +2,11 @@ package job import ( "github.com/google/uuid" - "pggat2/lib/rob" ) type Job struct { Source uuid.UUID - Work any Constraints rob.Constraints + Work any } diff --git a/lib/rob/schedulers/v0/pool/pool.go b/lib/rob/schedulers/v0/pool/pool.go new file mode 100644 index 00000000..7c342e0a --- /dev/null +++ b/lib/rob/schedulers/v0/pool/pool.go @@ -0,0 +1,35 @@ +package pool + +import ( + "github.com/google/uuid" + "pggat2/lib/rob/schedulers/v0/job" + "pggat2/lib/rob/schedulers/v0/sink" + "sync" +) + +type Pool struct { + sinks map[uuid.UUID]*sink.Sink + mu sync.RWMutex +} + +// Do attempts to do the work. +// Returns true if the work was done, otherwise the sender should wait on their stall chan for the next node +func (T *Pool) Do(j job.Job) bool { + T.mu.RLock() + defer T.mu.RUnlock() + + // TODO(garet) choose affinity, prefer idle nodes + for _, v := range T.sinks { + if v.DoIfIdle(j) { + return + } + } + + panic("no available sinks") +} + +func (T *Pool) AddSink(s *sink.Sink) { + T.mu.Lock() + defer T.mu.Unlock() + T.sinks[s.ID()] = s +} diff --git a/lib/rob/schedulers/v0/scheduler.go b/lib/rob/schedulers/v0/scheduler.go new file mode 100644 index 00000000..14c0ae35 --- /dev/null +++ b/lib/rob/schedulers/v0/scheduler.go @@ -0,0 +1,22 @@ +package schedulers + +import ( + "pggat2/lib/rob" + "pggat2/lib/rob/schedulers/v0/pool" + "pggat2/lib/rob/schedulers/v0/sink" + "pggat2/lib/rob/schedulers/v0/source" +) + +type Scheduler struct { + pool pool.Pool +} + +func (T *Scheduler) AddSink(constraints rob.Constraints, worker rob.Worker) { + T.pool.AddSink(sink.NewSink(constraints, worker)) +} + +func (T *Scheduler) NewSource() rob.Worker { + return source.MakeSource(&T.pool) +} + +var _ rob.Scheduler = (*Scheduler)(nil) diff --git a/lib/rob/schedulers/v0/sink/sink.go b/lib/rob/schedulers/v0/sink/sink.go new file mode 100644 index 00000000..61f6d05b --- /dev/null +++ b/lib/rob/schedulers/v0/sink/sink.go @@ -0,0 +1,55 @@ +package sink + +import ( + "github.com/google/uuid" + "pggat2/lib/rob" + "pggat2/lib/rob/schedulers/v0/job" +) + +type Sink struct { + id uuid.UUID + constraints rob.Constraints + worker rob.Worker +} + +func NewSink(constraints rob.Constraints, worker rob.Worker) *Sink { + return &Sink{ + id: uuid.New(), + constraints: constraints, + worker: worker, + } +} + +func (T *Sink) ID() uuid.UUID { + return T.id +} + +func (T *Sink) Constraints() rob.Constraints { + return T.constraints +} + +// DoIfIdle will call Do if the target Sink is idle. +// Returns true if the job is complete +func (T *Sink) DoIfIdle(j job.Job) bool { + if !T.constraints.Satisfies(j.Constraints) { + return false + } + + // TODO(garet) check if idle + + T.Do(j) + return true +} + +// Do will do the work if the constraints match +// Returns true if the job is complete +func (T *Sink) Do(j job.Job) bool { + if !T.constraints.Satisfies(j.Constraints) { + return false + } + + // TODO(garet) queue if we are too busy + + T.worker.Do(j.Constraints, j.Work) + return true +} diff --git a/lib/rob/schedulers/v0/source/source.go b/lib/rob/schedulers/v0/source/source.go new file mode 100644 index 00000000..445daca7 --- /dev/null +++ b/lib/rob/schedulers/v0/source/source.go @@ -0,0 +1,31 @@ +package source + +import ( + "github.com/google/uuid" + "pggat2/lib/rob" + "pggat2/lib/rob/schedulers/v0/job" + "pggat2/lib/rob/schedulers/v0/pool" +) + +type Source struct { + id uuid.UUID + stall chan rob.Worker + pool *pool.Pool +} + +func MakeSource(p *pool.Pool) Source { + return Source{ + id: uuid.New(), + pool: p, + } +} + +func (T Source) Do(constraints rob.Constraints, work any) { + T.pool.Do(job.Job{ + Source: T.id, + Constraints: constraints, + Work: work, + }) +} + +var _ rob.Worker = Source{} diff --git a/lib/rob/schedulers/v2/pool/pool.go b/lib/rob/schedulers/v2/pool/pool.go deleted file mode 100644 index d4f24031..00000000 --- a/lib/rob/schedulers/v2/pool/pool.go +++ /dev/null @@ -1,116 +0,0 @@ -package pool - -import ( - "sync" - - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v2/job" - "pggat2/lib/rob/schedulers/v2/queue" -) - -type constrainedQueue struct { - queue *queue.Queue - constraints rob.Constraints -} - -type Pool struct { - affinity map[uuid.UUID]uuid.UUID - queues map[uuid.UUID]constrainedQueue - backorder []job.Job - mu sync.Mutex -} - -func MakePool() Pool { - return Pool{ - affinity: make(map[uuid.UUID]uuid.UUID), - queues: make(map[uuid.UUID]constrainedQueue), - } -} - -func (T *Pool) NewQueue(id uuid.UUID, constraints rob.Constraints) *queue.Queue { - q := queue.NewQueue() - - T.mu.Lock() - defer T.mu.Unlock() - - T.queues[id] = constrainedQueue{ - queue: q, - constraints: constraints, - } - - i := 0 - for _, j := range T.backorder { - if constraints.Satisfies(j.Constraints) { - q.Queue(j) - } else { - T.backorder[i] = j - i++ - } - } - T.backorder = T.backorder[:i] - - return q -} - -func (T *Pool) Schedule(work job.Job) { - T.mu.Lock() - defer T.mu.Unlock() - - if len(T.queues) == 0 { - T.backorder = append(T.backorder, work) - return - } - - var q constrainedQueue - affinity, ok := T.affinity[work.Source] - if ok { - q = T.queues[affinity] - } - - if !ok || !q.constraints.Satisfies(work.Constraints) || !q.queue.Idle() { - // choose a new affinity that satisfies constraints - ok = false - for id, s := range T.queues { - if s.constraints.Satisfies(work.Constraints) { - current := id == affinity - q = s - affinity = id - ok = true - if !current && s.queue.Idle() { - // prefer idle core, if not idle try to see if we can find one that is - break - } - } - } - if !ok { - T.backorder = append(T.backorder, work) - return - } - T.affinity[work.Source] = affinity - } - - // yay, queued - q.queue.Queue(work) -} - -func (T *Pool) StealFor(id uuid.UUID) { - T.mu.Lock() - defer T.mu.Unlock() - - q, ok := T.queues[id] - if !ok { - return - } - - for _, s := range T.queues { - if s == q { - continue - } - if source, ok := s.queue.Steal(q.constraints, q.queue); ok { - T.affinity[source] = id - break - } - } -} diff --git a/lib/rob/schedulers/v2/queue/queue.go b/lib/rob/schedulers/v2/queue/queue.go deleted file mode 100644 index 71477792..00000000 --- a/lib/rob/schedulers/v2/queue/queue.go +++ /dev/null @@ -1,187 +0,0 @@ -package queue - -import ( - "sync" - "time" - - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v2/job" - "pggat2/lib/util/rbtree" - "pggat2/lib/util/ring" -) - -type Queue struct { - signal chan struct{} - - active uuid.UUID - start time.Time - - floor time.Duration - - stride map[uuid.UUID]time.Duration - pending map[uuid.UUID]*ring.Ring[job.Job] - scheduled rbtree.RBTree[time.Duration, job.Job] - mu sync.Mutex -} - -func NewQueue() *Queue { - return &Queue{ - stride: make(map[uuid.UUID]time.Duration), - pending: make(map[uuid.UUID]*ring.Ring[job.Job]), - signal: make(chan struct{}), - } -} - -func (T *Queue) Idle() bool { - T.mu.Lock() - defer T.mu.Unlock() - - return T.active == uuid.Nil -} - -func (T *Queue) HasWork() bool { - T.mu.Lock() - defer T.mu.Unlock() - - _, _, ok := T.scheduled.Min() - return ok -} - -func (T *Queue) Queue(work job.Job) { - T.mu.Lock() - defer T.mu.Unlock() - - T.queue(work) -} - -func (T *Queue) queue(work job.Job) { - // try to schedule right away - if ok := T.scheduleWork(work); ok { - return - } - - // add to pending queue - if _, ok := T.pending[work.Source]; !ok { - T.pending[work.Source] = new(ring.Ring[job.Job]) - } - - T.pending[work.Source].PushBack(work) -} - -// schedule the next work for source -func (T *Queue) schedule(source uuid.UUID) { - pending, ok := T.pending[source] - if !ok { - return - } - work, ok := pending.Get(0) - if !ok { - return - } - if ok = T.scheduleWork(work); !ok { - return - } - pending.PopFront() -} - -func (T *Queue) scheduleWork(work job.Job) bool { - if T.active == work.Source { - return false - } - - stride := T.stride[work.Source] - if stride < T.floor { - stride = T.floor - T.stride[work.Source] = stride - } - - for { - // find unique stride to schedule on - if j, ok := T.scheduled.Get(stride); ok { - if j.Source == work.Source { - return false - } - stride += 1 - continue - } - - T.scheduled.Set(stride, work) - break - } - - // signal that more work is available if someone is waiting - select { - case T.signal <- struct{}{}: - default: - } - - return true -} - -// Steal work from this Sink that is satisfied by constraints -func (T *Queue) Steal(constraints rob.Constraints, dst *Queue) (uuid.UUID, bool) { - if T == dst { - // cannot steal from ourselves - return uuid.Nil, false - } - - T.mu.Lock() - defer T.mu.Unlock() - - for stride, work, ok := T.scheduled.Min(); ok; stride, work, ok = T.scheduled.Next(stride) { - if constraints.Satisfies(work.Constraints) { - source := work.Source - - dst.mu.Lock() - defer dst.mu.Unlock() - - // steal it - T.scheduled.Delete(stride) - - dst.queue(work) - - // steal pending - pending, _ := T.pending[work.Source] - - for work, ok = pending.PopFront(); ok; work, ok = pending.PopFront() { - dst.queue(work) - } - - return source, true - } - } - - // no stealable work - return uuid.Nil, false -} - -func (T *Queue) Ready() <-chan struct{} { - return T.signal -} - -func (T *Queue) Read() (job.Job, bool) { - T.mu.Lock() - defer T.mu.Unlock() - - if T.active != uuid.Nil { - source := T.active - dur := time.Since(T.start) - T.active = uuid.Nil - - T.stride[source] += dur - T.schedule(source) - } - - stride, j, ok := T.scheduled.Min() - if !ok { - return job.Job{}, false - } - T.scheduled.Delete(stride) - T.floor = stride - - T.active = j.Source - T.start = time.Now() - return j, true -} diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go deleted file mode 100644 index c28f49e7..00000000 --- a/lib/rob/schedulers/v2/scheduler.go +++ /dev/null @@ -1,37 +0,0 @@ -package schedulers - -import ( - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v2/pool" - "pggat2/lib/rob/schedulers/v2/sink" - "pggat2/lib/rob/schedulers/v2/source" -) - -type Scheduler struct { - pool pool.Pool -} - -func MakeScheduler() Scheduler { - return Scheduler{ - pool: pool.MakePool(), - } -} - -func NewScheduler() *Scheduler { - scheduler := MakeScheduler() - return &scheduler -} - -func (T *Scheduler) NewSink(fulfills rob.Constraints) rob.Sink { - id := uuid.New() - q := T.pool.NewQueue(id, fulfills) - return sink.MakeSink(id, &T.pool, q) -} - -func (T *Scheduler) NewSource() rob.Source { - return source.MakeSource(&T.pool) -} - -var _ rob.Scheduler = (*Scheduler)(nil) diff --git a/lib/rob/schedulers/v2/scheduler_test.go b/lib/rob/schedulers/v2/scheduler_test.go deleted file mode 100644 index 14200287..00000000 --- a/lib/rob/schedulers/v2/scheduler_test.go +++ /dev/null @@ -1,336 +0,0 @@ -package schedulers - -import ( - "runtime" - "sync" - "testing" - "time" - - "pggat2/lib/rob" -) - -type Work struct { - Sender int - Duration time.Duration - Done chan<- struct{} - Constraints rob.Constraints -} - -type ShareTable struct { - table map[int]int - mu sync.RWMutex -} - -func (T *ShareTable) Inc(user int) { - T.mu.Lock() - defer T.mu.Unlock() - - if T.table == nil { - T.table = make(map[int]int) - } - T.table[user]++ -} - -func (T *ShareTable) Get(user int) int { - T.mu.RLock() - defer T.mu.RUnlock() - - v, _ := T.table[user] - return v -} - -func testSink(sched *Scheduler, table *ShareTable, constraints rob.Constraints) { - sink := sched.NewSink(constraints) - for { - w := sink.Read() - switch v := w.(type) { - case Work: - if !constraints.Satisfies(v.Constraints) { - panic("Scheduler did not obey constraints") - } - // dummy load - start := time.Now() - for time.Since(start) < v.Duration { - } - table.Inc(v.Sender) - close(v.Done) - } - } -} - -func testSource(sched *Scheduler, id int, dur time.Duration, constraints rob.Constraints) { - source := sched.NewSource() - for { - done := make(chan struct{}) - w := Work{ - Sender: id, - Duration: dur, - Done: done, - Constraints: constraints, - } - source.Schedule(w, constraints) - <-done - } -} - -func similar(v0, v1 int, vn ...int) bool { - const margin = 0.05 // 5% margin of error - - min := v0 - max := v0 - - if v1 < min { - min = v1 - } - if v1 > max { - max = v1 - } - - for _, v := range vn { - if v < min { - min = v - } - if v > max { - max = v - } - } - - if (float64(max-min) / float64(max)) > margin { - return false - } - return true -} - -// like debug.Stack but gets all stacks -func allStacks() []byte { - buf := make([]byte, 1024) - for { - n := runtime.Stack(buf, true) - if n < len(buf) { - return buf[:n] - } - buf = make([]byte, 2*len(buf)) - } -} - -func TestScheduler(t *testing.T) { - var table ShareTable - sched := NewScheduler() - go testSink(sched, &table, 0) - - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 50*time.Millisecond, 0) - go testSource(sched, 3, 100*time.Millisecond, 0) - - time.Sleep(20 * time.Second) - t0 := table.Get(0) - t1 := table.Get(1) - t2 := table.Get(2) - t3 := table.Get(3) - - /* - Expectations: - - 0 and 1 should be similar and have roughly 10x of 3 - - 2 should have about twice as many executions as 3 - */ - - t.Log("share of 0:", t0) - t.Log("share of 1:", t1) - t.Log("share of 2:", t2) - t.Log("share of 3:", t3) - - if !similar(t0, t1) { - t.Error("expected s0 and s1 to be similar") - } - - if !similar(t0, t3*10) { - t.Error("expected s0 and s3*10 to be similar") - } - - if !similar(t2, t3*2) { - t.Error("expected s2 and s3*2 to be similar") - } -} - -func TestScheduler_Late(t *testing.T) { - var table ShareTable - sched := NewScheduler() - go testSink(sched, &table, 0) - - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - - time.Sleep(10 * time.Second) - - go testSource(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 3, 10*time.Millisecond, 0) - - time.Sleep(10 * time.Second) - t0 := table.Get(0) - t1 := table.Get(1) - t2 := table.Get(2) - t3 := table.Get(3) - - /* - Expectations: - - 0 and 1 should be similar - - 2 and 3 should be similar - - 0 and 1 should have roughly three times as many executions as 2 and 3 - */ - - t.Log("share of 0:", t0) - t.Log("share of 1:", t1) - t.Log("share of 2:", t2) - t.Log("share of 3:", t3) - - if !similar(t0, t1) { - t.Error("expected s0 and s1 to be similar") - } - - if !similar(t2, t3) { - t.Error("expected s2 and s3 to be similar") - } - - if !similar(t0, 3*t2) { - t.Error("expected s0 and s2*3 to be similar") - } -} - -func TestScheduler_StealBalanced(t *testing.T) { - var table ShareTable - sched := NewScheduler() - go testSink(sched, &table, 0) - go testSink(sched, &table, 0) - - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 3, 10*time.Millisecond, 0) - - time.Sleep(20 * time.Second) - t0 := table.Get(0) - t1 := table.Get(1) - t2 := table.Get(2) - t3 := table.Get(3) - - /* - Expectations: - - all users should get similar # of executions - */ - - t.Log("share of 0:", t0) - t.Log("share of 1:", t1) - t.Log("share of 2:", t2) - t.Log("share of 3:", t3) - - if !similar(t0, t1, t2, t3) { - t.Error("expected all shares to be similar") - } - - if t0 == 0 { - t.Error("expected executions on all sources (is there a race in the balancer??)") - t.Errorf("%s", allStacks()) - } -} - -func TestScheduler_StealUnbalanced(t *testing.T) { - var table ShareTable - sched := NewScheduler() - go testSink(sched, &table, 0) - go testSink(sched, &table, 0) - - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 10*time.Millisecond, 0) - - time.Sleep(20 * time.Second) - t0 := table.Get(0) - t1 := table.Get(1) - t2 := table.Get(2) - - /* - Expectations: - - all users should get similar # of executions - */ - - t.Log("share of 0:", t0) - t.Log("share of 1:", t1) - t.Log("share of 2:", t2) - - if !similar(t0, t1, t2) { - t.Error("expected all shares to be similar") - } - - if t0 == 0 { - t.Error("expected executions on all sources (is there a race in the balancer??)") - t.Errorf("%s", allStacks()) - } -} - -func TestScheduler_Constraints(t *testing.T) { - const ( - ConstraintA rob.Constraints = 1 << iota - ConstraintB - ) - - var table ShareTable - sched := NewScheduler() - - go testSink(sched, &table, rob.Constraints.All(ConstraintA, ConstraintB)) - go testSink(sched, &table, ConstraintA) - go testSink(sched, &table, ConstraintB) - - go testSource(sched, 0, 10*time.Millisecond, rob.Constraints.All(ConstraintA, ConstraintB)) - go testSource(sched, 1, 10*time.Millisecond, rob.Constraints.All(ConstraintA, ConstraintB)) - go testSource(sched, 2, 10*time.Millisecond, ConstraintA) - go testSource(sched, 3, 10*time.Millisecond, ConstraintA) - go testSource(sched, 4, 10*time.Millisecond, ConstraintB) - go testSource(sched, 5, 10*time.Millisecond, ConstraintB) - - time.Sleep(20 * time.Second) - t0 := table.Get(0) - t1 := table.Get(1) - t2 := table.Get(2) - t3 := table.Get(3) - t4 := table.Get(4) - t5 := table.Get(5) - - /* - Expectations: - - all users should get similar # of executions (shares of 0 and 1 may be less because they have less sinks they can use: 1 vs 2) - - all constraints should be honored - */ - - t.Log("share of 0:", t0) - t.Log("share of 1:", t1) - t.Log("share of 2:", t2) - t.Log("share of 3:", t3) - t.Log("share of 4:", t4) - t.Log("share of 5:", t5) -} - -func TestScheduler_IdleWake(t *testing.T) { - var table ShareTable - sched := NewScheduler() - - go testSink(sched, &table, 0) - - time.Sleep(10 * time.Second) - - go testSource(sched, 0, 10*time.Millisecond, 0) - - time.Sleep(10 * time.Second) - t0 := table.Get(0) - - /* - Expectations: - - 0 should have some executions - */ - - if t0 == 0 { - t.Error("expected executions to be greater than 0 (is idle waking broken?)") - } - - t.Log("share of 0:", t0) -} diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go deleted file mode 100644 index b786a88f..00000000 --- a/lib/rob/schedulers/v2/sink/sink.go +++ /dev/null @@ -1,45 +0,0 @@ -package sink - -import ( - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v2/pool" - "pggat2/lib/rob/schedulers/v2/queue" -) - -type Sink struct { - id uuid.UUID - pool *pool.Pool - queue *queue.Queue -} - -func MakeSink(id uuid.UUID, p *pool.Pool, q *queue.Queue) Sink { - return Sink{ - id: id, - pool: p, - queue: q, - } -} - -func (T Sink) findWork() { - T.pool.StealFor(T.id) - // see if we stole some work - if T.queue.HasWork() { - return - } - // there is no work to steal, wait until some more is scheduled - <-T.queue.Ready() -} - -func (T Sink) Read() any { - for { - v, ok := T.queue.Read() - if ok { - return v.Work - } - T.findWork() - } -} - -var _ rob.Sink = Sink{} diff --git a/lib/rob/schedulers/v2/source/source.go b/lib/rob/schedulers/v2/source/source.go deleted file mode 100644 index ac70054c..00000000 --- a/lib/rob/schedulers/v2/source/source.go +++ /dev/null @@ -1,31 +0,0 @@ -package source - -import ( - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v2/job" - "pggat2/lib/rob/schedulers/v2/pool" -) - -type Source struct { - uuid uuid.UUID - pool *pool.Pool -} - -func MakeSource(p *pool.Pool) Source { - return Source{ - uuid: uuid.New(), - pool: p, - } -} - -func (T Source) Schedule(work any, constraints rob.Constraints) { - T.pool.Schedule(job.Job{ - Source: T.uuid, - Work: work, - Constraints: constraints, - }) -} - -var _ rob.Source = Source{} diff --git a/lib/rob/sink.go b/lib/rob/sink.go deleted file mode 100644 index 0f9fbf5e..00000000 --- a/lib/rob/sink.go +++ /dev/null @@ -1,5 +0,0 @@ -package rob - -type Sink interface { - Read() any -} diff --git a/lib/rob/source.go b/lib/rob/source.go deleted file mode 100644 index d5a40253..00000000 --- a/lib/rob/source.go +++ /dev/null @@ -1,6 +0,0 @@ -package rob - -type Source interface { - // Schedule work with constraints. Work will run on a Sink that at least fulfills these constraints - Schedule(work any, constraints Constraints) -} diff --git a/lib/rob/worker.go b/lib/rob/worker.go new file mode 100644 index 00000000..82bf3868 --- /dev/null +++ b/lib/rob/worker.go @@ -0,0 +1,5 @@ +package rob + +type Worker interface { + Do(constraints Constraints, work any) +} diff --git a/lib/middleware/middlewares/onebuffer/onebuffer.go b/lib/zap/onebuffer/onebuffer.go similarity index 100% rename from lib/middleware/middlewares/onebuffer/onebuffer.go rename to lib/zap/onebuffer/onebuffer.go -- GitLab