good morning!!!!

Skip to content
Snippets Groups Projects
Commit 663c1f53 authored by Garet Halliday's avatar Garet Halliday
Browse files

almost there

parent 2ad9783a
Branches
Tags
No related merge requests found
...@@ -23,7 +23,7 @@ func (T *Factory) CaddyModule() caddy.ModuleInfo { ...@@ -23,7 +23,7 @@ func (T *Factory) CaddyModule() caddy.ModuleInfo {
} }
func (T *Factory) NewPool() pool.Pool { func (T *Factory) NewPool() pool.Pool {
return new(Pool) return NewPool()
} }
var _ pool.PoolFactory = (*Factory)(nil) var _ pool.PoolFactory = (*Factory)(nil)
......
package hybrid
import (
"log"
"gfx.cafe/gfx/pggat/lib/fed"
packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
)
type Middleware struct{}
func (T *Middleware) ReadPacket(packet fed.Packet) (fed.Packet, error) {
return packet, nil
}
func (T *Middleware) WritePacket(packet fed.Packet) (fed.Packet, error) {
switch packet.Type() {
case packets.TypeErrorResponse:
var p packets.ErrorResponse
if err := fed.ToConcrete(&p, packet); err != nil {
return nil, err
}
log.Printf("%#v", p)
return &p, nil
}
return packet, nil
}
var _ fed.Middleware = (*Middleware)(nil)
package hybrid package hybrid
import ( import (
"time"
"github.com/google/uuid"
"gfx.cafe/gfx/pggat/lib/bouncer/bouncers/v2"
"gfx.cafe/gfx/pggat/lib/fed" "gfx.cafe/gfx/pggat/lib/fed"
"gfx.cafe/gfx/pggat/lib/fed/middlewares/unterminate"
packets "gfx.cafe/gfx/pggat/lib/fed/packets/v3.0"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool/poolers/rob"
"gfx.cafe/gfx/pggat/lib/gat/handlers/pool/spool" "gfx.cafe/gfx/pggat/lib/gat/handlers/pool/spool"
"gfx.cafe/gfx/pggat/lib/gat/metrics" "gfx.cafe/gfx/pggat/lib/gat/metrics"
) )
...@@ -12,6 +20,25 @@ type Pool struct { ...@@ -12,6 +20,25 @@ type Pool struct {
replica spool.Pool replica spool.Pool
} }
func NewPool() *Pool {
config := spool.Config{
PoolerFactory: new(rob.Factory),
UsePS: true,
UseEQP: true,
IdleTimeout: 5 * time.Minute,
ReconnectInitialTime: 5 * time.Second,
ReconnectMaxTime: 1 * time.Minute,
}
p := &Pool{
primary: spool.MakePool(config),
replica: spool.MakePool(config),
}
go p.primary.ScaleLoop()
go p.replica.ScaleLoop()
return p
}
func (T *Pool) AddReplicaRecipe(name string, recipe *pool.Recipe) { func (T *Pool) AddReplicaRecipe(name string, recipe *pool.Recipe) {
T.replica.AddRecipe(name, recipe) T.replica.AddRecipe(name, recipe)
} }
...@@ -29,8 +56,72 @@ func (T *Pool) RemoveRecipe(name string) { ...@@ -29,8 +56,72 @@ func (T *Pool) RemoveRecipe(name string) {
} }
func (T *Pool) Serve(conn *fed.Conn) error { func (T *Pool) Serve(conn *fed.Conn) error {
// TODO implement me conn.Middleware = append(conn.Middleware, unterminate.Unterminate, &Middleware{})
panic("implement me")
id := uuid.New()
T.primary.AddClient(id)
defer T.primary.RemoveClient(id)
T.replica.AddClient(id)
defer T.replica.RemoveClient(id)
var err, serverErr error
var server *spool.Server
defer func() {
if server != nil {
if serverErr != nil {
T.replica.RemoveServer(server)
} else {
T.replica.Release(server)
}
server = nil
}
}()
if !conn.Ready {
// TODO(garet) pair
enc := packets.ParameterStatus{
Key: "client_encoding",
Value: "UTF-8",
}
if err = conn.WritePacket(&enc); err != nil {
return err
}
p := packets.ReadyForQuery('I')
if err = conn.WritePacket(&p); err != nil {
return err
}
conn.Ready = true
}
for {
if server != nil {
T.replica.Release(server)
server = nil
}
var packet fed.Packet
packet, err = conn.ReadPacket(true)
if err != nil {
return err
}
server = T.replica.Acquire(id)
if server == nil {
return pool.ErrClosed
}
// TODO(garet) pair
err, serverErr = bouncers.Bounce(conn, server.Conn, packet)
if serverErr != nil {
return serverErr
}
if err != nil {
return err
}
}
} }
func (T *Pool) Cancel(key fed.BackendKey) { func (T *Pool) Cancel(key fed.BackendKey) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment