diff --git a/lib/gat/modes/pgbouncer/pools.go b/lib/gat/modes/pgbouncer/pools.go index c6d0585d6d731d74d285534aa321b35390e2cf97..0080800e41e29d5d9fb05fa1bd548211fd121d86 100644 --- a/lib/gat/modes/pgbouncer/pools.go +++ b/lib/gat/modes/pgbouncer/pools.go @@ -159,15 +159,16 @@ func (T *Pools) Lookup(user, database string) *pool.Pool { switch poolMode { case PoolModeSession: - p = session.NewPool(poolOptions) + poolOptions = session.Apply(poolOptions) case PoolModeTransaction: if T.Config.PgBouncer.ServerResetQueryAlways == 0 { poolOptions.ServerResetQuery = "" } - p = transaction.NewPool(poolOptions) + poolOptions = transaction.Apply(poolOptions) default: return nil } + p = pool.NewPool(poolOptions) T.pools.Store(poolKey{ User: user, diff --git a/lib/gat/modes/zalando_operator_discovery/server.go b/lib/gat/modes/zalando_operator_discovery/server.go index eaf744048dcfe3047f5cd89a44d9594d399714d1..77ba379470dc6b63cfbd6a784e9104ae836af868 100644 --- a/lib/gat/modes/zalando_operator_discovery/server.go +++ b/lib/gat/modes/zalando_operator_discovery/server.go @@ -138,16 +138,16 @@ func (T *Server) addPool(name string, userCreds, serverCreds auth.Credentials, d poolOptions := pool.Options{ Credentials: userCreds, } - var p *pool.Pool switch T.opConfig.Mode { case "transaction": - p = transaction.NewPool(poolOptions) + poolOptions = transaction.Apply(poolOptions) case "session": - p = session.NewPool(poolOptions) + poolOptions = session.Apply(poolOptions) default: log.Printf(`unknown pool mode "%s"`, T.opConfig.Mode) return } + p := pool.NewPool(poolOptions) recipeOptions := recipe.Options{ Dialer: d, diff --git a/lib/gat/pool/pool.go b/lib/gat/pool/pool.go index 6ebc5f99ab1088843e0915fc2d36d3b9916f3bb3..11a23a69e25c2cedc1ed853d33dfd585daafec73 100644 --- a/lib/gat/pool/pool.go +++ b/lib/gat/pool/pool.go @@ -1,6 +1,7 @@ package pool import ( + "log" "sync" "time" @@ -157,6 +158,7 @@ func (T *Pool) scaleUp() { func (T *Pool) scaleUpL1(name string, r *recipe.Recipe) { conn, params, err := r.Dial() if err != nil { + log.Print("failed to dial server: ", err) // failed to dial r.Free() return diff --git a/lib/gat/pool/pools/session/pool.go b/lib/gat/pool/pools/session/apply.go similarity index 69% rename from lib/gat/pool/pools/session/pool.go rename to lib/gat/pool/pools/session/apply.go index 4f405641c735e0a59aff5caac5ac951f1119283b..f4ddee0e53e95c125439a677e3daf65354b5740b 100644 --- a/lib/gat/pool/pools/session/pool.go +++ b/lib/gat/pool/pools/session/apply.go @@ -4,9 +4,9 @@ import ( "pggat/lib/gat/pool" ) -func NewPool(options pool.Options) *pool.Pool { +func Apply(options pool.Options) pool.Options { options.Pooler = new(Pooler) options.ParameterStatusSync = pool.ParameterStatusSyncInitial options.ExtendedQuerySync = false - return pool.NewPool(options) + return options } diff --git a/lib/gat/pool/pools/transaction/pool.go b/lib/gat/pool/pools/transaction/apply.go similarity index 69% rename from lib/gat/pool/pools/transaction/pool.go rename to lib/gat/pool/pools/transaction/apply.go index 922f244df951fb1a77ced300c832da9f5d196fbe..a431fe945578f9735c9145649a0192090e0dc406 100644 --- a/lib/gat/pool/pools/transaction/pool.go +++ b/lib/gat/pool/pools/transaction/apply.go @@ -2,9 +2,9 @@ package transaction import "pggat/lib/gat/pool" -func NewPool(options pool.Options) *pool.Pool { +func Apply(options pool.Options) pool.Options { options.Pooler = new(Pooler) options.ParameterStatusSync = pool.ParameterStatusSyncDynamic options.ExtendedQuerySync = true - return pool.NewPool(options) + return options } diff --git a/test/config.go b/test/config.go index fe864abfdfd9d818e6afbb2391da56aa230ac5fe..6ccdb9e551cb0d5fabdc87604844ef0244d53746 100644 --- a/test/config.go +++ b/test/config.go @@ -1,9 +1,11 @@ package test import ( + "pggat/lib/gat/pool" "pggat/lib/gat/pool/dialer" ) type Config struct { - Peer dialer.Dialer + Modes map[string]pool.Options + Peer dialer.Dialer } diff --git a/test/runner.go b/test/runner.go new file mode 100644 index 0000000000000000000000000000000000000000..74e234f934c1b353af198ebb902ac890b5d4f850 --- /dev/null +++ b/test/runner.go @@ -0,0 +1,93 @@ +package test + +import ( + "errors" + "io" + + "tuxpa.in/a/zlog/log" + + "pggat/lib/fed" + packets "pggat/lib/fed/packets/v3.0" + "pggat/lib/gat/pool" + "pggat/lib/gat/pool/recipe" + "pggat/lib/gsql" + "pggat/lib/util/maps" + "pggat/test/inst" +) + +type Runner struct { + config Config + test Test + + pools map[string]*pool.Pool +} + +func MakeRunner(config Config, test Test) Runner { + return Runner{ + config: config, + test: test, + } +} + +func (T *Runner) setup() error { + // get pools ready + maps.Clear(T.pools) + if T.pools == nil { + T.pools = make(map[string]*pool.Pool) + } + + for name, options := range T.config.Modes { + p := pool.NewPool(options) + p.AddRecipe("server", recipe.NewRecipe( + recipe.Options{ + Dialer: T.config.Peer, + }, + )) + T.pools[name] = p + } + + return nil +} + +type logWriter struct{} + +func (logWriter) WritePacket(pkt fed.Packet) error { + log.Print("got packet ", pkt) + return nil +} + +func (T *Runner) run(pkts ...fed.Packet) error { + for name, p := range T.pools { + var client gsql.Client + client.Do(logWriter{}, pkts...) + if err := client.Close(); err != nil { + return err + } + + if err := p.Serve(&client, nil, [8]byte{}); err != nil && !errors.Is(err, io.EOF) { + return err + } + + _ = name + } + + return nil +} + +func (T *Runner) Run() error { + if err := T.setup(); err != nil { + return err + } + + for _, i := range T.test.Instructions { + switch v := i.(type) { + case inst.SimpleQuery: + q := packets.Query(v) + if err := T.run(q.IntoPacket()); err != nil { + return err + } + } + } + + return nil +} diff --git a/test/tester.go b/test/tester.go index 11c7b877be64a61eeb293b664914cf4e359f5af5..c06ce26870b2c60299b45e06fa128eefb378dda0 100644 --- a/test/tester.go +++ b/test/tester.go @@ -1,11 +1,5 @@ package test -import ( - "tuxpa.in/a/zlog/log" - - "pggat/test/inst" -) - type Tester struct { config Config } @@ -16,19 +10,10 @@ func NewTester(config Config) *Tester { } } -func (T *Tester) run(test Test) error { - for _, v := range test.Instructions { - switch i := v.(type) { - case inst.SimpleQuery: - log.Println("run", i) - } - } - return nil // TODO(garet) -} - func (T *Tester) Run(tests ...Test) error { for _, test := range tests { - if err := T.run(test); err != nil { + runner := MakeRunner(T.config, test) + if err := runner.Run(); err != nil { return err } } diff --git a/test/tester_test.go b/test/tester_test.go index 4c351bbaf5a45b2cb1c87ff7f3b70d26898f0dce..9f74ebbffe9ec1c1ef3ec1898655edcde53e93ff 100644 --- a/test/tester_test.go +++ b/test/tester_test.go @@ -5,13 +5,22 @@ import ( "pggat/lib/auth/credentials" "pggat/lib/bouncer/backends/v0" + "pggat/lib/gat/pool" "pggat/lib/gat/pool/dialer" + "pggat/lib/gat/pool/pools/session" + "pggat/lib/gat/pool/pools/transaction" "pggat/test" "pggat/test/tests" ) func TestTester(t *testing.T) { tester := test.NewTester(test.Config{ + Modes: map[string]pool.Options{ + "transaction": transaction.Apply(pool.Options{}), + "session": session.Apply(pool.Options{ + ServerResetQuery: "discard all", + }), + }, Peer: dialer.Net{ Network: "tcp", Address: "localhost:5432", @@ -20,7 +29,7 @@ func TestTester(t *testing.T) { Username: "postgres", Password: "password", }, - Database: "pggat", + Database: "postgres", }, }, })