| """
|
| Data exfiltration and suspicious intent detector.
|
|
|
| Detects attempts to leak, steal, or exfiltrate data.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import re
|
| from typing import Any, Dict, Optional
|
|
|
| from ..base import ScannerPlugin, ScanResult, PluginMetadata
|
|
|
|
|
|
|
# Regular expressions that signal an intent to move data out of the system.
# Each entry is passed to ``re.search`` with ``re.IGNORECASE`` by the
# detector, so a match anywhere in the scanned text is enough.
EXFILTRATION_PATTERNS = [
    # The literal words "exfiltrate" / "exfiltration".
    r"\bexfiltrat(e|ion)\b",
    # "send all secrets", "send all api keys" / "apikeys", "send all data".
    r"send\s+all\s+(secrets|api\s*keys|data)\b",
    # "<upload|post|send> ... to <destination>", where the destination starts
    # with a URL scheme or one of the listed destination words.
    r"\b(upload|post|send)\s+.*\s+to\s+(https?://|attacker|evil|external|server|endpoint|webhook|collector)",
    # Bare "upload to" / "post to" / "send to", whatever follows.
    r"\b(upload|post|send)\s+to\b",
    # Any occurrence of "webhook" (no word boundaries, so substrings match).
    r"webhook",
    # An http(s) URL that contains evil.com, attacker.com or malicious.com.
    r"http[s]?://[^\s]*(evil|attacker|malicious)\.com",
]
|
|
|
|
|
# Server keys treated as sensitive. The detector compares ``server_key``
# against this set with an exact, case-sensitive membership test.
SENSITIVE_SERVERS = {
    "api",
    "auth_service",
    "credentials",
    "database",
    "filesystem",
}
|
|
|
|
|
# Substrings looked up in the lower-cased tool name to spot destructive tools.
# NOTE(review): "rm " keeps its trailing space, so a tool named exactly "rm"
# or "rm_file" does not match -- presumably to avoid hits on names such as
# "format" or "confirm"; confirm this is the intended trade-off.
DANGEROUS_TOOL_KEYWORDS = [
    "delete",
    "drop",
    "truncate",
    "chmod",
    "chown",
    "rm ",
]
|
|
|
|
|
class ExfiltrationDetector(ScannerPlugin):
    """Detects data exfiltration attempts and suspicious intent.

    A call is scored from four independent signals (exfiltration wording in
    the context/arguments, a sensitive server key, a destructive-looking tool
    name, and exfiltration wording combined with the ``fetch`` server).  The
    weights are summed and capped at 1.0.
    """

    def __init__(self):
        """Register the plugin metadata with the ``ScannerPlugin`` base class."""
        super().__init__(
            metadata=PluginMetadata(
                name="ExfiltrationDetector",
                version="1.0.0",
                description="Detects data exfiltration attempts and suspicious data leakage intent",
                author="SecurityGateway",
            )
        )

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """
        Scan for exfiltration and data leakage attempts.

        Checks for:
        - Exfiltration intent patterns (send secrets, upload to, webhook, etc)
        - Sensitive server types (filesystem, database, API, auth)
        - Dangerous tool names (delete, drop, truncate, chmod, etc)
        - Exfiltration via network tools (fetch + exfiltration intent)

        Args:
            user_id: User identifier (not used by this scanner).
            server_key: Server key; compared against ``SENSITIVE_SERVERS``
                and against ``"fetch"``.
            tool: Tool name; lower-cased and searched for the substrings in
                ``DANGEROUS_TOOL_KEYWORDS``.
            arguments: Tool arguments; keys and values are flattened to text
                and scanned together with ``llm_context``.
            llm_context: Optional context text prepended to the scanned text.

        Returns:
            ScanResult whose ``detected`` is True when at least one check
            fired, ``risk_score`` is the sum of the fired weights capped at
            1.0, and ``reasons`` lists one message per fired check (or a
            single "nothing found" message).
        """
        # Everything that may carry free text is scanned as one string.
        context = (llm_context or "") + " " + self._flatten_json(arguments)

        reasons: list[str] = []
        flags: Dict[str, bool] = {}
        risk_score = 0.0

        # 1) Wording that looks like an attempt to ship data elsewhere.
        exfiltration_detected = self._contains_pattern(context, EXFILTRATION_PATTERNS)
        if exfiltration_detected:
            reasons.append("Potential data exfiltration intent detected.")
            flags["exfiltration_like"] = True
            risk_score += 0.30

        # 2) The target server is on the sensitive list.
        if server_key in SENSITIVE_SERVERS:
            reasons.append(f"Sensitive server type: {server_key}.")
            flags["sensitive_server"] = True
            risk_score += 0.15

        # 3) The tool name contains a destructive keyword (substring match).
        tool_lower = tool.lower()
        if any(keyword in tool_lower for keyword in DANGEROUS_TOOL_KEYWORDS):
            reasons.append("Tool name suggests potentially destructive action.")
            flags["dangerous_tool_name"] = True
            risk_score += 0.15

        # 4) Exfiltration wording on the network-capable "fetch" server is
        #    weighted on top of check 1.
        if server_key in {"fetch"} and exfiltration_detected:
            reasons.append("Exfiltration intent with network tool.")
            flags["exfiltration_network"] = True
            risk_score += 0.40

        detected = bool(reasons)

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=detected,
            risk_score=min(1.0, risk_score),
            reasons=reasons if reasons else ["No exfiltration intent detected."],
            flags=flags,
        )

    def _flatten_json(self, value: Any) -> str:
        """Flatten nested structures to a single string for pattern matching.

        Dict keys are included alongside their values, and tuples, sets and
        frozensets are walked element by element exactly like lists.  Without
        this, text hidden in an argument *name* was never scanned, and a
        phrase split across tuple elements was missed while the same phrase
        split across list elements was caught.

        Args:
            value: Any JSON-like value (dict, sequence, scalar).

        Returns:
            The space-joined text of every key and leaf value; scalars are
            converted with ``str()``.
        """
        if isinstance(value, dict):
            parts = []
            for key, item in value.items():
                # Keys are attacker-controllable too, so scan them as well.
                parts.append(self._flatten_json(key))
                parts.append(self._flatten_json(item))
            return " ".join(parts)
        if isinstance(value, (list, tuple, set, frozenset)):
            return " ".join(self._flatten_json(item) for item in value)
        return str(value)

    def _contains_pattern(self, text: str, patterns: list) -> bool:
        """Return True if any regex in ``patterns`` matches ``text``.

        Matching uses ``re.search`` (match anywhere) and is case-insensitive.
        """
        return any(re.search(pat, text, flags=re.IGNORECASE) for pat in patterns)
|
|
|
|
|
|
|
# Module-level detector instance, created once when this module is imported.
# NOTE(review): presumably this is the object the plugin loader looks up by
# the name ``plugin`` -- confirm against the loader.
plugin = ExfiltrationDetector()
|
|
|