stem-separator / backend /file_manager.py
sourav-das's picture
Upload folder using huggingface_hub
7dfae77 verified
import asyncio
import json
import logging
import os
import shutil
import time
import uuid
from pathlib import Path
# Root directory; each job gets its own subdirectory directly beneath it.
BASE_DIR = Path("/tmp/stem-sep")
# Job directories older than this many seconds are removed by cleanup_loop().
MAX_AGE_SECONDS = 30 * 60 # 30 minutes
# Seconds cleanup_loop() sleeps before each sweep.
CLEANUP_INTERVAL = 5 * 60 # 5 minutes
def create_job() -> str:
    """Create a new job directory under BASE_DIR and return the job id.

    The id is the first 12 hex characters of a random UUID4. A ``.created``
    file containing the current Unix timestamp is written into the new
    directory; cleanup_loop() reads it to determine the job's age.
    """
    new_id = uuid.uuid4().hex[:12]
    workspace = BASE_DIR / new_id
    workspace.mkdir(parents=True, exist_ok=True)
    # Record the creation time inside the directory itself.
    created_marker = workspace / ".created"
    created_marker.write_text(str(time.time()))
    return new_id
def get_job_dir(job_id: str) -> Path:
    """Return the path of the directory for *job_id* under BASE_DIR.

    The path is only computed; it is neither checked for existence nor
    created here.
    """
    return BASE_DIR.joinpath(job_id)
def save_upload(job_id: str, filename: str, content: bytes) -> Path:
    """Write uploaded audio bytes into the job directory as ``input<ext>``.

    Args:
        job_id: Job whose directory receives the file.
        filename: Original file name; only its suffix is used. When it has
            no suffix, ``.wav`` is used instead.
        content: Raw bytes to write.

    Returns:
        Path of the written input file.
    """
    target_dir = get_job_dir(job_id)
    suffix = Path(filename).suffix
    if not suffix:
        suffix = ".wav"
    destination = target_dir / ("input" + suffix)
    destination.write_bytes(content)
    return destination
def save_imported_audio(job_id: str, source_filename: str, content: bytes) -> Path:
    """Store imported audio bytes as the job's input file.

    Delegates to save_upload(), passing *source_filename* as the file name,
    and returns the path that save_upload() returns.
    """
    return save_upload(job_id=job_id, filename=source_filename, content=content)
def get_input_file(job_id: str) -> Path | None:
    """Return the job's input file, or None when there is none.

    Looks directly inside the job directory for the first regular file whose
    name starts with ``input``. Returns None when the job directory does not
    exist or contains no such file.
    """
    workspace = get_job_dir(job_id)
    if not workspace.exists():
        return None
    # Lazy scan: stops at the first matching entry, like an early return.
    matches = (
        candidate
        for candidate in workspace.iterdir()
        if candidate.is_file() and candidate.name.startswith("input")
    )
    return next(matches, None)
def get_metadata_path(job_id: str) -> Path:
    """Return the path of ``metadata.json`` inside the job's directory.

    The path is only computed; the file may not exist.
    """
    return get_job_dir(job_id).joinpath("metadata.json")
def save_job_metadata(job_id: str, metadata: dict) -> Path:
    """Serialize *metadata* as indented JSON into the job's metadata file.

    The file is written as UTF-8 and any existing content is overwritten.

    Returns:
        Path of the metadata file that was written.
    """
    target = get_metadata_path(job_id)
    serialized = json.dumps(metadata, indent=2)
    target.write_text(serialized, encoding="utf-8")
    return target
def load_job_metadata(job_id: str) -> dict | None:
    """Read and parse the job's metadata file.

    Returns:
        The decoded JSON content, or None when the metadata file does not
        exist.
    """
    meta_file = get_metadata_path(job_id)
    if meta_file.exists():
        raw = meta_file.read_text(encoding="utf-8")
        return json.loads(raw)
    return None
def get_output_dir(job_id: str) -> Path:
    """Return the job's ``stems`` directory, creating it if necessary.

    Missing parent directories are created as well.
    """
    stems_dir = get_job_dir(job_id).joinpath("stems")
    stems_dir.mkdir(parents=True, exist_ok=True)
    return stems_dir
