package dataext import ( "context" "git.blackforestbytes.com/BlackForestBytes/goext/langext" "git.blackforestbytes.com/BlackForestBytes/goext/syncext" "github.com/rs/xid" "iter" "sync" "time" ) // PubSub is a simple Pub/Sub Broker // Clients can subscribe to a namespace and receive published messages on this namespace // Messages are broadcast to all subscribers type PubSub[TNamespace comparable, TData any] struct { masterLock *sync.Mutex subscriptions map[TNamespace][]*pubSubSubscription[TNamespace, TData] } type PubSubSubscription interface { Unsubscribe() } type pubSubSubscription[TNamespace comparable, TData any] struct { ID string parent *PubSub[TNamespace, TData] namespace TNamespace subLock *sync.Mutex Func func(TData) Chan chan TData UnsubChan chan bool } func (p *pubSubSubscription[TNamespace, TData]) Unsubscribe() { p.parent.unsubscribe(p) } func NewPubSub[TNamespace comparable, TData any](capacity int) *PubSub[TNamespace, TData] { return &PubSub[TNamespace, TData]{ masterLock: &sync.Mutex{}, subscriptions: make(map[TNamespace][]*pubSubSubscription[TNamespace, TData], capacity), } } func (ps *PubSub[TNamespace, TData]) Namespaces() []TNamespace { ps.masterLock.Lock() defer ps.masterLock.Unlock() return langext.MapKeyArr(ps.subscriptions) } func (ps *PubSub[TNamespace, TData]) SubscriberCount(ns TNamespace) int { ps.masterLock.Lock() defer ps.masterLock.Unlock() return len(ps.subscriptions[ns]) } // Publish sends `data` to all subscriber // But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) func (ps *PubSub[TNamespace, TData]) Publish(ns TNamespace, data TData) (subscriber int, actualReceiver int) { ps.masterLock.Lock() subs := langext.ArrCopy(ps.subscriptions[ns]) ps.masterLock.Unlock() subscriber = len(subs) actualReceiver = 0 for _, sub := range subs { func() { sub.subLock.Lock() defer sub.subLock.Unlock() if sub.Func != nil { go func() { sub.Func(data) }() actualReceiver++ } else if sub.Chan != nil { msgSent := syncext.WriteNonBlocking(sub.Chan, data) if msgSent { actualReceiver++ } } }() } return subscriber, actualReceiver } // PublishWithContext sends `data` to all subscriber // buffered - if one is currently not listening, we wait (but error out when the context runs out) func (ps *PubSub[TNamespace, TData]) PublishWithContext(ctx context.Context, ns TNamespace, data TData) (subscriber int, actualReceiver int, err error) { ps.masterLock.Lock() subs := langext.ArrCopy(ps.subscriptions[ns]) ps.masterLock.Unlock() subscriber = len(subs) actualReceiver = 0 for _, sub := range subs { err := func() error { sub.subLock.Lock() defer sub.subLock.Unlock() if err := ctx.Err(); err != nil { return err } if sub.Func != nil { go func() { sub.Func(data) }() actualReceiver++ } else if sub.Chan != nil { err := syncext.WriteChannelWithContext(ctx, sub.Chan, data) if err != nil { return err } actualReceiver++ } return nil }() if err != nil { return subscriber, actualReceiver, err } } return subscriber, actualReceiver, nil } // PublishWithTimeout sends `data` to all subscriber // buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) func (ps *PubSub[TNamespace, TData]) PublishWithTimeout(ns TNamespace, data TData, timeout time.Duration) (subscriber int, actualReceiver int) { ps.masterLock.Lock() subs := langext.ArrCopy(ps.subscriptions[ns]) ps.masterLock.Unlock() subscriber = len(subs) actualReceiver = 0 for _, sub := range subs { func() { sub.subLock.Lock() defer sub.subLock.Unlock() if sub.Func != nil { go func() { sub.Func(data) }() actualReceiver++ } else if sub.Chan != nil { ok := syncext.WriteChannelWithTimeout(sub.Chan, data, timeout) if ok { actualReceiver++ } } }() } return subscriber, actualReceiver } func (ps *PubSub[TNamespace, TData]) SubscribeByCallback(ns TNamespace, fn func(TData)) PubSubSubscription { ps.masterLock.Lock() defer ps.masterLock.Unlock() sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) return sub } func (ps *PubSub[TNamespace, TData]) SubscribeByChan(ns TNamespace, chanBufferSize int) (chan TData, PubSubSubscription) { ps.masterLock.Lock() defer ps.masterLock.Unlock() msgCh := make(chan TData, chanBufferSize) sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) return msgCh, sub } func (ps *PubSub[TNamespace, TData]) SubscribeByIter(ns TNamespace, chanBufferSize int) (iter.Seq[TData], PubSubSubscription) { ps.masterLock.Lock() defer ps.masterLock.Unlock() msgCh := make(chan TData, chanBufferSize) unsubChan := make(chan bool, 8) sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) iterFun := func(yield func(TData) bool) { for { select { case msg := <-msgCh: if !yield(msg) { sub.Unsubscribe() return } case <-sub.UnsubChan: sub.Unsubscribe() return } } } return iterFun, sub } func (ps *PubSub[TNamespace, TData]) unsubscribe(p *pubSubSubscription[TNamespace, TData]) { ps.masterLock.Lock() defer ps.masterLock.Unlock() p.subLock.Lock() defer p.subLock.Unlock() if p.Chan != nil { close(p.Chan) p.Chan = nil } if p.UnsubChan != nil { syncext.WriteNonBlocking(p.UnsubChan, true) close(p.UnsubChan) p.UnsubChan = nil } ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[TNamespace, TData]) bool { return v.ID != p.ID }) if len(ps.subscriptions[p.namespace]) == 0 { delete(ps.subscriptions, p.namespace) } }