# Provenance: uploaded by yuki-sui ("Upload 169 files", commit ed71b0e, verified)
"""
TEMPLATE: Create custom scanner plugins by copying and modifying this file.
Steps:
1. Copy this file to a new name: my_detector.py
2. Modify the class name and metadata
3. Implement the scan() method with your detection logic
4. Place in plugins/builtin/ or plugins/custom/ directory
5. It will be auto-discovered and loaded
See PLUGIN_SYSTEM.md for detailed documentation.
"""
from __future__ import annotations
import re
from typing import Any, Dict, Optional
from ..base import ScannerPlugin, ScanResult, PluginMetadata
class MyCustomDetector(ScannerPlugin):
    """
    TODO: Replace with your detector description.

    Template detector: scans tool calls for a custom threat pattern.
    """

    def __init__(self):
        # TODO: update the metadata below to identify your plugin.
        meta = PluginMetadata(
            name="MyCustomDetector",
            version="1.0.0",
            description="TODO: Add your detector description",
            author="Your Name",
        )
        super().__init__(metadata=meta)

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """
        Scan one tool call for custom threats.

        TODO: Replace with your implementation.

        Args:
            user_id: User identifier.
            server_key: Server key (e.g. 'filesystem', 'fetch').
            tool: Tool name.
            arguments: Tool arguments.
            llm_context: Optional context from LLM.

        Returns:
            ScanResult with detection status and risk score.
        """
        detection_reasons: list = []
        detection_flags: Dict[str, Any] = {}

        # Merge the LLM context with every argument value so the check
        # sees the full text of the call in a single string.
        searchable = (llm_context or "") + " " + self._flatten_json(arguments)
        is_threat = self._check_threat(searchable, arguments)

        if is_threat:
            detection_reasons.append("TODO: Add reason for detection")
            detection_flags["custom_threat"] = True

        # TODO: Adjust risk score (0.0 = no threat, 1.0 = critical)
        score = 0.5 if is_threat else 0.0

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=is_threat,
            risk_score=score,
            reasons=detection_reasons if detection_reasons else ["No threats detected."],
            flags=detection_flags,
        )

    def _check_threat(self, context: str, arguments: Dict[str, Any]) -> bool:
        """
        TODO: Implement your threat detection logic.

        Ideas: keyword checks, dangerous-function lookups, suspicious
        patterns, or analysis of argument types and values.
        """
        # Placeholder: the template never flags anything.
        return False

    def _flatten_json(self, value: Any) -> str:
        """Recursively collapse nested dicts/lists into one space-joined string."""
        if isinstance(value, (dict, list)):
            # dict values and list items are treated the same way; dict
            # keys are intentionally not included (matches original behavior).
            items = value.values() if isinstance(value, dict) else value
            return " ".join(self._flatten_json(item) for item in items)
        return str(value)
