From 07501505cf056d12a622147306d7a1785b0ef27c Mon Sep 17 00:00:00 2001
From: Garet Halliday <ghalliday@gfxlabs.io>
Date: Thu, 15 Sep 2022 15:38:09 -0500
Subject: [PATCH] general stats complete

---
 README.md                                   |   2 +-
 lib/gat/admin/admin.go                      | 533 +++++++++++-----
 lib/gat/gatling/client/client.go            | 649 ++------------------
 lib/gat/gatling/pool/conn_pool/conn_pool.go |  20 +-
 lib/gat/gatling/pool/conn_pool/worker.go    |  11 +
 lib/gat/gatling/pool/pool.go                | 489 +--------------
 lib/gat/interfaces.go                       |   8 +-
 lib/gat/stats.go                            |  56 +-
 8 files changed, 498 insertions(+), 1270 deletions(-)

diff --git a/README.md b/README.md
index 566844bd..aef9743d 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ i'll lyk when its done
 | Load balancing of read queries | :white_check_mark: | :white_check_mark: | Using random between replicas. Primary is included when `primary_reads_enabled` is enabled (default). |
 | Sharding | :white_check_mark: | no | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. |
 | Failover | :white_check_mark: | no | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. |
-| Statistics | :white_check_mark: | no | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. |
+| Statistics | :white_check_mark: | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. |
 | Live configuration reloading | :white_check_mark: | kind of | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. |
 | Client authentication | :white_check_mark: :wrench: | same as them | MD5 password authentication is supported, SCRAM is on the roadmap; one user is used to connect to Postgres with both SCRAM and MD5 supported. |
 | Admin database | :white_check_mark: | :white_check_mark: | The admin database, similar to PgBouncer's, allows to query for statistics and reload the configuration.
| diff --git a/lib/gat/admin/admin.go b/lib/gat/admin/admin.go index 5297630e..c0854cc9 100644 --- a/lib/gat/admin/admin.go +++ b/lib/gat/admin/admin.go @@ -7,6 +7,7 @@ import ( "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/parse" "strings" + "time" ) // The admin database, implemented through the gat.Pool interface, allowing it to be added to any existing Gat @@ -78,6 +79,350 @@ type Pool struct { connPool *ConnectionPool } +func (p *Pool) showStats(client gat.Client, totals, averages bool) error { + rowDesc := new(protocol.RowDescription) + rowDesc.Fields.Fields = []protocol.FieldsRowDescriptionFields{ + { + Name: "database", + DataType: DataType_String, + DataTypeSize: -1, + TypeModifier: -1, + }, + } + if totals { + rowDesc.Fields.Fields = append(rowDesc.Fields.Fields, + protocol.FieldsRowDescriptionFields{ + Name: "total_xact_count", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "total_query_count", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "total_received", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "total_sent", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "total_xact_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "total_query_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "total_wait_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }) + } + if averages { + rowDesc.Fields.Fields = append(rowDesc.Fields.Fields, + protocol.FieldsRowDescriptionFields{ + Name: "avg_xact_count", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "avg_query_count", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "avg_recv", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "avg_sent", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "avg_xact_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "avg_query_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + protocol.FieldsRowDescriptionFields{ + Name: "avg_wait_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }) + } + err := client.Send(rowDesc) + if err != nil { + return err + } + for name, pl := range p.gat.Pools() { + stats := pl.GetStats() + if stats == nil { + continue + } + row := new(protocol.DataRow) + row.Fields.Columns = []protocol.FieldsDataRowColumns{ + { + []byte(name), + }, + } + if totals { + row.Fields.Columns = append(row.Fields.Columns, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%d", stats.TotalXactCount())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%d", stats.TotalQueryCount())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%d", stats.TotalReceived())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%d", stats.TotalSent())), + }, + protocol.FieldsDataRowColumns{ + 
[]byte(fmt.Sprintf("%d", stats.TotalXactTime())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%d", stats.TotalQueryTime())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%d", stats.TotalWaitTime())), + }) + } + if averages { + row.Fields.Columns = append(row.Fields.Columns, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%f", stats.AvgXactCount())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%f", stats.AvgQueryCount())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%f", stats.AvgRecv())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%f", stats.AvgSent())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%f", stats.AvgXactTime())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%f", stats.AvgQueryTime())), + }, + protocol.FieldsDataRowColumns{ + []byte(fmt.Sprintf("%f", stats.AvgWaitTime())), + }) + } + err = client.Send(row) + if err != nil { + return err + } + } + return nil +} + +func (p *Pool) showTotals(client gat.Client) error { + rowDesc := new(protocol.RowDescription) + rowDesc.Fields.Fields = []protocol.FieldsRowDescriptionFields{ + { + Name: "total_xact_count", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_query_count", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_received", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_sent", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_xact_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_query_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "total_wait_time", + DataType: DataType_Int64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_xact_count", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_query_count", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_recv", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_sent", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_xact_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_query_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + { + Name: "avg_wait_time", + DataType: DataType_Float64, + DataTypeSize: 8, + TypeModifier: -1, + }, + } + err := client.Send(rowDesc) + if err != nil { + return err + } + + var totalXactCount, totalQueryCount, totalWaitCount, totalReceived, totalSent, totalXactTime, totalQueryTime, totalWaitTime int + var alive time.Duration + + for _, pl := range p.gat.Pools() { + stats := pl.GetStats() + if stats == nil { + continue + } + totalXactCount += stats.TotalXactCount() + totalQueryCount += stats.TotalQueryCount() + totalWaitCount += stats.TotalWaitCount() + totalReceived += stats.TotalReceived() + totalSent += stats.TotalSent() + totalXactTime += stats.TotalXactTime() + totalQueryTime += stats.TotalQueryTime() + totalWaitTime += stats.TotalWaitTime() + + active := stats.TimeActive() + if active > alive { + alive = active + } + } + + avgXactCount := float64(totalXactCount) / alive.Seconds() + avgQueryCount := float64(totalQueryCount) / alive.Seconds() + avgReceive := float64(totalReceived) / alive.Seconds() + avgSent := float64(totalSent) / 
alive.Seconds()
+	// guard against division by zero when no operations have been
+	// recorded yet, mirroring the guards in lib/gat/stats.go
+	var avgXactTime, avgQueryTime, avgWaitTime float64
+	if totalXactCount > 0 {
+		avgXactTime = float64(totalXactTime) / float64(totalXactCount)
+	}
+	if totalQueryCount > 0 {
+		avgQueryTime = float64(totalQueryTime) / float64(totalQueryCount)
+	}
+	if totalWaitCount > 0 {
+		avgWaitTime = float64(totalWaitTime) / float64(totalWaitCount)
+	}
+
+	row := new(protocol.DataRow)
+	row.Fields.Columns = []protocol.FieldsDataRowColumns{
+		{
+			[]byte(fmt.Sprintf("%d", totalXactCount)),
+		},
+		{
+			[]byte(fmt.Sprintf("%d", totalQueryCount)),
+		},
+		{
+			[]byte(fmt.Sprintf("%d", totalReceived)),
+		},
+		{
+			[]byte(fmt.Sprintf("%d", totalSent)),
+		},
+		{
+			[]byte(fmt.Sprintf("%d", totalXactTime)),
+		},
+		{
+			[]byte(fmt.Sprintf("%d", totalQueryTime)),
+		},
+		{
+			[]byte(fmt.Sprintf("%d", totalWaitTime)),
+		},
+		{
+			[]byte(fmt.Sprintf("%f", avgXactCount)),
+		},
+		{
+			[]byte(fmt.Sprintf("%f", avgQueryCount)),
+		},
+		{
+			[]byte(fmt.Sprintf("%f", avgReceive)),
+		},
+		{
+			[]byte(fmt.Sprintf("%f", avgSent)),
+		},
+		{
+			[]byte(fmt.Sprintf("%f", avgXactTime)),
+		},
+		{
+			[]byte(fmt.Sprintf("%f", avgQueryTime)),
+		},
+		{
+			[]byte(fmt.Sprintf("%f", avgWaitTime)),
+		},
+	}
+
+	return client.Send(row)
+}
+
 func NewPool(g gat.Gat) *Pool {
 	out := &Pool{
 		gat: g,
@@ -114,8 +459,8 @@ func (p *Pool) ConnectionPools() []gat.ConnectionPool {
 	}
 }
 
-func (p *Pool) Stats() *gat.PoolStats {
-	return nil // TODO
+func (p *Pool) GetStats() *gat.PoolStats {
+	return nil
 }
 
 func (p *Pool) EnsureConfig(c *config.Pool) {
@@ -136,7 +481,11 @@ func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus {
 	return getServerInfo(c.pool.gat)
 }
 
-func (c *ConnectionPool) Shards() []gat.Shard {
+func (c *ConnectionPool) GetPool() gat.Pool {
+	return c.pool
+}
+
+func (c *ConnectionPool) GetShards() []gat.Shard {
 	// this db is within gat, there are no shards
 	return nil
 }
@@ -167,170 +516,26 @@ func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, que
 	switch strings.ToLower(cmd.Arguments[0]) {
 	case "stats":
-		rowDesc := new(protocol.RowDescription)
-		rowDesc.Fields.Fields = []protocol.FieldsRowDescriptionFields{
-			{
-				Name: "database",
-				DataType: DataType_String,
-				DataTypeSize: -1,
-				TypeModifier: -1,
-			},
-			{
-				Name: "total_xact_count",
-				DataType: DataType_Int64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "total_query_count",
-				DataType: DataType_Int64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "total_received",
-				DataType: DataType_Int64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "total_sent",
-				DataType: DataType_Int64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "total_xact_time",
-				DataType: DataType_Int64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "total_query_time",
-				DataType: DataType_Int64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "total_wait_time",
-				DataType: DataType_Int64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "avg_xact_count",
-				DataType: DataType_Float64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "avg_query_count",
-				DataType: DataType_Float64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "avg_recv",
-				DataType: DataType_Float64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "avg_sent",
-				DataType: DataType_Float64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "avg_xact_time",
-				DataType: DataType_Float64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "avg_query_time",
-				DataType: DataType_Float64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-			{
-				Name: "avg_wait_time",
-				DataType: DataType_Float64,
-				DataTypeSize: 8,
-				TypeModifier: -1,
-			},
-		}
-		err =
client.Send(rowDesc) - if err != nil { - return err - } - for name, pool := range c.pool.gat.Pools() { - stats := pool.Stats() - if stats == nil { - continue - } - row := new(protocol.DataRow) - row.Fields.Columns = []protocol.FieldsDataRowColumns{ - { - []byte(name), - }, - { - []byte(fmt.Sprintf("%d", stats.TotalXactCount())), - }, - { - []byte(fmt.Sprintf("%d", stats.TotalQueryCount())), - }, - { - []byte(fmt.Sprintf("%d", stats.TotalReceived())), - }, - { - []byte(fmt.Sprintf("%d", stats.TotalSent())), - }, - { - []byte(fmt.Sprintf("%d", stats.TotalXactTime())), - }, - { - []byte(fmt.Sprintf("%d", stats.TotalQueryTime())), - }, - { - []byte(fmt.Sprintf("%d", stats.TotalWaitTime())), - }, - { - []byte(fmt.Sprintf("%f", stats.AvgXactCount())), - }, - { - []byte(fmt.Sprintf("%f", stats.AvgQueryCount())), - }, - { - []byte(fmt.Sprintf("%f", stats.AvgRecv())), - }, - { - []byte(fmt.Sprintf("%f", stats.AvgSent())), - }, - { - []byte(fmt.Sprintf("%f", stats.AvgXactTime())), - }, - { - []byte(fmt.Sprintf("%f", stats.AvgQueryTime())), - }, - { - []byte(fmt.Sprintf("%f", stats.AvgWaitTime())), - }, - } - err = client.Send(row) - if err != nil { - return err - } - } - done := new(protocol.CommandComplete) - done.Fields.Data = cmd.Command - err = client.Send(done) - if err != nil { - return err - } + err = c.pool.showStats(client, true, true) + case "stats_totals": + err = c.pool.showStats(client, true, false) + case "stats_averages": + err = c.pool.showStats(client, false, true) + case "totals": + err = c.pool.showTotals(client) default: return errors.New("unknown command") } + if err != nil { + return err + } + + done := new(protocol.CommandComplete) + done.Fields.Data = cmd.Command + err = client.Send(done) + if err != nil { + return err + } case "pause": case "disable": case "enable": diff --git a/lib/gat/gatling/client/client.go b/lib/gat/gatling/client/client.go index b2246063..aaa23518 100644 --- a/lib/gat/gatling/client/client.go +++ b/lib/gat/gatling/client/client.go @@ -23,14 +23,49 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) +type CountReader[T io.Reader] struct { + BytesRead atomic.Int64 + Reader T +} + +func NewCountReader[T io.Reader](reader T) *CountReader[T] { + return &CountReader[T]{ + Reader: reader, + } +} + +func (C *CountReader[T]) Read(p []byte) (n int, err error) { + n, err = C.Reader.Read(p) + C.BytesRead.Add(int64(n)) + return +} + +type CountWriter[T io.Writer] struct { + BytesWritten atomic.Int64 + Writer T +} + +func NewCountWriter[T io.Writer](writer T) *CountWriter[T] { + return &CountWriter[T]{ + Writer: writer, + } +} + +func (C *CountWriter[T]) Write(p []byte) (n int, err error) { + n, err = C.Writer.Write(p) + C.BytesWritten.Add(int64(n)) + return +} + // / client state, one per client type Client struct { conn net.Conn - r *bufio.Reader - wr *bufio.Writer + r *CountReader[*bufio.Reader] + wr *CountWriter[*bufio.Writer] recv chan protocol.Packet @@ -99,6 +134,10 @@ func (c *Client) RemotePid() int { return int(c.pid) } +func (c *Client) GetConnectionPool() gat.ConnectionPool { + return c.server +} + func NewClient( gatling gat.Gat, conf *config.Global, @@ -110,8 +149,8 @@ func NewClient( c := &Client{ conn: conn, - r: bufio.NewReader(conn), - wr: bufio.NewWriter(conn), + r: NewCountReader(bufio.NewReader(conn)), + wr: NewCountWriter(bufio.NewWriter(conn)), recv: make(chan protocol.Packet), pid: int32(pid.Int64()), secretKey: int32(skey.Int64()), @@ -163,7 +202,7 @@ func (c *Client) Accept(ctx context.Context) error { if err != nil { return err 
} - err = c.wr.Flush() + err = c.wr.Writer.Flush() if err != nil { return err } @@ -177,7 +216,7 @@ func (c *Client) Accept(ctx context.Context) error { if err != nil { return err } - err = c.wr.Flush() + err = c.wr.Writer.Flush() if err != nil { return err } @@ -192,8 +231,8 @@ func (c *Client) Accept(ctx context.Context) error { InsecureSkipVerify: true, } c.conn = tls.Server(c.conn, cfg) - c.r = bufio.NewReader(c.conn) - c.wr = bufio.NewWriter(c.conn) + c.r.Reader = bufio.NewReader(c.conn) + c.wr.Writer = bufio.NewWriter(c.conn) err = startup.Read(c.r) if err != nil { return err @@ -343,6 +382,12 @@ func (c *Client) Accept(ctx context.Context) error { return err } open, err = c.tick(ctx) + // add send and recv to pool + stats := c.server.GetPool().GetStats() + if stats != nil { + stats.AddTotalSent(int(c.wr.BytesWritten.Swap(0))) + stats.AddTotalReceived(int(c.r.BytesRead.Swap(0))) + } if !open { break } @@ -552,7 +597,7 @@ func (c *Client) Send(pkt protocol.Packet) error { func (c *Client) Flush() error { c.mu.Lock() defer c.mu.Unlock() - return c.wr.Flush() + return c.wr.Writer.Flush() } func (c *Client) Recv() <-chan protocol.Packet { @@ -560,589 +605,3 @@ func (c *Client) Recv() <-chan protocol.Packet { } var _ gat.Client = (*Client)(nil) - -func todo() { - // - // /// Handle cancel request. - // pub async fn cancel( - // read: S, - // write: T, - // addr: std::net::SocketAddr, - // mut bytes: BytesMut, // The rest of the startup message. - // client_server_map: ClientServerMap, - // shutdown: Receiver<()>, - // ) -> Result<Client<S, T>, Err> { - // let process_id = bytes.get_i32(); - // let secretKey = bytes.get_i32(); - // return Ok(Client { - // read: BufReader::new(read), - // write: write, - // addr, - // buffer: BytesMut::with_capacity(8196), - // cancelMode: true, - // transaction_mode: false, - // process_id, - // secretKey, - // client_server_map, - // parameters: HashMap::new(), - // stats: get_reporter(), - // admin: false, - // last_address_id: None, - // last_server_id: None, - // poolName: String::from("undefined"), - // username: String::from("undefined"), - // shutdown, - // connected_to_server: false, - // }); - // } - // - // /// Handle a connected and authenticated client. - // pub async fn handle(&mut self) -> Result<(), Err> { - // // The client wants to cancel a query it has issued previously. - // if self.cancelMode { - // trace!("Sending CancelRequest"); - // - // let (process_id, secretKey, address, port) = { - // let guard = self.client_server_map.lock(); - // - // match guard.get(&(self.process_id, self.secretKey)) { - // // Drop the mutex as soon as possible. - // // We found the server the client is using for its query - // // that it wants to cancel. - // Some((process_id, secretKey, address, port)) => ( - // process_id.clone(), - // secretKey.clone(), - // address.clone(), - // *port, - // ), - // - // // The client doesn't know / got the wrong server, - // // we're closing the connection for security reasons. - // None => return Ok(()), - // } - // }; - // - // // Opens a new separate connection to the server, sends the backend_id - // // and secretKey and then closes it for security reasons. No other interactions - // // take place. - // return Ok(Server::cancel(&address, port, process_id, secretKey).await?); - // } - // - // // The query router determines where the query is going to go, - // // e.g. primary, replica, which shard. - // let mut query_router = QueryRouter::new(); - // - // // Our custom protocol loop. 
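The CountReader and CountWriter wrappers added to client.go above are what feed total_received and total_sent. Below is a minimal, self-contained sketch of the same pattern; it is an illustration only, not code from this patch.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync/atomic"
)

// CountReader mirrors the wrapper in lib/gat/gatling/client/client.go: it
// forwards reads to the wrapped reader and tallies the bytes atomically.
type CountReader[T io.Reader] struct {
	BytesRead atomic.Int64
	Reader    T
}

func (c *CountReader[T]) Read(p []byte) (n int, err error) {
	n, err = c.Reader.Read(p)
	c.BytesRead.Add(int64(n))
	return
}

func main() {
	r := &CountReader[*bufio.Reader]{Reader: bufio.NewReader(strings.NewReader("hello world"))}

	buf := make([]byte, 5)
	_, _ = r.Read(buf)
	fmt.Println(r.BytesRead.Swap(0)) // 5; Swap drains the counter in one atomic step

	_, _ = io.ReadAll(r)
	fmt.Println(r.BytesRead.Swap(0)) // 6: only bytes read since the last Swap
}

The Swap(0) in the Accept loop is what makes the per-tick flush safe and cheap: reading and resetting the counter is a single atomic operation, so no bytes are counted twice even while the connection goroutine keeps reading and writing.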
- // // We expect the client to either start a transaction with regular queries - // // or issue commands for our sharding and server selection protocol. - // loop { - // trace!( - // "Client idle, waiting for message, transaction mode: {}", - // self.transaction_mode - // ); - // - // // Read a complete message from the client, which normally would be - // // either a `Q` (query) or `P` (prepare, extended protocol). - // // We can parse it here before grabbing a server from the pool, - // // in case the client is sending some custom protocol messages, e.g. - // // SET SHARDING KEY TO 'bigint'; - // - // let mut message = tokio::select! { - // _ = self.shutdown.recv() => { - // if !self.admin { - // error_response_terminal( - // &mut self.write, - // &format!("terminating connection due to administrator command") - // ).await?; - // return Ok(()) - // } - // - // // Admin clients ignore shutdown. - // else { - // read_message(&mut self.read).await? - // } - // }, - // message_result = read_message(&mut self.read) => message_result? - // }; - // - // // Avoid taking a server if the client just wants to disconnect. - // if message[0] as char == 'X' { - // debug!("Client disconnecting"); - // return Ok(()); - // } - // - // // Handle admin database queries. - // if self.admin { - // debug!("Handling admin command"); - // handle_admin(&mut self.write, message, self.client_server_map.clone()).await?; - // continue; - // } - // - // // Get a pool instance referenced by the most up-to-date - // // pointer. This ensures we always read the latest config - // // when starting a query. - // let pool = match get_pool(self.poolName.clone(), self.username.clone()) { - // Some(pool) => pool, - // None => { - // error_response( - // &mut self.write, - // &format!( - // "No pool configured for database: {:?}, user: {:?}", - // self.poolName, self.username - // ), - // ) - // .await?; - // return Err(Err::ClientError); - // } - // }; - // query_router.update_pool_settings(pool.settings.clone()); - // let current_shard = query_router.shard(); - // - // // Handle all custom protocol commands, if any. - // match query_router.try_execute_command(message.clone()) { - // // Normal query, not a custom command. - // None => { - // if query_router.query_parser_enabled() { - // query_router.infer_role(message.clone()); - // } - // } - // - // // SET SHARD TO - // Some((Command::SetShard, _)) => { - // // Selected shard is not configured. - // if query_router.shard() >= pool.shards() { - // // Set the shard back to what it was. 
- // query_router.set_shard(current_shard); - // - // error_response( - // &mut self.write, - // &format!( - // "shard {} is more than configured {}, staying on shard {}", - // query_router.shard(), - // pool.shards(), - // current_shard, - // ), - // ) - // .await?; - // } else { - // custom_protocol_response_ok(&mut self.write, "SET SHARD").await?; - // } - // continue; - // } - // - // // SET PRIMARY READS TO - // Some((Command::SetPrimaryReads, _)) => { - // custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?; - // continue; - // } - // - // // SET SHARDING KEY TO - // Some((Command::SetShardingKey, _)) => { - // custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?; - // continue; - // } - // - // // SET SERVER ROLE TO - // Some((Command::SetServerRole, _)) => { - // custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; - // continue; - // } - // - // // SHOW SERVER ROLE - // Some((Command::ShowServerRole, value)) => { - // show_response(&mut self.write, "server role", &value).await?; - // continue; - // } - // - // // SHOW SHARD - // Some((Command::ShowShard, value)) => { - // show_response(&mut self.write, "shard", &value).await?; - // continue; - // } - // - // // SHOW PRIMARY READS - // Some((Command::ShowPrimaryReads, value)) => { - // show_response(&mut self.write, "primary reads", &value).await?; - // continue; - // } - // }; - // - // debug!("Waiting for connection from pool"); - // - // // Grab a server from the pool. - // let connection = match pool - // .get(query_router.shard(), query_router.role(), self.process_id) - // .await - // { - // Ok(conn) => { - // debug!("Got connection from pool"); - // conn - // } - // Err(err) => { - // // Clients do not expect to get SystemError followed by ReadyForQuery in the middle - // // of extended protocol submission. So we will hold off on sending the actual error - // // message to the client until we get 'S' message - // match message[0] as char { - // 'P' | 'B' | 'E' | 'D' => (), - // _ => { - // error_response( - // &mut self.write, - // "could not get connection from the pool", - // ) - // .await?; - // } - // }; - // - // error!("Could not get connection from pool: {:?}", err); - // - // continue; - // } - // }; - // - // let mut reference = connection.0; - // let address = connection.1; - // let server = &mut *reference; - // - // // Server is assigned to the client in case the client wants to - // // cancel a query later. - // server.claim(self.process_id, self.secretKey); - // self.connected_to_server = true; - // - // // Update statistics. - // if let Some(last_address_id) = self.last_address_id { - // self.stats - // .client_disconnecting(self.process_id, last_address_id); - // } - // self.stats.client_active(self.process_id, address.id); - // - // self.last_address_id = Some(address.id); - // self.last_server_id = Some(server.process_id()); - // - // debug!( - // "Client {:?} talking to server {:?}", - // self.addr, - // server.address() - // ); - // - // // Set application_name if any. - // // TODO: investigate other parameters and set them too. - // if self.parameters.contains_key("application_name") { - // server - // .set_name(&self.parameters["application_name"]) - // .await?; - // } - // - // // Transaction loop. Multiple queries can be issued by the client here. - // // The connection belongs to the client until the transaction is over, - // // or until the client disconnects if we are in session mode. 
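The new GetConnectionPool and GetPool accessors exist so traffic can be credited to the right pool's stats. A hypothetical helper (poolStatsFor is not in the patch) showing the chain these interfaces enable, assuming the gfx.cafe/gfx/pggat/lib/gat import:

package example

import "gfx.cafe/gfx/pggat/lib/gat"

// poolStatsFor walks from a client to the stats of the pool it is attached
// to. The nil checks mirror how the patch treats the admin database, whose
// GetStats deliberately returns nil so it is skipped in stats reports.
func poolStatsFor(c gat.Client) *gat.PoolStats {
	cp := c.GetConnectionPool()
	if cp == nil {
		return nil
	}
	return cp.GetPool().GetStats()
}

This is the same chain the Accept loop uses, c.server.GetPool().GetStats(), to flush each client's byte counters into its pool after every tick.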
- // // - // // If the client is in session mode, no more custom protocol - // // commands will be accepted. - // loop { - // let mut message = if message.len() == 0 { - // trace!("Waiting for message inside transaction or in session mode"); - // - // match read_message(&mut self.read).await { - // Ok(message) => message, - // Err(err) => { - // // Client disconnected inside a transaction. - // // Clean up the server and re-use it. - // // This prevents connection thrashing by bad clients. - // if server.in_transaction() { - // server.query("ROLLBACK").await?; - // server.query("DISCARD ALL").await?; - // server.set_name("pgcat").await?; - // } - // - // return Err(err); - // } - // } - // } else { - // let msg = message.clone(); - // message.clear(); - // msg - // }; - // - // // The message will be forwarded to the server intact. We still would like to - // // parse it below to figure out what to do with it. - // let original = message.clone(); - // - // let code = message.get_u8() as char; - // let _len = message.get_i32() as usize; - // - // trace!("Message: {}", code); - // - // match code { - // // ReadyForQuery - // 'Q' => { - // debug!("Sending query to server"); - // - // self.send_and_receive_loop(code, original, server, &address, &pool) - // .await?; - // - // if !server.in_transaction() { - // // Report transaction executed statistics. - // self.stats.transaction(self.process_id, address.id); - // - // // Release server back to the pool if we are in transaction mode. - // // If we are in session mode, we keep the server until the client disconnects. - // if self.transaction_mode { - // break; - // } - // } - // } - // - // // Terminate - // 'X' => { - // // Client closing. Rollback and clean up - // // connection before releasing into the pool. - // // Pgbouncer closes the connection which leads to - // // connection thrashing when clients misbehave. - // if server.in_transaction() { - // server.query("ROLLBACK").await?; - // server.query("DISCARD ALL").await?; - // server.set_name("pgcat").await?; - // } - // - // self.release(); - // - // return Ok(()); - // } - // - // // Parse - // // The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`. - // 'P' => { - // self.buffer.put(&original[..]); - // } - // - // // Bind - // // The placeholder's replacements are here, e.g. 'user@email.com' and 'true' - // 'B' => { - // self.buffer.put(&original[..]); - // } - // - // // Describe - // // Command a client can issue to describe a previously prepared named statement. - // 'D' => { - // self.buffer.put(&original[..]); - // } - // - // // Execute - // // Execute a prepared statement prepared in `P` and bound in `B`. - // 'E' => { - // self.buffer.put(&original[..]); - // } - // - // // Sync - // // Frontend (client) is asking for the query result now. - // 'S' => { - // debug!("Sending query to server"); - // - // self.buffer.put(&original[..]); - // - // self.send_and_receive_loop( - // code, - // self.buffer.clone(), - // server, - // &address, - // &pool, - // ) - // .await?; - // - // self.buffer.clear(); - // - // if !server.in_transaction() { - // self.stats.transaction(self.process_id, address.id); - // - // // Release server back to the pool if we are in transaction mode. - // // If we are in session mode, we keep the server until the client disconnects. 
- // if self.transaction_mode { - // break; - // } - // } - // } - // - // // CopyData - // 'd' => { - // // Forward the data to the server, - // // don't buffer it since it can be rather large. - // self.send_server_message(server, original, &address, &pool) - // .await?; - // } - // - // // CopyDone or CopyFail - // // Copy is done, successfully or not. - // 'c' | 'f' => { - // self.send_server_message(server, original, &address, &pool) - // .await?; - // - // let response = self.receive_server_message(server, &address, &pool).await?; - // - // match write_all_half(&mut self.write, response).await { - // Ok(_) => (), - // Err(err) => { - // server.mark_bad(); - // return Err(err); - // } - // }; - // - // if !server.in_transaction() { - // self.stats.transaction(self.process_id, address.id); - // - // // Release server back to the pool if we are in transaction mode. - // // If we are in session mode, we keep the server until the client disconnects. - // if self.transaction_mode { - // break; - // } - // } - // } - // - // // Some unexpected message. We either did not implement the protocol correctly - // // or this is not a Postgres client we're talking to. - // _ => { - // error!("Unexpected code: {}", code); - // } - // } - // } - // - // // The server is no longer bound to us, we can't cancel it's queries anymore. - // debug!("Releasing server back into the pool"); - // self.stats.server_idle(server.process_id(), address.id); - // self.connected_to_server = false; - // self.release(); - // self.stats.client_idle(self.process_id, address.id); - // } - // } - // - // /// Release the server from the client: it can't cancel its queries anymore. - // pub fn release(&self) { - // let mut guard = self.client_server_map.lock(); - // guard.remove(&(self.process_id, self.secretKey)); - // } - // - // async fn send_and_receive_loop( - // &mut self, - // code: char, - // message: BytesMut, - // server: &mut Server, - // address: &Address, - // pool: &ConnectionPool, - // ) -> Result<(), Err> { - // debug!("Sending {} to server", code); - // - // self.send_server_message(server, message, &address, &pool) - // .await?; - // - // // Read all data the server has to offer, which can be multiple messages - // // buffered in 8196 bytes chunks. - // loop { - // let response = self.receive_server_message(server, &address, &pool).await?; - // - // match write_all_half(&mut self.write, response).await { - // Ok(_) => (), - // Err(err) => { - // server.mark_bad(); - // return Err(err); - // } - // }; - // - // if !server.is_data_available() { - // break; - // } - // } - // - // // Report query executed statistics. 
- // self.stats.query(self.process_id, address.id); - // - // Ok(()) - // } - // - // async fn send_server_message( - // &self, - // server: &mut Server, - // message: BytesMut, - // address: &Address, - // pool: &ConnectionPool, - // ) -> Result<(), Err> { - // match server.send(message).await { - // Ok(_) => Ok(()), - // Err(err) => { - // pool.ban(address, self.process_id); - // Err(err) - // } - // } - // } - // - // async fn receive_server_message( - // &mut self, - // server: &mut Server, - // address: &Address, - // pool: &ConnectionPool, - // ) -> Result<BytesMut, Err> { - // if pool.settings.user.statement_timeout > 0 { - // match tokio::time::timeout( - // tokio::time::Duration::from_millis(pool.settings.user.statement_timeout), - // server.recv(), - // ) - // .await - // { - // Ok(result) => match result { - // Ok(message) => Ok(message), - // Err(err) => { - // pool.ban(address, self.process_id); - // error_response_terminal( - // &mut self.write, - // &format!("error receiving data from server: {:?}", err), - // ) - // .await?; - // Err(err) - // } - // }, - // Err(_) => { - // error!( - // "Statement timeout while talking to {:?} with user {}", - // address, pool.settings.user.username - // ); - // server.mark_bad(); - // pool.ban(address, self.process_id); - // error_response_terminal(&mut self.write, "pool statement timeout").await?; - // Err(Err::StatementTimeout) - // } - // } - // } else { - // match server.recv().await { - // Ok(message) => Ok(message), - // Err(err) => { - // pool.ban(address, self.process_id); - // error_response_terminal( - // &mut self.write, - // &format!("error receiving data from server: {:?}", err), - // ) - // .await?; - // Err(err) - // } - // } - // } - // } - //} - // - //impl<S, T> Drop for Client<S, T> { - // fn drop(&mut self) { - // let mut guard = self.client_server_map.lock(); - // guard.remove(&(self.process_id, self.secretKey)); - // - // // Dirty shutdown - // // TODO: refactor, this is not the best way to handle state management. 
- // if let Some(address_id) = self.last_address_id { - // self.stats.client_disconnecting(self.process_id, address_id); - // - // if self.connected_to_server { - // if let Some(process_id) = self.last_server_id { - // self.stats.server_idle(process_id, address_id); - // } - // } - // } - // } - //} - -} diff --git a/lib/gat/gatling/pool/conn_pool/conn_pool.go b/lib/gat/gatling/pool/conn_pool/conn_pool.go index ffb1eed5..697c6205 100644 --- a/lib/gat/gatling/pool/conn_pool/conn_pool.go +++ b/lib/gat/gatling/pool/conn_pool/conn_pool.go @@ -2,13 +2,13 @@ package conn_pool import ( "context" - "reflect" - "runtime" - "sync" - "gfx.cafe/gfx/pggat/lib/config" "gfx.cafe/gfx/pggat/lib/gat" "gfx.cafe/gfx/pggat/lib/gat/protocol" + "reflect" + "runtime" + "sync" + "time" ) type ConnectionPool struct { @@ -38,7 +38,15 @@ func NewConnectionPool(pool gat.Pool, conf *config.Pool, user *config.User) *Con return p } +func (c *ConnectionPool) GetPool() gat.Pool { + return c.pool +} + func (c *ConnectionPool) getWorker() *worker { + start := time.Now() + defer func() { + c.pool.GetStats().AddWaitTime(int(time.Now().Sub(start).Microseconds())) + }() select { case w := <-c.workerPool: return w @@ -84,7 +92,7 @@ func (c *ConnectionPool) GetServerInfo() []*protocol.ParameterStatus { return c.getWorker().GetServerInfo() } -func (c *ConnectionPool) Shards() []gat.Shard { +func (c *ConnectionPool) GetShards() []gat.Shard { var shards []gat.Shard c.mu.Lock() defer c.mu.Unlock() @@ -103,12 +111,10 @@ func (c *ConnectionPool) Execute(ctx context.Context, client gat.Client, e *prot } func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, q string) error { - defer c.pool.Stats().IncQueryCount() return c.getWorker().HandleSimpleQuery(ctx, client, q) } func (c *ConnectionPool) Transaction(ctx context.Context, client gat.Client, q string) error { - defer c.pool.Stats().IncXactCount() return c.getWorker().HandleTransaction(ctx, client, q) } diff --git a/lib/gat/gatling/pool/conn_pool/worker.go b/lib/gat/gatling/pool/conn_pool/worker.go index 592033c7..930f62b4 100644 --- a/lib/gat/gatling/pool/conn_pool/worker.go +++ b/lib/gat/gatling/pool/conn_pool/worker.go @@ -9,6 +9,7 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/protocol" "gfx.cafe/gfx/pggat/lib/gat/protocol/pg_error" "sync" + "time" ) // a single use worker with an embedded connection pool. @@ -165,6 +166,11 @@ func (w *worker) HandleFunction(ctx context.Context, c gat.Client, fn *protocol. 
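The getWorker change in conn_pool.go above wraps the whole worker acquisition in a timer. A sketch of the same pattern, written as if it lived in the conn_pool package (measureWait itself is not in the patch); note that time.Since(start) is the idiomatic spelling of the patch's time.Now().Sub(start):

package conn_pool

import (
	"time"

	"gfx.cafe/gfx/pggat/lib/gat"
)

// measureWait records how long the caller spent waiting for a worker. The
// deferred closure fires only after acquire returns, so the recorded
// microseconds cover the full queue time, including any select blocking.
func measureWait(stats *gat.PoolStats, acquire func() *worker) *worker {
	start := time.Now()
	defer func() {
		stats.AddWaitTime(int(time.Since(start).Microseconds()))
	}()
	return acquire()
}

Because AddWaitTime also bumps waitCount, every acquisition counts as exactly one wait; the deferred closure assumes GetStats returns non-nil, which holds for gatling pools, while the admin database ships its own ConnectionPool and never takes this path.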
func (w *worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query string) error { defer w.ret() + start := time.Now() + defer func() { + w.w.pool.GetStats().AddQueryTime(int(time.Now().Sub(start).Microseconds())) + }() + errch := make(chan error) go func() { defer close(errch) @@ -186,6 +192,11 @@ func (w *worker) HandleSimpleQuery(ctx context.Context, c gat.Client, query stri func (w *worker) HandleTransaction(ctx context.Context, c gat.Client, query string) error { defer w.ret() + start := time.Now() + defer func() { + w.w.pool.GetStats().AddXactTime(int(time.Now().Sub(start).Microseconds())) + }() + errch := make(chan error) go func() { defer close(errch) diff --git a/lib/gat/gatling/pool/pool.go b/lib/gat/gatling/pool/pool.go index 60fb0588..4b1d91e1 100644 --- a/lib/gat/gatling/pool/pool.go +++ b/lib/gat/gatling/pool/pool.go @@ -86,495 +86,8 @@ func (p *Pool) ConnectionPools() []gat.ConnectionPool { return out } -func (p *Pool) Stats() *gat.PoolStats { +func (p *Pool) GetStats() *gat.PoolStats { return p.stats } var _ gat.Pool = (*Pool)(nil) - -//TODO: implement server pool -//#[async_trait] -//impl ManageConnection for ServerPool { -// type Connection = Server; -// type Err = Err; -// -// /// Attempts to create a new connection. -// async fn connect(&self) -> Result<Self::Connection, Self::Err> { -// info!( -// "Creating a new connection to {:?} using user {:?}", -// self.address.name(), -// self.user.username -// ); -// -// // Put a temporary process_id into the stats -// // for server login. -// let process_id = rand::random::<i32>(); -// self.stats.server_login(process_id, self.address.id); -// -// // Connect to the PostgreSQL server. -// match Server::startup( -// &self.address, -// &self.user, -// &self.database, -// self.client_server_map.clone(), -// self.stats.clone(), -// ) -// .await -// { -// Ok(conn) => { -// // Remove the temporary process_id from the stats. -// self.stats.server_disconnecting(process_id, self.address.id); -// Ok(conn) -// } -// Err(err) => { -// // Remove the temporary process_id from the stats. -// self.stats.server_disconnecting(process_id, self.address.id); -// Err(err) -// } -// } -// } -// -// /// Determines if the connection is still connected to the database. -// async fn is_valid(&self, _conn: &mut PooledConnection<'_, Self>) -> Result<(), Self::Err> { -// Ok(()) -// } -// -// /// Synchronously determine if the connection is no longer usable, if possible. -// fn has_broken(&self, conn: &mut Self::Connection) -> bool { -// conn.is_bad() -// } -//} -// -///// Get the connection pool -//pub fn get_pool(db: String, user: String) -> Option<ConnectionPool> { -// match get_all_pools().get(&(db, user)) { -// Some(pool) => Some(pool.clone()), -// None => None, -// } -//} -// -///// How many total servers we have in the config. -//pub fn get_number_of_addresses() -> usize { -// get_all_pools() -// .iter() -// .map(|(_, pool)| pool.databases()) -// .sum() -//} -// -///// Get a pointer to all configured pools. -//pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> { -// return (*(*POOLS.load())).clone(); -//} - -//TODO: implement this -// /// Construct the connection pool from the configuration. -// func (c *ConnectionPool) from_config(client_server_map: ClientServerMap) Result<(), Err> { -// let config = get_config() -// -// new_pools = HashMap::new() -// address_id = 0 -// -// for (pool_name, pool_config) in &config.pools { -// // There is one pool per database/user pair. 
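HandleSimpleQuery and HandleTransaction above now time each operation, and the matching IncQueryCount/IncXactCount calls were dropped from conn_pool.go because AddQueryTime and AddXactTime bump the count together with the duration, keeping the averages consistent by construction. A tiny standalone demo against lib/gat/stats.go:

package main

import (
	"fmt"

	"gfx.cafe/gfx/pggat/lib/gat"
)

func main() {
	s := gat.NewPoolStats()
	s.AddQueryTime(2000) // records one query taking 2000µs
	s.AddQueryTime(4000) // and another taking 4000µs

	fmt.Println(s.TotalQueryCount()) // 2: the count moved with the time
	fmt.Println(s.AvgQueryTime())    // 3000: total µs / total queries
}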
-// for (_, user) in &pool_config.users { -// shards = Vec::new() -// addresses = Vec::new() -// banlist = Vec::new() -// shard_ids = pool_config -// .shards -// .clone() -// .into_keys() -// .map(|x| x.to_string()) -// .collect::<Vec<string>>() -// -// // Sort by shard number to ensure consistency. -// shard_ids.sort_by_key(|k| k.parse::<i64>().unwrap()) -// -// for shard_idx in &shard_ids { -// let shard = &pool_config.shards[shard_idx] -// pools = Vec::new() -// servers = Vec::new() -// address_index = 0 -// replica_number = 0 -// -// for server in shard.servers.iter() { -// let role = match server.2.as_ref() { -// "primary" => Role::Primary, -// "replica" => Role::Replica, -// _ => { -// error!("Config error: server role can be 'primary' or 'replica', have: '{}'. Defaulting to 'replica'.", server.2) -// Role::Replica -// } -// } -// -// let address = Address { -// id: address_id, -// database: shard.database.clone(), -// host: server.0.clone(), -// port: server.1 as u16, -// role: role, -// address_index, -// replica_number, -// shard: shard_idx.parse::<usize>().unwrap(), -// username: user.username.clone(), -// pool_name: pool_name.clone(), -// } -// -// address_id += 1 -// address_index += 1 -// -// if role == Role::Replica { -// replica_number += 1 -// } -// -// let manager = ServerPool::new( -// address.clone(), -// user.clone(), -// &shard.database, -// client_server_map.clone(), -// get_reporter(), -// ) -// -// let pool = Pool::builder() -// .max_size(user.pool_size) -// .connection_timeout(std::time::Duration::from_millis( -// config.general.connect_timeout, -// )) -// .test_on_check_out(false) -// .build(manager) -// .await -// .unwrap() -// -// pools.push(pool) -// servers.push(address) -// } -// -// shards.push(pools) -// addresses.push(servers) -// banlist.push(HashMap::new()) -// } -// -// assert_eq!(shards.len(), addresses.len()) -// -// pool = ConnectionPool { -// databases: shards, -// addresses: addresses, -// banlist: Arc::new(RwLock::new(banlist)), -// stats: get_reporter(), -// server_info: BytesMut::new(), -// settings: PoolSettings { -// pool_mode: match pool_config.pool_mode.as_str() { -// "transaction" => PoolMode::Transaction, -// "session" => PoolMode::Session, -// _ => unreachable!(), -// }, -// // shards: pool_config.shards.clone(), -// shards: shard_ids.len(), -// user: user.clone(), -// default_role: match pool_config.default_role.as_str() { -// "any" => None, -// "replica" => Some(Role::Replica), -// "primary" => Some(Role::Primary), -// _ => unreachable!(), -// }, -// query_parser_enabled: pool_config.query_parser_enabled.clone(), -// primary_reads_enabled: pool_config.primary_reads_enabled, -// sharding_function: match pool_config.sharding_function.as_str() { -// "pg_bigint_hash" => ShardingFunction::PgBigintHash, -// "sha1" => ShardingFunction::Sha1, -// _ => unreachable!(), -// }, -// }, -// } -// -// // Connect to the servers to make sure pool configuration is valid -// // before setting it globally. -// match pool.validate().await { -// Ok(_) => (), -// Err(err) => { -// error!("Could not validate connection pool: {:?}", err) -// return Err(err) -// } -// } -// -// // There is one pool per database/user pair. -// new_pools.insert((pool_name.clone(), user.username.clone()), pool) -// } -// } -// -// POOLS.store(Arc::new(new_pools.clone())) -// -// Ok(()) -// } -// -// /// Connect to all shards and grab server information. -// /// Return server information we will pass to the clients -// /// when they connect. 
-// /// This also warms up the pool for clients that connect when -// /// the pooler starts up. -// async fn validate(&mut self) Result<(), Err> { -// server_infos = Vec::new() -// for shard in 0..self.shards() { -// for server in 0..self.servers(shard) { -// let connection = match self.databases[shard][server].get().await { -// Ok(conn) => conn, -// Err(err) => { -// error!("Shard {} down or misconfigured: {:?}", shard, err) -// continue -// } -// } -// -// let proxy = connection -// let server = &*proxy -// let server_info = server.server_info() -// -// if server_infos.len() > 0 { -// // Compare against the last server checked. -// if server_info != server_infos[server_infos.len() - 1] { -// warn!( -// "{:?} has different server configuration than the last server", -// proxy.address() -// ) -// } -// } -// -// server_infos.push(server_info) -// } -// } -// -// // TODO: compare server information to make sure -// // all shards are running identical configurations. -// if server_infos.len() == 0 { -// return Err(Err::AllServersDown) -// } -// -// // We're assuming all servers are identical. -// // TODO: not true. -// self.server_info = server_infos[0].clone() -// -// Ok(()) -// } -// -// /// Get a connection from the pool. -// func (c *ConnectionPool) get( -// &self, -// shard: usize, // shard number -// role: Option<Role>, // primary or replica -// process_id: i32, // client id -// ) Result<(PooledConnection<'_, ServerPool>, Address), Err> { -// let now = Instant::now() -// candidates: Vec<&Address> = self.addresses[shard] -// .iter() -// .filter(|address| address.role == role) -// .collect() -// -// // Random load balancing -// candidates.shuffle(&mut thread_rng()) -// -// let healthcheck_timeout = get_config().general.healthcheck_timeout -// let healthcheck_delay = get_config().general.healthcheck_delay as u128 -// -// while !candidates.is_empty() { -// // Get the next candidate -// let address = match candidates.pop() { -// Some(address) => address, -// None => break, -// } -// -// if self.is_banned(&address, role) { -// debug!("Address {:?} is banned", address) -// continue -// } -// -// // Indicate we're waiting on a server connection from a pool. -// self.stats.client_waiting(process_id, address.id) -// -// // Check if we can connect -// conn = match self.databases[address.shard][address.address_index] -// .get() -// .await -// { -// Ok(conn) => conn, -// Err(err) => { -// error!("Banning instance {:?}, error: {:?}", address, err) -// self.ban(&address, process_id) -// self.stats -// .checkout_time(now.elapsed().as_micros(), process_id, address.id) -// continue -// } -// } -// -// // // Check if this server is alive with a health check. -// let server = &mut *conn -// -// // Will return error if timestamp is greater than current system time, which it should never be set to -// let require_healthcheck = -// server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay -// -// // Do not issue a health check unless it's been a little while -// // since we last checked the server is ok. -// // Health checks are pretty expensive. 
-// if !require_healthcheck { -// self.stats -// .checkout_time(now.elapsed().as_micros(), process_id, address.id) -// self.stats.server_active(conn.process_id(), address.id) -// return Ok((conn, address.clone())) -// } -// -// debug!("Running health check on server {:?}", address) -// -// self.stats.server_tested(server.process_id(), address.id) -// -// match tokio::time::timeout( -// tokio::time::Duration::from_millis(healthcheck_timeout), -// server.query(""), // Cheap query (query parser not used in PG) -// ) -// .await -// { -// // Check if health check succeeded. -// Ok(res) => match res { -// Ok(_) => { -// self.stats -// .checkout_time(now.elapsed().as_micros(), process_id, address.id) -// self.stats.server_active(conn.process_id(), address.id) -// return Ok((conn, address.clone())) -// } -// -// // Health check failed. -// Err(err) => { -// error!( -// "Banning instance {:?} because of failed health check, {:?}", -// address, err -// ) -// -// // Don't leave a bad connection in the pool. -// server.mark_bad() -// -// self.ban(&address, process_id) -// continue -// } -// }, -// -// // Health check timed out. -// Err(err) => { -// error!( -// "Banning instance {:?} because of health check timeout, {:?}", -// address, err -// ) -// // Don't leave a bad connection in the pool. -// server.mark_bad() -// -// self.ban(&address, process_id) -// continue -// } -// } -// } -// -// Err(Err::AllServersDown) -// } -// -// /// Ban an address (i.e. replica). It no longer will serve -// /// traffic for any new transactions. Existing transactions on that replica -// /// will finish successfully or error out to the clients. -// func (c *ConnectionPool) ban(&self, address: &Address, process_id: i32) { -// self.stats.client_disconnecting(process_id, address.id) -// -// error!("Banning {:?}", address) -// -// let now = chrono::offset::Utc::now().naive_utc() -// guard = self.banlist.write() -// guard[address.shard].insert(address.clone(), now) -// } -// -// /// Clear the replica to receive traffic again. Takes effect immediately -// /// for all new transactions. -// func (c *ConnectionPool) _unban(&self, address: &Address) { -// guard = self.banlist.write() -// guard[address.shard].remove(address) -// } -// -// /// Check if a replica can serve traffic. If all replicas are banned, -// /// we unban all of them. Better to try then not to. -// func (c *ConnectionPool) is_banned(&self, address: &Address, role: Option<Role>) bool { -// let replicas_available = match role { -// Some(Role::Replica) => self.addresses[address.shard] -// .iter() -// .filter(|addr| addr.role == Role::Replica) -// .count(), -// None => self.addresses[address.shard].len(), -// Some(Role::Primary) => return false, // Primary cannot be banned. -// } -// -// debug!("Available targets for {:?}: {}", role, replicas_available) -// -// let guard = self.banlist.read() -// -// // Everything is banned = nothing is banned. -// if guard[address.shard].len() == replicas_available { -// drop(guard) -// guard = self.banlist.write() -// guard[address.shard].clear() -// drop(guard) -// warn!("Unbanning all replicas.") -// return false -// } -// -// // I expect this to miss 99.9999% of the time. -// match guard[address.shard].get(address) { -// Some(timestamp) => { -// let now = chrono::offset::Utc::now().naive_utc() -// let config = get_config() -// -// // Ban expired. 
-// if now.timestamp() - timestamp.timestamp() > config.general.ban_time { -// drop(guard) -// warn!("Unbanning {:?}", address) -// guard = self.banlist.write() -// guard[address.shard].remove(address) -// false -// } else { -// debug!("{:?} is banned", address) -// true -// } -// } -// -// None => { -// debug!("{:?} is ok", address) -// false -// } -// } -// } -// -// /// Get the number of configured shards. -// func (c *ConnectionPool) shards(&self) usize { -// self.databases.len() -// } -// -// /// Get the number of servers (primary and replicas) -// /// configured for a shard. -// func (c *ConnectionPool) servers(&self, shard: usize) usize { -// self.addresses[shard].len() -// } -// -// /// Get the total number of servers (databases) we are connected to. -// func (c *ConnectionPool) databases(&self) usize { -// databases = 0 -// for shard in 0..self.shards() { -// databases += self.servers(shard) -// } -// databases -// } -// -// /// Get pool state for a particular shard server as reported by bb8. -// func (c *ConnectionPool) pool_state(&self, shard: usize, server: usize) bb8::State { -// self.databases[shard][server].state() -// } -// -// /// Get the address information for a shard server. -// func (c *ConnectionPool) address(&self, shard: usize, server: usize) &Address { -// &self.addresses[shard][server] -// } -// -// func (c *ConnectionPool) server_info(&self) BytesMut { -// self.server_info.clone() -// } diff --git a/lib/gat/interfaces.go b/lib/gat/interfaces.go index 07c9181a..c682e663 100644 --- a/lib/gat/interfaces.go +++ b/lib/gat/interfaces.go @@ -19,6 +19,8 @@ type Client interface { GetCurrentConn() (Connection, error) SetCurrentConn(conn Connection) + GetConnectionPool() ConnectionPool + State() string Addr() string Port() int @@ -52,7 +54,7 @@ type Pool interface { WithUser(name string) (ConnectionPool, error) ConnectionPools() []ConnectionPool - Stats() *PoolStats + GetStats() *PoolStats EnsureConfig(c *config.Pool) } @@ -65,7 +67,9 @@ type ConnectionPool interface { GetUser() *config.User GetServerInfo() []*protocol.ParameterStatus - Shards() []Shard + GetPool() Pool + + GetShards() []Shard EnsureConfig(c *config.Pool) diff --git a/lib/gat/stats.go b/lib/gat/stats.go index 31098212..3b07dede 100644 --- a/lib/gat/stats.go +++ b/lib/gat/stats.go @@ -1,6 +1,8 @@ package gat -import "time" +import ( + "time" +) type PoolStats struct { start time.Time @@ -21,74 +23,102 @@ func NewPoolStats() *PoolStats { } } -func (s *PoolStats) TotalXactCount() int { - return s.xactCount +func (s *PoolStats) TimeActive() time.Duration { + return time.Now().Sub(s.start) } -func (s *PoolStats) IncXactCount() { - s.xactCount += 1 +func (s *PoolStats) TotalXactCount() int { + return s.xactCount } func (s *PoolStats) TotalQueryCount() int { return s.queryCount } -func (s *PoolStats) IncQueryCount() { - s.queryCount += 1 +func (s *PoolStats) TotalWaitCount() int { + return s.waitCount } func (s *PoolStats) TotalReceived() int { return s.received } +func (s *PoolStats) AddTotalReceived(amount int) { + s.received += amount +} + func (s *PoolStats) TotalSent() int { return s.sent } +func (s *PoolStats) AddTotalSent(amount int) { + s.sent += amount +} + func (s *PoolStats) TotalXactTime() int { return s.xactTime } +func (s *PoolStats) AddXactTime(time int) { + s.xactCount += 1 + s.xactTime += time +} + func (s *PoolStats) TotalQueryTime() int { return s.queryTime } +func (s *PoolStats) AddQueryTime(time int) { + s.queryCount += 1 + s.queryTime += time +} + func (s *PoolStats) TotalWaitTime() int { 
 	return s.waitTime
 }
 
-func (s *PoolStats) totalTime() time.Duration {
-	return time.Now().Sub(s.start)
+func (s *PoolStats) AddWaitTime(time int) {
+	s.waitCount += 1
+	s.waitTime += time
 }
 
 func (s *PoolStats) AvgXactCount() float64 {
-	seconds := s.totalTime().Seconds()
+	seconds := s.TimeActive().Seconds()
 	return float64(s.xactCount) / seconds
 }
 
 func (s *PoolStats) AvgQueryCount() float64 {
-	seconds := s.totalTime().Seconds()
+	seconds := s.TimeActive().Seconds()
 	return float64(s.queryCount) / seconds
 }
 
 func (s *PoolStats) AvgRecv() float64 {
-	seconds := s.totalTime().Seconds()
+	seconds := s.TimeActive().Seconds()
 	return float64(s.received) / seconds
 }
 
 func (s *PoolStats) AvgSent() float64 {
-	seconds := s.totalTime().Seconds()
+	seconds := s.TimeActive().Seconds()
 	return float64(s.sent) / seconds
 }
 
 func (s *PoolStats) AvgXactTime() float64 {
+	if s.xactCount == 0 {
+		return 0
+	}
 	return float64(s.xactTime) / float64(s.xactCount)
 }
 
 func (s *PoolStats) AvgQueryTime() float64 {
+	if s.queryCount == 0 {
+		return 0
+	}
 	return float64(s.queryTime) / float64(s.queryCount)
 }
 
 func (s *PoolStats) AvgWaitTime() float64 {
+	if s.waitCount == 0 {
+		return 0
+	}
 	return float64(s.waitTime) / float64(s.waitCount)
 }
--
GitLab
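One caveat, an observation rather than something this patch addresses: the PoolStats fields are plain ints mutated with += from client and worker goroutines, while the byte counters in client.go already use sync/atomic (which the codebase can rely on, since atomic.Int64 requires Go 1.19+ and the patch uses it). If a single pool's stats can be updated concurrently, the same atomic types would make these counters race-free. A minimal sketch under that assumption, with only the query counters shown:

package example

import "sync/atomic"

// atomicStats is a hypothetical race-free variant of PoolStats' query fields.
type atomicStats struct {
	queryCount atomic.Int64
	queryTime  atomic.Int64 // microseconds, matching AddQueryTime in the patch
}

func (s *atomicStats) AddQueryTime(micros int64) {
	s.queryCount.Add(1) // the count travels with the sample, as in PoolStats
	s.queryTime.Add(micros)
}

func (s *atomicStats) AvgQueryTime() float64 {
	n := s.queryCount.Load()
	if n == 0 {
		return 0 // same zero guard the patch adds to AvgQueryTime
	}
	return float64(s.queryTime.Load()) / float64(n)
}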