From bd81290c21bdf33ec3e81bcc5c9566badc76d7cd Mon Sep 17 00:00:00 2001
From: a <a@tuxpa.in>
Date: Sat, 5 Aug 2023 17:22:59 -0500
Subject: [PATCH] ok

---
 benchmark/http_test.go                 | 25 -------------
 benchmark/inproc_test.go               | 23 ------------
 benchmark/rdwr_test.go                 | 26 --------------
 benchmark/suite_test.go                | 49 ++++++++++++++++++++++++++
 benchmark/websocket_test.go            | 25 -------------
 contrib/codecs/http/codec.go           | 26 +++++++++-----
 contrib/codecs/http/codec_test.go      | 15 +-------
 contrib/codecs/http/handler.go         | 14 +++++++-
 contrib/codecs/http/testing.go         | 20 +++++++++++
 contrib/codecs/inproc/client.go        |  3 +-
 contrib/codecs/inproc/codec_test.go    | 17 +--------
 contrib/codecs/inproc/testing.go       | 20 +++++++++++
 contrib/codecs/rdwr/client.go          |  3 +-
 contrib/codecs/rdwr/codec.go           |  3 +-
 contrib/codecs/rdwr/codec_test.go      | 19 +---------
 contrib/codecs/rdwr/testing.go         | 23 ++++++++++++
 contrib/codecs/websocket/codec_test.go | 18 ++--------
 contrib/codecs/websocket/testing.go    | 22 ++++++++++++
 pkg/jrpctest/suites.go                 | 27 ++++++++++----
 19 files changed, 198 insertions(+), 180 deletions(-)
 delete mode 100644 benchmark/http_test.go
 delete mode 100644 benchmark/inproc_test.go
 delete mode 100644 benchmark/rdwr_test.go
 create mode 100644 benchmark/suite_test.go
 delete mode 100644 benchmark/websocket_test.go
 create mode 100644 contrib/codecs/inproc/testing.go
 create mode 100644 contrib/codecs/rdwr/testing.go
 create mode 100644 contrib/codecs/websocket/testing.go

