kaiju-coder-7-opencode / scripts /run_kaiju_public_opencode_smoke.py
restokes92's picture
Upload Kaiju Coder 7 OpenCode helper package
c1bf102 verified
raw
history blame
8.92 kB
#!/usr/bin/env python3
"""Run a bounded public OpenCode smoke test for Kaiju Coder 7.
This proves the packaged OpenCode profile can be installed and used from this
Mac for a real one-file write without wrong-directory output. It records a
small public-safe report and avoids reading auth tokens or process command
lines.
"""
from __future__ import annotations
import argparse
import json
import shlex
import shutil
import subprocess
import sys
import tempfile
import urllib.request
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
MODEL = "kaiju/kaiju-coder-7"
AGENT = "kaiju-coder-7"
MODEL_ID = "kaiju-coder-7"
EXPECTED_TEXT = "Kaiju Coder 7 public OpenCode smoke ok"
DEFAULT_RUNS_DIR = ROOT / "runs/public-opencode-smoke"
DEFAULT_BASE_URL = "http://100.109.109.14:18083/v1"
@dataclass
class Check:
name: str
status: str
detail: str
def utc_stamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
def run_command(args: list[str], *, timeout: int, cwd: Path = ROOT) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
cwd=cwd,
check=False,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=timeout,
)
def shell_join(args: list[str]) -> str:
return " ".join(shlex.quote(arg) for arg in args)
def add_installer_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
with tempfile.TemporaryDirectory(prefix="kaiju-opencode-config-") as tmp:
args = [
sys.executable,
"scripts/install_kaiju_opencode_profile.py",
"--config-dir",
tmp,
"--dry-run",
]
if base_url:
args.extend(["--base-url", base_url])
result = run_command(args, timeout=timeout)
if result.returncode == 0 and MODEL_ID in result.stdout and "kaiju-no-autocontinue.mjs" in result.stdout:
checks.append(Check("installer dry-run", "pass", "provider, agent, and loop guard preview emitted"))
else:
checks.append(Check("installer dry-run", "fail", result.stdout.strip()[:800]))
def add_opencode_version_check(checks: list[Check], timeout: int) -> None:
if not shutil.which("opencode"):
checks.append(Check("opencode binary", "fail", "opencode is not on PATH"))
return
result = run_command(["opencode", "--version"], timeout=timeout)
if result.returncode == 0 and result.stdout.strip():
checks.append(Check("opencode binary", "pass", f"opencode {result.stdout.strip()}"))
else:
checks.append(Check("opencode binary", "fail", result.stdout.strip()[:800]))
def add_live_model_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
try:
with urllib.request.urlopen((base_url or DEFAULT_BASE_URL).rstrip("/") + "/models", timeout=timeout) as response:
payload = json.loads(response.read().decode("utf-8", errors="replace"))
except Exception as exc: # noqa: BLE001 - smoke report should capture connection failures.
checks.append(Check("live model", "fail", f"could not read /models: {exc!r}"))
return
models = payload.get("data") or []
model = next((item for item in models if item.get("id") == MODEL_ID), None)
if model and int(model.get("max_model_len") or 0) >= 16384:
checks.append(Check("live model", "pass", f"{base_url or DEFAULT_BASE_URL} reports {MODEL_ID}, max_model_len={model.get('max_model_len')}"))
else:
checks.append(Check("live model", "fail", f"unexpected /models payload: {payload}"))
def run_opencode_smoke(checks: list[Check], timeout: int, keep_dir: bool) -> dict[str, Any]:
stamp = utc_stamp()
workspace = Path(tempfile.mkdtemp(prefix=f"kaiju-public-opencode-{stamp}-"))
filename = f"kaiju_public_smoke_{stamp}.txt"
target = workspace / filename
prompt = (
"Run pwd first. Then create "
f"{filename} with exactly this content and no extra characters: {EXPECTED_TEXT}"
)
command = [
"opencode",
"run",
"-m",
MODEL,
"--agent",
AGENT,
"--dir",
str(workspace),
"--dangerously-skip-permissions",
prompt,
]
result = run_command(command, timeout=timeout)
expected_locations = [
("workspace", target),
("repo", ROOT / filename),
("home", Path.home() / filename),
]
observed = {
label: path.read_text(encoding="utf-8", errors="replace") if path.is_file() else None
for label, path in expected_locations
}
if result.returncode != 0:
checks.append(Check("opencode run", "fail", result.stdout.strip()[-1200:]))
elif observed["workspace"] == EXPECTED_TEXT and observed["repo"] is None and observed["home"] is None:
checks.append(Check("opencode run", "pass", f"{filename} written only in {workspace}"))
else:
details = []
for label, value in observed.items():
if value is not None:
details.append(f"{label}={value!r}")
checks.append(Check("opencode run", "fail", "; ".join(details) or "expected file missing"))
if not keep_dir and target.is_file():
# Keep successful smoke workspaces for inspection only when requested.
shutil.rmtree(workspace, ignore_errors=True)
return {
"workspace": str(workspace),
"filename": filename,
"command": command,
"returncode": result.returncode,
"stdout_tail": result.stdout.strip()[-2000:],
"observed": observed,
"kept": keep_dir,
}
def write_report(run_dir: Path, checks: list[Check], details: dict[str, Any]) -> None:
run_dir.mkdir(parents=True, exist_ok=True)
summary = {
"ready": not any(check.status in {"fail", "manual"} for check in checks),
"summary": {
"pass": sum(1 for check in checks if check.status == "pass"),
"fail": sum(1 for check in checks if check.status == "fail"),
"manual": sum(1 for check in checks if check.status == "manual"),
},
"checks": [asdict(check) for check in checks],
"details": details,
}
(run_dir / "result.json").write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")
lines = [
"# Kaiju Coder 7 Public OpenCode Smoke",
"",
f"Ready: `{summary['ready']}`",
f"Summary: `{summary['summary']['pass']} pass / {summary['summary']['fail']} fail / {summary['summary']['manual']} manual`",
"",
"| Status | Check | Detail |",
"|---|---|---|",
]
for check in checks:
lines.append(f"| {check.status} | {check.name} | {check.detail.replace('|', '/') } |")
if details.get("command"):
lines.extend(
[
"",
"## OpenCode Command",
"",
"```bash",
shell_join(details["command"]),
"```",
]
)
(run_dir / "summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--base-url", default=None)
parser.add_argument("--timeout", type=int, default=900)
parser.add_argument("--runs-dir", type=Path, default=DEFAULT_RUNS_DIR)
parser.add_argument("--skip-live", action="store_true")
parser.add_argument("--skip-opencode", action="store_true", help="Only run installer/live checks.")
parser.add_argument("--keep-dir", action="store_true", help="Keep the temp OpenCode workspace.")
args = parser.parse_args()
checks: list[Check] = []
add_installer_check(checks, timeout=60, base_url=args.base_url)
add_opencode_version_check(checks, timeout=30)
if args.skip_live:
checks.append(Check("live model", "manual", "skipped by --skip-live"))
else:
add_live_model_check(checks, timeout=10, base_url=args.base_url)
details: dict[str, Any] = {}
if args.skip_opencode:
checks.append(Check("opencode run", "manual", "skipped by --skip-opencode"))
elif any(check.status == "fail" for check in checks):
checks.append(Check("opencode run", "manual", "skipped because prerequisite checks failed"))
else:
details = run_opencode_smoke(checks, timeout=args.timeout, keep_dir=args.keep_dir)
run_dir = args.runs_dir / utc_stamp()
write_report(run_dir, checks, details)
ready = not any(check.status in {"fail", "manual"} for check in checks)
print(f"Wrote {run_dir / 'summary.md'}")
for check in checks:
print(f"[{check.status}] {check.name} - {check.detail}")
return 0 if ready else 1
if __name__ == "__main__":
raise SystemExit(main())