File size: 12,874 Bytes
626b033
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
#!/usr/bin/env python3
"""
Security Scanner Module - AI-powered vulnerability detection for MCP deployments

Uses Nebius AI to analyze Python code for security vulnerabilities before deployment.
Focuses on real threats: code injection, malicious behavior, resource abuse.
"""

import os
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
from openai import OpenAI


# Cache for security scan results (code_hash -> scan_result)
# Avoids re-scanning identical code. Note: process-local and unbounded;
# entries are only evicted lazily on lookup after their TTL expires.
_scan_cache: dict = {}
# Parallel map of code_hash -> datetime when the cached entry expires.
_cache_expiry: dict = {}
CACHE_TTL_SECONDS: int = 3600  # 1 hour


def _get_code_hash(code: str) -> str:
    """Generate SHA256 hash of code for caching"""
    return hashlib.sha256(code.encode('utf-8')).hexdigest()


def _get_cached_scan(code_hash: str) -> Optional[dict]:
    """Return the cached scan result for *code_hash*, or None.

    An entry whose TTL has elapsed is evicted from both cache maps and
    treated as a miss.
    """
    if code_hash not in _scan_cache:
        return None

    expiry = _cache_expiry.get(code_hash)
    if expiry is not None and datetime.now() < expiry:
        return _scan_cache[code_hash]

    # Stale entry: drop it from both maps before reporting a miss.
    _scan_cache.pop(code_hash, None)
    _cache_expiry.pop(code_hash, None)
    return None


def _cache_scan_result(code_hash: str, result: dict):
    """Store *result* under *code_hash* and stamp its expiry time."""
    expires_at = datetime.now() + timedelta(seconds=CACHE_TTL_SECONDS)
    _scan_cache[code_hash] = result
    _cache_expiry[code_hash] = expires_at


def _map_severity(malicious_type: str) -> str:
    """
    Map malicious type to severity level.

    Critical: Immediate threat to system/data
    High: Significant vulnerability
    Medium: Potential issue
    Low: Minor concern
    Safe: No issues
    """
    severity_map = {
        # Critical threats
        "ransomware": "critical",
        "backdoor": "critical",
        "remote_access_tool": "critical",
        "credential_harvesting": "critical",

        # High severity
        "sql_injection": "high",
        "command_injection": "high",
        "ddos_script": "high",

        # Medium severity
        "obfuscated_suspicious": "medium",
        "trojan": "medium",
        "keylogger": "medium",

        # Low severity
        "other": "low",
        "virus": "low",
        "worm": "low",

        # Safe
        "none": "safe"
    }

    return severity_map.get(malicious_type.lower(), "medium")


