good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit c3863fbf authored by a's avatar a
Browse files

add semaphore

parent 9e13bfe7
Branches
Tags
No related merge requests found
Pipeline #32905 failed
...@@ -3,31 +3,46 @@ package jsonrpc ...@@ -3,31 +3,46 @@ package jsonrpc
import ( import (
"encoding/json" "encoding/json"
"io" "io"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
) )
// MessageStream is a writer used to write jsonrpc message to a stream // MessageStream is a writer used to write jsonrpc message to a stream
type MessageStream struct { type MessageStream struct {
w io.Writer w io.Writer
mu *semaphore.Weighted
} }
func NewStream(w io.Writer) *MessageStream { func NewStream(w io.Writer) *MessageStream {
return &MessageStream{ return &MessageStream{
w: w, w: w,
mu: semaphore.NewWeighted(1),
} }
} }
func (m *MessageStream) NewMessage() (*MessageWriter, error) { // NewMessage starts a new message and acquires the write lock.
_, err := m.w.Write([]byte(`{"jsonrpc":"2.0"`)) // to free the write lock, you must call *MessageWriter.Close()
// the lock MUST be closed if and only if err != nil
func (m *MessageStream) NewMessage(ctx context.Context) (*MessageWriter, error) {
err := m.mu.Acquire(ctx, 1)
if err != nil {
return nil, err
}
_, err = m.w.Write([]byte(`{"jsonrpc":"2.0"`))
if err != nil { if err != nil {
m.mu.Release(1)
return nil, err return nil, err
} }
return &MessageWriter{ return &MessageWriter{
w: m.w, w: m.w,
mu: m.mu,
}, nil }, nil
} }
type MessageWriter struct { type MessageWriter struct {
w io.Writer w io.Writer
mu *semaphore.Weighted
} }
func (m *MessageWriter) Field(name string, value json.RawMessage) error { func (m *MessageWriter) Field(name string, value json.RawMessage) error {
...@@ -51,6 +66,8 @@ func (m *MessageWriter) Result() (io.Writer, error) { ...@@ -51,6 +66,8 @@ func (m *MessageWriter) Result() (io.Writer, error) {
return &ResultWriter{w: m.w}, nil return &ResultWriter{w: m.w}, nil
} }
// close must be called when you are done writing the message.
// it releases the write lock
func (m *MessageWriter) Close() error { func (m *MessageWriter) Close() error {
_, err := m.w.Write([]byte("}")) _, err := m.w.Write([]byte("}"))
return err return err
......
...@@ -293,7 +293,7 @@ type callEnv struct { ...@@ -293,7 +293,7 @@ type callEnv struct {
func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) { func (c *callResponder) send(ctx context.Context, env *callEnv) (err error) {
w := c.remote w := c.remote
s, err := jsonrpc.NewStream(w).NewMessage() s, err := jsonrpc.NewStream(w).NewMessage(ctx)
if err != nil { if err != nil {
return err return err
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment