| """ |
| Terminal Execution Engine β Real Command Runner |
| Streaming logs, timeout control, session management, self-repair loop |
| """ |
|
|
| import asyncio |
| import os |
| import pty |
| import select |
| import signal |
| import time |
| import uuid |
| from typing import AsyncGenerator, Callable, Dict, List, Optional |
|
|
| import structlog |
|
|
| log = structlog.get_logger() |
|
|
| WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace") |
|
|
| BLOCKED_COMMANDS = [ |
| "rm -rf /", |
| ":(){ :|:& };:", |
| "mkfs", |
| "shutdown", |
| "reboot", |
| "halt", |
| "dd if=/dev/zero", |
| "dd if=/dev/urandom of=/dev", |
| "> /dev/sda", |
| "chmod 777 /", |
| "chown root /", |
| ] |
|
|
|
|
| def _is_blocked(cmd: str) -> bool: |
| c = cmd.strip() |
| for b in BLOCKED_COMMANDS: |
| if b in c: |
| return True |
| return False |
|
|
|
|
| class ProcessSession: |
| """Manages a persistent shell session.""" |
| def __init__(self, session_id: str, cwd: str = WORKSPACE): |
| self.session_id = session_id |
| self.cwd = cwd |
| self.history: List[Dict] = [] |
| self.env = os.environ.copy() |
| self.env["TERM"] = "xterm-256color" |
| self.env["PYTHONUNBUFFERED"] = "1" |
| self.created_at = time.time() |
| self.last_used = time.time() |
|
|
| def record(self, cmd: str, output: str, exit_code: int, duration: float): |
| self.history.append({ |
| "cmd": cmd, |
| "output": output[:2000], |
| "exit_code": exit_code, |
| "duration": round(duration, 3), |
| "timestamp": time.time(), |
| }) |
| self.last_used = time.time() |
| if len(self.history) > 100: |
| self.history = self.history[-100:] |
|
|
|
|
| class TerminalEngine: |
| """ |
| Real Terminal Execution Engine |
| - Async subprocess with streaming output |
| - Session management (persistent cwd + env) |
| - Timeout + kill control |
| - Self-repair loop (run β error β analyze β patch β retry) |
| """ |
|
|
| def __init__(self, ws_manager=None): |
| self.ws = ws_manager |
| self._sessions: Dict[str, ProcessSession] = {} |
| self._running_procs: Dict[str, asyncio.subprocess.Process] = {} |
|
|
| |
|
|
| def get_or_create_session(self, session_id: str, cwd: str = WORKSPACE) -> ProcessSession: |
| if session_id not in self._sessions: |
| self._sessions[session_id] = ProcessSession(session_id, cwd) |
| os.makedirs(cwd, exist_ok=True) |
| return self._sessions[session_id] |
|
|
| def get_session_history(self, session_id: str) -> List[Dict]: |
| sess = self._sessions.get(session_id) |
| return sess.history if sess else [] |
|
|
| def cleanup_old_sessions(self, max_age: int = 3600): |
| now = time.time() |
| to_remove = [sid for sid, s in self._sessions.items() if now - s.last_used > max_age] |
| for sid in to_remove: |
| del self._sessions[sid] |
|
|
| |
|
|
| async def execute( |
| self, |
| command: str, |
| session_id: str = "", |
| task_id: str = "", |
| cwd: str = "", |
| timeout: int = 120, |
| stream_callback: Optional[Callable] = None, |
| env_override: Optional[Dict] = None, |
| ) -> Dict: |
| """ |
| Execute a shell command with real streaming output. |
| Returns: {success, output, exit_code, duration, command} |
| """ |
| if _is_blocked(command): |
| result = { |
| "success": False, |
| "output": f"β BLOCKED: Dangerous command detected: {command[:60]}", |
| "exit_code": -1, |
| "command": command, |
| "duration": 0.0, |
| } |
| await self._emit(task_id, session_id, "terminal_blocked", result) |
| return result |
|
|
| sess = self.get_or_create_session(session_id or "default", cwd or WORKSPACE) |
| work_dir = cwd or sess.cwd |
| os.makedirs(work_dir, exist_ok=True) |
|
|
| env = sess.env.copy() |
| if env_override: |
| env.update(env_override) |
|
|
| await self._emit(task_id, session_id, "terminal_exec", { |
| "command": command[:300], |
| "cwd": work_dir, |
| "session_id": session_id, |
| }) |
|
|
| start = time.time() |
| output_lines = [] |
|
|
| try: |
| proc = await asyncio.create_subprocess_shell( |
| command, |
| stdout=asyncio.subprocess.PIPE, |
| stderr=asyncio.subprocess.STDOUT, |
| cwd=work_dir, |
| env=env, |
| ) |
|
|
| if session_id: |
| self._running_procs[session_id] = proc |
|
|
| |
| async def read_output(): |
| while True: |
| try: |
| line = await asyncio.wait_for(proc.stdout.readline(), timeout=5.0) |
| if not line: |
| break |
| decoded = line.decode("utf-8", errors="replace").rstrip() |
| output_lines.append(decoded) |
|
|
| |
| await self._emit(task_id, session_id, "terminal_output", { |
| "line": decoded, |
| "command": command[:80], |
| }) |
|
|
| if stream_callback: |
| await stream_callback(decoded) |
|
|
| if len(output_lines) > 500: |
| output_lines.append("... [output truncated at 500 lines]") |
| proc.kill() |
| break |
| except asyncio.TimeoutError: |
| if proc.returncode is not None: |
| break |
| continue |
|
|
| await asyncio.wait_for( |
| asyncio.gather(read_output(), proc.wait()), |
| timeout=timeout, |
| ) |
|
|
| except asyncio.TimeoutError: |
| try: |
| proc.kill() |
| except Exception: |
| pass |
| output_lines.append(f"\nβ οΈ Command timed out after {timeout}s") |
| exit_code = -1 |
| except Exception as e: |
| output_lines.append(f"\nβ Execution error: {str(e)}") |
| exit_code = -1 |
| else: |
| exit_code = proc.returncode or 0 |
|
|
| finally: |
| if session_id in self._running_procs: |
| del self._running_procs[session_id] |
|
|
| duration = time.time() - start |
| full_output = "\n".join(output_lines) |
|
|
| if session_id: |
| sess.record(command, full_output, exit_code, duration) |
|
|
| result = { |
| "success": exit_code == 0, |
| "output": full_output, |
| "exit_code": exit_code, |
| "command": command, |
| "duration": round(duration, 3), |
| "cwd": work_dir, |
| } |
|
|
| await self._emit(task_id, session_id, "terminal_result", { |
| "command": command[:100], |
| "exit_code": exit_code, |
| "success": exit_code == 0, |
| "duration_ms": round(duration * 1000), |
| "output_lines": len(output_lines), |
| }) |
|
|
| return result |
|
|
| |
|
|
| async def kill(self, session_id: str) -> Dict: |
| proc = self._running_procs.get(session_id) |
| if proc: |
| try: |
| proc.kill() |
| del self._running_procs[session_id] |
| return {"success": True, "action": "killed", "session_id": session_id} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
| return {"success": False, "error": "No running process for session"} |
|
|
| |
|
|
| async def execute_chain( |
| self, |
| commands: List[str], |
| session_id: str = "", |
| task_id: str = "", |
| cwd: str = "", |
| stop_on_error: bool = True, |
| ) -> Dict: |
| """Run a chain of commands, passing cwd between them.""" |
| results = [] |
| current_cwd = cwd or WORKSPACE |
|
|
| for i, cmd in enumerate(commands): |
| |
| if cmd.strip().startswith("cd "): |
| new_dir = cmd.strip()[3:].strip().strip('"').strip("'") |
| if not os.path.isabs(new_dir): |
| new_dir = os.path.join(current_cwd, new_dir) |
| if os.path.isdir(new_dir): |
| current_cwd = new_dir |
| results.append({"command": cmd, "success": True, "output": f"Changed to {current_cwd}", "exit_code": 0}) |
| else: |
| results.append({"command": cmd, "success": False, "output": f"Directory not found: {new_dir}", "exit_code": 1}) |
| if stop_on_error: |
| break |
| continue |
|
|
| result = await self.execute( |
| cmd, |
| session_id=session_id, |
| task_id=task_id, |
| cwd=current_cwd, |
| ) |
| results.append(result) |
|
|
| if not result["success"] and stop_on_error: |
| break |
|
|
| success = all(r.get("success", False) for r in results) |
| combined_output = "\n".join(f"$ {r['command']}\n{r['output']}" for r in results) |
|
|
| return { |
| "success": success, |
| "commands": len(commands), |
| "results": results, |
| "combined_output": combined_output[:5000], |
| "final_cwd": current_cwd, |
| } |
|
|
| |
|
|
| async def self_repair( |
| self, |
| command: str, |
| error_output: str, |
| ai_router=None, |
| session_id: str = "", |
| task_id: str = "", |
| max_attempts: int = 3, |
| ) -> Dict: |
| """ |
| Real self-repair loop: |
| Run β Error β Analyze β AI suggests fix β Retry |
| """ |
| await self._emit(task_id, session_id, "self_repair_start", { |
| "command": command[:100], |
| "error": error_output[:200], |
| }) |
|
|
| for attempt in range(1, max_attempts + 1): |
| await self._emit(task_id, session_id, "self_repair_attempt", { |
| "attempt": attempt, |
| "max": max_attempts, |
| }) |
|
|
| if ai_router: |
| fix_prompt = [ |
| {"role": "system", "content": "You are an expert DevOps engineer. Analyze the error and provide the EXACT fixed command or commands. Return ONLY the command(s), one per line. No explanations."}, |
| {"role": "user", "content": f"Original command: {command}\n\nError output:\n{error_output[:1000]}\n\nProvide the corrected command(s):"}, |
| ] |
| try: |
| fixed_cmd = await ai_router.complete(fix_prompt, temperature=0.1, max_tokens=300) |
| fixed_cmd = fixed_cmd.strip().strip("`").strip() |
| |
| if fixed_cmd.startswith("```"): |
| lines = fixed_cmd.split("\n") |
| fixed_cmd = "\n".join(lines[1:-1]) if len(lines) > 2 else fixed_cmd |
| except Exception: |
| fixed_cmd = command |
|
|
| await self._emit(task_id, session_id, "self_repair_fix", { |
| "attempt": attempt, |
| "fixed_command": fixed_cmd[:200], |
| }) |
|
|
| result = await self.execute(fixed_cmd, session_id=session_id, task_id=task_id) |
| if result["success"]: |
| await self._emit(task_id, session_id, "self_repair_success", { |
| "attempt": attempt, |
| "fixed_command": fixed_cmd[:100], |
| }) |
| return { |
| "success": True, |
| "fixed_command": fixed_cmd, |
| "result": result, |
| "attempts": attempt, |
| } |
| error_output = result["output"] |
| else: |
| break |
|
|
| await self._emit(task_id, session_id, "self_repair_failed", { |
| "max_attempts": max_attempts, |
| "last_error": error_output[:200], |
| }) |
| return { |
| "success": False, |
| "error": "Self-repair exhausted all attempts", |
| "last_error": error_output, |
| "attempts": max_attempts, |
| } |
|
|
| |
|
|
| async def _emit(self, task_id: str, session_id: str, event: str, data: Dict): |
| if not self.ws: |
| return |
| try: |
| if task_id: |
| await self.ws.emit(task_id, event, data, session_id=session_id) |
| if session_id: |
| await self.ws.emit_chat(session_id, event, data) |
| except Exception: |
| pass |
|
|