Newer
Older
"github.com/google/uuid"
"tuxpa.in/a/zlog/log"
"pggat2/lib/auth"
"pggat2/lib/bouncer/backends/v0"
"pggat2/lib/bouncer/bouncers/v2"
"pggat2/lib/bouncer/frontends/v0"
"pggat2/lib/middleware"
"pggat2/lib/middleware/interceptor"
"pggat2/lib/middleware/middlewares/eqp"
"pggat2/lib/middleware/middlewares/ps"
"pggat2/lib/middleware/middlewares/unterminate"
"pggat2/lib/util/slices"
"pggat2/lib/util/strutil"
)
type poolRecipe struct {
recipe Recipe
recipes maps.RWLocked[string, *poolRecipe]
servers maps.RWLocked[uuid.UUID, *Server]
clients maps.RWLocked[uuid.UUID, *Client]
func (T *Pool) idlest() (idlest uuid.UUID, idle time.Time) {
T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
peer, since := server.GetConnection()
if peer != uuid.Nil {
return true
}
return
}
func (T *Pool) idleTimeoutLoop() {
for {
var wait time.Duration
now := time.Now()
var idlest uuid.UUID
var idle time.Time
for idlest, idle = T.idlest(); idlest != uuid.Nil && now.Sub(idle) > T.options.ServerIdleTimeout; idlest, idle = T.idlest() {
T.removeServer(idlest)
}
if idlest == uuid.Nil {
wait = T.options.ServerIdleTimeout
} else {
wait = idle.Add(T.options.ServerIdleTimeout).Sub(now)
}
time.Sleep(wait)
}
}
func (T *Pool) GetCredentials() auth.Credentials {
return T.options.Credentials
}
func (T *Pool) _scaleUpRecipe(name string) {
server, params, err := r.recipe.Dialer.Dial()
if err != nil {
log.Printf("failed to dial server: %v", err)
}
var middlewares []middleware.Middleware
var psServer *ps.Server
if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
// add ps middleware
psServer = ps.NewServer(params.InitialParameters)
middlewares = append(middlewares, psServer)
}
var eqpServer *eqp.Server
if T.options.ExtendedQuerySync {
// add eqp middleware
eqpServer = eqp.NewServer()
middlewares = append(middlewares, eqpServer)
}
if len(middlewares) > 0 {
server = interceptor.NewInterceptor(
server,
middlewares...,
)
}
r.count.Add(1)
serverID := uuid.New()
T.servers.Store(serverID, NewServer(
server,
params.BackendKey,
params.InitialParameters,
name,
psServer,
eqpServer,
))
T.options.Pooler.AddServer(serverID)
}
func (T *Pool) AddRecipe(name string, recipe Recipe) {
})
if hasOld {
T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
if server.GetRecipe() == name {
_ = server.GetConn().Close()
T.options.Pooler.RemoveServer(serverID)
T.servers.Delete(serverID)
}
return true
})
}
for i := 0; i < recipe.MinConnections; i++ {
T._scaleUpRecipe(name)
}
}
func (T *Pool) RemoveRecipe(name string) {
T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
if server.GetRecipe() == name {
_ = server.GetConn().Close()
T.options.Pooler.RemoveServer(serverID)
T.servers.Delete(serverID)
func (T *Pool) ScaleUp() {
T.recipes.Range(func(name string, r *poolRecipe) bool {
if r.recipe.MaxConnections == 0 || int(r.count.Load()) < r.recipe.MaxConnections {
func syncInitialParameters(
trackedParameters []strutil.CIString,
serverParams map[strutil.CIString]string,
) (clientErr, serverErr error) {
for key, value := range clientParams {
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// skip already set params
if serverParams[key] == value {
setServer = false
} else if !setServer {
value = serverParams[key]
}
p := packets.ParameterStatus{
Key: key.String(),
Value: serverParams[key],
}
clientErr = client.WritePacket(p.IntoPacket())
if clientErr != nil {
return
}
if !setServer {
continue
}
serverErr = backends.SetParameter(new(backends.Context), server, key, value)
if serverErr != nil {
return
}
}
for key, value := range serverParams {
if _, ok := clientParams[key]; ok {
continue
}
// Don't need to run reset on server because it will reset it to the initial value
// send to client
p := packets.ParameterStatus{
Key: key.String(),
Value: value,
}
clientErr = client.WritePacket(p.IntoPacket())
if clientErr != nil {
return
}
}
return
}
func (T *Pool) Serve(
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
accept frontends.AcceptParams,
auth frontends.AuthenticateParams,
) error {
defer func() {
_ = client.Close()
}()
middlewares := []middleware.Middleware{
unterminate.Unterminate,
}
var psClient *ps.Client
if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
// add ps middleware
psClient = ps.NewClient(accept.InitialParameters)
middlewares = append(middlewares, psClient)
}
var eqpClient *eqp.Client
if T.options.ExtendedQuerySync {
// add eqp middleware
eqpClient = eqp.NewClient()
middlewares = append(middlewares, eqpClient)
}
client = interceptor.NewInterceptor(
client,
middlewares...,
)
defer func() {
if serverID != uuid.Nil {
T.releaseServer(serverID)
}
}()
for {
packet, err := client.ReadPacket(true)
if err != nil {
return err
}
var clientErr, serverErr error
if serverID == uuid.Nil {
serverID, server = T.acquireServer(clientID)
switch T.options.ParameterStatusSync {
case ParameterStatusSyncDynamic:
clientErr, serverErr = ps.Sync(T.options.TrackedParameters, client, psClient, server.GetConn(), server.GetPSServer())
clientErr, serverErr = syncInitialParameters(T.options.TrackedParameters, client, accept.InitialParameters, server.GetConn(), server.GetInitialParameters())
clientErr, serverErr = bouncers.Bounce(client, server.GetConn(), packet)
}
if serverErr != nil {
T.removeServer(serverID)
serverID = uuid.Nil
return serverErr
} else {
if T.options.Pooler.ReleaseAfterTransaction() {
T.releaseServer(serverID)
serverID = uuid.Nil
}
}
if clientErr != nil {
return clientErr
}
}
}
T.options.Pooler.AddClient(clientID)
return clientID
}
func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Server) {
serverID = T.options.Pooler.Acquire(clientID, SyncModeNonBlocking)
serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking)
server, _ = T.servers.Load(serverID)
client, _ := T.clients.Load(clientID)
return
}
func (T *Pool) releaseServer(serverID uuid.UUID) {
err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
return
}
}
T.options.Pooler.Release(serverID)
}
}
func (T *Pool) removeServer(serverID uuid.UUID) {
server, _ := T.servers.LoadAndDelete(serverID)
if server == nil {
return
}
_ = server.GetConn().Close()
T.options.Pooler.RemoveServer(serverID)
r, _ := T.recipes.Load(server.GetRecipe())
if r != nil {
r.count.Add(-1)
}
func (T *Pool) Cancel(key [8]byte) error {
var clientID uuid.UUID
T.clients.Range(func(id uuid.UUID, client *Client) bool {
if client.GetBackendKey() == key {
clientID = id
return false
// get peer
var recipe string
var serverKey [8]byte
if T.servers.Range(func(_ uuid.UUID, server *Server) bool {
if server.GetPeer() == clientID {
recipe = server.GetRecipe()
serverKey = server.GetBackendKey()
return false
return r.recipe.Dialer.Cancel(serverKey)
}
func (T *Pool) ReadMetrics(metrics *Metrics) {
maps.Clear(metrics.Servers)
maps.Clear(metrics.Clients)
T.servers.Range(func(serverID uuid.UUID, server *Server) bool {
var m ServerMetrics
server.ReadMetrics(&m)
metrics.Servers[serverID] = m
return true
})
T.clients.Range(func(clientID uuid.UUID, client *Client) bool {
var m ClientMetrics
client.ReadMetrics(&m)
metrics.Clients[clientID] = m
return true
})