# perception2/jobs/storage.py
# Zhen Ye
# feat: Real-time radar directionality via track sync
# 3a793fe
import shutil
from datetime import datetime, timedelta, timezone
from pathlib import Path
from threading import RLock
from typing import Dict, Optional

from jobs.models import JobInfo, JobStatus
# Root directory under which each job gets its own sub-directory.
_BASE_DIR = Path("/tmp/detection_jobs")


def get_job_directory(job_id: str) -> Path:
    """Return the directory path for ``job_id`` beneath ``_BASE_DIR``.

    The id is joined onto the base path as-is; no validation is performed
    here and the directory is not created.
    """
    return _BASE_DIR.joinpath(job_id)
def get_input_video_path(job_id: str) -> Path:
    """Return the path of ``input.mp4`` inside the job's directory."""
    job_dir = get_job_directory(job_id)
    return job_dir / "input.mp4"
def get_output_video_path(job_id: str) -> Path:
    """Return the path of ``output.mp4`` inside the job's directory."""
    return get_job_directory(job_id).joinpath("output.mp4")
def get_first_frame_path(job_id: str) -> Path:
    """Return the path of ``first_frame.jpg`` inside the job's directory."""
    job_dir = get_job_directory(job_id)
    return job_dir.joinpath("first_frame.jpg")
def get_depth_output_path(job_id: str) -> Path:
    """Return the path of the depth estimation video (``depth.mp4``) for a job."""
    job_dir = get_job_directory(job_id)
    return job_dir / "depth.mp4"
def get_first_frame_depth_path(job_id: str) -> Path:
    """Return the path of the first-frame depth visualization for a job.

    The file is ``first_frame_depth.jpg`` inside the job's directory.
    """
    return get_job_directory(job_id).joinpath("first_frame_depth.jpg")
class JobStorage:
    """In-memory registry of jobs and their per-frame track data.

    Every method takes ``self._lock`` (a re-entrant lock) while it reads or
    modifies the internal dictionaries.
    """

    def __init__(self) -> None:
        """Start with no jobs and no track data."""
        self._jobs: Dict[str, JobInfo] = {}
        # job_id -> {frame_idx -> tracks}
        self._tracks: Dict[str, Dict[int, list]] = {}
        self._lock = RLock()

    def create(self, job: JobInfo) -> None:
        """Store ``job`` under ``job.job_id`` and give it an empty track table.

        Creating a job whose id already exists replaces the stored job and
        discards any track data recorded for that id.
        """
        with self._lock:
            self._jobs[job.job_id] = job
            self._tracks[job.job_id] = {}

    def set_track_data(self, job_id: str, frame_idx: int, tracks: list) -> None:
        """Record ``tracks`` for frame ``frame_idx`` of job ``job_id``.

        The call is silently ignored when ``job_id`` has no track table,
        i.e. it was never created or has been deleted.
        """
        with self._lock:
            frames = self._tracks.get(job_id)
            if frames is not None:
                frames[frame_idx] = tracks

    def get_track_data(self, job_id: str, frame_idx: int) -> list:
        """Return the tracks stored for a frame, or ``[]`` when none exist.

        The stored list object itself is returned, not a copy.
        """
        with self._lock:
            return self._tracks.get(job_id, {}).get(frame_idx, [])

    def get(self, job_id: str) -> Optional[JobInfo]:
        """Return the job registered under ``job_id``, or ``None``."""
        with self._lock:
            return self._jobs.get(job_id)

    def update(self, job_id: str, **updates) -> None:
        """Set each ``updates`` keyword as an attribute on the stored job.

        Does nothing when ``job_id`` is unknown. Attribute names are not
        checked; each one is applied with ``setattr``.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            # Compare against None explicitly so that a job object which
            # happens to be falsy is still updated.
            if job is None:
                return
            for key, value in updates.items():
                setattr(job, key, value)

    def delete(self, job_id: str) -> None:
        """Forget ``job_id`` and remove its directory from disk.

        Unknown ids are tolerated, and errors while removing the directory
        are ignored. The directory removal runs while the lock is held.
        """
        with self._lock:
            self._jobs.pop(job_id, None)
            self._tracks.pop(job_id, None)
            shutil.rmtree(get_job_directory(job_id), ignore_errors=True)

    def cleanup_expired(self, max_age: timedelta) -> None:
        """Delete COMPLETED or FAILED jobs created more than ``max_age`` ago."""
        # datetime.utcnow() is deprecated since Python 3.12; this yields the
        # same naive UTC timestamp without the deprecation warning.
        # NOTE(review): assumes job.created_at is a naive UTC datetime, as the
        # previous utcnow() comparison did - confirm against JobInfo.
        cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - max_age
        with self._lock:
            expired = [
                job_id
                for job_id, job in self._jobs.items()
                if job.status in {JobStatus.COMPLETED, JobStatus.FAILED}
                and job.created_at < cutoff
            ]
        # Deletion happens after the scan so the dictionary is not modified
        # while it is being iterated.
        for job_id in expired:
            self.delete(job_id)
# Process-wide JobStorage instance, built on first use by get_job_storage().
_STORAGE: Optional[JobStorage] = None
# Serializes the first-time construction of _STORAGE.
_STORAGE_LOCK = RLock()


def get_job_storage() -> JobStorage:
    """Return the shared ``JobStorage``, constructing it on first call.

    Construction is guarded by ``_STORAGE_LOCK`` with a second check inside
    the lock, so callers racing on the first call all receive the same
    instance instead of each building their own.
    """
    global _STORAGE
    if _STORAGE is None:
        with _STORAGE_LOCK:
            # Re-check: another thread may have built it while we waited.
            if _STORAGE is None:
                _STORAGE = JobStorage()
    return _STORAGE
def get_track_data(job_id: str, frame_idx: int) -> list:
    """Look up a frame's tracks via the storage from ``get_job_storage()``."""
    storage = get_job_storage()
    return storage.get_track_data(job_id, frame_idx)
def set_track_data(job_id: str, frame_idx: int, tracks: list) -> None:
    """Store a frame's tracks via the storage from ``get_job_storage()``."""
    storage = get_job_storage()
    storage.set_track_data(job_id, frame_idx, tracks)