good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit ecae21c9 authored by a's avatar a
Browse files

subscription engine

parent 11aa22d7
No related branches found
No related tags found
1 merge request!15Draft: V2
package subscription
import (
"context"
"strings"
"sync"
"gfx.cafe/open/jrpc"
"gfx.cafe/open/jrpc/pkg/codec"
)
const MethodSubscribeSuffix = "_subscribe"
const MethodUnsusbcribeSuffix = "_unsubscribe"
type Engine struct {
subscriptions map[SubID]*Notifier
mu sync.Mutex
idgen func() SubID
}
func NewEngine() *Engine {
return &Engine{
subscriptions: make(map[SubID]*Notifier),
idgen: randomIDGenerator(),
}
}
func (e *Engine) Middleware() func(jrpc.Handler) jrpc.Handler {
return func(h jrpc.Handler) jrpc.Handler {
return codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// its a subscription, so install a notification handler
if strings.HasSuffix(r.Method, MethodSubscribeSuffix) {
// create the notifier to inject into the context
n := &Notifier{
h: w,
namespace: strings.TrimSuffix(r.Method, MethodSubscribeSuffix),
id: e.idgen(),
}
// get the subscription object
sub := n.createSubscription()
// now send the subscription id back
w.Send(sub, nil)
// then inject the notifier
r = r.WithContext(context.WithValue(r.Context(), notifierKey{}, n))
}
h.ServeRPC(w, r)
})
}
}
//// handleSubscribe processes *_subscribe method calls.
//func (h *handler) handleSubscribe(cp *callProc, r *Request) *Response {
// mw := NewReaderResponseWriterMsg(r.WithContext(cp.ctx))
......
package subscription
import (
"container/list"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"reflect"
"strings"
"sync"
"sync/atomic"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/util/go/frand"
json "github.com/goccy/go-json"
......@@ -30,6 +31,7 @@ var (
ErrSubscriptionNotFound = errors.New("subscription not found")
)
var globalInc = atomic.Int64{}
var globalGen = randomIDGenerator()
type SubID string
......@@ -42,8 +44,9 @@ func NewID() SubID {
// randomIDGenerator returns a function generates a random IDs.
func randomIDGenerator() func() SubID {
return func() SubID {
id := make([]byte, 16)
id := make([]byte, 32)
frand.Read(id)
id = binary.LittleEndian.AppendUint64(id, uint64(globalInc.Add(1)))
return encodeSubID(id)
}
}
......@@ -73,32 +76,22 @@ func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
// Notifier is tied to a RPC connection that supports subscriptions.
// Server callbacks use the notifier to send notifications.
type Notifier struct {
h *handler
h codec.ResponseWriter
namespace string
mu sync.Mutex
sub *Subscription
buffer []json.RawMessage
callReturned bool
activated bool
idgen func() SubID
id SubID
}
// CreateSubscription returns a new subscription that is coupled to the
// RPC connection. By default subscriptions are inactive and notifications
// are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client.
func (n *Notifier) CreateSubscription() *Subscription {
n.mu.Lock()
defer n.mu.Unlock()
if n.sub != nil {
panic("can't create multiple subscriptions with Notifier")
} else if n.callReturned {
panic("can't create subscription after subscribe call has returned")
}
func (n *Notifier) createSubscription() *Subscription {
n.sub = &Subscription{
ID: n.idgen(),
ID: n.id,
namespace: n.namespace,
err: make(chan error, 1),
}
......@@ -107,63 +100,19 @@ func (n *Notifier) CreateSubscription() *Subscription {
// Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id SubID, data interface{}) error {
func (n *Notifier) Notify(data interface{}) error {
enc, err := json.Marshal(data)
if err != nil {
return err
}
n.mu.Lock()
defer n.mu.Unlock()
if n.sub == nil {
panic("can't Notify before subscription is created")
} else if n.sub.ID != id {
panic("Notify with wrong ID")
}
if n.activated {
return n.send(n.sub, enc)
}
n.buffer = append(n.buffer, enc)
return nil
}
// Closed returns a channel that is closed when the RPC connection is closed.
// Deprecated: use subscription error channel
func (n *Notifier) Closed() <-chan interface{} {
return n.h.conn.Closed()
}
// takeSubscription returns the subscription (if one has been created). No subscription can
// be created after this call.
func (n *Notifier) takeSubscription() *Subscription {
n.mu.Lock()
defer n.mu.Unlock()
n.callReturned = true
return n.sub
}
// activate is called after the subscription ID was sent to client. Notifications are
// buffered before activation. This prevents notifications being sent to the client before
// the subscription ID is sent to the client.
func (n *Notifier) activate() error {
n.mu.Lock()
defer n.mu.Unlock()
for _, data := range n.buffer {
if err := n.send(n.sub, data); err != nil {
return err
}
}
n.activated = true
return nil
}
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
ctx := context.Background()
return n.h.conn.WriteJSON(ctx, &jsonrpcMessage{
Method: n.namespace + notificationMethodSuffix,
Params: params,
})
return n.h.Notify(n.namespace+notificationMethodSuffix, json.RawMessage(params))
}
// A Subscription is created by a notifier and tied to that notifier. The client can use
......@@ -183,132 +132,3 @@ func (s *Subscription) Err() <-chan error {
func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.ID)
}
// ClientSubscription is a subscription established through the Client's Subscribe or
// EthSubscribe methods.
type ClientSubscription struct {
client *Client
etype reflect.Type
channel reflect.Value
namespace string
subid string
in chan json.RawMessage
quitOnce sync.Once // ensures quit is closed once
quit chan struct{} // quit is closed when the subscription exits
errOnce sync.Once // ensures err is closed once
err chan error
}
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{
client: c,
namespace: namespace,
etype: channel.Type().Elem(),
channel: channel,
quit: make(chan struct{}),
err: make(chan error, 1),
in: make(chan json.RawMessage),
}
return sub
}
// Err returns the subscription error channel. The intended use of Err is to schedule
// resubscription when the client connection is closed unexpectedly.
//
// The error channel receives a value when the subscription has ended due
// to an error. The received error is nil if Close has been called
// on the underlying client and no other error has occurred.
//
// The error channel is closed when Unsubscribe is called on the subscription.
func (sub *ClientSubscription) Err() <-chan error {
return sub.err
}
// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() {
sub.quitWithError(true, nil)
sub.errOnce.Do(func() { close(sub.err) })
}
func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) {
sub.quitOnce.Do(func() {
// The dispatch loop won't be able to execute the unsubscribe call
// if it is blocked on deliver. Close sub.quit first because it
// unblocks deliver.
close(sub.quit)
if unsubscribeServer {
sub.requestUnsubscribe()
}
if err != nil {
if err == ErrClientQuit {
err = nil // Adhere to subscription semantics.
}
sub.err <- err
}
})
}
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
select {
case sub.in <- result:
return true
case <-sub.quit:
return false
}
}
func (sub *ClientSubscription) start() {
sub.quitWithError(sub.forward())
}
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
{Dir: reflect.SelectSend, Chan: sub.channel},
}
buffer := list.New()
defer buffer.Init()
for {
var chosen int
var recv reflect.Value
if buffer.Len() == 0 {
// Idle, omit send case.
chosen, recv, _ = reflect.Select(cases[:2])
} else {
// Non-empty buffer, send the first queued item.
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(cases)
}
switch chosen {
case 0: // <-sub.quit
return false, nil
case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
if err != nil {
return true, err
}
if buffer.Len() == maxClientSubscriptionBuffer {
return true, ErrSubscriptionQueueOverflow
}
buffer.PushBack(val)
case 2: // sub.channel<-
cases[2].Send = reflect.Value{} // Don't hold onto the value.
buffer.Remove(buffer.Front())
}
}
}
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
val := reflect.New(sub.etype)
err := json.Unmarshal(result, val.Interface())
return val.Elem().Interface(), err
}
func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{}
return sub.client.Call(context.Background(), &result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
}
package codec
// BatchElem is an element in a batch request.
type BatchElem struct {
Method string
Params any
IsNotification bool
// The result is unmarshaled into this field. Result must be set to a
// non-nil pointer value of the desired type, otherwise the response will be
// discarded.
Result any
// Error is set if the server returns an error for this request, or if
// unmarshaling into Result fails. It is not set for I/O errors.
Error error
}
package codec
import "context"
type clientContextKey struct{}
// ClientFromContext retrieves the client from the context, if any. This can be used to perform
// 'reverse calls' in a handler method.
func ContextWithConn(ctx context.Context, c Conn) context.Context {
client, _ := ctx.Value(clientContextKey{}).(Conn)
return context.WithValue(ctx, clientContextKey{}, client)
}
// ClientFromContext retrieves the client from the context, if any. This can be used to perform
// 'reverse calls' in a handler method.
func ConnFromContext(ctx context.Context) (Conn, bool) {
client, ok := ctx.Value(clientContextKey{}).(Conn)
return client, ok
}
func StreamingConnFromContext(ctx context.Context) (StreamingConn, bool) {
client, ok := ctx.Value(clientContextKey{}).(StreamingConn)
return client, ok
}
package codec
// http.handler, but for jrpc
type Handler interface {
ServeRPC(w ResponseWriter, r *Request)
}
// http.HandlerFunc,but for jrpc
type HandlerFunc func(w ResponseWriter, r *Request)
func (fn HandlerFunc) ServeRPC(w ResponseWriter, r *Request) {
(fn)(w, r)
}
......@@ -2,75 +2,28 @@ package codec
import (
"context"
"net/http"
"io"
)
// http.handler, but for jrpc
type Handler interface {
ServeRPC(w ResponseWriter, r *Request)
type Doer interface {
Do(ctx context.Context, result any, method string, params any) error
}
// http.HandlerFunc,but for jrpc
type HandlerFunc func(w ResponseWriter, r *Request)
func (fn HandlerFunc) ServeRPC(w ResponseWriter, r *Request) {
(fn)(w, r)
type BatchCaller interface {
BatchCall(ctx context.Context, b ...*BatchElem) error
}
// http.ResponseWriter interface, but for jrpc
type ResponseWriter interface {
Send(v any, err error) error
Option(k string, v any)
Header() http.Header
Notify(method string, v any) error
type Notifier interface {
Notify(ctx context.Context, method string, params any) error
}
type Conn interface {
Do(ctx context.Context, result any, method string, params any) error
Notify(ctx context.Context, method string, params any) error
BatchCall(ctx context.Context, b ...*BatchElem) error
Close() error
Doer
BatchCaller
io.Closer
}
type StreamingConn interface {
Conn
Handler
}
// BatchElem is an element in a batch request.
type BatchElem struct {
Method string
Params any
IsNotification bool
// The result is unmarshaled into this field. Result must be set to a
// non-nil pointer value of the desired type, otherwise the response will be
// discarded.
Result any
// Error is set if the server returns an error for this request, or if
// unmarshaling into Result fails. It is not set for I/O errors.
Error error
}
type clientContextKey struct{}
// ClientFromContext retrieves the client from the context, if any. This can be used to perform
// 'reverse calls' in a handler method.
func ContextWithConn(ctx context.Context, c Conn) context.Context {
client, _ := ctx.Value(clientContextKey{}).(Conn)
return context.WithValue(ctx, clientContextKey{}, client)
}
// ClientFromContext retrieves the client from the context, if any. This can be used to perform
// 'reverse calls' in a handler method.
func ConnFromContext(ctx context.Context) (Conn, bool) {
client, ok := ctx.Value(clientContextKey{}).(Conn)
return client, ok
}
func StreamingConnFromContext(ctx context.Context) (StreamingConn, bool) {
client, ok := ctx.Value(clientContextKey{}).(StreamingConn)
return client, ok
Notifier
}
......@@ -2,10 +2,20 @@ package codec
import (
"context"
"net/http"
json "github.com/goccy/go-json"
)
// http.ResponseWriter interface, but for jrpc
type ResponseWriter interface {
Send(v any, err error) error
Option(k string, v any)
Header() http.Header
Notify(method string, v any) error
}
type Response struct {
Version Version `json:"jsonrpc,omitempty"`
ID *ID `json:"id,omitempty"`
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment