| """ |
| Vulnerability Checker Module - Security scanning and vulnerability detection. |
| |
| This module handles: |
| - Bandit security scanning for Python code |
| - Safety checks for dependency vulnerabilities |
| - Custom security rule validation |
| """ |
|
|
| import os |
| import subprocess |
| import logging |
| import json |
| from pathlib import Path |
| from typing import List, Dict, Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class VulnerabilityChecker: |
| """Security vulnerability scanning and detection.""" |
| |
| def __init__(self, config: Dict[str, Any], project_root: str): |
| """ |
| Initialize the vulnerability checker. |
| |
| Args: |
| config: Configuration dictionary for vulnerability checker |
| project_root: Root directory of the project |
| """ |
| self.config = config |
| self.project_root = Path(project_root) |
| self.results = { |
| "bandit_issues": [], |
| "safety_vulnerabilities": [], |
| "custom_checks": [], |
| "total_issues": 0, |
| "severity_counts": { |
| "CRITICAL": 0, |
| "HIGH": 0, |
| "MEDIUM": 0, |
| "LOW": 0 |
| }, |
| "errors": [] |
| } |
| |
| def run(self) -> Dict[str, Any]: |
| """ |
| Run all vulnerability checks. |
| |
| Returns: |
| Dictionary containing results of vulnerability checks |
| """ |
| logger.info("Starting vulnerability checks...") |
| |
| if not self.config.get("enabled", True): |
| logger.info("Vulnerability checker is disabled in configuration") |
| return self.results |
| |
| |
| if self.config.get("run_bandit", True): |
| self._run_bandit() |
| |
| |
| if self.config.get("run_safety", True): |
| self._run_safety() |
| |
| |
| if self.config.get("custom_security_checks", True): |
| self._run_custom_checks() |
| |
| |
| self.results["total_issues"] = ( |
| len(self.results["bandit_issues"]) + |
| len(self.results["safety_vulnerabilities"]) + |
| len(self.results["custom_checks"]) |
| ) |
| |
| logger.info(f"Vulnerability checks completed. Found {self.results['total_issues']} issues") |
| return self.results |
| |
| def _run_bandit(self): |
| """Run Bandit security scanner.""" |
| logger.info("Running Bandit security scanner...") |
| |
| try: |
| exclude_dirs = ",".join(self.config.get("exclude_dirs", [])) |
| |
| cmd = [ |
| "bandit", |
| "-r", str(self.project_root), |
| "-f", "json", |
| "-ll" |
| ] |
| |
| if exclude_dirs: |
| cmd.extend(["-x", exclude_dirs]) |
| |
| result = subprocess.run( |
| cmd, |
| capture_output=True, |
| text=True, |
| cwd=self.project_root |
| ) |
| |
| |
| if result.stdout: |
| try: |
| bandit_output = json.loads(result.stdout) |
| issues = bandit_output.get("results", []) |
| |
| for issue in issues: |
| severity = issue.get("issue_severity", "LOW") |
| self.results["bandit_issues"].append({ |
| "file": issue.get("filename", ""), |
| "line": issue.get("line_number", 0), |
| "severity": severity, |
| "confidence": issue.get("issue_confidence", ""), |
| "issue": issue.get("issue_text", ""), |
| "code": issue.get("code", "") |
| }) |
| |
| |
| if severity in self.results["severity_counts"]: |
| self.results["severity_counts"][severity] += 1 |
| |
| logger.info(f"Bandit found {len(issues)} security issues") |
| |
| except json.JSONDecodeError: |
| logger.error("Failed to parse Bandit output") |
| self.results["errors"].append("Failed to parse Bandit output") |
| |
| except FileNotFoundError: |
| error_msg = "Bandit is not installed. Install with: pip install bandit" |
| logger.error(error_msg) |
| self.results["errors"].append(error_msg) |
| except Exception as e: |
| logger.error(f"Error running Bandit: {str(e)}") |
| self.results["errors"].append(f"Bandit error: {str(e)}") |
| |
| def _run_safety(self): |
| """Run Safety to check for vulnerable dependencies.""" |
| logger.info("Running Safety dependency checker...") |
| |
| try: |
| |
| req_files = list(self.project_root.glob("**/requirements*.txt")) |
| |
| if not req_files: |
| logger.warning("No requirements.txt files found") |
| return |
| |
| for req_file in req_files: |
| cmd = [ |
| "safety", |
| "check", |
| "--file", str(req_file), |
| "--json" |
| ] |
| |
| result = subprocess.run( |
| cmd, |
| capture_output=True, |
| text=True, |
| cwd=self.project_root |
| ) |
| |
| |
| if result.stdout: |
| try: |
| safety_output = json.loads(result.stdout) |
| |
| for vuln in safety_output: |
| self.results["safety_vulnerabilities"].append({ |
| "package": vuln[0], |
| "installed_version": vuln[2], |
| "affected_versions": vuln[1], |
| "vulnerability": vuln[3], |
| "severity": "HIGH", |
| "file": str(req_file) |
| }) |
| self.results["severity_counts"]["HIGH"] += 1 |
| |
| logger.info(f"Safety found {len(safety_output)} vulnerable dependencies in {req_file.name}") |
| |
| except json.JSONDecodeError: |
| logger.warning(f"Could not parse Safety output for {req_file.name}") |
| |
| except FileNotFoundError: |
| error_msg = "Safety is not installed. Install with: pip install safety" |
| logger.error(error_msg) |
| self.results["errors"].append(error_msg) |
| except Exception as e: |
| logger.error(f"Error running Safety: {str(e)}") |
| self.results["errors"].append(f"Safety error: {str(e)}") |
| |
| def _run_custom_checks(self): |
| """Run custom security checks specific to the project.""" |
| logger.info("Running custom security checks...") |
| |
| |
| self._check_env_files() |
| |
| |
| self._check_cors_config() |
| |
| |
| self._check_debug_mode() |
| |
| |
| self._check_jwt_secret() |
| |
| def _check_env_files(self): |
| """Check for potential exposed secrets in .env files.""" |
| env_files = list(self.project_root.glob("**/.env")) |
| |
| for env_file in env_files: |
| if env_file.name == ".env.example": |
| continue |
| |
| try: |
| with open(env_file, 'r') as f: |
| content = f.read() |
| |
| |
| if "SECRET_KEY=changeme" in content or "SECRET_KEY=secret" in content: |
| self.results["custom_checks"].append({ |
| "file": str(env_file), |
| "severity": "CRITICAL", |
| "issue": "Weak or default SECRET_KEY detected", |
| "recommendation": "Use a strong, randomly generated secret key" |
| }) |
| self.results["severity_counts"]["CRITICAL"] += 1 |
| |
| except Exception as e: |
| logger.warning(f"Could not read {env_file}: {str(e)}") |
| |
| def _check_cors_config(self): |
| """Check for overly permissive CORS configuration.""" |
| config_files = list(self.project_root.glob("**/config.py")) |
| |
| for config_file in config_files: |
| try: |
| with open(config_file, 'r') as f: |
| content = f.read() |
| |
| |
| if 'BACKEND_CORS_ORIGINS' in content and '"*"' in content: |
| self.results["custom_checks"].append({ |
| "file": str(config_file), |
| "severity": "MEDIUM", |
| "issue": "CORS allows all origins (*)", |
| "recommendation": "Restrict CORS origins in production to specific domains" |
| }) |
| self.results["severity_counts"]["MEDIUM"] += 1 |
| |
| except Exception as e: |
| logger.warning(f"Could not read {config_file}: {str(e)}") |
| |
| def _check_debug_mode(self): |
| """Check if DEBUG mode is enabled.""" |
| config_files = list(self.project_root.glob("**/config.py")) |
| |
| for config_file in config_files: |
| try: |
| with open(config_file, 'r') as f: |
| content = f.read() |
| |
| if 'DEBUG: bool = True' in content or 'DEBUG = True' in content: |
| self.results["custom_checks"].append({ |
| "file": str(config_file), |
| "severity": "MEDIUM", |
| "issue": "DEBUG mode is enabled", |
| "recommendation": "Disable DEBUG mode in production environments" |
| }) |
| self.results["severity_counts"]["MEDIUM"] += 1 |
| |
| except Exception as e: |
| logger.warning(f"Could not read {config_file}: {str(e)}") |
| |
| def _check_jwt_secret(self): |
| """Check JWT secret key strength.""" |
| env_files = list(self.project_root.glob("**/.env")) |
| |
| for env_file in env_files: |
| if env_file.name == ".env.example": |
| continue |
| |
| try: |
| with open(env_file, 'r') as f: |
| for line in f: |
| if line.startswith("SECRET_KEY="): |
| secret = line.split("=", 1)[1].strip() |
| |
| |
| if len(secret) < 32: |
| self.results["custom_checks"].append({ |
| "file": str(env_file), |
| "severity": "HIGH", |
| "issue": "JWT SECRET_KEY is too short (< 32 characters)", |
| "recommendation": "Use a secret key of at least 32 characters" |
| }) |
| self.results["severity_counts"]["HIGH"] += 1 |
| |
| except Exception as e: |
| logger.warning(f"Could not read {env_file}: {str(e)}") |
| |
| def get_summary(self) -> str: |
| """Get a summary of vulnerability check results.""" |
| summary = f""" |
| Vulnerability Check Summary: |
| - Total issues found: {self.results['total_issues']} |
| - Bandit security issues: {len(self.results['bandit_issues'])} |
| - Vulnerable dependencies: {len(self.results['safety_vulnerabilities'])} |
| - Custom check issues: {len(self.results['custom_checks'])} |
| |
| Severity Breakdown: |
| - CRITICAL: {self.results['severity_counts']['CRITICAL']} |
| - HIGH: {self.results['severity_counts']['HIGH']} |
| - MEDIUM: {self.results['severity_counts']['MEDIUM']} |
| - LOW: {self.results['severity_counts']['LOW']} |
| """ |
| return summary |
|
|