def list_stem_files(job_id: str) -> dict[str, str]:
    """Find all stem audio files, checking the stems subdir and the job root.

    A file counts as a stem when it is a regular file with a ``.wav``,
    ``.mp3``, ``.flac`` or ``.aac`` suffix (case-insensitive) whose name does
    not start with ``input``. The stems subdirectory is scanned first, so its
    files win over same-named files in the job root.

    This is a read-only query: it never creates directories, and it returns
    an empty dict when the job directory does not exist.

    Returns:
        Mapping of stem name (file name without suffix) to file name.
    """
    audio_exts = {".wav", ".mp3", ".flac", ".aac"}
    job_dir = get_job_dir(job_id)
    result: dict[str, str] = {}
    # Build the stems path directly rather than via get_output_dir(): that
    # helper creates the directory, which would leave empty directories
    # behind for unknown job ids.
    for check_dir in (job_dir / "stems", job_dir):
        if not check_dir.is_dir():
            continue
        # Sorted so that the winner among files sharing a stem (e.g.
        # vocals.mp3 vs vocals.wav) does not depend on filesystem order.
        for f in sorted(check_dir.iterdir()):
            if not f.is_file():
                continue
            if f.suffix.lower() not in audio_exts:
                continue
            if f.name.startswith("input"):
                continue
            # First occurrence of a stem name wins.
            result.setdefault(f.stem, f.name)
    return result
def get_file_path(job_id: str, filename: str) -> Path | None:
    """Locate *filename* inside a job, checking the stems dir before the root.

    The lookup is confined to the job directory: a *filename* that resolves
    outside it (``..`` components, an absolute path, a symlink pointing
    elsewhere) is treated as not found. Only regular files are returned, and
    no directories are created by the lookup.

    Args:
        job_id: Job to search.
        filename: Name of the file, relative to the stems dir or job root.

    Returns:
        Path of the first match, or None when no such file exists.
    """
    job_dir = get_job_dir(job_id)
    # Resolve both sides so symlinked prefixes compare consistently.
    job_root = job_dir.resolve()
    # Build the stems path directly rather than via get_output_dir(): that
    # helper creates the directory, which a read-only lookup must not do.
    for base in (job_dir / "stems", job_dir):
        candidate = base / filename
        # Reject anything that escapes the job directory (path traversal).
        if not candidate.resolve().is_relative_to(job_root):
            continue
        if candidate.is_file():
            return candidate
    return None
def delete_job(job_id: str) -> None:
    """Remove the job's directory and everything in it.

    Does nothing when the directory does not exist; errors raised while
    deleting are ignored.
    """
    target = get_job_dir(job_id)
    if not target.exists():
        return
    shutil.rmtree(target, ignore_errors=True)
_logger = logging.getLogger(__name__)


def _job_created_at(entry: Path) -> float:
    """Return the creation timestamp (Unix seconds) of job directory *entry*.

    Reads the ``.created`` marker when possible; falls back to the
    directory's mtime when the marker is missing, unreadable or does not
    contain a number.
    """
    marker = entry / ".created"
    try:
        return float(marker.read_text().strip())
    except (OSError, ValueError):
        # No usable marker — use mtime
        return entry.stat().st_mtime


def _sweep_expired_jobs() -> None:
    """Delete every directory under BASE_DIR older than MAX_AGE_SECONDS."""
    if not BASE_DIR.exists():
        return
    now = time.time()
    for entry in BASE_DIR.iterdir():
        # Errors are handled per entry so that one bad directory (corrupt
        # marker, entry deleted mid-sweep) cannot block cleanup of the rest.
        try:
            if not entry.is_dir():
                continue
            if now - _job_created_at(entry) > MAX_AGE_SECONDS:
                shutil.rmtree(entry, ignore_errors=True)
        except OSError:
            _logger.warning("Could not clean up %s", entry, exc_info=True)


async def cleanup_loop():
    """Background task: delete job dirs older than MAX_AGE_SECONDS.

    Sleeps CLEANUP_INTERVAL seconds, sweeps BASE_DIR, and repeats forever.
    A failing sweep is logged and never terminates the loop. The sweep runs
    synchronously inside the coroutine.
    """
    while True:
        await asyncio.sleep(CLEANUP_INTERVAL)
        try:
            _sweep_expired_jobs()
        except Exception:
            # Keep the background task alive, but leave a trace of the failure.
            _logger.exception("Job cleanup sweep failed")