diff --git a/lib/gat/pool.go b/lib/gat/pool.go index 0c89faf78e42e1df246bb7a42328ed866d4fb550..2b7d76c049ca4df39a2a174d48b7783d5c5adc7e 100644 --- a/lib/gat/pool.go +++ b/lib/gat/pool.go @@ -1,12 +1,15 @@ package gat import ( + "errors" "log" "sync" + "time" "github.com/google/uuid" "pggat2/lib/bouncer/backends/v0" + "pggat2/lib/util/maps" "pggat2/lib/util/maths" "pggat2/lib/zap" ) @@ -21,268 +24,180 @@ type RawPool interface { AddServer(server zap.ReadWriter) uuid.UUID GetServer(id uuid.UUID) zap.ReadWriter RemoveServer(id uuid.UUID) zap.ReadWriter -} -type recipeWithConns struct { - recipe Recipe + ScaleDown(amount int) (remaining int) + IdleSince() time.Time +} +type PoolRecipe struct { removed bool - conns []uuid.UUID + servers []uuid.UUID mu sync.Mutex -} -func (T *recipeWithConns) scaleUp(pool *Pool, currentScale int) bool { - if currentScale >= T.recipe.GetMaxConnections() || T.removed { - return false - } + r Recipe +} - T.mu.Unlock() - conn, err := T.recipe.Connect() +func (T *PoolRecipe) connect() (zap.ReadWriter, error) { + rw, err := T.r.Connect() if err != nil { - log.Printf("Failed to connect: %v", err) - T.mu.Lock() - return false + return nil, err } - err2 := backends.Accept(conn, T.recipe.GetUser(), T.recipe.GetPassword(), T.recipe.GetDatabase()) + + err2 := backends.Accept(rw, T.r.GetUser(), T.r.GetPassword(), T.r.GetDatabase()) if err2 != nil { - _ = conn.Close() - log.Printf("Failed to connect: %v", err2) - T.mu.Lock() - return false + return nil, errors.New(err2.Message()) } - id := pool.raw.AddServer(conn) - T.mu.Lock() - T.conns = append(T.conns, id) - return true + return rw, nil } -func (T *recipeWithConns) scaleDown(pool *Pool, currentScale int) bool { - if currentScale <= T.recipe.GetMinConnections() || T.removed { - return false - } - - if len(T.conns) == 0 { - // none to close - return false - } +type Pool struct { + recipes maps.RWLocked[string, *PoolRecipe] - id := T.conns[len(T.conns)-1] - conn := pool.raw.RemoveServer(id) - if conn != nil { - _ = conn.Close() - } - T.conns = T.conns[:len(T.conns)-1] - return true + ctx Context + raw RawPool } -func (T *recipeWithConns) scale(pool *Pool, currentScale int, amount int) int { - if T.removed { - return amount +func NewPool(raw RawPool) *Pool { + onWait := make(chan struct{}) + pool := &Pool{ + ctx: Context{ + OnWait: onWait, + }, + raw: raw, } - if amount > 0 { - for amount > 0 { - if T.scaleUp(pool, currentScale) { - amount-- - currentScale++ - } else { - break - } + go func() { + for range onWait { + pool.ScaleUp(1) } - } else { - for amount < 0 { - if T.scaleDown(pool, currentScale) { - amount++ - currentScale-- - } else { - break + }() + + go func() { + for { + var wait time.Duration + + now := time.Now() + idle := pool.IdleSince() + for now.Sub(idle) > 15*time.Second { + if idle == (time.Time{}) { + break + } + pool.ScaleDown(1) + idle = pool.IdleSince() } - } - } - return amount -} -func (T *recipeWithConns) currentScale(pool *Pool) int { - if T.removed { - return 0 - } + if idle == (time.Time{}) { + wait = 15 * time.Second + } else { + wait = now.Sub(idle.Add(15 * time.Second)) + } - i := 0 - for j := 0; j < len(T.conns); j++ { - if pool.raw.GetServer(T.conns[j]) != nil { - T.conns[i] = T.conns[j] - i++ + time.Sleep(wait) } - } - - T.conns = T.conns[:i] - return i -} - -func (T *recipeWithConns) CurrentScale(pool *Pool) int { - T.mu.Lock() - defer T.mu.Unlock() + }() - return T.currentScale(pool) + return pool } -func (T *recipeWithConns) Scale(pool *Pool, amount int) int { - T.mu.Lock() - defer T.mu.Unlock() +func (T *Pool) tryAddServers(recipe *PoolRecipe, amount int) (remaining int) { + recipe.mu.Lock() + defer recipe.mu.Unlock() - if T.removed { - return amount - } - currentScale := T.currentScale(pool) - return T.scale(pool, currentScale, amount) -} + remaining = amount -func (T *recipeWithConns) setScale(pool *Pool, scale int) { - if T.removed { + if recipe.removed { return } - target := maths.Clamp(scale, T.recipe.GetMinConnections(), T.recipe.GetMaxConnections()) - currentScale := T.currentScale(pool) - target -= currentScale - - T.scale(pool, currentScale, target) -} -func (T *recipeWithConns) SetScale(pool *Pool, scale int) { - T.mu.Lock() - defer T.mu.Unlock() - - T.setScale(pool, scale) -} - -func (T *recipeWithConns) Added(pool *Pool) { - T.mu.Lock() - defer T.mu.Unlock() - - T.removed = false - T.setScale(pool, 0) -} - -func (T *recipeWithConns) Removed(pool *Pool) { - T.mu.Lock() - defer T.mu.Unlock() + j := 0 + for i := 0; i < len(recipe.servers); i++ { + if T.raw.GetServer(recipe.servers[i]) != nil { + recipe.servers[j] = recipe.servers[i] + j++ + } + } + recipe.servers = recipe.servers[:j] - T.removed = true + max := maths.Min(recipe.r.GetMaxConnections()-j, amount) + for i := 0; i < max; i++ { + conn, err := recipe.connect() + if err != nil { + log.Printf("error connecting to server: %v", err) + continue + } - for _, conn := range T.conns { - pool.raw.RemoveServer(conn) + id := T.raw.AddServer(conn) + recipe.servers = append(recipe.servers, id) + remaining-- } - T.conns = T.conns[:0] + return } -type Pool struct { - recipes map[string]*recipeWithConns - mu sync.Mutex +func (T *Pool) addRecipe(recipe *PoolRecipe) { + recipe.mu.Lock() + defer recipe.mu.Unlock() - ctx Context + recipe.removed = false + min := recipe.r.GetMinConnections() - len(recipe.servers) + for i := 0; i < min; i++ { + conn, err := recipe.connect() + if err != nil { + log.Printf("error connecting to server: %v", err) + continue + } - raw RawPool + id := T.raw.AddServer(conn) + recipe.servers = append(recipe.servers, id) + } } -func NewPool(rawPool RawPool) *Pool { - onWait := make(chan struct{}) +func (T *Pool) removeRecipe(recipe *PoolRecipe) { + recipe.mu.Lock() + defer recipe.mu.Unlock() - p := &Pool{ - ctx: Context{ - OnWait: onWait, - }, - raw: rawPool, + recipe.removed = true + for _, id := range recipe.servers { + T.raw.RemoveServer(id) } - go func() { - for range onWait { - p.Scale(1) - } - }() - - return p + recipe.servers = recipe.servers[:0] } -func (T *Pool) Serve(client zap.ReadWriter) { - T.raw.Serve(&T.ctx, client) +func (T *Pool) ScaleUp(amount int) (remaining int) { + remaining = amount + T.recipes.Range(func(_ string, r *PoolRecipe) bool { + remaining = T.tryAddServers(r, remaining) + return remaining != 0 + }) + return remaining } -func (T *Pool) CurrentScale() int { - T.mu.Lock() - recipes := make([]*recipeWithConns, 0, len(T.recipes)) - for _, recipe := range T.recipes { - recipes = append(recipes, recipe) - } - T.mu.Unlock() - - scale := 0 - for _, recipe := range recipes { - scale += recipe.CurrentScale(T) - } - return scale +func (T *Pool) ScaleDown(amount int) (remaining int) { + return T.raw.ScaleDown(amount) } -func (T *Pool) Scale(amount int) { - T.mu.Lock() - recipes := make([]*recipeWithConns, 0, len(T.recipes)) - for _, recipe := range T.recipes { - recipes = append(recipes, recipe) - } - T.mu.Unlock() - -outer: - for len(recipes) > 0 { - j := 0 - for i := 0; i < len(recipes); i++ { - recipe := recipes[i] - if amount > 0 { - if recipe.Scale(T, 1) == 0 { - amount-- - recipes[j] = recipes[i] - j++ - } - } else if amount < 0 { - if recipe.Scale(T, -1) == 0 { - amount++ - recipes[j] = recipes[i] - j++ - } - } else { - break outer - } - } - recipes = recipes[:j] - } +func (T *Pool) IdleSince() time.Time { + return T.raw.IdleSince() } func (T *Pool) AddRecipe(name string, recipe Recipe) { - r := &recipeWithConns{ - recipe: recipe, + r := &PoolRecipe{ + r: recipe, } - r.Added(T) - - T.mu.Lock() - old, ok := T.recipes[name] - if T.recipes == nil { - T.recipes = make(map[string]*recipeWithConns) - } - T.recipes[name] = r - T.mu.Unlock() - - if ok { - old.Removed(T) + T.addRecipe(r) + if old, ok := T.recipes.Swap(name, r); ok { + T.removeRecipe(old) } } func (T *Pool) RemoveRecipe(name string) { - T.mu.Lock() - r, ok := T.recipes[name] - delete(T.recipes, name) - T.mu.Unlock() - - if ok { - r.Removed(T) + if r, ok := T.recipes.LoadAndDelete(name); ok { + T.removeRecipe(r) } } + +func (T *Pool) Serve(client zap.ReadWriter) { + T.raw.Serve(&T.ctx, client) +} diff --git a/lib/gat/pools/session/pool.go b/lib/gat/pools/session/pool.go index 02ddf027356ff0c4169640d714c3677de93f7194..ea402cc7674dbacacdee0d5f5aaec5d07c59f1ba 100644 --- a/lib/gat/pools/session/pool.go +++ b/lib/gat/pools/session/pool.go @@ -10,6 +10,7 @@ import ( "pggat2/lib/gat" "pggat2/lib/util/chans" "pggat2/lib/util/maps" + "pggat2/lib/util/ring" "pggat2/lib/zap" ) @@ -20,7 +21,7 @@ type queueItem struct { type Pool struct { // use slice lifo for better perf - queue []queueItem + queue ring.Ring[queueItem] conns map[uuid.UUID]zap.ReadWriter ready sync.Cond qmu sync.Mutex @@ -35,25 +36,17 @@ func NewPool() *Pool { func (T *Pool) acquire(ctx *gat.Context) (uuid.UUID, zap.ReadWriter) { T.qmu.Lock() defer T.qmu.Unlock() - for { - if len(T.queue) > 0 { - item := T.queue[len(T.queue)-1] - T.queue = T.queue[:len(T.queue)-1] - conn, ok := T.conns[item.id] - if !ok { - continue - } - return item.id, conn - } - if ctx.OnWait != nil { - chans.TrySend(ctx.OnWait, struct{}{}) - } + for T.queue.Length() == 0 { + chans.TrySend(ctx.OnWait, struct{}{}) T.ready.Wait() } + + entry, _ := T.queue.PopBack() + return entry.id, T.conns[entry.id] } func (T *Pool) _release(id uuid.UUID) { - T.queue = append(T.queue, queueItem{ + T.queue.PushBack(queueItem{ added: time.Now(), id: id, }) @@ -118,6 +111,39 @@ func (T *Pool) RemoveServer(id uuid.UUID) zap.ReadWriter { return conn } +func (T *Pool) ScaleDown(amount int) (remaining int) { + remaining = amount + + T.qmu.Lock() + defer T.qmu.Unlock() + + for i := 0; i < amount; i++ { + v, ok := T.queue.PopFront() + if !ok { + break + } + + conn, ok := T.conns[v.id] + if !ok { + continue + } + delete(T.conns, v.id) + + _ = conn.Close() + remaining-- + } + + return +} + +func (T *Pool) IdleSince() time.Time { + T.qmu.Lock() + defer T.qmu.Unlock() + + v, _ := T.queue.Get(0) + return v.added +} + func (T *Pool) ReadMetrics(metrics *Metrics) { maps.Clear(metrics.Workers) @@ -128,7 +154,8 @@ func (T *Pool) ReadMetrics(metrics *Metrics) { T.qmu.Lock() defer T.qmu.Unlock() - for _, item := range T.queue { + for i := 0; i < T.queue.Length(); i++ { + item, _ := T.queue.Get(i) metrics.Workers[item.id] = WorkerMetrics{ LastActive: item.added, } diff --git a/lib/gat/pools/transaction/pool.go b/lib/gat/pools/transaction/pool.go index cfd3fe357583ef1bfd519fd97f28c33b7a6c8196..adb2fe43b819a6acf3ae4375fbc776cd15d37c3c 100644 --- a/lib/gat/pools/transaction/pool.go +++ b/lib/gat/pools/transaction/pool.go @@ -1,6 +1,8 @@ package transaction import ( + "time" + "github.com/google/uuid" "pggat2/lib/gat" @@ -85,6 +87,16 @@ func (T *Pool) Serve(ctx *gat.Context, client zap.ReadWriter) { _ = client.Close() } +func (T *Pool) ScaleDown(amount int) (remaining int) { + remaining = amount + // TODO(garet) + return +} + +func (T *Pool) IdleSince() time.Time { + return time.Time{} +} + func (T *Pool) ReadSchedulerMetrics(metrics *rob.Metrics) { T.s.ReadMetrics(metrics) } diff --git a/lib/util/maps/rwlocked.go b/lib/util/maps/rwlocked.go index a51239a20c400c5d54ba7da8b18ad967767b752e..7c8276c87257039c1fd203de08abf2fccf1efd4f 100644 --- a/lib/util/maps/rwlocked.go +++ b/lib/util/maps/rwlocked.go @@ -61,3 +61,16 @@ func (T *RWLocked[K, V]) Swap(key K, value V) (previous V, loaded bool) { T.inner[key] = value return } + +func (T *RWLocked[K, V]) Range(fn func(key K, value V) bool) bool { + T.mu.RLock() + for k, v := range T.inner { + T.mu.RUnlock() + if !fn(k, v) { + return false + } + T.mu.RLock() + } + T.mu.RUnlock() + return true +} diff --git a/lib/util/queue/queue.go b/lib/util/queue/queue.go new file mode 100644 index 0000000000000000000000000000000000000000..bb029b01cada5f8f1f675e0c6568ca3729bfe97d --- /dev/null +++ b/lib/util/queue/queue.go @@ -0,0 +1,38 @@ +package queue + +import "sync" + +type LIFO[T any] struct { + items []T + signal sync.Cond + mu sync.Mutex +} + +func (v *LIFO[T]) init() { + if v.signal.L == nil { + v.signal.L = &v.mu + } +} + +func (v *LIFO[T]) Push(item T) { + v.mu.Lock() + defer v.mu.Unlock() + v.init() + + v.items = append(v.items, item) + v.signal.Signal() +} + +func (v *LIFO[T]) Pop() T { + v.mu.Lock() + defer v.mu.Unlock() + v.init() + + for len(v.items) == 0 { + v.signal.Wait() + } + item := v.items[len(v.items)-1] + v.items = v.items[:len(v.items)-1] + + return item +}