diff --git a/pkg/server/server.go b/pkg/server/server.go index 737af864ee92ec48d4fe0683946152aa56c080b5..6b83d31d46138577235c71a9fa959ff134cb9f1f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -165,6 +165,7 @@ func (s *Server) Stop() { type callResponder struct { remote codec.ReaderWriter + mu sync.Mutex } type notifyEnv struct { @@ -174,6 +175,9 @@ type notifyEnv struct { } func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { + c.mu.Lock() + defer c.mu.Unlock() + defer c.remote.Flush() enc := jx.GetEncoder() enc.Grow(4096) enc.ResetWriter(c.remote) @@ -197,7 +201,11 @@ func (c *callResponder) notify(ctx context.Context, env *notifyEnv) error { if err != nil { return err } - return enc.Close() + err = enc.Close() + if err != nil { + return err + } + return nil } type callEnv struct { @@ -206,6 +214,9 @@ type callEnv struct { } func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { + c.mu.Lock() + defer c.mu.Unlock() + defer c.remote.Flush() // notification gets nothing // if all msgs in batch are notification, we trigger an allSkip and write nothing if env.batch {