From 64f2cd7219277a8334effb0a6746cdf8c03b30a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Wed, 16 Jul 2025 12:44:55 +0200 Subject: [PATCH] v0.0.591 implement namespaced PubSub Broker in dataext --- dataext/pubsub.go | 238 +++++++++++++++++++++++ dataext/pubsub_test.go | 428 +++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- goextVersion.go | 4 +- syncext/channel.go | 10 + 5 files changed, 679 insertions(+), 3 deletions(-) create mode 100644 dataext/pubsub.go create mode 100644 dataext/pubsub_test.go diff --git a/dataext/pubsub.go b/dataext/pubsub.go new file mode 100644 index 0000000..71ccad8 --- /dev/null +++ b/dataext/pubsub.go @@ -0,0 +1,238 @@ +package dataext + +import ( + "context" + "git.blackforestbytes.com/BlackForestBytes/goext/langext" + "git.blackforestbytes.com/BlackForestBytes/goext/syncext" + "github.com/rs/xid" + "iter" + "sync" + "time" +) + +type PubSubSubscription interface { + Unsubscribe() +} + +type pubSubSubscription[T any] struct { + ID string + + parent *PubSub[T] + namespace string + + subLock *sync.Mutex + + Func func(T) + Chan chan T + + UnsubChan chan bool +} + +func (p *pubSubSubscription[T]) Unsubscribe() { + p.parent.unsubscribe(p) +} + +type PubSub[T any] struct { + masterLock *sync.Mutex + + subscriptions map[string][]*pubSubSubscription[T] +} + +func NewPubSub[T any](capacity int) *PubSub[T] { + return &PubSub[T]{ + masterLock: &sync.Mutex{}, + subscriptions: make(map[string][]*pubSubSubscription[T], capacity), + } +} + +func (ps *PubSub[T]) Namespaces() []string { + ps.masterLock.Lock() + defer ps.masterLock.Unlock() + + return langext.MapKeyArr(ps.subscriptions) +} + +func (ps *PubSub[T]) SubscriberCount(ns string) int { + ps.masterLock.Lock() + defer ps.masterLock.Unlock() + + return len(ps.subscriptions[ns]) +} + +// Publish sends `data` to all subscriber +// But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) +func (ps *PubSub[T]) Publish(ns string, data T) (subscriber int, actualReceiver int) { + ps.masterLock.Lock() + subs := langext.ArrCopy(ps.subscriptions[ns]) + ps.masterLock.Unlock() + + subscriber = len(subs) + actualReceiver = 0 + + for _, sub := range subs { + func() { + sub.subLock.Lock() + defer sub.subLock.Unlock() + + if sub.Func != nil { + go func() { sub.Func(data) }() + actualReceiver++ + } else if sub.Chan != nil { + msgSent := syncext.WriteNonBlocking(sub.Chan, data) + if msgSent { + actualReceiver++ + } + } + }() + } + + return subscriber, actualReceiver +} + +// PublishWithContext sends `data` to all subscriber +// buffered - if one is currently not listening, we wait (but error out when the context runs out) +func (ps *PubSub[T]) PublishWithContext(ctx context.Context, ns string, data T) (subscriber int, actualReceiver int, err error) { + ps.masterLock.Lock() + subs := langext.ArrCopy(ps.subscriptions[ns]) + ps.masterLock.Unlock() + + subscriber = len(subs) + actualReceiver = 0 + + for _, sub := range subs { + err := func() error { + sub.subLock.Lock() + defer sub.subLock.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + + if sub.Func != nil { + go func() { sub.Func(data) }() + actualReceiver++ + } else if sub.Chan != nil { + err := syncext.WriteChannelWithContext(ctx, sub.Chan, data) + if err != nil { + return err + } + actualReceiver++ + } + + return nil + }() + if err != nil { + return subscriber, actualReceiver, err + } + } + + return subscriber, actualReceiver, nil +} + +// PublishWithTimeout sends `data` to all subscriber +// buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) +func (ps *PubSub[T]) PublishWithTimeout(ns string, data T, timeout time.Duration) (subscriber int, actualReceiver int) { + ps.masterLock.Lock() + subs := langext.ArrCopy(ps.subscriptions[ns]) + ps.masterLock.Unlock() + + subscriber = len(subs) + actualReceiver = 0 + + for _, sub := range subs { + func() { + sub.subLock.Lock() + defer sub.subLock.Unlock() + + if sub.Func != nil { + go func() { sub.Func(data) }() + actualReceiver++ + } else if sub.Chan != nil { + ok := syncext.WriteChannelWithTimeout(sub.Chan, data, timeout) + if ok { + actualReceiver++ + } + } + }() + } + + return subscriber, actualReceiver +} + +func (ps *PubSub[T]) SubscribeByCallback(ns string, fn func(T)) PubSubSubscription { + ps.masterLock.Lock() + defer ps.masterLock.Unlock() + + sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} + + ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) + + return sub +} + +func (ps *PubSub[T]) SubscribeByChan(ns string, chanBufferSize int) (chan T, PubSubSubscription) { + ps.masterLock.Lock() + defer ps.masterLock.Unlock() + + msgCh := make(chan T, chanBufferSize) + + sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} + + ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) + + return msgCh, sub +} + +func (ps *PubSub[T]) SubscribeByIter(ns string, chanBufferSize int) (iter.Seq[T], PubSubSubscription) { + ps.masterLock.Lock() + defer ps.masterLock.Unlock() + + msgCh := make(chan T, chanBufferSize) + unsubChan := make(chan bool, 8) + + sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} + + ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) + + iterFun := func(yield func(T) bool) { + for { + select { + case msg := <-msgCh: + if !yield(msg) { + sub.Unsubscribe() + return + } + case <-sub.UnsubChan: + sub.Unsubscribe() + return + } + } + } + + return iterFun, sub +} + +func (ps *PubSub[T]) unsubscribe(p *pubSubSubscription[T]) { + ps.masterLock.Lock() + defer ps.masterLock.Unlock() + + p.subLock.Lock() + defer p.subLock.Unlock() + + if p.Chan != nil { + close(p.Chan) + p.Chan = nil + } + if p.UnsubChan != nil { + syncext.WriteNonBlocking(p.UnsubChan, true) + close(p.UnsubChan) + p.UnsubChan = nil + } + + ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[T]) bool { + return v.ID != p.ID + }) + if len(ps.subscriptions[p.namespace]) == 0 { + delete(ps.subscriptions, p.namespace) + } +} diff --git a/dataext/pubsub_test.go b/dataext/pubsub_test.go new file mode 100644 index 0000000..081cc4e --- /dev/null +++ b/dataext/pubsub_test.go @@ -0,0 +1,428 @@ +package dataext + +import ( + "context" + "sync" + "testing" + "time" +) + +func TestNewPubSub(t *testing.T) { + ps := NewPubSub[string](10) + if ps == nil { + t.Fatal("NewPubSub returned nil") + } + if ps.masterLock == nil { + t.Fatal("masterLock is nil") + } + if ps.subscriptions == nil { + t.Fatal("subscriptions is nil") + } +} + +func TestPubSub_Namespaces(t *testing.T) { + ps := NewPubSub[string](10) + + // Initially no namespaces + namespaces := ps.Namespaces() + if len(namespaces) != 0 { + t.Fatalf("Expected 0 namespaces, got %d", len(namespaces)) + } + + // Add a subscription to create a namespace + _, sub1 := ps.SubscribeByChan("test-ns1", 1) + defer sub1.Unsubscribe() + + // Add another subscription to a different namespace + _, sub2 := ps.SubscribeByChan("test-ns2", 1) + defer sub2.Unsubscribe() + + // Check namespaces + namespaces = ps.Namespaces() + if len(namespaces) != 2 { + t.Fatalf("Expected 2 namespaces, got %d", len(namespaces)) + } + + // Check if namespaces contain the expected values + found1, found2 := false, false + for _, ns := range namespaces { + if ns == "test-ns1" { + found1 = true + } + if ns == "test-ns2" { + found2 = true + } + } + + if !found1 || !found2 { + t.Fatalf("Expected to find both namespaces, found ns1: %v, ns2: %v", found1, found2) + } +} + +func TestPubSub_SubscribeByCallback(t *testing.T) { + ps := NewPubSub[string](10) + + var received string + var wg sync.WaitGroup + wg.Add(1) + + callback := func(msg string) { + received = msg + wg.Done() + } + + sub := ps.SubscribeByCallback("test-ns", callback) + defer sub.Unsubscribe() + + // Publish a message + subs, receivers := ps.Publish("test-ns", "hello") + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Wait for the callback to be executed + wg.Wait() + + if received != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", received) + } +} + +func TestPubSub_SubscribeByChan(t *testing.T) { + ps := NewPubSub[string](10) + + ch, sub := ps.SubscribeByChan("test-ns", 1) + defer sub.Unsubscribe() + + // Publish a message + subs, receivers := ps.Publish("test-ns", "hello") + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Read from the channel with a timeout to avoid blocking + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } +} + +func TestPubSub_SubscribeByIter(t *testing.T) { + ps := NewPubSub[string](10) + + iterSeq, sub := ps.SubscribeByIter("test-ns", 1) + defer sub.Unsubscribe() + + // Channel to communicate when message is received + done := make(chan bool) + received := false + + // Start a goroutine to use the iterator + go func() { + for msg := range iterSeq { + if msg == "hello" { + received = true + done <- true + return // Stop iteration + } + } + }() + + // Give time for the iterator to start + time.Sleep(100 * time.Millisecond) + + // Publish a message + ps.Publish("test-ns", "hello") + + // Wait for the message to be received or timeout + select { + case <-done: + if !received { + t.Fatal("Message was received but not 'hello'") + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + subCount := ps.SubscriberCount("test-ns") + if subCount != 0 { + t.Fatalf("Expected 0 receivers, got %d", subCount) + } +} + +func TestPubSub_Publish(t *testing.T) { + ps := NewPubSub[string](10) + + // Test publishing to a namespace with no subscribers + subs, receivers := ps.Publish("empty-ns", "hello") + if subs != 0 { + t.Fatalf("Expected 0 subscribers, got %d", subs) + } + if receivers != 0 { + t.Fatalf("Expected 0 receivers, got %d", receivers) + } + + // Add a subscriber + ch, sub := ps.SubscribeByChan("test-ns", 1) + defer sub.Unsubscribe() + + // Publish a message + subs, receivers = ps.Publish("test-ns", "hello") + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Test non-blocking behavior with a full channel + // First fill the channel + ps.Publish("test-ns", "fill") + + // Now publish again - this should not block but skip the receiver + subs, receivers = ps.Publish("test-ns", "overflow") + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + // The receiver count might be 0 if the channel is full + + // Drain the channel + <-ch +} + +func TestPubSub_PublishWithTimeout(t *testing.T) { + ps := NewPubSub[string](10) + + // Add a subscriber with a channel + ch, sub := ps.SubscribeByChan("test-ns", 1) + defer sub.Unsubscribe() + + // Publish with a timeout + subs, receivers := ps.PublishWithTimeout("test-ns", "hello", 100*time.Millisecond) + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Fill the channel + ps.Publish("test-ns", "fill") + + // Test timeout behavior with a full channel + start := time.Now() + subs, receivers = ps.PublishWithTimeout("test-ns", "timeout-test", 50*time.Millisecond) + elapsed := time.Since(start) + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + // The receiver count should be 0 if the timeout occurred + if elapsed < 50*time.Millisecond { + t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) + } + + // Drain the channel + <-ch +} + +func TestPubSub_PublishWithContext(t *testing.T) { + ps := NewPubSub[string](10) + + // Add a subscriber with a channel + ch, sub := ps.SubscribeByChan("test-ns", 1) + defer sub.Unsubscribe() + + // Create a context + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // Publish with context + subs, receivers, err := ps.PublishWithContext(ctx, "test-ns", "hello") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Fill the channel + ps.Publish("test-ns", "fill") + + // Test context cancellation with a full channel + ctx, cancel = context.WithCancel(context.Background()) + + // Cancel the context after a short delay + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + + start := time.Now() + subs, receivers, err = ps.PublishWithContext(ctx, "test-ns", "context-test") + elapsed := time.Since(start) + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + // Should get a context canceled error + if err == nil { + t.Fatal("Expected context canceled error, got nil") + } + + if elapsed < 50*time.Millisecond { + t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) + } + + // Drain the channel + <-ch +} + +func TestPubSub_Unsubscribe(t *testing.T) { + ps := NewPubSub[string](10) + + // Add a subscriber + ch, sub := ps.SubscribeByChan("test-ns", 1) + + // Publish a message + subs, receivers := ps.Publish("test-ns", "hello") + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Unsubscribe + sub.Unsubscribe() + + // Publish again + subs, receivers = ps.Publish("test-ns", "after-unsub") + if subs != 0 { + t.Fatalf("Expected 0 subscribers after unsubscribe, got %d", subs) + } + if receivers != 0 { + t.Fatalf("Expected 0 receivers after unsubscribe, got %d", receivers) + } + + // Check that the namespace is removed + namespaces := ps.Namespaces() + if len(namespaces) != 0 { + t.Fatalf("Expected 0 namespaces after unsubscribe, got %d", len(namespaces)) + } +} + +func TestPubSub_MultipleSubscribers(t *testing.T) { + ps := NewPubSub[string](10) + + // Add multiple subscribers + ch1, sub1 := ps.SubscribeByChan("test-ns", 1) + defer sub1.Unsubscribe() + + ch2, sub2 := ps.SubscribeByChan("test-ns", 1) + defer sub2.Unsubscribe() + + var received string + var wg sync.WaitGroup + wg.Add(1) + + sub3 := ps.SubscribeByCallback("test-ns", func(msg string) { + received = msg + wg.Done() + }) + defer sub3.Unsubscribe() + + // Publish a message + subs, receivers := ps.Publish("test-ns", "hello") + if subs != 3 { + t.Fatalf("Expected 3 subscribers, got %d", subs) + } + if receivers != 3 { + t.Fatalf("Expected 3 receivers, got %d", receivers) + } + + // Verify the message was received by all subscribers + select { + case msg := <-ch1: + if msg != "hello" { + t.Fatalf("Expected ch1 to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message on ch1") + } + + select { + case msg := <-ch2: + if msg != "hello" { + t.Fatalf("Expected ch2 to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message on ch2") + } + + // Wait for the callback + wg.Wait() + + if received != "hello" { + t.Fatalf("Expected callback to receive 'hello', got '%s'", received) + } +} diff --git a/go.mod b/go.mod index d751e71..671fb3f 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( require ( github.com/disintegration/imaging v1.6.2 github.com/jung-kurt/gofpdf v1.16.2 + golang.org/x/net v0.42.0 golang.org/x/sync v0.16.0 ) @@ -55,7 +56,6 @@ require ( github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect golang.org/x/arch v0.19.0 // indirect golang.org/x/image v0.29.0 // indirect - golang.org/x/net v0.42.0 // indirect golang.org/x/text v0.27.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/goextVersion.go b/goextVersion.go index dbdf665..1cd3a60 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.590" +const GoextVersion = "0.0.591" -const GoextVersionTimestamp = "2025-07-15T14:14:22+0200" +const GoextVersionTimestamp = "2025-07-16T12:44:55+0200" diff --git a/syncext/channel.go b/syncext/channel.go index e64a7cb..f5b1ac3 100644 --- a/syncext/channel.go +++ b/syncext/channel.go @@ -1,6 +1,7 @@ package syncext import ( + "golang.org/x/net/context" "time" ) @@ -26,6 +27,15 @@ func WriteChannelWithTimeout[T any](c chan T, msg T, timeout time.Duration) bool } } +func WriteChannelWithContext[T any](ctx context.Context, c chan T, msg T) error { + select { + case c <- msg: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + func ReadNonBlocking[T any](c chan T) (T, bool) { select { case msg := <-c: