diff --git a/lib/bouncer/bouncers/v1/bouncer.go b/lib/bouncer/bouncers/v1/bouncer.go index 50ec65fa7f5ea4016932c31407cafb60117ae794..8cbbbdb085dffcd124ba171357ed13890a631931 100644 --- a/lib/bouncer/bouncers/v1/bouncer.go +++ b/lib/bouncer/bouncers/v1/bouncer.go @@ -9,6 +9,27 @@ import ( packets "pggat2/lib/zap/packets/v3.0" ) +// serverRead is a wrapper for bctx.Context's ServerRead but it handles async operations +func serverRead(ctx *bctx.Context) (zap.In, berr.Error) { + for { + in, err := ctx.ServerRead() + if err != nil { + return zap.In{}, err + } + switch in.Type() { + case packets.NoticeResponse, + packets.ParameterStatus, + packets.NotificationResponse: + err = ctx.ClientProxy(in) + if err != nil { + return zap.In{}, err + } + default: + return in, nil + } + } +} + func readyForQuery(ctx *bctx.Context, in zap.In) berr.Error { state, ok := packets.ReadReadyForQuery(in) if !ok { @@ -51,7 +72,7 @@ func copyIn(ctx *bctx.Context) berr.Error { } func copyOut0(ctx *bctx.Context) berr.Error { - in, err := ctx.ServerRead() + in, err := serverRead(ctx) if err != nil { return err } @@ -83,7 +104,7 @@ func copyOut(ctx *bctx.Context) berr.Error { } func query0(ctx *bctx.Context) berr.Error { - in, err := ctx.ServerRead() + in, err := serverRead(ctx) if err != nil { return err } @@ -93,9 +114,7 @@ func query0(ctx *bctx.Context) berr.Error { packets.RowDescription, packets.DataRow, packets.EmptyQueryResponse, - packets.ErrorResponse, - packets.NoticeResponse, - packets.ParameterStatus: + packets.ErrorResponse: return ctx.ClientProxy(in) case packets.CopyInResponse: err = ctx.ClientProxy(in) @@ -137,13 +156,13 @@ func query(ctx *bctx.Context) berr.Error { } func functionCall0(ctx *bctx.Context) berr.Error { - in, err := ctx.ServerRead() + in, err := serverRead(ctx) if err != nil { return err } switch in.Type() { - case packets.ErrorResponse, packets.FunctionCallResponse, packets.NoticeResponse: + case packets.ErrorResponse, packets.FunctionCallResponse: return ctx.ClientProxy(in) case packets.ReadyForQuery: err = ctx.ClientProxy(in) @@ -173,7 +192,7 @@ func functionCall(ctx *bctx.Context) berr.Error { } func sync0(ctx *bctx.Context) berr.Error { - in, err := ctx.ServerRead() + in, err := serverRead(ctx) if err != nil { return err } @@ -189,8 +208,6 @@ func sync0(ctx *bctx.Context) berr.Error { packets.CommandComplete, packets.DataRow, packets.EmptyQueryResponse, - packets.NoticeResponse, - packets.ParameterStatus, packets.PortalSuspended: return ctx.ClientProxy(in) case packets.ReadyForQuery: