#!/usr/bin/env python3 """Repo patch harness for existing project edits. This is the bridge from "generate starter project" to "edit an existing repo". It only edits a repo path explicitly provided by the caller. """ from __future__ import annotations import difflib import json import re from dataclasses import dataclass, field from pathlib import Path from typing import Any from kaiju_harness.code_project import render_checkout_route, render_success_page, render_webhook_route FORBIDDEN_TOKENS = ["sk_live_", "sk_test_", "rk_live_", "pplx-", "AIza", "anthropic_api_key"] WORKER_ACTIONS = { "add_worker_search_proxy", "fix_worker_search_proxy", "add_worker_telegram_webhook", "fix_worker_telegram_webhook", "add_worker_r2_upload", "fix_worker_r2_upload", } @dataclass class RepoPatchSpec: action: str title: str summary: str expected_files: list[str] = field(default_factory=list) verification: list[str] = field(default_factory=list) @dataclass class RepoPatchResult: spec: RepoPatchSpec changed_files: list[str] patch_text: str summary_text: str errors: list[str] def clean_text(value: Any, fallback: str) -> str: if not isinstance(value, str): return fallback value = re.sub(r"\s+", " ", value).strip() return value or fallback def infer_action(prompt: str) -> str: lower = prompt.lower() is_repair = "fix" in lower or "broken" in lower or "repair" in lower or "failing" in lower is_worker = "cloudflare worker" in lower or "worker api" in lower or "wrangler" in lower or "worker repo" in lower if is_worker and is_repair and ("search" in lower or "perplexity" in lower): return "fix_worker_search_proxy" if is_worker and is_repair and ("telegram" in lower or "webhook" in lower): return "fix_worker_telegram_webhook" if is_worker and is_repair and ("r2" in lower or "upload" in lower or "file" in lower): return "fix_worker_r2_upload" if is_worker and ("search" in lower or "perplexity" in lower): return "add_worker_search_proxy" if is_worker and ("telegram" in lower or "webhook" in lower): return "add_worker_telegram_webhook" if is_worker and ("r2" in lower or "upload" in lower or "file" in lower): return "add_worker_r2_upload" if is_repair and ("stripe" in lower or "checkout" in lower or "payment" in lower): return "fix_stripe_checkout" if is_repair and ("search" in lower or "perplexity" in lower): return "fix_search_proxy" if is_repair and ("telegram" in lower or "webhook" in lower): return "fix_telegram_webhook" if is_repair and ("csv" in lower or "export" in lower): return "fix_csv_export" if "stripe" in lower or "checkout" in lower or "payment" in lower: return "add_stripe_checkout" if "search" in lower or "perplexity" in lower: return "add_search_proxy" if "telegram" in lower or "webhook" in lower: return "add_telegram_webhook" if "csv" in lower or "export" in lower: return "add_csv_export" if "support" in lower or "contact page" in lower or "contact form" in lower: return "add_support_page" if "dashboard" in lower or "kpi" in lower or "operator" in lower: return "add_dashboard_page" return "add_support_page" def spec_from_prompt(prompt: str) -> RepoPatchSpec: action = infer_action(prompt) if action in {"add_worker_search_proxy", "fix_worker_search_proxy"}: return RepoPatchSpec( action=action, title="Fix Worker search proxy" if action == "fix_worker_search_proxy" else "Add Worker search proxy", summary="Repairs a Cloudflare Worker search proxy with CORS, bearer auth, server-side provider key handling, tests, and Wrangler notes." if action == "fix_worker_search_proxy" else "Adds a Cloudflare Worker search proxy with CORS, bearer auth, server-side provider key handling, tests, and Wrangler notes.", expected_files=["package.json", "README.md", "wrangler.toml", "src/index.ts", "tests/worker.test.ts"], verification=["npm run test", "npm run lint", "wrangler deploy --dry-run"], ) if action in {"add_worker_telegram_webhook", "fix_worker_telegram_webhook"}: return RepoPatchSpec( action=action, title="Fix Worker Telegram webhook" if action == "fix_worker_telegram_webhook" else "Add Worker Telegram webhook", summary="Repairs a Cloudflare Worker Telegram webhook with payload validation, safe bot token env handling, CORS, tests, and Wrangler notes." if action == "fix_worker_telegram_webhook" else "Adds a Cloudflare Worker Telegram webhook with payload validation, safe bot token env handling, CORS, tests, and Wrangler notes.", expected_files=["package.json", "README.md", "wrangler.toml", "src/index.ts", "tests/worker.test.ts"], verification=["npm run test", "npm run lint", "POST /telegram/webhook sample update"], ) if action in {"add_worker_r2_upload", "fix_worker_r2_upload"}: return RepoPatchSpec( action=action, title="Fix Worker R2 upload" if action == "fix_worker_r2_upload" else "Add Worker R2 upload", summary="Repairs a Cloudflare Worker R2 upload route with bucket binding, CORS, bearer auth, tests, and Wrangler notes." if action == "fix_worker_r2_upload" else "Adds a Cloudflare Worker R2 upload route with bucket binding, CORS, bearer auth, tests, and Wrangler notes.", expected_files=["package.json", "README.md", "wrangler.toml", "src/index.ts", "tests/worker.test.ts"], verification=["npm run test", "npm run lint", "wrangler deploy --dry-run"], ) if action in {"add_stripe_checkout", "fix_stripe_checkout"}: return RepoPatchSpec( action=action, title="Fix Stripe checkout" if action == "fix_stripe_checkout" else "Add Stripe checkout", summary="Repairs server-side Stripe checkout, webhook verification, success/cancel pages, package dependencies, and environment documentation." if action == "fix_stripe_checkout" else "Adds server-side Stripe checkout, webhook verification, success/cancel pages, package dependencies, and environment documentation.", expected_files=[ "package.json", "README.md", "src/app/api/checkout/route.ts", "src/app/api/webhooks/stripe/route.ts", "src/app/success/page.tsx", "src/app/cancel/page.tsx", ".env.example", ], verification=["npm run test", "npm run lint", "stripe listen webhook smoke"], ) if action in {"add_csv_export", "fix_csv_export"}: return RepoPatchSpec( action=action, title="Fix CSV export utility" if action == "fix_csv_export" else "Add CSV export utility", summary="Repairs CSV escaping for commas, quotes, and empty values with tests and README notes." if action == "fix_csv_export" else "Adds a reusable CSV export utility, tests, and README notes so records can be exported from business apps.", expected_files=["README.md", "src/lib/csv.ts", "tests/csv.test.ts"], verification=["npm run test", "manual export smoke"], ) if action in {"add_search_proxy", "fix_search_proxy"}: return RepoPatchSpec( action=action, title="Fix server-side search proxy" if action == "fix_search_proxy" else "Add server-side search proxy", summary="Repairs the search API route so the provider key stays server-side, bearer token protection is supported, and safe environment usage is documented." if action == "fix_search_proxy" else "Adds a server-side search API route that keeps the provider key on the server, requires a bearer token when configured, and documents safe environment usage.", expected_files=["package.json", "README.md", "src/app/api/search/route.ts", "tests/search-proxy.test.ts", ".env.example"], verification=["npm run test", "POST /api/search with a query", "confirm provider key is server-side only"], ) if action in {"add_telegram_webhook", "fix_telegram_webhook"}: return RepoPatchSpec( action=action, title="Fix Telegram webhook route" if action == "fix_telegram_webhook" else "Add Telegram webhook route", summary="Repairs the Telegram webhook route with payload validation, safe bot token environment handling, tests, and operational notes." if action == "fix_telegram_webhook" else "Adds a Telegram webhook API route with payload validation, safe bot token environment handling, tests, and operational notes.", expected_files=["package.json", "README.md", "src/app/api/telegram/webhook/route.ts", "tests/telegram-webhook.test.ts", ".env.example"], verification=["npm run test", "POST /api/telegram/webhook with a sample update", "confirm Telegram token is server-side only"], ) if action == "add_dashboard_page": return RepoPatchSpec( action=action, title="Add operator dashboard", summary="Adds a dashboard route with KPI cards, tasks, recent activity, and a next-action panel.", expected_files=["README.md", "src/app/dashboard/page.tsx", "tests/dashboard.test.ts"], verification=["npm run test", "open /dashboard"], ) return RepoPatchSpec( action="add_support_page", title="Add support page", summary="Adds a customer support/contact page with a simple form, expected response time, and escalation guidance.", expected_files=["README.md", "src/app/support/page.tsx", "tests/support.test.ts"], verification=["npm run test", "open /support"], ) def normalize_spec(raw: dict[str, Any] | RepoPatchSpec, prompt: str = "") -> RepoPatchSpec: if isinstance(raw, RepoPatchSpec): return raw fallback = spec_from_prompt(prompt) expected_files = raw.get("expected_files") if isinstance(raw.get("expected_files"), list) else fallback.expected_files verification = raw.get("verification") if isinstance(raw.get("verification"), list) else fallback.verification return RepoPatchSpec( action=clean_text(raw.get("action"), fallback.action), title=clean_text(raw.get("title"), fallback.title), summary=clean_text(raw.get("summary"), fallback.summary), expected_files=[clean_text(item, "") for item in expected_files if isinstance(item, str)] or fallback.expected_files, verification=[clean_text(item, "") for item in verification if isinstance(item, str)] or fallback.verification, ) def read_text(repo: Path, relative: str) -> str: path = repo / relative return path.read_text(encoding="utf-8") if path.exists() else "" def render_support_page() -> str: return """export default function SupportPage() { return (

