restokes92's picture
Upload Kaiju Coder 7 OpenCode helper package
c75f885 verified
#!/usr/bin/env python3
"""Coding artifact harness for implementation-style prompts.
This handles the gap between a full generated project and a technical plan:
customers often ask for a production-ready utility, parser, router, or safe
file writer. Those requests should produce code, tests, and verification notes.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
# Credential-looking substrings; validate_markdown reports any of these it
# finds in a rendered document (the comparison is case-insensitive).
FORBIDDEN_TOKENS = [
    "sk_live_",
    "sk_test_",
    "rk_live_",
    "pplx-",
    "AIza",
    "anthropic_api_key",
]
@dataclass
class CodingSpec:
    """Description of one coding artifact to render as a Markdown answer.

    Only ``title`` and ``artifact_kind`` are required; the list fields each
    default to a fresh empty list per instance.
    """

    # Document heading.
    title: str
    # Selects which code and test templates are rendered.
    artifact_kind: str
    # Language name quoted in the rendered introduction.
    language: str = "TypeScript"
    # File paths; the first heads the implementation section and the last
    # heads the tests section.
    files: list[str] = field(default_factory=list)
    # Verification steps rendered as bullets.
    verification: list[str] = field(default_factory=list)
    # Safety notes rendered as bullets.
    safety_notes: list[str] = field(default_factory=list)
def clean_text(value: Any, fallback: str) -> str:
    """Return ``value`` with whitespace runs collapsed to single spaces.

    ``fallback`` is returned when ``value`` is not a string or when nothing
    is left after collapsing and trimming.
    """
    if isinstance(value, str):
        collapsed = re.sub(r"\s+", " ", value).strip()
        if collapsed:
            return collapsed
    return fallback
def slugify(value: str) -> str:
    """Turn ``value`` into a lowercase, hyphen-separated slug.

    Every run of characters outside ``a-z0-9`` becomes a single hyphen, the
    result is limited to 70 characters, and it never starts or ends with a
    hyphen. Returns ``"coding-artifact"`` when nothing usable remains.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
    # Strip again after truncating: the 70-character cut can land directly
    # after a separator and would otherwise leave a trailing hyphen.
    slug = slug[:70].rstrip("-")
    return slug or "coding-artifact"
def infer_kind(prompt: str) -> str:
    """Classify ``prompt`` into one of the supported artifact kinds.

    Matching is case-insensitive and the first matching rule wins, in the
    order: SSE parser, artifact writer, fleet router, rate limiter. Prompts
    matching no rule are classified as ``"typescript_utility"``.
    """
    lower = prompt.lower()
    # "sse" must not be flanked by letters; a plain substring test also fires
    # on ordinary words such as "classes", "passes" or "essential".
    if re.search(r"(?<![a-z])sse(?![a-z])", lower) or "streaming response" in lower:
        return "sse_parser"
    if "artifact writer" in lower or "write files" in lower or "path traversal" in lower:
        return "artifact_writer"
    if "fleet router" in lower or "model fleet" in lower or "goku" in lower:
        return "fleet_router"
    if "rate limiter" in lower or "token bucket" in lower:
        return "rate_limiter"
    return "typescript_utility"
