From ce675d6ea7f4e19cf3d3308bb8e339c9cb407464 Mon Sep 17 00:00:00 2001
From: Garet Halliday <me@garet.holiday>
Date: Thu, 9 Nov 2023 15:43:28 -0600
Subject: [PATCH] a

---
 .../discoverers/digitalocean/config.go        |  8 +--
 .../discoverers/digitalocean/discoverer.go    | 19 +------
 .../pool/penalties/latency/penalty.go         | 42 ++++++++++++++++
 lib/gat/handlers/pool/penalty.go              |  8 +++
 lib/gat/handlers/pool/spool/config.go         |  3 ++
 lib/gat/handlers/pool/spool/pool.go           | 49 +++++++++++++++++--
 lib/gat/handlers/pool/spool/recipe.go         | 11 ++++-
 7 files changed, 109 insertions(+), 31 deletions(-)
 create mode 100644 lib/gat/handlers/pool/penalties/latency/penalty.go
 create mode 100644 lib/gat/handlers/pool/penalty.go

diff --git a/lib/gat/handlers/discovery/discoverers/digitalocean/config.go b/lib/gat/handlers/discovery/discoverers/digitalocean/config.go
index e00a9482..32ff4927 100644
--- a/lib/gat/handlers/discovery/discoverers/digitalocean/config.go
+++ b/lib/gat/handlers/discovery/discoverers/digitalocean/config.go
@@ -2,15 +2,9 @@ package digitalocean
 
 import "gfx.cafe/gfx/pggat/lib/util/strutil"
 
-type Priority struct {
-	Filter strutil.Matcher `json:"filter"`
-	Value  int             `json:"value"`
-}
-
 type Config struct {
 	APIKey  string `json:"api_key"`
 	Private bool   `json:"private,omitempty"`
 
-	Filter   strutil.Matcher `json:"filter,omitempty"`
-	Priority []Priority      `json:"priority,omitempty"`
+	Filter strutil.Matcher `json:"filter,omitempty"`
 }
diff --git a/lib/gat/handlers/discovery/discoverers/digitalocean/discoverer.go b/lib/gat/handlers/discovery/discoverers/digitalocean/discoverer.go
index 0f0b8752..59942c33 100644
--- a/lib/gat/handlers/discovery/discoverers/digitalocean/discoverer.go
+++ b/lib/gat/handlers/discovery/discoverers/digitalocean/discoverer.go
@@ -44,19 +44,6 @@ func (T *Discoverer) filter(tags []string) bool {
 	return T.Filter.Matches("")
 }
 
