Spaces:
Sleeping
Sleeping
File size: 12,874 Bytes
626b033 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 | #!/usr/bin/env python3
"""
Security Scanner Module - AI-powered vulnerability detection for MCP deployments
Uses Nebius AI to analyze Python code for security vulnerabilities before deployment.
Focuses on real threats: code injection, malicious behavior, resource abuse.
"""
import os
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
from openai import OpenAI
# In-memory, process-local cache of security scan results, keyed by the
# SHA-256 hash of the scanned code (see _get_code_hash). Avoids re-scanning
# byte-identical code within the TTL window.
# NOTE(review): plain dicts with no locking — assumes single-threaded access;
# confirm before using from concurrent workers.
_scan_cache: dict = {}    # code_hash -> scan result dict
_cache_expiry: dict = {}  # code_hash -> datetime after which the entry is stale
CACHE_TTL_SECONDS = 3600  # 1 hour
def _get_code_hash(code: str) -> str:
"""Generate SHA256 hash of code for caching"""
return hashlib.sha256(code.encode('utf-8')).hexdigest()
def _get_cached_scan(code_hash: str) -> Optional[dict]:
    """Return the cached scan result for *code_hash*, or None if missing/expired.

    Expired (or expiry-less) entries are evicted from both maps on access.
    """
    if code_hash not in _scan_cache:
        return None
    expires_at = _cache_expiry.get(code_hash)
    if expires_at is not None and datetime.now() < expires_at:
        return _scan_cache[code_hash]
    # Stale entry: evict from both maps before reporting a miss.
    _scan_cache.pop(code_hash, None)
    _cache_expiry.pop(code_hash, None)
    return None
def _cache_scan_result(code_hash: str, result: dict):
    """Store *result* under *code_hash* with a TTL of CACHE_TTL_SECONDS."""
    expires_at = datetime.now() + timedelta(seconds=CACHE_TTL_SECONDS)
    _scan_cache[code_hash] = result
    _cache_expiry[code_hash] = expires_at
def _map_severity(malicious_type: str) -> str:
"""
Map malicious type to severity level.
Critical: Immediate threat to system/data
High: Significant vulnerability
Medium: Potential issue
Low: Minor concern
Safe: No issues
"""
severity_map = {
# Critical threats
"ransomware": "critical",
"backdoor": "critical",
"remote_access_tool": "critical",
"credential_harvesting": "critical",
# High severity
"sql_injection": "high",
"command_injection": "high",
"ddos_script": "high",
# Medium severity
"obfuscated_suspicious": "medium",
"trojan": "medium",
"keylogger": "medium",
# Low severity
"other": "low",
"virus": "low",
"worm": "low",
# Safe
"none": "safe"
}
return severity_map.get(malicious_type.lower(), "medium")
def _build_security_prompt(code: str, context: dict) -> str:
    """
    Build comprehensive security analysis prompt.
    Focuses on real threats while ignoring false positives like hardcoded keys
    (since all deployed code is public on Modal.com).

    Args:
        code: The Python source to embed in the prompt for analysis.
        context: Deployment metadata; "server_name", "packages" and
            "description" keys are read, all optional.

    Returns:
        The fully rendered prompt string sent to the model.
    """
    # All context fields are optional; fall back to neutral defaults.
    server_name = context.get("server_name", "Unknown")
    packages = context.get("packages", [])
    description = context.get("description", "")
    # The prompt lines below are flush-left on purpose: they are the content
    # of the f-string sent to the model, not Python code.
    prompt = f"""You are an expert security analyst reviewing Python code for MCP server deployments on Modal.com.
**IMPORTANT CONTEXT:**
- All deployed code is PUBLIC and visible to anyone
- Hardcoded API keys/credentials are NOT a security threat for this platform (though bad practice)
- Focus on vulnerabilities that could harm the platform or users
**Code to Analyze:**
```python
{code}
```
**Deployment Context:**
- Server Name: {server_name}
- Packages: {', '.join(packages) if packages else 'None'}
- Description: {description}
**Check for REAL THREATS (flag these):**
1. **Code Injection Vulnerabilities:**
   - eval() or exec() with user input
   - subprocess calls with unsanitized input (especially shell=True)
   - SQL queries using string concatenation
   - Dynamic imports from user input
2. **Malicious Network Behavior:**
   - Data exfiltration to suspicious domains
   - Command & Control (C2) communication patterns
   - Cryptocurrency mining
   - Unusual outbound connections to non-standard ports
3. **Resource Abuse:**
   - Infinite loops or recursive calls
   - Memory exhaustion attacks
   - CPU intensive operations without limits
   - Denial of Service patterns
4. **Destructive Operations:**
   - Attempts to escape sandbox/container
   - System file manipulation
   - Process manipulation (killing other processes)
   - Privilege escalation attempts
5. **Malicious Packages:**
   - Known malicious PyPI packages
   - Typosquatting package names
   - Packages with known CVEs
**DO NOT FLAG (these are acceptable):**
- Hardcoded API keys, passwords, or tokens (code is public anyway)
- Legitimate external API calls (OpenAI, Anthropic, etc.)
- Normal file operations (reading/writing files in sandbox)
- Standard web requests to known services
- Environment variable usage
**Provide detailed analysis with specific line references if issues found.**
"""
    return prompt
