""" Vulnerability Checker Module - Security scanning and vulnerability detection. This module handles: - Bandit security scanning for Python code - Safety checks for dependency vulnerabilities - Custom security rule validation """ import os import subprocess import logging import json from pathlib import Path from typing import List, Dict, Any logger = logging.getLogger(__name__) class VulnerabilityChecker: """Security vulnerability scanning and detection.""" def __init__(self, config: Dict[str, Any], project_root: str): """ Initialize the vulnerability checker. Args: config: Configuration dictionary for vulnerability checker project_root: Root directory of the project """ self.config = config self.project_root = Path(project_root) self.results = { "bandit_issues": [], "safety_vulnerabilities": [], "custom_checks": [], "total_issues": 0, "severity_counts": { "CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0 }, "errors": [] } def run(self) -> Dict[str, Any]: """ Run all vulnerability checks. Returns: Dictionary containing results of vulnerability checks """ logger.info("Starting vulnerability checks...") if not self.config.get("enabled", True): logger.info("Vulnerability checker is disabled in configuration") return self.results # Run Bandit security scanner if self.config.get("run_bandit", True): self._run_bandit() # Run Safety dependency checker if self.config.get("run_safety", True): self._run_safety() # Run custom security checks if self.config.get("custom_security_checks", True): self._run_custom_checks() # Calculate total issues self.results["total_issues"] = ( len(self.results["bandit_issues"]) + len(self.results["safety_vulnerabilities"]) + len(self.results["custom_checks"]) ) logger.info(f"Vulnerability checks completed. Found {self.results['total_issues']} issues") return self.results def _run_bandit(self): """Run Bandit security scanner.""" logger.info("Running Bandit security scanner...") try: exclude_dirs = ",".join(self.config.get("exclude_dirs", [])) cmd = [ "bandit", "-r", str(self.project_root), "-f", "json", "-ll" # Only report low severity and above ] if exclude_dirs: cmd.extend(["-x", exclude_dirs]) result = subprocess.run( cmd, capture_output=True, text=True, cwd=self.project_root ) # Bandit returns non-zero if issues found, which is expected if result.stdout: try: bandit_output = json.loads(result.stdout) issues = bandit_output.get("results", []) for issue in issues: severity = issue.get("issue_severity", "LOW") self.results["bandit_issues"].append({ "file": issue.get("filename", ""), "line": issue.get("line_number", 0), "severity": severity, "confidence": issue.get("issue_confidence", ""), "issue": issue.get("issue_text", ""), "code": issue.get("code", "") }) # Update severity counts if severity in self.results["severity_counts"]: self.results["severity_counts"][severity] += 1 logger.info(f"Bandit found {len(issues)} security issues") except json.JSONDecodeError: logger.error("Failed to parse Bandit output") self.results["errors"].append("Failed to parse Bandit output") except FileNotFoundError: error_msg = "Bandit is not installed. Install with: pip install bandit" logger.error(error_msg) self.results["errors"].append(error_msg) except Exception as e: logger.error(f"Error running Bandit: {str(e)}") self.results["errors"].append(f"Bandit error: {str(e)}") def _run_safety(self): """Run Safety to check for vulnerable dependencies.""" logger.info("Running Safety dependency checker...") try: # Check if requirements.txt exists req_files = list(self.project_root.glob("**/requirements*.txt")) if not req_files: logger.warning("No requirements.txt files found") return for req_file in req_files: cmd = [ "safety", "check", "--file", str(req_file), "--json" ] result = subprocess.run( cmd, capture_output=True, text=True, cwd=self.project_root ) # Safety returns non-zero if vulnerabilities found if result.stdout: try: safety_output = json.loads(result.stdout) for vuln in safety_output: self.results["safety_vulnerabilities"].append({ "package": vuln[0], "installed_version": vuln[2], "affected_versions": vuln[1], "vulnerability": vuln[3], "severity": "HIGH", # Safety doesn't provide severity, assume HIGH "file": str(req_file) }) self.results["severity_counts"]["HIGH"] += 1 logger.info(f"Safety found {len(safety_output)} vulnerable dependencies in {req_file.name}") except json.JSONDecodeError: logger.warning(f"Could not parse Safety output for {req_file.name}") except FileNotFoundError: error_msg = "Safety is not installed. Install with: pip install safety" logger.error(error_msg) self.results["errors"].append(error_msg) except Exception as e: logger.error(f"Error running Safety: {str(e)}") self.results["errors"].append(f"Safety error: {str(e)}") def _run_custom_checks(self): """Run custom security checks specific to the project.""" logger.info("Running custom security checks...") # Check 1: Look for exposed secrets in .env files self._check_env_files() # Check 2: Verify CORS configuration self._check_cors_config() # Check 3: Check for DEBUG mode in production self._check_debug_mode() # Check 4: Validate JWT secret key strength self._check_jwt_secret() def _check_env_files(self): """Check for potential exposed secrets in .env files.""" env_files = list(self.project_root.glob("**/.env")) for env_file in env_files: if env_file.name == ".env.example": continue try: with open(env_file, 'r') as f: content = f.read() # Check for weak or default secrets if "SECRET_KEY=changeme" in content or "SECRET_KEY=secret" in content: self.results["custom_checks"].append({ "file": str(env_file), "severity": "CRITICAL", "issue": "Weak or default SECRET_KEY detected", "recommendation": "Use a strong, randomly generated secret key" }) self.results["severity_counts"]["CRITICAL"] += 1 except Exception as e: logger.warning(f"Could not read {env_file}: {str(e)}") def _check_cors_config(self): """Check for overly permissive CORS configuration.""" config_files = list(self.project_root.glob("**/config.py")) for config_file in config_files: try: with open(config_file, 'r') as f: content = f.read() # Check for allow all origins in production if 'BACKEND_CORS_ORIGINS' in content and '"*"' in content: self.results["custom_checks"].append({ "file": str(config_file), "severity": "MEDIUM", "issue": "CORS allows all origins (*)", "recommendation": "Restrict CORS origins in production to specific domains" }) self.results["severity_counts"]["MEDIUM"] += 1 except Exception as e: logger.warning(f"Could not read {config_file}: {str(e)}") def _check_debug_mode(self): """Check if DEBUG mode is enabled.""" config_files = list(self.project_root.glob("**/config.py")) for config_file in config_files: try: with open(config_file, 'r') as f: content = f.read() if 'DEBUG: bool = True' in content or 'DEBUG = True' in content: self.results["custom_checks"].append({ "file": str(config_file), "severity": "MEDIUM", "issue": "DEBUG mode is enabled", "recommendation": "Disable DEBUG mode in production environments" }) self.results["severity_counts"]["MEDIUM"] += 1 except Exception as e: logger.warning(f"Could not read {config_file}: {str(e)}") def _check_jwt_secret(self): """Check JWT secret key strength.""" env_files = list(self.project_root.glob("**/.env")) for env_file in env_files: if env_file.name == ".env.example": continue try: with open(env_file, 'r') as f: for line in f: if line.startswith("SECRET_KEY="): secret = line.split("=", 1)[1].strip() # Check secret key length if len(secret) < 32: self.results["custom_checks"].append({ "file": str(env_file), "severity": "HIGH", "issue": "JWT SECRET_KEY is too short (< 32 characters)", "recommendation": "Use a secret key of at least 32 characters" }) self.results["severity_counts"]["HIGH"] += 1 except Exception as e: logger.warning(f"Could not read {env_file}: {str(e)}") def get_summary(self) -> str: """Get a summary of vulnerability check results.""" summary = f""" Vulnerability Check Summary: - Total issues found: {self.results['total_issues']} - Bandit security issues: {len(self.results['bandit_issues'])} - Vulnerable dependencies: {len(self.results['safety_vulnerabilities'])} - Custom check issues: {len(self.results['custom_checks'])} Severity Breakdown: - CRITICAL: {self.results['severity_counts']['CRITICAL']} - HIGH: {self.results['severity_counts']['HIGH']} - MEDIUM: {self.results['severity_counts']['MEDIUM']} - LOW: {self.results['severity_counts']['LOW']} """ return summary