diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index 9e9b3bb8f5fbd3f146e29b71234c682f5ead6612..8f44a7e0444630ef697d911043f0d122230f1bfc 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -7,8 +7,7 @@ import ( "time" "pggat2/lib/gat" - "pggat2/lib/gat/pools/transaction" - "pggat2/lib/rob" + "pggat2/lib/gat/pools/session" ) func main() { @@ -25,7 +24,7 @@ func main() { pooler.AddUser("postgres", postgres) // create pool - rawPool := transaction.NewPool() + rawPool := session.NewPool() pool := gat.NewPool(rawPool) postgres.AddPool("uniswap", pool) pool.AddRecipe("localhost", gat.TCPRecipe{ @@ -37,12 +36,23 @@ func main() { MaxConnections: 5, }) + /* + go func() { + var metrics rob.Metrics + + for { + time.Sleep(1 * time.Second) + rawPool.ReadSchedulerMetrics(&metrics) + log.Println(metrics.String()) + } + }() + */ go func() { - var metrics rob.Metrics + var metrics session.Metrics for { time.Sleep(1 * time.Second) - rawPool.ReadSchedulerMetrics(&metrics) + rawPool.ReadMetrics(&metrics) log.Println(metrics.String()) } }() diff --git a/lib/gat/pools/session/metrics.go b/lib/gat/pools/session/metrics.go index ee63fe94433044f85e3822872ed9148ec2350860..3058166f2ab41e6205efc82f014f073f797a1e8b 100644 --- a/lib/gat/pools/session/metrics.go +++ b/lib/gat/pools/session/metrics.go @@ -1,4 +1,30 @@ package session +import ( + "fmt" + "time" + + "github.com/google/uuid" +) + +type WorkerMetrics struct { + LastActive time.Time +} + type Metrics struct { + Workers map[uuid.UUID]WorkerMetrics +} + +func (T *Metrics) InUse() int { + var used int + for _, worker := range T.Workers { + if worker.LastActive == (time.Time{}) { + used++ + } + } + return used +} + +func (T *Metrics) String() string { + return fmt.Sprintf("%d in use / %d total", T.InUse(), len(T.Workers)) } diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go index 8e7a5af9cdacc3a2c335bd42ce13598b628f7273..02ddf027356ff0c4169640d714c3677de93f7194 100644 --- a/lib/gat/pools/session/pool.go +++ b/lib/gat/pools/session/pool.go @@ -2,57 +2,69 @@ package session import ( "sync" + "time" "github.com/google/uuid" "pggat2/lib/bouncer/bouncers/v2" "pggat2/lib/gat" "pggat2/lib/util/chans" + "pggat2/lib/util/maps" "pggat2/lib/zap" ) +type queueItem struct { + added time.Time + id uuid.UUID +} + type Pool struct { // use slice lifo for better perf - queue []uuid.UUID + queue []queueItem conns map[uuid.UUID]zap.ReadWriter - qmu sync.RWMutex - - signal chan struct{} + ready sync.Cond + qmu sync.Mutex } func NewPool() *Pool { - return &Pool{ - signal: make(chan struct{}), - } + p := &Pool{} + p.ready.L = &p.qmu + return p } func (T *Pool) acquire(ctx *gat.Context) (uuid.UUID, zap.ReadWriter) { + T.qmu.Lock() + defer T.qmu.Unlock() for { - T.qmu.Lock() if len(T.queue) > 0 { - id := T.queue[len(T.queue)-1] + item := T.queue[len(T.queue)-1] T.queue = T.queue[:len(T.queue)-1] - conn, ok := T.conns[id] - T.qmu.Unlock() + conn, ok := T.conns[item.id] if !ok { continue } - return id, conn + return item.id, conn } - T.qmu.Unlock() if ctx.OnWait != nil { chans.TrySend(ctx.OnWait, struct{}{}) } - <-T.signal + T.ready.Wait() } } +func (T *Pool) _release(id uuid.UUID) { + T.queue = append(T.queue, queueItem{ + added: time.Now(), + id: id, + }) + + T.ready.Signal() +} + func (T *Pool) release(id uuid.UUID) { T.qmu.Lock() defer T.qmu.Unlock() - T.queue = append(T.queue, id) - - chans.TrySend(T.signal, struct{}{}) + T._release(id) } func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) { @@ -83,7 +95,7 @@ func (T *Pool) AddServer(server zap.ReadWriter) uuid.UUID { T.conns = make(map[uuid.UUID]zap.ReadWriter) } T.conns[id] = server - T.queue = append(T.queue, id) + T._release(id) return id } @@ -107,7 +119,26 @@ func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter { } func (T *Pool) ReadMetrics(metrics *Metrics) { - // TODO(garet) metrics + maps.Clear(metrics.Workers) + + if metrics.Workers == nil { + metrics.Workers = make(map[uuid.UUID]WorkerMetrics) + } + + T.qmu.Lock() + defer T.qmu.Unlock() + + for _, item := range T.queue { + metrics.Workers[item.id] = WorkerMetrics{ + LastActive: item.added, + } + } + + for id := range T.conns { + if _, ok := metrics.Workers[id]; !ok { + metrics.Workers[id] = WorkerMetrics{} + } + } } var _ gat.RawPool = (*Pool)(nil) diff --git a/lib/rob/metrics.go b/lib/rob/metrics.go index 9be8510b47027345ccf71581e69f2f421fe2078f..b752c595d756775e0d2e0d8a92e77db82a8fa8cc 100644 --- a/lib/rob/metrics.go +++ b/lib/rob/metrics.go @@ -8,6 +8,8 @@ import ( ) type WorkerMetrics struct { + LastActive time.Time + Idle time.Duration Active time.Duration } diff --git a/lib/rob/schedulers/v1/pool/sink/sink.go b/lib/rob/schedulers/v1/pool/sink/sink.go index 2895ebdc02df229d8435db989b21243e5548c5ff..d5d3686624a2790d379db58d58b4e91a7ed2be40 100644 --- a/lib/rob/schedulers/v1/pool/sink/sink.go +++ b/lib/rob/schedulers/v1/pool/sink/sink.go @@ -59,7 +59,11 @@ func (T *Sink) setActive(source uuid.UUID) { panic("set active called when another was active") } now := time.Now() - T.idle += now.Sub(T.start) + start := T.start + if start.Before(T.lastMetricsRead) { + start = T.lastMetricsRead + } + T.idle += now.Sub(start) T.active = source T.start = now } @@ -285,14 +289,23 @@ func (T *Sink) ReadMetrics(metrics *rob.Metrics) { now := time.Now() - if T.active == uuid.Nil { - T.idle += now.Sub(T.start) - T.start = now - } + var lastActive time.Time dur := now.Sub(T.lastMetricsRead) + if T.active == uuid.Nil { + lastActive = T.start + + start := T.start + if start.Before(T.lastMetricsRead) { + start = T.lastMetricsRead + } + T.idle += now.Sub(start) + } + metrics.Workers[T.id] = rob.WorkerMetrics{ + LastActive: lastActive, + Idle: T.idle, Active: dur - T.idle, }