| """ |
| GitAgent v7 — Autonomous Git operations, PR creation, code review |
| Full GitHub integration like Manus/Genspark autonomous coding |
| """ |
| import asyncio |
| import json |
| import os |
| import re |
| from typing import Dict, List |
| import structlog |
| from .base_agent import BaseAgent |
|
|
# Module-level structlog logger for this file.
| log = structlog.get_logger() |
|
|
# System prompt sent as the "system" message by GitAgent._code_review and
# GitAgent._git_ai_task. The text is runtime data: edit it only to change
# the model's instructions.
| GIT_SYSTEM = """You are an elite autonomous Git and GitHub operations agent. |
| You can clone, commit, push, pull, create branches, PRs, review code, |
| generate commit messages, manage workflows, and handle merge conflicts. |
| Always write clear, conventional commit messages (feat/fix/chore/docs/refactor). |
| """ |
|
|
|
|
class GitAgent(BaseAgent):
    """Agent that routes a free-text task to a Git / GitHub operation.

    ``run`` inspects keywords in the task and dispatches to cloning,
    commit-and-push, code review, or a generic LLM answer. Git commands are
    executed through ``_run_cmd`` inside ``self.workspace`` (taken from the
    ``WORKSPACE_DIR`` environment variable, default ``/tmp/god_workspace``).
    """

    # Commit message used when the LLM returns nothing usable.
    _DEFAULT_COMMIT_MSG = "chore: update files"
    # Captures (owner, repo) from a GitHub URL embedded in free text.
    _GITHUB_URL_RE = re.compile(r"https?://github\.com/([\w.-]+)/([\w.-]+)")
    # "pr" as a standalone word, so "open a PR" and "PR: fix" both match
    # (the task is lower-cased before matching).
    _PR_WORD_RE = re.compile(r"\bpr\b")

    def __init__(self, ws_manager=None, ai_router=None):
        super().__init__("GitAgent", ws_manager, ai_router)
        self.workspace = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace")

    async def run(self, task: str, context: Dict = None, **kwargs) -> str:
        """Dispatch ``task`` to the matching Git operation and return its report.

        Args:
            task: Free-text instruction; keywords select the operation.
            context: Optional settings (``repo_path``, ``branch``, ...).
                ``None`` is treated as an empty dict.
            **kwargs: ``session_id`` and ``task_id`` are forwarded to events
                and LLM calls; both default to ``""``.

        Returns:
            A human-readable (markdown) result string.
        """
        # A ``{}`` default would be shared across calls; build a fresh one.
        context = {} if context is None else context
        session_id = kwargs.get("session_id", "")
        task_id = kwargs.get("task_id", "")
        await self.emit(task_id, "agent_start", {"agent": "GitAgent", "task": task[:80]}, session_id)

        t = task.lower()
        if any(k in t for k in ("clone", "checkout")):
            return await self._clone_repo(task, context, task_id, session_id)
        if any(k in t for k in ("commit", "push", "pull request")) or self._PR_WORD_RE.search(t):
            return await self._commit_and_push(task, context, task_id, session_id)
        if any(k in t for k in ("review", "analyze code", "audit")):
            return await self._code_review(task, context, task_id, session_id)
        return await self._git_ai_task(task, context, task_id, session_id)

    async def _clone_repo(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Clone the first GitHub repository URL mentioned in ``task``.

        The repository is cloned into ``<workspace>/<repo_name>``. When
        ``GITHUB_TOKEN`` is set it is embedded in the clone URL.

        Returns:
            A success or failure report; command output is truncated to 500
            characters.
        """
        match = self._GITHUB_URL_RE.search(task)
        if not match:
            return "❌ No GitHub URL found in task."
        owner = match.group(1)
        # Drop sentence punctuation glued to the URL, then only a *trailing*
        # ".git" (a plain replace would mangle names like "user.github.io").
        repo_name = match.group(2).rstrip(".")
        if repo_name.endswith(".git"):
            repo_name = repo_name[: -len(".git")]
        if repo_name in ("", ".", ".."):
            return "❌ No GitHub URL found in task."

        dest = os.path.join(self.workspace, repo_name)
        await self.emit(task_id, "tool_called", {"agent": "GitAgent", "tool": "git_clone", "step": f"Cloning {repo_name}"}, session_id)

        # The subprocess uses the workspace as its cwd, so it must exist.
        try:
            os.makedirs(self.workspace, exist_ok=True)
        except OSError as e:
            return f"❌ Clone failed:\n```\n{str(e)[:500]}\n```"

        # Rebuild the URL from owner/repo so extra path segments
        # (e.g. "/tree/main") do not break the clone.
        token = os.environ.get("GITHUB_TOKEN", "")
        host = f"{token}@github.com" if token else "github.com"
        # NOTE(security): git stores this URL, token included, in the clone's
        # .git/config. Later pushes rely on that, so it is kept as-is.
        auth_url = f"https://{host}/{owner}/{repo_name}.git"

        r = await self._run_cmd(["git", "clone", "--", auth_url, dest])
        if r["returncode"] == 0:
            await self.emit(task_id, "git_cloned", {"repo": repo_name, "path": dest}, session_id)
            return f"✅ **Cloned** `{repo_name}` → `{dest}`\n```\n{r['stdout'][:500]}\n```"
        return f"❌ Clone failed:\n```\n{r['stderr'][:500]}\n```"

    async def _commit_and_push(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Stage everything, commit with an LLM-written message, and push.

        Reads ``repo_path`` (default: workspace) and ``branch`` (default:
        ``"main"``) from ``context``. All three git steps are attempted, as
        before, but the report now reflects their exit codes.

        Returns:
            A report containing the command transcript, prefixed with ✅ when
            every step succeeded and ❌ (naming the failed steps) otherwise.
        """
        repo_path = context.get("repo_path", self.workspace)
        await self.emit(task_id, "tool_called", {"agent": "GitAgent", "tool": "git_commit", "step": "Committing"}, session_id)
        msgs = [
            {"role": "system", "content": "Generate a conventional commit message. Return ONLY the one-line message."},
            # default=str: the context is only prompt material, so stringify
            # non-JSON values instead of raising TypeError.
            {"role": "user", "content": f"Task: {task}\nContext: {json.dumps(context, default=str)[:200]}"},
        ]
        raw_msg = await self.llm(msgs, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=80)
        msg_lines = (raw_msg or "").strip().splitlines()
        # Keep the first line only and remove quoting the model may wrap it in.
        commit_msg = msg_lines[0].strip().strip("`\"'")[:100].strip() if msg_lines else ""
        # git aborts on an empty message, so fall back to a generic one.
        commit_msg = commit_msg or self._DEFAULT_COMMIT_MSG

        branch = context.get("branch", "main")
        add_cmd = ["git", "add", "-A"]
        commit_cmd = ["git", "commit", "-m", commit_msg]
        steps = [
            ("git add", " ".join(add_cmd), add_cmd),
            ("git commit", " ".join(commit_cmd), commit_cmd),
            ("git push", "git push", ["git", "push", "origin", branch]),
        ]

        results = []
        failed = []
        for name, label, cmd in steps:
            r = await self._run_cmd(cmd, cwd=repo_path)
            results.append(f"$ {label}\n{r['stdout']}{r['stderr']}")
            if r["returncode"] != 0:
                failed.append(name)

        transcript = "\n".join(results)
        if failed:
            return f"❌ **Commit/push failed** ({', '.join(failed)}):\n\n```\n{transcript}\n```"
        return f"✅ **Committed:** `{commit_msg}`\n\n```\n" + transcript + "\n```"

    async def _code_review(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Ask the LLM to review the most recent changes in the repository.

        Uses ``git diff HEAD~1`` in ``context["repo_path"]`` (default:
        workspace); only the first 2500 characters of the diff are sent.

        Returns:
            The LLM's review, or a short message when no diff is available.
        """
        repo_path = context.get("repo_path", self.workspace)
        diff = await self._run_cmd(["git", "diff", "HEAD~1"], cwd=repo_path)
        if diff["returncode"] != 0:
            # HEAD~1 does not exist when the repo has a single commit; fall
            # back to the patch of HEAD itself.
            diff = await self._run_cmd(["git", "show", "--format=", "HEAD"], cwd=repo_path)
        if diff["returncode"] != 0:
            return f"❌ Could not read diff:\n```\n{diff['stderr'][:500]}\n```"
        if not diff["stdout"].strip():
            # Reviewing an empty diff would only invite a made-up review.
            return "ℹ️ No changes found to review."

        msgs = [
            {"role": "system", "content": GIT_SYSTEM},
            {"role": "user", "content": f"Task: {task}\n\nDiff:\n{diff['stdout'][:2500]}\n\nProvide code review: summary, issues, suggestions, score (1-10)."},
        ]
        return await self.llm(msgs, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=4096)

    async def _git_ai_task(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Answer a Git-related task with the LLM only; no commands are run."""
        msgs = [
            {"role": "system", "content": GIT_SYSTEM},
            # default=str keeps non-JSON context values from raising.
            {"role": "user", "content": f"Task: {task}\nWorkspace: {self.workspace}\nContext: {json.dumps(context, default=str)[:300]}"},
        ]
        return await self.llm(msgs, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=4096)

    async def create_github_pr(self, repo_owner: str, repo_name: str, title: str, body: str,
                               head_branch: str, base_branch: str = "main",
                               task_id: str = "", session_id: str = "") -> Dict:
        """Create a pull request through the GitHub REST API.

        Requires the ``GITHUB_TOKEN`` environment variable.

        Returns:
            ``{"success": True, "url": ...}`` when GitHub answers 201,
            otherwise ``{"error": ...}`` (with ``"status"`` when an HTTP
            response was received). Network failures are reported in the
            same dict form instead of being raised.
        """
        import httpx

        token = os.environ.get("GITHUB_TOKEN", "")
        if not token:
            return {"error": "GITHUB_TOKEN not set"}
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await client.post(
                    f"https://api.github.com/repos/{repo_owner}/{repo_name}/pulls",
                    headers={"Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json"},
                    json={"title": title, "body": body, "head": head_branch, "base": base_branch},
                )
        except httpx.HTTPError as e:
            return {"error": f"Request failed: {e}"}

        # Gateways may answer with HTML or an empty body on errors.
        try:
            data = resp.json()
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        if resp.status_code == 201:
            await self.emit(task_id, "pr_created", {"url": data.get("html_url")}, session_id)
            return {"success": True, "url": data.get("html_url")}
        return {"error": data.get("message", "Failed"), "status": resp.status_code}

    @staticmethod
    def _redact(text: str) -> str:
        """Return ``text`` with the ``GITHUB_TOKEN`` value masked, if set."""
        token = os.environ.get("GITHUB_TOKEN", "")
        return text.replace(token, "***") if token else text

    async def _run_cmd(self, cmd: List[str], cwd: str = None, timeout: int = 60) -> Dict:
        """Run ``cmd`` without a shell and capture its output.

        Args:
            cmd: Program and arguments.
            cwd: Working directory; defaults to the workspace.
            timeout: Seconds to wait before the process is killed.

        Returns:
            ``{"returncode", "stdout", "stderr"}``. Failures to start the
            process and timeouts are reported with ``returncode`` -1 and the
            reason in ``stderr``; nothing is raised. The ``GITHUB_TOKEN``
            value is masked in both streams.
        """
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd, cwd=cwd or self.workspace,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
                # Make git fail fast instead of waiting on a credential prompt.
                env={**os.environ, "GIT_TERMINAL_PROMPT": "0"},
            )
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # wait_for only cancels communicate(); the child keeps running
            # unless it is killed explicitly.
            await self._kill(proc)
            return {"returncode": -1, "stdout": "", "stderr": f"Command timed out after {timeout}s"}
        except Exception as e:
            await self._kill(proc)
            # Log only the program and sub-command; later args may hold secrets.
            log.warning("git_cmd_error", cmd=cmd[:2], error=self._redact(str(e)))
            return {"returncode": -1, "stdout": "", "stderr": self._redact(str(e))}
        return {"returncode": proc.returncode,
                "stdout": self._redact(stdout.decode("utf-8", errors="replace")),
                "stderr": self._redact(stderr.decode("utf-8", errors="replace"))}

    @staticmethod
    async def _kill(proc) -> None:
        """Kill and reap ``proc`` if it was started and is still running."""
        if proc is None or proc.returncode is not None:
            return
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the check and the kill.
            pass
        await proc.wait()
|
|