diff --git a/contrib/codecs/broker/brokernats/codec_test.go b/contrib/codecs/broker/brokernats/codec_test.go deleted file mode 100644 index fc5f69f4c6b575696fbc35ac3ef6a9a1c82c6de5..0000000000000000000000000000000000000000 --- a/contrib/codecs/broker/brokernats/codec_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package nats - -import ( - "context" - "testing" - - "gfx.cafe/open/jrpc/contrib/codecs/broker" - "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/server" - - "gfx.cafe/open/jrpc/pkg/jrpctest" - "github.com/alicebob/miniredis/v2" - "github.com/redis/go-redis/v9" -) - -func TestBasicSuite(t *testing.T) { - - jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ - ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) { - redisServer := miniredis.RunT(t) - connOpts := &redis.UniversalOptions{ - Addrs: []string{redisServer.Addr()}, - } - ctx := redisServer.Ctx - ctx, cn := context.WithCancel(ctx) - b := CreateBroker(ctx, "jrpc", connOpts) - s := jrpctest.NewServer() - spokeServer := (&broker.Server{Server: s}) - go spokeServer.ServeSpoke(ctx, b) - return s, func() codec.Conn { - return broker.NewClient(b) - }, func() { - cn() - redisServer.CtxCancel() - } - }, - }) -} diff --git a/contrib/codecs/broker/brokernats/nats.go b/contrib/codecs/broker/brokernats/nats.go deleted file mode 100644 index 210ebfa8cbf8547a9bafa797863c3fe80bf190cf..0000000000000000000000000000000000000000 --- a/contrib/codecs/broker/brokernats/nats.go +++ /dev/null @@ -1,158 +0,0 @@ -package nats - -import ( - "context" - "encoding/json" - "sync" - "sync/atomic" - "time" - "unsafe" - - "gfx.cafe/open/jrpc/contrib/codecs/broker" - "github.com/redis/go-redis/v9" - "github.com/rs/xid" -) - -type Broker struct { - client redis.UniversalClient - domain string - - id xid.ID -} - -type subscription struct { - ch chan json.RawMessage - err error - - closed atomic.Bool - mu sync.RWMutex - pubsub *redis.PubSub -} - -// channel that will close when done or error -func (s *subscription) Listen() <-chan json.RawMessage { - return s.ch -} - -// should close the channel and also stop listening -func (s *subscription) Close() error { - if s.closed.CompareAndSwap(false, true) { - s.pubsub.Close() - } - return nil -} - -// this hold errors -func (s *subscription) Err() error { - s.mu.RLock() - defer s.mu.RUnlock() - return s.err -} - -func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error { - req, err := json.Marshal(&RedisRequest{ - ReplyChannel: clientId, - Message: msg, - }) - if err != nil { - return err - } - return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err() -} - -func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) { - topic := s.domain + "." + clientId + respDomainSuffix - sub := &subscription{ - ch: make(chan json.RawMessage, 16), - } - - sub.pubsub = s.client.Subscribe(ctx, topic) - ch := sub.pubsub.Channel() - go func() { - for { - select { - case <-ctx.Done(): - sub.Close() - return - case t := <-ch: - select { - case sub.ch <- json.RawMessage(stringToBytes(t.Payload)): - default: - } - } - } - }() - - return sub, nil -} - -func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker { - c := redis.NewUniversalClient(opts) - // the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways. - s := &Broker{ - client: c, - id: xid.New(), - domain: domain, - } - return s -} - -type RedisRequest struct { - ReplyChannel string `json:"r"` - Message json.RawMessage `json:"msg"` -} - -func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { - timeout := time.Hour - - res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result() - if err != nil { - return nil, nil, err - } - if len(res) != 2 { - return nil, nil, err - } - redisReq := &RedisRequest{} - err = json.Unmarshal(stringToBytes(res[1]), redisReq) - if err != nil { - return nil, nil, err - } - return redisReq.Message, func(rm json.RawMessage) error { - if len(rm) == 0 { - return nil - } - target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix - err := s.client.Publish(context.Background(), target, []byte(rm)).Err() - if err != nil { - return err - } - return nil - }, nil -} - -const reqDomainSuffix = ".req" -const respDomainSuffix = ".resp" - -// stringHeader is the runtime representation of a string. -// It should be identical to reflect.StringHeader -type stringHeader struct { - data unsafe.Pointer - stringLen int -} - -// sliceHeader is the runtime representation of a slice. -// It should be identical to reflect.sliceHeader -type sliceHeader struct { - data unsafe.Pointer - sliceLen int - sliceCap int -} - -func stringToBytes(s string) (b []byte) { - stringHeader := (*stringHeader)(unsafe.Pointer(&s)) - sliceHeader := (*sliceHeader)(unsafe.Pointer(&b)) - sliceHeader.data = stringHeader.data - sliceHeader.sliceLen = len(s) - sliceHeader.sliceCap = len(s) - return b -} diff --git a/contrib/codecs/broker/brokerredis/codec_test.go b/contrib/codecs/broker/brokerredis/codec_test.go deleted file mode 100644 index 190286a6b6fde5a71bdeaf2cd48e4b0f0833e9e7..0000000000000000000000000000000000000000 --- a/contrib/codecs/broker/brokerredis/codec_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package redis - -import ( - "testing" - - "gfx.cafe/open/jrpc/pkg/jrpctest" -) - -func TestBasicSuite(t *testing.T) { - jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ - ServerMaker: ServerMaker, - }) -} diff --git a/contrib/codecs/broker/brokerredis/redis.go b/contrib/codecs/broker/brokerredis/redis.go deleted file mode 100644 index a1694b578d4b17bcbc9cf4e029a7dcedab755215..0000000000000000000000000000000000000000 --- a/contrib/codecs/broker/brokerredis/redis.go +++ /dev/null @@ -1,157 +0,0 @@ -package redis - -import ( - "context" - "encoding/json" - "sync" - "sync/atomic" - "time" - "unsafe" - - "gfx.cafe/open/jrpc/contrib/codecs/broker" - "github.com/redis/go-redis/v9" - "github.com/rs/xid" -) - -type Broker struct { - client redis.UniversalClient - domain string - - id xid.ID -} - -type subscription struct { - ch chan json.RawMessage - err error - - closed atomic.Bool - mu sync.RWMutex - pubsub *redis.PubSub -} - -// channel that will close when done or error -func (s *subscription) Listen() <-chan json.RawMessage { - return s.ch -} - -// should close the channel and also stop listening -func (s *subscription) Close() error { - if s.closed.CompareAndSwap(false, true) { - s.pubsub.Close() - } - return nil -} - -// this hold errors -func (s *subscription) Err() error { - s.mu.RLock() - defer s.mu.RUnlock() - return s.err -} - -func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error { - req, err := json.Marshal(&RedisRequest{ - ReplyChannel: clientId, - Message: msg, - }) - if err != nil { - return err - } - return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err() -} - -func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) { - topic := s.domain + "." + clientId + respDomainSuffix - sub := &subscription{ - ch: make(chan json.RawMessage, 16), - } - - sub.pubsub = s.client.Subscribe(ctx, topic) - ch := sub.pubsub.Channel() - go func() { - for { - select { - case <-ctx.Done(): - sub.Close() - return - case t := <-ch: - select { - case sub.ch <- json.RawMessage(stringToBytes(t.Payload)): - default: - } - } - } - }() - - return sub, nil -} - -func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker { - c := redis.NewUniversalClient(opts) - // the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways. - s := &Broker{ - client: c, - id: xid.New(), - domain: domain, - } - return s -} - -type RedisRequest struct { - ReplyChannel string `json:"r"` - Message json.RawMessage `json:"msg"` -} - -func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { - timeout := time.Second - res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result() - if err != nil { - return nil, nil, err - } - if len(res) != 2 { - return nil, nil, err - } - redisReq := &RedisRequest{} - err = json.Unmarshal(stringToBytes(res[1]), redisReq) - if err != nil { - return nil, nil, err - } - return redisReq.Message, func(rm json.RawMessage) error { - if len(rm) == 0 { - return nil - } - target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix - err := s.client.Publish(context.Background(), target, []byte(rm)).Err() - if err != nil { - return err - } - return nil - }, nil -} - -const reqDomainSuffix = ".req" -const respDomainSuffix = ".resp" - -// stringHeader is the runtime representation of a string. -// It should be identical to reflect.StringHeader -type stringHeader struct { - data unsafe.Pointer - stringLen int -} - -// sliceHeader is the runtime representation of a slice. -// It should be identical to reflect.sliceHeader -type sliceHeader struct { - data unsafe.Pointer - sliceLen int - sliceCap int -} - -func stringToBytes(s string) (b []byte) { - stringHeader := (*stringHeader)(unsafe.Pointer(&s)) - sliceHeader := (*sliceHeader)(unsafe.Pointer(&b)) - sliceHeader.data = stringHeader.data - sliceHeader.sliceLen = len(s) - sliceHeader.sliceCap = len(s) - return b -} diff --git a/contrib/codecs/broker/brokerredis/testing.go b/contrib/codecs/broker/brokerredis/testing.go deleted file mode 100644 index f210f32f07e03545e46b3426f07003271dc984cc..0000000000000000000000000000000000000000 --- a/contrib/codecs/broker/brokerredis/testing.go +++ /dev/null @@ -1,34 +0,0 @@ -package redis - -import ( - "context" - - "gfx.cafe/open/jrpc/contrib/codecs/broker" - "gfx.cafe/open/jrpc/pkg/codec" - "gfx.cafe/open/jrpc/pkg/jrpctest" - "gfx.cafe/open/jrpc/pkg/server" - "github.com/alicebob/miniredis/v2" - "github.com/redis/go-redis/v9" -) - -func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) { - redisServer, err := miniredis.Run() - if err != nil { - panic(err) - } - connOpts := &redis.UniversalOptions{ - Addrs: []string{redisServer.Addr()}, - } - ctx := redisServer.Ctx - ctx, cn := context.WithCancel(ctx) - b := CreateBroker(ctx, "jrpc", connOpts) - s := jrpctest.NewServer() - spokeServer := (&broker.Server{Server: s}) - go spokeServer.ServeSpoke(ctx, b) - return s, func() codec.Conn { - return broker.NewClient(b) - }, func() { - cn() - redisServer.CtxCancel() - } -} diff --git a/go.mod b/go.mod index b8e8286a25c399f61da04646b5e1dfa8aadeca01..e914fcb0ebcd2b3c7786fb6e9486c86e9f4c2076 100644 --- a/go.mod +++ b/go.mod @@ -8,13 +8,11 @@ require ( gfx.cafe/util/go/frand v0.0.0-20230721185457-c559e86c829c gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c github.com/alecthomas/kong v0.8.0 - github.com/alicebob/miniredis/v2 v2.30.5 github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v1.8.0 github.com/go-faster/jx v1.1.0 github.com/goccy/go-json v0.10.2 github.com/iancoleman/strcase v0.3.0 - github.com/redis/go-redis/v9 v9.1.0 github.com/rs/xid v1.5.0 github.com/stretchr/testify v1.8.4 sigs.k8s.io/yaml v1.3.0 @@ -23,9 +21,6 @@ require ( require ( github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect - github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-faster/errors v0.6.1 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/kr/pretty v0.3.1 // indirect @@ -34,7 +29,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/zerolog v1.30.0 // indirect github.com/segmentio/asm v1.2.0 // indirect - github.com/yuin/gopher-lua v1.1.0 // indirect golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect golang.org/x/sys v0.11.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index a36765fbd4891a9b6aea0492aa16052a5f5d9c48..dea721540448421111c4f9e67d3d7a49156946b6 100644 --- a/go.sum +++ b/go.sum @@ -12,18 +12,6 @@ github.com/alecthomas/assert/v2 v2.1.0 h1:tbredtNcQnoSd3QBhQWI7QZ3XHOVkw1Moklp2o github.com/alecthomas/kong v0.8.0 h1:ryDCzutfIqJPnNn0omnrgHLbAggDQM2VWHikE1xqK7s= github.com/alecthomas/kong v0.8.0/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U= github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= -github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.30.5 h1:3r6kTHdKnuP4fkS8k2IrvSfxpxUTcW1SOL0wN7b7Dt0= -github.com/alicebob/miniredis/v2 v2.30.5/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= -github.com/bsm/ginkgo/v2 v2.9.5 h1:rtVBYPs3+TC5iLUVOis1B9tjLTup7Cj5IfzosKtvTJ0= -github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= -github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= -github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -31,8 +19,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4= github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg= @@ -66,8 +52,6 @@ github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsK github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0NiuqvtfMY= -github.com/redis/go-redis/v9 v9.1.0/go.mod h1:urWj3He21Dj5k4TK1y59xH8Uj6ATueP8AH1cY3lZl4c= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -80,11 +64,8 @@ github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= -github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= -golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=