Newer
Older
"gfx.cafe/util/go/bufpool"
"github.com/go-faster/jx"
"github.com/goccy/go-json"
// it is in charge of calling the handler on the message object, the json encoding of responses, and dealing with batch semantics.
// a server can be used to listenandserve multiple codecs at a time
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed
func (s *Server) ServeCodec(ctx context.Context, remote jsonrpc.ReaderWriter) error {
// add a cancel to the context so we can cancel all the child tasks on return
ctx, cn := context.WithCancel(ContextWithPeerInfo(ctx, remote.PeerInfo()))
defer cn()
allErrs := []error{}
var mu sync.Mutex
wg := sync.WaitGroup{}
err := func() error {
for {
// read messages from the stream synchronously
incoming, batch, err := remote.ReadBatch(ctx)
if err != nil {
defer wg.Done()
responder := &callResponder{
remote: remote,
batch: batch,
mu: sema,
}
allErrs = append(allErrs, err)
if len(allErrs) > 0 {
return errors.Join(allErrs...)
r *callResponder,
) error {
if r.batch {
return s.serveBatch(ctx, incoming, r)
} else {
return s.serveSingle(ctx, incoming[0], r)
}
}
func (s *Server) serveSingle(ctx context.Context,
r *callResponder,
) error {
rw := &streamingRespWriter{
ctx: ctx,
cr: r,
}
rw.msg, rw.err = produceOutputMessage(incoming)
ctx,
rw.msg,
)
req.Peer = r.remote.PeerInfo()
if rw.msg.ID == nil {
// all notification, so immediately flush
err := r.mu.Acquire(ctx, 1)
if err != nil {
return err
}
defer r.mu.Release(1)
err = r.remote.Flush()
if err != nil {
return err
}
}
s.services.ServeRPC(rw, req)
if rw.sendCalled == false && rw.msg.ID != nil {
func produceOutputMessage(inputMessage *jsonrpc.Message) (out *jsonrpc.Message, err error) {
// a nil incoming message means return an invalid request.
if inputMessage == nil {
inputMessage = &jsonrpc.Message{ID: jsonrpc.NewNullIDPtr()}
err = jsonrpc.NewInvalidRequestError("invalid request")
}
out = inputMessage
out.Error = nil
// zero length method is always invalid request
if len(out.Method) == 0 {
// assume if the method is not there AND the id is not there that it's an invalid REQUEST not notification
// this makes sure we add 1 to totalRequests
if out.ID == nil {
err := r.mu.Acquire(ctx, 1)
if err != nil {
return err
}
defer r.mu.Release(1)
err = r.send(ctx, &callEnv{
id: jsonrpc.NewNullIDPtr(),
err: jsonrpc.NewInvalidRequestError("empty batch"),
err = r.remote.Flush()
if err != nil {
return err
}
return nil
// create a waitgroup for when every handler returns
returnWg := sync.WaitGroup{}
returnWg.Add(len(rs))
batchResults := []*batchingRespWriter{}
respWg := &sync.WaitGroup{}
respWg.Add(totalRequests)
if v.msg.ID != nil {
v.wg = respWg
batchResults = append(batchResults, v)
if totalRequests > 0 {
// TODO: channel?
respWg.Wait()
err := r.mu.Acquire(ctx, 1)
defer r.mu.Release(1)
// write them, one by one
_, err = r.remote.Write([]byte{'['})
if err != nil {
return err
}
for i, v := range batchResults {
err = r.send(ctx, &callEnv{
})
if err != nil {
return err
}
// write the comma or ]
char := ','
if i == len(batchResults)-1 {
char = ']'
}
_, err = r.remote.Write([]byte{byte(char)})
if err != nil {
return err
}
}
err = r.remote.Flush()
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
enc := jx.GetEncoder()
defer jx.PutEncoder(enc)
enc.Grow(4096)
enc.ResetWriter(c.remote)
enc.Obj(func(e *jx.Encoder) {
e.Field("jsonrpc", func(e *jx.Encoder) {
e.Str("2.0")
})
if env.id != nil {
e.Field("id", func(e *jx.Encoder) {
e.Raw(env.id.RawMessage())
})
})
} else {
// if there is no error, we try to marshal the result
e.Field("result", func(e *jx.Encoder) {
if env.v != nil {
default:
err = json.NewEncoder(e).EncodeWithOption(cast, func(eo *json.EncodeOption) {
eo.DisableNewline = true
})
}
} else {
e.Null()
if env.err == nil && err != nil {
e.Field("error", func(e *jx.Encoder) {
jsonrpc.EncodeError(e, err)
})
}
type notifyEnv struct {
method string
dat any
}
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) (err error) {
// allocate a temp buffer for this packet
buf := bufpool.GetStd()
defer bufpool.PutStd(buf)
err = json.NewEncoder(buf).Encode(env.dat)
if err != nil {
msg.Error = err
} else {
msg.Params = buf.Bytes()
}
// add the method
msg.Method = env.method
enc := jx.GetEncoder()
defer jx.PutEncoder(enc)
enc.Grow(4096)
enc.ResetWriter(c.remote)