From 62208f73f9be166a3fe85f2a51765ccfdb53a230 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Wed, 4 Oct 2023 17:53:12 -0500
Subject: [PATCH] hmmmm
---
lib/{bouncer => bounce}/backends/v0/accept.go | 3 +-
lib/{bouncer => bounce}/backends/v0/cancel.go | 0
.../backends/v0/context.go | 0
lib/{bouncer => bounce}/backends/v0/errors.go | 0
.../backends/v0/options.go | 3 +-
lib/{bouncer => bounce}/backends/v0/query.go | 0
.../bouncers/v2/bouncer.go | 0
.../frontends/v0/accept.go | 0
.../frontends/v0/authenticate.go | 0
.../frontends/v0/context.go | 0
.../frontends/v0/options.go | 0
.../frontends/v0/params.go | 0
lib/{bouncer => bounce}/sslmode.go | 2 +-
lib/gat/gatcaddyfile/handler.go | 9 +-
lib/gat/handlers/discovery/config.go | 3 +-
.../google_cloud_sql/discoverer.go | 7 +-
lib/gat/handlers/discovery/module.go | 29 +-
lib/gat/handlers/pgbouncer/config.go | 9 +-
lib/gat/handlers/pgbouncer/module.go | 11 +-
lib/gat/handlers/pgbouncer_spilo/module.go | 5 +-
lib/gat/handlers/pool/config.go | 4 +-
lib/gat/handlers/pool/module.go | 6 +-
lib/gat/pool/client.go | 55 --
lib/gat/pool/config.go | 61 ---
lib/gat/pool/conn.go | 120 -----
lib/gat/pool/errors.go | 5 -
lib/gat/pool/flow.go | 115 ----
lib/gat/pool/pool.go | 497 ------------------
lib/gat/pool/recipe.go | 5 -
lib/gat/pool/scaler.go | 115 ----
lib/gat/pool/server.go | 57 --
lib/gat/pool/withcredentials.go | 8 -
lib/pool/backend.go | 209 ++++++++
lib/pool/config.go | 12 +
lib/pool/errors.go | 7 +
lib/pool/frontend.go | 5 +
lib/{gat => }/pool/pooler.go | 0
lib/{gat => }/pool/recipe/config.go | 0
lib/{gat => }/pool/recipe/dialer.go | 4 +-
lib/{gat => }/pool/recipe/recipe.go | 0
test/config.go | 2 +-
test/runner.go | 2 +-
test/tester_test.go | 5 +-
43 files changed, 280 insertions(+), 1095 deletions(-)
rename lib/{bouncer => bounce}/backends/v0/accept.go (99%)
rename lib/{bouncer => bounce}/backends/v0/cancel.go (100%)
rename lib/{bouncer => bounce}/backends/v0/context.go (100%)
rename lib/{bouncer => bounce}/backends/v0/errors.go (100%)
rename lib/{bouncer => bounce}/backends/v0/options.go (81%)
rename lib/{bouncer => bounce}/backends/v0/query.go (100%)
rename lib/{bouncer => bounce}/bouncers/v2/bouncer.go (100%)
rename lib/{bouncer => bounce}/frontends/v0/accept.go (100%)
rename lib/{bouncer => bounce}/frontends/v0/authenticate.go (100%)
rename lib/{bouncer => bounce}/frontends/v0/context.go (100%)
rename lib/{bouncer => bounce}/frontends/v0/options.go (100%)
rename lib/{bouncer => bounce}/frontends/v0/params.go (100%)
rename lib/{bouncer => bounce}/sslmode.go (97%)
delete mode 100644 lib/gat/pool/client.go
delete mode 100644 lib/gat/pool/config.go
delete mode 100644 lib/gat/pool/conn.go
delete mode 100644 lib/gat/pool/errors.go
delete mode 100644 lib/gat/pool/flow.go
delete mode 100644 lib/gat/pool/pool.go
delete mode 100644 lib/gat/pool/recipe.go
delete mode 100644 lib/gat/pool/scaler.go
delete mode 100644 lib/gat/pool/server.go
delete mode 100644 lib/gat/pool/withcredentials.go
create mode 100644 lib/pool/backend.go
create mode 100644 lib/pool/config.go
create mode 100644 lib/pool/errors.go
create mode 100644 lib/pool/frontend.go
rename lib/{gat => }/pool/pooler.go (100%)
rename lib/{gat => }/pool/recipe/config.go (100%)
rename lib/{gat => }/pool/recipe/dialer.go (91%)
rename lib/{gat => }/pool/recipe/recipe.go (100%)
diff --git a/lib/bouncer/backends/v0/accept.go b/lib/bounce/backends/v0/accept.go
similarity index 99%
rename from lib/bouncer/backends/v0/accept.go
rename to lib/bounce/backends/v0/accept.go
index d094f2f1..7f29f985 100644
--- a/lib/bouncer/backends/v0/accept.go
+++ b/lib/bounce/backends/v0/accept.go
@@ -6,7 +6,6 @@ import (
"io"
"gfx.cafe/gfx/pggat/lib/auth"
- "gfx.cafe/gfx/pggat/lib/bouncer"
"gfx.cafe/gfx/pggat/lib/fed"
packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
"gfx.cafe/gfx/pggat/lib/util/strutil"
@@ -346,7 +345,7 @@ func accept(ctx *acceptContext) error {
func Accept(
conn *fed.Conn,
- sslMode bouncer.SSLMode,
+ sslMode bounce.SSLMode,
sslConfig *tls.Config,
username string,
credentials auth.Credentials,
diff --git a/lib/bouncer/backends/v0/cancel.go b/lib/bounce/backends/v0/cancel.go
similarity index 100%
rename from lib/bouncer/backends/v0/cancel.go
rename to lib/bounce/backends/v0/cancel.go
diff --git a/lib/bouncer/backends/v0/context.go b/lib/bounce/backends/v0/context.go
similarity index 100%
rename from lib/bouncer/backends/v0/context.go
rename to lib/bounce/backends/v0/context.go
diff --git a/lib/bouncer/backends/v0/errors.go b/lib/bounce/backends/v0/errors.go
similarity index 100%
rename from lib/bouncer/backends/v0/errors.go
rename to lib/bounce/backends/v0/errors.go
diff --git a/lib/bouncer/backends/v0/options.go b/lib/bounce/backends/v0/options.go
similarity index 81%
rename from lib/bouncer/backends/v0/options.go
rename to lib/bounce/backends/v0/options.go
index 9d98d466..f23feb09 100644
--- a/lib/bouncer/backends/v0/options.go
+++ b/lib/bounce/backends/v0/options.go
@@ -4,12 +4,11 @@ import (
"crypto/tls"
"gfx.cafe/gfx/pggat/lib/auth"
- "gfx.cafe/gfx/pggat/lib/bouncer"
"gfx.cafe/gfx/pggat/lib/util/strutil"
)
type acceptOptions struct {
- SSLMode bouncer.SSLMode
+ SSLMode bounce.SSLMode
SSLConfig *tls.Config
Username string
Credentials auth.Credentials
diff --git a/lib/bouncer/backends/v0/query.go b/lib/bounce/backends/v0/query.go
similarity index 100%
rename from lib/bouncer/backends/v0/query.go
rename to lib/bounce/backends/v0/query.go
diff --git a/lib/bouncer/bouncers/v2/bouncer.go b/lib/bounce/bouncers/v2/bouncer.go
similarity index 100%
rename from lib/bouncer/bouncers/v2/bouncer.go
rename to lib/bounce/bouncers/v2/bouncer.go
diff --git a/lib/bouncer/frontends/v0/accept.go b/lib/bounce/frontends/v0/accept.go
similarity index 100%
rename from lib/bouncer/frontends/v0/accept.go
rename to lib/bounce/frontends/v0/accept.go
diff --git a/lib/bouncer/frontends/v0/authenticate.go b/lib/bounce/frontends/v0/authenticate.go
similarity index 100%
rename from lib/bouncer/frontends/v0/authenticate.go
rename to lib/bounce/frontends/v0/authenticate.go
diff --git a/lib/bouncer/frontends/v0/context.go b/lib/bounce/frontends/v0/context.go
similarity index 100%
rename from lib/bouncer/frontends/v0/context.go
rename to lib/bounce/frontends/v0/context.go
diff --git a/lib/bouncer/frontends/v0/options.go b/lib/bounce/frontends/v0/options.go
similarity index 100%
rename from lib/bouncer/frontends/v0/options.go
rename to lib/bounce/frontends/v0/options.go
diff --git a/lib/bouncer/frontends/v0/params.go b/lib/bounce/frontends/v0/params.go
similarity index 100%
rename from lib/bouncer/frontends/v0/params.go
rename to lib/bounce/frontends/v0/params.go
diff --git a/lib/bouncer/sslmode.go b/lib/bounce/sslmode.go
similarity index 97%
rename from lib/bouncer/sslmode.go
rename to lib/bounce/sslmode.go
index cb8b63d5..335e52ab 100644
--- a/lib/bouncer/sslmode.go
+++ b/lib/bounce/sslmode.go
@@ -1,4 +1,4 @@
-package bouncer
+package bounce
type SSLMode string
diff --git a/lib/gat/gatcaddyfile/handler.go b/lib/gat/gatcaddyfile/handler.go
index 75847872..501c4173 100644
--- a/lib/gat/gatcaddyfile/handler.go
+++ b/lib/gat/gatcaddyfile/handler.go
@@ -13,7 +13,6 @@ import (
"github.com/caddyserver/caddy/v2/caddyconfig"
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
- "gfx.cafe/gfx/pggat/lib/bouncer"
"gfx.cafe/gfx/pggat/lib/gat/handlers/allowed_startup_parameters"
"gfx.cafe/gfx/pggat/lib/gat/handlers/discovery"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pgbouncer"
@@ -141,7 +140,7 @@ func init() {
"pooler",
warnings,
),
- ServerSSLMode: bouncer.SSLModePrefer,
+ ServerSSLMode: bounce.SSLModePrefer,
ServerSSL: JSONModuleObject(
&insecure_skip_verify.Client{},
SSLClient,
@@ -221,7 +220,7 @@ func init() {
return nil, d.ArgErr()
}
- module.ServerSSLMode = bouncer.SSLMode(d.Val())
+ module.ServerSSLMode = bounce.SSLMode(d.Val())
if !d.NextArg() {
return nil, d.ArgErr()
@@ -390,7 +389,7 @@ func init() {
warnings,
),
- ServerSSLMode: bouncer.SSLModePrefer,
+ ServerSSLMode: bounce.SSLModePrefer,
ServerSSL: JSONModuleObject(
&insecure_skip_verify.Client{},
SSLClient,
@@ -455,7 +454,7 @@ func init() {
return nil, d.ArgErr()
}
- module.ServerSSLMode = bouncer.SSLMode(d.Val())
+ module.ServerSSLMode = bounce.SSLMode(d.Val())
if !d.NextArg() {
return nil, d.ArgErr()
diff --git a/lib/gat/handlers/discovery/config.go b/lib/gat/handlers/discovery/config.go
index b2712561..bb4024b6 100644
--- a/lib/gat/handlers/discovery/config.go
+++ b/lib/gat/handlers/discovery/config.go
@@ -3,7 +3,6 @@ package discovery
import (
"encoding/json"
- "gfx.cafe/gfx/pggat/lib/bouncer"
"gfx.cafe/gfx/pggat/lib/util/dur"
)
@@ -15,7 +14,7 @@ type Config struct {
Pooler json.RawMessage `json:"pooler" caddy:"namespace=pggat.poolers inline_key=pooler"`
- ServerSSLMode bouncer.SSLMode `json:"server_ssl_mode,omitempty"`
+ ServerSSLMode bounce.SSLMode `json:"server_ssl_mode,omitempty"`
ServerSSL json.RawMessage `json:"server_ssl,omitempty" caddy:"namespace=pggat.ssl.clients inline_key=provider"`
ServerStartupParameters map[string]string `json:"server_startup_parameters,omitempty"`
diff --git a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go
index 6bb9351b..cbaa009b 100644
--- a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go
+++ b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go
@@ -5,16 +5,13 @@ import (
"net"
"strings"
- "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
-
"github.com/caddyserver/caddy/v2"
sqladmin "google.golang.org/api/sqladmin/v1beta4"
"gfx.cafe/gfx/pggat/lib/gat/handlers/discovery"
+ "gfx.cafe/gfx/pggat/lib/pool/recipe"
"gfx.cafe/gfx/pggat/lib/auth/credentials"
- "gfx.cafe/gfx/pggat/lib/bouncer"
- "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2"
"gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/gsql"
)
@@ -120,7 +117,7 @@ func (T *Discoverer) instanceToCluster(primary *sqladmin.DatabaseInstance, repli
admin, err = recipe.Dialer{
Network: "tcp",
Address: primaryAddress,
- SSLMode: bouncer.SSLModePrefer,
+ SSLMode: bounce.SSLModePrefer,
SSLConfig: &tls.Config{
InsecureSkipVerify: true,
},
diff --git a/lib/gat/handlers/discovery/module.go b/lib/gat/handlers/discovery/module.go
index bd6614d2..73e4bf49 100644
--- a/lib/gat/handlers/discovery/module.go
+++ b/lib/gat/handlers/discovery/module.go
@@ -9,14 +9,15 @@ import (
"github.com/caddyserver/caddy/v2"
"go.uber.org/zap"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+
"gfx.cafe/gfx/pggat/lib/auth"
"gfx.cafe/gfx/pggat/lib/auth/credentials"
"gfx.cafe/gfx/pggat/lib/bouncer/frontends/v0"
"gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/gat"
"gfx.cafe/gfx/pggat/lib/gat/metrics"
- "gfx.cafe/gfx/pggat/lib/gat/pool"
- "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
+ recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe"
"gfx.cafe/gfx/pggat/lib/util/maps"
"gfx.cafe/gfx/pggat/lib/util/slices"
"gfx.cafe/gfx/pggat/lib/util/strutil"
@@ -223,7 +224,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo
for _, user := range users {
primaryCreds, _ := T.creds(user)
for _, database := range databases {
- primary := recipe.Dialer{
+ primary := recipe2.Dialer{
Network: endpoint.Network,
Address: endpoint.Address,
Username: user.Username,
@@ -240,7 +241,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo
}
p.RemoveRecipe("primary")
- p.AddRecipe("primary", recipe.NewRecipe(recipe.Config{
+ p.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{
Dialer: primary,
}))
}
@@ -258,7 +259,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas
}
for id, r := range replicas {
- replica := recipe.Dialer{
+ replica := recipe2.Dialer{
Network: r.Network,
Address: r.Address,
Username: user.Username,
@@ -268,7 +269,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas
SSLConfig: T.sslConfig,
StartupParameters: T.serverStartupParameters,
}
- replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{
+ replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{
Dialer: replica,
}))
}
@@ -297,7 +298,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin
continue
}
- replica := recipe.Dialer{
+ replica := recipe2.Dialer{
Network: endpoint.Network,
Address: endpoint.Address,
Username: user.Username,
@@ -307,7 +308,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin
SSLConfig: T.sslConfig,
StartupParameters: T.serverStartupParameters,
}
- p.AddRecipe(id, recipe.NewRecipe(recipe.Config{
+ p.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{
Dialer: replica,
}))
}
@@ -331,7 +332,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint,
replicaUsername := T.replicaUsername(user.Username)
primaryCreds, replicaCreds := T.creds(user)
for _, database := range databases {
- base := recipe.Dialer{
+ base := recipe2.Dialer{
Username: user.Username,
Credentials: primaryCreds,
Database: database,
@@ -348,7 +349,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint,
Pool: T.pooler.NewPool(),
Credentials: primaryCreds,
}
- primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{
+ primaryPool.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{
Dialer: primary,
}))
T.addPool(user.Username, database, primaryPool)
@@ -363,7 +364,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint,
replica := base
replica.Network = r.Network
replica.Address = r.Address
- replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{
+ replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{
Dialer: replica,
}))
}
@@ -390,7 +391,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo
replicaUsername := T.replicaUsername(user.Username)
primaryCreds, replicaCreds := T.creds(user)
- base := recipe.Dialer{
+ base := recipe2.Dialer{
Username: user.Username,
Credentials: primaryCreds,
Database: database,
@@ -407,7 +408,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo
Pool: T.pooler.NewPool(),
Credentials: primaryCreds,
}
- primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{
+ primaryPool.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{
Dialer: primary,
}))
T.addPool(user.Username, database, primaryPool)
@@ -422,7 +423,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo
replica := base
replica.Network = r.Network
replica.Address = r.Address
- replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{
+ replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{
Dialer: replica,
}))
}
diff --git a/lib/gat/handlers/pgbouncer/config.go b/lib/gat/handlers/pgbouncer/config.go
index 6f3b179a..146c519a 100644
--- a/lib/gat/handlers/pgbouncer/config.go
+++ b/lib/gat/handlers/pgbouncer/config.go
@@ -8,7 +8,6 @@ import (
"github.com/caddyserver/caddy/v2/caddyconfig"
- "gfx.cafe/gfx/pggat/lib/bouncer"
"gfx.cafe/gfx/pggat/lib/gat"
"gfx.cafe/gfx/pggat/lib/gat/ssl/servers/x509_key_pair"
"gfx.cafe/gfx/pggat/lib/util/encoding/ini"
@@ -111,7 +110,7 @@ type PgBouncer struct {
DnsNxdomainTtl float64 `ini:"dns_nxdomain_ttl"`
DnsZoneCheckPeriod float64 `ini:"dns_zone_check_period"`
ResolvConf string `ini:"resolv.conf"`
- ClientTLSSSLMode bouncer.SSLMode `ini:"client_tls_sslmode"`
+ ClientTLSSSLMode bounce.SSLMode `ini:"client_tls_sslmode"`
ClientTLSKeyFile string `ini:"client_tls_key_file"`
ClientTLSCertFile string `ini:"client_tls_cert_file"`
ClientTLSCaFile string `ini:"client_tls_ca_file"`
@@ -119,7 +118,7 @@ type PgBouncer struct {
ClientTLSCiphers []TLSCipher `ini:"client_tls_ciphers"`
ClientTLSECDHCurve TLSECDHCurve `ini:"client_tls_ecdhcurve"`
ClientTLSDHEParams TLSDHEParams `ini:"client_tls_dheparams"`
- ServerTLSSSLMode bouncer.SSLMode `ini:"server_tls_sslmode"`
+ ServerTLSSSLMode bounce.SSLMode `ini:"server_tls_sslmode"`
ServerTLSCaFile string `ini:"server_tls_ca_file"`
ServerTLSKeyFile string `ini:"server_tls_key_file"`
ServerTLSCertFile string `ini:"server_tls_cert_file"`
@@ -211,7 +210,7 @@ var Default = Config{
AutodbIdleTimeout: 3600.0,
DnsMaxTtl: 15.0,
DnsNxdomainTtl: 15.0,
- ClientTLSSSLMode: bouncer.SSLModeDisable,
+ ClientTLSSSLMode: bounce.SSLModeDisable,
ClientTLSProtocols: []TLSProtocol{
TLSProtocolSecure,
},
@@ -219,7 +218,7 @@ var Default = Config{
"fast",
},
ClientTLSECDHCurve: "auto",
- ServerTLSSSLMode: bouncer.SSLModePrefer,
+ ServerTLSSSLMode: bounce.SSLModePrefer,
ServerTLSProtocols: []TLSProtocol{
TLSProtocolSecure,
},
diff --git a/lib/gat/handlers/pgbouncer/module.go b/lib/gat/handlers/pgbouncer/module.go
index a5a8f025..962c45f3 100644
--- a/lib/gat/handlers/pgbouncer/module.go
+++ b/lib/gat/handlers/pgbouncer/module.go
@@ -19,14 +19,15 @@ import (
"gfx.cafe/gfx/pggat/lib/gat/poolers/session"
"gfx.cafe/gfx/pggat/lib/gat/poolers/transaction"
"gfx.cafe/gfx/pggat/lib/perror"
+ recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe"
"gfx.cafe/gfx/pggat/lib/util/dur"
"gfx.cafe/gfx/pggat/lib/util/slices"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+
"gfx.cafe/gfx/pggat/lib/auth/credentials"
"gfx.cafe/gfx/pggat/lib/gat"
"gfx.cafe/gfx/pggat/lib/gat/metrics"
- "gfx.cafe/gfx/pggat/lib/gat/pool"
- "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
"gfx.cafe/gfx/pggat/lib/gsql"
"gfx.cafe/gfx/pggat/lib/util/maps"
"gfx.cafe/gfx/pggat/lib/util/strutil"
@@ -214,7 +215,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) {
serverCreds = credentials.FromString(user, db.Password)
}
- dialer := recipe.Dialer{
+ dialer := recipe2.Dialer{
SSLMode: T.Config.PgBouncer.ServerTLSSSLMode,
SSLConfig: &tls.Config{
InsecureSkipVerify: true, // TODO(garet)
@@ -254,7 +255,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) {
dialer.Address = address
}
- recipeOptions := recipe.Config{
+ recipeOptions := recipe2.Config{
Dialer: dialer,
MinConnections: db.MinPoolSize,
MaxConnections: db.MaxDBConnections,
@@ -265,7 +266,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) {
if recipeOptions.MaxConnections == 0 {
recipeOptions.MaxConnections = T.Config.PgBouncer.MaxDBConnections
}
- r := recipe.NewRecipe(recipeOptions)
+ r := recipe2.NewRecipe(recipeOptions)
p.AddRecipe("pgbouncer", r)
diff --git a/lib/gat/handlers/pgbouncer_spilo/module.go b/lib/gat/handlers/pgbouncer_spilo/module.go
index dc41720b..a71759bd 100644
--- a/lib/gat/handlers/pgbouncer_spilo/module.go
+++ b/lib/gat/handlers/pgbouncer_spilo/module.go
@@ -7,7 +7,6 @@ import (
"gfx.cafe/gfx/pggat/lib/gat/handlers/pgbouncer"
- "gfx.cafe/gfx/pggat/lib/bouncer"
"gfx.cafe/gfx/pggat/lib/gat"
"gfx.cafe/gfx/pggat/lib/util/strutil"
@@ -52,12 +51,12 @@ func (T *Module) Provision(ctx caddy.Context) error {
pgb.PgBouncer.LogFile = "/var/log/pgbouncer/pgbouncer.log"
pgb.PgBouncer.PidFile = "/var/run/pgbouncer/pgbouncer.pid"
- pgb.PgBouncer.ServerTLSSSLMode = bouncer.SSLModeRequire
+ pgb.PgBouncer.ServerTLSSSLMode = bounce.SSLModeRequire
pgb.PgBouncer.ServerTLSCaFile = "/etc/ssl/certs/pgbouncer.crt"
pgb.PgBouncer.ServerTLSProtocols = []pgbouncer.TLSProtocol{
pgbouncer.TLSProtocolSecure,
}
- pgb.PgBouncer.ClientTLSSSLMode = bouncer.SSLModeRequire
+ pgb.PgBouncer.ClientTLSSSLMode = bounce.SSLModeRequire
pgb.PgBouncer.ClientTLSKeyFile = "/etc/ssl/certs/pgbouncer.key"
pgb.PgBouncer.ClientTLSCertFile = "/etc/ssl/certs/pgbouncer.crt"
diff --git a/lib/gat/handlers/pool/config.go b/lib/gat/handlers/pool/config.go
index 5a2fc336..49aa27a3 100644
--- a/lib/gat/handlers/pool/config.go
+++ b/lib/gat/handlers/pool/config.go
@@ -2,8 +2,6 @@ package pool_handler
import (
"encoding/json"
-
- "gfx.cafe/gfx/pggat/lib/bouncer"
)
type Config struct {
@@ -11,7 +9,7 @@ type Config struct {
// Server connect options
ServerAddress string `jsonn:"server_address"`
- ServerSSLMode bouncer.SSLMode `json:"server_ssl_mode,omitempty"`
+ ServerSSLMode bounce.SSLMode `json:"server_ssl_mode,omitempty"`
ServerSSL json.RawMessage `json:"server_ssl,omitempty" caddy:"namespace=pggat.ssl.clients inline_key=provider"`
// Server routing options
diff --git a/lib/gat/handlers/pool/module.go b/lib/gat/handlers/pool/module.go
index 61b0085a..bf627ddb 100644
--- a/lib/gat/handlers/pool/module.go
+++ b/lib/gat/handlers/pool/module.go
@@ -12,7 +12,7 @@ import (
"gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/gat"
"gfx.cafe/gfx/pggat/lib/gat/metrics"
- "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
+ recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe"
"gfx.cafe/gfx/pggat/lib/util/strutil"
)
@@ -65,7 +65,7 @@ func (T *Module) Provision(ctx caddy.Context) error {
network = "tcp"
}
- d := recipe.Dialer{
+ d := recipe2.Dialer{
Network: network,
Address: T.ServerAddress,
SSLMode: T.ServerSSLMode,
@@ -77,7 +77,7 @@ func (T *Module) Provision(ctx caddy.Context) error {
}
T.pool = pooler.NewPool()
- T.pool.AddRecipe("pool", recipe.NewRecipe(recipe.Config{
+ T.pool.AddRecipe("pool", recipe2.NewRecipe(recipe2.Config{
Dialer: d,
}))
diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go
deleted file mode 100644
index c6d3c76e..00000000
--- a/lib/gat/pool/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package pool
-
-import (
- "gfx.cafe/gfx/pggat/lib/fed"
- "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp"
- "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps"
- "gfx.cafe/gfx/pggat/lib/fed/middlewares/unterminate"
-)
-
-type pooledClient struct {
- pooledConn
-
- ps *ps.Client
- eqp *eqp.Client
-}
-
-func newClient(
- options Config,
- conn *fed.Conn,
-) *pooledClient {
- conn.Middleware = append(
- conn.Middleware,
- unterminate.Unterminate,
- )
-
- var psClient *ps.Client
- if options.ParameterStatusSync == ParameterStatusSyncDynamic {
- // add ps middleware
- psClient = ps.NewClient(conn.InitialParameters)
- conn.Middleware = append(conn.Middleware, psClient)
- }
-
- var eqpClient *eqp.Client
- if options.ExtendedQuerySync {
- // add eqp middleware
- eqpClient = eqp.NewClient()
- conn.Middleware = append(conn.Middleware, eqpClient)
- }
-
- return &pooledClient{
- pooledConn: makeConn(
- conn,
- ),
- ps: psClient,
- eqp: eqpClient,
- }
-}
-
-func (T *pooledClient) GetEQP() *eqp.Client {
- return T.eqp
-}
-
-func (T *pooledClient) GetPS() *ps.Client {
- return T.ps
-}
diff --git a/lib/gat/pool/config.go b/lib/gat/pool/config.go
deleted file mode 100644
index 36604526..00000000
--- a/lib/gat/pool/config.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package pool
-
-import (
- "go.uber.org/zap"
-
- "gfx.cafe/gfx/pggat/lib/util/dur"
- "gfx.cafe/gfx/pggat/lib/util/strutil"
-)
-
-type ParameterStatusSync int
-
-const (
- // ParameterStatusSyncNone does not attempt to sync parameter status.
- ParameterStatusSyncNone ParameterStatusSync = iota
- // ParameterStatusSyncInitial assumes both client and server have their initial status before syncing.
- // Use in session pooling for lower latency
- ParameterStatusSyncInitial
- // ParameterStatusSyncDynamic will track parameter status and ensure they are synced correctly.
- // Use in transaction pooling
- ParameterStatusSyncDynamic
-)
-
-type PoolingConfig struct {
- NewPooler func() Pooler
- // ReleaseAfterTransaction toggles whether servers should be released and re acquired after each transaction.
- // Use false for lower latency
- // Use true for better balancing
- ReleaseAfterTransaction bool
-
- // ParameterStatusSync is the parameter syncing mode
- ParameterStatusSync ParameterStatusSync
-
- // ExtendedQuerySync controls whether prepared statements and portals should be tracked and synced before use.
- // Use false for lower latency
- // Use true for transaction pooling
- ExtendedQuerySync bool
-}
-
-type ManagementConfig struct {
- ServerResetQuery string `json:"server_reset_query,omitempty"`
- // ServerIdleTimeout defines how long a server may be idle before it is disconnected
- ServerIdleTimeout dur.Duration `json:"server_idle_timeout,omitempty"`
-
- // ServerReconnectInitialTime defines how long to wait initially before attempting a server reconnect
- // 0 = disable, don't retry
- ServerReconnectInitialTime dur.Duration `json:"server_reconnect_initial_time,omitempty"`
- // ServerReconnectMaxTime defines the max amount of time to wait before attempting a server reconnect
- // 0 = disable, back off infinitely
- ServerReconnectMaxTime dur.Duration `json:"server_reconnect_max_time,omitempty"`
-
- // TrackedParameters are parameters which should be synced by updating the server, not the client.
- TrackedParameters []strutil.CIString `json:"tracked_parameters,omitempty"`
-}
-
-type Config struct {
- PoolingConfig
-
- ManagementConfig
-
- Logger *zap.Logger
-}
diff --git a/lib/gat/pool/conn.go b/lib/gat/pool/conn.go
deleted file mode 100644
index 065a6458..00000000
--- a/lib/gat/pool/conn.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package pool
-
-import (
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/google/uuid"
-
- "gfx.cafe/gfx/pggat/lib/fed"
- "gfx.cafe/gfx/pggat/lib/gat/metrics"
- "gfx.cafe/gfx/pggat/lib/util/strutil"
-)
-
-type pooledConn struct {
- id uuid.UUID
-
- conn *fed.Conn
-
- // metrics
-
- transactionCount atomic.Int64
-
- lastMetricsRead time.Time
-
- state metrics.ConnState
- peer uuid.UUID
- since time.Time
-
- util [metrics.ConnStateCount]time.Duration
-
- mu sync.RWMutex
-}
-
-func makeConn(
- conn *fed.Conn,
-) pooledConn {
- return pooledConn{
- id: uuid.New(),
- conn: conn,
-
- since: time.Now(),
- }
-}
-
-func (T *pooledConn) GetID() uuid.UUID {
- return T.id
-}
-
-func (T *pooledConn) GetConn() *fed.Conn {
- return T.conn
-}
-
-func (T *pooledConn) GetInitialParameters() map[strutil.CIString]string {
- return T.conn.InitialParameters
-}
-
-func (T *pooledConn) GetBackendKey() [8]byte {
- return T.conn.BackendKey
-}
-
-func (T *pooledConn) TransactionComplete() {
- T.transactionCount.Add(1)
-}
-
-func (T *pooledConn) SetState(state metrics.ConnState, peer uuid.UUID) {
- T.mu.Lock()
- defer T.mu.Unlock()
-
- now := time.Now()
-
- var since time.Duration
- if T.since.Before(T.lastMetricsRead) {
- since = now.Sub(T.lastMetricsRead)
- } else {
- since = now.Sub(T.since)
- }
- T.util[T.state] += since
-
- T.state = state
- T.peer = peer
- T.since = now
-}
-
-func (T *pooledConn) GetState() (state metrics.ConnState, peer uuid.UUID, since time.Time) {
- T.mu.RLock()
- defer T.mu.RUnlock()
- state = T.state
- peer = T.peer
- since = T.since
- return
-}
-
-func (T *pooledConn) ReadMetrics(m *metrics.Conn) {
- T.mu.Lock()
- defer T.mu.Unlock()
-
- now := time.Now()
-
- m.Time = now
-
- m.State = T.state
- m.Peer = T.peer
- m.Since = T.since
-
- m.Utilization = T.util
- T.util = [metrics.ConnStateCount]time.Duration{}
-
- var since time.Duration
- if m.Since.Before(T.lastMetricsRead) {
- since = now.Sub(T.lastMetricsRead)
- } else {
- since = now.Sub(m.Since)
- }
- m.Utilization[m.State] += since
-
- m.TransactionCount = int(T.transactionCount.Swap(0))
-
- T.lastMetricsRead = now
-}
diff --git a/lib/gat/pool/errors.go b/lib/gat/pool/errors.go
deleted file mode 100644
index b0a18cec..00000000
--- a/lib/gat/pool/errors.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package pool
-
-import "errors"
-
-var ErrClosed = errors.New("pool closed")
diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go
deleted file mode 100644
index e1263621..00000000
--- a/lib/gat/pool/flow.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package pool
-
-import (
- "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0"
- "gfx.cafe/gfx/pggat/lib/fed"
- "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp"
- "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps"
- packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
- "gfx.cafe/gfx/pggat/lib/gat/metrics"
- "gfx.cafe/gfx/pggat/lib/util/slices"
-)
-
-func pair(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) {
- defer func() {
- client.SetState(metrics.ConnStateActive, server.GetID())
- server.SetState(metrics.ConnStateActive, client.GetID())
- }()
-
- if options.ParameterStatusSync != ParameterStatusSyncNone || options.ExtendedQuerySync {
- client.SetState(metrics.ConnStatePairing, server.GetID())
- server.SetState(metrics.ConnStatePairing, client.GetID())
- }
-
- switch options.ParameterStatusSync {
- case ParameterStatusSyncDynamic:
- clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), client.GetPS(), server.GetConn(), server.GetPS())
- case ParameterStatusSyncInitial:
- clientErr, serverErr = syncInitialParameters(options, client, server)
- }
-
- if clientErr != nil || serverErr != nil {
- return
- }
-
- if options.ExtendedQuerySync {
- serverErr = eqp.Sync(client.GetEQP(), server.GetConn(), server.GetEQP())
- }
-
- return
-}
-
-func syncInitialParameters(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) {
- clientParams := client.GetInitialParameters()
- serverParams := server.GetInitialParameters()
-
- var packet fed.Packet
-
- for key, value := range clientParams {
- // skip already set params
- if serverParams[key] == value {
- p := packets.ParameterStatus{
- Key: key.String(),
- Value: serverParams[key],
- }
- packet = p.IntoPacket(packet)
- clientErr = client.GetConn().WritePacket(packet)
- if clientErr != nil {
- return
- }
- continue
- }
-
- setServer := slices.Contains(options.TrackedParameters, key)
-
- if !setServer {
- value = serverParams[key]
- }
-
- p := packets.ParameterStatus{
- Key: key.String(),
- Value: value,
- }
- packet = p.IntoPacket(packet)
- clientErr = client.GetConn().WritePacket(packet)
- if clientErr != nil {
- return
- }
-
- if !setServer {
- continue
- }
-
- serverErr, _, packet = backends.SetParameter(server.GetConn(), nil, packet, key, value)
- if serverErr != nil {
- return
- }
- }
-
- for key, value := range serverParams {
- if _, ok := clientParams[key]; ok {
- continue
- }
-
- // Don't need to run reset on server because it will reset it to the initial value
-
- // send to client
- p := packets.ParameterStatus{
- Key: key.String(),
- Value: value,
- }
- packet = p.IntoPacket(packet)
- clientErr = client.GetConn().WritePacket(packet)
- if clientErr != nil {
- return
- }
- }
-
- return
-
-}
-
-func transactionComplete(client *pooledClient, server *pooledServer) {
- client.TransactionComplete()
- server.TransactionComplete()
-}
diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go
deleted file mode 100644
index 8399e39b..00000000
--- a/lib/gat/pool/pool.go
+++ /dev/null
@@ -1,497 +0,0 @@
-package pool
-
-import (
- "errors"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/google/uuid"
- "go.uber.org/zap"
-
- "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0"
- "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2"
- "gfx.cafe/gfx/pggat/lib/fed"
- packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
- "gfx.cafe/gfx/pggat/lib/gat/metrics"
- "gfx.cafe/gfx/pggat/lib/util/slices"
-)
-
-type Pool struct {
- config Config
- pooler Pooler
-
- closed chan struct{}
-
- pendingCount atomic.Int64
- pending chan struct{}
-
- recipes map[string]*Recipe
- recipeScaleOrder slices.Sorted[string]
- clients map[uuid.UUID]*pooledClient
- clientsByKey map[[8]byte]*pooledClient
- servers map[uuid.UUID]*pooledServer
- serversByRecipe map[string][]*pooledServer
- mu sync.RWMutex
-}
-
-func NewPool(config Config) *Pool {
- if config.NewPooler == nil {
- panic("expected new pooler func")
- }
- pooler := config.NewPooler()
- if pooler == nil {
- panic("expected pooler")
- }
-
- p := &Pool{
- config: config,
- pooler: pooler,
-
- closed: make(chan struct{}),
- pending: make(chan struct{}, 1),
- }
-
- s := newScaler(p)
- go s.Run()
-
- return p
-}
-
-func (T *Pool) idlest() (server *pooledServer, at time.Time) {
- T.mu.RLock()
- defer T.mu.RUnlock()
-
- for _, s := range T.servers {
- state, _, since := s.GetState()
- if state != metrics.ConnStateIdle {
- continue
- }
-
- if at == (time.Time{}) || since.Before(at) {
- server = s
- at = since
- }
- }
-
- return
-}
-
-func (T *Pool) AddRecipe(name string, r *Recipe) {
- func() {
- T.mu.Lock()
- defer T.mu.Unlock()
-
- T.removeRecipe(name)
-
- if T.recipes == nil {
- T.recipes = make(map[string]*Recipe)
- }
- T.recipes[name] = r
-
- // add to front of scale order
- T.recipeScaleOrder = T.recipeScaleOrder.Insert(name, func(n string) int {
- return len(T.serversByRecipe[n])
- })
- }()
-
- count := r.AllocateInitial()
- for i := 0; i < count; i++ {
- if err := T.scaleUpL1(name, r); err != nil {
- T.config.Logger.Warn("failed to dial server", zap.Error(err))
- for j := i; j < count; j++ {
- r.Free()
- }
- break
- }
- }
-}
-
-func (T *Pool) RemoveRecipe(name string) {
- T.mu.Lock()
- defer T.mu.Unlock()
-
- T.removeRecipe(name)
-}
-
-func (T *Pool) removeRecipe(name string) {
- r, ok := T.recipes[name]
- if !ok {
- return
- }
- delete(T.recipes, name)
-
- servers := T.serversByRecipe[name]
- delete(T.serversByRecipe, name)
- // remove from recipeScaleOrder
- T.recipeScaleOrder = slices.Delete(T.recipeScaleOrder, name)
-
- for _, server := range servers {
- r.Free()
- T.removeServerL1(server)
- }
-}
-
-func (T *Pool) scaleUpL0() (string, *Recipe) {
- T.mu.RLock()
- defer T.mu.RUnlock()
-
- for _, name := range T.recipeScaleOrder {
- r := T.recipes[name]
- if r.Allocate() {
- return name, r
- }
- }
-
- return "", nil
-}
-
-func (T *Pool) scaleUpL1(name string, r *Recipe) error {
- conn, err := r.Dial()
- if err != nil {
- // failed to dial
- r.Free()
- return err
- }
-
- server, err := func() (*pooledServer, error) {
- T.mu.Lock()
- defer T.mu.Unlock()
- if T.recipes[name] != r {
- // recipe was removed
- r.Free()
- return nil, errors.New("recipe was removed")
- }
-
- server := newServer(
- T.config,
- name,
- conn,
- )
-
- if T.servers == nil {
- T.servers = make(map[uuid.UUID]*pooledServer)
- }
- T.servers[server.GetID()] = server
-
- if T.serversByRecipe == nil {
- T.serversByRecipe = make(map[string][]*pooledServer)
- }
- T.serversByRecipe[name] = append(T.serversByRecipe[name], server)
- // update order
- T.recipeScaleOrder.Update(slices.Index(T.recipeScaleOrder, name), func(n string) int {
- return len(T.serversByRecipe[n])
- })
- return server, nil
- }()
-
- if err != nil {
- return err
- }
-
- T.pooler.AddServer(server.GetID())
- return nil
-}
-
-func (T *Pool) scaleUp() bool {
- name, r := T.scaleUpL0()
- if r == nil {
- return false
- }
-
- err := T.scaleUpL1(name, r)
- if err != nil {
- T.config.Logger.Warn("failed to dial server", zap.Error(err))
- return false
- }
-
- return true
-}
-
-func (T *Pool) removeServer(server *pooledServer) {
- T.mu.Lock()
- defer T.mu.Unlock()
-
- T.removeServerL1(server)
-}
-
-func (T *Pool) removeServerL1(server *pooledServer) {
- delete(T.servers, server.GetID())
- T.pooler.DeleteServer(server.GetID())
- _ = server.GetConn().Close()
- if T.serversByRecipe != nil {
- name := server.GetRecipe()
- T.serversByRecipe[name] = slices.Delete(T.serversByRecipe[name], server)
- // update order
- index := slices.Index(T.recipeScaleOrder, name)
- if index != -1 {
- T.recipeScaleOrder.Update(index, func(n string) int {
- return len(T.serversByRecipe[n])
- })
- }
- }
-}
-
-func (T *Pool) acquireServer(client *pooledClient) *pooledServer {
- client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil)
-
- for {
- serverID := T.pooler.Acquire(client.GetID(), SyncModeNonBlocking)
- if serverID == uuid.Nil {
- T.pendingCount.Add(1)
- select {
- case T.pending <- struct{}{}:
- default:
- }
- serverID = T.pooler.Acquire(client.GetID(), SyncModeBlocking)
- T.pendingCount.Add(-1)
- if serverID == uuid.Nil {
- return nil
- }
- }
-
- T.mu.RLock()
- server, ok := T.servers[serverID]
- T.mu.RUnlock()
- if !ok {
- T.pooler.DeleteServer(serverID)
- continue
- }
- return server
- }
-}
-
-func (T *Pool) releaseServer(server *pooledServer) {
- if T.config.ServerResetQuery != "" {
- server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil)
-
- err, _, _ := backends.QueryString(server.GetConn(), nil, nil, T.config.ServerResetQuery)
- if err != nil {
- T.removeServer(server)
- return
- }
- }
-
- server.SetState(metrics.ConnStateIdle, uuid.Nil)
-
- T.pooler.Release(server.GetID())
-}
-
-func (T *Pool) Serve(
- conn *fed.Conn,
-) error {
- defer func() {
- _ = conn.Close()
- }()
-
- client := newClient(
- T.config,
- conn,
- )
-
- return T.serve(client, false)
-}
-
-// ServeBot is for clients that don't need initial parameters, cancelling queries, and are ready now. Use Serve for
-// real clients
-func (T *Pool) ServeBot(
- conn fed.ReadWriteCloser,
-) error {
- defer func() {
- _ = conn.Close()
- }()
-
- client := newClient(
- T.config,
- &fed.Conn{
- ReadWriteCloser: conn,
- },
- )
-
- return T.serve(client, true)
-}
-
-func (T *Pool) serve(client *pooledClient, initialized bool) error {
- T.addClient(client)
- defer T.removeClient(client)
-
- var err error
- var serverErr error
-
- var server *pooledServer
- defer func() {
- if server != nil {
- if serverErr != nil {
- T.removeServer(server)
- } else {
- T.releaseServer(server)
- }
- server = nil
- }
- }()
-
- var packet fed.Packet
-
- if !initialized {
- server = T.acquireServer(client)
- if server == nil {
- return ErrClosed
- }
-
- err, serverErr = pair(T.config, client, server)
- if serverErr != nil {
- return serverErr
- }
- if err != nil {
- return err
- }
-
- p := packets.ReadyForQuery('I')
- packet = p.IntoPacket(packet)
- err = client.GetConn().WritePacket(packet)
- if err != nil {
- return err
- }
- }
-
- for {
- if server != nil && T.config.ReleaseAfterTransaction {
- client.SetState(metrics.ConnStateIdle, uuid.Nil)
- T.releaseServer(server)
- server = nil
- }
-
- packet, err = client.GetConn().ReadPacket(true, packet)
- if err != nil {
- return err
- }
-
- if server == nil {
- server = T.acquireServer(client)
- if server == nil {
- return ErrClosed
- }
-
- err, serverErr = pair(T.config, client, server)
- }
- if err == nil && serverErr == nil {
- packet, err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet)
- }
- if serverErr != nil {
- return serverErr
- } else {
- transactionComplete(client, server)
- }
-
- if err != nil {
- return err
- }
- }
-}
-
-func (T *Pool) addClient(client *pooledClient) {
- T.mu.Lock()
- defer T.mu.Unlock()
-
- if T.clients == nil {
- T.clients = make(map[uuid.UUID]*pooledClient)
- }
- T.clients[client.GetID()] = client
- if T.clientsByKey == nil {
- T.clientsByKey = make(map[[8]byte]*pooledClient)
- }
- T.clientsByKey[client.GetBackendKey()] = client
- T.pooler.AddClient(client.GetID())
-}
-
-func (T *Pool) removeClient(client *pooledClient) {
- T.mu.Lock()
- defer T.mu.Unlock()
-
- T.removeClientL1(client)
-}
-
-func (T *Pool) removeClientL1(client *pooledClient) {
- T.pooler.DeleteClient(client.GetID())
- _ = client.conn.Close()
- delete(T.clients, client.GetID())
- delete(T.clientsByKey, client.GetBackendKey())
-}
-
-func (T *Pool) Cancel(key [8]byte) {
- T.mu.RLock()
- defer T.mu.RUnlock()
-
- client, ok := T.clientsByKey[key]
- if !ok {
- return
- }
-
- state, peer, _ := client.GetState()
- if state != metrics.ConnStateActive {
- return
- }
-
- server, ok := T.servers[peer]
- if !ok {
- return
- }
-
- // prevent state from changing by RLocking the server
- server.mu.RLock()
- defer server.mu.RUnlock()
-
- // make sure peer is still set
- if server.peer != peer {
- return
- }
-
- r, ok := T.recipes[server.recipe]
- if !ok {
- return
- }
-
- r.Cancel(server.GetBackendKey())
-}
-
-func (T *Pool) ReadMetrics(m *metrics.Pool) {
- T.mu.RLock()
- defer T.mu.RUnlock()
-
- if len(T.clients) != 0 && m.Clients == nil {
- m.Clients = make(map[uuid.UUID]metrics.Conn)
- }
- if len(T.servers) != 0 && m.Servers == nil {
- m.Servers = make(map[uuid.UUID]metrics.Conn)
- }
-
- for id, client := range T.clients {
- var mc metrics.Conn
- client.ReadMetrics(&mc)
- m.Clients[id] = mc
- }
-
- for id, server := range T.servers {
- var mc metrics.Conn
- server.ReadMetrics(&mc)
- m.Servers[id] = mc
- }
-}
-
-func (T *Pool) Close() {
- close(T.closed)
- T.pooler.Close()
-
- T.mu.Lock()
- defer T.mu.Unlock()
-
- // remove clients
- for _, client := range T.clients {
- T.removeClientL1(client)
- }
-
- // remove recipes
- for name := range T.recipes {
- T.removeRecipe(name)
- }
-}
diff --git a/lib/gat/pool/recipe.go b/lib/gat/pool/recipe.go
deleted file mode 100644
index cfa70d17..00000000
--- a/lib/gat/pool/recipe.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package pool
-
-import "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
-
-type Recipe = recipe.Recipe
diff --git a/lib/gat/pool/scaler.go b/lib/gat/pool/scaler.go
deleted file mode 100644
index 8beb64d3..00000000
--- a/lib/gat/pool/scaler.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package pool
-
-import (
- "time"
-
- "go.uber.org/zap"
-)
-
-type scaler struct {
- pool *Pool
-
- backingOff bool
- backoff time.Duration
-
- // timers
- idle *time.Timer
- pending *time.Timer
-}
-
-func newScaler(pool *Pool) *scaler {
- s := &scaler{
- pool: pool,
- backoff: pool.config.ServerReconnectInitialTime.Duration(),
- }
-
- if pool.config.ServerIdleTimeout != 0 {
- s.idle = time.NewTimer(pool.config.ServerIdleTimeout.Duration())
- }
-
- return s
-}
-
-func (T *scaler) idleTimeout(now time.Time) {
- // idle loop for scaling down
- var wait time.Duration
-
- var idlest *pooledServer
- var idleStart time.Time
- for idlest, idleStart = T.pool.idlest(); idlest != nil && now.Sub(idleStart) > T.pool.config.ServerIdleTimeout.Duration(); idlest, idleStart = T.pool.idlest() {
- T.pool.removeServer(idlest)
- }
-
- if idlest == nil {
- wait = T.pool.config.ServerIdleTimeout.Duration()
- } else {
- wait = idleStart.Add(T.pool.config.ServerIdleTimeout.Duration()).Sub(now)
- }
-
- T.idle.Reset(wait)
-}
-
-func (T *scaler) pendingTimeout() {
- if T.backingOff {
- T.backoff *= 2
- if T.pool.config.ServerReconnectMaxTime != 0 && T.backoff > T.pool.config.ServerReconnectMaxTime.Duration() {
- T.backoff = T.pool.config.ServerReconnectMaxTime.Duration()
- }
- }
-
- for T.pool.pendingCount.Load() > 0 {
- // pending loop for scaling up
- if T.pool.scaleUp() {
- // scale up successful, see if we need to scale up more
- T.backoff = T.pool.config.ServerReconnectInitialTime.Duration()
- T.backingOff = false
- continue
- }
-
- if T.backoff == 0 {
- // no backoff
- T.backoff = T.pool.config.ServerReconnectInitialTime.Duration()
- T.backingOff = false
- continue
- }
-
- T.backingOff = true
- if T.pending == nil {
- T.pending = time.NewTimer(T.backoff)
- } else {
- T.pending.Reset(T.backoff)
- }
-
- T.pool.config.Logger.Warn("failed to dial server", zap.Duration("backoff", T.backoff))
-
- return
- }
-}
-
-func (T *scaler) Run() {
- for {
- var idle <-chan time.Time
- if T.idle != nil {
- idle = T.idle.C
- }
-
- var pending1 <-chan struct{}
- var pending2 <-chan time.Time
- if T.backingOff {
- pending2 = T.pending.C
- } else {
- pending1 = T.pool.pending
- }
-
- select {
- case t := <-idle:
- T.idleTimeout(t)
- case <-pending1:
- T.pendingTimeout()
- case <-pending2:
- T.pendingTimeout()
- case <-T.pool.closed:
- return
- }
- }
-}
diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go
deleted file mode 100644
index 9afa7c99..00000000
--- a/lib/gat/pool/server.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package pool
-
-import (
- "gfx.cafe/gfx/pggat/lib/fed"
- "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp"
- "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps"
-)
-
-type pooledServer struct {
- pooledConn
-
- recipe string
-
- ps *ps.Server
- eqp *eqp.Server
-}
-
-func newServer(
- options Config,
- recipe string,
- conn *fed.Conn,
-) *pooledServer {
- var psServer *ps.Server
- if options.ParameterStatusSync == ParameterStatusSyncDynamic {
- // add ps middleware
- psServer = ps.NewServer(conn.InitialParameters)
- conn.Middleware = append(conn.Middleware, psServer)
- }
-
- var eqpServer *eqp.Server
- if options.ExtendedQuerySync {
- // add eqp middleware
- eqpServer = eqp.NewServer()
- conn.Middleware = append(conn.Middleware, eqpServer)
- }
-
- return &pooledServer{
- pooledConn: makeConn(
- conn,
- ),
- recipe: recipe,
- ps: psServer,
- eqp: eqpServer,
- }
-}
-
-func (T *pooledServer) GetRecipe() string {
- return T.recipe
-}
-
-func (T *pooledServer) GetEQP() *eqp.Server {
- return T.eqp
-}
-
-func (T *pooledServer) GetPS() *ps.Server {
- return T.ps
-}
diff --git a/lib/gat/pool/withcredentials.go b/lib/gat/pool/withcredentials.go
deleted file mode 100644
index 9c7382b3..00000000
--- a/lib/gat/pool/withcredentials.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package pool
-
-import "gfx.cafe/gfx/pggat/lib/auth"
-
-type WithCredentials struct {
- *Pool
- Credentials auth.Credentials
-}
diff --git a/lib/pool/backend.go b/lib/pool/backend.go
new file mode 100644
index 00000000..d49aff6c
--- /dev/null
+++ b/lib/pool/backend.go
@@ -0,0 +1,209 @@
+package pool
+
+import (
+ "log"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "gfx.cafe/gfx/pggat/lib/fed"
+ "gfx.cafe/gfx/pggat/lib/pool/recipe"
+)
+
+type backendRecipe struct {
+ weight atomic.Int64
+ recipe *recipe.Recipe
+
+ servers []*fed.Conn // TODO(garet)
+ killed bool
+}
+
+type Backend struct {
+ config BackendConfig
+ pooler Pooler
+
+ closed chan struct{}
+
+ scaleUp chan struct{}
+ waiters atomic.Int64
+
+ recipes map[string]*backendRecipe
+ mu sync.RWMutex
+}
+
+func NewBackend(config BackendConfig) *Backend {
+ b := &Backend{
+ config: config,
+ pooler: config.NewPooler(),
+
+ closed: make(chan struct{}),
+
+ scaleUp: make(chan struct{}, 1),
+ }
+
+ go b.scaleLoop()
+
+ return b
+}
+
+func (T *Backend) addRecipe(name string, r *recipe.Recipe) {
+ if T.recipes == nil {
+ T.recipes = make(map[string]*backendRecipe)
+ }
+ T.recipes[name] = &backendRecipe{
+ recipe: r,
+ }
+}
+
+func (T *Backend) AddRecipe(name string, r *recipe.Recipe) {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ T.removeRecipe(name)
+ T.addRecipe(name, r)
+}
+
+func (T *Backend) removeRecipe(name string) {
+ r, ok := T.recipes[name]
+ if !ok {
+ return
+ }
+ delete(T.recipes, name)
+
+ r.killed = true
+ for _, server := range r.servers {
+ _ = server.Close()
+ r.recipe.Free()
+ }
+ r.servers = r.servers[:0]
+}
+
+func (T *Backend) RemoveRecipe(name string) {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ T.removeRecipe(name)
+}
+
+func (T *Backend) ScaleUp() error {
+ var target *backendRecipe
+ func() {
+ T.mu.RLock()
+ defer T.mu.RUnlock()
+
+ var targetWeight int64 = -1
+ for _, r := range T.recipes {
+ if target == nil {
+ target = r
+ continue
+ }
+ weight := r.weight.Load()
+ if weight > targetWeight {
+ target = r
+ targetWeight = weight
+ continue
+ }
+ }
+
+ if !target.recipe.Allocate() {
+ return
+ }
+ target.weight.Add(1)
+ }()
+
+ if target == nil {
+ return ErrNoScalableRecipe
+ }
+
+ server, err := target.recipe.Dial()
+ if err != nil {
+ target.recipe.Free()
+ target.weight.Add(-1)
+ return err
+ }
+
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ if target.killed {
+ target.recipe.Free()
+ target.weight.Add(-1)
+ return nil
+ }
+
+ target.servers = append(target.servers, server)
+ return nil
+}
+
+// ScaleDown attempts to scale down the pool. Returns when the next scale down should happen
+func (T *Backend) ScaleDown() time.Duration {
+ // TODO(garet)
+ return T.config.IdleTimeout
+}
+
+func (T *Backend) scaleLoop() {
+ var scaleUpBackoffDuration time.Duration
+ var scaleUpBackoff *time.Timer
+
+ var idleTimeout *time.Timer
+ if T.config.IdleTimeout != 0 {
+ idleTimeout = time.NewTimer(T.config.IdleTimeout)
+ }
+
+ defer func() {
+ if scaleUpBackoff != nil {
+ scaleUpBackoff.Stop()
+ }
+ if idleTimeout != nil {
+ idleTimeout.Stop()
+ }
+ }()
+
+ for {
+ var scaleUpBackoffC <-chan time.Time
+ var scaleUpC <-chan struct{}
+ if scaleUpBackoffDuration != 0 {
+ scaleUpBackoffC = scaleUpBackoff.C
+ } else {
+ scaleUpC = T.scaleUp
+ }
+
+ var idleTimeoutC <-chan time.Time
+ if idleTimeout != nil {
+ idleTimeoutC = idleTimeout.C
+ }
+
+ select {
+ case <-scaleUpC:
+ if err := T.ScaleUp(); err != nil {
+ log.Printf("error scaling up: %v", err)
+
+ scaleUpBackoffDuration = T.config.ReconnectInitialTime
+ if scaleUpBackoffDuration != 0 {
+ if scaleUpBackoff == nil {
+ scaleUpBackoff = time.NewTimer(scaleUpBackoffDuration)
+ } else {
+ scaleUpBackoff.Reset(scaleUpBackoffDuration)
+ }
+ }
+ }
+ case <-scaleUpBackoffC:
+ if err := T.ScaleUp(); err != nil {
+ log.Printf("error scaling up: %v", err)
+
+ scaleUpBackoffDuration *= 2
+ scaleUpBackoff.Reset(scaleUpBackoffDuration)
+ } else {
+ scaleUpBackoffDuration = 0
+ }
+ case <-idleTimeoutC:
+ idleTimeout.Reset(T.ScaleDown())
+ case <-T.closed:
+ return
+ }
+ }
+}
+
+func (T *Backend) Close() {
+ close(T.closed)
+}
diff --git a/lib/pool/config.go b/lib/pool/config.go
new file mode 100644
index 00000000..8fa522a6
--- /dev/null
+++ b/lib/pool/config.go
@@ -0,0 +1,12 @@
+package pool
+
+import "time"
+
+type BackendConfig struct {
+ NewPooler func() Pooler
+
+ IdleTimeout time.Duration
+
+ ReconnectInitialTime time.Duration
+ ReconnectMaxTime time.Duration
+}
diff --git a/lib/pool/errors.go b/lib/pool/errors.go
new file mode 100644
index 00000000..4fbd6ab2
--- /dev/null
+++ b/lib/pool/errors.go
@@ -0,0 +1,7 @@
+package pool
+
+import "errors"
+
+var (
+ ErrNoScalableRecipe = errors.New("no scalable recipe")
+)
diff --git a/lib/pool/frontend.go b/lib/pool/frontend.go
new file mode 100644
index 00000000..799c9321
--- /dev/null
+++ b/lib/pool/frontend.go
@@ -0,0 +1,5 @@
+package pool
+
+type Frontend struct {
+ Backend *Backend
+}
diff --git a/lib/gat/pool/pooler.go b/lib/pool/pooler.go
similarity index 100%
rename from lib/gat/pool/pooler.go
rename to lib/pool/pooler.go
diff --git a/lib/gat/pool/recipe/config.go b/lib/pool/recipe/config.go
similarity index 100%
rename from lib/gat/pool/recipe/config.go
rename to lib/pool/recipe/config.go
diff --git a/lib/gat/pool/recipe/dialer.go b/lib/pool/recipe/dialer.go
similarity index 91%
rename from lib/gat/pool/recipe/dialer.go
rename to lib/pool/recipe/dialer.go
index 8202c2b9..3ce99919 100644
--- a/lib/gat/pool/recipe/dialer.go
+++ b/lib/pool/recipe/dialer.go
@@ -5,8 +5,6 @@ import (
"net"
"gfx.cafe/gfx/pggat/lib/auth"
- "gfx.cafe/gfx/pggat/lib/bouncer"
- "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0"
"gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/util/strutil"
)
@@ -15,7 +13,7 @@ type Dialer struct {
Network string
Address string
- SSLMode bouncer.SSLMode
+ SSLMode bounce.SSLMode
SSLConfig *tls.Config
Username string
Credentials auth.Credentials
diff --git a/lib/gat/pool/recipe/recipe.go b/lib/pool/recipe/recipe.go
similarity index 100%
rename from lib/gat/pool/recipe/recipe.go
rename to lib/pool/recipe/recipe.go
diff --git a/test/config.go b/test/config.go
index bc13828d..e66fc44e 100644
--- a/test/config.go
+++ b/test/config.go
@@ -1,7 +1,7 @@
package test
import (
- "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
+ "gfx.cafe/gfx/pggat/lib/pool/recipe"
)
type Config struct {
diff --git a/test/runner.go b/test/runner.go
index d23e94c6..499127bc 100644
--- a/test/runner.go
+++ b/test/runner.go
@@ -8,8 +8,8 @@ import (
"gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2"
"gfx.cafe/gfx/pggat/lib/fed"
packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
- "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
"gfx.cafe/gfx/pggat/lib/gsql"
+ "gfx.cafe/gfx/pggat/lib/pool/recipe"
"gfx.cafe/gfx/pggat/lib/util/flip"
"gfx.cafe/gfx/pggat/test/inst"
)
diff --git a/test/tester_test.go b/test/tester_test.go
index 6ef9b431..63c5cd05 100644
--- a/test/tester_test.go
+++ b/test/tester_test.go
@@ -12,16 +12,17 @@ import (
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+
"gfx.cafe/gfx/pggat/lib/auth/credentials"
"gfx.cafe/gfx/pggat/lib/gat"
"gfx.cafe/gfx/pggat/lib/gat/gatcaddyfile"
pool_handler "gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
"gfx.cafe/gfx/pggat/lib/gat/handlers/rewrite_password"
"gfx.cafe/gfx/pggat/lib/gat/matchers"
- "gfx.cafe/gfx/pggat/lib/gat/pool"
- "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
"gfx.cafe/gfx/pggat/lib/gat/poolers/session"
"gfx.cafe/gfx/pggat/lib/gat/poolers/transaction"
+ "gfx.cafe/gfx/pggat/lib/pool/recipe"
"gfx.cafe/gfx/pggat/test"
"gfx.cafe/gfx/pggat/test/tests"
)
--
GitLab