Spaces:
Sleeping
Sleeping
| """ | |
| JavaScript/TypeScript Code Analyzer | |
| Integrates multiple JavaScript analysis tools: | |
| - ESLint: Linting and code quality | |
| - Complexity Analysis: Cyclomatic complexity metrics | |
| """ | |
| import json | |
| import logging | |
| from typing import Any, Dict, List, Optional | |
| from pathlib import Path | |
| from .base import BaseAnalyzer, AnalysisError | |
| import asyncio | |
| import tempfile | |
| import os | |
| logger = logging.getLogger(__name__) | |
class JavaScriptAnalyzer(BaseAnalyzer):
    """Analyzer for JavaScript and TypeScript code.

    Combines ESLint output with a lightweight, line-based complexity
    estimate. If running ESLint raises, a pattern-based fallback linter
    (``_basic_lint``) is used instead.

    The helpers ``_create_temp_file``, ``_cleanup_temp_file``,
    ``_run_subprocess``, ``_format_issue`` and ``_format_result`` are not
    defined in this class; they are presumably inherited from
    ``BaseAnalyzer`` (verify against ``.base``).
    """

    def __init__(self):
        """Initialise the base analyzer and record the handled file extensions."""
        super().__init__()
        self.supported_extensions = ['.js', '.jsx', '.ts', '.tsx', '.mjs', '.cjs']

    async def analyze(self, code: str, ctx: Any = None, language: str = "javascript") -> Dict[str, Any]:
        """
        Analyze JavaScript/TypeScript code for issues, complexity, and style

        The code is written to a temporary file (``.ts`` when ``language`` is
        ``'typescript'``, ``.js`` otherwise), linted, and measured for
        complexity. The temporary file is always cleaned up afterwards.

        Args:
            code: JavaScript/TypeScript code to analyze
            ctx: Optional MCP context for progress reporting; when truthy its
                ``report_progress(current, total, message)`` coroutine is awaited
            language: 'javascript' or 'typescript'

        Returns:
            Whatever ``self._format_result`` builds from the collected issues,
            an error/warning/complexity summary, and run metadata
        """
        temp_file = None
        try:
            # Determine file extension
            extension = '.ts' if language == 'typescript' else '.js'
            temp_file = self._create_temp_file(code, suffix=extension)

            if ctx:
                await ctx.report_progress(0, 100, f"Starting {language} analysis...")

            # Run ESLint analysis
            if ctx:
                await ctx.report_progress(20, 100, "Running ESLint...")
            eslint_result = await self._run_eslint(temp_file, ctx)

            if ctx:
                await ctx.report_progress(75, 100, "Analyzing complexity...")
            complexity_result = await self._analyze_complexity(code, ctx)

            # Aggregate all issues
            all_issues = []
            if isinstance(eslint_result, dict) and "issues" in eslint_result:
                all_issues.extend(eslint_result["issues"])

            if ctx:
                await ctx.report_progress(100, 100, f"{language} analysis complete!")

            return self._format_result(
                issues=all_issues,
                summary={
                    "errors": sum(1 for i in all_issues if i.get("severity") == "error"),
                    "warnings": sum(1 for i in all_issues if i.get("severity") == "warning"),
                    "complexity": complexity_result if isinstance(complexity_result, dict) else {}
                },
                metadata={
                    "language": language,
                    "tools": ["eslint", "complexity"],
                    "lines_of_code": len(code.splitlines())
                }
            )
        finally:
            if temp_file:
                self._cleanup_temp_file(temp_file)

    async def _run_eslint(self, file_path: str, ctx: Any = None) -> Dict[str, Any]:
        """
        Run ESLint on JavaScript/TypeScript file

        A throw-away JSON config is written next to the system temp files,
        ESLint is invoked through ``npx`` with JSON output, and the messages
        reported for the first result entry are converted with
        ``self._format_issue``.

        Args:
            file_path: Path of the file to lint
            ctx: Accepted for signature symmetry; not used here

        Returns:
            Dictionary with issues found (``{"issues": [...]}``). The list is
            empty when ESLint produced no stdout or its output was not valid
            JSON. If anything raises, the result of ``_basic_lint`` is
            returned instead.
        """
        try:
            # Create minimal ESLint config
            # NOTE(review): this is an eslintrc-style config. ESLint 9 defaults
            # to flat config, so if `npx --yes eslint` resolves to v9+ this file
            # may be rejected - confirm which ESLint version is expected.
            config = {
                "env": {
                    "browser": True,
                    "es2021": True,
                    "node": True
                },
                "extends": "eslint:recommended",
                "parserOptions": {
                    "ecmaVersion": "latest",
                    "sourceType": "module"
                },
                "rules": {
                    "no-unused-vars": "warn",
                    "no-undef": "error",
                    "no-console": "off"
                }
            }

            # Create temp config file
            config_fd, config_path = tempfile.mkstemp(suffix='.json', text=True)
            try:
                # fdopen takes ownership of the descriptor, so it is closed
                # even when the write itself fails (no descriptor leak).
                with os.fdopen(config_fd, 'w', encoding='utf-8') as config_file:
                    json.dump(config, config_file)

                # Try to run ESLint
                command = [
                    "npx", "--yes", "eslint",
                    "--format", "json",
                    "--config", config_path,
                    file_path
                ]
                stdout, stderr, returncode = await self._run_subprocess(
                    command,
                    cwd=Path(file_path).parent
                )

                # ESLint returns non-zero when it finds issues (normal behavior),
                # so the return code is deliberately not treated as a failure.
                if stdout:
                    try:
                        results = json.loads(stdout)
                    except json.JSONDecodeError as e:
                        logger.warning(f"Failed to parse ESLint JSON: {e}")
                        return {"issues": []}

                    issues = []
                    if results:
                        # Only one file is linted, so only the first entry matters.
                        for message in results[0].get("messages", []):
                            severity = "error" if message.get("severity") == 2 else "warning"
                            issues.append(self._format_issue(
                                line=message.get("line", 0),
                                column=message.get("column", 0),
                                message=message.get("message", ""),
                                severity=severity,
                                rule_id=message.get("ruleId"),
                                fix=message.get("fix")
                            ))
                    return {"issues": issues}

                # No output at all: treat ESLint as unavailable and report nothing
                logger.info("ESLint not available, skipping JavaScript linting")
                return {"issues": []}
            finally:
                # Cleanup config file (best effort; a missing file is fine)
                try:
                    os.unlink(config_path)
                except OSError:
                    pass
        except Exception as e:
            logger.error(f"ESLint analysis failed: {e}")
            # Fallback to basic pattern-based linting
            return await self._basic_lint(file_path)

    async def _analyze_complexity(self, code: str, ctx: Any = None) -> Dict[str, Any]:
        """
        Analyze code complexity (basic implementation)

        This is a line-based approximation: each source line counts at most
        once per category, and keywords inside strings or comments are still
        counted. Keywords are matched on word boundaries so identifiers such
        as ``notify`` or ``before`` no longer count as ``if`` / ``for``.

        Args:
            code: Source text to measure
            ctx: Accepted for signature symmetry; not used here

        Returns:
            Dictionary with complexity metrics (``cyclomatic_complexity``,
            ``function_count``, ``decision_points``, ``maintainability_index``),
            or an empty dictionary if the computation raises
        """
        import re  # local import: the module's import block does not provide `re`

        try:
            lines = code.splitlines()

            def count_matching_lines(pattern: str) -> int:
                """Count the lines on which ``pattern`` matches at least once."""
                matcher = re.compile(pattern)
                return sum(1 for line in lines if matcher.search(line))

            # Count various complexity indicators
            function_count = count_matching_lines(r'\bfunction\b|=>')
            if_count = count_matching_lines(r'\b(?:if|else)\b')
            loop_count = count_matching_lines(r'\b(?:for|while)\b')
            switch_count = count_matching_lines(r'\b(?:switch|case)\b')

            # Calculate approximate cyclomatic complexity
            # Base complexity = 1, +1 for each decision point
            decision_points = if_count + loop_count + switch_count
            cyclomatic_complexity = 1 + decision_points

            return {
                "cyclomatic_complexity": cyclomatic_complexity,
                "function_count": function_count,
                "decision_points": decision_points,
                "maintainability_index": self._calculate_maintainability(
                    len(lines),
                    cyclomatic_complexity,
                    function_count
                )
            }
        except Exception as e:
            logger.error(f"Complexity analysis failed: {e}")
            return {}

    async def _basic_lint(self, file_path: str) -> Dict[str, Any]:
        """
        Basic pattern-based linting when ESLint is not available
        Enhanced with security and async/race condition detection

        Every check is a per-line substring heuristic, so false positives and
        false negatives are expected. Checks run in a fixed order and a single
        line may produce several issues.

        Args:
            file_path: Path of the UTF-8 source file to scan

        Returns:
            ``{"issues": [...]}`` with one ``self._format_issue`` entry per
            finding; an empty list if reading or scanning the file raises
        """
        issues = []

        # Loop-invariant keyword tables
        credential_keywords = ['password', 'secret', 'token', 'api_key', 'apikey', 'private_key']
        sync_operations = ['readFileSync', 'writeFileSync', 'readdirSync', 'statSync', 'unlinkSync']
        dangerous_exec = ['exec(', 'spawn(', 'execSync(', 'spawnSync(', 'child_process']
        weak_crypto = ['md5', 'sha1', 'Math.random()']

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code = f.read()
            lines = code.splitlines()

            for line_num, line in enumerate(lines, 1):
                stripped = line.strip()
                lowered = line.lower()
                indent_level = len(line) - len(line.lstrip())

                # Check for common JavaScript issues
                # == instead of ===
                if ' == ' in line and '===' not in line:
                    issues.append(self._format_issue(
                        line=line_num,
                        column=line.find(' == '),
                        message="Use '===' instead of '=='",
                        severity="warning",
                        rule_id="eqeqeq"
                    ))

                # eval() usage - CRITICAL SECURITY
                if 'eval(' in line:
                    issues.append(self._format_issue(
                        line=line_num,
                        column=line.find('eval('),
                        message="eval() is dangerous and should be avoided - Code Injection Risk",
                        severity="error",
                        rule_id="no-eval"
                    ))

                # console.log in production
                if 'console.log' in line:
                    issues.append(self._format_issue(
                        line=line_num,
                        column=line.find('console.log'),
                        message="Unexpected console statement",
                        severity="warning",
                        rule_id="no-console"
                    ))

                # var instead of let/const
                if stripped.startswith('var '):
                    issues.append(self._format_issue(
                        line=line_num,
                        column=0,
                        message="Use 'let' or 'const' instead of 'var'",
                        severity="warning",
                        rule_id="no-var"
                    ))

                # Assignment in condition
                if 'if' in line and ' = ' in line and '==' not in line and '!=' not in line:
                    issues.append(self._format_issue(
                        line=line_num,
                        column=line.find(' = '),
                        message="Expected a conditional expression and instead saw an assignment",
                        severity="error",
                        rule_id="no-cond-assign"
                    ))

                # Hardcoded credentials - SECURITY
                for keyword in credential_keywords:
                    if keyword.lower() in lowered and ('=' in line or ':' in line):
                        if any(c in line for c in ['"', "'"]):
                            issues.append(self._format_issue(
                                line=line_num,
                                column=0,
                                message=f"Possible hardcoded {keyword} detected - Security Risk",
                                severity="error",
                                rule_id="no-hardcoded-credentials"
                            ))
                            # Report at most one credential finding per line
                            break

                # Missing semicolon (simplified check)
                if stripped and not stripped.startswith('//') and not stripped.startswith('/*'):
                    if stripped.startswith(('const ', 'let ', 'var ', 'return ')):
                        if not stripped.endswith((';', '{', '}', ',', '(', ')')):
                            issues.append(self._format_issue(
                                line=line_num,
                                column=len(line) - 1,
                                message="Missing semicolon",
                                severity="warning",
                                rule_id="semi"
                            ))

                # NEW: Prototype pollution - SECURITY
                if 'for' in line and 'in' in line and '[' in line and ']' in line:
                    if 'hasOwnProperty' not in line and 'Object.prototype' not in line:
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Potential prototype pollution - Use hasOwnProperty() check",
                            severity="error",
                            rule_id="no-prototype-builtins"
                        ))

                # NEW: Missing await in async functions
                if any(keyword in line for keyword in ['Promise(', '.then(', '.catch(']):
                    if 'await' not in line and 'return' not in line:
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Missing 'await' for Promise - Potential unhandled rejection",
                            severity="warning",
                            rule_id="no-async-promise-executor"
                        ))

                # NEW: Race conditions - concurrent operations on shared state
                if '++' in line or '--' in line:
                    # The name is the last token before the first operator.
                    preceding = line.split('++')[0].split('--')[0].split()
                    if preceding:
                        var_name = preceding[-1]
                    else:
                        # Prefix form (e.g. `++counter;`): nothing precedes the
                        # operator, so take the token that follows it instead of
                        # indexing an empty list, which used to abort the scan.
                        following = stripped.lstrip('+-').split()
                        var_name = (following[0].rstrip(';') if following else '') or '<unknown>'
                    if not any(keyword in line for keyword in ['let ', 'const ', 'var ']):
                        issues.append(self._format_issue(
                            line=line_num,
                            column=line.find('++') if '++' in line else line.find('--'),
                            message=f"Potential race condition - '{var_name}' may be modified concurrently",
                            severity="warning",
                            rule_id="require-atomic-updates"
                        ))

                # NEW: Blocking synchronous operations - PERFORMANCE
                for sync_op in sync_operations:
                    if sync_op in line:
                        issues.append(self._format_issue(
                            line=line_num,
                            column=line.find(sync_op),
                            message=f"'{sync_op}' blocks the event loop - Use async version",
                            severity="warning",
                            rule_id="no-sync"
                        ))

                # NEW: Unsafe regular expressions - ReDoS attack
                if 'new RegExp' in line or 're.compile' in line:
                    if any(pattern in line for pattern in ['.*', '.+', '(.*)', '(.+)']):
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Potentially unsafe regex - ReDoS vulnerability risk",
                            severity="warning",
                            rule_id="no-unsafe-regex"
                        ))

                # NEW: SQL Injection patterns - CRITICAL SECURITY
                if any(keyword in lowered for keyword in ['select ', 'insert ', 'update ', 'delete ']):
                    if any(concat in line for concat in ['+', '${', '`']):
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Possible SQL injection - Use parameterized queries",
                            severity="error",
                            rule_id="detect-sql-injection"
                        ))

                # NEW: Command injection - CRITICAL SECURITY
                for exec_func in dangerous_exec:
                    if exec_func in line:
                        if 'req.' in line or 'input' in lowered:
                            issues.append(self._format_issue(
                                line=line_num,
                                column=line.find(exec_func),
                                message=f"Command injection risk with {exec_func} - Validate and sanitize input",
                                severity="error",
                                rule_id="detect-child-process"
                            ))

                # NEW: Weak crypto - SECURITY
                for weak in weak_crypto:
                    if weak in line:
                        issues.append(self._format_issue(
                            line=line_num,
                            column=line.find(weak),
                            message=f"Weak cryptography detected: {weak} - Use crypto.randomBytes() or stronger hash",
                            severity="error",
                            rule_id="detect-weak-crypto"
                        ))

                # NEW: Path traversal - SECURITY
                if 'readFile' in line or 'writeFile' in line or 'require(' in line:
                    if any(pattern in line for pattern in ['../', '..\\']):
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Path traversal vulnerability - Validate file paths",
                            severity="error",
                            rule_id="detect-path-traversal"
                        ))

                # NEW: CORS misconfiguration - SECURITY
                if 'Access-Control-Allow-Origin' in line and '*' in line:
                    issues.append(self._format_issue(
                        line=line_num,
                        column=line.find('*'),
                        message="Overly permissive CORS policy - Restrict allowed origins",
                        severity="warning",
                        rule_id="detect-cors-misconfiguration"
                    ))

                # NEW: Memory leaks - setInterval without clearInterval
                if 'setInterval' in line:
                    issues.append(self._format_issue(
                        line=line_num,
                        column=line.find('setInterval'),
                        message="Potential memory leak - Ensure clearInterval() is called",
                        severity="warning",
                        rule_id="detect-memory-leak"
                    ))

                # NEW: Insecure randomness for security purposes
                if 'Math.random()' in line:
                    if any(keyword in lowered for keyword in ['token', 'session', 'id', 'key']):
                        issues.append(self._format_issue(
                            line=line_num,
                            column=line.find('Math.random()'),
                            message="Math.random() is not cryptographically secure - Use crypto.randomBytes()",
                            severity="error",
                            rule_id="detect-insecure-randomness"
                        ))

                # NEW: Type coercion bugs with + operator
                if '+' in line and any(var in line for var in ['req.', 'input', 'params', 'query', 'body']):
                    if not any(keyword in line for keyword in ['parseInt', 'parseFloat', 'Number(']):
                        issues.append(self._format_issue(
                            line=line_num,
                            column=line.find('+'),
                            message="Potential type coercion bug - Use parseInt/parseFloat for numeric operations",
                            severity="warning",
                            rule_id="detect-type-coercion"
                        ))

                # NEW: Floating point comparison
                if any(op in line for op in [' == ', ' === ']) and any(num in line for num in ['.', '0.', '1.']):
                    if 'toFixed' not in line and 'Math.abs' not in line:
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Floating point comparison may be inaccurate - Use Math.abs(a - b) < epsilon",
                            severity="warning",
                            rule_id="no-floating-point-equality"
                        ))

                # NEW: Callback hell detection (nested callbacks)
                if 'function(' in line or 'function (' in line:
                    if indent_level > 16:  # More than 4 levels of nesting
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Deep callback nesting detected - Consider using async/await or Promises",
                            severity="warning",
                            rule_id="callback-hell"
                        ))

                # NEW: Unreachable code after return
                if 'return' in line and not stripped.startswith('//'):
                    # line_num is 1-based, so lines[line_num] is the NEXT line
                    if line_num < len(lines):
                        next_line = lines[line_num]
                        next_stripped = next_line.strip()
                        if next_stripped and not next_stripped.startswith('}') and not next_stripped.startswith('//'):
                            next_indent = len(next_line) - len(next_line.lstrip())
                            # Code at the same or deeper indent follows the return
                            if next_indent >= indent_level:
                                issues.append(self._format_issue(
                                    line=line_num + 1,
                                    column=0,
                                    message="Unreachable code detected",
                                    severity="error",
                                    rule_id="no-unreachable"
                                ))

                # NEW: Unused variable declaration
                if stripped.startswith(('const ', 'let ', 'var ')):
                    var_match = line.split('=')[0].strip().split()[-1] if '=' in line else None
                    if var_match and var_match not in ['req', 'res', 'next', 'err', 'error']:
                        # Simple heuristic - if variable name suggests it's unused
                        if 'unused' in var_match.lower() or 'temp' in var_match.lower():
                            issues.append(self._format_issue(
                                line=line_num,
                                column=0,
                                message=f"Variable '{var_match}' appears to be unused",
                                severity="warning",
                                rule_id="no-unused-vars"
                            ))

                # NEW: Missing error handling in async functions
                if any(keyword in line for keyword in ['.then(', '.catch(', 'await ']):
                    # Check if there's error handling nearby
                    has_catch = False
                    for check_line in lines[max(0, line_num-3):min(len(lines), line_num+3)]:
                        if '.catch' in check_line or 'try' in check_line or 'catch' in check_line:
                            has_catch = True
                            break
                    if not has_catch:
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Missing error handling for async operation",
                            severity="warning",
                            rule_id="require-error-handling"
                        ))

                # NEW: Express route without error handling
                if 'app.' in line and any(method in line for method in ['get(', 'post(', 'put(', 'delete(', 'patch(']):
                    # Check if route has try-catch or error parameter
                    has_error_handling = False
                    for check_line in lines[line_num:min(len(lines), line_num+10)]:
                        if 'try' in check_line or 'catch' in check_line or ', err' in check_line:
                            has_error_handling = True
                            break
                    if not has_error_handling:
                        issues.append(self._format_issue(
                            line=line_num,
                            column=0,
                            message="Express route missing error handling",
                            severity="warning",
                            rule_id="express-no-error-handler"
                        ))

            return {"issues": issues}
        except Exception as e:
            logger.error(f"Basic linting failed: {e}")
            return {"issues": []}

    def _calculate_maintainability(self, loc: int, complexity: int, functions: int) -> float:
        """
        Calculate a simple maintainability index
        Scale: 0-100 (higher is better)

        Args:
            loc: Number of source lines
            complexity: Approximate cyclomatic complexity
            functions: Number of detected function definitions

        Returns:
            Score clamped to 0-100, always as a float; 100.0 when ``loc`` is 0
        """
        if loc == 0:
            return 100.0

        # Penalize high complexity (capped at 50); function density is added
        # to the score, so more functions per line raises it.
        complexity_penalty = min(complexity * 2, 50)
        function_density = (functions / loc * 100) if loc > 0 else 0

        # Base score of 100, subtract the penalty, add the density bonus
        score = 100 - complexity_penalty + function_density

        # Clamp to 0-100; float() keeps the return type stable when the int
        # bounds win the clamp.
        return float(max(0, min(100, score)))
