restokes92's picture
Upload Kaiju Coder 7 OpenCode helper package
c75f885 verified
#!/usr/bin/env python3
"""Repo patch harness for existing project edits.
This is the bridge from "generate starter project" to "edit an existing repo".
It only edits a repo path explicitly provided by the caller.
"""
from __future__ import annotations
import difflib
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from kaiju_harness.code_project import render_checkout_route, render_success_page, render_webhook_route
# Substrings that look like real provider credentials. validate_repo lowercases
# both these tokens and the scanned file contents and reports an error for each
# token it finds.
FORBIDDEN_TOKENS = ["sk_live_", "sk_test_", "rk_live_", "pplx-", "AIza", "anthropic_api_key"]
# Actions routed through the Cloudflare Worker code paths in this module
# (wrangler scripts in package.json, src/index.ts, wrangler.toml, tests/worker.test.ts).
WORKER_ACTIONS = {
    "add_worker_search_proxy",
    "fix_worker_search_proxy",
    "add_worker_telegram_webhook",
    "fix_worker_telegram_webhook",
    "add_worker_r2_upload",
    "fix_worker_r2_upload",
}
@dataclass
class RepoPatchSpec:
    """Description of one patch: what to do and how to check it afterwards."""

    # Action identifier such as "add_stripe_checkout"; planned_files branches on it.
    action: str
    # Short human-readable title; used as the README section heading.
    title: str
    # One-paragraph explanation rendered into the README and the summary file.
    summary: str
    # Repo-relative paths expected to exist after the patch (checked by validate_repo).
    expected_files: list[str] = field(default_factory=list)
    # Commands or manual steps listed under "Verification" in the rendered docs.
    verification: list[str] = field(default_factory=list)
@dataclass
class RepoPatchResult:
    """Outcome of run_patch: the spec used plus everything produced from it."""

    # The spec the patch was generated from.
    spec: RepoPatchSpec
    # Sorted repo-relative paths reported as changed by run_patch.
    changed_files: list[str]
    # Unified diff text produced by build_patch.
    patch_text: str
    # Markdown summary produced by render_summary.
    summary_text: str
    # Validation error messages; run_patch leaves this empty when apply is False.
    errors: list[str]
def clean_text(value: Any, fallback: str) -> str:
    """Collapse whitespace runs in ``value`` and trim it.

    Returns ``fallback`` when ``value`` is not a string or is empty after
    normalization.
    """
    if isinstance(value, str):
        collapsed = re.sub(r"\s+", " ", value).strip()
        if collapsed:
            return collapsed
    return fallback
def infer_action(prompt: str) -> str:
    """Map a free-form prompt to an action identifier by keyword matching.

    Matching is case-insensitive substring search. Precedence: Worker actions
    first, then Stripe, search, Telegram and CSV (each as ``fix_*`` when the
    prompt sounds like a repair, otherwise ``add_*``), then the support and
    dashboard pages. Anything unmatched falls back to ``add_support_page``.
    """
    text = prompt.lower()

    def mentions(*needles: str) -> bool:
        return any(needle in text for needle in needles)

    repairing = mentions("fix", "broken", "repair", "failing")
    worker = mentions("cloudflare worker", "worker api", "wrangler", "worker repo")
    wants_search = mentions("search", "perplexity")
    wants_telegram = mentions("telegram", "webhook")
    wants_files = mentions("r2", "upload", "file")
    wants_stripe = mentions("stripe", "checkout", "payment")
    wants_csv = mentions("csv", "export")

    verb = "fix" if repairing else "add"

    if worker:
        if wants_search:
            return f"{verb}_worker_search_proxy"
        if wants_telegram:
            return f"{verb}_worker_telegram_webhook"
        if wants_files:
            return f"{verb}_worker_r2_upload"

    if wants_stripe:
        return f"{verb}_stripe_checkout"
    if wants_search:
        return f"{verb}_search_proxy"
    if wants_telegram:
        return f"{verb}_telegram_webhook"
    if wants_csv:
        return f"{verb}_csv_export"

    if mentions("support", "contact page", "contact form"):
        return "add_support_page"
    if mentions("dashboard", "kpi", "operator"):
        return "add_dashboard_page"
    return "add_support_page"
