Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import re | |
| import logging | |
| import datetime | |
| import concurrent.futures | |
| import sys | |
| import base64 | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Dict, List, Union, Any, Optional, Tuple, Set | |
| from collections import Counter, defaultdict | |
| from dataclasses import dataclass, field, asdict | |
| from io import BytesIO, StringIO | |
| import urllib.request | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import networkx as nx | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| from tqdm.notebook import tqdm | |
| from dateutil.relativedelta import relativedelta | |
| from github import Github, GithubException, RateLimitExceededException | |
| import gradio as gr | |
| # For PDF Generation | |
| from reportlab.lib.pagesizes import letter, A4 | |
| from reportlab.lib import colors | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle, PageBreak | |
| from reportlab.lib.units import inch | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.enums import TA_CENTER, TA_LEFT | |
# Module-wide logging: INFO level, timestamped records streamed to stderr.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("github_analyzer")
@dataclass
class GitHubAPIConfig:
    """Configuration for the GitHub API client with sensible defaults.

    FIX: the original class used ``field(default_factory=...)`` and
    ``__post_init__`` but was missing the ``@dataclass`` decorator, so the
    factory fields were bare ``Field`` objects and ``__post_init__`` never ran.
    """

    # API access configuration
    token: Optional[str] = None  # personal access token; None = unauthenticated
    max_retries: int = 5
    backoff_factor: int = 2  # exponential backoff base for retries
    per_page: int = 100  # Max allowed by GitHub
    timeout: int = 30  # seconds per request

    # HTTP status codes that warrant a retry (rate limits + server errors)
    retry_status_codes: Set[int] = field(default_factory=lambda: {
        403, 429, 500, 502, 503, 504
    })

    # Permission types
    collaborator_permission_types: List[str] = field(default_factory=lambda: [
        "admin", "push", "pull", "maintain", "triage"
    ])

    # File classification by extension
    code_extensions: List[str] = field(default_factory=lambda: [
        ".py", ".js", ".java", ".c", ".cpp", ".cs", ".go", ".php", ".rb",
        ".swift", ".kt", ".ts", ".rs", ".scala", ".lua", ".m", ".mm",
        ".h", ".hpp", ".cc", ".hh", ".f", ".f90", ".f95", ".f03", ".f08",
        ".for", ".f77", ".jl", ".pl", ".pm", ".t", ".r", ".dart", ".groovy",
        ".v", ".vhd", ".vhdl", ".erl", ".hrl", ".hs", ".lhs", ".ex", ".exs", ".hx"
    ])
    markup_extensions: List[str] = field(default_factory=lambda: [
        ".md", ".html", ".htm", ".xml", ".json", ".yaml", ".yml", ".txt",
        ".rst", ".tex", ".adoc", ".csv", ".tsv", ".toml", ".ini", ".cfg"
    ])
    script_extensions: List[str] = field(default_factory=lambda: [
        ".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd"
    ])
    notebook_extensions: List[str] = field(default_factory=lambda: [
        ".ipynb"
    ])
    data_extensions: List[str] = field(default_factory=lambda: [
        ".csv", ".tsv", ".json", ".xml", ".xls", ".xlsx", ".hdf5",
        ".parquet", ".feather", ".pkl", ".sav", ".dta", ".arff"
    ])
    config_extensions: List[str] = field(default_factory=lambda: [
        ".yml", ".yaml", ".json", ".toml", ".ini", ".cfg", ".conf"
    ])
    other_extensions: List[str] = field(default_factory=lambda: [
        ".txt", ".log", ".svg", ".png", ".jpg", ".jpeg"
    ])

    # Data collection limits (set to None for no limit)
    max_contributors: Optional[int] = 50
    max_issues: Optional[int] = 100
    max_commits: Optional[int] = 200
    max_search_results: Optional[int] = 50
    max_pull_requests: Optional[int] = 100
    max_collaborators: Optional[int] = 30

    # Output configuration
    output_dir: str = "/tmp/github_data"
    generate_visualizations: bool = True

    def __post_init__(self):
        """Ensure output directory exists."""
        os.makedirs(self.output_dir, exist_ok=True)

    def all_code_extensions(self) -> List[str]:
        """Return all code-related file extensions (deduplicated, unordered)."""
        return list(set(
            self.code_extensions +
            self.script_extensions +
            self.config_extensions
        ))
class GithubClient:
    """
    A robust GitHub client that handles rate limiting, retries, and provides
    consistent error handling.
    """

    def __init__(self, config: GitHubAPIConfig):
        """Initialize the GitHub client with configuration."""
        self.config = config
        self.github = Github(
            config.token,
            per_page=config.per_page,
            timeout=config.timeout,
            retry=config.max_retries
        )
        self.cache = {}  # Simple in-memory cache (lives for the process only)

    def get_repo(self, repo_path: str):
        """Get a repository by owner/name ("owner/repo") with caching."""
        cache_key = f"repo:{repo_path}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        repo = self.github.get_repo(repo_path)
        self.cache[cache_key] = repo
        return repo

    def _handle_exception(self, e: GithubException, retry_count: int) -> bool:
        """
        Handle GitHub exceptions with proper retries and backoff strategy.

        Args:
            e: The exception to handle
            retry_count: Current retry count

        Returns:
            bool: True if retry should be attempted, False otherwise
        """
        if retry_count >= self.config.max_retries:
            logger.error(f"Max retries ({self.config.max_retries}) exceeded.")
            return False
        if isinstance(e, RateLimitExceededException):
            # Primary rate limit: sleep until the reported reset time.
            rate_limit = self.github.get_rate_limit()
            reset_time = rate_limit.core.reset.timestamp() if hasattr(rate_limit, 'core') else time.time() + 3600
            sleep_time = max(0, int(reset_time - time.time())) + 1
            logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds...")
            time.sleep(sleep_time)
            return True
        elif e.status in self.config.retry_status_codes:
            # Secondary rate limits / transient server errors: exponential backoff.
            sleep_time = self.config.backoff_factor ** retry_count
            logger.warning(
                f"Temporary error (status {e.status}). Retrying in {sleep_time} seconds. "
                f"Attempt {retry_count+1}/{self.config.max_retries}."
            )
            time.sleep(sleep_time)
            return True
        # Non-recoverable error
        logger.error(f"Non-recoverable GitHub API error: {e}")
        return False

    def _paginated_request(self, method, *args, **kwargs):
        """
        Execute a paginated GitHub API request with retry logic.

        Args:
            method: The PyGithub method to call (returns a PaginatedList)
            max_results: optional keyword-only cap on collected items

        Returns:
            List of results or None on non-recoverable error

        FIX: the previous version kept the partially-collected ``results``
        list across retries, but each retry re-invokes ``method`` and
        PyGithub restarts pagination from page 1 — so retried requests
        duplicated every item already collected. Results are now rebuilt
        from scratch on each attempt.
        """
        retry_count = 0
        max_results = kwargs.pop('max_results', None)
        while retry_count <= self.config.max_retries:
            results = []  # reset per attempt: pagination restarts at page 1
            try:
                for item in method(*args, **kwargs):
                    results.append(item)
                    if max_results and len(results) >= max_results:
                        break
                return results
            except GithubException as e:
                if self._handle_exception(e, retry_count):
                    retry_count += 1
                else:
                    return None
        return None

    def _execute_request(self, method, *args, **kwargs):
        """
        Execute a single GitHub API request with retry logic.

        Args:
            method: The PyGithub method to call

        Returns:
            Result of the API call or None on non-recoverable error
        """
        retry_count = 0
        while retry_count <= self.config.max_retries:
            try:
                return method(*args, **kwargs)
            except GithubException as e:
                # Special case for 404 errors - resource not found is not retryable.
                if e.status == 404:
                    logger.info(f"Resource not found: {e}")
                    return None
                if self._handle_exception(e, retry_count):
                    retry_count += 1
                else:
                    return None
        return None
| class GitHubRepoAnalyzer: | |
| """ | |
| Main class for analyzing GitHub repositories and generating insights. | |
| """ | |
| def __init__(self, config: GitHubAPIConfig): | |
| """Initialize the analyzer with configuration.""" | |
| self.config = config | |
| self.client = GithubClient(config) | |
    def get_repo_details(self, repo) -> Dict[str, Any]:
        """Get comprehensive repository metadata.

        Args:
            repo: A PyGithub Repository object.

        Returns:
            Flat dict of repository metadata. Datetime fields are ISO-8601
            strings (or None), so the result is JSON-serializable.
        """
        logger.info(f"Fetching repository details for {repo.full_name}")
        return {
            "name": repo.name,
            "full_name": repo.full_name,
            "description": repo.description,
            "html_url": repo.html_url,
            "stargazers_count": repo.stargazers_count,
            "watchers_count": repo.watchers_count,
            "forks_count": repo.forks_count,
            "open_issues_count": repo.open_issues_count,
            "language": repo.language,
            "default_branch": repo.default_branch,
            # Normalize datetimes to ISO strings so the dict can be serialized.
            "created_at": repo.created_at.isoformat() if repo.created_at else None,
            "updated_at": repo.updated_at.isoformat() if repo.updated_at else None,
            "pushed_at": repo.pushed_at.isoformat() if repo.pushed_at else None,
            "license": repo.license.name if repo.license else None,
            "topics": list(repo.get_topics()),  # NOTE: separate API call
            "archived": repo.archived,
            "disabled": repo.disabled,
            "visibility": repo.visibility,
            "has_wiki": repo.has_wiki,
            "has_pages": repo.has_pages,
            "has_projects": repo.has_projects,
            "has_issues": repo.has_issues,
            # hasattr guard: attribute may be absent on older PyGithub versions.
            "has_discussions": repo.has_discussions if hasattr(repo, 'has_discussions') else None,
            "size": repo.size,  # Size in KB
            "network_count": repo.network_count,
            "subscribers_count": repo.subscribers_count,
            "organization": repo.organization.login if repo.organization else None,
            "parent": repo.parent.full_name if hasattr(repo, 'parent') and repo.parent else None,
            "fork": repo.fork,
        }
| def get_contributors(self, repo) -> List[Dict[str, Any]]: | |
| """Get repository contributors with detailed information.""" | |
| logger.info(f"Fetching contributors for {repo.full_name}") | |
| contributors = self.client._paginated_request( | |
| repo.get_contributors, | |
| max_results=self.config.max_contributors | |
| ) | |
| if contributors is None: | |
| return [] | |
| return [ | |
| { | |
| "login": c.login, | |
| "id": c.id, | |
| "contributions": c.contributions, | |
| "type": c.type, | |
| "html_url": c.html_url, | |
| "followers": c.followers, | |
| "following": c.following, | |
| "public_repos": c.public_repos if hasattr(c, 'public_repos') else None, | |
| "bio": c.bio if hasattr(c, 'bio') else None, | |
| "location": c.location if hasattr(c, 'location') else None, | |
| "company": c.company if hasattr(c, 'company') else None, | |
| "email": c.email if hasattr(c, 'email') else None, | |
| "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None, | |
| } | |
| for c in contributors | |
| ] | |
| def get_languages(self, repo) -> Dict[str, int]: | |
| """Get languages used in the repository.""" | |
| logger.info(f"Fetching languages for {repo.full_name}") | |
| languages = self.client._execute_request(repo.get_languages) | |
| return languages or {} | |
    def get_issues(self, repo, state: str = "all") -> List[Dict[str, Any]]:
        """Get repository issues.

        Args:
            repo: A PyGithub Repository object.
            state: Issue state filter — "open", "closed", or "all".

        Returns:
            List of issue dicts (datetimes as ISO strings). GitHub's issues
            endpoint also returns pull requests; the "pull_request" flag
            distinguishes them.
        """
        logger.info(f"Fetching issues for {repo.full_name} with state={state}")
        issues = self.client._paginated_request(
            repo.get_issues,
            state=state,
            max_results=self.config.max_issues
        )
        if issues is None:
            return []
        return [
            {
                "id": issue.id,
                "number": issue.number,
                "title": issue.title,
                "body": issue.body,
                "state": issue.state,
                "user_login": issue.user.login if issue.user else None,
                "labels": [label.name for label in issue.labels],
                "comments": issue.comments,
                "created_at": issue.created_at.isoformat() if issue.created_at else None,
                "updated_at": issue.updated_at.isoformat() if issue.updated_at else None,
                "closed_at": issue.closed_at.isoformat() if issue.closed_at else None,
                # True when this "issue" is actually a pull request.
                "pull_request": issue.pull_request is not None,
                "milestone": issue.milestone.title if issue.milestone else None,
                "assignees": [user.login for user in issue.assignees] if issue.assignees else [],
            }
            for issue in issues
        ]
    def get_commits(self, repo) -> List[Dict[str, Any]]:
        """Get repository commits.

        Returns:
            List of commit dicts including author/committer identities,
            per-commit stats, and the list of files changed.

        NOTE(review): accessing ``commit.stats`` and ``commit.files`` on
        PyGithub commit objects presumably triggers an extra API request per
        commit — verify cost before raising max_commits.
        """
        logger.info(f"Fetching commits for {repo.full_name}")
        commits = self.client._paginated_request(
            repo.get_commits,
            max_results=self.config.max_commits
        )
        if commits is None:
            return []
        return [
            {
                "sha": commit.sha,
                "commit_message": commit.commit.message,
                # Top-level author/committer are GitHub accounts (may be None
                # for unmapped emails); commit.commit.* holds the git metadata.
                "author_login": commit.author.login if commit.author else None,
                "author_name": commit.commit.author.name if commit.commit and commit.commit.author else None,
                "author_email": commit.commit.author.email if commit.commit and commit.commit.author else None,
                "committer_login": commit.committer.login if commit.committer else None,
                "committer_name": commit.commit.committer.name if commit.commit and commit.commit.committer else None,
                "date": commit.commit.author.date.isoformat() if commit.commit and commit.commit.author else None,
                "html_url": commit.html_url,
                "stats": {
                    "additions": commit.stats.additions if hasattr(commit, 'stats') else None,
                    "deletions": commit.stats.deletions if hasattr(commit, 'stats') else None,
                    "total": commit.stats.total if hasattr(commit, 'stats') else None,
                },
                "files_changed": [
                    {"filename": f.filename, "additions": f.additions, "deletions": f.deletions, "status": f.status}
                    for f in commit.files
                ] if hasattr(commit, 'files') else [],
            }
            for commit in commits
        ]
| def get_readme(self, repo) -> str: | |
| """Get repository README content.""" | |
| logger.info(f"Fetching README for {repo.full_name}") | |
| readme = self.client._execute_request(repo.get_readme) | |
| if readme is None: | |
| return "" | |
| try: | |
| return readme.decoded_content.decode('utf-8') | |
| except UnicodeDecodeError: | |
| logger.warning(f"Could not decode README content for {repo.full_name}") | |
| return "" | |
    def get_pull_requests(self, repo, state: str = "all") -> List[Dict[str, Any]]:
        """Get repository pull requests.

        Args:
            repo: A PyGithub Repository object.
            state: PR state filter — "open", "closed", or "all".

        Returns:
            List of PR dicts (datetimes as ISO strings). hasattr guards cover
            attributes that vary across PyGithub versions / list-vs-detail
            responses; missing counters default to 0, other fields to None.
        """
        logger.info(f"Fetching pull requests for {repo.full_name} with state={state}")
        pulls = self.client._paginated_request(
            repo.get_pulls,
            state=state,
            max_results=self.config.max_pull_requests
        )
        if pulls is None:
            return []
        return [
            {
                "id": pull.id,
                "number": pull.number,
                "title": pull.title,
                "body": pull.body,
                "state": pull.state,
                "user_login": pull.user.login if pull.user else None,
                "created_at": pull.created_at.isoformat() if pull.created_at else None,
                "updated_at": pull.updated_at.isoformat() if pull.updated_at else None,
                "closed_at": pull.closed_at.isoformat() if pull.closed_at else None,
                "merged_at": pull.merged_at.isoformat() if pull.merged_at else None,
                "draft": pull.draft if hasattr(pull, 'draft') else None,
                "mergeable": pull.mergeable if hasattr(pull, 'mergeable') else None,
                "mergeable_state": pull.mergeable_state if hasattr(pull, 'mergeable_state') else None,
                "merged": pull.merged if hasattr(pull, 'merged') else None,
                "merge_commit_sha": pull.merge_commit_sha if hasattr(pull, 'merge_commit_sha') else None,
                "comments": pull.comments if hasattr(pull, 'comments') else 0,
                "review_comments": pull.review_comments if hasattr(pull, 'review_comments') else 0,
                "commits": pull.commits if hasattr(pull, 'commits') else 0,
                "additions": pull.additions if hasattr(pull, 'additions') else 0,
                "deletions": pull.deletions if hasattr(pull, 'deletions') else 0,
                "changed_files": pull.changed_files if hasattr(pull, 'changed_files') else 0,
                "head_ref": pull.head.ref if hasattr(pull, 'head') and pull.head else None,
                "base_ref": pull.base.ref if hasattr(pull, 'base') and pull.base else None,
                "labels": [label.name for label in pull.labels] if hasattr(pull, 'labels') else [],
                "assignees": [user.login for user in pull.assignees] if hasattr(pull, 'assignees') else [],
                "requested_reviewers": [user.login for user in pull.requested_reviewers] if hasattr(pull, 'requested_reviewers') else [],
            }
            for pull in pulls
        ]
