| |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import ssl |
| import sys |
| from typing import Any, Dict, List, Optional |
| from urllib.error import HTTPError, URLError |
| from urllib.parse import urljoin, urlparse |
| from urllib.request import urlopen |
|
|
| try: |
| import certifi |
| except Exception: |
| certifi = None |
|
|
# Auth audience values that the report treats as a removed placeholder: the
# validation fails when the hosted auth audience equals one of these exactly.
FORBIDDEN_AUDIENCE_VALUES = {"https://masters-toolkit-api", "https://masters-toolkit-api/"}

# Lower-case fragments that must not appear in any hosted auth diagnostic text.
# Diagnostics are lower-cased before the substring comparison.
FORBIDDEN_AUTH_TEXT = (
    "legacy placeholder https://masters-toolkit-api",
    "service not found: https://masters-toolkit-api",
)
|
|
|
|
| def _parse_bool(value: str) -> bool: |
| return str(value or "").strip().lower() in {"1", "true", "yes", "on"} |
|
|
|
|
| def _normalize_origin(value: str) -> str: |
| raw = str(value or "").strip() |
| if not raw: |
| return "" |
| parsed = urlparse(raw) |
| if parsed.scheme not in {"http", "https"} or not parsed.netloc: |
| return "" |
| host = parsed.hostname or "" |
| if not host: |
| return "" |
| port = f":{parsed.port}" if parsed.port else "" |
| return f"{parsed.scheme.lower()}://{host.lower()}{port}" |
|
|
|
|
| def _build_ssl_context() -> ssl.SSLContext: |
| cafile = "" |
| try: |
| cafile = str(certifi.where() or "").strip() if certifi is not None else "" |
| except Exception: |
| cafile = "" |
| if cafile: |
| try: |
| return ssl.create_default_context(cafile=cafile) |
| except Exception: |
| pass |
| return ssl.create_default_context() |
|
|
|
|
def _fetch_json(base_url: str, path: str, timeout_s: float) -> Dict[str, Any]:
    """GET ``path`` relative to ``base_url`` and return the decoded JSON object.

    Args:
        base_url: Hosted base URL; a trailing slash is optional.
        path: Endpoint path; a leading slash is optional.
        timeout_s: Timeout in seconds passed to ``urlopen``.

    Returns:
        The response body parsed as a JSON object (``dict``).

    Raises:
        json.JSONDecodeError: The body is not valid UTF-8, is not valid JSON,
            or is valid JSON but not an object. Callers already handle this
            type, so a malformed response becomes a reported failure rather
            than an ``AttributeError`` or ``UnicodeDecodeError`` further down.
        Exception: Anything raised by ``urlopen`` propagates unchanged.
    """
    url = urljoin(base_url.rstrip("/") + "/", path.lstrip("/"))
    with urlopen(url, timeout=timeout_s, context=_build_ssl_context()) as response:
        body = response.read()

    try:
        raw = body.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise json.JSONDecodeError(
            f"Response from {url} is not valid UTF-8 ({exc.reason})", "", 0
        ) from exc

    payload = json.loads(raw)
    if not isinstance(payload, dict):
        # Every caller uses .get() on the result, so reject lists/scalars here.
        raise json.JSONDecodeError(
            f"Expected a JSON object from {url}, got {type(payload).__name__}", raw, 0
        )
    return payload