def spec_from_prompt(prompt: str) -> RepoPatchSpec:
    """Build the default RepoPatchSpec for the action inferred from ``prompt``.

    Each action family shares its expected files and verification steps; only
    the title and summary differ between the ``fix_*`` and ``add_*`` variants.
    Unrecognized actions produce the support-page spec.
    """
    action = infer_action(prompt)
    repairing = action.startswith("fix_")

    def pick(when_fix: str, when_add: str) -> str:
        return when_fix if repairing else when_add

    worker_files = ["package.json", "README.md", "wrangler.toml", "src/index.ts", "tests/worker.test.ts"]

    if action in {"add_worker_search_proxy", "fix_worker_search_proxy"}:
        return RepoPatchSpec(
            action=action,
            title=pick("Fix Worker search proxy", "Add Worker search proxy"),
            summary=pick(
                "Repairs a Cloudflare Worker search proxy with CORS, bearer auth, server-side provider key handling, tests, and Wrangler notes.",
                "Adds a Cloudflare Worker search proxy with CORS, bearer auth, server-side provider key handling, tests, and Wrangler notes.",
            ),
            expected_files=worker_files,
            verification=["npm run test", "npm run lint", "wrangler deploy --dry-run"],
        )

    if action in {"add_worker_telegram_webhook", "fix_worker_telegram_webhook"}:
        return RepoPatchSpec(
            action=action,
            title=pick("Fix Worker Telegram webhook", "Add Worker Telegram webhook"),
            summary=pick(
                "Repairs a Cloudflare Worker Telegram webhook with payload validation, safe bot token env handling, CORS, tests, and Wrangler notes.",
                "Adds a Cloudflare Worker Telegram webhook with payload validation, safe bot token env handling, CORS, tests, and Wrangler notes.",
            ),
            expected_files=worker_files,
            verification=["npm run test", "npm run lint", "POST /telegram/webhook sample update"],
        )

    if action in {"add_worker_r2_upload", "fix_worker_r2_upload"}:
        return RepoPatchSpec(
            action=action,
            title=pick("Fix Worker R2 upload", "Add Worker R2 upload"),
            summary=pick(
                "Repairs a Cloudflare Worker R2 upload route with bucket binding, CORS, bearer auth, tests, and Wrangler notes.",
                "Adds a Cloudflare Worker R2 upload route with bucket binding, CORS, bearer auth, tests, and Wrangler notes.",
            ),
            expected_files=worker_files,
            verification=["npm run test", "npm run lint", "wrangler deploy --dry-run"],
        )

    if action in {"add_stripe_checkout", "fix_stripe_checkout"}:
        return RepoPatchSpec(
            action=action,
            title=pick("Fix Stripe checkout", "Add Stripe checkout"),
            summary=pick(
                "Repairs server-side Stripe checkout, webhook verification, success/cancel pages, package dependencies, and environment documentation.",
                "Adds server-side Stripe checkout, webhook verification, success/cancel pages, package dependencies, and environment documentation.",
            ),
            expected_files=[
                "package.json",
                "README.md",
                "src/app/api/checkout/route.ts",
                "src/app/api/webhooks/stripe/route.ts",
                "src/app/success/page.tsx",
                "src/app/cancel/page.tsx",
                ".env.example",
            ],
            verification=["npm run test", "npm run lint", "stripe listen webhook smoke"],
        )

    if action in {"add_csv_export", "fix_csv_export"}:
        return RepoPatchSpec(
            action=action,
            title=pick("Fix CSV export utility", "Add CSV export utility"),
            summary=pick(
                "Repairs CSV escaping for commas, quotes, and empty values with tests and README notes.",
                "Adds a reusable CSV export utility, tests, and README notes so records can be exported from business apps.",
            ),
            expected_files=["README.md", "src/lib/csv.ts", "tests/csv.test.ts"],
            verification=["npm run test", "manual export smoke"],
        )

    if action in {"add_search_proxy", "fix_search_proxy"}:
        return RepoPatchSpec(
            action=action,
            title=pick("Fix server-side search proxy", "Add server-side search proxy"),
            summary=pick(
                "Repairs the search API route so the provider key stays server-side, bearer token protection is supported, and safe environment usage is documented.",
                "Adds a server-side search API route that keeps the provider key on the server, requires a bearer token when configured, and documents safe environment usage.",
            ),
            expected_files=["package.json", "README.md", "src/app/api/search/route.ts", "tests/search-proxy.test.ts", ".env.example"],
            verification=["npm run test", "POST /api/search with a query", "confirm provider key is server-side only"],
        )

    if action in {"add_telegram_webhook", "fix_telegram_webhook"}:
        return RepoPatchSpec(
            action=action,
            title=pick("Fix Telegram webhook route", "Add Telegram webhook route"),
            summary=pick(
                "Repairs the Telegram webhook route with payload validation, safe bot token environment handling, tests, and operational notes.",
                "Adds a Telegram webhook API route with payload validation, safe bot token environment handling, tests, and operational notes.",
            ),
            expected_files=["package.json", "README.md", "src/app/api/telegram/webhook/route.ts", "tests/telegram-webhook.test.ts", ".env.example"],
            verification=["npm run test", "POST /api/telegram/webhook with a sample update", "confirm Telegram token is server-side only"],
        )

    if action == "add_dashboard_page":
        return RepoPatchSpec(
            action=action,
            title="Add operator dashboard",
            summary="Adds a dashboard route with KPI cards, tasks, recent activity, and a next-action panel.",
            expected_files=["README.md", "src/app/dashboard/page.tsx", "tests/dashboard.test.ts"],
            verification=["npm run test", "open /dashboard"],
        )

    # Fallback: every other action is treated as the support page.
    return RepoPatchSpec(
        action="add_support_page",
        title="Add support page",
        summary="Adds a customer support/contact page with a simple form, expected response time, and escalation guidance.",
        expected_files=["README.md", "src/app/support/page.tsx", "tests/support.test.ts"],
        verification=["npm run test", "open /support"],
    )
def normalize_spec(raw: dict[str, Any] | RepoPatchSpec, prompt: str = "") -> RepoPatchSpec:
    """Coerce a caller-supplied spec into a RepoPatchSpec.

    Args:
        raw: Either a ready RepoPatchSpec (returned unchanged) or a mapping with
            optional ``action``, ``title``, ``summary``, ``expected_files`` and
            ``verification`` keys.
        prompt: Prompt used to build the fallback spec via ``spec_from_prompt``.

    Returns:
        A RepoPatchSpec in which every missing, mistyped or blank field is
        replaced by the corresponding fallback value.
    """
    if isinstance(raw, RepoPatchSpec):
        return raw
    fallback = spec_from_prompt(prompt)
    # A non-mapping payload carries no usable fields; use the fallback as-is
    # instead of failing on ``raw.get``.
    if not isinstance(raw, dict):
        return fallback

    def clean_list(key: str, default: list[str]) -> list[str]:
        candidate = raw.get(key)
        if not isinstance(candidate, list):
            return default
        cleaned = [clean_text(item, "") for item in candidate if isinstance(item, str)]
        # Whitespace-only entries normalize to "" and must not survive:
        # ``repo / ""`` always exists and an empty README bullet is meaningless.
        kept = [item for item in cleaned if item]
        return kept or default

    return RepoPatchSpec(
        action=clean_text(raw.get("action"), fallback.action),
        title=clean_text(raw.get("title"), fallback.title),
        summary=clean_text(raw.get("summary"), fallback.summary),
        expected_files=clean_list("expected_files", fallback.expected_files),
        verification=clean_list("verification", fallback.verification),
    )
def read_text(repo: Path, relative: str) -> str:
    """Return the UTF-8 text of ``repo / relative``, or ``""`` if it does not exist."""
    target = repo / relative
    if not target.exists():
        return ""
    return target.read_text(encoding="utf-8")
