""" GitHub Tool — Real Autonomous GitHub Operations cloneRepo / createRepo / createBranch / commitChanges / pushChanges / openPR / readIssues """ import asyncio import os import subprocess from typing import Dict, List, Optional import httpx import structlog log = structlog.get_logger() GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "") WORKSPACE = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace") class GitHubTool: def __init__(self): self.token = GITHUB_TOKEN or os.environ.get("GITHUB_TOKEN", "") self.api = "https://api.github.com" self.headers = { "Authorization": f"token {self.token}", "Accept": "application/vnd.github.v3+json", "Content-Type": "application/json", } def _refresh_token(self): self.token = os.environ.get("GITHUB_TOKEN", "") self.headers["Authorization"] = f"token {self.token}" async def _api(self, method: str, path: str, **kwargs) -> Dict: self._refresh_token() url = f"{self.api}{path}" if not path.startswith("http") else path async with httpx.AsyncClient(timeout=30) as client: resp = await client.request(method, url, headers=self.headers, **kwargs) if resp.status_code >= 400: return {"success": False, "error": resp.text, "status": resp.status_code} try: return {"success": True, "data": resp.json()} except Exception: return {"success": True, "data": resp.text} # ─── Run Git Command ─────────────────────────────────────────────────────── async def _git(self, cmd: str, cwd: str = WORKSPACE, timeout: int = 60) -> Dict: try: env = os.environ.copy() env["GIT_ASKPASS"] = "echo" env["GIT_TERMINAL_PROMPT"] = "0" if self.token: env["GIT_TOKEN"] = self.token proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) out = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") return { "success": proc.returncode == 0, "stdout": out, "stderr": err, "returncode": proc.returncode, "output": out or err, } except asyncio.TimeoutError: return {"success": False, "error": f"Git command timed out: {cmd[:60]}"} except Exception as e: return {"success": False, "error": str(e)} # ─── Clone Repo ─────────────────────────────────────────────────────────── async def clone_repo(self, repo_url: str, dest: str = "", branch: str = "") -> Dict: """Clone a GitHub repository into workspace.""" if self.token and "github.com" in repo_url: if repo_url.startswith("https://github.com/"): repo_url = repo_url.replace( "https://github.com/", f"https://{self.token}@github.com/", ) dest_path = dest or os.path.join(WORKSPACE, repo_url.split("/")[-1].replace(".git", "")) os.makedirs(dest_path, exist_ok=True) branch_flag = f"--branch {branch}" if branch else "" result = await self._git(f"git clone --depth 1 {branch_flag} {repo_url} .", cwd=dest_path, timeout=120) if result["success"]: return { "success": True, "repo_url": repo_url, "local_path": dest_path, "action": "cloned", } return result # ─── Create Repo ────────────────────────────────────────────────────────── async def create_repo( self, name: str, description: str = "", private: bool = False, auto_init: bool = True, ) -> Dict: """Create a new GitHub repo via API.""" result = await self._api("POST", "/user/repos", json={ "name": name, "description": description, "private": private, "auto_init": auto_init, }) if result["success"]: data = result["data"] return { "success": True, "repo_name": name, "url": data.get("html_url", ""), "clone_url": data.get("clone_url", ""), "action": "created", } return result # ─── Create Branch ──────────────────────────────────────────────────────── async def create_branch(self, branch: str, cwd: str = WORKSPACE, from_branch: str = "main") -> Dict: """Create and checkout a new git branch.""" # First fetch to ensure we have latest await self._git(f"git fetch origin {from_branch} --quiet", cwd=cwd) result = await self._git(f"git checkout -b {branch}", cwd=cwd) if result["success"]: return {"success": True, "branch": branch, "action": "created_and_checked_out"} # Try checkout if branch exists result2 = await self._git(f"git checkout {branch}", cwd=cwd) return result2 if result2["success"] else result # ─── Commit Changes ─────────────────────────────────────────────────────── async def commit_changes( self, message: str, cwd: str = WORKSPACE, files: Optional[List[str]] = None, ) -> Dict: """Stage and commit changes.""" if files: for f in files: await self._git(f"git add {f}", cwd=cwd) else: await self._git("git add -A", cwd=cwd) result = await self._git(f'git commit -m "{message}"', cwd=cwd) return { "success": result["success"], "message": message, "output": result.get("output", ""), "action": "committed", } # ─── Push Changes ───────────────────────────────────────────────────────── async def push_changes(self, branch: str = "", cwd: str = WORKSPACE, force: bool = False) -> Dict: """Push commits to remote.""" self._refresh_token() branch_arg = branch or "" force_flag = "--force" if force else "" if branch_arg: cmd = f"git push origin {branch_arg} {force_flag}".strip() else: cmd = f"git push {force_flag}".strip() result = await self._git(cmd, cwd=cwd, timeout=60) return { "success": result["success"], "branch": branch_arg, "output": result.get("output", ""), "action": "pushed", } # ─── Open PR ────────────────────────────────────────────────────────────── async def open_pr( self, owner: str, repo: str, title: str, body: str = "", head: str = "genspark_ai_developer", base: str = "main", ) -> Dict: """Create a pull request via GitHub API.""" result = await self._api("POST", f"/repos/{owner}/{repo}/pulls", json={ "title": title, "body": body, "head": head, "base": base, }) if result["success"]: data = result["data"] return { "success": True, "pr_url": data.get("html_url", ""), "pr_number": data.get("number"), "title": title, "action": "pr_opened", } return result # ─── Read Issues ────────────────────────────────────────────────────────── async def read_issues(self, owner: str, repo: str, state: str = "open", limit: int = 10) -> Dict: result = await self._api("GET", f"/repos/{owner}/{repo}/issues?state={state}&per_page={limit}") if result["success"]: issues = result["data"] return { "success": True, "issues": [ { "number": i.get("number"), "title": i.get("title"), "state": i.get("state"), "body": (i.get("body") or "")[:500], "url": i.get("html_url"), "labels": [l["name"] for l in i.get("labels", [])], } for i in issues ], } return result # ─── Get Repo Info ──────────────────────────────────────────────────────── async def get_repo_info(self, owner: str, repo: str) -> Dict: result = await self._api("GET", f"/repos/{owner}/{repo}") if result["success"]: d = result["data"] return { "success": True, "name": d.get("name"), "full_name": d.get("full_name"), "description": d.get("description"), "url": d.get("html_url"), "clone_url": d.get("clone_url"), "default_branch": d.get("default_branch"), "stars": d.get("stargazers_count"), "language": d.get("language"), "private": d.get("private"), } return result # ─── Get File from GitHub ───────────────────────────────────────────────── async def get_file(self, owner: str, repo: str, path: str, branch: str = "main") -> Dict: import base64 result = await self._api("GET", f"/repos/{owner}/{repo}/contents/{path}?ref={branch}") if result["success"]: d = result["data"] content = base64.b64decode(d.get("content", "")).decode("utf-8", errors="replace") return {"success": True, "path": path, "content": content, "sha": d.get("sha")} return result # ─── Setup Git Config ───────────────────────────────────────────────────── async def setup_git_config(self, cwd: str = WORKSPACE, name: str = "God Agent", email: str = "god-agent@ai.com") -> Dict: await self._git(f'git config user.name "{name}"', cwd=cwd) await self._git(f'git config user.email "{email}"', cwd=cwd) return {"success": True, "action": "git_config_set"} # ─── Get Status ─────────────────────────────────────────────────────────── async def status(self, cwd: str = WORKSPACE) -> Dict: result = await self._git("git status --short", cwd=cwd) log_result = await self._git("git log --oneline -5", cwd=cwd) branch_result = await self._git("git branch --show-current", cwd=cwd) return { "success": True, "status": result.get("stdout", ""), "recent_commits": log_result.get("stdout", ""), "current_branch": branch_result.get("stdout", "").strip(), } # ─── Init + Setup Repo ──────────────────────────────────────────────────── async def init_repo(self, cwd: str = WORKSPACE, remote_url: str = "") -> Dict: await self._git("git init", cwd=cwd) await self.setup_git_config(cwd=cwd) if remote_url: await self._git(f"git remote add origin {remote_url}", cwd=cwd) return {"success": True, "cwd": cwd, "action": "initialized"} # ─── Full Autonomous Commit + Push ──────────────────────────────────────── async def auto_commit_push( self, message: str, branch: str = "main", cwd: str = WORKSPACE, ) -> Dict: """All-in-one: add → commit → push.""" await self.setup_git_config(cwd=cwd) commit_r = await self.commit_changes(message, cwd=cwd) push_r = await self.push_changes(branch, cwd=cwd) return { "success": push_r.get("success", False), "commit": commit_r, "push": push_r, "action": "committed_and_pushed", }