Compare commits
	
		
			4 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 1586314e3e | |||
| 254fe1556a | |||
| 52e74b59f5 | |||
| 64f2cd7219 | 
							
								
								
									
										241
									
								
								dataext/pubsub.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										241
									
								
								dataext/pubsub.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,241 @@ | ||||
| package dataext | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/syncext" | ||||
| 	"github.com/rs/xid" | ||||
| 	"iter" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| // PubSub is a simple Pub/Sub Broker | ||||
| // Clients can subscribe to a namespace and receive published messages on this namespace | ||||
| // Messages are broadcast to all subscribers | ||||
| type PubSub[TNamespace comparable, TData any] struct { | ||||
| 	masterLock *sync.Mutex | ||||
|  | ||||
| 	subscriptions map[TNamespace][]*pubSubSubscription[TNamespace, TData] | ||||
| } | ||||
|  | ||||
| type PubSubSubscription interface { | ||||
| 	Unsubscribe() | ||||
| } | ||||
|  | ||||
| type pubSubSubscription[TNamespace comparable, TData any] struct { | ||||
| 	ID string | ||||
|  | ||||
| 	parent    *PubSub[TNamespace, TData] | ||||
| 	namespace TNamespace | ||||
|  | ||||
| 	subLock *sync.Mutex | ||||
|  | ||||
| 	Func func(TData) | ||||
| 	Chan chan TData | ||||
|  | ||||
| 	UnsubChan chan bool | ||||
| } | ||||
|  | ||||
| func (p *pubSubSubscription[TNamespace, TData]) Unsubscribe() { | ||||
| 	p.parent.unsubscribe(p) | ||||
| } | ||||
|  | ||||
| func NewPubSub[TNamespace comparable, TData any](capacity int) *PubSub[TNamespace, TData] { | ||||
| 	return &PubSub[TNamespace, TData]{ | ||||
| 		masterLock:    &sync.Mutex{}, | ||||
| 		subscriptions: make(map[TNamespace][]*pubSubSubscription[TNamespace, TData], capacity), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (ps *PubSub[TNamespace, TData]) Namespaces() []TNamespace { | ||||
| 	ps.masterLock.Lock() | ||||
| 	defer ps.masterLock.Unlock() | ||||
|  | ||||
| 	return langext.MapKeyArr(ps.subscriptions) | ||||
| } | ||||
|  | ||||
| func (ps *PubSub[TNamespace, TData]) SubscriberCount(ns TNamespace) int { | ||||
| 	ps.masterLock.Lock() | ||||
| 	defer ps.masterLock.Unlock() | ||||
|  | ||||
| 	return len(ps.subscriptions[ns]) | ||||
| } | ||||
|  | ||||
| // Publish sends `data` to all subscriber | ||||
| // But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) | ||||
| func (ps *PubSub[TNamespace, TData]) Publish(ns TNamespace, data TData) (subscriber int, actualReceiver int) { | ||||
| 	ps.masterLock.Lock() | ||||
| 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||
| 	ps.masterLock.Unlock() | ||||
|  | ||||
| 	subscriber = len(subs) | ||||
| 	actualReceiver = 0 | ||||
|  | ||||
| 	for _, sub := range subs { | ||||
| 		func() { | ||||
| 			sub.subLock.Lock() | ||||
| 			defer sub.subLock.Unlock() | ||||
|  | ||||
| 			if sub.Func != nil { | ||||
| 				go func() { sub.Func(data) }() | ||||
| 				actualReceiver++ | ||||
| 			} else if sub.Chan != nil { | ||||
| 				msgSent := syncext.WriteNonBlocking(sub.Chan, data) | ||||
| 				if msgSent { | ||||
| 					actualReceiver++ | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	return subscriber, actualReceiver | ||||
| } | ||||
|  | ||||
| // PublishWithContext sends `data` to all subscriber | ||||
| // buffered - if one is currently not listening, we wait (but error out when the context runs out) | ||||
| func (ps *PubSub[TNamespace, TData]) PublishWithContext(ctx context.Context, ns TNamespace, data TData) (subscriber int, actualReceiver int, err error) { | ||||
| 	ps.masterLock.Lock() | ||||
| 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||
| 	ps.masterLock.Unlock() | ||||
|  | ||||
| 	subscriber = len(subs) | ||||
| 	actualReceiver = 0 | ||||
|  | ||||
| 	for _, sub := range subs { | ||||
| 		err := func() error { | ||||
| 			sub.subLock.Lock() | ||||
| 			defer sub.subLock.Unlock() | ||||
|  | ||||
| 			if err := ctx.Err(); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			if sub.Func != nil { | ||||
| 				go func() { sub.Func(data) }() | ||||
| 				actualReceiver++ | ||||
| 			} else if sub.Chan != nil { | ||||
| 				err := syncext.WriteChannelWithContext(ctx, sub.Chan, data) | ||||
| 				if err != nil { | ||||
| 					return err | ||||
| 				} | ||||
| 				actualReceiver++ | ||||
| 			} | ||||
|  | ||||
| 			return nil | ||||
| 		}() | ||||
| 		if err != nil { | ||||
| 			return subscriber, actualReceiver, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return subscriber, actualReceiver, nil | ||||
| } | ||||
|  | ||||
| // PublishWithTimeout sends `data` to all subscriber | ||||
| // buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) | ||||
| func (ps *PubSub[TNamespace, TData]) PublishWithTimeout(ns TNamespace, data TData, timeout time.Duration) (subscriber int, actualReceiver int) { | ||||
| 	ps.masterLock.Lock() | ||||
| 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||
| 	ps.masterLock.Unlock() | ||||
|  | ||||
| 	subscriber = len(subs) | ||||
| 	actualReceiver = 0 | ||||
|  | ||||
| 	for _, sub := range subs { | ||||
| 		func() { | ||||
| 			sub.subLock.Lock() | ||||
| 			defer sub.subLock.Unlock() | ||||
|  | ||||
| 			if sub.Func != nil { | ||||
| 				go func() { sub.Func(data) }() | ||||
| 				actualReceiver++ | ||||
| 			} else if sub.Chan != nil { | ||||
| 				ok := syncext.WriteChannelWithTimeout(sub.Chan, data, timeout) | ||||
| 				if ok { | ||||
| 					actualReceiver++ | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	return subscriber, actualReceiver | ||||
| } | ||||
|  | ||||
| func (ps *PubSub[TNamespace, TData]) SubscribeByCallback(ns TNamespace, fn func(TData)) PubSubSubscription { | ||||
| 	ps.masterLock.Lock() | ||||
| 	defer ps.masterLock.Unlock() | ||||
|  | ||||
| 	sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} | ||||
|  | ||||
| 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||
|  | ||||
| 	return sub | ||||
| } | ||||
|  | ||||
| func (ps *PubSub[TNamespace, TData]) SubscribeByChan(ns TNamespace, chanBufferSize int) (chan TData, PubSubSubscription) { | ||||
| 	ps.masterLock.Lock() | ||||
| 	defer ps.masterLock.Unlock() | ||||
|  | ||||
| 	msgCh := make(chan TData, chanBufferSize) | ||||
|  | ||||
| 	sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} | ||||
|  | ||||
| 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||
|  | ||||
| 	return msgCh, sub | ||||
| } | ||||
|  | ||||
| func (ps *PubSub[TNamespace, TData]) SubscribeByIter(ns TNamespace, chanBufferSize int) (iter.Seq[TData], PubSubSubscription) { | ||||
| 	ps.masterLock.Lock() | ||||
| 	defer ps.masterLock.Unlock() | ||||
|  | ||||
| 	msgCh := make(chan TData, chanBufferSize) | ||||
| 	unsubChan := make(chan bool, 8) | ||||
|  | ||||
| 	sub := &pubSubSubscription[TNamespace, TData]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} | ||||
|  | ||||
| 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||
|  | ||||
| 	iterFun := func(yield func(TData) bool) { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case msg := <-msgCh: | ||||
| 				if !yield(msg) { | ||||
| 					sub.Unsubscribe() | ||||
| 					return | ||||
| 				} | ||||
| 			case <-sub.UnsubChan: | ||||
| 				sub.Unsubscribe() | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return iterFun, sub | ||||
| } | ||||
|  | ||||
| func (ps *PubSub[TNamespace, TData]) unsubscribe(p *pubSubSubscription[TNamespace, TData]) { | ||||
| 	ps.masterLock.Lock() | ||||
| 	defer ps.masterLock.Unlock() | ||||
|  | ||||
| 	p.subLock.Lock() | ||||
| 	defer p.subLock.Unlock() | ||||
|  | ||||
| 	if p.Chan != nil { | ||||
| 		close(p.Chan) | ||||
| 		p.Chan = nil | ||||
| 	} | ||||
| 	if p.UnsubChan != nil { | ||||
| 		syncext.WriteNonBlocking(p.UnsubChan, true) | ||||
| 		close(p.UnsubChan) | ||||
| 		p.UnsubChan = nil | ||||
| 	} | ||||
|  | ||||
| 	ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[TNamespace, TData]) bool { | ||||
| 		return v.ID != p.ID | ||||
| 	}) | ||||
| 	if len(ps.subscriptions[p.namespace]) == 0 { | ||||
| 		delete(ps.subscriptions, p.namespace) | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										428
									
								
								dataext/pubsub_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										428
									
								
								dataext/pubsub_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,428 @@ | ||||
| package dataext | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| func TestNewPubSub(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
| 	if ps == nil { | ||||
| 		t.Fatal("NewPubSub returned nil") | ||||
| 	} | ||||
| 	if ps.masterLock == nil { | ||||
| 		t.Fatal("masterLock is nil") | ||||
| 	} | ||||
| 	if ps.subscriptions == nil { | ||||
| 		t.Fatal("subscriptions is nil") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPubSub_Namespaces(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	// Initially no namespaces | ||||
| 	namespaces := ps.Namespaces() | ||||
| 	if len(namespaces) != 0 { | ||||
| 		t.Fatalf("Expected 0 namespaces, got %d", len(namespaces)) | ||||
| 	} | ||||
|  | ||||
| 	// Add a subscription to create a namespace | ||||
| 	_, sub1 := ps.SubscribeByChan("test-ns1", 1) | ||||
| 	defer sub1.Unsubscribe() | ||||
|  | ||||
| 	// Add another subscription to a different namespace | ||||
| 	_, sub2 := ps.SubscribeByChan("test-ns2", 1) | ||||
| 	defer sub2.Unsubscribe() | ||||
|  | ||||
| 	// Check namespaces | ||||
| 	namespaces = ps.Namespaces() | ||||
| 	if len(namespaces) != 2 { | ||||
| 		t.Fatalf("Expected 2 namespaces, got %d", len(namespaces)) | ||||
| 	} | ||||
|  | ||||
| 	// Check if namespaces contain the expected values | ||||
| 	found1, found2 := false, false | ||||
| 	for _, ns := range namespaces { | ||||
| 		if ns == "test-ns1" { | ||||
| 			found1 = true | ||||
| 		} | ||||
| 		if ns == "test-ns2" { | ||||
| 			found2 = true | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if !found1 || !found2 { | ||||
| 		t.Fatalf("Expected to find both namespaces, found ns1: %v, ns2: %v", found1, found2) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPubSub_SubscribeByCallback(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	var received string | ||||
| 	var wg sync.WaitGroup | ||||
| 	wg.Add(1) | ||||
|  | ||||
| 	callback := func(msg string) { | ||||
| 		received = msg | ||||
| 		wg.Done() | ||||
| 	} | ||||
|  | ||||
| 	sub := ps.SubscribeByCallback("test-ns", callback) | ||||
| 	defer sub.Unsubscribe() | ||||
|  | ||||
| 	// Publish a message | ||||
| 	subs, receivers := ps.Publish("test-ns", "hello") | ||||
|  | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
|  | ||||
| 	if receivers != 1 { | ||||
| 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Wait for the callback to be executed | ||||
| 	wg.Wait() | ||||
|  | ||||
| 	if received != "hello" { | ||||
| 		t.Fatalf("Expected to receive 'hello', got '%s'", received) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPubSub_SubscribeByChan(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||
| 	defer sub.Unsubscribe() | ||||
|  | ||||
| 	// Publish a message | ||||
| 	subs, receivers := ps.Publish("test-ns", "hello") | ||||
|  | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
|  | ||||
| 	if receivers != 1 { | ||||
| 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Read from the channel with a timeout to avoid blocking | ||||
| 	select { | ||||
| 	case msg := <-ch: | ||||
| 		if msg != "hello" { | ||||
| 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPubSub_SubscribeByIter(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	iterSeq, sub := ps.SubscribeByIter("test-ns", 1) | ||||
| 	defer sub.Unsubscribe() | ||||
|  | ||||
| 	// Channel to communicate when message is received | ||||
| 	done := make(chan bool) | ||||
| 	received := false | ||||
|  | ||||
| 	// Start a goroutine to use the iterator | ||||
| 	go func() { | ||||
| 		for msg := range iterSeq { | ||||
| 			if msg == "hello" { | ||||
| 				received = true | ||||
| 				done <- true | ||||
| 				return // Stop iteration | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	// Give time for the iterator to start | ||||
| 	time.Sleep(100 * time.Millisecond) | ||||
|  | ||||
| 	// Publish a message | ||||
| 	ps.Publish("test-ns", "hello") | ||||
|  | ||||
| 	// Wait for the message to be received or timeout | ||||
| 	select { | ||||
| 	case <-done: | ||||
| 		if !received { | ||||
| 			t.Fatal("Message was received but not 'hello'") | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message") | ||||
| 	} | ||||
|  | ||||
| 	subCount := ps.SubscriberCount("test-ns") | ||||
| 	if subCount != 0 { | ||||
| 		t.Fatalf("Expected 0 receivers, got %d", subCount) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPubSub_Publish(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	// Test publishing to a namespace with no subscribers | ||||
| 	subs, receivers := ps.Publish("empty-ns", "hello") | ||||
| 	if subs != 0 { | ||||
| 		t.Fatalf("Expected 0 subscribers, got %d", subs) | ||||
| 	} | ||||
| 	if receivers != 0 { | ||||
| 		t.Fatalf("Expected 0 receivers, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Add a subscriber | ||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||
| 	defer sub.Unsubscribe() | ||||
|  | ||||
| 	// Publish a message | ||||
| 	subs, receivers = ps.Publish("test-ns", "hello") | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
| 	if receivers != 1 { | ||||
| 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Verify the message was received | ||||
| 	select { | ||||
| 	case msg := <-ch: | ||||
| 		if msg != "hello" { | ||||
| 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message") | ||||
| 	} | ||||
|  | ||||
| 	// Test non-blocking behavior with a full channel | ||||
| 	// First fill the channel | ||||
| 	ps.Publish("test-ns", "fill") | ||||
|  | ||||
| 	// Now publish again - this should not block but skip the receiver | ||||
| 	subs, receivers = ps.Publish("test-ns", "overflow") | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
| 	// The receiver count might be 0 if the channel is full | ||||
|  | ||||
| 	// Drain the channel | ||||
| 	<-ch | ||||
| } | ||||
|  | ||||
| func TestPubSub_PublishWithTimeout(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	// Add a subscriber with a channel | ||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||
| 	defer sub.Unsubscribe() | ||||
|  | ||||
| 	// Publish with a timeout | ||||
| 	subs, receivers := ps.PublishWithTimeout("test-ns", "hello", 100*time.Millisecond) | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
| 	if receivers != 1 { | ||||
| 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Verify the message was received | ||||
| 	select { | ||||
| 	case msg := <-ch: | ||||
| 		if msg != "hello" { | ||||
| 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message") | ||||
| 	} | ||||
|  | ||||
| 	// Fill the channel | ||||
| 	ps.Publish("test-ns", "fill") | ||||
|  | ||||
| 	// Test timeout behavior with a full channel | ||||
| 	start := time.Now() | ||||
| 	subs, receivers = ps.PublishWithTimeout("test-ns", "timeout-test", 50*time.Millisecond) | ||||
| 	elapsed := time.Since(start) | ||||
|  | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
|  | ||||
| 	// The receiver count should be 0 if the timeout occurred | ||||
| 	if elapsed < 50*time.Millisecond { | ||||
| 		t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) | ||||
| 	} | ||||
|  | ||||
| 	// Drain the channel | ||||
| 	<-ch | ||||
| } | ||||
|  | ||||
| func TestPubSub_PublishWithContext(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	// Add a subscriber with a channel | ||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||
| 	defer sub.Unsubscribe() | ||||
|  | ||||
| 	// Create a context | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	// Publish with context | ||||
| 	subs, receivers, err := ps.PublishWithContext(ctx, "test-ns", "hello") | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Unexpected error: %v", err) | ||||
| 	} | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
| 	if receivers != 1 { | ||||
| 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Verify the message was received | ||||
| 	select { | ||||
| 	case msg := <-ch: | ||||
| 		if msg != "hello" { | ||||
| 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message") | ||||
| 	} | ||||
|  | ||||
| 	// Fill the channel | ||||
| 	ps.Publish("test-ns", "fill") | ||||
|  | ||||
| 	// Test context cancellation with a full channel | ||||
| 	ctx, cancel = context.WithCancel(context.Background()) | ||||
|  | ||||
| 	// Cancel the context after a short delay | ||||
| 	go func() { | ||||
| 		time.Sleep(50 * time.Millisecond) | ||||
| 		cancel() | ||||
| 	}() | ||||
|  | ||||
| 	start := time.Now() | ||||
| 	subs, receivers, err = ps.PublishWithContext(ctx, "test-ns", "context-test") | ||||
| 	elapsed := time.Since(start) | ||||
|  | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
|  | ||||
| 	// Should get a context canceled error | ||||
| 	if err == nil { | ||||
| 		t.Fatal("Expected context canceled error, got nil") | ||||
| 	} | ||||
|  | ||||
| 	if elapsed < 50*time.Millisecond { | ||||
| 		t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) | ||||
| 	} | ||||
|  | ||||
| 	// Drain the channel | ||||
| 	<-ch | ||||
| } | ||||
|  | ||||
| func TestPubSub_Unsubscribe(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	// Add a subscriber | ||||
| 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||
|  | ||||
| 	// Publish a message | ||||
| 	subs, receivers := ps.Publish("test-ns", "hello") | ||||
| 	if subs != 1 { | ||||
| 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||
| 	} | ||||
| 	if receivers != 1 { | ||||
| 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Verify the message was received | ||||
| 	select { | ||||
| 	case msg := <-ch: | ||||
| 		if msg != "hello" { | ||||
| 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message") | ||||
| 	} | ||||
|  | ||||
| 	// Unsubscribe | ||||
| 	sub.Unsubscribe() | ||||
|  | ||||
| 	// Publish again | ||||
| 	subs, receivers = ps.Publish("test-ns", "after-unsub") | ||||
| 	if subs != 0 { | ||||
| 		t.Fatalf("Expected 0 subscribers after unsubscribe, got %d", subs) | ||||
| 	} | ||||
| 	if receivers != 0 { | ||||
| 		t.Fatalf("Expected 0 receivers after unsubscribe, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Check that the namespace is removed | ||||
| 	namespaces := ps.Namespaces() | ||||
| 	if len(namespaces) != 0 { | ||||
| 		t.Fatalf("Expected 0 namespaces after unsubscribe, got %d", len(namespaces)) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPubSub_MultipleSubscribers(t *testing.T) { | ||||
| 	ps := NewPubSub[string, string](10) | ||||
|  | ||||
| 	// Add multiple subscribers | ||||
| 	ch1, sub1 := ps.SubscribeByChan("test-ns", 1) | ||||
| 	defer sub1.Unsubscribe() | ||||
|  | ||||
| 	ch2, sub2 := ps.SubscribeByChan("test-ns", 1) | ||||
| 	defer sub2.Unsubscribe() | ||||
|  | ||||
| 	var received string | ||||
| 	var wg sync.WaitGroup | ||||
| 	wg.Add(1) | ||||
|  | ||||
| 	sub3 := ps.SubscribeByCallback("test-ns", func(msg string) { | ||||
| 		received = msg | ||||
| 		wg.Done() | ||||
| 	}) | ||||
| 	defer sub3.Unsubscribe() | ||||
|  | ||||
| 	// Publish a message | ||||
| 	subs, receivers := ps.Publish("test-ns", "hello") | ||||
| 	if subs != 3 { | ||||
| 		t.Fatalf("Expected 3 subscribers, got %d", subs) | ||||
| 	} | ||||
| 	if receivers != 3 { | ||||
| 		t.Fatalf("Expected 3 receivers, got %d", receivers) | ||||
| 	} | ||||
|  | ||||
| 	// Verify the message was received by all subscribers | ||||
| 	select { | ||||
| 	case msg := <-ch1: | ||||
| 		if msg != "hello" { | ||||
| 			t.Fatalf("Expected ch1 to receive 'hello', got '%s'", msg) | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message on ch1") | ||||
| 	} | ||||
|  | ||||
| 	select { | ||||
| 	case msg := <-ch2: | ||||
| 		if msg != "hello" { | ||||
| 			t.Fatalf("Expected ch2 to receive 'hello', got '%s'", msg) | ||||
| 		} | ||||
| 	case <-time.After(time.Second): | ||||
| 		t.Fatal("Timed out waiting for message on ch2") | ||||
| 	} | ||||
|  | ||||
| 	// Wait for the callback | ||||
| 	wg.Wait() | ||||
|  | ||||
| 	if received != "hello" { | ||||
| 		t.Fatalf("Expected callback to receive 'hello', got '%s'", received) | ||||
| 	} | ||||
| } | ||||
| @@ -456,10 +456,19 @@ func (b *Builder) Output(ctx context.Context, g *gin.Context) { | ||||
|  | ||||
| 	// this is only here to add one level to the trace | ||||
| 	// so that .Build() and .Output() and .Print() have the same depth and our stack-skip logger can have the same skip-count | ||||
| 	b.doOutput(ctx, g) | ||||
| 	b.doGinOutput(ctx, g) | ||||
| } | ||||
|  | ||||
| func (b *Builder) doOutput(ctx context.Context, g *gin.Context) { | ||||
| // OutputRaw works teh same as Output() - but does not depend on gin and works with a raw http.ResponseWriter | ||||
| func (b *Builder) OutputRaw(w http.ResponseWriter) { | ||||
| 	warnOnPkgConfigNotInitialized() | ||||
|  | ||||
| 	// this is only here to add one level to the trace | ||||
| 	// so that .Build() and .Output() and .Print() have the same depth and our stack-skip logger can have the same skip-count | ||||
| 	b.doRawOutput(w) | ||||
| } | ||||
|  | ||||
| func (b *Builder) doGinOutput(ctx context.Context, g *gin.Context) { | ||||
| 	b.errorData.Output(g) | ||||
|  | ||||
| 	if (b.errorData.Severity == SevErr || b.errorData.Severity == SevFatal) && (pkgconfig.ZeroLogErrGinOutput || pkgconfig.ZeroLogAllGinOutput) { | ||||
| @@ -471,6 +480,18 @@ func (b *Builder) doOutput(ctx context.Context, g *gin.Context) { | ||||
| 	b.errorData.CallListener(MethodOutput, ListenerOpt{NoLog: b.noLog}) | ||||
| } | ||||
|  | ||||
| func (b *Builder) doRawOutput(w http.ResponseWriter) { | ||||
| 	b.errorData.OutputRaw(w) | ||||
|  | ||||
| 	if (b.errorData.Severity == SevErr || b.errorData.Severity == SevFatal) && (pkgconfig.ZeroLogErrGinOutput || pkgconfig.ZeroLogAllGinOutput) { | ||||
| 		b.errorData.Log(pkgconfig.ZeroLogger.Error()) | ||||
| 	} else if (b.errorData.Severity == SevWarn) && (pkgconfig.ZeroLogAllGinOutput) { | ||||
| 		b.errorData.Log(pkgconfig.ZeroLogger.Warn()) | ||||
| 	} | ||||
|  | ||||
| 	b.errorData.CallListener(MethodOutput, ListenerOpt{NoLog: b.noLog}) | ||||
| } | ||||
|  | ||||
| // Print prints the error | ||||
| // If the error is SevErr we also send it to the error-service | ||||
| func (b *Builder) Print(ctxs ...context.Context) Proxy { | ||||
|   | ||||
							
								
								
									
										34
									
								
								exerr/gin.go
									
									
									
									
									
								
							
							
						
						
									
										34
									
								
								exerr/gin.go
									
									
									
									
									
								
							| @@ -1,9 +1,9 @@ | ||||
| package exerr | ||||
|  | ||||
| import ( | ||||
| 	"github.com/gin-gonic/gin" | ||||
| 	json "git.blackforestbytes.com/BlackForestBytes/goext/gojson" | ||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||
| 	"github.com/gin-gonic/gin" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| ) | ||||
| @@ -68,7 +68,6 @@ func (ee *ExErr) ToDefaultAPIJson() (string, error) { | ||||
| 	gjr := json.GoJsonRender{Data: ee.ToAPIJson(true, pkgconfig.ExtendedGinOutput, pkgconfig.IncludeMetaInGinOutput), NilSafeSlices: true, NilSafeMaps: true} | ||||
|  | ||||
| 	r, err := gjr.RenderString() | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| @@ -143,3 +142,34 @@ func (ee *ExErr) Output(g *gin.Context) { | ||||
|  | ||||
| 	g.Render(statuscode, json.GoJsonRender{Data: ginOutput, NilSafeSlices: true, NilSafeMaps: true}) | ||||
| } | ||||
|  | ||||
| func (ee *ExErr) OutputRaw(w http.ResponseWriter) { | ||||
|  | ||||
| 	warnOnPkgConfigNotInitialized() | ||||
|  | ||||
| 	var statuscode = http.StatusInternalServerError | ||||
|  | ||||
| 	var baseCat = ee.RecursiveCategory() | ||||
| 	var baseType = ee.RecursiveType() | ||||
| 	var baseStatuscode = ee.RecursiveStatuscode() | ||||
|  | ||||
| 	if baseCat == CatUser { | ||||
| 		statuscode = http.StatusBadRequest | ||||
| 	} else if baseCat == CatSystem { | ||||
| 		statuscode = http.StatusInternalServerError | ||||
| 	} | ||||
|  | ||||
| 	if baseStatuscode != nil { | ||||
| 		statuscode = *ee.StatusCode | ||||
| 	} else if baseType.DefaultStatusCode != nil { | ||||
| 		statuscode = *baseType.DefaultStatusCode | ||||
| 	} | ||||
|  | ||||
| 	ginOutput, err := ee.ToDefaultAPIJson() | ||||
| 	if err != nil { | ||||
| 		panic(err) // cannot happen | ||||
| 	} | ||||
|  | ||||
| 	w.WriteHeader(statuscode) | ||||
| 	_, _ = w.Write([]byte(ginOutput)) | ||||
| } | ||||
|   | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @@ -19,6 +19,7 @@ require ( | ||||
| require ( | ||||
| 	github.com/disintegration/imaging v1.6.2 | ||||
| 	github.com/jung-kurt/gofpdf v1.16.2 | ||||
| 	golang.org/x/net v0.42.0 | ||||
| 	golang.org/x/sync v0.16.0 | ||||
| ) | ||||
|  | ||||
| @@ -55,7 +56,6 @@ require ( | ||||
| 	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect | ||||
| 	golang.org/x/arch v0.19.0 // indirect | ||||
| 	golang.org/x/image v0.29.0 // indirect | ||||
| 	golang.org/x/net v0.42.0 // indirect | ||||
| 	golang.org/x/text v0.27.0 // indirect | ||||
| 	google.golang.org/protobuf v1.36.6 // indirect | ||||
| 	gopkg.in/yaml.v3 v3.0.1 // indirect | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| package goext | ||||
|  | ||||
| const GoextVersion = "0.0.590" | ||||
| const GoextVersion = "0.0.594" | ||||
|  | ||||
| const GoextVersionTimestamp = "2025-07-15T14:14:22+0200" | ||||
| const GoextVersionTimestamp = "2025-07-16T17:13:07+0200" | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| package syncext | ||||
|  | ||||
| import ( | ||||
| 	"golang.org/x/net/context" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| @@ -26,6 +27,15 @@ func WriteChannelWithTimeout[T any](c chan T, msg T, timeout time.Duration) bool | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func WriteChannelWithContext[T any](ctx context.Context, c chan T, msg T) error { | ||||
| 	select { | ||||
| 	case c <- msg: | ||||
| 		return nil | ||||
| 	case <-ctx.Done(): | ||||
| 		return ctx.Err() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func ReadNonBlocking[T any](c chan T) (T, bool) { | ||||
| 	select { | ||||
| 	case msg := <-c: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user