Newer
Older
package broker
import (
"context"
"encoding/json"
"strings"
"sync"
"sync/atomic"
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
)
type subscription struct {
topic string
ch chan json.RawMessage
err error
closed atomic.Bool
mu sync.RWMutex
}
// channel that will close when done or error
func (s *subscription) Listen() <-chan json.RawMessage {
return s.ch
}
// should close the channel and also stop listening
func (s *subscription) Close() error {
s.closed.CompareAndSwap(false, true)
return nil
}
// this hold errors
func (s *subscription) Err() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.err
}
type frame struct {
topic string
data json.RawMessage
}
type ChannelBroker struct {
mu sync.RWMutex
subs map[int]*subscription
subCount int
msgs chan *frame
domain string
}
func NewChannelBroker() *ChannelBroker {
return &ChannelBroker{
subs: map[int]*subscription{},
msgs: make(chan *frame, 128),
}
}
func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *ChannelBroker {
b.onDroppedMessage = fn
return b
}
func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, Replier, error) {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case f := <-b.msgs:
return f.data, ReplierFunc(func(fn func(*jx.Encoder) error) error {
enc := &jx.Encoder{}
err := fn(enc)
if err != nil {
return err
}
return b.Publish(context.Background(), f.topic, json.RawMessage(enc.Bytes()))
}), nil
}
}
func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json.RawMessage) error {
select {
case <-ctx.Done():
return ctx.Err()
case b.msgs <- &frame{data: msg, topic: topic}:
}
return nil
}
func (b *ChannelBroker) Publish(ctx context.Context, topic string, data []byte) error {
b.mu.RLock()
defer b.mu.RUnlock()
for _, v := range b.subs {
if v.closed.Load() {
continue
}
if childTopicMatchesParent(v.topic, topic) {
select {
case v.ch <- data:
default:
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
}
}
}
return nil
}
func (b *ChannelBroker) Subscribe(ctx context.Context, topic string) (Subscription, error) {
sub := &subscription{
topic: topic,
ch: make(chan json.RawMessage, 16),
}
b.mu.Lock()
b.subCount = b.subCount + 1
id := b.subCount
b.subs[id] = sub
b.mu.Unlock()
// gc after adding a new subscription
b.gc()
return sub, nil
}
func (b *ChannelBroker) gc() {
b.mu.Lock()
defer b.mu.Unlock()
for k, v := range b.subs {
if v.closed.Load() {
delete(b.subs, k)
}
}
}
// This is to see if a message with topic child should be matched with subscription parent
// so the child cannot contain wildcards * and >, but the parent can
func childTopicMatchesParent(parentString string, childString string) bool {
parent := strings.Split(parentString, ".")
child := strings.Split(childString, ".")
// if the length of the child is less than the length of the parent, its not possible for match
// for instance, if the parent topic is "one.two", and the child is "one", it will never match
if len(child) < len(parent) {
return false
}
// this is safe because length of child must be equal to or lower than parent, from previous
for idx, v := range parent {
// if parent is wildcard, match all, so continue
if v == "*" {
continue
}
// if the > wildcard is at the end, and we have exited since then, we are done
if v == ">" && len(parent)-1 == idx {
return true
}
// else make sure parent matches child.
if child[idx] != v {
return false
}
}
if len(child) == len(parent) {
return true
}
return false
}