diff --git a/lib/gat/admin/admin.go b/lib/gat/admin/admin.go index 32823d98174ee701ac8e3dd63bddf96530de140a..4becc7f97f18931835d2fa033c08ed58f12a9e3c 100644 --- a/lib/gat/admin/admin.go +++ b/lib/gat/admin/admin.go @@ -1,22 +1,20 @@ package admin import ( + "context" "errors" "fmt" + "time" + "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" + "gfx.cafe/gfx/pggat/lib/gat/protocol" "gfx.cafe/gfx/pggat/lib/parse" "gfx.cafe/gfx/pggat/lib/util/cmux" - "time" ) // The admin database, implemented through the gat.Database interface, allowing it to be added to any existing Gat -import ( - "context" - "gfx.cafe/gfx/pggat/lib/gat/protocol" -) - const DataType_String = 25 const DataType_Int64 = 20 const DataType_Float64 = 701 diff --git a/lib/gat/gatling/client/client.go b/lib/gat/gatling/client/client.go index 20ac2ca5b4ba95db943bb5fe13dca464001e8d89..393ccb8deb84ac860b9286f0eeb0d113a110b14d 100644 --- a/lib/gat/gatling/client/client.go +++ b/lib/gat/gatling/client/client.go @@ -7,23 +7,25 @@ import ( "crypto/tls" "errors" "fmt" + "io" + "math" + "math/big" + "net" + "reflect" + "sync" + "sync/atomic" + "time" + "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/gatling/messages" "gfx.cafe/gfx/pggat/lib/gat/protocol" "gfx.cafe/gfx/pggat/lib/gat/protocol/pg_error" + "gfx.cafe/gfx/pggat/lib/metrics" "gfx.cafe/ghalliday1/pg3p" "gfx.cafe/ghalliday1/pg3p/lex" "git.tuxpa.in/a/zlog" "git.tuxpa.in/a/zlog/log" - "io" - "math" - "math/big" - "net" - "reflect" - "sync" - "sync/atomic" - "time" ) type CountReader[T io.Reader] struct { @@ -415,11 +417,7 @@ func (c *Client) Accept(ctx context.Context) error { } open, err = c.tick(ctx) // add send and recv to pool - stats := c.server.GetDatabase().GetStats() - if stats != nil { - stats.AddTotalSent(c.wr.BytesWritten.Swap(0)) - stats.AddTotalReceived(c.r.BytesRead.Swap(0)) - } + metrics.RecordBytes(c.poolName, c.username, c.wr.BytesWritten.Swap(0), c.r.BytesRead.Swap(0)) if !open { break } diff --git a/lib/gat/pool/transaction/worker.go b/lib/gat/pool/transaction/worker.go index 94a3083aa85d221665de8c142c25ac2ede43d6c8..8ac9b0107ffc7a44478e32cc3128c97c41fd5427 100644 --- a/lib/gat/pool/transaction/worker.go +++ b/lib/gat/pool/transaction/worker.go @@ -4,14 +4,15 @@ import ( "context" "errors" "fmt" + "math/rand" + "sync" + "time" + "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/pool/transaction/shard" "gfx.cafe/gfx/pggat/lib/gat/protocol" "gfx.cafe/gfx/pggat/lib/gat/protocol/pg_error" - "math/rand" - "sync" - "time" ) // a single use worker with an embedded connection database. @@ -181,6 +182,7 @@ func (w *worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query stri start := time.Now() defer func() { + // w.w.user.Name w.w.database.GetStats().AddQueryTime(time.Now().Sub(start).Microseconds()) }() @@ -212,6 +214,7 @@ func (w *worker) HandleTransaction(ctx context.Context, c gat.Client, query stri start := time.Now() defer func() { w.w.database.GetStats().AddXactTime(time.Now().Sub(start).Microseconds()) + // metrics.PoolMetrics(w.., w.w.user.Name) }() errch := make(chan error) diff --git a/lib/metrics/buckets.go b/lib/metrics/buckets.go index d44cdc5f4857834d2a320847240fe7355a087ff9..f2ef0b46b442bfae5fdfaf4bde7184a7aeda5949 100644 --- a/lib/metrics/buckets.go +++ b/lib/metrics/buckets.go @@ -1,5 +1,6 @@ package metrics -var taskDurationBucketsUs = []float64{1000, 3000, 5000, 10000, 20000, 30000, 40000, 50000, 60000, 70000, 80000, 90000, 100000, 150000, 300000, 500000, 5000000, 10000000, 30000000, 45000000, 60000000} -var requestDurationHistBucketNs = []float64{1000000, 3000000, 5000000, 10000000, 20000000, 30000000, 40000000, 50000000, 60000000, 70000000, 80000000, 90000000, 100000000, 150000000, 300000000, 500000000, 5000000000, 10000000000, 30000000000, 45000000000, 60000000000} -var marshalDurationHistBucketsNs = []float64{100000, 300000, 500000, 1000000, 2000000, 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 10000000, 15000000, 30000000, 50000000, 500000000, 1000000000, 3000000000, 4500000000, 6000000000} +var bucketsUs = []float64{1000, 3000, 5000, 10000, 20000, 30000, 40000, 50000, 60000, 70000, 80000, 90000, 100000, 150000, 300000, 500000, 5000000, 10000000, 30000000, 45000000, 60000000} + +// var requestDurationHistBucketNs = []float64{1000000, 3000000, 5000000, 10000000, 20000000, 30000000, 40000000, 50000000, 60000000, 70000000, 80000000, 90000000, 100000000, 150000000, 300000000, 500000000, 5000000000, 10000000000, 30000000000, 45000000000, 60000000000} +// var bucketsNs = []float64{100000, 300000, 500000, 1000000, 2000000, 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 10000000, 15000000, 30000000, 50000000, 500000000, 1000000000, 3000000000, 4500000000, 6000000000} diff --git a/lib/metrics/pool.go b/lib/metrics/pool.go index 47c0ad4ce0c0e0f98ec1089f38c1ca6cf63555fe..908b196089dc7fb92f6a98b1cf75593acb8fa00c 100644 --- a/lib/metrics/pool.go +++ b/lib/metrics/pool.go @@ -1,6 +1,8 @@ package metrics import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -31,7 +33,7 @@ func newPoolMetrics(db string, user string) poolMetrics { TxLatency: promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "pggat_db_transaction_latency", Help: "transaction latency", - Buckets: taskDurationBucketsUs, + Buckets: bucketsUs, ConstLabels: prometheus.Labels{ "db": db, "user": user, @@ -40,7 +42,7 @@ func newPoolMetrics(db string, user string) poolMetrics { QueryLatency: promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "pggat_db_query_latency", Help: "query latency", - Buckets: taskDurationBucketsUs, + Buckets: bucketsUs, ConstLabels: prometheus.Labels{ "db": db, "user": user, @@ -49,7 +51,7 @@ func newPoolMetrics(db string, user string) poolMetrics { WaitLatency: promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "pggat_db_wait_latency", Help: "wait latency", - Buckets: taskDurationBucketsUs, + Buckets: bucketsUs, ConstLabels: prometheus.Labels{ "db": db, "user": user, @@ -74,3 +76,20 @@ func newPoolMetrics(db string, user string) poolMetrics { } return o } + +func RecordBytes(db string, user string, sent, received int64) { + if !On() { + return + } + p := PoolMetrics(db, user) + p.SentBytes.WithLabelValues().Add(float64(sent)) + p.ReceivedBytes.WithLabelValues().Add(float64(received)) +} + +func RecordQueryTime(db string, user string, dur time.Duration) { + if !On() { + return + } + p := PoolMetrics(db, user) + p.QueryLatency.WithLabelValues().Observe(float64(dur.Microseconds())) +}