security_auditor / security_checker.py
MugdhaV
Initial deployment: Gradio frontend with Modal backend - Multi-language security scanner with parallel processing
e1e9580
#!/usr/bin/env python3
"""
Security Checker Application
============================
A comprehensive security analysis tool that combines:
1. Static Application Security Testing (SAST)
2. NIST National Vulnerability Database (NVD) integration
Think of this as a "security doctor" for your applications:
- SAST = X-ray machine (looks inside without running)
- NVD = Medical database (known vulnerabilities/diseases)
- Report = Diagnosis with treatment plan
Author: Security Checker Project
"""
import os
import re
import json
import hashlib
import asyncio
import aiohttp
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from enum import Enum
from datetime import datetime
from urllib.parse import urlparse
import fnmatch
from concurrent.futures import ThreadPoolExecutor, as_completed
class RiskLevel(Enum):
    """
    Severity buckets for a finding, aligned with CVSS score bands.

    Read it like an emergency-room triage scale:
    - CRITICAL: act immediately
    - HIGH: serious, fix soon
    - MEDIUM: worth scheduling a fix
    - LOW: minor, keep an eye on it
    - INFO: informational note only

    Every member's value is the upper-case string of its own name, and
    members are declared (and therefore iterate) from most to least severe.
    """

    CRITICAL = "CRITICAL"  # CVSS 9.0-10.0
    HIGH = "HIGH"          # CVSS 7.0-8.9
    MEDIUM = "MEDIUM"      # CVSS 4.0-6.9
    LOW = "LOW"            # CVSS 0.1-3.9
    INFO = "INFO"          # Informational
@dataclass
class Vulnerability:
    """
    One finding produced by a scan.

    Analogy: a single entry in a medical diagnosis report.
    - name: short title of the issue (the "disease name")
    - description: what is wrong
    - file_path: file in which the issue was found
    - line_number: line of the match
    - code_snippet: the offending code with a little context
    - risk_level: severity of the finding
    - remediation: how to fix it (the "treatment plan")
    - cve_ids: related CVE identifiers, empty by default
    - cwe_id: CWE identifier of the weakness class, if known
    - confidence: "HIGH", "MEDIUM" or "LOW"
    """

    name: str
    description: str
    file_path: str
    line_number: int
    code_snippet: str
    risk_level: RiskLevel
    remediation: str
    cve_ids: List[str] = field(default_factory=list)
    cwe_id: Optional[str] = None
    confidence: str = "HIGH"  # HIGH, MEDIUM, LOW

    def to_dict(self) -> Dict:
        """Return the finding as a plain dict, with ``risk_level`` flattened to its string value."""
        exported = (
            "name",
            "description",
            "file_path",
            "line_number",
            "code_snippet",
            "risk_level",
            "remediation",
            "cve_ids",
            "cwe_id",
            "confidence",
        )
        payload = {attr: getattr(self, attr) for attr in exported}
        # The enum member itself is not exported; only its value is.
        payload["risk_level"] = self.risk_level.value
        return payload
@dataclass
class ScanResult:
    """
    Everything a single scan produced - the "full medical report".

    Holds the scan target and type, start/end timestamps, the list of
    findings, the number of files scanned and any error messages.
    """

    target: str
    scan_type: str  # "local" or "web"
    start_time: datetime
    end_time: Optional[datetime] = None
    vulnerabilities: List[Vulnerability] = field(default_factory=list)
    files_scanned: int = 0
    errors: List[str] = field(default_factory=list)

    def summary(self) -> Dict:
        """
        Condense the scan into headline numbers.

        Returns a dict with the target, scan type, duration in seconds
        (``None`` while ``end_time`` is unset), files scanned, total number
        of findings, a per-severity count and the number of errors.
        """
        # Start every severity at zero so levels without findings still appear.
        severity_counts = dict.fromkeys((level.value for level in RiskLevel), 0)
        for finding in self.vulnerabilities:
            severity_counts[finding.risk_level.value] += 1

        if self.end_time:
            elapsed = (self.end_time - self.start_time).total_seconds()
        else:
            elapsed = None

        return {
            "target": self.target,
            "scan_type": self.scan_type,
            "duration_seconds": elapsed,
            "files_scanned": self.files_scanned,
            "total_vulnerabilities": len(self.vulnerabilities),
            "by_severity": severity_counts,
            "errors": len(self.errors),
        }
class SASTRule:
    """
    One pattern-based detection rule.

    Analogy: a specific test in a medical lab.
    - pattern: the symptom to look for (a regular expression)
    - name: name of the condition
    - languages: file extensions the rule applies to
    """

    def __init__(
        self,
        name: str,
        pattern: str,
        description: str,
        risk_level: RiskLevel,
        remediation: str,
        cwe_id: str,
        languages: List[str],
        false_positive_patterns: List[str] = None
    ):
        """
        Store the rule metadata and compile its regular expressions.

        ``pattern`` is compiled case-insensitively in multiline mode; each
        entry of ``false_positive_patterns`` (treated as empty when omitted)
        is compiled case-insensitively.
        """
        self.name = name
        self.pattern = re.compile(pattern, re.IGNORECASE | re.MULTILINE)
        self.description = description
        self.risk_level = risk_level
        self.remediation = remediation
        self.cwe_id = cwe_id
        self.languages = languages  # File extensions: ['.py', '.js', etc.]
        suppressor_sources = false_positive_patterns or []
        self.false_positive_patterns = [
            re.compile(source, re.IGNORECASE) for source in suppressor_sources
        ]

    def matches(self, code: str, file_ext: str) -> List[Tuple[int, str]]:
        """
        Search ``code`` line by line for this rule's pattern.

        Returns a list of ``(line_number, snippet)`` tuples, where the line
        number is 1-based and the snippet is the matching line joined with
        the line before and the line after it (when they exist). Lines that
        also match a false-positive pattern are skipped, and an extension
        outside ``languages`` yields an empty list.
        """
        if file_ext not in self.languages:
            return []

        source_lines = code.split('\n')
        line_count = len(source_lines)
        hits = []
        for index, text in enumerate(source_lines):
            if not self.pattern.search(text):
                continue
            if any(fp.search(text) for fp in self.false_positive_patterns):
                continue
            # One line of context on either side, clamped to the file bounds.
            first = max(0, index - 1)
            last = min(line_count, index + 2)
            hits.append((index + 1, '\n'.join(source_lines[first:last])))
        return hits
