package dataext import ( "context" "sync" "time" "git.blackforestbytes.com/BlackForestBytes/goext/langext" "git.blackforestbytes.com/BlackForestBytes/goext/syncext" ) type DelayedCombiningInvoker struct { syncLock sync.Mutex triggerChan chan bool cancelChan chan bool execNowChan chan bool action func() delay time.Duration maxDelay time.Duration executorRunning *syncext.AtomicBool pendingRequests *syncext.Atomic[int] lastRequestTime time.Time initialRequestTime time.Time onExecutionStart []func(immediately bool) // listener ( actual execution of action starts ) onExecutionDone []func() // listener ( actual execution of action finished ) onRequest []func(pending int, initial bool) // listener ( a request came in, waiting for execution ) } func NewDelayedCombiningInvoker(action func(), delay time.Duration, maxDelay time.Duration) *DelayedCombiningInvoker { return &DelayedCombiningInvoker{ action: action, delay: delay, maxDelay: maxDelay, executorRunning: syncext.NewAtomicBool(false), pendingRequests: syncext.NewAtomic[int](0), triggerChan: make(chan bool), cancelChan: make(chan bool, 1), execNowChan: make(chan bool, 1), lastRequestTime: time.Now(), initialRequestTime: time.Now(), onExecutionStart: make([]func(bool), 0), onExecutionDone: make([]func(), 0), onRequest: make([]func(int, bool), 0), } } func (d *DelayedCombiningInvoker) Request() { now := time.Now() d.syncLock.Lock() defer d.syncLock.Unlock() if d.executorRunning.Get() { d.lastRequestTime = now d.pendingRequests.Update(func(v int) int { return v + 1 }) for _, fn := range d.onRequest { _ = langext.RunPanicSafe(func() { fn(d.pendingRequests.Get(), true) }) } d.triggerChan <- true } else { d.initialRequestTime = now d.lastRequestTime = now d.executorRunning.Set(true) d.pendingRequests.Set(1) syncext.ReadNonBlocking(d.triggerChan) // clear the channel syncext.ReadNonBlocking(d.cancelChan) // clear the channel syncext.ReadNonBlocking(d.execNowChan) // clear the channel for _, fn := range d.onRequest { _ = langext.RunPanicSafe(func() { fn(d.pendingRequests.Get(), false) }) } go d.run() } } func (d *DelayedCombiningInvoker) run() { defer func() { d.syncLock.Lock() d.executorRunning.Set(false) d.syncLock.Unlock() }() for { d.syncLock.Lock() timeOut := min(d.maxDelay-time.Since(d.initialRequestTime), d.delay-time.Since(d.lastRequestTime)) if timeOut < 0 { timeOut = 0 } d.syncLock.Unlock() immediately := false select { case <-d.execNowChan: // run immediately immediately = true break case <-d.triggerChan: // external trigger - needs to re-evaluate break case <-d.cancelChan: // cancel return case <-time.After(timeOut): // time elapsed - check for execution break } d.syncLock.Lock() execute := immediately || time.Since(d.lastRequestTime) >= d.delay || time.Since(d.initialRequestTime) >= d.maxDelay if !execute { d.syncLock.Unlock() continue } d.pendingRequests.Set(0) for _, fn := range d.onExecutionStart { _ = langext.RunPanicSafe(func() { fn(immediately) }) } // ================================================= _ = langext.RunPanicSafe(d.action) // ================================================= for _, fn := range d.onExecutionDone { _ = langext.RunPanicSafe(fn) } d.syncLock.Unlock() return } } func (d *DelayedCombiningInvoker) CancelPendingRequests() { d.syncLock.Lock() defer d.syncLock.Unlock() syncext.WriteNonBlocking(d.cancelChan, true) } func (d *DelayedCombiningInvoker) HasPendingRequests() bool { return d.executorRunning.Get() } func (d *DelayedCombiningInvoker) CountPendingRequests() int { return d.pendingRequests.Get() } func (d *DelayedCombiningInvoker) ExecuteNow() bool { d.syncLock.Lock() defer d.syncLock.Unlock() if d.executorRunning.Get() { syncext.WriteNonBlocking(d.execNowChan, true) return true } else { return false } } func (d *DelayedCombiningInvoker) WaitForCompletion(ctx context.Context) error { return d.executorRunning.WaitWithContext(ctx, false) } func (d *DelayedCombiningInvoker) RegisterOnExecutionStart(fn func(immediately bool)) { d.syncLock.Lock() defer d.syncLock.Unlock() d.onExecutionStart = append(d.onExecutionStart, fn) } func (d *DelayedCombiningInvoker) RegisterOnExecutionDone(fn func()) { d.syncLock.Lock() defer d.syncLock.Unlock() d.onExecutionDone = append(d.onExecutionDone, fn) } func (d *DelayedCombiningInvoker) RegisterOnRequest(fn func(pending int, initial bool)) { d.syncLock.Lock() defer d.syncLock.Unlock() d.onRequest = append(d.onRequest, fn) }