"""In-memory, thread-safe tracking of background job progress."""

import logging
import threading
import time
import uuid
from dataclasses import dataclass, field, replace
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class JobStatus(str, Enum):
    """Lifecycle states of a tracked job."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Job:
    """Mutable record describing one tracked job."""

    job_id: str
    status: JobStatus
    progress: float = 0.0  # percentage; update_progress keeps it in [0, 100]
    message: str = ""
    # Treated as a file path (via str()) when the job is removed or expires.
    result: Optional[Any] = None
    error: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    total_steps: int = 100
    current_step: int = 0


class ProgressTracker:
    """Process-wide singleton registry of jobs.

    Every public method takes the instance's re-entrant lock, so the tracker
    can be shared between worker threads and request handlers.
    """

    _instance = None
    _init_lock = threading.Lock()

    def __new__(cls) -> "ProgressTracker":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._init_lock:
                if cls._instance is None:
                    # Fully initialise the object *before* publishing it on
                    # the class: the outer check runs without the lock, so a
                    # concurrent caller must never observe an instance whose
                    # attributes are not set yet.
                    instance = super().__new__(cls)
                    instance._jobs: Dict[str, Job] = {}
                    instance._lock = threading.RLock()
                    # Seconds a finished job is kept before it is purged.
                    instance._cleanup_interval = 300
                    # Not read anywhere in this module; kept for callers.
                    instance._file_cleanup_interval = 600
                    cls._instance = instance
        return cls._instance

    def create_job(self, message: str = "Starting...") -> str:
        """Register a new PENDING job and return its generated id.

        Also purges finished jobs older than the cleanup interval and deletes
        their result files.

        Args:
            message: Initial status message of the job.

        Returns:
            The UUID4 string identifying the new job.
        """
        job_id = str(uuid.uuid4())
        with self._lock:
            self._jobs[job_id] = Job(
                job_id=job_id,
                status=JobStatus.PENDING,
                message=message,
            )
            stale_files = self._pop_expired_jobs_locked()
        # Disk I/O happens after the lock is released so that slow deletes do
        # not stall other threads using the tracker.
        self._delete_files(stale_files)
        return job_id

    def update_progress(
        self,
        job_id: str,
        progress: float,
        message: str = "",
        current_step: int = 0,
        total_steps: int = 100,
    ) -> None:
        """Record progress for a job and mark it PROCESSING.

        Unknown job ids are ignored.

        Args:
            job_id: Id returned by ``create_job``.
            progress: Percentage done; clamped to the range [0, 100].
            message: New status message; an empty string keeps the old one.
            current_step: Step counter stored on the job.
            total_steps: Total step count stored on the job.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            job.progress = max(0.0, min(progress, 100.0))
            job.current_step = current_step
            job.total_steps = total_steps
            if message:
                job.message = message
            # NOTE(review): this also moves a COMPLETED/FAILED job back to
            # PROCESSING if an update arrives late - confirm that is intended.
            job.status = JobStatus.PROCESSING
            job.updated_at = time.time()

    def complete_job(
        self, job_id: str, result: Any = None, message: str = "Completed"
    ) -> None:
        """Mark a job COMPLETED at 100% and store its result.

        Unknown job ids are ignored.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            job.status = JobStatus.COMPLETED
            job.progress = 100.0
            job.message = message
            job.result = result
            job.updated_at = time.time()

    def fail_job(self, job_id: str, error: str) -> None:
        """Mark a job FAILED and record the error text.

        Unknown job ids are ignored.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            job.status = JobStatus.FAILED
            job.error = error
            job.message = f"Failed: {error}"
            job.updated_at = time.time()

    def get_job(self, job_id: str) -> Optional[Job]:
        """Return a shallow copy of the job, or None if the id is unknown.

        A copy is returned so callers cannot mutate the tracked record
        without going through the lock.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            return replace(job) if job else None

    def get_progress(self, job_id: str) -> Optional[dict]:
        """Return a dict summary of the job, or None if the id is unknown.

        The result itself is not included, only a ``has_result`` flag.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return None
            return {
                "job_id": job.job_id,
                "status": job.status.value,
                "progress": round(job.progress, 1),
                "message": job.message,
                "current_step": job.current_step,
                "total_steps": job.total_steps,
                "has_result": job.result is not None,
                "error": job.error,
            }

    def remove_job_and_cleanup(self, job_id: str) -> Optional[str]:
        """Remove a job and return the result file path for deletion.

        Returns None when the job is unknown or has no (truthy) result. The
        file itself is not deleted here.
        """
        with self._lock:
            job = self._jobs.pop(job_id, None)
            if job and job.result:
                return str(job.result)
            return None

    def _pop_expired_jobs_locked(self) -> List[str]:
        """Drop expired finished jobs and return their result paths.

        Must be called with self._lock held. Only COMPLETED and FAILED jobs
        whose last update is older than the cleanup interval are removed.
        """
        now = time.time()
        finished = (JobStatus.COMPLETED, JobStatus.FAILED)
        # Collect ids first; the dict must not change size while iterating.
        expired_ids = [
            job_id
            for job_id, job in self._jobs.items()
            if now - job.updated_at > self._cleanup_interval
            and job.status in finished
        ]
        stale_files: List[str] = []
        for job_id in expired_ids:
            job = self._jobs.pop(job_id, None)
            if job and job.result:
                stale_files.append(str(job.result))
        return stale_files

    @staticmethod
    def _delete_files(file_paths: List[str]) -> None:
        """Best-effort removal of the given files; never raises for I/O errors."""
        for file_path in file_paths:
            try:
                Path(file_path).unlink()
            except (FileNotFoundError, NotADirectoryError):
                # Already gone (or the result was never a file): nothing to do.
                continue
            except (OSError, ValueError) as exc:
                # ValueError covers strings that cannot be a path at all
                # (e.g. an embedded NUL). Report instead of hiding the failure.
                logger.warning(
                    "Could not delete result file %r: %s", file_path, exc
                )

    def _cleanup_old_jobs_locked(self) -> None:
        """Must be called with self._lock held.

        Purges expired finished jobs and deletes their result files.
        """
        self._delete_files(self._pop_expired_jobs_locked())


def get_tracker() -> ProgressTracker:
    """Return the shared ProgressTracker instance."""
    return ProgressTracker()