diff --git a/dataext/broadcaster_test.go b/dataext/broadcaster_test.go index c41d89e..5fd94c0 100644 --- a/dataext/broadcaster_test.go +++ b/dataext/broadcaster_test.go @@ -90,15 +90,17 @@ func TestBroadcast_SubscribeByIter(t *testing.T) { // Channel to communicate when message is received done := make(chan bool) + goroutineDone := make(chan struct{}) received := false // Start a goroutine to use the iterator go func() { + defer close(goroutineDone) for msg := range iterSeq { if msg == "hello" { received = true done <- true - return // Stop iteration + return // Stop iteration — triggers Unsubscribe via yield returning false } } }() @@ -119,6 +121,14 @@ func TestBroadcast_SubscribeByIter(t *testing.T) { t.Fatal("Timed out waiting for message") } + // Wait for the goroutine to fully exit so Unsubscribe (triggered by the + // iterator cleanup when yield returns false) has completed. + select { + case <-goroutineDone: + case <-time.After(time.Second): + t.Fatal("Timed out waiting for goroutine to finish") + } + subCount := bb.SubscriberCount() if subCount != 0 { t.Fatalf("Expected 0 receivers, got %d", subCount) diff --git a/dataext/pubsub_test.go b/dataext/pubsub_test.go index df935a3..64848f6 100644 --- a/dataext/pubsub_test.go +++ b/dataext/pubsub_test.go @@ -129,15 +129,17 @@ func TestPubSub_SubscribeByIter(t *testing.T) { // Channel to communicate when message is received done := make(chan bool) + goroutineDone := make(chan struct{}) received := false // Start a goroutine to use the iterator go func() { + defer close(goroutineDone) for msg := range iterSeq { if msg == "hello" { received = true done <- true - return // Stop iteration + return // Stop iteration — triggers Unsubscribe via yield returning false } } }() @@ -158,6 +160,14 @@ func TestPubSub_SubscribeByIter(t *testing.T) { t.Fatal("Timed out waiting for message") } + // Wait for the goroutine to fully exit so Unsubscribe (triggered by the + // iterator cleanup when yield returns false) has completed. + select { + case <-goroutineDone: + case <-time.After(time.Second): + t.Fatal("Timed out waiting for goroutine to finish") + } + subCount := ps.SubscriberCount("test-ns") if subCount != 0 { t.Fatalf("Expected 0 receivers, got %d", subCount)