OsamaAliMid's picture
Add CodeLint MCP Premium Edition application
ec37394
"""
๐Ÿ—๏ธ Base Analyzer Framework
Provides robust subprocess handling and error management for all analyzers
"""
import asyncio
import tempfile
import os
from pathlib import Path
from typing import Dict, Any, List, Optional
from abc import ABC, abstractmethod
import logging
logger = logging.getLogger(__name__)
class AnalysisError(Exception):
"""Custom exception for analysis errors"""
pass
class BaseAnalyzer(ABC):
"""
๐Ÿ—๏ธ Base class for all code analyzers
Handles subprocess execution, error handling, and result formatting
"""
def __init__(self, timeout: int = 30):
"""
Initialize analyzer
Args:
timeout: Maximum seconds for subprocess execution
"""
self.timeout = timeout
@abstractmethod
async def analyze(self, code: str, ctx: Any = None) -> Dict[str, Any]:
"""
Analyze code and return results
Args:
code: Source code to analyze
ctx: Optional MCP context for progress reporting
Returns:
Dict with analysis results
"""
pass
async def _run_subprocess(
self,
command: List[str],
input_text: Optional[str] = None,
cwd: Optional[str] = None
) -> tuple[str, str, int]:
"""
๐Ÿ”ง Robust subprocess execution with timeout and error handling
Args:
command: Command and arguments to execute
input_text: Optional stdin input
cwd: Working directory
Returns:
Tuple of (stdout, stderr, returncode)
Raises:
AnalysisError: If subprocess fails or times out
"""
try:
logger.debug(f"Running command: {' '.join(command)}")
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE if input_text else None,
cwd=cwd
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(input_text.encode() if input_text else None),
timeout=self.timeout
)
return (
stdout.decode('utf-8', errors='replace'),
stderr.decode('utf-8', errors='replace'),
process.returncode
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
raise AnalysisError(f"Command timed out after {self.timeout}s: {' '.join(command)}")
except FileNotFoundError:
raise AnalysisError(f"Command not found: {command[0]}. Is it installed?")
except Exception as e:
raise AnalysisError(f"Subprocess execution failed: {e}")
def _create_temp_file(self, code: str, suffix: str = '.py') -> str:
"""
Create temporary file with code content
Args:
code: Code content
suffix: File extension
Returns:
Path to temporary file
"""
try:
fd, temp_path = tempfile.mkstemp(suffix=suffix, text=True)
with os.fdopen(fd, 'w', encoding='utf-8') as f:
f.write(code)
return temp_path
except Exception as e:
raise AnalysisError(f"Failed to create temp file: {e}")
def _cleanup_temp_file(self, path: str):
"""Safely remove temporary file"""
try:
if path and os.path.exists(path):
os.unlink(path)
logger.debug(f"Cleaned up temp file: {path}")
except Exception as e:
logger.warning(f"Failed to cleanup temp file {path}: {e}")
def _format_issue(
self,
line: int,
column: int,
message: str,
severity: str,
rule_id: Optional[str] = None,
fix: Optional[str] = None
) -> Dict[str, Any]:
"""
๐Ÿ“‹ Format issue in standard structure
Args:
line: Line number
column: Column number
message: Issue description
severity: error/warning/info
rule_id: Rule or check identifier
fix: Optional fix suggestion
Returns:
Formatted issue dict
"""
return {
"line": line,
"column": column,
"message": message,
"severity": severity,
"rule_id": rule_id,
"fix": fix,
"location": {
"row": line,
"col": column
}
}
def _format_result(
self,
issues: List[Dict[str, Any]],
summary: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
๐Ÿ“Š Format analysis result in standard structure
Args:
issues: List of issues found
summary: Optional summary statistics
metadata: Optional metadata (tool version, etc.)
Returns:
Formatted result dict
"""
result = {
"issues": issues,
"issue_count": len(issues),
"summary": summary or {
"errors": sum(1 for i in issues if i.get("severity") == "error"),
"warnings": sum(1 for i in issues if i.get("severity") == "warning"),
"info": sum(1 for i in issues if i.get("severity") == "info")
}
}
if metadata:
result["metadata"] = metadata
return result
class AsyncAnalyzerPool:
"""
๐Ÿš€ Async pool for parallel analysis operations
Manages concurrent analyzer execution with resource limits
"""
def __init__(self, max_concurrent: int = 4):
"""
Initialize analyzer pool
Args:
max_concurrent: Maximum concurrent analysis operations
"""
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def analyze(
self,
analyzer: BaseAnalyzer,
code: str,
ctx: Any = None
) -> Dict[str, Any]:
"""
Run analysis with concurrency control
Args:
analyzer: Analyzer instance
code: Code to analyze
ctx: Optional context
Returns:
Analysis results
"""
async with self.semaphore:
return await analyzer.analyze(code, ctx)
async def analyze_batch(
self,
analyzer: BaseAnalyzer,
codes: List[tuple[str, str]], # List of (filename, code)
ctx: Any = None
) -> Dict[str, Dict[str, Any]]:
"""
Analyze multiple files in parallel
Args:
analyzer: Analyzer instance
codes: List of (filename, code) tuples
ctx: Optional context
Returns:
Dict mapping filename to results
"""
tasks = []
for filename, code in codes:
task = self.analyze(analyzer, code, ctx)
tasks.append((filename, task))
results = {}
for filename, task in tasks:
try:
result = await task
results[filename] = result
except Exception as e:
logger.error(f"Analysis failed for {filename}: {e}")
results[filename] = {
"error": str(e),
"issues": [],
"issue_count": 0
}
return results