File size: 6,664 Bytes
d95f073
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python3
"""Apply reviewed Cloudflare D1/KV/R2 bindings to the Kaiju Worker config.

The script is preview-only by default. It accepts resource IDs/names, not
secrets, and refuses placeholder or secret-looking values.
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any


ROOT = Path(__file__).resolve().parents[1]
DEFAULT_BINDINGS = ROOT / "release/cloudflare-bindings.json"
DEFAULT_WRANGLER = ROOT / "gateway/cloudflare-worker/wrangler.jsonc"
SECRET_PATTERNS = [
    ("openai_api_key", re.compile(r"\bsk-[A-Za-z0-9][A-Za-z0-9_-]{20,}\b")),
    ("anthropic_api_key", re.compile(r"\bsk-ant-[A-Za-z0-9_-]{20,}\b")),
    ("stripe_secret_key", re.compile(r"\b[rs]k_(?:live|test)_[A-Za-z0-9]{16,}\b")),
    ("stripe_webhook_secret", re.compile(r"\bwhsec_[A-Za-z0-9]{16,}\b")),
    ("huggingface_token", re.compile(r"\bhf_[A-Za-z0-9]{20,}\b")),
    ("github_token", re.compile(r"\b(?:ghp_[A-Za-z0-9]{20,}|github_pat_[A-Za-z0-9_]{22,})\b")),
    ("bearer_token", re.compile(r"\bBearer\s+[A-Za-z0-9._~+/-]{24,}={0,2}\b", re.IGNORECASE)),
    ("private_key_block", re.compile(r"-----BEGIN (?:RSA |OPENSSH |EC |DSA )?PRIVATE KEY-----")),
]


def strip_jsonc(text: str) -> str:
    text = re.sub(r"/\*.*?\*/", "", text, flags=re.DOTALL)
    lines = []
    for line in text.splitlines():
        in_string = False
        escaped = False
        output = []
        index = 0
        while index < len(line):
            char = line[index]
            nxt = line[index + 1] if index + 1 < len(line) else ""
            if char == "\\" and in_string:
                escaped = not escaped
                output.append(char)
            elif char == '"' and not escaped:
                in_string = not in_string
                output.append(char)
            elif char == "/" and nxt == "/" and not in_string:
                break
            else:
                escaped = False
                output.append(char)
            index += 1
        lines.append("".join(output))
    return re.sub(r",\s*([}\]])", r"\1", "\n".join(lines))


def load_json_or_jsonc(path: Path) -> dict[str, Any]:
    text = path.read_text(encoding="utf-8")
    try:
        return json.loads(strip_jsonc(text))
    except json.JSONDecodeError as exc:
        raise SystemExit(f"{path} is not valid JSON/JSONC: {exc}") from exc


def secret_findings(payload: Any) -> list[str]:
    rendered = json.dumps(payload, sort_keys=True)
    return sorted({label for label, pattern in SECRET_PATTERNS if pattern.search(rendered)})


def safe_value(value: Any, *, name: str, pattern: str, allow_placeholder: bool = False) -> str:
    text = str(value or "").strip()
    if not text:
        raise SystemExit(f"Missing required Cloudflare binding value: {name}")
    if not allow_placeholder and text.startswith("replace_with_"):
        raise SystemExit(f"Refusing placeholder Cloudflare binding value for {name}: {text}")
    if not re.fullmatch(pattern, text):
        raise SystemExit(f"Unsafe Cloudflare binding value for {name}: {text!r}")
    return text


def build_bindings(raw: dict[str, Any]) -> dict[str, Any]:
    findings = secret_findings(raw)
    if findings:
        raise SystemExit("Refusing secret-looking Cloudflare binding input: " + ", ".join(findings))

    d1 = raw.get("d1_database") or {}
    kv = raw.get("kv_namespace") or {}
    r2 = raw.get("r2_bucket") or {}

    result: dict[str, Any] = {
        "d1_databases": [
            {
                "binding": safe_value(d1.get("binding", "KAIJU_BILLING_DB"), name="d1_database.binding", pattern=r"[A-Z0-9_]{3,64}"),
                "database_name": safe_value(d1.get("database_name", "kaiju_api_billing"), name="d1_database.database_name", pattern=r"[A-Za-z0-9._-]{3,128}"),
                "database_id": safe_value(d1.get("database_id"), name="d1_database.database_id", pattern=r"[A-Za-z0-9_-]{12,128}"),
            }
        ],
        "kv_namespaces": [
            {
                "binding": safe_value(kv.get("binding", "KAIJU_RATE_LIMIT_KV"), name="kv_namespace.binding", pattern=r"[A-Z0-9_]{3,64}"),
                "id": safe_value(kv.get("id"), name="kv_namespace.id", pattern=r"[A-Za-z0-9_-]{12,128}"),
            }
        ],
        "r2_buckets": [
            {
                "binding": safe_value(r2.get("binding", "KAIJU_ARTIFACT_BUCKET"), name="r2_bucket.binding", pattern=r"[A-Z0-9_]{3,64}"),
                "bucket_name": safe_value(r2.get("bucket_name", "kaiju-api-artifacts"), name="r2_bucket.bucket_name", pattern=r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]"),
            }
        ],
    }
    if "workers_dev" in raw:
        if not isinstance(raw["workers_dev"], bool):
            raise SystemExit("workers_dev must be true or false")
        result["workers_dev"] = raw["workers_dev"]
    return result


def apply_bindings(config: dict[str, Any], bindings: dict[str, Any]) -> dict[str, Any]:
    updated = dict(config)
    for key in ["d1_databases", "kv_namespaces", "r2_buckets"]:
        updated[key] = bindings[key]
    if "workers_dev" in bindings:
        updated["workers_dev"] = bindings["workers_dev"]
    return updated


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--bindings-file", type=Path, default=DEFAULT_BINDINGS)
    parser.add_argument("--wrangler-config", type=Path, default=DEFAULT_WRANGLER)
    parser.add_argument("--out", type=Path, help="Write preview to this file instead of stdout. With --write, defaults to --wrangler-config.")
    parser.add_argument("--write", action="store_true", help="Update wrangler.jsonc. Default is preview only.")
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    raw = load_json_or_jsonc(args.bindings_file)
    config = load_json_or_jsonc(args.wrangler_config)
    updated = apply_bindings(config, build_bindings(raw))
    rendered = json.dumps(updated, indent=2, sort_keys=False) + "\n"

    if args.write:
        target = args.out or args.wrangler_config
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(rendered, encoding="utf-8")
        print(f"Wrote reviewed Cloudflare bindings to {target}")
    elif args.out:
        args.out.parent.mkdir(parents=True, exist_ok=True)
        args.out.write_text(rendered, encoding="utf-8")
        print(f"Wrote preview Cloudflare config to {args.out}")
    else:
        print(rendered, end="")
        print("Preview only. Pass --write to update wrangler.jsonc.", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())