# Codelint-MCP/src/analyzers/git_analyzer.py
# Uploaded by: OsamaAliMid
# Commit message: Add CodeLint MCP Premium Edition application
# Commit: ec37394
"""
Git Diff Analyzer
Analyzes code changes in Git diffs:
- Parse Git diffs
- Analyze only changed files
- Focus on added/modified lines
"""
import re
import logging
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path
from .base import BaseAnalyzer, AnalysisError
from .python_analyzer import PythonAnalyzer
from .javascript_analyzer import JavaScriptAnalyzer
import asyncio
import subprocess
logger = logging.getLogger(__name__)
class GitAnalyzer(BaseAnalyzer):
    """Analyzer for Git diffs and changed files.

    Lists the files that differ between two Git references (or between a
    reference and the working tree), runs the matching language analyzer on
    the *current on-disk content* of every supported file, and merges the
    per-file results into one report.

    Note that whole files are analyzed, not only the changed hunks.
    """

    def __init__(self):
        super().__init__()
        self.python_analyzer = PythonAnalyzer()
        self.javascript_analyzer = JavaScriptAnalyzer()

        # File extension mapping: suffix -> (language name, analyzer instance).
        # TypeScript is routed through the JavaScript analyzer with a
        # different ``language`` value.
        self.extension_map = {
            '.py': ('python', self.python_analyzer),
            '.js': ('javascript', self.javascript_analyzer),
            '.jsx': ('javascript', self.javascript_analyzer),
            '.ts': ('typescript', self.javascript_analyzer),
            '.tsx': ('typescript', self.javascript_analyzer),
        }

    async def analyze_diff(
        self,
        repo_path: str,
        base_ref: str = "HEAD",
        target_ref: Optional[str] = None,
        ctx: Any = None
    ) -> Dict[str, Any]:
        """
        Analyze changes in a Git diff

        Args:
            repo_path: Path to Git repository
            base_ref: Base reference (commit, branch) for comparison
            target_ref: Target reference (default: working directory)
            ctx: Optional MCP context for progress reporting

        Returns:
            Dictionary with analysis results for changed files

        Raises:
            AnalysisError: If the repository path is missing or not a
                directory, a reference is rejected, or any unexpected error
                occurs during the analysis.
        """
        try:
            repo_dir = Path(repo_path).resolve()
            if not repo_dir.exists():
                raise AnalysisError(f"Repository not found: {repo_path}")
            # A regular file cannot be used as the subprocess working
            # directory; fail loudly instead of reporting "no changes".
            if not repo_dir.is_dir():
                raise AnalysisError(
                    f"Repository path is not a directory: {repo_path}"
                )

            if ctx:
                await ctx.report_progress(0, 100, "Getting Git diff...")

            # Get list of changed files
            changed_files = await self._get_changed_files(
                repo_dir, base_ref, target_ref
            )
            if not changed_files:
                return self._format_result(
                    issues=[],
                    summary={"message": "No changed files found"},
                    metadata={"repo_path": str(repo_dir), "base_ref": base_ref}
                )

            total_files = len(changed_files)
            if ctx:
                await ctx.report_progress(
                    20, 100,
                    f"Found {total_files} changed files..."
                )

            # Analyze each changed file; per-file progress spans 20% -> 90%.
            all_results = []
            for i, file_path in enumerate(changed_files):
                if ctx:
                    progress = 20 + int((i / total_files) * 70)
                    await ctx.report_progress(
                        progress, 100,
                        f"Analyzing {file_path.name}..."
                    )
                result = await self._analyze_changed_file(
                    repo_dir, file_path, ctx
                )
                if result:
                    all_results.append(result)

            if ctx:
                await ctx.report_progress(90, 100, "Aggregating results...")

            # Aggregate results
            aggregated = self._aggregate_git_results(
                all_results, repo_dir, base_ref
            )

            if ctx:
                await ctx.report_progress(
                    100, 100, "Git diff analysis complete!"
                )
            return aggregated
        except AnalysisError as e:
            # Already a domain error with a precise message: log it, but do
            # not wrap it a second time.
            logger.error("Git diff analysis failed: %s", e)
            raise
        except Exception as e:
            logger.error("Git diff analysis failed: %s", e)
            # Chain the cause so the original traceback is preserved.
            raise AnalysisError(f"Git diff analysis failed: {e}") from e

    async def _get_changed_files(
        self,
        repo_dir: Path,
        base_ref: str,
        target_ref: Optional[str]
    ) -> List[Path]:
        """
        Get list of changed files in the diff

        Only files whose suffix is in ``self.extension_map`` and that still
        exist as regular files are returned. A failing ``git`` invocation is
        logged and yields an empty list (best effort).

        Args:
            repo_dir: Directory in which ``git`` is run
            base_ref: Base reference for the comparison
            target_ref: Target reference; falsy means the working directory

        Returns:
            List of changed file paths

        Raises:
            AnalysisError: If a reference starts with ``-`` and would
                therefore be parsed by git as a command-line option.
        """
        # Refs come from the caller and end up on the git command line;
        # refuse anything git would parse as an option (e.g. ``--output=``).
        for ref in (base_ref, target_ref):
            if isinstance(ref, str) and ref.startswith("-"):
                raise AnalysisError(f"Invalid Git reference: {ref!r}")

        try:
            # Build git diff command
            command = [
                "git",
                # Emit non-ASCII paths verbatim instead of C-quoted, so the
                # suffix and existence checks below see the real name.
                "-c", "core.quotepath=off",
                "diff",
                "--name-only",
                # Print paths relative to repo_dir even when it is a
                # subdirectory of the repository.
                "--relative",
                base_ref,
            ]
            if target_ref:
                command.append(target_ref)
            # Otherwise: compare with working directory.
            # "--" ends the revision list, so a ref that is also a file name
            # is never ambiguous.
            command.append("--")

            stdout, stderr, returncode = await self._run_subprocess(
                command,
                cwd=repo_dir
            )
            if returncode != 0:
                logger.warning("Git diff returned non-zero: %s", stderr)
                return []

            # Parse file paths
            changed_files = []
            for line in stdout.splitlines():
                name = line.strip()
                if not name:
                    continue
                file_path = repo_dir / name
                # Only include files with supported extensions; deleted
                # files show up in the diff but no longer exist on disk.
                if (
                    file_path.suffix in self.extension_map
                    and file_path.is_file()
                ):
                    changed_files.append(file_path)
            return changed_files
        except Exception as e:
            logger.error("Failed to get changed files: %s", e)
            return []

    async def _analyze_changed_file(
        self,
        repo_dir: Path,
        file_path: Path,
        ctx: Any = None
    ) -> Optional[Dict[str, Any]]:
        """
        Analyze a single changed file

        Args:
            repo_dir: Repository directory, used to compute the relative path
            file_path: Absolute path of the file to analyze
            ctx: Accepted for signature symmetry but not forwarded; the
                language analyzers are always called with ``ctx=None``

        Returns:
            Analysis result or None if failed
        """
        try:
            # Get appropriate analyzer before touching the disk.
            entry = self.extension_map.get(file_path.suffix)
            if entry is None:
                return None
            language, analyzer = entry

            # Read current file content
            code = file_path.read_text(encoding='utf-8')

            # Analyze the file
            if isinstance(analyzer, PythonAnalyzer):
                result = await analyzer.analyze(code, ctx=None)
            elif isinstance(analyzer, JavaScriptAnalyzer):
                result = await analyzer.analyze(
                    code, ctx=None, language=language
                )
            else:
                return None

            # Add file metadata
            result['file'] = str(file_path)
            result['relative_path'] = str(file_path.relative_to(repo_dir))
            return result
        except Exception as e:
            logger.warning("Failed to analyze %s: %s", file_path, e)
            return None

    def _aggregate_git_results(
        self,
        results: List[Dict[str, Any]],
        repo_dir: Path,
        base_ref: str
    ) -> Dict[str, Any]:
        """
        Aggregate results from changed files

        Args:
            results: Per-file results as produced by ``_analyze_changed_file``
            repo_dir: Repository directory, recorded in the metadata
            base_ref: Base reference, recorded in the metadata

        Returns:
            Aggregated analysis result
        """
        all_issues = []
        total_errors = 0
        total_warnings = 0
        total_security_issues = 0
        files_analyzed = len(results)

        for result in results:
            # Aggregate issues, tagging each with the file it came from.
            # A shallow copy keeps the per-file result dicts untouched.
            source_file = result.get('relative_path', result.get('file', ''))
            for issue in result.get('issues') or []:
                all_issues.append({**issue, 'file': source_file})

            # Aggregate counts; tolerate a missing or None summary.
            summary = result.get('summary') or {}
            total_errors += summary.get('errors', 0)
            total_warnings += summary.get('warnings', 0)
            total_security_issues += summary.get('security_issues', 0)

        return self._format_result(
            issues=all_issues,
            summary={
                "total_errors": total_errors,
                "total_warnings": total_warnings,
                "total_security_issues": total_security_issues,
                "files_changed": files_analyzed,
                "issues_per_file": (
                    len(all_issues) / files_analyzed if files_analyzed else 0
                ),
            },
            metadata={
                "repo_path": str(repo_dir),
                "base_ref": base_ref,
                "analysis_type": "git_diff"
            }
        )
# Convenience functions
async def analyze_git_diff(
    repo_path: str,
    base_ref: str = "HEAD",
    target_ref: Optional[str] = None,
    ctx: Any = None
) -> Dict[str, Any]:
    """
    Run a Git diff analysis with a throwaway ``GitAnalyzer``

    Builds a fresh analyzer instance for this call and delegates to its
    ``analyze_diff`` method.

    Args:
        repo_path: Path to Git repository
        base_ref: Base reference for comparison (default: HEAD)
        target_ref: Target reference (default: working directory)
        ctx: Optional MCP context for progress reporting

    Returns:
        Analysis results for changed files
    """
    return await GitAnalyzer().analyze_diff(
        repo_path,
        base_ref=base_ref,
        target_ref=target_ref,
        ctx=ctx,
    )