goext/dataext/pubsub.go
Mike Schwörer 64f2cd7219
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Has been cancelled
v0.0.591 implement namespaced PubSub Broker in dataext
2025-07-16 12:44:55 +02:00

239 lines
5.4 KiB
Go

package dataext
import (
"context"
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
"git.blackforestbytes.com/BlackForestBytes/goext/syncext"
"github.com/rs/xid"
"iter"
"sync"
"time"
)
// PubSubSubscription is the handle returned to callers by the Subscribe*
// methods of PubSub. Its only capability is ending the subscription.
type PubSubSubscription interface {
// Unsubscribe removes the subscription from the broker it was created on.
Unsubscribe()
}
// pubSubSubscription is the broker-internal record of a single subscriber.
// The Subscribe* methods of PubSub return it as a PubSubSubscription.
type pubSubSubscription[T any] struct {
// ID identifies the subscription; unsubscribe matches on it when removing the entry.
ID string
// parent is the broker this subscription is registered with.
parent *PubSub[T]
// namespace is the key under which the subscription is stored in the parent.
namespace string
// subLock is held while a message is delivered to, or the channels are closed for, this subscription.
subLock *sync.Mutex
// Func is the callback of a callback-based subscription (nil for the other kinds).
Func func(T)
// Chan is the message channel of a channel- or iterator-based subscription.
// unsubscribe closes it and resets the field to nil.
Chan chan T
// UnsubChan is only set for iterator-based subscriptions. unsubscribe signals
// and closes it so that the iterator loop can stop, then resets the field to nil.
UnsubChan chan bool
}
// Unsubscribe ends this subscription by handing it back to the broker that
// created it for removal.
func (p *pubSubSubscription[T]) Unsubscribe() {
	broker := p.parent
	broker.unsubscribe(p)
}
// PubSub is a publish/subscribe broker for messages of type T in which
// subscribers are grouped by a string namespace.
type PubSub[T any] struct {
// masterLock guards every access to the subscriptions map.
masterLock *sync.Mutex
// subscriptions holds the registered subscriptions, keyed by namespace.
// unsubscribe deletes a namespace key once its last subscription is gone.
subscriptions map[string][]*pubSubSubscription[T]
}
// NewPubSub creates an empty broker for messages of type T.
//
// capacity is used as the initial size hint of the namespace map; it is not
// an upper bound on the number of namespaces or subscribers.
func NewPubSub[T any](capacity int) *PubSub[T] {
	registry := make(map[string][]*pubSubSubscription[T], capacity)

	broker := new(PubSub[T])
	broker.masterLock = &sync.Mutex{}
	broker.subscriptions = registry
	return broker
}
// Namespaces returns the keys of the subscription map, i.e. the namespaces
// that currently have at least one registered subscription.
func (ps *PubSub[T]) Namespaces() []string {
	ps.masterLock.Lock()
	defer ps.masterLock.Unlock()

	names := langext.MapKeyArr(ps.subscriptions)
	return names
}
// SubscriberCount returns how many subscriptions are currently registered
// under namespace ns (0 if the namespace is unknown).
func (ps *PubSub[T]) SubscriberCount(ns string) int {
	ps.masterLock.Lock()
	defer ps.masterLock.Unlock()

	registered := ps.subscriptions[ns]
	return len(registered)
}
// Publish delivers data to every subscription registered under ns without
// ever waiting for a receiver.
//
// Callback subscriptions are invoked in a new goroutine and always count as
// received. Channel subscriptions are written with syncext.WriteNonBlocking
// and only count when that write reports success, so actualReceiver can be
// smaller than subscriber.
//
// Returns the number of subscriptions seen and the number that got the message.
func (ps *PubSub[T]) Publish(ns string, data T) (subscriber int, actualReceiver int) {
	// Work on a copy so the broker lock is not held during delivery.
	ps.masterLock.Lock()
	snapshot := langext.ArrCopy(ps.subscriptions[ns])
	ps.masterLock.Unlock()

	// deliver hands data to one subscription and reports whether it was accepted.
	deliver := func(s *pubSubSubscription[T]) bool {
		s.subLock.Lock()
		defer s.subLock.Unlock()

		switch {
		case s.Func != nil:
			go func() { s.Func(data) }()
			return true
		case s.Chan != nil:
			return syncext.WriteNonBlocking(s.Chan, data)
		default:
			// Neither callback nor channel (e.g. already unsubscribed): nothing to do.
			return false
		}
	}

	delivered := 0
	for _, s := range snapshot {
		if deliver(s) {
			delivered++
		}
	}

	return len(snapshot), delivered
}
// PublishWithContext delivers data to every subscription registered under ns,
// writing to channel subscriptions via syncext.WriteChannelWithContext
// (presumably blocking until the write succeeds or ctx is done).
//
// Callback subscriptions are invoked in a new goroutine and always count as
// received. Delivery stops at the first error - either ctx.Err() observed
// before a delivery or an error from the channel write - and that error is
// returned together with the counts reached so far.
func (ps *PubSub[T]) PublishWithContext(ctx context.Context, ns string, data T) (subscriber int, actualReceiver int, err error) {
	// Work on a copy so the broker lock is not held during delivery.
	ps.masterLock.Lock()
	snapshot := langext.ArrCopy(ps.subscriptions[ns])
	ps.masterLock.Unlock()

	// deliver hands data to one subscription; the bool reports whether it was accepted.
	deliver := func(s *pubSubSubscription[T]) (bool, error) {
		s.subLock.Lock()
		defer s.subLock.Unlock()

		if ctxErr := ctx.Err(); ctxErr != nil {
			return false, ctxErr
		}

		switch {
		case s.Func != nil:
			go func() { s.Func(data) }()
			return true, nil
		case s.Chan != nil:
			if sendErr := syncext.WriteChannelWithContext(ctx, s.Chan, data); sendErr != nil {
				return false, sendErr
			}
			return true, nil
		}

		// Neither callback nor channel (e.g. already unsubscribed): nothing to do.
		return false, nil
	}

	total := len(snapshot)
	delivered := 0
	for _, s := range snapshot {
		accepted, deliverErr := deliver(s)
		if deliverErr != nil {
			return total, delivered, deliverErr
		}
		if accepted {
			delivered++
		}
	}

	return total, delivered, nil
}
// PublishWithTimeout delivers data to every subscription registered under ns,
// writing to channel subscriptions via syncext.WriteChannelWithTimeout
// (presumably waiting up to timeout for the receiver).
//
// Callback subscriptions are invoked in a new goroutine and always count as
// received. A channel subscription only counts when its write reports
// success, so actualReceiver can be smaller than subscriber.
//
// Note that timeout is passed unchanged to every single channel write; it is
// applied per subscription, not to the call as a whole.
func (ps *PubSub[T]) PublishWithTimeout(ns string, data T, timeout time.Duration) (subscriber int, actualReceiver int) {
	// Work on a copy so the broker lock is not held during delivery.
	ps.masterLock.Lock()
	snapshot := langext.ArrCopy(ps.subscriptions[ns])
	ps.masterLock.Unlock()

	// deliver hands data to one subscription and reports whether it was accepted.
	deliver := func(s *pubSubSubscription[T]) bool {
		s.subLock.Lock()
		defer s.subLock.Unlock()

		switch {
		case s.Func != nil:
			go func() { s.Func(data) }()
			return true
		case s.Chan != nil:
			return syncext.WriteChannelWithTimeout(s.Chan, data, timeout)
		default:
			// Neither callback nor channel (e.g. already unsubscribed): nothing to do.
			return false
		}
	}

	delivered := 0
	for _, s := range snapshot {
		if deliver(s) {
			delivered++
		}
	}

	return len(snapshot), delivered
}
// SubscribeByCallback registers fn as a subscriber of namespace ns.
//
// The Publish* methods call fn in a fresh goroutine for every message.
// The returned handle removes the registration again.
func (ps *PubSub[T]) SubscribeByCallback(ns string, fn func(T)) PubSubSubscription {
	ps.masterLock.Lock()
	defer ps.masterLock.Unlock()

	entry := &pubSubSubscription[T]{
		ID:        xid.New().String(),
		namespace: ns,
		parent:    ps,
		subLock:   &sync.Mutex{},
		Func:      fn,
		UnsubChan: nil,
	}

	current := ps.subscriptions[ns]
	ps.subscriptions[ns] = append(current, entry)

	return entry
}
// SubscribeByChan registers a channel-based subscriber on namespace ns.
//
// It returns the channel on which published messages arrive (created with a
// buffer of chanBufferSize) and the handle that ends the subscription.
// Unsubscribing closes the returned channel.
func (ps *PubSub[T]) SubscribeByChan(ns string, chanBufferSize int) (chan T, PubSubSubscription) {
	ps.masterLock.Lock()
	defer ps.masterLock.Unlock()

	inbox := make(chan T, chanBufferSize)

	entry := &pubSubSubscription[T]{
		ID:        xid.New().String(),
		namespace: ns,
		parent:    ps,
		subLock:   &sync.Mutex{},
		Chan:      inbox,
		UnsubChan: nil,
	}

	current := ps.subscriptions[ns]
	ps.subscriptions[ns] = append(current, entry)

	return inbox, entry
}
// SubscribeByIter registers an iterator-based subscriber on namespace ns.
//
// It returns an iter.Seq that yields every message published to the
// subscription (buffered through a channel of size chanBufferSize) and the
// handle that ends the subscription.
//
// The sequence ends when the consumer stops iterating (the subscription is
// then removed automatically) or when the subscription is unsubscribed
// through the returned handle. Messages still buffered at the moment of an
// unsubscribe may or may not be yielded.
func (ps *PubSub[T]) SubscribeByIter(ns string, chanBufferSize int) (iter.Seq[T], PubSubSubscription) {
	ps.masterLock.Lock()
	defer ps.masterLock.Unlock()

	msgCh := make(chan T, chanBufferSize)
	unsubChan := make(chan bool, 8)

	sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan}

	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub)

	iterFun := func(yield func(T) bool) {
		for {
			// Select on the captured local channels, never on the sub.Chan /
			// sub.UnsubChan fields: unsubscribe resets those fields to nil under
			// subLock, so reading them here would be an unsynchronised access and
			// would turn the stop case into a nil channel that never fires.
			select {
			case msg, ok := <-msgCh:
				if !ok {
					// unsubscribe closed the channel. A receive from a closed
					// channel returns zero values, which must not reach the consumer.
					return
				}
				if !yield(msg) {
					// Consumer stopped iterating: drop the registration.
					sub.Unsubscribe()
					return
				}
			case <-unsubChan:
				// Signalled (and closed) by unsubscribe; calling Unsubscribe again is a no-op
				// apart from re-filtering the registry.
				sub.Unsubscribe()
				return
			}
		}
	}

	return iterFun, sub
}
// unsubscribe removes p from the broker and closes its channels.
//
// The work is done in two separate critical sections so that the broker-wide
// masterLock is never held while waiting for p.subLock. The blocking publish
// variants keep a subscription's subLock for as long as their channel write
// is pending; waiting for it with masterLock held would stall every other
// operation on the broker (all namespaces) until that write returns.
//
// Calling unsubscribe more than once for the same subscription is safe: the
// channel fields are reset to nil after closing and the registry filter is a
// no-op when the entry is already gone.
func (ps *PubSub[T]) unsubscribe(p *pubSubSubscription[T]) {
	// Phase 1: detach p from the registry so that new publishes no longer see it.
	func() {
		ps.masterLock.Lock()
		defer ps.masterLock.Unlock()

		ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[T]) bool {
			return v.ID != p.ID
		})

		// Do not keep empty namespaces around.
		if len(ps.subscriptions[p.namespace]) == 0 {
			delete(ps.subscriptions, p.namespace)
		}
	}()

	// Phase 2: close the channels. subLock makes sure no publish is writing to
	// p.Chan at this moment (a send on a closed channel would panic).
	p.subLock.Lock()
	defer p.subLock.Unlock()

	if p.Chan != nil {
		close(p.Chan)
		p.Chan = nil
	}

	if p.UnsubChan != nil {
		// Best-effort signal followed by close; either one wakes the iterator loop.
		syncext.WriteNonBlocking(p.UnsubChan, true)
		close(p.UnsubChan)
		p.UnsubChan = nil
	}
}