| |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import sys |
| from pathlib import Path |
| from typing import Any, Dict, List |
| from urllib.parse import urlparse |
|
|
# Repository root, taken as the third ancestor of this script's resolved path.
REPO_ROOT = Path(__file__).resolve().parents[2]
BACKEND_DIR = REPO_ROOT.joinpath("backend")

# Put the backend directory at the front of the import path exactly once;
# presumably this is what lets the `app.auth` import below resolve when the
# script is run directly.
if sys.path.count(str(BACKEND_DIR)) == 0:
    sys.path.insert(0, str(BACKEND_DIR))
|
|
| from app.auth import load_auth_settings |
|
|
# Model name the model-selection env vars are compared against.
EXPECTED_DEFAULT_MODEL = "gpt-5-mini"

# Audience values the preflight rejects, listed with and without a trailing slash.
FORBIDDEN_AUDIENCE_VALUES = {
    "https://masters-toolkit-api",
    "https://masters-toolkit-api/",
}

# Catalog locations, relative to the router RAG data directory, probed in order.
PARSEC_RELATIVE_CANDIDATES = (
    Path("01_documents", "antennas", "parsec", "Parsec-Portfolio-Catalog-2024-09.pdf"),
    Path("01_documents", "routers", "parsec", "Parsec-Antenna-Data Sheet.pdf"),
    Path("01_documents", "antennas", "parsec", "Parsec-Enterprise Portfolio-Router Matching Guide.pdf"),
)
|
|
|
|
| def _is_placeholder_key(value: str) -> bool: |
| v = str(value or "").strip() |
| if not v: |
| return True |
| vu = v.upper() |
| return vu.startswith("YOUR_KEY") or vu in {"<YOUR_OPENAI_API_KEY>", "YOUR_OPENAI_API_KEY", "REPLACE_ME"} |
|
|
|
|
| def _clean_env_value(value: str) -> str: |
| return str(value or "").strip().strip("\"'").strip() |
|
|
|
|
| def _resolve_router_rag_paths() -> Dict[str, Path]: |
| data_dir = Path(str(os.getenv("ROUTER_RAG_DATA_DIR", "") or "").strip() or (REPO_ROOT / "_RAG_Ready_KB_Organized")) |
| chunks = Path(str(os.getenv("ROUTER_RAG_CHUNKS_PATH", "") or "").strip() or (data_dir / "04_ingestion" / "rag_ingestion_chunks.jsonl")) |
| manifest = Path(str(os.getenv("ROUTER_RAG_MANIFEST_PATH", "") or "").strip() or (data_dir / "03_manifests" / "rag_manifest_organized.csv")) |
| return {"data_dir": data_dir, "chunks": chunks, "manifest": manifest} |
|
|
|
|
def _resolve_parsec_catalog_path(rag_paths: Dict[str, Path]) -> Path | None:
    """Locate the Parsec catalog, returning the first existing location.

    Locations are probed in this order:

    1. The path in the ``PARSEC_CATALOG_PATH`` environment variable.
    2. ``ParsecCatalog.pdf`` at the repository root.
    3. Each entry of ``PARSEC_RELATIVE_CANDIDATES`` under ``rag_paths["data_dir"]``.

    Args:
        rag_paths: Mapping whose ``data_dir`` entry is the router RAG data
            directory; a missing or empty entry is treated as an empty path.

    Returns:
        The first location that exists, or ``None`` when none does.
    """
    env_value = _clean_env_value(os.getenv("PARSEC_CATALOG_PATH", ""))
    if env_value and Path(env_value).exists():
        return Path(env_value)

    bundled_copy = REPO_ROOT / "ParsecCatalog.pdf"
    if bundled_copy.exists():
        return bundled_copy

    kb_root = Path(rag_paths.get("data_dir") or "")
    in_kb = (kb_root / relative for relative in PARSEC_RELATIVE_CANDIDATES)
    return next((location for location in in_kb if location.exists()), None)
