From 64864afef188b28bd8582f99cb29f70bfa287a81 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Thu, 12 Oct 2023 18:57:51 -0500 Subject: [PATCH] Revert "hmmmm" This reverts commit 62208f73f9be166a3fe85f2a51765ccfdb53a230. --- lib/{bounce => bouncer}/backends/v0/accept.go | 3 +- lib/{bounce => bouncer}/backends/v0/cancel.go | 0 .../backends/v0/context.go | 0 lib/{bounce => bouncer}/backends/v0/errors.go | 0 .../backends/v0/options.go | 3 +- lib/{bounce => bouncer}/backends/v0/query.go | 0 .../bouncers/v2/bouncer.go | 0 .../frontends/v0/accept.go | 0 .../frontends/v0/authenticate.go | 0 .../frontends/v0/context.go | 0 .../frontends/v0/options.go | 0 .../frontends/v0/params.go | 0 lib/{bounce => bouncer}/sslmode.go | 2 +- lib/gat/gatcaddyfile/handler.go | 9 +- lib/gat/handlers/discovery/config.go | 3 +- .../google_cloud_sql/discoverer.go | 7 +- lib/gat/handlers/discovery/module.go | 29 +- lib/gat/handlers/pgbouncer/config.go | 9 +- lib/gat/handlers/pgbouncer/module.go | 11 +- lib/gat/handlers/pgbouncer_spilo/module.go | 5 +- lib/gat/handlers/pool/config.go | 4 +- lib/gat/handlers/pool/module.go | 6 +- lib/gat/pool/client.go | 55 ++ lib/gat/pool/config.go | 61 +++ lib/gat/pool/conn.go | 120 +++++ lib/gat/pool/errors.go | 5 + lib/gat/pool/flow.go | 115 ++++ lib/gat/pool/pool.go | 497 ++++++++++++++++++ lib/{ => gat}/pool/pooler.go | 0 lib/gat/pool/recipe.go | 5 + lib/{ => gat}/pool/recipe/config.go | 0 lib/{ => gat}/pool/recipe/dialer.go | 4 +- lib/{ => gat}/pool/recipe/recipe.go | 0 lib/gat/pool/scaler.go | 115 ++++ lib/gat/pool/server.go | 57 ++ lib/gat/pool/withcredentials.go | 8 + lib/pool/backend.go | 209 -------- lib/pool/config.go | 12 - lib/pool/errors.go | 7 - lib/pool/frontend.go | 5 - test/config.go | 2 +- test/runner.go | 2 +- test/tester_test.go | 5 +- 43 files changed, 1095 insertions(+), 280 deletions(-) rename lib/{bounce => bouncer}/backends/v0/accept.go (99%) rename lib/{bounce => bouncer}/backends/v0/cancel.go (100%) rename lib/{bounce => bouncer}/backends/v0/context.go (100%) rename lib/{bounce => bouncer}/backends/v0/errors.go (100%) rename lib/{bounce => bouncer}/backends/v0/options.go (81%) rename lib/{bounce => bouncer}/backends/v0/query.go (100%) rename lib/{bounce => bouncer}/bouncers/v2/bouncer.go (100%) rename lib/{bounce => bouncer}/frontends/v0/accept.go (100%) rename lib/{bounce => bouncer}/frontends/v0/authenticate.go (100%) rename lib/{bounce => bouncer}/frontends/v0/context.go (100%) rename lib/{bounce => bouncer}/frontends/v0/options.go (100%) rename lib/{bounce => bouncer}/frontends/v0/params.go (100%) rename lib/{bounce => bouncer}/sslmode.go (97%) create mode 100644 lib/gat/pool/client.go create mode 100644 lib/gat/pool/config.go create mode 100644 lib/gat/pool/conn.go create mode 100644 lib/gat/pool/errors.go create mode 100644 lib/gat/pool/flow.go create mode 100644 lib/gat/pool/pool.go rename lib/{ => gat}/pool/pooler.go (100%) create mode 100644 lib/gat/pool/recipe.go rename lib/{ => gat}/pool/recipe/config.go (100%) rename lib/{ => gat}/pool/recipe/dialer.go (91%) rename lib/{ => gat}/pool/recipe/recipe.go (100%) create mode 100644 lib/gat/pool/scaler.go create mode 100644 lib/gat/pool/server.go create mode 100644 lib/gat/pool/withcredentials.go delete mode 100644 lib/pool/backend.go delete mode 100644 lib/pool/config.go delete mode 100644 lib/pool/errors.go delete mode 100644 lib/pool/frontend.go diff --git a/lib/bounce/backends/v0/accept.go b/lib/bouncer/backends/v0/accept.go similarity index 99% rename from lib/bounce/backends/v0/accept.go rename to lib/bouncer/backends/v0/accept.go index 7f29f985..d094f2f1 100644 --- a/lib/bounce/backends/v0/accept.go +++ b/lib/bouncer/backends/v0/accept.go @@ -6,6 +6,7 @@ import ( "io" "gfx.cafe/gfx/pggat/lib/auth" + "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/fed" packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -345,7 +346,7 @@ func accept(ctx *acceptContext) error { func Accept( conn *fed.Conn, - sslMode bounce.SSLMode, + sslMode bouncer.SSLMode, sslConfig *tls.Config, username string, credentials auth.Credentials, diff --git a/lib/bounce/backends/v0/cancel.go b/lib/bouncer/backends/v0/cancel.go similarity index 100% rename from lib/bounce/backends/v0/cancel.go rename to lib/bouncer/backends/v0/cancel.go diff --git a/lib/bounce/backends/v0/context.go b/lib/bouncer/backends/v0/context.go similarity index 100% rename from lib/bounce/backends/v0/context.go rename to lib/bouncer/backends/v0/context.go diff --git a/lib/bounce/backends/v0/errors.go b/lib/bouncer/backends/v0/errors.go similarity index 100% rename from lib/bounce/backends/v0/errors.go rename to lib/bouncer/backends/v0/errors.go diff --git a/lib/bounce/backends/v0/options.go b/lib/bouncer/backends/v0/options.go similarity index 81% rename from lib/bounce/backends/v0/options.go rename to lib/bouncer/backends/v0/options.go index f23feb09..9d98d466 100644 --- a/lib/bounce/backends/v0/options.go +++ b/lib/bouncer/backends/v0/options.go @@ -4,11 +4,12 @@ import ( "crypto/tls" "gfx.cafe/gfx/pggat/lib/auth" + "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/util/strutil" ) type acceptOptions struct { - SSLMode bounce.SSLMode + SSLMode bouncer.SSLMode SSLConfig *tls.Config Username string Credentials auth.Credentials diff --git a/lib/bounce/backends/v0/query.go b/lib/bouncer/backends/v0/query.go similarity index 100% rename from lib/bounce/backends/v0/query.go rename to lib/bouncer/backends/v0/query.go diff --git a/lib/bounce/bouncers/v2/bouncer.go b/lib/bouncer/bouncers/v2/bouncer.go similarity index 100% rename from lib/bounce/bouncers/v2/bouncer.go rename to lib/bouncer/bouncers/v2/bouncer.go diff --git a/lib/bounce/frontends/v0/accept.go b/lib/bouncer/frontends/v0/accept.go similarity index 100% rename from lib/bounce/frontends/v0/accept.go rename to lib/bouncer/frontends/v0/accept.go diff --git a/lib/bounce/frontends/v0/authenticate.go b/lib/bouncer/frontends/v0/authenticate.go similarity index 100% rename from lib/bounce/frontends/v0/authenticate.go rename to lib/bouncer/frontends/v0/authenticate.go diff --git a/lib/bounce/frontends/v0/context.go b/lib/bouncer/frontends/v0/context.go similarity index 100% rename from lib/bounce/frontends/v0/context.go rename to lib/bouncer/frontends/v0/context.go diff --git a/lib/bounce/frontends/v0/options.go b/lib/bouncer/frontends/v0/options.go similarity index 100% rename from lib/bounce/frontends/v0/options.go rename to lib/bouncer/frontends/v0/options.go diff --git a/lib/bounce/frontends/v0/params.go b/lib/bouncer/frontends/v0/params.go similarity index 100% rename from lib/bounce/frontends/v0/params.go rename to lib/bouncer/frontends/v0/params.go diff --git a/lib/bounce/sslmode.go b/lib/bouncer/sslmode.go similarity index 97% rename from lib/bounce/sslmode.go rename to lib/bouncer/sslmode.go index 335e52ab..cb8b63d5 100644 --- a/lib/bounce/sslmode.go +++ b/lib/bouncer/sslmode.go @@ -1,4 +1,4 @@ -package bounce +package bouncer type SSLMode string diff --git a/lib/gat/gatcaddyfile/handler.go b/lib/gat/gatcaddyfile/handler.go index 501c4173..75847872 100644 --- a/lib/gat/gatcaddyfile/handler.go +++ b/lib/gat/gatcaddyfile/handler.go @@ -13,6 +13,7 @@ import ( "github.com/caddyserver/caddy/v2/caddyconfig" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" + "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/gat/handlers/allowed_startup_parameters" "gfx.cafe/gfx/pggat/lib/gat/handlers/discovery" "gfx.cafe/gfx/pggat/lib/gat/handlers/pgbouncer" @@ -140,7 +141,7 @@ func init() { "pooler", warnings, ), - ServerSSLMode: bounce.SSLModePrefer, + ServerSSLMode: bouncer.SSLModePrefer, ServerSSL: JSONModuleObject( &insecure_skip_verify.Client{}, SSLClient, @@ -220,7 +221,7 @@ func init() { return nil, d.ArgErr() } - module.ServerSSLMode = bounce.SSLMode(d.Val()) + module.ServerSSLMode = bouncer.SSLMode(d.Val()) if !d.NextArg() { return nil, d.ArgErr() @@ -389,7 +390,7 @@ func init() { warnings, ), - ServerSSLMode: bounce.SSLModePrefer, + ServerSSLMode: bouncer.SSLModePrefer, ServerSSL: JSONModuleObject( &insecure_skip_verify.Client{}, SSLClient, @@ -454,7 +455,7 @@ func init() { return nil, d.ArgErr() } - module.ServerSSLMode = bounce.SSLMode(d.Val()) + module.ServerSSLMode = bouncer.SSLMode(d.Val()) if !d.NextArg() { return nil, d.ArgErr() diff --git a/lib/gat/handlers/discovery/config.go b/lib/gat/handlers/discovery/config.go index bb4024b6..b2712561 100644 --- a/lib/gat/handlers/discovery/config.go +++ b/lib/gat/handlers/discovery/config.go @@ -3,6 +3,7 @@ package discovery import ( "encoding/json" + "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/util/dur" ) @@ -14,7 +15,7 @@ type Config struct { Pooler json.RawMessage `json:"pooler" caddy:"namespace=pggat.poolers inline_key=pooler"` - ServerSSLMode bounce.SSLMode `json:"server_ssl_mode,omitempty"` + ServerSSLMode bouncer.SSLMode `json:"server_ssl_mode,omitempty"` ServerSSL json.RawMessage `json:"server_ssl,omitempty" caddy:"namespace=pggat.ssl.clients inline_key=provider"` ServerStartupParameters map[string]string `json:"server_startup_parameters,omitempty"` diff --git a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go index cbaa009b..6bb9351b 100644 --- a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go +++ b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go @@ -5,13 +5,16 @@ import ( "net" "strings" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" + "github.com/caddyserver/caddy/v2" sqladmin "google.golang.org/api/sqladmin/v1beta4" "gfx.cafe/gfx/pggat/lib/gat/handlers/discovery" - "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/auth/credentials" + "gfx.cafe/gfx/pggat/lib/bouncer" + "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gsql" ) @@ -117,7 +120,7 @@ func (T *Discoverer) instanceToCluster(primary *sqladmin.DatabaseInstance, repli admin, err = recipe.Dialer{ Network: "tcp", Address: primaryAddress, - SSLMode: bounce.SSLModePrefer, + SSLMode: bouncer.SSLModePrefer, SSLConfig: &tls.Config{ InsecureSkipVerify: true, }, diff --git a/lib/gat/handlers/discovery/module.go b/lib/gat/handlers/discovery/module.go index 73e4bf49..bd6614d2 100644 --- a/lib/gat/handlers/discovery/module.go +++ b/lib/gat/handlers/discovery/module.go @@ -9,15 +9,14 @@ import ( "github.com/caddyserver/caddy/v2" "go.uber.org/zap" - "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/auth" "gfx.cafe/gfx/pggat/lib/auth/credentials" "gfx.cafe/gfx/pggat/lib/bouncer/frontends/v0" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" - recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe" + "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/maps" "gfx.cafe/gfx/pggat/lib/util/slices" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -224,7 +223,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo for _, user := range users { primaryCreds, _ := T.creds(user) for _, database := range databases { - primary := recipe2.Dialer{ + primary := recipe.Dialer{ Network: endpoint.Network, Address: endpoint.Address, Username: user.Username, @@ -241,7 +240,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo } p.RemoveRecipe("primary") - p.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{ + p.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ Dialer: primary, })) } @@ -259,7 +258,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas } for id, r := range replicas { - replica := recipe2.Dialer{ + replica := recipe.Dialer{ Network: r.Network, Address: r.Address, Username: user.Username, @@ -269,7 +268,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas SSLConfig: T.sslConfig, StartupParameters: T.serverStartupParameters, } - replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ + replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ Dialer: replica, })) } @@ -298,7 +297,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin continue } - replica := recipe2.Dialer{ + replica := recipe.Dialer{ Network: endpoint.Network, Address: endpoint.Address, Username: user.Username, @@ -308,7 +307,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin SSLConfig: T.sslConfig, StartupParameters: T.serverStartupParameters, } - p.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ + p.AddRecipe(id, recipe.NewRecipe(recipe.Config{ Dialer: replica, })) } @@ -332,7 +331,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, replicaUsername := T.replicaUsername(user.Username) primaryCreds, replicaCreds := T.creds(user) for _, database := range databases { - base := recipe2.Dialer{ + base := recipe.Dialer{ Username: user.Username, Credentials: primaryCreds, Database: database, @@ -349,7 +348,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, Pool: T.pooler.NewPool(), Credentials: primaryCreds, } - primaryPool.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{ + primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ Dialer: primary, })) T.addPool(user.Username, database, primaryPool) @@ -364,7 +363,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, replica := base replica.Network = r.Network replica.Address = r.Address - replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ + replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ Dialer: replica, })) } @@ -391,7 +390,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo replicaUsername := T.replicaUsername(user.Username) primaryCreds, replicaCreds := T.creds(user) - base := recipe2.Dialer{ + base := recipe.Dialer{ Username: user.Username, Credentials: primaryCreds, Database: database, @@ -408,7 +407,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo Pool: T.pooler.NewPool(), Credentials: primaryCreds, } - primaryPool.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{ + primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ Dialer: primary, })) T.addPool(user.Username, database, primaryPool) @@ -423,7 +422,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo replica := base replica.Network = r.Network replica.Address = r.Address - replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ + replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ Dialer: replica, })) } diff --git a/lib/gat/handlers/pgbouncer/config.go b/lib/gat/handlers/pgbouncer/config.go index 146c519a..6f3b179a 100644 --- a/lib/gat/handlers/pgbouncer/config.go +++ b/lib/gat/handlers/pgbouncer/config.go @@ -8,6 +8,7 @@ import ( "github.com/caddyserver/caddy/v2/caddyconfig" + "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/ssl/servers/x509_key_pair" "gfx.cafe/gfx/pggat/lib/util/encoding/ini" @@ -110,7 +111,7 @@ type PgBouncer struct { DnsNxdomainTtl float64 `ini:"dns_nxdomain_ttl"` DnsZoneCheckPeriod float64 `ini:"dns_zone_check_period"` ResolvConf string `ini:"resolv.conf"` - ClientTLSSSLMode bounce.SSLMode `ini:"client_tls_sslmode"` + ClientTLSSSLMode bouncer.SSLMode `ini:"client_tls_sslmode"` ClientTLSKeyFile string `ini:"client_tls_key_file"` ClientTLSCertFile string `ini:"client_tls_cert_file"` ClientTLSCaFile string `ini:"client_tls_ca_file"` @@ -118,7 +119,7 @@ type PgBouncer struct { ClientTLSCiphers []TLSCipher `ini:"client_tls_ciphers"` ClientTLSECDHCurve TLSECDHCurve `ini:"client_tls_ecdhcurve"` ClientTLSDHEParams TLSDHEParams `ini:"client_tls_dheparams"` - ServerTLSSSLMode bounce.SSLMode `ini:"server_tls_sslmode"` + ServerTLSSSLMode bouncer.SSLMode `ini:"server_tls_sslmode"` ServerTLSCaFile string `ini:"server_tls_ca_file"` ServerTLSKeyFile string `ini:"server_tls_key_file"` ServerTLSCertFile string `ini:"server_tls_cert_file"` @@ -210,7 +211,7 @@ var Default = Config{ AutodbIdleTimeout: 3600.0, DnsMaxTtl: 15.0, DnsNxdomainTtl: 15.0, - ClientTLSSSLMode: bounce.SSLModeDisable, + ClientTLSSSLMode: bouncer.SSLModeDisable, ClientTLSProtocols: []TLSProtocol{ TLSProtocolSecure, }, @@ -218,7 +219,7 @@ var Default = Config{ "fast", }, ClientTLSECDHCurve: "auto", - ServerTLSSSLMode: bounce.SSLModePrefer, + ServerTLSSSLMode: bouncer.SSLModePrefer, ServerTLSProtocols: []TLSProtocol{ TLSProtocolSecure, }, diff --git a/lib/gat/handlers/pgbouncer/module.go b/lib/gat/handlers/pgbouncer/module.go index 962c45f3..a5a8f025 100644 --- a/lib/gat/handlers/pgbouncer/module.go +++ b/lib/gat/handlers/pgbouncer/module.go @@ -19,15 +19,14 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/poolers/session" "gfx.cafe/gfx/pggat/lib/gat/poolers/transaction" "gfx.cafe/gfx/pggat/lib/perror" - recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/dur" "gfx.cafe/gfx/pggat/lib/util/slices" - "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/auth/credentials" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" + "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gsql" "gfx.cafe/gfx/pggat/lib/util/maps" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -215,7 +214,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { serverCreds = credentials.FromString(user, db.Password) } - dialer := recipe2.Dialer{ + dialer := recipe.Dialer{ SSLMode: T.Config.PgBouncer.ServerTLSSSLMode, SSLConfig: &tls.Config{ InsecureSkipVerify: true, // TODO(garet) @@ -255,7 +254,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { dialer.Address = address } - recipeOptions := recipe2.Config{ + recipeOptions := recipe.Config{ Dialer: dialer, MinConnections: db.MinPoolSize, MaxConnections: db.MaxDBConnections, @@ -266,7 +265,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { if recipeOptions.MaxConnections == 0 { recipeOptions.MaxConnections = T.Config.PgBouncer.MaxDBConnections } - r := recipe2.NewRecipe(recipeOptions) + r := recipe.NewRecipe(recipeOptions) p.AddRecipe("pgbouncer", r) diff --git a/lib/gat/handlers/pgbouncer_spilo/module.go b/lib/gat/handlers/pgbouncer_spilo/module.go index a71759bd..dc41720b 100644 --- a/lib/gat/handlers/pgbouncer_spilo/module.go +++ b/lib/gat/handlers/pgbouncer_spilo/module.go @@ -7,6 +7,7 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/handlers/pgbouncer" + "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -51,12 +52,12 @@ func (T *Module) Provision(ctx caddy.Context) error { pgb.PgBouncer.LogFile = "/var/log/pgbouncer/pgbouncer.log" pgb.PgBouncer.PidFile = "/var/run/pgbouncer/pgbouncer.pid" - pgb.PgBouncer.ServerTLSSSLMode = bounce.SSLModeRequire + pgb.PgBouncer.ServerTLSSSLMode = bouncer.SSLModeRequire pgb.PgBouncer.ServerTLSCaFile = "/etc/ssl/certs/pgbouncer.crt" pgb.PgBouncer.ServerTLSProtocols = []pgbouncer.TLSProtocol{ pgbouncer.TLSProtocolSecure, } - pgb.PgBouncer.ClientTLSSSLMode = bounce.SSLModeRequire + pgb.PgBouncer.ClientTLSSSLMode = bouncer.SSLModeRequire pgb.PgBouncer.ClientTLSKeyFile = "/etc/ssl/certs/pgbouncer.key" pgb.PgBouncer.ClientTLSCertFile = "/etc/ssl/certs/pgbouncer.crt" diff --git a/lib/gat/handlers/pool/config.go b/lib/gat/handlers/pool/config.go index 49aa27a3..5a2fc336 100644 --- a/lib/gat/handlers/pool/config.go +++ b/lib/gat/handlers/pool/config.go @@ -2,6 +2,8 @@ package pool_handler import ( "encoding/json" + + "gfx.cafe/gfx/pggat/lib/bouncer" ) type Config struct { @@ -9,7 +11,7 @@ type Config struct { // Server connect options ServerAddress string `jsonn:"server_address"` - ServerSSLMode bounce.SSLMode `json:"server_ssl_mode,omitempty"` + ServerSSLMode bouncer.SSLMode `json:"server_ssl_mode,omitempty"` ServerSSL json.RawMessage `json:"server_ssl,omitempty" caddy:"namespace=pggat.ssl.clients inline_key=provider"` // Server routing options diff --git a/lib/gat/handlers/pool/module.go b/lib/gat/handlers/pool/module.go index bf627ddb..61b0085a 100644 --- a/lib/gat/handlers/pool/module.go +++ b/lib/gat/handlers/pool/module.go @@ -12,7 +12,7 @@ import ( "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" - recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/strutil" ) @@ -65,7 +65,7 @@ func (T *Module) Provision(ctx caddy.Context) error { network = "tcp" } - d := recipe2.Dialer{ + d := recipe.Dialer{ Network: network, Address: T.ServerAddress, SSLMode: T.ServerSSLMode, @@ -77,7 +77,7 @@ func (T *Module) Provision(ctx caddy.Context) error { } T.pool = pooler.NewPool() - T.pool.AddRecipe("pool", recipe2.NewRecipe(recipe2.Config{ + T.pool.AddRecipe("pool", recipe.NewRecipe(recipe.Config{ Dialer: d, })) diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go new file mode 100644 index 00000000..c6d3c76e --- /dev/null +++ b/lib/gat/pool/client.go @@ -0,0 +1,55 @@ +package pool + +import ( + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/unterminate" +) + +type pooledClient struct { + pooledConn + + ps *ps.Client + eqp *eqp.Client +} + +func newClient( + options Config, + conn *fed.Conn, +) *pooledClient { + conn.Middleware = append( + conn.Middleware, + unterminate.Unterminate, + ) + + var psClient *ps.Client + if options.ParameterStatusSync == ParameterStatusSyncDynamic { + // add ps middleware + psClient = ps.NewClient(conn.InitialParameters) + conn.Middleware = append(conn.Middleware, psClient) + } + + var eqpClient *eqp.Client + if options.ExtendedQuerySync { + // add eqp middleware + eqpClient = eqp.NewClient() + conn.Middleware = append(conn.Middleware, eqpClient) + } + + return &pooledClient{ + pooledConn: makeConn( + conn, + ), + ps: psClient, + eqp: eqpClient, + } +} + +func (T *pooledClient) GetEQP() *eqp.Client { + return T.eqp +} + +func (T *pooledClient) GetPS() *ps.Client { + return T.ps +} diff --git a/lib/gat/pool/config.go b/lib/gat/pool/config.go new file mode 100644 index 00000000..36604526 --- /dev/null +++ b/lib/gat/pool/config.go @@ -0,0 +1,61 @@ +package pool + +import ( + "go.uber.org/zap" + + "gfx.cafe/gfx/pggat/lib/util/dur" + "gfx.cafe/gfx/pggat/lib/util/strutil" +) + +type ParameterStatusSync int + +const ( + // ParameterStatusSyncNone does not attempt to sync parameter status. + ParameterStatusSyncNone ParameterStatusSync = iota + // ParameterStatusSyncInitial assumes both client and server have their initial status before syncing. + // Use in session pooling for lower latency + ParameterStatusSyncInitial + // ParameterStatusSyncDynamic will track parameter status and ensure they are synced correctly. + // Use in transaction pooling + ParameterStatusSyncDynamic +) + +type PoolingConfig struct { + NewPooler func() Pooler + // ReleaseAfterTransaction toggles whether servers should be released and re acquired after each transaction. + // Use false for lower latency + // Use true for better balancing + ReleaseAfterTransaction bool + + // ParameterStatusSync is the parameter syncing mode + ParameterStatusSync ParameterStatusSync + + // ExtendedQuerySync controls whether prepared statements and portals should be tracked and synced before use. + // Use false for lower latency + // Use true for transaction pooling + ExtendedQuerySync bool +} + +type ManagementConfig struct { + ServerResetQuery string `json:"server_reset_query,omitempty"` + // ServerIdleTimeout defines how long a server may be idle before it is disconnected + ServerIdleTimeout dur.Duration `json:"server_idle_timeout,omitempty"` + + // ServerReconnectInitialTime defines how long to wait initially before attempting a server reconnect + // 0 = disable, don't retry + ServerReconnectInitialTime dur.Duration `json:"server_reconnect_initial_time,omitempty"` + // ServerReconnectMaxTime defines the max amount of time to wait before attempting a server reconnect + // 0 = disable, back off infinitely + ServerReconnectMaxTime dur.Duration `json:"server_reconnect_max_time,omitempty"` + + // TrackedParameters are parameters which should be synced by updating the server, not the client. + TrackedParameters []strutil.CIString `json:"tracked_parameters,omitempty"` +} + +type Config struct { + PoolingConfig + + ManagementConfig + + Logger *zap.Logger +} diff --git a/lib/gat/pool/conn.go b/lib/gat/pool/conn.go new file mode 100644 index 00000000..065a6458 --- /dev/null +++ b/lib/gat/pool/conn.go @@ -0,0 +1,120 @@ +package pool + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/gat/metrics" + "gfx.cafe/gfx/pggat/lib/util/strutil" +) + +type pooledConn struct { + id uuid.UUID + + conn *fed.Conn + + // metrics + + transactionCount atomic.Int64 + + lastMetricsRead time.Time + + state metrics.ConnState + peer uuid.UUID + since time.Time + + util [metrics.ConnStateCount]time.Duration + + mu sync.RWMutex +} + +func makeConn( + conn *fed.Conn, +) pooledConn { + return pooledConn{ + id: uuid.New(), + conn: conn, + + since: time.Now(), + } +} + +func (T *pooledConn) GetID() uuid.UUID { + return T.id +} + +func (T *pooledConn) GetConn() *fed.Conn { + return T.conn +} + +func (T *pooledConn) GetInitialParameters() map[strutil.CIString]string { + return T.conn.InitialParameters +} + +func (T *pooledConn) GetBackendKey() [8]byte { + return T.conn.BackendKey +} + +func (T *pooledConn) TransactionComplete() { + T.transactionCount.Add(1) +} + +func (T *pooledConn) SetState(state metrics.ConnState, peer uuid.UUID) { + T.mu.Lock() + defer T.mu.Unlock() + + now := time.Now() + + var since time.Duration + if T.since.Before(T.lastMetricsRead) { + since = now.Sub(T.lastMetricsRead) + } else { + since = now.Sub(T.since) + } + T.util[T.state] += since + + T.state = state + T.peer = peer + T.since = now +} + +func (T *pooledConn) GetState() (state metrics.ConnState, peer uuid.UUID, since time.Time) { + T.mu.RLock() + defer T.mu.RUnlock() + state = T.state + peer = T.peer + since = T.since + return +} + +func (T *pooledConn) ReadMetrics(m *metrics.Conn) { + T.mu.Lock() + defer T.mu.Unlock() + + now := time.Now() + + m.Time = now + + m.State = T.state + m.Peer = T.peer + m.Since = T.since + + m.Utilization = T.util + T.util = [metrics.ConnStateCount]time.Duration{} + + var since time.Duration + if m.Since.Before(T.lastMetricsRead) { + since = now.Sub(T.lastMetricsRead) + } else { + since = now.Sub(m.Since) + } + m.Utilization[m.State] += since + + m.TransactionCount = int(T.transactionCount.Swap(0)) + + T.lastMetricsRead = now +} diff --git a/lib/gat/pool/errors.go b/lib/gat/pool/errors.go new file mode 100644 index 00000000..b0a18cec --- /dev/null +++ b/lib/gat/pool/errors.go @@ -0,0 +1,5 @@ +package pool + +import "errors" + +var ErrClosed = errors.New("pool closed") diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go new file mode 100644 index 00000000..e1263621 --- /dev/null +++ b/lib/gat/pool/flow.go @@ -0,0 +1,115 @@ +package pool + +import ( + "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" + packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" + "gfx.cafe/gfx/pggat/lib/gat/metrics" + "gfx.cafe/gfx/pggat/lib/util/slices" +) + +func pair(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) { + defer func() { + client.SetState(metrics.ConnStateActive, server.GetID()) + server.SetState(metrics.ConnStateActive, client.GetID()) + }() + + if options.ParameterStatusSync != ParameterStatusSyncNone || options.ExtendedQuerySync { + client.SetState(metrics.ConnStatePairing, server.GetID()) + server.SetState(metrics.ConnStatePairing, client.GetID()) + } + + switch options.ParameterStatusSync { + case ParameterStatusSyncDynamic: + clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), client.GetPS(), server.GetConn(), server.GetPS()) + case ParameterStatusSyncInitial: + clientErr, serverErr = syncInitialParameters(options, client, server) + } + + if clientErr != nil || serverErr != nil { + return + } + + if options.ExtendedQuerySync { + serverErr = eqp.Sync(client.GetEQP(), server.GetConn(), server.GetEQP()) + } + + return +} + +func syncInitialParameters(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) { + clientParams := client.GetInitialParameters() + serverParams := server.GetInitialParameters() + + var packet fed.Packet + + for key, value := range clientParams { + // skip already set params + if serverParams[key] == value { + p := packets.ParameterStatus{ + Key: key.String(), + Value: serverParams[key], + } + packet = p.IntoPacket(packet) + clientErr = client.GetConn().WritePacket(packet) + if clientErr != nil { + return + } + continue + } + + setServer := slices.Contains(options.TrackedParameters, key) + + if !setServer { + value = serverParams[key] + } + + p := packets.ParameterStatus{ + Key: key.String(), + Value: value, + } + packet = p.IntoPacket(packet) + clientErr = client.GetConn().WritePacket(packet) + if clientErr != nil { + return + } + + if !setServer { + continue + } + + serverErr, _, packet = backends.SetParameter(server.GetConn(), nil, packet, key, value) + if serverErr != nil { + return + } + } + + for key, value := range serverParams { + if _, ok := clientParams[key]; ok { + continue + } + + // Don't need to run reset on server because it will reset it to the initial value + + // send to client + p := packets.ParameterStatus{ + Key: key.String(), + Value: value, + } + packet = p.IntoPacket(packet) + clientErr = client.GetConn().WritePacket(packet) + if clientErr != nil { + return + } + } + + return + +} + +func transactionComplete(client *pooledClient, server *pooledServer) { + client.TransactionComplete() + server.TransactionComplete() +} diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go new file mode 100644 index 00000000..8399e39b --- /dev/null +++ b/lib/gat/pool/pool.go @@ -0,0 +1,497 @@ +package pool + +import ( + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "go.uber.org/zap" + + "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" + "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" + "gfx.cafe/gfx/pggat/lib/fed" + packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" + "gfx.cafe/gfx/pggat/lib/gat/metrics" + "gfx.cafe/gfx/pggat/lib/util/slices" +) + +type Pool struct { + config Config + pooler Pooler + + closed chan struct{} + + pendingCount atomic.Int64 + pending chan struct{} + + recipes map[string]*Recipe + recipeScaleOrder slices.Sorted[string] + clients map[uuid.UUID]*pooledClient + clientsByKey map[[8]byte]*pooledClient + servers map[uuid.UUID]*pooledServer + serversByRecipe map[string][]*pooledServer + mu sync.RWMutex +} + +func NewPool(config Config) *Pool { + if config.NewPooler == nil { + panic("expected new pooler func") + } + pooler := config.NewPooler() + if pooler == nil { + panic("expected pooler") + } + + p := &Pool{ + config: config, + pooler: pooler, + + closed: make(chan struct{}), + pending: make(chan struct{}, 1), + } + + s := newScaler(p) + go s.Run() + + return p +} + +func (T *Pool) idlest() (server *pooledServer, at time.Time) { + T.mu.RLock() + defer T.mu.RUnlock() + + for _, s := range T.servers { + state, _, since := s.GetState() + if state != metrics.ConnStateIdle { + continue + } + + if at == (time.Time{}) || since.Before(at) { + server = s + at = since + } + } + + return +} + +func (T *Pool) AddRecipe(name string, r *Recipe) { + func() { + T.mu.Lock() + defer T.mu.Unlock() + + T.removeRecipe(name) + + if T.recipes == nil { + T.recipes = make(map[string]*Recipe) + } + T.recipes[name] = r + + // add to front of scale order + T.recipeScaleOrder = T.recipeScaleOrder.Insert(name, func(n string) int { + return len(T.serversByRecipe[n]) + }) + }() + + count := r.AllocateInitial() + for i := 0; i < count; i++ { + if err := T.scaleUpL1(name, r); err != nil { + T.config.Logger.Warn("failed to dial server", zap.Error(err)) + for j := i; j < count; j++ { + r.Free() + } + break + } + } +} + +func (T *Pool) RemoveRecipe(name string) { + T.mu.Lock() + defer T.mu.Unlock() + + T.removeRecipe(name) +} + +func (T *Pool) removeRecipe(name string) { + r, ok := T.recipes[name] + if !ok { + return + } + delete(T.recipes, name) + + servers := T.serversByRecipe[name] + delete(T.serversByRecipe, name) + // remove from recipeScaleOrder + T.recipeScaleOrder = slices.Delete(T.recipeScaleOrder, name) + + for _, server := range servers { + r.Free() + T.removeServerL1(server) + } +} + +func (T *Pool) scaleUpL0() (string, *Recipe) { + T.mu.RLock() + defer T.mu.RUnlock() + + for _, name := range T.recipeScaleOrder { + r := T.recipes[name] + if r.Allocate() { + return name, r + } + } + + return "", nil +} + +func (T *Pool) scaleUpL1(name string, r *Recipe) error { + conn, err := r.Dial() + if err != nil { + // failed to dial + r.Free() + return err + } + + server, err := func() (*pooledServer, error) { + T.mu.Lock() + defer T.mu.Unlock() + if T.recipes[name] != r { + // recipe was removed + r.Free() + return nil, errors.New("recipe was removed") + } + + server := newServer( + T.config, + name, + conn, + ) + + if T.servers == nil { + T.servers = make(map[uuid.UUID]*pooledServer) + } + T.servers[server.GetID()] = server + + if T.serversByRecipe == nil { + T.serversByRecipe = make(map[string][]*pooledServer) + } + T.serversByRecipe[name] = append(T.serversByRecipe[name], server) + // update order + T.recipeScaleOrder.Update(slices.Index(T.recipeScaleOrder, name), func(n string) int { + return len(T.serversByRecipe[n]) + }) + return server, nil + }() + + if err != nil { + return err + } + + T.pooler.AddServer(server.GetID()) + return nil +} + +func (T *Pool) scaleUp() bool { + name, r := T.scaleUpL0() + if r == nil { + return false + } + + err := T.scaleUpL1(name, r) + if err != nil { + T.config.Logger.Warn("failed to dial server", zap.Error(err)) + return false + } + + return true +} + +func (T *Pool) removeServer(server *pooledServer) { + T.mu.Lock() + defer T.mu.Unlock() + + T.removeServerL1(server) +} + +func (T *Pool) removeServerL1(server *pooledServer) { + delete(T.servers, server.GetID()) + T.pooler.DeleteServer(server.GetID()) + _ = server.GetConn().Close() + if T.serversByRecipe != nil { + name := server.GetRecipe() + T.serversByRecipe[name] = slices.Delete(T.serversByRecipe[name], server) + // update order + index := slices.Index(T.recipeScaleOrder, name) + if index != -1 { + T.recipeScaleOrder.Update(index, func(n string) int { + return len(T.serversByRecipe[n]) + }) + } + } +} + +func (T *Pool) acquireServer(client *pooledClient) *pooledServer { + client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil) + + for { + serverID := T.pooler.Acquire(client.GetID(), SyncModeNonBlocking) + if serverID == uuid.Nil { + T.pendingCount.Add(1) + select { + case T.pending <- struct{}{}: + default: + } + serverID = T.pooler.Acquire(client.GetID(), SyncModeBlocking) + T.pendingCount.Add(-1) + if serverID == uuid.Nil { + return nil + } + } + + T.mu.RLock() + server, ok := T.servers[serverID] + T.mu.RUnlock() + if !ok { + T.pooler.DeleteServer(serverID) + continue + } + return server + } +} + +func (T *Pool) releaseServer(server *pooledServer) { + if T.config.ServerResetQuery != "" { + server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil) + + err, _, _ := backends.QueryString(server.GetConn(), nil, nil, T.config.ServerResetQuery) + if err != nil { + T.removeServer(server) + return + } + } + + server.SetState(metrics.ConnStateIdle, uuid.Nil) + + T.pooler.Release(server.GetID()) +} + +func (T *Pool) Serve( + conn *fed.Conn, +) error { + defer func() { + _ = conn.Close() + }() + + client := newClient( + T.config, + conn, + ) + + return T.serve(client, false) +} + +// ServeBot is for clients that don't need initial parameters, cancelling queries, and are ready now. Use Serve for +// real clients +func (T *Pool) ServeBot( + conn fed.ReadWriteCloser, +) error { + defer func() { + _ = conn.Close() + }() + + client := newClient( + T.config, + &fed.Conn{ + ReadWriteCloser: conn, + }, + ) + + return T.serve(client, true) +} + +func (T *Pool) serve(client *pooledClient, initialized bool) error { + T.addClient(client) + defer T.removeClient(client) + + var err error + var serverErr error + + var server *pooledServer + defer func() { + if server != nil { + if serverErr != nil { + T.removeServer(server) + } else { + T.releaseServer(server) + } + server = nil + } + }() + + var packet fed.Packet + + if !initialized { + server = T.acquireServer(client) + if server == nil { + return ErrClosed + } + + err, serverErr = pair(T.config, client, server) + if serverErr != nil { + return serverErr + } + if err != nil { + return err + } + + p := packets.ReadyForQuery('I') + packet = p.IntoPacket(packet) + err = client.GetConn().WritePacket(packet) + if err != nil { + return err + } + } + + for { + if server != nil && T.config.ReleaseAfterTransaction { + client.SetState(metrics.ConnStateIdle, uuid.Nil) + T.releaseServer(server) + server = nil + } + + packet, err = client.GetConn().ReadPacket(true, packet) + if err != nil { + return err + } + + if server == nil { + server = T.acquireServer(client) + if server == nil { + return ErrClosed + } + + err, serverErr = pair(T.config, client, server) + } + if err == nil && serverErr == nil { + packet, err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet) + } + if serverErr != nil { + return serverErr + } else { + transactionComplete(client, server) + } + + if err != nil { + return err + } + } +} + +func (T *Pool) addClient(client *pooledClient) { + T.mu.Lock() + defer T.mu.Unlock() + + if T.clients == nil { + T.clients = make(map[uuid.UUID]*pooledClient) + } + T.clients[client.GetID()] = client + if T.clientsByKey == nil { + T.clientsByKey = make(map[[8]byte]*pooledClient) + } + T.clientsByKey[client.GetBackendKey()] = client + T.pooler.AddClient(client.GetID()) +} + +func (T *Pool) removeClient(client *pooledClient) { + T.mu.Lock() + defer T.mu.Unlock() + + T.removeClientL1(client) +} + +func (T *Pool) removeClientL1(client *pooledClient) { + T.pooler.DeleteClient(client.GetID()) + _ = client.conn.Close() + delete(T.clients, client.GetID()) + delete(T.clientsByKey, client.GetBackendKey()) +} + +func (T *Pool) Cancel(key [8]byte) { + T.mu.RLock() + defer T.mu.RUnlock() + + client, ok := T.clientsByKey[key] + if !ok { + return + } + + state, peer, _ := client.GetState() + if state != metrics.ConnStateActive { + return + } + + server, ok := T.servers[peer] + if !ok { + return + } + + // prevent state from changing by RLocking the server + server.mu.RLock() + defer server.mu.RUnlock() + + // make sure peer is still set + if server.peer != peer { + return + } + + r, ok := T.recipes[server.recipe] + if !ok { + return + } + + r.Cancel(server.GetBackendKey()) +} + +func (T *Pool) ReadMetrics(m *metrics.Pool) { + T.mu.RLock() + defer T.mu.RUnlock() + + if len(T.clients) != 0 && m.Clients == nil { + m.Clients = make(map[uuid.UUID]metrics.Conn) + } + if len(T.servers) != 0 && m.Servers == nil { + m.Servers = make(map[uuid.UUID]metrics.Conn) + } + + for id, client := range T.clients { + var mc metrics.Conn + client.ReadMetrics(&mc) + m.Clients[id] = mc + } + + for id, server := range T.servers { + var mc metrics.Conn + server.ReadMetrics(&mc) + m.Servers[id] = mc + } +} + +func (T *Pool) Close() { + close(T.closed) + T.pooler.Close() + + T.mu.Lock() + defer T.mu.Unlock() + + // remove clients + for _, client := range T.clients { + T.removeClientL1(client) + } + + // remove recipes + for name := range T.recipes { + T.removeRecipe(name) + } +} diff --git a/lib/pool/pooler.go b/lib/gat/pool/pooler.go similarity index 100% rename from lib/pool/pooler.go rename to lib/gat/pool/pooler.go diff --git a/lib/gat/pool/recipe.go b/lib/gat/pool/recipe.go new file mode 100644 index 00000000..cfa70d17 --- /dev/null +++ b/lib/gat/pool/recipe.go @@ -0,0 +1,5 @@ +package pool + +import "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" + +type Recipe = recipe.Recipe diff --git a/lib/pool/recipe/config.go b/lib/gat/pool/recipe/config.go similarity index 100% rename from lib/pool/recipe/config.go rename to lib/gat/pool/recipe/config.go diff --git a/lib/pool/recipe/dialer.go b/lib/gat/pool/recipe/dialer.go similarity index 91% rename from lib/pool/recipe/dialer.go rename to lib/gat/pool/recipe/dialer.go index 3ce99919..8202c2b9 100644 --- a/lib/pool/recipe/dialer.go +++ b/lib/gat/pool/recipe/dialer.go @@ -5,6 +5,8 @@ import ( "net" "gfx.cafe/gfx/pggat/lib/auth" + "gfx.cafe/gfx/pggat/lib/bouncer" + "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/util/strutil" ) @@ -13,7 +15,7 @@ type Dialer struct { Network string Address string - SSLMode bounce.SSLMode + SSLMode bouncer.SSLMode SSLConfig *tls.Config Username string Credentials auth.Credentials diff --git a/lib/pool/recipe/recipe.go b/lib/gat/pool/recipe/recipe.go similarity index 100% rename from lib/pool/recipe/recipe.go rename to lib/gat/pool/recipe/recipe.go diff --git a/lib/gat/pool/scaler.go b/lib/gat/pool/scaler.go new file mode 100644 index 00000000..8beb64d3 --- /dev/null +++ b/lib/gat/pool/scaler.go @@ -0,0 +1,115 @@ +package pool + +import ( + "time" + + "go.uber.org/zap" +) + +type scaler struct { + pool *Pool + + backingOff bool + backoff time.Duration + + // timers + idle *time.Timer + pending *time.Timer +} + +func newScaler(pool *Pool) *scaler { + s := &scaler{ + pool: pool, + backoff: pool.config.ServerReconnectInitialTime.Duration(), + } + + if pool.config.ServerIdleTimeout != 0 { + s.idle = time.NewTimer(pool.config.ServerIdleTimeout.Duration()) + } + + return s +} + +func (T *scaler) idleTimeout(now time.Time) { + // idle loop for scaling down + var wait time.Duration + + var idlest *pooledServer + var idleStart time.Time + for idlest, idleStart = T.pool.idlest(); idlest != nil && now.Sub(idleStart) > T.pool.config.ServerIdleTimeout.Duration(); idlest, idleStart = T.pool.idlest() { + T.pool.removeServer(idlest) + } + + if idlest == nil { + wait = T.pool.config.ServerIdleTimeout.Duration() + } else { + wait = idleStart.Add(T.pool.config.ServerIdleTimeout.Duration()).Sub(now) + } + + T.idle.Reset(wait) +} + +func (T *scaler) pendingTimeout() { + if T.backingOff { + T.backoff *= 2 + if T.pool.config.ServerReconnectMaxTime != 0 && T.backoff > T.pool.config.ServerReconnectMaxTime.Duration() { + T.backoff = T.pool.config.ServerReconnectMaxTime.Duration() + } + } + + for T.pool.pendingCount.Load() > 0 { + // pending loop for scaling up + if T.pool.scaleUp() { + // scale up successful, see if we need to scale up more + T.backoff = T.pool.config.ServerReconnectInitialTime.Duration() + T.backingOff = false + continue + } + + if T.backoff == 0 { + // no backoff + T.backoff = T.pool.config.ServerReconnectInitialTime.Duration() + T.backingOff = false + continue + } + + T.backingOff = true + if T.pending == nil { + T.pending = time.NewTimer(T.backoff) + } else { + T.pending.Reset(T.backoff) + } + + T.pool.config.Logger.Warn("failed to dial server", zap.Duration("backoff", T.backoff)) + + return + } +} + +func (T *scaler) Run() { + for { + var idle <-chan time.Time + if T.idle != nil { + idle = T.idle.C + } + + var pending1 <-chan struct{} + var pending2 <-chan time.Time + if T.backingOff { + pending2 = T.pending.C + } else { + pending1 = T.pool.pending + } + + select { + case t := <-idle: + T.idleTimeout(t) + case <-pending1: + T.pendingTimeout() + case <-pending2: + T.pendingTimeout() + case <-T.pool.closed: + return + } + } +} diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go new file mode 100644 index 00000000..9afa7c99 --- /dev/null +++ b/lib/gat/pool/server.go @@ -0,0 +1,57 @@ +package pool + +import ( + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" +) + +type pooledServer struct { + pooledConn + + recipe string + + ps *ps.Server + eqp *eqp.Server +} + +func newServer( + options Config, + recipe string, + conn *fed.Conn, +) *pooledServer { + var psServer *ps.Server + if options.ParameterStatusSync == ParameterStatusSyncDynamic { + // add ps middleware + psServer = ps.NewServer(conn.InitialParameters) + conn.Middleware = append(conn.Middleware, psServer) + } + + var eqpServer *eqp.Server + if options.ExtendedQuerySync { + // add eqp middleware + eqpServer = eqp.NewServer() + conn.Middleware = append(conn.Middleware, eqpServer) + } + + return &pooledServer{ + pooledConn: makeConn( + conn, + ), + recipe: recipe, + ps: psServer, + eqp: eqpServer, + } +} + +func (T *pooledServer) GetRecipe() string { + return T.recipe +} + +func (T *pooledServer) GetEQP() *eqp.Server { + return T.eqp +} + +func (T *pooledServer) GetPS() *ps.Server { + return T.ps +} diff --git a/lib/gat/pool/withcredentials.go b/lib/gat/pool/withcredentials.go new file mode 100644 index 00000000..9c7382b3 --- /dev/null +++ b/lib/gat/pool/withcredentials.go @@ -0,0 +1,8 @@ +package pool + +import "gfx.cafe/gfx/pggat/lib/auth" + +type WithCredentials struct { + *Pool + Credentials auth.Credentials +} diff --git a/lib/pool/backend.go b/lib/pool/backend.go deleted file mode 100644 index d49aff6c..00000000 --- a/lib/pool/backend.go +++ /dev/null @@ -1,209 +0,0 @@ -package pool - -import ( - "log" - "sync" - "sync/atomic" - "time" - - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/pool/recipe" -) - -type backendRecipe struct { - weight atomic.Int64 - recipe *recipe.Recipe - - servers []*fed.Conn // TODO(garet) - killed bool -} - -type Backend struct { - config BackendConfig - pooler Pooler - - closed chan struct{} - - scaleUp chan struct{} - waiters atomic.Int64 - - recipes map[string]*backendRecipe - mu sync.RWMutex -} - -func NewBackend(config BackendConfig) *Backend { - b := &Backend{ - config: config, - pooler: config.NewPooler(), - - closed: make(chan struct{}), - - scaleUp: make(chan struct{}, 1), - } - - go b.scaleLoop() - - return b -} - -func (T *Backend) addRecipe(name string, r *recipe.Recipe) { - if T.recipes == nil { - T.recipes = make(map[string]*backendRecipe) - } - T.recipes[name] = &backendRecipe{ - recipe: r, - } -} - -func (T *Backend) AddRecipe(name string, r *recipe.Recipe) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeRecipe(name) - T.addRecipe(name, r) -} - -func (T *Backend) removeRecipe(name string) { - r, ok := T.recipes[name] - if !ok { - return - } - delete(T.recipes, name) - - r.killed = true - for _, server := range r.servers { - _ = server.Close() - r.recipe.Free() - } - r.servers = r.servers[:0] -} - -func (T *Backend) RemoveRecipe(name string) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeRecipe(name) -} - -func (T *Backend) ScaleUp() error { - var target *backendRecipe - func() { - T.mu.RLock() - defer T.mu.RUnlock() - - var targetWeight int64 = -1 - for _, r := range T.recipes { - if target == nil { - target = r - continue - } - weight := r.weight.Load() - if weight > targetWeight { - target = r - targetWeight = weight - continue - } - } - - if !target.recipe.Allocate() { - return - } - target.weight.Add(1) - }() - - if target == nil { - return ErrNoScalableRecipe - } - - server, err := target.recipe.Dial() - if err != nil { - target.recipe.Free() - target.weight.Add(-1) - return err - } - - T.mu.Lock() - defer T.mu.Unlock() - - if target.killed { - target.recipe.Free() - target.weight.Add(-1) - return nil - } - - target.servers = append(target.servers, server) - return nil -} - -// ScaleDown attempts to scale down the pool. Returns when the next scale down should happen -func (T *Backend) ScaleDown() time.Duration { - // TODO(garet) - return T.config.IdleTimeout -} - -func (T *Backend) scaleLoop() { - var scaleUpBackoffDuration time.Duration - var scaleUpBackoff *time.Timer - - var idleTimeout *time.Timer - if T.config.IdleTimeout != 0 { - idleTimeout = time.NewTimer(T.config.IdleTimeout) - } - - defer func() { - if scaleUpBackoff != nil { - scaleUpBackoff.Stop() - } - if idleTimeout != nil { - idleTimeout.Stop() - } - }() - - for { - var scaleUpBackoffC <-chan time.Time - var scaleUpC <-chan struct{} - if scaleUpBackoffDuration != 0 { - scaleUpBackoffC = scaleUpBackoff.C - } else { - scaleUpC = T.scaleUp - } - - var idleTimeoutC <-chan time.Time - if idleTimeout != nil { - idleTimeoutC = idleTimeout.C - } - - select { - case <-scaleUpC: - if err := T.ScaleUp(); err != nil { - log.Printf("error scaling up: %v", err) - - scaleUpBackoffDuration = T.config.ReconnectInitialTime - if scaleUpBackoffDuration != 0 { - if scaleUpBackoff == nil { - scaleUpBackoff = time.NewTimer(scaleUpBackoffDuration) - } else { - scaleUpBackoff.Reset(scaleUpBackoffDuration) - } - } - } - case <-scaleUpBackoffC: - if err := T.ScaleUp(); err != nil { - log.Printf("error scaling up: %v", err) - - scaleUpBackoffDuration *= 2 - scaleUpBackoff.Reset(scaleUpBackoffDuration) - } else { - scaleUpBackoffDuration = 0 - } - case <-idleTimeoutC: - idleTimeout.Reset(T.ScaleDown()) - case <-T.closed: - return - } - } -} - -func (T *Backend) Close() { - close(T.closed) -} diff --git a/lib/pool/config.go b/lib/pool/config.go deleted file mode 100644 index 8fa522a6..00000000 --- a/lib/pool/config.go +++ /dev/null @@ -1,12 +0,0 @@ -package pool - -import "time" - -type BackendConfig struct { - NewPooler func() Pooler - - IdleTimeout time.Duration - - ReconnectInitialTime time.Duration - ReconnectMaxTime time.Duration -} diff --git a/lib/pool/errors.go b/lib/pool/errors.go deleted file mode 100644 index 4fbd6ab2..00000000 --- a/lib/pool/errors.go +++ /dev/null @@ -1,7 +0,0 @@ -package pool - -import "errors" - -var ( - ErrNoScalableRecipe = errors.New("no scalable recipe") -) diff --git a/lib/pool/frontend.go b/lib/pool/frontend.go deleted file mode 100644 index 799c9321..00000000 --- a/lib/pool/frontend.go +++ /dev/null @@ -1,5 +0,0 @@ -package pool - -type Frontend struct { - Backend *Backend -} diff --git a/test/config.go b/test/config.go index e66fc44e..bc13828d 100644 --- a/test/config.go +++ b/test/config.go @@ -1,7 +1,7 @@ package test import ( - "gfx.cafe/gfx/pggat/lib/pool/recipe" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" ) type Config struct { diff --git a/test/runner.go b/test/runner.go index 499127bc..d23e94c6 100644 --- a/test/runner.go +++ b/test/runner.go @@ -8,8 +8,8 @@ import ( "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" "gfx.cafe/gfx/pggat/lib/fed" packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gsql" - "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/flip" "gfx.cafe/gfx/pggat/test/inst" ) diff --git a/test/tester_test.go b/test/tester_test.go index 63c5cd05..6ef9b431 100644 --- a/test/tester_test.go +++ b/test/tester_test.go @@ -12,17 +12,16 @@ import ( "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/caddyconfig" - "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/auth/credentials" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/gatcaddyfile" pool_handler "gfx.cafe/gfx/pggat/lib/gat/handlers/pool" "gfx.cafe/gfx/pggat/lib/gat/handlers/rewrite_password" "gfx.cafe/gfx/pggat/lib/gat/matchers" + "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gat/poolers/session" "gfx.cafe/gfx/pggat/lib/gat/poolers/transaction" - "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/test" "gfx.cafe/gfx/pggat/test/tests" ) -- GitLab