Files
goext/dataext/broadcaster.go
Mike Schwörer a58bb4b14b
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 2m30s
v0.0.600
2025-09-13 18:45:23 +02:00

231 lines
5.4 KiB
Go

package dataext
import (
"context"
"iter"
"sync"
"time"
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
"git.blackforestbytes.com/BlackForestBytes/goext/syncext"
"github.com/rs/xid"
)
// Broadcaster distributes values of type TData to a set of registered subscriptions.
// It does not have distinct namespaces: a published value is offered to every subscription.
// NOTE(review): the previous comment called this "a simpler interface over Broadcaster", which is
// self-referential - presumably a namespaced pub/sub type was meant; confirm and name it here.
type Broadcaster[TData any] struct {
// masterLock guards the subscriptions slice.
masterLock *sync.Mutex
// subscriptions holds the currently registered subscribers (callback-, channel- and iterator-based).
subscriptions []*broadcastSubscription[TData]
}
// BroadcastSubscription is the handle returned by the Subscribe* methods of Broadcaster.
type BroadcastSubscription interface {
// Unsubscribe removes the subscription from the Broadcaster it was created on.
Unsubscribe()
}
// broadcastSubscription is the internal state of one subscriber.
// Either Func (callback subscription) or Chan (channel/iterator subscription) is set.
type broadcastSubscription[TData any] struct {
// ID identifies the subscription; it is used to find the entry when unsubscribing.
ID string
// parent is the Broadcaster this subscription is registered on.
parent *Broadcaster[TData]
// subLock is held while a message is delivered to this subscription and while its channels are closed.
subLock *sync.Mutex
// Func is the callback of a callback subscription (nil otherwise).
Func func(TData)
// Chan is the message channel of a channel/iterator subscription; it is closed and set to nil on unsubscribe.
Chan chan TData
// UnsubChan is only set for iterator subscriptions and tells the iterator loop to stop;
// it is closed and set to nil on unsubscribe.
UnsubChan chan bool
}
// Unsubscribe detaches this subscription by handing it back to the Broadcaster that owns it.
func (s *broadcastSubscription[TData]) Unsubscribe() {
	owner := s.parent
	owner.unsubscribe(s)
}
// NewBroadcaster creates an empty Broadcaster.
//
// capacity is only a pre-allocation hint for the internal subscription list; the list still grows
// beyond it as needed. A negative capacity is treated as 0 (passing it to make would panic).
func NewBroadcaster[TData any](capacity int) *Broadcaster[TData] {
	if capacity < 0 {
		capacity = 0
	}

	return &Broadcaster[TData]{
		masterLock:    &sync.Mutex{},
		subscriptions: make([]*broadcastSubscription[TData], 0, capacity),
	}
}
// SubscriberCount reports how many subscriptions are currently registered.
func (bb *Broadcaster[TData]) SubscriberCount() int {
	bb.masterLock.Lock()
	count := len(bb.subscriptions)
	bb.masterLock.Unlock()

	return count
}
// Publish offers `data` to all subscribers without ever waiting for one of them.
//
// Callback subscribers are invoked in their own goroutine and always count as receivers.
// Channel subscribers are served with a non-blocking write: if the write does not go through
// (e.g. nobody is listening and the buffer is full) the subscriber is skipped.
//
// Returns the number of subscribers at the time of the call and the number of subscribers that
// actually got the message (actualReceiver <= subscriber).
func (bb *Broadcaster[TData]) Publish(data TData) (subscriber int, actualReceiver int) {
	// Work on a snapshot so the master lock is not held during delivery.
	bb.masterLock.Lock()
	snapshot := langext.ArrCopy(bb.subscriptions)
	bb.masterLock.Unlock()

	// deliver hands data to a single subscription and reports whether it counts as a receiver.
	deliver := func(s *broadcastSubscription[TData]) bool {
		s.subLock.Lock()
		defer s.subLock.Unlock()

		switch {
		case s.Func != nil:
			go func() { s.Func(data) }()
			return true
		case s.Chan != nil:
			return syncext.WriteNonBlocking(s.Chan, data)
		}
		return false
	}

	received := 0
	for _, s := range snapshot {
		if deliver(s) {
			received++
		}
	}

	return len(snapshot), received
}
// PublishWithContext offers `data` to all subscribers, one after another, bounded by ctx.
//
// Callback subscribers are invoked in their own goroutine and always count as receivers.
// Channel subscribers are written to via syncext.WriteChannelWithContext.
//
// The call stops at the first error - either ctx is already done before a subscriber is handled, or
// the channel write returns an error - and returns it together with the counts collected so far.
// Subscribers after the failing one are not served.
//
// NOTE(review): the subscription lock is held during the channel write; if that write blocks, an
// Unsubscribe of the same subscription has to wait for it - confirm this is intended.
func (bb *Broadcaster[TData]) PublishWithContext(ctx context.Context, data TData) (subscriber int, actualReceiver int, err error) {
	// Work on a snapshot so the master lock is not held during delivery.
	bb.masterLock.Lock()
	snapshot := langext.ArrCopy(bb.subscriptions)
	bb.masterLock.Unlock()

	total := len(snapshot)
	delivered := 0

	// deliver hands data to a single subscription; the bool reports whether it counts as a receiver.
	deliver := func(s *broadcastSubscription[TData]) (bool, error) {
		s.subLock.Lock()
		defer s.subLock.Unlock()

		if ctxErr := ctx.Err(); ctxErr != nil {
			return false, ctxErr
		}

		switch {
		case s.Func != nil:
			go func() { s.Func(data) }()
			return true, nil
		case s.Chan != nil:
			if sendErr := syncext.WriteChannelWithContext(ctx, s.Chan, data); sendErr != nil {
				return false, sendErr
			}
			return true, nil
		}
		return false, nil
	}

	for _, s := range snapshot {
		ok, deliverErr := deliver(s)
		if deliverErr != nil {
			return total, delivered, deliverErr
		}
		if ok {
			delivered++
		}
	}

	return total, delivered, nil
}
// PublishWithTimeout offers `data` to all subscribers, one after another.
//
// Callback subscribers are invoked in their own goroutine and always count as receivers.
// Channel subscribers are written to via syncext.WriteChannelWithTimeout; a subscriber whose write
// does not succeed is skipped (then actualReceiver < subscriber).
//
// `timeout` is passed to every channel write individually, it is not a budget for the whole call.
func (bb *Broadcaster[TData]) PublishWithTimeout(data TData, timeout time.Duration) (subscriber int, actualReceiver int) {
	// Work on a snapshot so the master lock is not held during delivery.
	bb.masterLock.Lock()
	snapshot := langext.ArrCopy(bb.subscriptions)
	bb.masterLock.Unlock()

	// deliver hands data to a single subscription and reports whether it counts as a receiver.
	deliver := func(s *broadcastSubscription[TData]) bool {
		s.subLock.Lock()
		defer s.subLock.Unlock()

		switch {
		case s.Func != nil:
			go func() { s.Func(data) }()
			return true
		case s.Chan != nil:
			return syncext.WriteChannelWithTimeout(s.Chan, data, timeout)
		}
		return false
	}

	received := 0
	for _, s := range snapshot {
		if deliver(s) {
			received++
		}
	}

	return len(snapshot), received
}
// SubscribeByCallback registers fn as a subscriber; the Publish* methods call fn in a new goroutine
// for each published value. The returned handle removes the subscription again.
func (bb *Broadcaster[TData]) SubscribeByCallback(fn func(TData)) BroadcastSubscription {
	bb.masterLock.Lock()
	defer bb.masterLock.Unlock()

	entry := &broadcastSubscription[TData]{
		ID:      xid.New().String(),
		parent:  bb,
		subLock: &sync.Mutex{},
		Func:    fn,
	}
	bb.subscriptions = append(bb.subscriptions, entry)

	return entry
}
// SubscribeByChan registers a channel subscriber and returns the channel on which published values
// arrive, together with the handle to remove the subscription again.
// The channel is created with a buffer of chanBufferSize and is closed on Unsubscribe.
func (bb *Broadcaster[TData]) SubscribeByChan(chanBufferSize int) (chan TData, BroadcastSubscription) {
	bb.masterLock.Lock()
	defer bb.masterLock.Unlock()

	messages := make(chan TData, chanBufferSize)
	entry := &broadcastSubscription[TData]{
		ID:      xid.New().String(),
		parent:  bb,
		subLock: &sync.Mutex{},
		Chan:    messages,
	}
	bb.subscriptions = append(bb.subscriptions, entry)

	return messages, entry
}
// SubscribeByIter registers a channel-backed subscriber and returns an iterator over the published
// values, together with the handle to remove the subscription again.
//
// The messages are buffered in a channel of size chanBufferSize. The iterator ends - and the
// subscription is removed - when the consumer stops iterating or when Unsubscribe is called on the
// returned handle. Iterating again after that ends immediately without yielding anything.
func (bb *Broadcaster[TData]) SubscribeByIter(chanBufferSize int) (iter.Seq[TData], BroadcastSubscription) {
	bb.masterLock.Lock()
	defer bb.masterLock.Unlock()

	msgCh := make(chan TData, chanBufferSize)
	unsubChan := make(chan bool, 8)

	sub := &broadcastSubscription[TData]{
		ID:        xid.New().String(),
		parent:    bb,
		subLock:   &sync.Mutex{},
		Chan:      msgCh,
		UnsubChan: unsubChan,
	}
	bb.subscriptions = append(bb.subscriptions, sub)

	iterFun := func(yield func(TData) bool) {
		// Unsubscribe is idempotent, so it is safe to always call it when the iteration ends.
		defer sub.Unsubscribe()

		for {
			select {
			case msg, ok := <-msgCh:
				if !ok {
					// The channel was closed by an unsubscribe. Without this check the closed
					// channel would keep delivering zero values to the consumer.
					return
				}
				if !yield(msg) {
					return
				}
			// Use the captured channel instead of sub.UnsubChan: the struct field is set to nil
			// by unsubscribe (under subLock), so reading it here would be racy and a nil channel
			// would never fire.
			case <-unsubChan:
				return
			}
		}
	}

	return iterFun, sub
}
// unsubscribe removes p from the subscription list and closes its channels.
// It is idempotent: the channels are set to nil after closing, so repeated calls are no-ops.
func (bb *Broadcaster[TData]) unsubscribe(p *broadcastSubscription[TData]) {
	// Remove the entry first and hold masterLock only for that. p.subLock can be held by a
	// publisher for the whole duration of a channel write (PublishWithContext / PublishWithTimeout);
	// waiting for it while still holding masterLock would stall every other Broadcaster operation.
	func() {
		bb.masterLock.Lock()
		defer bb.masterLock.Unlock()

		bb.subscriptions = langext.ArrFilter(bb.subscriptions, func(v *broadcastSubscription[TData]) bool {
			return v.ID != p.ID
		})
	}()

	// subLock makes sure no publisher is writing to the channels while they are closed.
	p.subLock.Lock()
	defer p.subLock.Unlock()

	if p.Chan != nil {
		close(p.Chan)
		p.Chan = nil
	}

	if p.UnsubChan != nil {
		// Signal the iterator loop (if any), then close so that later receives do not block either.
		syncext.WriteNonBlocking(p.UnsubChan, true)
		close(p.UnsubChan)
		p.UnsubChan = nil
	}
}