garibong's picture
Add Gradio app with MCP server support
f8e78b2
"""
Bandit security scanner wrapper.
Integrates the Bandit tool for Python-specific security analysis.
"""
import json
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
# Import config loader
sys.path.insert(0, str(Path(__file__).parent.parent))
from utils import load_config
# Severity mapping from Bandit to our standard
SEVERITY_MAPPING = {
"HIGH": "CRITICAL",
"MEDIUM": "HIGH",
"LOW": "MEDIUM",
}
def is_bandit_available() -> bool:
"""
Check if bandit is installed and available.
Returns:
True if bandit is available, False otherwise
"""
try:
result = subprocess.run(
["bandit", "--version"],
capture_output=True,
text=True,
timeout=5
)
return result.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
return False
def map_severity(bandit_severity: str) -> str:
"""
Map Bandit severity to our standard severity levels.
Args:
bandit_severity: Bandit's severity (HIGH, MEDIUM, LOW)
Returns:
Standard severity level (CRITICAL, HIGH, MEDIUM, LOW)
"""
return SEVERITY_MAPPING.get(bandit_severity.upper(), "MEDIUM")
def run_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Run bandit on a file and return JSON results.
Args:
file_path: Path to Python file to scan
config: Optional configuration dictionary
Returns:
Bandit results as dictionary
Raises:
RuntimeError: If bandit is not available
subprocess.TimeoutExpired: If bandit execution times out
subprocess.CalledProcessError: If bandit execution fails
"""
if not is_bandit_available():
raise RuntimeError(
"Bandit is not installed. Please install it with: pip install bandit"
)
if config is None:
config = load_config().get("scanners", {}).get("bandit", {})
# Build bandit command
cmd = [
"bandit",
"-f", "json", # JSON output format
"-r", # Recursive (even for single file, bandit expects this)
file_path
]
# Add confidence level if specified
confidence_level = config.get("confidence_level", "").upper()
if confidence_level in ["HIGH", "MEDIUM", "LOW"]:
cmd.extend(["-ll"]) # Set minimum confidence level
# Add skip tests if specified
skip_tests = config.get("skip_tests", [])
if skip_tests:
cmd.extend(["-s", ",".join(skip_tests)])
try:
# Run bandit
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30, # 30 second timeout
check=False # Don't raise exception on non-zero exit
)
# Bandit returns exit code 1 if issues found, which is expected
# Only fail on actual errors (exit code > 1)
if result.returncode > 1:
raise subprocess.CalledProcessError(
result.returncode,
cmd,
result.stdout,
result.stderr
)
# Parse JSON output
if result.stdout:
return json.loads(result.stdout)
else:
return {"results": []}
except json.JSONDecodeError as e:
raise RuntimeError(f"Failed to parse bandit output: {e}")
except subprocess.TimeoutExpired:
raise RuntimeError("Bandit execution timed out (30s limit)")
def parse_bandit_results(bandit_output: Dict[str, Any], file_path: str) -> List[Dict[str, Any]]:
"""
Parse bandit JSON output into standard vulnerability format.
Args:
bandit_output: Raw bandit JSON output
file_path: Path to the scanned file
Returns:
List of vulnerability dictionaries in standard format
"""
vulnerabilities = []
results = bandit_output.get("results", [])
for issue in results:
# Extract bandit data
test_id = issue.get("test_id", "UNKNOWN")
test_name = issue.get("test_name", "unknown")
bandit_severity = issue.get("issue_severity", "MEDIUM")
confidence = issue.get("issue_confidence", "MEDIUM")
line_number = issue.get("line_number", 0)
code_snippet = issue.get("code", "").strip()
issue_text = issue.get("issue_text", "Security issue detected")
# Map to standard severity
standard_severity = map_severity(bandit_severity)
# Create vulnerability entry
vulnerability = {
"id": f"bandit-{test_id}",
"severity": standard_severity,
"title": f"Bandit: {issue_text}",
"description": issue_text,
"line_number": line_number,
"code_snippet": code_snippet,
"file_path": file_path,
"scanner": "bandit",
"bandit_test_id": test_id,
"bandit_test_name": test_name,
"bandit_severity": bandit_severity,
"bandit_confidence": confidence,
}
vulnerabilities.append(vulnerability)
return vulnerabilities
def scan_with_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""
Main function to scan a file with bandit.
Args:
file_path: Path to Python file to scan
config: Optional configuration dictionary
Returns:
List of vulnerability dictionaries
Raises:
RuntimeError: If bandit is not available or execution fails
"""
try:
# Run bandit
bandit_output = run_bandit(file_path, config)
# Parse results
vulnerabilities = parse_bandit_results(bandit_output, file_path)
# Filter by confidence if needed
if config is None:
config = load_config().get("scanners", {}).get("bandit", {})
confidence_level = config.get("confidence_level", "").upper()
if confidence_level:
confidence_order = {"HIGH": 2, "MEDIUM": 1, "LOW": 0}
min_confidence = confidence_order.get(confidence_level, 0)
vulnerabilities = [
v for v in vulnerabilities
if confidence_order.get(v["bandit_confidence"], 0) >= min_confidence
]
return vulnerabilities
except Exception as e:
# Log the error but don't fail the entire scan
print(f"Warning: Bandit scan failed: {e}", file=sys.stderr)
return []
def get_bandit_version() -> str:
"""
Get the installed bandit version.
Returns:
Version string or "not installed"
"""
try:
result = subprocess.run(
["bandit", "--version"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
# Parse version from output
for line in result.stdout.split("\n"):
if "bandit" in line.lower():
return line.strip()
return "unknown version"
except (FileNotFoundError, subprocess.TimeoutExpired):
return "not installed"