| def get_collaborators(self, repo, affiliation: str = "all") -> List[Dict[str, Any]]: | |
| """Get repository collaborators.""" | |
| logger.info(f"Fetching collaborators for {repo.full_name} with affiliation={affiliation}") | |
| collaborators = self.client._paginated_request( | |
| repo.get_collaborators, | |
| affiliation=affiliation, | |
| max_results=self.config.max_collaborators | |
| ) | |
| if collaborators is None: | |
| return [] | |
| return [ | |
| { | |
| "login": c.login, | |
| "id": c.id, | |
| "type": c.type, | |
| "url": c.url, | |
| "site_admin": c.site_admin if hasattr(c, 'site_admin') else None, | |
| "role_name": self._get_permission_level(repo, c.login), | |
| "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None, | |
| } | |
| for c in collaborators | |
| ] | |
| def _get_permission_level(self, repo, username: str) -> str: | |
| """Get permission level for a collaborator.""" | |
| try: | |
| return repo.get_collaborator_permission(username) | |
| except GithubException: | |
| return "unknown" | |
    def get_file_distribution(self, repo) -> Dict[str, int]:
        """Analyze file types distribution in the repository.

        Walks the contents tree breadth-first (root + up to ``max_depth``
        nested levels, at most 100 directories per level) and counts files
        per lower-cased extension. Extension-less files are bucketed under
        "no_extension". Returns {} on API failure.
        """
        logger.info(f"Analyzing file distribution for {repo.full_name}")
        # Get all files in the repo (only feasible for smaller repos)
        try:
            contents = self.client._execute_request(repo.get_contents, "")
            if not contents:
                return {}
            file_types = defaultdict(int)
            directories = []
            # Process initial (root-level) contents
            for item in contents:
                if item.type == "dir":
                    directories.append(item.path)
                elif item.type == "file":
                    ext = os.path.splitext(item.name)[1].lower()
                    file_types[ext if ext else "no_extension"] += 1
            # Process directories (up to a reasonable depth to avoid API rate limits)
            max_depth = 3
            for depth in range(max_depth):
                if not directories:
                    break
                next_level = []
                for directory in directories[:100]:  # Limit to avoid excessive API calls
                    dir_contents = self.client._execute_request(repo.get_contents, directory)
                    if not dir_contents:
                        continue
                    for item in dir_contents:
                        if item.type == "dir":
                            next_level.append(item.path)
                        elif item.type == "file":
                            ext = os.path.splitext(item.name)[1].lower()
                            file_types[ext if ext else "no_extension"] += 1
                # Descend one level per iteration of the depth loop.
                directories = next_level
            return dict(file_types)
        except GithubException:
            logger.warning(f"Could not get file distribution for {repo.full_name}")
            return {}
| def search_code(self, repo, query_terms: List[str]) -> List[Dict[str, Any]]: | |
| """Search for specific terms in the repository code.""" | |
| logger.info(f"Searching code in {repo.full_name} for terms: {query_terms}") | |
| results = [] | |
| for term in query_terms: | |
| query = f"repo:{repo.full_name} {term}" | |
| search_results = self.client._paginated_request( | |
| self.client.github.search_code, | |
| query, | |
| max_results=self.config.max_search_results | |
| ) | |
| if search_results: | |
| results.extend([ | |
| { | |
| "term": term, | |
| "name": result.name, | |
| "path": result.path, | |
| "sha": result.sha, | |
| "url": result.html_url, | |
| "repository": result.repository.full_name, | |
| } | |
| for result in search_results | |
| if result.repository.full_name == repo.full_name | |
| ]) | |
| return results | |
| def get_branches(self, repo) -> List[Dict[str, Any]]: | |
| """Get repository branches.""" | |
| logger.info(f"Fetching branches for {repo.full_name}") | |
| branches = self.client._paginated_request(repo.get_branches) | |
| if branches is None: | |
| return [] | |
| return [ | |
| { | |
| "name": branch.name, | |
| "protected": branch.protected, | |
| "commit_sha": branch.commit.sha if branch.commit else None, | |
| } | |
| for branch in branches | |
| ] | |
    def get_releases(self, repo) -> List[Dict[str, Any]]:
        """Get repository releases.

        Returns:
            List of release dicts including draft/prerelease flags and a
            summary of each release's downloadable assets.

        NOTE(review): ``release.get_assets()`` looks like it issues an extra
        API request per release — verify cost for release-heavy repos.
        """
        logger.info(f"Fetching releases for {repo.full_name}")
        releases = self.client._paginated_request(repo.get_releases)
        if releases is None:
            return []
        return [
            {
                "id": release.id,
                "tag_name": release.tag_name,
                "name": release.title,
                "body": release.body,
                "draft": release.draft,
                "prerelease": release.prerelease,
                "created_at": release.created_at.isoformat() if release.created_at else None,
                "published_at": release.published_at.isoformat() if release.published_at else None,
                "author_login": release.author.login if release.author else None,
                "html_url": release.html_url,
                "assets": [
                    {
                        "name": asset.name,
                        "label": asset.label,
                        "content_type": asset.content_type,
                        "size": asset.size,
                        "download_count": asset.download_count,
                        "browser_download_url": asset.browser_download_url,
                    }
                    for asset in release.get_assets()
                ],
            }
            for release in releases
        ]
