diff --git a/lib/pool/backend.go b/lib/pool/backend.go index 7453b5528a40d014dec86dbd6c8e9aeb78834309..d49aff6c5813ffeb256bbfc94b8500b5c8994aa0 100644 --- a/lib/pool/backend.go +++ b/lib/pool/backend.go @@ -2,14 +2,11 @@ package pool import ( "log" - "math" "sync" "sync/atomic" "time" - "github.com/google/uuid" - - "gfx.cafe/gfx/pggat/lib/gat/metrics" + "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/pool/recipe" ) @@ -17,7 +14,7 @@ type backendRecipe struct { weight atomic.Int64 recipe *recipe.Recipe - servers []*Conn + servers []*fed.Conn // TODO(garet) killed bool } @@ -49,15 +46,13 @@ func NewBackend(config BackendConfig) *Backend { return b } -func (T *Backend) addRecipe(name string, r *recipe.Recipe) *backendRecipe { - target := &backendRecipe{ - recipe: r, - } +func (T *Backend) addRecipe(name string, r *recipe.Recipe) { if T.recipes == nil { T.recipes = make(map[string]*backendRecipe) } - T.recipes[name] = target - return target + T.recipes[name] = &backendRecipe{ + recipe: r, + } } func (T *Backend) AddRecipe(name string, r *recipe.Recipe) { @@ -109,12 +104,16 @@ func (T *Backend) ScaleUp() error { continue } } + + if !target.recipe.Allocate() { + return + } + target.weight.Add(1) }() - if target == nil || !target.recipe.Allocate() { + if target == nil { return ErrNoScalableRecipe } - target.weight.Add(1) server, err := target.recipe.Dial() if err != nil { @@ -132,50 +131,14 @@ func (T *Backend) ScaleUp() error { return nil } - target.servers = append(target.servers, NewConn(server)) + target.servers = append(target.servers, server) return nil } // ScaleDown attempts to scale down the pool. Returns when the next scale down should happen func (T *Backend) ScaleDown() time.Duration { - if T.config.IdleTimeout == 0 { - return math.MaxInt64 - } - - T.mu.Lock() - defer T.mu.Unlock() - - now := time.Now() - - var next = T.config.IdleTimeout - - for _, r := range T.recipes { - for i := 0; i < len(r.servers); i++ { - server := r.servers[i] - code, _, since := server.GetState() - if code != metrics.ConnStateIdle { - continue - } - - dur := now.Sub(since) - if dur > T.config.IdleTimeout { - if r.recipe.TryFree() { - r.weight.Add(-1) - _ = server.Close() - copy(r.servers[i:], r.servers[i+1:]) - r.servers = r.servers[:len(r.servers)-1] - i-- - } - } else { - dur = T.config.IdleTimeout - dur - if dur < next { - next = dur - } - } - } - } - - return next + // TODO(garet) + return T.config.IdleTimeout } func (T *Backend) scaleLoop() { @@ -241,21 +204,6 @@ func (T *Backend) scaleLoop() { } } -func (T *Backend) AddClient(id uuid.UUID) { - T.pooler.AddClient(id) -} - -func (T *Backend) RemoveClient(id uuid.UUID) { - T.pooler.RemoveClient(id) -} - func (T *Backend) Close() { close(T.closed) - - T.mu.Lock() - defer T.mu.Unlock() - - for name := range T.recipes { - T.removeRecipe(name) - } } diff --git a/lib/pool/conn.go b/lib/pool/conn.go deleted file mode 100644 index c3e6a545b71b8af352d4d924046e9a20974bf90c..0000000000000000000000000000000000000000 --- a/lib/pool/conn.go +++ /dev/null @@ -1,45 +0,0 @@ -package pool - -import ( - "sync" - "time" - - "github.com/google/uuid" - - "gfx.cafe/gfx/pggat/lib/fed" - "gfx.cafe/gfx/pggat/lib/gat/metrics" -) - -type Conn struct { - conn *fed.Conn - - state metrics.ConnState - peer uuid.UUID - since time.Time - mu sync.RWMutex -} - -func NewConn(conn *fed.Conn) *Conn { - return &Conn{ - conn: conn, - } -} - -func (T *Conn) GetState() (code metrics.ConnState, peer uuid.UUID, since time.Time) { - T.mu.RLock() - defer T.mu.RUnlock() - - code, peer, since = T.state, T.peer, T.since - return -} - -func (T *Conn) SetState(code metrics.ConnState, peer uuid.UUID) { - T.mu.Lock() - defer T.mu.Unlock() - - T.state, T.peer, T.since = code, peer, time.Now() -} - -func (T *Conn) Close() error { - return T.conn.Close() -} diff --git a/lib/pool/pooler.go b/lib/pool/pooler.go index 289016e8956d9af55faba05550d3641a533df1f3..06bba5a46ba99f4e34c4c69e8d196f02617799f8 100644 --- a/lib/pool/pooler.go +++ b/lib/pool/pooler.go @@ -11,10 +11,10 @@ const ( type Pooler interface { AddClient(id uuid.UUID) - RemoveClient(client uuid.UUID) + DeleteClient(client uuid.UUID) AddServer(id uuid.UUID) - RemoveServer(server uuid.UUID) + DeleteServer(server uuid.UUID) Acquire(client uuid.UUID, sync SyncMode) (server uuid.UUID) Release(server uuid.UUID) diff --git a/lib/pool/recipe/dialer.go b/lib/pool/recipe/dialer.go index b240b7d5051485147e55b16a05cbecb136f07a8c..3ce99919ecbbddb8eb939c6f09a450dd8b42aaac 100644 --- a/lib/pool/recipe/dialer.go +++ b/lib/pool/recipe/dialer.go @@ -5,8 +5,6 @@ import ( "net" "gfx.cafe/gfx/pggat/lib/auth" - "gfx.cafe/gfx/pggat/lib/bounce" - "gfx.cafe/gfx/pggat/lib/bounce/backends/v0" "gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/util/strutil" )