diff --git a/goextVersion.go b/goextVersion.go index 8d97dd1..0a02cc1 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.600" +const GoextVersion = "0.0.601" -const GoextVersionTimestamp = "2025-09-13T18:45:23+0200" +const GoextVersionTimestamp = "2025-09-13T19:04:52+0200" diff --git a/syncext/atomic.go b/syncext/atomic.go index 2c9c915..dbe0a30 100644 --- a/syncext/atomic.go +++ b/syncext/atomic.go @@ -1,18 +1,24 @@ package syncext import ( + "context" "sync" + "time" + + "git.blackforestbytes.com/BlackForestBytes/goext/langext" ) -type Atomic[T any] struct { - v T - lock sync.RWMutex +type Atomic[T comparable] struct { + v T + lock sync.RWMutex + listener map[string]chan T } -func NewAtomic[T any](value T) *Atomic[T] { +func NewAtomic[T comparable](value T) *Atomic[T] { return &Atomic[T]{ - v: value, - lock: sync.RWMutex{}, + v: value, + listener: make(map[string]chan T), + lock: sync.RWMutex{}, } } @@ -26,9 +32,138 @@ func (a *Atomic[T]) Set(value T) T { a.lock.Lock() defer a.lock.Unlock() + return a.setInternal(value) +} + +func (a *Atomic[T]) setInternal(value T) T { + // not locked !! + // only call from locked context + oldValue := a.v a.v = value + for k, v := range a.listener { + select { + case v <- value: + // message sent + default: + // no receiver on channel + delete(a.listener, k) + } + } + return oldValue } + +func (a *Atomic[T]) WaitForChange() chan T { + outChan := make(chan T) + inChan := make(chan T) + + uuid, _ := langext.NewHexUUID() + + a.lock.Lock() + a.listener[uuid] = inChan + a.lock.Unlock() + + go func() { + + v := <-inChan + + a.lock.Lock() + delete(a.listener, uuid) + a.lock.Unlock() + + outChan <- v + + close(outChan) + }() + + return outChan +} + +func (a *Atomic[T]) Wait(waitFor T) { + _ = a.WaitWithContext(context.Background(), waitFor) +} + +func (a *Atomic[T]) WaitWithTimeout(timeout time.Duration, waitFor T) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return a.WaitWithContext(ctx, waitFor) +} + +func (a *Atomic[T]) WaitWithContext(ctx context.Context, waitFor T) error { + if err := ctx.Err(); err != nil { + return err + } + + if a.Get() == waitFor { + return nil + } + + uuid, _ := langext.NewHexUUID() + + waitchan := make(chan T) + + a.lock.Lock() + a.listener[uuid] = waitchan + a.lock.Unlock() + defer func() { + a.lock.Lock() + delete(a.listener, uuid) + a.lock.Unlock() + }() + + for { + if err := ctx.Err(); err != nil { + return err + } + + timeOut := 1024 * time.Millisecond + + if dl, ok := ctx.Deadline(); ok { + timeOutMax := dl.Sub(time.Now()) + if timeOutMax <= 0 { + timeOut = 0 + } else if 0 < timeOutMax && timeOutMax < timeOut { + timeOut = timeOutMax + } + } + + if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok { + if v == waitFor { + return nil + } + } else { + if err := ctx.Err(); err != nil { + return err + } + + if a.Get() == waitFor { + return nil + } + } + } +} + +func (a *Atomic[T]) Update(fn func(old T) T) { + a.lock.Lock() + defer a.lock.Unlock() + + oldValue := a.v + newValue := fn(oldValue) + + a.setInternal(newValue) +} + +func (a *Atomic[T]) CompareAndSwap(old, new T) bool { + a.lock.Lock() + defer a.lock.Unlock() + + if a.v == old { + a.setInternal(new) + return true + } else { + return false + } +}