|
|
|
|
| def _collect_auth_text(build_info: Dict[str, Any], health: Dict[str, Any]) -> List[str]: |
| auth = health.get("auth") if isinstance(health.get("auth"), dict) else {} |
| out: List[str] = [] |
| for key in ( |
| "auth_config_error", |
| "config_error", |
| ): |
| for payload in (build_info, auth): |
| value = str(payload.get(key) or "").strip() |
| if value: |
| out.append(value) |
| for key in ( |
| "auth_config_details", |
| "auth_config_warnings", |
| "config_details", |
| "config_warnings", |
| ): |
| for payload in (build_info, auth): |
| value = payload.get(key) |
| if isinstance(value, list): |
| out.extend(str(item or "").strip() for item in value if str(item or "").strip()) |
| return out |
|
|
|
|
def _build_report(args: argparse.Namespace) -> Dict[str, Any]:
    """Query the hosted runtime and compare what it reports with expectations.

    Endpoints fetched via ``_fetch_json``:
      * ``/build-info`` - always; any exception propagates to the caller.
      * ``/api/health`` - always; an HTTP 401/403 is tolerated (recorded as
        "protected" plus a warning) when ``args.expect_auth_required`` is
        truthy, every other exception propagates.
      * ``/api/rapid_router/catalog/status`` - only when
        ``args.require_router_workbook_loaded`` is truthy; errors are turned
        into failures instead of being raised.

    Attributes read from ``args``: base_url, timeout_s, expect_auth_required,
    expect_auth_enabled, expected_build_version, expected_git_sha,
    require_router_workbook_loaded, require_router_workbook_bootstrap and
    (optionally) require_router_workbook_bootstrap_source.

    Returns:
        A dict with "ok" (True when no failure was recorded), "failures",
        "warnings" and "checks" (the raw observations behind the verdict).
    """
    failures: List[str] = []
    warnings: List[str] = []

    build_info = _fetch_json(args.base_url, "/build-info", args.timeout_s)
    # Defaults describe the "nothing observed yet" state for each probe.
    health: Dict[str, Any] = {}
    health_ok: Optional[bool] = None
    health_status_code: Optional[int] = None
    health_access = "ok"
    router_catalog_status: Dict[str, Any] = {}
    router_catalog_loaded: Optional[bool] = None
    router_catalog_product_count: Optional[int] = None
    router_catalog_access = "not_checked"
    router_catalog_status_code: Optional[int] = None
    try:
        health = _fetch_json(args.base_url, "/api/health", args.timeout_s)
        health_ok = bool(health.get("ok", False))
    except HTTPError as exc:
        health_status_code = int(exc.code)
        if health_status_code in {401, 403} and args.expect_auth_required:
            # An auth-protected health endpoint is acceptable when auth is expected.
            health_access = "protected"
            warnings.append(
                f"Hosted /api/health returned HTTP {health_status_code}; treating it as an expected auth-protected endpoint."
            )
        else:
            raise
    # When /api/health was protected, `health` is still {} and so is `auth`.
    auth = health.get("auth") if isinstance(health.get("auth"), dict) else {}
    router_bootstrap = build_info.get("rapid_router_catalog_bootstrap") if isinstance(build_info.get("rapid_router_catalog_bootstrap"), dict) else {}
    router_bootstrap_found = bool(router_bootstrap.get("found", False))
    router_bootstrap_load_attempted = bool(router_bootstrap.get("load_attempted", False))
    router_bootstrap_load_succeeded = bool(router_bootstrap.get("load_succeeded", False))
    router_bootstrap_already_loaded = bool(router_bootstrap.get("already_loaded", False))
    router_bootstrap_error = str(router_bootstrap.get("error") or "").strip()
    router_bootstrap_source = str(router_bootstrap.get("source") or "").strip()
    router_bootstrap_dataset_repo_id = str(router_bootstrap.get("dataset_repo_id") or "").strip()
    router_bootstrap_dataset_revision = str(router_bootstrap.get("dataset_revision") or "").strip()
    router_bootstrap_dataset_manifest_path = str(router_bootstrap.get("dataset_manifest_path") or "").strip()
    router_bootstrap_dataset_path = str(router_bootstrap.get("dataset_path") or "").strip()
    router_bootstrap_checksum_sha256 = str(router_bootstrap.get("checksum_sha256") or "").strip()
    router_bootstrap_published_at = str(router_bootstrap.get("published_at") or "").strip()

    if args.require_router_workbook_loaded:
        router_catalog_access = "ok"
        try:
            router_catalog_status = _fetch_json(args.base_url, "/api/rapid_router/catalog/status", args.timeout_s)
            catalog = router_catalog_status.get("catalog") if isinstance(router_catalog_status.get("catalog"), dict) else {}
            router_catalog_loaded = bool(catalog.get("loaded", False))
            try:
                router_catalog_product_count = int(catalog.get("product_count")) if catalog.get("product_count") is not None else None
            except Exception:
                # A non-numeric product_count is informational only; record it as unknown.
                router_catalog_product_count = None
        except HTTPError as exc:
            router_catalog_status_code = int(exc.code)
            if router_catalog_status_code in {401, 403} and args.expect_auth_required:
                router_catalog_access = "protected"
                # The endpoint cannot be read, so fall back to what build-info said about bootstrap.
                if router_bootstrap_load_succeeded or router_bootstrap_already_loaded:
                    router_catalog_loaded = True
                    warnings.append(
                        "Hosted /api/rapid_router/catalog/status returned "
                        f"HTTP {router_catalog_status_code}; inferring a loaded router workbook catalog from build-info bootstrap because the endpoint is auth-protected."
                    )
                else:
                    failures.append(
                        "Hosted /api/rapid_router/catalog/status is auth-protected, and build-info did not confirm a loaded router workbook catalog."
                    )
            else:
                router_catalog_access = "error"
                failures.append(
                    "Hosted /api/rapid_router/catalog/status could not be validated"
                    f" (HTTP {router_catalog_status_code})."
                )
        except (URLError, TimeoutError, json.JSONDecodeError) as exc:
            router_catalog_access = "error"
            failures.append(
                "Hosted /api/rapid_router/catalog/status could not be validated"
                f" ({type(exc).__name__}: {exc})."
            )

    build_version = str(build_info.get("build_version") or "").strip()
    git_sha = str(build_info.get("git_sha") or "").strip()
    startup_integrity_ok = bool(build_info.get("startup_integrity_ok", False))
    # build-info values win; the health "auth" section is only a fallback when the key is absent.
    auth_required = bool(build_info.get("auth_required", auth.get("required", False)))
    auth_enabled = bool(build_info.get("auth_enabled", auth.get("enabled", False)))
    audience = str(auth.get("audience") or "").strip()
    auth_text = _collect_auth_text(build_info, health)
    expected_origin = _normalize_origin(args.base_url)
    app_base_url = str(build_info.get("app_base_url") or "").strip()
    vite_app_base_url = str(build_info.get("vite_app_base_url") or "").strip()
    app_base_origin = _normalize_origin(app_base_url)
    vite_app_base_origin = _normalize_origin(vite_app_base_url)

    # Failures are appended in a fixed order; the report lists them in that order.
    if args.expected_build_version and build_version != args.expected_build_version:
        failures.append(
            f"Hosted build_version mismatch: expected '{args.expected_build_version}', got '{build_version or '<empty>'}'."
        )
    if args.expected_git_sha and git_sha != args.expected_git_sha:
        failures.append(f"Hosted git_sha mismatch: expected '{args.expected_git_sha}', got '{git_sha or '<empty>'}'.")
    if not startup_integrity_ok:
        failures.append("Hosted startup integrity is not healthy.")
    # health_ok stays None (not False) when /api/health was auth-protected.
    if health_ok is False:
        failures.append("Hosted /api/health returned ok=false.")
    # NOTE(review): health_access only becomes "protected" above when
    # args.expect_auth_required is truthy, so this condition cannot currently be true.
    if health_access == "protected" and not args.expect_auth_required:
        failures.append("Hosted /api/health is auth-protected, but auth_required was expected to be false.")
    if auth_required != args.expect_auth_required:
        failures.append(
            f"Hosted auth_required mismatch: expected {args.expect_auth_required}, got {auth_required}."
        )
    if auth_enabled != args.expect_auth_enabled:
        failures.append(
            f"Hosted auth_enabled mismatch: expected {args.expect_auth_enabled}, got {auth_enabled}."
        )
    if args.require_router_workbook_loaded and router_catalog_loaded is False:
        failures.append("Hosted router workbook catalog is not loaded.")
    # NOTE(review): the nesting of the three bootstrap checks below was
    # reconstructed from a copy without indentation - confirm against the original.
    if args.require_router_workbook_bootstrap:
        if not router_bootstrap_found:
            failures.append("Hosted build-info did not report a startup router workbook source file.")
        if not (router_bootstrap_load_attempted or router_bootstrap_already_loaded):
            failures.append("Hosted build-info did not report a startup router workbook bootstrap attempt.")
        if router_bootstrap_error:
            failures.append(f"Hosted build-info reported a router workbook bootstrap error: {router_bootstrap_error}")
    # getattr keeps this working for Namespace objects that lack the optional attribute.
    expected_router_bootstrap_source = str(getattr(args, "require_router_workbook_bootstrap_source", "") or "").strip()
    if expected_router_bootstrap_source and router_bootstrap_source != expected_router_bootstrap_source:
        failures.append(
            "Hosted router workbook bootstrap source mismatch: "
            f"expected '{expected_router_bootstrap_source}', got '{router_bootstrap_source or '<empty>'}'."
        )
    if audience in FORBIDDEN_AUDIENCE_VALUES:
        failures.append(f"Hosted auth audience still resolves to removed placeholder '{audience}'.")
    # Origin checks run only when args.base_url itself normalizes to a usable origin.
    # NOTE(review): all five checks are assumed to sit inside this guard - confirm.
    if expected_origin:
        if not app_base_origin and not vite_app_base_origin:
            failures.append("Hosted build-info did not report APP_BASE_URL or VITE_APP_BASE_URL.")
        if app_base_url and not app_base_origin:
            failures.append(f"Hosted APP_BASE_URL is invalid: '{app_base_url}'.")
        if vite_app_base_url and not vite_app_base_origin:
            failures.append(f"Hosted VITE_APP_BASE_URL is invalid: '{vite_app_base_url}'.")
        if app_base_origin and app_base_origin != expected_origin:
            failures.append(
                f"Hosted APP_BASE_URL origin mismatch: expected '{expected_origin}', got '{app_base_origin}'."
            )
        if vite_app_base_origin and vite_app_base_origin != expected_origin:
            failures.append(
                f"Hosted VITE_APP_BASE_URL origin mismatch: expected '{expected_origin}', got '{vite_app_base_origin}'."
            )

    # FORBIDDEN_AUTH_TEXT entries are lower-case, so compare against lower-cased diagnostics.
    lowered = [entry.lower() for entry in auth_text]
    for forbidden in FORBIDDEN_AUTH_TEXT:
        if any(forbidden in entry for entry in lowered):
            failures.append(f"Hosted auth diagnostics still mention forbidden placeholder text: '{forbidden}'.")

    # Missing identifiers are only warnings; they fail the run solely via the
    # expected_* comparisons above.
    if not build_version:
        warnings.append("Hosted build_version is empty.")
    if not git_sha:
        warnings.append("Hosted git_sha is empty.")

    return {
        "ok": not failures,
        "failures": failures,
        "warnings": warnings,
        "checks": {
            "build_version": build_version,
            "git_sha": git_sha,
            "startup_integrity_ok": startup_integrity_ok,
            "health_ok": health_ok,
            "health_access": health_access,
            "health_status_code": health_status_code,
            "router_catalog_loaded": router_catalog_loaded,
            "router_catalog_product_count": router_catalog_product_count,
            "router_catalog_access": router_catalog_access,
            "router_catalog_status_code": router_catalog_status_code,
            "router_catalog_status": router_catalog_status,
            "router_bootstrap_found": router_bootstrap_found,
            "router_bootstrap_load_attempted": router_bootstrap_load_attempted,
            "router_bootstrap_load_succeeded": router_bootstrap_load_succeeded,
            "router_bootstrap_already_loaded": router_bootstrap_already_loaded,
            "router_bootstrap_error": router_bootstrap_error,
            "router_bootstrap_source": router_bootstrap_source,
            "router_bootstrap_dataset_repo_id": router_bootstrap_dataset_repo_id,
            "router_bootstrap_dataset_revision": router_bootstrap_dataset_revision,
            "router_bootstrap_dataset_manifest_path": router_bootstrap_dataset_manifest_path,
            "router_bootstrap_dataset_path": router_bootstrap_dataset_path,
            "router_bootstrap_checksum_sha256": router_bootstrap_checksum_sha256,
            "router_bootstrap_published_at": router_bootstrap_published_at,
            "router_bootstrap_status": router_bootstrap,
            "auth_required": auth_required,
            "auth_enabled": auth_enabled,
            "auth_audience": audience,
            "auth_text": auth_text,
            "expected_origin": expected_origin,
            "app_base_url": app_base_url,
            "app_base_origin": app_base_origin,
            "vite_app_base_url": vite_app_base_url,
            "vite_app_base_origin": vite_app_base_origin,
            "base_url": args.base_url.rstrip("/"),
        },
    }