diff --git a/benchmark/http_test.go b/benchmark/http_test.go
deleted file mode 100644
index b1d6c54..0000000
--- a/benchmark/http_test.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package benchmark
-
-import (
-	"context"
-	"net/http/httptest"
-	"testing"
-
-	"gfx.cafe/open/jrpc/contrib/codecs/http"
-	"gfx.cafe/open/jrpc/pkg/jrpctest"
-	"github.com/stretchr/testify/require"
-)
-
-func BenchmarkHttpClientServer(b *testing.B) {
-	ctx := context.Background()
-	s := jrpctest.NewServer()
-
-	b.Run("SingleClient", func(b *testing.B) {
-		hsrv := httptest.NewServer(&http.Server{Server: s})
-		conn, err := http.DialHTTP(hsrv.URL)
-		require.NoError(b, err)
-		for i := 0; i < b.N; i++ {
-			conn.Do(ctx, nil, "ping", nil)
-		}
-	})
-}
diff --git a/benchmark/inproc_test.go b/benchmark/inproc_test.go
deleted file mode 100644
index 2b13b00..0000000
--- a/benchmark/inproc_test.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package benchmark
-
-import (
-	"context"
-	"testing"
-
-	"gfx.cafe/open/jrpc/contrib/codecs/inproc"
-	"gfx.cafe/open/jrpc/pkg/jrpctest"
-)
-
-func BenchmarkInprocClientServer(b *testing.B) {
-	ctx := context.Background()
-	s := jrpctest.NewServer()
-
-	b.Run("SingleClient", func(b *testing.B) {
-		clientCodec := inproc.NewCodec()
-		go s.ServeCodec(ctx, clientCodec)
-		conn := inproc.NewClient(clientCodec, nil)
-		for i := 0; i < b.N; i++ {
-			conn.Do(ctx, nil, "ping", nil)
-		}
-	})
-}
diff --git a/benchmark/rdwr_test.go b/benchmark/rdwr_test.go
deleted file mode 100644
index 829eb8c..0000000
--- a/benchmark/rdwr_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package benchmark
-
-import (
-	"context"
-	"io"
-	"testing"
-
-	"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
-	"gfx.cafe/open/jrpc/pkg/jrpctest"
-)
-
-func BenchmarkIoPipeClientServer(b *testing.B) {
-	ctx := context.Background()
-	s := jrpctest.NewServer()
-
-	b.Run("SingleClient", func(b *testing.B) {
-		rd_s, wr_s := io.Pipe()
-		rd_c, wr_c := io.Pipe()
-		clientCodec := rdwr.NewCodec(rd_c, wr_s, nil)
-		go s.ServeCodec(ctx, clientCodec)
-		conn := rdwr.NewClient(rd_s, wr_c)
-		for i := 0; i < b.N; i++ {
-			conn.Do(ctx, nil, "ping", nil)
-		}
-	})
-}
diff --git a/benchmark/suite_test.go b/benchmark/suite_test.go
new file mode 100644
index 0000000..1a6824d
--- /dev/null
+++ b/benchmark/suite_test.go
@@ -0,0 +1,49 @@
+package benchmark
+
+import (
+	"context"
+	"testing"
+
+	"gfx.cafe/open/jrpc/contrib/codecs/http"
+	"gfx.cafe/open/jrpc/contrib/codecs/inproc"
+	"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
+	"gfx.cafe/open/jrpc/contrib/codecs/websocket"
+	"gfx.cafe/open/jrpc/pkg/codec"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+	"gfx.cafe/open/jrpc/pkg/server"
+)
+
+func runBenchmarkSuite(b *testing.B, sm jrpctest.ServerMaker) {
+	ctx := context.Background()
+	executeBench := jrpctest.BenchExecutor(sm)
+	var makeBench = func(name string, fm jrpctest.BenchContext) {
+		b.Run(name, func(b *testing.B) {
+			executeBench(b, fm)
+		})
+	}
+	makeBench("SingleClient", func(b *testing.B, server *server.Server, client codec.Conn) {
+		for i := 0; i < b.N; i++ {
+			err := client.Do(ctx, nil, "test_ping", nil)
+			if err != nil {
+				panic(err)
+			}
+		}
+	})
+}
+
+func BenchmarkSimpleSuite(b *testing.B) {
+
+	makers := map[string]jrpctest.ServerMaker{
+		"Http":      http.ServerMaker,
+		"WebSocket": websocket.ServerMaker,
+		"InProc":    inproc.ServerMaker,
+		"IoPipe":    rdwr.ServerMaker,
+	}
+
+	for k, v := range makers {
+		b.Run(k, func(b *testing.B) {
+			runBenchmarkSuite(b, v)
+		})
+	}
+
+}
diff --git a/benchmark/websocket_test.go b/benchmark/websocket_test.go
deleted file mode 100644
index 2e7809d..0000000
--- a/benchmark/websocket_test.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package benchmark
-
-import (
-	"context"
-	"net/http/httptest"
-	"testing"
-
-	"gfx.cafe/open/jrpc/contrib/codecs/websocket"
-	"gfx.cafe/open/jrpc/pkg/jrpctest"
-	"github.com/stretchr/testify/require"
-)
-
-func BenchmarkWebsocketClientServer(b *testing.B) {
-	ctx := context.Background()
-	s := jrpctest.NewServer()
-
-	b.Run("SingleClient", func(b *testing.B) {
-		hsrv := httptest.NewServer(&websocket.Server{Server: s})
-		conn, err := websocket.DialWebsocket(ctx, hsrv.URL, "")
-		require.NoError(b, err)
-		for i := 0; i < b.N; i++ {
-			conn.Do(ctx, nil, "ping", nil)
-		}
-	})
-}
diff --git a/contrib/codecs/http/codec.go b/contrib/codecs/http/codec.go
index ee21a7d..4e2f1e5 100644
--- a/contrib/codecs/http/codec.go
+++ b/contrib/codecs/http/codec.go
@@ -18,6 +18,7 @@ import (
 
 var _ codec.ReaderWriter = (*Codec)(nil)
 
+// Reusable codec. use Reset()
 type Codec struct {
 	ctx context.Context
 	cn  func()
@@ -37,23 +38,32 @@ type httpError struct {
 }
 
 func NewCodec(w http.ResponseWriter, r *http.Request) *Codec {
+	c := &Codec{}
+	c.Reset(w, r)
+	return c
+}
+
+func (c *Codec) Reset(w http.ResponseWriter, r *http.Request) {
 	ir := io.Writer(w)
 	if w == nil {
 		ir = io.Discard
 	}
-	c := &Codec{
-		r:     r,
-		w:     w,
-		wr:    bufio.NewWriter(ir),
-		msgs:  make(chan *serverutil.Bundle, 1),
-		errCh: make(chan httpError, 1),
+	c.r = r
+	c.w = w
+	if c.wr == nil {
+		c.wr = bufio.NewWriter(ir)
+	} else {
+		c.wr.Reset(ir)
 	}
-	ctx := r.Context()
+	c.msgs = make(chan *serverutil.Bundle, 1)
+	c.errCh = make(chan httpError, 1)
+
+	ctx := c.r.Context()
 	c.ctx, c.cn = context.WithCancel(ctx)
 	c.peerInfo()
 	c.doRead()
-	return c
 }
+
 func (c *Codec) peerInfo() {
 	c.i.Transport = "http"
 	c.i.RemoteAddr = c.r.RemoteAddr
diff --git a/contrib/codecs/http/codec_test.go b/contrib/codecs/http/codec_test.go
index 7b284aa..e7ae8ee 100644
--- a/contrib/codecs/http/codec_test.go
+++ b/contrib/codecs/http/codec_test.go
@@ -1,26 +1,13 @@
 package http
 
 import (
-	"net/http/httptest"
 	"testing"
 
-	"gfx.cafe/open/jrpc/pkg/codec"
-	"gfx.cafe/open/jrpc/pkg/server"
-
 	"gfx.cafe/open/jrpc/pkg/jrpctest"
-	"github.com/stretchr/testify/require"
 )
 
 func TestBasicSuite(t *testing.T) {
 	jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
-		ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
-			s := jrpctest.NewServer()
-			hsrv := httptest.NewServer(&Server{Server: s})
-			return s, func() codec.Conn {
-				conn, err := DialHTTP(hsrv.URL)
-				require.NoError(t, err)
-				return conn
-			}, hsrv.Close
-		},
+		ServerMaker: ServerMaker,
 	})
 }
diff --git a/contrib/codecs/http/handler.go b/contrib/codecs/http/handler.go
index d03fc66..b3ba786 100644
--- a/contrib/codecs/http/handler.go
+++ b/contrib/codecs/http/handler.go
@@ -2,6 +2,7 @@ package http
 
 import (
 	"net/http"
+	"sync"
 
 	"gfx.cafe/open/jrpc/pkg/server"
 )
@@ -14,12 +15,23 @@ type Server struct {
 	Server *server.Server
 }
 
+var codecPool = sync.Pool{
+	New: func() any {
+		return &Codec{}
+	},
+}
+
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if s.Server == nil {
 		http.Error(w, "no server set", http.StatusInternalServerError)
 		return
 	}
-	c := NewCodec(w, r)
+	c := codecPool.Get().(*Codec)
+	c.Reset(w, r)
 	w.Header().Set("content-type", contentType)
 	s.Server.ServeCodec(r.Context(), c)
+	go func() {
+		<-c.Closed()
+		codecPool.Put(c)
+	}()
 }
