Compare commits
	
		
			1 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 254fe1556a | 
| @@ -13,49 +13,49 @@ import ( | |||||||
| // PubSub is a simple Pub/Sub Broker | // PubSub is a simple Pub/Sub Broker | ||||||
| // Clients can subscribe to a namespace and receive published messages on this namespace | // Clients can subscribe to a namespace and receive published messages on this namespace | ||||||
| // Messages are broadcast to all subscribers | // Messages are broadcast to all subscribers | ||||||
| type PubSub[T any] struct { | type PubSub[TNamespace comparable, TData any] struct { | ||||||
| 	masterLock *sync.Mutex | 	masterLock *sync.Mutex | ||||||
|  |  | ||||||
| 	subscriptions map[string][]*pubSubSubscription[T] | 	subscriptions map[TNamespace][]*pubSubSubscription[TNamespace, TData] | ||||||
| } | } | ||||||
|  |  | ||||||
| type PubSubSubscription interface { | type PubSubSubscription interface { | ||||||
| 	Unsubscribe() | 	Unsubscribe() | ||||||
| } | } | ||||||
|  |  | ||||||
| type pubSubSubscription[T any] struct { | type pubSubSubscription[TNamespace comparable, TData any] struct { | ||||||
| 	ID string | 	ID string | ||||||
|  |  | ||||||
| 	parent    *PubSub[T] | 	parent    *PubSub[TNamespace, TData] | ||||||
| 	namespace string | 	namespace TNamespace | ||||||
|  |  | ||||||
| 	subLock *sync.Mutex | 	subLock *sync.Mutex | ||||||
|  |  | ||||||
| 	Func func(T) | 	Func func(TData) | ||||||
| 	Chan chan T | 	Chan chan TData | ||||||
|  |  | ||||||
| 	UnsubChan chan bool | 	UnsubChan chan bool | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *pubSubSubscription[T]) Unsubscribe() { | func (p *pubSubSubscription[TNamespace, TData]) Unsubscribe() { | ||||||
| 	p.parent.unsubscribe(p) | 	p.parent.unsubscribe(p) | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewPubSub[T any](capacity int) *PubSub[T] { | func NewPubSub[TNamespace comparable, TData any](capacity int) *PubSub[TNamespace, TData] { | ||||||
| 	return &PubSub[T]{ | 	return &PubSub[TNamespace, TData]{ | ||||||
| 		masterLock:    &sync.Mutex{}, | 		masterLock:    &sync.Mutex{}, | ||||||
| 		subscriptions: make(map[string][]*pubSubSubscription[T], capacity), | 		subscriptions: make(map[TNamespace][]*pubSubSubscription[TNamespace, TData], capacity), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ps *PubSub[T]) Namespaces() []string { | func (ps *PubSub[TNamespace, TData]) Namespaces() []TNamespace { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	defer ps.masterLock.Unlock() | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
| 	return langext.MapKeyArr(ps.subscriptions) | 	return langext.MapKeyArr(ps.subscriptions) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ps *PubSub[T]) SubscriberCount(ns string) int { | func (ps *PubSub[TNamespace, TData]) SubscriberCount(ns TNamespace) int { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	defer ps.masterLock.Unlock() | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
| @@ -64,7 +64,7 @@ func (ps *PubSub[T]) SubscriberCount(ns string) int { | |||||||
|  |  | ||||||
| // Publish sends `data` to all subscriber | // Publish sends `data` to all subscriber | ||||||
| // But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) | // But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) | ||||||
| func (ps *PubSub[T]) Publish(ns string, data T) (subscriber int, actualReceiver int) { | func (ps *PubSub[TNamespace, TData]) Publish(ns TNamespace, data TData) (subscriber int, actualReceiver int) { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	subs := langext.ArrCopy(ps.subscriptions[ns]) | 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||||
| 	ps.masterLock.Unlock() | 	ps.masterLock.Unlock() | ||||||
| @@ -94,7 +94,7 @@ func (ps *PubSub[T]) Publish(ns string, data T) (subscriber int, actualReceiver | |||||||
|  |  | ||||||
| // PublishWithContext sends `data` to all subscriber | // PublishWithContext sends `data` to all subscriber | ||||||
| // buffered - if one is currently not listening, we wait (but error out when the context runs out) | // buffered - if one is currently not listening, we wait (but error out when the context runs out) | ||||||
| func (ps *PubSub[T]) PublishWithContext(ctx context.Context, ns string, data T) (subscriber int, actualReceiver int, err error) { | func (ps *PubSub[TNamespace, TData]) PublishWithContext(ctx context.Context, ns TNamespace, data TData) (subscriber int, actualReceiver int, err error) { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	subs := langext.ArrCopy(ps.subscriptions[ns]) | 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||||
| 	ps.masterLock.Unlock() | 	ps.masterLock.Unlock() | ||||||
| @@ -134,7 +134,7 @@ func (ps *PubSub[T]) PublishWithContext(ctx context.Context, ns string, data T) | |||||||
|  |  | ||||||
| // PublishWithTimeout sends `data` to all subscriber | // PublishWithTimeout sends `data` to all subscriber | ||||||
| // buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) | // buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) | ||||||
| func (ps *PubSub[T]) PublishWithTimeout(ns string, data T, timeout time.Duration) (subscriber int, actualReceiver int) { | func (ps *PubSub[TNamespace, TData]) PublishWithTimeout(ns TNamespace, data TData, timeout time.Duration) (subscriber int, actualReceiver int) { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	subs := langext.ArrCopy(ps.subscriptions[ns]) | 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||||
| 	ps.masterLock.Unlock() | 	ps.masterLock.Unlock() | ||||||
| @@ -162,42 +162,42 @@ func (ps *PubSub[T]) PublishWithTimeout(ns string, data T, timeout time.Duration | |||||||
| 	return subscriber, actualReceiver | 	return subscriber, actualReceiver | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ps *PubSub[T]) SubscribeByCallback(ns string, fn func(T)) PubSubSubscription { | func (ps *PubSub[TNamespace, TData]) SubscribeByCallback(ns TNamespace, fn func(TData)) PubSubSubscription { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	defer ps.masterLock.Unlock() | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
| 	sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} | 	sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} | ||||||
|  |  | ||||||
| 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||||
|  |  | ||||||
| 	return sub | 	return sub | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ps *PubSub[T]) SubscribeByChan(ns string, chanBufferSize int) (chan T, PubSubSubscription) { | func (ps *PubSub[TNamespace, TData]) SubscribeByChan(ns TNamespace, chanBufferSize int) (chan TData, PubSubSubscription) { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	defer ps.masterLock.Unlock() | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
| 	msgCh := make(chan T, chanBufferSize) | 	msgCh := make(chan TData, chanBufferSize) | ||||||
|  |  | ||||||
| 	sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} | 	sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} | ||||||
|  |  | ||||||
| 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||||
|  |  | ||||||
| 	return msgCh, sub | 	return msgCh, sub | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ps *PubSub[T]) SubscribeByIter(ns string, chanBufferSize int) (iter.Seq[T], PubSubSubscription) { | func (ps *PubSub[TNamespace, TData]) SubscribeByIter(ns TNamespace, chanBufferSize int) (iter.Seq[TData], PubSubSubscription) { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	defer ps.masterLock.Unlock() | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
| 	msgCh := make(chan T, chanBufferSize) | 	msgCh := make(chan TData, chanBufferSize) | ||||||
| 	unsubChan := make(chan bool, 8) | 	unsubChan := make(chan bool, 8) | ||||||
|  |  | ||||||
| 	sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} | 	sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} | ||||||
|  |  | ||||||
| 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||||
|  |  | ||||||
| 	iterFun := func(yield func(T) bool) { | 	iterFun := func(yield func(TData) bool) { | ||||||
| 		for { | 		for { | ||||||
| 			select { | 			select { | ||||||
| 			case msg := <-msgCh: | 			case msg := <-msgCh: | ||||||
| @@ -215,7 +215,7 @@ func (ps *PubSub[T]) SubscribeByIter(ns string, chanBufferSize int) (iter.Seq[T] | |||||||
| 	return iterFun, sub | 	return iterFun, sub | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ps *PubSub[T]) unsubscribe(p *pubSubSubscription[T]) { | func (ps *PubSub[TNamespace, TData]) unsubscribe(p *pubSubSubscription[TNamespace, TData]) { | ||||||
| 	ps.masterLock.Lock() | 	ps.masterLock.Lock() | ||||||
| 	defer ps.masterLock.Unlock() | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
| @@ -232,7 +232,7 @@ func (ps *PubSub[T]) unsubscribe(p *pubSubSubscription[T]) { | |||||||
| 		p.UnsubChan = nil | 		p.UnsubChan = nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[T]) bool { | 	ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[TNamespace, TData]) bool { | ||||||
| 		return v.ID != p.ID | 		return v.ID != p.ID | ||||||
| 	}) | 	}) | ||||||
| 	if len(ps.subscriptions[p.namespace]) == 0 { | 	if len(ps.subscriptions[p.namespace]) == 0 { | ||||||
|   | |||||||
| @@ -8,7 +8,7 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestNewPubSub(t *testing.T) { | func TestNewPubSub(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
| 	if ps == nil { | 	if ps == nil { | ||||||
| 		t.Fatal("NewPubSub returned nil") | 		t.Fatal("NewPubSub returned nil") | ||||||
| 	} | 	} | ||||||
| @@ -21,7 +21,7 @@ func TestNewPubSub(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_Namespaces(t *testing.T) { | func TestPubSub_Namespaces(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	// Initially no namespaces | 	// Initially no namespaces | ||||||
| 	namespaces := ps.Namespaces() | 	namespaces := ps.Namespaces() | ||||||
| @@ -60,7 +60,7 @@ func TestPubSub_Namespaces(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_SubscribeByCallback(t *testing.T) { | func TestPubSub_SubscribeByCallback(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	var received string | 	var received string | ||||||
| 	var wg sync.WaitGroup | 	var wg sync.WaitGroup | ||||||
| @@ -94,7 +94,7 @@ func TestPubSub_SubscribeByCallback(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_SubscribeByChan(t *testing.T) { | func TestPubSub_SubscribeByChan(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
| 	defer sub.Unsubscribe() | 	defer sub.Unsubscribe() | ||||||
| @@ -122,7 +122,7 @@ func TestPubSub_SubscribeByChan(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_SubscribeByIter(t *testing.T) { | func TestPubSub_SubscribeByIter(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	iterSeq, sub := ps.SubscribeByIter("test-ns", 1) | 	iterSeq, sub := ps.SubscribeByIter("test-ns", 1) | ||||||
| 	defer sub.Unsubscribe() | 	defer sub.Unsubscribe() | ||||||
| @@ -165,7 +165,7 @@ func TestPubSub_SubscribeByIter(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_Publish(t *testing.T) { | func TestPubSub_Publish(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	// Test publishing to a namespace with no subscribers | 	// Test publishing to a namespace with no subscribers | ||||||
| 	subs, receivers := ps.Publish("empty-ns", "hello") | 	subs, receivers := ps.Publish("empty-ns", "hello") | ||||||
| @@ -215,7 +215,7 @@ func TestPubSub_Publish(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_PublishWithTimeout(t *testing.T) { | func TestPubSub_PublishWithTimeout(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	// Add a subscriber with a channel | 	// Add a subscriber with a channel | ||||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
| @@ -262,7 +262,7 @@ func TestPubSub_PublishWithTimeout(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_PublishWithContext(t *testing.T) { | func TestPubSub_PublishWithContext(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	// Add a subscriber with a channel | 	// Add a subscriber with a channel | ||||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
| @@ -328,7 +328,7 @@ func TestPubSub_PublishWithContext(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_Unsubscribe(t *testing.T) { | func TestPubSub_Unsubscribe(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	// Add a subscriber | 	// Add a subscriber | ||||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
| @@ -372,7 +372,7 @@ func TestPubSub_Unsubscribe(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestPubSub_MultipleSubscribers(t *testing.T) { | func TestPubSub_MultipleSubscribers(t *testing.T) { | ||||||
| 	ps := NewPubSub[string](10) | 	ps := NewPubSub[string, string](10) | ||||||
|  |  | ||||||
| 	// Add multiple subscribers | 	// Add multiple subscribers | ||||||
| 	ch1, sub1 := ps.SubscribeByChan("test-ns", 1) | 	ch1, sub1 := ps.SubscribeByChan("test-ns", 1) | ||||||
|   | |||||||
| @@ -1,5 +1,5 @@ | |||||||
| package goext | package goext | ||||||
|  |  | ||||||
| const GoextVersion = "0.0.592" | const GoextVersion = "0.0.593" | ||||||
|  |  | ||||||
| const GoextVersionTimestamp = "2025-07-16T12:46:18+0200" | const GoextVersionTimestamp = "2025-07-16T12:50:36+0200" | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user