def render_support_page() -> str:
    """Return the static TSX source for the support page component.

    planned_files writes this template to ``src/app/support/page.tsx``.
    """
    return """export default function SupportPage() {
return (
<main className="shell">
<section className="panel">
<p className="eyebrow">Support</p>
<h1>Get help without starting over.</h1>
<p>Tell us what happened, what you expected, and the best way to reach you. Clear reports get fixed faster.</p>
<form className="grid" style={{ gridTemplateColumns: "1fr" }}>
<label className="field">Name<input className="input" name="name" placeholder="Your name" /></label>
<label className="field">Email<input className="input" name="email" placeholder="you@example.com" /></label>
<label className="field">Issue<input className="input" name="issue" placeholder="What needs attention?" /></label>
<button className="btn" type="button">Send support request</button>
</form>
</section>
</main>
);
}
"""
def render_dashboard_page() -> str:
    """Return the static TSX source for the operator dashboard page.

    The template hard-codes sample metrics and tasks; planned_files writes it
    to ``src/app/dashboard/page.tsx``.
    """
    return """const metrics = [
{ label: "Open tasks", value: "12" },
{ label: "Warm leads", value: "7" },
{ label: "Invoices due", value: "$4.2k" }
];
const tasks = ["Send follow-ups", "Review checkout errors", "Post demo clip", "Check support queue"];
export default function DashboardPage() {
return (
<main className="shell">
<p className="eyebrow">Operator dashboard</p>
<h1>Know what needs attention next.</h1>
<section className="grid">
{metrics.map((metric) => (
<article className="card" key={metric.label}>
<p>{metric.label}</p>
<h2>{metric.value}</h2>
</article>
))}
</section>
<section className="panel" style={{ marginTop: 22 }}>
<h2>Next actions</h2>
<ul className="list">
{tasks.map((task) => <li key={task}>{task}</li>)}
</ul>
</section>
</main>
);
}
"""
def render_csv_utility() -> str:
    """Return the TypeScript source for the CSV export helper module.

    The generated module exposes ``toCsv`` (rows to CSV text) and
    ``downloadCsv`` (browser download through a Blob URL). Fields are quoted
    when they contain a double quote, a comma, a carriage return or a line
    feed. The carriage return is included so that a bare ``\\r`` inside a
    value cannot split a record.

    Backslashes are doubled in this Python literal so that the emitted
    TypeScript contains the two-character escapes ``\\r`` and ``\\n``.
    """
    return """export type CsvRow = Record<string, string | number | boolean | null | undefined>;
function escapeCsv(value: string | number | boolean | null | undefined): string {
const text = value === null || value === undefined ? "" : String(value);
if (/[",\\r\\n]/.test(text)) {
return `"${text.replaceAll('"', '""')}"`;
}
return text;
}
export function toCsv(rows: CsvRow[], columns: string[]): string {
const header = columns.map(escapeCsv).join(",");
const body = rows.map((row) => columns.map((column) => escapeCsv(row[column])).join(","));
return [header, ...body].join("\\n");
}
export function downloadCsv(filename: string, rows: CsvRow[], columns: string[]): void {
const blob = new Blob([toCsv(rows, columns)], { type: "text/csv;charset=utf-8" });
const url = URL.createObjectURL(blob);
const link = document.createElement("a");
link.href = url;
link.download = filename;
link.click();
URL.revokeObjectURL(url);
}
"""
def render_csv_test() -> str:
    """Return the vitest source that checks ``toCsv`` escaping of commas and quotes.

    planned_files writes this template to ``tests/csv.test.ts``.
    """
    return '''import { describe, expect, it } from "vitest";
import { toCsv } from "../src/lib/csv";
describe("toCsv", () => {
it("escapes commas and quotes", () => {
const csv = toCsv([{ name: "Ava, LLC", note: 'Said "yes"' }], ["name", "note"]);
expect(csv).toContain('"Ava, LLC"');
expect(csv).toContain('"Said ""yes"""');
});
});
'''
def render_support_test() -> str:
    """Return the vitest source written to ``tests/support.test.ts``.

    NOTE(review): the generated assertion only checks a string literal against
    itself; it does not import or render the support page.
    """
    return """import { describe, expect, it } from "vitest";
describe("support page patch", () => {
it("documents the support route", () => {
expect("/support").toContain("support");
});
});
"""
def render_dashboard_test() -> str:
    """Return the vitest source written to ``tests/dashboard.test.ts``.

    NOTE(review): the generated test asserts on a local array of section names
    and does not import the dashboard page.
    """
    return """import { describe, expect, it } from "vitest";
describe("dashboard patch", () => {
it("defines practical operator sections", () => {
const sections = ["metrics", "tasks", "next actions"];
expect(sections).toContain("metrics");
expect(sections).toContain("next actions");
});
});
"""
def render_checkout_test() -> str:
    """Return the vitest source written to ``tests/stripe-checkout.test.ts``.

    NOTE(review): the generated assertion checks a string literal only; it does
    not exercise the checkout route.
    """
    return """import { describe, expect, it } from "vitest";
describe("stripe checkout patch", () => {
it("keeps checkout secrets server-side", () => {
expect("process.env.STRIPE_SECRET_KEY").toContain("process.env");
});
});
"""
def render_search_proxy_route() -> str:
    """Return the TypeScript source for the Next.js search proxy route.

    The generated handler:
    - enforces a bearer token only when ``SEARCH_PROXY_TOKEN`` is set,
    - returns 500 when ``PERPLEXITY_API_KEY`` is missing,
    - validates the JSON body with zod (``query``, optional ``maxResults``),
    - echoes the parsed request with a ``nextStep`` hint; it does not call the
      search provider itself.

    validate_repo looks for ``process.env.PERPLEXITY_API_KEY``,
    ``SEARCH_PROXY_TOKEN`` and ``requireBearer`` in the written file.
    """
    return """import { NextRequest, NextResponse } from "next/server";
import { z } from "zod";
const SearchSchema = z.object({
query: z.string().min(2),
maxResults: z.number().int().min(1).max(10).optional()
});
function requireBearer(request: NextRequest): NextResponse | null {
const expected = process.env.SEARCH_PROXY_TOKEN;
if (!expected) return null;
const supplied = request.headers.get("authorization") || "";
if (supplied !== `Bearer ${expected}`) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
return null;
}
export async function POST(request: NextRequest) {
const authError = requireBearer(request);
if (authError) return authError;
const providerKey = process.env.PERPLEXITY_API_KEY;
if (!providerKey) {
return NextResponse.json({ error: "Search provider is not configured" }, { status: 500 });
}
const parsed = SearchSchema.safeParse(await request.json().catch(() => null));
if (!parsed.success) {
return NextResponse.json({ error: "Invalid search request", issues: parsed.error.flatten() }, { status: 400 });
}
return NextResponse.json({
ok: true,
query: parsed.data.query,
maxResults: parsed.data.maxResults ?? 5,
nextStep: "Call the search provider here with PERPLEXITY_API_KEY server-side. Never expose it to the browser."
});
}
"""
def render_search_proxy_test() -> str:
    """Return the vitest source written to ``tests/search-proxy.test.ts``.

    NOTE(review): the generated assertions compare string literals and do not
    import the search route.
    """
    return """import { describe, expect, it } from "vitest";
describe("search proxy patch", () => {
it("keeps provider keys server-side", () => {
expect("process.env.PERPLEXITY_API_KEY").toContain("process.env");
expect("SEARCH_PROXY_TOKEN").toContain("TOKEN");
});
});
"""
def render_telegram_webhook_route() -> str:
    """Return the TypeScript source for the Next.js Telegram webhook route.

    The generated handler returns 500 when ``TELEGRAM_BOT_TOKEN`` is missing,
    validates the JSON body against ``TelegramUpdateSchema`` (numeric
    ``update_id``, optional ``message`` / ``callback_query``) and replies with
    the update id plus a ``nextStep`` hint. It does not call the Telegram API.

    validate_repo looks for ``process.env.TELEGRAM_BOT_TOKEN`` and
    ``TelegramUpdateSchema`` in the written file.
    """
    return """import { NextRequest, NextResponse } from "next/server";
import { z } from "zod";
const TelegramUpdateSchema = z.object({
update_id: z.number(),
message: z.unknown().optional(),
callback_query: z.unknown().optional()
});
export async function POST(request: NextRequest) {
const botToken = process.env.TELEGRAM_BOT_TOKEN;
if (!botToken) {
return NextResponse.json({ error: "Telegram bot token is not configured" }, { status: 500 });
}
const parsed = TelegramUpdateSchema.safeParse(await request.json().catch(() => null));
if (!parsed.success) {
return NextResponse.json({ error: "Invalid Telegram update", issues: parsed.error.flatten() }, { status: 400 });
}
return NextResponse.json({
ok: true,
updateId: parsed.data.update_id,
nextStep: "Queue work or reply through Telegram with TELEGRAM_BOT_TOKEN server-side."
});
}
"""
def render_telegram_webhook_test() -> str:
    """Return the vitest source written to ``tests/telegram-webhook.test.ts``.

    NOTE(review): the generated assertion checks a string literal only; it does
    not import the webhook route.
    """
    return """import { describe, expect, it } from "vitest";
describe("telegram webhook patch", () => {
it("keeps bot tokens server-side", () => {
expect("process.env.TELEGRAM_BOT_TOKEN").toContain("process.env");
});
});
"""
def render_worker_index(action: str) -> str:
    """Return the TypeScript source of the Cloudflare Worker entrypoint.

    Every variant serves ``/health`` and ``POST /leads``. Depending on
    ``action`` one extra route is added together with its ``Env`` member:
    ``POST /search`` (``PERPLEXITY_API_KEY``), ``POST /telegram/webhook``
    (``TELEGRAM_BOT_TOKEN``) or ``POST /files`` (``FILES_BUCKET``).

    The generated ``fetch`` handler answers ``OPTIONS`` before running the
    bearer check. Browsers do not attach the ``Authorization`` header to CORS
    preflight requests, so authenticating first would reject every preflight
    with 401 whenever ``API_TOKEN_HASH`` is configured, and the real request
    would never be sent.

    Args:
        action: One of the Worker action identifiers; any other value renders
            only the base routes.

    Returns:
        The full ``src/index.ts`` text.
    """
    search_route = action in {"add_worker_search_proxy", "fix_worker_search_proxy"}
    telegram_route = action in {"add_worker_telegram_webhook", "fix_worker_telegram_webhook"}
    r2_route = action in {"add_worker_r2_upload", "fix_worker_r2_upload"}

    env_lines = [" PUBLIC_APP_NAME?: string;", " API_TOKEN_HASH?: string;"]
    if search_route:
        env_lines.append(" PERPLEXITY_API_KEY?: string;")
    if telegram_route:
        env_lines.append(" TELEGRAM_BOT_TOKEN?: string;")
    if r2_route:
        env_lines.append(" FILES_BUCKET?: R2Bucket;")

    # The route snippets are plain (non-f) strings, so their braces are literal.
    extra_routes: list[str] = []
    route_list = ["/health", "POST /leads"]
    if search_route:
        route_list.append("POST /search")
        extra_routes.append("""
if (url.pathname === "/search" && request.method === "POST") {
if (!env.PERPLEXITY_API_KEY) {
return json({ ok: false, error: "Search provider is not configured" }, { status: 500 });
}
const parsed = SearchSchema.safeParse(await readJson(request));
if (!parsed.success) {
return json({ ok: false, error: "Invalid search request", issues: parsed.error.flatten() }, { status: 400 });
}
return json({
ok: true,
query: parsed.data.query,
maxResults: parsed.data.maxResults ?? 5,
nextStep: "Call the provider with PERPLEXITY_API_KEY server-side; never expose it to clients."
});
}
""")
    if telegram_route:
        route_list.append("POST /telegram/webhook")
        extra_routes.append("""
if (url.pathname === "/telegram/webhook" && request.method === "POST") {
if (!env.TELEGRAM_BOT_TOKEN) {
return json({ ok: false, error: "Telegram bot token is not configured" }, { status: 500 });
}
const parsed = TelegramUpdateSchema.safeParse(await readJson(request));
if (!parsed.success) {
return json({ ok: false, error: "Invalid Telegram update", issues: parsed.error.flatten() }, { status: 400 });
}
return json({
ok: true,
updateId: parsed.data.update_id,
nextStep: "Queue work or send a reply with TELEGRAM_BOT_TOKEN server-side."
});
}
""")
    if r2_route:
        route_list.append("POST /files")
        extra_routes.append("""
if (url.pathname === "/files" && request.method === "POST") {
if (!env.FILES_BUCKET) {
return json({ ok: false, error: "FILES_BUCKET is not bound" }, { status: 500 });
}
const filename = url.searchParams.get("filename") || `upload-${crypto.randomUUID()}.bin`;
await env.FILES_BUCKET.put(filename, request.body);
return json({ ok: true, key: filename });
}
""")

    # Computed up front so the template below only interpolates plain names.
    env_block = "\n".join(env_lines)
    routes_block = "".join(extra_routes)
    routes_json = json.dumps(route_list)

    return f"""import {{ z }} from "zod";
export interface Env {{
{env_block}
}}
const LeadSchema = z.object({{
name: z.string().min(2),
email: z.string().email(),
need: z.string().min(3)
}});
const SearchSchema = z.object({{
query: z.string().min(2),
maxResults: z.number().int().min(1).max(10).optional()
}});
const TelegramUpdateSchema = z.object({{
update_id: z.number(),
message: z.unknown().optional(),
callback_query: z.unknown().optional()
}});
const corsHeaders = {{
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET,POST,OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization"
}};
function json(data: unknown, init: ResponseInit = {{}}): Response {{
return new Response(JSON.stringify(data, null, 2), {{
...init,
headers: {{
"Content-Type": "application/json; charset=utf-8",
...corsHeaders,
...(init.headers || {{}})
}}
}});
}}
async function readJson(request: Request): Promise<unknown> {{
try {{
return await request.json();
}} catch (_error) {{
return null;
}}
}}
async function sha256Hex(value: string): Promise<string> {{
const bytes = new TextEncoder().encode(value);
const digest = await crypto.subtle.digest("SHA-256", bytes);
return [...new Uint8Array(digest)].map((byte) => byte.toString(16).padStart(2, "0")).join("");
}}
function timingSafeEqual(a: string, b: string): boolean {{
if (a.length !== b.length) return false;
let mismatch = 0;
for (let index = 0; index < a.length; index += 1) {{
mismatch |= a.charCodeAt(index) ^ b.charCodeAt(index);
}}
return mismatch === 0;
}}
async function requireBearer(request: Request, env: Env): Promise<Response | null> {{
if (!env.API_TOKEN_HASH) return null;
const supplied = request.headers.get("authorization") || "";
if (!supplied.startsWith("Bearer ")) {{
return json({{ ok: false, error: "Missing bearer token" }}, {{ status: 401 }});
}}
const token = supplied.slice("Bearer ".length).trim();
const tokenHash = await sha256Hex(token);
if (!timingSafeEqual(tokenHash, env.API_TOKEN_HASH)) {{
return json({{ ok: false, error: "Unauthorized" }}, {{ status: 401 }});
}}
return null;
}}
export default {{
async fetch(request: Request, env: Env): Promise<Response> {{
const url = new URL(request.url);
if (request.method === "OPTIONS") {{
return new Response(null, {{ headers: corsHeaders }});
}}
const authError = await requireBearer(request, env);
if (authError) return authError;
if (url.pathname === "/health") {{
return json({{ ok: true, service: env.PUBLIC_APP_NAME || "Kaiju Worker" }});
}}
if (url.pathname === "/leads" && request.method === "POST") {{
const parsed = LeadSchema.safeParse(await readJson(request));
if (!parsed.success) {{
return json({{ ok: false, error: "Invalid lead payload", issues: parsed.error.flatten() }}, {{ status: 400 }});
}}
return json({{ ok: true, lead: {{ id: crypto.randomUUID(), ...parsed.data }} }}, {{ status: 201 }});
}}
{routes_block}
return json({{
ok: true,
routes: {routes_json}
}});
}}
}};
"""
def render_worker_test(action: str) -> str:
    """Return the vitest source written to ``tests/worker.test.ts``.

    The route list embedded in the test always contains ``/health`` and
    ``POST /leads`` plus the single extra route that belongs to ``action``.
    """
    extra_route_by_action = {
        "add_worker_search_proxy": "POST /search",
        "fix_worker_search_proxy": "POST /search",
        "add_worker_telegram_webhook": "POST /telegram/webhook",
        "fix_worker_telegram_webhook": "POST /telegram/webhook",
        "add_worker_r2_upload": "POST /files",
        "fix_worker_r2_upload": "POST /files",
    }
    routes = ["/health", "POST /leads"]
    extra = extra_route_by_action.get(action)
    if extra is not None:
        routes.append(extra)
    routes_literal = json.dumps(routes)

    head = """import { describe, expect, it } from "vitest";
describe("worker patch contract", () => {
it("documents required routes", () => {
const routes = """
    tail = """;
expect(routes).toContain("/health");
expect(routes).toContain("POST /leads");
});
it("keeps provider secrets server-side", () => {
expect("API_TOKEN_HASH").toContain("TOKEN");
});
});
"""
    return head + routes_literal + tail
