diff --git a/contrib/codecs/codecs.go b/contrib/codecs/codecs.go index 1ec9f4f041bf2b8680bb98889865c6a2266393c6..e6f78b76f5cc0ffa7b0113bbae21640e2992b4a2 100644 --- a/contrib/codecs/codecs.go +++ b/contrib/codecs/codecs.go @@ -1,12 +1,15 @@ package codecs import ( + "fmt" + "gfx.cafe/open/jrpc/contrib/codecs/http" "gfx.cafe/open/jrpc/contrib/codecs/inproc" "gfx.cafe/open/jrpc/contrib/codecs/websocket" "gfx.cafe/open/jrpc/pkg/server" gohttp "net/http" + "net/url" ) var NewInProc = inproc.NewCodec @@ -24,3 +27,18 @@ var HttpWebsocketHandler = func(srv *server.Server, origins []string) gohttp.Han chttp.ServeHTTP(w, r) }) } + +func ListenAndServe(u string, srv *server.Server, opts map[string]any) error { + pu, err := url.Parse(u) + if err != nil { + return err + } + if opts == nil { + opts = map[string]any{} + } + handler := handlerFuncs[pu.Scheme] + if handler == nil { + return fmt.Errorf("%w: %s", ErrSchemeNotSupported, pu.Scheme) + } + return handler(pu, srv, opts) +} diff --git a/contrib/codecs/dialer.go b/contrib/codecs/dialer.go index bca245eb7e7e9245e0bba4b250809a0bbf646dde..7e010c1e9606f1e84cbaed8d51ec9181d1e35545 100644 --- a/contrib/codecs/dialer.go +++ b/contrib/codecs/dialer.go @@ -2,43 +2,25 @@ package codecs import ( "context" - "net" + "errors" + "fmt" "net/url" - "gfx.cafe/open/jrpc/contrib/codecs/http" - "gfx.cafe/open/jrpc/contrib/codecs/rdwr" - "gfx.cafe/open/jrpc/contrib/codecs/websocket" "gfx.cafe/open/jrpc/pkg/codec" ) +var ErrSchemeNotSupported = errors.New("url scheme not supported") + func DialContext(ctx context.Context, u string) (codec.Conn, error) { pu, err := url.Parse(u) if err != nil { return nil, err } - switch pu.Scheme { - case "http", "https": - return http.Dial(ctx, nil, u) - case "ws", "wss": - return websocket.DialWebsocket(ctx, u, "") -// case "redis": -// domain := pu.Query().Get("domain") -// if domain == "" { -// domain = "jrpc" -// } -// return redis.Dial(pu.Host, domain), nil - case "tcp": - tcpAddr, err := net.ResolveTCPAddr("tcp", u) - if err != nil { - return nil, err - } - conn, err := net.DialTCP("tcp", nil, tcpAddr) - if err != nil { - return nil, err - } - return rdwr.NewClient(conn, conn), nil + dialer := dialers[pu.Scheme] + if dialer == nil { + return nil, fmt.Errorf("%w: %s", ErrSchemeNotSupported, pu.Scheme) } - return nil, nil + return dialer(ctx, u) } func Dial(u string) (codec.Conn, error) { diff --git a/contrib/codecs/init.go b/contrib/codecs/init.go new file mode 100644 index 0000000000000000000000000000000000000000..5f73b70a82b3f5501247bd8f4569335b5c9c44ab --- /dev/null +++ b/contrib/codecs/init.go @@ -0,0 +1,70 @@ +package codecs + +import ( + "context" + "net" + "net/url" + + gohttp "net/http" + + "gfx.cafe/open/jrpc/contrib/codecs/http" + "gfx.cafe/open/jrpc/contrib/codecs/rdwr" + "gfx.cafe/open/jrpc/contrib/codecs/websocket" + "gfx.cafe/open/jrpc/pkg/codec" + "gfx.cafe/open/jrpc/pkg/server" +) + +func init() { + RegisterHandler(func(bind *url.URL, srv *server.Server, opts map[string]any) error { + return gohttp.ListenAndServe(bind.Host, HttpHandler(srv)) + }, "http") + RegisterHandler(func(bind *url.URL, srv *server.Server, opts map[string]any) error { + origins := []string{} + if val, ok := opts["origins"]; ok { + if t, ok := val.([]string); ok { + origins = t + } + } + return gohttp.ListenAndServe(bind.Host, HttpWebsocketHandler(srv, origins)) + }, "http+ws") + + RegisterHandler(func(bind *url.URL, srv *server.Server, opts map[string]any) error { + tcpAddr, err := net.ResolveTCPAddr("tcp", bind.String()) + if err != nil { + return err + } + tcpListener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return err + } + for { + conn, err := tcpListener.Accept() + if err != nil { + return err + } + go rdwr.NewCodec(conn, conn, nil) + } + }, "tcp") + + RegisterHandler(func(bind *url.URL, srv *server.Server, opts map[string]any) error { + return gohttp.ListenAndServe(bind.Host, HttpHandler(srv)) + }, "http") + RegisterDialer(func(ctx context.Context, url string) (codec.Conn, error) { + return http.Dial(ctx, nil, url) + }, "https", "http") + RegisterDialer(func(ctx context.Context, url string) (codec.Conn, error) { + return websocket.DialWebsocket(ctx, url, "") + }, "wss", "ws") + + RegisterDialer(func(ctx context.Context, url string) (codec.Conn, error) { + tcpAddr, err := net.ResolveTCPAddr("tcp", url) + if err != nil { + return nil, err + } + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + return nil, err + } + return rdwr.NewClient(conn, conn), nil + }, "tcp") +} diff --git a/contrib/codecs/registry.go b/contrib/codecs/registry.go new file mode 100644 index 0000000000000000000000000000000000000000..122cdc593cb9768ddf94f67d0a7fc016739da318 --- /dev/null +++ b/contrib/codecs/registry.go @@ -0,0 +1,29 @@ +package codecs + +import ( + "context" + "net/url" + + "gfx.cafe/open/jrpc/pkg/codec" + "gfx.cafe/open/jrpc/pkg/server" +) + +type handlerFunc = func(bind *url.URL, srv *server.Server, opts map[string]any) error + +var handlerFuncs map[string]handlerFunc = map[string]handlerFunc{} + +func RegisterHandler(fn handlerFunc, names ...string) { + for _, v := range names { + handlerFuncs[v] = fn + } +} + +type dialerFunc = func(ctx context.Context, url string) (codec.Conn, error) + +var dialers map[string]dialerFunc = map[string]dialerFunc{} + +func RegisterDialer(fn dialerFunc, names ...string) { + for _, v := range names { + dialers[v] = fn + } +}