|
|
|
|
| def _check_path(path: Path, *, file_required: bool = True) -> Dict[str, Any]: |
| exists = path.exists() |
| ok = exists and (path.is_file() if file_required else path.is_dir()) |
| return {"path": str(path), "exists": exists, "ok": ok} |
|
|
|
|
def _normalize_origin(value: str) -> str:
    """Reduce a URL to its canonical ``scheme://host[:port]`` origin.

    The value is cleaned with ``_clean_env_value`` first. Scheme and host are
    lower-cased, IPv6 hosts are re-wrapped in brackets, and a port equal to the
    scheme default (80 for http, 443 for https) is dropped so that
    ``https://example.com`` and ``https://example.com:443`` compare equal.

    Args:
        value: Raw URL text, typically read from an environment variable.

    Returns:
        The normalized origin, or ``""`` when the value is empty, is not an
        absolute http(s) URL, has no host, or carries a malformed host/port.
    """
    raw = _clean_env_value(value)
    if not raw:
        return ""
    try:
        parsed = urlparse(raw)
        # `.port` raises ValueError for non-numeric or out-of-range ports, and
        # urlparse raises it for malformed bracketed hosts. Treat both as
        # "not a valid origin" rather than letting the preflight crash.
        port = parsed.port
    except ValueError:
        return ""
    scheme = parsed.scheme.lower()
    if scheme not in {"http", "https"} or not parsed.netloc:
        return ""
    host = (parsed.hostname or "").lower()
    if not host:
        return ""
    if ":" in host:
        # `.hostname` strips the brackets from IPv6 literals; restore them so
        # the host/port boundary stays unambiguous.
        host = f"[{host}]"
    default_port = 443 if scheme == "https" else 80
    port_suffix = f":{port}" if port and port != default_port else ""
    return f"{scheme}://{host}{port_suffix}"
|
|
|
|
def _check_openai(args: argparse.Namespace, failures: List[str]) -> Dict[str, Any]:
    """Check that OPENAI_API_KEY is set to a non-placeholder value."""
    openai_key = str(os.getenv("OPENAI_API_KEY", "") or "").strip()
    # An empty key is itself reported as a placeholder by the helper.
    placeholder = _is_placeholder_key(openai_key)
    openai_ok = bool(openai_key) and not placeholder
    if args.require_openai and not openai_ok:
        failures.append("OPENAI_API_KEY is missing or placeholder.")
    return {
        "required": bool(args.require_openai),
        "present": bool(openai_key),
        "placeholder": placeholder,
        "ok": openai_ok or (not args.require_openai),
    }


def _check_auth(failures: List[str], warnings: List[str]) -> Dict[str, Any]:
    """Summarize the loaded auth settings; fail when auth is required but not enabled."""
    auth = load_auth_settings()
    result = {
        "required": bool(auth.required),
        "enabled": bool(auth.enabled),
        "config_error": auth.config_error,
        "config_details": list(auth.config_details),
        "config_warnings": list(auth.config_warnings),
        "ok": (not auth.required) or bool(auth.enabled),
    }
    if auth.required and not auth.enabled:
        failures.append(auth.config_error or "Auth is required but Auth0 config is invalid.")
    if auth.config_warnings:
        warnings.extend(list(auth.config_warnings))
    return result


def _check_model_env(args: argparse.Namespace, failures: List[str]) -> Dict[str, Any]:
    """Check each model env var is unset or equal to EXPECTED_DEFAULT_MODEL when enforced."""
    model_env_names = (
        "OPENAI_MODEL",
        "UNIFIED_KB_OPENAI_MODEL",
        "ROUTER_RAG_OPENAI_MODEL",
    )
    result: Dict[str, Any] = {}
    for name in model_env_names:
        raw = _clean_env_value(os.getenv(name, ""))
        model_ok = (not raw) or raw == EXPECTED_DEFAULT_MODEL or (not args.enforce_gpt5_mini)
        result[name] = {
            "set": bool(raw),
            "value": raw,
            "expected": EXPECTED_DEFAULT_MODEL,
            "ok": model_ok,
        }
        if not model_ok:
            failures.append(f"{name} must be unset or '{EXPECTED_DEFAULT_MODEL}', found '{raw}'.")
    return result


