hackathon-advisor / hackathon_advisor /dashboard_storage.py
JacobLinCool's picture
deploy: sync GitHub main de5dbf9
13fe947 verified
from __future__ import annotations
from dataclasses import dataclass
import json
import os
from pathlib import Path
from typing import Any
import uuid
from hackathon_advisor.dashboard import validate_dashboard_payload
from hackathon_advisor._text import utc_now
LATEST_FILENAME = "latest.json"
STORAGE_SCHEMA_VERSION = 1
class DashboardStorageError(RuntimeError):
pass
@dataclass(frozen=True)
class DashboardArtifacts:
projects_path: Path
index_path: Path
dashboard_path: Path
manifest_path: Path
dashboard: dict[str, Any]
manifest: dict[str, Any]
quest_analysis_path: Path | None = None
def cache_dir_from_env(env: dict[str, str] | None = None) -> Path | None:
raw = (env or os.environ).get("ADVISOR_CACHE_DIR", "").strip()
return Path(raw).expanduser() if raw else None
def require_writable_cache_dir(env: dict[str, str] | None = None) -> Path:
cache_dir = cache_dir_from_env(env)
if cache_dir is None:
raise DashboardStorageError(
"ADVISOR_CACHE_DIR must point to a writable dashboard cache directory. "
"Use a mounted Storage Bucket in deployment or a local directory in development."
)
if not cache_dir.exists() or not cache_dir.is_dir():
raise DashboardStorageError(f"ADVISOR_CACHE_DIR does not exist or is not a directory: {cache_dir}")
probe = cache_dir / f".advisor-write-check-{uuid.uuid4().hex}"
try:
probe.write_text("ok", encoding="utf-8")
probe.unlink()
except OSError as error:
raise DashboardStorageError(f"ADVISOR_CACHE_DIR is not writable: {cache_dir}") from error
return cache_dir
def load_latest_artifacts(cache_dir: Path | None) -> DashboardArtifacts | None:
if cache_dir is None:
return None
latest_path = cache_dir / LATEST_FILENAME
if not latest_path.exists():
return None
latest = _read_json(latest_path)
if latest.get("schema_version") != STORAGE_SCHEMA_VERSION:
raise DashboardStorageError("unsupported latest dashboard storage schema")
projects_path = _resolve_under(cache_dir, str(latest.get("projects") or ""))
index_path = _resolve_under(cache_dir, str(latest.get("index") or ""))
dashboard_path = _resolve_under(cache_dir, str(latest.get("dashboard") or ""))
manifest_path = _resolve_under(cache_dir, str(latest.get("manifest") or ""))
for path in (projects_path, index_path, dashboard_path, manifest_path):
if not path.is_file():
raise DashboardStorageError(f"cached dashboard artifact is missing: {path}")
dashboard = _read_json(dashboard_path)
validate_dashboard_payload(dashboard)
manifest = _read_json(manifest_path)
return DashboardArtifacts(
projects_path=projects_path,
index_path=index_path,
dashboard_path=dashboard_path,
manifest_path=manifest_path,
dashboard=dashboard,
manifest=manifest,
)
def persist_refresh_artifacts(
cache_dir: Path,
run_id: str,
*,
projects_payload: dict[str, Any],
index_payload: dict[str, Any],
dashboard_payload: dict[str, Any],
quest_analysis_payload: dict[str, Any] | None = None,
) -> DashboardArtifacts:
validate_dashboard_payload(dashboard_payload)
relative_run_dir = Path("runs") / run_id
run_dir = cache_dir / relative_run_dir
run_dir.mkdir(parents=True, exist_ok=False)
projects_path = run_dir / "projects.json"
index_path = run_dir / "project_index.json"
dashboard_path = run_dir / "dashboard.json"
quest_analysis_path = run_dir / "quest_analysis.json" if quest_analysis_payload is not None else None
manifest_path = run_dir / "manifest.json"
_write_json(projects_path, projects_payload)
_write_json(index_path, index_payload)
_write_json(dashboard_path, dashboard_payload)
if quest_analysis_path is not None:
_write_json(quest_analysis_path, quest_analysis_payload)
artifact_paths = {
"projects": _relative(cache_dir, projects_path),
"index": _relative(cache_dir, index_path),
"dashboard": _relative(cache_dir, dashboard_path),
}
if quest_analysis_path is not None:
artifact_paths["quest_analysis"] = _relative(cache_dir, quest_analysis_path)
manifest = {
"schema_version": STORAGE_SCHEMA_VERSION,
"run_id": run_id,
"generated_at": utc_now(),
"project_count": dashboard_payload["project_count"],
"snapshot_digest": dashboard_payload["provenance"]["snapshot_digest"],
"artifacts": artifact_paths,
}
_write_json(manifest_path, manifest)
latest = {
"schema_version": STORAGE_SCHEMA_VERSION,
"run_id": run_id,
"generated_at": manifest["generated_at"],
"projects": _relative(cache_dir, projects_path),
"index": _relative(cache_dir, index_path),
"dashboard": _relative(cache_dir, dashboard_path),
"manifest": _relative(cache_dir, manifest_path),
}
if quest_analysis_path is not None:
latest["quest_analysis"] = _relative(cache_dir, quest_analysis_path)
latest_path = cache_dir / LATEST_FILENAME
tmp_path = cache_dir / f".{LATEST_FILENAME}.{run_id}.tmp"
_write_json(tmp_path, latest)
os.replace(tmp_path, latest_path)
return DashboardArtifacts(
projects_path=projects_path,
index_path=index_path,
dashboard_path=dashboard_path,
manifest_path=manifest_path,
dashboard=dashboard_payload,
manifest=manifest,
quest_analysis_path=quest_analysis_path,
)
def _read_json(path: Path) -> dict[str, Any]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as error:
raise DashboardStorageError(f"could not read dashboard artifact: {path}") from error
if not isinstance(payload, dict):
raise DashboardStorageError(f"dashboard artifact must be a JSON object: {path}")
return payload
def _write_json(path: Path, payload: dict[str, Any]) -> None:
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def _relative(root: Path, path: Path) -> str:
return path.relative_to(root).as_posix()
def _resolve_under(root: Path, relative: str) -> Path:
if not relative:
raise DashboardStorageError("cached dashboard artifact path is empty")
target = (root / relative).resolve()
root_resolved = root.resolve()
if target != root_resolved and root_resolved not in target.parents:
raise DashboardStorageError(f"cached dashboard artifact path escapes cache dir: {relative}")
return target