| |
| """Launch an interactive Codex TUI in tmux and approve its requests. |
| |
| This is deliberately an interactive Codex session, not ``codex exec``. A |
| small daemon watches only tmux sessions created with the configured prefix and |
| presses the first, one-shot approval choice when a known Codex approval overlay |
| has been stable for multiple polls. |
| |
| Typical use:: |
| |
| python3 codex_auto_run.py -p "implement the task and run the tests" |
| python3 codex_auto_run.py -p "fix it" -- --model gpt-5.4 |
| python3 codex_auto_run.py --resume-last -C ~/code |
| python3 codex_auto_run.py --resume SESSION_ID -p "continue the task" |
| python3 codex_auto_run.py --resume -C ~/code |
| python3 codex_auto_run.py --status |
| python3 codex_auto_run.py --stop-daemon |
| |
| The wrapper owns ``-p``/``--prompt``. Codex itself uses ``-p`` for profiles; |
| use ``--codex-profile NAME`` or place Codex options after ``--``. |
| |
| Security model |
| -------------- |
| By default the watcher approves Codex command, edit, permission, and network |
| approval overlays. It does not answer ordinary questions, enable full access, |
| trust hooks, install plugins, or approve MCP/app tool calls. Use |
| ``--approve-mcp`` and ``--auto-trust-directory`` only when those broader side |
| effects are intended. ``--bypass`` maps to Codex's dangerous no-sandbox mode |
| and should only be used inside an externally isolated container or VM. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import fcntl |
| import hashlib |
| import json |
| import os |
| import re |
| import secrets |
| import shlex |
| import shutil |
| import signal |
| import subprocess |
| import sys |
| import time |
| from dataclasses import dataclass |
| from datetime import datetime |
| from pathlib import Path |
| from typing import TextIO |
|
|
|
|
| SCRIPT_VERSION = 1 |
| DEFAULT_SESSION_PREFIX = "codex-auto" |
| DEFAULT_POLL_INTERVAL = 0.35 |
| DEFAULT_COOLDOWN = 0.8 |
| DEFAULT_STABILITY_POLLS = 2 |
| DEFAULT_REARM_INTERVAL = 2.0 |
| DEFAULT_IDLE_EXIT_SECONDS = 120.0 |
| MAX_LOG_BYTES = 2 * 1024 * 1024 |
| LOG_BACKUPS = 3 |
| MAX_OVERLAY_ROWS = 80 |
|
|
| ANSI_ESCAPE_RE = re.compile(r"\x1b(?:\[[0-?]*[ -/]*[@-~]|\][^\x07]*(?:\x07|\x1b\\))") |
| SELECTED_FIRST_RE = re.compile(r"^[›>]\s*1\.\s+(?P<label>.+)$", re.IGNORECASE) |
| STANDARD_FOOTER_RE = re.compile( |
| r"^press\s+enter\s+to\s+confirm\s+or\s+.+\s+to\s+cancel(?:\s+or\s+.+)?$", |
| re.IGNORECASE, |
| ) |
| MCP_FORM_FOOTER_RE = re.compile(r"^enter to submit(?: all)?\s*\|\s*esc to cancel$", re.IGNORECASE) |
| TRUST_FOOTER_RE = re.compile(r"^press enter to continue.*$", re.IGNORECASE) |
| NETWORK_TITLE_RE = re.compile( |
| r'^do you want to approve network access to ".+"\?$', |
| re.IGNORECASE, |
| ) |
| MCP_TITLE_RE = re.compile(r"^.+ needs your approval\.$", re.IGNORECASE) |
|
|
| STANDARD_TITLES: dict[str, str] = { |
| "Would you like to run the following command?": "command", |
| "Would you like to make the following edits?": "edit", |
| "Would you like to grant these permissions?": "permissions", |
| } |
|
|
| POSITIVE_LABELS: dict[str, tuple[str, ...]] = { |
| "command": ("yes, proceed",), |
| "edit": ("yes, proceed",), |
| "permissions": ("yes, grant these permissions for this turn",), |
| "network": ("yes, just this once",), |
| "mcp": ("yes, provide the requested info",), |
| } |
|
|
| NEGATIVE_SIGNALS: dict[str, tuple[str, ...]] = { |
| "command": ( |
| "no, continue without running it", |
| "no, and tell codex what to do differently", |
| ), |
| "edit": ("no, and tell codex what to do differently",), |
| "permissions": ("no, continue without permissions",), |
| "network": ( |
| "no, continue without running it", |
| "no, and tell codex what to do differently", |
| "no, and block this host in the future", |
| ), |
| "mcp": ("no, but continue without it", "cancel this request"), |
| } |
|
|
| BLOCKED_CODEX_ARGS = { |
| "-C", |
| "--cd", |
| "-a", |
| "--ask-for-approval", |
| "-s", |
| "--sandbox", |
| "--dangerously-bypass-approvals-and-sandbox", |
| "--yolo", |
| "--full-auto", |
| "--no-alt-screen", |
| } |
|
|
| CODEX_VALUE_OPTIONS = { |
| "-c", |
| "--config", |
| "--enable", |
| "--disable", |
| "--remote", |
| "--remote-auth-token-env", |
| "-i", |
| "--image", |
| "-m", |
| "--model", |
| "--local-provider", |
| "-p", |
| "--profile", |
| "--add-dir", |
| } |
|
|
|
|
| def normalize_line(line: str) -> str: |
| """Normalize a captured terminal row without joining physical rows.""" |
|
|
| line = ANSI_ESCAPE_RE.sub("", line).replace("\xa0", " ") |
| line = "".join(ch for ch in line if ch == "\t" or ord(ch) >= 32) |
| return re.sub(r"\s+", " ", line).strip() |
|
|
|
|
| def build_runtime_dir(session_prefix: str, explicit: str | None) -> Path: |
| if explicit: |
| return Path(explicit).expanduser().resolve() |
| return (Path.home() / ".runtime" / session_prefix).resolve() |
|
|
|
|
| def ensure_private_dir(path: Path) -> None: |
| path.mkdir(mode=0o700, parents=True, exist_ok=True) |
| try: |
| path.chmod(0o700) |
| except OSError: |
| pass |
|
|
|
|
| def register_session(config: Config, session_name: str) -> None: |
| """Record that a tmux session was created by this wrapper.""" |
|
|
| ensure_private_dir(config.sessions_dir) |
| marker = config.sessions_dir / session_name |
| fd = os.open(marker, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600) |
| with os.fdopen(fd, "w", encoding="utf-8") as handle: |
| handle.write(f"pid={os.getpid()}\n") |
|
|
|
|
| def unregister_session(config: Config, session_name: str) -> None: |
| try: |
| (config.sessions_dir / session_name).unlink() |
| except OSError: |
| pass |
|
|
|
|
| def registered_sessions(config: Config) -> set[str]: |
| try: |
| return { |
| marker.name |
| for marker in config.sessions_dir.iterdir() |
| if marker.is_file() |
| } |
| except OSError: |
| return set() |
|
|
|
|
| @dataclass(frozen=True) |
| class Config: |
| session_prefix: str |
| runtime_dir: Path |
| poll_interval: float |
| cooldown: float |
| stability_polls: int |
| rearm_interval: float |
| idle_exit_seconds: float |
| approve_mcp: bool |
| auto_trust_directory: bool |
| keep_dead_session: bool |
|
|
| @property |
| def log_path(self) -> Path: |
| return self.runtime_dir / "approve-debug.log" |
|
|
| @property |
| def pid_path(self) -> Path: |
| return self.runtime_dir / "approver-daemon.json" |
|
|
| @property |
| def lock_path(self) -> Path: |
| return self.runtime_dir / "approver-daemon.lock" |
|
|
| @property |
| def sessions_dir(self) -> Path: |
| return self.runtime_dir / "sessions" |
|
|
| def daemon_fingerprint(self) -> str: |
| relevant = { |
| "version": SCRIPT_VERSION, |
| "session_prefix": self.session_prefix, |
| "poll_interval": self.poll_interval, |
| "cooldown": self.cooldown, |
| "stability_polls": self.stability_polls, |
| "rearm_interval": self.rearm_interval, |
| "idle_exit_seconds": self.idle_exit_seconds, |
| "approve_mcp": self.approve_mcp, |
| "auto_trust_directory": self.auto_trust_directory, |
| } |
| encoded = json.dumps(relevant, sort_keys=True, separators=(",", ":")).encode() |
| return hashlib.sha256(encoded).hexdigest()[:16] |
|
|
|
|
| @dataclass(frozen=True) |
| class PaneInfo: |
| session_name: str |
| pane_id: str |
| pane_dead: bool |
|
|
|
|
| @dataclass(frozen=True) |
| class PromptCandidate: |
| kind: str |
| signature: str |
|
|
|
|
| @dataclass(frozen=True) |
| class DaemonRecord: |
| pid: int |
| token: str |
| script: str |
| fingerprint: str |
|
|
|
|
| class SessionState: |
| def __init__(self) -> None: |
| self.pending_signature: str | None = None |
| self.pending_count = 0 |
| self.active_signature: str | None = None |
| self.last_action_at = 0.0 |
|
|
| def clear_candidate(self) -> bool: |
| had_active = self.active_signature is not None |
| self.pending_signature = None |
| self.pending_count = 0 |
| self.active_signature = None |
| return had_active |
|
|
| def ready(self, candidate: PromptCandidate, now: float, config: Config) -> bool: |
| if ( |
| self.active_signature == candidate.signature |
| and now - self.last_action_at < config.rearm_interval |
| ): |
| self.pending_signature = None |
| self.pending_count = 0 |
| return False |
|
|
| if now - self.last_action_at < config.cooldown: |
| return False |
|
|
| if self.pending_signature == candidate.signature: |
| self.pending_count += 1 |
| else: |
| self.pending_signature = candidate.signature |
| self.pending_count = 1 |
|
|
| return self.pending_count >= config.stability_polls |
|
|
| def mark_approved(self, candidate: PromptCandidate, now: float) -> None: |
| self.active_signature = candidate.signature |
| self.last_action_at = now |
| self.pending_signature = None |
| self.pending_count = 0 |
|
|
|
|
| class SecureLogger: |
| def __init__(self, config: Config): |
| self.config = config |
|
|
| def _rotate_if_needed(self) -> None: |
| path = self.config.log_path |
| try: |
| if path.stat().st_size < MAX_LOG_BYTES: |
| return |
| except FileNotFoundError: |
| return |
| except OSError: |
| return |
|
|
| try: |
| oldest = path.with_name(f"{path.name}.{LOG_BACKUPS}") |
| if oldest.exists(): |
| oldest.unlink() |
| for idx in range(LOG_BACKUPS - 1, 0, -1): |
| source = path.with_name(f"{path.name}.{idx}") |
| target = path.with_name(f"{path.name}.{idx + 1}") |
| if source.exists(): |
| source.replace(target) |
| path.replace(path.with_name(f"{path.name}.1")) |
| except OSError: |
| pass |
|
|
| def write(self, message: str) -> None: |
| try: |
| ensure_private_dir(self.config.runtime_dir) |
| self._rotate_if_needed() |
| fd = os.open( |
| self.config.log_path, |
| os.O_WRONLY | os.O_CREAT | os.O_APPEND, |
| 0o600, |
| ) |
| with os.fdopen(fd, "a", encoding="utf-8") as handle: |
| ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| handle.write(f"[{ts}] {message}\n") |
| except OSError: |
| pass |
|
|
|
|
| class TmuxClient: |
| def __init__(self, config: Config): |
| self.config = config |
| self.logger = SecureLogger(config) |
|
|
| def log(self, message: str) -> None: |
| self.logger.write(message) |
|
|
| def run(self, *args: str, timeout: float = 5.0) -> tuple[int, str, str]: |
| try: |
| result = subprocess.run( |
| ["tmux", *args], |
| capture_output=True, |
| text=True, |
| timeout=timeout, |
| ) |
| return result.returncode, result.stdout, result.stderr |
| except subprocess.TimeoutExpired: |
| return -1, "", "tmux command timed out" |
| except OSError as exc: |
| return -1, "", str(exc) |
|
|
| def list_panes(self) -> list[PaneInfo]: |
| rc, output, _ = self.run( |
| "list-panes", |
| "-a", |
| "-F", |
| "#{session_name}\t#{pane_id}\t#{pane_dead}", |
| ) |
| if rc != 0: |
| return [] |
|
|
| prefix = f"{self.config.session_prefix}-" |
| allowed_sessions = registered_sessions(self.config) |
| seen_prefixed_sessions: set[str] = set() |
| panes: list[PaneInfo] = [] |
| for raw in output.splitlines(): |
| parts = raw.split("\t") |
| if len(parts) != 3 or not parts[0].startswith(prefix): |
| continue |
| seen_prefixed_sessions.add(parts[0]) |
| if parts[0] not in allowed_sessions: |
| continue |
| panes.append( |
| PaneInfo( |
| session_name=parts[0], |
| pane_id=parts[1], |
| pane_dead=parts[2].strip() == "1", |
| ) |
| ) |
|
|
| for stale_session in allowed_sessions - seen_prefixed_sessions: |
| unregister_session(self.config, stale_session) |
| return panes |
|
|
| def capture_pane(self, pane_id: str) -> list[str]: |
| |
| |
| rc, output, _ = self.run("capture-pane", "-t", pane_id, "-p") |
| return output.splitlines() if rc == 0 else [] |
|
|
| def confirm_selected_choice(self, pane_id: str) -> bool: |
| |
| |
| |
| rc, _, _ = self.run("send-keys", "-t", pane_id, "Enter") |
| return rc == 0 |
|
|
| def session_exists(self, session_name: str) -> bool: |
| rc, _, _ = self.run("has-session", "-t", f"={session_name}") |
| return rc == 0 |
|
|
| def new_session( |
| self, |
| session_name: str, |
| argv: list[str], |
| cwd: Path, |
| cols: int, |
| rows: int, |
| ) -> tuple[bool, str]: |
| rc, _, error = self.run( |
| "new-session", |
| "-d", |
| "-s", |
| session_name, |
| "-c", |
| str(cwd), |
| "-x", |
| str(cols), |
| "-y", |
| str(rows), |
| *argv, |
| timeout=15.0, |
| ) |
| if rc != 0: |
| return False, error.strip() |
| self.run( |
| "set-window-option", |
| "-t", |
| f"={session_name}:", |
| "remain-on-exit", |
| "on" if self.config.keep_dead_session else "off", |
| ) |
| return True, "" |
|
|
| def kill_session(self, session_name: str) -> None: |
| self.run("kill-session", "-t", f"={session_name}") |
|
|
| def attach_or_switch(self, session_name: str) -> int: |
| if os.environ.get("TMUX"): |
| return subprocess.run( |
| ["tmux", "switch-client", "-t", f"={session_name}"] |
| ).returncode |
| return subprocess.run( |
| ["tmux", "attach-session", "-t", f"={session_name}"] |
| ).returncode |
|
|
|
|
| def _last_nonempty_row(lines: list[str]) -> int | None: |
| for idx in range(len(lines) - 1, -1, -1): |
| if lines[idx]: |
| return idx |
| return None |
|
|
|
|
| def _last_matching_row( |
| lines: list[str], |
| predicate, |
| *, |
| before: int | None = None, |
| ) -> int | None: |
| end = len(lines) - 1 if before is None else min(before - 1, len(lines) - 1) |
| for idx in range(end, -1, -1): |
| if predicate(lines[idx]): |
| return idx |
| return None |
|
|
|
|
| def _matching_tail_span( |
| lines: list[str], |
| regex: re.Pattern[str], |
| *, |
| max_rows: int = 3, |
| ) -> tuple[int, int] | None: |
| """Match a footer at the screen tail, including physical line wraps.""" |
|
|
| end = _last_nonempty_row(lines) |
| if end is None: |
| return None |
| lower = max(0, end - max_rows + 1) |
| for start in range(end, lower - 1, -1): |
| logical = " ".join(line for line in lines[start : end + 1] if line) |
| if regex.match(logical): |
| return start, end |
| return None |
|
|
|
|
| def _classify_standard_title(text: str, approve_mcp: bool) -> str | None: |
| folded = text.casefold() |
| for title, kind in STANDARD_TITLES.items(): |
| if folded == title.casefold(): |
| return kind |
| if NETWORK_TITLE_RE.match(text): |
| return "network" |
| if approve_mcp and MCP_TITLE_RE.match(text): |
| return "mcp" |
| return None |
|
|
|
|
| def _find_standard_title_span( |
| lines: list[str], |
| *, |
| before: int, |
| approve_mcp: bool, |
| ) -> tuple[int, int, str] | None: |
| """Find the nearest known title, tolerating up to three wrapped rows.""" |
|
|
| lower = max(0, before - MAX_OVERLAY_ROWS) |
| for end in range(before - 1, lower - 1, -1): |
| for start in range(end, max(lower, end - 2) - 1, -1): |
| logical = " ".join(line for line in lines[start : end + 1] if line) |
| kind = _classify_standard_title(logical, approve_mcp) |
| if kind is not None: |
| return start, end, kind |
| return None |
|
|
|
|
| def _selected_first_row(lines: list[str], before: int) -> tuple[int, str] | None: |
| for idx in range(before - 1, -1, -1): |
| match = SELECTED_FIRST_RE.match(lines[idx]) |
| if match: |
| return idx, match.group("label") |
| return None |
|
|
|
|
| def _contains_any(lines: list[str], values: tuple[str, ...]) -> bool: |
| |
| joined = " ".join(lines).casefold() |
| return any(value in joined for value in values) |
|
|
|
|
| def _candidate_signature(kind: str, overlay: list[str]) -> str: |
| material = "\n".join(line for line in overlay if line).encode("utf-8", "replace") |
| digest = hashlib.sha256(material).hexdigest()[:20] |
| return f"{kind}:{digest}" |
|
|
|
|
| def detect_standard_approval(lines: list[str], approve_mcp: bool) -> PromptCandidate | None: |
| footer = _matching_tail_span(lines, STANDARD_FOOTER_RE) |
| if footer is None: |
| return None |
| footer_start, footer_end = footer |
|
|
| selected = _selected_first_row(lines, footer_start) |
| if selected is None: |
| return None |
| selected_row, selected_label = selected |
|
|
| title = _find_standard_title_span( |
| lines, |
| before=selected_row, |
| approve_mcp=approve_mcp, |
| ) |
| if title is None: |
| return None |
| title_start, _, kind = title |
|
|
| label = selected_label.casefold() |
| if not any(label.startswith(prefix) for prefix in POSITIVE_LABELS[kind]): |
| return None |
|
|
| option_block = lines[selected_row + 1 : footer_start] |
| if not _contains_any(option_block, NEGATIVE_SIGNALS[kind]): |
| return None |
|
|
| body = lines[title_start:selected_row] |
| if kind == "command" and not any(line.startswith("$ ") for line in body): |
| return None |
| if kind == "permissions" and not any( |
| line.casefold().startswith("permission rule:") for line in body |
| ): |
| return None |
|
|
| overlay = lines[title_start : footer_end + 1] |
| return PromptCandidate(kind=kind, signature=_candidate_signature(kind, overlay)) |
|
|
|
|
| def detect_mcp_tool_approval(lines: list[str]) -> PromptCandidate | None: |
| footer = _matching_tail_span(lines, MCP_FORM_FOOTER_RE) |
| if footer is None: |
| return None |
| footer_start, footer_end = footer |
|
|
| selected = _selected_first_row(lines, footer_start) |
| if selected is None: |
| return None |
| selected_row, selected_label = selected |
| if not selected_label.casefold().startswith("allow"): |
| return None |
|
|
| lower = max(0, selected_row - MAX_OVERLAY_ROWS) |
| field_row = _last_matching_row( |
| lines, |
| lambda line: bool(re.match(r"^field 1/1$", line, re.IGNORECASE)), |
| before=selected_row, |
| ) |
| if field_row is None or field_row < lower: |
| return None |
| overlay = lines[field_row : footer_end + 1] |
| if not _contains_any(overlay, ("run the tool and continue",)): |
| return None |
| if not _contains_any(overlay, ("cancel this tool call",)): |
| return None |
|
|
| return PromptCandidate(kind="mcp_tool", signature=_candidate_signature("mcp_tool", overlay)) |
|
|
|
|
| def detect_trust_directory(lines: list[str]) -> PromptCandidate | None: |
| footer = _matching_tail_span(lines, TRUST_FOOTER_RE) |
| if footer is None: |
| return None |
| footer_start, footer_end = footer |
|
|
| selected = _selected_first_row(lines, footer_start) |
| if selected is None: |
| return None |
| selected_row, label = selected |
| if not label.casefold().startswith("yes, continue"): |
| return None |
|
|
| overlay = lines[max(0, selected_row - 20) : footer_end + 1] |
| if not _contains_any(overlay, ("do you trust the contents of this directory?",)): |
| return None |
| if not _contains_any(overlay, ("no, quit",)): |
| return None |
| return PromptCandidate(kind="trust_directory", signature=_candidate_signature("trust", overlay)) |
|
|
|
|
| def detect_candidate(raw_lines: list[str], config: Config) -> PromptCandidate | None: |
| lines = [normalize_line(line) for line in raw_lines] |
| candidate = detect_standard_approval(lines, config.approve_mcp) |
| if candidate is not None: |
| return candidate |
| if config.approve_mcp: |
| candidate = detect_mcp_tool_approval(lines) |
| if candidate is not None: |
| return candidate |
| if config.auto_trust_directory: |
| return detect_trust_directory(lines) |
| return None |
|
|
|
|
| def _write_daemon_record(config: Config, record: DaemonRecord) -> None: |
| ensure_private_dir(config.runtime_dir) |
| temporary = config.pid_path.with_name(f".{config.pid_path.name}.{os.getpid()}.tmp") |
| data = { |
| "pid": record.pid, |
| "token": record.token, |
| "script": record.script, |
| "fingerprint": record.fingerprint, |
| } |
| fd = os.open(temporary, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) |
| try: |
| with os.fdopen(fd, "w", encoding="utf-8") as handle: |
| json.dump(data, handle, sort_keys=True) |
| handle.write("\n") |
| temporary.replace(config.pid_path) |
| config.pid_path.chmod(0o600) |
| finally: |
| try: |
| temporary.unlink() |
| except FileNotFoundError: |
| pass |
|
|
|
|
| def _read_daemon_record(config: Config) -> DaemonRecord | None: |
| try: |
| data = json.loads(config.pid_path.read_text(encoding="utf-8")) |
| return DaemonRecord( |
| pid=int(data["pid"]), |
| token=str(data["token"]), |
| script=str(data["script"]), |
| fingerprint=str(data["fingerprint"]), |
| ) |
| except (FileNotFoundError, OSError, ValueError, KeyError, TypeError, json.JSONDecodeError): |
| return None |
|
|
|
|
| def _process_matches(record: DaemonRecord) -> bool: |
| try: |
| os.kill(record.pid, 0) |
| cmdline = Path(f"/proc/{record.pid}/cmdline").read_bytes().split(b"\0") |
| except (FileNotFoundError, ProcessLookupError, PermissionError, OSError): |
| return False |
|
|
| decoded = [part.decode("utf-8", "replace") for part in cmdline if part] |
| return ( |
| record.token in decoded |
| and "--daemon" in decoded |
| and str(Path(record.script).resolve()) |
| in { |
| str(Path(part).resolve()) |
| for part in decoded |
| if part.startswith("/") |
| } |
| ) |
|
|
|
|
| def daemon_status(config: Config) -> tuple[bool, DaemonRecord | None, str | None]: |
| record = _read_daemon_record(config) |
| if record is None or not _process_matches(record): |
| return False, record, None |
| mismatch = None |
| if record.fingerprint != config.daemon_fingerprint(): |
| mismatch = ( |
| f"running daemon settings are {record.fingerprint}, requested settings are " |
| f"{config.daemon_fingerprint()}" |
| ) |
| return True, record, mismatch |
|
|
|
|
| def _acquire_daemon_lock(config: Config) -> TextIO | None: |
| ensure_private_dir(config.runtime_dir) |
| handle = config.lock_path.open("a+", encoding="utf-8") |
| try: |
| config.lock_path.chmod(0o600) |
| fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) |
| except (BlockingIOError, OSError): |
| handle.close() |
| return None |
| return handle |
|
|
|
|
| def daemon_loop(config: Config, token: str) -> int: |
| lock_handle = _acquire_daemon_lock(config) |
| if lock_handle is None: |
| return 0 |
|
|
| tmux = TmuxClient(config) |
| record = DaemonRecord( |
| pid=os.getpid(), |
| token=token, |
| script=str(Path(__file__).resolve()), |
| fingerprint=config.daemon_fingerprint(), |
| ) |
| _write_daemon_record(config, record) |
| tmux.log( |
| f"daemon_start pid={record.pid} prefix={config.session_prefix} " |
| f"settings={record.fingerprint}" |
| ) |
|
|
| running = True |
|
|
| def handle_signal(signum, _frame) -> None: |
| nonlocal running |
| tmux.log(f"daemon_signal signum={signum}") |
| running = False |
|
|
| signal.signal(signal.SIGTERM, handle_signal) |
| signal.signal(signal.SIGINT, handle_signal) |
| signal.signal(signal.SIGHUP, signal.SIG_IGN) |
|
|
| states: dict[str, SessionState] = {} |
| idle_started_at: float | None = None |
|
|
| try: |
| while running: |
| panes = tmux.list_panes() |
| live_panes = [pane for pane in panes if not pane.pane_dead] |
| live_session_names = {pane.session_name for pane in live_panes} |
| for pane in panes: |
| if pane.pane_dead and pane.session_name not in live_session_names: |
| unregister_session(config, pane.session_name) |
| now = time.monotonic() |
| if not live_panes: |
| if idle_started_at is None: |
| idle_started_at = now |
| elif now - idle_started_at >= config.idle_exit_seconds: |
| tmux.log("daemon_exit reason=no_matching_panes") |
| break |
| else: |
| idle_started_at = None |
|
|
| visible_ids = {pane.pane_id for pane in live_panes} |
| for pane_id in list(states): |
| if pane_id not in visible_ids: |
| states.pop(pane_id, None) |
| tmux.log(f"pane_removed pane={pane_id}") |
|
|
| for pane in live_panes: |
| state = states.setdefault(pane.pane_id, SessionState()) |
| candidate = detect_candidate(tmux.capture_pane(pane.pane_id), config) |
| if candidate is None: |
| if state.clear_candidate(): |
| tmux.log(f"clear_active pane={pane.pane_id}") |
| continue |
|
|
| previous_pending = state.pending_signature |
| if not state.ready(candidate, now, config): |
| if previous_pending != candidate.signature and state.pending_signature: |
| tmux.log( |
| f"pending pane={pane.pane_id} kind={candidate.kind} " |
| f"sig={candidate.signature}" |
| ) |
| continue |
|
|
| tmux.log( |
| f"approve pane={pane.pane_id} kind={candidate.kind} " |
| f"sig={candidate.signature}" |
| ) |
| if tmux.confirm_selected_choice(pane.pane_id): |
| state.mark_approved(candidate, time.monotonic()) |
| else: |
| tmux.log(f"approve_failed pane={pane.pane_id} kind={candidate.kind}") |
| state.pending_signature = None |
| state.pending_count = 0 |
|
|
| time.sleep(config.poll_interval) |
| finally: |
| current = _read_daemon_record(config) |
| if current == record: |
| try: |
| config.pid_path.unlink() |
| except OSError: |
| pass |
| tmux.log("daemon_stop") |
| try: |
| fcntl.flock(lock_handle.fileno(), fcntl.LOCK_UN) |
| except OSError: |
| pass |
| lock_handle.close() |
| return 0 |
|
|
|
|
| def start_daemon(config: Config) -> tuple[bool, str | None]: |
| alive, _, mismatch = daemon_status(config) |
| if alive: |
| return mismatch is None, mismatch |
|
|
| ensure_private_dir(config.runtime_dir) |
| try: |
| config.pid_path.unlink() |
| except OSError: |
| pass |
|
|
| token = secrets.token_hex(16) |
| command = [ |
| sys.executable, |
| str(Path(__file__).resolve()), |
| "--daemon", |
| "--daemon-token", |
| token, |
| "--session-prefix", |
| config.session_prefix, |
| "--runtime-dir", |
| str(config.runtime_dir), |
| "--poll-interval", |
| str(config.poll_interval), |
| "--cooldown", |
| str(config.cooldown), |
| "--stability-polls", |
| str(config.stability_polls), |
| "--rearm-interval", |
| str(config.rearm_interval), |
| "--idle-exit-seconds", |
| str(config.idle_exit_seconds), |
| ] |
| if config.approve_mcp: |
| command.append("--approve-mcp") |
| if config.auto_trust_directory: |
| command.append("--auto-trust-directory") |
|
|
| with open(os.devnull, "rb") as devnull_in, open(os.devnull, "ab") as devnull_out: |
| subprocess.Popen( |
| command, |
| stdin=devnull_in, |
| stdout=devnull_out, |
| stderr=devnull_out, |
| start_new_session=True, |
| close_fds=True, |
| ) |
|
|
| deadline = time.monotonic() + 5.0 |
| while time.monotonic() < deadline: |
| alive, _, mismatch = daemon_status(config) |
| if alive: |
| return mismatch is None, mismatch |
| time.sleep(0.1) |
| return False, "daemon did not become ready within 5 seconds" |
|
|
|
|
| def stop_daemon(config: Config) -> str: |
| alive, record, _ = daemon_status(config) |
| if not alive or record is None: |
| try: |
| config.pid_path.unlink() |
| except OSError: |
| pass |
| return "not_running" |
|
|
| try: |
| os.kill(record.pid, signal.SIGTERM) |
| except ProcessLookupError: |
| pass |
|
|
| deadline = time.monotonic() + 5.0 |
| while time.monotonic() < deadline: |
| if not _process_matches(record): |
| break |
| time.sleep(0.1) |
| if _process_matches(record): |
| return "timeout" |
| try: |
| config.pid_path.unlink() |
| except OSError: |
| pass |
| return "stopped" |
|
|
|
|
| def list_matching_sessions(config: Config) -> list[str]: |
| return sorted( |
| { |
| pane.session_name |
| for pane in TmuxClient(config).list_panes() |
| if not pane.pane_dead |
| } |
| ) |
|
|
|
|
| def _normalize_executable(path: Path) -> Path | None: |
| try: |
| resolved = path.expanduser().resolve(strict=True) |
| except (FileNotFoundError, OSError): |
| return None |
| if not resolved.is_file() or not os.access(resolved, os.X_OK): |
| return None |
| return resolved |
|
|
|
|
| def probe_codex_binary(path: Path, timeout: float = 5.0) -> bool: |
| candidate = _normalize_executable(path) |
| if candidate is None: |
| return False |
| try: |
| result = subprocess.run( |
| [str(candidate), "--version"], |
| capture_output=True, |
| text=True, |
| timeout=timeout, |
| ) |
| except (OSError, subprocess.SubprocessError): |
| return False |
| output = f"{result.stdout}\n{result.stderr}".casefold() |
| return result.returncode == 0 and ("codex-cli" in output or "openai codex" in output) |
|
|
|
|
| def find_codex(explicit: str | None = None) -> str | None: |
| if explicit: |
| normalized = _normalize_executable(Path(explicit)) |
| if normalized is not None and probe_codex_binary(normalized): |
| return str(normalized) |
| return None |
|
|
| raw_candidates: list[Path] = [] |
| on_path = shutil.which("codex") |
| if on_path: |
| raw_candidates.append(Path(on_path)) |
| raw_candidates.append(Path.home() / ".local" / "bin" / "codex") |
|
|
| seen: set[Path] = set() |
| for raw in raw_candidates: |
| normalized = _normalize_executable(raw) |
| if normalized is None or normalized in seen: |
| continue |
| seen.add(normalized) |
| if probe_codex_binary(normalized): |
| return str(normalized) |
| return None |
|
|
|
|
| def validate_codex_args(parser: argparse.ArgumentParser, codex_args: list[str]) -> None: |
| idx = 0 |
| while idx < len(codex_args): |
| arg = codex_args[idx] |
| key = arg.split("=", 1)[0] |
| if key in BLOCKED_CODEX_ARGS: |
| parser.error( |
| f"Codex option {arg!r} conflicts with the wrapper; use the wrapper's " |
| "--cd/--sandbox/--bypass options instead" |
| ) |
| if arg in {"-h", "--help", "-V", "--version"}: |
| parser.error( |
| f"Codex option {arg!r} exits instead of starting an interactive session" |
| ) |
| if arg == "--" or not arg.startswith("-"): |
| parser.error( |
| f"unexpected positional Codex argument {arg!r}; " |
| "use the wrapper's -p/--prompt for the initial or follow-up prompt" |
| ) |
|
|
| if key in CODEX_VALUE_OPTIONS and "=" not in arg: |
| if idx + 1 >= len(codex_args): |
| parser.error(f"Codex option {arg!r} requires a value") |
| idx += 2 |
| else: |
| idx += 1 |
|
|
|
|
| def build_codex_argv( |
| codex_binary: str, |
| prompt: str | None, |
| cwd: Path, |
| codex_args: list[str], |
| codex_profile: str | None, |
| sandbox: str, |
| bypass: bool, |
| resume_session: str | None = None, |
| resume_last: bool = False, |
| ) -> list[str]: |
| if resume_session is not None and resume_last: |
| raise ValueError("resume_session and resume_last are mutually exclusive") |
| if resume_session == "" and prompt is not None: |
| raise ValueError("the resume picker cannot accept an initial prompt") |
|
|
| resuming = resume_session is not None or resume_last |
| argv = [codex_binary] |
| if resuming: |
| argv.append("resume") |
| argv.extend(codex_args) |
| if codex_profile: |
| argv.extend(["--profile", codex_profile]) |
| argv.extend(["--no-alt-screen", "-C", str(cwd)]) |
| if bypass: |
| argv.append("--dangerously-bypass-approvals-and-sandbox") |
| else: |
| argv.extend( |
| [ |
| "-a", |
| "on-request", |
| "-s", |
| sandbox, |
| "-c", |
| 'approvals_reviewer="user"', |
| ] |
| ) |
| if resume_last: |
| argv.append("--last") |
|
|
| positionals: list[str] = [] |
| if resume_session: |
| positionals.append(resume_session) |
| if prompt is not None: |
| positionals.append(prompt) |
| if positionals: |
| argv.append("--") |
| argv.extend(positionals) |
| return argv |
|
|
|
|
| def unique_session_name(config: Config) -> str: |
| stamp = datetime.now().strftime("%Y%m%d-%H%M%S") |
| return f"{config.session_prefix}-{stamp}-{os.getpid()}-{secrets.token_hex(2)}" |
|
|
|
|
| def run_launcher(args: argparse.Namespace, config: Config, prompt: str | None) -> int: |
| if shutil.which("tmux") is None: |
| print("Error: tmux is required", file=sys.stderr) |
| return 1 |
|
|
| codex_binary = find_codex(args.codex_binary) |
| if codex_binary is None: |
| detail = f" at {args.codex_binary}" if args.codex_binary else "" |
| print(f"Error: no healthy Codex binary found{detail}", file=sys.stderr) |
| return 1 |
|
|
| cwd = Path(args.cd).expanduser().resolve() |
| if not cwd.is_dir(): |
| print(f"Error: working directory does not exist: {cwd}", file=sys.stderr) |
| return 1 |
|
|
| argv = build_codex_argv( |
| codex_binary=codex_binary, |
| prompt=prompt, |
| cwd=cwd, |
| codex_args=args.codex_args, |
| codex_profile=args.codex_profile, |
| sandbox=args.sandbox, |
| bypass=args.bypass, |
| resume_session=args.resume, |
| resume_last=args.resume_last, |
| ) |
| session_name = unique_session_name(config) |
| try: |
| cols, rows = os.get_terminal_size() |
| except OSError: |
| cols, rows = 120, 40 |
|
|
| tmux = TmuxClient(config) |
| created, error = tmux.new_session(session_name, argv, cwd, cols, rows) |
| if not created: |
| print( |
| f"Error: failed to create tmux session: {error or 'unknown tmux error'}", |
| file=sys.stderr, |
| ) |
| return 1 |
|
|
| try: |
| register_session(config, session_name) |
| except OSError as exc: |
| tmux.kill_session(session_name) |
| print(f"Error: failed to register tmux session: {exc}", file=sys.stderr) |
| return 1 |
|
|
| ok, daemon_error = start_daemon(config) |
| if not ok: |
| unregister_session(config, session_name) |
| tmux.kill_session(session_name) |
| print(f"Error: failed to start approval daemon: {daemon_error}", file=sys.stderr) |
| print( |
| "If a daemon with different settings is running, stop it with --stop-daemon.", |
| file=sys.stderr, |
| ) |
| return 1 |
|
|
| |
| |
| |
| tmux.log(f"launcher_ready session={session_name}") |
|
|
| print(f"Codex running in tmux session: {session_name}") |
| print(f"Codex binary: {codex_binary}") |
| print(f"Working directory: {cwd}") |
| if args.resume_last: |
| print("Resume mode: most recent session in this working directory") |
| elif args.resume == "": |
| print("Resume mode: interactive session picker") |
| elif args.resume is not None: |
| print(f"Resume mode: session {args.resume}") |
| print(f"Approval log: {config.log_path}") |
| print(f"Reattach: tmux attach -t {shlex.quote(session_name)}") |
| if config.approve_mcp: |
| print("MCP/app approval: enabled (external side effects may be approved)") |
| if config.auto_trust_directory: |
| print("Automatic directory trust: enabled") |
| if args.bypass: |
| print("WARNING: Codex approvals and sandbox are disabled for this session") |
| print() |
|
|
| if args.detach: |
| return 0 |
|
|
| attach_result = 0 |
| try: |
| attach_result = tmux.attach_or_switch(session_name) |
| except KeyboardInterrupt: |
| pass |
| finally: |
| if tmux.session_exists(session_name): |
| print(f"Session still running: tmux attach -t {shlex.quote(session_name)}") |
| if attach_result != 0: |
| print("Error: failed to attach or switch to the tmux session", file=sys.stderr) |
| return attach_result |
|
|
|
|
| def parse_args(argv: list[str] | None = None) -> tuple[argparse.ArgumentParser, argparse.Namespace]: |
| parser = argparse.ArgumentParser( |
| description="Interactive Codex tmux launcher with strict auto-approval detection", |
| epilog=( |
| "Put Codex-specific options after '--'. The wrapper reserves -p for the " |
| "initial or resumed-session follow-up prompt." |
| ), |
| ) |
| control = parser.add_mutually_exclusive_group() |
| control.add_argument("--daemon", action="store_true", help=argparse.SUPPRESS) |
| control.add_argument("--start-daemon", action="store_true", help="start the watcher only") |
| control.add_argument("--stop-daemon", action="store_true", help="stop the watcher") |
| control.add_argument("--status", action="store_true", help="show watcher and tmux status") |
| control.add_argument( |
| "--resume", |
| nargs="?", |
| const="", |
| metavar="SESSION_ID", |
| help=( |
| "resume an interactive Codex session by ID/name; omit SESSION_ID " |
| "to open Codex's session picker" |
| ), |
| ) |
| control.add_argument( |
| "--resume-last", |
| action="store_true", |
| help="resume the most recent interactive session for --cd", |
| ) |
|
|
| prompt_group = parser.add_mutually_exclusive_group() |
| prompt_group.add_argument( |
| "-p", |
| "--prompt", |
| help="initial Codex prompt, or follow-up prompt when resuming", |
| ) |
| prompt_group.add_argument( |
| "--prompt-file", |
| help="read the initial/follow-up prompt from a UTF-8 file, or - for stdin", |
| ) |
|
|
| parser.add_argument("-C", "--cd", default=os.getcwd(), help="Codex working directory") |
| parser.add_argument("--codex-binary", help="explicit Codex executable") |
| parser.add_argument("--codex-profile", help="Codex config profile (Codex's own -p)") |
| parser.add_argument( |
| "--sandbox", |
| choices=("read-only", "workspace-write"), |
| default="workspace-write", |
| help="Codex sandbox used with interactive approvals", |
| ) |
| parser.add_argument( |
| "--bypass", |
| action="store_true", |
| help="DANGEROUS: disable Codex approvals and sandbox (isolated environments only)", |
| ) |
| parser.add_argument( |
| "--approve-mcp", |
| action="store_true", |
| help="also auto-approve MCP/app tool calls that may have external side effects", |
| ) |
| parser.add_argument( |
| "--auto-trust-directory", |
| action="store_true", |
| help="automatically trust a directory on Codex's first-run screen", |
| ) |
| parser.add_argument( |
| "--detach", |
| action="store_true", |
| help="create the session without attaching", |
| ) |
| parser.add_argument( |
| "--keep-dead-session", |
| action="store_true", |
| help="keep the tmux session after Codex exits", |
| ) |
| parser.add_argument( |
| "--session-prefix", |
| default=DEFAULT_SESSION_PREFIX, |
| help="tmux session prefix", |
| ) |
| parser.add_argument("--runtime-dir", help="directory for daemon state and logs") |
| parser.add_argument( |
| "--poll-interval", |
| type=float, |
| default=DEFAULT_POLL_INTERVAL, |
| help="watcher polling interval in seconds", |
| ) |
| parser.add_argument( |
| "--cooldown", |
| type=float, |
| default=DEFAULT_COOLDOWN, |
| help="minimum seconds between approvals in one pane", |
| ) |
| parser.add_argument( |
| "--stability-polls", |
| type=int, |
| default=DEFAULT_STABILITY_POLLS, |
| help="identical captures required before approval", |
| ) |
| parser.add_argument( |
| "--rearm-interval", |
| type=float, |
| default=DEFAULT_REARM_INTERVAL, |
| help="retry an unchanged approval after this many seconds", |
| ) |
| parser.add_argument( |
| "--idle-exit-seconds", |
| type=float, |
| default=DEFAULT_IDLE_EXIT_SECONDS, |
| help="watcher exits after no matching panes for this duration", |
| ) |
| parser.add_argument("--daemon-token", help=argparse.SUPPRESS) |
| parser.add_argument( |
| "codex_args", |
| nargs=argparse.REMAINDER, |
| help="Codex options after --", |
| ) |
|
|
| args = parser.parse_args(argv) |
| if args.codex_args and args.codex_args[0] == "--": |
| args.codex_args = args.codex_args[1:] |
|
|
| if not re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9_-]{0,47}", args.session_prefix): |
| parser.error( |
| "--session-prefix must contain only letters, digits, '_' or '-' " |
| "(max 48 chars)" |
| ) |
| if args.poll_interval < 0.05: |
| parser.error("--poll-interval must be at least 0.05 seconds") |
| if args.cooldown < 0: |
| parser.error("--cooldown cannot be negative") |
| if args.stability_polls < 1: |
| parser.error("--stability-polls must be at least 1") |
| if args.rearm_interval < args.cooldown: |
| parser.error("--rearm-interval must be greater than or equal to --cooldown") |
| if args.idle_exit_seconds < 1: |
| parser.error("--idle-exit-seconds must be at least 1 second") |
| if args.daemon and not args.daemon_token: |
| parser.error("internal daemon token is required") |
| if args.resume == "" and (args.prompt is not None or args.prompt_file is not None): |
| parser.error( |
| "the resume picker cannot be combined with -p/--prompt or --prompt-file; " |
| "select a session first, then enter the prompt in Codex" |
| ) |
| validate_codex_args(parser, args.codex_args) |
| passthrough_keys = { |
| value.split("=", 1)[0] |
| for value in args.codex_args |
| if value.startswith("-") |
| } |
| if "--last" in passthrough_keys: |
| parser.error("Codex option '--last' conflicts with the wrapper; use --resume-last") |
| selector_mode = args.resume == "" or args.resume_last |
| for option in ("--all", "--include-non-interactive"): |
| if option in passthrough_keys and not selector_mode: |
| parser.error( |
| f"Codex option {option!r} is only valid with --resume or --resume-last" |
| ) |
| return parser, args |
|
|
|
|
| def build_config(args: argparse.Namespace) -> Config: |
| return Config( |
| session_prefix=args.session_prefix, |
| runtime_dir=build_runtime_dir(args.session_prefix, args.runtime_dir), |
| poll_interval=args.poll_interval, |
| cooldown=args.cooldown, |
| stability_polls=args.stability_polls, |
| rearm_interval=args.rearm_interval, |
| idle_exit_seconds=args.idle_exit_seconds, |
| approve_mcp=args.approve_mcp, |
| auto_trust_directory=args.auto_trust_directory, |
| keep_dead_session=args.keep_dead_session, |
| ) |
|
|
|
|
| def load_prompt(args: argparse.Namespace) -> str | None: |
| if args.prompt_file is None: |
| return args.prompt |
| if args.prompt_file == "-": |
| return sys.stdin.read() |
| return Path(args.prompt_file).expanduser().read_text(encoding="utf-8") |
|
|
|
|
| def main(argv: list[str] | None = None) -> int: |
| parser, args = parse_args(argv) |
| config = build_config(args) |
|
|
| if args.daemon: |
| return daemon_loop(config, args.daemon_token) |
|
|
| if args.stop_daemon: |
| result = stop_daemon(config) |
| if result == "stopped": |
| print("Daemon stopped") |
| return 0 |
| if result == "not_running": |
| print("Daemon was not running") |
| return 0 |
| print("Daemon did not stop within 5 seconds; state was preserved", file=sys.stderr) |
| return 1 |
|
|
| if args.start_daemon: |
| ok, error = start_daemon(config) |
| print("Daemon: running" if ok else f"Daemon: failed ({error})") |
| return 0 if ok else 1 |
|
|
| if args.status: |
| alive, record, mismatch = daemon_status(config) |
| print(f"Daemon: {'running' if alive else 'not running'}") |
| if alive and record: |
| print(f"Daemon PID: {record.pid}") |
| print(f"Daemon settings: {record.fingerprint}") |
| if mismatch: |
| print(f"Settings mismatch: {mismatch}") |
| sessions = list_matching_sessions(config) |
| print(f"Active sessions: {', '.join(sessions) if sessions else 'none'}") |
| print(f"Runtime dir: {config.runtime_dir}") |
| return 0 |
|
|
| try: |
| prompt = load_prompt(args) |
| except OSError as exc: |
| parser.error(f"could not read --prompt-file: {exc}") |
| return run_launcher(args, config, prompt) |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|