Files
goext/syncext/atomic.go
Mike Schwörer a70ab33559
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 1m34s
v0.0.601 Add Wait and Update method to Atomic[T]
2025-09-13 19:04:52 +02:00

170 lines
2.7 KiB
Go

package syncext
import (
"context"
"sync"
"time"
"git.blackforestbytes.com/BlackForestBytes/goext/langext"
)
// Atomic holds a single value of type T guarded by a read/write mutex and
// keeps a set of listener channels that are notified whenever the value is
// stored through setInternal.
type Atomic[T comparable] struct {
// v is the current value; it is read and written while holding lock.
v T
// lock guards both v and listener.
lock sync.RWMutex
// listener maps a per-waiter key to the channel on which that waiter
// receives newly stored values.
listener map[string]chan T
}
// NewAtomic returns a pointer to a new Atomic that initially holds value and
// has an empty, ready-to-use listener map.
func NewAtomic[T comparable](value T) *Atomic[T] {
	result := new(Atomic[T])
	result.v = value
	result.listener = make(map[string]chan T)
	// result.lock keeps its zero value, which is an unlocked mutex.
	return result
}
// Get returns the value currently stored, reading it under the read lock.
func (a *Atomic[T]) Get() T {
	a.lock.RLock()
	current := a.v
	a.lock.RUnlock()

	return current
}
// Set stores value while holding the write lock, notifies the registered
// listeners (via setInternal) and returns the value that was stored before.
func (a *Atomic[T]) Set(value T) T {
	a.lock.Lock()
	defer a.lock.Unlock()

	previous := a.setInternal(value)
	return previous
}
// setInternal replaces the stored value, offers the new value to every
// registered listener and returns the value that was stored before.
//
// It does no locking of its own: the caller must already hold a.lock for
// writing.
func (a *Atomic[T]) setInternal(value T) T {
	previous := a.v
	a.v = value

	for id, ch := range a.listener {
		select {
		case ch <- value:
			// The listener took the value; keep it registered.
			continue
		default:
		}

		// The send could not complete immediately (nobody is receiving on
		// the channel right now), so the listener is dropped.
		delete(a.listener, id)
	}

	return previous
}
// WaitForChange returns a channel that delivers the next value stored in the
// Atomic and is closed right after that single delivery.
//
// Both internal channels are buffered with capacity one:
//   - inChan, because the listener is registered before the forwarding
//     goroutine is guaranteed to be receiving. With an unbuffered channel a
//     store happening in that window would take the non-blocking send's
//     default branch in setInternal, unregister the listener, and leave the
//     goroutine (and the caller) waiting forever.
//   - outChan, so the forwarding goroutine can deliver and exit even if the
//     caller never reads from the returned channel.
//
// The forwarding goroutine lives until the next store; there is no way to
// cancel it, so it never exits if the value is never stored again.
func (a *Atomic[T]) WaitForChange() chan T {
	outChan := make(chan T, 1)
	inChan := make(chan T, 1)

	// NOTE(review): the error of NewHexUUID is discarded. Presumably it only
	// fails when the random source fails; in that case the key may collide
	// with another listener's key - confirm against langext.
	uuid, _ := langext.NewHexUUID()

	a.lock.Lock()
	a.listener[uuid] = inChan
	a.lock.Unlock()

	go func() {
		v := <-inChan

		// One-shot listener: unregister before forwarding the value.
		a.lock.Lock()
		delete(a.listener, uuid)
		a.lock.Unlock()

		outChan <- v
		close(outChan)
	}()

	return outChan
}
// Wait blocks until the stored value equals waitFor. It delegates to
// WaitWithContext with a background context and discards the returned error.
func (a *Atomic[T]) Wait(waitFor T) {
	ctx := context.Background()
	_ = a.WaitWithContext(ctx, waitFor)
}
// WaitWithTimeout waits until the stored value equals waitFor or until timeout
// has elapsed. It delegates to WaitWithContext with a timeout context derived
// from the background context and returns that call's result.
func (a *Atomic[T]) WaitWithTimeout(timeout time.Duration, waitFor T) error {
	timeoutCtx, release := context.WithTimeout(context.Background(), timeout)
	// Always release the resources held by the timeout context.
	defer release()

	return a.WaitWithContext(timeoutCtx, waitFor)
}
// WaitWithContext blocks until the stored value equals waitFor or ctx is done.
//
// It returns nil once waitFor has been observed, either as the current value
// or as a value delivered to this waiter's listener channel. It returns
// ctx.Err() if the context is cancelled or its deadline passes first.
//
// Cancellation is observed immediately through ctx.Done(). A periodic poll is
// kept as a safety net because setInternal drops a listener whenever its
// non-blocking send cannot complete; the poll re-reads the value and
// re-registers the listener.
func (a *Atomic[T]) WaitWithContext(ctx context.Context, waitFor T) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	// Fast path: nothing to wait for.
	if a.Get() == waitFor {
		return nil
	}

	// NOTE(review): the error of NewHexUUID is discarded. Presumably it only
	// fails when the random source fails; in that case the key may collide
	// with another listener's key - confirm against langext.
	uuid, _ := langext.NewHexUUID()

	// Capacity one lets a store succeed in notifying this waiter even while
	// it is momentarily not receiving (e.g. between two loop iterations).
	waitchan := make(chan T, 1)

	a.lock.Lock()
	if a.v == waitFor {
		// The value changed between the fast-path check and taking the lock.
		a.lock.Unlock()
		return nil
	}
	a.listener[uuid] = waitchan
	a.lock.Unlock()

	defer func() {
		a.lock.Lock()
		delete(a.listener, uuid)
		a.lock.Unlock()
	}()

	const pollInterval = 1024 * time.Millisecond

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case v := <-waitchan:
			if v == waitFor {
				return nil
			}

		case <-ticker.C:
			if err := ctx.Err(); err != nil {
				return err
			}

			// Re-read the value and make sure the listener is (still)
			// registered; setInternal may have dropped it.
			a.lock.Lock()
			current := a.v
			a.listener[uuid] = waitchan
			a.lock.Unlock()

			if current == waitFor {
				return nil
			}
		}
	}
}
// Update replaces the stored value with the result of fn applied to the
// current value and notifies the registered listeners (via setInternal).
//
// fn runs while the write lock is held, so it must not call methods of the
// same Atomic that take the lock (sync.RWMutex is not reentrant).
func (a *Atomic[T]) Update(fn func(old T) T) {
	a.lock.Lock()
	defer a.lock.Unlock()

	a.setInternal(fn(a.v))
}
// CompareAndSwap stores new only if the current value equals old. The
// comparison and the store happen under the same write lock. It reports
// whether the swap took place; on a swap the listeners are notified via
// setInternal.
func (a *Atomic[T]) CompareAndSwap(old, new T) bool {
	a.lock.Lock()
	defer a.lock.Unlock()

	if a.v != old {
		return false
	}

	a.setInternal(new)
	return true
}