Compare commits
	
		
			4 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 52e74b59f5 | |||
| 64f2cd7219 | |||
| a29aec8fb5 | |||
| 8ea9b3f79f | 
							
								
								
									
										241
									
								
								dataext/pubsub.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										241
									
								
								dataext/pubsub.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,241 @@ | |||||||
|  | package dataext | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/syncext" | ||||||
|  | 	"github.com/rs/xid" | ||||||
|  | 	"iter" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // PubSub is a simple Pub/Sub Broker | ||||||
|  | // Clients can subscribe to a namespace and receive published messages on this namespace | ||||||
|  | // Messages are broadcast to all subscribers | ||||||
|  | type PubSub[T any] struct { | ||||||
|  | 	masterLock *sync.Mutex | ||||||
|  |  | ||||||
|  | 	subscriptions map[string][]*pubSubSubscription[T] | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type PubSubSubscription interface { | ||||||
|  | 	Unsubscribe() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type pubSubSubscription[T any] struct { | ||||||
|  | 	ID string | ||||||
|  |  | ||||||
|  | 	parent    *PubSub[T] | ||||||
|  | 	namespace string | ||||||
|  |  | ||||||
|  | 	subLock *sync.Mutex | ||||||
|  |  | ||||||
|  | 	Func func(T) | ||||||
|  | 	Chan chan T | ||||||
|  |  | ||||||
|  | 	UnsubChan chan bool | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *pubSubSubscription[T]) Unsubscribe() { | ||||||
|  | 	p.parent.unsubscribe(p) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewPubSub[T any](capacity int) *PubSub[T] { | ||||||
|  | 	return &PubSub[T]{ | ||||||
|  | 		masterLock:    &sync.Mutex{}, | ||||||
|  | 		subscriptions: make(map[string][]*pubSubSubscription[T], capacity), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ps *PubSub[T]) Namespaces() []string { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	return langext.MapKeyArr(ps.subscriptions) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ps *PubSub[T]) SubscriberCount(ns string) int { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	return len(ps.subscriptions[ns]) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Publish sends `data` to all subscriber | ||||||
|  | // But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber) | ||||||
|  | func (ps *PubSub[T]) Publish(ns string, data T) (subscriber int, actualReceiver int) { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||||
|  | 	ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	subscriber = len(subs) | ||||||
|  | 	actualReceiver = 0 | ||||||
|  |  | ||||||
|  | 	for _, sub := range subs { | ||||||
|  | 		func() { | ||||||
|  | 			sub.subLock.Lock() | ||||||
|  | 			defer sub.subLock.Unlock() | ||||||
|  |  | ||||||
|  | 			if sub.Func != nil { | ||||||
|  | 				go func() { sub.Func(data) }() | ||||||
|  | 				actualReceiver++ | ||||||
|  | 			} else if sub.Chan != nil { | ||||||
|  | 				msgSent := syncext.WriteNonBlocking(sub.Chan, data) | ||||||
|  | 				if msgSent { | ||||||
|  | 					actualReceiver++ | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return subscriber, actualReceiver | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // PublishWithContext sends `data` to all subscriber | ||||||
|  | // buffered - if one is currently not listening, we wait (but error out when the context runs out) | ||||||
|  | func (ps *PubSub[T]) PublishWithContext(ctx context.Context, ns string, data T) (subscriber int, actualReceiver int, err error) { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||||
|  | 	ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	subscriber = len(subs) | ||||||
|  | 	actualReceiver = 0 | ||||||
|  |  | ||||||
|  | 	for _, sub := range subs { | ||||||
|  | 		err := func() error { | ||||||
|  | 			sub.subLock.Lock() | ||||||
|  | 			defer sub.subLock.Unlock() | ||||||
|  |  | ||||||
|  | 			if err := ctx.Err(); err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if sub.Func != nil { | ||||||
|  | 				go func() { sub.Func(data) }() | ||||||
|  | 				actualReceiver++ | ||||||
|  | 			} else if sub.Chan != nil { | ||||||
|  | 				err := syncext.WriteChannelWithContext(ctx, sub.Chan, data) | ||||||
|  | 				if err != nil { | ||||||
|  | 					return err | ||||||
|  | 				} | ||||||
|  | 				actualReceiver++ | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			return nil | ||||||
|  | 		}() | ||||||
|  | 		if err != nil { | ||||||
|  | 			return subscriber, actualReceiver, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return subscriber, actualReceiver, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // PublishWithTimeout sends `data` to all subscriber | ||||||
|  | // buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber) | ||||||
|  | func (ps *PubSub[T]) PublishWithTimeout(ns string, data T, timeout time.Duration) (subscriber int, actualReceiver int) { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	subs := langext.ArrCopy(ps.subscriptions[ns]) | ||||||
|  | 	ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	subscriber = len(subs) | ||||||
|  | 	actualReceiver = 0 | ||||||
|  |  | ||||||
|  | 	for _, sub := range subs { | ||||||
|  | 		func() { | ||||||
|  | 			sub.subLock.Lock() | ||||||
|  | 			defer sub.subLock.Unlock() | ||||||
|  |  | ||||||
|  | 			if sub.Func != nil { | ||||||
|  | 				go func() { sub.Func(data) }() | ||||||
|  | 				actualReceiver++ | ||||||
|  | 			} else if sub.Chan != nil { | ||||||
|  | 				ok := syncext.WriteChannelWithTimeout(sub.Chan, data, timeout) | ||||||
|  | 				if ok { | ||||||
|  | 					actualReceiver++ | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return subscriber, actualReceiver | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ps *PubSub[T]) SubscribeByCallback(ns string, fn func(T)) PubSubSubscription { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil} | ||||||
|  |  | ||||||
|  | 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||||
|  |  | ||||||
|  | 	return sub | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ps *PubSub[T]) SubscribeByChan(ns string, chanBufferSize int) (chan T, PubSubSubscription) { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	msgCh := make(chan T, chanBufferSize) | ||||||
|  |  | ||||||
|  | 	sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil} | ||||||
|  |  | ||||||
|  | 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||||
|  |  | ||||||
|  | 	return msgCh, sub | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ps *PubSub[T]) SubscribeByIter(ns string, chanBufferSize int) (iter.Seq[T], PubSubSubscription) { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	msgCh := make(chan T, chanBufferSize) | ||||||
|  | 	unsubChan := make(chan bool, 8) | ||||||
|  |  | ||||||
|  | 	sub := &pubSubSubscription[T]{ID: xid.New().String(), namespace: ns, parent: ps, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan} | ||||||
|  |  | ||||||
|  | 	ps.subscriptions[ns] = append(ps.subscriptions[ns], sub) | ||||||
|  |  | ||||||
|  | 	iterFun := func(yield func(T) bool) { | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case msg := <-msgCh: | ||||||
|  | 				if !yield(msg) { | ||||||
|  | 					sub.Unsubscribe() | ||||||
|  | 					return | ||||||
|  | 				} | ||||||
|  | 			case <-sub.UnsubChan: | ||||||
|  | 				sub.Unsubscribe() | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return iterFun, sub | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ps *PubSub[T]) unsubscribe(p *pubSubSubscription[T]) { | ||||||
|  | 	ps.masterLock.Lock() | ||||||
|  | 	defer ps.masterLock.Unlock() | ||||||
|  |  | ||||||
|  | 	p.subLock.Lock() | ||||||
|  | 	defer p.subLock.Unlock() | ||||||
|  |  | ||||||
|  | 	if p.Chan != nil { | ||||||
|  | 		close(p.Chan) | ||||||
|  | 		p.Chan = nil | ||||||
|  | 	} | ||||||
|  | 	if p.UnsubChan != nil { | ||||||
|  | 		syncext.WriteNonBlocking(p.UnsubChan, true) | ||||||
|  | 		close(p.UnsubChan) | ||||||
|  | 		p.UnsubChan = nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ps.subscriptions[p.namespace] = langext.ArrFilter(ps.subscriptions[p.namespace], func(v *pubSubSubscription[T]) bool { | ||||||
|  | 		return v.ID != p.ID | ||||||
|  | 	}) | ||||||
|  | 	if len(ps.subscriptions[p.namespace]) == 0 { | ||||||
|  | 		delete(ps.subscriptions, p.namespace) | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										428
									
								
								dataext/pubsub_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										428
									
								
								dataext/pubsub_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,428 @@ | |||||||
|  | package dataext | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"sync" | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestNewPubSub(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  | 	if ps == nil { | ||||||
|  | 		t.Fatal("NewPubSub returned nil") | ||||||
|  | 	} | ||||||
|  | 	if ps.masterLock == nil { | ||||||
|  | 		t.Fatal("masterLock is nil") | ||||||
|  | 	} | ||||||
|  | 	if ps.subscriptions == nil { | ||||||
|  | 		t.Fatal("subscriptions is nil") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_Namespaces(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	// Initially no namespaces | ||||||
|  | 	namespaces := ps.Namespaces() | ||||||
|  | 	if len(namespaces) != 0 { | ||||||
|  | 		t.Fatalf("Expected 0 namespaces, got %d", len(namespaces)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Add a subscription to create a namespace | ||||||
|  | 	_, sub1 := ps.SubscribeByChan("test-ns1", 1) | ||||||
|  | 	defer sub1.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Add another subscription to a different namespace | ||||||
|  | 	_, sub2 := ps.SubscribeByChan("test-ns2", 1) | ||||||
|  | 	defer sub2.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Check namespaces | ||||||
|  | 	namespaces = ps.Namespaces() | ||||||
|  | 	if len(namespaces) != 2 { | ||||||
|  | 		t.Fatalf("Expected 2 namespaces, got %d", len(namespaces)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Check if namespaces contain the expected values | ||||||
|  | 	found1, found2 := false, false | ||||||
|  | 	for _, ns := range namespaces { | ||||||
|  | 		if ns == "test-ns1" { | ||||||
|  | 			found1 = true | ||||||
|  | 		} | ||||||
|  | 		if ns == "test-ns2" { | ||||||
|  | 			found2 = true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if !found1 || !found2 { | ||||||
|  | 		t.Fatalf("Expected to find both namespaces, found ns1: %v, ns2: %v", found1, found2) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_SubscribeByCallback(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	var received string | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 	wg.Add(1) | ||||||
|  |  | ||||||
|  | 	callback := func(msg string) { | ||||||
|  | 		received = msg | ||||||
|  | 		wg.Done() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	sub := ps.SubscribeByCallback("test-ns", callback) | ||||||
|  | 	defer sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Publish a message | ||||||
|  | 	subs, receivers := ps.Publish("test-ns", "hello") | ||||||
|  |  | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if receivers != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Wait for the callback to be executed | ||||||
|  | 	wg.Wait() | ||||||
|  |  | ||||||
|  | 	if received != "hello" { | ||||||
|  | 		t.Fatalf("Expected to receive 'hello', got '%s'", received) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_SubscribeByChan(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
|  | 	defer sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Publish a message | ||||||
|  | 	subs, receivers := ps.Publish("test-ns", "hello") | ||||||
|  |  | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if receivers != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Read from the channel with a timeout to avoid blocking | ||||||
|  | 	select { | ||||||
|  | 	case msg := <-ch: | ||||||
|  | 		if msg != "hello" { | ||||||
|  | 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_SubscribeByIter(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	iterSeq, sub := ps.SubscribeByIter("test-ns", 1) | ||||||
|  | 	defer sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Channel to communicate when message is received | ||||||
|  | 	done := make(chan bool) | ||||||
|  | 	received := false | ||||||
|  |  | ||||||
|  | 	// Start a goroutine to use the iterator | ||||||
|  | 	go func() { | ||||||
|  | 		for msg := range iterSeq { | ||||||
|  | 			if msg == "hello" { | ||||||
|  | 				received = true | ||||||
|  | 				done <- true | ||||||
|  | 				return // Stop iteration | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	// Give time for the iterator to start | ||||||
|  | 	time.Sleep(100 * time.Millisecond) | ||||||
|  |  | ||||||
|  | 	// Publish a message | ||||||
|  | 	ps.Publish("test-ns", "hello") | ||||||
|  |  | ||||||
|  | 	// Wait for the message to be received or timeout | ||||||
|  | 	select { | ||||||
|  | 	case <-done: | ||||||
|  | 		if !received { | ||||||
|  | 			t.Fatal("Message was received but not 'hello'") | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	subCount := ps.SubscriberCount("test-ns") | ||||||
|  | 	if subCount != 0 { | ||||||
|  | 		t.Fatalf("Expected 0 receivers, got %d", subCount) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_Publish(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	// Test publishing to a namespace with no subscribers | ||||||
|  | 	subs, receivers := ps.Publish("empty-ns", "hello") | ||||||
|  | 	if subs != 0 { | ||||||
|  | 		t.Fatalf("Expected 0 subscribers, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	if receivers != 0 { | ||||||
|  | 		t.Fatalf("Expected 0 receivers, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Add a subscriber | ||||||
|  | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
|  | 	defer sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Publish a message | ||||||
|  | 	subs, receivers = ps.Publish("test-ns", "hello") | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	if receivers != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Verify the message was received | ||||||
|  | 	select { | ||||||
|  | 	case msg := <-ch: | ||||||
|  | 		if msg != "hello" { | ||||||
|  | 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Test non-blocking behavior with a full channel | ||||||
|  | 	// First fill the channel | ||||||
|  | 	ps.Publish("test-ns", "fill") | ||||||
|  |  | ||||||
|  | 	// Now publish again - this should not block but skip the receiver | ||||||
|  | 	subs, receivers = ps.Publish("test-ns", "overflow") | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	// The receiver count might be 0 if the channel is full | ||||||
|  |  | ||||||
|  | 	// Drain the channel | ||||||
|  | 	<-ch | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_PublishWithTimeout(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	// Add a subscriber with a channel | ||||||
|  | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
|  | 	defer sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Publish with a timeout | ||||||
|  | 	subs, receivers := ps.PublishWithTimeout("test-ns", "hello", 100*time.Millisecond) | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	if receivers != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Verify the message was received | ||||||
|  | 	select { | ||||||
|  | 	case msg := <-ch: | ||||||
|  | 		if msg != "hello" { | ||||||
|  | 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Fill the channel | ||||||
|  | 	ps.Publish("test-ns", "fill") | ||||||
|  |  | ||||||
|  | 	// Test timeout behavior with a full channel | ||||||
|  | 	start := time.Now() | ||||||
|  | 	subs, receivers = ps.PublishWithTimeout("test-ns", "timeout-test", 50*time.Millisecond) | ||||||
|  | 	elapsed := time.Since(start) | ||||||
|  |  | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// The receiver count should be 0 if the timeout occurred | ||||||
|  | 	if elapsed < 50*time.Millisecond { | ||||||
|  | 		t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Drain the channel | ||||||
|  | 	<-ch | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_PublishWithContext(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	// Add a subscriber with a channel | ||||||
|  | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
|  | 	defer sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Create a context | ||||||
|  | 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) | ||||||
|  | 	defer cancel() | ||||||
|  |  | ||||||
|  | 	// Publish with context | ||||||
|  | 	subs, receivers, err := ps.PublishWithContext(ctx, "test-ns", "hello") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("Unexpected error: %v", err) | ||||||
|  | 	} | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	if receivers != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Verify the message was received | ||||||
|  | 	select { | ||||||
|  | 	case msg := <-ch: | ||||||
|  | 		if msg != "hello" { | ||||||
|  | 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Fill the channel | ||||||
|  | 	ps.Publish("test-ns", "fill") | ||||||
|  |  | ||||||
|  | 	// Test context cancellation with a full channel | ||||||
|  | 	ctx, cancel = context.WithCancel(context.Background()) | ||||||
|  |  | ||||||
|  | 	// Cancel the context after a short delay | ||||||
|  | 	go func() { | ||||||
|  | 		time.Sleep(50 * time.Millisecond) | ||||||
|  | 		cancel() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	start := time.Now() | ||||||
|  | 	subs, receivers, err = ps.PublishWithContext(ctx, "test-ns", "context-test") | ||||||
|  | 	elapsed := time.Since(start) | ||||||
|  |  | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Should get a context canceled error | ||||||
|  | 	if err == nil { | ||||||
|  | 		t.Fatal("Expected context canceled error, got nil") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if elapsed < 50*time.Millisecond { | ||||||
|  | 		t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Drain the channel | ||||||
|  | 	<-ch | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_Unsubscribe(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	// Add a subscriber | ||||||
|  | 	ch, sub := ps.SubscribeByChan("test-ns", 1) | ||||||
|  |  | ||||||
|  | 	// Publish a message | ||||||
|  | 	subs, receivers := ps.Publish("test-ns", "hello") | ||||||
|  | 	if subs != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 subscriber, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	if receivers != 1 { | ||||||
|  | 		t.Fatalf("Expected 1 receiver, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Verify the message was received | ||||||
|  | 	select { | ||||||
|  | 	case msg := <-ch: | ||||||
|  | 		if msg != "hello" { | ||||||
|  | 			t.Fatalf("Expected to receive 'hello', got '%s'", msg) | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Unsubscribe | ||||||
|  | 	sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Publish again | ||||||
|  | 	subs, receivers = ps.Publish("test-ns", "after-unsub") | ||||||
|  | 	if subs != 0 { | ||||||
|  | 		t.Fatalf("Expected 0 subscribers after unsubscribe, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	if receivers != 0 { | ||||||
|  | 		t.Fatalf("Expected 0 receivers after unsubscribe, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Check that the namespace is removed | ||||||
|  | 	namespaces := ps.Namespaces() | ||||||
|  | 	if len(namespaces) != 0 { | ||||||
|  | 		t.Fatalf("Expected 0 namespaces after unsubscribe, got %d", len(namespaces)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestPubSub_MultipleSubscribers(t *testing.T) { | ||||||
|  | 	ps := NewPubSub[string](10) | ||||||
|  |  | ||||||
|  | 	// Add multiple subscribers | ||||||
|  | 	ch1, sub1 := ps.SubscribeByChan("test-ns", 1) | ||||||
|  | 	defer sub1.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	ch2, sub2 := ps.SubscribeByChan("test-ns", 1) | ||||||
|  | 	defer sub2.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	var received string | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 	wg.Add(1) | ||||||
|  |  | ||||||
|  | 	sub3 := ps.SubscribeByCallback("test-ns", func(msg string) { | ||||||
|  | 		received = msg | ||||||
|  | 		wg.Done() | ||||||
|  | 	}) | ||||||
|  | 	defer sub3.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	// Publish a message | ||||||
|  | 	subs, receivers := ps.Publish("test-ns", "hello") | ||||||
|  | 	if subs != 3 { | ||||||
|  | 		t.Fatalf("Expected 3 subscribers, got %d", subs) | ||||||
|  | 	} | ||||||
|  | 	if receivers != 3 { | ||||||
|  | 		t.Fatalf("Expected 3 receivers, got %d", receivers) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Verify the message was received by all subscribers | ||||||
|  | 	select { | ||||||
|  | 	case msg := <-ch1: | ||||||
|  | 		if msg != "hello" { | ||||||
|  | 			t.Fatalf("Expected ch1 to receive 'hello', got '%s'", msg) | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message on ch1") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	select { | ||||||
|  | 	case msg := <-ch2: | ||||||
|  | 		if msg != "hello" { | ||||||
|  | 			t.Fatalf("Expected ch2 to receive 'hello', got '%s'", msg) | ||||||
|  | 		} | ||||||
|  | 	case <-time.After(time.Second): | ||||||
|  | 		t.Fatal("Timed out waiting for message on ch2") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Wait for the callback | ||||||
|  | 	wg.Wait() | ||||||
|  |  | ||||||
|  | 	if received != "hello" { | ||||||
|  | 		t.Fatalf("Expected callback to receive 'hello', got '%s'", received) | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										4
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.mod
									
									
									
									
									
								
							| @@ -19,6 +19,7 @@ require ( | |||||||
| require ( | require ( | ||||||
| 	github.com/disintegration/imaging v1.6.2 | 	github.com/disintegration/imaging v1.6.2 | ||||||
| 	github.com/jung-kurt/gofpdf v1.16.2 | 	github.com/jung-kurt/gofpdf v1.16.2 | ||||||
|  | 	golang.org/x/net v0.42.0 | ||||||
| 	golang.org/x/sync v0.16.0 | 	golang.org/x/sync v0.16.0 | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -38,7 +39,7 @@ require ( | |||||||
| 	github.com/google/uuid v1.5.0 // indirect | 	github.com/google/uuid v1.5.0 // indirect | ||||||
| 	github.com/json-iterator/go v1.1.12 // indirect | 	github.com/json-iterator/go v1.1.12 // indirect | ||||||
| 	github.com/klauspost/compress v1.18.0 // indirect | 	github.com/klauspost/compress v1.18.0 // indirect | ||||||
| 	github.com/klauspost/cpuid/v2 v2.2.11 // indirect | 	github.com/klauspost/cpuid/v2 v2.3.0 // indirect | ||||||
| 	github.com/leodido/go-urn v1.4.0 // indirect | 	github.com/leodido/go-urn v1.4.0 // indirect | ||||||
| 	github.com/mattn/go-colorable v0.1.14 // indirect | 	github.com/mattn/go-colorable v0.1.14 // indirect | ||||||
| 	github.com/mattn/go-isatty v0.0.20 // indirect | 	github.com/mattn/go-isatty v0.0.20 // indirect | ||||||
| @@ -55,7 +56,6 @@ require ( | |||||||
| 	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect | 	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect | ||||||
| 	golang.org/x/arch v0.19.0 // indirect | 	golang.org/x/arch v0.19.0 // indirect | ||||||
| 	golang.org/x/image v0.29.0 // indirect | 	golang.org/x/image v0.29.0 // indirect | ||||||
| 	golang.org/x/net v0.42.0 // indirect |  | ||||||
| 	golang.org/x/text v0.27.0 // indirect | 	golang.org/x/text v0.27.0 // indirect | ||||||
| 	google.golang.org/protobuf v1.36.6 // indirect | 	google.golang.org/protobuf v1.36.6 // indirect | ||||||
| 	gopkg.in/yaml.v3 v3.0.1 // indirect | 	gopkg.in/yaml.v3 v3.0.1 // indirect | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							| @@ -132,6 +132,8 @@ github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2 | |||||||
| github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= | github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= | ||||||
| github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= | github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= | ||||||
| github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= | github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= | ||||||
|  | github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= | ||||||
|  | github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= | ||||||
| github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= | github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= | ||||||
| github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= | github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= | ||||||
| github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= | github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= | ||||||
|   | |||||||
| @@ -1,5 +1,5 @@ | |||||||
| package goext | package goext | ||||||
|  |  | ||||||
| const GoextVersion = "0.0.588" | const GoextVersion = "0.0.592" | ||||||
|  |  | ||||||
| const GoextVersionTimestamp = "2025-07-11T11:50:29+0200" | const GoextVersionTimestamp = "2025-07-16T12:46:18+0200" | ||||||
|   | |||||||
| @@ -10,7 +10,8 @@ type RFCTime interface { | |||||||
|  |  | ||||||
| 	After(u AnyTime) bool | 	After(u AnyTime) bool | ||||||
| 	Before(u AnyTime) bool | 	Before(u AnyTime) bool | ||||||
| 	Equal(u AnyTime) bool |  | ||||||
|  | 	EqualAny(u AnyTime) bool | ||||||
|  |  | ||||||
| 	Sub(u AnyTime) time.Duration | 	Sub(u AnyTime) time.Duration | ||||||
| } | } | ||||||
| @@ -49,45 +50,15 @@ type AnyTime interface { | |||||||
| } | } | ||||||
|  |  | ||||||
| type RFCDuration interface { | type RFCDuration interface { | ||||||
| 	Time() time.Time | 	Hours() float64 | ||||||
| 	Serialize() string | 	Minutes() float64 | ||||||
|  | 	Seconds() float64 | ||||||
| 	UnmarshalJSON(bytes []byte) error | 	Microseconds() int64 | ||||||
| 	MarshalJSON() ([]byte, error) | 	Milliseconds() int64 | ||||||
|  | 	Nanoseconds() int64 | ||||||
| 	MarshalBinary() ([]byte, error) |  | ||||||
| 	UnmarshalBinary(data []byte) error |  | ||||||
|  |  | ||||||
| 	GobEncode() ([]byte, error) |  | ||||||
| 	GobDecode(data []byte) error |  | ||||||
|  |  | ||||||
| 	MarshalText() ([]byte, error) |  | ||||||
| 	UnmarshalText(data []byte) error |  | ||||||
|  |  | ||||||
| 	After(u AnyTime) bool |  | ||||||
| 	Before(u AnyTime) bool |  | ||||||
| 	Equal(u AnyTime) bool |  | ||||||
| 	IsZero() bool |  | ||||||
| 	Date() (year int, month time.Month, day int) |  | ||||||
| 	Year() int |  | ||||||
| 	Month() time.Month |  | ||||||
| 	Day() int |  | ||||||
| 	Weekday() time.Weekday |  | ||||||
| 	ISOWeek() (year, week int) |  | ||||||
| 	Clock() (hour, min, sec int) |  | ||||||
| 	Hour() int |  | ||||||
| 	Minute() int |  | ||||||
| 	Second() int |  | ||||||
| 	Nanosecond() int |  | ||||||
| 	YearDay() int |  | ||||||
| 	Sub(u AnyTime) time.Duration |  | ||||||
| 	Unix() int64 |  | ||||||
| 	UnixMilli() int64 |  | ||||||
| 	UnixMicro() int64 |  | ||||||
| 	UnixNano() int64 |  | ||||||
| 	Format(layout string) string |  | ||||||
| 	GoString() string |  | ||||||
| 	String() string | 	String() string | ||||||
|  |  | ||||||
|  | 	Duration() time.Duration | ||||||
| } | } | ||||||
|  |  | ||||||
| func tt(v AnyTime) time.Time { | func tt(v AnyTime) time.Time { | ||||||
|   | |||||||
| @@ -4,11 +4,11 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" |  | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| @@ -142,7 +142,14 @@ func (t RFC3339Time) Before(u AnyTime) bool { | |||||||
| 	return t.Time().Before(tt(u)) | 	return t.Time().Before(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t RFC3339Time) Equal(u AnyTime) bool { | func (t RFC3339Time) Equal(u RFC3339Time) bool { | ||||||
|  | 	return t.Time().Equal(u.Time()) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t RFC3339Time) EqualAny(u AnyTime) bool { | ||||||
|  | 	if u == nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
| 	return t.Time().Equal(tt(u)) | 	return t.Time().Equal(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -4,11 +4,11 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" |  | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| @@ -142,7 +142,14 @@ func (t RFC3339NanoTime) Before(u AnyTime) bool { | |||||||
| 	return t.Time().Before(tt(u)) | 	return t.Time().Before(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t RFC3339NanoTime) Equal(u AnyTime) bool { | func (t RFC3339NanoTime) Equal(u RFC3339NanoTime) bool { | ||||||
|  | 	return t.Time().Equal(u.Time()) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t RFC3339NanoTime) EqualAny(u AnyTime) bool { | ||||||
|  | 	if u == nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
| 	return t.Time().Equal(tt(u)) | 	return t.Time().Equal(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -41,7 +41,7 @@ func TestRoundtrip(t *testing.T) { | |||||||
|  |  | ||||||
| 	tst.AssertEqual(t, string(jstr1), string(jstr2)) | 	tst.AssertEqual(t, string(jstr1), string(jstr2)) | ||||||
|  |  | ||||||
| 	if !w1.Value.Equal(&w2.Value) { | 	if !w1.Value.EqualAny(&w2.Value) { | ||||||
| 		t.Errorf("time differs") | 		t.Errorf("time differs") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -4,11 +4,11 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/timeext" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/timeext" |  | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
|   | |||||||
| @@ -4,11 +4,11 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" |  | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"time" | 	"time" | ||||||
| @@ -136,7 +136,14 @@ func (t UnixTime) Before(u AnyTime) bool { | |||||||
| 	return t.Time().Before(tt(u)) | 	return t.Time().Before(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t UnixTime) Equal(u AnyTime) bool { | func (t UnixTime) Equal(u UnixTime) bool { | ||||||
|  | 	return t.Time().Equal(u.Time()) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t UnixTime) EqualAny(u AnyTime) bool { | ||||||
|  | 	if u == nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
| 	return t.Time().Equal(tt(u)) | 	return t.Time().Equal(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -4,11 +4,11 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" |  | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"time" | 	"time" | ||||||
| @@ -136,7 +136,14 @@ func (t UnixMilliTime) Before(u AnyTime) bool { | |||||||
| 	return t.Time().Before(tt(u)) | 	return t.Time().Before(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t UnixMilliTime) Equal(u AnyTime) bool { | func (t UnixMilliTime) Equal(u UnixMilliTime) bool { | ||||||
|  | 	return t.Time().Equal(u.Time()) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t UnixMilliTime) EqualAny(u AnyTime) bool { | ||||||
|  | 	if u == nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
| 	return t.Time().Equal(tt(u)) | 	return t.Time().Equal(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -4,11 +4,11 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | 	"go.mongodb.org/mongo-driver/bson/bsoncodec" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsonrw" | 	"go.mongodb.org/mongo-driver/bson/bsonrw" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/bsontype" | 	"go.mongodb.org/mongo-driver/bson/bsontype" | ||||||
| 	"git.blackforestbytes.com/BlackForestBytes/goext/langext" |  | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"time" | 	"time" | ||||||
| @@ -136,7 +136,14 @@ func (t UnixNanoTime) Before(u AnyTime) bool { | |||||||
| 	return t.Time().Before(tt(u)) | 	return t.Time().Before(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t UnixNanoTime) Equal(u AnyTime) bool { | func (t UnixNanoTime) Equal(u UnixNanoTime) bool { | ||||||
|  | 	return t.Time().Equal(u.Time()) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t UnixNanoTime) EqualAny(u AnyTime) bool { | ||||||
|  | 	if u == nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
| 	return t.Time().Equal(tt(u)) | 	return t.Time().Equal(tt(u)) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,6 +1,7 @@ | |||||||
| package syncext | package syncext | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"golang.org/x/net/context" | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -26,6 +27,15 @@ func WriteChannelWithTimeout[T any](c chan T, msg T, timeout time.Duration) bool | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func WriteChannelWithContext[T any](ctx context.Context, c chan T, msg T) error { | ||||||
|  | 	select { | ||||||
|  | 	case c <- msg: | ||||||
|  | 		return nil | ||||||
|  | 	case <-ctx.Done(): | ||||||
|  | 		return ctx.Err() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| func ReadNonBlocking[T any](c chan T) (T, bool) { | func ReadNonBlocking[T any](c chan T) (T, bool) { | ||||||
| 	select { | 	select { | ||||||
| 	case msg := <-c: | 	case msg := <-c: | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user