# githubComplete / app.py
# (Hugging Face Space page header preserved as a comment: uploaded by
#  nihalaninihal — "Update app.py", commit d431f8e, verified)
import os
import json
import time
import re
import logging
import datetime
import concurrent.futures
import sys
import base64
import tempfile
from pathlib import Path
from typing import Dict, List, Union, Any, Optional, Tuple, Set
from collections import Counter, defaultdict
from dataclasses import dataclass, field, asdict
from io import BytesIO, StringIO
import urllib.request
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from tqdm.notebook import tqdm
from dateutil.relativedelta import relativedelta
from github import Github, GithubException, RateLimitExceededException
import gradio as gr
# For PDF Generation
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle, PageBreak
from reportlab.lib.units import inch
from reportlab.pdfgen import canvas
from reportlab.lib.enums import TA_CENTER, TA_LEFT
# Configure logging
# Root logging configuration: INFO level, timestamped records to stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)

# Module-level logger shared by every class in this file.
logger = logging.getLogger("github_analyzer")
@dataclass
class GitHubAPIConfig:
    """Configuration for the GitHub API client with sensible defaults.

    Groups together API credentials, retry behaviour, file-type
    classification lists, per-category collection limits, and output
    settings consumed by GithubClient and GitHubRepoAnalyzer.
    """
    # API access configuration
    token: Optional[str] = None  # Personal access token; None = unauthenticated (lower rate limit)
    max_retries: int = 5
    backoff_factor: int = 2  # exponential backoff base: sleep = backoff_factor ** retry_count
    per_page: int = 100  # Max allowed by GitHub
    timeout: int = 30
    # Retry status codes (secondary rate limits and transient server errors)
    retry_status_codes: Set[int] = field(default_factory=lambda: {
        403, 429, 500, 502, 503, 504
    })
    # Permission types
    collaborator_permission_types: List[str] = field(default_factory=lambda: [
        "admin", "push", "pull", "maintain", "triage"
    ])
    # File classification: extension lists used to bucket repository files.
    # NOTE: some extensions appear in several lists (e.g. ".json", ".txt");
    # classification order is up to the caller.
    code_extensions: List[str] = field(default_factory=lambda: [
        ".py", ".js", ".java", ".c", ".cpp", ".cs", ".go", ".php", ".rb",
        ".swift", ".kt", ".ts", ".rs", ".scala", ".lua", ".m", ".mm",
        ".h", ".hpp", ".cc", ".hh", ".f", ".f90", ".f95", ".f03", ".f08",
        ".for", ".f77", ".jl", ".pl", ".pm", ".t", ".r", ".dart", ".groovy",
        ".v", ".vhd", ".vhdl", ".erl", ".hrl", ".hs", ".lhs", ".ex", ".exs", ".hx"
    ])
    markup_extensions: List[str] = field(default_factory=lambda: [
        ".md", ".html", ".htm", ".xml", ".json", ".yaml", ".yml", ".txt",
        ".rst", ".tex", ".adoc", ".csv", ".tsv", ".toml", ".ini", ".cfg"
    ])
    script_extensions: List[str] = field(default_factory=lambda: [
        ".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd"
    ])
    notebook_extensions: List[str] = field(default_factory=lambda: [
        ".ipynb"
    ])
    data_extensions: List[str] = field(default_factory=lambda: [
        ".csv", ".tsv", ".json", ".xml", ".xls", ".xlsx", ".hdf5",
        ".parquet", ".feather", ".pkl", ".sav", ".dta", ".arff"
    ])
    config_extensions: List[str] = field(default_factory=lambda: [
        ".yml", ".yaml", ".json", ".toml", ".ini", ".cfg", ".conf"
    ])
    other_extensions: List[str] = field(default_factory=lambda: [
        ".txt", ".log", ".svg", ".png", ".jpg", ".jpeg"
    ])
    # Data collection limits (set to None for no limit)
    max_contributors: Optional[int] = 50
    max_issues: Optional[int] = 100
    max_commits: Optional[int] = 200
    max_search_results: Optional[int] = 50
    max_pull_requests: Optional[int] = 100
    max_collaborators: Optional[int] = 30
    # Output configuration
    output_dir: str = "/tmp/github_data"
    generate_visualizations: bool = True

    def __post_init__(self):
        """Ensure output directory exists"""
        os.makedirs(self.output_dir, exist_ok=True)

    def all_code_extensions(self) -> List[str]:
        """Return all code-related file extensions (code + script + config, deduplicated)."""
        return list(set(
            self.code_extensions +
            self.script_extensions +
            self.config_extensions
        ))
class GithubClient:
    """
    A robust GitHub client that handles rate limiting, retries, and provides
    consistent error handling on top of PyGithub.
    """

    def __init__(self, config: GitHubAPIConfig):
        """Initialize the GitHub client with configuration."""
        self.config = config
        self.github = Github(
            config.token,
            per_page=config.per_page,
            timeout=config.timeout,
            retry=config.max_retries,
        )
        self.cache = {}  # Simple in-memory cache; never invalidated (fine for one-shot analyses)

    def get_repo(self, repo_path: str):
        """Get a repository by "owner/name" path, with caching."""
        cache_key = f"repo:{repo_path}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        repo = self.github.get_repo(repo_path)
        self.cache[cache_key] = repo
        return repo

    def _handle_exception(self, e: GithubException, retry_count: int) -> bool:
        """
        Handle GitHub exceptions with proper retries and backoff strategy.

        Args:
            e: The exception to handle
            retry_count: Current retry count

        Returns:
            bool: True if retry should be attempted, False otherwise
        """
        if retry_count >= self.config.max_retries:
            logger.error(f"Max retries ({self.config.max_retries}) exceeded.")
            return False
        if isinstance(e, RateLimitExceededException):
            # Primary rate limit: sleep until the reset timestamp reported by the API
            # (falls back to a one-hour wait if the response lacks core rate info).
            rate_limit = self.github.get_rate_limit()
            reset_time = rate_limit.core.reset.timestamp() if hasattr(rate_limit, 'core') else time.time() + 3600
            sleep_time = max(0, int(reset_time - time.time())) + 1
            logger.warning(f"Rate limit exceeded. Waiting for {sleep_time} seconds...")
            time.sleep(sleep_time)
            return True
        if e.status in self.config.retry_status_codes:
            # Secondary rate limits / transient server errors: exponential backoff.
            sleep_time = self.config.backoff_factor ** retry_count
            logger.warning(
                f"Temporary error (status {e.status}). Retrying in {sleep_time} seconds. "
                f"Attempt {retry_count+1}/{self.config.max_retries}."
            )
            time.sleep(sleep_time)
            return True
        # Non-recoverable error
        logger.error(f"Non-recoverable GitHub API error: {e}")
        return False

    def _paginated_request(self, method, *args, **kwargs):
        """
        Execute a paginated GitHub API request with retry logic.

        Args:
            method: The PyGithub method to call
            max_results: optional keyword cap on the number of items returned

        Returns:
            List of results, or None on non-recoverable error

        BUGFIX: the previous implementation accumulated into a single list
        across retries and then re-entered the pagination loop based on
        ``totalCount``, which duplicated items after a mid-iteration failure.
        Each attempt now builds a fresh list.
        """
        retry_count = 0
        max_results = kwargs.pop('max_results', None)
        while retry_count <= self.config.max_retries:
            try:
                results = []
                # Iterating a PyGithub PaginatedList transparently walks all pages.
                for item in method(*args, **kwargs):
                    results.append(item)
                    if max_results and len(results) >= max_results:
                        return results
                return results
            except GithubException as e:
                if self._handle_exception(e, retry_count):
                    retry_count += 1
                else:
                    return None
        return None

    def _execute_request(self, method, *args, **kwargs):
        """
        Execute a single GitHub API request with retry logic.

        Args:
            method: The PyGithub method to call

        Returns:
            Result of the API call, or None on 404 or non-recoverable error
        """
        retry_count = 0
        while retry_count <= self.config.max_retries:
            try:
                return method(*args, **kwargs)
            except GithubException as e:
                # 404 means "resource absent" (e.g. no README) — not worth retrying.
                if e.status == 404:
                    logger.info(f"Resource not found: {e}")
                    return None
                if self._handle_exception(e, retry_count):
                    retry_count += 1
                else:
                    return None
        return None
class GitHubRepoAnalyzer:
"""
Main class for analyzing GitHub repositories and generating insights.
"""
def __init__(self, config: GitHubAPIConfig):
    """Initialize the analyzer.

    Args:
        config: API credentials, limits and output settings shared by all fetches.
    """
    self.config = config
    # All API traffic goes through this retry/rate-limit-aware client.
    self.client = GithubClient(config)
def get_repo_details(self, repo) -> Dict[str, Any]:
    """Get comprehensive repository metadata as a JSON-serialisable dict.

    Args:
        repo: a PyGithub Repository object.

    Returns:
        Flat dict of repository attributes; datetimes are ISO strings,
        missing/optional attributes are None.
    """
    logger.info(f"Fetching repository details for {repo.full_name}")
    return {
        "name": repo.name,
        "full_name": repo.full_name,
        "description": repo.description,
        "html_url": repo.html_url,
        "stargazers_count": repo.stargazers_count,
        "watchers_count": repo.watchers_count,
        "forks_count": repo.forks_count,
        "open_issues_count": repo.open_issues_count,
        "language": repo.language,
        "default_branch": repo.default_branch,
        # Datetimes serialised as ISO-8601 strings so the dict can be dumped to JSON.
        "created_at": repo.created_at.isoformat() if repo.created_at else None,
        "updated_at": repo.updated_at.isoformat() if repo.updated_at else None,
        "pushed_at": repo.pushed_at.isoformat() if repo.pushed_at else None,
        "license": repo.license.name if repo.license else None,
        "topics": list(repo.get_topics()),  # extra API call
        "archived": repo.archived,
        "disabled": repo.disabled,
        "visibility": repo.visibility,
        "has_wiki": repo.has_wiki,
        "has_pages": repo.has_pages,
        "has_projects": repo.has_projects,
        "has_issues": repo.has_issues,
        # hasattr guards: attribute availability varies across PyGithub versions.
        "has_discussions": repo.has_discussions if hasattr(repo, 'has_discussions') else None,
        "size": repo.size,  # Size in KB
        "network_count": repo.network_count,
        "subscribers_count": repo.subscribers_count,
        "organization": repo.organization.login if repo.organization else None,
        "parent": repo.parent.full_name if hasattr(repo, 'parent') and repo.parent else None,
        "fork": repo.fork,
    }
def get_contributors(self, repo) -> List[Dict[str, Any]]:
    """Get repository contributors with detailed profile information.

    Returns:
        List of contributor dicts (capped at config.max_contributors);
        empty list when the request fails non-recoverably.

    NOTE(review): accessing profile fields such as ``followers`` or ``bio``
    may trigger an extra lazy API request per contributor in PyGithub —
    confirm against the installed PyGithub version if rate limits bite.
    """
    logger.info(f"Fetching contributors for {repo.full_name}")
    contributors = self.client._paginated_request(
        repo.get_contributors,
        max_results=self.config.max_contributors
    )
    if contributors is None:
        return []
    return [
        {
            "login": c.login,
            "id": c.id,
            "contributions": c.contributions,
            "type": c.type,
            "html_url": c.html_url,
            "followers": c.followers,
            "following": c.following,
            "public_repos": c.public_repos if hasattr(c, 'public_repos') else None,
            "bio": c.bio if hasattr(c, 'bio') else None,
            "location": c.location if hasattr(c, 'location') else None,
            "company": c.company if hasattr(c, 'company') else None,
            "email": c.email if hasattr(c, 'email') else None,
            "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None,
        }
        for c in contributors
    ]
def get_languages(self, repo) -> Dict[str, int]:
    """Return a mapping of language name -> bytes of code ({} on failure)."""
    logger.info(f"Fetching languages for {repo.full_name}")
    language_bytes = self.client._execute_request(repo.get_languages)
    if not language_bytes:
        return {}
    return language_bytes
def get_issues(self, repo, state: str = "all") -> List[Dict[str, Any]]:
    """Get repository issues as JSON-serialisable dicts.

    Args:
        repo: a PyGithub Repository object.
        state: "open", "closed" or "all" (GitHub API filter).

    Returns:
        List of issue dicts (capped at config.max_issues). Note that the
        GitHub issues endpoint also returns pull requests; the
        "pull_request" flag distinguishes them.
    """
    logger.info(f"Fetching issues for {repo.full_name} with state={state}")
    issues = self.client._paginated_request(
        repo.get_issues,
        state=state,
        max_results=self.config.max_issues
    )
    if issues is None:
        return []
    return [
        {
            "id": issue.id,
            "number": issue.number,
            "title": issue.title,
            "body": issue.body,
            "state": issue.state,
            "user_login": issue.user.login if issue.user else None,
            "labels": [label.name for label in issue.labels],
            "comments": issue.comments,
            "created_at": issue.created_at.isoformat() if issue.created_at else None,
            "updated_at": issue.updated_at.isoformat() if issue.updated_at else None,
            "closed_at": issue.closed_at.isoformat() if issue.closed_at else None,
            # True when the "issue" is actually a pull request.
            "pull_request": issue.pull_request is not None,
            "milestone": issue.milestone.title if issue.milestone else None,
            "assignees": [user.login for user in issue.assignees] if issue.assignees else [],
        }
        for issue in issues
    ]
def get_commits(self, repo) -> List[Dict[str, Any]]:
    """Get repository commits as JSON-serialisable dicts.

    Returns:
        List of commit dicts (capped at config.max_commits), each with
        author/committer identity, message, stats and changed files.

    NOTE(review): reading ``commit.stats`` and ``commit.files`` may issue
    an extra API request per commit in PyGithub — confirm; this can be
    expensive for large max_commits values.
    """
    logger.info(f"Fetching commits for {repo.full_name}")
    commits = self.client._paginated_request(
        repo.get_commits,
        max_results=self.config.max_commits
    )
    if commits is None:
        return []
    return [
        {
            "sha": commit.sha,
            "commit_message": commit.commit.message,
            # "author" is the GitHub account; "commit.author" is the git metadata —
            # they can differ (e.g. unlinked email addresses).
            "author_login": commit.author.login if commit.author else None,
            "author_name": commit.commit.author.name if commit.commit and commit.commit.author else None,
            "author_email": commit.commit.author.email if commit.commit and commit.commit.author else None,
            "committer_login": commit.committer.login if commit.committer else None,
            "committer_name": commit.commit.committer.name if commit.commit and commit.commit.committer else None,
            "date": commit.commit.author.date.isoformat() if commit.commit and commit.commit.author else None,
            "html_url": commit.html_url,
            "stats": {
                "additions": commit.stats.additions if hasattr(commit, 'stats') else None,
                "deletions": commit.stats.deletions if hasattr(commit, 'stats') else None,
                "total": commit.stats.total if hasattr(commit, 'stats') else None,
            },
            "files_changed": [
                {"filename": f.filename, "additions": f.additions, "deletions": f.deletions, "status": f.status}
                for f in commit.files
            ] if hasattr(commit, 'files') else [],
        }
        for commit in commits
    ]
def get_readme(self, repo) -> str:
    """Return the repository README decoded as UTF-8 ("" when missing or undecodable)."""
    logger.info(f"Fetching README for {repo.full_name}")
    readme = self.client._execute_request(repo.get_readme)
    if readme is not None:
        try:
            return readme.decoded_content.decode('utf-8')
        except UnicodeDecodeError:
            logger.warning(f"Could not decode README content for {repo.full_name}")
    return ""
def get_pull_requests(self, repo, state: str = "all") -> List[Dict[str, Any]]:
    """Get repository pull requests as JSON-serialisable dicts.

    Args:
        repo: a PyGithub Repository object.
        state: "open", "closed" or "all" (GitHub API filter).

    Returns:
        List of PR dicts (capped at config.max_pull_requests).
        hasattr guards default missing numeric fields to 0 and others to
        None for cross-version PyGithub compatibility.
    """
    logger.info(f"Fetching pull requests for {repo.full_name} with state={state}")
    pulls = self.client._paginated_request(
        repo.get_pulls,
        state=state,
        max_results=self.config.max_pull_requests
    )
    if pulls is None:
        return []
    return [
        {
            "id": pull.id,
            "number": pull.number,
            "title": pull.title,
            "body": pull.body,
            "state": pull.state,
            "user_login": pull.user.login if pull.user else None,
            "created_at": pull.created_at.isoformat() if pull.created_at else None,
            "updated_at": pull.updated_at.isoformat() if pull.updated_at else None,
            "closed_at": pull.closed_at.isoformat() if pull.closed_at else None,
            "merged_at": pull.merged_at.isoformat() if pull.merged_at else None,
            "draft": pull.draft if hasattr(pull, 'draft') else None,
            "mergeable": pull.mergeable if hasattr(pull, 'mergeable') else None,
            "mergeable_state": pull.mergeable_state if hasattr(pull, 'mergeable_state') else None,
            "merged": pull.merged if hasattr(pull, 'merged') else None,
            "merge_commit_sha": pull.merge_commit_sha if hasattr(pull, 'merge_commit_sha') else None,
            "comments": pull.comments if hasattr(pull, 'comments') else 0,
            "review_comments": pull.review_comments if hasattr(pull, 'review_comments') else 0,
            "commits": pull.commits if hasattr(pull, 'commits') else 0,
            "additions": pull.additions if hasattr(pull, 'additions') else 0,
            "deletions": pull.deletions if hasattr(pull, 'deletions') else 0,
            "changed_files": pull.changed_files if hasattr(pull, 'changed_files') else 0,
            "head_ref": pull.head.ref if hasattr(pull, 'head') and pull.head else None,
            "base_ref": pull.base.ref if hasattr(pull, 'base') and pull.base else None,
            "labels": [label.name for label in pull.labels] if hasattr(pull, 'labels') else [],
            "assignees": [user.login for user in pull.assignees] if hasattr(pull, 'assignees') else [],
            "requested_reviewers": [user.login for user in pull.requested_reviewers] if hasattr(pull, 'requested_reviewers') else [],
        }
        for pull in pulls
    ]
