From ec22485e1b4a7037f17776cc9b2f5ab09b0e3302 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Mon, 31 Jul 2023 11:44:56 -0600 Subject: [PATCH] track last in use for scaling down --- cmd/cgat/main.go | 20 +++++-- lib/gat/pools/session/metrics.go | 26 ++++++++++ lib/gat/pools/session/pool.go | 69 ++++++++++++++++++------- lib/rob/metrics.go | 2 + lib/rob/schedulers/v1/pool/sink/sink.go | 23 +++++++-- 5 files changed, 111 insertions(+), 29 deletions(-) diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index 9e9b3bb8..8f44a7e0 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -7,8 +7,7 @@ import ( "time" "pggat2/lib/gat" - "pggat2/lib/gat/pools/transaction" - "pggat2/lib/rob" + "pggat2/lib/gat/pools/session" ) func main() { @@ -25,7 +24,7 @@ func main() { pooler.AddUser("postgres", postgres) // create pool - rawPool := transaction.NewPool() + rawPool := session.NewPool() pool := gat.NewPool(rawPool) postgres.AddPool("uniswap", pool) pool.AddRecipe("localhost", gat.TCPRecipe{ @@ -37,12 +36,23 @@ func main() { MaxConnections: 5, }) + /* + go func() { + var metrics rob.Metrics + + for { + time.Sleep(1 * time.Second) + rawPool.ReadSchedulerMetrics(&metrics) + log.Println(metrics.String()) + } + }() + */ go func() { - var metrics rob.Metrics + var metrics session.Metrics for { time.Sleep(1 * time.Second) - rawPool.ReadSchedulerMetrics(&metrics) + rawPool.ReadMetrics(&metrics) log.Println(metrics.String()) } }() diff --git a/lib/gat/pools/session/metrics.go b/lib/gat/pools/session/metrics.go index ee63fe94..3058166f 100644 --- a/lib/gat/pools/session/metrics.go +++ b/lib/gat/pools/session/metrics.go @@ -1,4 +1,30 @@ package session +import ( + "fmt" + "time" + + "github.com/google/uuid" +) + +type WorkerMetrics struct { + LastActive time.Time +} + type Metrics struct { + Workers map[uuid.UUID]WorkerMetrics +} + +func (T *Metrics) InUse() int { + var used int + for _, worker := range T.Workers { + if worker.LastActive == (time.Time{}) { + used++ + } + } + return used +} + +func (T *Metrics) String() string { + return fmt.Sprintf("%d in use / %d total", T.InUse(), len(T.Workers)) } diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go index 8e7a5af9..02ddf027 100644 --- a/lib/gat/pools/session/pool.go +++ b/lib/gat/pools/session/pool.go @@ -2,57 +2,69 @@ package session import ( "sync" + "time" "github.com/google/uuid" "pggat2/lib/bouncer/bouncers/v2" "pggat2/lib/gat" "pggat2/lib/util/chans" + "pggat2/lib/util/maps" "pggat2/lib/zap" ) +type queueItem struct { + added time.Time + id uuid.UUID +} + type Pool struct { // use slice lifo for better perf - queue []uuid.UUID + queue []queueItem conns map[uuid.UUID]zap.ReadWriter - qmu sync.RWMutex - - signal chan struct{} + ready sync.Cond + qmu sync.Mutex } func NewPool() *Pool { - return &Pool{ - signal: make(chan struct{}), - } + p := &Pool{} + p.ready.L = &p.qmu + return p } func (T *Pool) acquire(ctx *gat.Context) (uuid.UUID, zap.ReadWriter) { + T.qmu.Lock() + defer T.qmu.Unlock() for { - T.qmu.Lock() if len(T.queue) > 0 { - id := T.queue[len(T.queue)-1] + item := T.queue[len(T.queue)-1] T.queue = T.queue[:len(T.queue)-1] - conn, ok := T.conns[id] - T.qmu.Unlock() + conn, ok := T.conns[item.id] if !ok { continue } - return id, conn + return item.id, conn } - T.qmu.Unlock() if ctx.OnWait != nil { chans.TrySend(ctx.OnWait, struct{}{}) } - <-T.signal + T.ready.Wait() } } +func (T *Pool) _release(id uuid.UUID) { + T.queue = append(T.queue, queueItem{ + added: time.Now(), + id: id, + }) + + T.ready.Signal() +} + func (T *Pool) release(id uuid.UUID) { T.qmu.Lock() defer T.qmu.Unlock() - T.queue = append(T.queue, id) - - chans.TrySend(T.signal, struct{}{}) + T._release(id) } func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) { @@ -83,7 +95,7 @@ func (T *Pool) AddServer(server zap.ReadWriter) uuid.UUID { T.conns = make(map[uuid.UUID]zap.ReadWriter) } T.conns[id] = server - T.queue = append(T.queue, id) + T._release(id) return id } @@ -107,7 +119,26 @@ func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter { } func (T *Pool) ReadMetrics(metrics *Metrics) { - // TODO(garet) metrics + maps.Clear(metrics.Workers) + + if metrics.Workers == nil { + metrics.Workers = make(map[uuid.UUID]WorkerMetrics) + } + + T.qmu.Lock() + defer T.qmu.Unlock() + + for _, item := range T.queue { + metrics.Workers[item.id] = WorkerMetrics{ + LastActive: item.added, + } + } + + for id := range T.conns { + if _, ok := metrics.Workers[id]; !ok { + metrics.Workers[id] = WorkerMetrics{} + } + } } var _ gat.RawPool = (*Pool)(nil) diff --git a/lib/rob/metrics.go b/lib/rob/metrics.go index 9be8510b..b752c595 100644 --- a/lib/rob/metrics.go +++ b/lib/rob/metrics.go @@ -8,6 +8,8 @@ import ( ) type WorkerMetrics struct { + LastActive time.Time + Idle time.Duration Active time.Duration } diff --git a/lib/rob/schedulers/v1/pool/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go index 2895ebdc..d5d36866 100644 --- a/lib/rob/schedulers/v1/pool/sink/sink.go +++ b/lib/rob/schedulers/v1/pool/sink/sink.go @@ -59,7 +59,11 @@ func (T *Sink) setActive(source uuid.UUID) { panic("set active called when another was active") } now := time.Now() - T.idle += now.Sub(T.start) + start := T.start + if start.Before(T.lastMetricsRead) { + start = T.lastMetricsRead + } + T.idle += now.Sub(start) T.active = source T.start = now } @@ -285,14 +289,23 @@ func (T *Sink) ReadMetrics(metrics *rob.Metrics) { now := time.Now() - if T.active == uuid.Nil { - T.idle += now.Sub(T.start) - T.start = now - } + var lastActive time.Time dur := now.Sub(T.lastMetricsRead) + if T.active == uuid.Nil { + lastActive = T.start + + start := T.start + if start.Before(T.lastMetricsRead) { + start = T.lastMetricsRead + } + T.idle += now.Sub(start) + } + metrics.Workers[T.id] = rob.WorkerMetrics{ + LastActive: lastActive, + Idle: T.idle, Active: dur - T.idle, } -- GitLab