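"""GitHub repository analyzer.

Fetches repository metadata, file structure, README text and sample source
files via the GitHub REST API. The collected data is intended as input for
LLM-scored recruiter scorecards; the scoring itself is not performed in this
module.
"""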
|
|
import asyncio
import base64
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from urllib.parse import quote

import httpx

from app.config import get_settings
from app.logging_config import get_logger
from app.security import validate_github_url, sanitize_string
|
|
# Module-wide logger, obtained from the project's logging factory.
logger = get_logger("services.github_analyzer")
|
|
| |
# Base URL for every GitHub REST call made by this module.
GITHUB_API = "https://api.github.com"

# File suffixes (lower-case, leading dot) counted as source code.
CODE_EXTENSIONS = {
    # General-purpose languages
    ".py", ".java", ".go", ".rs", ".rb", ".php",
    ".c", ".h", ".cpp", ".hpp", ".cs",
    ".swift", ".kt", ".scala",
    # JavaScript / TypeScript family
    ".js", ".jsx", ".ts", ".tsx", ".vue", ".svelte",
    # Markup and stylesheets
    ".html", ".css", ".scss", ".sass", ".less",
}

# Build, packaging, tooling and CI markers; matched as substrings of a path.
CONFIG_FILES = {
    # Dependency / package manifests
    "package.json",
    "requirements.txt",
    "pyproject.toml",
    "Cargo.toml",
    "go.mod",
    "pom.xml",
    "build.gradle",
    "Gemfile",
    "composer.json",
    # Containers and CI
    "Dockerfile",
    "docker-compose.yml",
    "docker-compose.yaml",
    ".github/workflows",
    # Build systems
    "Makefile",
    "CMakeLists.txt",
    # Tooling configuration (some entries are filename prefixes)
    "tsconfig.json",
    "vite.config",
    "webpack.config",
    "pytest.ini",
    "setup.py",
    "setup.cfg",
    "jest.config",
}

# Files whose presence is recorded as a project-hygiene signal.
QUALITY_INDICATORS = {
    "README.md",
    "README",
    "LICENSE",
    "CONTRIBUTING.md",
    "CHANGELOG.md",
    "SECURITY.md",
    "CODE_OF_CONDUCT.md",
    ".gitignore",
    ".editorconfig",
    ".prettierrc",
    ".eslintrc",
}

# Fragments used to recognise test files and test directories.
TEST_PATTERNS = {
    # File-name fragments
    "test_",
    "_test.py",
    ".test.js",
    ".test.ts",
    ".spec.js",
    ".spec.ts",
    # Directory fragments
    "tests/",
    "__tests__/",
    "spec/",
    "test/",
}
|
|
|
|
@dataclass
class RepoMetadata:
    """Descriptive facts about one repository, as reported by the GitHub API.

    The first five fields are required and positional; every later field has
    a default, so a sparsely populated record can still be built.
    """

    # Identity
    owner: str
    name: str
    full_name: str  # "owner/name"
    description: Optional[str]
    primary_language: Optional[str]

    # Mapping returned by the repository "languages" endpoint.
    languages: dict[str, int] = field(default_factory=dict)

    # Popularity / activity counters
    stars: int = 0
    forks: int = 0
    open_issues: int = 0

    # Timestamps parsed from the API's ISO-8601 strings, when present.
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    default_branch: str = "main"
    topics: list[str] = field(default_factory=list)

    # Licensing and documentation flags
    license_name: Optional[str] = None
    has_readme: bool = False
    has_license: bool = False
|
|
|
|
@dataclass
class FileAnalysis:
    """Summary statistics derived from a repository's file tree.

    Every field has a default, so ``FileAnalysis()`` represents an empty
    (or unavailable) tree.
    """

    # Entry counts
    total_files: int = 0
    total_dirs: int = 0
    code_files: int = 0
    test_files: int = 0

    # Recognised marker names and the list of file paths that were seen
    config_files: list[str] = field(default_factory=list)
    quality_files: list[str] = field(default_factory=list)
    file_tree: list[str] = field(default_factory=list)

    # Capability flags
    has_ci: bool = False
    has_docker: bool = False
    has_tests: bool = False

    # Rough line count accumulated from code-file sizes
    estimated_loc: int = 0
|
|
|
|
@dataclass
class RepoAnalysisResult:
    """Everything gathered for one repository in a single analysis run."""

    # Required parts of the result
    metadata: RepoMetadata
    file_analysis: FileAnalysis

    # README text, or None when no README could be fetched
    readme_content: Optional[str] = None

    # Sampled source files, keyed by repository-relative path
    sample_code_files: dict[str, str] = field(default_factory=dict)
|
|
|
|
def _parse_github_url(url: str) -> tuple[str, str]:
    """Extract ``(owner, repo)`` from a GitHub repository URL.

    The URL is first passed through ``validate_github_url``; the value it
    returns is what gets matched. A trailing ``.git`` (as found in clone
    URLs) is removed from the repository name so the result can be used
    directly in API paths.

    Args:
        url: Repository URL of the form ``https://github.com/owner/repo``,
            optionally followed by a single trailing slash.

    Returns:
        A ``(owner, repo)`` tuple.

    Raises:
        ValueError: If the validated URL does not consist of exactly an
            owner and a repository segment, or the repository name is empty
            once ``.git`` is removed. ``validate_github_url`` may raise on
            its own for inputs it rejects.
    """
    url = validate_github_url(url)
    match = re.match(r"https://github\.com/([^/]+)/([^/]+)/?$", url)
    if not match:
        raise ValueError(f"Invalid GitHub URL format: {url}")

    owner, repo = match.groups()
    # Clone URLs end in ".git"; the REST API expects the bare repository name.
    repo = repo.removesuffix(".git")
    if not repo:
        raise ValueError(f"Invalid GitHub URL format: {url}")
    return owner, repo
|
|
|
|
class GitHubAnalyzer:
    """
    Analyzes GitHub repositories using the GitHub REST API.

    Does NOT clone repositories — all data is fetched over HTTP through a
    lazily created ``httpx.AsyncClient``. Call :meth:`close` when finished,
    or use the instance as an async context manager::

        async with GitHubAnalyzer(token) as analyzer:
            result = await analyzer.analyze_repository(url)
    """

    # README names probed at the repository root, in priority order.
    _README_CANDIDATES = ("README.md", "README.rst", "README.txt", "README")
    # Suffixes of files eligible for content sampling.
    _SAMPLE_EXTENSIONS = (".py", ".ts", ".js", ".go", ".rs")
    # Lower-case path fragments that exclude a file from sampling.
    _SAMPLE_SKIP_MARKERS = ("test", "spec", "mock", "__pycache__")
    # Upper bound on the number of sampled source files.
    _MAX_SAMPLE_FILES = 3

    def __init__(self, github_token: Optional[str] = None):
        """
        Initialize the analyzer.

        Args:
            github_token: Optional GitHub personal access token for higher rate limits.
                If not provided, uses unauthenticated API (60 req/hour).
        """
        self.token = github_token
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self) -> "GitHubAnalyzer":
        """Enter the async context; the HTTP client is still created lazily."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving the async context."""
        await self.close()

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if missing or closed."""
        if self._client is None or self._client.is_closed:
            headers = {
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
                "User-Agent": "Shortlist-Portfolio-Analyzer/1.0",
            }
            if self.token:
                headers["Authorization"] = f"Bearer {self.token}"

            self._client = httpx.AsyncClient(
                base_url=GITHUB_API,
                headers=headers,
                timeout=30.0,
                follow_redirects=True,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client if one is open. Safe to call repeatedly."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
            self._client = None

    @staticmethod
    def _looks_rate_limited(response: httpx.Response) -> bool:
        """Tell whether a 403 response carries GitHub's rate-limit markers."""
        headers = response.headers
        return headers.get("x-ratelimit-remaining") == "0" or "retry-after" in headers

    async def _api_get(self, path: str) -> Optional[dict]:
        """
        Make a GET request to the GitHub API and return the decoded JSON.

        Args:
            path: Request path relative to ``GITHUB_API``; must already be
                URL-safe.

        Returns:
            The decoded JSON body, or None on 404. Some endpoints (for
            example directory listings) return a JSON array rather than an
            object, so callers that need a dict should check the type.

        Raises:
            RuntimeError: On 429, or on 403 (rate limited or access denied).
            httpx.HTTPStatusError: On any other non-success status.
        """
        client = await self._get_client()
        try:
            response = await client.get(path)
            status = response.status_code
            if status == 404:
                return None
            if status in (403, 429):
                # GitHub signals rate limiting with either status; a 403
                # without the rate-limit headers is a plain denial.
                if status == 429 or self._looks_rate_limited(response):
                    logger.warning("GitHub API rate limit may be exceeded")
                    raise RuntimeError("GitHub API rate limit exceeded. Please try again later.")
                logger.warning(f"GitHub API access forbidden for {path}")
                raise RuntimeError("GitHub API denied access to this resource (HTTP 403).")
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"GitHub API error: {e.response.status_code} for {path}")
            raise

    @staticmethod
    def _parse_timestamp(value: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp with an optional ``Z`` suffix; None if empty."""
        if not value:
            return None
        # datetime.fromisoformat() on older Pythons rejects the "Z" suffix.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    async def get_repo_metadata(self, owner: str, repo: str) -> RepoMetadata:
        """
        Fetch repository metadata from the GitHub API.

        Issues two requests: the repository itself and its language breakdown.

        Raises:
            ValueError: If the repository endpoint returns 404 or an empty body.
        """
        data = await self._api_get(f"/repos/{owner}/{repo}")
        if not data:
            raise ValueError(f"Repository not found: {owner}/{repo}")

        languages = await self._api_get(f"/repos/{owner}/{repo}/languages") or {}

        # The API may send JSON null for optional fields, so use `or`
        # fallbacks rather than .get() defaults.
        license_info = data.get("license") or {}

        return RepoMetadata(
            owner=owner,
            name=repo,
            full_name=data.get("full_name", f"{owner}/{repo}"),
            description=sanitize_string(data.get("description") or "", max_length=500),
            primary_language=data.get("language"),
            languages=languages,
            stars=data.get("stargazers_count", 0),
            forks=data.get("forks_count", 0),
            open_issues=data.get("open_issues_count", 0),
            created_at=self._parse_timestamp(data.get("created_at")),
            updated_at=self._parse_timestamp(data.get("updated_at")),
            default_branch=data.get("default_branch") or "main",
            topics=data.get("topics") or [],
            license_name=license_info.get("name"),
            # NOTE(review): placeholder, not a verified fact. analyze_repository()
            # overwrites it after probing for a README; direct callers should
            # not rely on this value.
            has_readme=True,
            has_license=data.get("license") is not None,
        )

    @staticmethod
    def _is_test_path(path: str) -> bool:
        """
        Decide whether ``path`` looks like a test file, using TEST_PATTERNS.

        Patterns are applied on path-component boundaries instead of as raw
        substrings, so names such as ``latest_build.py`` or ``inspect/x.py``
        are not mistaken for tests:

        * patterns ending in ``/`` must equal one directory component;
        * patterns starting with ``.`` or ``_`` must end the file name;
        * any other pattern must start the file name.
        """
        *directories, filename = path.lower().split("/")
        for pattern in TEST_PATTERNS:
            if pattern.endswith("/"):
                if pattern[:-1] in directories:
                    return True
            elif pattern.startswith((".", "_")):
                if filename.endswith(pattern):
                    return True
            elif filename.startswith(pattern):
                return True
        return False

    async def get_file_tree(
        self,
        owner: str,
        repo: str,
        branch: str = "main",
        max_files: int = 500,
    ) -> FileAnalysis:
        """
        Fetch the repository file tree using the Git Trees API.

        Uses a recursive tree fetch (single API call).

        Args:
            owner: Repository owner.
            repo: Repository name.
            branch: Branch (or other tree-ish) whose tree is listed.
            max_files: Maximum number of tree entries inspected; directories
                count toward this limit as well as files.

        Returns:
            A populated FileAnalysis, or an empty one if the tree could not
            be fetched.
        """
        # Branch names may contain characters such as '#' that would
        # otherwise be parsed as URL syntax; '/' is left untouched.
        data = await self._api_get(
            f"/repos/{owner}/{repo}/git/trees/{quote(branch)}?recursive=1"
        )
        if not data or "tree" not in data:
            logger.warning(f"Could not fetch tree for {owner}/{repo}:{branch}")
            return FileAnalysis()

        # Loop invariants. Sorting makes the order of the reported marker
        # lists independent of set iteration order.
        config_markers = sorted(CONFIG_FILES)
        quality_by_lower = {qf.lower(): qf for qf in QUALITY_INDICATORS}

        analysis = FileAnalysis()
        for item in data["tree"][:max_files]:
            path = item.get("path", "")
            item_type = item.get("type", "")

            if item_type == "tree":
                analysis.total_dirs += 1
                continue
            if item_type != "blob":
                # Other entry types (e.g. submodule commits) are not counted.
                continue

            analysis.total_files += 1
            analysis.file_tree.append(path)

            filename = path.rsplit("/", 1)[-1]
            _, dot, suffix = filename.rpartition(".")
            ext = f".{suffix}".lower() if dot else ""

            if ext in CODE_EXTENSIONS:
                analysis.code_files += 1
                # Rough estimate: assume about 40 bytes per line of code.
                analysis.estimated_loc += (item.get("size") or 0) // 40

            if self._is_test_path(path):
                analysis.test_files += 1
                analysis.has_tests = True

            # Config markers are matched as substrings because some entries
            # are prefixes ("vite.config") or directories (".github/workflows").
            for cf in config_markers:
                if cf not in path:
                    continue
                if cf not in analysis.config_files:
                    analysis.config_files.append(cf)
                if "dockerfile" in cf.lower():
                    analysis.has_docker = True
                if ".github/workflows" in cf:
                    analysis.has_ci = True

            # Quality indicators match on the file name, case-insensitively.
            quality_name = quality_by_lower.get(filename.lower())
            if quality_name is not None and quality_name not in analysis.quality_files:
                analysis.quality_files.append(quality_name)

        return analysis

    async def get_file_content(
        self,
        owner: str,
        repo: str,
        path: str,
        max_size: int = 50_000,
    ) -> Optional[str]:
        """
        Fetch a single file's content from the repository.

        Args:
            owner: Repository owner.
            repo: Repository name.
            path: Repository-relative file path (unencoded).
            max_size: Files whose reported size exceeds this many bytes are skipped.

        Returns:
            The file text, or None if the path does not exist, is not a
            single file (e.g. a directory), is too large, or cannot be decoded.
        """
        # Percent-encode the path so characters such as '#', '?' or '%' in
        # file names are not interpreted as URL syntax; '/' is preserved.
        data = await self._api_get(f"/repos/{owner}/{repo}/contents/{quote(path)}")
        if not isinstance(data, dict):
            # None means 404; a list means the path is a directory.
            return None

        size = data.get("size") or 0
        if size > max_size:
            logger.info(f"File {path} too large ({size} bytes), skipping")
            return None

        content = data.get("content") or ""
        encoding = data.get("encoding", "")

        if encoding == "base64":
            try:
                return base64.b64decode(content).decode("utf-8", errors="replace")
            except (ValueError, TypeError) as e:
                # binascii.Error (malformed base64) is a ValueError subclass.
                logger.warning(f"Failed to decode {path}: {e}")
                return None

        return content

    async def analyze_repository(self, github_url: str) -> RepoAnalysisResult:
        """
        Perform complete repository analysis.

        Fetches metadata, then the file tree of the default branch, then the
        first available README, then up to three sample source files.

        Args:
            github_url: Full GitHub URL (https://github.com/owner/repo)

        Returns:
            RepoAnalysisResult with metadata, file analysis, and sample code.

        Raises:
            ValueError: If the URL is invalid or the repository is not found.
            RuntimeError: If the GitHub API rate-limits or denies a request.
        """
        owner, repo = _parse_github_url(github_url)
        logger.info(f"Starting analysis of {owner}/{repo}")

        metadata = await self.get_repo_metadata(owner, repo)

        file_analysis = await self.get_file_tree(
            owner, repo, metadata.default_branch
        )

        # Probe README names in priority order; the first non-empty one wins.
        readme_content = None
        for readme_name in self._README_CANDIDATES:
            content = await self.get_file_content(owner, repo, readme_name)
            if content:
                readme_content = sanitize_string(content, max_length=15_000)
                break

        metadata.has_readme = readme_content is not None

        # Sample a few nested, non-test source files.
        interesting_files = [
            f for f in file_analysis.file_tree
            if f.endswith(self._SAMPLE_EXTENSIONS)
            and not any(skip in f.lower() for skip in self._SAMPLE_SKIP_MARKERS)
            and "/" in f
        ][:self._MAX_SAMPLE_FILES]

        sample_files: dict[str, str] = {}
        for file_path in interesting_files:
            content = await self.get_file_content(owner, repo, file_path, max_size=20_000)
            if content:
                sample_files[file_path] = content

        logger.info(
            f"Analysis complete for {owner}/{repo}: "
            f"{file_analysis.total_files} files, "
            f"{file_analysis.code_files} code files, "
            f"{file_analysis.test_files} test files"
        )

        return RepoAnalysisResult(
            metadata=metadata,
            file_analysis=file_analysis,
            readme_content=readme_content,
            sample_code_files=sample_files,
        )
|
|
|
|
async def analyze_github_repo(github_url: str) -> RepoAnalysisResult:
    """
    One-shot helper: analyze a repository with a throwaway analyzer.

    Reads ``GITHUB_TOKEN`` from the application settings when that attribute
    exists (otherwise no token is used), runs the full analysis, and always
    closes the analyzer's HTTP client afterwards, even if the analysis raises.

    Args:
        github_url: Full GitHub repository URL.

    Returns:
        The RepoAnalysisResult produced by GitHubAnalyzer.analyze_repository.
    """
    # The token is optional; settings objects without the attribute yield None.
    analyzer = GitHubAnalyzer(
        github_token=getattr(get_settings(), "GITHUB_TOKEN", None)
    )
    try:
        result = await analyzer.analyze_repository(github_url)
    finally:
        await analyzer.close()
    return result
|
|