diff --git a/cmd/jrpcc/main.go b/cmd/jrpcc/main.go new file mode 100644 index 0000000000000000000000000000000000000000..4966aff55b61e9e4feec40dd294184d5fbd3aa2c --- /dev/null +++ b/cmd/jrpcc/main.go @@ -0,0 +1,118 @@ +package main + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "strings" + + "gfx.cafe/open/jrpc" + "sigs.k8s.io/yaml" +) + +func main() { + scan := bufio.NewReader(os.Stdin) + ctx := context.Background() + c := &cli{} + c.silent(`commands: + [m]ethod, [d]ial, [s]end, [p]arams, [c]lient, [q]uit + `) + for { + if err := c.tick(ctx, scan); err != nil { + if !errors.Is(err, context.Canceled) { + c.silent("%s", err) + } + break + } + } +} + +type cli struct { + remote string + method string + params json.RawMessage + verbose bool + + conn jrpc.Conn +} + +func (c *cli) silent(s string, args ...any) { + if c.verbose { + fmt.Printf(s+"\n", args...) + } +} + +func (c *cli) tick(ctx context.Context, scan *bufio.Reader) error { + cmd, err := scan.ReadString('\n') + if err != nil { + return err + } + cmd = strings.TrimSpace(cmd) + if cmd == "" { + return nil + } + splt := strings.SplitN(cmd, " ", 2) + if len(splt) == 0 { + return nil + } + switch splt[0] { + case "d", "dial": + if len(splt) == 1 { + c.silent("need to specify url to dial to") + return nil + } + c.conn, err = jrpc.DialContext(ctx, splt[1]) + if err != nil { + return fmt.Errorf("failed to dial: %w", err) + } + c.remote = splt[1] + c.silent("connected to %s", splt[1]) + case "m", "method": + if len(splt) == 1 { + fmt.Println("need to specify method to use") + return nil + } + c.method = splt[1] + c.silent("method to %s", splt[1]) + case "p", "params": + nextExit := false + bts := []byte(splt[1]) + for { + tmp, err := scan.ReadBytes('\n') + if err != nil { + return err + } + if len(tmp) == 1 { + if nextExit { + break + } + nextExit = true + continue + } + bts = append(bts, tmp...) + } + var m any + err = yaml.Unmarshal(bts, &m) + if err != nil { + return err + } + c.params, _ = json.Marshal(m) + case "c", "client": + fmt.Printf( + ` +remote: %s +method: %s +params: %s +`, c.remote, c.method, c.params) + case "s", "send": + var res json.RawMessage + c.conn.Do(ctx, &res, c.method, c.params) + fmt.Println(string(res)) + case "q", "quit", "exit": + os.Exit(0) + } + return nil +} diff --git a/cmd/jrpcc/test.script b/cmd/jrpcc/test.script new file mode 100644 index 0000000000000000000000000000000000000000..ca4ba178cad010e479c95dbbc989dc10f13c6da5 --- /dev/null +++ b/cmd/jrpcc/test.script @@ -0,0 +1,6 @@ +d wss://mainnet.rpc.gfx.xyz +m eth_getBlockByNumber +p [1500000, true] + + +send diff --git a/contrib/codecs/http/client.go b/contrib/codecs/http/client.go index ea0b846cf8684533a8699cd2c9efb3ec33657c77..d3fb14f5a427d856845b1c22a0a6c8c1b8be7f77 100644 --- a/contrib/codecs/http/client.go +++ b/contrib/codecs/http/client.go @@ -41,6 +41,13 @@ type Client struct { id atomic.Int64 headers http.Header + + handler codec.Handler +} + +func (c *Client) Mount(h codec.Handler) codec.Conn { + c.handler = h + return c } func DialHTTP(target string) (*Client, error) { diff --git a/contrib/codecs/inproc/client.go b/contrib/codecs/inproc/client.go index 43cdb0f65b67b9cfc14fb47921326c62afcdad45..63df52950e7485aacb1b70867c1eadfee0d45c91 100644 --- a/contrib/codecs/inproc/client.go +++ b/contrib/codecs/inproc/client.go @@ -17,6 +17,11 @@ type Client struct { handler codec.Handler } +func (c *Client) Mount(h codec.Handler) codec.Conn { + c.handler = h + return c +} + func NewClient(c *Codec, handler codec.Handler) *Client { cl := &Client{ p: clientutil.NewIdReply(), @@ -39,9 +44,9 @@ func (c *Client) listen() error { v := msgs[i] id := v.ID.Number() if id == 0 { - //if c.handler != nil { - // c.handler.ServeRPC(w, r) - //} + if c.handler != nil { + c.handler.ServeRPC(w, r) + } continue } var err error diff --git a/contrib/subscription/conn.go b/contrib/extension/subscription/conn.go similarity index 78% rename from contrib/subscription/conn.go rename to contrib/extension/subscription/conn.go index 3655f25502d18c013931f4aae395496ee87de685..d4decaa72e469f19cba67472d940488b16cc10d7 100644 --- a/contrib/subscription/conn.go +++ b/contrib/extension/subscription/conn.go @@ -2,11 +2,14 @@ package subscription import ( "context" + "gfx.cafe/open/jrpc/pkg/codec" ) -type SubscriptionConn interface { +type Conn interface { codec.StreamingConn - Subscribe(ctx context.Context, namespace string, channel any, args ...any) (*ClientSubscription, error) } + +type ClientSubscription struct { +} diff --git a/contrib/subscription/engine.go b/contrib/extension/subscription/engine.go similarity index 79% rename from contrib/subscription/engine.go rename to contrib/extension/subscription/engine.go index 0843b166d42d2231c5fb09a34032a763aaa36486..daab5d5476e4dd1d30def4a6893b4d85bc96a1ff 100644 --- a/contrib/subscription/engine.go +++ b/contrib/extension/subscription/engine.go @@ -9,9 +9,6 @@ import ( "gfx.cafe/open/jrpc/pkg/codec" ) -const MethodSubscribeSuffix = "_subscribe" -const MethodUnsusbcribeSuffix = "_unsubscribe" - type Engine struct { subscriptions map[SubID]*Notifier mu sync.Mutex @@ -25,25 +22,57 @@ func NewEngine() *Engine { } } +func (e *Engine) closeSub(subid SubID) (bool, error) { + e.mu.Lock() + defer e.mu.Unlock() + val, ok := e.subscriptions[subid] + if ok { + val.err <- ErrSubscriptionClosed + close(val.err) + delete(e.subscriptions, subid) + } + if !ok { + return ok, ErrSubscriptionNotFound + } + return ok, nil +} + func (e *Engine) Middleware() func(jrpc.Handler) jrpc.Handler { return func(h jrpc.Handler) jrpc.Handler { return codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) { // its a subscription, so install a notification handler - if strings.HasSuffix(r.Method, MethodSubscribeSuffix) { + switch { + case strings.HasSuffix(r.Method, subscribeMethodSuffix): // create the notifier to inject into the context n := &Notifier{ h: w, - namespace: strings.TrimSuffix(r.Method, MethodSubscribeSuffix), + namespace: strings.TrimSuffix(r.Method, subscribeMethodSuffix), id: e.idgen(), + err: make(chan error, 1), } // get the subscription object - sub := n.createSubscription() + // add to the map + e.mu.Lock() + e.subscriptions[n.id] = n + e.mu.Unlock() // now send the subscription id back - w.Send(sub, nil) + w.Send(n, nil) // then inject the notifier r = r.WithContext(context.WithValue(r.Context(), notifierKey{}, n)) + h.ServeRPC(w, r) + case strings.HasSuffix(r.Method, unsubscribeMethodSuffix): + // read the subscription id to close + var subid SubID + err := r.ParamArray(subid) + if err != nil { + w.Send(false, err) + return + } + // close that sub + w.Send(e.closeSub(subid)) + default: + h.ServeRPC(w, r) } - h.ServeRPC(w, r) }) } } diff --git a/contrib/subscription/subscription.go b/contrib/extension/subscription/subscription.go similarity index 65% rename from contrib/subscription/subscription.go rename to contrib/extension/subscription/subscription.go index d3c9748352a32edb21f7a4f471c8528246cbec4e..d7bba481ebf9afd86796b43953b9593df4e7f724 100644 --- a/contrib/subscription/subscription.go +++ b/contrib/extension/subscription/subscription.go @@ -29,6 +29,8 @@ var ( ErrNotificationsUnsupported = errors.New("notifications not supported") // ErrNotificationNotFound is returned when the notification for the given id is not found ErrSubscriptionNotFound = errors.New("subscription not found") + // ErrNotificationNotFound is returned when the notification for the given id is not found + ErrSubscriptionClosed = errors.New("subscription not found") ) var globalInc = atomic.Int64{} @@ -79,23 +81,10 @@ type Notifier struct { h codec.ResponseWriter namespace string - mu sync.Mutex - sub *Subscription - - id SubID -} + mu sync.Mutex -// CreateSubscription returns a new subscription that is coupled to the -// RPC connection. By default subscriptions are inactive and notifications -// are dropped until the subscription is marked as active. This is done -// by the RPC server after the subscription ID is send to the client. -func (n *Notifier) createSubscription() *Subscription { - n.sub = &Subscription{ - ID: n.id, - namespace: n.namespace, - err: make(chan error, 1), - } - return n.sub + id SubID + err chan error // closed on unsubscribe } // Notify sends a notification to the client with the given data as payload. @@ -107,28 +96,10 @@ func (n *Notifier) Notify(data interface{}) error { } n.mu.Lock() defer n.mu.Unlock() - return n.send(n.sub, enc) + return n.send(enc) } -func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { - params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) +func (n *Notifier) send(data json.RawMessage) error { + params, _ := json.Marshal(&subscriptionResult{ID: string(n.id), Result: data}) return n.h.Notify(n.namespace+notificationMethodSuffix, json.RawMessage(params)) } - -// A Subscription is created by a notifier and tied to that notifier. The client can use -// this subscription to wait for an unsubscribe request for the client, see Err(). -type Subscription struct { - ID SubID - namespace string - err chan error // closed on unsubscribe -} - -// Err returns a channel that is closed when the client send an unsubscribe request. -func (s *Subscription) Err() <-chan error { - return s.err -} - -// MarshalJSON marshals a subscription as its ID. -func (s *Subscription) MarshalJSON() ([]byte, error) { - return json.Marshal(s.ID) -} diff --git a/pkg/clientutil/client_handler.go b/pkg/clientutil/client_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..f26d68c1bc79242d60eeadfdf6938179a593617a --- /dev/null +++ b/pkg/clientutil/client_handler.go @@ -0,0 +1 @@ +package clientutil diff --git a/pkg/codec/jrpc.go b/pkg/codec/jrpc.go index 3dab02e915139db4ade07f7f35d8645dc4f6f7de..35cbfebb47424889b48df801ccab79d54ee66513 100644 --- a/pkg/codec/jrpc.go +++ b/pkg/codec/jrpc.go @@ -17,10 +17,16 @@ type Notifier interface { Notify(ctx context.Context, method string, params any) error } +type Mounter interface { + Mount(Handler) Conn +} + type Conn interface { Doer BatchCaller io.Closer + + Mounter } type StreamingConn interface {