package worker import ( "context" "log/slog" "time" ) const defaultTaskName = "batch worker" // BatchRunner executes one batch and returns the number of processed records. // Returning 0 stops the current drain cycle. type BatchRunner func(ctx context.Context, limit int) (int, error) func Run(ctx context.Context, runner BatchRunner, interval time.Duration, batchSize int, logger *slog.Logger, taskName string) { if runner == nil || interval <= 0 || batchSize <= 0 { return } if logger == nil { logger = slog.Default() } if taskName == "" { taskName = defaultTaskName } RunOnce(ctx, runner, batchSize, logger, taskName) ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: RunOnce(ctx, runner, batchSize, logger, taskName) } } } func RunOnce(ctx context.Context, runner BatchRunner, batchSize int, logger *slog.Logger, taskName string) { if runner == nil || batchSize <= 0 { return } if logger == nil { logger = slog.Default() } if taskName == "" { taskName = defaultTaskName } totalProcessed := 0 for { processed, err := runner(ctx, batchSize) if err != nil { if ctx.Err() == nil { logger.Warn(taskName+" run failed", "error", err) } return } if processed <= 0 { break } totalProcessed += processed } if totalProcessed > 0 { logger.Info(taskName+" completed", "processed", totalProcessed) } }