#!/usr/bin/env python3 """Verify uploaded Kaiju Coder 7 Hugging Face repos after private upload. The default mode is a dry run that prints the exact checks without downloading or reading auth tokens. Pass --apply after Hugging Face namespace permission and human review are complete. Private repos are verified through the existing HF CLI login; tokens are never accepted as arguments or printed. """ from __future__ import annotations import argparse import json import shutil import subprocess import sys import tempfile import urllib.error import urllib.request from dataclasses import asdict, dataclass from pathlib import Path from typing import Any MODEL_ID = "kaiju-coder-7" DEFAULT_NAMESPACE = "RMDWLLC" DEFAULT_BASE_URL = "http://127.0.0.1:18181/v1" @dataclass(frozen=True) class RepoSpec: key: str suffix: str label: str required_files: tuple[str, ...] marker_files: tuple[tuple[str, tuple[str, ...]], ...] def repo_id(self, namespace: str) -> str: return f"{namespace}/{self.suffix}" @dataclass class Check: name: str status: str detail: str REPOS: tuple[RepoSpec, ...] = ( RepoSpec( key="adapter", suffix="kaiju-coder-7-adapter", label="adapter repo", required_files=( "README.md", "assets/RMDWlogo.png", "adapter_config.json", "adapter_model.safetensors", "DATA_PROVENANCE_DRAFT.md", "SOURCE_INVENTORY.md", "EVAL_SCOREBOARD.md", "SERVING_BENCHMARKS.md", "PAID_API_READINESS.md", "PUBLIC_TESTING_QUICKSTART.md", "FINAL_RELEASE_REPORT.md", "GOAL_COMPLETION_AUDIT.md", "HF_UPLOAD_EVIDENCE.md", "UPSTREAM_LICENSE_CHECK.md", "upstream/qwen3.6-27b/LICENSE", "scripts/make_hf_release_public.sh", "scripts/check_hf_uploaded_release.py", "scripts/check_hf_release_permission_evidence.py", ), marker_files=( ("README.md", ("Kaiju Coder 7", MODEL_ID)), ("PUBLIC_TESTING_QUICKSTART.md", ("Kaiju Coder 7 Public Testing Quickstart", MODEL_ID)), ("FINAL_RELEASE_REPORT.md", ("Kaiju Coder 7 Final Release Report", "Public Launch Blockers")), ), ), RepoSpec( key="opencode", suffix="kaiju-coder-7-opencode", label="OpenCode helper repo", required_files=( "README.md", "assets/RMDWlogo.png", "PUBLIC_TESTING_QUICKSTART.md", "opencode.kaiju-coder-7.jsonc", ".opencode/agents/kaiju-coder-7.md", ".opencode/commands/kaiju.md", "kaiju_harness/__init__.py", "kaiju_harness/router.py", "kaiju_harness/website.py", "kaiju_harness/business_suite.py", "kaiju_harness/verification.py", "prompts/kaiju-website-spec-system.md", "prompts/kaiju-business-spec-system.md", "scripts/install_kaiju_opencode_profile.py", "scripts/opencode-kaiju-no-autocontinue.mjs", "scripts/make_hf_release_public.sh", "scripts/run_kaiju_router.py", "scripts/run_kaiju_public_opencode_smoke.py", "scripts/run_kaiju_public_demo_pack.py", "scripts/run_kaiju_opencode_customer_pack.py", "scripts/check_hf_uploaded_release.py", "evals/tasks/opencode-customer-readiness.jsonl", ), marker_files=( ("README.md", ("Kaiju Coder 7", "default primary agent", "opencode")), ("opencode.kaiju-coder-7.jsonc", (MODEL_ID, '"context": 16384')), (".opencode/agents/kaiju-coder-7.md", ("You are Kaiju Coder 7", "kaiju_artifact")), (".opencode/commands/kaiju.md", ("kaiju_artifact", "$ARGUMENTS")), ( "scripts/opencode-kaiju-no-autocontinue.mjs", ("experimental.compaction.autocontinue", MODEL_ID, "kaiju_artifact"), ), ), ), RepoSpec( key="quantized-runtime", suffix="kaiju-coder-7-quantized-runtime", label="runtime quantization helper repo", required_files=( "README.md", "assets/RMDWlogo.png", "PUBLIC_TESTING_QUICKSTART.md", "GGUF_CANDIDATE.md", "scripts/start-qwen36-merged-vllm.sh", "scripts/stop-qwen36-merged-vllm.sh", "scripts/run-gojira-b-vllm-serving-benchmark.sh", "scripts/probe-gojira-b-persisted-quantization.sh", "scripts/run-gojira-b-kaiju-gguf-convert.sh", ), marker_files=( ("README.md", ("Runtime-Quantized Local Candidate", "bitsandbytes", "Kaiju Coder 7")), ("GGUF_CANDIDATE.md", ("GGUF Candidate", "596a2c227a429c7309db753061d88d71ee3f8a3b48f17e41ba9d81b0f55bdd4e")), ("PUBLIC_TESTING_QUICKSTART.md", ("Kaiju Coder 7 Public Testing Quickstart", MODEL_ID)), ), ), ) def shell_join(args: list[str]) -> str: import shlex return " ".join(shlex.quote(arg) for arg in args) def run_command(args: list[str], *, cwd: Path | None = None, timeout: int) -> subprocess.CompletedProcess[str]: return subprocess.run( args, cwd=cwd, check=False, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout, ) def read_text(path: Path) -> str: return path.read_text(encoding="utf-8", errors="replace") def selected_repos(args: argparse.Namespace) -> list[RepoSpec]: skipped = { "adapter": args.skip_adapter, "opencode": args.skip_opencode, "quantized-runtime": args.skip_quantized_runtime, } return [spec for spec in REPOS if not skipped[spec.key]] def add_dry_run_checks(checks: list[Check], repos: list[RepoSpec], namespace: str, download_dir: Path) -> None: checks.append(Check("HF uploaded release mode", "manual", "dry run only; pass --apply to download and verify repos")) for spec in repos: target = download_dir / spec.suffix command = ["hf", "download", spec.repo_id(namespace), "--repo-type", "model", "--local-dir", str(target)] checks.append(Check(f"{spec.label} download command", "manual", shell_join(command))) def add_hf_cli_check(checks: list[Check], timeout: int) -> bool: hf_bin = shutil.which("hf") if not hf_bin: checks.append(Check("HF CLI", "fail", "`hf` is not on PATH")) return False result = run_command([hf_bin, "auth", "whoami"], timeout=timeout) if result.returncode == 0 and "user=" in result.stdout: checks.append(Check("HF CLI", "pass", result.stdout.strip().replace("\n", "; "))) return True checks.append(Check("HF CLI", "fail", result.stdout.strip()[:800])) return False def download_repo(checks: list[Check], spec: RepoSpec, namespace: str, download_root: Path, timeout: int) -> Path | None: target = download_root / spec.suffix target.mkdir(parents=True, exist_ok=True) command = ["hf", "download", spec.repo_id(namespace), "--repo-type", "model", "--local-dir", str(target)] result = run_command(command, timeout=timeout) if result.returncode == 0: checks.append(Check(f"{spec.label} download", "pass", f"{spec.repo_id(namespace)} downloaded to {target}")) return target checks.append(Check(f"{spec.label} download", "fail", result.stdout.strip()[-1200:])) return None def check_required_files(checks: list[Check], spec: RepoSpec, root: Path) -> None: missing = [name for name in spec.required_files if not (root / name).is_file()] if missing: checks.append(Check(f"{spec.label} required files", "fail", "missing: " + ", ".join(missing))) else: checks.append(Check(f"{spec.label} required files", "pass", f"{len(spec.required_files)} files present")) def check_markers(checks: list[Check], spec: RepoSpec, root: Path) -> None: failures: list[str] = [] for file_name, markers in spec.marker_files: path = root / file_name if not path.is_file(): failures.append(f"{file_name} missing") continue text = read_text(path) missing = [marker for marker in markers if marker not in text] if missing: failures.append(f"{file_name} missing {', '.join(missing)}") if failures: checks.append(Check(f"{spec.label} content markers", "fail", "; ".join(failures))) else: checks.append(Check(f"{spec.label} content markers", "pass", "expected Kaiju Coder 7 markers found")) def check_public_quickstart_naming(checks: list[Check], spec: RepoSpec, root: Path) -> None: path = root / "PUBLIC_TESTING_QUICKSTART.md" if not path.is_file(): return lowered = read_text(path).lower() forbidden = [term for term in ("qwen", "v1.8") if term in lowered] if forbidden: checks.append(Check(f"{spec.label} public naming hygiene", "fail", "contains: " + ", ".join(forbidden))) else: checks.append(Check(f"{spec.label} public naming hygiene", "pass", "public quickstart avoids internal upstream/checkpoint naming")) def check_opencode_installer(checks: list[Check], opencode_root: Path, timeout: int) -> None: installer = opencode_root / "scripts/install_kaiju_opencode_profile.py" if not installer.is_file(): checks.append(Check("uploaded OpenCode installer dry-run", "fail", f"missing {installer}")) return with tempfile.TemporaryDirectory(prefix="kaiju-uploaded-opencode-config-") as tmp: result = run_command( [sys.executable, str(installer), "--config-dir", tmp, "--dry-run"], cwd=opencode_root, timeout=timeout, ) expected = [ "kaiju-no-autocontinue.mjs", MODEL_ID, '"model": "kaiju/kaiju-coder-7"', '"default_agent": "kaiju-coder-7"', "kaiju-coder-7-run", "kaiju-coder-7-runtime", "commands/kaiju.md", "@opencode-ai/plugin", ] if result.returncode == 0 and all(marker in result.stdout for marker in expected): checks.append(Check("uploaded OpenCode installer dry-run", "pass", "staged helper installs provider, agent, loop guard, and runner")) else: checks.append(Check("uploaded OpenCode installer dry-run", "fail", result.stdout.strip()[:1000])) def check_opencode_router_runtime(checks: list[Check], opencode_root: Path, timeout: int) -> None: script = opencode_root / "scripts/run_kaiju_router.py" if not script.is_file(): checks.append(Check("uploaded OpenCode router runtime", "fail", f"missing {script}")) return with tempfile.TemporaryDirectory(prefix="kaiju-uploaded-router-") as tmp: out_dir = Path(tmp) / "out" result = run_command( [ sys.executable, str(script), "--no-planner", "--kind", "website", "--out-dir", str(out_dir), "--prompt", "Build a simple website for Harborline Bookkeeping.", ], cwd=opencode_root, timeout=timeout, ) html_files = sorted(out_dir.rglob("index.html")) if result.returncode == 0 and html_files: checks.append(Check("uploaded OpenCode router runtime", "pass", "downloaded helper can run router and create a website artifact")) else: detail = result.stdout.strip()[-1200:] checks.append(Check("uploaded OpenCode router runtime", "fail", detail or "router did not create index.html")) def run_opencode_smoke(checks: list[Check], opencode_root: Path, base_url: str, timeout: int) -> None: script = opencode_root / "scripts/run_kaiju_public_opencode_smoke.py" if not script.is_file(): checks.append(Check("uploaded OpenCode smoke", "fail", f"missing {script}")) return result = run_command([sys.executable, str(script), "--base-url", base_url, "--timeout", str(timeout)], cwd=opencode_root, timeout=timeout + 120) if result.returncode == 0: checks.append(Check("uploaded OpenCode smoke", "pass", "downloaded helper completed live public OpenCode smoke")) else: checks.append(Check("uploaded OpenCode smoke", "fail", result.stdout.strip()[-1200:])) def check_public_visibility(checks: list[Check], spec: RepoSpec, namespace: str, timeout: int) -> None: repo_id = spec.repo_id(namespace) url = f"https://huggingface.co/api/models/{repo_id}" request = urllib.request.Request(url, headers={"User-Agent": "kaiju-coder-7-release-check"}) try: with urllib.request.urlopen(request, timeout=timeout) as response: if response.status == 200: checks.append(Check(f"{spec.label} public visibility", "pass", f"{repo_id} is publicly readable")) return checks.append(Check(f"{spec.label} public visibility", "fail", f"{url} returned HTTP {response.status}")) except urllib.error.HTTPError as exc: checks.append(Check(f"{spec.label} public visibility", "fail", f"{url} returned HTTP {exc.code}")) except Exception as exc: # noqa: BLE001 - report network failures clearly. checks.append(Check(f"{spec.label} public visibility", "fail", f"{url} failed: {exc!r}")) def verify_downloaded_repo(checks: list[Check], spec: RepoSpec, root: Path, *, installer_timeout: int) -> None: check_required_files(checks, spec, root) check_markers(checks, spec, root) check_public_quickstart_naming(checks, spec, root) if spec.key == "opencode": check_opencode_installer(checks, root, timeout=installer_timeout) check_opencode_router_runtime(checks, root, timeout=installer_timeout) def summarize(checks: list[Check], *, applied: bool) -> dict[str, Any]: return { "ready": applied and not any(check.status in {"fail", "manual"} for check in checks), "applied": applied, "summary": { "pass": sum(1 for check in checks if check.status == "pass"), "fail": sum(1 for check in checks if check.status == "fail"), "manual": sum(1 for check in checks if check.status == "manual"), }, "checks": [asdict(check) for check in checks], } def print_text(result: dict[str, Any]) -> None: print(f"Kaiju Coder 7 uploaded HF release verification: ready={result['ready']} applied={result['applied']}") print( "Summary: " f"{result['summary']['pass']} pass, " f"{result['summary']['fail']} fail, " f"{result['summary']['manual']} manual" ) for check in result["checks"]: print(f"[{check['status']}] {check['name']} - {check['detail']}") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--namespace", default=DEFAULT_NAMESPACE) parser.add_argument("--download-dir", type=Path, default=None) parser.add_argument("--apply", action="store_true", help="Download uploaded repos and verify contents.") parser.add_argument("--require-public", action="store_true", help="Require repos to be publicly readable without auth.") parser.add_argument("--run-opencode-smoke", action="store_true", help="Run the downloaded OpenCode helper live smoke.") parser.add_argument("--base-url", default=DEFAULT_BASE_URL) parser.add_argument("--download-timeout", type=int, default=900) parser.add_argument("--installer-timeout", type=int, default=60) parser.add_argument("--public-timeout", type=int, default=15) parser.add_argument("--opencode-timeout", type=int, default=900) parser.add_argument("--skip-adapter", action="store_true") parser.add_argument("--skip-opencode", action="store_true") parser.add_argument("--skip-quantized-runtime", action="store_true") parser.add_argument("--json", action="store_true") return parser.parse_args() def main() -> int: args = parse_args() repos = selected_repos(args) checks: list[Check] = [] if not repos: checks.append(Check("repo selection", "fail", "all repos were skipped")) result = summarize(checks, applied=args.apply) if args.json: print(json.dumps(result, indent=2)) else: print_text(result) return 1 if args.download_dir: download_root = args.download_dir download_root.mkdir(parents=True, exist_ok=True) temp_context: Any = None else: temp_context = tempfile.TemporaryDirectory(prefix="kaiju-hf-uploaded-") download_root = Path(temp_context.name) try: if not args.apply: add_dry_run_checks(checks, repos, args.namespace, download_root) result = summarize(checks, applied=False) if args.json: print(json.dumps(result, indent=2)) else: print_text(result) return 0 if not add_hf_cli_check(checks, timeout=30): result = summarize(checks, applied=True) if args.json: print(json.dumps(result, indent=2)) else: print_text(result) return 1 downloaded: dict[str, Path] = {} for spec in repos: if args.require_public: check_public_visibility(checks, spec, args.namespace, timeout=args.public_timeout) root = download_repo(checks, spec, args.namespace, download_root, timeout=args.download_timeout) if root: downloaded[spec.key] = root verify_downloaded_repo(checks, spec, root, installer_timeout=args.installer_timeout) if args.run_opencode_smoke: opencode_root = downloaded.get("opencode") if opencode_root: run_opencode_smoke(checks, opencode_root, base_url=args.base_url, timeout=args.opencode_timeout) else: checks.append(Check("uploaded OpenCode smoke", "fail", "OpenCode helper repo was not downloaded")) result = summarize(checks, applied=True) if args.json: print(json.dumps(result, indent=2)) else: print_text(result) return 0 if result["ready"] else 1 finally: if temp_context is not None: temp_context.cleanup() if __name__ == "__main__": raise SystemExit(main())