def render_worker_wrangler(action: str) -> str:
    """Return the ``wrangler.toml`` text for a Worker patch.

    The R2 upload actions append an ``[[r2_buckets]]`` section binding
    ``FILES_BUCKET``; every other action gets only the base configuration.
    """
    base_config = """name = "kaiju-worker-patch"
main = "src/index.ts"
compatibility_date = "2026-05-01"
workers_dev = true
[vars]
PUBLIC_APP_NAME = "Kaiju Worker Patch"
"""
    if action not in {"add_worker_r2_upload", "fix_worker_r2_upload"}:
        return base_config
    r2_binding = """
[[r2_buckets]]
binding = "FILES_BUCKET"
bucket_name = "kaiju-worker-files"
"""
    return base_config + r2_binding
def update_package_json(existing: str, action: str) -> str:
    """Return updated ``package.json`` text for the given action.

    Worker actions force ``type``, ``scripts.dev`` and ``scripts.build`` to the
    Wrangler values and add Wrangler dev dependencies. All other actions only
    fill in missing Next.js scripts. Shared scripts and dependencies are added
    with ``setdefault`` so existing entries are never overwritten.

    Args:
        existing: Current file contents; empty or whitespace-only text is
            treated as an empty manifest.
        action: Action identifier that decides which entries are added.

    Returns:
        The manifest serialized with two-space indentation and a trailing newline.

    Raises:
        ValueError: If ``existing`` is not valid JSON (``json.JSONDecodeError``)
            or its root is not a JSON object.
    """
    # A file holding only whitespace is as empty as a missing one.
    package = json.loads(existing.strip() or "{}")
    if not isinstance(package, dict):
        raise ValueError("package.json must contain a JSON object")

    # ``setdefault`` would keep a ``null`` or otherwise non-object section and
    # the assignments below would then fail, so replace anything unusable.
    for section in ("scripts", "dependencies", "devDependencies"):
        if not isinstance(package.get(section), dict):
            package[section] = {}
    scripts = package["scripts"]
    dependencies = package["dependencies"]
    dev_dependencies = package["devDependencies"]

    if action in WORKER_ACTIONS:
        package["type"] = "module"
        scripts["dev"] = "wrangler dev"
        scripts["build"] = "wrangler deploy --dry-run"
        scripts.setdefault("deploy", "wrangler deploy")
        dev_dependencies.setdefault("@cloudflare/workers-types", "^4.20250501.0")
        dev_dependencies.setdefault("wrangler", "^4.0.0")
    else:
        scripts.setdefault("dev", "next dev")
        scripts.setdefault("build", "next build")
    scripts.setdefault("lint", "tsc --noEmit")
    scripts.setdefault("test", "vitest run")
    dev_dependencies.setdefault("vitest", "^2.1.0")

    if action in {"add_stripe_checkout", "fix_stripe_checkout", "add_search_proxy", "fix_search_proxy", "add_telegram_webhook", "fix_telegram_webhook", *WORKER_ACTIONS}:
        dependencies.setdefault("zod", "^3.23.8")
    if action in {"add_stripe_checkout", "fix_stripe_checkout"}:
        dependencies.setdefault("stripe", "^17.0.0")
    return json.dumps(package, indent=2) + "\n"
