diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index c68bbe0a8bb61af0d5c6136ebd062b98e4fa6c00..1d668135eee24d51a5e1f048335c5af4b79225ab 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -16,7 +16,6 @@ import ( packets "pggat/lib/fed/packets/v3.0" "pggat/lib/gat/metrics" "pggat/lib/gat/pool/recipe" - "pggat/lib/util/ring" "pggat/lib/util/slices" "pggat/lib/util/strutil" ) @@ -30,14 +29,13 @@ type Pool struct { pendingCount atomic.Int64 pending chan struct{} - recipes map[string]*recipe.Recipe - recipeOrder ring.Ring[string] - recipeOrderMu sync.Mutex - clients map[uuid.UUID]*Client - clientsByKey map[[8]byte]*Client - servers map[uuid.UUID]*Server - serversByRecipe map[string][]*Server - mu sync.RWMutex + recipes map[string]*recipe.Recipe + recipeScaleOrder slices.Sorted[string] + clients map[uuid.UUID]*Client + clientsByKey map[[8]byte]*Client + servers map[uuid.UUID]*Server + serversByRecipe map[string][]*Server + mu sync.RWMutex } func NewPool(options Options) *Pool { @@ -97,7 +95,11 @@ func (T *Pool) AddRecipe(name string, r *recipe.Recipe) { T.recipes = make(map[string]*recipe.Recipe) } T.recipes[name] = r - T.recipeOrder.PushBack(name) + + // add to front of scale order + T.recipeScaleOrder = T.recipeScaleOrder.Insert(name, func(n string) int { + return len(T.serversByRecipe[n]) + }) }() count := r.AllocateInitial() @@ -128,6 +130,8 @@ func (T *Pool) removeRecipe(name string) { servers := T.serversByRecipe[name] delete(T.serversByRecipe, name) + // remove from recipeScaleOrder + T.recipeScaleOrder = slices.Delete(T.recipeScaleOrder, name) for _, server := range servers { r.Free() @@ -138,20 +142,9 @@ func (T *Pool) removeRecipe(name string) { func (T *Pool) scaleUpL0() (string, *recipe.Recipe) { T.mu.RLock() defer T.mu.RUnlock() - T.recipeOrderMu.Lock() - defer T.recipeOrderMu.Unlock() - count := T.recipeOrder.Length() - for i := 0; i < count; i++ { - name, ok := T.recipeOrder.PopFront() - if !ok { - break - } - r, ok := T.recipes[name] - if !ok { - continue - } - T.recipeOrder.PushBack(name) + for _, name := range T.recipeScaleOrder { + r := T.recipes[name] if r.Allocate() { return name, r } @@ -194,6 +187,10 @@ func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) error { T.serversByRecipe = make(map[string][]*Server) } T.serversByRecipe[name] = append(T.serversByRecipe[name], server) + // update order + T.recipeScaleOrder.Update(slices.Index(T.recipeScaleOrder, name), func(n string) int { + return len(T.serversByRecipe[n]) + }) return server, nil }() @@ -232,7 +229,12 @@ func (T *Pool) removeServerL1(server *Server) { T.pooler.DeleteServer(server.GetID()) _ = server.GetConn().Close() if T.serversByRecipe != nil { - T.serversByRecipe[server.GetRecipe()] = slices.Delete(T.serversByRecipe[server.GetRecipe()], server) + name := server.GetRecipe() + T.serversByRecipe[name] = slices.Delete(T.serversByRecipe[name], server) + // update order + T.recipeScaleOrder.Update(slices.Index(T.recipeScaleOrder, name), func(n string) int { + return len(T.serversByRecipe[n]) + }) } } @@ -495,5 +497,4 @@ func (T *Pool) Close() { for name := range T.recipes { T.removeRecipe(name) } - T.recipeOrder.Clear() } diff --git a/lib/util/slices/index.go b/lib/util/slices/index.go new file mode 100644 index 0000000000000000000000000000000000000000..dc2af77934c636bc1679d9e7c0665f7afd6c914f --- /dev/null +++ b/lib/util/slices/index.go @@ -0,0 +1,10 @@ +package slices + +func Index[T comparable](haystack []T, needle T) int { + for i, v := range haystack { + if needle == v { + return i + } + } + return -1 +} diff --git a/lib/util/slices/remove.go b/lib/util/slices/remove.go index e0c239a653295225d72f58619285524a12bd6131..1848711f5065da4f78b972da1e0675d17ecd24bc 100644 --- a/lib/util/slices/remove.go +++ b/lib/util/slices/remove.go @@ -4,26 +4,22 @@ package slices // with length-1. The original slice will contain all items (though in a different order), and the new slice will contain all // but item. func Remove[T comparable](slice []T, item T) []T { - for i, s := range slice { - if s == item { - copy(slice[i:], slice[i+1:]) - slice[len(slice)-1] = item - return slice[:len(slice)-1] - } + i := Index(slice, item) + if i == -1 { + return slice } - - return slice + copy(slice[i:], slice[i+1:]) + slice[len(slice)-1] = item + return slice[:len(slice)-1] } // Delete is similar to Remove but leaves a *new(T) in the old slice, allowing the value to be GC'd func Delete[T comparable](slice []T, item T) []T { - for i, s := range slice { - if s == item { - copy(slice[i:], slice[i+1:]) - slice[len(slice)-1] = *new(T) - return slice[:len(slice)-1] - } + i := Index(slice, item) + if i == -1 { + return slice } - - return slice + copy(slice[i:], slice[i+1:]) + slice[len(slice)-1] = *new(T) + return slice[:len(slice)-1] } diff --git a/lib/util/slices/sorted.go b/lib/util/slices/sorted.go new file mode 100644 index 0000000000000000000000000000000000000000..2c8a4eb87414d89701e7aa517d0cd210d0336672 --- /dev/null +++ b/lib/util/slices/sorted.go @@ -0,0 +1,55 @@ +package slices + +// Sorted is a sorted slice. As long as all items are inserted by Insert, updated by Update, and removed by Delete, +// this slice will stay sorted +type Sorted[V any] []V + +func (T Sorted[V]) Insert(value V, sorter func(V) int) Sorted[V] { + key := sorter(value) + for i, v := range T { + if sorter(v) < key { + continue + } + + res := append(T, *new(V)) + copy(res[i+1:], res[i:]) + res[i] = value + return res + } + + return append(T, value) +} + +func (T Sorted[V]) Update(index int, sorter func(V) int) { + value := T[index] + key := sorter(value) + + for i, v := range T { + switch { + case i < index: + if sorter(v) < key { + continue + } + + // move all up by one, move from index to i + copy(T[i+1:], T[i:index]) + T[i] = value + return + case i > index: + if sorter(v) < key { + continue + } + + // move all down by one, move from index to i + copy(T[index:], T[index+1:i]) + T[i-1] = value + return + default: + continue + } + } + + // move all down by one, move from index to i + copy(T[index:], T[index+1:]) + T[len(T)-1] = value +} diff --git a/lib/util/slices/sorted_test.go b/lib/util/slices/sorted_test.go new file mode 100644 index 0000000000000000000000000000000000000000..49e19e073bf939cd94dfaaed2f793cfde6936b31 --- /dev/null +++ b/lib/util/slices/sorted_test.go @@ -0,0 +1,79 @@ +package slices + +import ( + "sort" + "testing" + + "tuxpa.in/a/zlog/log" +) + +func TestSorted_Insert(t *testing.T) { + sorter := func(v string) int { + return len(v) + } + + expected := []string{ + "test", + "abc", + "this is a long string", + "gjkdfjgksg", + "retre", + "abd", + "def", + "ttierotiretiiret34t43t34534", + } + + var x Sorted[string] + for _, v := range expected { + x = x.Insert(v, sorter) + } + + if !sort.SliceIsSorted(x, func(i, j int) bool { + return sorter(x[i]) < sorter(x[j]) + }) { + t.Errorf("slice isn't sorted: %#v", x) + } +} + +func TestSorted_Update(t *testing.T) { + values := map[string]int{ + "abc": 43, + "def": 32, + "cool": 594390069, + "amazing": -432, + "i hope this works": 32, + } + + sorter := func(v string) int { + return values[v] + } + + var x Sorted[string] + for v := range values { + x = x.Insert(v, sorter) + } + + if !sort.SliceIsSorted(x, func(i, j int) bool { + return sorter(x[i]) < sorter(x[j]) + }) { + t.Errorf("slice isn't sorted: %#v", x) + } + + log.Printf("%#v", x) + + values["cool"] = -10 + x.Update(Index(x, "cool"), sorter) + values["amazing"] = 543543 + x.Update(Index(x, "amazing"), sorter) + x.Update(Index(x, "abc"), sorter) + values["i hope this works"] = 44 + x.Update(Index(x, "i hope this works"), sorter) + values["abc"] = 31 + x.Update(Index(x, "abc"), sorter) + + if !sort.SliceIsSorted(x, func(i, j int) bool { + return sorter(x[i]) < sorter(x[j]) + }) { + t.Errorf("slice isn't sorted: %#v", x) + } +}