ktongue/docker_container/.vscode-server/extensions/ms-python.vscode-python-envs-1.22.0/analysis/git_analysis.py

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""
Git-based code analysis inspired by "Your Code as a Crime Scene" and "Software Design X-Rays".

Extracts metrics from git history:
- Change frequency (hotspots)
- Code churn (lines added/removed)
- Temporal coupling (files that change together)
- Author diversity / bus factor
- File age analysis
"""

import pathlib
import subprocess
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple

# Limit history analysis for performance
MAX_COMMITS = 1000
DAYS_OF_HISTORY = 365


@dataclass
class FileStats:
    """Statistics for a single file from git history."""

    path: str
    change_count: int = 0
    lines_added: int = 0
    lines_removed: int = 0
    authors: Set[str] = field(default_factory=set)
    last_modified: Optional[datetime] = None
    first_seen: Optional[datetime] = None

    @property
    def churn(self) -> int:
        """Total code churn (additions + deletions)."""
        return self.lines_added + self.lines_removed

    @property
    def author_count(self) -> int:
        """Number of unique authors who touched this file."""
        return len(self.authors)

    def to_dict(self) -> dict:
        """Convert to JSON-serializable dictionary."""
        return {
            "path": self.path,
            "change_count": self.change_count,
            "lines_added": self.lines_added,
            "lines_removed": self.lines_removed,
            "churn": self.churn,
            "author_count": self.author_count,
            "authors": sorted(self.authors),
            "last_modified": self.last_modified.isoformat()
            if self.last_modified
            else None,
            "first_seen": self.first_seen.isoformat() if self.first_seen else None,
            "age_days": (datetime.now(timezone.utc) - self.last_modified).days
            if self.last_modified
            else None,
        }


@dataclass
class TemporalCoupling:
    """Represents files that frequently change together."""

    file1: str
    file2: str
    coupled_commits: int
    coupling_ratio: float  # coupled_commits / min(file1_changes, file2_changes)

    def to_dict(self) -> dict:
        return {
            "file1": self.file1,
            "file2": self.file2,
            "coupled_commits": self.coupled_commits,
            "coupling_ratio": round(self.coupling_ratio, 3),
        }


def run_git_command(args: List[str], cwd: pathlib.Path) -> str:
    """Run a git command and return stdout."""
    result = subprocess.run(
        ["git"] + args,
        cwd=cwd,
        capture_output=True,
        text=True,
        check=True,
        timeout=60,
    )
    return result.stdout


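# Note: check=True makes run_git_command raise subprocess.CalledProcessError on
# a nonzero exit status (handled by the callers below), while a hung git
# process raises subprocess.TimeoutExpired after 60 seconds and propagates.

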
def get_tracked_files(repo_root: pathlib.Path) -> Set[str]:
    """Get set of currently tracked files in the repository."""
    output = run_git_command(["ls-files"], repo_root)
    return set(output.strip().split("\n")) if output.strip() else set()


def analyze_git_log(repo_root: pathlib.Path) -> Dict[str, FileStats]:
    """
    Parse git log to extract file statistics.

    Uses git log --numstat for efficient extraction of:
    - Change frequency per file
    - Lines added/removed per file
    - Authors per file
    - Timestamps
    """
    file_stats: Dict[str, FileStats] = {}

    # Get commits from last N days, limited to MAX_COMMITS
    try:
        log_output = run_git_command(
            [
                "log",
                f"--since={DAYS_OF_HISTORY} days ago",
                f"-n{MAX_COMMITS}",
                "--numstat",
                "--format=%H|%aI|%aN",
                "--no-merges",
            ],
            repo_root,
        )
    except subprocess.CalledProcessError:
        return {}

    current_commit_info: Optional[Tuple[str, datetime, str]] = None
    tracked_files = get_tracked_files(repo_root)

    for line in log_output.split("\n"):
        line = line.strip()
        if not line:
            continue

        # Check if this is a commit header line
        if "|" in line and line.count("|") == 2:
            parts = line.split("|")
            if len(parts) == 3:
                commit_hash, date_str, author = parts
                try:
                    commit_date = datetime.fromisoformat(
                        date_str.replace("Z", "+00:00")
                    )
                    current_commit_info = (commit_hash, commit_date, author)
                except ValueError:
                    current_commit_info = None
            continue

        # Parse numstat line: "added\tremoved\tfilepath"
        if current_commit_info and "\t" in line:
            parts = line.split("\t")
            if len(parts) >= 3:
                added, removed, filepath = parts[0], parts[1], parts[2]

                # Skip binary files (shown as "-")
                if added == "-" or removed == "-":
                    continue

                # Only track files that currently exist
                if filepath not in tracked_files:
                    continue

                # Skip test files and generated files for hotspot analysis
                if _should_skip_file(filepath):
                    continue

                try:
                    lines_added = int(added)
                    lines_removed = int(removed)
                except ValueError:
                    continue

                _, commit_date, author = current_commit_info

                if filepath not in file_stats:
                    file_stats[filepath] = FileStats(path=filepath)

                stats = file_stats[filepath]
                stats.change_count += 1
                stats.lines_added += lines_added
                stats.lines_removed += lines_removed
                stats.authors.add(author)

                # Track dates
                if stats.last_modified is None or commit_date > stats.last_modified:
                    stats.last_modified = commit_date
                if stats.first_seen is None or commit_date < stats.first_seen:
                    stats.first_seen = commit_date

    return file_stats


def _should_skip_file(filepath: str) -> bool:
    """Check if file should be excluded from analysis."""
    # Normalize path separators so patterns work cross-platform
    normalized_path = filepath.replace("\\", "/")

    skip_patterns = [
        "node_modules/",
        "dist/",
        ".vscode-test/",
        "__pycache__/",
        ".git/",
        "package-lock.json",
        ".vsix",
        # Skip test directories
        "/test/",
        "/tests/",
        "/__tests__/",
        "/mocks/",
    ]
    if any(pattern in normalized_path for pattern in skip_patterns):
        return True

    # Check filename-based test patterns
    path = pathlib.Path(filepath)
    filename = path.name

    # Common test file naming conventions
    if (
        filename.startswith("test_")
        or filename.endswith("_test.py")
        or filename.endswith("_tests.py")
        or ".test." in filename
        or ".spec." in filename
    ):
        return True

    # Skip files in well-known test directories (check path parts)
    test_dirs = {"test", "tests", "__tests__", "mocks"}
    dir_parts = path.parts[:-1]  # exclude the filename
    if any(part in test_dirs for part in dir_parts):
        return True

    return False


def analyze_temporal_coupling(
    repo_root: pathlib.Path, min_coupling: int = 3, min_ratio: float = 0.3
) -> List[TemporalCoupling]:
    """
    Find files that frequently change together (temporal coupling).

    High temporal coupling can indicate:
    - Hidden dependencies
    - Copy-paste code
    - Features spread across files

    Args:
        repo_root: Repository root path
        min_coupling: Minimum number of co-changes to report
        min_ratio: Minimum coupling ratio (0.0 to 1.0)
    """
    # Track which files changed in each commit
    commit_files: Dict[str, Set[str]] = defaultdict(set)
    file_change_count: Dict[str, int] = defaultdict(int)

    try:
        log_output = run_git_command(
            [
                "log",
                f"--since={DAYS_OF_HISTORY} days ago",
                f"-n{MAX_COMMITS}",
                "--name-only",
                "--format=%H",
                "--no-merges",
            ],
            repo_root,
        )
    except subprocess.CalledProcessError:
        return []

    tracked_files = get_tracked_files(repo_root)
    current_commit: Optional[str] = None

    for line in log_output.split("\n"):
        line = line.strip()
        if not line:
            continue

        # Commit hash is 40 hex characters
        if len(line) == 40 and all(c in "0123456789abcdef" for c in line):
            current_commit = line
            continue

        if current_commit and line in tracked_files and not _should_skip_file(line):
            commit_files[current_commit].add(line)
            file_change_count[line] += 1

    # Calculate coupling between file pairs
    coupling_count: Dict[Tuple[str, str], int] = defaultdict(int)
    for files in commit_files.values():
        file_list = sorted(files)
        for i, file1 in enumerate(file_list):
            for file2 in file_list[i + 1 :]:
                coupling_count[(file1, file2)] += 1

    # Filter and create coupling objects
    couplings: List[TemporalCoupling] = []
    for (file1, file2), count in coupling_count.items():
        if count < min_coupling:
            continue
        # Calculate coupling ratio relative to the less frequently changed file
        min_changes = min(file_change_count[file1], file_change_count[file2])
        ratio = count / min_changes if min_changes > 0 else 0
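        # Worked example: if file1 changed 10 times, file2 changed 4 times,
        # and they appeared together in 3 commits, ratio = 3 / 4 = 0.75,
        # i.e. 75% of file2's changes also touched file1.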
        if ratio >= min_ratio:
            couplings.append(
                TemporalCoupling(
                    file1=file1,
                    file2=file2,
                    coupled_commits=count,
                    coupling_ratio=ratio,
                )
            )

    # Sort by coupling strength
    couplings.sort(key=lambda c: (c.coupling_ratio, c.coupled_commits), reverse=True)
    return couplings[:50]  # Top 50 couplings


def calculate_bus_factor(file_stats: Dict[str, FileStats]) -> dict:
    """
    Calculate bus factor metrics.

    Bus factor = minimum number of authors who need to leave
    before knowledge is lost.

    Low bus factor (1-2) indicates knowledge silos.
    """
    # Overall project bus factor
    all_authors: Set[str] = set()
    single_author_files: List[str] = []

    for stats in file_stats.values():
        all_authors.update(stats.authors)
        if stats.author_count == 1:
            single_author_files.append(stats.path)

    # Files with low bus factor (knowledge silos)
    knowledge_silos = [
        {"path": stats.path, "sole_author": sorted(stats.authors)[0]}
        for stats in file_stats.values()
        if stats.author_count == 1 and stats.change_count >= 3
    ]
    # Sort by change count (more changes = higher risk)
    knowledge_silos.sort(key=lambda x: file_stats[x["path"]].change_count, reverse=True)

    return {
        "total_authors": len(all_authors),
        "single_author_file_count": len(single_author_files),
        "single_author_file_ratio": round(len(single_author_files) / len(file_stats), 3)
        if file_stats
        else 0,
        "knowledge_silos": knowledge_silos[:20],  # Top 20 at-risk files
    }


def get_hotspots(file_stats: Dict[str, FileStats], top_n: int = 30) -> List[dict]:
    """
    Identify hotspots - files that change frequently.

    Hotspots are prime candidates for:
    - Code review focus
    - Refactoring
    - Test coverage
    """
    sorted_files = sorted(
        file_stats.values(),
        key=lambda s: (s.change_count, s.churn),
        reverse=True,
    )
    return [f.to_dict() for f in sorted_files[:top_n]]


def analyze_repository(repo_root: pathlib.Path) -> dict:
    """
    Run complete git-based analysis on a repository.

    Returns a dictionary with all git metrics.
    """
    file_stats = analyze_git_log(repo_root)
    temporal_coupling = analyze_temporal_coupling(repo_root)
    bus_factor = calculate_bus_factor(file_stats)
    hotspots = get_hotspots(file_stats)

    return {
        "hotspots": hotspots,
        "temporal_coupling": [c.to_dict() for c in temporal_coupling],
        "bus_factor": bus_factor,
        "summary": {
            "files_analyzed": len(file_stats),
            "total_changes": sum(s.change_count for s in file_stats.values()),
            "total_churn": sum(s.churn for s in file_stats.values()),
            "history_days": DAYS_OF_HISTORY,
            "max_commits": MAX_COMMITS,
        },
    }


if __name__ == "__main__":
    import json

    repo = pathlib.Path(__file__).parent.parent
    results = analyze_repository(repo)
    print(json.dumps(results, indent=2))
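
    # Illustrative output shape (keys come from analyze_repository above):
    #   {
    #     "hotspots": [...],
    #     "temporal_coupling": [...],
    #     "bus_factor": {...},
    #     "summary": {"files_analyzed": ..., "total_changes": ..., ...}
    #   }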