diff --git a/lib/gat/modes/digitalocean_discovery/config.go b/lib/gat/modes/digitalocean_discovery/config.go index f21b8195d399131ad185733f29e9eef04a5fc1ea..ddf416577b32c468ce5b694ed1a488dceff8f9a4 100644 --- a/lib/gat/modes/digitalocean_discovery/config.go +++ b/lib/gat/modes/digitalocean_discovery/config.go @@ -164,7 +164,7 @@ func (T *Config) ListenAndServe() error { replicaAddr = net.JoinHostPort(replica.Connection.Host, strconv.Itoa(replica.Connection.Port)) } - p2.AddRecipe(replica.Name, recipe.NewRecipe(recipe.Options{ + p2.AddRecipe(replica.ID, recipe.NewRecipe(recipe.Options{ Dialer: dialer.Net{ Network: "tcp", Address: replicaAddr, diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 9df08d807d469079446a9711a1b10bea0166b3b9..c68bbe0a8bb61af0d5c6136ebd062b98e4fa6c00 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -16,6 +16,7 @@ import ( packets "pggat/lib/fed/packets/v3.0" "pggat/lib/gat/metrics" "pggat/lib/gat/pool/recipe" + "pggat/lib/util/ring" "pggat/lib/util/slices" "pggat/lib/util/strutil" ) @@ -30,6 +31,8 @@ type Pool struct { pending chan struct{} recipes map[string]*recipe.Recipe + recipeOrder ring.Ring[string] + recipeOrderMu sync.Mutex clients map[uuid.UUID]*Client clientsByKey map[[8]byte]*Client servers map[uuid.UUID]*Server @@ -94,6 +97,7 @@ func (T *Pool) AddRecipe(name string, r *recipe.Recipe) { T.recipes = make(map[string]*recipe.Recipe) } T.recipes[name] = r + T.recipeOrder.PushBack(name) }() count := r.AllocateInitial() @@ -134,16 +138,25 @@ func (T *Pool) removeRecipe(name string) { func (T *Pool) scaleUpL0() (string, *recipe.Recipe) { T.mu.RLock() defer T.mu.RUnlock() + T.recipeOrderMu.Lock() + defer T.recipeOrderMu.Unlock() - for name, r := range T.recipes { + count := T.recipeOrder.Length() + for i := 0; i < count; i++ { + name, ok := T.recipeOrder.PopFront() + if !ok { + break + } + r, ok := T.recipes[name] + if !ok { + continue + } + T.recipeOrder.PushBack(name) if r.Allocate() { return name, r } } - if len(T.servers) > 0 { - return "", nil - } return "", nil } @@ -482,4 +495,5 @@ func (T *Pool) Close() { for name := range T.recipes { T.removeRecipe(name) } + T.recipeOrder.Clear() }