diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 22ca2e24df0dbf8fb0c3aab7742fc0b21d3e90d8..5d093d1e2eac9af8e8a65554df27a522ce9edbab 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -3,6 +3,7 @@ package pool import ( "errors" "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -24,6 +25,8 @@ type Pool struct { closed chan struct{} + backingOff atomic.Bool + recipes map[string]*recipe.Recipe clients map[uuid.UUID]*Client clientsByKey map[[8]byte]*Client @@ -145,6 +148,13 @@ func (T *Pool) removeRecipe(name string) { func (T *Pool) scaleUp() { backoff := T.options.ServerReconnectInitialTime + backingOff := false + + defer func() { + if backingOff { + T.backingOff.Store(false) + } + }() for { select { @@ -153,6 +163,11 @@ func (T *Pool) scaleUp() { default: } + if !backingOff && T.backingOff.Load() { + // already in backoff + return + } + name, r := func() (string, *recipe.Recipe) { T.mu.RLock() defer T.mu.RUnlock() @@ -184,6 +199,13 @@ func (T *Pool) scaleUp() { return } + if !backingOff { + if T.backingOff.Swap(true) { + return + } + backingOff = true + } + time.Sleep(backoff) backoff *= 2 @@ -268,7 +290,7 @@ func (T *Pool) acquireServer(client *Client) *Server { } } -func (T *Pool) releaseServer(server *Server) { +func (T *Pool) releaseServerSlow(server *Server) { if T.options.ServerResetQuery != "" { server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil) @@ -284,6 +306,18 @@ func (T *Pool) releaseServer(server *Server) { T.options.Pooler.Release(server.GetID()) } +func (T *Pool) releaseServer(server *Server) { + if T.options.ServerResetQuery != "" { + // we will have to query server, fallback to slow path + go T.releaseServerSlow(server) + return + } + + server.SetState(metrics.ConnStateIdle, uuid.Nil) + + T.options.Pooler.Release(server.GetID()) +} + func (T *Pool) Serve( conn fed.Conn, initialParameters map[strutil.CIString]string, @@ -335,7 +369,7 @@ func (T *Pool) serve(client *Client, initialize bool) error { if serverErr != nil { T.removeServer(server) } else { - T.releaseServer(server) + T.releaseServerSlow(server) } server = nil } @@ -362,7 +396,7 @@ func (T *Pool) serve(client *Client, initialize bool) error { for { if server != nil && T.options.ReleaseAfterTransaction { client.SetState(metrics.ConnStateIdle, uuid.Nil) - go T.releaseServer(server) // TODO(garet) does this need to be a goroutine + T.releaseServer(server) // TODO(garet) does this need to be a goroutine server = nil }