good morning!!!!

Skip to content
Snippets Groups Projects
Verified Commit 51952f2e authored by a's avatar a
Browse files

ok

parent be635676
No related branches found
No related tags found
No related merge requests found
Pipeline #25935 failed
package nats
import (
"context"
"testing"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/server"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) {
redisServer := miniredis.RunT(t)
connOpts := &redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
}
ctx := redisServer.Ctx
ctx, cn := context.WithCancel(ctx)
b := CreateBroker(ctx, "jrpc", connOpts)
s := jrpctest.NewServer()
spokeServer := (&broker.Server{Server: s})
go spokeServer.ServeSpoke(ctx, b)
return s, func() codec.Conn {
return broker.NewClient(b)
}, func() {
cn()
redisServer.CtxCancel()
}
},
})
}
package nats
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"
"unsafe"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"github.com/redis/go-redis/v9"
"github.com/rs/xid"
)
type Broker struct {
client redis.UniversalClient
domain string
id xid.ID
}
type subscription struct {
ch chan json.RawMessage
err error
closed atomic.Bool
mu sync.RWMutex
pubsub *redis.PubSub
}
// channel that will close when done or error
func (s *subscription) Listen() <-chan json.RawMessage {
return s.ch
}
// should close the channel and also stop listening
func (s *subscription) Close() error {
if s.closed.CompareAndSwap(false, true) {
s.pubsub.Close()
}
return nil
}
// this hold errors
func (s *subscription) Err() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.err
}
func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error {
req, err := json.Marshal(&RedisRequest{
ReplyChannel: clientId,
Message: msg,
})
if err != nil {
return err
}
return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err()
}
func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) {
topic := s.domain + "." + clientId + respDomainSuffix
sub := &subscription{
ch: make(chan json.RawMessage, 16),
}
sub.pubsub = s.client.Subscribe(ctx, topic)
ch := sub.pubsub.Channel()
go func() {
for {
select {
case <-ctx.Done():
sub.Close()
return
case t := <-ch:
select {
case sub.ch <- json.RawMessage(stringToBytes(t.Payload)):
default:
}
}
}
}()
return sub, nil
}
func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker {
c := redis.NewUniversalClient(opts)
// the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways.
s := &Broker{
client: c,
id: xid.New(),
domain: domain,
}
return s
}
type RedisRequest struct {
ReplyChannel string `json:"r"`
Message json.RawMessage `json:"msg"`
}
func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
timeout := time.Hour
res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result()
if err != nil {
return nil, nil, err
}
if len(res) != 2 {
return nil, nil, err
}
redisReq := &RedisRequest{}
err = json.Unmarshal(stringToBytes(res[1]), redisReq)
if err != nil {
return nil, nil, err
}
return redisReq.Message, func(rm json.RawMessage) error {
if len(rm) == 0 {
return nil
}
target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix
err := s.client.Publish(context.Background(), target, []byte(rm)).Err()
if err != nil {
return err
}
return nil
}, nil
}
const reqDomainSuffix = ".req"
const respDomainSuffix = ".resp"
// stringHeader is the runtime representation of a string.
// It should be identical to reflect.StringHeader
type stringHeader struct {
data unsafe.Pointer
stringLen int
}
// sliceHeader is the runtime representation of a slice.
// It should be identical to reflect.sliceHeader
type sliceHeader struct {
data unsafe.Pointer
sliceLen int
sliceCap int
}
func stringToBytes(s string) (b []byte) {
stringHeader := (*stringHeader)(unsafe.Pointer(&s))
sliceHeader := (*sliceHeader)(unsafe.Pointer(&b))
sliceHeader.data = stringHeader.data
sliceHeader.sliceLen = len(s)
sliceHeader.sliceCap = len(s)
return b
}
package redis
import (
"testing"
"gfx.cafe/open/jrpc/pkg/jrpctest"
)
func TestBasicSuite(t *testing.T) {
jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{
ServerMaker: ServerMaker,
})
}
package redis
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"
"unsafe"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"github.com/redis/go-redis/v9"
"github.com/rs/xid"
)
type Broker struct {
client redis.UniversalClient
domain string
id xid.ID
}
type subscription struct {
ch chan json.RawMessage
err error
closed atomic.Bool
mu sync.RWMutex
pubsub *redis.PubSub
}
// channel that will close when done or error
func (s *subscription) Listen() <-chan json.RawMessage {
return s.ch
}
// should close the channel and also stop listening
func (s *subscription) Close() error {
if s.closed.CompareAndSwap(false, true) {
s.pubsub.Close()
}
return nil
}
// this hold errors
func (s *subscription) Err() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.err
}
func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error {
req, err := json.Marshal(&RedisRequest{
ReplyChannel: clientId,
Message: msg,
})
if err != nil {
return err
}
return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err()
}
func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) {
topic := s.domain + "." + clientId + respDomainSuffix
sub := &subscription{
ch: make(chan json.RawMessage, 16),
}
sub.pubsub = s.client.Subscribe(ctx, topic)
ch := sub.pubsub.Channel()
go func() {
for {
select {
case <-ctx.Done():
sub.Close()
return
case t := <-ch:
select {
case sub.ch <- json.RawMessage(stringToBytes(t.Payload)):
default:
}
}
}
}()
return sub, nil
}
func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker {
c := redis.NewUniversalClient(opts)
// the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways.
s := &Broker{
client: c,
id: xid.New(),
domain: domain,
}
return s
}
type RedisRequest struct {
ReplyChannel string `json:"r"`
Message json.RawMessage `json:"msg"`
}
func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
timeout := time.Second
res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result()
if err != nil {
return nil, nil, err
}
if len(res) != 2 {
return nil, nil, err
}
redisReq := &RedisRequest{}
err = json.Unmarshal(stringToBytes(res[1]), redisReq)
if err != nil {
return nil, nil, err
}
return redisReq.Message, func(rm json.RawMessage) error {
if len(rm) == 0 {
return nil
}
target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix
err := s.client.Publish(context.Background(), target, []byte(rm)).Err()
if err != nil {
return err
}
return nil
}, nil
}
const reqDomainSuffix = ".req"
const respDomainSuffix = ".resp"
// stringHeader is the runtime representation of a string.
// It should be identical to reflect.StringHeader
type stringHeader struct {
data unsafe.Pointer
stringLen int
}
// sliceHeader is the runtime representation of a slice.
// It should be identical to reflect.sliceHeader
type sliceHeader struct {
data unsafe.Pointer
sliceLen int
sliceCap int
}
func stringToBytes(s string) (b []byte) {
stringHeader := (*stringHeader)(unsafe.Pointer(&s))
sliceHeader := (*sliceHeader)(unsafe.Pointer(&b))
sliceHeader.data = stringHeader.data
sliceHeader.sliceLen = len(s)
sliceHeader.sliceCap = len(s)
return b
}
package redis
import (
"context"
"gfx.cafe/open/jrpc/contrib/codecs/broker"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/jrpctest"
"gfx.cafe/open/jrpc/pkg/server"
"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
)
func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) {
redisServer, err := miniredis.Run()
if err != nil {
panic(err)
}
connOpts := &redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
}
ctx := redisServer.Ctx
ctx, cn := context.WithCancel(ctx)
b := CreateBroker(ctx, "jrpc", connOpts)
s := jrpctest.NewServer()
spokeServer := (&broker.Server{Server: s})
go spokeServer.ServeSpoke(ctx, b)
return s, func() codec.Conn {
return broker.NewClient(b)
}, func() {
cn()
redisServer.CtxCancel()
}
}
......@@ -8,13 +8,11 @@ require (
gfx.cafe/util/go/frand v0.0.0-20230721185457-c559e86c829c
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c
github.com/alecthomas/kong v0.8.0
github.com/alicebob/miniredis/v2 v2.30.5
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set v1.8.0
github.com/go-faster/jx v1.1.0
github.com/goccy/go-json v0.10.2
github.com/iancoleman/strcase v0.3.0
github.com/redis/go-redis/v9 v9.1.0
github.com/rs/xid v1.5.0
github.com/stretchr/testify v1.8.4
sigs.k8s.io/yaml v1.3.0
......@@ -23,9 +21,6 @@ require (
require (
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
......@@ -34,7 +29,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/zerolog v1.30.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/sys v0.11.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
......
......@@ -12,18 +12,6 @@ github.com/alecthomas/assert/v2 v2.1.0 h1:tbredtNcQnoSd3QBhQWI7QZ3XHOVkw1Moklp2o
github.com/alecthomas/kong v0.8.0 h1:ryDCzutfIqJPnNn0omnrgHLbAggDQM2VWHikE1xqK7s=
github.com/alecthomas/kong v0.8.0/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U=
github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.30.5 h1:3r6kTHdKnuP4fkS8k2IrvSfxpxUTcW1SOL0wN7b7Dt0=
github.com/alicebob/miniredis/v2 v2.30.5/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/bsm/ginkgo/v2 v2.9.5 h1:rtVBYPs3+TC5iLUVOis1B9tjLTup7Cj5IfzosKtvTJ0=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
......@@ -31,8 +19,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4=
github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY=
github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
......@@ -66,8 +52,6 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsK
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0NiuqvtfMY=
github.com/redis/go-redis/v9 v9.1.0/go.mod h1:urWj3He21Dj5k4TK1y59xH8Uj6ATueP8AH1cY3lZl4c=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
......@@ -80,11 +64,8 @@ github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment