""" Git Diff Analyzer Analyzes code changes in Git diffs: - Parse Git diffs - Analyze only changed files - Focus on added/modified lines """ import re import logging from typing import Any, Dict, List, Optional, Tuple from pathlib import Path from .base import BaseAnalyzer, AnalysisError from .python_analyzer import PythonAnalyzer from .javascript_analyzer import JavaScriptAnalyzer import asyncio import subprocess logger = logging.getLogger(__name__) class GitAnalyzer(BaseAnalyzer): """Analyzer for Git diffs and changed files""" def __init__(self): super().__init__() self.python_analyzer = PythonAnalyzer() self.javascript_analyzer = JavaScriptAnalyzer() # File extension mapping self.extension_map = { '.py': ('python', self.python_analyzer), '.js': ('javascript', self.javascript_analyzer), '.jsx': ('javascript', self.javascript_analyzer), '.ts': ('typescript', self.javascript_analyzer), '.tsx': ('typescript', self.javascript_analyzer), } async def analyze_diff( self, repo_path: str, base_ref: str = "HEAD", target_ref: Optional[str] = None, ctx: Any = None ) -> Dict[str, Any]: """ Analyze changes in a Git diff Args: repo_path: Path to Git repository base_ref: Base reference (commit, branch) for comparison target_ref: Target reference (default: working directory) ctx: Optional MCP context for progress reporting Returns: Dictionary with analysis results for changed files """ try: repo_dir = Path(repo_path).resolve() if not repo_dir.exists(): raise AnalysisError(f"Repository not found: {repo_path}") if ctx: await ctx.report_progress(0, 100, "Getting Git diff...") # Get list of changed files changed_files = await self._get_changed_files(repo_dir, base_ref, target_ref) if not changed_files: return self._format_result( issues=[], summary={"message": "No changed files found"}, metadata={"repo_path": str(repo_dir), "base_ref": base_ref} ) if ctx: await ctx.report_progress( 20, 100, f"Found {len(changed_files)} changed files..." ) # Analyze each changed file all_results = [] for i, file_path in enumerate(changed_files): progress = 20 + int((i / len(changed_files)) * 70) if ctx: await ctx.report_progress( progress, 100, f"Analyzing {file_path.name}..." ) result = await self._analyze_changed_file(repo_dir, file_path, ctx) if result: all_results.append(result) if ctx: await ctx.report_progress(90, 100, "Aggregating results...") # Aggregate results aggregated = self._aggregate_git_results(all_results, repo_dir, base_ref) if ctx: await ctx.report_progress(100, 100, "Git diff analysis complete!") return aggregated except Exception as e: logger.error(f"Git diff analysis failed: {e}") raise AnalysisError(f"Git diff analysis failed: {e}") async def _get_changed_files( self, repo_dir: Path, base_ref: str, target_ref: Optional[str] ) -> List[Path]: """ Get list of changed files in the diff Returns: List of changed file paths """ try: # Build git diff command if target_ref: command = ["git", "diff", "--name-only", base_ref, target_ref] else: # Compare with working directory command = ["git", "diff", "--name-only", base_ref] stdout, stderr, returncode = await self._run_subprocess( command, cwd=repo_dir ) if returncode != 0: logger.warning(f"Git diff returned non-zero: {stderr}") return [] # Parse file paths changed_files = [] for line in stdout.strip().split('\n'): if line: file_path = repo_dir / line.strip() # Only include files with supported extensions if file_path.suffix in self.extension_map and file_path.exists(): changed_files.append(file_path) return changed_files except Exception as e: logger.error(f"Failed to get changed files: {e}") return [] async def _analyze_changed_file( self, repo_dir: Path, file_path: Path, ctx: Any = None ) -> Optional[Dict[str, Any]]: """ Analyze a single changed file Returns: Analysis result or None if failed """ try: # Read current file content with open(file_path, 'r', encoding='utf-8') as f: code = f.read() # Get appropriate analyzer extension = file_path.suffix if extension not in self.extension_map: return None language, analyzer = self.extension_map[extension] # Analyze the file if isinstance(analyzer, PythonAnalyzer): result = await analyzer.analyze(code, ctx=None) elif isinstance(analyzer, JavaScriptAnalyzer): result = await analyzer.analyze(code, ctx=None, language=language) else: return None # Add file metadata result['file'] = str(file_path) result['relative_path'] = str(file_path.relative_to(repo_dir)) return result except Exception as e: logger.warning(f"Failed to analyze {file_path}: {e}") return None def _aggregate_git_results( self, results: List[Dict[str, Any]], repo_dir: Path, base_ref: str ) -> Dict[str, Any]: """ Aggregate results from changed files Returns: Aggregated analysis result """ all_issues = [] total_errors = 0 total_warnings = 0 total_security_issues = 0 files_analyzed = len(results) for result in results: # Aggregate issues issues = result.get('issues', []) for issue in issues: issue['file'] = result.get('relative_path', result.get('file', '')) all_issues.append(issue) # Aggregate counts summary = result.get('summary', {}) total_errors += summary.get('errors', 0) total_warnings += summary.get('warnings', 0) total_security_issues += summary.get('security_issues', 0) return self._format_result( issues=all_issues, summary={ "total_errors": total_errors, "total_warnings": total_warnings, "total_security_issues": total_security_issues, "files_changed": files_analyzed, "issues_per_file": len(all_issues) / files_analyzed if files_analyzed else 0, }, metadata={ "repo_path": str(repo_dir), "base_ref": base_ref, "analysis_type": "git_diff" } ) # Convenience functions async def analyze_git_diff( repo_path: str, base_ref: str = "HEAD", target_ref: Optional[str] = None, ctx: Any = None ) -> Dict[str, Any]: """ Convenience function to analyze Git diff Args: repo_path: Path to Git repository base_ref: Base reference for comparison (default: HEAD) target_ref: Target reference (default: working directory) ctx: Optional MCP context for progress reporting Returns: Analysis results for changed files """ analyzer = GitAnalyzer() return await analyzer.analyze_diff(repo_path, base_ref, target_ref, ctx)