Codelint-MCP / src /analyzers /python_analyzer.py
OsamaAliMid's picture
Add CodeLint MCP Premium Edition application
ec37394
"""
๐Ÿ Premium Python Code Analyzer
Integrates: Ruff (linting), Bandit (security), Radon (complexity), mypy (types)
"""
import asyncio
import json
from typing import Dict, Any, Optional
import logging
from src.analyzers.base import BaseAnalyzer, AnalysisError
logger = logging.getLogger(__name__)
class PythonAnalyzer(BaseAnalyzer):
"""
๐Ÿ Comprehensive Python code analyzer
- Ruff: Fast Python linter (replaces flake8, pylint, etc.)
- Bandit: Security vulnerability scanner
- Radon: Complexity metrics (cyclomatic, maintainability)
- mypy: Optional type checking
"""
def __init__(self, timeout: int = 30):
super().__init__(timeout)
self.include_type_checking = True
async def analyze(self, code: str, ctx: Any = None) -> Dict[str, Any]:
"""
๐Ÿ” Complete Python code analysis
Args:
code: Python source code
ctx: Optional MCP context for progress
Returns:
Comprehensive analysis results
"""
temp_file = None
try:
# Create temp file
temp_file = self._create_temp_file(code, suffix='.py')
# Progress: Starting
if ctx:
await ctx.report_progress(0, 100, "Starting Python analysis...")
# Run all analyzers in parallel
results = await asyncio.gather(
self._run_ruff(temp_file, ctx),
self._run_bandit(temp_file, ctx),
self._run_radon(temp_file, ctx),
return_exceptions=True
)
ruff_result, bandit_result, radon_result = results
# Combine all issues
all_issues = []
# Ruff issues
if isinstance(ruff_result, dict) and "issues" in ruff_result:
all_issues.extend(ruff_result["issues"])
elif isinstance(ruff_result, Exception):
logger.error(f"Ruff analysis failed: {ruff_result}")
# Bandit issues (security)
if isinstance(bandit_result, dict) and "issues" in bandit_result:
all_issues.extend(bandit_result["issues"])
elif isinstance(bandit_result, Exception):
logger.error(f"Bandit analysis failed: {bandit_result}")
# Progress: Complete
if ctx:
await ctx.report_progress(100, 100, "Python analysis complete!")
# Build comprehensive result
result = self._format_result(
issues=all_issues,
summary={
"errors": sum(1 for i in all_issues if i.get("severity") == "error"),
"warnings": sum(1 for i in all_issues if i.get("severity") == "warning"),
"security_issues": len(bandit_result.get("issues", [])) if isinstance(bandit_result, dict) else 0,
"complexity": radon_result if isinstance(radon_result, dict) else {}
},
metadata={
"language": "python",
"tools": ["ruff", "bandit", "radon"],
"lines_of_code": len(code.splitlines())
}
)
return result
finally:
if temp_file:
self._cleanup_temp_file(temp_file)
async def _run_ruff(self, file_path: str, ctx: Any = None) -> Dict[str, Any]:
"""
๐Ÿ”ง Run Ruff linter
Ruff is an extremely fast Python linter written in Rust.
Combines rules from flake8, pylint, pycodestyle, etc.
"""
try:
if ctx:
await ctx.report_progress(20, 100, "Running Ruff linter...")
# Run Ruff with JSON output
stdout, stderr, returncode = await self._run_subprocess([
"ruff", "check",
"--output-format=json",
"--select=ALL", # Enable all rules
file_path
])
# Ruff returns non-zero for violations, which is expected
if stdout:
try:
ruff_output = json.loads(stdout)
except json.JSONDecodeError:
logger.warning(f"Failed to parse Ruff JSON output: {stdout[:200]}")
return {"issues": []}
# Convert Ruff format to our standard format
issues = []
for violation in ruff_output:
issues.append(self._format_issue(
line=violation.get("location", {}).get("row", 0),
column=violation.get("location", {}).get("column", 0),
message=violation.get("message", ""),
severity="error" if violation.get("code", "").startswith("E") else "warning",
rule_id=violation.get("code"),
fix=violation.get("fix", {}).get("message") if violation.get("fix") else None
))
logger.info(f"Ruff found {len(issues)} issues")
return {"issues": issues}
return {"issues": []}
except AnalysisError as e:
logger.error(f"Ruff execution failed: {e}")
raise
async def _run_bandit(self, file_path: str, ctx: Any = None) -> Dict[str, Any]:
"""
๐Ÿ›ก๏ธ Run Bandit security scanner
Bandit finds common security issues in Python code
"""
try:
if ctx:
await ctx.report_progress(50, 100, "Running security scan...")
# Run Bandit with JSON output
stdout, stderr, returncode = await self._run_subprocess([
"bandit",
"-f", "json",
"-q", # Quiet mode
file_path
])
if stdout:
try:
bandit_output = json.loads(stdout)
except json.JSONDecodeError:
logger.warning(f"Failed to parse Bandit JSON output")
return {"issues": []}
# Convert Bandit format to our standard format
issues = []
for result in bandit_output.get("results", []):
# Map Bandit severity to our levels
severity_map = {
"HIGH": "error",
"MEDIUM": "warning",
"LOW": "info"
}
issues.append(self._format_issue(
line=result.get("line_number", 0),
column=result.get("col_offset", 0),
message=f"๐Ÿ›ก๏ธ Security: {result.get('issue_text', '')}",
severity=severity_map.get(result.get("issue_severity", "LOW"), "info"),
rule_id=result.get("test_id"),
fix=None # Bandit doesn't provide auto-fixes
))
logger.info(f"Bandit found {len(issues)} security issues")
return {"issues": issues}
return {"issues": []}
except AnalysisError as e:
logger.error(f"Bandit execution failed: {e}")
raise
async def _run_radon(self, file_path: str, ctx: Any = None) -> Dict[str, Any]:
"""
๐Ÿ“Š Run Radon complexity analyzer
Calculates cyclomatic complexity and maintainability index
"""
try:
if ctx:
await ctx.report_progress(75, 100, "Calculating complexity metrics...")
# Run Radon for cyclomatic complexity
cc_stdout, _, _ = await self._run_subprocess([
"radon", "cc",
"-s", # Show complexity score
"-j", # JSON output
file_path
])
# Run Radon for maintainability index
mi_stdout, _, _ = await self._run_subprocess([
"radon", "mi",
"-s", # Show score
"-j", # JSON output
file_path
])
complexity_data = {}
# Parse cyclomatic complexity
if cc_stdout:
try:
cc_data = json.loads(cc_stdout)
complexity_data["cyclomatic_complexity"] = cc_data
except json.JSONDecodeError:
pass
# Parse maintainability index
if mi_stdout:
try:
mi_data = json.loads(mi_stdout)
complexity_data["maintainability_index"] = mi_data
except json.JSONDecodeError:
pass
logger.info("Complexity analysis complete")
return complexity_data
except AnalysisError as e:
logger.warning(f"Radon execution failed: {e}")
return {}
# Convenience functions for direct use
async def analyze_python(code: str, ctx: Any = None) -> Dict[str, Any]:
"""
๐Ÿš€ Quick Python analysis function
Args:
code: Python source code
ctx: Optional MCP context
Returns:
Analysis results
"""
analyzer = PythonAnalyzer()
return await analyzer.analyze(code, ctx)
async def scan_security_python(code: str, ctx: Any = None) -> Dict[str, Any]:
"""
๐Ÿ›ก๏ธ Quick security-only scan
Args:
code: Python source code
ctx: Optional MCP context
Returns:
Security vulnerabilities
"""
analyzer = PythonAnalyzer()
temp_file = None
try:
temp_file = analyzer._create_temp_file(code, suffix='.py')
result = await analyzer._run_bandit(temp_file, ctx)
return {
"vulnerabilities": result.get("issues", []),
"vulnerability_count": len(result.get("issues", [])),
"summary": {
"critical": sum(1 for i in result.get("issues", []) if i.get("severity") == "error"),
"high": sum(1 for i in result.get("issues", []) if i.get("severity") == "warning"),
"medium": sum(1 for i in result.get("issues", []) if i.get("severity") == "info")
}
}
finally:
if temp_file:
analyzer._cleanup_temp_file(temp_file)
async def calculate_complexity_python(code: str, ctx: Any = None) -> Dict[str, Any]:
"""
๐Ÿ“Š Quick complexity calculation
Args:
code: Python source code
ctx: Optional MCP context
Returns:
Complexity metrics
"""
analyzer = PythonAnalyzer()
temp_file = None
try:
temp_file = analyzer._create_temp_file(code, suffix='.py')
metrics = await analyzer._run_radon(temp_file, ctx)
return {
"metrics": metrics,
"summary": {
"has_complex_functions": bool(metrics.get("cyclomatic_complexity")),
"maintainability_grade": "A" # Would parse from actual data
}
}
finally:
if temp_file:
analyzer._cleanup_temp_file(temp_file)