def spec_from_prompt(prompt: str) -> CodingSpec:
    """Build the default ``CodingSpec`` for the kind inferred from ``prompt``.

    The language is left at the ``CodingSpec`` default.
    """
    kind = infer_kind(prompt)
    # kind -> (title, files, verification steps, safety notes)
    presets = {
        "rate_limiter": (
            "Token Bucket Rate Limiter",
            ["src/rate-limit.ts", "tests/rate-limit.test.ts"],
            ["npm run test", "concurrent reserve/refund smoke", "timeout and refill edge cases"],
            ["state is per key", "reserve/debit/refund are explicit", "no provider secrets are stored"],
        ),
        "sse_parser": (
            "OpenAI-Compatible SSE Parser",
            ["src/sse-parser.ts", "tests/sse-parser.test.ts"],
            ["npm run test", "malformed JSON smoke", "[DONE] termination smoke"],
            ["malformed events are reported", "partial chunks are preserved", "stream end is explicit"],
        ),
        "artifact_writer": (
            "Safe Artifact Writer",
            ["src/artifact-writer.ts", "tests/artifact-writer.test.ts"],
            ["npm run test", "path traversal rejection smoke", "atomic rename smoke"],
            ["workspace root is enforced", "writes use temp files then rename", "unrelated files are preserved"],
        ),
        "fleet_router": (
            "Local Model Fleet Router",
            ["src/fleet-router.ts", "tests/fleet-router.test.ts"],
            ["npm run test", "primary failure fallback smoke", "circuit breaker smoke"],
            ["customer sees public brain name only", "internal hosts stay hidden", "timeouts prevent endless hangs"],
        ),
        "typescript_utility": (
            "Production TypeScript Utility",
            ["src/index.ts", "tests/index.test.ts"],
            ["npm run test", "input validation smoke"],
            ["validate inputs", "avoid hardcoded secrets", "return explicit errors"],
        ),
    }
    title, files, verification, safety_notes = presets[kind]
    return CodingSpec(
        title=title,
        artifact_kind=kind,
        files=files,
        verification=verification,
        safety_notes=safety_notes,
    )
def normalize_spec(raw: dict[str, Any] | CodingSpec, prompt: str = "") -> CodingSpec:
    """Coerce ``raw`` into a ``CodingSpec``, filling gaps from ``prompt``.

    A ``CodingSpec`` is returned unchanged. For a mapping, each field is
    cleaned with ``clean_text``; a missing, wrongly typed, or blank value is
    replaced by the matching field of ``spec_from_prompt(prompt)``. An
    ``artifact_kind`` outside the supported set is replaced the same way.
    """
    if isinstance(raw, CodingSpec):
        return raw
    fallback = spec_from_prompt(prompt)

    def _string_list(key: str, default: list[str]) -> list[str]:
        # Keep only non-blank strings; fall back when nothing usable remains.
        candidate = raw.get(key)
        if not isinstance(candidate, list):
            return default
        cleaned = [clean_text(item, "") for item in candidate if isinstance(item, str)]
        # Whitespace-only entries clean to "" and would otherwise render as
        # empty bullets and headings.
        return [item for item in cleaned if item] or default

    kind = clean_text(raw.get("artifact_kind"), fallback.artifact_kind)
    if kind not in {"rate_limiter", "sse_parser", "artifact_writer", "fleet_router", "typescript_utility"}:
        kind = fallback.artifact_kind
    return CodingSpec(
        title=clean_text(raw.get("title"), fallback.title),
        artifact_kind=kind,
        language=clean_text(raw.get("language"), fallback.language),
        files=_string_list("files", fallback.files),
        verification=_string_list("verification", fallback.verification),
        safety_notes=_string_list("safety_notes", fallback.safety_notes),
    )
def render_rate_limiter() -> str:
    """Return the token bucket rate limiter as a fenced ``ts`` code block."""
    snippet = """```ts
export type BucketSnapshot = {
key: string;
capacity: number;
tokens: number;
updatedAtMs: number;
};
export type Reservation = {
ok: boolean;
key: string;
tokens: number;
retryAfterMs: number;
};
export class TokenBucketRateLimiter {
private buckets = new Map<string, BucketSnapshot>();
constructor(
private readonly capacity = 20,
private readonly refillPerSecond = 1,
private readonly now = () => Date.now(),
) {}
reserve(key: string, tokens = 1): Reservation {
if (!key.trim()) throw new Error("key is required");
if (tokens <= 0 || tokens > this.capacity) throw new Error("invalid token request");
const bucket = this.refill(key);
if (bucket.tokens < tokens) {
const missing = tokens - bucket.tokens;
return { ok: false, key, tokens: 0, retryAfterMs: Math.ceil((missing / this.refillPerSecond) * 1000) };
}
bucket.tokens -= tokens;
return { ok: true, key, tokens, retryAfterMs: 0 };
}
debit(reservation: Reservation): void {
if (!reservation.ok) throw new Error("cannot debit failed reservation");
}
refund(reservation: Reservation): void {
if (!reservation.ok) return;
const bucket = this.refill(reservation.key);
bucket.tokens = Math.min(this.capacity, bucket.tokens + reservation.tokens);
}
snapshot(key: string): BucketSnapshot {
return { ...this.refill(key) };
}
private refill(key: string): BucketSnapshot {
const nowMs = this.now();
const existing = this.buckets.get(key) ?? { key, capacity: this.capacity, tokens: this.capacity, updatedAtMs: nowMs };
const elapsedSeconds = Math.max(0, (nowMs - existing.updatedAtMs) / 1000);
existing.tokens = Math.min(this.capacity, existing.tokens + elapsedSeconds * this.refillPerSecond);
existing.updatedAtMs = nowMs;
this.buckets.set(key, existing);
return existing;
}
}
```"""
    return snippet
