Newer
Older
"gfx.cafe/util/go/bufpool"
"github.com/go-faster/jx"
"github.com/goccy/go-json"
// It is in charge of calling the handler on the message object, JSON-encoding the responses, and handling batch semantics.
// A server can be used to listen and serve multiple codecs at a time.
// ServeCodec reads incoming requests from remote, hands each read to serveBatch, and writes
// the responses back through the same codec. It will block until the codec is closed.
//
// remote is closed when ServeCodec returns. The context used for child work carries the
// remote's PeerInfo and is cancelled on return.
func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) error {
// always close the remote on the way out
defer remote.Close()
// add a cancel to the context so we can cancel all the child tasks on return
ctx, cn := context.WithCancel(ContextWithPeerInfo(ctx, remote.PeerInfo()))
defer cn()
// errors produced by serveBatch are gathered here and joined into the returned error
allErrs := []error{}
var mu sync.Mutex
wg := sync.WaitGroup{}
err := func() error {
for {
// read messages from the stream synchronously
incoming, batch, err := remote.ReadBatch(ctx)
if err != nil {
defer wg.Done()
// every read gets its own responder; all of them share the remote and sema
responder := &callResponder{
remote: remote,
batch: batch,
mu: sema,
}
err = s.serveBatch(ctx, incoming, responder)
allErrs = append(allErrs, err)
// errors.Join discards nil entries and returns nil when none remain
if len(allErrs) > 0 {
return errors.Join(allErrs...)
// serveBatch handles the messages from one read of the remote and writes the replies
// through the responder.
//
// An empty batch is answered with a single invalid-request error carrying a null id.
// For a batch, the responses for messages that have an id are written as one JSON array
// ('[' ... ',' ... ']') and the remote is then flushed.
// The responder's semaphore (weight 1) is acquired around the writes shown here.
func (s *Server) serveBatch(ctx context.Context,
incoming []*codec.Message,
err := r.mu.Acquire(ctx, 1)
if err != nil {
return err
}
defer r.mu.Release(1)
// reply with an invalid-request error that carries a null id
err = r.send(ctx, &callEnv{
id: codec.NewNullIDPtr(),
err: codec.NewInvalidRequestError("empty batch"),
err = r.remote.Flush()
if err != nil {
return err
}
return nil
}
// doneMu has a capacity equal to the number of requests, and the whole capacity is taken
// up front.
// NOTE(review): presumably each callRespWriter releases doneMu when it finishes, so the
// second full Acquire below acts as a wait for all of them — confirm.
var doneMu *semaphore.Weighted
doneMu = semaphore.NewWeighted(int64(totalRequests))
err := doneMu.Acquire(ctx, int64(totalRequests))
if err != nil {
return err
peerInfo := r.remote.PeerInfo()
batchResults := []*callRespWriter{}
for _, vRef := range rs {
// only messages that carry an id get an entry in the batch response
if v.msg.ID != nil {
v.doneMu = doneMu
batchResults = append(batchResults, v)
}
}
// blocks until the full weight of doneMu is available again (or ctx is done)
err = doneMu.Acquire(ctx, int64(totalRequests))
if err != nil {
return err
}
defer r.mu.Release(1)
// write them, one by one
_, err = r.remote.Write([]byte{'['})
if err != nil {
return err
}
for i, v := range batchResults {
err: v.err,
id: v.msg.ID,
extrafields: v.msg.ExtraFields,
})
if err != nil {
return err
}
// write the comma or ]
char := ','
if i == len(batchResults)-1 {
char = ']'
}
_, err = r.remote.Write([]byte{byte(char)})
if err != nil {
return err
}
}
err = r.remote.Flush()
} else if totalRequests == 0 {
// no requests: only flush the remote, still under the responder's semaphore
err = r.mu.Acquire(ctx, 1)
if err != nil {
return err
}
defer r.mu.Release(1)
err := r.remote.Flush()
if err != nil {
return err
}
// callResponder writes responses and notifications to the remote codec.
// ServeCodec builds one for each result of remote.ReadBatch.
type callResponder struct {
// remote is the codec that encoded output is written to and flushed on.
remote codec.ReaderWriter
// mu is a weighted semaphore; serveBatch acquires it with weight 1 around writes to remote.
mu *semaphore.Weighted
// batch is the batch flag returned by remote.ReadBatch.
batch bool
// NOTE(review): batchStarted is not referenced by the code visible here — confirm it is still needed.
batchStarted bool
}
// send encodes env as a JSON-RPC 2.0 response object and writes it to the remote through
// a pooled jx encoder.
//
// The object always contains "jsonrpc": "2.0". It contains "id" when env.id is non-nil,
// each entry of env.extrafields as a raw value, and then either "error" (when env.err is
// non-nil) or "result".
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
// borrow an encoder from the jx pool and point it at the remote
enc := jx.GetEncoder()
defer jx.PutEncoder(enc)
enc.Grow(4096)
enc.ResetWriter(c.remote)
enc.Obj(func(e *jx.Encoder) {
e.Field("jsonrpc", func(e *jx.Encoder) {
e.Str("2.0")
})
if env.id != nil {
e.Field("id", func(e *jx.Encoder) {
e.Raw(env.id.RawMessage())
})
if env.extrafields != nil {
// extra fields are already encoded, so they are written as raw values
for k, v := range env.extrafields {
e.Field(k, func(e *jx.Encoder) {
e.Raw(v)
}
}
if env.err != nil {
e.Field("error", func(e *jx.Encoder) {
codec.EncodeError(e, env.err)
})
} else {
// if there is no error, we try to marshal the result
e.Field("result", func(e *jx.Encoder) {
if env.v != nil {
// an already-encoded result is written through untouched
case json.RawMessage:
e.Raw(cast)
default:
// encode straight into the jx encoder, without the encoder's trailing newline
err = json.NewEncoder(e).EncodeWithOption(cast, func(eo *json.EncodeOption) {
eo.DisableNewline = true
})
if err != nil {
return
// a json encoding error here is possibly fatal....
if err != nil {
return err
}
err = enc.Close()
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
// notifyEnv carries the contents of an outgoing notification for callResponder.notify.
type notifyEnv struct {
// method is copied into the outgoing message's Method field.
method string
// dat is JSON-encoded to become the outgoing message's Params.
dat any
// extra is copied into the outgoing message's ExtraFields.
extra codec.ExtraFields
}
// notify builds a message from env (method, params and extra fields) and
// writes it to the remote codec through a pooled jx encoder.
//
// env.dat is JSON-encoded into the message's Params. If that encoding fails,
// the failure is stored in the message's Error field and the message is still
// written; the encoding error itself is not returned. The returned error comes
// from marshaling the message or from closing the encoder.
//
// notify neither acquires c.mu nor flushes c.remote.
// NOTE(review): presumably the caller is responsible for both — confirm.
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) (err error) {
	msg := &codec.Message{}

	// Allocate a temp buffer for the params; it must stay alive until the
	// message has been marshaled because msg.Params aliases its bytes.
	buf := bufpool.GetStd()
	defer bufpool.PutStd(buf)

	// Encode without the encoder's trailing newline, exactly as send does for
	// results. A plain Encode would leave a '\n' at the end of Params, which
	// then ends up inside the emitted message.
	encErr := json.NewEncoder(buf).EncodeWithOption(env.dat, func(eo *json.EncodeOption) {
		eo.DisableNewline = true
	})
	if encErr != nil {
		msg.Error = encErr
	} else {
		msg.Params = buf.Bytes()
	}
	msg.ExtraFields = env.extra
	// add the method
	msg.Method = env.method

	enc := jx.GetEncoder()
	defer jx.PutEncoder(enc)
	enc.Grow(4096)
	enc.ResetWriter(c.remote)
	if err = codec.MarshalMessage(msg, enc); err != nil {
		return err
	}
	return enc.Close()
}