"""Async wrapper around PyGithub and the GitHub REST API.

Blocking PyGithub calls are executed in the default thread-pool executor and
GitHub failures are translated into FastAPI ``HTTPException`` instances.
"""

import asyncio
import functools
import hashlib
import hmac
import logging
from typing import Optional, List, Dict, Any

import httpx
from fastapi import HTTPException
from github import Github, GithubException

from config import settings

logger = logging.getLogger(__name__)


class GitHubService:
    """Service object exposing repository, issue, PR, webhook and release operations."""

    def __init__(self):
        # Initialize PyGithub with the ContriBot token
        self.client = Github(settings.GITHUB_TOKEN)
        logger.info("[GITHUB] Initialized GitHubService")

    async def _run_async(self, func, *args, **kwargs):
        """Run a blocking callable in the default executor and normalise errors.

        Args:
            func: Blocking callable (typically a PyGithub call or a closure).
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            HTTPException: With the GitHub status code for ``GithubException``,
                unchanged when ``func`` itself raised an ``HTTPException``,
                and with status 500 for any other exception.
        """
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        try:
            return await loop.run_in_executor(None, call)
        except HTTPException:
            # A helper deliberately chose a status code; do not mask it as 500.
            raise
        except GithubException as e:
            if isinstance(e.data, dict):
                detail = e.data.get("message") or str(e)
            else:
                detail = str(e.data) if e.data else str(e)

            # Log rate limit specifically
            if e.status == 403 and "rate limit" in detail.lower():
                logger.warning(f"[GITHUB] Rate limit exceeded: {detail}")
            elif e.status != 404:
                # Don't log 404s as errors if they are expected
                logger.error(f"[GITHUB] API Error ({e.status}): {detail}")
            raise HTTPException(status_code=e.status, detail=detail) from e
        except Exception as e:
            logger.error(f"[GITHUB] Unexpected error: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e

    # --- Repository Operations ---

    async def get_repo(self, full_name: str) -> dict:
        """Return the raw API data of the repository ``full_name``."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching repo data")
        repo = await self._run_async(self.client.get_repo, full_name)
        return repo.raw_data

    async def get_repo_tree(self, full_name: str) -> dict:
        """Return the recursive git tree of the default branch.

        Returns:
            ``{"tree": [{"path": ..., "type": ..., "sha": ...}, ...]}``.
        """
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching recursive tree")

        def _get_tree():
            repo = self.client.get_repo(full_name)
            branch = repo.get_branch(repo.default_branch)
            tree = repo.get_git_tree(branch.commit.sha, recursive=True)
            return {
                "tree": [
                    {"path": t.path, "type": t.type, "sha": t.sha}
                    for t in tree.tree
                ]
            }

        return await self._run_async(_get_tree)

    async def get_file_content(self, full_name: str, path: str) -> str:
        """Return the UTF-8 decoded content of the file at ``path``.

        Raises:
            HTTPException: 400 when ``path`` is a directory; otherwise the
                status produced by ``_run_async`` for the underlying failure.
        """
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching file content: {path}")

        def _get_content():
            repo = self.client.get_repo(full_name)
            contents = repo.get_contents(path)
            # get_contents returns a list when the path is a directory;
            # that is a caller error, not a server failure.
            if isinstance(contents, list):
                raise HTTPException(
                    status_code=400,
                    detail=f"Path {path} is a directory, not a file.",
                )
            return contents.decoded_content.decode("utf-8")

        return await self._run_async(_get_content)

    async def file_exists(self, full_name: str, path: str, branch: Optional[str] = None) -> bool:
        """Return True when ``path`` exists (on ``branch`` if given), False on 404."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Checking if file exists: {path} (branch: {branch})")

        def _check_exists():
            repo = self.client.get_repo(full_name)
            try:
                if branch:
                    repo.get_contents(path, ref=branch)
                else:
                    repo.get_contents(path)
            except GithubException as e:
                if e.status == 404:
                    return False
                raise
            return True

        return await self._run_async(_check_exists)

    async def create_or_update_file(self, full_name: str, path: str, content: str, message: str, branch: str) -> dict:
        """Commit ``content`` to ``path`` on ``branch``, creating the file if absent.

        Returns:
            Raw API data of the resulting commit.
        """
        logger.info(f"[GITHUB][REPO: {full_name}] Committing file {path} to branch {branch}")

        def _commit_file():
            repo = self.client.get_repo(full_name)
            try:
                contents = repo.get_contents(path, ref=branch)
                logger.debug(f"[GITHUB][REPO: {full_name}] Updating existing file {path}")
                res = repo.update_file(contents.path, message, content, contents.sha, branch=branch)
                return res["commit"].raw_data
            except GithubException as e:
                if e.status == 404:
                    # File does not exist on this branch yet: create it.
                    logger.debug(f"[GITHUB][REPO: {full_name}] Creating new file {path}")
                    res = repo.create_file(path, message, content, branch=branch)
                    return res["commit"].raw_data
                raise

        return await self._run_async(_commit_file)

    async def create_branch(self, full_name: str, branch_name: str, from_branch: Optional[str] = None) -> dict:
        """Create ``branch_name`` from ``from_branch`` (default branch if omitted).

        If the reference already exists, the existing reference is returned.

        Returns:
            ``{"ref": ..., "url": ...}`` of the created or existing git ref.
        """
        logger.info(f"[GITHUB][REPO: {full_name}] Creating branch {branch_name} from {from_branch or 'default'}")

        def _create_branch():
            repo = self.client.get_repo(full_name)
            base = from_branch or repo.default_branch
            source_branch = repo.get_branch(base)
            try:
                ref = repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=source_branch.commit.sha)
                logger.info(f"[GITHUB][REPO: {full_name}] Branch {branch_name} created")
                return {"ref": ref.ref, "url": ref.url}
            except GithubException as e:
                if e.status == 422 and "Reference already exists" in str(e.data):
                    logger.info(f"[GITHUB][REPO: {full_name}] Branch {branch_name} already exists")
                    ref = repo.get_git_ref(f"heads/{branch_name}")
                    return {"ref": ref.ref, "url": ref.url}
                raise

        return await self._run_async(_create_branch)

    async def delete_branch(self, full_name: str, branch_name: str) -> dict:
        """Delete the git ref ``heads/<branch_name>`` and return a status dict."""

        def _delete_branch():
            repo = self.client.get_repo(full_name)
            ref = repo.get_git_ref(f"heads/{branch_name}")
            ref.delete()
            return {"status": "deleted", "branch": branch_name}

        return await self._run_async(_delete_branch)

    # --- Issue Operations ---

    async def create_issue(self, full_name: str, title: str, body: str, labels: Optional[list[str]] = None) -> int:
        """Create an issue and return its number.

        Args:
            labels: Label names to apply; ``None`` is treated as no labels.
        """
        logger.info(f"[GITHUB][REPO: {full_name}] Creating issue: {title}")
        # Avoid a shared mutable default; always hand PyGithub a fresh list.
        issue_labels = list(labels) if labels is not None else []

        def _create_issue():
            repo = self.client.get_repo(full_name)
            issue = repo.create_issue(title=title, body=body, labels=issue_labels)
            logger.info(f"[GITHUB][REPO: {full_name}] Issue #{issue.number} created")
            return issue.number

        return await self._run_async(_create_issue)

    async def close_issue(self, full_name: str, issue_number: int, comment: Optional[str] = None) -> dict:
        """Close an issue, optionally posting ``comment`` first; return its raw data."""
        logger.info(f"[GITHUB][REPO: {full_name}] Closing issue #{issue_number}")

        def _close_issue():
            repo = self.client.get_repo(full_name)
            issue = repo.get_issue(number=issue_number)
            if comment:
                logger.debug(f"[GITHUB][REPO: {full_name}] Adding closing comment to #{issue_number}")
                issue.create_comment(comment)
            issue.edit(state="closed")
            return issue.raw_data

        return await self._run_async(_close_issue)

    async def get_issue(self, full_name: str, issue_number: int) -> dict:
        """Return the raw API data of issue ``issue_number``."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching issue #{issue_number}")

        def _get_issue():
            repo = self.client.get_repo(full_name)
            issue = repo.get_issue(number=issue_number)
            return issue.raw_data

        return await self._run_async(_get_issue)

    async def add_issue_comment(self, full_name: str, issue_number: int, body: str) -> dict:
        """Post ``body`` as a comment on the issue and return the comment's raw data."""
        logger.info(f"[GITHUB][REPO: {full_name}] Adding comment to issue #{issue_number}")

        def _add_comment():
            repo = self.client.get_repo(full_name)
            issue = repo.get_issue(number=issue_number)
            comment = issue.create_comment(body)
            return comment.raw_data

        return await self._run_async(_add_comment)

    async def get_issue_comments(self, full_name: str, issue_number: int) -> list[dict]:
        """Return the raw data of every comment on the issue."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching comments for issue #{issue_number}")

        def _get_comments():
            repo = self.client.get_repo(full_name)
            issue = repo.get_issue(number=issue_number)
            return [c.raw_data for c in issue.get_comments()]

        return await self._run_async(_get_comments)

    async def list_open_issues(self, full_name: str) -> list[dict]:
        """Return the raw data of every item returned by ``get_issues(state="open")``."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Listing open issues")

        def _list_issues():
            repo = self.client.get_repo(full_name)
            return [i.raw_data for i in repo.get_issues(state="open")]

        return await self._run_async(_list_issues)

    # --- Pull Request Operations ---

    async def create_pull_request(self, full_name: str, title: str, body: str, head: str, base: Optional[str] = None) -> int:
        """Open a pull request and return its number.

        When GitHub reports that a pull request already exists for ``head``,
        the number of the first matching open PR is returned instead. If no
        such PR can be found the original GitHub error is propagated.
        """
        logger.info(f"[GITHUB][REPO: {full_name}] Creating Pull Request: {title} (head: {head})")

        def _create_pr():
            repo = self.client.get_repo(full_name)
            target_base = base or repo.default_branch
            try:
                pr = repo.create_pull(title=title, body=body, head=head, base=target_base)
                logger.info(f"[GITHUB][REPO: {full_name}] PR #{pr.number} created")
                return pr.number
            except GithubException as e:
                if e.status == 422 and "A pull request already exists" in str(e.data):
                    # Find the existing PR
                    logger.info(f"[GITHUB][REPO: {full_name}] PR already exists for {head}, finding it...")
                    prs = repo.get_pulls(state="open", head=f"{repo.owner.login}:{head}", base=target_base)
                    for existing in prs:
                        logger.info(f"[GITHUB][REPO: {full_name}] Found existing PR #{existing.number}")
                        return existing.number
                raise

        return await self._run_async(_create_pr)

    async def get_pull_request(self, full_name: str, pr_number: int) -> dict:
        """Return the raw API data of pull request ``pr_number``."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching PR #{pr_number}")

        def _get_pr():
            repo = self.client.get_repo(full_name)
            pr = repo.get_pull(pr_number)
            return pr.raw_data

        return await self._run_async(_get_pr)

    async def add_pr_review(self, full_name: str, pr_number: int, body: str, event: str = "COMMENT") -> dict:
        """Create a review on the pull request and return the review's raw data."""
        logger.info(f"[GITHUB][REPO: {full_name}] Adding PR review to #{pr_number} (event: {event})")

        def _add_review():
            repo = self.client.get_repo(full_name)
            pr = repo.get_pull(pr_number)
            review = pr.create_review(body=body, event=event)
            return review.raw_data

        return await self._run_async(_add_review)

    async def list_pr_files(self, full_name: str, pr_number: int) -> list[dict]:
        """Return filename, status, patch, additions and deletions for each PR file."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Listing files for PR #{pr_number}")

        def _list_files():
            repo = self.client.get_repo(full_name)
            pr = repo.get_pull(pr_number)
            return [
                {
                    "filename": f.filename,
                    "status": f.status,
                    "patch": f.patch,
                    "additions": f.additions,
                    "deletions": f.deletions,
                }
                for f in pr.get_files()
            ]

        return await self._run_async(_list_files)

    async def get_pr_diff(self, full_name: str, pr_number: int) -> str:
        """Return the raw diff text of a pull request.

        Raises:
            HTTPException: With GitHub's status code on a non-200 response,
                or 502 when the request to GitHub fails at transport level.
        """
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching diff for PR #{pr_number}")
        # Using httpx to get the raw diff string directly from GitHub API
        url = f"https://api.github.com/repos/{full_name}/pulls/{pr_number}"
        headers = {
            "Authorization": f"Bearer {settings.GITHUB_TOKEN}",
            "Accept": "application/vnd.github.v3.diff",
        }
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(url, headers=headers)
        except httpx.RequestError as e:
            # Keep the service contract: callers only ever see HTTPException.
            logger.error(f"[GITHUB][REPO: {full_name}] Failed to fetch PR diff for #{pr_number}: {e}")
            raise HTTPException(status_code=502, detail="Failed to fetch PR diff") from e

        if resp.status_code != 200:
            logger.error(f"[GITHUB][REPO: {full_name}] Failed to fetch PR diff for #{pr_number}: {resp.status_code}")
            raise HTTPException(status_code=resp.status_code, detail="Failed to fetch PR diff")
        return resp.text

    async def get_check_run_logs(self, full_name: str, check_run_id: int) -> str:
        """Return a text summary (title, summary, text) of a check run's output.

        This is best-effort: on any failure to fetch the check run the string
        ``"Failed to fetch check run details."`` is returned instead of raising.
        """
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching check run logs for {check_run_id}")
        url = f"https://api.github.com/repos/{full_name}/check-runs/{check_run_id}"
        headers = {
            "Authorization": f"Bearer {settings.GITHUB_TOKEN}",
            "Accept": "application/vnd.github.v3+json",
        }
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(url, headers=headers)
        except httpx.RequestError as e:
            logger.error(f"[GITHUB][REPO: {full_name}] Failed to fetch check run details for {check_run_id}: {e}")
            return "Failed to fetch check run details."

        if resp.status_code != 200:
            logger.error(f"[GITHUB][REPO: {full_name}] Failed to fetch check run details for {check_run_id}: {resp.status_code}")
            return "Failed to fetch check run details."

        data = resp.json()
        # The API reports unset output fields as JSON null, so a plain
        # dict.get default would never apply; fall back with `or` instead.
        output = data.get("output") or {}
        title = output.get("title") or "No Title"
        summary = output.get("summary") or "No Summary"
        text = output.get("text") or "No Text"
        return f"Title: {title}\nSummary: {summary}\nText: {text}"

    # --- Webhook Operations ---

    async def register_webhook(self, full_name: str, webhook_url: str, secret: str) -> int:
        """Register a JSON webhook on the repository and return the hook id."""
        logger.info(f"[GITHUB][REPO: {full_name}] Registering webhook to {webhook_url}")

        def _register_hook():
            repo = self.client.get_repo(full_name)
            config = {
                "url": webhook_url,
                "content_type": "json",
                "secret": secret,
            }
            events = ["issues", "pull_request", "issue_comment", "push", "release", "check_run"]
            hook = repo.create_hook("web", config, events, active=True)
            logger.info(f"[GITHUB][REPO: {full_name}] Webhook registered with ID {hook.id}")
            return hook.id

        return await self._run_async(_register_hook)

    async def delete_webhook(self, full_name: str, hook_id: int) -> dict:
        """Delete webhook ``hook_id`` from the repository and return a status dict."""
        logger.info(f"[GITHUB][REPO: {full_name}] Deleting webhook {hook_id}")

        def _delete_hook():
            repo = self.client.get_repo(full_name)
            hook = repo.get_hook(hook_id)
            hook.delete()
            return {"status": "deleted", "hook_id": hook_id}

        return await self._run_async(_delete_hook)

    def verify_webhook_signature(self, payload: bytes, signature: str, secret: str) -> bool:
        """Check a ``sha256=<hex>`` webhook signature against ``payload``.

        Args:
            payload: Raw request body bytes.
            signature: Signature header value supplied by the sender.
            secret: Shared webhook secret.

        Returns:
            True when the signature matches; False when it does not or when
            either ``signature`` or ``secret`` is empty.
        """
        if not signature or not secret:
            logger.warning("[GITHUB] Missing signature or secret for webhook verification")
            return False

        expected_signature = "sha256=" + hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
        # compare_digest raises TypeError for str arguments containing
        # non-ASCII characters; the header is untrusted, so compare bytes.
        result = hmac.compare_digest(
            expected_signature.encode("utf-8"),
            signature.encode("utf-8"),
        )
        if not result:
            logger.warning("[GITHUB] Webhook signature mismatch")
        return result

    # --- Release Operations ---

    async def create_release(self, full_name: str, tag: str, name: str, body: str, draft: bool = False) -> str:
        """Create a git release and return its HTML URL."""
        logger.info(f"[GITHUB][REPO: {full_name}] Creating release {tag}: {name}")

        def _create_release():
            repo = self.client.get_repo(full_name)
            release = repo.create_git_release(tag=tag, name=name, message=body, draft=draft)
            logger.info(f"[GITHUB][REPO: {full_name}] Release created: {release.html_url}")
            return release.html_url

        return await self._run_async(_create_release)

    async def get_latest_release(self, full_name: str) -> dict | None:
        """Return the raw data of the latest release, or None when GitHub answers 404."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Fetching latest release")

        def _get_latest():
            repo = self.client.get_repo(full_name)
            try:
                release = repo.get_latest_release()
                return release.raw_data
            except GithubException as e:
                if e.status == 404:
                    logger.debug(f"[GITHUB][REPO: {full_name}] No latest release found (404)")
                    return None
                raise

        return await self._run_async(_get_latest)

    async def get_all_tags(self, full_name: str) -> list[str]:
        """Return the names of all tags in the repository."""
        logger.debug(f"[GITHUB][REPO: {full_name}] Listing all tags")

        def _get_tags():
            repo = self.client.get_repo(full_name)
            return [tag.name for tag in repo.get_tags()]

        return await self._run_async(_get_tags)


github_svc = GitHubService()