Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| import uuid | |
| from hackathon_advisor.dashboard import validate_dashboard_payload | |
| from hackathon_advisor._text import utc_now | |
# Name of the pointer file, stored directly in the cache directory, that records
# where the most recently persisted run's artifacts live.
LATEST_FILENAME: str = "latest.json"

# Version stamped into both manifest.json and latest.json; load_latest_artifacts
# refuses a latest.json whose "schema_version" differs from this value.
STORAGE_SCHEMA_VERSION: int = 1
class DashboardStorageError(RuntimeError):
    """Raised when the dashboard cache directory or a cached artifact is unusable.

    Covers a missing/unwritable cache directory, unreadable or malformed JSON
    artifacts, an unsupported storage schema, and artifact paths that are empty
    or point outside the cache directory.
    """
| class DashboardArtifacts: | |
| projects_path: Path | |
| index_path: Path | |
| dashboard_path: Path | |
| manifest_path: Path | |
| dashboard: dict[str, Any] | |
| manifest: dict[str, Any] | |
| quest_analysis_path: Path | None = None | |
| def cache_dir_from_env(env: dict[str, str] | None = None) -> Path | None: | |
| raw = (env or os.environ).get("ADVISOR_CACHE_DIR", "").strip() | |
| return Path(raw).expanduser() if raw else None | |
def require_writable_cache_dir(env: dict[str, str] | None = None) -> Path:
    """Return the configured cache directory after proving it can be written to.

    Args:
        env: Mapping forwarded to ``cache_dir_from_env``.

    Returns:
        The cache directory path.

    Raises:
        DashboardStorageError: If ``ADVISOR_CACHE_DIR`` is not configured, does
            not name an existing directory, or a probe file cannot be created
            and removed inside it.
    """
    cache_dir = cache_dir_from_env(env)
    if cache_dir is None:
        raise DashboardStorageError(
            "ADVISOR_CACHE_DIR must point to a writable dashboard cache directory. "
            "Use a mounted Storage Bucket in deployment or a local directory in development."
        )
    if not (cache_dir.exists() and cache_dir.is_dir()):
        raise DashboardStorageError(f"ADVISOR_CACHE_DIR does not exist or is not a directory: {cache_dir}")

    # A uniquely named throwaway file: create it, then delete it again.
    marker_name = f".advisor-write-check-{uuid.uuid4().hex}"
    marker = cache_dir / marker_name
    try:
        marker.write_text("ok", encoding="utf-8")
        marker.unlink()
    except OSError as error:
        raise DashboardStorageError(f"ADVISOR_CACHE_DIR is not writable: {cache_dir}") from error
    return cache_dir
def load_latest_artifacts(cache_dir: Path | None) -> DashboardArtifacts | None:
    """Load the artifacts referenced by the cache directory's latest.json pointer.

    Args:
        cache_dir: Cache directory to read from, or ``None`` when no cache is
            configured.

    Returns:
        The artifacts of the latest run, or ``None`` when ``cache_dir`` is
        ``None`` or no latest.json exists yet.

    Raises:
        DashboardStorageError: If the pointer has an unexpected schema version,
            references an empty/escaping/missing path, or a JSON file cannot be
            read as an object.

    NOTE(review): the optional "quest_analysis" entry of latest.json is not read
    here, so ``quest_analysis_path`` keeps its default of ``None`` -- confirm
    that this is intended.
    """
    if cache_dir is None:
        return None
    pointer_path = cache_dir / LATEST_FILENAME
    if not pointer_path.exists():
        return None

    pointer = _read_json(pointer_path)
    if pointer.get("schema_version") != STORAGE_SCHEMA_VERSION:
        raise DashboardStorageError("unsupported latest dashboard storage schema")

    # Resolve every required entry first (in this fixed order), then verify
    # that each resolved path is an existing file.
    resolved = {
        key: _resolve_under(cache_dir, str(pointer.get(key) or ""))
        for key in ("projects", "index", "dashboard", "manifest")
    }
    for path in resolved.values():
        if not path.is_file():
            raise DashboardStorageError(f"cached dashboard artifact is missing: {path}")

    dashboard = _read_json(resolved["dashboard"])
    validate_dashboard_payload(dashboard)
    manifest = _read_json(resolved["manifest"])
    return DashboardArtifacts(
        projects_path=resolved["projects"],
        index_path=resolved["index"],
        dashboard_path=resolved["dashboard"],
        manifest_path=resolved["manifest"],
        dashboard=dashboard,
        manifest=manifest,
    )
