diff --git a/lib/fed/middleware.go b/lib/fed/middleware.go
index c47a3f7043a23b693c72bacff6f43c6898ab283d..a9a889f73913cb64c2a267e518eddeafd9274a29 100644
--- a/lib/fed/middleware.go
+++ b/lib/fed/middleware.go
@@ -5,3 +5,14 @@ type Middleware interface {
ReadPacket(packet Packet) (Packet, error)
WritePacket(packet Packet) (Packet, error)
}
+
+func LookupMiddleware[T Middleware](conn *Conn) (T, bool) {
+ for _, mw := range conn.Middleware {
+ m, ok := mw.(T)
+ if ok {
+ return m, true
+ }
+ }
+
+ return *new(T), false
+}
diff --git a/lib/fed/middlewares/eqp/sync.go b/lib/fed/middlewares/eqp/sync.go
index 05fc56e53b2860c246faf1e5549f87a8ce9715dc..18ec8fcc1479b814c88fdc16e3dd161efd605065 100644
--- a/lib/fed/middlewares/eqp/sync.go
+++ b/lib/fed/middlewares/eqp/sync.go
@@ -20,7 +20,16 @@ func preparedStatementsEqual(a, b *packets.Parse) bool {
return true
}
-func Sync(c *Client, server *fed.Conn, s *Server) error {
+func Sync(client, server *fed.Conn) error {
+ c, ok := fed.LookupMiddleware[*Client](client)
+ if !ok {
+ panic("middleware not found")
+ }
+ s, ok := fed.LookupMiddleware[*Server](server)
+ if !ok {
+ panic("middleware not found")
+ }
+
var needsBackendSync bool
// close all portals on server
diff --git a/lib/fed/middlewares/ps/sync.go b/lib/fed/middlewares/ps/sync.go
index 8f9f19267e23c7918527a56c781338c06742a2be..dea000782b07a32a49aa1b96d669e87c20ec4e6d 100644
--- a/lib/fed/middlewares/ps/sync.go
+++ b/lib/fed/middlewares/ps/sync.go
@@ -55,7 +55,16 @@ func sync(tracking []strutil.CIString, client *fed.Conn, c *Client, server *fed.
return nil
}
-func Sync(tracking []strutil.CIString, client *fed.Conn, c *Client, server *fed.Conn, s *Server) (clientErr, serverErr error) {
+func Sync(tracking []strutil.CIString, client, server *fed.Conn) (clientErr, serverErr error) {
+ c, ok := fed.LookupMiddleware[*Client](client)
+ if !ok {
+ panic("middleware not found")
+ }
+ s, ok := fed.LookupMiddleware[*Server](server)
+ if !ok {
+ panic("middleware not found")
+ }
+
for name := range c.parameters {
if serverErr = sync(tracking, client, c, server, s, name); serverErr != nil {
return
@@ -63,7 +72,7 @@ func Sync(tracking []strutil.CIString, client *fed.Conn, c *Client, server *fed.
}
for name := range s.parameters {
- if _, ok := c.parameters[name]; ok {
+ if _, ok = c.parameters[name]; ok {
continue
}
if serverErr = sync(tracking, client, c, server, s, name); serverErr != nil {
diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go
index 712255fcf206e81e5cfe29ee250290a0faa6d922..d4c2626237b1f52190849e169d6d070cf41cc60e 100644
--- a/lib/gat/pool/flow.go
+++ b/lib/gat/pool/flow.go
@@ -22,7 +22,7 @@ func pair(options Config, client *pooledClient, server *pooledServer) (clientErr
switch options.ParameterStatusSync {
case ParameterStatusSyncDynamic:
- clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), client.GetPS(), server.GetConn(), server.GetPS())
+ clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), server.GetConn())
case ParameterStatusSyncInitial:
clientErr, serverErr = syncInitialParameters(options, client, server)
}
@@ -32,7 +32,7 @@ func pair(options Config, client *pooledClient, server *pooledServer) (clientErr
}
if options.ExtendedQuerySync {
- serverErr = eqp.Sync(client.GetEQP(), server.GetConn(), server.GetEQP())
+ serverErr = eqp.Sync(server.GetConn(), server.GetConn())
}
return
diff --git a/lib/gat/pool2/clientpool/config.go b/lib/gat/pool2/clientpool/config.go
new file mode 100644
index 0000000000000000000000000000000000000000..c0a9d20147b9ba8f32e1e5b3fb2319ed756c9435
--- /dev/null
+++ b/lib/gat/pool2/clientpool/config.go
@@ -0,0 +1,18 @@
+package clientpool
+
+import (
+ "gfx.cafe/gfx/pggat/lib/gat/pool2/scalingpool"
+ "gfx.cafe/gfx/pggat/lib/util/strutil"
+)
+
+type Config struct {
+ // ReleaseAfterTransaction toggles whether servers should be released and re acquired after each transaction.
+ // Use false for lower latency
+ // Use true for better balancing
+ ReleaseAfterTransaction bool
+
+ // TrackedParameters are parameters which should be synced by updating the server, not the client.
+ TrackedParameters []strutil.CIString
+
+ scalingpool.Config
+}
diff --git a/lib/gat/pool2/clientpool/pool.go b/lib/gat/pool2/clientpool/pool.go
new file mode 100644
index 0000000000000000000000000000000000000000..0eeffc80a1be1c1a5ed39ab41b92e68b117c11de
--- /dev/null
+++ b/lib/gat/pool2/clientpool/pool.go
@@ -0,0 +1,245 @@
+package clientpool
+
+import (
+ "sync"
+
+ "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0"
+ "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2"
+ "gfx.cafe/gfx/pggat/lib/fed"
+ "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp"
+ "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps"
+ packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
+ "gfx.cafe/gfx/pggat/lib/gat/metrics"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+ "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2/scalingpool"
+ "gfx.cafe/gfx/pggat/lib/util/slices"
+)
+
+type Pool struct {
+ config Config
+
+ servers scalingpool.Pool
+
+ clientsByBackendKey map[fed.BackendKey]*pool2.Conn
+ mu sync.RWMutex
+}
+
+func MakePool(config Config) Pool {
+ return Pool{
+ config: config,
+
+ servers: scalingpool.MakePool(config.Config),
+ }
+}
+
+func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
+ T.servers.AddRecipe(name, r)
+}
+
+func (T *Pool) RemoveRecipe(name string) {
+ T.servers.RemoveRecipe(name)
+}
+
+func (T *Pool) addClient(client *pool2.Conn) {
+ T.servers.AddClient(client)
+
+ T.mu.Lock()
+ defer T.mu.Unlock()
+ if T.clientsByBackendKey == nil {
+ T.clientsByBackendKey = make(map[fed.BackendKey]*pool2.Conn)
+ }
+ T.clientsByBackendKey[client.Conn.BackendKey] = client
+}
+
+func (T *Pool) removeClient(client *pool2.Conn) {
+ T.servers.RemoveClient(client)
+
+ T.mu.Lock()
+ defer T.mu.Unlock()
+ delete(T.clientsByBackendKey, client.Conn.BackendKey)
+}
+
+func (T *Pool) syncInitialParameters(client, server *pool2.Conn) (clientErr, serverErr error) {
+ clientParams := client.Conn.InitialParameters
+ serverParams := server.Conn.InitialParameters
+
+ for key, value := range clientParams {
+ // skip already set params
+ if serverParams[key] == value {
+ p := packets.ParameterStatus{
+ Key: key.String(),
+ Value: serverParams[key],
+ }
+ clientErr = client.Conn.WritePacket(&p)
+ if clientErr != nil {
+ return
+ }
+ continue
+ }
+
+ setServer := slices.Contains(T.config.TrackedParameters, key)
+
+ if !setServer {
+ value = serverParams[key]
+ }
+
+ p := packets.ParameterStatus{
+ Key: key.String(),
+ Value: value,
+ }
+ clientErr = client.Conn.WritePacket(&p)
+ if clientErr != nil {
+ return
+ }
+
+ if !setServer {
+ continue
+ }
+
+ serverErr, _ = backends.SetParameter(server.Conn, nil, key, value)
+ if serverErr != nil {
+ return
+ }
+ }
+
+ for key, value := range serverParams {
+ if _, ok := clientParams[key]; ok {
+ continue
+ }
+
+ // Don't need to run reset on server because it will reset it to the initial value
+
+ // send to client
+ p := packets.ParameterStatus{
+ Key: key.String(),
+ Value: value,
+ }
+ clientErr = client.Conn.WritePacket(&p)
+ if clientErr != nil {
+ return
+ }
+ }
+
+ return
+
+}
+
+func (T *Pool) pair(client, server *pool2.Conn) (err, serverErr error) {
+ if T.config.ParameterStatusSync != pool.ParameterStatusSyncNone || T.config.ExtendedQuerySync {
+ pool2.SetConnState(metrics.ConnStatePairing, client, server)
+ }
+
+ switch T.config.ParameterStatusSync {
+ case pool.ParameterStatusSyncDynamic:
+ err, serverErr = ps.Sync(T.config.TrackedParameters, client.Conn, server.Conn)
+ case pool.ParameterStatusSyncInitial:
+ err, serverErr = T.syncInitialParameters(client, server)
+ }
+
+ if err != nil || serverErr != nil {
+ return
+ }
+
+ if T.config.ExtendedQuerySync {
+ serverErr = eqp.Sync(client.Conn, server.Conn)
+ }
+
+ return
+}
+
+func (T *Pool) Serve(conn *fed.Conn) error {
+ if T.config.ExtendedQuerySync {
+ conn.Middleware = append(
+ conn.Middleware,
+ eqp.NewClient(),
+ )
+ }
+
+ if T.config.ParameterStatusSync == pool.ParameterStatusSyncDynamic {
+ conn.Middleware = append(
+ conn.Middleware,
+ ps.NewClient(conn.InitialParameters),
+ )
+ }
+
+ client := pool2.NewConn(conn)
+
+ T.addClient(client)
+ defer T.removeClient(client)
+
+ var err error
+ var serverErr error
+
+ var server *pool2.Conn
+ defer func() {
+ if server != nil {
+ if serverErr != nil {
+ T.servers.RemoveServer(server)
+ } else {
+ T.servers.Release(server)
+ }
+ server = nil
+ }
+ }()
+
+ if !client.Conn.Ready {
+ server = T.servers.Acquire(client)
+ if server == nil {
+ return pool2.ErrClosed
+ }
+
+ err, serverErr = T.pair(client, server)
+ if serverErr != nil {
+ return serverErr
+ }
+ if err != nil {
+ return err
+ }
+ }
+
+ for {
+ if server != nil && T.config.ReleaseAfterTransaction {
+ T.servers.Release(server)
+ server = nil
+ }
+
+ var packet fed.Packet
+ packet, err = client.Conn.ReadPacket(true)
+ if err != nil {
+ return err
+ }
+
+ if server == nil {
+ server = T.servers.Acquire(client)
+ if server == nil {
+ return pool2.ErrClosed
+ }
+
+ err, serverErr = T.pair(client, server)
+ }
+ if err == nil && serverErr == nil {
+ err, serverErr = bouncers.Bounce(client.Conn, server.Conn, packet)
+ }
+
+ if serverErr != nil {
+ return serverErr
+ } else {
+ client.TransactionComplete()
+ server.TransactionComplete()
+ }
+
+ if err != nil {
+ return err
+ }
+ }
+}
+
+func (T *Pool) Cancel(key fed.BackendKey) {
+ // TODO(garet)
+}
+
+func (T *Pool) Close() {
+ T.servers.Close()
+}
diff --git a/lib/gat/pool2/conn.go b/lib/gat/pool2/conn.go
new file mode 100644
index 0000000000000000000000000000000000000000..0381f9651f45eab8734b27496b46cf0cc87c84e3
--- /dev/null
+++ b/lib/gat/pool2/conn.go
@@ -0,0 +1,110 @@
+package pool2
+
+import (
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+
+ "gfx.cafe/gfx/pggat/lib/fed"
+ "gfx.cafe/gfx/pggat/lib/gat/metrics"
+)
+
+type Conn struct {
+ ID uuid.UUID
+ Conn *fed.Conn
+ // Recipe that created this conn, optional.
+ Recipe string
+
+ // metrics
+
+ txnCount atomic.Int64
+
+ lastMetricsRead time.Time
+
+ state metrics.ConnState
+ peer uuid.UUID
+ since time.Time
+
+ util [metrics.ConnStateCount]time.Duration
+
+ mu sync.RWMutex
+}
+
+func NewConn(conn *fed.Conn) *Conn {
+ return &Conn{
+ ID: uuid.New(),
+ Conn: conn,
+ }
+}
+
+func (T *Conn) TransactionComplete() {
+ T.txnCount.Add(1)
+}
+
+func (T *Conn) GetState() (metrics.ConnState, time.Time) {
+ T.mu.RLock()
+ defer T.mu.RUnlock()
+
+ return T.state, T.since
+}
+
+func (T *Conn) setState(now time.Time, state metrics.ConnState, peer uuid.UUID) {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ var dur time.Duration
+ if T.since.Before(T.lastMetricsRead) {
+ dur = now.Sub(T.lastMetricsRead)
+ } else {
+ dur = now.Sub(T.since)
+ }
+ T.util[T.state] += dur
+
+ T.state = state
+ T.peer = peer
+ T.since = now
+}
+
+func SetConnState(state metrics.ConnState, conns ...*Conn) {
+ now := time.Now()
+
+ for i, conn := range conns {
+ var peer uuid.UUID
+ if i == 0 {
+ if len(conns) > 1 {
+ peer = conns[1].ID
+ }
+ } else {
+ peer = conns[0].ID
+ }
+ conn.setState(now, state, peer)
+ }
+}
+
+func (T *Conn) ReadMetrics(m *metrics.Conn) {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ m.Time = time.Now()
+
+ m.State = T.state
+ m.Peer = T.peer
+ m.Since = T.since
+
+ m.Utilization = T.util
+ T.util = [metrics.ConnStateCount]time.Duration{}
+
+ var dur time.Duration
+ if m.Since.Before(T.lastMetricsRead) {
+ dur = m.Time.Sub(T.lastMetricsRead)
+ } else {
+ dur = m.Time.Sub(m.Since)
+ }
+ m.Utilization[m.State] += dur
+
+ m.TransactionCount = int(T.txnCount.Swap(0))
+
+ T.lastMetricsRead = m.Time
+}
diff --git a/lib/gat/pool2/errors.go b/lib/gat/pool2/errors.go
new file mode 100644
index 0000000000000000000000000000000000000000..3df9814de84ea2fd5b8884355b59438879f81e55
--- /dev/null
+++ b/lib/gat/pool2/errors.go
@@ -0,0 +1,5 @@
+package pool2
+
+import "errors"
+
+var ErrClosed = errors.New("pool closed")
diff --git a/lib/gat/pool2/recipepool/config.go b/lib/gat/pool2/recipepool/config.go
new file mode 100644
index 0000000000000000000000000000000000000000..e8740d8c79a24041d6048f5b7032bf2a010d7b5f
--- /dev/null
+++ b/lib/gat/pool2/recipepool/config.go
@@ -0,0 +1,18 @@
+package recipepool
+
+import (
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2/serverpool"
+)
+
+type Config struct {
+ // ParameterStatusSync is the parameter syncing mode
+ ParameterStatusSync pool.ParameterStatusSync
+
+ // ExtendedQuerySync controls whether prepared statements and portals should be tracked and synced before use.
+ // Use false for lower latency
+ // Use true for transaction pooling
+ ExtendedQuerySync bool
+
+ serverpool.Config
+}
diff --git a/lib/gat/pool2/recipepool/pool.go b/lib/gat/pool2/recipepool/pool.go
new file mode 100644
index 0000000000000000000000000000000000000000..22e6cd307d80a0af457a3207fc42f1343e5cab4e
--- /dev/null
+++ b/lib/gat/pool2/recipepool/pool.go
@@ -0,0 +1,206 @@
+package recipepool
+
+import (
+ "sync"
+ "time"
+
+ "gfx.cafe/gfx/pggat/lib/fed"
+ "gfx.cafe/gfx/pggat/lib/gat/metrics"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+ "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2/serverpool"
+ "gfx.cafe/gfx/pggat/lib/util/slices"
+)
+
+type Pool struct {
+ config Config
+
+ servers serverpool.Pool
+
+ recipes map[string]*Recipe
+ recipeScaleOrder slices.Sorted[string]
+ mu sync.RWMutex
+}
+
+func MakePool(config Config) Pool {
+ return Pool{
+ config: config,
+
+ servers: serverpool.MakePool(config.Config),
+ }
+}
+
+func (T *Pool) scaleUpL0() (string, *Recipe) {
+ T.mu.RLock()
+ defer T.mu.RUnlock()
+
+ for _, name := range T.recipeScaleOrder {
+ r := T.recipes[name]
+ if r.r.Allocate() {
+ return name, r
+ }
+ }
+
+ return "", nil
+}
+
+func (T *Pool) scaleUpL1(name string, r *Recipe) *pool2.Conn {
+ if r == nil {
+ return nil
+ }
+
+ return T.scaleUpL2(name, r, r.Dial())
+}
+
+func (T *Pool) scaleUpL2(name string, r *Recipe, conn *pool2.Conn) *pool2.Conn {
+ if conn == nil {
+ r.r.Free()
+ return nil
+ }
+
+ T.mu.Lock()
+ defer T.mu.Unlock()
+ if T.recipes[name] != r {
+ // recipe was removed
+ r.r.Free()
+ return nil
+ }
+
+ r.servers = append(r.servers, conn)
+ // update order
+ T.recipeScaleOrder.Update(slices.Index(T.recipeScaleOrder, name), func(n string) int {
+ return len(T.recipes[n].servers)
+ })
+ return nil
+}
+
+// ScaleUp will attempt to allocate a new server connection. Returns whether the operation was successful.
+func (T *Pool) ScaleUp() bool {
+ conn := T.scaleUpL1(T.scaleUpL0())
+ if conn == nil {
+ return false
+ }
+
+ T.servers.AddServer(conn)
+ return true
+}
+
+func (T *Pool) ScaleDown(idleFor time.Duration) time.Duration {
+ return T.servers.ScaleDown(idleFor)
+}
+
+func (T *Pool) AddClient(client *pool2.Conn) {
+ T.servers.AddClient(client)
+}
+
+func (T *Pool) RemoveClient(client *pool2.Conn) {
+ T.servers.RemoveClient(client)
+}
+
+func (T *Pool) removeRecipe(name string) *Recipe {
+ r, ok := T.recipes[name]
+ if !ok {
+ return nil
+ }
+ delete(T.recipes, name)
+ T.recipeScaleOrder = slices.Delete(T.recipeScaleOrder, name)
+
+ return r
+}
+
+func (T *Pool) addRecipe(name string, r *Recipe) {
+ if T.recipes == nil {
+ T.recipes = make(map[string]*Recipe)
+ }
+ T.recipes[name] = r
+
+ // insert
+ T.recipeScaleOrder = T.recipeScaleOrder.Insert(name, func(n string) int {
+ return len(T.recipes[name].servers)
+ })
+}
+
+func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
+ added := NewRecipe(T.config.ParameterStatusSync, T.config.ExtendedQuerySync, r)
+
+ for _, server := range added.servers {
+ T.servers.AddServer(server)
+ }
+
+ var removed *Recipe
+
+ func() {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ removed = T.removeRecipe(name)
+ T.addRecipe(name, added)
+ }()
+
+ for _, server := range removed.servers {
+ T.servers.RemoveServer(server)
+ }
+}
+
+func (T *Pool) RemoveRecipe(name string) {
+ var removed *Recipe
+
+ func() {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ removed = T.removeRecipe(name)
+ }()
+
+ for _, server := range removed.servers {
+ T.servers.RemoveServer(server)
+ }
+}
+
+func (T *Pool) RemoveServer(server *pool2.Conn) {
+ T.servers.RemoveServer(server)
+
+ // update recipe
+ T.mu.Lock()
+ defer T.mu.Unlock()
+ r, ok := T.recipes[server.Recipe]
+ if !ok {
+ return
+ }
+ r.RemoveServer(server)
+}
+
+func (T *Pool) Acquire(client *pool2.Conn, mode pool.SyncMode) (server *pool2.Conn) {
+ return T.servers.Acquire(client, mode)
+}
+
+func (T *Pool) Release(server *pool2.Conn) {
+ T.servers.Release(server)
+}
+
+func (T *Pool) ReadMetrics(m *metrics.Pool) {
+ T.servers.ReadMetrics(m)
+}
+
+func (T *Pool) Cancel(key fed.BackendKey) {
+ T.mu.RLock()
+ defer T.mu.RUnlock()
+
+ for _, r := range T.recipes {
+ for _, s := range r.servers {
+ if s.Conn.BackendKey == key {
+ r.Cancel(key)
+ return
+ }
+ }
+ }
+}
+
+func (T *Pool) Close() {
+ T.servers.Close()
+
+ T.mu.Lock()
+ defer T.mu.Unlock()
+ clear(T.recipes)
+}
diff --git a/lib/gat/pool2/recipepool/recipe.go b/lib/gat/pool2/recipepool/recipe.go
new file mode 100644
index 0000000000000000000000000000000000000000..4ec588050a9d517ec56d9ac46a84b68aedc69cc8
--- /dev/null
+++ b/lib/gat/pool2/recipepool/recipe.go
@@ -0,0 +1,108 @@
+package recipepool
+
+import (
+ "log"
+
+ "gfx.cafe/gfx/pggat/lib/fed"
+ "gfx.cafe/gfx/pggat/lib/fed/middlewares/eqp"
+ "gfx.cafe/gfx/pggat/lib/fed/middlewares/ps"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+ "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2"
+ "gfx.cafe/gfx/pggat/lib/util/slices"
+)
+
+type Recipe struct {
+ parameterStatusSync pool.ParameterStatusSync
+ extendedQuerySync bool
+
+ r *recipe.Recipe
+
+ servers []*pool2.Conn
+}
+
+func NewRecipe(parameterStatusSync pool.ParameterStatusSync, extendedQuerySync bool, r *recipe.Recipe) *Recipe {
+ s := &Recipe{
+ parameterStatusSync: parameterStatusSync,
+ extendedQuerySync: extendedQuerySync,
+
+ r: r,
+ }
+ s.init()
+ return s
+}
+
+func (T *Recipe) init() {
+ count := T.r.AllocateInitial()
+ T.servers = make([]*pool2.Conn, 0, count)
+
+ for i := 0; i < count; i++ {
+ conn := T.dial()
+ if conn == nil {
+ T.r.Free()
+ }
+ T.servers = append(T.servers, conn)
+ }
+}
+
+func (T *Recipe) dial() *pool2.Conn {
+ conn, err := T.r.Dial()
+ if err != nil {
+ // TODO(garet) use proper logger
+ log.Printf("failed to dial server: %v", err)
+ return nil
+ }
+
+ if T.extendedQuerySync {
+ conn.Middleware = append(
+ conn.Middleware,
+ eqp.NewServer(),
+ )
+ }
+
+ if T.parameterStatusSync == pool.ParameterStatusSyncDynamic {
+ conn.Middleware = append(
+ conn.Middleware,
+ ps.NewServer(conn.InitialParameters),
+ )
+ }
+
+ return pool2.NewConn(conn)
+}
+
+func (T *Recipe) Dial() *pool2.Conn {
+ if !T.r.Allocate() {
+ return nil
+ }
+
+ c := T.dial()
+ if c == nil {
+ T.r.Free()
+ }
+ return c
+}
+
+func (T *Recipe) Cancel(key fed.BackendKey) {
+ T.r.Cancel(key)
+}
+
+func (T *Recipe) TryRemoveServer(server *pool2.Conn) bool {
+ idx := slices.Index(T.servers, server)
+ if idx == -1 {
+ return false
+ }
+ if !T.r.TryFree() {
+ return false
+ }
+ T.servers = slices.DeleteIndex(T.servers, idx)
+ return true
+}
+
+func (T *Recipe) RemoveServer(server *pool2.Conn) {
+ idx := slices.Index(T.servers, server)
+ if idx == -1 {
+ return
+ }
+ T.servers = slices.DeleteIndex(T.servers, idx)
+ T.r.Free()
+}
diff --git a/lib/gat/pool2/scalingpool/config.go b/lib/gat/pool2/scalingpool/config.go
new file mode 100644
index 0000000000000000000000000000000000000000..a1d5240d9f90fd841f943c86f3d6ccfd3517c3b9
--- /dev/null
+++ b/lib/gat/pool2/scalingpool/config.go
@@ -0,0 +1,22 @@
+package scalingpool
+
+import (
+ "time"
+
+ "gfx.cafe/gfx/pggat/lib/gat/pool2/recipepool"
+)
+
+type Config struct {
+ // ServerIdleTimeout defines how long a server may be idle before it is disconnected.
+ // 0 = disable
+ ServerIdleTimeout time.Duration
+
+ // ServerReconnectInitialTime defines how long to wait initially before attempting a server reconnect
+ // 0 = disable, don't retry
+ ServerReconnectInitialTime time.Duration
+ // ServerReconnectMaxTime defines the max amount of time to wait before attempting a server reconnect
+ // 0 = disable, back off infinitely
+ ServerReconnectMaxTime time.Duration
+
+ recipepool.Config
+}
diff --git a/lib/gat/pool2/scalingpool/pool.go b/lib/gat/pool2/scalingpool/pool.go
new file mode 100644
index 0000000000000000000000000000000000000000..c5fa7d1cc32eeab9cb12d564c8fc07f6098ea2e2
--- /dev/null
+++ b/lib/gat/pool2/scalingpool/pool.go
@@ -0,0 +1,152 @@
+package scalingpool
+
+import (
+ "sync/atomic"
+ "time"
+
+ "gfx.cafe/gfx/pggat/lib/fed"
+ "gfx.cafe/gfx/pggat/lib/gat/metrics"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+ "gfx.cafe/gfx/pggat/lib/gat/pool/recipe"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2/recipepool"
+)
+
+const pendingScaleUpSize = 4
+
+type Pool struct {
+ config Config
+
+ servers recipepool.Pool
+
+ started atomic.Bool
+
+ scale chan struct{}
+ closed chan struct{}
+}
+
+func MakePool(config Config) Pool {
+ return Pool{
+ config: config,
+
+ servers: recipepool.MakePool(config.Config),
+
+ scale: make(chan struct{}, pendingScaleUpSize),
+ closed: make(chan struct{}),
+ }
+}
+
+func (T *Pool) init() {
+ if !T.started.Swap(true) {
+ go T.scaleLoop()
+ }
+}
+
+func (T *Pool) AddClient(client *pool2.Conn) {
+ T.servers.AddClient(client)
+}
+
+func (T *Pool) RemoveClient(client *pool2.Conn) {
+ T.servers.RemoveClient(client)
+}
+
+func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
+ T.init()
+
+ T.servers.AddRecipe(name, r)
+}
+
+func (T *Pool) RemoveRecipe(name string) {
+ T.servers.RemoveRecipe(name)
+}
+
+func (T *Pool) RemoveServer(server *pool2.Conn) {
+ T.servers.RemoveServer(server)
+}
+
+func (T *Pool) Acquire(client *pool2.Conn) (server *pool2.Conn) {
+ server = T.servers.Acquire(client, pool.SyncModeNonBlocking)
+ if server == nil {
+ select {
+ case T.scale <- struct{}{}:
+ default:
+ }
+
+ server = T.servers.Acquire(client, pool.SyncModeBlocking)
+ }
+
+ return
+}
+
+func (T *Pool) Release(server *pool2.Conn) {
+ T.servers.Release(server)
+}
+
+func (T *Pool) Cancel(key fed.BackendKey) {
+ T.servers.Cancel(key)
+}
+
+func (T *Pool) ReadMetrics(m *metrics.Pool) {
+ T.servers.ReadMetrics(m)
+}
+
+func (T *Pool) Close() {
+ close(T.closed)
+
+ T.servers.Close()
+}
+
+func (T *Pool) scaleLoop() {
+ var idle *time.Timer
+ if T.config.ServerIdleTimeout != 0 {
+ idle = time.NewTimer(T.config.ServerIdleTimeout)
+ }
+
+ var backoff time.Duration
+ var scale *time.Timer
+
+ for {
+ var idle1 <-chan time.Time
+ if idle != nil {
+ idle1 = idle.C
+ }
+
+ var scale1 <-chan struct{}
+ var scale2 <-chan time.Time
+ if backoff != 0 {
+ scale1 = T.scale
+ } else {
+ scale2 = scale.C
+ }
+
+ select {
+ case <-idle1:
+ idle.Reset(T.servers.ScaleDown(T.config.ServerIdleTimeout))
+ case <-scale1:
+ if !T.servers.ScaleUp() {
+ backoff = T.config.ServerReconnectInitialTime
+ if backoff == 0 {
+ continue
+ }
+ if scale == nil {
+ scale = time.NewTimer(backoff)
+ } else {
+ scale.Reset(backoff)
+ }
+ continue
+ }
+ case <-scale2:
+ if !T.servers.ScaleUp() {
+ backoff *= 2
+ if T.config.ServerReconnectMaxTime != 0 && backoff > T.config.ServerReconnectMaxTime {
+ backoff = T.config.ServerReconnectMaxTime
+ }
+ scale.Reset(backoff)
+ continue
+ }
+ backoff = 0
+ case <-T.closed:
+ return
+ }
+ }
+}
diff --git a/lib/gat/pool2/serverpool/config.go b/lib/gat/pool2/serverpool/config.go
new file mode 100644
index 0000000000000000000000000000000000000000..ed14cd0e8d6261e96ef1d860e4be9beed3df42dc
--- /dev/null
+++ b/lib/gat/pool2/serverpool/config.go
@@ -0,0 +1,10 @@
+package serverpool
+
+import "gfx.cafe/gfx/pggat/lib/gat/pool"
+
+type Config struct {
+ // NewPooler allocates a new pooler
+ NewPooler func() pool.Pooler
+ // ServerResetQuery is the query to be run before the server is released
+ ServerResetQuery string
+}
diff --git a/lib/gat/pool2/serverpool/pool.go b/lib/gat/pool2/serverpool/pool.go
new file mode 100644
index 0000000000000000000000000000000000000000..f9841986f29bf4f371c1cb9fb4a1d56f98d08c9e
--- /dev/null
+++ b/lib/gat/pool2/serverpool/pool.go
@@ -0,0 +1,151 @@
+package serverpool
+
+import (
+ "sync"
+ "time"
+
+ "github.com/google/uuid"
+
+ "gfx.cafe/gfx/pggat/lib/bouncer/backends/v0"
+ "gfx.cafe/gfx/pggat/lib/gat/metrics"
+ "gfx.cafe/gfx/pggat/lib/gat/pool"
+ "gfx.cafe/gfx/pggat/lib/gat/pool2"
+)
+
+type Pool struct {
+ config Config
+ pooler pool.Pooler
+
+ servers map[uuid.UUID]*pool2.Conn
+ mu sync.RWMutex
+}
+
+func MakePool(config Config) Pool {
+ return Pool{
+ config: config,
+ pooler: config.NewPooler(),
+ }
+}
+
+func (T *Pool) AddClient(client *pool2.Conn) {
+ T.pooler.AddClient(client.ID)
+}
+
+func (T *Pool) RemoveClient(client *pool2.Conn) {
+ T.pooler.DeleteClient(client.ID)
+}
+
+func (T *Pool) AddServer(server *pool2.Conn) {
+ func() {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+ if T.servers == nil {
+ T.servers = make(map[uuid.UUID]*pool2.Conn)
+ }
+ T.servers[server.ID] = server
+ }()
+ T.pooler.AddServer(server.ID)
+}
+
+func (T *Pool) RemoveServer(server *pool2.Conn) {
+ T.pooler.DeleteServer(server.ID)
+
+ T.mu.Lock()
+ defer T.mu.Unlock()
+ delete(T.servers, server.ID)
+}
+
+func (T *Pool) Acquire(client *pool2.Conn, mode pool.SyncMode) (server *pool2.Conn) {
+ for {
+ serverID := T.pooler.Acquire(client.ID, mode)
+ if serverID == uuid.Nil {
+ return
+ }
+
+ T.mu.RLock()
+ server, _ = T.servers[serverID]
+ T.mu.RUnlock()
+
+ if server != nil {
+ return
+ }
+
+ T.pooler.DeleteServer(serverID)
+ }
+}
+
+func (T *Pool) Release(server *pool2.Conn) {
+ if T.config.ServerResetQuery != "" {
+ pool2.SetConnState(metrics.ConnStateRunningResetQuery, server)
+
+ err, _ := backends.QueryString(server.Conn, nil, T.config.ServerResetQuery)
+ if err != nil {
+ T.RemoveServer(server)
+ return
+ }
+ }
+
+ pool2.SetConnState(metrics.ConnStateIdle, server)
+
+ T.pooler.Release(server.ID)
+}
+
+// ScaleDown removes any servers that have been idle for longer than idleFor. Returns the next time to attempt to scale
+// down again
+func (T *Pool) ScaleDown(idleFor time.Duration) time.Duration {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ now := time.Now()
+
+ var oldest time.Time
+
+ for id, server := range T.servers {
+ state, since := server.GetState()
+ if state != metrics.ConnStateIdle {
+ continue
+ }
+
+ dur := now.Sub(since)
+ if dur > idleFor {
+ T.pooler.DeleteServer(id)
+ delete(T.servers, id)
+ }
+
+ if oldest != (time.Time{}) && since.Before(oldest) {
+ oldest = since
+ }
+ }
+
+ dur := now.Sub(oldest)
+ if dur > idleFor {
+ dur = idleFor
+ }
+
+ return dur
+}
+
+func (T *Pool) ReadMetrics(m *metrics.Pool) {
+ T.mu.RLock()
+ defer T.mu.RUnlock()
+
+ if m.Servers == nil {
+ m.Servers = make(map[uuid.UUID]metrics.Conn)
+ }
+
+ for id, server := range T.servers {
+ var c metrics.Conn
+ server.ReadMetrics(&c)
+ m.Servers[id] = c
+ }
+}
+
+func (T *Pool) Close() {
+ T.mu.Lock()
+ defer T.mu.Unlock()
+
+ for id, server := range T.servers {
+ _ = server.Conn.Close()
+ delete(T.servers, id)
+ }
+}