Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d30e778bd4 | |||
| 73e867f75a |
@@ -90,15 +90,17 @@ func TestBroadcast_SubscribeByIter(t *testing.T) {
|
|||||||
|
|
||||||
// Channel to communicate when message is received
|
// Channel to communicate when message is received
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
|
goroutineDone := make(chan struct{})
|
||||||
received := false
|
received := false
|
||||||
|
|
||||||
// Start a goroutine to use the iterator
|
// Start a goroutine to use the iterator
|
||||||
go func() {
|
go func() {
|
||||||
|
defer close(goroutineDone)
|
||||||
for msg := range iterSeq {
|
for msg := range iterSeq {
|
||||||
if msg == "hello" {
|
if msg == "hello" {
|
||||||
received = true
|
received = true
|
||||||
done <- true
|
done <- true
|
||||||
return // Stop iteration
|
return // Stop iteration — triggers Unsubscribe via yield returning false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -119,6 +121,14 @@ func TestBroadcast_SubscribeByIter(t *testing.T) {
|
|||||||
t.Fatal("Timed out waiting for message")
|
t.Fatal("Timed out waiting for message")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for the goroutine to fully exit so Unsubscribe (triggered by the
|
||||||
|
// iterator cleanup when yield returns false) has completed.
|
||||||
|
select {
|
||||||
|
case <-goroutineDone:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("Timed out waiting for goroutine to finish")
|
||||||
|
}
|
||||||
|
|
||||||
subCount := bb.SubscriberCount()
|
subCount := bb.SubscriberCount()
|
||||||
if subCount != 0 {
|
if subCount != 0 {
|
||||||
t.Fatalf("Expected 0 receivers, got %d", subCount)
|
t.Fatalf("Expected 0 receivers, got %d", subCount)
|
||||||
|
|||||||
+11
-1
@@ -129,15 +129,17 @@ func TestPubSub_SubscribeByIter(t *testing.T) {
|
|||||||
|
|
||||||
// Channel to communicate when message is received
|
// Channel to communicate when message is received
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
|
goroutineDone := make(chan struct{})
|
||||||
received := false
|
received := false
|
||||||
|
|
||||||
// Start a goroutine to use the iterator
|
// Start a goroutine to use the iterator
|
||||||
go func() {
|
go func() {
|
||||||
|
defer close(goroutineDone)
|
||||||
for msg := range iterSeq {
|
for msg := range iterSeq {
|
||||||
if msg == "hello" {
|
if msg == "hello" {
|
||||||
received = true
|
received = true
|
||||||
done <- true
|
done <- true
|
||||||
return // Stop iteration
|
return // Stop iteration — triggers Unsubscribe via yield returning false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -158,6 +160,14 @@ func TestPubSub_SubscribeByIter(t *testing.T) {
|
|||||||
t.Fatal("Timed out waiting for message")
|
t.Fatal("Timed out waiting for message")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for the goroutine to fully exit so Unsubscribe (triggered by the
|
||||||
|
// iterator cleanup when yield returns false) has completed.
|
||||||
|
select {
|
||||||
|
case <-goroutineDone:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("Timed out waiting for goroutine to finish")
|
||||||
|
}
|
||||||
|
|
||||||
subCount := ps.SubscriberCount("test-ns")
|
subCount := ps.SubscriberCount("test-ns")
|
||||||
if subCount != 0 {
|
if subCount != 0 {
|
||||||
t.Fatalf("Expected 0 receivers, got %d", subCount)
|
t.Fatalf("Expected 0 receivers, got %d", subCount)
|
||||||
|
|||||||
+5
-4
@@ -2,12 +2,13 @@ package exerr
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
|
|
||||||
"github.com/rs/xid"
|
|
||||||
"github.com/rs/zerolog"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
|
||||||
|
"github.com/rs/xid"
|
||||||
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ExErr struct {
|
type ExErr struct {
|
||||||
@@ -136,7 +137,7 @@ func (ee *ExErr) FormatLog(lvl LogPrintLevel) string {
|
|||||||
for curr := ee; curr != nil; curr = curr.OriginalError {
|
for curr := ee; curr != nil; curr = curr.OriginalError {
|
||||||
indent.WriteString(" ")
|
indent.WriteString(" ")
|
||||||
|
|
||||||
str.WriteString(indent).String()
|
str.WriteString(indent.String())
|
||||||
str.WriteString("-> ")
|
str.WriteString("-> ")
|
||||||
strmsg := strings.Trim(curr.Message, " \r\n\t")
|
strmsg := strings.Trim(curr.Message, " \r\n\t")
|
||||||
if lbidx := strings.Index(curr.Message, "\n"); lbidx >= 0 {
|
if lbidx := strings.Index(curr.Message, "\n"); lbidx >= 0 {
|
||||||
|
|||||||
Reference in New Issue
Block a user