Spaces:
Sleeping
Sleeping
| """ | |
| ๐ Premium Python Code Analyzer | |
| Integrates: Ruff (linting), Bandit (security), Radon (complexity), mypy (types) | |
| """ | |
| import asyncio | |
| import json | |
| from typing import Dict, Any, Optional | |
| import logging | |
| from src.analyzers.base import BaseAnalyzer, AnalysisError | |
| logger = logging.getLogger(__name__) | |
| class PythonAnalyzer(BaseAnalyzer): | |
| """ | |
| ๐ Comprehensive Python code analyzer | |
| - Ruff: Fast Python linter (replaces flake8, pylint, etc.) | |
| - Bandit: Security vulnerability scanner | |
| - Radon: Complexity metrics (cyclomatic, maintainability) | |
| - mypy: Optional type checking | |
| """ | |
| def __init__(self, timeout: int = 30): | |
| super().__init__(timeout) | |
| self.include_type_checking = True | |
| async def analyze(self, code: str, ctx: Any = None) -> Dict[str, Any]: | |
| """ | |
| ๐ Complete Python code analysis | |
| Args: | |
| code: Python source code | |
| ctx: Optional MCP context for progress | |
| Returns: | |
| Comprehensive analysis results | |
| """ | |
| temp_file = None | |
| try: | |
| # Create temp file | |
| temp_file = self._create_temp_file(code, suffix='.py') | |
| # Progress: Starting | |
| if ctx: | |
| await ctx.report_progress(0, 100, "Starting Python analysis...") | |
| # Run all analyzers in parallel | |
| results = await asyncio.gather( | |
| self._run_ruff(temp_file, ctx), | |
| self._run_bandit(temp_file, ctx), | |
| self._run_radon(temp_file, ctx), | |
| return_exceptions=True | |
| ) | |
| ruff_result, bandit_result, radon_result = results | |
| # Combine all issues | |
| all_issues = [] | |
| # Ruff issues | |
| if isinstance(ruff_result, dict) and "issues" in ruff_result: | |
| all_issues.extend(ruff_result["issues"]) | |
| elif isinstance(ruff_result, Exception): | |
| logger.error(f"Ruff analysis failed: {ruff_result}") | |
| # Bandit issues (security) | |
| if isinstance(bandit_result, dict) and "issues" in bandit_result: | |
| all_issues.extend(bandit_result["issues"]) | |
| elif isinstance(bandit_result, Exception): | |
| logger.error(f"Bandit analysis failed: {bandit_result}") | |
| # Progress: Complete | |
| if ctx: | |
| await ctx.report_progress(100, 100, "Python analysis complete!") | |
| # Build comprehensive result | |
| result = self._format_result( | |
| issues=all_issues, | |
| summary={ | |
| "errors": sum(1 for i in all_issues if i.get("severity") == "error"), | |
| "warnings": sum(1 for i in all_issues if i.get("severity") == "warning"), | |
| "security_issues": len(bandit_result.get("issues", [])) if isinstance(bandit_result, dict) else 0, | |
| "complexity": radon_result if isinstance(radon_result, dict) else {} | |
| }, | |
| metadata={ | |
| "language": "python", | |
| "tools": ["ruff", "bandit", "radon"], | |
| "lines_of_code": len(code.splitlines()) | |
| } | |
| ) | |
| return result | |
| finally: | |
| if temp_file: | |
| self._cleanup_temp_file(temp_file) | |
| async def _run_ruff(self, file_path: str, ctx: Any = None) -> Dict[str, Any]: | |
| """ | |
| ๐ง Run Ruff linter | |
| Ruff is an extremely fast Python linter written in Rust. | |
| Combines rules from flake8, pylint, pycodestyle, etc. | |
| """ | |
| try: | |
| if ctx: | |
| await ctx.report_progress(20, 100, "Running Ruff linter...") | |
| # Run Ruff with JSON output | |
| stdout, stderr, returncode = await self._run_subprocess([ | |
| "ruff", "check", | |
| "--output-format=json", | |
| "--select=ALL", # Enable all rules | |
| file_path | |
| ]) | |
| # Ruff returns non-zero for violations, which is expected | |
| if stdout: | |
| try: | |
| ruff_output = json.loads(stdout) | |
| except json.JSONDecodeError: | |
| logger.warning(f"Failed to parse Ruff JSON output: {stdout[:200]}") | |
| return {"issues": []} | |
| # Convert Ruff format to our standard format | |
| issues = [] | |
| for violation in ruff_output: | |
| issues.append(self._format_issue( | |
| line=violation.get("location", {}).get("row", 0), | |
| column=violation.get("location", {}).get("column", 0), | |
| message=violation.get("message", ""), | |
| severity="error" if violation.get("code", "").startswith("E") else "warning", | |
| rule_id=violation.get("code"), | |
| fix=violation.get("fix", {}).get("message") if violation.get("fix") else None | |
| )) | |
| logger.info(f"Ruff found {len(issues)} issues") | |
| return {"issues": issues} | |
| return {"issues": []} | |
| except AnalysisError as e: | |
| logger.error(f"Ruff execution failed: {e}") | |
| raise | |
| async def _run_bandit(self, file_path: str, ctx: Any = None) -> Dict[str, Any]: | |
| """ | |
| ๐ก๏ธ Run Bandit security scanner | |
| Bandit finds common security issues in Python code | |
| """ | |
| try: | |
| if ctx: | |
| await ctx.report_progress(50, 100, "Running security scan...") | |
| # Run Bandit with JSON output | |
| stdout, stderr, returncode = await self._run_subprocess([ | |
| "bandit", | |
| "-f", "json", | |
| "-q", # Quiet mode | |
| file_path | |
| ]) | |
| if stdout: | |
| try: | |
| bandit_output = json.loads(stdout) | |
| except json.JSONDecodeError: | |
| logger.warning(f"Failed to parse Bandit JSON output") | |
| return {"issues": []} | |
| # Convert Bandit format to our standard format | |
| issues = [] | |
| for result in bandit_output.get("results", []): | |
| # Map Bandit severity to our levels | |
| severity_map = { | |
| "HIGH": "error", | |
| "MEDIUM": "warning", | |
| "LOW": "info" | |
| } | |
| issues.append(self._format_issue( | |
| line=result.get("line_number", 0), | |
| column=result.get("col_offset", 0), | |
| message=f"๐ก๏ธ Security: {result.get('issue_text', '')}", | |
| severity=severity_map.get(result.get("issue_severity", "LOW"), "info"), | |
| rule_id=result.get("test_id"), | |
| fix=None # Bandit doesn't provide auto-fixes | |
| )) | |
| logger.info(f"Bandit found {len(issues)} security issues") | |
| return {"issues": issues} | |
| return {"issues": []} | |
| except AnalysisError as e: | |
| logger.error(f"Bandit execution failed: {e}") | |
| raise | |
| async def _run_radon(self, file_path: str, ctx: Any = None) -> Dict[str, Any]: | |
| """ | |
| ๐ Run Radon complexity analyzer | |
| Calculates cyclomatic complexity and maintainability index | |
| """ | |
| try: | |
| if ctx: | |
| await ctx.report_progress(75, 100, "Calculating complexity metrics...") | |
| # Run Radon for cyclomatic complexity | |
| cc_stdout, _, _ = await self._run_subprocess([ | |
| "radon", "cc", | |
| "-s", # Show complexity score | |
| "-j", # JSON output | |
| file_path | |
| ]) | |
| # Run Radon for maintainability index | |
| mi_stdout, _, _ = await self._run_subprocess([ | |
| "radon", "mi", | |
| "-s", # Show score | |
| "-j", # JSON output | |
| file_path | |
| ]) | |
| complexity_data = {} | |
| # Parse cyclomatic complexity | |
| if cc_stdout: | |
| try: | |
| cc_data = json.loads(cc_stdout) | |
| complexity_data["cyclomatic_complexity"] = cc_data | |
| except json.JSONDecodeError: | |
| pass | |
| # Parse maintainability index | |
| if mi_stdout: | |
| try: | |
| mi_data = json.loads(mi_stdout) | |
| complexity_data["maintainability_index"] = mi_data | |
| except json.JSONDecodeError: | |
| pass | |
| logger.info("Complexity analysis complete") | |
| return complexity_data | |
| except AnalysisError as e: | |
| logger.warning(f"Radon execution failed: {e}") | |
| return {} | |
| # Convenience functions for direct use | |
| async def analyze_python(code: str, ctx: Any = None) -> Dict[str, Any]: | |
| """ | |
| ๐ Quick Python analysis function | |
| Args: | |
| code: Python source code | |
| ctx: Optional MCP context | |
| Returns: | |
| Analysis results | |
| """ | |
| analyzer = PythonAnalyzer() | |
| return await analyzer.analyze(code, ctx) | |
| async def scan_security_python(code: str, ctx: Any = None) -> Dict[str, Any]: | |
| """ | |
| ๐ก๏ธ Quick security-only scan | |
| Args: | |
| code: Python source code | |
| ctx: Optional MCP context | |
| Returns: | |
| Security vulnerabilities | |
| """ | |
| analyzer = PythonAnalyzer() | |
| temp_file = None | |
| try: | |
| temp_file = analyzer._create_temp_file(code, suffix='.py') | |
| result = await analyzer._run_bandit(temp_file, ctx) | |
| return { | |
| "vulnerabilities": result.get("issues", []), | |
| "vulnerability_count": len(result.get("issues", [])), | |
| "summary": { | |
| "critical": sum(1 for i in result.get("issues", []) if i.get("severity") == "error"), | |
| "high": sum(1 for i in result.get("issues", []) if i.get("severity") == "warning"), | |
| "medium": sum(1 for i in result.get("issues", []) if i.get("severity") == "info") | |
| } | |
| } | |
| finally: | |
| if temp_file: | |
| analyzer._cleanup_temp_file(temp_file) | |
| async def calculate_complexity_python(code: str, ctx: Any = None) -> Dict[str, Any]: | |
| """ | |
| ๐ Quick complexity calculation | |
| Args: | |
| code: Python source code | |
| ctx: Optional MCP context | |
| Returns: | |
| Complexity metrics | |
| """ | |
| analyzer = PythonAnalyzer() | |
| temp_file = None | |
| try: | |
| temp_file = analyzer._create_temp_file(code, suffix='.py') | |
| metrics = await analyzer._run_radon(temp_file, ctx) | |
| return { | |
| "metrics": metrics, | |
| "summary": { | |
| "has_complex_functions": bool(metrics.get("cyclomatic_complexity")), | |
| "maintainability_grade": "A" # Would parse from actual data | |
| } | |
| } | |
| finally: | |
| if temp_file: | |
| analyzer._cleanup_temp_file(temp_file) | |