"""
Result formatter for security scan results.

Transforms raw vulnerability data into beginner-friendly explanations.
"""

from datetime import datetime, timezone
from typing import Dict, List, Any
import sys
from pathlib import Path

# Import utilities
# The module's own directory is put first on sys.path so that the sibling
# ``utils`` module resolves when this file is run outside a package.
sys.path.insert(0, str(Path(__file__).parent))
from utils import get_severity_order

# Explanation templates for common vulnerabilities
EXPLANATION_TEMPLATES = {
    "hardcoded_api_key": {
        "what": "An API key is hardcoded directly in the source code",
        "why": "Source code is stored in version control systems (Git) and can be accessed by multiple developers. "
               "Anyone with access to the code can copy and misuse this API key. "
               "If accidentally pushed to a public repository, it becomes exposed to the entire world. "
               "Attackers can use this key to make unauthorized API calls, incur costs, or steal data.",
        "how_to_fix": "Store API keys in environment variables or separate configuration files (e.g., .env), "
                      "and add these files to .gitignore to exclude them from version control.",
        "example": "# Bad example\napi_key = 'sk-1234567890abcdef'\n\n"
                   "# Good example\nimport os\napi_key = os.getenv('API_KEY')\n\n"
                   "# In .env file\n# API_KEY=sk-1234567890abcdef",
        "references": [
            "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password",
            "https://12factor.net/config"
        ]
    },
    "sql_injection": {
        "what": "User input is directly inserted into SQL queries, causing SQL injection vulnerability",
        "why": "If an attacker inputs malicious SQL code, they can query or delete all data in the database. "
               "For example, input like 'admin' OR '1'='1' can bypass authentication, "
               "or input like '; DROP TABLE users--' can delete entire tables.",
        "how_to_fix": "Use parameterized queries (Prepared Statements) to separate user input from SQL code. "
                      "Using an ORM (SQLAlchemy, Django ORM, etc.) automatically handles this safely.",
        "example": "# Bad example\nquery = f\"SELECT * FROM users WHERE id={user_id}\"\n\n"
                   "# Good example\nquery = \"SELECT * FROM users WHERE id=%s\"\ncursor.execute(query, (user_id,))\n\n"
                   "# Using ORM\nuser = User.objects.filter(id=user_id).first()",
        "references": [
            "https://owasp.org/www-community/attacks/SQL_Injection",
            "https://cheatsheetseries.owasp.org/cheatsheets/SQL_Injection_Prevention_Cheat_Sheet.html"
        ]
    },
    "password": {
        "what": "A password is stored in plaintext in the source code",
        "why": "Anyone with access to the code can see this password. "
               "It remains permanently in Git history and can be recovered even after deletion. "
               "If the same password is reused on other services, the damage can be even greater.",
        "how_to_fix": "Store passwords in environment variables, and if possible, use a secrets management service (AWS Secrets Manager, HashiCorp Vault, etc.).",
        "example": "# Bad example\npassword = 'MyPassword123'\n\n"
                   "# Good example\nimport os\npassword = os.getenv('DB_PASSWORD')",
        "references": [
            "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password"
        ]
    },
    "pickle_usage": {
        "what": "Untrusted data is being deserialized using pickle.loads()",
        "why": "Pickle can execute arbitrary code when restoring Python objects. "
               "If an attacker provides maliciously crafted pickle data, "
               "they can execute arbitrary commands on the server or completely take over the system.",
        "how_to_fix": "For untrusted data, use safe serialization formats like JSON, YAML (safe_load), "
                      "or Protocol Buffers instead of pickle.",
        "example": "# Bad example\nimport pickle\ndata = pickle.loads(untrusted_input)\n\n"
                   "# Good example\nimport json\ndata = json.loads(untrusted_input)",
        "references": [
            "https://docs.python.org/3/library/pickle.html#module-pickle",
            "https://owasp.org/www-community/vulnerabilities/Deserialization_of_untrusted_data"
        ]
    },
    "exec_usage": {
        "what": "Code is being dynamically executed using exec() or eval()",
        "why": "If user input or external data is executed with exec()/eval(), "
               "attackers can run arbitrary Python code. "
               "This can lead to catastrophic results such as accessing all server files, "
               "attacking other systems, or installing malware.",
        "how_to_fix": "Avoid using exec() and eval() whenever possible. "
                      "If necessary, use ast.literal_eval() for safe evaluation, "
                      "or use whitelist-based command mapping.",
        "example": "# Bad example\nexec(user_input)\n\n"
                   "# Good example (literals only)\nimport ast\nvalue = ast.literal_eval(user_input)\n\n"
                   "# Or use whitelist\nallowed_commands = {'start': start_func, 'stop': stop_func}\ncommand = allowed_commands.get(user_input)",
        "references": [
            "https://docs.python.org/3/library/functions.html#eval",
            "https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html"
        ]
    },
    "shell_injection": {
        "what": "Commands are being executed with shell=True in subprocess",
        "why": "shell=True executes commands through the shell, so if user input contains shell metacharacters "
               "like semicolons (;) or pipes (|), additional commands can be executed. "
               "For example, input like '; rm -rf /' could delete the entire system.",
        "how_to_fix": "Remove shell=True and pass commands as a list. "
                      "Alternatively, escape input using shlex.quote().",
        "example": "# Bad example\nimport subprocess\nsubprocess.call(f'ls {user_dir}', shell=True)\n\n"
                   "# Good example\nsubprocess.call(['ls', user_dir])\n\n"
                   "# Or use shlex\nimport shlex\nsafe_dir = shlex.quote(user_dir)\nsubprocess.call(f'ls {safe_dir}', shell=True)",
        "references": [
            "https://docs.python.org/3/library/subprocess.html#security-considerations",
            "https://owasp.org/www-community/attacks/Command_Injection"
        ]
    }
}

