# Spaces: Running on Zero
# File size: 6,668 Bytes
from __future__ import annotations
from dataclasses import dataclass
import json
import os
from pathlib import Path
from typing import Any
import uuid
from hackathon_advisor.dashboard import validate_dashboard_payload
from hackathon_advisor._text import utc_now
# Name of the pointer file at the cache root that records where the most
# recently persisted run's artifacts live.
LATEST_FILENAME = "latest.json"
# Version stamped into each run's manifest.json and into latest.json;
# load_latest_artifacts rejects a latest.json whose "schema_version" differs.
STORAGE_SCHEMA_VERSION = 1
class DashboardStorageError(RuntimeError):
    """Raised when the dashboard cache directory or a cached artifact is unusable."""
@dataclass(frozen=True)
class DashboardArtifacts:
    """Locations and parsed contents of one persisted dashboard refresh run."""

    # Filesystem locations of the run's JSON artifacts.
    projects_path: Path
    index_path: Path
    dashboard_path: Path
    manifest_path: Path
    # Parsed dashboard payload and manifest belonging to the run.
    dashboard: dict[str, Any]
    manifest: dict[str, Any]
    # Optional quest analysis artifact location; defaults to None when not provided.
    quest_analysis_path: Path | None = None
def cache_dir_from_env(env: dict[str, str] | None = None) -> Path | None:
    """Return the cache directory configured via ``ADVISOR_CACHE_DIR``.

    Args:
        env: Mapping to read the variable from. ``None`` means "use the
            process environment"; any mapping passed explicitly (including an
            empty one) is used as-is.

    Returns:
        The configured path with ``~`` expanded, or ``None`` when the variable
        is unset or contains only whitespace.
    """
    # Compare against None instead of relying on truthiness: an explicitly
    # supplied empty mapping must not silently fall back to os.environ.
    source = os.environ if env is None else env
    raw = source.get("ADVISOR_CACHE_DIR", "").strip()
    return Path(raw).expanduser() if raw else None
def require_writable_cache_dir(env: dict[str, str] | None = None) -> Path:
    """Return the configured cache directory after proving it accepts writes.

    Args:
        env: Optional mapping forwarded to ``cache_dir_from_env``.

    Returns:
        The configured cache directory.

    Raises:
        DashboardStorageError: If ``ADVISOR_CACHE_DIR`` is not configured, does
            not name an existing directory, or a probe file cannot be created
            and removed inside it.
    """
    configured = cache_dir_from_env(env)
    if configured is None:
        raise DashboardStorageError(
            "ADVISOR_CACHE_DIR must point to a writable dashboard cache directory. "
            "Use a mounted Storage Bucket in deployment or a local directory in development."
        )
    if not (configured.exists() and configured.is_dir()):
        raise DashboardStorageError(f"ADVISOR_CACHE_DIR does not exist or is not a directory: {configured}")

    # A uniquely named throwaway file keeps the probe from clashing with
    # anything already stored in the cache.
    marker_name = f".advisor-write-check-{uuid.uuid4().hex}"
    marker = configured / marker_name
    try:
        marker.write_text("ok", encoding="utf-8")
        marker.unlink()
    except OSError as exc:
        raise DashboardStorageError(f"ADVISOR_CACHE_DIR is not writable: {configured}") from exc
    return configured
def load_latest_artifacts(cache_dir: Path | None) -> DashboardArtifacts | None:
    """Load the artifacts of the most recently persisted refresh run.

    Args:
        cache_dir: Cache root containing ``latest.json``; ``None`` means no
            cache is configured.

    Returns:
        The artifacts referenced by ``latest.json``, or ``None`` when
        ``cache_dir`` is ``None`` or no ``latest.json`` exists.

    Raises:
        DashboardStorageError: If ``latest.json`` carries an unsupported schema
            version, references an empty path or one outside ``cache_dir``,
            a required artifact file is missing, or a JSON file cannot be read
            as an object.

    Errors raised by ``validate_dashboard_payload`` propagate unchanged.
    """
    if cache_dir is None:
        return None
    latest_path = cache_dir / LATEST_FILENAME
    if not latest_path.exists():
        return None

    latest = _read_json(latest_path)
    if latest.get("schema_version") != STORAGE_SCHEMA_VERSION:
        raise DashboardStorageError("unsupported latest dashboard storage schema")

    # The four required artifacts: resolve every pointer first, then verify
    # that each one exists on disk.
    projects_path = _resolve_under(cache_dir, str(latest.get("projects") or ""))
    index_path = _resolve_under(cache_dir, str(latest.get("index") or ""))
    dashboard_path = _resolve_under(cache_dir, str(latest.get("dashboard") or ""))
    manifest_path = _resolve_under(cache_dir, str(latest.get("manifest") or ""))
    for path in (projects_path, index_path, dashboard_path, manifest_path):
        if not path.is_file():
            raise DashboardStorageError(f"cached dashboard artifact is missing: {path}")

    # persist_refresh_artifacts records an optional "quest_analysis" pointer;
    # restore it so a persist/load round trip does not drop the path. The
    # artifact is optional, so a listed-but-absent file leaves the field None
    # instead of failing the whole load. A pointer that escapes the cache dir
    # is still rejected by _resolve_under.
    quest_analysis_path = None
    quest_relative = str(latest.get("quest_analysis") or "")
    if quest_relative:
        candidate = _resolve_under(cache_dir, quest_relative)
        if candidate.is_file():
            quest_analysis_path = candidate

    dashboard = _read_json(dashboard_path)
    validate_dashboard_payload(dashboard)
    manifest = _read_json(manifest_path)
    return DashboardArtifacts(
        projects_path=projects_path,
        index_path=index_path,
        dashboard_path=dashboard_path,
        manifest_path=manifest_path,
        dashboard=dashboard,
        manifest=manifest,
        quest_analysis_path=quest_analysis_path,
    )
