| |
| """Run a bounded public OpenCode smoke test for Kaiju Coder 7. |
| |
| This proves the packaged OpenCode profile can be installed and used from this |
| Mac for a real one-file write without wrong-directory output. It records a |
| small public-safe report and avoids reading auth tokens or process command |
| lines. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import shlex |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import urllib.request |
| from dataclasses import asdict, dataclass |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
# Repository root, taken as the directory one level above this script's folder.
ROOT = Path(__file__).resolve().parents[1]

# Identifiers handed to OpenCode and looked up on the model server.
MODEL_ID = "kaiju-coder-7"  # id searched for in /models and in the installer preview
AGENT = "kaiju-coder-7"  # value passed to `opencode run --agent`
MODEL = "kaiju/kaiju-coder-7"  # value passed to `opencode run -m`

# Exact file content the smoke prompt asks the model to write.
EXPECTED_TEXT = "Kaiju Coder 7 public OpenCode smoke ok"

# Each invocation writes its report into a timestamped folder below this directory.
DEFAULT_RUNS_DIR = ROOT / "runs/public-opencode-smoke"

# Endpoint queried when --base-url is not supplied.
# NOTE(review): 100.109.x.x lies in the 100.64.0.0/10 shared address space, so this
# is presumably only reachable from a private network/overlay - confirm.
DEFAULT_BASE_URL = "http://100.109.109.14:18083/v1"
|
|
|
|
@dataclass
class Check:
    """Outcome of a single smoke-test step as it appears in the report."""

    # Short label for the step, e.g. "installer dry-run" or "opencode run".
    name: str
    # One of "pass", "fail", or "manual" in this script ("manual" marks skipped steps).
    status: str
    # Human-readable explanation, or captured command output on failure.
    detail: str
|
|
|
|
def utc_stamp() -> str:
    """Return the current UTC time as a compact ``YYYYMMDDTHHMMSSZ`` string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.strftime("%Y%m%dT%H%M%SZ")
|
|
|
|
def run_command(args: list[str], *, timeout: int, cwd: Path = ROOT) -> subprocess.CompletedProcess[str]:
    """Run *args* as a subprocess and return the completed process.

    Output is captured as text with stderr folded into stdout, so callers only
    need to inspect ``.stdout``. A non-zero exit status does not raise
    (``check=False``); callers examine ``.returncode`` themselves.

    Args:
        args: Program and arguments, passed as a list (no shell involved).
        timeout: Seconds handed to ``subprocess.run`` as its time limit.
        cwd: Working directory for the child; defaults to the repository root.
    """
    completed = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout in one stream
        text=True,
        check=False,
        cwd=cwd,
        timeout=timeout,
    )
    return completed
|
|
|
|
def shell_join(args: list[str]) -> str:
    """Render *args* as one shell-safe command line, quoting each argument."""
    quoted_parts = map(shlex.quote, args)
    return " ".join(quoted_parts)
|
|
|
|
def add_installer_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
    """Dry-run the profile installer and record whether its preview looks right.

    The installer script is invoked with ``--dry-run`` against a throwaway
    config directory. The check passes when the command exits with status 0
    and its output mentions both the model id and the
    ``kaiju-no-autocontinue.mjs`` file name.

    Args:
        checks: List that receives exactly one "installer dry-run" result.
        timeout: Seconds allowed for the installer command.
        base_url: Optional override forwarded as ``--base-url`` when truthy.
    """
    name = "installer dry-run"
    with tempfile.TemporaryDirectory(prefix="kaiju-opencode-config-") as tmp:
        args = [
            sys.executable,
            "scripts/install_kaiju_opencode_profile.py",
            "--config-dir",
            tmp,
            "--dry-run",
        ]
        if base_url:
            args.extend(["--base-url", base_url])
        try:
            result = run_command(args, timeout=timeout)
        except (subprocess.TimeoutExpired, OSError) as exc:
            # A hung or unlaunchable installer must show up as a failed check in
            # the report rather than abort the script before a report is written.
            checks.append(Check(name, "fail", f"installer could not be run: {exc!r}"[:800]))
            return

    output = result.stdout
    preview_ok = (
        result.returncode == 0
        and MODEL_ID in output
        and "kaiju-no-autocontinue.mjs" in output
    )
    if preview_ok:
        checks.append(Check(name, "pass", "provider, agent, and loop guard preview emitted"))
    else:
        # Cap the captured output so one noisy failure cannot swamp the report.
        checks.append(Check(name, "fail", output.strip()[:800]))