def append_readme(existing: str, spec: RepoPatchSpec) -> str:
    """Return README text with a "Kaiju Patch" section for ``spec`` appended.

    Trailing whitespace of ``existing`` is stripped first, and an empty README
    starts from a ``# Project`` heading. If a section with the same title is
    already present, the text is returned without a second copy.
    """
    base = existing.rstrip() or "# Project\n"
    heading = f"## Kaiju Patch: {spec.title}"
    if heading in base:
        return base + "\n"

    changed_areas = "\n".join(f"- `{path}`" for path in spec.expected_files)
    checks = "\n".join(f"- `{item}`" for item in spec.verification)
    section = (
        f"\n{heading}\n"
        f"{spec.summary}\n"
        "### Changed Areas\n"
        f"{changed_areas}\n"
        "### Verification\n"
        f"{checks}\n"
    )
    return base + section + "\n"
def env_example() -> str:
    """Return the ``.env.example`` contents for the Stripe checkout patch.

    The values are placeholders only.
    """
    entries = (
        "STRIPE_SECRET_KEY=stripe_secret_key_replace_me",
        "STRIPE_WEBHOOK_SECRET=whsec_replace_me",
        "NEXT_PUBLIC_APP_URL=http://localhost:3000",
    )
    return "".join(entry + "\n" for entry in entries)
