diff --git a/lib/gat/pool/metrics/conn.go b/lib/gat/metrics/conn.go similarity index 100% rename from lib/gat/pool/metrics/conn.go rename to lib/gat/metrics/conn.go diff --git a/lib/gat/metrics/pool.go b/lib/gat/metrics/pool.go new file mode 100644 index 0000000000000000000000000000000000000000..105e4ba321d255ae4ef175dfcbff20b97cc6dae5 --- /dev/null +++ b/lib/gat/metrics/pool.go @@ -0,0 +1,21 @@ +package metrics + +import ( + "github.com/google/uuid" + + "pggat2/lib/util/maps" +) + +type Pool struct { + Servers map[uuid.UUID]Conn + Clients map[uuid.UUID]Conn +} + +func (T *Pool) Clear() { + maps.Clear(T.Servers) + maps.Clear(T.Clients) +} + +func (T *Pool) String() string { + return "TODO(garet)" // TODO(garet) +} diff --git a/lib/gat/metrics/pools.go b/lib/gat/metrics/pools.go new file mode 100644 index 0000000000000000000000000000000000000000..3c929867ebb26ee150ecd8c64d11e96d52632e3a --- /dev/null +++ b/lib/gat/metrics/pools.go @@ -0,0 +1,6 @@ +package metrics + +type Pools struct { + // TODO(garet) + Pool +} diff --git a/lib/gat/pool/metrics/state.go b/lib/gat/metrics/state.go similarity index 100% rename from lib/gat/pool/metrics/state.go rename to lib/gat/metrics/state.go diff --git a/lib/gat/modes/pgbouncer/config.go b/lib/gat/modes/pgbouncer/config.go index e97f8faf80d36c5acc7bdaa0e8f5cf4953b0bc77..94ec5cefec11ef833d1b82f7df2ba321c4a7f6de 100644 --- a/lib/gat/modes/pgbouncer/config.go +++ b/lib/gat/modes/pgbouncer/config.go @@ -12,7 +12,7 @@ import ( "pggat2/lib/bouncer" "pggat2/lib/bouncer/frontends/v0" "pggat2/lib/gat" - "pggat2/lib/gat/pool" + "pggat2/lib/gat/metrics" "pggat2/lib/util/encoding/ini" "pggat2/lib/util/flip" "pggat2/lib/util/strutil" @@ -288,12 +288,12 @@ func (T *Config) ListenAndServe() error { } go func() { - var metrics pool.Metrics + var m metrics.Pools for { - metrics.Clear() + m.Clear() time.Sleep(1 * time.Second) - pools.ReadMetrics(&metrics) - log.Print(metrics.String()) + pools.ReadMetrics(&m) + log.Print(m.String()) } }() diff --git a/lib/gat/modes/pgbouncer/pools.go b/lib/gat/modes/pgbouncer/pools.go index db7858cb3311ce3e77def2d5f57065fc0d2e6d7f..4ef0386ab878a07335d428b8e511b3be04f7102a 100644 --- a/lib/gat/modes/pgbouncer/pools.go +++ b/lib/gat/modes/pgbouncer/pools.go @@ -12,12 +12,13 @@ import ( "pggat2/lib/auth/credentials" "pggat2/lib/bouncer/backends/v0" - "pggat2/lib/bouncer/frontends/v0" "pggat2/lib/gat" + "pggat2/lib/gat/metrics" "pggat2/lib/gat/pool" + "pggat2/lib/gat/pool/dialer" "pggat2/lib/gat/pool/pools/session" "pggat2/lib/gat/pool/pools/transaction" - dialer2 "pggat2/lib/gat/pool/recipe/dialer" + "pggat2/lib/gat/pool/recipe" "pggat2/lib/gsql" "pggat2/lib/util/maps" "pggat2/lib/util/strutil" @@ -108,7 +109,7 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { log.Println("auth query failed:", err) return nil } - err = authPool.Serve(client, frontends.AcceptParams{}, frontends.AuthenticateParams{}) + err = authPool.Serve(client, nil, [8]byte{}) if err != nil && !errors.Is(err, net.ErrClosed) { log.Println("auth query failed:", err) return nil @@ -173,7 +174,7 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { Database: database, }, p) - var d dialer2.Dialer + var d dialer.Dialer dbCreds := creds if db.Password != "" { @@ -205,7 +206,7 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { dir = dir + ".s.PGSQL." + strconv.Itoa(port) - d = dialer2.Net{ + d = dialer.Net{ Network: "unix", Address: dir, AcceptOptions: acceptOptions, @@ -219,33 +220,34 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { } // connect over tcp - d = dialer2.Net{ + d = dialer.Net{ Network: "tcp", Address: address, AcceptOptions: acceptOptions, } } - recipe := pool.Recipe{ + recipeOptions := recipe.Options{ Dialer: d, MinConnections: db.MinPoolSize, MaxConnections: db.MaxDBConnections, } - if recipe.MinConnections == 0 { - recipe.MinConnections = T.Config.PgBouncer.MinPoolSize + if recipeOptions.MinConnections == 0 { + recipeOptions.MinConnections = T.Config.PgBouncer.MinPoolSize } - if recipe.MaxConnections == 0 { - recipe.MaxConnections = T.Config.PgBouncer.MaxDBConnections + if recipeOptions.MaxConnections == 0 { + recipeOptions.MaxConnections = T.Config.PgBouncer.MaxDBConnections } + r := recipe.NewRecipe(recipeOptions) - p.AddRecipe("pgbouncer", recipe) + p.AddRecipe("pgbouncer", r) return p } -func (T *Pools) ReadMetrics(metrics *pool.Metrics) { +func (T *Pools) ReadMetrics(metrics *metrics.Pools) { T.pools.Range(func(_ poolKey, p *pool.Pool) bool { - p.ReadMetrics(metrics) + p.ReadMetrics(&metrics.Pool) return true }) } diff --git a/lib/gat/pool/metrics/pool.go b/lib/gat/pool/metrics/pool.go deleted file mode 100644 index 1fc744b385935f58d7c4284100f254cbd5c1737b..0000000000000000000000000000000000000000 --- a/lib/gat/pool/metrics/pool.go +++ /dev/null @@ -1,8 +0,0 @@ -package metrics - -import "github.com/google/uuid" - -type Pool struct { - Servers map[uuid.UUID]Conn - Clients map[uuid.UUID]Conn -} diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index bd1ae4826df25c55f136e4e8b8c6890bfa46d040..1a1657cbd2123ec263ce32669c7f020733b11aae 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -9,18 +9,20 @@ import ( "pggat2/lib/bouncer/backends/v0" "pggat2/lib/bouncer/bouncers/v2" "pggat2/lib/fed" - "pggat2/lib/gat/pool/metrics" + "pggat2/lib/gat/metrics" "pggat2/lib/gat/pool/recipe" + "pggat2/lib/util/slices" "pggat2/lib/util/strutil" ) type Pool struct { options Options - recipes map[string]*recipe.Recipe - clients map[uuid.UUID]*Client - servers map[uuid.UUID]*Server - mu sync.RWMutex + recipes map[string]*recipe.Recipe + clients map[uuid.UUID]*Client + servers map[uuid.UUID]*Server + serversByRecipe map[string][]*Server + mu sync.RWMutex } func NewPool(options Options) *Pool { @@ -61,20 +63,82 @@ func (T *Pool) removeRecipe(name string) { } delete(T.recipes, name) - // TODO(garet) deallocate all servers created by recipe + servers := T.serversByRecipe[name] + delete(T.serversByRecipe, name) + + for _, server := range servers { + r.Free() + T.removeServerL1(server) + } } func (T *Pool) scaleUp() { - // TODO(garet) + name, r := func() (string, *recipe.Recipe) { + T.mu.RLock() + defer T.mu.RUnlock() + for name, r := range T.recipes { + if r.Allocate() { + return name, r + } + } + + return "", nil + }() + if r == nil { + // no recipe to scale + return + } + + conn, params, err := r.Dial() + if err != nil { + // failed to dial + r.Free() + return + } + + T.mu.Lock() + defer T.mu.Unlock() + if T.recipes[name] != r { + // recipe was removed + r.Free() + return + } + + id := T.options.Pooler.NewServer() + server := NewServer( + T.options, + id, + name, + conn, + params.InitialParameters, + params.BackendKey, + ) + + if T.servers == nil { + T.servers = make(map[uuid.UUID]*Server) + } + T.servers[id] = server + + if T.serversByRecipe == nil { + T.serversByRecipe = make(map[string][]*Server) + } + T.serversByRecipe[name] = append(T.serversByRecipe[name], server) } func (T *Pool) removeServer(server *Server) { T.mu.Lock() defer T.mu.Unlock() + T.removeServerL1(server) +} + +func (T *Pool) removeServerL1(server *Server) { delete(T.servers, server.GetID()) T.options.Pooler.DeleteServer(server.GetID()) _ = server.GetConn().Close() + if T.serversByRecipe != nil { + T.serversByRecipe[server.GetRecipe()] = slices.Remove(T.serversByRecipe[server.GetRecipe()], server) + } } func (T *Pool) acquireServer(client *Client) *Server { @@ -190,9 +254,9 @@ func (T *Pool) removeClient(client *Client) { } func (T *Pool) Cancel(key [8]byte) error { - + return nil // TODO(garet) } func (T *Pool) ReadMetrics(metrics *metrics.Pool) { - + // TODO(garet) } diff --git a/lib/gat/pool/recipe/recipe.go b/lib/gat/pool/recipe/recipe.go index 2ce6c30c09b7bffb8f1271bf150924c8c3d6358b..7c6558be2bc4cd4fe30076aedd7f98f0f5994cf8 100644 --- a/lib/gat/pool/recipe/recipe.go +++ b/lib/gat/pool/recipe/recipe.go @@ -1,7 +1,17 @@ package recipe +import ( + "sync" + + "pggat2/lib/bouncer/backends/v0" + "pggat2/lib/fed" +) + type Recipe struct { options Options + + count int + mu sync.Mutex } func NewRecipe(options Options) *Recipe { @@ -9,3 +19,40 @@ func NewRecipe(options Options) *Recipe { options: options, } } + +func (T *Recipe) Allocate() bool { + T.mu.Lock() + defer T.mu.Unlock() + + if T.options.MaxConnections != 0 { + if T.count >= T.options.MaxConnections { + return false + } + } + + T.count++ + return true +} + +func (T *Recipe) TryFree() bool { + T.mu.Lock() + defer T.mu.Unlock() + + if T.count <= T.options.MinConnections { + return false + } + + T.count-- + return true +} + +func (T *Recipe) Free() { + T.mu.Lock() + defer T.mu.Unlock() + + T.count-- +} + +func (T *Recipe) Dial() (fed.Conn, backends.AcceptParams, error) { + return T.options.Dialer.Dial() +} diff --git a/lib/gat/pool/server.go b/lib/gat/pool/server.go index 5f40612baeeaab74052ec1dfd37a1424907528eb..1f26d5f446e226c23bb687a9de065d3b5ce0eadf 100644 --- a/lib/gat/pool/server.go +++ b/lib/gat/pool/server.go @@ -1,39 +1,100 @@ package pool import ( + "sync/atomic" + "github.com/google/uuid" "pggat2/lib/fed" + "pggat2/lib/middleware" + "pggat2/lib/middleware/interceptor" "pggat2/lib/middleware/middlewares/eqp" "pggat2/lib/middleware/middlewares/ps" "pggat2/lib/util/strutil" ) type Server struct { + id uuid.UUID + recipe string + + conn fed.Conn + + ps *ps.Server + eqp *eqp.Server + + initialParameters map[strutil.CIString]string + backendKey [8]byte + + transactionCount atomic.Int64 +} + +func NewServer( + options Options, + id uuid.UUID, + recipe string, + conn fed.Conn, + initialParameters map[strutil.CIString]string, + backendKey [8]byte, +) *Server { + var middlewares []middleware.Middleware + + var psServer *ps.Server + if options.ParameterStatusSync == ParameterStatusSyncDynamic { + // add ps middleware + psServer = ps.NewServer(initialParameters) + middlewares = append(middlewares, psServer) + } + + var eqpServer *eqp.Server + if options.ExtendedQuerySync { + // add eqp middleware + eqpServer = eqp.NewServer() + middlewares = append(middlewares, eqpServer) + } + + if len(middlewares) > 0 { + conn = interceptor.NewInterceptor( + conn, + middlewares..., + ) + } + + return &Server{ + id: id, + recipe: recipe, + conn: conn, + ps: psServer, + eqp: eqpServer, + backendKey: backendKey, + } } func (T *Server) GetID() uuid.UUID { + return T.id +} +func (T *Server) GetRecipe() string { + return T.recipe } func (T *Server) GetConn() fed.Conn { - + return T.conn } func (T *Server) GetEQP() *eqp.Server { - + return T.eqp } func (T *Server) GetPS() *ps.Server { - + return T.ps } func (T *Server) TransactionComplete() { - + T.transactionCount.Add(1) } func (T *Server) GetInitialParameters() map[strutil.CIString]string { - + return T.initialParameters } func (T *Server) SetState(state State, peer uuid.UUID) { diff --git a/lib/gat/pools.go b/lib/gat/pools.go index 58658d2ad774b8b8c3c82ab2b1602bb541cb8c73..db20e1fd0c71cf574b327dc1f3da377e3709fdb7 100644 --- a/lib/gat/pools.go +++ b/lib/gat/pools.go @@ -1,6 +1,7 @@ package gat import ( + "pggat2/lib/gat/metrics" "pggat2/lib/gat/pool" "pggat2/lib/util/maps" ) @@ -8,7 +9,7 @@ import ( type Pools interface { Lookup(user, database string) *pool.Pool - ReadMetrics(metrics *pool.Metrics) + ReadMetrics(metrics *metrics.Pools) // Key based lookup functions (for cancellation) @@ -50,9 +51,9 @@ func (T *PoolsMap) Lookup(user, database string) *pool.Pool { return p } -func (T *PoolsMap) ReadMetrics(metrics *pool.Metrics) { +func (T *PoolsMap) ReadMetrics(metrics *metrics.Pools) { T.pools.Range(func(_ mapKey, p *pool.Pool) bool { - p.ReadMetrics(metrics) + p.ReadMetrics(&metrics.Pool) return true }) } diff --git a/lib/util/slices/remove.go b/lib/util/slices/remove.go index 6b62ffaf67601321d1052e0cbb17c15c0af053b0..7e38812ac877ffc90d67bc98e6de764f2c8c9309 100644 --- a/lib/util/slices/remove.go +++ b/lib/util/slices/remove.go @@ -1,9 +1,13 @@ package slices +// Remove will check for item in the target slice. If it finds it, it will move it to the end of the slice and return a slice +// with length-1. The original slice will contain all items (though in a different order), and the new slice will contain all +// but item. func Remove[T comparable](slice []T, item T) []T { for i, s := range slice { if s == item { copy(slice[i:], slice[i+1:]) + slice[len(slice)-1] = item return slice[:len(slice)-1] } }