diff --git a/benchmark/suite_test.go b/benchmark/suite_test.go index 1a6824d1f240ed5a08919e06848c75b41f2ee388..6ecf448cb678cedf40dcbb7e5253e0449909b795 100644 --- a/benchmark/suite_test.go +++ b/benchmark/suite_test.go @@ -5,7 +5,6 @@ import ( "testing" "gfx.cafe/open/jrpc/contrib/codecs/http" - "gfx.cafe/open/jrpc/contrib/codecs/inproc" "gfx.cafe/open/jrpc/contrib/codecs/rdwr" "gfx.cafe/open/jrpc/contrib/codecs/websocket" "gfx.cafe/open/jrpc/pkg/codec" @@ -36,7 +35,6 @@ func BenchmarkSimpleSuite(b *testing.B) { makers := map[string]jrpctest.ServerMaker{ "Http": http.ServerMaker, "WebSocket": websocket.ServerMaker, - "InProc": inproc.ServerMaker, "IoPipe": rdwr.ServerMaker, } diff --git a/contrib/codecs/broker/broker.go b/contrib/codecs/broker/broker.go index bac583928eb6c44a2f6914b601c27423ad63a1cd..b234fb4cd6ac3763d93ddc8fc470d19c277fdf93 100644 --- a/contrib/codecs/broker/broker.go +++ b/contrib/codecs/broker/broker.go @@ -3,10 +3,12 @@ package broker import ( "context" "encoding/json" + + "github.com/go-faster/jx" ) type ServerSpoke interface { - ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) + ReadRequest(ctx context.Context) (json.RawMessage, Replier, error) } type ClientSpoke interface { @@ -20,7 +22,13 @@ type Broker interface { } type Replier interface { - Send(json.RawMessage) + Send(fn func(*jx.Encoder) error) error +} + +type ReplierFunc func(fn func(*jx.Encoder) error) error + +func (r ReplierFunc) Send(fn func(*jx.Encoder) error) error { + return r(fn) } type Subscription interface { diff --git a/contrib/codecs/broker/broker_inproc.go b/contrib/codecs/broker/broker_inproc.go index 1339465edbe749813424dcf5a7543635f95b82ce..c91308f4496c830143982f999f11ec6a0abfbda8 100644 --- a/contrib/codecs/broker/broker_inproc.go +++ b/contrib/codecs/broker/broker_inproc.go @@ -6,6 +6,8 @@ import ( "strings" "sync" "sync/atomic" + + "github.com/go-faster/jx" ) type subscription struct { @@ -64,14 +66,19 @@ func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *Chann return b } -func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { +func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, Replier, error) { select { case <-ctx.Done(): return nil, nil, ctx.Err() case f := <-b.msgs: - return f.data, func(resp json.RawMessage) error { - return b.Publish(context.Background(), f.topic, resp) - }, nil + return f.data, ReplierFunc(func(fn func(*jx.Encoder) error) error { + enc := &jx.Encoder{} + err := fn(enc) + if err != nil { + return err + } + return b.Publish(context.Background(), f.topic, json.RawMessage(enc.Bytes())) + }), nil } } diff --git a/contrib/codecs/broker/codec.go b/contrib/codecs/broker/codec.go index 5db312c0885bfe617f75284aa0edefaed1e27829..96542ca9f2aafaf6bcf04c88a44bc61244445779 100644 --- a/contrib/codecs/broker/codec.go +++ b/contrib/codecs/broker/codec.go @@ -8,6 +8,7 @@ import ( "gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/open/jrpc/pkg/serverutil" + "github.com/go-faster/jx" ) var _ codec.ReaderWriter = (*Codec)(nil) @@ -17,7 +18,7 @@ type Codec struct { cn func() wr bytes.Buffer - replier func(json.RawMessage) error + replier Replier ansCh chan *serverutil.Bundle closed atomic.Bool closeCh chan struct{} @@ -30,7 +31,7 @@ type httpError struct { err error } -func NewCodec(req json.RawMessage, replier func(json.RawMessage) error) *Codec { +func NewCodec(req json.RawMessage, replier Replier) *Codec { c := &Codec{ replier: replier, ansCh: make(chan *serverutil.Bundle, 1), @@ -67,8 +68,8 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { - return c.replier(msg) +func (c *Codec) Send(fn func(e *jx.Encoder) error) error { + return c.replier.Send(fn) } // Closed returns a channel which is closed when the connection is closed. diff --git a/contrib/codecs/codecs.go b/contrib/codecs/codecs.go index dcab57dd448058bbe9a20f77249d36dc0dc6b447..e974765efeeba8942ffe7b4b6ebd6e986534db85 100644 --- a/contrib/codecs/codecs.go +++ b/contrib/codecs/codecs.go @@ -4,7 +4,6 @@ import ( "fmt" "gfx.cafe/open/jrpc/contrib/codecs/http" - "gfx.cafe/open/jrpc/contrib/codecs/inproc" "gfx.cafe/open/jrpc/contrib/codecs/websocket" "gfx.cafe/open/jrpc/pkg/server" @@ -12,7 +11,6 @@ import ( "net/url" ) -var NewInProc = inproc.NewCodec var WebsocketHandler = websocket.WebsocketHandler var HttpHandler = http.HttpHandler diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go index 7e2ca915e07587a772cc2cfe2f22448ccb34a6e2..c5c4c9783f6c691fb30fdd850c82b8414d39bd22 100644 --- a/contrib/codecs/http/codec.go +++ b/contrib/codecs/http/codec.go @@ -3,7 +3,6 @@ package http import ( "context" "encoding/base64" - "encoding/json" "errors" "fmt" "io" @@ -14,6 +13,7 @@ import ( "gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/open/jrpc/pkg/serverutil" + "github.com/go-faster/jx" ) var _ codec.ReaderWriter = (*Codec)(nil) @@ -26,6 +26,7 @@ type Codec struct { r *http.Request w http.ResponseWriter wr io.Writer + jx *jx.Encoder msgs chan *serverutil.Bundle errCh chan httpError @@ -52,6 +53,7 @@ func (c *Codec) Reset(w http.ResponseWriter, r *http.Request) { c.w = w c.msgs = make(chan *serverutil.Bundle, 1) c.errCh = make(chan httpError, 1) + c.jx = jx.NewStreamingEncoder(w, 4096) ctx := c.r.Context() c.ctx, c.cn = context.WithCancel(ctx) @@ -219,13 +221,12 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { +func (c *Codec) Send(fn func(e *jx.Encoder) error) error { defer c.cn() - _, err = c.wr.Write(msg) - if err != nil { + if err := fn(c.jx); err != nil { return err } - return nil + return c.jx.Close() } // Closed returns a channel which is closed when the connection is closed. diff --git a/contrib/codecs/inproc/client.go b/contrib/codecs/inproc/client.go deleted file mode 100644 index 7f019ef6abe0cf4611bddd5e6c7c1f0b09060584..0000000000000000000000000000000000000000 --- a/contrib/codecs/inproc/client.go +++ /dev/null @@ -1,195 +0,0 @@ -package inproc - -import ( - "context" - "encoding/json" - "sync" - - "gfx.cafe/open/jrpc/pkg/clientutil" - "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/serverutil" -) - -type Client struct { - p *clientutil.IdReply - c *Codec - - ctx context.Context - cn context.CancelFunc - - m codec.Middlewares - handler codec.Handler - mu sync.Mutex -} - -func (c *Client) Mount(h codec.Middleware) { - c.mu.Lock() - defer c.mu.Unlock() - c.m = append(c.m, h) - c.handler = c.m.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) { - // do nothing on no handler - }) -} - -func NewClient(c *Codec, handler codec.Handler) *Client { - cl := &Client{ - p: clientutil.NewIdReply(), - c: c, - handler: handler, - } - cl.ctx, cl.cn = context.WithCancel(context.Background()) - go cl.listen() - - return cl -} - -func (c *Client) Closed() <-chan struct{} { - return c.ctx.Done() -} - -func (c *Client) listen() error { - var msg json.RawMessage - defer c.cn() - dec := json.NewDecoder(c.c.rd) - for { - err := dec.Decode(&msg) - if err != nil { - return err - } - msgs, _ := codec.ParseMessage(msg) - for i := range msgs { - v := msgs[i] - id := v.ID - if id == nil { - if c.handler != nil { - req := codec.NewRawRequest(c.c.ctx, - nil, - v.Method, - v.Params, - ) - req.Peer = codec.PeerInfo{ - Transport: "ipc", - RemoteAddr: "", - } - c.handler.ServeRPC(nil, req) - } - continue - } - var err error - if v.Error != nil { - err = v.Error - } - c.p.Resolve(*id, v.Result, err) - } - } - -} - -func (c *Client) Do(ctx context.Context, result any, method string, params any) error { - if ctx == nil { - ctx = context.Background() - } - dat, err := json.Marshal(params) - if err != nil { - return err - } - id := c.p.NextId() - fwd := &serverutil.Bundle{ - Messages: []*codec.Message{{ - ID: id, - Method: method, - Params: dat, - }}, - Batch: false, - } - select { - case c.c.msgs <- fwd: - case <-ctx.Done(): - return ctx.Err() - } - ans, err := c.p.Ask(ctx, *id) - if err != nil { - return err - } - if result != nil { - err = json.Unmarshal(ans, result) - if err != nil { - return err - } - } - return nil -} - -func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error { - if ctx == nil { - ctx = context.Background() - } - ids := make([]*codec.ID, 0, len(b)) - reqs := &serverutil.Bundle{Batch: true} - for _, v := range b { - id := c.p.NextId() - dat, err := json.Marshal(v.Params) - if err != nil { - return err - } - req := &codec.Message{ID: id, Method: v.Method, Params: dat} - ids = append(ids, id) - reqs.Messages = append(reqs.Messages, req) - } - c.c.msgs <- reqs - // TODO: wait for response - wg := sync.WaitGroup{} - wg.Add(len(ids)) - for i := range ids { - idx := i - go func() { - defer wg.Done() - ans, err := c.p.Ask(ctx, *ids[idx]) - if err != nil { - b[idx].Error = err - return - } - if b[idx].Result != nil { - err = json.Unmarshal(ans, b[idx].Result) - if err != nil { - b[idx].Error = err - return - } - } - }() - } - wg.Wait() - - return nil -} - -func (c *Client) SetHeader(key string, value string) { -} - -func (c *Client) Close() error { - return c.c.Close() -} - -func (c *Client) Notify(ctx context.Context, method string, params any) error { - if ctx == nil { - ctx = context.Background() - } - dat, err := json.Marshal(params) - if err != nil { - return err - } - msg := &serverutil.Bundle{ - Messages: []*codec.Message{{ - ID: nil, - Method: method, - Params: dat, - }}, - Batch: false, - } - select { - case c.c.msgs <- msg: - case <-ctx.Done(): - return ctx.Err() - } - return nil -} diff --git a/contrib/codecs/inproc/codec_test.go b/contrib/codecs/inproc/codec_test.go deleted file mode 100644 index 9deacc32de5ec624cacc0d1d176587927d99e620..0000000000000000000000000000000000000000 --- a/contrib/codecs/inproc/codec_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package inproc_test - -import ( - "testing" - - "gfx.cafe/open/jrpc/contrib/codecs/inproc" - "gfx.cafe/open/jrpc/pkg/jrpctest" -) - -func TestBasicSuite(t *testing.T) { - jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ - ServerMaker: inproc.ServerMaker, - }) -} diff --git a/contrib/codecs/inproc/inproc.go b/contrib/codecs/inproc/inproc.go deleted file mode 100644 index 5a79a5885f47a5d13d3b3696a4e68512a649d52e..0000000000000000000000000000000000000000 --- a/contrib/codecs/inproc/inproc.go +++ /dev/null @@ -1,91 +0,0 @@ -package inproc - -import ( - "bufio" - "context" - "encoding/json" - "io" - "sync" - - "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/serverutil" -) - -type Codec struct { - ctx context.Context - cn func() - - rd io.Reader - wrLock sync.Mutex - wr *bufio.Writer - msgs chan *serverutil.Bundle -} - -func NewCodec() *Codec { - rd, wr := io.Pipe() - ctx, cn := context.WithCancel(context.TODO()) - return &Codec{ - ctx: ctx, - cn: cn, - rd: bufio.NewReader(rd), - wr: bufio.NewWriter(wr), - msgs: make(chan *serverutil.Bundle, 8), - } -} - -// gets the peer info -func (c *Codec) PeerInfo() codec.PeerInfo { - return codec.PeerInfo{ - Transport: "ipc", - RemoteAddr: "", - HTTP: codec.HttpInfo{}, - } -} - -func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) { - select { - case ans := <-c.msgs: - return ans.Messages, ans.Batch, nil - case <-ctx.Done(): - return nil, false, ctx.Err() - case <-c.ctx.Done(): - return nil, false, c.ctx.Err() - } -} - -// closes the connection -func (c *Codec) Close() error { - c.cn() - return nil -} - -func (c *Codec) Send(ctx context.Context, msg json.RawMessage) (err error) { - c.wrLock.Lock() - defer c.wrLock.Unlock() - _, err = c.wr.Write(msg) - if err != nil { - return err - } - return c.wr.Flush() -} - -// Closed returns a channel which is closed when the connection is closed. -func (c *Codec) Closed() <-chan struct{} { - return c.ctx.Done() -} - -// RemoteAddr returns the peer address of the connection. -func (c *Codec) RemoteAddr() string { - return "" -} - -// DialInProc attaches an in-process connection to the given RPC server. -// func DialInProc(handler *Server) *Client { -// initctx := context.Background() -// c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) { -// p1, p2 := net.Pipe() -// go handler.ServeCodec(NewCodec(p1)) -// return NewCodec(p2), nil -// }) -// return c -// } diff --git a/contrib/codecs/inproc/inproc_test.go b/contrib/codecs/inproc/inproc_test.go deleted file mode 100644 index 439eaeae2caf8d5d83341c69cbfe8387227df340..0000000000000000000000000000000000000000 --- a/contrib/codecs/inproc/inproc_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package inproc_test - -import ( - "context" - "testing" - - "gfx.cafe/open/jrpc/contrib/codecs/inproc" - "gfx.cafe/open/jrpc/contrib/jmux" - "gfx.cafe/open/jrpc/pkg/server" - - "github.com/stretchr/testify/require" -) - -func TestInprocSetup(t *testing.T) { - mux := jmux.NewMux() - srv := server.NewServer(mux) - - ctx := context.Background() - - clientCodec := inproc.NewCodec() - client := inproc.NewClient(clientCodec, nil) - go func() { - srv.ServeCodec(ctx, clientCodec) - }() - - var res any - err := client.Do(ctx, res, "hi_there", []any{}) - require.ErrorContains(t, err, "does not exist") -} diff --git a/contrib/codecs/inproc/testing.go b/contrib/codecs/inproc/testing.go deleted file mode 100644 index da13f44b61537ec21d4b71b113889fdb9da1f5f6..0000000000000000000000000000000000000000 --- a/contrib/codecs/inproc/testing.go +++ /dev/null @@ -1,20 +0,0 @@ -package inproc - -import ( - "context" - - "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/jrpctest" - "gfx.cafe/open/jrpc/pkg/server" -) - -func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) { - s := jrpctest.NewServer() - clientCodec := NewCodec() - go func() { - s.ServeCodec(context.Background(), clientCodec) - }() - return s, func() codec.Conn { - return NewClient(clientCodec, nil) - }, func() {} -} diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go index cee08fcde26be0027def0d252ab261f0693c38eb..283d6322d154f35bb5ff717f78e77d88bfa6e2d0 100644 --- a/contrib/codecs/rdwr/client.go +++ b/contrib/codecs/rdwr/client.go @@ -1,6 +1,7 @@ package rdwr import ( + "bufio" "context" "encoding/json" "io" @@ -30,7 +31,7 @@ type Client struct { func NewClient(rd io.Reader, wr io.Writer) *Client { cl := &Client{ p: clientutil.NewIdReply(), - rd: rd, + rd: bufio.NewReader(rd), wr: wr, handlerPeer: codec.PeerInfo{ Transport: "ipc", diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go index 1a3311ec8de73fc52c93af9d6fbe8415f3dec4ef..a071009f12822f0349aa080a9127140fabf0d483 100644 --- a/contrib/codecs/rdwr/codec.go +++ b/contrib/codecs/rdwr/codec.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/go-faster/jx" "github.com/goccy/go-json" "gfx.cafe/open/jrpc/pkg/codec" @@ -18,7 +19,8 @@ type Codec struct { rd io.Reader wrLock sync.Mutex - w io.Writer + wr *bufio.Writer + jx *jx.Encoder dec *json.Decoder decBuf json.RawMessage @@ -27,14 +29,14 @@ type Codec struct { func NewCodec(rd io.Reader, wr io.Writer) *Codec { ctx, cn := context.WithCancel(context.TODO()) - bufr := bufio.NewReader(rd) c := &Codec{ ctx: ctx, cn: cn, - rd: bufr, + rd: bufio.NewReader(rd), + wr: bufio.NewWriter(wr), dec: json.NewDecoder(rd), - w: wr, } + c.jx = jx.NewStreamingEncoder(wr, 4096) return c } @@ -72,11 +74,23 @@ func (c *Codec) Close() error { return nil } -func (c *Codec) Send(ctx context.Context, buf json.RawMessage) error { +func (c *Codec) Send(fn func(e *jx.Encoder) error) error { c.wrLock.Lock() defer c.wrLock.Unlock() - _, err := c.w.Write(append(buf, '\n')) - return err + defer c.jx.ResetWriter(c.wr) + if err := fn(c.jx); err != nil { + return err + } + if err := c.jx.Close(); err != nil { + return err + } + if _, err := c.wr.Write([]byte("\n")); err != nil { + return err + } + if err := c.wr.Flush(); err != nil { + return err + } + return nil } // Closed returns a channel which is closed when the connection is closed. diff --git a/pkg/codec/transport.go b/pkg/codec/transport.go index de6cc1f9f4e3a82e5bf74d5c12f48725b660ee25..2a9543d03bd344c7c057302dac2003e79722bfca 100644 --- a/pkg/codec/transport.go +++ b/pkg/codec/transport.go @@ -2,7 +2,8 @@ package codec import ( "context" - "encoding/json" + + "github.com/go-faster/jx" ) // ReaderWriter represents a single stream @@ -27,7 +28,7 @@ type Reader interface { // Implementations must be safe for concurrent use. type Writer interface { // write json blob to stream - Send(context.Context, json.RawMessage) error + Send(fn func(e *jx.Encoder) error) error // Closed returns a channel which is closed when the connection is closed. Closed() <-chan struct{} // RemoteAddr returns the peer address of the connection. diff --git a/pkg/server/server.go b/pkg/server/server.go index 45a9bee538288d1dc3b70bbd593f272a2dbfcb53..e2b9281206f40a99140289cecde26d9e23046156 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -152,32 +152,27 @@ type notifyEnv struct { } func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { - c.mu.Lock() - defer c.mu.Unlock() - enc := jx.GetEncoder() - enc.Reset() - enc.Grow(4096) - defer jx.PutEncoder(enc) - //enc := jx.NewStreamingEncoder(c.remote, 4096) - msg := &codec.Message{} - var err error - // allocate a temp buffer for this packet - buf := bufpool.GetStd() - defer bufpool.PutStd(buf) - err = json.NewEncoder(buf).Encode(env.dat) - if err != nil { - msg.Error = err - } else { - msg.Params = buf.Bytes() - } - msg.ExtraFields = env.extra - // add the method - msg.Method = env.method - err = codec.MarshalMessage(msg, enc) - if err != nil { - return err - } - err = c.remote.Send(ctx, enc.Bytes()) + err := c.remote.Send(func(e *jx.Encoder) error { + msg := &codec.Message{} + var err error + // allocate a temp buffer for this packet + buf := bufpool.GetStd() + defer bufpool.PutStd(buf) + err = json.NewEncoder(buf).Encode(env.dat) + if err != nil { + msg.Error = err + } else { + msg.Params = buf.Bytes() + } + msg.ExtraFields = env.extra + // add the method + msg.Method = env.method + err = codec.MarshalMessage(msg, e) + if err != nil { + return err + } + return nil + }) if err != nil { return err } @@ -190,8 +185,6 @@ type callEnv struct { } func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { - c.mu.Lock() - defer c.mu.Unlock() // notification gets nothing // if all msgs in batch are notification, we trigger an allSkip and write nothing if env.batch { @@ -202,47 +195,45 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { } } if allSkip { - return c.remote.Send(ctx, nil) + return c.remote.Send(func(e *jx.Encoder) error { return nil }) } } // create the streaming encoder - enc := jx.GetEncoder() - enc.Reset() - enc.Grow(4096) - defer jx.PutEncoder(enc) - if env.batch { - enc.ArrStart() - } - for _, v := range env.responses { - msg := v.pkt - // if we are a batch AND we are supposed to skip, then continue - // this means that for a non-batch notification, we do not skip! this is to ensure we get always a "response" for http-like endpoints - if env.batch && v.skip { - continue + err = c.remote.Send(func(enc *jx.Encoder) error { + if env.batch { + enc.ArrStart() } - // if there is no error, we try to marshal the result - if msg.Error == nil { - buf := bufpool.GetStd() - defer bufpool.PutStd(buf) - je := json.NewEncoder(buf) - err = je.EncodeWithOption(v.dat) + for _, v := range env.responses { + msg := v.pkt + // if we are a batch AND we are supposed to skip, then continue + // this means that for a non-batch notification, we do not skip! this is to ensure we get always a "response" for http-like endpoints + if env.batch && v.skip { + continue + } + // if there is no error, we try to marshal the result + if msg.Error == nil { + buf := bufpool.GetStd() + defer bufpool.PutStd(buf) + je := json.NewEncoder(buf) + err = je.EncodeWithOption(v.dat) + if err != nil { + msg.Error = err + } else { + msg.Result = buf.Bytes() + msg.Result = bytes.TrimSuffix(msg.Result, []byte{'\n'}) + } + } + // then marshal the whole message into the stream + err := codec.MarshalMessage(msg, enc) if err != nil { - msg.Error = err - } else { - msg.Result = buf.Bytes() - msg.Result = bytes.TrimSuffix(msg.Result, []byte{'\n'}) + return err } } - // then marshal the whole message into the stream - err := codec.MarshalMessage(msg, enc) - if err != nil { - return err + if env.batch { + enc.ArrEnd() } - } - if env.batch { - enc.ArrEnd() - } - err = c.remote.Send(ctx, enc.Bytes()) + return nil + }) if err != nil { return err }