diff --git a/contrib/codecs/http/testing.go b/contrib/codecs/http/testing.go
index d02cfda..7c91c4f 100644
--- a/contrib/codecs/http/testing.go
+++ b/contrib/codecs/http/testing.go
@@ -1 +1,21 @@
 package http
+
+import (
+	"net/http/httptest"
+
+	"gfx.cafe/open/jrpc/pkg/codec"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+	"gfx.cafe/open/jrpc/pkg/server"
+)
+
+func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
+	s := jrpctest.NewServer()
+	hsrv := httptest.NewServer(&Server{Server: s})
+	return s, func() codec.Conn {
+		conn, err := DialHTTP(hsrv.URL)
+		if err != nil {
+			panic(err)
+		}
+		return conn
+	}, hsrv.Close
+}
diff --git a/contrib/codecs/inproc/client.go b/contrib/codecs/inproc/client.go
index 88c325e..7f019ef 100644
--- a/contrib/codecs/inproc/client.go
+++ b/contrib/codecs/inproc/client.go
@@ -50,8 +50,9 @@ func (c *Client) Closed() <-chan struct{} {
 func (c *Client) listen() error {
 	var msg json.RawMessage
 	defer c.cn()
+	dec := json.NewDecoder(c.c.rd)
 	for {
-		err := json.NewDecoder(c.c.rd).Decode(&msg)
+		err := dec.Decode(&msg)
 		if err != nil {
 			return err
 		}
diff --git a/contrib/codecs/inproc/codec_test.go b/contrib/codecs/inproc/codec_test.go
index b821e80..9deacc3 100644
--- a/contrib/codecs/inproc/codec_test.go
+++ b/contrib/codecs/inproc/codec_test.go
@@ -1,29 +1,14 @@
 package inproc_test
 
 import (
-	"context"
 	"testing"
 
 	"gfx.cafe/open/jrpc/contrib/codecs/inproc"
-	"gfx.cafe/open/jrpc/pkg/codec"
-	"gfx.cafe/open/jrpc/pkg/server"
-
 	"gfx.cafe/open/jrpc/pkg/jrpctest"
 )
 
-func mockServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
-	s := jrpctest.NewServer()
-	clientCodec := inproc.NewCodec()
-	go func() {
-		s.ServeCodec(context.Background(), clientCodec)
-	}()
-	return s, func() codec.Conn {
-		return inproc.NewClient(clientCodec, nil)
-	}, func() {}
-}
-
 func TestBasicSuite(t *testing.T) {
 	jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
-		ServerMaker: mockServerMaker,
+		ServerMaker: inproc.ServerMaker,
 	})
 }
