Newer
Older
"pggat/lib/gat/metrics"
"pggat/lib/gat/pool/recipe"
"pggat/lib/util/slices"
"pggat/lib/util/strutil"
recipes map[string]*recipe.Recipe
clients map[uuid.UUID]*Client
servers map[uuid.UUID]*Server
serversByRecipe map[string][]*Server
mu sync.RWMutex
if options.NewPooler == nil {
panic("expected new pooler func")
}
pooler := options.NewPooler()
if pooler == nil {
panic("expected pooler")
}
return p
}
func (T *Pool) idlest() (server *Server, at time.Time) {
T.mu.RLock()
defer T.mu.RUnlock()
for _, s := range T.servers {
state, _, since := s.GetState()
continue
}
if at == (time.Time{}) || since.Before(at) {
server = s
at = since
}
}
return
}
func (T *Pool) GetCredentials() auth.Credentials {
return T.options.Credentials
}
func (T *Pool) AddRecipe(name string, r *recipe.Recipe) {
if T.recipes == nil {
T.recipes = make(map[string]*recipe.Recipe)
}
T.recipes[name] = r
}()
count := r.AllocateInitial()
for i := 0; i < count; i++ {
if err := T.scaleUpL1(name, r); err != nil {
log.Printf("failed to dial server: %v", err)
for j := i; j < count; j++ {
r.Free()
}
break
}
func (T *Pool) RemoveRecipe(name string) {
T.mu.Lock()
defer T.mu.Unlock()
func (T *Pool) removeRecipe(name string) {
r, ok := T.recipes[name]
if !ok {
servers := T.serversByRecipe[name]
delete(T.serversByRecipe, name)
for _, server := range servers {
r.Free()
T.removeServerL1(server)
}
func (T *Pool) scaleUpL0() (string, *recipe.Recipe) {
T.mu.RLock()
defer T.mu.RUnlock()
for name, r := range T.recipes {
if r.Allocate() {
return name, r
func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) error {
conn, params, err := r.Dial()
if err != nil {
// failed to dial
r.Free()
server, err := func() (*Server, error) {
T.mu.Lock()
defer T.mu.Unlock()
if T.recipes[name] != r {
// recipe was removed
r.Free()
return nil, errors.New("recipe was removed")
}
server := NewServer(
T.options,
name,
conn,
params.InitialParameters,
params.BackendKey,
)
if T.servers == nil {
T.servers = make(map[uuid.UUID]*Server)
}
T.servers[server.GetID()] = server
if T.serversByRecipe == nil {
T.serversByRecipe = make(map[string][]*Server)
}
T.serversByRecipe[name] = append(T.serversByRecipe[name], server)
return server, nil
}()
if err != nil {
return err
func (T *Pool) scaleUp() bool {
name, r := T.scaleUpL0()
if r == nil {
return false
}
err := T.scaleUpL1(name, r)
if err != nil {
log.Printf("failed to dial server: %v", err)
return false
}
return true
}
func (T *Pool) removeServer(server *Server) {
T.mu.Lock()
defer T.mu.Unlock()
T.removeServerL1(server)
}
func (T *Pool) removeServerL1(server *Server) {
if T.serversByRecipe != nil {
T.serversByRecipe[server.GetRecipe()] = slices.Delete(T.serversByRecipe[server.GetRecipe()], server)
func (T *Pool) acquireServer(client *Client) *Server {
serverID := T.pooler.Acquire(client.GetID(), SyncModeNonBlocking)
T.pendingCount.Add(1)
select {
case T.pending <- struct{}{}:
default:
}
serverID = T.pooler.Acquire(client.GetID(), SyncModeBlocking)
T.mu.RLock()
server, ok := T.servers[serverID]
T.mu.RUnlock()
if !ok {
server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil)
ctx := backends.Context{
Server: server.GetReadWriter(),
}
err := backends.QueryString(&ctx, T.options.ServerResetQuery)
conn fed.Conn,
initialParameters map[strutil.CIString]string,
backendKey [8]byte,
client := NewClient(
T.options,
conn,
initialParameters,
backendKey,
// ServeBot is for clients that don't need initial parameters, cancelling queries, and are ready now. Use Serve for
// real clients
func (T *Pool) ServeBot(
conn fed.Conn,
) error {
defer func() {
_ = conn.Close()
}()
client := NewClient(
T.options,
conn,
nil,
[8]byte{},
)
return T.serve(client, true)
}
func (T *Pool) serve(client *Client, initialized bool) error {
T.addClient(client)
defer T.removeClient(client)
defer func() {
if server != nil {
if serverErr != nil {
T.removeServer(server)
} else {
if serverErr != nil {
return serverErr
}
if err != nil {
return err
}
p := packets.ReadyForQuery('I')
packet = p.IntoPacket(packet)
err = client.GetConn().WritePacket(packet)
if server != nil && T.options.ReleaseAfterTransaction {
client.SetState(metrics.ConnStateIdle, uuid.Nil)
packet, err = client.GetConn().ReadPacket(true, packet)
if server == nil {
server = T.acquireServer(client)
packet, err, serverErr = bouncers.Bounce(client.GetReadWriter(), server.GetReadWriter(), packet)
}
if serverErr != nil {
return serverErr
} else {
func (T *Pool) addClient(client *Client) {
T.mu.Lock()
defer T.mu.Unlock()
if T.clients == nil {
T.clients = make(map[uuid.UUID]*Client)
if T.clientsByKey == nil {
T.clientsByKey = make(map[[8]byte]*Client)
}
T.clientsByKey[client.GetBackendKey()] = client
func (T *Pool) removeClient(client *Client) {
T.mu.Lock()
defer T.mu.Unlock()
T.removeClientL1(client)
}
func (T *Pool) removeClientL1(client *Client) {
func (T *Pool) Cancel(key [8]byte) error {
T.mu.RLock()
defer T.mu.RUnlock()
client, ok := T.clientsByKey[key]
if !ok {
return nil
}
state, peer, _ := client.GetState()
return nil
}
server, ok := T.servers[peer]
if !ok {
return nil
}
// prevent state from changing by RLocking the server
server.mu.RLock()
defer server.mu.RUnlock()
// make sure peer is still set
if server.peer != peer {
return nil
}
r, ok := T.recipes[server.recipe]
return r.Cancel(server.backendKey)
func (T *Pool) ReadMetrics(m *metrics.Pool) {
T.mu.RLock()
defer T.mu.RUnlock()
if len(T.clients) != 0 && m.Clients == nil {
m.Clients = make(map[uuid.UUID]metrics.Conn)
}
if len(T.servers) != 0 && m.Servers == nil {
m.Servers = make(map[uuid.UUID]metrics.Conn)
}
for id, client := range T.clients {
var mc metrics.Conn
client.ReadMetrics(&mc)
m.Clients[id] = mc
}
for id, server := range T.servers {
var mc metrics.Conn
server.ReadMetrics(&mc)
m.Servers[id] = mc
}
func (T *Pool) Close() {
close(T.closed)
T.mu.Lock()
defer T.mu.Unlock()
// remove clients
for _, client := range T.clients {
T.removeClient(client)
}
// remove recipes
for name := range T.recipes {
T.removeRecipe(name)
}
}