From 365c5a9cd0fb00308e0c91695626d3661cc3f296 Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Thu, 26 Oct 2023 05:20:33 -0500 Subject: [PATCH] a --- contrib/codecs/http/codec.go | 1 + contrib/codecs/rdwr/client.go | 4 ++- contrib/codecs/rdwr/codec.go | 12 ++++---- go.mod | 4 +++ go.sum | 4 +-- pkg/server/server.go | 54 ++++++++++++++++++++++++----------- 6 files changed, 53 insertions(+), 26 deletions(-) diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go index c5c4c97..cedd651 100644 --- a/contrib/codecs/http/codec.go +++ b/contrib/codecs/http/codec.go @@ -223,6 +223,7 @@ func (c *Codec) Close() error { func (c *Codec) Send(fn func(e *jx.Encoder) error) error { defer c.cn() + defer c.jx.ResetWriter(c.wr) if err := fn(c.jx); err != nil { return err } diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index 283d632..98419d6 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "io" + "log" "sync" "gfx.cafe/open/jrpc/pkg/clientutil" @@ -64,12 +65,13 @@ func (c *Client) Mount(h codec.Middleware) { func (c *Client) listen() error { var msg json.RawMessage defer c.cn() - dec := json.NewDecoder(c.rd) + dec := json.NewDecoder(bufio.NewReader(c.rd)) for { err := dec.Decode(&msg) if err != nil { return err } + log.Println("got", msg) msgs, _ := codec.ParseMessage(msg) for i := range msgs { v := msgs[i] diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go index a071009..6c46003 100644 --- a/contrib/codecs/rdwr/codec.go +++ b/contrib/codecs/rdwr/codec.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "io" + "os" "sync" "github.com/go-faster/jx" @@ -19,7 +20,7 @@ type Codec struct { rd io.Reader wrLock sync.Mutex - wr *bufio.Writer + wr io.Writer jx *jx.Encoder dec *json.Decoder @@ -29,14 +30,15 @@ type Codec struct { func NewCodec(rd io.Reader, wr io.Writer) *Codec { ctx, cn := context.WithCancel(context.TODO()) + mw := io.MultiWriter(wr, os.Stdout) c := &Codec{ ctx: ctx, cn: cn, rd: bufio.NewReader(rd), - wr: bufio.NewWriter(wr), + wr: mw, dec: json.NewDecoder(rd), } - c.jx = jx.NewStreamingEncoder(wr, 4096) + c.jx = jx.NewStreamingEncoder(mw, 4096) return c } @@ -81,15 +83,13 @@ func (c *Codec) Send(fn func(e *jx.Encoder) error) error { if err := fn(c.jx); err != nil { return err } + if err := c.jx.Close(); err != nil { return err } if _, err := c.wr.Write([]byte("\n")); err != nil { return err } - if err := c.wr.Flush(); err != nil { - return err - } return nil } diff --git a/go.mod b/go.mod index eda2be9..55ace5b 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module gfx.cafe/open/jrpc go 1.21 +replace github.com/goccy/go-json v0.10.2 => github.com/elee1766/go-json v0.10.2-1 + +replace github.com/go-faster/jx v1.1.0 => ../../../github.com/elee1766/jx + require ( gfx.cafe/open/websocket v1.9.2 gfx.cafe/util/go/bufpool v0.0.0-20230721185457-c559e86c829c diff --git a/go.sum b/go.sum index e2d6132..a465def 100644 --- a/go.sum +++ b/go.sum @@ -17,12 +17,12 @@ github.com/alecthomas/repr v0.1.0/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygv github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elee1766/go-json v0.10.2-1 h1:AW2+8FAs2wVgRo90yz1l7pXKEiPxC71jquZNzgebkkc= +github.com/elee1766/go-json v0.10.2-1/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg= github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= diff --git a/pkg/server/server.go b/pkg/server/server.go index e2b9281..e006669 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1,7 +1,6 @@ package server import ( - "bytes" "context" "sync" @@ -210,24 +209,45 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { if env.batch && v.skip { continue } - // if there is no error, we try to marshal the result - if msg.Error == nil { - buf := bufpool.GetStd() - defer bufpool.PutStd(buf) - je := json.NewEncoder(buf) - err = je.EncodeWithOption(v.dat) - if err != nil { - msg.Error = err + m := msg + enc.Obj(func(e *jx.Encoder) { + e.Field("jsonrpc", func(e *jx.Encoder) { + e.Str("2.0") + }) + if m.ID != nil { + e.Field("id", func(e *jx.Encoder) { + e.Raw(m.ID.RawMessage()) + }) + } + if m.Method != "" { + e.Field("method", func(e *jx.Encoder) { + e.Str(m.Method) + }) + } + for _, v := range m.ExtraFields { + e.Field(v.Name, func(e *jx.Encoder) { + e.Raw(v.Value) + }) + } + if m.Error != nil { + e.Field("error", func(e *jx.Encoder) { + codec.EncodeError(e, m.Error) + }) } else { - msg.Result = buf.Bytes() - msg.Result = bytes.TrimSuffix(msg.Result, []byte{'\n'}) + // if there is no error, we try to marshal the result + e.Field("result", func(e *jx.Encoder) { + if v.dat != nil { + err = json.NewEncoder(e).EncodeWithOption(v.dat, func(eo *json.EncodeOption) { + eo.DisableNewline = true + }) + if err != nil { + } + } else { + e.Null() + } + }) } - } - // then marshal the whole message into the stream - err := codec.MarshalMessage(msg, enc) - if err != nil { - return err - } + }) } if env.batch { enc.ArrEnd() -- GitLab