Newer
Older
"github.com/google/uuid"
"tuxpa.in/a/zlog/log"
"pggat2/lib/auth"
"pggat2/lib/bouncer/backends/v0"
"pggat2/lib/bouncer/bouncers/v2"
"pggat2/lib/bouncer/frontends/v0"
"pggat2/lib/middleware"
"pggat2/lib/middleware/interceptor"
"pggat2/lib/middleware/middlewares/eqp"
"pggat2/lib/middleware/middlewares/ps"
"pggat2/lib/middleware/middlewares/unterminate"
"pggat2/lib/util/slices"
"pggat2/lib/util/strutil"
)
type poolRecipe struct {
recipe Recipe
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
deleted bool
servers map[uuid.UUID]*Server
mu sync.RWMutex
}
func (T *poolRecipe) AddServer(serverID uuid.UUID, server *Server) bool {
T.mu.Lock()
defer T.mu.Unlock()
if T.deleted {
return false
}
if T.recipe.MaxConnections != 0 && len(T.servers)+1 > T.recipe.MaxConnections {
return false
}
if T.servers == nil {
T.servers = make(map[uuid.UUID]*Server)
}
T.servers[serverID] = server
return true
}
func (T *poolRecipe) GetServer(serverID uuid.UUID) *Server {
T.mu.RLock()
defer T.mu.RUnlock()
if T.deleted {
return nil
}
return T.servers[serverID]
}
func (T *poolRecipe) DeleteServer(serverID uuid.UUID) *Server {
T.mu.RLock()
defer T.mu.RUnlock()
if T.deleted {
return nil
}
server := T.servers[serverID]
delete(T.servers, serverID)
return server
}
func (T *poolRecipe) Size() int {
T.mu.RLock()
defer T.mu.RUnlock()
return len(T.servers)
}
func (T *poolRecipe) RangeRLock(fn func(serverID uuid.UUID, server *Server) bool) bool {
T.mu.RLock()
defer T.mu.RUnlock()
for serverID, server := range T.servers {
if !fn(serverID, server) {
return false
}
}
return true
}
func (T *poolRecipe) Delete(fn func(serverID uuid.UUID, server *Server)) {
T.mu.Lock()
defer T.mu.Unlock()
T.deleted = true
for serverID, server := range T.servers {
fn(serverID, server)
delete(T.servers, serverID)
}
func (T *Pool) GetServer(serverID uuid.UUID) *Server {
recipe, _ := T.servers.Load(serverID)
if recipe == nil {
return nil
}
return recipe.GetServer(serverID)
}
func (T *Pool) idlest() (idlest uuid.UUID, idle time.Time) {
T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool {
peer, since := server.GetConnection()
if peer != uuid.Nil {
return true
}
if idle != (time.Time{}) && since.After(idle) {
return true
}
return
}
func (T *Pool) idleTimeoutLoop() {
for {
var wait time.Duration
now := time.Now()
var idlest uuid.UUID
var idle time.Time
for idlest, idle = T.idlest(); idlest != uuid.Nil && now.Sub(idle) > T.options.ServerIdleTimeout; idlest, idle = T.idlest() {
T.removeServer(idlest)
}
if idlest == uuid.Nil {
wait = T.options.ServerIdleTimeout
} else {
wait = idle.Add(T.options.ServerIdleTimeout).Sub(now)
}
time.Sleep(wait)
}
}
func (T *Pool) GetCredentials() auth.Credentials {
return T.options.Credentials
}
server, params, err := r.recipe.Dialer.Dial()
if err != nil {
log.Printf("failed to dial server: %v", err)
}
var middlewares []middleware.Middleware
var psServer *ps.Server
if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
// add ps middleware
psServer = ps.NewServer(params.InitialParameters)
middlewares = append(middlewares, psServer)
}
var eqpServer *eqp.Server
if T.options.ExtendedQuerySync {
// add eqp middleware
eqpServer = eqp.NewServer()
middlewares = append(middlewares, eqpServer)
}
if len(middlewares) > 0 {
server = interceptor.NewInterceptor(
server,
middlewares...,
)
}
serverID := T.options.Pooler.NewServer()
ok := r.AddServer(serverID, NewServer(
if !ok {
_ = server.Close()
T.options.Pooler.DeleteServer(serverID)
return
}
T.servers.Store(serverID, r)
}
func (T *Pool) AddRecipe(name string, recipe Recipe) {
}
old, _ := T.recipes.Swap(name, r)
if old != nil {
old.Delete(func(serverID uuid.UUID, server *Server) {
_ = server.GetConn().Close()
T.options.Pooler.DeleteServer(serverID)
T.servers.Delete(serverID)
}
}
func (T *Pool) RemoveRecipe(name string) {
old, _ := T.recipes.LoadAndDelete(name)
if old == nil {
return
}
old.Delete(func(serverID uuid.UUID, server *Server) {
_ = server.GetConn().Close()
T.options.Pooler.DeleteServer(serverID)
T.servers.Delete(serverID)
// this can race, but it will just dial an extra server and disconnect it in worst case
if r.recipe.MaxConnections == 0 || r.Size() < r.recipe.MaxConnections {
T.scaleUpRecipe(r)
if failed {
log.Println("No available recipe found to scale up")
}
func syncInitialParameters(
trackedParameters []strutil.CIString,
serverParams map[strutil.CIString]string,
) (clientErr, serverErr error) {
for key, value := range clientParams {
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
// skip already set params
if serverParams[key] == value {
setServer = false
} else if !setServer {
value = serverParams[key]
}
p := packets.ParameterStatus{
Key: key.String(),
Value: serverParams[key],
}
clientErr = client.WritePacket(p.IntoPacket())
if clientErr != nil {
return
}
if !setServer {
continue
}
serverErr = backends.SetParameter(new(backends.Context), server, key, value)
if serverErr != nil {
return
}
}
for key, value := range serverParams {
if _, ok := clientParams[key]; ok {
continue
}
// Don't need to run reset on server because it will reset it to the initial value
// send to client
p := packets.ParameterStatus{
Key: key.String(),
Value: value,
}
clientErr = client.WritePacket(p.IntoPacket())
if clientErr != nil {
return
}
}
return
}
func (T *Pool) Serve(
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
accept frontends.AcceptParams,
auth frontends.AuthenticateParams,
) error {
defer func() {
_ = client.Close()
}()
middlewares := []middleware.Middleware{
unterminate.Unterminate,
}
var psClient *ps.Client
if T.options.ParameterStatusSync == ParameterStatusSyncDynamic {
// add ps middleware
psClient = ps.NewClient(accept.InitialParameters)
middlewares = append(middlewares, psClient)
}
var eqpClient *eqp.Client
if T.options.ExtendedQuerySync {
// add eqp middleware
eqpClient = eqp.NewClient()
middlewares = append(middlewares, eqpClient)
}
client = interceptor.NewInterceptor(
client,
middlewares...,
)
defer func() {
if serverID != uuid.Nil {
T.releaseServer(serverID)
}
}()
for {
packet, err := client.ReadPacket(true)
if err != nil {
return err
}
var clientErr, serverErr error
if serverID == uuid.Nil {
serverID, server = T.acquireServer(clientID)
switch T.options.ParameterStatusSync {
case ParameterStatusSyncDynamic:
clientErr, serverErr = ps.Sync(T.options.TrackedParameters, client, psClient, server.GetConn(), server.GetPSServer())
clientErr, serverErr = syncInitialParameters(T.options.TrackedParameters, client, accept.InitialParameters, server.GetConn(), server.GetInitialParameters())
clientErr, serverErr = bouncers.Bounce(client, server.GetConn(), packet)
}
if serverErr != nil {
T.removeServer(serverID)
serverID = uuid.Nil
if T.options.Pooler.ReleaseAfterTransaction() {
T.releaseServer(serverID)
serverID = uuid.Nil
}
}
if clientErr != nil {
return clientErr
}
}
}
func (T *Pool) acquireServer(clientID uuid.UUID) (serverID uuid.UUID, server *Server) {
serverID = T.options.Pooler.Acquire(clientID, SyncModeNonBlocking)
serverID = T.options.Pooler.Acquire(clientID, SyncModeBlocking)
return
}
func (T *Pool) releaseServer(serverID uuid.UUID) {
err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery)
func (T *Pool) transactionComplete(clientID, serverID uuid.UUID) {
func() {
server := T.GetServer(serverID)
if server == nil {
return
}
server.TransactionComplete()
}()
client, _ := T.clients.Load(clientID)
if client == nil {
return
}
client.TransactionComplete()
}
func (T *Pool) removeServer(serverID uuid.UUID) {
recipe, _ := T.servers.LoadAndDelete(serverID)
if recipe == nil {
return
}
server := recipe.DeleteServer(serverID)
T.options.Pooler.DeleteServer(serverID)
func (T *Pool) Cancel(key [8]byte) error {
var clientID uuid.UUID
T.clients.Range(func(id uuid.UUID, client *Client) bool {
if client.GetBackendKey() == key {
clientID = id
return false
if T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
return recipe.RangeRLock(func(_ uuid.UUID, server *Server) bool {
if server.GetPeer() == clientID {
r = recipe
serverKey = server.GetBackendKey()
return false
}
return true
})
return r.recipe.Dialer.Cancel(serverKey)
}
func (T *Pool) ReadMetrics(metrics *Metrics) {
if metrics.Servers == nil {
metrics.Servers = make(map[uuid.UUID]ItemMetrics)
}
if metrics.Clients == nil {
metrics.Clients = make(map[uuid.UUID]ItemMetrics)
}
T.recipes.Range(func(_ string, recipe *poolRecipe) bool {
recipe.RangeRLock(func(serverID uuid.UUID, server *Server) bool {
server.ReadMetrics(&m)
metrics.Servers[serverID] = m
return true
})
return true
})
T.clients.Range(func(clientID uuid.UUID, client *Client) bool {
client.ReadMetrics(&m)
metrics.Clients[clientID] = m
return true
})