""" Bandit security scanner wrapper. Integrates the Bandit tool for Python-specific security analysis. """ import json import subprocess import sys from pathlib import Path from typing import Dict, List, Any, Optional # Import config loader sys.path.insert(0, str(Path(__file__).parent.parent)) from utils import load_config # Severity mapping from Bandit to our standard SEVERITY_MAPPING = { "HIGH": "CRITICAL", "MEDIUM": "HIGH", "LOW": "MEDIUM", } def is_bandit_available() -> bool: """ Check if bandit is installed and available. Returns: True if bandit is available, False otherwise """ try: result = subprocess.run( ["bandit", "--version"], capture_output=True, text=True, timeout=5 ) return result.returncode == 0 except (FileNotFoundError, subprocess.TimeoutExpired): return False def map_severity(bandit_severity: str) -> str: """ Map Bandit severity to our standard severity levels. Args: bandit_severity: Bandit's severity (HIGH, MEDIUM, LOW) Returns: Standard severity level (CRITICAL, HIGH, MEDIUM, LOW) """ return SEVERITY_MAPPING.get(bandit_severity.upper(), "MEDIUM") def run_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Run bandit on a file and return JSON results. Args: file_path: Path to Python file to scan config: Optional configuration dictionary Returns: Bandit results as dictionary Raises: RuntimeError: If bandit is not available subprocess.TimeoutExpired: If bandit execution times out subprocess.CalledProcessError: If bandit execution fails """ if not is_bandit_available(): raise RuntimeError( "Bandit is not installed. Please install it with: pip install bandit" ) if config is None: config = load_config().get("scanners", {}).get("bandit", {}) # Build bandit command cmd = [ "bandit", "-f", "json", # JSON output format "-r", # Recursive (even for single file, bandit expects this) file_path ] # Add confidence level if specified confidence_level = config.get("confidence_level", "").upper() if confidence_level in ["HIGH", "MEDIUM", "LOW"]: cmd.extend(["-ll"]) # Set minimum confidence level # Add skip tests if specified skip_tests = config.get("skip_tests", []) if skip_tests: cmd.extend(["-s", ",".join(skip_tests)]) try: # Run bandit result = subprocess.run( cmd, capture_output=True, text=True, timeout=30, # 30 second timeout check=False # Don't raise exception on non-zero exit ) # Bandit returns exit code 1 if issues found, which is expected # Only fail on actual errors (exit code > 1) if result.returncode > 1: raise subprocess.CalledProcessError( result.returncode, cmd, result.stdout, result.stderr ) # Parse JSON output if result.stdout: return json.loads(result.stdout) else: return {"results": []} except json.JSONDecodeError as e: raise RuntimeError(f"Failed to parse bandit output: {e}") except subprocess.TimeoutExpired: raise RuntimeError("Bandit execution timed out (30s limit)") def parse_bandit_results(bandit_output: Dict[str, Any], file_path: str) -> List[Dict[str, Any]]: """ Parse bandit JSON output into standard vulnerability format. Args: bandit_output: Raw bandit JSON output file_path: Path to the scanned file Returns: List of vulnerability dictionaries in standard format """ vulnerabilities = [] results = bandit_output.get("results", []) for issue in results: # Extract bandit data test_id = issue.get("test_id", "UNKNOWN") test_name = issue.get("test_name", "unknown") bandit_severity = issue.get("issue_severity", "MEDIUM") confidence = issue.get("issue_confidence", "MEDIUM") line_number = issue.get("line_number", 0) code_snippet = issue.get("code", "").strip() issue_text = issue.get("issue_text", "Security issue detected") # Map to standard severity standard_severity = map_severity(bandit_severity) # Create vulnerability entry vulnerability = { "id": f"bandit-{test_id}", "severity": standard_severity, "title": f"Bandit: {issue_text}", "description": issue_text, "line_number": line_number, "code_snippet": code_snippet, "file_path": file_path, "scanner": "bandit", "bandit_test_id": test_id, "bandit_test_name": test_name, "bandit_severity": bandit_severity, "bandit_confidence": confidence, } vulnerabilities.append(vulnerability) return vulnerabilities def scan_with_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """ Main function to scan a file with bandit. Args: file_path: Path to Python file to scan config: Optional configuration dictionary Returns: List of vulnerability dictionaries Raises: RuntimeError: If bandit is not available or execution fails """ try: # Run bandit bandit_output = run_bandit(file_path, config) # Parse results vulnerabilities = parse_bandit_results(bandit_output, file_path) # Filter by confidence if needed if config is None: config = load_config().get("scanners", {}).get("bandit", {}) confidence_level = config.get("confidence_level", "").upper() if confidence_level: confidence_order = {"HIGH": 2, "MEDIUM": 1, "LOW": 0} min_confidence = confidence_order.get(confidence_level, 0) vulnerabilities = [ v for v in vulnerabilities if confidence_order.get(v["bandit_confidence"], 0) >= min_confidence ] return vulnerabilities except Exception as e: # Log the error but don't fail the entire scan print(f"Warning: Bandit scan failed: {e}", file=sys.stderr) return [] def get_bandit_version() -> str: """ Get the installed bandit version. Returns: Version string or "not installed" """ try: result = subprocess.run( ["bandit", "--version"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: # Parse version from output for line in result.stdout.split("\n"): if "bandit" in line.lower(): return line.strip() return "unknown version" except (FileNotFoundError, subprocess.TimeoutExpired): return "not installed"