good morning!!!!

Skip to content
Commits on Source (25)
package main
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"gfx.cafe/open/jrpc"
"sigs.k8s.io/yaml"
)
func main() {
scan := bufio.NewReader(os.Stdin)
ctx := context.Background()
c := &cli{}
c.silent(`commands:
[m]ethod, [d]ial, [s]end, [p]arams, [c]lient, [q]uit
`)
for {
if err := c.tick(ctx, scan); err != nil {
if !errors.Is(err, context.Canceled) {
c.silent("%s", err)
}
break
}
}
}
type cli struct {
remote string
method string
params json.RawMessage
verbose bool
conn jrpc.Conn
}
func (c *cli) silent(s string, args ...any) {
if c.verbose {
fmt.Printf(s+"\n", args...)
}
}
func (c *cli) tick(ctx context.Context, scan *bufio.Reader) error {
cmd, err := scan.ReadString('\n')
if err != nil {
return err
}
cmd = strings.TrimSpace(cmd)
if cmd == "" {
return nil
}
splt := strings.SplitN(cmd, " ", 2)
if len(splt) == 0 {
return nil
}
switch splt[0] {
case "d", "dial":
if len(splt) == 1 {
c.silent("need to specify url to dial to")
return nil
}
c.conn, err = jrpc.DialContext(ctx, splt[1])
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
c.remote = splt[1]
c.silent("connected to %s", splt[1])
case "m", "method":
if len(splt) == 1 {
fmt.Println("need to specify method to use")
return nil
}
c.method = splt[1]
c.silent("method to %s", splt[1])
case "p", "params":
nextExit := false
bts := []byte(splt[1])
for {
tmp, err := scan.ReadBytes('\n')
if err != nil {
return err
}
if len(tmp) == 1 {
if nextExit {
break
}
nextExit = true
continue
}
bts = append(bts, tmp...)
}
var m any
err = yaml.Unmarshal(bts, &m)
if err != nil {
return err
}
c.params, _ = json.Marshal(m)
case "c", "client":
fmt.Printf(
`
remote: %s
method: %s
params: %s
`, c.remote, c.method, c.params)
case "s", "send":
var res json.RawMessage
c.conn.Do(ctx, &res, c.method, c.params)
fmt.Println(string(res))
case "q", "quit", "exit":
os.Exit(0)
}
return nil
}
d wss://mainnet.rpc.gfx.xyz
m eth_getBlockByNumber
p [1500000, true]
send
......@@ -4,8 +4,23 @@ import (
"gfx.cafe/open/jrpc/contrib/codecs/http"
"gfx.cafe/open/jrpc/contrib/codecs/inproc"
"gfx.cafe/open/jrpc/contrib/codecs/websocket"
"gfx.cafe/open/jrpc/pkg/server"
gohttp "net/http"
)
var NewInProc = inproc.NewCodec
var WebsocketHandler = websocket.WebsocketHandler
var HttpHandler = http.HttpHandler
var HttpWebsocketHandler = func(srv *server.Server, origins []string) gohttp.Handler {
cwss := WebsocketHandler(srv, origins)
chttp := HttpHandler(srv)
return gohttp.HandlerFunc(func(w gohttp.ResponseWriter, r *gohttp.Request) {
if r.Header.Get("upgrade") != "" {
cwss.ServeHTTP(w, r)
return
}
chttp.ServeHTTP(w, r)
})
}
......@@ -30,13 +30,12 @@ func DialContext(ctx context.Context, u string) (codec.Conn, error) {
if err != nil {
return nil, err
}
return rdwr.NewClient(conn, conn, nil), nil
return rdwr.NewClient(conn, conn), nil
}
return nil, nil
}
func Dial(u string) (codec.Conn, error) {
ctx := context.Background()
return DialContext(ctx, u)
}
......@@ -8,13 +8,15 @@ import (
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/clientutil"
"gfx.cafe/util/go/bufpool"
"gfx.cafe/open/jrpc/pkg/clientutil"
)
var (
......@@ -41,6 +43,19 @@ type Client struct {
id atomic.Int64
headers http.Header
m codec.Middlewares
handler codec.Handler
mu sync.RWMutex
}
func (c *Client) Mount(h codec.Middleware) {
c.mu.Lock()
defer c.mu.Unlock()
c.m = append(c.m, h)
c.handler = c.m.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// do nothing on no handler
})
}
func DialHTTP(target string) (*Client, error) {
......@@ -93,7 +108,7 @@ func (c *Client) Do(ctx context.Context, result any, method string, params any)
}
func (c *Client) post(req *codec.Request) (*http.Response, error) {
//TODO: use buffer for this
// TODO: use buffer for this
buf := bufpool.GetStd()
defer bufpool.PutStd(buf)
buf.Reset()
......@@ -125,13 +140,13 @@ func (c *Client) Notify(ctx context.Context, method string, params any) error {
func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
reqs := make([]*codec.Request, len(b))
ids := make([]int, 0, len(b))
for _, v := range b {
ids := make(map[int]int, len(b))
for idx, v := range b {
if v.IsNotification {
reqs = append(reqs, codec.NewRequest(ctx, "", v.Method, v.Params))
} else {
id := int(c.id.Add(1))
ids = append(ids, id)
ids[idx] = id
reqs = append(reqs, codec.NewRequestInt(ctx, id, v.Method, v.Params))
}
}
......
......@@ -7,11 +7,12 @@ import (
"encoding/json"
"errors"
"fmt"
"gfx.cafe/open/jrpc/pkg/codec"
"io"
"mime"
"net/http"
"net/url"
"gfx.cafe/open/jrpc/pkg/codec"
)
type Codec struct {
......@@ -74,6 +75,9 @@ func (c *Codec) PeerInfo() codec.PeerInfo {
func (r *Codec) doReadGet() (msgs json.RawMessage, err error) {
method_up := r.r.URL.Query().Get("method")
if method_up == "" {
method_up = r.r.URL.Path
}
params, _ := url.QueryUnescape(r.r.URL.Query().Get("params"))
param := []byte(params)
if pb, err := base64.URLEncoding.DecodeString(params); err == nil {
......@@ -87,6 +91,23 @@ func (r *Codec) doReadGet() (msgs json.RawMessage, err error) {
return req.MarshalJSON()
}
func (r *Codec) doReadPut() (msgs json.RawMessage, err error) {
method_up := r.r.URL.Query().Get("method")
if method_up == "" {
method_up = r.r.URL.Path
}
id := r.r.URL.Query().Get("id")
if id == "" {
id = "1"
}
data, err := io.ReadAll(r.r.Body)
if err != nil {
return nil, err
}
req := codec.NewRequest(r.ctx, id, method_up, data)
return req.MarshalJSON()
}
// validateRequest returns a non-zero response code and error message if the
// request is invalid.
func ValidateRequest(r *http.Request) (int, error) {
......@@ -130,6 +151,8 @@ func (c *Codec) doRead() {
switch c.r.Method {
case http.MethodGet:
data, err = c.doReadGet()
case http.MethodPut:
data, err = c.doReadPut()
case http.MethodPost:
data, err = io.ReadAll(c.r.Body)
}
......
......@@ -20,5 +20,6 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
c := NewCodec(w, r)
w.Header().Set("content-type", contentType)
s.Server.ServeCodec(r.Context(), c)
}
......@@ -14,7 +14,18 @@ type Client struct {
p *clientutil.IdReply
c *Codec
m codec.Middlewares
handler codec.Handler
mu sync.Mutex
}
func (c *Client) Mount(h codec.Middleware) {
c.mu.Lock()
defer c.mu.Unlock()
c.m = append(c.m, h)
c.handler = c.m.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// do nothing on no handler
})
}
func NewClient(c *Codec, handler codec.Handler) *Client {
......@@ -39,9 +50,16 @@ func (c *Client) listen() error {
v := msgs[i]
id := v.ID.Number()
if id == 0 {
//if c.handler != nil {
// c.handler.ServeRPC(w, r)
//}
if c.handler != nil {
c.handler.ServeRPC(nil, codec.NewRequestFromRaw(c.c.ctx, &codec.RequestMarshaling{
Method: v.Method,
Params: v.Params,
Peer: codec.PeerInfo{
Transport: "ipc",
RemoteAddr: "",
},
}))
}
continue
}
var err error
......
......@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"io"
"sync"
"gfx.cafe/open/jrpc/pkg/codec"
)
......@@ -13,9 +14,10 @@ type Codec struct {
ctx context.Context
cn func()
rd io.Reader
wr *bufio.Writer
msgs chan json.RawMessage
rd io.Reader
wrLock sync.Mutex
wr *bufio.Writer
msgs chan json.RawMessage
}
func NewCodec() *Codec {
......@@ -58,10 +60,14 @@ func (c *Codec) Close() error {
}
func (c *Codec) Write(p []byte) (n int, err error) {
c.wrLock.Lock()
defer c.wrLock.Unlock()
return c.wr.Write(p)
}
func (c *Codec) Flush() (err error) {
c.wrLock.Lock()
defer c.wrLock.Unlock()
return c.wr.Flush()
}
......@@ -76,7 +82,7 @@ func (c *Codec) RemoteAddr() string {
}
// DialInProc attaches an in-process connection to the given RPC server.
//func DialInProc(handler *Server) *Client {
// func DialInProc(handler *Server) *Client {
// initctx := context.Background()
// c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) {
// p1, p2 := net.Pipe()
......@@ -84,4 +90,4 @@ func (c *Codec) RemoteAddr() string {
// return NewCodec(p2), nil
// })
// return c
//}
// }
......@@ -20,21 +20,42 @@ type Client struct {
ctx context.Context
cn context.CancelFunc
m codec.Middlewares
handler codec.Handler
mu sync.RWMutex
handlerPeer codec.PeerInfo
}
func NewClient(rd io.Reader, wr io.Writer, handler codec.Handler) *Client {
func NewClient(rd io.Reader, wr io.Writer) *Client {
cl := &Client{
p: clientutil.NewIdReply(),
rd: rd,
wr: wr,
handler: handler,
p: clientutil.NewIdReply(),
rd: rd,
wr: wr,
handlerPeer: codec.PeerInfo{
Transport: "ipc",
RemoteAddr: "",
},
handler: codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {}),
}
cl.ctx, cl.cn = context.WithCancel(context.Background())
go cl.listen()
return cl
}
func (c *Client) SetHandlerPeer(pi codec.PeerInfo) {
c.handlerPeer = pi
}
func (c *Client) Mount(h codec.Middleware) {
c.mu.Lock()
defer c.mu.Unlock()
c.m = append(c.m, h)
c.handler = c.m.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// do nothing on no handler
})
}
func (c *Client) listen() error {
var msg json.RawMessage
for {
......@@ -51,20 +72,18 @@ func (c *Client) listen() error {
id := v.ID.Number()
// messages without ids are notifications
if id == 0 {
//TODO: implement this
if c.handler != nil {
// writer should only be allowed to send notifications
// reader should contain the message above
// the context is the client context
c.handler.ServeRPC(nil, codec.NewRequestFromRaw(c.ctx, &codec.RequestMarshaling{
Method: v.Method,
Params: v.Params,
Peer: codec.PeerInfo{
Transport: "ipc",
RemoteAddr: "",
},
}))
}
var handler codec.Handler
c.mu.RLock()
handler = c.handler
c.mu.RUnlock()
// writer should only be allowed to send notifications
// reader should contain the message above
// the context is the client context
handler.ServeRPC(nil, codec.NewRequestFromRaw(c.ctx, &codec.RequestMarshaling{
Method: v.Method,
Params: v.Result,
Peer: c.handlerPeer,
}))
continue
}
var err error
......
......@@ -4,18 +4,21 @@ import (
"bufio"
"context"
"io"
"sync"
"gfx.cafe/open/jrpc/pkg/codec"
"github.com/goccy/go-json"
"gfx.cafe/open/jrpc/pkg/codec"
)
type Codec struct {
ctx context.Context
cn func()
rd io.Reader
wr *bufio.Writer
msgs chan json.RawMessage
rd io.Reader
wrLock sync.Mutex
wr *bufio.Writer
msgs chan json.RawMessage
}
func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
......@@ -77,10 +80,14 @@ func (c *Codec) Close() error {
}
func (c *Codec) Write(p []byte) (n int, err error) {
c.wrLock.Lock()
defer c.wrLock.Unlock()
return c.wr.Write(p)
}
func (c *Codec) Flush() (err error) {
c.wrLock.Lock()
defer c.wrLock.Unlock()
c.wr.WriteByte('\n')
return c.wr.Flush()
}
......@@ -96,7 +103,7 @@ func (c *Codec) RemoteAddr() string {
}
// Dialrdwr attaches an in-process connection to the given RPC server.
//func Dialrdwr(handler *Server) *Client {
// func Dialrdwr(handler *Server) *Client {
// initctx := context.Background()
// c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) {
// p1, p2 := net.Pipe()
......@@ -104,4 +111,4 @@ func (c *Codec) RemoteAddr() string {
// return NewCodec(p2), nil
// })
// return c
//}
// }
......@@ -24,7 +24,7 @@ func TestBasicSuite(t *testing.T) {
s.ServeCodec(context.Background(), clientCodec)
}()
return s, func() codec.Conn {
return rdwr.NewClient(rd_s, wr_c, nil)
return rdwr.NewClient(rd_s, wr_c)
}, func() {}
},
})
......
......@@ -22,7 +22,7 @@ func TestRDWRSetup(t *testing.T) {
rd_c, wr_c := io.Pipe()
clientCodec := rdwr.NewCodec(rd_s, wr_c, nil)
client := rdwr.NewClient(rd_c, wr_s, nil)
client := rdwr.NewClient(rd_c, wr_s)
go func() {
srv.ServeCodec(ctx, clientCodec)
}()
......
......@@ -2,6 +2,7 @@ package websocket
import (
"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
"gfx.cafe/open/jrpc/pkg/codec"
"context"
......@@ -17,9 +18,13 @@ func newClient(conn *websocket.Conn) (*Client, error) {
conn.SetReadLimit(WsMessageSizeLimit)
netConn := websocket.NetConn(context.Background(), conn, websocket.MessageText)
c := &Client{
Client: rdwr.NewClient(netConn, netConn, nil),
Client: rdwr.NewClient(netConn, netConn),
conn: conn,
}
c.SetHandlerPeer(codec.PeerInfo{
Transport: "ws",
RemoteAddr: "",
})
return c, nil
}
......
......@@ -2,14 +2,15 @@ package websocket_test
import (
"context"
"net/http/httptest"
"strings"
"testing"
"gfx.cafe/open/jrpc/contrib/codecs/websocket"
"gfx.cafe/open/jrpc/contrib/jmux"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
"net/http/httptest"
"strings"
"testing"
)
func TestWebsocketClientHeaders(t *testing.T) {
......@@ -153,7 +154,7 @@ func TestClientWebsocketLargeMessage(t *testing.T) {
}
var r string
if err := c.Do(nil, &r, "test_largeResp", nil); err != nil {
if err := c.Do(nil, &r, "test/largeResp", nil); err != nil {
t.Fatal("call failed:", err)
}
if len(r) != respLength {
......
......@@ -6,24 +6,9 @@ import (
"gfx.cafe/util/go/bufpool"
json "github.com/goccy/go-json"
jsoniter "github.com/json-iterator/go"
"nhooyr.io/websocket"
)
var JZON = jsoniter.Config{
IndentionStep: 0,
MarshalFloatWith6Digits: false,
EscapeHTML: true,
SortMapKeys: true,
UseNumber: false,
DisallowUnknownFields: false,
TagKey: "",
OnlyTaggedField: false,
ValidateJsonRawMessage: false,
ObjectFieldMustBeSimpleString: false,
CaseSensitive: false,
}.Froze()
// Read reads a JSON message from c into v.
// It will reuse buffers in between calls to avoid allocations.
func Read(ctx context.Context, c *websocket.Conn, v interface{}) error {
......@@ -59,10 +44,8 @@ func write(ctx context.Context, c *websocket.Conn, v interface{}) (err error) {
if err != nil {
return err
}
st := JZON.BorrowStream(w)
defer JZON.ReturnStream(st)
st.WriteVal(v)
err = st.Flush()
st := json.NewEncoder(w)
err = st.Encode(v)
if err != nil {
return fmt.Errorf("failed to marshal JSON: %w", err)
}
......
package subscription
import (
"context"
"encoding/json"
"reflect"
"strings"
"sync"
"sync/atomic"
"gfx.cafe/open/jrpc/pkg/codec"
)
type WrapClient struct {
subs map[string]*clientSub
conn codec.StreamingConn
mu sync.RWMutex
}
func NewWrapClient(conn codec.StreamingConn) *WrapClient {
return &WrapClient{
subs: map[string]*clientSub{},
conn: conn,
}
}
func (c *WrapClient) Middleware(h codec.Handler) codec.Handler {
return codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// use normal handler
if !strings.HasSuffix(r.Method, notificationMethodSuffix) {
h.ServeRPC(w, r)
return
}
var params subscriptionResult
// NOTE: this error is ignored because notifications ignore errors
err := json.Unmarshal(r.Params, &params)
_ = err
if params.ID == "" {
// probably some malformed packet, ignore it
return
}
c.mu.Lock()
clientSub, ok := c.subs[params.ID]
c.mu.Unlock()
if ok {
clientSub.onmsg <- params.Result
}
})
}
func (c *WrapClient) Subscribe(ctx context.Context, namespace string, channel any, args any) (ClientSubscription, error) {
chanVal := reflect.ValueOf(channel)
// make sure its a proper channel
chanVal.Kind()
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic("first argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil")
}
// send the actual message to initialize the subscription
var result string
err := c.conn.Do(ctx, &result, namespace+subscribeMethodSuffix, args)
if err != nil {
return nil, err
}
// check the result
if result == "" {
return nil, ErrSubscriptionNotFound
}
// now create a client sub
sub := &clientSub{
engine: c,
conn: c.conn,
namespace: namespace,
id: result,
channel: chanVal,
// BUG: a worse is better solution... it means that when this fills, you might receive subscriptions in an undefined error
onmsg: make(chan json.RawMessage, 32),
subdone: make(chan struct{}),
}
// will get the type of the event
etype := chanVal.Type().Elem()
go func() {
for {
select {
case <-sub.subdone:
// sub is done, so close readErr
close(sub.readErr)
case params, ok := <-sub.onmsg:
if !ok {
close(sub.readErr)
return
}
val := reflect.New(etype)
err := json.Unmarshal(params, val.Interface())
if err != nil {
sub.readErr <- err
return
}
// and now send the elem
sub.channel.Send(val.Elem())
case <-ctx.Done():
close(sub.readErr)
return
}
}
}()
c.mu.Lock()
c.subs[sub.id] = sub
c.mu.Unlock()
return sub, nil
}
func (c *WrapClient) Do(ctx context.Context, result any, method string, params any) error {
return c.conn.Do(ctx, result, method, params)
}
func (c *WrapClient) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
return c.conn.BatchCall(ctx, b...)
}
func (c *WrapClient) Close() error {
return c.conn.Close()
}
func (c *WrapClient) Mount(m codec.Middleware) {
c.conn.Mount(m)
}
func (c *WrapClient) Notify(ctx context.Context, method string, params any) error {
return c.conn.Notify(ctx, method, params)
}
// the actual subscription
type clientSub struct {
engine *WrapClient
conn codec.StreamingConn
namespace string
id string
channel reflect.Value
onmsg chan json.RawMessage
subdone chan struct{}
readErr chan error
done atomic.Bool
}
func (c *clientSub) Err() <-chan error {
return c.readErr
}
func (c *clientSub) Unsubscribe() error {
// TODO: dont use context background here...
var result string
err := c.conn.Do(context.Background(), &result, c.namespace+unsubscribeMethodSuffix, nil)
if err != nil {
return err
}
if c.done.CompareAndSwap(false, true) {
close(c.subdone)
}
return nil
}
func (c *clientSub) String() string {
return c.id
}
package subscription
import (
"context"
"gfx.cafe/open/jrpc/pkg/codec"
)
type Conn interface {
Subscribe(ctx context.Context, namespace string, channel any, args any) (ClientSubscription, error)
codec.StreamingConn
}
func UpgradeConn(c codec.Conn, err error) (Conn, error) {
if err != nil {
return nil, err
}
if val, ok := c.(codec.StreamingConn); ok {
engine := NewWrapClient(val)
val.Mount(engine.Middleware)
return engine, nil
}
return nil, ErrNotificationsUnsupported
}
type ClientSubscription interface {
Err() <-chan error
Unsubscribe() error
String() string
}
package subscription
import (
"context"
"strings"
"sync"
"gfx.cafe/open/jrpc/pkg/codec"
)
type Engine struct {
subscriptions map[SubID]*Notifier
mu sync.Mutex
idgen func() SubID
}
func NewEngine() *Engine {
return &Engine{
subscriptions: make(map[SubID]*Notifier),
idgen: randomIDGenerator(),
}
}
func (e *Engine) closeSub(subid SubID) (bool, error) {
e.mu.Lock()
defer e.mu.Unlock()
val, ok := e.subscriptions[subid]
if ok {
val.err <- ErrSubscriptionClosed
close(val.err)
delete(e.subscriptions, subid)
}
if !ok {
return ok, ErrSubscriptionNotFound
}
return ok, nil
}
func (e *Engine) Middleware() func(codec.Handler) codec.Handler {
return func(h codec.Handler) codec.Handler {
return codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// its a subscription, so install a notification handler
switch {
case strings.HasSuffix(r.Method, subscribeMethodSuffix):
// create the notifier to inject into the context
n := &Notifier{
h: w,
namespace: strings.TrimSuffix(r.Method, subscribeMethodSuffix),
id: e.idgen(),
err: make(chan error, 1),
}
// get the subscription object
// add to the map
e.mu.Lock()
e.subscriptions[n.id] = n
e.mu.Unlock()
// now send the subscription id back
w.Send(n.id, nil)
// then inject the notifier
r = r.WithContext(context.WithValue(r.Context(), notifierKey{}, n))
h.ServeRPC(w, r)
case strings.HasSuffix(r.Method, unsubscribeMethodSuffix):
// read the subscription id to close
var subid SubID
err := r.ParamArray(subid)
if err != nil {
w.Send(false, err)
return
}
// close that sub
w.Send(e.closeSub(subid))
default:
h.ServeRPC(w, r)
}
})
}
}