71 lines
1.4 KiB
Go
71 lines
1.4 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
)
|
|
|
|
// defaultTaskName is the task name substituted by Run and RunOnce when the
// caller passes an empty taskName; it is used as the prefix of log messages.
const defaultTaskName = "batch worker"
|
|
|
|
// BatchRunner executes one batch of at most limit records and returns the
// number of records it processed.
//
// Within a drain cycle (see RunOnce) the runner is called repeatedly:
//   - a positive count means more work may remain, so it is called again;
//   - a count of zero or less ends the cycle normally;
//   - a non-nil error aborts the cycle, and the returned count is ignored.
type BatchRunner func(ctx context.Context, limit int) (int, error)
|
|
|
|
func Run(ctx context.Context, runner BatchRunner, interval time.Duration, batchSize int, logger *slog.Logger, taskName string) {
|
|
if runner == nil || interval <= 0 || batchSize <= 0 {
|
|
return
|
|
}
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
if taskName == "" {
|
|
taskName = defaultTaskName
|
|
}
|
|
|
|
RunOnce(ctx, runner, batchSize, logger, taskName)
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
RunOnce(ctx, runner, batchSize, logger, taskName)
|
|
}
|
|
}
|
|
}
|
|
|
|
func RunOnce(ctx context.Context, runner BatchRunner, batchSize int, logger *slog.Logger, taskName string) {
|
|
if runner == nil || batchSize <= 0 {
|
|
return
|
|
}
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
if taskName == "" {
|
|
taskName = defaultTaskName
|
|
}
|
|
|
|
totalProcessed := 0
|
|
for {
|
|
processed, err := runner(ctx, batchSize)
|
|
if err != nil {
|
|
if ctx.Err() == nil {
|
|
logger.Warn(taskName+" run failed", "error", err)
|
|
}
|
|
return
|
|
}
|
|
if processed <= 0 {
|
|
break
|
|
}
|
|
totalProcessed += processed
|
|
}
|
|
|
|
if totalProcessed > 0 {
|
|
logger.Info(taskName+" completed", "processed", totalProcessed)
|
|
}
|
|
}
|