| # Convenience functions | |
async def analyze_javascript(code: str, ctx: Any = None) -> Dict[str, Any]:
    """
    Analyze JavaScript source with a fresh ``JavaScriptAnalyzer``.

    Args:
        code: JavaScript code to analyze
        ctx: Optional MCP context for progress reporting

    Returns:
        The dictionary produced by ``JavaScriptAnalyzer.analyze`` with
        ``language="javascript"``
    """
    return await JavaScriptAnalyzer().analyze(code, ctx, language="javascript")
async def analyze_typescript(code: str, ctx: Any = None) -> Dict[str, Any]:
    """
    Analyze TypeScript source with a fresh ``JavaScriptAnalyzer``.

    Args:
        code: TypeScript code to analyze
        ctx: Optional MCP context for progress reporting

    Returns:
        The dictionary produced by ``JavaScriptAnalyzer.analyze`` with
        ``language="typescript"``
    """
    return await JavaScriptAnalyzer().analyze(code, ctx, language="typescript")
async def calculate_complexity_javascript(code: str, ctx: Any = None) -> Dict[str, Any]:
    """
    Compute only the complexity metrics for JavaScript source.

    Args:
        code: JavaScript code to analyze
        ctx: Optional MCP context

    Returns:
        The dictionary produced by ``JavaScriptAnalyzer._analyze_complexity``
    """
    return await JavaScriptAnalyzer()._analyze_complexity(code, ctx)