diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,64 +1,3808 @@ + + +import os +import json +import time +import re +import logging +import datetime +import concurrent.futures +import sys +import base64 +import tempfile +from pathlib import Path +from typing import Dict, List, Union, Any, Optional, Tuple, Set +from collections import Counter, defaultdict +from dataclasses import dataclass, field, asdict +from io import BytesIO, StringIO +import urllib.request + +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns +import networkx as nx +import plotly.express as px +import plotly.graph_objects as go +from plotly.subplots import make_subplots +from tqdm.notebook import tqdm +from dateutil.relativedelta import relativedelta +from github import Github, GithubException, RateLimitExceededException import gradio as gr -from huggingface_hub import InferenceClient - -""" -For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference -""" -client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") - - -def respond( - message, - history: list[tuple[str, str]], - system_message, - max_tokens, - temperature, - top_p, -): - messages = [{"role": "system", "content": system_message}] - - for val in history: - if val[0]: - messages.append({"role": "user", "content": val[0]}) - if val[1]: - messages.append({"role": "assistant", "content": val[1]}) - - messages.append({"role": "user", "content": message}) - - response = "" - - for message in client.chat_completion( - messages, - max_tokens=max_tokens, - stream=True, - temperature=temperature, - top_p=top_p, - ): - token = message.choices[0].delta.content - - response += token - yield response - - -""" -For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface -""" -demo = gr.ChatInterface( - respond, - 
# For PDF Generation
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle, PageBreak
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
from reportlab.lib.enums import TA_CENTER, TA_LEFT

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("github_analyzer")


@dataclass
class GitHubAPIConfig:
    """Configuration for the GitHub API client with sensible defaults."""

    # API access configuration.  `token` may legitimately be absent
    # (unauthenticated access), hence Optional.
    token: Optional[str] = None
    max_retries: int = 5
    backoff_factor: int = 2
    per_page: int = 100  # Max allowed by GitHub
    timeout: int = 30

    # HTTP status codes that warrant a retry (secondary rate limits and
    # transient server errors).
    retry_status_codes: Set[int] = field(default_factory=lambda: {
        403, 429, 500, 502, 503, 504
    })

    # Permission types recognised for collaborators.
    collaborator_permission_types: List[str] = field(default_factory=lambda: [
        "admin", "push", "pull", "maintain", "triage"
    ])

    # File-classification buckets used when summarising repository contents.
    code_extensions: List[str] = field(default_factory=lambda: [
        ".py", ".js", ".java", ".c", ".cpp", ".cs", ".go", ".php", ".rb",
        ".swift", ".kt", ".ts", ".rs", ".scala", ".lua", ".m", ".mm",
        ".h", ".hpp", ".cc", ".hh", ".f", ".f90", ".f95", ".f03", ".f08",
        ".for", ".f77", ".jl", ".pl", ".pm", ".t", ".r", ".dart", ".groovy",
        ".v", ".vhd", ".vhdl", ".erl", ".hrl", ".hs", ".lhs", ".ex", ".exs", ".hx"
    ])

    markup_extensions: List[str] = field(default_factory=lambda: [
        ".md", ".html", ".htm", ".xml", ".json", ".yaml", ".yml", ".txt",
        ".rst", ".tex", ".adoc", ".csv", ".tsv", ".toml", ".ini", ".cfg"
    ])

    script_extensions: List[str] = field(default_factory=lambda: [
        ".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd"
    ])

    notebook_extensions: List[str] = field(default_factory=lambda: [
        ".ipynb"
    ])

    data_extensions: List[str] = field(default_factory=lambda: [
        ".csv", ".tsv", ".json", ".xml", ".xls", ".xlsx", ".hdf5",
        ".parquet", ".feather", ".pkl", ".sav", ".dta", ".arff"
    ])

    config_extensions: List[str] = field(default_factory=lambda: [
        ".yml", ".yaml", ".json", ".toml", ".ini", ".cfg", ".conf"
    ])

    other_extensions: List[str] = field(default_factory=lambda: [
        ".txt", ".log", ".svg", ".png", ".jpg", ".jpeg"
    ])

    # Data collection limits (set to None for no limit)
    max_contributors: Optional[int] = 50
    max_issues: Optional[int] = 100
    max_commits: Optional[int] = 200
    max_search_results: Optional[int] = 50
    max_pull_requests: Optional[int] = 100
    max_collaborators: Optional[int] = 30

    # Output configuration
    output_dir: str = "/tmp/github_data"
    generate_visualizations: bool = True

    def __post_init__(self):
        """Ensure output directory exists."""
        os.makedirs(self.output_dir, exist_ok=True)

    def all_code_extensions(self) -> List[str]:
        """Return all code-related file extensions (deduplicated)."""
        return list(set(
            self.code_extensions
            + self.script_extensions
            + self.config_extensions
        ))


class GithubClient:
    """
    A robust GitHub client that handles rate limiting, retries, and provides
    consistent error handling.
    """

    def __init__(self, config: GitHubAPIConfig):
        """Initialize the GitHub client with configuration."""
        self.config = config
        self.github = Github(
            config.token,
            per_page=config.per_page,
            timeout=config.timeout,
            retry=config.max_retries
        )
        self.cache = {}  # Simple in-memory cache (per-instance, never evicted)

    def get_repo(self, repo_path: str):
        """Get a repository by owner/name with caching."""
        cache_key = f"repo:{repo_path}"
        if cache_key in self.cache:
            return self.cache[cache_key]

        repo = self.github.get_repo(repo_path)
        self.cache[cache_key] = repo
        return repo

    def _handle_exception(self, e: GithubException, retry_count: int) -> bool:
        """
        Handle GitHub exceptions with proper retries and backoff strategy.

        Args:
            e: The exception to handle
            retry_count: Current retry count

        Returns:
            bool: True if retry should be attempted, False otherwise
        """
        if retry_count >= self.config.max_retries:
            logger.error(f"Max retries ({self.config.max_retries}) exceeded.")
            return False

        if isinstance(e, RateLimitExceededException):
            # Primary rate limit: sleep until the reported reset time.
            rate_limit = self.github.get_rate_limit()
            reset_time = rate_limit.core.reset.timestamp() if hasattr(rate_limit, 'core') else time.time() + 3600
            sleep_time = max(0, int(reset_time - time.time())) + 1

            logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds...")
            time.sleep(sleep_time)
            return True

        elif e.status in self.config.retry_status_codes:
            # Secondary rate limits / server errors: exponential backoff.
            sleep_time = self.config.backoff_factor ** retry_count
            logger.warning(
                f"Temporary error (status {e.status}). Retrying in {sleep_time} seconds. "
                f"Attempt {retry_count+1}/{self.config.max_retries}."
            )
            time.sleep(sleep_time)
            return True

        # Non-recoverable error
        logger.error(f"Non-recoverable GitHub API error: {e}")
        return False

    def _paginated_request(self, method, *args, **kwargs):
        """
        Execute a paginated GitHub API request with retry logic.

        Args:
            method: The PyGithub method to call.  A ``max_results`` kwarg is
                consumed here (not forwarded) and caps the number of items
                collected.

        Returns:
            List of results, or None on a non-recoverable error.
        """
        retry_count = 0
        max_results = kwargs.pop('max_results', None)
        results = []

        while retry_count <= self.config.max_retries:
            # Bug fix: every attempt restarts pagination from the first page,
            # so items collected by a previous (failed) attempt must be
            # discarded — otherwise a retry appended them a second time.
            results = []
            try:
                paginated_list = method(*args, **kwargs)

                for item in paginated_list:
                    results.append(item)
                    if max_results and len(results) >= max_results:
                        return results

                # Full iteration completed without hitting the cap.
                return results

            except GithubException as e:
                if self._handle_exception(e, retry_count):
                    retry_count += 1
                else:
                    return None

        # Retries exhausted: return whatever the last attempt collected.
        return results

    def _execute_request(self, method, *args, **kwargs):
        """
        Execute a single GitHub API request with retry logic.

        Args:
            method: The PyGithub method to call

        Returns:
            Result of the API call or None on non-recoverable error
        """
        retry_count = 0
        while retry_count <= self.config.max_retries:
            try:
                return method(*args, **kwargs)
            except GithubException as e:
                # Special case for 404 errors - resource not found is not
                # an error worth retrying.
                if e.status == 404:
                    logger.info(f"Resource not found: {e}")
                    return None

                if self._handle_exception(e, retry_count):
                    retry_count += 1
                else:
                    return None

        return None


class GitHubRepoAnalyzer:
    """Main class for analyzing GitHub repositories and generating insights."""
+ """ + + def __init__(self, config: GitHubAPIConfig): + """Initialize the analyzer with configuration.""" + self.config = config + self.client = GithubClient(config) + + def get_repo_details(self, repo) -> Dict[str, Any]: + """Get comprehensive repository metadata.""" + logger.info(f"Fetching repository details for {repo.full_name}") + + return { + "name": repo.name, + "full_name": repo.full_name, + "description": repo.description, + "html_url": repo.html_url, + "stargazers_count": repo.stargazers_count, + "watchers_count": repo.watchers_count, + "forks_count": repo.forks_count, + "open_issues_count": repo.open_issues_count, + "language": repo.language, + "default_branch": repo.default_branch, + "created_at": repo.created_at.isoformat() if repo.created_at else None, + "updated_at": repo.updated_at.isoformat() if repo.updated_at else None, + "pushed_at": repo.pushed_at.isoformat() if repo.pushed_at else None, + "license": repo.license.name if repo.license else None, + "topics": list(repo.get_topics()), + "archived": repo.archived, + "disabled": repo.disabled, + "visibility": repo.visibility, + "has_wiki": repo.has_wiki, + "has_pages": repo.has_pages, + "has_projects": repo.has_projects, + "has_issues": repo.has_issues, + "has_discussions": repo.has_discussions if hasattr(repo, 'has_discussions') else None, + "size": repo.size, # Size in KB + "network_count": repo.network_count, + "subscribers_count": repo.subscribers_count, + "organization": repo.organization.login if repo.organization else None, + "parent": repo.parent.full_name if hasattr(repo, 'parent') and repo.parent else None, + "fork": repo.fork, + } + + def get_contributors(self, repo) -> List[Dict[str, Any]]: + """Get repository contributors with detailed information.""" + logger.info(f"Fetching contributors for {repo.full_name}") + + contributors = self.client._paginated_request( + repo.get_contributors, + max_results=self.config.max_contributors + ) + + if contributors is None: + return [] + + return [ + 
{ + "login": c.login, + "id": c.id, + "contributions": c.contributions, + "type": c.type, + "html_url": c.html_url, + "followers": c.followers, + "following": c.following, + "public_repos": c.public_repos if hasattr(c, 'public_repos') else None, + "bio": c.bio if hasattr(c, 'bio') else None, + "location": c.location if hasattr(c, 'location') else None, + "company": c.company if hasattr(c, 'company') else None, + "email": c.email if hasattr(c, 'email') else None, + "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None, + } + for c in contributors + ] + + def get_languages(self, repo) -> Dict[str, int]: + """Get languages used in the repository.""" + logger.info(f"Fetching languages for {repo.full_name}") + + languages = self.client._execute_request(repo.get_languages) + return languages or {} + + def get_issues(self, repo, state: str = "all") -> List[Dict[str, Any]]: + """Get repository issues.""" + logger.info(f"Fetching issues for {repo.full_name} with state={state}") + + issues = self.client._paginated_request( + repo.get_issues, + state=state, + max_results=self.config.max_issues + ) + + if issues is None: + return [] + + return [ + { + "id": issue.id, + "number": issue.number, + "title": issue.title, + "body": issue.body, + "state": issue.state, + "user_login": issue.user.login if issue.user else None, + "labels": [label.name for label in issue.labels], + "comments": issue.comments, + "created_at": issue.created_at.isoformat() if issue.created_at else None, + "updated_at": issue.updated_at.isoformat() if issue.updated_at else None, + "closed_at": issue.closed_at.isoformat() if issue.closed_at else None, + "pull_request": issue.pull_request is not None, + "milestone": issue.milestone.title if issue.milestone else None, + "assignees": [user.login for user in issue.assignees] if issue.assignees else [], + } + for issue in issues + ] + + def get_commits(self, repo) -> List[Dict[str, Any]]: + """Get repository commits.""" + logger.info(f"Fetching commits 
for {repo.full_name}") + + commits = self.client._paginated_request( + repo.get_commits, + max_results=self.config.max_commits + ) + + if commits is None: + return [] + + return [ + { + "sha": commit.sha, + "commit_message": commit.commit.message, + "author_login": commit.author.login if commit.author else None, + "author_name": commit.commit.author.name if commit.commit and commit.commit.author else None, + "author_email": commit.commit.author.email if commit.commit and commit.commit.author else None, + "committer_login": commit.committer.login if commit.committer else None, + "committer_name": commit.commit.committer.name if commit.commit and commit.commit.committer else None, + "date": commit.commit.author.date.isoformat() if commit.commit and commit.commit.author else None, + "html_url": commit.html_url, + "stats": { + "additions": commit.stats.additions if hasattr(commit, 'stats') else None, + "deletions": commit.stats.deletions if hasattr(commit, 'stats') else None, + "total": commit.stats.total if hasattr(commit, 'stats') else None, + }, + "files_changed": [ + {"filename": f.filename, "additions": f.additions, "deletions": f.deletions, "status": f.status} + for f in commit.files + ] if hasattr(commit, 'files') else [], + } + for commit in commits + ] + + def get_readme(self, repo) -> str: + """Get repository README content.""" + logger.info(f"Fetching README for {repo.full_name}") + + readme = self.client._execute_request(repo.get_readme) + if readme is None: + return "" + + try: + return readme.decoded_content.decode('utf-8') + except UnicodeDecodeError: + logger.warning(f"Could not decode README content for {repo.full_name}") + return "" + + def get_pull_requests(self, repo, state: str = "all") -> List[Dict[str, Any]]: + """Get repository pull requests.""" + logger.info(f"Fetching pull requests for {repo.full_name} with state={state}") + + pulls = self.client._paginated_request( + repo.get_pulls, + state=state, + max_results=self.config.max_pull_requests + 
) + + if pulls is None: + return [] + + return [ + { + "id": pull.id, + "number": pull.number, + "title": pull.title, + "body": pull.body, + "state": pull.state, + "user_login": pull.user.login if pull.user else None, + "created_at": pull.created_at.isoformat() if pull.created_at else None, + "updated_at": pull.updated_at.isoformat() if pull.updated_at else None, + "closed_at": pull.closed_at.isoformat() if pull.closed_at else None, + "merged_at": pull.merged_at.isoformat() if pull.merged_at else None, + "draft": pull.draft if hasattr(pull, 'draft') else None, + "mergeable": pull.mergeable if hasattr(pull, 'mergeable') else None, + "mergeable_state": pull.mergeable_state if hasattr(pull, 'mergeable_state') else None, + "merged": pull.merged if hasattr(pull, 'merged') else None, + "merge_commit_sha": pull.merge_commit_sha if hasattr(pull, 'merge_commit_sha') else None, + "comments": pull.comments if hasattr(pull, 'comments') else 0, + "review_comments": pull.review_comments if hasattr(pull, 'review_comments') else 0, + "commits": pull.commits if hasattr(pull, 'commits') else 0, + "additions": pull.additions if hasattr(pull, 'additions') else 0, + "deletions": pull.deletions if hasattr(pull, 'deletions') else 0, + "changed_files": pull.changed_files if hasattr(pull, 'changed_files') else 0, + "head_ref": pull.head.ref if hasattr(pull, 'head') and pull.head else None, + "base_ref": pull.base.ref if hasattr(pull, 'base') and pull.base else None, + "labels": [label.name for label in pull.labels] if hasattr(pull, 'labels') else [], + "assignees": [user.login for user in pull.assignees] if hasattr(pull, 'assignees') else [], + "requested_reviewers": [user.login for user in pull.requested_reviewers] if hasattr(pull, 'requested_reviewers') else [], + } + for pull in pulls + ] + + def get_collaborators(self, repo, affiliation: str = "all") -> List[Dict[str, Any]]: + """Get repository collaborators.""" + logger.info(f"Fetching collaborators for {repo.full_name} with 
affiliation={affiliation}") + + collaborators = self.client._paginated_request( + repo.get_collaborators, + affiliation=affiliation, + max_results=self.config.max_collaborators + ) + + if collaborators is None: + return [] + + return [ + { + "login": c.login, + "id": c.id, + "type": c.type, + "url": c.url, + "site_admin": c.site_admin if hasattr(c, 'site_admin') else None, + "role_name": self._get_permission_level(repo, c.login), + "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None, + } + for c in collaborators + ] + + def _get_permission_level(self, repo, username: str) -> str: + """Get permission level for a collaborator.""" + try: + return repo.get_collaborator_permission(username) + except GithubException: + return "unknown" + + def get_file_distribution(self, repo) -> Dict[str, int]: + """Analyze file types distribution in the repository.""" + logger.info(f"Analyzing file distribution for {repo.full_name}") + + # Get all files in the repo (only feasible for smaller repos) + try: + contents = self.client._execute_request(repo.get_contents, "") + if not contents: + return {} + + file_types = defaultdict(int) + directories = [] + + # Process initial contents + for item in contents: + if item.type == "dir": + directories.append(item.path) + elif item.type == "file": + ext = os.path.splitext(item.name)[1].lower() + file_types[ext if ext else "no_extension"] += 1 + + # Process directories (up to a reasonable depth to avoid API rate limits) + max_depth = 3 + for depth in range(max_depth): + if not directories: + break + + next_level = [] + for directory in directories[:100]: # Limit to avoid excessive API calls + dir_contents = self.client._execute_request(repo.get_contents, directory) + if not dir_contents: + continue + + for item in dir_contents: + if item.type == "dir": + next_level.append(item.path) + elif item.type == "file": + ext = os.path.splitext(item.name)[1].lower() + file_types[ext if ext else "no_extension"] += 1 + + directories = 
next_level + + return dict(file_types) + except GithubException: + logger.warning(f"Could not get file distribution for {repo.full_name}") + return {} + + def search_code(self, repo, query_terms: List[str]) -> List[Dict[str, Any]]: + """Search for specific terms in the repository code.""" + logger.info(f"Searching code in {repo.full_name} for terms: {query_terms}") + + results = [] + for term in query_terms: + query = f"repo:{repo.full_name} {term}" + search_results = self.client._paginated_request( + self.client.github.search_code, + query, + max_results=self.config.max_search_results + ) + + if search_results: + results.extend([ + { + "term": term, + "name": result.name, + "path": result.path, + "sha": result.sha, + "url": result.html_url, + "repository": result.repository.full_name, + } + for result in search_results + if result.repository.full_name == repo.full_name + ]) + + return results + + def get_branches(self, repo) -> List[Dict[str, Any]]: + """Get repository branches.""" + logger.info(f"Fetching branches for {repo.full_name}") + + branches = self.client._paginated_request(repo.get_branches) + + if branches is None: + return [] + + return [ + { + "name": branch.name, + "protected": branch.protected, + "commit_sha": branch.commit.sha if branch.commit else None, + } + for branch in branches + ] + + def get_releases(self, repo) -> List[Dict[str, Any]]: + """Get repository releases.""" + logger.info(f"Fetching releases for {repo.full_name}") + + releases = self.client._paginated_request(repo.get_releases) + + if releases is None: + return [] + + return [ + { + "id": release.id, + "tag_name": release.tag_name, + "name": release.title, + "body": release.body, + "draft": release.draft, + "prerelease": release.prerelease, + "created_at": release.created_at.isoformat() if release.created_at else None, + "published_at": release.published_at.isoformat() if release.published_at else None, + "author_login": release.author.login if release.author else None, + 
"html_url": release.html_url, + "assets": [ + { + "name": asset.name, + "label": asset.label, + "content_type": asset.content_type, + "size": asset.size, + "download_count": asset.download_count, + "browser_download_url": asset.browser_download_url, + } + for asset in release.get_assets() + ], + } + for release in releases + ] + + def get_workflows(self, repo) -> List[Dict[str, Any]]: + """Get repository GitHub Actions workflows.""" + logger.info(f"Fetching workflows for {repo.full_name}") + + try: + workflows = self.client._paginated_request(repo.get_workflows) + + if workflows is None: + return [] + + return [ + { + "id": workflow.id, + "name": workflow.name, + "path": workflow.path, + "state": workflow.state, + "created_at": workflow.created_at.isoformat() if workflow.created_at else None, + "updated_at": workflow.updated_at.isoformat() if workflow.updated_at else None, + } + for workflow in workflows + ] + except (GithubException, AttributeError): + # Older PyGithub versions or repositories without workflows + return [] + + def analyze_commit_activity(self, repo) -> Dict[str, Any]: + """Analyze commit activity patterns.""" + logger.info(f"Analyzing commit activity for {repo.full_name}") + + # Get stats commit activity + stats = self.client._execute_request(repo.get_stats_commit_activity) + if not stats: + return {} + + weekly_commits = [] + for week in stats: + if hasattr(week, 'week') and hasattr(week, 'total'): + date = datetime.datetime.fromtimestamp(week.week).strftime('%Y-%m-%d') + weekly_commits.append({ + "week": date, + "total": week.total, + "days": week.days if hasattr(week, 'days') else [], + }) + + # Get code frequency + code_freq = self.client._execute_request(repo.get_stats_code_frequency) + if not code_freq: + code_frequency = [] + else: + code_frequency = [] + for item in code_freq: + date = datetime.datetime.fromtimestamp(item[0]).strftime('%Y-%m-%d') + code_frequency.append({ + "week": date, + "additions": item[1], + "deletions": -item[2], # 
Convert to positive for better readability + }) + + return { + "weekly_commits": weekly_commits, + "code_frequency": code_frequency, + } + + def analyze_contributor_activity(self, repo) -> Dict[str, Any]: + """Analyze contributor activity patterns.""" + logger.info(f"Analyzing contributor activity for {repo.full_name}") + + # Get contributor stats + stats = self.client._execute_request(repo.get_stats_contributors) + if not stats: + return {} + + contributor_stats = [] + for stat in stats: + if not hasattr(stat, 'author') or not stat.author: + continue + + weeks_data = [] + for week in stat.weeks: + if hasattr(week, 'w'): + date = datetime.datetime.fromtimestamp(week.w).strftime('%Y-%m-%d') + weeks_data.append({ + "week": date, + "additions": week.a, + "deletions": week.d, + "commits": week.c, + }) + + contributor_stats.append({ + "author": stat.author.login, + "total_commits": stat.total, + "weeks": weeks_data, + }) + + return { + "contributor_stats": contributor_stats, + } + + def analyze_issue_distribution(self, issues: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze distribution of issues by various metrics.""" + if not issues: + return {} + + # Convert to DataFrame for easier analysis + df = pd.DataFrame(issues) + + # Issues by state + state_counts = df['state'].value_counts().to_dict() if 'state' in df else {} + + # Issues by user + user_counts = df['user_login'].value_counts().head(10).to_dict() if 'user_login' in df else {} + + # Pull requests vs regular issues + is_pr_counts = df['pull_request'].value_counts().to_dict() if 'pull_request' in df else {} + + # Issues by labels (flattening the labels list) + labels = [] + if 'labels' in df: + for label_list in df['labels']: + if label_list: + labels.extend(label_list) + + label_counts = Counter(labels) + top_labels = dict(label_counts.most_common(10)) + + # Time analysis + if 'created_at' in df: + df['created_date'] = pd.to_datetime(df['created_at']) + df['month_year'] = 
df['created_date'].dt.strftime('%Y-%m') + issues_by_month = df.groupby('month_year').size().to_dict() + else: + issues_by_month = {} + + # Calculate resolution time for closed issues + resolution_times = [] + if 'created_at' in df and 'closed_at' in df: + for _, issue in df.iterrows(): + if pd.notna(issue.get('closed_at')) and pd.notna(issue.get('created_at')): + created = pd.to_datetime(issue['created_at']) + closed = pd.to_datetime(issue['closed_at']) + resolution_time = (closed - created).total_seconds() / 3600 # hours + resolution_times.append(resolution_time) + + resolution_stats = {} + if resolution_times: + resolution_stats = { + "mean_hours": sum(resolution_times) / len(resolution_times), + "median_hours": sorted(resolution_times)[len(resolution_times) // 2], + "min_hours": min(resolution_times), + "max_hours": max(resolution_times), + } + + return { + "by_state": state_counts, + "by_user": user_counts, + "pr_vs_issue": is_pr_counts, + "by_label": top_labels, + "by_month": issues_by_month, + "resolution_time": resolution_stats, + } + + def generate_insights(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: + """Generate higher-level insights from the collected repository data.""" + insights = {} + + # Repository activity and health + if "repo_details" in repo_data: + repo_details = repo_data["repo_details"] + insights["repository_age_days"] = self._calculate_age_days(repo_details.get("created_at")) + insights["freshness_days"] = self._calculate_freshness_days(repo_details.get("pushed_at")) + + # Popularity metrics + insights["popularity"] = { + "stars": repo_details.get("stargazers_count", 0), + "forks": repo_details.get("forks_count", 0), + "watchers": repo_details.get("watchers_count", 0), + "star_fork_ratio": self._calculate_ratio( + repo_details.get("stargazers_count", 0), + repo_details.get("forks_count", 0) + ), + } + + # Language distribution + if "languages" in repo_data: + languages = repo_data["languages"] + total_bytes = sum(languages.values()) 
if languages else 0 + + if total_bytes > 0: + language_percentages = { + lang: (bytes_count / total_bytes) * 100 + for lang, bytes_count in languages.items() + } + + insights["language_distribution"] = { + "primary_language": max(languages.items(), key=lambda x: x[1])[0] if languages else None, + "language_count": len(languages), + "percentages": language_percentages, + } + + # Contributor insights + if "contributors" in repo_data: + contributors = repo_data["contributors"] + + if contributors: + total_contributions = sum(c.get("contributions", 0) for c in contributors) + insights["contributor_insights"] = { + "contributor_count": len(contributors), + "total_contributions": total_contributions, + "avg_contributions_per_contributor": total_contributions / len(contributors) if len(contributors) > 0 else 0, + "contribution_distribution": self._analyze_contribution_distribution(contributors), + } + + # Issue and PR dynamics + if "issues" in repo_data: + issues = repo_data["issues"] + insights["issue_insights"] = self.analyze_issue_distribution(issues) + + if "pull_requests" in repo_data: + prs = repo_data["pull_requests"] + insights["pr_insights"] = self.analyze_issue_distribution(prs) # Reuse the same analysis + + # Additional PR-specific metrics + if prs: + insights["pr_code_change_stats"] = self._analyze_pr_code_changes(prs) + + # Commit patterns + if "commits" in repo_data: + commits = repo_data["commits"] + insights["commit_insights"] = self._analyze_commit_patterns(commits) + + # Check for CI/CD presence + insights["ci_cd_presence"] = self._detect_ci_cd(repo_data) + + # Documentation quality + if "readme" in repo_data: + readme = repo_data["readme"] + insights["documentation_quality"] = self._assess_documentation_quality(readme) + + # Project Activity Level + insights["activity_level"] = self._calculate_activity_level(repo_data) + + # Code complexity analysis + insights["code_complexity"] = self._analyze_code_complexity(repo_data) + + # Community health analysis 
+ insights["community_health"] = self._analyze_community_health(repo_data) + + return insights + + def _calculate_age_days(self, created_at_iso: str) -> float: + """Calculate repository age in days.""" + if not created_at_iso: + return 0 + + try: + created_at = datetime.datetime.fromisoformat(created_at_iso.replace('Z', '+00:00')) + now = datetime.datetime.now(datetime.timezone.utc) + return (now - created_at).total_seconds() / (24 * 3600) + except ValueError: + return 0 + + def _calculate_freshness_days(self, pushed_at_iso: str) -> float: + """Calculate days since last push.""" + if not pushed_at_iso: + return float('inf') + + try: + pushed_at = datetime.datetime.fromisoformat(pushed_at_iso.replace('Z', '+00:00')) + now = datetime.datetime.now(datetime.timezone.utc) + return (now - pushed_at).total_seconds() / (24 * 3600) + except ValueError: + return float('inf') + + def _calculate_ratio(self, numerator: int, denominator: int) -> float: + """Calculate ratio with handling for zero denominator.""" + return numerator / denominator if denominator and denominator > 0 else float('inf') + + def _analyze_contribution_distribution(self, contributors: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze the distribution of contributions among contributors.""" + if not contributors: + return {} + + # Sort contributors by number of contributions + sorted_contributors = sorted(contributors, key=lambda c: c.get("contributions", 0), reverse=True) + + # Calculate percentiles + total_contributions = sum(c.get("contributions", 0) for c in contributors) + cumulative_contributions = 0 + percentile_20 = 0 + percentile_50 = 0 + percentile_80 = 0 + + for i, contributor in enumerate(sorted_contributors): + contributions = contributor.get("contributions", 0) + cumulative_contributions += contributions + percentage = (cumulative_contributions / total_contributions) * 100 + + if percentage >= 20 and percentile_20 == 0: + percentile_20 = i + 1 + if percentage >= 50 and percentile_50 == 0: 
+ percentile_50 = i + 1 + if percentage >= 80 and percentile_80 == 0: + percentile_80 = i + 1 + + # Calculate Gini coefficient to measure inequality + gini = self._calculate_gini([c.get("contributions", 0) for c in contributors]) + + return { + "contributors_for_20_percent": percentile_20, + "contributors_for_50_percent": percentile_50, + "contributors_for_80_percent": percentile_80, + "gini_coefficient": gini, + "top_contributor_percentage": (sorted_contributors[0].get("contributions", 0) / total_contributions) * 100 if sorted_contributors else 0, + } + + def _calculate_gini(self, values: List[int]) -> float: + """Calculate the Gini coefficient of a distribution.""" + if not values or sum(values) == 0: + return 0 + + values = sorted(values) + n = len(values) + cumsum = 0 + for i, value in enumerate(values): + cumsum += value + values[i] = cumsum + + return (2 * sum(values) / (n * sum(values[-1]))) - (n + 1) / n + + def _analyze_pr_code_changes(self, prs: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze code changes across pull requests.""" + if not prs: + return {} + + # Extract metrics + additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None] + deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None] + changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None] + + # Calculate stats + stats = {} + + if additions: + stats["additions"] = { + "mean": sum(additions) / len(additions), + "median": sorted(additions)[len(additions) // 2], + "max": max(additions), + "total": sum(additions), + } + + if deletions: + stats["deletions"] = { + "mean": sum(deletions) / len(deletions), + "median": sorted(deletions)[len(deletions) // 2], + "max": max(deletions), + "total": sum(deletions), + } + + if changed_files: + stats["changed_files"] = { + "mean": sum(changed_files) / len(changed_files), + "median": sorted(changed_files)[len(changed_files) // 2], + "max": max(changed_files), 
+ "total": sum(changed_files), + } + + return stats + + def _analyze_commit_patterns(self, commits: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze patterns in commit data.""" + if not commits: + return {} + + # Count by author + commit_counts = Counter( + commit.get("author_login", "Unknown") + for commit in commits + if commit.get("author_login") + ) + + # Analyze message patterns + message_lengths = [ + len(commit.get("commit_message", "")) + for commit in commits + if commit.get("commit_message") + ] + + # Extract dates for time-based analysis + dates = [] + for commit in commits: + date_str = commit.get("date") + if date_str: + try: + date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00')) + dates.append(date) + except ValueError: + pass + + # Analyze times of day + hours = [date.hour for date in dates] + hour_counts = Counter(hours) + + # Analyze days of week + weekdays = [date.weekday() for date in dates] + weekday_counts = Counter(weekdays) + weekday_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] + weekday_data = {weekday_names[day]: count for day, count in weekday_counts.items()} + + # Analyze frequency of commits over time + commit_frequency = {} + if dates: + dates_sorted = sorted(dates) + first_date = dates_sorted[0] + last_date = dates_sorted[-1] + + # Calculate commit frequency by month + current_date = first_date.replace(day=1) + while current_date <= last_date: + next_month = current_date.replace(day=28) + datetime.timedelta(days=4) + next_month = next_month.replace(day=1) + + month_key = current_date.strftime('%Y-%m') + commit_frequency[month_key] = sum( + 1 for date in dates + if date.year == current_date.year and date.month == current_date.month + ) + + current_date = next_month + + return { + "top_contributors": dict(commit_counts.most_common(5)), + "message_length": { + "mean": sum(message_lengths) / len(message_lengths) if message_lengths else 0, + "max": max(message_lengths) if 
message_lengths else 0, + "min": min(message_lengths) if message_lengths else 0, + }, + "commit_time_patterns": { + "by_hour": dict(sorted(hour_counts.items())), + "by_weekday": weekday_data, + }, + "commit_frequency": commit_frequency, + } + + def _detect_ci_cd(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: + """Detect CI/CD presence and configuration in the repository.""" + ci_cd_indicators = { + "github_actions": False, + "travis": False, + "circle_ci": False, + "jenkins": False, + "gitlab_ci": False, + "azure_pipelines": False, + } + + # Check workflows + if "workflows" in repo_data and repo_data["workflows"]: + ci_cd_indicators["github_actions"] = True + + # Check for CI configuration files + if "file_distribution" in repo_data: + files = repo_data.get("file_distribution", {}) + if ".travis.yml" in files: + ci_cd_indicators["travis"] = True + if ".circleci/config.yml" in files or "circle.yml" in files: + ci_cd_indicators["circle_ci"] = True + if "Jenkinsfile" in files: + ci_cd_indicators["jenkins"] = True + if ".gitlab-ci.yml" in files: + ci_cd_indicators["gitlab_ci"] = True + if "azure-pipelines.yml" in files: + ci_cd_indicators["azure_pipelines"] = True + + return { + "has_ci_cd": any(ci_cd_indicators.values()), + "ci_cd_systems": ci_cd_indicators, + } + + def _assess_documentation_quality(self, readme: str) -> Dict[str, Any]: + """Assess the quality of documentation based on the README.""" + if not readme: + return { + "has_readme": False, + "readme_length": 0, + "score": 0, + "sections": {}, + } + + # Analyze the README content + lines = readme.strip().split('\n') + word_count = len(readme.split()) + sections = {} + + # Check for common README sections + section_keywords = { + "introduction": ["introduction", "overview", "about"], + "installation": ["installation", "install", "setup", "getting started"], + "usage": ["usage", "using", "example", "examples"], + "api": ["api", "reference", "documentation"], + "contributing": ["contributing", 
"contribute", "development"], + "license": ["license", "licensing"], + "code_of_conduct": ["code of conduct"], + } + + for section, keywords in section_keywords.items(): + sections[section] = any( + any(keyword.lower() in line.lower() for keyword in keywords) + for line in lines + ) + + # Count images/diagrams (markdown format) + image_count = readme.count("![") + + # Count code examples + code_block_count = readme.count("```") + + # Calculate a simple score + section_score = sum(1 for present in sections.values() if present) / len(sections) + has_images = image_count > 0 + has_code = code_block_count > 0 + length_score = min(1.0, word_count / 1000) # Normalize to 0-1, with 1000+ words being "complete" + + score = (section_score * 0.5) + (has_images * 0.2) + (has_code * 0.2) + (length_score * 0.1) + + return { + "has_readme": True, + "readme_length": word_count, + "score": score, + "sections": sections, + "has_images": has_images, + "image_count": image_count, + "has_code_examples": has_code, + "code_block_count": code_block_count // 2, # Each block has opening and closing ``` + } + + def _calculate_activity_level(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: + """Calculate repository activity level based on commits, PRs, and issues.""" + activity_score = 0 + activity_details = {} + + # Get repository age in months + if "repo_details" in repo_data: + age_days = self._calculate_age_days(repo_data["repo_details"].get("created_at")) + age_months = age_days / 30.5 # Approximate + + if age_months < 1: + age_months = 1 # Avoid division by zero + + activity_details["age_months"] = age_months + else: + age_months = 1 + + # Check recent commits (last 3 months) + recent_commits = 0 + if "commits" in repo_data: + commits = repo_data["commits"] + three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) + + for commit in commits: + if commit.get("date"): + commit_date = datetime.datetime.fromisoformat(commit["date"].replace('Z', '+00:00')) 
+ if commit_date >= three_months_ago: + recent_commits += 1 + + activity_details["recent_commits"] = recent_commits + activity_score += min(10, recent_commits / 10) # Up to 10 points for recent commits + + # Check recent PRs and issues (last 3 months) + recent_prs = 0 + if "pull_requests" in repo_data: + prs = repo_data["pull_requests"] + three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) + + for pr in prs: + if pr.get("created_at"): + pr_date = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) + if pr_date >= three_months_ago: + recent_prs += 1 + + activity_details["recent_prs"] = recent_prs + activity_score += min(5, recent_prs / 5) # Up to 5 points for recent PRs + + recent_issues = 0 + if "issues" in repo_data: + issues = [issue for issue in repo_data["issues"] if not issue.get("pull_request")] + three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3) + + for issue in issues: + if issue.get("created_at"): + issue_date = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) + if issue_date >= three_months_ago: + recent_issues += 1 + + activity_details["recent_issues"] = recent_issues + activity_score += min(5, recent_issues / 5) # Up to 5 points for recent issues + + # Check release frequency + if "releases" in repo_data: + releases = repo_data["releases"] + release_count = len(releases) + + # Calculate releases per month + releases_per_month = release_count / max(1, age_months) + activity_details["releases_per_month"] = releases_per_month + activity_score += min(5, releases_per_month * 2.5) # Up to 5 points for regular releases + + # Determine activity level + activity_level = "None" + if activity_score >= 20: + activity_level = "Very High" + elif activity_score >= 15: + activity_level = "High" + elif activity_score >= 10: + activity_level = "Medium" + elif activity_score >= 5: + activity_level = "Low" + elif activity_score > 0: + activity_level 
= "Very Low" + + return { + "score": activity_score, + "level": activity_level, + "details": activity_details, + } + + def _analyze_code_complexity(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: + """Estimate code complexity based on available metrics.""" + complexity = {} + + # Analyze file distribution + if "file_distribution" in repo_data: + file_types = repo_data["file_distribution"] + total_files = sum(file_types.values()) + + code_files = sum( + count for ext, count in file_types.items() + if ext in self.config.all_code_extensions() + ) + + complexity["file_counts"] = { + "total_files": total_files, + "code_files": code_files, + } + + # Analyze PR complexity + if "pull_requests" in repo_data: + prs = repo_data["pull_requests"] + + # Get average changes per PR + additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None] + deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None] + changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None] + + if additions and deletions and changed_files: + avg_additions = sum(additions) / len(additions) + avg_deletions = sum(deletions) / len(deletions) + avg_changed_files = sum(changed_files) / len(changed_files) + + complexity["pr_complexity"] = { + "avg_additions": avg_additions, + "avg_deletions": avg_deletions, + "avg_changed_files": avg_changed_files, + } + + # Estimate complexity score + pr_complexity_score = min(10, (avg_additions + avg_deletions) / 100) + complexity["pr_complexity_score"] = pr_complexity_score + + # Check dependency complexity + dependency_complexity_score = 0 + if "commit_insights" in repo_data.get("insights", {}): + commit_messages = [ + commit.get("commit_message", "").lower() + for commit in repo_data.get("commits", []) + ] + + # Check for dependency-related keywords + dependency_keywords = ["dependency", "dependencies", "upgrade", "update", "version", "package"] + dependency_commits = sum( + 1 for 
message in commit_messages + if any(keyword in message for keyword in dependency_keywords) + ) + + dependency_ratio = dependency_commits / len(commit_messages) if commit_messages else 0 + dependency_complexity_score = min(5, dependency_ratio * 20) # Up to 5 points + + complexity["dependency_complexity"] = { + "dependency_commits": dependency_commits, + "dependency_ratio": dependency_ratio, + "score": dependency_complexity_score, + } + + # Overall complexity score + overall_score = 0 + contributors = len(repo_data.get("contributors", [])) + if contributors > 0: + contributor_score = min(5, contributors / 10) # Up to 5 points + overall_score += contributor_score + + if "pr_complexity_score" in complexity: + overall_score += complexity["pr_complexity_score"] + + overall_score += dependency_complexity_score + + # Code size complexity + if "languages" in repo_data: + languages = repo_data["languages"] + total_bytes = sum(languages.values()) if languages else 0 + + # Size points based on code size in MB + size_mb = total_bytes / (1024 * 1024) + size_score = min(10, size_mb / 5) # Up to 10 points for large codebases + overall_score += size_score + + complexity["code_size"] = { + "total_bytes": total_bytes, + "size_mb": size_mb, + "score": size_score, + } + + # Determine complexity level + complexity_level = "Low" + if overall_score >= 25: + complexity_level = "Very High" + elif overall_score >= 20: + complexity_level = "High" + elif overall_score >= 15: + complexity_level = "Medium-High" + elif overall_score >= 10: + complexity_level = "Medium" + elif overall_score >= 5: + complexity_level = "Low-Medium" + + complexity["overall"] = { + "score": overall_score, + "level": complexity_level, + } + + return complexity + + def _analyze_community_health(self, repo_data: Dict[str, Any]) -> Dict[str, Any]: + """Analyze the community health of the repository.""" + health = {} + + # Calculate issue responsiveness + if "issues" in repo_data: + issues = repo_data["issues"] + 
closed_issues = [issue for issue in issues if issue.get("state") == "closed"] + + if issues: + closure_rate = len(closed_issues) / len(issues) + health["issue_closure_rate"] = closure_rate + + # Calculate average time to close + resolution_times = [] + for issue in closed_issues: + if issue.get("created_at") and issue.get("closed_at"): + created = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) + closed = datetime.datetime.fromisoformat(issue["closed_at"].replace('Z', '+00:00')) + resolution_time = (closed - created).total_seconds() / 3600 # hours + resolution_times.append(resolution_time) + + if resolution_times: + avg_resolution_time = sum(resolution_times) / len(resolution_times) + health["avg_issue_resolution_time_hours"] = avg_resolution_time + + # Calculate PR review responsiveness + if "pull_requests" in repo_data: + prs = repo_data["pull_requests"] + merged_prs = [pr for pr in prs if pr.get("merged")] + + if prs: + merge_rate = len(merged_prs) / len(prs) + health["pr_merge_rate"] = merge_rate + + # Calculate average time to merge + merge_times = [] + for pr in merged_prs: + if pr.get("created_at") and pr.get("merged_at"): + created = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) + merged = datetime.datetime.fromisoformat(pr["merged_at"].replace('Z', '+00:00')) + merge_time = (merged - created).total_seconds() / 3600 # hours + merge_times.append(merge_time) + + if merge_times: + avg_merge_time = sum(merge_times) / len(merge_times) + health["avg_pr_merge_time_hours"] = avg_merge_time + + # Check for community guidelines + community_files = [ + "CONTRIBUTING.md", + "CODE_OF_CONDUCT.md", + "SECURITY.md", + "SUPPORT.md", + "GOVERNANCE.md", + ] + + community_file_presence = {} + if "file_distribution" in repo_data: + file_paths = [] + for item in repo_data.get("file_distribution", {}): + file_paths.append(item) + + for community_file in community_files: + present = any(community_file.lower() in path.lower() 
for path in file_paths) + community_file_presence[community_file] = present + + health["community_guidelines"] = community_file_presence + + # Calculate contributor diversity + if "contributors" in repo_data: + contributors = repo_data["contributors"] + + if contributors: + # Calculate Gini coefficient for contribution distribution + gini = self._calculate_gini([c.get("contributions", 0) for c in contributors]) + health["contributor_gini"] = gini + + # Interpret Gini coefficient + if gini < 0.4: + diversity_level = "High" + elif gini < 0.6: + diversity_level = "Medium" + else: + diversity_level = "Low" + + health["contributor_diversity"] = diversity_level + + # Calculate overall health score + health_score = 0 + + # Points for issue responsiveness + if "issue_closure_rate" in health: + health_score += health["issue_closure_rate"] * 10 # Up to 10 points + + # Points for PR responsiveness + if "pr_merge_rate" in health: + health_score += health["pr_merge_rate"] * 10 # Up to 10 points + + # Points for community guidelines + guideline_count = sum(1 for present in community_file_presence.values() if present) + health_score += guideline_count * 2 # Up to 10 points + + # Points for contributor diversity + if "contributor_gini" in health: + diversity_score = 10 * (1 - health["contributor_gini"]) # Up to 10 points + health_score += diversity_score + + # Determine health level + health_level = "Poor" + if health_score >= 30: + health_level = "Excellent" + elif health_score >= 25: + health_level = "Very Good" + elif health_score >= 20: + health_level = "Good" + elif health_score >= 15: + health_level = "Fair" + elif health_score >= 10: + health_level = "Needs Improvement" + + health["overall"] = { + "score": health_score, + "level": health_level, + } + + return health + + def generate_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: + """ + Generate visualizations of repository data. 
+ + Returns: + Dict of visualization figures + """ + if not self.config.generate_visualizations: + return {} + + figures = {} + + # Create visualizations + lang_fig = self._visualize_language_distribution(repo_data) + if lang_fig: + figures["language_distribution"] = lang_fig + + commit_figs = self._visualize_commit_activity(repo_data, insights) + figures.update(commit_figs) + + contrib_figs = self._visualize_contributor_activity(repo_data, insights) + figures.update(contrib_figs) + + issue_figs = self._visualize_issues_and_prs(repo_data, insights) + figures.update(issue_figs) + + # Add interactive visualizations with Plotly + plotly_figs = self._generate_plotly_visualizations(repo_data, insights) + figures.update(plotly_figs) + + # Generate collaboration network + collab_fig = self._visualize_collaboration_network(repo_data, insights) + if collab_fig: + figures["collaboration_network"] = collab_fig + + return figures + + def _visualize_language_distribution(self, repo_data: Dict[str, Any]) -> Optional[plt.Figure]: + """Create a visualization of language distribution.""" + languages = repo_data.get("languages", {}) + if not languages: + return None + + # Create a pie chart of language distribution + fig, ax = plt.subplots(figsize=(10, 6)) + total = sum(languages.values()) + + # Filter out small languages for better visualization + threshold = total * 0.01 # 1% threshold + other_sum = sum(size for lang, size in languages.items() if size < threshold) + filtered_languages = {lang: size for lang, size in languages.items() if size >= threshold} + if other_sum > 0: + filtered_languages["Other"] = other_sum + + sizes = list(filtered_languages.values()) + labels = list(filtered_languages.keys()) + + wedges, texts, autotexts = ax.pie( + sizes, + labels=labels, + autopct='%1.1f%%', + startangle=90, + shadow=False, + textprops={'fontsize': 9}, # Smaller font for better fit + wedgeprops={'linewidth': 1, 'edgecolor': 'white'} # Add white edge + ) + + # Make the percentage 
labels more readable + for autotext in autotexts: + autotext.set_color('white') + autotext.set_fontweight('bold') + + ax.axis('equal') + plt.title(f"Language Distribution", fontsize=16) + plt.tight_layout() + + return fig + + def _visualize_commit_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: + """Create visualizations of commit activity.""" + figures = {} + + commit_activity = repo_data.get("commit_activity", {}) + weekly_commits = commit_activity.get("weekly_commits", []) + + if weekly_commits: + # Extract weeks and commit counts + weeks = [item["week"] for item in weekly_commits] + commits = [item["total"] for item in weekly_commits] + + # Create a time series plot + fig, ax = plt.subplots(figsize=(12, 6)) + ax.plot(weeks, commits, marker='o', linestyle='-', color='blue', alpha=0.7) + + # Add trend line + z = np.polyfit(range(len(weeks)), commits, 1) + p = np.poly1d(z) + ax.plot(weeks, p(range(len(weeks))), "r--", alpha=0.7) + + ax.set_title("Weekly Commit Activity", fontsize=16) + ax.set_xlabel("Week") + ax.set_ylabel("Number of Commits") + plt.xticks(rotation=45) + ax.grid(True, linestyle='--', alpha=0.7) + + # Show only some x-axis labels to avoid crowding + if len(weeks) > 20: + every_nth = len(weeks) // 10 + for n, label in enumerate(ax.xaxis.get_ticklabels()): + if n % every_nth != 0: + label.set_visible(False) + + plt.tight_layout() + + figures["weekly_commits"] = fig + + # Visualize code frequency if available + code_frequency = commit_activity.get("code_frequency", []) + if code_frequency: + weeks = [item["week"] for item in code_frequency] + additions = [item["additions"] for item in code_frequency] + deletions = [item["deletions"] for item in code_frequency] + + fig, ax = plt.subplots(figsize=(12, 6)) + ax.plot(weeks, additions, marker='o', linestyle='-', color='green', label='Additions') + ax.plot(weeks, deletions, marker='o', linestyle='-', color='red', label='Deletions') + ax.set_title("Code 
Frequency", fontsize=16) + ax.set_xlabel("Week") + ax.set_ylabel("Lines Changed") + plt.xticks(rotation=45) + ax.legend() + ax.grid(True, linestyle='--', alpha=0.7) + + # Show only some x-axis labels to avoid crowding + if len(weeks) > 20: + every_nth = len(weeks) // 10 + for n, label in enumerate(ax.xaxis.get_ticklabels()): + if n % every_nth != 0: + label.set_visible(False) + + plt.tight_layout() + + figures["code_frequency"] = fig + + # Commits by weekday + if "commit_insights" in insights: + commit_insights = insights["commit_insights"] + by_weekday = commit_insights.get("commit_time_patterns", {}).get("by_weekday", {}) + + if by_weekday: + fig, ax = plt.subplots(figsize=(10, 6)) + weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] + counts = [by_weekday.get(day, 0) for day in weekdays] + + # Create gradient colors based on commit counts + colors = plt.cm.Blues(np.array(counts) / max(counts)) + + ax.bar(weekdays, counts, color=colors) + ax.set_title("Commits by Day of Week", fontsize=16) + ax.set_xlabel("Day of Week") + ax.set_ylabel("Number of Commits") + ax.grid(True, axis='y', linestyle='--', alpha=0.7) + plt.tight_layout() + + figures["commits_by_weekday"] = fig + + # Commits by hour + by_hour = commit_insights.get("commit_time_patterns", {}).get("by_hour", {}) + + if by_hour: + fig, ax = plt.subplots(figsize=(12, 6)) + hours = sorted(by_hour.keys()) + counts = [by_hour[hour] for hour in hours] + + # Create gradient colors based on commit counts + colors = plt.cm.Greens(np.array(counts) / max(counts)) + + ax.bar(hours, counts, color=colors) + ax.set_title("Commits by Hour of Day (UTC)", fontsize=16) + ax.set_xlabel("Hour") + ax.set_ylabel("Number of Commits") + ax.set_xticks(range(0, 24, 2)) + ax.grid(True, axis='y', linestyle='--', alpha=0.7) + plt.tight_layout() + + figures["commits_by_hour"] = fig + + return figures + + def _visualize_contributor_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> 
Dict[str, plt.Figure]: + """Create visualizations of contributor activity.""" + figures = {} + + contributors = repo_data.get("contributors", []) + + if contributors: + # Create a bar chart of top contributors + contributors_sorted = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True) + top_n = min(10, len(contributors_sorted)) + + fig, ax = plt.subplots(figsize=(12, 6)) + names = [c.get("login", "Unknown") for c in contributors_sorted[:top_n]] + contributions = [c.get("contributions", 0) for c in contributors_sorted[:top_n]] + + # Create gradient colors based on contribution counts + colors = plt.cm.viridis(np.array(contributions) / max(contributions)) + + bars = ax.bar(names, contributions, color=colors) + ax.set_title("Top Contributors by Commit Count", fontsize=16) + ax.set_xlabel("Contributor") + ax.set_ylabel("Number of Commits") + plt.xticks(rotation=45, ha='right') + ax.grid(True, axis='y', linestyle='--', alpha=0.7) + + # Add value labels on top of bars + for bar in bars: + height = bar.get_height() + ax.annotate(f'{height}', + xy=(bar.get_x() + bar.get_width() / 2, height), + xytext=(0, 3), # 3 points vertical offset + textcoords="offset points", + ha='center', va='bottom') + + plt.tight_layout() + + figures["top_contributors"] = fig + + # Visualize contribution distribution if insights available + if "contributor_insights" in insights: + contributor_insights = insights["contributor_insights"] + distribution = contributor_insights.get("contribution_distribution", {}) + if distribution: + # Create a pie chart showing contributor concentration + fig, ax = plt.subplots(figsize=(10, 6)) + + percentiles = [ + distribution.get("contributors_for_20_percent", 0), + distribution.get("contributors_for_50_percent", 0) - distribution.get("contributors_for_20_percent", 0), + distribution.get("contributors_for_80_percent", 0) - distribution.get("contributors_for_50_percent", 0), + len(contributors) - 
distribution.get("contributors_for_80_percent", 0) + ] + + labels = [ + f"Top {percentiles[0]} contributors (0-20%)", + f"Next {percentiles[1]} contributors (20-50%)", + f"Next {percentiles[2]} contributors (50-80%)", + f"Remaining {percentiles[3]} contributors (80-100%)" + ] + + wedges, texts, autotexts = ax.pie( + [20, 30, 30, 20], # Fixed percentages for visualization + labels=labels, + autopct='%1.1f%%', + startangle=90, + shadow=False, + explode=(0.1, 0, 0, 0), # Emphasize the top contributors + wedgeprops={'linewidth': 1, 'edgecolor': 'white'} # Add white edge + ) + + # Make the percentage labels more readable + for autotext in autotexts: + autotext.set_color('white') + autotext.set_fontweight('bold') + + ax.axis('equal') + ax.set_title("Contribution Distribution", fontsize=16) + plt.tight_layout() + + figures["contribution_distribution"] = fig + + return figures + + def _visualize_issues_and_prs(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]: + """Create visualizations of issues and pull requests.""" + figures = {} + + # Visualize issue distribution if available + if "issue_insights" in insights: + issue_insights = insights["issue_insights"] + + # Issues by state + by_state = issue_insights.get("by_state", {}) + if by_state: + fig, ax = plt.subplots(figsize=(8, 6)) + states = list(by_state.keys()) + counts = list(by_state.values()) + + colors = ['red' if state.lower() == 'open' else 'green' for state in states] + ax.bar(states, counts, color=colors) + ax.set_title("Issues by State", fontsize=16) + ax.set_xlabel("State") + ax.set_ylabel("Count") + + # Add count labels on top of bars + for i, v in enumerate(counts): + ax.text(i, v + 0.5, str(v), ha='center') + + ax.grid(True, axis='y', linestyle='--', alpha=0.7) + plt.tight_layout() + + figures["issues_by_state"] = fig + + # Issues by month + by_month = issue_insights.get("by_month", {}) + if by_month: + fig, ax = plt.subplots(figsize=(12, 6)) + months = 
sorted(by_month.keys()) + counts = [by_month[month] for month in months] + + ax.plot(months, counts, marker='o', linestyle='-', color='blue') + + # Add trend line + z = np.polyfit(range(len(months)), counts, 1) + p = np.poly1d(z) + ax.plot(months, p(range(len(months))), "r--", alpha=0.7) + + ax.set_title("Issues Created by Month", fontsize=16) + ax.set_xlabel("Month") + ax.set_ylabel("Number of Issues") + plt.xticks(rotation=45) + ax.grid(True, linestyle='--', alpha=0.7) + + # Show only some x-axis labels to avoid crowding + if len(months) > 12: + every_nth = max(1, len(months) // 12) + for n, label in enumerate(ax.xaxis.get_ticklabels()): + if n % every_nth != 0: + label.set_visible(False) + + plt.tight_layout() + + figures["issues_by_month"] = fig + + # Issues by label + by_label = issue_insights.get("by_label", {}) + if by_label and len(by_label) > 1: + fig, ax = plt.subplots(figsize=(12, 6)) + labels = list(by_label.keys()) + counts = list(by_label.values()) + + # Sort by count + sorted_indices = np.argsort(counts)[::-1] + labels = [labels[i] for i in sorted_indices] + counts = [counts[i] for i in sorted_indices] + + # Limit to top 10 + if len(labels) > 10: + labels = labels[:10] + counts = counts[:10] + + # Create gradient colors + colors = plt.cm.tab10(np.linspace(0, 1, len(labels))) + + bars = ax.barh(labels, counts, color=colors) + ax.set_title("Top Issue Labels", fontsize=16) + ax.set_xlabel("Count") + ax.set_ylabel("Label") + + # Add count labels + for bar in bars: + width = bar.get_width() + ax.annotate(f'{int(width)}', + xy=(width, bar.get_y() + bar.get_height() / 2), + xytext=(3, 0), # 3 points horizontal offset + textcoords="offset points", + ha='left', va='center') + + ax.grid(True, axis='x', linestyle='--', alpha=0.7) + plt.tight_layout() + + figures["issues_by_label"] = fig + + # Visualize PR insights if available + if "pr_insights" in insights and "pr_code_change_stats" in insights: + pr_code_stats = insights["pr_code_change_stats"] + + # 
Additions and deletions by PR + if "additions" in pr_code_stats and "deletions" in pr_code_stats: + fig, ax = plt.subplots(figsize=(10, 6)) + + categories = ["Mean", "Median", "Max"] + additions = [ + pr_code_stats["additions"].get("mean", 0), + pr_code_stats["additions"].get("median", 0), + pr_code_stats["additions"].get("max", 0) / 10 # Scale down for visibility + ] + deletions = [ + pr_code_stats["deletions"].get("mean", 0), + pr_code_stats["deletions"].get("median", 0), + pr_code_stats["deletions"].get("max", 0) / 10 # Scale down for visibility + ] + + x = range(len(categories)) + width = 0.35 + + addition_bars = ax.bar([i - width/2 for i in x], additions, width, label='Additions', color='green') + deletion_bars = ax.bar([i + width/2 for i in x], deletions, width, label='Deletions', color='red') + + ax.set_xlabel('Metric') + ax.set_ylabel('Lines of Code') + ax.set_title('PR Code Change Statistics') + plt.xticks(x, categories) + ax.legend() + + # Add value labels + for bars in [addition_bars, deletion_bars]: + for bar in bars: + height = bar.get_height() + ax.annotate(f'{int(height)}', + xy=(bar.get_x() + bar.get_width() / 2, height), + xytext=(0, 3), # 3 points vertical offset + textcoords="offset points", + ha='center', va='bottom') + + if "max" in pr_code_stats["additions"]: + plt.annotate(f"Max: {int(pr_code_stats['additions']['max'])}", + (2 - width/2, additions[2] + 5), + textcoords="offset points", + xytext=(0,10), + ha='center') + + if "max" in pr_code_stats["deletions"]: + plt.annotate(f"Max: {int(pr_code_stats['deletions']['max'])}", + (2 + width/2, deletions[2] + 5), + textcoords="offset points", + xytext=(0,10), + ha='center') + + plt.tight_layout() + figures["pr_code_changes"] = fig + + return figures + + def _generate_plotly_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, Any]: + """Generate interactive Plotly visualizations.""" + plotly_figures = {} + + # Activity heatmap (commits by day and hour) + if 
"commits" in repo_data: + commits = repo_data["commits"] + dates = [] + + for commit in commits: + date_str = commit.get("date") + if date_str: + try: + date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00')) + dates.append(date) + except ValueError: + pass + + if dates: + # Group by day of week and hour + day_hour_counts = defaultdict(int) + for date in dates: + day_hour_counts[(date.weekday(), date.hour)] += 1 + + # Create 2D array for heatmap + days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] + hours = list(range(24)) + + z = np.zeros((7, 24)) + for (day, hour), count in day_hour_counts.items(): + z[day][hour] = count + + # Create heatmap + fig = go.Figure(data=go.Heatmap( + z=z, + x=hours, + y=days, + colorscale='Viridis', + hoverongaps=False, + hovertemplate='Day: %{y}
Hour: %{x}
Commits: %{z}' + )) + + fig.update_layout( + title='Commit Activity Heatmap', + xaxis_title='Hour of Day (UTC)', + yaxis_title='Day of Week', + yaxis={'categoryorder': 'array', 'categoryarray': days}, + width=900, + height=500 + ) + + plotly_figures["commit_heatmap"] = fig + + # Language breakdown treemap + if "languages" in repo_data: + languages = repo_data["languages"] + + if languages: + # Create data for treemap + labels = list(languages.keys()) + values = list(languages.values()) + + fig = go.Figure(go.Treemap( + labels=labels, + values=values, + parents=[""] * len(labels), + marker_colorscale='RdBu', + hovertemplate='Language: %{label}
Bytes: %{value}
Percentage: %{percentRoot:.2%}' + )) + + fig.update_layout( + title='Repository Language Breakdown', + width=800, + height=600 + ) + + plotly_figures["language_treemap"] = fig + + # Issue/PR timeline + issues = repo_data.get("issues", []) + prs = repo_data.get("pull_requests", []) + + if issues or prs: + # Create timeline data + timeline_data = [] + + for issue in issues: + if not issue.get("pull_request") and issue.get("created_at"): + try: + created_date = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00')) + timeline_data.append({ + "date": created_date, + "type": "Issue", + "id": issue.get("number", ""), + "title": issue.get("title", ""), + "state": issue.get("state", "") + }) + except ValueError: + pass + + for pr in prs: + if pr.get("created_at"): + try: + created_date = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00')) + timeline_data.append({ + "date": created_date, + "type": "PR", + "id": pr.get("number", ""), + "title": pr.get("title", ""), + "state": pr.get("state", "") + }) + except ValueError: + pass + + if timeline_data: + # Sort by date + timeline_data.sort(key=lambda x: x["date"]) + + # Create DataFrame for easier plotting + df = pd.DataFrame(timeline_data) + + # Calculate cumulative counts + df["cumulative_issues"] = (df["type"] == "Issue").cumsum() + df["cumulative_prs"] = (df["type"] == "PR").cumsum() + + # Create plot + fig = go.Figure() + fig.add_trace(go.Scatter( + x=df["date"], + y=df["cumulative_issues"], + mode='lines', + name='Issues', + line=dict(color='red', width=2) + )) + + fig.add_trace(go.Scatter( + x=df["date"], + y=df["cumulative_prs"], + mode='lines', + name='Pull Requests', + line=dict(color='blue', width=2) + )) + + fig.update_layout( + title='Cumulative Issues and Pull Requests Over Time', + xaxis_title='Date', + yaxis_title='Count', + legend=dict( + yanchor="top", + y=0.99, + xanchor="left", + x=0.01 + ), + width=900, + height=500 + ) + + plotly_figures["issue_pr_timeline"] = fig 
+ + return plotly_figures + + def _visualize_collaboration_network(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Optional[plt.Figure]: + """Create a visualization of the collaboration network.""" + if "pull_requests" not in repo_data or "contributors" not in repo_data: + return None + + prs = repo_data["pull_requests"] + contributors = repo_data["contributors"] + + # Create a network of collaborations + G = nx.Graph() + + # Add nodes (contributors) + contributor_logins = [c.get("login") for c in contributors if c.get("login")] + for login in contributor_logins: + G.add_node(login) + + # Add edges (collaborations through PRs) + collaborations = defaultdict(int) + + for pr in prs: + author = pr.get("user_login") + if not author or author not in contributor_logins: + continue + + # Consider reviewers as collaborators + reviewers = pr.get("requested_reviewers", []) + + for reviewer in reviewers: + if reviewer in contributor_logins and reviewer != author: + pair = tuple(sorted([author, reviewer])) + collaborations[pair] += 1 + + for (author, reviewer), weight in collaborations.items(): + G.add_edge(author, reviewer, weight=weight) + + if not G.edges(): + return None + + # Draw the collaboration network + fig, ax = plt.subplots(figsize=(12, 10)) + + # Calculate node sizes based on contributions + contributor_dict = {c.get("login"): c.get("contributions", 1) for c in contributors if c.get("login")} + node_sizes = [contributor_dict.get(node, 1) * 30 for node in G.nodes()] + + # Calculate edge widths based on collaboration count + edge_widths = [G[u][v]['weight'] * 0.5 for u, v in G.edges()] + + # Calculate node colors based on contributor roles + # (assign different colors to different types of contributors) + color_map = [] + for node in G.nodes(): + degree = G.degree(node) + if degree > 5: + color_map.append('red') # Central collaborators + elif degree > 2: + color_map.append('blue') # Active collaborators + else: + color_map.append('green') # Peripheral 
contributors + + # Position nodes using a force-directed layout + pos = nx.spring_layout(G, seed=42) + + # Draw the network + nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=color_map, alpha=0.8) + nx.draw_networkx_edges(G, pos, width=edge_widths, alpha=0.5, edge_color='gray') + nx.draw_networkx_labels(G, pos, font_size=8, font_family='sans-serif') + + ax.set_title("Collaboration Network", fontsize=16) + ax.axis('off') + plt.tight_layout() + + return fig + + def analyze_repo(self, owner: str, repo_name: str) -> Dict[str, Any]: + """ + Main method to analyze a repository. + + Args: + owner: GitHub username or organization + repo_name: Name of the repository + + Returns: + Dict containing all repository data and insights + """ + start_time = time.time() + logger.info(f"Starting analysis of {owner}/{repo_name}") + + repo_path = f"{owner}/{repo_name}" + repo = self.client.get_repo(repo_path) + + repo_data = {} + + # Collect basic repository metadata + repo_data["repo_details"] = self.get_repo_details(repo) + + # Define data collection tasks + tasks = [ + ("contributors", lambda: self.get_contributors(repo)), + ("languages", lambda: self.get_languages(repo)), + ("issues", lambda: self.get_issues(repo, "all")), + ("pull_requests", lambda: self.get_pull_requests(repo, "all")), + ("commits", lambda: self.get_commits(repo)), + ("readme", lambda: self.get_readme(repo)), + ("branches", lambda: self.get_branches(repo)), + ("releases", lambda: self.get_releases(repo)), + ("workflows", lambda: self.get_workflows(repo)), + ("file_distribution", lambda: self.get_file_distribution(repo)), + ("collaborators", lambda: self.get_collaborators(repo)), + ("commit_activity", lambda: self.analyze_commit_activity(repo)), + ("contributor_activity", lambda: self.analyze_contributor_activity(repo)), + ] + + # Search for security and quality indicators + important_terms = [ + "security", "vulnerability", "auth", "password", "token", + "test", "spec", "fixture", "mock", "stub", + 
"TODO", "FIXME", "HACK", "XXX" + ] + tasks.append(("code_search", lambda: self.search_code(repo, important_terms))) + + # Collect data with progress bar + with tqdm(total=len(tasks), desc="Collecting repository data") as pbar: + for key, task_func in tasks: + try: + result = task_func() + repo_data[key] = result + except Exception as e: + logger.error(f"Error collecting {key}: {e}") + finally: + pbar.update(1) + + # Generate insights from collected data + repo_data["insights"] = self.generate_insights(repo_data) + + # Generate visualizations + if self.config.generate_visualizations: + repo_data["visualizations"] = self.generate_visualizations(repo_data, repo_data["insights"]) + + end_time = time.time() + logger.info(f"Analysis completed in {end_time - start_time:.2f} seconds") + + return repo_data + + +class PDFReportGenerator: + """ + Class for generating comprehensive PDF reports from repository analysis data. + """ + + def __init__(self, repo_data: Dict[str, Any], output_path: str = None): + """Initialize the PDF report generator with repository data.""" + self.repo_data = repo_data + self.output_path = output_path or tempfile.mktemp(suffix='.pdf') + self.styles = getSampleStyleSheet() + + # Create custom styles + self.styles.add(ParagraphStyle( + name='SectionTitle', + parent=self.styles['Heading2'], + fontSize=14, + spaceAfter=10 + )) + + self.styles.add(ParagraphStyle( + name='SubsectionTitle', + parent=self.styles['Heading3'], + fontSize=12, + spaceAfter=6 + )) + + self.styles.add(ParagraphStyle( + name='MetricsTable', + parent=self.styles['Normal'], + fontSize=10, + alignment=TA_LEFT + )) + + self.styles.add(ParagraphStyle( + name='Small', + parent=self.styles['Normal'], + fontSize=8 + )) + + self.styles.add(ParagraphStyle( + name='ReportTitle', + parent=self.styles['Title'], + fontSize=24, + alignment=TA_CENTER, + spaceAfter=20 + )) + + def generate_report(self) -> str: + """ + Generate a PDF report of repository analysis. 
+ + Returns: + str: Path to the generated PDF file + """ + doc = SimpleDocTemplate( + self.output_path, + pagesize=letter, + rightMargin=72, leftMargin=72, + topMargin=72, bottomMargin=72 + ) + + elements = [] + + # Add report title + repo_name = self.repo_data.get("repo_details", {}).get("full_name", "Repository") + elements.append(Paragraph(f"GitHub Repository Analysis: {repo_name}", self.styles['ReportTitle'])) + + # Add report generation date + report_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + elements.append(Paragraph(f"Report generated on: {report_date}", self.styles['Normal'])) + elements.append(Spacer(1, 20)) + + # Add repository overview section + elements.extend(self._create_repo_overview()) + elements.append(PageBreak()) + + # Add activity analysis section + elements.extend(self._create_activity_analysis()) + elements.append(PageBreak()) + + # Add code analysis section + elements.extend(self._create_code_analysis()) + elements.append(PageBreak()) + + # Add community analysis section + elements.extend(self._create_community_analysis()) + + # Add visualizations if available + if self.repo_data.get("visualizations"): + elements.append(PageBreak()) + elements.extend(self._create_visualization_pages()) + + # Add summary and recommendations + elements.append(PageBreak()) + elements.extend(self._create_summary_and_recommendations()) + + # Build the PDF + doc.build(elements) + + return self.output_path + + def _create_repo_overview(self) -> List[Any]: + """Create repository overview section of the report.""" + elements = [] + + # Section title + elements.append(Paragraph("Repository Overview", self.styles['Heading1'])) + elements.append(Spacer(1, 10)) + + # Basic repository information + repo_details = self.repo_data.get("repo_details", {}) + + # Create a table for repository details + data = [ + ["Name", repo_details.get("name", "N/A")], + ["Full Name", repo_details.get("full_name", "N/A")], + ["Description", repo_details.get("description", 
"No description")], + ["URL", repo_details.get("html_url", "N/A")], + ["Primary Language", repo_details.get("language", "Not specified")], + ["Created On", repo_details.get("created_at", "N/A")], + ["Last Updated", repo_details.get("updated_at", "N/A")], + ["Stars", str(repo_details.get("stargazers_count", 0))], + ["Forks", str(repo_details.get("forks_count", 0))], + ["Watchers", str(repo_details.get("watchers_count", 0))], + ["Open Issues", str(repo_details.get("open_issues_count", 0))], + ["License", repo_details.get("license", "Not specified")], + ["Fork", "Yes" if repo_details.get("fork", False) else "No"], + ["Archived", "Yes" if repo_details.get("archived", False) else "No"], + ["Visibility", repo_details.get("visibility", "N/A").capitalize()], + ] + + table = Table(data, colWidths=[100, 350]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (0, -1), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (0, -1), colors.black), + ('ALIGN', (0, 0), (0, -1), 'RIGHT'), + ('ALIGN', (1, 0), (1, -1), 'LEFT'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'), + ('FONTSIZE', (0, 0), (-1, -1), 10), + ('BOTTOMPADDING', (0, 0), (-1, -1), 6), + ('TOPPADDING', (0, 0), (-1, -1), 6), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + elements.append(Spacer(1, 20)) + + # Key metrics and insights + elements.append(Paragraph("Key Metrics & Insights", self.styles['SectionTitle'])) + + insights = self.repo_data.get("insights", {}) + + # Repository age + age_days = insights.get("repository_age_days", 0) + age_years = age_days / 365.25 + freshness_days = insights.get("freshness_days", 0) + + age_text = f"Repository Age: {age_years:.1f} years ({int(age_days)} days)" + freshness_text = f"Last Activity: {int(freshness_days)} days ago" + + elements.append(Paragraph(age_text, self.styles['Normal'])) + elements.append(Paragraph(freshness_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Activity 
level + activity_level = insights.get("activity_level", {}) + if activity_level: + activity_text = f"Activity Level: {activity_level.get('level', 'Unknown')} (Score: {activity_level.get('score', 0):.1f}/25)" + elements.append(Paragraph(activity_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Code complexity + code_complexity = insights.get("code_complexity", {}).get("overall", {}) + if code_complexity: + complexity_text = f"Code Complexity: {code_complexity.get('level', 'Unknown')} (Score: {code_complexity.get('score', 0):.1f}/30)" + elements.append(Paragraph(complexity_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Documentation quality + doc_quality = insights.get("documentation_quality", {}) + if doc_quality: + quality_score = doc_quality.get("score", 0) + quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low" + doc_text = f"Documentation Quality: {quality_level} (Score: {quality_score:.2f})" + elements.append(Paragraph(doc_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Community health + community_health = insights.get("community_health", {}).get("overall", {}) + if community_health: + health_text = f"Community Health: {community_health.get('level', 'Unknown')} (Score: {community_health.get('score', 0):.1f}/40)" + elements.append(Paragraph(health_text, self.styles['Normal'])) + + return elements + + def _create_activity_analysis(self) -> List[Any]: + """Create activity analysis section of the report.""" + elements = [] + + # Section title + elements.append(Paragraph("Activity Analysis", self.styles['Heading1'])) + elements.append(Spacer(1, 10)) + + insights = self.repo_data.get("insights", {}) + + # Commit activity + elements.append(Paragraph("Commit Activity", self.styles['SectionTitle'])) + + commit_insights = insights.get("commit_insights", {}) + if commit_insights: + # Top contributors + top_contributors = commit_insights.get("top_contributors", {}) + if 
top_contributors: + elements.append(Paragraph("Top Contributors by Commits:", self.styles['SubsectionTitle'])) + + data = [["Contributor", "Commits"]] + for contributor, commits in top_contributors.items(): + data.append([contributor, str(commits)]) + + table = Table(data, colWidths=[200, 100]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (1, -1), 'RIGHT'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('FONTSIZE', (0, 0), (-1, -1), 10), + ('BOTTOMPADDING', (0, 0), (-1, -1), 4), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + elements.append(Spacer(1, 15)) + + # Commit time patterns + time_patterns = commit_insights.get("commit_time_patterns", {}) + if time_patterns: + elements.append(Paragraph("Commit Timing Patterns:", self.styles['SubsectionTitle'])) + + weekday_data = time_patterns.get("by_weekday", {}) + if weekday_data: + day_text = "Most active day: " + max(weekday_data.items(), key=lambda x: x[1])[0] + elements.append(Paragraph(day_text, self.styles['Normal'])) + + hour_data = time_patterns.get("by_hour", {}) + if hour_data and hour_data: + hour = max(hour_data.items(), key=lambda x: x[1])[0] + hour_text = f"Most active hour: {hour}:00 UTC" + elements.append(Paragraph(hour_text, self.styles['Normal'])) + + elements.append(Spacer(1, 10)) + + # Pull Request activity + elements.append(Paragraph("Pull Request Activity", self.styles['SectionTitle'])) + + pr_insights = insights.get("pr_insights", {}) + pr_code_changes = insights.get("pr_code_change_stats", {}) + + if pr_insights or pr_code_changes: + # PR state distribution + state_counts = pr_insights.get("by_state", {}) + if state_counts: + elements.append(Paragraph("Pull Request States:", self.styles['SubsectionTitle'])) + + data = [["State", "Count"]] + for state, count in 
state_counts.items(): + data.append([state.capitalize(), str(count)]) + + table = Table(data, colWidths=[100, 100]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (1, -1), 'RIGHT'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + elements.append(Spacer(1, 15)) + + # PR code change statistics + if pr_code_changes: + elements.append(Paragraph("Pull Request Size Statistics:", self.styles['SubsectionTitle'])) + + # Table for code change stats + data = [["Metric", "Additions", "Deletions", "Files Changed"]] + + metrics = ["mean", "median", "max", "total"] + for metric in metrics: + row = [metric.capitalize()] + for stat_type in ["additions", "deletions", "changed_files"]: + if stat_type in pr_code_changes and metric in pr_code_changes[stat_type]: + value = pr_code_changes[stat_type][metric] + row.append(f"{value:.1f}" if isinstance(value, float) else str(value)) + else: + row.append("N/A") + + data.append(row) + + table = Table(data, colWidths=[80, 80, 80, 80]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (-1, -1), 'RIGHT'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + elements.append(Spacer(1, 15)) + + # Issue activity + elements.append(Paragraph("Issue Activity", self.styles['SectionTitle'])) + + issue_insights = insights.get("issue_insights", {}) + if issue_insights: + # Issue state distribution + state_counts = issue_insights.get("by_state", {}) + if state_counts: + elements.append(Paragraph("Issue States:", self.styles['SubsectionTitle'])) + + data = [["State", "Count"]] 
+ for state, count in state_counts.items(): + data.append([state.capitalize(), str(count)]) + + table = Table(data, colWidths=[100, 100]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (1, -1), 'RIGHT'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + elements.append(Spacer(1, 15)) + + # Issue resolution time + resolution_stats = issue_insights.get("resolution_time", {}) + if resolution_stats: + elements.append(Paragraph("Issue Resolution Time (hours):", self.styles['SubsectionTitle'])) + + mean_hours = resolution_stats.get("mean_hours", 0) + median_hours = resolution_stats.get("median_hours", 0) + + if mean_hours > 24: + mean_days = mean_hours / 24 + mean_text = f"Mean: {mean_days:.1f} days" + else: + mean_text = f"Mean: {mean_hours:.1f} hours" + + if median_hours > 24: + median_days = median_hours / 24 + median_text = f"Median: {median_days:.1f} days" + else: + median_text = f"Median: {median_hours:.1f} hours" + + elements.append(Paragraph(mean_text, self.styles['Normal'])) + elements.append(Paragraph(median_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Top issue labels + top_labels = issue_insights.get("by_label", {}) + if top_labels: + elements.append(Paragraph("Top Issue Labels:", self.styles['SubsectionTitle'])) + + data = [["Label", "Count"]] + for label, count in list(top_labels.items())[:5]: # Top 5 labels + data.append([label, str(count)]) + + table = Table(data, colWidths=[150, 50]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (1, -1), 'RIGHT'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 
'Helvetica-Bold'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + + return elements + + def _create_code_analysis(self) -> List[Any]: + """Create code analysis section of the report.""" + elements = [] + + # Section title + elements.append(Paragraph("Code Analysis", self.styles['Heading1'])) + elements.append(Spacer(1, 10)) + + # Language distribution + elements.append(Paragraph("Language Distribution", self.styles['SectionTitle'])) + + languages = self.repo_data.get("languages", {}) + insights = self.repo_data.get("insights", {}) + + if languages: + # Sort languages by byte count + sorted_languages = sorted(languages.items(), key=lambda x: x[1], reverse=True) + + # Create language distribution table + data = [["Language", "Bytes", "Percentage"]] + + total_bytes = sum(languages.values()) + for language, bytes_count in sorted_languages[:10]: # Top 10 languages + percentage = (bytes_count / total_bytes) * 100 + data.append([ + language, + f"{bytes_count:,}", + f"{percentage:.1f}%" + ]) + + table = Table(data, colWidths=[120, 120, 80]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (2, -1), 'RIGHT'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + elements.append(Spacer(1, 15)) + + # File distribution + elements.append(Paragraph("File Type Distribution", self.styles['SectionTitle'])) + + file_dist = self.repo_data.get("file_distribution", {}) + if file_dist: + # Group extensions by type + file_types = { + "Code": sum(file_dist.get(ext, 0) for ext in self.config.code_extensions), + "Markup": sum(file_dist.get(ext, 0) for ext in self.config.markup_extensions), + "Scripts": sum(file_dist.get(ext, 0) for ext in self.config.script_extensions), + "Data": 
sum(file_dist.get(ext, 0) for ext in self.config.data_extensions), + "Config": sum(file_dist.get(ext, 0) for ext in self.config.config_extensions), + "Notebooks": sum(file_dist.get(ext, 0) for ext in self.config.notebook_extensions), + "Other": sum(file_dist.get(ext, 0) for ext in self.config.other_extensions) + } + + # Create file type distribution table + data = [["File Type", "Count", "Percentage"]] + + total_files = sum(file_types.values()) + for file_type, count in sorted(file_types.items(), key=lambda x: x[1], reverse=True): + if count > 0: + percentage = (count / total_files) * 100 + data.append([ + file_type, + str(count), + f"{percentage:.1f}%" + ]) + + table = Table(data, colWidths=[120, 80, 80]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (2, -1), 'RIGHT'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ])) + + elements.append(table) + elements.append(Spacer(1, 15)) + + # Code complexity analysis + elements.append(Paragraph("Code Complexity Analysis", self.styles['SectionTitle'])) + + code_complexity = insights.get("code_complexity", {}) + if code_complexity: + complexity_overall = code_complexity.get("overall", {}) + elements.append(Paragraph( + f"Overall Complexity: {complexity_overall.get('level', 'Unknown')} (Score: {complexity_overall.get('score', 0):.1f}/30)", + self.styles['Normal'] + )) + elements.append(Spacer(1, 10)) + + # Code size + code_size = code_complexity.get("code_size", {}) + if code_size: + size_mb = code_size.get("size_mb", 0) + elements.append(Paragraph(f"Code Size: {size_mb:.2f} MB", self.styles['Normal'])) + elements.append(Spacer(1, 5)) + + # PR complexity + pr_complexity = code_complexity.get("pr_complexity", {}) + if pr_complexity: + elements.append(Paragraph("Average Pull Request Size:", 
self.styles['SubsectionTitle'])) + + avg_additions = pr_complexity.get("avg_additions", 0) + avg_deletions = pr_complexity.get("avg_deletions", 0) + avg_files = pr_complexity.get("avg_changed_files", 0) + + elements.append(Paragraph(f"Lines Added: {avg_additions:.1f}", self.styles['Normal'])) + elements.append(Paragraph(f"Lines Deleted: {avg_deletions:.1f}", self.styles['Normal'])) + elements.append(Paragraph(f"Files Changed: {avg_files:.1f}", self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # CI/CD presence + elements.append(Paragraph("CI/CD Systems", self.styles['SectionTitle'])) + + ci_cd = insights.get("ci_cd_presence", {}) + if ci_cd: + has_ci_cd = ci_cd.get("has_ci_cd", False) + systems = ci_cd.get("ci_cd_systems", {}) + + if has_ci_cd: + elements.append(Paragraph("Detected CI/CD Systems:", self.styles['Normal'])) + + detected_systems = [name for name, present in systems.items() if present] + for system in detected_systems: + elements.append(Paragraph(f"• {system.replace('_', ' ').title()}", self.styles['Normal'])) + else: + elements.append(Paragraph("No CI/CD systems detected", self.styles['Normal'])) + + return elements + + def _create_community_analysis(self) -> List[Any]: + """Create community analysis section of the report.""" + elements = [] + + # Section title + elements.append(Paragraph("Community Analysis", self.styles['Heading1'])) + elements.append(Spacer(1, 10)) + + insights = self.repo_data.get("insights", {}) + + # Contributor insights + elements.append(Paragraph("Contributor Analysis", self.styles['SectionTitle'])) + + contributor_insights = insights.get("contributor_insights", {}) + if contributor_insights: + contributor_count = contributor_insights.get("contributor_count", 0) + total_contributions = contributor_insights.get("total_contributions", 0) + avg_contributions = contributor_insights.get("avg_contributions_per_contributor", 0) + + elements.append(Paragraph(f"Total Contributors: {contributor_count}", self.styles['Normal'])) 
+ elements.append(Paragraph(f"Total Contributions: {total_contributions}", self.styles['Normal'])) + elements.append(Paragraph(f"Average Contributions per Contributor: {avg_contributions:.1f}", self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Contribution distribution + distribution = contributor_insights.get("contribution_distribution", {}) + if distribution: + elements.append(Paragraph("Contribution Distribution:", self.styles['SubsectionTitle'])) + + gini = distribution.get("gini_coefficient", 0) + top_percent = distribution.get("top_contributor_percentage", 0) + contributors_20 = distribution.get("contributors_for_20_percent", 0) + contributors_50 = distribution.get("contributors_for_50_percent", 0) + contributors_80 = distribution.get("contributors_for_80_percent", 0) + + # Format distribution metrics + elements.append(Paragraph(f"Top Contributor: {top_percent:.1f}% of all contributions", self.styles['Normal'])) + elements.append(Paragraph(f"Contributors for first 20% work: {contributors_20}", self.styles['Normal'])) + elements.append(Paragraph(f"Contributors for first 50% work: {contributors_50}", self.styles['Normal'])) + elements.append(Paragraph(f"Contributors for first 80% work: {contributors_80}", self.styles['Normal'])) + elements.append(Paragraph(f"Gini Coefficient: {gini:.2f} ({'High' if gini > 0.6 else 'Medium' if gini > 0.4 else 'Low'} inequality)", self.styles['Normal'])) + elements.append(Spacer(1, 15)) + + # Community health + elements.append(Paragraph("Community Health", self.styles['SectionTitle'])) + + community_health = insights.get("community_health", {}) + if community_health: + health_overall = community_health.get("overall", {}) + elements.append(Paragraph( + f"Overall Health: {health_overall.get('level', 'Unknown')} (Score: {health_overall.get('score', 0):.1f}/40)", + self.styles['Normal'] + )) + elements.append(Spacer(1, 10)) + + # Issue and PR responsiveness + if "issue_closure_rate" in community_health: + closure_rate = 
community_health.get("issue_closure_rate", 0) + elements.append(Paragraph(f"Issue Closure Rate: {closure_rate:.1%}", self.styles['Normal'])) + + if "avg_issue_resolution_time_hours" in community_health: + resolution_hours = community_health.get("avg_issue_resolution_time_hours", 0) + if resolution_hours > 72: + resolution_days = resolution_hours / 24 + elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_days:.1f} days", self.styles['Normal'])) + else: + elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_hours:.1f} hours", self.styles['Normal'])) + + if "pr_merge_rate" in community_health: + merge_rate = community_health.get("pr_merge_rate", 0) + elements.append(Paragraph(f"PR Merge Rate: {merge_rate:.1%}", self.styles['Normal'])) + + if "avg_pr_merge_time_hours" in community_health: + merge_hours = community_health.get("avg_pr_merge_time_hours", 0) + if merge_hours > 72: + merge_days = merge_hours / 24 + elements.append(Paragraph(f"Avg. PR Merge Time: {merge_days:.1f} days", self.styles['Normal'])) + else: + elements.append(Paragraph(f"Avg. 
PR Merge Time: {merge_hours:.1f} hours", self.styles['Normal'])) + + elements.append(Spacer(1, 10)) + + # Community guidelines + community_files = community_health.get("community_guidelines", {}) + if community_files: + elements.append(Paragraph("Community Guidelines:", self.styles['SubsectionTitle'])) + + files = [ + ("CONTRIBUTING.md", "Contributing Guidelines"), + ("CODE_OF_CONDUCT.md", "Code of Conduct"), + ("SECURITY.md", "Security Policy"), + ("SUPPORT.md", "Support Information"), + ("GOVERNANCE.md", "Governance Model") + ] + + data = [["Guideline", "Present"]] + for file_name, display_name in files: + present = community_files.get(file_name, False) + data.append([display_name, "✓" if present else "✗"]) + + table = Table(data, colWidths=[150, 50]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (1, -1), 'CENTER'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ('TEXTCOLOR', (1, 1), (1, -1), lambda row, col: colors.green if data[row][col] == "✓" else colors.red), + ])) + + elements.append(table) + elements.append(Spacer(1, 15)) + + # Documentation quality + elements.append(Paragraph("Documentation Analysis", self.styles['SectionTitle'])) + + doc_quality = insights.get("documentation_quality", {}) + if doc_quality: + has_readme = doc_quality.get("has_readme", False) + + if has_readme: + quality_score = doc_quality.get("score", 0) + quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low" + word_count = doc_quality.get("readme_length", 0) + + elements.append(Paragraph(f"README Quality: {quality_level} (Score: {quality_score:.2f})", self.styles['Normal'])) + elements.append(Paragraph(f"README Length: {word_count} words", self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Section 
analysis + sections = doc_quality.get("sections", {}) + if sections: + elements.append(Paragraph("README Sections Present:", self.styles['SubsectionTitle'])) + + section_labels = { + "introduction": "Introduction/Overview", + "installation": "Installation Instructions", + "usage": "Usage Examples", + "api": "API Documentation", + "contributing": "Contributing Guidelines", + "license": "License Information", + "code_of_conduct": "Code of Conduct" + } + + data = [["Section", "Present"]] + for section_key, section_label in section_labels.items(): + present = sections.get(section_key, False) + data.append([section_label, "✓" if present else "✗"]) + + table = Table(data, colWidths=[150, 50]) + table.setStyle(TableStyle([ + ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), + ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), + ('ALIGN', (0, 0), (0, -1), 'LEFT'), + ('ALIGN', (1, 0), (1, -1), 'CENTER'), + ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), + ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), + ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), + ('TEXTCOLOR', (1, 1), (1, -1), lambda row, col: colors.green if data[row][col] == "✓" else colors.red), + ])) + + elements.append(table) + elements.append(Spacer(1, 10)) + + # Additional doc quality metrics + has_images = doc_quality.get("has_images", False) + has_code = doc_quality.get("has_code_examples", False) + + metrics_text = "Additional Features: " + if has_images: + img_count = doc_quality.get("image_count", 0) + metrics_text += f"{img_count} images/diagrams, " + if has_code: + code_blocks = doc_quality.get("code_block_count", 0) + metrics_text += f"{code_blocks} code examples" + + if has_images or has_code: + elements.append(Paragraph(metrics_text, self.styles['Normal'])) + else: + elements.append(Paragraph("No README file found.", self.styles['Normal'])) + + return elements + + def _create_visualization_pages(self) -> List[Any]: + """Create pages with visualizations.""" + elements = [] + + # Section title + 
elements.append(Paragraph("Visualizations", self.styles['Heading1'])) + elements.append(Spacer(1, 10)) + + visualizations = self.repo_data.get("visualizations", {}) + + # Organize visualizations by category + categories = { + "Language Analysis": ["language_distribution", "language_treemap"], + "Commit Activity": ["weekly_commits", "code_frequency", "commits_by_weekday", "commits_by_hour", "commit_heatmap"], + "Contributor Analysis": ["top_contributors", "contribution_distribution", "collaboration_network"], + "Issue & PR Analysis": ["issues_by_state", "issues_by_month", "issues_by_label", "pr_code_changes", "issue_pr_timeline"] + } + + # Add visualizations by category + for category, viz_keys in categories.items(): + category_visualizations = [key for key in viz_keys if key in visualizations] + + if category_visualizations: + elements.append(Paragraph(category, self.styles['SectionTitle'])) + elements.append(Spacer(1, 10)) + + for viz_key in category_visualizations: + fig = visualizations.get(viz_key) + if fig: + # Save figure to a temporary buffer + img_buffer = BytesIO() + + if isinstance(fig, go.Figure): + # Handle Plotly figures + fig.write_image(img_buffer, format="png", width=800, height=500) + else: + # Handle Matplotlib figures + fig.savefig(img_buffer, format="png", dpi=150) + + img_buffer.seek(0) + img = Image(img_buffer, width=6*inch, height=4*inch) + + # Add caption + caption = viz_key.replace("_", " ").title() + elements.append(Paragraph(caption, self.styles['SubsectionTitle'])) + elements.append(img) + elements.append(Spacer(1, 20)) + + # Add page break after each category + elements.append(PageBreak()) + + return elements + + def _create_summary_and_recommendations(self) -> List[Any]: + """Create summary and recommendations section.""" + elements = [] + + # Section title + elements.append(Paragraph("Summary & Recommendations", self.styles['Heading1'])) + elements.append(Spacer(1, 10)) + + # Repository summary + elements.append(Paragraph("Project 
Summary", self.styles['SectionTitle'])) + + insights = self.repo_data.get("insights", {}) + repo_details = self.repo_data.get("repo_details", {}) + + # Short description of the project + repo_name = repo_details.get("name", "The repository") + repo_desc = repo_details.get("description", "") + primary_lang = repo_details.get("language", "various languages") + + summary_text = f"{repo_name} is a {primary_lang} project" + if repo_desc: + summary_text += f" that {repo_desc.lower() if repo_desc[0].isupper() else repo_desc}" + summary_text += "." + + elements.append(Paragraph(summary_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Key metrics summary + community_health = insights.get("community_health", {}).get("overall", {}) + activity_level = insights.get("activity_level", {}) + code_complexity = insights.get("code_complexity", {}).get("overall", {}) + + metrics_text = f"The project has {repo_details.get('stargazers_count', 0)} stars and {repo_details.get('forks_count', 0)} forks." + + if "contributor_insights" in insights: + contributor_count = insights["contributor_insights"].get("contributor_count", 0) + metrics_text += f" It has {contributor_count} contributors" + + gini = insights["contributor_insights"].get("contribution_distribution", {}).get("gini_coefficient", 0) + if gini > 0.7: + metrics_text += " with a highly centralized contribution pattern" + elif gini > 0.4: + metrics_text += " with a moderately distributed contribution pattern" + else: + metrics_text += " with a well-distributed contribution pattern" + + metrics_text += "." + + elements.append(Paragraph(metrics_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Activity summary + if activity_level: + activity_text = f"The project shows {activity_level.get('level', 'Unknown').lower()} activity levels" + + # Add activity context + if activity_level.get('level') in ["High", "Very High"]: + activity_text += " with regular commits and issue management." 
+ elif activity_level.get('level') in ["Medium"]: + activity_text += " with moderate development progress." + else: + activity_text += " with limited recent development." + + elements.append(Paragraph(activity_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Code quality summary + if code_complexity: + complexity_text = f"The codebase has {code_complexity.get('level', 'Unknown').lower()} complexity" + + if code_complexity.get('level') in ["High", "Very High"]: + complexity_text += ", which may present challenges for new contributors and maintenance." + elif code_complexity.get('level') in ["Medium", "Medium-High"]: + complexity_text += " with a reasonable balance between functionality and maintainability." + else: + complexity_text += " and should be relatively straightforward to understand and maintain." + + elements.append(Paragraph(complexity_text, self.styles['Normal'])) + elements.append(Spacer(1, 10)) + + # Community health summary + if community_health: + health_text = f"The project demonstrates {community_health.get('level', 'Unknown').lower()} community health" + + if community_health.get('level') in ["Excellent", "Very Good", "Good"]: + health_text += " with responsive maintainers and clear contribution guidelines." + elif community_health.get('level') in ["Fair"]: + health_text += " with some community structures in place." + else: + health_text += " with opportunities for improved community engagement." 
+ + elements.append(Paragraph(health_text, self.styles['Normal'])) + elements.append(Spacer(1, 15)) + + # Recommendations + elements.append(Paragraph("Recommendations", self.styles['SectionTitle'])) + + recommendations = [] + + # Documentation recommendations + doc_quality = insights.get("documentation_quality", {}) + if doc_quality: + score = doc_quality.get("score", 0) + if score < 0.4: + recommendations.append("Improve documentation by adding more comprehensive README content, including usage examples and API documentation.") + elif score < 0.7: + recommendations.append("Enhance existing documentation with more examples and clearer installation instructions.") + + sections = doc_quality.get("sections", {}) + missing_key_sections = [] + if not sections.get("installation", False): + missing_key_sections.append("installation instructions") + if not sections.get("usage", False): + missing_key_sections.append("usage examples") + + if missing_key_sections: + recommendations.append(f"Add missing documentation sections: {', '.join(missing_key_sections)}.") + + # Community recommendations + community_files = insights.get("community_health", {}).get("community_guidelines", {}) + if community_files: + missing_guidelines = [] + if not community_files.get("CONTRIBUTING.md", False): + missing_guidelines.append("contribution guidelines") + if not community_files.get("CODE_OF_CONDUCT.md", False): + missing_guidelines.append("code of conduct") + + if missing_guidelines: + recommendations.append(f"Create missing community files: {', '.join(missing_guidelines)}.") + + # Issue management recommendations + issue_insights = insights.get("issue_insights", {}) + if issue_insights: + resolution_time = issue_insights.get("resolution_time", {}).get("mean_hours", 0) + if resolution_time > 168: # 1 week + recommendations.append("Improve issue response time to enhance user experience and community engagement.") + + # Code complexity recommendations + if code_complexity and 
code_complexity.get('level') in ["High", "Very High"]: + recommendations.append("Consider refactoring complex parts of the codebase to improve maintainability.") + + # CI/CD recommendations + ci_cd = insights.get("ci_cd_presence", {}) + if not ci_cd.get("has_ci_cd", False): + recommendations.append("Implement CI/CD pipelines (e.g., GitHub Actions) to automate testing and deployment.") + + # Activity recommendations + if activity_level and activity_level.get('level') in ["Low", "Very Low", "None"]: + recommendations.append("Revitalize project with regular updates and community engagement to attract more contributors.") + + # Add recommendations to the report + if recommendations: + for i, recommendation in enumerate(recommendations, 1): + elements.append(Paragraph(f"{i}. {recommendation}", self.styles['Normal'])) + elements.append(Spacer(1, 5)) + else: + elements.append(Paragraph("This project follows good development practices and no significant improvements are needed at this time.", self.styles['Normal'])) + + return elements + + +class RAGHelper: + """ + Helper class for Retrieval Augmented Generation (RAG) to enhance chatbot responses + with repository insights. 
class RAGHelper:
    """
    Helper class for Retrieval Augmented Generation (RAG) to enhance chatbot
    responses with repository insights.

    The raw analysis payload is flattened once into ``self.repo_info`` so that
    ``get_context_for_query`` can cheaply assemble context snippets matching
    the user's question.
    """

    def __init__(self, repo_data: Dict[str, Any]):
        """Initialize with repository data."""
        self.repo_data = repo_data
        self.insights = repo_data.get("insights", {})

        # Flatten the payload into a lookup dict up front.
        self._extract_key_info()

    def _extract_key_info(self):
        """Flatten key facts from the analysis payload into ``self.repo_info``."""
        info: Dict[str, Any] = {}
        self.repo_info = info

        # Basic repository details.
        if "repo_details" in self.repo_data:
            d = self.repo_data["repo_details"]
            info["name"] = d.get("name", "")
            info["full_name"] = d.get("full_name", "")
            info["description"] = d.get("description", "")
            info["url"] = d.get("html_url", "")
            info["stars"] = d.get("stargazers_count", 0)
            info["forks"] = d.get("forks_count", 0)
            info["language"] = d.get("language", "")
            info["created_at"] = d.get("created_at", "")
            info["license"] = d.get("license", "")

        # Per-language byte counts -> percentage shares and a top-5 ranking.
        if "languages" in self.repo_data:
            langs = self.repo_data["languages"]
            total = sum(langs.values()) if langs else 0
            if total > 0:
                shares = {name: (count / total) * 100 for name, count in langs.items()}
                info["language_breakdown"] = shares
                ranked = sorted(shares.items(), key=lambda item: item[1], reverse=True)
                info["top_languages"] = ranked[:5]

        # Contributor count and the five most prolific committers.
        if "contributors" in self.repo_data:
            people = self.repo_data["contributors"]
            info["total_contributors"] = len(people)
            if people:
                ranked_people = sorted(people, key=lambda p: p.get("contributions", 0), reverse=True)
                info["top_contributors"] = [
                    {"name": p.get("login", "Unknown"), "contributions": p.get("contributions", 0)}
                    for p in ranked_people[:5]
                ]

        # Commit activity metrics.
        if "commit_insights" in self.insights:
            ci = self.insights["commit_insights"]
            info["commit_patterns"] = ci.get("commit_time_patterns", {})
            info["top_committers"] = ci.get("top_contributors", {})

        # Documentation quality: raw score plus a coarse High/Medium/Low label.
        if "documentation_quality" in self.insights:
            dq = self.insights["documentation_quality"]
            score = dq.get("score", 0)
            info["documentation_score"] = score
            if score > 0.7:
                label = "High"
            elif score > 0.4:
                label = "Medium"
            else:
                label = "Low"
            info["documentation_quality"] = label
            info["readme_sections"] = dq.get("sections", {})

        # Community health.
        if "community_health" in self.insights:
            ch = self.insights["community_health"]
            info["community_health_level"] = ch.get("overall", {}).get("level", "Unknown")
            info["community_guidelines"] = ch.get("community_guidelines", {})

        # Activity level.
        if "activity_level" in self.insights:
            info["activity_level"] = self.insights["activity_level"].get("level", "Unknown")

        # Code complexity.
        if "code_complexity" in self.insights:
            cc = self.insights["code_complexity"]
            info["code_complexity_level"] = cc.get("overall", {}).get("level", "Unknown")

    def get_context_for_query(self, query: str) -> str:
        """
        Retrieve relevant context from repository data based on the query.

        Args:
            query: The user's query

        Returns:
            str: Contextual information to enhance the response
        """
        q = query.lower()

        # Keyword triggers per repository aspect.
        aspect_terms = {
            "overview": ["overview", "about", "what is", "tell me about", "summary"],
            "languages": ["language", "programming language", "code language", "tech stack"],
            "contributors": ["contributor", "who", "team", "maintainer", "author"],
            "activity": ["activity", "active", "commit", "update", "recent", "frequency"],
            "documentation": ["documentation", "docs", "readme", "well documented"],
            "community": ["community", "health", "governance", "conduct", "guideline"],
            "complexity": ["complex", "complexity", "difficult", "simple", "codebase", "understand"],
            "issues": ["issue", "bug", "problem", "ticket", "feature request"],
            "pulls": ["pull request", "pr", "merge", "contribution"],
        }

        # Aspects whose trigger words appear in the query; default to overview.
        aspects = [name for name, terms in aspect_terms.items() if any(t in q for t in terms)]
        if not aspects:
            aspects = ["overview"]

        parts: List[str] = []

        # Repository overview.
        if "overview" in aspects:
            full_name = self.repo_info.get("full_name", "The repository")
            n_stars = self.repo_info.get("stars", 0)
            n_forks = self.repo_info.get("forks", 0)
            text = f"{full_name} is a GitHub repository with {n_stars} stars and {n_forks} forks. "

            description = self.repo_info.get("description", "")
            if description:
                text += f"Description: {description}. "

            main_lang = self.repo_info.get("language", "")
            if main_lang:
                text += f"It's primarily written in {main_lang}. "

            created = self.repo_info.get("created_at", "")
            if created:
                try:
                    when = datetime.datetime.fromisoformat(created.replace('Z', '+00:00'))
                    text += f"The repository was created on {when.strftime('%B %d, %Y')}. "
                except (ValueError, AttributeError):
                    pass  # Unparseable timestamp — just omit the creation date.

            parts.append(text)

        # Language breakdown.
        if "languages" in aspects:
            ranked = self.repo_info.get("top_languages", [])
            if ranked:
                parts.append(
                    "Language breakdown: "
                    + ", ".join(f"{name}: {share:.1f}%" for name, share in ranked)
                    + "."
                )

        # Contributors.
        if "contributors" in aspects:
            text = f"The repository has {self.repo_info.get('total_contributors', 0)} contributors. "
            leaders = self.repo_info.get("top_contributors", [])
            if leaders:
                text += "Top contributors: "
                text += ", ".join(f"{c['name']} ({c['contributions']} commits)" for c in leaders)
                text += "."
            parts.append(text)

        # Activity metrics.
        if "activity" in aspects:
            text = f"Activity level: {self.repo_info.get('activity_level', 'Unknown')}. "
            weekday_counts = self.repo_info.get("commit_patterns", {}).get("by_weekday", {})
            if weekday_counts:
                busiest = max(weekday_counts.items(), key=lambda kv: kv[1])[0]
                text += f"Most active day of the week: {busiest}. "
            parts.append(text)

        # Documentation quality.
        if "documentation" in aspects:
            label = self.repo_info.get("documentation_quality", "Unknown")
            score = self.repo_info.get("documentation_score", 0)
            text = f"Documentation quality: {label} (score: {score:.2f}/1.0). "

            sections = self.repo_info.get("readme_sections", {})
            if sections:
                have = [name for name, present in sections.items() if present]
                lack = [name for name, present in sections.items() if not present]
                if have:
                    text += f"README includes sections on: {', '.join(have)}. "
                if lack:
                    text += f"README is missing sections on: {', '.join(lack)}."

            parts.append(text)

        # Community health.
        if "community" in aspects:
            text = f"Community health: {self.repo_info.get('community_health_level', 'Unknown')}. "
            files = self.repo_info.get("community_guidelines", {})
            if files:
                have = [name for name, present in files.items() if present]
                lack = [name for name, present in files.items() if not present]
                if have:
                    text += f"Has community files: {', '.join(have)}. "
                if lack:
                    text += f"Missing community files: {', '.join(lack)}."
            parts.append(text)

        # Code complexity.
        if "complexity" in aspects:
            parts.append(f"Code complexity: {self.repo_info.get('code_complexity_level', 'Unknown')}.")

        # Issues.
        if "issues" in aspects and "issue_insights" in self.insights:
            stats = self.insights["issue_insights"]
            states = stats.get("by_state", {})
            text = "Issues: "
            if states:
                text += ", ".join(f"{n} {state}" for state, n in states.items())
                text += ". "
            resolution = stats.get("resolution_time", {})
            if resolution:
                hours = resolution.get("mean_hours", 0)
                if hours > 24:
                    text += f"Average resolution time: {hours / 24:.1f} days."
                else:
                    text += f"Average resolution time: {hours:.1f} hours."
            parts.append(text)

        # Pull requests.
        if "pulls" in aspects and "pr_insights" in self.insights:
            stats = self.insights["pr_insights"]
            states = stats.get("by_state", {})
            text = "Pull Requests: "
            if states:
                text += ", ".join(f"{n} {state}" for state, n in states.items())
                text += ". "
            parts.append(text)

        return " ".join(parts)
def create_gradio_interface():
    """
    Create the Gradio interface for GitHub repository analysis.

    Two tabs are exposed:
      * "Repository Analysis" — analyze a GitHub URL, render an HTML summary,
        and optionally generate a downloadable PDF report.
      * "Chat with Repository" — ask questions about the most recent analysis;
        answers are grounded in the analysis data via RAGHelper.

    Returns:
        gr.Blocks: the assembled (not yet launched) interface.
    """
    # Styling
    css = """
    .gradio-container {max-width: 100% !important}
    .main-analysis-area {min-height: 600px}
    .analysis-result {overflow-y: auto; max-height: 500px}
    .chat-interface {border: 1px solid #ccc; border-radius: 5px; padding: 10px}
    .pdf-download {margin-top: 20px}
    """

    # Shared state captured by the callbacks below; mutated via `nonlocal`
    # so every callback sees the same analysis results.
    repo_data: Dict[str, Any] = {}
    analyzer = None

    def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse a GitHub repository URL into (owner, repo); (None, None) if invalid."""
        match = re.search(r"github\.com/([^/]+)/([^/]+)", url)
        if not match:
            return None, None
        owner, repo = match.group(1), match.group(2)
        # Normalize trailing "/" and ".git" so all common URL spellings map to
        # the bare repository name. (The original alternative patterns for
        # these forms were dead code: the first, most general pattern always
        # matched first and kept the suffix.)
        repo = repo.rstrip("/")
        if repo.endswith(".git"):
            repo = repo[:-4]
        return owner, repo

    def analyze_repository(repo_url: str, is_private: bool, github_token: str = None,
                           progress=gr.Progress()) -> str:
        """Analyze a GitHub repository and return an HTML summary of the results."""
        # BUG FIX: was `global repo_data`, which stored results in a module
        # global while generate_pdf_report/chat_with_repo read the (still
        # empty) closure variable — so the PDF and chat features never saw
        # the analysis.
        nonlocal analyzer, repo_data

        owner, repo_name = parse_repo_url(repo_url)
        if not owner or not repo_name:
            return "Invalid GitHub repository URL. Please use format: https://github.com/owner/repo"

        # Use the caller-supplied token for private repos, else the env default.
        token = github_token if is_private and github_token else os.environ.get("GITHUB_TOKEN", "")
        if is_private and not token:
            return "GitHub token is required for private repositories."

        analyzer = GitHubRepoAnalyzer(GitHubAPIConfig(token=token))

        progress(0, desc="Starting repository analysis...")
        try:
            progress(0.1, desc="Fetching repository details...")
            repo_data = analyzer.analyze_repo(owner, repo_name)
            progress(0.9, desc="Generating insights...")

            repo_details = repo_data.get("repo_details", {})
            insights = repo_data.get("insights", {})

            full_name = repo_details.get("full_name", "")
            description = repo_details.get("description", "No description provided")
            stars = repo_details.get("stargazers_count", 0)
            forks = repo_details.get("forks_count", 0)
            language = repo_details.get("language", "Unknown")

            # Repository age from the ISO-8601 creation timestamp.
            age_str = "Unknown"
            created_at = repo_details.get("created_at", "")
            if created_at:
                try:
                    created_date = datetime.datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    age_days = (datetime.datetime.now(datetime.timezone.utc) - created_date).days
                    age_str = f"{age_days / 365.25:.1f} years ({age_days} days)"
                except (ValueError, AttributeError):
                    pass

            activity_level = insights.get("activity_level", {}).get("level", "Unknown")

            doc_quality = insights.get("documentation_quality", {})
            doc_score = doc_quality.get("score", 0) if doc_quality.get("has_readme", False) else 0
            doc_quality_level = "High" if doc_score > 0.7 else "Medium" if doc_score > 0.4 else "Low"

            health_level = insights.get("community_health", {}).get("overall", {}).get("level", "Unknown")
            complexity_level = insights.get("code_complexity", {}).get("overall", {}).get("level", "Unknown")

            # Header plus a two-column details/insights summary.
            summary_html = f"""
            <div class="repo-summary">
                <h2>{full_name}</h2>
                <p><strong>Description:</strong> {description}</p>
                <div style="display: flex; gap: 40px;">
                    <div>
                        <h3>Repository Details</h3>
                        <ul>
                            <li>Primary language: {language}</li>
                            <li>Stars: {stars}</li>
                            <li>Forks: {forks}</li>
                            <li>Age: {age_str}</li>
                        </ul>
                    </div>
                    <div>
                        <h3>Key Insights</h3>
                        <ul>
                            <li>Activity level: {activity_level}</li>
                            <li>Documentation quality: {doc_quality_level}</li>
                            <li>Community health: {health_level}</li>
                            <li>Code complexity: {complexity_level}</li>
                        </ul>
                    </div>
                </div>
            </div>
            """

            # Top contributors with avatars.
            contributors = repo_data.get("contributors", [])
            if contributors:
                top_contributors = sorted(contributors, key=lambda c: c.get("contributions", 0), reverse=True)[:5]
                cards = "".join(
                    f"""
                    <div style="display: inline-block; text-align: center; margin: 8px;">
                        <img src="{c.get('avatar_url', '')}" width="48" height="48" style="border-radius: 50%;">
                        <div>{c.get('login', 'Unknown')}</div>
                        <div>{c.get('contributions', 0)} commits</div>
                    </div>
                    """
                    for c in top_contributors
                )
                summary_html += f"""
                <div>
                    <h3>Top Contributors</h3>
                    <div>{cards}</div>
                </div>
                """

            # Language distribution with proportional bars.
            languages = repo_data.get("languages", {})
            if languages:
                total_bytes = sum(languages.values())
                ranked = sorted(languages.items(), key=lambda kv: kv[1], reverse=True)[:5]
                bars = ""
                for lang, byte_count in ranked:
                    percentage = (byte_count / total_bytes) * 100
                    # Clamp so even tiny shares render a visible bar.
                    bar_width = max(1, min(100, percentage))
                    bars += f"""
                    <div style="display: flex; align-items: center; margin: 4px 0;">
                        <div style="width: 120px;">{lang}</div>
                        <div style="background: #4078c0; height: 12px; width: {bar_width}%;"></div>
                        <div style="margin-left: 8px;">{percentage:.1f}%</div>
                    </div>
                    """
                summary_html += f"""
                <div>
                    <h3>Language Distribution</h3>
                    {bars}
                </div>
                """

            progress(1.0, desc="Analysis complete!")
            return summary_html

        except Exception as e:
            error_message = f"Error analyzing repository: {e}"
            logger.error(error_message)
            return error_message

    def generate_pdf_report() -> Tuple[str, Optional[str]]:
        """Generate a PDF report for the last analyzed repository.

        Returns (status message, path-to-pdf or None). The path feeds a
        gr.File component, which expects a filepath — the previous version
        returned a dict, which gr.File cannot render.
        """
        if not repo_data:
            return "Please analyze a repository first.", None
        try:
            pdf_path = PDFReportGenerator(repo_data).generate_report()
            full_name = repo_data.get("repo_details", {}).get("full_name", "repository")
            return f"PDF report generated for {full_name}", pdf_path
        except Exception as e:
            error_message = f"Error generating PDF report: {e}"
            logger.error(error_message)
            return error_message, None

    def chat_with_repo(query: str, history: List[Tuple[str, str]]) -> Tuple[str, List[Tuple[str, str]]]:
        """Answer a question about the analyzed repository (RAG-grounded).

        Returns ("", updated_history): the empty string clears the textbox and
        the history list is what gr.Chatbot renders. (The previous version
        returned a bare answer string via an invalid `postprocess` kwarg,
        which gr.Chatbot could not display.)
        """
        history = history or []
        if not repo_data:
            return "", history + [(query, "Please analyze a repository first before asking questions.")]

        try:
            # Retrieve the context snippets relevant to this question.
            context = RAGHelper(repo_data).get_context_for_query(query)
            repo_name = repo_data.get("repo_details", {}).get("name", "The repository")
            q = query.lower()

            # Simulated generation: pick a topic-appropriate follow-up prompt.
            # (A real deployment would feed `context` to an LLM here.)
            if any(t in q for t in ["what is", "tell me about", "overview", "about"]):
                response = f"{context}\n\nIs there something specific about {repo_name} you'd like to know more about?"
            elif any(t in q for t in ["language", "programming", "written in"]):
                response = f"{context}\n\nWould you like to know more about any specific language used in {repo_name}?"
            elif any(t in q for t in ["contributor", "who", "maintain", "author"]):
                response = f"{context}\n\nI can provide more details about specific contributors if you're interested."
            elif any(t in q for t in ["active", "activity", "commit", "frequency"]):
                response = f"{context}\n\nWould you like to see visualizations of the commit activity patterns?"
            elif any(t in q for t in ["document", "readme", "docs"]):
                response = f"{context}\n\nIs there a specific aspect of the documentation you'd like feedback on?"
            elif any(t in q for t in ["complex", "difficulty", "understand"]):
                response = f"{context}\n\nWould you like suggestions for navigating this codebase effectively?"
            else:
                response = f"Based on my analysis of {repo_name}:\n\n{context}\n\nIs there anything specific you'd like to know more about?"

            return "", history + [(query, response)]

        except Exception as e:
            error_message = f"Error processing your question: {e}"
            logger.error(error_message)
            return "", history + [(query, error_message)]

    # ---- Layout -------------------------------------------------------------
    with gr.Blocks(css=css) as interface:
        gr.Markdown("# GitHub Repository Analyzer")
        gr.Markdown("Analyze GitHub repositories and chat about the insights")

        with gr.Tab("Repository Analysis"):
            with gr.Row():
                with gr.Column(scale=3):
                    repo_url = gr.Textbox(label="GitHub Repository URL",
                                          placeholder="https://github.com/owner/repo")
                with gr.Column(scale=1):
                    is_private = gr.Checkbox(label="Private Repository")
                    github_token = gr.Textbox(label="GitHub Token (for private repos)",
                                              type="password", visible=False)

            # Show the token field only when the private-repo box is ticked.
            is_private.change(fn=lambda checked: gr.update(visible=checked),
                              inputs=[is_private], outputs=[github_token])

            analyze_btn = gr.Button("Analyze Repository", variant="primary")

            with gr.Row():
                with gr.Column(scale=2):
                    analysis_result = gr.HTML(label="Analysis Result", elem_classes=["analysis-result"])
                with gr.Column(scale=1):
                    with gr.Group():
                        gr.Markdown("### PDF Report")
                        pdf_btn = gr.Button("Generate PDF Report", variant="secondary")
                        pdf_output = gr.Markdown()
                        pdf_download = gr.File(label="Download Report", elem_classes=["pdf-download"])

            # The analyzer returns only the HTML summary; the raw analysis dict
            # stays in the closure (it is not renderable by any component).
            analyze_btn.click(fn=analyze_repository,
                              inputs=[repo_url, is_private, github_token],
                              outputs=[analysis_result])

            pdf_btn.click(fn=generate_pdf_report, inputs=[], outputs=[pdf_output, pdf_download])

        with gr.Tab("Chat with Repository"):
            gr.Markdown("Ask questions about the repository and get insights")

            chatbot = gr.Chatbot(elem_classes=["chat-interface"])
            msg = gr.Textbox(placeholder="Ask me anything about the repository...",
                             show_label=False)
            clear = gr.Button("Clear")

            # chat_with_repo clears the textbox and appends to the history.
            msg.submit(fn=chat_with_repo, inputs=[msg, chatbot], outputs=[msg, chatbot])
            clear.click(lambda: None, None, chatbot, queue=False)

    return interface


# Main entry point: build and launch the Gradio app.
if __name__ == "__main__":
    interface = create_gradio_interface()
    interface.launch(debug=True, share=True)