| def get_workflows(self, repo) -> List[Dict[str, Any]]: | |
| """Get repository GitHub Actions workflows.""" | |
| logger.info(f"Fetching workflows for {repo.full_name}") | |
| try: | |
| workflows = self.client._paginated_request(repo.get_workflows) | |
| if workflows is None: | |
| return [] | |
| return [ | |
| { | |
| "id": workflow.id, | |
| "name": workflow.name, | |
| "path": workflow.path, | |
| "state": workflow.state, | |
| "created_at": workflow.created_at.isoformat() if workflow.created_at else None, | |
| "updated_at": workflow.updated_at.isoformat() if workflow.updated_at else None, | |
| } | |
| for workflow in workflows | |
| ] | |
| except (GithubException, AttributeError): | |
| # Older PyGithub versions or repositories without workflows | |
| return [] | |
| def analyze_commit_activity(self, repo) -> Dict[str, Any]: | |
| """Analyze commit activity patterns.""" | |
| logger.info(f"Analyzing commit activity for {repo.full_name}") | |
| # Get stats commit activity | |
| stats = self.client._execute_request(repo.get_stats_commit_activity) | |
| if not stats: | |
| return {} | |
| weekly_commits = [] | |
| for week in stats: | |
| if hasattr(week, 'week') and hasattr(week, 'total'): | |
| date = datetime.datetime.fromtimestamp(week.week).strftime('%Y-%m-%d') | |
| weekly_commits.append({ | |
| "week": date, | |
| "total": week.total, | |
| "days": week.days if hasattr(week, 'days') else [], | |
| }) | |
| # Get code frequency | |
| code_freq = self.client._execute_request(repo.get_stats_code_frequency) | |
| if not code_freq: | |
| code_frequency = [] | |
| else: | |
| code_frequency = [] | |
| for item in code_freq: | |
| date = datetime.datetime.fromtimestamp(item[0]).strftime('%Y-%m-%d') | |
| code_frequency.append({ | |
| "week": date, | |
| "additions": item[1], | |
| "deletions": -item[2], # Convert to positive for better readability | |
| }) | |
| return { | |
| "weekly_commits": weekly_commits, | |
| "code_frequency": code_frequency, | |
| } | |
| def analyze_contributor_activity(self, repo) -> Dict[str, Any]: | |
| """Analyze contributor activity patterns.""" | |
| logger.info(f"Analyzing contributor activity for {repo.full_name}") | |
| # Get contributor stats | |
| stats = self.client._execute_request(repo.get_stats_contributors) | |
| if not stats: | |
| return {} | |
| contributor_stats = [] | |
| for stat in stats: | |
| if not hasattr(stat, 'author') or not stat.author: | |
| continue | |
| weeks_data = [] | |
| for week in stat.weeks: | |
| if hasattr(week, 'w'): | |
| date = datetime.datetime.fromtimestamp(week.w).strftime('%Y-%m-%d') | |
| weeks_data.append({ | |
| "week": date, | |
| "additions": week.a, | |
| "deletions": week.d, | |
| "commits": week.c, | |
| }) | |
| contributor_stats.append({ | |
| "author": stat.author.login, | |
| "total_commits": stat.total, | |
| "weeks": weeks_data, | |
| }) | |
| return { | |
| "contributor_stats": contributor_stats, | |
| } | |
    def analyze_issue_distribution(self, issues: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze distribution of issues by various metrics.

        Args:
            issues: Issue dicts as produced by get_issues() (PR dicts from
                get_pull_requests() share the relevant keys and work too).

        Returns:
            Dict with counts by state/user/label/month, the PR-vs-issue
            split, and resolution-time statistics in hours for closed items.
            Returns {} for an empty input.
        """
        if not issues:
            return {}
        # Convert to DataFrame for easier analysis
        df = pd.DataFrame(issues)
        # Issues by state ("in df" checks column presence)
        state_counts = df['state'].value_counts().to_dict() if 'state' in df else {}
        # Issues by user (top 10 reporters)
        user_counts = df['user_login'].value_counts().head(10).to_dict() if 'user_login' in df else {}
        # Pull requests vs regular issues
        is_pr_counts = df['pull_request'].value_counts().to_dict() if 'pull_request' in df else {}
        # Issues by labels (flattening the labels list)
        labels = []
        if 'labels' in df:
            for label_list in df['labels']:
                if label_list:
                    labels.extend(label_list)
        label_counts = Counter(labels)
        top_labels = dict(label_counts.most_common(10))
        # Time analysis: bucket creation dates by calendar month
        if 'created_at' in df:
            df['created_date'] = pd.to_datetime(df['created_at'])
            df['month_year'] = df['created_date'].dt.strftime('%Y-%m')
            issues_by_month = df.groupby('month_year').size().to_dict()
        else:
            issues_by_month = {}
        # Calculate resolution time for closed issues
        resolution_times = []
        if 'created_at' in df and 'closed_at' in df:
            for _, issue in df.iterrows():
                if pd.notna(issue.get('closed_at')) and pd.notna(issue.get('created_at')):
                    created = pd.to_datetime(issue['created_at'])
                    closed = pd.to_datetime(issue['closed_at'])
                    resolution_time = (closed - created).total_seconds() / 3600  # hours
                    resolution_times.append(resolution_time)
        resolution_stats = {}
        if resolution_times:
            resolution_stats = {
                "mean_hours": sum(resolution_times) / len(resolution_times),
                # NOTE(review): upper-median for even-length lists.
                "median_hours": sorted(resolution_times)[len(resolution_times) // 2],
                "min_hours": min(resolution_times),
                "max_hours": max(resolution_times),
            }
        return {
            "by_state": state_counts,
            "by_user": user_counts,
            "pr_vs_issue": is_pr_counts,
            "by_label": top_labels,
            "by_month": issues_by_month,
            "resolution_time": resolution_stats,
        }
    def generate_insights(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate higher-level insights from the collected repository data.

        Args:
            repo_data: Aggregated output of the collection methods, keyed by
                section ("repo_details", "languages", "contributors",
                "issues", "pull_requests", "commits", "readme", ...). Every
                section is optional; absent sections are skipped.

        Returns:
            Dict of derived metrics (age/freshness, popularity, language
            mix, contributor spread, issue/PR dynamics, activity, docs
            quality, complexity, community health).
        """
        insights = {}
        # Repository activity and health
        if "repo_details" in repo_data:
            repo_details = repo_data["repo_details"]
            insights["repository_age_days"] = self._calculate_age_days(repo_details.get("created_at"))
            insights["freshness_days"] = self._calculate_freshness_days(repo_details.get("pushed_at"))
            # Popularity metrics
            insights["popularity"] = {
                "stars": repo_details.get("stargazers_count", 0),
                "forks": repo_details.get("forks_count", 0),
                "watchers": repo_details.get("watchers_count", 0),
                "star_fork_ratio": self._calculate_ratio(
                    repo_details.get("stargazers_count", 0),
                    repo_details.get("forks_count", 0)
                ),
            }
        # Language distribution (byte counts -> percentages)
        if "languages" in repo_data:
            languages = repo_data["languages"]
            total_bytes = sum(languages.values()) if languages else 0
            if total_bytes > 0:
                language_percentages = {
                    lang: (bytes_count / total_bytes) * 100
                    for lang, bytes_count in languages.items()
                }
                insights["language_distribution"] = {
                    "primary_language": max(languages.items(), key=lambda x: x[1])[0] if languages else None,
                    "language_count": len(languages),
                    "percentages": language_percentages,
                }
        # Contributor insights
        if "contributors" in repo_data:
            contributors = repo_data["contributors"]
            if contributors:
                total_contributions = sum(c.get("contributions", 0) for c in contributors)
                insights["contributor_insights"] = {
                    "contributor_count": len(contributors),
                    "total_contributions": total_contributions,
                    "avg_contributions_per_contributor": total_contributions / len(contributors) if len(contributors) > 0 else 0,
                    "contribution_distribution": self._analyze_contribution_distribution(contributors),
                }
        # Issue and PR dynamics
        if "issues" in repo_data:
            issues = repo_data["issues"]
            insights["issue_insights"] = self.analyze_issue_distribution(issues)
        if "pull_requests" in repo_data:
            prs = repo_data["pull_requests"]
            insights["pr_insights"] = self.analyze_issue_distribution(prs)  # Reuse the same analysis
            # Additional PR-specific metrics
            if prs:
                insights["pr_code_change_stats"] = self._analyze_pr_code_changes(prs)
        # Commit patterns
        if "commits" in repo_data:
            commits = repo_data["commits"]
            insights["commit_insights"] = self._analyze_commit_patterns(commits)
        # Check for CI/CD presence
        insights["ci_cd_presence"] = self._detect_ci_cd(repo_data)
        # Documentation quality
        if "readme" in repo_data:
            readme = repo_data["readme"]
            insights["documentation_quality"] = self._assess_documentation_quality(readme)
        # Project Activity Level
        insights["activity_level"] = self._calculate_activity_level(repo_data)
        # Code complexity analysis
        insights["code_complexity"] = self._analyze_code_complexity(repo_data)
        # Community health analysis
        insights["community_health"] = self._analyze_community_health(repo_data)
        return insights
| def _calculate_age_days(self, created_at_iso: str) -> float: | |
| """Calculate repository age in days.""" | |
| if not created_at_iso: | |
| return 0 | |
| try: | |
| created_at = datetime.datetime.fromisoformat(created_at_iso.replace('Z', '+00:00')) | |
| now = datetime.datetime.now(datetime.timezone.utc) | |
| return (now - created_at).total_seconds() / (24 * 3600) | |
| except ValueError: | |
| return 0 | |
| def _calculate_freshness_days(self, pushed_at_iso: str) -> float: | |
| """Calculate days since last push.""" | |
| if not pushed_at_iso: | |
| return float('inf') | |
| try: | |
| pushed_at = datetime.datetime.fromisoformat(pushed_at_iso.replace('Z', '+00:00')) | |
| now = datetime.datetime.now(datetime.timezone.utc) | |
| return (now - pushed_at).total_seconds() / (24 * 3600) | |
| except ValueError: | |
| return float('inf') | |
| def _calculate_ratio(self, numerator: int, denominator: int) -> float: | |
| """Calculate ratio with handling for zero denominator.""" | |
| return numerator / denominator if denominator and denominator > 0 else float('inf') | |
| def _analyze_contribution_distribution(self, contributors: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Analyze the distribution of contributions among contributors.""" | |
| if not contributors: | |
| return {} | |
| # Sort contributors by number of contributions | |
| sorted_contributors = sorted(contributors, key=lambda c: c.get("contributions", 0), reverse=True) | |
| # Calculate percentiles | |
| total_contributions = sum(c.get("contributions", 0) for c in contributors) | |
| cumulative_contributions = 0 | |
| percentile_20 = 0 | |
| percentile_50 = 0 | |
| percentile_80 = 0 | |
| for i, contributor in enumerate(sorted_contributors): | |
| contributions = contributor.get("contributions", 0) | |
| cumulative_contributions += contributions | |
| percentage = (cumulative_contributions / total_contributions) * 100 | |
| if percentage >= 20 and percentile_20 == 0: | |
| percentile_20 = i + 1 | |
| if percentage >= 50 and percentile_50 == 0: | |
| percentile_50 = i + 1 | |
| if percentage >= 80 and percentile_80 == 0: | |
| percentile_80 = i + 1 | |
| # Calculate Gini coefficient to measure inequality | |
| gini = self._calculate_gini([c.get("contributions", 0) for c in contributors]) | |
| return { | |
| "contributors_for_20_percent": percentile_20, | |
| "contributors_for_50_percent": percentile_50, | |
| "contributors_for_80_percent": percentile_80, | |
| "gini_coefficient": gini, | |
| "top_contributor_percentage": (sorted_contributors[0].get("contributions", 0) / total_contributions) * 100 if sorted_contributors else 0, | |
| } | |
| def _calculate_gini(self, values: List[int]) -> float: | |
| """Calculate the Gini coefficient of a distribution.""" | |
| if not values or sum(values) == 0: | |
| return 0 | |
| values = sorted(values) | |
| n = len(values) | |
| cumsum = 0 | |
| for i, value in enumerate(values): | |
| cumsum += value | |
| values[i] = cumsum | |
| return (2 * sum(values) / (n * sum(values[-1]))) - (n + 1) / n | |
| def _analyze_pr_code_changes(self, prs: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Analyze code changes across pull requests.""" | |
| if not prs: | |
| return {} | |
| # Extract metrics | |
| additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None] | |
| deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None] | |
| changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None] | |
| # Calculate stats | |
| stats = {} | |
| if additions: | |
| stats["additions"] = { | |
| "mean": sum(additions) / len(additions), | |
| "median": sorted(additions)[len(additions) // 2], | |
| "max": max(additions), | |
| "total": sum(additions), | |
| } | |
| if deletions: | |
| stats["deletions"] = { | |
| "mean": sum(deletions) / len(deletions), | |
| "median": sorted(deletions)[len(deletions) // 2], | |
| "max": max(deletions), | |
| "total": sum(deletions), | |
| } | |
| if changed_files: | |
| stats["changed_files"] = { | |
| "mean": sum(changed_files) / len(changed_files), | |
| "median": sorted(changed_files)[len(changed_files) // 2], | |
| "max": max(changed_files), | |
| "total": sum(changed_files), | |
| } | |
| return stats | |
| def _analyze_commit_patterns(self, commits: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Analyze patterns in commit data.""" | |
| if not commits: | |
| return {} | |
| # Count by author | |
| commit_counts = Counter( | |
| commit.get("author_login", "Unknown") | |
| for commit in commits | |
| if commit.get("author_login") | |
| ) | |
| # Analyze message patterns | |
| message_lengths = [ | |
| len(commit.get("commit_message", "")) | |
| for commit in commits | |
| if commit.get("commit_message") | |
| ] | |
| # Extract dates for time-based analysis | |
| dates = [] | |
| for commit in commits: | |
| date_str = commit.get("date") | |
| if date_str: | |
| try: | |
| date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00')) | |
| dates.append(date) | |
| except ValueError: | |
| pass | |
| # Analyze times of day | |
| hours = [date.hour for date in dates] | |
| hour_counts = Counter(hours) | |
| # Analyze days of week | |
| weekdays = [date.weekday() for date in dates] | |
| weekday_counts = Counter(weekdays) | |
| weekday_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] | |
| weekday_data = {weekday_names[day]: count for day, count in weekday_counts.items()} | |
| # Analyze frequency of commits over time | |
| commit_frequency = {} | |
| if dates: | |
| dates_sorted = sorted(dates) | |
| first_date = dates_sorted[0] | |
| last_date = dates_sorted[-1] | |
| # Calculate commit frequency by month | |
| current_date = first_date.replace(day=1) | |
| while current_date <= last_date: | |
| next_month = current_date.replace(day=28) + datetime.timedelta(days=4) | |
| next_month = next_month.replace(day=1) | |
| month_key = current_date.strftime('%Y-%m') | |
| commit_frequency[month_key] = sum( | |
| 1 for date in dates | |
| if date.year == current_date.year and date.month == current_date.month | |
| ) | |
| current_date = next_month | |
| return { | |
| "top_contributors": dict(commit_counts.most_common(5)), | |
| "message_length": { | |
| "mean": sum(message_lengths) / len(message_lengths) if message_lengths else 0, | |
| "max": max(message_lengths) if message_lengths else 0, | |
| "min": min(message_lengths) if message_lengths else 0, | |
| }, | |
| "commit_time_patterns": { | |
| "by_hour": dict(sorted(hour_counts.items())), | |
| "by_weekday": weekday_data, | |
| }, | |
| "commit_frequency": commit_frequency, | |
| } | |
| def _detect_ci_cd(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Detect CI/CD presence and configuration in the repository.""" | |
| ci_cd_indicators = { | |
| "github_actions": False, | |
| "travis": False, | |
| "circle_ci": False, | |
| "jenkins": False, | |
| "gitlab_ci": False, | |
| "azure_pipelines": False, | |
| } | |
| # Check workflows | |
| if "workflows" in repo_data and repo_data["workflows"]: | |
| ci_cd_indicators["github_actions"] = True | |
| # Check for CI configuration files | |
| if "file_distribution" in repo_data: | |
| files = repo_data.get("file_distribution", {}) | |
| if ".travis.yml" in files: | |
| ci_cd_indicators["travis"] = True | |
| if ".circleci/config.yml" in files or "circle.yml" in files: | |
| ci_cd_indicators["circle_ci"] = True | |
| if "Jenkinsfile" in files: | |
| ci_cd_indicators["jenkins"] = True | |
| if ".gitlab-ci.yml" in files: | |
| ci_cd_indicators["gitlab_ci"] = True | |
| if "azure-pipelines.yml" in files: | |
| ci_cd_indicators["azure_pipelines"] = True | |
| return { | |
| "has_ci_cd": any(ci_cd_indicators.values()), | |
| "ci_cd_systems": ci_cd_indicators, | |
| } | |
| def _assess_documentation_quality(self, readme: str) -> Dict[str, Any]: | |
| """Assess the quality of documentation based on the README.""" | |
| if not readme: | |
| return { | |
| "has_readme": False, | |
| "readme_length": 0, | |
| "score": 0, | |
| "sections": {}, | |
| } | |
| # Analyze the README content | |
| lines = readme.strip().split('\n') | |
| word_count = len(readme.split()) | |
| sections = {} | |
| # Check for common README sections | |
| section_keywords = { | |
| "introduction": ["introduction", "overview", "about"], | |
| "installation": ["installation", "install", "setup", "getting started"], | |
| "usage": ["usage", "using", "example", "examples"], | |
| "api": ["api", "reference", "documentation"], | |
| "contributing": ["contributing", "contribute", "development"], | |
| "license": ["license", "licensing"], | |
| "code_of_conduct": ["code of conduct"], | |
| } | |
| for section, keywords in section_keywords.items(): | |
| sections[section] = any( | |
| any(keyword.lower() in line.lower() for keyword in keywords) | |
| for line in lines | |
| ) | |
| # Count images/diagrams (markdown format) | |
| image_count = readme.count("![") | |
| # Count code examples | |
| code_block_count = readme.count("```") | |
| # Calculate a simple score | |
| section_score = sum(1 for present in sections.values() if present) / len(sections) | |
| has_images = image_count > 0 | |
| has_code = code_block_count > 0 | |
| length_score = min(1.0, word_count / 1000) # Normalize to 0-1, with 1000+ words being "complete" | |
| score = (section_score * 0.5) + (has_images * 0.2) + (has_code * 0.2) + (length_score * 0.1) | |
| return { | |
| "has_readme": True, | |
| "readme_length": word_count, | |
| "score": score, | |
| "sections": sections, | |
| "has_images": has_images, | |
| "image_count": image_count, | |
| "has_code_examples": has_code, | |
| "code_block_count": code_block_count // 2, # Each block has opening and closing ``` | |
| } | |
| def _calculate_activity_level(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Calculate repository activity level based on commits, PRs, and issues.""" | |
| activity_score = 0 | |
| activity_details = {} | |
| # Get repository age in months | |
| if "repo_details" in repo_data: | |
| age_days = self._calculate_age_days(repo_data["repo_details"].get("created_at")) | |
| age_months = age_days / 30.5 # Approximate | |
| if age_months < 1: | |
| age_months = 1 # Avoid division by zero | |
| activity_details["age_months"] = age_months | |
| else: | |
| age_months = 1 | |
| # Check recent commits (last 3 months) | |
| recent_commits = 0 | |
| if "commits" in repo_data: | |
| commits = repo_data["commits"] | |
| three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) | |
| for commit in commits: | |
| if commit.get("date"): | |
| commit_date = datetime.datetime.fromisoformat(commit["date"].replace('Z', '+00:00')) | |
| if commit_date >= three_months_ago: | |
| recent_commits += 1 | |
| activity_details["recent_commits"] = recent_commits | |
| activity_score += min(10, recent_commits / 10) # Up to 10 points for recent commits | |
| # Check recent PRs and issues (last 3 months) | |
| recent_prs = 0 | |
| if "pull_requests" in repo_data: | |
| prs = repo_data["pull_requests"] | |
| three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) | |
| for pr in prs: | |
| if pr.get("created_at"): | |
| pr_date = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) | |
| if pr_date >= three_months_ago: | |
| recent_prs += 1 | |
| activity_details["recent_prs"] = recent_prs | |
| activity_score += min(5, recent_prs / 5) # Up to 5 points for recent PRs | |
| recent_issues = 0 | |
| if "issues" in repo_data: | |
| issues = [issue for issue in repo_data["issues"] if not issue.get("pull_request")] | |
| three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) | |
| for issue in issues: | |
| if issue.get("created_at"): | |
| issue_date = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) | |
| if issue_date >= three_months_ago: | |
| recent_issues += 1 | |
| activity_details["recent_issues"] = recent_issues | |
| activity_score += min(5, recent_issues / 5) # Up to 5 points for recent issues | |
| # Check release frequency | |
| if "releases" in repo_data: | |
| releases = repo_data["releases"] | |
| release_count = len(releases) | |
| # Calculate releases per month | |
| releases_per_month = release_count / max(1, age_months) | |
| activity_details["releases_per_month"] = releases_per_month | |
| activity_score += min(5, releases_per_month * 2.5) # Up to 5 points for regular releases | |
| # Determine activity level | |
| activity_level = "None" | |
| if activity_score >= 20: | |
| activity_level = "Very High" | |
| elif activity_score >= 15: | |
| activity_level = "High" | |
| elif activity_score >= 10: | |
| activity_level = "Medium" | |
| elif activity_score >= 5: | |
| activity_level = "Low" | |
| elif activity_score > 0: | |
| activity_level = "Very Low" | |
| return { | |
| "score": activity_score, | |
| "level": activity_level, | |
| "details": activity_details, | |
| } | |
    def _analyze_code_complexity(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
        """Estimate code complexity based on available metrics.

        Combines four capped signals into an overall score and level:
        contributor count (max 5), average PR churn (max 10), dependency-commit
        ratio (max 5), and codebase size in MB (max 10).
        """
        complexity = {}
        # Analyze file distribution
        if "file_distribution" in repo_data:
            file_types = repo_data["file_distribution"]
            total_files = sum(file_types.values())
            # NOTE(review): assumes file_distribution maps extension -> count and that
            # config.all_code_extensions() returns the recognized code extensions — confirm
            # (this conflicts with _detect_ci_cd, which matches full file names).
            code_files = sum(
                count for ext, count in file_types.items()
                if ext in self.config.all_code_extensions()
            )
            complexity["file_counts"] = {
                "total_files": total_files,
                "code_files": code_files,
            }
        # Analyze PR complexity
        if "pull_requests" in repo_data:
            prs = repo_data["pull_requests"]
            # Get average changes per PR (None values mean the metric was unavailable).
            additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None]
            deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None]
            changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None]
            if additions and deletions and changed_files:
                avg_additions = sum(additions) / len(additions)
                avg_deletions = sum(deletions) / len(deletions)
                avg_changed_files = sum(changed_files) / len(changed_files)
                complexity["pr_complexity"] = {
                    "avg_additions": avg_additions,
                    "avg_deletions": avg_deletions,
                    "avg_changed_files": avg_changed_files,
                }
                # Estimate complexity score: ~100 average changed lines per PR = 1 point, capped at 10.
                pr_complexity_score = min(10, (avg_additions + avg_deletions) / 100)
                complexity["pr_complexity_score"] = pr_complexity_score
        # Check dependency complexity
        dependency_complexity_score = 0
        # NOTE(review): gate inspects repo_data["insights"]["commit_insights"]; presumably it
        # just checks that commit analysis ran before scanning messages — verify intent.
        if "commit_insights" in repo_data.get("insights", {}):
            commit_messages = [
                commit.get("commit_message", "").lower()
                for commit in repo_data.get("commits", [])
            ]
            # Check for dependency-related keywords
            dependency_keywords = ["dependency", "dependencies", "upgrade", "update", "version", "package"]
            dependency_commits = sum(
                1 for message in commit_messages
                if any(keyword in message for keyword in dependency_keywords)
            )
            dependency_ratio = dependency_commits / len(commit_messages) if commit_messages else 0
            # A 25% dependency-commit ratio saturates the 5-point cap.
            dependency_complexity_score = min(5, dependency_ratio * 20)
            complexity["dependency_complexity"] = {
                "dependency_commits": dependency_commits,
                "dependency_ratio": dependency_ratio,
                "score": dependency_complexity_score,
            }
        # Overall complexity score
        overall_score = 0
        contributors = len(repo_data.get("contributors", []))
        if contributors > 0:
            contributor_score = min(5, contributors / 10)  # 10 contributors = 1 point, capped at 5
            overall_score += contributor_score
        if "pr_complexity_score" in complexity:
            overall_score += complexity["pr_complexity_score"]
        # Contributes 0 when the dependency branch above did not run.
        overall_score += dependency_complexity_score
        # Code size complexity
        if "languages" in repo_data:
            languages = repo_data["languages"]
            # Language values are summed as byte counts — TODO confirm against the fetcher.
            total_bytes = sum(languages.values()) if languages else 0
            # Size points based on code size in MB
            size_mb = total_bytes / (1024 * 1024)
            size_score = min(10, size_mb / 5)  # 5 MB = 1 point, capped at 10
            overall_score += size_score
            complexity["code_size"] = {
                "total_bytes": total_bytes,
                "size_mb": size_mb,
                "score": size_score,
            }
        # Determine complexity level
        complexity_level = "Low"
        if overall_score >= 25:
            complexity_level = "Very High"
        elif overall_score >= 20:
            complexity_level = "High"
        elif overall_score >= 15:
            complexity_level = "Medium-High"
        elif overall_score >= 10:
            complexity_level = "Medium"
        elif overall_score >= 5:
            complexity_level = "Low-Medium"
        complexity["overall"] = {
            "score": overall_score,
            "level": complexity_level,
        }
        return complexity
| def _analyze_community_health(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Analyze the community health of the repository.""" | |
| health = {} | |
| # Calculate issue responsiveness | |
| if "issues" in repo_data: | |
| issues = repo_data["issues"] | |
| closed_issues = [issue for issue in issues if issue.get("state") == "closed"] | |
| if issues: | |
| closure_rate = len(closed_issues) / len(issues) | |
| health["issue_closure_rate"] = closure_rate | |
| # Calculate average time to close | |
| resolution_times = [] | |
| for issue in closed_issues: | |
| if issue.get("created_at") and issue.get("closed_at"): | |
| created = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) | |
| closed = datetime.datetime.fromisoformat(issue["closed_at"].replace('Z', '+00:00')) | |
| resolution_time = (closed - created).total_seconds() / 3600 # hours | |
| resolution_times.append(resolution_time) | |
| if resolution_times: | |
| avg_resolution_time = sum(resolution_times) / len(resolution_times) | |
| health["avg_issue_resolution_time_hours"] = avg_resolution_time | |
| # Calculate PR review responsiveness | |
| if "pull_requests" in repo_data: | |
| prs = repo_data["pull_requests"] | |
| merged_prs = [pr for pr in prs if pr.get("merged")] | |
| if prs: | |
| merge_rate = len(merged_prs) / len(prs) | |
| health["pr_merge_rate"] = merge_rate | |
| # Calculate average time to merge | |
| merge_times = [] | |
| for pr in merged_prs: | |
| if pr.get("created_at") and pr.get("merged_at"): | |
| created = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) | |
| merged = datetime.datetime.fromisoformat(pr["merged_at"].replace('Z', '+00:00')) | |
| merge_time = (merged - created).total_seconds() / 3600 # hours | |
| merge_times.append(merge_time) | |
| if merge_times: | |
| avg_merge_time = sum(merge_times) / len(merge_times) | |
| health["avg_pr_merge_time_hours"] = avg_merge_time | |
| # Check for community guidelines | |
| community_files = [ | |
| "CONTRIBUTING.md", | |
| "CODE_OF_CONDUCT.md", | |
| "SECURITY.md", | |
| "SUPPORT.md", | |
| "GOVERNANCE.md", | |
| ] | |
| community_file_presence = {} | |
| if "file_distribution" in repo_data: | |
| file_paths = [] | |
| for item in repo_data.get("file_distribution", {}): | |
| file_paths.append(item) | |
| for community_file in community_files: | |
| present = any(community_file.lower() in path.lower() for path in file_paths) | |
| community_file_presence[community_file] = present | |
| health["community_guidelines"] = community_file_presence | |
| # Calculate contributor diversity | |
| if "contributors" in repo_data: | |
| contributors = repo_data["contributors"] | |
| if contributors: | |
| # Calculate Gini coefficient for contribution distribution | |
| gini = self._calculate_gini([c.get("contributions", 0) for c in contributors]) | |
| health["contributor_gini"] = gini | |
| # Interpret Gini coefficient | |
| if gini < 0.4: | |
| diversity_level = "High" | |
| elif gini < 0.6: | |
| diversity_level = "Medium" | |
| else: | |
| diversity_level = "Low" | |
| health["contributor_diversity"] = diversity_level | |
| # Calculate overall health score | |
| health_score = 0 | |
| # Points for issue responsiveness | |
| if "issue_closure_rate" in health: | |
| health_score += health["issue_closure_rate"] * 10 # Up to 10 points | |
| # Points for PR responsiveness | |
| if "pr_merge_rate" in health: | |
| health_score += health["pr_merge_rate"] * 10 # Up to 10 points | |
| # Points for community guidelines | |
| guideline_count = sum(1 for present in community_file_presence.values() if present) | |
| health_score += guideline_count * 2 # Up to 10 points | |
| # Points for contributor diversity | |
| if "contributor_gini" in health: | |
| diversity_score = 10 * (1 - health["contributor_gini"]) # Up to 10 points | |
| health_score += diversity_score | |
| # Determine health level | |
| health_level = "Poor" | |
| if health_score >= 30: | |
| health_level = "Excellent" | |
| elif health_score >= 25: | |
| health_level = "Very Good" | |
| elif health_score >= 20: | |
| health_level = "Good" | |
| elif health_score >= 15: | |
| health_level = "Fair" | |
| elif health_score >= 10: | |
| health_level = "Needs Improvement" | |
| health["overall"] = { | |
| "score": health_score, | |
| "level": health_level, | |
| } | |
| return health | |
| def generate_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: | |
| """ | |
| Generate visualizations of repository data. | |
| Returns: | |
| Dict of visualization figures | |
| """ | |
| if not self.config.generate_visualizations: | |
| return {} | |
| figures = {} | |
| # Create visualizations | |
| lang_fig = self._visualize_language_distribution(repo_data) | |
| if lang_fig: | |
| figures["language_distribution"] = lang_fig | |
| commit_figs = self._visualize_commit_activity(repo_data, insights) | |
| figures.update(commit_figs) | |
| contrib_figs = self._visualize_contributor_activity(repo_data, insights) | |
| figures.update(contrib_figs) | |
| issue_figs = self._visualize_issues_and_prs(repo_data, insights) | |
| figures.update(issue_figs) | |
| # Add interactive visualizations with Plotly | |
| plotly_figs = self._generate_plotly_visualizations(repo_data, insights) | |
| figures.update(plotly_figs) | |
| # Generate collaboration network | |
| collab_fig = self._visualize_collaboration_network(repo_data, insights) | |
| if collab_fig: | |
| figures["collaboration_network"] = collab_fig | |
| return figures | |
| def _visualize_language_distribution(self, repo_data: Dict[str, Any]) -> Optional[plt.Figure]: | |
| """Create a visualization of language distribution.""" | |
| languages = repo_data.get("languages", {}) | |
| if not languages: | |
| return None | |
| # Create a pie chart of language distribution | |
| fig, ax = plt.subplots(figsize=(10, 6)) | |
| total = sum(languages.values()) | |
| # Filter out small languages for better visualization | |
| threshold = total * 0.01 # 1% threshold | |
| other_sum = sum(size for lang, size in languages.items() if size < threshold) | |
| filtered_languages = {lang: size for lang, size in languages.items() if size >= threshold} | |
| if other_sum > 0: | |
| filtered_languages["Other"] = other_sum | |
| sizes = list(filtered_languages.values()) | |
| labels = list(filtered_languages.keys()) | |
| wedges, texts, autotexts = ax.pie( | |
| sizes, | |
| labels=labels, | |
| autopct='%1.1f%%', | |
| startangle=90, | |
| shadow=False, | |
| textprops={'fontsize': 9}, # Smaller font for better fit | |
| wedgeprops={'linewidth': 1, 'edgecolor': 'white'} # Add white edge | |
| ) | |
| # Make the percentage labels more readable | |
| for autotext in autotexts: | |
| autotext.set_color('white') | |
| autotext.set_fontweight('bold') | |
| ax.axis('equal') | |
| plt.title(f"Language Distribution", fontsize=16) | |
| plt.tight_layout() | |
| return fig | |
    def _visualize_commit_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
        """Create visualizations of commit activity.

        Builds up to four figures keyed by name: weekly commit counts, weekly
        code frequency (additions/deletions), commits by weekday, and commits
        by hour. Sections with no data are skipped.
        """
        figures = {}
        commit_activity = repo_data.get("commit_activity", {})
        weekly_commits = commit_activity.get("weekly_commits", [])
        if weekly_commits:
            # Extract weeks and commit counts
            weeks = [item["week"] for item in weekly_commits]
            commits = [item["total"] for item in weekly_commits]
            # Create a time series plot
            fig, ax = plt.subplots(figsize=(12, 6))
            ax.plot(weeks, commits, marker='o', linestyle='-', color='blue', alpha=0.7)
            # Add trend line: degree-1 polynomial fit over the sequence index.
            z = np.polyfit(range(len(weeks)), commits, 1)
            p = np.poly1d(z)
            ax.plot(weeks, p(range(len(weeks))), "r--", alpha=0.7)
            ax.set_title("Weekly Commit Activity", fontsize=16)
            ax.set_xlabel("Week")
            ax.set_ylabel("Number of Commits")
            plt.xticks(rotation=45)
            ax.grid(True, linestyle='--', alpha=0.7)
            # Show only some x-axis labels to avoid crowding (roughly 10 kept).
            if len(weeks) > 20:
                every_nth = len(weeks) // 10
                for n, label in enumerate(ax.xaxis.get_ticklabels()):
                    if n % every_nth != 0:
                        label.set_visible(False)
            plt.tight_layout()
            figures["weekly_commits"] = fig
        # Visualize code frequency if available
        code_frequency = commit_activity.get("code_frequency", [])
        if code_frequency:
            weeks = [item["week"] for item in code_frequency]
            additions = [item["additions"] for item in code_frequency]
            deletions = [item["deletions"] for item in code_frequency]
            fig, ax = plt.subplots(figsize=(12, 6))
            ax.plot(weeks, additions, marker='o', linestyle='-', color='green', label='Additions')
            ax.plot(weeks, deletions, marker='o', linestyle='-', color='red', label='Deletions')
            ax.set_title("Code Frequency", fontsize=16)
            ax.set_xlabel("Week")
            ax.set_ylabel("Lines Changed")
            plt.xticks(rotation=45)
            ax.legend()
            ax.grid(True, linestyle='--', alpha=0.7)
            # Show only some x-axis labels to avoid crowding
            if len(weeks) > 20:
                every_nth = len(weeks) // 10
                for n, label in enumerate(ax.xaxis.get_ticklabels()):
                    if n % every_nth != 0:
                        label.set_visible(False)
            plt.tight_layout()
            figures["code_frequency"] = fig
        # Commits by weekday
        if "commit_insights" in insights:
            commit_insights = insights["commit_insights"]
            by_weekday = commit_insights.get("commit_time_patterns", {}).get("by_weekday", {})
            if by_weekday:
                fig, ax = plt.subplots(figsize=(10, 6))
                weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
                counts = [by_weekday.get(day, 0) for day in weekdays]
                # Create gradient colors based on commit counts
                # NOTE(review): max(counts) can be 0 if every weekday count is zero — confirm inputs.
                colors = plt.cm.Blues(np.array(counts) / max(counts))
                ax.bar(weekdays, counts, color=colors)
                ax.set_title("Commits by Day of Week", fontsize=16)
                ax.set_xlabel("Day of Week")
                ax.set_ylabel("Number of Commits")
                ax.grid(True, axis='y', linestyle='--', alpha=0.7)
                plt.tight_layout()
                figures["commits_by_weekday"] = fig
            # Commits by hour
            by_hour = commit_insights.get("commit_time_patterns", {}).get("by_hour", {})
            if by_hour:
                fig, ax = plt.subplots(figsize=(12, 6))
                hours = sorted(by_hour.keys())
                counts = [by_hour[hour] for hour in hours]
                # Create gradient colors based on commit counts
                colors = plt.cm.Greens(np.array(counts) / max(counts))
                ax.bar(hours, counts, color=colors)
                ax.set_title("Commits by Hour of Day (UTC)", fontsize=16)
                ax.set_xlabel("Hour")
                ax.set_ylabel("Number of Commits")
                ax.set_xticks(range(0, 24, 2))
                ax.grid(True, axis='y', linestyle='--', alpha=0.7)
                plt.tight_layout()
                figures["commits_by_hour"] = fig
        return figures
    def _visualize_contributor_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
        """Create visualizations of contributor activity.

        Builds a top-contributors bar chart and, when distribution insights
        are available, a pie chart illustrating contribution concentration.
        """
        figures = {}
        contributors = repo_data.get("contributors", [])
        if contributors:
            # Create a bar chart of top contributors
            contributors_sorted = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True)
            top_n = min(10, len(contributors_sorted))
            fig, ax = plt.subplots(figsize=(12, 6))
            names = [c.get("login", "Unknown") for c in contributors_sorted[:top_n]]
            contributions = [c.get("contributions", 0) for c in contributors_sorted[:top_n]]
            # Create gradient colors based on contribution counts
            # NOTE(review): max(contributions) can be 0 when every count is zero — confirm inputs.
            colors = plt.cm.viridis(np.array(contributions) / max(contributions))
            bars = ax.bar(names, contributions, color=colors)
            ax.set_title("Top Contributors by Commit Count", fontsize=16)
            ax.set_xlabel("Contributor")
            ax.set_ylabel("Number of Commits")
            plt.xticks(rotation=45, ha='right')
            ax.grid(True, axis='y', linestyle='--', alpha=0.7)
            # Add value labels on top of bars
            for bar in bars:
                height = bar.get_height()
                ax.annotate(f'{height}',
                            xy=(bar.get_x() + bar.get_width() / 2, height),
                            xytext=(0, 3),  # 3 points vertical offset
                            textcoords="offset points",
                            ha='center', va='bottom')
            plt.tight_layout()
            figures["top_contributors"] = fig
        # Visualize contribution distribution if insights available
        if "contributor_insights" in insights:
            contributor_insights = insights["contributor_insights"]
            distribution = contributor_insights.get("contribution_distribution", {})
            if distribution:
                # Create a pie chart showing contributor concentration
                fig, ax = plt.subplots(figsize=(10, 6))
                # Head-counts per cumulative-contribution band (0-20%, 20-50%, 50-80%, rest).
                percentiles = [
                    distribution.get("contributors_for_20_percent", 0),
                    distribution.get("contributors_for_50_percent", 0) - distribution.get("contributors_for_20_percent", 0),
                    distribution.get("contributors_for_80_percent", 0) - distribution.get("contributors_for_50_percent", 0),
                    len(contributors) - distribution.get("contributors_for_80_percent", 0)
                ]
                labels = [
                    f"Top {percentiles[0]} contributors (0-20%)",
                    f"Next {percentiles[1]} contributors (20-50%)",
                    f"Next {percentiles[2]} contributors (50-80%)",
                    f"Remaining {percentiles[3]} contributors (80-100%)"
                ]
                # Wedge sizes are deliberately the fixed band widths (20/30/30/20);
                # the computed head-counts appear only in the labels.
                wedges, texts, autotexts = ax.pie(
                    [20, 30, 30, 20],  # Fixed percentages for visualization
                    labels=labels,
                    autopct='%1.1f%%',
                    startangle=90,
                    shadow=False,
                    explode=(0.1, 0, 0, 0),  # Emphasize the top contributors
                    wedgeprops={'linewidth': 1, 'edgecolor': 'white'}  # Add white edge
                )
                # Make the percentage labels more readable
                for autotext in autotexts:
                    autotext.set_color('white')
                    autotext.set_fontweight('bold')
                ax.axis('equal')
                ax.set_title("Contribution Distribution", fontsize=16)
                plt.tight_layout()
                figures["contribution_distribution"] = fig
        return figures
| def _visualize_issues_and_prs(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: | |
| """Create visualizations of issues and pull requests.""" | |
| figures = {} | |
| # Visualize issue distribution if available | |
| if "issue_insights" in insights: | |
| issue_insights = insights["issue_insights"] | |
| # Issues by state | |
| by_state = issue_insights.get("by_state", {}) | |
| if by_state: | |
| fig, ax = plt.subplots(figsize=(8, 6)) | |
| states = list(by_state.keys()) | |
| counts = list(by_state.values()) | |
| colors = ['red' if state.lower() == 'open' else 'green' for state in states] | |
| ax.bar(states, counts, color=colors) | |
| ax.set_title("Issues by State", fontsize=16) | |
| ax.set_xlabel("State") | |
| ax.set_ylabel("Count") | |
| # Add count labels on top of bars | |
| for i, v in enumerate(counts): | |
| ax.text(i, v + 0.5, str(v), ha='center') | |
| ax.grid(True, axis='y', linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| figures["issues_by_state"] = fig | |
| # Issues by month | |
| by_month = issue_insights.get("by_month", {}) | |
| if by_month: | |
| fig, ax = plt.subplots(figsize=(12, 6)) | |
| months = sorted(by_month.keys()) | |
| counts = [by_month[month] for month in months] | |
| ax.plot(months, counts, marker='o', linestyle='-', color='blue') | |
| # Add trend line | |
| z = np.polyfit(range(len(months)), counts, 1) | |
| p = np.poly1d(z) | |
| ax.plot(months, p(range(len(months))), "r--", alpha=0.7) | |
| ax.set_title("Issues Created by Month", fontsize=16) | |
| ax.set_xlabel("Month") | |
| ax.set_ylabel("Number of Issues") | |
| plt.xticks(rotation=45) | |
| ax.grid(True, linestyle='--', alpha=0.7) | |
| # Show only some x-axis labels to avoid crowding | |
| if len(months) > 12: | |
| every_nth = max(1, len(months) // 12) | |
| for n, label in enumerate(ax.xaxis.get_ticklabels()): | |
| if n % every_nth != 0: | |
| label.set_visible(False) | |
| plt.tight_layout() | |
| figures["issues_by_month"] = fig | |
| # Issues by label | |
| by_label = issue_insights.get("by_label", {}) | |
| if by_label and len(by_label) > 1: | |
| fig, ax = plt.subplots(figsize=(12, 6)) | |
| labels = list(by_label.keys()) | |
| counts = list(by_label.values()) | |
| # Sort by count | |
| sorted_indices = np.argsort(counts)[::-1] | |
| labels = [labels[i] for i in sorted_indices] | |
| counts = [counts[i] for i in sorted_indices] | |
| # Limit to top 10 | |
| if len(labels) > 10: | |
| labels = labels[:10] | |
| counts = counts[:10] | |
| # Create gradient colors | |
| colors = plt.cm.tab10(np.linspace(0, 1, len(labels))) | |
| bars = ax.barh(labels, counts, color=colors) | |
| ax.set_title("Top Issue Labels", fontsize=16) | |
| ax.set_xlabel("Count") | |
| ax.set_ylabel("Label") | |
| # Add count labels | |
| for bar in bars: | |
| width = bar.get_width() | |
| ax.annotate(f'{int(width)}', | |
| xy=(width, bar.get_y() + bar.get_height() / 2), | |
| xytext=(3, 0), # 3 points horizontal offset | |
| textcoords="offset points", | |
| ha='left', va='center') | |
| ax.grid(True, axis='x', linestyle='--', alpha=0.7) | |
| plt.tight_layout() | |
| figures["issues_by_label"] = fig | |
| # Visualize PR insights if available | |
| if "pr_insights" in insights and "pr_code_change_stats" in insights: | |
| pr_code_stats = insights["pr_code_change_stats"] | |
| # Additions and deletions by PR | |
| if "additions" in pr_code_stats and "deletions" in pr_code_stats: | |
| fig, ax = plt.subplots(figsize=(10, 6)) | |
| categories = ["Mean", "Median", "Max"] | |
| additions = [ | |
| pr_code_stats["additions"].get("mean", 0), | |
| pr_code_stats["additions"].get("median", 0), | |
| pr_code_stats["additions"].get("max", 0) / 10 # Scale down for visibility | |
| ] | |
| deletions = [ | |
| pr_code_stats["deletions"].get("mean", 0), | |
| pr_code_stats["deletions"].get("median", 0), | |
| pr_code_stats["deletions"].get("max", 0) / 10 # Scale down for visibility | |
| ] | |
| x = range(len(categories)) | |
| width = 0.35 | |
| addition_bars = ax.bar([i - width/2 for i in x], additions, width, label='Additions', color='green') | |
| deletion_bars = ax.bar([i + width/2 for i in x], deletions, width, label='Deletions', color='red') | |
| ax.set_xlabel('Metric') | |
| ax.set_ylabel('Lines of Code') | |
| ax.set_title('PR Code Change Statistics') | |
| plt.xticks(x, categories) | |
| ax.legend() | |
| # Add value labels | |
| for bars in [addition_bars, deletion_bars]: | |
| for bar in bars: | |
| height = bar.get_height() | |
| ax.annotate(f'{int(height)}', | |
| xy=(bar.get_x() + bar.get_width() / 2, height), | |
| xytext=(0, 3), # 3 points vertical offset | |
| textcoords="offset points", | |
| ha='center', va='bottom') | |
| if "max" in pr_code_stats["additions"]: | |
| plt.annotate(f"Max: {int(pr_code_stats['additions']['max'])}", | |
| (2 - width/2, additions[2] + 5), | |
| textcoords="offset points", | |
| xytext=(0,10), | |
| ha='center') | |
| if "max" in pr_code_stats["deletions"]: | |
| plt.annotate(f"Max: {int(pr_code_stats['deletions']['max'])}", | |
| (2 + width/2, deletions[2] + 5), | |
| textcoords="offset points", | |
| xytext=(0,10), | |
| ha='center') | |
| plt.tight_layout() | |
| figures["pr_code_changes"] = fig | |
| return figures | |
def _generate_plotly_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, Any]:
    """Generate interactive Plotly visualizations.

    Builds up to three figures from the collected repository data — a
    commit-activity heatmap, a language treemap, and a cumulative
    issue/PR timeline — each added only when its source data exists.

    Args:
        repo_data: Collected repository data ("commits", "languages",
            "issues", "pull_requests", ...).
        insights: Derived insights (unused here; kept for interface
            parity with the matplotlib visualization generator).

    Returns:
        Dict mapping figure names to plotly Figure objects.
    """
    plotly_figures = {}
    heatmap = self._plotly_commit_heatmap(repo_data)
    if heatmap is not None:
        plotly_figures["commit_heatmap"] = heatmap
    treemap = self._plotly_language_treemap(repo_data)
    if treemap is not None:
        plotly_figures["language_treemap"] = treemap
    timeline = self._plotly_issue_pr_timeline(repo_data)
    if timeline is not None:
        plotly_figures["issue_pr_timeline"] = timeline
    return plotly_figures

def _plotly_commit_heatmap(self, repo_data: Dict[str, Any]) -> Optional[Any]:
    """Build a day-of-week x hour-of-day commit heatmap.

    Returns None when no commits with parseable ISO dates are available.
    """
    if "commits" not in repo_data:
        return None
    dates = []
    for commit in repo_data["commits"]:
        date_str = commit.get("date")
        if date_str:
            try:
                dates.append(datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00')))
            except ValueError:
                pass  # skip malformed timestamps rather than abort the figure
    if not dates:
        return None
    # Count commits per (weekday, hour) cell; weekday() is 0=Monday.
    day_hour_counts = defaultdict(int)
    for date in dates:
        day_hour_counts[(date.weekday(), date.hour)] += 1
    days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    hours = list(range(24))
    z = np.zeros((7, 24))
    for (day, hour), count in day_hour_counts.items():
        z[day][hour] = count
    fig = go.Figure(data=go.Heatmap(
        z=z,
        x=hours,
        y=days,
        colorscale='Viridis',
        hoverongaps=False,
        hovertemplate='Day: %{y}<br>Hour: %{x}<br>Commits: %{z}<extra></extra>'
    ))
    fig.update_layout(
        title='Commit Activity Heatmap',
        xaxis_title='Hour of Day (UTC)',
        yaxis_title='Day of Week',
        yaxis={'categoryorder': 'array', 'categoryarray': days},
        width=900,
        height=500
    )
    return fig

def _plotly_language_treemap(self, repo_data: Dict[str, Any]) -> Optional[Any]:
    """Build a treemap of language byte counts, or None when no language data exists."""
    languages = repo_data.get("languages")
    if not languages:
        return None
    labels = list(languages.keys())
    values = list(languages.values())
    fig = go.Figure(go.Treemap(
        labels=labels,
        values=values,
        parents=[""] * len(labels),  # flat treemap: every language is a root child
        marker_colorscale='RdBu',
        hovertemplate='Language: %{label}<br>Bytes: %{value}<br>Percentage: %{percentRoot:.2%}<extra></extra>'
    ))
    fig.update_layout(
        title='Repository Language Breakdown',
        width=800,
        height=600
    )
    return fig

def _plotly_issue_pr_timeline(self, repo_data: Dict[str, Any]) -> Optional[Any]:
    """Build a cumulative issues/PRs-over-time line chart.

    Returns None when there are no issues or PRs with parseable dates.
    """
    issues = repo_data.get("issues", [])
    prs = repo_data.get("pull_requests", [])
    if not issues and not prs:
        return None
    timeline_data = []
    for issue in issues:
        # Skip entries that are really PRs (the GitHub issues API lists PRs too).
        if not issue.get("pull_request") and issue.get("created_at"):
            try:
                created = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00'))
            except ValueError:
                continue
            timeline_data.append({
                "date": created,
                "type": "Issue",
                "id": issue.get("number", ""),
                "title": issue.get("title", ""),
                "state": issue.get("state", "")
            })
    for pr in prs:
        if pr.get("created_at"):
            try:
                created = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00'))
            except ValueError:
                continue
            timeline_data.append({
                "date": created,
                "type": "PR",
                "id": pr.get("number", ""),
                "title": pr.get("title", ""),
                "state": pr.get("state", "")
            })
    if not timeline_data:
        return None
    timeline_data.sort(key=lambda item: item["date"])
    df = pd.DataFrame(timeline_data)
    # Running totals so each trace shows growth over time.
    df["cumulative_issues"] = (df["type"] == "Issue").cumsum()
    df["cumulative_prs"] = (df["type"] == "PR").cumsum()
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df["date"],
        y=df["cumulative_issues"],
        mode='lines',
        name='Issues',
        line=dict(color='red', width=2)
    ))
    fig.add_trace(go.Scatter(
        x=df["date"],
        y=df["cumulative_prs"],
        mode='lines',
        name='Pull Requests',
        line=dict(color='blue', width=2)
    ))
    fig.update_layout(
        title='Cumulative Issues and Pull Requests Over Time',
        xaxis_title='Date',
        yaxis_title='Count',
        legend=dict(
            yanchor="top",
            y=0.99,
            xanchor="left",
            x=0.01
        ),
        width=900,
        height=500
    )
    return fig
def _visualize_collaboration_network(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Optional[plt.Figure]:
    """Create a visualization of the collaboration network.

    Builds an undirected graph whose nodes are contributors and whose
    edges connect PR authors to their requested reviewers, weighted by
    how often the pair collaborated.

    Args:
        repo_data: Collected repository data; must contain
            "pull_requests" and "contributors" for a figure to be drawn.
        insights: Derived insights (unused here; kept for interface
            parity with the other visualization helpers).

    Returns:
        A matplotlib Figure, or None when required data is missing or
        no collaborations were found.
    """
    if "pull_requests" not in repo_data or "contributors" not in repo_data:
        return None
    prs = repo_data["pull_requests"]
    contributors = repo_data["contributors"]
    G = nx.Graph()
    # A set gives O(1) membership tests in the PR loop below; the
    # original list made every `in` check O(n) per PR.
    contributor_logins = {c.get("login") for c in contributors if c.get("login")}
    for login in contributor_logins:
        G.add_node(login)
    # Count author<->reviewer pairings across all PRs.
    collaborations = defaultdict(int)
    for pr in prs:
        author = pr.get("user_login")
        if not author or author not in contributor_logins:
            continue
        # NOTE(review): assumes requested_reviewers holds login strings —
        # confirm against the PR collection code upstream.
        for reviewer in pr.get("requested_reviewers", []):
            if reviewer in contributor_logins and reviewer != author:
                # Sort the pair so (a, b) and (b, a) count as one edge.
                pair = tuple(sorted((author, reviewer)))
                collaborations[pair] += 1
    for (author, reviewer), weight in collaborations.items():
        G.add_edge(author, reviewer, weight=weight)
    if not G.edges():
        return None
    fig, ax = plt.subplots(figsize=(12, 10))
    # Node size scales with the contributor's recorded contribution count.
    contribution_counts = {c.get("login"): c.get("contributions", 1) for c in contributors if c.get("login")}
    node_sizes = [contribution_counts.get(node, 1) * 30 for node in G.nodes()]
    # Edge width scales with how often the pair collaborated.
    edge_widths = [G[u][v]['weight'] * 0.5 for u, v in G.edges()]
    # Color nodes by degree: more connections = more central.
    color_map = []
    for node in G.nodes():
        degree = G.degree(node)
        if degree > 5:
            color_map.append('red')    # central collaborators
        elif degree > 2:
            color_map.append('blue')   # active collaborators
        else:
            color_map.append('green')  # peripheral contributors
    # Fixed seed keeps the force-directed layout reproducible across runs.
    pos = nx.spring_layout(G, seed=42)
    nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=color_map, alpha=0.8)
    nx.draw_networkx_edges(G, pos, width=edge_widths, alpha=0.5, edge_color='gray')
    nx.draw_networkx_labels(G, pos, font_size=8, font_family='sans-serif')
    ax.set_title("Collaboration Network", fontsize=16)
    ax.axis('off')
    plt.tight_layout()
    return fig
def analyze_repo(self, owner: str, repo_name: str) -> Dict[str, Any]:
    """
    Main method to analyze a repository.

    Fetches the repository, runs every data-collection task with a
    progress bar, then derives insights and (optionally) visualizations.

    Args:
        owner: GitHub username or organization
        repo_name: Name of the repository

    Returns:
        Dict containing all repository data and insights
    """
    started = time.time()
    logger.info(f"Starting analysis of {owner}/{repo_name}")
    repo = self.client.get_repo(f"{owner}/{repo_name}")

    # Basic metadata first; everything else is collected task-by-task.
    repo_data: Dict[str, Any] = {"repo_details": self.get_repo_details(repo)}

    tasks = [
        ("contributors", lambda: self.get_contributors(repo)),
        ("languages", lambda: self.get_languages(repo)),
        ("issues", lambda: self.get_issues(repo, "all")),
        ("pull_requests", lambda: self.get_pull_requests(repo, "all")),
        ("commits", lambda: self.get_commits(repo)),
        ("readme", lambda: self.get_readme(repo)),
        ("branches", lambda: self.get_branches(repo)),
        ("releases", lambda: self.get_releases(repo)),
        ("workflows", lambda: self.get_workflows(repo)),
        ("file_distribution", lambda: self.get_file_distribution(repo)),
        ("collaborators", lambda: self.get_collaborators(repo)),
        ("commit_activity", lambda: self.analyze_commit_activity(repo)),
        ("contributor_activity", lambda: self.analyze_contributor_activity(repo)),
    ]
    # Terms that hint at security hot-spots, tests, and tech-debt markers.
    important_terms = [
        "security", "vulnerability", "auth", "password", "token",
        "test", "spec", "fixture", "mock", "stub",
        "TODO", "FIXME", "HACK", "XXX"
    ]
    tasks.append(("code_search", lambda: self.search_code(repo, important_terms)))

    # Each task is independent: a failure is logged and skipped so the
    # remaining collectors still run.
    with tqdm(total=len(tasks), desc="Collecting repository data") as progress:
        for key, collect in tasks:
            try:
                repo_data[key] = collect()
            except Exception as exc:
                logger.error(f"Error collecting {key}: {exc}")
            finally:
                progress.update(1)

    repo_data["insights"] = self.generate_insights(repo_data)
    if self.config.generate_visualizations:
        repo_data["visualizations"] = self.generate_visualizations(repo_data, repo_data["insights"])

    logger.info(f"Analysis completed in {time.time() - started:.2f} seconds")
    return repo_data
| class PDFReportGenerator: | |
| """ | |
| Class for generating comprehensive PDF reports from repository analysis data. | |
| """ | |
def __init__(self, repo_data: Dict[str, Any], output_path: str = None):
    """Initialize the PDF report generator with repository data.

    Args:
        repo_data: Full analysis output from the analyzer
            (repo_details, insights, visualizations, ...).
        output_path: Destination for the PDF; when omitted a secure
            temporary file is created.
    """
    self.repo_data = repo_data
    if output_path:
        self.output_path = output_path
    else:
        # tempfile.mktemp() is deprecated and race-prone (the returned name
        # can be claimed by another process before use); mkstemp() creates
        # the file atomically instead.
        fd, tmp_path = tempfile.mkstemp(suffix='.pdf')
        os.close(fd)  # reportlab reopens the path itself when building
        self.output_path = tmp_path
    self.styles = getSampleStyleSheet()
    # Custom paragraph styles used throughout the report sections.
    self.styles.add(ParagraphStyle(
        name='SectionTitle',
        parent=self.styles['Heading2'],
        fontSize=14,
        spaceAfter=10
    ))
    self.styles.add(ParagraphStyle(
        name='SubsectionTitle',
        parent=self.styles['Heading3'],
        fontSize=12,
        spaceAfter=6
    ))
    self.styles.add(ParagraphStyle(
        name='MetricsTable',
        parent=self.styles['Normal'],
        fontSize=10,
        alignment=TA_LEFT
    ))
    self.styles.add(ParagraphStyle(
        name='Small',
        parent=self.styles['Normal'],
        fontSize=8
    ))
    self.styles.add(ParagraphStyle(
        name='ReportTitle',
        parent=self.styles['Title'],
        fontSize=24,
        alignment=TA_CENTER,
        spaceAfter=20
    ))
def generate_report(self) -> str:
    """
    Generate a PDF report of repository analysis.

    Assembles the title page, all analysis sections (each on its own
    page), optional visualization pages, and the summary, then builds
    the document.

    Returns:
        str: Path to the generated PDF file
    """
    doc = SimpleDocTemplate(
        self.output_path,
        pagesize=letter,
        rightMargin=72, leftMargin=72,
        topMargin=72, bottomMargin=72
    )

    repo_name = self.repo_data.get("repo_details", {}).get("full_name", "Repository")
    generated_on = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Title page content.
    elements: List[Any] = [
        Paragraph(f"GitHub Repository Analysis: {repo_name}", self.styles['ReportTitle']),
        Paragraph(f"Report generated on: {generated_on}", self.styles['Normal']),
        Spacer(1, 20),
    ]

    # Core analysis sections, each separated by a page break.
    elements.extend(self._create_repo_overview())
    elements.append(PageBreak())
    elements.extend(self._create_activity_analysis())
    elements.append(PageBreak())
    elements.extend(self._create_code_analysis())
    elements.append(PageBreak())
    elements.extend(self._create_community_analysis())

    # Visualization pages only when figures were generated.
    if self.repo_data.get("visualizations"):
        elements.append(PageBreak())
        elements.extend(self._create_visualization_pages())

    elements.append(PageBreak())
    elements.extend(self._create_summary_and_recommendations())

    doc.build(elements)
    return self.output_path
def _create_repo_overview(self) -> List[Any]:
    """Create repository overview section of the report.

    Renders a details table from repo_details plus a short list of
    key derived metrics (age, activity, complexity, documentation,
    community health).

    Returns:
        List of reportlab flowables for the section.
    """
    elements = []
    elements.append(Paragraph("Repository Overview", self.styles['Heading1']))
    elements.append(Spacer(1, 10))
    repo_details = self.repo_data.get("repo_details", {})
    # Use `x or default` rather than .get(key, default): the GitHub API
    # commonly returns explicit None for description/language/license,
    # which .get() would pass straight through (and None.capitalize()
    # would raise for visibility).
    data = [
        ["Name", repo_details.get("name") or "N/A"],
        ["Full Name", repo_details.get("full_name") or "N/A"],
        ["Description", repo_details.get("description") or "No description"],
        ["URL", repo_details.get("html_url") or "N/A"],
        ["Primary Language", repo_details.get("language") or "Not specified"],
        ["Created On", repo_details.get("created_at") or "N/A"],
        ["Last Updated", repo_details.get("updated_at") or "N/A"],
        ["Stars", str(repo_details.get("stargazers_count") or 0)],
        ["Forks", str(repo_details.get("forks_count") or 0)],
        ["Watchers", str(repo_details.get("watchers_count") or 0)],
        ["Open Issues", str(repo_details.get("open_issues_count") or 0)],
        ["License", repo_details.get("license") or "Not specified"],
        ["Fork", "Yes" if repo_details.get("fork", False) else "No"],
        ["Archived", "Yes" if repo_details.get("archived", False) else "No"],
        ["Visibility", (repo_details.get("visibility") or "N/A").capitalize()],
    ]
    table = Table(data, colWidths=[100, 350])
    table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (0, -1), colors.lightgrey),
        ('TEXTCOLOR', (0, 0), (0, -1), colors.black),
        ('ALIGN', (0, 0), (0, -1), 'RIGHT'),
        ('ALIGN', (1, 0), (1, -1), 'LEFT'),
        ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, -1), 10),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
        ('TOPPADDING', (0, 0), (-1, -1), 6),
        ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
    ]))
    elements.append(table)
    elements.append(Spacer(1, 20))

    elements.append(Paragraph("Key Metrics & Insights", self.styles['SectionTitle']))
    insights = self.repo_data.get("insights", {})

    # Repository age and freshness.
    age_days = insights.get("repository_age_days", 0)
    age_years = age_days / 365.25
    freshness_days = insights.get("freshness_days", 0)
    elements.append(Paragraph(f"Repository Age: {age_years:.1f} years ({int(age_days)} days)", self.styles['Normal']))
    elements.append(Paragraph(f"Last Activity: {int(freshness_days)} days ago", self.styles['Normal']))
    elements.append(Spacer(1, 10))

    # Activity level (score out of 25 per the insights generator).
    activity_level = insights.get("activity_level", {})
    if activity_level:
        activity_text = f"Activity Level: {activity_level.get('level', 'Unknown')} (Score: {activity_level.get('score', 0):.1f}/25)"
        elements.append(Paragraph(activity_text, self.styles['Normal']))
        elements.append(Spacer(1, 10))

    # Code complexity (score out of 30).
    code_complexity = insights.get("code_complexity", {}).get("overall", {})
    if code_complexity:
        complexity_text = f"Code Complexity: {code_complexity.get('level', 'Unknown')} (Score: {code_complexity.get('score', 0):.1f}/30)"
        elements.append(Paragraph(complexity_text, self.styles['Normal']))
        elements.append(Spacer(1, 10))

    # Documentation quality: bucket the 0..1 score into Low/Medium/High.
    doc_quality = insights.get("documentation_quality", {})
    if doc_quality:
        quality_score = doc_quality.get("score", 0)
        quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low"
        elements.append(Paragraph(f"Documentation Quality: {quality_level} (Score: {quality_score:.2f})", self.styles['Normal']))
        elements.append(Spacer(1, 10))

    # Community health (score out of 40).
    community_health = insights.get("community_health", {}).get("overall", {})
    if community_health:
        health_text = f"Community Health: {community_health.get('level', 'Unknown')} (Score: {community_health.get('score', 0):.1f}/40)"
        elements.append(Paragraph(health_text, self.styles['Normal']))
    return elements
