Spaces:
Running
Running
| import os | |
| import shutil | |
| import time | |
| import uuid | |
| import asyncio | |
| import json | |
| from pathlib import Path | |
| BASE_DIR = Path("/tmp/stem-sep") | |
| MAX_AGE_SECONDS = 30 * 60 # 30 minutes | |
| CLEANUP_INTERVAL = 5 * 60 # 5 minutes | |
| def create_job() -> str: | |
| job_id = uuid.uuid4().hex[:12] | |
| job_dir = BASE_DIR / job_id | |
| job_dir.mkdir(parents=True, exist_ok=True) | |
| # Touch a marker so we can track creation time | |
| (job_dir / ".created").write_text(str(time.time())) | |
| return job_id | |
| def get_job_dir(job_id: str) -> Path: | |
| return BASE_DIR / job_id | |
| def save_upload(job_id: str, filename: str, content: bytes) -> Path: | |
| job_dir = get_job_dir(job_id) | |
| ext = Path(filename).suffix or ".wav" | |
| input_path = job_dir / f"input{ext}" | |
| input_path.write_bytes(content) | |
| return input_path | |
| def save_imported_audio(job_id: str, source_filename: str, content: bytes) -> Path: | |
| return save_upload(job_id, source_filename, content) | |
| def get_input_file(job_id: str) -> Path | None: | |
| job_dir = get_job_dir(job_id) | |
| if not job_dir.exists(): | |
| return None | |
| for file in job_dir.iterdir(): | |
| if file.is_file() and file.name.startswith("input"): | |
| return file | |
| return None | |
| def get_metadata_path(job_id: str) -> Path: | |
| return get_job_dir(job_id) / "metadata.json" | |
| def save_job_metadata(job_id: str, metadata: dict) -> Path: | |
| path = get_metadata_path(job_id) | |
| path.write_text(json.dumps(metadata, indent=2), encoding="utf-8") | |
| return path | |
| def load_job_metadata(job_id: str) -> dict | None: | |
| path = get_metadata_path(job_id) | |
| if not path.exists(): | |
| return None | |
| return json.loads(path.read_text(encoding="utf-8")) | |
| def get_output_dir(job_id: str) -> Path: | |
| out = get_job_dir(job_id) / "stems" | |
| out.mkdir(parents=True, exist_ok=True) | |
| return out | |
| def list_stem_files(job_id: str) -> dict[str, str]: | |
| """Find all stem audio files, checking stems subdir and job root.""" | |
| result = {} | |
| audio_exts = {".wav", ".mp3", ".flac", ".aac"} | |
| for check_dir in [get_output_dir(job_id), get_job_dir(job_id)]: | |
| if not check_dir.exists(): | |
| continue | |
| for f in check_dir.iterdir(): | |
| if f.is_file() and f.suffix.lower() in audio_exts and not f.name.startswith("input") and f.stem not in result: | |
| result[f.stem] = f.name | |
| return result | |
| def get_file_path(job_id: str, filename: str) -> Path | None: | |
| # Check stems dir first, then job root | |
| stem_path = get_output_dir(job_id) / filename | |
| if stem_path.exists(): | |
| return stem_path | |
| root_path = get_job_dir(job_id) / filename | |
| if root_path.exists(): | |
| return root_path | |
| return None | |
| def delete_job(job_id: str): | |
| job_dir = get_job_dir(job_id) | |
| if job_dir.exists(): | |
| shutil.rmtree(job_dir, ignore_errors=True) | |
| async def cleanup_loop(): | |
| """Background task: delete job dirs older than MAX_AGE_SECONDS.""" | |
| while True: | |
| await asyncio.sleep(CLEANUP_INTERVAL) | |
| try: | |
| if not BASE_DIR.exists(): | |
| continue | |
| now = time.time() | |
| for entry in BASE_DIR.iterdir(): | |
| if not entry.is_dir(): | |
| continue | |
| marker = entry / ".created" | |
| if marker.exists(): | |
| created = float(marker.read_text().strip()) | |
| if now - created > MAX_AGE_SECONDS: | |
| shutil.rmtree(entry, ignore_errors=True) | |
| else: | |
| # No marker — use mtime | |
| if now - entry.stat().st_mtime > MAX_AGE_SECONDS: | |
| shutil.rmtree(entry, ignore_errors=True) | |
| except Exception: | |
| pass | |