def search_env_example() -> str:
    """Return the ``.env.example`` contents for the search proxy patch (placeholders only)."""
    entries = (
        "PERPLEXITY_API_KEY=search_provider_key_replace_me",
        "SEARCH_PROXY_TOKEN=local_bearer_token_replace_me",
    )
    return "".join(entry + "\n" for entry in entries)
def telegram_env_example() -> str:
    """Return the ``.env.example`` contents for the Telegram webhook patch (placeholder only)."""
    placeholder = "TELEGRAM_BOT_TOKEN=telegram_bot_token_replace_me"
    return placeholder + "\n"
def planned_files(repo: Path, spec: RepoPatchSpec) -> dict[str, str]:
    """Compute the full desired contents of every file the patch touches.

    ``README.md`` and ``package.json`` are always included and are derived from
    their current contents in ``repo``. The remaining entries are freshly
    rendered templates chosen by ``spec.action``; unknown actions fall back to
    the support page. Nothing is written to disk here.

    Returns:
        Mapping of repo-relative path to new file text.
    """
    action = spec.action
    plan: dict[str, str] = {}
    plan["README.md"] = append_readme(read_text(repo, "README.md"), spec)
    plan["package.json"] = update_package_json(read_text(repo, "package.json"), action)

    if action in {"add_stripe_checkout", "fix_stripe_checkout"}:
        plan["src/app/api/checkout/route.ts"] = render_checkout_route()
        plan["src/app/api/webhooks/stripe/route.ts"] = render_webhook_route()
        plan["src/app/success/page.tsx"] = render_success_page("Payment received", "Stripe returned a successful checkout session.")
        # The cancel page reuses the success-page template with different copy.
        plan["src/app/cancel/page.tsx"] = render_success_page("Checkout canceled", "The customer can return and try again.")
        plan[".env.example"] = env_example()
        plan["tests/stripe-checkout.test.ts"] = render_checkout_test()
    elif action in WORKER_ACTIONS:
        plan["src/index.ts"] = render_worker_index(action)
        plan["wrangler.toml"] = render_worker_wrangler(action)
        plan["tests/worker.test.ts"] = render_worker_test(action)
    elif action in {"add_search_proxy", "fix_search_proxy"}:
        plan["src/app/api/search/route.ts"] = render_search_proxy_route()
        plan[".env.example"] = search_env_example()
        plan["tests/search-proxy.test.ts"] = render_search_proxy_test()
    elif action in {"add_telegram_webhook", "fix_telegram_webhook"}:
        plan["src/app/api/telegram/webhook/route.ts"] = render_telegram_webhook_route()
        plan[".env.example"] = telegram_env_example()
        plan["tests/telegram-webhook.test.ts"] = render_telegram_webhook_test()
    elif action in {"add_csv_export", "fix_csv_export"}:
        plan["src/lib/csv.ts"] = render_csv_utility()
        plan["tests/csv.test.ts"] = render_csv_test()
    elif action == "add_dashboard_page":
        plan["src/app/dashboard/page.tsx"] = render_dashboard_page()
        plan["tests/dashboard.test.ts"] = render_dashboard_test()
    else:
        plan["src/app/support/page.tsx"] = render_support_page()
        plan["tests/support.test.ts"] = render_support_test()
    return plan