# Fallback matching rules, tried in order when ``pattern_type`` is not itself
# a template key. Each rule is:
#   (template key, substrings looked up in the lower-cased ID,
#    substrings looked up in the pattern type as given)
# Order matters: the first rule that matches wins.
_FALLBACK_RULES = (
    ("sql_injection", ("sql-injection",), ()),
    ("hardcoded_api_key", ("api",), ("api_key",)),
    ("password", ("password",), ("password",)),
    ("pickle_usage", ("pickle", "b301"), ()),
    ("exec_usage", ("exec", "eval", "b102"), ()),
    ("shell_injection", ("shell", "b602"), ()),
)

# Severity buckets reported in the summary, highest first.
_SEVERITY_LEVELS = ("critical", "high", "medium", "low")


def _select_template(vuln_id: str, vuln_type: str):
    """Return the template matching the given ID / pattern type, or None."""
    # An exact pattern-type hit takes priority over any ID heuristics.
    if vuln_type in EXPLANATION_TEMPLATES:
        return EXPLANATION_TEMPLATES[vuln_type]

    lowered_id = vuln_id.lower()
    for template_key, id_needles, type_needles in _FALLBACK_RULES:
        id_hit = any(needle in lowered_id for needle in id_needles)
        type_hit = any(needle in vuln_type for needle in type_needles)
        if id_hit or type_hit:
            return EXPLANATION_TEMPLATES[template_key]
    return None