def render_sse_parser() -> str:
    """Return the OpenAI-style SSE parser as a fenced ``ts`` code block."""
    snippet = """```ts
export type StreamDelta = { type: "content"; content: string } | { type: "done" } | { type: "error"; error: string };
export function parseOpenAISse(input: string): StreamDelta[] {
const events: StreamDelta[] = [];
for (const rawLine of input.split(/\\r?\\n/)) {
const line = rawLine.trim();
if (!line || line.startsWith(":")) continue;
if (!line.startsWith("data:")) continue;
const payload = line.slice("data:".length).trim();
if (payload === "[DONE]") {
events.push({ type: "done" });
continue;
}
try {
const parsed = JSON.parse(payload);
const content = parsed?.choices?.[0]?.delta?.content;
if (typeof content === "string" && content.length > 0) {
events.push({ type: "content", content });
}
} catch (error) {
events.push({ type: "error", error: error instanceof Error ? error.message : "invalid JSON event" });
}
}
return events;
}
```"""
    return snippet
def render_artifact_writer() -> str:
    """Return the artifact writer as a fenced ``ts`` code block."""
    snippet = """```ts
import { mkdir, rename, writeFile } from "node:fs/promises";
import path from "node:path";
import crypto from "node:crypto";
export type ArtifactFile = { relativePath: string; contents: string };
export type ArtifactManifest = { root: string; files: string[]; writtenAt: string };
export async function writeArtifacts(root: string, files: ArtifactFile[]): Promise<ArtifactManifest> {
const rootResolved = path.resolve(root);
const written: string[] = [];
for (const file of files) {
const target = safeResolve(rootResolved, file.relativePath);
await mkdir(path.dirname(target), { recursive: true });
const temp = `${target}.${crypto.randomUUID()}.tmp`;
await writeFile(temp, file.contents, "utf8");
await rename(temp, target);
written.push(path.relative(rootResolved, target));
}
return { root: rootResolved, files: written.sort(), writtenAt: new Date().toISOString() };
}
export function safeResolve(rootResolved: string, relativePath: string): string {
if (!relativePath || path.isAbsolute(relativePath)) throw new Error("relative path required");
const target = path.resolve(rootResolved, relativePath);
if (target !== rootResolved && !target.startsWith(rootResolved + path.sep)) {
throw new Error("path traversal blocked");
}
return target;
}
```"""
    return snippet
