From 49bc52d63edb15f80274bfcfcbbbd55118c82230 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Sun, 11 May 2025 19:17:05 +0200 Subject: [PATCH] v0.0.575 DelayedCombiningInvoker --- dataext/delayedCombiningInvoker.go | 132 +++++++++++++++++++++++++++++ dataext/syncMap.go | 30 +++++++ goextVersion.go | 4 +- 3 files changed, 164 insertions(+), 2 deletions(-) create mode 100644 dataext/delayedCombiningInvoker.go diff --git a/dataext/delayedCombiningInvoker.go b/dataext/delayedCombiningInvoker.go new file mode 100644 index 0000000..9cd5778 --- /dev/null +++ b/dataext/delayedCombiningInvoker.go @@ -0,0 +1,132 @@ +package dataext + +import ( + "context" + "git.blackforestbytes.com/BlackForestBytes/goext/langext" + "git.blackforestbytes.com/BlackForestBytes/goext/syncext" + "sync" + "time" +) + +type DelayedCombiningInvoker struct { + syncLock sync.Mutex + triggerChan chan bool + cancelChan chan bool + execNowChan chan bool + action func() + delay time.Duration + maxDelay time.Duration + executorRunning *syncext.AtomicBool + lastRequestTime time.Time + initialRequestTime time.Time +} + +func NewDelayedCombiningInvoker(action func(), delay time.Duration, maxDelay time.Duration) *DelayedCombiningInvoker { + return &DelayedCombiningInvoker{ + action: action, + delay: delay, + maxDelay: maxDelay, + executorRunning: syncext.NewAtomicBool(false), + triggerChan: make(chan bool), + cancelChan: make(chan bool, 1), + execNowChan: make(chan bool, 1), + lastRequestTime: time.Now(), + initialRequestTime: time.Now(), + } +} + +func (d *DelayedCombiningInvoker) Request() { + now := time.Now() + + d.syncLock.Lock() + defer d.syncLock.Unlock() + + if d.executorRunning.Get() { + d.lastRequestTime = now + + d.triggerChan <- true + } else { + d.initialRequestTime = now + d.lastRequestTime = now + + d.executorRunning.Set(true) + syncext.ReadNonBlocking(d.triggerChan) // clear the channel + syncext.ReadNonBlocking(d.cancelChan) // clear the channel + syncext.ReadNonBlocking(d.execNowChan) // clear the channel + go d.run() + } +} + +func (d *DelayedCombiningInvoker) run() { + defer func() { + d.syncLock.Lock() + d.executorRunning.Set(false) + d.syncLock.Unlock() + }() + + for { + d.syncLock.Lock() + timeOut := min(d.maxDelay-time.Since(d.initialRequestTime), d.delay-time.Since(d.lastRequestTime)) + if timeOut < 0 { + timeOut = 0 + } + d.syncLock.Unlock() + + immediately := false + + select { + case <-d.execNowChan: + // run immediately + immediately = true + break + case <-d.triggerChan: + // external trigger - needs to re-evaluate + break + case <-d.cancelChan: + // cancel + return + case <-time.After(timeOut): + // time elapsed - check for execution + break + + } + + d.syncLock.Lock() + execute := immediately || time.Since(d.lastRequestTime) >= d.delay || time.Since(d.initialRequestTime) >= d.maxDelay + if !execute { + d.syncLock.Unlock() + continue + } + + _ = langext.RunPanicSafe(d.action) + d.syncLock.Unlock() + return + } +} + +func (d *DelayedCombiningInvoker) CancelPendingRequests() { + d.syncLock.Lock() + defer d.syncLock.Unlock() + + syncext.WriteNonBlocking(d.cancelChan, true) +} + +func (d *DelayedCombiningInvoker) HasPendingRequests() bool { + return d.executorRunning.Get() +} + +func (d *DelayedCombiningInvoker) ExecuteNow() bool { + d.syncLock.Lock() + defer d.syncLock.Unlock() + + if d.executorRunning.Get() { + syncext.WriteNonBlocking(d.execNowChan, true) + return true + } else { + return false + } +} + +func (d *DelayedCombiningInvoker) WaitForCompletion(ctx context.Context) error { + return d.executorRunning.WaitWithContext(ctx, false) +} diff --git a/dataext/syncMap.go b/dataext/syncMap.go index 6fd7e82..995a91a 100644 --- a/dataext/syncMap.go +++ b/dataext/syncMap.go @@ -119,6 +119,25 @@ func (s *SyncMap[TKey, TData]) Delete(key TKey) bool { return ok } +func (s *SyncMap[TKey, TData]) DeleteIf(fn func(key TKey, data TData) bool) int { + s.lock.Lock() + defer s.lock.Unlock() + + if s.data == nil { + s.data = make(map[TKey]TData) + } + + rm := 0 + for k, v := range s.data { + if fn(k, v) { + delete(s.data, k) + rm++ + } + } + + return rm +} + func (s *SyncMap[TKey, TData]) Clear() { s.lock.Lock() defer s.lock.Unlock() @@ -172,3 +191,14 @@ func (s *SyncMap[TKey, TData]) GetAllValues() []TData { return r } + +func (s *SyncMap[TKey, TData]) Count() int { + s.lock.Lock() + defer s.lock.Unlock() + + if s.data == nil { + s.data = make(map[TKey]TData) + } + + return len(s.data) +} diff --git a/goextVersion.go b/goextVersion.go index 071416b..8294bb9 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.574" +const GoextVersion = "0.0.575" -const GoextVersionTimestamp = "2025-05-07T15:28:15+0200" +const GoextVersionTimestamp = "2025-05-11T19:17:05+0200"