Compare commits

...

2 Commits

Author SHA1 Message Date
a70ab33559 v0.0.601 Add Wait and Update method to Atomic[T]
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 1m34s
2025-09-13 19:04:52 +02:00
a58bb4b14b v0.0.600
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 2m30s
2025-09-13 18:45:23 +02:00
4 changed files with 153 additions and 18 deletions

View File

@@ -40,7 +40,7 @@ func (p *broadcastSubscription[TData]) Unsubscribe() {
p.parent.unsubscribe(p) p.parent.unsubscribe(p)
} }
func NewBroadcast[TData any](capacity int) *Broadcaster[TData] { func NewBroadcaster[TData any](capacity int) *Broadcaster[TData] {
return &Broadcaster[TData]{ return &Broadcaster[TData]{
masterLock: &sync.Mutex{}, masterLock: &sync.Mutex{},
subscriptions: make([]*broadcastSubscription[TData], 0, capacity), subscriptions: make([]*broadcastSubscription[TData], 0, capacity),

View File

@@ -8,9 +8,9 @@ import (
) )
func TestNewBroadcast(t *testing.T) { func TestNewBroadcast(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
if bb == nil { if bb == nil {
t.Fatal("NewBroadcast returned nil") t.Fatal("NewBroadcaster returned nil")
} }
if bb.masterLock == nil { if bb.masterLock == nil {
t.Fatal("masterLock is nil") t.Fatal("masterLock is nil")
@@ -21,7 +21,7 @@ func TestNewBroadcast(t *testing.T) {
} }
func TestBroadcast_SubscribeByCallback(t *testing.T) { func TestBroadcast_SubscribeByCallback(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
var received string var received string
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -55,7 +55,7 @@ func TestBroadcast_SubscribeByCallback(t *testing.T) {
} }
func TestBroadcast_SubscribeByChan(t *testing.T) { func TestBroadcast_SubscribeByChan(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
ch, sub := bb.SubscribeByChan(1) ch, sub := bb.SubscribeByChan(1)
defer sub.Unsubscribe() defer sub.Unsubscribe()
@@ -83,7 +83,7 @@ func TestBroadcast_SubscribeByChan(t *testing.T) {
} }
func TestBroadcast_SubscribeByIter(t *testing.T) { func TestBroadcast_SubscribeByIter(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
iterSeq, sub := bb.SubscribeByIter(1) iterSeq, sub := bb.SubscribeByIter(1)
defer sub.Unsubscribe() defer sub.Unsubscribe()
@@ -126,7 +126,7 @@ func TestBroadcast_SubscribeByIter(t *testing.T) {
} }
func TestBroadcast_Publish(t *testing.T) { func TestBroadcast_Publish(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
// Test publishing with no subscribers // Test publishing with no subscribers
subs, receivers := bb.Publish("hello") subs, receivers := bb.Publish("hello")
@@ -176,7 +176,7 @@ func TestBroadcast_Publish(t *testing.T) {
} }
func TestBroadcast_PublishWithTimeout(t *testing.T) { func TestBroadcast_PublishWithTimeout(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
// Add a subscriber with a channel // Add a subscriber with a channel
ch, sub := bb.SubscribeByChan(1) ch, sub := bb.SubscribeByChan(1)
@@ -223,7 +223,7 @@ func TestBroadcast_PublishWithTimeout(t *testing.T) {
} }
func TestBroadcast_PublishWithContext(t *testing.T) { func TestBroadcast_PublishWithContext(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
// Add a subscriber with a channel // Add a subscriber with a channel
ch, sub := bb.SubscribeByChan(1) ch, sub := bb.SubscribeByChan(1)
@@ -289,7 +289,7 @@ func TestBroadcast_PublishWithContext(t *testing.T) {
} }
func TestBroadcast_Unsubscribe(t *testing.T) { func TestBroadcast_Unsubscribe(t *testing.T) {
bb := NewBroadcast[string](10) bb := NewBroadcaster[string](10)
// Add a subscriber // Add a subscriber
ch, sub := bb.SubscribeByChan(1) ch, sub := bb.SubscribeByChan(1)

View File

@@ -1,5 +1,5 @@
package goext package goext
const GoextVersion = "0.0.599" const GoextVersion = "0.0.601"
const GoextVersionTimestamp = "2025-09-13T18:42:17+0200" const GoextVersionTimestamp = "2025-09-13T19:04:52+0200"

View File

@@ -1,17 +1,23 @@
package syncext package syncext
import ( import (
"context"
"sync" "sync"
"time"
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
) )
type Atomic[T any] struct { type Atomic[T comparable] struct {
v T v T
lock sync.RWMutex lock sync.RWMutex
listener map[string]chan T
} }
func NewAtomic[T any](value T) *Atomic[T] { func NewAtomic[T comparable](value T) *Atomic[T] {
return &Atomic[T]{ return &Atomic[T]{
v: value, v: value,
listener: make(map[string]chan T),
lock: sync.RWMutex{}, lock: sync.RWMutex{},
} }
} }
@@ -26,9 +32,138 @@ func (a *Atomic[T]) Set(value T) T {
a.lock.Lock() a.lock.Lock()
defer a.lock.Unlock() defer a.lock.Unlock()
return a.setInternal(value)
}
func (a *Atomic[T]) setInternal(value T) T {
// not locked !!
// only call from locked context
oldValue := a.v oldValue := a.v
a.v = value a.v = value
for k, v := range a.listener {
select {
case v <- value:
// message sent
default:
// no receiver on channel
delete(a.listener, k)
}
}
return oldValue return oldValue
} }
func (a *Atomic[T]) WaitForChange() chan T {
outChan := make(chan T)
inChan := make(chan T)
uuid, _ := langext.NewHexUUID()
a.lock.Lock()
a.listener[uuid] = inChan
a.lock.Unlock()
go func() {
v := <-inChan
a.lock.Lock()
delete(a.listener, uuid)
a.lock.Unlock()
outChan <- v
close(outChan)
}()
return outChan
}
func (a *Atomic[T]) Wait(waitFor T) {
_ = a.WaitWithContext(context.Background(), waitFor)
}
func (a *Atomic[T]) WaitWithTimeout(timeout time.Duration, waitFor T) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return a.WaitWithContext(ctx, waitFor)
}
func (a *Atomic[T]) WaitWithContext(ctx context.Context, waitFor T) error {
if err := ctx.Err(); err != nil {
return err
}
if a.Get() == waitFor {
return nil
}
uuid, _ := langext.NewHexUUID()
waitchan := make(chan T)
a.lock.Lock()
a.listener[uuid] = waitchan
a.lock.Unlock()
defer func() {
a.lock.Lock()
delete(a.listener, uuid)
a.lock.Unlock()
}()
for {
if err := ctx.Err(); err != nil {
return err
}
timeOut := 1024 * time.Millisecond
if dl, ok := ctx.Deadline(); ok {
timeOutMax := dl.Sub(time.Now())
if timeOutMax <= 0 {
timeOut = 0
} else if 0 < timeOutMax && timeOutMax < timeOut {
timeOut = timeOutMax
}
}
if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok {
if v == waitFor {
return nil
}
} else {
if err := ctx.Err(); err != nil {
return err
}
if a.Get() == waitFor {
return nil
}
}
}
}
func (a *Atomic[T]) Update(fn func(old T) T) {
a.lock.Lock()
defer a.lock.Unlock()
oldValue := a.v
newValue := fn(oldValue)
a.setInternal(newValue)
}
func (a *Atomic[T]) CompareAndSwap(old, new T) bool {
a.lock.Lock()
defer a.lock.Unlock()
if a.v == old {
a.setInternal(new)
return true
} else {
return false
}
}