Spaces:
Sleeping
Sleeping
File size: 3,249 Bytes
595267b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
"""Batch processing for bulk operations."""
from __future__ import annotations
import uuid
from datetime import datetime
from typing import Dict, List, Any
from enum import Enum
class JobStatus(str, Enum):
    """Lifecycle states of a batch job.

    Members also subclass ``str``, so each one compares equal to its
    plain string value.
    """

    # Created but not yet picked up for processing.
    PENDING = "pending"
    # Items are currently being worked through.
    PROCESSING = "processing"
    # Every item was processed without an exception.
    COMPLETED = "completed"
    # Processing stopped because of an error.
    FAILED = "failed"
class BatchJob:
    """State holder for one bulk operation over a collection of items.

    A job starts as ``JobStatus.PENDING`` and ends in either
    ``JobStatus.COMPLETED`` (via :meth:`complete`) or ``JobStatus.FAILED``
    (via :meth:`fail`). Timestamps are naive UTC values taken from
    ``datetime.utcnow()``.
    """

    def __init__(
        self,
        job_type: str,
        items: List[Any],
        metadata: Dict | None = None,
    ):
        """Create a pending job.

        Args:
            job_type: Free-form label describing the kind of work.
            items: The items to process. Any iterable is accepted; it is
                copied into a new list.
            metadata: Optional extra data attached to the job. ``None``
                (or any empty mapping) results in a fresh empty dict.
        """
        self.job_id = uuid.uuid4().hex
        self.job_type = job_type
        # Snapshot the input: `total` is computed once, so aliasing the
        # caller's list would let later appends/removals on their side make
        # `total` disagree with `items`. `list()` also lets generators and
        # other non-sized iterables be passed.
        self.items = list(items)
        self.metadata = metadata or {}
        self.status = JobStatus.PENDING
        self.results = []
        self.errors = []
        self.created_at = datetime.utcnow()
        self.completed_at = None
        self.progress = 0
        self.total = len(self.items)

    def update_progress(self, completed: int):
        """Record ``completed`` as the current progress value."""
        self.progress = completed

    def complete(self, results: List[Any]):
        """Store ``results``, mark the job completed and stamp the finish time."""
        self.results = results
        self.status = JobStatus.COMPLETED
        self.completed_at = datetime.utcnow()

    def fail(self, error: str):
        """Mark the job failed, record ``error`` and stamp the finish time."""
        self.status = JobStatus.FAILED
        self.errors.append(error)
        self.completed_at = datetime.utcnow()

    def to_dict(self) -> Dict:
        """Return a dict summary of the job.

        Timestamps are rendered with ``isoformat()``. ``results`` is only
        included once the job is completed, and ``errors`` is ``None`` when
        no error has been recorded.
        """
        return {
            "job_id": self.job_id,
            "job_type": self.job_type,
            "status": self.status.value,
            "progress": self.progress,
            "total": self.total,
            "created_at": self.created_at.isoformat(),
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "results": self.results if self.status == JobStatus.COMPLETED else None,
            "errors": self.errors if self.errors else None,
        }
class BatchProcessor:
    """In-memory registry that creates, runs and expires batch jobs.

    Jobs are held in a dict keyed by job id.
    """

    def __init__(self):
        self._jobs: Dict[str, BatchJob] = {}

    def create_job(
        self,
        job_type: str,
        items: List[Any],
        metadata: Dict | None = None,
    ) -> str:
        """Create a ``BatchJob``, register it and return its job id."""
        job = BatchJob(job_type, items, metadata)
        self._jobs[job.job_id] = job
        return job.job_id

    def get_job(self, job_id: str) -> BatchJob | None:
        """Return the job registered under ``job_id``, or ``None`` if unknown."""
        return self._jobs.get(job_id)

    def process_job(self, job_id: str, processor_func):
        """Run ``processor_func`` over every item of the job, in order.

        Unknown job ids are ignored. Progress is updated after each item.
        The first exception raised by ``processor_func`` stops the run: the
        job is marked failed with ``str(exc)`` as the error, and the results
        gathered so far are not stored on the job.

        Args:
            job_id: Id of a job previously returned by :meth:`create_job`.
            processor_func: Callable invoked with one item at a time; its
                return values become the job's results.
        """
        job = self.get_job(job_id)
        if job is None:
            return

        job.status = JobStatus.PROCESSING
        results = []
        try:
            for idx, item in enumerate(job.items):
                results.append(processor_func(item))
                job.update_progress(idx + 1)
        except Exception as e:
            # Deliberate catch-all: any failure in user-supplied code is
            # recorded on the job instead of propagating to the caller.
            job.fail(str(e))
        else:
            job.complete(results)

    def cleanup_old_jobs(self, max_age_hours: int = 24) -> int:
        """Remove finished jobs older than ``max_age_hours``.

        Only jobs with a ``completed_at`` timestamp are considered; jobs
        that have not finished are never removed.

        Returns:
            The number of jobs that were removed.
        """
        now = datetime.utcnow()
        # Collect ids first so the dict is not mutated while iterating it.
        expired = [
            job_id
            for job_id, job in self._jobs.items()
            if job.completed_at is not None
            and (now - job.completed_at).total_seconds() / 3600 > max_age_hours
        ]
        for job_id in expired:
            del self._jobs[job_id]
        return len(expired)
# Module-level BatchProcessor instance, created at import time.
batch_processor = BatchProcessor()
|