# JSON schema forcing the model to return a structured verdict; "strict"
# mode makes the API reject responses with missing or extra keys.
_SECURITY_ANALYSIS_RESPONSE_FORMAT = {
    "type": "json_schema",
    "json_schema": {
        "name": "security_analysis_schema",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "reasoning_steps": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "The reasoning steps leading to the final conclusion."
                },
                "is_malicious": {
                    "type": "boolean",
                    "description": "Indicates whether the provided code or content is malicious (true) or safe/non-malicious (false)."
                },
                "malicious_type": {
                    "type": "string",
                    "enum": [
                        "none",
                        "virus",
                        "worm",
                        "ransomware",
                        "trojan",
                        "keylogger",
                        "backdoor",
                        "remote_access_tool",
                        "sql_injection",
                        "command_injection",
                        "ddos_script",
                        "credential_harvesting",
                        "obfuscated_suspicious",
                        "other"
                    ],
                    "description": "If malicious, classify the type. Use 'none' when code is safe."
                },
                "explanation": {
                    "type": "string",
                    "description": "A short, safe explanation of why the code is considered malicious or not, without including harmful details."
                },
                "answer": {
                    "type": "string",
                    "description": "The final answer, taking all reasoning steps into account."
                }
            },
            "required": [
                "reasoning_steps",
                "is_malicious",
                "malicious_type",
                "explanation",
                "answer"
            ],
            "additionalProperties": False
        }
    }
}


def _fallback_result(explanation: str, reasoning_steps: list,
                     recommendation: str, error: Optional[str] = None) -> dict:
    """Build a permissive (fail-open) result used when scanning cannot run.

    Args:
        explanation: Human-readable reason the scan did not complete.
        reasoning_steps: Steps to surface in the result.
        recommendation: Action hint for the caller.
        error: Optional raw error message; included under "error" if set.

    Returns:
        A result dict matching scan_code_for_security's contract, with
        scan_completed=False and is_safe=True (never block on scanner failure).
    """
    result = {
        "scan_completed": False,
        "is_safe": True,  # fail open: deployment proceeds when scanning is unavailable
        "severity": "safe",
        "malicious_type": "none",
        "explanation": explanation,
        "reasoning_steps": reasoning_steps,
        "issues": [],
        "recommendation": recommendation,
        "scanned_at": datetime.now().isoformat(),
        "cached": False
    }
    if error is not None:
        result["error"] = error
    return result


