From 1edc2712ed98725040925dbd381728de8ee2fd1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Mon, 13 Apr 2026 16:08:28 +0200 Subject: [PATCH] v0.0.632 Pubsub.PublishAsync --- dataext/pubsub.go | 32 +++++++++++++++++++++++++++++--- goextVersion.go | 4 ++-- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/dataext/pubsub.go b/dataext/pubsub.go index 47d5a89..a08ebc1 100644 --- a/dataext/pubsub.go +++ b/dataext/pubsub.go @@ -2,12 +2,13 @@ package dataext import ( "context" - "git.blackforestbytes.com/BlackForestBytes/goext/langext" - "git.blackforestbytes.com/BlackForestBytes/goext/syncext" - "github.com/rs/xid" "iter" "sync" "time" + + "git.blackforestbytes.com/BlackForestBytes/goext/langext" + "git.blackforestbytes.com/BlackForestBytes/goext/syncext" + "github.com/rs/xid" ) // PubSub is a simple Pub/Sub Broker @@ -162,6 +163,31 @@ func (ps *PubSub[TNamespace, TData]) PublishWithTimeout(ns TNamespace, data TDat return subscriber, actualReceiver } +// PublishAsync sends `data` to all subscriber +// does not wait for subscriber (this method returns immediately), waits at most {timeout} seconds on channels (async) +func (ps *PubSub[TNamespace, TData]) PublishAsync(ns TNamespace, data TData, timeout time.Duration) (subscriber int) { + ps.masterLock.Lock() + subs := langext.ArrCopy(ps.subscriptions[ns]) + ps.masterLock.Unlock() + + subscriber = len(subs) + + for _, sub := range subs { + func() { + sub.subLock.Lock() + defer sub.subLock.Unlock() + + if sub.Func != nil { + go func() { sub.Func(data) }() + } else if sub.Chan != nil { + go func() { _ = syncext.WriteChannelWithTimeout(sub.Chan, data, timeout) }() + } + }() + } + + return subscriber +} + func (ps *PubSub[TNamespace, TData]) SubscribeByCallback(ns TNamespace, fn func(TData)) PubSubSubscription { ps.masterLock.Lock() defer ps.masterLock.Unlock() diff --git a/goextVersion.go b/goextVersion.go index 4e03b15..d82fb52 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.631" +const GoextVersion = "0.0.632" -const GoextVersionTimestamp = "2026-03-14T15:14:38+0100" +const GoextVersionTimestamp = "2026-04-13T16:07:00+0100"