good morning!!!!

Skip to content
Snippets Groups Projects
broker_inproc.go 3.48 KiB
Newer Older
a's avatar
a committed
package broker

import (
	"context"
	"encoding/json"
	"strings"
	"sync"
	"sync/atomic"
)

type subscription struct {
	topic string
	ch    chan json.RawMessage
	err   error

	closed atomic.Bool
	mu     sync.RWMutex
}

// channel that will close when done or error
func (s *subscription) Listen() <-chan json.RawMessage {
	return s.ch
}

// should close the channel and also stop listening
func (s *subscription) Close() error {
	s.closed.CompareAndSwap(false, true)
	return nil
}

// this hold errors
func (s *subscription) Err() error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.err
}

type frame struct {
	topic string
	data  json.RawMessage
}

type ChannelBroker struct {
	mu       sync.RWMutex
	subs     map[int]*subscription
	subCount int

a's avatar
a committed
	onDroppedMessage func(string, []byte)

a's avatar
a committed
	msgs chan *frame

	domain string
}

func NewChannelBroker() *ChannelBroker {
	return &ChannelBroker{
		subs: map[int]*subscription{},
		msgs: make(chan *frame, 128),
	}
}

a's avatar
a committed
func (b *ChannelBroker) SetDroppedMessageHandler(fn func(string, []byte)) *ChannelBroker {
	b.onDroppedMessage = fn
	return b
}

a's avatar
a committed
func (b *ChannelBroker) ReadRequest(ctx context.Context) (json.RawMessage, func(json.RawMessage) error, error) {
	select {
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case f := <-b.msgs:
		return f.data, func(resp json.RawMessage) error {
			return b.Publish(context.Background(), f.topic, resp)
		}, nil
	}
}

func (b *ChannelBroker) WriteRequest(ctx context.Context, topic string, msg json.RawMessage) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case b.msgs <- &frame{data: msg, topic: topic}:
	}
	return nil
}

func (b *ChannelBroker) Publish(ctx context.Context, topic string, data []byte) error {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, v := range b.subs {
		if v.closed.Load() {
			continue
		}
		if childTopicMatchesParent(v.topic, topic) {
			select {
			case v.ch <- data:
			default:
a's avatar
a committed
				if b.onDroppedMessage != nil {
					b.onDroppedMessage(topic, data)
				}
a's avatar
a committed
			}
		}
	}
	return nil
}

func (b *ChannelBroker) Subscribe(ctx context.Context, topic string) (Subscription, error) {
	sub := &subscription{
		topic: topic,
		ch:    make(chan json.RawMessage, 16),
	}
	b.mu.Lock()
	b.subCount = b.subCount + 1
	id := b.subCount
	b.subs[id] = sub
	b.mu.Unlock()
	// gc after adding a new subscription
	b.gc()
	return sub, nil
}

func (b *ChannelBroker) gc() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for k, v := range b.subs {
		if v.closed.Load() {
			delete(b.subs, k)
		}
	}
}

// This is to see if a message with topic child should be matched with subscription parent
// so the child cannot contain wildcards * and >, but the parent can
func childTopicMatchesParent(parentString string, childString string) bool {
	parent := strings.Split(parentString, ".")
	child := strings.Split(childString, ".")
	// if the length of the child is less than the length of the parent, its not possible for match
	// for instance, if the parent topic is "one.two", and the child is "one", it will never match
	if len(child) < len(parent) {
		return false
	}
	// this is safe because length of child must be equal to or lower than parent, from previous
	for idx, v := range parent {
		// if parent is wildcard, match all, so continue
		if v == "*" {
			continue
		}
		// if the > wildcard is at the end, and we have exited since then, we are done
		if v == ">" && len(parent)-1 == idx {
			return true
		}
		// else make sure parent matches child.
		if child[idx] != v {
			return false
		}
	}
	if len(child) == len(parent) {
		return true
	}
	return false
}