def get_collaborators(self, repo, affiliation: str = "all") -> List[Dict[str, Any]]:
    """Get repository collaborators (requires push access on the repo).

    Args:
        repo: a PyGithub Repository object.
        affiliation: "outside", "direct" or "all" (GitHub API filter).

    Returns:
        List of collaborator dicts (capped at config.max_collaborators).

    NOTE: _get_permission_level issues one extra API call per collaborator.
    """
    logger.info(f"Fetching collaborators for {repo.full_name} with affiliation={affiliation}")
    collaborators = self.client._paginated_request(
        repo.get_collaborators,
        affiliation=affiliation,
        max_results=self.config.max_collaborators
    )
    if collaborators is None:
        return []
    return [
        {
            "login": c.login,
            "id": c.id,
            "type": c.type,
            "url": c.url,
            "site_admin": c.site_admin if hasattr(c, 'site_admin') else None,
            "role_name": self._get_permission_level(repo, c.login),
            "avatar_url": c.avatar_url if hasattr(c, 'avatar_url') else None,
        }
        for c in collaborators
    ]
def _get_permission_level(self, repo, username: str) -> str:
"""Get permission level for a collaborator."""
try:
return repo.get_collaborator_permission(username)
except GithubException:
return "unknown"
def get_file_distribution(self, repo) -> Dict[str, int]:
    """Analyze file type distribution via a breadth-first contents walk.

    Returns:
        Dict mapping lowercase extension (e.g. ".py") or "no_extension"
        to the number of files seen. The walk is bounded: at most 3
        directory levels deep and at most 100 directories per level, to
        keep API usage reasonable. {} on failure.
    """
    logger.info(f"Analyzing file distribution for {repo.full_name}")
    # Get all files in the repo (only feasible for smaller repos)
    try:
        contents = self.client._execute_request(repo.get_contents, "")
        if not contents:
            return {}
        file_types = defaultdict(int)
        directories = []
        # Process initial (root) contents: count files, queue directories.
        for item in contents:
            if item.type == "dir":
                directories.append(item.path)
            elif item.type == "file":
                ext = os.path.splitext(item.name)[1].lower()
                file_types[ext if ext else "no_extension"] += 1
        # Process directories level by level (bounded depth to avoid API rate limits).
        max_depth = 3
        for depth in range(max_depth):
            if not directories:
                break
            next_level = []
            for directory in directories[:100]:  # Limit to avoid excessive API calls
                dir_contents = self.client._execute_request(repo.get_contents, directory)
                if not dir_contents:
                    continue
                for item in dir_contents:
                    if item.type == "dir":
                        next_level.append(item.path)
                    elif item.type == "file":
                        ext = os.path.splitext(item.name)[1].lower()
                        file_types[ext if ext else "no_extension"] += 1
            directories = next_level
        return dict(file_types)
    except GithubException:
        logger.warning(f"Could not get file distribution for {repo.full_name}")
        return {}
def search_code(self, repo, query_terms: List[str]) -> List[Dict[str, Any]]:
    """Search for specific terms in the repository code via the code-search API.

    Args:
        repo: a PyGithub Repository object.
        query_terms: terms searched one at a time, each scoped with
            ``repo:owner/name`` and capped at config.max_search_results.

    Returns:
        Flat list of match dicts; each carries the originating "term".
        Results are re-filtered by repository full name as a safety net.
    """
    logger.info(f"Searching code in {repo.full_name} for terms: {query_terms}")
    results = []
    for term in query_terms:
        query = f"repo:{repo.full_name} {term}"
        search_results = self.client._paginated_request(
            self.client.github.search_code,
            query,
            max_results=self.config.max_search_results
        )
        if search_results:
            results.extend([
                {
                    "term": term,
                    "name": result.name,
                    "path": result.path,
                    "sha": result.sha,
                    "url": result.html_url,
                    "repository": result.repository.full_name,
                }
                for result in search_results
                if result.repository.full_name == repo.full_name
            ])
    return results
def get_branches(self, repo) -> List[Dict[str, Any]]:
    """Return one summary dict (name, protection flag, head commit SHA) per branch."""
    logger.info(f"Fetching branches for {repo.full_name}")
    branches = self.client._paginated_request(repo.get_branches)
    if branches is None:
        return []
    summaries = []
    for branch in branches:
        summaries.append({
            "name": branch.name,
            "protected": branch.protected,
            "commit_sha": branch.commit.sha if branch.commit else None,
        })
    return summaries
def get_releases(self, repo) -> List[Dict[str, Any]]:
    """Get repository releases, including their downloadable assets.

    Returns:
        List of release dicts (no explicit cap; bounded only by the API).
        Each release's assets trigger an extra ``get_assets()`` call.
    """
    logger.info(f"Fetching releases for {repo.full_name}")
    releases = self.client._paginated_request(repo.get_releases)
    if releases is None:
        return []
    return [
        {
            "id": release.id,
            "tag_name": release.tag_name,
            "name": release.title,
            "body": release.body,
            "draft": release.draft,
            "prerelease": release.prerelease,
            "created_at": release.created_at.isoformat() if release.created_at else None,
            "published_at": release.published_at.isoformat() if release.published_at else None,
            "author_login": release.author.login if release.author else None,
            "html_url": release.html_url,
            "assets": [
                {
                    "name": asset.name,
                    "label": asset.label,
                    "content_type": asset.content_type,
                    "size": asset.size,
                    "download_count": asset.download_count,
                    "browser_download_url": asset.browser_download_url,
                }
                for asset in release.get_assets()
            ],
        }
        for release in releases
    ]
def get_workflows(self, repo) -> List[Dict[str, Any]]:
    """Get the repository's GitHub Actions workflow definitions.

    Returns:
        List of workflow dicts, or [] when the API/PyGithub version does
        not expose workflows (AttributeError) or the request fails.
    """
    logger.info(f"Fetching workflows for {repo.full_name}")
    try:
        workflows = self.client._paginated_request(repo.get_workflows)
        if workflows is None:
            return []
        return [
            {
                "id": workflow.id,
                "name": workflow.name,
                "path": workflow.path,
                "state": workflow.state,
                "created_at": workflow.created_at.isoformat() if workflow.created_at else None,
                "updated_at": workflow.updated_at.isoformat() if workflow.updated_at else None,
            }
            for workflow in workflows
        ]
    except (GithubException, AttributeError):
        # Older PyGithub versions or repositories without workflows
        return []
def analyze_commit_activity(self, repo) -> Dict[str, Any]:
    """Analyze commit activity patterns from the GitHub statistics endpoints.

    Returns:
        Dict with "weekly_commits" (week date, total, per-day counts) and
        "code_frequency" (week date, additions, deletions). {} when the
        stats endpoint returns nothing (GitHub computes stats lazily and
        may return empty on the first request).
    """
    logger.info(f"Analyzing commit activity for {repo.full_name}")
    # Get stats commit activity
    stats = self.client._execute_request(repo.get_stats_commit_activity)
    if not stats:
        return {}
    weekly_commits = []
    for week in stats:
        if hasattr(week, 'week') and hasattr(week, 'total'):
            # week.week is a Unix timestamp for the start of the week.
            date = datetime.datetime.fromtimestamp(week.week).strftime('%Y-%m-%d')
            weekly_commits.append({
                "week": date,
                "total": week.total,
                "days": week.days if hasattr(week, 'days') else [],
            })
    # Get code frequency (weekly additions/deletions)
    code_freq = self.client._execute_request(repo.get_stats_code_frequency)
    if not code_freq:
        code_frequency = []
    else:
        code_frequency = []
        for item in code_freq:
            # item is (timestamp, additions, deletions-as-negative).
            date = datetime.datetime.fromtimestamp(item[0]).strftime('%Y-%m-%d')
            code_frequency.append({
                "week": date,
                "additions": item[1],
                "deletions": -item[2],  # Convert to positive for better readability
            })
    return {
        "weekly_commits": weekly_commits,
        "code_frequency": code_frequency,
    }
def analyze_contributor_activity(self, repo) -> Dict[str, Any]:
    """Analyze per-contributor weekly activity from the statistics endpoint.

    Returns:
        {"contributor_stats": [...]} where each entry has the author login,
        total commit count, and a list of weekly additions/deletions/commits.
        {} when the stats endpoint returns nothing.
    """
    logger.info(f"Analyzing contributor activity for {repo.full_name}")
    # Get contributor stats
    stats = self.client._execute_request(repo.get_stats_contributors)
    if not stats:
        return {}
    contributor_stats = []
    for stat in stats:
        # Skip entries whose author account no longer resolves.
        if not hasattr(stat, 'author') or not stat.author:
            continue
        weeks_data = []
        for week in stat.weeks:
            if hasattr(week, 'w'):
                # w = week start timestamp, a = additions, d = deletions, c = commits
                # (GitHub stats API field names).
                date = datetime.datetime.fromtimestamp(week.w).strftime('%Y-%m-%d')
                weeks_data.append({
                    "week": date,
                    "additions": week.a,
                    "deletions": week.d,
                    "commits": week.c,
                })
        contributor_stats.append({
            "author": stat.author.login,
            "total_commits": stat.total,
            "weeks": weeks_data,
        })
    return {
        "contributor_stats": contributor_stats,
    }
