| """ | |
| TEMPLATE: Create custom scanner plugins by copying and modifying this file. | |
| Steps: | |
| 1. Copy this file to a new name: my_detector.py | |
| 2. Modify the class name and metadata | |
| 3. Implement the scan() method with your detection logic | |
| 4. Place in plugins/builtin/ or plugins/custom/ directory | |
| 5. It will be auto-discovered and loaded | |
| See PLUGIN_SYSTEM.md for detailed documentation. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| from typing import Any, Dict, Optional | |
| from ..base import ScannerPlugin, ScanResult, PluginMetadata | |
class MyCustomDetector(ScannerPlugin):
    """
    TODO: Replace with your detector description.

    Template scanner plugin for tool calls. It assembles a text context from
    the LLM context and the tool arguments, then delegates the decision to
    ``_check_threat``, which is a placeholder that never reports a threat
    until you implement it.
    """

    def __init__(self):
        """Register the plugin metadata with the base class."""
        # TODO: Change the name, description and author to match your plugin
        plugin_info = PluginMetadata(
            name="MyCustomDetector",
            version="1.0.0",
            description="TODO: Add your detector description",
            author="Your Name",
        )
        super().__init__(metadata=plugin_info)

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """
        Scan one tool call and report whether the custom threat was found.

        TODO: Replace with your implementation.

        Args:
            user_id: User identifier (not used by the template logic)
            server_key: Server key (e.g., 'filesystem', 'fetch'); not used
                by the template logic
            tool: Tool name (not used by the template logic)
            arguments: Tool arguments; flattened into the text context and
                also passed to ``_check_threat`` unchanged
            llm_context: Optional context from LLM; placed in front of the
                flattened arguments, separated by a single space

        Returns:
            ScanResult carrying this plugin's name, the detection outcome,
            a risk score (0.5 on detection, 0.0 otherwise), the reasons and
            the flags.
        """
        # TODO: Implement your detection logic
        # Example: look for a specific pattern in the combined context text
        prefix = (llm_context or "") + " "
        context = prefix + self._flatten_json(arguments)
        hit = self._check_threat(context, arguments)

        findings = ["TODO: Add reason for detection"] if hit else []
        markers = {"custom_threat": True} if hit else {}
        # TODO: Adjust risk score (0.0 = no threat, 1.0 = critical)
        severity = 0.5 if hit else 0.0

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=hit,
            risk_score=severity,
            # An empty findings list is replaced by a default message.
            reasons=findings or ["No threats detected."],
            flags=markers,
        )

    def _check_threat(self, context: str, arguments: Dict[str, Any]) -> bool:
        """
        TODO: Implement your threat detection logic.

        Ideas for what to check:
        - Specific keywords in ``context``
        - Dangerous function names
        - Suspicious patterns
        - Types/values of individual entries in ``arguments``

        Returns:
            True when a threat is found. The placeholder always returns False.
        """
        # Placeholder: nothing is ever flagged until this is implemented
        return False

    def _flatten_json(self, value: Any) -> str:
        """
        Helper: flatten nested structures to a single string.

        Dicts contribute their values only (keys are not included) and lists
        contribute their items; both are flattened recursively and joined
        with single spaces. Any other value is converted with ``str``.
        """
        if isinstance(value, dict):
            children = value.values()
        elif isinstance(value, list):
            children = value
        else:
            return str(value)
        return " ".join(map(self._flatten_json, children))
# IMPORTANT: the detector instance must be bound to the module-level name
# `plugin`; that is the variable the plugin loader looks for when this file
# is loaded during auto-discovery.
plugin: ScannerPlugin = MyCustomDetector()
| # ============================================================================ | |
| # EXAMPLE IMPLEMENTATIONS BELOW (uncomment and modify as needed) | |
| # ============================================================================ | |
| # ============================================================================ | |
| # Example 1: Pattern-Based Detector | |
| # ============================================================================ | |
| # Detects whether the LLM context or the stringified arguments match any of the given regex patterns | |
| # class PatternDetector(ScannerPlugin): | |
| # """Detects patterns in tool arguments.""" | |
| # | |
| # def __init__(self, patterns: list[str], risk_score: float = 0.5): | |
| # self.patterns = patterns | |
| # self.risk_score = risk_score | |
| # super().__init__( | |
| # metadata=PluginMetadata( | |
| # name="PatternDetector", | |
| # version="1.0.0", | |
| # description="Detects custom regex patterns", | |
| # author="Your Name", | |
| # ) | |
| # ) | |
| # | |
| # def scan(self, user_id, server_key, tool, arguments, llm_context=None): | |
| # context = (llm_context or "") + " " + str(arguments) | |
| # | |
| # for pattern in self.patterns: | |
| # if re.search(pattern, context, re.IGNORECASE): | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=True, | |
| # risk_score=self.risk_score, | |
| # reasons=[f"Pattern matched: {pattern}"], | |
| # flags={"pattern_match": True}, | |
| # ) | |
| # | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=False, | |
| # risk_score=0.0, | |
| # reasons=["No patterns matched"], | |
| # ) | |
| # | |
| # # Usage: | |
| # # plugin = PatternDetector( | |
| # # patterns=[r"delete.*all", r"drop.*database"], | |
| # # risk_score=0.7 | |
| # # ) | |
| # ============================================================================ | |
| # Example 2: Server-Specific Detector | |
| # ============================================================================ | |
| # Only checks specific server types | |
| # class FilesystemOnlyDetector(ScannerPlugin): | |
| # """Detects threats specific to filesystem operations.""" | |
| # | |
| # def scan(self, user_id, server_key, tool, arguments, llm_context=None): | |
| # # Only apply to filesystem server | |
| # if server_key != "filesystem": | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=False, | |
| # risk_score=0.0, | |
| # reasons=["Not applicable to this server type"], | |
| # ) | |
| # | |
| # # Check for dangerous file operations | |
| # dangerous_tools = {"delete_file", "truncate", "chmod"} | |
| # if tool in dangerous_tools: | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=True, | |
| # risk_score=0.3, | |
| # reasons=[f"Dangerous file operation: {tool}"], | |
| # flags={"dangerous_tool": True}, | |
| # ) | |
| # | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=False, | |
| # risk_score=0.0, | |
| # reasons=["Safe file operation"], | |
| # ) | |
| # | |
| # # plugin = FilesystemOnlyDetector() | |
| # ============================================================================ | |
| # Example 3: Context-Aware Detector | |
| # ============================================================================ | |
| # Analyzes LLM context for intent | |
| # class DestructiveIntentDetector(ScannerPlugin): | |
| # """Detects intent to perform destructive actions.""" | |
| # | |
| # DESTRUCTIVE_PATTERNS = [ | |
| # r"delete\s+(all|everything|the\s+(whole|entire))", | |
| # r"drop\s+(database|table|schema)", | |
| # r"wipe.*data", | |
| # r"remove.*permanently", | |
| # ] | |
| # | |
| # def scan(self, user_id, server_key, tool, arguments, llm_context=None): | |
| # if not llm_context: | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=False, | |
| # risk_score=0.0, | |
| # reasons=["No context provided"], | |
| # ) | |
| # | |
| # detected = False | |
| # for pattern in self.DESTRUCTIVE_PATTERNS: | |
| # if re.search(pattern, llm_context, re.IGNORECASE): | |
| # detected = True | |
| # break | |
| # | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=detected, | |
| # risk_score=0.8 if detected else 0.0, | |
| # reasons=["Destructive intent detected"] if detected else ["No destructive intent"], | |
| # flags={"destructive_intent": detected}, | |
| # ) | |
| # | |
| # # plugin = DestructiveIntentDetector() | |
| # ============================================================================ | |
| # Example 4: Argument Value Detector | |
| # ============================================================================ | |
| # Checks the combined size of the stringified argument values | |
| # class LargePayloadDetector(ScannerPlugin): | |
| # """Detects suspiciously large payloads.""" | |
| # | |
| # def scan(self, user_id, server_key, tool, arguments, llm_context=None): | |
| # total_size = sum( | |
| # len(str(v)) | |
| # for v in arguments.values() | |
| # ) | |
| # | |
| # if total_size > 100000: # 100KB | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=True, | |
| # risk_score=0.4, | |
| # reasons=[f"Large payload detected: {total_size} bytes"], | |
| # flags={"large_payload": True}, | |
| # metadata={"payload_size": total_size}, | |
| # ) | |
| # | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=False, | |
| # risk_score=0.0, | |
| # reasons=["Payload size acceptable"], | |
| # ) | |
| # | |
| # # plugin = LargePayloadDetector() | |
| # ============================================================================ | |
| # Example 5: Multi-Factor Detector | |
| # ============================================================================ | |
| # Combines multiple heuristics | |
| # class MultiFactorThreatDetector(ScannerPlugin): | |
| # """Combines multiple threat signals.""" | |
| # | |
| # def scan(self, user_id, server_key, tool, arguments, llm_context=None): | |
| # risk_score = 0.0 | |
| # reasons = [] | |
| # flags = {} | |
| # | |
| # # Factor 1: Check if server is sensitive | |
| # if server_key in {"filesystem", "database"}: | |
| # risk_score += 0.1 | |
| # reasons.append("Sensitive server type") | |
| # flags["sensitive_server"] = True | |
| # | |
| # # Factor 2: Check tool name | |
| # if any(word in tool.lower() for word in ["delete", "drop", "truncate"]): | |
| # risk_score += 0.2 | |
| # reasons.append("Dangerous tool") | |
| # flags["dangerous_tool"] = True | |
| # | |
| # # Factor 3: Check context | |
| # context = (llm_context or "") + " " + str(arguments) | |
| # if "all" in context.lower() and "delete" in context.lower(): | |
| # risk_score += 0.3 | |
| # reasons.append("Delete-all pattern detected") | |
| # flags["delete_all"] = True | |
| # | |
| # return ScanResult( | |
| # plugin_name=self.get_metadata().name, | |
| # detected=risk_score > 0.3, | |
| # risk_score=min(1.0, risk_score), | |
| # reasons=reasons if reasons else ["No threats detected"], | |
| # flags=flags, | |
| # ) | |
| # | |
| # # plugin = MultiFactorThreatDetector() | |