| import asyncio | |
| import base64 | |
| from datetime import datetime, timezone, timedelta | |
| import jwt | |
| import threading | |
| import time | |
| from typing import List | |
| import requests | |
| from config import APP_ID, APP_PRIVATE_KEY | |
# Cache of installation access tokens keyed by installation ID. Each value is
# a dict holding the "token" string and its "expires_at" datetime.
installation_tokens = dict()

# Held by get_installation_token while it reads and refreshes the cache above.
token_lock = threading.Lock()
def generate_jwt():
    """Generate a JWT signed with the GitHub App private key.

    Returns:
        The encoded JWT produced by ``jwt.encode`` with the RS256 algorithm.
    """
    now = int(time.time())
    payload = {
        # Backdated by one minute so a local clock running ahead of GitHub's
        # does not produce a token that appears to be issued in the future.
        "iat": now - 60,
        # GitHub rejects JWTs expiring more than 10 minutes ahead; staying at
        # 9 minutes leaves headroom for the same clock drift.
        "exp": now + (9 * 60),
        # Recent PyJWT releases require the issuer claim to be a string, while
        # the configured app ID may be an integer.
        "iss": str(APP_ID),
    }
    return jwt.encode(payload, APP_PRIVATE_KEY, algorithm="RS256")
def github_request(method, url, headers=None, **kwargs):
    """Send a GitHub API request, waiting out rate limits when necessary.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Full URL of the endpoint.
        headers: Optional request headers. When omitted, the request is
            authenticated as the GitHub App with a freshly generated JWT.
        **kwargs: Extra keyword arguments forwarded to ``requests.request``.
            A 30 second ``timeout`` is applied unless the caller supplies one.

    Returns:
        The ``requests.Response`` of the final attempt.
    """
    use_app_jwt = headers is None
    # Without a timeout, requests waits indefinitely on a stalled connection.
    kwargs.setdefault("timeout", 30)

    while True:
        if use_app_jwt:
            # Rebuilt on every attempt: a rate-limit sleep can outlast the
            # JWT's lifetime, so a token minted before the sleep may have
            # expired by the time the request is retried.
            headers = {
                "Authorization": f"Bearer {generate_jwt()}",
                "Accept": "application/vnd.github.v3+json",
            }

        response = requests.request(method, url, headers=headers, **kwargs)

        remaining = response.headers.get("X-RateLimit-Remaining")
        reset_time = response.headers.get("X-RateLimit-Reset")
        if remaining is None or reset_time is None:
            # No rate-limit information to act on.
            return response

        remaining = int(remaining)
        reset_time = int(reset_time)
        print(f"[GitHub] Remaining: {remaining}, Reset: {reset_time}")

        # The request itself was refused because of rate limiting (GitHub
        # answers with 403 or 429): wait for the window to reset, then retry.
        if response.status_code in (403, 429) and "rate limit" in response.text.lower():
            wait = reset_time - int(time.time()) + 5
            print(f"Hit rate limit. Sleeping for {wait} seconds.")
            time.sleep(max(wait, 0))
            continue

        if remaining <= 2:
            # This request was served, so throttle before handing the response
            # back instead of re-sending it: repeating it would duplicate side
            # effects (e.g. a POST) and throw away a valid response.
            wait = reset_time - int(time.time()) + 5
            print(f"Approaching rate limit ({remaining} left). Sleeping for {wait} seconds.")
            time.sleep(max(wait, 0))

        return response
def get_installation_id(owner, repo):
    """Look up the ID of the app's installation on ``owner/repo``.

    Returns:
        The ``id`` field of the installation payload.

    Raises:
        Exception: If the API answers with anything other than HTTP 200.
    """
    endpoint = f"https://api.github.com/repos/{owner}/{repo}/installation"
    resp = github_request("GET", endpoint)

    # Guard clause: anything but 200 means the installation is unavailable.
    if resp.status_code != 200:
        raise Exception(f"Failed to get installation ID for {owner}/{repo}: {resp.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.")

    return resp.json()["id"]