def _check_forbidden_audience(failures: List[str]) -> Dict[str, Any]:
    """Reject audience env vars that hold one of FORBIDDEN_AUDIENCE_VALUES."""
    # Loop-invariant: the forbidden values with any trailing slash removed.
    forbidden_without_slash = {v.rstrip("/") for v in FORBIDDEN_AUDIENCE_VALUES}
    result: Dict[str, Any] = {}
    for name in ("AUTH0_AUDIENCE", "VITE_AUTH0_AUDIENCE"):
        raw = _clean_env_value(os.getenv(name, ""))
        normalized = raw.rstrip("/") if raw.startswith("https://") else raw
        forbidden = raw in FORBIDDEN_AUDIENCE_VALUES or normalized in forbidden_without_slash
        result[name] = {
            "set": bool(raw),
            "value": raw,
            "ok": not forbidden,
        }
        if forbidden:
            failures.append(f"{name} must remain unset unless a real API Identifier exists; found removed placeholder '{raw}'.")
    return result


def _check_base_urls(failures: List[str]) -> Dict[str, Any]:
    """Check APP_BASE_URL and VITE_APP_BASE_URL are absolute http(s) URLs with one origin."""
    app_base_url = _clean_env_value(os.getenv("APP_BASE_URL", ""))
    vite_app_base_url = _clean_env_value(os.getenv("VITE_APP_BASE_URL", ""))
    app_base_origin = _normalize_origin(app_base_url)
    vite_app_base_origin = _normalize_origin(vite_app_base_url)
    base_urls_ok = True
    if app_base_url and not app_base_origin:
        base_urls_ok = False
        failures.append(f"APP_BASE_URL must be an absolute http(s) URL, found '{app_base_url}'.")
    if vite_app_base_url and not vite_app_base_origin:
        base_urls_ok = False
        failures.append(f"VITE_APP_BASE_URL must be an absolute http(s) URL, found '{vite_app_base_url}'.")
    # Origins are only compared when both values normalized successfully.
    if app_base_origin and vite_app_base_origin and app_base_origin != vite_app_base_origin:
        base_urls_ok = False
        failures.append(
            "APP_BASE_URL and VITE_APP_BASE_URL must point at the same origin; "
            f"found '{app_base_origin}' vs '{vite_app_base_origin}'."
        )
    return {
        "APP_BASE_URL": {"set": bool(app_base_url), "value": app_base_url, "origin": app_base_origin},
        "VITE_APP_BASE_URL": {"set": bool(vite_app_base_url), "value": vite_app_base_url, "origin": vite_app_base_origin},
        "ok": base_urls_ok,
    }


def _check_required_assets(failures: List[str]) -> Dict[str, Any]:
    """Check the CSV assets expected inside the repository exist as files."""
    repo_assets = {
        "routers_eos_eol_by_sku.csv": REPO_ROOT / "routers_eos_eol_by_sku.csv",
        "feb2026routers.csv": REPO_ROOT / "feb2026routers.csv",
        "FAQ_master_updated.csv": REPO_ROOT / "docs" / "faq" / "FAQ_master_updated.csv",
    }
    result = {name: _check_path(path, file_required=True) for name, path in repo_assets.items()}
    for name, data in result.items():
        if not bool(data.get("ok")):
            failures.append(f"Missing required asset: {name}")
    return result


