package dataext

import (
	"context"
	"sync"
	"testing"
	"time"
)

// pubSubRecvTimeout bounds how long a test waits for a single message or
// callback before it is reported as a failure instead of hanging.
const pubSubRecvTimeout = time.Second

// recvPubSubMessage returns the next message read from ch. It fails the test
// if nothing arrives within pubSubRecvTimeout; source names the channel in the
// failure output.
func recvPubSubMessage(t *testing.T, ch <-chan string, source string) string {
	t.Helper()

	select {
	case msg := <-ch:
		return msg
	case <-time.After(pubSubRecvTimeout):
		t.Fatalf("Timed out waiting for message on %s", source)
		return "" // not reached: Fatalf stops the test; needed to satisfy the compiler
	}
}

// expectPubSubMessage reads the next message from ch (bounded by
// pubSubRecvTimeout) and fails the test if it differs from want.
func expectPubSubMessage(t *testing.T, ch <-chan string, source, want string) {
	t.Helper()

	if got := recvPubSubMessage(t, ch, source); got != want {
		t.Fatalf("Expected %s to receive '%s', got '%s'", source, want, got)
	}
}

// waitPubSubCallbacks waits for wg to reach zero, failing the test if that
// does not happen within pubSubRecvTimeout.
func waitPubSubCallbacks(t *testing.T, wg *sync.WaitGroup) {
	t.Helper()

	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	select {
	case <-finished:
	case <-time.After(pubSubRecvTimeout):
		t.Fatal("Timed out waiting for callback")
	}
}

// TestNewPubSub checks that NewPubSub returns a non-nil value whose
// masterLock and subscriptions fields are initialized.
func TestNewPubSub(t *testing.T) {
	ps := NewPubSub[string, string](10)
	if ps == nil {
		t.Fatal("NewPubSub returned nil")
	}
	if ps.masterLock == nil {
		t.Fatal("masterLock is nil")
	}
	if ps.subscriptions == nil {
		t.Fatal("subscriptions is nil")
	}
}

// TestPubSub_Namespaces checks that Namespaces is empty for a fresh PubSub and
// lists every namespace that has an active subscription.
func TestPubSub_Namespaces(t *testing.T) {
	ps := NewPubSub[string, string](10)

	// Initially no namespaces
	namespaces := ps.Namespaces()
	if len(namespaces) != 0 {
		t.Fatalf("Expected 0 namespaces, got %d", len(namespaces))
	}

	// Add a subscription to create a namespace
	_, sub1 := ps.SubscribeByChan("test-ns1", 1)
	defer sub1.Unsubscribe()

	// Add another subscription to a different namespace
	_, sub2 := ps.SubscribeByChan("test-ns2", 1)
	defer sub2.Unsubscribe()

	// Check namespaces
	namespaces = ps.Namespaces()
	if len(namespaces) != 2 {
		t.Fatalf("Expected 2 namespaces, got %d", len(namespaces))
	}

	// Check if namespaces contain the expected values
	found1, found2 := false, false
	for _, ns := range namespaces {
		switch ns {
		case "test-ns1":
			found1 = true
		case "test-ns2":
			found2 = true
		}
	}
	if !found1 || !found2 {
		t.Fatalf("Expected to find both namespaces, found ns1: %v, ns2: %v", found1, found2)
	}
}

// TestPubSub_SubscribeByCallback checks that a callback subscriber is counted
// by Publish and is invoked with the published message.
func TestPubSub_SubscribeByCallback(t *testing.T) {
	ps := NewPubSub[string, string](10)

	var received string
	var wg sync.WaitGroup
	wg.Add(1)

	callback := func(msg string) {
		received = msg
		wg.Done()
	}

	sub := ps.SubscribeByCallback("test-ns", callback)
	defer sub.Unsubscribe()

	// Publish a message
	subs, receivers := ps.Publish("test-ns", "hello")
	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}
	if receivers != 1 {
		t.Fatalf("Expected 1 receiver, got %d", receivers)
	}

	// Wait (bounded) for the callback; the WaitGroup also orders the write to
	// received before the read below.
	waitPubSubCallbacks(t, &wg)

	if received != "hello" {
		t.Fatalf("Expected to receive 'hello', got '%s'", received)
	}
}

