diff --git a/lib/gat/handlers/pool/pools/hybrid/factory.go b/lib/gat/handlers/pool/pools/hybrid/factory.go index 14a625dafd18792ba7eadc27d2432f21c2ee1595..a2e8ec3df96162f492e4c7ee67017714e39dbc6c 100644 --- a/lib/gat/handlers/pool/pools/hybrid/factory.go +++ b/lib/gat/handlers/pool/pools/hybrid/factory.go @@ -23,7 +23,7 @@ func (T *Factory) CaddyModule() caddy.ModuleInfo { } func (T *Factory) NewPool() pool.Pool { - return new(Pool) + return NewPool() } var _ pool.PoolFactory = (*Factory)(nil) diff --git a/lib/gat/handlers/pool/pools/hybrid/middleware.go b/lib/gat/handlers/pool/pools/hybrid/middleware.go new file mode 100644 index 0000000000000000000000000000000000000000..9a747406c5acfc107ebadc898177c6af3b228e0f --- /dev/null +++ b/lib/gat/handlers/pool/pools/hybrid/middleware.go @@ -0,0 +1,29 @@ +package hybrid + +import ( + "log" + + "gfx.cafe/gfx/pggat/lib/fed" + packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" +) + +type Middleware struct{} + +func (T *Middleware) ReadPacket(packet fed.Packet) (fed.Packet, error) { + return packet, nil +} + +func (T *Middleware) WritePacket(packet fed.Packet) (fed.Packet, error) { + switch packet.Type() { + case packets.TypeErrorResponse: + var p packets.ErrorResponse + if err := fed.ToConcrete(&p, packet); err != nil { + return nil, err + } + log.Printf("%#v", p) + return &p, nil + } + return packet, nil +} + +var _ fed.Middleware = (*Middleware)(nil) diff --git a/lib/gat/handlers/pool/pools/hybrid/pool.go b/lib/gat/handlers/pool/pools/hybrid/pool.go index 92cb728e75197d9353cbee51993a65fb39446f51..be9875dbf4edb0acfa21c90338b58e61933ee6aa 100644 --- a/lib/gat/handlers/pool/pools/hybrid/pool.go +++ b/lib/gat/handlers/pool/pools/hybrid/pool.go @@ -1,8 +1,16 @@ package hybrid import ( + "time" + + "github.com/google/uuid" + + "gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2" "gfx.cafe/gfx/pggat/lib/fed" + "gfx.cafe/gfx/pggat/lib/fed/middlewares/unterminate" + packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool" + "gfx.cafe/gfx/pggat/lib/gat/handlers/pool/poolers/rob" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool/spool" "gfx.cafe/gfx/pggat/lib/gat/metrics" ) @@ -12,6 +20,25 @@ type Pool struct { replica spool.Pool } +func NewPool() *Pool { + config := spool.Config{ + PoolerFactory: new(rob.Factory), + UsePS: true, + UseEQP: true, + IdleTimeout: 5 * time.Minute, + ReconnectInitialTime: 5 * time.Second, + ReconnectMaxTime: 1 * time.Minute, + } + + p := &Pool{ + primary: spool.MakePool(config), + replica: spool.MakePool(config), + } + go p.primary.ScaleLoop() + go p.replica.ScaleLoop() + return p +} + func (T *Pool) AddReplicaRecipe(name string, recipe *pool.Recipe) { T.replica.AddRecipe(name, recipe) } @@ -29,8 +56,72 @@ func (T *Pool) RemoveRecipe(name string) { } func (T *Pool) Serve(conn *fed.Conn) error { - // TODO implement me - panic("implement me") + conn.Middleware = append(conn.Middleware, unterminate.Unterminate, &Middleware{}) + + id := uuid.New() + T.primary.AddClient(id) + defer T.primary.RemoveClient(id) + T.replica.AddClient(id) + defer T.replica.RemoveClient(id) + + var err, serverErr error + + var server *spool.Server + defer func() { + if server != nil { + if serverErr != nil { + T.replica.RemoveServer(server) + } else { + T.replica.Release(server) + } + server = nil + } + }() + + if !conn.Ready { + // TODO(garet) pair + enc := packets.ParameterStatus{ + Key: "client_encoding", + Value: "UTF-8", + } + if err = conn.WritePacket(&enc); err != nil { + return err + } + + p := packets.ReadyForQuery('I') + if err = conn.WritePacket(&p); err != nil { + return err + } + + conn.Ready = true + } + + for { + if server != nil { + T.replica.Release(server) + server = nil + } + + var packet fed.Packet + packet, err = conn.ReadPacket(true) + if err != nil { + return err + } + + server = T.replica.Acquire(id) + if server == nil { + return pool.ErrClosed + } + + // TODO(garet) pair + err, serverErr = bouncers.Bounce(conn, server.Conn, packet) + if serverErr != nil { + return serverErr + } + if err != nil { + return err + } + } } func (T *Pool) Cancel(key fed.BackendKey) {