def _create_activity_analysis(self) -> List[Any]:
    """Create activity analysis section of the report.

    Covers commit activity (top contributors, timing patterns),
    pull-request activity (states, size statistics), and issue
    activity (states, resolution time, top labels).

    Returns:
        List of reportlab flowables for the section.
    """
    # Three of the tables in this section share one identical style;
    # define it once instead of repeating the literal three times.
    count_table_style = TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
        ('ALIGN', (0, 0), (0, -1), 'LEFT'),
        ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
        ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
    ])

    def count_table(rows, col_widths):
        """Build a simple header+counts table with the shared style."""
        table = Table(rows, colWidths=col_widths)
        table.setStyle(count_table_style)
        return table

    elements = []
    elements.append(Paragraph("Activity Analysis", self.styles['Heading1']))
    elements.append(Spacer(1, 10))
    insights = self.repo_data.get("insights", {})

    # --- Commit activity -------------------------------------------------
    elements.append(Paragraph("Commit Activity", self.styles['SectionTitle']))
    commit_insights = insights.get("commit_insights", {})
    if commit_insights:
        top_contributors = commit_insights.get("top_contributors", {})
        if top_contributors:
            elements.append(Paragraph("Top Contributors by Commits:", self.styles['SubsectionTitle']))
            data = [["Contributor", "Commits"]]
            for contributor, commits in top_contributors.items():
                data.append([contributor, str(commits)])
            table = Table(data, colWidths=[200, 100])
            # This table carries extra font-size/padding commands, so it
            # keeps its own style rather than the shared one.
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, -1), 10),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 4),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 15))

        time_patterns = commit_insights.get("commit_time_patterns", {})
        if time_patterns:
            elements.append(Paragraph("Commit Timing Patterns:", self.styles['SubsectionTitle']))
            weekday_data = time_patterns.get("by_weekday", {})
            if weekday_data:
                day_text = "Most active day: " + max(weekday_data.items(), key=lambda x: x[1])[0]
                elements.append(Paragraph(day_text, self.styles['Normal']))
            hour_data = time_patterns.get("by_hour", {})
            # Fixed: the original condition was `hour_data and hour_data`,
            # a redundant duplicate of the same check.
            if hour_data:
                hour = max(hour_data.items(), key=lambda x: x[1])[0]
                elements.append(Paragraph(f"Most active hour: {hour}:00 UTC", self.styles['Normal']))
            elements.append(Spacer(1, 10))

    # --- Pull Request activity -------------------------------------------
    elements.append(Paragraph("Pull Request Activity", self.styles['SectionTitle']))
    pr_insights = insights.get("pr_insights", {})
    pr_code_changes = insights.get("pr_code_change_stats", {})
    if pr_insights or pr_code_changes:
        state_counts = pr_insights.get("by_state", {})
        if state_counts:
            elements.append(Paragraph("Pull Request States:", self.styles['SubsectionTitle']))
            rows = [["State", "Count"]]
            for state, count in state_counts.items():
                rows.append([state.capitalize(), str(count)])
            elements.append(count_table(rows, [100, 100]))
            elements.append(Spacer(1, 15))

        if pr_code_changes:
            elements.append(Paragraph("Pull Request Size Statistics:", self.styles['SubsectionTitle']))
            data = [["Metric", "Additions", "Deletions", "Files Changed"]]
            for metric in ("mean", "median", "max", "total"):
                row = [metric.capitalize()]
                for stat_type in ("additions", "deletions", "changed_files"):
                    stats = pr_code_changes.get(stat_type, {})
                    if metric in stats:
                        value = stats[metric]
                        # Floats get one decimal place; ints print as-is.
                        row.append(f"{value:.1f}" if isinstance(value, float) else str(value))
                    else:
                        row.append("N/A")
                data.append(row)
            table = Table(data, colWidths=[80, 80, 80, 80])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (-1, -1), 'RIGHT'),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 15))

    # --- Issue activity --------------------------------------------------
    elements.append(Paragraph("Issue Activity", self.styles['SectionTitle']))
    issue_insights = insights.get("issue_insights", {})
    if issue_insights:
        state_counts = issue_insights.get("by_state", {})
        if state_counts:
            elements.append(Paragraph("Issue States:", self.styles['SubsectionTitle']))
            rows = [["State", "Count"]]
            for state, count in state_counts.items():
                rows.append([state.capitalize(), str(count)])
            elements.append(count_table(rows, [100, 100]))
            elements.append(Spacer(1, 15))

        resolution_stats = issue_insights.get("resolution_time", {})
        if resolution_stats:
            elements.append(Paragraph("Issue Resolution Time (hours):", self.styles['SubsectionTitle']))
            mean_hours = resolution_stats.get("mean_hours", 0)
            median_hours = resolution_stats.get("median_hours", 0)
            # Switch units to days once a value exceeds 24 hours.
            if mean_hours > 24:
                mean_text = f"Mean: {mean_hours / 24:.1f} days"
            else:
                mean_text = f"Mean: {mean_hours:.1f} hours"
            if median_hours > 24:
                median_text = f"Median: {median_hours / 24:.1f} days"
            else:
                median_text = f"Median: {median_hours:.1f} hours"
            elements.append(Paragraph(mean_text, self.styles['Normal']))
            elements.append(Paragraph(median_text, self.styles['Normal']))
            elements.append(Spacer(1, 10))

        top_labels = issue_insights.get("by_label", {})
        if top_labels:
            elements.append(Paragraph("Top Issue Labels:", self.styles['SubsectionTitle']))
            rows = [["Label", "Count"]]
            for label, count in list(top_labels.items())[:5]:  # Top 5 labels
                rows.append([label, str(count)])
            elements.append(count_table(rows, [150, 50]))
    return elements
