diff --git a/contrib/codecs/broker/codec.go b/contrib/codecs/broker/codec.go index 454418352553ba617bcbef5ce5f062c5dfb8fe30..ed7726ced28a1349939ba46709a7b90ec4a51d21 100644 --- a/contrib/codecs/broker/codec.go +++ b/contrib/codecs/broker/codec.go @@ -46,14 +46,17 @@ func NewCodec(req json.RawMessage, replier func(json.RawMessage) error) *Codec { func (c *Codec) PeerInfo() codec.PeerInfo { return c.i } -func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error { +func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) { select { case ans := <-c.ansCh: - return fn(ans.Messages, ans.Batch) + return func() { + fn(ans.Messages, ans.Batch) + return + }, nil case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-c.ctx.Done(): - return c.ctx.Err() + return nil, c.ctx.Err() } } diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go index 205fe34d41964f9f0c0de7262b1e96ced9096ee6..a900a5236542c9f39b7028151985032a91dd542b 100644 --- a/contrib/codecs/http/codec.go +++ b/contrib/codecs/http/codec.go @@ -204,17 +204,17 @@ func (c *Codec) doRead() { }() } -func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error { +func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) { select { case ans := <-c.msgs: - return fn(ans.Messages, ans.Batch) + return func() { fn(ans.Messages, ans.Batch) }, nil case err := <-c.errCh: http.Error(c.w, err.err.Error(), err.code) - return err.err + return nil, err.err case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-c.ctx.Done(): - return c.ctx.Err() + return nil, c.ctx.Err() } } diff --git a/contrib/codecs/inproc/inproc.go b/contrib/codecs/inproc/inproc.go index d0be82d438bdc4e5f6dd34ffb16340033570c83e..801cb0fcc8f0bdf4835bb348d3710b5527bc4611 100644 --- a/contrib/codecs/inproc/inproc.go +++ b/contrib/codecs/inproc/inproc.go @@ -41,14 +41,17 @@ func (c *Codec) PeerInfo() codec.PeerInfo { } } -func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error { +func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) { select { case ans := <-c.msgs: - return fn(ans.Messages, ans.Batch) + return func() { + fn(ans.Messages, ans.Batch) + return + }, nil case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() case <-c.ctx.Done(): - return c.ctx.Err() + return nil, c.ctx.Err() } } diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go index 8bd895af4afcfd07f297b41c4bb0e43fc4f94e49..e8c3c5e5a71bf25baf63b78cbd22b5b3aa660565 100644 --- a/contrib/codecs/rdwr/codec.go +++ b/contrib/codecs/rdwr/codec.go @@ -1,7 +1,6 @@ package rdwr import ( - "bufio" "bytes" "context" "io" @@ -11,13 +10,14 @@ import ( "gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/open/jrpc/pkg/serverutil" + "gfx.cafe/util/go/bytepool" ) type Codec struct { ctx context.Context cn func() - rd io.Reader + dec *json.Decoder wrLock sync.Mutex wr *bytes.Buffer w io.Writer @@ -29,34 +29,14 @@ func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec { c := &Codec{ ctx: ctx, cn: cn, - rd: bufio.NewReader(rd), + dec: json.NewDecoder(rd), wr: new(bytes.Buffer), w: wr, msgs: make(chan *serverutil.Bundle, 8), } - go func() { - err := c.listen() - if err != nil && onError != nil { - onError(err) - } - }() return c } -func (c *Codec) listen() error { - dec := json.NewDecoder(c.rd) - for { - // reading a message - err := dec.Decode(&msg) - if err != nil { - c.cn() - return err - } - c.msgs <- serverutil.ParseBundle(msg) - msg = msg[:0] - } -} - // gets the peer info func (c *Codec) PeerInfo() codec.PeerInfo { return codec.PeerInfo{ @@ -66,15 +46,20 @@ func (c *Codec) PeerInfo() codec.PeerInfo { } } -func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error { - select { - case ans := <-c.msgs: - return fn(ans.Messages, ans.Batch) - case <-ctx.Done(): - return ctx.Err() - case <-c.ctx.Done(): - return c.ctx.Err() +func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) { + ctx, cn := context.WithCancel(ctx) + context.AfterFunc(c.ctx, cn) + var msg json.RawMessage + msg = bytepool.GetStd() + err := c.dec.DecodeContext(ctx, &msg) + if err != nil { + return nil, err } + return func() { + defer bytepool.PutStd(msg) + ans := serverutil.ParseBundle(msg) + fn(ans.Messages, ans.Batch) + }, nil } // closes the connection diff --git a/pkg/codec/transport.go b/pkg/codec/transport.go index 900e1aa6d2e61d9ffac872fafbc18fbf0d9266bb..1e77379b088c4d523007624ec93174abc8df671b 100644 --- a/pkg/codec/transport.go +++ b/pkg/codec/transport.go @@ -16,7 +16,7 @@ type Reader interface { // gets the peer info PeerInfo() PeerInfo // json.RawMessage can be an array of requests. if it is, then it is a batch request - ReadBatch(ctx context.Context, fn func(msgs []*Message, batch bool)) (err error) + ReadBatch(ctx context.Context, fn func(msgs []*Message, batch bool)) (func(), error) // closes the connection Close() error } diff --git a/pkg/server/server.go b/pkg/server/server.go index 51e17b26f1b6a02f24355fc12c84f98ecd82a099..eb685127cd2e285e327cb7bc4e6f6144c91f679c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -151,6 +151,7 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { s.errorHandler(remote, err) } }() + wg := sync.WaitGroup{} handleCodecMessage := s.codecHandler(ctx, remote, responder) for { select { @@ -158,13 +159,19 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { remote.Close() default: } - err := remote.ReadBatch(ctx, handleCodecMessage) + call, err := remote.ReadBatch(ctx, handleCodecMessage) if err != nil { remote.Flush() s.errorHandler(remote, err) - return + break } + wg.Add(1) + go func() { + defer wg.Done() + call() + }() } + wg.Wait() } // Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending