Compare commits

...

4 Commits

Author SHA1 Message Date
a70ab33559 v0.0.601 Add Wait and Update method to Atomic[T]
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 1m34s
2025-09-13 19:04:52 +02:00
a58bb4b14b v0.0.600
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 2m30s
2025-09-13 18:45:23 +02:00
dc62bbe55f v0.0.599 implement dataext.broadcaster
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 1m37s
2025-09-13 18:42:17 +02:00
b832d77d3e v0.0.598 prevent json marshalling of PassHash
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 2m35s
2025-09-11 11:17:34 +02:00
9 changed files with 756 additions and 22 deletions

6
.idea/copilot.data.migration.agent.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AgentMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

6
.idea/copilot.data.migration.edit.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EditMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

View File

@@ -6,13 +6,15 @@ import (
"crypto/sha512" "crypto/sha512"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings"
"git.blackforestbytes.com/BlackForestBytes/goext/langext" "git.blackforestbytes.com/BlackForestBytes/goext/langext"
"git.blackforestbytes.com/BlackForestBytes/goext/totpext" "git.blackforestbytes.com/BlackForestBytes/goext/totpext"
"golang.org/x/crypto/bcrypt" "golang.org/x/crypto/bcrypt"
"strconv"
"strings"
) )
const LatestPassHashVersion = 5 const LatestPassHashVersion = 5
@@ -317,6 +319,13 @@ func (ph PassHash) String() string {
return string(ph) return string(ph)
} }
func (ph PassHash) MarshalJSON() ([]byte, error) {
if ph == "" {
return json.Marshal("")
}
return json.Marshal("*****")
}
func HashPassword(plainpass string, totpSecret []byte) (PassHash, error) { func HashPassword(plainpass string, totpSecret []byte) (PassHash, error) {
return HashPasswordV5(plainpass, totpSecret) return HashPasswordV5(plainpass, totpSecret)
} }

230
dataext/broadcaster.go Normal file
View File

@@ -0,0 +1,230 @@
package dataext
import (
"context"
"iter"
"sync"
"time"
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
"git.blackforestbytes.com/BlackForestBytes/goext/syncext"
"github.com/rs/xid"
)
// Broadcaster is a simple Broadcaster channel
// This is a simpler interface over Broadcaster - which does not have distinct namespaces
type Broadcaster[TData any] struct {
masterLock *sync.Mutex
subscriptions []*broadcastSubscription[TData]
}
type BroadcastSubscription interface {
Unsubscribe()
}
type broadcastSubscription[TData any] struct {
ID string
parent *Broadcaster[TData]
subLock *sync.Mutex
Func func(TData)
Chan chan TData
UnsubChan chan bool
}
func (p *broadcastSubscription[TData]) Unsubscribe() {
p.parent.unsubscribe(p)
}
func NewBroadcaster[TData any](capacity int) *Broadcaster[TData] {
return &Broadcaster[TData]{
masterLock: &sync.Mutex{},
subscriptions: make([]*broadcastSubscription[TData], 0, capacity),
}
}
func (bb *Broadcaster[TData]) SubscriberCount() int {
bb.masterLock.Lock()
defer bb.masterLock.Unlock()
return len(bb.subscriptions)
}
// Publish sends `data` to all subscriber
// But unbuffered - if one is currently not listening, we skip (the actualReceiver < subscriber)
func (bb *Broadcaster[TData]) Publish(data TData) (subscriber int, actualReceiver int) {
bb.masterLock.Lock()
subs := langext.ArrCopy(bb.subscriptions)
bb.masterLock.Unlock()
subscriber = len(subs)
actualReceiver = 0
for _, sub := range subs {
func() {
sub.subLock.Lock()
defer sub.subLock.Unlock()
if sub.Func != nil {
go func() { sub.Func(data) }()
actualReceiver++
} else if sub.Chan != nil {
msgSent := syncext.WriteNonBlocking(sub.Chan, data)
if msgSent {
actualReceiver++
}
}
}()
}
return subscriber, actualReceiver
}
// PublishWithContext sends `data` to all subscriber
// buffered - if one is currently not listening, we wait (but error out when the context runs out)
func (bb *Broadcaster[TData]) PublishWithContext(ctx context.Context, data TData) (subscriber int, actualReceiver int, err error) {
bb.masterLock.Lock()
subs := langext.ArrCopy(bb.subscriptions)
bb.masterLock.Unlock()
subscriber = len(subs)
actualReceiver = 0
for _, sub := range subs {
err := func() error {
sub.subLock.Lock()
defer sub.subLock.Unlock()
if err := ctx.Err(); err != nil {
return err
}
if sub.Func != nil {
go func() { sub.Func(data) }()
actualReceiver++
} else if sub.Chan != nil {
err := syncext.WriteChannelWithContext(ctx, sub.Chan, data)
if err != nil {
return err
}
actualReceiver++
}
return nil
}()
if err != nil {
return subscriber, actualReceiver, err
}
}
return subscriber, actualReceiver, nil
}
// PublishWithTimeout sends `data` to all subscriber
// buffered - if one is currently not listening, we wait (but wait at most `timeout` - if the timeout is exceeded then actualReceiver < subscriber)
func (bb *Broadcaster[TData]) PublishWithTimeout(data TData, timeout time.Duration) (subscriber int, actualReceiver int) {
bb.masterLock.Lock()
subs := langext.ArrCopy(bb.subscriptions)
bb.masterLock.Unlock()
subscriber = len(subs)
actualReceiver = 0
for _, sub := range subs {
func() {
sub.subLock.Lock()
defer sub.subLock.Unlock()
if sub.Func != nil {
go func() { sub.Func(data) }()
actualReceiver++
} else if sub.Chan != nil {
ok := syncext.WriteChannelWithTimeout(sub.Chan, data, timeout)
if ok {
actualReceiver++
}
}
}()
}
return subscriber, actualReceiver
}
func (bb *Broadcaster[TData]) SubscribeByCallback(fn func(TData)) BroadcastSubscription {
bb.masterLock.Lock()
defer bb.masterLock.Unlock()
sub := &broadcastSubscription[TData]{ID: xid.New().String(), parent: bb, subLock: &sync.Mutex{}, Func: fn, UnsubChan: nil}
bb.subscriptions = append(bb.subscriptions, sub)
return sub
}
func (bb *Broadcaster[TData]) SubscribeByChan(chanBufferSize int) (chan TData, BroadcastSubscription) {
bb.masterLock.Lock()
defer bb.masterLock.Unlock()
msgCh := make(chan TData, chanBufferSize)
sub := &broadcastSubscription[TData]{ID: xid.New().String(), parent: bb, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: nil}
bb.subscriptions = append(bb.subscriptions, sub)
return msgCh, sub
}
func (bb *Broadcaster[TData]) SubscribeByIter(chanBufferSize int) (iter.Seq[TData], BroadcastSubscription) {
bb.masterLock.Lock()
defer bb.masterLock.Unlock()
msgCh := make(chan TData, chanBufferSize)
unsubChan := make(chan bool, 8)
sub := &broadcastSubscription[TData]{ID: xid.New().String(), parent: bb, subLock: &sync.Mutex{}, Chan: msgCh, UnsubChan: unsubChan}
bb.subscriptions = append(bb.subscriptions, sub)
iterFun := func(yield func(TData) bool) {
for {
select {
case msg := <-msgCh:
if !yield(msg) {
sub.Unsubscribe()
return
}
case <-sub.UnsubChan:
sub.Unsubscribe()
return
}
}
}
return iterFun, sub
}
func (bb *Broadcaster[TData]) unsubscribe(p *broadcastSubscription[TData]) {
bb.masterLock.Lock()
defer bb.masterLock.Unlock()
p.subLock.Lock()
defer p.subLock.Unlock()
if p.Chan != nil {
close(p.Chan)
p.Chan = nil
}
if p.UnsubChan != nil {
syncext.WriteNonBlocking(p.UnsubChan, true)
close(p.UnsubChan)
p.UnsubChan = nil
}
bb.subscriptions = langext.ArrFilter(bb.subscriptions, func(v *broadcastSubscription[TData]) bool {
return v.ID != p.ID
})
}