|
|
|
|
def add_opencode_version_check(checks: list[Check], timeout: int) -> None:
    """Record whether an ``opencode`` executable is on PATH and reports a version.

    Args:
        checks: List that receives exactly one "opencode binary" result.
        timeout: Seconds allowed for ``opencode --version``.
    """
    name = "opencode binary"
    if not shutil.which("opencode"):
        checks.append(Check(name, "fail", "opencode is not on PATH"))
        return

    try:
        result = run_command(["opencode", "--version"], timeout=timeout)
    except (subprocess.TimeoutExpired, OSError) as exc:
        # Report a hang or launch failure as a failed check instead of letting
        # the exception abort the script before the report is written.
        checks.append(Check(name, "fail", f"could not run opencode --version: {exc!r}"[:800]))
        return

    version = result.stdout.strip()
    if result.returncode == 0 and version:
        checks.append(Check(name, "pass", f"opencode {version}"))
    else:
        # Either a non-zero exit or empty output; show what was printed, capped.
        checks.append(Check(name, "fail", version[:800]))
|
|
|
|
def add_live_model_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
    """Query ``<base>/models`` and record whether the expected model is served.

    The check passes when the JSON response has a ``data`` list containing an
    entry whose ``id`` equals ``MODEL_ID`` and whose ``max_model_len`` is at
    least 16384. Any network, decoding, or payload-shape problem is recorded
    as a failed check; nothing is raised to the caller.

    Args:
        checks: List that receives exactly one "live model" result.
        timeout: Seconds allowed for the HTTP request.
        base_url: API base URL; ``DEFAULT_BASE_URL`` is used when falsy.
    """
    min_context = 16384
    effective_base = base_url or DEFAULT_BASE_URL
    try:
        with urllib.request.urlopen(effective_base.rstrip("/") + "/models", timeout=timeout) as response:
            payload = json.loads(response.read().decode("utf-8", errors="replace"))
    except Exception as exc:
        # Deliberately broad: any transport or parse failure is a check result,
        # not a reason to abort the whole smoke run.
        checks.append(Check("live model", "fail", f"could not read /models: {exc!r}"))
        return

    # The payload comes from a remote server, so validate its shape instead of
    # assuming a dict of dicts; unexpected shapes fall through to the fail branch.
    listing = payload.get("data") if isinstance(payload, dict) else None
    models = listing if isinstance(listing, list) else []
    model = next(
        (item for item in models if isinstance(item, dict) and item.get("id") == MODEL_ID),
        None,
    )
    reported_len = model.get("max_model_len") if model else None
    try:
        context_len = int(reported_len or 0)
    except (TypeError, ValueError):
        # Non-numeric max_model_len counts as "too small" rather than crashing.
        context_len = 0

    if model and context_len >= min_context:
        checks.append(
            Check(
                "live model",
                "pass",
                f"{effective_base} reports {MODEL_ID}, max_model_len={reported_len}",
            )
        )
    else:
        # Cap the echoed payload, matching the 800-character limit other checks use.
        checks.append(Check("live model", "fail", f"unexpected /models payload: {str(payload)[:800]}"))
