From 557f7e52f6bd6d62c7109722ae187e3fa32748d7 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Tue, 5 Sep 2023 16:31:35 -0500 Subject: [PATCH] fix --- lib/gat/modes/pgbouncer/config.go | 12 ++++++++++++ lib/gat/modes/pgbouncer/pools.go | 17 +++++++---------- lib/gat/pool/metrics.go | 21 ++++++++++++++++++++- lib/gat/pool/pool.go | 5 ++++- lib/gat/pool/pools/session/pooler.go | 4 ++-- lib/gat/pools.go | 9 +++++++++ pgbouncer.ini | 2 +- 7 files changed, 55 insertions(+), 15 deletions(-) diff --git a/lib/gat/modes/pgbouncer/config.go b/lib/gat/modes/pgbouncer/config.go index 94f201f1..e97f8faf 100644 --- a/lib/gat/modes/pgbouncer/config.go +++ b/lib/gat/modes/pgbouncer/config.go @@ -5,12 +5,14 @@ import ( "net" "strconv" "strings" + "time" "tuxpa.in/a/zlog/log" "pggat2/lib/bouncer" "pggat2/lib/bouncer/frontends/v0" "pggat2/lib/gat" + "pggat2/lib/gat/pool" "pggat2/lib/util/encoding/ini" "pggat2/lib/util/flip" "pggat2/lib/util/strutil" @@ -285,6 +287,16 @@ func (T *Config) ListenAndServe() error { return err } + go func() { + var metrics pool.Metrics + for { + metrics.Clear() + time.Sleep(1 * time.Second) + pools.ReadMetrics(&metrics) + log.Print(metrics.String()) + } + }() + var bank flip.Bank if T.PgBouncer.ListenAddr != "" { diff --git a/lib/gat/modes/pgbouncer/pools.go b/lib/gat/modes/pgbouncer/pools.go index 30320211..5ecc862b 100644 --- a/lib/gat/modes/pgbouncer/pools.go +++ b/lib/gat/modes/pgbouncer/pools.go @@ -168,16 +168,6 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { return nil } - go func() { - var metrics pool.Metrics - for { - time.Sleep(1 * time.Second) - metrics.Clear() - p.ReadMetrics(&metrics) - log.Println(metrics.String()) - } - }() - T.pools.Store(poolKey{ User: user, Database: database, @@ -253,6 +243,13 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { return p } +func (T *Pools) ReadMetrics(metrics *pool.Metrics) { + T.pools.Range(func(_ poolKey, p *pool.Pool) bool { + p.ReadMetrics(metrics) + return true + }) +} + func (T *Pools) RegisterKey(key [8]byte, user, database string) { p := T.Lookup(user, database) if p == nil { diff --git a/lib/gat/pool/metrics.go b/lib/gat/pool/metrics.go index 2a6ad16c..c957e50a 100644 --- a/lib/gat/pool/metrics.go +++ b/lib/gat/pool/metrics.go @@ -16,6 +16,24 @@ type Metrics struct { Clients map[uuid.UUID]ItemMetrics } +func (T *Metrics) TransactionCount() int { + var serverTransactions int + var clientTransactions int + + for _, server := range T.Servers { + serverTransactions += server.Transactions + } + + for _, client := range T.Clients { + clientTransactions += client.Transactions + } + + if clientTransactions > serverTransactions { + return clientTransactions + } + return serverTransactions +} + func (T *Metrics) ServerStateCount() (active, idle, stalling int) { for _, server := range T.Servers { switch server.Peer { @@ -90,7 +108,8 @@ func (T *Metrics) String() string { serverActiveUtil, serverIdleUtil, serverStallingUtil := T.ServerStateUtil() clientActive, clientIdle, clientStalling := T.ClientStateCount() clientActiveUtil, clientIdleUtil, clientStallingUtil := T.ClientStateUtil() - return fmt.Sprintf("%d servers (%d (%.2f%%) active, %d (%.2f%%) idle, %d (%.2f%%) stalling) | %d clients (%d (%.2f%%) active, %d (%.2f%%) idle, %d (%.2f%%) stalling)", + return fmt.Sprintf("%d transactions | %d servers (%d (%.2f%%) active, %d (%.2f%%) idle, %d (%.2f%%) stalling) | %d clients (%d (%.2f%%) active, %d (%.2f%%) idle, %d (%.2f%%) stalling)", + T.TransactionCount(), len(T.Servers), serverActive, serverActiveUtil*100, diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index cc1648b1..85ccfda4 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -263,7 +263,7 @@ func (T *Pool) RemoveRecipe(name string) { } func (T *Pool) ScaleUp() { - T.recipes.Range(func(_ string, r *poolRecipe) bool { + failed := T.recipes.Range(func(_ string, r *poolRecipe) bool { // this can race, but it will just dial an extra server and disconnect it in worst case if r.recipe.MaxConnections == 0 || r.Size() < r.recipe.MaxConnections { T.scaleUpRecipe(r) @@ -272,6 +272,9 @@ func (T *Pool) ScaleUp() { return true }) + if failed { + log.Println("No available recipe found to scale up") + } } func syncInitialParameters( diff --git a/lib/gat/pool/pools/session/pooler.go b/lib/gat/pool/pools/session/pooler.go index fdf33215..22539867 100644 --- a/lib/gat/pool/pools/session/pooler.go +++ b/lib/gat/pool/pools/session/pooler.go @@ -85,9 +85,9 @@ func (T *Pooler) AcquireBlocking() uuid.UUID { func (T *Pooler) Acquire(_ uuid.UUID, mode pool.SyncMode) uuid.UUID { switch mode { - case pool.SyncModeBlocking: - return T.TryAcquire() case pool.SyncModeNonBlocking: + return T.TryAcquire() + case pool.SyncModeBlocking: return T.AcquireBlocking() default: return uuid.Nil diff --git a/lib/gat/pools.go b/lib/gat/pools.go index 6cb9e834..58658d2a 100644 --- a/lib/gat/pools.go +++ b/lib/gat/pools.go @@ -8,6 +8,8 @@ import ( type Pools interface { Lookup(user, database string) *pool.Pool + ReadMetrics(metrics *pool.Metrics) + // Key based lookup functions (for cancellation) RegisterKey(key [8]byte, user, database string) @@ -48,6 +50,13 @@ func (T *PoolsMap) Lookup(user, database string) *pool.Pool { return p } +func (T *PoolsMap) ReadMetrics(metrics *pool.Metrics) { + T.pools.Range(func(_ mapKey, p *pool.Pool) bool { + p.ReadMetrics(metrics) + return true + }) +} + // key based lookup funcs func (T *PoolsMap) RegisterKey(key [8]byte, user, database string) { diff --git a/pgbouncer.ini b/pgbouncer.ini index d458e49a..655239da 100644 --- a/pgbouncer.ini +++ b/pgbouncer.ini @@ -1,5 +1,5 @@ [pgbouncer] -pool_mode = transaction +pool_mode = session auth_file = userlist.txt listen_addr = * track_extra_parameters = IntervalStyle, session_authorization, default_transaction_read_only, search_path -- GitLab