"""Batch processing for bulk operations.""" from __future__ import annotations import uuid from datetime import datetime from typing import Dict, List, Any from enum import Enum class JobStatus(str, Enum): PENDING = "pending" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" class BatchJob: def __init__(self, job_type: str, items: List[Any], metadata: Dict = None): self.job_id = uuid.uuid4().hex self.job_type = job_type self.items = items self.metadata = metadata or {} self.status = JobStatus.PENDING self.results = [] self.errors = [] self.created_at = datetime.utcnow() self.completed_at = None self.progress = 0 self.total = len(items) def update_progress(self, completed: int): self.progress = completed def complete(self, results: List[Any]): self.results = results self.status = JobStatus.COMPLETED self.completed_at = datetime.utcnow() def fail(self, error: str): self.status = JobStatus.FAILED self.errors.append(error) self.completed_at = datetime.utcnow() def to_dict(self) -> Dict: return { "job_id": self.job_id, "job_type": self.job_type, "status": self.status.value, "progress": self.progress, "total": self.total, "created_at": self.created_at.isoformat(), "completed_at": self.completed_at.isoformat() if self.completed_at else None, "results": self.results if self.status == JobStatus.COMPLETED else None, "errors": self.errors if self.errors else None } class BatchProcessor: def __init__(self): self._jobs: Dict[str, BatchJob] = {} def create_job(self, job_type: str, items: List[Any], metadata: Dict = None) -> str: job = BatchJob(job_type, items, metadata) self._jobs[job.job_id] = job return job.job_id def get_job(self, job_id: str) -> BatchJob | None: return self._jobs.get(job_id) def process_job(self, job_id: str, processor_func): job = self.get_job(job_id) if not job: return job.status = JobStatus.PROCESSING results = [] try: for idx, item in enumerate(job.items): result = processor_func(item) results.append(result) job.update_progress(idx + 1) job.complete(results) except Exception as e: job.fail(str(e)) def cleanup_old_jobs(self, max_age_hours: int = 24): """Remove jobs older than max_age_hours.""" current_time = datetime.utcnow() to_delete = [] for job_id, job in self._jobs.items(): if job.completed_at: age = (current_time - job.completed_at).total_seconds() / 3600 if age > max_age_hours: to_delete.append(job_id) for job_id in to_delete: del self._jobs[job_id] batch_processor = BatchProcessor()