From 3ddc8f368552e1422a9a82eb65d9554f1b8881d9 Mon Sep 17 00:00:00 2001 From: Garet Halliday <me@garet.holiday> Date: Fri, 12 May 2023 14:51:08 -0500 Subject: [PATCH] sad --- cmd/cgat/main.go | 12 ++- .../middlewares/unterminate/unterminate.go | 31 ------- lib/mw2/interceptor/context.go | 27 ++++++ lib/mw2/interceptor/interceptor.go | 89 +++++++++++++++++++ .../middlewares/unterminate/unterminate.go | 24 +++++ 5 files changed, 148 insertions(+), 35 deletions(-) delete mode 100644 lib/middleware/middlewares/unterminate/unterminate.go create mode 100644 lib/mw2/interceptor/context.go create mode 100644 lib/mw2/interceptor/interceptor.go create mode 100644 lib/mw2/middlewares/unterminate/unterminate.go diff --git a/cmd/cgat/main.go b/cmd/cgat/main.go index 6058bc69..f3b3444f 100644 --- a/cmd/cgat/main.go +++ b/cmd/cgat/main.go @@ -9,7 +9,9 @@ import ( "pggat2/lib/bouncer/bouncers/v0" "pggat2/lib/bouncer/frontends/v0" "pggat2/lib/middleware/middlewares/onebuffer" - "pggat2/lib/middleware/middlewares/unterminate" + "pggat2/lib/mw2" + "pggat2/lib/mw2/interceptor" + "pggat2/lib/mw2/middlewares/unterminate" "pggat2/lib/rob" "pggat2/lib/rob/schedulers/v2" "pggat2/lib/zap" @@ -59,9 +61,11 @@ func main() { go func() { source := r.NewSource() client := zio.MakeReadWriter(conn) - ut := unterminate.MakeUnterminate(&client) - frontends.Accept(ut) ob := onebuffer.MakeOnebuffer(&client) + mw := interceptor.MakeInterceptor(&ob, []mw2.Middleware{ + unterminate.Unterminate, + }) + frontends.Accept(&mw) done := make(chan struct{}) defer close(done) for { @@ -70,7 +74,7 @@ func main() { break } source.Schedule(job{ - client: &ob, + client: &mw, done: done, }, 0) <-done diff --git a/lib/middleware/middlewares/unterminate/unterminate.go b/lib/middleware/middlewares/unterminate/unterminate.go deleted file mode 100644 index 5d818b4a..00000000 --- a/lib/middleware/middlewares/unterminate/unterminate.go +++ /dev/null @@ -1,31 +0,0 @@ -package unterminate - -import ( - "io" - - "pggat2/lib/zap" - packets "pggat2/lib/zap/packets/v3.0" -) - -type Unterminate struct { - zap.ReadWriter -} - -func MakeUnterminate(inner zap.ReadWriter) Unterminate { - return Unterminate{ - ReadWriter: inner, - } -} - -func (T Unterminate) Read() (zap.In, error) { - in, err := T.ReadWriter.Read() - if err != nil { - return zap.In{}, err - } - if in.Type() == packets.Terminate { - return zap.In{}, io.EOF - } - return in, nil -} - -var _ zap.ReadWriter = Unterminate{} diff --git a/lib/mw2/interceptor/context.go b/lib/mw2/interceptor/context.go new file mode 100644 index 00000000..4afb090c --- /dev/null +++ b/lib/mw2/interceptor/context.go @@ -0,0 +1,27 @@ +package interceptor + +import ( + "pggat2/lib/mw2" + "pggat2/lib/zap" +) + +type Context struct { + cancelled bool + zap.ReadWriter +} + +func makeContext(rw zap.ReadWriter) Context { + return Context{ + ReadWriter: rw, + } +} + +func (T *Context) reset() { + T.cancelled = false +} + +func (T *Context) Cancel() { + T.cancelled = true +} + +var _ mw2.Context = (*Context)(nil) diff --git a/lib/mw2/interceptor/interceptor.go b/lib/mw2/interceptor/interceptor.go new file mode 100644 index 00000000..09d87873 --- /dev/null +++ b/lib/mw2/interceptor/interceptor.go @@ -0,0 +1,89 @@ +package interceptor + +import ( + "pggat2/lib/mw2" + "pggat2/lib/util/decorator" + "pggat2/lib/zap" +) + +type Interceptor struct { + noCopy decorator.NoCopy + + middlewares []mw2.Middleware + Context +} + +func MakeInterceptor(rw zap.ReadWriter, middlewares []mw2.Middleware) Interceptor { + return Interceptor{ + middlewares: middlewares, + Context: makeContext(rw), + } +} + +func (T *Interceptor) Read() (zap.In, error) { + for { + in, err := T.ReadWriter.Read() + if err != nil { + return zap.In{}, err + } + + T.Context.reset() + for _, mw := range T.middlewares { + err = mw.Read(&T.Context, in) + if err != nil { + return zap.In{}, err + } + if T.cancelled { + break + } + } + + if !T.cancelled { + return in, nil + } + } +} + +func (T *Interceptor) ReadUntyped() (zap.In, error) { + for { + in, err := T.ReadWriter.ReadUntyped() + if err != nil { + return zap.In{}, err + } + + T.Context.reset() + for _, mw := range T.middlewares { + err = mw.Read(&T.Context, in) + if err != nil { + return zap.In{}, err + } + if T.cancelled { + break + } + } + + if !T.cancelled { + return in, nil + } + } +} + +func (T *Interceptor) Send(out zap.Out) error { + T.Context.reset() + for _, mw := range T.middlewares { + err := mw.Send(&T.Context, out) + if err != nil { + return err + } + if T.cancelled { + break + } + } + + if !T.cancelled { + return T.Context.ReadWriter.Send(out) + } + return nil +} + +var _ zap.ReadWriter = (*Interceptor)(nil) diff --git a/lib/mw2/middlewares/unterminate/unterminate.go b/lib/mw2/middlewares/unterminate/unterminate.go new file mode 100644 index 00000000..4fe9524e --- /dev/null +++ b/lib/mw2/middlewares/unterminate/unterminate.go @@ -0,0 +1,24 @@ +package unterminate + +import ( + "io" + + "pggat2/lib/mw2" + "pggat2/lib/zap" + packets "pggat2/lib/zap/packets/v3.0" +) + +var Unterminate = unterm{} + +type unterm struct { + mw2.Nil +} + +func (unterm) Read(_ mw2.Context, in zap.In) error { + if in.Type() == packets.Terminate { + return io.EOF + } + return nil +} + +var _ mw2.Middleware = unterm{} -- GitLab