// TestPubSub_SubscribeByChan checks that a channel subscriber is counted by
// Publish and receives the published message on its channel.
func TestPubSub_SubscribeByChan(t *testing.T) {
	ps := NewPubSub[string, string](10)

	ch, sub := ps.SubscribeByChan("test-ns", 1)
	defer sub.Unsubscribe()

	// Publish a message
	subs, receivers := ps.Publish("test-ns", "hello")
	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}
	if receivers != 1 {
		t.Fatalf("Expected 1 receiver, got %d", receivers)
	}

	// Read from the channel with a timeout to avoid blocking
	expectPubSubMessage(t, ch, "ch", "hello")
}

// TestPubSub_SubscribeByIter checks that an iterator subscriber yields the
// published message and that no subscriber remains on the namespace once the
// consuming loop has stopped.
func TestPubSub_SubscribeByIter(t *testing.T) {
	ps := NewPubSub[string, string](10)

	iterSeq, sub := ps.SubscribeByIter("test-ns", 1)
	defer sub.Unsubscribe()

	// done is closed only after the range loop has fully exited, so whatever
	// the iterator does when iteration stops has completed before the
	// subscriber count is inspected below. Closing (rather than sending) also
	// means the goroutine can never block on the signal if the test has
	// already timed out.
	done := make(chan struct{})
	received := false

	// Start a goroutine to use the iterator
	go func() {
		defer close(done)
		for msg := range iterSeq {
			if msg == "hello" {
				received = true
				break // Stop iteration
			}
		}
	}()

	// Give time for the iterator to start
	time.Sleep(100 * time.Millisecond)

	// Publish a message
	ps.Publish("test-ns", "hello")

	// Wait for the iteration to finish or timeout
	select {
	case <-done:
		if !received {
			t.Fatal("Iterator finished without receiving 'hello'")
		}
	case <-time.After(pubSubRecvTimeout):
		t.Fatal("Timed out waiting for message")
	}

	subCount := ps.SubscriberCount("test-ns")
	if subCount != 0 {
		t.Fatalf("Expected 0 subscribers after iteration stopped, got %d", subCount)
	}
}

// TestPubSub_Publish checks Publish against an empty namespace, against a
// single channel subscriber, and against a subscriber whose channel already
// holds an unread message.
func TestPubSub_Publish(t *testing.T) {
	ps := NewPubSub[string, string](10)

	// Test publishing to a namespace with no subscribers
	subs, receivers := ps.Publish("empty-ns", "hello")
	if subs != 0 {
		t.Fatalf("Expected 0 subscribers, got %d", subs)
	}
	if receivers != 0 {
		t.Fatalf("Expected 0 receivers, got %d", receivers)
	}

	// Add a subscriber
	ch, sub := ps.SubscribeByChan("test-ns", 1)
	defer sub.Unsubscribe()

	// Publish a message
	subs, receivers = ps.Publish("test-ns", "hello")
	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}
	if receivers != 1 {
		t.Fatalf("Expected 1 receiver, got %d", receivers)
	}

	// Verify the message was received
	expectPubSubMessage(t, ch, "ch", "hello")

	// Test non-blocking behavior with a full channel
	// First fill the channel
	ps.Publish("test-ns", "fill")

	// Now publish again - this should not block but skip the receiver.
	// The receiver count might be 0 if the channel is full, so it is
	// deliberately not asserted.
	subs, _ = ps.Publish("test-ns", "overflow")
	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}

	// Drain the channel (bounded, so a missing message fails instead of hanging)
	recvPubSubMessage(t, ch, "ch")
}

// TestPubSub_PublishWithTimeout checks that PublishWithTimeout delivers to a
// ready subscriber and waits at least the given timeout when the subscriber's
// channel already holds an unread message.
func TestPubSub_PublishWithTimeout(t *testing.T) {
	ps := NewPubSub[string, string](10)

	// Add a subscriber with a channel
	ch, sub := ps.SubscribeByChan("test-ns", 1)
	defer sub.Unsubscribe()

	// Publish with a timeout
	subs, receivers := ps.PublishWithTimeout("test-ns", "hello", 100*time.Millisecond)
	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}
	if receivers != 1 {
		t.Fatalf("Expected 1 receiver, got %d", receivers)
	}

	// Verify the message was received
	expectPubSubMessage(t, ch, "ch", "hello")

	// Fill the channel
	ps.Publish("test-ns", "fill")

	// Test timeout behavior with a full channel. The receiver count should be
	// 0 if the timeout occurred; it is deliberately not asserted.
	start := time.Now()
	subs, _ = ps.PublishWithTimeout("test-ns", "timeout-test", 50*time.Millisecond)
	elapsed := time.Since(start)

	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}
	if elapsed < 50*time.Millisecond {
		t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed)
	}

	// Drain the channel (bounded, so a missing message fails instead of hanging)
	recvPubSubMessage(t, ch, "ch")
}