def _create_code_analysis(self) -> List[Any]:
    """Create code analysis section of the report.

    Covers language distribution, file-type distribution, code
    complexity metrics, and detected CI/CD systems.

    Returns:
        List of reportlab flowables for the section.
    """
    elements = []
    elements.append(Paragraph("Code Analysis", self.styles['Heading1']))
    elements.append(Spacer(1, 10))

    # --- Language distribution -------------------------------------------
    elements.append(Paragraph("Language Distribution", self.styles['SectionTitle']))
    languages = self.repo_data.get("languages", {})
    insights = self.repo_data.get("insights", {})
    total_bytes = sum(languages.values()) if languages else 0
    # Guard total_bytes > 0: all-zero byte counts would otherwise divide by zero.
    if languages and total_bytes > 0:
        sorted_languages = sorted(languages.items(), key=lambda x: x[1], reverse=True)
        data = [["Language", "Bytes", "Percentage"]]
        for language, bytes_count in sorted_languages[:10]:  # Top 10 languages
            percentage = (bytes_count / total_bytes) * 100
            data.append([
                language,
                f"{bytes_count:,}",
                f"{percentage:.1f}%"
            ])
        table = Table(data, colWidths=[120, 120, 80])
        table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
            ('ALIGN', (0, 0), (0, -1), 'LEFT'),
            ('ALIGN', (1, 0), (2, -1), 'RIGHT'),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
        ]))
        elements.append(table)
        elements.append(Spacer(1, 15))

    # --- File type distribution ------------------------------------------
    elements.append(Paragraph("File Type Distribution", self.styles['SectionTitle']))
    file_dist = self.repo_data.get("file_distribution", {})
    # BUG GUARD: PDFReportGenerator.__init__ never sets self.config, so the
    # original unconditional `self.config.*` access raised AttributeError
    # here. Degrade gracefully by skipping the table when no config is
    # attached. TODO(review): pass the analyzer config into this class.
    config = getattr(self, "config", None)
    if file_dist and config is not None:
        # Group raw extension counts into broad file categories.
        file_types = {
            "Code": sum(file_dist.get(ext, 0) for ext in config.code_extensions),
            "Markup": sum(file_dist.get(ext, 0) for ext in config.markup_extensions),
            "Scripts": sum(file_dist.get(ext, 0) for ext in config.script_extensions),
            "Data": sum(file_dist.get(ext, 0) for ext in config.data_extensions),
            "Config": sum(file_dist.get(ext, 0) for ext in config.config_extensions),
            "Notebooks": sum(file_dist.get(ext, 0) for ext in config.notebook_extensions),
            "Other": sum(file_dist.get(ext, 0) for ext in config.other_extensions)
        }
        total_files = sum(file_types.values())
        if total_files > 0:  # avoid ZeroDivisionError when nothing matched
            data = [["File Type", "Count", "Percentage"]]
            for file_type, count in sorted(file_types.items(), key=lambda x: x[1], reverse=True):
                if count > 0:
                    percentage = (count / total_files) * 100
                    data.append([
                        file_type,
                        str(count),
                        f"{percentage:.1f}%"
                    ])
            table = Table(data, colWidths=[120, 80, 80])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (2, -1), 'RIGHT'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 15))

    # --- Code complexity --------------------------------------------------
    elements.append(Paragraph("Code Complexity Analysis", self.styles['SectionTitle']))
    code_complexity = insights.get("code_complexity", {})
    if code_complexity:
        complexity_overall = code_complexity.get("overall", {})
        elements.append(Paragraph(
            f"Overall Complexity: {complexity_overall.get('level', 'Unknown')} (Score: {complexity_overall.get('score', 0):.1f}/30)",
            self.styles['Normal']
        ))
        elements.append(Spacer(1, 10))

        code_size = code_complexity.get("code_size", {})
        if code_size:
            size_mb = code_size.get("size_mb", 0)
            elements.append(Paragraph(f"Code Size: {size_mb:.2f} MB", self.styles['Normal']))
            elements.append(Spacer(1, 5))

        pr_complexity = code_complexity.get("pr_complexity", {})
        if pr_complexity:
            elements.append(Paragraph("Average Pull Request Size:", self.styles['SubsectionTitle']))
            avg_additions = pr_complexity.get("avg_additions", 0)
            avg_deletions = pr_complexity.get("avg_deletions", 0)
            avg_files = pr_complexity.get("avg_changed_files", 0)
            elements.append(Paragraph(f"Lines Added: {avg_additions:.1f}", self.styles['Normal']))
            elements.append(Paragraph(f"Lines Deleted: {avg_deletions:.1f}", self.styles['Normal']))
            elements.append(Paragraph(f"Files Changed: {avg_files:.1f}", self.styles['Normal']))
            elements.append(Spacer(1, 10))

    # --- CI/CD presence ---------------------------------------------------
    elements.append(Paragraph("CI/CD Systems", self.styles['SectionTitle']))
    ci_cd = insights.get("ci_cd_presence", {})
    if ci_cd:
        if ci_cd.get("has_ci_cd", False):
            elements.append(Paragraph("Detected CI/CD Systems:", self.styles['Normal']))
            systems = ci_cd.get("ci_cd_systems", {})
            for system, present in systems.items():
                if present:
                    elements.append(Paragraph(f"• {system.replace('_', ' ').title()}", self.styles['Normal']))
        else:
            elements.append(Paragraph("No CI/CD systems detected", self.styles['Normal']))
    return elements
