good morning!!!!

Skip to content
Snippets Groups Projects
Commit 27ecdeeb authored by Garet Halliday's avatar Garet Halliday
Browse files

handle async operations

parent 73cdafb1
No related branches found
No related tags found
No related merge requests found
...@@ -9,6 +9,27 @@ import ( ...@@ -9,6 +9,27 @@ import (
packets "pggat2/lib/zap/packets/v3.0" packets "pggat2/lib/zap/packets/v3.0"
) )
// serverRead is a wrapper for bctx.Context's ServerRead but it handles async operations
func serverRead(ctx *bctx.Context) (zap.In, berr.Error) {
for {
in, err := ctx.ServerRead()
if err != nil {
return zap.In{}, err
}
switch in.Type() {
case packets.NoticeResponse,
packets.ParameterStatus,
packets.NotificationResponse:
err = ctx.ClientProxy(in)
if err != nil {
return zap.In{}, err
}
default:
return in, nil
}
}
}
func readyForQuery(ctx *bctx.Context, in zap.In) berr.Error { func readyForQuery(ctx *bctx.Context, in zap.In) berr.Error {
state, ok := packets.ReadReadyForQuery(in) state, ok := packets.ReadReadyForQuery(in)
if !ok { if !ok {
...@@ -51,7 +72,7 @@ func copyIn(ctx *bctx.Context) berr.Error { ...@@ -51,7 +72,7 @@ func copyIn(ctx *bctx.Context) berr.Error {
} }
func copyOut0(ctx *bctx.Context) berr.Error { func copyOut0(ctx *bctx.Context) berr.Error {
in, err := ctx.ServerRead() in, err := serverRead(ctx)
if err != nil { if err != nil {
return err return err
} }
...@@ -83,7 +104,7 @@ func copyOut(ctx *bctx.Context) berr.Error { ...@@ -83,7 +104,7 @@ func copyOut(ctx *bctx.Context) berr.Error {
} }
func query0(ctx *bctx.Context) berr.Error { func query0(ctx *bctx.Context) berr.Error {
in, err := ctx.ServerRead() in, err := serverRead(ctx)
if err != nil { if err != nil {
return err return err
} }
...@@ -93,9 +114,7 @@ func query0(ctx *bctx.Context) berr.Error { ...@@ -93,9 +114,7 @@ func query0(ctx *bctx.Context) berr.Error {
packets.RowDescription, packets.RowDescription,
packets.DataRow, packets.DataRow,
packets.EmptyQueryResponse, packets.EmptyQueryResponse,
packets.ErrorResponse, packets.ErrorResponse:
packets.NoticeResponse,
packets.ParameterStatus:
return ctx.ClientProxy(in) return ctx.ClientProxy(in)
case packets.CopyInResponse: case packets.CopyInResponse:
err = ctx.ClientProxy(in) err = ctx.ClientProxy(in)
...@@ -137,13 +156,13 @@ func query(ctx *bctx.Context) berr.Error { ...@@ -137,13 +156,13 @@ func query(ctx *bctx.Context) berr.Error {
} }
func functionCall0(ctx *bctx.Context) berr.Error { func functionCall0(ctx *bctx.Context) berr.Error {
in, err := ctx.ServerRead() in, err := serverRead(ctx)
if err != nil { if err != nil {
return err return err
} }
switch in.Type() { switch in.Type() {
case packets.ErrorResponse, packets.FunctionCallResponse, packets.NoticeResponse: case packets.ErrorResponse, packets.FunctionCallResponse:
return ctx.ClientProxy(in) return ctx.ClientProxy(in)
case packets.ReadyForQuery: case packets.ReadyForQuery:
err = ctx.ClientProxy(in) err = ctx.ClientProxy(in)
...@@ -173,7 +192,7 @@ func functionCall(ctx *bctx.Context) berr.Error { ...@@ -173,7 +192,7 @@ func functionCall(ctx *bctx.Context) berr.Error {
} }
func sync0(ctx *bctx.Context) berr.Error { func sync0(ctx *bctx.Context) berr.Error {
in, err := ctx.ServerRead() in, err := serverRead(ctx)
if err != nil { if err != nil {
return err return err
} }
...@@ -189,8 +208,6 @@ func sync0(ctx *bctx.Context) berr.Error { ...@@ -189,8 +208,6 @@ func sync0(ctx *bctx.Context) berr.Error {
packets.CommandComplete, packets.CommandComplete,
packets.DataRow, packets.DataRow,
packets.EmptyQueryResponse, packets.EmptyQueryResponse,
packets.NoticeResponse,
packets.ParameterStatus,
packets.PortalSuspended: packets.PortalSuspended:
return ctx.ClientProxy(in) return ctx.ClientProxy(in)
case packets.ReadyForQuery: case packets.ReadyForQuery:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment