Spaces:
Sleeping
Sleeping
| """ | |
| Storage helpers — manages temp file lifecycle and output paths. | |
| """ | |
| import os | |
| import shutil | |
| import tempfile | |
| from pathlib import Path | |
| TEMP_DIR = Path("temp") | |
| OUTPUT_DIR = Path("outputs") | |
| def ensure_dirs(): | |
| TEMP_DIR.mkdir(exist_ok=True) | |
| OUTPUT_DIR.mkdir(exist_ok=True) | |
| def job_temp_dir(job_id: str) -> Path: | |
| """Create and return a per-job temp directory.""" | |
| d = TEMP_DIR / job_id | |
| d.mkdir(parents=True, exist_ok=True) | |
| return d | |
| def cleanup_job(job_id: str): | |
| """Delete the per-job temp directory after merge is done.""" | |
| d = TEMP_DIR / job_id | |
| if d.exists(): | |
| shutil.rmtree(d, ignore_errors=True) | |
| def output_path(job_id: str, ext: str = "mp4") -> Path: | |
| return OUTPUT_DIR / f"{job_id}.{ext}" | |
| def temp_path(job_id: str, name: str) -> Path: | |
| return job_temp_dir(job_id) / name | |
| def make_temp_file(suffix: str = ".tmp") -> str: | |
| """Return a unique temp file path (caller is responsible for deletion).""" | |
| fd, path = tempfile.mkstemp(suffix=suffix, dir=str(TEMP_DIR)) | |
| os.close(fd) | |
| return path | |