diff --git a/lib/gat/pool/pools/transaction/pool.go b/lib/gat/pool/pools/transaction/pool.go index 78a0ae528b314c528a14fb9ecc92d5a86961a0fd..0c9faf01916ed9c39d333ad699d1ebe9e6a8b672 100644 --- a/lib/gat/pool/pools/transaction/pool.go +++ b/lib/gat/pool/pools/transaction/pool.go @@ -3,7 +3,7 @@ package transaction import "pggat2/lib/gat/pool" func NewPool(options pool.Options) *pool.Pool { - // TODO(garet) options.Pooler = new(Pooler) + options.Pooler = new(Pooler) options.ParameterStatusSync = pool.ParameterStatusSyncDynamic options.ExtendedQuerySync = true return pool.NewPool(options) diff --git a/lib/gat/pool/pools/transaction/pooler.go b/lib/gat/pool/pools/transaction/pooler.go new file mode 100644 index 0000000000000000000000000000000000000000..00350f40a204d649e5bc3986d832257f5f68bdae --- /dev/null +++ b/lib/gat/pool/pools/transaction/pooler.go @@ -0,0 +1,46 @@ +package transaction + +import ( + "github.com/google/uuid" + + "pggat2/lib/gat/pool" + "pggat2/lib/rob/schedulers/v2" +) + +type Pooler struct { + s schedulers.Scheduler +} + +func (T *Pooler) AddClient(client uuid.UUID) { + T.s.AddUser(client) +} + +func (T *Pooler) RemoveClient(client uuid.UUID) { + T.s.RemoveUser(client) +} + +func (T *Pooler) AddServer(server uuid.UUID) { + T.s.AddWorker(server) +} + +func (T *Pooler) RemoveServer(server uuid.UUID) { + T.s.RemoveWorker(server) +} + +func (T *Pooler) AcquireConcurrent(client uuid.UUID) uuid.UUID { + return T.s.AcquireConcurrent(client) +} + +func (T *Pooler) AcquireAsync(client uuid.UUID) uuid.UUID { + return T.s.AcquireAsync(client) +} + +func (*Pooler) ReleaseAfterTransaction() bool { + return true +} + +func (T *Pooler) Release(server uuid.UUID) { + T.s.Release(server) +} + +var _ pool.Pooler = (*Pooler)(nil) diff --git a/lib/rob/constraints.go b/lib/rob/constraints.go deleted file mode 100644 index 4cdc4afa63d81a72edc53a9ed9905396b4f5fc54..0000000000000000000000000000000000000000 --- a/lib/rob/constraints.go +++ /dev/null @@ -1,33 +0,0 @@ -package rob - -// Constraints is a bitfield used to control which Worker a job runs on. -// They can be declared by using const ... rob.Constraints = 1 << iota. -// Because Constraints is an int64, you may have a maximum of 64 constraints -// -// Example: -/* - const ( - ConstraintOne rob.Constraints = 1 << iota - ConstraintTwo - ConstraintThree - ) - - var All = rob.Constraints.All( - ConstraintOne, - ConstraintTwo, - ConstraintThree, - ) -*/ -type Constraints int64 - -func (T Constraints) All(cn ...Constraints) Constraints { - v := T - for _, c := range cn { - v |= c - } - return v -} - -func (T Constraints) Satisfies(other Constraints) bool { - return (other & T) == other -} diff --git a/lib/rob/constraints_test.go b/lib/rob/constraints_test.go deleted file mode 100644 index 1874080573ac2d558a0965170f3f67d46e10b93e..0000000000000000000000000000000000000000 --- a/lib/rob/constraints_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package rob - -import "testing" - -const ( - ConstraintOne Constraints = 1 << iota - ConstraintTwo - ConstraintThree - ConstraintFour -) - -func TestConstraints_All(t *testing.T) { - all := Constraints.All(ConstraintOne, ConstraintTwo, ConstraintThree, ConstraintFour) - if all != 0b1111 { - t.Error("expected all bits to be set") - } - odd := Constraints.All(ConstraintOne, ConstraintThree) - if odd != 0b0101 { - t.Error("expected odd bits to be set") - } - even := Constraints.All(ConstraintTwo, ConstraintFour) - if even != 0b1010 { - t.Error("expected even bits to be set") - } -} - -func TestConstraints_Satisfies(t *testing.T) { - all := Constraints.All(ConstraintOne, ConstraintTwo, ConstraintThree, ConstraintFour) - if ConstraintOne.Satisfies(all) { - t.Error("expected one to not satisfy all") - } - odd := Constraints.All(ConstraintOne, ConstraintThree) - if odd.Satisfies(all) { - t.Error("expected odd to not satisfy all") - } - if ConstraintOne.Satisfies(odd) { - t.Error("expected one to not satisfy odd") - } - even := Constraints.All(ConstraintTwo, ConstraintFour) - if even.Satisfies(all) { - t.Error("expected even to not satisfy all") - } - if ConstraintOne.Satisfies(even) { - t.Error("expected one to not satisfy even") - } - if !even.Satisfies(even) { - t.Error("expected even to satisfy even") - } - if !all.Satisfies(even) { - t.Error("expected all to satisfy even") - } - if !all.Satisfies(odd) { - t.Error("expected all to satisfy odd") - } -} diff --git a/lib/rob/context.go b/lib/rob/context.go deleted file mode 100644 index 30438e6f12c8e3eed57319bef3a1f02df3437348..0000000000000000000000000000000000000000 --- a/lib/rob/context.go +++ /dev/null @@ -1,15 +0,0 @@ -package rob - -type Context struct { - OnWait chan<- struct{} - Constraints Constraints - Removed bool -} - -func (T *Context) Remove() { - T.Removed = true -} - -func (T *Context) Reset() { - T.Removed = false -} diff --git a/lib/rob/metrics.go b/lib/rob/metrics.go deleted file mode 100644 index b752c595d756775e0d2e0d8a92e77db82a8fa8cc..0000000000000000000000000000000000000000 --- a/lib/rob/metrics.go +++ /dev/null @@ -1,93 +0,0 @@ -package rob - -import ( - "fmt" - "time" - - "github.com/google/uuid" -) - -type WorkerMetrics struct { - LastActive time.Time - - Idle time.Duration - Active time.Duration -} - -type JobMetrics struct { - Created time.Time - Backlogged bool -} - -type Metrics struct { - Jobs map[uuid.UUID]JobMetrics - Workers map[uuid.UUID]WorkerMetrics -} - -func (T *Metrics) BackloggedJobCount() int { - count := 0 - - for _, job := range T.Jobs { - if job.Backlogged { - count++ - } - } - - return count -} - -func (T *Metrics) AverageJobAge() time.Duration { - now := time.Now() - - sum := time.Duration(0) - count := len(T.Jobs) - - for _, job := range T.Jobs { - sum += now.Sub(job.Created) - } - - if count == 0 { - return 0 - } - - return sum / time.Duration(count) -} - -func (T *Metrics) MaxJobAge() time.Duration { - now := time.Now() - - max := time.Duration(0) - - for _, job := range T.Jobs { - age := now.Sub(job.Created) - if age > max { - max = age - } - } - - return max -} - -func (T *Metrics) AverageWorkerUtilization() float64 { - if len(T.Workers) == 0 { - if T.BackloggedJobCount() > 0 { - return 1 - } else { - return 0 - } - } - - idle := time.Duration(0) - active := time.Duration(0) - - for _, worker := range T.Workers { - idle += worker.Idle - active += worker.Active - } - - return float64(active) / float64(idle+active) -} - -func (T *Metrics) String() string { - return fmt.Sprintf("%d queued jobs (%d backlogged, %s avg age, %s max age) / %d workers (%.2f%% util)", len(T.Jobs), T.BackloggedJobCount(), T.AverageJobAge().String(), T.MaxJobAge().String(), len(T.Workers), T.AverageWorkerUtilization()*100) -} diff --git a/lib/rob/scheduler.go b/lib/rob/scheduler.go index be8926eab0289c66d9ef385903d3f0b7870c28ba..282adf21ee02dac42ca5ab794b218f16a564cd6a 100644 --- a/lib/rob/scheduler.go +++ b/lib/rob/scheduler.go @@ -1,19 +1,31 @@ package rob import ( - "time" - "github.com/google/uuid" ) type Scheduler interface { - AddWorker(constraints Constraints, worker Worker) uuid.UUID - GetWorker(id uuid.UUID) Worker - GetIdleWorker() (uuid.UUID, time.Time) - RemoveWorker(id uuid.UUID) Worker - WorkerCount() int + AddWorker(worker uuid.UUID) + RemoveWorker(worker uuid.UUID) + + AddUser(user uuid.UUID) + RemoveUser(user uuid.UUID) + + // AcquireConcurrent tries to acquire a peer for the user without stalling. + // Returns uuid.Nil if no peer can be acquired + AcquireConcurrent(user uuid.UUID) uuid.UUID + // AcquireAsync will stall until a peer is available + AcquireAsync(user uuid.UUID) uuid.UUID + + // Release will release a worker. + // This should be called after acquire unless the worker is removed with RemoveWorker + Release(worker uuid.UUID) +} - NewSource() Worker +func Acquire(scheduler Scheduler, user uuid.UUID) uuid.UUID { + if s := scheduler.AcquireConcurrent(user); s != uuid.Nil { + return s + } - ReadMetrics(metrics *Metrics) + return scheduler.AcquireAsync(user) } diff --git a/lib/rob/schedulers/v1/pool/job/base.go b/lib/rob/schedulers/v1/pool/job/base.go deleted file mode 100644 index c8a292df196cf3d18c889187ff2e8126301a76dd..0000000000000000000000000000000000000000 --- a/lib/rob/schedulers/v1/pool/job/base.go +++ /dev/null @@ -1,16 +0,0 @@ -package job - -import ( - "time" - - "github.com/google/uuid" - - "pggat2/lib/rob" -) - -type Base struct { - Created time.Time - ID uuid.UUID - Source uuid.UUID - Context *rob.Context -} diff --git a/lib/rob/schedulers/v1/pool/job/concurrent.go b/lib/rob/schedulers/v1/pool/job/concurrent.go deleted file mode 100644 index 71c4226386608a1be75bc0cca91dddc68949159e..0000000000000000000000000000000000000000 --- a/lib/rob/schedulers/v1/pool/job/concurrent.go +++ /dev/null @@ -1,6 +0,0 @@ -package job - -type Concurrent struct { - Base - Work any -} diff --git a/lib/rob/schedulers/v1/pool/job/stalled.go b/lib/rob/schedulers/v1/pool/job/stalled.go deleted file mode 100644 index c7aa6a34f15f498a73f816992910676a11b543b5..0000000000000000000000000000000000000000 --- a/lib/rob/schedulers/v1/pool/job/stalled.go +++ /dev/null @@ -1,8 +0,0 @@ -package job - -import "github.com/google/uuid" - -type Stalled struct { - Base - Ready chan uuid.UUID -} diff --git a/lib/rob/schedulers/v1/pool/pool.go b/lib/rob/schedulers/v1/pool/pool.go deleted file mode 100644 index 12e09ba12197f998dbcbf79950fda60f066ee5bd..0000000000000000000000000000000000000000 --- a/lib/rob/schedulers/v1/pool/pool.go +++ /dev/null @@ -1,261 +0,0 @@ -package pool - -import ( - "sync" - "time" - - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v1/pool/job" - "pggat2/lib/rob/schedulers/v1/pool/sink" - "pggat2/lib/util/maps" -) - -type Pool struct { - affinity maps.RWLocked[uuid.UUID, uuid.UUID] - - // backlog should only be accessed when mu is Locked in some way or another - backlog []job.Stalled - bmu sync.Mutex - sinks map[uuid.UUID]*sink.Sink - mu sync.RWMutex -} - -func MakePool() Pool { - return Pool{ - sinks: make(map[uuid.UUID]*sink.Sink), - } -} - -func (T *Pool) ExecuteConcurrent(j job.Concurrent) bool { - affinity, _ := T.affinity.Load(j.Source) - - // these can be unlocked and locked a bunch here because it is less bad if ExecuteConcurrent misses a sink - // (it will just stall the job and try again) - T.mu.RLock() - - // try affinity first - if v, ok := T.sinks[affinity]; ok { - T.mu.RUnlock() - if done, hasMore := v.ExecuteConcurrent(j); done { - if j.Context.Removed { - T.RemoveWorker(affinity) - return true - } - if !hasMore { - T.stealFor(affinity) - } - return true - } - T.mu.RLock() - } - - for id, v := range T.sinks { - if id == affinity { - continue - } - T.mu.RUnlock() - if ok, hasMore := v.ExecuteConcurrent(j); ok { - if j.Context.Removed { - T.RemoveWorker(id) - return true - } - - // set affinity - T.affinity.Store(j.Source, id) - - if !hasMore { - T.stealFor(id) - } - - return true - } - T.mu.RLock() - } - - T.mu.RUnlock() - return false -} - -func (T *Pool) ExecuteStalled(j job.Stalled) { - affinity, _ := T.affinity.Load(j.Source) - - T.mu.RLock() - defer T.mu.RUnlock() - - // try affinity first - if v, ok := T.sinks[affinity]; ok { - if ok = v.ExecuteStalled(j); ok { - return - } - } - - for id, v := range T.sinks { - if id == affinity { - continue - } - - if ok := v.ExecuteStalled(j); ok { - T.affinity.Store(j.Source, id) - return - } - } - - // add to backlog - T.bmu.Lock() - defer T.bmu.Unlock() - T.backlog = append(T.backlog, j) -} - -func (T *Pool) AddWorker(constraints rob.Constraints, worker rob.Worker) uuid.UUID { - id := uuid.New() - s := sink.NewSink(id, constraints, worker) - - func() { - T.mu.Lock() - defer T.mu.Unlock() - // if mu is locked, we don't need to lock bmu, because we are the only accessor - T.sinks[id] = s - i := 0 - for _, v := range T.backlog { - if ok := s.ExecuteStalled(v); !ok { - T.backlog[i] = v - i++ - } - } - T.backlog = T.backlog[:i] - }() - - T.stealFor(id) - - return id -} - -func (T *Pool) GetWorker(id uuid.UUID) rob.Worker { - T.mu.RLock() - defer T.mu.RUnlock() - s, ok := T.sinks[id] - if !ok { - return nil - } - return s.GetWorker() -} - -func (T *Pool) GetIdleWorker() (id uuid.UUID, idleStart time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - - for i, s := range T.sinks { - start := s.IdleStart() - if idleStart == (time.Time{}) || start.Before(idleStart) { - idleStart = start - id = i - } - } - - return -} - -func (T *Pool) RemoveWorker(id uuid.UUID) rob.Worker { - var s *sink.Sink - var ok bool - func() { - T.mu.Lock() - defer T.mu.Unlock() - s, ok = T.sinks[id] - delete(T.sinks, id) - }() - if !ok { - return nil - } - - // now we need to reschedule all the work that was scheduled to s (stalled only). - jobs := s.StealAll() - - for _, j := range jobs { - T.ExecuteStalled(j) - } - - return s.GetWorker() -} - -func (T *Pool) WorkerCount() int { - T.mu.RLock() - defer T.mu.RUnlock() - return len(T.sinks) -} - -func (T *Pool) stealFor(id uuid.UUID) { - T.mu.RLock() - - s, ok := T.sinks[id] - if !ok { - T.mu.RUnlock() - return - } - - for _, v := range T.sinks { - if v == s { - continue - } - - T.mu.RUnlock() - if src := v.StealFor(s); src != uuid.Nil { - T.affinity.Store(src, id) - return - } - T.mu.RLock() - } - - T.mu.RUnlock() -} - -func (T *Pool) Execute(id uuid.UUID, ctx *rob.Context, work any) { - var s *sink.Sink - func() { - T.mu.RLock() - defer T.mu.RUnlock() - s = T.sinks[id] - }() - - hasMore := s.Execute(ctx, work) - if ctx.Removed { - // remove - T.RemoveWorker(id) - return - } - if !hasMore { - // try to steal - T.stealFor(id) - } -} - -func (T *Pool) ReadMetrics(metrics *rob.Metrics) { - maps.Clear(metrics.Jobs) - if metrics.Jobs == nil { - metrics.Jobs = make(map[uuid.UUID]rob.JobMetrics) - } - maps.Clear(metrics.Workers) - if metrics.Workers == nil { - metrics.Workers = make(map[uuid.UUID]rob.WorkerMetrics) - } - - T.mu.RLock() - defer T.mu.RUnlock() - - func() { - T.bmu.Lock() - defer T.bmu.Unlock() - for _, j := range T.backlog { - metrics.Jobs[j.ID] = rob.JobMetrics{ - Created: j.Created, - Backlogged: true, - } - } - }() - - for _, worker := range T.sinks { - worker.ReadMetrics(metrics) - } -} diff --git a/lib/rob/schedulers/v1/pool/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go deleted file mode 100644 index 682b36703a3413d582a906da07da8143a0fc26c6..0000000000000000000000000000000000000000 --- a/lib/rob/schedulers/v1/pool/sink/sink.go +++ /dev/null @@ -1,346 +0,0 @@ -package sink - -import ( - "sync" - "time" - - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v1/pool/job" - "pggat2/lib/util/rbtree" - "pggat2/lib/util/ring" -) - -type Sink struct { - id uuid.UUID - constraints rob.Constraints - worker rob.Worker - - // non final - - // metrics - lastMetricsRead time.Time - idle time.Duration - - active uuid.UUID - start time.Time - - floor time.Duration - stride map[uuid.UUID]time.Duration - pending map[uuid.UUID]*ring.Ring[job.Stalled] - scheduled rbtree.RBTree[time.Duration, job.Stalled] - - mu sync.Mutex -} - -func NewSink(id uuid.UUID, constraints rob.Constraints, worker rob.Worker) *Sink { - now := time.Now() - - return &Sink{ - id: id, - constraints: constraints, - worker: worker, - - lastMetricsRead: now, - start: now, - - stride: make(map[uuid.UUID]time.Duration), - pending: make(map[uuid.UUID]*ring.Ring[job.Stalled]), - } -} - -func (T *Sink) IdleStart() time.Time { - T.mu.Lock() - defer T.mu.Unlock() - if T.active != uuid.Nil { - return time.Time{} - } - - return T.start -} - -func (T *Sink) GetWorker() rob.Worker { - return T.worker -} - -func (T *Sink) setActive(source uuid.UUID) { - if T.active != uuid.Nil { - panic("set active called when another was active") - } - now := time.Now() - start := T.start - if start.Before(T.lastMetricsRead) { - start = T.lastMetricsRead - } - T.idle += now.Sub(start) - T.active = source - T.start = now -} - -func (T *Sink) ExecuteConcurrent(j job.Concurrent) (ok, hasMore bool) { - if !T.constraints.Satisfies(j.Context.Constraints) { - return false, false - } - - var wasInUse bool - - func() { - T.mu.Lock() - defer T.mu.Unlock() - - if T.active != uuid.Nil { - wasInUse = true - return - } - - T.setActive(j.Source) - }() - if wasInUse { - return false, false - } - - return true, T.Execute(j.Context, j.Work) -} - -func (T *Sink) trySchedule(j job.Stalled) bool { - if T.active == j.Source { - // shouldn't be scheduled yet - return false - } - - stride, ok := T.stride[j.Source] - if !ok { - // set to max - stride = T.floor - if s, _, ok := T.scheduled.Max(); ok { - stride = s + 1 - } - T.stride[j.Source] = stride - } else if stride < T.floor { - stride = T.floor - T.stride[j.Source] = stride - } - - for { - // find unique stride to schedule on - if s, ok := T.scheduled.Get(stride); ok { - if s.Source == j.Source { - return false - } - stride += 1 - continue - } - - T.scheduled.Set(stride, j) - return true - } -} - -func (T *Sink) enqueue(j job.Stalled) { - if T.trySchedule(j) { - return - } - - p, ok := T.pending[j.Source] - - // add to pending queue - if !ok { - p = ring.NewRing[job.Stalled](0, 1) - T.pending[j.Source] = p - } - - p.PushBack(j) -} - -func (T *Sink) ExecuteStalled(j job.Stalled) bool { - if !T.constraints.Satisfies(j.Context.Constraints) { - return false - } - - // enqueue job - T.mu.Lock() - defer T.mu.Unlock() - - if T.active == uuid.Nil { - // run it now - T.setActive(j.Source) - j.Ready <- T.id - return true - } - - // enqueue for running later - T.enqueue(j) - return true -} - -func (T *Sink) enqueueNextFor(source uuid.UUID) { - pending, ok := T.pending[source] - if !ok { - return - } - j, ok := pending.PopFront() - if !ok { - return - } - if ok = T.trySchedule(j); !ok { - pending.PushFront(j) - return - } -} - -func (T *Sink) next() bool { - now := time.Now() - if T.active != uuid.Nil { - source := T.active - dur := now.Sub(T.start) - T.active = uuid.Nil - T.start = now - - T.stride[source] += dur - - T.enqueueNextFor(source) - } - - stride, j, ok := T.scheduled.Min() - if !ok { - return false - } - T.scheduled.Delete(stride) - if stride > T.floor { - T.floor = stride - } - - T.setActive(j.Source) - j.Ready <- T.id - return true -} - -func (T *Sink) Execute(ctx *rob.Context, work any) (hasMore bool) { - T.worker.Do(ctx, work) - if ctx.Removed { - return false - } - - // queue next - T.mu.Lock() - defer T.mu.Unlock() - return T.next() -} - -func (T *Sink) StealFor(rhs *Sink) uuid.UUID { - if T == rhs { - // cannot steal from ourselves - return uuid.Nil - } - - T.mu.Lock() - defer T.mu.Unlock() - - for stride, j, ok := T.scheduled.Min(); ok; stride, j, ok = T.scheduled.Next(stride) { - if rhs.constraints.Satisfies(j.Context.Constraints) { - source := j.Source - - // take jobs from T - T.scheduled.Delete(stride) - - pending, _ := T.pending[source] - delete(T.pending, source) - - func() { - T.mu.Unlock() - defer T.mu.Lock() - - rhs.ExecuteStalled(j) - - for j, ok = pending.PopFront(); ok; j, ok = pending.PopFront() { - rhs.ExecuteStalled(j) - } - }() - - if pending != nil { - if _, ok = T.pending[source]; !ok { - T.pending[source] = pending - } - } - - return source - } - } - - return uuid.Nil -} - -func (T *Sink) StealAll() []job.Stalled { - var all []job.Stalled - - T.mu.Lock() - defer T.mu.Unlock() - - for { - if k, j, ok := T.scheduled.Min(); ok { - T.scheduled.Delete(k) - all = append(all, j) - } else { - break - } - } - - for _, value := range T.pending { - for { - if j, ok := value.PopFront(); ok { - all = append(all, j) - } else { - break - } - } - } - - return all -} - -func (T *Sink) ReadMetrics(metrics *rob.Metrics) { - T.mu.Lock() - defer T.mu.Unlock() - - now := time.Now() - - var lastActive time.Time - - dur := now.Sub(T.lastMetricsRead) - - if T.active == uuid.Nil { - lastActive = T.start - - start := T.start - if start.Before(T.lastMetricsRead) { - start = T.lastMetricsRead - } - T.idle += now.Sub(start) - } - - metrics.Workers[T.id] = rob.WorkerMetrics{ - LastActive: lastActive, - - Idle: T.idle, - Active: dur - T.idle, - } - - T.lastMetricsRead = now - T.idle = 0 - - for _, pending := range T.pending { - for i := 0; i < pending.Length(); i++ { - j, _ := pending.Get(i) - metrics.Jobs[j.ID] = rob.JobMetrics{ - Created: j.Created, - } - } - } - - for k, v, ok := T.scheduled.Min(); ok; k, v, ok = T.scheduled.Next(k) { - metrics.Jobs[v.ID] = rob.JobMetrics{ - Created: v.Created, - } - } -} diff --git a/lib/rob/schedulers/v1/scheduler.go b/lib/rob/schedulers/v1/scheduler.go deleted file mode 100644 index ed32e815d96d7ef4c309329d08906fdaed078dac..0000000000000000000000000000000000000000 --- a/lib/rob/schedulers/v1/scheduler.go +++ /dev/null @@ -1,56 +0,0 @@ -package schedulers - -import ( - "time" - - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v1/pool" - "pggat2/lib/rob/schedulers/v1/source" -) - -type Scheduler struct { - pool pool.Pool -} - -func MakeScheduler() Scheduler { - return Scheduler{ - pool: pool.MakePool(), - } -} - -func NewScheduler() *Scheduler { - s := MakeScheduler() - return &s -} - -func (T *Scheduler) AddWorker(constraints rob.Constraints, worker rob.Worker) uuid.UUID { - return T.pool.AddWorker(constraints, worker) -} - -func (T *Scheduler) GetWorker(id uuid.UUID) rob.Worker { - return T.pool.GetWorker(id) -} - -func (T *Scheduler) GetIdleWorker() (uuid.UUID, time.Time) { - return T.pool.GetIdleWorker() -} - -func (T *Scheduler) RemoveWorker(id uuid.UUID) rob.Worker { - return T.pool.RemoveWorker(id) -} - -func (T *Scheduler) WorkerCount() int { - return T.pool.WorkerCount() -} - -func (T *Scheduler) NewSource() rob.Worker { - return source.NewSource(&T.pool) -} - -func (T *Scheduler) ReadMetrics(metrics *rob.Metrics) { - T.pool.ReadMetrics(metrics) -} - -var _ rob.Scheduler = (*Scheduler)(nil) diff --git a/lib/rob/schedulers/v1/source/source.go b/lib/rob/schedulers/v1/source/source.go deleted file mode 100644 index ae9e4b44161a0bf16c3cd9779d2543adecf2995f..0000000000000000000000000000000000000000 --- a/lib/rob/schedulers/v1/source/source.go +++ /dev/null @@ -1,64 +0,0 @@ -package source - -import ( - "time" - - "github.com/google/uuid" - - "pggat2/lib/rob" - "pggat2/lib/rob/schedulers/v1/pool" - "pggat2/lib/rob/schedulers/v1/pool/job" - "pggat2/lib/util/chans" - "pggat2/lib/util/pools" -) - -type Source struct { - id uuid.UUID - pool *pool.Pool - - stall pools.Locked[chan uuid.UUID] -} - -func NewSource(p *pool.Pool) *Source { - return &Source{ - id: uuid.New(), - pool: p, - } -} - -func (T *Source) Do(ctx *rob.Context, work any) { - ctx.Reset() - - base := job.Base{ - Created: time.Now(), - ID: uuid.New(), - Source: T.id, - Context: ctx, - } - if T.pool.ExecuteConcurrent(job.Concurrent{ - Base: base, - Work: work, - }) { - return - } - - if ctx.OnWait != nil { - chans.TrySend(ctx.OnWait, struct{}{}) - } - - out, ok := T.stall.Get() - if !ok { - out = make(chan uuid.UUID, 1) - } - defer T.stall.Put(out) - - T.pool.ExecuteStalled(job.Stalled{ - Base: base, - Ready: out, - }) - worker := <-out - T.pool.Execute(worker, ctx, work) - return -} - -var _ rob.Worker = (*Source)(nil) diff --git a/lib/rob/schedulers/v2/job/concurrent.go b/lib/rob/schedulers/v2/job/concurrent.go new file mode 100644 index 0000000000000000000000000000000000000000..f868fe595dc83bfe42892ace1b53e026af6463c2 --- /dev/null +++ b/lib/rob/schedulers/v2/job/concurrent.go @@ -0,0 +1,9 @@ +package job + +import ( + "github.com/google/uuid" +) + +type Concurrent struct { + User uuid.UUID +} diff --git a/lib/rob/schedulers/v2/job/stalled.go b/lib/rob/schedulers/v2/job/stalled.go new file mode 100644 index 0000000000000000000000000000000000000000..202f974c7af219192d108c173922dc50368ba8dd --- /dev/null +++ b/lib/rob/schedulers/v2/job/stalled.go @@ -0,0 +1,10 @@ +package job + +import ( + "github.com/google/uuid" +) + +type Stalled struct { + Concurrent + Ready chan<- uuid.UUID +} diff --git a/lib/rob/schedulers/v2/scheduler.go b/lib/rob/schedulers/v2/scheduler.go new file mode 100644 index 0000000000000000000000000000000000000000..76ac15a97be452268a4188e94b9071640e391748 --- /dev/null +++ b/lib/rob/schedulers/v2/scheduler.go @@ -0,0 +1,218 @@ +package schedulers + +import ( + "sync" + + "github.com/google/uuid" + + "pggat2/lib/rob" + "pggat2/lib/rob/schedulers/v2/job" + "pggat2/lib/rob/schedulers/v2/sink" + "pggat2/lib/util/maps" + "pggat2/lib/util/pools" +) + +type Scheduler struct { + affinity maps.RWLocked[uuid.UUID, uuid.UUID] + + // resource pools + ready pools.Locked[chan uuid.UUID] + + // backlog is the list of user + backlog []job.Stalled + bmu sync.Mutex + sinks map[uuid.UUID]*sink.Sink + mu sync.RWMutex +} + +func (T *Scheduler) AddWorker(worker uuid.UUID) { + s := sink.NewSink(worker) + + if func() bool { + T.mu.Lock() + defer T.mu.Unlock() + // if mu is locked, we don't need to lock bmu, because we are the only accessor + if T.sinks == nil { + T.sinks = make(map[uuid.UUID]*sink.Sink) + } + T.sinks[worker] = s + + if len(T.backlog) == 0 { + return false + } + + for _, v := range T.backlog { + s.Enqueue(v) + } + T.backlog = T.backlog[:0] + return true + }() { + return + } + + T.mu.RLock() + defer T.mu.RUnlock() + T.stealFor(worker) +} + +func (T *Scheduler) RemoveWorker(worker uuid.UUID) { + var s *sink.Sink + var ok bool + func() { + T.mu.Lock() + defer T.mu.Unlock() + s, ok = T.sinks[worker] + delete(T.sinks, worker) + }() + if !ok { + return + } + + // now we need to reschedule all the work that was scheduled to s (stalled only). + jobs := s.StealAll() + + for _, j := range jobs { + T.Enqueue(j) + } +} + +func (*Scheduler) AddUser(_ uuid.UUID) { + // nothing to do, users are added lazily +} + +func (T *Scheduler) RemoveUser(user uuid.UUID) { + T.affinity.Delete(user) + + T.mu.RLock() + defer T.mu.RUnlock() + for _, v := range T.sinks { + v.RemoveUser(user) + } +} + +func (T *Scheduler) Acquire(j job.Concurrent) uuid.UUID { + affinity, _ := T.affinity.Load(j.User) + + // these can be unlocked and locked a bunch here because it is less bad if ExecuteConcurrent misses a sink + // (it will just stall the job and try again) + T.mu.RLock() + + // try affinity first + if v, ok := T.sinks[affinity]; ok { + T.mu.RUnlock() + if v.Acquire(j) { + return affinity + } + T.mu.RLock() + } + + for id, v := range T.sinks { + if id == affinity { + continue + } + T.mu.RUnlock() + if v.Acquire(j) { + // set affinity + T.affinity.Store(j.User, id) + return id + } + T.mu.RLock() + } + + T.mu.RUnlock() + return uuid.Nil +} + +func (T *Scheduler) AcquireConcurrent(user uuid.UUID) uuid.UUID { + return T.Acquire(job.Concurrent{ + User: user, + }) +} + +func (T *Scheduler) Enqueue(j job.Stalled) { + affinity, _ := T.affinity.Load(j.User) + + T.mu.RLock() + defer T.mu.RUnlock() + + // try affinity first + if v, ok := T.sinks[affinity]; ok { + v.Enqueue(j) + return + } + + for id, v := range T.sinks { + if id == affinity { + continue + } + + v.Enqueue(j) + T.affinity.Store(j.User, id) + return + } + + // add to backlog + T.bmu.Lock() + defer T.bmu.Unlock() + T.backlog = append(T.backlog, j) +} + +func (T *Scheduler) AcquireAsync(user uuid.UUID) uuid.UUID { + ready, ok := T.ready.Get() + if !ok { + ready = make(chan uuid.UUID, 1) + } + defer T.ready.Put(ready) + + j := job.Stalled{ + Concurrent: job.Concurrent{ + User: user, + }, + Ready: ready, + } + T.Enqueue(j) + return <-ready +} + +func (T *Scheduler) Release(worker uuid.UUID) { + T.mu.RLock() + defer T.mu.RUnlock() + + s, ok := T.sinks[worker] + if !ok { + return + } + hasMore := s.Release() + if !hasMore { + // try to steal + T.stealFor(worker) + } +} + +// stealFor will try to steal work for the specified worker. RLock Scheduler.mu before executing +func (T *Scheduler) stealFor(worker uuid.UUID) { + s, ok := T.sinks[worker] + if !ok { + return + } + + for _, v := range T.sinks { + if v == s { + continue + } + + if func() bool { + T.mu.RUnlock() + defer T.mu.RLock() + if src := v.StealFor(s); src != uuid.Nil { + T.affinity.Store(src, worker) + return true + } + return false + }() { + return + } + } +} + +var _ rob.Scheduler = (*Scheduler)(nil) diff --git a/lib/rob/schedulers/v1/scheduler_test.go b/lib/rob/schedulers/v2/scheduler_test.go similarity index 52% rename from lib/rob/schedulers/v1/scheduler_test.go rename to lib/rob/schedulers/v2/scheduler_test.go index 43f22dd66e48cea0f0ca47545036792bdc921817..f21e39d5790d84edec9c92404ea36eb57bd30173 100644 --- a/lib/rob/schedulers/v1/scheduler_test.go +++ b/lib/rob/schedulers/v2/scheduler_test.go @@ -3,7 +3,6 @@ package schedulers import ( "runtime" "sync" - "sync/atomic" "testing" "time" @@ -12,11 +11,6 @@ import ( "pggat2/lib/rob" ) -type Work struct { - Sender int - Duration time.Duration -} - type ShareTable struct { table map[int]int mu sync.RWMutex @@ -40,80 +34,56 @@ func (T *ShareTable) Get(user int) int { return v } -type TestSink struct { - table *ShareTable - constraints rob.Constraints - inuse atomic.Bool - remove atomic.Bool - removed atomic.Bool +func testSink(sched *Scheduler) uuid.UUID { + id := uuid.New() + sched.AddWorker(id) + return id } -func (T *TestSink) Do(ctx *rob.Context, work any) { - if T.inuse.Swap(true) { - panic("Sink was already inuse") - } - defer T.inuse.Store(false) - if !T.constraints.Satisfies(ctx.Constraints) { - panic("Scheduler did not obey constraints") - } - v := work.(Work) - start := time.Now() - for time.Since(start) < v.Duration { - } - T.table.Inc(v.Sender) - if T.remove.Load() { - removed := T.removed.Swap(true) - if removed { - panic("Scheduler did not remove when requested") +func testSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duration) { + source := uuid.New() + sched.AddUser(source) + for { + sink := rob.Acquire(sched, source) + start := time.Now() + for time.Since(start) < dur { } - ctx.Remove() + tab.Inc(id) + sched.Release(sink) } } -var _ rob.Worker = (*TestSink)(nil) - -func testSink(sched *Scheduler, table *ShareTable, constraints rob.Constraints) uuid.UUID { - return sched.AddWorker(constraints, &TestSink{ - table: table, - constraints: constraints, - }) -} - -func testSinkRemoveAfter(sched *Scheduler, table *ShareTable, constraints rob.Constraints, removeAfter time.Duration) uuid.UUID { - sink := &TestSink{ - table: table, - constraints: constraints, - } - go func() { - time.Sleep(removeAfter) - sink.remove.Store(true) - }() - return sched.AddWorker(constraints, sink) -} - -func testSource(sched *Scheduler, id int, dur time.Duration, constraints rob.Constraints) { - source := sched.NewSource() - for { - w := Work{ - Sender: id, - Duration: dur, - } - source.Do(&rob.Context{ - Constraints: constraints, - }, w) +func testMultiSource(sched *Scheduler, tab *ShareTable, id int, dur time.Duration, num int) { + source := uuid.New() + sched.AddUser(source) + for i := 0; i < num; i++ { + go func() { + for { + sink := rob.Acquire(sched, source) + start := time.Now() + for time.Since(start) < dur { + } + tab.Inc(id) + sched.Release(sink) + } + }() } } -func testStarver(sched *Scheduler, id int, dur time.Duration, constraints rob.Constraints) { +func testStarver(sched *Scheduler, tab *ShareTable, id int, dur time.Duration) { for { - source := sched.NewSource() - w := Work{ - Sender: id, - Duration: dur, - } - source.Do(&rob.Context{ - Constraints: constraints, - }, w) + func() { + source := uuid.New() + sched.AddUser(source) + defer sched.RemoveUser(source) + + sink := rob.Acquire(sched, source) + defer sched.Release(sink) + start := time.Now() + for time.Since(start) < dur { + } + tab.Inc(id) + }() } } @@ -159,13 +129,13 @@ func allStacks() []byte { func TestScheduler(t *testing.T) { var table ShareTable - sched := NewScheduler() - testSink(sched, &table, 0) + sched := new(Scheduler) + testSink(sched) - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 50*time.Millisecond, 0) - go testSource(sched, 3, 100*time.Millisecond, 0) + go testSource(sched, &table, 0, 10*time.Millisecond) + go testSource(sched, &table, 1, 10*time.Millisecond) + go testSource(sched, &table, 2, 50*time.Millisecond) + go testSource(sched, &table, 3, 100*time.Millisecond) time.Sleep(20 * time.Second) t0 := table.Get(0) @@ -199,16 +169,16 @@ func TestScheduler(t *testing.T) { func TestScheduler_Late(t *testing.T) { var table ShareTable - sched := NewScheduler() - testSink(sched, &table, 0) + sched := new(Scheduler) + testSink(sched) - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) + go testSource(sched, &table, 0, 10*time.Millisecond) + go testSource(sched, &table, 1, 10*time.Millisecond) time.Sleep(10 * time.Second) - go testSource(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 3, 10*time.Millisecond, 0) + go testSource(sched, &table, 2, 10*time.Millisecond) + go testSource(sched, &table, 3, 10*time.Millisecond) time.Sleep(10 * time.Second) t0 := table.Get(0) @@ -243,14 +213,14 @@ func TestScheduler_Late(t *testing.T) { func TestScheduler_StealBalanced(t *testing.T) { var table ShareTable - sched := NewScheduler() - testSink(sched, &table, 0) - testSink(sched, &table, 0) + sched := new(Scheduler) + testSink(sched) + testSink(sched) - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 3, 10*time.Millisecond, 0) + go testSource(sched, &table, 0, 10*time.Millisecond) + go testSource(sched, &table, 1, 10*time.Millisecond) + go testSource(sched, &table, 2, 10*time.Millisecond) + go testSource(sched, &table, 3, 10*time.Millisecond) time.Sleep(20 * time.Second) t0 := table.Get(0) @@ -270,6 +240,7 @@ func TestScheduler_StealBalanced(t *testing.T) { if !similar(t0, t1, t2, t3) { t.Error("expected all shares to be similar") + t.Errorf("%s", allStacks()) } if t0 == 0 { @@ -280,13 +251,13 @@ func TestScheduler_StealBalanced(t *testing.T) { func TestScheduler_StealUnbalanced(t *testing.T) { var table ShareTable - sched := NewScheduler() - testSink(sched, &table, 0) - testSink(sched, &table, 0) + sched := new(Scheduler) + testSink(sched) + testSink(sched) - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 10*time.Millisecond, 0) + go testSource(sched, &table, 0, 10*time.Millisecond) + go testSource(sched, &table, 1, 10*time.Millisecond) + go testSource(sched, &table, 2, 10*time.Millisecond) time.Sleep(20 * time.Second) t0 := table.Get(0) @@ -302,63 +273,26 @@ func TestScheduler_StealUnbalanced(t *testing.T) { t.Log("share of 1:", t1) t.Log("share of 2:", t2) + if !similar(t0, t1, t2) { + t.Error("expected all shares to be similar") + t.Errorf("%s", allStacks()) + } + if t0 == 0 || t1 == 0 || t2 == 0 { t.Error("expected executions on all sources (is there a race in the balancer??)") t.Errorf("%s", allStacks()) } } -func TestScheduler_Constraints(t *testing.T) { - const ( - ConstraintA rob.Constraints = 1 << iota - ConstraintB - ) - - var table ShareTable - sched := NewScheduler() - - testSink(sched, &table, rob.Constraints.All(ConstraintA, ConstraintB)) - testSink(sched, &table, ConstraintA) - testSink(sched, &table, ConstraintB) - - go testSource(sched, 0, 10*time.Millisecond, rob.Constraints.All(ConstraintA, ConstraintB)) - go testSource(sched, 1, 10*time.Millisecond, rob.Constraints.All(ConstraintA, ConstraintB)) - go testSource(sched, 2, 10*time.Millisecond, ConstraintA) - go testSource(sched, 3, 10*time.Millisecond, ConstraintA) - go testSource(sched, 4, 10*time.Millisecond, ConstraintB) - go testSource(sched, 5, 10*time.Millisecond, ConstraintB) - - time.Sleep(20 * time.Second) - t0 := table.Get(0) - t1 := table.Get(1) - t2 := table.Get(2) - t3 := table.Get(3) - t4 := table.Get(4) - t5 := table.Get(5) - - /* - Expectations: - - all users should get similar # of executions (shares of 0 and 1 may be less because they have less sinks they can use: 1 vs 2) - - all constraints should be honored - */ - - t.Log("share of 0:", t0) - t.Log("share of 1:", t1) - t.Log("share of 2:", t2) - t.Log("share of 3:", t3) - t.Log("share of 4:", t4) - t.Log("share of 5:", t5) -} - func TestScheduler_IdleWake(t *testing.T) { var table ShareTable - sched := NewScheduler() + sched := new(Scheduler) - testSink(sched, &table, 0) + testSink(sched) time.Sleep(10 * time.Second) - go testSource(sched, 0, 10*time.Millisecond, 0) + go testSource(sched, &table, 0, 10*time.Millisecond) time.Sleep(10 * time.Second) t0 := table.Get(0) @@ -377,13 +311,13 @@ func TestScheduler_IdleWake(t *testing.T) { func TestScheduler_LateSink(t *testing.T) { var table ShareTable - sched := NewScheduler() + sched := new(Scheduler) - go testSource(sched, 0, 10*time.Millisecond, 0) + go testSource(sched, &table, 0, 10*time.Millisecond) time.Sleep(10 * time.Second) - testSink(sched, &table, 0) + testSink(sched) time.Sleep(10 * time.Second) t0 := table.Get(0) @@ -402,13 +336,13 @@ func TestScheduler_LateSink(t *testing.T) { func TestScheduler_Starve(t *testing.T) { var table ShareTable - sched := NewScheduler() + sched := new(Scheduler) - testSink(sched, &table, 0) + testSink(sched) - go testStarver(sched, 1, 10*time.Millisecond, 0) - go testStarver(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 0, 10*time.Millisecond, 0) + go testStarver(sched, &table, 1, 10*time.Millisecond) + go testStarver(sched, &table, 2, 10*time.Millisecond) + go testSource(sched, &table, 0, 10*time.Millisecond) time.Sleep(20 * time.Second) t0 := table.Get(0) @@ -431,14 +365,14 @@ func TestScheduler_Starve(t *testing.T) { func TestScheduler_RemoveSinkOuter(t *testing.T) { var table ShareTable - sched := NewScheduler() - testSink(sched, &table, 0) - toRemove := testSink(sched, &table, 0) + sched := new(Scheduler) + testSink(sched) + toRemove := testSink(sched) - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 3, 10*time.Millisecond, 0) + go testSource(sched, &table, 0, 10*time.Millisecond) + go testSource(sched, &table, 1, 10*time.Millisecond) + go testSource(sched, &table, 2, 10*time.Millisecond) + go testSource(sched, &table, 3, 10*time.Millisecond) time.Sleep(10 * time.Second) @@ -471,23 +405,20 @@ func TestScheduler_RemoveSinkOuter(t *testing.T) { } } -func TestScheduler_RemoveSinkInner(t *testing.T) { +func TestScheduler_MultiJob(t *testing.T) { var table ShareTable - sched := NewScheduler() - testSink(sched, &table, 0) - testSinkRemoveAfter(sched, &table, 0, 10*time.Second) + sched := new(Scheduler) + testSink(sched) + testSink(sched) - go testSource(sched, 0, 10*time.Millisecond, 0) - go testSource(sched, 1, 10*time.Millisecond, 0) - go testSource(sched, 2, 10*time.Millisecond, 0) - go testSource(sched, 3, 10*time.Millisecond, 0) + go testMultiSource(sched, &table, 0, 10*time.Millisecond, 2) + go testMultiSource(sched, &table, 1, 10*time.Millisecond, 3) + go testMultiSource(sched, &table, 2, 10*time.Millisecond, 4) time.Sleep(20 * time.Second) - t0 := table.Get(0) t1 := table.Get(1) t2 := table.Get(2) - t3 := table.Get(3) /* Expectations: @@ -497,13 +428,8 @@ func TestScheduler_RemoveSinkInner(t *testing.T) { t.Log("share of 0:", t0) t.Log("share of 1:", t1) t.Log("share of 2:", t2) - t.Log("share of 3:", t3) - if !similar(t0, t1, t2, t3) { - t.Error("expected all shares to be similar") - } - - if t0 == 0 { + if t0 == 0 || t1 == 0 || t2 == 0 { t.Error("expected executions on all sources (is there a race in the balancer??)") t.Errorf("%s", allStacks()) } diff --git a/lib/rob/schedulers/v2/sink/sink.go b/lib/rob/schedulers/v2/sink/sink.go new file mode 100644 index 0000000000000000000000000000000000000000..8bfd2c032726303db494ff3e6b60b6f2ad2373aa --- /dev/null +++ b/lib/rob/schedulers/v2/sink/sink.go @@ -0,0 +1,247 @@ +package sink + +import ( + "sync" + "time" + + "github.com/google/uuid" + + "pggat2/lib/rob/schedulers/v2/job" + "pggat2/lib/util/rbtree" + "pggat2/lib/util/ring" +) + +type Sink struct { + id uuid.UUID + + // non final + active uuid.UUID + start time.Time + + floor time.Duration + stride map[uuid.UUID]time.Duration + pending map[uuid.UUID]*ring.Ring[job.Stalled] + scheduled rbtree.RBTree[time.Duration, job.Stalled] + + mu sync.Mutex +} + +func NewSink(id uuid.UUID) *Sink { + return &Sink{ + id: id, + } +} + +func (T *Sink) schedule(j job.Stalled) bool { + if T.active == j.User { + return false + } + + stride, ok := T.stride[j.User] + if !ok { + // set to max + stride = T.floor + if s, _, ok := T.scheduled.Max(); ok { + stride = s + 1 + } + if T.stride == nil { + T.stride = make(map[uuid.UUID]time.Duration) + } + T.stride[j.User] = stride + } else if stride < T.floor { + stride = T.floor + if T.stride == nil { + T.stride = make(map[uuid.UUID]time.Duration) + } + T.stride[j.User] = stride + } + + for { + // find unique stride to schedule on + if s, ok := T.scheduled.Get(stride); ok { + if s.User == j.User { + return false + } + stride += 1 + continue + } + + T.scheduled.Set(stride, j) + return true + } +} + +func (T *Sink) enqueue(j job.Stalled) { + if T.schedule(j) { + return + } + + p, ok := T.pending[j.User] + + // add to pending queue + if !ok { + p = ring.NewRing[job.Stalled](0, 1) + if T.pending == nil { + T.pending = make(map[uuid.UUID]*ring.Ring[job.Stalled]) + } + T.pending[j.User] = p + } + + p.PushBack(j) +} + +func (T *Sink) Enqueue(j job.Stalled) { + // enqueue job + T.mu.Lock() + defer T.mu.Unlock() + + if T.active == uuid.Nil { + // run it now + T.acquire(j.User) + j.Ready <- T.id + return + } + + // enqueue for later + T.enqueue(j) +} + +func (T *Sink) acquire(user uuid.UUID) { + if T.active != uuid.Nil { + panic("acquire called when already in use") + } + T.active = user + T.start = time.Now() +} + +func (T *Sink) Acquire(j job.Concurrent) bool { + T.mu.Lock() + defer T.mu.Unlock() + + if T.active != uuid.Nil { + // already active + return false + } + + T.acquire(j.User) + + return true +} + +func (T *Sink) enqueueNext(user uuid.UUID) { + pending, ok := T.pending[user] + if !ok { + return + } + j, ok := pending.PopFront() + if !ok { + return + } + if ok = T.schedule(j); !ok { + pending.PushFront(j) + return + } +} + +func (T *Sink) next() bool { + now := time.Now() + if T.active != uuid.Nil { + user := T.active + dur := now.Sub(T.start) + T.active = uuid.Nil + T.start = now + + if T.stride == nil { + T.stride = make(map[uuid.UUID]time.Duration) + } + T.stride[user] += dur + + T.enqueueNext(user) + } + + stride, j, ok := T.scheduled.Min() + if !ok { + return false + } + T.scheduled.Delete(stride) + if stride > T.floor { + T.floor = stride + } + + T.acquire(j.User) + j.Ready <- T.id + return true +} + +func (T *Sink) Release() (hasMore bool) { + T.mu.Lock() + defer T.mu.Unlock() + + return T.next() +} + +func (T *Sink) StealAll() []job.Stalled { + var all []job.Stalled + + T.mu.Lock() + defer T.mu.Unlock() + + for { + if k, j, ok := T.scheduled.Min(); ok { + T.scheduled.Delete(k) + all = append(all, j) + } else { + break + } + } + + for _, value := range T.pending { + for { + if j, ok := value.PopFront(); ok { + all = append(all, j) + } else { + break + } + } + } + + return all +} + +func (T *Sink) StealFor(rhs *Sink) uuid.UUID { + if T == rhs { + return uuid.Nil + } + + T.mu.Lock() + + stride, j, ok := T.scheduled.Min() + if !ok { + T.mu.Unlock() + return uuid.Nil + } + T.scheduled.Delete(stride) + + user := j.User + + pending, _ := T.pending[user] + delete(T.pending, user) + + T.mu.Unlock() + + rhs.Enqueue(j) + + for j, ok = pending.PopFront(); ok; j, ok = pending.PopFront() { + rhs.Enqueue(j) + } + + return user +} + +func (T *Sink) RemoveUser(user uuid.UUID) { + T.mu.Lock() + defer T.mu.Unlock() + + delete(T.pending, user) + delete(T.stride, user) +} diff --git a/lib/rob/worker.go b/lib/rob/worker.go deleted file mode 100644 index 2782397518db4a4f03b36d22e006a68845ca7ee3..0000000000000000000000000000000000000000 --- a/lib/rob/worker.go +++ /dev/null @@ -1,5 +0,0 @@ -package rob - -type Worker interface { - Do(ctx *Context, work any) -} diff --git a/pgbouncer.ini b/pgbouncer.ini index 5e2ff5aa099ad513dbf8c73daae4856b246d337c..6c175d6f9eb355246d436605ac5020519668c5f8 100644 --- a/pgbouncer.ini +++ b/pgbouncer.ini @@ -1,5 +1,5 @@ [pgbouncer] -pool_mode = session +pool_mode = transaction auth_file = userlist.txt listen_addr = * track_extra_parameters = IntervalStyle, session_authorization, default_transaction_read_only, search_path