#!/usr/bin/env python3
"""Coding artifact harness for implementation-style prompts.

This handles the gap between a full generated project and a technical plan:
customers often ask for a production-ready utility, parser, router, or safe
file writer. Those requests should produce code, tests, and verification notes.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

# Substrings that must never appear in a rendered artifact (compared
# case-insensitively by validate_markdown).
FORBIDDEN_TOKENS = ["sk_live_", "sk_test_", "rk_live_", "pplx-", "AIza", "anthropic_api_key"]

# "sse" has to match as a whole word: a bare substring test also fires on
# everyday words such as "classes", "passed" or "assess".
_SSE_WORD = re.compile(r"\bsse\b")

# Per-kind defaults: (title, files, verification steps, safety notes).
_SPEC_DEFAULTS: dict[str, tuple[str, tuple[str, ...], tuple[str, ...], tuple[str, ...]]] = {
    "rate_limiter": (
        "Token Bucket Rate Limiter",
        ("src/rate-limit.ts", "tests/rate-limit.test.ts"),
        ("npm run test", "concurrent reserve/refund smoke", "timeout and refill edge cases"),
        ("state is per key", "reserve/debit/refund are explicit", "no provider secrets are stored"),
    ),
    "sse_parser": (
        "OpenAI-Compatible SSE Parser",
        ("src/sse-parser.ts", "tests/sse-parser.test.ts"),
        ("npm run test", "malformed JSON smoke", "[DONE] termination smoke"),
        ("malformed events are reported", "partial chunks are preserved", "stream end is explicit"),
    ),
    "artifact_writer": (
        "Safe Artifact Writer",
        ("src/artifact-writer.ts", "tests/artifact-writer.test.ts"),
        ("npm run test", "path traversal rejection smoke", "atomic rename smoke"),
        ("workspace root is enforced", "writes use temp files then rename", "unrelated files are preserved"),
    ),
    "fleet_router": (
        "Local Model Fleet Router",
        ("src/fleet-router.ts", "tests/fleet-router.test.ts"),
        ("npm run test", "primary failure fallback smoke", "circuit breaker smoke"),
        ("customer sees public brain name only", "internal hosts stay hidden", "timeouts prevent endless hangs"),
    ),
    "typescript_utility": (
        "Production TypeScript Utility",
        ("src/index.ts", "tests/index.test.ts"),
        ("npm run test", "input validation smoke"),
        ("validate inputs", "avoid hardcoded secrets", "return explicit errors"),
    ),
}


@dataclass
class CodingSpec:
    """Describes one coding artifact: what it is and what ships with it."""

    title: str
    artifact_kind: str
    language: str = "TypeScript"
    files: list[str] = field(default_factory=list)
    verification: list[str] = field(default_factory=list)
    safety_notes: list[str] = field(default_factory=list)


def clean_text(value: Any, fallback: str) -> str:
    """Collapse whitespace runs in *value* and strip it.

    Returns *fallback* when *value* is not a string or is empty after cleaning.
    """
    if not isinstance(value, str):
        return fallback
    value = re.sub(r"\s+", " ", value).strip()
    return value or fallback


def slugify(value: str) -> str:
    """Return a lowercase, dash-separated slug of at most 70 characters.

    Falls back to ``"coding-artifact"`` when nothing usable remains.
    """
    return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")[:70] or "coding-artifact"


def infer_kind(prompt: str) -> str:
    """Map a free-text prompt to one of the supported artifact kinds.

    Rules are checked in order and the first match wins; prompts matching no
    rule yield ``"typescript_utility"``.
    """
    lower = prompt.lower()
    if _SSE_WORD.search(lower) or "streaming response" in lower:
        return "sse_parser"
    if "artifact writer" in lower or "write files" in lower or "path traversal" in lower:
        return "artifact_writer"
    if "fleet router" in lower or "model fleet" in lower or "goku" in lower:
        return "fleet_router"
    if "rate limiter" in lower or "token bucket" in lower:
        return "rate_limiter"
    return "typescript_utility"


def spec_from_prompt(prompt: str) -> CodingSpec:
    """Build the default CodingSpec for the kind inferred from *prompt*.

    Every call returns a new object with its own lists, so callers may mutate
    the result freely.
    """
    kind = infer_kind(prompt)
    title, files, verification, safety_notes = _SPEC_DEFAULTS[kind]
    return CodingSpec(
        title=title,
        artifact_kind=kind,
        files=list(files),
        verification=list(verification),
        safety_notes=list(safety_notes),
    )


def _clean_list(value: Any, fallback: list[str]) -> list[str]:
    """Return the cleaned, non-blank string items of *value*, else a copy of *fallback*."""
    if not isinstance(value, list):
        return list(fallback)
    cleaned = [clean_text(item, "") for item in value if isinstance(item, str)]
    # Whitespace-only entries clean down to "" and would render as empty bullets.
    return [item for item in cleaned if item] or list(fallback)


def normalize_spec(raw: dict[str, Any] | CodingSpec, prompt: str = "") -> CodingSpec:
    """Coerce loosely-typed spec data into a CodingSpec.

    A CodingSpec instance is returned unchanged. For a dict, each field is
    cleaned, and anything missing, mistyped, blank, or (for ``artifact_kind``)
    unknown is replaced by the default derived from *prompt*.
    """
    if isinstance(raw, CodingSpec):
        return raw

    fallback = spec_from_prompt(prompt)
    kind = clean_text(raw.get("artifact_kind"), fallback.artifact_kind)
    if kind not in _SPEC_DEFAULTS:
        kind = fallback.artifact_kind

    return CodingSpec(
        title=clean_text(raw.get("title"), fallback.title),
        artifact_kind=kind,
        language=clean_text(raw.get("language"), fallback.language),
        files=_clean_list(raw.get("files"), fallback.files),
        verification=_clean_list(raw.get("verification"), fallback.verification),
        safety_notes=_clean_list(raw.get("safety_notes"), fallback.safety_notes),
    )


# NOTE(review): the TypeScript templates below spell out generic type
# arguments (Map<...>, Promise<...>). The collapsed source showed them bare,
# which does not type-check; the arguments are inferred from the surrounding
# template code and should be confirmed against the intended originals.


def render_rate_limiter() -> str:
    """Return the token-bucket rate limiter implementation as a fenced ts block."""
    return """```ts
export type BucketSnapshot = {
  key: string;
  capacity: number;
  tokens: number;
  updatedAtMs: number;
};

export type Reservation = {
  ok: boolean;
  key: string;
  tokens: number;
  retryAfterMs: number;
};

export class TokenBucketRateLimiter {
  private buckets = new Map<string, BucketSnapshot>();

  constructor(
    private readonly capacity = 20,
    private readonly refillPerSecond = 1,
    private readonly now = () => Date.now(),
  ) {}

  reserve(key: string, tokens = 1): Reservation {
    if (!key.trim()) throw new Error("key is required");
    if (tokens <= 0 || tokens > this.capacity) throw new Error("invalid token request");
    const bucket = this.refill(key);
    if (bucket.tokens < tokens) {
      const missing = tokens - bucket.tokens;
      return { ok: false, key, tokens: 0, retryAfterMs: Math.ceil((missing / this.refillPerSecond) * 1000) };
    }
    bucket.tokens -= tokens;
    return { ok: true, key, tokens, retryAfterMs: 0 };
  }

  debit(reservation: Reservation): void {
    if (!reservation.ok) throw new Error("cannot debit failed reservation");
  }

  refund(reservation: Reservation): void {
    if (!reservation.ok) return;
    const bucket = this.refill(reservation.key);
    bucket.tokens = Math.min(this.capacity, bucket.tokens + reservation.tokens);
  }

  snapshot(key: string): BucketSnapshot {
    return { ...this.refill(key) };
  }

  private refill(key: string): BucketSnapshot {
    const nowMs = this.now();
    const existing = this.buckets.get(key) ?? { key, capacity: this.capacity, tokens: this.capacity, updatedAtMs: nowMs };
    const elapsedSeconds = Math.max(0, (nowMs - existing.updatedAtMs) / 1000);
    existing.tokens = Math.min(this.capacity, existing.tokens + elapsedSeconds * this.refillPerSecond);
    existing.updatedAtMs = nowMs;
    this.buckets.set(key, existing);
    return existing;
  }
}
```"""


def render_sse_parser() -> str:
    """Return the OpenAI-compatible SSE parser implementation as a fenced ts block."""
    # Backslashes are doubled so the emitted TypeScript regex reads /\r?\n/.
    return """```ts
export type StreamDelta =
  | { type: "content"; content: string }
  | { type: "done" }
  | { type: "error"; error: string };

export function parseOpenAISse(input: string): StreamDelta[] {
  const events: StreamDelta[] = [];
  for (const rawLine of input.split(/\\r?\\n/)) {
    const line = rawLine.trim();
    if (!line || line.startsWith(":")) continue;
    if (!line.startsWith("data:")) continue;
    const payload = line.slice("data:".length).trim();
    if (payload === "[DONE]") {
      events.push({ type: "done" });
      continue;
    }
    try {
      const parsed = JSON.parse(payload);
      const content = parsed?.choices?.[0]?.delta?.content;
      if (typeof content === "string" && content.length > 0) {
        events.push({ type: "content", content });
      }
    } catch (error) {
      events.push({ type: "error", error: error instanceof Error ? error.message : "invalid JSON event" });
    }
  }
  return events;
}
```"""


def render_artifact_writer() -> str:
    """Return the safe artifact writer implementation as a fenced ts block."""
    return """```ts
import { mkdir, rename, writeFile } from "node:fs/promises";
import path from "node:path";
import crypto from "node:crypto";

export type ArtifactFile = { relativePath: string; contents: string };
export type ArtifactManifest = { root: string; files: string[]; writtenAt: string };

export async function writeArtifacts(root: string, files: ArtifactFile[]): Promise<ArtifactManifest> {
  const rootResolved = path.resolve(root);
  const written: string[] = [];
  for (const file of files) {
    const target = safeResolve(rootResolved, file.relativePath);
    await mkdir(path.dirname(target), { recursive: true });
    const temp = `${target}.${crypto.randomUUID()}.tmp`;
    await writeFile(temp, file.contents, "utf8");
    await rename(temp, target);
    written.push(path.relative(rootResolved, target));
  }
  return { root: rootResolved, files: written.sort(), writtenAt: new Date().toISOString() };
}

export function safeResolve(rootResolved: string, relativePath: string): string {
  if (!relativePath || path.isAbsolute(relativePath)) throw new Error("relative path required");
  const target = path.resolve(rootResolved, relativePath);
  if (target !== rootResolved && !target.startsWith(rootResolved + path.sep)) {
    throw new Error("path traversal blocked");
  }
  return target;
}
```"""


def render_fleet_router() -> str:
    """Return the model fleet router implementation as a fenced ts block."""
    return """```ts
export type ModelNode = { id: string; publicName: string; url: string; priority: number; timeoutMs: number };
export type RouteEvent = { type: "status" | "fallback" | "error"; message: string; nodeId?: string };

export class ModelFleetRouter {
  private failures = new Map<string, { count: number; openedUntil: number }>();

  constructor(private readonly nodes: ModelNode[], private readonly now = () => Date.now()) {}

  async route<T>(
    prompt: string,
    emit: (event: RouteEvent) => void,
    callNode: (node: ModelNode, prompt: string) => Promise<T>,
  ): Promise<T> {
    for (const node of [...this.nodes].sort((a, b) => a.priority - b.priority)) {
      if (this.circuitOpen(node.id)) {
        emit({ type: "fallback", nodeId: node.id, message: "Node temporarily unhealthy; trying next route." });
        continue;
      }
      emit({ type: "status", nodeId: node.id, message: `Working through ${node.publicName}.` });
      try {
        const result = await this.withTimeout(callNode(node, prompt), node.timeoutMs);
        this.failures.delete(node.id);
        return result;
      } catch (error) {
        this.recordFailure(node.id);
        emit({ type: "error", nodeId: node.id, message: error instanceof Error ? error.message : "route failed" });
      }
    }
    throw new Error("All model routes failed");
  }

  private circuitOpen(nodeId: string): boolean {
    return (this.failures.get(nodeId)?.openedUntil ?? 0) > this.now();
  }

  private recordFailure(nodeId: string): void {
    const current = this.failures.get(nodeId) ?? { count: 0, openedUntil: 0 };
    current.count += 1;
    if (current.count >= 3) current.openedUntil = this.now() + 30_000;
    this.failures.set(nodeId, current);
  }

  private withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
    return Promise.race([
      promise,
      new Promise<never>((_, reject) => setTimeout(() => reject(new Error("model route timed out")), timeoutMs)),
    ]);
  }
}
```"""


def code_for_kind(kind: str) -> str:
    """Return the implementation code block for *kind*.

    NOTE(review): kinds without a dedicated template (including
    "typescript_utility") fall back to the rate limiter, even though that
    spec lists src/index.ts -- confirm this is intended.
    """
    renderers = {
        "rate_limiter": render_rate_limiter,
        "sse_parser": render_sse_parser,
        "artifact_writer": render_artifact_writer,
        "fleet_router": render_fleet_router,
    }
    return renderers.get(kind, render_rate_limiter)()


def render_tests(spec: CodingSpec) -> str:
    """Return a markdown section with the vitest suite for *spec*.

    The heading is the last entry of ``spec.files`` (or
    ``tests/index.test.ts`` when the list is empty). Kinds without a dedicated
    suite get the rate-limiter tests.
    """
    name = spec.files[-1] if spec.files else "tests/index.test.ts"
    if spec.artifact_kind == "sse_parser":
        body = """```ts
import { describe, expect, it } from "vitest";
import { parseOpenAISse } from "../src/sse-parser";

describe("parseOpenAISse", () => {
  it("returns content and done events", () => {
    const events = parseOpenAISse('data: {"choices":[{"delta":{"content":"hi"}}]}\\n\\ndata: [DONE]\\n\\n');
    expect(events).toEqual([{ type: "content", content: "hi" }, { type: "done" }]);
  });

  it("reports malformed JSON without dropping the stream", () => {
    expect(parseOpenAISse("data: {bad}\\n")[0].type).toBe("error");
  });
});
```"""
    elif spec.artifact_kind == "artifact_writer":
        # NOTE(review): the HTML literal passed as `contents` was garbled in the
        # source (only "ok" survived); "<p>ok</p>" is a stand-in that keeps the
        # toContain("ok") assertion meaningful -- confirm the intended markup.
        body = """```ts
import { mkdtemp, readFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { safeResolve, writeArtifacts } from "../src/artifact-writer";

describe("writeArtifacts", () => {
  it("blocks path traversal", () => {
    expect(() => safeResolve("/tmp/workspace", "../secret.txt")).toThrow("path traversal blocked");
  });

  it("writes files under the workspace", async () => {
    const root = await mkdtemp(path.join(tmpdir(), "kaiju-"));
    const manifest = await writeArtifacts(root, [{ relativePath: "index.html", contents: "<p>ok</p>" }]);
    expect(await readFile(path.join(root, "index.html"), "utf8")).toContain("ok");
    expect(manifest.files).toEqual(["index.html"]);
  });
});
```"""
    elif spec.artifact_kind == "fleet_router":
        body = """```ts
import { describe, expect, it } from "vitest";
import { ModelFleetRouter } from "../src/fleet-router";

describe("ModelFleetRouter", () => {
  it("falls back after a failed primary route", async () => {
    const events: unknown[] = [];
    const router = new ModelFleetRouter([
      { id: "goku", publicName: "Arianna", url: "hidden", priority: 1, timeoutMs: 50 },
      { id: "gojira-a", publicName: "Arianna", url: "hidden", priority: 2, timeoutMs: 50 },
    ]);
    const result = await router.route("build", (event) => events.push(event), async (node) => {
      if (node.id === "goku") throw new Error("down");
      return "ok";
    });
    expect(result).toBe("ok");
    expect(events.length).toBeGreaterThan(1);
  });
});
```"""
    else:
        body = """```ts
import { describe, expect, it } from "vitest";
import { TokenBucketRateLimiter } from "../src/rate-limit";

describe("TokenBucketRateLimiter", () => {
  it("reserves, debits, and refunds tokens", () => {
    const limiter = new TokenBucketRateLimiter(2, 1, () => 1_000);
    const reservation = limiter.reserve("user-1", 2);
    expect(reservation.ok).toBe(true);
    expect(limiter.reserve("user-1").ok).toBe(false);
    limiter.refund(reservation);
    expect(limiter.reserve("user-1").ok).toBe(true);
  });
});
```"""
    return f"### {name}\n\n{body}"


def render_markdown(spec: CodingSpec, prompt: str) -> str:
    """Render the full markdown answer for *spec*.

    Sections, in order: title, file structure, implementation, tests,
    state/config notes, safety, verification, and the original request
    (*prompt*, stripped).
    """
    file_lines = "\n".join(f"- `{file}`" for file in spec.files)
    safety_lines = "\n".join(f"- {note}" for note in spec.safety_notes)
    verification_lines = "\n".join(f"- `{step}`" for step in spec.verification)
    implementation_file = spec.files[0] if spec.files else "src/index.ts"
    return f"""# {spec.title}

