From ef3f7afab69bb86cf99374523811c44931096e07 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Thu, 7 Sep 2023 18:34:04 -0500 Subject: [PATCH] works --- lib/gat/metrics/pool.go | 82 ++++++++++++++++++++++++++++++++++++++++- lib/gat/pool/conn.go | 50 ++++++++++++++++++++++--- lib/gat/pool/flow.go | 5 ++- lib/gat/pool/pool.go | 12 +++--- lib/gat/pool/state.go | 10 ----- 5 files changed, 135 insertions(+), 24 deletions(-) delete mode 100644 lib/gat/pool/state.go diff --git a/lib/gat/metrics/pool.go b/lib/gat/metrics/pool.go index 105e4ba3..509d4b94 100644 --- a/lib/gat/metrics/pool.go +++ b/lib/gat/metrics/pool.go @@ -1,6 +1,12 @@ package metrics import ( + "fmt" + "math" + "strconv" + "strings" + "time" + "github.com/google/uuid" "pggat2/lib/util/maps" @@ -11,11 +17,85 @@ type Pool struct { Clients map[uuid.UUID]Conn } +func (T *Pool) TransactionCount() int { + var serverTransactions int + var clientTransactions int + + for _, server := range T.Servers { + serverTransactions += server.TransactionCount + } + + for _, client := range T.Clients { + clientTransactions += client.TransactionCount + } + + if clientTransactions > serverTransactions { + return clientTransactions + } + return serverTransactions +} + +func connStateCounts(items map[uuid.UUID]Conn) [ConnStateCount]int { + var states [ConnStateCount]int + for _, item := range items { + states[item.State]++ + } + return states +} + +func connStateUtils(items map[uuid.UUID]Conn) [ConnStateCount]float64 { + var util [ConnStateCount]time.Duration + var total time.Duration + for _, item := range items { + for state, amount := range item.Utilization { + util[state] += amount + total += amount + } + } + + var states [ConnStateCount]float64 + for state := range states { + states[state] = float64(util[state]) / float64(total) + } + + return states +} + +func connStateUtilString(count [ConnStateCount]int, util [ConnStateCount]float64) string { + var b strings.Builder + + var addSpace bool + for state, u := range util { + if u == 0.0 || math.IsNaN(u) { + continue + } + if addSpace { + b.WriteString(", ") + } else { + addSpace = true + } + b.WriteString(strconv.Itoa(count[state])) + b.WriteString(" ") + b.WriteString(ConnState(state).String()) + b.WriteString(" (") + b.WriteString(strconv.FormatFloat(u*100, 'f', 2, 64)) + b.WriteString("%)") + } + + return b.String() +} + func (T *Pool) Clear() { maps.Clear(T.Servers) maps.Clear(T.Clients) } func (T *Pool) String() string { - return "TODO(garet)" // TODO(garet) + return fmt.Sprintf("%d transactions | %d servers (%s) | %d clients (%s)", + T.TransactionCount(), + len(T.Servers), + connStateUtilString(connStateCounts(T.Servers), connStateUtils(T.Servers)), + len(T.Clients), + connStateUtilString(connStateCounts(T.Clients), connStateUtils(T.Clients)), + ) } diff --git a/lib/gat/pool/conn.go b/lib/gat/pool/conn.go index 4da7df2a..9498b1b8 100644 --- a/lib/gat/pool/conn.go +++ b/lib/gat/pool/conn.go @@ -24,10 +24,15 @@ type Conn struct { transactionCount atomic.Int64 - state State + lastMetricsRead time.Time + + state metrics.ConnState peer uuid.UUID since time.Time - mu sync.RWMutex + + util [metrics.ConnStateCount]time.Duration + + mu sync.RWMutex } func MakeConn( @@ -66,15 +71,26 @@ func (T *Conn) TransactionComplete() { T.transactionCount.Add(1) } -func (T *Conn) SetState(state State, peer uuid.UUID) { +func (T *Conn) SetState(state metrics.ConnState, peer uuid.UUID) { T.mu.Lock() defer T.mu.Unlock() + + now := time.Now() + + var since time.Duration + if T.since.Before(T.lastMetricsRead) { + since = now.Sub(T.lastMetricsRead) + } else { + since = now.Sub(T.since) + } + T.util[T.state] += since + T.state = state T.peer = peer - T.since = time.Now() + T.since = now } -func (T *Conn) GetState() (state State, peer uuid.UUID, since time.Time) { +func (T *Conn) GetState() (state metrics.ConnState, peer uuid.UUID, since time.Time) { T.mu.RLock() defer T.mu.Unlock() state = T.state @@ -84,5 +100,29 @@ func (T *Conn) GetState() (state State, peer uuid.UUID, since time.Time) { } func (T *Conn) ReadMetrics(m *metrics.Conn) { + T.mu.Lock() + defer T.mu.Unlock() + + now := time.Now() + + m.Time = now + + m.State = T.state + m.Peer = T.peer + m.Since = T.since + + m.Utilization = T.util + T.util = [metrics.ConnStateCount]time.Duration{} + + var since time.Duration + if m.Since.Before(T.lastMetricsRead) { + since = now.Sub(T.lastMetricsRead) + } else { + since = now.Sub(m.Since) + } + m.Utilization[m.State] += since + + m.TransactionCount = int(T.transactionCount.Swap(0)) + T.lastMetricsRead = now } diff --git a/lib/gat/pool/flow.go b/lib/gat/pool/flow.go index 476fe204..e0dff1ec 100644 --- a/lib/gat/pool/flow.go +++ b/lib/gat/pool/flow.go @@ -3,13 +3,14 @@ package pool import ( "pggat2/lib/bouncer/backends/v0" packets "pggat2/lib/fed/packets/v3.0" + "pggat2/lib/gat/metrics" "pggat2/lib/middleware/middlewares/ps" "pggat2/lib/util/slices" ) func Pair(options Options, client *Client, server *Server) (clientErr, serverErr error) { - client.SetState(StateActive, server.GetID()) - server.SetState(StateActive, client.GetID()) + client.SetState(metrics.ConnStateActive, server.GetID()) + server.SetState(metrics.ConnStateActive, client.GetID()) switch options.ParameterStatusSync { case ParameterStatusSyncDynamic: diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index e0aa8533..d7c1a7fa 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -45,7 +45,7 @@ func (T *Pool) idlest() (server *Server, at time.Time) { for _, s := range T.servers { state, _, since := s.GetState() - if state != StateIdle { + if state != metrics.ConnStateIdle { continue } @@ -199,7 +199,7 @@ func (T *Pool) removeServerL1(server *Server) { } func (T *Pool) acquireServer(client *Client) *Server { - client.SetState(StateAwaitingServer, uuid.Nil) + client.SetState(metrics.ConnStateAwaitingServer, uuid.Nil) serverID := T.options.Pooler.Acquire(client.GetID(), SyncModeNonBlocking) if serverID == uuid.Nil { @@ -214,7 +214,7 @@ func (T *Pool) acquireServer(client *Client) *Server { } func (T *Pool) releaseServer(server *Server) { - server.SetState(StateRunningResetQuery, uuid.Nil) + server.SetState(metrics.ConnStateRunningResetQuery, uuid.Nil) if T.options.ServerResetQuery != "" { err := backends.QueryString(new(backends.Context), server.GetConn(), T.options.ServerResetQuery) @@ -224,7 +224,7 @@ func (T *Pool) releaseServer(server *Server) { } } - server.SetState(StateIdle, uuid.Nil) + server.SetState(metrics.ConnStateIdle, uuid.Nil) T.options.Pooler.Release(server.GetID()) } @@ -278,7 +278,7 @@ func (T *Pool) serve(client *Client) error { } else { TransactionComplete(client, server) if T.options.ReleaseAfterTransaction { - client.SetState(StateIdle, uuid.Nil) + client.SetState(metrics.ConnStateIdle, uuid.Nil) go T.releaseServer(server) // TODO(garet) does this need to be a goroutine server = nil } @@ -325,7 +325,7 @@ func (T *Pool) Cancel(key [8]byte) error { } state, peer, _ := client.GetState() - if state != StateActive { + if state != metrics.ConnStateActive { return nil } diff --git a/lib/gat/pool/state.go b/lib/gat/pool/state.go deleted file mode 100644 index 8221f3fd..00000000 --- a/lib/gat/pool/state.go +++ /dev/null @@ -1,10 +0,0 @@ -package pool - -type State int - -const ( - StateIdle State = iota - StateActive - StateAwaitingServer - StateRunningResetQuery -) -- GitLab