good morning!!!!

Skip to content
Commits on Source (131)
.idea
\ No newline at end of file
.idea
*.out
*.pb.gz
*.tmp
*.log
*.test
.go-cache:
variables:
GOPATH: $CI_PROJECT_DIR/.go
before_script:
- mkdir -p .go
cache:
paths:
- .go/pkg/mod/
stages:
- test
lint:
image: registry.gitlab.com/gitlab-org/gitlab-build-images:golangci-lint-alpine
stage: test
extends: .go-cache
script:
# Use default .golangci.yml file from the image if one is not present in the project root.
- "[ -e .golangci.yml ] || cp /golangci/.golangci.yml ."
# Write the code coverage report to gl-code-quality-report.json
# and print linting issues to stdout in the format: path/to/file:line description
# remove `--issues-exit-code 0` or set to non-zero to fail the job if linting issues are detected
- golangci-lint run --issues-exit-code 0 --print-issued-lines=false --out-format code-climate:gl-code-quality-report.json,line-number
artifacts:
when: always
reports:
codequality: gl-code-quality-report.json
paths:
- gl-code-quality-report.json
coverage:
stage: test
image: golang:1.20-alpine
coverage: '/\(statements\)(?:\s+)?(\d+(?:\.\d+)?%)/'
extends: .go-cache
script:
- go run gotest.tools/gotestsum@latest --junitfile report.xml --format testname -- -coverprofile=coverage.txt -covermode count ./...
- go tool cover -func=coverage.txt
- go run github.com/boumenot/gocover-cobertura@master < coverage.txt > coverage.xml
artifacts:
reports:
junit: report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml
run:
deadline: 10m
skip-dirs:
- hack
linters:
disable-all: true
enable:
- gofmt
- errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- gocritic
- bodyclose
- gosec
- prealloc
- unconvert
- unused
- cyclop
linters-settings:
cyclop:
max-complexity: 20
gocritic:
# Which checks should be enabled; can't be combined with 'disabled-checks';
# See https://go-critic.github.io/overview#checks-overview
# To check which checks are enabled run `GL_DEBUG=gocritic ./build/bin/golangci-lint run`
# By default list of stable checks is used.
enabled-checks:
- ruleguard
- truncateCmp
# Which checks should be disabled; can't be combined with 'enabled-checks'; default is empty
disabled-checks:
- captLocal
- assignOp
- paramTypeCombine
- importShadow
- commentFormatting
# Enable multiple checks by tags, run `GL_DEBUG=gocritic golangci-lint run` to see all tags and checks.
# Empty list by default. See https://github.com/go-critic/go-critic#usage -> section "Tags".
enabled-tags:
- performance
- diagnostic
- opinionated
disabled-tags:
- experimental
settings:
hugeParam:
# size in bytes that makes the warning trigger (default 80)
sizeThreshold: 1000
rangeExprCopy:
# size in bytes that makes the warning trigger (default 512)
sizeThreshold: 512
# whether to check test functions (default true)
skipTestFuncs: true
truncateCmp:
# whether to skip int/uint/uintptr types (default true)
skipArchDependent: true
underef:
# whether to skip (*x).method() calls where x is a pointer receiver (default true)
skipRecvDeref: true
govet:
disable:
- deepequalerrors
- fieldalignment
- shadow
- unsafeptr
goconst:
min-len: 2
min-occurrences: 2
gofmt:
auto-fix: false
issues:
exclude-rules:
- linters:
- golint
text: "should be"
- linters:
- errcheck
text: "not checked"
- linters:
- staticcheck
text: "SA(1019|1029|5011)"
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <https://unlicense.org>
package benchmark
import (
"context"
"testing"
"gfx.cafe/open/jrpc/contrib/codecs/http"
"gfx.cafe/open/jrpc/contrib/codecs/inproc"
"gfx.cafe/open/jrpc/contrib/codecs/rdwr"
"gfx.cafe/open/jrpc/contrib/codecs/websocket"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
)
func runBenchmarkSuite(b *testing.B, sm jrpctest.ServerMaker) {
ctx := context.Background()
executeBench := jrpctest.BenchExecutor(sm)
var makeBench = func(name string, fm jrpctest.BenchContext) {
b.Run(name, func(b *testing.B) {
executeBench(b, fm)
})
}
makeBench("SingleClient", func(b *testing.B, server *server.Server, client codec.Conn) {
for i := 0; i < b.N; i++ {
err := client.Do(ctx, nil, "test_ping", nil)
if err != nil {
panic(err)
}
}
})
}
func BenchmarkSimpleSuite(b *testing.B) {
makers := map[string]jrpctest.ServerMaker{
"Http": http.ServerMaker,
"WebSocket": websocket.ServerMaker,
"InProc": inproc.ServerMaker,
"IoPipe": rdwr.ServerMaker,
}
for k, v := range makers {
b.Run(k, func(b *testing.B) {
runBenchmarkSuite(b, v)
})
}
}
package jrpc
import (
"testing"
"golang.org/x/sync/errgroup"
)
func BenchmarkClientHTTPEcho(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "http", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
wantBack := map[string]any{
"one": map[string]any{"two": "three"},
"e": map[string]any{"two": "three"},
"oe": map[string]any{"two": "three"},
"on": map[string]any{"two": "three"},
}
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", []any{1, 2, 3, 4, 56, 6, wantBack, wantBack, wantBack})
})
}
eg.Wait()
}
}
func BenchmarkClientHTTPEchoEmpty(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "http", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", 0)
})
}
eg.Wait()
}
}
func BenchmarkClientWebsocketEcho(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "ws", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
wantBack := map[string]any{
"one": map[string]any{"two": "three"},
"e": map[string]any{"two": "three"},
"oe": map[string]any{"two": "three"},
"on": map[string]any{"two": "three"},
}
payload := []any{1, 2, 3, 4, 56, 6, wantBack, wantBack, wantBack}
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", payload)
})
}
eg.Wait()
}
}
func BenchmarkClientWebsocketEchoEmpty(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "ws", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", 0)
})
}
eg.Wait()
}
}
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package jrpc
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"reflect"
"sync/atomic"
"time"
jsoniter "github.com/json-iterator/go"
"tuxpa.in/a/zlog/log"
)
var (
ErrClientQuit = errors.New("client is closed")
ErrNoResult = errors.New("no result in JSON-RPC response")
ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")
errClientReconnected = errors.New("client reconnected")
errDead = errors.New("connection lost")
)
const (
// Timeouts
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
)
// BatchElem is an element in a batch request.
type BatchElem struct {
Method string
Args []any
// The result is unmarshaled into this field. Result must be set to a
// non-nil pointer value of the desired type, otherwise the response will be
// discarded.
Result any
// Error is set if the server returns an error for this request, or if
// unmarshaling into Result fails. It is not set for I/O errors.
Error error
}
var _ SubscriptionConn = (*Client)(nil)
// Client represents a connection to an RPC server.
type Client struct {
isHTTP bool // connection type: http, ws or ipc
idCounter uint64
r Router
// This function, if non-nil, is called when the connection is lost.
reconnectFunc reconnectFunc
// writeConn is used for writing to the connection on the caller's goroutine. It should
// only be accessed outside of dispatch, with the write lock held. The write lock is
// taken by sending on reqInit and released by sending on reqSent.
writeConn JsonWriter
// for dispatch
close chan struct{}
closing chan struct{} // closed when client is quitting
didClose chan struct{} // closed when client quits
reconnected chan ServerCodec // where write/reconnect sends the new connection
readOp chan readOp // read messages
readErr chan error // errors from read
reqInit chan *requestOp // register response IDs, takes write lock
reqSent chan error // signals write completion, releases write lock
reqTimeout chan *requestOp // removes response IDs when call timeout expires
}
func (c *Client) Router() Router {
if c.r == nil {
c.r = NewMux()
}
return c.r
}
type reconnectFunc func(ctx context.Context) (ServerCodec, error)
type clientContextKey struct{}
type clientConn struct {
codec ServerCodec
handler *handler
}
func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.Background()
ctx = context.WithValue(ctx, clientContextKey{}, c)
ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.PeerInfo())
handler := newHandler(ctx, conn, c.r)
return &clientConn{conn, handler}
}
func (cc *clientConn) close(err error, inflightReq *requestOp) {
cc.handler.close(err, inflightReq)
cc.codec.Close()
}
type readOp struct {
msgs []*jsonrpcMessage
batch bool
}
type requestOp struct {
ids []json.RawMessage
err error
resp chan *jsonrpcMessage // receives up to len(ids) responses
sub *ClientSubscription
}
func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
select {
case <-ctx.Done():
// Send the timeout to dispatch so it can remove the request IDs.
if !c.isHTTP {
select {
case c.reqTimeout <- op:
case <-c.closing:
}
}
return nil, ctx.Err()
case resp := <-op.resp:
return resp, op.err
}
}
// Dial creates a new client for the given URL.
//
// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
// file name with no URL scheme, a local socket connection is established using UNIX
// domain sockets on supported platforms and named pipes on Windows. If you want to
// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
//
// For websocket connections, the origin is set to the local host name.
//
// The client reconnects automatically if the connection is lost.
func Dial(rawurl string) (*Client, error) {
return DialContext(context.Background(), rawurl)
}
// DialContext creates a new RPC client, just like Dial.
//
// The context is used to cancel or time out the initial connection establishment. It does
// not affect subsequent interactions with the client.
func DialContext(ctx context.Context, rawurl string) (*Client, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http", "https":
return DialHTTP(rawurl)
case "ws", "wss":
return DialWebsocket(ctx, rawurl, "")
case "tcp":
return DialTCP(ctx, rawurl)
case "stdio":
return DialStdIO(ctx)
case "":
return DialIPC(ctx, rawurl)
default:
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}
}
// ClientFromContext retrieves the client from the context, if any. This can be used to perform
// 'reverse calls' in a handler method.
func ClientFromContext(ctx context.Context) (*Client, bool) {
client, ok := ctx.Value(clientContextKey{}).(*Client)
return client, ok
}
func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) {
conn, err := connect(initctx)
if err != nil {
return nil, err
}
c := initClient(conn, NewMux())
c.reconnectFunc = connect
return c, nil
}
func initClient(conn ServerCodec, r Router) *Client {
_, isHTTP := conn.(*httpConn)
c := &Client{
r: r,
isHTTP: isHTTP,
writeConn: conn,
close: make(chan struct{}),
closing: make(chan struct{}),
didClose: make(chan struct{}),
reconnected: make(chan ServerCodec),
readOp: make(chan readOp),
readErr: make(chan error),
reqInit: make(chan *requestOp),
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
}
if !isHTTP {
go c.dispatch(conn)
}
return c
}
func (c *Client) nextID() *ID {
id := atomic.AddUint64(&c.idCounter, 1)
return NewNumberIDPtr(int64(id))
}
// SupportedModules calls the rpc_modules method, retrieving the list of
// APIs that are available on the server.
func (c *Client) SupportedModules() (map[string]string, error) {
var result map[string]string
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
err := c.Call(ctx, &result, "rpc_modules")
return result, err
}
// Close closes the client, aborting any in-flight requests.
func (c *Client) Close() error {
if c.isHTTP {
return nil
}
select {
case c.close <- struct{}{}:
<-c.didClose
case <-c.didClose:
}
return nil
}
// SetHeader adds a custom HTTP header to the client's requests.
// This method only works for clients using HTTP, it doesn't have
// any effect for clients using another transport.
func (c *Client) SetHeader(key, value string) {
if !c.isHTTP {
return
}
conn := c.writeConn.(*httpConn)
conn.mu.Lock()
conn.headers.Set(key, value)
conn.mu.Unlock()
}
func (c *Client) call(ctx context.Context, result any, msg *jsonrpcMessage) error {
var err error
op := &requestOp{ids: []json.RawMessage{msg.ID.RawMessage()}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
}
if err != nil {
return err
}
// dispatch has accepted the request and will close the channel when it quits.
resp, err := op.wait(ctx, c)
if err != nil {
return err
}
switch {
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return ErrNoResult
case result == nil:
return nil
default:
return json.Unmarshal(resp.Result, &result)
}
}
// Do performs a JSON-RPC call with the given arguments and unmarshals into
// result if no error occurred.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
}
msg, err := c.newMessageP(method, params)
if err != nil {
return err
}
if ctx == nil {
ctx = context.TODO()
}
return c.call(ctx, result, msg)
}
// Call calls Do, except accepts variadic parameters
func (c *Client) Call(ctx context.Context, result any, method string, args ...any) error {
return c.Do(ctx, result, method, args)
}
// BatchCall sends all given requests as a single batch and waits for the server
// to return a response for all of them.
//
// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
// a request is reported through the Error field of the corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *Client) BatchCall(ctx context.Context, b ...BatchElem) error {
var (
msgs = make([]*jsonrpcMessage, len(b))
byID = make(map[string]int, len(b))
)
if ctx == nil {
ctx = context.TODO()
}
op := &requestOp{
ids: make([]json.RawMessage, len(b)),
resp: make(chan *jsonrpcMessage, len(b)),
}
for i, elem := range b {
msg, err := c.newMessage(elem.Method, elem.Args...)
if err != nil {
return err
}
msgs[i] = msg
op.ids[i] = msg.ID.RawMessage()
byID[string(msg.ID.RawMessage())] = i
}
var err error
if c.isHTTP {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
err = c.send(ctx, op, msgs)
}
// Wait for all responses to come back.
for n := 0; n < len(b) && err == nil; n++ {
var resp *jsonrpcMessage
resp, err = op.wait(ctx, c)
if err != nil {
break
}
// Find the element corresponding to this response.
// The element is guaranteed to be present because dispatch
// only sends valid IDs to our channel.
elem := &b[byID[string(resp.ID.RawMessage())]]
if resp.Error != nil {
elem.Error = resp.Error
continue
}
if len(resp.Result) == 0 {
elem.Error = ErrNoResult
continue
}
elem.Error = json.Unmarshal(resp.Result, elem.Result)
}
return err
}
func (c *Client) Notify(ctx context.Context, method string, args ...any) error {
op := new(requestOp)
msg, err := c.newMessageP(method, args)
if err != nil {
return err
}
if ctx == nil {
ctx = context.TODO()
}
msg.ID = nil
if c.isHTTP {
return c.sendHTTP(ctx, op, msg)
}
return c.send(ctx, op, msg)
}
func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic("first argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID.RawMessage()},
resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, namespace, chanVal),
}
// Send the subscription request.
// The arrival and validity of the response is signaled on sub.quit.
if err := c.send(ctx, op, msg); err != nil {
return nil, err
}
if _, err := op.wait(ctx, c); err != nil {
return nil, err
}
return op.sub, nil
}
func (c *Client) newMessage(method string, paramsIn ...any) (*jsonrpcMessage, error) {
return c.newMessageP(method, paramsIn)
}
func (c *Client) newMessageP(method string, paramIn any) (*jsonrpcMessage, error) {
msg := &jsonrpcMessage{ID: c.nextID(), Method: method}
if paramIn != nil { // prevent sending "params":null
var err error
if msg.Params, err = jsoniter.Marshal(paramIn); err != nil {
return nil, err
}
}
return msg, nil
}
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg any) error {
select {
case c.reqInit <- op:
err := c.write(ctx, msg, false)
c.reqSent <- err
return err
case <-ctx.Done():
// This can happen if the client is overloaded or unable to keep up with
// subscription notifications.
return ctx.Err()
case <-c.closing:
return ErrClientQuit
}
}
func (c *Client) write(ctx context.Context, msg any, retry bool) error {
if c.writeConn == nil {
// The previous write failed. Try to establish a new connection.
// time.Sleep(500 * time.Millisecond)
err := c.reconnect(ctx)
if err != nil {
return err
}
}
err := c.writeConn.WriteJSON(ctx, msg)
if err != nil {
c.writeConn = nil
if !retry {
return c.write(ctx, msg, true)
}
}
return err
}
func (c *Client) reconnect(ctx context.Context) error {
if c.reconnectFunc == nil {
return errDead
}
if _, ok := ctx.Deadline(); !ok {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
}
newconn, err := c.reconnectFunc(ctx)
if err != nil {
log.Trace().Err(err).Msg("RPC client reconnect failed")
return err
}
select {
case c.reconnected <- newconn:
c.writeConn = newconn
return nil
case <-c.didClose:
newconn.Close()
return ErrClientQuit
}
}
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
func (c *Client) dispatch(codec ServerCodec) {
var (
lastOp *requestOp // tracks last send operation
reqInitLock = c.reqInit // nil while the send lock is held
conn = c.newClientConn(codec)
reading = true
)
defer func() {
close(c.closing)
if reading {
conn.close(ErrClientQuit, nil)
c.drainRead()
}
close(c.didClose)
}()
// Spawn the initial read loop.
go c.read(codec)
for {
select {
case <-c.close:
return
// Read path:
case op := <-c.readOp:
if op.batch {
conn.handler.handleBatch(op.msgs)
} else {
conn.handler.handleMsg(op.msgs[0])
}
case err := <-c.readErr:
conn.handler.log.Debug().Err(err).Msg("RPC connection read error")
conn.close(err, lastOp)
reading = false
// Reconnect:
case newcodec := <-c.reconnected:
log.Debug().Bool("reading", reading).Str("conn", newcodec.RemoteAddr()).Msg("RPC client reconnected")
if reading {
// Wait for the previous read loop to exit. This is a rare case which
// happens if this loop isn't notified in time after the connection breaks.
// In those cases the caller will notice first and reconnect. Closing the
// handler terminates all waiting requests (closing op.resp) except for
// lastOp, which will be transferred to the new handler.
conn.close(errClientReconnected, lastOp)
c.drainRead()
}
go c.read(newcodec)
reading = true
conn = c.newClientConn(newcodec)
// Re-register the in-flight request on the new handler
// because that's where it will be sent.
conn.handler.addRequestOp(lastOp)
// Send path:
case op := <-reqInitLock:
// Stop listening for further requests until the current one has been sent.
reqInitLock = nil
lastOp = op
conn.handler.addRequestOp(op)
case err := <-c.reqSent:
if err != nil {
// Remove response handlers for the last send. When the read loop
// goes down, it will signal all other current operations.
conn.handler.removeRequestOp(lastOp)
}
// Let the next request in.
reqInitLock = c.reqInit
lastOp = nil
case op := <-c.reqTimeout:
conn.handler.removeRequestOp(op)
}
}
}
// drainRead drops read messages until an error occurs.
func (c *Client) drainRead() {
for {
select {
case <-c.readOp:
case <-c.readErr:
return
}
}
}
// read decodes RPC messages from a codec, feeding them into dispatch.
func (c *Client) read(codec ServerCodec) {
for {
msgs, batch, err := codec.ReadBatch()
if _, ok := err.(*json.SyntaxError); ok {
codec.WriteJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}
if err != nil {
c.readErr <- err
return
}
c.readOp <- readOp{msgs, batch}
}
}
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package jrpc
import (
"context"
"fmt"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"runtime"
"sync"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"tuxpa.in/a/zlog"
"tuxpa.in/a/zlog/log"
)
func init() {
zlog.SetGlobalLevel(zlog.FatalLevel)
}
func TestClientRequest(t *testing.T) {
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()
var resp echoResult
if err := client.Call(nil, &resp, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(resp, echoResult{"hello", 10, &echoArgs{"world"}}) {
t.Errorf("incorrect result %#v", resp)
}
}
func TestClientResponseType(t *testing.T) {
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()
if err := client.Call(nil, nil, "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
t.Errorf("Passing nil as result should be fine, but got an error: %v", err)
}
var resultVar echoResult
// Note: passing the var, not a ref
err := client.Call(nil, resultVar, "test_echo", "hello", 10, &echoArgs{"world"})
if err == nil {
t.Error("Passing a var as result should be an error")
}
}
// This test checks that server-returned errors with code and data come out of Client.Call.
func TestClientErrorData(t *testing.T) {
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()
var resp any
err := client.Call(nil, &resp, "test_returnError")
if err == nil {
t.Fatal("expected error")
}
// Check code.
if e, ok := err.(Error); !ok {
t.Fatalf("client did not return rpc.Error, got %#v", e)
} else if e.ErrorCode() != (testError{}.ErrorCode()) {
t.Fatalf("wrong error code %d, want %d", e.ErrorCode(), testError{}.ErrorCode())
}
// Check data.
if e, ok := err.(DataError); !ok {
t.Fatalf("client did not return rpc.DataError, got %#v", e)
} else if e.ErrorData() != (testError{}.ErrorData()) {
t.Fatalf("wrong error data %#v, want %#v", e.ErrorData(), testError{}.ErrorData())
}
}
func TestClientBatchRequest(t *testing.T) {
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()
batch := []BatchElem{
{
Method: "test_echo",
Args: []any{"hello", 10, &echoArgs{"world"}},
Result: new(echoResult),
},
{
Method: "test_echo",
Args: []any{"hello2", 11, &echoArgs{"world"}},
Result: new(echoResult),
},
{
Method: "no_such_method",
Args: []any{1, 2, 3},
Result: new(int),
},
}
if err := client.BatchCall(nil, batch...); err != nil {
t.Fatal(err)
}
wantResult := []BatchElem{
{
Method: "test_echo",
Args: []any{"hello", 10, &echoArgs{"world"}},
Result: &echoResult{"hello", 10, &echoArgs{"world"}},
},
{
Method: "test_echo",
Args: []any{"hello2", 11, &echoArgs{"world"}},
Result: &echoResult{"hello2", 11, &echoArgs{"world"}},
},
{
Method: "no_such_method",
Args: []any{1, 2, 3},
Result: new(int),
Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"},
},
}
if !reflect.DeepEqual(batch, wantResult) {
t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
}
}
func TestClientNotify(t *testing.T) {
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()
if err := client.Notify(context.Background(), "test_echo", "hello", 10, &echoArgs{"world"}); err != nil {
t.Fatal(err)
}
}
// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
// This test checks that requests made through Call can be canceled by canceling
// the context.
func testClientCancel(transport string, t *testing.T) {
// These tests take a lot of time, run them all at once.
// You probably want to run with -parallel 1 or comment out
// the call to t.Parallel if you enable the logging.
t.Parallel()
server := newTestServer()
defer server.Stop()
// What we want to achieve is that the context gets canceled
// at various stages of request processing. The interesting cases
// are:
// - cancel during dial
// - cancel while performing a HTTP request
// - cancel while waiting for a response
//
// To trigger those, the times are chosen such that connections
// are killed within the deadline for every other call (maxKillTimeout
// is 2x maxCancelTimeout).
//
// Once a connection is dead, there is a fair chance it won't connect
// successfully because the accept is delayed by 1s.
maxContextCancelTimeout := 300 * time.Millisecond
fl := &flakeyListener{
maxAcceptDelay: 1 * time.Second,
maxKillTimeout: 600 * time.Millisecond,
}
var client *Client
switch transport {
case "ws", "http":
c, hs := httpTestClient(server, transport, fl)
defer hs.Close()
client = c
case "ipc":
c, l := ipcTestClient(server, fl)
defer l.Close()
client = c
default:
panic("unknown transport: " + transport)
}
// The actual test starts here.
var (
wg sync.WaitGroup
nreqs = 10
ncallers = 10
)
caller := func(index int) {
defer wg.Done()
for i := 0; i < nreqs; i++ {
var (
ctx context.Context
cancel func()
timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
)
if index < ncallers/2 {
// For half of the callers, create a context without deadline
// and cancel it later.
ctx, cancel = context.WithCancel(context.Background())
time.AfterFunc(timeout, cancel)
} else {
// For the other half, create a context with a deadline instead. This is
// different because the context deadline is used to set the socket write
// deadline.
ctx, cancel = context.WithTimeout(context.Background(), timeout)
}
// Now perform a call with the context.
// The key thing here is that no call will ever complete successfully.
err := client.Call(ctx, nil, "test_block")
switch {
case err == nil:
_, hasDeadline := ctx.Deadline()
t.Errorf("no error for call with %v wait time (deadline: %v)", timeout, hasDeadline)
// default:
// t.Logf("got expected error with %v wait time: %v", timeout, err)
}
cancel()
}
}
wg.Add(ncallers)
for i := 0; i < ncallers; i++ {
go caller(i)
}
wg.Wait()
}
func TestClientSetHeader(t *testing.T) {
var gotHeader bool
srv := newTestServer()
httpsrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("test") == "ok" {
gotHeader = true
}
srv.ServeHTTP(w, r)
}))
defer httpsrv.Close()
defer srv.Stop()
client, err := Dial(httpsrv.URL)
if err != nil {
t.Fatal(err)
}
defer client.Close()
client.SetHeader("test", "ok")
if _, err := client.SupportedModules(); err != nil {
t.Fatal(err)
}
if !gotHeader {
t.Fatal("client did not set custom header")
}
//NOTE: this test is removed because we accept invalid content types
// Check that Content-Type can be replaced.
//client.SetHeader("content-type", "application/x-garbage")
//_, err = client.SupportedModules()
//if err == nil {
// t.Fatal("no error for invalid content-type header")
//} else if !strings.Contains(err.Error(), "Unsupported Media Type") {
// t.Fatalf("error is not related to content-type: %q", err)
//}
}
func TestClientHTTP(t *testing.T) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "http", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
var (
results = make([]echoResult, 100)
errc = make(chan error, len(results))
wantResult = echoResult{"a", 1, new(echoArgs)}
)
defer client.Close()
for i := range results {
i := i
go func() {
errc <- client.Call(nil, &results[i], "test_echo", wantResult.String, wantResult.Int, wantResult.Args)
}()
}
// Wait for all of them to complete.
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
for i := range results {
select {
case err := <-errc:
if err != nil {
t.Fatal(err)
}
case <-timeout.C:
t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
}
}
// Check results.
for i := range results {
if !reflect.DeepEqual(results[i], wantResult) {
t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
}
}
}
func TestClientReconnect(t *testing.T) {
startServer := func(addr string) (*Server, net.Listener) {
srv := newTestServer()
l, err := net.Listen("tcp", addr)
if err != nil {
t.Fatal("can't listen:", err)
}
go http.Serve(l, srv.WebsocketHandler([]string{"*"}))
return srv, l
}
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()
// Start a server and corresponding client.
s1, l1 := startServer("127.0.0.1:0")
client, err := DialContext(ctx, "ws://"+l1.Addr().String())
if err != nil {
t.Fatal("can't dial", err)
}
defer client.Close()
// Perform a call. This should work because the server is up.
var resp echoResult
if err := client.Call(ctx, &resp, "test_echo", "", 1, nil); err != nil {
t.Fatal(err)
}
// Shut down the server and allow for some cool down time so we can listen on the same
// address again.
l1.Close()
s1.Stop()
time.Sleep(2 * time.Second)
// Try calling again. It shouldn't work.
if err := client.Call(ctx, &resp, "test_echo", "", 2, nil); err == nil {
t.Error("successful call while the server is down")
t.Logf("resp: %#v", resp)
}
// Start it up again and call again. The connection should be reestablished.
// We spawn multiple calls here to check whether this hangs somehow.
s2, l2 := startServer(l1.Addr().String())
defer l2.Close()
defer s2.Stop()
start := make(chan struct{})
errors := make(chan error, 20)
for i := 0; i < cap(errors); i++ {
go func() {
<-start
var resp echoResult
errors <- client.Call(ctx, &resp, "test_echo", "", 3, nil)
}()
}
close(start)
errcount := 0
for i := 0; i < cap(errors); i++ {
if err = <-errors; err != nil {
errcount++
}
}
t.Logf("%d errors, last error: %v", errcount, err)
if errcount > 1 {
t.Errorf("expected one error after disconnect, got %d", errcount)
}
}
func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
// Create the HTTP server.
var hs *httptest.Server
switch transport {
case "ws":
hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"}))
case "http":
hs = httptest.NewUnstartedServer(srv)
default:
panic("unknown HTTP transport: " + transport)
}
// Wrap the listener if required.
if fl != nil {
fl.Listener = hs.Listener
hs.Listener = fl
}
// Connect the client.
hs.Start()
client, err := Dial(transport + "://" + hs.Listener.Addr().String())
if err != nil {
panic(err)
}
return client, hs
}
func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
// Listen on a random endpoint.
endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
if runtime.GOOS == "windows" {
endpoint = `\\.\pipe\` + endpoint
} else {
endpoint = os.TempDir() + "/" + endpoint
}
l, err := ipcListen(endpoint)
if err != nil {
panic(err)
}
// Connect the listener to the server.
if fl != nil {
fl.Listener = l
l = fl
}
go srv.ServeListener(l)
// Connect the client.
client, err := Dial(endpoint)
if err != nil {
panic(err)
}
return client, l
}
// flakeyListener kills accepted connections after a random timeout.
type flakeyListener struct {
net.Listener
maxKillTimeout time.Duration
maxAcceptDelay time.Duration
}
func (l *flakeyListener) Accept() (net.Conn, error) {
delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
time.Sleep(delay)
c, err := l.Listener.Accept()
if err == nil {
timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
time.AfterFunc(timeout, func() {
log.Debug().Msg(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout))
c.Close()
})
}
return c, err
}
package main
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"gfx.cafe/open/jrpc"
"sigs.k8s.io/yaml"
)
func main() {
scan := bufio.NewReader(os.Stdin)
ctx := context.Background()
c := &cli{}
c.silent(`commands:
[m]ethod, [d]ial, [s]end, [p]arams, [c]lient, [q]uit
`)
for {
if err := c.tick(ctx, scan); err != nil {
if !errors.Is(err, context.Canceled) {
c.silent("%s", err)
}
break
}
}
}
type cli struct {
remote string
method string
params json.RawMessage
verbose bool
conn jrpc.Conn
}
func (c *cli) silent(s string, args ...any) {
if c.verbose {
fmt.Printf(s+"\n", args...)
}
}
func (c *cli) tick(ctx context.Context, scan *bufio.Reader) error {
cmd, err := scan.ReadString('\n')
if err != nil {
return err
}
cmd = strings.TrimSpace(cmd)
if cmd == "" {
return nil
}
splt := strings.SplitN(cmd, " ", 2)
if len(splt) == 0 {
return nil
}
switch splt[0] {
case "d", "dial":
if len(splt) == 1 {
c.silent("need to specify url to dial to")
return nil
}
c.conn, err = jrpc.DialContext(ctx, splt[1])
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
c.remote = splt[1]
c.silent("connected to %s", splt[1])
case "m", "method":
if len(splt) == 1 {
fmt.Println("need to specify method to use")
return nil
}
c.method = splt[1]
c.silent("method to %s", splt[1])
case "p", "params":
nextExit := false
bts := []byte(splt[1])
for {
tmp, err := scan.ReadBytes('\n')
if err != nil {
return err
}
if len(tmp) == 1 {
if nextExit {
break
}
nextExit = true
continue
}
bts = append(bts, tmp...)
}
var m any
err = yaml.Unmarshal(bts, &m)
if err != nil {
return err
}
c.params, _ = json.Marshal(m)
case "c", "client":
fmt.Printf(
`
remote: %s
method: %s
params: %s
`, c.remote, c.method, c.params)
case "s", "send":
var res json.RawMessage
c.conn.Do(ctx, &res, c.method, c.params)
fmt.Println(string(res))
case "q", "quit", "exit":
os.Exit(0)
}
return nil
}
d wss://mainnet.rpc.gfx.xyz
m eth_getBlockByNumber
p [1500000, true]
send
package jrpc
import "context"
var _ Conn = (*Client)(nil)
type Conn interface {
Do(ctx context.Context, result any, method string, params any) error
BatchCall(ctx context.Context, b ...BatchElem) error
SetHeader(key, value string)
Close() error
}
type SubscriptionConn interface {
Conn
Notify(ctx context.Context, method string, args ...any) error
Subscribe(ctx context.Context, namespace string, channel any, args ...any) (*ClientSubscription, error)
}
package client
import (
"context"
"net"
"sync"
"sync/atomic"
"gfx.cafe/open/jrpc"
"gfx.cafe/open/jrpc/pkg/codec"
)
var _ codec.Conn = (*Pooling)(nil)
type Pooling struct {
dialer func(ctx context.Context) (jrpc.Conn, error)
conns chan codec.Conn
base codec.Conn
closed atomic.Bool
middleware []codec.Middleware
mu sync.Mutex
}
func NewPooling(dialer func(ctx context.Context) (jrpc.Conn, error), max int) *Pooling {
r := &Pooling{
dialer: dialer,
conns: make(chan codec.Conn, max),
}
return r
}
func (r *Pooling) Do(ctx context.Context, result any, method string, params any) error {
if r.closed.Load() {
return net.ErrClosed
}
errChan := make(chan error)
go func() {
conn, err := r.getClient(ctx)
if err != nil {
errChan <- err
return
}
defer r.putClient(conn)
errChan <- conn.Do(ctx, result, method, params)
}()
return <-errChan
}
func (r *Pooling) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
if r.closed.Load() {
return net.ErrClosed
}
errChan := make(chan error)
go func() {
conn, err := r.getClient(ctx)
if err != nil {
errChan <- err
return
}
defer r.putClient(conn)
errChan <- conn.BatchCall(ctx, b...)
}()
return <-errChan
}
func (p *Pooling) Mount(m codec.Middleware) {
p.middleware = append(p.middleware, m)
}
func (p *Pooling) Close() error {
if p.closed.CompareAndSwap(false, true) {
for k := range p.conns {
k.Close()
}
}
return nil
}
func (p *Pooling) Closed() <-chan struct{} {
return make(<-chan struct{})
}
func (r *Pooling) getClient(ctx context.Context) (jrpc.Conn, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-r.conns:
default:
}
conn, err := r.dialer(ctx)
if err != nil {
return nil, err
}
return conn, nil
}
func (r *Pooling) putClient(conn jrpc.Conn) {
if r.closed.Load() {
return
}
select {
case <-conn.Closed():
default:
}
select {
case r.conns <- conn:
default:
conn.Close()
}
}
package client
import (
"context"
"sync"
"gfx.cafe/open/jrpc"
"gfx.cafe/open/jrpc/pkg/codec"
)
var _ jrpc.Conn = (*Reconnecting)(nil)
type Reconnecting struct {
dialer func(ctx context.Context) (jrpc.Conn, error)
base codec.Conn
alive bool
middleware []codec.Middleware
mu sync.Mutex
}
func NewReconnecting(dialer func(ctx context.Context) (jrpc.Conn, error)) *Reconnecting {
r := &Reconnecting{
dialer: dialer,
}
return r
}
func (r *Reconnecting) getClient(ctx context.Context) (jrpc.Conn, error) {
reconnect := func() error {
conn, err := r.dialer(ctx)
if err != nil {
return err
}
r.base = conn
r.alive = true
return nil
}
r.mu.Lock()
defer r.mu.Unlock()
if r.base == nil {
err := reconnect()
if err != nil {
return nil, err
}
} else {
select {
case <-r.base.Closed():
err := reconnect()
if err != nil {
return nil, err
}
default:
}
}
return r.base, nil
}
func (r *Reconnecting) Do(ctx context.Context, result any, method string, params any) error {
errChan := make(chan error)
go func() {
conn, err := r.getClient(ctx)
if err != nil {
errChan <- err
return
}
errChan <- conn.Do(ctx, result, method, params)
}()
return <-errChan
}
func (r *Reconnecting) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
errChan := make(chan error)
go func() {
conn, err := r.getClient(ctx)
if err != nil {
errChan <- err
return
}
errChan <- conn.BatchCall(ctx, b...)
}()
return <-errChan
}
func (r *Reconnecting) Mount(m codec.Middleware) {
r.middleware = append(r.middleware, m)
}
// why would you want to do this....
func (r *Reconnecting) Close() error {
conn, err := r.getClient(context.Background())
if err != nil {
return err
}
return conn.Close()
}
// never....
func (r *Reconnecting) Closed() <-chan struct{} {
return make(<-chan struct{})
}
package broker
import (
"context"
"encoding/json"
)
type ServerSpoke interface {
ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error)
}
type ClientSpoke interface {
WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error
Subscribe(ctx context.Context, clientId string) (Subscription, error)
}
type Broker interface {
ServerSpoke
ClientSpoke
}
type Subscription interface {
// channel that will close when done or error
Listen() <-chan json.RawMessage
// should close the channel and also stop listening
Close() error
// this hold errors
Err() error
}
package broker
import (
"context"
"encoding/json"
"strings"
"sync"
"sync/atomic"
"tuxpa.in/a/zlog/log"
)
type subscription struct {
topic string
ch chan json.RawMessage
err error
closed atomic.Bool
mu sync.RWMutex
}
// channel that will close when done or error
func (s *subscription) Listen() <-chan json.RawMessage {
return s.ch
}
// should close the channel and also stop listening
func (s *subscription) Close() error {
s.closed.CompareAndSwap(false, true)
return nil
}
// this hold errors
func (s *subscription) Err() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.err
}
type frame struct {
topic string
data json.RawMessage
}
type ChannelBroker struct {
mu sync.RWMutex
subs map[int]*subscription
subCount int
msgs chan *frame
domain string
}
func NewChannelBroker() *ChannelBroker {
return &ChannelBroker{
subs: map[int]*subscription{},
msgs: make(chan *frame, 128),
}
}
func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case f := <-b.msgs:
return f.data, func(resp json.RawMessage) error {
return b.Publish(context.Background(), f.topic, resp)
}, nil
}
}
func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json.RawMessage) error {
select {
case <-ctx.Done():
return ctx.Err()
case b.msgs <- &frame{data: msg, topic: topic}:
}
return nil
}
func (b *ChannelBroker) Publish(ctx context.Context, topic string, data []byte) error {
b.mu.RLock()
defer b.mu.RUnlock()
for _, v := range b.subs {
if v.closed.Load() {
continue
}
if childTopicMatchesParent(v.topic, topic) {
select {
case v.ch <- data:
default:
log.Trace().Str("topic", topic).Msg("dropped message")
}
}
}
return nil
}
func (b *ChannelBroker) Subscribe(ctx context.Context, topic string) (Subscription, error) {
sub := &subscription{
topic: topic,
ch: make(chan json.RawMessage, 16),
}
b.mu.Lock()
b.subCount = b.subCount + 1
id := b.subCount
b.subs[id] = sub
b.mu.Unlock()
// gc after adding a new subscription
b.gc()
return sub, nil
}
func (b *ChannelBroker) gc() {
b.mu.Lock()
defer b.mu.Unlock()
for k, v := range b.subs {
if v.closed.Load() {
delete(b.subs, k)
}
}
}
// This is to see if a message with topic child should be matched with subscription parent
// so the child cannot contain wildcards * and >, but the parent can
func childTopicMatchesParent(parentString string, childString string) bool {
parent := strings.Split(parentString, ".")
child := strings.Split(childString, ".")
// if the length of the child is less than the length of the parent, its not possible for match
// for instance, if the parent topic is "one.two", and the child is "one", it will never match
if len(child) < len(parent) {
return false
}
// this is safe because length of child must be equal to or lower than parent, from previous
for idx, v := range parent {
// if parent is wildcard, match all, so continue
if v == "*" {
continue
}
// if the > wildcard is at the end, and we have exited since then, we are done
if v == ">" && len(parent)-1 == idx {
return true
}
// else make sure parent matches child.
if child[idx] != v {
return false
}
}
if len(child) == len(parent) {
return true
}
return false
}
package nats
import (
"context"
"testing"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/server"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
redisServer := miniredis.RunT(t)
connOpts := &redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
}
ctx := redisServer.Ctx
ctx, cn := context.WithCancel(ctx)
b := CreateBroker(ctx, "jrpc", connOpts)
s := jrpctest.NewServer()
spokeServer := (&broker.Server{Server: s})
go spokeServer.ServeSpoke(ctx, b)
return s, func() codec.Conn {
return broker.NewClient(b)
}, func() {
cn()
redisServer.CtxCancel()
}
},
})
}
package nats
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"
"unsafe"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"github.com/redis/go-redis/v9"
"github.com/rs/xid"
)
type Broker struct {
client redis.UniversalClient
domain string
id xid.ID
}
type subscription struct {
ch chan json.RawMessage
err error
closed atomic.Bool
mu sync.RWMutex
pubsub *redis.PubSub
}
// channel that will close when done or error
func (s *subscription) Listen() <-chan json.RawMessage {
return s.ch
}
// should close the channel and also stop listening
func (s *subscription) Close() error {
if s.closed.CompareAndSwap(false, true) {
s.pubsub.Close()
}
return nil
}
// this hold errors
func (s *subscription) Err() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.err
}
func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error {
req, err := json.Marshal(&RedisRequest{
ReplyChannel: clientId,
Message: msg,
})
if err != nil {
return err
}
return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err()
}
func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) {
topic := s.domain + "." + clientId + respDomainSuffix
sub := &subscription{
ch: make(chan json.RawMessage, 16),
}
sub.pubsub = s.client.Subscribe(ctx, topic)
ch := sub.pubsub.Channel()
go func() {
for {
select {
case <-ctx.Done():
sub.Close()
return
case t := <-ch:
select {
case sub.ch <- json.RawMessage(stringToBytes(t.Payload)):
default:
}
}
}
}()
return sub, nil
}
func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker {
c := redis.NewUniversalClient(opts)
// the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways.
s := &Broker{
client: c,
id: xid.New(),
domain: domain,
}
return s
}
type RedisRequest struct {
ReplyChannel string `json:"r"`
Message json.RawMessage `json:"msg"`
}
func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
timeout := time.Hour
res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result()
if err != nil {
return nil, nil, err
}
if len(res) != 2 {
return nil, nil, err
}
redisReq := &RedisRequest{}
err = json.Unmarshal(stringToBytes(res[1]), redisReq)
if err != nil {
return nil, nil, err
}
return redisReq.Message, func(rm json.RawMessage) error {
if len(rm) == 0 {
return nil
}
target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix
err := s.client.Publish(context.Background(), target, []byte(rm)).Err()
if err != nil {
return err
}
return nil
}, nil
}
const reqDomainSuffix = ".req"
const respDomainSuffix = ".resp"
// stringHeader is the runtime representation of a string.
// It should be identical to reflect.StringHeader
type stringHeader struct {
data unsafe.Pointer
stringLen int
}
// sliceHeader is the runtime representation of a slice.
// It should be identical to reflect.sliceHeader
type sliceHeader struct {
data unsafe.Pointer
sliceLen int
sliceCap int
}
func stringToBytes(s string) (b []byte) {
stringHeader := (*stringHeader)(unsafe.Pointer(&s))
sliceHeader := (*sliceHeader)(unsafe.Pointer(&b))
sliceHeader.data = stringHeader.data
sliceHeader.sliceLen = len(s)
sliceHeader.sliceCap = len(s)
return b
}
package redis
import (
"testing"
"gfx.cafe/open/jrpc/pkg/jrpctest"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: ServerMaker,
})
}
package redis
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"
"unsafe"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"github.com/redis/go-redis/v9"
"github.com/rs/xid"
)
type Broker struct {
client redis.UniversalClient
domain string
id xid.ID
}
type subscription struct {
ch chan json.RawMessage
err error
closed atomic.Bool
mu sync.RWMutex
pubsub *redis.PubSub
}
// channel that will close when done or error
func (s *subscription) Listen() <-chan json.RawMessage {
return s.ch
}
// should close the channel and also stop listening
func (s *subscription) Close() error {
if s.closed.CompareAndSwap(false, true) {
s.pubsub.Close()
}
return nil
}
// this hold errors
func (s *subscription) Err() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.err
}
func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error {
req, err := json.Marshal(&RedisRequest{
ReplyChannel: clientId,
Message: msg,
})
if err != nil {
return err
}
return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err()
}
func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) {
topic := s.domain + "." + clientId + respDomainSuffix
sub := &subscription{
ch: make(chan json.RawMessage, 16),
}
sub.pubsub = s.client.Subscribe(ctx, topic)
ch := sub.pubsub.Channel()
go func() {
for {
select {
case <-ctx.Done():
sub.Close()
return
case t := <-ch:
select {
case sub.ch <- json.RawMessage(stringToBytes(t.Payload)):
default:
}
}
}
}()
return sub, nil
}
func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker {
c := redis.NewUniversalClient(opts)
// the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways.
s := &Broker{
client: c,
id: xid.New(),
domain: domain,
}
return s
}
type RedisRequest struct {
ReplyChannel string `json:"r"`
Message json.RawMessage `json:"msg"`
}
func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
timeout := time.Second
res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result()
if err != nil {
return nil, nil, err
}
if len(res) != 2 {
return nil, nil, err
}
redisReq := &RedisRequest{}
err = json.Unmarshal(stringToBytes(res[1]), redisReq)
if err != nil {
return nil, nil, err
}
return redisReq.Message, func(rm json.RawMessage) error {
if len(rm) == 0 {
return nil
}
target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix
err := s.client.Publish(context.Background(), target, []byte(rm)).Err()
if err != nil {
return err
}
return nil
}, nil
}
const reqDomainSuffix = ".req"
const respDomainSuffix = ".resp"
// stringHeader is the runtime representation of a string.
// It should be identical to reflect.StringHeader
type stringHeader struct {
data unsafe.Pointer
stringLen int
}
// sliceHeader is the runtime representation of a slice.
// It should be identical to reflect.sliceHeader
type sliceHeader struct {
data unsafe.Pointer
sliceLen int
sliceCap int
}
func stringToBytes(s string) (b []byte) {
stringHeader := (*stringHeader)(unsafe.Pointer(&s))
sliceHeader := (*sliceHeader)(unsafe.Pointer(&b))
sliceHeader.data = stringHeader.data
sliceHeader.sliceLen = len(s)
sliceHeader.sliceCap = len(s)
return b
}
package redis
import (
"context"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
)
func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
redisServer, err := miniredis.Run()
if err != nil {
panic(err)
}
connOpts := &redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
}
ctx := redisServer.Ctx
ctx, cn := context.WithCancel(ctx)
b := CreateBroker(ctx, "jrpc", connOpts)
s := jrpctest.NewServer()
spokeServer := (&broker.Server{Server: s})
go spokeServer.ServeSpoke(ctx, b)
return s, func() codec.Conn {
return broker.NewClient(b)
}, func() {
cn()
redisServer.CtxCancel()
}
}