| def _create_community_analysis(self) -> List[Any]: | |
| """Create community analysis section of the report.""" | |
| elements = [] | |
| # Section title | |
| elements.append(Paragraph("Community Analysis", self.styles['Heading1'])) | |
| elements.append(Spacer(1, 10)) | |
| insights = self.repo_data.get("insights", {}) | |
| # Contributor insights | |
| elements.append(Paragraph("Contributor Analysis", self.styles['SectionTitle'])) | |
| contributor_insights = insights.get("contributor_insights", {}) | |
| if contributor_insights: | |
| contributor_count = contributor_insights.get("contributor_count", 0) | |
| total_contributions = contributor_insights.get("total_contributions", 0) | |
| avg_contributions = contributor_insights.get("avg_contributions_per_contributor", 0) | |
| elements.append(Paragraph(f"Total Contributors: {contributor_count}", self.styles['Normal'])) | |
| elements.append(Paragraph(f"Total Contributions: {total_contributions}", self.styles['Normal'])) | |
| elements.append(Paragraph(f"Average Contributions per Contributor: {avg_contributions:.1f}", self.styles['Normal'])) | |
| elements.append(Spacer(1, 10)) | |
| # Contribution distribution | |
| distribution = contributor_insights.get("contribution_distribution", {}) | |
| if distribution: | |
| elements.append(Paragraph("Contribution Distribution:", self.styles['SubsectionTitle'])) | |
| gini = distribution.get("gini_coefficient", 0) | |
| top_percent = distribution.get("top_contributor_percentage", 0) | |
| contributors_20 = distribution.get("contributors_for_20_percent", 0) | |
| contributors_50 = distribution.get("contributors_for_50_percent", 0) | |
| contributors_80 = distribution.get("contributors_for_80_percent", 0) | |
| # Format distribution metrics | |
| elements.append(Paragraph(f"Top Contributor: {top_percent:.1f}% of all contributions", self.styles['Normal'])) | |
| elements.append(Paragraph(f"Contributors for first 20% work: {contributors_20}", self.styles['Normal'])) | |
| elements.append(Paragraph(f"Contributors for first 50% work: {contributors_50}", self.styles['Normal'])) | |
| elements.append(Paragraph(f"Contributors for first 80% work: {contributors_80}", self.styles['Normal'])) | |
| elements.append(Paragraph(f"Gini Coefficient: {gini:.2f} ({'High' if gini > 0.6 else 'Medium' if gini > 0.4 else 'Low'} inequality)", self.styles['Normal'])) | |
| elements.append(Spacer(1, 15)) | |
| # Community health | |
| elements.append(Paragraph("Community Health", self.styles['SectionTitle'])) | |
| community_health = insights.get("community_health", {}) | |
| if community_health: | |
| health_overall = community_health.get("overall", {}) | |
| elements.append(Paragraph( | |
| f"Overall Health: {health_overall.get('level', 'Unknown')} (Score: {health_overall.get('score', 0):.1f}/40)", | |
| self.styles['Normal'] | |
| )) | |
| elements.append(Spacer(1, 10)) | |
| # Issue and PR responsiveness | |
| if "issue_closure_rate" in community_health: | |
| closure_rate = community_health.get("issue_closure_rate", 0) | |
| elements.append(Paragraph(f"Issue Closure Rate: {closure_rate:.1%}", self.styles['Normal'])) | |
| if "avg_issue_resolution_time_hours" in community_health: | |
| resolution_hours = community_health.get("avg_issue_resolution_time_hours", 0) | |
| if resolution_hours > 72: | |
| resolution_days = resolution_hours / 24 | |
| elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_days:.1f} days", self.styles['Normal'])) | |
| else: | |
| elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_hours:.1f} hours", self.styles['Normal'])) | |
| if "pr_merge_rate" in community_health: | |
| merge_rate = community_health.get("pr_merge_rate", 0) | |
| elements.append(Paragraph(f"PR Merge Rate: {merge_rate:.1%}", self.styles['Normal'])) | |
| if "avg_pr_merge_time_hours" in community_health: | |
| merge_hours = community_health.get("avg_pr_merge_time_hours", 0) | |
| if merge_hours > 72: | |
| merge_days = merge_hours / 24 | |
| elements.append(Paragraph(f"Avg. PR Merge Time: {merge_days:.1f} days", self.styles['Normal'])) | |
| else: | |
| elements.append(Paragraph(f"Avg. PR Merge Time: {merge_hours:.1f} hours", self.styles['Normal'])) | |
| elements.append(Spacer(1, 10)) | |
| # Community guidelines | |
| community_files = community_health.get("community_guidelines", {}) | |
| if community_files: | |
| elements.append(Paragraph("Community Guidelines:", self.styles['SubsectionTitle'])) | |
| files = [ | |
| ("CONTRIBUTING.md", "Contributing Guidelines"), | |
| ("CODE_OF_CONDUCT.md", "Code of Conduct"), | |
| ("SECURITY.md", "Security Policy"), | |
| ("SUPPORT.md", "Support Information"), | |
| ("GOVERNANCE.md", "Governance Model") | |
| ] | |
| data = [["Guideline", "Present"]] | |
| for file_name, display_name in files: | |
| present = community_files.get(file_name, False) | |
| data.append([display_name, "✓" if present else "✗"]) | |
| table = Table(data, colWidths=[150, 50]) | |
| table.setStyle(TableStyle([ | |
| ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), | |
| ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), | |
| ('ALIGN', (0, 0), (0, -1), 'LEFT'), | |
| ('ALIGN', (1, 0), (1, -1), 'CENTER'), | |
| ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), | |
| ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), | |
| ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), | |
| ('TEXTCOLOR', (1, 1), (1, -1), lambda row, col: colors.green if data[row][col] == "✓" else colors.red), | |
| ])) | |
| elements.append(table) | |
| elements.append(Spacer(1, 15)) | |
| # Documentation quality | |
| elements.append(Paragraph("Documentation Analysis", self.styles['SectionTitle'])) | |
| doc_quality = insights.get("documentation_quality", {}) | |
| if doc_quality: | |
| has_readme = doc_quality.get("has_readme", False) | |
| if has_readme: | |
| quality_score = doc_quality.get("score", 0) | |
| quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low" | |
| word_count = doc_quality.get("readme_length", 0) | |
| elements.append(Paragraph(f"README Quality: {quality_level} (Score: {quality_score:.2f})", self.styles['Normal'])) | |
| elements.append(Paragraph(f"README Length: {word_count} words", self.styles['Normal'])) | |
| elements.append(Spacer(1, 10)) | |
| # Section analysis | |
| sections = doc_quality.get("sections", {}) | |
| if sections: | |
| elements.append(Paragraph("README Sections Present:", self.styles['SubsectionTitle'])) | |
| section_labels = { | |
| "introduction": "Introduction/Overview", | |
| "installation": "Installation Instructions", | |
| "usage": "Usage Examples", | |
| "api": "API Documentation", | |
| "contributing": "Contributing Guidelines", | |
| "license": "License Information", | |
| "code_of_conduct": "Code of Conduct" | |
| } | |
| data = [["Section", "Present"]] | |
| for section_key, section_label in section_labels.items(): | |
| present = sections.get(section_key, False) | |
| data.append([section_label, "✓" if present else "✗"]) | |
| table = Table(data, colWidths=[150, 50]) | |
| table.setStyle(TableStyle([ | |
| ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), | |
| ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), | |
| ('ALIGN', (0, 0), (0, -1), 'LEFT'), | |
| ('ALIGN', (1, 0), (1, -1), 'CENTER'), | |
| ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), | |
| ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), | |
| ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), | |
| ('TEXTCOLOR', (1, 1), (1, -1), lambda row, col: colors.green if data[row][col] == "✓" else colors.red), | |
| ])) | |
| elements.append(table) | |
| elements.append(Spacer(1, 10)) | |
| # Additional doc quality metrics | |
| has_images = doc_quality.get("has_images", False) | |
| has_code = doc_quality.get("has_code_examples", False) | |
| metrics_text = "Additional Features: " | |
| if has_images: | |
| img_count = doc_quality.get("image_count", 0) | |
| metrics_text += f"{img_count} images/diagrams, " | |
| if has_code: | |
| code_blocks = doc_quality.get("code_block_count", 0) | |
| metrics_text += f"{code_blocks} code examples" | |
| if has_images or has_code: | |
| elements.append(Paragraph(metrics_text, self.styles['Normal'])) | |
| else: | |
| elements.append(Paragraph("No README file found.", self.styles['Normal'])) | |
| return elements | |
| def _create_visualization_pages(self) -> List[Any]: | |
| """Create pages with visualizations.""" | |
| elements = [] | |
| # Section title | |
| elements.append(Paragraph("Visualizations", self.styles['Heading1'])) | |
| elements.append(Spacer(1, 10)) | |
| visualizations = self.repo_data.get("visualizations", {}) | |
| # Organize visualizations by category | |
| categories = { | |
| "Language Analysis": ["language_distribution", "language_treemap"], | |
| "Commit Activity": ["weekly_commits", "code_frequency", "commits_by_weekday", "commits_by_hour", "commit_heatmap"], | |
| "Contributor Analysis": ["top_contributors", "contribution_distribution", "collaboration_network"], | |
| "Issue & PR Analysis": ["issues_by_state", "issues_by_month", "issues_by_label", "pr_code_changes", "issue_pr_timeline"] | |
| } | |
| # Add visualizations by category | |
| for category, viz_keys in categories.items(): | |
| category_visualizations = [key for key in viz_keys if key in visualizations] | |
| if category_visualizations: | |
| elements.append(Paragraph(category, self.styles['SectionTitle'])) | |
| elements.append(Spacer(1, 10)) | |
| for viz_key in category_visualizations: | |
| fig = visualizations.get(viz_key) | |
| if fig: | |
| # Save figure to a temporary buffer | |
| img_buffer = BytesIO() | |
| if isinstance(fig, go.Figure): | |
| # Handle Plotly figures | |
| fig.write_image(img_buffer, format="png", width=800, height=500) | |
| else: | |
| # Handle Matplotlib figures | |
| fig.savefig(img_buffer, format="png", dpi=150) | |
| img_buffer.seek(0) | |
| img = Image(img_buffer, width=6*inch, height=4*inch) | |
| # Add caption | |
| caption = viz_key.replace("_", " ").title() | |
| elements.append(Paragraph(caption, self.styles['SubsectionTitle'])) | |
| elements.append(img) | |
| elements.append(Spacer(1, 20)) | |
| # Add page break after each category | |
| elements.append(PageBreak()) | |
| return elements | |
| def _create_summary_and_recommendations(self) -> List[Any]: | |
| """Create summary and recommendations section.""" | |
| elements = [] | |
| # Section title | |
| elements.append(Paragraph("Summary & Recommendations", self.styles['Heading1'])) | |
| elements.append(Spacer(1, 10)) | |
| # Repository summary | |
| elements.append(Paragraph("Project Summary", self.styles['SectionTitle'])) | |
| insights = self.repo_data.get("insights", {}) | |
| repo_details = self.repo_data.get("repo_details", {}) | |
| # Short description of the project | |
| repo_name = repo_details.get("name", "The repository") | |
| repo_desc = repo_details.get("description", "") | |
| primary_lang = repo_details.get("language", "various languages") | |
| summary_text = f"{repo_name} is a {primary_lang} project" | |
| if repo_desc: | |
| summary_text += f" that {repo_desc.lower() if repo_desc[0].isupper() else repo_desc}" | |
| summary_text += "." | |
| elements.append(Paragraph(summary_text, self.styles['Normal'])) | |
| elements.append(Spacer(1, 10)) | |
| # Key metrics summary | |
| community_health = insights.get("community_health", {}).get("overall", {}) | |
| activity_level = insights.get("activity_level", {}) | |
| code_complexity = insights.get("code_complexity", {}).get("overall", {}) | |
| metrics_text = f"The project has {repo_details.get('stargazers_count', 0)} stars and {repo_details.get('forks_count', 0)} forks." | |
| if "contributor_insights" in insights: | |
| contributor_count = insights["contributor_insights"].get("contributor_count", 0) | |
| metrics_text += f" It has {contributor_count} contributors" | |
| gini = insights["contributor_insights"].get("contribution_distribution", {}).get("gini_coefficient", 0) | |
| if gini > 0.7: | |
| metrics_text += " with a highly centralized contribution pattern" | |
| elif gini > 0.4: | |
| metrics_text += " with a moderately distributed contribution pattern" | |
| else: | |
| metrics_text += " with a well-distributed contribution pattern" | |
| metrics_text += "." | |
| elements.append(Paragraph(metrics_text, self.styles['Normal'])) | |
| elements.append(Spacer(1, 10)) | |
| # Activity summary | |
| if activity_level: | |
| activity_text = f"The project shows {activity_level.get('level', 'Unknown').lower()} activity levels" | |
| # Add activity context | |
| if activity_level.get('level') in ["High", "Very High"]: | |
| activity_text += " with regular commits and issue management." | |
| elif activity_level.get('level') in ["Medium"]: | |
| activity_text += " with moderate development progress." | |
| else: | |
| activity_text += " with limited recent development." | |
| elements.append(Paragraph(activity_text, self.styles['Normal'])) | |
| elements.append(Spacer(1, 10)) | |
| # Code quality summary | |
| if code_complexity: | |
| complexity_text = f"The codebase has {code_complexity.get('level', 'Unknown').lower()} complexity" | |
| if code_complexity.get('level') in ["High", "Very High"]: | |
| complexity_text += ", which may present challenges for new contributors and maintenance." | |
| elif code_complexity.get('level') in ["Medium", "Medium-High"]: | |
| complexity_text += " with a reasonable balance between functionality and maintainability." | |
| else: | |
| complexity_text += " and should be relatively straightforward to understand and maintain." | |
| elements.append(Paragraph(complexity_text, self.styles['Normal'])) | |
| elements.append(Spacer(1, 10)) | |
| # Community health summary | |
| if community_health: | |
| health_text = f"The project demonstrates {community_health.get('level', 'Unknown').lower()} community health" | |
| if community_health.get('level') in ["Excellent", "Very Good", "Good"]: | |
| health_text += " with responsive maintainers and clear contribution guidelines." | |
| elif community_health.get('level') in ["Fair"]: | |
| health_text += " with some community structures in place." | |
| else: | |
| health_text += " with opportunities for improved community engagement." | |
| elements.append(Paragraph(health_text, self.styles['Normal'])) | |
| elements.append(Spacer(1, 15)) | |
| # Recommendations | |
| elements.append(Paragraph("Recommendations", self.styles['SectionTitle'])) | |
| recommendations = [] | |
| # Documentation recommendations | |
| doc_quality = insights.get("documentation_quality", {}) | |
| if doc_quality: | |
| score = doc_quality.get("score", 0) | |
| if score < 0.4: | |
| recommendations.append("Improve documentation by adding more comprehensive README content, including usage examples and API documentation.") | |
| elif score < 0.7: | |
| recommendations.append("Enhance existing documentation with more examples and clearer installation instructions.") | |
| sections = doc_quality.get("sections", {}) | |
| missing_key_sections = [] | |
| if not sections.get("installation", False): | |
| missing_key_sections.append("installation instructions") | |
| if not sections.get("usage", False): | |
| missing_key_sections.append("usage examples") | |
| if missing_key_sections: | |
| recommendations.append(f"Add missing documentation sections: {', '.join(missing_key_sections)}.") | |
| # Community recommendations | |
| community_files = insights.get("community_health", {}).get("community_guidelines", {}) | |
| if community_files: | |
| missing_guidelines = [] | |
| if not community_files.get("CONTRIBUTING.md", False): | |
| missing_guidelines.append("contribution guidelines") | |
| if not community_files.get("CODE_OF_CONDUCT.md", False): | |
| missing_guidelines.append("code of conduct") | |
| if missing_guidelines: | |
| recommendations.append(f"Create missing community files: {', '.join(missing_guidelines)}.") | |
| # Issue management recommendations | |
| issue_insights = insights.get("issue_insights", {}) | |
| if issue_insights: | |
| resolution_time = issue_insights.get("resolution_time", {}).get("mean_hours", 0) | |
| if resolution_time > 168: # 1 week | |
| recommendations.append("Improve issue response time to enhance user experience and community engagement.") | |
| # Code complexity recommendations | |
| if code_complexity and code_complexity.get('level') in ["High", "Very High"]: | |
| recommendations.append("Consider refactoring complex parts of the codebase to improve maintainability.") | |
| # CI/CD recommendations | |
| ci_cd = insights.get("ci_cd_presence", {}) | |
| if not ci_cd.get("has_ci_cd", False): | |
| recommendations.append("Implement CI/CD pipelines (e.g., GitHub Actions) to automate testing and deployment.") | |
| # Activity recommendations | |
| if activity_level and activity_level.get('level') in ["Low", "Very Low", "None"]: | |
| recommendations.append("Revitalize project with regular updates and community engagement to attract more contributors.") | |
| # Add recommendations to the report | |
| if recommendations: | |
| for i, recommendation in enumerate(recommendations, 1): | |
| elements.append(Paragraph(f"{i}. {recommendation}", self.styles['Normal'])) | |
| elements.append(Spacer(1, 5)) | |
| else: | |
| elements.append(Paragraph("This project follows good development practices and no significant improvements are needed at this time.", self.styles['Normal'])) | |
| return elements | |
class RAGHelper:
    """
    Helper class for Retrieval Augmented Generation (RAG) to enhance chatbot
    responses with repository insights.

    The constructor flattens the raw analysis payload into ``self.repo_info``;
    :meth:`get_context_for_query` then selects the aspects relevant to a user
    query via whole-word keyword matching and renders them as a context string.
    """

    def __init__(self, repo_data: Dict[str, Any]):
        """Initialize with the repository analysis payload."""
        self.repo_data = repo_data
        self.insights = repo_data.get("insights", {})
        # Extract key information for easy retrieval
        self._extract_key_info()

    def _extract_key_info(self):
        """Extract and organize key information from repository data.

        Populates ``self.repo_info`` with basic details, language breakdown,
        top contributors, activity, documentation, community health and code
        complexity summaries. Every sub-payload is optional.
        """
        self.repo_info = {}
        # Basic repository details
        if "repo_details" in self.repo_data:
            details = self.repo_data["repo_details"]
            self.repo_info["name"] = details.get("name", "")
            self.repo_info["full_name"] = details.get("full_name", "")
            self.repo_info["description"] = details.get("description", "")
            self.repo_info["url"] = details.get("html_url", "")
            self.repo_info["stars"] = details.get("stargazers_count", 0)
            self.repo_info["forks"] = details.get("forks_count", 0)
            self.repo_info["language"] = details.get("language", "")
            self.repo_info["created_at"] = details.get("created_at", "")
            self.repo_info["license"] = details.get("license", "")
        # Languages used: convert byte counts to a percentage breakdown
        if "languages" in self.repo_data:
            languages = self.repo_data["languages"]
            total_bytes = sum(languages.values()) if languages else 0
            if total_bytes > 0:
                language_percentages = {
                    lang: (bytes_count / total_bytes) * 100
                    for lang, bytes_count in languages.items()
                }
                self.repo_info["language_breakdown"] = language_percentages
                sorted_languages = sorted(language_percentages.items(), key=lambda x: x[1], reverse=True)
                self.repo_info["top_languages"] = sorted_languages[:5]
        # Contributors, ranked by contribution count
        if "contributors" in self.repo_data:
            contributors = self.repo_data["contributors"]
            self.repo_info["total_contributors"] = len(contributors)
            if contributors:
                sorted_contributors = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True)
                self.repo_info["top_contributors"] = [
                    {
                        "name": c.get("login", "Unknown"),
                        "contributions": c.get("contributions", 0)
                    }
                    for c in sorted_contributors[:5]
                ]
        # Activity metrics
        if "commit_insights" in self.insights:
            commit_insights = self.insights["commit_insights"]
            self.repo_info["commit_patterns"] = commit_insights.get("commit_time_patterns", {})
            self.repo_info["top_committers"] = commit_insights.get("top_contributors", {})
        # Documentation quality (score in [0, 1] bucketed into three levels)
        if "documentation_quality" in self.insights:
            doc_quality = self.insights["documentation_quality"]
            self.repo_info["documentation_score"] = doc_quality.get("score", 0)
            self.repo_info["documentation_quality"] = (
                "High" if doc_quality.get("score", 0) > 0.7
                else "Medium" if doc_quality.get("score", 0) > 0.4
                else "Low"
            )
            self.repo_info["readme_sections"] = doc_quality.get("sections", {})
        # Community health
        if "community_health" in self.insights:
            community_health = self.insights["community_health"]
            self.repo_info["community_health_level"] = community_health.get("overall", {}).get("level", "Unknown")
            self.repo_info["community_guidelines"] = community_health.get("community_guidelines", {})
        # Activity level
        if "activity_level" in self.insights:
            activity_level = self.insights["activity_level"]
            self.repo_info["activity_level"] = activity_level.get("level", "Unknown")
        # Code complexity
        if "code_complexity" in self.insights:
            code_complexity = self.insights["code_complexity"]
            self.repo_info["code_complexity_level"] = code_complexity.get("overall", {}).get("level", "Unknown")

    def get_context_for_query(self, query: str) -> str:
        """
        Retrieve relevant context from repository data based on the query.

        Args:
            query: The user's query

        Returns:
            str: Contextual information to enhance the response
        """
        # Convert query to lowercase for easier matching
        query_lower = query.lower()
        # Define keywords for different aspects of the repository
        keywords = {
            "overview": ["overview", "about", "what is", "tell me about", "summary"],
            "languages": ["language", "programming language", "code language", "tech stack"],
            "contributors": ["contributor", "who", "team", "maintainer", "author"],
            "activity": ["activity", "active", "commit", "update", "recent", "frequency"],
            "documentation": ["documentation", "docs", "readme", "well documented"],
            "community": ["community", "health", "governance", "conduct", "guideline"],
            "complexity": ["complex", "complexity", "difficult", "simple", "codebase", "understand"],
            "issues": ["issue", "bug", "problem", "ticket", "feature request"],
            "pulls": ["pull request", "pr", "prs", "merge", "contribution"],
        }

        def term_in_query(term: str) -> bool:
            # Whole-word match: plain substring containment made short
            # keywords fire spuriously (e.g. "pr" inside "problem"/"project").
            return re.search(r"\b" + re.escape(term) + r"\b", query_lower) is not None

        # Check which aspects are relevant to the query
        relevant_aspects = [
            aspect for aspect, terms in keywords.items()
            if any(term_in_query(term) for term in terms)
        ]
        # If no specific aspects are identified, provide a general overview
        if not relevant_aspects:
            relevant_aspects = ["overview"]
        # Build context information based on relevant aspects
        context_parts = []
        # Repository overview
        if "overview" in relevant_aspects:
            repo_name = self.repo_info.get("full_name", "The repository")
            stars = self.repo_info.get("stars", 0)
            forks = self.repo_info.get("forks", 0)
            description = self.repo_info.get("description", "")
            overview = f"{repo_name} is a GitHub repository with {stars} stars and {forks} forks. "
            if description:
                overview += f"Description: {description}. "
            language = self.repo_info.get("language", "")
            if language:
                overview += f"It's primarily written in {language}. "
            created_at = self.repo_info.get("created_at", "")
            if created_at:
                try:
                    # GitHub timestamps use a trailing "Z"; normalize for fromisoformat
                    date = datetime.datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    overview += f"The repository was created on {date.strftime('%B %d, %Y')}. "
                except (ValueError, AttributeError):
                    pass
            context_parts.append(overview)
        # Language breakdown
        if "languages" in relevant_aspects:
            top_languages = self.repo_info.get("top_languages", [])
            if top_languages:
                languages_text = "Language breakdown: "
                languages_text += ", ".join([f"{lang}: {pct:.1f}%" for lang, pct in top_languages])
                languages_text += "."
                context_parts.append(languages_text)
        # Contributors
        if "contributors" in relevant_aspects:
            total_contributors = self.repo_info.get("total_contributors", 0)
            top_contributors = self.repo_info.get("top_contributors", [])
            contributors_text = f"The repository has {total_contributors} contributors. "
            if top_contributors:
                contributors_text += "Top contributors: "
                contributors_text += ", ".join([
                    f"{c['name']} ({c['contributions']} commits)"
                    for c in top_contributors
                ])
                contributors_text += "."
            context_parts.append(contributors_text)
        # Activity metrics
        if "activity" in relevant_aspects:
            activity_level = self.repo_info.get("activity_level", "Unknown")
            activity_text = f"Activity level: {activity_level}. "
            commit_patterns = self.repo_info.get("commit_patterns", {})
            by_weekday = commit_patterns.get("by_weekday", {})
            if by_weekday:
                most_active_day = max(by_weekday.items(), key=lambda x: x[1])[0]
                activity_text += f"Most active day of the week: {most_active_day}. "
            context_parts.append(activity_text)
        # Documentation quality
        if "documentation" in relevant_aspects:
            doc_quality = self.repo_info.get("documentation_quality", "Unknown")
            doc_score = self.repo_info.get("documentation_score", 0)
            docs_text = f"Documentation quality: {doc_quality} (score: {doc_score:.2f}/1.0). "
            readme_sections = self.repo_info.get("readme_sections", {})
            if readme_sections:
                present_sections = [k for k, v in readme_sections.items() if v]
                missing_sections = [k for k, v in readme_sections.items() if not v]
                if present_sections:
                    docs_text += f"README includes sections on: {', '.join(present_sections)}. "
                if missing_sections:
                    docs_text += f"README is missing sections on: {', '.join(missing_sections)}."
            context_parts.append(docs_text)
        # Community health
        if "community" in relevant_aspects:
            health_level = self.repo_info.get("community_health_level", "Unknown")
            guidelines = self.repo_info.get("community_guidelines", {})
            community_text = f"Community health: {health_level}. "
            if guidelines:
                present_guidelines = [k for k, v in guidelines.items() if v]
                missing_guidelines = [k for k, v in guidelines.items() if not v]
                if present_guidelines:
                    community_text += f"Has community files: {', '.join(present_guidelines)}. "
                if missing_guidelines:
                    community_text += f"Missing community files: {', '.join(missing_guidelines)}."
            context_parts.append(community_text)
        # Code complexity
        if "complexity" in relevant_aspects:
            complexity_level = self.repo_info.get("code_complexity_level", "Unknown")
            complexity_text = f"Code complexity: {complexity_level}."
            context_parts.append(complexity_text)
        # Issues
        if "issues" in relevant_aspects and "issue_insights" in self.insights:
            issue_insights = self.insights["issue_insights"]
            by_state = issue_insights.get("by_state", {})
            issues_text = "Issues: "
            if by_state:
                issues_text += ", ".join([f"{count} {state}" for state, count in by_state.items()])
                issues_text += ". "
            resolution_time = issue_insights.get("resolution_time", {})
            if resolution_time:
                mean_hours = resolution_time.get("mean_hours", 0)
                if mean_hours > 24:
                    mean_days = mean_hours / 24
                    issues_text += f"Average resolution time: {mean_days:.1f} days."
                else:
                    issues_text += f"Average resolution time: {mean_hours:.1f} hours."
            context_parts.append(issues_text)
        # Pull requests
        if "pulls" in relevant_aspects and "pr_insights" in self.insights:
            pr_insights = self.insights["pr_insights"]
            by_state = pr_insights.get("by_state", {})
            prs_text = "Pull Requests: "
            if by_state:
                prs_text += ", ".join([f"{count} {state}" for state, count in by_state.items()])
                prs_text += ". "
            context_parts.append(prs_text)
        # Join all context parts
        context = " ".join(context_parts)
        return context