-func (T *Discoverer) priority(tags []string) int {
-	priority := 0
-	for _, setter := range T.Priority {
-		for _, tag := range tags {
-			if setter.Filter.Matches(tag) {
-				priority = setter.Value
-				break
-			}
-		}
-	}
-	return priority
-}
-
 func (T *Discoverer) Clusters() ([]discovery.Cluster, error) {
 	clusters, _, err := T.do.Databases.List(context.Background(), nil)
 	if err != nil {
@@ -84,8 +71,7 @@ func (T *Discoverer) Clusters() ([]discovery.Cluster, error) {
 		c := discovery.Cluster{
 			ID: cluster.ID,
 			Primary: discovery.Node{
-				Address:  primaryAddr,
-				Priority: T.priority(cluster.Tags),
+				Address: primaryAddr,
 			},
 			Databases: cluster.DBNames,
 			Users:     make([]discovery.User, 0, len(cluster.Users)),
@@ -117,8 +103,7 @@ func (T *Discoverer) Clusters() ([]discovery.Cluster, error) {
 				replicaAddr = net.JoinHostPort(replica.Connection.Host, strconv.Itoa(replica.Connection.Port))
 			}
 			c.Replicas[replica.ID] = discovery.Node{
-				Address:  replicaAddr,
-				Priority: T.priority(replica.Tags),
+				Address: replicaAddr,
 			}
 		}
 
diff --git a/lib/gat/handlers/pool/penalties/latency/penalty.go b/lib/gat/handlers/pool/penalties/latency/penalty.go
new file mode 100644
index 00000000..7b607f07
--- /dev/null
+++ b/lib/gat/handlers/pool/penalties/latency/penalty.go
@@ -0,0 +1,42 @@
+package latency
+
+import (
+	"time"
+
+	"github.com/caddyserver/caddy/v2"
+
+	"gfx.cafe/gfx/pggat/lib/bouncer/backends/v0"
+	"gfx.cafe/gfx/pggat/lib/fed"
+	"gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
+)
+
+func init() {
+	caddy.RegisterModule((*Penalty)(nil))
+}
+
+type Penalty struct {
+	Threshold caddy.Duration `json:"threshold"`
+}
+
+func (T *Penalty) CaddyModule() caddy.ModuleInfo {
+	return caddy.ModuleInfo{
+		ID: "pggat.handlers.pool.penalties.latency",
+		New: func() caddy.Module {
+			return new(Penalty)
+		},
+	}
+}
+
+func (T *Penalty) Score(conn *fed.Conn) (int, error) {
+	start := time.Now()
+	err, _ := backends.QueryString(conn, nil, "select 0")
+	if err != nil {
+		return 0, err
+	}
+	dur := time.Since(start)
+	penalty := int(dur / time.Duration(T.Threshold))
+	return penalty, nil
+}
+
+var _ pool.Penalty = (*Penalty)(nil)
+var _ caddy.Module = (*Penalty)(nil)
diff --git a/lib/gat/handlers/pool/penalty.go b/lib/gat/handlers/pool/penalty.go
new file mode 100644
index 00000000..790090a8
--- /dev/null
+++ b/lib/gat/handlers/pool/penalty.go
@@ -0,0 +1,8 @@
+package pool
+
+import "gfx.cafe/gfx/pggat/lib/fed"
+
+type Penalty interface {
+	// Score calculates how much a recipe should be penalized. Lower is better
+	Score(conn *fed.Conn) (int, error)
+}
diff --git a/lib/gat/handlers/pool/spool/config.go b/lib/gat/handlers/pool/spool/config.go
index cc29f91c..e07415c4 100644
--- a/lib/gat/handlers/pool/spool/config.go
+++ b/lib/gat/handlers/pool/spool/config.go
@@ -23,5 +23,8 @@ type Config struct {
 	ReconnectInitialTime time.Duration
 	ReconnectMaxTime     time.Duration
 
+	PenaltyPeriod time.Duration
+	Penalties     []pool.Penalty
+
 	Logger *zap.Logger
 }
diff --git a/lib/gat/handlers/pool/spool/pool.go b/lib/gat/handlers/pool/spool/pool.go
index 7c9d131b..dadc9aab 100644
--- a/lib/gat/handlers/pool/spool/pool.go
+++ b/lib/gat/handlers/pool/spool/pool.go
@@ -1,6 +1,7 @@
 package spool
 
 import (
+	"math"
 	"sort"
 	"sync"
 	"time"
@@ -24,6 +25,7 @@ type Pool struct {
 
 	recipes          map[string]*Recipe
 	recipeScaleOrder []*Recipe
+	lastPenalize     time.Time
 	servers          map[uuid.UUID]*Server
 	mu               sync.RWMutex
 }
@@ -57,15 +59,53 @@ func (T *Pool) removeServer(server *Server, deleteFromRecipe, freeFromRecipe boo
 	}
 }
 
-func (T *Pool) sortRecipeScaleOrder() {
+func (T *Pool) penalizeRecipe(recipe *Recipe) error {
+	T.mu.RUnlock()
+	defer T.mu.RLock()
+
+	recipe.Penalty = 0
+
+	conn, err := recipe.Recipe.Dial()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = conn.Close()
+	}()
+
+	for _, penalty := range T.config.Penalties {
+		var p int
+		p, err = penalty.Score(conn)
+		if err != nil {
+			return err
+		}
+
+		recipe.Penalty += p
+	}
+
+	return nil
+}
+
+func (T *Pool) sortRecipes() {
+	if len(T.config.Penalties) > 0 && time.Since(T.lastPenalize) > T.config.PenaltyPeriod {
+		for _, recipe := range T.recipes {
+			if err := T.penalizeRecipe(recipe); err != nil {
+				T.config.Logger.Error("failed to score recipe", zap.Error(err))
+				recipe.Penalty = math.MaxInt
+			}
+		}
+
+		T.lastPenalize = time.Now()
+	}
+
 	sort.Slice(T.recipeScaleOrder, func(i, j int) bool {
 		a := T.recipeScaleOrder[i]
 		b := T.recipeScaleOrder[j]
 		// sort by priority first
-		if a.Recipe.Priority < b.Recipe.Priority {
+		if a.Score() < b.Score() {
 			return true
 		}
-		if a.Recipe.Priority > b.Recipe.Priority {
+		if a.Score() > b.Score() {
 			return false
 		}
 		// then sort by number of servers
@@ -81,7 +121,6 @@ func (T *Pool) addRecipe(name string, recipe *pool.Recipe) *Recipe {
 	}
 	T.recipes[name] = r
 	T.recipeScaleOrder = append(T.recipeScaleOrder, r)
-	T.sortRecipeScaleOrder()
 
 	return r
 }
@@ -122,6 +161,7 @@ func (T *Pool) RemoveRecipe(name string) {
 }
 
 func (T *Pool) scaleUpL0() *Recipe {
+	T.sortRecipes()
 	for _, recipe := range T.recipeScaleOrder {
 		if !recipe.Recipe.Allocate() {
 			continue
@@ -162,7 +202,6 @@ func (T *Pool) ScaleUpOnce(recipe *Recipe) bool {
 		T.servers = make(map[uuid.UUID]*Server)
 	}
 	T.servers[server.ID] = server
-	T.sortRecipeScaleOrder()
 
 	T.pooler.AddServer(server.ID)
 
diff --git a/lib/gat/handlers/pool/spool/recipe.go b/lib/gat/handlers/pool/spool/recipe.go
index b0e30a17..ee7700bd 100644
--- a/lib/gat/handlers/pool/spool/recipe.go
+++ b/lib/gat/handlers/pool/spool/recipe.go
@@ -3,8 +3,11 @@ package spool
 import "gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
 
 type Recipe struct {
-	Name    string
-	Recipe  *pool.Recipe
+	Name   string
+	Recipe *pool.Recipe
+
+	Penalty int
+
 	Servers []*Server
 }
 
@@ -14,3 +17,7 @@ func NewRecipe(name string, recipe *pool.Recipe) *Recipe {
 		Recipe: recipe,
 	}
 }
+
+func (T *Recipe) Score() int {
+	return T.Recipe.Priority + T.Penalty
+}
-- 
GitLab