fixflow / backend /github_client.py
E5K7's picture
feat: Next.js frontend with live IDE, URL routing, retry logic, and step tracker
342230a
"""
GitHub client for fetching issues, repo trees, and file contents.
Supports both public repos (no auth) and private repos (with token).
"""
import re
import logging
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse
import requests
from github import Github, GithubException, Auth
from backend.config import (
GITHUB_TOKEN,
IGNORE_EXTENSIONS,
IGNORE_DIRS,
CODE_EXTENSIONS,
MAX_FILE_SIZE_BYTES,
MAX_REPO_FILES,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ── URL Parsing Helpers ───────────────────────────────────────────────────────
def parse_issue_url(issue_url: str) -> Tuple[str, str, int]:
    """
    Split a GitHub issue URL into its ``(owner, repo, issue_number)`` parts.

    Surrounding whitespace and trailing slashes are discarded before matching.
    Accepted shape:
        https://github.com/owner/repo/issues/123

    Raises:
        ValueError: if the URL does not contain ``github.com/<owner>/<repo>/issues/<n>``.
    """
    cleaned = issue_url.strip().rstrip("/")
    found = re.search(r"github\.com/([^/]+)/([^/]+)/issues/(\d+)", cleaned)
    if found is None:
        message = (
            f"Could not parse GitHub issue URL: {cleaned!r}\n"
            "Expected format: https://github.com/owner/repo/issues/123"
        )
        raise ValueError(message)
    # Groups are, in order: owner, repository name, issue number (digits only).
    return found.group(1), found.group(2), int(found.group(3))
def parse_repo_url(repo_url: str) -> Tuple[str, str]:
    """
    Parse a GitHub repo URL into (owner, repo).

    Supports:
        https://github.com/owner/repo
        https://github.com/owner/repo.git
        git@github.com:owner/repo.git
    Any of these may be followed by a sub-path, query string or fragment
    (e.g. ``/tree/main``, ``?tab=readme``, ``#readme``); those are ignored.

    Raises:
        ValueError: if no ``github.com/<owner>/<repo>`` pair can be found.
    """
    repo_url = repo_url.strip().rstrip("/")
    # Path segments stop at "/", whitespace, "?" or "#" so that query strings
    # and fragments never leak into the repository name. "[/:]" after the host
    # accepts both HTTPS URLs and scp-style SSH remotes.
    pattern = r"github\.com[/:]([^/\s?#]+)/([^/\s?#]+)"
    match = re.search(pattern, repo_url)
    if match:
        owner, repo = match.groups()
        # Strip ".git" from the captured name (not the whole URL) so it is
        # removed even when more path follows, e.g. ".../repo.git/tree/main".
        repo = repo.removesuffix(".git")
        if repo:
            return owner, repo
    raise ValueError(
        f"Could not parse GitHub repo URL: {repo_url!r}\n"
        "Expected format: https://github.com/owner/repo"
    )
# ── GitHub Client ─────────────────────────────────────────────────────────────
class GitHubClient:
    """Wraps PyGithub for FixFlow's use cases.

    Provides issue fetching/listing, repository tree and file retrieval,
    pull-request creation and rate-limit inspection. PyGithub failures on the
    main operations are re-raised as ``RuntimeError`` with a readable message.
    """

    def __init__(self, token: Optional[str] = None):
        """Create the underlying PyGithub client.

        Args:
            token: GitHub token to authenticate with. Falls back to the
                configured ``GITHUB_TOKEN``; when neither is set the client
                is unauthenticated.
        """
        tok = token or GITHUB_TOKEN
        if tok:
            auth = Auth.Token(tok)
            self._gh = Github(auth=auth)
        else:
            self._gh = Github()  # unauthenticated (60 req/hr)
        self._rate_limit_warned = False

    @staticmethod
    def _error_message(exc: Exception) -> str:
        """Return a readable message for a PyGithub exception.

        ``exc.data`` is only used when it is a dict carrying a non-empty
        ``message``; otherwise ``str(exc)`` is returned. Calling ``.get`` on it
        unconditionally would raise ``AttributeError`` for non-dict payloads
        and hide the original error.
        """
        data = getattr(exc, "data", None)
        if isinstance(data, dict):
            message = data.get("message")
            if message:
                return str(message)
        return str(exc)

    # ── Issue Fetching ────────────────────────────────────────────────────────

    def fetch_issue(self, issue_url: str) -> Dict:
        """
        Fetch a GitHub issue and return a structured dict:
            {title, body, labels, state, author, url, number, comments,
             repo_owner, repo_name}

        ``comments`` holds at most the first 10 comments, each as
        ``{author, body, created_at}``. Comment retrieval is best-effort: if it
        fails, the comments gathered so far are returned and a warning is logged.

        Raises:
            ValueError: if ``issue_url`` is not a recognisable issue URL.
            RuntimeError: if the repository or issue cannot be fetched.
        """
        owner, repo_name, issue_num = parse_issue_url(issue_url)
        logger.info("Fetching issue #%d from %s/%s", issue_num, owner, repo_name)

        try:
            repo = self._gh.get_repo(f"{owner}/{repo_name}")
            issue = repo.get_issue(number=issue_num)
        except GithubException as e:
            raise RuntimeError(
                f"Failed to fetch issue from GitHub: {self._error_message(e)}"
            ) from e

        # Collect top comments (up to 10)
        comments = []
        try:
            for comment in issue.get_comments():
                comments.append({
                    "author": comment.user.login if comment.user else "unknown",
                    "body": comment.body or "",
                    "created_at": str(comment.created_at),
                })
                if len(comments) >= 10:
                    break
        except GithubException as e:
            # Comments are optional context; keep the issue usable without them
            # but leave a trace instead of failing silently.
            logger.warning(
                "Could not fetch comments for issue #%d: %s", issue_num, e
            )

        return {
            "title": issue.title or "",
            "body": issue.body or "",
            "labels": [lbl.name for lbl in issue.labels],
            "state": issue.state,
            "author": issue.user.login if issue.user else "unknown",
            "url": issue.html_url,
            "number": issue_num,
            "comments": comments,
            "repo_owner": owner,
            "repo_name": repo_name,
        }

    def list_open_issues(self, repo_url: str, limit: int = 20) -> List[Dict]:
        """
        List open issues for a repository, most recently updated first.

        Returns a list of structured dicts:
            {title, number, url, author, created_at, body_snippet}
        ``body_snippet`` is the first 200 characters of the body, followed by
        "..." only when the body was actually truncated.

        Args:
            repo_url: GitHub repository URL.
            limit: Maximum number of issues to return; values <= 0 yield [].

        Raises:
            ValueError: if ``repo_url`` cannot be parsed.
            RuntimeError: if the GitHub API call fails.
        """
        owner, repo_name = parse_repo_url(repo_url)
        if limit <= 0:
            return []
        logger.info("Listing open issues for %s/%s", owner, repo_name)

        try:
            repo = self._gh.get_repo(f"{owner}/{repo_name}")
            issues = repo.get_issues(state='open', sort='updated', direction='desc')

            result = []
            for issue in issues:
                # Skip Pull Requests (PyGithub get_issues() returns both)
                if issue.pull_request:
                    continue

                body = issue.body or ""
                snippet = body[:200] + "..." if len(body) > 200 else body
                result.append({
                    "title": issue.title,
                    "number": issue.number,
                    "url": issue.html_url,
                    "author": issue.user.login if issue.user else "unknown",
                    "created_at": str(issue.created_at),
                    "body_snippet": snippet,
                })
                if len(result) >= limit:
                    break
            return result
        except GithubException as e:
            raise RuntimeError(
                f"Failed to list issues: {self._error_message(e)}"
            ) from e

    # ── Repo Tree ─────────────────────────────────────────────────────────────

    def fetch_repo_tree(
        self,
        repo_url: str,
        token: Optional[str] = None,
    ) -> List[Dict]:
        """
        Return a flat list of code files in the repo.
        Each entry: {path, size, type}

        Filters out non-blob entries, anything under a directory named in
        ``IGNORE_DIRS``, extensions in ``IGNORE_EXTENSIONS``, extensions not in
        ``CODE_EXTENSIONS`` (files without an extension are kept), and files
        larger than ``MAX_FILE_SIZE_BYTES``. Collection stops once
        ``MAX_REPO_FILES`` entries have been gathered.

        Args:
            repo_url: GitHub repository URL.
            token: Optional token. It is only used when no ``GITHUB_TOKEN`` is
                configured, and it replaces the client used by later calls too.

        Raises:
            ValueError: if ``repo_url`` cannot be parsed.
            RuntimeError: if the repository or its tree cannot be fetched.
        """
        owner, repo_name = parse_repo_url(repo_url)
        logger.info("Fetching repo tree for %s/%s", owner, repo_name)

        # Refresh client if a token was provided on this call
        if token and not GITHUB_TOKEN:
            auth = Auth.Token(token)
            self._gh = Github(auth=auth)

        try:
            repo = self._gh.get_repo(f"{owner}/{repo_name}")
            # Use recursive git tree for efficiency
            tree = repo.get_git_tree("HEAD", recursive=True)
        except GithubException as e:
            raise RuntimeError(
                f"Failed to fetch repo tree: {self._error_message(e)}"
            ) from e

        files = []
        for item in tree.tree:
            if item.type != "blob":
                continue

            path = item.path

            # Skip ignored directories
            parts = path.split("/")
            if any(p in IGNORE_DIRS for p in parts[:-1]):
                continue

            # Derive the extension from the file name only: a dot in a parent
            # directory (e.g. "my.dir/Makefile") must not count as an extension.
            filename = parts[-1]
            if "." in filename:
                ext = "." + filename.rsplit(".", 1)[-1].lower()
            else:
                ext = ""

            # Skip ignored/non-code extensions
            if ext in IGNORE_EXTENSIONS:
                continue
            if ext and ext not in CODE_EXTENSIONS:
                continue

            # Skip overly large files
            size = item.size or 0
            if size > MAX_FILE_SIZE_BYTES:
                logger.debug("Skipping large file (%d bytes): %s", size, path)
                continue

            files.append({"path": path, "size": size, "type": item.type})

            if len(files) >= MAX_REPO_FILES:
                logger.warning("Hit MAX_REPO_FILES limit (%d)", MAX_REPO_FILES)
                break

        logger.info("Found %d code files in %s/%s", len(files), owner, repo_name)
        return files

    # ── File Content ──────────────────────────────────────────────────────────

    def fetch_file_content(
        self,
        repo_url: str,
        file_path: str,
    ) -> str:
        """
        Fetch the text content of a single file from the repo.

        Bytes are decoded as UTF-8 with undecodable sequences replaced.

        Returns:
            The file text; a ``"[File too large to display: N bytes]"``
            placeholder when the file exceeds ``MAX_FILE_SIZE_BYTES``; or an
            empty string when the path is a directory or the fetch/decode fails.

        Raises:
            ValueError: if ``repo_url`` cannot be parsed.
        """
        owner, repo_name = parse_repo_url(repo_url)
        try:
            repo = self._gh.get_repo(f"{owner}/{repo_name}")
            content_obj = repo.get_contents(file_path)

            # A list means the path is a directory. Returning one of its
            # entries would present another file's content under this path.
            if isinstance(content_obj, list):
                logger.warning("Path is a directory, not a file: %s", file_path)
                return ""

            if content_obj.size > MAX_FILE_SIZE_BYTES:
                return f"[File too large to display: {content_obj.size} bytes]"

            decoded = content_obj.decoded_content
            return decoded.decode("utf-8", errors="replace")
        except GithubException as e:
            logger.warning("Could not fetch %s: %s", file_path, e)
            return ""
        except Exception as e:
            # Deliberately broad: this method is best-effort and must not abort
            # a multi-file fetch because one file could not be decoded.
            logger.warning("Error decoding %s: %s", file_path, e)
            return ""

    def fetch_multiple_files(
        self,
        repo_url: str,
        file_paths: List[str],
    ) -> Dict[str, str]:
        """
        Fetch contents of multiple files. Returns {path: content} dict.

        Files are fetched one at a time via ``fetch_file_content``; paths whose
        content comes back empty are omitted from the result.

        Raises:
            ValueError: if ``repo_url`` cannot be parsed.
        """
        result = {}
        owner, repo_name = parse_repo_url(repo_url)
        logger.info("Fetching %d files from %s/%s", len(file_paths), owner, repo_name)

        for path in file_paths:
            content = self.fetch_file_content(repo_url, path)
            if content:
                result[path] = content

        return result

    # ── Pull Request Creation ─────────────────────────────────────────────────

    def create_pull_request(
        self,
        repo_url: str,
        branch_name: str,
        files_content: Dict[str, str],
        title: str,
        body: str,
    ) -> str:
        """
        Creates a new branch and commits all changed files, then opens a pull request.
        Requires a GitHub token with write access to the repository.

        All files are committed together in a single commit whose message is
        ``title``. If ``branch_name`` already exists, the commit is added on top
        of that branch's current head instead of the base branch head.

        Args:
            repo_url: GitHub repository URL.
            branch_name: Name of the head branch to create or update.
            files_content: Mapping of repository-relative path to new file text.
            title: Pull request title (also used as the commit message).
            body: Pull request description.

        Returns:
            The HTML URL of the created PR.

        Raises:
            ValueError: if ``repo_url`` cannot be parsed.
            RuntimeError: if the client is not authenticated or any GitHub
                operation fails.
        """
        # get_user() returns a lazy object that is always truthy, so it cannot
        # be used as an auth check by itself. Reading an attribute forces the
        # request. Only a 401 is treated as "no valid token"; other failures
        # are left for the write calls below to report.
        try:
            _ = self._gh.get_user().login
        except GithubException as e:
            if e.status == 401:
                raise RuntimeError(
                    "A valid GitHub Token with write access is required to create a PR."
                ) from e
            logger.debug("Could not verify authenticated user: %s", e)

        owner, repo_name = parse_repo_url(repo_url)
        logger.info("Creating PR on %s/%s branch %s", owner, repo_name, branch_name)

        try:
            repo = self._gh.get_repo(f"{owner}/{repo_name}")
            from github import InputGitTreeElement

            base_branch = repo.default_branch
            base_ref = repo.get_git_ref(f"heads/{base_branch}")
            parent_sha = base_ref.object.sha

            # Create new branch off base branch
            try:
                repo.create_git_ref(ref=f"refs/heads/{branch_name}", sha=parent_sha)
            except GithubException as e:
                # 422 is how GitHub reports an already-existing reference;
                # anything else (permissions, bad name, ...) is a real failure.
                if e.status != 422:
                    raise
                logger.warning(
                    "Branch %s may already exist, proceeding to update it.",
                    branch_name,
                )
                # Build on the existing branch head so the ref update below is
                # a fast-forward even if the branch already has commits.
                parent_sha = repo.get_git_ref(f"heads/{branch_name}").object.sha

            base_tree = repo.get_git_tree(parent_sha)

            # Create a blob for each changed file
            elements = []
            for filepath, content in files_content.items():
                blob = repo.create_git_blob(content, "utf-8")
                elements.append(
                    InputGitTreeElement(path=filepath, mode='100644', type='blob', sha=blob.sha)
                )

            # Create new tree with all blob changes batched together
            new_tree = repo.create_git_tree(elements, base_tree)
            parent = repo.get_git_commit(parent_sha)
            commit = repo.create_git_commit(message=title, tree=new_tree, parents=[parent])

            # Update the branch reference to point to the new commit
            ref = repo.get_git_ref(f"heads/{branch_name}")
            ref.edit(commit.sha)

            # Create the actual PR
            pr = repo.create_pull(title=title, body=body, head=branch_name, base=base_branch)
            return pr.html_url

        except GithubException as e:
            raise RuntimeError(
                f"Failed to create PR. Ensure your GitHub token has write access to {owner}/{repo_name}. Detail: {self._error_message(e)}"
            ) from e

    # ── Rate Limit Info ───────────────────────────────────────────────────────

    def get_rate_limit_info(self) -> Dict:
        """Return current GitHub API rate limit information.

        Returns:
            ``{core_remaining, core_limit, reset_at}``, or an empty dict if the
            rate limit could not be read (best-effort; never raises).
        """
        try:
            rl = self._gh.get_rate_limit()
            # NOTE(review): newer PyGithub releases appear to nest the per-resource
            # limits under `.resources`; fall back to the object itself for
            # releases that expose `.core` directly. Confirm against the
            # pinned PyGithub version.
            core = getattr(rl, "resources", rl).core
            return {
                "core_remaining": core.remaining,
                "core_limit": core.limit,
                "reset_at": str(core.reset),
            }
        except Exception as e:
            logger.debug("Could not read GitHub rate limit: %s", e)
            return {}