def render_fleet_router() -> str:
    """Return the model fleet router as a fenced ``ts`` code block."""
    snippet = """```ts
export type ModelNode = { id: string; publicName: string; url: string; priority: number; timeoutMs: number };
export type RouteEvent = { type: "status" | "fallback" | "error"; message: string; nodeId?: string };
export class ModelFleetRouter {
private failures = new Map<string, { count: number; openedUntil: number }>();
constructor(private readonly nodes: ModelNode[], private readonly now = () => Date.now()) {}
async route(prompt: string, emit: (event: RouteEvent) => void, callNode: (node: ModelNode, prompt: string) => Promise<string>): Promise<string> {
for (const node of [...this.nodes].sort((a, b) => a.priority - b.priority)) {
if (this.circuitOpen(node.id)) {
emit({ type: "fallback", nodeId: node.id, message: "Node temporarily unhealthy; trying next route." });
continue;
}
emit({ type: "status", nodeId: node.id, message: `Working through ${node.publicName}.` });
try {
const result = await this.withTimeout(callNode(node, prompt), node.timeoutMs);
this.failures.delete(node.id);
return result;
} catch (error) {
this.recordFailure(node.id);
emit({ type: "error", nodeId: node.id, message: error instanceof Error ? error.message : "route failed" });
}
}
throw new Error("All model routes failed");
}
private circuitOpen(nodeId: string): boolean {
return (this.failures.get(nodeId)?.openedUntil ?? 0) > this.now();
}
private recordFailure(nodeId: string): void {
const current = this.failures.get(nodeId) ?? { count: 0, openedUntil: 0 };
current.count += 1;
if (current.count >= 3) current.openedUntil = this.now() + 30_000;
this.failures.set(nodeId, current);
}
private withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
return Promise.race([
promise,
new Promise<T>((_, reject) => setTimeout(() => reject(new Error("model route timed out")), timeoutMs)),
]);
}
}
```"""
    return snippet
def code_for_kind(kind: str) -> str:
    """Return the implementation code block for ``kind``.

    Any kind without a dedicated renderer, including ``"typescript_utility"``,
    gets the rate limiter block.
    """
    renderers = (
        ("rate_limiter", render_rate_limiter),
        ("sse_parser", render_sse_parser),
        ("artifact_writer", render_artifact_writer),
        ("fleet_router", render_fleet_router),
    )
    for known_kind, renderer in renderers:
        if kind == known_kind:
            return renderer()
    # No dedicated template matched: fall back to the rate limiter.
    return render_rate_limiter()
def render_tests(spec: CodingSpec) -> str:
    """Return the tests section body: a ``###`` heading plus a vitest block.

    The heading is the last entry of ``spec.files``, or
    ``tests/index.test.ts`` when the list is empty. Kinds without their own
    template get the rate limiter tests.
    """
    sse_parser_tests = """```ts
import { describe, expect, it } from "vitest";
import { parseOpenAISse } from "../src/sse-parser";
describe("parseOpenAISse", () => {
it("returns content and done events", () => {
const events = parseOpenAISse('data: {"choices":[{"delta":{"content":"hi"}}]}\\n\\ndata: [DONE]\\n\\n');
expect(events).toEqual([{ type: "content", content: "hi" }, { type: "done" }]);
});
it("reports malformed JSON without dropping the stream", () => {
expect(parseOpenAISse("data: {bad}\\n")[0].type).toBe("error");
});
});
```"""
    artifact_writer_tests = """```ts
import { mkdtemp, readFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { safeResolve, writeArtifacts } from "../src/artifact-writer";
describe("writeArtifacts", () => {
it("blocks path traversal", () => {
expect(() => safeResolve("/tmp/workspace", "../secret.txt")).toThrow("path traversal blocked");
});
it("writes files under the workspace", async () => {
const root = await mkdtemp(path.join(tmpdir(), "kaiju-"));
const manifest = await writeArtifacts(root, [{ relativePath: "index.html", contents: "<h1>ok</h1>" }]);
expect(await readFile(path.join(root, "index.html"), "utf8")).toContain("ok");
expect(manifest.files).toEqual(["index.html"]);
});
});
```"""
    fleet_router_tests = """```ts
import { describe, expect, it } from "vitest";
import { ModelFleetRouter } from "../src/fleet-router";
describe("ModelFleetRouter", () => {
it("falls back after a failed primary route", async () => {
const events: unknown[] = [];
const router = new ModelFleetRouter([
{ id: "goku", publicName: "Arianna", url: "hidden", priority: 1, timeoutMs: 50 },
{ id: "gojira-a", publicName: "Arianna", url: "hidden", priority: 2, timeoutMs: 50 },
]);
const result = await router.route("build", (event) => events.push(event), async (node) => {
if (node.id === "goku") throw new Error("down");
return "ok";
});
expect(result).toBe("ok");
expect(events.length).toBeGreaterThan(1);
});
});
```"""
    rate_limiter_tests = """```ts
import { describe, expect, it } from "vitest";
import { TokenBucketRateLimiter } from "../src/rate-limit";
describe("TokenBucketRateLimiter", () => {
it("reserves, debits, and refunds tokens", () => {
const limiter = new TokenBucketRateLimiter(2, 1, () => 1_000);
const reservation = limiter.reserve("user-1", 2);
expect(reservation.ok).toBe(true);
expect(limiter.reserve("user-1").ok).toBe(false);
limiter.refund(reservation);
expect(limiter.reserve("user-1").ok).toBe(true);
});
});
```"""
    templates = {
        "sse_parser": sse_parser_tests,
        "artifact_writer": artifact_writer_tests,
        "fleet_router": fleet_router_tests,
    }
    if spec.files:
        heading = spec.files[-1]
    else:
        heading = "tests/index.test.ts"
    # Kinds without a dedicated template reuse the rate limiter tests.
    selected = templates.get(spec.artifact_kind, rate_limiter_tests)
    return f"### {heading}\n\n{selected}"