def get_explanation(vulnerability: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate beginner-friendly explanation for a vulnerability.

    The template is chosen by exact ``pattern_type`` first, then by
    case-insensitive substring checks on ``id`` (see ``_FALLBACK_RULES``).
    When nothing matches, a generic explanation is built from the
    vulnerability's own ``description`` and ``recommendation`` fields.

    Args:
        vulnerability: Vulnerability dictionary from scanner

    Returns:
        Explanation dictionary with what, why, how_to_fix, example, references.
        The dictionary and its ``references`` list are fresh objects, so the
        caller may modify them without affecting EXPLANATION_TEMPLATES.
    """
    # ``or ""`` covers keys that are present but set to None.
    vuln_id = str(vulnerability.get("id") or "")
    vuln_type = str(vulnerability.get("pattern_type") or "")

    template = _select_template(vuln_id, vuln_type)

    if template:
        # dict.copy() alone would share the nested ``references`` list with
        # the module-level template; copy that list as well.
        explanation = template.copy()
        explanation["references"] = list(template["references"])
        return explanation

    # Generic explanation
    return {
        "what": vulnerability.get("description") or "A security vulnerability was found",
        "why": "This pattern can cause security vulnerabilities, and if exploited by attackers, it can damage the system.",
        "how_to_fix": vulnerability.get("recommendation")
        or "Follow security best practices and validate untrusted input.",
        "example": "# Refer to security guides for writing secure code",
        "references": [
            "https://owasp.org/www-project-top-ten/",
            "https://cheatsheetseries.owasp.org/"
        ]
    }


def remove_duplicates(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Remove duplicate vulnerabilities based on line number and issue type.

    Two entries are duplicates when they share ``line_number``, ``file_path``
    and the part of ``id`` before the first "-". The first occurrence is kept
    and input order is preserved.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        Deduplicated list
    """
    seen = set()
    unique = []

    for vuln in vulnerabilities:
        # NOTE(review): only the text before the first "-" is compared, so
        # IDs that share a first segment (e.g. "foo-a" and "foo-b") on the
        # same line collapse into one entry - confirm this matches the
        # scanners' ID scheme.
        base_id = str(vuln.get("id") or "").split("-")[0]
        key = (vuln.get("line_number"), base_id, vuln.get("file_path", ""))

        if key not in seen:
            seen.add(key)
            unique.append(vuln)

    return unique


def sort_vulnerabilities(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Sort vulnerabilities by severity and line number.

    Severity rank comes from ``get_severity_order()`` looked up by the
    upper-cased severity name; names it does not contain rank last (99).
    A missing or None severity is treated as "LOW", and a missing or None
    line number as 0.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        Sorted list (a new list; the input is not modified)
    """
    severity_order = get_severity_order()

    def sort_key(vuln):
        severity = str(vuln.get("severity") or "LOW").upper()
        # ``or 0`` keeps None from being compared against integers.
        line_number = vuln.get("line_number") or 0
        return (severity_order.get(severity, 99), line_number)

    return sorted(vulnerabilities, key=sort_key)


def calculate_summary(vulnerabilities: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Calculate summary statistics for vulnerabilities.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        Summary dictionary with ``total_issues``, one counter per severity
        level (critical, high, medium, low) and ``scan_timestamp`` (current
        UTC time in ISO format with a trailing "Z"). Severities outside the
        four known levels count towards ``total_issues`` only.
    """
    # Same textual format as the deprecated datetime.utcnow(): the tzinfo is
    # dropped so isoformat() does not emit "+00:00" before the "Z" suffix.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    summary = {
        "total_issues": len(vulnerabilities),
        "critical": 0,
        "high": 0,
        "medium": 0,
        "low": 0,
        "scan_timestamp": now_utc.isoformat() + "Z"
    }

    for vuln in vulnerabilities:
        severity = str(vuln.get("severity") or "LOW").lower()
        # Check against the severity buckets only, never the other summary
        # keys ("total_issues", "scan_timestamp").
        if severity in _SEVERITY_LEVELS:
            summary[severity] += 1

    return summary


def format_results(
    vulnerabilities: List[Dict[str, Any]],
    severity_threshold: str = "MEDIUM"
) -> Dict[str, Any]:
    """
    Format scan results into beginner-friendly output.

    Deduplicates and sorts the findings, attaches an ``explanation`` to every
    finding that does not already have one, and adds summary statistics.

    Args:
        vulnerabilities: List of vulnerability dictionaries from scanners
        severity_threshold: Minimum severity threshold used. It is only
            recorded in the summary; no filtering happens here.

    Returns:
        Formatted results dictionary with ``summary`` and ``vulnerabilities``
    """
    # Remove duplicates, then sort by severity and line number
    sorted_vulns = sort_vulnerabilities(remove_duplicates(vulnerabilities))

    # Add explanations to each vulnerability
    formatted_vulns = []
    for vuln in sorted_vulns:
        # Work on a shallow copy so the caller's dictionaries are untouched.
        formatted_vuln = vuln.copy()
        if "explanation" not in formatted_vuln:
            formatted_vuln["explanation"] = get_explanation(vuln)
        formatted_vulns.append(formatted_vuln)

    # Calculate summary
    summary = calculate_summary(formatted_vulns)
    summary["severity_threshold"] = severity_threshold

    return {
        "summary": summary,
        "vulnerabilities": formatted_vulns
    }


def _truncate(text: Any, limit: int) -> str:
    """Return text cut to ``limit`` characters, with "..." only if it was cut."""
    text = "" if text is None else str(text)
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def format_for_display(results: Dict[str, Any]) -> str:
    """
    Format results for console display.

    Code snippets are shortened to 60 characters and the risk / fix texts to
    100 characters; an ellipsis is appended only when text was actually cut.

    Args:
        results: Formatted results dictionary

    Returns:
        Human-readable string
    """
    summary = results.get("summary", {})
    vulns = results.get("vulnerabilities", [])

    rule = "=" * 70
    separator = "\n" + "-" * 70

    output = ["\n" + rule, "Security Scan Results", rule]
    output.append(f"\nTotal issues found: {summary.get('total_issues', 0)}")
    for label in ("CRITICAL", "HIGH", "MEDIUM", "LOW"):
        output.append(f" - {label}: {summary.get(label.lower(), 0)}")
    output.append(f"\nScan timestamp: {summary.get('scan_timestamp', 'N/A')}")
    output.append(separator)

    for i, vuln in enumerate(vulns, 1):
        output.append(f"\n[{i}] {vuln.get('title', 'Unknown Issue')}")
        output.append(f"Severity: {vuln.get('severity', 'UNKNOWN')}")
        output.append(f"Location: Line {vuln.get('line_number', 'N/A')}")
        output.append(f"Code: {_truncate(vuln.get('code_snippet', ''), 60)}")

        explanation = vuln.get("explanation", {})
        if explanation:
            output.append(f"\nIssue: {explanation.get('what', '')}")
            output.append(f"Risk: {_truncate(explanation.get('why', ''), 100)}")
            output.append(f"Fix: {_truncate(explanation.get('how_to_fix', ''), 100)}")

        output.append(separator)

    return "\n".join(output)