"""
Utility — File & Job Directory Manager

Creates/cleans temp directories, tracks disk usage, handles expiry.
"""

import logging
import shutil
import time
import uuid
from pathlib import Path

from config import TEMP_DIR, JOB_EXPIRY_HOURS

logger = logging.getLogger(__name__)

# Job IDs are only 8 hex characters, so a collision with an existing
# directory is possible; this bounds how many fresh IDs we try.
_MAX_ID_ATTEMPTS = 16


def _is_safe_job_id(job_id: str) -> bool:
    """Return True if *job_id* names exactly one path component.

    Rejects the empty string, ``.``/``..``, and anything containing a path
    separator or anchor, so that ``TEMP_DIR / job_id`` can never resolve to
    TEMP_DIR itself or to a location outside it.
    """
    return job_id not in ("", ".", "..") and Path(job_id).name == job_id


def create_job_directory() -> tuple:
    """Create a unique job directory.

    Returns:
        A ``(job_id, job_dir)`` pair: the 8-character job ID string and the
        ``TEMP_DIR / job_id`` path that was created.

    Raises:
        FileExistsError: if no unused job ID was found after
            ``_MAX_ID_ATTEMPTS`` tries.
    """
    for _ in range(_MAX_ID_ATTEMPTS):
        job_id = str(uuid.uuid4())[:8]
        job_dir = TEMP_DIR / job_id
        try:
            # Exclusive create: with exist_ok=True a truncated-UUID collision
            # would silently hand two jobs the same directory.
            job_dir.mkdir(parents=True)
        except FileExistsError:
            continue
        logger.info("Created job directory: %s", job_dir)
        return job_id, job_dir
    raise FileExistsError(
        f"Could not allocate a unique job directory under {TEMP_DIR}"
    )


def get_job_directory(job_id: str) -> Path:
    """Get the directory for a given job ID.

    Args:
        job_id: Job identifier; must be a single path component.

    Returns:
        The existing ``TEMP_DIR / job_id`` path.

    Raises:
        FileNotFoundError: if the ID is not a valid single path component or
            the directory does not exist.
    """
    if _is_safe_job_id(job_id):
        job_dir = TEMP_DIR / job_id
        if job_dir.exists():
            return job_dir
    raise FileNotFoundError(f"Job directory not found: {job_id}")


def cleanup_job(job_id: str) -> None:
    """Remove all temp files for a job.

    Does nothing if the job directory does not exist. An ID that is not a
    single path component is refused (with a warning) rather than joined onto
    TEMP_DIR, because the result is passed to ``shutil.rmtree``.
    """
    if not _is_safe_job_id(job_id):
        logger.warning("Refusing to clean up invalid job id: %r", job_id)
        return

    job_dir = TEMP_DIR / job_id
    if job_dir.exists():
        shutil.rmtree(job_dir)
        logger.info("Cleaned up job: %s", job_id)


def cleanup_expired_jobs() -> None:
    """Remove job directories older than JOB_EXPIRY_HOURS.

    Age is measured from each directory's ``st_mtime``. A directory that
    cannot be inspected or removed is logged and skipped so that one bad
    entry does not stop the rest of the sweep.
    """
    if not TEMP_DIR.exists():
        return

    now = time.time()
    expiry_seconds = JOB_EXPIRY_HOURS * 3600
    cleaned = 0

    for job_dir in TEMP_DIR.iterdir():
        try:
            if not job_dir.is_dir():
                continue
            if now - job_dir.stat().st_mtime <= expiry_seconds:
                continue
            shutil.rmtree(job_dir)
        except OSError as exc:
            logger.warning(
                "Could not clean expired job %s: %s", job_dir.name, exc
            )
            continue
        logger.info("Expired job cleaned: %s", job_dir.name)
        cleaned += 1

    if cleaned:
        logger.info("Cleaned %d expired jobs", cleaned)


def get_job_disk_usage(job_id: str) -> str:
    """Get human-readable disk usage for a job.

    Args:
        job_id: Job identifier; must be a single path component.

    Returns:
        The summed size of all regular files under the job directory,
        formatted with decimal units (``B``, ``KB``, ``MB``, ``GB``).
        Returns ``"0 B"`` when the ID is invalid or the directory is missing.
    """
    if not _is_safe_job_id(job_id):
        return "0 B"

    job_dir = TEMP_DIR / job_id
    if not job_dir.exists():
        return "0 B"

    total = 0
    for entry in job_dir.rglob("*"):
        try:
            if entry.is_file():
                total += entry.stat().st_size
        except OSError:
            # File disappeared (or became unreadable) between listing and
            # stat; leave it out of the total instead of failing the report.
            continue

    # Inclusive thresholds: exactly 1000 bytes is "1.0 KB", not "1000 B".
    if total >= 1e9:
        return f"{total / 1e9:.1f} GB"
    if total >= 1e6:
        return f"{total / 1e6:.1f} MB"
    if total >= 1e3:
        return f"{total / 1e3:.1f} KB"
    return f"{total} B"