""" Terminal Execution Engine — Real Command Runner Streaming logs, timeout control, session management, self-repair loop """ import asyncio import os import pty import select import signal import time import uuid from typing import AsyncGenerator, Callable, Dict, List, Optional import structlog log = structlog.get_logger() WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace") BLOCKED_COMMANDS = [ "rm -rf /", ":(){ :|:& };:", "mkfs", "shutdown", "reboot", "halt", "dd if=/dev/zero", "dd if=/dev/urandom of=/dev", "> /dev/sda", "chmod 777 /", "chown root /", ] def _is_blocked(cmd: str) -> bool: c = cmd.strip() for b in BLOCKED_COMMANDS: if b in c: return True return False class ProcessSession: """Manages a persistent shell session.""" def __init__(self, session_id: str, cwd: str = WORKSPACE): self.session_id = session_id self.cwd = cwd self.history: List[Dict] = [] self.env = os.environ.copy() self.env["TERM"] = "xterm-256color" self.env["PYTHONUNBUFFERED"] = "1" self.created_at = time.time() self.last_used = time.time() def record(self, cmd: str, output: str, exit_code: int, duration: float): self.history.append({ "cmd": cmd, "output": output[:2000], "exit_code": exit_code, "duration": round(duration, 3), "timestamp": time.time(), }) self.last_used = time.time() if len(self.history) > 100: self.history = self.history[-100:] class TerminalEngine: """ Real Terminal Execution Engine - Async subprocess with streaming output - Session management (persistent cwd + env) - Timeout + kill control - Self-repair loop (run → error → analyze → patch → retry) """ def __init__(self, ws_manager=None): self.ws = ws_manager self._sessions: Dict[str, ProcessSession] = {} self._running_procs: Dict[str, asyncio.subprocess.Process] = {} # ─── Session Management ─────────────────────────────────────────────────── def get_or_create_session(self, session_id: str, cwd: str = WORKSPACE) -> ProcessSession: if session_id not in self._sessions: self._sessions[session_id] = ProcessSession(session_id, cwd) os.makedirs(cwd, exist_ok=True) return self._sessions[session_id] def get_session_history(self, session_id: str) -> List[Dict]: sess = self._sessions.get(session_id) return sess.history if sess else [] def cleanup_old_sessions(self, max_age: int = 3600): now = time.time() to_remove = [sid for sid, s in self._sessions.items() if now - s.last_used > max_age] for sid in to_remove: del self._sessions[sid] # ─── Execute Command ────────────────────────────────────────────────────── async def execute( self, command: str, session_id: str = "", task_id: str = "", cwd: str = "", timeout: int = 120, stream_callback: Optional[Callable] = None, env_override: Optional[Dict] = None, ) -> Dict: """ Execute a shell command with real streaming output. Returns: {success, output, exit_code, duration, command} """ if _is_blocked(command): result = { "success": False, "output": f"❌ BLOCKED: Dangerous command detected: {command[:60]}", "exit_code": -1, "command": command, "duration": 0.0, } await self._emit(task_id, session_id, "terminal_blocked", result) return result sess = self.get_or_create_session(session_id or "default", cwd or WORKSPACE) work_dir = cwd or sess.cwd os.makedirs(work_dir, exist_ok=True) env = sess.env.copy() if env_override: env.update(env_override) await self._emit(task_id, session_id, "terminal_exec", { "command": command[:300], "cwd": work_dir, "session_id": session_id, }) start = time.time() output_lines = [] try: proc = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=work_dir, env=env, ) if session_id: self._running_procs[session_id] = proc # Stream output line by line async def read_output(): while True: try: line = await asyncio.wait_for(proc.stdout.readline(), timeout=5.0) if not line: break decoded = line.decode("utf-8", errors="replace").rstrip() output_lines.append(decoded) # Stream via WebSocket await self._emit(task_id, session_id, "terminal_output", { "line": decoded, "command": command[:80], }) if stream_callback: await stream_callback(decoded) if len(output_lines) > 500: output_lines.append("... [output truncated at 500 lines]") proc.kill() break except asyncio.TimeoutError: if proc.returncode is not None: break continue await asyncio.wait_for( asyncio.gather(read_output(), proc.wait()), timeout=timeout, ) except asyncio.TimeoutError: try: proc.kill() except Exception: pass output_lines.append(f"\n⚠️ Command timed out after {timeout}s") exit_code = -1 except Exception as e: output_lines.append(f"\n❌ Execution error: {str(e)}") exit_code = -1 else: exit_code = proc.returncode or 0 finally: if session_id in self._running_procs: del self._running_procs[session_id] duration = time.time() - start full_output = "\n".join(output_lines) if session_id: sess.record(command, full_output, exit_code, duration) result = { "success": exit_code == 0, "output": full_output, "exit_code": exit_code, "command": command, "duration": round(duration, 3), "cwd": work_dir, } await self._emit(task_id, session_id, "terminal_result", { "command": command[:100], "exit_code": exit_code, "success": exit_code == 0, "duration_ms": round(duration * 1000), "output_lines": len(output_lines), }) return result # ─── Kill Running Process ───────────────────────────────────────────────── async def kill(self, session_id: str) -> Dict: proc = self._running_procs.get(session_id) if proc: try: proc.kill() del self._running_procs[session_id] return {"success": True, "action": "killed", "session_id": session_id} except Exception as e: return {"success": False, "error": str(e)} return {"success": False, "error": "No running process for session"} # ─── Execute Multiple Commands in Sequence ──────────────────────────────── async def execute_chain( self, commands: List[str], session_id: str = "", task_id: str = "", cwd: str = "", stop_on_error: bool = True, ) -> Dict: """Run a chain of commands, passing cwd between them.""" results = [] current_cwd = cwd or WORKSPACE for i, cmd in enumerate(commands): # Handle 'cd' specially to persist directory if cmd.strip().startswith("cd "): new_dir = cmd.strip()[3:].strip().strip('"').strip("'") if not os.path.isabs(new_dir): new_dir = os.path.join(current_cwd, new_dir) if os.path.isdir(new_dir): current_cwd = new_dir results.append({"command": cmd, "success": True, "output": f"Changed to {current_cwd}", "exit_code": 0}) else: results.append({"command": cmd, "success": False, "output": f"Directory not found: {new_dir}", "exit_code": 1}) if stop_on_error: break continue result = await self.execute( cmd, session_id=session_id, task_id=task_id, cwd=current_cwd, ) results.append(result) if not result["success"] and stop_on_error: break success = all(r.get("success", False) for r in results) combined_output = "\n".join(f"$ {r['command']}\n{r['output']}" for r in results) return { "success": success, "commands": len(commands), "results": results, "combined_output": combined_output[:5000], "final_cwd": current_cwd, } # ─── Self-Repair Loop ───────────────────────────────────────────────────── async def self_repair( self, command: str, error_output: str, ai_router=None, session_id: str = "", task_id: str = "", max_attempts: int = 3, ) -> Dict: """ Real self-repair loop: Run → Error → Analyze → AI suggests fix → Retry """ await self._emit(task_id, session_id, "self_repair_start", { "command": command[:100], "error": error_output[:200], }) for attempt in range(1, max_attempts + 1): await self._emit(task_id, session_id, "self_repair_attempt", { "attempt": attempt, "max": max_attempts, }) if ai_router: fix_prompt = [ {"role": "system", "content": "You are an expert DevOps engineer. Analyze the error and provide the EXACT fixed command or commands. Return ONLY the command(s), one per line. No explanations."}, {"role": "user", "content": f"Original command: {command}\n\nError output:\n{error_output[:1000]}\n\nProvide the corrected command(s):"}, ] try: fixed_cmd = await ai_router.complete(fix_prompt, temperature=0.1, max_tokens=300) fixed_cmd = fixed_cmd.strip().strip("`").strip() # Remove markdown code blocks if fixed_cmd.startswith("```"): lines = fixed_cmd.split("\n") fixed_cmd = "\n".join(lines[1:-1]) if len(lines) > 2 else fixed_cmd except Exception: fixed_cmd = command await self._emit(task_id, session_id, "self_repair_fix", { "attempt": attempt, "fixed_command": fixed_cmd[:200], }) result = await self.execute(fixed_cmd, session_id=session_id, task_id=task_id) if result["success"]: await self._emit(task_id, session_id, "self_repair_success", { "attempt": attempt, "fixed_command": fixed_cmd[:100], }) return { "success": True, "fixed_command": fixed_cmd, "result": result, "attempts": attempt, } error_output = result["output"] else: break await self._emit(task_id, session_id, "self_repair_failed", { "max_attempts": max_attempts, "last_error": error_output[:200], }) return { "success": False, "error": "Self-repair exhausted all attempts", "last_error": error_output, "attempts": max_attempts, } # ─── Emit Helper ───────────────────────────────────────────────────────── async def _emit(self, task_id: str, session_id: str, event: str, data: Dict): if not self.ws: return try: if task_id: await self.ws.emit(task_id, event, data, session_id=session_id) if session_id: await self.ws.emit_chat(session_id, event, data) except Exception: pass