diff --git a/server.go b/server.go index 99770879b61094f5116a217cf5571ceb01f588e0..3f1d270ad55ae6d4f630e7f23b28986fe77ebf18 100644 --- a/server.go +++ b/server.go @@ -4,6 +4,7 @@ import ( "context" "io" "net/http" + "sync" "sync/atomic" "gfx.cafe/open/jrpc/codec" @@ -92,14 +93,35 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) { batch: batch, } for _, v := range msg { - env.responses = append(env.responses, - &callRespWriter{ - id: v.ID, - notifications: responder.toNotify, - header: remote.PeerInfo().HTTP.Headers, - }, - ) + rw := &callRespWriter{ + msg: v, + notifications: responder.toNotify, + header: remote.PeerInfo().HTTP.Headers, + } + env.responses = append(env.responses, rw) + } + + wg := sync.WaitGroup{} + wg.Add(len(msg)) + for _, vv := range env.responses { + v := vv + go func() { + if v.msg.ID == nil { + wg.Done() + } else { + defer wg.Done() + } + s.services.ServeRPC(vv, &Request{ + ctx: ctx, + ID: v.msg.ID, + Version: v.msg.Version, + Method: v.msg.Method, + Params: v.msg.Params, + Peer: remote.PeerInfo(), + }) + }() } + wg.Wait() responder.toSend <- env } } @@ -175,7 +197,7 @@ func (c *callResponder) send(ctx context.Context, env *callEnv) error { enc.FieldStart("jsonrpc") enc.Str("2.0") enc.FieldStart("id") - enc.Raw(v.id.RawMessage()) + enc.Raw(v.msg.ID.RawMessage()) err := v.dat(buf) if err != nil { enc.FieldStart("error") @@ -209,7 +231,7 @@ type notifyEnv struct { } type callRespWriter struct { - id *codec.ID + msg *codec.Message dat func(io.Writer) error err error skip bool