def analyze_issue_distribution(self, issues: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze distribution of issues by various metrics."""
if not issues:
return {}
# Convert to DataFrame for easier analysis
df = pd.DataFrame(issues)
# Issues by state
state_counts = df['state'].value_counts().to_dict() if 'state' in df else {}
# Issues by user
user_counts = df['user_login'].value_counts().head(10).to_dict() if 'user_login' in df else {}
# Pull requests vs regular issues
is_pr_counts = df['pull_request'].value_counts().to_dict() if 'pull_request' in df else {}
# Issues by labels (flattening the labels list)
labels = []
if 'labels' in df:
for label_list in df['labels']:
if label_list:
labels.extend(label_list)
label_counts = Counter(labels)
top_labels = dict(label_counts.most_common(10))
# Time analysis
if 'created_at' in df:
df['created_date'] = pd.to_datetime(df['created_at'])
df['month_year'] = df['created_date'].dt.strftime('%Y-%m')
issues_by_month = df.groupby('month_year').size().to_dict()
else:
issues_by_month = {}
# Calculate resolution time for closed issues
resolution_times = []
if 'created_at' in df and 'closed_at' in df:
for _, issue in df.iterrows():
if pd.notna(issue.get('closed_at')) and pd.notna(issue.get('created_at')):
created = pd.to_datetime(issue['created_at'])
closed = pd.to_datetime(issue['closed_at'])
resolution_time = (closed - created).total_seconds() / 3600 # hours
resolution_times.append(resolution_time)
resolution_stats = {}
if resolution_times:
resolution_stats = {
"mean_hours": sum(resolution_times) / len(resolution_times),
"median_hours": sorted(resolution_times)[len(resolution_times) // 2],
"min_hours": min(resolution_times),
"max_hours": max(resolution_times),
}
return {
"by_state": state_counts,
"by_user": user_counts,
"pr_vs_issue": is_pr_counts,
"by_label": top_labels,
"by_month": issues_by_month,
"resolution_time": resolution_stats,
}
def generate_insights(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
    """Generate higher-level insights from the collected repository data.

    Args:
        repo_data: aggregate dict whose optional keys ("repo_details",
            "languages", "contributors", "issues", "pull_requests",
            "commits", "workflows", "file_distribution", "readme") are the
            outputs of the corresponding fetch methods on this class.

    Returns:
        Dict of derived metrics; each section is present only when its
        input data is available.
    """
    insights = {}
    # Repository activity and health
    if "repo_details" in repo_data:
        repo_details = repo_data["repo_details"]
        insights["repository_age_days"] = self._calculate_age_days(repo_details.get("created_at"))
        insights["freshness_days"] = self._calculate_freshness_days(repo_details.get("pushed_at"))
        # Popularity metrics
        insights["popularity"] = {
            "stars": repo_details.get("stargazers_count", 0),
            "forks": repo_details.get("forks_count", 0),
            "watchers": repo_details.get("watchers_count", 0),
            "star_fork_ratio": self._calculate_ratio(
                repo_details.get("stargazers_count", 0),
                repo_details.get("forks_count", 0)
            ),
        }
    # Language distribution (byte counts -> percentages)
    if "languages" in repo_data:
        languages = repo_data["languages"]
        total_bytes = sum(languages.values()) if languages else 0
        if total_bytes > 0:
            language_percentages = {
                lang: (bytes_count / total_bytes) * 100
                for lang, bytes_count in languages.items()
            }
            insights["language_distribution"] = {
                "primary_language": max(languages.items(), key=lambda x: x[1])[0] if languages else None,
                "language_count": len(languages),
                "percentages": language_percentages,
            }
    # Contributor insights
    if "contributors" in repo_data:
        contributors = repo_data["contributors"]
        if contributors:
            total_contributions = sum(c.get("contributions", 0) for c in contributors)
            insights["contributor_insights"] = {
                "contributor_count": len(contributors),
                "total_contributions": total_contributions,
                "avg_contributions_per_contributor": total_contributions / len(contributors) if len(contributors) > 0 else 0,
                "contribution_distribution": self._analyze_contribution_distribution(contributors),
            }
    # Issue and PR dynamics
    if "issues" in repo_data:
        issues = repo_data["issues"]
        insights["issue_insights"] = self.analyze_issue_distribution(issues)
    if "pull_requests" in repo_data:
        prs = repo_data["pull_requests"]
        insights["pr_insights"] = self.analyze_issue_distribution(prs)  # Reuse the same analysis
        # Additional PR-specific metrics
        if prs:
            insights["pr_code_change_stats"] = self._analyze_pr_code_changes(prs)
    # Commit patterns
    if "commits" in repo_data:
        commits = repo_data["commits"]
        insights["commit_insights"] = self._analyze_commit_patterns(commits)
    # Check for CI/CD presence
    insights["ci_cd_presence"] = self._detect_ci_cd(repo_data)
    # Documentation quality
    if "readme" in repo_data:
        readme = repo_data["readme"]
        insights["documentation_quality"] = self._assess_documentation_quality(readme)
    # Project Activity Level
    insights["activity_level"] = self._calculate_activity_level(repo_data)
    # Code complexity analysis
    insights["code_complexity"] = self._analyze_code_complexity(repo_data)
    # Community health analysis
    insights["community_health"] = self._analyze_community_health(repo_data)
    return insights
def _calculate_age_days(self, created_at_iso: str) -> float:
"""Calculate repository age in days."""
if not created_at_iso:
return 0
try:
created_at = datetime.datetime.fromisoformat(created_at_iso.replace('Z', '+00:00'))
now = datetime.datetime.now(datetime.timezone.utc)
return (now - created_at).total_seconds() / (24 * 3600)
except ValueError:
return 0
def _calculate_freshness_days(self, pushed_at_iso: str) -> float:
"""Calculate days since last push."""
if not pushed_at_iso:
return float('inf')
try:
pushed_at = datetime.datetime.fromisoformat(pushed_at_iso.replace('Z', '+00:00'))
now = datetime.datetime.now(datetime.timezone.utc)
return (now - pushed_at).total_seconds() / (24 * 3600)
except ValueError:
return float('inf')
def _calculate_ratio(self, numerator: int, denominator: int) -> float:
"""Calculate ratio with handling for zero denominator."""
return numerator / denominator if denominator and denominator > 0 else float('inf')
def _analyze_contribution_distribution(self, contributors: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze the distribution of contributions among contributors."""
if not contributors:
return {}
# Sort contributors by number of contributions
sorted_contributors = sorted(contributors, key=lambda c: c.get("contributions", 0), reverse=True)
# Calculate percentiles
total_contributions = sum(c.get("contributions", 0) for c in contributors)
cumulative_contributions = 0
percentile_20 = 0
percentile_50 = 0
percentile_80 = 0
for i, contributor in enumerate(sorted_contributors):
contributions = contributor.get("contributions", 0)
cumulative_contributions += contributions
percentage = (cumulative_contributions / total_contributions) * 100
if percentage >= 20 and percentile_20 == 0:
percentile_20 = i + 1
if percentage >= 50 and percentile_50 == 0:
percentile_50 = i + 1
if percentage >= 80 and percentile_80 == 0:
percentile_80 = i + 1
# Calculate Gini coefficient to measure inequality
gini = self._calculate_gini([c.get("contributions", 0) for c in contributors])
return {
"contributors_for_20_percent": percentile_20,
"contributors_for_50_percent": percentile_50,
"contributors_for_80_percent": percentile_80,
"gini_coefficient": gini,
"top_contributor_percentage": (sorted_contributors[0].get("contributions", 0) / total_contributions) * 100 if sorted_contributors else 0,
}
def _calculate_gini(self, values: List[int]) -> float:
"""Calculate the Gini coefficient of a distribution."""
if not values or sum(values) == 0:
return 0
values = sorted(values)
n = len(values)
cumsum = 0
for i, value in enumerate(values):
cumsum += value
values[i] = cumsum
return (2 * sum(values) / (n * sum(values[-1]))) - (n + 1) / n
def _analyze_pr_code_changes(self, prs: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze code changes across pull requests."""
if not prs:
return {}
# Extract metrics
additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None]
deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None]
changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None]
# Calculate stats
stats = {}
if additions:
stats["additions"] = {
"mean": sum(additions) / len(additions),
"median": sorted(additions)[len(additions) // 2],
"max": max(additions),
"total": sum(additions),
}
if deletions:
stats["deletions"] = {
"mean": sum(deletions) / len(deletions),
"median": sorted(deletions)[len(deletions) // 2],
"max": max(deletions),
"total": sum(deletions),
}
if changed_files:
stats["changed_files"] = {
"mean": sum(changed_files) / len(changed_files),
"median": sorted(changed_files)[len(changed_files) // 2],
"max": max(changed_files),
"total": sum(changed_files),
}
return stats
def _analyze_commit_patterns(self, commits: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Analyze patterns in commit data.

    Args:
        commits: commit dicts as produced by get_commits.

    Returns:
        Dict with top contributors, message-length stats, hour/weekday
        activity distributions, and monthly commit frequency. {} for an
        empty input.
    """
    if not commits:
        return {}
    # Count by author (falsy logins are filtered out by the trailing `if`).
    commit_counts = Counter(
        commit.get("author_login", "Unknown")
        for commit in commits
        if commit.get("author_login")
    )
    # Analyze message patterns
    message_lengths = [
        len(commit.get("commit_message", ""))
        for commit in commits
        if commit.get("commit_message")
    ]
    # Extract parseable ISO dates for time-based analysis; silently skip the rest.
    dates = []
    for commit in commits:
        date_str = commit.get("date")
        if date_str:
            try:
                date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00'))
                dates.append(date)
            except ValueError:
                pass
    # Analyze times of day
    hours = [date.hour for date in dates]
    hour_counts = Counter(hours)
    # Analyze days of week (weekday(): Monday == 0)
    weekdays = [date.weekday() for date in dates]
    weekday_counts = Counter(weekdays)
    weekday_names = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    weekday_data = {weekday_names[day]: count for day, count in weekday_counts.items()}
    # Analyze frequency of commits over time
    commit_frequency = {}
    if dates:
        dates_sorted = sorted(dates)
        first_date = dates_sorted[0]
        last_date = dates_sorted[-1]
        # Walk month by month from first to last commit; the day=28 + 4 days
        # trick always lands in the next month regardless of month length.
        current_date = first_date.replace(day=1)
        while current_date <= last_date:
            next_month = current_date.replace(day=28) + datetime.timedelta(days=4)
            next_month = next_month.replace(day=1)
            month_key = current_date.strftime('%Y-%m')
            commit_frequency[month_key] = sum(
                1 for date in dates
                if date.year == current_date.year and date.month == current_date.month
            )
            current_date = next_month
    return {
        "top_contributors": dict(commit_counts.most_common(5)),
        "message_length": {
            "mean": sum(message_lengths) / len(message_lengths) if message_lengths else 0,
            "max": max(message_lengths) if message_lengths else 0,
            "min": min(message_lengths) if message_lengths else 0,
        },
        "commit_time_patterns": {
            "by_hour": dict(sorted(hour_counts.items())),
            "by_weekday": weekday_data,
        },
        "commit_frequency": commit_frequency,
    }
def _detect_ci_cd(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
"""Detect CI/CD presence and configuration in the repository."""
ci_cd_indicators = {
"github_actions": False,
"travis": False,
"circle_ci": False,
"jenkins": False,
"gitlab_ci": False,
"azure_pipelines": False,
}
# Check workflows
if "workflows" in repo_data and repo_data["workflows"]:
ci_cd_indicators["github_actions"] = True
# Check for CI configuration files
if "file_distribution" in repo_data:
files = repo_data.get("file_distribution", {})
if ".travis.yml" in files:
ci_cd_indicators["travis"] = True
if ".circleci/config.yml" in files or "circle.yml" in files:
ci_cd_indicators["circle_ci"] = True
if "Jenkinsfile" in files:
ci_cd_indicators["jenkins"] = True
if ".gitlab-ci.yml" in files:
ci_cd_indicators["gitlab_ci"] = True
if "azure-pipelines.yml" in files:
ci_cd_indicators["azure_pipelines"] = True
return {
"has_ci_cd": any(ci_cd_indicators.values()),
"ci_cd_systems": ci_cd_indicators,
}
def _assess_documentation_quality(self, readme: str) -> Dict[str, Any]:
"""Assess the quality of documentation based on the README."""
if not readme:
return {
"has_readme": False,
"readme_length": 0,
"score": 0,
"sections": {},
}
# Analyze the README content
lines = readme.strip().split('\n')
word_count = len(readme.split())
sections = {}
# Check for common README sections
section_keywords = {
"introduction": ["introduction", "overview", "about"],
"installation": ["installation", "install", "setup", "getting started"],
"usage": ["usage", "using", "example", "examples"],
"api": ["api", "reference", "documentation"],
"contributing": ["contributing", "contribute", "development"],
"license": ["license", "licensing"],
"code_of_conduct": ["code of conduct"],
}
for section, keywords in section_keywords.items():
sections[section] = any(
any(keyword.lower() in line.lower() for keyword in keywords)
for line in lines
)
# Count images/diagrams (markdown format)
image_count = readme.count("![")
# Count code examples
code_block_count = readme.count("```")
# Calculate a simple score
section_score = sum(1 for present in sections.values() if present) / len(sections)
has_images = image_count > 0
has_code = code_block_count > 0
length_score = min(1.0, word_count / 1000) # Normalize to 0-1, with 1000+ words being "complete"
score = (section_score * 0.5) + (has_images * 0.2) + (has_code * 0.2) + (length_score * 0.1)
return {
"has_readme": True,
"readme_length": word_count,
"score": score,
"sections": sections,
"has_images": has_images,
"image_count": image_count,
"has_code_examples": has_code,
"code_block_count": code_block_count // 2, # Each block has opening and closing ```
}
def _calculate_activity_level(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
    """Calculate repository activity level based on commits, PRs, and issues.

    Args:
        repo_data: Collected repository data; may contain "repo_details",
            "commits", "pull_requests", "issues" and "releases".

    Returns:
        Dict with a numeric "score", a qualitative "level"
        (None / Very Low / Low / Medium / High / Very High), and a
        "details" breakdown of the inputs.
    """
    activity_score = 0
    activity_details = {}

    def count_recent(items, date_key, cutoff):
        # Count items whose timestamp under date_key is on/after cutoff.
        # Malformed or missing dates are skipped instead of aborting the
        # whole analysis (consistent with _analyze_commit_patterns).
        count = 0
        for item in items:
            date_str = item.get(date_key)
            if not date_str:
                continue
            try:
                parsed = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00'))
            except ValueError:
                continue
            if parsed >= cutoff:
                count += 1
        return count

    # Repository age in months, clamped to >= 1 to avoid division by zero
    if "repo_details" in repo_data:
        age_days = self._calculate_age_days(repo_data["repo_details"].get("created_at"))
        age_months = max(1, age_days / 30.5)  # 30.5 ~ average month length
        activity_details["age_months"] = age_months
    else:
        age_months = 1
    # Single cutoff for all "recent" checks (was recomputed per section)
    three_months_ago = datetime.datetime.now(datetime.timezone.utc) - relativedelta(months=3)
    # Recent commits: up to 10 points
    if "commits" in repo_data:
        recent_commits = count_recent(repo_data["commits"], "date", three_months_ago)
        activity_details["recent_commits"] = recent_commits
        activity_score += min(10, recent_commits / 10)
    # Recent PRs: up to 5 points
    if "pull_requests" in repo_data:
        recent_prs = count_recent(repo_data["pull_requests"], "created_at", three_months_ago)
        activity_details["recent_prs"] = recent_prs
        activity_score += min(5, recent_prs / 5)
    # Recent issues (excluding PRs, which the issues API also returns): up to 5 points
    if "issues" in repo_data:
        true_issues = [issue for issue in repo_data["issues"] if not issue.get("pull_request")]
        recent_issues = count_recent(true_issues, "created_at", three_months_ago)
        activity_details["recent_issues"] = recent_issues
        activity_score += min(5, recent_issues / 5)
    # Release cadence: up to 5 points
    if "releases" in repo_data:
        releases_per_month = len(repo_data["releases"]) / max(1, age_months)
        activity_details["releases_per_month"] = releases_per_month
        activity_score += min(5, releases_per_month * 2.5)
    # Map the numeric score onto a qualitative level
    activity_level = "None"
    if activity_score >= 20:
        activity_level = "Very High"
    elif activity_score >= 15:
        activity_level = "High"
    elif activity_score >= 10:
        activity_level = "Medium"
    elif activity_score >= 5:
        activity_level = "Low"
    elif activity_score > 0:
        activity_level = "Very Low"
    return {
        "score": activity_score,
        "level": activity_level,
        "details": activity_details,
    }
def _analyze_code_complexity(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
    """Estimate code complexity based on available metrics.

    Combines file counts, PR change sizes, dependency-churn signals,
    contributor count and codebase size into per-dimension metrics and
    an "overall" score/level summary.

    Args:
        repo_data: Collected repository data.

    Returns:
        Dict of complexity metrics; always contains "overall" with
        "score" and "level" keys.
    """
    complexity = {}
    # Analyze file distribution
    if "file_distribution" in repo_data:
        file_types = repo_data["file_distribution"]
        total_files = sum(file_types.values())
        # NOTE(review): keys are treated as file *extensions* here, but
        # _analyze_community_health scans the same dict as *paths* —
        # confirm which shape get_file_distribution() actually returns.
        code_files = sum(
            count for ext, count in file_types.items()
            if ext in self.config.all_code_extensions()
        )
        complexity["file_counts"] = {
            "total_files": total_files,
            "code_files": code_files,
        }
    # Analyze PR complexity: average churn per pull request
    if "pull_requests" in repo_data:
        prs = repo_data["pull_requests"]
        # Get average changes per PR (None values are excluded)
        additions = [pr.get("additions", 0) for pr in prs if pr.get("additions") is not None]
        deletions = [pr.get("deletions", 0) for pr in prs if pr.get("deletions") is not None]
        changed_files = [pr.get("changed_files", 0) for pr in prs if pr.get("changed_files") is not None]
        if additions and deletions and changed_files:
            avg_additions = sum(additions) / len(additions)
            avg_deletions = sum(deletions) / len(deletions)
            avg_changed_files = sum(changed_files) / len(changed_files)
            complexity["pr_complexity"] = {
                "avg_additions": avg_additions,
                "avg_deletions": avg_deletions,
                "avg_changed_files": avg_changed_files,
            }
            # Up to 10 points: ~1000 changed lines per PR saturates the score
            pr_complexity_score = min(10, (avg_additions + avg_deletions) / 100)
            complexity["pr_complexity_score"] = pr_complexity_score
    # Check dependency complexity.
    # dependency_complexity_score is defined here (not inside the if) because
    # the overall-score accumulation below adds it unconditionally.
    dependency_complexity_score = 0
    # NOTE(review): this reads repo_data["insights"], which analyze_repo only
    # attaches *after* insight generation — during the first pass this key is
    # typically absent and the branch is skipped; confirm the intended order.
    if "commit_insights" in repo_data.get("insights", {}):
        commit_messages = [
            commit.get("commit_message", "").lower()
            for commit in repo_data.get("commits", [])
        ]
        # Check for dependency-related keywords
        dependency_keywords = ["dependency", "dependencies", "upgrade", "update", "version", "package"]
        dependency_commits = sum(
            1 for message in commit_messages
            if any(keyword in message for keyword in dependency_keywords)
        )
        dependency_ratio = dependency_commits / len(commit_messages) if commit_messages else 0
        dependency_complexity_score = min(5, dependency_ratio * 20)  # Up to 5 points
        complexity["dependency_complexity"] = {
            "dependency_commits": dependency_commits,
            "dependency_ratio": dependency_ratio,
            "score": dependency_complexity_score,
        }
    # Overall complexity score: contributors + PR churn + dependency churn + size
    overall_score = 0
    contributors = len(repo_data.get("contributors", []))
    if contributors > 0:
        contributor_score = min(5, contributors / 10)  # Up to 5 points
        overall_score += contributor_score
    if "pr_complexity_score" in complexity:
        overall_score += complexity["pr_complexity_score"]
    overall_score += dependency_complexity_score
    # Code size complexity
    if "languages" in repo_data:
        languages = repo_data["languages"]
        total_bytes = sum(languages.values()) if languages else 0
        # Size points based on code size in MB (50 MB saturates the score)
        size_mb = total_bytes / (1024 * 1024)
        size_score = min(10, size_mb / 5)  # Up to 10 points for large codebases
        overall_score += size_score
        complexity["code_size"] = {
            "total_bytes": total_bytes,
            "size_mb": size_mb,
            "score": size_score,
        }
    # Determine complexity level from the accumulated score
    complexity_level = "Low"
    if overall_score >= 25:
        complexity_level = "Very High"
    elif overall_score >= 20:
        complexity_level = "High"
    elif overall_score >= 15:
        complexity_level = "Medium-High"
    elif overall_score >= 10:
        complexity_level = "Medium"
    elif overall_score >= 5:
        complexity_level = "Low-Medium"
    complexity["overall"] = {
        "score": overall_score,
        "level": complexity_level,
    }
    return complexity
def _analyze_community_health(self, repo_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze the community health of the repository."""
health = {}
# Calculate issue responsiveness
if "issues" in repo_data:
issues = repo_data["issues"]
closed_issues = [issue for issue in issues if issue.get("state") == "closed"]
if issues:
closure_rate = len(closed_issues) / len(issues)
health["issue_closure_rate"] = closure_rate
# Calculate average time to close
resolution_times = []
for issue in closed_issues:
if issue.get("created_at") and issue.get("closed_at"):
created = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00'))
closed = datetime.datetime.fromisoformat(issue["closed_at"].replace('Z', '+00:00'))
resolution_time = (closed - created).total_seconds() / 3600 # hours
resolution_times.append(resolution_time)
if resolution_times:
avg_resolution_time = sum(resolution_times) / len(resolution_times)
health["avg_issue_resolution_time_hours"] = avg_resolution_time
# Calculate PR review responsiveness
if "pull_requests" in repo_data:
prs = repo_data["pull_requests"]
merged_prs = [pr for pr in prs if pr.get("merged")]
if prs:
merge_rate = len(merged_prs) / len(prs)
health["pr_merge_rate"] = merge_rate
# Calculate average time to merge
merge_times = []
for pr in merged_prs:
if pr.get("created_at") and pr.get("merged_at"):
created = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00'))
merged = datetime.datetime.fromisoformat(pr["merged_at"].replace('Z', '+00:00'))
merge_time = (merged - created).total_seconds() / 3600 # hours
merge_times.append(merge_time)
if merge_times:
avg_merge_time = sum(merge_times) / len(merge_times)
health["avg_pr_merge_time_hours"] = avg_merge_time
# Check for community guidelines
community_files = [
"CONTRIBUTING.md",
"CODE_OF_CONDUCT.md",
"SECURITY.md",
"SUPPORT.md",
"GOVERNANCE.md",
]
community_file_presence = {}
if "file_distribution" in repo_data:
file_paths = []
for item in repo_data.get("file_distribution", {}):
file_paths.append(item)
for community_file in community_files:
present = any(community_file.lower() in path.lower() for path in file_paths)
community_file_presence[community_file] = present
health["community_guidelines"] = community_file_presence
# Calculate contributor diversity
if "contributors" in repo_data:
contributors = repo_data["contributors"]
if contributors:
# Calculate Gini coefficient for contribution distribution
gini = self._calculate_gini([c.get("contributions", 0) for c in contributors])
health["contributor_gini"] = gini
# Interpret Gini coefficient
if gini < 0.4:
diversity_level = "High"
elif gini < 0.6:
diversity_level = "Medium"
else:
diversity_level = "Low"
health["contributor_diversity"] = diversity_level
# Calculate overall health score
health_score = 0
# Points for issue responsiveness
if "issue_closure_rate" in health:
health_score += health["issue_closure_rate"] * 10 # Up to 10 points
# Points for PR responsiveness
if "pr_merge_rate" in health:
health_score += health["pr_merge_rate"] * 10 # Up to 10 points
# Points for community guidelines
guideline_count = sum(1 for present in community_file_presence.values() if present)
health_score += guideline_count * 2 # Up to 10 points
# Points for contributor diversity
if "contributor_gini" in health:
diversity_score = 10 * (1 - health["contributor_gini"]) # Up to 10 points
health_score += diversity_score
# Determine health level
health_level = "Poor"
if health_score >= 30:
health_level = "Excellent"
elif health_score >= 25:
health_level = "Very Good"
elif health_score >= 20:
health_level = "Good"
elif health_score >= 15:
health_level = "Fair"
elif health_score >= 10:
health_level = "Needs Improvement"
health["overall"] = {
"score": health_score,
"level": health_level,
}
return health
def generate_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
    """
    Generate visualizations of repository data.

    Returns:
        Dict of visualization figures (empty when visualization generation
        is disabled in the config).
    """
    if not self.config.generate_visualizations:
        return {}
    figures = {}
    # Language pie chart is optional: only added when data was available.
    language_fig = self._visualize_language_distribution(repo_data)
    if language_fig:
        figures["language_distribution"] = language_fig
    # Multi-figure builders each return a dict of named figures.
    for build in (
        self._visualize_commit_activity,
        self._visualize_contributor_activity,
        self._visualize_issues_and_prs,
        self._generate_plotly_visualizations,
    ):
        figures.update(build(repo_data, insights))
    # Collaboration network is optional as well.
    network_fig = self._visualize_collaboration_network(repo_data, insights)
    if network_fig:
        figures["collaboration_network"] = network_fig
    return figures
def _visualize_language_distribution(self, repo_data: Dict[str, Any]) -> Optional[plt.Figure]:
    """Create a pie chart of the repository's language byte distribution."""
    languages = repo_data.get("languages", {})
    if not languages:
        return None
    fig, ax = plt.subplots(figsize=(10, 6))
    total_bytes = sum(languages.values())
    # Group languages under 1% of the codebase into a single "Other" slice
    # so the chart stays readable.
    cutoff = total_bytes * 0.01
    minor_total = sum(size for lang, size in languages.items() if size < cutoff)
    major = {lang: size for lang, size in languages.items() if size >= cutoff}
    if minor_total > 0:
        major["Other"] = minor_total
    wedges, texts, autotexts = ax.pie(
        list(major.values()),
        labels=list(major.keys()),
        autopct='%1.1f%%',
        startangle=90,
        shadow=False,
        textprops={'fontsize': 9},  # Smaller font for better fit
        wedgeprops={'linewidth': 1, 'edgecolor': 'white'}  # Add white edge
    )
    # White bold percentage labels read better on colored wedges.
    for autotext in autotexts:
        autotext.set_color('white')
        autotext.set_fontweight('bold')
    ax.axis('equal')
    plt.title(f"Language Distribution", fontsize=16)
    plt.tight_layout()
    return fig
def _visualize_commit_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
    """Create visualizations of commit activity.

    Builds up to four figures depending on available data: weekly commit
    counts, code frequency (additions/deletions), commits by weekday, and
    commits by hour.

    Args:
        repo_data: Collected repository data (uses "commit_activity").
        insights: Generated insights (uses "commit_insights").

    Returns:
        Dict mapping figure names to matplotlib figures.
    """
    figures = {}
    commit_activity = repo_data.get("commit_activity", {})
    weekly_commits = commit_activity.get("weekly_commits", [])
    if weekly_commits:
        # Extract weeks and commit counts
        weeks = [item["week"] for item in weekly_commits]
        commits = [item["total"] for item in weekly_commits]
        # Create a time series plot
        fig, ax = plt.subplots(figsize=(12, 6))
        ax.plot(weeks, commits, marker='o', linestyle='-', color='blue', alpha=0.7)
        # Add a linear trend line fitted over the week indices
        z = np.polyfit(range(len(weeks)), commits, 1)
        p = np.poly1d(z)
        ax.plot(weeks, p(range(len(weeks))), "r--", alpha=0.7)
        ax.set_title("Weekly Commit Activity", fontsize=16)
        ax.set_xlabel("Week")
        ax.set_ylabel("Number of Commits")
        plt.xticks(rotation=45)
        ax.grid(True, linestyle='--', alpha=0.7)
        # Show only some x-axis labels to avoid crowding
        if len(weeks) > 20:
            every_nth = len(weeks) // 10
            for n, label in enumerate(ax.xaxis.get_ticklabels()):
                if n % every_nth != 0:
                    label.set_visible(False)
        plt.tight_layout()
        figures["weekly_commits"] = fig
    # Visualize code frequency (lines added/removed per week) if available
    code_frequency = commit_activity.get("code_frequency", [])
    if code_frequency:
        weeks = [item["week"] for item in code_frequency]
        additions = [item["additions"] for item in code_frequency]
        deletions = [item["deletions"] for item in code_frequency]
        fig, ax = plt.subplots(figsize=(12, 6))
        ax.plot(weeks, additions, marker='o', linestyle='-', color='green', label='Additions')
        ax.plot(weeks, deletions, marker='o', linestyle='-', color='red', label='Deletions')
        ax.set_title("Code Frequency", fontsize=16)
        ax.set_xlabel("Week")
        ax.set_ylabel("Lines Changed")
        plt.xticks(rotation=45)
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.7)
        # Show only some x-axis labels to avoid crowding
        if len(weeks) > 20:
            every_nth = len(weeks) // 10
            for n, label in enumerate(ax.xaxis.get_ticklabels()):
                if n % every_nth != 0:
                    label.set_visible(False)
        plt.tight_layout()
        figures["code_frequency"] = fig
    # Commits by weekday / hour come from the derived commit insights
    if "commit_insights" in insights:
        commit_insights = insights["commit_insights"]
        by_weekday = commit_insights.get("commit_time_patterns", {}).get("by_weekday", {})
        if by_weekday:
            fig, ax = plt.subplots(figsize=(10, 6))
            weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
            counts = [by_weekday.get(day, 0) for day in weekdays]
            # Gradient colors scaled by commit count.
            # NOTE: local name `colors` shadows the module-level reportlab
            # `colors` import inside this function (harmless here).
            colors = plt.cm.Blues(np.array(counts) / max(counts))
            ax.bar(weekdays, counts, color=colors)
            ax.set_title("Commits by Day of Week", fontsize=16)
            ax.set_xlabel("Day of Week")
            ax.set_ylabel("Number of Commits")
            ax.grid(True, axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            figures["commits_by_weekday"] = fig
        # Commits by hour
        by_hour = commit_insights.get("commit_time_patterns", {}).get("by_hour", {})
        if by_hour:
            fig, ax = plt.subplots(figsize=(12, 6))
            hours = sorted(by_hour.keys())
            counts = [by_hour[hour] for hour in hours]
            # Gradient colors scaled by commit count
            colors = plt.cm.Greens(np.array(counts) / max(counts))
            ax.bar(hours, counts, color=colors)
            ax.set_title("Commits by Hour of Day (UTC)", fontsize=16)
            ax.set_xlabel("Hour")
            ax.set_ylabel("Number of Commits")
            ax.set_xticks(range(0, 24, 2))
            ax.grid(True, axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            figures["commits_by_hour"] = fig
    return figures
def _visualize_contributor_activity(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
    """Create visualizations of contributor activity.

    Builds a top-contributors bar chart and, when contributor insights are
    available, a pie chart of contribution concentration.

    Args:
        repo_data: Collected repository data (uses "contributors").
        insights: Generated insights (uses "contributor_insights").

    Returns:
        Dict mapping figure names to matplotlib figures.
    """
    figures = {}
    contributors = repo_data.get("contributors", [])
    if contributors:
        # Bar chart of the top 10 contributors by commit count
        contributors_sorted = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True)
        top_n = min(10, len(contributors_sorted))
        fig, ax = plt.subplots(figsize=(12, 6))
        names = [c.get("login", "Unknown") for c in contributors_sorted[:top_n]]
        contributions = [c.get("contributions", 0) for c in contributors_sorted[:top_n]]
        # Gradient colors scaled by contribution count.
        # NOTE: local name `colors` shadows the module-level reportlab
        # `colors` import inside this function (harmless here).
        colors = plt.cm.viridis(np.array(contributions) / max(contributions))
        bars = ax.bar(names, contributions, color=colors)
        ax.set_title("Top Contributors by Commit Count", fontsize=16)
        ax.set_xlabel("Contributor")
        ax.set_ylabel("Number of Commits")
        plt.xticks(rotation=45, ha='right')
        ax.grid(True, axis='y', linestyle='--', alpha=0.7)
        # Add value labels on top of bars
        for bar in bars:
            height = bar.get_height()
            ax.annotate(f'{height}',
                        xy=(bar.get_x() + bar.get_width() / 2, height),
                        xytext=(0, 3),  # 3 points vertical offset
                        textcoords="offset points",
                        ha='center', va='bottom')
        plt.tight_layout()
        figures["top_contributors"] = fig
    # Visualize contribution distribution if insights available
    if "contributor_insights" in insights:
        contributor_insights = insights["contributor_insights"]
        distribution = contributor_insights.get("contribution_distribution", {})
        if distribution:
            # Pie chart showing how concentrated contributions are: slice
            # *labels* carry the contributor counts per percentile band...
            fig, ax = plt.subplots(figsize=(10, 6))
            percentiles = [
                distribution.get("contributors_for_20_percent", 0),
                distribution.get("contributors_for_50_percent", 0) - distribution.get("contributors_for_20_percent", 0),
                distribution.get("contributors_for_80_percent", 0) - distribution.get("contributors_for_50_percent", 0),
                len(contributors) - distribution.get("contributors_for_80_percent", 0)
            ]
            labels = [
                f"Top {percentiles[0]} contributors (0-20%)",
                f"Next {percentiles[1]} contributors (20-50%)",
                f"Next {percentiles[2]} contributors (50-80%)",
                f"Remaining {percentiles[3]} contributors (80-100%)"
            ]
            # ...while the slice *sizes* are fixed band widths, so the chart
            # always shows the 20/30/30/20 percentile bands.
            wedges, texts, autotexts = ax.pie(
                [20, 30, 30, 20],  # Fixed percentages for visualization
                labels=labels,
                autopct='%1.1f%%',
                startangle=90,
                shadow=False,
                explode=(0.1, 0, 0, 0),  # Emphasize the top contributors
                wedgeprops={'linewidth': 1, 'edgecolor': 'white'}  # Add white edge
            )
            # Make the percentage labels more readable
            for autotext in autotexts:
                autotext.set_color('white')
                autotext.set_fontweight('bold')
            ax.axis('equal')
            ax.set_title("Contribution Distribution", fontsize=16)
            plt.tight_layout()
            figures["contribution_distribution"] = fig
    return figures
def _visualize_issues_and_prs(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, plt.Figure]:
    """Create visualizations of issues and pull requests.

    Builds up to four figures depending on available insights: issues by
    state, issues created per month, top issue labels, and PR code-change
    statistics.

    Args:
        repo_data: Collected repository data (not read directly here).
        insights: Generated insights (uses "issue_insights",
            "pr_insights" and "pr_code_change_stats").

    Returns:
        Dict mapping figure names to matplotlib figures.
    """
    figures = {}
    # Visualize issue distribution if available
    if "issue_insights" in insights:
        issue_insights = insights["issue_insights"]
        # Issues by state (open = red, anything else = green)
        by_state = issue_insights.get("by_state", {})
        if by_state:
            fig, ax = plt.subplots(figsize=(8, 6))
            states = list(by_state.keys())
            counts = list(by_state.values())
            colors = ['red' if state.lower() == 'open' else 'green' for state in states]
            ax.bar(states, counts, color=colors)
            ax.set_title("Issues by State", fontsize=16)
            ax.set_xlabel("State")
            ax.set_ylabel("Count")
            # Add count labels on top of bars
            for i, v in enumerate(counts):
                ax.text(i, v + 0.5, str(v), ha='center')
            ax.grid(True, axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            figures["issues_by_state"] = fig
        # Issues created per month, with a linear trend line
        by_month = issue_insights.get("by_month", {})
        if by_month:
            fig, ax = plt.subplots(figsize=(12, 6))
            months = sorted(by_month.keys())
            counts = [by_month[month] for month in months]
            ax.plot(months, counts, marker='o', linestyle='-', color='blue')
            # Add trend line fitted over the month indices
            z = np.polyfit(range(len(months)), counts, 1)
            p = np.poly1d(z)
            ax.plot(months, p(range(len(months))), "r--", alpha=0.7)
            ax.set_title("Issues Created by Month", fontsize=16)
            ax.set_xlabel("Month")
            ax.set_ylabel("Number of Issues")
            plt.xticks(rotation=45)
            ax.grid(True, linestyle='--', alpha=0.7)
            # Show only some x-axis labels to avoid crowding
            if len(months) > 12:
                every_nth = max(1, len(months) // 12)
                for n, label in enumerate(ax.xaxis.get_ticklabels()):
                    if n % every_nth != 0:
                        label.set_visible(False)
            plt.tight_layout()
            figures["issues_by_month"] = fig
        # Top issue labels as a horizontal bar chart (skip if only one label)
        by_label = issue_insights.get("by_label", {})
        if by_label and len(by_label) > 1:
            fig, ax = plt.subplots(figsize=(12, 6))
            labels = list(by_label.keys())
            counts = list(by_label.values())
            # Sort by count, descending
            sorted_indices = np.argsort(counts)[::-1]
            labels = [labels[i] for i in sorted_indices]
            counts = [counts[i] for i in sorted_indices]
            # Limit to top 10
            if len(labels) > 10:
                labels = labels[:10]
                counts = counts[:10]
            # Distinct colors per label.
            # NOTE: local name `colors` shadows the module-level reportlab
            # `colors` import inside this function (harmless here).
            colors = plt.cm.tab10(np.linspace(0, 1, len(labels)))
            bars = ax.barh(labels, counts, color=colors)
            ax.set_title("Top Issue Labels", fontsize=16)
            ax.set_xlabel("Count")
            ax.set_ylabel("Label")
            # Add count labels at the end of each bar
            for bar in bars:
                width = bar.get_width()
                ax.annotate(f'{int(width)}',
                            xy=(width, bar.get_y() + bar.get_height() / 2),
                            xytext=(3, 0),  # 3 points horizontal offset
                            textcoords="offset points",
                            ha='left', va='center')
            ax.grid(True, axis='x', linestyle='--', alpha=0.7)
            plt.tight_layout()
            figures["issues_by_label"] = fig
    # Visualize PR insights if available
    if "pr_insights" in insights and "pr_code_change_stats" in insights:
        pr_code_stats = insights["pr_code_change_stats"]
        # Mean/median/max additions and deletions, side by side
        if "additions" in pr_code_stats and "deletions" in pr_code_stats:
            fig, ax = plt.subplots(figsize=(10, 6))
            categories = ["Mean", "Median", "Max"]
            additions = [
                pr_code_stats["additions"].get("mean", 0),
                pr_code_stats["additions"].get("median", 0),
                pr_code_stats["additions"].get("max", 0) / 10  # Scale down for visibility
            ]
            deletions = [
                pr_code_stats["deletions"].get("mean", 0),
                pr_code_stats["deletions"].get("median", 0),
                pr_code_stats["deletions"].get("max", 0) / 10  # Scale down for visibility
            ]
            # Grouped bars: additions left of each tick, deletions right
            x = range(len(categories))
            width = 0.35
            addition_bars = ax.bar([i - width/2 for i in x], additions, width, label='Additions', color='green')
            deletion_bars = ax.bar([i + width/2 for i in x], deletions, width, label='Deletions', color='red')
            ax.set_xlabel('Metric')
            ax.set_ylabel('Lines of Code')
            ax.set_title('PR Code Change Statistics')
            plt.xticks(x, categories)
            ax.legend()
            # Add value labels on top of every bar
            for bars in [addition_bars, deletion_bars]:
                for bar in bars:
                    height = bar.get_height()
                    ax.annotate(f'{int(height)}',
                                xy=(bar.get_x() + bar.get_width() / 2, height),
                                xytext=(0, 3),  # 3 points vertical offset
                                textcoords="offset points",
                                ha='center', va='bottom')
            # The "Max" bars are scaled down 10x, so annotate the true values
            if "max" in pr_code_stats["additions"]:
                plt.annotate(f"Max: {int(pr_code_stats['additions']['max'])}",
                             (2 - width/2, additions[2] + 5),
                             textcoords="offset points",
                             xytext=(0,10),
                             ha='center')
            if "max" in pr_code_stats["deletions"]:
                plt.annotate(f"Max: {int(pr_code_stats['deletions']['max'])}",
                             (2 + width/2, deletions[2] + 5),
                             textcoords="offset points",
                             xytext=(0,10),
                             ha='center')
            plt.tight_layout()
            figures["pr_code_changes"] = fig
    return figures
def _generate_plotly_visualizations(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Dict[str, Any]:
    """Generate interactive Plotly visualizations.

    Builds up to three interactive figures from the collected data: a
    weekday-by-hour commit heatmap, a language treemap, and a cumulative
    issue/PR timeline.

    Args:
        repo_data: Collected repository data (uses "commits", "languages",
            "issues", "pull_requests").
        insights: Generated insights (not read directly here).

    Returns:
        Dict mapping figure names to Plotly figure objects.
    """
    plotly_figures = {}
    # Activity heatmap (commits by day of week and hour of day)
    if "commits" in repo_data:
        commits = repo_data["commits"]
        # Parse commit dates; malformed values are skipped
        dates = []
        for commit in commits:
            date_str = commit.get("date")
            if date_str:
                try:
                    date = datetime.datetime.fromisoformat(date_str.replace('Z', '+00:00'))
                    dates.append(date)
                except ValueError:
                    pass
        if dates:
            # Group by (weekday, hour) pairs
            day_hour_counts = defaultdict(int)
            for date in dates:
                day_hour_counts[(date.weekday(), date.hour)] += 1
            # Fill a 7x24 matrix for the heatmap (rows = weekdays, cols = hours)
            days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
            hours = list(range(24))
            z = np.zeros((7, 24))
            for (day, hour), count in day_hour_counts.items():
                z[day][hour] = count
            # Create heatmap
            fig = go.Figure(data=go.Heatmap(
                z=z,
                x=hours,
                y=days,
                colorscale='Viridis',
                hoverongaps=False,
                hovertemplate='Day: %{y}<br>Hour: %{x}<br>Commits: %{z}<extra></extra>'
            ))
            fig.update_layout(
                title='Commit Activity Heatmap',
                xaxis_title='Hour of Day (UTC)',
                yaxis_title='Day of Week',
                yaxis={'categoryorder': 'array', 'categoryarray': days},
                width=900,
                height=500
            )
            plotly_figures["commit_heatmap"] = fig
    # Language breakdown treemap (one flat level: all languages under root)
    if "languages" in repo_data:
        languages = repo_data["languages"]
        if languages:
            # Create data for treemap
            labels = list(languages.keys())
            values = list(languages.values())
            fig = go.Figure(go.Treemap(
                labels=labels,
                values=values,
                parents=[""] * len(labels),
                marker_colorscale='RdBu',
                hovertemplate='Language: %{label}<br>Bytes: %{value}<br>Percentage: %{percentRoot:.2%}<extra></extra>'
            ))
            fig.update_layout(
                title='Repository Language Breakdown',
                width=800,
                height=600
            )
            plotly_figures["language_treemap"] = fig
    # Cumulative issue/PR timeline
    issues = repo_data.get("issues", [])
    prs = repo_data.get("pull_requests", [])
    if issues or prs:
        # Collect creation events; exclude PRs returned by the issues API
        timeline_data = []
        for issue in issues:
            if not issue.get("pull_request") and issue.get("created_at"):
                try:
                    created_date = datetime.datetime.fromisoformat(issue["created_at"].replace('Z', '+00:00'))
                    timeline_data.append({
                        "date": created_date,
                        "type": "Issue",
                        "id": issue.get("number", ""),
                        "title": issue.get("title", ""),
                        "state": issue.get("state", "")
                    })
                except ValueError:
                    pass
        for pr in prs:
            if pr.get("created_at"):
                try:
                    created_date = datetime.datetime.fromisoformat(pr["created_at"].replace('Z', '+00:00'))
                    timeline_data.append({
                        "date": created_date,
                        "type": "PR",
                        "id": pr.get("number", ""),
                        "title": pr.get("title", ""),
                        "state": pr.get("state", "")
                    })
                except ValueError:
                    pass
        if timeline_data:
            # Sort chronologically so cumulative sums are meaningful
            timeline_data.sort(key=lambda x: x["date"])
            # Create DataFrame for easier plotting
            df = pd.DataFrame(timeline_data)
            # Running totals of each event type over time
            df["cumulative_issues"] = (df["type"] == "Issue").cumsum()
            df["cumulative_prs"] = (df["type"] == "PR").cumsum()
            # Create plot with one line per event type
            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=df["date"],
                y=df["cumulative_issues"],
                mode='lines',
                name='Issues',
                line=dict(color='red', width=2)
            ))
            fig.add_trace(go.Scatter(
                x=df["date"],
                y=df["cumulative_prs"],
                mode='lines',
                name='Pull Requests',
                line=dict(color='blue', width=2)
            ))
            fig.update_layout(
                title='Cumulative Issues and Pull Requests Over Time',
                xaxis_title='Date',
                yaxis_title='Count',
                legend=dict(
                    yanchor="top",
                    y=0.99,
                    xanchor="left",
                    x=0.01
                ),
                width=900,
                height=500
            )
            plotly_figures["issue_pr_timeline"] = fig
    return plotly_figures
def _visualize_collaboration_network(self, repo_data: Dict[str, Any], insights: Dict[str, Any]) -> Optional[plt.Figure]:
    """Draw the contributor collaboration network inferred from PR reviews.

    Nodes are contributors (sized by contribution count); edges connect PR
    authors to their requested reviewers (weighted by pair frequency).
    Returns None when the required data or any collaboration is missing.
    """
    if "pull_requests" not in repo_data or "contributors" not in repo_data:
        return None
    contributors = repo_data["contributors"]
    prs = repo_data["pull_requests"]
    logins = [c.get("login") for c in contributors if c.get("login")]
    graph = nx.Graph()
    graph.add_nodes_from(logins)
    # Tally author<->reviewer pairs; each distinct pair becomes one
    # weighted edge.
    pair_counts = defaultdict(int)
    for pr in prs:
        author = pr.get("user_login")
        if not author or author not in logins:
            continue
        for reviewer in pr.get("requested_reviewers", []):
            if reviewer in logins and reviewer != author:
                pair_counts[tuple(sorted([author, reviewer]))] += 1
    for (a, b), weight in pair_counts.items():
        graph.add_edge(a, b, weight=weight)
    if not graph.edges():
        return None
    fig, ax = plt.subplots(figsize=(12, 10))
    # Node size reflects contribution count; edge width reflects how often
    # the pair collaborated.
    contribution_by_login = {c.get("login"): c.get("contributions", 1) for c in contributors if c.get("login")}
    node_sizes = [contribution_by_login.get(node, 1) * 30 for node in graph.nodes()]
    edge_widths = [graph[u][v]['weight'] * 0.5 for u, v in graph.edges()]
    # Color nodes by how many distinct collaborators they have.
    node_colors = []
    for node in graph.nodes():
        connections = graph.degree(node)
        if connections > 5:
            node_colors.append('red')    # central collaborators
        elif connections > 2:
            node_colors.append('blue')   # active collaborators
        else:
            node_colors.append('green')  # peripheral contributors
    # Fixed seed keeps the force-directed layout reproducible.
    layout = nx.spring_layout(graph, seed=42)
    nx.draw_networkx_nodes(graph, layout, node_size=node_sizes, node_color=node_colors, alpha=0.8)
    nx.draw_networkx_edges(graph, layout, width=edge_widths, alpha=0.5, edge_color='gray')
    nx.draw_networkx_labels(graph, layout, font_size=8, font_family='sans-serif')
    ax.set_title("Collaboration Network", fontsize=16)
    ax.axis('off')
    plt.tight_layout()
    return fig
def analyze_repo(self, owner: str, repo_name: str) -> Dict[str, Any]:
    """Run the complete analysis pipeline for one repository.

    Args:
        owner: GitHub username or organization that owns the repository.
        repo_name: Name of the repository.

    Returns:
        Dict containing all collected repository data and derived insights.
    """
    started = time.time()
    logger.info(f"Starting analysis of {owner}/{repo_name}")
    repo = self.client.get_repo(f"{owner}/{repo_name}")
    # Basic metadata is fetched eagerly; everything else runs through the
    # task list below so each data source can fail independently.
    repo_data: Dict[str, Any] = {"repo_details": self.get_repo_details(repo)}
    # Terms used to probe for security and quality indicators in the code.
    important_terms = [
        "security", "vulnerability", "auth", "password", "token",
        "test", "spec", "fixture", "mock", "stub",
        "TODO", "FIXME", "HACK", "XXX"
    ]
    tasks = [
        ("contributors", lambda: self.get_contributors(repo)),
        ("languages", lambda: self.get_languages(repo)),
        ("issues", lambda: self.get_issues(repo, "all")),
        ("pull_requests", lambda: self.get_pull_requests(repo, "all")),
        ("commits", lambda: self.get_commits(repo)),
        ("readme", lambda: self.get_readme(repo)),
        ("branches", lambda: self.get_branches(repo)),
        ("releases", lambda: self.get_releases(repo)),
        ("workflows", lambda: self.get_workflows(repo)),
        ("file_distribution", lambda: self.get_file_distribution(repo)),
        ("collaborators", lambda: self.get_collaborators(repo)),
        ("commit_activity", lambda: self.analyze_commit_activity(repo)),
        ("contributor_activity", lambda: self.analyze_contributor_activity(repo)),
        ("code_search", lambda: self.search_code(repo, important_terms)),
    ]
    # A failing collector is logged and skipped rather than aborting the
    # whole analysis; the progress bar advances either way.
    with tqdm(total=len(tasks), desc="Collecting repository data") as pbar:
        for key, task_func in tasks:
            try:
                repo_data[key] = task_func()
            except Exception as e:
                logger.error(f"Error collecting {key}: {e}")
            finally:
                pbar.update(1)
    # Derive insights (and optionally charts) from the raw data.
    repo_data["insights"] = self.generate_insights(repo_data)
    if self.config.generate_visualizations:
        repo_data["visualizations"] = self.generate_visualizations(repo_data, repo_data["insights"])
    logger.info(f"Analysis completed in {time.time() - started:.2f} seconds")
    return repo_data
class PDFReportGenerator:
"""
Class for generating comprehensive PDF reports from repository analysis data.
"""
def __init__(self, repo_data: Dict[str, Any], output_path: str = None):
    """Initialize the PDF report generator with repository data.

    Args:
        repo_data: Full analysis payload produced by the repository analyzer.
        output_path: Destination path for the PDF. When omitted, a secure
            temporary file is created and its path is used.
    """
    self.repo_data = repo_data
    if output_path is None:
        # FIX: tempfile.mktemp() is deprecated and race-prone (another
        # process can claim the name before we open it). mkstemp() creates
        # the file atomically; we only need the path, so close the fd.
        fd, output_path = tempfile.mkstemp(suffix='.pdf')
        os.close(fd)
    self.output_path = output_path
    self.styles = getSampleStyleSheet()
    # Register the custom paragraph styles used by the report sections.
    custom_styles = (
        dict(name='SectionTitle', parent=self.styles['Heading2'],
             fontSize=14, spaceAfter=10),
        dict(name='SubsectionTitle', parent=self.styles['Heading3'],
             fontSize=12, spaceAfter=6),
        dict(name='MetricsTable', parent=self.styles['Normal'],
             fontSize=10, alignment=TA_LEFT),
        dict(name='Small', parent=self.styles['Normal'], fontSize=8),
        dict(name='ReportTitle', parent=self.styles['Title'],
             fontSize=24, alignment=TA_CENTER, spaceAfter=20),
    )
    for kwargs in custom_styles:
        self.styles.add(ParagraphStyle(**kwargs))
def generate_report(self) -> str:
    """Generate a PDF report of repository analysis.

    Returns:
        str: Path to the generated PDF file.
    """
    doc = SimpleDocTemplate(
        self.output_path,
        pagesize=letter,
        rightMargin=72, leftMargin=72,
        topMargin=72, bottomMargin=72
    )
    repo_name = self.repo_data.get("repo_details", {}).get("full_name", "Repository")
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Title page header, then one section per analysis area.
    story = [
        Paragraph(f"GitHub Repository Analysis: {repo_name}", self.styles['ReportTitle']),
        Paragraph(f"Report generated on: {stamp}", self.styles['Normal']),
        Spacer(1, 20),
    ]
    story.extend(self._create_repo_overview())
    story.append(PageBreak())
    story.extend(self._create_activity_analysis())
    story.append(PageBreak())
    story.extend(self._create_code_analysis())
    story.append(PageBreak())
    story.extend(self._create_community_analysis())
    # Charts are optional; only emit the pages when figures were produced.
    if self.repo_data.get("visualizations"):
        story.append(PageBreak())
        story.extend(self._create_visualization_pages())
    story.append(PageBreak())
    story.extend(self._create_summary_and_recommendations())
    doc.build(story)
    return self.output_path
def _create_repo_overview(self) -> List[Any]:
    """Create the repository overview section of the report.

    Builds a two-column details table from ``repo_data["repo_details"]``
    followed by headline metrics from ``repo_data["insights"]``.

    Returns:
        List of reportlab flowables for this section.
    """
    elements = []
    # Section title
    elements.append(Paragraph("Repository Overview", self.styles['Heading1']))
    elements.append(Spacer(1, 10))
    # Basic repository information
    repo_details = self.repo_data.get("repo_details", {})
    # Create a label/value table for repository details; every lookup has a
    # fallback so a partially-collected payload still renders.
    data = [
        ["Name", repo_details.get("name", "N/A")],
        ["Full Name", repo_details.get("full_name", "N/A")],
        ["Description", repo_details.get("description", "No description")],
        ["URL", repo_details.get("html_url", "N/A")],
        ["Primary Language", repo_details.get("language", "Not specified")],
        ["Created On", repo_details.get("created_at", "N/A")],
        ["Last Updated", repo_details.get("updated_at", "N/A")],
        ["Stars", str(repo_details.get("stargazers_count", 0))],
        ["Forks", str(repo_details.get("forks_count", 0))],
        ["Watchers", str(repo_details.get("watchers_count", 0))],
        ["Open Issues", str(repo_details.get("open_issues_count", 0))],
        ["License", repo_details.get("license", "Not specified")],
        ["Fork", "Yes" if repo_details.get("fork", False) else "No"],
        ["Archived", "Yes" if repo_details.get("archived", False) else "No"],
        ["Visibility", repo_details.get("visibility", "N/A").capitalize()],
    ]
    table = Table(data, colWidths=[100, 350])
    # Bold, right-aligned label column; plain left-aligned value column.
    table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (0, -1), colors.lightgrey),
        ('TEXTCOLOR', (0, 0), (0, -1), colors.black),
        ('ALIGN', (0, 0), (0, -1), 'RIGHT'),
        ('ALIGN', (1, 0), (1, -1), 'LEFT'),
        ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, -1), 10),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
        ('TOPPADDING', (0, 0), (-1, -1), 6),
        ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
    ]))
    elements.append(table)
    elements.append(Spacer(1, 20))
    # Key metrics and insights
    elements.append(Paragraph("Key Metrics & Insights", self.styles['SectionTitle']))
    insights = self.repo_data.get("insights", {})
    # Repository age (365.25 accounts for leap years) and days since activity.
    age_days = insights.get("repository_age_days", 0)
    age_years = age_days / 365.25
    freshness_days = insights.get("freshness_days", 0)
    age_text = f"Repository Age: {age_years:.1f} years ({int(age_days)} days)"
    freshness_text = f"Last Activity: {int(freshness_days)} days ago"
    elements.append(Paragraph(age_text, self.styles['Normal']))
    elements.append(Paragraph(freshness_text, self.styles['Normal']))
    elements.append(Spacer(1, 10))
    # Activity level (score scale /25 per the upstream insight generator)
    activity_level = insights.get("activity_level", {})
    if activity_level:
        activity_text = f"Activity Level: {activity_level.get('level', 'Unknown')} (Score: {activity_level.get('score', 0):.1f}/25)"
        elements.append(Paragraph(activity_text, self.styles['Normal']))
        elements.append(Spacer(1, 10))
    # Code complexity (score scale /30)
    code_complexity = insights.get("code_complexity", {}).get("overall", {})
    if code_complexity:
        complexity_text = f"Code Complexity: {code_complexity.get('level', 'Unknown')} (Score: {code_complexity.get('score', 0):.1f}/30)"
        elements.append(Paragraph(complexity_text, self.styles['Normal']))
        elements.append(Spacer(1, 10))
    # Documentation quality: 0..1 score bucketed into Low/Medium/High
    doc_quality = insights.get("documentation_quality", {})
    if doc_quality:
        quality_score = doc_quality.get("score", 0)
        quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low"
        doc_text = f"Documentation Quality: {quality_level} (Score: {quality_score:.2f})"
        elements.append(Paragraph(doc_text, self.styles['Normal']))
        elements.append(Spacer(1, 10))
    # Community health (score scale /40)
    community_health = insights.get("community_health", {}).get("overall", {})
    if community_health:
        health_text = f"Community Health: {community_health.get('level', 'Unknown')} (Score: {community_health.get('score', 0):.1f}/40)"
        elements.append(Paragraph(health_text, self.styles['Normal']))
    return elements
def _create_activity_analysis(self) -> List[Any]:
    """Create the activity analysis section of the report.

    Covers commit activity (top contributors, timing patterns), pull
    request activity (states, size statistics) and issue activity
    (states, resolution times, top labels).

    Returns:
        List of reportlab flowables for this section.
    """
    elements = []
    # Section title
    elements.append(Paragraph("Activity Analysis", self.styles['Heading1']))
    elements.append(Spacer(1, 10))
    insights = self.repo_data.get("insights", {})
    # --- Commit activity ---
    elements.append(Paragraph("Commit Activity", self.styles['SectionTitle']))
    commit_insights = insights.get("commit_insights", {})
    if commit_insights:
        # Top contributors table
        top_contributors = commit_insights.get("top_contributors", {})
        if top_contributors:
            elements.append(Paragraph("Top Contributors by Commits:", self.styles['SubsectionTitle']))
            data = [["Contributor", "Commits"]]
            for contributor, commits in top_contributors.items():
                data.append([contributor, str(commits)])
            table = Table(data, colWidths=[200, 100])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, -1), 10),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 4),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 15))
        # Commit time patterns: busiest weekday and busiest UTC hour
        time_patterns = commit_insights.get("commit_time_patterns", {})
        if time_patterns:
            elements.append(Paragraph("Commit Timing Patterns:", self.styles['SubsectionTitle']))
            weekday_data = time_patterns.get("by_weekday", {})
            if weekday_data:
                day_text = "Most active day: " + max(weekday_data.items(), key=lambda x: x[1])[0]
                elements.append(Paragraph(day_text, self.styles['Normal']))
            hour_data = time_patterns.get("by_hour", {})
            # FIX: condition was the redundant `hour_data and hour_data`
            if hour_data:
                hour = max(hour_data.items(), key=lambda x: x[1])[0]
                hour_text = f"Most active hour: {hour}:00 UTC"
                elements.append(Paragraph(hour_text, self.styles['Normal']))
            elements.append(Spacer(1, 10))
    # --- Pull request activity ---
    elements.append(Paragraph("Pull Request Activity", self.styles['SectionTitle']))
    pr_insights = insights.get("pr_insights", {})
    pr_code_changes = insights.get("pr_code_change_stats", {})
    if pr_insights or pr_code_changes:
        # PR state distribution table
        state_counts = pr_insights.get("by_state", {})
        if state_counts:
            elements.append(Paragraph("Pull Request States:", self.styles['SubsectionTitle']))
            data = [["State", "Count"]]
            for state, count in state_counts.items():
                data.append([state.capitalize(), str(count)])
            table = Table(data, colWidths=[100, 100])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 15))
        # PR size statistics: one row per aggregate metric, one column per
        # change dimension; "N/A" when a stat is missing from the payload.
        if pr_code_changes:
            elements.append(Paragraph("Pull Request Size Statistics:", self.styles['SubsectionTitle']))
            data = [["Metric", "Additions", "Deletions", "Files Changed"]]
            metrics = ["mean", "median", "max", "total"]
            for metric in metrics:
                row = [metric.capitalize()]
                for stat_type in ["additions", "deletions", "changed_files"]:
                    if stat_type in pr_code_changes and metric in pr_code_changes[stat_type]:
                        value = pr_code_changes[stat_type][metric]
                        row.append(f"{value:.1f}" if isinstance(value, float) else str(value))
                    else:
                        row.append("N/A")
                data.append(row)
            table = Table(data, colWidths=[80, 80, 80, 80])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (-1, -1), 'RIGHT'),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 15))
    # --- Issue activity ---
    elements.append(Paragraph("Issue Activity", self.styles['SectionTitle']))
    issue_insights = insights.get("issue_insights", {})
    if issue_insights:
        # Issue state distribution table
        state_counts = issue_insights.get("by_state", {})
        if state_counts:
            elements.append(Paragraph("Issue States:", self.styles['SubsectionTitle']))
            data = [["State", "Count"]]
            for state, count in state_counts.items():
                data.append([state.capitalize(), str(count)])
            table = Table(data, colWidths=[100, 100])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
            elements.append(Spacer(1, 15))
        # Resolution times, switching to days when over one day
        resolution_stats = issue_insights.get("resolution_time", {})
        if resolution_stats:
            elements.append(Paragraph("Issue Resolution Time (hours):", self.styles['SubsectionTitle']))
            mean_hours = resolution_stats.get("mean_hours", 0)
            median_hours = resolution_stats.get("median_hours", 0)
            if mean_hours > 24:
                mean_days = mean_hours / 24
                mean_text = f"Mean: {mean_days:.1f} days"
            else:
                mean_text = f"Mean: {mean_hours:.1f} hours"
            if median_hours > 24:
                median_days = median_hours / 24
                median_text = f"Median: {median_days:.1f} days"
            else:
                median_text = f"Median: {median_hours:.1f} hours"
            elements.append(Paragraph(mean_text, self.styles['Normal']))
            elements.append(Paragraph(median_text, self.styles['Normal']))
            elements.append(Spacer(1, 10))
        # Top 5 issue labels (payload assumed pre-sorted by count — confirm upstream)
        top_labels = issue_insights.get("by_label", {})
        if top_labels:
            elements.append(Paragraph("Top Issue Labels:", self.styles['SubsectionTitle']))
            data = [["Label", "Count"]]
            for label, count in list(top_labels.items())[:5]:  # Top 5 labels
                data.append([label, str(count)])
            table = Table(data, colWidths=[150, 50])
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (1, -1), 'RIGHT'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]))
            elements.append(table)
    return elements
