From 51eb0e8776a918ab7de5def9a3bba12219a1ecaa Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Thu, 7 Sep 2023 18:17:09 -0500 Subject: [PATCH] just need metrics --- lib/gat/pool/client.go | 44 +++--------- lib/gat/pool/conn.go | 88 +++++++++++++++++++++++ lib/gat/pool/pool.go | 129 ++++++++++++++++++++++++++++++---- lib/gat/pool/recipe/recipe.go | 18 +++++ lib/gat/pool/server.go | 47 +++---------- lib/gat/pool/state.go | 4 +- 6 files changed, 245 insertions(+), 85 deletions(-) create mode 100644 lib/gat/pool/conn.go diff --git a/lib/gat/pool/client.go b/lib/gat/pool/client.go index 04bab928..e8a7d1f6 100644 --- a/lib/gat/pool/client.go +++ b/lib/gat/pool/client.go @@ -1,8 +1,6 @@ package pool import ( - "sync/atomic" - "github.com/google/uuid" "pggat2/lib/fed" @@ -15,17 +13,10 @@ import ( ) type Client struct { - id uuid.UUID - - conn fed.Conn + Conn ps *ps.Client eqp *eqp.Client - - initialParameters map[strutil.CIString]string - backendKey [8]byte - - transactionCount atomic.Int64 } func NewClient( @@ -58,22 +49,17 @@ func NewClient( ) return &Client{ - id: uuid.New(), - conn: conn, - ps: psClient, - eqp: eqpClient, - backendKey: backendKey, + Conn: MakeConn( + uuid.New(), + conn, + initialParameters, + backendKey, + ), + ps: psClient, + eqp: eqpClient, } } -func (T *Client) GetID() uuid.UUID { - return T.id -} - -func (T *Client) GetConn() fed.Conn { - return T.conn -} - func (T *Client) GetEQP() *eqp.Client { return T.eqp } @@ -81,15 +67,3 @@ func (T *Client) GetEQP() *eqp.Client { func (T *Client) GetPS() *ps.Client { return T.ps } - -func (T *Client) TransactionComplete() { - T.transactionCount.Add(1) -} - -func (T *Client) GetInitialParameters() map[strutil.CIString]string { - return T.initialParameters -} - -func (T *Client) SetState(state State, peer uuid.UUID) { - -} diff --git a/lib/gat/pool/conn.go b/lib/gat/pool/conn.go new file mode 100644 index 00000000..4da7df2a --- /dev/null +++ b/lib/gat/pool/conn.go @@ -0,0 +1,88 @@ +package pool + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + + "pggat2/lib/fed" + "pggat2/lib/gat/metrics" + "pggat2/lib/util/strutil" +) + +type Conn struct { + id uuid.UUID + + conn fed.Conn + + initialParameters map[strutil.CIString]string + backendKey [8]byte + + // metrics + + transactionCount atomic.Int64 + + state State + peer uuid.UUID + since time.Time + mu sync.RWMutex +} + +func MakeConn( + id uuid.UUID, + conn fed.Conn, + initialParameters map[strutil.CIString]string, + backendKey [8]byte, +) Conn { + return Conn{ + id: id, + conn: conn, + initialParameters: initialParameters, + backendKey: backendKey, + + since: time.Now(), + } +} + +func (T *Conn) GetID() uuid.UUID { + return T.id +} + +func (T *Conn) GetConn() fed.Conn { + return T.conn +} + +func (T *Conn) GetInitialParameters() map[strutil.CIString]string { + return T.initialParameters +} + +func (T *Conn) GetBackendKey() [8]byte { + return T.backendKey +} + +func (T *Conn) TransactionComplete() { + T.transactionCount.Add(1) +} + +func (T *Conn) SetState(state State, peer uuid.UUID) { + T.mu.Lock() + defer T.mu.Unlock() + T.state = state + T.peer = peer + T.since = time.Now() +} + +func (T *Conn) GetState() (state State, peer uuid.UUID, since time.Time) { + T.mu.RLock() + defer T.mu.Unlock() + state = T.state + peer = T.peer + since = T.since + return +} + +func (T *Conn) ReadMetrics(m *metrics.Conn) { + +} diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 1a1657cb..e0aa8533 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -2,6 +2,7 @@ package pool import ( "sync" + "time" "github.com/google/uuid" @@ -20,15 +21,62 @@ type Pool struct { recipes map[string]*recipe.Recipe clients map[uuid.UUID]*Client + clientsByKey map[[8]byte]*Client servers map[uuid.UUID]*Server serversByRecipe map[string][]*Server mu sync.RWMutex } func NewPool(options Options) *Pool { - return &Pool{ + p := &Pool{ options: options, } + + if options.ServerIdleTimeout != 0 { + go p.idleLoop() + } + + return p +} + +func (T *Pool) idlest() (server *Server, at time.Time) { + T.mu.RLock() + defer T.mu.RUnlock() + + for _, s := range T.servers { + state, _, since := s.GetState() + if state != StateIdle { + continue + } + + if at == (time.Time{}) || since.Before(at) { + server = s + at = since + } + } + + return +} + +func (T *Pool) idleLoop() { + for { + var wait time.Duration + + now := time.Now() + var idlest *Server + var idle time.Time + for idlest, idle = T.idlest(); idlest != nil && now.Sub(idle) > T.options.ServerIdleTimeout; idlest, idle = T.idlest() { + T.removeServer(idlest) + } + + if idlest == nil { + wait = T.options.ServerIdleTimeout + } else { + wait = idle.Add(T.options.ServerIdleTimeout).Sub(now) + } + + time.Sleep(wait) + } } func (T *Pool) GetCredentials() auth.Credentials { @@ -36,17 +84,22 @@ func (T *Pool) GetCredentials() auth.Credentials { } func (T *Pool) AddRecipe(name string, r *recipe.Recipe) { - T.mu.Lock() - defer T.mu.Unlock() + func() { + T.mu.Lock() + defer T.mu.Unlock() - T.removeRecipe(name) + T.removeRecipe(name) - if T.recipes == nil { - T.recipes = make(map[string]*recipe.Recipe) - } - T.recipes[name] = r + if T.recipes == nil { + T.recipes = make(map[string]*recipe.Recipe) + } + T.recipes[name] = r + }() - // TODO(garet) allocate servers until at the min + count := r.AllocateInitial() + for i := 0; i < count; i++ { + T.scaleUpL1(name, r) + } } func (T *Pool) RemoveRecipe(name string) { @@ -89,6 +142,10 @@ func (T *Pool) scaleUp() { return } + T.scaleUpL1(name, r) +} + +func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) { conn, params, err := r.Dial() if err != nil { // failed to dial @@ -244,6 +301,10 @@ func (T *Pool) addClient(client *Client) { T.clients = make(map[uuid.UUID]*Client) } T.clients[client.GetID()] = client + if T.clientsByKey == nil { + T.clientsByKey = make(map[[8]byte]*Client) + } + T.clientsByKey[client.GetBackendKey()] = client } func (T *Pool) removeClient(client *Client) { @@ -251,12 +312,56 @@ func (T *Pool) removeClient(client *Client) { defer T.mu.Unlock() delete(T.clients, client.GetID()) + delete(T.clientsByKey, client.GetBackendKey()) } func (T *Pool) Cancel(key [8]byte) error { - return nil // TODO(garet) + T.mu.RLock() + defer T.mu.RUnlock() + + client, ok := T.clientsByKey[key] + if !ok { + return nil + } + + state, peer, _ := client.GetState() + if state != StateActive { + return nil + } + + server, ok := T.servers[peer] + if !ok { + return nil + } + + r, ok := T.recipes[server.GetRecipe()] + if !ok { + return nil + } + + return r.Cancel(server.GetBackendKey()) } -func (T *Pool) ReadMetrics(metrics *metrics.Pool) { - // TODO(garet) +func (T *Pool) ReadMetrics(m *metrics.Pool) { + T.mu.RLock() + defer T.mu.RUnlock() + + if len(T.clients) != 0 && m.Clients == nil { + m.Clients = make(map[uuid.UUID]metrics.Conn) + } + if len(T.servers) != 0 && m.Servers == nil { + m.Servers = make(map[uuid.UUID]metrics.Conn) + } + + for id, client := range T.clients { + var mc metrics.Conn + client.ReadMetrics(&mc) + m.Clients[id] = mc + } + + for id, server := range T.servers { + var mc metrics.Conn + server.ReadMetrics(&mc) + m.Servers[id] = mc + } } diff --git a/lib/gat/pool/recipe/recipe.go b/lib/gat/pool/recipe/recipe.go index 7c6558be..b3615811 100644 --- a/lib/gat/pool/recipe/recipe.go +++ b/lib/gat/pool/recipe/recipe.go @@ -20,6 +20,20 @@ func NewRecipe(options Options) *Recipe { } } +func (T *Recipe) AllocateInitial() int { + T.mu.Lock() + defer T.mu.Unlock() + + if T.count >= T.options.MinConnections { + return 0 + } + + amount := T.options.MinConnections - T.count + T.count = T.options.MinConnections + + return amount +} + func (T *Recipe) Allocate() bool { T.mu.Lock() defer T.mu.Unlock() @@ -56,3 +70,7 @@ func (T *Recipe) Free() { func (T *Recipe) Dial() (fed.Conn, backends.AcceptParams, error) { return T.options.Dialer.Dial() } + +func (T *Recipe) Cancel(key [8]byte) error { + return T.options.Dialer.Cancel(key) +} diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go index 1f26d5f4..fbddda79 100644 --- a/lib/gat/pool/server.go +++ b/lib/gat/pool/server.go @@ -1,8 +1,6 @@ package pool import ( - "sync/atomic" - "github.com/google/uuid" "pggat2/lib/fed" @@ -14,18 +12,12 @@ import ( ) type Server struct { - id uuid.UUID - recipe string + Conn - conn fed.Conn + recipe string ps *ps.Server eqp *eqp.Server - - initialParameters map[strutil.CIString]string - backendKey [8]byte - - transactionCount atomic.Int64 } func NewServer( @@ -60,27 +52,22 @@ func NewServer( } return &Server{ - id: id, - recipe: recipe, - conn: conn, - ps: psServer, - eqp: eqpServer, - backendKey: backendKey, + Conn: MakeConn( + id, + conn, + initialParameters, + backendKey, + ), + recipe: recipe, + ps: psServer, + eqp: eqpServer, } } -func (T *Server) GetID() uuid.UUID { - return T.id -} - func (T *Server) GetRecipe() string { return T.recipe } -func (T *Server) GetConn() fed.Conn { - return T.conn -} - func (T *Server) GetEQP() *eqp.Server { return T.eqp } @@ -88,15 +75,3 @@ func (T *Server) GetEQP() *eqp.Server { func (T *Server) GetPS() *ps.Server { return T.ps } - -func (T *Server) TransactionComplete() { - T.transactionCount.Add(1) -} - -func (T *Server) GetInitialParameters() map[strutil.CIString]string { - return T.initialParameters -} - -func (T *Server) SetState(state State, peer uuid.UUID) { - -} diff --git a/lib/gat/pool/state.go b/lib/gat/pool/state.go index 4bd6a1b4..8221f3fd 100644 --- a/lib/gat/pool/state.go +++ b/lib/gat/pool/state.go @@ -3,8 +3,8 @@ package pool type State int const ( - StateActive State = iota - StateIdle + StateIdle State = iota + StateActive StateAwaitingServer StateRunningResetQuery ) -- GitLab