"""
AutonomousAgent — Devin/Manus-style Execution-First Agent v4.0

NOT a chatbot. NOT a prompt wrapper.
A REAL autonomous coding operator that:
1. Plans task graph (DAG)
2. Executes via real terminal + filesystem
3. Self-repairs on errors (up to 3 retries)
4. Commits to GitHub
5. Deploys to Vercel/HuggingFace
6. Verifies deployment
7. Returns live URLs + repo links
"""

import asyncio
import json
import os
import re
import time
import uuid
from typing import Any, Dict, Iterator, List, Optional

import structlog

from .base_agent import BaseAgent
from tools.real_executor import RealToolRouter, get_tool_router

log = structlog.get_logger()

WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace")
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")

AUTONOMOUS_SYSTEM = """You are GOD AGENT — an elite autonomous software engineer like Devin + Manus combined.

You EXECUTE, not just advise. You CODE, not just describe. You DEPLOY, not just plan.

YOUR MISSION CONTROL PROCESS:
1. Analyze the goal deeply
2. Create a concrete execution plan (task graph)
3. Execute each step using real tools
4. Self-repair on failures
5. Verify results
6. Report live URLs

AVAILABLE TOOLS (use tool_call JSON format):
- terminal.run: {"command": "bash command"}
- terminal.sequence: {"commands": ["cmd1", "cmd2"]}
- terminal.run_with_repair: {"command": "cmd", "related_files": ["file.py"]}
- fs.read: {"path": "filename"}
- fs.write: {"path": "filename", "content": "..."}
- fs.patch: {"path": "file", "old": "old text", "new": "new text"}
- fs.tree: {}
- fs.list: {"path": "dir"}
- fs.search: {"query": "text", "pattern": "*.py"}
- github.clone: {"url": "https://github.com/..."}
- github.create_repo: {"name": "repo-name", "description": "..."}
- github.commit_push: {"repo_path": "/path", "message": "feat: ...", "branch": "main"}
- github.create_pr: {"owner": "user", "repo": "name", "title": "...", "body": "...", "head": "branch"}
- deploy.vercel: {"dir": "/path", "name": "project"}
- deploy.hf: {"repo_path": "/path", "space_name": "user/space"}
- workspace.info: {}

RESPOND IN THIS FORMAT:
{
  "thinking": "brief analysis",
  "plan": ["step 1", "step 2", ...],
  "tool_call": {"tool": "tool.name", "params": {...}},
  "result_summary": "what was accomplished"
}

For multi-step tasks, respond with one tool_call at a time.
After each tool result, continue with the next step.
"""


