Imgenhance / progress_tracker.py
m9jaex
Add progress tracking to all image processing endpoints
115023b
import uuid
import time
from typing import Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import threading
from pathlib import Path
class JobStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Job:
job_id: str
status: JobStatus
progress: float = 0.0
message: str = ""
result: Optional[Any] = None
error: Optional[str] = None
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
total_steps: int = 100
current_step: int = 0
class ProgressTracker:
_instance = None
_init_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._init_lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._jobs: Dict[str, Job] = {}
cls._instance._lock = threading.RLock()
cls._instance._cleanup_interval = 300
cls._instance._file_cleanup_interval = 600
return cls._instance
def create_job(self, message: str = "Starting...") -> str:
job_id = str(uuid.uuid4())
with self._lock:
self._jobs[job_id] = Job(
job_id=job_id,
status=JobStatus.PENDING,
message=message
)
self._cleanup_old_jobs_locked()
return job_id
def update_progress(self, job_id: str, progress: float, message: str = "",
current_step: int = 0, total_steps: int = 100):
with self._lock:
if job_id in self._jobs:
job = self._jobs[job_id]
job.progress = min(progress, 100.0)
job.current_step = current_step
job.total_steps = total_steps
if message:
job.message = message
job.status = JobStatus.PROCESSING
job.updated_at = time.time()
def complete_job(self, job_id: str, result: Any = None, message: str = "Completed"):
with self._lock:
if job_id in self._jobs:
job = self._jobs[job_id]
job.status = JobStatus.COMPLETED
job.progress = 100.0
job.message = message
job.result = result
job.updated_at = time.time()
def fail_job(self, job_id: str, error: str):
with self._lock:
if job_id in self._jobs:
job = self._jobs[job_id]
job.status = JobStatus.FAILED
job.error = error
job.message = f"Failed: {error}"
job.updated_at = time.time()
def get_job(self, job_id: str) -> Optional[Job]:
with self._lock:
job = self._jobs.get(job_id)
if job:
return Job(
job_id=job.job_id,
status=job.status,
progress=job.progress,
message=job.message,
result=job.result,
error=job.error,
created_at=job.created_at,
updated_at=job.updated_at,
total_steps=job.total_steps,
current_step=job.current_step
)
return None
def get_progress(self, job_id: str) -> Optional[dict]:
with self._lock:
job = self._jobs.get(job_id)
if job is None:
return None
return {
"job_id": job.job_id,
"status": job.status.value,
"progress": round(job.progress, 1),
"message": job.message,
"current_step": job.current_step,
"total_steps": job.total_steps,
"has_result": job.result is not None,
"error": job.error
}
def remove_job_and_cleanup(self, job_id: str) -> Optional[str]:
"""Remove a job and return the result file path for deletion."""
with self._lock:
job = self._jobs.pop(job_id, None)
if job and job.result:
return str(job.result)
return None
def _cleanup_old_jobs_locked(self):
"""Must be called with self._lock held."""
current_time = time.time()
expired_jobs = [
job_id for job_id, job in self._jobs.items()
if current_time - job.updated_at > self._cleanup_interval
and job.status in (JobStatus.COMPLETED, JobStatus.FAILED)
]
files_to_delete = []
for job_id in expired_jobs:
job = self._jobs.pop(job_id, None)
if job and job.result:
files_to_delete.append(str(job.result))
for file_path in files_to_delete:
try:
path = Path(file_path)
if path.exists():
path.unlink()
except Exception:
pass
def get_tracker() -> ProgressTracker:
return ProgressTracker()