From f6362a269877f30e8538f95dfefbd09022156dbe Mon Sep 17 00:00:00 2001
From: a <a@tuxpa.in>
Date: Sat, 5 Aug 2023 15:56:32 -0500
Subject: [PATCH] Streaming encoder

---
 .gitignore                          |  7 ++++++-
 benchmark/http_test.go              | 25 +++++++++++++++++++++++++
 benchmark/inproc_test.go            | 23 +++++++++++++++++++++++
 benchmark/rdwr_test.go              | 26 ++++++++++++++++++++++++++
 benchmark/websocket_test.go         | 25 +++++++++++++++++++++++++
 contrib/codecs/http/codec_test.go   |  5 +++--
 contrib/codecs/http/testing.go      |  1 +
 contrib/codecs/inproc/codec_test.go | 23 ++++++++++++-----------
 pkg/jrpctest/services.go            |  4 ++++
 pkg/server/server.go                | 16 +++++++++++++---
 10 files changed, 138 insertions(+), 17 deletions(-)
 create mode 100644 benchmark/http_test.go
 create mode 100644 benchmark/inproc_test.go
 create mode 100644 benchmark/rdwr_test.go
 create mode 100644 benchmark/websocket_test.go
 create mode 100644 contrib/codecs/http/testing.go

diff --git a/.gitignore b/.gitignore
index 723ef36..21e6a34 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,6 @@
-.idea
\ No newline at end of file
+.idea
+*.out
+*.pb.gz
+*.tmp
+*.log
+*.test
diff --git a/benchmark/http_test.go b/benchmark/http_test.go
new file mode 100644
index 0000000..b1d6c54
--- /dev/null
+++ b/benchmark/http_test.go
@@ -0,0 +1,25 @@
+package benchmark
+
+import (
+	"context"
+	"net/http/httptest"
+	"testing"
+
+	"gfx.cafe/open/jrpc/contrib/codecs/http"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+	"github.com/stretchr/testify/require"
+)
+
+func BenchmarkHttpClientServer(b *testing.B) {
+	ctx := context.Background()
+	s := jrpctest.NewServer()
+
+	b.Run("SingleClient", func(b *testing.B) {
+		hsrv := httptest.NewServer(&http.Server{Server: s})
+		conn, err := http.DialHTTP(hsrv.URL)
+		require.NoError(b, err)
+		for i := 0; i < b.N; i++ {
+			conn.Do(ctx, nil, "ping", nil)
+		}
+	})
+}
diff --git a/benchmark/inproc_test.go b/benchmark/inproc_test.go
new file mode 100644
index 0000000..2b13b00
--- /dev/null
+++ b/benchmark/inproc_test.go
@@ -0,0 +1,23 @@
+package benchmark
+
+import (
+	"context"
+	"testing"
+
+	"gfx.cafe/open/jrpc/contrib/codecs/inproc"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+)
+
+func BenchmarkInprocClientServer(b *testing.B) {
+	ctx := context.Background()
+	s := jrpctest.NewServer()
+
+	b.Run("SingleClient", func(b *testing.B) {
+		clientCodec := inproc.NewCodec()
+		go s.ServeCodec(ctx, clientCodec)
+		conn := inproc.NewClient(clientCodec, nil)
+		for i := 0; i < b.N; i++ {
+			conn.Do(ctx, nil, "ping", nil)
+		}
+	})
+}
diff --git a/benchmark/rdwr_test.go b/benchmark/rdwr_test.go
new file mode 100644
index 0000000..829eb8c
--- /dev/null
+++ b/benchmark/rdwr_test.go
@@ -0,0 +1,26 @@
+package benchmark
+
+import (
+	"context"
+	"io"
+	"testing"
+
+	"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+)
+
+func BenchmarkIoPipeClientServer(b *testing.B) {
+	ctx := context.Background()
+	s := jrpctest.NewServer()
+
+	b.Run("SingleClient", func(b *testing.B) {
+		rd_s, wr_s := io.Pipe()
+		rd_c, wr_c := io.Pipe()
+		clientCodec := rdwr.NewCodec(rd_c, wr_s, nil)
+		go s.ServeCodec(ctx, clientCodec)
+		conn := rdwr.NewClient(rd_s, wr_c)
+		for i := 0; i < b.N; i++ {
+			conn.Do(ctx, nil, "ping", nil)
+		}
+	})
+}
diff --git a/benchmark/websocket_test.go b/benchmark/websocket_test.go
new file mode 100644
index 0000000..2e7809d
--- /dev/null
+++ b/benchmark/websocket_test.go
@@ -0,0 +1,25 @@
+package benchmark
+
+import (
+	"context"
+	"net/http/httptest"
+	"testing"
+
+	"gfx.cafe/open/jrpc/contrib/codecs/websocket"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+	"github.com/stretchr/testify/require"
+)
+
+func BenchmarkWebsocketClientServer(b *testing.B) {
+	ctx := context.Background()
+	s := jrpctest.NewServer()
+
+	b.Run("SingleClient", func(b *testing.B) {
+		hsrv := httptest.NewServer(&websocket.Server{Server: s})
+		conn, err := websocket.DialWebsocket(ctx, hsrv.URL, "")
+		require.NoError(b, err)
+		for i := 0; i < b.N; i++ {
+			conn.Do(ctx, nil, "ping", nil)
+		}
+	})
+}
diff --git a/contrib/codecs/http/codec_test.go b/contrib/codecs/http/codec_test.go
index 822c36c..7b284aa 100644
--- a/contrib/codecs/http/codec_test.go
+++ b/contrib/codecs/http/codec_test.go
@@ -1,11 +1,12 @@
 package http
 
 import (
-	"gfx.cafe/open/jrpc/pkg/codec"
-	"gfx.cafe/open/jrpc/pkg/server"
 	"net/http/httptest"
 	"testing"
 
+	"gfx.cafe/open/jrpc/pkg/codec"
+	"gfx.cafe/open/jrpc/pkg/server"
+
 	"gfx.cafe/open/jrpc/pkg/jrpctest"
 	"github.com/stretchr/testify/require"
 )
diff --git a/contrib/codecs/http/testing.go b/contrib/codecs/http/testing.go
new file mode 100644
index 0000000..d02cfda
--- /dev/null
+++ b/contrib/codecs/http/testing.go
@@ -0,0 +1 @@
+package http
diff --git a/contrib/codecs/inproc/codec_test.go b/contrib/codecs/inproc/codec_test.go
index 694667c..b821e80 100644
--- a/contrib/codecs/inproc/codec_test.go
+++ b/contrib/codecs/inproc/codec_test.go
@@ -11,18 +11,19 @@ import (
 	"gfx.cafe/open/jrpc/pkg/jrpctest"
 )
 
-func TestBasicSuite(t *testing.T) {
+func mockServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
+	s := jrpctest.NewServer()
+	clientCodec := inproc.NewCodec()
+	go func() {
+		s.ServeCodec(context.Background(), clientCodec)
+	}()
+	return s, func() codec.Conn {
+		return inproc.NewClient(clientCodec, nil)
+	}, func() {}
+}
 
+func TestBasicSuite(t *testing.T) {
 	jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
-		ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
-			s := jrpctest.NewServer()
-			clientCodec := inproc.NewCodec()
-			go func() {
-				s.ServeCodec(context.Background(), clientCodec)
-			}()
-			return s, func() codec.Conn {
-				return inproc.NewClient(clientCodec, nil)
-			}, func() {}
-		},
+		ServerMaker: mockServerMaker,
 	})
 }
