Newer
Older
"gfx.cafe/util/go/bufpool"
"github.com/go-faster/jx"
"github.com/goccy/go-json"
// it is in charge of calling the handler on the message object, the json encoding of responses, and dealing with batch semantics.
// a server can be used to listenandserve multiple codecs at a time
func (s *Server) serveBatch(ctx context.Context,
incoming []*codec.Message,
batch bool,
remote codec.ReaderWriter, responder *callResponder) error {
// if it is empty batch, send the empty batch error and immediately return
return responder.send(ctx, &callEnv{
pkt: &codec.Message{
ID: codec.NewNullIDPtr(),
Error: codec.NewInvalidRequestError("empty batch"),
},
// a nil incoming message means an empty response
if v == nil {
rw.msg = &codec.Message{ID: codec.NewNullIDPtr()}
rw.pkt = &codec.Message{ID: codec.NewNullIDPtr()}
continue
}
rw.msg = v
if v.ID == nil {
rw.pkt = &codec.Message{ID: codec.NewNullIDPtr()}
continue
}
rw.pkt = &codec.Message{ID: v.ID}
// early respond to nil requests
if v.msg == nil || len(v.msg.Method) == 0 {
v.pkt.Error = codec.NewInvalidRequestError("invalid request")
return
}
if v.msg.ID == nil || v.msg.ID.IsNull() {
// it's a notification, so we mark skip and we don't write anything for it
v.skip = true
return
}
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed when this function returns.
func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) error {
s.codecs.Add(remote)
defer s.codecs.Remove(remote)
responder := &callResponder{
// add a cancel to the context so we can cancel all the child tasks on return
ctx, cn := context.WithCancel(ContextWithPeerInfo(pctx, remote.PeerInfo()))
errch := make(chan error)
go func() {
for {
// read messages from the stream synchronously
incoming, batch, err := remote.ReadBatch(ctx)
if err != nil {
errch <- err
return
}
// process each in a goroutine
go func() {
// the only reason this should error is if
err = s.serveBatch(ctx, incoming, batch, remote, responder)
if err != nil {
errch <- err
return
}
}()
}()
// exit on either the first error, or the context closing.
select {
case <-ctx.Done():
return nil
case err := <-errch:
// perform a flush on error just in case there are dangling things to be sent, states to be cleaned up, etc.
// the connection is already dead, so at this point there are no rules, so this is okay to do i think
remote.Flush()
return err
// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
// requests to finish, then closes all codecs which will cancel pending requests and
// subscriptions.
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
type notifyEnv struct {
method string
dat any
extra []codec.RequestField
}
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error {
enc.ResetWriter(c.remote)
defer jx.PutEncoder(enc)
//enc := jx.NewStreamingEncoder(c.remote, 4096)
msg := &codec.Message{}
var err error
// allocate a temp buffer for this packet
msg.ExtraFields = env.extra
// add the method
msg.Method = env.method
err = codec.MarshalMessage(msg, enc)
}
type callEnv struct {
responses []*callRespWriter
batch bool
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
// if all msgs in batch are notification, we trigger an allSkip and write nothing
if env.batch {
allSkip := true
for _, v := range env.responses {
if v.skip != true {
allSkip = false
}
}
if allSkip {
return nil
}
}
if env.batch {
enc.ArrStart()
}
for _, v := range env.responses {
msg := v.pkt
// if we are a batch AND we are supposed to skip, then continue
// this means that for a non-batch notification, we do not skip! this is to ensure we get always a "response" for http-like endpoints
// if there is no error, we try to marshal the result
if msg.Error == nil {
buf := bufpool.GetStd()
defer bufpool.PutStd(buf)
je := json.NewEncoder(buf)
err = je.EncodeWithOption(v.dat)
msg.Error = err
} else {
msg.Result = buf.Bytes()
msg.Result = bytes.TrimSuffix(msg.Result, []byte{'\n'})
}
// then marshal the whole message into the stream
err := codec.MarshalMessage(msg, enc)
if err != nil {
return err
}