Newer
Older
return
}
if s.Tracing.ErrorLogger != nil {
s.Tracing.ErrorLogger(remote, err)
func (s *Server) codecLoop(ctx context.Context, remote codec.ReaderWriter, responder *callResponder) error {
if err != nil {
remote.Flush()
s.printError(remote, err)
return err
}
env := &callEnv{
batch: batch,
}
// if it is empty batch, send the empty batch warning
responder.toSend <- &callEnv{
responses: []*callRespWriter{{
pkt: &codec.Message{
ID: codec.NewNullIDPtr(),
Error: codec.NewInvalidRequestError("empty batch"),
},
pkt: &codec.Message{
ID: codec.NewNullIDPtr(),
},
msg: &codec.Message{
ID: codec.NewNullIDPtr(),
},
notifications: responder.toNotify,
header: remote.PeerInfo().HTTP.Headers,
}
if v.msg.ID == nil || v.msg.ID.IsNull() {
// it's a notification, so we mark skip and we don't write anything for it
v.skip = true
wg.Done()
continue
}
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed.
//
// The codec is registered with the server for the duration of the call so that
// Stop can close it, and a callResponder is created whose run loop is meant to
// execute in a background goroutine under a context derived from pctx.
//
// NOTE(review): the body visible here looks incomplete — the goroutine literal
// is never invoked, and nothing between the goroutine and the second error
// check produces a new err. Confirm against the full file before relying on
// this description.
func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) {
// Don't serve if server is stopped.
// s.run is read atomically; Stop flips it from 1 to 0.
if atomic.LoadInt32(&s.run) == 0 {
return
}
// Add the codec to the set so it can be closed by Stop.
s.codecs.Add(remote)
defer s.codecs.Remove(remote)
// Both responder queues are buffered with a capacity of 8.
responder := &callResponder{
toSend: make(chan *callEnv, 8),
toNotify: make(chan *notifyEnv, 8),
remote: remote,
}
// ctx is cancelled when ServeCodec returns and also when the responder
// goroutine body finishes (both defer cn).
ctx, cn := context.WithCancel(pctx)
defer cn()
go func() {
defer cn()
// A non-nil error from the responder loop is reported via printError.
err := responder.run(ctx)
if err != nil {
s.printError(remote, err)
}
if err != nil {
s.printError(remote, err)
return
}
}
}
// Stop atomically flips the server's run flag from 1 (running) to 0 (stopped).
// Only the call that actually performs the flip goes on to close every codec
// currently registered in s.codecs; any other call returns immediately.
//
// NOTE(review): no wait on stopPendingRequestTimeout is visible in this body;
// codecs are closed right away. Confirm whether a grace period is expected.
func (s *Server) Stop() {
	if !atomic.CompareAndSwapInt32(&s.run, 1, 0) {
		// The server was not running, so there is nothing to close.
		return
	}

	// The callback always returns true; presumably that tells Each to keep
	// iterating over the remaining codecs.
	closeCodec := func(c any) bool {
		c.(codec.ReaderWriter).Close()
		return true
	}
	s.codecs.Each(closeCodec)
}
// callResponder holds the two queues drained by its run loop: call envelopes
// that are handed to send, and notification envelopes that are handed to notify.
//
// NOTE(review): other code in this file sets and reads a `remote` field on this
// type that is not declared here — confirm against the full definition.
type callResponder struct {
// toSend queues call envelopes; run passes each one to send.
toSend chan *callEnv
// toNotify queues notification envelopes; run passes each one to notify.
toNotify chan *notifyEnv
}
func (c *callResponder) run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case env := <-c.toSend:
err := c.send(ctx, env)
if err != nil {
return err
}
case env := <-c.toNotify:
err := c.notify(ctx, env)
if err != nil {
return err
}
}
// notifyEnv describes a single notification queued on callResponder.toNotify.
type notifyEnv struct {
// method is copied into the outgoing message's Method field by notify.
method string
// dat is the notification payload.
// NOTE(review): presumably encoded as the message parameters, but notify as
// visible in this file never reads it — confirm.
dat any
// extra is copied into the outgoing message's ExtraFields by notify.
extra []codec.RequestField
}
// notify writes one notification message described by env to c.remote.
//
// It borrows a pooled jx encoder (returned to the pool on exit), makes sure the
// encoder's buffer has a capacity of at least 4096 bytes, points it at
// c.remote, and marshals a codec.Message carrying env.method and env.extra.
//
// It returns the error from codec.MarshalMessage if marshalling fails, in which
// case the encoder is not closed; otherwise it returns the result of enc.Close.
// ctx is accepted for signature parity with send and is not consulted here.
//
// NOTE(review): env.dat is not written into the message in this function —
// confirm whether payload encoding is expected to happen here.
func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error {
	enc := jx.GetEncoder()
	if cap(enc.Bytes()) < 4096 {
		enc.SetBytes(make([]byte, 0, 4096))
	}
	enc.ResetWriter(c.remote)
	defer jx.PutEncoder(enc)

	msg := &codec.Message{
		ExtraFields: env.extra,
		Method:      env.method,
	}

	// Surface marshalling failures instead of dropping them; previously the
	// error was assigned and never inspected.
	if err := codec.MarshalMessage(msg, enc); err != nil {
		return err
	}
	return enc.Close()
}
type callEnv struct {
responses []*callRespWriter
batch bool
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
// if all msgs in batch are notification, we trigger an allSkip and write nothing
if env.batch {
allSkip := true
for _, v := range env.responses {
if v.skip != true {
allSkip = false
}
}
if allSkip {
return nil
}
}
enc := jx.GetEncoder()
if cap(enc.Bytes()) < 4096 {
enc.SetBytes(make([]byte, 0, 4096))
}
enc.ResetWriter(c.remote)
defer jx.PutEncoder(enc)
if env.batch {
enc.ArrStart()
}
for _, v := range env.responses {
msg := v.pkt
// if we are a batch AND we are supposed to skip, then continue
// this means that for a non-batch notification, we do not skip!
// if there is no error, we try to marshal the result
if msg.Error == nil {
buf := bufpool.GetStd()
defer bufpool.PutStd(buf)
je := json.NewEncoder(buf)
err = je.EncodeWithOption(v.dat)
msg.Error = err
} else {
msg.Result = buf.Bytes()
msg.Result = bytes.TrimSuffix(msg.Result, []byte{'\n'})
}
// then marshal the whole message into the stream
err := codec.MarshalMessage(msg, enc)
if err != nil {
return err
}