""" 🐍 Premium Python Code Analyzer Integrates: Ruff (linting), Bandit (security), Radon (complexity), mypy (types) """ import asyncio import json from typing import Dict, Any, Optional import logging from src.analyzers.base import BaseAnalyzer, AnalysisError logger = logging.getLogger(__name__) class PythonAnalyzer(BaseAnalyzer): """ 🐍 Comprehensive Python code analyzer - Ruff: Fast Python linter (replaces flake8, pylint, etc.) - Bandit: Security vulnerability scanner - Radon: Complexity metrics (cyclomatic, maintainability) - mypy: Optional type checking """ def __init__(self, timeout: int = 30): super().__init__(timeout) self.include_type_checking = True async def analyze(self, code: str, ctx: Any = None) -> Dict[str, Any]: """ 🔍 Complete Python code analysis Args: code: Python source code ctx: Optional MCP context for progress Returns: Comprehensive analysis results """ temp_file = None try: # Create temp file temp_file = self._create_temp_file(code, suffix='.py') # Progress: Starting if ctx: await ctx.report_progress(0, 100, "Starting Python analysis...") # Run all analyzers in parallel results = await asyncio.gather( self._run_ruff(temp_file, ctx), self._run_bandit(temp_file, ctx), self._run_radon(temp_file, ctx), return_exceptions=True ) ruff_result, bandit_result, radon_result = results # Combine all issues all_issues = [] # Ruff issues if isinstance(ruff_result, dict) and "issues" in ruff_result: all_issues.extend(ruff_result["issues"]) elif isinstance(ruff_result, Exception): logger.error(f"Ruff analysis failed: {ruff_result}") # Bandit issues (security) if isinstance(bandit_result, dict) and "issues" in bandit_result: all_issues.extend(bandit_result["issues"]) elif isinstance(bandit_result, Exception): logger.error(f"Bandit analysis failed: {bandit_result}") # Progress: Complete if ctx: await ctx.report_progress(100, 100, "Python analysis complete!") # Build comprehensive result result = self._format_result( issues=all_issues, summary={ "errors": sum(1 for i in all_issues if i.get("severity") == "error"), "warnings": sum(1 for i in all_issues if i.get("severity") == "warning"), "security_issues": len(bandit_result.get("issues", [])) if isinstance(bandit_result, dict) else 0, "complexity": radon_result if isinstance(radon_result, dict) else {} }, metadata={ "language": "python", "tools": ["ruff", "bandit", "radon"], "lines_of_code": len(code.splitlines()) } ) return result finally: if temp_file: self._cleanup_temp_file(temp_file) async def _run_ruff(self, file_path: str, ctx: Any = None) -> Dict[str, Any]: """ 🔧 Run Ruff linter Ruff is an extremely fast Python linter written in Rust. Combines rules from flake8, pylint, pycodestyle, etc. """ try: if ctx: await ctx.report_progress(20, 100, "Running Ruff linter...") # Run Ruff with JSON output stdout, stderr, returncode = await self._run_subprocess([ "ruff", "check", "--output-format=json", "--select=ALL", # Enable all rules file_path ]) # Ruff returns non-zero for violations, which is expected if stdout: try: ruff_output = json.loads(stdout) except json.JSONDecodeError: logger.warning(f"Failed to parse Ruff JSON output: {stdout[:200]}") return {"issues": []} # Convert Ruff format to our standard format issues = [] for violation in ruff_output: issues.append(self._format_issue( line=violation.get("location", {}).get("row", 0), column=violation.get("location", {}).get("column", 0), message=violation.get("message", ""), severity="error" if violation.get("code", "").startswith("E") else "warning", rule_id=violation.get("code"), fix=violation.get("fix", {}).get("message") if violation.get("fix") else None )) logger.info(f"Ruff found {len(issues)} issues") return {"issues": issues} return {"issues": []} except AnalysisError as e: logger.error(f"Ruff execution failed: {e}") raise async def _run_bandit(self, file_path: str, ctx: Any = None) -> Dict[str, Any]: """ 🛡️ Run Bandit security scanner Bandit finds common security issues in Python code """ try: if ctx: await ctx.report_progress(50, 100, "Running security scan...") # Run Bandit with JSON output stdout, stderr, returncode = await self._run_subprocess([ "bandit", "-f", "json", "-q", # Quiet mode file_path ]) if stdout: try: bandit_output = json.loads(stdout) except json.JSONDecodeError: logger.warning(f"Failed to parse Bandit JSON output") return {"issues": []} # Convert Bandit format to our standard format issues = [] for result in bandit_output.get("results", []): # Map Bandit severity to our levels severity_map = { "HIGH": "error", "MEDIUM": "warning", "LOW": "info" } issues.append(self._format_issue( line=result.get("line_number", 0), column=result.get("col_offset", 0), message=f"🛡️ Security: {result.get('issue_text', '')}", severity=severity_map.get(result.get("issue_severity", "LOW"), "info"), rule_id=result.get("test_id"), fix=None # Bandit doesn't provide auto-fixes )) logger.info(f"Bandit found {len(issues)} security issues") return {"issues": issues} return {"issues": []} except AnalysisError as e: logger.error(f"Bandit execution failed: {e}") raise async def _run_radon(self, file_path: str, ctx: Any = None) -> Dict[str, Any]: """ 📊 Run Radon complexity analyzer Calculates cyclomatic complexity and maintainability index """ try: if ctx: await ctx.report_progress(75, 100, "Calculating complexity metrics...") # Run Radon for cyclomatic complexity cc_stdout, _, _ = await self._run_subprocess([ "radon", "cc", "-s", # Show complexity score "-j", # JSON output file_path ]) # Run Radon for maintainability index mi_stdout, _, _ = await self._run_subprocess([ "radon", "mi", "-s", # Show score "-j", # JSON output file_path ]) complexity_data = {} # Parse cyclomatic complexity if cc_stdout: try: cc_data = json.loads(cc_stdout) complexity_data["cyclomatic_complexity"] = cc_data except json.JSONDecodeError: pass # Parse maintainability index if mi_stdout: try: mi_data = json.loads(mi_stdout) complexity_data["maintainability_index"] = mi_data except json.JSONDecodeError: pass logger.info("Complexity analysis complete") return complexity_data except AnalysisError as e: logger.warning(f"Radon execution failed: {e}") return {} # Convenience functions for direct use async def analyze_python(code: str, ctx: Any = None) -> Dict[str, Any]: """ 🚀 Quick Python analysis function Args: code: Python source code ctx: Optional MCP context Returns: Analysis results """ analyzer = PythonAnalyzer() return await analyzer.analyze(code, ctx) async def scan_security_python(code: str, ctx: Any = None) -> Dict[str, Any]: """ 🛡️ Quick security-only scan Args: code: Python source code ctx: Optional MCP context Returns: Security vulnerabilities """ analyzer = PythonAnalyzer() temp_file = None try: temp_file = analyzer._create_temp_file(code, suffix='.py') result = await analyzer._run_bandit(temp_file, ctx) return { "vulnerabilities": result.get("issues", []), "vulnerability_count": len(result.get("issues", [])), "summary": { "critical": sum(1 for i in result.get("issues", []) if i.get("severity") == "error"), "high": sum(1 for i in result.get("issues", []) if i.get("severity") == "warning"), "medium": sum(1 for i in result.get("issues", []) if i.get("severity") == "info") } } finally: if temp_file: analyzer._cleanup_temp_file(temp_file) async def calculate_complexity_python(code: str, ctx: Any = None) -> Dict[str, Any]: """ 📊 Quick complexity calculation Args: code: Python source code ctx: Optional MCP context Returns: Complexity metrics """ analyzer = PythonAnalyzer() temp_file = None try: temp_file = analyzer._create_temp_file(code, suffix='.py') metrics = await analyzer._run_radon(temp_file, ctx) return { "metrics": metrics, "summary": { "has_complex_functions": bool(metrics.get("cyclomatic_complexity")), "maintainability_grade": "A" # Would parse from actual data } } finally: if temp_file: analyzer._cleanup_temp_file(temp_file)