diff --git a/lib/gat/admin/admin.go b/lib/gat/admin/admin.go index ba873fddfebed6d39169651f7943b644d1de42a9..5297630e68271e0b2612707bb0ae407f023f0ac6 100644 --- a/lib/gat/admin/admin.go +++ b/lib/gat/admin/admin.go @@ -5,6 +5,8 @@ import ( "fmt" "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" + "gfx.cafe/gfx/pggat/lib/parse" + "strings" ) // The admin database, implemented through the gat.Pool interface, allowing it to be added to any existing Gat @@ -14,6 +16,10 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/protocol" ) +const DataType_String = 25 +const DataType_Int64 = 20 +const DataType_Float64 = 701 + func getServerInfo(g gat.Gat) []*protocol.ParameterStatus { return []*protocol.ParameterStatus{ { @@ -108,7 +114,7 @@ func (p *Pool) ConnectionPools() []gat.ConnectionPool { } } -func (p *Pool) Stats() gat.PoolStats { +func (p *Pool) Stats() *gat.PoolStats { return nil // TODO } @@ -140,23 +146,215 @@ func (c *ConnectionPool) EnsureConfig(conf *config.Pool) { } func (c *ConnectionPool) Describe(ctx context.Context, client gat.Client, describe *protocol.Describe) error { - return errors.New("not implemented") + return errors.New("describe not implemented") } func (c *ConnectionPool) Execute(ctx context.Context, client gat.Client, execute *protocol.Execute) error { - return errors.New("not implemented") + return errors.New("execute not implemented") } func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, query string) error { - return errors.New("not implemented") + parsed, err := parse.Parse(query) + if err != nil { + return err + } + for _, cmd := range parsed { + switch strings.ToLower(cmd.Command) { + case "show": + if len(cmd.Arguments) < 1 { + return errors.New("usage: show [item]") + } + + switch strings.ToLower(cmd.Arguments[0]) { + case "stats": + rowDesc := new(protocol.RowDescription) + rowDesc.Fields.Fields = []protocol.FieldsRowDescriptionFields{ + { + Name: "database", + DataType: DataType_String, + DataTypeSize: -1, + TypeModifier: -1, + }, + { + Name: "total_xact_count", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_query_count", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_received", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_sent", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_xact_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_query_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_wait_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_xact_count", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_query_count", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_recv", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_sent", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_xact_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_query_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_wait_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + } + err = client.Send(rowDesc) + if err != nil { + return err + } + for name, pool := range c.pool.gat.Pools() { + stats := pool.Stats() + if stats == nil { + continue + } + row := new(protocol.DataRow) + row.Fields.Columns = []protocol.FieldsDataRowColumns{ + { + []byte(name), + }, + { + []byte(fmt.Sprintf("%d", stats.TotalXactCount())), + }, + { + []byte(fmt.Sprintf("%d", stats.TotalQueryCount())), + }, + { + []byte(fmt.Sprintf("%d", stats.TotalReceived())), + }, + { + []byte(fmt.Sprintf("%d", stats.TotalSent())), + }, + { + []byte(fmt.Sprintf("%d", stats.TotalXactTime())), + }, + { + []byte(fmt.Sprintf("%d", stats.TotalQueryTime())), + }, + { + []byte(fmt.Sprintf("%d", stats.TotalWaitTime())), + }, + { + []byte(fmt.Sprintf("%f", stats.AvgXactCount())), + }, + { + []byte(fmt.Sprintf("%f", stats.AvgQueryCount())), + }, + { + []byte(fmt.Sprintf("%f", stats.AvgRecv())), + }, + { + []byte(fmt.Sprintf("%f", stats.AvgSent())), + }, + { + []byte(fmt.Sprintf("%f", stats.AvgXactTime())), + }, + { + []byte(fmt.Sprintf("%f", stats.AvgQueryTime())), + }, + { + []byte(fmt.Sprintf("%f", stats.AvgWaitTime())), + }, + } + err = client.Send(row) + if err != nil { + return err + } + } + done := new(protocol.CommandComplete) + done.Fields.Data = cmd.Command + err = client.Send(done) + if err != nil { + return err + } + default: + return errors.New("unknown command") + } + case "pause": + case "disable": + case "enable": + case "reconnect": + case "kill": + case "suspend": + case "resume": + case "shutdown": + case "reload": + case "wait_close": + case "set": + default: + return errors.New("unknown command") + } + } + return nil } func (c *ConnectionPool) Transaction(ctx context.Context, client gat.Client, query string) error { - return errors.New("not implemented") + return errors.New("transactions not implemented") } func (c *ConnectionPool) CallFunction(ctx context.Context, client gat.Client, payload *protocol.FunctionCall) error { - return errors.New("not implemented") + return errors.New("functions not implemented") } var _ gat.ConnectionPool = (*ConnectionPool)(nil) diff --git a/lib/gat/gatling/gatling.go b/lib/gat/gatling/gatling.go index bc76eb2b2d00ef9de81bd3e310d627c6ce76ff8f..4d6f6055f38028b2c53a753e79120a9845b672be 100644 --- a/lib/gat/gatling/gatling.go +++ b/lib/gat/gatling/gatling.go @@ -78,16 +78,10 @@ func (g *Gatling) GetPool(name string) (gat.Pool, error) { return srv, nil } -func (g *Gatling) Pools() []gat.Pool { +func (g *Gatling) Pools() map[string]gat.Pool { g.mu.RLock() defer g.mu.RUnlock() - out := make([]gat.Pool, len(g.pools)) - idx := 0 - for _, p := range g.pools { - out[idx] = p - idx += 1 - } - return out + return g.pools } func (g *Gatling) GetClient(id gat.ClientID) (gat.Client, error) { diff --git a/lib/gat/gatling/pool/conn_pool/conn_pool.go b/lib/gat/gatling/pool/conn_pool/conn_pool.go index 2cb82695fcba943813b404f1588ccdc6a3825ea9..ffb1eed5a3ecb81c3b005a8ee07adeac6a4c24b7 100644 --- a/lib/gat/gatling/pool/conn_pool/conn_pool.go +++ b/lib/gat/gatling/pool/conn_pool/conn_pool.go @@ -103,10 +103,12 @@ func (c *ConnectionPool) Execute(ctx context.Context, client gat.Client, e *prot } func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, q string) error { + defer c.pool.Stats().IncQueryCount() return c.getWorker().HandleSimpleQuery(ctx, client, q) } func (c *ConnectionPool) Transaction(ctx context.Context, client gat.Client, q string) error { + defer c.pool.Stats().IncXactCount() return c.getWorker().HandleTransaction(ctx, client, q) } diff --git a/lib/gat/gatling/pool/pool.go b/lib/gat/gatling/pool/pool.go index 85a1d86788922e40850fb78c7468f200f463364f..60fb058826d780469903f42151cae23f54c00003 100644 --- a/lib/gat/gatling/pool/pool.go +++ b/lib/gat/gatling/pool/pool.go @@ -15,7 +15,7 @@ type Pool struct { users map[string]config.User connPools map[string]gat.ConnectionPool - stats *Stats + stats *gat.PoolStats router query_router.QueryRouter @@ -25,7 +25,7 @@ type Pool struct { func NewPool(conf *config.Pool) *Pool { pool := &Pool{ connPools: make(map[string]gat.ConnectionPool), - stats: newStats(), + stats: gat.NewPoolStats(), } pool.EnsureConfig(conf) return pool @@ -86,7 +86,7 @@ func (p *Pool) ConnectionPools() []gat.ConnectionPool { return out } -func (p *Pool) Stats() gat.PoolStats { +func (p *Pool) Stats() *gat.PoolStats { return p.stats } diff --git a/lib/gat/gatling/pool/stats.go b/lib/gat/gatling/pool/stats.go deleted file mode 100644 index 0c16b76ab53a213aac99cf950ecea294cecdf7ef..0000000000000000000000000000000000000000 --- a/lib/gat/gatling/pool/stats.go +++ /dev/null @@ -1,91 +0,0 @@ -package pool - -import ( - "gfx.cafe/gfx/pggat/lib/gat" - "time" -) - -type Stats struct { - start time.Time - - xactCount int - queryCount int - waitCount int - received int - sent int - xactTime int - queryTime int - waitTime int -} - -func newStats() *Stats { - return &Stats{ - start: time.Now(), - } -} - -func (s *Stats) TotalXactCount() int { - return s.xactCount -} - -func (s *Stats) TotalQueryCount() int { - return s.queryCount -} - -func (s *Stats) TotalReceived() int { - return s.received -} - -func (s *Stats) TotalSent() int { - return s.sent -} - -func (s *Stats) TotalXactTime() int { - return s.xactTime -} - -func (s *Stats) TotalQueryTime() int { - return s.queryTime -} - -func (s *Stats) TotalWaitTime() int { - return s.waitTime -} - -func (s *Stats) totalTime() time.Duration { - return time.Now().Sub(s.start) -} - -func (s *Stats) AvgXactCount() float64 { - seconds := s.totalTime().Seconds() - return float64(s.xactCount) / seconds -} - -func (s *Stats) AvgQueryCount() float64 { - seconds := s.totalTime().Seconds() - return float64(s.queryCount) / seconds -} - -func (s *Stats) AvgRecv() float64 { - seconds := s.totalTime().Seconds() - return float64(s.received) / seconds -} - -func (s *Stats) AvgSent() float64 { - seconds := s.totalTime().Seconds() - return float64(s.sent) / seconds -} - -func (s *Stats) AvgXactTime() float64 { - return float64(s.xactTime) / float64(s.xactCount) -} - -func (s *Stats) AvgQueryTime() float64 { - return float64(s.queryTime) / float64(s.queryCount) -} - -func (s *Stats) AvgWaitTime() float64 { - return float64(s.waitTime) / float64(s.waitCount) -} - -var _ gat.PoolStats = (*Stats)(nil) diff --git a/lib/gat/interfaces.go b/lib/gat/interfaces.go index efe2e3a5fd0dc2e64ee360a5b943655ff01f7e92..07c9181a54fe0e5cc87302f5527091e02b2cce03 100644 --- a/lib/gat/interfaces.go +++ b/lib/gat/interfaces.go @@ -38,7 +38,7 @@ type Gat interface { Version() string Config() *config.Global GetPool(name string) (Pool, error) - Pools() []Pool + Pools() map[string]Pool GetClient(id ClientID) (Client, error) Clients() []Client } @@ -52,42 +52,11 @@ type Pool interface { WithUser(name string) (ConnectionPool, error) ConnectionPools() []ConnectionPool - Stats() PoolStats + Stats() *PoolStats EnsureConfig(c *config.Pool) } -type PoolStats interface { - // Total transactions - TotalXactCount() int - // Total queries - TotalQueryCount() int - // Total bytes received over network - TotalReceived() int - // Total bytes sent over network - TotalSent() int - // Total time spent doing transactions (in microseconds) - TotalXactTime() int - // Total time spent doing queries (in microseconds) - TotalQueryTime() int - // Total time spent waiting (in microseconds) - TotalWaitTime() int - // Average amount of transactions per second - AvgXactCount() float64 - // Average amount of queries per second - AvgQueryCount() float64 - // Average bytes received per second - AvgRecv() float64 - // Average bytes sent per second - AvgSent() float64 - // Average time transactions take (in microseconds) - AvgXactTime() float64 - // Average time queries take (in microseconds) - AvgQueryTime() float64 - // Average time waiting for work (in microseconds) - AvgWaitTime() float64 -} - type QueryRouter interface { InferRole(query string) (config.ServerRole, error) } diff --git a/lib/gat/stats.go b/lib/gat/stats.go new file mode 100644 index 0000000000000000000000000000000000000000..31098212bedd3bfd3455f9dd1ce4af246b0f2503 --- /dev/null +++ b/lib/gat/stats.go @@ -0,0 +1,94 @@ +package gat + +import "time" + +type PoolStats struct { + start time.Time + + xactCount int + queryCount int + waitCount int + received int + sent int + xactTime int + queryTime int + waitTime int +} + +func NewPoolStats() *PoolStats { + return &PoolStats{ + start: time.Now(), + } +} + +func (s *PoolStats) TotalXactCount() int { + return s.xactCount +} + +func (s *PoolStats) IncXactCount() { + s.xactCount += 1 +} + +func (s *PoolStats) TotalQueryCount() int { + return s.queryCount +} + +func (s *PoolStats) IncQueryCount() { + s.queryCount += 1 +} + +func (s *PoolStats) TotalReceived() int { + return s.received +} + +func (s *PoolStats) TotalSent() int { + return s.sent +} + +func (s *PoolStats) TotalXactTime() int { + return s.xactTime +} + +func (s *PoolStats) TotalQueryTime() int { + return s.queryTime +} + +func (s *PoolStats) TotalWaitTime() int { + return s.waitTime +} + +func (s *PoolStats) totalTime() time.Duration { + return time.Now().Sub(s.start) +} + +func (s *PoolStats) AvgXactCount() float64 { + seconds := s.totalTime().Seconds() + return float64(s.xactCount) / seconds +} + +func (s *PoolStats) AvgQueryCount() float64 { + seconds := s.totalTime().Seconds() + return float64(s.queryCount) / seconds +} + +func (s *PoolStats) AvgRecv() float64 { + seconds := s.totalTime().Seconds() + return float64(s.received) / seconds +} + +func (s *PoolStats) AvgSent() float64 { + seconds := s.totalTime().Seconds() + return float64(s.sent) / seconds +} + +func (s *PoolStats) AvgXactTime() float64 { + return float64(s.xactTime) / float64(s.xactCount) +} + +func (s *PoolStats) AvgQueryTime() float64 { + return float64(s.queryTime) / float64(s.queryCount) +} + +func (s *PoolStats) AvgWaitTime() float64 { + return float64(s.waitTime) / float64(s.waitCount) +}