File size: 13,386 Bytes
f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 3a62f17 f8e78b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
"""
Result formatter for security scan results.
Transforms raw vulnerability data into beginner-friendly explanations.
"""
from datetime import datetime
from typing import Dict, List, Any
import sys
from pathlib import Path
# Import utilities
sys.path.insert(0, str(Path(__file__).parent))
from utils import get_severity_order
# Explanation templates for common vulnerabilities.
#
# Each top-level key names one template. get_explanation() selects a template
# either directly (the vulnerability's "pattern_type" equals the key) or via
# substring heuristics on the vulnerability "id".
#
# Every template carries the same five fields:
#   "what"       - one-sentence statement of the issue
#   "why"        - why the issue is dangerous
#   "how_to_fix" - recommended remediation
#   "example"    - bad/good code sample as a single newline-separated string
#   "references" - list of URLs for further reading
#
# Multi-line values rely on implicit string-literal concatenation, so each
# continuation line must keep its trailing space/newline inside the quotes.
EXPLANATION_TEMPLATES = {
    # Secrets: API key committed to source control.
    "hardcoded_api_key": {
        "what": "An API key is hardcoded directly in the source code",
        "why": "Source code is stored in version control systems (Git) and can be accessed by multiple developers. "
        "Anyone with access to the code can copy and misuse this API key. "
        "If accidentally pushed to a public repository, it becomes exposed to the entire world. "
        "Attackers can use this key to make unauthorized API calls, incur costs, or steal data.",
        "how_to_fix": "Store API keys in environment variables or separate configuration files (e.g., .env), "
        "and add these files to .gitignore to exclude them from version control.",
        "example": "# Bad example\napi_key = 'sk-1234567890abcdef'\n\n"
        "# Good example\nimport os\napi_key = os.getenv('API_KEY')\n\n"
        "# In .env file\n# API_KEY=sk-1234567890abcdef",
        "references": [
            "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password",
            "https://12factor.net/config"
        ]
    },
    # Injection: user input interpolated into SQL text.
    "sql_injection": {
        "what": "User input is directly inserted into SQL queries, causing SQL injection vulnerability",
        "why": "If an attacker inputs malicious SQL code, they can query or delete all data in the database. "
        "For example, input like 'admin' OR '1'='1' can bypass authentication, "
        "or input like '; DROP TABLE users--' can delete entire tables.",
        "how_to_fix": "Use parameterized queries (Prepared Statements) to separate user input from SQL code. "
        "Using an ORM (SQLAlchemy, Django ORM, etc.) automatically handles this safely.",
        "example": "# Bad example\nquery = f\"SELECT * FROM users WHERE id={user_id}\"\n\n"
        "# Good example\nquery = \"SELECT * FROM users WHERE id=%s\"\ncursor.execute(query, (user_id,))\n\n"
        "# Using ORM\nuser = User.objects.filter(id=user_id).first()",
        "references": [
            "https://owasp.org/www-community/attacks/SQL_Injection",
            "https://cheatsheetseries.owasp.org/cheatsheets/SQL_Injection_Prevention_Cheat_Sheet.html"
        ]
    },
    # Secrets: plaintext password committed to source control.
    "password": {
        "what": "A password is stored in plaintext in the source code",
        "why": "Anyone with access to the code can see this password. "
        "It remains permanently in Git history and can be recovered even after deletion. "
        "If the same password is reused on other services, the damage can be even greater.",
        "how_to_fix": "Store passwords in environment variables, and if possible, use a secrets management service (AWS Secrets Manager, HashiCorp Vault, etc.).",
        "example": "# Bad example\npassword = 'MyPassword123'\n\n"
        "# Good example\nimport os\npassword = os.getenv('DB_PASSWORD')",
        "references": [
            "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password"
        ]
    },
    # Unsafe deserialization via pickle.
    "pickle_usage": {
        "what": "Untrusted data is being deserialized using pickle.loads()",
        "why": "Pickle can execute arbitrary code when restoring Python objects. "
        "If an attacker provides maliciously crafted pickle data, "
        "they can execute arbitrary commands on the server or completely take over the system.",
        "how_to_fix": "For untrusted data, use safe serialization formats like JSON, YAML (safe_load), "
        "or Protocol Buffers instead of pickle.",
        "example": "# Bad example\nimport pickle\ndata = pickle.loads(untrusted_input)\n\n"
        "# Good example\nimport json\ndata = json.loads(untrusted_input)",
        "references": [
            "https://docs.python.org/3/library/pickle.html#module-pickle",
            "https://owasp.org/www-community/vulnerabilities/Deserialization_of_untrusted_data"
        ]
    },
    # Dynamic code execution via exec()/eval().
    "exec_usage": {
        "what": "Code is being dynamically executed using exec() or eval()",
        "why": "If user input or external data is executed with exec()/eval(), "
        "attackers can run arbitrary Python code. "
        "This can lead to catastrophic results such as accessing all server files, "
        "attacking other systems, or installing malware.",
        "how_to_fix": "Avoid using exec() and eval() whenever possible. "
        "If necessary, use ast.literal_eval() for safe evaluation, "
        "or use whitelist-based command mapping.",
        "example": "# Bad example\nexec(user_input)\n\n"
        "# Good example (literals only)\nimport ast\nvalue = ast.literal_eval(user_input)\n\n"
        "# Or use whitelist\nallowed_commands = {'start': start_func, 'stop': stop_func}\ncommand = allowed_commands.get(user_input)",
        "references": [
            "https://docs.python.org/3/library/functions.html#eval",
            "https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html"
        ]
    },
    # Command injection via subprocess with shell=True.
    "shell_injection": {
        "what": "Commands are being executed with shell=True in subprocess",
        "why": "shell=True executes commands through the shell, so if user input contains shell metacharacters "
        "like semicolons (;) or pipes (|), additional commands can be executed. "
        "For example, input like '; rm -rf /' could delete the entire system.",
        "how_to_fix": "Remove shell=True and pass commands as a list. "
        "Alternatively, escape input using shlex.quote().",
        "example": "# Bad example\nimport subprocess\nsubprocess.call(f'ls {user_dir}', shell=True)\n\n"
        "# Good example\nsubprocess.call(['ls', user_dir])\n\n"
        "# Or use shlex\nimport shlex\nsafe_dir = shlex.quote(user_dir)\nsubprocess.call(f'ls {safe_dir}', shell=True)",
        "references": [
            "https://docs.python.org/3/library/subprocess.html#security-considerations",
            "https://owasp.org/www-community/attacks/Command_Injection"
        ]
    }
}
def get_explanation(vulnerability: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate beginner-friendly explanation for a vulnerability.

    A template from EXPLANATION_TEMPLATES is chosen by exact match on the
    vulnerability's "pattern_type"; failing that, by case-insensitive
    substring heuristics on its "id". When nothing matches, a generic
    explanation is built from the vulnerability's own "description" and
    "recommendation" fields.

    Args:
        vulnerability: Vulnerability dictionary from scanner

    Returns:
        Explanation dictionary with what, why, how_to_fix, example, references.
        The dictionary and its list values are fresh copies, so callers may
        mutate the result without affecting EXPLANATION_TEMPLATES.
    """
    # `or ""` also covers scanners that emit an explicit None for these fields.
    vuln_id = str(vulnerability.get("id") or "").lower()
    vuln_type = str(vulnerability.get("pattern_type") or "")

    # Resolve a template key. The order of the ID heuristics matters: the
    # broad "api" substring check runs before the more specific ones, exactly
    # as callers have come to expect.
    template_key = None
    if vuln_type in EXPLANATION_TEMPLATES:
        template_key = vuln_type
    elif "sql-injection" in vuln_id:
        template_key = "sql_injection"
    elif "api" in vuln_id or "api_key" in vuln_type:
        template_key = "hardcoded_api_key"
    elif "password" in vuln_id or "password" in vuln_type:
        template_key = "password"
    elif "pickle" in vuln_id or "b301" in vuln_id:  # B301: Bandit pickle check
        template_key = "pickle_usage"
    elif "exec" in vuln_id or "eval" in vuln_id or "b102" in vuln_id:  # B102: Bandit exec check
        template_key = "exec_usage"
    elif "shell" in vuln_id or "b602" in vuln_id:  # B602: Bandit shell=True check
        template_key = "shell_injection"

    if template_key is not None:
        template = EXPLANATION_TEMPLATES[template_key]
        # dict.copy() alone would share the "references" list with the
        # module-level template; copy list values so the template stays intact.
        return {
            field: list(value) if isinstance(value, list) else value
            for field, value in template.items()
        }

    # Generic explanation assembled from whatever the scanner reported.
    description = vulnerability.get("description", "A security vulnerability was found")
    return {
        "what": description,
        "why": "This pattern can cause security vulnerabilities, and if exploited by attackers, it can damage the system.",
        "how_to_fix": vulnerability.get("recommendation", "Follow security best practices and validate untrusted input."),
        "example": "# Refer to security guides for writing secure code",
        "references": [
            "https://owasp.org/www-project-top-ten/",
            "https://cheatsheetseries.owasp.org/"
        ]
    }
def remove_duplicates(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Drop repeated findings, keeping the first occurrence of each.

    Two findings count as the same when they share the line number, the
    part of the "id" before its first "-", and the file path.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        New list with duplicates removed, in original order
    """
    # Dicts keep insertion order, so the values come back in first-seen order.
    first_by_fingerprint: Dict[Any, Dict[str, Any]] = {}
    for entry in vulnerabilities:
        base_id, _, _ = entry.get("id", "").partition("-")
        fingerprint = (
            entry.get("line_number"),
            base_id,
            entry.get("file_path", ""),
        )
        # setdefault leaves an already-recorded finding untouched.
        first_by_fingerprint.setdefault(fingerprint, entry)
    return list(first_by_fingerprint.values())
def sort_vulnerabilities(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Order findings by severity rank, then by line number.

    The rank of each severity comes from get_severity_order(); severities
    missing from that mapping rank 99. A finding without a "severity" is
    treated as "LOW" and one without a "line_number" as line 0.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        New sorted list; the input list is left as it was
    """
    ranking = get_severity_order()
    ordered = list(vulnerabilities)
    ordered.sort(
        key=lambda item: (
            ranking.get(item.get("severity", "LOW").upper(), 99),
            item.get("line_number", 0),
        )
    )
    return ordered
def calculate_summary(vulnerabilities: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Calculate summary statistics for vulnerabilities.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        Summary dictionary with, in order: "total_issues", one counter per
        severity ("critical", "high", "medium", "low") and "scan_timestamp"
        (current UTC time in ISO-8601 form with a trailing "Z").
        Findings whose severity is not one of the four levels are included
        in "total_issues" but in none of the per-severity counters.
    """
    # Local import: the module header only imports `datetime` itself.
    from datetime import timezone

    # Count against a dedicated set of severity keys. Testing membership in
    # the whole summary would let a severity such as "total_issues" or
    # "scan_timestamp" corrupt the non-counter fields.
    counts = dict.fromkeys(("critical", "high", "medium", "low"), 0)
    for vuln in vulnerabilities:
        # A missing or None severity is counted as LOW.
        severity = str(vuln.get("severity") or "LOW").lower()
        if severity in counts:
            counts[severity] += 1

    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the rendered text keeps the "...Z" format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        "total_issues": len(vulnerabilities),
        **counts,
        "scan_timestamp": now_utc.isoformat() + "Z",
    }
def format_results(
    vulnerabilities: List[Dict[str, Any]],
    severity_threshold: str = "MEDIUM"
) -> Dict[str, Any]:
    """
    Turn raw scanner findings into the beginner-friendly result structure.

    Findings are deduplicated, sorted, and each one is copied and given an
    "explanation" entry unless it already has one. The threshold is only
    recorded in the summary; no filtering happens here.

    Args:
        vulnerabilities: List of vulnerability dictionaries from scanners
        severity_threshold: Minimum severity threshold used

    Returns:
        Dictionary with "summary" and "vulnerabilities" keys
    """

    def with_explanation(finding: Dict[str, Any]) -> Dict[str, Any]:
        # Work on a copy so the caller's dictionaries are never modified.
        enriched = finding.copy()
        if "explanation" in enriched:
            return enriched
        enriched["explanation"] = get_explanation(finding)
        return enriched

    ordered = sort_vulnerabilities(remove_duplicates(vulnerabilities))
    formatted = [with_explanation(finding) for finding in ordered]

    overview = calculate_summary(formatted)
    overview["severity_threshold"] = severity_threshold

    return {"summary": overview, "vulnerabilities": formatted}
def format_for_display(results: Dict[str, Any]) -> str:
    """
    Format results for console display.

    Produces a header with the per-severity totals followed by one section
    per vulnerability (title, severity, line, code snippet and, when an
    explanation is present, its issue/risk/fix texts). The code snippet is
    limited to 60 characters and the risk/fix texts to 100; "..." is
    appended only when text was actually cut off.

    Args:
        results: Formatted results dictionary

    Returns:
        Human-readable string
    """

    def clip(text: Any, limit: int) -> str:
        # Treat None as empty and mark truncation only when it happened, so
        # short values are not shown with a misleading trailing "...".
        text = "" if text is None else str(text)
        return text if len(text) <= limit else text[:limit] + "..."

    # `or` fallbacks tolerate keys that are present but set to None.
    summary = results.get("summary") or {}
    vulns = results.get("vulnerabilities") or []
    rule = "-" * 70

    output = [
        "\n" + "=" * 70,
        "Security Scan Results",
        "=" * 70,
        f"\nTotal issues found: {summary.get('total_issues', 0)}",
        f" - CRITICAL: {summary.get('critical', 0)}",
        f" - HIGH: {summary.get('high', 0)}",
        f" - MEDIUM: {summary.get('medium', 0)}",
        f" - LOW: {summary.get('low', 0)}",
        f"\nScan timestamp: {summary.get('scan_timestamp', 'N/A')}",
        "\n" + rule,
    ]

    for i, vuln in enumerate(vulns, 1):
        output.append(f"\n[{i}] {vuln.get('title', 'Unknown Issue')}")
        output.append(f"Severity: {vuln.get('severity', 'UNKNOWN')}")
        output.append(f"Location: Line {vuln.get('line_number', 'N/A')}")
        output.append(f"Code: {clip(vuln.get('code_snippet', ''), 60)}")

        explanation = vuln.get("explanation") or {}
        if explanation:
            output.append(f"\nIssue: {explanation.get('what', '')}")
            output.append(f"Risk: {clip(explanation.get('why', ''), 100)}")
            output.append(f"Fix: {clip(explanation.get('how_to_fix', ''), 100)}")

        # Separator after every finding.
        output.append("\n" + rule)

    return "\n".join(output)