Spaces:
Sleeping
Sleeping
| import shutil | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from threading import RLock | |
| from typing import Dict, Optional | |
| from jobs.models import JobInfo, JobStatus | |
| _BASE_DIR = Path("/tmp/detection_jobs") | |
| def get_job_directory(job_id: str) -> Path: | |
| return _BASE_DIR / job_id | |
| def get_input_video_path(job_id: str) -> Path: | |
| return get_job_directory(job_id) / "input.mp4" | |
| def get_output_video_path(job_id: str) -> Path: | |
| return get_job_directory(job_id) / "output.mp4" | |
| def get_first_frame_path(job_id: str) -> Path: | |
| return get_job_directory(job_id) / "first_frame.jpg" | |
| def get_depth_output_path(job_id: str) -> Path: | |
| """Get path for depth estimation video output.""" | |
| return get_job_directory(job_id) / "depth.mp4" | |
| def get_first_frame_depth_path(job_id: str) -> Path: | |
| """Get path for first frame depth visualization.""" | |
| return get_job_directory(job_id) / "first_frame_depth.jpg" | |
| class JobStorage: | |
| def __init__(self) -> None: | |
| self._jobs: Dict[str, JobInfo] = {} | |
| self._tracks: Dict[str, Dict[int, list]] = {} # job_id -> {frame_idx -> tracks} | |
| self._lock = RLock() | |
| def create(self, job: JobInfo) -> None: | |
| with self._lock: | |
| self._jobs[job.job_id] = job | |
| self._tracks[job.job_id] = {} | |
| def set_track_data(self, job_id: str, frame_idx: int, tracks: list) -> None: | |
| with self._lock: | |
| if job_id in self._tracks: | |
| self._tracks[job_id][frame_idx] = tracks | |
| def get_track_data(self, job_id: str, frame_idx: int) -> list: | |
| with self._lock: | |
| return self._tracks.get(job_id, {}).get(frame_idx, []) | |
| def get(self, job_id: str) -> Optional[JobInfo]: | |
| with self._lock: | |
| return self._jobs.get(job_id) | |
| def update(self, job_id: str, **updates) -> None: | |
| with self._lock: | |
| job = self._jobs.get(job_id) | |
| if not job: | |
| return | |
| for key, value in updates.items(): | |
| setattr(job, key, value) | |
| def delete(self, job_id: str) -> None: | |
| with self._lock: | |
| self._jobs.pop(job_id, None) | |
| self._tracks.pop(job_id, None) | |
| shutil.rmtree(get_job_directory(job_id), ignore_errors=True) | |
| def cleanup_expired(self, max_age: timedelta) -> None: | |
| cutoff = datetime.utcnow() - max_age | |
| to_delete = [] | |
| with self._lock: | |
| for job_id, job in self._jobs.items(): | |
| if job.status in {JobStatus.COMPLETED, JobStatus.FAILED} and job.created_at < cutoff: | |
| to_delete.append(job_id) | |
| for job_id in to_delete: | |
| self.delete(job_id) | |
| _STORAGE: Optional[JobStorage] = None | |
| def get_job_storage() -> JobStorage: | |
| global _STORAGE | |
| if _STORAGE is None: | |
| _STORAGE = JobStorage() | |
| return _STORAGE | |
| def get_track_data(job_id: str, frame_idx: int) -> list: | |
| return get_job_storage().get_track_data(job_id, frame_idx) | |
| def set_track_data(job_id: str, frame_idx: int, tracks: list) -> None: | |
| get_job_storage().set_track_data(job_id, frame_idx, tracks) | |