From ad73fc67d8dda32f0b6c035389d3429cc068b1ad Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Sun, 29 Oct 2023 21:10:13 -0500 Subject: [PATCH] reader --- contrib/codecs/http/client.go | 9 ++------- contrib/codecs/rdwr/client.go | 4 ++-- pkg/clientutil/helper.go | 2 +- pkg/clientutil/helper_test.go | 7 +++---- pkg/clientutil/idreply.go | 8 ++++---- pkg/clientutil/idreply_test.go | 14 ++++++-------- pkg/codec/json.go | 35 +++++++++++++++------------------- 7 files changed, 33 insertions(+), 46 deletions(-) diff --git a/contrib/codecs/http/client.go b/contrib/codecs/http/client.go index a707be4..24d1d7a 100644 --- a/contrib/codecs/http/client.go +++ b/contrib/codecs/http/client.go @@ -98,8 +98,8 @@ func (c *Client) Do(ctx context.Context, result any, method string, params any) if msg.Error != nil { return msg.Error } - if result != nil && len(msg.Result) > 0 { - err = json.Unmarshal(msg.Result, result) + if result != nil && msg.Result != nil { + err = json.NewDecoder(msg.Result).Decode(result) if err != nil { return err } @@ -148,11 +148,6 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error { defer resp.Body.Close() msgs := []*codec.Message{} - for i := 0; i < len(ids); i++ { - msg := clientutil.GetMessage() - defer clientutil.PutMessage(msg) - msgs = append(msgs, msg) - } err = json.NewDecoder(resp.Body).Decode(&msgs) if err != nil { return err diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index c9fa522..6224631 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -129,7 +129,7 @@ func (c *Client) Do(ctx context.Context, result any, method string, params any) return err } if result != nil { - err = json.Unmarshal(ans, result) + err = json.NewDecoder(ans).Decode(result) if err != nil { return err } @@ -176,7 +176,7 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error { return } if b[idx].Result != nil { - err = json.Unmarshal(ans, b[idx].Result) + err = json.NewDecoder(ans).Decode(b[idx].Result) if err != nil { b[idx].Error = err return diff --git a/pkg/clientutil/helper.go b/pkg/clientutil/helper.go index c139aa0..1519b5b 100644 --- a/pkg/clientutil/helper.go +++ b/pkg/clientutil/helper.go @@ -44,7 +44,7 @@ func FillBatch(ids map[int]int, msgs []*codec.Message, b []*codec.BatchElem) { if b[idx].Result == nil { continue } - err := json.Unmarshal(ans.Result, b[idx].Result) + err := json.NewDecoder(ans.Result).Decode(b[idx].Result) if err != nil { b[idx].Error = err } diff --git a/pkg/clientutil/helper_test.go b/pkg/clientutil/helper_test.go index ed765ee..134cda5 100644 --- a/pkg/clientutil/helper_test.go +++ b/pkg/clientutil/helper_test.go @@ -1,7 +1,6 @@ package clientutil import ( - "encoding/json" "testing" "github.com/stretchr/testify/assert" @@ -18,16 +17,16 @@ func TestFillBatch(t *testing.T) { msgs := []*codec.Message{ { ID: ptr(codec.ID(`"5"`)), - Result: json.RawMessage(`["test", "abc", "123"]`), + Result: codec.NewStringReader(`["test", "abc", "123"]`), }, { ID: ptr(codec.ID(`"6"`)), - Result: json.RawMessage(`12345`), + Result: codec.NewStringReader(`12345`), }, {}, { ID: ptr(codec.ID(`"7"`)), - Result: json.RawMessage(`"abcdefgh"`), + Result: codec.NewStringReader(`"abcdefgh"`), }, } ids := map[int]int{ diff --git a/pkg/clientutil/idreply.go b/pkg/clientutil/idreply.go index 86c0e4f..f30fefe 100644 --- a/pkg/clientutil/idreply.go +++ b/pkg/clientutil/idreply.go @@ -2,7 +2,7 @@ package clientutil import ( "context" - "encoding/json" + "io" "sync" "sync/atomic" @@ -17,7 +17,7 @@ type IdReply struct { } type msgOrError struct { - msg json.RawMessage + msg io.ReadCloser err error } @@ -69,7 +69,7 @@ func (i *IdReply) remove(id []byte) { delete(i.chs, string(id)) } -func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) { +func (i *IdReply) Resolve(id []byte, msg io.ReadCloser, err error) { ch := i.makeOrTake(id) if ch == nil { return @@ -87,7 +87,7 @@ func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) { } -func (i *IdReply) Ask(ctx context.Context, id []byte) (json.RawMessage, error) { +func (i *IdReply) Ask(ctx context.Context, id []byte) (io.ReadCloser, error) { select { case resp := <-i.makeOrTake(id): return resp.msg, resp.err diff --git a/pkg/clientutil/idreply_test.go b/pkg/clientutil/idreply_test.go index 79ddf31..a0ced8d 100644 --- a/pkg/clientutil/idreply_test.go +++ b/pkg/clientutil/idreply_test.go @@ -1,13 +1,13 @@ package clientutil import ( - "bytes" "context" - "encoding/json" + "io" "sync" "testing" "gfx.cafe/open/jrpc/pkg/codec" + "github.com/stretchr/testify/require" ) const count = 1000 @@ -15,7 +15,7 @@ const count = 1000 func TestIdReply(t *testing.T) { reply := NewIdReply() - testMessage := json.RawMessage("{\"test\": 123}") + testMessage := "{\"test\": 123}" var wg sync.WaitGroup @@ -31,16 +31,14 @@ func TestIdReply(t *testing.T) { return } - if !bytes.Equal(v, testMessage) { - t.Error("expected contents to be equal") - return - } + x, _ := io.ReadAll(v) + require.EqualValues(t, testMessage, string(x)) }() } for i := 0; i < count; i++ { go func(id int) { - reply.Resolve(codec.NewNumberID(int64(id+1)), testMessage, nil) + reply.Resolve(codec.NewNumberID(int64(id+1)), codec.NewStringReader(testMessage), nil) }(i) } diff --git a/pkg/codec/json.go b/pkg/codec/json.go index 84662e0..2c0dca2 100644 --- a/pkg/codec/json.go +++ b/pkg/codec/json.go @@ -4,7 +4,9 @@ import ( "bytes" "encoding/json" "fmt" + "io" "strconv" + "strings" "github.com/go-faster/jx" ) @@ -23,12 +25,17 @@ type Message struct { ID *ID `json:"id,omitempty"` Method string `json:"method,omitempty"` Params json.RawMessage `json:"params,omitempty"` - Result json.RawMessage `json:"result,omitempty"` Error error `json:"error,omitempty"` + Result io.ReadCloser `json:"result,omitempty"` + ExtraFields ExtraFields `json:"-"` } +func NewStringReader(x string) io.ReadCloser { + return io.NopCloser(strings.NewReader(x)) +} + func MarshalMessage(m *Message, enc *jx.Encoder) error { // use encoder fail := enc.Obj(func(e *jx.Encoder) { @@ -61,9 +68,9 @@ func MarshalMessage(m *Message, enc *jx.Encoder) error { e.Raw(m.Params) }) } - if len(m.Result) != 0 { + if m.Result != nil { e.Field("result", func(e *jx.Encoder) { - e.Raw(m.Result) + io.Copy(e, m.Result) }) } }) @@ -110,29 +117,18 @@ func UnmarshalMessage(m *Message, dec *jx.Decoder) error { case "method": m.Method, err = d.Str() case "params": - val, err := d.Raw() - if err != nil { - return err - } - buf := bytes.NewBuffer(m.Params) - buf.Reset() - _, err = buf.Write(val) + val, err := d.RawAppend(nil) if err != nil { return err } - m.Params = buf.Bytes() + m.Params = json.RawMessage(val) case "result": - val, err := d.Raw() - if err != nil { - return err - } - buf := bytes.NewBuffer(m.Result) - buf.Reset() - _, err = buf.Write(val) + // allocate full result :) + val, err := d.RawAppend(nil) if err != nil { return err } - m.Result = buf.Bytes() + m.Result = io.NopCloser(bytes.NewBuffer(val)) case "error": val, err := d.Raw() if err != nil { @@ -253,7 +249,6 @@ func ParseMessage(in json.RawMessage) ([]*Message, bool) { // Message. func ReadMessage(dec *jx.Decoder) ([]*Message, bool) { msgs := []*Message{{}} - switch dec.Next() { case jx.Object: _ = UnmarshalMessage(msgs[0], dec) -- GitLab