def _create_code_analysis(self) -> List[Any]:
    """Create the code analysis section of the report.

    Covers language distribution, file-type distribution, code complexity
    metrics and detected CI/CD systems.

    Returns:
        List of reportlab flowables for this section.
    """
    elements = []
    # Section title
    elements.append(Paragraph("Code Analysis", self.styles['Heading1']))
    elements.append(Spacer(1, 10))
    # --- Language distribution ---
    elements.append(Paragraph("Language Distribution", self.styles['SectionTitle']))
    languages = self.repo_data.get("languages", {})
    insights = self.repo_data.get("insights", {})
    if languages:
        # Sort languages by byte count, largest first
        sorted_languages = sorted(languages.items(), key=lambda x: x[1], reverse=True)
        data = [["Language", "Bytes", "Percentage"]]
        # `or 1` guards against a pathological all-zero byte payload
        total_bytes = sum(languages.values()) or 1
        for language, bytes_count in sorted_languages[:10]:  # Top 10 languages
            percentage = (bytes_count / total_bytes) * 100
            data.append([
                language,
                f"{bytes_count:,}",
                f"{percentage:.1f}%"
            ])
        table = Table(data, colWidths=[120, 120, 80])
        table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
            ('ALIGN', (0, 0), (0, -1), 'LEFT'),
            ('ALIGN', (1, 0), (2, -1), 'RIGHT'),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
        ]))
        elements.append(table)
        elements.append(Spacer(1, 15))
    # --- File type distribution ---
    elements.append(Paragraph("File Type Distribution", self.styles['SectionTitle']))
    file_dist = self.repo_data.get("file_distribution", {})
    if file_dist:
        # BUG FIX: PDFReportGenerator.__init__ never sets self.config, so the
        # original `self.config.*` access raised AttributeError whenever the
        # file distribution was non-empty. Fall back to the analyzer's default
        # configuration (GitHubAPIConfig declares the extension groups) when
        # no config was attached to this instance.
        cfg = getattr(self, "config", None) or GitHubAPIConfig()
        file_types = {
            "Code": sum(file_dist.get(ext, 0) for ext in cfg.code_extensions),
            "Markup": sum(file_dist.get(ext, 0) for ext in cfg.markup_extensions),
            "Scripts": sum(file_dist.get(ext, 0) for ext in cfg.script_extensions),
            "Data": sum(file_dist.get(ext, 0) for ext in cfg.data_extensions),
            "Config": sum(file_dist.get(ext, 0) for ext in cfg.config_extensions),
            "Notebooks": sum(file_dist.get(ext, 0) for ext in cfg.notebook_extensions),
            "Other": sum(file_dist.get(ext, 0) for ext in cfg.other_extensions)
        }
        data = [["File Type", "Count", "Percentage"]]
        total_files = sum(file_types.values())
        for file_type, count in sorted(file_types.items(), key=lambda x: x[1], reverse=True):
            if count > 0:  # count > 0 also implies total_files > 0
                percentage = (count / total_files) * 100
                data.append([
                    file_type,
                    str(count),
                    f"{percentage:.1f}%"
                ])
        table = Table(data, colWidths=[120, 80, 80])
        table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
            ('ALIGN', (0, 0), (0, -1), 'LEFT'),
            ('ALIGN', (1, 0), (2, -1), 'RIGHT'),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
        ]))
        elements.append(table)
        elements.append(Spacer(1, 15))
    # --- Code complexity ---
    elements.append(Paragraph("Code Complexity Analysis", self.styles['SectionTitle']))
    code_complexity = insights.get("code_complexity", {})
    if code_complexity:
        complexity_overall = code_complexity.get("overall", {})
        elements.append(Paragraph(
            f"Overall Complexity: {complexity_overall.get('level', 'Unknown')} (Score: {complexity_overall.get('score', 0):.1f}/30)",
            self.styles['Normal']
        ))
        elements.append(Spacer(1, 10))
        # Total code size in megabytes
        code_size = code_complexity.get("code_size", {})
        if code_size:
            size_mb = code_size.get("size_mb", 0)
            elements.append(Paragraph(f"Code Size: {size_mb:.2f} MB", self.styles['Normal']))
            elements.append(Spacer(1, 5))
        # Average pull-request churn
        pr_complexity = code_complexity.get("pr_complexity", {})
        if pr_complexity:
            elements.append(Paragraph("Average Pull Request Size:", self.styles['SubsectionTitle']))
            avg_additions = pr_complexity.get("avg_additions", 0)
            avg_deletions = pr_complexity.get("avg_deletions", 0)
            avg_files = pr_complexity.get("avg_changed_files", 0)
            elements.append(Paragraph(f"Lines Added: {avg_additions:.1f}", self.styles['Normal']))
            elements.append(Paragraph(f"Lines Deleted: {avg_deletions:.1f}", self.styles['Normal']))
            elements.append(Paragraph(f"Files Changed: {avg_files:.1f}", self.styles['Normal']))
            elements.append(Spacer(1, 10))
    # --- CI/CD presence ---
    elements.append(Paragraph("CI/CD Systems", self.styles['SectionTitle']))
    ci_cd = insights.get("ci_cd_presence", {})
    if ci_cd:
        has_ci_cd = ci_cd.get("has_ci_cd", False)
        systems = ci_cd.get("ci_cd_systems", {})
        if has_ci_cd:
            elements.append(Paragraph("Detected CI/CD Systems:", self.styles['Normal']))
            detected_systems = [name for name, present in systems.items() if present]
            for system in detected_systems:
                elements.append(Paragraph(f"• {system.replace('_', ' ').title()}", self.styles['Normal']))
        else:
            elements.append(Paragraph("No CI/CD systems detected", self.styles['Normal']))
    return elements
