good morning!!!!

Skip to content
Commits on Source (26)
.go-cache:
variables:
GOPATH: $CI_PROJECT_DIR/.go
before_script:
- mkdir -p .go
cache:
paths:
- .go/pkg/mod/
stages:
- test
lint:
image: registry.gitlab.com/gitlab-org/gitlab-build-images:golangci-lint-alpine
stage: test
extends: .go-cache
script:
# Use default .golangci.yml file from the image if one is not present in the project root.
- "[ -e .golangci.yml ] || cp /golangci/.golangci.yml ."
# Write the code coverage report to gl-code-quality-report.json
# and print linting issues to stdout in the format: path/to/file:line description
# remove `--issues-exit-code 0` or set to non-zero to fail the job if linting issues are detected
- golangci-lint run --issues-exit-code 0 --print-issued-lines=false --out-format code-climate:gl-code-quality-report.json,line-number
artifacts:
when: always
reports:
codequality: gl-code-quality-report.json
paths:
- gl-code-quality-report.json
coverage:
stage: test
image: golang:1.20-alpine
coverage: '/\(statements\)(?:\s+)?(\d+(?:\.\d+)?%)/'
extends: .go-cache
script:
- go run gotest.tools/gotestsum@latest --junitfile report.xml --format testname -- -coverprofile=coverage.txt -covermode count ./...
- go tool cover -func=coverage.txt
- go run github.com/boumenot/gocover-cobertura@master < coverage.txt > coverage.xml
artifacts:
reports:
junit: report.xml
coverage_report:
coverage_format: cobertura
path: coverage.xml
run:
deadline: 10m
skip-dirs:
- hack
linters:
disable-all: true
enable:
- gofmt
- errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- gocritic
- bodyclose
- gosec
- prealloc
- unconvert
- unused
- cyclop
linters-settings:
cyclop:
max-complexity: 20
gocritic:
# Which checks should be enabled; can't be combined with 'disabled-checks';
# See https://go-critic.github.io/overview#checks-overview
# To check which checks are enabled run `GL_DEBUG=gocritic ./build/bin/golangci-lint run`
# By default list of stable checks is used.
enabled-checks:
- ruleguard
- truncateCmp
# Which checks should be disabled; can't be combined with 'enabled-checks'; default is empty
disabled-checks:
- captLocal
- assignOp
- paramTypeCombine
- importShadow
- commentFormatting
# Enable multiple checks by tags, run `GL_DEBUG=gocritic golangci-lint run` to see all tags and checks.
# Empty list by default. See https://github.com/go-critic/go-critic#usage -> section "Tags".
enabled-tags:
- performance
- diagnostic
- opinionated
disabled-tags:
- experimental
settings:
hugeParam:
# size in bytes that makes the warning trigger (default 80)
sizeThreshold: 1000
rangeExprCopy:
# size in bytes that makes the warning trigger (default 512)
sizeThreshold: 512
# whether to check test functions (default true)
skipTestFuncs: true
truncateCmp:
# whether to skip int/uint/uintptr types (default true)
skipArchDependent: true
underef:
# whether to skip (*x).method() calls where x is a pointer receiver (default true)
skipRecvDeref: true
govet:
disable:
- deepequalerrors
- fieldalignment
- shadow
- unsafeptr
goconst:
min-len: 2
min-occurrences: 2
gofmt:
auto-fix: false
issues:
exclude-rules:
- linters:
- golint
text: "should be"
- linters:
- errcheck
text: "not checked"
- linters:
- staticcheck
text: "SA(1019|1029|5011)"
package jrpc
import (
"testing"
"golang.org/x/sync/errgroup"
)
func BenchmarkClientHTTPEcho(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "http", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
wantBack := map[string]any{
"one": map[string]any{"two": "three"},
"e": map[string]any{"two": "three"},
"oe": map[string]any{"two": "three"},
"on": map[string]any{"two": "three"},
}
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", []any{1, 2, 3, 4, 56, 6, wantBack, wantBack, wantBack})
})
}
eg.Wait()
}
}
func BenchmarkClientHTTPEchoEmpty(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "http", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", 0)
})
}
eg.Wait()
}
}
func BenchmarkClientWebsocketEcho(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "ws", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
wantBack := map[string]any{
"one": map[string]any{"two": "three"},
"e": map[string]any{"two": "three"},
"oe": map[string]any{"two": "three"},
"on": map[string]any{"two": "three"},
}
payload := []any{1, 2, 3, 4, 56, 6, wantBack, wantBack, wantBack}
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", payload)
})
}
eg.Wait()
}
}
func BenchmarkClientWebsocketEchoEmpty(b *testing.B) {
server := newTestServer()
defer server.Stop()
client, hs := httpTestClient(server, "ws", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
b.StartTimer()
for n := 0; n < b.N; n++ {
eg := &errgroup.Group{}
eg.SetLimit(4)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
return client.Call(nil, nil, "test_echoAny", 0)
})
}
eg.Wait()
}
}
......@@ -10,7 +10,6 @@ import (
"net/http"
"sync"
"sync/atomic"
"time"
"gfx.cafe/open/jrpc/pkg/codec"
......@@ -27,12 +26,6 @@ var (
errDead = errors.New("connection lost")
)
const (
// Timeouts
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
)
var _ codec.Conn = (*Client)(nil)
// Client represents a connection to an RPC server.
......@@ -74,7 +67,10 @@ func (c *Client) SetHeader(key string, value string) {
}
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
req := codec.NewRequestInt(ctx, int(c.id.Add(1)), method, params)
req, err := codec.NewRequest(ctx, codec.NewId(c.id.Add(1)), method, params)
if err != nil {
return err
}
resp, err := c.post(req)
if err != nil {
return err
......@@ -129,7 +125,10 @@ func (c *Client) post(req *codec.Request) (*http.Response, error) {
}
func (c *Client) Notify(ctx context.Context, method string, params any) error {
req := codec.NewNotification(ctx, method, params)
req, err := codec.NewRequest(ctx, nil, method, params)
if err != nil {
return err
}
resp, err := c.post(req)
if err != nil {
return err
......@@ -142,13 +141,18 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
reqs := make([]*codec.Request, len(b))
ids := make(map[int]int, len(b))
for idx, v := range b {
var rid *codec.ID
if v.IsNotification {
reqs = append(reqs, codec.NewRequest(ctx, "", v.Method, v.Params))
} else {
id := int(c.id.Add(1))
ids[idx] = id
reqs = append(reqs, codec.NewRequestInt(ctx, id, v.Method, v.Params))
rid = codec.NewNumberIDPtr(int64(id))
}
req, err := codec.NewRequest(ctx, rid, v.Method, v.Params)
if err != nil {
return err
}
reqs = append(reqs, req)
}
dat, err := json.Marshal(reqs)
if err != nil {
......
......@@ -4,7 +4,6 @@ import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
......@@ -14,8 +13,11 @@ import (
"strings"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/serverutil"
)
var _ codec.ReaderWriter = (*Codec)(nil)
type Codec struct {
ctx context.Context
cn func()
......@@ -23,7 +25,7 @@ type Codec struct {
r *http.Request
w http.ResponseWriter
wr *bufio.Writer
msgs chan json.RawMessage
msgs chan *serverutil.Bundle
errCh chan httpError
i codec.PeerInfo
......@@ -35,11 +37,15 @@ type httpError struct {
}
func NewCodec(w http.ResponseWriter, r *http.Request) *Codec {
ir := io.Writer(w)
if w == nil {
ir = io.Discard
}
c := &Codec{
r: r,
w: w,
wr: bufio.NewWriter(w),
msgs: make(chan json.RawMessage, 1),
wr: bufio.NewWriter(ir),
msgs: make(chan *serverutil.Bundle, 1),
errCh: make(chan httpError, 1),
}
ctx := r.Context()
......@@ -74,7 +80,7 @@ func (c *Codec) PeerInfo() codec.PeerInfo {
return c.i
}
func (r *Codec) doReadGet() (msgs json.RawMessage, err error) {
func (r *Codec) doReadGet() (msg *serverutil.Bundle, err error) {
method_up := r.r.URL.Query().Get("method")
if method_up == "" {
method_up = r.r.URL.Path
......@@ -88,11 +94,17 @@ func (r *Codec) doReadGet() (msgs json.RawMessage, err error) {
if id == "" {
id = "1"
}
req := codec.NewRequest(r.ctx, id, method_up, json.RawMessage(param))
return req.MarshalJSON()
return &serverutil.Bundle{
Messages: []*codec.Message{{
ID: codec.NewId(id),
Method: method_up,
Params: param,
}},
Batch: false,
}, nil
}
func (r *Codec) doReadRPC() (msgs json.RawMessage, err error) {
func (r *Codec) doReadRPC() (msg *serverutil.Bundle, err error) {
method_up := r.r.URL.Query().Get("method")
if method_up == "" {
method_up = r.r.URL.Path
......@@ -105,8 +117,22 @@ func (r *Codec) doReadRPC() (msgs json.RawMessage, err error) {
if err != nil {
return nil, err
}
req := codec.NewRequest(r.ctx, id, method_up, json.RawMessage(data))
return req.MarshalJSON()
return &serverutil.Bundle{
Messages: []*codec.Message{{
ID: codec.NewId(id),
Method: method_up,
Params: data,
}},
Batch: false,
}, nil
}
func (r *Codec) doReadPost() (msg *serverutil.Bundle, err error) {
data, err := io.ReadAll(r.r.Body)
if err != nil {
return nil, err
}
return serverutil.ParseBundle(data), nil
}
// validateRequest returns a non-zero response code and error message if the
......@@ -147,7 +173,7 @@ func (c *Codec) doRead() {
return
}
go func() {
var data json.RawMessage
var data *serverutil.Bundle
// TODO: implement eventsource
switch strings.ToUpper(c.r.Method) {
case http.MethodGet:
......@@ -155,7 +181,7 @@ func (c *Codec) doRead() {
case "RPC":
data, err = c.doReadRPC()
case http.MethodPost:
data, err = io.ReadAll(c.r.Body)
data, err = c.doReadPost()
}
if err != nil {
c.errCh <- httpError{
......@@ -168,18 +194,17 @@ func (c *Codec) doRead() {
}()
}
// json.RawMessage can be an array of requests. if it is, then it is a batch request
func (c *Codec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) {
func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) {
select {
case ans := <-c.msgs:
return ans, nil
return ans.Messages, ans.Batch, nil
case err := <-c.errCh:
http.Error(c.w, err.err.Error(), err.code)
return nil, err.err
return nil, false, err.err
case <-ctx.Done():
return nil, ctx.Err()
return nil, false, ctx.Err()
case <-c.ctx.Done():
return nil, c.ctx.Err()
return nil, false, c.ctx.Err()
}
}
......
package inproc
import (
"bytes"
"context"
"encoding/json"
"sync"
"gfx.cafe/open/jrpc/pkg/clientutil"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/serverutil"
)
type Client struct {
......@@ -48,17 +48,19 @@ func (c *Client) listen() error {
msgs, _ := codec.ParseMessage(msg)
for i := range msgs {
v := msgs[i]
id := v.ID.Number()
if id == 0 {
id := v.ID
if id == nil {
if c.handler != nil {
c.handler.ServeRPC(nil, codec.NewRequestFromRaw(c.c.ctx, &codec.RequestMarshaling{
Method: v.Method,
Params: v.Params,
Peer: codec.PeerInfo{
Transport: "ipc",
RemoteAddr: "",
},
}))
req := codec.NewRawRequest(c.c.ctx,
nil,
v.Method,
v.Params,
)
req.Peer = codec.PeerInfo{
Transport: "ipc",
RemoteAddr: "",
}
c.handler.ServeRPC(nil, req)
}
continue
}
......@@ -66,7 +68,7 @@ func (c *Client) listen() error {
if v.Error != nil {
err = v.Error
}
c.p.Resolve(id, v.Result, err)
c.p.Resolve(*id, v.Result, err)
}
}
......@@ -76,18 +78,25 @@ func (c *Client) Do(ctx context.Context, result any, method string, params any)
if ctx == nil {
ctx = context.Background()
}
id := c.p.NextId()
req := codec.NewRequestInt(ctx, id, method, params)
fwd, err := json.Marshal(req)
dat, err := json.Marshal(params)
if err != nil {
return err
}
id := c.p.NextId()
fwd := &serverutil.Bundle{
Messages: []*codec.Message{{
ID: id,
Method: method,
Params: dat,
}},
Batch: false,
}
select {
case c.c.msgs <- fwd:
case <-ctx.Done():
return ctx.Err()
}
ans, err := c.p.Ask(req.Context(), id)
ans, err := c.p.Ask(ctx, *id)
if err != nil {
return err
}
......@@ -104,21 +113,19 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
if ctx == nil {
ctx = context.Background()
}
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
reqs := make([]*codec.Request, 0, len(b))
ids := make([]int, 0, len(b))
ids := make([]*codec.ID, 0, len(b))
reqs := &serverutil.Bundle{Batch: true}
for _, v := range b {
id := c.p.NextId()
req := codec.NewRequestInt(ctx, id, v.Method, v.Params)
dat, err := json.Marshal(v.Params)
if err != nil {
return err
}
req := &codec.Message{ID: id, Method: v.Method, Params: dat}
ids = append(ids, id)
reqs = append(reqs, req)
reqs.Messages = append(reqs.Messages, req)
}
err := enc.Encode(reqs)
if err != nil {
return err
}
c.c.msgs <- buf.Bytes()
c.c.msgs <- reqs
// TODO: wait for response
wg := sync.WaitGroup{}
wg.Add(len(ids))
......@@ -126,7 +133,7 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
idx := i
go func() {
defer wg.Done()
ans, err := c.p.Ask(reqs[idx].Context(), ids[idx])
ans, err := c.p.Ask(ctx, *ids[idx])
if err != nil {
b[idx].Error = err
return
......@@ -142,7 +149,7 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
}
wg.Wait()
return err
return nil
}
func (c *Client) SetHeader(key string, value string) {
......@@ -156,13 +163,20 @@ func (c *Client) Notify(ctx context.Context, method string, params any) error {
if ctx == nil {
ctx = context.Background()
}
req := codec.NewRequest(ctx, "", method, params)
fwd, err := json.Marshal(req)
dat, err := json.Marshal(params)
if err != nil {
return err
}
msg := &serverutil.Bundle{
Messages: []*codec.Message{{
ID: nil,
Method: method,
Params: dat,
}},
Batch: false,
}
select {
case c.c.msgs <- fwd:
case c.c.msgs <- msg:
case <-ctx.Done():
return ctx.Err()
}
......
......@@ -3,11 +3,11 @@ package inproc
import (
"bufio"
"context"
"encoding/json"
"io"
"sync"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/serverutil"
)
type Codec struct {
......@@ -17,7 +17,7 @@ type Codec struct {
rd io.Reader
wrLock sync.Mutex
wr *bufio.Writer
msgs chan json.RawMessage
msgs chan *serverutil.Bundle
}
func NewCodec() *Codec {
......@@ -28,7 +28,7 @@ func NewCodec() *Codec {
cn: cn,
rd: bufio.NewReader(rd),
wr: bufio.NewWriter(wr),
msgs: make(chan json.RawMessage, 8),
msgs: make(chan *serverutil.Bundle, 8),
}
}
......@@ -41,15 +41,14 @@ func (c *Codec) PeerInfo() codec.PeerInfo {
}
}
// json.RawMessage can be an array of requests. if it is, then it is a batch request
func (c *Codec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) {
func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) {
select {
case ans := <-c.msgs:
return ans, nil
return ans.Messages, ans.Batch, nil
case <-ctx.Done():
return nil, ctx.Err()
return nil, false, ctx.Err()
case <-c.ctx.Done():
return nil, c.ctx.Err()
return nil, false, c.ctx.Err()
}
}
......
......@@ -69,9 +69,9 @@ func (c *Client) listen() error {
if v == nil {
continue
}
id := v.ID.Number()
id := v.ID
// messages without ids are notifications
if id == 0 {
if id == nil {
var handler codec.Handler
c.mu.RLock()
handler = c.handler
......@@ -79,38 +79,40 @@ func (c *Client) listen() error {
// writer should only be allowed to send notifications
// reader should contain the message above
// the context is the client context
handler.ServeRPC(nil, codec.NewRequestFromRaw(c.ctx, &codec.RequestMarshaling{
Method: v.Method,
Params: v.Result,
Peer: c.handlerPeer,
}))
req := codec.NewRawRequest(c.ctx,
nil,
v.Method,
v.Params,
)
req.Peer = c.handlerPeer
handler.ServeRPC(nil, req)
continue
}
var err error
if v.Error != nil {
err = v.Error
}
c.p.Resolve(id, v.Result, err)
c.p.Resolve(*id, v.Result, err)
}
}
}
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
if ctx == nil {
ctx = context.Background()
}
id := c.p.NextId()
req := codec.NewRequestInt(ctx, id, method, params)
req, err := codec.NewRequest(ctx, codec.NewId(id), method, params)
if err != nil {
return err
}
fwd, err := json.Marshal(req)
if err != nil {
return err
}
err = c.writeContext(ctx, fwd)
err = c.writeContext(req.Context(), fwd)
if err != nil {
return err
}
ans, err := c.p.Ask(req.Context(), id)
ans, err := c.p.Ask(req.Context(), *id)
if err != nil {
return err
}
......@@ -130,10 +132,13 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
reqs := make([]*codec.Request, 0, len(b))
ids := make([]int, 0, len(b))
ids := make([]*codec.ID, 0, len(b))
for _, v := range b {
id := c.p.NextId()
req := codec.NewRequestInt(ctx, id, v.Method, v.Params)
req, err := codec.NewRequest(ctx, codec.NewId(id), v.Method, v.Params)
if err != nil {
return err
}
ids = append(ids, id)
reqs = append(reqs, req)
}
......@@ -152,7 +157,7 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
idx := i
go func() {
defer wg.Done()
ans, err := c.p.Ask(reqs[idx].Context(), ids[idx])
ans, err := c.p.Ask(reqs[idx].Context(), *ids[idx])
if err != nil {
b[idx].Error = err
return
......@@ -174,7 +179,10 @@ func (c *Client) Notify(ctx context.Context, method string, params any) error {
if ctx == nil {
ctx = context.Background()
}
req := codec.NewRequest(ctx, "", method, params)
req, err := codec.NewRequest(ctx, nil, method, params)
if err != nil {
return err
}
fwd, err := json.Marshal(req)
if err != nil {
return err
......
......@@ -9,6 +9,7 @@ import (
"github.com/goccy/go-json"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/serverutil"
)
type Codec struct {
......@@ -18,7 +19,7 @@ type Codec struct {
rd io.Reader
wrLock sync.Mutex
wr *bufio.Writer
msgs chan json.RawMessage
msgs chan *serverutil.Bundle
}
func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
......@@ -28,7 +29,7 @@ func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
cn: cn,
rd: bufio.NewReader(rd),
wr: bufio.NewWriter(wr),
msgs: make(chan json.RawMessage, 8),
msgs: make(chan *serverutil.Bundle, 8),
}
go func() {
err := c.listen()
......@@ -40,15 +41,16 @@ func NewCodec(rd io.Reader, wr io.Writer, onError func(error)) *Codec {
}
func (c *Codec) listen() error {
var msg json.RawMessage
for {
var msg json.RawMessage
// reading a message
err := json.NewDecoder(c.rd).Decode(&msg)
if err != nil {
c.cn()
return err
}
c.msgs <- msg
c.msgs <- serverutil.ParseBundle(msg)
msg = msg[:0]
}
}
......@@ -61,15 +63,14 @@ func (c *Codec) PeerInfo() codec.PeerInfo {
}
}
// json.RawMessage can be an array of requests. if it is, then it is a batch request
func (c *Codec) ReadBatch(ctx context.Context) (msgs json.RawMessage, err error) {
func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) {
select {
case ans := <-c.msgs:
return ans, nil
return ans.Messages, ans.Batch, nil
case <-ctx.Done():
return nil, ctx.Err()
return nil, false, ctx.Err()
case <-c.ctx.Done():
return nil, c.ctx.Err()
return nil, false, c.ctx.Err()
}
}
......
package redis
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"gfx.cafe/open/jrpc/pkg/clientutil"
"gfx.cafe/open/jrpc/pkg/codec"
"github.com/redis/go-redis/v9"
"github.com/rs/xid"
)
type Client struct {
p *clientutil.IdReply
c redis.UniversalClient
clientId string
domain string
ctx context.Context
cn context.CancelFunc
m codec.Middlewares
handler codec.Handler
mu sync.RWMutex
handlerPeer codec.PeerInfo
}
func NewClient(c redis.UniversalClient, domain string) *Client {
cl := &Client{
c: c,
p: clientutil.NewIdReply(),
handlerPeer: codec.PeerInfo{
Transport: "redis",
RemoteAddr: "",
},
domain: domain,
// this doesn't need to be secure bc... you have access to the redis instance lol
clientId: xid.New().String(),
handler: codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {}),
}
cl.ctx, cl.cn = context.WithCancel(context.Background())
go cl.listen()
return cl
}
func (c *Client) SetHandlerPeer(pi codec.PeerInfo) {
c.handlerPeer = pi
}
func (c *Client) Mount(h codec.Middleware) {
c.mu.Lock()
defer c.mu.Unlock()
c.m = append(c.m, h)
c.handler = c.m.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// do nothing on no handler
})
}
func (c *Client) listen() error {
subCh := fmt.Sprintf(c.domain + "." + c.clientId)
sub := c.c.PSubscribe(c.ctx, subCh)
msgCh := sub.Channel()
for {
incomingMsg := <-msgCh
msgs, _ := codec.ParseMessage(stringToBytes(incomingMsg.Payload))
for i := range msgs {
v := msgs[i]
if v == nil {
continue
}
id := v.ID
// messages without ids are notifications
if id == nil {
var handler codec.Handler
c.mu.RLock()
handler = c.handler
c.mu.RUnlock()
// writer should only be allowed to send notifications
// reader should contain the message above
// the context is the client context
req := codec.NewRawRequest(c.ctx,
nil,
v.Method,
v.Params,
)
req.Peer = c.handlerPeer
handler.ServeRPC(nil, req)
continue
}
var err error
if v.Error != nil {
err = v.Error
}
c.p.Resolve(*id, v.Result, err)
}
}
}
func (c *Client) Do(ctx context.Context, result any, method string, params any) error {
id := c.p.NextId()
req, err := codec.NewRequest(ctx, codec.NewId(id), method, params)
if err != nil {
return err
}
fwd, err := json.Marshal(req)
if err != nil {
return err
}
toFwd, _ := json.Marshal(&RedisRequest{
ReplyChannel: c.clientId,
Message: fwd,
})
err = c.writeContext(req.Context(), toFwd)
if err != nil {
return err
}
ans, err := c.p.Ask(req.Context(), *id)
if err != nil {
return err
}
if result != nil {
err = json.Unmarshal(ans, result)
if err != nil {
return err
}
}
return nil
}
func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error {
if ctx == nil {
ctx = context.Background()
}
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
reqs := make([]*codec.Request, 0, len(b))
ids := make([]*codec.ID, 0, len(b))
for _, v := range b {
id := c.p.NextId()
req, err := codec.NewRequest(ctx, codec.NewId(id), v.Method, v.Params)
if err != nil {
return err
}
ids = append(ids, id)
reqs = append(reqs, req)
}
err := enc.Encode(reqs)
if err != nil {
return err
}
pkg, _ := json.Marshal(&RedisRequest{
ReplyChannel: c.clientId,
Message: buf.Bytes(),
})
err = c.writeContext(ctx, pkg)
if err != nil {
return err
}
// TODO: wait for response
wg := sync.WaitGroup{}
wg.Add(len(ids))
for i := range ids {
idx := i
go func() {
defer wg.Done()
ans, err := c.p.Ask(reqs[idx].Context(), *ids[idx])
if err != nil {
b[idx].Error = err
return
}
if b[idx].Result != nil {
err = json.Unmarshal(ans, b[idx].Result)
if err != nil {
b[idx].Error = err
return
}
}
}()
}
wg.Wait()
return err
}
func (c *Client) Notify(ctx context.Context, method string, params any) error {
if ctx == nil {
ctx = context.Background()
}
req, err := codec.NewRequest(ctx, nil, method, params)
if err != nil {
return err
}
fwd, err := json.Marshal(req)
if err != nil {
return err
}
return c.writeContext(ctx, fwd)
}
func (c *Client) SetHeader(key string, value string) {
}
func (c *Client) Close() error {
c.cn()
return nil
}
func (c *Client) writeContext(ctx context.Context, xs []byte) error {
errch := make(chan error)
go func() {
err := c.c.LPush(ctx, c.domain+reqDomainSuffix, xs).Err()
select {
case errch <- err:
case <-ctx.Done():
case <-c.ctx.Done():
}
}()
select {
case err := <-errch:
return err
case <-c.ctx.Done():
return c.ctx.Err()
case <-ctx.Done():
return ctx.Err()
}
}
package redis
import (
"bytes"
"context"
"encoding/json"
"sync/atomic"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/serverutil"
)
var _ codec.ReaderWriter = (*Codec)(nil)
type Codec struct {
ctx context.Context
cn func()
wr bytes.Buffer
replier func(json.RawMessage) error
ansCh chan *serverutil.Bundle
closed atomic.Bool
closeCh chan struct{}
i codec.PeerInfo
}
type httpError struct {
code int
err error
}
func NewCodec(req *RedisRequest, replier func(json.RawMessage) error) *Codec {
c := &Codec{
replier: replier,
ansCh: make(chan *serverutil.Bundle, 1),
closeCh: make(chan struct{}),
}
c.ctx, c.cn = context.WithCancel(context.Background())
bundle := serverutil.ParseBundle(req.Message)
c.ansCh <- bundle
return c
}
// gets the peer info
func (c *Codec) PeerInfo() codec.PeerInfo {
return c.i
}
func (c *Codec) ReadBatch(ctx context.Context) ([]*codec.Message, bool, error) {
select {
case ans := <-c.ansCh:
return ans.Messages, ans.Batch, nil
case <-ctx.Done():
return nil, false, ctx.Err()
case <-c.ctx.Done():
return nil, false, c.ctx.Err()
}
}
// closes the connection
func (c *Codec) Close() error {
if c.closed.CompareAndSwap(false, true) {
close(c.closeCh)
}
c.cn()
return nil
}
func (c *Codec) Write(p []byte) (n int, err error) {
return c.wr.Write(p)
}
func (c *Codec) Flush() (err error) {
c.replier(c.wr.Bytes())
c.wr.Reset()
c.Close()
return
}
// Closed returns a channel which is closed when the connection is closed.
func (c *Codec) Closed() <-chan struct{} {
return c.closeCh
}
// RemoteAddr returns the peer address of the connection.
func (c *Codec) RemoteAddr() string {
return ""
}
package redis
import (
"testing"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/server"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
)
func TestBasicSuite(t *testing.T) {
domain := "jrpc"
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
redisServer := miniredis.RunT(t)
connOpts := &redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
}
ctx := redisServer.Ctx
ss, err := CreateServerStream(ctx, domain, connOpts)
require.NoError(t, err)
s := jrpctest.NewServer()
go (&Server{Server: s}).ServeRedis(ctx, ss)
return s, func() codec.Conn {
conn := NewClient(redis.NewUniversalClient(connOpts), domain)
return conn
}, func() {
redisServer.CtxCancel()
}
},
})
}
package redis
import "errors"
const (
// NOTE: if you change this, you will have to change the thing in jrpctest... its what its for now until tests get refactored
maxRequestContentLength = 1024 * 1024 * 5
contentType = "application/json"
)
// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13
var acceptedContentTypes = []string{
// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13
contentType, "application/json-rpc", "application/jsonrequest",
// these are added because they make sense, fight me!
"application/jsonrpc2", "application/json-rpc2", "application/jrpc",
}
var ErrInvalidContentType = errors.New("invalid content type")
package redis
import (
"context"
"encoding/json"
"time"
"unsafe"
"github.com/redis/go-redis/v9"
"github.com/rs/xid"
)
type ServerStream struct {
client redis.UniversalClient
domain string
id xid.ID
}
func CreateServerStream(ctx context.Context, domain string, opts *redis.UniversalOptions) (*ServerStream, error) {
c := redis.NewUniversalClient(opts)
// the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways.
s := &ServerStream{
client: c,
id: xid.New(),
domain: domain,
}
return s, nil
}
type RedisRequest struct {
ReplyChannel string `json:"r"`
Message json.RawMessage `json:"msg"`
}
func (s *ServerStream) ReadRequest(ctx context.Context) (*RedisRequest, func(json.RawMessage) error, error) {
timeout := time.Hour
res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result()
if err != nil {
return nil, nil, err
}
if len(res) != 2 {
return nil, nil, err
}
redisReq := &RedisRequest{}
err = json.Unmarshal(stringToBytes(res[1]), redisReq)
if err != nil {
return nil, nil, err
}
return redisReq, func(rm json.RawMessage) error {
target := s.domain + "." + redisReq.ReplyChannel
return s.client.Publish(context.Background(), target, string(rm)).Err()
}, nil
}
const reqDomainSuffix = ".req"
const respDomainSuffix = ".resp"
func stringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}
package redis
import (
"context"
"gfx.cafe/open/jrpc/pkg/server"
"tuxpa.in/a/zlog/log"
)
type Server struct {
Server *server.Server
}
func (s *Server) ServeRedis(ctx context.Context, stream *ServerStream) {
for {
select {
case <-ctx.Done():
return
default:
}
req, fn, err := stream.ReadRequest(ctx)
if err != nil {
log.Err(err).Msg("while reading bpop")
continue
}
if req == nil {
continue
}
cd := NewCodec(req, fn)
s.Server.ServeCodec(ctx, cd)
}
}
......@@ -86,9 +86,6 @@ func (m *Mux) RegisterFunc(name string, rcvr any) error {
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
}
cb := argreflect.NewCallback(reflect.ValueOf(nil), rcvrVal)
if cb == nil {
return fmt.Errorf("invalid function registeration for %s", name)
}
m.Mount(name, cb)
return nil
}
......
......@@ -445,11 +445,12 @@ func (n *node) findEdge(ntyp nodeTyp, label byte) *node {
i, j := 0, num-1
for i <= j {
idx = i + (j-i)/2
if label > nds[idx].label {
switch {
case label > nds[idx].label:
i = idx + 1
} else if label < nds[idx].label {
case label < nds[idx].label:
j = idx - 1
} else {
default:
i = num // breaks cond
}
}
......@@ -670,11 +671,12 @@ func (ns nodes) findEdge(label byte) *node {
i, j := 0, num-1
for i <= j {
idx = i + (j-i)/2
if label > ns[idx].label {
switch {
case label > ns[idx].label:
i = idx + 1
} else if label < ns[idx].label {
case label < ns[idx].label:
j = idx - 1
} else {
default:
i = num // breaks cond
}
}
......@@ -714,7 +716,7 @@ func walk(r Routes, walkFn WalkFunc, parentRoute string, parentMw ...func(codec.
handler := route.Handler
fullRoute := parentRoute + sepString + route.Pattern
fullRoute = strings.Replace(fullRoute, sepString+"*"+sepString, sepString, -1)
fullRoute = strings.ReplaceAll(fullRoute, sepString+"*"+sepString, sepString)
if chain, ok := handler.(*ChainHandler); ok {
if err := walkFn(fullRoute, chain.Endpoint, append(mws, chain.Middlewares...)...); err != nil {
......
......@@ -18,9 +18,7 @@ var funcs = template.FuncMap{
"list": func(v ...any) []any {
return v
},
"camelCase": func(v string) string {
return strcase.ToCamel(v)
},
"camelCase": strcase.ToCamel,
"goType": func(v string) string {
switch v {
case "boolean":
......@@ -37,9 +35,7 @@ var funcs = template.FuncMap{
panic(fmt.Sprintln("unknown go type:", v))
}
},
"refName": func(v string) string {
return filepath.Base(v)
},
"refName": filepath.Base,
}
func Generate(rpc *types.OpenRPC, ts string, output string) error {
......
......@@ -8,7 +8,7 @@ import (
"gfx.cafe/open/jrpc"
"gfx.cafe/open/jrpc/contrib/codecs"
"gfx.cafe/open/jrpc/contrib/jmux"
"gfx.cafe/open/jrpc/openrpc/out"
"gfx.cafe/open/jrpc/contrib/openrpc/out"
)
func main() {
......
......@@ -26,10 +26,17 @@ type ServerVariable struct {
}
type Info struct {
Title string `json:"title"`
Description string `json:"description"`
Version string `json:"version"`
Contact Contact `json:"contact,omitempty"`
Title string `json:"title"`
Description string `json:"description"`
Version string `json:"version"`
Contact Contact `json:"contact,omitempty"`
Socials []*SocialInfo `json:"x-social,omitempty"`
}
type SocialInfo struct {
Name string `json:"name"`
URL string `json:"url"`
Color string `json:"color"`
}
type Contact struct {
......