God Agent AI
feat: v4.0 Real Execution Engine - TerminalEngine, FilesystemEngine, GitHubEngine, SelfRepairEngine, DeploymentEngine, AutonomousAgent
e85db04 | """ | |
| AutonomousAgent β Devin/Manus-style Execution-First Agent v4.0 | |
| NOT a chatbot. NOT a prompt wrapper. | |
| A REAL autonomous coding operator that: | |
| 1. Plans task graph (DAG) | |
| 2. Executes via real terminal + filesystem | |
| 3. Self-repairs on errors (up to 3 retries) | |
| 4. Commits to GitHub | |
| 5. Deploys to Vercel/HuggingFace | |
| 6. Verifies deployment | |
| 7. Returns live URLs + repo links | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import time | |
| import uuid | |
| from typing import Any, Dict, List, Optional | |
| import structlog | |
| from .base_agent import BaseAgent | |
| from tools.real_executor import RealToolRouter, get_tool_router | |
| log = structlog.get_logger() | |
| WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace") | |
| GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "") | |
| AUTONOMOUS_SYSTEM = """You are GOD AGENT β an elite autonomous software engineer like Devin + Manus combined. | |
| You EXECUTE, not just advise. | |
| You CODE, not just describe. | |
| You DEPLOY, not just plan. | |
| YOUR MISSION CONTROL PROCESS: | |
| 1. Analyze the goal deeply | |
| 2. Create a concrete execution plan (task graph) | |
| 3. Execute each step using real tools | |
| 4. Self-repair on failures | |
| 5. Verify results | |
| 6. Report live URLs | |
| AVAILABLE TOOLS (use tool_call JSON format): | |
| - terminal.run: {"command": "bash command"} | |
| - terminal.sequence: {"commands": ["cmd1", "cmd2"]} | |
| - terminal.run_with_repair: {"command": "cmd", "related_files": ["file.py"]} | |
| - fs.read: {"path": "filename"} | |
| - fs.write: {"path": "filename", "content": "..."} | |
| - fs.patch: {"path": "file", "old": "old text", "new": "new text"} | |
| - fs.tree: {} | |
| - fs.list: {"path": "dir"} | |
| - fs.search: {"query": "text", "pattern": "*.py"} | |
| - github.clone: {"url": "https://github.com/..."} | |
| - github.create_repo: {"name": "repo-name", "description": "..."} | |
| - github.commit_push: {"repo_path": "/path", "message": "feat: ...", "branch": "main"} | |
| - github.create_pr: {"owner": "user", "repo": "name", "title": "...", "body": "...", "head": "branch"} | |
| - deploy.vercel: {"dir": "/path", "name": "project"} | |
| - deploy.hf: {"repo_path": "/path", "space_name": "user/space"} | |
| - workspace.info: {} | |
| RESPOND IN THIS FORMAT: | |
| { | |
| "thinking": "brief analysis", | |
| "plan": ["step 1", "step 2", ...], | |
| "tool_call": {"tool": "tool.name", "params": {...}}, | |
| "result_summary": "what was accomplished" | |
| } | |
| For multi-step tasks, respond with one tool_call at a time. | |
| After each tool result, continue with the next step. | |
| """ | |
| class AutonomousAgent(BaseAgent): | |
| """ | |
| Execution-first autonomous agent. | |
| Uses real tool router for actual file/terminal/github operations. | |
| """ | |
| def __init__(self, ws_manager=None, ai_router=None): | |
| super().__init__("AutonomousAgent", ws_manager, ai_router) | |
| self.tool_router: Optional[RealToolRouter] = None | |
| def _get_router(self) -> RealToolRouter: | |
| if not self.tool_router: | |
| self.tool_router = get_tool_router( | |
| ws_manager=self.ws, | |
| ai_router=self.ai_router, | |
| ) | |
| return self.tool_router | |
| async def run(self, task: str, context: Dict = {}, **kwargs) -> str: | |
| session_id = kwargs.get("session_id", "") | |
| task_id = kwargs.get("task_id", "") | |
| max_steps = kwargs.get("max_steps", 20) | |
| await self.emit(task_id, "autonomous_start", { | |
| "agent": "AutonomousAgent", | |
| "task": task[:100], | |
| "max_steps": max_steps, | |
| }, session_id) | |
| router = self._get_router() | |
| results_history = [] | |
| all_artifacts = [] | |
| # Initial planning pass | |
| plan = await self._create_execution_plan(task, context, task_id=task_id, session_id=session_id) | |
| await self.emit(task_id, "plan_ready", { | |
| "agent": "AutonomousAgent", | |
| "plan": plan, | |
| "steps": len(plan), | |
| }, session_id) | |
| step_num = 0 | |
| conversation = [] | |
| # Build initial messages | |
| conversation.append({ | |
| "role": "system", | |
| "content": AUTONOMOUS_SYSTEM, | |
| }) | |
| conversation.append({ | |
| "role": "user", | |
| "content": ( | |
| f"GOAL: {task}\n\n" | |
| f"EXECUTION PLAN:\n" + "\n".join(f"{i+1}. {s}" for i, s in enumerate(plan)) + "\n\n" | |
| f"Context: {json.dumps(context)[:500]}\n\n" | |
| f"Workspace: {WORKSPACE}\n\n" | |
| f"Start execution. Respond with the first tool_call JSON." | |
| ), | |
| }) | |
| while step_num < max_steps: | |
| step_num += 1 | |
| await self.emit(task_id, "step_start", { | |
| "step": step_num, | |
| "max_steps": max_steps, | |
| }, session_id) | |
| # Get next action from LLM | |
| raw_response = await self.llm( | |
| conversation, | |
| task_id=task_id, | |
| session_id=session_id, | |
| temperature=0.1, | |
| max_tokens=4096, | |
| ) | |
| # Parse tool call | |
| tool_call = self._parse_tool_call(raw_response) | |
| if not tool_call: | |
| # No more tool calls β task complete | |
| await self.emit(task_id, "autonomous_complete", { | |
| "steps": step_num, | |
| "artifacts": all_artifacts, | |
| "summary": raw_response[:500], | |
| }, session_id) | |
| results_history.append({"step": step_num, "result": raw_response}) | |
| break | |
| tool = tool_call.get("tool", "") | |
| params = tool_call.get("params", {}) | |
| thinking = tool_call.get("thinking", "") | |
| if thinking: | |
| await self.emit(task_id, "agent_thinking", { | |
| "thought": thinking[:200], | |
| "step": step_num, | |
| }, session_id) | |
| await self.emit(task_id, "tool_calling", { | |
| "tool": tool, | |
| "step": step_num, | |
| "params_preview": str(params)[:100], | |
| }, session_id) | |
| # Execute the tool | |
| tool_result = await router.route( | |
| tool=tool, | |
| params=params, | |
| session_id=session_id, | |
| task_id=task_id, | |
| ) | |
| # Track artifacts | |
| if tool == "deploy.vercel" and tool_result.get("url"): | |
| all_artifacts.append({ | |
| "type": "deployment", | |
| "platform": "vercel", | |
| "url": tool_result["url"], | |
| }) | |
| elif tool == "deploy.hf" and tool_result.get("url"): | |
| all_artifacts.append({ | |
| "type": "deployment", | |
| "platform": "huggingface", | |
| "url": tool_result["url"], | |
| }) | |
| elif tool == "github.create_repo" and tool_result.get("url"): | |
| all_artifacts.append({ | |
| "type": "repository", | |
| "url": tool_result["url"], | |
| "name": tool_result.get("name", ""), | |
| }) | |
| elif tool == "fs.write" and tool_result.get("success"): | |
| all_artifacts.append({ | |
| "type": "file", | |
| "path": tool_result.get("path", ""), | |
| "lines": tool_result.get("lines", 0), | |
| }) | |
| results_history.append({ | |
| "step": step_num, | |
| "tool": tool, | |
| "params": params, | |
| "result": tool_result, | |
| }) | |
| # Add result to conversation | |
| conversation.append({ | |
| "role": "assistant", | |
| "content": raw_response, | |
| }) | |
| conversation.append({ | |
| "role": "user", | |
| "content": ( | |
| f"Tool result for {tool}:\n" | |
| f"Success: {tool_result.get('success', 'N/A')}\n" | |
| f"Output: {json.dumps(tool_result)[:1500]}\n\n" | |
| f"Artifacts so far: {json.dumps(all_artifacts)[:500]}\n\n" | |
| + ("Continue with next step. Respond with next tool_call JSON, or if DONE respond with a final summary (no tool_call)." if step_num < max_steps else "Provide final summary.") | |
| ), | |
| }) | |
| # Keep conversation manageable | |
| if len(conversation) > 30: | |
| # Keep system + first user + last 20 | |
| conversation = conversation[:2] + conversation[-20:] | |
| # Build final output | |
| return self._build_final_output(task, results_history, all_artifacts, step_num) | |
| async def _create_execution_plan( | |
| self, | |
| task: str, | |
| context: Dict, | |
| task_id: str = "", | |
| session_id: str = "", | |
| ) -> List[str]: | |
| """Generate a concrete execution plan.""" | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": ( | |
| "You are an expert software architect. Create a concrete execution plan.\n" | |
| "Return ONLY a JSON array of strings, no explanation.\n" | |
| "Each step must be a concrete ACTION (not vague).\n" | |
| 'Example: ["Create project directory", "Write main.py with FastAPI routes", "Install dependencies", "Run tests", "Deploy to Vercel"]\n' | |
| "Max 10 steps." | |
| ), | |
| }, | |
| { | |
| "role": "user", | |
| "content": f"Task: {task}\nContext: {json.dumps(context)[:300]}", | |
| }, | |
| ] | |
| raw = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=500) | |
| try: | |
| start = raw.find("[") | |
| end = raw.rfind("]") + 1 | |
| if start >= 0 and end > start: | |
| plan = json.loads(raw[start:end]) | |
| if isinstance(plan, list): | |
| return [str(s) for s in plan[:10]] | |
| except Exception: | |
| pass | |
| # Fallback plan | |
| return [ | |
| "Analyze requirements", | |
| "Set up project structure", | |
| "Write core implementation", | |
| "Add error handling", | |
| "Test functionality", | |
| "Package and deploy", | |
| ] | |
| def _parse_tool_call(self, raw: str) -> Optional[Dict]: | |
| """Parse tool_call JSON from LLM response.""" | |
| # Try to find JSON block | |
| patterns = [ | |
| r'```json\s*(\{.*?\})\s*```', | |
| r'```\s*(\{.*?\})\s*```', | |
| r'(\{[^{}]*"tool_call"[^{}]*\{.*?\}.*?\})', | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, raw, re.DOTALL) | |
| if match: | |
| try: | |
| data = json.loads(match.group(1)) | |
| if "tool_call" in data: | |
| tc = data["tool_call"] | |
| tc["thinking"] = data.get("thinking", "") | |
| return tc | |
| return data | |
| except Exception: | |
| pass | |
| # Try direct JSON parse | |
| try: | |
| start = raw.find("{") | |
| end = raw.rfind("}") + 1 | |
| if start >= 0 and end > start: | |
| data = json.loads(raw[start:end]) | |
| if "tool_call" in data: | |
| tc = data["tool_call"] | |
| tc["thinking"] = data.get("thinking", "") | |
| return tc | |
| if "tool" in data: | |
| return data | |
| except Exception: | |
| pass | |
| return None | |
| def _build_final_output( | |
| self, | |
| task: str, | |
| results: List[Dict], | |
| artifacts: List[Dict], | |
| steps: int, | |
| ) -> str: | |
| """Build a comprehensive final output.""" | |
| lines = [f"## β Task Complete: {task[:80]}\n"] | |
| lines.append(f"**Steps executed:** {steps}") | |
| if artifacts: | |
| lines.append("\n### π― Artifacts\n") | |
| for a in artifacts: | |
| if a["type"] == "deployment": | |
| lines.append(f"- π **{a['platform'].title()} Deploy:** [{a['url']}]({a['url']})") | |
| elif a["type"] == "repository": | |
| lines.append(f"- π¦ **GitHub Repo:** [{a.get('name', a['url'])}]({a['url']})") | |
| elif a["type"] == "file": | |
| lines.append(f"- π **File:** `{a['path']}` ({a.get('lines', 0)} lines)") | |
| # Show key steps | |
| lines.append("\n### π Execution Log\n") | |
| for r in results[-8:]: | |
| tool = r.get("tool", "thinking") | |
| result = r.get("result", {}) | |
| success = result.get("success", True) if isinstance(result, dict) else True | |
| icon = "β " if success else "β" | |
| lines.append(f"{icon} Step {r['step']}: `{tool}`") | |
| return "\n".join(lines) | |