#!/usr/bin/env python3
"""Run a bounded public OpenCode smoke test for Kaiju Coder 7.

This proves the packaged OpenCode profile can be installed and used from this
Mac for a real one-file write without wrong-directory output. It records a small
public-safe report and avoids reading auth tokens or process command lines.
"""

from __future__ import annotations

import argparse
import json
import shlex
import shutil
import subprocess
import sys
import tempfile
import urllib.request
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
MODEL_ID = "kaiju-coder-7"
EXPECTED_TEXT = "Kaiju Coder 7 public OpenCode smoke ok"
DEFAULT_RUNS_DIR = ROOT / "runs/public-opencode-smoke"
DEFAULT_BASE_URL = "http://127.0.0.1:18181/v1"

# Synthetic exit codes used when a command never produced one of its own.
# 124 mirrors GNU `timeout`; 127 mirrors the shell's "command not found".
TIMEOUT_RETURNCODE = 124
SPAWN_FAILURE_RETURNCODE = 127

# Upper bound on how much captured output is copied into a check detail.
MAX_DETAIL_CHARS = 800


@dataclass
class Check:
    """One row of the smoke report."""

    name: str  # short label shown in the report table
    status: str  # "pass", "fail", or "manual" (skipped)
    detail: str  # human-readable evidence or failure output


