# God Agent OS CI
# 🚀 Deploy God Agent OS v11 - 2026-05-17 07:49
# 02117ee
"""
GitAgent v7 — Autonomous Git operations, PR creation, code review
Full GitHub integration like Manus/Genspark autonomous coding
"""
import asyncio
import json
import os
import re
from typing import Dict, List, Optional

import structlog

from .base_agent import BaseAgent
# Module-level structlog logger.
log = structlog.get_logger()
# System prompt handed to the LLM for code-review and free-form Git requests.
GIT_SYSTEM = (
    "You are an elite autonomous Git and GitHub operations agent.\n"
    "You can clone, commit, push, pull, create branches, PRs, review code,\n"
    "generate commit messages, manage workflows, and handle merge conflicts.\n"
    "Always write clear, conventional commit messages (feat/fix/chore/docs/refactor).\n"
)
class GitAgent(BaseAgent):
    """Agent that routes natural-language tasks to Git / GitHub operations.

    ``run`` looks for keywords in the task text and dispatches to one of four
    handlers: clone, commit-and-push, code review, or a free-form LLM answer.
    Git commands are executed as subprocesses inside ``self.workspace`` unless
    the context dict supplies a ``repo_path``.
    """

    def __init__(self, ws_manager=None, ai_router=None):
        """Register the agent as "GitAgent" and resolve the workspace directory.

        The workspace comes from the ``WORKSPACE_DIR`` environment variable and
        falls back to ``/tmp/god_workspace``.
        """
        super().__init__("GitAgent", ws_manager, ai_router)
        self.workspace = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace")

    async def run(self, task: str, context: Optional[Dict] = None, **kwargs) -> str:
        """Dispatch *task* to the matching Git handler and return its report.

        Args:
            task: Free-text instruction; matched case-insensitively on keywords.
            context: Optional settings read by the handlers (``repo_path``,
                ``branch``). ``None`` is treated as an empty dict.
            **kwargs: ``session_id`` and ``task_id`` are forwarded to ``emit``
                and ``llm``; other keys are ignored.

        Returns:
            A Markdown-formatted result string from the selected handler.
        """
        # Build a fresh dict per call: the former ``{}`` default was a single
        # object shared by every invocation, and an explicit ``None`` crashed.
        context = {} if context is None else context
        session_id = kwargs.get("session_id", "")
        task_id = kwargs.get("task_id", "")
        await self.emit(task_id, "agent_start", {"agent": "GitAgent", "task": task[:80]}, session_id)

        # NOTE(review): the checks run in this order, so a task such as
        # "review the pull request" reaches the commit handler first because it
        # contains "pull request" — confirm that is the intended routing.
        lowered = task.lower()
        if any(k in lowered for k in ("clone", "checkout")):
            return await self._clone_repo(task, context, task_id, session_id)
        if any(k in lowered for k in ("commit", "push", "pull request", " pr ")):
            return await self._commit_and_push(task, context, task_id, session_id)
        if any(k in lowered for k in ("review", "analyze code", "audit")):
            return await self._code_review(task, context, task_id, session_id)
        return await self._git_ai_task(task, context, task_id, session_id)

    @staticmethod
    def _scrub_token(text: str) -> str:
        """Return *text* with the ``GITHUB_TOKEN`` value replaced by ``***``."""
        token = os.environ.get("GITHUB_TOKEN", "")
        return text.replace(token, "***") if token else text

    async def _clone_repo(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Clone the first GitHub URL found in *task* into the workspace.

        Returns:
            A success message with the destination path, or a ``❌`` message
            when no URL is present or ``git clone`` fails.
        """
        found = re.findall(r'https?://github\.com/[^\s]+', task)
        if not found:
            return "❌ No GitHub URL found in task."
        # The pattern stops only at whitespace, so sentence punctuation glued
        # to the URL ("... github.com/a/b.") would otherwise become part of it.
        url = found[0].rstrip(".,;:!?)]}>'\"")

        repo_name = url.rstrip("/").split("/")[-1]
        # Strip only a trailing ".git"; a blanket replace would also mangle
        # names that merely contain it (e.g. "my.github.io").
        if repo_name.endswith(".git"):
            repo_name = repo_name[: -len(".git")]
        dest = os.path.join(self.workspace, repo_name)

        await self.emit(
            task_id,
            "tool_called",
            {"agent": "GitAgent", "tool": "git_clone", "step": f"Cloning {repo_name}"},
            session_id,
        )

        # The subprocess runs with cwd=self.workspace, which must exist first.
        try:
            os.makedirs(self.workspace, exist_ok=True)
        except OSError as exc:
            return f"❌ Clone failed:\n```\n{exc}\n```"

        token = os.environ.get("GITHUB_TOKEN", "")
        # The token is embedded in the remote URL and therefore stays in the
        # clone's git config; it is masked in any output by ``_run_cmd``.
        auth_url = url.replace("https://", f"https://{token}@", 1) if token else url
        result = await self._run_cmd(["git", "clone", auth_url, dest])
        if result["returncode"] == 0:
            await self.emit(task_id, "git_cloned", {"repo": repo_name, "path": dest}, session_id)
            return f"✅ **Cloned** `{repo_name}` → `{dest}`\n```\n{result['stdout'][:500]}\n```"
        return f"❌ Clone failed:\n```\n{result['stderr'][:500]}\n```"

    async def _commit_and_push(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Stage everything, commit with an LLM-written message, and push.

        Reads ``repo_path`` (default: workspace) and ``branch`` (default:
        ``main``) from *context*. All three git commands are always run; the
        headline reports success only when every one of them exited with 0.

        Returns:
            A Markdown report containing the commit message and the combined
            stdout/stderr of each command.
        """
        repo_path = context.get("repo_path", self.workspace)
        await self.emit(
            task_id,
            "tool_called",
            {"agent": "GitAgent", "tool": "git_commit", "step": "Committing"},
            session_id,
        )
        msgs = [
            {"role": "system", "content": "Generate a conventional commit message. Return ONLY the one-line message."},
            # default=str: the context only feeds a prompt, so values json
            # cannot encode are stringified instead of raising TypeError.
            {"role": "user", "content": f"Task: {task}\nContext: {json.dumps(context, default=str)[:200]}"},
        ]
        reply = await self.llm(msgs, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=80)
        commit_msg = (reply or "").strip().split("\n")[0][:100]
        if not commit_msg:
            # git aborts on an empty message, so fall back to a generic one.
            commit_msg = "chore: update files"

        branch = context.get("branch", "main")
        steps = [
            ("add", ["git", "add", "-A"]),
            ("commit", ["git", "commit", "-m", commit_msg]),
            ("push", ["git", "push", "origin", branch]),
        ]
        transcript: List[str] = []
        failed: List[str] = []
        for name, cmd in steps:
            outcome = await self._run_cmd(cmd, cwd=repo_path)
            shown = "git push" if name == "push" else " ".join(cmd)
            transcript.append(f"$ {shown}\n{outcome['stdout']}{outcome['stderr']}")
            if outcome["returncode"] != 0:
                failed.append(name)

        log_block = "```\n" + "\n".join(transcript) + "\n```"
        if failed:
            # Previously a success banner was returned even when git failed.
            return f"❌ **Git step(s) failed:** {', '.join(failed)} (message: `{commit_msg}`)\n\n{log_block}"
        return f"✅ **Committed:** `{commit_msg}`\n\n{log_block}"

    async def _code_review(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Ask the LLM to review the diff between ``HEAD~1`` and the working tree.

        The first 2500 characters of ``git diff HEAD~1`` are put in the prompt.
        If the diff command fails, the prompt carries the error text instead,
        so the model is not asked to review a silently empty diff.

        Returns:
            The LLM's review text.
        """
        repo_path = context.get("repo_path", self.workspace)
        diff = await self._run_cmd(["git", "diff", "HEAD~1"], cwd=repo_path)
        if diff["returncode"] == 0:
            diff_text = diff["stdout"][:2500]
        else:
            diff_text = f"(git diff unavailable: {diff['stderr'][:300].strip()})"
        msgs = [
            {"role": "system", "content": GIT_SYSTEM},
            {"role": "user", "content": f"Task: {task}\n\nDiff:\n{diff_text}\n\nProvide code review: summary, issues, suggestions, score (1-10)."},
        ]
        return await self.llm(msgs, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=4096)

    async def _git_ai_task(self, task: str, context: Dict, task_id: str, session_id: str) -> str:
        """Answer a Git task that matched no keyword by asking the LLM directly.

        Returns:
            The LLM's response text.
        """
        msgs = [
            {"role": "system", "content": GIT_SYSTEM},
            # default=str keeps prompt construction from raising on values
            # json cannot encode.
            {"role": "user", "content": f"Task: {task}\nWorkspace: {self.workspace}\nContext: {json.dumps(context, default=str)[:300]}"},
        ]
        return await self.llm(msgs, task_id=task_id, session_id=session_id, temperature=0.3, max_tokens=4096)

    async def create_github_pr(self, repo_owner: str, repo_name: str, title: str, body: str,
                               head_branch: str, base_branch: str = "main",
                               task_id: str = "", session_id: str = "") -> Dict:
        """Open a pull request through the GitHub REST API.

        Args:
            repo_owner: Owner (user or organisation) of the repository.
            repo_name: Repository name.
            title: Pull request title.
            body: Pull request description.
            head_branch: Branch containing the changes.
            base_branch: Branch to merge into.
            task_id: Forwarded to ``emit`` for the ``pr_created`` event.
            session_id: Forwarded to ``emit`` for the ``pr_created`` event.

        Returns:
            ``{"success": True, "url": ...}`` on HTTP 201, otherwise
            ``{"error": ..., "status": ...}``; ``{"error": ...}`` alone when
            ``GITHUB_TOKEN`` is not set.

        Exceptions raised by httpx while sending the request are not caught
        here and propagate to the caller.
        """
        import httpx

        token = os.environ.get("GITHUB_TOKEN", "")
        if not token:
            return {"error": "GITHUB_TOKEN not set"}

        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(
                f"https://api.github.com/repos/{repo_owner}/{repo_name}/pulls",
                headers={"Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json"},
                json={"title": title, "body": body, "head": head_branch, "base": base_branch},
            )

        # An error page from a proxy or gateway may not be JSON; treat that as
        # an ordinary failure instead of letting the decode error escape.
        try:
            data = resp.json()
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        if resp.status_code == 201:
            await self.emit(task_id, "pr_created", {"url": data.get("html_url")}, session_id)
            return {"success": True, "url": data.get("html_url")}
        return {"error": data.get("message", "Failed"), "status": resp.status_code}

    async def _run_cmd(self, cmd: List[str], cwd: Optional[str] = None, timeout: int = 60) -> Dict:
        """Run *cmd* as a subprocess and capture its result without raising.

        Args:
            cmd: Program and arguments, passed without a shell.
            cwd: Working directory; defaults to ``self.workspace``.
            timeout: Seconds to wait for the process to finish.

        Returns:
            A dict with ``returncode``, ``stdout`` and ``stderr``. Launch
            failures and timeouts are reported as ``returncode == -1`` with the
            reason in ``stderr``. The ``GITHUB_TOKEN`` value is masked in both
            text fields.
        """
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                cwd=cwd or self.workspace,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except Exception as exc:  # best-effort contract: report, never raise
            return {"returncode": -1, "stdout": "", "stderr": self._scrub_token(str(exc))}

        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except Exception as exc:
            # Do not leave the child running after a timeout or other failure.
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                await proc.wait()
            if isinstance(exc, asyncio.TimeoutError):
                # str(TimeoutError()) is empty, which told the caller nothing.
                reason = f"Command timed out after {timeout}s"
            else:
                reason = str(exc)
            return {"returncode": -1, "stdout": "", "stderr": self._scrub_token(reason)}

        return {
            "returncode": proc.returncode,
            "stdout": self._scrub_token(stdout.decode("utf-8", errors="replace")),
            "stderr": self._scrub_token(stderr.decode("utf-8", errors="replace")),
        }