"""
GitHub App Authentication and API Module for Codesage

Handles JWT generation, installation token exchange, and GitHub API calls.
"""

import os
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import jwt
import requests

# Shared REST API settings used by every class in this module.
GITHUB_API_URL = "https://api.github.com"
GITHUB_API_VERSION = "2022-11-28"

# `requests` never times out on its own; without an explicit timeout a
# stalled connection would block the caller forever.
DEFAULT_TIMEOUT_SECONDS = 30


def _build_headers(token: str) -> Dict[str, str]:
    """Return the standard GitHub REST headers for a bearer token."""
    return {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": GITHUB_API_VERSION,
    }


class GitHubAppAuth:
    """
    Enterprise-grade GitHub App authentication

    No PATs, no user passwords - just secure app-level access
    """

    def __init__(
        self,
        app_id: str,
        private_key: str,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
    ):
        """
        Initialize GitHub App authentication

        Args:
            app_id: Your GitHub App ID
            private_key: Contents of your .pem private key file
            timeout: Seconds to wait on each HTTP request before giving up
        """
        self.app_id = app_id
        self.private_key = private_key
        self.base_url = GITHUB_API_URL
        self.timeout = timeout

    def generate_jwt(self) -> str:
        """
        Generate JWT for GitHub App authentication

        JWT Rules:
        - Algorithm: RS256
        - Expiry: 10 minutes maximum (5 minutes are used here)
        - Issued at: 60 seconds in the past, to tolerate clock drift

        Returns:
            JWT token string
        """
        # Read the clock once so 'iat' and 'exp' are derived from the same instant.
        now = int(time.time())
        payload = {
            # Issued at time, backdated 60 seconds so a slightly fast local
            # clock does not produce a token that looks issued in the future.
            "iat": now - 60,
            # JWT expiration time (5 minutes, well under the 10 minute cap)
            "exp": now + 300,
            # GitHub App's client ID (App ID)
            "iss": self.app_id,
        }

        # Private key is already read as string, which PyJWT handles correctly
        token = jwt.encode(payload, self.private_key, algorithm="RS256")

        # PyJWT < 2.0 returns bytes; normalize so the value can be embedded
        # directly in an Authorization header.
        if isinstance(token, bytes):
            token = token.decode("utf-8")
        return token

    def get_installation_token(self, installation_id: int) -> Dict:
        """
        Exchange JWT for Installation Access Token

        This token:
        - Can access private repos
        - Is scoped to permissions you selected
        - Expires automatically (usually 1 hour)

        Args:
            installation_id: The installation ID from org install

        Returns:
            Dict with 'token', 'expires_at', and other metadata

        Raises:
            requests.HTTPError: If GitHub responds with an error status
            requests.Timeout: If the request exceeds the configured timeout
        """
        url = f"{self.base_url}/app/installations/{installation_id}/access_tokens"
        response = requests.post(
            url,
            headers=_build_headers(self.generate_jwt()),
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_installation_repositories(self, installation_token: str) -> List[Dict]:
        """
        List all repositories accessible to this installation

        Follows the pagination links returned by GitHub so installations with
        more repositories than fit on one page are returned in full.

        Args:
            installation_token: Active installation access token

        Returns:
            List of repository objects

        Raises:
            requests.HTTPError: If GitHub responds with an error status
            requests.Timeout: If a request exceeds the configured timeout
        """
        headers = _build_headers(installation_token)
        url: Optional[str] = f"{self.base_url}/installation/repositories"
        params: Optional[Dict] = {"per_page": 100}
        repositories: List[Dict] = []

        while url:
            response = requests.get(
                url, headers=headers, params=params, timeout=self.timeout
            )
            response.raise_for_status()
            repositories.extend(response.json().get("repositories", []))

            # The "next" link already carries the full query string, so the
            # initial params must not be sent again.
            url = response.links.get("next", {}).get("url")
            params = None

        return repositories


class GitHubInsights:
    """
    Fetch powerful insights from GitHub repositories

    All read-only, zero-risk operations
    """

    def __init__(
        self,
        installation_token: str,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
    ):
        """
        Initialize with an active installation token

        Args:
            installation_token: Valid installation access token
            timeout: Seconds to wait on each HTTP request before giving up
        """
        self.token = installation_token
        self.base_url = GITHUB_API_URL
        self.headers = _build_headers(installation_token)
        self.timeout = timeout

    def _get(self, path: str, params: Optional[Dict] = None) -> requests.Response:
        """Issue an authenticated GET for an API path relative to base_url."""
        return requests.get(
            f"{self.base_url}{path}",
            headers=self.headers,
            params=params,
            timeout=self.timeout,
        )

    @staticmethod
    def _json_unless_empty(response, default, no_content_statuses=(204,)):
        """
        Decode a response body, substituting `default` when there is no data.

        Error statuses are raised before the empty-body check so a failed
        request is never mistaken for "no data".
        """
        if response.status_code in no_content_statuses:
            return default
        response.raise_for_status()
        if not response.text:
            return default
        return response.json()

    def get_commits(self, org: str, repo: str, per_page: int = 30) -> List[Dict]:
        """
        Fetch recent commits from a repository

        Args:
            org: Organization name
            repo: Repository name
            per_page: Number of commits to fetch (default 30)

        Returns:
            List of commit objects (empty when GitHub answers 409)
        """
        response = self._get(f"/repos/{org}/{repo}/commits", {"per_page": per_page})
        if response.status_code == 409:
            # Empty repo or missing default branch
            return []
        response.raise_for_status()
        return response.json()

    def get_pull_requests(self, org: str, repo: str, state: str = "all") -> List[Dict]:
        """
        Fetch pull requests from a repository

        Only the first page (up to 100 entries) is requested.

        Args:
            org: Organization name
            repo: Repository name
            state: "open", "closed", or "all" (default "all")

        Returns:
            List of pull request objects
        """
        response = self._get(
            f"/repos/{org}/{repo}/pulls", {"state": state, "per_page": 100}
        )
        response.raise_for_status()
        return response.json()

    def get_issues(self, org: str, repo: str, state: str = "all") -> List[Dict]:
        """
        Fetch issues from a repository

        Only the first page (up to 100 entries) is requested.

        Args:
            org: Organization name
            repo: Repository name
            state: "open", "closed", or "all" (default "all")

        Returns:
            List of issue objects
        """
        response = self._get(
            f"/repos/{org}/{repo}/issues", {"state": state, "per_page": 100}
        )
        response.raise_for_status()
        return response.json()

    def get_contributors(self, org: str, repo: str) -> List[Dict]:
        """
        Fetch contributors and their stats

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of contributor objects with stats (empty on 204 or empty body)
        """
        response = self._get(f"/repos/{org}/{repo}/contributors", {"per_page": 100})
        return self._json_unless_empty(response, [])

    def get_code_frequency(self, org: str, repo: str) -> List[List[int]]:
        """
        Get code frequency stats (additions/deletions per week)

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of [timestamp, additions, deletions] arrays; empty when the
            response is 202, 204, or has no body
        """
        response = self._get(f"/repos/{org}/{repo}/stats/code_frequency")
        return self._json_unless_empty(response, [], no_content_statuses=(202, 204))

    def get_commit_activity(self, org: str, repo: str) -> List[Dict]:
        """
        Get commit activity by week for the last year

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of activity objects with days and total; empty when the
            response is 202, 204, or has no body
        """
        response = self._get(f"/repos/{org}/{repo}/stats/commit_activity")
        return self._json_unless_empty(response, [], no_content_statuses=(202, 204))

    def get_repository_info(self, org: str, repo: str) -> Dict:
        """
        Get detailed repository information

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            Repository object with metadata
        """
        response = self._get(f"/repos/{org}/{repo}")
        response.raise_for_status()
        return response.json()

    def get_languages(self, org: str, repo: str) -> Dict[str, int]:
        """
        Get programming languages used in the repository

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            Dict mapping language name to bytes of code (empty on 204 or
            empty body)
        """
        response = self._get(f"/repos/{org}/{repo}/languages")
        return self._json_unless_empty(response, {})

    def get_workflow_runs(self, org: str, repo: str) -> List[Dict]:
        """
        Get GitHub Actions workflow runs (CI/CD insights)

        Only the first page (up to 50 entries) is requested.

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of workflow run objects
        """
        response = self._get(f"/repos/{org}/{repo}/actions/runs", {"per_page": 50})
        response.raise_for_status()
        return response.json().get("workflow_runs", [])

    def get_dependabot_alerts(self, org: str, repo: str) -> List[Dict]:
        """
        Get Dependabot security alerts

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of Dependabot alert objects (empty when GitHub answers 404)
        """
        response = self._get(f"/repos/{org}/{repo}/dependabot/alerts")
        # Not all repos have this enabled
        if response.status_code == 404:
            return []
        response.raise_for_status()
        return response.json()


class TokenManager:
    """
    Manages installation tokens and automatic refresh

    Tokens expire after ~1 hour, this handles recreation
    """

    # A cached token is replaced once it is within this window of expiring.
    REFRESH_MARGIN = timedelta(minutes=5)

    def __init__(self, github_auth: GitHubAppAuth):
        """
        Args:
            github_auth: Authenticator used to mint new installation tokens
        """
        self.github_auth = github_auth
        # Maps installation ID to the token payload returned by GitHub.
        self.token_cache: Dict[int, Dict] = {}

    def get_token(self, installation_id: int) -> str:
        """
        Get valid installation token, refreshing if needed

        Args:
            installation_id: The installation ID

        Returns:
            Valid installation access token
        """
        cached = self.token_cache.get(installation_id)

        # Check if we have a cached token that's still valid
        if cached:
            # Rewrite a trailing "Z" as an explicit UTC offset before parsing.
            expires_at = datetime.fromisoformat(
                cached["expires_at"].replace("Z", "+00:00")
            )
            # Compare in the timestamp's own timezone.
            if datetime.now(expires_at.tzinfo) < expires_at - self.REFRESH_MARGIN:
                return cached["token"]

        # Get fresh token
        token_data = self.github_auth.get_installation_token(installation_id)
        self.token_cache[installation_id] = token_data
        return token_data["token"]