""" 🏗️ Base Analyzer Framework Provides robust subprocess handling and error management for all analyzers """ import asyncio import tempfile import os from pathlib import Path from typing import Dict, Any, List, Optional from abc import ABC, abstractmethod import logging logger = logging.getLogger(__name__) class AnalysisError(Exception): """Custom exception for analysis errors""" pass class BaseAnalyzer(ABC): """ 🏗️ Base class for all code analyzers Handles subprocess execution, error handling, and result formatting """ def __init__(self, timeout: int = 30): """ Initialize analyzer Args: timeout: Maximum seconds for subprocess execution """ self.timeout = timeout @abstractmethod async def analyze(self, code: str, ctx: Any = None) -> Dict[str, Any]: """ Analyze code and return results Args: code: Source code to analyze ctx: Optional MCP context for progress reporting Returns: Dict with analysis results """ pass async def _run_subprocess( self, command: List[str], input_text: Optional[str] = None, cwd: Optional[str] = None ) -> tuple[str, str, int]: """ 🔧 Robust subprocess execution with timeout and error handling Args: command: Command and arguments to execute input_text: Optional stdin input cwd: Working directory Returns: Tuple of (stdout, stderr, returncode) Raises: AnalysisError: If subprocess fails or times out """ try: logger.debug(f"Running command: {' '.join(command)}") process = await asyncio.create_subprocess_exec( *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE if input_text else None, cwd=cwd ) try: stdout, stderr = await asyncio.wait_for( process.communicate(input_text.encode() if input_text else None), timeout=self.timeout ) return ( stdout.decode('utf-8', errors='replace'), stderr.decode('utf-8', errors='replace'), process.returncode ) except asyncio.TimeoutError: process.kill() await process.wait() raise AnalysisError(f"Command timed out after {self.timeout}s: {' '.join(command)}") except FileNotFoundError: raise AnalysisError(f"Command not found: {command[0]}. Is it installed?") except Exception as e: raise AnalysisError(f"Subprocess execution failed: {e}") def _create_temp_file(self, code: str, suffix: str = '.py') -> str: """ Create temporary file with code content Args: code: Code content suffix: File extension Returns: Path to temporary file """ try: fd, temp_path = tempfile.mkstemp(suffix=suffix, text=True) with os.fdopen(fd, 'w', encoding='utf-8') as f: f.write(code) return temp_path except Exception as e: raise AnalysisError(f"Failed to create temp file: {e}") def _cleanup_temp_file(self, path: str): """Safely remove temporary file""" try: if path and os.path.exists(path): os.unlink(path) logger.debug(f"Cleaned up temp file: {path}") except Exception as e: logger.warning(f"Failed to cleanup temp file {path}: {e}") def _format_issue( self, line: int, column: int, message: str, severity: str, rule_id: Optional[str] = None, fix: Optional[str] = None ) -> Dict[str, Any]: """ 📋 Format issue in standard structure Args: line: Line number column: Column number message: Issue description severity: error/warning/info rule_id: Rule or check identifier fix: Optional fix suggestion Returns: Formatted issue dict """ return { "line": line, "column": column, "message": message, "severity": severity, "rule_id": rule_id, "fix": fix, "location": { "row": line, "col": column } } def _format_result( self, issues: List[Dict[str, Any]], summary: Optional[Dict[str, Any]] = None, metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ 📊 Format analysis result in standard structure Args: issues: List of issues found summary: Optional summary statistics metadata: Optional metadata (tool version, etc.) Returns: Formatted result dict """ result = { "issues": issues, "issue_count": len(issues), "summary": summary or { "errors": sum(1 for i in issues if i.get("severity") == "error"), "warnings": sum(1 for i in issues if i.get("severity") == "warning"), "info": sum(1 for i in issues if i.get("severity") == "info") } } if metadata: result["metadata"] = metadata return result class AsyncAnalyzerPool: """ 🚀 Async pool for parallel analysis operations Manages concurrent analyzer execution with resource limits """ def __init__(self, max_concurrent: int = 4): """ Initialize analyzer pool Args: max_concurrent: Maximum concurrent analysis operations """ self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) async def analyze( self, analyzer: BaseAnalyzer, code: str, ctx: Any = None ) -> Dict[str, Any]: """ Run analysis with concurrency control Args: analyzer: Analyzer instance code: Code to analyze ctx: Optional context Returns: Analysis results """ async with self.semaphore: return await analyzer.analyze(code, ctx) async def analyze_batch( self, analyzer: BaseAnalyzer, codes: List[tuple[str, str]], # List of (filename, code) ctx: Any = None ) -> Dict[str, Dict[str, Any]]: """ Analyze multiple files in parallel Args: analyzer: Analyzer instance codes: List of (filename, code) tuples ctx: Optional context Returns: Dict mapping filename to results """ tasks = [] for filename, code in codes: task = self.analyze(analyzer, code, ctx) tasks.append((filename, task)) results = {} for filename, task in tasks: try: result = await task results[filename] = result except Exception as e: logger.error(f"Analysis failed for {filename}: {e}") results[filename] = { "error": str(e), "issues": [], "issue_count": 0 } return results