This is an implementation-ready {spec.language} answer, not a plan. It includes file structure, code, tests, state/config notes, safety rules, and verification.

## File Structure

{file_lines}

## Implementation

### {implementation_file}

{code_for_kind(spec.artifact_kind)}

## Tests

{render_tests(spec)}

## State And Config

- State is explicit and scoped to the caller key, workspace, stream, or model route.
- Config should come from environment variables or constructor arguments, never hardcoded provider secrets.
- Persisted state should be written through a controlled storage layer when this moves from in-memory tests to production.

## Safety

{safety_lines}
- Preserve unrelated user files and validate inputs before any destructive action.

## Verification

{verification_lines}
- Add one smoke test for the exact customer flow before shipping.

## Fit For The Original Request

The request was: {prompt.strip()}
"""


def validate_markdown(markdown: str, spec: CodingSpec) -> list[str]:
    """Return a list of problems found in a rendered artifact (empty when it passes).

    Checks for a leading markdown title, a ts code fence, test code,
    state/config and safety/verification wording, and the absence of every
    entry in FORBIDDEN_TOKENS. *spec* is accepted for interface compatibility
    and is not inspected.
    """
    errors: list[str] = []
    lower = markdown.lower()
    if not markdown.lstrip().startswith("# "):
        errors.append("coding artifact missing markdown title")
    if "```ts" not in markdown:
        errors.append("coding artifact missing TypeScript code block")
    if "describe(" not in markdown or "expect(" not in markdown:
        errors.append("coding artifact missing tests")
    if "state" not in lower or "config" not in lower:
        errors.append("coding artifact missing state/config notes")
    if "safety" not in lower or ("verify" not in lower and "verification" not in lower):
        errors.append("coding artifact missing safety or verification")
    errors.extend(
        f"forbidden token found: {token}" for token in FORBIDDEN_TOKENS if token.lower() in lower
    )
    return errors


def render_from_prompt(prompt: str) -> tuple[CodingSpec, str, list[str]]:
    """Infer a spec from *prompt*, render it, and validate the result.

    Returns ``(spec, markdown, validation_errors)``.
    """
    spec = spec_from_prompt(prompt)
    markdown = render_markdown(spec, prompt)
    return spec, markdown, validate_markdown(markdown, spec)


def write_markdown(path: Path, markdown: str) -> None:
    """Write *markdown* to *path* as UTF-8, creating parent directories as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(markdown, encoding="utf-8")