package dataext import ( "context" "git.blackforestbytes.com/BlackForestBytes/goext/langext" "git.blackforestbytes.com/BlackForestBytes/goext/syncext" "sync" "time" ) type DelayedCombiningInvoker struct { syncLock sync.Mutex triggerChan chan bool cancelChan chan bool execNowChan chan bool action func() delay time.Duration maxDelay time.Duration executorRunning *syncext.AtomicBool lastRequestTime time.Time initialRequestTime time.Time } func NewDelayedCombiningInvoker(action func(), delay time.Duration, maxDelay time.Duration) *DelayedCombiningInvoker { return &DelayedCombiningInvoker{ action: action, delay: delay, maxDelay: maxDelay, executorRunning: syncext.NewAtomicBool(false), triggerChan: make(chan bool), cancelChan: make(chan bool, 1), execNowChan: make(chan bool, 1), lastRequestTime: time.Now(), initialRequestTime: time.Now(), } } func (d *DelayedCombiningInvoker) Request() { now := time.Now() d.syncLock.Lock() defer d.syncLock.Unlock() if d.executorRunning.Get() { d.lastRequestTime = now d.triggerChan <- true } else { d.initialRequestTime = now d.lastRequestTime = now d.executorRunning.Set(true) syncext.ReadNonBlocking(d.triggerChan) // clear the channel syncext.ReadNonBlocking(d.cancelChan) // clear the channel syncext.ReadNonBlocking(d.execNowChan) // clear the channel go d.run() } } func (d *DelayedCombiningInvoker) run() { defer func() { d.syncLock.Lock() d.executorRunning.Set(false) d.syncLock.Unlock() }() for { d.syncLock.Lock() timeOut := min(d.maxDelay-time.Since(d.initialRequestTime), d.delay-time.Since(d.lastRequestTime)) if timeOut < 0 { timeOut = 0 } d.syncLock.Unlock() immediately := false select { case <-d.execNowChan: // run immediately immediately = true break case <-d.triggerChan: // external trigger - needs to re-evaluate break case <-d.cancelChan: // cancel return case <-time.After(timeOut): // time elapsed - check for execution break } d.syncLock.Lock() execute := immediately || time.Since(d.lastRequestTime) >= d.delay || time.Since(d.initialRequestTime) >= d.maxDelay if !execute { d.syncLock.Unlock() continue } _ = langext.RunPanicSafe(d.action) d.syncLock.Unlock() return } } func (d *DelayedCombiningInvoker) CancelPendingRequests() { d.syncLock.Lock() defer d.syncLock.Unlock() syncext.WriteNonBlocking(d.cancelChan, true) } func (d *DelayedCombiningInvoker) HasPendingRequests() bool { return d.executorRunning.Get() } func (d *DelayedCombiningInvoker) ExecuteNow() bool { d.syncLock.Lock() defer d.syncLock.Unlock() if d.executorRunning.Get() { syncext.WriteNonBlocking(d.execNowChan, true) return true } else { return false } } func (d *DelayedCombiningInvoker) WaitForCompletion(ctx context.Context) error { return d.executorRunning.WaitWithContext(ctx, false) }