From 5f5b3e91b2c9e467c79e88edd7f15716d3a67007 Mon Sep 17 00:00:00 2001
From: a <a@tuxpa.in>
Date: Mon, 17 Jun 2024 19:28:36 -0500
Subject: [PATCH] noot

---
 lib/gat/handlers/pool/pools/basic/pool.go  | 26 +++++++++++-
 lib/gat/handlers/pool/pools/hybrid/pool.go | 49 ++++++++++++++++++----
 lib/instrumentation/prom/metrics.go        |  9 ----
 lib/instrumentation/prom/pool_hybrid.go    | 44 +++++++++++++++++++
 lib/instrumentation/prom/pool_simple.go    | 37 ++++++++++++++++
 5 files changed, 146 insertions(+), 19 deletions(-)
 create mode 100644 lib/instrumentation/prom/pool_hybrid.go
 create mode 100644 lib/instrumentation/prom/pool_simple.go

diff --git a/lib/gat/handlers/pool/pools/basic/pool.go b/lib/gat/handlers/pool/pools/basic/pool.go
index bfbb6c10..fb60015d 100644
--- a/lib/gat/handlers/pool/pools/basic/pool.go
+++ b/lib/gat/handlers/pool/pools/basic/pool.go
@@ -3,6 +3,7 @@ package basic
 import (
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/google/uuid"
 
@@ -15,6 +16,7 @@ import (
 	"gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
 	"gfx.cafe/gfx/pggat/lib/gat/handlers/pool/spool"
 	"gfx.cafe/gfx/pggat/lib/gat/metrics"
+	"gfx.cafe/gfx/pggat/lib/instrumentation/prom"
 	"gfx.cafe/gfx/pggat/lib/util/slices"
 )
 
@@ -216,6 +218,18 @@ func (T *Pool) Serve(conn *fed.Conn) error {
 		client.Conn.Ready = true
 	}
 
+	poolLabels := prom.PoolSimpleLabels{}
+	{
+		if T.config.ReleaseAfterTransaction {
+			poolLabels.Mode = "transaction"
+		} else {
+			poolLabels.Mode = "session"
+		}
+		prom.PoolSimple.Accepted(poolLabels).Inc()
+		prom.PoolSimple.Current(poolLabels).Inc()
+		defer prom.PoolSimple.Current(poolLabels).Dec()
+	}
+	opLabels := poolLabels.ToOperation()
 	for {
 		if server != nil && T.config.ReleaseAfterTransaction {
 			client.SetState(metrics.ConnStateIdle, nil)
@@ -230,6 +244,7 @@ func (T *Pool) Serve(conn *fed.Conn) error {
 		}
 
 		if server == nil {
+			start := time.Now()
 			client.SetState(metrics.ConnStateAwaitingServer, nil)
 
 			server = T.servers.Acquire(client.ID)
@@ -238,9 +253,18 @@ func (T *Pool) Serve(conn *fed.Conn) error {
 			}
 
 			err, serverErr = T.Pair(client, server)
+			dur := time.Since(start)
+			if err == nil && serverErr == nil {
+				prom.OperationSimple.Acquire(opLabels).Observe(float64(dur) / float64(time.Millisecond))
+			}
 		}
 		if err == nil && serverErr == nil {
-			err, serverErr = bouncers.Bounce(client.Conn, server.Conn, packet)
+			{
+				start := time.Now()
+				err, serverErr = bouncers.Bounce(client.Conn, server.Conn, packet)
+				dur := time.Since(start)
+				prom.OperationSimple.Execution(opLabels).Observe(float64(dur) / float64(time.Millisecond))
+			}
 		}
 
 		if serverErr != nil {
diff --git a/lib/gat/handlers/pool/pools/hybrid/pool.go b/lib/gat/handlers/pool/pools/hybrid/pool.go
index b014d843..01135460 100644
--- a/lib/gat/handlers/pool/pools/hybrid/pool.go
+++ b/lib/gat/handlers/pool/pools/hybrid/pool.go
@@ -3,6 +3,7 @@ package hybrid
 import (
 	"fmt"
 	"sync"
+	"time"
 
 	"github.com/google/uuid"
 
@@ -113,7 +114,7 @@ func (T *Pool) removeClient(client *Client) {
 	delete(T.clients, client.Conn.BackendKey)
 }
 
-func (T *Pool) serveRW(conn *fed.Conn) error {
+func (T *Pool) serveRW(l prom.PoolHybridLabels, conn *fed.Conn) error {
 	m := NewMiddleware()
 
 	eqpa := eqp.NewClient()
@@ -223,18 +224,24 @@ func (T *Pool) serveRW(conn *fed.Conn) error {
 
 		// try replica first (if it isn't empty)
 		if !T.replica.Empty() {
+			start := time.Now()
 			replica = T.replica.Acquire(client.ID)
 			if replica == nil {
 				return pool.ErrFailedToAcquirePeer
 			}
 
 			err, serverErr = T.Pair(client, replica)
+			dur := time.Since(start)
 
 			psi.Set(psa)
 			eqpi.Set(eqpa)
 
 			if err == nil && serverErr == nil {
+				prom.OperationHybrid.Acquire(l.ToOperation("replica")).Observe(float64(dur) / float64(time.Millisecond))
+				start := time.Now()
 				err, serverErr = bouncers.Bounce(conn, replica.Conn, packet)
+				dur := time.Since(start)
+				prom.OperationHybrid.Execution(l.ToOperation("replica")).Observe(float64(dur) / float64(time.Millisecond))
 			}
 			if serverErr != nil {
 				return fmt.Errorf("server error: %w", serverErr)
@@ -244,6 +251,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error {
 
 			// fallback to primary
 			if err == (ErrReadOnly{}) {
+				prom.OperationHybrid.Miss(l.ToOperation("replica")).Inc()
 				m.Primary()
 
 				T.replica.Release(replica)
@@ -257,21 +265,29 @@ func (T *Pool) serveRW(conn *fed.Conn) error {
 				client.SetState(metrics.ConnStateAwaitingServer, nil, false)
 
 				// acquire primary
+				start := time.Now()
 				primary = T.primary.Acquire(client.ID)
 				if primary == nil {
 					return pool.ErrFailedToAcquirePeer
 				}
 
 				serverErr = T.PairPrimary(client, psi, eqpi, primary)
+				dur := time.Since(start)
 
 				if serverErr == nil {
+					prom.OperationHybrid.Acquire(l.ToOperation("primary")).Observe(float64(dur) / float64(time.Millisecond))
+					start := time.Now()
 					err, serverErr = bouncers.Bounce(conn, primary.Conn, packet)
+					dur := time.Since(start)
+					prom.OperationHybrid.Execution(l.ToOperation("primary")).Observe(float64(dur) / float64(time.Millisecond))
 				}
 				if serverErr != nil {
 					return fmt.Errorf("server error: %w", serverErr)
 				} else {
 					primary.TransactionComplete()
 				}
+			} else {
+				prom.OperationHybrid.Hit(l.ToOperation("replica")).Inc()
 			}
 		} else {
 			// straight to primary
@@ -284,6 +300,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error {
 
 			client.SetState(metrics.ConnStateAwaitingServer, nil, false)
 
+			start := time.Now()
 			// acquire primary
 			primary = T.primary.Acquire(client.ID)
 			if primary == nil {
@@ -292,8 +309,15 @@ func (T *Pool) serveRW(conn *fed.Conn) error {
 
 			err, serverErr = T.Pair(client, primary)
 
+			dur := time.Since(start)
+
 			if err == nil && serverErr == nil {
+				prom.OperationHybrid.Acquire(l.ToOperation("primary")).Observe(float64(dur) / float64(time.Millisecond))
+				start := time.Now()
 				err, serverErr = bouncers.Bounce(conn, primary.Conn, packet)
+				dur := time.Since(start)
+				prom.OperationHybrid.Execution(l.ToOperation("primary")).Observe(float64(dur) / float64(time.Millisecond))
+
 			}
 			if serverErr != nil {
 				return fmt.Errorf("server error: %w", serverErr)
@@ -310,7 +334,7 @@ func (T *Pool) serveRW(conn *fed.Conn) error {
 	}
 }
 
-func (T *Pool) serveOnly(conn *fed.Conn, write bool) error {
+func (T *Pool) serveOnly(l prom.PoolHybridLabels, conn *fed.Conn, write bool) error {
 	var sp *spool.Pool
 	if write {
 		sp = &T.primary
@@ -410,20 +434,27 @@ func (T *Pool) serveOnly(conn *fed.Conn, write bool) error {
 }
 
 func (T *Pool) Serve(conn *fed.Conn) error {
-	labels := prom.HybridPoolLabels{}
+	labels := prom.PoolHybridLabels{}
 	switch conn.InitialParameters[strutil.MakeCIString("hybrid.mode")] {
 	case "ro":
 		labels.Mode = "ro"
-		prom.Pool.AcceptedHybrid(labels).Inc()
-		return T.serveOnly(conn, false)
 	case "wo":
 		labels.Mode = "wo"
-		prom.Pool.AcceptedHybrid(labels).Inc()
-		return T.serveOnly(conn, true)
 	default:
 		labels.Mode = "rw"
-		prom.Pool.AcceptedHybrid(labels).Inc()
-		return T.serveRW(conn)
+	}
+	prom.PoolHybrid.Accepted(labels).Inc()
+	prom.PoolHybrid.Current(labels).Inc()
+	defer prom.PoolHybrid.Current(labels).Dec()
+	switch labels.Mode {
+	case "ro":
+		return T.serveOnly(labels, conn, false)
+	case "wo":
+		return T.serveOnly(labels, conn, true)
+	case "rw":
+		return T.serveRW(labels, conn)
+	default:
+		panic("impossible")
 	}
 }
 
diff --git a/lib/instrumentation/prom/metrics.go b/lib/instrumentation/prom/metrics.go
index 0124ddbb..53835c9d 100644
--- a/lib/instrumentation/prom/metrics.go
+++ b/lib/instrumentation/prom/metrics.go
@@ -15,15 +15,6 @@ var Listener struct {
 	Client   func(ListenerLabels) prometheus.Gauge   `name:"client" help:"current clients"`
 }
 
-type HybridPoolLabels struct {
-	Mode string `label:"hybrid_mode"`
-}
-
-var Pool struct {
-	AcceptedHybrid func(HybridPoolLabels) prometheus.Counter `name:"accepted_hybrid" help:"hybrid connections accepted"`
-}
-
 func init() {
 	gotoprom.MustInit(&Listener, "pggat_listener", prometheus.Labels{})
-	gotoprom.MustInit(&Pool, "pggat_pool", prometheus.Labels{})
 }
diff --git a/lib/instrumentation/prom/pool_hybrid.go b/lib/instrumentation/prom/pool_hybrid.go
new file mode 100644
index 00000000..588839f1
--- /dev/null
+++ b/lib/instrumentation/prom/pool_hybrid.go
@@ -0,0 +1,44 @@
+package prom
+
+import (
+	"gfx.cafe/open/gotoprom"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+func init() {
+	gotoprom.MustInit(&PoolHybrid, "pggat_pool_hybrid", prometheus.Labels{})
+	gotoprom.MustInit(&OperationHybrid, "pggat_operation_hybrid", prometheus.Labels{})
+}
+
+type PoolHybridLabels struct {
+	Mode string `label:"hybrid_mode"`
+}
+
+type OperationHybridLabels struct {
+	Pool string `label:"pool"`
+	Mode string `label:"mode"`
+
+	Target string `label:"target"`
+}
+
+func (s *PoolHybridLabels) ToOperation(
+	target string,
+) OperationHybridLabels {
+	return OperationHybridLabels{
+		Pool:   "hybrid",
+		Mode:   s.Mode,
+		Target: target,
+	}
+}
+
+var PoolHybrid struct {
+	Accepted func(PoolHybridLabels) prometheus.Counter `name:"accepted" help:"hybrid connections accepted"`
+	Current  func(PoolHybridLabels) prometheus.Gauge   `name:"current" help:"current hybrid connections"`
+}
+
+var OperationHybrid struct {
+	Acquire   func(OperationHybridLabels) prometheus.Histogram `name:"acquire_ms"    buckets:"0.005,0.01,0.1,0.25,0.5,0.75,1,5,10,100,500,1000,5000"  help:"ms to acquire from pool"`
+	Execution func(OperationHybridLabels) prometheus.Histogram `name:"execution_ms"  buckets:"1,5,10,30,75,150,300,500,1000,2000,5000,7500,10000,15000,30000" help:"ms that the txn took to execute on remote"`
+	Miss      func(OperationHybridLabels) prometheus.Counter   `name:"write_misses" help:"queries which failed replica"`
+	Hit       func(OperationHybridLabels) prometheus.Counter   `name:"write_hits" help:"queries which failed replica"`
+}
diff --git a/lib/instrumentation/prom/pool_simple.go b/lib/instrumentation/prom/pool_simple.go
new file mode 100644
index 00000000..ceffaebd
--- /dev/null
+++ b/lib/instrumentation/prom/pool_simple.go
@@ -0,0 +1,37 @@
+package prom
+
+import (
+	"gfx.cafe/open/gotoprom"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+func init() {
+	gotoprom.MustInit(&PoolSimple, "pggat_pool_simple", make(prometheus.Labels))
+	gotoprom.MustInit(&OperationSimple, "pggat_operation_simple", make(prometheus.Labels))
+}
+
+var PoolSimple struct {
+	Accepted func(PoolSimpleLabels) prometheus.Counter `name:"accepted" help:"simple connections accepted"`
+	Current  func(PoolSimpleLabels) prometheus.Gauge   `name:"current" help:"current simple connections"`
+}
+
+type PoolSimpleLabels struct {
+	Mode string `label:"mode"`
+}
+
+func (s *PoolSimpleLabels) ToOperation() OperationSimpleLabels {
+	return OperationSimpleLabels{
+		Pool: "basic",
+		Mode: s.Mode,
+	}
+}
+
+type OperationSimpleLabels struct {
+	Pool string `label:"pool"`
+	Mode string `label:"mode"`
+}
+
+var OperationSimple struct {
+	Acquire   func(OperationSimpleLabels) prometheus.Histogram `name:"acquire_ms"    buckets:"0.005,0.01,0.1,0.25,0.5,0.75,1,5,10,100,500,1000,5000"  help:"ms to acquire from pool"`
+	Execution func(OperationSimpleLabels) prometheus.Histogram `name:"execution_ms"  buckets:"1,5,10,30,75,150,300,500,1000,2000,5000,7500,10000,15000,30000" help:"ms that the txn took to execute on remote"`
+}
-- 
GitLab