Spaces:
Paused
Paused
| from __future__ import annotations | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from ..core.config import OUTPUT_DIR | |
def cleanup_outputs(retention_days: int) -> int:
    """Delete date-named output directories older than the retention window.

    Scans the immediate children of ``OUTPUT_DIR`` (creating the directory
    first if it does not exist) and removes every real sub-directory whose
    name parses as a ``YYYY-MM-DD`` date strictly earlier than
    ``now (UTC) - retention_days``. Entries that are not directories, that
    are symbolic links, or whose names are not dates are left alone.

    Args:
        retention_days: Number of days of output to keep. Zero or a negative
            value disables cleanup entirely.

    Returns:
        The number of directories that were removed.
    """
    if retention_days <= 0:
        return 0

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)

    removed = 0
    for item in OUTPUT_DIR.iterdir():
        # is_dir() follows symlinks, so a date-named link would otherwise be
        # handed to the tree remover and its *target* would be emptied.
        # Skipping links keeps the cleanup confined to OUTPUT_DIR.
        if item.is_symlink() or not item.is_dir():
            continue
        dt = _parse_date(item.name)
        if dt is not None and dt < cutoff:
            _remove_tree(item)
            removed += 1
    return removed
def _parse_date(name: str) -> datetime | None:
    """Interpret *name* as a ``YYYY-MM-DD`` date.

    Returns a timezone-aware ``datetime`` at midnight UTC of that date, or
    ``None`` when *name* does not match the format.
    """
    try:
        parsed = datetime.strptime(name, "%Y-%m-%d")
    except ValueError:
        # Not a date-named entry.
        return None
    return parsed.replace(tzinfo=timezone.utc)
def _remove_tree(path: Path) -> None:
    """Recursively delete the directory *path* and everything inside it.

    Symbolic links are removed as links and are never followed: a link to a
    directory, a dangling link, or *path* itself being a link only has the
    link entry deleted. This relies on ``Path.glob`` not expanding ``**``
    through symlinked directories, which is pathlib's documented default.

    Args:
        path: Directory to remove.

    Raises:
        OSError: If an entry cannot be removed, or if *path* does not exist
            (``FileNotFoundError`` from the final ``rmdir``).
    """
    if path.is_symlink():
        # Delete just the link; the directory it points at is not ours.
        path.unlink(missing_ok=True)
        return

    # Reverse-sorted paths list every child before its parent, so each
    # directory is already empty by the time rmdir() reaches it.
    for child in sorted(path.glob("**/*"), reverse=True):
        if child.is_dir() and not child.is_symlink():
            child.rmdir()
        else:
            # Regular files, symlinks (including dangling ones and links to
            # directories) and special files are all removed with unlink().
            child.unlink(missing_ok=True)
    path.rmdir()