autonomous-coding-system-v5 / tools /terminal_engine.py
PYAE1994's picture
feat: GOD MODE+ v4.0 - tools/terminal_engine.py
87fb933 verified
"""
Terminal Execution Engine β€” Real Command Runner
Streaming logs, timeout control, session management, self-repair loop
"""
import asyncio
import inspect
import os
import pty
import select
import signal
import time
import uuid
from typing import AsyncGenerator, Callable, Dict, List, Optional

import structlog
log = structlog.get_logger()
WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace")
BLOCKED_COMMANDS = [
"rm -rf /",
":(){ :|:& };:",
"mkfs",
"shutdown",
"reboot",
"halt",
"dd if=/dev/zero",
"dd if=/dev/urandom of=/dev",
"> /dev/sda",
"chmod 777 /",
"chown root /",
]
def _is_blocked(cmd: str) -> bool:
c = cmd.strip()
for b in BLOCKED_COMMANDS:
if b in c:
return True
return False
class ProcessSession:
    """State kept for one logical shell session.

    Holds the session's working directory, the environment handed to its
    commands, creation/last-use timestamps and a bounded command history.
    """

    def __init__(self, session_id: str, cwd: str = WORKSPACE):
        self.session_id = session_id
        self.cwd = cwd
        self.history: List[Dict] = []
        # Snapshot of the current process environment with terminal-friendly
        # overrides applied on top.
        self.env = {
            **os.environ,
            "TERM": "xterm-256color",
            "PYTHONUNBUFFERED": "1",
        }
        self.created_at = time.time()
        self.last_used = time.time()

    def record(self, cmd: str, output: str, exit_code: int, duration: float):
        """Append one executed command to the history and refresh ``last_used``.

        The stored output is cut to its first 2000 characters and only the
        100 most recent entries are retained.
        """
        entry = {
            "cmd": cmd,
            "output": output[:2000],
            "exit_code": exit_code,
            "duration": round(duration, 3),
            "timestamp": time.time(),
        }
        self.history.append(entry)
        self.last_used = time.time()

        # Drop the oldest entries once the cap is exceeded.
        overflow = len(self.history) - 100
        if overflow > 0:
            self.history = self.history[overflow:]
