# File: simple-security-scanner/src/scanner/semgrep_wrapper.py
# Commit f8e78b2 (garibong): Add Gradio app with MCP server support
"""
Semgrep integration wrapper for multi-language security scanning.
Runs semgrep as a subprocess and parses the JSON output.
"""
import subprocess
import json
import tempfile
import os
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
# Put the directory two levels above this file (Path(__file__).parent.parent)
# at the front of sys.path so modules located there can be imported by name.
# NOTE(review): the original comment says this is needed to import "utils",
# but no such import appears in the visible part of this file — confirm it is
# still required.
sys.path.insert(0, str(Path(__file__).parent.parent))
# Translation table from semgrep severity labels (keys) to the severity names
# stored in this scanner's vulnerability dictionaries (values).
SEVERITY_MAPPING = {
    "ERROR": "CRITICAL",
    "WARNING": "HIGH",
    "INFO": "MEDIUM",
}

# Semgrep rule categories for Python security checks.
# NOTE(review): nothing in the visible part of this module reads this list —
# confirm whether another module depends on it.
DEFAULT_PYTHON_RULES = [
    "python.lang.security",
    "python.flask.security",
    "python.django.security",
    "python.requests.security",
]
def check_semgrep_installed() -> bool:
    """
    Check if semgrep is installed and accessible.

    Runs ``semgrep --version`` with a 5 second timeout and treats a zero exit
    status as proof that the executable can be used.

    Returns:
        True if semgrep is available, False otherwise (executable missing,
        executable that cannot be launched, non-zero exit status, or timeout)
    """
    try:
        result = subprocess.run(
            ["semgrep", "--version"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError (nothing named semgrep on PATH) and
        # also PermissionError (a semgrep file exists but cannot be executed).
        # Either way semgrep is unusable, so report False instead of letting
        # the exception escape from a function that promises a bool.
        return False
    return result.returncode == 0
def find_custom_rules() -> List[str]:
    """
    Collect the YAML rule files located directly inside the rules/ directory.

    The directory is resolved relative to this file: three ``parent`` steps up
    from the module path, then ``rules``.

    Returns:
        Paths (as strings) of every ``*.yaml`` match followed by every
        ``*.yml`` match; an empty list when the directory does not exist
    """
    rules_root = Path(__file__).parent.parent.parent / "rules"

    # Nothing to collect unless rules/ is present and really is a directory.
    if not (rules_root.exists() and rules_root.is_dir()):
        return []

    return [
        str(rule_path)
        for extension in ("*.yaml", "*.yml")
        for rule_path in rules_root.glob(extension)
    ]
def load_config_rules(config: Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Read the rule file list stored under the 'custom_rules' configuration key.

    Args:
        config: Configuration dictionary with optional 'custom_rules' key

    Returns:
        The configured list itself when the key holds a list; an empty list
        when config is missing/empty, the key is absent, or its value is not
        a list
    """
    configured = config.get("custom_rules", []) if config else []
    # Anything other than a list (e.g. a single string) is ignored.
    return configured if isinstance(configured, list) else []
def run_semgrep(
    file_path: str,
    config: Optional[Dict[str, Any]] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Run semgrep on a file and return JSON results.

    Rule selection: rule files found by find_custom_rules() plus those listed
    in the configuration are passed with ``--config`` when they exist on
    disk. If none of them exists (or none is configured), semgrep is run with
    ``--config auto`` so the scan never executes without any rules.

    Args:
        file_path: Path to the file to scan
        config: Optional configuration dictionary
        timeout: Timeout in seconds (default 30)

    Returns:
        The decoded semgrep JSON document on success, ``{"results": []}``
        when semgrep exits successfully without output, or a dictionary with
        'error' and 'message' keys. Error codes: semgrep_not_installed,
        json_parse_error (also carries 'raw_output'), semgrep_failed,
        timeout, semgrep_not_found, unexpected_error.
    """
    # Check if semgrep is installed
    if not check_semgrep_installed():
        return {
            "error": "semgrep_not_installed",
            "message": "Semgrep is not installed. Install it with: pip install semgrep"
        }

    # Keep only rule entries that are real paths present on disk. The type
    # check matters: os.path.exists() treats an int as a file descriptor, so
    # a stray number in the configuration could otherwise look like a file.
    candidate_rules = find_custom_rules() + load_config_rules(config)
    rule_files = [
        os.fspath(rule)
        for rule in candidate_rules
        if isinstance(rule, (str, os.PathLike)) and os.path.exists(rule)
    ]

    # Build semgrep command
    cmd = ["semgrep", "--json", "--quiet"]
    if rule_files:
        for rule_file in rule_files:
            cmd.extend(["--config", rule_file])
    else:
        # No usable rule file: use auto configuration (community rules)
        # rather than invoking semgrep with no --config at all.
        cmd.extend(["--config", "auto"])

    # Add target file
    cmd.append(file_path)

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout
        )

        if not result.stdout:
            if result.returncode != 0:
                # A failing exit status with nothing on stdout is a failed
                # scan, not a clean one; surface stderr to the caller.
                stderr_text = (result.stderr or "").strip()
                return {
                    "error": "semgrep_failed",
                    "message": (
                        f"Semgrep exited with status {result.returncode}: "
                        f"{stderr_text}"
                    )
                }
            # Successful exit without output means no findings
            return {"results": []}

        # Parse JSON output
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError as e:
            return {
                "error": "json_parse_error",
                "message": f"Failed to parse semgrep output: {str(e)}",
                "raw_output": result.stdout
            }
    except subprocess.TimeoutExpired:
        return {
            "error": "timeout",
            "message": f"Semgrep scan timed out after {timeout} seconds"
        }
    except FileNotFoundError:
        return {
            "error": "semgrep_not_found",
            "message": "Semgrep executable not found in PATH"
        }
    except Exception as e:
        # Boundary handler: this function reports failures as dictionaries
        # instead of raising, so any remaining error is converted here.
        return {
            "error": "unexpected_error",
            "message": f"Unexpected error running semgrep: {str(e)}"
        }
def parse_semgrep_results(
    semgrep_output: Dict[str, Any],
    file_path: str
) -> List[Dict[str, Any]]:
    """
    Parse semgrep JSON output into standard vulnerability format.

    Missing, ``null`` or wrongly typed fields in a finding are replaced by
    neutral defaults instead of raising, so one malformed entry cannot abort
    the whole conversion.

    Args:
        semgrep_output: Raw semgrep JSON output (or an error dictionary that
            contains an 'error' key)
        file_path: Path to the scanned file; copied into every entry

    Returns:
        List of standardized vulnerability dictionaries; empty when the
        output carries an 'error' key, is not a dictionary, or has no results
    """
    vulnerabilities: List[Dict[str, Any]] = []

    # Error dictionaries produce no findings. Nothing is logged here; the
    # details remain available to the caller in semgrep_output itself.
    if not isinstance(semgrep_output, dict) or "error" in semgrep_output:
        return vulnerabilities

    # "results" may be absent or null
    for finding in semgrep_output.get("results") or []:
        if not isinstance(finding, dict):
            continue

        # Most details live under "extra"; tolerate a missing or null value.
        extra = finding.get("extra") or {}
        if not isinstance(extra, dict):
            extra = {}

        # Extract basic information
        check_id = finding.get("check_id") or "unknown"
        message = str(extra.get("message") or finding.get("message") or "")
        severity = str(extra.get("severity") or "INFO").upper()

        # Map severity to standard format
        if severity in SEVERITY_MAPPING:
            standard_severity = SEVERITY_MAPPING[severity]
        elif severity in SEVERITY_MAPPING.values():
            # The label is already one of the standard names (for example a
            # rule that declares CRITICAL); keep it rather than letting it
            # fall through to the MEDIUM default.
            standard_severity = severity
        else:
            standard_severity = "MEDIUM"

        # Get location information
        start = finding.get("start") or {}
        line_number = start.get("line", 0) if isinstance(start, dict) else 0

        # Get code snippet, preferring the copy under "extra"
        lines = extra.get("lines") or finding.get("lines") or ""
        code_snippet = lines.strip() if isinstance(lines, str) else ""

        # Get metadata
        metadata = extra.get("metadata") or {}

        # Create vulnerability entry
        vulnerabilities.append({
            "id": f"semgrep-{check_id}",
            "severity": standard_severity,
            "title": f"Semgrep: {message[:80]}",
            "description": message,
            "line_number": line_number,
            "code_snippet": code_snippet,
            "file_path": file_path,
            "scanner": "semgrep",
            "semgrep_rule_id": check_id,
            "semgrep_message": message,
            "semgrep_metadata": metadata
        })

    return vulnerabilities
def scan_with_semgrep(
    file_path: str,
    code: str,
    config: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """
    Scan code using semgrep.

    The code is written to a temporary file (UTF-8, with the same suffix as
    file_path or '.py' when it has none), semgrep is run on that file, and
    the temporary file is removed afterwards, including when writing or
    scanning fails.

    Args:
        file_path: Original file path (for reference); supplies the suffix of
            the temporary file and is reported in the results
        code: Source code to scan
        config: Optional configuration dictionary; 'semgrep_timeout' sets the
            scan timeout in seconds (default 30)

    Returns:
        List of vulnerability dictionaries
    """
    timeout = config.get("semgrep_timeout", 30) if config else 30

    temp_path = None
    try:
        # Create temporary file with the code. An explicit encoding keeps
        # non-ASCII source from failing on platforms whose default text
        # encoding cannot represent it.
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix=Path(file_path).suffix or '.py',
            encoding='utf-8',
            delete=False
        ) as temp_file:
            # Record the name before writing so the cleanup below also runs
            # when the write itself raises.
            temp_path = temp_file.name
            temp_file.write(code)

        # Run semgrep on temporary file
        semgrep_output = run_semgrep(temp_path, config, timeout)

        # Parse results, reporting the caller's path rather than the
        # temporary one
        return parse_semgrep_results(semgrep_output, file_path)
    finally:
        # Clean up temporary file (best effort)
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass  # A leftover temp file must not mask the scan outcome
def scan_with_custom_rules(
    file_path: str,
    code: str,
    rule_files: List[str]
) -> List[Dict[str, Any]]:
    """
    Scan code with the given rule files supplied as the 'custom_rules' setting.

    Thin convenience wrapper: the rule files are wrapped in a configuration
    dictionary and the scan is delegated to scan_with_semgrep.

    Args:
        file_path: Original file path (for reference)
        code: Source code to scan
        rule_files: List of paths to rule files

    Returns:
        List of vulnerability dictionaries
    """
    return scan_with_semgrep(file_path, code, {"custom_rules": rule_files})
# Manual smoke test
def test_semgrep_wrapper():
    """
    Run the semgrep wrapper against a small, deliberately vulnerable sample.

    Prints whether semgrep is installed, the findings reported for the
    sample, and any custom rule files found. Returns early (after printing
    install instructions) when semgrep is not available.
    """
    print("Testing Semgrep Wrapper...")
    print("-" * 50)

    # Check if semgrep is installed
    if not check_semgrep_installed():
        print("❌ Semgrep is not installed")
        print("Install it with: pip install semgrep")
        return
    print("✓ Semgrep is installed")

    # Sample with intentional security issues. It is handed to semgrep as
    # Python source, so the function bodies must be indented for it to parse.
    test_code = '''
import pickle
import subprocess
# Insecure deserialization
def load_data(data):
    return pickle.loads(data) # Vulnerable to code execution
# Command injection
def run_command(user_input):
    subprocess.call("ls " + user_input, shell=True) # Shell injection
# Hardcoded secret
api_key = "sk-1234567890abcdef"
'''

    print("\nScanning test code...")
    vulnerabilities = scan_with_semgrep("test.py", test_code)
    print(f"\n✓ Found {len(vulnerabilities)} issue(s)")

    if vulnerabilities:
        print("\nDetected vulnerabilities:")
        for i, vuln in enumerate(vulnerabilities, 1):
            print(f"\n[{i}] {vuln['title']}")
            print(f" Severity: {vuln['severity']}")
            print(f" Line: {vuln['line_number']}")
            print(f" Rule: {vuln['semgrep_rule_id']}")

    # Test custom rules
    custom_rules = find_custom_rules()
    if custom_rules:
        print(f"\n✓ Found {len(custom_rules)} custom rule file(s):")
        for rule in custom_rules:
            print(f" - {rule}")
    else:
        print("\n✓ No custom rule files found in rules/")

    print("\n✅ Semgrep wrapper test: SUCCESS")
# Run the manual smoke test only when this module is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    test_semgrep_wrapper()