# yuki-sui's picture
# Upload 169 files
# ed71b0e verified
"""
Data exfiltration and suspicious intent detector.
Detects attempts to leak, steal, or exfiltrate data.
"""
from __future__ import annotations
import re
from typing import Any, Dict, Optional
from ..base import ScannerPlugin, ScanResult, PluginMetadata
# Regular expressions that signal exfiltration / data-leakage intent.
# ExfiltrationDetector.scan passes this list to _contains_pattern, which runs
# re.search with re.IGNORECASE, so every pattern is case-insensitive and may
# match anywhere in the scanned text.
EXFILTRATION_PATTERNS = [
# The whole words "exfiltrate" or "exfiltration".
r"\bexfiltrat(e|ion)\b",
# "send all secrets", "send all api keys" / "apikeys", "send all data".
r"send\s+all\s+(secrets|api\s*keys|data)\b",
# upload/post/send <something> to a URL scheme or a suspicious-sounding target word.
r"\b(upload|post|send)\s+.*\s+to\s+(https?://|attacker|evil|external|server|endpoint|webhook|collector)",
# "upload to" / "post to" / "send to" with only whitespace between the two words.
r"\b(upload|post|send)\s+to\b",
# Any occurrence of "webhook"; unanchored, so it also matches inside longer tokens.
r"webhook",
# http(s) URLs that contain "evil.com", "attacker.com" or "malicious.com"
# anywhere in the non-whitespace run after the scheme.
r"http[s]?://[^\s]*(evil|attacker|malicious)\.com",
]
# Server keys treated as sensitive. scan() compares server_key against this set
# with an exact, case-sensitive membership test.
# NOTE(review): scan() adds the sensitive-server weight whether or not an
# exfiltration pattern matched -- confirm that is the intended meaning of "amplify".
SENSITIVE_SERVERS = {"filesystem", "database", "api", "auth_service", "credentials"}
# Substrings searched for in the lower-cased tool name to flag destructive intent.
# NOTE(review): "rm " ends with a space, so a tool named exactly "rm" does not
# match it -- confirm this is deliberate.
DANGEROUS_TOOL_KEYWORDS = ["delete", "drop", "truncate", "chmod", "chown", "rm "]
class ExfiltrationDetector(ScannerPlugin):
    """Scanner plugin that scores a tool call for exfiltration-style risk.

    The score is built from four independent checks (intent patterns,
    sensitive server key, destructive-looking tool name, intent combined with
    the ``fetch`` server) and is capped at 1.0.
    """

    def __init__(self):
        """Hand this plugin's static metadata to the base class."""
        plugin_info = PluginMetadata(
            name="ExfiltrationDetector",
            version="1.0.0",
            description="Detects data exfiltration attempts and suspicious data leakage intent",
            author="SecurityGateway",
        )
        super().__init__(metadata=plugin_info)

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """
        Evaluate one tool invocation for exfiltration and data-leakage signals.

        Checks, in order:
            1. Exfiltration intent patterns in the context text (+0.30)
            2. ``server_key`` is one of SENSITIVE_SERVERS (+0.15)
            3. Lower-cased tool name contains a dangerous keyword (+0.15)
            4. Intent from check 1 and ``server_key`` is ``"fetch"`` (+0.40)

        Args:
            user_id: User identifier (not consulted by this scanner)
            server_key: Server key the call is addressed to
            tool: Tool name
            arguments: Tool arguments; values are flattened to text and scanned
            llm_context: Optional free text scanned together with the arguments

        Returns:
            ScanResult whose ``detected`` is True when at least one check
            fired, with the summed risk score clamped to 1.0.
        """
        # Text under inspection: optional LLM context, a space, then every
        # argument value rendered as a string.
        haystack = " ".join([llm_context or "", self._flatten_json(arguments)])

        findings = []
        markers = {}
        score = 0.0

        # 1) Exfiltration intent anywhere in the combined text.
        intent_hit = self._contains_pattern(haystack, EXFILTRATION_PATTERNS)
        if intent_hit:
            findings.append("Potential data exfiltration intent detected.")
            markers["exfiltration_like"] = True
            score += 0.30

        # 2) Call is addressed to a sensitive server (applied independently of check 1).
        if server_key in SENSITIVE_SERVERS:
            findings.append(f"Sensitive server type: {server_key}.")
            markers["sensitive_server"] = True
            score += 0.15

        # 3) Tool name contains a destructive-sounding keyword; counted once
        #    no matter how many keywords match.
        tool_name = tool.lower()
        for needle in DANGEROUS_TOOL_KEYWORDS:
            if needle in tool_name:
                findings.append("Tool name suggests potentially destructive action.")
                markers["dangerous_tool_name"] = True
                score += 0.15
                break

        # 4) Exfiltration intent aimed at the network-capable "fetch" server.
        if intent_hit and server_key == "fetch":
            findings.append("Exfiltration intent with network tool.")
            markers["exfiltration_network"] = True
            score += 0.40

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=bool(findings),
            risk_score=min(1.0, score),
            reasons=findings or ["No exfiltration intent detected."],
            flags=markers,
        )

    def _flatten_json(self, value: Any) -> str:
        """Render a nested dict/list structure as one space-separated string.

        Only dict *values* and list items are descended into; dict keys are
        not part of the output. Every other value is converted with ``str``.
        """
        if isinstance(value, dict):
            children = value.values()
        elif isinstance(value, list):
            children = value
        else:
            return str(value)
        return " ".join(map(self._flatten_json, children))

    def _contains_pattern(self, text: str, patterns: list) -> bool:
        """Return True if any regex in ``patterns`` is found in ``text`` (case-insensitive)."""
        return any(
            re.search(expr, text, flags=re.IGNORECASE) for expr in patterns
        )
# Module-level detector instance, created at import time. Per the original
# note it is exported under the name `plugin` for auto-loading; the loader
# that consumes it is not visible in this file.
plugin = ExfiltrationDetector()