From cde0a5d6f346e1817e31109fda946b6b5a981e4d Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Tue, 24 Oct 2023 12:03:47 -0500 Subject: [PATCH] tests --- contrib/codecs/broker/server.go | 5 ++++- contrib/codecs/http/codec.go | 17 ++++++----------- contrib/codecs/rdwr/codec.go | 2 +- pkg/server/server.go | 5 ++++- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/contrib/codecs/broker/server.go b/contrib/codecs/broker/server.go index 4aedc4d..6e76c20 100644 --- a/contrib/codecs/broker/server.go +++ b/contrib/codecs/broker/server.go @@ -25,6 +25,9 @@ func (s *Server) ServeSpoke(ctx context.Context, stream ServerSpoke) { continue } cd := NewCodec(req, fn) - go s.Server.ServeCodec(ctx, cd) + go func() { + s.Server.ServeCodec(ctx, cd) + cd.Close() + }() } } diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go index 426e9ff..fa57823 100644 --- a/contrib/codecs/http/codec.go +++ b/contrib/codecs/http/codec.go @@ -1,7 +1,6 @@ package http import ( - "bufio" "context" "encoding/base64" "encoding/json" @@ -26,7 +25,7 @@ type Codec struct { r *http.Request w http.ResponseWriter - wr *bufio.Writer + wr io.Writer msgs chan *serverutil.Bundle errCh chan httpError @@ -45,17 +44,12 @@ func NewCodec(w http.ResponseWriter, r *http.Request) *Codec { } func (c *Codec) Reset(w http.ResponseWriter, r *http.Request) { - ir := io.Writer(w) + c.wr = w if w == nil { - ir = io.Discard + c.wr = io.Discard } c.r = r c.w = w - if c.wr == nil { - c.wr = bufio.NewWriter(ir) - } else { - c.wr.Reset(ir) - } c.msgs = make(chan *serverutil.Bundle, 1) c.errCh = make(chan httpError, 1) @@ -210,7 +204,7 @@ func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) { case ans := <-c.msgs: return ans.Messages, ans.Batch, nil case err := <-c.errCh: - http.Error(c.w, err.err.Error(), err.code) + // http.Error(c.w, err.err.Error(), err.code) return nil, false, err.err case <-ctx.Done(): return nil, false, ctx.Err() @@ -226,11 +220,12 @@ func (c *Codec) Close() error { } func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { + defer c.cn() _, err = c.wr.Write(msg) if err != nil { return err } - return c.wr.Flush() + return nil } // Closed returns a channel which is closed when the connection is closed. diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go index 1c8acfa..8226ac7 100644 --- a/contrib/codecs/rdwr/codec.go +++ b/contrib/codecs/rdwr/codec.go @@ -77,7 +77,7 @@ func (c *Codec) Close() error { func (c *Codec) Send(ctx context.Context, buf json.RawMessage) error { c.wrLock.Lock() defer c.wrLock.Unlock() - _, err := c.w.Write(buf) + _, err := c.w.Write(append(buf, '\n')) return err } diff --git a/pkg/server/server.go b/pkg/server/server.go index a4baf3d..e2c63ac 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -159,6 +159,9 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) err case <-ctx.Done(): return nil case err := <-errch: + // perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc. + // the connection is already dead, so at this point there are no rules, so this is okay to do i think + remote.Send(context.Background(), nil) return err } } @@ -237,7 +240,7 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { } } if allSkip { - return nil + return c.remote.Send(ctx, nil) } } // create the streaming encoder -- GitLab