class SASTEngine:
    """
    Static Application Security Testing Engine

    Analogy: This is like a diagnostic imaging department
    - Scans code without executing it (like X-ray/MRI)
    - Looks for known vulnerability patterns
    - Reports findings with locations

    How it works:
    1. Load detection rules (what to look for)
    2. Read source files
    3. Match patterns against code
    4. Report findings
    """

    def __init__(self):
        """Load the detection rules and set up the extension and skip-directory tables."""
        self.rules = self._load_rules()
        # Only files whose (lower-cased) suffix is a key here are scanned.
        self.file_extensions = {
            '.py': 'python',
            '.js': 'javascript',
            '.ts': 'typescript',
            '.jsx': 'javascript',
            '.tsx': 'typescript',
            '.java': 'java',
            '.php': 'php',
            '.rb': 'ruby',
            '.go': 'go',
            '.cs': 'csharp',
            '.c': 'c',
            '.cpp': 'cpp',
            '.h': 'c',
            '.hpp': 'cpp',
            '.sql': 'sql',
            '.html': 'html',
            '.htm': 'html',
            '.xml': 'xml',
            '.yml': 'yaml',
            '.yaml': 'yaml',
            '.json': 'json',
            '.sh': 'shell',
            '.bash': 'shell',
        }
        # Directories to skip (like avoiding scanning healthy tissue)
        self.skip_dirs = {
            'node_modules', 'venv', '.venv', 'env', '.env',
            '__pycache__', '.git', '.svn', '.hg',
            'dist', 'build', 'target', 'vendor',
            '.idea', '.vscode', 'coverage'
        }

    def _load_rules(self) -> List[SASTRule]:
        """
        Load vulnerability detection rules.

        These rules are like a checklist of known security problems.
        Each rule defines:
        - A pattern to match (regex), applied one source line at a time
        - The type of vulnerability
        - How severe it is
        - How to fix it
        """
        return [
            # ============================================================
            # INJECTION VULNERABILITIES (The "contamination" category)
            # Like checking for contaminants in food/medicine
            # ============================================================
            SASTRule(
                name="SQL Injection",
                pattern=r"""(?:execute|cursor\.execute|query|raw|rawQuery|executeQuery)\s*\(\s*[f"'].*?%s.*?['"]\s*%|(?:execute|cursor\.execute)\s*\(\s*[f"'].*?\{.*?\}.*?['"]|(?:SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*?['"]\s*\+\s*|f['"]\s*(?:SELECT|INSERT|UPDATE|DELETE).*?\{""",
                description="Potential SQL Injection vulnerability. User input may be directly concatenated into SQL queries, allowing attackers to manipulate database operations.",
                risk_level=RiskLevel.CRITICAL,
                remediation="""Use parameterized queries or prepared statements:
VULNERABLE:
cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
SECURE:
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
For ORMs, use built-in query builders instead of raw SQL.""",
                cwe_id="CWE-89",
                languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs'],
                false_positive_patterns=[r'#.*SQL', r'//.*SQL', r'/\*.*SQL']
            ),
            SASTRule(
                name="Command Injection",
                pattern=r"""(?:os\.system|os\.popen|subprocess\.call|subprocess\.run|subprocess\.Popen|exec|eval|Runtime\.getRuntime\(\)\.exec|shell_exec|system|passthru|popen)\s*\([^)]*(?:\+|%|\.format|\{|\$)""",
                description="Potential Command Injection vulnerability. User input may be passed to system commands, allowing attackers to execute arbitrary commands.",
                risk_level=RiskLevel.CRITICAL,
                remediation="""Avoid passing user input to shell commands. If necessary:
VULNERABLE:
os.system(f"ping {user_input}")
SECURE:
import shlex
subprocess.run(["ping", shlex.quote(user_input)], shell=False)
Best practice: Use libraries instead of shell commands when possible.""",
                cwe_id="CWE-78",
                languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.sh']
            ),
            SASTRule(
                name="XSS (Cross-Site Scripting)",
                pattern=r"""(?:innerHTML|outerHTML|document\.write|\.html\(|v-html|dangerouslySetInnerHTML|\[innerHTML\])\s*=?\s*(?:[^;]*(?:\+|`|\$\{))""",
                description="Potential Cross-Site Scripting (XSS) vulnerability. Untrusted data may be inserted into the DOM without proper encoding.",
                risk_level=RiskLevel.HIGH,
                remediation="""Sanitize and encode output before inserting into HTML:
VULNERABLE:
element.innerHTML = userInput;
SECURE:
element.textContent = userInput; // For text
// Or use a sanitization library like DOMPurify:
element.innerHTML = DOMPurify.sanitize(userInput);
For React, avoid dangerouslySetInnerHTML unless absolutely necessary.""",
                cwe_id="CWE-79",
                languages=['.js', '.ts', '.jsx', '.tsx', '.html', '.php']
            ),
            SASTRule(
                name="Path Traversal",
                pattern=r"""(?:open|read|write|file_get_contents|file_put_contents|include|require|fopen|readFile|writeFile|createReadStream)\s*\([^)]*(?:\+|`|\$\{|\.\./)""",
                description="Potential Path Traversal vulnerability. User input may be used to construct file paths, allowing attackers to access unauthorized files.",
                risk_level=RiskLevel.HIGH,
                remediation="""Validate and sanitize file paths:
VULNERABLE:
with open(f"/uploads/{filename}") as f:
SECURE:
import os
safe_path = os.path.normpath(filename)
if '..' in safe_path or safe_path.startswith('/'):
raise ValueError("Invalid path")
full_path = os.path.join(UPLOAD_DIR, safe_path)
if not full_path.startswith(UPLOAD_DIR):
raise ValueError("Path traversal detected")""",
                cwe_id="CWE-22",
                languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go']
            ),
            SASTRule(
                name="LDAP Injection",
                pattern=r"""(?:ldap_search|ldap_bind|search_s|search_ext_s)\s*\([^)]*(?:\+|%|\.format|\{)""",
                description="Potential LDAP Injection vulnerability. User input may be used in LDAP queries without proper escaping.",
                risk_level=RiskLevel.HIGH,
                remediation="""Escape special LDAP characters in user input:
VULNERABLE:
ldap.search_s(base, scope, f"(uid={username})")
SECURE:
from ldap3.utils.conv import escape_filter_chars
safe_username = escape_filter_chars(username)
ldap.search_s(base, scope, f"(uid={safe_username})")""",
                cwe_id="CWE-90",
                languages=['.py', '.java', '.php', '.cs']
            ),
            # ============================================================
            # AUTHENTICATION & SESSION VULNERABILITIES
            # Like checking if the locks and keys are secure
            # ============================================================
            SASTRule(
                name="Hardcoded Credentials",
                pattern=r"""(?:password|passwd|pwd|secret|api_key|apikey|api_secret|access_token|auth_token|private_key)\s*[=:]\s*['"]\w{8,}['"]""",
                description="Hardcoded credentials detected. Sensitive information should not be stored in source code.",
                risk_level=RiskLevel.HIGH,
                remediation="""Store credentials securely:
VULNERABLE:
password = "MySecretPass123"
api_key = "sk-1234567890abcdef"
SECURE:
import os
password = os.environ.get('DB_PASSWORD')
api_key = os.environ.get('API_KEY')
Use environment variables, secrets managers (AWS Secrets Manager,
HashiCorp Vault), or encrypted configuration files.""",
                cwe_id="CWE-798",
                languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs', '.yml', '.yaml', '.json'],
                false_positive_patterns=[r'example', r'placeholder', r'your_', r'<.*>', r'xxx', r'\$\{']
            ),
            SASTRule(
                name="Weak Password Hashing",
                pattern=r"""(?:md5|sha1)\s*\(|hashlib\.(?:md5|sha1)\(|MessageDigest\.getInstance\s*\(\s*['"](MD5|SHA-?1)['"]|password.*=.*(?:md5|sha1)""",
                description="Weak hashing algorithm used for passwords. MD5 and SHA1 are cryptographically broken for password storage.",
                risk_level=RiskLevel.HIGH,
                remediation="""Use strong password hashing algorithms:
VULNERABLE:
hashed = hashlib.md5(password.encode()).hexdigest()
SECURE:
import bcrypt
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
# Or use argon2 (recommended):
from argon2 import PasswordHasher
ph = PasswordHasher()
hashed = ph.hash(password)
Recommended algorithms: Argon2, bcrypt, scrypt, PBKDF2""",
                cwe_id="CWE-328",
                languages=['.py', '.java', '.php', '.js', '.ts', '.rb', '.go', '.cs']
            ),
            SASTRule(
                name="JWT Without Verification",
                pattern=r"""jwt\.decode\s*\([^)]*verify\s*=\s*False|algorithms\s*=\s*\[?\s*['"](none|HS256)['"]|\.decode\(\s*token\s*\)|jsonwebtoken\.decode\s*\(""",
                description="JWT token decoded without proper verification or using weak/no algorithm.",
                risk_level=RiskLevel.HIGH,
                remediation="""Always verify JWT signatures:
VULNERABLE:
payload = jwt.decode(token, verify=False)
payload = jwt.decode(token, algorithms=['none'])
SECURE:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=['RS256'], # Use asymmetric algorithms
options={'verify_exp': True}
)
Use RS256 or ES256 instead of HS256 for better security.""",
                cwe_id="CWE-347",
                languages=['.py', '.js', '.ts', '.java', '.go']
            ),
            SASTRule(
                name="Session Fixation Risk",
                pattern=r"""session\s*\[\s*['"].*['"]\s*\]\s*=.*request\.|req\.session\s*=.*req\.(body|query|params)|session_id\s*=.*(?:GET|POST|request)""",
                description="Potential session fixation vulnerability. Session identifiers should be regenerated after authentication.",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Regenerate session after authentication:
VULNERABLE:
session['user_id'] = user.id # Without regenerating
SECURE (Python/Flask):
from flask import session
session.regenerate() # Regenerate session ID
session['user_id'] = user.id
SECURE (Node.js/Express):
req.session.regenerate((err) => {
req.session.userId = user.id;
});""",
                cwe_id="CWE-384",
                languages=['.py', '.js', '.ts', '.php', '.java']
            ),
            # ============================================================
            # CRYPTOGRAPHIC VULNERABILITIES
            # Like checking if the safe is actually secure
            # ============================================================
            SASTRule(
                name="Weak Cryptographic Algorithm",
                pattern=r"""(?:DES|RC4|RC2|Blowfish|IDEA)(?:\.|\s|Cipher)|Cipher\.getInstance\s*\(\s*['"](DES|RC4|Blowfish)['"]\)|from\s+Crypto\.Cipher\s+import\s+(DES|Blowfish)|cryptography.*(?:DES|RC4|Blowfish)""",
                description="Weak cryptographic algorithm detected. DES, RC4, RC2, and Blowfish are considered insecure.",
                risk_level=RiskLevel.HIGH,
                remediation="""Use modern cryptographic algorithms:
VULNERABLE:
from Crypto.Cipher import DES
cipher = DES.new(key, DES.MODE_CBC)
SECURE:
from cryptography.fernet import Fernet
# Or for low-level:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
cipher = Cipher(algorithms.AES(key), modes.GCM(iv))
Recommended: AES-256-GCM, ChaCha20-Poly1305""",
                cwe_id="CWE-327",
                languages=['.py', '.java', '.js', '.ts', '.go', '.cs', '.php']
            ),
            SASTRule(
                name="Insecure Random Number Generator",
                # The function-name alternatives must not include their own
                # parentheses: the trailing `\s*\(` already consumes the call's
                # opening paren, so `rand\(\)` could only ever match `rand()(`.
                # `\b` keeps `rand` from matching inside names such as `operand(`.
                pattern=r"""\b(?:random\.random|random\.randint|Math\.random|rand|srand|mt_rand)\s*\(""",
                description="Insecure random number generator used. These are not cryptographically secure and shouldn't be used for security purposes.",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Use cryptographically secure random generators:
VULNERABLE:
token = ''.join(random.choices(string.ascii_letters, k=32))
SECURE (Python):
import secrets
token = secrets.token_urlsafe(32)
SECURE (JavaScript):
const array = new Uint8Array(32);
crypto.getRandomValues(array);
SECURE (Java):
SecureRandom random = new SecureRandom();""",
                cwe_id="CWE-338",
                languages=['.py', '.js', '.ts', '.java', '.php', '.c', '.cpp'],
                false_positive_patterns=[r'random\.seed', r'shuffle', r'sample']
            ),
            SASTRule(
                name="Hardcoded Cryptographic Key",
                pattern=r"""(?:key|iv|nonce|salt)\s*[=:]\s*(?:b?['"]\w{16,}['"]|bytes\s*\(\s*['"]\w{16,}['"])""",
                description="Hardcoded cryptographic key detected. Encryption keys should never be stored in source code.",
                risk_level=RiskLevel.CRITICAL,
                remediation="""Store cryptographic keys securely:
VULNERABLE:
key = b'ThisIsASecretKey1234567890123456'
SECURE:
import os
key = os.environ.get('ENCRYPTION_KEY').encode()
# Or use a key management system:
from aws_encryption_sdk import KMSMasterKeyProvider
key_provider = KMSMasterKeyProvider(key_ids=[KEY_ARN])
Best practice: Use Hardware Security Modules (HSM) or
Key Management Services (AWS KMS, Azure Key Vault).""",
                cwe_id="CWE-321",
                languages=['.py', '.java', '.js', '.ts', '.go', '.cs', '.php']
            ),
            # ============================================================
            # INSECURE DESERIALIZATION
            # Like accepting packages without checking what's inside
            # ============================================================
            SASTRule(
                name="Insecure Deserialization (Python)",
                # The negative lookahead sits directly after the opening paren so
                # it inspects the rest of the line. Placed after a greedy
                # `[^)]*` it always succeeded, so calls passing a SafeLoader
                # (listed as SECURE in the remediation below) were still flagged.
                pattern=r"""pickle\.loads?\s*\(|yaml\.(?:unsafe_)?load\s*\((?!.*Loader\s*=\s*(?:yaml\.)?C?SafeLoader)|marshal\.loads?\s*\(|shelve\.open\s*\(""",
                description="Insecure deserialization detected. Deserializing untrusted data can lead to remote code execution.",
                risk_level=RiskLevel.CRITICAL,
                remediation="""Use safe deserialization methods:
VULNERABLE:
data = pickle.loads(user_input)
config = yaml.load(file)
SECURE:
import json
data = json.loads(user_input) # JSON is safe
# For YAML, always use SafeLoader:
config = yaml.load(file, Loader=yaml.SafeLoader)
# Or better:
config = yaml.safe_load(file)
Never deserialize untrusted data with pickle/marshal.""",
                cwe_id="CWE-502",
                languages=['.py']
            ),
            SASTRule(
                name="Insecure Deserialization (Java)",
                pattern=r"""ObjectInputStream\s*\(|readObject\s*\(\)|XMLDecoder\s*\(|XStream\.fromXML\s*\(|JSON\.parse\s*\(.*\)\.class""",
                description="Insecure deserialization detected in Java. Can lead to remote code execution.",
                risk_level=RiskLevel.CRITICAL,
                remediation="""Validate and filter deserialization:
VULNERABLE:
ObjectInputStream ois = new ObjectInputStream(input);
Object obj = ois.readObject();
SECURE:
// Use a whitelist filter
ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(
"com.myapp.SafeClass;!*"
);
ois.setObjectInputFilter(filter);
// Or use JSON/Protocol Buffers instead of Java serialization
Consider: Jackson with @JsonTypeInfo restrictions,
or Protocol Buffers for type-safe serialization.""",
                cwe_id="CWE-502",
                languages=['.java']
            ),
            SASTRule(
                name="Insecure Deserialization (JavaScript)",
                pattern=r"""(?:eval|Function)\s*\(\s*(?:JSON\.parse|atob|decodeURIComponent)|node-serialize|serialize-javascript.*(?:eval|Function)|unserialize\s*\(""",
                description="Insecure deserialization in JavaScript. Eval of untrusted data can lead to code execution.",
                risk_level=RiskLevel.CRITICAL,
                remediation="""Never eval deserialized data:
VULNERABLE:
eval(JSON.parse(userInput).code);
const obj = serialize.unserialize(userInput);
SECURE:
const data = JSON.parse(userInput);
// Validate structure before use
if (typeof data.name !== 'string') {
throw new Error('Invalid data');
}
Avoid node-serialize and similar libraries with
eval-based deserialization.""",
                cwe_id="CWE-502",
                languages=['.js', '.ts']
            ),
            # ============================================================
            # INFORMATION DISCLOSURE
            # Like leaving sensitive documents in public view
            # ============================================================
            SASTRule(
                name="Debug Mode Enabled",
                pattern=r"""(?:DEBUG|debug)\s*[=:]\s*(?:True|true|1|['"](true|on|yes)['"])|app\.run\s*\([^)]*debug\s*=\s*True|FLASK_DEBUG\s*=\s*1""",
                description="Debug mode appears to be enabled. This can expose sensitive information in production.",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Disable debug mode in production:
VULNERABLE:
app.run(debug=True)
DEBUG = True
SECURE:
import os
DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true'
# In production:
app.run(debug=False)
Use environment variables to control debug settings.""",
                cwe_id="CWE-215",
                languages=['.py', '.js', '.ts', '.java', '.php', '.rb', '.yml', '.yaml', '.json'],
                false_positive_patterns=[r'#.*DEBUG', r'//.*DEBUG', r'debug.*log']
            ),
            SASTRule(
                name="Sensitive Data in Logs",
                pattern=r"""(?:log(?:ger)?\.(?:info|debug|warn|error|critical)|print|console\.log|System\.out\.print)\s*\([^)]*(?:password|secret|token|key|credit.?card|ssn|api.?key)""",
                description="Sensitive data may be written to logs. This can expose credentials and personal information.",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Never log sensitive information:
VULNERABLE:
logger.info(f"User login: {username}, password: {password}")
SECURE:
logger.info(f"User login: {username}")
# Or mask sensitive data:
logger.info(f"API key: {api_key[:4]}****")
Use structured logging with sensitive field filtering.""",
                cwe_id="CWE-532",
                languages=['.py', '.java', '.js', '.ts', '.rb', '.go', '.php']
            ),
            SASTRule(
                name="Stack Trace Exposure",
                pattern=r"""(?:printStackTrace|traceback\.print_exc|console\.trace|e\.stack|err\.stack)\s*\(?\)?|except.*?:?\s*pass|rescue\s*=>\s*nil""",
                description="Stack traces may be exposed to users or exceptions silently ignored.",
                risk_level=RiskLevel.LOW,
                remediation="""Handle exceptions properly without exposing internals:
VULNERABLE:
except Exception as e:
return str(e) # Exposes internal details
SECURE:
except Exception as e:
logger.exception("Operation failed") # Log internally
return {"error": "An error occurred"} # Generic message
Never expose full stack traces to end users.""",
                cwe_id="CWE-209",
                languages=['.py', '.java', '.js', '.ts', '.rb', '.php']
            ),
            # ============================================================
            # SECURITY MISCONFIGURATION
            # Like leaving doors unlocked or windows open
            # ============================================================
            SASTRule(
                name="CORS Wildcard",
                pattern=r"""(?:Access-Control-Allow-Origin|cors)\s*[=:]\s*['"]\*['"]|\.allowedOrigins\s*\(\s*['"]\*['"]|cors\s*\(\s*\{[^}]*origin\s*:\s*(?:true|['"]\*['"])""",
                description="CORS configured to allow all origins. This can enable cross-site request attacks.",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Restrict CORS to specific trusted origins:
VULNERABLE:
Access-Control-Allow-Origin: *
cors({ origin: '*' })
SECURE:
cors({
origin: ['https://trusted-site.com'],
methods: ['GET', 'POST'],
credentials: true
})
Never use wildcard CORS with credentials.""",
                cwe_id="CWE-942",
                languages=['.py', '.java', '.js', '.ts', '.php', '.rb', '.go']
            ),
            SASTRule(
                name="SSL/TLS Verification Disabled",
                pattern=r"""verify\s*[=:]\s*False|VERIFY_SSL\s*=\s*False|ssl\s*[=:]\s*False|rejectUnauthorized\s*[=:]\s*false|InsecureSkipVerify\s*[=:]\s*true|CURLOPT_SSL_VERIFYPEER.*false""",
                description="SSL/TLS certificate verification is disabled. This makes the application vulnerable to man-in-the-middle attacks.",
                risk_level=RiskLevel.HIGH,
                remediation="""Always verify SSL/TLS certificates:
VULNERABLE:
requests.get(url, verify=False)
https.request({rejectUnauthorized: false})
SECURE:
requests.get(url, verify=True)
# Or with custom CA:
requests.get(url, verify='/path/to/ca-bundle.crt')
If you need to use self-signed certs in development,
use environment-based configuration.""",
                cwe_id="CWE-295",
                languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb']
            ),
            SASTRule(
                name="Insecure HTTP",
                pattern=r"""['"](http://(?!localhost|127\.0\.0\.1|0\.0\.0\.0)[^'"]+)['"]""",
                description="Insecure HTTP URL detected. Data transmitted over HTTP can be intercepted.",
                risk_level=RiskLevel.LOW,
                remediation="""Use HTTPS for all external communications:
VULNERABLE:
api_url = "http://api.example.com/data"
SECURE:
api_url = "https://api.example.com/data"
Configure HSTS (HTTP Strict Transport Security) on your servers.""",
                cwe_id="CWE-319",
                languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb', '.yml', '.yaml', '.json'],
                false_positive_patterns=[r'#.*http://', r'//.*http://', r'example\.com', r'schema.*http://']
            ),
            SASTRule(
                name="Missing Security Headers",
                pattern=r"""(?:Content-Security-Policy|X-Frame-Options|X-Content-Type-Options|Strict-Transport-Security)\s*[=:]\s*['"]['""]|no_header|disable.*header""",
                description="Security headers may be missing or disabled. This can enable various attacks.",
                risk_level=RiskLevel.LOW,
                remediation="""Configure security headers:
Add these headers to your responses:
Content-Security-Policy: default-src 'self'
X-Frame-Options: DENY
X-Content-Type-Options: nosniff
Strict-Transport-Security: max-age=31536000; includeSubDomains
X-XSS-Protection: 1; mode=block
Use helmet.js (Node), django-csp, or similar libraries.""",
                cwe_id="CWE-693",
                languages=['.py', '.java', '.js', '.ts', '.php', '.rb']
            ),
            # ============================================================
            # XML VULNERABILITIES
            # XML parsers can be tricked into dangerous behavior
            # ============================================================
            SASTRule(
                name="XXE (XML External Entity)",
                pattern=r"""(?:xml\.etree|lxml|xml\.dom|xml\.sax|XMLReader|DocumentBuilder|SAXParser|XMLParser).*(?:parse|read|load)|<!ENTITY|SYSTEM\s+['""]|resolve_entities\s*=\s*True""",
                description="Potential XML External Entity (XXE) vulnerability. XML parsers should disable external entity processing.",
                risk_level=RiskLevel.HIGH,
                remediation="""Disable external entity processing:
VULNERABLE (Python):
tree = etree.parse(xml_file)
SECURE (Python):
parser = etree.XMLParser(resolve_entities=False, no_network=True)
tree = etree.parse(xml_file, parser)
SECURE (Java):
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
dbf.setExpandEntityReferences(false);""",
                cwe_id="CWE-611",
                languages=['.py', '.java', '.php', '.cs', '.rb']
            ),
            # ============================================================
            # SERVER-SIDE REQUEST FORGERY (SSRF)
            # Like being tricked into making calls you shouldn't
            # ============================================================
            SASTRule(
                name="Server-Side Request Forgery (SSRF)",
                pattern=r"""(?:requests\.get|urllib\.request\.urlopen|http\.get|fetch|axios\.get|HttpClient)\s*\([^)]*(?:request\.|req\.|params\.|query\.|body\.|input|GET|POST)""",
                description="Potential SSRF vulnerability. User input may control server-side HTTP requests.",
                risk_level=RiskLevel.HIGH,
                remediation="""Validate and whitelist URLs:
VULNERABLE:
url = request.args.get('url')
response = requests.get(url)
SECURE:
from urllib.parse import urlparse
ALLOWED_HOSTS = ['api.trusted.com', 'data.trusted.com']
parsed = urlparse(url)
if parsed.hostname not in ALLOWED_HOSTS:
raise ValueError("URL not allowed")
if parsed.scheme not in ['http', 'https']:
raise ValueError("Invalid scheme")
# Block internal IPs
if is_internal_ip(parsed.hostname):
raise ValueError("Internal URLs not allowed")""",
                cwe_id="CWE-918",
                languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb']
            ),
            # ============================================================
            # ADDITIONAL COMMON VULNERABILITIES
            # ============================================================
            SASTRule(
                name="Unsafe Regex (ReDoS)",
                pattern=r"""(?:re\.compile|new\s+RegExp|regex|pattern)\s*\([^)]*(?:\+\*|\*\+|\.+\*|\.+\+|\(\.\*\)|\(\.\+\)|(?:\[[^\]]*\]){2,}\*|\{\d+,\}\*|\{\d+,\}\+)""",
                description="Potentially vulnerable regular expression that could cause ReDoS (Regular Expression Denial of Service).",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Avoid nested quantifiers in regex:
VULNERABLE:
pattern = re.compile(r'(a+)+b') # Catastrophic backtracking
SECURE:
pattern = re.compile(r'a+b') # Simple, efficient
# Or use atomic groups/possessive quantifiers where supported
# Set timeouts for regex operations:
import regex
regex.match(pattern, text, timeout=1.0)""",
                cwe_id="CWE-1333",
                languages=['.py', '.java', '.js', '.ts', '.go', '.php', '.rb']
            ),
            SASTRule(
                name="Prototype Pollution",
                pattern=r"""(?:Object\.assign|_\.merge|_\.extend|_\.defaults|jQuery\.extend|angular\.(?:merge|extend))\s*\([^,]*,\s*(?:req\.|request\.|params\.|body\.|input)|\[['"]__proto__['"]\]|\[['"]constructor['"]\]\.prototype""",
                description="Potential prototype pollution vulnerability. Merging user input into objects can modify Object.prototype.",
                risk_level=RiskLevel.HIGH,
                remediation="""Validate and sanitize object keys:
VULNERABLE:
Object.assign(target, req.body);
_.merge(config, userInput);
SECURE:
// Use Object.create(null) for prototype-less objects
const safeObj = Object.create(null);
// Whitelist allowed properties
const allowed = ['name', 'email'];
for (const key of allowed) {
if (key in userInput) {
safeObj[key] = userInput[key];
}
}
// Or use libraries like 'lodash' with safeguards""",
                cwe_id="CWE-1321",
                languages=['.js', '.ts']
            ),
            SASTRule(
                name="Open Redirect",
                pattern=r"""(?:redirect|res\.redirect|header\s*\(\s*['""]Location|window\.location|document\.location)\s*[=(]\s*(?:req\.|request\.|params\.|query\.|input|GET|POST|\$_)""",
                description="Potential open redirect vulnerability. User input controls redirect destination.",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Validate redirect URLs:
VULNERABLE:
redirect_url = request.args.get('next')
return redirect(redirect_url)
SECURE:
from urllib.parse import urlparse
redirect_url = request.args.get('next', '/')
parsed = urlparse(redirect_url)
# Only allow relative URLs or specific domains
if parsed.netloc and parsed.netloc != 'mysite.com':
redirect_url = '/'
return redirect(redirect_url)""",
                cwe_id="CWE-601",
                languages=['.py', '.java', '.js', '.ts', '.php', '.rb']
            ),
            SASTRule(
                name="Mass Assignment",
                pattern=r"""(?:\.update_attributes|\.update\(|\.create\(|\.build\(|Model\.create|\.save\()\s*\(?[^)]*(?:req\.|request\.|params\[|body\[|:permit\s*\(\s*!)""",
                description="Potential mass assignment vulnerability. User input may modify unintended model attributes.",
                risk_level=RiskLevel.MEDIUM,
                remediation="""Whitelist allowed attributes:
VULNERABLE (Rails):
User.create(params[:user])
SECURE (Rails):
User.create(params.require(:user).permit(:name, :email))
VULNERABLE (Django):
User.objects.create(**request.POST)
SECURE (Django):
User.objects.create(
name=request.POST.get('name'),
email=request.POST.get('email')
)
Always explicitly specify which fields can be mass-assigned.""",
                cwe_id="CWE-915",
                languages=['.py', '.rb', '.java', '.js', '.ts', '.php']
            ),
        ]

    def scan_file(self, file_path: str) -> List[Vulnerability]:
        """
        Scan a single file for vulnerabilities.

        Like running a specific diagnostic test on one tissue sample.

        Files whose extension is not in ``self.file_extensions`` yield an
        empty list. Scanning is best-effort: any error while reading or
        matching is reported on stdout and the findings collected so far
        are returned instead of raising.
        """
        vulnerabilities: List[Vulnerability] = []
        try:
            file_ext = Path(file_path).suffix.lower()
            if file_ext not in self.file_extensions:
                return vulnerabilities
            # errors='ignore' drops undecodable bytes so odd encodings don't abort the scan.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                code = f.read()
            for rule in self.rules:
                for line_num, snippet in rule.matches(code, file_ext):
                    vulnerabilities.append(
                        Vulnerability(
                            name=rule.name,
                            description=rule.description,
                            file_path=file_path,
                            line_number=line_num,
                            code_snippet=snippet,
                            risk_level=rule.risk_level,
                            remediation=rule.remediation,
                            cwe_id=rule.cwe_id
                        )
                    )
        except Exception as e:
            # Report but don't fail on individual file errors (same message
            # format as the parallel path in scan_directory).
            print(f"Error scanning {file_path}: {e}")
        return vulnerabilities

    def _collect_files(self, directory: str) -> List[str]:
        """Walk ``directory`` and return paths of files with a supported extension, pruning skipped directories."""
        files_to_scan = []
        for root, dirs, files in os.walk(directory):
            # In-place assignment so os.walk does not descend into skipped directories.
            dirs[:] = [d for d in dirs if d not in self.skip_dirs]
            for file in files:
                file_path = os.path.join(root, file)
                if Path(file_path).suffix.lower() in self.file_extensions:
                    files_to_scan.append(file_path)
        return files_to_scan

    def scan_directory(self, directory: str, max_workers: int = 8, use_parallel: bool = True) -> Tuple[List[Vulnerability], int]:
        """
        Recursively scan a directory with optional parallel processing.

        Like performing a full-body scan.

        Args:
            directory: Path to directory to scan
            max_workers: Number of parallel workers (default: 8)
            use_parallel: Whether to use parallel processing (default: True)

        Returns:
            Tuple of (vulnerabilities, files_scanned). Findings are grouped
            per file in the order the files were discovered, in both the
            sequential and the parallel mode.
        """
        files_to_scan = self._collect_files(directory)
        vulnerabilities: List[Vulnerability] = []
        files_scanned = 0

        if not use_parallel or len(files_to_scan) <= 1:
            # Sequential processing
            for file_path in files_to_scan:
                files_scanned += 1
                vulnerabilities.extend(self.scan_file(file_path))
            return vulnerabilities, files_scanned

        # Parallel processing using ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            submitted = [
                (file_path, executor.submit(self.scan_file, file_path))
                for file_path in files_to_scan
            ]
            # Results are merged in submission order rather than completion
            # order, so the output is stable from run to run; the scans
            # themselves still execute concurrently.
            for file_path, future in submitted:
                try:
                    vulnerabilities.extend(future.result())
                except Exception as e:
                    # Report error but continue with other files
                    print(f"Error scanning {file_path}: {e}")
                files_scanned += 1  # Count as scanned even if error
        return vulnerabilities, files_scanned
class NVDClient:
    """
    NIST National Vulnerability Database Client

    Analogy: This is like searching a medical journal database
    - Searches for known vulnerabilities (diseases) by keyword
    - Retrieves detailed information including severity scores
    - Provides references to official documentation

    The NVD contains over 200,000 known vulnerabilities (CVEs)
    with detailed descriptions, severity scores, and references.
    """

    BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize NVD client.

        Args:
            api_key: Optional NVD API key for higher rate limits.
                Get one free at: https://nvd.nist.gov/developers/request-an-api-key
        """
        self.api_key = api_key
        self.rate_limit_delay = 0.6 if api_key else 6.0  # NVD rate limits

    async def _fetch(self, params: Dict) -> Tuple[int, Optional[Dict]]:
        """
        Issue one GET against BASE_URL.

        Args:
            params: Query parameters for the request

        Returns:
            Tuple of (HTTP status, decoded JSON body); the body is None
            for any status other than 200.
        """
        headers = {"apiKey": self.api_key} if self.api_key else {}
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self.BASE_URL,
                params=params,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status != 200:
                    return response.status, None
                return response.status, await response.json()

    async def search_vulnerabilities(
        self,
        keyword: Optional[str] = None,
        cwe_id: Optional[str] = None,
        severity: Optional[str] = None,
        limit: int = 20
    ) -> List[Dict]:
        """
        Search the NVD for vulnerabilities.

        Args:
            keyword: Search term (e.g., "sql injection python")
            cwe_id: CWE ID to filter by (e.g., "CWE-89")
            severity: Severity level (LOW, MEDIUM, HIGH, CRITICAL)
            limit: Maximum results to return (clamped to 1..100)

        Returns:
            List of CVE entries with details. Failures are not raised;
            they are reported as a single-element list ``[{"error": ...}]``.
        """
        # A zero or negative page size is not a meaningful request, so clamp.
        params = {"resultsPerPage": max(1, min(limit, 100))}
        if keyword:
            params["keywordSearch"] = keyword
        if cwe_id:
            params["cweId"] = cwe_id
        if severity:
            params["cvssV3Severity"] = severity.upper()

        try:
            status, data = await self._fetch(params)
            if status == 200:
                return self._parse_results(data)
            if status in (403, 429):
                return [{"error": "NVD API rate limited. Consider using an API key."}]
            return [{"error": f"NVD API error: {status}"}]
        except asyncio.TimeoutError:
            return [{"error": "NVD API request timed out"}]
        except Exception as e:
            return [{"error": f"NVD API error: {str(e)}"}]

    @staticmethod
    def _extract_cvss(metrics: Dict) -> Tuple[str, float]:
        """
        Pick severity and base score from a CVE's ``metrics`` object.

        Tries CVSS v3.1, then v3.0, then v2.0 and uses the first entry that
        carries ``cvssData``.

        Returns:
            Tuple of (severity, score); ("UNKNOWN", 0.0) when no usable
            metric is present.
        """
        for version in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
            entries = metrics.get(version)
            if not entries:
                continue
            metric = entries[0]
            cvss = metric.get("cvssData")
            if not cvss:
                continue
            # v3.x keeps baseSeverity inside cvssData; v2 entries carry it on
            # the metric object itself, so check both places.
            severity = cvss.get("baseSeverity") or metric.get("baseSeverity") or "UNKNOWN"
            return severity, cvss.get("baseScore", 0)
        return "UNKNOWN", 0.0

    def _parse_results(self, data: Dict) -> List[Dict]:
        """
        Parse NVD API response into a cleaner format.

        Args:
            data: Decoded JSON body of an NVD CVE API response

        Returns:
            One dict per CVE with keys cve_id, description (English,
            truncated to 500 characters plus "..."), severity, cvss_score,
            cwes, references (at most the first five entries), published
            and last_modified.
        """
        results = []
        for vuln in data.get("vulnerabilities", []):
            cve = vuln.get("cve", {})

            # First non-empty English description, if any
            description = next(
                (
                    d["value"]
                    for d in cve.get("descriptions", [])
                    if d.get("lang") == "en" and d.get("value")
                ),
                "No description available"
            )
            if len(description) > 500:
                description = description[:500] + "..."

            severity, score = self._extract_cvss(cve.get("metrics", {}))

            # Keep only reference entries that actually carry a URL
            references = [
                ref["url"] for ref in cve.get("references", [])[:5] if ref.get("url")
            ]

            # Several weakness entries may list the same CWE; keep each ID
            # once, in first-seen order.
            cwes = []
            for weakness in cve.get("weaknesses", []):
                for desc in weakness.get("description", []):
                    value = desc.get("value", "")
                    if value.startswith("CWE-") and value not in cwes:
                        cwes.append(value)

            results.append({
                "cve_id": cve.get("id", "Unknown"),
                "description": description,
                "severity": severity,
                "cvss_score": score,
                "cwes": cwes,
                "references": references,
                "published": cve.get("published", "Unknown"),
                "last_modified": cve.get("lastModified", "Unknown")
            })
        return results

    async def get_cve_details(self, cve_id: str) -> Optional[Dict]:
        """
        Get detailed information about a specific CVE.

        Args:
            cve_id: CVE identifier (e.g., "CVE-2021-44228")

        Returns:
            Detailed CVE information, or None if the CVE was not found or
            the request failed for any reason.
        """
        try:
            status, data = await self._fetch({"cveId": cve_id})
            if status != 200:
                return None
            results = self._parse_results(data)
            return results[0] if results else None
        except Exception:
            # Lookup is best-effort: callers treat None as "no details".
            return None

    async def find_related_cves(self, cwe_id: str, limit: int = 10) -> List[Dict]:
        """
        Find CVEs related to a specific CWE.

        This helps answer "What known attacks use this vulnerability type?"

        Args:
            cwe_id: CWE identifier (e.g., "CWE-89")
            limit: Maximum results

        Returns:
            List of related CVEs (same shape as search_vulnerabilities)
        """
        return await self.search_vulnerabilities(cwe_id=cwe_id, limit=limit)
class WebAppScanner:
    """
    Web Application Scanner

    Analogy: Like a physical security inspector
    - Checks doors and windows (endpoints)
    - Tests locks (authentication)
    - Looks for signs of vulnerability

    This performs basic web security checks without being intrusive.
    For full web app testing, specialized tools like OWASP ZAP are recommended.
    """

    # (regex, finding name, CWE) triples applied case-insensitively to the
    # body of the landing page.
    _CONTENT_PATTERNS = [
        (r"mysql_error|mysqli_error|pg_error", "Database Error Disclosure", "CWE-209"),
        (r"stack\s*trace|traceback|exception.*at\s+line", "Stack Trace Disclosure", "CWE-209"),
        (r"debug\s*=\s*true|debug_mode|development_mode", "Debug Mode Enabled", "CWE-215"),
        # The tempered dot `(?:(?!-->).)` keeps the match inside ONE comment,
        # and the scoped `(?s:...)` lets that comment span several lines.
        (
            r"(?s:<!--(?:(?!-->).)*?(?:password|api.?key|secret)(?:(?!-->).)*?-->)",
            "Sensitive Data in Comments",
            "CWE-615",
        ),
    ]

    def __init__(self):
        self.common_paths = [
            # Admin paths
            "/admin", "/administrator", "/admin.php", "/admin.html",
            "/wp-admin", "/cpanel", "/phpmyadmin",
            # Sensitive files
            "/.git/config", "/.env", "/config.php", "/wp-config.php",
            "/.htaccess", "/web.config", "/robots.txt", "/sitemap.xml",
            # Backup files
            "/backup.zip", "/backup.sql", "/db.sql", "/database.sql",
            # API endpoints
            "/api", "/api/v1", "/graphql", "/swagger.json", "/openapi.json",
            # Debug/test
            "/debug", "/test", "/phpinfo.php", "/info.php",
        ]
        self.security_headers = [
            "Content-Security-Policy",
            "X-Frame-Options",
            "X-Content-Type-Options",
            "X-XSS-Protection",
            "Strict-Transport-Security",
            "Referrer-Policy",
            "Permissions-Policy"
        ]

    @staticmethod
    def _normalize_url(url: str) -> str:
        """
        Return *url* stripped of surrounding whitespace and with an
        ``https://`` prefix added when it has no http(s) scheme.

        The scheme test is case-insensitive so that ``HTTP://host`` is not
        mistaken for a bare host name.
        """
        url = url.strip()
        if not url.lower().startswith(('http://', 'https://')):
            url = 'https://' + url
        return url

    async def scan_url(self, url: str) -> List[Vulnerability]:
        """
        Perform security scan on a web application.

        Args:
            url: Target URL (e.g., "https://example.com")

        Returns:
            List of discovered vulnerabilities
        """
        vulnerabilities = []

        url = self._normalize_url(url)
        parsed = urlparse(url)
        # Header, path and content checks target the site root, not the
        # path the caller supplied.
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        async with aiohttp.ClientSession() as session:
            # Check security headers
            vulnerabilities.extend(await self._check_security_headers(session, base_url))
            # Check for exposed sensitive files
            vulnerabilities.extend(await self._check_sensitive_paths(session, base_url))
            # Check HTTPS configuration
            vulnerabilities.extend(await self._check_https(session, url))
            # Check for common vulnerabilities in responses
            vulnerabilities.extend(await self._check_response_content(session, base_url))

        return vulnerabilities

    async def _check_security_headers(
        self, session: aiohttp.ClientSession, url: str
    ) -> List[Vulnerability]:
        """
        Check for missing or misconfigured security headers.

        Issues a HEAD request to *url* and reports every header from
        ``self.security_headers`` that is absent, plus a Server header that
        names an Apache/Nginx/IIS version. Request failures yield no findings.
        """
        vulnerabilities = []
        try:
            async with session.head(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                headers = response.headers

                for header in self.security_headers:
                    if header in headers:
                        continue
                    vulnerabilities.append(Vulnerability(
                        name=f"Missing Security Header: {header}",
                        description=f"The {header} header is not set. This header helps protect against various attacks.",
                        file_path=url,
                        line_number=0,
                        code_snippet=f"Response headers do not include {header}",
                        risk_level=RiskLevel.LOW if header != "Content-Security-Policy" else RiskLevel.MEDIUM,
                        remediation=self._get_header_remediation(header),
                        cwe_id="CWE-693"
                    ))

                # Check for server version disclosure
                server = headers.get("Server")
                if server is not None and any(
                    v in server.lower() for v in ["apache/", "nginx/", "iis/"]
                ):
                    vulnerabilities.append(Vulnerability(
                        name="Server Version Disclosure",
                        description=f"Server header reveals version information: {server}",
                        file_path=url,
                        line_number=0,
                        code_snippet=f"Server: {server}",
                        risk_level=RiskLevel.INFO,
                        remediation="Configure your web server to hide version information. In Apache, use 'ServerTokens Prod'. In Nginx, use 'server_tokens off'.",
                        cwe_id="CWE-200"
                    ))
        except Exception:
            # Deliberately best-effort: an unreachable or misbehaving target
            # must not abort the remaining checks.
            pass
        return vulnerabilities

    async def _check_sensitive_paths(
        self, session: aiohttp.ClientSession, base_url: str
    ) -> List[Vulnerability]:
        """
        Check for exposed sensitive files and directories.

        Requests every entry of ``self.common_paths`` concurrently (without
        following redirects) and reports those answered with HTTP 200.
        """
        vulnerabilities = []

        async def check_path(path: str):
            """Return (path, status) when *path* answers 200, else None."""
            try:
                async with session.get(
                    f"{base_url}{path}",
                    timeout=aiohttp.ClientTimeout(total=5),
                    allow_redirects=False
                ) as response:
                    if response.status == 200:
                        return path, response.status
                    return None
            except Exception:
                # A failed probe simply means "not exposed" for this path.
                return None

        # Check paths concurrently
        results = await asyncio.gather(*(check_path(path) for path in self.common_paths))

        for result in results:
            if not result:
                continue
            path, status = result
            # Paths that look like VCS data, secrets, config or dumps rank higher
            risk = RiskLevel.HIGH if any(
                s in path for s in ['.git', '.env', 'config', 'backup', 'sql']
            ) else RiskLevel.MEDIUM
            vulnerabilities.append(Vulnerability(
                name=f"Exposed Sensitive Path: {path}",
                description=f"The path {path} is accessible and may expose sensitive information.",
                file_path=f"{base_url}{path}",
                line_number=0,
                code_snippet=f"HTTP {status} returned for {path}",
                risk_level=risk,
                remediation=f"Restrict access to {path} using web server configuration. Add authentication or remove from public access.",
                cwe_id="CWE-538"
            ))
        return vulnerabilities

    async def _check_https(
        self, session: aiohttp.ClientSession, url: str
    ) -> List[Vulnerability]:
        """
        Check HTTPS configuration.

        Only inspects the scheme of *url*; no request is made and *session*
        is unused. Reports a finding when the scheme is plain ``http``.
        """
        parsed = urlparse(url)
        if parsed.scheme != "http":
            return []
        return [Vulnerability(
            name="Insecure HTTP Connection",
            description="The target is using HTTP instead of HTTPS. All data transmitted is unencrypted.",
            file_path=url,
            line_number=0,
            code_snippet=f"URL scheme: {parsed.scheme}",
            risk_level=RiskLevel.HIGH,
            remediation="Enable HTTPS with a valid TLS certificate. Consider using Let's Encrypt for free certificates. Configure HSTS to prevent downgrade attacks.",
            cwe_id="CWE-319"
        )]

    def _match_content_patterns(self, content: str) -> List[Tuple[str, str, str]]:
        """
        Return the (pattern, name, cwe) triples from ``_CONTENT_PATTERNS``
        whose regex matches *content* (case-insensitively), in table order.
        """
        return [
            (pattern, name, cwe)
            for pattern, name, cwe in self._CONTENT_PATTERNS
            if re.search(pattern, content, re.IGNORECASE)
        ]

    async def _check_response_content(
        self, session: aiohttp.ClientSession, base_url: str
    ) -> List[Vulnerability]:
        """
        Check response content for potential vulnerabilities.

        Fetches *base_url* and, for a 200 response, reports each content
        pattern found in the body. Request failures yield no findings.
        """
        vulnerabilities = []
        try:
            async with session.get(
                base_url,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                if response.status == 200:
                    content = await response.text()
                    # Check for error messages that reveal information
                    for pattern, name, cwe in self._match_content_patterns(content):
                        vulnerabilities.append(Vulnerability(
                            name=name,
                            description=f"The response contains {name.lower()} which may reveal sensitive information.",
                            file_path=base_url,
                            line_number=0,
                            code_snippet=f"Pattern detected: {pattern}",
                            risk_level=RiskLevel.MEDIUM,
                            remediation=f"Remove {name.lower()} from production responses. Configure error handling to show generic messages.",
                            cwe_id=cwe
                        ))
        except Exception:
            # Deliberately best-effort, same as the header check.
            pass
        return vulnerabilities

    def _get_header_remediation(self, header: str) -> str:
        """
        Get specific remediation advice for missing headers.

        Falls back to a generic sentence for headers without a tailored entry.
        """
        remediations = {
            "Content-Security-Policy": "Add CSP header to control resource loading. Start with: Content-Security-Policy: default-src 'self'",
            "X-Frame-Options": "Add: X-Frame-Options: DENY (or SAMEORIGIN if you need framing)",
            "X-Content-Type-Options": "Add: X-Content-Type-Options: nosniff",
            "X-XSS-Protection": "Add: X-XSS-Protection: 1; mode=block (note: deprecated in favor of CSP)",
            "Strict-Transport-Security": "Add: Strict-Transport-Security: max-age=31536000; includeSubDomains",
            "Referrer-Policy": "Add: Referrer-Policy: strict-origin-when-cross-origin",
            "Permissions-Policy": "Add: Permissions-Policy: geolocation=(), microphone=(), camera=()"
        }
        return remediations.get(header, f"Configure the {header} header appropriately.")
class SecurityChecker:
    """
    Main Security Checker - Orchestrates all scanning capabilities.

    Analogy: This is like a complete medical center
    - Diagnostic imaging (SAST)
    - Medical database (NVD)
    - Physical examination (Web Scanner)
    - Report generation (Results)

    Usage:
        checker = SecurityChecker()
        # Scan local code
        result = await checker.scan_local("/path/to/project")
        # Scan web app
        result = await checker.scan_web("https://example.com")
        # Generate report
        report = checker.generate_report(result)
    """

    # Severity names, most severe first; these are the string values of the
    # RiskLevel members and drive the ordering in every report format.
    _SEVERITY_ORDER = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"]

    def __init__(self, nvd_api_key: Optional[str] = None):
        self.sast_engine = SASTEngine()
        self.nvd_client = NVDClient(api_key=nvd_api_key)
        self.web_scanner = WebAppScanner()

    async def scan_local(self, path: str, include_nvd: bool = True, max_workers: int = 8, use_parallel: bool = True) -> ScanResult:
        """
        Scan a local directory for vulnerabilities.

        Args:
            path: Path to directory or file
            include_nvd: Whether to enrich results with NVD data
            max_workers: Number of parallel workers for file scanning (default: 8)
            use_parallel: Whether to use parallel processing (default: True)

        Returns:
            ScanResult with all findings. A missing path is reported through
            ``result.errors`` rather than raised.
        """
        result = ScanResult(
            target=path,
            scan_type="local",
            start_time=datetime.now()
        )

        if not os.path.exists(path):
            result.errors.append(f"Path does not exist: {path}")
            result.end_time = datetime.now()
            return result

        # Run SAST scan
        if os.path.isfile(path):
            vulns = self.sast_engine.scan_file(path)
            result.files_scanned = 1
        else:
            vulns, files_scanned = self.sast_engine.scan_directory(
                path,
                max_workers=max_workers,
                use_parallel=use_parallel
            )
            result.files_scanned = files_scanned

        # Enrich with NVD data if requested
        if include_nvd and vulns:
            vulns = await self._enrich_with_nvd(vulns)

        result.vulnerabilities = vulns
        result.end_time = datetime.now()
        return result

    async def scan_web(self, url: str, include_nvd: bool = True) -> ScanResult:
        """
        Scan a web application for vulnerabilities.

        Args:
            url: Target URL
            include_nvd: Whether to enrich results with NVD data

        Returns:
            ScanResult with all findings. Exceptions raised by the scan are
            captured in ``result.errors``.
        """
        result = ScanResult(
            target=url,
            scan_type="web",
            start_time=datetime.now()
        )
        try:
            vulns = await self.web_scanner.scan_url(url)
            if include_nvd and vulns:
                vulns = await self._enrich_with_nvd(vulns)
            result.vulnerabilities = vulns
            result.files_scanned = 1  # One URL scanned
        except Exception as e:
            result.errors.append(str(e))
        result.end_time = datetime.now()
        return result

    async def _enrich_with_nvd(
        self, vulnerabilities: List[Vulnerability]
    ) -> List[Vulnerability]:
        """
        Enrich vulnerability findings with NVD data.

        This adds related CVEs to each finding, showing real-world
        examples of the vulnerability being exploited. Findings are
        updated in place and the same list is returned.
        """
        # Group by CWE to reduce API calls
        cwe_groups = {}
        for vuln in vulnerabilities:
            if vuln.cwe_id:
                cwe_groups.setdefault(vuln.cwe_id, []).append(vuln)

        # Fetch CVEs for each CWE
        for cwe_id, vuln_list in cwe_groups.items():
            try:
                cves = await self.nvd_client.find_related_cves(cwe_id, limit=5)
                # Drop error markers and entries without an ID
                cve_ids = [
                    cve["cve_id"] for cve in cves
                    if "error" not in cve and cve.get("cve_id")
                ]
                for vuln in vuln_list:
                    vuln.cve_ids = cve_ids[:3]  # Add top 3 related CVEs
            except Exception:
                # Enrichment is optional; a failed lookup leaves the
                # findings of this CWE group untouched.
                pass
            # Rate limiting: pause after every request, failed or not
            await asyncio.sleep(self.nvd_client.rate_limit_delay)

        return vulnerabilities

    async def search_nvd(
        self,
        keyword: Optional[str] = None,
        cwe_id: Optional[str] = None,
        severity: Optional[str] = None
    ) -> List[Dict]:
        """
        Search the NVD directly.

        Useful for researching specific vulnerabilities.
        """
        return await self.nvd_client.search_vulnerabilities(
            keyword=keyword,
            cwe_id=cwe_id,
            severity=severity
        )

    def generate_report(
        self,
        result: ScanResult,
        format: str = "text"
    ) -> str:
        """
        Generate a human-readable report.

        Args:
            result: ScanResult from a scan
            format: Output format ("text", "json" or "markdown"); any other
                value produces the text report

        Returns:
            Formatted report string
        """
        if format == "json":
            return self._generate_json_report(result)
        if format == "markdown":
            return self._generate_markdown_report(result)
        return self._generate_text_report(result)

    def _generate_text_report(self, result: ScanResult) -> str:
        """Generate plain text report: header, summary, errors, findings by severity."""
        heavy_rule = "=" * 70
        light_rule = "-" * 70

        lines = [
            heavy_rule,
            "SECURITY SCAN REPORT",
            heavy_rule,
            "",
            f"Target: {result.target}",
            f"Scan Type: {result.scan_type.upper()}",
            f"Start Time: {result.start_time.strftime('%Y-%m-%d %H:%M:%S')}",
            f"End Time: {result.end_time.strftime('%Y-%m-%d %H:%M:%S') if result.end_time else 'N/A'}",
            f"Files Scanned: {result.files_scanned}",
            "",
        ]

        # Summary
        summary = result.summary()
        lines.extend([
            light_rule,
            "SUMMARY",
            light_rule,
            f"Total Vulnerabilities: {summary['total_vulnerabilities']}",
            "",
            "By Severity:",
        ])
        for severity, count in summary["by_severity"].items():
            if count > 0:
                lines.append(f" {severity}: {count}")
        lines.append("")

        if result.errors:
            lines.extend([
                light_rule,
                "ERRORS",
                light_rule,
            ])
            for error in result.errors:
                lines.append(f" • {error}")
            lines.append("")

        # Vulnerabilities by severity
        if result.vulnerabilities:
            lines.extend([
                light_rule,
                "DETAILED FINDINGS",
                light_rule,
                "",
            ])

            # Stable sort, most severe first; unknown levels go last
            rank = {name: position for position, name in enumerate(self._SEVERITY_ORDER)}
            sorted_vulns = sorted(
                result.vulnerabilities,
                key=lambda v: rank.get(v.risk_level.value, 5)
            )

            for i, vuln in enumerate(sorted_vulns, 1):
                lines.extend([
                    f"[{i}] {vuln.name}",
                    f" Severity: {vuln.risk_level.value}",
                    f" Location: {vuln.file_path}:{vuln.line_number}",
                    f" CWE: {vuln.cwe_id or 'N/A'}",
                    "",
                    " Description:",
                    f" {vuln.description}",
                    "",
                    " Code:",
                    " " + "-" * 40,
                ])
                for line in vuln.code_snippet.split('\n'):
                    lines.append(f" {line}")
                lines.extend([
                    " " + "-" * 40,
                    "",
                    " Remediation:",
                ])
                if vuln.remediation and vuln.remediation != "No known solution":
                    for line in vuln.remediation.split('\n'):
                        lines.append(f" {line}")
                else:
                    lines.append(" No known solution")
                if vuln.cve_ids:
                    lines.extend([
                        "",
                        f" Related CVEs: {', '.join(vuln.cve_ids)}",
                    ])
                lines.extend(["", ""])

        lines.extend([
            heavy_rule,
            "END OF REPORT",
            heavy_rule,
        ])
        return "\n".join(lines)

    def _generate_json_report(self, result: ScanResult) -> str:
        """Generate JSON report (2-space indented) from the scan result."""
        report = {
            "target": result.target,
            "scan_type": result.scan_type,
            "start_time": result.start_time.isoformat(),
            "end_time": result.end_time.isoformat() if result.end_time else None,
            "summary": result.summary(),
            "vulnerabilities": [v.to_dict() for v in result.vulnerabilities],
            "errors": result.errors
        }
        return json.dumps(report, indent=2)

    def _generate_markdown_report(self, result: ScanResult) -> str:
        """
        Generate Markdown report optimized for vibe-coding platforms.

        Layout: header block, optional "Errors" list, then findings grouped
        under one heading per severity (most severe first) and a footer.
        When there are no findings the report ends after the
        "No vulnerabilities found." line.
        """
        summary = result.summary()
        severity_order = self._SEVERITY_ORDER
        hard_break = "  "  # two trailing spaces force a Markdown line break
        indent = "  "      # continuation indent inside a "- " list item

        lines = ["# Security Scan Report\n"]
        lines.append(f"**Target:** `{result.target}`{hard_break}")
        lines.append(f"**Scan Type:** {result.scan_type.upper()}{hard_break}")
        lines.append(f"**Date:** {result.start_time.strftime('%Y-%m-%d %H:%M:%S')}{hard_break}")
        lines.append(f"**Files Scanned:** {result.files_scanned}{hard_break}")

        # Summary line
        counts = []
        for sev in severity_order:
            count = summary['by_severity'].get(sev, 0)
            if count > 0:
                counts.append(f"{count} {sev.capitalize()}")
        total = summary['total_vulnerabilities']
        lines.append(f"**Total Vulnerabilities:** {total}" + (f" ({', '.join(counts)})" if counts else ""))
        lines.append("\n---\n")

        # Surface scan errors so a failed scan is not mistaken for a clean one
        if result.errors:
            lines.append("## Errors\n")
            lines.extend(f"- {error}" for error in result.errors)
            lines.append("")

        if total == 0:
            lines.append("No vulnerabilities found.\n")
            return "\n".join(lines)

        # Group vulnerabilities by severity
        by_severity = {}
        for vuln in result.vulnerabilities:
            by_severity.setdefault(vuln.risk_level.value, []).append(vuln)

        finding_num = 0
        for sev in severity_order:
            vulns = by_severity.get(sev, [])
            if not vulns:
                continue
            lines.append(f"## {sev.capitalize()}\n")
            for vuln in vulns:
                finding_num += 1
                cwe = f" ({vuln.cwe_id})" if vuln.cwe_id else ""
                lines.append(f"### {finding_num}. {vuln.name}{cwe}\n")
                lines.append(f"- **File:** `{vuln.file_path}:{vuln.line_number}`")
                lines.append(f"- **Confidence:** {vuln.confidence}")
                lines.append(f"- **Description:** {vuln.description}")

                snippet = (vuln.code_snippet or "").strip()
                if snippet:
                    # Web findings store a URL in file_path; a URL has no
                    # source-language extension ("example.com" is not "com").
                    if "://" in vuln.file_path:
                        lang = ""
                    else:
                        lang = os.path.splitext(vuln.file_path)[1].lstrip(".")
                    lines.append("- **Code:**")
                    lines.append(f"{indent}```{lang}")
                    # Indent every snippet line so the whole fenced block
                    # stays inside the list item.
                    lines.extend(f"{indent}{row}" for row in snippet.split("\n"))
                    lines.append(f"{indent}```")

                if vuln.remediation:
                    lines.append(f"- **Remediation:** {vuln.remediation.strip()}")
                if vuln.cve_ids:
                    lines.append(f"- **Related CVEs:** {', '.join(vuln.cve_ids)}")
                lines.append("")

        lines.append("---\n")
        lines.append(f"*Generated by Security Auditor | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n")
        return "\n".join(lines)
# ============================================================
# CLI Interface
# ============================================================
async def main():
    """
    Command-line interface for the security checker.

    Modes:
      * ``--nvd-search KEYWORD`` prints matching CVEs and exits.
      * Otherwise ``target`` is scanned as a URL (``--web`` or an http(s)
        prefix) or as a local path, and the report is printed or written
        to ``--output``.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="Security Checker - SAST and NVD-powered vulnerability scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Scan a local project
  python security_checker.py /path/to/project

  # Scan a web application
  python security_checker.py https://example.com --web

  # Generate Markdown report
  python security_checker.py /path/to/project --format markdown -o report.md

  # Search NVD for SQL injection vulnerabilities
  python security_checker.py --nvd-search "sql injection" --severity HIGH
"""
    )
    parser.add_argument(
        "target",
        nargs="?",
        help="Target to scan (local path or URL)"
    )
    parser.add_argument(
        "--web",
        action="store_true",
        help="Treat target as web URL"
    )
    parser.add_argument(
        "--format",
        # "html" stays accepted so existing invocations keep working, but
        # there is no HTML generator; it yields the text report.
        choices=["text", "json", "markdown", "html"],
        default="text",
        help="Output format (default: text; 'html' is not implemented and falls back to text)"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output file (default: stdout)"
    )
    parser.add_argument(
        "--nvd-api-key",
        help="NVD API key for higher rate limits"
    )
    parser.add_argument(
        "--no-nvd",
        action="store_true",
        help="Skip NVD enrichment"
    )
    parser.add_argument(
        "--nvd-search",
        help="Search NVD for vulnerabilities by keyword"
    )
    parser.add_argument(
        "--severity",
        choices=["LOW", "MEDIUM", "HIGH", "CRITICAL"],
        help="Filter NVD search by severity"
    )
    args = parser.parse_args()

    # Initialize checker
    checker = SecurityChecker(nvd_api_key=args.nvd_api_key)

    # NVD search mode
    if args.nvd_search:
        print(f"Searching NVD for: {args.nvd_search}")
        results = await checker.search_nvd(
            keyword=args.nvd_search,
            severity=args.severity
        )
        if not results:
            # An empty list is a successful search with no matches, not an error
            print("No matching CVEs found.")
        elif "error" in results[0]:
            print(f"Error: {results[0].get('error', 'Unknown error')}")
        else:
            for cve in results:
                print(f"\n{cve['cve_id']} ({cve['severity']} - {cve['cvss_score']})")
                print(f" {cve['description'][:200]}...")
                if cve['cwes']:
                    print(f" CWEs: {', '.join(cve['cwes'])}")
        return

    # Require target for scanning
    if not args.target:
        parser.print_help()
        return

    if args.format == "html":
        print(
            "Warning: HTML output is not implemented; writing a text report instead.",
            file=sys.stderr
        )

    # Perform scan
    print(f"Scanning: {args.target}")
    print("This may take a moment...")
    if args.web or args.target.startswith(('http://', 'https://')):
        result = await checker.scan_web(args.target, include_nvd=not args.no_nvd)
    else:
        result = await checker.scan_local(args.target, include_nvd=not args.no_nvd)

    # Generate report
    report = checker.generate_report(result, format=args.format)

    # Output
    if args.output:
        # Reports can contain arbitrary source snippets; write UTF-8
        # regardless of the platform's default encoding.
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to: {args.output}")
    else:
        print(report)


if __name__ == "__main__":
    # Script entry point: run the async CLI on a fresh event loop.
    asyncio.run(main())