| """ |
| Process Registry -- In-memory registry for managed background processes. |
| |
| Tracks processes spawned via terminal(background=true), providing: |
| - Output buffering (rolling 200KB window) |
| - Status polling and log retrieval |
| - Blocking wait with interrupt support |
| - Process killing |
| - Crash recovery via JSON checkpoint file |
| - Session-scoped tracking for gateway reset protection |
| |
| Background processes execute THROUGH the environment interface -- nothing |
| runs on the host machine unless TERMINAL_ENV=local. For Docker, Singularity, |
| Modal, Daytona, and SSH backends, the command runs inside the sandbox. |
| |
| Usage: |
| from tools.process_registry import process_registry |
| |
| # Spawn a background process (called from terminal_tool) |
| session = process_registry.spawn(env, "pytest -v", task_id="task_123") |
| |
| # Poll for status |
| result = process_registry.poll(session.id) |
| |
| # Block until done |
| result = process_registry.wait(session.id, timeout=300) |
| |
| # Kill it |
| process_registry.kill(session.id) |
| """ |
|
|
| import json |
| import logging |
| import os |
| import platform |
| import shlex |
| import shutil |
| import signal |
| import subprocess |
| import threading |
| import time |
| import uuid |
|
|
| _IS_WINDOWS = platform.system() == "Windows" |
| from tools.environments.local import _find_shell, _sanitize_subprocess_env |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| from hermes_cli.config import get_hermes_home |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| CHECKPOINT_PATH = get_hermes_home() / "processes.json" |
|
|
| |
| MAX_OUTPUT_CHARS = 200_000 |
| FINISHED_TTL_SECONDS = 1800 |
| MAX_PROCESSES = 64 |
|
|
|
|
| @dataclass |
| class ProcessSession: |
| """A tracked background process with output buffering.""" |
| id: str |
| command: str |
| task_id: str = "" |
| session_key: str = "" |
| pid: Optional[int] = None |
| process: Optional[subprocess.Popen] = None |
| env_ref: Any = None |
| cwd: Optional[str] = None |
| started_at: float = 0.0 |
| exited: bool = False |
| exit_code: Optional[int] = None |
| output_buffer: str = "" |
| max_output_chars: int = MAX_OUTPUT_CHARS |
| detached: bool = False |
| |
| watcher_platform: str = "" |
| watcher_chat_id: str = "" |
| watcher_thread_id: str = "" |
| watcher_interval: int = 0 |
| _lock: threading.Lock = field(default_factory=threading.Lock) |
| _reader_thread: Optional[threading.Thread] = field(default=None, repr=False) |
| _pty: Any = field(default=None, repr=False) |
|
|
|
|
| class ProcessRegistry: |
| """ |
| In-memory registry of running and finished background processes. |
| |
| Thread-safe. Accessed from: |
| - Executor threads (terminal_tool, process tool handlers) |
| - Gateway asyncio loop (watcher tasks, session reset checks) |
| - Cleanup thread (sandbox reaping coordination) |
| """ |
|
|
| _SHELL_NOISE_SUBSTRINGS = ( |
| "bash: cannot set terminal process group", |
| "bash: no job control in this shell", |
| "no job control in this shell", |
| "cannot set terminal process group", |
| "tcsetattr: Inappropriate ioctl for device", |
| ) |
|
|
| def __init__(self): |
| self._running: Dict[str, ProcessSession] = {} |
| self._finished: Dict[str, ProcessSession] = {} |
| self._lock = threading.Lock() |
|
|
| |
| self.pending_watchers: List[Dict[str, Any]] = [] |
|
|
| @staticmethod |
| def _clean_shell_noise(text: str) -> str: |
| """Strip shell startup warnings from the beginning of output.""" |
| lines = text.split("\n") |
| while lines and any(noise in lines[0] for noise in ProcessRegistry._SHELL_NOISE_SUBSTRINGS): |
| lines.pop(0) |
| return "\n".join(lines) |
|
|
| |
|
|
| def spawn_local( |
| self, |
| command: str, |
| cwd: str = None, |
| task_id: str = "", |
| session_key: str = "", |
| env_vars: dict = None, |
| use_pty: bool = False, |
| ) -> ProcessSession: |
| """ |
| Spawn a background process locally. |
| |
| Only for TERMINAL_ENV=local. Other backends use spawn_via_env(). |
| |
| Args: |
| use_pty: If True, use a pseudo-terminal via ptyprocess for interactive |
| CLI tools (Codex, Claude Code, Python REPL). Falls back to |
| subprocess.Popen if ptyprocess is not installed. |
| """ |
| session = ProcessSession( |
| id=f"proc_{uuid.uuid4().hex[:12]}", |
| command=command, |
| task_id=task_id, |
| session_key=session_key, |
| cwd=cwd or os.getcwd(), |
| started_at=time.time(), |
| ) |
|
|
| if use_pty: |
| |
| try: |
| if _IS_WINDOWS: |
| from winpty import PtyProcess as _PtyProcessCls |
| else: |
| from ptyprocess import PtyProcess as _PtyProcessCls |
| user_shell = _find_shell() |
| pty_env = _sanitize_subprocess_env(os.environ, env_vars) |
| pty_env["PYTHONUNBUFFERED"] = "1" |
| pty_proc = _PtyProcessCls.spawn( |
| [user_shell, "-lic", command], |
| cwd=session.cwd, |
| env=pty_env, |
| dimensions=(30, 120), |
| ) |
| session.pid = pty_proc.pid |
| |
| session._pty = pty_proc |
|
|
| |
| reader = threading.Thread( |
| target=self._pty_reader_loop, |
| args=(session,), |
| daemon=True, |
| name=f"proc-pty-reader-{session.id}", |
| ) |
| session._reader_thread = reader |
| reader.start() |
|
|
| with self._lock: |
| self._prune_if_needed() |
| self._running[session.id] = session |
|
|
| self._write_checkpoint() |
| return session |
|
|
| except ImportError: |
| logger.warning("ptyprocess not installed, falling back to pipe mode") |
| except Exception as e: |
| logger.warning("PTY spawn failed (%s), falling back to pipe mode", e) |
|
|
| |
| |
| |
| user_shell = _find_shell() |
| |
| |
| |
| bg_env = _sanitize_subprocess_env(os.environ, env_vars) |
| bg_env["PYTHONUNBUFFERED"] = "1" |
| proc = subprocess.Popen( |
| [user_shell, "-lic", command], |
| text=True, |
| cwd=session.cwd, |
| env=bg_env, |
| encoding="utf-8", |
| errors="replace", |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| stdin=subprocess.PIPE, |
| preexec_fn=None if _IS_WINDOWS else os.setsid, |
| ) |
|
|
| session.process = proc |
| session.pid = proc.pid |
|
|
| |
| reader = threading.Thread( |
| target=self._reader_loop, |
| args=(session,), |
| daemon=True, |
| name=f"proc-reader-{session.id}", |
| ) |
| session._reader_thread = reader |
| reader.start() |
|
|
| with self._lock: |
| self._prune_if_needed() |
| self._running[session.id] = session |
|
|
| self._write_checkpoint() |
| return session |
|
|
| def spawn_via_env( |
| self, |
| env: Any, |
| command: str, |
| cwd: str = None, |
| task_id: str = "", |
| session_key: str = "", |
| timeout: int = 10, |
| ) -> ProcessSession: |
| """ |
| Spawn a background process through a non-local environment backend. |
| |
| For Docker/Singularity/Modal/Daytona/SSH: runs the command inside the sandbox |
| using the environment's execute() interface. We wrap the command to |
| capture the in-sandbox PID and redirect output to a log file inside |
| the sandbox, then poll the log via subsequent execute() calls. |
| |
| This is less capable than local spawn (no live stdout pipe, no stdin), |
| but it ensures the command runs in the correct sandbox context. |
| """ |
| session = ProcessSession( |
| id=f"proc_{uuid.uuid4().hex[:12]}", |
| command=command, |
| task_id=task_id, |
| session_key=session_key, |
| cwd=cwd, |
| started_at=time.time(), |
| env_ref=env, |
| ) |
|
|
| |
| log_path = f"/tmp/hermes_bg_{session.id}.log" |
| pid_path = f"/tmp/hermes_bg_{session.id}.pid" |
| quoted_command = shlex.quote(command) |
| bg_command = ( |
| f"nohup bash -c {quoted_command} > {log_path} 2>&1 & " |
| f"echo $! > {pid_path} && cat {pid_path}" |
| ) |
|
|
| try: |
| result = env.execute(bg_command, timeout=timeout) |
| output = result.get("output", "").strip() |
| |
| for line in output.splitlines(): |
| line = line.strip() |
| if line.isdigit(): |
| session.pid = int(line) |
| break |
| except Exception as e: |
| session.exited = True |
| session.exit_code = -1 |
| session.output_buffer = f"Failed to start: {e}" |
|
|
| if not session.exited: |
| |
| reader = threading.Thread( |
| target=self._env_poller_loop, |
| args=(session, env, log_path, pid_path), |
| daemon=True, |
| name=f"proc-poller-{session.id}", |
| ) |
| session._reader_thread = reader |
| reader.start() |
|
|
| with self._lock: |
| self._prune_if_needed() |
| self._running[session.id] = session |
|
|
| self._write_checkpoint() |
| return session |
|
|
| |
|
|
| def _reader_loop(self, session: ProcessSession): |
| """Background thread: read stdout from a local Popen process.""" |
| first_chunk = True |
| try: |
| while True: |
| chunk = session.process.stdout.read(4096) |
| if not chunk: |
| break |
| if first_chunk: |
| chunk = self._clean_shell_noise(chunk) |
| first_chunk = False |
| with session._lock: |
| session.output_buffer += chunk |
| if len(session.output_buffer) > session.max_output_chars: |
| session.output_buffer = session.output_buffer[-session.max_output_chars:] |
| except Exception as e: |
| logger.debug("Process stdout reader ended: %s", e) |
|
|
| |
| try: |
| session.process.wait(timeout=5) |
| except Exception as e: |
| logger.debug("Process wait timed out or failed: %s", e) |
| session.exited = True |
| session.exit_code = session.process.returncode |
| self._move_to_finished(session) |
|
|
| def _env_poller_loop( |
| self, session: ProcessSession, env: Any, log_path: str, pid_path: str |
| ): |
| """Background thread: poll a sandbox log file for non-local backends.""" |
| while not session.exited: |
| time.sleep(2) |
| try: |
| |
| result = env.execute(f"cat {log_path} 2>/dev/null", timeout=10) |
| new_output = result.get("output", "") |
| if new_output: |
| with session._lock: |
| session.output_buffer = new_output |
| if len(session.output_buffer) > session.max_output_chars: |
| session.output_buffer = session.output_buffer[-session.max_output_chars:] |
|
|
| |
| check = env.execute( |
| f"kill -0 $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?", |
| timeout=5, |
| ) |
| check_output = check.get("output", "").strip() |
| if check_output and check_output.splitlines()[-1].strip() != "0": |
| |
| exit_result = env.execute( |
| f"wait $(cat {pid_path} 2>/dev/null) 2>/dev/null; echo $?", |
| timeout=5, |
| ) |
| exit_str = exit_result.get("output", "").strip() |
| try: |
| session.exit_code = int(exit_str.splitlines()[-1].strip()) |
| except (ValueError, IndexError): |
| session.exit_code = -1 |
| session.exited = True |
| self._move_to_finished(session) |
| return |
|
|
| except Exception: |
| |
| session.exited = True |
| session.exit_code = -1 |
| self._move_to_finished(session) |
| return |
|
|
| def _pty_reader_loop(self, session: ProcessSession): |
| """Background thread: read output from a PTY process.""" |
| pty = session._pty |
| try: |
| while pty.isalive(): |
| try: |
| chunk = pty.read(4096) |
| if chunk: |
| |
| text = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace") |
| with session._lock: |
| session.output_buffer += text |
| if len(session.output_buffer) > session.max_output_chars: |
| session.output_buffer = session.output_buffer[-session.max_output_chars:] |
| except EOFError: |
| break |
| except Exception: |
| break |
| except Exception as e: |
| logger.debug("PTY stdout reader ended: %s", e) |
|
|
| |
| try: |
| pty.wait() |
| except Exception as e: |
| logger.debug("PTY wait timed out or failed: %s", e) |
| session.exited = True |
| session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1 |
| self._move_to_finished(session) |
|
|
| def _move_to_finished(self, session: ProcessSession): |
| """Move a session from running to finished.""" |
| with self._lock: |
| self._running.pop(session.id, None) |
| self._finished[session.id] = session |
| self._write_checkpoint() |
|
|
| |
|
|
| def get(self, session_id: str) -> Optional[ProcessSession]: |
| """Get a session by ID (running or finished).""" |
| with self._lock: |
| return self._running.get(session_id) or self._finished.get(session_id) |
|
|
| def poll(self, session_id: str) -> dict: |
| """Check status and get new output for a background process.""" |
| from tools.ansi_strip import strip_ansi |
|
|
| session = self.get(session_id) |
| if session is None: |
| return {"status": "not_found", "error": f"No process with ID {session_id}"} |
|
|
| with session._lock: |
| output_preview = strip_ansi(session.output_buffer[-1000:]) if session.output_buffer else "" |
|
|
| result = { |
| "session_id": session.id, |
| "command": session.command, |
| "status": "exited" if session.exited else "running", |
| "pid": session.pid, |
| "uptime_seconds": int(time.time() - session.started_at), |
| "output_preview": output_preview, |
| } |
| if session.exited: |
| result["exit_code"] = session.exit_code |
| if session.detached: |
| result["detached"] = True |
| result["note"] = "Process recovered after restart -- output history unavailable" |
| return result |
|
|
| def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict: |
| """Read the full output log with optional pagination by lines.""" |
| from tools.ansi_strip import strip_ansi |
|
|
| session = self.get(session_id) |
| if session is None: |
| return {"status": "not_found", "error": f"No process with ID {session_id}"} |
|
|
| with session._lock: |
| full_output = strip_ansi(session.output_buffer) |
|
|
| lines = full_output.splitlines() |
| total_lines = len(lines) |
|
|
| |
| if offset == 0 and limit > 0: |
| selected = lines[-limit:] |
| else: |
| selected = lines[offset:offset + limit] |
|
|
| return { |
| "session_id": session.id, |
| "status": "exited" if session.exited else "running", |
| "output": "\n".join(selected), |
| "total_lines": total_lines, |
| "showing": f"{len(selected)} lines", |
| } |
|
|
| def wait(self, session_id: str, timeout: int = None) -> dict: |
| """ |
| Block until a process exits, timeout, or interrupt. |
| |
| Args: |
| session_id: The process to wait for. |
| timeout: Max seconds to block. Falls back to TERMINAL_TIMEOUT config. |
| |
| Returns: |
| dict with status ("exited", "timeout", "interrupted", "not_found") |
| and output snapshot. |
| """ |
| from tools.ansi_strip import strip_ansi |
| from tools.terminal_tool import _interrupt_event |
|
|
| default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180")) |
| max_timeout = default_timeout |
| requested_timeout = timeout |
| timeout_note = None |
|
|
| if requested_timeout and requested_timeout > max_timeout: |
| effective_timeout = max_timeout |
| timeout_note = ( |
| f"Requested wait of {requested_timeout}s was clamped " |
| f"to configured limit of {max_timeout}s" |
| ) |
| else: |
| effective_timeout = requested_timeout or max_timeout |
|
|
| session = self.get(session_id) |
| if session is None: |
| return {"status": "not_found", "error": f"No process with ID {session_id}"} |
|
|
| deadline = time.monotonic() + effective_timeout |
|
|
| while time.monotonic() < deadline: |
| if session.exited: |
| result = { |
| "status": "exited", |
| "exit_code": session.exit_code, |
| "output": strip_ansi(session.output_buffer[-2000:]), |
| } |
| if timeout_note: |
| result["timeout_note"] = timeout_note |
| return result |
|
|
| if _interrupt_event.is_set(): |
| result = { |
| "status": "interrupted", |
| "output": strip_ansi(session.output_buffer[-1000:]), |
| "note": "User sent a new message -- wait interrupted", |
| } |
| if timeout_note: |
| result["timeout_note"] = timeout_note |
| return result |
|
|
| time.sleep(1) |
|
|
| result = { |
| "status": "timeout", |
| "output": strip_ansi(session.output_buffer[-1000:]), |
| } |
| if timeout_note: |
| result["timeout_note"] = timeout_note |
| else: |
| result["timeout_note"] = f"Waited {effective_timeout}s, process still running" |
| return result |
|
|
| def kill_process(self, session_id: str) -> dict: |
| """Kill a background process.""" |
| session = self.get(session_id) |
| if session is None: |
| return {"status": "not_found", "error": f"No process with ID {session_id}"} |
|
|
| if session.exited: |
| return { |
| "status": "already_exited", |
| "exit_code": session.exit_code, |
| } |
|
|
| |
| try: |
| if session._pty: |
| |
| try: |
| session._pty.terminate(force=True) |
| except Exception: |
| if session.pid: |
| os.kill(session.pid, signal.SIGTERM) |
| elif session.process: |
| |
| try: |
| if _IS_WINDOWS: |
| session.process.terminate() |
| else: |
| os.killpg(os.getpgid(session.process.pid), signal.SIGTERM) |
| except (ProcessLookupError, PermissionError): |
| session.process.kill() |
| elif session.env_ref and session.pid: |
| |
| session.env_ref.execute(f"kill {session.pid} 2>/dev/null", timeout=5) |
| session.exited = True |
| session.exit_code = -15 |
| self._move_to_finished(session) |
| self._write_checkpoint() |
| return {"status": "killed", "session_id": session.id} |
| except Exception as e: |
| return {"status": "error", "error": str(e)} |
|
|
| def write_stdin(self, session_id: str, data: str) -> dict: |
| """Send raw data to a running process's stdin (no newline appended).""" |
| session = self.get(session_id) |
| if session is None: |
| return {"status": "not_found", "error": f"No process with ID {session_id}"} |
| if session.exited: |
| return {"status": "already_exited", "error": "Process has already finished"} |
|
|
| |
| if hasattr(session, '_pty') and session._pty: |
| try: |
| pty_data = data.encode("utf-8") if isinstance(data, str) else data |
| session._pty.write(pty_data) |
| return {"status": "ok", "bytes_written": len(data)} |
| except Exception as e: |
| return {"status": "error", "error": str(e)} |
|
|
| |
| if not session.process or not session.process.stdin: |
| return {"status": "error", "error": "Process stdin not available (non-local backend or stdin closed)"} |
| try: |
| session.process.stdin.write(data) |
| session.process.stdin.flush() |
| return {"status": "ok", "bytes_written": len(data)} |
| except Exception as e: |
| return {"status": "error", "error": str(e)} |
|
|
| def submit_stdin(self, session_id: str, data: str = "") -> dict: |
| """Send data + newline to a running process's stdin (like pressing Enter).""" |
| return self.write_stdin(session_id, data + "\n") |
|
|
| def list_sessions(self, task_id: str = None) -> list: |
| """List all running and recently-finished processes.""" |
| with self._lock: |
| all_sessions = list(self._running.values()) + list(self._finished.values()) |
|
|
| if task_id: |
| all_sessions = [s for s in all_sessions if s.task_id == task_id] |
|
|
| result = [] |
| for s in all_sessions: |
| entry = { |
| "session_id": s.id, |
| "command": s.command[:200], |
| "cwd": s.cwd, |
| "pid": s.pid, |
| "started_at": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(s.started_at)), |
| "uptime_seconds": int(time.time() - s.started_at), |
| "status": "exited" if s.exited else "running", |
| "output_preview": s.output_buffer[-200:] if s.output_buffer else "", |
| } |
| if s.exited: |
| entry["exit_code"] = s.exit_code |
| if s.detached: |
| entry["detached"] = True |
| result.append(entry) |
| return result |
|
|
| |
|
|
| def has_active_processes(self, task_id: str) -> bool: |
| """Check if there are active (running) processes for a task_id.""" |
| with self._lock: |
| return any( |
| s.task_id == task_id and not s.exited |
| for s in self._running.values() |
| ) |
|
|
| def has_active_for_session(self, session_key: str) -> bool: |
| """Check if there are active processes for a gateway session key.""" |
| with self._lock: |
| return any( |
| s.session_key == session_key and not s.exited |
| for s in self._running.values() |
| ) |
|
|
| def kill_all(self, task_id: str = None) -> int: |
| """Kill all running processes, optionally filtered by task_id. Returns count killed.""" |
| with self._lock: |
| targets = [ |
| s for s in self._running.values() |
| if (task_id is None or s.task_id == task_id) and not s.exited |
| ] |
|
|
| killed = 0 |
| for session in targets: |
| result = self.kill_process(session.id) |
| if result.get("status") in ("killed", "already_exited"): |
| killed += 1 |
| return killed |
|
|
| |
|
|
| def _prune_if_needed(self): |
| """Remove oldest finished sessions if over MAX_PROCESSES. Must hold _lock.""" |
| |
| now = time.time() |
| expired = [ |
| sid for sid, s in self._finished.items() |
| if (now - s.started_at) > FINISHED_TTL_SECONDS |
| ] |
| for sid in expired: |
| del self._finished[sid] |
|
|
| |
| total = len(self._running) + len(self._finished) |
| if total >= MAX_PROCESSES and self._finished: |
| oldest_id = min(self._finished, key=lambda sid: self._finished[sid].started_at) |
| del self._finished[oldest_id] |
|
|
| def cleanup_expired(self): |
| """Public method to prune expired finished sessions.""" |
| with self._lock: |
| self._prune_if_needed() |
|
|
| |
|
|
| def _write_checkpoint(self): |
| """Write running process metadata to checkpoint file atomically.""" |
| try: |
| with self._lock: |
| entries = [] |
| for s in self._running.values(): |
| if not s.exited: |
| entries.append({ |
| "session_id": s.id, |
| "command": s.command, |
| "pid": s.pid, |
| "cwd": s.cwd, |
| "started_at": s.started_at, |
| "task_id": s.task_id, |
| "session_key": s.session_key, |
| "watcher_platform": s.watcher_platform, |
| "watcher_chat_id": s.watcher_chat_id, |
| "watcher_thread_id": s.watcher_thread_id, |
| "watcher_interval": s.watcher_interval, |
| }) |
| |
| |
| from utils import atomic_json_write |
| atomic_json_write(CHECKPOINT_PATH, entries) |
| except Exception as e: |
| logger.debug("Failed to write checkpoint file: %s", e, exc_info=True) |
|
|
| def recover_from_checkpoint(self) -> int: |
| """ |
| On gateway startup, probe PIDs from checkpoint file. |
| |
| Returns the number of processes recovered as detached. |
| """ |
| if not CHECKPOINT_PATH.exists(): |
| return 0 |
|
|
| try: |
| entries = json.loads(CHECKPOINT_PATH.read_text(encoding="utf-8")) |
| except Exception: |
| return 0 |
|
|
| recovered = 0 |
| for entry in entries: |
| pid = entry.get("pid") |
| if not pid: |
| continue |
|
|
| |
| alive = False |
| try: |
| os.kill(pid, 0) |
| alive = True |
| except (ProcessLookupError, PermissionError): |
| pass |
|
|
| if alive: |
| session = ProcessSession( |
| id=entry["session_id"], |
| command=entry.get("command", "unknown"), |
| task_id=entry.get("task_id", ""), |
| session_key=entry.get("session_key", ""), |
| pid=pid, |
| cwd=entry.get("cwd"), |
| started_at=entry.get("started_at", time.time()), |
| detached=True, |
| watcher_platform=entry.get("watcher_platform", ""), |
| watcher_chat_id=entry.get("watcher_chat_id", ""), |
| watcher_thread_id=entry.get("watcher_thread_id", ""), |
| watcher_interval=entry.get("watcher_interval", 0), |
| ) |
| with self._lock: |
| self._running[session.id] = session |
| recovered += 1 |
| logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid) |
|
|
| |
| if session.watcher_interval > 0: |
| self.pending_watchers.append({ |
| "session_id": session.id, |
| "check_interval": session.watcher_interval, |
| "session_key": session.session_key, |
| "platform": session.watcher_platform, |
| "chat_id": session.watcher_chat_id, |
| "thread_id": session.watcher_thread_id, |
| }) |
|
|
| |
| try: |
| from utils import atomic_json_write |
| atomic_json_write(CHECKPOINT_PATH, []) |
| except Exception as e: |
| logger.debug("Could not clear checkpoint file: %s", e, exc_info=True) |
|
|
| return recovered |
|
|
|
|
| |
| process_registry = ProcessRegistry() |
|
|
|
|
| |
| |
| |
| from tools.registry import registry |
|
|
| PROCESS_SCHEMA = { |
| "name": "process", |
| "description": ( |
| "Manage background processes started with terminal(background=true). " |
| "Actions: 'list' (show all), 'poll' (check status + new output), " |
| "'log' (full output with pagination), 'wait' (block until done or timeout), " |
| "'kill' (terminate), 'write' (send raw stdin data without newline), " |
| "'submit' (send data + Enter, for answering prompts)." |
| ), |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "action": { |
| "type": "string", |
| "enum": ["list", "poll", "log", "wait", "kill", "write", "submit"], |
| "description": "Action to perform on background processes" |
| }, |
| "session_id": { |
| "type": "string", |
| "description": "Process session ID (from terminal background output). Required for all actions except 'list'." |
| }, |
| "data": { |
| "type": "string", |
| "description": "Text to send to process stdin (for 'write' and 'submit' actions)" |
| }, |
| "timeout": { |
| "type": "integer", |
| "description": "Max seconds to block for 'wait' action. Returns partial output on timeout.", |
| "minimum": 1 |
| }, |
| "offset": { |
| "type": "integer", |
| "description": "Line offset for 'log' action (default: last 200 lines)" |
| }, |
| "limit": { |
| "type": "integer", |
| "description": "Max lines to return for 'log' action", |
| "minimum": 1 |
| } |
| }, |
| "required": ["action"] |
| } |
| } |
|
|
|
|
| def _handle_process(args, **kw): |
| import json as _json |
| task_id = kw.get("task_id") |
| action = args.get("action", "") |
| |
| session_id = str(args.get("session_id", "")) if args.get("session_id") is not None else "" |
|
|
| if action == "list": |
| return _json.dumps({"processes": process_registry.list_sessions(task_id=task_id)}, ensure_ascii=False) |
| elif action in ("poll", "log", "wait", "kill", "write", "submit"): |
| if not session_id: |
| return _json.dumps({"error": f"session_id is required for {action}"}, ensure_ascii=False) |
| if action == "poll": |
| return _json.dumps(process_registry.poll(session_id), ensure_ascii=False) |
| elif action == "log": |
| return _json.dumps(process_registry.read_log( |
| session_id, offset=args.get("offset", 0), limit=args.get("limit", 200)), ensure_ascii=False) |
| elif action == "wait": |
| return _json.dumps(process_registry.wait(session_id, timeout=args.get("timeout")), ensure_ascii=False) |
| elif action == "kill": |
| return _json.dumps(process_registry.kill_process(session_id), ensure_ascii=False) |
| elif action == "write": |
| return _json.dumps(process_registry.write_stdin(session_id, str(args.get("data", ""))), ensure_ascii=False) |
| elif action == "submit": |
| return _json.dumps(process_registry.submit_stdin(session_id, str(args.get("data", ""))), ensure_ascii=False) |
| return _json.dumps({"error": f"Unknown process action: {action}. Use: list, poll, log, wait, kill, write, submit"}, ensure_ascii=False) |
|
|
|
|
| registry.register( |
| name="process", |
| toolset="terminal", |
| schema=PROCESS_SCHEMA, |
| handler=_handle_process, |
| emoji="⚙️", |
| ) |
|
|