|
|
""" |
|
|
Bandit security scanner wrapper. |
|
|
|
|
|
Integrates the Bandit tool for Python-specific security analysis. |
|
|
""" |
|
|
|
|
|
import json |
|
|
import subprocess |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Any, Optional |
|
|
|
|
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
from utils import load_config |
|
|
|
|
|
|
|
|
SEVERITY_MAPPING = { |
|
|
"HIGH": "CRITICAL", |
|
|
"MEDIUM": "HIGH", |
|
|
"LOW": "MEDIUM", |
|
|
} |
|
|
|
|
|
|
|
|
def is_bandit_available() -> bool: |
|
|
""" |
|
|
Check if bandit is installed and available. |
|
|
|
|
|
Returns: |
|
|
True if bandit is available, False otherwise |
|
|
""" |
|
|
try: |
|
|
result = subprocess.run( |
|
|
["bandit", "--version"], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=5 |
|
|
) |
|
|
return result.returncode == 0 |
|
|
except (FileNotFoundError, subprocess.TimeoutExpired): |
|
|
return False |
|
|
|
|
|
|
|
|
def map_severity(bandit_severity: str) -> str: |
|
|
""" |
|
|
Map Bandit severity to our standard severity levels. |
|
|
|
|
|
Args: |
|
|
bandit_severity: Bandit's severity (HIGH, MEDIUM, LOW) |
|
|
|
|
|
Returns: |
|
|
Standard severity level (CRITICAL, HIGH, MEDIUM, LOW) |
|
|
""" |
|
|
return SEVERITY_MAPPING.get(bandit_severity.upper(), "MEDIUM") |
|
|
|
|
|
|
|
|
def run_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: |
|
|
""" |
|
|
Run bandit on a file and return JSON results. |
|
|
|
|
|
Args: |
|
|
file_path: Path to Python file to scan |
|
|
config: Optional configuration dictionary |
|
|
|
|
|
Returns: |
|
|
Bandit results as dictionary |
|
|
|
|
|
Raises: |
|
|
RuntimeError: If bandit is not available |
|
|
subprocess.TimeoutExpired: If bandit execution times out |
|
|
subprocess.CalledProcessError: If bandit execution fails |
|
|
""" |
|
|
if not is_bandit_available(): |
|
|
raise RuntimeError( |
|
|
"Bandit is not installed. Please install it with: pip install bandit" |
|
|
) |
|
|
|
|
|
if config is None: |
|
|
config = load_config().get("scanners", {}).get("bandit", {}) |
|
|
|
|
|
|
|
|
cmd = [ |
|
|
"bandit", |
|
|
"-f", "json", |
|
|
"-r", |
|
|
file_path |
|
|
] |
|
|
|
|
|
|
|
|
confidence_level = config.get("confidence_level", "").upper() |
|
|
if confidence_level in ["HIGH", "MEDIUM", "LOW"]: |
|
|
cmd.extend(["-ll"]) |
|
|
|
|
|
|
|
|
skip_tests = config.get("skip_tests", []) |
|
|
if skip_tests: |
|
|
cmd.extend(["-s", ",".join(skip_tests)]) |
|
|
|
|
|
try: |
|
|
|
|
|
result = subprocess.run( |
|
|
cmd, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=30, |
|
|
check=False |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if result.returncode > 1: |
|
|
raise subprocess.CalledProcessError( |
|
|
result.returncode, |
|
|
cmd, |
|
|
result.stdout, |
|
|
result.stderr |
|
|
) |
|
|
|
|
|
|
|
|
if result.stdout: |
|
|
return json.loads(result.stdout) |
|
|
else: |
|
|
return {"results": []} |
|
|
|
|
|
except json.JSONDecodeError as e: |
|
|
raise RuntimeError(f"Failed to parse bandit output: {e}") |
|
|
except subprocess.TimeoutExpired: |
|
|
raise RuntimeError("Bandit execution timed out (30s limit)") |
|
|
|
|
|
|
|
|
def parse_bandit_results(bandit_output: Dict[str, Any], file_path: str) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Parse bandit JSON output into standard vulnerability format. |
|
|
|
|
|
Args: |
|
|
bandit_output: Raw bandit JSON output |
|
|
file_path: Path to the scanned file |
|
|
|
|
|
Returns: |
|
|
List of vulnerability dictionaries in standard format |
|
|
""" |
|
|
vulnerabilities = [] |
|
|
|
|
|
results = bandit_output.get("results", []) |
|
|
|
|
|
for issue in results: |
|
|
|
|
|
test_id = issue.get("test_id", "UNKNOWN") |
|
|
test_name = issue.get("test_name", "unknown") |
|
|
bandit_severity = issue.get("issue_severity", "MEDIUM") |
|
|
confidence = issue.get("issue_confidence", "MEDIUM") |
|
|
line_number = issue.get("line_number", 0) |
|
|
code_snippet = issue.get("code", "").strip() |
|
|
issue_text = issue.get("issue_text", "Security issue detected") |
|
|
|
|
|
|
|
|
standard_severity = map_severity(bandit_severity) |
|
|
|
|
|
|
|
|
vulnerability = { |
|
|
"id": f"bandit-{test_id}", |
|
|
"severity": standard_severity, |
|
|
"title": f"Bandit: {issue_text}", |
|
|
"description": issue_text, |
|
|
"line_number": line_number, |
|
|
"code_snippet": code_snippet, |
|
|
"file_path": file_path, |
|
|
"scanner": "bandit", |
|
|
"bandit_test_id": test_id, |
|
|
"bandit_test_name": test_name, |
|
|
"bandit_severity": bandit_severity, |
|
|
"bandit_confidence": confidence, |
|
|
} |
|
|
|
|
|
vulnerabilities.append(vulnerability) |
|
|
|
|
|
return vulnerabilities |
|
|
|
|
|
|
|
|
def scan_with_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Main function to scan a file with bandit. |
|
|
|
|
|
Args: |
|
|
file_path: Path to Python file to scan |
|
|
config: Optional configuration dictionary |
|
|
|
|
|
Returns: |
|
|
List of vulnerability dictionaries |
|
|
|
|
|
Raises: |
|
|
RuntimeError: If bandit is not available or execution fails |
|
|
""" |
|
|
try: |
|
|
|
|
|
bandit_output = run_bandit(file_path, config) |
|
|
|
|
|
|
|
|
vulnerabilities = parse_bandit_results(bandit_output, file_path) |
|
|
|
|
|
|
|
|
if config is None: |
|
|
config = load_config().get("scanners", {}).get("bandit", {}) |
|
|
|
|
|
confidence_level = config.get("confidence_level", "").upper() |
|
|
if confidence_level: |
|
|
confidence_order = {"HIGH": 2, "MEDIUM": 1, "LOW": 0} |
|
|
min_confidence = confidence_order.get(confidence_level, 0) |
|
|
|
|
|
vulnerabilities = [ |
|
|
v for v in vulnerabilities |
|
|
if confidence_order.get(v["bandit_confidence"], 0) >= min_confidence |
|
|
] |
|
|
|
|
|
return vulnerabilities |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
print(f"Warning: Bandit scan failed: {e}", file=sys.stderr) |
|
|
return [] |
|
|
|
|
|
|
|
|
def get_bandit_version() -> str: |
|
|
""" |
|
|
Get the installed bandit version. |
|
|
|
|
|
Returns: |
|
|
Version string or "not installed" |
|
|
""" |
|
|
try: |
|
|
result = subprocess.run( |
|
|
["bandit", "--version"], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=5 |
|
|
) |
|
|
if result.returncode == 0: |
|
|
|
|
|
for line in result.stdout.split("\n"): |
|
|
if "bandit" in line.lower(): |
|
|
return line.strip() |
|
|
return "unknown version" |
|
|
except (FileNotFoundError, subprocess.TimeoutExpired): |
|
|
return "not installed" |