File size: 10,403 Bytes
3436bdd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any

import yaml

# Directory one level above the folder that contains this script; every default
# path below is resolved relative to it (presumably the repository root).
ROOT = Path(__file__).resolve().parents[1]
# Default value of the --config CLI option (self-improvement settings, YAML).
DEFAULT_CONFIG = ROOT / "self_improve.yaml"
# Default value of the --schema CLI option (JSON schema handed to the Codex CLI).
DEFAULT_SCHEMA = ROOT / "schemas" / "self_improve_proposal_v0.json"
# Inference settings; supplies `default_backend` and the `backends` table.
DEFAULT_INFERENCE = ROOT / "inference.yaml"
# Optional user-governance snapshot; only loaded when the file exists.
DEFAULT_USER_GOVERNANCE = ROOT / "build" / "system" / "user_governance.json"


def load_yaml(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 YAML file at ``path`` and return its top-level mapping.

    Args:
        path: Location of the YAML document to read.

    Returns:
        The decoded document.

    Raises:
        ValueError: If the document's top-level value is not a mapping.
    """
    document = yaml.safe_load(path.read_text(encoding="utf-8"))
    if isinstance(document, dict):
        return document
    raise ValueError(f"{path} did not decode to a mapping")


def load_json(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 JSON file at ``path`` and return its top-level object.

    Args:
        path: Location of the JSON document to read.

    Returns:
        The decoded document.

    Raises:
        ValueError: If the document's top-level value is not a JSON object.
    """
    document = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(document, dict):
        return document
    raise ValueError(f"{path} did not decode to an object")


def allowed_path(path: str, roots: list[str]) -> bool:
    """Return whether ``path`` falls under one of the configured ``roots``.

    A root ending in ``/`` is treated as a directory prefix; any other root
    must equal ``path`` exactly.

    Paths containing a ``..`` segment are always rejected: a plain prefix test
    would otherwise accept ``docs/../secret.txt`` for the root ``docs/`` even
    though it resolves outside that directory.

    Args:
        path: Candidate path taken from a proposal.
        roots: Allowed directory prefixes and exact file paths.

    Returns:
        ``True`` when the path is permitted, ``False`` otherwise (including
        when ``path`` is not a string).
    """
    if not isinstance(path, str):
        return False
    # Inspect backslash-separated segments too so "docs/..\\x" cannot slip by.
    if ".." in path.replace("\\", "/").split("/"):
        return False
    return any(
        path.startswith(root) if root.endswith("/") else path == root
        for root in roots
    )


def backend_timeout_seconds(backend: dict[str, Any]) -> float | None:
    """Read the optional ``timeout_seconds`` setting from a backend mapping.

    Args:
        backend: Backend configuration mapping.

    Returns:
        The timeout as a float, or ``None`` when the setting is absent,
        ``None``, an empty string, or zero.

    Raises:
        ValueError: If a configured timeout is zero or negative after
            conversion to float.
    """
    configured = backend.get("timeout_seconds")
    is_unset = configured in (None, "", 0)
    if is_unset:
        return None
    seconds = float(configured)
    if seconds <= 0:
        raise ValueError("timeout_seconds must be positive when configured")
    return seconds


def sanitize_manifest_id(goal: str) -> str:
    """Derive a manifest id of the form ``self-improve-<slug>`` from ``goal``.

    The slug is the lower-cased goal with every run of characters outside
    ``a-z0-9`` collapsed to a single hyphen, trimmed of edge hyphens and capped
    at 48 characters. A goal with no usable characters falls back to the slug
    ``self-improve``.

    Args:
        goal: Free-text improvement goal.

    Returns:
        The manifest identifier.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", goal.lower()).strip("-")
    # Truncation can cut right after a separator; strip again so the id never
    # ends in a dangling hyphen.
    slug = slug[:48].rstrip("-") or "self-improve"
    return f"self-improve-{slug}"


def build_prompt(
    *,
    goal: str,
    config: dict[str, Any],
    system_context: dict[str, Any],
    policy: dict[str, Any],
    runtime_contract: dict[str, Any],
    default_benchmark: str,
    user_governance: dict[str, Any] | None,
) -> str:
    """Assemble the newline-joined prompt text sent to the proposal backend.

    The prompt consists of fixed instruction lines (parameterised by
    ``config["max_files"]``, ``config["allowed_roots"]`` and
    ``default_benchmark``), four compact-JSON sections holding selected keys of
    the supplied mappings, and a final line stating the goal.

    Args:
        goal: Improvement goal appended as the last prompt line.
        config: Must provide ``max_files`` and ``allowed_roots``.
        system_context: Source of the ``current_position``,
            ``latest_runtime_state`` and ``agent_bootstrap`` excerpts.
        policy: Source of ``bits``, ``vectors`` and ``invariants``.
        runtime_contract: Source of ``one_liner``, ``contract`` and
            ``acceptance_bar``.
        default_benchmark: Benchmark command quoted in the instructions.
        user_governance: Optional governance mapping; when falsy the governance
            section is rendered as ``{}``.

    Returns:
        The complete prompt string.
    """

    def compact(payload: Any) -> str:
        # Most compact ASCII-only JSON encoding; key order follows insertion.
        return json.dumps(payload, ensure_ascii=True, separators=(",", ":"))

    position = system_context.get("current_position", {})
    bootstrap = system_context.get("agent_bootstrap", {})
    context_view = {
        "current_position": {
            key: position.get(key) for key in ("slice_id", "default_profile", "role")
        },
        "latest_runtime_state": system_context.get("latest_runtime_state", {}),
        "agent_bootstrap": {
            "trust_order": bootstrap.get("trust_order", []),
            "first_move": bootstrap.get("first_move"),
        },
    }
    policy_view = {key: policy.get(key, []) for key in ("bits", "vectors", "invariants")}
    runtime_view = {
        "one_liner": runtime_contract.get("one_liner"),
        "contract": runtime_contract.get("contract"),
        "acceptance_bar": runtime_contract.get("acceptance_bar", []),
    }
    governance_view: dict[str, Any] = {}
    if user_governance:
        governance_view = {
            "governing_rules": user_governance.get("governing_rules", []),
            "motif_rule": user_governance.get("motif_rule", ""),
            "next_moves": user_governance.get("next_moves", []),
            "operator_next_tasks": user_governance.get("operator_next_tasks", []),
        }

    prompt_lines = [
        "You are proposing one bounded self-improvement for the bit_vector_tensor_control_policy repo.",
        "Produce only JSON matching the schema.",
        "The proposal must be small, concrete, and safe to execute through the local runtime.",
        f"Maximum touched files: {config['max_files']}.",
        f"Allowed roots: {', '.join(config['allowed_roots'])}.",
        "Only use manifest actions of type `write_file`.",
        "Do not propose shell actions.",
        "Return full replacement content for every touched file.",
        "Prefer docs, configs, and thin runtime/policy glue over large rewrites.",
        f"Use this benchmark command unless a narrower benchmark is clearly better: {default_benchmark}.",
        "The change should improve the product shell itself, not produce an external research artifact.",
        "Prefer the highest-ranked partial or requested next move from user governance when it can be advanced in one bounded change.",
    ]
    # Each section is a blank line, a heading, then the compact JSON payload.
    sections = (
        ("System context:", context_view),
        ("Policy context:", policy_view),
        ("Runtime contract:", runtime_view),
        ("User governance:", governance_view),
    )
    for heading, payload in sections:
        prompt_lines.extend(["", heading, compact(payload)])
    prompt_lines.extend(["", f"Improvement goal: {goal}"])
    return "\n".join(prompt_lines)


def validate_proposal(proposal: dict[str, Any], config: dict[str, Any]) -> None:
    """Check that a proposal stays within the configured file and path limits.

    The proposal must list at least one and at most ``config["max_files"]``
    target files, every one of them inside ``config["allowed_roots"]``. Its
    manifest must contain between one and ``max_files`` actions, all of type
    ``write_file`` with allowed paths, and the action paths must match
    ``target_files`` (compared as sorted lists).

    Structurally malformed input (a non-list ``target_files``, a non-object
    manifest or action, a non-string path) is reported as ``ValueError`` as
    well, so callers only ever see one failure type from this validator.

    Args:
        proposal: Decoded proposal document.
        config: Must provide ``allowed_roots`` and ``max_files``.

    Raises:
        ValueError: If any of the rules above is violated.
    """
    roots = config["allowed_roots"]
    max_files = int(config["max_files"])

    target_files = proposal.get("target_files", [])
    if not target_files:
        raise ValueError("proposal did not include target_files")
    if not isinstance(target_files, list):
        raise ValueError("proposal target_files must be a list")
    if len(target_files) > max_files:
        raise ValueError("proposal exceeded max_files")
    for path in target_files:
        # Type-check first so a non-string entry never reaches allowed_path.
        if not isinstance(path, str):
            raise ValueError(f"target file path must be a string: {path!r}")
        if not allowed_path(path, roots):
            raise ValueError(f"target file outside allowed roots: {path}")

    manifest = proposal.get("manifest", {})
    if not isinstance(manifest, dict):
        raise ValueError("proposal manifest must be an object")
    actions = manifest.get("actions", [])
    if not actions:
        raise ValueError("proposal manifest had no actions")
    if not isinstance(actions, list):
        raise ValueError("proposal manifest actions must be a list")
    if len(actions) > max_files:
        raise ValueError("proposal manifest exceeded max_files")

    action_paths = []
    for action in actions:
        if not isinstance(action, dict) or action.get("type") != "write_file":
            raise ValueError("proposal manifest included unsupported action type")
        path = action.get("path", "")
        if not isinstance(path, str):
            raise ValueError(f"manifest path must be a string: {path!r}")
        if not allowed_path(path, roots):
            raise ValueError(f"manifest path outside allowed roots: {path}")
        action_paths.append(path)

    # Order-insensitive comparison that still accounts for duplicates.
    if sorted(target_files) != sorted(action_paths):
        raise ValueError("target_files and manifest action paths diverged")


def run_codex_proposal(
    *,
    goal: str,
    config_path: Path,
    system_context_path: Path,
    output_path: Path,
    schema_path: Path,
) -> dict[str, Any]:
    """Ask the configured backend CLI for one proposal and validate the result.

    Loads the self-improve config, the default inference backend and the
    context documents, builds the prompt, runs ``<command> exec`` with the
    prompt on stdin, then reads the proposal the CLI wrote to ``output_path``.
    The proposal is normalised (manifest id and goal, default benchmark),
    validated with ``validate_proposal`` and written back to ``output_path``
    as sorted, indented JSON.

    Args:
        goal: Improvement goal forwarded to the prompt and used for the
            manifest id.
        config_path: YAML self-improve configuration.
        system_context_path: JSON system context document.
        output_path: File the CLI writes to; rewritten with the normalised
            proposal on success. Parent directories are created.
        schema_path: JSON schema passed to the CLI via a temporary copy.

    Returns:
        The normalised, validated proposal.

    Raises:
        ValueError: If a timeout setting is not positive, the proposal has no
            manifest object, or validation fails.
        RuntimeError: If the CLI times out or exits with a non-zero status.
    """
    config = load_yaml(config_path)
    inference = load_yaml(DEFAULT_INFERENCE)
    backend_id = inference["default_backend"]
    # Copy so the model override below never mutates the loaded settings.
    backend = dict(inference["backends"][backend_id])
    proposal_model = config.get("proposal_model")
    if proposal_model:
        backend["model"] = proposal_model
    system_context = load_json(system_context_path)
    policy = load_json(ROOT / "policy" / "control_language_v0.json")
    runtime_contract = load_json(ROOT / "runtime" / "work_manifest_v0.json")
    user_governance = load_json(DEFAULT_USER_GOVERNANCE) if DEFAULT_USER_GOVERNANCE.exists() else None
    default_benchmark = config["default_benchmark"]["command"]
    prompt = build_prompt(
        goal=goal,
        config=config,
        system_context=system_context,
        policy=policy,
        runtime_contract=runtime_contract,
        default_benchmark=default_benchmark,
        user_governance=user_governance,
    )

    # Resolve everything that can fail on bad configuration BEFORE the temp
    # file exists, so an invalid timeout or unreadable schema cannot leak it.
    # The proposal-level timeout wins; otherwise fall back to the backend's.
    timeout_seconds = config.get("proposal_timeout_seconds")
    if timeout_seconds in (None, "", 0):
        timeout = backend_timeout_seconds(backend)
    else:
        timeout = float(timeout_seconds)
        if timeout <= 0:
            raise ValueError("proposal_timeout_seconds must be positive when configured")
    schema_text = schema_path.read_text(encoding="utf-8")

    command = [backend.get("command", "codex"), "exec"]
    if backend.get("model"):
        command.extend(["-m", str(backend["model"])])
    if backend.get("sandbox"):
        command.extend(["-s", str(backend["sandbox"])])
    if backend.get("ephemeral", False):
        command.append("--ephemeral")
    if backend.get("skip_git_repo_check", False):
        command.append("--skip-git-repo-check")

    output_path.parent.mkdir(parents=True, exist_ok=True)
    temp_schema_path: Path | None = None
    try:
        # delete=False: the file must outlive the `with` so the child process
        # can open it by name; the `finally` below removes it.
        with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False, encoding="utf-8") as temp_schema:
            temp_schema_path = Path(temp_schema.name)
            temp_schema.write(schema_text)

        command.extend(
            [
                "-C",
                str(ROOT),
                "--output-schema",
                str(temp_schema_path),
                "-o",
                str(output_path),
                # The prompt is supplied via `input=` below; the trailing "-"
                # presumably tells the CLI to read it from stdin.
                "-",
            ]
        )
        completed = subprocess.run(
            command,
            input=prompt,
            text=True,
            capture_output=True,
            cwd=ROOT,
            check=False,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"codex exec timed out after {exc.timeout} seconds") from exc
    finally:
        if temp_schema_path is not None:
            temp_schema_path.unlink(missing_ok=True)

    if completed.returncode != 0:
        raise RuntimeError(completed.stderr.strip() or "codex exec failed")

    proposal = load_json(output_path)
    manifest = proposal.get("manifest")
    if not isinstance(manifest, dict):
        raise ValueError("proposal did not include a manifest object")
    # The id and goal are set locally so they always reflect this run.
    manifest["manifest_id"] = sanitize_manifest_id(goal)
    manifest["goal"] = proposal.get("goal", goal)
    benchmark = proposal.get("benchmark")
    if not isinstance(benchmark, dict) or not benchmark.get("command"):
        proposal["benchmark"] = {"command": default_benchmark}
    validate_proposal(proposal, config)
    output_path.write_text(json.dumps(proposal, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    return proposal


def main() -> int:
    """Parse the command line, request one proposal and print it as JSON.

    ``--goal``, ``--system-context`` and ``--output`` are required;
    ``--config`` and ``--schema`` default to ``DEFAULT_CONFIG`` and
    ``DEFAULT_SCHEMA``.

    Returns:
        ``0`` once the proposal has been written to stdout.
    """
    parser = argparse.ArgumentParser(description="Use Codex CLI to propose one bounded self-improvement manifest.")
    option_table = (
        ("--goal", {"required": True}),
        ("--config", {"default": str(DEFAULT_CONFIG)}),
        ("--schema", {"default": str(DEFAULT_SCHEMA)}),
        ("--system-context", {"required": True}),
        ("--output", {"required": True}),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    cli = parser.parse_args()

    result = run_codex_proposal(
        goal=cli.goal,
        config_path=Path(cli.config),
        system_context_path=Path(cli.system_context),
        output_path=Path(cli.output),
        schema_path=Path(cli.schema),
    )
    sys.stdout.write(json.dumps(result, indent=2) + "\n")
    return 0


# Run the CLI only when executed as a script; main()'s return value is passed
# to SystemExit and so becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())