diff --git a/contrib/codecs/broker/broker_inproc.go b/contrib/codecs/broker/broker_inproc.go index 1339465edbe749813424dcf5a7543635f95b82ce..dcc5215be98638e47c5ba592c5a16a4a2b9bc583 100644 --- a/contrib/codecs/broker/broker_inproc.go +++ b/contrib/codecs/broker/broker_inproc.go @@ -3,6 +3,7 @@ package broker import ( "context" "encoding/json" + "log" "strings" "sync" "sync/atomic" @@ -65,10 +66,13 @@ func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *Chann } func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { + log.Println("start recv") select { case <-ctx.Done(): + log.Println("doned") return nil, nil, ctx.Err() case f := <-b.msgs: + log.Println("recv", string(f.data)) return f.data, func(resp json.RawMessage) error { return b.Publish(context.Background(), f.topic, resp) }, nil @@ -80,6 +84,7 @@ func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json case <-ctx.Done(): return ctx.Err() case b.msgs <- &frame{data: msg, topic: topic}: + log.Println("wrote", string(msg)) } return nil } diff --git a/contrib/codecs/broker/codec.go b/contrib/codecs/broker/codec.go index b372afa27c17f06c6bc24c80d91db3a3b7343bbc..5db312c0885bfe617f75284aa0edefaed1e27829 100644 --- a/contrib/codecs/broker/codec.go +++ b/contrib/codecs/broker/codec.go @@ -67,15 +67,8 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { - c.replier(c.wr.Bytes()) - c.wr.Reset() - c.Close() - return +func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { + return c.replier(msg) } // Closed returns a channel which is closed when the connection is closed. diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go index 4e2f1e5c6fdfc966da26fb0e9de996a4234a19f1..426e9ff27d3012d88bcc22ed46e087cf086a4c69 100644 --- a/contrib/codecs/http/codec.go +++ b/contrib/codecs/http/codec.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io" @@ -224,17 +225,12 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { - err = c.wr.Flush() +func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { + _, err = c.wr.Write(msg) if err != nil { return err } - c.cn() - return + return c.wr.Flush() } // Closed returns a channel which is closed when the connection is closed. diff --git a/contrib/codecs/inproc/inproc.go b/contrib/codecs/inproc/inproc.go index b7e60e818270f3a87aebc13454eba98bc932737d..5a79a5885f47a5d13d3b3696a4e68512a649d52e 100644 --- a/contrib/codecs/inproc/inproc.go +++ b/contrib/codecs/inproc/inproc.go @@ -3,6 +3,7 @@ package inproc import ( "bufio" "context" + "encoding/json" "io" "sync" @@ -58,15 +59,13 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { - c.wrLock.Lock() - defer c.wrLock.Unlock() - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { +func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { c.wrLock.Lock() defer c.wrLock.Unlock() + _, err = c.wr.Write(msg) + if err != nil { + return err + } return c.wr.Flush() } diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go index b50887c3d4be3df6f7b4efb4906ffc7e1d594eb9..1c8acfa451d7bbf10ec3427da16ec33b3b8dc72e 100644 --- a/contrib/codecs/rdwr/codec.go +++ b/contrib/codecs/rdwr/codec.go @@ -2,7 +2,6 @@ package rdwr import ( "bufio" - "bytes" "context" "io" "sync" @@ -19,7 +18,6 @@ type Codec struct { rd io.Reader wrLock sync.Mutex - wr *bytes.Buffer w io.Writer msgs chan *serverutil.Bundle @@ -36,7 +34,6 @@ func NewCodec(rd io.Reader, wr io.Writer) *Codec { cn: cn, rd: bufr, dec: json.NewDecoder(rd), - wr: new(bytes.Buffer), w: wr, msgs: make(chan *serverutil.Bundle, 8), } @@ -77,25 +74,11 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Write(p []byte) (n int, err error) { +func (c *Codec) Send(ctx context.Context, buf json.RawMessage) error { c.wrLock.Lock() defer c.wrLock.Unlock() - return c.wr.Write(p) -} - -func (c *Codec) Flush() (err error) { - c.wrLock.Lock() - defer c.wrLock.Unlock() - defer c.wr.Reset() - err = c.wr.WriteByte('\n') - if err != nil { - return err - } - _, err = c.wr.WriteTo(c.w) - if err != nil { - return err - } - return nil + _, err := c.w.Write(buf) + return err } // Closed returns a channel which is closed when the connection is closed. diff --git a/pkg/codec/transport.go b/pkg/codec/transport.go index a8393593f0c72310cff7398ad73ebf6d3e72f643..de6cc1f9f4e3a82e5bf74d5c12f48725b660ee25 100644 --- a/pkg/codec/transport.go +++ b/pkg/codec/transport.go @@ -2,7 +2,7 @@ package codec import ( "context" - "io" + "encoding/json" ) // ReaderWriter represents a single stream @@ -27,10 +27,7 @@ type Reader interface { // Implementations must be safe for concurrent use. type Writer interface { // write json blob to stream - io.Writer - // Flush flushes the writer to the stream between messages - Flush() error - + Send(context.Context, json.RawMessage) error // Closed returns a channel which is closed when the connection is closed. Closed() <-chan struct{} // RemoteAddr returns the peer address of the connection. diff --git a/pkg/server/server.go b/pkg/server/server.go index 9e63162c4af061a0df7c639954256301c2066498..a4baf3dd6f1961fc10db40f7151905472cbd9936 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -159,9 +159,6 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) err case <-ctx.Done(): return nil case err := <-errch: - // perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc. - // the connection is already dead, so at this point there are no rules, so this is okay to do i think - remote.Flush() return err } } @@ -192,10 +189,9 @@ type notifyEnv struct { func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { c.mu.Lock() defer c.mu.Unlock() - defer c.remote.Flush() enc := jx.GetEncoder() + enc.Reset() enc.Grow(4096) - enc.ResetWriter(c.remote) defer jx.PutEncoder(enc) //enc := jx.NewStreamingEncoder(c.remote, 4096) msg := &codec.Message{} @@ -216,7 +212,7 @@ func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { if err != nil { return err } - err = enc.Close() + err = c.remote.Send(ctx, enc.Bytes()) if err != nil { return err } @@ -231,7 +227,6 @@ type callEnv struct { func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { c.mu.Lock() defer c.mu.Unlock() - defer c.remote.Flush() // notification gets nothing // if all msgs in batch are notification, we trigger an allSkip and write nothing if env.batch { @@ -247,8 +242,8 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { } // create the streaming encoder enc := jx.GetEncoder() + enc.Reset() enc.Grow(4096) - enc.ResetWriter(c.remote) defer jx.PutEncoder(enc) if env.batch { enc.ArrStart() @@ -282,7 +277,7 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { if env.batch { enc.ArrEnd() } - err = enc.Close() + err = c.remote.Send(ctx, enc.Bytes()) if err != nil { return err }