package worker import ( "context" "errors" "io" "log/slog" "sync" "testing" "time" ) type fakeScheduledPostPromoter struct { mu sync.Mutex calls int limits []int results []int errAt int callHook func(int) } func (f *fakeScheduledPostPromoter) PromoteDueScheduled(_ context.Context, limit int) (int, error) { f.mu.Lock() defer f.mu.Unlock() f.calls++ callNo := f.calls f.limits = append(f.limits, limit) if f.callHook != nil { f.callHook(callNo) } if f.errAt > 0 && callNo == f.errAt { return 0, errors.New("boom") } if len(f.results) == 0 { return 0, nil } result := f.results[0] f.results = f.results[1:] return result, nil } func (f *fakeScheduledPostPromoter) Calls() int { f.mu.Lock() defer f.mu.Unlock() return f.calls } func (f *fakeScheduledPostPromoter) Limits() []int { f.mu.Lock() defer f.mu.Unlock() out := make([]int, len(f.limits)) copy(out, f.limits) return out } func newDiscardLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } func TestPromoteDueScheduledOnceBatchesUntilEmpty(t *testing.T) { repo := &fakeScheduledPostPromoter{results: []int{2, 3, 0}} RunOnce(context.Background(), repo.PromoteDueScheduled, 50, newDiscardLogger(), "scheduled post promotion") if got := repo.Calls(); got != 3 { t.Fatalf("expected 3 calls, got %d", got) } limits := repo.Limits() if len(limits) != 3 || limits[0] != 50 || limits[1] != 50 || limits[2] != 50 { t.Fatalf("unexpected limits: %+v", limits) } } func TestPromoteDueScheduledOnceStopsOnError(t *testing.T) { repo := &fakeScheduledPostPromoter{ results: []int{4, 4}, errAt: 2, } RunOnce(context.Background(), repo.PromoteDueScheduled, 25, newDiscardLogger(), "scheduled post promotion") if got := repo.Calls(); got != 2 { t.Fatalf("expected 2 calls before stop, got %d", got) } } func TestRunScheduledPostPromoterStopsOnContextCancel(t *testing.T) { callCh := make(chan int, 8) repo := &fakeScheduledPostPromoter{ results: []int{0, 0, 0, 0}, callHook: func(call int) { callCh <- call }, } ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) go func() { Run(ctx, repo.PromoteDueScheduled, 10*time.Millisecond, 10, newDiscardLogger(), "scheduled post promotion") close(done) }() timeout := time.After(300 * time.Millisecond) for { select { case <-callCh: if repo.Calls() >= 2 { cancel() select { case <-done: return case <-time.After(200 * time.Millisecond): t.Fatal("promoter did not stop after context cancel") } } case <-timeout: cancel() t.Fatal("timed out waiting for promoter calls") } } }