From 2a05ed4e66a4a03a2cf3d6c543bc40c0c9c7157d Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Tue, 17 Oct 2023 20:08:26 -0500 Subject: [PATCH] amazing --- lib/fed/conn.go | 7 +- lib/gat/handlers/pool/pools/hybrid/pool.go | 103 ++++++++++++++++++++- 2 files changed, 106 insertions(+), 4 deletions(-) diff --git a/lib/fed/conn.go b/lib/fed/conn.go index aa80ac65..9ea71495 100644 --- a/lib/fed/conn.go +++ b/lib/fed/conn.go @@ -19,14 +19,15 @@ type Conn struct { Middleware []Middleware - SSL bool - Ready bool + SSL bool User string Database string InitialParameters map[strutil.CIString]string - Authenticated bool BackendKey BackendKey + + Authenticated bool + Ready bool } func NewConn(rw net.Conn) *Conn { diff --git a/lib/gat/handlers/pool/pools/hybrid/pool.go b/lib/gat/handlers/pool/pools/hybrid/pool.go index 306bade7..221ba276 100644 --- a/lib/gat/handlers/pool/pools/hybrid/pool.go +++ b/lib/gat/handlers/pool/pools/hybrid/pool.go @@ -14,6 +14,7 @@ import ( "gfx.cafe/gfx/pggat/lib/gat/handlers/pool" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool/spool" "gfx.cafe/gfx/pggat/lib/gat/metrics" + "gfx.cafe/gfx/pggat/lib/util/strutil" ) type Pool struct { @@ -110,7 +111,7 @@ func (T *Pool) removeClient(client *Client) { delete(T.clients, client.Conn.BackendKey) } -func (T *Pool) Serve(conn *fed.Conn) error { +func (T *Pool) serveRW(conn *fed.Conn) error { m := NewMiddleware() eqpa := eqp.NewClient() @@ -261,6 +262,106 @@ func (T *Pool) Serve(conn *fed.Conn) error { } } +func (T *Pool) serveRO(conn *fed.Conn) error { + conn.Middleware = append( + conn.Middleware, + unterminate.Unterminate, + ps.NewClient(conn.InitialParameters), + eqp.NewClient(), + ) + + client := NewClient(conn) + + T.addClient(client) + defer T.removeClient(client) + + T.replica.AddClient(client.ID) + defer T.replica.RemoveClient(client.ID) + + var err, serverErr error + + var replica *spool.Server + defer func() { + if replica != nil { + if serverErr != nil { + T.replica.RemoveServer(replica) + } else { + T.replica.Release(replica) + } + replica = nil + } + }() + + if !conn.Ready { + client.SetState(metrics.ConnStateAwaitingServer, nil, true) + + replica = T.replica.Acquire(client.ID) + if replica == nil { + return pool.ErrClosed + } + + err, serverErr = T.Pair(client, replica) + if serverErr != nil { + return serverErr + } + if err != nil { + return err + } + + p := packets.ReadyForQuery('I') + if err = conn.WritePacket(&p); err != nil { + return err + } + + conn.Ready = true + } + + for { + if replica != nil { + T.replica.Release(replica) + replica = nil + } + client.SetState(metrics.ConnStateIdle, nil, true) + + var packet fed.Packet + packet, err = conn.ReadPacket(true) + if err != nil { + return err + } + + client.SetState(metrics.ConnStateAwaitingServer, nil, true) + + replica = T.replica.Acquire(client.ID) + if replica == nil { + return pool.ErrClosed + } + + err, serverErr = T.Pair(client, replica) + + if err == nil && serverErr == nil { + err, serverErr = bouncers.Bounce(conn, replica.Conn, packet) + } + if serverErr != nil { + return serverErr + } else { + replica.TransactionComplete() + client.TransactionComplete() + } + + if err != nil { + return err + } + } +} + +func (T *Pool) Serve(conn *fed.Conn) error { + if conn.InitialParameters[strutil.MakeCIString("hybrid.mode")] == "ro" { + return T.serveRO(conn) + } else { + return T.serveRW(conn) + } +} + func (T *Pool) Cancel(key fed.BackendKey) { peer, replica := func() (*spool.Server, bool) { T.mu.RLock() -- GitLab