"""
Semgrep integration wrapper for multi-language security scanning.

Runs semgrep as a subprocess and parses the JSON output.
"""

import json
import logging
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional

# Add parent directory to path to import utils
sys.path.insert(0, str(Path(__file__).parent.parent))

logger = logging.getLogger(__name__)

# Severity mapping from semgrep to standard format
SEVERITY_MAPPING = {
    "ERROR": "CRITICAL",
    "WARNING": "HIGH",
    "INFO": "MEDIUM",
}

# Default Python security rule categories
DEFAULT_PYTHON_RULES = [
    "python.lang.security",
    "python.flask.security",
    "python.django.security",
    "python.requests.security",
]


def check_semgrep_installed() -> bool:
    """
    Check if semgrep is installed and accessible.

    Returns:
        True if `semgrep --version` exits with status 0, False otherwise
        (including when the executable is missing, not runnable, or the
        version check takes longer than 5 seconds).
    """
    try:
        result = subprocess.run(
            ["semgrep", "--version"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError as well as PermissionError and
        # similar "cannot execute" failures.
        return False
    return result.returncode == 0


def find_custom_rules() -> List[str]:
    """
    Find custom rule files in the rules/ directory.

    The directory is resolved as three levels above this file, plus "rules".

    Returns:
        Sorted list of paths to the *.yaml / *.yml files found there
        (empty if the directory does not exist).
    """
    rules_dir = Path(__file__).parent.parent.parent / "rules"
    if not rules_dir.is_dir():
        return []

    # Sort so the resulting command line is deterministic across platforms.
    return sorted(
        str(rule_path)
        for pattern in ("*.yaml", "*.yml")
        for rule_path in rules_dir.glob(pattern)
        if rule_path.is_file()
    )


def load_config_rules(config: Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Load custom rules from configuration.

    Args:
        config: Configuration dictionary with optional 'custom_rules' key

    Returns:
        New list of rule file paths from configuration. Returns an empty list
        when there is no config or 'custom_rules' is not a list; entries that
        are not strings or path-like objects are dropped.
    """
    if not config:
        return []

    custom_rules = config.get("custom_rules", [])
    if not isinstance(custom_rules, list):
        return []

    # Build a fresh list (never alias the caller's config) and keep only
    # values that can actually be used as a path on the command line.
    return [
        str(rule)
        for rule in custom_rules
        if isinstance(rule, (str, os.PathLike))
    ]


def run_semgrep(
    file_path: str,
    config: Optional[Dict[str, Any]] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Run semgrep on a file and return JSON results.

    Args:
        file_path: Path to the file to scan
        config: Optional configuration dictionary
        timeout: Timeout in seconds (default 30)

    Returns:
        Dictionary with semgrep results, or a dictionary with "error" and
        "message" keys describing why the scan could not be completed.
    """
    # Check if semgrep is installed
    if not check_semgrep_installed():
        return {
            "error": "semgrep_not_installed",
            "message": "Semgrep is not installed. Install it with: pip install semgrep"
        }

    # Build semgrep command
    cmd = ["semgrep", "--json", "--quiet"]

    # Collect rule files from the rules/ directory and the configuration,
    # dropping duplicates while preserving order.
    candidate_rules = list(
        dict.fromkeys(find_custom_rules() + load_config_rules(config))
    )
    usable_rules = []
    for rule_file in candidate_rules:
        if os.path.exists(rule_file):
            usable_rules.append(rule_file)
        else:
            logger.warning("Semgrep rule file not found, skipping: %s", rule_file)

    if usable_rules:
        for rule_file in usable_rules:
            cmd.extend(["--config", rule_file])
    else:
        # No usable rule file (none configured, or none exist on disk):
        # use auto configuration (community rules) so semgrep always
        # receives a --config option.
        cmd.extend(["--config", "auto"])

    # Add target file
    cmd.append(file_path)

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return {
            "error": "timeout",
            "message": f"Semgrep scan timed out after {timeout} seconds"
        }
    except FileNotFoundError:
        return {
            "error": "semgrep_not_found",
            "message": "Semgrep executable not found in PATH"
        }
    except Exception as e:
        return {
            "error": "unexpected_error",
            "message": f"Unexpected error running semgrep: {str(e)}"
        }

    stdout = result.stdout or ""
    if not stdout.strip():
        if result.returncode not in (0, 1):
            # Nothing on stdout plus a failing exit status is a failed run,
            # not a clean scan.
            stderr = (result.stderr or "").strip()
            return {
                "error": "semgrep_failed",
                "message": (
                    f"Semgrep exited with code {result.returncode}: {stderr}"
                ),
                "returncode": result.returncode,
            }
        # No output means no findings
        return {"results": []}

    # Parse JSON output
    try:
        return json.loads(stdout)
    except json.JSONDecodeError as e:
        return {
            "error": "json_parse_error",
            "message": f"Failed to parse semgrep output: {str(e)}",
            "raw_output": stdout
        }


def parse_semgrep_results(
    semgrep_output: Dict[str, Any],
    file_path: str
) -> List[Dict[str, Any]]:
    """
    Parse semgrep JSON output into standard vulnerability format.

    Args:
        semgrep_output: Raw semgrep JSON output
        file_path: Path to the scanned file

    Returns:
        List of standardized vulnerability dictionaries. Empty when the
        output carries an "error" key or has no results.
    """
    vulnerabilities: List[Dict[str, Any]] = []

    if not isinstance(semgrep_output, dict):
        return vulnerabilities

    # Check for errors
    if "error" in semgrep_output:
        # Log the failure, then return an empty list so callers keep working.
        logger.warning(
            "Semgrep scan failed (%s): %s",
            semgrep_output.get("error"),
            semgrep_output.get("message", ""),
        )
        return vulnerabilities

    # Get results from semgrep output ("or []" tolerates an explicit null)
    results = semgrep_output.get("results") or []

    for finding in results:
        if not isinstance(finding, dict):
            continue

        # "extra" and "start" may be absent or null; treat both as empty.
        extra = finding.get("extra") or {}
        start = finding.get("start") or {}

        # Extract basic information
        check_id = finding.get("check_id", "unknown")
        message = str(extra.get("message") or finding.get("message") or "")
        severity = str(extra.get("severity") or "INFO").upper()

        # Map severity to standard format
        standard_severity = SEVERITY_MAPPING.get(severity, "MEDIUM")

        # Get location information
        line_number = start.get("line", 0)

        # Get code snippet, preferring extra.lines over a top-level "lines"
        lines = extra.get("lines") or finding.get("lines") or ""
        code_snippet = str(lines).strip()

        # Get metadata
        metadata = extra.get("metadata", {})

        # Create vulnerability entry
        vulnerabilities.append({
            "id": f"semgrep-{check_id}",
            "severity": standard_severity,
            "title": f"Semgrep: {message[:80]}",
            "description": message,
            "line_number": line_number,
            "code_snippet": code_snippet,
            "file_path": file_path,
            "scanner": "semgrep",
            "semgrep_rule_id": check_id,
            "semgrep_message": message,
            "semgrep_metadata": metadata
        })

    return vulnerabilities


def scan_with_semgrep(
    file_path: str,
    code: str,
    config: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """
    Scan code using semgrep.

    The code is written to a temporary file (keeping the original file's
    suffix, or ".py" when it has none), scanned, and the temporary file is
    removed afterwards.

    Args:
        file_path: Original file path (for reference)
        code: Source code to scan
        config: Optional configuration dictionary; 'semgrep_timeout'
            overrides the default 30 second timeout

    Returns:
        List of vulnerability dictionaries
    """
    suffix = Path(file_path).suffix or '.py'
    timeout = config.get("semgrep_timeout", 30) if config else 30

    temp_path = None
    try:
        # Create temporary file with the code. UTF-8 keeps non-ASCII source
        # writable on every platform; newline="" avoids newline translation
        # so reported line numbers match the original text.
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix=suffix,
            delete=False,
            encoding='utf-8',
            newline='',
        ) as temp_file:
            temp_path = temp_file.name
            temp_file.write(code)

        # Run semgrep on temporary file
        semgrep_output = run_semgrep(temp_path, config, timeout)

        # Parse results
        return parse_semgrep_results(semgrep_output, file_path)
    finally:
        # Clean up temporary file (best effort)
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass  # Ignore cleanup errors


def scan_with_custom_rules(
    file_path: str,
    code: str,
    rule_files: List[str]
) -> List[Dict[str, Any]]:
    """
    Scan code using specific custom rule files.

    The rule files are passed to scan_with_semgrep via the 'custom_rules'
    configuration key.

    Args:
        file_path: Original file path (for reference)
        code: Source code to scan
        rule_files: List of paths to rule files

    Returns:
        List of vulnerability dictionaries
    """
    config = {"custom_rules": rule_files}
    return scan_with_semgrep(file_path, code, config)


# Test function
def test_semgrep_wrapper():
    """Test the semgrep wrapper with sample vulnerable code."""
    print("Testing Semgrep Wrapper...")
    print("-" * 50)

    # Check if semgrep is installed
    if not check_semgrep_installed():
        print("❌ Semgrep is not installed")
        print("Install it with: pip install semgrep")
        return

    print("✓ Semgrep is installed")

    # Test code with security issues
    test_code = '''
import pickle
import subprocess

# Insecure deserialization
def load_data(data):
    return pickle.loads(data)  # Vulnerable to code execution

# Command injection
def run_command(user_input):
    subprocess.call("ls " + user_input, shell=True)  # Shell injection

# Hardcoded secret
api_key = "sk-1234567890abcdef"
'''

    print("\nScanning test code...")
    vulnerabilities = scan_with_semgrep("test.py", test_code)

    print(f"\n✓ Found {len(vulnerabilities)} issue(s)")

    if vulnerabilities:
        print("\nDetected vulnerabilities:")
        for i, vuln in enumerate(vulnerabilities, 1):
            print(f"\n[{i}] {vuln['title']}")
            print(f" Severity: {vuln['severity']}")
            print(f" Line: {vuln['line_number']}")
            print(f" Rule: {vuln['semgrep_rule_id']}")

    # Test custom rules
    custom_rules = find_custom_rules()
    if custom_rules:
        print(f"\n✓ Found {len(custom_rules)} custom rule file(s):")
        for rule in custom_rules:
            print(f" - {rule}")
    else:
        print("\n✓ No custom rule files found in rules/")

    print("\n✅ Semgrep wrapper test: SUCCESS")


if __name__ == "__main__":
    test_semgrep_wrapper()