Spaces:
Running
Running
MugdhaV
Initial deployment: Gradio frontend with Modal backend - Multi-language security scanner with parallel processing
e1e9580 | #!/usr/bin/env python3 | |
| """ | |
| Security Checker Application | |
| ============================ | |
| A comprehensive security analysis tool that combines: | |
| 1. Static Application Security Testing (SAST) | |
| 2. NIST National Vulnerability Database (NVD) integration | |
| Think of this as a "security doctor" for your applications: | |
| - SAST = X-ray machine (looks inside without running) | |
| - NVD = Medical database (known vulnerabilities/diseases) | |
| - Report = Diagnosis with treatment plan | |
| Author: Security Checker Project | |
| """ | |
| import os | |
| import re | |
| import json | |
| import hashlib | |
| import asyncio | |
| import aiohttp | |
| from pathlib import Path | |
| from dataclasses import dataclass, field | |
| from typing import List, Dict, Optional, Tuple | |
| from enum import Enum | |
| from datetime import datetime | |
| from urllib.parse import urlparse | |
| import fnmatch | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
class RiskLevel(Enum):
    """
    Severity buckets for findings, named after the CVSS qualitative ratings.

    Read them like emergency-room triage, most urgent first:
    - CRITICAL: life-threatening, act immediately
    - HIGH: serious, fix soon
    - MEDIUM: concerning, plan a fix
    - LOW: minor, keep an eye on it
    - INFO: awareness only

    Each member's value is its own name as an upper-case string.
    """

    CRITICAL = "CRITICAL"  # CVSS 9.0-10.0
    HIGH = "HIGH"          # CVSS 7.0-8.9
    MEDIUM = "MEDIUM"      # CVSS 4.0-6.9
    LOW = "LOW"            # CVSS 0.1-3.9
    INFO = "INFO"          # Informational
@dataclass
class Vulnerability:
    """
    Represents a single vulnerability found in the code.

    Analogy: one entry in a medical diagnosis report.
    - name: disease name
    - description: what is wrong
    - file_path: where in the body (code) the problem is
    - line_number: exact location
    - code_snippet: the problematic tissue sample
    - risk_level: how serious it is
    - remediation: treatment plan
    - cve_ids: references to the known-disease database (NVD)
    - cwe_id: optional CWE identifier for the weakness class
    - confidence: detection confidence label

    The class is a dataclass: the fields below define the generated
    constructor, and ``cve_ids`` gets a fresh list per instance.
    """

    name: str
    description: str
    file_path: str
    line_number: int
    code_snippet: str
    risk_level: RiskLevel
    remediation: str
    # default_factory gives every instance its own list (no shared mutable default)
    cve_ids: List[str] = field(default_factory=list)
    cwe_id: Optional[str] = None
    confidence: str = "HIGH"  # HIGH, MEDIUM, LOW

    def to_dict(self) -> Dict:
        """
        Return the finding as a plain dictionary.

        ``risk_level`` is flattened to its ``.value``; every other field is
        passed through unchanged (``cve_ids`` is the same list object, not a
        copy).
        """
        return {
            "name": self.name,
            "description": self.description,
            "file_path": self.file_path,
            "line_number": self.line_number,
            "code_snippet": self.code_snippet,
            "risk_level": self.risk_level.value,
            "remediation": self.remediation,
            "cve_ids": self.cve_ids,
            "cwe_id": self.cwe_id,
            "confidence": self.confidence
        }
@dataclass
class ScanResult:
    """
    Complete scan results - like a full medical report.

    The class is a dataclass: ``target``, ``scan_type`` and ``start_time``
    are required; the remaining fields default to "nothing recorded yet",
    with a fresh list per instance for ``vulnerabilities`` and ``errors``.
    """

    target: str
    scan_type: str  # "local" or "web"
    start_time: datetime
    end_time: Optional[datetime] = None
    vulnerabilities: List[Vulnerability] = field(default_factory=list)
    files_scanned: int = 0
    errors: List[str] = field(default_factory=list)

    def summary(self) -> Dict:
        """
        Generate a summary of findings by risk level.

        Returns a dict with the target, scan type, duration in seconds
        (``None`` while ``end_time`` is unset), number of files scanned,
        total vulnerability count, a per-severity count keyed by every
        ``RiskLevel`` value (zero when nothing was found at that level),
        and the number of recorded errors.
        """
        # Seed every level with 0 so absent severities still appear in the output.
        counts = {level.value: 0 for level in RiskLevel}
        for vuln in self.vulnerabilities:
            counts[vuln.risk_level.value] += 1

        duration = None
        if self.end_time is not None:
            duration = (self.end_time - self.start_time).total_seconds()

        return {
            "target": self.target,
            "scan_type": self.scan_type,
            "duration_seconds": duration,
            "files_scanned": self.files_scanned,
            "total_vulnerabilities": len(self.vulnerabilities),
            "by_severity": counts,
            "errors": len(self.errors)
        }
class SASTRule:
    """
    A single SAST detection rule.

    Analogy: like a specific test in a medical lab.
    - pattern: what symptom to look for
    - name: name of the condition
    - languages: which "body types" (file extensions) this applies to
    """

    def __init__(
        self,
        name: str,
        pattern: str,
        description: str,
        risk_level: RiskLevel,
        remediation: str,
        cwe_id: str,
        languages: List[str],
        false_positive_patterns: Optional[List[str]] = None
    ):
        """
        Store the rule metadata and pre-compile its regular expressions.

        Args:
            name: Human-readable rule name.
            pattern: Regex source; compiled with IGNORECASE and MULTILINE.
            description: Explanation of the finding.
            risk_level: Severity assigned to matches of this rule.
            remediation: Fix guidance text.
            cwe_id: CWE identifier string.
            languages: File extensions the rule applies to (e.g. ``'.py'``).
            false_positive_patterns: Optional regex sources; a line matching
                any of them is not reported. ``None`` means no exclusions.

        Raises:
            re.error: If ``pattern`` or an exclusion pattern is not a valid regex.
        """
        self.name = name
        self.pattern = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
        self.description = description
        self.risk_level = risk_level
        self.remediation = remediation
        self.cwe_id = cwe_id
        self.languages = languages  # File extensions: ['.py', '.js', etc.]
        self.false_positive_patterns = [
            re.compile(fp, re.IGNORECASE) for fp in (false_positive_patterns or [])
        ]

    def matches(self, code: str, file_ext: str) -> List[Tuple[int, str]]:
        """
        Find all lines of ``code`` that trigger this rule.

        The rule is applied line by line (lines are split on ``'\\n'``). The
        extension check ignores case, so ``'.PY'`` is treated like ``'.py'``.

        Args:
            code: Full text of the file to inspect.
            file_ext: Extension of the file, including the leading dot.

        Returns:
            A list of ``(line_number, snippet)`` tuples, where ``line_number``
            is 1-based and ``snippet`` is the matching line together with the
            line before and the line after it (where those exist). Empty when
            the extension is not covered by the rule or nothing matches.
        """
        # Case-insensitive extension comparison: files named e.g. "App.PY"
        # must not silently bypass the rule.
        wanted = file_ext.lower()
        if not any(wanted == ext.lower() for ext in self.languages):
            return []

        lines = code.split('\n')
        found = []
        for line_no, line in enumerate(lines, 1):
            if not self.pattern.search(line):
                continue
            # A hit on any exclusion pattern suppresses the finding for this line.
            if any(fp.search(line) for fp in self.false_positive_patterns):
                continue
            # line_no is 1-based, so index line_no - 2 is the previous line
            # and the slice end line_no + 1 includes the following line.
            start = max(0, line_no - 2)
            end = min(len(lines), line_no + 1)
            found.append((line_no, '\n'.join(lines[start:end])))
        return found
| class SASTEngine: | |
| """ | |
| Static Application Security Testing Engine | |
| Analogy: This is like a diagnostic imaging department | |
| - Scans code without executing it (like X-ray/MRI) | |
| - Looks for known vulnerability patterns | |
| - Reports findings with locations | |
| How it works: | |
| 1. Load detection rules (what to look for) | |
| 2. Read source files | |
| 3. Match patterns against code | |
| 4. Report findings | |
| """ | |
def __init__(self):
    """
    Prepare the engine: load the detection rules, then record which file
    extensions are recognised and which directory names are skipped.
    """
    self.rules = self._load_rules()

    # (extension, language name) pairs; order is preserved by dict().
    known_extensions = [
        ('.py', 'python'),
        ('.js', 'javascript'),
        ('.ts', 'typescript'),
        ('.jsx', 'javascript'),
        ('.tsx', 'typescript'),
        ('.java', 'java'),
        ('.php', 'php'),
        ('.rb', 'ruby'),
        ('.go', 'go'),
        ('.cs', 'csharp'),
        ('.c', 'c'),
        ('.cpp', 'cpp'),
        ('.h', 'c'),
        ('.hpp', 'cpp'),
        ('.sql', 'sql'),
        ('.html', 'html'),
        ('.htm', 'html'),
        ('.xml', 'xml'),
        ('.yml', 'yaml'),
        ('.yaml', 'yaml'),
        ('.json', 'json'),
        ('.sh', 'shell'),
        ('.bash', 'shell'),
    ]
    self.file_extensions = dict(known_extensions)

    # Directories to skip (like avoiding scanning healthy tissue),
    # grouped here by what kind of directory they are.
    dependency_and_env_dirs = ('node_modules', 'venv', '.venv', 'env', '.env', 'vendor')
    cache_and_vcs_dirs = ('__pycache__', '.git', '.svn', '.hg')
    build_output_dirs = ('dist', 'build', 'target', 'coverage')
    editor_dirs = ('.idea', '.vscode')
    self.skip_dirs = set(
        dependency_and_env_dirs
        + cache_and_vcs_dirs
        + build_output_dirs
        + editor_dirs
    )
