#!/usr/bin/env python3 """ Security Checker Application ============================ A comprehensive security analysis tool that combines: 1. Static Application Security Testing (SAST) 2. NIST National Vulnerability Database (NVD) integration Think of this as a "security doctor" for your applications: - SAST = X-ray machine (looks inside without running) - NVD = Medical database (known vulnerabilities/diseases) - Report = Diagnosis with treatment plan Author: Security Checker Project """ import os import re import json import hashlib import asyncio import aiohttp from pathlib import Path from dataclasses import dataclass, field from typing import List, Dict, Optional, Tuple from enum import Enum from datetime import datetime from urllib.parse import urlparse import fnmatch from concurrent.futures import ThreadPoolExecutor, as_completed class RiskLevel(Enum): """ Risk levels follow CVSS (Common Vulnerability Scoring System). Think of it like triage in an emergency room: - CRITICAL: Life-threatening, needs immediate attention - HIGH: Serious condition, treat soon - MEDIUM: Concerning, schedule treatment - LOW: Minor issue, monitor - INFO: Just a note for awareness """ CRITICAL = "CRITICAL" # CVSS 9.0-10.0 HIGH = "HIGH" # CVSS 7.0-8.9 MEDIUM = "MEDIUM" # CVSS 4.0-6.9 LOW = "LOW" # CVSS 0.1-3.9 INFO = "INFO" # Informational @dataclass class Vulnerability: """ Represents a single vulnerability found in the code. Analogy: This is like a medical diagnosis report entry: - name: Disease name - description: What's wrong - file_path: Where in the body (code) the problem is - line_number: Exact location - code_snippet: The problematic tissue sample - risk_level: How serious is it - remediation: Treatment plan - cve_ids: Reference to known disease database (NVD) """ name: str description: str file_path: str line_number: int code_snippet: str risk_level: RiskLevel remediation: str cve_ids: List[str] = field(default_factory=list) cwe_id: Optional[str] = None confidence: str = "HIGH" # HIGH, MEDIUM, LOW def to_dict(self) -> Dict: return { "name": self.name, "description": self.description, "file_path": self.file_path, "line_number": self.line_number, "code_snippet": self.code_snippet, "risk_level": self.risk_level.value, "remediation": self.remediation, "cve_ids": self.cve_ids, "cwe_id": self.cwe_id, "confidence": self.confidence } @dataclass class ScanResult: """ Complete scan results - like a full medical report. """ target: str scan_type: str # "local" or "web" start_time: datetime end_time: Optional[datetime] = None vulnerabilities: List[Vulnerability] = field(default_factory=list) files_scanned: int = 0 errors: List[str] = field(default_factory=list) def summary(self) -> Dict: """Generate a summary of findings by risk level.""" summary = {level.value: 0 for level in RiskLevel} for vuln in self.vulnerabilities: summary[vuln.risk_level.value] += 1 return { "target": self.target, "scan_type": self.scan_type, "duration_seconds": (self.end_time - self.start_time).total_seconds() if self.end_time else None, "files_scanned": self.files_scanned, "total_vulnerabilities": len(self.vulnerabilities), "by_severity": summary, "errors": len(self.errors) } class SASTRule: """ A single SAST detection rule. Analogy: Like a specific test in a medical lab - pattern: What symptom to look for - name: Name of the condition - languages: Which "body types" this applies to """ def __init__( self, name: str, pattern: str, description: str, risk_level: RiskLevel, remediation: str, cwe_id: str, languages: List[str], false_positive_patterns: List[str] = None ): self.name = name self.pattern = re.compile(pattern, re.IGNORECASE | re.MULTILINE) self.description = description self.risk_level = risk_level self.remediation = remediation self.cwe_id = cwe_id self.languages = languages # File extensions: ['.py', '.js', etc.] self.false_positive_patterns = [ re.compile(fp, re.IGNORECASE) for fp in (false_positive_patterns or []) ] def matches(self, code: str, file_ext: str) -> List[Tuple[int, str]]: """ Find all matches in the code. Returns list of (line_number, matched_snippet). """ if file_ext not in self.languages: return [] matches = [] lines = code.split('\n') for i, line in enumerate(lines, 1): if self.pattern.search(line): # Check for false positives is_false_positive = any( fp.search(line) for fp in self.false_positive_patterns ) if not is_false_positive: # Get context (line before and after) start = max(0, i - 2) end = min(len(lines), i + 1) snippet = '\n'.join(lines[start:end]) matches.append((i, snippet)) return matches class SASTEngine: """ Static Application Security Testing Engine Analogy: This is like a diagnostic imaging department - Scans code without executing it (like X-ray/MRI) - Looks for known vulnerability patterns - Reports findings with locations How it works: 1. Load detection rules (what to look for) 2. Read source files 3. Match patterns against code 4. Report findings """ def __init__(self): self.rules = self._load_rules() self.file_extensions = { '.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.jsx': 'javascript', '.tsx': 'typescript', '.java': 'java', '.php': 'php', '.rb': 'ruby', '.go': 'go', '.cs': 'csharp', '.c': 'c', '.cpp': 'cpp', '.h': 'c', '.hpp': 'cpp', '.sql': 'sql', '.html': 'html', '.htm': 'html', '.xml': 'xml', '.yml': 'yaml', '.yaml': 'yaml', '.json': 'json', '.sh': 'shell', '.bash': 'shell', } # Directories to skip (like avoiding scanning healthy tissue) self.skip_dirs = { 'node_modules', 'venv', '.venv', 'env', '.env', '__pycache__', '.git', '.svn', '.hg', 'dist', 'build', 'target', 'vendor', '.idea', '.vscode', 'coverage' } def _load_rules(self) -> List[SASTRule]: """ Load vulnerability detection rules. These rules are like a checklist of known security problems. Each rule defines: - A pattern to match (regex) - The type of vulnerability - How severe it is - How to fix it """ return [ # ============================================================ # INJECTION VULNERABILITIES (The "contamination" category) # Like checking for contaminants in food/medicine # ============================================================ SASTRule( name="SQL Injection", pattern=r"""(?:execute|cursor\.execute|query|raw|rawQuery|executeQuery)\s*\(\s*[f"'].*?%s.*?['"]\s*%|(?:execute|cursor\.execute)\s*\(\s*[f"'].*?\{.*?\}.*?['"]|(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*?['"]\s*\+\s*|f['"]\s*(?:SELECT|INSERT|UPDATE|DELETE).*?\{""", description="Potential SQL Injection vulnerability. User input may be directly concatenated into SQL queries, allowing attackers to manipulate database operations.", risk_level=RiskLevel.CRITICAL, remediation="""Use parameterized queries or prepared statements: VULNERABLE: cursor.execute(f"SELECT * FROM users WHERE id = {user_id}") SECURE: cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)) For ORMs, use built-in query builders instead of raw SQL.""", cwe_id="CWE-89", languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs'], false_positive_patterns=[r'#.*SQL', r'//.*SQL', r'/\*.*SQL'] ), SASTRule( name="Command Injection", pattern=r"""(?:os\.system|os\.popen|subprocess\.call|subprocess\.run|subprocess\.Popen|exec|eval|Runtime\.getRuntime\(\)\.exec|shell_exec|system|passthru|popen)\s*\([^)]*(?:\+|%|\.format|\{|\$)""", description="Potential Command Injection vulnerability. User input may be passed to system commands, allowing attackers to execute arbitrary commands.", risk_level=RiskLevel.CRITICAL, remediation="""Avoid passing user input to shell commands. If necessary: VULNERABLE: os.system(f"ping {user_input}") SECURE: import shlex subprocess.run(["ping", shlex.quote(user_input)], shell=False) Best practice: Use libraries instead of shell commands when possible.""", cwe_id="CWE-78", languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.sh'] ), SASTRule( name="XSS (Cross-Site Scripting)", pattern=r"""(?:innerHTML|outerHTML|document\.write|\.html\(|v-html|dangerouslySetInnerHTML|\[innerHTML\])\s*=?\s*(?:[^;]*(?:\+|`|\$\{))""", description="Potential Cross-Site Scripting (XSS) vulnerability. Untrusted data may be inserted into the DOM without proper encoding.", risk_level=RiskLevel.HIGH, remediation="""Sanitize and encode output before inserting into HTML: VULNERABLE: element.innerHTML = userInput; SECURE: element.textContent = userInput; // For text // Or use a sanitization library like DOMPurify: element.innerHTML = DOMPurify.sanitize(userInput); For React, avoid dangerouslySetInnerHTML unless absolutely necessary.""", cwe_id="CWE-79", languages=['.js', '.ts', '.jsx', '.tsx', '.html', '.php'] ), SASTRule( name="Path Traversal", pattern=r"""(?:open|read|write|file_get_contents|file_put_contents|include|require|fopen|readFile|writeFile|createReadStream)\s*\([^)]*(?:\+|`|\$\{|\.\./)""", description="Potential Path Traversal vulnerability. User input may be used to construct file paths, allowing attackers to access unauthorized files.", risk_level=RiskLevel.HIGH, remediation="""Validate and sanitize file paths: VULNERABLE: with open(f"/uploads/{filename}") as f: SECURE: import os safe_path = os.path.normpath(filename) if '..' in safe_path or safe_path.startswith('/'): raise ValueError("Invalid path") full_path = os.path.join(UPLOAD_DIR, safe_path) if not full_path.startswith(UPLOAD_DIR): raise ValueError("Path traversal detected")""", cwe_id="CWE-22", languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go'] ), SASTRule( name="LDAP Injection", pattern=r"""(?:ldap_search|ldap_bind|search_s|search_ext_s)\s*\([^)]*(?:\+|%|\.format|\{)""", description="Potential LDAP Injection vulnerability. User input may be used in LDAP queries without proper escaping.", risk_level=RiskLevel.HIGH, remediation="""Escape special LDAP characters in user input: VULNERABLE: ldap.search_s(base, scope, f"(uid={username})") SECURE: from ldap3.utils.conv import escape_filter_chars safe_username = escape_filter_chars(username) ldap.search_s(base, scope, f"(uid={safe_username})")""", cwe_id="CWE-90", languages=['.py', '.java', '.php', '.cs'] ), # ============================================================ # AUTHENTICATION & SESSION VULNERABILITIES # Like checking if the locks and keys are secure # ============================================================ SASTRule( name="Hardcoded Credentials", pattern=r"""(?:password|passwd|pwd|secret|api_key|apikey|api_secret|access_token|auth_token|private_key)\s*[=:]\s*['"]\w{8,}['"]""", description="Hardcoded credentials detected. Sensitive information should not be stored in source code.", risk_level=RiskLevel.HIGH, remediation="""Store credentials securely: VULNERABLE: password = "MySecretPass123" api_key = "sk-1234567890abcdef" SECURE: import os password = os.environ.get('DB_PASSWORD') api_key = os.environ.get('API_KEY') Use environment variables, secrets managers (AWS Secrets Manager, HashiCorp Vault), or encrypted configuration files.""", cwe_id="CWE-798", languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs', '.yml', '.yaml', '.json'], false_positive_patterns=[r'example', r'placeholder', r'your_', r'<.*>', r'xxx', r'\$\{'] ), SASTRule( name="Weak Password Hashing", pattern=r"""(?:md5|sha1)\s*\(|hashlib\.(?:md5|sha1)\(|MessageDigest\.getInstance\s*\(\s*['"](MD5|SHA-?1)['"]|password.*=.*(?:md5|sha1)""", description="Weak hashing algorithm used for passwords. MD5 and SHA1 are cryptographically broken for password storage.", risk_level=RiskLevel.HIGH, remediation="""Use strong password hashing algorithms: VULNERABLE: hashed = hashlib.md5(password.encode()).hexdigest() SECURE: import bcrypt hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()) # Or use argon2 (recommended): from argon2 import PasswordHasher ph = PasswordHasher() hashed = ph.hash(password) Recommended algorithms: Argon2, bcrypt, scrypt, PBKDF2""", cwe_id="CWE-328", languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs'] ), SASTRule( name="JWT Without Verification", pattern=r"""jwt\.decode\s*\([^)]*verify\s*=\s*False|algorithms\s*=\s*\[?\s*['"](none|HS256)['"]|\.decode\(\s*token\s*\)|jsonwebtoken\.decode\s*\(""", description="JWT token decoded without proper verification or using weak/no algorithm.", risk_level=RiskLevel.HIGH, remediation="""Always verify JWT signatures: VULNERABLE: payload = jwt.decode(token, verify=False) payload = jwt.decode(token, algorithms=['none']) SECURE: payload = jwt.decode( token, SECRET_KEY, algorithms=['RS256'], # Use asymmetric algorithms options={'verify_exp': True} ) Use RS256 or ES256 instead of HS256 for better security.""", cwe_id="CWE-347", languages=['.py', '.js', '.ts', '.java', '.go'] ), SASTRule( name="Session Fixation Risk", pattern=r"""session\s*\[\s*['"].*['"]\s*\]\s*=.*request\.|req\.session\s*=.*req\.(body|query|params)|session_id\s*=.*(?:GET|POST|request)""", description="Potential session fixation vulnerability. Session identifiers should be regenerated after authentication.", risk_level=RiskLevel.MEDIUM, remediation="""Regenerate session after authentication: VULNERABLE: session['user_id'] = user.id # Without regenerating SECURE (Python/Flask): from flask import session session.regenerate() # Regenerate session ID session['user_id'] = user.id SECURE (Node.js/Express): req.session.regenerate((err) => { req.session.userId = user.id; });""", cwe_id="CWE-384", languages=['.py', '.js', '.ts', '.php', '.java'] ), # ============================================================ # CRYPTOGRAPHIC VULNERABILITIES # Like checking if the safe is actually secure # ============================================================ SASTRule( name="Weak Cryptographic Algorithm", pattern=r"""(?:DES|RC4|RC2|Blowfish|IDEA)(?:\.|\s|Cipher)|Cipher\.getInstance\s*\(\s*['"](DES|RC4|Blowfish)['"]\)|from\s+Crypto\.Cipher\s+import\s+(DES|Blowfish)|cryptography.*(?:DES|RC4|Blowfish)""", description="Weak cryptographic algorithm detected. DES, RC4, RC2, and Blowfish are considered insecure.", risk_level=RiskLevel.HIGH, remediation="""Use modern cryptographic algorithms: VULNERABLE: from Crypto.Cipher import DES cipher = DES.new(key, DES.MODE_CBC) SECURE: from cryptography.fernet import Fernet # Or for low-level: from cryptography.hazmat.primitives.ciphers import Cipher, algorithms cipher = Cipher(algorithms.AES(key), modes.GCM(iv)) Recommended: AES-256-GCM, ChaCha20-Poly1305""", cwe_id="CWE-327", languages=['.py', '.java', '.js', '.ts', '.go', '.cs', '.php'] ), SASTRule( name="Insecure Random Number Generator", pattern=r"""(?:random\.random|random\.randint|Math\.random|rand\(\)|srand\(\)|mt_rand)\s*\(""", description="Insecure random number generator used. These are not cryptographically secure and shouldn't be used for security purposes.", risk_level=RiskLevel.MEDIUM, remediation="""Use cryptographically secure random generators: VULNERABLE: token = ''.join(random.choices(string.ascii_letters, k=32)) SECURE (Python): import secrets token = secrets.token_urlsafe(32) SECURE (JavaScript): const array = new Uint8Array(32); crypto.getRandomValues(array); SECURE (Java): SecureRandom random = new SecureRandom();""", cwe_id="CWE-338", languages=['.py', '.js', '.ts', '.java', '.php', '.c', '.cpp'], false_positive_patterns=[r'random\.seed', r'shuffle', r'sample'] ), SASTRule( name="Hardcoded Cryptographic Key", pattern=r"""(?:key|iv|nonce|salt)\s*[=:]\s*(?:b?['"]\w{16,}['"]|bytes\s*\(\s*['"]\w{16,}['"])""", description="Hardcoded cryptographic key detected. Encryption keys should never be stored in source code.", risk_level=RiskLevel.CRITICAL, remediation="""Store cryptographic keys securely: VULNERABLE: key = b'ThisIsASecretKey1234567890123456' SECURE: import os key = os.environ.get('ENCRYPTION_KEY').encode() # Or use a key management system: from aws_encryption_sdk import KMSMasterKeyProvider key_provider = KMSMasterKeyProvider(key_ids=[KEY_ARN]) Best practice: Use Hardware Security Modules (HSM) or Key Management Services (AWS KMS, Azure Key Vault).""", cwe_id="CWE-321", languages=['.py', '.java', '.js', '.ts', '.go', '.cs', '.php'] ), # ============================================================ # INSECURE DESERIALIZATION # Like accepting packages without checking what's inside # ============================================================ SASTRule( name="Insecure Deserialization (Python)", pattern=r"""pickle\.loads?\s*\(|yaml\.(?:unsafe_)?load\s*\([^)]*(?!Loader\s*=\s*yaml\.SafeLoader)|marshal\.loads?\s*\(|shelve\.open\s*\(""", description="Insecure deserialization detected. Deserializing untrusted data can lead to remote code execution.", risk_level=RiskLevel.CRITICAL, remediation="""Use safe deserialization methods: VULNERABLE: data = pickle.loads(user_input) config = yaml.load(file) SECURE: import json data = json.loads(user_input) # JSON is safe # For YAML, always use SafeLoader: config = yaml.load(file, Loader=yaml.SafeLoader) # Or better: config = yaml.safe_load(file) Never deserialize untrusted data with pickle/marshal.""", cwe_id="CWE-502", languages=['.py'] ), SASTRule( name="Insecure Deserialization (Java)", pattern=r"""ObjectInputStream\s*\(|readObject\s*\(\)|XMLDecoder\s*\(|XStream\.fromXML\s*\(|JSON\.parse\s*\(.*\)\.class""", description="Insecure deserialization detected in Java. Can lead to remote code execution.", risk_level=RiskLevel.CRITICAL, remediation="""Validate and filter deserialization: VULNERABLE: ObjectInputStream ois = new ObjectInputStream(input); Object obj = ois.readObject(); SECURE: // Use a whitelist filter ObjectInputFilter filter = ObjectInputFilter.Config.createFilter( "com.myapp.SafeClass;!*" ); ois.setObjectInputFilter(filter); // Or use JSON/Protocol Buffers instead of Java serialization Consider: Jackson with @JsonTypeInfo restrictions, or Protocol Buffers for type-safe serialization.""", cwe_id="CWE-502", languages=['.java'] ), SASTRule( name="Insecure Deserialization (JavaScript)", pattern=r"""(?:eval|Function)\s*\(\s*(?:JSON\.parse|atob|decodeURIComponent)|node-serialize|serialize-javascript.*(?:eval|Function)|unserialize\s*\(""", description="Insecure deserialization in JavaScript. Eval of untrusted data can lead to code execution.", risk_level=RiskLevel.CRITICAL, remediation="""Never eval deserialized data: VULNERABLE: eval(JSON.parse(userInput).code); const obj = serialize.unserialize(userInput); SECURE: const data = JSON.parse(userInput); // Validate structure before use if (typeof data.name !== 'string') { throw new Error('Invalid data'); } Avoid node-serialize and similar libraries with eval-based deserialization.""", cwe_id="CWE-502", languages=['.js', '.ts'] ), # ============================================================ # INFORMATION DISCLOSURE # Like leaving sensitive documents in public view # ============================================================ SASTRule( name="Debug Mode Enabled", pattern=r"""(?:DEBUG|debug)\s*[=:]\s*(?:True|true|1|['"](true|on|yes)['"])|app\.run\s*\([^)]*debug\s*=\s*True|FLASK_DEBUG\s*=\s*1""", description="Debug mode appears to be enabled. This can expose sensitive information in production.", risk_level=RiskLevel.MEDIUM, remediation="""Disable debug mode in production: VULNERABLE: app.run(debug=True) DEBUG = True SECURE: import os DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true' # In production: app.run(debug=False) Use environment variables to control debug settings.""", cwe_id="CWE-215", languages=['.py', '.js', '.ts', '.java', '.php', '.rb', '.yml', '.yaml', '.json'], false_positive_patterns=[r'#.*DEBUG', r'//.*DEBUG', r'debug.*log'] ), SASTRule( name="Sensitive Data in Logs", pattern=r"""(?:log(?:ger)?\.(?:info|debug|warn|error|critical)|print|console\.log|System\.out\.print)\s*\([^)]*(?:password|secret|token|key|credit.?card|ssn|api.?key)""", description="Sensitive data may be written to logs. This can expose credentials and personal information.", risk_level=RiskLevel.MEDIUM, remediation="""Never log sensitive information: VULNERABLE: logger.info(f"User login: {username}, password: {password}") SECURE: logger.info(f"User login: {username}") # Or mask sensitive data: logger.info(f"API key: {api_key[:4]}****") Use structured logging with sensitive field filtering.""", cwe_id="CWE-532", languages=['.py', '.java', '.js', '.ts', '.rb', '.go', '.php'] ), SASTRule( name="Stack Trace Exposure", pattern=r"""(?:printStackTrace|traceback\.print_exc|console\.trace|e\.stack|err\.stack)\s*\(?\)?|except.*?:?\s*pass|rescue\s*=>\s*nil""", description="Stack traces may be exposed to users or exceptions silently ignored.", risk_level=RiskLevel.LOW, remediation="""Handle exceptions properly without exposing internals: VULNERABLE: except Exception as e: return str(e) # Exposes internal details SECURE: except Exception as e: logger.exception("Operation failed") # Log internally return {"error": "An error occurred"} # Generic message Never expose full stack traces to end users.""", cwe_id="CWE-209", languages=['.py', '.java', '.js', '.ts', '.rb', '.php'] ), # ============================================================ # SECURITY MISCONFIGURATION # Like leaving doors unlocked or windows open # ============================================================ SASTRule( name="CORS Wildcard", pattern=r"""(?:Access-Control-Allow-Origin|cors)\s*[=:]\s*['"]\*['"]|\.allowedOrigins\s*\(\s*['"]\*['"]|cors\s*\(\s*\{[^}]*origin\s*:\s*(?:true|['"]\*['"])""", description="CORS configured to allow all origins. This can enable cross-site request attacks.", risk_level=RiskLevel.MEDIUM, remediation="""Restrict CORS to specific trusted origins: VULNERABLE: Access-Control-Allow-Origin: * cors({ origin: '*' }) SECURE: cors({ origin: ['https://trusted-site.com'], methods: ['GET', 'POST'], credentials: true }) Never use wildcard CORS with credentials.""", cwe_id="CWE-942", languages=['.py', '.java', '.js', '.ts', '.php', '.rb', '.go'] ), SASTRule( name="SSL/TLS Verification Disabled", pattern=r"""verify\s*[=:]\s*False|VERIFY_SSL\s*=\s*False|ssl\s*[=:]\s*False|rejectUnauthorized\s*[=:]\s*false|InsecureSkipVerify\s*[=:]\s*true|CURLOPT_SSL_VERIFYPEER.*false""", description="SSL/TLS certificate verification is disabled. This makes the application vulnerable to man-in-the-middle attacks.", risk_level=RiskLevel.HIGH, remediation="""Always verify SSL/TLS certificates: VULNERABLE: requests.get(url, verify=False) https.request({rejectUnauthorized: false}) SECURE: requests.get(url, verify=True) # Or with custom CA: requests.get(url, verify='/path/to/ca-bundle.crt') If you need to use self-signed certs in development, use environment-based configuration.""", cwe_id="CWE-295", languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb'] ), SASTRule( name="Insecure HTTP", pattern=r"""['"](http://(?!localhost|127\.0\.0\.1|0\.0\.0\.0)[^'"]+)['"]""", description="Insecure HTTP URL detected. Data transmitted over HTTP can be intercepted.", risk_level=RiskLevel.LOW, remediation="""Use HTTPS for all external communications: VULNERABLE: api_url = "http://api.example.com/data" SECURE: api_url = "https://api.example.com/data" Configure HSTS (HTTP Strict Transport Security) on your servers.""", cwe_id="CWE-319", languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb', '.yml', '.yaml', '.json'], false_positive_patterns=[r'#.*http://', r'//.*http://', r'example\.com', r'schema.*http://'] ), SASTRule( name="Missing Security Headers", pattern=r"""(?:Content-Security-Policy|X-Frame-Options|X-Content-Type-Options|Strict-Transport-Security)\s*[=:]\s*['"]['""]|no_header|disable.*header""", description="Security headers may be missing or disabled. This can enable various attacks.", risk_level=RiskLevel.LOW, remediation="""Configure security headers: Add these headers to your responses: Content-Security-Policy: default-src 'self' X-Frame-Options: DENY X-Content-Type-Options: nosniff Strict-Transport-Security: max-age=31536000; includeSubDomains X-XSS-Protection: 1; mode=block Use helmet.js (Node), django-csp, or similar libraries.""", cwe_id="CWE-693", languages=['.py', '.java', '.js', '.ts', '.php', '.rb'] ), # ============================================================ # XML VULNERABILITIES # XML parsers can be tricked into dangerous behavior # ============================================================ SASTRule( name="XXE (XML External Entity)", pattern=r"""(?:xml\.etree|lxml|xml\.dom|xml\.sax|XMLReader|DocumentBuilder|SAXParser|XMLParser).*(?:parse|read|load)| List[Vulnerability]: """ Scan a single file for vulnerabilities. Like running a specific diagnostic test on one tissue sample. """ vulnerabilities = [] try: file_ext = Path(file_path).suffix.lower() if file_ext not in self.file_extensions: return vulnerabilities with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: code = f.read() for rule in self.rules: matches = rule.matches(code, file_ext) for line_num, snippet in matches: vuln = Vulnerability( name=rule.name, description=rule.description, file_path=file_path, line_number=line_num, code_snippet=snippet, risk_level=rule.risk_level, remediation=rule.remediation, cwe_id=rule.cwe_id ) vulnerabilities.append(vuln) except Exception as e: # Log but don't fail on individual file errors pass return vulnerabilities def scan_directory(self, directory: str, max_workers: int = 8, use_parallel: bool = True) -> Tuple[List[Vulnerability], int]: """ Recursively scan a directory with optional parallel processing. Like performing a full-body scan. Args: directory: Path to directory to scan max_workers: Number of parallel workers (default: 8) use_parallel: Whether to use parallel processing (default: True) Returns: Tuple of (vulnerabilities, files_scanned) """ vulnerabilities = [] files_scanned = 0 # Collect all files to scan files_to_scan = [] for root, dirs, files in os.walk(directory): # Skip unwanted directories dirs[:] = [d for d in dirs if d not in self.skip_dirs] for file in files: file_path = os.path.join(root, file) file_ext = Path(file_path).suffix.lower() if file_ext in self.file_extensions: files_to_scan.append(file_path) if not use_parallel or len(files_to_scan) <= 1: # Sequential processing (original behavior) for file_path in files_to_scan: files_scanned += 1 vulns = self.scan_file(file_path) vulnerabilities.extend(vulns) else: # Parallel processing using ThreadPoolExecutor with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all scan jobs future_to_file = { executor.submit(self.scan_file, file_path): file_path for file_path in files_to_scan } # Collect results as they complete for future in as_completed(future_to_file): file_path = future_to_file[future] try: vulns = future.result() vulnerabilities.extend(vulns) files_scanned += 1 except Exception as e: # Log error but continue with other files print(f"Error scanning {file_path}: {e}") files_scanned += 1 # Count as scanned even if error return vulnerabilities, files_scanned class NVDClient: """ NIST National Vulnerability Database Client Analogy: This is like searching a medical journal database - Searches for known vulnerabilities (diseases) by keyword - Retrieves detailed information including severity scores - Provides references to official documentation The NVD contains over 200,000 known vulnerabilities (CVEs) with detailed descriptions, severity scores, and references. """ BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0" def __init__(self, api_key: Optional[str] = None): """ Initialize NVD client. Args: api_key: Optional NVD API key for higher rate limits. Get one free at: https://nvd.nist.gov/developers/request-an-api-key """ self.api_key = api_key self.rate_limit_delay = 0.6 if api_key else 6.0 # NVD rate limits async def search_vulnerabilities( self, keyword: Optional[str] = None, cwe_id: Optional[str] = None, severity: Optional[str] = None, limit: int = 20 ) -> List[Dict]: """ Search the NVD for vulnerabilities. Args: keyword: Search term (e.g., "sql injection python") cwe_id: CWE ID to filter by (e.g., "CWE-89") severity: Severity level (LOW, MEDIUM, HIGH, CRITICAL) limit: Maximum results to return Returns: List of CVE entries with details """ params = {"resultsPerPage": min(limit, 100)} if keyword: params["keywordSearch"] = keyword if cwe_id: # Format: CWE-89 -> CWE-89 params["cweId"] = cwe_id if severity: params["cvssV3Severity"] = severity.upper() headers = {} if self.api_key: headers["apiKey"] = self.api_key try: async with aiohttp.ClientSession() as session: async with session.get( self.BASE_URL, params=params, headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: data = await response.json() return self._parse_results(data) elif response.status == 403: return [{"error": "NVD API rate limited. Consider using an API key."}] else: return [{"error": f"NVD API error: {response.status}"}] except asyncio.TimeoutError: return [{"error": "NVD API request timed out"}] except Exception as e: return [{"error": f"NVD API error: {str(e)}"}] def _parse_results(self, data: Dict) -> List[Dict]: """Parse NVD API response into a cleaner format.""" results = [] for vuln in data.get("vulnerabilities", []): cve = vuln.get("cve", {}) cve_id = cve.get("id", "Unknown") # Get description descriptions = cve.get("descriptions", []) description = next( (d["value"] for d in descriptions if d.get("lang") == "en"), "No description available" ) # Get CVSS score and severity metrics = cve.get("metrics", {}) cvss_data = None severity = "UNKNOWN" score = 0.0 # Try CVSS v3.1, then v3.0, then v2.0 for version in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]: if version in metrics and metrics[version]: cvss_data = metrics[version][0] if "cvssData" in cvss_data: score = cvss_data["cvssData"].get("baseScore", 0) severity = cvss_data["cvssData"].get("baseSeverity", "UNKNOWN") break # Get references references = [ ref.get("url") for ref in cve.get("references", [])[:5] ] # Get CWE IDs cwes = [] for weakness in cve.get("weaknesses", []): for desc in weakness.get("description", []): if desc.get("value", "").startswith("CWE-"): cwes.append(desc["value"]) results.append({ "cve_id": cve_id, "description": description[:500] + "..." if len(description) > 500 else description, "severity": severity, "cvss_score": score, "cwes": cwes, "references": references, "published": cve.get("published", "Unknown"), "last_modified": cve.get("lastModified", "Unknown") }) return results async def get_cve_details(self, cve_id: str) -> Optional[Dict]: """ Get detailed information about a specific CVE. Args: cve_id: CVE identifier (e.g., "CVE-2021-44228") Returns: Detailed CVE information or None if not found """ params = {"cveId": cve_id} headers = {} if self.api_key: headers["apiKey"] = self.api_key try: async with aiohttp.ClientSession() as session: async with session.get( self.BASE_URL, params=params, headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: data = await response.json() results = self._parse_results(data) return results[0] if results else None return None except Exception: return None async def find_related_cves(self, cwe_id: str, limit: int = 10) -> List[Dict]: """ Find CVEs related to a specific CWE. This helps answer "What known attacks use this vulnerability type?" Args: cwe_id: CWE identifier (e.g., "CWE-89") limit: Maximum results Returns: List of related CVEs """ return await self.search_vulnerabilities(cwe_id=cwe_id, limit=limit) class WebAppScanner: """ Web Application Scanner Analogy: Like a physical security inspector - Checks doors and windows (endpoints) - Tests locks (authentication) - Looks for signs of vulnerability This performs basic web security checks without being intrusive. For full web app testing, specialized tools like OWASP ZAP are recommended. """ def __init__(self): self.common_paths = [ # Admin paths "/admin", "/administrator", "/admin.php", "/admin.html", "/wp-admin", "/cpanel", "/phpmyadmin", # Sensitive files "/.git/config", "/.env", "/config.php", "/wp-config.php", "/.htaccess", "/web.config", "/robots.txt", "/sitemap.xml", # Backup files "/backup.zip", "/backup.sql", "/db.sql", "/database.sql", # API endpoints "/api", "/api/v1", "/graphql", "/swagger.json", "/openapi.json", # Debug/test "/debug", "/test", "/phpinfo.php", "/info.php", ] self.security_headers = [ "Content-Security-Policy", "X-Frame-Options", "X-Content-Type-Options", "X-XSS-Protection", "Strict-Transport-Security", "Referrer-Policy", "Permissions-Policy" ] async def scan_url(self, url: str) -> List[Vulnerability]: """ Perform security scan on a web application. Args: url: Target URL (e.g., "https://example.com") Returns: List of discovered vulnerabilities """ vulnerabilities = [] # Normalize URL if not url.startswith(('http://', 'https://')): url = 'https://' + url parsed = urlparse(url) base_url = f"{parsed.scheme}://{parsed.netloc}" async with aiohttp.ClientSession() as session: # Check security headers header_vulns = await self._check_security_headers(session, base_url) vulnerabilities.extend(header_vulns) # Check for exposed sensitive files exposure_vulns = await self._check_sensitive_paths(session, base_url) vulnerabilities.extend(exposure_vulns) # Check HTTPS configuration https_vulns = await self._check_https(session, url) vulnerabilities.extend(https_vulns) # Check for common vulnerabilities in responses content_vulns = await self._check_response_content(session, base_url) vulnerabilities.extend(content_vulns) return vulnerabilities async def _check_security_headers( self, session: aiohttp.ClientSession, url: str ) -> List[Vulnerability]: """Check for missing or misconfigured security headers.""" vulnerabilities = [] try: async with session.head(url, timeout=aiohttp.ClientTimeout(total=10)) as response: headers = response.headers for header in self.security_headers: if header not in headers: vuln = Vulnerability( name=f"Missing Security Header: {header}", description=f"The {header} header is not set. This header helps protect against various attacks.", file_path=url, line_number=0, code_snippet=f"Response headers do not include {header}", risk_level=RiskLevel.LOW if header != "Content-Security-Policy" else RiskLevel.MEDIUM, remediation=self._get_header_remediation(header), cwe_id="CWE-693" ) vulnerabilities.append(vuln) # Check for server version disclosure if "Server" in headers and any(v in headers["Server"].lower() for v in ["apache/", "nginx/", "iis/"]): vuln = Vulnerability( name="Server Version Disclosure", description=f"Server header reveals version information: {headers['Server']}", file_path=url, line_number=0, code_snippet=f"Server: {headers['Server']}", risk_level=RiskLevel.INFO, remediation="Configure your web server to hide version information. In Apache, use 'ServerTokens Prod'. In Nginx, use 'server_tokens off'.", cwe_id="CWE-200" ) vulnerabilities.append(vuln) except Exception: pass return vulnerabilities async def _check_sensitive_paths( self, session: aiohttp.ClientSession, base_url: str ) -> List[Vulnerability]: """Check for exposed sensitive files and directories.""" vulnerabilities = [] async def check_path(path: str): try: url = f"{base_url}{path}" async with session.get( url, timeout=aiohttp.ClientTimeout(total=5), allow_redirects=False ) as response: if response.status == 200: return path, response.status return None except Exception: return None # Check paths concurrently tasks = [check_path(path) for path in self.common_paths] results = await asyncio.gather(*tasks) for result in results: if result: path, status = result risk = RiskLevel.HIGH if any( s in path for s in ['.git', '.env', 'config', 'backup', 'sql'] ) else RiskLevel.MEDIUM vuln = Vulnerability( name=f"Exposed Sensitive Path: {path}", description=f"The path {path} is accessible and may expose sensitive information.", file_path=f"{base_url}{path}", line_number=0, code_snippet=f"HTTP {status} returned for {path}", risk_level=risk, remediation=f"Restrict access to {path} using web server configuration. Add authentication or remove from public access.", cwe_id="CWE-538" ) vulnerabilities.append(vuln) return vulnerabilities async def _check_https( self, session: aiohttp.ClientSession, url: str ) -> List[Vulnerability]: """Check HTTPS configuration.""" vulnerabilities = [] parsed = urlparse(url) if parsed.scheme == "http": vuln = Vulnerability( name="Insecure HTTP Connection", description="The target is using HTTP instead of HTTPS. All data transmitted is unencrypted.", file_path=url, line_number=0, code_snippet=f"URL scheme: {parsed.scheme}", risk_level=RiskLevel.HIGH, remediation="Enable HTTPS with a valid TLS certificate. Consider using Let's Encrypt for free certificates. Configure HSTS to prevent downgrade attacks.", cwe_id="CWE-319" ) vulnerabilities.append(vuln) return vulnerabilities async def _check_response_content( self, session: aiohttp.ClientSession, base_url: str ) -> List[Vulnerability]: """Check response content for potential vulnerabilities.""" vulnerabilities = [] try: async with session.get( base_url, timeout=aiohttp.ClientTimeout(total=10) ) as response: if response.status == 200: content = await response.text() # Check for error messages that reveal information error_patterns = [ (r"mysql_error|mysqli_error|pg_error", "Database Error Disclosure", "CWE-209"), (r"stack\s*trace|traceback|exception.*at\s+line", "Stack Trace Disclosure", "CWE-209"), (r"debug\s*=\s*true|debug_mode|development_mode", "Debug Mode Enabled", "CWE-215"), (r"", "Sensitive Data in Comments", "CWE-615"), ] for pattern, name, cwe in error_patterns: if re.search(pattern, content, re.IGNORECASE): vuln = Vulnerability( name=name, description=f"The response contains {name.lower()} which may reveal sensitive information.", file_path=base_url, line_number=0, code_snippet=f"Pattern detected: {pattern}", risk_level=RiskLevel.MEDIUM, remediation=f"Remove {name.lower()} from production responses. Configure error handling to show generic messages.", cwe_id=cwe ) vulnerabilities.append(vuln) except Exception: pass return vulnerabilities def _get_header_remediation(self, header: str) -> str: """Get specific remediation advice for missing headers.""" remediations = { "Content-Security-Policy": "Add CSP header to control resource loading. Start with: Content-Security-Policy: default-src 'self'", "X-Frame-Options": "Add: X-Frame-Options: DENY (or SAMEORIGIN if you need framing)", "X-Content-Type-Options": "Add: X-Content-Type-Options: nosniff", "X-XSS-Protection": "Add: X-XSS-Protection: 1; mode=block (note: deprecated in favor of CSP)", "Strict-Transport-Security": "Add: Strict-Transport-Security: max-age=31536000; includeSubDomains", "Referrer-Policy": "Add: Referrer-Policy: strict-origin-when-cross-origin", "Permissions-Policy": "Add: Permissions-Policy: geolocation=(), microphone=(), camera=()" } return remediations.get(header, f"Configure the {header} header appropriately.") class SecurityChecker: """ Main Security Checker - Orchestrates all scanning capabilities. Analogy: This is like a complete medical center - Diagnostic imaging (SAST) - Medical database (NVD) - Physical examination (Web Scanner) - Report generation (Results) Usage: checker = SecurityChecker() # Scan local code result = await checker.scan_local("/path/to/project") # Scan web app result = await checker.scan_web("https://example.com") # Generate report report = checker.generate_report(result) """ def __init__(self, nvd_api_key: Optional[str] = None): self.sast_engine = SASTEngine() self.nvd_client = NVDClient(api_key=nvd_api_key) self.web_scanner = WebAppScanner() async def scan_local(self, path: str, include_nvd: bool = True, max_workers: int = 8, use_parallel: bool = True) -> ScanResult: """ Scan a local directory for vulnerabilities. Args: path: Path to directory or file include_nvd: Whether to enrich results with NVD data max_workers: Number of parallel workers for file scanning (default: 8) use_parallel: Whether to use parallel processing (default: True) Returns: ScanResult with all findings """ result = ScanResult( target=path, scan_type="local", start_time=datetime.now() ) if not os.path.exists(path): result.errors.append(f"Path does not exist: {path}") result.end_time = datetime.now() return result # Run SAST scan if os.path.isfile(path): vulns = self.sast_engine.scan_file(path) result.files_scanned = 1 else: vulns, files_scanned = self.sast_engine.scan_directory( path, max_workers=max_workers, use_parallel=use_parallel ) result.files_scanned = files_scanned # Enrich with NVD data if requested if include_nvd and vulns: vulns = await self._enrich_with_nvd(vulns) result.vulnerabilities = vulns result.end_time = datetime.now() return result async def scan_web(self, url: str, include_nvd: bool = True) -> ScanResult: """ Scan a web application for vulnerabilities. Args: url: Target URL include_nvd: Whether to enrich results with NVD data Returns: ScanResult with all findings """ result = ScanResult( target=url, scan_type="web", start_time=datetime.now() ) try: vulns = await self.web_scanner.scan_url(url) if include_nvd and vulns: vulns = await self._enrich_with_nvd(vulns) result.vulnerabilities = vulns result.files_scanned = 1 # One URL scanned except Exception as e: result.errors.append(str(e)) result.end_time = datetime.now() return result async def _enrich_with_nvd( self, vulnerabilities: List[Vulnerability] ) -> List[Vulnerability]: """ Enrich vulnerability findings with NVD data. This adds related CVEs to each finding, showing real-world examples of the vulnerability being exploited. """ # Group by CWE to reduce API calls cwe_groups = {} for vuln in vulnerabilities: if vuln.cwe_id: if vuln.cwe_id not in cwe_groups: cwe_groups[vuln.cwe_id] = [] cwe_groups[vuln.cwe_id].append(vuln) # Fetch CVEs for each CWE for cwe_id, vuln_list in cwe_groups.items(): try: cves = await self.nvd_client.find_related_cves(cwe_id, limit=5) cve_ids = [cve.get("cve_id") for cve in cves if "error" not in cve] for vuln in vuln_list: vuln.cve_ids = cve_ids[:3] # Add top 3 related CVEs # Rate limiting await asyncio.sleep(self.nvd_client.rate_limit_delay) except Exception: pass return vulnerabilities async def search_nvd( self, keyword: Optional[str] = None, cwe_id: Optional[str] = None, severity: Optional[str] = None ) -> List[Dict]: """ Search the NVD directly. Useful for researching specific vulnerabilities. """ return await self.nvd_client.search_vulnerabilities( keyword=keyword, cwe_id=cwe_id, severity=severity ) def generate_report( self, result: ScanResult, format: str = "text" ) -> str: """ Generate a human-readable report. Args: result: ScanResult from a scan format: Output format ("text", "json", "html") Returns: Formatted report string """ if format == "json": return self._generate_json_report(result) elif format == "markdown": return self._generate_markdown_report(result) else: return self._generate_text_report(result) def _generate_text_report(self, result: ScanResult) -> str: """Generate plain text report.""" lines = [ "=" * 70, "SECURITY SCAN REPORT", "=" * 70, "", f"Target: {result.target}", f"Scan Type: {result.scan_type.upper()}", f"Start Time: {result.start_time.strftime('%Y-%m-%d %H:%M:%S')}", f"End Time: {result.end_time.strftime('%Y-%m-%d %H:%M:%S') if result.end_time else 'N/A'}", f"Files Scanned: {result.files_scanned}", "", ] # Summary summary = result.summary() lines.extend([ "-" * 70, "SUMMARY", "-" * 70, f"Total Vulnerabilities: {summary['total_vulnerabilities']}", "", "By Severity:", ]) for severity, count in summary["by_severity"].items(): if count > 0: lines.append(f" {severity}: {count}") lines.append("") if result.errors: lines.extend([ "-" * 70, "ERRORS", "-" * 70, ]) for error in result.errors: lines.append(f" • {error}") lines.append("") # Vulnerabilities by severity if result.vulnerabilities: lines.extend([ "-" * 70, "DETAILED FINDINGS", "-" * 70, "", ]) # Sort by severity severity_order = { RiskLevel.CRITICAL: 0, RiskLevel.HIGH: 1, RiskLevel.MEDIUM: 2, RiskLevel.LOW: 3, RiskLevel.INFO: 4 } sorted_vulns = sorted( result.vulnerabilities, key=lambda v: severity_order.get(v.risk_level, 5) ) for i, vuln in enumerate(sorted_vulns, 1): lines.extend([ f"[{i}] {vuln.name}", f" Severity: {vuln.risk_level.value}", f" Location: {vuln.file_path}:{vuln.line_number}", f" CWE: {vuln.cwe_id or 'N/A'}", "", f" Description:", f" {vuln.description}", "", f" Code:", " " + "-" * 40, ]) for line in vuln.code_snippet.split('\n'): lines.append(f" {line}") lines.extend([ " " + "-" * 40, "", f" Remediation:", ]) if vuln.remediation and vuln.remediation != "No known solution": for line in vuln.remediation.split('\n'): lines.append(f" {line}") else: lines.append(" No known solution") if vuln.cve_ids: lines.extend([ "", f" Related CVEs: {', '.join(vuln.cve_ids)}", ]) lines.extend(["", ""]) lines.extend([ "=" * 70, "END OF REPORT", "=" * 70, ]) return "\n".join(lines) def _generate_json_report(self, result: ScanResult) -> str: """Generate JSON report.""" report = { "target": result.target, "scan_type": result.scan_type, "start_time": result.start_time.isoformat(), "end_time": result.end_time.isoformat() if result.end_time else None, "summary": result.summary(), "vulnerabilities": [v.to_dict() for v in result.vulnerabilities], "errors": result.errors } return json.dumps(report, indent=2) def _generate_markdown_report(self, result: ScanResult) -> str: """Generate Markdown report optimized for vibe-coding platforms.""" summary = result.summary() severity_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"] lines = [] lines.append("# Security Scan Report\n") lines.append(f"**Target:** `{result.target}` ") lines.append(f"**Scan Type:** {result.scan_type.upper()} ") lines.append(f"**Date:** {result.start_time.strftime('%Y-%m-%d %H:%M:%S')} ") lines.append(f"**Files Scanned:** {result.files_scanned} ") # Summary line counts = [] for sev in severity_order: count = summary['by_severity'].get(sev, 0) if count > 0: counts.append(f"{count} {sev.capitalize()}") total = summary['total_vulnerabilities'] lines.append(f"**Total Vulnerabilities:** {total}" + (f" ({', '.join(counts)})" if counts else "")) lines.append("\n---\n") if total == 0: lines.append("No vulnerabilities found.\n") return "\n".join(lines) # Group vulnerabilities by severity by_severity = {} for vuln in result.vulnerabilities: sev = vuln.risk_level.value by_severity.setdefault(sev, []).append(vuln) finding_num = 0 for sev in severity_order: vulns = by_severity.get(sev, []) if not vulns: continue lines.append(f"## {sev.capitalize()}\n") for vuln in vulns: finding_num += 1 cwe = f" ({vuln.cwe_id})" if vuln.cwe_id else "" lines.append(f"### {finding_num}. {vuln.name}{cwe}\n") lines.append(f"- **File:** `{vuln.file_path}:{vuln.line_number}`") lines.append(f"- **Confidence:** {vuln.confidence}") lines.append(f"- **Description:** {vuln.description}") if vuln.code_snippet and vuln.code_snippet.strip(): ext = os.path.splitext(vuln.file_path)[1].lstrip(".") lang = ext if ext else "" lines.append(f"- **Code:**") lines.append(f" ```{lang}") lines.append(f" {vuln.code_snippet.strip()}") lines.append(f" ```") if vuln.remediation: lines.append(f"- **Remediation:** {vuln.remediation.strip()}") if vuln.cve_ids: lines.append(f"- **Related CVEs:** {', '.join(vuln.cve_ids)}") lines.append("") lines.append("---\n") lines.append(f"*Generated by Security Auditor | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n") return "\n".join(lines) # ============================================================ # CLI Interface # ============================================================ async def main(): """Command-line interface for the security checker.""" import argparse parser = argparse.ArgumentParser( description="Security Checker - SAST and NVD-powered vulnerability scanner", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Scan a local project python security_checker.py /path/to/project # Scan a web application python security_checker.py https://example.com --web # Generate HTML report python security_checker.py /path/to/project --format html -o report.html # Search NVD for SQL injection vulnerabilities python security_checker.py --nvd-search "sql injection" --severity HIGH """ ) parser.add_argument( "target", nargs="?", help="Target to scan (local path or URL)" ) parser.add_argument( "--web", action="store_true", help="Treat target as web URL" ) parser.add_argument( "--format", choices=["text", "json", "html"], default="text", help="Output format (default: text)" ) parser.add_argument( "-o", "--output", help="Output file (default: stdout)" ) parser.add_argument( "--nvd-api-key", help="NVD API key for higher rate limits" ) parser.add_argument( "--no-nvd", action="store_true", help="Skip NVD enrichment" ) parser.add_argument( "--nvd-search", help="Search NVD for vulnerabilities by keyword" ) parser.add_argument( "--severity", choices=["LOW", "MEDIUM", "HIGH", "CRITICAL"], help="Filter NVD search by severity" ) args = parser.parse_args() # Initialize checker checker = SecurityChecker(nvd_api_key=args.nvd_api_key) # NVD search mode if args.nvd_search: print(f"Searching NVD for: {args.nvd_search}") results = await checker.search_nvd( keyword=args.nvd_search, severity=args.severity ) if results and "error" not in results[0]: for cve in results: print(f"\n{cve['cve_id']} ({cve['severity']} - {cve['cvss_score']})") print(f" {cve['description'][:200]}...") if cve['cwes']: print(f" CWEs: {', '.join(cve['cwes'])}") else: print(f"Error: {results[0].get('error', 'Unknown error')}") return # Require target for scanning if not args.target: parser.print_help() return # Perform scan print(f"Scanning: {args.target}") print("This may take a moment...") if args.web or args.target.startswith(('http://', 'https://')): result = await checker.scan_web(args.target, include_nvd=not args.no_nvd) else: result = await checker.scan_local(args.target, include_nvd=not args.no_nvd) # Generate report report = checker.generate_report(result, format=args.format) # Output if args.output: with open(args.output, 'w') as f: f.write(report) print(f"Report saved to: {args.output}") else: print(report) if __name__ == "__main__": asyncio.run(main())