|
|
""" |
|
|
Result formatter for security scan results. |
|
|
|
|
|
Transforms raw vulnerability data into beginner-friendly explanations. |
|
|
""" |
|
|
|
|
|
from datetime import datetime |
|
|
from typing import Dict, List, Any |
|
|
import sys |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent)) |
|
|
from utils import get_severity_order |
|
|
|
|
|
|
|
|
# Canned beginner-friendly explanations, keyed by vulnerability pattern type.
# Every entry carries the same five fields:
#   what        - one-sentence statement of the finding
#   why         - why the finding is dangerous
#   how_to_fix  - recommended remediation
#   example     - short bad/good code sample (plain text, newline separated)
#   references  - list of URLs for further reading
EXPLANATION_TEMPLATES = {
    # --- Secrets committed to source control -------------------------------
    "hardcoded_api_key": {
        "what": "An API key is hardcoded directly in the source code",
        "why": (
            "Source code is stored in version control systems (Git) and can be accessed by multiple developers. "
            "Anyone with access to the code can copy and misuse this API key. "
            "If accidentally pushed to a public repository, it becomes exposed to the entire world. "
            "Attackers can use this key to make unauthorized API calls, incur costs, or steal data."
        ),
        "how_to_fix": (
            "Store API keys in environment variables or separate configuration files (e.g., .env), "
            "and add these files to .gitignore to exclude them from version control."
        ),
        "example": (
            "# Bad example\napi_key = 'sk-1234567890abcdef'\n\n"
            "# Good example\nimport os\napi_key = os.getenv('API_KEY')\n\n"
            "# In .env file\n# API_KEY=sk-1234567890abcdef"
        ),
        "references": [
            "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password",
            "https://12factor.net/config",
        ],
    },
    # --- Injection through string-built SQL --------------------------------
    "sql_injection": {
        "what": "User input is directly inserted into SQL queries, causing SQL injection vulnerability",
        "why": (
            "If an attacker inputs malicious SQL code, they can query or delete all data in the database. "
            "For example, input like 'admin' OR '1'='1' can bypass authentication, "
            "or input like '; DROP TABLE users--' can delete entire tables."
        ),
        "how_to_fix": (
            "Use parameterized queries (Prepared Statements) to separate user input from SQL code. "
            "Using an ORM (SQLAlchemy, Django ORM, etc.) automatically handles this safely."
        ),
        "example": (
            "# Bad example\nquery = f\"SELECT * FROM users WHERE id={user_id}\"\n\n"
            "# Good example\nquery = \"SELECT * FROM users WHERE id=%s\"\ncursor.execute(query, (user_id,))\n\n"
            "# Using ORM\nuser = User.objects.filter(id=user_id).first()"
        ),
        "references": [
            "https://owasp.org/www-community/attacks/SQL_Injection",
            "https://cheatsheetseries.owasp.org/cheatsheets/SQL_Injection_Prevention_Cheat_Sheet.html",
        ],
    },
    # --- Plaintext passwords ------------------------------------------------
    "password": {
        "what": "A password is stored in plaintext in the source code",
        "why": (
            "Anyone with access to the code can see this password. "
            "It remains permanently in Git history and can be recovered even after deletion. "
            "If the same password is reused on other services, the damage can be even greater."
        ),
        "how_to_fix": "Store passwords in environment variables, and if possible, use a secrets management service (AWS Secrets Manager, HashiCorp Vault, etc.).",
        "example": (
            "# Bad example\npassword = 'MyPassword123'\n\n"
            "# Good example\nimport os\npassword = os.getenv('DB_PASSWORD')"
        ),
        "references": [
            "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password",
        ],
    },
    # --- Unsafe deserialization ---------------------------------------------
    "pickle_usage": {
        "what": "Untrusted data is being deserialized using pickle.loads()",
        "why": (
            "Pickle can execute arbitrary code when restoring Python objects. "
            "If an attacker provides maliciously crafted pickle data, "
            "they can execute arbitrary commands on the server or completely take over the system."
        ),
        "how_to_fix": (
            "For untrusted data, use safe serialization formats like JSON, YAML (safe_load), "
            "or Protocol Buffers instead of pickle."
        ),
        "example": (
            "# Bad example\nimport pickle\ndata = pickle.loads(untrusted_input)\n\n"
            "# Good example\nimport json\ndata = json.loads(untrusted_input)"
        ),
        "references": [
            "https://docs.python.org/3/library/pickle.html#module-pickle",
            "https://owasp.org/www-community/vulnerabilities/Deserialization_of_untrusted_data",
        ],
    },
    # --- Dynamic code execution ---------------------------------------------
    "exec_usage": {
        "what": "Code is being dynamically executed using exec() or eval()",
        "why": (
            "If user input or external data is executed with exec()/eval(), "
            "attackers can run arbitrary Python code. "
            "This can lead to catastrophic results such as accessing all server files, "
            "attacking other systems, or installing malware."
        ),
        "how_to_fix": (
            "Avoid using exec() and eval() whenever possible. "
            "If necessary, use ast.literal_eval() for safe evaluation, "
            "or use whitelist-based command mapping."
        ),
        "example": (
            "# Bad example\nexec(user_input)\n\n"
            "# Good example (literals only)\nimport ast\nvalue = ast.literal_eval(user_input)\n\n"
            "# Or use whitelist\nallowed_commands = {'start': start_func, 'stop': stop_func}\ncommand = allowed_commands.get(user_input)"
        ),
        "references": [
            "https://docs.python.org/3/library/functions.html#eval",
            "https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html",
        ],
    },
    # --- Shell command injection --------------------------------------------
    "shell_injection": {
        "what": "Commands are being executed with shell=True in subprocess",
        "why": (
            "shell=True executes commands through the shell, so if user input contains shell metacharacters "
            "like semicolons (;) or pipes (|), additional commands can be executed. "
            "For example, input like '; rm -rf /' could delete the entire system."
        ),
        "how_to_fix": (
            "Remove shell=True and pass commands as a list. "
            "Alternatively, escape input using shlex.quote()."
        ),
        "example": (
            "# Bad example\nimport subprocess\nsubprocess.call(f'ls {user_dir}', shell=True)\n\n"
            "# Good example\nsubprocess.call(['ls', user_dir])\n\n"
            "# Or use shlex\nimport shlex\nsafe_dir = shlex.quote(user_dir)\nsubprocess.call(f'ls {safe_dir}', shell=True)"
        ),
        "references": [
            "https://docs.python.org/3/library/subprocess.html#security-considerations",
            "https://owasp.org/www-community/attacks/Command_Injection",
        ],
    },
}
|
|
|
|
|
|
|
|
def get_explanation(vulnerability: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate beginner-friendly explanation for a vulnerability.

    A template is selected by an exact ``pattern_type`` match first, then by
    keyword / rule-code heuristics on the ``id`` (checked in a fixed order, so
    the first matching rule wins). When nothing matches, a generic explanation
    is assembled from the vulnerability's own ``description`` and
    ``recommendation`` fields.

    Args:
        vulnerability: Vulnerability dictionary from scanner. The optional
            keys ``id``, ``pattern_type``, ``description`` and
            ``recommendation`` are read; missing or ``None`` ``id`` /
            ``pattern_type`` are treated as empty strings.

    Returns:
        Explanation dictionary with what, why, how_to_fix, example, references.
        Both the dictionary and its ``references`` list are fresh objects, so
        callers can modify them without altering EXPLANATION_TEMPLATES.
    """
    # ``or ""`` also covers keys that are present but explicitly set to None,
    # which would otherwise crash on ``.lower()`` / ``in`` below.
    vuln_id = vulnerability.get("id") or ""
    vuln_type = vulnerability.get("pattern_type") or ""
    id_lower = vuln_id.lower()

    # Keyword checks are case-insensitive; the Bandit-style rule codes
    # (B301, B102, B602) are matched against the id exactly as given.
    if vuln_type in EXPLANATION_TEMPLATES:
        template = EXPLANATION_TEMPLATES[vuln_type]
    elif "sql-injection" in id_lower:
        template = EXPLANATION_TEMPLATES["sql_injection"]
    elif "api" in id_lower or "api_key" in vuln_type:
        template = EXPLANATION_TEMPLATES["hardcoded_api_key"]
    elif "password" in id_lower or "password" in vuln_type:
        template = EXPLANATION_TEMPLATES["password"]
    elif "pickle" in id_lower or "B301" in vuln_id:
        template = EXPLANATION_TEMPLATES["pickle_usage"]
    elif "exec" in id_lower or "eval" in id_lower or "B102" in vuln_id:
        template = EXPLANATION_TEMPLATES["exec_usage"]
    elif "shell" in id_lower or "B602" in vuln_id:
        template = EXPLANATION_TEMPLATES["shell_injection"]
    else:
        template = None

    if template is not None:
        explanation = dict(template)
        # dict() is a shallow copy: without this, the returned "references"
        # list would be the very list stored in the module-level template and
        # any caller-side mutation would leak into every later result.
        explanation["references"] = list(template.get("references", []))
        return explanation

    # No template matched: fall back to the scanner-provided text.
    return {
        "what": vulnerability.get("description", "A security vulnerability was found"),
        "why": "This pattern can cause security vulnerabilities, and if exploited by attackers, it can damage the system.",
        "how_to_fix": vulnerability.get("recommendation", "Follow security best practices and validate untrusted input."),
        "example": "# Refer to security guides for writing secure code",
        "references": [
            "https://owasp.org/www-project-top-ten/",
            "https://cheatsheetseries.owasp.org/",
        ],
    }
|
|
|
|
|
|
|
|
def remove_duplicates(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Drop repeated findings, keeping the first occurrence of each.

    Two findings count as the same when they share the line number, the part
    of the ``id`` before its first ``-``, and the file path.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        List holding the first finding seen for each distinct signature, in
        the original order. The dictionaries themselves are not copied.
    """
    first_by_signature = {}

    for finding in vulnerabilities:
        # Only the text before the first "-" of the id takes part in the key.
        id_prefix = finding.get("id", "").partition("-")[0]
        signature = (
            finding.get("line_number"),
            id_prefix,
            finding.get("file_path", ""),
        )
        # setdefault keeps the earliest finding; dicts preserve insertion
        # order, so the result order follows first appearance.
        first_by_signature.setdefault(signature, finding)

    return list(first_by_signature.values())
|
|
|
|
|
|
|
|
def sort_vulnerabilities(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Order findings by severity rank, then by line number.

    The rank of each severity comes from the mapping returned by
    ``get_severity_order()``; lower ranks sort first.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        New sorted list; the input list is left untouched
    """
    rank_by_severity = get_severity_order()

    return sorted(
        vulnerabilities,
        key=lambda finding: (
            # A missing severity counts as LOW; names absent from the
            # mapping get rank 99 so they sort after every known level.
            rank_by_severity.get(finding.get("severity", "LOW").upper(), 99),
            # A missing line number sorts as line 0.
            finding.get("line_number", 0),
        ),
    )
|
|
|
|
|
|
|
|
def calculate_summary(vulnerabilities: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Calculate summary statistics for vulnerabilities.

    Args:
        vulnerabilities: List of vulnerability dictionaries. Only the
            ``severity`` key is read; matching is case-insensitive. A missing,
            ``None`` or empty severity is counted as LOW, and a severity that
            is not one of critical/high/medium/low is counted in
            ``total_issues`` only.

    Returns:
        Summary dictionary with the keys ``total_issues``, ``critical``,
        ``high``, ``medium``, ``low`` and ``scan_timestamp`` (current UTC time
        in ISO 8601 format with a trailing "Z").
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    # Count into a dict that holds ONLY severity buckets. Checking membership
    # against the full summary would let a severity string such as
    # "total_issues" or "scan_timestamp" corrupt the totals or raise.
    counts = dict.fromkeys(("critical", "high", "medium", "low"), 0)

    for vuln in vulnerabilities:
        severity = str(vuln.get("severity") or "LOW").lower()
        if severity in counts:
            counts[severity] += 1

    # datetime.utcnow() is deprecated; take an aware UTC timestamp and drop
    # the tzinfo so isoformat() yields the same "...Z" layout as before
    # instead of a "+00:00" suffix.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        "total_issues": len(vulnerabilities),
        **counts,
        "scan_timestamp": now_utc.isoformat() + "Z",
    }
|
|
|
|
|
|
|
|
def format_results(
    vulnerabilities: List[Dict[str, Any]],
    severity_threshold: str = "MEDIUM"
) -> Dict[str, Any]:
    """
    Turn raw scanner findings into the beginner-friendly result structure.

    Pipeline: de-duplicate, sort, attach an explanation to every finding that
    does not already carry one, then compute the summary.

    Args:
        vulnerabilities: List of vulnerability dictionaries from scanners
        severity_threshold: Minimum severity threshold used; it is recorded
            in the summary as-is and not applied as a filter here

    Returns:
        Dictionary with a ``summary`` entry and a ``vulnerabilities`` list
    """

    def with_explanation(finding):
        # Work on a shallow copy so the caller's dictionaries are not changed.
        enriched = finding.copy()
        if "explanation" not in enriched:
            enriched["explanation"] = get_explanation(finding)
        return enriched

    ordered = sort_vulnerabilities(remove_duplicates(vulnerabilities))
    formatted_vulns = [with_explanation(finding) for finding in ordered]

    summary = calculate_summary(formatted_vulns)
    summary["severity_threshold"] = severity_threshold

    return {
        "summary": summary,
        "vulnerabilities": formatted_vulns,
    }
|
|
|
|
|
|
|
|
def _truncate(text: Any, limit: int) -> str:
    """Return text cut to ``limit`` characters, adding "..." only when cut."""
    text = "" if text is None else str(text)
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def format_for_display(results: Dict[str, Any]) -> str:
    """
    Format results for console display.

    The report has a header with the per-severity counts and scan timestamp,
    followed by one section per vulnerability. The code snippet is shortened
    to 60 characters and the risk / fix texts to 100 characters; an ellipsis
    is appended only when text was actually cut off.

    Args:
        results: Formatted results dictionary; the ``summary`` and
            ``vulnerabilities`` entries are read and both are optional

    Returns:
        Human-readable string
    """
    summary = results.get("summary", {})
    vulns = results.get("vulnerabilities", [])

    heavy_rule = "=" * 70
    light_rule = "-" * 70

    output = [
        "\n" + heavy_rule,
        "Security Scan Results",
        heavy_rule,
        f"\nTotal issues found: {summary.get('total_issues', 0)}",
        f" - CRITICAL: {summary.get('critical', 0)}",
        f" - HIGH: {summary.get('high', 0)}",
        f" - MEDIUM: {summary.get('medium', 0)}",
        f" - LOW: {summary.get('low', 0)}",
        f"\nScan timestamp: {summary.get('scan_timestamp', 'N/A')}",
        "\n" + light_rule,
    ]

    for i, vuln in enumerate(vulns, 1):
        output.append(f"\n[{i}] {vuln.get('title', 'Unknown Issue')}")
        output.append(f"Severity: {vuln.get('severity', 'UNKNOWN')}")
        output.append(f"Location: Line {vuln.get('line_number', 'N/A')}")
        # _truncate also tolerates a snippet that is present but None.
        output.append(f"Code: {_truncate(vuln.get('code_snippet', ''), 60)}")

        explanation = vuln.get("explanation", {})
        if explanation:
            output.append(f"\nIssue: {explanation.get('what', '')}")
            output.append(f"Risk: {_truncate(explanation.get('why', ''), 100)}")
            output.append(f"Fix: {_truncate(explanation.get('how_to_fix', ''), 100)}")

        output.append("\n" + light_rule)

    return "\n".join(output)