Spaces:
Sleeping
Sleeping
| """ | |
| Base Analyzer Framework | |
| Provides robust subprocess handling and error management for all analyzers | |
| """ | |
| import asyncio | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, Any, List, Optional | |
| from abc import ABC, abstractmethod | |
| import logging | |
# Module-level logger shared by every analyzer defined in this file.
logger = logging.getLogger(__name__)
class AnalysisError(Exception):
    """Custom exception for analysis errors."""
class BaseAnalyzer(ABC):
    """
    Base class for all code analyzers.

    Handles subprocess execution, error handling, and result formatting.
    """

    def __init__(self, timeout: int = 30):
        """
        Initialize analyzer.

        Args:
            timeout: Maximum seconds for subprocess execution
        """
        self.timeout = timeout

    # NOTE(review): `abstractmethod` is imported by this module but not applied
    # here, so the base class stays instantiable and this default returns None.
    # Confirm whether subclasses are meant to be forced to override it.
    async def analyze(self, code: str, ctx: Any = None) -> Dict[str, Any]:
        """
        Analyze code and return results.

        Args:
            code: Source code to analyze
            ctx: Optional MCP context for progress reporting

        Returns:
            Dict with analysis results
        """

    async def _run_subprocess(
        self,
        command: List[str],
        input_text: Optional[str] = None,
        cwd: Optional[str] = None
    ) -> tuple[str, str, int]:
        """
        Robust subprocess execution with timeout and error handling.

        Args:
            command: Command and arguments to execute
            input_text: Optional stdin input; an empty string is sent as an
                empty stdin stream, ``None`` connects stdin to the null device
            cwd: Working directory

        Returns:
            Tuple of (stdout, stderr, returncode)

        Raises:
            AnalysisError: If subprocess fails or times out
        """
        try:
            logger.debug(f"Running command: {' '.join(command)}")

            # Test against None so that "" still closes the child's stdin
            # instead of letting it inherit (and possibly block on) ours.
            has_input = input_text is not None
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                stdin=(
                    asyncio.subprocess.PIPE
                    if has_input
                    else asyncio.subprocess.DEVNULL
                ),
                cwd=cwd,
            )
            payload = input_text.encode() if has_input else None

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(payload),
                    timeout=self.timeout
                )
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    # The child exited between the timeout and the kill.
                    pass
                await process.wait()
                raise AnalysisError(
                    f"Command timed out after {self.timeout}s: {' '.join(command)}"
                ) from None

            return (
                stdout.decode('utf-8', errors='replace'),
                stderr.decode('utf-8', errors='replace'),
                process.returncode
            )
        except AnalysisError:
            # Already a well-formed analysis error (e.g. the timeout above);
            # do not re-wrap it in the generic handler below.
            raise
        except FileNotFoundError as e:
            # A missing working directory raises the same exception type as a
            # missing executable, so tell the two apart for the caller.
            if cwd is not None and not os.path.isdir(cwd):
                raise AnalysisError(
                    f"Subprocess execution failed: working directory not found: {cwd}"
                ) from e
            raise AnalysisError(
                f"Command not found: {command[0]}. Is it installed?"
            ) from e
        except Exception as e:
            raise AnalysisError(f"Subprocess execution failed: {e}") from e

    def _create_temp_file(self, code: str, suffix: str = '.py') -> str:
        """
        Create temporary file with code content.

        Args:
            code: Code content
            suffix: File extension

        Returns:
            Path to temporary file

        Raises:
            AnalysisError: If the file cannot be created or written
        """
        temp_path = None
        try:
            fd, temp_path = tempfile.mkstemp(suffix=suffix, text=True)
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                f.write(code)
            return temp_path
        except Exception as e:
            # Do not leave a partially written file behind on failure.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass
            raise AnalysisError(f"Failed to create temp file: {e}") from e

    def _cleanup_temp_file(self, path: str):
        """Safely remove temporary file; failures are logged, never raised."""
        try:
            if path and os.path.exists(path):
                os.unlink(path)
                logger.debug(f"Cleaned up temp file: {path}")
        except Exception as e:
            logger.warning(f"Failed to cleanup temp file {path}: {e}")

    def _format_issue(
        self,
        line: int,
        column: int,
        message: str,
        severity: str,
        rule_id: Optional[str] = None,
        fix: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Format issue in standard structure.

        Args:
            line: Line number
            column: Column number
            message: Issue description
            severity: error/warning/info
            rule_id: Rule or check identifier
            fix: Optional fix suggestion

        Returns:
            Formatted issue dict
        """
        return {
            "line": line,
            "column": column,
            "message": message,
            "severity": severity,
            "rule_id": rule_id,
            "fix": fix,
            # Same position again, under the row/col keys.
            "location": {
                "row": line,
                "col": column
            }
        }

    def _format_result(
        self,
        issues: List[Dict[str, Any]],
        summary: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Format analysis result in standard structure.

        Args:
            issues: List of issues found
            summary: Optional summary statistics; when omitted (or empty) the
                counts per severity are computed from ``issues``
            metadata: Optional metadata (tool version, etc.)

        Returns:
            Formatted result dict
        """
        result = {
            "issues": issues,
            "issue_count": len(issues),
            "summary": summary or {
                "errors": sum(1 for i in issues if i.get("severity") == "error"),
                "warnings": sum(1 for i in issues if i.get("severity") == "warning"),
                "info": sum(1 for i in issues if i.get("severity") == "info")
            }
        }
        if metadata:
            result["metadata"] = metadata
        return result
class AsyncAnalyzerPool:
    """
    Async pool for parallel analysis operations.

    Manages concurrent analyzer execution with resource limits.
    """

    def __init__(self, max_concurrent: int = 4):
        """
        Initialize analyzer pool.

        Args:
            max_concurrent: Maximum concurrent analysis operations (>= 1)

        Raises:
            ValueError: If ``max_concurrent`` is less than 1
        """
        if max_concurrent < 1:
            # A zero-slot semaphore would make every analysis wait forever.
            raise ValueError("max_concurrent must be at least 1")
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def analyze(
        self,
        analyzer: "BaseAnalyzer",
        code: str,
        ctx: Any = None
    ) -> Dict[str, Any]:
        """
        Run analysis with concurrency control.

        Args:
            analyzer: Analyzer instance
            code: Code to analyze
            ctx: Optional context

        Returns:
            Analysis results
        """
        async with self.semaphore:
            return await analyzer.analyze(code, ctx)

    async def analyze_batch(
        self,
        analyzer: "BaseAnalyzer",
        codes: List[tuple[str, str]],  # List of (filename, code)
        ctx: Any = None
    ) -> Dict[str, Dict[str, Any]]:
        """
        Analyze multiple files in parallel.

        All analyses are scheduled at once; the pool's semaphore limits how
        many run at the same time. A failure in one file is logged and
        reported in that file's entry without affecting the others.

        Args:
            analyzer: Analyzer instance
            codes: List of (filename, code) tuples
            ctx: Optional context

        Returns:
            Dict mapping filename to results
        """
        filenames = [filename for filename, _ in codes]

        # gather() schedules every coroutine immediately; awaiting bare
        # coroutines one after another would run them sequentially.
        outcomes = await asyncio.gather(
            *(self.analyze(analyzer, code, ctx) for _, code in codes),
            return_exceptions=True,
        )

        results: Dict[str, Dict[str, Any]] = {}
        for filename, outcome in zip(filenames, outcomes):
            if isinstance(outcome, Exception):
                logger.error(f"Analysis failed for {filename}: {outcome}")
                results[filename] = {
                    "error": str(outcome),
                    "issues": [],
                    "issue_count": 0
                }
            elif isinstance(outcome, BaseException):
                # Cancellation and similar signals are not analysis failures.
                raise outcome
            else:
                results[filename] = outcome
        return results