From 528a888a7580e81bf6458c443c1ac8a3ee0e6b3f Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Thu, 7 Sep 2023 16:15:29 -0500
Subject: [PATCH] better pool maybe?

---
 lib/gat/acceptor.go                        |   2 +-
 lib/gat/pool/client.go                     |  97 ++--
 lib/gat/pool/{recipe => }/dialer/dialer.go |   2 +-
 lib/gat/pool/{recipe => }/dialer/net.go    |   7 +-
 lib/gat/pool/flow.go                       |  86 +++
 lib/gat/pool/metrics.go                    | 200 -------
 lib/gat/pool/metrics/conn.go               |  24 +
 lib/gat/pool/metrics/pool.go               |   8 +
 lib/gat/pool/metrics/state.go              |  23 +
 lib/gat/pool/options.go                    |  10 +-
 lib/gat/pool/pool.go                       | 578 ++++-----------------
 lib/gat/pool/pooler.go                     |  15 +-
 lib/gat/pool/recipe.go                     |  13 -
 lib/gat/pool/recipe/options.go             |   7 +-
 lib/gat/pool/recipe/recipe.go              |   4 -
 lib/gat/pool/recipe/server.go              |   4 -
 lib/gat/pool/server.go                     |  74 +--
 lib/gat/pool/state.go                      |  10 +
 18 files changed, 333 insertions(+), 831 deletions(-)
 rename lib/gat/pool/{recipe => }/dialer/dialer.go (82%)
 rename lib/gat/pool/{recipe => }/dialer/net.go (87%)
 create mode 100644 lib/gat/pool/flow.go
 delete mode 100644 lib/gat/pool/metrics.go
 create mode 100644 lib/gat/pool/metrics/conn.go
 create mode 100644 lib/gat/pool/metrics/pool.go
 create mode 100644 lib/gat/pool/metrics/state.go
 delete mode 100644 lib/gat/pool/recipe.go
 delete mode 100644 lib/gat/pool/recipe/server.go
 create mode 100644 lib/gat/pool/state.go

diff --git a/lib/gat/acceptor.go b/lib/gat/acceptor.go
index 87fdfe2a..214ab12b 100644
--- a/lib/gat/acceptor.go
+++ b/lib/gat/acceptor.go
@@ -79,7 +79,7 @@ func serve(client fed.Conn, acceptParams frontends.AcceptParams, pools Pools) er
 	pools.RegisterKey(authParams.BackendKey, acceptParams.User, acceptParams.Database)
 	defer pools.UnregisterKey(authParams.BackendKey)
 
