Spaces:
Sleeping
Sleeping
| """ | |
| Git Diff Analyzer | |
| Analyzes code changes in Git diffs: | |
| - Parse Git diffs | |
| - Analyze only changed files | |
| - Focus on added/modified lines | |
| """ | |
| import re | |
| import logging | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from pathlib import Path | |
| from .base import BaseAnalyzer, AnalysisError | |
| from .python_analyzer import PythonAnalyzer | |
| from .javascript_analyzer import JavaScriptAnalyzer | |
| import asyncio | |
| import subprocess | |
| logger = logging.getLogger(__name__) | |
class GitAnalyzer(BaseAnalyzer):
    """Analyzer for Git diffs and changed files.

    Lists the files reported by ``git diff --name-only`` and runs the
    analyzer registered for each file's extension on the file's current
    on-disk content. Each changed file is analyzed in full, not only the
    lines touched by the diff.
    """

    def __init__(self):
        super().__init__()
        self.python_analyzer = PythonAnalyzer()
        self.javascript_analyzer = JavaScriptAnalyzer()

        # File extension mapping: suffix -> (language name, analyzer instance)
        self.extension_map = {
            '.py': ('python', self.python_analyzer),
            '.js': ('javascript', self.javascript_analyzer),
            '.jsx': ('javascript', self.javascript_analyzer),
            '.ts': ('typescript', self.javascript_analyzer),
            '.tsx': ('typescript', self.javascript_analyzer),
        }

    async def analyze_diff(
        self,
        repo_path: str,
        base_ref: str = "HEAD",
        target_ref: Optional[str] = None,
        ctx: Any = None
    ) -> Dict[str, Any]:
        """
        Analyze changes in a Git diff

        Args:
            repo_path: Path to Git repository
            base_ref: Base reference (commit, branch) for comparison
            target_ref: Target reference (default: working directory)
            ctx: Optional MCP context for progress reporting

        Returns:
            Dictionary with analysis results for changed files

        Raises:
            AnalysisError: If the repository directory does not exist, a
                reference is rejected, or any other step fails. The
                original exception is attached as ``__cause__``.
        """
        try:
            repo_dir = Path(repo_path).resolve()
            # The path is used as the working directory of the git
            # subprocess, so it has to be a directory, not merely exist.
            if not repo_dir.is_dir():
                raise AnalysisError(f"Repository not found: {repo_path}")

            if ctx:
                await ctx.report_progress(0, 100, "Getting Git diff...")

            # Get list of changed files
            changed_files = await self._get_changed_files(repo_dir, base_ref, target_ref)
            if not changed_files:
                return self._format_result(
                    issues=[],
                    summary={"message": "No changed files found"},
                    metadata={"repo_path": str(repo_dir), "base_ref": base_ref}
                )

            total_files = len(changed_files)
            if ctx:
                await ctx.report_progress(
                    20, 100,
                    f"Found {total_files} changed files..."
                )

            # Analyze each changed file; progress moves from 20 to 90 over
            # the course of the loop.
            all_results = []
            for i, file_path in enumerate(changed_files):
                progress = 20 + int((i / total_files) * 70)
                if ctx:
                    await ctx.report_progress(
                        progress, 100,
                        f"Analyzing {file_path.name}..."
                    )
                result = await self._analyze_changed_file(repo_dir, file_path, ctx)
                if result:
                    all_results.append(result)

            if ctx:
                await ctx.report_progress(90, 100, "Aggregating results...")

            # Aggregate results
            aggregated = self._aggregate_git_results(all_results, repo_dir, base_ref)

            if ctx:
                await ctx.report_progress(100, 100, "Git diff analysis complete!")
            return aggregated
        except Exception as e:
            logger.error(f"Git diff analysis failed: {e}")
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise AnalysisError(f"Git diff analysis failed: {e}") from e

    @staticmethod
    def _validate_ref(ref: str) -> None:
        """
        Reject a reference that Git would parse as a command-line option

        Raises:
            AnalysisError: If ``ref`` starts with ``-``
        """
        if ref.startswith("-"):
            raise AnalysisError(f"Invalid Git reference: {ref!r}")

    async def _get_changed_files(
        self,
        repo_dir: Path,
        base_ref: str,
        target_ref: Optional[str]
    ) -> List[Path]:
        """
        Get list of changed files in the diff

        Args:
            repo_dir: Directory the git command is run in
            base_ref: Base reference for comparison
            target_ref: Target reference; when falsy, ``base_ref`` is
                compared with the working directory

        Returns:
            List of changed file paths that have a supported extension and
            exist on disk. Empty if git fails or any error occurs.

        Raises:
            AnalysisError: If a reference starts with ``-``
        """
        # Validate outside the try block so that a rejected ref is reported
        # to the caller instead of being turned into an empty file list.
        self._validate_ref(base_ref)
        if target_ref:
            self._validate_ref(target_ref)

        try:
            # Build git diff command. ``-z`` makes git emit NUL-terminated,
            # unquoted paths; without it, paths containing non-ASCII or
            # special characters are C-quoted and would not match any
            # known extension.
            command = ["git", "diff", "--name-only", "-z", base_ref]
            if target_ref:
                command.append(target_ref)
            # ``--`` ends the revision list so a ref is never mistaken for
            # a pathspec.
            command.append("--")

            stdout, stderr, returncode = await self._run_subprocess(
                command,
                cwd=repo_dir
            )
            if returncode != 0:
                logger.warning(f"Git diff returned non-zero: {stderr}")
                return []

            # Parse file paths
            changed_files = []
            for name in stdout.split('\0'):
                if not name:
                    continue
                file_path = repo_dir / name
                # Only include files with supported extensions that still
                # exist (deleted files appear in the diff but cannot be read)
                if file_path.suffix in self.extension_map and file_path.is_file():
                    changed_files.append(file_path)
            return changed_files
        except Exception as e:
            logger.error(f"Failed to get changed files: {e}")
            return []

    async def _analyze_changed_file(
        self,
        repo_dir: Path,
        file_path: Path,
        ctx: Any = None
    ) -> Optional[Dict[str, Any]]:
        """
        Analyze a single changed file

        Args:
            repo_dir: Repository directory, used to compute the relative path
            file_path: Path of the file to analyze
            ctx: Accepted for interface symmetry; it is not forwarded, the
                per-language analyzers are always called with ``ctx=None``

        Returns:
            Analysis result or None if failed
        """
        try:
            # Get appropriate analyzer before touching the file system
            entry = self.extension_map.get(file_path.suffix)
            if entry is None:
                return None
            language, analyzer = entry

            # Read current file content
            code = file_path.read_text(encoding='utf-8')

            # Analyze the file
            if isinstance(analyzer, PythonAnalyzer):
                result = await analyzer.analyze(code, ctx=None)
            elif isinstance(analyzer, JavaScriptAnalyzer):
                result = await analyzer.analyze(code, ctx=None, language=language)
            else:
                return None

            # Add file metadata
            result['file'] = str(file_path)
            result['relative_path'] = str(file_path.relative_to(repo_dir))
            return result
        except Exception as e:
            logger.warning(f"Failed to analyze {file_path}: {e}")
            return None

    def _aggregate_git_results(
        self,
        results: List[Dict[str, Any]],
        repo_dir: Path,
        base_ref: str
    ) -> Dict[str, Any]:
        """
        Aggregate results from changed files

        Args:
            results: Per-file analysis results
            repo_dir: Repository directory, recorded in the metadata
            base_ref: Base reference, recorded in the metadata

        Returns:
            Aggregated analysis result
        """
        all_issues = []
        total_errors = 0
        total_warnings = 0
        total_security_issues = 0
        files_analyzed = len(results)

        for result in results:
            # Aggregate issues, tagging each with the file it came from
            # (relative path preferred, absolute path as fallback)
            source_file = result.get('relative_path', result.get('file', ''))
            for issue in result.get('issues', []):
                issue['file'] = source_file
                all_issues.append(issue)

            # Aggregate counts
            summary = result.get('summary', {})
            total_errors += summary.get('errors', 0)
            total_warnings += summary.get('warnings', 0)
            total_security_issues += summary.get('security_issues', 0)

        return self._format_result(
            issues=all_issues,
            summary={
                "total_errors": total_errors,
                "total_warnings": total_warnings,
                "total_security_issues": total_security_issues,
                "files_changed": files_analyzed,
                "issues_per_file": len(all_issues) / files_analyzed if files_analyzed else 0,
            },
            metadata={
                "repo_path": str(repo_dir),
                "base_ref": base_ref,
                "analysis_type": "git_diff"
            }
        )
| # Convenience functions | |
async def analyze_git_diff(
    repo_path: str,
    base_ref: str = "HEAD",
    target_ref: Optional[str] = None,
    ctx: Any = None
) -> Dict[str, Any]:
    """
    Run a one-off Git diff analysis with a fresh GitAnalyzer

    Args:
        repo_path: Path to Git repository
        base_ref: Base reference for comparison (default: HEAD)
        target_ref: Target reference (default: working directory)
        ctx: Optional MCP context for progress reporting

    Returns:
        Analysis results for changed files
    """
    # A new analyzer is created per call; nothing is cached between calls.
    git_analyzer = GitAnalyzer()
    analysis = await git_analyzer.analyze_diff(
        repo_path,
        base_ref,
        target_ref,
        ctx,
    )
    return analysis