diff --git a/contrib/codecs/broker/broker.go b/contrib/codecs/broker/broker.go new file mode 100644 index 0000000000000000000000000000000000000000..453847e0149fadf298f882608d9fae8a7862ea3f --- /dev/null +++ b/contrib/codecs/broker/broker.go @@ -0,0 +1,29 @@ +package broker + +import ( + "context" + "encoding/json" +) + +type ServerSpoke interface { + ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) +} + +type ClientSpoke interface { + WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error + Subscribe(ctx context.Context, clientId string) (Subscription, error) +} + +type Broker interface { + ServerSpoke + ClientSpoke +} + +type Subscription interface { + // channel that will close when done or error + Listen() <-chan json.RawMessage + // should close the channel and also stop listening + Close() error + // this hold errors + Err() error +} diff --git a/contrib/codecs/broker/broker_inproc.go b/contrib/codecs/broker/broker_inproc.go new file mode 100644 index 0000000000000000000000000000000000000000..392deba0af1d813a13c5d985019d8960180a30b9 --- /dev/null +++ b/contrib/codecs/broker/broker_inproc.go @@ -0,0 +1,154 @@ +package broker + +import ( + "context" + "encoding/json" + "strings" + "sync" + "sync/atomic" + + "tuxpa.in/a/zlog/log" +) + +type subscription struct { + topic string + ch chan json.RawMessage + err error + + closed atomic.Bool + mu sync.RWMutex +} + +// channel that will close when done or error +func (s *subscription) Listen() <-chan json.RawMessage { + return s.ch +} + +// should close the channel and also stop listening +func (s *subscription) Close() error { + s.closed.CompareAndSwap(false, true) + return nil +} + +// this hold errors +func (s *subscription) Err() error { + s.mu.RLock() + defer s.mu.RUnlock() + return s.err +} + +type frame struct { + topic string + data json.RawMessage +} + +type ChannelBroker struct { + mu sync.RWMutex + subs map[int]*subscription + subCount int + + msgs chan *frame + + domain string +} + +func NewChannelBroker() *ChannelBroker { + return &ChannelBroker{ + subs: map[int]*subscription{}, + msgs: make(chan *frame, 128), + } +} + +func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case f := <-b.msgs: + return f.data, func(resp json.RawMessage) error { + return b.Publish(context.Background(), f.topic, resp) + }, nil + } +} + +func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json.RawMessage) error { + select { + case <-ctx.Done(): + return ctx.Err() + case b.msgs <- &frame{data: msg, topic: topic}: + } + return nil +} + +func (b *ChannelBroker) Publish(ctx context.Context, topic string, data []byte) error { + b.mu.RLock() + defer b.mu.RUnlock() + for _, v := range b.subs { + if v.closed.Load() { + continue + } + if childTopicMatchesParent(v.topic, topic) { + select { + case v.ch <- data: + default: + log.Trace().Str("topic", topic).Msg("dropped message") + } + } + } + return nil +} + +func (b *ChannelBroker) Subscribe(ctx context.Context, topic string) (Subscription, error) { + sub := &subscription{ + topic: topic, + ch: make(chan json.RawMessage, 16), + } + b.mu.Lock() + b.subCount = b.subCount + 1 + id := b.subCount + b.subs[id] = sub + b.mu.Unlock() + // gc after adding a new subscription + b.gc() + return sub, nil +} + +func (b *ChannelBroker) gc() { + b.mu.Lock() + defer b.mu.Unlock() + for k, v := range b.subs { + if v.closed.Load() { + delete(b.subs, k) + } + } +} + +// This is to see if a message with topic child should be matched with subscription parent +// so the child cannot contain wildcards * and >, but the parent can +func childTopicMatchesParent(parentString string, childString string) bool { + parent := strings.Split(parentString, ".") + child := strings.Split(childString, ".") + // if the length of the child is less than the length of the parent, its not possible for match + // for instance, if the parent topic is "one.two", and the child is "one", it will never match + if len(child) < len(parent) { + return false + } + // this is safe because length of child must be equal to or lower than parent, from previous + for idx, v := range parent { + // if parent is wildcard, match all, so continue + if v == "*" { + continue + } + // if the > wildcard is at the end, and we have exited since then, we are done + if v == ">" && len(parent)-1 == idx { + return true + } + // else make sure parent matches child. + if child[idx] != v { + return false + } + } + if len(child) == len(parent) { + return true + } + return false +} diff --git a/exp/redis/codec_test.go b/contrib/codecs/broker/brokernats/codec_test.go similarity index 68% rename from exp/redis/codec_test.go rename to contrib/codecs/broker/brokernats/codec_test.go index 5e5c8363832ad7119c5c08fb9c772d587f25555e..fc5f69f4c6b575696fbc35ac3ef6a9a1c82c6de5 100644 --- a/exp/redis/codec_test.go +++ b/contrib/codecs/broker/brokernats/codec_test.go @@ -1,19 +1,19 @@ -package redis +package nats import ( + "context" "testing" + "gfx.cafe/open/jrpc/contrib/codecs/broker" "gfx.cafe/open/jrpc/pkg/codec" "gfx.cafe/open/jrpc/pkg/server" "gfx.cafe/open/jrpc/pkg/jrpctest" "github.com/alicebob/miniredis/v2" "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/require" ) func TestBasicSuite(t *testing.T) { - domain := "jrpc" jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) { @@ -22,14 +22,15 @@ func TestBasicSuite(t *testing.T) { Addrs: []string{redisServer.Addr()}, } ctx := redisServer.Ctx - ss, err := CreateServerStream(ctx, domain, connOpts) - require.NoError(t, err) + ctx, cn := context.WithCancel(ctx) + b := CreateBroker(ctx, "jrpc", connOpts) s := jrpctest.NewServer() - go (&Server{Server: s}).ServeRedis(ctx, ss) + spokeServer := (&broker.Server{Server: s}) + go spokeServer.ServeSpoke(ctx, b) return s, func() codec.Conn { - conn := NewClient(redis.NewUniversalClient(connOpts), domain) - return conn + return broker.NewClient(b) }, func() { + cn() redisServer.CtxCancel() } }, diff --git a/contrib/codecs/broker/brokernats/nats.go b/contrib/codecs/broker/brokernats/nats.go new file mode 100644 index 0000000000000000000000000000000000000000..210ebfa8cbf8547a9bafa797863c3fe80bf190cf --- /dev/null +++ b/contrib/codecs/broker/brokernats/nats.go @@ -0,0 +1,158 @@ +package nats + +import ( + "context" + "encoding/json" + "sync" + "sync/atomic" + "time" + "unsafe" + + "gfx.cafe/open/jrpc/contrib/codecs/broker" + "github.com/redis/go-redis/v9" + "github.com/rs/xid" +) + +type Broker struct { + client redis.UniversalClient + domain string + + id xid.ID +} + +type subscription struct { + ch chan json.RawMessage + err error + + closed atomic.Bool + mu sync.RWMutex + pubsub *redis.PubSub +} + +// channel that will close when done or error +func (s *subscription) Listen() <-chan json.RawMessage { + return s.ch +} + +// should close the channel and also stop listening +func (s *subscription) Close() error { + if s.closed.CompareAndSwap(false, true) { + s.pubsub.Close() + } + return nil +} + +// this hold errors +func (s *subscription) Err() error { + s.mu.RLock() + defer s.mu.RUnlock() + return s.err +} + +func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error { + req, err := json.Marshal(&RedisRequest{ + ReplyChannel: clientId, + Message: msg, + }) + if err != nil { + return err + } + return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err() +} + +func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) { + topic := s.domain + "." + clientId + respDomainSuffix + sub := &subscription{ + ch: make(chan json.RawMessage, 16), + } + + sub.pubsub = s.client.Subscribe(ctx, topic) + ch := sub.pubsub.Channel() + go func() { + for { + select { + case <-ctx.Done(): + sub.Close() + return + case t := <-ch: + select { + case sub.ch <- json.RawMessage(stringToBytes(t.Payload)): + default: + } + } + } + }() + + return sub, nil +} + +func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker { + c := redis.NewUniversalClient(opts) + // the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways. + s := &Broker{ + client: c, + id: xid.New(), + domain: domain, + } + return s +} + +type RedisRequest struct { + ReplyChannel string `json:"r"` + Message json.RawMessage `json:"msg"` +} + +func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { + timeout := time.Hour + + res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result() + if err != nil { + return nil, nil, err + } + if len(res) != 2 { + return nil, nil, err + } + redisReq := &RedisRequest{} + err = json.Unmarshal(stringToBytes(res[1]), redisReq) + if err != nil { + return nil, nil, err + } + return redisReq.Message, func(rm json.RawMessage) error { + if len(rm) == 0 { + return nil + } + target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix + err := s.client.Publish(context.Background(), target, []byte(rm)).Err() + if err != nil { + return err + } + return nil + }, nil +} + +const reqDomainSuffix = ".req" +const respDomainSuffix = ".resp" + +// stringHeader is the runtime representation of a string. +// It should be identical to reflect.StringHeader +type stringHeader struct { + data unsafe.Pointer + stringLen int +} + +// sliceHeader is the runtime representation of a slice. +// It should be identical to reflect.sliceHeader +type sliceHeader struct { + data unsafe.Pointer + sliceLen int + sliceCap int +} + +func stringToBytes(s string) (b []byte) { + stringHeader := (*stringHeader)(unsafe.Pointer(&s)) + sliceHeader := (*sliceHeader)(unsafe.Pointer(&b)) + sliceHeader.data = stringHeader.data + sliceHeader.sliceLen = len(s) + sliceHeader.sliceCap = len(s) + return b +} diff --git a/contrib/codecs/broker/brokerredis/codec_test.go b/contrib/codecs/broker/brokerredis/codec_test.go new file mode 100644 index 0000000000000000000000000000000000000000..190286a6b6fde5a71bdeaf2cd48e4b0f0833e9e7 --- /dev/null +++ b/contrib/codecs/broker/brokerredis/codec_test.go @@ -0,0 +1,13 @@ +package redis + +import ( + "testing" + + "gfx.cafe/open/jrpc/pkg/jrpctest" +) + +func TestBasicSuite(t *testing.T) { + jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ + ServerMaker: ServerMaker, + }) +} diff --git a/contrib/codecs/broker/brokerredis/redis.go b/contrib/codecs/broker/brokerredis/redis.go new file mode 100644 index 0000000000000000000000000000000000000000..0f911072887968b7c28fa13437904297caf94aaa --- /dev/null +++ b/contrib/codecs/broker/brokerredis/redis.go @@ -0,0 +1,158 @@ +package redis + +import ( + "context" + "encoding/json" + "sync" + "sync/atomic" + "time" + "unsafe" + + "gfx.cafe/open/jrpc/contrib/codecs/broker" + "github.com/redis/go-redis/v9" + "github.com/rs/xid" +) + +type Broker struct { + client redis.UniversalClient + domain string + + id xid.ID +} + +type subscription struct { + ch chan json.RawMessage + err error + + closed atomic.Bool + mu sync.RWMutex + pubsub *redis.PubSub +} + +// channel that will close when done or error +func (s *subscription) Listen() <-chan json.RawMessage { + return s.ch +} + +// should close the channel and also stop listening +func (s *subscription) Close() error { + if s.closed.CompareAndSwap(false, true) { + s.pubsub.Close() + } + return nil +} + +// this hold errors +func (s *subscription) Err() error { + s.mu.RLock() + defer s.mu.RUnlock() + return s.err +} + +func (s *Broker) WriteRequest(ctx context.Context, clientId string, msg json.RawMessage) error { + req, err := json.Marshal(&RedisRequest{ + ReplyChannel: clientId, + Message: msg, + }) + if err != nil { + return err + } + return s.client.LPush(ctx, s.domain+reqDomainSuffix, req).Err() +} + +func (s *Broker) Subscribe(ctx context.Context, clientId string) (broker.Subscription, error) { + topic := s.domain + "." + clientId + respDomainSuffix + sub := &subscription{ + ch: make(chan json.RawMessage, 16), + } + + sub.pubsub = s.client.Subscribe(ctx, topic) + ch := sub.pubsub.Channel() + go func() { + for { + select { + case <-ctx.Done(): + sub.Close() + return + case t := <-ch: + select { + case sub.ch <- json.RawMessage(stringToBytes(t.Payload)): + default: + } + } + } + }() + + return sub, nil +} + +func CreateBroker(ctx context.Context, domain string, opts *redis.UniversalOptions) *Broker { + c := redis.NewUniversalClient(opts) + // the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways. + s := &Broker{ + client: c, + id: xid.New(), + domain: domain, + } + return s +} + +type RedisRequest struct { + ReplyChannel string `json:"r"` + Message json.RawMessage `json:"msg"` +} + +func (s *Broker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) { + timeout := time.Hour + + res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result() + if err != nil { + return nil, nil, err + } + if len(res) != 2 { + return nil, nil, err + } + redisReq := &RedisRequest{} + err = json.Unmarshal(stringToBytes(res[1]), redisReq) + if err != nil { + return nil, nil, err + } + return redisReq.Message, func(rm json.RawMessage) error { + if len(rm) == 0 { + return nil + } + target := s.domain + "." + redisReq.ReplyChannel + respDomainSuffix + err := s.client.Publish(context.Background(), target, []byte(rm)).Err() + if err != nil { + return err + } + return nil + }, nil +} + +const reqDomainSuffix = ".req" +const respDomainSuffix = ".resp" + +// stringHeader is the runtime representation of a string. +// It should be identical to reflect.StringHeader +type stringHeader struct { + data unsafe.Pointer + stringLen int +} + +// sliceHeader is the runtime representation of a slice. +// It should be identical to reflect.sliceHeader +type sliceHeader struct { + data unsafe.Pointer + sliceLen int + sliceCap int +} + +func stringToBytes(s string) (b []byte) { + stringHeader := (*stringHeader)(unsafe.Pointer(&s)) + sliceHeader := (*sliceHeader)(unsafe.Pointer(&b)) + sliceHeader.data = stringHeader.data + sliceHeader.sliceLen = len(s) + sliceHeader.sliceCap = len(s) + return b +} diff --git a/contrib/codecs/broker/brokerredis/testing.go b/contrib/codecs/broker/brokerredis/testing.go new file mode 100644 index 0000000000000000000000000000000000000000..f210f32f07e03545e46b3426f07003271dc984cc --- /dev/null +++ b/contrib/codecs/broker/brokerredis/testing.go @@ -0,0 +1,34 @@ +package redis + +import ( + "context" + + "gfx.cafe/open/jrpc/contrib/codecs/broker" + "gfx.cafe/open/jrpc/pkg/codec" + "gfx.cafe/open/jrpc/pkg/jrpctest" + "gfx.cafe/open/jrpc/pkg/server" + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" +) + +func ServerMaker() (*server.Server, jrpctest.ClientMaker, func()) { + redisServer, err := miniredis.Run() + if err != nil { + panic(err) + } + connOpts := &redis.UniversalOptions{ + Addrs: []string{redisServer.Addr()}, + } + ctx := redisServer.Ctx + ctx, cn := context.WithCancel(ctx) + b := CreateBroker(ctx, "jrpc", connOpts) + s := jrpctest.NewServer() + spokeServer := (&broker.Server{Server: s}) + go spokeServer.ServeSpoke(ctx, b) + return s, func() codec.Conn { + return broker.NewClient(b) + }, func() { + cn() + redisServer.CtxCancel() + } +} diff --git a/exp/redis/client.go b/contrib/codecs/broker/client.go similarity index 81% rename from exp/redis/client.go rename to contrib/codecs/broker/client.go index 8969d63d427601eee990324be6a1abb878aaf68a..6ccfbc39768bcb2be90260aa65ea0484a3b324dc 100644 --- a/exp/redis/client.go +++ b/contrib/codecs/broker/client.go @@ -1,24 +1,21 @@ -package redis +package broker import ( "bytes" "context" "encoding/json" - "fmt" "sync" "gfx.cafe/open/jrpc/pkg/clientutil" "gfx.cafe/open/jrpc/pkg/codec" - "github.com/redis/go-redis/v9" "github.com/rs/xid" ) type Client struct { p *clientutil.IdReply - c redis.UniversalClient + c ClientSpoke clientId string - domain string ctx context.Context cn context.CancelFunc @@ -30,21 +27,14 @@ type Client struct { handlerPeer codec.PeerInfo } -func Dial(url string, domain string) *Client { - return NewClient(redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: []string{url}, - }), domain) -} - -func NewClient(c redis.UniversalClient, domain string) *Client { +func NewClient(spoke ClientSpoke) *Client { cl := &Client{ - c: c, + c: spoke, p: clientutil.NewIdReply(), handlerPeer: codec.PeerInfo{ - Transport: "redis", + Transport: "broker", RemoteAddr: "", }, - domain: domain, // this doesn't need to be secure bc... you have access to the redis instance lol clientId: xid.New().String(), handler: codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {}), @@ -72,12 +62,20 @@ func (c *Client) Mount(h codec.Middleware) { } func (c *Client) listen() error { - subCh := fmt.Sprintf(c.domain + "." + c.clientId) - sub := c.c.PSubscribe(c.ctx, subCh) - msgCh := sub.Channel() + defer c.cn() + sub, err := c.c.Subscribe(c.ctx, c.clientId) + if err != nil { + return err + } + defer sub.Close() for { - incomingMsg := <-msgCh - msgs, _ := codec.ParseMessage(stringToBytes(incomingMsg.Payload)) + var incomingMsg json.RawMessage + select { + case incomingMsg = <-sub.Listen(): + case <-c.ctx.Done(): + return c.ctx.Err() + } + msgs, _ := codec.ParseMessage(incomingMsg) for i := range msgs { v := msgs[i] if v == nil { @@ -122,11 +120,7 @@ func (c *Client) Do(ctx context.Context, result any, method string, params any) if err != nil { return err } - toFwd, _ := json.Marshal(&RedisRequest{ - ReplyChannel: c.clientId, - Message: fwd, - }) - err = c.writeContext(req.Context(), toFwd) + err = c.writeContext(req.Context(), fwd) if err != nil { return err } @@ -164,11 +158,7 @@ func (c *Client) BatchCall(ctx context.Context, b ...*codec.BatchElem) error { if err != nil { return err } - pkg, _ := json.Marshal(&RedisRequest{ - ReplyChannel: c.clientId, - Message: buf.Bytes(), - }) - err = c.writeContext(ctx, pkg) + err = c.writeContext(ctx, buf.Bytes()) if err != nil { return err } @@ -223,7 +213,7 @@ func (c *Client) Close() error { func (c *Client) writeContext(ctx context.Context, xs []byte) error { errch := make(chan error) go func() { - err := c.c.LPush(ctx, c.domain+reqDomainSuffix, xs).Err() + err := c.c.WriteRequest(ctx, c.clientId, xs) select { case errch <- err: case <-ctx.Done(): diff --git a/exp/redis/codec.go b/contrib/codecs/broker/codec.go similarity index 91% rename from exp/redis/codec.go rename to contrib/codecs/broker/codec.go index 5c0ec05161a4c366335e67104a2a4918acd5cbe3..b372afa27c17f06c6bc24c80d91db3a3b7343bbc 100644 --- a/exp/redis/codec.go +++ b/contrib/codecs/broker/codec.go @@ -1,4 +1,4 @@ -package redis +package broker import ( "bytes" @@ -30,14 +30,14 @@ type httpError struct { err error } -func NewCodec(req *RedisRequest, replier func(json.RawMessage) error) *Codec { +func NewCodec(req json.RawMessage, replier func(json.RawMessage) error) *Codec { c := &Codec{ replier: replier, ansCh: make(chan *serverutil.Bundle, 1), closeCh: make(chan struct{}), } c.ctx, c.cn = context.WithCancel(context.Background()) - bundle := serverutil.ParseBundle(req.Message) + bundle := serverutil.ParseBundle(req) c.ansCh <- bundle return c } diff --git a/contrib/codecs/broker/codec_test.go b/contrib/codecs/broker/codec_test.go new file mode 100644 index 0000000000000000000000000000000000000000..66e02f06d37737587f578720119cf4a17a3e8dfe --- /dev/null +++ b/contrib/codecs/broker/codec_test.go @@ -0,0 +1,28 @@ +package broker + +import ( + "context" + "testing" + + "gfx.cafe/open/jrpc/pkg/codec" + "gfx.cafe/open/jrpc/pkg/server" + + "gfx.cafe/open/jrpc/pkg/jrpctest" +) + +func TestBasicSuite(t *testing.T) { + ctx := context.Background() + jrpctest.RunBasicTestSuite(t, jrpctest.BasicTestSuiteArgs{ + ServerMaker: func() (*server.Server, jrpctest.ClientMaker, func()) { + broker := NewChannelBroker() + s := jrpctest.NewServer() + spokeServer := (&Server{Server: s}) + go spokeServer.ServeSpoke(ctx, broker) + return s, func() codec.Conn { + conn := NewClient(broker) + return conn + }, func() { + } + }, + }) +} diff --git a/exp/redis/server.go b/contrib/codecs/broker/server.go similarity index 69% rename from exp/redis/server.go rename to contrib/codecs/broker/server.go index 54771e1e6dabc898b4299a34ae0bc425358163c5..aee4e828cd52d324e7c4f4467af0d130a04e61cf 100644 --- a/exp/redis/server.go +++ b/contrib/codecs/broker/server.go @@ -1,17 +1,16 @@ -package redis +package broker import ( "context" "gfx.cafe/open/jrpc/pkg/server" - "tuxpa.in/a/zlog/log" ) type Server struct { Server *server.Server } -func (s *Server) ServeRedis(ctx context.Context, stream *ServerStream) { +func (s *Server) ServeSpoke(ctx context.Context, stream ServerSpoke) { for { select { case <-ctx.Done(): @@ -20,7 +19,6 @@ func (s *Server) ServeRedis(ctx context.Context, stream *ServerStream) { } req, fn, err := stream.ReadRequest(ctx) if err != nil { - log.Err(err).Msg("while reading bpop") continue } if req == nil { diff --git a/exp/redis/const.go b/exp/redis/const.go deleted file mode 100644 index 62e650cd353a69683753a0b630588bc5a6eba784..0000000000000000000000000000000000000000 --- a/exp/redis/const.go +++ /dev/null @@ -1,18 +0,0 @@ -package redis - -import "errors" - -const ( - // NOTE: if you change this, you will have to change the thing in jrpctest... its what its for now until tests get refactored - maxRequestContentLength = 1024 * 1024 * 5 - contentType = "application/json" -) - -// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 -var acceptedContentTypes = []string{ - // https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 - contentType, "application/json-rpc", "application/jsonrequest", - // these are added because they make sense, fight me! - "application/jsonrpc2", "application/json-rpc2", "application/jrpc", -} -var ErrInvalidContentType = errors.New("invalid content type") diff --git a/exp/redis/redis.go b/exp/redis/redis.go deleted file mode 100644 index 281af544ffd5738600bfe478fa5a7a5b5387cdf7..0000000000000000000000000000000000000000 --- a/exp/redis/redis.go +++ /dev/null @@ -1,66 +0,0 @@ -package redis - -import ( - "context" - "encoding/json" - "time" - "unsafe" - - "github.com/redis/go-redis/v9" - "github.com/rs/xid" -) - -type ServerStream struct { - client redis.UniversalClient - domain string - - id xid.ID -} - -func CreateServerStream(ctx context.Context, domain string, opts *redis.UniversalOptions) (*ServerStream, error) { - c := redis.NewUniversalClient(opts) - // the xid doesn't need to be secure, since we assume anyone with access to the redis cluster can do anything anyways. - s := &ServerStream{ - client: c, - id: xid.New(), - domain: domain, - } - return s, nil -} - -type RedisRequest struct { - ReplyChannel string `json:"r"` - Message json.RawMessage `json:"msg"` -} - -func (s *ServerStream) ReadRequest(ctx context.Context) (*RedisRequest, func(json.RawMessage) error, error) { - timeout := time.Hour - res, err := s.client.BLPop(ctx, timeout, s.domain+reqDomainSuffix).Result() - if err != nil { - return nil, nil, err - } - if len(res) != 2 { - return nil, nil, err - } - redisReq := &RedisRequest{} - err = json.Unmarshal(stringToBytes(res[1]), redisReq) - if err != nil { - return nil, nil, err - } - return redisReq, func(rm json.RawMessage) error { - target := s.domain + "." + redisReq.ReplyChannel - return s.client.Publish(context.Background(), target, string(rm)).Err() - }, nil -} - -const reqDomainSuffix = ".req" -const respDomainSuffix = ".resp" - -func stringToBytes(s string) []byte { - return *(*[]byte)(unsafe.Pointer( - &struct { - string - Cap int - }{s, len(s)}, - )) -} diff --git a/go.mod b/go.mod index 606f5e2266af665cf1d132ba40a7560cf7e2f5f5..eb63b13381930e84df5106726c4c6fd458cae971 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/go-faster/jx v1.0.0 github.com/goccy/go-json v0.10.0 github.com/iancoleman/strcase v0.3.0 + github.com/nats-io/nats.go v1.28.0 github.com/redis/go-redis/v9 v9.0.5 github.com/rs/xid v1.4.0 github.com/stretchr/testify v1.8.2 @@ -27,16 +28,22 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-faster/errors v0.6.1 // indirect - github.com/klauspost/compress v1.15.15 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/klauspost/compress v1.16.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.18 // indirect + github.com/nats-io/nats-server/v2 v2.9.21 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/zerolog v1.29.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/yuin/gopher-lua v1.1.0 // indirect + golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect - golang.org/x/sys v0.7.0 // indirect + golang.org/x/sys v0.10.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index fdfc007fb36fa6bb7d8ddff2374ad6a0fd015aba..466f91b5feee23d3c91988bb676d43a0fad32acc 100644 --- a/go.sum +++ b/go.sum @@ -40,14 +40,18 @@ github.com/go-faster/jx v1.0.0/go.mod h1:zm8SlkwK+H0TYNKYtVJ/7cWFS7soJBQWhcPctKy github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -62,6 +66,16 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98= github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk= +github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -86,6 +100,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -94,10 +110,15 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=