def utc_stamp() -> str:
    """Return the current UTC time as a compact ``YYYYmmddTHHMMSSZ`` string."""
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def run_command(args: list[str], *, timeout: int, cwd: Path = ROOT) -> subprocess.CompletedProcess[str]:
    """Run *args* and return its result with stderr folded into stdout.

    The command is never run through a shell and a non-zero exit status does
    not raise. A timeout or a failure to start the process is converted into a
    synthetic ``CompletedProcess`` (return code ``TIMEOUT_RETURNCODE`` or
    ``SPAWN_FAILURE_RETURNCODE``) so that callers can record it as a failed
    check instead of the whole smoke run aborting without a report.

    Args:
        args: Program and arguments.
        timeout: Seconds to wait before the child is killed.
        cwd: Working directory for the child process.

    Returns:
        The completed process; ``stdout`` is always a string.
    """
    try:
        return subprocess.run(
            args,
            cwd=cwd,
            check=False,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        partial = exc.stdout
        # Partial output on a TimeoutExpired can be bytes even with text=True.
        if isinstance(partial, bytes):
            partial = partial.decode("utf-8", errors="replace")
        notice = f"command timed out after {timeout}s"
        # Callers keep either the head or the tail of stdout, so the notice is
        # placed at both ends to survive either kind of truncation.
        output = f"{notice}\n{partial}\n{notice}" if partial else notice
        return subprocess.CompletedProcess(args, TIMEOUT_RETURNCODE, stdout=output, stderr=None)
    except OSError as exc:
        return subprocess.CompletedProcess(
            args,
            SPAWN_FAILURE_RETURNCODE,
            stdout=f"could not start command: {exc!r}",
            stderr=None,
        )


def shell_join(args: list[str]) -> str:
    """Return *args* as a single shell-quoted command line for display."""
    return " ".join(shlex.quote(arg) for arg in args)


def add_installer_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
    """Dry-run the profile installer and append an "installer dry-run" check.

    The installer is pointed at a throwaway config directory. The check passes
    only when the installer exits 0 and its output mentions the model id, the
    loop-guard plugin, and the expected default model and agent settings.
    """
    with tempfile.TemporaryDirectory(prefix="kaiju-opencode-config-") as tmp:
        args = [
            sys.executable,
            "scripts/install_kaiju_opencode_profile.py",
            "--config-dir",
            tmp,
            "--dry-run",
        ]
        if base_url:
            args.extend(["--base-url", base_url])
        result = run_command(args, timeout=timeout)
    if (
        result.returncode == 0
        and MODEL_ID in result.stdout
        and "kaiju-no-autocontinue.mjs" in result.stdout
        and '"model": "kaiju/kaiju-coder-7"' in result.stdout
        and '"default_agent": "kaiju-coder-7"' in result.stdout
    ):
        checks.append(
            Check("installer dry-run", "pass", "provider, default model, agent, and loop guard preview emitted")
        )
    else:
        checks.append(Check("installer dry-run", "fail", result.stdout.strip()[:MAX_DETAIL_CHARS]))


def add_opencode_version_check(checks: list[Check], timeout: int) -> None:
    """Append an "opencode binary" check based on ``opencode --version``."""
    if not shutil.which("opencode"):
        checks.append(Check("opencode binary", "fail", "opencode is not on PATH"))
        return
    result = run_command(["opencode", "--version"], timeout=timeout)
    if result.returncode == 0 and result.stdout.strip():
        checks.append(Check("opencode binary", "pass", f"opencode {result.stdout.strip()}"))
    else:
        checks.append(Check("opencode binary", "fail", result.stdout.strip()[:MAX_DETAIL_CHARS]))


def _coerce_int(value: Any) -> int:
    """Return *value* as an int, or 0 when it is missing or not numeric."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def add_live_model_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
    """Query ``<base_url>/models`` and append a "live model" check.

    The check passes when the response lists ``MODEL_ID`` with a
    ``max_model_len`` of at least 16384. Connection errors, invalid JSON, and
    payloads of an unexpected shape are all recorded as a failed check rather
    than raised.
    """
    endpoint_base = base_url or DEFAULT_BASE_URL
    try:
        with urllib.request.urlopen(endpoint_base.rstrip("/") + "/models", timeout=timeout) as response:
            payload = json.loads(response.read().decode("utf-8", errors="replace"))
    except Exception as exc:  # noqa: BLE001 - smoke report should capture connection failures.
        checks.append(Check("live model", "fail", f"could not read /models: {exc!r}"))
        return

    # The payload comes from an external server: verify its shape before use.
    models = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(models, list):
        models = []
    model = next(
        (item for item in models if isinstance(item, dict) and item.get("id") == MODEL_ID),
        None,
    )
    if model and _coerce_int(model.get("max_model_len")) >= 16384:
        checks.append(
            Check(
                "live model",
                "pass",
                f"{endpoint_base} reports {MODEL_ID}, max_model_len={model.get('max_model_len')}",
            )
        )
    else:
        # Bounded like the other failure details so the report stays small.
        checks.append(
            Check("live model", "fail", f"unexpected /models payload: {str(payload)[:MAX_DETAIL_CHARS]}")
        )


def run_opencode_smoke(checks: list[Check], timeout: int, keep_dir: bool) -> dict[str, Any]:
    """Ask OpenCode to write one file in a temp workspace and verify the result.

    Appends an "opencode run" check. It passes only when the command exits 0,
    the file in the workspace holds exactly ``EXPECTED_TEXT``, and no file of
    the same name appeared in the repo root or the home directory.

    Args:
        checks: List the resulting check is appended to.
        timeout: Seconds allowed for ``opencode run``.
        keep_dir: When true, the workspace is left on disk even on success.

    Returns:
        Details for the report: workspace, command, exit status, output tail,
        the observed file contents, and whether the workspace still exists.
    """
    stamp = utc_stamp()
    workspace = Path(tempfile.mkdtemp(prefix=f"kaiju-public-opencode-{stamp}-"))
    filename = f"kaiju_public_smoke_{stamp}.txt"
    target = workspace / filename
    prompt = (
        "Run pwd first. Then create "
        f"{filename} with exactly this content and no extra characters: {EXPECTED_TEXT}"
    )
    command = [
        "opencode",
        "run",
        "--dir",
        str(workspace),
        "--dangerously-skip-permissions",
        prompt,
    ]
    result = run_command(command, timeout=timeout)

    # Besides the workspace, look where a wrong-directory write would land.
    expected_locations = [
        ("workspace", target),
        ("repo", ROOT / filename),
        ("home", Path.home() / filename),
    ]
    observed = {
        label: path.read_text(encoding="utf-8", errors="replace") if path.is_file() else None
        for label, path in expected_locations
    }

    if result.returncode != 0:
        checks.append(Check("opencode run", "fail", result.stdout.strip()[-1200:]))
    elif observed["workspace"] == EXPECTED_TEXT and observed["repo"] is None and observed["home"] is None:
        checks.append(Check("opencode run", "pass", f"{filename} written only in {workspace}"))
    else:
        details = [f"{label}={value!r}" for label, value in observed.items() if value is not None]
        checks.append(Check("opencode run", "fail", "; ".join(details) or "expected file missing"))

    if not keep_dir and target.is_file():
        # Keep successful smoke workspaces for inspection only when requested.
        shutil.rmtree(workspace, ignore_errors=True)

    return {
        "workspace": str(workspace),
        "filename": filename,
        "resolved_default_model": "kaiju/kaiju-coder-7",
        "resolved_default_agent": "kaiju-coder-7",
        "command": command,
        "returncode": result.returncode,
        "stdout_tail": result.stdout.strip()[-2000:],
        "observed": observed,
        # Report what is really on disk: the workspace also survives when the
        # target file was never created, regardless of --keep-dir.
        "kept": workspace.exists(),
    }


def _table_cell(text: str) -> str:
    """Make *text* safe for one Markdown table cell.

    Pipes would start a new column and line breaks would end the row, so pipes
    become ``/`` and line breaks become single spaces.
    """
    return " ".join(text.replace("|", "/").splitlines())


def write_report(run_dir: Path, checks: list[Check], details: dict[str, Any]) -> None:
    """Write ``result.json`` and ``summary.md`` for this run into *run_dir*.

    The run counts as ready only when no check is "fail" or "manual".
    *run_dir* is created if it does not exist.
    """
    run_dir.mkdir(parents=True, exist_ok=True)
    summary = {
        "ready": not any(check.status in {"fail", "manual"} for check in checks),
        "summary": {
            "pass": sum(1 for check in checks if check.status == "pass"),
            "fail": sum(1 for check in checks if check.status == "fail"),
            "manual": sum(1 for check in checks if check.status == "manual"),
        },
        "checks": [asdict(check) for check in checks],
        "details": details,
    }
    (run_dir / "result.json").write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")

    counts = summary["summary"]
    lines = [
        "# Kaiju Coder 7 Public OpenCode Smoke",
        "",
        f"Ready: `{summary['ready']}`",
        "Resolved default model: `kaiju/kaiju-coder-7`",
        "Resolved default agent: `kaiju-coder-7`",
        "Normal user path: `opencode run` without `-m`, `--agent`, or `/kaiju`.",
        f"Summary: `{counts['pass']} pass / {counts['fail']} fail / {counts['manual']} manual`",
        "",
        "| Status | Check | Detail |",
        "|---|---|---|",
    ]
    lines.extend(f"| {check.status} | {check.name} | {_table_cell(check.detail)} |" for check in checks)
    if details.get("command"):
        lines.extend(
            [
                "",
                "## OpenCode Command",
                "",
                "```bash",
                shell_join(details["command"]),
                "```",
            ]
        )
    (run_dir / "summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")


def main() -> int:
    """Run the checks, write the report, and return the process exit code.

    Returns:
        0 when every check passed, 1 when any check failed or was skipped.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--base-url", default=None)
    parser.add_argument("--timeout", type=int, default=900)
    parser.add_argument("--runs-dir", type=Path, default=DEFAULT_RUNS_DIR)
    parser.add_argument("--skip-live", action="store_true")
    parser.add_argument("--skip-opencode", action="store_true", help="Only run installer/live checks.")
    parser.add_argument("--keep-dir", action="store_true", help="Keep the temp OpenCode workspace.")
    args = parser.parse_args()

    checks: list[Check] = []
    add_installer_check(checks, timeout=60, base_url=args.base_url)
    add_opencode_version_check(checks, timeout=30)
    if args.skip_live:
        checks.append(Check("live model", "manual", "skipped by --skip-live"))
    else:
        add_live_model_check(checks, timeout=10, base_url=args.base_url)

    details: dict[str, Any] = {}
    if args.skip_opencode:
        checks.append(Check("opencode run", "manual", "skipped by --skip-opencode"))
    elif any(check.status == "fail" for check in checks):
        # The real run is pointless when a prerequisite has already failed.
        checks.append(Check("opencode run", "manual", "skipped because prerequisite checks failed"))
    else:
        details = run_opencode_smoke(checks, timeout=args.timeout, keep_dir=args.keep_dir)

    run_dir = args.runs_dir / utc_stamp()
    write_report(run_dir, checks, details)
    ready = not any(check.status in {"fail", "manual"} for check in checks)
    print(f"Wrote {run_dir / 'summary.md'}")
    for check in checks:
        print(f"[{check.status}] {check.name} - {check.detail}")
    return 0 if ready else 1


if __name__ == "__main__":
    raise SystemExit(main())