#!/usr/bin/env python3
"""Apply reviewed Cloudflare D1/KV/R2 bindings to the Kaiju Worker config.

The script is preview-only by default. It accepts resource IDs/names, not
secrets, and refuses placeholder or secret-looking values.
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_BINDINGS = ROOT / "release/cloudflare-bindings.json"
DEFAULT_WRANGLER = ROOT / "gateway/cloudflare-worker/wrangler.jsonc"

# (label, compiled pattern) pairs; a match anywhere in the serialized bindings
# input causes build_bindings() to refuse the whole file.
SECRET_PATTERNS = [
    ("openai_api_key", re.compile(r"\bsk-[A-Za-z0-9][A-Za-z0-9_-]{20,}\b")),
    ("anthropic_api_key", re.compile(r"\bsk-ant-[A-Za-z0-9_-]{20,}\b")),
    ("stripe_secret_key", re.compile(r"\b[rs]k_(?:live|test)_[A-Za-z0-9]{16,}\b")),
    ("stripe_webhook_secret", re.compile(r"\bwhsec_[A-Za-z0-9]{16,}\b")),
    ("huggingface_token", re.compile(r"\bhf_[A-Za-z0-9]{20,}\b")),
    ("github_token", re.compile(r"\b(?:ghp_[A-Za-z0-9]{20,}|github_pat_[A-Za-z0-9_]{22,})\b")),
    ("bearer_token", re.compile(r"\bBearer\s+[A-Za-z0-9._~+/-]{24,}={0,2}\b", re.IGNORECASE)),
    ("private_key_block", re.compile(r"-----BEGIN (?:RSA |OPENSSH |EC |DSA )?PRIVATE KEY-----")),
]

# Validation patterns shared by the D1, KV and R2 sections.
_BINDING_NAME_PATTERN = r"[A-Z0-9_]{3,64}"
_RESOURCE_ID_PATTERN = r"[A-Za-z0-9_-]{12,128}"


def strip_jsonc(text: str) -> str:
    """Return *text* with JSONC comments and trailing commas removed.

    The input is scanned once while tracking whether the cursor is inside a
    double-quoted string, so ``//``, ``/* ... */`` and ``, ]`` sequences that
    appear inside string values are left untouched.

    Line comments are dropped up to (not including) the line break. Block
    comments are dropped entirely. A comma followed only by whitespace and/or
    comments before ``}`` or ``]`` is dropped together with that whitespace.
    An unterminated block comment is left in place so the JSON parser rejects
    the document instead of silently accepting a truncated one.
    """
    out: list[str] = []
    length = len(text)
    pos = 0
    in_string = False
    while pos < length:
        char = text[pos]
        if in_string:
            out.append(char)
            if char == "\\" and pos + 1 < length:
                # Copy the escaped character verbatim so an escaped quote
                # does not terminate the string.
                out.append(text[pos + 1])
                pos += 2
                continue
            if char == '"':
                in_string = False
            pos += 1
        elif char == '"':
            in_string = True
            out.append(char)
            pos += 1
        elif text.startswith("//", pos):
            # Skip to the end of the line; the line break itself is kept.
            while pos < length and text[pos] not in "\r\n":
                pos += 1
        elif text.startswith("/*", pos):
            end = text.find("*/", pos + 2)
            if end == -1:
                out.append(text[pos:])
                break
            pos = end + 2
        elif char in "}]":
            # Outside a string, the last non-whitespace character emitted can
            # only be a comma if that comma is structural (a string would end
            # with a quote), so it is safe to drop it here.
            tail = len(out) - 1
            while tail >= 0 and out[tail].isspace():
                tail -= 1
            if tail >= 0 and out[tail] == ",":
                del out[tail:]
            out.append(char)
            pos += 1
        else:
            out.append(char)
            pos += 1
    return "".join(out)


def load_json_or_jsonc(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON or JSONC and return the top-level object.

    Raises:
        SystemExit: if the file cannot be read or decoded, is not valid
            JSON/JSONC, or its top-level value is not a JSON object.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        raise SystemExit(f"Cannot read {path}: {exc}") from exc
    try:
        data = json.loads(strip_jsonc(text))
    except json.JSONDecodeError as exc:
        raise SystemExit(f"{path} is not valid JSON/JSONC: {exc}") from exc
    if not isinstance(data, dict):
        # Every caller treats the result as a mapping; fail here with a clear
        # message rather than later with an AttributeError.
        raise SystemExit(f"{path} must contain a JSON object at the top level")
    return data


def secret_findings(payload: Any) -> list[str]:
    """Return the sorted labels of every SECRET_PATTERNS entry found in *payload*.

    The payload is serialized with ``json.dumps`` and each pattern is searched
    in the resulting text, so both keys and values are inspected.
    """
    rendered = json.dumps(payload, sort_keys=True)
    return sorted({label for label, pattern in SECRET_PATTERNS if pattern.search(rendered)})


def safe_value(value: Any, *, name: str, pattern: str, allow_placeholder: bool = False) -> str:
    """Validate one binding value and return it as a stripped string.

    Args:
        value: Raw value from the bindings file; falsy values count as missing.
        name: Dotted field name used in error messages.
        pattern: Regular expression the whole stripped value must match.
        allow_placeholder: When true, values starting with ``replace_with_``
            are not rejected as placeholders.

    Raises:
        SystemExit: if the value is missing, is a placeholder, or does not
            fully match *pattern*.
    """
    text = str(value or "").strip()
    if not text:
        raise SystemExit(f"Missing required Cloudflare binding value: {name}")
    if not allow_placeholder and text.startswith("replace_with_"):
        raise SystemExit(f"Refusing placeholder Cloudflare binding value for {name}: {text}")
    if not re.fullmatch(pattern, text):
        raise SystemExit(f"Unsafe Cloudflare binding value for {name}: {text!r}")
    return text


def _section(raw: dict[str, Any], key: str) -> dict[str, Any]:
    """Return the *key* section of *raw* as a dict; missing or falsy means empty."""
    value = raw.get(key) or {}
    if not isinstance(value, dict):
        raise SystemExit(f"{key} must be a JSON object")
    return value


def build_bindings(raw: dict[str, Any]) -> dict[str, Any]:
    """Build validated wrangler binding entries from the reviewed bindings input.

    Binding names, the D1 database name and the R2 bucket name fall back to
    built-in defaults when omitted; ``d1_database.database_id`` and
    ``kv_namespace.id`` are required. ``workers_dev`` is copied through only
    when present, and must be a boolean.

    Returns:
        A dict with ``d1_databases``, ``kv_namespaces`` and ``r2_buckets``
        lists (one entry each) and, optionally, ``workers_dev``.

    Raises:
        SystemExit: if the input looks like it contains a secret, is not a
            JSON object, has a non-object section, or any value fails
            validation.
    """
    findings = secret_findings(raw)
    if findings:
        raise SystemExit("Refusing secret-looking Cloudflare binding input: " + ", ".join(findings))
    if not isinstance(raw, dict):
        raise SystemExit("Cloudflare bindings input must be a JSON object")

    d1 = _section(raw, "d1_database")
    kv = _section(raw, "kv_namespace")
    r2 = _section(raw, "r2_bucket")

    result: dict[str, Any] = {
        "d1_databases": [
            {
                "binding": safe_value(
                    d1.get("binding", "KAIJU_BILLING_DB"),
                    name="d1_database.binding",
                    pattern=_BINDING_NAME_PATTERN,
                ),
                "database_name": safe_value(
                    d1.get("database_name", "kaiju_api_billing"),
                    name="d1_database.database_name",
                    pattern=r"[A-Za-z0-9._-]{3,128}",
                ),
                "database_id": safe_value(
                    d1.get("database_id"),
                    name="d1_database.database_id",
                    pattern=_RESOURCE_ID_PATTERN,
                ),
            }
        ],
        "kv_namespaces": [
            {
                "binding": safe_value(
                    kv.get("binding", "KAIJU_RATE_LIMIT_KV"),
                    name="kv_namespace.binding",
                    pattern=_BINDING_NAME_PATTERN,
                ),
                "id": safe_value(
                    kv.get("id"),
                    name="kv_namespace.id",
                    pattern=_RESOURCE_ID_PATTERN,
                ),
            }
        ],
        "r2_buckets": [
            {
                "binding": safe_value(
                    r2.get("binding", "KAIJU_ARTIFACT_BUCKET"),
                    name="r2_bucket.binding",
                    pattern=_BINDING_NAME_PATTERN,
                ),
                "bucket_name": safe_value(
                    r2.get("bucket_name", "kaiju-api-artifacts"),
                    name="r2_bucket.bucket_name",
                    pattern=r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]",
                ),
            }
        ],
    }

    if "workers_dev" in raw:
        if not isinstance(raw["workers_dev"], bool):
            raise SystemExit("workers_dev must be true or false")
        result["workers_dev"] = raw["workers_dev"]
    return result


def apply_bindings(config: dict[str, Any], bindings: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *config* with the binding sections replaced.

    ``d1_databases``, ``kv_namespaces`` and ``r2_buckets`` are always
    overwritten from *bindings*; ``workers_dev`` only when *bindings* has it.
    The input *config* dict itself is not modified.
    """
    updated = dict(config)
    for key in ["d1_databases", "kv_namespaces", "r2_buckets"]:
        updated[key] = bindings[key]
    if "workers_dev" in bindings:
        updated["workers_dev"] = bindings["workers_dev"]
    return updated


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for this script."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--bindings-file", type=Path, default=DEFAULT_BINDINGS)
    parser.add_argument("--wrangler-config", type=Path, default=DEFAULT_WRANGLER)
    parser.add_argument(
        "--out",
        type=Path,
        help="Write preview to this file instead of stdout. With --write, defaults to --wrangler-config.",
    )
    parser.add_argument(
        "--write",
        action="store_true",
        help="Update wrangler.jsonc. Default is preview only.",
    )
    return parser.parse_args()


def main() -> int:
    """Load both files, merge the validated bindings, and emit the result.

    Without ``--write`` the merged config goes to stdout, or to ``--out`` when
    given. With ``--write`` it goes to ``--out`` or, by default, back to the
    wrangler config path. The output is rendered with ``json.dumps``, so it is
    plain JSON: comments present in the source config are not carried over.
    """
    args = parse_args()
    raw = load_json_or_jsonc(args.bindings_file)
    config = load_json_or_jsonc(args.wrangler_config)
    updated = apply_bindings(config, build_bindings(raw))
    rendered = json.dumps(updated, indent=2, sort_keys=False) + "\n"

    if args.write:
        target = args.out or args.wrangler_config
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(rendered, encoding="utf-8")
        print(f"Wrote reviewed Cloudflare bindings to {target}")
    elif args.out:
        args.out.parent.mkdir(parents=True, exist_ok=True)
        args.out.write_text(rendered, encoding="utf-8")
        print(f"Wrote preview Cloudflare config to {args.out}")
    else:
        print(rendered, end="")
        print("Preview only. Pass --write to update wrangler.jsonc.", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())