Newer
Older
"gfx.cafe/util/go/bufpool"
"github.com/go-faster/jx"
"github.com/goccy/go-json"
// It is in charge of calling the handler on the message object, JSON-encoding the responses, and dealing with batch semantics.
// A server can be used to listen and serve multiple codecs at a time.
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed
//
// The remote is closed when ServeCodec returns, and the derived context passed
// to serveBatch is cancelled at that point as well. Errors returned by
// serveBatch are collected and returned joined together.
func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) error {
defer remote.Close()
// add a cancel to the context so we can cancel all the child tasks on return
ctx, cn := context.WithCancel(ContextWithPeerInfo(ctx, remote.PeerInfo()))
defer cn()
// errors coming back from serveBatch are accumulated here
allErrs := []error{}
var mu sync.Mutex
wg := sync.WaitGroup{}
err := func() error {
for {
// read messages from the stream synchronously
incoming, batch, err := remote.ReadBatch(ctx)
if err != nil {
defer wg.Done()
// every ReadBatch result gets its own responder writing to the same remote
// NOTE(review): sema is declared outside the lines visible here; presumably it
// is the semaphore that serializes writes to remote — confirm
responder := &callResponder{
remote: remote,
batch: batch,
mu: sema,
}
err = s.serveBatch(ctx, incoming, responder)
allErrs = append(allErrs, err)
// report everything that was collected as a single joined error
if len(allErrs) > 0 {
return errors.Join(allErrs...)
// serveBatch handles the messages of one ReadBatch call. ServeCodec invokes it
// with the derived context, the incoming messages and a callResponder, and
// collects the error it returns.
// NOTE(review): the rest of the signature and parts of the body are outside
// the lines visible here; the comments below only describe what is shown.
func (s *Server) serveBatch(ctx context.Context,
incoming []*codec.Message,
// take the responder's semaphore before writing the "empty batch" error
err := r.mu.Acquire(ctx, 1)
if err != nil {
return err
}
defer r.mu.Release(1)
// the error response is sent with a null id
err = r.send(ctx, &callEnv{
id: codec.NewNullIDPtr(),
err: codec.NewInvalidRequestError("empty batch"),
err = r.remote.Flush()
if err != nil {
return err
}
return nil
// assume if the method is not there AND the id is not there that it's an invalid REQUEST not notification
// this makes sure we add 1 to totalRequests
}
// doneMu is a semaphore with capacity totalRequests that is acquired in full
// right away.
// NOTE(review): presumably each response writer releases one unit when its
// call finishes, so the second full Acquire below waits for all of them —
// confirm against callRespWriter
var doneMu *semaphore.Weighted
doneMu = semaphore.NewWeighted(int64(totalRequests))
err := doneMu.Acquire(ctx, int64(totalRequests))
if err != nil {
return err
peerInfo := r.remote.PeerInfo()
batchResults := []*callRespWriter{}
for _, vRef := range rs {
// only messages that carry an ID are given doneMu and tracked in batchResults
if v.msg.ID != nil {
v.doneMu = doneMu
batchResults = append(batchResults, v)
}
}
// acquire the full weight of doneMu again before writing the batch
err = doneMu.Acquire(ctx, int64(totalRequests))
if err != nil {
return err
}
defer r.mu.Release(1)
// write them, one by one
// the batch is written as a JSON array: '[' first, then each response
_, err = r.remote.Write([]byte{'['})
if err != nil {
return err
}
for i, v := range batchResults {
err: v.err,
id: v.msg.ID,
extrafields: v.msg.ExtraFields,
})
if err != nil {
return err
}
// write the comma or ]
char := ','
if i == len(batchResults)-1 {
char = ']'
}
_, err = r.remote.Write([]byte{byte(char)})
if err != nil {
return err
}
}
err = r.remote.Flush()
} else if totalRequests == 0 {
// no requests were counted: take the responder's semaphore and flush the remote
err = r.mu.Acquire(ctx, 1)
if err != nil {
return err
}
defer r.mu.Release(1)
err := r.remote.Flush()
if err != nil {
return err
}
// callResponder carries what is needed to write the responses for one
// ReadBatch result back to the remote. ServeCodec builds one per batch read.
type callResponder struct {
	// remote is the codec that encoded messages are written to and flushed on.
	remote codec.ReaderWriter
	// mu is a weighted semaphore; serveBatch acquires it with weight 1 before
	// writing to remote and releases it afterwards.
	mu *semaphore.Weighted

	// batch holds the batch flag returned by remote.ReadBatch for this set of
	// messages. batchStarted is not referenced in the code visible here
	// (NOTE(review): confirm whether it is still needed).
	batch, batchStarted bool
}
// send encodes env as a JSON object with a "jsonrpc": "2.0" field, an optional
// "id", any extra fields, and either an "error" or a "result" field, using a
// jx encoder obtained from jx.GetEncoder that writes to c.remote.
// NOTE(review): some lines of this function (the type switch header and
// several closing braces) are outside the lines visible here.
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
enc := jx.GetEncoder()
// hand the encoder back via jx.PutEncoder once the response has been written
defer jx.PutEncoder(enc)
enc.Grow(4096)
enc.ResetWriter(c.remote)
enc.Obj(func(e *jx.Encoder) {
e.Field("jsonrpc", func(e *jx.Encoder) {
e.Str("2.0")
})
// the id field is only emitted when env.id is set
if env.id != nil {
e.Field("id", func(e *jx.Encoder) {
e.Raw(env.id.RawMessage())
})
// extra fields are written through as raw JSON under their own keys
if env.extrafields != nil {
for k, v := range env.extrafields {
e.Field(k, func(e *jx.Encoder) {
e.Raw(v)
}
}
// the object gets either an "error" field or a "result" field
if env.err != nil {
e.Field("error", func(e *jx.Encoder) {
codec.EncodeError(e, env.err)
})
} else {
// if there is no error, we try to marshal the result
e.Field("result", func(e *jx.Encoder) {
if env.v != nil {
// a json.RawMessage is already encoded and is written as-is
case json.RawMessage:
e.Raw(cast)
default:
// any other value is run through the json encoder with the trailing newline disabled
err = json.NewEncoder(e).EncodeWithOption(cast, func(eo *json.EncodeOption) {
eo.DisableNewline = true
})
if err != nil {
return
// a json encoding error here is possibly fatal....
if err != nil {
return err
}
err = enc.Close()
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
// notifyEnv bundles the inputs callResponder.notify uses to build one
// outgoing notification message.
type notifyEnv struct {
	method string            // copied into the message's Method field
	dat    any               // JSON-encoded into the message's Params
	extra  codec.ExtraFields // copied into the message's ExtraFields
}
// notify builds a notification message from env and writes it to the remote.
//
// env.dat is JSON-encoded into the message params, env.method becomes the
// message method and env.extra its extra fields; no ID is set on the message.
// If env.dat cannot be encoded, the encoding error is stored in the message's
// Error field instead of Params and the message is still written.
//
// ctx is not used by this implementation. The returned error comes from
// codec.MarshalMessage or from closing the encoder.
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) (err error) {
	msg := &codec.Message{
		Method:      env.method,
		ExtraFields: env.extra,
	}

	// Scratch buffer for the encoded params. msg.Params aliases its bytes, so
	// the buffer may only be handed back after the message has been written;
	// the deferred call takes care of that.
	buf := bufpool.GetStd()
	defer bufpool.PutStd(buf)

	// Encode without the trailing newline that a plain Encode appends, the
	// same way send encodes results, so that no stray newline ends up in the
	// middle of the outgoing message.
	encErr := json.NewEncoder(buf).EncodeWithOption(env.dat, func(eo *json.EncodeOption) {
		eo.DisableNewline = true
	})
	if encErr != nil {
		msg.Error = encErr
	} else {
		msg.Params = buf.Bytes()
	}

	// Marshal the message through a jx encoder that writes to the remote.
	enc := jx.GetEncoder()
	defer jx.PutEncoder(enc)
	enc.Grow(4096)
	enc.ResetWriter(c.remote)
	if err = codec.MarshalMessage(msg, enc); err != nil {
		return err
	}
	return enc.Close()
}