| def create_gradio_interface(): | |
| """ | |
| Create and launch the Gradio interface for GitHub repository analysis. | |
| """ | |
| # Styling | |
| css = """ | |
| .gradio-container {max-width: 100% !important} | |
| .main-analysis-area {min-height: 600px} | |
| .analysis-result {overflow-y: auto; max-height: 500px} | |
| .chat-interface {border: 1px solid #ccc; border-radius: 5px; padding: 10px} | |
| .pdf-download {margin-top: 20px} | |
| """ | |
| # Initialize state | |
| repo_data = {} | |
| analyzer = None | |
| def parse_repo_url(url: str) -> Tuple[str, str]: | |
| """Parse GitHub repository URL into owner and repo name.""" | |
| # Pattern for GitHub repo URLs | |
| patterns = [ | |
| r"github\.com\/([^\/]+)\/([^\/]+)", # github.com/owner/repo | |
| r"github\.com\/([^\/]+)\/([^\/]+)\/?$", # github.com/owner/repo/ | |
| r"github\.com\/([^\/]+)\/([^\/]+)\.git", # github.com/owner/repo.git | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, url) | |
| if match: | |
| return match.group(1), match.group(2) | |
| return None, None | |
| def analyze_repository(repo_url: str, is_private: bool, github_token: str = None, progress=gr.Progress()) -> Tuple[str, Dict]: | |
| """Analyze GitHub repository and return the analysis results.""" | |
| # Validate URL and extract owner/repo | |
| owner, repo_name = parse_repo_url(repo_url) | |
| if not owner or not repo_name: | |
| return "Invalid GitHub repository URL. Please use format: https://github.com/owner/repo", {} | |
| # Use provided token or default token | |
| token = github_token if is_private and github_token else os.environ.get("GITHUB_TOKEN", "") | |
| if is_private and not token: | |
| return "GitHub token is required for private repositories.", {} | |
| # Configure analyzer | |
| config = GitHubAPIConfig(token=token) | |
| nonlocal analyzer | |
| analyzer = GitHubRepoAnalyzer(config) | |
| # Analyze repository with progress updates | |
| progress(0, desc="Starting repository analysis...") | |
| try: | |
| progress(0.1, desc="Fetching repository details...") | |
| global repo_data | |
| repo_data = analyzer.analyze_repo(owner, repo_name) | |
| progress(0.9, desc="Generating insights...") | |
| # Create a summary of the analysis | |
| repo_details = repo_data.get("repo_details", {}) | |
| insights = repo_data.get("insights", {}) | |
| repo_name = repo_details.get("full_name", "") | |
| description = repo_details.get("description", "No description provided") | |
| stars = repo_details.get("stargazers_count", 0) | |
| forks = repo_details.get("forks_count", 0) | |
| language = repo_details.get("language", "Unknown") | |
| # Calculate age | |
| created_at = repo_details.get("created_at", "") | |
| age_str = "Unknown" | |
| if created_at: | |
| try: | |
| created_date = datetime.datetime.fromisoformat(created_at.replace('Z', '+00:00')) | |
| age_days = (datetime.datetime.now(datetime.timezone.utc) - created_date).days | |
| age_years = age_days / 365.25 | |
| age_str = f"{age_years:.1f} years ({age_days} days)" | |
| except (ValueError, AttributeError): | |
| pass | |
| # Get activity level | |
| activity_level = insights.get("activity_level", {}).get("level", "Unknown") | |
| # Documentation quality | |
| doc_quality = insights.get("documentation_quality", {}) | |
| has_readme = doc_quality.get("has_readme", False) | |
| doc_score = doc_quality.get("score", 0) if has_readme else 0 | |
| doc_quality_level = "High" if doc_score > 0.7 else "Medium" if doc_score > 0.4 else "Low" | |
| # Community health | |
| community_health = insights.get("community_health", {}).get("overall", {}) | |
| health_level = community_health.get("level", "Unknown") | |
| # Code complexity | |
| code_complexity = insights.get("code_complexity", {}).get("overall", {}) | |
| complexity_level = code_complexity.get("level", "Unknown") | |
| # Create summary HTML | |
| summary_html = f""" | |
| <h1>{repo_name}</h1> | |
| <p><strong>Description:</strong> {description}</p> | |
| <div style="display: flex; flex-wrap: wrap; gap: 20px; margin-bottom: 20px;"> | |
| <div style="flex: 1; min-width: 200px;"> | |
| <h3>Repository Details</h3> | |
| <ul> | |
| <li><strong>Primary Language:</strong> {language}</li> | |
| <li><strong>Stars:</strong> {stars}</li> | |
| <li><strong>Forks:</strong> {forks}</li> | |
| <li><strong>Age:</strong> {age_str}</li> | |
| <li><strong>License:</strong> {repo_details.get("license", "Not specified")}</li> | |
| </ul> | |
| </div> | |
| <div style="flex: 1; min-width: 200px;"> | |
| <h3>Key Insights</h3> | |
| <ul> | |
| <li><strong>Activity Level:</strong> {activity_level}</li> | |
| <li><strong>Documentation Quality:</strong> {doc_quality_level}</li> | |
| <li><strong>Community Health:</strong> {health_level}</li> | |
| <li><strong>Code Complexity:</strong> {complexity_level}</li> | |
| </ul> | |
| </div> | |
| </div> | |
| """ | |
| # Contributors section | |
| contributors = repo_data.get("contributors", []) | |
| if contributors: | |
| top_contributors = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True)[:5] | |
| summary_html += f""" | |
| <div style="margin-bottom: 20px;"> | |
| <h3>Top Contributors</h3> | |
| <div style="display: flex; flex-wrap: wrap; gap: 10px;"> | |
| """ | |
| for contributor in top_contributors: | |
| avatar_url = contributor.get("avatar_url", "") | |
| login = contributor.get("login", "Unknown") | |
| contributions = contributor.get("contributions", 0) | |
| summary_html += f""" | |
| <div style="text-align: center; width: 100px;"> | |
| <img src="{avatar_url}" style="width: 50px; height: 50px; border-radius: 25px; margin-bottom: 5px;"> | |
| <div><strong>{login}</strong></div> | |
| <div>{contributions} commits</div> | |
| </div> | |
| """ | |
| summary_html += """ | |
| </div> | |
| </div> | |
| """ | |
| # Language distribution section | |
| languages = repo_data.get("languages", {}) | |
| if languages: | |
| total_bytes = sum(languages.values()) | |
| language_percentages = [ | |
| (lang, bytes_count, (bytes_count / total_bytes) * 100) | |
| for lang, bytes_count in languages.items() | |
| ] | |
| sorted_languages = sorted(language_percentages, key=lambda x: x[1], reverse=True)[:5] | |
| summary_html += f""" | |
| <div style="margin-bottom: 20px;"> | |
| <h3>Language Distribution</h3> | |
| <div style="display: flex; flex-direction: column; gap: 5px;"> | |
| """ | |
| for lang, bytes_count, percentage in sorted_languages: | |
| bar_width = max(1, min(100, percentage)) | |
| summary_html += f""" | |
| <div> | |
| <div style="display: flex; align-items: center; gap: 10px;"> | |
| <div style="width: 100px; text-align: right;"><strong>{lang}</strong></div> | |
| <div style="flex-grow: 1; background-color: #eee; height: 20px; border-radius: 10px;"> | |
| <div style="width: {bar_width}%; background-color: #4CAF50; height: 100%; border-radius: 10px;"></div> | |
| </div> | |
| <div style="width: 60px;">{percentage:.1f}%</div> | |
| </div> | |
| </div> | |
| """ | |
| summary_html += """ | |
| </div> | |
| </div> | |
| """ | |
| progress(1.0, desc="Analysis complete!") | |
| return summary_html, repo_data | |
| except Exception as e: | |
| error_message = f"Error analyzing repository: {str(e)}" | |
| logger.error(error_message) | |
| return error_message, {} | |
def generate_pdf_report() -> Tuple[str, Optional[str]]:
    """Generate a PDF report for the previously analyzed repository.

    Returns:
        Tuple[str, Optional[str]]: A status message for the Markdown output
        and the path of the generated PDF for the ``gr.File`` download
        component, or ``None`` when no report could be produced.
    """
    if not repo_data:
        # No analysis has been run yet -- nothing to render.
        return "Please analyze a repository first.", None
    try:
        # Create PDF report
        pdf_generator = PDFReportGenerator(repo_data)
        pdf_path = pdf_generator.generate_report()
        # "owner/name" -> "owner_name" for a display-friendly label.
        repo_name = repo_data.get("repo_details", {}).get("full_name", "repository").replace("/", "_")
        # BUG FIX: previously returned {"report.pdf": pdf_path}; gr.File
        # expects a plain file path (or list of paths), not a dict.
        return f"PDF report generated for {repo_name}", pdf_path
    except Exception as e:
        error_message = f"Error generating PDF report: {str(e)}"
        logger.error(error_message)
        return error_message, None