def render_markdown(spec: CodingSpec, prompt: str) -> str:
    """Render the full Markdown answer for ``spec``.

    Sections, in order: title and introduction, file structure,
    implementation, tests, state and config, safety, verification, and the
    original request (``prompt`` with surrounding whitespace removed).
    """
    file_bullets = "\n".join(f"- `{file}`" for file in spec.files)
    # The first listed file heads the implementation section.
    primary_file = spec.files[0] if spec.files else "src/index.ts"
    implementation = code_for_kind(spec.artifact_kind)
    tests_section = render_tests(spec)
    safety_bullets = "\n".join(f"- {note}" for note in spec.safety_notes)
    verification_bullets = "\n".join(f"- `{step}`" for step in spec.verification)
    request = prompt.strip()
    return f"""# {spec.title}
This is an implementation-ready {spec.language} answer, not a plan. It includes file structure, code, tests, state/config notes, safety rules, and verification.
## File Structure
{file_bullets}
## Implementation
### {primary_file}
{implementation}
## Tests
{tests_section}
## State And Config
- State is explicit and scoped to the caller key, workspace, stream, or model route.
- Config should come from environment variables or constructor arguments, never hardcoded provider secrets.
- Persisted state should be written through a controlled storage layer when this moves from in-memory tests to production.
## Safety
{safety_bullets}
- Preserve unrelated user files and validate inputs before any destructive action.
## Verification
{verification_bullets}
- Add one smoke test for the exact customer flow before shipping.
## Fit For The Original Request
The request was: {request}
"""
def validate_markdown(markdown: str, spec: CodingSpec) -> list[str]:
    """Return a list of problems found in a rendered document.

    An empty list means every check passed. Structural problems come first,
    in a fixed order, followed by one entry per forbidden token found.
    ``spec`` is accepted but not consulted.
    """
    lowered = markdown.lower()
    mentions_verification = "verify" in lowered or "verification" in lowered
    # Each pair is (check passed, message reported when it did not).
    checks = (
        (markdown.lstrip().startswith("# "), "coding artifact missing markdown title"),
        ("```ts" in markdown, "coding artifact missing TypeScript code block"),
        ("describe(" in markdown and "expect(" in markdown, "coding artifact missing tests"),
        ("state" in lowered and "config" in lowered, "coding artifact missing state/config notes"),
        ("safety" in lowered and mentions_verification, "coding artifact missing safety or verification"),
    )
    errors: list[str] = [message for passed, message in checks if not passed]
    errors.extend(
        f"forbidden token found: {token}"
        for token in FORBIDDEN_TOKENS
        if token.lower() in lowered
    )
    return errors
def render_from_prompt(prompt: str) -> tuple[CodingSpec, str, list[str]]:
    """Derive a spec from ``prompt``, render it, and validate the result.

    Returns the spec, the rendered Markdown, and the validation errors.
    """
    spec = spec_from_prompt(prompt)
    document = render_markdown(spec, prompt)
    problems = validate_markdown(document, spec)
    return spec, document, problems
def write_markdown(path: Path, markdown: str) -> None:
    """Write ``markdown`` to ``path`` as UTF-8, creating missing parent directories."""
    directory = path.parent
    directory.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(markdown)