| def _load_rules(self) -> List[SASTRule]: | |
| """ | |
| Load vulnerability detection rules. | |
| These rules are like a checklist of known security problems. | |
| Each rule defines: | |
| - A pattern to match (regex) | |
| - The type of vulnerability | |
| - How severe it is | |
| - How to fix it | |
| """ | |
| return [ | |
| # ============================================================ | |
| # INJECTION VULNERABILITIES (The "contamination" category) | |
| # Like checking for contaminants in food/medicine | |
| # ============================================================ | |
| SASTRule( | |
| name="SQL Injection", | |
| pattern=r"""(?:execute|cursor\.execute|query|raw|rawQuery|executeQuery)\s*\(\s*[f"'].*?%s.*?['"]\s*%|(?:execute|cursor\.execute)\s*\(\s*[f"'].*?\{.*?\}.*?['"]|(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*?['"]\s*\+\s*|f['"]\s*(?:SELECT|INSERT|UPDATE|DELETE).*?\{""", | |
| description="Potential SQL Injection vulnerability. User input may be directly concatenated into SQL queries, allowing attackers to manipulate database operations.", | |
| risk_level=RiskLevel.CRITICAL, | |
| remediation="""Use parameterized queries or prepared statements: | |
| VULNERABLE: | |
| cursor.execute(f"SELECT * FROM users WHERE id = {user_id}") | |
| SECURE: | |
| cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)) | |
| For ORMs, use built-in query builders instead of raw SQL.""", | |
| cwe_id="CWE-89", | |
| languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs'], | |
| false_positive_patterns=[r'#.*SQL', r'//.*SQL', r'/\*.*SQL'] | |
| ), | |
| SASTRule( | |
| name="Command Injection", | |
| pattern=r"""(?:os\.system|os\.popen|subprocess\.call|subprocess\.run|subprocess\.Popen|exec|eval|Runtime\.getRuntime\(\)\.exec|shell_exec|system|passthru|popen)\s*\([^)]*(?:\+|%|\.format|\{|\$)""", | |
| description="Potential Command Injection vulnerability. User input may be passed to system commands, allowing attackers to execute arbitrary commands.", | |
| risk_level=RiskLevel.CRITICAL, | |
| remediation="""Avoid passing user input to shell commands. If necessary: | |
| VULNERABLE: | |
| os.system(f"ping {user_input}") | |
| SECURE: | |
| import shlex | |
| subprocess.run(["ping", shlex.quote(user_input)], shell=False) | |
| Best practice: Use libraries instead of shell commands when possible.""", | |
| cwe_id="CWE-78", | |
| languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.sh'] | |
| ), | |
| SASTRule( | |
| name="XSS (Cross-Site Scripting)", | |
| pattern=r"""(?:innerHTML|outerHTML|document\.write|\.html\(|v-html|dangerouslySetInnerHTML|\[innerHTML\])\s*=?\s*(?:[^;]*(?:\+|`|\$\{))""", | |
| description="Potential Cross-Site Scripting (XSS) vulnerability. Untrusted data may be inserted into the DOM without proper encoding.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Sanitize and encode output before inserting into HTML: | |
| VULNERABLE: | |
| element.innerHTML = userInput; | |
| SECURE: | |
| element.textContent = userInput; // For text | |
| // Or use a sanitization library like DOMPurify: | |
| element.innerHTML = DOMPurify.sanitize(userInput); | |
| For React, avoid dangerouslySetInnerHTML unless absolutely necessary.""", | |
| cwe_id="CWE-79", | |
| languages=['.js', '.ts', '.jsx', '.tsx', '.html', '.php'] | |
| ), | |
| SASTRule( | |
| name="Path Traversal", | |
| pattern=r"""(?:open|read|write|file_get_contents|file_put_contents|include|require|fopen|readFile|writeFile|createReadStream)\s*\([^)]*(?:\+|`|\$\{|\.\./)""", | |
| description="Potential Path Traversal vulnerability. User input may be used to construct file paths, allowing attackers to access unauthorized files.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Validate and sanitize file paths: | |
| VULNERABLE: | |
| with open(f"/uploads/{filename}") as f: | |
| SECURE: | |
| import os | |
| safe_path = os.path.normpath(filename) | |
| if '..' in safe_path or safe_path.startswith('/'): | |
| raise ValueError("Invalid path") | |
| full_path = os.path.join(UPLOAD_DIR, safe_path) | |
| if not full_path.startswith(UPLOAD_DIR): | |
| raise ValueError("Path traversal detected")""", | |
| cwe_id="CWE-22", | |
| languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go'] | |
| ), | |
| SASTRule( | |
| name="LDAP Injection", | |
| pattern=r"""(?:ldap_search|ldap_bind|search_s|search_ext_s)\s*\([^)]*(?:\+|%|\.format|\{)""", | |
| description="Potential LDAP Injection vulnerability. User input may be used in LDAP queries without proper escaping.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Escape special LDAP characters in user input: | |
| VULNERABLE: | |
| ldap.search_s(base, scope, f"(uid={username})") | |
| SECURE: | |
| from ldap3.utils.conv import escape_filter_chars | |
| safe_username = escape_filter_chars(username) | |
| ldap.search_s(base, scope, f"(uid={safe_username})")""", | |
| cwe_id="CWE-90", | |
| languages=['.py', '.java', '.php', '.cs'] | |
| ), | |
| # ============================================================ | |
| # AUTHENTICATION & SESSION VULNERABILITIES | |
| # Like checking if the locks and keys are secure | |
| # ============================================================ | |
| SASTRule( | |
| name="Hardcoded Credentials", | |
| pattern=r"""(?:password|passwd|pwd|secret|api_key|apikey|api_secret|access_token|auth_token|private_key)\s*[=:]\s*['"]\w{8,}['"]""", | |
| description="Hardcoded credentials detected. Sensitive information should not be stored in source code.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Store credentials securely: | |
| VULNERABLE: | |
| password = "MySecretPass123" | |
| api_key = "sk-1234567890abcdef" | |
| SECURE: | |
| import os | |
| password = os.environ.get('DB_PASSWORD') | |
| api_key = os.environ.get('API_KEY') | |
| Use environment variables, secrets managers (AWS Secrets Manager, | |
| HashiCorp Vault), or encrypted configuration files.""", | |
| cwe_id="CWE-798", | |
| languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs', '.yml', '.yaml', '.json'], | |
| false_positive_patterns=[r'example', r'placeholder', r'your_', r'<.*>', r'xxx', r'\$\{'] | |
| ), | |
| SASTRule( | |
| name="Weak Password Hashing", | |
| pattern=r"""(?:md5|sha1)\s*\(|hashlib\.(?:md5|sha1)\(|MessageDigest\.getInstance\s*\(\s*['"](MD5|SHA-?1)['"]|password.*=.*(?:md5|sha1)""", | |
| description="Weak hashing algorithm used for passwords. MD5 and SHA1 are cryptographically broken for password storage.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Use strong password hashing algorithms: | |
| VULNERABLE: | |
| hashed = hashlib.md5(password.encode()).hexdigest() | |
| SECURE: | |
| import bcrypt | |
| hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()) | |
| # Or use argon2 (recommended): | |
| from argon2 import PasswordHasher | |
| ph = PasswordHasher() | |
| hashed = ph.hash(password) | |
| Recommended algorithms: Argon2, bcrypt, scrypt, PBKDF2""", | |
| cwe_id="CWE-328", | |
| languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs'] | |
| ), | |
| SASTRule( | |
| name="JWT Without Verification", | |
| pattern=r"""jwt\.decode\s*\([^)]*verify\s*=\s*False|algorithms\s*=\s*\[?\s*['"](none|HS256)['"]|\.decode\(\s*token\s*\)|jsonwebtoken\.decode\s*\(""", | |
| description="JWT token decoded without proper verification or using weak/no algorithm.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Always verify JWT signatures: | |
| VULNERABLE: | |
| payload = jwt.decode(token, verify=False) | |
| payload = jwt.decode(token, algorithms=['none']) | |
| SECURE: | |
| payload = jwt.decode( | |
| token, | |
| SECRET_KEY, | |
| algorithms=['RS256'], # Use asymmetric algorithms | |
| options={'verify_exp': True} | |
| ) | |
| Use RS256 or ES256 instead of HS256 for better security.""", | |
| cwe_id="CWE-347", | |
| languages=['.py', '.js', '.ts', '.java', '.go'] | |
| ), | |
| SASTRule( | |
| name="Session Fixation Risk", | |
| pattern=r"""session\s*\[\s*['"].*['"]\s*\]\s*=.*request\.|req\.session\s*=.*req\.(body|query|params)|session_id\s*=.*(?:GET|POST|request)""", | |
| description="Potential session fixation vulnerability. Session identifiers should be regenerated after authentication.", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Regenerate session after authentication: | |
| VULNERABLE: | |
| session['user_id'] = user.id # Without regenerating | |
| SECURE (Python/Flask): | |
| from flask import session | |
| session.regenerate() # Regenerate session ID | |
| session['user_id'] = user.id | |
| SECURE (Node.js/Express): | |
| req.session.regenerate((err) => { | |
| req.session.userId = user.id; | |
| });""", | |
| cwe_id="CWE-384", | |
| languages=['.py', '.js', '.ts', '.php', '.java'] | |
| ), | |
| # ============================================================ | |
| # CRYPTOGRAPHIC VULNERABILITIES | |
| # Like checking if the safe is actually secure | |
| # ============================================================ | |
| SASTRule( | |
| name="Weak Cryptographic Algorithm", | |
| pattern=r"""(?:DES|RC4|RC2|Blowfish|IDEA)(?:\.|\s|Cipher)|Cipher\.getInstance\s*\(\s*['"](DES|RC4|Blowfish)['"]\)|from\s+Crypto\.Cipher\s+import\s+(DES|Blowfish)|cryptography.*(?:DES|RC4|Blowfish)""", | |
| description="Weak cryptographic algorithm detected. DES, RC4, RC2, and Blowfish are considered insecure.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Use modern cryptographic algorithms: | |
| VULNERABLE: | |
| from Crypto.Cipher import DES | |
| cipher = DES.new(key, DES.MODE_CBC) | |
| SECURE: | |
| from cryptography.fernet import Fernet | |
| # Or for low-level: | |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms | |
| cipher = Cipher(algorithms.AES(key), modes.GCM(iv)) | |
| Recommended: AES-256-GCM, ChaCha20-Poly1305""", | |
| cwe_id="CWE-327", | |
| languages=['.py', '.java', '.js', '.ts', '.go', '.cs', '.php'] | |
| ), | |
| SASTRule( | |
| name="Insecure Random Number Generator", | |
| pattern=r"""(?:random\.random|random\.randint|Math\.random|rand\(\)|srand\(\)|mt_rand)\s*\(""", | |
| description="Insecure random number generator used. These are not cryptographically secure and shouldn't be used for security purposes.", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Use cryptographically secure random generators: | |
| VULNERABLE: | |
| token = ''.join(random.choices(string.ascii_letters, k=32)) | |
| SECURE (Python): | |
| import secrets | |
| token = secrets.token_urlsafe(32) | |
| SECURE (JavaScript): | |
| const array = new Uint8Array(32); | |
| crypto.getRandomValues(array); | |
| SECURE (Java): | |
| SecureRandom random = new SecureRandom();""", | |
| cwe_id="CWE-338", | |
| languages=['.py', '.js', '.ts', '.java', '.php', '.c', '.cpp'], | |
| false_positive_patterns=[r'random\.seed', r'shuffle', r'sample'] | |
| ), | |
| SASTRule( | |
| name="Hardcoded Cryptographic Key", | |
| pattern=r"""(?:key|iv|nonce|salt)\s*[=:]\s*(?:b?['"]\w{16,}['"]|bytes\s*\(\s*['"]\w{16,}['"])""", | |
| description="Hardcoded cryptographic key detected. Encryption keys should never be stored in source code.", | |
| risk_level=RiskLevel.CRITICAL, | |
| remediation="""Store cryptographic keys securely: | |
| VULNERABLE: | |
| key = b'ThisIsASecretKey1234567890123456' | |
| SECURE: | |
| import os | |
| key = os.environ.get('ENCRYPTION_KEY').encode() | |
| # Or use a key management system: | |
| from aws_encryption_sdk import KMSMasterKeyProvider | |
| key_provider = KMSMasterKeyProvider(key_ids=[KEY_ARN]) | |
| Best practice: Use Hardware Security Modules (HSM) or | |
| Key Management Services (AWS KMS, Azure Key Vault).""", | |
| cwe_id="CWE-321", | |
| languages=['.py', '.java', '.js', '.ts', '.go', '.cs', '.php'] | |
| ), | |
| # ============================================================ | |
| # INSECURE DESERIALIZATION | |
| # Like accepting packages without checking what's inside | |
| # ============================================================ | |
| SASTRule( | |
| name="Insecure Deserialization (Python)", | |
| pattern=r"""pickle\.loads?\s*\(|yaml\.(?:unsafe_)?load\s*\([^)]*(?!Loader\s*=\s*yaml\.SafeLoader)|marshal\.loads?\s*\(|shelve\.open\s*\(""", | |
| description="Insecure deserialization detected. Deserializing untrusted data can lead to remote code execution.", | |
| risk_level=RiskLevel.CRITICAL, | |
| remediation="""Use safe deserialization methods: | |
| VULNERABLE: | |
| data = pickle.loads(user_input) | |
| config = yaml.load(file) | |
| SECURE: | |
| import json | |
| data = json.loads(user_input) # JSON is safe | |
| # For YAML, always use SafeLoader: | |
| config = yaml.load(file, Loader=yaml.SafeLoader) | |
| # Or better: | |
| config = yaml.safe_load(file) | |
| Never deserialize untrusted data with pickle/marshal.""", | |
| cwe_id="CWE-502", | |
| languages=['.py'] | |
| ), | |
| SASTRule( | |
| name="Insecure Deserialization (Java)", | |
| pattern=r"""ObjectInputStream\s*\(|readObject\s*\(\)|XMLDecoder\s*\(|XStream\.fromXML\s*\(|JSON\.parse\s*\(.*\)\.class""", | |
| description="Insecure deserialization detected in Java. Can lead to remote code execution.", | |
| risk_level=RiskLevel.CRITICAL, | |
| remediation="""Validate and filter deserialization: | |
| VULNERABLE: | |
| ObjectInputStream ois = new ObjectInputStream(input); | |
| Object obj = ois.readObject(); | |
| SECURE: | |
| // Use a whitelist filter | |
| ObjectInputFilter filter = ObjectInputFilter.Config.createFilter( | |
| "com.myapp.SafeClass;!*" | |
| ); | |
| ois.setObjectInputFilter(filter); | |
| // Or use JSON/Protocol Buffers instead of Java serialization | |
| Consider: Jackson with @JsonTypeInfo restrictions, | |
| or Protocol Buffers for type-safe serialization.""", | |
| cwe_id="CWE-502", | |
| languages=['.java'] | |
| ), | |
| SASTRule( | |
| name="Insecure Deserialization (JavaScript)", | |
| pattern=r"""(?:eval|Function)\s*\(\s*(?:JSON\.parse|atob|decodeURIComponent)|node-serialize|serialize-javascript.*(?:eval|Function)|unserialize\s*\(""", | |
| description="Insecure deserialization in JavaScript. Eval of untrusted data can lead to code execution.", | |
| risk_level=RiskLevel.CRITICAL, | |
| remediation="""Never eval deserialized data: | |
| VULNERABLE: | |
| eval(JSON.parse(userInput).code); | |
| const obj = serialize.unserialize(userInput); | |
| SECURE: | |
| const data = JSON.parse(userInput); | |
| // Validate structure before use | |
| if (typeof data.name !== 'string') { | |
| throw new Error('Invalid data'); | |
| } | |
| Avoid node-serialize and similar libraries with | |
| eval-based deserialization.""", | |
| cwe_id="CWE-502", | |
| languages=['.js', '.ts'] | |
| ), | |
| # ============================================================ | |
| # INFORMATION DISCLOSURE | |
| # Like leaving sensitive documents in public view | |
| # ============================================================ | |
| SASTRule( | |
| name="Debug Mode Enabled", | |
| pattern=r"""(?:DEBUG|debug)\s*[=:]\s*(?:True|true|1|['"](true|on|yes)['"])|app\.run\s*\([^)]*debug\s*=\s*True|FLASK_DEBUG\s*=\s*1""", | |
| description="Debug mode appears to be enabled. This can expose sensitive information in production.", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Disable debug mode in production: | |
| VULNERABLE: | |
| app.run(debug=True) | |
| DEBUG = True | |
| SECURE: | |
| import os | |
| DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true' | |
| # In production: | |
| app.run(debug=False) | |
| Use environment variables to control debug settings.""", | |
| cwe_id="CWE-215", | |
| languages=['.py', '.js', '.ts', '.java', '.php', '.rb', '.yml', '.yaml', '.json'], | |
| false_positive_patterns=[r'#.*DEBUG', r'//.*DEBUG', r'debug.*log'] | |
| ), | |
| SASTRule( | |
| name="Sensitive Data in Logs", | |
| pattern=r"""(?:log(?:ger)?\.(?:info|debug|warn|error|critical)|print|console\.log|System\.out\.print)\s*\([^)]*(?:password|secret|token|key|credit.?card|ssn|api.?key)""", | |
| description="Sensitive data may be written to logs. This can expose credentials and personal information.", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Never log sensitive information: | |
| VULNERABLE: | |
| logger.info(f"User login: {username}, password: {password}") | |
| SECURE: | |
| logger.info(f"User login: {username}") | |
| # Or mask sensitive data: | |
| logger.info(f"API key: {api_key[:4]}****") | |
| Use structured logging with sensitive field filtering.""", | |
| cwe_id="CWE-532", | |
| languages=['.py', '.java', '.js', '.ts', '.rb', '.go', '.php'] | |
| ), | |
| SASTRule( | |
| name="Stack Trace Exposure", | |
| pattern=r"""(?:printStackTrace|traceback\.print_exc|console\.trace|e\.stack|err\.stack)\s*\(?\)?|except.*?:?\s*pass|rescue\s*=>\s*nil""", | |
| description="Stack traces may be exposed to users or exceptions silently ignored.", | |
| risk_level=RiskLevel.LOW, | |
| remediation="""Handle exceptions properly without exposing internals: | |
| VULNERABLE: | |
| except Exception as e: | |
| return str(e) # Exposes internal details | |
| SECURE: | |
| except Exception as e: | |
| logger.exception("Operation failed") # Log internally | |
| return {"error": "An error occurred"} # Generic message | |
| Never expose full stack traces to end users.""", | |
| cwe_id="CWE-209", | |
| languages=['.py', '.java', '.js', '.ts', '.rb', '.php'] | |
| ), | |
| # ============================================================ | |
| # SECURITY MISCONFIGURATION | |
| # Like leaving doors unlocked or windows open | |
| # ============================================================ | |
| SASTRule( | |
| name="CORS Wildcard", | |
| pattern=r"""(?:Access-Control-Allow-Origin|cors)\s*[=:]\s*['"]\*['"]|\.allowedOrigins\s*\(\s*['"]\*['"]|cors\s*\(\s*\{[^}]*origin\s*:\s*(?:true|['"]\*['"])""", | |
| description="CORS configured to allow all origins. This can enable cross-site request attacks.", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Restrict CORS to specific trusted origins: | |
| VULNERABLE: | |
| Access-Control-Allow-Origin: * | |
| cors({ origin: '*' }) | |
| SECURE: | |
| cors({ | |
| origin: ['https://trusted-site.com'], | |
| methods: ['GET', 'POST'], | |
| credentials: true | |
| }) | |
| Never use wildcard CORS with credentials.""", | |
| cwe_id="CWE-942", | |
| languages=['.py', '.java', '.js', '.ts', '.php', '.rb', '.go'] | |
| ), | |
| SASTRule( | |
| name="SSL/TLS Verification Disabled", | |
| pattern=r"""verify\s*[=:]\s*False|VERIFY_SSL\s*=\s*False|ssl\s*[=:]\s*False|rejectUnauthorized\s*[=:]\s*false|InsecureSkipVerify\s*[=:]\s*true|CURLOPT_SSL_VERIFYPEER.*false""", | |
| description="SSL/TLS certificate verification is disabled. This makes the application vulnerable to man-in-the-middle attacks.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Always verify SSL/TLS certificates: | |
| VULNERABLE: | |
| requests.get(url, verify=False) | |
| https.request({rejectUnauthorized: false}) | |
| SECURE: | |
| requests.get(url, verify=True) | |
| # Or with custom CA: | |
| requests.get(url, verify='/path/to/ca-bundle.crt') | |
| If you need to use self-signed certs in development, | |
| use environment-based configuration.""", | |
| cwe_id="CWE-295", | |
| languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb'] | |
| ), | |
| SASTRule( | |
| name="Insecure HTTP", | |
| pattern=r"""['"](http://(?!localhost|127\.0\.0\.1|0\.0\.0\.0)[^'"]+)['"]""", | |
| description="Insecure HTTP URL detected. Data transmitted over HTTP can be intercepted.", | |
| risk_level=RiskLevel.LOW, | |
| remediation="""Use HTTPS for all external communications: | |
| VULNERABLE: | |
| api_url = "http://api.example.com/data" | |
| SECURE: | |
| api_url = "https://api.example.com/data" | |
| Configure HSTS (HTTP Strict Transport Security) on your servers.""", | |
| cwe_id="CWE-319", | |
| languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb', '.yml', '.yaml', '.json'], | |
| false_positive_patterns=[r'#.*http://', r'//.*http://', r'example\.com', r'schema.*http://'] | |
| ), | |
| SASTRule( | |
| name="Missing Security Headers", | |
| pattern=r"""(?:Content-Security-Policy|X-Frame-Options|X-Content-Type-Options|Strict-Transport-Security)\s*[=:]\s*['"]['""]|no_header|disable.*header""", | |
| description="Security headers may be missing or disabled. This can enable various attacks.", | |
| risk_level=RiskLevel.LOW, | |
| remediation="""Configure security headers: | |
| Add these headers to your responses: | |
| Content-Security-Policy: default-src 'self' | |
| X-Frame-Options: DENY | |
| X-Content-Type-Options: nosniff | |
| Strict-Transport-Security: max-age=31536000; includeSubDomains | |
| X-XSS-Protection: 1; mode=block | |
| Use helmet.js (Node), django-csp, or similar libraries.""", | |
| cwe_id="CWE-693", | |
| languages=['.py', '.java', '.js', '.ts', '.php', '.rb'] | |
| ), | |
| # ============================================================ | |
| # XML VULNERABILITIES | |
| # XML parsers can be tricked into dangerous behavior | |
| # ============================================================ | |
| SASTRule( | |
| name="XXE (XML External Entity)", | |
| pattern=r"""(?:xml\.etree|lxml|xml\.dom|xml\.sax|XMLReader|DocumentBuilder|SAXParser|XMLParser).*(?:parse|read|load)|<!ENTITY|SYSTEM\s+['""]|resolve_entities\s*=\s*True""", | |
| description="Potential XML External Entity (XXE) vulnerability. XML parsers should disable external entity processing.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Disable external entity processing: | |
| VULNERABLE (Python): | |
| tree = etree.parse(xml_file) | |
| SECURE (Python): | |
| parser = etree.XMLParser(resolve_entities=False, no_network=True) | |
| tree = etree.parse(xml_file, parser) | |
| SECURE (Java): | |
| DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); | |
| dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); | |
| dbf.setExpandEntityReferences(false);""", | |
| cwe_id="CWE-611", | |
| languages=['.py', '.java', '.php', '.cs', '.rb'] | |
| ), | |
| # ============================================================ | |
| # SERVER-SIDE REQUEST FORGERY (SSRF) | |
| # Like being tricked into making calls you shouldn't | |
| # ============================================================ | |
| SASTRule( | |
| name="Server-Side Request Forgery (SSRF)", | |
| pattern=r"""(?:requests\.get|urllib\.request\.urlopen|http\.get|fetch|axios\.get|HttpClient)\s*\([^)]*(?:request\.|req\.|params\.|query\.|body\.|input|GET|POST)""", | |
| description="Potential SSRF vulnerability. User input may control server-side HTTP requests.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Validate and whitelist URLs: | |
| VULNERABLE: | |
| url = request.args.get('url') | |
| response = requests.get(url) | |
| SECURE: | |
| from urllib.parse import urlparse | |
| ALLOWED_HOSTS = ['api.trusted.com', 'data.trusted.com'] | |
| parsed = urlparse(url) | |
| if parsed.hostname not in ALLOWED_HOSTS: | |
| raise ValueError("URL not allowed") | |
| if parsed.scheme not in ['http', 'https']: | |
| raise ValueError("Invalid scheme") | |
| # Block internal IPs | |
| if is_internal_ip(parsed.hostname): | |
| raise ValueError("Internal URLs not allowed")""", | |
| cwe_id="CWE-918", | |
| languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb'] | |
| ), | |
| # ============================================================ | |
| # ADDITIONAL COMMON VULNERABILITIES | |
| # ============================================================ | |
| SASTRule( | |
| name="Unsafe Regex (ReDoS)", | |
| pattern=r"""(?:re\.compile|new\s+RegExp|regex|pattern)\s*\([^)]*(?:\+\*|\*\+|\.+\*|\.+\+|\(\.\*\)|\(\.\+\)|(?:\[[^\]]*\]){2,}\*|\{\d+,\}\*|\{\d+,\}\+)""", | |
| description="Potentially vulnerable regular expression that could cause ReDoS (Regular Expression Denial of Service).", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Avoid nested quantifiers in regex: | |
| VULNERABLE: | |
| pattern = re.compile(r'(a+)+b') # Catastrophic backtracking | |
| SECURE: | |
| pattern = re.compile(r'a+b') # Simple, efficient | |
| # Or use atomic groups/possessive quantifiers where supported | |
| # Set timeouts for regex operations: | |
| import regex | |
| regex.match(pattern, text, timeout=1.0)""", | |
| cwe_id="CWE-1333", | |
| languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb'] | |
| ), | |
| SASTRule( | |
| name="Prototype Pollution", | |
| pattern=r"""(?:Object\.assign|_\.merge|_\.extend|_\.defaults|jQuery\.extend|angular\.(?:merge|extend))\s*\([^,]*,\s*(?:req\.|request\.|params\.|body\.|input)|\[['"]__proto__['"]\]|\[['"]constructor['"]\]\.prototype""", | |
| description="Potential prototype pollution vulnerability. Merging user input into objects can modify Object.prototype.", | |
| risk_level=RiskLevel.HIGH, | |
| remediation="""Validate and sanitize object keys: | |
| VULNERABLE: | |
| Object.assign(target, req.body); | |
| _.merge(config, userInput); | |
| SECURE: | |
| // Use Object.create(null) for prototype-less objects | |
| const safeObj = Object.create(null); | |
| // Whitelist allowed properties | |
| const allowed = ['name', 'email']; | |
| for (const key of allowed) { | |
| if (key in userInput) { | |
| safeObj[key] = userInput[key]; | |
| } | |
| } | |
| // Or use libraries like 'lodash' with safeguards""", | |
| cwe_id="CWE-1321", | |
| languages=['.js', '.ts'] | |
| ), | |
| SASTRule( | |
| name="Open Redirect", | |
| pattern=r"""(?:redirect|res\.redirect|header\s*\(\s*['""]Location|window\.location|document\.location)\s*[=(]\s*(?:req\.|request\.|params\.|query\.|input|GET|POST|\$_)""", | |
| description="Potential open redirect vulnerability. User input controls redirect destination.", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Validate redirect URLs: | |
| VULNERABLE: | |
| redirect_url = request.args.get('next') | |
| return redirect(redirect_url) | |
| SECURE: | |
| from urllib.parse import urlparse | |
| redirect_url = request.args.get('next', '/') | |
| parsed = urlparse(redirect_url) | |
| # Only allow relative URLs or specific domains | |
| if parsed.netloc and parsed.netloc != 'mysite.com': | |
| redirect_url = '/' | |
| return redirect(redirect_url)""", | |
| cwe_id="CWE-601", | |
| languages=['.py', '.java', '.js', '.ts', '.php', '.rb'] | |
| ), | |
| SASTRule( | |
| name="Mass Assignment", | |
| pattern=r"""(?:\.update_attributes|\.update\(|\.create\(|\.build\(|Model\.create|\.save\()\s*\(?[^)]*(?:req\.|request\.|params\[|body\[|:permit\s*\(\s*!)""", | |
| description="Potential mass assignment vulnerability. User input may modify unintended model attributes.", | |
| risk_level=RiskLevel.MEDIUM, | |
| remediation="""Whitelist allowed attributes: | |
| VULNERABLE (Rails): | |
| User.create(params[:user]) | |
| SECURE (Rails): | |
| User.create(params.require(:user).permit(:name, :email)) | |
| VULNERABLE (Django): | |
| User.objects.create(**request.POST) | |
| SECURE (Django): | |
| User.objects.create( | |
| name=request.POST.get('name'), | |
| email=request.POST.get('email') | |
| ) | |
| Always explicitly specify which fields can be mass-assigned.""", | |
| cwe_id="CWE-915", | |
| languages=['.py', '.rb', '.java', '.js', '.ts', '.php'] | |
| ), | |
| ] | |
def scan_file(self, file_path: str) -> List[Vulnerability]:
    """
    Scan a single file for vulnerabilities.
    Like running a specific diagnostic test on one tissue sample.

    Args:
        file_path: Path of the file to scan.

    Returns:
        One Vulnerability per rule match. The list is empty when the
        file extension is not in ``self.file_extensions``; if reading or
        matching fails, the findings collected up to that point are
        returned.
    """
    import logging  # local import: keeps this method self-contained

    vulnerabilities: List[Vulnerability] = []
    try:
        file_ext = Path(file_path).suffix.lower()
        if file_ext not in self.file_extensions:
            return vulnerabilities

        # errors='ignore' lets files with stray non-UTF-8 bytes be scanned.
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            code = f.read()

        for rule in self.rules:
            for line_num, snippet in rule.matches(code, file_ext):
                vulnerabilities.append(
                    Vulnerability(
                        name=rule.name,
                        description=rule.description,
                        file_path=file_path,
                        line_number=line_num,
                        code_snippet=snippet,
                        risk_level=rule.risk_level,
                        remediation=rule.remediation,
                        cwe_id=rule.cwe_id,
                    )
                )
    except Exception as e:
        # Best-effort: one bad file must not abort the whole scan, but the
        # failure is reported instead of being silently discarded.
        # A warning is used (stderr by default) so report output on stdout
        # stays clean.
        logging.getLogger(__name__).warning(
            "Error scanning %s: %s", file_path, e
        )
    return vulnerabilities
def scan_directory(self, directory: str, max_workers: int = 8, use_parallel: bool = True) -> Tuple[List[Vulnerability], int]:
    """
    Recursively scan a directory with optional parallel processing.
    Like performing a full-body scan.

    Args:
        directory: Path to directory to scan
        max_workers: Number of parallel workers (default: 8). Values
            below 1 are clamped to 1.
        use_parallel: Whether to use parallel processing (default: True)

    Returns:
        Tuple of (vulnerabilities, files_scanned). Findings are returned
        in the order the files were discovered, in both sequential and
        parallel mode.
    """
    # Collect all files to scan
    files_to_scan = []
    for root, dirs, files in os.walk(directory):
        # Prune in place so os.walk does not descend into skipped directories
        dirs[:] = [d for d in dirs if d not in self.skip_dirs]
        for file in files:
            file_path = os.path.join(root, file)
            if Path(file_path).suffix.lower() in self.file_extensions:
                files_to_scan.append(file_path)

    vulnerabilities = []
    if not use_parallel or len(files_to_scan) <= 1:
        # Sequential processing
        for file_path in files_to_scan:
            vulnerabilities.extend(self.scan_file(file_path))
    else:
        # ThreadPoolExecutor rejects max_workers <= 0; None means "default".
        workers = max_workers if max_workers is None else max(1, max_workers)
        with ThreadPoolExecutor(max_workers=workers) as executor:
            submitted = [
                (file_path, executor.submit(self.scan_file, file_path))
                for file_path in files_to_scan
            ]
            # Read results in submission order (not completion order) so the
            # output is deterministic and matches the sequential path.
            for file_path, future in submitted:
                try:
                    vulnerabilities.extend(future.result())
                except Exception as e:
                    # Log error but continue with other files
                    print(f"Error scanning {file_path}: {e}")

    # Every collected file counts as scanned, even if scanning it failed.
    return vulnerabilities, len(files_to_scan)
class NVDClient:
    """
    NIST National Vulnerability Database Client

    Analogy: This is like searching a medical journal database
    - Searches for known vulnerabilities (diseases) by keyword
    - Retrieves detailed information including severity scores
    - Provides references to official documentation

    The NVD contains over 200,000 known vulnerabilities (CVEs)
    with detailed descriptions, severity scores, and references.
    """

    BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize NVD client.

        Args:
            api_key: Optional NVD API key for higher rate limits.
                Get one free at: https://nvd.nist.gov/developers/request-an-api-key
        """
        self.api_key = api_key
        # Seconds callers should wait between requests (shorter with a key).
        self.rate_limit_delay = 0.6 if api_key else 6.0  # NVD rate limits

    def _request_headers(self) -> Dict[str, str]:
        """Return the HTTP headers for a request (API key when configured)."""
        return {"apiKey": self.api_key} if self.api_key else {}

    async def search_vulnerabilities(
        self,
        keyword: Optional[str] = None,
        cwe_id: Optional[str] = None,
        severity: Optional[str] = None,
        limit: int = 20
    ) -> List[Dict]:
        """
        Search the NVD for vulnerabilities.

        Args:
            keyword: Search term (e.g., "sql injection python")
            cwe_id: CWE ID to filter by (e.g., "CWE-89")
            severity: Severity level (LOW, MEDIUM, HIGH, CRITICAL)
            limit: Maximum results to return (capped at 100 per request)

        Returns:
            List of CVE entries with details. On failure, a single-element
            list of the form ``[{"error": "<message>"}]`` is returned
            instead of raising.
        """
        params = {"resultsPerPage": min(limit, 100)}
        if keyword:
            params["keywordSearch"] = keyword
        if cwe_id:
            # The API takes the identifier in its usual form, e.g. "CWE-89"
            params["cweId"] = cwe_id
        if severity:
            params["cvssV3Severity"] = severity.upper()

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.BASE_URL,
                    params=params,
                    headers=self._request_headers(),
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        return self._parse_results(data)
                    elif response.status == 403:
                        return [{"error": "NVD API rate limited. Consider using an API key."}]
                    else:
                        return [{"error": f"NVD API error: {response.status}"}]
        except asyncio.TimeoutError:
            return [{"error": "NVD API request timed out"}]
        except Exception as e:
            return [{"error": f"NVD API error: {str(e)}"}]

    def _parse_results(self, data: Dict) -> List[Dict]:
        """
        Parse NVD API response into a cleaner format.

        Args:
            data: Decoded JSON body of an NVD CVE API 2.0 response.

        Returns:
            One dict per CVE with the keys ``cve_id``, ``description``
            (truncated to 500 characters plus "..."), ``severity``,
            ``cvss_score``, ``cwes``, ``references`` (first five URLs),
            ``published`` and ``last_modified``.
        """
        results = []
        for vuln in data.get("vulnerabilities", []):
            cve = vuln.get("cve", {})
            cve_id = cve.get("id", "Unknown")

            # Get description (first English entry that carries a value)
            description = next(
                (
                    d["value"]
                    for d in cve.get("descriptions", [])
                    if d.get("lang") == "en" and "value" in d
                ),
                "No description available"
            )

            # Get CVSS score and severity
            metrics = cve.get("metrics", {})
            severity = "UNKNOWN"
            score = 0.0
            # Try CVSS v3.1, then v3.0, then v2.0
            for version in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
                entries = metrics.get(version)
                if not entries:
                    continue
                metric = entries[0]
                if "cvssData" in metric:
                    cvss = metric["cvssData"]
                    score = cvss.get("baseScore", 0)
                    # CVSS v3.x keeps baseSeverity inside cvssData, while
                    # CVSS v2 entries carry it on the metric itself.
                    severity = (
                        cvss.get("baseSeverity")
                        or metric.get("baseSeverity")
                        or "UNKNOWN"
                    )
                break

            # Get references
            references = [
                ref.get("url") for ref in cve.get("references", [])[:5]
            ]

            # Get CWE IDs
            cwes = [
                desc["value"]
                for weakness in cve.get("weaknesses", [])
                for desc in weakness.get("description", [])
                if desc.get("value", "").startswith("CWE-")
            ]

            if len(description) > 500:
                description = description[:500] + "..."

            results.append({
                "cve_id": cve_id,
                "description": description,
                "severity": severity,
                "cvss_score": score,
                "cwes": cwes,
                "references": references,
                "published": cve.get("published", "Unknown"),
                "last_modified": cve.get("lastModified", "Unknown")
            })
        return results

    async def get_cve_details(self, cve_id: str) -> Optional[Dict]:
        """
        Get detailed information about a specific CVE.

        Args:
            cve_id: CVE identifier (e.g., "CVE-2021-44228")

        Returns:
            Detailed CVE information, or None if not found or if the
            request fails for any reason.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.BASE_URL,
                    params={"cveId": cve_id},
                    headers=self._request_headers(),
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        results = self._parse_results(data)
                        return results[0] if results else None
                    return None
        except Exception:
            # Lookup is best-effort: any failure is reported as "not found".
            return None

    async def find_related_cves(self, cwe_id: str, limit: int = 10) -> List[Dict]:
        """
        Find CVEs related to a specific CWE.

        This helps answer "What known attacks use this vulnerability type?"

        Args:
            cwe_id: CWE identifier (e.g., "CWE-89")
            limit: Maximum results

        Returns:
            List of related CVEs (same shape as search_vulnerabilities)
        """
        return await self.search_vulnerabilities(cwe_id=cwe_id, limit=limit)
class WebAppScanner:
    """
    Web Application Scanner

    Analogy: Like a physical security inspector
    - Checks doors and windows (endpoints)
    - Tests locks (authentication)
    - Looks for signs of vulnerability

    This performs basic web security checks without being intrusive.
    For full web app testing, specialized tools like OWASP ZAP are recommended.
    """

    def __init__(self):
        """Set up the list of paths to probe and headers to look for."""
        self.common_paths = [
            # Admin paths
            "/admin", "/administrator", "/admin.php", "/admin.html",
            "/wp-admin", "/cpanel", "/phpmyadmin",
            # Sensitive files
            "/.git/config", "/.env", "/config.php", "/wp-config.php",
            "/.htaccess", "/web.config", "/robots.txt", "/sitemap.xml",
            # Backup files
            "/backup.zip", "/backup.sql", "/db.sql", "/database.sql",
            # API endpoints
            "/api", "/api/v1", "/graphql", "/swagger.json", "/openapi.json",
            # Debug/test
            "/debug", "/test", "/phpinfo.php", "/info.php",
        ]
        self.security_headers = [
            "Content-Security-Policy",
            "X-Frame-Options",
            "X-Content-Type-Options",
            "X-XSS-Protection",
            "Strict-Transport-Security",
            "Referrer-Policy",
            "Permissions-Policy"
        ]

    async def scan_url(self, url: str) -> List[Vulnerability]:
        """
        Perform security scan on a web application.

        Args:
            url: Target URL (e.g., "https://example.com"). A missing
                scheme defaults to https.

        Returns:
            List of discovered vulnerabilities
        """
        vulnerabilities = []

        # Normalize URL. The scheme test is case-insensitive so that an input
        # such as "HTTP://example.com" is not prefixed a second time.
        url = url.strip()
        if not url.lower().startswith(('http://', 'https://')):
            url = 'https://' + url
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        async with aiohttp.ClientSession() as session:
            # Check security headers
            vulnerabilities.extend(
                await self._check_security_headers(session, base_url)
            )
            # Check for exposed sensitive files
            vulnerabilities.extend(
                await self._check_sensitive_paths(session, base_url)
            )
            # Check HTTPS configuration
            vulnerabilities.extend(await self._check_https(session, url))
            # Check for common vulnerabilities in responses
            vulnerabilities.extend(
                await self._check_response_content(session, base_url)
            )
        return vulnerabilities

    async def _check_security_headers(
        self, session: aiohttp.ClientSession, url: str
    ) -> List[Vulnerability]:
        """
        Check for missing security headers and server version disclosure.

        Issues a HEAD request to ``url``; if the request fails, no findings
        are reported for this check.
        """
        vulnerabilities = []
        try:
            async with session.head(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                headers = response.headers

                for header in self.security_headers:
                    if header in headers:
                        continue
                    vulnerabilities.append(Vulnerability(
                        name=f"Missing Security Header: {header}",
                        description=f"The {header} header is not set. This header helps protect against various attacks.",
                        file_path=url,
                        line_number=0,
                        code_snippet=f"Response headers do not include {header}",
                        risk_level=RiskLevel.LOW if header != "Content-Security-Policy" else RiskLevel.MEDIUM,
                        remediation=self._get_header_remediation(header),
                        cwe_id="CWE-693"
                    ))

                # Check for server version disclosure
                if "Server" in headers and any(v in headers["Server"].lower() for v in ["apache/", "nginx/", "iis/"]):
                    vulnerabilities.append(Vulnerability(
                        name="Server Version Disclosure",
                        description=f"Server header reveals version information: {headers['Server']}",
                        file_path=url,
                        line_number=0,
                        code_snippet=f"Server: {headers['Server']}",
                        risk_level=RiskLevel.INFO,
                        remediation="Configure your web server to hide version information. In Apache, use 'ServerTokens Prod'. In Nginx, use 'server_tokens off'.",
                        cwe_id="CWE-200"
                    ))
        except Exception:
            # Best-effort probe: connection errors and timeouts simply
            # produce no findings for this check.
            pass
        return vulnerabilities

    async def _check_sensitive_paths(
        self, session: aiohttp.ClientSession, base_url: str
    ) -> List[Vulnerability]:
        """
        Check for exposed sensitive files and directories.

        Every entry of ``self.common_paths`` is requested concurrently
        (without following redirects); a path is reported when the server
        answers with HTTP 200.
        """
        vulnerabilities = []

        async def check_path(path: str):
            """Return (path, status) when the path answers 200, else None."""
            try:
                async with session.get(
                    f"{base_url}{path}",
                    timeout=aiohttp.ClientTimeout(total=5),
                    allow_redirects=False
                ) as response:
                    if response.status == 200:
                        return path, response.status
                    return None
            except Exception:
                return None

        # Check paths concurrently
        results = await asyncio.gather(
            *(check_path(path) for path in self.common_paths)
        )

        for result in results:
            if not result:
                continue
            path, status = result
            risk = RiskLevel.HIGH if any(
                s in path for s in ['.git', '.env', 'config', 'backup', 'sql']
            ) else RiskLevel.MEDIUM
            vulnerabilities.append(Vulnerability(
                name=f"Exposed Sensitive Path: {path}",
                description=f"The path {path} is accessible and may expose sensitive information.",
                file_path=f"{base_url}{path}",
                line_number=0,
                code_snippet=f"HTTP {status} returned for {path}",
                risk_level=risk,
                remediation=f"Restrict access to {path} using web server configuration. Add authentication or remove from public access.",
                cwe_id="CWE-538"
            ))
        return vulnerabilities

    async def _check_https(
        self, session: aiohttp.ClientSession, url: str
    ) -> List[Vulnerability]:
        """
        Check HTTPS configuration.

        Only the URL scheme is inspected; ``session`` is accepted for
        signature parity with the other checks and is not used.
        """
        vulnerabilities = []
        parsed = urlparse(url)
        if parsed.scheme == "http":
            vulnerabilities.append(Vulnerability(
                name="Insecure HTTP Connection",
                description="The target is using HTTP instead of HTTPS. All data transmitted is unencrypted.",
                file_path=url,
                line_number=0,
                code_snippet=f"URL scheme: {parsed.scheme}",
                risk_level=RiskLevel.HIGH,
                remediation="Enable HTTPS with a valid TLS certificate. Consider using Let's Encrypt for free certificates. Configure HSTS to prevent downgrade attacks.",
                cwe_id="CWE-319"
            ))
        return vulnerabilities

    async def _check_response_content(
        self, session: aiohttp.ClientSession, base_url: str
    ) -> List[Vulnerability]:
        """
        Check response content for potential vulnerabilities.

        Fetches ``base_url`` and searches the body (case-insensitively) for
        patterns that indicate leaked errors, debug flags or secrets in
        HTML comments. If the request fails, no findings are reported.
        """
        vulnerabilities = []
        try:
            async with session.get(
                base_url,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                if response.status == 200:
                    content = await response.text()

                    # Check for error messages that reveal information
                    error_patterns = [
                        (r"mysql_error|mysqli_error|pg_error", "Database Error Disclosure", "CWE-209"),
                        (r"stack\s*trace|traceback|exception.*at\s+line", "Stack Trace Disclosure", "CWE-209"),
                        (r"debug\s*=\s*true|debug_mode|development_mode", "Debug Mode Enabled", "CWE-215"),
                        # The keyword must sit inside ONE comment: the prefix
                        # may not cross a "-->" terminator, and [\s\S] lets
                        # the match span multi-line comments.
                        (r"<!--(?:(?!-->)[\s\S])*?(?:password|api.?key|secret)[\s\S]*?-->", "Sensitive Data in Comments", "CWE-615"),
                    ]
                    for pattern, name, cwe in error_patterns:
                        if re.search(pattern, content, re.IGNORECASE):
                            vulnerabilities.append(Vulnerability(
                                name=name,
                                description=f"The response contains {name.lower()} which may reveal sensitive information.",
                                file_path=base_url,
                                line_number=0,
                                code_snippet=f"Pattern detected: {pattern}",
                                risk_level=RiskLevel.MEDIUM,
                                remediation=f"Remove {name.lower()} from production responses. Configure error handling to show generic messages.",
                                cwe_id=cwe
                            ))
        except Exception:
            # Best-effort probe: network failures produce no findings.
            pass
        return vulnerabilities

    def _get_header_remediation(self, header: str) -> str:
        """
        Get specific remediation advice for missing headers.

        Args:
            header: Name of the missing response header.

        Returns:
            Header-specific advice, or a generic sentence for headers
            without a dedicated entry.
        """
        remediations = {
            "Content-Security-Policy": "Add CSP header to control resource loading. Start with: Content-Security-Policy: default-src 'self'",
            "X-Frame-Options": "Add: X-Frame-Options: DENY (or SAMEORIGIN if you need framing)",
            "X-Content-Type-Options": "Add: X-Content-Type-Options: nosniff",
            "X-XSS-Protection": "Add: X-XSS-Protection: 1; mode=block (note: deprecated in favor of CSP)",
            "Strict-Transport-Security": "Add: Strict-Transport-Security: max-age=31536000; includeSubDomains",
            "Referrer-Policy": "Add: Referrer-Policy: strict-origin-when-cross-origin",
            "Permissions-Policy": "Add: Permissions-Policy: geolocation=(), microphone=(), camera=()"
        }
        return remediations.get(header, f"Configure the {header} header appropriately.")
class SecurityChecker:
    """
    Main Security Checker - Orchestrates all scanning capabilities.

    Analogy: This is like a complete medical center
    - Diagnostic imaging (SAST)
    - Medical database (NVD)
    - Physical examination (Web Scanner)
    - Report generation (Results)

    Usage:
        checker = SecurityChecker()
        # Scan local code
        result = await checker.scan_local("/path/to/project")
        # Scan web app
        result = await checker.scan_web("https://example.com")
        # Generate report
        report = checker.generate_report(result)
    """

    def __init__(self, nvd_api_key: Optional[str] = None):
        """
        Create the three scanning components.

        Args:
            nvd_api_key: Optional NVD API key passed on to the NVD client.
        """
        self.sast_engine = SASTEngine()
        self.nvd_client = NVDClient(api_key=nvd_api_key)
        self.web_scanner = WebAppScanner()

    async def scan_local(self, path: str, include_nvd: bool = True, max_workers: int = 8, use_parallel: bool = True) -> ScanResult:
        """
        Scan a local directory for vulnerabilities.

        Args:
            path: Path to directory or file
            include_nvd: Whether to enrich results with NVD data
            max_workers: Number of parallel workers for file scanning (default: 8)
            use_parallel: Whether to use parallel processing (default: True)

        Returns:
            ScanResult with all findings. When ``path`` does not exist the
            result carries an error message and no findings.
        """
        result = ScanResult(
            target=path,
            scan_type="local",
            start_time=datetime.now()
        )
        if not os.path.exists(path):
            result.errors.append(f"Path does not exist: {path}")
            result.end_time = datetime.now()
            return result

        # Run SAST scan
        if os.path.isfile(path):
            vulns = self.sast_engine.scan_file(path)
            result.files_scanned = 1
        else:
            vulns, files_scanned = self.sast_engine.scan_directory(
                path,
                max_workers=max_workers,
                use_parallel=use_parallel
            )
            result.files_scanned = files_scanned

        # Enrich with NVD data if requested
        if include_nvd and vulns:
            vulns = await self._enrich_with_nvd(vulns)

        result.vulnerabilities = vulns
        result.end_time = datetime.now()
        return result

    async def scan_web(self, url: str, include_nvd: bool = True) -> ScanResult:
        """
        Scan a web application for vulnerabilities.

        Args:
            url: Target URL
            include_nvd: Whether to enrich results with NVD data

        Returns:
            ScanResult with all findings. Exceptions raised while scanning
            are recorded in ``result.errors`` rather than propagated.
        """
        result = ScanResult(
            target=url,
            scan_type="web",
            start_time=datetime.now()
        )
        try:
            vulns = await self.web_scanner.scan_url(url)
            if include_nvd and vulns:
                vulns = await self._enrich_with_nvd(vulns)
            result.vulnerabilities = vulns
            result.files_scanned = 1  # One URL scanned
        except Exception as e:
            result.errors.append(str(e))
        result.end_time = datetime.now()
        return result

    async def _enrich_with_nvd(
        self, vulnerabilities: List[Vulnerability]
    ) -> List[Vulnerability]:
        """
        Enrich vulnerability findings with NVD data.

        This adds related CVEs to each finding, showing real-world
        examples of the vulnerability being exploited.

        Args:
            vulnerabilities: Findings to enrich; modified in place.

        Returns:
            The same list, with ``cve_ids`` set (up to three IDs) on every
            finding whose CWE lookup succeeded.
        """
        # Group by CWE to reduce API calls
        cwe_groups = {}
        for vuln in vulnerabilities:
            if vuln.cwe_id:
                cwe_groups.setdefault(vuln.cwe_id, []).append(vuln)

        # Fetch CVEs for each CWE
        for index, (cwe_id, vuln_list) in enumerate(cwe_groups.items()):
            # Rate limiting: pause BETWEEN requests only. Sleeping before
            # every request but the first avoids a pointless delay after
            # the last lookup and still throttles after a failed one.
            if index:
                await asyncio.sleep(self.nvd_client.rate_limit_delay)
            try:
                cves = await self.nvd_client.find_related_cves(cwe_id, limit=5)
                cve_ids = [cve.get("cve_id") for cve in cves if "error" not in cve]
                for vuln in vuln_list:
                    vuln.cve_ids = cve_ids[:3]  # Add top 3 related CVEs
            except Exception:
                # Enrichment is optional; findings stay valid without CVEs.
                continue
        return vulnerabilities

    async def search_nvd(
        self,
        keyword: Optional[str] = None,
        cwe_id: Optional[str] = None,
        severity: Optional[str] = None
    ) -> List[Dict]:
        """
        Search the NVD directly.

        Useful for researching specific vulnerabilities. The arguments are
        forwarded unchanged to the NVD client's search.
        """
        return await self.nvd_client.search_vulnerabilities(
            keyword=keyword,
            cwe_id=cwe_id,
            severity=severity
        )

    def generate_report(
        self,
        result: ScanResult,
        format: str = "text"
    ) -> str:
        """
        Generate a human-readable report.

        Args:
            result: ScanResult from a scan
            format: Output format ("text", "json" or "markdown"). Any
                other value, including "html", falls back to the plain
                text report.

        Returns:
            Formatted report string
        """
        if format == "json":
            return self._generate_json_report(result)
        elif format == "markdown":
            return self._generate_markdown_report(result)
        else:
            return self._generate_text_report(result)

    def _generate_text_report(self, result: ScanResult) -> str:
        """Generate plain text report (header, summary, errors, findings)."""
        lines = [
            "=" * 70,
            "SECURITY SCAN REPORT",
            "=" * 70,
            "",
            f"Target: {result.target}",
            f"Scan Type: {result.scan_type.upper()}",
            f"Start Time: {result.start_time.strftime('%Y-%m-%d %H:%M:%S')}",
            f"End Time: {result.end_time.strftime('%Y-%m-%d %H:%M:%S') if result.end_time else 'N/A'}",
            f"Files Scanned: {result.files_scanned}",
            "",
        ]

        # Summary
        summary = result.summary()
        lines.extend([
            "-" * 70,
            "SUMMARY",
            "-" * 70,
            f"Total Vulnerabilities: {summary['total_vulnerabilities']}",
            "",
            "By Severity:",
        ])
        for severity, count in summary["by_severity"].items():
            if count > 0:
                lines.append(f" {severity}: {count}")
        lines.append("")

        if result.errors:
            lines.extend([
                "-" * 70,
                "ERRORS",
                "-" * 70,
            ])
            for error in result.errors:
                lines.append(f" • {error}")
            lines.append("")

        # Vulnerabilities by severity
        if result.vulnerabilities:
            lines.extend([
                "-" * 70,
                "DETAILED FINDINGS",
                "-" * 70,
                "",
            ])
            # Sort by severity (most severe first; unknown levels last)
            severity_order = {
                RiskLevel.CRITICAL: 0,
                RiskLevel.HIGH: 1,
                RiskLevel.MEDIUM: 2,
                RiskLevel.LOW: 3,
                RiskLevel.INFO: 4
            }
            sorted_vulns = sorted(
                result.vulnerabilities,
                key=lambda v: severity_order.get(v.risk_level, 5)
            )
            for i, vuln in enumerate(sorted_vulns, 1):
                lines.extend([
                    f"[{i}] {vuln.name}",
                    f" Severity: {vuln.risk_level.value}",
                    f" Location: {vuln.file_path}:{vuln.line_number}",
                    f" CWE: {vuln.cwe_id or 'N/A'}",
                    "",
                    f" Description:",
                    f" {vuln.description}",
                    "",
                    f" Code:",
                    " " + "-" * 40,
                ])
                for line in vuln.code_snippet.split('\n'):
                    lines.append(f" {line}")
                lines.extend([
                    " " + "-" * 40,
                    "",
                    f" Remediation:",
                ])
                if vuln.remediation and vuln.remediation != "No known solution":
                    for line in vuln.remediation.split('\n'):
                        lines.append(f" {line}")
                else:
                    lines.append(" No known solution")
                if vuln.cve_ids:
                    lines.extend([
                        "",
                        f" Related CVEs: {', '.join(vuln.cve_ids)}",
                    ])
                lines.extend(["", ""])

        lines.extend([
            "=" * 70,
            "END OF REPORT",
            "=" * 70,
        ])
        return "\n".join(lines)

    def _generate_json_report(self, result: ScanResult) -> str:
        """Generate JSON report (2-space indented)."""
        report = {
            "target": result.target,
            "scan_type": result.scan_type,
            "start_time": result.start_time.isoformat(),
            "end_time": result.end_time.isoformat() if result.end_time else None,
            "summary": result.summary(),
            "vulnerabilities": [v.to_dict() for v in result.vulnerabilities],
            "errors": result.errors
        }
        return json.dumps(report, indent=2)

    def _generate_markdown_report(self, result: ScanResult) -> str:
        """
        Generate Markdown report optimized for vibe-coding platforms.

        Findings are grouped under one heading per severity, most severe
        first. Scan errors are listed in their own section so that a
        failed scan is not mistaken for a clean one.
        """
        summary = result.summary()
        severity_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]

        # Two trailing spaces are Markdown's hard line break; without them
        # the header fields would be rendered as one run-on paragraph.
        lines = [
            "# Security Scan Report\n",
            f"**Target:** `{result.target}`  ",
            f"**Scan Type:** {result.scan_type.upper()}  ",
            f"**Date:** {result.start_time.strftime('%Y-%m-%d %H:%M:%S')}  ",
            f"**Files Scanned:** {result.files_scanned}  ",
        ]

        # Summary line
        counts = []
        for sev in severity_order:
            count = summary['by_severity'].get(sev, 0)
            if count > 0:
                counts.append(f"{count} {sev.capitalize()}")
        total = summary['total_vulnerabilities']
        lines.append(f"**Total Vulnerabilities:** {total}" + (f" ({', '.join(counts)})" if counts else ""))
        lines.append("\n---\n")

        if result.errors:
            lines.append("## Errors\n")
            lines.extend(f"- {error}" for error in result.errors)
            lines.append("")

        if total == 0:
            lines.append("No vulnerabilities found.\n")
            return "\n".join(lines)

        # Group vulnerabilities by severity
        by_severity = {}
        for vuln in result.vulnerabilities:
            by_severity.setdefault(vuln.risk_level.value, []).append(vuln)

        finding_num = 0
        for sev in severity_order:
            vulns = by_severity.get(sev, [])
            if not vulns:
                continue
            lines.append(f"## {sev.capitalize()}\n")
            for vuln in vulns:
                finding_num += 1
                cwe = f" ({vuln.cwe_id})" if vuln.cwe_id else ""
                lines.append(f"### {finding_num}. {vuln.name}{cwe}\n")
                lines.append(f"- **File:** `{vuln.file_path}:{vuln.line_number}`")
                lines.append(f"- **Confidence:** {vuln.confidence}")
                lines.append(f"- **Description:** {vuln.description}")
                snippet = vuln.code_snippet.strip() if vuln.code_snippet else ""
                if snippet:
                    lang = os.path.splitext(vuln.file_path)[1].lstrip(".")
                    lines.append("- **Code:**")
                    # Every line of the fence is indented by two spaces so a
                    # multi-line snippet stays inside the list item.
                    lines.append(f"  ```{lang}")
                    lines.extend(
                        f"  {snippet_line}"
                        for snippet_line in snippet.split("\n")
                    )
                    lines.append("  ```")
                if vuln.remediation:
                    lines.append(f"- **Remediation:** {vuln.remediation.strip()}")
                if vuln.cve_ids:
                    lines.append(f"- **Related CVEs:** {', '.join(vuln.cve_ids)}")
                lines.append("")
            lines.append("---\n")

        lines.append(f"*Generated by Security Auditor | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n")
        return "\n".join(lines)
| # ============================================================ | |
| # CLI Interface | |
| # ============================================================ | |
async def main():
    """
    Command-line interface for the security checker.

    Parses ``sys.argv`` and runs one of two modes:
    - ``--nvd-search``: query the NVD and print the matching CVEs
    - otherwise: scan the given target (web scan for ``--web`` or an
      http(s) URL, local scan for anything else) and print the report
      or write it to the ``--output`` file
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Security Checker - SAST and NVD-powered vulnerability scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Scan a local project
  python security_checker.py /path/to/project

  # Scan a web application
  python security_checker.py https://example.com --web

  # Generate Markdown report
  python security_checker.py /path/to/project --format markdown -o report.md

  # Search NVD for SQL injection vulnerabilities
  python security_checker.py --nvd-search "sql injection" --severity HIGH
"""
    )
    parser.add_argument(
        "target",
        nargs="?",
        help="Target to scan (local path or URL)"
    )
    parser.add_argument(
        "--web",
        action="store_true",
        help="Treat target as web URL"
    )
    parser.add_argument(
        "--format",
        # "markdown" is what the report generator implements. "html" is kept
        # only so existing invocations keep working; it yields the text report.
        choices=["text", "json", "markdown", "html"],
        default="text",
        help="Output format (default: text; 'html' is accepted for "
             "compatibility and currently produces the text report)"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output file (default: stdout)"
    )
    parser.add_argument(
        "--nvd-api-key",
        help="NVD API key for higher rate limits"
    )
    parser.add_argument(
        "--no-nvd",
        action="store_true",
        help="Skip NVD enrichment"
    )
    parser.add_argument(
        "--nvd-search",
        help="Search NVD for vulnerabilities by keyword"
    )
    parser.add_argument(
        "--severity",
        choices=["LOW", "MEDIUM", "HIGH", "CRITICAL"],
        help="Filter NVD search by severity"
    )
    args = parser.parse_args()

    # Initialize checker
    checker = SecurityChecker(nvd_api_key=args.nvd_api_key)

    # NVD search mode
    if args.nvd_search:
        print(f"Searching NVD for: {args.nvd_search}")
        results = await checker.search_nvd(
            keyword=args.nvd_search,
            severity=args.severity
        )
        if not results:
            # An empty list is a valid answer, not an error; indexing
            # results[0] here would raise IndexError.
            print("No matching CVEs found.")
        elif "error" in results[0]:
            print(f"Error: {results[0].get('error', 'Unknown error')}")
        else:
            for cve in results:
                print(f"\n{cve['cve_id']} ({cve['severity']} - {cve['cvss_score']})")
                print(f" {cve['description'][:200]}...")
                if cve['cwes']:
                    print(f" CWEs: {', '.join(cve['cwes'])}")
        return

    # Require target for scanning
    if not args.target:
        parser.print_help()
        return

    # Perform scan
    print(f"Scanning: {args.target}")
    print("This may take a moment...")
    if args.web or args.target.startswith(('http://', 'https://')):
        result = await checker.scan_web(args.target, include_nvd=not args.no_nvd)
    else:
        result = await checker.scan_local(args.target, include_nvd=not args.no_nvd)

    # Generate report
    report = checker.generate_report(result, format=args.format)

    # Output
    if args.output:
        # Explicit UTF-8: the report contains non-ASCII characters (e.g.
        # the bullet in the error list) that a locale default may reject.
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to: {args.output}")
    else:
        print(report)
# Script entry point: run the async CLI only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())