diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index d1d22e6207ef56a342843bd5668d7e7d58c3be69..0827a7a67b187cd02a5c6c2c01b26f1cf780339e 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -1,4 +1,4 @@ -package inproc +package rdwr import ( "bytes" @@ -164,6 +164,7 @@ func (c *Client) SetHeader(key string, value string) { } func (c *Client) Close() error { + c.cn() return nil } @@ -174,11 +175,14 @@ func (c *Client) writeContext(ctx context.Context, xs []byte) error { select { case errch <- err: case <-ctx.Done(): + case <-c.ctx.Done(): } }() select { case err := <-errch: return err + case <-c.ctx.Done(): + return c.ctx.Err() case <-ctx.Done(): return ctx.Err() } diff --git a/contrib/codecs/rdwr/inproc.go b/contrib/codecs/rdwr/codec.go similarity index 82% rename from contrib/codecs/rdwr/inproc.go rename to contrib/codecs/rdwr/codec.go index b7f2e5175e0b36ac50e3d27052d4ac2473244278..bb49256f4211a8d60cd14e7a4f4eff3d4e6078bd 100644 --- a/contrib/codecs/rdwr/inproc.go +++ b/contrib/codecs/rdwr/codec.go @@ -1,4 +1,4 @@ -package inproc +package rdwr import ( "bufio" @@ -20,13 +20,27 @@ type Codec struct { func NewCodec(rd io.Reader, wr io.Writer) *Codec { ctx, cn := context.WithCancel(context.TODO()) - return &Codec{ + c := &Codec{ ctx: ctx, cn: cn, rd: bufio.NewReader(rd), wr: bufio.NewWriter(wr), msgs: make(chan json.RawMessage, 8), } + go c.listen() + return c +} + +func (c *Codec) listen() { + var msg json.RawMessage + for { + err := json.NewDecoder(c.rd).Decode(&msg) + if err != nil { + c.cn() + return + } + c.msgs <- msg + } } // gets the peer info @@ -74,8 +88,8 @@ func (c *Codec) RemoteAddr() string { return "" } -// DialInProc attaches an in-process connection to the given RPC server. -//func DialInProc(handler *Server) *Client { +// Dialrdwr attaches an in-process connection to the given RPC server. +//func Dialrdwr(handler *Server) *Client { // initctx := context.Background() // c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { // p1, p2 := net.Pipe() diff --git a/contrib/codecs/rdwr/codec_test.go b/contrib/codecs/rdwr/codec_test.go index 694667c5d01055fbf4af9b5bb879f75790afe5b5..2f919dec7af3e422bcdde5a4dc81b9709d0d5a66 100644 --- a/contrib/codecs/rdwr/codec_test.go +++ b/contrib/codecs/rdwr/codec_test.go @@ -1,10 +1,11 @@ -package inproc_test +package rdwr_test import ( "context" + "io" "testing" - "gfx.cafe/open/jrpc/contrib/codecs/inproc" + "gfx.cafe/open/jrpc/contrib/codecs/rdwr" "gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/open/jrpc/pkg/server" @@ -15,13 +16,15 @@ func TestBasicSuite(t *testing.T) { jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) { + rd_s, wr_s := io.Pipe() + rd_c, wr_c := io.Pipe() s := jrpctest.NewServer() - clientCodec := inproc.NewCodec() + clientCodec := rdwr.NewCodec(rd_c, wr_s) go func() { s.ServeCodec(context.Background(), clientCodec) }() return s, func() codec.Conn { - return inproc.NewClient(clientCodec, nil) + return rdwr.NewClient(rd_s, wr_c, nil) }, func() {} }, }) diff --git a/contrib/codecs/rdwr/inproc_test.go b/contrib/codecs/rdwr/rdwr_test.go similarity index 62% rename from contrib/codecs/rdwr/inproc_test.go rename to contrib/codecs/rdwr/rdwr_test.go index 439eaeae2caf8d5d83341c69cbfe8387227df340..1b8921a95b5618085e5a8f35d19231c790840078 100644 --- a/contrib/codecs/rdwr/inproc_test.go +++ b/contrib/codecs/rdwr/rdwr_test.go @@ -1,24 +1,28 @@ -package inproc_test +package rdwr_test import ( "context" + "io" "testing" - "gfx.cafe/open/jrpc/contrib/codecs/inproc" + "gfx.cafe/open/jrpc/contrib/codecs/rdwr" "gfx.cafe/open/jrpc/contrib/jmux" "gfx.cafe/open/jrpc/pkg/server" "github.com/stretchr/testify/require" ) -func TestInprocSetup(t *testing.T) { +func TestRDWRSetup(t *testing.T) { mux := jmux.NewMux() srv := server.NewServer(mux) ctx := context.Background() - clientCodec := inproc.NewCodec() - client := inproc.NewClient(clientCodec, nil) + rd_s, wr_s := io.Pipe() + rd_c, wr_c := io.Pipe() + + clientCodec := rdwr.NewCodec(rd_s, wr_c) + client := rdwr.NewClient(rd_c, wr_s, nil) go func() { srv.ServeCodec(ctx, clientCodec) }() diff --git a/contrib/codecs/rdwr/rw.go b/contrib/codecs/rdwr/rw.go deleted file mode 100644 index 27b6e7a8292bd88dda1d436f9a5d9d9dc6f3035a..0000000000000000000000000000000000000000 --- a/contrib/codecs/rdwr/rw.go +++ /dev/null @@ -1,31 +0,0 @@ -package inproc - -import ( - "encoding/json" - "io" - "net/http" - - "gfx.cafe/open/jrpc/pkg/codec" -) - -type callRespWriter struct { - msg *codec.Message - w io.Writer - header http.Header -} - -func (c *callRespWriter) Send(v any, err error) error { - return nil -} - -func (c *callRespWriter) Option(k string, v any) { - // no options for now -} - -func (c *callRespWriter) Header() http.Header { - return c.header -} - -func (c *callRespWriter) Notify(v any) error { - return json.NewEncoder(w).Encode(v) -} diff --git a/pkg/clientutil/idreply.go b/pkg/clientutil/idreply.go index 0677e9d7b6420cc074ad1854695ecc26b4652a84..7cc6d553823dc62070e200eacdb30539a4426312 100644 --- a/pkg/clientutil/idreply.go +++ b/pkg/clientutil/idreply.go @@ -3,7 +3,6 @@ package clientutil import ( "context" "encoding/json" - "log" "sync" "sync/atomic" ) @@ -43,7 +42,6 @@ func (i *IdReply) makeOrTake(id int) chan msgOrError { } func (i *IdReply) Resolve(id int, msg json.RawMessage, err error) { - log.Println(err == nil) if err != nil { i.makeOrTake(id) <- msgOrError{ err: err,