def _unified_diff_text(old_content: str, new_content: str, relative: str) -> str:
    """Render one file's unified diff, terminating lines that lack a newline."""
    pieces: list[str] = []
    diff = difflib.unified_diff(
        old_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile=f"a/{relative}",
        tofile=f"b/{relative}",
    )
    for line in diff:
        if line.endswith("\n"):
            pieces.append(line)
        else:
            # difflib passes a final line without its terminator through
            # verbatim, which would glue it onto the next diff line. Close it
            # and add the conventional marker instead.
            pieces.append(line + "\n\\ No newline at end of file\n")
    return "".join(pieces)


def build_patch(repo: Path, updates: dict[str, str]) -> str:
    """Build a unified diff between the files in ``repo`` and ``updates``.

    Args:
        repo: Repository root the relative paths are resolved against.
        updates: Mapping of repo-relative path to the desired file contents.

    Returns:
        The per-file diffs in sorted path order, joined by a newline. Files
        whose contents are already identical are omitted; a missing file is
        diffed against empty text. Returns ``""`` when nothing differs.
    """
    chunks: list[str] = []
    for relative, new_content in sorted(updates.items()):
        path = repo / relative
        # Same existence check as apply_updates: a missing file counts as empty.
        old_content = path.read_text(encoding="utf-8") if path.exists() else ""
        if old_content == new_content:
            continue
        chunks.append(_unified_diff_text(old_content, new_content, relative))
    return "\n".join(chunks)
def render_summary(spec: RepoPatchSpec, changed_files: list[str]) -> str:
    """Render the Markdown summary that accompanies a patch.

    The document lists the patch title, its rationale, the changed files, the
    verification steps from ``spec`` and a fixed set of safety notes.
    """
    file_bullets = "\n".join(f"- `{path}`" for path in changed_files)
    check_bullets = "\n".join(f"- `{item}`" for item in spec.verification)
    lines = [
        "# Kaiju Repo Patch Summary",
        "## Patch",
        f"{spec.title}",
        "## Why",
        f"{spec.summary}",
        "## Changed Files",
        file_bullets,
        "## Verification",
        check_bullets,
        "## Safety Notes",
        "- Existing files were updated only inside the requested repo.",
        "- Provider secrets remain environment variables.",
        "- Review the patch before shipping to customers.",
    ]
    return "\n".join(lines) + "\n"
def apply_updates(repo: Path, updates: dict[str, str]) -> list[str]:
    """Write every update whose content differs from what is on disk.

    Missing files are compared as empty text, parent directories are created
    on demand, and files are written as UTF-8.

    Returns:
        Sorted repo-relative paths of the files that were actually written.
    """
    written: list[str] = []
    for relative, content in updates.items():
        target = repo / relative
        current = target.read_text(encoding="utf-8") if target.exists() else ""
        if current != content:
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding="utf-8")
            written.append(relative)
    written.sort()
    return written
def _find_forbidden_tokens(repo: Path) -> list[str]:
    """Return the FORBIDDEN_TOKENS that occur (case-insensitively) in repo files."""
    # VCS metadata and installed dependencies are not project sources: they are
    # large, partly binary, and only produce false positives.
    skipped_dirs = {".git", "node_modules"}
    needles = [(token, token.lower()) for token in FORBIDDEN_TOKENS]
    found: set[str] = set()
    for path in repo.rglob("*"):
        if skipped_dirs.intersection(path.relative_to(repo).parts):
            continue
        try:
            if not path.is_file() or path.stat().st_size >= 500_000:
                continue
            # ``errors="replace"`` keeps binary or non-UTF-8 files from aborting
            # the whole validation with UnicodeDecodeError.
            text = path.read_text(encoding="utf-8", errors="replace").lower()
        except OSError:
            # Unreadable entries (permissions, vanished files) cannot be scanned.
            continue
        found.update(token for token, needle in needles if needle in text)
    return [token for token in FORBIDDEN_TOKENS if token in found]