def _create_community_analysis(self) -> List[Any]:
    """Create the community analysis section of the report.

    Covers contributor statistics, community health metrics, presence of
    community guideline files, and README/documentation quality.

    Returns:
        List of reportlab flowables for this section.
    """
    elements = []
    # Section title
    elements.append(Paragraph("Community Analysis", self.styles['Heading1']))
    elements.append(Spacer(1, 10))
    insights = self.repo_data.get("insights", {})
    # --- Contributor analysis ---
    elements.append(Paragraph("Contributor Analysis", self.styles['SectionTitle']))
    contributor_insights = insights.get("contributor_insights", {})
    if contributor_insights:
        contributor_count = contributor_insights.get("contributor_count", 0)
        total_contributions = contributor_insights.get("total_contributions", 0)
        avg_contributions = contributor_insights.get("avg_contributions_per_contributor", 0)
        elements.append(Paragraph(f"Total Contributors: {contributor_count}", self.styles['Normal']))
        elements.append(Paragraph(f"Total Contributions: {total_contributions}", self.styles['Normal']))
        elements.append(Paragraph(f"Average Contributions per Contributor: {avg_contributions:.1f}", self.styles['Normal']))
        elements.append(Spacer(1, 10))
        # How concentrated the work is (Gini: 0 = equal, 1 = one person)
        distribution = contributor_insights.get("contribution_distribution", {})
        if distribution:
            elements.append(Paragraph("Contribution Distribution:", self.styles['SubsectionTitle']))
            gini = distribution.get("gini_coefficient", 0)
            top_percent = distribution.get("top_contributor_percentage", 0)
            contributors_20 = distribution.get("contributors_for_20_percent", 0)
            contributors_50 = distribution.get("contributors_for_50_percent", 0)
            contributors_80 = distribution.get("contributors_for_80_percent", 0)
            elements.append(Paragraph(f"Top Contributor: {top_percent:.1f}% of all contributions", self.styles['Normal']))
            elements.append(Paragraph(f"Contributors for first 20% work: {contributors_20}", self.styles['Normal']))
            elements.append(Paragraph(f"Contributors for first 50% work: {contributors_50}", self.styles['Normal']))
            elements.append(Paragraph(f"Contributors for first 80% work: {contributors_80}", self.styles['Normal']))
            elements.append(Paragraph(f"Gini Coefficient: {gini:.2f} ({'High' if gini > 0.6 else 'Medium' if gini > 0.4 else 'Low'} inequality)", self.styles['Normal']))
            elements.append(Spacer(1, 15))
    # --- Community health ---
    elements.append(Paragraph("Community Health", self.styles['SectionTitle']))
    community_health = insights.get("community_health", {})
    if community_health:
        health_overall = community_health.get("overall", {})
        elements.append(Paragraph(
            f"Overall Health: {health_overall.get('level', 'Unknown')} (Score: {health_overall.get('score', 0):.1f}/40)",
            self.styles['Normal']
        ))
        elements.append(Spacer(1, 10))
        # Responsiveness metrics; times switch to days when over 72 hours
        if "issue_closure_rate" in community_health:
            closure_rate = community_health.get("issue_closure_rate", 0)
            elements.append(Paragraph(f"Issue Closure Rate: {closure_rate:.1%}", self.styles['Normal']))
        if "avg_issue_resolution_time_hours" in community_health:
            resolution_hours = community_health.get("avg_issue_resolution_time_hours", 0)
            if resolution_hours > 72:
                resolution_days = resolution_hours / 24
                elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_days:.1f} days", self.styles['Normal']))
            else:
                elements.append(Paragraph(f"Avg. Issue Resolution Time: {resolution_hours:.1f} hours", self.styles['Normal']))
        if "pr_merge_rate" in community_health:
            merge_rate = community_health.get("pr_merge_rate", 0)
            elements.append(Paragraph(f"PR Merge Rate: {merge_rate:.1%}", self.styles['Normal']))
        if "avg_pr_merge_time_hours" in community_health:
            merge_hours = community_health.get("avg_pr_merge_time_hours", 0)
            if merge_hours > 72:
                merge_days = merge_hours / 24
                elements.append(Paragraph(f"Avg. PR Merge Time: {merge_days:.1f} days", self.styles['Normal']))
            else:
                elements.append(Paragraph(f"Avg. PR Merge Time: {merge_hours:.1f} hours", self.styles['Normal']))
        elements.append(Spacer(1, 10))
        # Community guideline files checklist
        community_files = community_health.get("community_guidelines", {})
        if community_files:
            elements.append(Paragraph("Community Guidelines:", self.styles['SubsectionTitle']))
            files = [
                ("CONTRIBUTING.md", "Contributing Guidelines"),
                ("CODE_OF_CONDUCT.md", "Code of Conduct"),
                ("SECURITY.md", "Security Policy"),
                ("SUPPORT.md", "Support Information"),
                ("GOVERNANCE.md", "Governance Model")
            ]
            data = [["Guideline", "Present"]]
            for file_name, display_name in files:
                present = community_files.get(file_name, False)
                data.append([display_name, "✓" if present else "✗"])
            table = Table(data, colWidths=[150, 50])
            # BUG FIX: TableStyle values must be concrete colors; the original
            # passed a lambda for TEXTCOLOR, which fails when reportlab renders
            # the table. Append one TEXTCOLOR command per data row instead.
            style_cmds = [
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                ('ALIGN', (1, 0), (1, -1), 'CENTER'),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ]
            for row in range(1, len(data)):
                mark_color = colors.green if data[row][1] == "✓" else colors.red
                style_cmds.append(('TEXTCOLOR', (1, row), (1, row), mark_color))
            table.setStyle(TableStyle(style_cmds))
            elements.append(table)
            elements.append(Spacer(1, 15))
    # --- Documentation quality ---
    elements.append(Paragraph("Documentation Analysis", self.styles['SectionTitle']))
    doc_quality = insights.get("documentation_quality", {})
    if doc_quality:
        has_readme = doc_quality.get("has_readme", False)
        if has_readme:
            quality_score = doc_quality.get("score", 0)
            quality_level = "High" if quality_score > 0.7 else "Medium" if quality_score > 0.4 else "Low"
            word_count = doc_quality.get("readme_length", 0)
            elements.append(Paragraph(f"README Quality: {quality_level} (Score: {quality_score:.2f})", self.styles['Normal']))
            elements.append(Paragraph(f"README Length: {word_count} words", self.styles['Normal']))
            elements.append(Spacer(1, 10))
            # README section checklist
            sections = doc_quality.get("sections", {})
            if sections:
                elements.append(Paragraph("README Sections Present:", self.styles['SubsectionTitle']))
                section_labels = {
                    "introduction": "Introduction/Overview",
                    "installation": "Installation Instructions",
                    "usage": "Usage Examples",
                    "api": "API Documentation",
                    "contributing": "Contributing Guidelines",
                    "license": "License Information",
                    "code_of_conduct": "Code of Conduct"
                }
                data = [["Section", "Present"]]
                for section_key, section_label in section_labels.items():
                    present = sections.get(section_key, False)
                    data.append([section_label, "✓" if present else "✗"])
                table = Table(data, colWidths=[150, 50])
                # Same per-row TEXTCOLOR fix as the guidelines table above.
                style_cmds = [
                    ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                    ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                    ('ALIGN', (0, 0), (0, -1), 'LEFT'),
                    ('ALIGN', (1, 0), (1, -1), 'CENTER'),
                    ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                    ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                    ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                ]
                for row in range(1, len(data)):
                    mark_color = colors.green if data[row][1] == "✓" else colors.red
                    style_cmds.append(('TEXTCOLOR', (1, row), (1, row), mark_color))
                table.setStyle(TableStyle(style_cmds))
                elements.append(table)
                elements.append(Spacer(1, 10))
            # Extra README features (images / code examples)
            has_images = doc_quality.get("has_images", False)
            has_code = doc_quality.get("has_code_examples", False)
            metrics_text = "Additional Features: "
            if has_images:
                img_count = doc_quality.get("image_count", 0)
                metrics_text += f"{img_count} images/diagrams, "
            if has_code:
                code_blocks = doc_quality.get("code_block_count", 0)
                metrics_text += f"{code_blocks} code examples"
            if has_images or has_code:
                elements.append(Paragraph(metrics_text, self.styles['Normal']))
        else:
            elements.append(Paragraph("No README file found.", self.styles['Normal']))
    return elements