diff --git a/contrib/codecs/inproc/testing.go b/contrib/codecs/inproc/testing.go
new file mode 100644
index 0000000..da13f44
--- /dev/null
+++ b/contrib/codecs/inproc/testing.go
@@ -0,0 +1,20 @@
+package inproc
+
+import (
+	"context"
+
+	"gfx.cafe/open/jrpc/pkg/codec"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+	"gfx.cafe/open/jrpc/pkg/server"
+)
+
+func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
+	s := jrpctest.NewServer()
+	clientCodec := NewCodec()
+	go func() {
+		s.ServeCodec(context.Background(), clientCodec)
+	}()
+	return s, func() codec.Conn {
+		return NewClient(clientCodec, nil)
+	}, func() {}
+}
diff --git a/contrib/codecs/rdwr/client.go b/contrib/codecs/rdwr/client.go
index 0637c34..dcc6a65 100644
--- a/contrib/codecs/rdwr/client.go
+++ b/contrib/codecs/rdwr/client.go
@@ -63,8 +63,9 @@ func (c *Client) Mount(h codec.Middleware) {
 func (c *Client) listen() error {
 	var msg json.RawMessage
 	defer c.cn()
+	dec := json.NewDecoder(c.rd)
 	for {
-		err := json.NewDecoder(c.rd).Decode(&msg)
+		err := dec.Decode(&msg)
 		if err != nil {
 			return err
 		}
diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go
index 9ebcf94..f02a013 100644
--- a/contrib/codecs/rdwr/codec.go
+++ b/contrib/codecs/rdwr/codec.go
@@ -42,9 +42,10 @@ func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
 
 func (c *Codec) listen() error {
 	var msg json.RawMessage
+	dec := json.NewDecoder(c.rd)
 	for {
 		// reading a message
-		err := json.NewDecoder(c.rd).Decode(&msg)
+		err := dec.Decode(&msg)
 		if err != nil {
 			c.cn()
 			return err
diff --git a/contrib/codecs/rdwr/codec_test.go b/contrib/codecs/rdwr/codec_test.go
index 6dcadf1..6177da4 100644
--- a/contrib/codecs/rdwr/codec_test.go
+++ b/contrib/codecs/rdwr/codec_test.go
@@ -1,31 +1,14 @@
 package rdwr_test
 
 import (
-	"context"
-	"io"
 	"testing"
 
 	"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
-	"gfx.cafe/open/jrpc/pkg/codec"
-	"gfx.cafe/open/jrpc/pkg/server"
-
 	"gfx.cafe/open/jrpc/pkg/jrpctest"
 )
 
 func TestBasicSuite(t *testing.T) {
-
 	jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
-		ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
-			rd_s, wr_s := io.Pipe()
-			rd_c, wr_c := io.Pipe()
-			s := jrpctest.NewServer()
-			clientCodec := rdwr.NewCodec(rd_c, wr_s, nil)
-			go func() {
-				s.ServeCodec(context.Background(), clientCodec)
-			}()
-			return s, func() codec.Conn {
-				return rdwr.NewClient(rd_s, wr_c)
-			}, func() {}
-		},
+		ServerMaker: rdwr.ServerMaker,
 	})
 }
diff --git a/contrib/codecs/rdwr/testing.go b/contrib/codecs/rdwr/testing.go
new file mode 100644
index 0000000..c040e93
--- /dev/null
+++ b/contrib/codecs/rdwr/testing.go
@@ -0,0 +1,23 @@
+package rdwr
+
+import (
+	"context"
+	"io"
+
+	"gfx.cafe/open/jrpc/pkg/codec"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+	"gfx.cafe/open/jrpc/pkg/server"
+)
+
+func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
+	rd_s, wr_s := io.Pipe()
+	rd_c, wr_c := io.Pipe()
+	s := jrpctest.NewServer()
+	clientCodec := NewCodec(rd_c, wr_s, nil)
+	go func() {
+		s.ServeCodec(context.Background(), clientCodec)
+	}()
+	return s, func() codec.Conn {
+		return NewClient(rd_s, wr_c)
+	}, func() {}
+}
diff --git a/contrib/codecs/websocket/codec_test.go b/contrib/codecs/websocket/codec_test.go
index dbb3d6d..bedffc1 100644
--- a/contrib/codecs/websocket/codec_test.go
+++ b/contrib/codecs/websocket/codec_test.go
@@ -1,25 +1,13 @@
 package websocket
 
 import (
-	"context"
-	"gfx.cafe/open/jrpc/pkg/codec"
-	"gfx.cafe/open/jrpc/pkg/jrpctest"
-	"gfx.cafe/open/jrpc/pkg/server"
-	"github.com/stretchr/testify/require"
-	"net/http/httptest"
 	"testing"
+
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
 )
 
 func TestBasicSuite(t *testing.T) {
 	jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
-		ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
-			s := jrpctest.NewServer()
-			hsrv := httptest.NewServer(&Server{Server: s})
-			return s, func() codec.Conn {
-				conn, err := DialWebsocket(context.Background(), hsrv.URL, "")
-				require.NoError(t, err)
-				return conn
-			}, hsrv.Close
-		},
+		ServerMaker: ServerMaker,
 	})
 }