def validate_repo(repo: Path, spec: RepoPatchSpec, changed_files: list[str], patch_text: str) -> list[str]:
    """Check a patched repo and return a list of human-readable problems.

    The checks cover the basic project layout, the presence of every expected
    file, that something changed and the patch looks like a unified diff, that
    no forbidden credential-like token appears in scanned files, and
    action-specific markers in the generated sources.

    Args:
        repo: Root of the patched repository.
        spec: Spec the patch was generated from.
        changed_files: Paths reported as changed by the caller.
        patch_text: Unified diff text produced for the patch.

    Returns:
        Error messages in check order; an empty list means validation passed.
    """
    errors: list[str] = []
    is_worker = spec.action in WORKER_ACTIONS

    if not (repo / "package.json").exists():
        errors.append("missing package.json")
    if is_worker:
        if not (repo / "src/index.ts").exists():
            errors.append("missing Worker src/index.ts")
        if not (repo / "wrangler.toml").exists():
            errors.append("missing wrangler.toml")
    elif not (repo / "src/app").exists():
        errors.append("missing src/app directory")

    for expected in spec.expected_files:
        # These two are always part of the plan, so they are not re-checked here.
        if expected in {"package.json", "README.md"}:
            continue
        if not (repo / expected).exists():
            errors.append(f"expected file missing after patch: {expected}")

    if not changed_files:
        errors.append("no files changed")
    if "--- a/" not in patch_text or "+++ b/" not in patch_text:
        errors.append("patch has no unified diff markers")

    errors.extend(f"forbidden token found: {token}" for token in _find_forbidden_tokens(repo))

    if spec.action in {"add_stripe_checkout", "fix_stripe_checkout"}:
        checkout = read_text(repo, "src/app/api/checkout/route.ts")
        webhook = read_text(repo, "src/app/api/webhooks/stripe/route.ts")
        if "process.env.STRIPE_SECRET_KEY" not in checkout:
            errors.append("checkout route missing STRIPE_SECRET_KEY env usage")
        if "checkout.sessions.create" not in checkout:
            errors.append("checkout route missing session creation")
        if "constructEvent" not in webhook:
            errors.append("webhook route missing signature verification")

    if spec.action in {"add_search_proxy", "fix_search_proxy"}:
        search_route = read_text(repo, "src/app/api/search/route.ts")
        if "process.env.PERPLEXITY_API_KEY" not in search_route:
            errors.append("search route missing PERPLEXITY_API_KEY env usage")
        if "SEARCH_PROXY_TOKEN" not in search_route or "requireBearer" not in search_route:
            errors.append("search route missing bearer token guard")

    if spec.action in {"add_telegram_webhook", "fix_telegram_webhook"}:
        telegram_route = read_text(repo, "src/app/api/telegram/webhook/route.ts")
        if "process.env.TELEGRAM_BOT_TOKEN" not in telegram_route:
            errors.append("telegram route missing TELEGRAM_BOT_TOKEN env usage")
        if "TelegramUpdateSchema" not in telegram_route:
            errors.append("telegram route missing payload validation")

    if is_worker:
        worker = read_text(repo, "src/index.ts")
        wrangler = read_text(repo, "wrangler.toml")
        if "export default" not in worker or "fetch(request" not in worker:
            errors.append("Worker missing fetch entrypoint")
        if "Access-Control-Allow-Origin" not in worker:
            errors.append("Worker missing CORS handling")
        auth_required = ["API_TOKEN_HASH", "requireBearer", "crypto.subtle.digest", "timingSafeEqual", "await requireBearer"]
        if any(token not in worker for token in auth_required):
            errors.append("Worker missing verified bearer auth guard")
        if spec.action in {"add_worker_search_proxy", "fix_worker_search_proxy"} and ("/search" not in worker or "PERPLEXITY_API_KEY" not in worker):
            errors.append("Worker search proxy missing route or env")
        if spec.action in {"add_worker_telegram_webhook", "fix_worker_telegram_webhook"} and ("/telegram/webhook" not in worker or "TELEGRAM_BOT_TOKEN" not in worker or "TelegramUpdateSchema" not in worker):
            errors.append("Worker Telegram webhook missing route, env, or validation")
        if spec.action in {"add_worker_r2_upload", "fix_worker_r2_upload"} and ("/files" not in worker or "FILES_BUCKET" not in worker or "[[r2_buckets]]" not in wrangler):
            errors.append("Worker R2 upload missing route or binding")
    return errors
def run_patch(repo: Path, prompt: str, apply: bool = True, raw_spec: dict[str, Any] | RepoPatchSpec | None = None) -> RepoPatchResult:
    """Plan a patch for ``repo`` and, unless ``apply`` is False, write and validate it.

    Args:
        repo: Repository to patch; resolved to an absolute path first.
        prompt: Free-form request used to infer the action (and as the
            fallback when ``raw_spec`` is incomplete).
        apply: When True, files are written, ``kaiju-repo-patch-summary.md``
            and ``kaiju-repo.patch`` are created in the repo, and the result is
            validated. When False nothing is written and every planned path is
            reported as changed.
        raw_spec: Optional explicit spec overriding the prompt-derived one.

    Returns:
        The spec, changed paths, diff text, summary text and validation errors.
    """
    repo = repo.resolve()
    if raw_spec is None:
        spec = spec_from_prompt(prompt)
    else:
        spec = normalize_spec(raw_spec, prompt)

    updates = planned_files(repo, spec)
    # The diff must be taken before anything is written to disk.
    patch_text = build_patch(repo, updates)

    if not apply:
        planned = sorted(updates)
        return RepoPatchResult(
            spec=spec,
            changed_files=planned,
            patch_text=patch_text,
            summary_text=render_summary(spec, planned),
            errors=[],
        )

    written = apply_updates(repo, updates)
    summary_text = render_summary(spec, written)
    (repo / "kaiju-repo-patch-summary.md").write_text(summary_text, encoding="utf-8")
    (repo / "kaiju-repo.patch").write_text(patch_text, encoding="utf-8")
    changed_files = sorted({*written, "kaiju-repo-patch-summary.md", "kaiju-repo.patch"})
    errors = validate_repo(repo, spec, changed_files, patch_text)
    return RepoPatchResult(
        spec=spec,
        changed_files=changed_files,
        patch_text=patch_text,
        summary_text=summary_text,
        errors=errors,
    )
def result_to_json(result: RepoPatchResult) -> str:
    """Serialize the spec, changed files and errors of ``result`` as indented JSON.

    The patch and summary texts are left out. Non-ASCII characters are emitted
    as-is rather than escaped.
    """
    payload = {
        "spec": vars(result.spec),
        "changed_files": result.changed_files,
        "errors": result.errors,
    }
    return json.dumps(payload, indent=2, ensure_ascii=False)