def scan_code_for_security(code: str, context: dict) -> dict:
    """
    Scan Python code for security vulnerabilities using Nebius AI.
    Args:
        code: The Python code to scan
        context: Dictionary with deployment context:
            - server_name: Name of the server
            - packages: List of pip packages
            - description: Server description
            - deployment_id: Optional deployment ID
    Returns:
        dict with:
            - scan_completed: bool (whether scan finished)
            - is_safe: bool (whether code is safe to deploy)
            - severity: str ("safe", "low", "medium", "high", "critical")
            - malicious_type: str (type of threat or "none")
            - explanation: str (human-readable explanation)
            - reasoning_steps: list[str] (AI's reasoning process)
            - issues: list[dict] (specific issues found)
            - recommendation: str (what to do)
            - scanned_at: str (ISO timestamp)
            - cached: bool (whether result came from cache)
    """
    # Kill switch: anything other than "true" disables scanning entirely.
    if os.getenv("SECURITY_SCANNING_ENABLED", "true").lower() != "true":
        return _fallback_result(
            "Security scanning is disabled",
            ["Security scanning disabled via SECURITY_SCANNING_ENABLED=false"],
            "Allow (scanning disabled)",
        )

    # Check cache first. Return a copy so caller mutations cannot corrupt
    # the cached entry (the original returned the shared dict).
    code_hash = _get_code_hash(code)
    cached_result = _get_cached_scan(code_hash)
    if cached_result:
        return {**cached_result, "cached": True}

    # Without an API key we cannot scan; fall back to warning mode.
    api_key = os.getenv("NEBIUS_API_KEY")
    if not api_key:
        return _fallback_result(
            "NEBIUS_API_KEY not configured - security scanning unavailable",
            ["No API key found in environment"],
            "Warn (no API key)",
        )

    try:
        # Nebius exposes an OpenAI-compatible endpoint.
        client = OpenAI(
            base_url="https://api.tokenfactory.nebius.com/v1/",
            api_key=api_key
        )
        prompt = _build_security_prompt(code, context)
        response = client.chat.completions.create(
            model="Qwen/Qwen3-32B-fast",
            temperature=0.6,
            top_p=0.95,
            timeout=30.0,  # 30 second timeout
            response_format=_SECURITY_ANALYSIS_RESPONSE_FORMAT,
            messages=[{"role": "user", "content": prompt}]
        )

        # Parse the structured JSON verdict.
        scan_data = json.loads(response.choices[0].message.content)
        severity = _map_severity(scan_data["malicious_type"])
        is_safe = not scan_data["is_malicious"]

        # Severity drives the deployment recommendation.
        if severity in ["critical", "high"]:
            recommendation = "Block deployment"
        elif severity in ["medium", "low"]:
            recommendation = "Warn and allow"
        else:
            recommendation = "Allow"

        # One issue entry per malicious verdict (the model returns a single
        # classification, so at most one issue).
        issues = []
        if scan_data["is_malicious"]:
            issues.append({
                "type": scan_data["malicious_type"],
                "severity": severity,
                "description": scan_data["explanation"]
            })

        result = {
            "scan_completed": True,
            "is_safe": is_safe,
            "severity": severity,
            "malicious_type": scan_data["malicious_type"],
            "explanation": scan_data["explanation"],
            "reasoning_steps": scan_data["reasoning_steps"],
            "issues": issues,
            "recommendation": recommendation,
            "scanned_at": datetime.now().isoformat(),
            "cached": False,
            "raw_answer": scan_data.get("answer", "")
        }
        # Cache a copy so later caller mutations of `result` don't leak
        # into the cache.
        _cache_scan_result(code_hash, dict(result))
        return result

    except Exception as e:
        # Deliberate broad catch at this boundary: any failure (network,
        # timeout, malformed response) falls back to warn-and-allow.
        error_msg = str(e)
        return _fallback_result(
            f"Security scan failed: {error_msg}",
            [f"Error during scan: {error_msg}"],
            "Warn (scan failed)",
            error=error_msg,
        )
def clear_scan_cache():
    """Empty both scan-cache maps (useful for testing)."""
    for cache_map in (_scan_cache, _cache_expiry):
        cache_map.clear()
|