diff --git a/pkg/jrpctest/services.go b/pkg/jrpctest/services.go
index bdc7d54..fe668b5 100644
--- a/pkg/jrpctest/services.go
+++ b/pkg/jrpctest/services.go
@@ -34,6 +34,10 @@ func (s *testService) EchoAny(n any) any {
 	return n
 }
 
+func (s *testService) Ping(ctx context.Context) string {
+	return "pong"
+}
+
 func (s *testService) Echo(str string, i int, args *EchoArgs) EchoResult {
 	return EchoResult{str, i, args}
 }
diff --git a/pkg/server/server.go b/pkg/server/server.go
index a4e0f52..62ee9b5 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -222,7 +222,13 @@ type notifyEnv struct {
 }
 
 func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error {
-	enc := jx.NewStreamingEncoder(c.remote, 4096)
+	enc := jx.GetEncoder()
+	if cap(enc.Bytes()) < 4096 {
+		enc.SetBytes(make([]byte, 0, 4096))
+	}
+	enc.ResetWriter(c.remote)
+	defer jx.PutEncoder(enc)
+	//enc := jx.NewStreamingEncoder(c.remote, 4096)
 	msg := &codec.Message{}
 	var err error
 	//  allocate a temp buffer for this packet
@@ -242,7 +248,6 @@ func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error {
 		return err
 	}
 	return enc.Close()
-
 }
 
 type callEnv struct {
@@ -265,7 +270,12 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
 		}
 	}
 	// create the streaming encoder
-	enc := jx.NewStreamingEncoder(c.remote, 4096)
+	enc := jx.GetEncoder()
+	if cap(enc.Bytes()) < 4096 {
+		enc.SetBytes(make([]byte, 0, 4096))
+	}
+	enc.ResetWriter(c.remote)
+	defer jx.PutEncoder(enc)
 	if env.batch {
 		enc.ArrStart()
 	}
-- 
GitLab