Spaces:
Sleeping
Sleeping
| import uuid | |
| import time | |
| from typing import Dict, Optional, Any | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import threading | |
| from pathlib import Path | |
| class JobStatus(str, Enum): | |
| PENDING = "pending" | |
| PROCESSING = "processing" | |
| COMPLETED = "completed" | |
| FAILED = "failed" | |
| class Job: | |
| job_id: str | |
| status: JobStatus | |
| progress: float = 0.0 | |
| message: str = "" | |
| result: Optional[Any] = None | |
| error: Optional[str] = None | |
| created_at: float = field(default_factory=time.time) | |
| updated_at: float = field(default_factory=time.time) | |
| total_steps: int = 100 | |
| current_step: int = 0 | |
| class ProgressTracker: | |
| _instance = None | |
| _init_lock = threading.Lock() | |
| def __new__(cls): | |
| if cls._instance is None: | |
| with cls._init_lock: | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._jobs: Dict[str, Job] = {} | |
| cls._instance._lock = threading.RLock() | |
| cls._instance._cleanup_interval = 300 | |
| cls._instance._file_cleanup_interval = 600 | |
| return cls._instance | |
| def create_job(self, message: str = "Starting...") -> str: | |
| job_id = str(uuid.uuid4()) | |
| with self._lock: | |
| self._jobs[job_id] = Job( | |
| job_id=job_id, | |
| status=JobStatus.PENDING, | |
| message=message | |
| ) | |
| self._cleanup_old_jobs_locked() | |
| return job_id | |
| def update_progress(self, job_id: str, progress: float, message: str = "", | |
| current_step: int = 0, total_steps: int = 100): | |
| with self._lock: | |
| if job_id in self._jobs: | |
| job = self._jobs[job_id] | |
| job.progress = min(progress, 100.0) | |
| job.current_step = current_step | |
| job.total_steps = total_steps | |
| if message: | |
| job.message = message | |
| job.status = JobStatus.PROCESSING | |
| job.updated_at = time.time() | |
| def complete_job(self, job_id: str, result: Any = None, message: str = "Completed"): | |
| with self._lock: | |
| if job_id in self._jobs: | |
| job = self._jobs[job_id] | |
| job.status = JobStatus.COMPLETED | |
| job.progress = 100.0 | |
| job.message = message | |
| job.result = result | |
| job.updated_at = time.time() | |
| def fail_job(self, job_id: str, error: str): | |
| with self._lock: | |
| if job_id in self._jobs: | |
| job = self._jobs[job_id] | |
| job.status = JobStatus.FAILED | |
| job.error = error | |
| job.message = f"Failed: {error}" | |
| job.updated_at = time.time() | |
| def get_job(self, job_id: str) -> Optional[Job]: | |
| with self._lock: | |
| job = self._jobs.get(job_id) | |
| if job: | |
| return Job( | |
| job_id=job.job_id, | |
| status=job.status, | |
| progress=job.progress, | |
| message=job.message, | |
| result=job.result, | |
| error=job.error, | |
| created_at=job.created_at, | |
| updated_at=job.updated_at, | |
| total_steps=job.total_steps, | |
| current_step=job.current_step | |
| ) | |
| return None | |
| def get_progress(self, job_id: str) -> Optional[dict]: | |
| with self._lock: | |
| job = self._jobs.get(job_id) | |
| if job is None: | |
| return None | |
| return { | |
| "job_id": job.job_id, | |
| "status": job.status.value, | |
| "progress": round(job.progress, 1), | |
| "message": job.message, | |
| "current_step": job.current_step, | |
| "total_steps": job.total_steps, | |
| "has_result": job.result is not None, | |
| "error": job.error | |
| } | |
| def remove_job_and_cleanup(self, job_id: str) -> Optional[str]: | |
| """Remove a job and return the result file path for deletion.""" | |
| with self._lock: | |
| job = self._jobs.pop(job_id, None) | |
| if job and job.result: | |
| return str(job.result) | |
| return None | |
| def _cleanup_old_jobs_locked(self): | |
| """Must be called with self._lock held.""" | |
| current_time = time.time() | |
| expired_jobs = [ | |
| job_id for job_id, job in self._jobs.items() | |
| if current_time - job.updated_at > self._cleanup_interval | |
| and job.status in (JobStatus.COMPLETED, JobStatus.FAILED) | |
| ] | |
| files_to_delete = [] | |
| for job_id in expired_jobs: | |
| job = self._jobs.pop(job_id, None) | |
| if job and job.result: | |
| files_to_delete.append(str(job.result)) | |
| for file_path in files_to_delete: | |
| try: | |
| path = Path(file_path) | |
| if path.exists(): | |
| path.unlink() | |
| except Exception: | |
| pass | |
| def get_tracker() -> ProgressTracker: | |
| return ProgressTracker() | |