From 1f1e7a3c8bd13261c6ec48d9e274d05abd26c97f Mon Sep 17 00:00:00 2001
From: a <a@tuxpa.in>
Date: Mon, 23 Dec 2024 16:57:02 -0600
Subject: [PATCH] noot

---
 benchmark/suite_test.go        | 16 ++++++------
 contrib/middleware/replacer.go |  4 ++-
 pkg/jrpctest/server.go         |  5 ++--
 pkg/jsonrpc/message.go         | 46 +++++++++++++++-------------------
 4 files changed, 34 insertions(+), 37 deletions(-)

diff --git a/benchmark/suite_test.go b/benchmark/suite_test.go
index 2837881..deb7679 100644
--- a/benchmark/suite_test.go
+++ b/benchmark/suite_test.go
@@ -19,12 +19,12 @@ type testCase struct {
 }
 
 var testCases = []testCase{
-	{"SingleClient", "small_largeResp", false},
-	{"SingleClientMedium", "medium_largeResp", false},
-	{"SingleClientLarge", "large_largeResp", false},
-	{"ParallelClient", "small_largeResp", true},
-	{"ParallelClientMedium", "medium_largeResp", true},
-	{"ParallelClientLarge", "large_largeResp", true},
+	{"SingleClient", "/small/largeResp", false},
+	{"SingleClientMedium", "/medium/largeResp", false},
+	{"SingleClientLarge", "/large/largeResp", false},
+	{"ParallelClient", "/small/largeResp", true},
+	{"ParallelClientMedium", "/medium/largeResp", true},
+	{"ParallelClientLarge", "/large/largeResp", true},
 }
 
 func runTestCase(ctx context.Context, client jsonrpc.Conn, method string, parallel bool) error {
@@ -32,9 +32,9 @@ func runTestCase(ctx context.Context, client jsonrpc.Conn, method string, parall
 		return client.Do(ctx, nil, method, nil)
 	}
 	var wg sync.WaitGroup
-	errs := make(chan error, 10)
+	errs := make(chan error, 16)
 
-	for i := 0; i < 10; i++ {
+	for i := 0; i < 16; i++ {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
diff --git a/contrib/middleware/replacer.go b/contrib/middleware/replacer.go
index 749624b..96e461b 100644
--- a/contrib/middleware/replacer.go
+++ b/contrib/middleware/replacer.go
@@ -12,7 +12,9 @@ var LegacyUnderscoreReplacer = MethodReplacer(strings.NewReplacer("_", "/"))
 func MethodReplacer(replacer *strings.Replacer) jsonrpc.Middleware {
 	return func(next jsonrpc.Handler) jsonrpc.Handler {
 		return jsonrpc.HandlerFunc(func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
-			r.Method = replacer.Replace(r.Method)
+			if strings.Contains(r.Method, "_") {
+				r.Method = replacer.Replace(r.Method)
+			}
 			next.ServeRPC(w, r)
 		})
 	}
diff --git a/pkg/jrpctest/server.go b/pkg/jrpctest/server.go
index b368172..b7ecf57 100644
--- a/pkg/jrpctest/server.go
+++ b/pkg/jrpctest/server.go
@@ -1,6 +1,7 @@
 package jrpctest
 
 import (
+	"encoding/json"
 	"strings"
 
 	jmux2 "gfx.cafe/open/jrpc/contrib/jmux"
@@ -40,9 +41,9 @@ func NewRouter() *jmux2.Mux {
 }
 
 func largeResp(length int) jsonrpc.HandlerFunc {
-	str := []byte(strings.Repeat("x", length))
+	str := json.RawMessage(`"` + strings.Repeat("x", length) + `"`)
 	return func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) {
-		w.Send(string(str), nil)
+		w.Send(str, nil)
 	}
 }
 func NewRouterWithMaxSize(size int) *jmux2.Mux {
diff --git a/pkg/jsonrpc/message.go b/pkg/jsonrpc/message.go
index 6fb1559..161409e 100644
--- a/pkg/jsonrpc/message.go
+++ b/pkg/jsonrpc/message.go
@@ -3,9 +3,9 @@ package jsonrpc
 import (
 	"encoding/json"
 	"io"
+	"sync"
 
 	"golang.org/x/net/context"
-	"golang.org/x/sync/semaphore"
 )
 
 type MessageStreamer interface {
@@ -25,34 +25,32 @@ func flushIfFlusher(w io.Writer) error {
 // MessageStream is a writer used to write jsonrpc message to a stream
 type MessageStream struct {
 	w  io.Writer
-	mu *semaphore.Weighted
+	mu *sync.Mutex
 }
 
 func NewStream(w io.Writer) *MessageStream {
 	return &MessageStream{
 		w:  w,
-		mu: semaphore.NewWeighted(1),
+		mu: &sync.Mutex{},
 	}
 }
 
 // sends a flush in order to send an empty payload
 func (m *MessageStream) Flush(ctx context.Context) error {
-	err := m.mu.Acquire(ctx, 1)
-	if err != nil {
-		return err
+	if m.mu != nil {
+		m.mu.Lock()
+		defer m.mu.Unlock()
 	}
-	defer m.mu.Release(1)
 	return flushIfFlusher(m.w)
 }
 
 // ReadFrom calls io.Copy within the semaphore, then calls flush
 func (m *MessageStream) ReadFrom(ctx context.Context, r io.Reader) error {
-	err := m.mu.Acquire(ctx, 1)
-	if err != nil {
-		return err
+	if m.mu != nil {
+		m.mu.Lock()
+		defer m.mu.Unlock()
 	}
-	defer m.mu.Release(1)
-	_, err = io.Copy(m.w, r)
+	_, err := io.Copy(m.w, r)
 	if err != nil {
 		return err
 	}
@@ -61,7 +59,7 @@ func (m *MessageStream) ReadFrom(ctx context.Context, r io.Reader) error {
 
 type MessageWriter struct {
 	w  io.Writer
-	mu *semaphore.Weighted
+	mu *sync.Mutex
 }
 
 // NewMessage starts a new message and acquires the write lock.
@@ -69,15 +67,12 @@ type MessageWriter struct {
 // the lock MUST be closed if and only if err == nil
 func (m *MessageStream) NewMessage(ctx context.Context) (*MessageWriter, error) {
 	if m.mu != nil {
-		err := m.mu.Acquire(ctx, 1)
-		if err != nil {
-			return nil, err
-		}
+		m.mu.Lock()
 	}
 	_, err := m.w.Write([]byte(`{"jsonrpc":"2.0"`))
 	if err != nil {
 		if m.mu != nil {
-			m.mu.Release(1)
+			m.mu.Unlock()
 		}
 		return nil, err
 	}
@@ -91,7 +86,7 @@ func (m *MessageStream) NewMessage(ctx context.Context) (*MessageWriter, error)
 // it releases the write lock
 func (m *MessageWriter) Close() error {
 	if m.mu != nil {
-		defer m.mu.Release(1)
+		defer m.mu.Unlock()
 	}
 	_, err := m.w.Write([]byte("}"))
 	if err != nil {
@@ -132,7 +127,7 @@ func (m *MessageWriter) Params() (io.WriteCloser, error) {
 
 type BatchWriter struct {
 	w          io.Writer
-	mu         *semaphore.Weighted
+	mu         *sync.Mutex
 	ms         *MessageStream
 	isNotFirst bool
 }
@@ -149,15 +144,12 @@ func (w *writer) Write(p []byte) (n int, err error) {
 // caller MUST call Close() on the BatchWriter iff err == nil
 func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) {
 	if m.mu != nil {
-		err := m.mu.Acquire(ctx, 1)
-		if err != nil {
-			return nil, err
-		}
+		m.mu.Lock()
 	}
 	_, err := m.w.Write([]byte("["))
 	if err != nil {
 		if m.mu != nil {
-			m.mu.Release(1)
+			defer m.mu.Unlock()
 		}
 		return nil, err
 	}
@@ -168,6 +160,8 @@ func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) {
 			// when the messagestream creates its subwrites, they won't pass the interface check for Flush
 			// so they wont flush when they close.
 			w: &writer{m.w},
+			// the mutex is nil because we are going to use the mutex from the message stream.
+			mu: nil,
 		},
 		mu: m.mu,
 	}, nil
@@ -191,7 +185,7 @@ func (m *BatchWriter) NewMessage(ctx context.Context) (*MessageWriter, error) {
 // it releases the write lock
 func (m *BatchWriter) Close() error {
 	if m.mu != nil {
-		defer m.mu.Release(1)
+		defer m.mu.Unlock()
 	}
 	_, err := m.w.Write([]byte("]"))
 	if err != nil {
-- 
GitLab