|
|
|
|
def main() -> int:
    """Command-line entry point: validate a hosted runtime and emit a JSON report.

    Parses the command line, converts the string-valued boolean options with
    ``_parse_bool``, builds the report with ``_build_report``, prints it to
    stdout and, when ``--out`` is given, also writes it to that path (parent
    directories are created as needed).

    Any network, protocol or decoding error raised while building the report
    is converted into a failing report instead of a traceback, so callers
    always receive the JSON payload and a stable exit status.

    Returns:
        0 when the report's "ok" is truthy, otherwise 2.
    """
    from http.client import HTTPException
    from pathlib import Path

    parser = argparse.ArgumentParser(description="Validate a hosted HF/Auth0 runtime via /build-info and /api/health.")
    parser.add_argument("--base-url", required=True, help="Hosted base URL, for example https://owner-space.hf.space")
    parser.add_argument("--expected-build-version", default="", help="Expected hosted build_version.")
    parser.add_argument("--expected-git-sha", default="", help="Expected hosted git_sha.")
    parser.add_argument("--expect-auth-required", default="true", help="Expected hosted auth_required value.")
    parser.add_argument("--expect-auth-enabled", default="true", help="Expected hosted auth_enabled value.")
    parser.add_argument(
        "--require-router-workbook-loaded",
        default="false",
        help="Whether /api/rapid_router/catalog/status must report catalog.loaded=true.",
    )
    parser.add_argument(
        "--require-router-workbook-bootstrap",
        default="false",
        help="Whether /build-info must report a startup router workbook source path and bootstrap attempt.",
    )
    parser.add_argument(
        "--require-router-workbook-bootstrap-source",
        default="",
        help="Optional expected startup router workbook bootstrap source label, for example hf_dataset.",
    )
    parser.add_argument("--timeout-s", type=float, default=20.0, help="HTTP timeout per request.")
    parser.add_argument("--out", default="", help="Optional output JSON path.")
    args = parser.parse_args()

    # Boolean options arrive as text ("true"/"false"/...); normalize them once here.
    args.expect_auth_required = _parse_bool(args.expect_auth_required)
    args.expect_auth_enabled = _parse_bool(args.expect_auth_enabled)
    args.require_router_workbook_loaded = _parse_bool(args.require_router_workbook_loaded)
    args.require_router_workbook_bootstrap = _parse_bool(args.require_router_workbook_bootstrap)
    args.require_router_workbook_bootstrap_source = str(args.require_router_workbook_bootstrap_source or "").strip()

    try:
        report = _build_report(args)
    except (OSError, ValueError, HTTPException) as exc:
        # OSError covers HTTPError/URLError/TimeoutError and low-level socket
        # failures; ValueError covers json.JSONDecodeError, UnicodeDecodeError
        # and urlopen's "unknown url type" for a --base-url without a scheme;
        # HTTPException covers truncated or malformed HTTP responses.
        report = {
            "ok": False,
            "failures": [f"Hosted validation request failed: {type(exc).__name__}: {exc}"],
            "warnings": [],
            "checks": {
                "base_url": args.base_url.rstrip("/"),
            },
        }

    payload = json.dumps(report, indent=2)
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(payload + "\n", encoding="utf-8")
        print(f"Wrote hosted validation report: {out_path}")
    print(payload)
    return 0 if report.get("ok") else 2
|
|
|
|
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    sys.exit(main())
|
|