# IMPORTANT: Export as module-level variable so plugin loader can find it.
# The loader auto-discovers plugins by looking for a module-level name
# `plugin` when this file is loaded — this single assignment enables that.
plugin = MyCustomDetector()
# ============================================================================
# EXAMPLE IMPLEMENTATIONS BELOW (uncomment and modify as needed)
# ============================================================================
# ============================================================================
# Example 1: Pattern-Based Detector
# ============================================================================
# Detects if arguments match any regex pattern
# class PatternDetector(ScannerPlugin):
# """Detects patterns in tool arguments."""
#
# def __init__(self, patterns: list[str], risk_score: float = 0.5):
# self.patterns = patterns
# self.risk_score = risk_score
# super().__init__(
# metadata=PluginMetadata(
# name="PatternDetector",
# version="1.0.0",
# description="Detects custom regex patterns",
# author="Your Name",
# )
# )
#
# def scan(self, user_id, server_key, tool, arguments, llm_context=None):
# context = (llm_context or "") + " " + str(arguments)
#
# for pattern in self.patterns:
# if re.search(pattern, context, re.IGNORECASE):
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=True,
# risk_score=self.risk_score,
# reasons=[f"Pattern matched: {pattern}"],
# flags={"pattern_match": True},
# )
#
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=False,
# risk_score=0.0,
# reasons=["No patterns matched"],
# )
#
# # Usage:
# # plugin = PatternDetector(
# # patterns=[r"delete.*all", r"drop.*database"],
# # risk_score=0.7
# # )
# ============================================================================
# Example 2: Server-Specific Detector
# ============================================================================
# Only checks specific server types
# class FilesystemOnlyDetector(ScannerPlugin):
# """Detects threats specific to filesystem operations."""
#
# def scan(self, user_id, server_key, tool, arguments, llm_context=None):
# # Only apply to filesystem server
# if server_key != "filesystem":
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=False,
# risk_score=0.0,
# reasons=["Not applicable to this server type"],
# )
#
# # Check for dangerous file operations
# dangerous_tools = {"delete_file", "truncate", "chmod"}
# if tool in dangerous_tools:
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=True,
# risk_score=0.3,
# reasons=[f"Dangerous file operation: {tool}"],
# flags={"dangerous_tool": True},
# )
#
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=False,
# risk_score=0.0,
# reasons=["Safe file operation"],
# )
#
# # plugin = FilesystemOnlyDetector()
# ============================================================================
# Example 3: Context-Aware Detector
# ============================================================================
# Analyzes LLM context for intent
# class DestructiveIntentDetector(ScannerPlugin):
# """Detects intent to perform destructive actions."""
#
# DESTRUCTIVE_PATTERNS = [
# r"delete\s+(all|everything|the\s+(whole|entire))",
# r"drop\s+(database|table|schema)",
# r"wipe.*data",
# r"remove.*permanently",
# ]
#
# def scan(self, user_id, server_key, tool, arguments, llm_context=None):
# if not llm_context:
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=False,
# risk_score=0.0,
# reasons=["No context provided"],
# )
#
# detected = False
# for pattern in self.DESTRUCTIVE_PATTERNS:
# if re.search(pattern, llm_context, re.IGNORECASE):
# detected = True
# break
#
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=detected,
# risk_score=0.8 if detected else 0.0,
# reasons=["Destructive intent detected"] if detected else ["No destructive intent"],
# flags={"destructive_intent": detected},
# )
#
# # plugin = DestructiveIntentDetector()
# ============================================================================
# Example 4: Argument Value Detector
# ============================================================================
# Checks specific argument values
# class LargePayloadDetector(ScannerPlugin):
# """Detects suspiciously large payloads."""
#
# def scan(self, user_id, server_key, tool, arguments, llm_context=None):
# total_size = sum(
# len(str(v))
# for v in arguments.values()
# )
#
# if total_size > 100000: # 100KB
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=True,
# risk_score=0.4,
# reasons=[f"Large payload detected: {total_size} bytes"],
# flags={"large_payload": True},
# metadata={"payload_size": total_size},
# )
#
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=False,
# risk_score=0.0,
# reasons=["Payload size acceptable"],
# )
#
# # plugin = LargePayloadDetector()
# ============================================================================
# Example 5: Multi-Factor Detector
# ============================================================================
# Combines multiple heuristics
# class MultiFactorThreatDetector(ScannerPlugin):
# """Combines multiple threat signals."""
#
# def scan(self, user_id, server_key, tool, arguments, llm_context=None):
# risk_score = 0.0
# reasons = []
# flags = {}
#
# # Factor 1: Check if server is sensitive
# if server_key in {"filesystem", "database"}:
# risk_score += 0.1
# reasons.append("Sensitive server type")
# flags["sensitive_server"] = True
#
# # Factor 2: Check tool name
# if any(word in tool.lower() for word in ["delete", "drop", "truncate"]):
# risk_score += 0.2
# reasons.append("Dangerous tool")
# flags["dangerous_tool"] = True
#
# # Factor 3: Check context
# context = (llm_context or "") + " " + str(arguments)
# if "all" in context.lower() and "delete" in context.lower():
# risk_score += 0.3
# reasons.append("Delete-all pattern detected")
# flags["delete_all"] = True
#
# return ScanResult(
# plugin_name=self.get_metadata().name,
# detected=risk_score > 0.3,
# risk_score=min(1.0, risk_score),
# reasons=reasons if reasons else ["No threats detected"],
# flags=flags,
# )
#
# # plugin = MultiFactorThreatDetector()