From dc62bbe55f6996d851385e8ac05c6e1e42c5affc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Sat, 13 Sep 2025 18:42:17 +0200 Subject: [PATCH] v0.0.599 implement dataext.broadcaster --- dataext/broadcaster.go | 230 +++++++++++++++++++++++++ dataext/broadcaster_test.go | 332 ++++++++++++++++++++++++++++++++++++ goextVersion.go | 4 +- 3 files changed, 564 insertions(+), 2 deletions(-) create mode 100644 dataext/broadcaster.go create mode 100644 dataext/broadcaster_test.go diff --git a/dataext/broadcaster.go b/dataext/broadcaster.go new file mode 100644 index 0000000..04c5069 --- /dev/null +++ b/dataext/broadcaster.go @@ -0,0 +1,230 @@ +package dataext + +import ( + "context" + "iter" + "sync" + "time" + + "git.blackforestbytes.com/BlackForestBytes/goext/langext" + "git.blackforestbytes.com/BlackForestBytes/goext/syncext" + "github.com/rs/xid" +) + +// Broadcaster is a simple Broadcaster channel +// This is a simpler interface over Broadcaster - which does not have distinct namespaces +type Broadcaster[TData any] struct { + masterLock *sync.Mutex + + subscriptions []*broadcastSubscription[TData] +} + +type BroadcastSubscription interface { + Unsubscribe() +} + +type broadcastSubscription[TData any] struct { + ID string + + parent *Broadcaster[TData] + + subLock *sync.Mutex + + Func func(TData) + Chan chan TData + + UnsubChan chan bool +} + +func (p *broadcastSubscription[TData]) Unsubscribe() { + p.parent.unsubscribe(p) +} + +func NewBroadcast[TData any](capacity int) *Broadcaster[TData] { + return &Broadcaster[TData]{ + masterLock: &sync.Mutex{}, + subscriptions: make([]*broadcastSubscription[TData], 0, capacity), + } +} + +func (bb *Broadcaster[TData]) SubscriberCount() int { + bb.masterLock.Lock() + defer bb.masterLock.Unlock() + + return len(bb.subscriptions) +} + +// Publish sends `data` to all subscriber +// But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) +func (bb *Broadcaster[TData]) Publish(data TData) (subscriber int, actualReceiver int) { + bb.masterLock.Lock() + subs := langext.ArrCopy(bb.subscriptions) + bb.masterLock.Unlock() + + subscriber = len(subs) + actualReceiver = 0 + + for _, sub := range subs { + func() { + sub.subLock.Lock() + defer sub.subLock.Unlock() + + if sub.Func != nil { + go func() { sub.Func(data) }() + actualReceiver++ + } else if sub.Chan != nil { + msgSent := syncext.WriteNonBlocking(sub.Chan, data) + if msgSent { + actualReceiver++ + } + } + }() + } + + return subscriber, actualReceiver +} + +// PublishWithContext sends `data` to all subscriber +// buffered - if one is currently not listening, we wait (but error out when the context runs out) +func (bb *Broadcaster[TData]) PublishWithContext(ctx context.Context, data TData) (subscriber int, actualReceiver int, err error) { + bb.masterLock.Lock() + subs := langext.ArrCopy(bb.subscriptions) + bb.masterLock.Unlock() + + subscriber = len(subs) + actualReceiver = 0 + + for _, sub := range subs { + err := func() error { + sub.subLock.Lock() + defer sub.subLock.Unlock() + + if err := ctx.Err(); err != nil { + return err + } + + if sub.Func != nil { + go func() { sub.Func(data) }() + actualReceiver++ + } else if sub.Chan != nil { + err := syncext.WriteChannelWithContext(ctx, sub.Chan, data) + if err != nil { + return err + } + actualReceiver++ + } + + return nil + }() + if err != nil { + return subscriber, actualReceiver, err + } + } + + return subscriber, actualReceiver, nil +} + +// PublishWithTimeout sends `data` to all subscriber +// buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) +func (bb *Broadcaster[TData]) PublishWithTimeout(data TData, timeout time.Duration) (subscriber int, actualReceiver int) { + bb.masterLock.Lock() + subs := langext.ArrCopy(bb.subscriptions) + bb.masterLock.Unlock() + + subscriber = len(subs) + actualReceiver = 0 + + for _, sub := range subs { + func() { + sub.subLock.Lock() + defer sub.subLock.Unlock() + + if sub.Func != nil { + go func() { sub.Func(data) }() + actualReceiver++ + } else if sub.Chan != nil { + ok := syncext.WriteChannelWithTimeout(sub.Chan, data, timeout) + if ok { + actualReceiver++ + } + } + }() + } + + return subscriber, actualReceiver +} + +func (bb *Broadcaster[TData]) SubscribeByCallback(fn func(TData)) BroadcastSubscription { + bb.masterLock.Lock() + defer bb.masterLock.Unlock() + + sub := &broadcastSubscription[TData]{ID: xid.New().String(), parent: bb, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} + + bb.subscriptions = append(bb.subscriptions, sub) + + return sub +} + +func (bb *Broadcaster[TData]) SubscribeByChan(chanBufferSize int) (chan TData, BroadcastSubscription) { + bb.masterLock.Lock() + defer bb.masterLock.Unlock() + + msgCh := make(chan TData, chanBufferSize) + + sub := &broadcastSubscription[TData]{ID: xid.New().String(), parent: bb, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} + + bb.subscriptions = append(bb.subscriptions, sub) + + return msgCh, sub +} + +func (bb *Broadcaster[TData]) SubscribeByIter(chanBufferSize int) (iter.Seq[TData], BroadcastSubscription) { + bb.masterLock.Lock() + defer bb.masterLock.Unlock() + + msgCh := make(chan TData, chanBufferSize) + unsubChan := make(chan bool, 8) + + sub := &broadcastSubscription[TData]{ID: xid.New().String(), parent: bb, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} + + bb.subscriptions = append(bb.subscriptions, sub) + + iterFun := func(yield func(TData) bool) { + for { + select { + case msg := <-msgCh: + if !yield(msg) { + sub.Unsubscribe() + return + } + case <-sub.UnsubChan: + sub.Unsubscribe() + return + } + } + } + + return iterFun, sub +} + +func (bb *Broadcaster[TData]) unsubscribe(p *broadcastSubscription[TData]) { + bb.masterLock.Lock() + defer bb.masterLock.Unlock() + + p.subLock.Lock() + defer p.subLock.Unlock() + + if p.Chan != nil { + close(p.Chan) + p.Chan = nil + } + if p.UnsubChan != nil { + syncext.WriteNonBlocking(p.UnsubChan, true) + close(p.UnsubChan) + p.UnsubChan = nil + } + + bb.subscriptions = langext.ArrFilter(bb.subscriptions, func(v *broadcastSubscription[TData]) bool { + return v.ID != p.ID + }) +} diff --git a/dataext/broadcaster_test.go b/dataext/broadcaster_test.go new file mode 100644 index 0000000..31c46ff --- /dev/null +++ b/dataext/broadcaster_test.go @@ -0,0 +1,332 @@ +package dataext + +import ( + "context" + "sync" + "testing" + "time" +) + +func TestNewBroadcast(t *testing.T) { + bb := NewBroadcast[string](10) + if bb == nil { + t.Fatal("NewBroadcast returned nil") + } + if bb.masterLock == nil { + t.Fatal("masterLock is nil") + } + if bb.subscriptions == nil { + t.Fatal("subscriptions is nil") + } +} + +func TestBroadcast_SubscribeByCallback(t *testing.T) { + bb := NewBroadcast[string](10) + + var received string + var wg sync.WaitGroup + wg.Add(1) + + callback := func(msg string) { + received = msg + wg.Done() + } + + sub := bb.SubscribeByCallback(callback) + defer sub.Unsubscribe() + + // Publish a message + subs, receivers := bb.Publish("hello") + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Wait for the callback to be executed + wg.Wait() + + if received != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", received) + } +} + +func TestBroadcast_SubscribeByChan(t *testing.T) { + bb := NewBroadcast[string](10) + + ch, sub := bb.SubscribeByChan(1) + defer sub.Unsubscribe() + + // Publish a message + subs, receivers := bb.Publish("hello") + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Read from the channel with a timeout to avoid blocking + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } +} + +func TestBroadcast_SubscribeByIter(t *testing.T) { + bb := NewBroadcast[string](10) + + iterSeq, sub := bb.SubscribeByIter(1) + defer sub.Unsubscribe() + + // Channel to communicate when message is received + done := make(chan bool) + received := false + + // Start a goroutine to use the iterator + go func() { + for msg := range iterSeq { + if msg == "hello" { + received = true + done <- true + return // Stop iteration + } + } + }() + + // Give time for the iterator to start + time.Sleep(100 * time.Millisecond) + + // Publish a message + bb.Publish("hello") + + // Wait for the message to be received or timeout + select { + case <-done: + if !received { + t.Fatal("Message was received but not 'hello'") + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + subCount := bb.SubscriberCount() + if subCount != 0 { + t.Fatalf("Expected 0 receivers, got %d", subCount) + } +} + +func TestBroadcast_Publish(t *testing.T) { + bb := NewBroadcast[string](10) + + // Test publishing with no subscribers + subs, receivers := bb.Publish("hello") + if subs != 0 { + t.Fatalf("Expected 0 subscribers, got %d", subs) + } + if receivers != 0 { + t.Fatalf("Expected 0 receivers, got %d", receivers) + } + + // Add a subscriber + ch, sub := bb.SubscribeByChan(1) + defer sub.Unsubscribe() + + // Publish a message + subs, receivers = bb.Publish("hello") + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Test non-blocking behavior with a full channel + // First fill the channel + bb.Publish("fill") + + // Now publish again - this should not block but may skip the receiver + subs, receivers = bb.Publish("overflow") + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + _ = receivers // may be 0 if channel is full + + // Drain the channel + <-ch +} + +func TestBroadcast_PublishWithTimeout(t *testing.T) { + bb := NewBroadcast[string](10) + + // Add a subscriber with a channel + ch, sub := bb.SubscribeByChan(1) + defer sub.Unsubscribe() + + // Publish with a timeout + subs, receivers := bb.PublishWithTimeout("hello", 100*time.Millisecond) + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Fill the channel + bb.Publish("fill") + + // Test timeout behavior with a full channel + start := time.Now() + subs, receivers = bb.PublishWithTimeout("timeout-test", 50*time.Millisecond) + elapsed := time.Since(start) + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + // The receiver count should be 0 if the timeout occurred + if elapsed < 50*time.Millisecond { + t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) + } + + // Drain the channel + <-ch +} + +func TestBroadcast_PublishWithContext(t *testing.T) { + bb := NewBroadcast[string](10) + + // Add a subscriber with a channel + ch, sub := bb.SubscribeByChan(1) + defer sub.Unsubscribe() + + // Create a context + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // Publish with context + subs, receivers, err := bb.PublishWithContext(ctx, "hello") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Fill the channel + bb.Publish("fill") + + // Test context cancellation with a full channel + ctx, cancel = context.WithCancel(context.Background()) + + // Cancel the context after a short delay + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + + start := time.Now() + subs, receivers, err = bb.PublishWithContext(ctx, "context-test") + elapsed := time.Since(start) + + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + + // Should get a context canceled error + if err == nil { + t.Fatal("Expected context canceled error, got nil") + } + + if elapsed < 50*time.Millisecond { + t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) + } + + // Drain the channel + <-ch +} + +func TestBroadcast_Unsubscribe(t *testing.T) { + bb := NewBroadcast[string](10) + + // Add a subscriber + ch, sub := bb.SubscribeByChan(1) + + // Publish a message + subs, receivers := bb.Publish("hello") + if subs != 1 { + t.Fatalf("Expected 1 subscriber, got %d", subs) + } + if receivers != 1 { + t.Fatalf("Expected 1 receiver, got %d", receivers) + } + + // Verify the message was received + select { + case msg := <-ch: + if msg != "hello" { + t.Fatalf("Expected to receive 'hello', got '%s'", msg) + } + case <-time.After(time.Second): + t.Fatal("Timed out waiting for message") + } + + // Unsubscribe + sub.Unsubscribe() + + // Publish again + subs, receivers = bb.Publish("after-unsub") + if subs != 0 { + t.Fatalf("Expected 0 subscribers after unsubscribe, got %d", subs) + } + if receivers != 0 { + t.Fatalf("Expected 0 receivers after unsubscribe, got %d", receivers) + } + + // Check that the subscriber count is 0 + if bb.SubscriberCount() != 0 { + t.Fatalf("Expected SubscriberCount() == 0, got %d", bb.SubscriberCount()) + } +} diff --git a/goextVersion.go b/goextVersion.go index d9593d2..c6e88df 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.598" +const GoextVersion = "0.0.599" -const GoextVersionTimestamp = "2025-09-11T11:17:34+0200" +const GoextVersionTimestamp = "2025-09-13T18:42:17+0200"