class AutonomousAgent(BaseAgent):
    """
    Execution-first autonomous agent.

    Drives an LLM in a plan -> tool call -> tool result loop, dispatching every
    tool call through a ``RealToolRouter`` and collecting the artifacts
    (deployments, repositories, written files) the tools report.

    NOTE(review): ``self.emit``, ``self.llm``, ``self.ws`` and
    ``self.ai_router`` are not defined in this file; they are presumably
    provided by ``BaseAgent`` — confirm against that class.
    """

    # Used when the LLM does not return a usable plan.
    _FALLBACK_PLAN = (
        "Analyze requirements",
        "Set up project structure",
        "Write core implementation",
        "Add error handling",
        "Test functionality",
        "Package and deploy",
    )

    # Deployment tool name -> platform label recorded on the artifact.
    _DEPLOY_PLATFORMS = {
        "deploy.vercel": "vercel",
        "deploy.hf": "huggingface",
    }

    # Tried in order: fenced ```json block, any fenced block, then an inline
    # object that mentions "tool_call". Compiled once instead of per call.
    _TOOL_CALL_PATTERNS = (
        re.compile(r'```json\s*(\{.*?\})\s*```', re.DOTALL),
        re.compile(r'```\s*(\{.*?\})\s*```', re.DOTALL),
        re.compile(r'(\{[^{}]*"tool_call"[^{}]*\{.*?\}.*?\})', re.DOTALL),
    )

    def __init__(self, ws_manager=None, ai_router=None):
        """Register the agent under the name "AutonomousAgent"; the tool router is created lazily."""
        super().__init__("AutonomousAgent", ws_manager, ai_router)
        self.tool_router: Optional[RealToolRouter] = None

    def _get_router(self) -> RealToolRouter:
        """Return the tool router, creating it on first use."""
        if not self.tool_router:
            self.tool_router = get_tool_router(
                ws_manager=self.ws,
                ai_router=self.ai_router,
            )
        return self.tool_router

    async def run(self, task: str, context: Optional[Dict] = None, **kwargs) -> str:
        """
        Plan and execute ``task`` step by step, returning a Markdown report.

        Args:
            task: Natural-language goal.
            context: Optional extra information embedded (truncated) in the
                prompts. Defaults to an empty dict.
            **kwargs: ``session_id`` and ``task_id`` (passed to every emitted
                event and LLM call, default ``""``) and ``max_steps`` (upper
                bound on loop iterations, default 20).

        Returns:
            The report built by ``_build_final_output``.

        The loop ends when the LLM answers without a tool call or when
        ``max_steps`` is reached; an ``autonomous_complete`` event is emitted
        in both cases.
        """
        # A fresh dict per call: a ``{}`` default would be shared across calls.
        context = {} if context is None else context
        session_id = kwargs.get("session_id", "")
        task_id = kwargs.get("task_id", "")
        max_steps = kwargs.get("max_steps", 20)

        await self.emit(task_id, "autonomous_start", {
            "agent": "AutonomousAgent",
            "task": task[:100],
            "max_steps": max_steps,
        }, session_id)

        router = self._get_router()
        results_history: List[Dict] = []
        all_artifacts: List[Dict] = []

        # Initial planning pass
        plan = await self._create_execution_plan(
            task, context, task_id=task_id, session_id=session_id
        )
        await self.emit(task_id, "plan_ready", {
            "agent": "AutonomousAgent",
            "plan": plan,
            "steps": len(plan),
        }, session_id)

        plan_text = "\n".join(f"{i + 1}. {s}" for i, s in enumerate(plan))
        conversation = [
            {"role": "system", "content": AUTONOMOUS_SYSTEM},
            {
                "role": "user",
                "content": (
                    f"GOAL: {task}\n\n"
                    f"EXECUTION PLAN:\n{plan_text}\n\n"
                    f"Context: {self._json_preview(context, 500)}\n\n"
                    f"Workspace: {WORKSPACE}\n\n"
                    f"Start execution. Respond with the first tool_call JSON."
                ),
            },
        ]

        step_num = 0
        completed = False
        while step_num < max_steps:
            step_num += 1
            await self.emit(task_id, "step_start", {
                "step": step_num,
                "max_steps": max_steps,
            }, session_id)

            # Get next action from LLM
            raw_response = await self.llm(
                conversation,
                task_id=task_id,
                session_id=session_id,
                temperature=0.1,
                max_tokens=4096,
            )
            # The response is sliced and regex-searched below; both need a str.
            if not isinstance(raw_response, str):
                raw_response = "" if raw_response is None else str(raw_response)

            tool_call = self._parse_tool_call(raw_response)
            if not tool_call:
                # No more tool calls — task complete
                await self.emit(task_id, "autonomous_complete", {
                    "steps": step_num,
                    "artifacts": all_artifacts,
                    "summary": raw_response[:500],
                }, session_id)
                results_history.append({"step": step_num, "result": raw_response})
                completed = True
                break

            tool = tool_call.get("tool", "")
            # `"params": null` from the LLM must not reach the router as None.
            params = tool_call.get("params") or {}
            thinking = tool_call.get("thinking", "")

            if thinking:
                await self.emit(task_id, "agent_thinking", {
                    "thought": str(thinking)[:200],
                    "step": step_num,
                }, session_id)

            await self.emit(task_id, "tool_calling", {
                "tool": tool,
                "step": step_num,
                "params_preview": str(params)[:100],
            }, session_id)

            # Execute the tool
            tool_result = await router.route(
                tool=tool,
                params=params,
                session_id=session_id,
                task_id=task_id,
            )

            # Track artifacts
            artifact = self._artifact_from_result(tool, tool_result)
            if artifact is not None:
                all_artifacts.append(artifact)

            results_history.append({
                "step": step_num,
                "tool": tool,
                "params": params,
                "result": tool_result,
            })

            if step_num < max_steps:
                next_instruction = (
                    "Continue with next step. Respond with next tool_call JSON, "
                    "or if DONE respond with a final summary (no tool_call)."
                )
            else:
                next_instruction = "Provide final summary."

            # Add result to conversation
            conversation.append({"role": "assistant", "content": raw_response})
            conversation.append({
                "role": "user",
                "content": (
                    f"Tool result for {tool}:\n"
                    f"Success: {tool_result.get('success', 'N/A')}\n"
                    f"Output: {self._json_preview(tool_result, 1500)}\n\n"
                    f"Artifacts so far: {self._json_preview(all_artifacts, 500)}\n\n"
                    + next_instruction
                ),
            })

            # Keep conversation manageable: system + first user + last 20.
            # The list always has even length here, so the tail starts on an
            # assistant message and role alternation is preserved.
            if len(conversation) > 30:
                conversation = conversation[:2] + conversation[-20:]

        if not completed:
            # The step budget ran out before the LLM reported completion.
            # Listeners waiting for "autonomous_complete" would otherwise
            # never be notified that the run has ended.
            log.warning(
                "autonomous_step_limit_reached",
                task_id=task_id,
                max_steps=max_steps,
            )
            await self.emit(task_id, "autonomous_complete", {
                "steps": step_num,
                "artifacts": all_artifacts,
                "summary": (
                    f"Step limit of {max_steps} reached before the agent "
                    "reported completion."
                ),
                "max_steps_reached": True,
            }, session_id)

        # Build final output
        return self._build_final_output(task, results_history, all_artifacts, step_num)

    @staticmethod
    def _json_preview(value: Any, limit: int) -> str:
        """
        Return ``value`` as JSON text truncated to ``limit`` characters.

        The text is only embedded in prompts, so values that are not JSON
        native are deliberately stringified instead of aborting the run.
        """
        try:
            return json.dumps(value, default=str)[:limit]
        except (TypeError, ValueError):
            # e.g. unsupported dict key types or circular references
            return str(value)[:limit]

    @classmethod
    def _artifact_from_result(cls, tool: str, tool_result: Dict) -> Optional[Dict]:
        """Return the artifact record for a tool result, or None if it produced none."""
        url = tool_result.get("url")
        platform = cls._DEPLOY_PLATFORMS.get(tool)
        if platform is not None and url:
            return {"type": "deployment", "platform": platform, "url": url}
        if tool == "github.create_repo" and url:
            return {
                "type": "repository",
                "url": url,
                "name": tool_result.get("name", ""),
            }
        if tool == "fs.write" and tool_result.get("success"):
            return {
                "type": "file",
                "path": tool_result.get("path", ""),
                "lines": tool_result.get("lines", 0),
            }
        return None

    async def _create_execution_plan(
        self,
        task: str,
        context: Dict,
        task_id: str = "",
        session_id: str = "",
    ) -> List[str]:
        """
        Generate a concrete execution plan.

        Asks the LLM for a JSON array of step descriptions and returns at most
        10 of them as strings. Falls back to a generic six-step plan when the
        response contains no parseable, non-empty JSON array.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    "You are an expert software architect. Create a concrete execution plan.\n"
                    "Return ONLY a JSON array of strings, no explanation.\n"
                    "Each step must be a concrete ACTION (not vague).\n"
                    'Example: ["Create project directory", "Write main.py with FastAPI routes", "Install dependencies", "Run tests", "Deploy to Vercel"]\n'
                    "Max 10 steps."
                ),
            },
            {
                "role": "user",
                "content": f"Task: {task}\nContext: {self._json_preview(context, 300)}",
            },
        ]
        raw = await self.llm(
            messages,
            task_id=task_id,
            session_id=session_id,
            temperature=0.2,
            max_tokens=500,
        )

        if isinstance(raw, str):
            # The model may wrap the array in prose; take the outermost [...].
            start = raw.find("[")
            end = raw.rfind("]") + 1
            if start >= 0 and end > start:
                try:
                    plan = json.loads(raw[start:end])
                except ValueError as exc:  # includes json.JSONDecodeError
                    log.warning("plan_parse_failed", task_id=task_id, error=str(exc))
                else:
                    # An empty array is not a usable plan; use the fallback.
                    if isinstance(plan, list) and plan:
                        return [str(s) for s in plan[:10]]

        # Fallback plan
        return list(self._FALLBACK_PLAN)

    def _parse_tool_call(self, raw: str) -> Optional[Dict]:
        """
        Parse tool_call JSON from LLM response.

        Returns the tool call dict (with the response's top-level "thinking"
        copied onto it) or None when the response contains no tool call. None
        is what the caller treats as "task complete", so a JSON object that
        carries neither a "tool_call" nor a "tool" key — e.g. a final summary
        in the documented response format — must not be returned as a call.
        """
        if not isinstance(raw, str) or not raw:
            return None

        for candidate in self._iter_json_candidates(raw):
            try:
                data = json.loads(candidate)
            except ValueError:  # not valid JSON; try the next candidate
                continue
            call = self._extract_tool_call(data)
            if call is not None:
                return call
        return None

    def _iter_json_candidates(self, raw: str) -> Iterator[str]:
        """Yield substrings of ``raw`` that may hold the response JSON, most specific first."""
        for pattern in self._TOOL_CALL_PATTERNS:
            match = pattern.search(raw)
            if match:
                yield match.group(1)

        # Last resort: everything from the first "{" to the last "}".
        start = raw.find("{")
        end = raw.rfind("}") + 1
        if start >= 0 and end > start:
            yield raw[start:end]

    @staticmethod
    def _extract_tool_call(data: Any) -> Optional[Dict]:
        """Return the tool call carried by a parsed response object, if any."""
        if not isinstance(data, dict):
            return None

        call = data.get("tool_call")
        # `"tool_call": null` or `{}` means there is nothing to call.
        if isinstance(call, dict) and call:
            return {**call, "thinking": data.get("thinking", "")}

        # Bare form: {"tool": "...", "params": {...}}
        if "tool" in data:
            return data
        return None

    def _build_final_output(
        self,
        task: str,
        results: List[Dict],
        artifacts: List[Dict],
        steps: int,
    ) -> str:
        """
        Build a comprehensive final output.

        Produces a Markdown report with the truncated task title, the number
        of steps, one bullet per artifact, and a log of the last 8 steps.
        """
        lines = [f"## ✅ Task Complete: {task[:80]}\n"]
        lines.append(f"**Steps executed:** {steps}")

        if artifacts:
            lines.append("\n### 🎯 Artifacts\n")
            for a in artifacts:
                if a["type"] == "deployment":
                    lines.append(f"- 🌐 **{a['platform'].title()} Deploy:** [{a['url']}]({a['url']})")
                elif a["type"] == "repository":
                    lines.append(f"- 📦 **GitHub Repo:** [{a.get('name', a['url'])}]({a['url']})")
                elif a["type"] == "file":
                    lines.append(f"- 📄 **File:** `{a['path']}` ({a.get('lines', 0)} lines)")

        # Show key steps
        lines.append("\n### 📋 Execution Log\n")
        for r in results[-8:]:
            # Entries without a "tool" key are plain LLM answers.
            tool = r.get("tool", "thinking")
            result = r.get("result", {})
            # Non-dict results (the final text answer) count as successful.
            success = result.get("success", True) if isinstance(result, dict) else True
            icon = "✅" if success else "❌"
            lines.append(f"{icon} Step {r['step']}: `{tool}`")

        return "\n".join(lines)