diff --git a/lib/bouncer/backends/v0/accept.go b/lib/bounce/backends/v0/accept.go similarity index 99% rename from lib/bouncer/backends/v0/accept.go rename to lib/bounce/backends/v0/accept.go index d094f2f19f5f39bf5b38ca68f71a7a589ba7c5c9..7f29f985e314ef0909593b4ab314c2f693242cc0 100644 --- a/lib/bouncer/backends/v0/accept.go +++ b/lib/bounce/backends/v0/accept.go @@ -6,7 +6,6 @@ import ( "io" "gfx.cafe/gfx/pggat/lib/auth" - "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/fed" packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -346,7 +345,7 @@ func accept(ctx *acceptContext) error { func Accept( conn *fed.Conn, - sslMode bouncer.SSLMode, + sslMode bounce.SSLMode, sslConfig *tls.Config, username string, credentials auth.Credentials, diff --git a/lib/bouncer/backends/v0/cancel.go b/lib/bounce/backends/v0/cancel.go similarity index 100% rename from lib/bouncer/backends/v0/cancel.go rename to lib/bounce/backends/v0/cancel.go diff --git a/lib/bouncer/backends/v0/context.go b/lib/bounce/backends/v0/context.go similarity index 100% rename from lib/bouncer/backends/v0/context.go rename to lib/bounce/backends/v0/context.go diff --git a/lib/bouncer/backends/v0/errors.go b/lib/bounce/backends/v0/errors.go similarity index 100% rename from lib/bouncer/backends/v0/errors.go rename to lib/bounce/backends/v0/errors.go diff --git a/lib/bouncer/backends/v0/options.go b/lib/bounce/backends/v0/options.go similarity index 81% rename from lib/bouncer/backends/v0/options.go rename to lib/bounce/backends/v0/options.go index 9d98d466be8d1eb0219b78a869f09d646b37f274..f23feb098d983caa54d603e410980941c95c398a 100644 --- a/lib/bouncer/backends/v0/options.go +++ b/lib/bounce/backends/v0/options.go @@ -4,12 +4,11 @@ import ( "crypto/tls" "gfx.cafe/gfx/pggat/lib/auth" - "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/util/strutil" ) type acceptOptions struct { - SSLMode bouncer.SSLMode + SSLMode bounce.SSLMode SSLConfig *tls.Config Username string Credentials auth.Credentials diff --git a/lib/bouncer/backends/v0/query.go b/lib/bounce/backends/v0/query.go similarity index 100% rename from lib/bouncer/backends/v0/query.go rename to lib/bounce/backends/v0/query.go diff --git a/lib/bouncer/bouncers/v2/bouncer.go b/lib/bounce/bouncers/v2/bouncer.go similarity index 100% rename from lib/bouncer/bouncers/v2/bouncer.go rename to lib/bounce/bouncers/v2/bouncer.go diff --git a/lib/bouncer/frontends/v0/accept.go b/lib/bounce/frontends/v0/accept.go similarity index 100% rename from lib/bouncer/frontends/v0/accept.go rename to lib/bounce/frontends/v0/accept.go diff --git a/lib/bouncer/frontends/v0/authenticate.go b/lib/bounce/frontends/v0/authenticate.go similarity index 100% rename from lib/bouncer/frontends/v0/authenticate.go rename to lib/bounce/frontends/v0/authenticate.go diff --git a/lib/bouncer/frontends/v0/context.go b/lib/bounce/frontends/v0/context.go similarity index 100% rename from lib/bouncer/frontends/v0/context.go rename to lib/bounce/frontends/v0/context.go diff --git a/lib/bouncer/frontends/v0/options.go b/lib/bounce/frontends/v0/options.go similarity index 100% rename from lib/bouncer/frontends/v0/options.go rename to lib/bounce/frontends/v0/options.go diff --git a/lib/bouncer/frontends/v0/params.go b/lib/bounce/frontends/v0/params.go similarity index 100% rename from lib/bouncer/frontends/v0/params.go rename to lib/bounce/frontends/v0/params.go diff --git a/lib/bouncer/sslmode.go b/lib/bounce/sslmode.go similarity index 97% rename from lib/bouncer/sslmode.go rename to lib/bounce/sslmode.go index cb8b63d59e6304d6220957a41605e64e0064ffb3..335e52ab6aef47c39562a65596228f8b97381b24 100644 --- a/lib/bouncer/sslmode.go +++ b/lib/bounce/sslmode.go @@ -1,4 +1,4 @@ -package bouncer +package bounce type SSLMode string diff --git a/lib/gat/gatcaddyfile/handler.go b/lib/gat/gatcaddyfile/handler.go index 758478728e635faee8bf397e87390a27334d2cbf..501c4173d8c3659ae022c4f121857364f37d0d62 100644 --- a/lib/gat/gatcaddyfile/handler.go +++ b/lib/gat/gatcaddyfile/handler.go @@ -13,7 +13,6 @@ import ( "github.com/caddyserver/caddy/v2/caddyconfig" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" - "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/gat/handlers/allowed_startup_parameters" "gfx.cafe/gfx/pggat/lib/gat/handlers/discovery" "gfx.cafe/gfx/pggat/lib/gat/handlers/pgbouncer" @@ -141,7 +140,7 @@ func init() { "pooler", warnings, ), - ServerSSLMode: bouncer.SSLModePrefer, + ServerSSLMode: bounce.SSLModePrefer, ServerSSL: JSONModuleObject( &insecure_skip_verify.Client{}, SSLClient, @@ -221,7 +220,7 @@ func init() { return nil, d.ArgErr() } - module.ServerSSLMode = bouncer.SSLMode(d.Val()) + module.ServerSSLMode = bounce.SSLMode(d.Val()) if !d.NextArg() { return nil, d.ArgErr() @@ -390,7 +389,7 @@ func init() { warnings, ), - ServerSSLMode: bouncer.SSLModePrefer, + ServerSSLMode: bounce.SSLModePrefer, ServerSSL: JSONModuleObject( &insecure_skip_verify.Client{}, SSLClient, @@ -455,7 +454,7 @@ func init() { return nil, d.ArgErr() } - module.ServerSSLMode = bouncer.SSLMode(d.Val()) + module.ServerSSLMode = bounce.SSLMode(d.Val()) if !d.NextArg() { return nil, d.ArgErr() diff --git a/lib/gat/handlers/discovery/config.go b/lib/gat/handlers/discovery/config.go index b2712561ddabae7281b277222057eb05be92581d..bb4024b6b896449a34da71274895f44c13b923e4 100644 --- a/lib/gat/handlers/discovery/config.go +++ b/lib/gat/handlers/discovery/config.go @@ -3,7 +3,6 @@ package discovery import ( "encoding/json" - "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/util/dur" ) @@ -15,7 +14,7 @@ type Config struct { Pooler json.RawMessage `json:"pooler" caddy:"namespace=pggat.poolers inline_key=pooler"` - ServerSSLMode bouncer.SSLMode `json:"server_ssl_mode,omitempty"` + ServerSSLMode bounce.SSLMode `json:"server_ssl_mode,omitempty"` ServerSSL json.RawMessage `json:"server_ssl,omitempty" caddy:"namespace=pggat.ssl.clients inline_key=provider"` ServerStartupParameters map[string]string `json:"server_startup_parameters,omitempty"` diff --git a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go index 6bb9351b023cd1c14533778c68fe9c1a575b239d..cbaa009b0c096afcaa90406d3eb52bf89f632384 100644 --- a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go +++ b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go @@ -5,16 +5,13 @@ import ( "net" "strings" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" - "github.com/caddyserver/caddy/v2" sqladmin "google.golang.org/api/sqladmin/v1beta4" "gfx.cafe/gfx/pggat/lib/gat/handlers/discovery" + "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/auth/credentials" - "gfx.cafe/gfx/pggat/lib/bouncer" - "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gsql" ) @@ -120,7 +117,7 @@ func (T *Discoverer) instanceToCluster(primary *sqladmin.DatabaseInstance, repli admin, err = recipe.Dialer{ Network: "tcp", Address: primaryAddress, - SSLMode: bouncer.SSLModePrefer, + SSLMode: bounce.SSLModePrefer, SSLConfig: &tls.Config{ InsecureSkipVerify: true, }, diff --git a/lib/gat/handlers/discovery/module.go b/lib/gat/handlers/discovery/module.go index bd6614d2e3f9350789e84dd0cb2637f060d85bbd..73e4bf49c07e065c233f1deaee945085d2f37ee9 100644 --- a/lib/gat/handlers/discovery/module.go +++ b/lib/gat/handlers/discovery/module.go @@ -9,14 +9,15 @@ import ( "github.com/caddyserver/caddy/v2" "go.uber.org/zap" + "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/auth" "gfx.cafe/gfx/pggat/lib/auth/credentials" "gfx.cafe/gfx/pggat/lib/bouncer/frontends/v0" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" + recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/maps" "gfx.cafe/gfx/pggat/lib/util/slices" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -223,7 +224,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo for _, user := range users { primaryCreds, _ := T.creds(user) for _, database := range databases { - primary := recipe.Dialer{ + primary := recipe2.Dialer{ Network: endpoint.Network, Address: endpoint.Address, Username: user.Username, @@ -240,7 +241,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo } p.RemoveRecipe("primary") - p.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ + p.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{ Dialer: primary, })) } @@ -258,7 +259,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas } for id, r := range replicas { - replica := recipe.Dialer{ + replica := recipe2.Dialer{ Network: r.Network, Address: r.Address, Username: user.Username, @@ -268,7 +269,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas SSLConfig: T.sslConfig, StartupParameters: T.serverStartupParameters, } - replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ Dialer: replica, })) } @@ -297,7 +298,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin continue } - replica := recipe.Dialer{ + replica := recipe2.Dialer{ Network: endpoint.Network, Address: endpoint.Address, Username: user.Username, @@ -307,7 +308,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin SSLConfig: T.sslConfig, StartupParameters: T.serverStartupParameters, } - p.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + p.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ Dialer: replica, })) } @@ -331,7 +332,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, replicaUsername := T.replicaUsername(user.Username) primaryCreds, replicaCreds := T.creds(user) for _, database := range databases { - base := recipe.Dialer{ + base := recipe2.Dialer{ Username: user.Username, Credentials: primaryCreds, Database: database, @@ -348,7 +349,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, Pool: T.pooler.NewPool(), Credentials: primaryCreds, } - primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ + primaryPool.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{ Dialer: primary, })) T.addPool(user.Username, database, primaryPool) @@ -363,7 +364,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, replica := base replica.Network = r.Network replica.Address = r.Address - replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ Dialer: replica, })) } @@ -390,7 +391,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo replicaUsername := T.replicaUsername(user.Username) primaryCreds, replicaCreds := T.creds(user) - base := recipe.Dialer{ + base := recipe2.Dialer{ Username: user.Username, Credentials: primaryCreds, Database: database, @@ -407,7 +408,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo Pool: T.pooler.NewPool(), Credentials: primaryCreds, } - primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ + primaryPool.AddRecipe("primary", recipe2.NewRecipe(recipe2.Config{ Dialer: primary, })) T.addPool(user.Username, database, primaryPool) @@ -422,7 +423,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo replica := base replica.Network = r.Network replica.Address = r.Address - replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + replicaPool.AddRecipe(id, recipe2.NewRecipe(recipe2.Config{ Dialer: replica, })) } diff --git a/lib/gat/handlers/pgbouncer/config.go b/lib/gat/handlers/pgbouncer/config.go index 6f3b179adbe9adf4901e9b04d18dc68f4f5cffcf..146c519aef270bceb6f6e997f813d3ffb15b77cf 100644 --- a/lib/gat/handlers/pgbouncer/config.go +++ b/lib/gat/handlers/pgbouncer/config.go @@ -8,7 +8,6 @@ import ( "github.com/caddyserver/caddy/v2/caddyconfig" - "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/ssl/servers/x509_key_pair" "gfx.cafe/gfx/pggat/lib/util/encoding/ini" @@ -111,7 +110,7 @@ type PgBouncer struct { DnsNxdomainTtl float64 `ini:"dns_nxdomain_ttl"` DnsZoneCheckPeriod float64 `ini:"dns_zone_check_period"` ResolvConf string `ini:"resolv.conf"` - ClientTLSSSLMode bouncer.SSLMode `ini:"client_tls_sslmode"` + ClientTLSSSLMode bounce.SSLMode `ini:"client_tls_sslmode"` ClientTLSKeyFile string `ini:"client_tls_key_file"` ClientTLSCertFile string `ini:"client_tls_cert_file"` ClientTLSCaFile string `ini:"client_tls_ca_file"` @@ -119,7 +118,7 @@ type PgBouncer struct { ClientTLSCiphers []TLSCipher `ini:"client_tls_ciphers"` ClientTLSECDHCurve TLSECDHCurve `ini:"client_tls_ecdhcurve"` ClientTLSDHEParams TLSDHEParams `ini:"client_tls_dheparams"` - ServerTLSSSLMode bouncer.SSLMode `ini:"server_tls_sslmode"` + ServerTLSSSLMode bounce.SSLMode `ini:"server_tls_sslmode"` ServerTLSCaFile string `ini:"server_tls_ca_file"` ServerTLSKeyFile string `ini:"server_tls_key_file"` ServerTLSCertFile string `ini:"server_tls_cert_file"` @@ -211,7 +210,7 @@ var Default = Config{ AutodbIdleTimeout: 3600.0, DnsMaxTtl: 15.0, DnsNxdomainTtl: 15.0, - ClientTLSSSLMode: bouncer.SSLModeDisable, + ClientTLSSSLMode: bounce.SSLModeDisable, ClientTLSProtocols: []TLSProtocol{ TLSProtocolSecure, }, @@ -219,7 +218,7 @@ var Default = Config{ "fast", }, ClientTLSECDHCurve: "auto", - ServerTLSSSLMode: bouncer.SSLModePrefer, + ServerTLSSSLMode: bounce.SSLModePrefer, ServerTLSProtocols: []TLSProtocol{ TLSProtocolSecure, }, diff --git a/lib/gat/handlers/pgbouncer/module.go b/lib/gat/handlers/pgbouncer/module.go index a5a8f025fcdc70e1c491edd94bd21d854f08dc38..962c45f3350613ff84c6d1733b24b3328892b224 100644 --- a/lib/gat/handlers/pgbouncer/module.go +++ b/lib/gat/handlers/pgbouncer/module.go @@ -19,14 +19,15 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/poolers/session" "gfx.cafe/gfx/pggat/lib/gat/poolers/transaction" "gfx.cafe/gfx/pggat/lib/perror" + recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/dur" "gfx.cafe/gfx/pggat/lib/util/slices" + "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/auth/credentials" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gsql" "gfx.cafe/gfx/pggat/lib/util/maps" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -214,7 +215,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { serverCreds = credentials.FromString(user, db.Password) } - dialer := recipe.Dialer{ + dialer := recipe2.Dialer{ SSLMode: T.Config.PgBouncer.ServerTLSSSLMode, SSLConfig: &tls.Config{ InsecureSkipVerify: true, // TODO(garet) @@ -254,7 +255,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { dialer.Address = address } - recipeOptions := recipe.Config{ + recipeOptions := recipe2.Config{ Dialer: dialer, MinConnections: db.MinPoolSize, MaxConnections: db.MaxDBConnections, @@ -265,7 +266,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { if recipeOptions.MaxConnections == 0 { recipeOptions.MaxConnections = T.Config.PgBouncer.MaxDBConnections } - r := recipe.NewRecipe(recipeOptions) + r := recipe2.NewRecipe(recipeOptions) p.AddRecipe("pgbouncer", r) diff --git a/lib/gat/handlers/pgbouncer_spilo/module.go b/lib/gat/handlers/pgbouncer_spilo/module.go index dc41720b5b91196e3dd19d2f6d636d5dc5412f23..a71759bd84f9544178c9981d8cbf8d0158faf13b 100644 --- a/lib/gat/handlers/pgbouncer_spilo/module.go +++ b/lib/gat/handlers/pgbouncer_spilo/module.go @@ -7,7 +7,6 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/handlers/pgbouncer" - "gfx.cafe/gfx/pggat/lib/bouncer" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -52,12 +51,12 @@ func (T *Module) Provision(ctx caddy.Context) error { pgb.PgBouncer.LogFile = "/var/log/pgbouncer/pgbouncer.log" pgb.PgBouncer.PidFile = "/var/run/pgbouncer/pgbouncer.pid" - pgb.PgBouncer.ServerTLSSSLMode = bouncer.SSLModeRequire + pgb.PgBouncer.ServerTLSSSLMode = bounce.SSLModeRequire pgb.PgBouncer.ServerTLSCaFile = "/etc/ssl/certs/pgbouncer.crt" pgb.PgBouncer.ServerTLSProtocols = []pgbouncer.TLSProtocol{ pgbouncer.TLSProtocolSecure, } - pgb.PgBouncer.ClientTLSSSLMode = bouncer.SSLModeRequire + pgb.PgBouncer.ClientTLSSSLMode = bounce.SSLModeRequire pgb.PgBouncer.ClientTLSKeyFile = "/etc/ssl/certs/pgbouncer.key" pgb.PgBouncer.ClientTLSCertFile = "/etc/ssl/certs/pgbouncer.crt" diff --git a/lib/gat/handlers/pool/config.go b/lib/gat/handlers/pool/config.go index 5a2fc336fd46a6aee54c26118aa067f7cdb855c4..49aa27a3df855a39e970a91499665d0116142b0f 100644 --- a/lib/gat/handlers/pool/config.go +++ b/lib/gat/handlers/pool/config.go @@ -2,8 +2,6 @@ package pool_handler import ( "encoding/json" - - "gfx.cafe/gfx/pggat/lib/bouncer" ) type Config struct { @@ -11,7 +9,7 @@ type Config struct { // Server connect options ServerAddress string `jsonn:"server_address"` - ServerSSLMode bouncer.SSLMode `json:"server_ssl_mode,omitempty"` + ServerSSLMode bounce.SSLMode `json:"server_ssl_mode,omitempty"` ServerSSL json.RawMessage `json:"server_ssl,omitempty" caddy:"namespace=pggat.ssl.clients inline_key=provider"` // Server routing options diff --git a/lib/gat/handlers/pool/module.go b/lib/gat/handlers/pool/module.go index 61b0085a7abd477aac62cd69d0d83d37df226374..bf627ddb690b74ecda5cb8536527a0d801458259 100644 --- a/lib/gat/handlers/pool/module.go +++ b/lib/gat/handlers/pool/module.go @@ -12,7 +12,7 @@ import ( "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" + recipe2 "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/strutil" ) @@ -65,7 +65,7 @@ func (T *Module) Provision(ctx caddy.Context) error { network = "tcp" } - d := recipe.Dialer{ + d := recipe2.Dialer{ Network: network, Address: T.ServerAddress, SSLMode: T.ServerSSLMode, @@ -77,7 +77,7 @@ func (T *Module) Provision(ctx caddy.Context) error { } T.pool = pooler.NewPool() - T.pool.AddRecipe("pool", recipe.NewRecipe(recipe.Config{ + T.pool.AddRecipe("pool", recipe2.NewRecipe(recipe2.Config{ Dialer: d, })) diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go deleted file mode 100644 index c6d3c76e45367c7843c3ad651a9bf694c538f3c8..0000000000000000000000000000000000000000 --- a/lib/gat/pool/client.go +++ /dev/null @@ -1,55 +0,0 @@ -package pool - -import ( - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/unterminate" -) - -type pooledClient struct { - pooledConn - - ps *ps.Client - eqp *eqp.Client -} - -func newClient( - options Config, - conn *fed.Conn, -) *pooledClient { - conn.Middleware = append( - conn.Middleware, - unterminate.Unterminate, - ) - - var psClient *ps.Client - if options.ParameterStatusSync == ParameterStatusSyncDynamic { - // add ps middleware - psClient = ps.NewClient(conn.InitialParameters) - conn.Middleware = append(conn.Middleware, psClient) - } - - var eqpClient *eqp.Client - if options.ExtendedQuerySync { - // add eqp middleware - eqpClient = eqp.NewClient() - conn.Middleware = append(conn.Middleware, eqpClient) - } - - return &pooledClient{ - pooledConn: makeConn( - conn, - ), - ps: psClient, - eqp: eqpClient, - } -} - -func (T *pooledClient) GetEQP() *eqp.Client { - return T.eqp -} - -func (T *pooledClient) GetPS() *ps.Client { - return T.ps -} diff --git a/lib/gat/pool/config.go b/lib/gat/pool/config.go deleted file mode 100644 index 36604526c2916b47a33c13b5e2dff5a9a81ae8d8..0000000000000000000000000000000000000000 --- a/lib/gat/pool/config.go +++ /dev/null @@ -1,61 +0,0 @@ -package pool - -import ( - "go.uber.org/zap" - - "gfx.cafe/gfx/pggat/lib/util/dur" - "gfx.cafe/gfx/pggat/lib/util/strutil" -) - -type ParameterStatusSync int - -const ( - // ParameterStatusSyncNone does not attempt to sync parameter status. - ParameterStatusSyncNone ParameterStatusSync = iota - // ParameterStatusSyncInitial assumes both client and server have their initial status before syncing. - // Use in session pooling for lower latency - ParameterStatusSyncInitial - // ParameterStatusSyncDynamic will track parameter status and ensure they are synced correctly. - // Use in transaction pooling - ParameterStatusSyncDynamic -) - -type PoolingConfig struct { - NewPooler func() Pooler - // ReleaseAfterTransaction toggles whether servers should be released and re acquired after each transaction. - // Use false for lower latency - // Use true for better balancing - ReleaseAfterTransaction bool - - // ParameterStatusSync is the parameter syncing mode - ParameterStatusSync ParameterStatusSync - - // ExtendedQuerySync controls whether prepared statements and portals should be tracked and synced before use. - // Use false for lower latency - // Use true for transaction pooling - ExtendedQuerySync bool -} - -type ManagementConfig struct { - ServerResetQuery string `json:"server_reset_query,omitempty"` - // ServerIdleTimeout defines how long a server may be idle before it is disconnected - ServerIdleTimeout dur.Duration `json:"server_idle_timeout,omitempty"` - - // ServerReconnectInitialTime defines how long to wait initially before attempting a server reconnect - // 0 = disable, don't retry - ServerReconnectInitialTime dur.Duration `json:"server_reconnect_initial_time,omitempty"` - // ServerReconnectMaxTime defines the max amount of time to wait before attempting a server reconnect - // 0 = disable, back off infinitely - ServerReconnectMaxTime dur.Duration `json:"server_reconnect_max_time,omitempty"` - - // TrackedParameters are parameters which should be synced by updating the server, not the client. - TrackedParameters []strutil.CIString `json:"tracked_parameters,omitempty"` -} - -type Config struct { - PoolingConfig - - ManagementConfig - - Logger *zap.Logger -} diff --git a/lib/gat/pool/conn.go b/lib/gat/pool/conn.go deleted file mode 100644 index 065a645895951e6693e756a2054e03f45f89b031..0000000000000000000000000000000000000000 --- a/lib/gat/pool/conn.go +++ /dev/null @@ -1,120 +0,0 @@ -package pool - -import ( - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/util/strutil" -) - -type pooledConn struct { - id uuid.UUID - - conn *fed.Conn - - // metrics - - transactionCount atomic.Int64 - - lastMetricsRead time.Time - - state metrics.ConnState - peer uuid.UUID - since time.Time - - util [metrics.ConnStateCount]time.Duration - - mu sync.RWMutex -} - -func makeConn( - conn *fed.Conn, -) pooledConn { - return pooledConn{ - id: uuid.New(), - conn: conn, - - since: time.Now(), - } -} - -func (T *pooledConn) GetID() uuid.UUID { - return T.id -} - -func (T *pooledConn) GetConn() *fed.Conn { - return T.conn -} - -func (T *pooledConn) GetInitialParameters() map[strutil.CIString]string { - return T.conn.InitialParameters -} - -func (T *pooledConn) GetBackendKey() [8]byte { - return T.conn.BackendKey -} - -func (T *pooledConn) TransactionComplete() { - T.transactionCount.Add(1) -} - -func (T *pooledConn) SetState(state metrics.ConnState, peer uuid.UUID) { - T.mu.Lock() - defer T.mu.Unlock() - - now := time.Now() - - var since time.Duration - if T.since.Before(T.lastMetricsRead) { - since = now.Sub(T.lastMetricsRead) - } else { - since = now.Sub(T.since) - } - T.util[T.state] += since - - T.state = state - T.peer = peer - T.since = now -} - -func (T *pooledConn) GetState() (state metrics.ConnState, peer uuid.UUID, since time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - state = T.state - peer = T.peer - since = T.since - return -} - -func (T *pooledConn) ReadMetrics(m *metrics.Conn) { - T.mu.Lock() - defer T.mu.Unlock() - - now := time.Now() - - m.Time = now - - m.State = T.state - m.Peer = T.peer - m.Since = T.since - - m.Utilization = T.util - T.util = [metrics.ConnStateCount]time.Duration{} - - var since time.Duration - if m.Since.Before(T.lastMetricsRead) { - since = now.Sub(T.lastMetricsRead) - } else { - since = now.Sub(m.Since) - } - m.Utilization[m.State] += since - - m.TransactionCount = int(T.transactionCount.Swap(0)) - - T.lastMetricsRead = now -} diff --git a/lib/gat/pool/errors.go b/lib/gat/pool/errors.go deleted file mode 100644 index b0a18cecdeae5d460c0c6df65417ac908aaf926b..0000000000000000000000000000000000000000 --- a/lib/gat/pool/errors.go +++ /dev/null @@ -1,5 +0,0 @@ -package pool - -import "errors" - -var ErrClosed = errors.New("pool closed") diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go deleted file mode 100644 index e12636210e444859639229cf308bb4e48c2b6cec..0000000000000000000000000000000000000000 --- a/lib/gat/pool/flow.go +++ /dev/null @@ -1,115 +0,0 @@ -package pool - -import ( - "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" - packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" - "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/util/slices" -) - -func pair(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) { - defer func() { - client.SetState(metrics.ConnStateActive, server.GetID()) - server.SetState(metrics.ConnStateActive, client.GetID()) - }() - - if options.ParameterStatusSync != ParameterStatusSyncNone || options.ExtendedQuerySync { - client.SetState(metrics.ConnStatePairing, server.GetID()) - server.SetState(metrics.ConnStatePairing, client.GetID()) - } - - switch options.ParameterStatusSync { - case ParameterStatusSyncDynamic: - clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), client.GetPS(), server.GetConn(), server.GetPS()) - case ParameterStatusSyncInitial: - clientErr, serverErr = syncInitialParameters(options, client, server) - } - - if clientErr != nil || serverErr != nil { - return - } - - if options.ExtendedQuerySync { - serverErr = eqp.Sync(client.GetEQP(), server.GetConn(), server.GetEQP()) - } - - return -} - -func syncInitialParameters(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) { - clientParams := client.GetInitialParameters() - serverParams := server.GetInitialParameters() - - var packet fed.Packet - - for key, value := range clientParams { - // skip already set params - if serverParams[key] == value { - p := packets.ParameterStatus{ - Key: key.String(), - Value: serverParams[key], - } - packet = p.IntoPacket(packet) - clientErr = client.GetConn().WritePacket(packet) - if clientErr != nil { - return - } - continue - } - - setServer := slices.Contains(options.TrackedParameters, key) - - if !setServer { - value = serverParams[key] - } - - p := packets.ParameterStatus{ - Key: key.String(), - Value: value, - } - packet = p.IntoPacket(packet) - clientErr = client.GetConn().WritePacket(packet) - if clientErr != nil { - return - } - - if !setServer { - continue - } - - serverErr, _, packet = backends.SetParameter(server.GetConn(), nil, packet, key, value) - if serverErr != nil { - return - } - } - - for key, value := range serverParams { - if _, ok := clientParams[key]; ok { - continue - } - - // Don't need to run reset on server because it will reset it to the initial value - - // send to client - p := packets.ParameterStatus{ - Key: key.String(), - Value: value, - } - packet = p.IntoPacket(packet) - clientErr = client.GetConn().WritePacket(packet) - if clientErr != nil { - return - } - } - - return - -} - -func transactionComplete(client *pooledClient, server *pooledServer) { - client.TransactionComplete() - server.TransactionComplete() -} diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go deleted file mode 100644 index 8399e39b80bb5ba6e913de1415d8a4dbafc9b7c1..0000000000000000000000000000000000000000 --- a/lib/gat/pool/pool.go +++ /dev/null @@ -1,497 +0,0 @@ -package pool - -import ( - "errors" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - "go.uber.org/zap" - - "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" - "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" - "gfx.cafe/gfx/pggat/lib/fed" - packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" - "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/util/slices" -) - -type Pool struct { - config Config - pooler Pooler - - closed chan struct{} - - pendingCount atomic.Int64 - pending chan struct{} - - recipes map[string]*Recipe - recipeScaleOrder slices.Sorted[string] - clients map[uuid.UUID]*pooledClient - clientsByKey map[[8]byte]*pooledClient - servers map[uuid.UUID]*pooledServer - serversByRecipe map[string][]*pooledServer - mu sync.RWMutex -} - -func NewPool(config Config) *Pool { - if config.NewPooler == nil { - panic("expected new pooler func") - } - pooler := config.NewPooler() - if pooler == nil { - panic("expected pooler") - } - - p := &Pool{ - config: config, - pooler: pooler, - - closed: make(chan struct{}), - pending: make(chan struct{}, 1), - } - - s := newScaler(p) - go s.Run() - - return p -} - -func (T *Pool) idlest() (server *pooledServer, at time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - - for _, s := range T.servers { - state, _, since := s.GetState() - if state != metrics.ConnStateIdle { - continue - } - - if at == (time.Time{}) || since.Before(at) { - server = s - at = since - } - } - - return -} - -func (T *Pool) AddRecipe(name string, r *Recipe) { - func() { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeRecipe(name) - - if T.recipes == nil { - T.recipes = make(map[string]*Recipe) - } - T.recipes[name] = r - - // add to front of scale order - T.recipeScaleOrder = T.recipeScaleOrder.Insert(name, func(n string) int { - return len(T.serversByRecipe[n]) - }) - }() - - count := r.AllocateInitial() - for i := 0; i < count; i++ { - if err := T.scaleUpL1(name, r); err != nil { - T.config.Logger.Warn("failed to dial server", zap.Error(err)) - for j := i; j < count; j++ { - r.Free() - } - break - } - } -} - -func (T *Pool) RemoveRecipe(name string) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeRecipe(name) -} - -func (T *Pool) removeRecipe(name string) { - r, ok := T.recipes[name] - if !ok { - return - } - delete(T.recipes, name) - - servers := T.serversByRecipe[name] - delete(T.serversByRecipe, name) - // remove from recipeScaleOrder - T.recipeScaleOrder = slices.Delete(T.recipeScaleOrder, name) - - for _, server := range servers { - r.Free() - T.removeServerL1(server) - } -} - -func (T *Pool) scaleUpL0() (string, *Recipe) { - T.mu.RLock() - defer T.mu.RUnlock() - - for _, name := range T.recipeScaleOrder { - r := T.recipes[name] - if r.Allocate() { - return name, r - } - } - - return "", nil -} - -func (T *Pool) scaleUpL1(name string, r *Recipe) error { - conn, err := r.Dial() - if err != nil { - // failed to dial - r.Free() - return err - } - - server, err := func() (*pooledServer, error) { - T.mu.Lock() - defer T.mu.Unlock() - if T.recipes[name] != r { - // recipe was removed - r.Free() - return nil, errors.New("recipe was removed") - } - - server := newServer( - T.config, - name, - conn, - ) - - if T.servers == nil { - T.servers = make(map[uuid.UUID]*pooledServer) - } - T.servers[server.GetID()] = server - - if T.serversByRecipe == nil { - T.serversByRecipe = make(map[string][]*pooledServer) - } - T.serversByRecipe[name] = append(T.serversByRecipe[name], server) - // update order - T.recipeScaleOrder.Update(slices.Index(T.recipeScaleOrder, name), func(n string) int { - return len(T.serversByRecipe[n]) - }) - return server, nil - }() - - if err != nil { - return err - } - - T.pooler.AddServer(server.GetID()) - return nil -} - -func (T *Pool) scaleUp() bool { - name, r := T.scaleUpL0() - if r == nil { - return false - } - - err := T.scaleUpL1(name, r) - if err != nil { - T.config.Logger.Warn("failed to dial server", zap.Error(err)) - return false - } - - return true -} - -func (T *Pool) removeServer(server *pooledServer) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeServerL1(server) -} - -func (T *Pool) removeServerL1(server *pooledServer) { - delete(T.servers, server.GetID()) - T.pooler.DeleteServer(server.GetID()) - _ = server.GetConn().Close() - if T.serversByRecipe != nil { - name := server.GetRecipe() - T.serversByRecipe[name] = slices.Delete(T.serversByRecipe[name], server) - // update order - index := slices.Index(T.recipeScaleOrder, name) - if index != -1 { - T.recipeScaleOrder.Update(index, func(n string) int { - return len(T.serversByRecipe[n]) - }) - } - } -} - -func (T *Pool) acquireServer(client *pooledClient) *pooledServer { - client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil) - - for { - serverID := T.pooler.Acquire(client.GetID(), SyncModeNonBlocking) - if serverID == uuid.Nil { - T.pendingCount.Add(1) - select { - case T.pending <- struct{}{}: - default: - } - serverID = T.pooler.Acquire(client.GetID(), SyncModeBlocking) - T.pendingCount.Add(-1) - if serverID == uuid.Nil { - return nil - } - } - - T.mu.RLock() - server, ok := T.servers[serverID] - T.mu.RUnlock() - if !ok { - T.pooler.DeleteServer(serverID) - continue - } - return server - } -} - -func (T *Pool) releaseServer(server *pooledServer) { - if T.config.ServerResetQuery != "" { - server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil) - - err, _, _ := backends.QueryString(server.GetConn(), nil, nil, T.config.ServerResetQuery) - if err != nil { - T.removeServer(server) - return - } - } - - server.SetState(metrics.ConnStateIdle, uuid.Nil) - - T.pooler.Release(server.GetID()) -} - -func (T *Pool) Serve( - conn *fed.Conn, -) error { - defer func() { - _ = conn.Close() - }() - - client := newClient( - T.config, - conn, - ) - - return T.serve(client, false) -} - -// ServeBot is for clients that don't need initial parameters, cancelling queries, and are ready now. Use Serve for -// real clients -func (T *Pool) ServeBot( - conn fed.ReadWriteCloser, -) error { - defer func() { - _ = conn.Close() - }() - - client := newClient( - T.config, - &fed.Conn{ - ReadWriteCloser: conn, - }, - ) - - return T.serve(client, true) -} - -func (T *Pool) serve(client *pooledClient, initialized bool) error { - T.addClient(client) - defer T.removeClient(client) - - var err error - var serverErr error - - var server *pooledServer - defer func() { - if server != nil { - if serverErr != nil { - T.removeServer(server) - } else { - T.releaseServer(server) - } - server = nil - } - }() - - var packet fed.Packet - - if !initialized { - server = T.acquireServer(client) - if server == nil { - return ErrClosed - } - - err, serverErr = pair(T.config, client, server) - if serverErr != nil { - return serverErr - } - if err != nil { - return err - } - - p := packets.ReadyForQuery('I') - packet = p.IntoPacket(packet) - err = client.GetConn().WritePacket(packet) - if err != nil { - return err - } - } - - for { - if server != nil && T.config.ReleaseAfterTransaction { - client.SetState(metrics.ConnStateIdle, uuid.Nil) - T.releaseServer(server) - server = nil - } - - packet, err = client.GetConn().ReadPacket(true, packet) - if err != nil { - return err - } - - if server == nil { - server = T.acquireServer(client) - if server == nil { - return ErrClosed - } - - err, serverErr = pair(T.config, client, server) - } - if err == nil && serverErr == nil { - packet, err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet) - } - if serverErr != nil { - return serverErr - } else { - transactionComplete(client, server) - } - - if err != nil { - return err - } - } -} - -func (T *Pool) addClient(client *pooledClient) { - T.mu.Lock() - defer T.mu.Unlock() - - if T.clients == nil { - T.clients = make(map[uuid.UUID]*pooledClient) - } - T.clients[client.GetID()] = client - if T.clientsByKey == nil { - T.clientsByKey = make(map[[8]byte]*pooledClient) - } - T.clientsByKey[client.GetBackendKey()] = client - T.pooler.AddClient(client.GetID()) -} - -func (T *Pool) removeClient(client *pooledClient) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeClientL1(client) -} - -func (T *Pool) removeClientL1(client *pooledClient) { - T.pooler.DeleteClient(client.GetID()) - _ = client.conn.Close() - delete(T.clients, client.GetID()) - delete(T.clientsByKey, client.GetBackendKey()) -} - -func (T *Pool) Cancel(key [8]byte) { - T.mu.RLock() - defer T.mu.RUnlock() - - client, ok := T.clientsByKey[key] - if !ok { - return - } - - state, peer, _ := client.GetState() - if state != metrics.ConnStateActive { - return - } - - server, ok := T.servers[peer] - if !ok { - return - } - - // prevent state from changing by RLocking the server - server.mu.RLock() - defer server.mu.RUnlock() - - // make sure peer is still set - if server.peer != peer { - return - } - - r, ok := T.recipes[server.recipe] - if !ok { - return - } - - r.Cancel(server.GetBackendKey()) -} - -func (T *Pool) ReadMetrics(m *metrics.Pool) { - T.mu.RLock() - defer T.mu.RUnlock() - - if len(T.clients) != 0 && m.Clients == nil { - m.Clients = make(map[uuid.UUID]metrics.Conn) - } - if len(T.servers) != 0 && m.Servers == nil { - m.Servers = make(map[uuid.UUID]metrics.Conn) - } - - for id, client := range T.clients { - var mc metrics.Conn - client.ReadMetrics(&mc) - m.Clients[id] = mc - } - - for id, server := range T.servers { - var mc metrics.Conn - server.ReadMetrics(&mc) - m.Servers[id] = mc - } -} - -func (T *Pool) Close() { - close(T.closed) - T.pooler.Close() - - T.mu.Lock() - defer T.mu.Unlock() - - // remove clients - for _, client := range T.clients { - T.removeClientL1(client) - } - - // remove recipes - for name := range T.recipes { - T.removeRecipe(name) - } -} diff --git a/lib/gat/pool/recipe.go b/lib/gat/pool/recipe.go deleted file mode 100644 index cfa70d173b7345e501ba653fd41a46d18f977969..0000000000000000000000000000000000000000 --- a/lib/gat/pool/recipe.go +++ /dev/null @@ -1,5 +0,0 @@ -package pool - -import "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" - -type Recipe = recipe.Recipe diff --git a/lib/gat/pool/scaler.go b/lib/gat/pool/scaler.go deleted file mode 100644 index 8beb64d3097d341369f26858cdb507dfb0bfb630..0000000000000000000000000000000000000000 --- a/lib/gat/pool/scaler.go +++ /dev/null @@ -1,115 +0,0 @@ -package pool - -import ( - "time" - - "go.uber.org/zap" -) - -type scaler struct { - pool *Pool - - backingOff bool - backoff time.Duration - - // timers - idle *time.Timer - pending *time.Timer -} - -func newScaler(pool *Pool) *scaler { - s := &scaler{ - pool: pool, - backoff: pool.config.ServerReconnectInitialTime.Duration(), - } - - if pool.config.ServerIdleTimeout != 0 { - s.idle = time.NewTimer(pool.config.ServerIdleTimeout.Duration()) - } - - return s -} - -func (T *scaler) idleTimeout(now time.Time) { - // idle loop for scaling down - var wait time.Duration - - var idlest *pooledServer - var idleStart time.Time - for idlest, idleStart = T.pool.idlest(); idlest != nil && now.Sub(idleStart) > T.pool.config.ServerIdleTimeout.Duration(); idlest, idleStart = T.pool.idlest() { - T.pool.removeServer(idlest) - } - - if idlest == nil { - wait = T.pool.config.ServerIdleTimeout.Duration() - } else { - wait = idleStart.Add(T.pool.config.ServerIdleTimeout.Duration()).Sub(now) - } - - T.idle.Reset(wait) -} - -func (T *scaler) pendingTimeout() { - if T.backingOff { - T.backoff *= 2 - if T.pool.config.ServerReconnectMaxTime != 0 && T.backoff > T.pool.config.ServerReconnectMaxTime.Duration() { - T.backoff = T.pool.config.ServerReconnectMaxTime.Duration() - } - } - - for T.pool.pendingCount.Load() > 0 { - // pending loop for scaling up - if T.pool.scaleUp() { - // scale up successful, see if we need to scale up more - T.backoff = T.pool.config.ServerReconnectInitialTime.Duration() - T.backingOff = false - continue - } - - if T.backoff == 0 { - // no backoff - T.backoff = T.pool.config.ServerReconnectInitialTime.Duration() - T.backingOff = false - continue - } - - T.backingOff = true - if T.pending == nil { - T.pending = time.NewTimer(T.backoff) - } else { - T.pending.Reset(T.backoff) - } - - T.pool.config.Logger.Warn("failed to dial server", zap.Duration("backoff", T.backoff)) - - return - } -} - -func (T *scaler) Run() { - for { - var idle <-chan time.Time - if T.idle != nil { - idle = T.idle.C - } - - var pending1 <-chan struct{} - var pending2 <-chan time.Time - if T.backingOff { - pending2 = T.pending.C - } else { - pending1 = T.pool.pending - } - - select { - case t := <-idle: - T.idleTimeout(t) - case <-pending1: - T.pendingTimeout() - case <-pending2: - T.pendingTimeout() - case <-T.pool.closed: - return - } - } -} diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go deleted file mode 100644 index 9afa7c994359bd6157c0df746dc368efd1c6500c..0000000000000000000000000000000000000000 --- a/lib/gat/pool/server.go +++ /dev/null @@ -1,57 +0,0 @@ -package pool - -import ( - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" -) - -type pooledServer struct { - pooledConn - - recipe string - - ps *ps.Server - eqp *eqp.Server -} - -func newServer( - options Config, - recipe string, - conn *fed.Conn, -) *pooledServer { - var psServer *ps.Server - if options.ParameterStatusSync == ParameterStatusSyncDynamic { - // add ps middleware - psServer = ps.NewServer(conn.InitialParameters) - conn.Middleware = append(conn.Middleware, psServer) - } - - var eqpServer *eqp.Server - if options.ExtendedQuerySync { - // add eqp middleware - eqpServer = eqp.NewServer() - conn.Middleware = append(conn.Middleware, eqpServer) - } - - return &pooledServer{ - pooledConn: makeConn( - conn, - ), - recipe: recipe, - ps: psServer, - eqp: eqpServer, - } -} - -func (T *pooledServer) GetRecipe() string { - return T.recipe -} - -func (T *pooledServer) GetEQP() *eqp.Server { - return T.eqp -} - -func (T *pooledServer) GetPS() *ps.Server { - return T.ps -} diff --git a/lib/gat/pool/withcredentials.go b/lib/gat/pool/withcredentials.go deleted file mode 100644 index 9c7382b380788ed36b6df39d241f3b5498e50493..0000000000000000000000000000000000000000 --- a/lib/gat/pool/withcredentials.go +++ /dev/null @@ -1,8 +0,0 @@ -package pool - -import "gfx.cafe/gfx/pggat/lib/auth" - -type WithCredentials struct { - *Pool - Credentials auth.Credentials -} diff --git a/lib/pool/backend.go b/lib/pool/backend.go new file mode 100644 index 0000000000000000000000000000000000000000..d49aff6c5813ffeb256bbfc94b8500b5c8994aa0 --- /dev/null +++ b/lib/pool/backend.go @@ -0,0 +1,209 @@ +package pool + +import ( + "log" + "sync" + "sync/atomic" + "time" + + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/pool/recipe" +) + +type backendRecipe struct { + weight atomic.Int64 + recipe *recipe.Recipe + + servers []*fed.Conn // TODO(garet) + killed bool +} + +type Backend struct { + config BackendConfig + pooler Pooler + + closed chan struct{} + + scaleUp chan struct{} + waiters atomic.Int64 + + recipes map[string]*backendRecipe + mu sync.RWMutex +} + +func NewBackend(config BackendConfig) *Backend { + b := &Backend{ + config: config, + pooler: config.NewPooler(), + + closed: make(chan struct{}), + + scaleUp: make(chan struct{}, 1), + } + + go b.scaleLoop() + + return b +} + +func (T *Backend) addRecipe(name string, r *recipe.Recipe) { + if T.recipes == nil { + T.recipes = make(map[string]*backendRecipe) + } + T.recipes[name] = &backendRecipe{ + recipe: r, + } +} + +func (T *Backend) AddRecipe(name string, r *recipe.Recipe) { + T.mu.Lock() + defer T.mu.Unlock() + + T.removeRecipe(name) + T.addRecipe(name, r) +} + +func (T *Backend) removeRecipe(name string) { + r, ok := T.recipes[name] + if !ok { + return + } + delete(T.recipes, name) + + r.killed = true + for _, server := range r.servers { + _ = server.Close() + r.recipe.Free() + } + r.servers = r.servers[:0] +} + +func (T *Backend) RemoveRecipe(name string) { + T.mu.Lock() + defer T.mu.Unlock() + + T.removeRecipe(name) +} + +func (T *Backend) ScaleUp() error { + var target *backendRecipe + func() { + T.mu.RLock() + defer T.mu.RUnlock() + + var targetWeight int64 = -1 + for _, r := range T.recipes { + if target == nil { + target = r + continue + } + weight := r.weight.Load() + if weight > targetWeight { + target = r + targetWeight = weight + continue + } + } + + if !target.recipe.Allocate() { + return + } + target.weight.Add(1) + }() + + if target == nil { + return ErrNoScalableRecipe + } + + server, err := target.recipe.Dial() + if err != nil { + target.recipe.Free() + target.weight.Add(-1) + return err + } + + T.mu.Lock() + defer T.mu.Unlock() + + if target.killed { + target.recipe.Free() + target.weight.Add(-1) + return nil + } + + target.servers = append(target.servers, server) + return nil +} + +// ScaleDown attempts to scale down the pool. Returns when the next scale down should happen +func (T *Backend) ScaleDown() time.Duration { + // TODO(garet) + return T.config.IdleTimeout +} + +func (T *Backend) scaleLoop() { + var scaleUpBackoffDuration time.Duration + var scaleUpBackoff *time.Timer + + var idleTimeout *time.Timer + if T.config.IdleTimeout != 0 { + idleTimeout = time.NewTimer(T.config.IdleTimeout) + } + + defer func() { + if scaleUpBackoff != nil { + scaleUpBackoff.Stop() + } + if idleTimeout != nil { + idleTimeout.Stop() + } + }() + + for { + var scaleUpBackoffC <-chan time.Time + var scaleUpC <-chan struct{} + if scaleUpBackoffDuration != 0 { + scaleUpBackoffC = scaleUpBackoff.C + } else { + scaleUpC = T.scaleUp + } + + var idleTimeoutC <-chan time.Time + if idleTimeout != nil { + idleTimeoutC = idleTimeout.C + } + + select { + case <-scaleUpC: + if err := T.ScaleUp(); err != nil { + log.Printf("error scaling up: %v", err) + + scaleUpBackoffDuration = T.config.ReconnectInitialTime + if scaleUpBackoffDuration != 0 { + if scaleUpBackoff == nil { + scaleUpBackoff = time.NewTimer(scaleUpBackoffDuration) + } else { + scaleUpBackoff.Reset(scaleUpBackoffDuration) + } + } + } + case <-scaleUpBackoffC: + if err := T.ScaleUp(); err != nil { + log.Printf("error scaling up: %v", err) + + scaleUpBackoffDuration *= 2 + scaleUpBackoff.Reset(scaleUpBackoffDuration) + } else { + scaleUpBackoffDuration = 0 + } + case <-idleTimeoutC: + idleTimeout.Reset(T.ScaleDown()) + case <-T.closed: + return + } + } +} + +func (T *Backend) Close() { + close(T.closed) +} diff --git a/lib/pool/config.go b/lib/pool/config.go new file mode 100644 index 0000000000000000000000000000000000000000..8fa522a6fc393e08eb0843d82484f73eeedad591 --- /dev/null +++ b/lib/pool/config.go @@ -0,0 +1,12 @@ +package pool + +import "time" + +type BackendConfig struct { + NewPooler func() Pooler + + IdleTimeout time.Duration + + ReconnectInitialTime time.Duration + ReconnectMaxTime time.Duration +} diff --git a/lib/pool/errors.go b/lib/pool/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..4fbd6ab2f03f43cd129f70c426f76ebbb545d2c1 --- /dev/null +++ b/lib/pool/errors.go @@ -0,0 +1,7 @@ +package pool + +import "errors" + +var ( + ErrNoScalableRecipe = errors.New("no scalable recipe") +) diff --git a/lib/pool/frontend.go b/lib/pool/frontend.go new file mode 100644 index 0000000000000000000000000000000000000000..799c93213f9a46d2cf811c29135eaf01aeaa6303 --- /dev/null +++ b/lib/pool/frontend.go @@ -0,0 +1,5 @@ +package pool + +type Frontend struct { + Backend *Backend +} diff --git a/lib/gat/pool/pooler.go b/lib/pool/pooler.go similarity index 100% rename from lib/gat/pool/pooler.go rename to lib/pool/pooler.go diff --git a/lib/gat/pool/recipe/config.go b/lib/pool/recipe/config.go similarity index 100% rename from lib/gat/pool/recipe/config.go rename to lib/pool/recipe/config.go diff --git a/lib/gat/pool/recipe/dialer.go b/lib/pool/recipe/dialer.go similarity index 91% rename from lib/gat/pool/recipe/dialer.go rename to lib/pool/recipe/dialer.go index 8202c2b9a6b1633fb835e7c092d9e8a84757ee69..3ce99919ecbbddb8eb939c6f09a450dd8b42aaac 100644 --- a/lib/gat/pool/recipe/dialer.go +++ b/lib/pool/recipe/dialer.go @@ -5,8 +5,6 @@ import ( "net" "gfx.cafe/gfx/pggat/lib/auth" - "gfx.cafe/gfx/pggat/lib/bouncer" - "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/util/strutil" ) @@ -15,7 +13,7 @@ type Dialer struct { Network string Address string - SSLMode bouncer.SSLMode + SSLMode bounce.SSLMode SSLConfig *tls.Config Username string Credentials auth.Credentials diff --git a/lib/gat/pool/recipe/recipe.go b/lib/pool/recipe/recipe.go similarity index 100% rename from lib/gat/pool/recipe/recipe.go rename to lib/pool/recipe/recipe.go diff --git a/test/config.go b/test/config.go index bc13828dc196ddf6f3d9101ac0b4b12cd08081f7..e66fc44e5970502491ab742e820c4575d55a30de 100644 --- a/test/config.go +++ b/test/config.go @@ -1,7 +1,7 @@ package test import ( - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" + "gfx.cafe/gfx/pggat/lib/pool/recipe" ) type Config struct { diff --git a/test/runner.go b/test/runner.go index d23e94c631e58064f1d933793dcb7a782834a316..499127bc0044ba66d81f6f95d0d4e7696bba95b5 100644 --- a/test/runner.go +++ b/test/runner.go @@ -8,8 +8,8 @@ import ( "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" "gfx.cafe/gfx/pggat/lib/fed" packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gsql" + "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/flip" "gfx.cafe/gfx/pggat/test/inst" ) diff --git a/test/tester_test.go b/test/tester_test.go index 6ef9b43191e68e04c7132e452a46e5202fe78dbd..63c5cd0536e464f6a9c1bd1ff34ab3255316ba6e 100644 --- a/test/tester_test.go +++ b/test/tester_test.go @@ -12,16 +12,17 @@ import ( "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/caddyconfig" + "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/auth/credentials" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/gatcaddyfile" pool_handler "gfx.cafe/gfx/pggat/lib/gat/handlers/pool" "gfx.cafe/gfx/pggat/lib/gat/handlers/rewrite_password" "gfx.cafe/gfx/pggat/lib/gat/matchers" - "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gat/poolers/session" "gfx.cafe/gfx/pggat/lib/gat/poolers/transaction" + "gfx.cafe/gfx/pggat/lib/pool/recipe" "gfx.cafe/gfx/pggat/test" "gfx.cafe/gfx/pggat/test/tests" )