def chat_with_repo(query: str, history: List[Tuple[str, str]]) -> str:
    """
    Chat with the repository analysis data using RAG approach.
    Args:
        query: User's question
        history: Chat history
    Returns:
        str: Response to the user's question
    """
    if not repo_data:
        return "Please analyze a repository first before asking questions."
    try:
        # Pull the most relevant analysis context for this question.
        rag_helper = RAGHelper(repo_data)
        context = rag_helper.get_context_for_query(query)
        # Repo name makes the canned replies read more naturally.
        repo_name = repo_data.get("repo_details", {}).get("name", "The repository")
        lowered = query.lower()
        # Ordered (trigger terms, follow-up sentence) pairs; the first
        # group with a term appearing in the query picks the reply.
        # (Simulated responses -- a real implementation would call the
        # Gemini API with the retrieved context instead.)
        followups = (
            (("what is", "tell me about", "overview", "about"),
             f"Is there something specific about {repo_name} you'd like to know more about?"),
            (("language", "programming", "written in"),
             f"Would you like to know more about any specific language used in {repo_name}?"),
            (("contributor", "who", "maintain", "author"),
             "I can provide more details about specific contributors if you're interested."),
            (("active", "activity", "commit", "frequency"),
             "Would you like to see visualizations of the commit activity patterns?"),
            (("document", "readme", "docs"),
             "Is there a specific aspect of the documentation you'd like feedback on?"),
            (("complex", "difficulty", "understand"),
             "Would you like suggestions for navigating this codebase effectively?"),
        )
        for terms, followup in followups:
            if any(term in lowered for term in terms):
                return f"{context}\n\n{followup}"
        # No category matched -- fall back to a generic framing.
        return f"Based on my analysis of {repo_name}:\n\n{context}\n\nIs there anything specific you'd like to know more about?"
    except Exception as e:
        error_message = f"Error processing your question: {str(e)}"
        logger.error(error_message)
        return error_message
| # Create Gradio interface | |
| with gr.Blocks(css=css) as interface: | |
| gr.Markdown("# GitHub Repository Analyzer") | |
| gr.Markdown("Analyze GitHub repositories and chat about the insights") | |
| with gr.Tab("Repository Analysis"): | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/owner/repo") | |
| with gr.Column(scale=1): | |
| is_private = gr.Checkbox(label="Private Repository") | |
| github_token = gr.Textbox(label="GitHub Token (for private repos)", type="password", visible=False) | |
| # Show/hide token input based on private repo checkbox | |
| is_private.change(fn=lambda x: gr.update(visible=x), inputs=[is_private], outputs=[github_token]) | |
| analyze_btn = gr.Button("Analyze Repository", variant="primary") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| analysis_result = gr.HTML(label="Analysis Result", elem_classes=["analysis-result"]) | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| gr.Markdown("### PDF Report") | |
| pdf_btn = gr.Button("Generate PDF Report", variant="secondary") | |
| pdf_output = gr.Markdown() | |
| pdf_download = gr.File(label="Download Report", elem_classes=["pdf-download"]) | |
| # Connect buttons to functions | |
| analyze_btn.click( | |
| fn=analyze_repository, | |
| inputs=[repo_url, is_private, github_token], | |
| outputs=[analysis_result, pdf_output] | |
| ) | |
| pdf_btn.click( | |
| fn=generate_pdf_report, | |
| inputs=[], | |
| outputs=[pdf_output, pdf_download] | |
| ) | |
| with gr.Tab("Chat with Repository"): | |
| gr.Markdown("Ask questions about the repository and get insights") | |
| chatbot = gr.Chatbot(elem_classes=["chat-interface"]) | |
| msg = gr.Textbox( | |
| placeholder="Ask me anything about the repository...", | |
| show_label=False | |
| ) | |
| clear = gr.Button("Clear") | |
| # Connect chat interface | |
| msg.submit( | |
| fn=chat_with_repo, | |
| inputs=[msg, chatbot], | |
| outputs=[chatbot], | |
| postprocess=lambda x: [(msg.value, x)] | |
| ).then(lambda: "", None, msg) | |
| clear.click(lambda: None, None, chatbot, queue=False) | |
| return interface | |
| # Main code to run the application | |
| if __name__ == "__main__": | |
| # Create and launch Gradio interface | |
| interface = create_gradio_interface() | |
| interface.launch(debug=True, share=True) | |