diff --git a/dataext/pubsub.go b/dataext/pubsub.go index 2eb643f..47d5a89 100644 --- a/dataext/pubsub.go +++ b/dataext/pubsub.go @@ -13,49 +13,49 @@ import ( // PubSub is a simple Pub/Sub Broker // Clients can subscribe to a namespace and receive published messages on this namespace // Messages are broadcast to all subscribers -type PubSub[T any] struct { +type PubSub[TNamespace comparable, TData any] struct { masterLock *sync.Mutex - subscriptions map[string][]*pubSubSubscription[T] + subscriptions map[TNamespace][]*pubSubSubscription[TNamespace, TData] } type PubSubSubscription interface { Unsubscribe() } -type pubSubSubscription[T any] struct { +type pubSubSubscription[TNamespace comparable, TData any] struct { ID string - parent *PubSub[T] - namespace string + parent *PubSub[TNamespace, TData] + namespace TNamespace subLock *sync.Mutex - Func func(T) - Chan chan T + Func func(TData) + Chan chan TData UnsubChan chan bool } -func (p *pubSubSubscription[T]) Unsubscribe() { +func (p *pubSubSubscription[TNamespace, TData]) Unsubscribe() { p.parent.unsubscribe(p) } -func NewPubSub[T any](capacity int) *PubSub[T] { - return &PubSub[T]{ +func NewPubSub[TNamespace comparable, TData any](capacity int) *PubSub[TNamespace, TData] { + return &PubSub[TNamespace, TData]{ masterLock: &sync.Mutex{}, - subscriptions: make(map[string][]*pubSubSubscription[T], capacity), + subscriptions: make(map[TNamespace][]*pubSubSubscription[TNamespace, TData], capacity), } } -func (ps *PubSub[T]) Namespaces() []string { +func (ps *PubSub[TNamespace, TData]) Namespaces() []TNamespace { ps.masterLock.Lock() defer ps.masterLock.Unlock() return langext.MapKeyArr(ps.subscriptions) } -func (ps *PubSub[T]) SubscriberCount(ns string) int { +func (ps *PubSub[TNamespace, TData]) SubscriberCount(ns TNamespace) int { ps.masterLock.Lock() defer ps.masterLock.Unlock() @@ -64,7 +64,7 @@ func (ps *PubSub[T]) SubscriberCount(ns string) int { // Publish sends `data` to all subscriber // But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) -func (ps *PubSub[T]) Publish(ns string, data T) (subscriber int, actualReceiver int) { +func (ps *PubSub[TNamespace, TData]) Publish(ns TNamespace, data TData) (subscriber int, actualReceiver int) { ps.masterLock.Lock() subs := langext.ArrCopy(ps.subscriptions[ns]) ps.masterLock.Unlock() @@ -94,7 +94,7 @@ func (ps *PubSub[T]) Publish(ns string, data T) (subscriber int, actualReceiver // PublishWithContext sends `data` to all subscriber // buffered - if one is currently not listening, we wait (but error out when the context runs out) -func (ps *PubSub[T]) PublishWithContext(ctx context.Context, ns string, data T) (subscriber int, actualReceiver int, err error) { +func (ps *PubSub[TNamespace, TData]) PublishWithContext(ctx context.Context, ns TNamespace, data TData) (subscriber int, actualReceiver int, err error) { ps.masterLock.Lock() subs := langext.ArrCopy(ps.subscriptions[ns]) ps.masterLock.Unlock() @@ -134,7 +134,7 @@ func (ps *PubSub[T]) PublishWithContext(ctx context.Context, ns string, data T) // PublishWithTimeout sends `data` to all subscriber // buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) -func (ps *PubSub[T]) PublishWithTimeout(ns string, data T, timeout time.Duration) (subscriber int, actualReceiver int) { +func (ps *PubSub[TNamespace, TData]) PublishWithTimeout(ns TNamespace, data TData, timeout time.Duration) (subscriber int, actualReceiver int) { ps.masterLock.Lock() subs := langext.ArrCopy(ps.subscriptions[ns]) ps.masterLock.Unlock() @@ -162,42 +162,42 @@ func (ps *PubSub[T]) PublishWithTimeout(ns string, data T, timeout time.Duration return subscriber, actualReceiver } -func (ps *PubSub[T]) SubscribeByCallback(ns string, fn func(T)) PubSubSubscription { +func (ps *PubSub[TNamespace, TData]) SubscribeByCallback(ns TNamespace, fn func(TData)) PubSubSubscription { ps.masterLock.Lock() defer ps.masterLock.Unlock() - sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} + sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) return sub } -func (ps *PubSub[T]) SubscribeByChan(ns string, chanBufferSize int) (chan T, PubSubSubscription) { +func (ps *PubSub[TNamespace, TData]) SubscribeByChan(ns TNamespace, chanBufferSize int) (chan TData, PubSubSubscription) { ps.masterLock.Lock() defer ps.masterLock.Unlock() - msgCh := make(chan T, chanBufferSize) + msgCh := make(chan TData, chanBufferSize) - sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} + sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) return msgCh, sub } -func (ps *PubSub[T]) SubscribeByIter(ns string, chanBufferSize int) (iter.Seq[T], PubSubSubscription) { +func (ps *PubSub[TNamespace, TData]) SubscribeByIter(ns TNamespace, chanBufferSize int) (iter.Seq[TData], PubSubSubscription) { ps.masterLock.Lock() defer ps.masterLock.Unlock() - msgCh := make(chan T, chanBufferSize) + msgCh := make(chan TData, chanBufferSize) unsubChan := make(chan bool, 8) - sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} + sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) - iterFun := func(yield func(T) bool) { + iterFun := func(yield func(TData) bool) { for { select { case msg := <-msgCh: @@ -215,7 +215,7 @@ func (ps *PubSub[T]) SubscribeByIter(ns string, chanBufferSize int) (iter.Seq[T] return iterFun, sub } -func (ps *PubSub[T]) unsubscribe(p *pubSubSubscription[T]) { +func (ps *PubSub[TNamespace, TData]) unsubscribe(p *pubSubSubscription[TNamespace, TData]) { ps.masterLock.Lock() defer ps.masterLock.Unlock() @@ -232,7 +232,7 @@ func (ps *PubSub[T]) unsubscribe(p *pubSubSubscription[T]) { p.UnsubChan = nil } - ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[T]) bool { + ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[TNamespace, TData]) bool { return v.ID != p.ID }) if len(ps.subscriptions[p.namespace]) == 0 { diff --git a/dataext/pubsub_test.go b/dataext/pubsub_test.go index 081cc4e..df935a3 100644 --- a/dataext/pubsub_test.go +++ b/dataext/pubsub_test.go @@ -8,7 +8,7 @@ import ( ) func TestNewPubSub(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) if ps == nil { t.Fatal("NewPubSub returned nil") } @@ -21,7 +21,7 @@ func TestNewPubSub(t *testing.T) { } func TestPubSub_Namespaces(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) // Initially no namespaces namespaces := ps.Namespaces() @@ -60,7 +60,7 @@ func TestPubSub_Namespaces(t *testing.T) { } func TestPubSub_SubscribeByCallback(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) var received string var wg sync.WaitGroup @@ -94,7 +94,7 @@ func TestPubSub_SubscribeByCallback(t *testing.T) { } func TestPubSub_SubscribeByChan(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) ch, sub := ps.SubscribeByChan("test-ns", 1) defer sub.Unsubscribe() @@ -122,7 +122,7 @@ func TestPubSub_SubscribeByChan(t *testing.T) { } func TestPubSub_SubscribeByIter(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) iterSeq, sub := ps.SubscribeByIter("test-ns", 1) defer sub.Unsubscribe() @@ -165,7 +165,7 @@ func TestPubSub_SubscribeByIter(t *testing.T) { } func TestPubSub_Publish(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) // Test publishing to a namespace with no subscribers subs, receivers := ps.Publish("empty-ns", "hello") @@ -215,7 +215,7 @@ func TestPubSub_Publish(t *testing.T) { } func TestPubSub_PublishWithTimeout(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) // Add a subscriber with a channel ch, sub := ps.SubscribeByChan("test-ns", 1) @@ -262,7 +262,7 @@ func TestPubSub_PublishWithTimeout(t *testing.T) { } func TestPubSub_PublishWithContext(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) // Add a subscriber with a channel ch, sub := ps.SubscribeByChan("test-ns", 1) @@ -328,7 +328,7 @@ func TestPubSub_PublishWithContext(t *testing.T) { } func TestPubSub_Unsubscribe(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) // Add a subscriber ch, sub := ps.SubscribeByChan("test-ns", 1) @@ -372,7 +372,7 @@ func TestPubSub_Unsubscribe(t *testing.T) { } func TestPubSub_MultipleSubscribers(t *testing.T) { - ps := NewPubSub[string](10) + ps := NewPubSub[string, string](10) // Add multiple subscribers ch1, sub1 := ps.SubscribeByChan("test-ns", 1) diff --git a/goextVersion.go b/goextVersion.go index 3595348..e06a89e 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.592" +const GoextVersion = "0.0.593" -const GoextVersionTimestamp = "2025-07-16T12:46:18+0200" +const GoextVersionTimestamp = "2025-07-16T12:50:36+0200"