class TerminalEngine:
    """
    Real Terminal Execution Engine
    - Async subprocess with streaming output
    - Session management (persistent cwd + env)
    - Timeout + kill control
    - Self-repair loop (run → error → analyze → patch → retry)
    """

    # Number of captured output lines after which the command is killed.
    MAX_OUTPUT_LINES = 500
    # Seconds to wait for a line before re-checking whether the process exited.
    READ_POLL_SECONDS = 5.0
    # Seconds to wait for a killed process to be reaped.
    REAP_TIMEOUT_SECONDS = 5.0
    # Characters that make a "cd ..." line a compound shell command.
    _CD_SHELL_META = frozenset(";&|<>`()\n")

    def __init__(self, ws_manager=None):
        """Create an engine.

        Args:
            ws_manager: Optional object exposing awaitable ``emit`` and
                ``emit_chat`` methods; when falsy, no events are emitted.
        """
        self.ws = ws_manager
        self._sessions: Dict[str, ProcessSession] = {}
        self._running_procs: Dict[str, asyncio.subprocess.Process] = {}

    # ─── Session Management ───────────────────────────────────────────────────
    def get_or_create_session(self, session_id: str, cwd: str = WORKSPACE) -> ProcessSession:
        """Return the session for *session_id*, creating it (and *cwd*) if new."""
        sess = self._sessions.get(session_id)
        if sess is None:
            # Create the directory first so a failure does not leave a
            # registered session pointing at a missing directory.
            os.makedirs(cwd, exist_ok=True)
            sess = ProcessSession(session_id, cwd)
            self._sessions[session_id] = sess
        return sess

    def get_session_history(self, session_id: str) -> List[Dict]:
        """Return the recorded history of a session, or ``[]`` if unknown."""
        sess = self._sessions.get(session_id)
        return sess.history if sess else []

    def cleanup_old_sessions(self, max_age: int = 3600):
        """Drop sessions idle for more than *max_age* seconds.

        Sessions that currently have a running process are kept.

        Returns:
            The number of sessions removed.
        """
        now = time.time()
        stale = [
            sid
            for sid, sess in self._sessions.items()
            if now - sess.last_used > max_age and sid not in self._running_procs
        ]
        for sid in stale:
            del self._sessions[sid]
        return len(stale)

    # ─── Process Helpers ──────────────────────────────────────────────────────
    @staticmethod
    def _kill_process_tree(proc) -> None:
        """Send SIGKILL to *proc* and its process group, ignoring races.

        Processes started by ``execute`` run in their own session, so their
        pid is also their process-group id and ``killpg`` reaches children
        spawned by the shell. Falls back to ``proc.kill()`` when the group
        cannot be signalled.
        """
        if proc.returncode is not None:
            return
        try:
            os.killpg(proc.pid, signal.SIGKILL)
        except (AttributeError, OSError):
            try:
                proc.kill()
            except OSError:
                # Already gone.
                pass

    async def _stop_process(self, proc) -> None:
        """Kill *proc* if it is still running and wait briefly to reap it."""
        if proc.returncode is not None:
            return
        self._kill_process_tree(proc)
        try:
            await asyncio.wait_for(proc.wait(), timeout=self.REAP_TIMEOUT_SECONDS)
        except asyncio.TimeoutError:
            log.warning("terminal_reap_timeout", pid=proc.pid)

    # ─── Execute Command ──────────────────────────────────────────────────────
    async def execute(
        self,
        command: str,
        session_id: str = "",
        task_id: str = "",
        cwd: str = "",
        timeout: int = 120,
        stream_callback: Optional[Callable] = None,
        env_override: Optional[Dict] = None,
    ) -> Dict:
        """
        Execute a shell command with real streaming output.

        Args:
            command: Shell command line; rejected if ``_is_blocked`` matches.
            session_id: Session whose env/history is used. When empty, the
                "default" session supplies cwd/env and nothing is recorded.
            task_id: Task channel for emitted events (optional).
            cwd: Working directory; defaults to the session's cwd.
            timeout: Overall wall-clock limit in seconds.
            stream_callback: Called with each decoded output line; may be a
                plain function or a coroutine function.
            env_override: Extra environment variables for this command only.

        Returns: {success, output, exit_code, duration, command, cwd}
            (``cwd`` is absent for blocked commands). ``exit_code`` is -1 on
            timeout or execution error.
        """
        if _is_blocked(command):
            result = {
                "success": False,
                "output": f"❌ BLOCKED: Dangerous command detected: {command[:60]}",
                "exit_code": -1,
                "command": command,
                "duration": 0.0,
            }
            await self._emit(task_id, session_id, "terminal_blocked", result)
            return result

        sess = self.get_or_create_session(session_id or "default", cwd or WORKSPACE)
        work_dir = cwd or sess.cwd
        os.makedirs(work_dir, exist_ok=True)
        env = sess.env.copy()
        if env_override:
            env.update(env_override)

        await self._emit(task_id, session_id, "terminal_exec", {
            "command": command[:300],
            "cwd": work_dir,
            "session_id": session_id,
        })

        start = time.time()
        output_lines: List[str] = []
        proc = None
        exit_code = -1
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
                cwd=work_dir,
                env=env,
                # Own session => own process group, so a kill reaches the
                # shell's children as well as the shell itself.
                start_new_session=True,
            )
            if session_id:
                self._running_procs[session_id] = proc

            # Stream output line by line
            async def read_output():
                while True:
                    try:
                        line = await asyncio.wait_for(
                            proc.stdout.readline(), timeout=self.READ_POLL_SECONDS
                        )
                        if not line:
                            break
                        decoded = line.decode("utf-8", errors="replace").rstrip()
                        output_lines.append(decoded)
                        # Stream via WebSocket
                        await self._emit(task_id, session_id, "terminal_output", {
                            "line": decoded,
                            "command": command[:80],
                        })
                        if stream_callback:
                            # Accept both sync and async callbacks.
                            pending = stream_callback(decoded)
                            if inspect.isawaitable(pending):
                                await pending
                        if len(output_lines) >= self.MAX_OUTPUT_LINES:
                            output_lines.append(
                                f"... [output truncated at {self.MAX_OUTPUT_LINES} lines]"
                            )
                            self._kill_process_tree(proc)
                            break
                    except asyncio.TimeoutError:
                        # No output for a while: stop only if the process has
                        # exited (something else may still hold the pipe open).
                        if proc.returncode is not None:
                            break
                        continue

            await asyncio.wait_for(
                asyncio.gather(read_output(), proc.wait()),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            output_lines.append(f"\n⚠️ Command timed out after {timeout}s")
            exit_code = -1
        except Exception as e:
            output_lines.append(f"\n❌ Execution error: {str(e)}")
            exit_code = -1
        else:
            exit_code = proc.returncode or 0
        finally:
            # Whatever happened, never leave the child running or unreaped.
            if proc is not None:
                await self._stop_process(proc)
            # Only unregister our own process; a newer execute() on the same
            # session may have replaced the entry.
            if session_id and self._running_procs.get(session_id) is proc:
                del self._running_procs[session_id]

        duration = time.time() - start
        full_output = "\n".join(output_lines)
        if session_id:
            sess.record(command, full_output, exit_code, duration)

        result = {
            "success": exit_code == 0,
            "output": full_output,
            "exit_code": exit_code,
            "command": command,
            "duration": round(duration, 3),
            "cwd": work_dir,
        }
        await self._emit(task_id, session_id, "terminal_result", {
            "command": command[:100],
            "exit_code": exit_code,
            "success": exit_code == 0,
            "duration_ms": round(duration * 1000),
            "output_lines": len(output_lines),
        })
        return result

    # ─── Kill Running Process ─────────────────────────────────────────────────
    async def kill(self, session_id: str) -> Dict:
        """Kill the process currently registered for *session_id*.

        Returns:
            ``{"success": True, "action": "killed", "session_id": ...}`` when
            a running process was signalled, otherwise a dict with
            ``success`` False and an ``error`` message.
        """
        proc = self._running_procs.pop(session_id, None)
        if proc is None or proc.returncode is not None:
            return {"success": False, "error": "No running process for session"}
        try:
            # The execute() call that owns the process reaps it.
            self._kill_process_tree(proc)
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True, "action": "killed", "session_id": session_id}

    # ─── Execute Multiple Commands in Sequence ────────────────────────────────
    @classmethod
    def _parse_cd(cls, cmd: str) -> Optional[str]:
        """Return the target of a standalone ``cd <dir>`` command, else None.

        Compound lines such as ``cd foo && ls`` return None so the shell runs
        them as written. A target wrapped in one pair of quotes is taken
        literally; an unquoted target gets ``~`` and ``$VAR`` expansion.
        """
        stripped = cmd.strip()
        if not stripped.startswith("cd "):
            return None
        target = stripped[3:].strip()
        quote = target[:1]
        if (
            quote in ("'", '"')
            and len(target) >= 2
            and target.endswith(quote)
            and quote not in target[1:-1]
        ):
            return target[1:-1]
        if any(ch in cls._CD_SHELL_META for ch in target):
            return None
        bare = target.strip('"').strip("'")
        return os.path.expanduser(os.path.expandvars(bare))

    async def execute_chain(
        self,
        commands: List[str],
        session_id: str = "",
        task_id: str = "",
        cwd: str = "",
        stop_on_error: bool = True,
        timeout: int = 120,
    ) -> Dict:
        """Run a chain of commands, passing cwd between them.

        Standalone ``cd <dir>`` commands are handled in-process so the new
        directory persists for the following commands. A ``cd`` inside a
        compound command only affects that one command.

        Args:
            commands: Command lines to run in order.
            session_id: Session passed through to ``execute``.
            task_id: Task channel passed through to ``execute``.
            cwd: Starting directory; defaults to ``WORKSPACE``.
            stop_on_error: Stop at the first failing command.
            timeout: Per-command timeout in seconds.

        Returns:
            Dict with ``success``, ``commands`` (count requested),
            ``results``, ``combined_output`` (first 5000 chars) and
            ``final_cwd``.
        """
        results = []
        current_cwd = cwd or WORKSPACE
        for cmd in commands:
            # Handle 'cd' specially to persist directory
            target = self._parse_cd(cmd)
            if target is not None:
                new_dir = target
                if not os.path.isabs(new_dir):
                    new_dir = os.path.join(current_cwd, new_dir)
                new_dir = os.path.normpath(new_dir)
                if os.path.isdir(new_dir):
                    current_cwd = new_dir
                    results.append({
                        "command": cmd,
                        "success": True,
                        "output": f"Changed to {current_cwd}",
                        "exit_code": 0,
                    })
                else:
                    results.append({
                        "command": cmd,
                        "success": False,
                        "output": f"Directory not found: {new_dir}",
                        "exit_code": 1,
                    })
                    if stop_on_error:
                        break
                continue

            result = await self.execute(
                cmd,
                session_id=session_id,
                task_id=task_id,
                cwd=current_cwd,
                timeout=timeout,
            )
            results.append(result)
            if not result["success"] and stop_on_error:
                break

        success = all(r.get("success", False) for r in results)
        combined_output = "\n".join(f"$ {r['command']}\n{r['output']}" for r in results)
        return {
            "success": success,
            "commands": len(commands),
            "results": results,
            "combined_output": combined_output[:5000],
            "final_cwd": current_cwd,
        }

    # ─── Self-Repair Loop ─────────────────────────────────────────────────────
    @staticmethod
    def _clean_ai_command(raw: str) -> str:
        """Strip markdown fences/backticks from an AI reply, leaving commands.

        The fence check runs BEFORE stray backticks are stripped; otherwise a
        leading ``` would already be gone and the language tag (e.g. "bash")
        would be executed as a command.
        """
        text = raw.strip()
        if text.startswith("```") and "\n" in text:
            body = text.split("\n")[1:]  # drop opening fence + language tag
            if body and body[-1].strip() == "```":
                body.pop()
            text = "\n".join(body)
        return text.strip().strip("`").strip()

    async def self_repair(
        self,
        command: str,
        error_output: str,
        ai_router=None,
        session_id: str = "",
        task_id: str = "",
        max_attempts: int = 3,
    ) -> Dict:
        """
        Real self-repair loop:
        Run → Error → Analyze → AI suggests fix → Retry

        Args:
            command: The command that failed.
            error_output: Output of the failing run, shown to the AI.
            ai_router: Object with an awaitable
                ``complete(messages, temperature=..., max_tokens=...)`` whose
                result is treated as text; without it no attempt is made.
            session_id: Session passed through to ``execute``.
            task_id: Task channel for emitted events.
            max_attempts: Maximum number of fix attempts.

        Returns:
            On success: ``{success, fixed_command, result, attempts}``.
            On failure: ``{success, error, last_error, attempts}`` where
            ``attempts`` is the number of fixes actually executed.
        """
        await self._emit(task_id, session_id, "self_repair_start", {
            "command": command[:100],
            "error": error_output[:200],
        })

        attempts_made = 0
        if ai_router:
            for attempt in range(1, max_attempts + 1):
                await self._emit(task_id, session_id, "self_repair_attempt", {
                    "attempt": attempt,
                    "max": max_attempts,
                })
                fix_prompt = [
                    {"role": "system", "content": "You are an expert DevOps engineer. Analyze the error and provide the EXACT fixed command or commands. Return ONLY the command(s), one per line. No explanations."},
                    {"role": "user", "content": f"Original command: {command}\n\nError output:\n{error_output[:1000]}\n\nProvide the corrected command(s):"},
                ]
                try:
                    raw = await ai_router.complete(fix_prompt, temperature=0.1, max_tokens=300)
                    fixed_cmd = self._clean_ai_command(raw)
                except Exception as exc:
                    log.warning("self_repair_ai_failed", attempt=attempt, error=str(exc))
                    fixed_cmd = ""
                if not fixed_cmd:
                    # An empty command would "succeed" with exit code 0 and be
                    # reported as a repair; retry the original instead.
                    fixed_cmd = command

                await self._emit(task_id, session_id, "self_repair_fix", {
                    "attempt": attempt,
                    "fixed_command": fixed_cmd[:200],
                })
                attempts_made = attempt
                result = await self.execute(fixed_cmd, session_id=session_id, task_id=task_id)
                if result["success"]:
                    await self._emit(task_id, session_id, "self_repair_success", {
                        "attempt": attempt,
                        "fixed_command": fixed_cmd[:100],
                    })
                    return {
                        "success": True,
                        "fixed_command": fixed_cmd,
                        "result": result,
                        "attempts": attempt,
                    }
                # Feed the newest failure into the next prompt.
                error_output = result["output"]

        await self._emit(task_id, session_id, "self_repair_failed", {
            "max_attempts": max_attempts,
            "last_error": error_output[:200],
        })
        return {
            "success": False,
            "error": (
                "Self-repair exhausted all attempts"
                if ai_router
                else "Self-repair unavailable: no AI router configured"
            ),
            "last_error": error_output,
            "attempts": attempts_made,
        }

    # ─── Emit Helper ─────────────────────────────────────────────────────────
    async def _emit(self, task_id: str, session_id: str, event: str, data: Dict):
        """Best-effort event emission to the task and session channels.

        Failures are logged and never raised; a failure on one channel does
        not prevent delivery on the other.
        """
        if not self.ws:
            return
        if task_id:
            try:
                await self.ws.emit(task_id, event, data, session_id=session_id)
            except Exception as exc:
                # "event" is structlog's own positional name, hence ws_event.
                log.debug("terminal_emit_failed", channel="task", ws_event=event, error=str(exc))
        if session_id:
            try:
                await self.ws.emit_chat(session_id, event, data)
            except Exception as exc:
                log.debug("terminal_emit_failed", channel="chat", ws_event=event, error=str(exc))