diff --git a/contrib/codecs/websocket/testing.go b/contrib/codecs/websocket/testing.go
new file mode 100644
index 0000000..e5bdb46
--- /dev/null
+++ b/contrib/codecs/websocket/testing.go
@@ -0,0 +1,22 @@
+package websocket
+
+import (
+	"context"
+	"net/http/httptest"
+
+	"gfx.cafe/open/jrpc/pkg/codec"
+	"gfx.cafe/open/jrpc/pkg/jrpctest"
+	"gfx.cafe/open/jrpc/pkg/server"
+)
+
+func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
+	s := jrpctest.NewServer()
+	hsrv := httptest.NewServer(&Server{Server: s})
+	return s, func() codec.Conn {
+		conn, err := DialWebsocket(context.Background(), hsrv.URL, "")
+		if err != nil {
+			panic(err)
+		}
+		return conn
+	}, hsrv.Close
+}
diff --git a/pkg/jrpctest/suites.go b/pkg/jrpctest/suites.go
index 9bb6bf1..ddb4325 100644
--- a/pkg/jrpctest/suites.go
+++ b/pkg/jrpctest/suites.go
@@ -24,19 +24,34 @@ type BasicTestSuiteArgs struct {
 }
 
 type TestContext func(t *testing.T, server *server.Server, client codec.Conn)
+type BenchContext func(t *testing.B, server *server.Server, client codec.Conn)
 
-// go:embed testdata/
-var testData embed.FS
-
-func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) {
-	var executeTest = func(t *testing.T, c TestContext) {
-		server, dialer, cn := args.ServerMaker()
+func TestExecutor(sm ServerMaker) func(t *testing.T, c TestContext) {
+	return func(t *testing.T, c TestContext) {
+		server, dialer, cn := sm()
 		defer cn()
 		defer server.Stop()
 		client := dialer()
 		defer client.Close()
 		c(t, server, client)
 	}
+}
+func BenchExecutor(sm ServerMaker) func(t *testing.B, c BenchContext) {
+	return func(t *testing.B, c BenchContext) {
+		server, dialer, cn := sm()
+		defer cn()
+		defer server.Stop()
+		client := dialer()
+		defer client.Close()
+		c(t, server, client)
+	}
+}
+
+// go:embed testdata/
+var testData embed.FS
+
+func RunBasicTestSuite(t *testing.T, args BasicTestSuiteArgs) {
+	var executeTest = TestExecutor(args.ServerMaker)
 
 	var makeTest = func(name string, fm TestContext) {
 		t.Run(name, func(t *testing.T) {
-- 
GitLab