#!/usr/bin/env python3 """ Security Scanner Module - AI-powered vulnerability detection for MCP deployments Uses Nebius AI to analyze Python code for security vulnerabilities before deployment. Focuses on real threats: code injection, malicious behavior, resource abuse. """ import os import hashlib import json from datetime import datetime, timedelta from typing import Optional from openai import OpenAI # Cache for security scan results (code_hash -> scan_result) # Avoids re-scanning identical code _scan_cache = {} _cache_expiry = {} CACHE_TTL_SECONDS = 3600 # 1 hour def _get_code_hash(code: str) -> str: """Generate SHA256 hash of code for caching""" return hashlib.sha256(code.encode('utf-8')).hexdigest() def _get_cached_scan(code_hash: str) -> Optional[dict]: """Retrieve cached scan result if still valid""" if code_hash in _scan_cache: expiry = _cache_expiry.get(code_hash) if expiry and datetime.now() < expiry: return _scan_cache[code_hash] else: # Expired, remove from cache _scan_cache.pop(code_hash, None) _cache_expiry.pop(code_hash, None) return None def _cache_scan_result(code_hash: str, result: dict): """Cache scan result with TTL""" _scan_cache[code_hash] = result _cache_expiry[code_hash] = datetime.now() + timedelta(seconds=CACHE_TTL_SECONDS) def _map_severity(malicious_type: str) -> str: """ Map malicious type to severity level. Critical: Immediate threat to system/data High: Significant vulnerability Medium: Potential issue Low: Minor concern Safe: No issues """ severity_map = { # Critical threats "ransomware": "critical", "backdoor": "critical", "remote_access_tool": "critical", "credential_harvesting": "critical", # High severity "sql_injection": "high", "command_injection": "high", "ddos_script": "high", # Medium severity "obfuscated_suspicious": "medium", "trojan": "medium", "keylogger": "medium", # Low severity "other": "low", "virus": "low", "worm": "low", # Safe "none": "safe" } return severity_map.get(malicious_type.lower(), "medium") def _build_security_prompt(code: str, context: dict) -> str: """ Build comprehensive security analysis prompt. Focuses on real threats while ignoring false positives like hardcoded keys (since all deployed code is public on Modal.com). """ server_name = context.get("server_name", "Unknown") packages = context.get("packages", []) description = context.get("description", "") prompt = f"""You are an expert security analyst reviewing Python code for MCP server deployments on Modal.com. **IMPORTANT CONTEXT:** - All deployed code is PUBLIC and visible to anyone - Hardcoded API keys/credentials are NOT a security threat for this platform (though bad practice) - Focus on vulnerabilities that could harm the platform or users **Code to Analyze:** ```python {code} ``` **Deployment Context:** - Server Name: {server_name} - Packages: {', '.join(packages) if packages else 'None'} - Description: {description} **Check for REAL THREATS (flag these):** 1. **Code Injection Vulnerabilities:** - eval() or exec() with user input - subprocess calls with unsanitized input (especially shell=True) - SQL queries using string concatenation - Dynamic imports from user input 2. **Malicious Network Behavior:** - Data exfiltration to suspicious domains - Command & Control (C2) communication patterns - Cryptocurrency mining - Unusual outbound connections to non-standard ports 3. **Resource Abuse:** - Infinite loops or recursive calls - Memory exhaustion attacks - CPU intensive operations without limits - Denial of Service patterns 4. **Destructive Operations:** - Attempts to escape sandbox/container - System file manipulation - Process manipulation (killing other processes) - Privilege escalation attempts 5. **Malicious Packages:** - Known malicious PyPI packages - Typosquatting package names - Packages with known CVEs **DO NOT FLAG (these are acceptable):** - Hardcoded API keys, passwords, or tokens (code is public anyway) - Legitimate external API calls (OpenAI, Anthropic, etc.) - Normal file operations (reading/writing files in sandbox) - Standard web requests to known services - Environment variable usage **Provide detailed analysis with specific line references if issues found.** """ return prompt def scan_code_for_security(code: str, context: dict) -> dict: """ Scan Python code for security vulnerabilities using Nebius AI. Args: code: The Python code to scan context: Dictionary with deployment context: - server_name: Name of the server - packages: List of pip packages - description: Server description - deployment_id: Optional deployment ID Returns: dict with: - scan_completed: bool (whether scan finished) - is_safe: bool (whether code is safe to deploy) - severity: str ("safe", "low", "medium", "high", "critical") - malicious_type: str (type of threat or "none") - explanation: str (human-readable explanation) - reasoning_steps: list[str] (AI's reasoning process) - issues: list[dict] (specific issues found) - recommendation: str (what to do) - scanned_at: str (ISO timestamp) - cached: bool (whether result came from cache) """ # Check if scanning is enabled if os.getenv("SECURITY_SCANNING_ENABLED", "true").lower() != "true": return { "scan_completed": False, "is_safe": True, "severity": "safe", "malicious_type": "none", "explanation": "Security scanning is disabled", "reasoning_steps": ["Security scanning disabled via SECURITY_SCANNING_ENABLED=false"], "issues": [], "recommendation": "Allow (scanning disabled)", "scanned_at": datetime.now().isoformat(), "cached": False } # Check cache first code_hash = _get_code_hash(code) cached_result = _get_cached_scan(code_hash) if cached_result: cached_result["cached"] = True return cached_result # Get API key api_key = os.getenv("NEBIUS_API_KEY") if not api_key: # Fall back to warning mode if no API key return { "scan_completed": False, "is_safe": True, "severity": "safe", "malicious_type": "none", "explanation": "NEBIUS_API_KEY not configured - security scanning unavailable", "reasoning_steps": ["No API key found in environment"], "issues": [], "recommendation": "Warn (no API key)", "scanned_at": datetime.now().isoformat(), "cached": False } try: # Initialize Nebius client (OpenAI-compatible) client = OpenAI( base_url="https://api.tokenfactory.nebius.com/v1/", api_key=api_key ) # Build security analysis prompt prompt = _build_security_prompt(code, context) # Call Nebius API with structured JSON schema response = client.chat.completions.create( model="Qwen/Qwen3-32B-fast", temperature=0.6, top_p=0.95, timeout=30.0, # 30 second timeout response_format={ "type": "json_schema", "json_schema": { "name": "security_analysis_schema", "strict": True, "schema": { "type": "object", "properties": { "reasoning_steps": { "type": "array", "items": { "type": "string" }, "description": "The reasoning steps leading to the final conclusion." }, "is_malicious": { "type": "boolean", "description": "Indicates whether the provided code or content is malicious (true) or safe/non-malicious (false)." }, "malicious_type": { "type": "string", "enum": [ "none", "virus", "worm", "ransomware", "trojan", "keylogger", "backdoor", "remote_access_tool", "sql_injection", "command_injection", "ddos_script", "credential_harvesting", "obfuscated_suspicious", "other" ], "description": "If malicious, classify the type. Use 'none' when code is safe." }, "explanation": { "type": "string", "description": "A short, safe explanation of why the code is considered malicious or not, without including harmful details." }, "answer": { "type": "string", "description": "The final answer, taking all reasoning steps into account." } }, "required": [ "reasoning_steps", "is_malicious", "malicious_type", "explanation", "answer" ], "additionalProperties": False } } }, messages=[ { "role": "user", "content": prompt } ] ) # Parse response response_content = response.choices[0].message.content scan_data = json.loads(response_content) # Map to our format severity = _map_severity(scan_data["malicious_type"]) is_safe = not scan_data["is_malicious"] # Determine recommendation if severity in ["critical", "high"]: recommendation = "Block deployment" elif severity in ["medium", "low"]: recommendation = "Warn and allow" else: recommendation = "Allow" # Build issues list issues = [] if scan_data["is_malicious"]: issues.append({ "type": scan_data["malicious_type"], "severity": severity, "description": scan_data["explanation"] }) result = { "scan_completed": True, "is_safe": is_safe, "severity": severity, "malicious_type": scan_data["malicious_type"], "explanation": scan_data["explanation"], "reasoning_steps": scan_data["reasoning_steps"], "issues": issues, "recommendation": recommendation, "scanned_at": datetime.now().isoformat(), "cached": False, "raw_answer": scan_data.get("answer", "") } # Cache the result _cache_scan_result(code_hash, result) return result except Exception as e: # On error, fall back to warning mode (allow deployment with warning) error_msg = str(e) return { "scan_completed": False, "is_safe": True, # Allow on error "severity": "safe", "malicious_type": "none", "explanation": f"Security scan failed: {error_msg}", "reasoning_steps": [f"Error during scan: {error_msg}"], "issues": [], "recommendation": "Warn (scan failed)", "scanned_at": datetime.now().isoformat(), "cached": False, "error": error_msg } def clear_scan_cache(): """Clear the security scan cache (useful for testing)""" _scan_cache.clear() _cache_expiry.clear()