|
|
|
|
def run_opencode_smoke(checks: list[Check], timeout: int, keep_dir: bool) -> dict[str, Any]:
    """Ask OpenCode to write one file in a temp workspace and verify where it landed.

    A fresh temporary workspace is created and ``opencode run`` is invoked with
    ``--dir`` pointing at it. Afterwards the uniquely named file is looked for
    in the workspace, the repository root, and the home directory. The check
    passes only when the command exits 0, the workspace copy has exactly
    ``EXPECTED_TEXT``, and no copy exists in the other two locations.

    Args:
        checks: List that receives exactly one "opencode run" result.
        timeout: Seconds allowed for the ``opencode run`` command.
        keep_dir: When true, never delete the temporary workspace.

    Returns:
        A JSON-serialisable dict with the workspace path, file name, command,
        return code (``None`` if the command timed out or could not start), the
        tail of the output, the observed file contents, and whether the
        workspace still exists on disk.
    """
    stamp = utc_stamp()
    workspace = Path(tempfile.mkdtemp(prefix=f"kaiju-public-opencode-{stamp}-"))
    filename = f"kaiju_public_smoke_{stamp}.txt"
    target = workspace / filename
    prompt = (
        "Run pwd first. Then create "
        f"{filename} with exactly this content and no extra characters: {EXPECTED_TEXT}"
    )
    command = [
        "opencode",
        "run",
        "-m",
        MODEL,
        "--agent",
        AGENT,
        "--dir",
        str(workspace),
        "--dangerously-skip-permissions",
        prompt,
    ]

    returncode: int | None = None
    output = ""
    run_error: str | None = None
    try:
        result = run_command(command, timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        # A model run is the step most likely to hang; turn the timeout into a
        # failed check so the report is still written.
        run_error = f"opencode run timed out after {timeout}s"
        partial = exc.output
        if isinstance(partial, bytes):
            # Partial output on timeout may arrive undecoded.
            partial = partial.decode("utf-8", errors="replace")
        output = partial or ""
    except OSError as exc:
        run_error = f"could not launch opencode: {exc!r}"
    else:
        returncode = result.returncode
        output = result.stdout or ""
    output = output.strip()

    # Besides the intended workspace, look in the two places a confused agent
    # might write to instead.
    expected_locations = [
        ("workspace", target),
        ("repo", ROOT / filename),
        ("home", Path.home() / filename),
    ]
    observed = {
        label: path.read_text(encoding="utf-8", errors="replace") if path.is_file() else None
        for label, path in expected_locations
    }

    if run_error is not None:
        checks.append(Check("opencode run", "fail", f"{run_error}\n{output[-1200:]}".strip()))
    elif returncode != 0:
        checks.append(Check("opencode run", "fail", output[-1200:]))
    elif observed["workspace"] == EXPECTED_TEXT and observed["repo"] is None and observed["home"] is None:
        checks.append(Check("opencode run", "pass", f"{filename} written only in {workspace}"))
    else:
        details = [f"{label}={value!r}" for label, value in observed.items() if value is not None]
        checks.append(Check("opencode run", "fail", "; ".join(details) or "expected file missing"))

    # NOTE(review): the workspace is removed only when the target file exists,
    # so runs that never produced it leave the directory behind - presumably
    # for inspection; confirm this is intended.
    if not keep_dir and target.is_file():
        shutil.rmtree(workspace, ignore_errors=True)

    return {
        "workspace": str(workspace),
        "filename": filename,
        "command": command,
        "returncode": returncode,
        "stdout_tail": output[-2000:],
        "observed": observed,
        # Report what is actually on disk: the directory also survives when
        # cleanup was skipped above, not just when --keep-dir was given.
        "kept": workspace.is_dir(),
    }
|
|
|
|
def _table_cell(text: str) -> str:
    """Make *text* safe to place inside a single Markdown table cell."""
    # A raw "|" would start a new column and a raw line break would end the row,
    # so substitute both; captured command output routinely contains newlines.
    return "<br>".join(text.replace("|", "/").splitlines())


def write_report(run_dir: Path, checks: list[Check], details: dict[str, Any]) -> None:
    """Write ``result.json`` and ``summary.md`` for one smoke run into *run_dir*.

    ``result.json`` holds the readiness flag, per-status counts, every check,
    and *details* verbatim. ``summary.md`` holds the same checks as a Markdown
    table and, when *details* contains a ``command``, the shell-quoted command.

    Args:
        run_dir: Output directory; created (with parents) if missing.
        checks: Results to report, in display order.
        details: Extra JSON-serialisable data about the OpenCode run; may be empty.
    """
    run_dir.mkdir(parents=True, exist_ok=True)

    counts = {
        status: sum(1 for check in checks if check.status == status)
        for status in ("pass", "fail", "manual")
    }
    summary = {
        # Ready only when nothing failed and nothing was skipped.
        "ready": counts["fail"] == 0 and counts["manual"] == 0,
        "summary": counts,
        "checks": [asdict(check) for check in checks],
        "details": details,
    }
    (run_dir / "result.json").write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")

    lines = [
        "# Kaiju Coder 7 Public OpenCode Smoke",
        "",
        f"Ready: `{summary['ready']}`",
        f"Summary: `{counts['pass']} pass / {counts['fail']} fail / {counts['manual']} manual`",
        "",
        "| Status | Check | Detail |",
        "|---|---|---|",
    ]
    lines.extend(
        f"| {check.status} | {check.name} | {_table_cell(check.detail)} |"
        for check in checks
    )
    if details.get("command"):
        lines.extend(
            [
                "",
                "## OpenCode Command",
                "",
                "```bash",
                shell_join(details["command"]),
                "```",
            ]
        )
    (run_dir / "summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
def main() -> int:
    """Parse CLI options, run the checks in order, write the report, and return an exit code.

    The installer, binary, and live-model checks run first with fixed timeouts;
    ``--timeout`` applies only to the OpenCode run itself. The OpenCode run is
    skipped (recorded as "manual") when ``--skip-opencode`` is given or when an
    earlier check failed.

    Returns:
        0 when no check ended as "fail" or "manual", otherwise 1.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--base-url", default=None)
    parser.add_argument("--timeout", type=int, default=900)
    parser.add_argument("--runs-dir", type=Path, default=DEFAULT_RUNS_DIR)
    parser.add_argument("--skip-live", action="store_true")
    parser.add_argument("--skip-opencode", action="store_true", help="Only run installer/live checks.")
    parser.add_argument("--keep-dir", action="store_true", help="Keep the temp OpenCode workspace.")
    options = parser.parse_args()

    # Prerequisite checks.
    checks: list[Check] = []
    add_installer_check(checks, timeout=60, base_url=options.base_url)
    add_opencode_version_check(checks, timeout=30)
    if not options.skip_live:
        add_live_model_check(checks, timeout=10, base_url=options.base_url)
    else:
        checks.append(Check("live model", "manual", "skipped by --skip-live"))

    # The real OpenCode run happens only when nothing above failed.
    details: dict[str, Any] = {}
    prerequisite_failed = any(check.status == "fail" for check in checks)
    if options.skip_opencode:
        checks.append(Check("opencode run", "manual", "skipped by --skip-opencode"))
    elif prerequisite_failed:
        checks.append(Check("opencode run", "manual", "skipped because prerequisite checks failed"))
    else:
        details = run_opencode_smoke(checks, timeout=options.timeout, keep_dir=options.keep_dir)

    # Persist the report, then echo a one-line-per-check summary to stdout.
    run_dir = options.runs_dir / utc_stamp()
    write_report(run_dir, checks, details)
    print(f"Wrote {run_dir / 'summary.md'}")
    for check in checks:
        print(f"[{check.status}] {check.name} - {check.detail}")

    blocking_statuses = {"fail", "manual"}
    ready = all(check.status not in blocking_statuses for check in checks)
    if ready:
        return 0
    return 1
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|