def _create_visualization_pages(self) -> List[Any]:
    """Build the report pages that embed the generated charts.

    Figures are grouped by topic; each non-empty group gets a subsection
    and ends with a page break. Both Plotly (``go.Figure``) and
    Matplotlib figures are rendered to PNG in memory.

    Returns:
        List of reportlab flowables for the visualization pages.
    """
    elements = [
        Paragraph("Visualizations", self.styles['Heading1']),
        Spacer(1, 10),
    ]
    visualizations = self.repo_data.get("visualizations", {})
    # Topic -> ordered list of figure keys to look up in the payload.
    categories = {
        "Language Analysis": ["language_distribution", "language_treemap"],
        "Commit Activity": ["weekly_commits", "code_frequency", "commits_by_weekday", "commits_by_hour", "commit_heatmap"],
        "Contributor Analysis": ["top_contributors", "contribution_distribution", "collaboration_network"],
        "Issue & PR Analysis": ["issues_by_state", "issues_by_month", "issues_by_label", "pr_code_changes", "issue_pr_timeline"]
    }
    for category, viz_keys in categories.items():
        present_keys = [key for key in viz_keys if key in visualizations]
        if not present_keys:
            continue
        elements.append(Paragraph(category, self.styles['SectionTitle']))
        elements.append(Spacer(1, 10))
        for viz_key in present_keys:
            fig = visualizations.get(viz_key)
            if not fig:
                continue
            # Render the figure to an in-memory PNG buffer.
            buf = BytesIO()
            if isinstance(fig, go.Figure):
                # Plotly export path
                fig.write_image(buf, format="png", width=800, height=500)
            else:
                # Matplotlib export path
                fig.savefig(buf, format="png", dpi=150)
            buf.seek(0)
            # Caption derived from the figure key, then the image itself.
            elements.append(Paragraph(viz_key.replace("_", " ").title(), self.styles['SubsectionTitle']))
            elements.append(Image(buf, width=6*inch, height=4*inch))
            elements.append(Spacer(1, 20))
        # New page after each populated category.
        elements.append(PageBreak())
    return elements