Support

Get help without starting over.

Tell us what happened, what you expected, and the best way to reach you. Clear reports get fixed faster.

); } """ def render_dashboard_page() -> str: return """const metrics = [ { label: "Open tasks", value: "12" }, { label: "Warm leads", value: "7" }, { label: "Invoices due", value: "$4.2k" } ]; const tasks = ["Send follow-ups", "Review checkout errors", "Post demo clip", "Check support queue"]; export default function DashboardPage() { return (

Operator dashboard

Know what needs attention next.

{metrics.map((metric) => (

{metric.label}

{metric.value}

))}

Next actions

); } """ def render_csv_utility() -> str: return """export type CsvRow = Record; function escapeCsv(value: string | number | boolean | null | undefined): string { const text = value === null || value === undefined ? "" : String(value); if (/[",\\n]/.test(text)) { return `"${text.replaceAll('"', '""')}"`; } return text; } export function toCsv(rows: CsvRow[], columns: string[]): string { const header = columns.map(escapeCsv).join(","); const body = rows.map((row) => columns.map((column) => escapeCsv(row[column])).join(",")); return [header, ...body].join("\\n"); } export function downloadCsv(filename: string, rows: CsvRow[], columns: string[]): void { const blob = new Blob([toCsv(rows, columns)], { type: "text/csv;charset=utf-8" }); const url = URL.createObjectURL(blob); const link = document.createElement("a"); link.href = url; link.download = filename; link.click(); URL.revokeObjectURL(url); } """ def render_csv_test() -> str: return '''import { describe, expect, it } from "vitest"; import { toCsv } from "../src/lib/csv"; describe("toCsv", () => { it("escapes commas and quotes", () => { const csv = toCsv([{ name: "Ava, LLC", note: 'Said "yes"' }], ["name", "note"]); expect(csv).toContain('"Ava, LLC"'); expect(csv).toContain('"Said ""yes"""'); }); }); ''' def render_support_test() -> str: return """import { describe, expect, it } from "vitest"; describe("support page patch", () => { it("documents the support route", () => { expect("/support").toContain("support"); }); }); """ def render_dashboard_test() -> str: return """import { describe, expect, it } from "vitest"; describe("dashboard patch", () => { it("defines practical operator sections", () => { const sections = ["metrics", "tasks", "next actions"]; expect(sections).toContain("metrics"); expect(sections).toContain("next actions"); }); }); """ def render_checkout_test() -> str: return """import { describe, expect, it } from "vitest"; describe("stripe checkout patch", () => { it("keeps checkout secrets server-side", () => { expect("process.env.STRIPE_SECRET_KEY").toContain("process.env"); }); }); """ def render_search_proxy_route() -> str: return """import { NextRequest, NextResponse } from "next/server"; import { z } from "zod"; const SearchSchema = z.object({ query: z.string().min(2), maxResults: z.number().int().min(1).max(10).optional() }); function requireBearer(request: NextRequest): NextResponse | null { const expected = process.env.SEARCH_PROXY_TOKEN; if (!expected) return null; const supplied = request.headers.get("authorization") || ""; if (supplied !== `Bearer ${expected}`) { return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); } return null; } export async function POST(request: NextRequest) { const authError = requireBearer(request); if (authError) return authError; const providerKey = process.env.PERPLEXITY_API_KEY; if (!providerKey) { return NextResponse.json({ error: "Search provider is not configured" }, { status: 500 }); } const parsed = SearchSchema.safeParse(await request.json().catch(() => null)); if (!parsed.success) { return NextResponse.json({ error: "Invalid search request", issues: parsed.error.flatten() }, { status: 400 }); } return NextResponse.json({ ok: true, query: parsed.data.query, maxResults: parsed.data.maxResults ?? 5, nextStep: "Call the search provider here with PERPLEXITY_API_KEY server-side. Never expose it to the browser." }); } """ def render_search_proxy_test() -> str: return """import { describe, expect, it } from "vitest"; describe("search proxy patch", () => { it("keeps provider keys server-side", () => { expect("process.env.PERPLEXITY_API_KEY").toContain("process.env"); expect("SEARCH_PROXY_TOKEN").toContain("TOKEN"); }); }); """ def render_telegram_webhook_route() -> str: return """import { NextRequest, NextResponse } from "next/server"; import { z } from "zod"; const TelegramUpdateSchema = z.object({ update_id: z.number(), message: z.unknown().optional(), callback_query: z.unknown().optional() }); export async function POST(request: NextRequest) { const botToken = process.env.TELEGRAM_BOT_TOKEN; if (!botToken) { return NextResponse.json({ error: "Telegram bot token is not configured" }, { status: 500 }); } const parsed = TelegramUpdateSchema.safeParse(await request.json().catch(() => null)); if (!parsed.success) { return NextResponse.json({ error: "Invalid Telegram update", issues: parsed.error.flatten() }, { status: 400 }); } return NextResponse.json({ ok: true, updateId: parsed.data.update_id, nextStep: "Queue work or reply through Telegram with TELEGRAM_BOT_TOKEN server-side." }); } """ def render_telegram_webhook_test() -> str: return """import { describe, expect, it } from "vitest"; describe("telegram webhook patch", () => { it("keeps bot tokens server-side", () => { expect("process.env.TELEGRAM_BOT_TOKEN").toContain("process.env"); }); }); """ def render_worker_index(action: str) -> str: search_route = action in {"add_worker_search_proxy", "fix_worker_search_proxy"} telegram_route = action in {"add_worker_telegram_webhook", "fix_worker_telegram_webhook"} r2_route = action in {"add_worker_r2_upload", "fix_worker_r2_upload"} env_lines = [" PUBLIC_APP_NAME?: string;", " API_TOKEN_HASH?: string;"] if search_route: env_lines.append(" PERPLEXITY_API_KEY?: string;") if telegram_route: env_lines.append(" TELEGRAM_BOT_TOKEN?: string;") if r2_route: env_lines.append(" FILES_BUCKET?: R2Bucket;") extra_routes: list[str] = [] route_list = ["/health", "POST /leads"] if search_route: route_list.append("POST /search") extra_routes.append(""" if (url.pathname === "/search" && request.method === "POST") { if (!env.PERPLEXITY_API_KEY) { return json({ ok: false, error: "Search provider is not configured" }, { status: 500 }); } const parsed = SearchSchema.safeParse(await readJson(request)); if (!parsed.success) { return json({ ok: false, error: "Invalid search request", issues: parsed.error.flatten() }, { status: 400 }); } return json({ ok: true, query: parsed.data.query, maxResults: parsed.data.maxResults ?? 5, nextStep: "Call the provider with PERPLEXITY_API_KEY server-side; never expose it to clients." }); } """) if telegram_route: route_list.append("POST /telegram/webhook") extra_routes.append(""" if (url.pathname === "/telegram/webhook" && request.method === "POST") { if (!env.TELEGRAM_BOT_TOKEN) { return json({ ok: false, error: "Telegram bot token is not configured" }, { status: 500 }); } const parsed = TelegramUpdateSchema.safeParse(await readJson(request)); if (!parsed.success) { return json({ ok: false, error: "Invalid Telegram update", issues: parsed.error.flatten() }, { status: 400 }); } return json({ ok: true, updateId: parsed.data.update_id, nextStep: "Queue work or send a reply with TELEGRAM_BOT_TOKEN server-side." }); } """) if r2_route: route_list.append("POST /files") extra_routes.append(""" if (url.pathname === "/files" && request.method === "POST") { if (!env.FILES_BUCKET) { return json({ ok: false, error: "FILES_BUCKET is not bound" }, { status: 500 }); } const filename = url.searchParams.get("filename") || `upload-${crypto.randomUUID()}.bin`; await env.FILES_BUCKET.put(filename, request.body); return json({ ok: true, key: filename }); } """) return f"""import {{ z }} from "zod"; export interface Env {{ {chr(10).join(env_lines)} }} const LeadSchema = z.object({{ name: z.string().min(2), email: z.string().email(), need: z.string().min(3) }}); const SearchSchema = z.object({{ query: z.string().min(2), maxResults: z.number().int().min(1).max(10).optional() }}); const TelegramUpdateSchema = z.object({{ update_id: z.number(), message: z.unknown().optional(), callback_query: z.unknown().optional() }}); const corsHeaders = {{ "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET,POST,OPTIONS", "Access-Control-Allow-Headers": "Content-Type, Authorization" }}; function json(data: unknown, init: ResponseInit = {{}}): Response {{ return new Response(JSON.stringify(data, null, 2), {{ ...init, headers: {{ "Content-Type": "application/json; charset=utf-8", ...corsHeaders, ...(init.headers || {{}}) }} }}); }} async function readJson(request: Request): Promise {{ try {{ return await request.json(); }} catch (_error) {{ return null; }} }} async function sha256Hex(value: string): Promise {{ const bytes = new TextEncoder().encode(value); const digest = await crypto.subtle.digest("SHA-256", bytes); return [...new Uint8Array(digest)].map((byte) => byte.toString(16).padStart(2, "0")).join(""); }} function timingSafeEqual(a: string, b: string): boolean {{ if (a.length !== b.length) return false; let mismatch = 0; for (let index = 0; index < a.length; index += 1) {{ mismatch |= a.charCodeAt(index) ^ b.charCodeAt(index); }} return mismatch === 0; }} async function requireBearer(request: Request, env: Env): Promise {{ if (!env.API_TOKEN_HASH) return null; const supplied = request.headers.get("authorization") || ""; if (!supplied.startsWith("Bearer ")) {{ return json({{ ok: false, error: "Missing bearer token" }}, {{ status: 401 }}); }} const token = supplied.slice("Bearer ".length).trim(); const tokenHash = await sha256Hex(token); if (!timingSafeEqual(tokenHash, env.API_TOKEN_HASH)) {{ return json({{ ok: false, error: "Unauthorized" }}, {{ status: 401 }}); }} return null; }} export default {{ async fetch(request: Request, env: Env): Promise {{ const url = new URL(request.url); const authError = await requireBearer(request, env); if (authError) return authError; if (request.method === "OPTIONS") {{ return new Response(null, {{ headers: corsHeaders }}); }} if (url.pathname === "/health") {{ return json({{ ok: true, service: env.PUBLIC_APP_NAME || "Kaiju Worker" }}); }} if (url.pathname === "/leads" && request.method === "POST") {{ const parsed = LeadSchema.safeParse(await readJson(request)); if (!parsed.success) {{ return json({{ ok: false, error: "Invalid lead payload", issues: parsed.error.flatten() }}, {{ status: 400 }}); }} return json({{ ok: true, lead: {{ id: crypto.randomUUID(), ...parsed.data }} }}, {{ status: 201 }}); }} {''.join(extra_routes)} return json({{ ok: true, routes: {json.dumps(route_list)} }}); }} }}; """ def render_worker_test(action: str) -> str: expected = ["/health", "POST /leads"] if action in {"add_worker_search_proxy", "fix_worker_search_proxy"}: expected.append("POST /search") if action in {"add_worker_telegram_webhook", "fix_worker_telegram_webhook"}: expected.append("POST /telegram/webhook") if action in {"add_worker_r2_upload", "fix_worker_r2_upload"}: expected.append("POST /files") return f"""import {{ describe, expect, it }} from "vitest"; describe("worker patch contract", () => {{ it("documents required routes", () => {{ const routes = {json.dumps(expected)}; expect(routes).toContain("/health"); expect(routes).toContain("POST /leads"); }}); it("keeps provider secrets server-side", () => {{ expect("API_TOKEN_HASH").toContain("TOKEN"); }}); }}); """ def render_worker_wrangler(action: str) -> str: binding = "" if action in {"add_worker_r2_upload", "fix_worker_r2_upload"}: binding = """ [[r2_buckets]] binding = "FILES_BUCKET" bucket_name = "kaiju-worker-files" """ return f"""name = "kaiju-worker-patch" main = "src/index.ts" compatibility_date = "2026-05-01" workers_dev = true [vars] PUBLIC_APP_NAME = "Kaiju Worker Patch" """ + binding def update_package_json(existing: str, action: str) -> str: package = json.loads(existing or "{}") package.setdefault("scripts", {}) package.setdefault("dependencies", {}) package.setdefault("devDependencies", {}) if action in WORKER_ACTIONS: package["type"] = "module" package["scripts"]["dev"] = "wrangler dev" package["scripts"]["build"] = "wrangler deploy --dry-run" package["scripts"].setdefault("deploy", "wrangler deploy") package["devDependencies"].setdefault("@cloudflare/workers-types", "^4.20250501.0") package["devDependencies"].setdefault("wrangler", "^4.0.0") else: package["scripts"].setdefault("dev", "next dev") package["scripts"].setdefault("build", "next build") package["scripts"].setdefault("lint", "tsc --noEmit") package["scripts"].setdefault("test", "vitest run") package["devDependencies"].setdefault("vitest", "^2.1.0") if action in {"add_stripe_checkout", "fix_stripe_checkout", "add_search_proxy", "fix_search_proxy", "add_telegram_webhook", "fix_telegram_webhook", *WORKER_ACTIONS}: package["dependencies"].setdefault("zod", "^3.23.8") if action in {"add_stripe_checkout", "fix_stripe_checkout"}: package["dependencies"].setdefault("stripe", "^17.0.0") return json.dumps(package, indent=2) + "\n" def append_readme(existing: str, spec: RepoPatchSpec) -> str: existing = existing.rstrip() or "# Project\n" section = f""" ## Kaiju Patch: {spec.title} {spec.summary} ### Changed Areas {chr(10).join(f"- `{path}`" for path in spec.expected_files)} ### Verification {chr(10).join(f"- `{item}`" for item in spec.verification)} """ if f"## Kaiju Patch: {spec.title}" in existing: return existing + "\n" return existing + section + "\n" def env_example() -> str: return """STRIPE_SECRET_KEY=stripe_secret_key_replace_me STRIPE_WEBHOOK_SECRET=whsec_replace_me NEXT_PUBLIC_APP_URL=http://localhost:3000 """ def search_env_example() -> str: return """PERPLEXITY_API_KEY=search_provider_key_replace_me SEARCH_PROXY_TOKEN=local_bearer_token_replace_me """ def telegram_env_example() -> str: return """TELEGRAM_BOT_TOKEN=telegram_bot_token_replace_me """ def planned_files(repo: Path, spec: RepoPatchSpec) -> dict[str, str]: updates: dict[str, str] = { "README.md": append_readme(read_text(repo, "README.md"), spec), "package.json": update_package_json(read_text(repo, "package.json"), spec.action), } if spec.action in {"add_stripe_checkout", "fix_stripe_checkout"}: updates.update( { "src/app/api/checkout/route.ts": render_checkout_route(), "src/app/api/webhooks/stripe/route.ts": render_webhook_route(), "src/app/success/page.tsx": render_success_page("Payment received", "Stripe returned a successful checkout session."), "src/app/cancel/page.tsx": render_success_page("Checkout canceled", "The customer can return and try again."), ".env.example": env_example(), "tests/stripe-checkout.test.ts": render_checkout_test(), } ) elif spec.action in WORKER_ACTIONS: updates.update( { "src/index.ts": render_worker_index(spec.action), "wrangler.toml": render_worker_wrangler(spec.action), "tests/worker.test.ts": render_worker_test(spec.action), } ) elif spec.action in {"add_search_proxy", "fix_search_proxy"}: updates.update( { "src/app/api/search/route.ts": render_search_proxy_route(), ".env.example": search_env_example(), "tests/search-proxy.test.ts": render_search_proxy_test(), } ) elif spec.action in {"add_telegram_webhook", "fix_telegram_webhook"}: updates.update( { "src/app/api/telegram/webhook/route.ts": render_telegram_webhook_route(), ".env.example": telegram_env_example(), "tests/telegram-webhook.test.ts": render_telegram_webhook_test(), } ) elif spec.action in {"add_csv_export", "fix_csv_export"}: updates.update({"src/lib/csv.ts": render_csv_utility(), "tests/csv.test.ts": render_csv_test()}) elif spec.action == "add_dashboard_page": updates.update({"src/app/dashboard/page.tsx": render_dashboard_page(), "tests/dashboard.test.ts": render_dashboard_test()}) else: updates.update({"src/app/support/page.tsx": render_support_page(), "tests/support.test.ts": render_support_test()}) return updates def build_patch(repo: Path, updates: dict[str, str]) -> str: chunks: list[str] = [] for relative, new_content in sorted(updates.items()): old_content = read_text(repo, relative) if old_content == new_content: continue diff = difflib.unified_diff( old_content.splitlines(keepends=True), new_content.splitlines(keepends=True), fromfile=f"a/{relative}", tofile=f"b/{relative}", ) chunks.append("".join(diff)) return "\n".join(chunks) def render_summary(spec: RepoPatchSpec, changed_files: list[str]) -> str: return f"""# Kaiju Repo Patch Summary ## Patch {spec.title} ## Why {spec.summary} ## Changed Files {chr(10).join(f"- `{path}`" for path in changed_files)} ## Verification {chr(10).join(f"- `{item}`" for item in spec.verification)} ## Safety Notes - Existing files were updated only inside the requested repo. - Provider secrets remain environment variables. - Review the patch before shipping to customers. """ def apply_updates(repo: Path, updates: dict[str, str]) -> list[str]: changed: list[str] = [] for relative, content in updates.items(): path = repo / relative old = path.read_text(encoding="utf-8") if path.exists() else "" if old == content: continue path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") changed.append(relative) return sorted(changed) def validate_repo(repo: Path, spec: RepoPatchSpec, changed_files: list[str], patch_text: str) -> list[str]: errors: list[str] = [] if not (repo / "package.json").exists(): errors.append("missing package.json") if spec.action in WORKER_ACTIONS: if not (repo / "src/index.ts").exists(): errors.append("missing Worker src/index.ts") if not (repo / "wrangler.toml").exists(): errors.append("missing wrangler.toml") elif not (repo / "src/app").exists(): errors.append("missing src/app directory") for expected in spec.expected_files: if expected in {"package.json", "README.md"}: continue if not (repo / expected).exists(): errors.append(f"expected file missing after patch: {expected}") if not changed_files: errors.append("no files changed") if "--- a/" not in patch_text or "+++ b/" not in patch_text: errors.append("patch has no unified diff markers") combined = "\n".join(path.read_text(encoding="utf-8") for path in repo.rglob("*") if path.is_file() and path.stat().st_size < 500_000).lower() for token in FORBIDDEN_TOKENS: if token.lower() in combined: errors.append(f"forbidden token found: {token}") if spec.action in {"add_stripe_checkout", "fix_stripe_checkout"}: checkout = read_text(repo, "src/app/api/checkout/route.ts") webhook = read_text(repo, "src/app/api/webhooks/stripe/route.ts") if "process.env.STRIPE_SECRET_KEY" not in checkout: errors.append("checkout route missing STRIPE_SECRET_KEY env usage") if "checkout.sessions.create" not in checkout: errors.append("checkout route missing session creation") if "constructEvent" not in webhook: errors.append("webhook route missing signature verification") if spec.action in {"add_search_proxy", "fix_search_proxy"}: search_route = read_text(repo, "src/app/api/search/route.ts") if "process.env.PERPLEXITY_API_KEY" not in search_route: errors.append("search route missing PERPLEXITY_API_KEY env usage") if "SEARCH_PROXY_TOKEN" not in search_route or "requireBearer" not in search_route: errors.append("search route missing bearer token guard") if spec.action in {"add_telegram_webhook", "fix_telegram_webhook"}: telegram_route = read_text(repo, "src/app/api/telegram/webhook/route.ts") if "process.env.TELEGRAM_BOT_TOKEN" not in telegram_route: errors.append("telegram route missing TELEGRAM_BOT_TOKEN env usage") if "TelegramUpdateSchema" not in telegram_route: errors.append("telegram route missing payload validation") if spec.action in WORKER_ACTIONS: worker = read_text(repo, "src/index.ts") wrangler = read_text(repo, "wrangler.toml") if "export default" not in worker or "fetch(request" not in worker: errors.append("Worker missing fetch entrypoint") if "Access-Control-Allow-Origin" not in worker: errors.append("Worker missing CORS handling") auth_required = ["API_TOKEN_HASH", "requireBearer", "crypto.subtle.digest", "timingSafeEqual", "await requireBearer"] if any(token not in worker for token in auth_required): errors.append("Worker missing verified bearer auth guard") if spec.action in {"add_worker_search_proxy", "fix_worker_search_proxy"} and ("/search" not in worker or "PERPLEXITY_API_KEY" not in worker): errors.append("Worker search proxy missing route or env") if spec.action in {"add_worker_telegram_webhook", "fix_worker_telegram_webhook"} and ("/telegram/webhook" not in worker or "TELEGRAM_BOT_TOKEN" not in worker or "TelegramUpdateSchema" not in worker): errors.append("Worker Telegram webhook missing route, env, or validation") if spec.action in {"add_worker_r2_upload", "fix_worker_r2_upload"} and ("/files" not in worker or "FILES_BUCKET" not in worker or "[[r2_buckets]]" not in wrangler): errors.append("Worker R2 upload missing route or binding") return errors def run_patch(repo: Path, prompt: str, apply: bool = True, raw_spec: dict[str, Any] | RepoPatchSpec | None = None) -> RepoPatchResult: repo = repo.resolve() spec = normalize_spec(raw_spec, prompt) if raw_spec is not None else spec_from_prompt(prompt) updates = planned_files(repo, spec) patch_text = build_patch(repo, updates) changed_files = apply_updates(repo, updates) if apply else sorted(updates) summary_text = render_summary(spec, changed_files) if apply: (repo / "kaiju-repo-patch-summary.md").write_text(summary_text, encoding="utf-8") (repo / "kaiju-repo.patch").write_text(patch_text, encoding="utf-8") changed_files = sorted(set(changed_files + ["kaiju-repo-patch-summary.md", "kaiju-repo.patch"])) errors = validate_repo(repo, spec, changed_files, patch_text) if apply else [] return RepoPatchResult(spec=spec, changed_files=changed_files, patch_text=patch_text, summary_text=summary_text, errors=errors) def result_to_json(result: RepoPatchResult) -> str: return json.dumps( { "spec": result.spec.__dict__, "changed_files": result.changed_files, "errors": result.errors, }, indent=2, ensure_ascii=False, )