diff --git a/contrib/subscription/engine.go b/contrib/subscription/engine.go index f2b2cff6a5fb8e4d73dc0a70bc2e4a7c62ddb974..0843b166d42d2231c5fb09a34032a763aaa36486 100644 --- a/contrib/subscription/engine.go +++ b/contrib/subscription/engine.go @@ -1,5 +1,53 @@ package subscription +import ( + "context" + "strings" + "sync" + + "gfx.cafe/open/jrpc" + "gfx.cafe/open/jrpc/pkg/codec" +) + +const MethodSubscribeSuffix = "_subscribe" +const MethodUnsusbcribeSuffix = "_unsubscribe" + +type Engine struct { + subscriptions map[SubID]*Notifier + mu sync.Mutex + idgen func() SubID +} + +func NewEngine() *Engine { + return &Engine{ + subscriptions: make(map[SubID]*Notifier), + idgen: randomIDGenerator(), + } +} + +func (e *Engine) Middleware() func(jrpc.Handler) jrpc.Handler { + return func(h jrpc.Handler) jrpc.Handler { + return codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) { + // its a subscription, so install a notification handler + if strings.HasSuffix(r.Method, MethodSubscribeSuffix) { + // create the notifier to inject into the context + n := &Notifier{ + h: w, + namespace: strings.TrimSuffix(r.Method, MethodSubscribeSuffix), + id: e.idgen(), + } + // get the subscription object + sub := n.createSubscription() + // now send the subscription id back + w.Send(sub, nil) + // then inject the notifier + r = r.WithContext(context.WithValue(r.Context(), notifierKey{}, n)) + } + h.ServeRPC(w, r) + }) + } +} + //// handleSubscribe processes *_subscribe method calls. //func (h *handler) handleSubscribe(cp *callProc, r *Request) *Response { // mw := NewReaderResponseWriterMsg(r.WithContext(cp.ctx)) diff --git a/contrib/subscription/subscription.go b/contrib/subscription/subscription.go index a3b2b66bacade40e967287503873309b2d39580b..d3c9748352a32edb21f7a4f471c8528246cbec4e 100644 --- a/contrib/subscription/subscription.go +++ b/contrib/subscription/subscription.go @@ -1,14 +1,15 @@ package subscription import ( - "container/list" "context" + "encoding/binary" "encoding/hex" "errors" - "reflect" "strings" "sync" + "sync/atomic" + "gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/util/go/frand" json "github.com/goccy/go-json" @@ -30,6 +31,7 @@ var ( ErrSubscriptionNotFound = errors.New("subscription not found") ) +var globalInc = atomic.Int64{} var globalGen = randomIDGenerator() type SubID string @@ -42,8 +44,9 @@ func NewID() SubID { // randomIDGenerator returns a function generates a random IDs. func randomIDGenerator() func() SubID { return func() SubID { - id := make([]byte, 16) + id := make([]byte, 32) frand.Read(id) + id = binary.LittleEndian.AppendUint64(id, uint64(globalInc.Add(1))) return encodeSubID(id) } } @@ -73,32 +76,22 @@ func NotifierFromContext(ctx context.Context) (*Notifier, bool) { // Notifier is tied to a RPC connection that supports subscriptions. // Server callbacks use the notifier to send notifications. type Notifier struct { - h *handler + h codec.ResponseWriter namespace string - mu sync.Mutex - sub *Subscription - buffer []json.RawMessage - callReturned bool - activated bool + mu sync.Mutex + sub *Subscription - idgen func() SubID + id SubID } // CreateSubscription returns a new subscription that is coupled to the // RPC connection. By default subscriptions are inactive and notifications // are dropped until the subscription is marked as active. This is done // by the RPC server after the subscription ID is send to the client. -func (n *Notifier) CreateSubscription() *Subscription { - n.mu.Lock() - defer n.mu.Unlock() - if n.sub != nil { - panic("can't create multiple subscriptions with Notifier") - } else if n.callReturned { - panic("can't create subscription after subscribe call has returned") - } +func (n *Notifier) createSubscription() *Subscription { n.sub = &Subscription{ - ID: n.idgen(), + ID: n.id, namespace: n.namespace, err: make(chan error, 1), } @@ -107,63 +100,19 @@ func (n *Notifier) CreateSubscription() *Subscription { // Notify sends a notification to the client with the given data as payload. // If an error occurs the RPC connection is closed and the error is returned. -func (n *Notifier) Notify(id SubID, data interface{}) error { +func (n *Notifier) Notify(data interface{}) error { enc, err := json.Marshal(data) if err != nil { return err } n.mu.Lock() defer n.mu.Unlock() - if n.sub == nil { - panic("can't Notify before subscription is created") - } else if n.sub.ID != id { - panic("Notify with wrong ID") - } - if n.activated { - return n.send(n.sub, enc) - } - n.buffer = append(n.buffer, enc) - return nil -} - -// Closed returns a channel that is closed when the RPC connection is closed. -// Deprecated: use subscription error channel -func (n *Notifier) Closed() <-chan interface{} { - return n.h.conn.Closed() -} - -// takeSubscription returns the subscription (if one has been created). No subscription can -// be created after this call. -func (n *Notifier) takeSubscription() *Subscription { - n.mu.Lock() - defer n.mu.Unlock() - n.callReturned = true - return n.sub -} - -// activate is called after the subscription ID was sent to client. Notifications are -// buffered before activation. This prevents notifications being sent to the client before -// the subscription ID is sent to the client. -func (n *Notifier) activate() error { - n.mu.Lock() - defer n.mu.Unlock() - - for _, data := range n.buffer { - if err := n.send(n.sub, data); err != nil { - return err - } - } - n.activated = true - return nil + return n.send(n.sub, enc) } func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) - ctx := context.Background() - return n.h.conn.WriteJSON(ctx, &jsonrpcMessage{ - Method: n.namespace + notificationMethodSuffix, - Params: params, - }) + return n.h.Notify(n.namespace+notificationMethodSuffix, json.RawMessage(params)) } // A Subscription is created by a notifier and tied to that notifier. The client can use @@ -183,132 +132,3 @@ func (s *Subscription) Err() <-chan error { func (s *Subscription) MarshalJSON() ([]byte, error) { return json.Marshal(s.ID) } - -// ClientSubscription is a subscription established through the Client's Subscribe or -// EthSubscribe methods. -type ClientSubscription struct { - client *Client - etype reflect.Type - channel reflect.Value - namespace string - subid string - in chan json.RawMessage - - quitOnce sync.Once // ensures quit is closed once - quit chan struct{} // quit is closed when the subscription exits - errOnce sync.Once // ensures err is closed once - err chan error -} - -func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { - sub := &ClientSubscription{ - client: c, - namespace: namespace, - etype: channel.Type().Elem(), - channel: channel, - quit: make(chan struct{}), - err: make(chan error, 1), - in: make(chan json.RawMessage), - } - return sub -} - -// Err returns the subscription error channel. The intended use of Err is to schedule -// resubscription when the client connection is closed unexpectedly. -// -// The error channel receives a value when the subscription has ended due -// to an error. The received error is nil if Close has been called -// on the underlying client and no other error has occurred. -// -// The error channel is closed when Unsubscribe is called on the subscription. -func (sub *ClientSubscription) Err() <-chan error { - return sub.err -} - -// Unsubscribe unsubscribes the notification and closes the error channel. -// It can safely be called more than once. -func (sub *ClientSubscription) Unsubscribe() { - sub.quitWithError(true, nil) - sub.errOnce.Do(func() { close(sub.err) }) -} - -func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { - sub.quitOnce.Do(func() { - // The dispatch loop won't be able to execute the unsubscribe call - // if it is blocked on deliver. Close sub.quit first because it - // unblocks deliver. - close(sub.quit) - if unsubscribeServer { - sub.requestUnsubscribe() - } - if err != nil { - if err == ErrClientQuit { - err = nil // Adhere to subscription semantics. - } - sub.err <- err - } - }) -} - -func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { - select { - case sub.in <- result: - return true - case <-sub.quit: - return false - } -} - -func (sub *ClientSubscription) start() { - sub.quitWithError(sub.forward()) -} - -func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { - cases := []reflect.SelectCase{ - {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, - {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, - {Dir: reflect.SelectSend, Chan: sub.channel}, - } - buffer := list.New() - defer buffer.Init() - for { - var chosen int - var recv reflect.Value - if buffer.Len() == 0 { - // Idle, omit send case. - chosen, recv, _ = reflect.Select(cases[:2]) - } else { - // Non-empty buffer, send the first queued item. - cases[2].Send = reflect.ValueOf(buffer.Front().Value) - chosen, recv, _ = reflect.Select(cases) - } - - switch chosen { - case 0: // <-sub.quit - return false, nil - case 1: // <-sub.in - val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) - if err != nil { - return true, err - } - if buffer.Len() == maxClientSubscriptionBuffer { - return true, ErrSubscriptionQueueOverflow - } - buffer.PushBack(val) - case 2: // sub.channel<- - cases[2].Send = reflect.Value{} // Don't hold onto the value. - buffer.Remove(buffer.Front()) - } - } -} - -func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { - val := reflect.New(sub.etype) - err := json.Unmarshal(result, val.Interface()) - return val.Elem().Interface(), err -} - -func (sub *ClientSubscription) requestUnsubscribe() error { - var result interface{} - return sub.client.Call(context.Background(), &result, sub.namespace+unsubscribeMethodSuffix, sub.subid) -} diff --git a/pkg/codec/batch.go b/pkg/codec/batch.go new file mode 100644 index 0000000000000000000000000000000000000000..5fb0b3bee338726264b46ed29ac7db609f367849 --- /dev/null +++ b/pkg/codec/batch.go @@ -0,0 +1,17 @@ +package codec + +// BatchElem is an element in a batch request. +type BatchElem struct { + Method string + Params any + + IsNotification bool + + // The result is unmarshaled into this field. Result must be set to a + // non-nil pointer value of the desired type, otherwise the response will be + // discarded. + Result any + // Error is set if the server returns an error for this request, or if + // unmarshaling into Result fails. It is not set for I/O errors. + Error error +} diff --git a/pkg/codec/context.go b/pkg/codec/context.go new file mode 100644 index 0000000000000000000000000000000000000000..20a1c1a6f17ff6bf5547ce7725f0c6568c7a61a5 --- /dev/null +++ b/pkg/codec/context.go @@ -0,0 +1,24 @@ +package codec + +import "context" + +type clientContextKey struct{} + +// ClientFromContext retrieves the client from the context, if any. This can be used to perform +// 'reverse calls' in a handler method. +func ContextWithConn(ctx context.Context, c Conn) context.Context { + client, _ := ctx.Value(clientContextKey{}).(Conn) + return context.WithValue(ctx, clientContextKey{}, client) +} + +// ClientFromContext retrieves the client from the context, if any. This can be used to perform +// 'reverse calls' in a handler method. +func ConnFromContext(ctx context.Context) (Conn, bool) { + client, ok := ctx.Value(clientContextKey{}).(Conn) + return client, ok +} + +func StreamingConnFromContext(ctx context.Context) (StreamingConn, bool) { + client, ok := ctx.Value(clientContextKey{}).(StreamingConn) + return client, ok +} diff --git a/pkg/codec/handler.go b/pkg/codec/handler.go new file mode 100644 index 0000000000000000000000000000000000000000..cb518199a9b07160ec1896e69136410b701b20d1 --- /dev/null +++ b/pkg/codec/handler.go @@ -0,0 +1,13 @@ +package codec + +// http.handler, but for jrpc +type Handler interface { + ServeRPC(w ResponseWriter, r *Request) +} + +// http.HandlerFunc,but for jrpc +type HandlerFunc func(w ResponseWriter, r *Request) + +func (fn HandlerFunc) ServeRPC(w ResponseWriter, r *Request) { + (fn)(w, r) +} diff --git a/pkg/codec/jrpc.go b/pkg/codec/jrpc.go index da3f71db92d3f7550974a7145436709ed3195ba2..3dab02e915139db4ade07f7f35d8645dc4f6f7de 100644 --- a/pkg/codec/jrpc.go +++ b/pkg/codec/jrpc.go @@ -2,75 +2,28 @@ package codec import ( "context" - "net/http" + "io" ) -// http.handler, but for jrpc -type Handler interface { - ServeRPC(w ResponseWriter, r *Request) +type Doer interface { + Do(ctx context.Context, result any, method string, params any) error } -// http.HandlerFunc,but for jrpc -type HandlerFunc func(w ResponseWriter, r *Request) - -func (fn HandlerFunc) ServeRPC(w ResponseWriter, r *Request) { - (fn)(w, r) +type BatchCaller interface { + BatchCall(ctx context.Context, b ...*BatchElem) error } -// http.ResponseWriter interface, but for jrpc -type ResponseWriter interface { - Send(v any, err error) error - Option(k string, v any) - Header() http.Header - - Notify(method string, v any) error +type Notifier interface { + Notify(ctx context.Context, method string, params any) error } type Conn interface { - Do(ctx context.Context, result any, method string, params any) error - Notify(ctx context.Context, method string, params any) error - BatchCall(ctx context.Context, b ...*BatchElem) error - Close() error + Doer + BatchCaller + io.Closer } type StreamingConn interface { Conn - Handler -} - -// BatchElem is an element in a batch request. -type BatchElem struct { - Method string - Params any - - IsNotification bool - - // The result is unmarshaled into this field. Result must be set to a - // non-nil pointer value of the desired type, otherwise the response will be - // discarded. - Result any - // Error is set if the server returns an error for this request, or if - // unmarshaling into Result fails. It is not set for I/O errors. - Error error -} - -type clientContextKey struct{} - -// ClientFromContext retrieves the client from the context, if any. This can be used to perform -// 'reverse calls' in a handler method. -func ContextWithConn(ctx context.Context, c Conn) context.Context { - client, _ := ctx.Value(clientContextKey{}).(Conn) - return context.WithValue(ctx, clientContextKey{}, client) -} - -// ClientFromContext retrieves the client from the context, if any. This can be used to perform -// 'reverse calls' in a handler method. -func ConnFromContext(ctx context.Context) (Conn, bool) { - client, ok := ctx.Value(clientContextKey{}).(Conn) - return client, ok -} - -func StreamingConnFromContext(ctx context.Context) (StreamingConn, bool) { - client, ok := ctx.Value(clientContextKey{}).(StreamingConn) - return client, ok + Notifier } diff --git a/pkg/codec/reqresp.go b/pkg/codec/reqresp.go index 56928668c1ec82ee1e161c5a51a475ae0b2147c8..eba76c8ba8844f09768fe6029bef213af895cab0 100644 --- a/pkg/codec/reqresp.go +++ b/pkg/codec/reqresp.go @@ -2,10 +2,20 @@ package codec import ( "context" + "net/http" json "github.com/goccy/go-json" ) +// http.ResponseWriter interface, but for jrpc +type ResponseWriter interface { + Send(v any, err error) error + Option(k string, v any) + Header() http.Header + + Notify(method string, v any) error +} + type Response struct { Version Version `json:"jsonrpc,omitempty"` ID *ID `json:"id,omitempty"`