good morning!!!!

Skip to content
Commits on Source (13)
.idea
\ No newline at end of file
.idea
*.out
*.pb.gz
*.tmp
*.log
*.test
package benchmark
import (
"context"
"testing"
"gfx.cafe/open/jrpc/contrib/codecs/http"
"gfx.cafe/open/jrpc/contrib/codecs/inproc"
"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
"gfx.cafe/open/jrpc/contrib/codecs/websocket"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
)
func runBenchmarkSuite(b *testing.B, sm jrpctest.ServerMaker) {
ctx := context.Background()
executeBench := jrpctest.BenchExecutor(sm)
var makeBench = func(name string, fm jrpctest.BenchContext) {
b.Run(name, func(b *testing.B) {
executeBench(b, fm)
})
}
makeBench("SingleClient", func(b *testing.B, server *server.Server, client codec.Conn) {
for i := 0; i < b.N; i++ {
err := client.Do(ctx, nil, "test_ping", nil)
if err != nil {
panic(err)
}
}
})
}
func BenchmarkSimpleSuite(b *testing.B) {
makers := map[string]jrpctest.ServerMaker{
"Http": http.ServerMaker,
"WebSocket": websocket.ServerMaker,
"InProc": inproc.ServerMaker,
"IoPipe": rdwr.ServerMaker,
}
for k, v := range makers {
b.Run(k, func(b *testing.B) {
runBenchmarkSuite(b, v)
})
}
}
package client
import (
"context"
"sync"
"gfx.cafe/open/jrpc"
"gfx.cafe/open/jrpc/pkg/codec"
)
type Reconnecting struct {
dialer func(ctx context.Context) (jrpc.Conn, error)
base codec.Conn
alive bool
middleware []codec.Middleware
mu sync.Mutex
}
func NewReconnecting(dialer func(ctx context.Context) (jrpc.Conn, error)) *Reconnecting {
r := &Reconnecting{
dialer: dialer,
}
return r
}
func (r *Reconnecting) getClient(ctx context.Context) (jrpc.Conn, error) {
reconnect := func() error {
conn, err := r.dialer(ctx)
if err != nil {
return err
}
r.base = conn
r.alive = true
return nil
}
r.mu.Lock()
defer r.mu.Unlock()
if r.base == nil {
err := reconnect()
if err != nil {
return nil, err
}
} else {
select {
case <-r.base.Closed():
err := reconnect()
if err != nil {
return nil, err
}
default:
}
}
return r.base, nil
}
func (r *Reconnecting) Do(ctx context.Context, result any, method string, params any) error {
errChan := make(chan error)
go func() {
conn, err := r.getClient(ctx)
if err != nil {
errChan <- err
return
}
errChan <- conn.Do(ctx, result, method, params)
}()
return <-errChan
}
func (r *Reconnecting) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
errChan := make(chan error)
go func() {
conn, err := r.getClient(ctx)
if err != nil {
errChan <- err
return
}
errChan <- conn.BatchCall(ctx, b...)
}()
return <-errChan
}
func (r *Reconnecting) Mount(m codec.Middleware) {
r.middleware = append(r.middleware, m)
}
// why would you want to do this....
func (r *Reconnecting) Close() error {
conn, err := r.getClient(context.Background())
if err != nil {
return err
}
return conn.Close()
}
// never....
func (r *Reconnecting) Closed() <-chan struct{} {
return make(<-chan struct{})
}
......@@ -8,6 +8,7 @@ import (
"gfx.cafe/open/jrpc/contrib/codecs/http"
"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
"gfx.cafe/open/jrpc/contrib/codecs/websocket"
"gfx.cafe/open/jrpc/exp/redis"
"gfx.cafe/open/jrpc/pkg/codec"
)
......@@ -21,6 +22,12 @@ func DialContext(ctx context.Context, u string) (codec.Conn, error) {
return http.Dial(ctx, nil, u)
case "ws", "wss":
return websocket.DialWebsocket(ctx, u, "")
case "redis":
domain := pu.Query().Get("domain")
if domain == "" {
domain = "jrpc"
}
return redis.Dial(pu.Host, domain), nil
case "tcp":
tcpAddr, err := net.ResolveTCPAddr("tcp", u)
if err != nil {
......
......@@ -66,6 +66,10 @@ func (c *Client) SetHeader(key string, value string) {
c.headers.Set(key, value)
}
func (c *Client) Closed() <-chan struct{} {
return make(chan struct{})
}
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
req, err := codec.NewRequest(ctx, codec.NewId(c.id.Add(1)), method, params)
if err != nil {
......
......@@ -18,6 +18,7 @@ import (
var _ codec.ReaderWriter = (*Codec)(nil)
// Reusable codec. use Reset()
type Codec struct {
ctx context.Context
cn func()
......@@ -37,23 +38,32 @@ type httpError struct {
}
func NewCodec(w http.ResponseWriter, r *http.Request) *Codec {
c := &Codec{}
c.Reset(w, r)
return c
}
func (c *Codec) Reset(w http.ResponseWriter, r *http.Request) {
ir := io.Writer(w)
if w == nil {
ir = io.Discard
}
c := &Codec{
r: r,
w: w,
wr: bufio.NewWriter(ir),
msgs: make(chan *serverutil.Bundle, 1),
errCh: make(chan httpError, 1),
c.r = r
c.w = w
if c.wr == nil {
c.wr = bufio.NewWriter(ir)
} else {
c.wr.Reset(ir)
}
ctx := r.Context()
c.msgs = make(chan *serverutil.Bundle, 1)
c.errCh = make(chan httpError, 1)
ctx := c.r.Context()
c.ctx, c.cn = context.WithCancel(ctx)
c.peerInfo()
c.doRead()
return c
}
func (c *Codec) peerInfo() {
c.i.Transport = "http"
c.i.RemoteAddr = c.r.RemoteAddr
......
package http
import (
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/server"
"net/http/httptest"
"testing"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"github.com/stretchr/testify/require"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
s := jrpctest.NewServer()
hsrv := httptest.NewServer(&Server{Server: s})
return s, func() codec.Conn {
conn, err := DialHTTP(hsrv.URL)
require.NoError(t, err)
return conn
}, hsrv.Close
},
ServerMaker: ServerMaker,
})
}
......@@ -2,6 +2,7 @@ package http
import (
"net/http"
"sync"
"gfx.cafe/open/jrpc/pkg/server"
)
......@@ -14,12 +15,23 @@ type Server struct {
Server *server.Server
}
var codecPool = sync.Pool{
New: func() any {
return &Codec{}
},
}
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if s.Server == nil {
http.Error(w, "no server set", http.StatusInternalServerError)
return
}
c := NewCodec(w, r)
c := codecPool.Get().(*Codec)
c.Reset(w, r)
w.Header().Set("content-type", contentType)
s.Server.ServeCodec(r.Context(), c)
go func() {
<-c.Closed()
codecPool.Put(c)
}()
}
package http
import (
"net/http/httptest"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
)
func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
s := jrpctest.NewServer()
hsrv := httptest.NewServer(&Server{Server: s})
return s, func() codec.Conn {
conn, err := DialHTTP(hsrv.URL)
if err != nil {
panic(err)
}
return conn
}, hsrv.Close
}
......@@ -14,6 +14,9 @@ type Client struct {
p *clientutil.IdReply
c *Codec
ctx context.Context
cn context.CancelFunc
m codec.Middlewares
handler codec.Handler
mu sync.Mutex
......@@ -34,14 +37,22 @@ func NewClient(c *Codec, handler codec.Handler) *Client {
c: c,
handler: handler,
}
cl.ctx, cl.cn = context.WithCancel(context.Background())
go cl.listen()
return cl
}
func (c *Client) Closed() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) listen() error {
var msg json.RawMessage
defer c.cn()
dec := json.NewDecoder(c.c.rd)
for {
err := json.NewDecoder(c.c.rd).Decode(&msg)
err := dec.Decode(&msg)
if err != nil {
return err
}
......
package inproc_test
import (
"context"
"testing"
"gfx.cafe/open/jrpc/contrib/codecs/inproc"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/server"
"gfx.cafe/open/jrpc/pkg/jrpctest"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
s := jrpctest.NewServer()
clientCodec := inproc.NewCodec()
go func() {
s.ServeCodec(context.Background(), clientCodec)
}()
return s, func() codec.Conn {
return inproc.NewClient(clientCodec, nil)
}, func() {}
},
ServerMaker: inproc.ServerMaker,
})
}
package inproc
import (
"context"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
)
func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
s := jrpctest.NewServer()
clientCodec := NewCodec()
go func() {
s.ServeCodec(context.Background(), clientCodec)
}()
return s, func() codec.Conn {
return NewClient(clientCodec, nil)
}, func() {}
}
package rdwr
import (
"bytes"
"context"
"encoding/json"
"io"
......@@ -9,6 +8,7 @@ import (
"gfx.cafe/open/jrpc/pkg/clientutil"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/util/go/bufpool"
)
type Client struct {
......@@ -47,6 +47,10 @@ func (c *Client) SetHandlerPeer(pi codec.PeerInfo) {
c.handlerPeer = pi
}
func (c *Client) Closed() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) Mount(h codec.Middleware) {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -58,8 +62,10 @@ func (c *Client) Mount(h codec.Middleware) {
func (c *Client) listen() error {
var msg json.RawMessage
defer c.cn()
dec := json.NewDecoder(c.rd)
for {
err := json.NewDecoder(c.rd).Decode(&msg)
err := dec.Decode(&msg)
if err != nil {
return err
}
......@@ -100,15 +106,17 @@ func (c *Client) listen() error {
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
id := c.p.NextId()
buf := bufpool.GetStd()
defer bufpool.PutStd(buf)
req, err := codec.NewRequest(ctx, codec.NewId(id), method, params)
if err != nil {
return err
}
fwd, err := json.Marshal(req)
err = json.NewEncoder(buf).Encode(req)
if err != nil {
return err
}
err = c.writeContext(req.Context(), fwd)
err = c.writeContext(req.Context(), buf.Bytes())
if err != nil {
return err
}
......@@ -129,7 +137,8 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
if ctx == nil {
ctx = context.Background()
}
buf := new(bytes.Buffer)
buf := bufpool.GetStd()
defer bufpool.PutStd(buf)
enc := json.NewEncoder(buf)
reqs := make([]*codec.Request, 0, len(b))
ids := make([]*codec.ID, 0, len(b))
......
......@@ -42,9 +42,10 @@ func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
func (c *Codec) listen() error {
var msg json.RawMessage
dec := json.NewDecoder(c.rd)
for {
// reading a message
err := json.NewDecoder(c.rd).Decode(&msg)
err := dec.Decode(&msg)
if err != nil {
c.cn()
return err
......
package rdwr_test
import (
"context"
"io"
"testing"
"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/server"
"gfx.cafe/open/jrpc/pkg/jrpctest"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
rd_s, wr_s := io.Pipe()
rd_c, wr_c := io.Pipe()
s := jrpctest.NewServer()
clientCodec := rdwr.NewCodec(rd_c, wr_s, nil)
go func() {
s.ServeCodec(context.Background(), clientCodec)
}()
return s, func() codec.Conn {
return rdwr.NewClient(rd_s, wr_c)
}, func() {}
},
ServerMaker: rdwr.ServerMaker,
})
}
package rdwr
import (
"context"
"io"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
)
func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
rd_s, wr_s := io.Pipe()
rd_c, wr_c := io.Pipe()
s := jrpctest.NewServer()
clientCodec := NewCodec(rd_c, wr_s, nil)
go func() {
s.ServeCodec(context.Background(), clientCodec)
}()
return s, func() codec.Conn {
return NewClient(rd_s, wr_c)
}, func() {}
}
package websocket
import (
"context"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
"github.com/stretchr/testify/require"
"net/http/httptest"
"testing"
"gfx.cafe/open/jrpc/pkg/jrpctest"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
s := jrpctest.NewServer()
hsrv := httptest.NewServer(&Server{Server: s})
return s, func() codec.Conn {
conn, err := DialWebsocket(context.Background(), hsrv.URL, "")
require.NoError(t, err)
return conn
}, hsrv.Close
},
ServerMaker: ServerMaker,
})
}
package websocket
import (
"context"
"net/http/httptest"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
)
func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
s := jrpctest.NewServer()
hsrv := httptest.NewServer(&Server{Server: s})
return s, func() codec.Conn {
conn, err := DialWebsocket(context.Background(), hsrv.URL, "")
if err != nil {
panic(err)
}
return conn
}, hsrv.Close
}
......@@ -11,6 +11,8 @@ import (
"gfx.cafe/open/jrpc/pkg/codec"
)
var _ codec.StreamingConn = (*WrapClient)(nil)
type WrapClient struct {
subs map[string]*clientSub
......@@ -18,6 +20,10 @@ type WrapClient struct {
mu sync.RWMutex
}
func (w *WrapClient) Closed() <-chan struct{} {
return w.conn.Closed()
}
func NewWrapClient(conn codec.StreamingConn) *WrapClient {
return &WrapClient{
subs: map[string]*clientSub{},
......
......@@ -30,6 +30,12 @@ type Client struct {
handlerPeer codec.PeerInfo
}
func Dial(url string, domain string) *Client {
return NewClient(redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{url},
}), domain)
}
func NewClient(c redis.UniversalClient, domain string) *Client {
cl := &Client{
c: c,
......@@ -48,6 +54,10 @@ func NewClient(c redis.UniversalClient, domain string) *Client {
return cl
}
func (c *Client) Closed() <-chan struct{} {
return c.ctx.Done()
}
func (c *Client) SetHandlerPeer(pi codec.PeerInfo) {
c.handlerPeer = pi
}
......