# github_app / github_api.py
# Samyak000's picture
# Update github_api.py
# 11f61f3 verified
"""
GitHub App Authentication and API Module for Codesage
Handles JWT generation, installation token exchange, and GitHub API calls
"""
import jwt
import time
import requests
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import os
class GitHubAppAuth:
    """
    Enterprise-grade GitHub App authentication
    No PATs, no user passwords - just secure app-level access

    Signs short-lived JWTs with the app's private key, exchanges them for
    installation access tokens, and lists the repositories an installation
    can reach.
    """

    # Seconds to wait on GitHub before giving up. Without an explicit
    # timeout, ``requests`` blocks indefinitely on a stalled connection.
    DEFAULT_TIMEOUT = 30
    # Page size requested from paginated list endpoints.
    MAX_PER_PAGE = 100
    # ``iat`` is backdated by this much so a slightly fast local clock does
    # not produce a token that GitHub considers "issued in the future".
    CLOCK_DRIFT_SECONDS = 60
    # JWT lifetime; GitHub allows at most 10 minutes, 5 leaves headroom.
    JWT_LIFETIME_SECONDS = 300

    def __init__(self, app_id: str, private_key: str, timeout: float = DEFAULT_TIMEOUT):
        """
        Initialize GitHub App authentication

        Args:
            app_id: Your GitHub App ID
            private_key: Contents of your .pem private key file
            timeout: Seconds to wait for each HTTP request (default 30)
        """
        self.app_id = app_id
        self.private_key = private_key
        self.base_url = "https://api.github.com"
        self.timeout = timeout

    def _build_jwt_payload(self, now: Optional[int] = None) -> Dict:
        """
        Build the JWT claims for this app

        Args:
            now: Unix timestamp to treat as "now"; defaults to the current time

        Returns:
            Dict with 'iat', 'exp' and 'iss' claims
        """
        # Sample the clock once so 'iat' and 'exp' are derived from the
        # same instant.
        current = int(time.time()) if now is None else int(now)
        return {
            # Issued at time, backdated to tolerate clock drift
            'iat': current - self.CLOCK_DRIFT_SECONDS,
            # JWT expiration time (5 minutes to avoid clock drift issues)
            'exp': current + self.JWT_LIFETIME_SECONDS,
            # GitHub App's client ID (App ID)
            'iss': self.app_id,
        }

    @staticmethod
    def _headers(token: str) -> Dict[str, str]:
        """Return the standard GitHub REST headers carrying *token* as a bearer credential."""
        return {
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }

    def generate_jwt(self) -> str:
        """
        Generate JWT for GitHub App authentication

        JWT Rules:
        - Algorithm: RS256
        - Expiry: 5 minutes from now (GitHub allows 10 minutes maximum)
        - Issued at: 60 seconds in the past, to tolerate clock drift

        Returns:
            JWT token string
        """
        # Private key is already read as string, which PyJWT handles correctly
        jwt_token = jwt.encode(self._build_jwt_payload(), self.private_key, algorithm="RS256")
        # PyJWT 1.x returns bytes; normalise so the token can be interpolated
        # into an Authorization header without a b'...' wrapper.
        if isinstance(jwt_token, bytes):
            jwt_token = jwt_token.decode("utf-8")
        return jwt_token

    def get_installation_token(self, installation_id: int) -> Dict:
        """
        Exchange JWT for Installation Access Token

        This token:
        - Can access private repos
        - Is scoped to permissions you selected
        - Expires automatically (usually 1 hour)

        Args:
            installation_id: The installation ID from org install

        Returns:
            Dict with 'token', 'expires_at', and other metadata

        Raises:
            requests.HTTPError: If GitHub answers with an error status
            requests.Timeout: If GitHub does not answer within the timeout
        """
        url = f"{self.base_url}/app/installations/{installation_id}/access_tokens"
        response = requests.post(
            url,
            headers=self._headers(self.generate_jwt()),
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_installation_repositories(self, installation_token: str) -> List[Dict]:
        """
        List all repositories accessible to this installation

        Walks every result page instead of returning only the first one.

        Args:
            installation_token: Active installation access token

        Returns:
            List of repository objects

        Raises:
            requests.HTTPError: If GitHub answers with an error status
            requests.Timeout: If GitHub does not answer within the timeout
        """
        url = f"{self.base_url}/installation/repositories"
        headers = self._headers(installation_token)
        repositories: List[Dict] = []
        page = 1
        while True:
            response = requests.get(
                url,
                headers=headers,
                params={"per_page": self.MAX_PER_PAGE, "page": page},
                timeout=self.timeout,
            )
            response.raise_for_status()
            batch = response.json().get("repositories", [])
            repositories.extend(batch)
            # A short (or empty) page means there is nothing left to fetch.
            if len(batch) < self.MAX_PER_PAGE:
                return repositories
            page += 1
class GitHubInsights:
    """
    Fetch powerful insights from GitHub repositories
    All read-only, zero-risk operations

    Every method issues a single GET against the GitHub REST API using the
    installation token supplied at construction time.
    """

    # Seconds to wait on GitHub before giving up. Without an explicit
    # timeout, ``requests`` blocks indefinitely on a stalled connection.
    DEFAULT_TIMEOUT = 30

    def __init__(self, installation_token: str, timeout: float = DEFAULT_TIMEOUT):
        """
        Initialize with an active installation token

        Args:
            installation_token: Valid installation access token
            timeout: Seconds to wait for each HTTP request (default 30)
        """
        self.token = installation_token
        self.base_url = "https://api.github.com"
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {installation_token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28"
        }

    def _repo_url(self, org: str, repo: str, suffix: str = "") -> str:
        """Return the API URL for *org*/*repo*, optionally extended by *suffix*."""
        url = f"{self.base_url}/repos/{org}/{repo}"
        return f"{url}/{suffix}" if suffix else url

    def _get(self, url: str, params: Optional[Dict] = None):
        """Issue an authenticated GET with the configured timeout and return the response."""
        return requests.get(url, headers=self.headers, params=params, timeout=self.timeout)

    @staticmethod
    def _has_no_content(response) -> bool:
        """Return True when the response is a 204 or carries an empty body."""
        return response.status_code == 204 or not response.text

    def get_commits(self, org: str, repo: str, per_page: int = 30) -> List[Dict]:
        """
        Fetch recent commits from a repository

        Args:
            org: Organization name
            repo: Repository name
            per_page: Number of commits to fetch (default 30)

        Returns:
            List of commit objects; empty when GitHub answers 409

        Raises:
            requests.HTTPError: On any other error status
        """
        response = self._get(self._repo_url(org, repo, "commits"), {"per_page": per_page})
        if response.status_code == 409:
            # Empty repo or missing default branch
            return []
        response.raise_for_status()
        return response.json()

    def get_pull_requests(self, org: str, repo: str, state: str = "all") -> List[Dict]:
        """
        Fetch pull requests from a repository (first page, up to 100)

        Args:
            org: Organization name
            repo: Repository name
            state: "open", "closed", or "all" (default "all")

        Returns:
            List of pull request objects

        Raises:
            requests.HTTPError: On an error status
        """
        response = self._get(
            self._repo_url(org, repo, "pulls"),
            {"state": state, "per_page": 100},
        )
        response.raise_for_status()
        return response.json()

    def get_issues(self, org: str, repo: str, state: str = "all") -> List[Dict]:
        """
        Fetch issues from a repository (first page, up to 100)

        NOTE(review): GitHub's issues endpoint is documented to include pull
        requests as well; the result is returned unfiltered - confirm whether
        callers expect that.

        Args:
            org: Organization name
            repo: Repository name
            state: "open", "closed", or "all" (default "all")

        Returns:
            List of issue objects

        Raises:
            requests.HTTPError: On an error status
        """
        response = self._get(
            self._repo_url(org, repo, "issues"),
            {"state": state, "per_page": 100},
        )
        response.raise_for_status()
        return response.json()

    def get_contributors(self, org: str, repo: str) -> List[Dict]:
        """
        Fetch contributors and their stats (first page, up to 100)

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of contributor objects; empty on 204 or an empty body

        Raises:
            requests.HTTPError: On an error status with a non-empty body
        """
        response = self._get(self._repo_url(org, repo, "contributors"), {"per_page": 100})
        if self._has_no_content(response):
            return []
        response.raise_for_status()
        return response.json()

    def get_code_frequency(self, org: str, repo: str) -> List[List[int]]:
        """
        Get code frequency stats (additions/deletions per week)

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of [timestamp, additions, deletions] arrays; empty on
            202, 204 or an empty body

        Raises:
            requests.HTTPError: On an error status with a non-empty body
        """
        response = self._get(self._repo_url(org, repo, "stats/code_frequency"))
        # 202 is treated as "no data available yet" rather than an error.
        if response.status_code == 202 or self._has_no_content(response):
            return []
        response.raise_for_status()
        return response.json()

    def get_commit_activity(self, org: str, repo: str) -> List[Dict]:
        """
        Get commit activity by week for the last year

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of activity objects with days and total; empty on
            202, 204 or an empty body

        Raises:
            requests.HTTPError: On an error status with a non-empty body
        """
        response = self._get(self._repo_url(org, repo, "stats/commit_activity"))
        # 202 is treated as "no data available yet" rather than an error.
        if response.status_code == 202 or self._has_no_content(response):
            return []
        response.raise_for_status()
        return response.json()

    def get_repository_info(self, org: str, repo: str) -> Dict:
        """
        Get detailed repository information

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            Repository object with metadata

        Raises:
            requests.HTTPError: On an error status
        """
        response = self._get(self._repo_url(org, repo))
        response.raise_for_status()
        return response.json()

    def get_languages(self, org: str, repo: str) -> Dict[str, int]:
        """
        Get programming languages used in the repository

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            Dict mapping language name to bytes of code; empty on 204 or
            an empty body

        Raises:
            requests.HTTPError: On an error status with a non-empty body
        """
        response = self._get(self._repo_url(org, repo, "languages"))
        if self._has_no_content(response):
            return {}
        response.raise_for_status()
        return response.json()

    def get_workflow_runs(self, org: str, repo: str) -> List[Dict]:
        """
        Get GitHub Actions workflow runs (CI/CD insights), up to 50

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of workflow run objects

        Raises:
            requests.HTTPError: On an error status
        """
        response = self._get(self._repo_url(org, repo, "actions/runs"), {"per_page": 50})
        response.raise_for_status()
        return response.json().get("workflow_runs", [])

    def get_dependabot_alerts(self, org: str, repo: str) -> List[Dict]:
        """
        Get Dependabot security alerts

        Args:
            org: Organization name
            repo: Repository name

        Returns:
            List of Dependabot alert objects; empty when GitHub answers 404

        Raises:
            requests.HTTPError: On any other error status
        """
        response = self._get(self._repo_url(org, repo, "dependabot/alerts"))
        # Not all repos have this enabled
        if response.status_code == 404:
            return []
        response.raise_for_status()
        return response.json()
class TokenManager:
    """
    Caches installation tokens and renews them before they lapse
    Tokens expire after ~1 hour, so a cached one is reused only while it
    still has more than 5 minutes left
    """

    def __init__(self, github_auth: GitHubAppAuth):
        """
        Create an empty token cache backed by the given authenticator

        Args:
            github_auth: Authenticator used to request fresh installation tokens
        """
        self.github_auth = github_auth
        # installation_id -> token payload as returned by the authenticator
        self.token_cache: Dict[int, Dict] = {}

    def get_token(self, installation_id: int) -> str:
        """
        Return a usable installation token, fetching a new one when required

        Args:
            installation_id: The installation ID

        Returns:
            Valid installation access token
        """
        entry = self.token_cache.get(installation_id)
        if entry:
            # fromisoformat does not accept a trailing "Z" on older Pythons,
            # so spell the UTC offset out explicitly.
            iso_expiry = entry["expires_at"].replace("Z", "+00:00")
            expiry = datetime.fromisoformat(iso_expiry)
            # Compare in the expiry's own timezone to keep both sides aware.
            current = datetime.now(expiry.tzinfo)
            refresh_deadline = expiry - timedelta(minutes=5)
            if current < refresh_deadline:
                return entry["token"]

        # Nothing cached, or the cached token is inside the refresh window.
        fresh = self.github_auth.get_installation_token(installation_id)
        self.token_cache[installation_id] = fresh
        return fresh["token"]