diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index 21ea9abb7c4f723fde7b046bde0d1f84a8b1194e..866546b594f7114b959b616612fbc89aa1ff8eb5 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -7,7 +7,8 @@ import ( "time" "pggat2/lib/gat" - "pggat2/lib/gat/pools/session" + "pggat2/lib/gat/pools/transaction" + "pggat2/lib/rob" ) func main() { @@ -24,51 +25,64 @@ func main() { pooler.AddUser("postgres", postgres) // create pool - { + rawPool := transaction.NewPool() + pool := gat.NewPool(rawPool, 15*time.Second) + postgres.AddPool("uniswap", pool) + pool.AddRecipe("localhost", gat.TCPRecipe{ + Database: "uniswap", + Address: "localhost:5432", + User: "postgres", + Password: "password", + MinConnections: 0, + MaxConnections: 5, + }) + /* + { + rawPool := session.NewPool(false) + pool := gat.NewPool(rawPool, 15*time.Second) + postgres.AddPool("postgres", pool) + pool.AddRecipe("localhost", gat.TCPRecipe{ + Database: "postgres", + Address: "localhost:5432", + User: "postgres", + Password: "password", + MinConnections: 0, + MaxConnections: 5, + }) + } rawPool := session.NewPool(false) pool := gat.NewPool(rawPool, 15*time.Second) - postgres.AddPool("postgres", pool) + postgres.AddPool("regression", pool) pool.AddRecipe("localhost", gat.TCPRecipe{ - Database: "postgres", + Database: "regression", Address: "localhost:5432", User: "postgres", Password: "password", MinConnections: 0, MaxConnections: 5, }) - } - rawPool := session.NewPool(false) - pool := gat.NewPool(rawPool, 15*time.Second) - postgres.AddPool("regression", pool) - pool.AddRecipe("localhost", gat.TCPRecipe{ - Database: "regression", - Address: "localhost:5432", - User: "postgres", - Password: "password", - MinConnections: 0, - MaxConnections: 5, - }) + */ + + go func() { + var metrics rob.Metrics + for { + time.Sleep(1 * time.Second) + rawPool.ReadSchedulerMetrics(&metrics) + log.Println(metrics.String()) + } + }() /* go func() { - var metrics rob.Metrics + var metrics session.Metrics for { time.Sleep(1 * time.Second) - rawPool.ReadSchedulerMetrics(&metrics) + rawPool.ReadMetrics(&metrics) log.Println(metrics.String()) } }() */ - go func() { - var metrics session.Metrics - - for { - time.Sleep(1 * time.Second) - rawPool.ReadMetrics(&metrics) - log.Println(metrics.String()) - } - }() log.Println("Listening on :6432") diff --git a/lib/middleware/middlewares/eqp/client.go b/lib/middleware/middlewares/eqp/client.go index 0144293aa4a70f4689ba22906f4eb092a7ccca2f..fb559527b914536e5201abbeec24752de2355f1b 100644 --- a/lib/middleware/middlewares/eqp/client.go +++ b/lib/middleware/middlewares/eqp/client.go @@ -53,7 +53,7 @@ func (T *Client) Write(_ middleware.Context, packet *zap.Packet) error { read := packet.Read() switch read.ReadType() { case packets.ReadyForQuery: - state, ok := packets.ReadReadyForQuery(&read) + state, ok := packets.ReadReadyForQuery(read) if !ok { return errors.New("bad packet format") } @@ -71,8 +71,7 @@ func (T *Client) Write(_ middleware.Context, packet *zap.Packet) error { } func (T *Client) Read(ctx middleware.Context, packet *zap.Packet) error { - read := packet.Read() - switch read.ReadType() { + switch packet.ReadType() { case packets.Query: // clobber unnamed portal and unnamed prepared statement T.deletePreparedStatement("") @@ -80,7 +79,7 @@ func (T *Client) Read(ctx middleware.Context, packet *zap.Packet) error { case packets.Parse: ctx.Cancel() - destination, preparedStatement, ok := ReadParse(&read) + destination, preparedStatement, ok := ReadParse(packet.Read()) if !ok { return errors.New("bad packet format") } @@ -96,7 +95,7 @@ func (T *Client) Read(ctx middleware.Context, packet *zap.Packet) error { case packets.Bind: ctx.Cancel() - destination, portal, ok := ReadBind(&read) + destination, portal, ok := ReadBind(packet.Read()) if !ok { return errors.New("bad packet format") } @@ -112,7 +111,7 @@ func (T *Client) Read(ctx middleware.Context, packet *zap.Packet) error { case packets.Close: ctx.Cancel() - which, target, ok := packets.ReadClose(&read) + which, target, ok := packets.ReadClose(packet.Read()) if !ok { return errors.New("bad packet format") } @@ -133,7 +132,7 @@ func (T *Client) Read(ctx middleware.Context, packet *zap.Packet) error { } case packets.Describe: // ensure target exists - which, _, ok := packets.ReadDescribe(&read) + which, _, ok := packets.ReadDescribe(packet.Read()) if !ok { return errors.New("bad packet format") } @@ -144,7 +143,7 @@ func (T *Client) Read(ctx middleware.Context, packet *zap.Packet) error { return errors.New("unknown describe target") } case packets.Execute: - _, _, ok := packets.ReadExecute(&read) + _, _, ok := packets.ReadExecute(packet.Read()) if !ok { return errors.New("bad packet format") } diff --git a/lib/middleware/middlewares/eqp/portal.go b/lib/middleware/middlewares/eqp/portal.go index 0ece97b993f3b9599d9a4bdc874bdfbe3398ae0b..4e29022cc39fbb1f3437064228760e3d5a34ac28 100644 --- a/lib/middleware/middlewares/eqp/portal.go +++ b/lib/middleware/middlewares/eqp/portal.go @@ -13,11 +13,11 @@ type Portal struct { hash uint64 } -func ReadBind(in *zap.ReadablePacket) (destination string, portal Portal, ok bool) { +func ReadBind(in zap.ReadablePacket) (destination string, portal Portal, ok bool) { if in.ReadType() != packets.Bind { return } - in2 := *in + in2 := in destination, ok = in2.ReadString() if !ok { return diff --git a/lib/middleware/middlewares/eqp/preparedStatement.go b/lib/middleware/middlewares/eqp/preparedStatement.go index 96f341334e15a6eac0db341acf0424ada128cbdd..c15f929ff34244706b479aea3b0778cb1feadb97 100644 --- a/lib/middleware/middlewares/eqp/preparedStatement.go +++ b/lib/middleware/middlewares/eqp/preparedStatement.go @@ -12,11 +12,11 @@ type PreparedStatement struct { hash uint64 } -func ReadParse(in *zap.ReadablePacket) (destination string, preparedStatement PreparedStatement, ok bool) { +func ReadParse(in zap.ReadablePacket) (destination string, preparedStatement PreparedStatement, ok bool) { if in.ReadType() != packets.Parse { return } - in2 := *in + in2 := in destination, ok = in2.ReadString() if !ok { return diff --git a/lib/middleware/middlewares/eqp/server.go b/lib/middleware/middlewares/eqp/server.go index 9b391ba56b3740a15e3c726827fec9cebed3bf1d..3d8aaf47b5d3d110302c3453965f79cb420dab7a 100644 --- a/lib/middleware/middlewares/eqp/server.go +++ b/lib/middleware/middlewares/eqp/server.go @@ -213,8 +213,7 @@ func (T *Server) syncPortal(ctx middleware.Context, target string) error { } func (T *Server) Write(ctx middleware.Context, packet *zap.Packet) error { - read := packet.Read() - switch read.ReadType() { + switch packet.ReadType() { case packets.Query: // clobber unnamed portal and unnamed prepared statement T.deletePreparedStatement("") @@ -224,7 +223,7 @@ func (T *Server) Write(ctx middleware.Context, packet *zap.Packet) error { panic("unreachable") case packets.Describe: // ensure target exists - which, target, ok := packets.ReadDescribe(&read) + which, target, ok := packets.ReadDescribe(packet.Read()) if !ok { // should've been caught by eqp.Client panic("unreachable") @@ -246,7 +245,7 @@ func (T *Server) Write(ctx middleware.Context, packet *zap.Packet) error { panic("unknown describe target") } case packets.Execute: - target, _, ok := packets.ReadExecute(&read) + target, _, ok := packets.ReadExecute(packet.Read()) if !ok { // should've been caught by eqp.Client panic("unreachable") @@ -277,7 +276,7 @@ func (T *Server) Read(ctx middleware.Context, packet *zap.Packet) error { T.pendingCloses.PopFront() case packets.ReadyForQuery: - state, ok := packets.ReadReadyForQuery(&read) + state, ok := packets.ReadReadyForQuery(packet.Read()) if !ok { return errors.New("bad packet format") } diff --git a/lib/middleware/middlewares/ps/client.go b/lib/middleware/middlewares/ps/client.go index a78e7ef631889b288d500bcdbfeb106d9c9a5e49..563849c3704c096fe9e8a9db645f83611e5f5f22 100644 --- a/lib/middleware/middlewares/ps/client.go +++ b/lib/middleware/middlewares/ps/client.go @@ -21,10 +21,9 @@ func NewClient() *Client { } func (T *Client) Send(ctx middleware.Context, packet *zap.Packet) error { - read := packet.Read() - switch read.ReadType() { + switch packet.ReadType() { case packets.ParameterStatus: - key, value, ok := packets.ReadParameterStatus(&read) + key, value, ok := packets.ReadParameterStatus(packet.Read()) if !ok { return errors.New("bad packet format") } diff --git a/lib/middleware/middlewares/ps/server.go b/lib/middleware/middlewares/ps/server.go index fd5cd0d44f0bb6e1a9ae43a44986daeb2edb0ebb..971025ba9142e16b130c3b667b3ac0bd5d2413b0 100644 --- a/lib/middleware/middlewares/ps/server.go +++ b/lib/middleware/middlewares/ps/server.go @@ -53,10 +53,9 @@ func (T *Server) Sync(client zap.ReadWriter, ps *Client) error { } func (T *Server) Read(_ middleware.Context, in *zap.Packet) error { - read := in.Read() - switch read.ReadType() { + switch in.ReadType() { case packets.ParameterStatus: - key, value, ok := packets.ReadParameterStatus(&read) + key, value, ok := packets.ReadParameterStatus(in.Read()) if !ok { return errors.New("bad packet format") }