Spaces:
Sleeping
Sleeping
File size: 5,242 Bytes
d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 115023b d037fc6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | import uuid
import time
from typing import Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import threading
from pathlib import Path
class JobStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Job:
job_id: str
status: JobStatus
progress: float = 0.0
message: str = ""
result: Optional[Any] = None
error: Optional[str] = None
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
total_steps: int = 100
current_step: int = 0
class ProgressTracker:
_instance = None
_init_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._init_lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._jobs: Dict[str, Job] = {}
cls._instance._lock = threading.RLock()
cls._instance._cleanup_interval = 300
cls._instance._file_cleanup_interval = 600
return cls._instance
def create_job(self, message: str = "Starting...") -> str:
job_id = str(uuid.uuid4())
with self._lock:
self._jobs[job_id] = Job(
job_id=job_id,
status=JobStatus.PENDING,
message=message
)
self._cleanup_old_jobs_locked()
return job_id
def update_progress(self, job_id: str, progress: float, message: str = "",
current_step: int = 0, total_steps: int = 100):
with self._lock:
if job_id in self._jobs:
job = self._jobs[job_id]
job.progress = min(progress, 100.0)
job.current_step = current_step
job.total_steps = total_steps
if message:
job.message = message
job.status = JobStatus.PROCESSING
job.updated_at = time.time()
def complete_job(self, job_id: str, result: Any = None, message: str = "Completed"):
with self._lock:
if job_id in self._jobs:
job = self._jobs[job_id]
job.status = JobStatus.COMPLETED
job.progress = 100.0
job.message = message
job.result = result
job.updated_at = time.time()
def fail_job(self, job_id: str, error: str):
with self._lock:
if job_id in self._jobs:
job = self._jobs[job_id]
job.status = JobStatus.FAILED
job.error = error
job.message = f"Failed: {error}"
job.updated_at = time.time()
def get_job(self, job_id: str) -> Optional[Job]:
with self._lock:
job = self._jobs.get(job_id)
if job:
return Job(
job_id=job.job_id,
status=job.status,
progress=job.progress,
message=job.message,
result=job.result,
error=job.error,
created_at=job.created_at,
updated_at=job.updated_at,
total_steps=job.total_steps,
current_step=job.current_step
)
return None
def get_progress(self, job_id: str) -> Optional[dict]:
with self._lock:
job = self._jobs.get(job_id)
if job is None:
return None
return {
"job_id": job.job_id,
"status": job.status.value,
"progress": round(job.progress, 1),
"message": job.message,
"current_step": job.current_step,
"total_steps": job.total_steps,
"has_result": job.result is not None,
"error": job.error
}
def remove_job_and_cleanup(self, job_id: str) -> Optional[str]:
"""Remove a job and return the result file path for deletion."""
with self._lock:
job = self._jobs.pop(job_id, None)
if job and job.result:
return str(job.result)
return None
def _cleanup_old_jobs_locked(self):
"""Must be called with self._lock held."""
current_time = time.time()
expired_jobs = [
job_id for job_id, job in self._jobs.items()
if current_time - job.updated_at > self._cleanup_interval
and job.status in (JobStatus.COMPLETED, JobStatus.FAILED)
]
files_to_delete = []
for job_id in expired_jobs:
job = self._jobs.pop(job_id, None)
if job and job.result:
files_to_delete.append(str(job.result))
for file_path in files_to_delete:
try:
path = Path(file_path)
if path.exists():
path.unlink()
except Exception:
pass
def get_tracker() -> ProgressTracker:
return ProgressTracker()
|