Newer
Older
"gfx.cafe/util/go/bufpool"
"github.com/go-faster/jx"
"github.com/goccy/go-json"
// It is in charge of calling the handler on the message object, the JSON encoding of responses, and dealing with batch semantics.
// A server can be used to listen and serve multiple codecs at a time.
// ServeCodec reads incoming requests from the codec, calls the appropriate callback, and writes
// the response back using the given codec. It will block until the codec is closed.
func (s *Server) ServeCodec(ctx context.Context, remote codec.ReaderWriter) error {
defer remote.Close()
// add a cancel to the context so we can cancel all the child tasks on return
ctx, cn := context.WithCancel(ContextWithPeerInfo(ctx, remote.PeerInfo()))
defer cn()
allErrs := []error{}
var mu sync.Mutex
wg := sync.WaitGroup{}
err := func() error {
for {
// read messages from the stream synchronously
incoming, batch, err := remote.ReadBatch(ctx)
if err != nil {
defer wg.Done()
responder := &callResponder{
remote: remote,
batch: batch,
mu: sema,
}
err = s.serveBatch(ctx, incoming, responder)
// remote.Flush()
mu.Lock()
defer mu.Unlock()
allErrs = append(allErrs, err)
allErrs = append(allErrs, err)
if len(allErrs) > 0 {
return errors.Join(allErrs...)
func (s *Server) serveBatch(ctx context.Context,
incoming []*codec.Message,
err := r.send(ctx, &callEnv{
pkt: &codec.Message{
ID: codec.NewNullIDPtr(),
Error: codec.NewInvalidRequestError("empty batch"),
},
// a nil incoming message means an empty response
if v == nil {
rw.msg = &codec.Message{ID: codec.NewNullIDPtr()}
continue
}
rw.msg = v
}
var doneMu *semaphore.Weighted
doneMu = semaphore.NewWeighted(int64(totalRequests))
err := doneMu.Acquire(ctx, int64(totalRequests))
if err != nil {
return err
peerInfo := r.remote.PeerInfo()
isBatchWithRequests := totalRequests > 1 && !r.batch
batchResults := []*callRespWriter{}
for _, vRef := range rs {
v.doneMu = doneMu
if isBatchWithRequests {
v.noStream = true
batchResults = append(batchResults, v)
}
// now process each request in its own goroutine
// TODO: stress test this.
// we only need to do this if this is a batch call with requests
// first we need to wait for every single request to be completed
err = doneMu.Acquire(ctx, int64(totalRequests))
if err != nil {
return err
}
if isBatchWithRequests {
err = r.mu.Acquire(ctx, 1)
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
defer r.mu.Release(1)
// write them, one by one
_, err = r.remote.Write([]byte{'['})
if err != nil {
return err
}
for i, v := range batchResults {
err = r.send(ctx, &callEnv{
v: v.payload,
err: v.err,
id: v.msg.ID,
extrafields: v.msg.ExtraFields,
})
if err != nil {
return err
}
// write the comma or ]
char := ','
if i == len(batchResults)-1 {
char = ']'
}
_, err = r.remote.Write([]byte{byte(char)})
if err != nil {
return err
}
}
err = r.remote.Flush()
// callResponder bundles what is needed to write the responses for one
// incoming message or batch back to the remote codec.
type callResponder struct {
// remote is the codec that encoded responses and notifications are written to.
remote codec.ReaderWriter
// mu is a weighted semaphore; serveBatch acquires one unit of it while it
// writes a batch's results to remote and releases it afterwards.
mu *semaphore.Weighted
// batch is the batch flag returned by the codec's ReadBatch for this input.
batch bool
// batchStarted is not referenced in the code visible here.
// NOTE(review): presumably tracks whether a batch response has begun — confirm.
batchStarted bool
}
v any
err error
pkt *codec.Message
id *codec.ID
extrafields codec.ExtraFields
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
enc := jx.GetEncoder()
defer jx.PutEncoder(enc)
enc.Grow(4096)
enc.ResetWriter(c.remote)
enc.Obj(func(e *jx.Encoder) {
e.Field("jsonrpc", func(e *jx.Encoder) {
e.Str("2.0")
})
if env.id != nil {
e.Field("id", func(e *jx.Encoder) {
e.Raw(env.id.RawMessage())
})
if env.extrafields != nil {
for k, v := range env.extrafields {
e.Field(k, func(e *jx.Encoder) {
e.Raw(v)
}
}
if env.err != nil {
e.Field("error", func(e *jx.Encoder) {
codec.EncodeError(e, env.err)
})
} else {
// if there is no error, we try to marshal the result
e.Field("result", func(e *jx.Encoder) {
if env.v != nil {
switch cast := env.v.(type) {
case json.RawMessage:
e.Raw(cast)
default:
err = json.NewEncoder(e).EncodeWithOption(cast, func(eo *json.EncodeOption) {
eo.DisableNewline = true
})
if err != nil {
return
// a json encoding error here is possibly fatal....
if err != nil {
return err
}
err = enc.Close()
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
// notifyEnv holds the inputs for a single notification written by
// callResponder.notify.
type notifyEnv struct {
// method is used as the Method of the outgoing message.
method string
// dat is JSON-encoded and used as the Params of the outgoing message.
dat any
// extra is attached to the outgoing message as its ExtraFields.
extra codec.ExtraFields
}
// notify writes a single notification for env.method to the remote codec.
//
// env.dat is JSON-encoded into a pooled scratch buffer and used as the params
// of the message. If that encoding fails, the message is still sent, carrying
// the encoding error in its Error field instead of params. env.extra is
// attached as the message's extra fields.
//
// The returned error is the failure, if any, of marshaling the message or of
// closing the encoder; a failure to encode env.dat is not returned to the
// caller. ctx is currently unused.
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) (err error) {
	msg := &codec.Message{
		Method:      env.method,
		ExtraFields: env.extra,
	}
	// Scratch buffer for the encoded params. It is handed back to the pool only
	// when this function returns, i.e. after the message has been written out.
	buf := bufpool.GetStd()
	defer bufpool.PutStd(buf)
	// Encode without the trailing newline, the same way send encodes results,
	// so the params value does not carry a line break into the outgoing message.
	encErr := json.NewEncoder(buf).EncodeWithOption(env.dat, func(eo *json.EncodeOption) {
		eo.DisableNewline = true
	})
	if encErr != nil {
		// Report the encoding failure to the peer inside the message itself.
		msg.Error = encErr
	} else {
		msg.Params = buf.Bytes()
	}
	enc := jx.GetEncoder()
	defer jx.PutEncoder(enc)
	enc.Grow(4096)
	enc.ResetWriter(c.remote)
	if err = codec.MarshalMessage(msg, enc); err != nil {
		return err
	}
	return enc.Close()
}