-	return p.Serve(client, acceptParams, authParams)
+	return p.Serve(client, acceptParams.InitialParameters, authParams.BackendKey)
 }
 
 func Serve(acceptor Acceptor, pools Pools) error {
diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go
index d2c36e90..04bab928 100644
--- a/lib/gat/pool/client.go
+++ b/lib/gat/pool/client.go
@@ -1,76 +1,95 @@
 package pool
 
 import (
-	"sync"
-	"time"
+	"sync/atomic"
 
 	"github.com/google/uuid"
 
 	"pggat2/lib/fed"
+	"pggat2/lib/middleware"
+	"pggat2/lib/middleware/interceptor"
+	"pggat2/lib/middleware/middlewares/eqp"
+	"pggat2/lib/middleware/middlewares/ps"
+	"pggat2/lib/middleware/middlewares/unterminate"
+	"pggat2/lib/util/strutil"
 )
 
 type Client struct {
-	conn       fed.Conn
-	backendKey [8]byte
+	id uuid.UUID
 
-	metrics ItemMetrics
-	mu      sync.RWMutex
+	conn fed.Conn
+
+	ps  *ps.Client
+	eqp *eqp.Client
+
+	initialParameters map[strutil.CIString]string
+	backendKey        [8]byte
+
+	transactionCount atomic.Int64
 }
 
 func NewClient(
+	options Options,
 	conn fed.Conn,
+	initialParameters map[strutil.CIString]string,
 	backendKey [8]byte,
 ) *Client {
+	middlewares := []middleware.Middleware{
+		unterminate.Unterminate,
+	}
+
+	var psClient *ps.Client
+	if options.ParameterStatusSync == ParameterStatusSyncDynamic {
+		// add ps middleware
+		psClient = ps.NewClient(initialParameters)
+		middlewares = append(middlewares, psClient)
+	}
+
+	var eqpClient *eqp.Client
+	if options.ExtendedQuerySync {
+		// add eqp middleware
+		eqpClient = eqp.NewClient()
+		middlewares = append(middlewares, eqpClient)
+	}
+
+	conn = interceptor.NewInterceptor(
+		conn,
+		middlewares...,
+	)
+
 	return &Client{
+		id:         uuid.New(),
 		conn:       conn,
+		ps:         psClient,
+		eqp:        eqpClient,
 		backendKey: backendKey,
-
-		metrics: MakeItemMetrics(),
 	}
 }
 
-func (T *Client) GetConn() fed.Conn {
-	return T.conn
+func (T *Client) GetID() uuid.UUID {
+	return T.id
 }
 
-func (T *Client) GetBackendKey() [8]byte {
-	return T.backendKey
-}
-
-// SetState replaces the peer. Returns the old peer
-func (T *Client) SetState(state State, peer uuid.UUID) uuid.UUID {
-	T.mu.Lock()
-	defer T.mu.Unlock()
-
-	old := T.metrics.Peer
-	T.metrics.SetState(state, peer)
-	return old
+func (T *Client) GetConn() fed.Conn {
+	return T.conn
 }
 
-func (T *Client) GetPeer() uuid.UUID {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
-
-	return T.metrics.Peer
+func (T *Client) GetEQP() *eqp.Client {
+	return T.eqp
 }
 
-func (T *Client) GetConnection() (uuid.UUID, time.Time) {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
-
-	return T.metrics.Peer, T.metrics.Since
+func (T *Client) GetPS() *ps.Client {
+	return T.ps
 }
 
 func (T *Client) TransactionComplete() {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+	T.transactionCount.Add(1)
+}
 
-	T.metrics.Transactions++
+func (T *Client) GetInitialParameters() map[strutil.CIString]string {
+	return T.initialParameters
 }
 
-func (T *Client) ReadMetrics(metrics *ItemMetrics) {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+func (T *Client) SetState(state State, peer uuid.UUID) {
 
-	T.metrics.Read(metrics)
 }
diff --git a/lib/gat/pool/recipe/dialer/dialer.go b/lib/gat/pool/dialer/dialer.go
similarity index 82%
rename from lib/gat/pool/recipe/dialer/dialer.go
rename to lib/gat/pool/dialer/dialer.go
index b50d7947..21e03d05 100644
--- a/lib/gat/pool/recipe/dialer/dialer.go
+++ b/lib/gat/pool/dialer/dialer.go
@@ -7,5 +7,5 @@ import (
 
 type Dialer interface {
 	Dial() (fed.Conn, backends.AcceptParams, error)
-	Cancel(cancelKey [8]byte) error
+	Cancel(key [8]byte) error
 }
diff --git a/lib/gat/pool/recipe/dialer/net.go b/lib/gat/pool/dialer/net.go
similarity index 87%
rename from lib/gat/pool/recipe/dialer/net.go
rename to lib/gat/pool/dialer/net.go
index 0d24e894..e4ee1341 100644
--- a/lib/gat/pool/recipe/dialer/net.go
+++ b/lib/gat/pool/dialer/net.go
@@ -24,11 +24,10 @@ func (T Net) Dial() (fed.Conn, backends.AcceptParams, error) {
 	if err != nil {
 		return nil, backends.AcceptParams{}, err
 	}
-
 	return conn, params, nil
 }
 
-func (T Net) Cancel(cancelKey [8]byte) error {
+func (T Net) Cancel(key [8]byte) error {
 	c, err := net.Dial(T.Network, T.Address)
 	if err != nil {
 		return err
@@ -37,5 +36,7 @@ func (T Net) Cancel(cancelKey [8]byte) error {
 	defer func() {
 		_ = conn.Close()
 	}()
-	return backends.Cancel(conn, cancelKey)
+	return backends.Cancel(conn, key)
 }
+
+var _ Dialer = Net{}
diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go
new file mode 100644
index 00000000..476fe204
--- /dev/null
+++ b/lib/gat/pool/flow.go
@@ -0,0 +1,86 @@
+package pool
+
+import (
+	"pggat2/lib/bouncer/backends/v0"
+	packets "pggat2/lib/fed/packets/v3.0"
+	"pggat2/lib/middleware/middlewares/ps"
+	"pggat2/lib/util/slices"
+)
+
+func Pair(options Options, client *Client, server *Server) (clientErr, serverErr error) {
+	client.SetState(StateActive, server.GetID())
+	server.SetState(StateActive, client.GetID())
+
+	switch options.ParameterStatusSync {
+	case ParameterStatusSyncDynamic:
+		clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), client.GetPS(), server.GetConn(), server.GetPS())
+	case ParameterStatusSyncInitial:
+		clientErr, serverErr = SyncInitialParameters(options, client, server)
+	}
+
+	if options.ExtendedQuerySync {
+		server.GetEQP().SetClient(client.GetEQP())
+	}
+
+	return
+}
+
+func SyncInitialParameters(options Options, client *Client, server *Server) (clientErr, serverErr error) {
+	clientParams := client.GetInitialParameters()
+	serverParams := server.GetInitialParameters()
+
+	for key, value := range clientParams {
+		setServer := slices.Contains(options.TrackedParameters, key)
+
+		// skip already set params
+		if serverParams[key] == value {
+			setServer = false
+		} else if !setServer {
+			value = serverParams[key]
+		}
+
+		p := packets.ParameterStatus{
+			Key:   key.String(),
+			Value: serverParams[key],
+		}
+		clientErr = client.GetConn().WritePacket(p.IntoPacket())
+		if clientErr != nil {
+			return
+		}
+
+		if !setServer {
+			continue
+		}
+
+		serverErr = backends.SetParameter(new(backends.Context), server.GetConn(), key, value)
+		if serverErr != nil {
+			return
+		}
+	}
+
+	for key, value := range serverParams {
+		if _, ok := clientParams[key]; ok {
+			continue
+		}
+
+		// Don't need to run reset on server because it will reset it to the initial value
+
+		// send to client
+		p := packets.ParameterStatus{
+			Key:   key.String(),
+			Value: value,
+		}
+		clientErr = client.GetConn().WritePacket(p.IntoPacket())
+		if clientErr != nil {
+			return
+		}
+	}
+
+	return
+
+}
+
+func TransactionComplete(client *Client, server *Server) {
+	client.TransactionComplete()
+	server.TransactionComplete()
+}
diff --git a/lib/gat/pool/metrics.go b/lib/gat/pool/metrics.go
deleted file mode 100644
index 0ebeb95f..00000000
--- a/lib/gat/pool/metrics.go
+++ /dev/null
@@ -1,200 +0,0 @@
-package pool
-
-import (
-	"fmt"
-	"math"
-	"strconv"
-	"strings"
-	"time"
-
-	"github.com/google/uuid"
-
-	"pggat2/lib/util/maps"
-)
-
-type State int
-
-const (
-	StateActive State = iota
-	StateIdle
-	StateAwaitingServer
-	StateRunningResetQuery
-
-	StateCount
-)
-
-func (T State) String() string {
-	switch T {
-	case StateActive:
-		return "active"
-	case StateIdle:
-		return "idle"
-	case StateAwaitingServer:
-		return "awaiting server"
-	case StateRunningResetQuery:
-		return "running reset query"
-	default:
-		return "unknown state"
-	}
-}
-
-type Metrics struct {
-	Servers map[uuid.UUID]ItemMetrics
-	Clients map[uuid.UUID]ItemMetrics
-}
-
-func (T *Metrics) TransactionCount() int {
-	var serverTransactions int
-	var clientTransactions int
-
-	for _, server := range T.Servers {
-		serverTransactions += server.Transactions
-	}
-
-	for _, client := range T.Clients {
-		clientTransactions += client.Transactions
-	}
-
-	if clientTransactions > serverTransactions {
-		return clientTransactions
-	}
-	return serverTransactions
-}
-
-func stateCount(items map[uuid.UUID]ItemMetrics) [StateCount]int {
-	var states [StateCount]int
-	for _, item := range items {
-		states[item.State]++
-	}
-	return states
-}
-
-func stateUtil(items map[uuid.UUID]ItemMetrics) [StateCount]float64 {
-	var util [StateCount]time.Duration
-	var total time.Duration
-	for _, item := range items {
-		for state, amount := range item.InState {
-			util[state] += amount
-			total += amount
-		}
-	}
-
-	var states [StateCount]float64
-	for state := range states {
-		states[state] = float64(util[state]) / float64(total)
-	}
-
-	return states
-}
-
-func (T *Metrics) ServerStateCount() [StateCount]int {
-	return stateCount(T.Servers)
-}
-
-func (T *Metrics) ServerStateUtil() [StateCount]float64 {
-	return stateUtil(T.Servers)
-}
-
-func (T *Metrics) ClientStateCount() [StateCount]int {
-	return stateCount(T.Clients)
-}
-
-func (T *Metrics) ClientStateUtil() [StateCount]float64 {
-	return stateUtil(T.Clients)
-}
-
-func (T *Metrics) Clear() {
-	maps.Clear(T.Servers)
-	maps.Clear(T.Clients)
-}
-
-func stateUtilString(count [StateCount]int, util [StateCount]float64) string {
-	var b strings.Builder
-
-	var addSpace bool
-	for state, u := range util {
-		if u == 0.0 || math.IsNaN(u) {
-			continue
-		}
-		if addSpace {
-			b.WriteString(", ")
-		} else {
-			addSpace = true
-		}
-		b.WriteString(strconv.Itoa(count[state]))
-		b.WriteString(" ")
-		b.WriteString(State(state).String())
-		b.WriteString(" (")
-		b.WriteString(strconv.FormatFloat(u*100, 'f', 2, 64))
-		b.WriteString("%)")
-	}
-
-	return b.String()
-}
-
-func (T *Metrics) String() string {
-	return fmt.Sprintf("%d transactions | %d servers (%s) | %d clients (%s)",
-		T.TransactionCount(),
-		len(T.Servers),
-		stateUtilString(T.ServerStateCount(), T.ServerStateUtil()),
-		len(T.Clients),
-		stateUtilString(T.ClientStateCount(), T.ClientStateUtil()),
-	)
-}
-
-type ItemMetrics struct {
-	// Time is the time of this metrics read
-	Time time.Time
-
-	State State
-	// Peer is the currently connected server or client
-	Peer uuid.UUID
-	// Since is the last time that Peer changed.
-	Since time.Time
-
-	// InState is how long this item spent in each state
-	InState [StateCount]time.Duration
-
-	// Transactions is the number of handled transactions since last metrics reset
-	Transactions int
-}
-
-func MakeItemMetrics() ItemMetrics {
-	now := time.Now()
-
-	return ItemMetrics{
-		Time:  now,
-		Since: now,
-	}
-}
-
-func (T *ItemMetrics) commitSince(now time.Time) {
-	since := now.Sub(T.Since)
-	if T.Since.Before(T.Time) {
-		since = now.Sub(T.Time)
-	}
-
-	T.InState[T.State] += since
-}
-
-func (T *ItemMetrics) SetState(state State, peer uuid.UUID) {
-	now := time.Now()
-
-	T.commitSince(now)
-
-	T.Peer = peer
-	T.Since = now
-	T.State = state
-}
-
-func (T *ItemMetrics) Read(metrics *ItemMetrics) {
-	now := time.Now()
-
-	*metrics = *T
-
-	metrics.commitSince(now)
-
-	T.Time = now
-	T.InState = [StateCount]time.Duration{}
-	T.Transactions = 0
-}
diff --git a/lib/gat/pool/metrics/conn.go b/lib/gat/pool/metrics/conn.go
new file mode 100644
index 00000000..42e32f6c
--- /dev/null
+++ b/lib/gat/pool/metrics/conn.go
@@ -0,0 +1,24 @@
+package metrics
+
+import (
+	"time"
+
+	"github.com/google/uuid"
+)
+
+type Conn struct {
+	// Time this report was generated
+	Time time.Time
+
+	// Current state info
+
+	State ConnState
+	Peer  uuid.UUID
+	Since time.Time
+
+	// Period metrics
+
+	Utilization [ConnStateCount]time.Duration
+
+	TransactionCount int
+}
diff --git a/lib/gat/pool/metrics/pool.go b/lib/gat/pool/metrics/pool.go
new file mode 100644
index 00000000..1fc744b3
--- /dev/null
+++ b/lib/gat/pool/metrics/pool.go
@@ -0,0 +1,8 @@
+package metrics
+
+import "github.com/google/uuid"
+
+type Pool struct {
+	Servers map[uuid.UUID]Conn
+	Clients map[uuid.UUID]Conn
+}
diff --git a/lib/gat/pool/metrics/state.go b/lib/gat/pool/metrics/state.go
new file mode 100644
index 00000000..4d6653da
--- /dev/null
+++ b/lib/gat/pool/metrics/state.go
@@ -0,0 +1,23 @@
+package metrics
+
+type ConnState int
+
+const (
+	ConnStateActive ConnState = iota
+	ConnStateIdle
+	ConnStateAwaitingServer
+	ConnStateRunningResetQuery
+
+	ConnStateCount
+)
+
+var connStateString = [ConnStateCount]string{
+	ConnStateActive:            "active",
+	ConnStateIdle:              "idle",
+	ConnStateAwaitingServer:    "awaiting server",
+	ConnStateRunningResetQuery: "running reset query",
+}
+
+func (T ConnState) String() string {
+	return connStateString[T]
+}
diff --git a/lib/gat/pool/options.go b/lib/gat/pool/options.go
index 732cafd1..a6ad812d 100644
--- a/lib/gat/pool/options.go
+++ b/lib/gat/pool/options.go
@@ -21,8 +21,14 @@ const (
 )
 
 type Options struct {
-	Credentials      auth.Credentials
-	Pooler           Pooler
+	Credentials auth.Credentials
+
+	Pooler Pooler
+	// ReleaseAfterTransaction toggles whether servers should be released and re acquired after each transaction.
+	// Use false for lower latency
+	// Use true for better balancing
+	ReleaseAfterTransaction bool
+
 	ServerResetQuery string
 	// ServerIdleTimeout defines how long a server may be idle before it is disconnected
 	ServerIdleTimeout time.Duration
diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go
index 0550a6a1..bd1ae482 100644
--- a/lib/gat/pool/pool.go
+++ b/lib/gat/pool/pool.go
@@ -2,577 +2,197 @@ package pool
 
 import (
 	"sync"
-	"time"
-
-	"pggat2/lib/util/maps"
 
 	"github.com/google/uuid"
-	"tuxpa.in/a/zlog/log"
 
 	"pggat2/lib/auth"
 	"pggat2/lib/bouncer/backends/v0"
 	"pggat2/lib/bouncer/bouncers/v2"
-	"pggat2/lib/bouncer/frontends/v0"
 	"pggat2/lib/fed"
-	packets "pggat2/lib/fed/packets/v3.0"
-	"pggat2/lib/middleware"
-	"pggat2/lib/middleware/interceptor"
-	"pggat2/lib/middleware/middlewares/eqp"
-	"pggat2/lib/middleware/middlewares/ps"
-	"pggat2/lib/middleware/middlewares/unterminate"
-	"pggat2/lib/util/slices"
+	"pggat2/lib/gat/pool/metrics"
+	"pggat2/lib/gat/pool/recipe"
 	"pggat2/lib/util/strutil"
 )
 
-type poolRecipe struct {
-	recipe Recipe
-
-	deleted bool
-	servers map[uuid.UUID]*Server
-	mu      sync.RWMutex
-}
-
-func (T *poolRecipe) AddServer(serverID uuid.UUID, server *Server) bool {
-	T.mu.Lock()
-	defer T.mu.Unlock()
-
-	if T.deleted {
-		return false
-	}
-
-	if T.recipe.MaxConnections != 0 && len(T.servers)+1 > T.recipe.MaxConnections {
-		return false
-	}
-
-	if T.servers == nil {
-		T.servers = make(map[uuid.UUID]*Server)
-	}
-	T.servers[serverID] = server
-	return true
-}
-
-func (T *poolRecipe) GetServer(serverID uuid.UUID) *Server {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
-
-	if T.deleted {
-		return nil
-	}
-
-	return T.servers[serverID]
-}
-
-func (T *poolRecipe) DeleteServer(serverID uuid.UUID) *Server {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
-
-	if T.deleted {
-		return nil
-	}
-
-	server := T.servers[serverID]
-	delete(T.servers, serverID)
-	return server
-}
-
-func (T *poolRecipe) Size() int {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
-
-	return len(T.servers)
-}
-
-func (T *poolRecipe) RangeRLock(fn func(serverID uuid.UUID, server *Server) bool) bool {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
-
-	for serverID, server := range T.servers {
-		if !fn(serverID, server) {
-			return false
-		}
-	}
-
-	return true
-}
-
-func (T *poolRecipe) Delete(fn func(serverID uuid.UUID, server *Server)) {
-	T.mu.Lock()
-	defer T.mu.Unlock()
-
-	T.deleted = true
-	for serverID, server := range T.servers {
-		fn(serverID, server)
-		delete(T.servers, serverID)
-	}
-}
-
 type Pool struct {
 	options Options
 
-	recipes maps.RWLocked[string, *poolRecipe]
-	servers maps.RWLocked[uuid.UUID, *poolRecipe]
-	clients maps.RWLocked[uuid.UUID, *Client]
+	recipes map[string]*recipe.Recipe
+	clients map[uuid.UUID]*Client
+	servers map[uuid.UUID]*Server
+	mu      sync.RWMutex
 }
 
 func NewPool(options Options) *Pool {
-	p := &Pool{
+	return &Pool{
 		options: options,
 	}
-
-	if options.ServerIdleTimeout != 0 {
-		go p.idleTimeoutLoop()
-	}
-
-	return p
-}
-
-func (T *Pool) GetServer(serverID uuid.UUID) *Server {
-	recipe, _ := T.servers.Load(serverID)
-	if recipe == nil {
-		return nil
-	}
-	return recipe.GetServer(serverID)
-}
-
-func (T *Pool) idlest() (idlest uuid.UUID, idle time.Time) {
-	T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
-		recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool {
-			peer, since := server.GetConnection()
-			if peer != uuid.Nil {
-				return true
-			}
-
-			if idle != (time.Time{}) && since.After(idle) {
-				return true
-			}
-
-			idlest = serverID
-			idle = since
-			return true
-		})
-		return true
-	})
-
-	return
-}
-
-func (T *Pool) idleTimeoutLoop() {
-	for {
-		var wait time.Duration
-
-		now := time.Now()
-		var idlest uuid.UUID
-		var idle time.Time
-		for idlest, idle = T.idlest(); idlest != uuid.Nil && now.Sub(idle) > T.options.ServerIdleTimeout; idlest, idle = T.idlest() {
-			T.removeServer(idlest)
-		}
-
-		if idlest == uuid.Nil {
-			wait = T.options.ServerIdleTimeout
-		} else {
-			wait = idle.Add(T.options.ServerIdleTimeout).Sub(now)
-		}
-
-		time.Sleep(wait)
-	}
 }
 
 func (T *Pool) GetCredentials() auth.Credentials {
 	return T.options.Credentials
 }
 
-func (T *Pool) scaleUpRecipe(r *poolRecipe) {
-	server, params, err := r.recipe.Dialer.Dial()
-	if err != nil {
-		log.Printf("failed to dial server: %v", err)
-		return
-	}
-
-	var middlewares []middleware.Middleware
+func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
+	T.mu.Lock()
+	defer T.mu.Unlock()
 
-	var psServer *ps.Server
-	if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
-		// add ps middleware
-		psServer = ps.NewServer(params.InitialParameters)
-		middlewares = append(middlewares, psServer)
-	}
+	T.removeRecipe(name)
 
-	var eqpServer *eqp.Server
-	if T.options.ExtendedQuerySync {
-		// add eqp middleware
-		eqpServer = eqp.NewServer()
-		middlewares = append(middlewares, eqpServer)
+	if T.recipes == nil {
+		T.recipes = make(map[string]*recipe.Recipe)
 	}
+	T.recipes[name] = r
 
-	if len(middlewares) > 0 {
-		server = interceptor.NewInterceptor(
-			server,
-			middlewares...,
-		)
-	}
-
-	serverID := T.options.Pooler.NewServer()
-	ok := r.AddServer(serverID, NewServer(
-		server,
-		params.BackendKey,
-		params.InitialParameters,
-
-		psServer,
-		eqpServer,
-	))
-	if !ok {
-		_ = server.Close()
-		T.options.Pooler.DeleteServer(serverID)
-		return
-	}
-	T.servers.Store(serverID, r)
+	// TODO(garet) allocate servers until at the min
 }
 
-func (T *Pool) AddRecipe(name string, recipe Recipe) {
-	r := &poolRecipe{
-		recipe: recipe,
-	}
-	old, _ := T.recipes.Swap(name, r)
-	if old != nil {
-		old.Delete(func(serverID uuid.UUID, server *Server) {
-			_ = server.GetConn().Close()
-			T.options.Pooler.DeleteServer(serverID)
-			T.servers.Delete(serverID)
-		})
-	}
+func (T *Pool) RemoveRecipe(name string) {
+	T.mu.Lock()
+	defer T.mu.Unlock()
 
-	for i := 0; i < recipe.MinConnections; i++ {
-		T.scaleUpRecipe(r)
-	}
+	T.removeRecipe(name)
 }
 
-func (T *Pool) RemoveRecipe(name string) {
-	old, _ := T.recipes.LoadAndDelete(name)
-
-	if old == nil {
+func (T *Pool) removeRecipe(name string) {
+	r, ok := T.recipes[name]
+	if !ok {
 		return
 	}
+	delete(T.recipes, name)
 
-	// close all servers with this recipe
-
-	old.Delete(func(serverID uuid.UUID, server *Server) {
-		_ = server.GetConn().Close()
-		T.options.Pooler.DeleteServer(serverID)
-		T.servers.Delete(serverID)
-	})
+	// TODO(garet) deallocate all servers created by recipe
 }
 
-func (T *Pool) ScaleUp() {
-	failed := T.recipes.Range(func(_ string, r *poolRecipe) bool {
-		// this can race, but it will just dial an extra server and disconnect it in worst case
-		if r.recipe.MaxConnections == 0 || r.Size() < r.recipe.MaxConnections {
-			T.scaleUpRecipe(r)
-			return false
-		}
-
-		return true
-	})
-	if failed {
-		log.Println("No available recipe found to scale up")
-	}
+func (T *Pool) scaleUp() {
+	// TODO(garet)
 }
 
-func syncInitialParameters(
-	trackedParameters []strutil.CIString,
-	client fed.Conn,
-	clientParams map[strutil.CIString]string,
-	server fed.Conn,
-	serverParams map[strutil.CIString]string,
-) (clientErr, serverErr error) {
-	for key, value := range clientParams {
-		setServer := slices.Contains(trackedParameters, key)
-
-		// skip already set params
-		if serverParams[key] == value {
-			setServer = false
-		} else if !setServer {
-			value = serverParams[key]
-		}
+func (T *Pool) removeServer(server *Server) {
+	T.mu.Lock()
+	defer T.mu.Unlock()
 
-		p := packets.ParameterStatus{
-			Key:   key.String(),
-			Value: serverParams[key],
-		}
-		clientErr = client.WritePacket(p.IntoPacket())
-		if clientErr != nil {
-			return
-		}
+	delete(T.servers, server.GetID())
+	T.options.Pooler.DeleteServer(server.GetID())
+	_ = server.GetConn().Close()
+}
 
-		if !setServer {
-			continue
-		}
+func (T *Pool) acquireServer(client *Client) *Server {
+	client.SetState(StateAwaitingServer, uuid.Nil)
 
-		serverErr = backends.SetParameter(new(backends.Context), server, key, value)
-		if serverErr != nil {
-			return
-		}
+	serverID := T.options.Pooler.Acquire(client.GetID(), SyncModeNonBlocking)
+	if serverID == uuid.Nil {
+		// TODO(garet) can this be run on same thread and only create a goroutine if scaling is possible?
+		go T.scaleUp()
+		serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking)
 	}
 
-	for key, value := range serverParams {
-		if _, ok := clientParams[key]; ok {
-			continue
-		}
+	T.mu.RLock()
+	defer T.mu.RUnlock()
+	return T.servers[serverID]
+}
 
-		// Don't need to run reset on server because it will reset it to the initial value
+func (T *Pool) releaseServer(server *Server) {
+	server.SetState(StateRunningResetQuery, uuid.Nil)
 
-		// send to client
-		p := packets.ParameterStatus{
-			Key:   key.String(),
-			Value: value,
-		}
-		clientErr = client.WritePacket(p.IntoPacket())
-		if clientErr != nil {
+	if T.options.ServerResetQuery != "" {
+		err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
+		if err != nil {
+			T.removeServer(server)
 			return
 		}
 	}
 
-	return
+	server.SetState(StateIdle, uuid.Nil)
+
+	T.options.Pooler.Release(server.GetID())
 }
 
 func (T *Pool) Serve(
-	client fed.Conn,
-	accept frontends.AcceptParams,
-	auth frontends.AuthenticateParams,
+	conn fed.Conn,
+	initialParameters map[strutil.CIString]string,
+	backendKey [8]byte,
 ) error {
 	defer func() {
-		_ = client.Close()
+		_ = conn.Close()
 	}()
 
-	middlewares := []middleware.Middleware{
-		unterminate.Unterminate,
-	}
-
-	var psClient *ps.Client
-	if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
-		// add ps middleware
-		psClient = ps.NewClient(accept.InitialParameters)
-		middlewares = append(middlewares, psClient)
-	}
-
-	var eqpClient *eqp.Client
-	if T.options.ExtendedQuerySync {
-		// add eqp middleware
-		eqpClient = eqp.NewClient()
-		middlewares = append(middlewares, eqpClient)
-	}
-
-	client = interceptor.NewInterceptor(
-		client,
-		middlewares...,
+	client := NewClient(
+		T.options,
+		conn,
+		initialParameters,
+		backendKey,
 	)
 
-	clientID := T.addClient(client, auth.BackendKey)
-	defer T.removeClient(clientID)
+	return T.serve(client)
+}
 
-	var serverID uuid.UUID
-	var server *Server
+func (T *Pool) serve(client *Client) error {
+	T.addClient(client)
+	defer T.removeClient(client)
 
-	defer func() {
-		if serverID != uuid.Nil {
-			T.releaseServer(serverID)
-		}
-	}()
+	var server *Server
 
 	for {
-		packet, err := client.ReadPacket(true)
+		packet, err := client.GetConn().ReadPacket(true)
 		if err != nil {
+			if server != nil {
+				T.releaseServer(server)
+			}
 			return err
 		}
 
-		var clientErr, serverErr error
-		if serverID == uuid.Nil {
-			serverID, server = T.acquireServer(clientID)
-
-			switch T.options.ParameterStatusSync {
-			case ParameterStatusSyncDynamic:
-				clientErr, serverErr = ps.Sync(T.options.TrackedParameters, client, psClient, server.GetConn(), server.GetPSServer())
-			case ParameterStatusSyncInitial:
-				clientErr, serverErr = syncInitialParameters(T.options.TrackedParameters, client, accept.InitialParameters, server.GetConn(), server.GetInitialParameters())
-			}
+		var serverErr error
+		if server == nil {
+			server = T.acquireServer(client)
 
-			if T.options.ExtendedQuerySync {
-				server.GetEQPServer().SetClient(eqpClient)
-			}
+			err, serverErr = Pair(T.options, client, server)
 		}
-		if clientErr == nil && serverErr == nil {
-			clientErr, serverErr = bouncers.Bounce(client, server.GetConn(), packet)
+		if err == nil && serverErr == nil {
+			err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet)
 		}
 		if serverErr != nil {
-			T.removeServer(serverID)
-			serverID = uuid.Nil
-			server = nil
+			T.removeServer(server)
 			return serverErr
 		} else {
-			T.transactionComplete(clientID, serverID)
-			if T.options.Pooler.ReleaseAfterTransaction() {
-				T.releaseServer(serverID)
-				serverID = uuid.Nil
+			TransactionComplete(client, server)
+			if T.options.ReleaseAfterTransaction {
+				client.SetState(StateIdle, uuid.Nil)
+				go T.releaseServer(server) // TODO(garet) does this need to be a goroutine
 				server = nil
 			}
 		}
 
-		if clientErr != nil {
-			return clientErr
-		}
-	}
-}
-
-func (T *Pool) addClient(client fed.Conn, key [8]byte) uuid.UUID {
-	clientID := T.options.Pooler.NewClient()
-
-	T.clients.Store(clientID, NewClient(
-		client,
-		key,
-	))
-	return clientID
-}
-
-func (T *Pool) removeClient(clientID uuid.UUID) {
-	T.clients.Delete(clientID)
-	T.options.Pooler.DeleteClient(clientID)
-}
-
-func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Server) {
-	client, _ := T.clients.Load(clientID)
-	if client != nil {
-		client.SetState(StateAwaitingServer, uuid.Nil)
-	}
-
-	serverID = T.options.Pooler.Acquire(clientID, SyncModeNonBlocking)
-	if serverID == uuid.Nil {
-		go T.ScaleUp()
-		serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking)
-	}
-
-	server = T.GetServer(serverID)
-	if server != nil {
-		server.SetState(StateActive, clientID)
-	}
-	if client != nil {
-		client.SetState(StateActive, serverID)
-	}
-	return
-}
-
-func (T *Pool) releaseServer(serverID uuid.UUID) {
-	server := T.GetServer(serverID)
-	if server == nil {
-		return
-	}
-
-	clientID := server.SetState(StateRunningResetQuery, uuid.Nil)
-
-	if clientID != uuid.Nil {
-		client, _ := T.clients.Load(clientID)
-		if client != nil {
-			client.SetState(StateIdle, uuid.Nil)
-		}
-	}
-
-	if T.options.ServerResetQuery != "" {
-		err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
 		if err != nil {
-			T.removeServer(serverID)
-			return
+			if server != nil {
+				T.releaseServer(server)
+			}
+			return err
 		}
 	}
-
-	server.SetState(StateIdle, uuid.Nil)
-
-	T.options.Pooler.Release(serverID)
 }
 
-func (T *Pool) transactionComplete(clientID, serverID uuid.UUID) {
-	func() {
-		server := T.GetServer(serverID)
-		if server == nil {
-			return
-		}
-
-		server.TransactionComplete()
-	}()
+func (T *Pool) addClient(client *Client) {
+	T.mu.Lock()
+	defer T.mu.Unlock()
 
-	client, _ := T.clients.Load(clientID)
-	if client == nil {
-		return
+	if T.clients == nil {
+		T.clients = make(map[uuid.UUID]*Client)
 	}
-
-	client.TransactionComplete()
+	T.clients[client.GetID()] = client
 }
 
-func (T *Pool) removeServer(serverID uuid.UUID) {
-	recipe, _ := T.servers.LoadAndDelete(serverID)
-	if recipe == nil {
-		return
-	}
-	server := recipe.DeleteServer(serverID)
-	T.options.Pooler.DeleteServer(serverID)
-	if server == nil {
-		return
-	}
-	_ = server.GetConn().Close()
+func (T *Pool) removeClient(client *Client) {
+	T.mu.Lock()
+	defer T.mu.Unlock()
+
+	delete(T.clients, client.GetID())
 }
 
 func (T *Pool) Cancel(key [8]byte) error {
-	var clientID uuid.UUID
-	T.clients.Range(func(id uuid.UUID, client *Client) bool {
-		if client.GetBackendKey() == key {
-			clientID = id
-			return false
-		}
-		return true
-	})
-
-	if clientID == uuid.Nil {
-		return nil
-	}
-
-	// get peer
-	var r *poolRecipe
-	var serverKey [8]byte
-	if T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
-		return recipe.RangeRLock(func(_ uuid.UUID, server *Server) bool {
-			if server.GetPeer() == clientID {
-				r = recipe
-				serverKey = server.GetBackendKey()
-				return false
-			}
-			return true
-		})
-	}) {
-		return nil
-	}
 
-	return r.recipe.Dialer.Cancel(serverKey)
 }
 
-func (T *Pool) ReadMetrics(metrics *Metrics) {
-	if metrics.Servers == nil {
-		metrics.Servers = make(map[uuid.UUID]ItemMetrics)
-	}
-	if metrics.Clients == nil {
-		metrics.Clients = make(map[uuid.UUID]ItemMetrics)
-	}
+func (T *Pool) ReadMetrics(metrics *metrics.Pool) {
 
-	T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
-		recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool {
-			var m ItemMetrics
-			server.ReadMetrics(&m)
-			metrics.Servers[serverID] = m
-			return true
-		})
-		return true
-	})
-
-	T.clients.Range(func(clientID uuid.UUID, client *Client) bool {
-		var m ItemMetrics
-		client.ReadMetrics(&m)
-		metrics.Clients[clientID] = m
-		return true
-	})
 }
diff --git a/lib/gat/pool/pooler.go b/lib/gat/pool/pooler.go
index 9ed1d4c4..a8d9d506 100644
--- a/lib/gat/pool/pooler.go
+++ b/lib/gat/pool/pooler.go
@@ -1,15 +1,11 @@
 package pool
 
-import (
-	"github.com/google/uuid"
-)
+import "github.com/google/uuid"
 
 type SyncMode int
 
 const (
-	// SyncModeNonBlocking will obtain a server without blocking
 	SyncModeNonBlocking SyncMode = iota
-	// SyncModeBlocking will obtain a server by stalling
 	SyncModeBlocking
 )
 
@@ -20,13 +16,6 @@ type Pooler interface {
 	NewServer() uuid.UUID
 	DeleteServer(server uuid.UUID)
 
-	// Acquire a peer with SyncMode
-	Acquire(client uuid.UUID, sync SyncMode) uuid.UUID
-
-	// ReleaseAfterTransaction queries whether servers should be immediately released after a transaction is completed.
-	ReleaseAfterTransaction() bool
-
-	// Release will force release the server.
-	// This should be called when the paired client has disconnected, or after CanRelease returns true.
+	Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID)
 	Release(server uuid.UUID)
 }
diff --git a/lib/gat/pool/recipe.go b/lib/gat/pool/recipe.go
deleted file mode 100644
index 0dc7acfc..00000000
--- a/lib/gat/pool/recipe.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package pool
-
-import (
-	"pggat2/lib/gat/pool/recipe/dialer"
-)
-
-type Recipe struct {
-	Dialer         dialer.Dialer
-	MinConnections int
-	// MaxConnections is the max number of active server connections for this recipe.
-	// 0 = unlimited
-	MaxConnections int
-}
diff --git a/lib/gat/pool/recipe/options.go b/lib/gat/pool/recipe/options.go
index 07b21307..3c454875 100644
--- a/lib/gat/pool/recipe/options.go
+++ b/lib/gat/pool/recipe/options.go
@@ -1,13 +1,12 @@
 package recipe
 
-import (
-	"pggat2/lib/gat/pool/recipe/dialer"
-)
+import "pggat2/lib/gat/pool/dialer"
 
 type Options struct {
 	Dialer dialer.Dialer
 
 	MinConnections int
-	// MaxConnections is the max number of simultaneous connections from this recipe. 0 = unlimited
+	// MaxConnections is the max number of active server connections for this recipe.
+	// 0 = unlimited
 	MaxConnections int
 }
diff --git a/lib/gat/pool/recipe/recipe.go b/lib/gat/pool/recipe/recipe.go
index 2e807759..2ce6c30c 100644
--- a/lib/gat/pool/recipe/recipe.go
+++ b/lib/gat/pool/recipe/recipe.go
@@ -9,7 +9,3 @@ func NewRecipe(options Options) *Recipe {
 		options: options,
 	}
 }
-
-func (T *Recipe) Dial() {
-
-}
diff --git a/lib/gat/pool/recipe/server.go b/lib/gat/pool/recipe/server.go
deleted file mode 100644
index 95bf6346..00000000
--- a/lib/gat/pool/recipe/server.go
+++ /dev/null
@@ -1,4 +0,0 @@
-package recipe
-
-type Server struct {
-}
diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go
index 8a6f22dc..5f40612b 100644
--- a/lib/gat/pool/server.go
+++ b/lib/gat/pool/server.go
@@ -1,9 +1,6 @@
 package pool
 
 import (
-	"sync"
-	"time"
-
 	"github.com/google/uuid"
 
 	"pggat2/lib/fed"
@@ -13,91 +10,32 @@ import (
 )
 
 type Server struct {
-	conn              fed.Conn
-	backendKey        [8]byte
-	initialParameters map[strutil.CIString]string
-
-	psServer  *ps.Server
-	eqpServer *eqp.Server
-
-	metrics ItemMetrics
-	mu      sync.RWMutex
 }
 
-func NewServer(
-	conn fed.Conn,
-	backendKey [8]byte,
-	initialParameters map[strutil.CIString]string,
-
-	psServer *ps.Server,
-	eqpServer *eqp.Server,
-) *Server {
-	return &Server{
-		conn:              conn,
-		backendKey:        backendKey,
-		initialParameters: initialParameters,
+func (T *Server) GetID() uuid.UUID {
 
-		psServer:  psServer,
-		eqpServer: eqpServer,
-
-		metrics: MakeItemMetrics(),
-	}
 }
 
 func (T *Server) GetConn() fed.Conn {
-	return T.conn
-}
-
-func (T *Server) GetBackendKey() [8]byte {
-	return T.backendKey
-}
 
-func (T *Server) GetInitialParameters() map[strutil.CIString]string {
-	return T.initialParameters
-}
-
-func (T *Server) GetPSServer() *ps.Server {
-	return T.psServer
-}
-
-func (T *Server) GetEQPServer() *eqp.Server {
-	return T.eqpServer
 }
 
-// SetState replaces the peer. Returns the old peer
-func (T *Server) SetState(state State, peer uuid.UUID) uuid.UUID {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+func (T *Server) GetEQP() *eqp.Server {
 
-	old := T.metrics.Peer
-	T.metrics.SetState(state, peer)
-	return old
 }
 
-func (T *Server) GetPeer() uuid.UUID {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
+func (T *Server) GetPS() *ps.Server {
 
-	return T.metrics.Peer
 }
 
-func (T *Server) GetConnection() (uuid.UUID, time.Time) {
-	T.mu.RLock()
-	defer T.mu.RUnlock()
+func (T *Server) TransactionComplete() {
 
-	return T.metrics.Peer, T.metrics.Since
 }
 
-func (T *Server) TransactionComplete() {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+func (T *Server) GetInitialParameters() map[strutil.CIString]string {
 
-	T.metrics.Transactions++
 }
 
-func (T *Server) ReadMetrics(metrics *ItemMetrics) {
-	T.mu.Lock()
-	defer T.mu.Unlock()
+func (T *Server) SetState(state State, peer uuid.UUID) {
 
-	T.metrics.Read(metrics)
 }
diff --git a/lib/gat/pool/state.go b/lib/gat/pool/state.go
new file mode 100644
index 00000000..4bd6a1b4
--- /dev/null
+++ b/lib/gat/pool/state.go
@@ -0,0 +1,10 @@
+package pool
+
+type State int
+
+const (
+	StateActive State = iota
+	StateIdle
+	StateAwaitingServer
+	StateRunningResetQuery
+)
-- 
GitLab