def get_installation_token(installation_id):
    """Return a usable access token for the installation, refreshing if needed.

    A cached token is reused as long as it stays valid for more than 30
    seconds; otherwise a new one is requested and stored in the cache.

    Raises:
        Exception: If the token endpoint does not answer with HTTP 201.
    """
    # NOTE(review): the lock is assumed to cover the whole body (cache check,
    # fetch and store) — confirm against the original indentation.
    with token_lock:
        cached = installation_tokens.get(installation_id)
        if cached:
            cutoff = datetime.now(timezone.utc) + timedelta(seconds=30)
            if cached["expires_at"] > cutoff:
                return cached["token"]

        resp = github_request(
            "POST",
            f"https://api.github.com/app/installations/{installation_id}/access_tokens",
        )
        if resp.status_code != 201:
            raise Exception(f"Failed to fetch installation token: {resp.status_code}. Please [install the GitHub App](https://github.com/apps/opensorus) on this repository before using the agent.")

        payload = resp.json()
        fresh_token = payload["token"]
        # The timestamp is parsed as naive and then marked as UTC so it can be
        # compared with the timezone-aware "now" above.
        parsed = datetime.strptime(payload["expires_at"], "%Y-%m-%dT%H:%M:%SZ")
        installation_tokens[installation_id] = {
            "token": fresh_token,
            "expires_at": parsed.replace(tzinfo=timezone.utc),
        }
        return fresh_token
async def fetch_repo_files(owner: str, repo: str, ref: str = "main") -> List[str]:
    """List all file paths in a repository via the recursive Git tree API.

    Args:
        owner: Repository owner.
        repo: Repository name.
        ref: Branch, tag or tree reference to list. Defaults to ``"main"``.

    Returns:
        Paths of every tree entry whose type is ``"blob"``.

    Raises:
        Exception: If the installation lookup, token fetch or tree request fails.
    """
    # Both helpers perform blocking HTTP calls (and may sleep on rate limits),
    # so they run in worker threads to keep the event loop responsive.
    installation_id = await asyncio.to_thread(get_installation_id, owner, repo)
    token = await asyncio.to_thread(get_installation_token, installation_id)

    url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{ref}?recursive=1"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = await asyncio.to_thread(github_request, "GET", url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch repository files: {response.status_code}. Please ensure the branch name is correct and files exist in this branch.")

    tree = response.json().get("tree", [])
    # Only blobs are files; other entry types are skipped.
    return [item["path"] for item in tree if item["type"] == "blob"]
async def fetch_file_content(owner: str, repo: str, path: str, ref: str = "main") -> str:
    """Fetch the decoded text content of a file from the GitHub repository.

    Args:
        owner: Repository owner.
        repo: Repository name.
        path: Path of the file inside the repository.
        ref: Branch, tag or commit to read from. Defaults to ``"main"``.

    Returns:
        The file content decoded as UTF-8; undecodable bytes are dropped.

    Raises:
        Exception: If the installation lookup, token fetch or contents request
            fails, or if ``path`` does not resolve to a single file.
    """
    from urllib.parse import quote

    # Both helpers perform blocking HTTP calls, so they run in worker threads
    # rather than stalling the event loop.
    installation_id = await asyncio.to_thread(get_installation_id, owner, repo)
    token = await asyncio.to_thread(get_installation_token, installation_id)

    # Percent-encode the path (keeping "/" separators) so characters such as
    # "#" or "?" are not interpreted as URL syntax; the ref is sent as a query
    # parameter so requests encodes it as well.
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{quote(path)}"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = await asyncio.to_thread(
        github_request, "GET", url, headers=headers, params={"ref": ref}
    )
    if response.status_code != 200:
        raise Exception(f"Failed to fetch file content {path}: {response.status_code} {response.text}")

    content_json = response.json()
    # A directory path yields a JSON list of entries instead of a file object.
    if not isinstance(content_json, dict):
        raise Exception(f"Failed to fetch file content {path}: path does not refer to a file")

    return base64.b64decode(content_json["content"]).decode("utf-8", errors="ignore")