add core lib
This commit is contained in:
70
worker/poller.go
Normal file
70
worker/poller.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
)
|
||||
|
||||
const defaultTaskName = "batch worker"
|
||||
|
||||
// BatchRunner executes one batch and returns the number of processed records.
|
||||
// Returning 0 stops the current drain cycle.
|
||||
type BatchRunner func(ctx context.Context, limit int) (int, error)
|
||||
|
||||
func Run(ctx context.Context, runner BatchRunner, interval time.Duration, batchSize int, logger *slog.Logger, taskName string) {
|
||||
if runner == nil || interval <= 0 || batchSize <= 0 {
|
||||
return
|
||||
}
|
||||
if logger == nil {
|
||||
logger = slog.Default()
|
||||
}
|
||||
if taskName == "" {
|
||||
taskName = defaultTaskName
|
||||
}
|
||||
|
||||
RunOnce(ctx, runner, batchSize, logger, taskName)
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
RunOnce(ctx, runner, batchSize, logger, taskName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func RunOnce(ctx context.Context, runner BatchRunner, batchSize int, logger *slog.Logger, taskName string) {
|
||||
if runner == nil || batchSize <= 0 {
|
||||
return
|
||||
}
|
||||
if logger == nil {
|
||||
logger = slog.Default()
|
||||
}
|
||||
if taskName == "" {
|
||||
taskName = defaultTaskName
|
||||
}
|
||||
|
||||
totalProcessed := 0
|
||||
for {
|
||||
processed, err := runner(ctx, batchSize)
|
||||
if err != nil {
|
||||
if ctx.Err() == nil {
|
||||
logger.Warn(taskName+" run failed", "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if processed <= 0 {
|
||||
break
|
||||
}
|
||||
totalProcessed += processed
|
||||
}
|
||||
|
||||
if totalProcessed > 0 {
|
||||
logger.Info(taskName+" completed", "processed", totalProcessed)
|
||||
}
|
||||
}
|
||||
125
worker/poller_test.go
Normal file
125
worker/poller_test.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type fakeScheduledPostPromoter struct {
|
||||
mu sync.Mutex
|
||||
calls int
|
||||
limits []int
|
||||
results []int
|
||||
errAt int
|
||||
callHook func(int)
|
||||
}
|
||||
|
||||
func (f *fakeScheduledPostPromoter) PromoteDueScheduled(_ context.Context, limit int) (int, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
f.calls++
|
||||
callNo := f.calls
|
||||
f.limits = append(f.limits, limit)
|
||||
if f.callHook != nil {
|
||||
f.callHook(callNo)
|
||||
}
|
||||
|
||||
if f.errAt > 0 && callNo == f.errAt {
|
||||
return 0, errors.New("boom")
|
||||
}
|
||||
|
||||
if len(f.results) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
result := f.results[0]
|
||||
f.results = f.results[1:]
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (f *fakeScheduledPostPromoter) Calls() int {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
return f.calls
|
||||
}
|
||||
|
||||
func (f *fakeScheduledPostPromoter) Limits() []int {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
out := make([]int, len(f.limits))
|
||||
copy(out, f.limits)
|
||||
return out
|
||||
}
|
||||
|
||||
func newDiscardLogger() *slog.Logger {
|
||||
return slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
}
|
||||
|
||||
func TestPromoteDueScheduledOnceBatchesUntilEmpty(t *testing.T) {
|
||||
repo := &fakeScheduledPostPromoter{results: []int{2, 3, 0}}
|
||||
|
||||
RunOnce(context.Background(), repo.PromoteDueScheduled, 50, newDiscardLogger(), "scheduled post promotion")
|
||||
|
||||
if got := repo.Calls(); got != 3 {
|
||||
t.Fatalf("expected 3 calls, got %d", got)
|
||||
}
|
||||
limits := repo.Limits()
|
||||
if len(limits) != 3 || limits[0] != 50 || limits[1] != 50 || limits[2] != 50 {
|
||||
t.Fatalf("unexpected limits: %+v", limits)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPromoteDueScheduledOnceStopsOnError(t *testing.T) {
|
||||
repo := &fakeScheduledPostPromoter{
|
||||
results: []int{4, 4},
|
||||
errAt: 2,
|
||||
}
|
||||
|
||||
RunOnce(context.Background(), repo.PromoteDueScheduled, 25, newDiscardLogger(), "scheduled post promotion")
|
||||
|
||||
if got := repo.Calls(); got != 2 {
|
||||
t.Fatalf("expected 2 calls before stop, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunScheduledPostPromoterStopsOnContextCancel(t *testing.T) {
|
||||
callCh := make(chan int, 8)
|
||||
repo := &fakeScheduledPostPromoter{
|
||||
results: []int{0, 0, 0, 0},
|
||||
callHook: func(call int) {
|
||||
callCh <- call
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
Run(ctx, repo.PromoteDueScheduled, 10*time.Millisecond, 10, newDiscardLogger(), "scheduled post promotion")
|
||||
close(done)
|
||||
}()
|
||||
|
||||
timeout := time.After(300 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case <-callCh:
|
||||
if repo.Calls() >= 2 {
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatal("promoter did not stop after context cancel")
|
||||
}
|
||||
}
|
||||
case <-timeout:
|
||||
cancel()
|
||||
t.Fatal("timed out waiting for promoter calls")
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user