def persist_refresh_artifacts(
    cache_dir: Path,
    run_id: str,
    *,
    projects_payload: dict[str, Any],
    index_payload: dict[str, Any],
    dashboard_payload: dict[str, Any],
    quest_analysis_payload: dict[str, Any] | None = None,
) -> DashboardArtifacts:
    """Write one refresh run under ``runs/<run_id>`` and point ``latest.json`` at it.

    The dashboard payload is validated before anything touches the disk. The
    run's JSON artifacts and a manifest are then written into a freshly
    created run directory, and ``latest.json`` is written to a temporary
    sibling file and moved into place with ``os.replace``.

    Args:
        cache_dir: Cache root that holds ``runs/`` and ``latest.json``.
        run_id: Name of the run directory to create.
        projects_payload: Content of ``projects.json``.
        index_payload: Content of ``project_index.json``.
        dashboard_payload: Content of ``dashboard.json``; its
            ``project_count`` and ``provenance.snapshot_digest`` values are
            copied into the manifest.
        quest_analysis_payload: Optional content of ``quest_analysis.json``;
            the file is only written when this is not ``None``.

    Returns:
        The paths of the written artifacts together with the dashboard payload
        and the manifest that was written.

    Raises:
        FileExistsError: If the run directory already exists.
    """
    validate_dashboard_payload(dashboard_payload)

    run_dir = cache_dir / Path("runs", run_id)
    run_dir.mkdir(parents=True, exist_ok=False)

    projects_file = run_dir / "projects.json"
    index_file = run_dir / "project_index.json"
    dashboard_file = run_dir / "dashboard.json"
    quest_file = None if quest_analysis_payload is None else run_dir / "quest_analysis.json"
    manifest_file = run_dir / "manifest.json"

    # (label, destination, content) for every payload artifact, in write order.
    outputs = [
        ("projects", projects_file, projects_payload),
        ("index", index_file, index_payload),
        ("dashboard", dashboard_file, dashboard_payload),
    ]
    if quest_file is not None:
        outputs.append(("quest_analysis", quest_file, quest_analysis_payload))
    for _, destination, content in outputs:
        _write_json(destination, content)

    artifact_paths = {label: _relative(cache_dir, destination) for label, destination, _ in outputs}

    manifest = {
        "schema_version": STORAGE_SCHEMA_VERSION,
        "run_id": run_id,
        "generated_at": utc_now(),
        "project_count": dashboard_payload["project_count"],
        "snapshot_digest": dashboard_payload["provenance"]["snapshot_digest"],
        "artifacts": artifact_paths,
    }
    _write_json(manifest_file, manifest)

    # latest.json repeats the artifact locations at its top level, plus the
    # manifest location; key order is kept stable for the written file.
    latest = {
        "schema_version": STORAGE_SCHEMA_VERSION,
        "run_id": run_id,
        "generated_at": manifest["generated_at"],
    }
    for label in ("projects", "index", "dashboard"):
        latest[label] = artifact_paths[label]
    latest["manifest"] = _relative(cache_dir, manifest_file)
    if quest_file is not None:
        latest["quest_analysis"] = artifact_paths["quest_analysis"]

    # Write the pointer next to its final location, then move it into place.
    staging_file = cache_dir / f".{LATEST_FILENAME}.{run_id}.tmp"
    _write_json(staging_file, latest)
    os.replace(staging_file, cache_dir / LATEST_FILENAME)

    return DashboardArtifacts(
        projects_path=projects_file,
        index_path=index_file,
        dashboard_path=dashboard_file,
        manifest_path=manifest_file,
        dashboard=dashboard_payload,
        manifest=manifest,
        quest_analysis_path=quest_file,
    )
def _read_json(path: Path) -> dict[str, Any]:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as error:
raise DashboardStorageError(f"could not read dashboard artifact: {path}") from error
if not isinstance(payload, dict):
raise DashboardStorageError(f"dashboard artifact must be a JSON object: {path}")
return payload
def _write_json(path: Path, payload: dict[str, Any]) -> None:
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
def _relative(root: Path, path: Path) -> str:
return path.relative_to(root).as_posix()
def _resolve_under(root: Path, relative: str) -> Path:
if not relative:
raise DashboardStorageError("cached dashboard artifact path is empty")
target = (root / relative).resolve()
root_resolved = root.resolve()
if target != root_resolved and root_resolved not in target.parents:
raise DashboardStorageError(f"cached dashboard artifact path escapes cache dir: {relative}")
return target
|