def _create_summary_and_recommendations(self) -> List[Any]:
    """Create the summary and recommendations section.

    Assembles a prose summary from the collected insights, then emits a
    numbered list of improvement recommendations driven by threshold
    checks on documentation, community, issue, complexity, CI/CD and
    activity metrics.

    Returns:
        List of reportlab flowables for this section.
    """
    elements = []
    # Section title
    elements.append(Paragraph("Summary & Recommendations", self.styles['Heading1']))
    elements.append(Spacer(1, 10))
    # Repository summary
    elements.append(Paragraph("Project Summary", self.styles['SectionTitle']))
    insights = self.repo_data.get("insights", {})
    repo_details = self.repo_data.get("repo_details", {})
    # Short description of the project, stitched into a single sentence.
    repo_name = repo_details.get("name", "The repository")
    repo_desc = repo_details.get("description", "")
    primary_lang = repo_details.get("language", "various languages")
    summary_text = f"{repo_name} is a {primary_lang} project"
    if repo_desc:
        # Lowercase the description's leading capital so it reads naturally
        # mid-sentence; descriptions already lowercase are used verbatim.
        summary_text += f" that {repo_desc.lower() if repo_desc[0].isupper() else repo_desc}"
    summary_text += "."
    elements.append(Paragraph(summary_text, self.styles['Normal']))
    elements.append(Spacer(1, 10))
    # Key metrics summary (stars/forks plus contributor concentration)
    community_health = insights.get("community_health", {}).get("overall", {})
    activity_level = insights.get("activity_level", {})
    code_complexity = insights.get("code_complexity", {}).get("overall", {})
    metrics_text = f"The project has {repo_details.get('stargazers_count', 0)} stars and {repo_details.get('forks_count', 0)} forks."
    if "contributor_insights" in insights:
        contributor_count = insights["contributor_insights"].get("contributor_count", 0)
        metrics_text += f" It has {contributor_count} contributors"
        # Gini coefficient buckets: >0.7 centralized, >0.4 moderate, else even
        gini = insights["contributor_insights"].get("contribution_distribution", {}).get("gini_coefficient", 0)
        if gini > 0.7:
            metrics_text += " with a highly centralized contribution pattern"
        elif gini > 0.4:
            metrics_text += " with a moderately distributed contribution pattern"
        else:
            metrics_text += " with a well-distributed contribution pattern"
        metrics_text += "."
    elements.append(Paragraph(metrics_text, self.styles['Normal']))
    elements.append(Spacer(1, 10))
    # Activity summary
    if activity_level:
        activity_text = f"The project shows {activity_level.get('level', 'Unknown').lower()} activity levels"
        # Add activity context based on the qualitative level label
        if activity_level.get('level') in ["High", "Very High"]:
            activity_text += " with regular commits and issue management."
        elif activity_level.get('level') in ["Medium"]:
            activity_text += " with moderate development progress."
        else:
            activity_text += " with limited recent development."
        elements.append(Paragraph(activity_text, self.styles['Normal']))
        elements.append(Spacer(1, 10))
    # Code quality summary
    if code_complexity:
        complexity_text = f"The codebase has {code_complexity.get('level', 'Unknown').lower()} complexity"
        if code_complexity.get('level') in ["High", "Very High"]:
            complexity_text += ", which may present challenges for new contributors and maintenance."
        elif code_complexity.get('level') in ["Medium", "Medium-High"]:
            complexity_text += " with a reasonable balance between functionality and maintainability."
        else:
            complexity_text += " and should be relatively straightforward to understand and maintain."
        elements.append(Paragraph(complexity_text, self.styles['Normal']))
        elements.append(Spacer(1, 10))
    # Community health summary
    if community_health:
        health_text = f"The project demonstrates {community_health.get('level', 'Unknown').lower()} community health"
        if community_health.get('level') in ["Excellent", "Very Good", "Good"]:
            health_text += " with responsive maintainers and clear contribution guidelines."
        elif community_health.get('level') in ["Fair"]:
            health_text += " with some community structures in place."
        else:
            health_text += " with opportunities for improved community engagement."
        elements.append(Paragraph(health_text, self.styles['Normal']))
        elements.append(Spacer(1, 15))
    # Recommendations, accumulated then numbered at the end
    elements.append(Paragraph("Recommendations", self.styles['SectionTitle']))
    recommendations = []
    # Documentation recommendations (score thresholds: <0.4 poor, <0.7 fair)
    doc_quality = insights.get("documentation_quality", {})
    if doc_quality:
        score = doc_quality.get("score", 0)
        if score < 0.4:
            recommendations.append("Improve documentation by adding more comprehensive README content, including usage examples and API documentation.")
        elif score < 0.7:
            recommendations.append("Enhance existing documentation with more examples and clearer installation instructions.")
        sections = doc_quality.get("sections", {})
        missing_key_sections = []
        if not sections.get("installation", False):
            missing_key_sections.append("installation instructions")
        if not sections.get("usage", False):
            missing_key_sections.append("usage examples")
        if missing_key_sections:
            recommendations.append(f"Add missing documentation sections: {', '.join(missing_key_sections)}.")
    # Community recommendations (missing guideline files)
    community_files = insights.get("community_health", {}).get("community_guidelines", {})
    if community_files:
        missing_guidelines = []
        if not community_files.get("CONTRIBUTING.md", False):
            missing_guidelines.append("contribution guidelines")
        if not community_files.get("CODE_OF_CONDUCT.md", False):
            missing_guidelines.append("code of conduct")
        if missing_guidelines:
            recommendations.append(f"Create missing community files: {', '.join(missing_guidelines)}.")
    # Issue management recommendations
    issue_insights = insights.get("issue_insights", {})
    if issue_insights:
        resolution_time = issue_insights.get("resolution_time", {}).get("mean_hours", 0)
        if resolution_time > 168:  # 1 week
            recommendations.append("Improve issue response time to enhance user experience and community engagement.")
    # Code complexity recommendations
    if code_complexity and code_complexity.get('level') in ["High", "Very High"]:
        recommendations.append("Consider refactoring complex parts of the codebase to improve maintainability.")
    # CI/CD recommendations
    ci_cd = insights.get("ci_cd_presence", {})
    if not ci_cd.get("has_ci_cd", False):
        recommendations.append("Implement CI/CD pipelines (e.g., GitHub Actions) to automate testing and deployment.")
    # Activity recommendations
    if activity_level and activity_level.get('level') in ["Low", "Very Low", "None"]:
        recommendations.append("Revitalize project with regular updates and community engagement to attract more contributors.")
    # Add recommendations to the report as a numbered list
    if recommendations:
        for i, recommendation in enumerate(recommendations, 1):
            elements.append(Paragraph(f"{i}. {recommendation}", self.styles['Normal']))
            elements.append(Spacer(1, 5))
    else:
        elements.append(Paragraph("This project follows good development practices and no significant improvements are needed at this time.", self.styles['Normal']))
    return elements
class RAGHelper:
    """
    Retrieval Augmented Generation (RAG) helper.

    Pre-digests a repository analysis payload so chatbot answers can be
    grounded in concrete repository facts retrieved per query.
    """

    def __init__(self, repo_data: Dict[str, Any]):
        """Store the analysis payload and pre-compute lookup structures."""
        self.repo_data = repo_data
        self.insights = repo_data.get("insights", {})
        # Flatten the nested payload up front so query-time retrieval is a
        # cheap dictionary lookup.
        self._extract_key_info()

    def _extract_key_info(self):
        """Flatten the raw analysis payload into the ``self.repo_info`` dict."""
        self.repo_info = {}

        # Basic repository details: copy selected fields under friendlier keys.
        if "repo_details" in self.repo_data:
            details = self.repo_data["repo_details"]
            for target, source, default in (
                ("name", "name", ""),
                ("full_name", "full_name", ""),
                ("description", "description", ""),
                ("url", "html_url", ""),
                ("stars", "stargazers_count", 0),
                ("forks", "forks_count", 0),
                ("language", "language", ""),
                ("created_at", "created_at", ""),
                ("license", "license", ""),
            ):
                self.repo_info[target] = details.get(source, default)

        # Language byte counts -> percentage breakdown plus a top-5 ranking.
        if "languages" in self.repo_data:
            languages = self.repo_data["languages"]
            total_bytes = sum(languages.values()) if languages else 0
            if total_bytes > 0:
                percentages = {
                    name: (size / total_bytes) * 100
                    for name, size in languages.items()
                }
                self.repo_info["language_breakdown"] = percentages
                ranked = sorted(percentages.items(), key=lambda item: item[1], reverse=True)
                self.repo_info["top_languages"] = ranked[:5]

        # Contributor count and the five most active contributors.
        if "contributors" in self.repo_data:
            contributors = self.repo_data["contributors"]
            self.repo_info["total_contributors"] = len(contributors)
            if contributors:
                ranked = sorted(
                    contributors,
                    key=lambda person: person.get("contributions", 0),
                    reverse=True,
                )
                self.repo_info["top_contributors"] = [
                    {
                        "name": person.get("login", "Unknown"),
                        "contributions": person.get("contributions", 0),
                    }
                    for person in ranked[:5]
                ]

        # Commit activity patterns and busiest committers.
        if "commit_insights" in self.insights:
            commit_insights = self.insights["commit_insights"]
            self.repo_info["commit_patterns"] = commit_insights.get("commit_time_patterns", {})
            self.repo_info["top_committers"] = commit_insights.get("top_contributors", {})

        # Documentation score plus a coarse High/Medium/Low label.
        if "documentation_quality" in self.insights:
            doc_quality = self.insights["documentation_quality"]
            score = doc_quality.get("score", 0)
            self.repo_info["documentation_score"] = score
            if score > 0.7:
                label = "High"
            elif score > 0.4:
                label = "Medium"
            else:
                label = "Low"
            self.repo_info["documentation_quality"] = label
            self.repo_info["readme_sections"] = doc_quality.get("sections", {})

        # Community health level and presence of guideline files.
        if "community_health" in self.insights:
            community_health = self.insights["community_health"]
            self.repo_info["community_health_level"] = community_health.get("overall", {}).get("level", "Unknown")
            self.repo_info["community_guidelines"] = community_health.get("community_guidelines", {})

        # Overall activity label.
        if "activity_level" in self.insights:
            self.repo_info["activity_level"] = self.insights["activity_level"].get("level", "Unknown")

        # Overall code-complexity label.
        if "code_complexity" in self.insights:
            self.repo_info["code_complexity_level"] = (
                self.insights["code_complexity"].get("overall", {}).get("level", "Unknown")
            )

    def get_context_for_query(self, query: str) -> str:
        """
        Retrieve relevant context from repository data based on the query.

        Args:
            query: The user's question, in natural language.

        Returns:
            str: Contextual information to enhance the response.
        """
        query_lower = query.lower()

        # Keyword triggers per repository aspect; a query can hit several.
        keywords = {
            "overview": ["overview", "about", "what is", "tell me about", "summary"],
            "languages": ["language", "programming language", "code language", "tech stack"],
            "contributors": ["contributor", "who", "team", "maintainer", "author"],
            "activity": ["activity", "active", "commit", "update", "recent", "frequency"],
            "documentation": ["documentation", "docs", "readme", "well documented"],
            "community": ["community", "health", "governance", "conduct", "guideline"],
            "complexity": ["complex", "complexity", "difficult", "simple", "codebase", "understand"],
            "issues": ["issue", "bug", "problem", "ticket", "feature request"],
            "pulls": ["pull request", "pr", "merge", "contribution"],
        }

        # Substring-match the query against each aspect's trigger terms.
        relevant_aspects = [
            aspect
            for aspect, terms in keywords.items()
            if any(term in query_lower for term in terms)
        ]
        # Fall back to a general overview when nothing matched.
        if not relevant_aspects:
            relevant_aspects = ["overview"]

        context_parts = []

        # Repository overview.
        if "overview" in relevant_aspects:
            full_name = self.repo_info.get("full_name", "The repository")
            star_count = self.repo_info.get("stars", 0)
            fork_count = self.repo_info.get("forks", 0)
            description = self.repo_info.get("description", "")
            overview = f"{full_name} is a GitHub repository with {star_count} stars and {fork_count} forks. "
            if description:
                overview += f"Description: {description}. "
            primary_language = self.repo_info.get("language", "")
            if primary_language:
                overview += f"It's primarily written in {primary_language}. "
            created_at = self.repo_info.get("created_at", "")
            if created_at:
                try:
                    created = datetime.datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    overview += f"The repository was created on {created.strftime('%B %d, %Y')}. "
                except (ValueError, AttributeError):
                    # Timestamp missing or malformed: just omit the date.
                    pass
            context_parts.append(overview)

        # Language breakdown.
        if "languages" in relevant_aspects:
            ranked = self.repo_info.get("top_languages", [])
            if ranked:
                shares = ", ".join(f"{name}: {share:.1f}%" for name, share in ranked)
                context_parts.append("Language breakdown: " + shares + ".")

        # Contributors.
        if "contributors" in relevant_aspects:
            total = self.repo_info.get("total_contributors", 0)
            leaders = self.repo_info.get("top_contributors", [])
            text = f"The repository has {total} contributors. "
            if leaders:
                names = ", ".join(
                    f"{person['name']} ({person['contributions']} commits)"
                    for person in leaders
                )
                text += "Top contributors: " + names + "."
            context_parts.append(text)

        # Activity metrics.
        if "activity" in relevant_aspects:
            level = self.repo_info.get("activity_level", "Unknown")
            text = f"Activity level: {level}. "
            by_weekday = self.repo_info.get("commit_patterns", {}).get("by_weekday", {})
            if by_weekday:
                busiest_day = max(by_weekday, key=by_weekday.get)
                text += f"Most active day of the week: {busiest_day}. "
            context_parts.append(text)

        # Documentation quality.
        if "documentation" in relevant_aspects:
            quality = self.repo_info.get("documentation_quality", "Unknown")
            score = self.repo_info.get("documentation_score", 0)
            text = f"Documentation quality: {quality} (score: {score:.2f}/1.0). "
            sections = self.repo_info.get("readme_sections", {})
            if sections:
                present = [name for name, found in sections.items() if found]
                missing = [name for name, found in sections.items() if not found]
                if present:
                    text += f"README includes sections on: {', '.join(present)}. "
                if missing:
                    text += f"README is missing sections on: {', '.join(missing)}."
            context_parts.append(text)

        # Community health.
        if "community" in relevant_aspects:
            level = self.repo_info.get("community_health_level", "Unknown")
            guidelines = self.repo_info.get("community_guidelines", {})
            text = f"Community health: {level}. "
            if guidelines:
                present = [name for name, found in guidelines.items() if found]
                missing = [name for name, found in guidelines.items() if not found]
                if present:
                    text += f"Has community files: {', '.join(present)}. "
                if missing:
                    text += f"Missing community files: {', '.join(missing)}."
            context_parts.append(text)

        # Code complexity.
        if "complexity" in relevant_aspects:
            level = self.repo_info.get("code_complexity_level", "Unknown")
            context_parts.append(f"Code complexity: {level}.")

        # Issues.
        if "issues" in relevant_aspects and "issue_insights" in self.insights:
            issue_insights = self.insights["issue_insights"]
            by_state = issue_insights.get("by_state", {})
            text = "Issues: "
            if by_state:
                text += ", ".join(f"{count} {state}" for state, count in by_state.items())
                text += ". "
            resolution = issue_insights.get("resolution_time", {})
            if resolution:
                mean_hours = resolution.get("mean_hours", 0)
                if mean_hours > 24:
                    text += f"Average resolution time: {mean_hours / 24:.1f} days."
                else:
                    text += f"Average resolution time: {mean_hours:.1f} hours."
            context_parts.append(text)

        # Pull requests.
        if "pulls" in relevant_aspects and "pr_insights" in self.insights:
            by_state = self.insights["pr_insights"].get("by_state", {})
            text = "Pull Requests: "
            if by_state:
                text += ", ".join(f"{count} {state}" for state, count in by_state.items())
                text += ". "
            context_parts.append(text)

        return " ".join(context_parts)