// TestPubSub_PublishWithContext checks that PublishWithContext delivers to a
// ready subscriber without error, and that it returns an error after waiting
// for cancellation when the subscriber's channel already holds an unread
// message.
func TestPubSub_PublishWithContext(t *testing.T) {
	ps := NewPubSub[string, string](10)

	// Add a subscriber with a channel
	ch, sub := ps.SubscribeByChan("test-ns", 1)
	defer sub.Unsubscribe()

	// Create a context
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Publish with context
	subs, receivers, err := ps.PublishWithContext(ctx, "test-ns", "hello")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}
	if receivers != 1 {
		t.Fatalf("Expected 1 receiver, got %d", receivers)
	}

	// Verify the message was received
	expectPubSubMessage(t, ch, "ch", "hello")

	// Fill the channel
	ps.Publish("test-ns", "fill")

	// Test context cancellation with a full channel. A separate context is
	// used so the first context's variables are not overwritten.
	blockedCtx, cancelBlocked := context.WithCancel(context.Background())
	defer cancelBlocked()

	// Cancel the context after a short delay
	cancelTimer := time.AfterFunc(50*time.Millisecond, cancelBlocked)
	defer cancelTimer.Stop()

	start := time.Now()
	subs, _, err = ps.PublishWithContext(blockedCtx, "test-ns", "context-test")
	elapsed := time.Since(start)

	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}

	// Should get a context canceled error
	if err == nil {
		t.Fatal("Expected context canceled error, got nil")
	}
	if elapsed < 50*time.Millisecond {
		t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed)
	}

	// Drain the channel (bounded, so a missing message fails instead of hanging)
	recvPubSubMessage(t, ch, "ch")
}

// TestPubSub_Unsubscribe checks that after Unsubscribe a later Publish reports
// no subscribers or receivers and the namespace no longer appears in
// Namespaces.
func TestPubSub_Unsubscribe(t *testing.T) {
	ps := NewPubSub[string, string](10)

	// Add a subscriber
	ch, sub := ps.SubscribeByChan("test-ns", 1)

	// Publish a message
	subs, receivers := ps.Publish("test-ns", "hello")
	if subs != 1 {
		t.Fatalf("Expected 1 subscriber, got %d", subs)
	}
	if receivers != 1 {
		t.Fatalf("Expected 1 receiver, got %d", receivers)
	}

	// Verify the message was received
	expectPubSubMessage(t, ch, "ch", "hello")

	// Unsubscribe
	sub.Unsubscribe()

	// Publish again
	subs, receivers = ps.Publish("test-ns", "after-unsub")
	if subs != 0 {
		t.Fatalf("Expected 0 subscribers after unsubscribe, got %d", subs)
	}
	if receivers != 0 {
		t.Fatalf("Expected 0 receivers after unsubscribe, got %d", receivers)
	}

	// Check that the namespace is removed
	namespaces := ps.Namespaces()
	if len(namespaces) != 0 {
		t.Fatalf("Expected 0 namespaces after unsubscribe, got %d", len(namespaces))
	}
}

// TestPubSub_MultipleSubscribers checks that one Publish reaches two channel
// subscribers and one callback subscriber on the same namespace.
func TestPubSub_MultipleSubscribers(t *testing.T) {
	ps := NewPubSub[string, string](10)

	// Add multiple subscribers
	ch1, sub1 := ps.SubscribeByChan("test-ns", 1)
	defer sub1.Unsubscribe()

	ch2, sub2 := ps.SubscribeByChan("test-ns", 1)
	defer sub2.Unsubscribe()

	var received string
	var wg sync.WaitGroup
	wg.Add(1)

	sub3 := ps.SubscribeByCallback("test-ns", func(msg string) {
		received = msg
		wg.Done()
	})
	defer sub3.Unsubscribe()

	// Publish a message
	subs, receivers := ps.Publish("test-ns", "hello")
	if subs != 3 {
		t.Fatalf("Expected 3 subscribers, got %d", subs)
	}
	if receivers != 3 {
		t.Fatalf("Expected 3 receivers, got %d", receivers)
	}

	// Verify the message was received by all subscribers
	expectPubSubMessage(t, ch1, "ch1", "hello")
	expectPubSubMessage(t, ch2, "ch2", "hello")

	// Wait (bounded) for the callback
	waitPubSubCallbacks(t, &wg)

	if received != "hello" {
		t.Fatalf("Expected callback to receive 'hello', got '%s'", received)
	}
}