diff --git a/contrib/codecs/broker/codec.go b/contrib/codecs/broker/codec.go index b372afa27c17f06c6bc24c80d91db3a3b7343bbc..5db312c0885bfe617f75284aa0edefaed1e27829 100644 --- a/contrib/codecs/broker/codec.go +++ b/contrib/codecs/broker/codec.go @@ -67,15 +67,8 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { - c.replier(c.wr.Bytes()) - c.wr.Reset() - c.Close() - return +func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { + return c.replier(msg) } // Closed returns a channel which is closed when the connection is closed. diff --git a/contrib/codecs/broker/server.go b/contrib/codecs/broker/server.go index aee4e828cd52d324e7c4f4467af0d130a04e61cf..6e76c20a766ba9bbc78dea8d0d312d04d586a8f8 100644 --- a/contrib/codecs/broker/server.go +++ b/contrib/codecs/broker/server.go @@ -25,6 +25,9 @@ func (s *Server) ServeSpoke(ctx context.Context, stream ServerSpoke) { continue } cd := NewCodec(req, fn) - s.Server.ServeCodec(ctx, cd) + go func() { + s.Server.ServeCodec(ctx, cd) + cd.Close() + }() } } diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go index 4e2f1e5c6fdfc966da26fb0e9de996a4234a19f1..7e2ca915e07587a772cc2cfe2f22448ccb34a6e2 100644 --- a/contrib/codecs/http/codec.go +++ b/contrib/codecs/http/codec.go @@ -1,9 +1,9 @@ package http import ( - "bufio" "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io" @@ -25,7 +25,7 @@ type Codec struct { r *http.Request w http.ResponseWriter - wr *bufio.Writer + wr io.Writer msgs chan *serverutil.Bundle errCh chan httpError @@ -44,17 +44,12 @@ func NewCodec(w http.ResponseWriter, r *http.Request) *Codec { } func (c *Codec) Reset(w http.ResponseWriter, r *http.Request) { - ir := io.Writer(w) + c.wr = w if w == nil { - ir = io.Discard + c.wr = io.Discard } c.r = r c.w = w - if c.wr == nil { - c.wr = bufio.NewWriter(ir) - } else { - c.wr.Reset(ir) - } c.msgs = make(chan *serverutil.Bundle, 1) c.errCh = make(chan httpError, 1) @@ -224,17 +219,13 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { - err = c.wr.Flush() +func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { + defer c.cn() + _, err = c.wr.Write(msg) if err != nil { return err } - c.cn() - return + return nil } // Closed returns a channel which is closed when the connection is closed. diff --git a/contrib/codecs/inproc/inproc.go b/contrib/codecs/inproc/inproc.go index b7e60e818270f3a87aebc13454eba98bc932737d..5a79a5885f47a5d13d3b3696a4e68512a649d52e 100644 --- a/contrib/codecs/inproc/inproc.go +++ b/contrib/codecs/inproc/inproc.go @@ -3,6 +3,7 @@ package inproc import ( "bufio" "context" + "encoding/json" "io" "sync" @@ -58,15 +59,13 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { - c.wrLock.Lock() - defer c.wrLock.Unlock() - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { +func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { c.wrLock.Lock() defer c.wrLock.Unlock() + _, err = c.wr.Write(msg) + if err != nil { + return err + } return c.wr.Flush() } diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go index 0a6850a65f86164c3a428c3fd1f01f814716b5ae..1a3311ec8de73fc52c93af9d6fbe8415f3dec4ef 100644 --- a/contrib/codecs/rdwr/codec.go +++ b/contrib/codecs/rdwr/codec.go @@ -2,7 +2,6 @@ package rdwr import ( "bufio" - "bytes" "context" "io" "sync" @@ -19,7 +18,6 @@ type Codec struct { rd io.Reader wrLock sync.Mutex - wr *bytes.Buffer w io.Writer dec *json.Decoder @@ -35,7 +33,6 @@ func NewCodec(rd io.Reader, wr io.Writer) *Codec { cn: cn, rd: bufr, dec: json.NewDecoder(rd), - wr: new(bytes.Buffer), w: wr, } return c @@ -75,25 +72,11 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { +func (c *Codec) Send(ctx context.Context, buf json.RawMessage) error { c.wrLock.Lock() defer c.wrLock.Unlock() - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { - c.wrLock.Lock() - defer c.wrLock.Unlock() - defer c.wr.Reset() - err = c.wr.WriteByte('\n') - if err != nil { - return err - } - _, err = c.wr.WriteTo(c.w) - if err != nil { - return err - } - return nil + _, err := c.w.Write(append(buf, '\n')) + return err } // Closed returns a channel which is closed when the connection is closed. diff --git a/pkg/codec/transport.go b/pkg/codec/transport.go index a8393593f0c72310cff7398ad73ebf6d3e72f643..de6cc1f9f4e3a82e5bf74d5c12f48725b660ee25 100644 --- a/pkg/codec/transport.go +++ b/pkg/codec/transport.go @@ -2,7 +2,7 @@ package codec import ( "context" - "io" + "encoding/json" ) // ReaderWriter represents a single stream @@ -27,10 +27,7 @@ type Reader interface { // Implementations must be safe for concurrent use. type Writer interface { // write json blob to stream - io.Writer - // Flush flushes the writer to the stream between messages - Flush() error - + Send(context.Context, json.RawMessage) error // Closed returns a channel which is closed when the connection is closed. Closed() <-chan struct{} // RemoteAddr returns the peer address of the connection. diff --git a/pkg/server/server.go b/pkg/server/server.go index 28fa1480d10923c06434e3117eee061d2cbaefba..45a9bee538288d1dc3b70bbd593f272a2dbfcb53 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -60,9 +60,6 @@ func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) erro case <-ctx.Done(): return nil case err := <-errch: - // perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc. - // the connection is already dead, so at this point there are no rules, so this is okay to do i think - remote.Flush() return err } } @@ -157,10 +154,9 @@ type notifyEnv struct { func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { c.mu.Lock() defer c.mu.Unlock() - defer c.remote.Flush() enc := jx.GetEncoder() + enc.Reset() enc.Grow(4096) - enc.ResetWriter(c.remote) defer jx.PutEncoder(enc) //enc := jx.NewStreamingEncoder(c.remote, 4096) msg := &codec.Message{} @@ -181,7 +177,7 @@ func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { if err != nil { return err } - err = enc.Close() + err = c.remote.Send(ctx, enc.Bytes()) if err != nil { return err } @@ -196,7 +192,6 @@ type callEnv struct { func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { c.mu.Lock() defer c.mu.Unlock() - defer c.remote.Flush() // notification gets nothing // if all msgs in batch are notification, we trigger an allSkip and write nothing if env.batch { @@ -207,13 +202,13 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { } } if allSkip { - return nil + return c.remote.Send(ctx, nil) } } // create the streaming encoder enc := jx.GetEncoder() + enc.Reset() enc.Grow(4096) - enc.ResetWriter(c.remote) defer jx.PutEncoder(enc) if env.batch { enc.ArrStart() @@ -247,7 +242,7 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { if env.batch { enc.ArrEnd() } - err = enc.Close() + err = c.remote.Send(ctx, enc.Bytes()) if err != nil { return err } diff --git a/pkg/util/mapset/mapset.go b/pkg/util/mapset/mapset.go index ae1817a2cbfa488a35f7ff66e7bc9dcd107df79d..88a54c1fa8da0375edc333543c44dd108a7766a1 100644 --- a/pkg/util/mapset/mapset.go +++ b/pkg/util/mapset/mapset.go @@ -26,6 +26,8 @@ func (s *Set[T]) Remove(x T) { } func (s *Set[T]) Each(fn func(x T) bool) { + s.mu.RLock() + defer s.mu.RUnlock() for k := range s.m { if !fn(k) { return