def _build_security_prompt(code: str, context: dict) -> str:
    """
    Build comprehensive security analysis prompt.

    Focuses on real threats while ignoring false positives like hardcoded keys
    (since all deployed code is public on Modal.com).

    Args:
        code: The Python source to embed in the prompt for analysis.
        context: Deployment metadata; only "server_name", "packages" and
            "description" are read here, all optional.

    Returns:
        The fully rendered prompt string for the AI model.
    """
    # Context fields are optional; fall back to neutral defaults.
    server_name = context.get("server_name", "Unknown")
    packages = context.get("packages", [])
    description = context.get("description", "")

    # NOTE(review): `code` (and the context fields) are interpolated verbatim
    # into the prompt, so analyzed code containing instruction-like text could
    # attempt prompt injection against the analyst model — worth confirming the
    # downstream schema-constrained response sufficiently mitigates this.
    prompt = f"""You are an expert security analyst reviewing Python code for MCP server deployments on Modal.com.

**IMPORTANT CONTEXT:**
- All deployed code is PUBLIC and visible to anyone
- Hardcoded API keys/credentials are NOT a security threat for this platform (though bad practice)
- Focus on vulnerabilities that could harm the platform or users

**Code to Analyze:**
```python
{code}
```

**Deployment Context:**
- Server Name: {server_name}
- Packages: {', '.join(packages) if packages else 'None'}
- Description: {description}

**Check for REAL THREATS (flag these):**

1. **Code Injection Vulnerabilities:**
   - eval() or exec() with user input
   - subprocess calls with unsanitized input (especially shell=True)
   - SQL queries using string concatenation
   - Dynamic imports from user input

2. **Malicious Network Behavior:**
   - Data exfiltration to suspicious domains
   - Command & Control (C2) communication patterns
   - Cryptocurrency mining
   - Unusual outbound connections to non-standard ports

3. **Resource Abuse:**
   - Infinite loops or recursive calls
   - Memory exhaustion attacks
   - CPU intensive operations without limits
   - Denial of Service patterns

4. **Destructive Operations:**
   - Attempts to escape sandbox/container
   - System file manipulation
   - Process manipulation (killing other processes)
   - Privilege escalation attempts

5. **Malicious Packages:**
   - Known malicious PyPI packages
   - Typosquatting package names
   - Packages with known CVEs

**DO NOT FLAG (these are acceptable):**
- Hardcoded API keys, passwords, or tokens (code is public anyway)
- Legitimate external API calls (OpenAI, Anthropic, etc.)
- Normal file operations (reading/writing files in sandbox)
- Standard web requests to known services
- Environment variable usage

**Provide detailed analysis with specific line references if issues found.**
"""

    return prompt


# Structured-output schema sent to Nebius. Hoisted to module level so the
# (invariant) nested dict is not rebuilt on every scan.
_RESPONSE_FORMAT = {
    "type": "json_schema",
    "json_schema": {
        "name": "security_analysis_schema",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "reasoning_steps": {
                    "type": "array",
                    "items": {
                        "type": "string"
                    },
                    "description": "The reasoning steps leading to the final conclusion."
                },
                "is_malicious": {
                    "type": "boolean",
                    "description": "Indicates whether the provided code or content is malicious (true) or safe/non-malicious (false)."
                },
                "malicious_type": {
                    "type": "string",
                    "enum": [
                        "none",
                        "virus",
                        "worm",
                        "ransomware",
                        "trojan",
                        "keylogger",
                        "backdoor",
                        "remote_access_tool",
                        "sql_injection",
                        "command_injection",
                        "ddos_script",
                        "credential_harvesting",
                        "obfuscated_suspicious",
                        "other"
                    ],
                    "description": "If malicious, classify the type. Use 'none' when code is safe."
                },
                "explanation": {
                    "type": "string",
                    "description": "A short, safe explanation of why the code is considered malicious or not, without including harmful details."
                },
                "answer": {
                    "type": "string",
                    "description": "The final answer, taking all reasoning steps into account."
                }
            },
            "required": [
                "reasoning_steps",
                "is_malicious",
                "malicious_type",
                "explanation",
                "answer"
            ],
            "additionalProperties": False
        }
    }
}


def _fallback_result(explanation: str, reasoning: str, recommendation: str) -> dict:
    """Build a permissive scan result for cases where the scan did not run.

    The disabled / unconfigured / error paths all fail open (allow the
    deployment) and differ only in their messaging, so they share this
    single result builder.
    """
    return {
        "scan_completed": False,
        "is_safe": True,  # fail-open: scanner problems never block a deployment
        "severity": "safe",
        "malicious_type": "none",
        "explanation": explanation,
        "reasoning_steps": [reasoning],
        "issues": [],
        "recommendation": recommendation,
        "scanned_at": datetime.now().isoformat(),
        "cached": False
    }


