File size: 9,265 Bytes
c1bf102 1fe486d c1bf102 81ee344 c1bf102 81ee344 c1bf102 81ee344 c1bf102 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 | #!/usr/bin/env python3
"""Run a bounded public OpenCode smoke test for Kaiju Coder 7.
This proves the packaged OpenCode profile can be installed and used from this
Mac for a real one-file write without wrong-directory output. It records a
small public-safe report and avoids reading auth tokens or process command
lines.
"""
from __future__ import annotations
import argparse
import json
import shlex
import shutil
import subprocess
import sys
import tempfile
import urllib.request
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
MODEL_ID = "kaiju-coder-7"
EXPECTED_TEXT = "Kaiju Coder 7 public OpenCode smoke ok"
DEFAULT_RUNS_DIR = ROOT / "runs/public-opencode-smoke"
DEFAULT_BASE_URL = "http://127.0.0.1:18181/v1"
@dataclass
class Check:
name: str
status: str
detail: str
def utc_stamp() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
def run_command(args: list[str], *, timeout: int, cwd: Path = ROOT) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
cwd=cwd,
check=False,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=timeout,
)
def shell_join(args: list[str]) -> str:
return " ".join(shlex.quote(arg) for arg in args)
def add_installer_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
with tempfile.TemporaryDirectory(prefix="kaiju-opencode-config-") as tmp:
args = [
sys.executable,
"scripts/install_kaiju_opencode_profile.py",
"--config-dir",
tmp,
"--dry-run",
]
if base_url:
args.extend(["--base-url", base_url])
result = run_command(args, timeout=timeout)
if (
result.returncode == 0
and MODEL_ID in result.stdout
and "kaiju-no-autocontinue.mjs" in result.stdout
and '"model": "kaiju/kaiju-coder-7"' in result.stdout
and '"default_agent": "kaiju-coder-7"' in result.stdout
):
checks.append(Check("installer dry-run", "pass", "provider, default model, agent, and loop guard preview emitted"))
else:
checks.append(Check("installer dry-run", "fail", result.stdout.strip()[:800]))
def add_opencode_version_check(checks: list[Check], timeout: int) -> None:
if not shutil.which("opencode"):
checks.append(Check("opencode binary", "fail", "opencode is not on PATH"))
return
result = run_command(["opencode", "--version"], timeout=timeout)
if result.returncode == 0 and result.stdout.strip():
checks.append(Check("opencode binary", "pass", f"opencode {result.stdout.strip()}"))
else:
checks.append(Check("opencode binary", "fail", result.stdout.strip()[:800]))
def add_live_model_check(checks: list[Check], timeout: int, base_url: str | None) -> None:
try:
with urllib.request.urlopen((base_url or DEFAULT_BASE_URL).rstrip("/") + "/models", timeout=timeout) as response:
payload = json.loads(response.read().decode("utf-8", errors="replace"))
except Exception as exc: # noqa: BLE001 - smoke report should capture connection failures.
checks.append(Check("live model", "fail", f"could not read /models: {exc!r}"))
return
models = payload.get("data") or []
model = next((item for item in models if item.get("id") == MODEL_ID), None)
if model and int(model.get("max_model_len") or 0) >= 16384:
checks.append(Check("live model", "pass", f"{base_url or DEFAULT_BASE_URL} reports {MODEL_ID}, max_model_len={model.get('max_model_len')}"))
else:
checks.append(Check("live model", "fail", f"unexpected /models payload: {payload}"))
def run_opencode_smoke(checks: list[Check], timeout: int, keep_dir: bool) -> dict[str, Any]:
stamp = utc_stamp()
workspace = Path(tempfile.mkdtemp(prefix=f"kaiju-public-opencode-{stamp}-"))
filename = f"kaiju_public_smoke_{stamp}.txt"
target = workspace / filename
prompt = (
"Run pwd first. Then create "
f"{filename} with exactly this content and no extra characters: {EXPECTED_TEXT}"
)
command = [
"opencode",
"run",
"--dir",
str(workspace),
"--dangerously-skip-permissions",
prompt,
]
result = run_command(command, timeout=timeout)
expected_locations = [
("workspace", target),
("repo", ROOT / filename),
("home", Path.home() / filename),
]
observed = {
label: path.read_text(encoding="utf-8", errors="replace") if path.is_file() else None
for label, path in expected_locations
}
if result.returncode != 0:
checks.append(Check("opencode run", "fail", result.stdout.strip()[-1200:]))
elif observed["workspace"] == EXPECTED_TEXT and observed["repo"] is None and observed["home"] is None:
checks.append(Check("opencode run", "pass", f"{filename} written only in {workspace}"))
else:
details = []
for label, value in observed.items():
if value is not None:
details.append(f"{label}={value!r}")
checks.append(Check("opencode run", "fail", "; ".join(details) or "expected file missing"))
if not keep_dir and target.is_file():
# Keep successful smoke workspaces for inspection only when requested.
shutil.rmtree(workspace, ignore_errors=True)
return {
"workspace": str(workspace),
"filename": filename,
"resolved_default_model": "kaiju/kaiju-coder-7",
"resolved_default_agent": "kaiju-coder-7",
"command": command,
"returncode": result.returncode,
"stdout_tail": result.stdout.strip()[-2000:],
"observed": observed,
"kept": keep_dir,
}
def write_report(run_dir: Path, checks: list[Check], details: dict[str, Any]) -> None:
run_dir.mkdir(parents=True, exist_ok=True)
summary = {
"ready": not any(check.status in {"fail", "manual"} for check in checks),
"summary": {
"pass": sum(1 for check in checks if check.status == "pass"),
"fail": sum(1 for check in checks if check.status == "fail"),
"manual": sum(1 for check in checks if check.status == "manual"),
},
"checks": [asdict(check) for check in checks],
"details": details,
}
(run_dir / "result.json").write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")
lines = [
"# Kaiju Coder 7 Public OpenCode Smoke",
"",
f"Ready: `{summary['ready']}`",
"Resolved default model: `kaiju/kaiju-coder-7`",
"Resolved default agent: `kaiju-coder-7`",
"Normal user path: `opencode run` without `-m`, `--agent`, or `/kaiju`.",
f"Summary: `{summary['summary']['pass']} pass / {summary['summary']['fail']} fail / {summary['summary']['manual']} manual`",
"",
"| Status | Check | Detail |",
"|---|---|---|",
]
for check in checks:
lines.append(f"| {check.status} | {check.name} | {check.detail.replace('|', '/') } |")
if details.get("command"):
lines.extend(
[
"",
"## OpenCode Command",
"",
"```bash",
shell_join(details["command"]),
"```",
]
)
(run_dir / "summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--base-url", default=None)
parser.add_argument("--timeout", type=int, default=900)
parser.add_argument("--runs-dir", type=Path, default=DEFAULT_RUNS_DIR)
parser.add_argument("--skip-live", action="store_true")
parser.add_argument("--skip-opencode", action="store_true", help="Only run installer/live checks.")
parser.add_argument("--keep-dir", action="store_true", help="Keep the temp OpenCode workspace.")
args = parser.parse_args()
checks: list[Check] = []
add_installer_check(checks, timeout=60, base_url=args.base_url)
add_opencode_version_check(checks, timeout=30)
if args.skip_live:
checks.append(Check("live model", "manual", "skipped by --skip-live"))
else:
add_live_model_check(checks, timeout=10, base_url=args.base_url)
details: dict[str, Any] = {}
if args.skip_opencode:
checks.append(Check("opencode run", "manual", "skipped by --skip-opencode"))
elif any(check.status == "fail" for check in checks):
checks.append(Check("opencode run", "manual", "skipped because prerequisite checks failed"))
else:
details = run_opencode_smoke(checks, timeout=args.timeout, keep_dir=args.keep_dir)
run_dir = args.runs_dir / utc_stamp()
write_report(run_dir, checks, details)
ready = not any(check.status in {"fail", "manual"} for check in checks)
print(f"Wrote {run_dir / 'summary.md'}")
for check in checks:
print(f"[{check.status}] {check.name} - {check.detail}")
return 0 if ready else 1
if __name__ == "__main__":
raise SystemExit(main())
|