From e81137d4ff8485e760548c0cabf7384b5098f8f0 Mon Sep 17 00:00:00 2001
From: a <a@tuxpa.in>
Date: Wed, 7 Jun 2023 05:11:03 -0500
Subject: [PATCH] rdwr

---
 contrib/codecs/rdwr/client.go                 |  6 +++-
 contrib/codecs/rdwr/{inproc.go => codec.go}   | 22 ++++++++++---
 contrib/codecs/rdwr/codec_test.go             | 11 ++++---
 .../rdwr/{inproc_test.go => rdwr_test.go}     | 14 ++++++---
 contrib/codecs/rdwr/rw.go                     | 31 -------------------
 pkg/clientutil/idreply.go                     |  2 --
 6 files changed, 39 insertions(+), 47 deletions(-)
 rename contrib/codecs/rdwr/{inproc.go => codec.go} (82%)
 rename contrib/codecs/rdwr/{inproc_test.go => rdwr_test.go} (62%)
 delete mode 100644 contrib/codecs/rdwr/rw.go

diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go
index d1d22e6..0827a7a 100644
--- a/contrib/codecs/rdwr/client.go
+++ b/contrib/codecs/rdwr/client.go
@@ -1,4 +1,4 @@
-package inproc
+package rdwr
 
 import (
 	"bytes"
@@ -164,6 +164,7 @@ func (c *Client) SetHeader(key string, value string) {
 }
 
 func (c *Client) Close() error {
+	c.cn()
 	return nil
 }
 
@@ -174,11 +175,14 @@ func (c *Client) writeContext(ctx context.Context, xs []byte) error {
 		select {
 		case errch <- err:
 		case <-ctx.Done():
+		case <-c.ctx.Done():
 		}
 	}()
 	select {
 	case err := <-errch:
 		return err
+	case <-c.ctx.Done():
+		return c.ctx.Err()
 	case <-ctx.Done():
 		return ctx.Err()
 	}
diff --git a/contrib/codecs/rdwr/inproc.go b/contrib/codecs/rdwr/codec.go
similarity index 82%
rename from contrib/codecs/rdwr/inproc.go
rename to contrib/codecs/rdwr/codec.go
index b7f2e51..bb49256 100644
--- a/contrib/codecs/rdwr/inproc.go
+++ b/contrib/codecs/rdwr/codec.go
@@ -1,4 +1,4 @@
-package inproc
+package rdwr
 
 import (
 	"bufio"
@@ -20,13 +20,27 @@ type Codec struct {
 
 func NewCodec(rd io.Reader, wr io.Writer) *Codec {
 	ctx, cn := context.WithCancel(context.TODO())
-	return &Codec{
+	c := &Codec{
 		ctx:  ctx,
 		cn:   cn,
 		rd:   bufio.NewReader(rd),
 		wr:   bufio.NewWriter(wr),
 		msgs: make(chan json.RawMessage, 8),
 	}
+	go c.listen()
+	return c
+}
+
+func (c *Codec) listen() {
+	var msg json.RawMessage
+	for {
+		err := json.NewDecoder(c.rd).Decode(&msg)
+		if err != nil {
+			c.cn()
+			return
+		}
+		c.msgs <- msg
+	}
 }
 
 // gets the peer info
@@ -74,8 +88,8 @@ func (c *Codec) RemoteAddr() string {
 	return ""
 }
 
-// DialInProc attaches an in-process connection to the given RPC server.
-//func DialInProc(handler *Server) *Client {
+// Dialrdwr attaches an in-process connection to the given RPC server.
+//func Dialrdwr(handler *Server) *Client {
 //	initctx := context.Background()
 //	c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) {
 //		p1, p2 := net.Pipe()
diff --git a/contrib/codecs/rdwr/codec_test.go b/contrib/codecs/rdwr/codec_test.go
index 694667c..2f919de 100644
--- a/contrib/codecs/rdwr/codec_test.go
+++ b/contrib/codecs/rdwr/codec_test.go
@@ -1,10 +1,11 @@
-package inproc_test
+package rdwr_test
 
 import (
 	"context"
+	"io"
 	"testing"
 
-	"gfx.cafe/open/jrpc/contrib/codecs/inproc"
+	"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
 	"gfx.cafe/open/jrpc/pkg/codec"
 	"gfx.cafe/open/jrpc/pkg/server"
 
@@ -15,13 +16,15 @@ func TestBasicSuite(t *testing.T) {
 
 	jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
 		ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
+			rd_s, wr_s := io.Pipe()
+			rd_c, wr_c := io.Pipe()
 			s := jrpctest.NewServer()
-			clientCodec := inproc.NewCodec()
+			clientCodec := rdwr.NewCodec(rd_c, wr_s)
 			go func() {
 				s.ServeCodec(context.Background(), clientCodec)
 			}()
 			return s, func() codec.Conn {
-				return inproc.NewClient(clientCodec, nil)
+				return rdwr.NewClient(rd_s, wr_c, nil)
 			}, func() {}
 		},
 	})
diff --git a/contrib/codecs/rdwr/inproc_test.go b/contrib/codecs/rdwr/rdwr_test.go
similarity index 62%
rename from contrib/codecs/rdwr/inproc_test.go
rename to contrib/codecs/rdwr/rdwr_test.go
index 439eaea..1b8921a 100644
--- a/contrib/codecs/rdwr/inproc_test.go
+++ b/contrib/codecs/rdwr/rdwr_test.go
@@ -1,24 +1,28 @@
-package inproc_test
+package rdwr_test
 
 import (
 	"context"
+	"io"
 	"testing"
 
-	"gfx.cafe/open/jrpc/contrib/codecs/inproc"
+	"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
 	"gfx.cafe/open/jrpc/contrib/jmux"
 	"gfx.cafe/open/jrpc/pkg/server"
 
 	"github.com/stretchr/testify/require"
 )
 
-func TestInprocSetup(t *testing.T) {
+func TestRDWRSetup(t *testing.T) {
 	mux := jmux.NewMux()
 	srv := server.NewServer(mux)
 
 	ctx := context.Background()
 
-	clientCodec := inproc.NewCodec()
-	client := inproc.NewClient(clientCodec, nil)
+	rd_s, wr_s := io.Pipe()
+	rd_c, wr_c := io.Pipe()
+
+	clientCodec := rdwr.NewCodec(rd_s, wr_c)
+	client := rdwr.NewClient(rd_c, wr_s, nil)
 	go func() {
 		srv.ServeCodec(ctx, clientCodec)
 	}()
diff --git a/contrib/codecs/rdwr/rw.go b/contrib/codecs/rdwr/rw.go
deleted file mode 100644
index 27b6e7a..0000000
--- a/contrib/codecs/rdwr/rw.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package inproc
-
-import (
-	"encoding/json"
-	"io"
-	"net/http"
-
-	"gfx.cafe/open/jrpc/pkg/codec"
-)
-
-type callRespWriter struct {
-	msg    *codec.Message
-	w      io.Writer
-	header http.Header
-}
-
-func (c *callRespWriter) Send(v any, err error) error {
-	return nil
-}
-
-func (c *callRespWriter) Option(k string, v any) {
-	// no options for now
-}
-
-func (c *callRespWriter) Header() http.Header {
-	return c.header
-}
-
-func (c *callRespWriter) Notify(v any) error {
-	return json.NewEncoder(w).Encode(v)
-}
diff --git a/pkg/clientutil/idreply.go b/pkg/clientutil/idreply.go
index 0677e9d..7cc6d55 100644
--- a/pkg/clientutil/idreply.go
+++ b/pkg/clientutil/idreply.go
@@ -3,7 +3,6 @@ package clientutil
 import (
 	"context"
 	"encoding/json"
-	"log"
 	"sync"
 	"sync/atomic"
 )
@@ -43,7 +42,6 @@ func (i *IdReply) makeOrTake(id int) chan msgOrError {
 }
 
 func (i *IdReply) Resolve(id int, msg json.RawMessage, err error) {
-	log.Println(err == nil)
 	if err != nil {
 		i.makeOrTake(id) <- msgOrError{
 			err: err,
-- 
GitLab