diff --git a/benchmark/suite_test.go b/benchmark/suite_test.go index 6ecf448cb678cedf40dcbb7e5253e0449909b795..91e642336481a006a9f7b2fa991267544aeef14f 100644 --- a/benchmark/suite_test.go +++ b/benchmark/suite_test.go @@ -12,6 +12,25 @@ import ( "gfx.cafe/open/jrpc/pkg/server" ) +func TestBenchmarkSuite(t *testing.T) { + var executeTest = jrpctest.TestExecutor(rdwr.ServerMaker) + var makeTest = func(name string, fm jrpctest.TestContext) { + t.Run(name, func(t *testing.T) { + executeTest(t, fm) + }) + } + + ctx := context.Background() + makeTest("SingleClient", func(t *testing.T, server *server.Server, client codec.Conn) { + for i := 0; i < 10; i++ { + err := client.Do(ctx, nil, "test_ping", nil) + if err != nil { + t.Error(err) + } + } + }) +} + func runBenchmarkSuite(b *testing.B, sm jrpctest.ServerMaker) { ctx := context.Background() executeBench := jrpctest.BenchExecutor(sm) diff --git a/contrib/codecs/rdwr/codec.go b/contrib/codecs/rdwr/codec.go index c50d0cc12d372769c51aed475496857588c48600..752f64caed5469e8db30846019aaf2672b634a16 100644 --- a/contrib/codecs/rdwr/codec.go +++ b/contrib/codecs/rdwr/codec.go @@ -48,7 +48,8 @@ func (c *Codec) PeerInfo() codec.PeerInfo { func (c *Codec) decodeSingleMessage(ctx context.Context) (*serverutil.Bundle, error) { c.decLock.Lock() defer c.decLock.Unlock() - c.decBuf = c.decBuf[:0] + //c.decBuf = c.decBuf[:0] + c.decBuf = json.RawMessage{} err := c.dec.DecodeContext(ctx, &c.decBuf) if err != nil { return nil, err diff --git a/contrib/codecs/websocket/codec.go b/contrib/codecs/websocket/codec.go index bbbc4b3a299d16038c8dabf45d8590b191e9a155..43f1cdabc3bc84b47b25ba29078cd583e71b7142 100644 --- a/contrib/codecs/websocket/codec.go +++ b/contrib/codecs/websocket/codec.go @@ -3,7 +3,6 @@ package websocket import ( "context" "io" - "log" "net/http" "sync" "time" @@ -17,12 +16,6 @@ import ( "gfx.cafe/open/jrpc/pkg/serverutil" ) -func init() { - go func() { - log.Println(http.ListenAndServe("localhost:6060", nil)) - }() -} - type Codec struct { closed chan struct{} conn *websocket.Conn diff --git a/pkg/clientutil/idreply.go b/pkg/clientutil/idreply.go index bfb17d6198ca2eb744a714cd61779dcfecfbd03e..86c0e4ff86834049ff665b0fd1fe90efed222dd5 100644 --- a/pkg/clientutil/idreply.go +++ b/pkg/clientutil/idreply.go @@ -31,6 +31,22 @@ func (i *IdReply) NextId() *codec.ID { return codec.NewNumberIDPtr(i.id.Add(1)) } +func (i *IdReply) makeOrTake(id []byte) chan msgOrError { + i.mu.Lock() + defer i.mu.Unlock() + ch, ok := i.chs[string(id)] + if ok { + // take + delete(i.chs, string(id)) + } else { + // make + ch = make(chan msgOrError, 1) + i.chs[string(id)] = ch + } + return ch + +} + func (i *IdReply) make(id []byte) <-chan msgOrError { i.mu.Lock() defer i.mu.Unlock() @@ -54,7 +70,7 @@ func (i *IdReply) remove(id []byte) { } func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) { - ch := i.take(id) + ch := i.makeOrTake(id) if ch == nil { return } @@ -73,7 +89,7 @@ func (i *IdReply) Resolve(id []byte, msg json.RawMessage, err error) { func (i *IdReply) Ask(ctx context.Context, id []byte) (json.RawMessage, error) { select { - case resp := <-i.make(id): + case resp := <-i.makeOrTake(id): return resp.msg, resp.err case <-ctx.Done(): i.remove(id) diff --git a/pkg/server/responsewriter.go b/pkg/server/responsewriter.go index 4a31120d8b2d46f8bc0d3dcb88d651cfb2388cf2..b353781b6feb26c9f61a17d417618342c5594e6d 100644 --- a/pkg/server/responsewriter.go +++ b/pkg/server/responsewriter.go @@ -2,10 +2,8 @@ package server import ( "context" - "log" "net/http" "sync" - "time" "gfx.cafe/open/jrpc/pkg/codec" "github.com/goccy/go-json" @@ -63,14 +61,10 @@ func (c *callRespWriter) Send(v any, e error) (err error) { } return nil } - s := time.Now() - log.Println("try") err = c.cr.mu.Acquire(c.ctx, 1) if err != nil { return err } - log.Println("release", time.Since(s)) - s2 := time.Now() defer c.cr.mu.Release(1) err = c.cr.send(c.ctx, &callEnv{ v: &v, @@ -78,7 +72,6 @@ func (c *callRespWriter) Send(v any, e error) (err error) { id: c.msg.ID, extrafields: c.msg.ExtraFields, }) - log.Println("release", time.Since(s2)) err = c.cr.remote.Flush() if err != nil { return err diff --git a/pkg/server/server.go b/pkg/server/server.go index 8ca1430a2a11d6a18f8a5c0dad6029a6047dea3a..1a187bfe152115ee05b30dc4ded452aa64c9b22c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -68,7 +68,6 @@ func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) erro }() } }() - wg.Wait() allErrs = append(allErrs, err) if len(allErrs) > 0 { return errors.Join(allErrs...) @@ -143,13 +142,6 @@ func (s *Server) serveBatch(ctx context.Context, batchResults = append(batchResults, v) } } - // early respond to nil requests - if v.err != nil { - v.sendCalled = true - v.doneMu.Release(1) - wg.Done() - continue - } // now process each request in its own goroutine // TODO: stress test this. go func() { @@ -162,9 +154,7 @@ func (s *Server) serveBatch(ctx context.Context, s.services.ServeRPC(v, req) }() } - if r.batch { - // we only need to do this if this is a batch call with requests err = doneMu.Acquire(ctx, int64(totalRequests)) if err != nil { return err