package dataext import ( "sync/atomic" "testing" "time" ) func waitForCalls(t *testing.T, calls *int64, want int64, max time.Duration) { t.Helper() deadline := time.Now().Add(max) for time.Now().Before(deadline) { if atomic.LoadInt64(calls) >= want { return } time.Sleep(5 * time.Millisecond) } } func TestDelayedCombiningInvoker_SingleRequest(t *testing.T) { var calls int64 d := NewDelayedCombiningInvoker(func() { atomic.AddInt64(&calls, 1) }, 20*time.Millisecond, 200*time.Millisecond) d.Request() waitForCalls(t, &calls, 1, 2*time.Second) if c := atomic.LoadInt64(&calls); c != 1 { t.Fatalf("calls=%d want 1", c) } } func TestDelayedCombiningInvoker_TwoRequestsCombine(t *testing.T) { var calls int64 d := NewDelayedCombiningInvoker(func() { atomic.AddInt64(&calls, 1) }, 50*time.Millisecond, 1*time.Second) d.Request() time.Sleep(10 * time.Millisecond) d.Request() waitForCalls(t, &calls, 1, 2*time.Second) if c := atomic.LoadInt64(&calls); c != 1 { t.Fatalf("calls=%d want 1 (should be combined)", c) } } func TestDelayedCombiningInvoker_SequentialRuns(t *testing.T) { var calls int64 d := NewDelayedCombiningInvoker(func() { atomic.AddInt64(&calls, 1) }, 20*time.Millisecond, 200*time.Millisecond) d.Request() waitForCalls(t, &calls, 1, 2*time.Second) if c := atomic.LoadInt64(&calls); c != 1 { t.Fatalf("after first wait calls=%d want 1", c) } // allow executorRunning to clear time.Sleep(50 * time.Millisecond) d.Request() waitForCalls(t, &calls, 2, 2*time.Second) if c := atomic.LoadInt64(&calls); c != 2 { t.Fatalf("calls=%d want 2", c) } } func TestDelayedCombiningInvoker_ExecuteNow(t *testing.T) { var calls int64 d := NewDelayedCombiningInvoker(func() { atomic.AddInt64(&calls, 1) }, 5*time.Second, 30*time.Second) d.Request() if !d.HasPendingRequests() { t.Fatal("should have pending requests") } if !d.ExecuteNow() { t.Fatal("ExecuteNow should return true when running") } deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { if atomic.LoadInt64(&calls) >= 1 { break } time.Sleep(10 * time.Millisecond) } if c := atomic.LoadInt64(&calls); c != 1 { t.Fatalf("calls=%d want 1 (ExecuteNow should fire well before delay)", c) } // allow internal state cleanup for i := 0; i < 100; i++ { if !d.HasPendingRequests() { break } time.Sleep(10 * time.Millisecond) } if d.ExecuteNow() { t.Fatal("ExecuteNow should return false when no pending") } } func TestDelayedCombiningInvoker_Cancel(t *testing.T) { var calls int64 d := NewDelayedCombiningInvoker(func() { atomic.AddInt64(&calls, 1) }, 500*time.Millisecond, 5*time.Second) d.Request() d.CancelPendingRequests() time.Sleep(200 * time.Millisecond) if c := atomic.LoadInt64(&calls); c != 0 { t.Fatalf("calls=%d want 0 after cancel", c) } } func TestDelayedCombiningInvoker_HasAndCountPending(t *testing.T) { d := NewDelayedCombiningInvoker(func() { // no-op }, 500*time.Millisecond, 5*time.Second) if d.HasPendingRequests() { t.Fatal("should not have pending before any Request") } if d.CountPendingRequests() != 0 { t.Fatalf("count=%d want 0", d.CountPendingRequests()) } d.Request() if !d.HasPendingRequests() { t.Fatal("should have pending") } if d.CountPendingRequests() < 1 { t.Fatalf("count=%d want >=1", d.CountPendingRequests()) } d.CancelPendingRequests() } func TestDelayedCombiningInvoker_Listeners(t *testing.T) { var ( startCount int64 doneCount int64 requestCount int64 ) d := NewDelayedCombiningInvoker(func() { // no-op }, 20*time.Millisecond, 200*time.Millisecond) d.RegisterOnExecutionStart(func(immediately bool) { atomic.AddInt64(&startCount, 1) }) d.RegisterOnExecutionDone(func() { atomic.AddInt64(&doneCount, 1) }) d.RegisterOnRequest(func(pending int, initial bool) { atomic.AddInt64(&requestCount, 1) }) d.Request() waitForCalls(t, &doneCount, 1, 2*time.Second) if atomic.LoadInt64(&startCount) != 1 { t.Fatalf("startCount=%d want 1", startCount) } if atomic.LoadInt64(&doneCount) != 1 { t.Fatalf("doneCount=%d want 1", doneCount) } if atomic.LoadInt64(&requestCount) != 1 { t.Fatalf("requestCount=%d want 1", requestCount) } }