good morning!!!!

Skip to content
Snippets Groups Projects
Commit f9d012be authored by Garet Halliday's avatar Garet Halliday
Browse files

fix backoff and server release

parent e206d84d
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,7 @@ package pool ...@@ -3,6 +3,7 @@ package pool
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
...@@ -24,6 +25,8 @@ type Pool struct { ...@@ -24,6 +25,8 @@ type Pool struct {
closed chan struct{} closed chan struct{}
backingOff atomic.Bool
recipes map[string]*recipe.Recipe recipes map[string]*recipe.Recipe
clients map[uuid.UUID]*Client clients map[uuid.UUID]*Client
clientsByKey map[[8]byte]*Client clientsByKey map[[8]byte]*Client
...@@ -145,6 +148,13 @@ func (T *Pool) removeRecipe(name string) { ...@@ -145,6 +148,13 @@ func (T *Pool) removeRecipe(name string) {
func (T *Pool) scaleUp() { func (T *Pool) scaleUp() {
backoff := T.options.ServerReconnectInitialTime backoff := T.options.ServerReconnectInitialTime
backingOff := false
defer func() {
if backingOff {
T.backingOff.Store(false)
}
}()
for { for {
select { select {
...@@ -153,6 +163,11 @@ func (T *Pool) scaleUp() { ...@@ -153,6 +163,11 @@ func (T *Pool) scaleUp() {
default: default:
} }
if !backingOff && T.backingOff.Load() {
// already in backoff
return
}
name, r := func() (string, *recipe.Recipe) { name, r := func() (string, *recipe.Recipe) {
T.mu.RLock() T.mu.RLock()
defer T.mu.RUnlock() defer T.mu.RUnlock()
...@@ -184,6 +199,13 @@ func (T *Pool) scaleUp() { ...@@ -184,6 +199,13 @@ func (T *Pool) scaleUp() {
return return
} }
if !backingOff {
if T.backingOff.Swap(true) {
return
}
backingOff = true
}
time.Sleep(backoff) time.Sleep(backoff)
backoff *= 2 backoff *= 2
...@@ -268,7 +290,7 @@ func (T *Pool) acquireServer(client *Client) *Server { ...@@ -268,7 +290,7 @@ func (T *Pool) acquireServer(client *Client) *Server {
} }
} }
func (T *Pool) releaseServer(server *Server) { func (T *Pool) releaseServerSlow(server *Server) {
if T.options.ServerResetQuery != "" { if T.options.ServerResetQuery != "" {
server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil) server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil)
...@@ -284,6 +306,18 @@ func (T *Pool) releaseServer(server *Server) { ...@@ -284,6 +306,18 @@ func (T *Pool) releaseServer(server *Server) {
T.options.Pooler.Release(server.GetID()) T.options.Pooler.Release(server.GetID())
} }
func (T *Pool) releaseServer(server *Server) {
if T.options.ServerResetQuery != "" {
// we will have to query server, fallback to slow path
go T.releaseServerSlow(server)
return
}
server.SetState(metrics.ConnStateIdle, uuid.Nil)
T.options.Pooler.Release(server.GetID())
}
func (T *Pool) Serve( func (T *Pool) Serve(
conn fed.Conn, conn fed.Conn,
initialParameters map[strutil.CIString]string, initialParameters map[strutil.CIString]string,
...@@ -335,7 +369,7 @@ func (T *Pool) serve(client *Client, initialize bool) error { ...@@ -335,7 +369,7 @@ func (T *Pool) serve(client *Client, initialize bool) error {
if serverErr != nil { if serverErr != nil {
T.removeServer(server) T.removeServer(server)
} else { } else {
T.releaseServer(server) T.releaseServerSlow(server)
} }
server = nil server = nil
} }
...@@ -362,7 +396,7 @@ func (T *Pool) serve(client *Client, initialize bool) error { ...@@ -362,7 +396,7 @@ func (T *Pool) serve(client *Client, initialize bool) error {
for { for {
if server != nil && T.options.ReleaseAfterTransaction { if server != nil && T.options.ReleaseAfterTransaction {
client.SetState(metrics.ConnStateIdle, uuid.Nil) client.SetState(metrics.ConnStateIdle, uuid.Nil)
go T.releaseServer(server) // TODO(garet) does this need to be a goroutine T.releaseServer(server) // TODO(garet) does this need to be a goroutine
server = nil server = nil
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment