# Finance/modules/vulnerability_checker.py
# BOLO-KESARI
# Initial commit for deployment
# d2426db
"""
Vulnerability Checker Module - Security scanning and vulnerability detection.
This module handles:
- Bandit security scanning for Python code
- Safety checks for dependency vulnerabilities
- Custom security rule validation
"""
import os
import subprocess
import logging
import json
from pathlib import Path
from typing import List, Dict, Any
logger = logging.getLogger(__name__)
class VulnerabilityChecker:
    """Security vulnerability scanning and detection.

    Aggregates findings from three sources into ``self.results``:

    - Bandit, run recursively over the project root
    - Safety, run against every ``requirements*.txt`` under the project root
    - Custom checks on ``.env`` and ``config.py`` files

    Scanner failures (tool not installed, unparseable output, ...) are
    appended to ``results["errors"]`` rather than raised.

    Configuration keys read from ``config`` (all optional):

    - ``enabled`` (default ``True``): master switch for :meth:`run`
    - ``run_bandit`` (default ``True``)
    - ``run_safety`` (default ``True``)
    - ``custom_security_checks`` (default ``True``)
    - ``exclude_dirs`` (default ``[]``): joined with commas and passed to
      Bandit's ``-x`` option
    """

    # Placeholder SECRET_KEY values that are reported as CRITICAL findings.
    _WEAK_SECRET_VALUES = frozenset({"changeme", "secret"})

    # Shortest SECRET_KEY (in characters) accepted by the JWT secret check.
    _MIN_SECRET_LENGTH = 32

    def __init__(self, config: Dict[str, Any], project_root: str):
        """
        Initialize the vulnerability checker.

        Args:
            config: Configuration dictionary for vulnerability checker
            project_root: Root directory of the project
        """
        self.config = config
        self.project_root = Path(project_root)
        self.results = self._empty_results()

    @staticmethod
    def _empty_results() -> Dict[str, Any]:
        """Return a fresh, empty results structure."""
        return {
            "bandit_issues": [],
            "safety_vulnerabilities": [],
            "custom_checks": [],
            "total_issues": 0,
            "severity_counts": {
                "CRITICAL": 0,
                "HIGH": 0,
                "MEDIUM": 0,
                "LOW": 0,
            },
            "errors": [],
        }

    def run(self) -> Dict[str, Any]:
        """
        Run all vulnerability checks.

        Results from any previous call are discarded first, so calling
        ``run()`` repeatedly on one instance does not double-count findings.

        Returns:
            Dictionary containing results of vulnerability checks
        """
        logger.info("Starting vulnerability checks...")

        # Reset in place so the identity of ``self.results`` is preserved for
        # any caller that already holds a reference to it.
        self.results.clear()
        self.results.update(self._empty_results())

        if not self.config.get("enabled", True):
            logger.info("Vulnerability checker is disabled in configuration")
            return self.results

        # Run Bandit security scanner
        if self.config.get("run_bandit", True):
            self._run_bandit()

        # Run Safety dependency checker
        if self.config.get("run_safety", True):
            self._run_safety()

        # Run custom security checks
        if self.config.get("custom_security_checks", True):
            self._run_custom_checks()

        # Calculate total issues
        self.results["total_issues"] = (
            len(self.results["bandit_issues"])
            + len(self.results["safety_vulnerabilities"])
            + len(self.results["custom_checks"])
        )

        logger.info(f"Vulnerability checks completed. Found {self.results['total_issues']} issues")
        return self.results

    def _run_bandit(self):
        """Run Bandit security scanner.

        Invokes the ``bandit`` executable on the project root with JSON
        output, then copies each reported issue into
        ``results["bandit_issues"]`` and bumps the matching severity counter.
        Any failure is recorded in ``results["errors"]``.
        """
        logger.info("Running Bandit security scanner...")

        try:
            exclude_dirs = ",".join(self.config.get("exclude_dirs", []))

            cmd = [
                "bandit",
                "-r", str(self.project_root),
                "-f", "json",
                # Each "l" raises the reporting threshold: -l is LOW, -ll is
                # MEDIUM, -lll is HIGH. "-ll" therefore hides LOW findings.
                "-ll",
            ]
            if exclude_dirs:
                cmd.extend(["-x", exclude_dirs])

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=self.project_root,
            )

            # Bandit returns non-zero if issues found, which is expected, so
            # the exit code is only treated as a failure when no report was
            # written to stdout at all.
            if not result.stdout:
                if result.returncode != 0:
                    stderr_text = (result.stderr or "").strip()
                    error_msg = (
                        f"Bandit produced no output "
                        f"(exit code {result.returncode}): {stderr_text}"
                    )
                    logger.error(error_msg)
                    self.results["errors"].append(error_msg)
                return

            try:
                bandit_output = json.loads(result.stdout)
            except json.JSONDecodeError:
                logger.error("Failed to parse Bandit output")
                self.results["errors"].append("Failed to parse Bandit output")
                return

            issues = bandit_output.get("results", [])
            for issue in issues:
                severity = issue.get("issue_severity", "LOW")
                self.results["bandit_issues"].append({
                    "file": issue.get("filename", ""),
                    "line": issue.get("line_number", 0),
                    "severity": severity,
                    "confidence": issue.get("issue_confidence", ""),
                    "issue": issue.get("issue_text", ""),
                    "code": issue.get("code", ""),
                })

                # Update severity counts
                if severity in self.results["severity_counts"]:
                    self.results["severity_counts"][severity] += 1

            logger.info(f"Bandit found {len(issues)} security issues")

        except FileNotFoundError:
            error_msg = "Bandit is not installed. Install with: pip install bandit"
            logger.error(error_msg)
            self.results["errors"].append(error_msg)
        except Exception as e:
            logger.error(f"Error running Bandit: {str(e)}")
            self.results["errors"].append(f"Bandit error: {str(e)}")

    def _run_safety(self):
        """Run Safety to check for vulnerable dependencies.

        Runs ``safety check --json`` once per ``requirements*.txt`` file found
        anywhere under the project root. Every reported vulnerability is
        stored in ``results["safety_vulnerabilities"]`` with severity HIGH.
        """
        logger.info("Running Safety dependency checker...")

        try:
            # Check if requirements.txt exists
            req_files = list(self.project_root.glob("**/requirements*.txt"))
            if not req_files:
                logger.warning("No requirements.txt files found")
                return

            for req_file in req_files:
                cmd = [
                    "safety",
                    "check",
                    "--file", str(req_file),
                    "--json",
                ]

                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    cwd=self.project_root,
                )

                # Safety returns non-zero if vulnerabilities found
                if not result.stdout:
                    continue

                try:
                    safety_output = json.loads(result.stdout)
                except json.JSONDecodeError:
                    # Record the failure: a skipped file must not look like
                    # a clean scan in the final report.
                    error_msg = f"Could not parse Safety output for {req_file.name}"
                    logger.warning(error_msg)
                    self.results["errors"].append(error_msg)
                    continue

                findings = self._normalize_safety_output(safety_output)
                for finding in findings:
                    self.results["safety_vulnerabilities"].append({
                        "package": finding["package"],
                        "installed_version": finding["installed_version"],
                        "affected_versions": finding["affected_versions"],
                        "vulnerability": finding["vulnerability"],
                        "severity": "HIGH",  # Safety doesn't provide severity, assume HIGH
                        "file": str(req_file),
                    })
                    self.results["severity_counts"]["HIGH"] += 1

                logger.info(f"Safety found {len(findings)} vulnerable dependencies in {req_file.name}")

        except FileNotFoundError:
            error_msg = "Safety is not installed. Install with: pip install safety"
            logger.error(error_msg)
            self.results["errors"].append(error_msg)
        except Exception as e:
            logger.error(f"Error running Safety: {str(e)}")
            self.results["errors"].append(f"Safety error: {str(e)}")

    @staticmethod
    def _normalize_safety_output(safety_output: Any) -> List[Dict[str, Any]]:
        """Convert a parsed Safety JSON report into uniform finding dicts.

        Two report shapes are understood:

        - a list of rows ``[package, affected_versions, installed_version,
          vulnerability, ...]`` (the layout the original code indexed into)
        - a dict with a ``"vulnerabilities"`` list of objects. The field
          names read here (``package_name``, ``analyzed_version``,
          ``vulnerable_spec``, ``advisory``) are assumed from Safety 2.x
          reports - TODO confirm against the Safety version in use.

        Entries matching neither shape are skipped with a warning instead of
        aborting the whole scan.

        Args:
            safety_output: The object produced by ``json.loads`` on Safety's
                stdout.

        Returns:
            A list of dicts with the keys ``package``, ``installed_version``,
            ``affected_versions`` and ``vulnerability``.
        """
        if isinstance(safety_output, dict):
            entries = safety_output.get("vulnerabilities") or []
        elif isinstance(safety_output, list):
            entries = safety_output
        else:
            entries = []

        findings = []
        skipped = 0
        for entry in entries:
            if isinstance(entry, dict):
                findings.append({
                    "package": entry.get("package_name", ""),
                    "installed_version": entry.get("analyzed_version", ""),
                    "affected_versions": entry.get("vulnerable_spec", ""),
                    "vulnerability": entry.get("advisory", ""),
                })
            elif isinstance(entry, (list, tuple)) and len(entry) >= 4:
                findings.append({
                    "package": entry[0],
                    "installed_version": entry[2],
                    "affected_versions": entry[1],
                    "vulnerability": entry[3],
                })
            else:
                skipped += 1

        if skipped:
            logger.warning(f"Skipped {skipped} unrecognized entries in Safety output")
        return findings

    def _run_custom_checks(self):
        """Run custom security checks specific to the project."""
        logger.info("Running custom security checks...")

        # Check 1: Look for exposed secrets in .env files
        self._check_env_files()

        # Check 2: Verify CORS configuration
        self._check_cors_config()

        # Check 3: Check for DEBUG mode in production
        self._check_debug_mode()

        # Check 4: Validate JWT secret key strength
        self._check_jwt_secret()

    def _add_custom_finding(self, file: Path, severity: str, issue: str, recommendation: str):
        """Append one custom-check finding and bump its severity counter."""
        self.results["custom_checks"].append({
            "file": str(file),
            "severity": severity,
            "issue": issue,
            "recommendation": recommendation,
        })
        self.results["severity_counts"][severity] += 1

    def _read_text(self, path: Path):
        """Return the text of ``path``, or ``None`` (after a warning) if unreadable.

        Files are decoded as UTF-8 with undecodable bytes replaced, so a stray
        non-UTF-8 byte does not prevent the file from being checked.
        """
        try:
            return path.read_text(encoding="utf-8", errors="replace")
        except OSError as e:
            logger.warning(f"Could not read {path}: {str(e)}")
            return None

    def _find_env_files(self) -> List[Path]:
        """Return every regular file named exactly ``.env`` under the project root.

        The pattern only matches the literal name ``.env``, so templates such
        as ``.env.example`` are never scanned. Directories that happen to be
        called ``.env`` are skipped.
        """
        return [p for p in self.project_root.glob("**/.env") if p.is_file()]

    @staticmethod
    def _parse_env_assignments(text: str) -> List[tuple]:
        """Extract ``(key, value)`` pairs from dotenv-style text.

        Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
        A leading ``export`` is dropped from the key and one pair of matching
        surrounding quotes is removed from the value.
        """
        assignments = []
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            if not sep:
                continue
            key = key.strip()
            if key.startswith("export "):
                key = key[len("export "):].strip()
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
                value = value[1:-1]
            assignments.append((key, value))
        return assignments

    def _check_env_files(self):
        """Check for potential exposed secrets in .env files.

        Reports one CRITICAL finding per ``.env`` file in which a variable
        whose name ends in ``SECRET_KEY`` is set to a known placeholder value.
        The value is compared as a whole (case-insensitively), so a strong key
        that merely starts with "secret" is not flagged, and commented-out
        lines are ignored.
        """
        for env_file in self._find_env_files():
            content = self._read_text(env_file)
            if content is None:
                continue

            # Check for weak or default secrets
            has_weak_secret = any(
                key.endswith("SECRET_KEY") and value.lower() in self._WEAK_SECRET_VALUES
                for key, value in self._parse_env_assignments(content)
            )
            if has_weak_secret:
                self._add_custom_finding(
                    env_file,
                    "CRITICAL",
                    "Weak or default SECRET_KEY detected",
                    "Use a strong, randomly generated secret key",
                )

    def _check_cors_config(self):
        """Check for overly permissive CORS configuration.

        Reports a MEDIUM finding for each ``config.py`` that mentions
        ``BACKEND_CORS_ORIGINS`` and also contains a quoted ``*`` (single or
        double quotes) anywhere in the file.
        """
        for config_file in self.project_root.glob("**/config.py"):
            content = self._read_text(config_file)
            if content is None:
                continue

            # Check for allow all origins in production
            has_wildcard = '"*"' in content or "'*'" in content
            if "BACKEND_CORS_ORIGINS" in content and has_wildcard:
                self._add_custom_finding(
                    config_file,
                    "MEDIUM",
                    "CORS allows all origins (*)",
                    "Restrict CORS origins in production to specific domains",
                )

    def _check_debug_mode(self):
        """Check if DEBUG mode is enabled.

        Reports a MEDIUM finding for each ``config.py`` containing the text
        ``DEBUG: bool = True`` or ``DEBUG = True``.
        """
        for config_file in self.project_root.glob("**/config.py"):
            content = self._read_text(config_file)
            if content is None:
                continue

            if "DEBUG: bool = True" in content or "DEBUG = True" in content:
                self._add_custom_finding(
                    config_file,
                    "MEDIUM",
                    "DEBUG mode is enabled",
                    "Disable DEBUG mode in production environments",
                )

    def _check_jwt_secret(self):
        """Check JWT secret key strength.

        Reports a HIGH finding for every ``SECRET_KEY`` assignment in a
        ``.env`` file whose value is shorter than the minimum length.
        Surrounding quotes are not counted towards the length.
        """
        for env_file in self._find_env_files():
            content = self._read_text(env_file)
            if content is None:
                continue

            for key, secret in self._parse_env_assignments(content):
                if key != "SECRET_KEY":
                    continue

                # Check secret key length
                if len(secret) < self._MIN_SECRET_LENGTH:
                    self._add_custom_finding(
                        env_file,
                        "HIGH",
                        f"JWT SECRET_KEY is too short (< {self._MIN_SECRET_LENGTH} characters)",
                        f"Use a secret key of at least {self._MIN_SECRET_LENGTH} characters",
                    )

    def get_summary(self) -> str:
        """Get a summary of vulnerability check results.

        Returns:
            A multi-line, human-readable report of the issue totals per
            source and per severity, built from ``self.results``.
        """
        counts = self.results["severity_counts"]
        lines = [
            "Vulnerability Check Summary:",
            f"- Total issues found: {self.results['total_issues']}",
            f"- Bandit security issues: {len(self.results['bandit_issues'])}",
            f"- Vulnerable dependencies: {len(self.results['safety_vulnerabilities'])}",
            f"- Custom check issues: {len(self.results['custom_checks'])}",
            "Severity Breakdown:",
            f"- CRITICAL: {counts['CRITICAL']}",
            f"- HIGH: {counts['HIGH']}",
            f"- MEDIUM: {counts['MEDIUM']}",
            f"- LOW: {counts['LOW']}",
        ]
        # The report is framed by a leading and a trailing newline.
        return "\n" + "\n".join(lines) + "\n"