| """ |
| GitHub Tool — Real Autonomous GitHub Operations |
| cloneRepo / createRepo / createBranch / commitChanges / pushChanges / openPR / readIssues |
| """ |
|
|
import asyncio
import os
import subprocess
from typing import Dict, List, Optional, Sequence, Union

import httpx
import structlog
|
|
# Module-wide structured logger.
log = structlog.get_logger()

# GitHub credential captured once at import time; empty string when unset.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "")
# Default directory in which git commands are executed.
WORKSPACE = os.getenv("WORKSPACE_DIR", "/tmp/god_workspace")
|
|
|
|
class GitHubTool:
    """Async helper combining GitHub REST API calls with local ``git`` commands.

    Every public coroutine returns a dict with a boolean ``"success"`` key.
    Failures carry an ``"error"`` string (and ``"status"`` for HTTP errors)
    instead of raising, so callers can branch on the result uniformly.

    NOTE: ``clone_repo`` embeds the token in the clone URL so that later
    pushes authenticate; git therefore stores it in the clone's
    ``.git/config``. The token is redacted from everything this class returns.
    """

    def __init__(self):
        """Read the token and prepare the default REST API headers."""
        self.token = GITHUB_TOKEN or os.environ.get("GITHUB_TOKEN", "")
        self.api = "https://api.github.com"
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json",
        }
        self._sync_auth_header()

    def _sync_auth_header(self):
        """Set the Authorization header from ``self.token``, or drop it.

        An ``Authorization: token `` header with an empty credential is not a
        valid anonymous request, so the header is omitted when no token is set.
        """
        if self.token:
            self.headers["Authorization"] = f"token {self.token}"
        else:
            self.headers.pop("Authorization", None)

    def _refresh_token(self):
        """Re-read ``GITHUB_TOKEN`` from the environment and update headers."""
        self.token = os.environ.get("GITHUB_TOKEN", "")
        self._sync_auth_header()

    def _redact(self, text: str) -> str:
        """Return *text* with any occurrence of the token replaced by ``***``."""
        # Guard the empty token: "".replace("", x) would insert x everywhere.
        if self.token and text:
            return text.replace(self.token, "***")
        return text

    @staticmethod
    def _failure_text(result: Dict) -> str:
        """Pick the most informative error string out of a ``_git`` result."""
        return result.get("error") or result.get("stderr") or result.get("output", "")

    async def _api(self, method: str, path: str, **kwargs) -> Dict:
        """Send one request to the GitHub REST API.

        Args:
            method: HTTP verb.
            path: API path appended to ``self.api``, or an absolute URL.
            **kwargs: Passed through to ``httpx.AsyncClient.request``
                (for example ``json=`` or ``params=``).

        Returns:
            ``{"success": True, "data": ...}`` with the decoded JSON body (or
            the raw text when the body is not JSON), or
            ``{"success": False, "error": ..., "status": ...}`` for HTTP
            status >= 400, or ``{"success": False, "error": ...}`` when the
            request itself fails (timeout, connection error).
        """
        self._refresh_token()
        url = path if path.startswith("http") else f"{self.api}{path}"
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await client.request(method, url, headers=self.headers, **kwargs)
        except httpx.HTTPError as exc:
            # Report transport failures the same way as every other error path.
            return {"success": False, "error": str(exc) or type(exc).__name__}
        if resp.status_code >= 400:
            return {"success": False, "error": resp.text, "status": resp.status_code}
        try:
            return {"success": True, "data": resp.json()}
        except ValueError:
            # Non-JSON or empty body: hand back the text.
            return {"success": True, "data": resp.text}

    async def _git(
        self,
        cmd: Union[str, Sequence[str]],
        cwd: str = WORKSPACE,
        timeout: int = 60,
    ) -> Dict:
        """Run a git command and capture its output.

        Args:
            cmd: Either an argv sequence, executed directly without a shell
                (used for all internal calls so arguments need no quoting),
                or a single string, executed through the shell as before.
            cwd: Working directory for the command.
            timeout: Seconds to wait before the process is killed.

        Returns:
            On completion: ``success`` (exit code 0), ``stdout``, ``stderr``,
            ``returncode`` and ``output`` (stdout, or stderr when stdout is
            empty). On timeout or spawn failure: ``success`` False and
            ``error``. The token is redacted from all returned text.
        """
        proc = None
        try:
            env = os.environ.copy()
            env["GIT_ASKPASS"] = "echo"
            env["GIT_TERMINAL_PROMPT"] = "0"  # never block on a credential prompt
            if self.token:
                env["GIT_TOKEN"] = self.token
            spawn_opts = {
                "stdout": asyncio.subprocess.PIPE,
                "stderr": asyncio.subprocess.PIPE,
                "cwd": cwd,
                "env": env,
            }
            if isinstance(cmd, str):
                proc = await asyncio.create_subprocess_shell(cmd, **spawn_opts)
            else:
                argv = [str(part) for part in cmd]
                proc = await asyncio.create_subprocess_exec(*argv, **spawn_opts)
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
            out = self._redact(stdout.decode("utf-8", errors="replace"))
            err = self._redact(stderr.decode("utf-8", errors="replace"))
            return {
                "success": proc.returncode == 0,
                "stdout": out,
                "stderr": err,
                "returncode": proc.returncode,
                "output": out or err,
            }
        except asyncio.TimeoutError:
            # Kill and reap the child so a hung git does not linger.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
                await proc.wait()
            label = cmd if isinstance(cmd, str) else " ".join(str(part) for part in cmd)
            # Redact before truncating: a clone command carries the token.
            return {"success": False, "error": f"Git command timed out: {self._redact(label)[:60]}"}
        except Exception as e:
            return {"success": False, "error": self._redact(str(e))}

    async def clone_repo(self, repo_url: str, dest: str = "", branch: str = "") -> Dict:
        """Shallow-clone a repository into the workspace.

        Args:
            repo_url: Repository URL. ``https://github.com/`` URLs get the
                token embedded when one is configured.
            dest: Target directory; defaults to ``WORKSPACE/<repo name>``.
            branch: Optional branch to clone.

        Returns:
            On success ``repo_url`` (as given by the caller, without the
            token), ``local_path`` and ``action``; otherwise the failed
            ``_git`` result.
        """
        github_prefix = "https://github.com/"
        auth_url = repo_url
        if self.token and repo_url.startswith(github_prefix):
            auth_url = f"https://{self.token}@github.com/" + repo_url[len(github_prefix):]

        # Derive the directory name from the last path segment, dropping only
        # a real ".git" suffix (so "user.github.io" stays intact).
        repo_name = repo_url.rstrip("/").split("/")[-1]
        if repo_name.endswith(".git"):
            repo_name = repo_name[: -len(".git")]
        dest_path = dest or os.path.join(WORKSPACE, repo_name)
        os.makedirs(dest_path, exist_ok=True)

        argv = ["git", "clone", "--depth", "1"]
        if branch:
            argv += ["--branch", branch]
        argv += ["--", auth_url, "."]
        result = await self._git(argv, cwd=dest_path, timeout=120)
        if result["success"]:
            return {
                "success": True,
                "repo_url": repo_url,
                "local_path": dest_path,
                "action": "cloned",
            }
        return result

    async def create_repo(
        self,
        name: str,
        description: str = "",
        private: bool = False,
        auto_init: bool = True,
    ) -> Dict:
        """Create a repository for the authenticated user via ``POST /user/repos``.

        Returns:
            On success ``repo_name``, ``url``, ``clone_url`` and ``action``;
            otherwise the failed ``_api`` result.
        """
        result = await self._api("POST", "/user/repos", json={
            "name": name,
            "description": description,
            "private": private,
            "auto_init": auto_init,
        })
        if result["success"]:
            data = result["data"]
            return {
                "success": True,
                "repo_name": name,
                "url": data.get("html_url", ""),
                "clone_url": data.get("clone_url", ""),
                "action": "created",
            }
        return result

    async def create_branch(self, branch: str, cwd: str = WORKSPACE, from_branch: str = "main") -> Dict:
        """Create and check out *branch*, or check it out if it already exists.

        ``from_branch`` is fetched from ``origin`` on a best-effort basis
        (its result is ignored). The new branch is created from the current
        HEAD, not from the fetched ref.

        Returns:
            ``branch`` and ``action`` on success (``"created_and_checked_out"``
            or ``"checked_out"``, the latter merged into the raw git result);
            otherwise the result of the failed ``checkout -b``.
        """
        await self._git(["git", "fetch", "origin", from_branch, "--quiet"], cwd=cwd)
        created = await self._git(["git", "checkout", "-b", branch], cwd=cwd)
        if created["success"]:
            return {"success": True, "branch": branch, "action": "created_and_checked_out"}

        # Creation failed (typically the branch exists): switch to it instead.
        existing = await self._git(["git", "checkout", branch], cwd=cwd)
        if existing["success"]:
            return {**existing, "branch": branch, "action": "checked_out"}
        return created

    async def commit_changes(
        self,
        message: str,
        cwd: str = WORKSPACE,
        files: Optional[List[str]] = None,
    ) -> Dict:
        """Stage and commit changes.

        Args:
            message: Commit message, passed verbatim (no shell expansion).
            cwd: Repository directory.
            files: Paths to stage, one ``git add`` each; when empty or
                ``None`` everything is staged with ``git add -A``.

        Returns:
            ``success`` of the commit, ``message``, ``output`` (git output,
            or the error text when the command could not run) and ``action``.
        """
        if files:
            for path in files:
                # "--" stops a path beginning with "-" being read as an option.
                await self._git(["git", "add", "--", path], cwd=cwd)
        else:
            await self._git(["git", "add", "-A"], cwd=cwd)

        result = await self._git(["git", "commit", "-m", message], cwd=cwd)
        return {
            "success": result["success"],
            "message": message,
            "output": result.get("output") or result.get("error", ""),
            "action": "committed",
        }

    async def push_changes(self, branch: str = "", cwd: str = WORKSPACE, force: bool = False) -> Dict:
        """Push commits to the remote.

        Args:
            branch: Branch to push to ``origin``; when empty a plain
                ``git push`` is run.
            cwd: Repository directory.
            force: Add ``--force`` to the push.

        Returns:
            ``success``, ``branch``, ``output`` and ``action``.
        """
        self._refresh_token()
        branch_arg = branch or ""
        argv = ["git", "push"]
        if branch_arg:
            argv += ["origin", branch_arg]
        if force:
            argv.append("--force")
        result = await self._git(argv, cwd=cwd, timeout=60)
        return {
            "success": result["success"],
            "branch": branch_arg,
            "output": result.get("output") or result.get("error", ""),
            "action": "pushed",
        }

    async def open_pr(
        self,
        owner: str,
        repo: str,
        title: str,
        body: str = "",
        head: str = "genspark_ai_developer",
        base: str = "main",
    ) -> Dict:
        """Create a pull request via ``POST /repos/{owner}/{repo}/pulls``.

        Returns:
            On success ``pr_url``, ``pr_number``, ``title`` and ``action``;
            otherwise the failed ``_api`` result.
        """
        result = await self._api("POST", f"/repos/{owner}/{repo}/pulls", json={
            "title": title,
            "body": body,
            "head": head,
            "base": base,
        })
        if result["success"]:
            data = result["data"]
            return {
                "success": True,
                "pr_url": data.get("html_url", ""),
                "pr_number": data.get("number"),
                "title": title,
                "action": "pr_opened",
            }
        return result

    async def read_issues(self, owner: str, repo: str, state: str = "open", limit: int = 10) -> Dict:
        """List issues of a repository.

        Args:
            owner: Repository owner.
            repo: Repository name.
            state: Issue state filter sent to the API.
            limit: Sent as ``per_page``.

        Returns:
            ``issues``: a list of dicts with ``number``, ``title``, ``state``,
            ``body`` (first 500 characters), ``url``, ``labels`` and
            ``is_pull_request`` (True when the entry has a ``pull_request``
            key); or the failed ``_api`` result.
        """
        result = await self._api(
            "GET",
            f"/repos/{owner}/{repo}/issues",
            params={"state": state, "per_page": limit},
        )
        if not result["success"]:
            return result
        issues = result["data"]
        if not isinstance(issues, list):
            return {"success": False, "error": "Unexpected response from GitHub issues API"}
        return {
            "success": True,
            "issues": [
                {
                    "number": issue.get("number"),
                    "title": issue.get("title"),
                    "state": issue.get("state"),
                    "body": (issue.get("body") or "")[:500],
                    "url": issue.get("html_url"),
                    "labels": [
                        label["name"] if isinstance(label, dict) else label
                        for label in (issue.get("labels") or [])
                    ],
                    "is_pull_request": "pull_request" in issue,
                }
                for issue in issues
            ],
        }

    async def get_repo_info(self, owner: str, repo: str) -> Dict:
        """Fetch basic repository metadata via ``GET /repos/{owner}/{repo}``."""
        result = await self._api("GET", f"/repos/{owner}/{repo}")
        if result["success"]:
            d = result["data"]
            return {
                "success": True,
                "name": d.get("name"),
                "full_name": d.get("full_name"),
                "description": d.get("description"),
                "url": d.get("html_url"),
                "clone_url": d.get("clone_url"),
                "default_branch": d.get("default_branch"),
                "stars": d.get("stargazers_count"),
                "language": d.get("language"),
                "private": d.get("private"),
            }
        return result

    async def get_file(self, owner: str, repo: str, path: str, branch: str = "main") -> Dict:
        """Read one file's content through the repository contents API.

        Args:
            owner: Repository owner.
            repo: Repository name.
            path: File path inside the repository.
            branch: Ref to read from.

        Returns:
            ``path``, ``content`` (base64-decoded, UTF-8 with replacement)
            and ``sha`` on success; a failure dict when the API call fails or
            the response is not a single-file object.
        """
        import base64
        from urllib.parse import quote

        result = await self._api(
            "GET",
            f"/repos/{owner}/{repo}/contents/{quote(path.lstrip('/'))}",
            params={"ref": branch},
        )
        if not result["success"]:
            return result
        d = result["data"]
        if not isinstance(d, dict):
            # Anything but a JSON object (e.g. a list) cannot be decoded as a file.
            return {
                "success": False,
                "error": f"'{path}' did not resolve to a single file",
                "path": path,
            }
        content = base64.b64decode(d.get("content") or "").decode("utf-8", errors="replace")
        return {"success": True, "path": path, "content": content, "sha": d.get("sha")}

    async def setup_git_config(self, cwd: str = WORKSPACE, name: str = "God Agent", email: str = "god-agent@ai.com") -> Dict:
        """Set ``user.name`` and ``user.email`` for the repository in *cwd*.

        Returns:
            ``success`` (both settings applied) and ``action``; ``error`` is
            added when either command fails.
        """
        name_result = await self._git(["git", "config", "user.name", name], cwd=cwd)
        email_result = await self._git(["git", "config", "user.email", email], cwd=cwd)
        outcome = {
            "success": name_result["success"] and email_result["success"],
            "action": "git_config_set",
        }
        if not outcome["success"]:
            failed = email_result if name_result["success"] else name_result
            outcome["error"] = self._failure_text(failed)
        return outcome

    async def status(self, cwd: str = WORKSPACE) -> Dict:
        """Report short status, the last five commits and the current branch.

        Returns:
            ``success`` (whether ``git status`` succeeded), ``status``,
            ``recent_commits`` and ``current_branch``; ``error`` is added
            when ``git status`` fails.
        """
        result = await self._git(["git", "status", "--short"], cwd=cwd)
        log_result = await self._git(["git", "log", "--oneline", "-5"], cwd=cwd)
        branch_result = await self._git(["git", "branch", "--show-current"], cwd=cwd)
        report = {
            "success": result["success"],
            "status": result.get("stdout", ""),
            "recent_commits": log_result.get("stdout", ""),
            "current_branch": branch_result.get("stdout", "").strip(),
        }
        if not result["success"]:
            report["error"] = self._failure_text(result)
        return report

    async def init_repo(self, cwd: str = WORKSPACE, remote_url: str = "") -> Dict:
        """Initialise a git repository in *cwd* and optionally set ``origin``.

        The directory is created when missing. Identity configuration is
        best-effort. If ``origin`` cannot be added (for example because it
        already exists) its URL is updated instead.

        Returns:
            ``success``, ``cwd`` and ``action``; ``error`` is added on failure.
        """
        try:
            os.makedirs(cwd, exist_ok=True)
        except OSError as exc:
            return {"success": False, "cwd": cwd, "action": "initialized", "error": str(exc)}

        init_result = await self._git(["git", "init"], cwd=cwd)
        if not init_result["success"]:
            return {
                "success": False,
                "cwd": cwd,
                "action": "initialized",
                "error": self._failure_text(init_result),
            }

        await self.setup_git_config(cwd=cwd)

        if remote_url:
            remote_result = await self._git(["git", "remote", "add", "origin", remote_url], cwd=cwd)
            if not remote_result["success"]:
                remote_result = await self._git(
                    ["git", "remote", "set-url", "origin", remote_url], cwd=cwd
                )
            if not remote_result["success"]:
                return {
                    "success": False,
                    "cwd": cwd,
                    "action": "initialized",
                    "error": self._failure_text(remote_result),
                }
        return {"success": True, "cwd": cwd, "action": "initialized"}

    async def auto_commit_push(
        self,
        message: str,
        branch: str = "main",
        cwd: str = WORKSPACE,
    ) -> Dict:
        """All-in-one: add → commit → push.

        The push is attempted even when the commit step reports failure;
        ``success`` reflects the push only.

        Returns:
            ``success``, the ``commit`` and ``push`` sub-results, and ``action``.
        """
        await self.setup_git_config(cwd=cwd)
        commit_r = await self.commit_changes(message, cwd=cwd)
        push_r = await self.push_changes(branch, cwd=cwd)
        return {
            "success": push_r.get("success", False),
            "commit": commit_r,
            "push": push_r,
            "action": "committed_and_pushed",
        }
|
|