From 129527e90c8b6d88a47b7ad25b8089b39202805d Mon Sep 17 00:00:00 2001 From: a <a@tuxpa.in> Date: Thu, 11 May 2023 19:08:59 -0500 Subject: [PATCH] wg --- server.go | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/server.go b/server.go index 9977087..3f1d270 100644 --- a/server.go +++ b/server.go @@ -4,6 +4,7 @@ import ( "context" "io" "net/http" + "sync" "sync/atomic" "gfx.cafe/open/jrpc/codec" @@ -92,14 +93,35 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { batch: batch, } for _, v := range msg { - env.responses = append(env.responses, - &callRespWriter{ - id: v.ID, - notifications: responder.toNotify, - header: remote.PeerInfo().HTTP.Headers, - }, - ) + rw := &callRespWriter{ + msg: v, + notifications: responder.toNotify, + header: remote.PeerInfo().HTTP.Headers, + } + env.responses = append(env.responses, rw) + } + + wg := sync.WaitGroup{} + wg.Add(len(msg)) + for _, vv := range env.responses { + v := vv + go func() { + if v.msg.ID == nil { + wg.Done() + } else { + defer wg.Done() + } + s.services.ServeRPC(vv, &Request{ + ctx: ctx, + ID: v.msg.ID, + Version: v.msg.Version, + Method: v.msg.Method, + Params: v.msg.Params, + Peer: remote.PeerInfo(), + }) + }() } + wg.Wait() responder.toSend <- env } } @@ -175,7 +197,7 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) error { enc.FieldStart("jsonrpc") enc.Str("2.0") enc.FieldStart("id") - enc.Raw(v.id.RawMessage()) + enc.Raw(v.msg.ID.RawMessage()) err := v.dat(buf) if err != nil { enc.FieldStart("error") @@ -209,7 +231,7 @@ type notifyEnv struct { } type callRespWriter struct { - id *codec.ID + msg *codec.Message dat func(io.Writer) error err error skip bool -- GitLab