diff --git a/lib/gsql/client.go b/lib/gsql/client.go index ff9a15e6f35f254d02463ee72d2a2da136359127..436acd50d24dacc93c8a5a4e0aa84e29c8b9596f 100644 --- a/lib/gsql/client.go +++ b/lib/gsql/client.go @@ -41,6 +41,22 @@ func (T *Client) Do(result ResultWriter, packets ...fed.Packet) { } } +func (T *Client) queueNext() bool { + b, ok := T.queue.PopFront() + if ok { + for _, packet := range b.packets { + T.read.PushBack(packet) + } + T.write = b.result + if T.writeC != nil { + T.writeC.Broadcast() + } + return true + } + + return false +} + func (T *Client) ReadPacket(typed bool) (fed.Packet, error) { T.mu.Lock() defer T.mu.Unlock() @@ -54,15 +70,7 @@ func (T *Client) ReadPacket(typed bool) (fed.Packet, error) { } // try to add next in queue - b, ok := T.queue.PopFront() - if ok { - for _, packet := range b.packets { - T.read.PushBack(packet) - } - T.write = b.result - if T.writeC != nil { - T.writeC.Broadcast() - } + if T.queueNext() { continue } @@ -88,6 +96,10 @@ func (T *Client) WritePacket(packet fed.Packet) error { defer T.mu.Unlock() for T.write == nil { + if T.read.Length() == 0 && T.queueNext() { + continue + } + if T.closed { return io.EOF } diff --git a/lib/middleware/middlewares/ps/client.go b/lib/middleware/middlewares/ps/client.go index bdc92e77157d12e7a15dc0ca3956666c5a5e4169..e3970c8f388258aa97b6efccf32f5c00112a4e45 100644 --- a/lib/middleware/middlewares/ps/client.go +++ b/lib/middleware/middlewares/ps/client.go @@ -30,7 +30,7 @@ func (T *Client) Write(ctx middleware.Context, packet fed.Packet) error { return errors.New("bad packet format i") } ikey := strutil.MakeCIString(ps.Key) - if T.parameters[ikey] == ps.Value { + if T.synced && T.parameters[ikey] == ps.Value { // already set ctx.Cancel() break diff --git a/lib/util/maps/clone.go b/lib/util/maps/clone.go new file mode 100644 index 0000000000000000000000000000000000000000..4b6c6162caf47bc545007e11868ae52ad59e78d8 --- /dev/null +++ b/lib/util/maps/clone.go @@ -0,0 +1,14 @@ +package maps + +func Clone[K comparable, V any](m map[K]V) map[K]V { + if m == nil { + return nil + } + + m2 := make(map[K]V, len(m)) + for k, v := range m { + m2[k] = v + } + + return m2 +} diff --git a/test/runner.go b/test/runner.go index 74e234f934c1b353af198ebb902ac890b5d4f850..f61cb293c74cb6bcee2e7d68981e848b89e86d55 100644 --- a/test/runner.go +++ b/test/runner.go @@ -6,6 +6,7 @@ import ( "tuxpa.in/a/zlog/log" + "pggat/lib/bouncer/bouncers/v2" "pggat/lib/fed" packets "pggat/lib/fed/packets/v3.0" "pggat/lib/gat/pool" @@ -37,7 +38,10 @@ func (T *Runner) setup() error { } for name, options := range T.config.Modes { - p := pool.NewPool(options) + opts := options + // allowing ps sync would mess up testing + opts.ParameterStatusSync = pool.ParameterStatusSyncNone + p := pool.NewPool(opts) p.AddRecipe("server", recipe.NewRecipe( recipe.Options{ Dialer: T.config.Peer, @@ -57,7 +61,45 @@ func (logWriter) WritePacket(pkt fed.Packet) error { } func (T *Runner) run(pkts ...fed.Packet) error { + // expected + { + log.Print("expected packets") + + var client gsql.Client + client.Do(logWriter{}, pkts...) + if err := client.Close(); err != nil { + return err + } + + server, _, err := T.config.Peer.Dial() + if err != nil { + return err + } + + for { + p, err := client.ReadPacket(true) + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + + clientErr, serverErr := bouncers.Bounce(&client, server, p) + if clientErr != nil { + return clientErr + } + if serverErr != nil { + return serverErr + } + } + } + + // actual for name, p := range T.pools { + log.Print() + log.Print("pool ", name) + var client gsql.Client client.Do(logWriter{}, pkts...) if err := client.Close(); err != nil { diff --git a/test/test.go b/test/test.go index 8ef8748796644fe518da51c6de763fb87ce20961..49b095d0ca320f4ddf33c84a1197991c908eabcf 100644 --- a/test/test.go +++ b/test/test.go @@ -3,6 +3,5 @@ package test import "pggat/test/inst" type Test struct { - Parallel bool Instructions []inst.Instruction } diff --git a/test/tests/simple_query.go b/test/tests/simple_query.go index d1697c19cb21f005bb8b7f4a2321dacefba39b80..638ec6819ba375d84a08d755378b24bdde6c5bd5 100644 --- a/test/tests/simple_query.go +++ b/test/tests/simple_query.go @@ -6,7 +6,6 @@ import ( ) var SimpleQuery = test.Test{ - Parallel: true, Instructions: []inst.Instruction{ inst.SimpleQuery("select 1;"), },