All checks were successful
Build Docker and Deploy / Run goext test-suite (push) Successful in 2m54s
242 lines
6.1 KiB
Go
242 lines
6.1 KiB
Go
package dataext
|
|
|
|
import (
|
|
"context"
|
|
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
|
|
"git.blackforestbytes.com/BlackForestBytes/goext/syncext"
|
|
"github.com/rs/xid"
|
|
"iter"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// PubSub is a simple Pub/Sub Broker
|
|
// Clients can subscribe to a namespace and receive published messages on this namespace
|
|
// Messages are broadcast to all subscribers
|
|
type PubSub[TNamespace comparable, TData any] struct {
|
|
masterLock *sync.Mutex
|
|
|
|
subscriptions map[TNamespace][]*pubSubSubscription[TNamespace, TData]
|
|
}
|
|
|
|
// PubSubSubscription is the handle handed out by the Subscribe* methods of
// PubSub.
type PubSubSubscription interface {
	// Unsubscribe ends the subscription.
	Unsubscribe()
}
|
|
|
|
type pubSubSubscription[TNamespace comparable, TData any] struct {
|
|
ID string
|
|
|
|
parent *PubSub[TNamespace, TData]
|
|
namespace TNamespace
|
|
|
|
subLock *sync.Mutex
|
|
|
|
Func func(TData)
|
|
Chan chan TData
|
|
|
|
UnsubChan chan bool
|
|
}
|
|
|
|
func (p *pubSubSubscription[TNamespace, TData]) Unsubscribe() {
|
|
p.parent.unsubscribe(p)
|
|
}
|
|
|
|
func NewPubSub[TNamespace comparable, TData any](capacity int) *PubSub[TNamespace, TData] {
|
|
return &PubSub[TNamespace, TData]{
|
|
masterLock: &sync.Mutex{},
|
|
subscriptions: make(map[TNamespace][]*pubSubSubscription[TNamespace, TData], capacity),
|
|
}
|
|
}
|
|
|
|
func (ps *PubSub[TNamespace, TData]) Namespaces() []TNamespace {
|
|
ps.masterLock.Lock()
|
|
defer ps.masterLock.Unlock()
|
|
|
|
return langext.MapKeyArr(ps.subscriptions)
|
|
}
|
|
|
|
func (ps *PubSub[TNamespace, TData]) SubscriberCount(ns TNamespace) int {
|
|
ps.masterLock.Lock()
|
|
defer ps.masterLock.Unlock()
|
|
|
|
return len(ps.subscriptions[ns])
|
|
}
|
|
|
|
// Publish sends `data` to all subscriber
|
|
// But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber)
|
|
func (ps *PubSub[TNamespace, TData]) Publish(ns TNamespace, data TData) (subscriber int, actualReceiver int) {
|
|
ps.masterLock.Lock()
|
|
subs := langext.ArrCopy(ps.subscriptions[ns])
|
|
ps.masterLock.Unlock()
|
|
|
|
subscriber = len(subs)
|
|
actualReceiver = 0
|
|
|
|
for _, sub := range subs {
|
|
func() {
|
|
sub.subLock.Lock()
|
|
defer sub.subLock.Unlock()
|
|
|
|
if sub.Func != nil {
|
|
go func() { sub.Func(data) }()
|
|
actualReceiver++
|
|
} else if sub.Chan != nil {
|
|
msgSent := syncext.WriteNonBlocking(sub.Chan, data)
|
|
if msgSent {
|
|
actualReceiver++
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
return subscriber, actualReceiver
|
|
}
|
|
|
|
// PublishWithContext sends `data` to all subscriber
|
|
// buffered - if one is currently not listening, we wait (but error out when the context runs out)
|
|
func (ps *PubSub[TNamespace, TData]) PublishWithContext(ctx context.Context, ns TNamespace, data TData) (subscriber int, actualReceiver int, err error) {
|
|
ps.masterLock.Lock()
|
|
subs := langext.ArrCopy(ps.subscriptions[ns])
|
|
ps.masterLock.Unlock()
|
|
|
|
subscriber = len(subs)
|
|
actualReceiver = 0
|
|
|
|
for _, sub := range subs {
|
|
err := func() error {
|
|
sub.subLock.Lock()
|
|
defer sub.subLock.Unlock()
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if sub.Func != nil {
|
|
go func() { sub.Func(data) }()
|
|
actualReceiver++
|
|
} else if sub.Chan != nil {
|
|
err := syncext.WriteChannelWithContext(ctx, sub.Chan, data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
actualReceiver++
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return subscriber, actualReceiver, err
|
|
}
|
|
}
|
|
|
|
return subscriber, actualReceiver, nil
|
|
}
|
|
|
|
// PublishWithTimeout sends `data` to all subscriber
|
|
// buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber)
|
|
func (ps *PubSub[TNamespace, TData]) PublishWithTimeout(ns TNamespace, data TData, timeout time.Duration) (subscriber int, actualReceiver int) {
|
|
ps.masterLock.Lock()
|
|
subs := langext.ArrCopy(ps.subscriptions[ns])
|
|
ps.masterLock.Unlock()
|
|
|
|
subscriber = len(subs)
|
|
actualReceiver = 0
|
|
|
|
for _, sub := range subs {
|
|
func() {
|
|
sub.subLock.Lock()
|
|
defer sub.subLock.Unlock()
|
|
|
|
if sub.Func != nil {
|
|
go func() { sub.Func(data) }()
|
|
actualReceiver++
|
|
} else if sub.Chan != nil {
|
|
ok := syncext.WriteChannelWithTimeout(sub.Chan, data, timeout)
|
|
if ok {
|
|
actualReceiver++
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
return subscriber, actualReceiver
|
|
}
|
|
|
|
func (ps *PubSub[TNamespace, TData]) SubscribeByCallback(ns TNamespace, fn func(TData)) PubSubSubscription {
|
|
ps.masterLock.Lock()
|
|
defer ps.masterLock.Unlock()
|
|
|
|
sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil}
|
|
|
|
ps.subscriptions[ns] = append(ps.subscriptions[ns], sub)
|
|
|
|
return sub
|
|
}
|
|
|
|
func (ps *PubSub[TNamespace, TData]) SubscribeByChan(ns TNamespace, chanBufferSize int) (chan TData, PubSubSubscription) {
|
|
ps.masterLock.Lock()
|
|
defer ps.masterLock.Unlock()
|
|
|
|
msgCh := make(chan TData, chanBufferSize)
|
|
|
|
sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil}
|
|
|
|
ps.subscriptions[ns] = append(ps.subscriptions[ns], sub)
|
|
|
|
return msgCh, sub
|
|
}
|
|
|
|
func (ps *PubSub[TNamespace, TData]) SubscribeByIter(ns TNamespace, chanBufferSize int) (iter.Seq[TData], PubSubSubscription) {
|
|
ps.masterLock.Lock()
|
|
defer ps.masterLock.Unlock()
|
|
|
|
msgCh := make(chan TData, chanBufferSize)
|
|
unsubChan := make(chan bool, 8)
|
|
|
|
sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan}
|
|
|
|
ps.subscriptions[ns] = append(ps.subscriptions[ns], sub)
|
|
|
|
iterFun := func(yield func(TData) bool) {
|
|
for {
|
|
select {
|
|
case msg := <-msgCh:
|
|
if !yield(msg) {
|
|
sub.Unsubscribe()
|
|
return
|
|
}
|
|
case <-sub.UnsubChan:
|
|
sub.Unsubscribe()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
return iterFun, sub
|
|
}
|
|
|
|
func (ps *PubSub[TNamespace, TData]) unsubscribe(p *pubSubSubscription[TNamespace, TData]) {
|
|
ps.masterLock.Lock()
|
|
defer ps.masterLock.Unlock()
|
|
|
|
p.subLock.Lock()
|
|
defer p.subLock.Unlock()
|
|
|
|
if p.Chan != nil {
|
|
close(p.Chan)
|
|
p.Chan = nil
|
|
}
|
|
if p.UnsubChan != nil {
|
|
syncext.WriteNonBlocking(p.UnsubChan, true)
|
|
close(p.UnsubChan)
|
|
p.UnsubChan = nil
|
|
}
|
|
|
|
ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[TNamespace, TData]) bool {
|
|
return v.ID != p.ID
|
|
})
|
|
if len(ps.subscriptions[p.namespace]) == 0 {
|
|
delete(ps.subscriptions, p.namespace)
|
|
}
|
|
}
|