diff --git a/benchmark/suite_test.go b/benchmark/suite_test.go index 2837881cd37e83254124db56100280280e404313..deb7679318c5a9bc29d9d86163220ee77467308b 100644 --- a/benchmark/suite_test.go +++ b/benchmark/suite_test.go @@ -19,12 +19,12 @@ type testCase struct { } var testCases = []testCase{ - {"SingleClient", "small_largeResp", false}, - {"SingleClientMedium", "medium_largeResp", false}, - {"SingleClientLarge", "large_largeResp", false}, - {"ParallelClient", "small_largeResp", true}, - {"ParallelClientMedium", "medium_largeResp", true}, - {"ParallelClientLarge", "large_largeResp", true}, + {"SingleClient", "/small/largeResp", false}, + {"SingleClientMedium", "/medium/largeResp", false}, + {"SingleClientLarge", "/large/largeResp", false}, + {"ParallelClient", "/small/largeResp", true}, + {"ParallelClientMedium", "/medium/largeResp", true}, + {"ParallelClientLarge", "/large/largeResp", true}, } func runTestCase(ctx context.Context, client jsonrpc.Conn, method string, parallel bool) error { @@ -32,9 +32,9 @@ func runTestCase(ctx context.Context, client jsonrpc.Conn, method string, parall return client.Do(ctx, nil, method, nil) } var wg sync.WaitGroup - errs := make(chan error, 10) + errs := make(chan error, 16) - for i := 0; i < 10; i++ { + for i := 0; i < 16; i++ { wg.Add(1) go func() { defer wg.Done() diff --git a/contrib/middleware/replacer.go b/contrib/middleware/replacer.go index 749624bd5f79e5af6e8d511a168279246208d5e2..96e461b0bcfcf5190fbaebc19dccb844c6892634 100644 --- a/contrib/middleware/replacer.go +++ b/contrib/middleware/replacer.go @@ -12,7 +12,9 @@ var LegacyUnderscoreReplacer = MethodReplacer(strings.NewReplacer("_", "/")) func MethodReplacer(replacer *strings.Replacer) jsonrpc.Middleware { return func(next jsonrpc.Handler) jsonrpc.Handler { return jsonrpc.HandlerFunc(func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) { - r.Method = replacer.Replace(r.Method) + if strings.Contains(r.Method, "_") { + r.Method = replacer.Replace(r.Method) + } next.ServeRPC(w, r) }) } diff --git a/pkg/jrpctest/server.go b/pkg/jrpctest/server.go index b368172da9aa3afe772b3912805c1d59a27317a8..b7ecf57d58672365b2df0483061cd3a2288422ea 100644 --- a/pkg/jrpctest/server.go +++ b/pkg/jrpctest/server.go @@ -1,6 +1,7 @@ package jrpctest import ( + "encoding/json" "strings" jmux2 "gfx.cafe/open/jrpc/contrib/jmux" @@ -40,9 +41,9 @@ func NewRouter() *jmux2.Mux { } func largeResp(length int) jsonrpc.HandlerFunc { - str := []byte(strings.Repeat("x", length)) + str := json.RawMessage(`"` + strings.Repeat("x", length) + `"`) return func(w jsonrpc.ResponseWriter, r *jsonrpc.Request) { - w.Send(string(str), nil) + w.Send(str, nil) } } func NewRouterWithMaxSize(size int) *jmux2.Mux { diff --git a/pkg/jsonrpc/message.go b/pkg/jsonrpc/message.go index 6fb1559d5bb23840c00e2fd00b3089ec3ffba20a..161409e953d18a7eb334459f16a0802574ba8951 100644 --- a/pkg/jsonrpc/message.go +++ b/pkg/jsonrpc/message.go @@ -3,9 +3,9 @@ package jsonrpc import ( "encoding/json" "io" + "sync" "golang.org/x/net/context" - "golang.org/x/sync/semaphore" ) type MessageStreamer interface { @@ -25,34 +25,32 @@ func flushIfFlusher(w io.Writer) error { // MessageStream is a writer used to write jsonrpc message to a stream type MessageStream struct { w io.Writer - mu *semaphore.Weighted + mu *sync.Mutex } func NewStream(w io.Writer) *MessageStream { return &MessageStream{ w: w, - mu: semaphore.NewWeighted(1), + mu: &sync.Mutex{}, } } // sends a flush in order to send an empty payload func (m *MessageStream) Flush(ctx context.Context) error { - err := m.mu.Acquire(ctx, 1) - if err != nil { - return err + if m.mu != nil { + m.mu.Lock() + defer m.mu.Unlock() } - defer m.mu.Release(1) return flushIfFlusher(m.w) } // ReadFrom calls io.Copy within the semaphore, then calls flush func (m *MessageStream) ReadFrom(ctx context.Context, r io.Reader) error { - err := m.mu.Acquire(ctx, 1) - if err != nil { - return err + if m.mu != nil { + m.mu.Lock() + defer m.mu.Unlock() } - defer m.mu.Release(1) - _, err = io.Copy(m.w, r) + _, err := io.Copy(m.w, r) if err != nil { return err } @@ -61,7 +59,7 @@ func (m *MessageStream) ReadFrom(ctx context.Context, r io.Reader) error { type MessageWriter struct { w io.Writer - mu *semaphore.Weighted + mu *sync.Mutex } // NewMessage starts a new message and acquires the write lock. @@ -69,15 +67,12 @@ type MessageWriter struct { // the lock MUST be closed if and only if err == nil func (m *MessageStream) NewMessage(ctx context.Context) (*MessageWriter, error) { if m.mu != nil { - err := m.mu.Acquire(ctx, 1) - if err != nil { - return nil, err - } + m.mu.Lock() } _, err := m.w.Write([]byte(`{"jsonrpc":"2.0"`)) if err != nil { if m.mu != nil { - m.mu.Release(1) + m.mu.Unlock() } return nil, err } @@ -91,7 +86,7 @@ func (m *MessageStream) NewMessage(ctx context.Context) (*MessageWriter, error) // it releases the write lock func (m *MessageWriter) Close() error { if m.mu != nil { - defer m.mu.Release(1) + defer m.mu.Unlock() } _, err := m.w.Write([]byte("}")) if err != nil { @@ -132,7 +127,7 @@ func (m *MessageWriter) Params() (io.WriteCloser, error) { type BatchWriter struct { w io.Writer - mu *semaphore.Weighted + mu *sync.Mutex ms *MessageStream isNotFirst bool } @@ -149,15 +144,12 @@ func (w *writer) Write(p []byte) (n int, err error) { // caller MUST call Close() on the BatchWriter iff err == nil func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) { if m.mu != nil { - err := m.mu.Acquire(ctx, 1) - if err != nil { - return nil, err - } + m.mu.Lock() } _, err := m.w.Write([]byte("[")) if err != nil { if m.mu != nil { - m.mu.Release(1) + defer m.mu.Unlock() } return nil, err } @@ -168,6 +160,8 @@ func (m *MessageStream) NewBatch(ctx context.Context) (*BatchWriter, error) { // when the messagestream creates its subwrites, they won't pass the interface check for Flush // so they wont flush when they close. w: &writer{m.w}, + // the mutex is nil because we are going to use the mutex from the message stream. + mu: nil, }, mu: m.mu, }, nil @@ -191,7 +185,7 @@ func (m *BatchWriter) NewMessage(ctx context.Context) (*MessageWriter, error) { // it releases the write lock func (m *BatchWriter) Close() error { if m.mu != nil { - defer m.mu.Release(1) + defer m.mu.Unlock() } _, err := m.w.Write([]byte("]")) if err != nil {