diff --git a/README.md b/README.md index aef9743d00944616ed7f7eaf59723df6f9223cc3..0b1c69c5e17ad88f6a8f97c3c2178f7e69d59def 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ i'll lyk when its done | Query cancellation | :white_check_mark: | :white_check_mark: | Supported both in transaction and session pooling modes. | | Load balancing of read queries | :white_check_mark: | :white_check_mark: | Using random between replicas. Primary is included when `primary_reads_enabled` is enabled (default). | | Sharding | :white_check_mark: | no | Transactions are sharded using `SET SHARD TO` and `SET SHARDING KEY TO` syntax extensions; see examples below. | -| Failover | :white_check_mark: | no | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. | +| Failover | :white_check_mark: | :white_check_mark: | Replicas are tested with a health check. If a health check fails, remaining replicas are attempted; see below for algorithm description and examples. | | Statistics | :white_check_mark: | :white_check_mark: | Statistics available in the admin database (`pgcat` and `pgbouncer`) with `SHOW STATS`, `SHOW POOLS` and others. | | Live configuration reloading | :white_check_mark: | kind of | Reload supported settings with a `SIGHUP` to the process, e.g. `kill -s SIGHUP $(pgrep pgcat)` or `RELOAD` query issued to the admin database. | | Client authentication | :white_check_mark: :wrench: | same as them | MD5 password authentication is supported, SCRAM is on the roadmap; one user is used to connect to Postgres with both SCRAM and MD5 supported. | diff --git a/lib/gat/admin/admin.go b/lib/gat/admin/admin.go index 7363c911551d95fee2d2d8b5fcbf7451d5c80e4c..c2d659d7146da8027e0d12c8150d03b6039ad3af 100644 --- a/lib/gat/admin/admin.go +++ b/lib/gat/admin/admin.go @@ -523,19 +523,23 @@ func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, que err = c.pool.showStats(client, false, true) case "totals": err = c.pool.showTotals(client) + case "servers": + case "clients": + case "pools": + case "lists": + case "users": + case "databases": + case "fds": + case "sockets", "active_sockets": + case "config": + case "mem": + case "dns_hosts": + case "dns_zones": + case "version": + default: return errors.New("unknown command") } - if err != nil { - return err - } - - done := new(protocol.CommandComplete) - done.Fields.Data = cmd.Command - err = client.Send(done) - if err != nil { - return err - } case "pause": case "disable": case "enable": @@ -550,6 +554,16 @@ func (c *ConnectionPool) SimpleQuery(ctx context.Context, client gat.Client, que default: return errors.New("unknown command") } + + if err != nil { + return err + } + done := new(protocol.CommandComplete) + done.Fields.Data = cmd.Command + err = client.Send(done) + if err != nil { + return err + } } return nil } diff --git a/lib/gat/gatling/pool/conn_pool/shard/server/server.go b/lib/gat/gatling/pool/conn_pool/shard/server/server.go index 047cb4d3368ec44af87edb22a8bd34e0f23efe0e..c2b77f1a89624f80b97d9a7b37ffc2a8c8b03ac8 100644 --- a/lib/gat/gatling/pool/conn_pool/shard/server/server.go +++ b/lib/gat/gatling/pool/conn_pool/shard/server/server.go @@ -50,6 +50,8 @@ type Server struct { dbpass string user config.User + healthy bool + log zlog.Logger mu sync.Mutex @@ -147,8 +149,50 @@ func (s *Server) GetRequestTime() time.Time { return s.lastActivity } +func (s *Server) failHealthCheck(err error) { + log.Println("Server failed a health check!!!!", err) + s.healthy = false +} + +func (s *Server) healthCheck() { + check := new(protocol.Query) + check.Fields.Query = "select 1" + err := s.writePacket(check) + if err != nil { + s.failHealthCheck(err) + return + } + err = s.flush() + if err != nil { + s.failHealthCheck(err) + return + } + + // read until we get a ready for query + for { + var recv protocol.Packet + recv, err = s.readPacket() + if err != nil { + s.failHealthCheck(err) + return + } + + switch r := recv.(type) { + case *protocol.ReadyForQuery: + if r.Fields.Status != 'I' { + s.failHealthCheck(fmt.Errorf("expected server to be in command mode but it isn't")) + } + return + case *protocol.DataRow, *protocol.RowDescription, *protocol.CommandComplete: + default: + s.failHealthCheck(fmt.Errorf("expected a Simple Query packet but server sent %#v", recv)) + return + } + } +} + func (s *Server) IsCloseNeeded() bool { - return false + return !s.healthy } func (s *Server) GetClient() gat.Client { @@ -164,6 +208,8 @@ func (s *Server) SetClient(client gat.Client) { if client != nil { s.state = gat.ConnectionActive } else { + // client no longer needs this connection, perform a health check + s.healthCheck() s.state = gat.ConnectionIdle } s.client = client @@ -325,6 +371,7 @@ func (s *Server) forwardTo(client gat.Client, predicate func(pkt protocol.Packet } func (s *Server) writePacket(pkt protocol.Packet) error { + //log.Printf("out %#v", pkt) _, err := pkt.Write(s.wr) return err } @@ -334,7 +381,9 @@ func (s *Server) flush() error { } func (s *Server) readPacket() (protocol.Packet, error) { - return protocol.ReadBackend(s.r) + p, err := protocol.ReadBackend(s.r) + //log.Printf("in %#v", p) + return p, err } func (s *Server) ensurePreparedStatement(client gat.Client, name string) error { @@ -572,7 +621,7 @@ func (s *Server) Transaction(ctx context.Context, client gat.Client, query strin if err != nil { end := new(protocol.Query) - end.Fields.Query = "END;" + end.Fields.Query = "END" _ = s.writePacket(end) _ = s.flush() } diff --git a/lib/gat/interfaces.go b/lib/gat/interfaces.go index b556f8f3e3d7cf29207c851419d9467eb78fc4a1..6c00bc7524fafd4e753b867c6756d27c8568f26f 100644 --- a/lib/gat/interfaces.go +++ b/lib/gat/interfaces.go @@ -112,12 +112,14 @@ type Connection interface { GetLocalAddress() net.Addr GetConnectTime() time.Time GetRequestTime() time.Time - IsCloseNeeded() bool GetClient() Client SetClient(client Client) GetRemotePid() int GetTLS() string + // IsCloseNeeded returns whether this connection failed a health check + IsCloseNeeded() bool + // actions Describe(client Client, payload *protocol.Describe) error Execute(client Client, payload *protocol.Execute) error