def _check_router_rag(
    args: argparse.Namespace,
    rag_paths: Dict[str, Path],
    failures: List[str],
    warnings: List[str],
) -> Dict[str, Any]:
    """Check router RAG artifacts; missing files fail in strict mode and warn otherwise."""
    result = {
        "data_dir": _check_path(rag_paths["data_dir"], file_required=False),
        "chunks": _check_path(rag_paths["chunks"], file_required=True),
        "manifest": _check_path(rag_paths["manifest"], file_required=True),
        "strict": bool(args.fail_on_missing_router_rag),
    }
    chunks_ok = result["chunks"]["ok"]
    manifest_ok = result["manifest"]["ok"]
    if args.fail_on_missing_router_rag:
        if not chunks_ok:
            failures.append("Router RAG chunks file is missing.")
        if not manifest_ok:
            failures.append("Router RAG manifest file is missing.")
    elif not chunks_ok or not manifest_ok:
        warnings.append("Router RAG ingestion artifacts are missing; router-doc retrieval may be degraded.")
    return result


def _check_parsec_catalog(rag_paths: Dict[str, Path], failures: List[str]) -> Dict[str, Any]:
    """Check that a Parsec catalog file can be located; it is always required."""
    parsec_catalog_path = _resolve_parsec_catalog_path(rag_paths)
    # When nothing was found, report against the configured (or default) path
    # so the output still names the location that was expected.
    parsec_default = Path(_clean_env_value(os.getenv("PARSEC_CATALOG_PATH", "")) or (REPO_ROOT / "ParsecCatalog.pdf"))
    parsec_check = _check_path(parsec_catalog_path or parsec_default, file_required=True)
    parsec_check["required"] = True
    if not bool(parsec_check.get("ok")):
        failures.append("Parsec catalog asset is missing.")
    return parsec_check


def _build_report(args: argparse.Namespace) -> Dict[str, Any]:
    """Run every preflight check and collect the results into one report.

    Args:
        args: Parsed CLI options; reads ``require_openai``,
            ``enforce_gpt5_mini`` and ``fail_on_missing_router_rag``.

    Returns:
        A dict with ``ok`` (True when no failure was recorded), ``failures``,
        ``warnings`` and ``checks`` (per-check details keyed by check name).
    """
    checks: Dict[str, Any] = {}
    failures: List[str] = []
    warnings: List[str] = []

    # The order below fixes both the key order in `checks` and the order of
    # messages in `failures` / `warnings`.
    checks["openai"] = _check_openai(args, failures)
    checks["auth"] = _check_auth(failures, warnings)
    checks["model_env"] = _check_model_env(args, failures)
    checks["forbidden_audience"] = _check_forbidden_audience(failures)
    checks["base_urls"] = _check_base_urls(failures)

    rag_paths = _resolve_router_rag_paths()
    checks["required_assets"] = _check_required_assets(failures)
    checks["router_rag"] = _check_router_rag(args, rag_paths, failures, warnings)
    checks["parsec_catalog"] = _check_parsec_catalog(rag_paths, failures)

    return {
        "ok": not failures,
        "failures": failures,
        "warnings": warnings,
        "checks": checks,
    }
|
|
|
|
def main() -> int:
    """Parse CLI options, run the preflight checks and emit the JSON report.

    The report is always printed to stdout. When ``--out`` is given it is also
    written to that path, creating parent directories as needed.

    Returns:
        0 when the report's ``ok`` entry is truthy, otherwise 2.
    """
    cli = argparse.ArgumentParser(description="Preflight environment/configuration checks for CI and release gates.")
    boolean_flags = (
        ("--require-openai", "Fail if OPENAI_API_KEY is missing/placeholder."),
        (
            "--enforce-gpt5-mini",
            f"Fail if active model env vars are set to anything other than {EXPECTED_DEFAULT_MODEL}.",
        ),
        ("--fail-on-missing-router-rag", "Fail if router RAG manifest/chunks files are missing."),
    )
    for flag, help_text in boolean_flags:
        cli.add_argument(flag, action="store_true", help=help_text)
    cli.add_argument("--out", default="", help="Optional output JSON path.")
    options = cli.parse_args()

    result = _build_report(options)
    rendered = json.dumps(result, indent=2)
    if options.out:
        destination = Path(options.out)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")
        print(f"Wrote preflight report: {destination}")
    print(rendered)

    if result.get("ok"):
        return 0
    return 2
|
|
|
|
# Script entry point: the value returned by main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|