diff --git a/lib/README.md b/lib/README.md new file mode 100644 index 0000000000000000000000000000000000000000..382708b0f526fb069561a07fe76a89db729367ee --- /dev/null +++ b/lib/README.md @@ -0,0 +1,31 @@ +All the business logic for pggat happens here. + +# Folder overview +In general, the top level folder will hold the interface. A subfolder (generally the plural version of the parent folder's name) will hold versioned implementations. + +## auth +All authentication functions. Protocol unspecific. + +## backend +Connection handler for pggat -> postgres + +## frontend +Connection handler for client -> pggat + +## perror +Special postgres error types + +## pnet +Zero allocation network handling + +### pnet/packet +Packet reading/writing helpers + +## rob +A fair-share scheduler + +## router +Routing between frontends and backends + +## util +Project generic helper structures and functions diff --git a/lib/backend/backends/v0/server.go b/lib/backend/backends/v0/server.go index 09d7e0db1fd01c89d85d3eb3ba97471c83e1682f..1526d42abd4f1b174079cf56335762877b9a4c78 100644 --- a/lib/backend/backends/v0/server.go +++ b/lib/backend/backends/v0/server.go @@ -2,7 +2,9 @@ package backends import ( "fmt" + "log" "net" + "runtime/debug" "pggat2/lib/auth/md5" "pggat2/lib/auth/sasl" @@ -14,6 +16,12 @@ import ( "pggat2/lib/util/decorator" ) +var ErrServerFailed = perror.New( + perror.ERROR, + perror.InternalError, + "Internal server error", +) + type Server struct { noCopy decorator.NoCopy @@ -286,6 +294,11 @@ func (T *Server) accept() perror.Error { return nil } +func (T *Server) fail() { + log.Println("!!!!!!!!!!!!!!!!!!!!SERVER FAILED!!!!!!!!!!!!!!!!!!!!!!!!!!") + debug.PrintStack() +} + func (T *Server) copyIn0(peer pnet.ReadWriter) (bool, perror.Error) { in, err := peer.Read() if err != nil { @@ -295,10 +308,18 @@ func (T *Server) copyIn0(peer pnet.ReadWriter) (bool, perror.Error) { switch in.Type() { case packet.CopyData: err = pnet.ProxyPacket(T, in) - return false, perror.Wrap(err) + if err != nil { + T.fail() + return false, ErrServerFailed + } + return false, nil case packet.CopyDone, packet.CopyFail: err = pnet.ProxyPacket(T, in) - return true, perror.Wrap(err) + if err != nil { + T.fail() + return false, ErrServerFailed + } + return true, nil default: return false, pnet.ErrProtocolError } @@ -315,6 +336,7 @@ func (T *Server) copyIn(peer pnet.ReadWriter, in packet.In) perror.Error { for { done, err := T.copyIn0(peer) if err != nil { + // TODO(garet) return the server to a normal state return err } if done { @@ -324,10 +346,53 @@ func (T *Server) copyIn(peer pnet.ReadWriter, in packet.In) perror.Error { return nil } -func (T *Server) query0(peer pnet.ReadWriter) (bool, perror.Error) { +func (T *Server) copyOut0(peer pnet.ReadWriter) (bool, perror.Error) { in, err := T.Read() if err != nil { + T.fail() + return false, ErrServerFailed + } + + switch in.Type() { + case packet.CopyData: + err = pnet.ProxyPacket(peer, in) return false, perror.Wrap(err) + case packet.CopyDone, packet.ErrorResponse: + err = pnet.ProxyPacket(peer, in) + return true, perror.Wrap(err) + default: + T.fail() + return false, ErrServerFailed + } +} + +func (T *Server) copyOut(peer pnet.ReadWriter, in packet.In) perror.Error { + // send in (copyOutResponse) to server + err := pnet.ProxyPacket(T, in) + if err != nil { + T.fail() + return ErrServerFailed + } + + // copy out from server + for { + done, err := T.copyOut0(peer) + if err != nil { + // TODO(garet) return the server to a normal state + return err + } + if done { + break + } + } + return nil +} + +func (T *Server) query0(peer pnet.ReadWriter) (bool, perror.Error) { + in, err := T.Read() + if err != nil { + T.fail() + return false, ErrServerFailed } switch in.Type() { case packet.CommandComplete, @@ -342,11 +407,8 @@ func (T *Server) query0(peer pnet.ReadWriter) (bool, perror.Error) { err := T.copyIn(peer, in) return false, err case packet.CopyOutResponse: - return false, perror.New( - perror.FATAL, - perror.FeatureNotSupported, - "not implemented", - ) // TODO(garet) + err := T.copyOut(peer, in) + return false, err case packet.ReadyForQuery: err = pnet.ProxyPacket(peer, in) return true, perror.Wrap(err) @@ -357,7 +419,8 @@ func (T *Server) query0(peer pnet.ReadWriter) (bool, perror.Error) { } return false, perror.Wrap(pnet.ProxyPacket(peer, in)) default: - return false, pnet.ErrProtocolError + T.fail() + return false, ErrServerFailed } } @@ -365,12 +428,55 @@ func (T *Server) query(peer pnet.ReadWriter, in packet.In) perror.Error { // send in (initial query) to server err := pnet.ProxyPacket(T, in) if err != nil { - return perror.Wrap(err) + T.fail() + return ErrServerFailed } for { done, err := T.query0(peer) if err != nil { + // TODO(garet) return to normal state + return err + } + if done { + break + } + } + return nil +} + +func (T *Server) functionCall0(peer pnet.ReadWriter) (bool, perror.Error) { + in, err := T.Read() + if err != nil { + T.fail() + return false, ErrServerFailed + } + + switch in.Type() { + case packet.ErrorResponse, packet.FunctionCallResponse, packet.NoticeResponse: + err = pnet.ProxyPacket(peer, in) + return false, perror.Wrap(err) + case packet.ReadyForQuery: + err = pnet.ProxyPacket(peer, in) + return true, perror.Wrap(err) + default: + T.fail() + return false, ErrServerFailed + } +} + +func (T *Server) functionCall(peer pnet.ReadWriter, in packet.In) perror.Error { + // send in (FunctionCall) to server + err := pnet.ProxyPacket(T, in) + if err != nil { + T.fail() + return ErrServerFailed + } + + for { + done, err := T.functionCall0(peer) + if err != nil { + // TODO(garet) return to normal state return err } if done { @@ -389,6 +495,8 @@ func (T *Server) Transaction(peer pnet.ReadWriter) perror.Error { switch in.Type() { case packet.Query: return T.query(peer, in) + case packet.FunctionCall: + return T.functionCall(peer, in) default: return perror.New( perror.FATAL, diff --git a/lib/router/router.go b/lib/router/router.go new file mode 100644 index 0000000000000000000000000000000000000000..f9865429ba9c78384e137f7ef6b6509aac408e73 --- /dev/null +++ b/lib/router/router.go @@ -0,0 +1,10 @@ +package router + +import ( + "pggat2/lib/perror" + "pggat2/lib/pnet" +) + +type Router interface { + Transaction(peer pnet.ReadWriter) perror.Error +}