def create_gradio_interface():
    """
    Create the Gradio interface for GitHub repository analysis.

    Returns:
        gr.Blocks: The assembled (but not yet launched) Gradio app.
    """
    # Styling
    css = """
    .gradio-container {max-width: 100% !important}
    .main-analysis-area {min-height: 600px}
    .analysis-result {overflow-y: auto; max-height: 500px}
    .chat-interface {border: 1px solid #ccc; border-radius: 5px; padding: 10px}
    .pdf-download {margin-top: 20px}
    """

    # State shared by the nested handlers below. Both are rebound with
    # `nonlocal` inside analyze_repository so every handler sees the
    # latest analysis. (Using `global` here was a bug: it bound a module-level
    # name while generate_pdf_report/chat_with_repo kept closing over this
    # enclosing — and therefore forever-empty — local.)
    repo_data = {}
    analyzer = None

    def parse_repo_url(url: str) -> Tuple[Optional[str], Optional[str]]:
        """Parse a GitHub repository URL into (owner, repo).

        Returns (None, None) when the URL does not look like a GitHub
        repository URL.
        """
        match = re.search(r"github\.com/([^/\s]+)/([^/\s]+)", url)
        if not match:
            return None, None
        owner, repo_name = match.group(1), match.group(2)
        # Normalize clone-style URLs: previously the generic pattern matched
        # first and left the ".git" suffix inside the repository name.
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-4]
        return owner, repo_name

    def analyze_repository(repo_url: str, is_private: bool, github_token: str = None, progress=gr.Progress()) -> str:
        """Analyze a GitHub repository and return an HTML summary.

        Args:
            repo_url: Repository URL, e.g. https://github.com/owner/repo.
            is_private: Whether the repository requires authentication.
            github_token: Personal access token; required for private repos.
            progress: Gradio progress tracker (injected by Gradio).

        Returns:
            str: HTML summary of the analysis, or a plain error message.
        """
        nonlocal analyzer, repo_data

        # Validate URL and extract owner/repo.
        owner, repo = parse_repo_url(repo_url)
        if not owner or not repo:
            return "Invalid GitHub repository URL. Please use format: https://github.com/owner/repo"

        # Prefer an explicitly supplied token; fall back to the environment.
        token = github_token if is_private and github_token else os.environ.get("GITHUB_TOKEN", "")
        if is_private and not token:
            return "GitHub token is required for private repositories."

        # Configure analyzer.
        config = GitHubAPIConfig(token=token)
        analyzer = GitHubRepoAnalyzer(config)

        progress(0, desc="Starting repository analysis...")
        try:
            progress(0.1, desc="Fetching repository details...")
            # Rebind the shared state so the PDF and chat handlers see it.
            repo_data = analyzer.analyze_repo(owner, repo)
            progress(0.9, desc="Generating insights...")

            repo_details = repo_data.get("repo_details", {})
            insights = repo_data.get("insights", {})

            full_name = repo_details.get("full_name", "")
            description = repo_details.get("description", "No description provided")
            stars = repo_details.get("stargazers_count", 0)
            forks = repo_details.get("forks_count", 0)
            language = repo_details.get("language", "Unknown")

            # Repository age from the ISO 8601 creation timestamp.
            created_at = repo_details.get("created_at", "")
            age_str = "Unknown"
            if created_at:
                try:
                    created_date = datetime.datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    age_days = (datetime.datetime.now(datetime.timezone.utc) - created_date).days
                    age_years = age_days / 365.25
                    age_str = f"{age_years:.1f} years ({age_days} days)"
                except (ValueError, AttributeError):
                    pass

            # Headline insight labels.
            activity_level = insights.get("activity_level", {}).get("level", "Unknown")

            doc_quality = insights.get("documentation_quality", {})
            has_readme = doc_quality.get("has_readme", False)
            doc_score = doc_quality.get("score", 0) if has_readme else 0
            doc_quality_level = "High" if doc_score > 0.7 else "Medium" if doc_score > 0.4 else "Low"

            health_level = insights.get("community_health", {}).get("overall", {}).get("level", "Unknown")
            complexity_level = insights.get("code_complexity", {}).get("overall", {}).get("level", "Unknown")

            # Summary header.
            summary_html = f"""
            <h1>{full_name}</h1>
            <p><strong>Description:</strong> {description}</p>
            <div style="display: flex; flex-wrap: wrap; gap: 20px; margin-bottom: 20px;">
                <div style="flex: 1; min-width: 200px;">
                    <h3>Repository Details</h3>
                    <ul>
                        <li><strong>Primary Language:</strong> {language}</li>
                        <li><strong>Stars:</strong> {stars}</li>
                        <li><strong>Forks:</strong> {forks}</li>
                        <li><strong>Age:</strong> {age_str}</li>
                        <li><strong>License:</strong> {repo_details.get("license", "Not specified")}</li>
                    </ul>
                </div>
                <div style="flex: 1; min-width: 200px;">
                    <h3>Key Insights</h3>
                    <ul>
                        <li><strong>Activity Level:</strong> {activity_level}</li>
                        <li><strong>Documentation Quality:</strong> {doc_quality_level}</li>
                        <li><strong>Community Health:</strong> {health_level}</li>
                        <li><strong>Code Complexity:</strong> {complexity_level}</li>
                    </ul>
                </div>
            </div>
            """

            # Contributors section.
            contributors = repo_data.get("contributors", [])
            if contributors:
                top_contributors = sorted(contributors, key=lambda x: x.get("contributions", 0), reverse=True)[:5]
                summary_html += """
                <div style="margin-bottom: 20px;">
                    <h3>Top Contributors</h3>
                    <div style="display: flex; flex-wrap: wrap; gap: 10px;">
                """
                for contributor in top_contributors:
                    avatar_url = contributor.get("avatar_url", "")
                    login = contributor.get("login", "Unknown")
                    contributions = contributor.get("contributions", 0)
                    summary_html += f"""
                    <div style="text-align: center; width: 100px;">
                        <img src="{avatar_url}" style="width: 50px; height: 50px; border-radius: 25px; margin-bottom: 5px;">
                        <div><strong>{login}</strong></div>
                        <div>{contributions} commits</div>
                    </div>
                    """
                summary_html += """
                    </div>
                </div>
                """

            # Language distribution section (top 5 by byte count).
            languages = repo_data.get("languages", {})
            if languages:
                total_bytes = sum(languages.values())
                language_percentages = [
                    (lang, bytes_count, (bytes_count / total_bytes) * 100)
                    for lang, bytes_count in languages.items()
                ]
                sorted_languages = sorted(language_percentages, key=lambda x: x[1], reverse=True)[:5]
                summary_html += """
                <div style="margin-bottom: 20px;">
                    <h3>Language Distribution</h3>
                    <div style="display: flex; flex-direction: column; gap: 5px;">
                """
                for lang, bytes_count, percentage in sorted_languages:
                    # Clamp so a sliver is still visible and the bar never overflows.
                    bar_width = max(1, min(100, percentage))
                    summary_html += f"""
                    <div>
                        <div style="display: flex; align-items: center; gap: 10px;">
                            <div style="width: 100px; text-align: right;"><strong>{lang}</strong></div>
                            <div style="flex-grow: 1; background-color: #eee; height: 20px; border-radius: 10px;">
                                <div style="width: {bar_width}%; background-color: #4CAF50; height: 100%; border-radius: 10px;"></div>
                            </div>
                            <div style="width: 60px;">{percentage:.1f}%</div>
                        </div>
                    </div>
                    """
                summary_html += """
                    </div>
                </div>
                """

            progress(1.0, desc="Analysis complete!")
            return summary_html
        except Exception as e:
            error_message = f"Error analyzing repository: {str(e)}"
            logger.error(error_message)
            return error_message

    def generate_pdf_report() -> Tuple[str, Optional[str]]:
        """Generate a PDF report for the most recently analyzed repository.

        Returns:
            Tuple of (status message, path to the generated PDF or None).
            gr.File expects a file path, so None — not a dict — is returned
            when no report could be produced.
        """
        if not repo_data:
            return "Please analyze a repository first.", None
        try:
            pdf_generator = PDFReportGenerator(repo_data)
            pdf_path = pdf_generator.generate_report()
            repo_name = repo_data.get("repo_details", {}).get("full_name", "repository").replace("/", "_")
            return f"PDF report generated for {repo_name}", pdf_path
        except Exception as e:
            error_message = f"Error generating PDF report: {str(e)}"
            logger.error(error_message)
            return error_message, None

    def chat_with_repo(query: str, history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """
        Chat with the repository analysis data using a RAG approach.

        Args:
            query: User's question.
            history: Chat history as (user, bot) message pairs.

        Returns:
            The updated chat history including this exchange — the format
            gr.Chatbot expects as its output value.
        """
        history = history or []
        if not repo_data:
            return history + [(query, "Please analyze a repository first before asking questions.")]
        try:
            # Use the RAG helper to pull context relevant to the question.
            rag_helper = RAGHelper(repo_data)
            context = rag_helper.get_context_for_query(query)

            # NOTE(review): a real implementation would call an LLM here;
            # this simulates a response from the retrieved context.
            repo_name = repo_data.get("repo_details", {}).get("name", "The repository")
            query_lower = query.lower()

            # Route on the query phrasing to pick a matching follow-up prompt.
            if any(term in query_lower for term in ["what is", "tell me about", "overview", "about"]):
                response = f"{context}\n\nIs there something specific about {repo_name} you'd like to know more about?"
            elif any(term in query_lower for term in ["language", "programming", "written in"]):
                response = f"{context}\n\nWould you like to know more about any specific language used in {repo_name}?"
            elif any(term in query_lower for term in ["contributor", "who", "maintain", "author"]):
                response = f"{context}\n\nI can provide more details about specific contributors if you're interested."
            elif any(term in query_lower for term in ["active", "activity", "commit", "frequency"]):
                response = f"{context}\n\nWould you like to see visualizations of the commit activity patterns?"
            elif any(term in query_lower for term in ["document", "readme", "docs"]):
                response = f"{context}\n\nIs there a specific aspect of the documentation you'd like feedback on?"
            elif any(term in query_lower for term in ["complex", "difficulty", "understand"]):
                response = f"{context}\n\nWould you like suggestions for navigating this codebase effectively?"
            else:
                response = f"Based on my analysis of {repo_name}:\n\n{context}\n\nIs there anything specific you'd like to know more about?"
            return history + [(query, response)]
        except Exception as e:
            error_message = f"Error processing your question: {str(e)}"
            logger.error(error_message)
            return history + [(query, error_message)]

    # Create Gradio interface.
    with gr.Blocks(css=css) as interface:
        gr.Markdown("# GitHub Repository Analyzer")
        gr.Markdown("Analyze GitHub repositories and chat about the insights")
        with gr.Tab("Repository Analysis"):
            with gr.Row():
                with gr.Column(scale=3):
                    repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/owner/repo")
                with gr.Column(scale=1):
                    is_private = gr.Checkbox(label="Private Repository")
            github_token = gr.Textbox(label="GitHub Token (for private repos)", type="password", visible=False)
            # Show/hide the token input based on the private-repo checkbox.
            is_private.change(fn=lambda x: gr.update(visible=x), inputs=[is_private], outputs=[github_token])
            analyze_btn = gr.Button("Analyze Repository", variant="primary")
            with gr.Row():
                with gr.Column(scale=2):
                    analysis_result = gr.HTML(label="Analysis Result", elem_classes=["analysis-result"])
                with gr.Column(scale=1):
                    with gr.Group():
                        gr.Markdown("### PDF Report")
                        pdf_btn = gr.Button("Generate PDF Report", variant="secondary")
                        pdf_output = gr.Markdown()
                        pdf_download = gr.File(label="Download Report", elem_classes=["pdf-download"])
            # Connect buttons to handlers.
            analyze_btn.click(
                fn=analyze_repository,
                inputs=[repo_url, is_private, github_token],
                outputs=[analysis_result]
            )
            pdf_btn.click(
                fn=generate_pdf_report,
                inputs=[],
                outputs=[pdf_output, pdf_download]
            )
        with gr.Tab("Chat with Repository"):
            gr.Markdown("Ask questions about the repository and get insights")
            chatbot = gr.Chatbot(elem_classes=["chat-interface"])
            msg = gr.Textbox(
                placeholder="Ask me anything about the repository...",
                show_label=False
            )
            clear = gr.Button("Clear")
            # The handler returns the full updated history (the previous
            # `postprocess=` kwarg is not a valid event-listener argument and
            # `msg.value` is only the static initial value). After the answer
            # is appended, clear the input box.
            msg.submit(
                fn=chat_with_repo,
                inputs=[msg, chatbot],
                outputs=[chatbot]
            ).then(lambda: "", None, msg)
            clear.click(lambda: None, None, chatbot, queue=False)
    return interface
# Application entry point.
if __name__ == "__main__":
    # Build the Gradio UI and serve it; share=True exposes a public link.
    app = create_gradio_interface()
    app.launch(debug=True, share=True)