diff --git a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go index 8bd3fa07d6e98783e6f557a109bc655f09f8ddb6..8994731a28f7583ddc3de0a6622443842b6b32d4 100644 --- a/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go +++ b/lib/gat/handlers/discovery/discoverers/google_cloud_sql/discoverer.go @@ -5,7 +5,6 @@ import ( "net" "strings" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/flip" "github.com/caddyserver/caddy/v2" @@ -118,7 +117,7 @@ func (T *Discoverer) instanceToCluster(primary *sqladmin.DatabaseInstance, repli } else { // dial admin connection if admin == nil { - admin, err = recipe.Dialer{ + admin, err = pool.Dialer{ Network: "tcp", Address: primaryAddress, SSLMode: bouncer.SSLModePrefer, diff --git a/lib/gat/handlers/discovery/module.go b/lib/gat/handlers/discovery/module.go index 3652a482255fbb723c260cf9b57de265e4123248..87b6589737961c06df432ea32eb12f79a4ab0762 100644 --- a/lib/gat/handlers/discovery/module.go +++ b/lib/gat/handlers/discovery/module.go @@ -16,7 +16,6 @@ import ( "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/util/maps" "gfx.cafe/gfx/pggat/lib/util/slices" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -223,7 +222,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo for _, user := range users { primaryCreds, _ := T.creds(user) for _, database := range databases { - primary := recipe.Dialer{ + primary := pool.Dialer{ Network: endpoint.Network, Address: endpoint.Address, Username: user.Username, @@ -240,7 +239,7 @@ func (T *Module) replacePrimary(users []User, databases []string, endpoint Endpo } p.RemoveRecipe("primary") - p.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ + p.AddRecipe("primary", pool.NewRecipe(pool.RecipeConfig{ Dialer: primary, })) } @@ -258,7 +257,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas } for id, r := range replicas { - replica := recipe.Dialer{ + replica := pool.Dialer{ Network: r.Network, Address: r.Address, Username: user.Username, @@ -268,7 +267,7 @@ func (T *Module) addReplicas(replicas map[string]Endpoint, users []User, databas SSLConfig: T.sslConfig, StartupParameters: T.serverStartupParameters, } - replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + replicaPool.AddRecipe(id, pool.NewRecipe(pool.RecipeConfig{ Dialer: replica, })) } @@ -297,7 +296,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin continue } - replica := recipe.Dialer{ + replica := pool.Dialer{ Network: endpoint.Network, Address: endpoint.Address, Username: user.Username, @@ -307,7 +306,7 @@ func (T *Module) addReplica(users []User, databases []string, id string, endpoin SSLConfig: T.sslConfig, StartupParameters: T.serverStartupParameters, } - p.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + p.AddRecipe(id, pool.NewRecipe(pool.RecipeConfig{ Dialer: replica, })) } @@ -331,7 +330,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, replicaUsername := T.replicaUsername(user.Username) primaryCreds, replicaCreds := T.creds(user) for _, database := range databases { - base := recipe.Dialer{ + base := pool.Dialer{ Username: user.Username, Credentials: primaryCreds, Database: database, @@ -348,7 +347,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, Pool: T.pooler.NewPool(), Credentials: primaryCreds, } - primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ + primaryPool.AddRecipe("primary", pool.NewRecipe(pool.RecipeConfig{ Dialer: primary, })) T.addPool(user.Username, database, primaryPool) @@ -363,7 +362,7 @@ func (T *Module) addUser(primaryEndpoint Endpoint, replicas map[string]Endpoint, replica := base replica.Network = r.Network replica.Address = r.Address - replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + replicaPool.AddRecipe(id, pool.NewRecipe(pool.RecipeConfig{ Dialer: replica, })) } @@ -390,7 +389,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo replicaUsername := T.replicaUsername(user.Username) primaryCreds, replicaCreds := T.creds(user) - base := recipe.Dialer{ + base := pool.Dialer{ Username: user.Username, Credentials: primaryCreds, Database: database, @@ -407,7 +406,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo Pool: T.pooler.NewPool(), Credentials: primaryCreds, } - primaryPool.AddRecipe("primary", recipe.NewRecipe(recipe.Config{ + primaryPool.AddRecipe("primary", pool.NewRecipe(pool.RecipeConfig{ Dialer: primary, })) T.addPool(user.Username, database, primaryPool) @@ -422,7 +421,7 @@ func (T *Module) addDatabase(primaryEndpoint Endpoint, replicas map[string]Endpo replica := base replica.Network = r.Network replica.Address = r.Address - replicaPool.AddRecipe(id, recipe.NewRecipe(recipe.Config{ + replicaPool.AddRecipe(id, pool.NewRecipe(pool.RecipeConfig{ Dialer: replica, })) } diff --git a/lib/gat/handlers/pgbouncer/module.go b/lib/gat/handlers/pgbouncer/module.go index 30deb767c54710d265b82c5e0b9b270db97000a7..5ce5a7507627c0b4bcdf2ef42d00fbd937e53595 100644 --- a/lib/gat/handlers/pgbouncer/module.go +++ b/lib/gat/handlers/pgbouncer/module.go @@ -27,7 +27,6 @@ import ( "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gsql" "gfx.cafe/gfx/pggat/lib/util/maps" "gfx.cafe/gfx/pggat/lib/util/strutil" @@ -222,7 +221,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { serverCreds = credentials.FromString(user, db.Password) } - dialer := recipe.Dialer{ + dialer := pool.Dialer{ SSLMode: T.Config.PgBouncer.ServerTLSSSLMode, SSLConfig: &tls.Config{ InsecureSkipVerify: true, // TODO(garet) @@ -262,7 +261,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { dialer.Address = address } - recipeOptions := recipe.Config{ + recipeOptions := pool.RecipeConfig{ Dialer: dialer, MinConnections: db.MinPoolSize, MaxConnections: db.MaxDBConnections, @@ -273,7 +272,7 @@ func (T *Module) tryCreate(user, database string) (pool.WithCredentials, bool) { if recipeOptions.MaxConnections == 0 { recipeOptions.MaxConnections = T.Config.PgBouncer.MaxDBConnections } - r := recipe.NewRecipe(recipeOptions) + r := pool.NewRecipe(recipeOptions) p.AddRecipe("pgbouncer", r) diff --git a/lib/gat/handlers/pool/module.go b/lib/gat/handlers/pool/module.go index ef57d02e42edbf73219a9b66155faaffd6fa7249..0dafc872c7147e24513d0f535e7d96ce3c47dfd7 100644 --- a/lib/gat/handlers/pool/module.go +++ b/lib/gat/handlers/pool/module.go @@ -12,7 +12,8 @@ import ( "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" + "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/gat/pool/pools/basic" "gfx.cafe/gfx/pggat/lib/util/strutil" ) @@ -23,7 +24,7 @@ func init() { type Module struct { Config - pool *gat.Pool + pool *basic.Pool } func (*Module) CaddyModule() caddy.ModuleInfo { @@ -65,7 +66,7 @@ func (T *Module) Provision(ctx caddy.Context) error { network = "tcp" } - d := recipe.Dialer{ + d := pool.Dialer{ Network: network, Address: T.ServerAddress, SSLMode: T.ServerSSLMode, @@ -77,7 +78,7 @@ func (T *Module) Provision(ctx caddy.Context) error { } T.pool = pooler.NewPool() - T.pool.AddRecipe("pool", recipe.NewRecipe(recipe.Config{ + T.pool.AddRecipe("pool", pool.NewRecipe(pool.RecipeConfig{ Dialer: d, })) diff --git a/lib/gat/pool.go b/lib/gat/pool.go index e3f96693e34a5bd3c83e2cbe073cd86ce154769a..31778045ad1ba20d8e0d7025ac8ce60299e0df9b 100644 --- a/lib/gat/pool.go +++ b/lib/gat/pool.go @@ -1,7 +1,14 @@ package gat import ( - "gfx.cafe/gfx/pggat/lib/gat/pool" + "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/gat/metrics" ) -type Pool = pool.Pool +type Pool interface { + Serve(conn *fed.Conn) error + + Cancel(key fed.BackendKey) + ReadMetrics(m *metrics.Pool) + Close() +} diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go deleted file mode 100644 index c6d3c76e45367c7843c3ad651a9bf694c538f3c8..0000000000000000000000000000000000000000 --- a/lib/gat/pool/client.go +++ /dev/null @@ -1,55 +0,0 @@ -package pool - -import ( - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/unterminate" -) - -type pooledClient struct { - pooledConn - - ps *ps.Client - eqp *eqp.Client -} - -func newClient( - options Config, - conn *fed.Conn, -) *pooledClient { - conn.Middleware = append( - conn.Middleware, - unterminate.Unterminate, - ) - - var psClient *ps.Client - if options.ParameterStatusSync == ParameterStatusSyncDynamic { - // add ps middleware - psClient = ps.NewClient(conn.InitialParameters) - conn.Middleware = append(conn.Middleware, psClient) - } - - var eqpClient *eqp.Client - if options.ExtendedQuerySync { - // add eqp middleware - eqpClient = eqp.NewClient() - conn.Middleware = append(conn.Middleware, eqpClient) - } - - return &pooledClient{ - pooledConn: makeConn( - conn, - ), - ps: psClient, - eqp: eqpClient, - } -} - -func (T *pooledClient) GetEQP() *eqp.Client { - return T.eqp -} - -func (T *pooledClient) GetPS() *ps.Client { - return T.ps -} diff --git a/lib/gat/pool/config.go b/lib/gat/pool/config.go deleted file mode 100644 index 36604526c2916b47a33c13b5e2dff5a9a81ae8d8..0000000000000000000000000000000000000000 --- a/lib/gat/pool/config.go +++ /dev/null @@ -1,61 +0,0 @@ -package pool - -import ( - "go.uber.org/zap" - - "gfx.cafe/gfx/pggat/lib/util/dur" - "gfx.cafe/gfx/pggat/lib/util/strutil" -) - -type ParameterStatusSync int - -const ( - // ParameterStatusSyncNone does not attempt to sync parameter status. - ParameterStatusSyncNone ParameterStatusSync = iota - // ParameterStatusSyncInitial assumes both client and server have their initial status before syncing. - // Use in session pooling for lower latency - ParameterStatusSyncInitial - // ParameterStatusSyncDynamic will track parameter status and ensure they are synced correctly. - // Use in transaction pooling - ParameterStatusSyncDynamic -) - -type PoolingConfig struct { - NewPooler func() Pooler - // ReleaseAfterTransaction toggles whether servers should be released and re acquired after each transaction. - // Use false for lower latency - // Use true for better balancing - ReleaseAfterTransaction bool - - // ParameterStatusSync is the parameter syncing mode - ParameterStatusSync ParameterStatusSync - - // ExtendedQuerySync controls whether prepared statements and portals should be tracked and synced before use. - // Use false for lower latency - // Use true for transaction pooling - ExtendedQuerySync bool -} - -type ManagementConfig struct { - ServerResetQuery string `json:"server_reset_query,omitempty"` - // ServerIdleTimeout defines how long a server may be idle before it is disconnected - ServerIdleTimeout dur.Duration `json:"server_idle_timeout,omitempty"` - - // ServerReconnectInitialTime defines how long to wait initially before attempting a server reconnect - // 0 = disable, don't retry - ServerReconnectInitialTime dur.Duration `json:"server_reconnect_initial_time,omitempty"` - // ServerReconnectMaxTime defines the max amount of time to wait before attempting a server reconnect - // 0 = disable, back off infinitely - ServerReconnectMaxTime dur.Duration `json:"server_reconnect_max_time,omitempty"` - - // TrackedParameters are parameters which should be synced by updating the server, not the client. - TrackedParameters []strutil.CIString `json:"tracked_parameters,omitempty"` -} - -type Config struct { - PoolingConfig - - ManagementConfig - - Logger *zap.Logger -} diff --git a/lib/gat/pool/conn.go b/lib/gat/pool/conn.go index c1ea8981e07e6487e41bc72d0c49f126a1cb26d8..ce0eac955c55dece7216b0992d7a553d88282e86 100644 --- a/lib/gat/pool/conn.go +++ b/lib/gat/pool/conn.go @@ -9,22 +9,22 @@ import ( "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/util/strutil" ) -type pooledConn struct { - id uuid.UUID - - conn *fed.Conn +type Conn struct { + ID uuid.UUID + Conn *fed.Conn + // Recipe that created this conn, optional. + Recipe string // metrics - transactionCount atomic.Int64 + txnCount atomic.Int64 lastMetricsRead time.Time state metrics.ConnState - peer uuid.UUID + peer *Conn since time.Time util [metrics.ConnStateCount]time.Duration @@ -32,89 +32,92 @@ type pooledConn struct { mu sync.RWMutex } -func makeConn( - conn *fed.Conn, -) pooledConn { - return pooledConn{ - id: uuid.New(), - conn: conn, - - since: time.Now(), +func NewConn(conn *fed.Conn) *Conn { + return &Conn{ + ID: uuid.New(), + Conn: conn, } } -func (T *pooledConn) GetID() uuid.UUID { - return T.id -} - -func (T *pooledConn) GetConn() *fed.Conn { - return T.conn -} +func (T *Conn) GetState() (metrics.ConnState, time.Time) { + T.mu.RLock() + defer T.mu.RUnlock() -func (T *pooledConn) GetInitialParameters() map[strutil.CIString]string { - return T.conn.InitialParameters + return T.state, T.since } -func (T *pooledConn) GetBackendKey() fed.BackendKey { - return T.conn.BackendKey -} +func (T *Conn) GetPeer() *Conn { + T.mu.RLock() + defer T.mu.RUnlock() -func (T *pooledConn) TransactionComplete() { - T.transactionCount.Add(1) + return T.peer } -func (T *pooledConn) SetState(state metrics.ConnState, peer uuid.UUID) { +func (T *Conn) setState(now time.Time, state metrics.ConnState, peer *Conn) { T.mu.Lock() defer T.mu.Unlock() - now := time.Now() - - var since time.Duration + var dur time.Duration if T.since.Before(T.lastMetricsRead) { - since = now.Sub(T.lastMetricsRead) + dur = now.Sub(T.lastMetricsRead) } else { - since = now.Sub(T.since) + dur = now.Sub(T.since) } - T.util[T.state] += since + T.util[T.state] += dur T.state = state T.peer = peer T.since = now } -func (T *pooledConn) GetState() (state metrics.ConnState, peer uuid.UUID, since time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - state = T.state - peer = T.peer - since = T.since - return +func SetConnState(state metrics.ConnState, conns ...*Conn) { + now := time.Now() + + for i, conn := range conns { + var peer *Conn + if i == 0 { + if len(conns) > 1 { + peer = conns[1] + } + } else { + peer = conns[0] + } + conn.setState(now, state, peer) + } +} + +func ConnTransactionComplete(conns ...*Conn) { + for _, conn := range conns { + conn.txnCount.Add(1) + } } -func (T *pooledConn) ReadMetrics(m *metrics.Conn) { +func (T *Conn) ReadMetrics(m *metrics.Conn) { T.mu.Lock() defer T.mu.Unlock() - now := time.Now() - - m.Time = now + m.Time = time.Now() m.State = T.state - m.Peer = T.peer + if T.peer != nil { + m.Peer = T.peer.ID + } else { + m.Peer = uuid.Nil + } m.Since = T.since m.Utilization = T.util T.util = [metrics.ConnStateCount]time.Duration{} - var since time.Duration + var dur time.Duration if m.Since.Before(T.lastMetricsRead) { - since = now.Sub(T.lastMetricsRead) + dur = m.Time.Sub(T.lastMetricsRead) } else { - since = now.Sub(m.Since) + dur = m.Time.Sub(m.Since) } - m.Utilization[m.State] += since + m.Utilization[m.State] += dur - m.TransactionCount = int(T.transactionCount.Swap(0)) + m.TransactionCount = int(T.txnCount.Swap(0)) - T.lastMetricsRead = now + T.lastMetricsRead = m.Time } diff --git a/lib/gat/pool/recipe/dialer.go b/lib/gat/pool/dialer.go similarity index 98% rename from lib/gat/pool/recipe/dialer.go rename to lib/gat/pool/dialer.go index b11ac3a98efea8fea1c4ea03079f4104705ad94d..ca8ccca85590a5c694fd3211ea4da8d1967ab29f 100644 --- a/lib/gat/pool/recipe/dialer.go +++ b/lib/gat/pool/dialer.go @@ -1,4 +1,4 @@ -package recipe +package pool import ( "crypto/tls" diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go deleted file mode 100644 index d4c2626237b1f52190849e169d6d070cf41cc60e..0000000000000000000000000000000000000000 --- a/lib/gat/pool/flow.go +++ /dev/null @@ -1,109 +0,0 @@ -package pool - -import ( - "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" - packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" - "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/util/slices" -) - -func pair(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) { - defer func() { - client.SetState(metrics.ConnStateActive, server.GetID()) - server.SetState(metrics.ConnStateActive, client.GetID()) - }() - - if options.ParameterStatusSync != ParameterStatusSyncNone || options.ExtendedQuerySync { - client.SetState(metrics.ConnStatePairing, server.GetID()) - server.SetState(metrics.ConnStatePairing, client.GetID()) - } - - switch options.ParameterStatusSync { - case ParameterStatusSyncDynamic: - clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), server.GetConn()) - case ParameterStatusSyncInitial: - clientErr, serverErr = syncInitialParameters(options, client, server) - } - - if clientErr != nil || serverErr != nil { - return - } - - if options.ExtendedQuerySync { - serverErr = eqp.Sync(server.GetConn(), server.GetConn()) - } - - return -} - -func syncInitialParameters(options Config, client *pooledClient, server *pooledServer) (clientErr, serverErr error) { - clientParams := client.GetInitialParameters() - serverParams := server.GetInitialParameters() - - for key, value := range clientParams { - // skip already set params - if serverParams[key] == value { - p := packets.ParameterStatus{ - Key: key.String(), - Value: serverParams[key], - } - clientErr = client.GetConn().WritePacket(&p) - if clientErr != nil { - return - } - continue - } - - setServer := slices.Contains(options.TrackedParameters, key) - - if !setServer { - value = serverParams[key] - } - - p := packets.ParameterStatus{ - Key: key.String(), - Value: value, - } - clientErr = client.GetConn().WritePacket(&p) - if clientErr != nil { - return - } - - if !setServer { - continue - } - - serverErr, _ = backends.SetParameter(server.GetConn(), nil, key, value) - if serverErr != nil { - return - } - } - - for key, value := range serverParams { - if _, ok := clientParams[key]; ok { - continue - } - - // Don't need to run reset on server because it will reset it to the initial value - - // send to client - p := packets.ParameterStatus{ - Key: key.String(), - Value: value, - } - clientErr = client.GetConn().WritePacket(&p) - if clientErr != nil { - return - } - } - - return - -} - -func transactionComplete(client *pooledClient, server *pooledServer) { - client.TransactionComplete() - server.TransactionComplete() -} diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go deleted file mode 100644 index 8d79cc1ac528897631f7ee402fc6464efb9cc433..0000000000000000000000000000000000000000 --- a/lib/gat/pool/pool.go +++ /dev/null @@ -1,479 +0,0 @@ -package pool - -import ( - "errors" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - "go.uber.org/zap" - - "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" - "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" - "gfx.cafe/gfx/pggat/lib/fed" - packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" - "gfx.cafe/gfx/pggat/lib/gat/metrics" - "gfx.cafe/gfx/pggat/lib/util/slices" -) - -type Pool struct { - config Config - pooler Pooler - - closed chan struct{} - - pendingCount atomic.Int64 - pending chan struct{} - - recipes map[string]*Recipe - recipeScaleOrder slices.Sorted[string] - clients map[uuid.UUID]*pooledClient - clientsByKey map[fed.BackendKey]*pooledClient - servers map[uuid.UUID]*pooledServer - serversByRecipe map[string][]*pooledServer - mu sync.RWMutex -} - -func NewPool(config Config) *Pool { - if config.NewPooler == nil { - panic("expected new pooler func") - } - pooler := config.NewPooler() - if pooler == nil { - panic("expected pooler") - } - - p := &Pool{ - config: config, - pooler: pooler, - - closed: make(chan struct{}), - pending: make(chan struct{}, 1), - } - - s := newScaler(p) - go s.Run() - - return p -} - -func (T *Pool) idlest() (server *pooledServer, at time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - - for _, s := range T.servers { - state, _, since := s.GetState() - if state != metrics.ConnStateIdle { - continue - } - - if at == (time.Time{}) || since.Before(at) { - server = s - at = since - } - } - - return -} - -func (T *Pool) AddRecipe(name string, r *Recipe) { - func() { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeRecipe(name) - - if T.recipes == nil { - T.recipes = make(map[string]*Recipe) - } - T.recipes[name] = r - - // add to front of scale order - T.recipeScaleOrder = T.recipeScaleOrder.Insert(name, func(n string) int { - return len(T.serversByRecipe[n]) - }) - }() - - count := r.AllocateInitial() - for i := 0; i < count; i++ { - if err := T.scaleUpL1(name, r); err != nil { - T.config.Logger.Warn("failed to dial server", zap.Error(err)) - for j := i; j < count; j++ { - r.Free() - } - break - } - } -} - -func (T *Pool) RemoveRecipe(name string) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeRecipe(name) -} - -func (T *Pool) removeRecipe(name string) { - r, ok := T.recipes[name] - if !ok { - return - } - delete(T.recipes, name) - - servers := T.serversByRecipe[name] - delete(T.serversByRecipe, name) - // remove from recipeScaleOrder - T.recipeScaleOrder = slices.Delete(T.recipeScaleOrder, name) - - for _, server := range servers { - r.Free() - T.removeServerL1(server) - } -} - -func (T *Pool) scaleUpL0() (string, *Recipe) { - T.mu.RLock() - defer T.mu.RUnlock() - - for _, name := range T.recipeScaleOrder { - r := T.recipes[name] - if r.Allocate() { - return name, r - } - } - - return "", nil -} - -func (T *Pool) scaleUpL1(name string, r *Recipe) error { - conn, err := r.Dial() - if err != nil { - // failed to dial - r.Free() - return err - } - - server, err := func() (*pooledServer, error) { - T.mu.Lock() - defer T.mu.Unlock() - if T.recipes[name] != r { - // recipe was removed - r.Free() - return nil, errors.New("recipe was removed") - } - - server := newServer( - T.config, - name, - conn, - ) - - if T.servers == nil { - T.servers = make(map[uuid.UUID]*pooledServer) - } - T.servers[server.GetID()] = server - - if T.serversByRecipe == nil { - T.serversByRecipe = make(map[string][]*pooledServer) - } - T.serversByRecipe[name] = append(T.serversByRecipe[name], server) - // update order - T.recipeScaleOrder.Update(slices.Index(T.recipeScaleOrder, name), func(n string) int { - return len(T.serversByRecipe[n]) - }) - return server, nil - }() - - if err != nil { - return err - } - - T.pooler.AddServer(server.GetID()) - return nil -} - -func (T *Pool) scaleUp() bool { - name, r := T.scaleUpL0() - if r == nil { - return false - } - - err := T.scaleUpL1(name, r) - if err != nil { - T.config.Logger.Warn("failed to dial server", zap.Error(err)) - return false - } - - return true -} - -func (T *Pool) removeServer(server *pooledServer) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeServerL1(server) -} - -func (T *Pool) removeServerL1(server *pooledServer) { - delete(T.servers, server.GetID()) - T.pooler.DeleteServer(server.GetID()) - _ = server.GetConn().Close() - if T.serversByRecipe != nil { - name := server.GetRecipe() - T.serversByRecipe[name] = slices.Delete(T.serversByRecipe[name], server) - // update order - index := slices.Index(T.recipeScaleOrder, name) - if index != -1 { - T.recipeScaleOrder.Update(index, func(n string) int { - return len(T.serversByRecipe[n]) - }) - } - } -} - -func (T *Pool) acquireServer(client *pooledClient) *pooledServer { - client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil) - - for { - serverID := T.pooler.Acquire(client.GetID(), SyncModeNonBlocking) - if serverID == uuid.Nil { - T.pendingCount.Add(1) - select { - case T.pending <- struct{}{}: - default: - } - serverID = T.pooler.Acquire(client.GetID(), SyncModeBlocking) - T.pendingCount.Add(-1) - if serverID == uuid.Nil { - return nil - } - } - - T.mu.RLock() - server, ok := T.servers[serverID] - T.mu.RUnlock() - if !ok { - T.pooler.DeleteServer(serverID) - continue - } - return server - } -} - -func (T *Pool) releaseServer(server *pooledServer) { - if T.config.ServerResetQuery != "" { - server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil) - - err, _ := backends.QueryString(server.GetConn(), nil, T.config.ServerResetQuery) - if err != nil { - T.removeServer(server) - return - } - } - - server.SetState(metrics.ConnStateIdle, uuid.Nil) - - T.pooler.Release(server.GetID()) -} - -func (T *Pool) Serve( - conn *fed.Conn, -) error { - defer func() { - _ = conn.Close() - }() - - client := newClient( - T.config, - conn, - ) - - return T.serve(client) -} - -func (T *Pool) serve(client *pooledClient) error { - T.addClient(client) - defer T.removeClient(client) - - var err error - var serverErr error - - var server *pooledServer - defer func() { - if server != nil { - if serverErr != nil { - T.removeServer(server) - } else { - T.releaseServer(server) - } - server = nil - } - }() - - if !client.GetConn().Ready { - server = T.acquireServer(client) - if server == nil { - return ErrClosed - } - - err, serverErr = pair(T.config, client, server) - if serverErr != nil { - return serverErr - } - if err != nil { - return err - } - - p := packets.ReadyForQuery('I') - err = client.GetConn().WritePacket(&p) - if err != nil { - return err - } - - client.GetConn().Ready = true - } - - for { - if server != nil && T.config.ReleaseAfterTransaction { - client.SetState(metrics.ConnStateIdle, uuid.Nil) - T.releaseServer(server) - server = nil - } - - var packet fed.Packet - packet, err = client.GetConn().ReadPacket(true) - if err != nil { - return err - } - - if server == nil { - server = T.acquireServer(client) - if server == nil { - return ErrClosed - } - - err, serverErr = pair(T.config, client, server) - } - if err == nil && serverErr == nil { - err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet) - } - - if serverErr != nil { - return serverErr - } else { - transactionComplete(client, server) - } - - if err != nil { - return err - } - } -} - -func (T *Pool) addClient(client *pooledClient) { - T.mu.Lock() - defer T.mu.Unlock() - - if T.clients == nil { - T.clients = make(map[uuid.UUID]*pooledClient) - } - T.clients[client.GetID()] = client - if T.clientsByKey == nil { - T.clientsByKey = make(map[fed.BackendKey]*pooledClient) - } - T.clientsByKey[client.GetBackendKey()] = client - T.pooler.AddClient(client.GetID()) -} - -func (T *Pool) removeClient(client *pooledClient) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removeClientL1(client) -} - -func (T *Pool) removeClientL1(client *pooledClient) { - T.pooler.DeleteClient(client.GetID()) - _ = client.conn.Close() - delete(T.clients, client.GetID()) - delete(T.clientsByKey, client.GetBackendKey()) -} - -func (T *Pool) Cancel(key fed.BackendKey) { - T.mu.RLock() - defer T.mu.RUnlock() - - client, ok := T.clientsByKey[key] - if !ok { - return - } - - state, peer, _ := client.GetState() - if state != metrics.ConnStateActive { - return - } - - server, ok := T.servers[peer] - if !ok { - return - } - - // prevent state from changing by RLocking the server - server.mu.RLock() - defer server.mu.RUnlock() - - // make sure peer is still set - if server.peer != peer { - return - } - - r, ok := T.recipes[server.recipe] - if !ok { - return - } - - r.Cancel(server.GetBackendKey()) -} - -func (T *Pool) ReadMetrics(m *metrics.Pool) { - T.mu.RLock() - defer T.mu.RUnlock() - - if len(T.clients) != 0 && m.Clients == nil { - m.Clients = make(map[uuid.UUID]metrics.Conn) - } - if len(T.servers) != 0 && m.Servers == nil { - m.Servers = make(map[uuid.UUID]metrics.Conn) - } - - for id, client := range T.clients { - var mc metrics.Conn - client.ReadMetrics(&mc) - m.Clients[id] = mc - } - - for id, server := range T.servers { - var mc metrics.Conn - server.ReadMetrics(&mc) - m.Servers[id] = mc - } -} - -func (T *Pool) Close() { - close(T.closed) - T.pooler.Close() - - T.mu.Lock() - defer T.mu.Unlock() - - // remove clients - for _, client := range T.clients { - T.removeClientL1(client) - } - - // remove recipes - for name := range T.recipes { - T.removeRecipe(name) - } -} diff --git a/lib/gat/pool/pooler.go b/lib/gat/pool/pooler.go deleted file mode 100644 index 06bba5a46ba99f4e34c4c69e8d196f02617799f8..0000000000000000000000000000000000000000 --- a/lib/gat/pool/pooler.go +++ /dev/null @@ -1,23 +0,0 @@ -package pool - -import "github.com/google/uuid" - -type SyncMode int - -const ( - SyncModeNonBlocking SyncMode = iota - SyncModeBlocking -) - -type Pooler interface { - AddClient(id uuid.UUID) - DeleteClient(client uuid.UUID) - - AddServer(id uuid.UUID) - DeleteServer(server uuid.UUID) - - Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID) - Release(server uuid.UUID) - - Close() -} diff --git a/lib/gat/pool2/clientpool/config.go b/lib/gat/pool/pools/basic/config.go similarity index 86% rename from lib/gat/pool2/clientpool/config.go rename to lib/gat/pool/pools/basic/config.go index c0a9d20147b9ba8f32e1e5b3fb2319ed756c9435..11ede312fb8506fbc15b2157e209f37d372d6b36 100644 --- a/lib/gat/pool2/clientpool/config.go +++ b/lib/gat/pool/pools/basic/config.go @@ -1,7 +1,7 @@ -package clientpool +package basic import ( - "gfx.cafe/gfx/pggat/lib/gat/pool2/scalingpool" + "gfx.cafe/gfx/pggat/lib/gat/pool/scalingpool" "gfx.cafe/gfx/pggat/lib/util/strutil" ) diff --git a/lib/gat/pool2/clientpool/pool.go b/lib/gat/pool/pools/basic/pool.go similarity index 75% rename from lib/gat/pool2/clientpool/pool.go rename to lib/gat/pool/pools/basic/pool.go index 0eeffc80a1be1c1a5ed39ab41b92e68b117c11de..02d8ee69932fd42fb9cf9f4b7b325b6267c87374 100644 --- a/lib/gat/pool2/clientpool/pool.go +++ b/lib/gat/pool/pools/basic/pool.go @@ -1,19 +1,20 @@ -package clientpool +package basic import ( "sync" + "github.com/google/uuid" + "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" + "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" - "gfx.cafe/gfx/pggat/lib/gat/pool2" - "gfx.cafe/gfx/pggat/lib/gat/pool2/scalingpool" + "gfx.cafe/gfx/pggat/lib/gat/pool/scalingpool" "gfx.cafe/gfx/pggat/lib/util/slices" ) @@ -22,7 +23,7 @@ type Pool struct { servers scalingpool.Pool - clientsByBackendKey map[fed.BackendKey]*pool2.Conn + clientsByBackendKey map[fed.BackendKey]*pool.Conn mu sync.RWMutex } @@ -34,7 +35,7 @@ func MakePool(config Config) Pool { } } -func (T *Pool) AddRecipe(name string, r *recipe.Recipe) { +func (T *Pool) AddRecipe(name string, r *pool.Recipe) { T.servers.AddRecipe(name, r) } @@ -42,18 +43,18 @@ func (T *Pool) RemoveRecipe(name string) { T.servers.RemoveRecipe(name) } -func (T *Pool) addClient(client *pool2.Conn) { +func (T *Pool) addClient(client *pool.Conn) { T.servers.AddClient(client) T.mu.Lock() defer T.mu.Unlock() if T.clientsByBackendKey == nil { - T.clientsByBackendKey = make(map[fed.BackendKey]*pool2.Conn) + T.clientsByBackendKey = make(map[fed.BackendKey]*pool.Conn) } T.clientsByBackendKey[client.Conn.BackendKey] = client } -func (T *Pool) removeClient(client *pool2.Conn) { +func (T *Pool) removeClient(client *pool.Conn) { T.servers.RemoveClient(client) T.mu.Lock() @@ -61,7 +62,7 @@ func (T *Pool) removeClient(client *pool2.Conn) { delete(T.clientsByBackendKey, client.Conn.BackendKey) } -func (T *Pool) syncInitialParameters(client, server *pool2.Conn) (clientErr, serverErr error) { +func (T *Pool) syncInitialParameters(client, server *pool.Conn) (clientErr, serverErr error) { clientParams := client.Conn.InitialParameters serverParams := server.Conn.InitialParameters @@ -126,9 +127,9 @@ func (T *Pool) syncInitialParameters(client, server *pool2.Conn) (clientErr, ser } -func (T *Pool) pair(client, server *pool2.Conn) (err, serverErr error) { +func (T *Pool) pair(client, server *pool.Conn) (err, serverErr error) { if T.config.ParameterStatusSync != pool.ParameterStatusSyncNone || T.config.ExtendedQuerySync { - pool2.SetConnState(metrics.ConnStatePairing, client, server) + pool.SetConnState(metrics.ConnStatePairing, client, server) } switch T.config.ParameterStatusSync { @@ -164,7 +165,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { ) } - client := pool2.NewConn(conn) + client := pool.NewConn(conn) T.addClient(client) defer T.removeClient(client) @@ -172,7 +173,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { var err error var serverErr error - var server *pool2.Conn + var server *pool.Conn defer func() { if server != nil { if serverErr != nil { @@ -187,7 +188,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { if !client.Conn.Ready { server = T.servers.Acquire(client) if server == nil { - return pool2.ErrClosed + return pool.ErrClosed } err, serverErr = T.pair(client, server) @@ -214,7 +215,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { if server == nil { server = T.servers.Acquire(client) if server == nil { - return pool2.ErrClosed + return pool.ErrClosed } err, serverErr = T.pair(client, server) @@ -226,8 +227,7 @@ func (T *Pool) Serve(conn *fed.Conn) error { if serverErr != nil { return serverErr } else { - client.TransactionComplete() - server.TransactionComplete() + pool.ConnTransactionComplete(client, server) } if err != nil { @@ -237,9 +237,44 @@ func (T *Pool) Serve(conn *fed.Conn) error { } func (T *Pool) Cancel(key fed.BackendKey) { - // TODO(garet) + var client *pool.Conn + func() { + T.mu.RLock() + defer T.mu.RUnlock() + + client = T.clientsByBackendKey[key] + }() + + if client == nil { + return + } + + server := client.GetPeer() + if server == nil { + return + } + + T.servers.Cancel(server) +} + +func (T *Pool) ReadMetrics(m *metrics.Pool) { + T.servers.ReadMetrics(m) + + if m.Clients == nil { + m.Clients = make(map[uuid.UUID]metrics.Conn) + } + + T.mu.RLock() + defer T.mu.RUnlock() + for _, client := range T.clientsByBackendKey { + var conn metrics.Conn + client.ReadMetrics(&conn) + m.Clients[client.ID] = conn + } } func (T *Pool) Close() { T.servers.Close() } + +var _ gat.Pool = (*Pool)(nil) diff --git a/lib/gat/pool/ps.go b/lib/gat/pool/ps.go new file mode 100644 index 0000000000000000000000000000000000000000..28189ed5810ee67bc1da2a7bc4471e287cf5e14b --- /dev/null +++ b/lib/gat/pool/ps.go @@ -0,0 +1,14 @@ +package pool + +type ParameterStatusSync int + +const ( + // ParameterStatusSyncNone does not attempt to sync parameter status. + ParameterStatusSyncNone ParameterStatusSync = iota + // ParameterStatusSyncInitial assumes both client and server have their initial status before syncing. + // Use in session pooling for lower latency + ParameterStatusSyncInitial + // ParameterStatusSyncDynamic will track parameter status and ensure they are synced correctly. + // Use in transaction pooling + ParameterStatusSyncDynamic +) diff --git a/lib/gat/pool/recipe.go b/lib/gat/pool/recipe.go index cfa70d173b7345e501ba653fd41a46d18f977969..b8d42d0de34b7d2c5a8caa1c2dfdef70852aa372 100644 --- a/lib/gat/pool/recipe.go +++ b/lib/gat/pool/recipe.go @@ -1,5 +1,84 @@ package pool -import "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" +import ( + "sync" -type Recipe = recipe.Recipe + "gfx.cafe/gfx/pggat/lib/fed" +) + +type Recipe struct { + config RecipeConfig + + count int + mu sync.Mutex +} + +type RecipeConfig struct { + Dialer Dialer + + MinConnections int + // MaxConnections is the max number of active server connections for this recipe. + // 0 = unlimited + MaxConnections int +} + +func NewRecipe(config RecipeConfig) *Recipe { + return &Recipe{ + config: config, + } +} + +func (T *Recipe) AllocateInitial() int { + T.mu.Lock() + defer T.mu.Unlock() + + if T.count >= T.config.MinConnections { + return 0 + } + + amount := T.config.MinConnections - T.count + T.count = T.config.MinConnections + + return amount +} + +func (T *Recipe) Allocate() bool { + T.mu.Lock() + defer T.mu.Unlock() + + if T.config.MaxConnections != 0 { + if T.count >= T.config.MaxConnections { + return false + } + } + + T.count++ + return true +} + +func (T *Recipe) TryFree() bool { + T.mu.Lock() + defer T.mu.Unlock() + + if T.count <= T.config.MinConnections { + return false + } + + T.count-- + return true +} + +func (T *Recipe) Free() { + T.mu.Lock() + defer T.mu.Unlock() + + T.count-- +} + +func (T *Recipe) Dial() (*fed.Conn, error) { + return T.config.Dialer.Dial() +} + +func (T *Recipe) Cancel(key fed.BackendKey) { + T.config.Dialer.Cancel(key) +} diff --git a/lib/gat/pool/recipe/config.go b/lib/gat/pool/recipe/config.go deleted file mode 100644 index 37ae8d6602e4cde1eed617841d36d3909cf1caf3..0000000000000000000000000000000000000000 --- a/lib/gat/pool/recipe/config.go +++ /dev/null @@ -1,10 +0,0 @@ -package recipe - -type Config struct { - Dialer Dialer - - MinConnections int - // MaxConnections is the max number of active server connections for this recipe. - // 0 = unlimited - MaxConnections int -} diff --git a/lib/gat/pool/recipe/recipe.go b/lib/gat/pool/recipe/recipe.go deleted file mode 100644 index 78a79391ad159a1e286ac6b0cd69b7aa7f86ea45..0000000000000000000000000000000000000000 --- a/lib/gat/pool/recipe/recipe.go +++ /dev/null @@ -1,75 +0,0 @@ -package recipe - -import ( - "sync" - - "gfx.cafe/gfx/pggat/lib/fed" -) - -type Recipe struct { - config Config - - count int - mu sync.Mutex -} - -func NewRecipe(config Config) *Recipe { - return &Recipe{ - config: config, - } -} - -func (T *Recipe) AllocateInitial() int { - T.mu.Lock() - defer T.mu.Unlock() - - if T.count >= T.config.MinConnections { - return 0 - } - - amount := T.config.MinConnections - T.count - T.count = T.config.MinConnections - - return amount -} - -func (T *Recipe) Allocate() bool { - T.mu.Lock() - defer T.mu.Unlock() - - if T.config.MaxConnections != 0 { - if T.count >= T.config.MaxConnections { - return false - } - } - - T.count++ - return true -} - -func (T *Recipe) TryFree() bool { - T.mu.Lock() - defer T.mu.Unlock() - - if T.count <= T.config.MinConnections { - return false - } - - T.count-- - return true -} - -func (T *Recipe) Free() { - T.mu.Lock() - defer T.mu.Unlock() - - T.count-- -} - -func (T *Recipe) Dial() (*fed.Conn, error) { - return T.config.Dialer.Dial() -} - -func (T *Recipe) Cancel(key fed.BackendKey) { - T.config.Dialer.Cancel(key) -} diff --git a/lib/gat/pool2/recipepool/config.go b/lib/gat/pool/recipepool/config.go similarity index 89% rename from lib/gat/pool2/recipepool/config.go rename to lib/gat/pool/recipepool/config.go index e8740d8c79a24041d6048f5b7032bf2a010d7b5f..b0b0cb452e9f0f910d09c3d831368570603f6ce5 100644 --- a/lib/gat/pool2/recipepool/config.go +++ b/lib/gat/pool/recipepool/config.go @@ -1,8 +1,9 @@ package recipepool import ( + "gfx.cafe/gfx/pggat/lib/gat/pool/serverpool" + "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool2/serverpool" ) type Config struct { diff --git a/lib/gat/pool2/recipepool/pool.go b/lib/gat/pool/recipepool/pool.go similarity index 79% rename from lib/gat/pool2/recipepool/pool.go rename to lib/gat/pool/recipepool/pool.go index 22e6cd307d80a0af457a3207fc42f1343e5cab4e..60fb072ac3f6cca6030fdf792e78c3905d131e55 100644 --- a/lib/gat/pool2/recipepool/pool.go +++ b/lib/gat/pool/recipepool/pool.go @@ -4,12 +4,11 @@ import ( "sync" "time" - "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/gat" + "gfx.cafe/gfx/pggat/lib/gat/pool/serverpool" + "gfx.cafe/gfx/pggat/lib/gat/metrics" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" - "gfx.cafe/gfx/pggat/lib/gat/pool2" - "gfx.cafe/gfx/pggat/lib/gat/pool2/serverpool" "gfx.cafe/gfx/pggat/lib/util/slices" ) @@ -45,7 +44,7 @@ func (T *Pool) scaleUpL0() (string, *Recipe) { return "", nil } -func (T *Pool) scaleUpL1(name string, r *Recipe) *pool2.Conn { +func (T *Pool) scaleUpL1(name string, r *Recipe) *pool.Conn { if r == nil { return nil } @@ -53,7 +52,7 @@ func (T *Pool) scaleUpL1(name string, r *Recipe) *pool2.Conn { return T.scaleUpL2(name, r, r.Dial()) } -func (T *Pool) scaleUpL2(name string, r *Recipe, conn *pool2.Conn) *pool2.Conn { +func (T *Pool) scaleUpL2(name string, r *Recipe, conn *pool.Conn) *pool.Conn { if conn == nil { r.r.Free() return nil @@ -90,11 +89,11 @@ func (T *Pool) ScaleDown(idleFor time.Duration) time.Duration { return T.servers.ScaleDown(idleFor) } -func (T *Pool) AddClient(client *pool2.Conn) { +func (T *Pool) AddClient(client *pool.Conn) { T.servers.AddClient(client) } -func (T *Pool) RemoveClient(client *pool2.Conn) { +func (T *Pool) RemoveClient(client *pool.Conn) { T.servers.RemoveClient(client) } @@ -121,7 +120,7 @@ func (T *Pool) addRecipe(name string, r *Recipe) { }) } -func (T *Pool) AddRecipe(name string, r *recipe.Recipe) { +func (T *Pool) AddRecipe(name string, r *pool.Recipe) { added := NewRecipe(T.config.ParameterStatusSync, T.config.ExtendedQuerySync, r) for _, server := range added.servers { @@ -158,7 +157,7 @@ func (T *Pool) RemoveRecipe(name string) { } } -func (T *Pool) RemoveServer(server *pool2.Conn) { +func (T *Pool) RemoveServer(server *pool.Conn) { T.servers.RemoveServer(server) // update recipe @@ -171,11 +170,11 @@ func (T *Pool) RemoveServer(server *pool2.Conn) { r.RemoveServer(server) } -func (T *Pool) Acquire(client *pool2.Conn, mode pool.SyncMode) (server *pool2.Conn) { +func (T *Pool) Acquire(client *pool.Conn, mode gat.SyncMode) (server *pool.Conn) { return T.servers.Acquire(client, mode) } -func (T *Pool) Release(server *pool2.Conn) { +func (T *Pool) Release(server *pool.Conn) { T.servers.Release(server) } @@ -183,18 +182,16 @@ func (T *Pool) ReadMetrics(m *metrics.Pool) { T.servers.ReadMetrics(m) } -func (T *Pool) Cancel(key fed.BackendKey) { +func (T *Pool) Cancel(server *pool.Conn) { T.mu.RLock() defer T.mu.RUnlock() - for _, r := range T.recipes { - for _, s := range r.servers { - if s.Conn.BackendKey == key { - r.Cancel(key) - return - } - } + r := T.recipes[server.Recipe] + if r == nil { + return } + + r.Cancel(server.Conn.BackendKey) } func (T *Pool) Close() { diff --git a/lib/gat/pool2/recipepool/recipe.go b/lib/gat/pool/recipepool/recipe.go similarity index 78% rename from lib/gat/pool2/recipepool/recipe.go rename to lib/gat/pool/recipepool/recipe.go index 4ec588050a9d517ec56d9ac46a84b68aedc69cc8..fd9389c352f44ed94d32e2ac946a3b89be35b3ed 100644 --- a/lib/gat/pool2/recipepool/recipe.go +++ b/lib/gat/pool/recipepool/recipe.go @@ -7,8 +7,6 @@ import ( "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" - "gfx.cafe/gfx/pggat/lib/gat/pool2" "gfx.cafe/gfx/pggat/lib/util/slices" ) @@ -16,12 +14,12 @@ type Recipe struct { parameterStatusSync pool.ParameterStatusSync extendedQuerySync bool - r *recipe.Recipe + r *pool.Recipe - servers []*pool2.Conn + servers []*pool.Conn } -func NewRecipe(parameterStatusSync pool.ParameterStatusSync, extendedQuerySync bool, r *recipe.Recipe) *Recipe { +func NewRecipe(parameterStatusSync pool.ParameterStatusSync, extendedQuerySync bool, r *pool.Recipe) *Recipe { s := &Recipe{ parameterStatusSync: parameterStatusSync, extendedQuerySync: extendedQuerySync, @@ -34,7 +32,7 @@ func NewRecipe(parameterStatusSync pool.ParameterStatusSync, extendedQuerySync b func (T *Recipe) init() { count := T.r.AllocateInitial() - T.servers = make([]*pool2.Conn, 0, count) + T.servers = make([]*pool.Conn, 0, count) for i := 0; i < count; i++ { conn := T.dial() @@ -45,7 +43,7 @@ func (T *Recipe) init() { } } -func (T *Recipe) dial() *pool2.Conn { +func (T *Recipe) dial() *pool.Conn { conn, err := T.r.Dial() if err != nil { // TODO(garet) use proper logger @@ -67,10 +65,10 @@ func (T *Recipe) dial() *pool2.Conn { ) } - return pool2.NewConn(conn) + return pool.NewConn(conn) } -func (T *Recipe) Dial() *pool2.Conn { +func (T *Recipe) Dial() *pool.Conn { if !T.r.Allocate() { return nil } @@ -86,7 +84,7 @@ func (T *Recipe) Cancel(key fed.BackendKey) { T.r.Cancel(key) } -func (T *Recipe) TryRemoveServer(server *pool2.Conn) bool { +func (T *Recipe) TryRemoveServer(server *pool.Conn) bool { idx := slices.Index(T.servers, server) if idx == -1 { return false @@ -98,7 +96,7 @@ func (T *Recipe) TryRemoveServer(server *pool2.Conn) bool { return true } -func (T *Recipe) RemoveServer(server *pool2.Conn) { +func (T *Recipe) RemoveServer(server *pool.Conn) { idx := slices.Index(T.servers, server) if idx == -1 { return diff --git a/lib/gat/pool/scaler.go b/lib/gat/pool/scaler.go deleted file mode 100644 index 8beb64d3097d341369f26858cdb507dfb0bfb630..0000000000000000000000000000000000000000 --- a/lib/gat/pool/scaler.go +++ /dev/null @@ -1,115 +0,0 @@ -package pool - -import ( - "time" - - "go.uber.org/zap" -) - -type scaler struct { - pool *Pool - - backingOff bool - backoff time.Duration - - // timers - idle *time.Timer - pending *time.Timer -} - -func newScaler(pool *Pool) *scaler { - s := &scaler{ - pool: pool, - backoff: pool.config.ServerReconnectInitialTime.Duration(), - } - - if pool.config.ServerIdleTimeout != 0 { - s.idle = time.NewTimer(pool.config.ServerIdleTimeout.Duration()) - } - - return s -} - -func (T *scaler) idleTimeout(now time.Time) { - // idle loop for scaling down - var wait time.Duration - - var idlest *pooledServer - var idleStart time.Time - for idlest, idleStart = T.pool.idlest(); idlest != nil && now.Sub(idleStart) > T.pool.config.ServerIdleTimeout.Duration(); idlest, idleStart = T.pool.idlest() { - T.pool.removeServer(idlest) - } - - if idlest == nil { - wait = T.pool.config.ServerIdleTimeout.Duration() - } else { - wait = idleStart.Add(T.pool.config.ServerIdleTimeout.Duration()).Sub(now) - } - - T.idle.Reset(wait) -} - -func (T *scaler) pendingTimeout() { - if T.backingOff { - T.backoff *= 2 - if T.pool.config.ServerReconnectMaxTime != 0 && T.backoff > T.pool.config.ServerReconnectMaxTime.Duration() { - T.backoff = T.pool.config.ServerReconnectMaxTime.Duration() - } - } - - for T.pool.pendingCount.Load() > 0 { - // pending loop for scaling up - if T.pool.scaleUp() { - // scale up successful, see if we need to scale up more - T.backoff = T.pool.config.ServerReconnectInitialTime.Duration() - T.backingOff = false - continue - } - - if T.backoff == 0 { - // no backoff - T.backoff = T.pool.config.ServerReconnectInitialTime.Duration() - T.backingOff = false - continue - } - - T.backingOff = true - if T.pending == nil { - T.pending = time.NewTimer(T.backoff) - } else { - T.pending.Reset(T.backoff) - } - - T.pool.config.Logger.Warn("failed to dial server", zap.Duration("backoff", T.backoff)) - - return - } -} - -func (T *scaler) Run() { - for { - var idle <-chan time.Time - if T.idle != nil { - idle = T.idle.C - } - - var pending1 <-chan struct{} - var pending2 <-chan time.Time - if T.backingOff { - pending2 = T.pending.C - } else { - pending1 = T.pool.pending - } - - select { - case t := <-idle: - T.idleTimeout(t) - case <-pending1: - T.pendingTimeout() - case <-pending2: - T.pendingTimeout() - case <-T.pool.closed: - return - } - } -} diff --git a/lib/gat/pool2/scalingpool/config.go b/lib/gat/pool/scalingpool/config.go similarity index 92% rename from lib/gat/pool2/scalingpool/config.go rename to lib/gat/pool/scalingpool/config.go index a1d5240d9f90fd841f943c86f3d6ccfd3517c3b9..8d4ac4b5178d578850973fa8d195623067bfc038 100644 --- a/lib/gat/pool2/scalingpool/config.go +++ b/lib/gat/pool/scalingpool/config.go @@ -3,7 +3,7 @@ package scalingpool import ( "time" - "gfx.cafe/gfx/pggat/lib/gat/pool2/recipepool" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipepool" ) type Config struct { diff --git a/lib/gat/pool2/scalingpool/pool.go b/lib/gat/pool/scalingpool/pool.go similarity index 76% rename from lib/gat/pool2/scalingpool/pool.go rename to lib/gat/pool/scalingpool/pool.go index c5fa7d1cc32eeab9cb12d564c8fc07f6098ea2e2..11f050782ab50ba13de2c9e4820f75223f8edcfe 100644 --- a/lib/gat/pool2/scalingpool/pool.go +++ b/lib/gat/pool/scalingpool/pool.go @@ -4,12 +4,11 @@ import ( "sync/atomic" "time" - "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/gat" + "gfx.cafe/gfx/pggat/lib/gat/pool/recipepool" + "gfx.cafe/gfx/pggat/lib/gat/metrics" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" - "gfx.cafe/gfx/pggat/lib/gat/pool2" - "gfx.cafe/gfx/pggat/lib/gat/pool2/recipepool" ) const pendingScaleUpSize = 4 @@ -42,15 +41,15 @@ func (T *Pool) init() { } } -func (T *Pool) AddClient(client *pool2.Conn) { +func (T *Pool) AddClient(client *pool.Conn) { T.servers.AddClient(client) } -func (T *Pool) RemoveClient(client *pool2.Conn) { +func (T *Pool) RemoveClient(client *pool.Conn) { T.servers.RemoveClient(client) } -func (T *Pool) AddRecipe(name string, r *recipe.Recipe) { +func (T *Pool) AddRecipe(name string, r *pool.Recipe) { T.init() T.servers.AddRecipe(name, r) @@ -60,30 +59,30 @@ func (T *Pool) RemoveRecipe(name string) { T.servers.RemoveRecipe(name) } -func (T *Pool) RemoveServer(server *pool2.Conn) { +func (T *Pool) RemoveServer(server *pool.Conn) { T.servers.RemoveServer(server) } -func (T *Pool) Acquire(client *pool2.Conn) (server *pool2.Conn) { - server = T.servers.Acquire(client, pool.SyncModeNonBlocking) +func (T *Pool) Acquire(client *pool.Conn) (server *pool.Conn) { + server = T.servers.Acquire(client, gat.SyncModeNonBlocking) if server == nil { select { case T.scale <- struct{}{}: default: } - server = T.servers.Acquire(client, pool.SyncModeBlocking) + server = T.servers.Acquire(client, gat.SyncModeBlocking) } return } -func (T *Pool) Release(server *pool2.Conn) { +func (T *Pool) Release(server *pool.Conn) { T.servers.Release(server) } -func (T *Pool) Cancel(key fed.BackendKey) { - T.servers.Cancel(key) +func (T *Pool) Cancel(server *pool.Conn) { + T.servers.Cancel(server) } func (T *Pool) ReadMetrics(m *metrics.Pool) { diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go deleted file mode 100644 index 9afa7c994359bd6157c0df746dc368efd1c6500c..0000000000000000000000000000000000000000 --- a/lib/gat/pool/server.go +++ /dev/null @@ -1,57 +0,0 @@ -package pool - -import ( - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp" - "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps" -) - -type pooledServer struct { - pooledConn - - recipe string - - ps *ps.Server - eqp *eqp.Server -} - -func newServer( - options Config, - recipe string, - conn *fed.Conn, -) *pooledServer { - var psServer *ps.Server - if options.ParameterStatusSync == ParameterStatusSyncDynamic { - // add ps middleware - psServer = ps.NewServer(conn.InitialParameters) - conn.Middleware = append(conn.Middleware, psServer) - } - - var eqpServer *eqp.Server - if options.ExtendedQuerySync { - // add eqp middleware - eqpServer = eqp.NewServer() - conn.Middleware = append(conn.Middleware, eqpServer) - } - - return &pooledServer{ - pooledConn: makeConn( - conn, - ), - recipe: recipe, - ps: psServer, - eqp: eqpServer, - } -} - -func (T *pooledServer) GetRecipe() string { - return T.recipe -} - -func (T *pooledServer) GetEQP() *eqp.Server { - return T.eqp -} - -func (T *pooledServer) GetPS() *ps.Server { - return T.ps -} diff --git a/lib/gat/pool2/serverpool/config.go b/lib/gat/pool/serverpool/config.go similarity index 71% rename from lib/gat/pool2/serverpool/config.go rename to lib/gat/pool/serverpool/config.go index ed14cd0e8d6261e96ef1d860e4be9beed3df42dc..924bd1d878416b68aa7d02e2a9ae146ab652612c 100644 --- a/lib/gat/pool2/serverpool/config.go +++ b/lib/gat/pool/serverpool/config.go @@ -1,10 +1,12 @@ package serverpool -import "gfx.cafe/gfx/pggat/lib/gat/pool" +import ( + "gfx.cafe/gfx/pggat/lib/gat" +) type Config struct { // NewPooler allocates a new pooler - NewPooler func() pool.Pooler + NewPooler func() gat.Pooler // ServerResetQuery is the query to be run before the server is released ServerResetQuery string } diff --git a/lib/gat/pool2/serverpool/pool.go b/lib/gat/pool/serverpool/pool.go similarity index 79% rename from lib/gat/pool2/serverpool/pool.go rename to lib/gat/pool/serverpool/pool.go index f9841986f29bf4f371c1cb9fb4a1d56f98d08c9e..5d329caf8ee53b85dcce3bafa9b9e5332e61e789 100644 --- a/lib/gat/pool2/serverpool/pool.go +++ b/lib/gat/pool/serverpool/pool.go @@ -7,16 +7,16 @@ import ( "github.com/google/uuid" "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0" + "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/metrics" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool2" ) type Pool struct { config Config - pooler pool.Pooler + pooler gat.Pooler - servers map[uuid.UUID]*pool2.Conn + servers map[uuid.UUID]*pool.Conn mu sync.RWMutex } @@ -27,27 +27,27 @@ func MakePool(config Config) Pool { } } -func (T *Pool) AddClient(client *pool2.Conn) { +func (T *Pool) AddClient(client *pool.Conn) { T.pooler.AddClient(client.ID) } -func (T *Pool) RemoveClient(client *pool2.Conn) { +func (T *Pool) RemoveClient(client *pool.Conn) { T.pooler.DeleteClient(client.ID) } -func (T *Pool) AddServer(server *pool2.Conn) { +func (T *Pool) AddServer(server *pool.Conn) { func() { T.mu.Lock() defer T.mu.Unlock() if T.servers == nil { - T.servers = make(map[uuid.UUID]*pool2.Conn) + T.servers = make(map[uuid.UUID]*pool.Conn) } T.servers[server.ID] = server }() T.pooler.AddServer(server.ID) } -func (T *Pool) RemoveServer(server *pool2.Conn) { +func (T *Pool) RemoveServer(server *pool.Conn) { T.pooler.DeleteServer(server.ID) T.mu.Lock() @@ -55,7 +55,7 @@ func (T *Pool) RemoveServer(server *pool2.Conn) { delete(T.servers, server.ID) } -func (T *Pool) Acquire(client *pool2.Conn, mode pool.SyncMode) (server *pool2.Conn) { +func (T *Pool) Acquire(client *pool.Conn, mode gat.SyncMode) (server *pool.Conn) { for { serverID := T.pooler.Acquire(client.ID, mode) if serverID == uuid.Nil { @@ -74,9 +74,9 @@ func (T *Pool) Acquire(client *pool2.Conn, mode pool.SyncMode) (server *pool2.Co } } -func (T *Pool) Release(server *pool2.Conn) { +func (T *Pool) Release(server *pool.Conn) { if T.config.ServerResetQuery != "" { - pool2.SetConnState(metrics.ConnStateRunningResetQuery, server) + pool.SetConnState(metrics.ConnStateRunningResetQuery, server) err, _ := backends.QueryString(server.Conn, nil, T.config.ServerResetQuery) if err != nil { @@ -85,7 +85,7 @@ func (T *Pool) Release(server *pool2.Conn) { } } - pool2.SetConnState(metrics.ConnStateIdle, server) + pool.SetConnState(metrics.ConnStateIdle, server) T.pooler.Release(server.ID) } diff --git a/lib/gat/pool/withcredentials.go b/lib/gat/pool/withcredentials.go deleted file mode 100644 index 9c7382b380788ed36b6df39d241f3b5498e50493..0000000000000000000000000000000000000000 --- a/lib/gat/pool/withcredentials.go +++ /dev/null @@ -1,8 +0,0 @@ -package pool - -import "gfx.cafe/gfx/pggat/lib/auth" - -type WithCredentials struct { - *Pool - Credentials auth.Credentials -} diff --git a/lib/gat/pool2/conn.go b/lib/gat/pool2/conn.go deleted file mode 100644 index 0381f9651f45eab8734b27496b46cf0cc87c84e3..0000000000000000000000000000000000000000 --- a/lib/gat/pool2/conn.go +++ /dev/null @@ -1,110 +0,0 @@ -package pool2 - -import ( - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/gat/metrics" -) - -type Conn struct { - ID uuid.UUID - Conn *fed.Conn - // Recipe that created this conn, optional. - Recipe string - - // metrics - - txnCount atomic.Int64 - - lastMetricsRead time.Time - - state metrics.ConnState - peer uuid.UUID - since time.Time - - util [metrics.ConnStateCount]time.Duration - - mu sync.RWMutex -} - -func NewConn(conn *fed.Conn) *Conn { - return &Conn{ - ID: uuid.New(), - Conn: conn, - } -} - -func (T *Conn) TransactionComplete() { - T.txnCount.Add(1) -} - -func (T *Conn) GetState() (metrics.ConnState, time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - - return T.state, T.since -} - -func (T *Conn) setState(now time.Time, state metrics.ConnState, peer uuid.UUID) { - T.mu.Lock() - defer T.mu.Unlock() - - var dur time.Duration - if T.since.Before(T.lastMetricsRead) { - dur = now.Sub(T.lastMetricsRead) - } else { - dur = now.Sub(T.since) - } - T.util[T.state] += dur - - T.state = state - T.peer = peer - T.since = now -} - -func SetConnState(state metrics.ConnState, conns ...*Conn) { - now := time.Now() - - for i, conn := range conns { - var peer uuid.UUID - if i == 0 { - if len(conns) > 1 { - peer = conns[1].ID - } - } else { - peer = conns[0].ID - } - conn.setState(now, state, peer) - } -} - -func (T *Conn) ReadMetrics(m *metrics.Conn) { - T.mu.Lock() - defer T.mu.Unlock() - - m.Time = time.Now() - - m.State = T.state - m.Peer = T.peer - m.Since = T.since - - m.Utilization = T.util - T.util = [metrics.ConnStateCount]time.Duration{} - - var dur time.Duration - if m.Since.Before(T.lastMetricsRead) { - dur = m.Time.Sub(T.lastMetricsRead) - } else { - dur = m.Time.Sub(m.Since) - } - m.Utilization[m.State] += dur - - m.TransactionCount = int(T.txnCount.Swap(0)) - - T.lastMetricsRead = m.Time -} diff --git a/lib/gat/pool2/errors.go b/lib/gat/pool2/errors.go deleted file mode 100644 index 3df9814de84ea2fd5b8884355b59438879f81e55..0000000000000000000000000000000000000000 --- a/lib/gat/pool2/errors.go +++ /dev/null @@ -1,5 +0,0 @@ -package pool2 - -import "errors" - -var ErrClosed = errors.New("pool closed") diff --git a/lib/gat/pooler.go b/lib/gat/pooler.go index 39d130621821146ecaaff5b708f35c4285ea913b..648dbdb584f11e728cd132855656345464d3b34a 100644 --- a/lib/gat/pooler.go +++ b/lib/gat/pooler.go @@ -1,5 +1,23 @@ package gat +import "github.com/google/uuid" + +type SyncMode int + +const ( + SyncModeNonBlocking SyncMode = iota + SyncModeBlocking +) + type Pooler interface { - NewPool() *Pool + AddClient(id uuid.UUID) + DeleteClient(client uuid.UUID) + + AddServer(id uuid.UUID) + DeleteServer(server uuid.UUID) + + Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID) + Release(server uuid.UUID) + + Close() } diff --git a/test/config.go b/test/config.go index bc13828dc196ddf6f3d9101ac0b4b12cd08081f7..57e2bd932cd7f44913ea447ceba43aa7c39b948b 100644 --- a/test/config.go +++ b/test/config.go @@ -1,12 +1,8 @@ package test -import ( - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" -) - type Config struct { // Stress is how many connections to run simultaneously for stress testing. <= 1 disables stress testing. Stress int - Modes map[string]recipe.Dialer + Modes map[string]pool.Dialer } diff --git a/test/runner.go b/test/runner.go index c1f24ccc563ed15570d3eb9d676e0a22651716c1..1ccba93fa82a4eb317c936f5a5ae5c0c9787d9b6 100644 --- a/test/runner.go +++ b/test/runner.go @@ -10,7 +10,6 @@ import ( "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/fed/middlewares/unterminate" packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gsql" "gfx.cafe/gfx/pggat/lib/util/flip" "gfx.cafe/gfx/pggat/test/inst" @@ -115,7 +114,7 @@ func (T *Runner) prepare(client *fed.Conn, until int) error { return client.Flush() } -func (T *Runner) runModeL1(dialer recipe.Dialer, client *fed.Conn) error { +func (T *Runner) runModeL1(dialer pool.Dialer, client *fed.Conn) error { server, err := dialer.Dial() if err != nil { return err @@ -148,7 +147,7 @@ func (T *Runner) runModeL1(dialer recipe.Dialer, client *fed.Conn) error { return nil } -func (T *Runner) runModeOnce(dialer recipe.Dialer) ([]byte, error) { +func (T *Runner) runModeOnce(dialer pool.Dialer) ([]byte, error) { inward, outward := gsql.NewPair() if err := T.prepare(inward, len(T.test.Instructions)); err != nil { return nil, err @@ -165,7 +164,7 @@ func (T *Runner) runModeOnce(dialer recipe.Dialer) ([]byte, error) { return io.ReadAll(inward.NetConn) } -func (T *Runner) runModeFail(dialer recipe.Dialer) error { +func (T *Runner) runModeFail(dialer pool.Dialer) error { for i := 1; i < len(T.test.Instructions)+1; i++ { inward, outward := gsql.NewPair() if err := T.prepare(inward, i); err != nil { @@ -180,7 +179,7 @@ func (T *Runner) runModeFail(dialer recipe.Dialer) error { return nil } -func (T *Runner) runMode(dialer recipe.Dialer) ([]byte, error) { +func (T *Runner) runMode(dialer pool.Dialer) ([]byte, error) { instances := T.config.Stress if instances < 1 || T.test.SideEffects { return T.runModeOnce(dialer) diff --git a/test/tester_test.go b/test/tester_test.go index 6ef9b43191e68e04c7132e452a46e5202fe78dbd..37890479a716eb78c25e34cbd2296b048bdda2a7 100644 --- a/test/tester_test.go +++ b/test/tester_test.go @@ -19,7 +19,6 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/handlers/rewrite_password" "gfx.cafe/gfx/pggat/lib/gat/matchers" "gfx.cafe/gfx/pggat/lib/gat/pool" - "gfx.cafe/gfx/pggat/lib/gat/pool/recipe" "gfx.cafe/gfx/pggat/lib/gat/poolers/session" "gfx.cafe/gfx/pggat/lib/gat/poolers/transaction" "gfx.cafe/gfx/pggat/test" @@ -167,7 +166,7 @@ func daisyChain(config *gat.Config, control dialer, n int) (dialer, error) { } func TestTester(t *testing.T) { - control := recipe.Dialer{ + control := pool.Dialer{ Network: "tcp", Address: "localhost:5432", Username: "postgres", @@ -206,7 +205,7 @@ func TestTester(t *testing.T) { config.Servers = append(config.Servers, server) - transactionDialer := recipe.Dialer{ + transactionDialer := pool.Dialer{ Network: resolveNetwork(dialers["transaction"].Address), Address: dialers["transaction"].Address, Username: dialers["transaction"].Username, @@ -216,7 +215,7 @@ func TestTester(t *testing.T) { ), Database: "transaction", } - sessionDialer := recipe.Dialer{ + sessionDialer := pool.Dialer{ Network: resolveNetwork(dialers["transaction"].Address), Address: dialers["session"].Address, Username: dialers["session"].Username, @@ -245,7 +244,7 @@ func TestTester(t *testing.T) { tester := test.NewTester(test.Config{ Stress: 8, - Modes: map[string]recipe.Dialer{ + Modes: map[string]pool.Dialer{ "control": control, "transaction": transactionDialer, "session": sessionDialer,