| |
| """Verify uploaded Kaiju Coder 7 Hugging Face repos after private upload. |
| |
| The default mode is a dry run that prints the exact checks without downloading |
| or reading auth tokens. Pass --apply after Hugging Face namespace permission and |
| human review are complete. Private repos are verified through the existing HF |
| CLI login; tokens are never accepted as arguments or printed. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import urllib.error |
| import urllib.request |
| from dataclasses import asdict, dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
| MODEL_ID = "kaiju-coder-7" |
| DEFAULT_NAMESPACE = "RMDWLLC" |
| DEFAULT_BASE_URL = "http://100.109.109.14:18083/v1" |
|
|
|
|
| @dataclass(frozen=True) |
| class RepoSpec: |
| key: str |
| suffix: str |
| label: str |
| required_files: tuple[str, ...] |
| marker_files: tuple[tuple[str, tuple[str, ...]], ...] |
|
|
| def repo_id(self, namespace: str) -> str: |
| return f"{namespace}/{self.suffix}" |
|
|
|
|
| @dataclass |
| class Check: |
| name: str |
| status: str |
| detail: str |
|
|
|
|
| REPOS: tuple[RepoSpec, ...] = ( |
| RepoSpec( |
| key="adapter", |
| suffix="kaiju-coder-7-adapter", |
| label="adapter repo", |
| required_files=( |
| "README.md", |
| "adapter_config.json", |
| "adapter_model.safetensors", |
| "DATA_PROVENANCE_DRAFT.md", |
| "SOURCE_INVENTORY.md", |
| "EVAL_SCOREBOARD.md", |
| "SERVING_BENCHMARKS.md", |
| "PAID_API_READINESS.md", |
| "PUBLIC_TESTING_QUICKSTART.md", |
| "FINAL_RELEASE_REPORT.md", |
| "GOAL_COMPLETION_AUDIT.md", |
| "UPSTREAM_LICENSE_CHECK.md", |
| "upstream/qwen3.6-27b/LICENSE", |
| "scripts/check_hf_uploaded_release.py", |
| "scripts/check_hf_release_permission_evidence.py", |
| ), |
| marker_files=( |
| ("README.md", ("Kaiju Coder 7", MODEL_ID)), |
| ("PUBLIC_TESTING_QUICKSTART.md", ("Kaiju Coder 7 Public Testing Quickstart", MODEL_ID)), |
| ("FINAL_RELEASE_REPORT.md", ("Kaiju Coder 7 Final Release Report", "Public Launch Blockers")), |
| ), |
| ), |
| RepoSpec( |
| key="opencode", |
| suffix="kaiju-coder-7-opencode", |
| label="OpenCode helper repo", |
| required_files=( |
| "README.md", |
| "PUBLIC_TESTING_QUICKSTART.md", |
| "opencode.kaiju-coder-7.jsonc", |
| ".opencode/agents/kaiju-coder-7.md", |
| "scripts/install_kaiju_opencode_profile.py", |
| "scripts/opencode-kaiju-no-autocontinue.mjs", |
| "scripts/run_kaiju_public_opencode_smoke.py", |
| "scripts/run_kaiju_opencode_customer_pack.py", |
| "scripts/check_hf_uploaded_release.py", |
| "evals/tasks/opencode-customer-readiness.jsonl", |
| ), |
| marker_files=( |
| ("README.md", ("Kaiju Coder 7", "opencode -m kaiju/kaiju-coder-7")), |
| ("opencode.kaiju-coder-7.jsonc", (MODEL_ID, '"context": 16384')), |
| (".opencode/agents/kaiju-coder-7.md", ("You are Kaiju Coder 7", "Confirm the current working directory")), |
| ("scripts/opencode-kaiju-no-autocontinue.mjs", ("experimental.compaction.autocontinue", MODEL_ID)), |
| ), |
| ), |
| RepoSpec( |
| key="quantized-runtime", |
| suffix="kaiju-coder-7-quantized-runtime", |
| label="runtime quantization helper repo", |
| required_files=( |
| "README.md", |
| "PUBLIC_TESTING_QUICKSTART.md", |
| "scripts/start-qwen36-merged-vllm.sh", |
| "scripts/stop-qwen36-merged-vllm.sh", |
| "scripts/run-gojira-b-vllm-serving-benchmark.sh", |
| ), |
| marker_files=( |
| ("README.md", ("Runtime-Quantized Local Candidate", "bitsandbytes", "Kaiju Coder 7")), |
| ("PUBLIC_TESTING_QUICKSTART.md", ("Kaiju Coder 7 Public Testing Quickstart", MODEL_ID)), |
| ), |
| ), |
| ) |
|
|
|
|
| def shell_join(args: list[str]) -> str: |
| import shlex |
|
|
| return " ".join(shlex.quote(arg) for arg in args) |
|
|
|
|
| def run_command(args: list[str], *, cwd: Path | None = None, timeout: int) -> subprocess.CompletedProcess[str]: |
| return subprocess.run( |
| args, |
| cwd=cwd, |
| check=False, |
| text=True, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| timeout=timeout, |
| ) |
|
|
|
|
| def read_text(path: Path) -> str: |
| return path.read_text(encoding="utf-8", errors="replace") |
|
|
|
|
| def selected_repos(args: argparse.Namespace) -> list[RepoSpec]: |
| skipped = { |
| "adapter": args.skip_adapter, |
| "opencode": args.skip_opencode, |
| "quantized-runtime": args.skip_quantized_runtime, |
| } |
| return [spec for spec in REPOS if not skipped[spec.key]] |
|
|
|
|
| def add_dry_run_checks(checks: list[Check], repos: list[RepoSpec], namespace: str, download_dir: Path) -> None: |
| checks.append(Check("HF uploaded release mode", "manual", "dry run only; pass --apply to download and verify repos")) |
| for spec in repos: |
| target = download_dir / spec.suffix |
| command = ["hf", "download", spec.repo_id(namespace), "--repo-type", "model", "--local-dir", str(target)] |
| checks.append(Check(f"{spec.label} download command", "manual", shell_join(command))) |
|
|
|
|
| def add_hf_cli_check(checks: list[Check], timeout: int) -> bool: |
| hf_bin = shutil.which("hf") |
| if not hf_bin: |
| checks.append(Check("HF CLI", "fail", "`hf` is not on PATH")) |
| return False |
| result = run_command([hf_bin, "auth", "whoami"], timeout=timeout) |
| if result.returncode == 0 and "user=" in result.stdout: |
| checks.append(Check("HF CLI", "pass", result.stdout.strip().replace("\n", "; "))) |
| return True |
| checks.append(Check("HF CLI", "fail", result.stdout.strip()[:800])) |
| return False |
|
|
|
|
| def download_repo(checks: list[Check], spec: RepoSpec, namespace: str, download_root: Path, timeout: int) -> Path | None: |
| target = download_root / spec.suffix |
| target.mkdir(parents=True, exist_ok=True) |
| command = ["hf", "download", spec.repo_id(namespace), "--repo-type", "model", "--local-dir", str(target)] |
| result = run_command(command, timeout=timeout) |
| if result.returncode == 0: |
| checks.append(Check(f"{spec.label} download", "pass", f"{spec.repo_id(namespace)} downloaded to {target}")) |
| return target |
| checks.append(Check(f"{spec.label} download", "fail", result.stdout.strip()[-1200:])) |
| return None |
|
|
|
|
| def check_required_files(checks: list[Check], spec: RepoSpec, root: Path) -> None: |
| missing = [name for name in spec.required_files if not (root / name).is_file()] |
| if missing: |
| checks.append(Check(f"{spec.label} required files", "fail", "missing: " + ", ".join(missing))) |
| else: |
| checks.append(Check(f"{spec.label} required files", "pass", f"{len(spec.required_files)} files present")) |
|
|
|
|
| def check_markers(checks: list[Check], spec: RepoSpec, root: Path) -> None: |
| failures: list[str] = [] |
| for file_name, markers in spec.marker_files: |
| path = root / file_name |
| if not path.is_file(): |
| failures.append(f"{file_name} missing") |
| continue |
| text = read_text(path) |
| missing = [marker for marker in markers if marker not in text] |
| if missing: |
| failures.append(f"{file_name} missing {', '.join(missing)}") |
| if failures: |
| checks.append(Check(f"{spec.label} content markers", "fail", "; ".join(failures))) |
| else: |
| checks.append(Check(f"{spec.label} content markers", "pass", "expected Kaiju Coder 7 markers found")) |
|
|
|
|
| def check_public_quickstart_naming(checks: list[Check], spec: RepoSpec, root: Path) -> None: |
| path = root / "PUBLIC_TESTING_QUICKSTART.md" |
| if not path.is_file(): |
| return |
| lowered = read_text(path).lower() |
| forbidden = [term for term in ("qwen", "v1.8") if term in lowered] |
| if forbidden: |
| checks.append(Check(f"{spec.label} public naming hygiene", "fail", "contains: " + ", ".join(forbidden))) |
| else: |
| checks.append(Check(f"{spec.label} public naming hygiene", "pass", "public quickstart avoids internal upstream/checkpoint naming")) |
|
|
|
|
| def check_opencode_installer(checks: list[Check], opencode_root: Path, timeout: int) -> None: |
| installer = opencode_root / "scripts/install_kaiju_opencode_profile.py" |
| if not installer.is_file(): |
| checks.append(Check("uploaded OpenCode installer dry-run", "fail", f"missing {installer}")) |
| return |
| with tempfile.TemporaryDirectory(prefix="kaiju-uploaded-opencode-config-") as tmp: |
| result = run_command( |
| [sys.executable, str(installer), "--config-dir", tmp, "--dry-run"], |
| cwd=opencode_root, |
| timeout=timeout, |
| ) |
| if result.returncode == 0 and "kaiju-no-autocontinue.mjs" in result.stdout and MODEL_ID in result.stdout: |
| checks.append(Check("uploaded OpenCode installer dry-run", "pass", "staged helper installs provider, agent, and loop guard")) |
| else: |
| checks.append(Check("uploaded OpenCode installer dry-run", "fail", result.stdout.strip()[:1000])) |
|
|
|
|
| def run_opencode_smoke(checks: list[Check], opencode_root: Path, base_url: str, timeout: int) -> None: |
| script = opencode_root / "scripts/run_kaiju_public_opencode_smoke.py" |
| if not script.is_file(): |
| checks.append(Check("uploaded OpenCode smoke", "fail", f"missing {script}")) |
| return |
| result = run_command([sys.executable, str(script), "--base-url", base_url, "--timeout", str(timeout)], cwd=opencode_root, timeout=timeout + 120) |
| if result.returncode == 0: |
| checks.append(Check("uploaded OpenCode smoke", "pass", "downloaded helper completed live public OpenCode smoke")) |
| else: |
| checks.append(Check("uploaded OpenCode smoke", "fail", result.stdout.strip()[-1200:])) |
|
|
|
|
| def check_public_visibility(checks: list[Check], spec: RepoSpec, namespace: str, timeout: int) -> None: |
| repo_id = spec.repo_id(namespace) |
| url = f"https://huggingface.co/api/models/{repo_id}" |
| request = urllib.request.Request(url, headers={"User-Agent": "kaiju-coder-7-release-check"}) |
| try: |
| with urllib.request.urlopen(request, timeout=timeout) as response: |
| if response.status == 200: |
| checks.append(Check(f"{spec.label} public visibility", "pass", f"{repo_id} is publicly readable")) |
| return |
| checks.append(Check(f"{spec.label} public visibility", "fail", f"{url} returned HTTP {response.status}")) |
| except urllib.error.HTTPError as exc: |
| checks.append(Check(f"{spec.label} public visibility", "fail", f"{url} returned HTTP {exc.code}")) |
| except Exception as exc: |
| checks.append(Check(f"{spec.label} public visibility", "fail", f"{url} failed: {exc!r}")) |
|
|
|
|
| def verify_downloaded_repo(checks: list[Check], spec: RepoSpec, root: Path, *, installer_timeout: int) -> None: |
| check_required_files(checks, spec, root) |
| check_markers(checks, spec, root) |
| check_public_quickstart_naming(checks, spec, root) |
| if spec.key == "opencode": |
| check_opencode_installer(checks, root, timeout=installer_timeout) |
|
|
|
|
| def summarize(checks: list[Check], *, applied: bool) -> dict[str, Any]: |
| return { |
| "ready": applied and not any(check.status in {"fail", "manual"} for check in checks), |
| "applied": applied, |
| "summary": { |
| "pass": sum(1 for check in checks if check.status == "pass"), |
| "fail": sum(1 for check in checks if check.status == "fail"), |
| "manual": sum(1 for check in checks if check.status == "manual"), |
| }, |
| "checks": [asdict(check) for check in checks], |
| } |
|
|
|
|
| def print_text(result: dict[str, Any]) -> None: |
| print(f"Kaiju Coder 7 uploaded HF release verification: ready={result['ready']} applied={result['applied']}") |
| print( |
| "Summary: " |
| f"{result['summary']['pass']} pass, " |
| f"{result['summary']['fail']} fail, " |
| f"{result['summary']['manual']} manual" |
| ) |
| for check in result["checks"]: |
| print(f"[{check['status']}] {check['name']} - {check['detail']}") |
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument("--namespace", default=DEFAULT_NAMESPACE) |
| parser.add_argument("--download-dir", type=Path, default=None) |
| parser.add_argument("--apply", action="store_true", help="Download uploaded repos and verify contents.") |
| parser.add_argument("--require-public", action="store_true", help="Require repos to be publicly readable without auth.") |
| parser.add_argument("--run-opencode-smoke", action="store_true", help="Run the downloaded OpenCode helper live smoke.") |
| parser.add_argument("--base-url", default=DEFAULT_BASE_URL) |
| parser.add_argument("--download-timeout", type=int, default=900) |
| parser.add_argument("--installer-timeout", type=int, default=60) |
| parser.add_argument("--public-timeout", type=int, default=15) |
| parser.add_argument("--opencode-timeout", type=int, default=900) |
| parser.add_argument("--skip-adapter", action="store_true") |
| parser.add_argument("--skip-opencode", action="store_true") |
| parser.add_argument("--skip-quantized-runtime", action="store_true") |
| parser.add_argument("--json", action="store_true") |
| return parser.parse_args() |
|
|
|
|
| def main() -> int: |
| args = parse_args() |
| repos = selected_repos(args) |
| checks: list[Check] = [] |
| if not repos: |
| checks.append(Check("repo selection", "fail", "all repos were skipped")) |
| result = summarize(checks, applied=args.apply) |
| if args.json: |
| print(json.dumps(result, indent=2)) |
| else: |
| print_text(result) |
| return 1 |
|
|
| if args.download_dir: |
| download_root = args.download_dir |
| download_root.mkdir(parents=True, exist_ok=True) |
| temp_context: Any = None |
| else: |
| temp_context = tempfile.TemporaryDirectory(prefix="kaiju-hf-uploaded-") |
| download_root = Path(temp_context.name) |
|
|
| try: |
| if not args.apply: |
| add_dry_run_checks(checks, repos, args.namespace, download_root) |
| result = summarize(checks, applied=False) |
| if args.json: |
| print(json.dumps(result, indent=2)) |
| else: |
| print_text(result) |
| return 0 |
|
|
| if not add_hf_cli_check(checks, timeout=30): |
| result = summarize(checks, applied=True) |
| if args.json: |
| print(json.dumps(result, indent=2)) |
| else: |
| print_text(result) |
| return 1 |
|
|
| downloaded: dict[str, Path] = {} |
| for spec in repos: |
| if args.require_public: |
| check_public_visibility(checks, spec, args.namespace, timeout=args.public_timeout) |
| root = download_repo(checks, spec, args.namespace, download_root, timeout=args.download_timeout) |
| if root: |
| downloaded[spec.key] = root |
| verify_downloaded_repo(checks, spec, root, installer_timeout=args.installer_timeout) |
|
|
| if args.run_opencode_smoke: |
| opencode_root = downloaded.get("opencode") |
| if opencode_root: |
| run_opencode_smoke(checks, opencode_root, base_url=args.base_url, timeout=args.opencode_timeout) |
| else: |
| checks.append(Check("uploaded OpenCode smoke", "fail", "OpenCode helper repo was not downloaded")) |
|
|
| result = summarize(checks, applied=True) |
| if args.json: |
| print(json.dumps(result, indent=2)) |
| else: |
| print_text(result) |
| return 0 if result["ready"] else 1 |
| finally: |
| if temp_context is not None: |
| temp_context.cleanup() |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|