good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 72623c1e authored by a's avatar a
Browse files

ads

parent ecae21c9
Branches
Tags
1 merge request!15Draft: V2
package main
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"gfx.cafe/open/jrpc"
"sigs.k8s.io/yaml"
)
func main() {
scan := bufio.NewReader(os.Stdin)
ctx := context.Background()
c := &cli{}
c.silent(`commands:
[m]ethod, [d]ial, [s]end, [p]arams, [c]lient, [q]uit
`)
for {
if err := c.tick(ctx, scan); err != nil {
if !errors.Is(err, context.Canceled) {
c.silent("%s", err)
}
break
}
}
}
type cli struct {
remote string
method string
params json.RawMessage
verbose bool
conn jrpc.Conn
}
func (c *cli) silent(s string, args ...any) {
if c.verbose {
fmt.Printf(s+"\n", args...)
}
}
func (c *cli) tick(ctx context.Context, scan *bufio.Reader) error {
cmd, err := scan.ReadString('\n')
if err != nil {
return err
}
cmd = strings.TrimSpace(cmd)
if cmd == "" {
return nil
}
splt := strings.SplitN(cmd, " ", 2)
if len(splt) == 0 {
return nil
}
switch splt[0] {
case "d", "dial":
if len(splt) == 1 {
c.silent("need to specify url to dial to")
return nil
}
c.conn, err = jrpc.DialContext(ctx, splt[1])
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
c.remote = splt[1]
c.silent("connected to %s", splt[1])
case "m", "method":
if len(splt) == 1 {
fmt.Println("need to specify method to use")
return nil
}
c.method = splt[1]
c.silent("method to %s", splt[1])
case "p", "params":
nextExit := false
bts := []byte(splt[1])
for {
tmp, err := scan.ReadBytes('\n')
if err != nil {
return err
}
if len(tmp) == 1 {
if nextExit {
break
}
nextExit = true
continue
}
bts = append(bts, tmp...)
}
var m any
err = yaml.Unmarshal(bts, &m)
if err != nil {
return err
}
c.params, _ = json.Marshal(m)
case "c", "client":
fmt.Printf(
`
remote: %s
method: %s
params: %s
`, c.remote, c.method, c.params)
case "s", "send":
var res json.RawMessage
c.conn.Do(ctx, &res, c.method, c.params)
fmt.Println(string(res))
case "q", "quit", "exit":
os.Exit(0)
}
return nil
}
d wss://mainnet.rpc.gfx.xyz
m eth_getBlockByNumber
p [1500000, true]
send
......@@ -41,6 +41,13 @@ type Client struct {
id atomic.Int64
headers http.Header
handler codec.Handler
}
func (c *Client) Mount(h codec.Handler) codec.Conn {
c.handler = h
return c
}
func DialHTTP(target string) (*Client, error) {
......
......@@ -17,6 +17,11 @@ type Client struct {
handler codec.Handler
}
func (c *Client) Mount(h codec.Handler) codec.Conn {
c.handler = h
return c
}
func NewClient(c *Codec, handler codec.Handler) *Client {
cl := &Client{
p: clientutil.NewIdReply(),
......@@ -39,9 +44,9 @@ func (c *Client) listen() error {
v := msgs[i]
id := v.ID.Number()
if id == 0 {
//if c.handler != nil {
// c.handler.ServeRPC(w, r)
//}
if c.handler != nil {
c.handler.ServeRPC(w, r)
}
continue
}
var err error
......
......@@ -2,11 +2,14 @@ package subscription
import (
"context"
"gfx.cafe/open/jrpc/pkg/codec"
)
type SubscriptionConn interface {
type Conn interface {
codec.StreamingConn
Subscribe(ctx context.Context, namespace string, channel any, args ...any) (*ClientSubscription, error)
}
type ClientSubscription struct {
}
......@@ -9,9 +9,6 @@ import (
"gfx.cafe/open/jrpc/pkg/codec"
)
const MethodSubscribeSuffix = "_subscribe"
const MethodUnsusbcribeSuffix = "_unsubscribe"
type Engine struct {
subscriptions map[SubID]*Notifier
mu sync.Mutex
......@@ -25,25 +22,57 @@ func NewEngine() *Engine {
}
}
func (e *Engine) closeSub(subid SubID) (bool, error) {
e.mu.Lock()
defer e.mu.Unlock()
val, ok := e.subscriptions[subid]
if ok {
val.err <- ErrSubscriptionClosed
close(val.err)
delete(e.subscriptions, subid)
}
if !ok {
return ok, ErrSubscriptionNotFound
}
return ok, nil
}
func (e *Engine) Middleware() func(jrpc.Handler) jrpc.Handler {
return func(h jrpc.Handler) jrpc.Handler {
return codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// its a subscription, so install a notification handler
if strings.HasSuffix(r.Method, MethodSubscribeSuffix) {
switch {
case strings.HasSuffix(r.Method, subscribeMethodSuffix):
// create the notifier to inject into the context
n := &Notifier{
h: w,
namespace: strings.TrimSuffix(r.Method, MethodSubscribeSuffix),
namespace: strings.TrimSuffix(r.Method, subscribeMethodSuffix),
id: e.idgen(),
err: make(chan error, 1),
}
// get the subscription object
sub := n.createSubscription()
// add to the map
e.mu.Lock()
e.subscriptions[n.id] = n
e.mu.Unlock()
// now send the subscription id back
w.Send(sub, nil)
w.Send(n, nil)
// then inject the notifier
r = r.WithContext(context.WithValue(r.Context(), notifierKey{}, n))
h.ServeRPC(w, r)
case strings.HasSuffix(r.Method, unsubscribeMethodSuffix):
// read the subscription id to close
var subid SubID
err := r.ParamArray(subid)
if err != nil {
w.Send(false, err)
return
}
// close that sub
w.Send(e.closeSub(subid))
default:
h.ServeRPC(w, r)
}
})
}
}
......
......@@ -29,6 +29,8 @@ var (
ErrNotificationsUnsupported = errors.New("notifications not supported")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrSubscriptionNotFound = errors.New("subscription not found")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrSubscriptionClosed = errors.New("subscription not found")
)
var globalInc = atomic.Int64{}
......@@ -80,22 +82,9 @@ type Notifier struct {
namespace string
mu sync.Mutex
sub *Subscription
id SubID
}
// CreateSubscription returns a new subscription that is coupled to the
// RPC connection. By default subscriptions are inactive and notifications
// are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client.
func (n *Notifier) createSubscription() *Subscription {
n.sub = &Subscription{
ID: n.id,
namespace: n.namespace,
err: make(chan error, 1),
}
return n.sub
err chan error // closed on unsubscribe
}
// Notify sends a notification to the client with the given data as payload.
......@@ -107,28 +96,10 @@ func (n *Notifier) Notify(data interface{}) error {
}
n.mu.Lock()
defer n.mu.Unlock()
return n.send(n.sub, enc)
return n.send(enc)
}
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
func (n *Notifier) send(data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(n.id), Result: data})
return n.h.Notify(n.namespace+notificationMethodSuffix, json.RawMessage(params))
}
// A Subscription is created by a notifier and tied to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
ID SubID
namespace string
err chan error // closed on unsubscribe
}
// Err returns a channel that is closed when the client send an unsubscribe request.
func (s *Subscription) Err() <-chan error {
return s.err
}
// MarshalJSON marshals a subscription as its ID.
func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.ID)
}
package clientutil
......@@ -17,10 +17,16 @@ type Notifier interface {
Notify(ctx context.Context, method string, params any) error
}
type Mounter interface {
Mount(Handler) Conn
}
type Conn interface {
Doer
BatchCaller
io.Closer
Mounter
}
type StreamingConn interface {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment