package syncext import ( "context" "sync" "time" "git.blackforestbytes.com/BlackForestBytes/goext/langext" ) type Atomic[T comparable] struct { v T lock sync.RWMutex listener map[string]chan T } func NewAtomic[T comparable](value T) *Atomic[T] { return &Atomic[T]{ v: value, listener: make(map[string]chan T), lock: sync.RWMutex{}, } } func (a *Atomic[T]) Get() T { a.lock.RLock() defer a.lock.RUnlock() return a.v } func (a *Atomic[T]) Set(value T) T { a.lock.Lock() defer a.lock.Unlock() return a.setInternal(value) } func (a *Atomic[T]) setInternal(value T) T { // not locked !! // only call from locked context oldValue := a.v a.v = value for k, v := range a.listener { select { case v <- value: // message sent default: // no receiver on channel delete(a.listener, k) } } return oldValue } func (a *Atomic[T]) WaitForChange() chan T { outChan := make(chan T) inChan := make(chan T) uuid, _ := langext.NewHexUUID() a.lock.Lock() a.listener[uuid] = inChan a.lock.Unlock() go func() { v := <-inChan a.lock.Lock() delete(a.listener, uuid) a.lock.Unlock() outChan <- v close(outChan) }() return outChan } func (a *Atomic[T]) Wait(waitFor T) { _ = a.WaitWithContext(context.Background(), waitFor) } func (a *Atomic[T]) WaitWithTimeout(timeout time.Duration, waitFor T) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return a.WaitWithContext(ctx, waitFor) } func (a *Atomic[T]) WaitWithContext(ctx context.Context, waitFor T) error { if err := ctx.Err(); err != nil { return err } if a.Get() == waitFor { return nil } uuid, _ := langext.NewHexUUID() waitchan := make(chan T) a.lock.Lock() a.listener[uuid] = waitchan a.lock.Unlock() defer func() { a.lock.Lock() delete(a.listener, uuid) a.lock.Unlock() }() for { if err := ctx.Err(); err != nil { return err } timeOut := 1024 * time.Millisecond if dl, ok := ctx.Deadline(); ok { timeOutMax := dl.Sub(time.Now()) if timeOutMax <= 0 { timeOut = 0 } else if 0 < timeOutMax && timeOutMax < timeOut { timeOut = timeOutMax } } if v, ok := ReadChannelWithTimeout(waitchan, timeOut); ok { if v == waitFor { return nil } } else { if err := ctx.Err(); err != nil { return err } if a.Get() == waitFor { return nil } } } } func (a *Atomic[T]) Update(fn func(old T) T) { a.lock.Lock() defer a.lock.Unlock() oldValue := a.v newValue := fn(oldValue) a.setInternal(newValue) } func (a *Atomic[T]) CompareAndSwap(old, new T) bool { a.lock.Lock() defer a.lock.Unlock() if a.v == old { a.setInternal(new) return true } else { return false } }