From 4cc7cffa8dd99c2e2cdd7a3f69ef3e114fae5a7f Mon Sep 17 00:00:00 2001
From: a <a@tuxpa.in>
Date: Wed, 6 Sep 2023 11:07:47 -0500
Subject: [PATCH] chng

---
 contrib/codecs/broker/codec.go  | 11 +++++---
 contrib/codecs/http/codec.go    | 10 +++----
 contrib/codecs/inproc/inproc.go | 11 +++++---
 contrib/codecs/rdwr/codec.go    | 47 +++++++++++----------------------
 pkg/codec/transport.go          |  2 +-
 pkg/server/server.go            | 11 ++++++--
 6 files changed, 45 insertions(+), 47 deletions(-)

diff --git a/contrib/codecs/broker/codec.go b/contrib/codecs/broker/codec.go
index 4544183..ed7726c 100644
--- a/contrib/codecs/broker/codec.go
+++ b/contrib/codecs/broker/codec.go
@@ -46,14 +46,17 @@ func NewCodec(req json.RawMessage, replier func(json.RawMessage) error) *Codec {
 func (c *Codec) PeerInfo() codec.PeerInfo {
 	return c.i
 }
-func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error {
+func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) {
 	select {
 	case ans := <-c.ansCh:
-		return fn(ans.Messages, ans.Batch)
+		return func() {
+			fn(ans.Messages, ans.Batch)
+			return
+		}, nil
 	case <-ctx.Done():
-		return ctx.Err()
+		return nil, ctx.Err()
 	case <-c.ctx.Done():
-		return c.ctx.Err()
+		return nil, c.ctx.Err()
 	}
 }
 
diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go
index 205fe34..a900a52 100644
--- a/contrib/codecs/http/codec.go
+++ b/contrib/codecs/http/codec.go
@@ -204,17 +204,17 @@ func (c *Codec) doRead() {
 	}()
 }
 
-func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error {
+func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) {
 	select {
 	case ans := <-c.msgs:
-		return fn(ans.Messages, ans.Batch)
+		return func() { fn(ans.Messages, ans.Batch) }, nil
 	case err := <-c.errCh:
 		http.Error(c.w, err.err.Error(), err.code)
-		return err.err
+		return nil, err.err
 	case <-ctx.Done():
-		return ctx.Err()
+		return nil, ctx.Err()
 	case <-c.ctx.Done():
-		return c.ctx.Err()
+		return nil, c.ctx.Err()
 	}
 }
 
diff --git a/contrib/codecs/inproc/inproc.go b/contrib/codecs/inproc/inproc.go
index d0be82d..801cb0f 100644
--- a/contrib/codecs/inproc/inproc.go
+++ b/contrib/codecs/inproc/inproc.go
@@ -41,14 +41,17 @@ func (c *Codec) PeerInfo() codec.PeerInfo {
 	}
 }
 
-func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error {
+func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) {
 	select {
 	case ans := <-c.msgs:
-		return fn(ans.Messages, ans.Batch)
+		return func() {
+			fn(ans.Messages, ans.Batch)
+			return
+		}, nil
 	case <-ctx.Done():
-		return ctx.Err()
+		return nil, ctx.Err()
 	case <-c.ctx.Done():
-		return c.ctx.Err()
+		return nil, c.ctx.Err()
 	}
 }
 
diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go
index 8bd895a..e8c3c5e 100644
--- a/contrib/codecs/rdwr/codec.go
+++ b/contrib/codecs/rdwr/codec.go
@@ -1,7 +1,6 @@
 package rdwr
 
 import (
-	"bufio"
 	"bytes"
 	"context"
 	"io"
@@ -11,13 +10,14 @@ import (
 
 	"gfx.cafe/open/jrpc/pkg/codec"
 	"gfx.cafe/open/jrpc/pkg/serverutil"
+	"gfx.cafe/util/go/bytepool"
 )
 
 type Codec struct {
 	ctx context.Context
 	cn  func()
 
-	rd     io.Reader
+	dec    *json.Decoder
 	wrLock sync.Mutex
 	wr     *bytes.Buffer
 	w      io.Writer
@@ -29,34 +29,14 @@ func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
 	c := &Codec{
 		ctx:  ctx,
 		cn:   cn,
-		rd:   bufio.NewReader(rd),
+		dec:  json.NewDecoder(rd),
 		wr:   new(bytes.Buffer),
 		w:    wr,
 		msgs: make(chan *serverutil.Bundle, 8),
 	}
-	go func() {
-		err := c.listen()
-		if err != nil && onError != nil {
-			onError(err)
-		}
-	}()
 	return c
 }
 
-func (c *Codec) listen() error {
-	dec := json.NewDecoder(c.rd)
-	for {
-		// reading a message
-		err := dec.Decode(&msg)
-		if err != nil {
-			c.cn()
-			return err
-		}
-		c.msgs <- serverutil.ParseBundle(msg)
-		msg = msg[:0]
-	}
-}
-
 // gets the peer info
 func (c *Codec) PeerInfo() codec.PeerInfo {
 	return codec.PeerInfo{
@@ -66,15 +46,20 @@ func (c *Codec) PeerInfo() codec.PeerInfo {
 	}
 }
 
-func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool) error) error {
-	select {
-	case ans := <-c.msgs:
-		return fn(ans.Messages, ans.Batch)
-	case <-ctx.Done():
-		return ctx.Err()
-	case <-c.ctx.Done():
-		return c.ctx.Err()
+func (c *Codec) ReadBatch(ctx context.Context, fn func([]*codec.Message, bool)) (func(), error) {
+	ctx, cn := context.WithCancel(ctx)
+	context.AfterFunc(c.ctx, cn)
+	var msg json.RawMessage
+	msg = bytepool.GetStd()
+	err := c.dec.DecodeContext(ctx, &msg)
+	if err != nil {
+		return nil, err
 	}
+	return func() {
+		defer bytepool.PutStd(msg)
+		ans := serverutil.ParseBundle(msg)
+		fn(ans.Messages, ans.Batch)
+	}, nil
 }
 
 // closes the connection
diff --git a/pkg/codec/transport.go b/pkg/codec/transport.go
index 900e1aa..1e77379 100644
--- a/pkg/codec/transport.go
+++ b/pkg/codec/transport.go
@@ -16,7 +16,7 @@ type Reader interface {
 	// gets the peer info
 	PeerInfo() PeerInfo
 	// json.RawMessage can be an array of requests. if it is, then it is a batch request
-	ReadBatch(ctx context.Context, fn func(msgs []*Message, batch bool)) (err error)
+	ReadBatch(ctx context.Context, fn func(msgs []*Message, batch bool)) (func(), error)
 	// closes the connection
 	Close() error
 }
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 51e17b2..eb68512 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -151,6 +151,7 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) {
 			s.errorHandler(remote, err)
 		}
 	}()
+	wg := sync.WaitGroup{}
 	handleCodecMessage := s.codecHandler(ctx, remote, responder)
 	for {
 		select {
@@ -158,13 +159,19 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) {
 			remote.Close()
 		default:
 		}
-		err := remote.ReadBatch(ctx, handleCodecMessage)
+		call, err := remote.ReadBatch(ctx, handleCodecMessage)
 		if err != nil {
 			remote.Flush()
 			s.errorHandler(remote, err)
-			return
+			break
 		}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			call()
+		}()
 	}
+	wg.Wait()
 }
 
 // Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
-- 
GitLab