diff --git a/lib/gat/acceptor.go b/lib/gat/acceptor.go index 87fdfe2aef4c02169dec77b2efe55fa337e77217..214ab12bb98ce2b9f7ab5e482aecc1f03d40c4c8 100644 --- a/lib/gat/acceptor.go +++ b/lib/gat/acceptor.go @@ -79,7 +79,7 @@ func serve(client fed.Conn, acceptParams frontends.AcceptParams, pools Pools) er pools.RegisterKey(authParams.BackendKey, acceptParams.User, acceptParams.Database) defer pools.UnregisterKey(authParams.BackendKey) - return p.Serve(client, acceptParams, authParams) + return p.Serve(client, acceptParams.InitialParameters, authParams.BackendKey) } func Serve(acceptor Acceptor, pools Pools) error { diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go index d2c36e90e10a65e5382b7f063a7388d77e200dcf..04bab9289632d42ce9a43be594375bcacabbd8dc 100644 --- a/lib/gat/pool/client.go +++ b/lib/gat/pool/client.go @@ -1,76 +1,95 @@ package pool import ( - "sync" - "time" + "sync/atomic" "github.com/google/uuid" "pggat2/lib/fed" + "pggat2/lib/middleware" + "pggat2/lib/middleware/interceptor" + "pggat2/lib/middleware/middlewares/eqp" + "pggat2/lib/middleware/middlewares/ps" + "pggat2/lib/middleware/middlewares/unterminate" + "pggat2/lib/util/strutil" ) type Client struct { - conn fed.Conn - backendKey [8]byte + id uuid.UUID - metrics ItemMetrics - mu sync.RWMutex + conn fed.Conn + + ps *ps.Client + eqp *eqp.Client + + initialParameters map[strutil.CIString]string + backendKey [8]byte + + transactionCount atomic.Int64 } func NewClient( + options Options, conn fed.Conn, + initialParameters map[strutil.CIString]string, backendKey [8]byte, ) *Client { + middlewares := []middleware.Middleware{ + unterminate.Unterminate, + } + + var psClient *ps.Client + if options.ParameterStatusSync == ParameterStatusSyncDynamic { + // add ps middleware + psClient = ps.NewClient(initialParameters) + middlewares = append(middlewares, psClient) + } + + var eqpClient *eqp.Client + if options.ExtendedQuerySync { + // add eqp middleware + eqpClient = eqp.NewClient() + middlewares = append(middlewares, eqpClient) + } + + conn = interceptor.NewInterceptor( + conn, + middlewares..., + ) + return &Client{ + id: uuid.New(), conn: conn, + ps: psClient, + eqp: eqpClient, backendKey: backendKey, - - metrics: MakeItemMetrics(), } } -func (T *Client) GetConn() fed.Conn { - return T.conn +func (T *Client) GetID() uuid.UUID { + return T.id } -func (T *Client) GetBackendKey() [8]byte { - return T.backendKey -} - -// SetState replaces the peer. Returns the old peer -func (T *Client) SetState(state State, peer uuid.UUID) uuid.UUID { - T.mu.Lock() - defer T.mu.Unlock() - - old := T.metrics.Peer - T.metrics.SetState(state, peer) - return old +func (T *Client) GetConn() fed.Conn { + return T.conn } -func (T *Client) GetPeer() uuid.UUID { - T.mu.RLock() - defer T.mu.RUnlock() - - return T.metrics.Peer +func (T *Client) GetEQP() *eqp.Client { + return T.eqp } -func (T *Client) GetConnection() (uuid.UUID, time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - - return T.metrics.Peer, T.metrics.Since +func (T *Client) GetPS() *ps.Client { + return T.ps } func (T *Client) TransactionComplete() { - T.mu.Lock() - defer T.mu.Unlock() + T.transactionCount.Add(1) +} - T.metrics.Transactions++ +func (T *Client) GetInitialParameters() map[strutil.CIString]string { + return T.initialParameters } -func (T *Client) ReadMetrics(metrics *ItemMetrics) { - T.mu.Lock() - defer T.mu.Unlock() +func (T *Client) SetState(state State, peer uuid.UUID) { - T.metrics.Read(metrics) } diff --git a/lib/gat/pool/recipe/dialer/dialer.go b/lib/gat/pool/dialer/dialer.go similarity index 82% rename from lib/gat/pool/recipe/dialer/dialer.go rename to lib/gat/pool/dialer/dialer.go index b50d794767858fa1bf1f4ec254eb130c3d69ab54..21e03d0537df8f50a704921676c82010fe6deeb9 100644 --- a/lib/gat/pool/recipe/dialer/dialer.go +++ b/lib/gat/pool/dialer/dialer.go @@ -7,5 +7,5 @@ import ( type Dialer interface { Dial() (fed.Conn, backends.AcceptParams, error) - Cancel(cancelKey [8]byte) error + Cancel(key [8]byte) error } diff --git a/lib/gat/pool/recipe/dialer/net.go b/lib/gat/pool/dialer/net.go similarity index 87% rename from lib/gat/pool/recipe/dialer/net.go rename to lib/gat/pool/dialer/net.go index 0d24e894b3e662d12df7ef2ad17fbd6bc840fa8d..e4ee1341dd8be15694748e885ae508991d4f224e 100644 --- a/lib/gat/pool/recipe/dialer/net.go +++ b/lib/gat/pool/dialer/net.go @@ -24,11 +24,10 @@ func (T Net) Dial() (fed.Conn, backends.AcceptParams, error) { if err != nil { return nil, backends.AcceptParams{}, err } - return conn, params, nil } -func (T Net) Cancel(cancelKey [8]byte) error { +func (T Net) Cancel(key [8]byte) error { c, err := net.Dial(T.Network, T.Address) if err != nil { return err @@ -37,5 +36,7 @@ func (T Net) Cancel(cancelKey [8]byte) error { defer func() { _ = conn.Close() }() - return backends.Cancel(conn, cancelKey) + return backends.Cancel(conn, key) } + +var _ Dialer = Net{} diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go new file mode 100644 index 0000000000000000000000000000000000000000..476fe204c274b251e33308d1578e801069e4d184 --- /dev/null +++ b/lib/gat/pool/flow.go @@ -0,0 +1,86 @@ +package pool + +import ( + "pggat2/lib/bouncer/backends/v0" + packets "pggat2/lib/fed/packets/v3.0" + "pggat2/lib/middleware/middlewares/ps" + "pggat2/lib/util/slices" +) + +func Pair(options Options, client *Client, server *Server) (clientErr, serverErr error) { + client.SetState(StateActive, server.GetID()) + server.SetState(StateActive, client.GetID()) + + switch options.ParameterStatusSync { + case ParameterStatusSyncDynamic: + clientErr, serverErr = ps.Sync(options.TrackedParameters, client.GetConn(), client.GetPS(), server.GetConn(), server.GetPS()) + case ParameterStatusSyncInitial: + clientErr, serverErr = SyncInitialParameters(options, client, server) + } + + if options.ExtendedQuerySync { + server.GetEQP().SetClient(client.GetEQP()) + } + + return +} + +func SyncInitialParameters(options Options, client *Client, server *Server) (clientErr, serverErr error) { + clientParams := client.GetInitialParameters() + serverParams := server.GetInitialParameters() + + for key, value := range clientParams { + setServer := slices.Contains(options.TrackedParameters, key) + + // skip already set params + if serverParams[key] == value { + setServer = false + } else if !setServer { + value = serverParams[key] + } + + p := packets.ParameterStatus{ + Key: key.String(), + Value: serverParams[key], + } + clientErr = client.GetConn().WritePacket(p.IntoPacket()) + if clientErr != nil { + return + } + + if !setServer { + continue + } + + serverErr = backends.SetParameter(new(backends.Context), server.GetConn(), key, value) + if serverErr != nil { + return + } + } + + for key, value := range serverParams { + if _, ok := clientParams[key]; ok { + continue + } + + // Don't need to run reset on server because it will reset it to the initial value + + // send to client + p := packets.ParameterStatus{ + Key: key.String(), + Value: value, + } + clientErr = client.GetConn().WritePacket(p.IntoPacket()) + if clientErr != nil { + return + } + } + + return + +} + +func TransactionComplete(client *Client, server *Server) { + client.TransactionComplete() + server.TransactionComplete() +} diff --git a/lib/gat/pool/metrics.go b/lib/gat/pool/metrics.go deleted file mode 100644 index 0ebeb95fad79c5ad44baf6d15b803e045c1d97e5..0000000000000000000000000000000000000000 --- a/lib/gat/pool/metrics.go +++ /dev/null @@ -1,200 +0,0 @@ -package pool - -import ( - "fmt" - "math" - "strconv" - "strings" - "time" - - "github.com/google/uuid" - - "pggat2/lib/util/maps" -) - -type State int - -const ( - StateActive State = iota - StateIdle - StateAwaitingServer - StateRunningResetQuery - - StateCount -) - -func (T State) String() string { - switch T { - case StateActive: - return "active" - case StateIdle: - return "idle" - case StateAwaitingServer: - return "awaiting server" - case StateRunningResetQuery: - return "running reset query" - default: - return "unknown state" - } -} - -type Metrics struct { - Servers map[uuid.UUID]ItemMetrics - Clients map[uuid.UUID]ItemMetrics -} - -func (T *Metrics) TransactionCount() int { - var serverTransactions int - var clientTransactions int - - for _, server := range T.Servers { - serverTransactions += server.Transactions - } - - for _, client := range T.Clients { - clientTransactions += client.Transactions - } - - if clientTransactions > serverTransactions { - return clientTransactions - } - return serverTransactions -} - -func stateCount(items map[uuid.UUID]ItemMetrics) [StateCount]int { - var states [StateCount]int - for _, item := range items { - states[item.State]++ - } - return states -} - -func stateUtil(items map[uuid.UUID]ItemMetrics) [StateCount]float64 { - var util [StateCount]time.Duration - var total time.Duration - for _, item := range items { - for state, amount := range item.InState { - util[state] += amount - total += amount - } - } - - var states [StateCount]float64 - for state := range states { - states[state] = float64(util[state]) / float64(total) - } - - return states -} - -func (T *Metrics) ServerStateCount() [StateCount]int { - return stateCount(T.Servers) -} - -func (T *Metrics) ServerStateUtil() [StateCount]float64 { - return stateUtil(T.Servers) -} - -func (T *Metrics) ClientStateCount() [StateCount]int { - return stateCount(T.Clients) -} - -func (T *Metrics) ClientStateUtil() [StateCount]float64 { - return stateUtil(T.Clients) -} - -func (T *Metrics) Clear() { - maps.Clear(T.Servers) - maps.Clear(T.Clients) -} - -func stateUtilString(count [StateCount]int, util [StateCount]float64) string { - var b strings.Builder - - var addSpace bool - for state, u := range util { - if u == 0.0 || math.IsNaN(u) { - continue - } - if addSpace { - b.WriteString(", ") - } else { - addSpace = true - } - b.WriteString(strconv.Itoa(count[state])) - b.WriteString(" ") - b.WriteString(State(state).String()) - b.WriteString(" (") - b.WriteString(strconv.FormatFloat(u*100, 'f', 2, 64)) - b.WriteString("%)") - } - - return b.String() -} - -func (T *Metrics) String() string { - return fmt.Sprintf("%d transactions | %d servers (%s) | %d clients (%s)", - T.TransactionCount(), - len(T.Servers), - stateUtilString(T.ServerStateCount(), T.ServerStateUtil()), - len(T.Clients), - stateUtilString(T.ClientStateCount(), T.ClientStateUtil()), - ) -} - -type ItemMetrics struct { - // Time is the time of this metrics read - Time time.Time - - State State - // Peer is the currently connected server or client - Peer uuid.UUID - // Since is the last time that Peer changed. - Since time.Time - - // InState is how long this item spent in each state - InState [StateCount]time.Duration - - // Transactions is the number of handled transactions since last metrics reset - Transactions int -} - -func MakeItemMetrics() ItemMetrics { - now := time.Now() - - return ItemMetrics{ - Time: now, - Since: now, - } -} - -func (T *ItemMetrics) commitSince(now time.Time) { - since := now.Sub(T.Since) - if T.Since.Before(T.Time) { - since = now.Sub(T.Time) - } - - T.InState[T.State] += since -} - -func (T *ItemMetrics) SetState(state State, peer uuid.UUID) { - now := time.Now() - - T.commitSince(now) - - T.Peer = peer - T.Since = now - T.State = state -} - -func (T *ItemMetrics) Read(metrics *ItemMetrics) { - now := time.Now() - - *metrics = *T - - metrics.commitSince(now) - - T.Time = now - T.InState = [StateCount]time.Duration{} - T.Transactions = 0 -} diff --git a/lib/gat/pool/metrics/conn.go b/lib/gat/pool/metrics/conn.go new file mode 100644 index 0000000000000000000000000000000000000000..42e32f6c8746dcd70cbf63951ac07dd3f4a9f837 --- /dev/null +++ b/lib/gat/pool/metrics/conn.go @@ -0,0 +1,24 @@ +package metrics + +import ( + "time" + + "github.com/google/uuid" +) + +type Conn struct { + // Time this report was generated + Time time.Time + + // Current state info + + State ConnState + Peer uuid.UUID + Since time.Time + + // Period metrics + + Utilization [ConnStateCount]time.Duration + + TransactionCount int +} diff --git a/lib/gat/pool/metrics/pool.go b/lib/gat/pool/metrics/pool.go new file mode 100644 index 0000000000000000000000000000000000000000..1fc744b385935f58d7c4284100f254cbd5c1737b --- /dev/null +++ b/lib/gat/pool/metrics/pool.go @@ -0,0 +1,8 @@ +package metrics + +import "github.com/google/uuid" + +type Pool struct { + Servers map[uuid.UUID]Conn + Clients map[uuid.UUID]Conn +} diff --git a/lib/gat/pool/metrics/state.go b/lib/gat/pool/metrics/state.go new file mode 100644 index 0000000000000000000000000000000000000000..4d6653da21f6c5b98d585234b094d2af24b98b1a --- /dev/null +++ b/lib/gat/pool/metrics/state.go @@ -0,0 +1,23 @@ +package metrics + +type ConnState int + +const ( + ConnStateActive ConnState = iota + ConnStateIdle + ConnStateAwaitingServer + ConnStateRunningResetQuery + + ConnStateCount +) + +var connStateString = [ConnStateCount]string{ + ConnStateActive: "active", + ConnStateIdle: "idle", + ConnStateAwaitingServer: "awaiting server", + ConnStateRunningResetQuery: "running reset query", +} + +func (T ConnState) String() string { + return connStateString[T] +} diff --git a/lib/gat/pool/options.go b/lib/gat/pool/options.go index 732cafd16fd2d402ff7d0dc530dd28a489f09f61..a6ad812de853c159cbdf5e1bde0e5b8668dd2ff3 100644 --- a/lib/gat/pool/options.go +++ b/lib/gat/pool/options.go @@ -21,8 +21,14 @@ const ( ) type Options struct { - Credentials auth.Credentials - Pooler Pooler + Credentials auth.Credentials + + Pooler Pooler + // ReleaseAfterTransaction toggles whether servers should be released and re acquired after each transaction. + // Use false for lower latency + // Use true for better balancing + ReleaseAfterTransaction bool + ServerResetQuery string // ServerIdleTimeout defines how long a server may be idle before it is disconnected ServerIdleTimeout time.Duration diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 0550a6a15a3ccdbf73043c5317c2513d3d64f4c2..bd1ae4826df25c55f136e4e8b8c6890bfa46d040 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -2,577 +2,197 @@ package pool import ( "sync" - "time" - - "pggat2/lib/util/maps" "github.com/google/uuid" - "tuxpa.in/a/zlog/log" "pggat2/lib/auth" "pggat2/lib/bouncer/backends/v0" "pggat2/lib/bouncer/bouncers/v2" - "pggat2/lib/bouncer/frontends/v0" "pggat2/lib/fed" - packets "pggat2/lib/fed/packets/v3.0" - "pggat2/lib/middleware" - "pggat2/lib/middleware/interceptor" - "pggat2/lib/middleware/middlewares/eqp" - "pggat2/lib/middleware/middlewares/ps" - "pggat2/lib/middleware/middlewares/unterminate" - "pggat2/lib/util/slices" + "pggat2/lib/gat/pool/metrics" + "pggat2/lib/gat/pool/recipe" "pggat2/lib/util/strutil" ) -type poolRecipe struct { - recipe Recipe - - deleted bool - servers map[uuid.UUID]*Server - mu sync.RWMutex -} - -func (T *poolRecipe) AddServer(serverID uuid.UUID, server *Server) bool { - T.mu.Lock() - defer T.mu.Unlock() - - if T.deleted { - return false - } - - if T.recipe.MaxConnections != 0 && len(T.servers)+1 > T.recipe.MaxConnections { - return false - } - - if T.servers == nil { - T.servers = make(map[uuid.UUID]*Server) - } - T.servers[serverID] = server - return true -} - -func (T *poolRecipe) GetServer(serverID uuid.UUID) *Server { - T.mu.RLock() - defer T.mu.RUnlock() - - if T.deleted { - return nil - } - - return T.servers[serverID] -} - -func (T *poolRecipe) DeleteServer(serverID uuid.UUID) *Server { - T.mu.RLock() - defer T.mu.RUnlock() - - if T.deleted { - return nil - } - - server := T.servers[serverID] - delete(T.servers, serverID) - return server -} - -func (T *poolRecipe) Size() int { - T.mu.RLock() - defer T.mu.RUnlock() - - return len(T.servers) -} - -func (T *poolRecipe) RangeRLock(fn func(serverID uuid.UUID, server *Server) bool) bool { - T.mu.RLock() - defer T.mu.RUnlock() - - for serverID, server := range T.servers { - if !fn(serverID, server) { - return false - } - } - - return true -} - -func (T *poolRecipe) Delete(fn func(serverID uuid.UUID, server *Server)) { - T.mu.Lock() - defer T.mu.Unlock() - - T.deleted = true - for serverID, server := range T.servers { - fn(serverID, server) - delete(T.servers, serverID) - } -} - type Pool struct { options Options - recipes maps.RWLocked[string, *poolRecipe] - servers maps.RWLocked[uuid.UUID, *poolRecipe] - clients maps.RWLocked[uuid.UUID, *Client] + recipes map[string]*recipe.Recipe + clients map[uuid.UUID]*Client + servers map[uuid.UUID]*Server + mu sync.RWMutex } func NewPool(options Options) *Pool { - p := &Pool{ + return &Pool{ options: options, } - - if options.ServerIdleTimeout != 0 { - go p.idleTimeoutLoop() - } - - return p -} - -func (T *Pool) GetServer(serverID uuid.UUID) *Server { - recipe, _ := T.servers.Load(serverID) - if recipe == nil { - return nil - } - return recipe.GetServer(serverID) -} - -func (T *Pool) idlest() (idlest uuid.UUID, idle time.Time) { - T.recipes.Range(func(_ string, recipe *poolRecipe) bool { - recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool { - peer, since := server.GetConnection() - if peer != uuid.Nil { - return true - } - - if idle != (time.Time{}) && since.After(idle) { - return true - } - - idlest = serverID - idle = since - return true - }) - return true - }) - - return -} - -func (T *Pool) idleTimeoutLoop() { - for { - var wait time.Duration - - now := time.Now() - var idlest uuid.UUID - var idle time.Time - for idlest, idle = T.idlest(); idlest != uuid.Nil && now.Sub(idle) > T.options.ServerIdleTimeout; idlest, idle = T.idlest() { - T.removeServer(idlest) - } - - if idlest == uuid.Nil { - wait = T.options.ServerIdleTimeout - } else { - wait = idle.Add(T.options.ServerIdleTimeout).Sub(now) - } - - time.Sleep(wait) - } } func (T *Pool) GetCredentials() auth.Credentials { return T.options.Credentials } -func (T *Pool) scaleUpRecipe(r *poolRecipe) { - server, params, err := r.recipe.Dialer.Dial() - if err != nil { - log.Printf("failed to dial server: %v", err) - return - } - - var middlewares []middleware.Middleware +func (T *Pool) AddRecipe(name string, r *recipe.Recipe) { + T.mu.Lock() + defer T.mu.Unlock() - var psServer *ps.Server - if T.options.ParameterStatusSync == ParameterStatusSyncDynamic { - // add ps middleware - psServer = ps.NewServer(params.InitialParameters) - middlewares = append(middlewares, psServer) - } + T.removeRecipe(name) - var eqpServer *eqp.Server - if T.options.ExtendedQuerySync { - // add eqp middleware - eqpServer = eqp.NewServer() - middlewares = append(middlewares, eqpServer) + if T.recipes == nil { + T.recipes = make(map[string]*recipe.Recipe) } + T.recipes[name] = r - if len(middlewares) > 0 { - server = interceptor.NewInterceptor( - server, - middlewares..., - ) - } - - serverID := T.options.Pooler.NewServer() - ok := r.AddServer(serverID, NewServer( - server, - params.BackendKey, - params.InitialParameters, - - psServer, - eqpServer, - )) - if !ok { - _ = server.Close() - T.options.Pooler.DeleteServer(serverID) - return - } - T.servers.Store(serverID, r) + // TODO(garet) allocate servers until at the min } -func (T *Pool) AddRecipe(name string, recipe Recipe) { - r := &poolRecipe{ - recipe: recipe, - } - old, _ := T.recipes.Swap(name, r) - if old != nil { - old.Delete(func(serverID uuid.UUID, server *Server) { - _ = server.GetConn().Close() - T.options.Pooler.DeleteServer(serverID) - T.servers.Delete(serverID) - }) - } +func (T *Pool) RemoveRecipe(name string) { + T.mu.Lock() + defer T.mu.Unlock() - for i := 0; i < recipe.MinConnections; i++ { - T.scaleUpRecipe(r) - } + T.removeRecipe(name) } -func (T *Pool) RemoveRecipe(name string) { - old, _ := T.recipes.LoadAndDelete(name) - - if old == nil { +func (T *Pool) removeRecipe(name string) { + r, ok := T.recipes[name] + if !ok { return } + delete(T.recipes, name) - // close all servers with this recipe - - old.Delete(func(serverID uuid.UUID, server *Server) { - _ = server.GetConn().Close() - T.options.Pooler.DeleteServer(serverID) - T.servers.Delete(serverID) - }) + // TODO(garet) deallocate all servers created by recipe } -func (T *Pool) ScaleUp() { - failed := T.recipes.Range(func(_ string, r *poolRecipe) bool { - // this can race, but it will just dial an extra server and disconnect it in worst case - if r.recipe.MaxConnections == 0 || r.Size() < r.recipe.MaxConnections { - T.scaleUpRecipe(r) - return false - } - - return true - }) - if failed { - log.Println("No available recipe found to scale up") - } +func (T *Pool) scaleUp() { + // TODO(garet) } -func syncInitialParameters( - trackedParameters []strutil.CIString, - client fed.Conn, - clientParams map[strutil.CIString]string, - server fed.Conn, - serverParams map[strutil.CIString]string, -) (clientErr, serverErr error) { - for key, value := range clientParams { - setServer := slices.Contains(trackedParameters, key) - - // skip already set params - if serverParams[key] == value { - setServer = false - } else if !setServer { - value = serverParams[key] - } +func (T *Pool) removeServer(server *Server) { + T.mu.Lock() + defer T.mu.Unlock() - p := packets.ParameterStatus{ - Key: key.String(), - Value: serverParams[key], - } - clientErr = client.WritePacket(p.IntoPacket()) - if clientErr != nil { - return - } + delete(T.servers, server.GetID()) + T.options.Pooler.DeleteServer(server.GetID()) + _ = server.GetConn().Close() +} - if !setServer { - continue - } +func (T *Pool) acquireServer(client *Client) *Server { + client.SetState(StateAwaitingServer, uuid.Nil) - serverErr = backends.SetParameter(new(backends.Context), server, key, value) - if serverErr != nil { - return - } + serverID := T.options.Pooler.Acquire(client.GetID(), SyncModeNonBlocking) + if serverID == uuid.Nil { + // TODO(garet) can this be run on same thread and only create a goroutine if scaling is possible? + go T.scaleUp() + serverID = T.options.Pooler.Acquire(client.GetID(), SyncModeBlocking) } - for key, value := range serverParams { - if _, ok := clientParams[key]; ok { - continue - } + T.mu.RLock() + defer T.mu.RUnlock() + return T.servers[serverID] +} - // Don't need to run reset on server because it will reset it to the initial value +func (T *Pool) releaseServer(server *Server) { + server.SetState(StateRunningResetQuery, uuid.Nil) - // send to client - p := packets.ParameterStatus{ - Key: key.String(), - Value: value, - } - clientErr = client.WritePacket(p.IntoPacket()) - if clientErr != nil { + if T.options.ServerResetQuery != "" { + err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery) + if err != nil { + T.removeServer(server) return } } - return + server.SetState(StateIdle, uuid.Nil) + + T.options.Pooler.Release(server.GetID()) } func (T *Pool) Serve( - client fed.Conn, - accept frontends.AcceptParams, - auth frontends.AuthenticateParams, + conn fed.Conn, + initialParameters map[strutil.CIString]string, + backendKey [8]byte, ) error { defer func() { - _ = client.Close() + _ = conn.Close() }() - middlewares := []middleware.Middleware{ - unterminate.Unterminate, - } - - var psClient *ps.Client - if T.options.ParameterStatusSync == ParameterStatusSyncDynamic { - // add ps middleware - psClient = ps.NewClient(accept.InitialParameters) - middlewares = append(middlewares, psClient) - } - - var eqpClient *eqp.Client - if T.options.ExtendedQuerySync { - // add eqp middleware - eqpClient = eqp.NewClient() - middlewares = append(middlewares, eqpClient) - } - - client = interceptor.NewInterceptor( - client, - middlewares..., + client := NewClient( + T.options, + conn, + initialParameters, + backendKey, ) - clientID := T.addClient(client, auth.BackendKey) - defer T.removeClient(clientID) + return T.serve(client) +} - var serverID uuid.UUID - var server *Server +func (T *Pool) serve(client *Client) error { + T.addClient(client) + defer T.removeClient(client) - defer func() { - if serverID != uuid.Nil { - T.releaseServer(serverID) - } - }() + var server *Server for { - packet, err := client.ReadPacket(true) + packet, err := client.GetConn().ReadPacket(true) if err != nil { + if server != nil { + T.releaseServer(server) + } return err } - var clientErr, serverErr error - if serverID == uuid.Nil { - serverID, server = T.acquireServer(clientID) - - switch T.options.ParameterStatusSync { - case ParameterStatusSyncDynamic: - clientErr, serverErr = ps.Sync(T.options.TrackedParameters, client, psClient, server.GetConn(), server.GetPSServer()) - case ParameterStatusSyncInitial: - clientErr, serverErr = syncInitialParameters(T.options.TrackedParameters, client, accept.InitialParameters, server.GetConn(), server.GetInitialParameters()) - } + var serverErr error + if server == nil { + server = T.acquireServer(client) - if T.options.ExtendedQuerySync { - server.GetEQPServer().SetClient(eqpClient) - } + err, serverErr = Pair(T.options, client, server) } - if clientErr == nil && serverErr == nil { - clientErr, serverErr = bouncers.Bounce(client, server.GetConn(), packet) + if err == nil && serverErr == nil { + err, serverErr = bouncers.Bounce(client.GetConn(), server.GetConn(), packet) } if serverErr != nil { - T.removeServer(serverID) - serverID = uuid.Nil - server = nil + T.removeServer(server) return serverErr } else { - T.transactionComplete(clientID, serverID) - if T.options.Pooler.ReleaseAfterTransaction() { - T.releaseServer(serverID) - serverID = uuid.Nil + TransactionComplete(client, server) + if T.options.ReleaseAfterTransaction { + client.SetState(StateIdle, uuid.Nil) + go T.releaseServer(server) // TODO(garet) does this need to be a goroutine server = nil } } - if clientErr != nil { - return clientErr - } - } -} - -func (T *Pool) addClient(client fed.Conn, key [8]byte) uuid.UUID { - clientID := T.options.Pooler.NewClient() - - T.clients.Store(clientID, NewClient( - client, - key, - )) - return clientID -} - -func (T *Pool) removeClient(clientID uuid.UUID) { - T.clients.Delete(clientID) - T.options.Pooler.DeleteClient(clientID) -} - -func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Server) { - client, _ := T.clients.Load(clientID) - if client != nil { - client.SetState(StateAwaitingServer, uuid.Nil) - } - - serverID = T.options.Pooler.Acquire(clientID, SyncModeNonBlocking) - if serverID == uuid.Nil { - go T.ScaleUp() - serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking) - } - - server = T.GetServer(serverID) - if server != nil { - server.SetState(StateActive, clientID) - } - if client != nil { - client.SetState(StateActive, serverID) - } - return -} - -func (T *Pool) releaseServer(serverID uuid.UUID) { - server := T.GetServer(serverID) - if server == nil { - return - } - - clientID := server.SetState(StateRunningResetQuery, uuid.Nil) - - if clientID != uuid.Nil { - client, _ := T.clients.Load(clientID) - if client != nil { - client.SetState(StateIdle, uuid.Nil) - } - } - - if T.options.ServerResetQuery != "" { - err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery) if err != nil { - T.removeServer(serverID) - return + if server != nil { + T.releaseServer(server) + } + return err } } - - server.SetState(StateIdle, uuid.Nil) - - T.options.Pooler.Release(serverID) } -func (T *Pool) transactionComplete(clientID, serverID uuid.UUID) { - func() { - server := T.GetServer(serverID) - if server == nil { - return - } - - server.TransactionComplete() - }() +func (T *Pool) addClient(client *Client) { + T.mu.Lock() + defer T.mu.Unlock() - client, _ := T.clients.Load(clientID) - if client == nil { - return + if T.clients == nil { + T.clients = make(map[uuid.UUID]*Client) } - - client.TransactionComplete() + T.clients[client.GetID()] = client } -func (T *Pool) removeServer(serverID uuid.UUID) { - recipe, _ := T.servers.LoadAndDelete(serverID) - if recipe == nil { - return - } - server := recipe.DeleteServer(serverID) - T.options.Pooler.DeleteServer(serverID) - if server == nil { - return - } - _ = server.GetConn().Close() +func (T *Pool) removeClient(client *Client) { + T.mu.Lock() + defer T.mu.Unlock() + + delete(T.clients, client.GetID()) } func (T *Pool) Cancel(key [8]byte) error { - var clientID uuid.UUID - T.clients.Range(func(id uuid.UUID, client *Client) bool { - if client.GetBackendKey() == key { - clientID = id - return false - } - return true - }) - - if clientID == uuid.Nil { - return nil - } - - // get peer - var r *poolRecipe - var serverKey [8]byte - if T.recipes.Range(func(_ string, recipe *poolRecipe) bool { - return recipe.RangeRLock(func(_ uuid.UUID, server *Server) bool { - if server.GetPeer() == clientID { - r = recipe - serverKey = server.GetBackendKey() - return false - } - return true - }) - }) { - return nil - } - return r.recipe.Dialer.Cancel(serverKey) } -func (T *Pool) ReadMetrics(metrics *Metrics) { - if metrics.Servers == nil { - metrics.Servers = make(map[uuid.UUID]ItemMetrics) - } - if metrics.Clients == nil { - metrics.Clients = make(map[uuid.UUID]ItemMetrics) - } +func (T *Pool) ReadMetrics(metrics *metrics.Pool) { - T.recipes.Range(func(_ string, recipe *poolRecipe) bool { - recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool { - var m ItemMetrics - server.ReadMetrics(&m) - metrics.Servers[serverID] = m - return true - }) - return true - }) - - T.clients.Range(func(clientID uuid.UUID, client *Client) bool { - var m ItemMetrics - client.ReadMetrics(&m) - metrics.Clients[clientID] = m - return true - }) } diff --git a/lib/gat/pool/pooler.go b/lib/gat/pool/pooler.go index 9ed1d4c464c515af9ed1b1cb9bf182b274277efe..a8d9d5068021faeddb0e4af6e10abc9171be1b8c 100644 --- a/lib/gat/pool/pooler.go +++ b/lib/gat/pool/pooler.go @@ -1,15 +1,11 @@ package pool -import ( - "github.com/google/uuid" -) +import "github.com/google/uuid" type SyncMode int const ( - // SyncModeNonBlocking will obtain a server without blocking SyncModeNonBlocking SyncMode = iota - // SyncModeBlocking will obtain a server by stalling SyncModeBlocking ) @@ -20,13 +16,6 @@ type Pooler interface { NewServer() uuid.UUID DeleteServer(server uuid.UUID) - // Acquire a peer with SyncMode - Acquire(client uuid.UUID, sync SyncMode) uuid.UUID - - // ReleaseAfterTransaction queries whether servers should be immediately released after a transaction is completed. - ReleaseAfterTransaction() bool - - // Release will force release the server. - // This should be called when the paired client has disconnected, or after CanRelease returns true. + Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID) Release(server uuid.UUID) } diff --git a/lib/gat/pool/recipe.go b/lib/gat/pool/recipe.go deleted file mode 100644 index 0dc7acfcceeb34714546250fb209cb3002db7933..0000000000000000000000000000000000000000 --- a/lib/gat/pool/recipe.go +++ /dev/null @@ -1,13 +0,0 @@ -package pool - -import ( - "pggat2/lib/gat/pool/recipe/dialer" -) - -type Recipe struct { - Dialer dialer.Dialer - MinConnections int - // MaxConnections is the max number of active server connections for this recipe. - // 0 = unlimited - MaxConnections int -} diff --git a/lib/gat/pool/recipe/options.go b/lib/gat/pool/recipe/options.go index 07b21307c88324ce8efa7fd8398871972af2ba1f..3c454875350a88236124e7b26c2cfc1cb35398f6 100644 --- a/lib/gat/pool/recipe/options.go +++ b/lib/gat/pool/recipe/options.go @@ -1,13 +1,12 @@ package recipe -import ( - "pggat2/lib/gat/pool/recipe/dialer" -) +import "pggat2/lib/gat/pool/dialer" type Options struct { Dialer dialer.Dialer MinConnections int - // MaxConnections is the max number of simultaneous connections from this recipe. 0 = unlimited + // MaxConnections is the max number of active server connections for this recipe. + // 0 = unlimited MaxConnections int } diff --git a/lib/gat/pool/recipe/recipe.go b/lib/gat/pool/recipe/recipe.go index 2e8077598e8d9b67f5437cdddd9f4984fd7be614..2ce6c30c09b7bffb8f1271bf150924c8c3d6358b 100644 --- a/lib/gat/pool/recipe/recipe.go +++ b/lib/gat/pool/recipe/recipe.go @@ -9,7 +9,3 @@ func NewRecipe(options Options) *Recipe { options: options, } } - -func (T *Recipe) Dial() { - -} diff --git a/lib/gat/pool/recipe/server.go b/lib/gat/pool/recipe/server.go deleted file mode 100644 index 95bf6346aba8036fd350439fa815580f9cbca92e..0000000000000000000000000000000000000000 --- a/lib/gat/pool/recipe/server.go +++ /dev/null @@ -1,4 +0,0 @@ -package recipe - -type Server struct { -} diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go index 8a6f22dcb61a47faa2bc7ffae71974d28bb0dfa1..5f40612baeeaab74052ec1dfd37a1424907528eb 100644 --- a/lib/gat/pool/server.go +++ b/lib/gat/pool/server.go @@ -1,9 +1,6 @@ package pool import ( - "sync" - "time" - "github.com/google/uuid" "pggat2/lib/fed" @@ -13,91 +10,32 @@ import ( ) type Server struct { - conn fed.Conn - backendKey [8]byte - initialParameters map[strutil.CIString]string - - psServer *ps.Server - eqpServer *eqp.Server - - metrics ItemMetrics - mu sync.RWMutex } -func NewServer( - conn fed.Conn, - backendKey [8]byte, - initialParameters map[strutil.CIString]string, - - psServer *ps.Server, - eqpServer *eqp.Server, -) *Server { - return &Server{ - conn: conn, - backendKey: backendKey, - initialParameters: initialParameters, +func (T *Server) GetID() uuid.UUID { - psServer: psServer, - eqpServer: eqpServer, - - metrics: MakeItemMetrics(), - } } func (T *Server) GetConn() fed.Conn { - return T.conn -} - -func (T *Server) GetBackendKey() [8]byte { - return T.backendKey -} -func (T *Server) GetInitialParameters() map[strutil.CIString]string { - return T.initialParameters -} - -func (T *Server) GetPSServer() *ps.Server { - return T.psServer -} - -func (T *Server) GetEQPServer() *eqp.Server { - return T.eqpServer } -// SetState replaces the peer. Returns the old peer -func (T *Server) SetState(state State, peer uuid.UUID) uuid.UUID { - T.mu.Lock() - defer T.mu.Unlock() +func (T *Server) GetEQP() *eqp.Server { - old := T.metrics.Peer - T.metrics.SetState(state, peer) - return old } -func (T *Server) GetPeer() uuid.UUID { - T.mu.RLock() - defer T.mu.RUnlock() +func (T *Server) GetPS() *ps.Server { - return T.metrics.Peer } -func (T *Server) GetConnection() (uuid.UUID, time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() +func (T *Server) TransactionComplete() { - return T.metrics.Peer, T.metrics.Since } -func (T *Server) TransactionComplete() { - T.mu.Lock() - defer T.mu.Unlock() +func (T *Server) GetInitialParameters() map[strutil.CIString]string { - T.metrics.Transactions++ } -func (T *Server) ReadMetrics(metrics *ItemMetrics) { - T.mu.Lock() - defer T.mu.Unlock() +func (T *Server) SetState(state State, peer uuid.UUID) { - T.metrics.Read(metrics) } diff --git a/lib/gat/pool/state.go b/lib/gat/pool/state.go new file mode 100644 index 0000000000000000000000000000000000000000..4bd6a1b47713acb3b2e2d2880ea648a12763bfd8 --- /dev/null +++ b/lib/gat/pool/state.go @@ -0,0 +1,10 @@ +package pool + +type State int + +const ( + StateActive State = iota + StateIdle + StateAwaitingServer + StateRunningResetQuery +)