def scan_code_for_security(code: str, context: dict) -> dict:
    """
    Scan Python code for security vulnerabilities using Nebius AI.

    Args:
        code: The Python code to scan
        context: Dictionary with deployment context:
            - server_name: Name of the server
            - packages: List of pip packages
            - description: Server description
            - deployment_id: Optional deployment ID

    Returns:
        dict with:
        - scan_completed: bool (whether scan finished)
        - is_safe: bool (whether code is safe to deploy)
        - severity: str ("safe", "low", "medium", "high", "critical")
        - malicious_type: str (type of threat or "none")
        - explanation: str (human-readable explanation)
        - reasoning_steps: list[str] (AI's reasoning process)
        - issues: list[dict] (specific issues found)
        - recommendation: str (what to do)
        - scanned_at: str (ISO timestamp)
        - cached: bool (whether result came from cache)
    """

    # Check if scanning is enabled (env flag defaults to enabled).
    if os.getenv("SECURITY_SCANNING_ENABLED", "true").lower() != "true":
        return _fallback_result(
            "Security scanning is disabled",
            "Security scanning disabled via SECURITY_SCANNING_ENABLED=false",
            "Allow (scanning disabled)"
        )

    # Check cache first: identical code gets the previous verdict.
    code_hash = _get_code_hash(code)
    cached_result = _get_cached_scan(code_hash)
    if cached_result:
        cached_result["cached"] = True
        return cached_result

    # Get API key; without it we can only warn, not scan.
    api_key = os.getenv("NEBIUS_API_KEY")
    if not api_key:
        return _fallback_result(
            "NEBIUS_API_KEY not configured - security scanning unavailable",
            "No API key found in environment",
            "Warn (no API key)"
        )

    try:
        # Initialize Nebius client (OpenAI-compatible)
        client = OpenAI(
            base_url="https://api.tokenfactory.nebius.com/v1/",
            api_key=api_key
        )

        # Build security analysis prompt
        prompt = _build_security_prompt(code, context)

        # Call Nebius API with structured JSON schema so the reply is
        # guaranteed to parse into the fields we read below.
        response = client.chat.completions.create(
            model="Qwen/Qwen3-32B-fast",
            temperature=0.6,
            top_p=0.95,
            timeout=30.0,  # 30 second timeout
            response_format=_RESPONSE_FORMAT,
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        )

        # Parse response (strict schema guarantees the required keys).
        response_content = response.choices[0].message.content
        scan_data = json.loads(response_content)

        # Map to our format
        severity = _map_severity(scan_data["malicious_type"])
        is_safe = not scan_data["is_malicious"]

        # Determine recommendation from severity.
        if severity in ["critical", "high"]:
            recommendation = "Block deployment"
        elif severity in ["medium", "low"]:
            recommendation = "Warn and allow"
        else:
            recommendation = "Allow"

        # Build issues list (single aggregated issue when flagged malicious).
        issues = []
        if scan_data["is_malicious"]:
            issues.append({
                "type": scan_data["malicious_type"],
                "severity": severity,
                "description": scan_data["explanation"]
            })

        result = {
            "scan_completed": True,
            "is_safe": is_safe,
            "severity": severity,
            "malicious_type": scan_data["malicious_type"],
            "explanation": scan_data["explanation"],
            "reasoning_steps": scan_data["reasoning_steps"],
            "issues": issues,
            "recommendation": recommendation,
            "scanned_at": datetime.now().isoformat(),
            "cached": False,
            "raw_answer": scan_data.get("answer", "")
        }

        # Cache the result so identical code is not re-scanned within the TTL.
        _cache_scan_result(code_hash, result)

        return result

    except Exception as e:
        # On error, fall back to warning mode (allow deployment with warning).
        # Broad catch is deliberate: any API/parse failure must not block.
        error_msg = str(e)
        result = _fallback_result(
            f"Security scan failed: {error_msg}",
            f"Error during scan: {error_msg}",
            "Warn (scan failed)"
        )
        result["error"] = error_msg
        return result


def clear_scan_cache():
    """Empty both scan-result caches (useful for testing)."""
    for cache in (_scan_cache, _cache_expiry):
        cache.clear()