332
dataext/broadcaster_test.go Normal file
View File

@@ -0,0 +1,332 @@
package dataext
import (
"context"
"sync"
"testing"
"time"
)
func TestNewBroadcast(t *testing.T) {
bb := NewBroadcaster[string](10)
if bb == nil {
t.Fatal("NewBroadcaster returned nil")
}
if bb.masterLock == nil {
t.Fatal("masterLock is nil")
}
if bb.subscriptions == nil {
t.Fatal("subscriptions is nil")
}
}
func TestBroadcast_SubscribeByCallback(t *testing.T) {
bb := NewBroadcaster[string](10)
var received string
var wg sync.WaitGroup
wg.Add(1)
callback := func(msg string) {
received = msg
wg.Done()
}
sub := bb.SubscribeByCallback(callback)
defer sub.Unsubscribe()
// Publish a message
subs, receivers := bb.Publish("hello")
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
if receivers != 1 {
t.Fatalf("Expected 1 receiver, got %d", receivers)
}
// Wait for the callback to be executed
wg.Wait()
if received != "hello" {
t.Fatalf("Expected to receive 'hello', got '%s'", received)
}
}
func TestBroadcast_SubscribeByChan(t *testing.T) {
bb := NewBroadcaster[string](10)
ch, sub := bb.SubscribeByChan(1)
defer sub.Unsubscribe()
// Publish a message
subs, receivers := bb.Publish("hello")
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
if receivers != 1 {
t.Fatalf("Expected 1 receiver, got %d", receivers)
}
// Read from the channel with a timeout to avoid blocking
select {
case msg := <-ch:
if msg != "hello" {
t.Fatalf("Expected to receive 'hello', got '%s'", msg)
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}
}
func TestBroadcast_SubscribeByIter(t *testing.T) {
bb := NewBroadcaster[string](10)
iterSeq, sub := bb.SubscribeByIter(1)
defer sub.Unsubscribe()
// Channel to communicate when message is received
done := make(chan bool)
received := false
// Start a goroutine to use the iterator
go func() {
for msg := range iterSeq {
if msg == "hello" {
received = true
done <- true
return // Stop iteration
}
}
}()
// Give time for the iterator to start
time.Sleep(100 * time.Millisecond)
// Publish a message
bb.Publish("hello")
// Wait for the message to be received or timeout
select {
case <-done:
if !received {
t.Fatal("Message was received but not 'hello'")
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}
subCount := bb.SubscriberCount()
if subCount != 0 {
t.Fatalf("Expected 0 receivers, got %d", subCount)
}
}
func TestBroadcast_Publish(t *testing.T) {
bb := NewBroadcaster[string](10)
// Test publishing with no subscribers
subs, receivers := bb.Publish("hello")
if subs != 0 {
t.Fatalf("Expected 0 subscribers, got %d", subs)
}
if receivers != 0 {
t.Fatalf("Expected 0 receivers, got %d", receivers)
}
// Add a subscriber
ch, sub := bb.SubscribeByChan(1)
defer sub.Unsubscribe()
// Publish a message
subs, receivers = bb.Publish("hello")
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
if receivers != 1 {
t.Fatalf("Expected 1 receiver, got %d", receivers)
}
// Verify the message was received
select {
case msg := <-ch:
if msg != "hello" {
t.Fatalf("Expected to receive 'hello', got '%s'", msg)
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}
// Test non-blocking behavior with a full channel
// First fill the channel
bb.Publish("fill")
// Now publish again - this should not block but may skip the receiver
subs, receivers = bb.Publish("overflow")
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
_ = receivers // may be 0 if channel is full
// Drain the channel
<-ch
}
func TestBroadcast_PublishWithTimeout(t *testing.T) {
bb := NewBroadcaster[string](10)
// Add a subscriber with a channel
ch, sub := bb.SubscribeByChan(1)
defer sub.Unsubscribe()
// Publish with a timeout
subs, receivers := bb.PublishWithTimeout("hello", 100*time.Millisecond)
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
if receivers != 1 {
t.Fatalf("Expected 1 receiver, got %d", receivers)
}
// Verify the message was received
select {
case msg := <-ch:
if msg != "hello" {
t.Fatalf("Expected to receive 'hello', got '%s'", msg)
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}
// Fill the channel
bb.Publish("fill")
// Test timeout behavior with a full channel
start := time.Now()
subs, receivers = bb.PublishWithTimeout("timeout-test", 50*time.Millisecond)
elapsed := time.Since(start)
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
// The receiver count should be 0 if the timeout occurred
if elapsed < 50*time.Millisecond {
t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed)
}
// Drain the channel
<-ch
}
func TestBroadcast_PublishWithContext(t *testing.T) {
bb := NewBroadcaster[string](10)
// Add a subscriber with a channel
ch, sub := bb.SubscribeByChan(1)
defer sub.Unsubscribe()
// Create a context
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
// Publish with context
subs, receivers, err := bb.PublishWithContext(ctx, "hello")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
if receivers != 1 {
t.Fatalf("Expected 1 receiver, got %d", receivers)
}
// Verify the message was received
select {
case msg := <-ch:
if msg != "hello" {
t.Fatalf("Expected to receive 'hello', got '%s'", msg)
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}
// Fill the channel
bb.Publish("fill")
// Test context cancellation with a full channel
ctx, cancel = context.WithCancel(context.Background())
// Cancel the context after a short delay
go func() {
time.Sleep(50 * time.Millisecond)
cancel()
}()
start := time.Now()
subs, receivers, err = bb.PublishWithContext(ctx, "context-test")
elapsed := time.Since(start)
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
// Should get a context canceled error
if err == nil {
t.Fatal("Expected context canceled error, got nil")
}
if elapsed < 50*time.Millisecond {
t.Fatalf("Expected to wait at least 50ms, only waited %v", elapsed)
}
// Drain the channel
<-ch
}
func TestBroadcast_Unsubscribe(t *testing.T) {
bb := NewBroadcaster[string](10)
// Add a subscriber
ch, sub := bb.SubscribeByChan(1)
// Publish a message
subs, receivers := bb.Publish("hello")
if subs != 1 {
t.Fatalf("Expected 1 subscriber, got %d", subs)
}
if receivers != 1 {
t.Fatalf("Expected 1 receiver, got %d", receivers)
}
// Verify the message was received
select {
case msg := <-ch:
if msg != "hello" {
t.Fatalf("Expected to receive 'hello', got '%s'", msg)
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for message")
}
// Unsubscribe
sub.Unsubscribe()
// Publish again
subs, receivers = bb.Publish("after-unsub")
if subs != 0 {
t.Fatalf("Expected 0 subscribers after unsubscribe, got %d", subs)
}
if receivers != 0 {
t.Fatalf("Expected 0 receivers after unsubscribe, got %d", receivers)
}
// Check that the subscriber count is 0
if bb.SubscriberCount() != 0 {
t.Fatalf("Expected SubscriberCount() == 0, got %d", bb.SubscriberCount())
}
}

22
go.mod
View File

@@ -1,8 +1,6 @@
module git.blackforestbytes.com/BlackForestBytes/goext module git.blackforestbytes.com/BlackForestBytes/goext
go 1.23.0 go 1.24.0
toolchain go1.24.0
require ( require (
github.com/gin-gonic/gin v1.10.1 github.com/gin-gonic/gin v1.10.1
@@ -11,16 +9,16 @@ require (
github.com/rs/xid v1.6.0 github.com/rs/xid v1.6.0
github.com/rs/zerolog v1.34.0 github.com/rs/zerolog v1.34.0
go.mongodb.org/mongo-driver v1.17.4 go.mongodb.org/mongo-driver v1.17.4
golang.org/x/crypto v0.41.0 golang.org/x/crypto v0.42.0
golang.org/x/sys v0.35.0 golang.org/x/sys v0.36.0
golang.org/x/term v0.34.0 golang.org/x/term v0.35.0
) )
require ( require (
github.com/disintegration/imaging v1.6.2 github.com/disintegration/imaging v1.6.2
github.com/jung-kurt/gofpdf v1.16.2 github.com/jung-kurt/gofpdf v1.16.2
golang.org/x/net v0.43.0 golang.org/x/net v0.44.0
golang.org/x/sync v0.16.0 golang.org/x/sync v0.17.0
) )
require ( require (
@@ -55,10 +53,10 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
golang.org/x/arch v0.20.0 // indirect golang.org/x/arch v0.21.0 // indirect
golang.org/x/image v0.30.0 // indirect golang.org/x/image v0.31.0 // indirect
golang.org/x/text v0.28.0 // indirect golang.org/x/text v0.29.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect google.golang.org/protobuf v1.36.9 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/libc v1.37.6 // indirect modernc.org/libc v1.37.6 // indirect
modernc.org/mathutil v1.6.0 // indirect modernc.org/mathutil v1.6.0 // indirect

18
go.sum
View File

@@ -244,6 +244,8 @@ golang.org/x/arch v0.19.0 h1:LmbDQUodHThXE+htjrnmVD73M//D9GTH6wFZjyDkjyU=
golang.org/x/arch v0.19.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= golang.org/x/arch v0.19.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c= golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/arch v0.21.0 h1:iTC9o7+wP6cPWpDWkivCvQFGAHDQ59SrSxsLPcnkArw=
golang.org/x/arch v0.21.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
@@ -272,6 +274,8 @@ golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.21.0 h1:c5qV36ajHpdj4Qi0GnE0jUc/yuo33OLFaa0d+crTD5s= golang.org/x/image v0.21.0 h1:c5qV36ajHpdj4Qi0GnE0jUc/yuo33OLFaa0d+crTD5s=
@@ -294,6 +298,8 @@ golang.org/x/image v0.29.0 h1:HcdsyR4Gsuys/Axh0rDEmlBmB68rW1U9BUdB3UVHsas=
golang.org/x/image v0.29.0/go.mod h1:RVJROnf3SLK8d26OW91j4FrIHGbsJ8QnbEocVTOWQDA= golang.org/x/image v0.29.0/go.mod h1:RVJROnf3SLK8d26OW91j4FrIHGbsJ8QnbEocVTOWQDA=
golang.org/x/image v0.30.0 h1:jD5RhkmVAnjqaCUXfbGBrn3lpxbknfN9w2UhHHU+5B4= golang.org/x/image v0.30.0 h1:jD5RhkmVAnjqaCUXfbGBrn3lpxbknfN9w2UhHHU+5B4=
golang.org/x/image v0.30.0/go.mod h1:SAEUTxCCMWSrJcCy/4HwavEsfZZJlYxeHLc6tTiAe/c= golang.org/x/image v0.30.0/go.mod h1:SAEUTxCCMWSrJcCy/4HwavEsfZZJlYxeHLc6tTiAe/c=
golang.org/x/image v0.31.0 h1:mLChjE2MV6g1S7oqbXC0/UcKijjm5fnJLUYKIYrLESA=
golang.org/x/image v0.31.0/go.mod h1:R9ec5Lcp96v9FTF+ajwaH3uGxPH4fKfHHAVbUILxghA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@@ -324,6 +330,8 @@ golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
@@ -344,6 +352,8 @@ golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -373,6 +383,8 @@ golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
@@ -395,6 +407,8 @@ golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg=
golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0=
golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4=
golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw=
golang.org/x/term v0.35.0 h1:bZBVKBudEyhRcajGcNc3jIfWPqV4y/Kt2XcoigOWtDQ=
golang.org/x/term v0.35.0/go.mod h1:TPGtkTLesOwf2DE8CgVYiZinHAOuy5AYUYT1lENIZnA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@@ -419,6 +433,8 @@ golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
@@ -443,6 +459,8 @@ google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2
google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1,5 +1,5 @@
package goext package goext
const GoextVersion = "0.0.597" const GoextVersion = "0.0.601"
const GoextVersionTimestamp = "2025-09-04T14:25:05+0200" const GoextVersionTimestamp = "2025-09-13T19:04:52+0200"

View File

@@ -1,18 +1,24 @@
package syncext package syncext
import ( import (
"context"
"sync" "sync"
"time"
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
) )
type Atomic[T any] struct { type Atomic[T comparable] struct {
v T v T
lock sync.RWMutex lock sync.RWMutex
listener map[string]chan T
} }
func NewAtomic[T any](value T) *Atomic[T] { func NewAtomic[T comparable](value T) *Atomic[T] {
return &Atomic[T]{ return &Atomic[T]{
v: value, v: value,
lock: sync.RWMutex{}, listener: make(map[string]chan T),
lock: sync.RWMutex{},
} }
} }
@@ -26,9 +32,138 @@ func (a *Atomic[T]) Set(value T) T {
a.lock.Lock() a.lock.Lock()
defer a.lock.Unlock() defer a.lock.Unlock()
return a.setInternal(value)
}
func (a *Atomic[T]) setInternal(value T) T {
// not locked !!
// only call from locked context
oldValue := a.v oldValue := a.v
a.v = value a.v = value
for k, v := range a.listener {
select {
case v <- value:
// message sent
default:
// no receiver on channel
delete(a.listener, k)
}
}
return oldValue return oldValue
} }
func (a *Atomic[T]) WaitForChange() chan T {
outChan := make(chan T)
inChan := make(chan T)
uuid, _ := langext.NewHexUUID()
a.lock.Lock()
a.listener[uuid] = inChan
a.lock.Unlock()
go func() {
v := <-inChan
a.lock.Lock()
delete(a.listener, uuid)
a.lock.Unlock()
outChan <- v
close(outChan)
}()
return outChan
}
func (a *Atomic[T]) Wait(waitFor T) {
_ = a.WaitWithContext(context.Background(), waitFor)
}
func (a *Atomic[T]) WaitWithTimeout(timeout time.Duration, waitFor T) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return a.WaitWithContext(ctx, waitFor)
}
func (a *Atomic[T]) WaitWithContext(ctx context.Context, waitFor T) error {
if err := ctx.Err(); err != nil {
return err
}
if a.Get() == waitFor {
return nil
}
uuid, _ := langext.NewHexUUID()
waitchan := make(chan T)
a.lock.Lock()
a.listener[uuid] = waitchan
a.lock.Unlock()
defer func() {
a.lock.Lock()
delete(a.listener, uuid)
a.lock.Unlock()
}()
for {
if err := ctx.Err(); err != nil {
return err
}
timeOut := 1024 * time.Millisecond
if dl, ok := ctx.Deadline(); ok {
timeOutMax := dl.Sub(time.Now())
if timeOutMax <= 0 {
timeOut = 0
} else if 0 < timeOutMax && timeOutMax < timeOut {
timeOut = timeOutMax
}
}
if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok {
if v == waitFor {
return nil
}
} else {
if err := ctx.Err(); err != nil {
return err
}
if a.Get() == waitFor {
return nil
}
}
}
}
func (a *Atomic[T]) Update(fn func(old T) T) {
a.lock.Lock()
defer a.lock.Unlock()
oldValue := a.v
newValue := fn(oldValue)
a.setInternal(newValue)
}
func (a *Atomic[T]) CompareAndSwap(old, new T) bool {
a.lock.Lock()
defer a.lock.Unlock()
if a.v == old {
a.setInternal(new)
return true
} else {
return false
}
}