package dataext import ( "context" "sync" "testing" "time" ) func TestNewBroadcast(t *testing.T) { bb := NewBroadcaster[string](10) if bb == nil { t.Fatal("NewBroadcaster returned nil") } if bb.masterLock == nil { t.Fatal("masterLock is nil") } if bb.subscriptions == nil { t.Fatal("subscriptions is nil") } } func TestBroadcast_SubscribeByCallback(t *testing.T) { bb := NewBroadcaster[string](10) var received string var wg sync.WaitGroup wg.Add(1) callback := func(msg string) { received = msg wg.Done() } sub := bb.SubscribeByCallback(callback) defer sub.Unsubscribe() // Publish a message subs, receivers := bb.Publish("hello") if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } if receivers != 1 { t.Fatalf("Expected 1 receiver, got %d", receivers) } // Wait for the callback to be executed wg.Wait() if received != "hello" { t.Fatalf("Expected to receive 'hello', got '%s'", received) } } func TestBroadcast_SubscribeByChan(t *testing.T) { bb := NewBroadcaster[string](10) ch, sub := bb.SubscribeByChan(1) defer sub.Unsubscribe() // Publish a message subs, receivers := bb.Publish("hello") if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } if receivers != 1 { t.Fatalf("Expected 1 receiver, got %d", receivers) } // Read from the channel with a timeout to avoid blocking select { case msg := <-ch: if msg != "hello" { t.Fatalf("Expected to receive 'hello', got '%s'", msg) } case <-time.After(time.Second): t.Fatal("Timed out waiting for message") } } func TestBroadcast_SubscribeByIter(t *testing.T) { bb := NewBroadcaster[string](10) iterSeq, sub := bb.SubscribeByIter(1) defer sub.Unsubscribe() // Channel to communicate when message is received done := make(chan bool) received := false // Start a goroutine to use the iterator go func() { for msg := range iterSeq { if msg == "hello" { received = true done <- true return // Stop iteration } } }() // Give time for the iterator to start time.Sleep(100 * time.Millisecond) // Publish a message bb.Publish("hello") // Wait for the message to be received or timeout select { case <-done: if !received { t.Fatal("Message was received but not 'hello'") } case <-time.After(time.Second): t.Fatal("Timed out waiting for message") } subCount := bb.SubscriberCount() if subCount != 0 { t.Fatalf("Expected 0 receivers, got %d", subCount) } } func TestBroadcast_Publish(t *testing.T) { bb := NewBroadcaster[string](10) // Test publishing with no subscribers subs, receivers := bb.Publish("hello") if subs != 0 { t.Fatalf("Expected 0 subscribers, got %d", subs) } if receivers != 0 { t.Fatalf("Expected 0 receivers, got %d", receivers) } // Add a subscriber ch, sub := bb.SubscribeByChan(1) defer sub.Unsubscribe() // Publish a message subs, receivers = bb.Publish("hello") if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } if receivers != 1 { t.Fatalf("Expected 1 receiver, got %d", receivers) } // Verify the message was received select { case msg := <-ch: if msg != "hello" { t.Fatalf("Expected to receive 'hello', got '%s'", msg) } case <-time.After(time.Second): t.Fatal("Timed out waiting for message") } // Test non-blocking behavior with a full channel // First fill the channel bb.Publish("fill") // Now publish again - this should not block but may skip the receiver subs, receivers = bb.Publish("overflow") if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } _ = receivers // may be 0 if channel is full // Drain the channel <-ch } func TestBroadcast_PublishWithTimeout(t *testing.T) { bb := NewBroadcaster[string](10) // Add a subscriber with a channel ch, sub := bb.SubscribeByChan(1) defer sub.Unsubscribe() // Publish with a timeout subs, receivers := bb.PublishWithTimeout("hello", 100*time.Millisecond) if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } if receivers != 1 { t.Fatalf("Expected 1 receiver, got %d", receivers) } // Verify the message was received select { case msg := <-ch: if msg != "hello" { t.Fatalf("Expected to receive 'hello', got '%s'", msg) } case <-time.After(time.Second): t.Fatal("Timed out waiting for message") } // Fill the channel bb.Publish("fill") // Test timeout behavior with a full channel start := time.Now() subs, receivers = bb.PublishWithTimeout("timeout-test", 50*time.Millisecond) elapsed := time.Since(start) if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } // The receiver count should be 0 if the timeout occurred if elapsed < 50*time.Millisecond { t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) } // Drain the channel <-ch } func TestBroadcast_PublishWithContext(t *testing.T) { bb := NewBroadcaster[string](10) // Add a subscriber with a channel ch, sub := bb.SubscribeByChan(1) defer sub.Unsubscribe() // Create a context ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() // Publish with context subs, receivers, err := bb.PublishWithContext(ctx, "hello") if err != nil { t.Fatalf("Unexpected error: %v", err) } if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } if receivers != 1 { t.Fatalf("Expected 1 receiver, got %d", receivers) } // Verify the message was received select { case msg := <-ch: if msg != "hello" { t.Fatalf("Expected to receive 'hello', got '%s'", msg) } case <-time.After(time.Second): t.Fatal("Timed out waiting for message") } // Fill the channel bb.Publish("fill") // Test context cancellation with a full channel ctx, cancel = context.WithCancel(context.Background()) // Cancel the context after a short delay go func() { time.Sleep(50 * time.Millisecond) cancel() }() start := time.Now() subs, receivers, err = bb.PublishWithContext(ctx, "context-test") elapsed := time.Since(start) if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } // Should get a context canceled error if err == nil { t.Fatal("Expected context canceled error, got nil") } if elapsed < 50*time.Millisecond { t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) } // Drain the channel <-ch } func TestBroadcast_Unsubscribe(t *testing.T) { bb := NewBroadcaster[string](10) // Add a subscriber ch, sub := bb.SubscribeByChan(1) // Publish a message subs, receivers := bb.Publish("hello") if subs != 1 { t.Fatalf("Expected 1 subscriber, got %d", subs) } if receivers != 1 { t.Fatalf("Expected 1 receiver, got %d", receivers) } // Verify the message was received select { case msg := <-ch: if msg != "hello" { t.Fatalf("Expected to receive 'hello', got '%s'", msg) } case <-time.After(time.Second): t.Fatal("Timed out waiting for message") } // Unsubscribe sub.Unsubscribe() // Publish again subs, receivers = bb.Publish("after-unsub") if subs != 0 { t.Fatalf("Expected 0 subscribers after unsubscribe, got %d", subs) } if receivers != 0 { t.Fatalf("Expected 0 receivers after unsubscribe, got %d", receivers) } // Check that the subscriber count is 0 if bb.SubscriberCount() != 0 { t.Fatalf("Expected SubscriberCount() == 0, got %d", bb.SubscriberCount()) } }