|
|
""" |
|
|
Semgrep integration wrapper for multi-language security scanning. |
|
|
|
|
|
Runs semgrep as a subprocess and parses the JSON output. |
|
|
""" |
|
|
|
|
|
import subprocess |
|
|
import json |
|
|
import tempfile |
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Any, Optional |
|
|
|
|
|
|
|
|
# Put the directory two levels above this file at the front of the import
# search path so modules located there can be imported.
sys.path.insert(0, str(Path(__file__).parent.parent))


# Translation table from the severity labels semgrep reports to the
# severity levels used in the standardized vulnerability dictionaries.
SEVERITY_MAPPING: Dict[str, str] = {
    "ERROR": "CRITICAL",
    "WARNING": "HIGH",
    "INFO": "MEDIUM",
}


# Python-oriented rule identifiers.
# NOTE(review): presumably semgrep registry rule-pack names; nothing in the
# visible part of this module reads this list -- confirm it is used elsewhere.
DEFAULT_PYTHON_RULES: List[str] = [
    "python.lang.security",
    "python.flask.security",
    "python.django.security",
    "python.requests.security",
]
|
|
|
|
|
|
|
|
def check_semgrep_installed() -> bool:
    """
    Check if semgrep is installed and accessible.

    Runs ``semgrep --version`` with a 5 second timeout and treats a zero
    exit status as proof that the tool is usable.

    Returns:
        True if semgrep is available, False otherwise. Never raises for a
        missing, non-executable or hanging ``semgrep`` binary.
    """
    try:
        probe = subprocess.run(
            ["semgrep", "--version"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError (not just FileNotFoundError) so that a "semgrep" entry on
        # PATH that cannot be executed (PermissionError, bad binary format,
        # ...) is reported as "not installed" instead of crashing the caller.
        return False

    return probe.returncode == 0
|
|
|
|
|
|
|
|
def find_custom_rules() -> List[str]:
    """
    Collect the YAML rule files that live in the rules/ directory.

    The directory is resolved as ``rules`` three levels above this file.
    Only its direct children are considered (no recursion); ``*.yaml``
    matches are listed before ``*.yml`` matches.

    Returns:
        List of rule file paths as strings; empty when the directory is
        missing or is not a directory.
    """
    rules_root = Path(__file__).parent.parent.parent / "rules"

    # Guard clause: nothing to collect without a real directory.
    if not rules_root.exists() or not rules_root.is_dir():
        return []

    discovered = []
    for extension_glob in ("*.yaml", "*.yml"):
        for rule_path in rules_root.glob(extension_glob):
            discovered.append(str(rule_path))
    return discovered
|
|
|
|
|
|
|
|
def load_config_rules(config: Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Extract the custom rule list from a configuration dictionary.

    Args:
        config: Configuration dictionary with optional 'custom_rules' key.
            ``None`` or an empty dictionary means "no configuration".

    Returns:
        The value stored under 'custom_rules' when it is a list (the same
        list object, not a copy); an empty list in every other case.
    """
    configured = config.get("custom_rules", []) if config else []
    # Anything that is not a list (string, tuple, None, ...) is ignored.
    return configured if isinstance(configured, list) else []
|
|
|
|
|
|
|
|
def run_semgrep(
    file_path: str,
    config: Optional[Dict[str, Any]] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Run semgrep on a file and return JSON results.

    Rule selection: rule files found by ``find_custom_rules()`` plus those
    listed in ``config`` are passed with ``--config`` when they exist on
    disk. When no usable rule file remains (none configured, or none of the
    configured paths exist) semgrep is run with ``--config auto``.

    Args:
        file_path: Path to the file to scan
        config: Optional configuration dictionary
        timeout: Timeout in seconds (default 30)

    Returns:
        Dictionary with semgrep results or error information. Error
        dictionaries always carry an "error" code and a "message"; possible
        codes are "semgrep_not_installed", "timeout", "semgrep_not_found",
        "unexpected_error", "json_parse_error" and "semgrep_failed".
    """
    if not check_semgrep_installed():
        return {
            "error": "semgrep_not_installed",
            "message": "Semgrep is not installed. Install it with: pip install semgrep"
        }

    cmd = ["semgrep", "--json", "--quiet"]

    # Keep only entries that are real paths and exist; anything else would
    # either break the command line or make semgrep fail on a missing file.
    candidate_rules = find_custom_rules() + load_config_rules(config)
    usable_rules = [
        rule for rule in candidate_rules
        if isinstance(rule, (str, os.PathLike)) and os.path.exists(rule)
    ]

    if usable_rules:
        for rule in usable_rules:
            cmd.extend(["--config", os.fspath(rule)])
    else:
        # Also reached when rules were configured but none exist: without
        # this fallback semgrep would be started with no --config at all.
        cmd.extend(["--config", "auto"])

    cmd.append(file_path)

    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return {
            "error": "timeout",
            "message": f"Semgrep scan timed out after {timeout} seconds"
        }
    except FileNotFoundError:
        return {
            "error": "semgrep_not_found",
            "message": "Semgrep executable not found in PATH"
        }
    except Exception as e:
        # Boundary handler: callers expect a dictionary, never an exception.
        return {
            "error": "unexpected_error",
            "message": f"Unexpected error running semgrep: {str(e)}"
        }

    raw_output = completed.stdout or ""

    if not raw_output.strip():
        # No JSON at all. Exit codes 0 and 1 are treated as a completed scan
        # (NOTE(review): per the semgrep CLI docs 1 means "findings" with
        # --error -- confirm); any other status is a failed run and must not
        # be reported as "no findings".
        if completed.returncode not in (0, 1):
            return {
                "error": "semgrep_failed",
                "message": (
                    f"Semgrep exited with status {completed.returncode} "
                    "and produced no output"
                ),
                "stderr": completed.stderr
            }
        return {"results": []}

    try:
        output = json.loads(raw_output)
    except json.JSONDecodeError as e:
        return {
            "error": "json_parse_error",
            "message": f"Failed to parse semgrep output: {str(e)}",
            "raw_output": raw_output
        }

    if not isinstance(output, dict):
        # Valid JSON but not an object: honour the Dict return contract.
        return {
            "error": "json_parse_error",
            "message": "Failed to parse semgrep output: expected a JSON object",
            "raw_output": raw_output
        }

    return output
|
|
|
|
|
|
|
|
def _dict_or_empty(value: Any) -> Dict[str, Any]:
    """Return *value* when it is a dictionary, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def parse_semgrep_results(
    semgrep_output: Dict[str, Any],
    file_path: str
) -> List[Dict[str, Any]]:
    """
    Parse semgrep JSON output into standard vulnerability format.

    Malformed input is tolerated rather than raised on: a payload that is
    not a dictionary, a missing/``null`` "results" entry, findings that are
    not objects, and ``null`` values for "extra", "start", "message",
    "severity" or "lines" all degrade to empty/default values.

    Args:
        semgrep_output: Raw semgrep JSON output
        file_path: Path to the scanned file

    Returns:
        List of standardized vulnerability dictionaries. Empty when the
        output carries an "error" key or contains no findings.
    """
    vulnerabilities = []

    # Error dictionaries produce no findings.
    if not isinstance(semgrep_output, dict) or "error" in semgrep_output:
        return vulnerabilities

    results = semgrep_output.get("results")
    if not isinstance(results, (list, tuple)):
        return vulnerabilities

    for finding in results:
        if not isinstance(finding, dict):
            continue

        extra = _dict_or_empty(finding.get("extra"))

        check_id = finding.get("check_id", "unknown")

        # Prefer the message nested under "extra"; fall back to a top-level
        # "message" only when "extra" has no such key at all.
        raw_message = extra["message"] if "message" in extra else finding.get("message", "")
        message = "" if raw_message is None else str(raw_message)

        severity = str(extra.get("severity") or "INFO").upper()
        standard_severity = SEVERITY_MAPPING.get(severity, "MEDIUM")

        line_number = _dict_or_empty(finding.get("start")).get("line", 0)

        lines = extra.get("lines") or finding.get("lines") or ""
        code_snippet = lines.strip() if isinstance(lines, str) else ""

        metadata = extra.get("metadata", {})
        if metadata is None:
            metadata = {}

        vulnerabilities.append({
            "id": f"semgrep-{check_id}",
            "severity": standard_severity,
            # Titles are capped at 80 characters of the message.
            "title": f"Semgrep: {message[:80]}",
            "description": message,
            "line_number": line_number,
            "code_snippet": code_snippet,
            "file_path": file_path,
            "scanner": "semgrep",
            "semgrep_rule_id": check_id,
            "semgrep_message": message,
            "semgrep_metadata": metadata
        })

    return vulnerabilities
|
|
|
|
|
|
|
|
def scan_with_semgrep(
    file_path: str,
    code: str,
    config: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """
    Scan code using semgrep.

    The code is written to a temporary file that keeps the suffix of
    ``file_path`` (``.py`` when it has none), that file is scanned, and the
    findings are reported against ``file_path``. The temporary file is
    removed afterwards, including when writing or scanning fails.

    Args:
        file_path: Original file path (for reference)
        code: Source code to scan
        config: Optional configuration dictionary; 'semgrep_timeout'
            overrides the default 30 second scan timeout

    Returns:
        List of vulnerability dictionaries
    """
    timeout = config.get("semgrep_timeout", 30) if config else 30
    suffix = Path(file_path).suffix or '.py'

    temp_path = None
    try:
        # Explicit UTF-8: the locale default can fail on non-ASCII source.
        # delete=False because semgrep must open the file after we close it.
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix=suffix,
            delete=False,
            encoding='utf-8'
        ) as temp_file:
            temp_path = temp_file.name
            temp_file.write(code)

        semgrep_output = run_semgrep(temp_path, config, timeout)
        return parse_semgrep_results(semgrep_output, file_path)
    finally:
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask
                # the scan result or the original exception.
                pass
|
|
|
|
|
|
|
|
def scan_with_custom_rules(
    file_path: str,
    code: str,
    rule_files: List[str]
) -> List[Dict[str, Any]]:
    """
    Run a semgrep scan with an explicit set of rule files.

    Convenience wrapper: the rule files are placed under the 'custom_rules'
    configuration key and the scan is delegated to ``scan_with_semgrep``.

    Args:
        file_path: Original file path (for reference)
        code: Source code to scan
        rule_files: List of paths to rule files

    Returns:
        List of vulnerability dictionaries
    """
    return scan_with_semgrep(file_path, code, {"custom_rules": rule_files})
|
|
|
|
|
|
|
|
|
|
|
def test_semgrep_wrapper():
    """Smoke-test the wrapper by scanning a deliberately vulnerable snippet.

    Prints progress to stdout: whether semgrep is installed, the findings
    reported for the sample code, and the custom rule files discovered.
    Returns early (after printing install instructions) when semgrep is not
    available.
    """
    print("Testing Semgrep Wrapper...")
    print("-" * 50)

    # Nothing to exercise without the semgrep executable.
    if not check_semgrep_installed():
        print("❌ Semgrep is not installed")
        print("Install it with: pip install semgrep")
        return

    print("✓ Semgrep is installed")

    # Sample input containing several well-known insecure patterns.
    test_code = '''
import pickle
import subprocess

# Insecure deserialization
def load_data(data):
    return pickle.loads(data) # Vulnerable to code execution

# Command injection
def run_command(user_input):
    subprocess.call("ls " + user_input, shell=True) # Shell injection

# Hardcoded secret
api_key = "sk-1234567890abcdef"
'''

    print("\nScanning test code...")
    findings = scan_with_semgrep("test.py", test_code)

    print(f"\n✓ Found {len(findings)} issue(s)")

    if findings:
        print("\nDetected vulnerabilities:")
        for position, item in enumerate(findings, start=1):
            print(f"\n[{position}] {item['title']}")
            print(f" Severity: {item['severity']}")
            print(f" Line: {item['line_number']}")
            print(f" Rule: {item['semgrep_rule_id']}")

    # Report which rule files would be picked up from rules/.
    rule_files = find_custom_rules()
    if not rule_files:
        print("\n✓ No custom rule files found in rules/")
    else:
        print(f"\n✓ Found {len(rule_files)} custom rule file(s):")
        for rule_file in rule_files:
            print(f" - {rule_file}")

    print("\n✅ Semgrep wrapper test: SUCCESS")


# Run the smoke test when the module is executed as a script.
if __name__ == "__main__":
    test_semgrep_wrapper()
|
|
|