def persist_refresh_artifacts(
    cache_dir: Path,
    run_id: str,
    *,
    projects_payload: dict[str, Any],
    index_payload: dict[str, Any],
    dashboard_payload: dict[str, Any],
    quest_analysis_payload: dict[str, Any] | None = None,
) -> DashboardArtifacts:
    """Write one refresh run into the cache and point latest.json at it.

    The payloads are written under ``<cache_dir>/runs/<run_id>/`` together with
    a manifest.json. latest.json is then written to a temporary file in the
    cache directory and moved into place with ``os.replace``.

    Args:
        cache_dir: Cache directory that receives the run.
        run_id: Name of the run directory; it must not exist yet.
        projects_payload: Content of projects.json.
        index_payload: Content of project_index.json.
        dashboard_payload: Content of dashboard.json; validated before anything
            is written. Its "project_count" and provenance "snapshot_digest"
            are copied into the manifest.
        quest_analysis_payload: Optional content of quest_analysis.json; the
            file is only written when this is not ``None``.

    Returns:
        The paths of the written files plus the dashboard payload and manifest.

    Raises:
        FileExistsError: If the run directory already exists
            (``mkdir(exist_ok=False)``).
    """
    validate_dashboard_payload(dashboard_payload)

    run_dir = cache_dir / "runs" / run_id
    run_dir.mkdir(parents=True, exist_ok=False)

    projects_path = run_dir / "projects.json"
    index_path = run_dir / "project_index.json"
    dashboard_path = run_dir / "dashboard.json"
    manifest_path = run_dir / "manifest.json"
    quest_analysis_path = None
    if quest_analysis_payload is not None:
        quest_analysis_path = run_dir / "quest_analysis.json"

    # Payload files go out first, in a fixed order; the manifest follows later.
    pending = [
        (projects_path, projects_payload),
        (index_path, index_payload),
        (dashboard_path, dashboard_payload),
    ]
    if quest_analysis_path is not None:
        pending.append((quest_analysis_path, quest_analysis_payload))
    for destination, payload in pending:
        _write_json(destination, payload)

    # Cache-relative POSIX paths, shared by the manifest and the pointer file.
    relative_paths = {
        "projects": _relative(cache_dir, projects_path),
        "index": _relative(cache_dir, index_path),
        "dashboard": _relative(cache_dir, dashboard_path),
    }
    if quest_analysis_path is not None:
        relative_paths["quest_analysis"] = _relative(cache_dir, quest_analysis_path)

    generated_at = utc_now()
    manifest = {
        "schema_version": STORAGE_SCHEMA_VERSION,
        "run_id": run_id,
        "generated_at": generated_at,
        "project_count": dashboard_payload["project_count"],
        "snapshot_digest": dashboard_payload["provenance"]["snapshot_digest"],
        "artifacts": relative_paths,
    }
    _write_json(manifest_path, manifest)

    # Key order matters for the serialized file: "manifest" precedes the
    # optional "quest_analysis" entry.
    pointer = {
        "schema_version": STORAGE_SCHEMA_VERSION,
        "run_id": run_id,
        "generated_at": generated_at,
        "projects": relative_paths["projects"],
        "index": relative_paths["index"],
        "dashboard": relative_paths["dashboard"],
        "manifest": _relative(cache_dir, manifest_path),
    }
    if quest_analysis_path is not None:
        pointer["quest_analysis"] = relative_paths["quest_analysis"]

    # Stage the pointer next to its final location, then swap it in.
    staging_path = cache_dir / f".{LATEST_FILENAME}.{run_id}.tmp"
    _write_json(staging_path, pointer)
    os.replace(staging_path, cache_dir / LATEST_FILENAME)

    return DashboardArtifacts(
        projects_path=projects_path,
        index_path=index_path,
        dashboard_path=dashboard_path,
        manifest_path=manifest_path,
        dashboard=dashboard_payload,
        manifest=manifest,
        quest_analysis_path=quest_analysis_path,
    )
def _read_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return the decoded object.

    Args:
        path: File to read.

    Returns:
        The decoded JSON object.

    Raises:
        DashboardStorageError: If the file cannot be read, is not valid UTF-8,
            is not valid JSON, or its top-level value is not a JSON object.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as error:
        # ValueError covers json.JSONDecodeError and also the UnicodeDecodeError
        # that read_text raises for bytes that are not valid UTF-8; the latter
        # would otherwise escape without being wrapped.
        raise DashboardStorageError(f"could not read dashboard artifact: {path}") from error
    if not isinstance(payload, dict):
        raise DashboardStorageError(f"dashboard artifact must be a JSON object: {path}")
    return payload
def _write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as UTF-8 JSON.

    The output is indented by two spaces, keeps non-ASCII characters unescaped
    and ends with a single newline. Serialization happens before the file is
    opened.
    """
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(f"{serialized}\n", encoding="utf-8")
def _relative(root: Path, path: Path) -> str:
    """Return *path* expressed relative to *root*, using forward slashes.

    Raises:
        ValueError: If *path* is not located under *root* (from
            ``Path.relative_to``).
    """
    inside_root = path.relative_to(root)
    return inside_root.as_posix()
def _resolve_under(root: Path, relative: str) -> Path:
    """Resolve *relative* against *root* and insist the result stays inside it.

    Args:
        root: Directory the path must remain within.
        relative: Path string taken from a cached pointer file.

    Returns:
        The fully resolved path; it is either the resolved *root* itself or a
        descendant of it.

    Raises:
        DashboardStorageError: If *relative* is empty or resolves to a location
            outside *root*.
    """
    if not relative:
        raise DashboardStorageError("cached dashboard artifact path is empty")
    candidate = (root / relative).resolve()
    anchor = root.resolve()
    # Compare resolved paths so ".." segments and symlinks are accounted for.
    stays_inside = candidate == anchor or anchor in candidate.parents
    if not stays_inside:
        raise DashboardStorageError(f"cached dashboard artifact path escapes cache dir: {relative}")
    return candidate