good morning!!!!

Skip to content
Commits on Source (5)
  • a's avatar
    ok · 3607aca5
    a authored
    3607aca5
  • a's avatar
    okg · c626e729
    a authored
    c626e729
  • a's avatar
    remove · e4663fef
    a authored
    e4663fef
  • a's avatar
    reduce dep · 1307aee2
    a authored
    1307aee2
  • a's avatar
    one less · 82bd7f24
    a authored
    82bd7f24
......@@ -6,8 +6,6 @@ import (
"strings"
"sync"
"sync/atomic"
"tuxpa.in/a/zlog/log"
)
type subscription struct {
......@@ -47,6 +45,8 @@ type ChannelBroker struct {
subs map[int]*subscription
subCount int
onDroppedMessage func(string, []byte)
msgs chan *frame
domain string
......@@ -59,6 +59,11 @@ func NewChannelBroker() *ChannelBroker {
}
}
func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *ChannelBroker {
b.onDroppedMessage = fn
return b
}
func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
select {
case <-ctx.Done():
......@@ -90,7 +95,9 @@ func (b *ChannelBroker) Publish(ctx context.Context, topic string, data []byte)
select {
case v.ch <- data:
default:
log.Trace().Str("topic", topic).Msg("dropped message")
if b.onDroppedMessage != nil {
b.onDroppedMessage(topic, data)
}
}
}
}
......
......@@ -68,7 +68,7 @@ func (c *WrapClient) Subscribe(ctx context.Context, namespace string, channel an
// send the actual message to initialize the subscription
var result string
err := c.conn.Do(ctx, &result, namespace+subscribeMethodSuffix, args)
err := c.conn.Do(ctx, &result, namespace+serviceMethodSeparator+subscribeMethodSuffix, args)
if err != nil {
return nil, err
}
......@@ -166,7 +166,7 @@ func (c *clientSub) Err() <-chan error {
func (c *clientSub) Unsubscribe() error {
// TODO: dont use context background here...
var result string
err := c.conn.Do(context.Background(), &result, c.namespace+unsubscribeMethodSuffix, nil)
err := c.conn.Do(context.Background(), &result, c.namespace+serviceMethodSeparator+unsubscribeMethodSuffix, nil)
if err != nil {
return err
}
......
......@@ -41,11 +41,11 @@ func (e *Engine) Middleware() func(codec.Handler) codec.Handler {
return codec.HandlerFunc(func(w codec.ResponseWriter, r *codec.Request) {
// its a subscription, so install a notification handler
switch {
case strings.HasSuffix(r.Method, subscribeMethodSuffix):
case strings.HasSuffix(r.Method, serviceMethodSeparator+subscribeMethodSuffix):
// create the notifier to inject into the context
n := &Notifier{
h: w,
namespace: strings.TrimSuffix(r.Method, subscribeMethodSuffix),
namespace: strings.TrimSuffix(r.Method, serviceMethodSeparator+subscribeMethodSuffix),
id: e.idgen(),
err: make(chan error, 1),
}
......@@ -59,7 +59,7 @@ func (e *Engine) Middleware() func(codec.Handler) codec.Handler {
// then inject the notifier
r = r.WithContext(context.WithValue(r.Context(), notifierKey{}, n))
h.ServeRPC(w, r)
case strings.HasSuffix(r.Method, unsubscribeMethodSuffix):
case strings.HasSuffix(r.Method, serviceMethodSeparator+unsubscribeMethodSuffix):
// read the subscription id to close
var subid SubID
err := r.ParamArray(subid)
......
......@@ -15,11 +15,16 @@ import (
json "github.com/goccy/go-json"
)
var serviceMethodSeparator = "/"
func SetServiceMethodSeparator(val string) {
serviceMethodSeparator = val
}
const (
subscribeMethodSuffix = "/subscribe"
notificationMethodSuffix = "/subscription"
unsubscribeMethodSuffix = "/unsubscribe"
serviceMethodSeparator = "/"
subscribeMethodSuffix = "subscribe"
notificationMethodSuffix = "subscription"
unsubscribeMethodSuffix = "unsubscribe"
maxClientSubscriptionBuffer = 12800
)
......@@ -105,5 +110,8 @@ func (n *Notifier) Err() <-chan error {
func (n *Notifier) send(data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(n.id), Result: data})
return n.h.Notify(n.namespace+notificationMethodSuffix, json.RawMessage(params))
return n.h.Notify(
n.namespace+
serviceMethodSeparator+
notificationMethodSuffix, json.RawMessage(params))
}
......@@ -11,14 +11,14 @@ import (
"gfx.cafe/open/jrpc/contrib/openrpc/templates"
"gfx.cafe/open/jrpc/contrib/openrpc/types"
"github.com/iancoleman/strcase"
"gfx.cafe/open/jrpc/contrib/openrpc/util"
)
var funcs = template.FuncMap{
"list": func(v ...any) []any {
return v
},
"camelCase": strcase.ToCamel,
"camelCase": util.ToCamel,
"goType": func(v string) string {
switch v {
case "boolean":
......
package util
import (
"strings"
"sync"
)
var uppercaseAcronym = sync.Map{}
//"ID": "id",
// ConfigureAcronym allows you to add additional words which will be considered acronyms
func ConfigureAcronym(key, val string) {
uppercaseAcronym.Store(key, val)
}
// Converts a string to CamelCase
func toCamelInitCase(s string, initCase bool) string {
s = strings.TrimSpace(s)
if s == "" {
return s
}
a, hasAcronym := uppercaseAcronym.Load(s)
if hasAcronym {
s = a.(string)
}
n := strings.Builder{}
n.Grow(len(s))
capNext := initCase
prevIsCap := false
for i, v := range []byte(s) {
vIsCap := v >= 'A' && v <= 'Z'
vIsLow := v >= 'a' && v <= 'z'
if capNext {
if vIsLow {
v += 'A'
v -= 'a'
}
} else if i == 0 {
if vIsCap {
v += 'a'
v -= 'A'
}
} else if prevIsCap && vIsCap && !hasAcronym {
v += 'a'
v -= 'A'
}
prevIsCap = vIsCap
if vIsCap || vIsLow {
n.WriteByte(v)
capNext = false
} else if vIsNum := v >= '0' && v <= '9'; vIsNum {
n.WriteByte(v)
capNext = true
} else {
capNext = v == '_' || v == ' ' || v == '-' || v == '.'
}
}
return n.String()
}
// ToCamel converts a string to CamelCase
func ToCamel(s string) string {
return toCamelInitCase(s, true)
}
......@@ -4,8 +4,6 @@ import (
"regexp"
"strings"
"unicode"
"github.com/davecgh/go-spew/spew"
)
func SanitizeBackticks(s string) string {
......@@ -21,10 +19,6 @@ func Slice(val []interface{}, index int) interface{} {
return val[index]
}
func Inpect(val interface{}) string {
return spew.Sdump(val)
}
func CamelCase(name string) string {
var in []string
if strings.Contains(name, " ") {
......
......@@ -8,26 +8,20 @@ require (
gfx.cafe/util/go/frand v0.0.0-20230721185457-c559e86c829c
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c
github.com/alecthomas/kong v0.8.0
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set v1.8.0
github.com/go-faster/jx v1.1.0
github.com/goccy/go-json v0.10.2
github.com/iancoleman/strcase v0.3.0
github.com/rs/xid v1.5.0
github.com/stretchr/testify v1.8.4
sigs.k8s.io/yaml v1.3.0
tuxpa.in/a/zlog v1.61.0
)
require (
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/zerolog v1.30.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/sys v0.11.0 // indirect
......
......@@ -9,28 +9,25 @@ gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c/go.mod h1:WvSX4JsCRB
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY=
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSiWQsof+nXEI9bUVUyV6F53Fp89EuCh2EAA=
github.com/alecthomas/assert/v2 v2.1.0 h1:tbredtNcQnoSd3QBhQWI7QZ3XHOVkw1Moklp2ojoH/0=
github.com/alecthomas/assert/v2 v2.1.0/go.mod h1:b/+1DI2Q6NckYi+3mXyH3wFb8qG37K/DuK80n7WefXA=
github.com/alecthomas/kong v0.8.0 h1:ryDCzutfIqJPnNn0omnrgHLbAggDQM2VWHikE1xqK7s=
github.com/alecthomas/kong v0.8.0/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U=
github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/alecthomas/repr v0.1.0/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4=
github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo=
github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY=
github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
......@@ -41,36 +38,19 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
......@@ -84,5 +64,3 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
tuxpa.in/a/zlog v1.61.0 h1:7wrS6G4QwpnOmgHRQknrr7IgiMXrfGpekkU0PjM9FhE=
tuxpa.in/a/zlog v1.61.0/go.mod h1:CNpMe8laDHLSypx/DyxfX1S0oyxUydeo3aGTEbtRBhg=
......@@ -7,10 +7,10 @@ import (
"sync/atomic"
"gfx.cafe/open/jrpc/pkg/codec"
"gfx.cafe/open/jrpc/pkg/util/mapset"
"gfx.cafe/util/go/bufpool"
mapset "github.com/deckarep/golang-set"
"github.com/go-faster/jx"
"github.com/goccy/go-json"
)
......@@ -19,7 +19,7 @@ import (
type Server struct {
services codec.Handler
run int32
codecs mapset.Set
codecs *mapset.Set[codec.ReaderWriter]
Tracing Tracing
}
......@@ -30,7 +30,7 @@ type Tracing struct {
// NewServer creates a new server instance with no registered handlers.
func NewServer(r codec.Handler) *Server {
server := &Server{
codecs: mapset.NewSet(),
codecs: mapset.NewSet[codec.ReaderWriter](),
run: 1,
}
server.services = r
......@@ -175,8 +175,8 @@ func (s *Server) ServeCodec(pctx context.Context, remote codec.ReaderWriter) {
// subscriptions.
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
s.codecs.Each(func(c any) bool {
c.(codec.ReaderWriter).Close()
s.codecs.Each(func(c codec.ReaderWriter) bool {
c.Close()
return true
})
}
......
package mapset
import "sync"
type Set[T comparable] struct {
m map[T]struct{}
mu sync.RWMutex
}
func NewSet[T comparable]() *Set[T] {
return &Set[T]{
m: make(map[T]struct{}),
}
}
func (s *Set[T]) Add(x T) {
s.mu.Lock()
defer s.mu.Unlock()
s.m[x] = struct{}{}
}
func (s *Set[T]) Remove(x T) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.m, x)
}
func (s *Set[T]) Each(fn func(x T) bool) {
for k := range s.m {
if !fn(k) {
return
}
}
}