diff --git a/.gitignore b/.gitignore index 723ef36f4e4f32c4560383aa5987c575a30c6535..21e6a342de84bc7601fe7d63e7fa1069fdd4a4b1 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,6 @@ -.idea \ No newline at end of file +.idea +*.out +*.pb.gz +*.tmp +*.log +*.test diff --git a/benchmark/http_test.go b/benchmark/http_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b1d6c548a450aa63394116f1cfd811bed2d92d3d --- /dev/null +++ b/benchmark/http_test.go @@ -0,0 +1,25 @@ +package benchmark + +import ( + "context" + "net/http/httptest" + "testing" + + "gfx.cafe/open/jrpc/contrib/codecs/http" + "gfx.cafe/open/jrpc/pkg/jrpctest" + "github.com/stretchr/testify/require" +) + +func BenchmarkHttpClientServer(b *testing.B) { + ctx := context.Background() + s := jrpctest.NewServer() + + b.Run("SingleClient", func(b *testing.B) { + hsrv := httptest.NewServer(&http.Server{Server: s}) + conn, err := http.DialHTTP(hsrv.URL) + require.NoError(b, err) + for i := 0; i < b.N; i++ { + conn.Do(ctx, nil, "ping", nil) + } + }) +} diff --git a/benchmark/inproc_test.go b/benchmark/inproc_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2b13b002a20526e2ef7e9479112df3966981568d --- /dev/null +++ b/benchmark/inproc_test.go @@ -0,0 +1,23 @@ +package benchmark + +import ( + "context" + "testing" + + "gfx.cafe/open/jrpc/contrib/codecs/inproc" + "gfx.cafe/open/jrpc/pkg/jrpctest" +) + +func BenchmarkInprocClientServer(b *testing.B) { + ctx := context.Background() + s := jrpctest.NewServer() + + b.Run("SingleClient", func(b *testing.B) { + clientCodec := inproc.NewCodec() + go s.ServeCodec(ctx, clientCodec) + conn := inproc.NewClient(clientCodec, nil) + for i := 0; i < b.N; i++ { + conn.Do(ctx, nil, "ping", nil) + } + }) +} diff --git a/benchmark/rdwr_test.go b/benchmark/rdwr_test.go new file mode 100644 index 0000000000000000000000000000000000000000..829eb8cc99457df492821baa85c26d4ca9e1a073 --- /dev/null +++ b/benchmark/rdwr_test.go @@ -0,0 +1,26 @@ +package benchmark + +import ( + "context" + "io" + "testing" + + "gfx.cafe/open/jrpc/contrib/codecs/rdwr" + "gfx.cafe/open/jrpc/pkg/jrpctest" +) + +func BenchmarkIoPipeClientServer(b *testing.B) { + ctx := context.Background() + s := jrpctest.NewServer() + + b.Run("SingleClient", func(b *testing.B) { + rd_s, wr_s := io.Pipe() + rd_c, wr_c := io.Pipe() + clientCodec := rdwr.NewCodec(rd_c, wr_s, nil) + go s.ServeCodec(ctx, clientCodec) + conn := rdwr.NewClient(rd_s, wr_c) + for i := 0; i < b.N; i++ { + conn.Do(ctx, nil, "ping", nil) + } + }) +} diff --git a/benchmark/websocket_test.go b/benchmark/websocket_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2e7809d20599ee942140c9bf342b65580f108aaf --- /dev/null +++ b/benchmark/websocket_test.go @@ -0,0 +1,25 @@ +package benchmark + +import ( + "context" + "net/http/httptest" + "testing" + + "gfx.cafe/open/jrpc/contrib/codecs/websocket" + "gfx.cafe/open/jrpc/pkg/jrpctest" + "github.com/stretchr/testify/require" +) + +func BenchmarkWebsocketClientServer(b *testing.B) { + ctx := context.Background() + s := jrpctest.NewServer() + + b.Run("SingleClient", func(b *testing.B) { + hsrv := httptest.NewServer(&websocket.Server{Server: s}) + conn, err := websocket.DialWebsocket(ctx, hsrv.URL, "") + require.NoError(b, err) + for i := 0; i < b.N; i++ { + conn.Do(ctx, nil, "ping", nil) + } + }) +} diff --git a/contrib/codecs/http/codec_test.go b/contrib/codecs/http/codec_test.go index 822c36c3c9103e8f3fbc2261f523fe879cc31fd6..7b284aa648e75b961eb6effcd2e083758dd3e37e 100644 --- a/contrib/codecs/http/codec_test.go +++ b/contrib/codecs/http/codec_test.go @@ -1,11 +1,12 @@ package http import ( - "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/server" "net/http/httptest" "testing" + "gfx.cafe/open/jrpc/pkg/codec" + "gfx.cafe/open/jrpc/pkg/server" + "gfx.cafe/open/jrpc/pkg/jrpctest" "github.com/stretchr/testify/require" ) diff --git a/contrib/codecs/http/testing.go b/contrib/codecs/http/testing.go new file mode 100644 index 0000000000000000000000000000000000000000..d02cfda642c35bda7612cc799cd3c0a9c9acc6d4 --- /dev/null +++ b/contrib/codecs/http/testing.go @@ -0,0 +1 @@ +package http diff --git a/contrib/codecs/inproc/codec_test.go b/contrib/codecs/inproc/codec_test.go index 694667c5d01055fbf4af9b5bb879f75790afe5b5..b821e80da6ddac0a3af0bef9b511212c14dac98a 100644 --- a/contrib/codecs/inproc/codec_test.go +++ b/contrib/codecs/inproc/codec_test.go @@ -11,18 +11,19 @@ import ( "gfx.cafe/open/jrpc/pkg/jrpctest" ) -func TestBasicSuite(t *testing.T) { +func mockServerMaker() (*server.Server, jrpctest.ClientMaker, func()) { + s := jrpctest.NewServer() + clientCodec := inproc.NewCodec() + go func() { + s.ServeCodec(context.Background(), clientCodec) + }() + return s, func() codec.Conn { + return inproc.NewClient(clientCodec, nil) + }, func() {} +} +func TestBasicSuite(t *testing.T) { jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ - ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) { - s := jrpctest.NewServer() - clientCodec := inproc.NewCodec() - go func() { - s.ServeCodec(context.Background(), clientCodec) - }() - return s, func() codec.Conn { - return inproc.NewClient(clientCodec, nil) - }, func() {} - }, + ServerMaker: mockServerMaker, }) } diff --git a/pkg/jrpctest/services.go b/pkg/jrpctest/services.go index bdc7d54efef8c0eeddffb9a3c0d38e90f419ae26..fe668b5fac1aaa1d2372c82c36c6eb2630ffb4dd 100644 --- a/pkg/jrpctest/services.go +++ b/pkg/jrpctest/services.go @@ -34,6 +34,10 @@ func (s *testService) EchoAny(n any) any { return n } +func (s *testService) Ping(ctx context.Context) string { + return "pong" +} + func (s *testService) Echo(str string, i int, args *EchoArgs) EchoResult { return EchoResult{str, i, args} } diff --git a/pkg/server/server.go b/pkg/server/server.go index a4e0f52e33c67e6219bd651b3ed9dff7ea3b20bb..62ee9b51025ac890c9ed78fc18890f48557f7a40 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -222,7 +222,13 @@ type notifyEnv struct { } func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { - enc := jx.NewStreamingEncoder(c.remote, 4096) + enc := jx.GetEncoder() + if cap(enc.Bytes()) < 4096 { + enc.SetBytes(make([]byte, 0, 4096)) + } + enc.ResetWriter(c.remote) + defer jx.PutEncoder(enc) + //enc := jx.NewStreamingEncoder(c.remote, 4096) msg := &codec.Message{} var err error // allocate a temp buffer for this packet @@ -242,7 +248,6 @@ func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { return err } return enc.Close() - } type callEnv struct { @@ -265,7 +270,12 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { } } // create the streaming encoder - enc := jx.NewStreamingEncoder(c.remote, 4096) + enc := jx.GetEncoder() + if cap(enc.Bytes()) < 4096 { + enc.SetBytes(make([]byte, 0, 4096)) + } + enc.ResetWriter(c.remote) + defer jx.PutEncoder(enc) if env.batch { enc.ArrStart() }