A newer version of the Gradio SDK is available: 6.12.0
Security Gateway Plugin System
A modular, extensible threat detection framework for AI agent security.
What is This?
The plugin system transforms your security gateway from a monolithic risk assessment engine into a modular, composable architecture where:
- Each threat detector is an independent plugin
- Plugins can be enabled/disabled at runtime
- New threats can be added without modifying core code
- The system works with any LLM-powered agent, not just MCP
Quick Examples
Run Plugin System
from security_gateway.risk_model import initialize_plugins, compute_risk_with_plugins
# Load all built-in plugins
initialize_plugins()
# Scan a tool call
result = compute_risk_with_plugins(
user_id="user1",
server_key="filesystem",
tool="read",
arguments={"path": "/etc/passwd"}
)
print(f"Risk: {result['total_score']}") # 0.55 (path traversal detected)
print(f"Threats: {result['detected_threats']}") # ['PathTraversalDetector']
Use Hybrid (Legacy + Plugins)
from security_gateway.risk_model import compute_risk_hybrid
result = compute_risk_hybrid(
user_id="user1",
server_key="fetch",
tool="fetch",
arguments={"url": "http://localhost:8080"},
use_plugins=True
)
print(f"Combined: {result['combined_score']}") # Best of both systems
Create Custom Plugin
from security_gateway.plugins.base import ScannerPlugin, ScanResult, PluginMetadata
class MyDetector(ScannerPlugin):
def __init__(self):
super().__init__(PluginMetadata(
name="MyDetector",
description="Detects my custom threat"
))
def scan(self, user_id, server_key, tool, arguments, llm_context=None):
detected = self._check_threat(arguments)
return ScanResult(
plugin_name=self.get_metadata().name,
detected=detected,
risk_score=0.5 if detected else 0.0,
reasons=["Custom threat found"] if detected else []
)
def _check_threat(self, arguments):
# Your detection logic here
return False
# Register it
from security_gateway.plugins.base import get_registry
registry = get_registry()
registry.register(MyDetector())
Manage Plugins
from security_gateway.plugins.base import get_registry
registry = get_registry()
# List all plugins
for name, plugin in registry.get_all_plugins().items():
print(f"{name}: {'enabled' if plugin.is_enabled() else 'disabled'}")
# Control plugins
registry.disable_plugin("PathTraversalDetector")
registry.enable_plugin("PathTraversalDetector")
# Get only active plugins
active = registry.get_enabled_plugins()
Built-in Plugins
| Plugin | Detects | Score |
|---|---|---|
| PathTraversalDetector | Directory traversal, sensitive paths | 0.35-0.55 |
| SQLInjectionDetector | SQL injection patterns | 0.35 |
| JailbreakDetector | Override attempts, prompt injection | 0.3-0.5 |
| SSRFDetector | Internal IP access, metadata endpoints | 0.5-0.7 |
File Structure
plugins/
βββ __init__.py # Package exports
βββ base.py # Core classes (ScannerPlugin, PluginRegistry)
βββ loader.py # Plugin discovery and loading
βββ builtin/
β βββ path_traversal.py # Path traversal detector
β βββ sql_injection.py # SQL injection detector
β βββ jailbreak.py # Jailbreak/prompt injection detector
β βββ ssrf.py # SSRF detector
β βββ TEMPLATE.py # Template for custom plugins
βββ PLUGIN_SYSTEM.md # Complete API documentation
βββ README.md # This file
../
βββ risk_model.py # Integration with legacy system
βββ INTEGRATION_GUIDE.md # Step-by-step integration instructions
Getting Started
1. Initialize at Startup
# In server.py
from security_gateway.risk_model import initialize_plugins
async def _on_startup() -> None:
await discover_downstream_tools()
initialize_plugins() # Load all plugins
2. Use in Risk Assessment
# Replace or supplement existing risk computation
from security_gateway.risk_model import compute_risk_with_plugins
result = compute_risk_with_plugins(
user_id=data.user_id,
server_key=server_key,
tool=tool_name,
arguments=raw_args,
llm_context=data.llm_context,
)
3. Act on Results
if result['total_score'] > 0.75:
# Block request
policy.allow = False
elif result['detected_threats']:
# Redact output
policy.redact_output = True
Kaizen Approach: Gradual Integration
This system is designed for incremental improvement:
Phase 1 (Day 1): Initialize plugins alongside legacy system
# Both systems run in parallel
result = compute_risk_hybrid(use_plugins=True)
Phase 2 (Days 1-3): Monitor and compare
- Compare legacy and plugin results
- Adjust plugin patterns as needed
- Build confidence
Phase 3 (Days 3-5): Switch to plugin-only
# Use new modular system exclusively
result = compute_risk_with_plugins(...)
Phase 4 (Days 5+): Extend with custom plugins
- Add domain-specific detectors
- Fine-tune for your use cases
- Build on solid foundation
See INTEGRATION_GUIDE.md for detailed migration steps.
Key Concepts
ScannerPlugin
Abstract base class that all plugins extend. Implement scan() to detect threats.
ScanResult
Output of a plugin scan:
@dataclass
class ScanResult:
plugin_name: str # Which plugin detected this
detected: bool # Was threat found?
risk_score: float # 0.0-1.0, higher = worse
reasons: List[str] # Explanations
flags: Dict[str, bool] # Detailed signal flags
metadata: Dict[str, Any] # Extra data
PluginRegistry
Central registry for all plugins:
- Register/unregister plugins
- Enable/disable at runtime
- Run all plugins at once
- Aggregate results
PluginLoader
Auto-discover and load plugins from:
- Built-in directory
- Custom directories
- Module paths
Creating Custom Plugins
Copy the template:
cp plugins/builtin/TEMPLATE.py plugins/custom/my_detector.pyImplement detection logic:
def scan(self, user_id, server_key, tool, arguments, llm_context=None): detected = self._check_threat(arguments) return ScanResult( plugin_name=self.get_metadata().name, detected=detected, risk_score=0.5 if detected else 0.0, reasons=["Threat found"] if detected else [] )Export the plugin:
plugin = MyDetector() # Auto-discovered!It's loaded automatically on next startup or via:
loader.load_and_register_from_directory("./plugins/custom")
See PLUGIN_SYSTEM.md for detailed patterns and examples.
Integration with server.py
Minimal change required - just add plugin initialization:
# At startup
async def _on_startup() -> None:
await discover_downstream_tools()
initialize_plugins() # Add this line
# In secure_call function
from security_gateway.risk_model import compute_risk_hybrid
risk_assessment = compute_risk_hybrid(
user_id=data.user_id,
server_key=server_key,
tool=tool_name,
arguments=raw_args,
llm_context=data.llm_context,
use_plugins=True
)
# Use combined score
from security_gateway.risk_model import RiskResult
risk = RiskResult(
score=risk_assessment['combined_score'],
reasons=risk_assessment['combined_reasons'],
flags=risk_assessment['combined_flags']
)
See INTEGRATION_GUIDE.md for complete implementation details.
Configuration
Enable/Disable Plugins
registry = get_registry()
registry.disable_plugin("PathTraversalDetector")
registry.enable_plugin("PathTraversalDetector")
Load Custom Plugins
from security_gateway.plugins.loader import PluginLoader
loader = PluginLoader()
loader.load_and_register_from_directory("./custom_plugins")
Environment Variables
# Disable specific plugins
DISABLED_PLUGINS=SQLInjectionDetector,JailbreakDetector
# Enable only these
ENABLED_PLUGINS=PathTraversalDetector,SSRFDetector
Performance
- Execution: O(n) where n = number of enabled plugins
- Memory: ~1-2MB per plugin instance
- Latency: Typically <50ms for all plugins combined
Optimize by:
- Disabling unused plugins
- Implementing efficient detection logic
- Caching results for repeated calls
Testing
Test Individual Plugin
from security_gateway.plugins.builtin.path_traversal import PathTraversalDetector
plugin = PathTraversalDetector()
result = plugin.scan(
"user1", "filesystem", "read",
{"path": "../../etc/passwd"}
)
assert result.detected
assert result.risk_score > 0.3
Test Plugin Registry
from security_gateway.plugins.base import PluginRegistry
registry = PluginRegistry()
registry.register(PathTraversalDetector())
results = registry.scan_all(
"user1", "filesystem", "read",
{"path": "../../etc/passwd"}
)
assert len(results) == 1
assert list(results.values())[0].detected
See PLUGIN_SYSTEM.md for more examples.
Advanced Features
Dynamic Plugin Management
Monitor directory for new plugins and auto-load:
async def watch_plugins(directory, interval=60):
loader = PluginLoader(get_registry())
# Auto-load new plugin files
Plugin Metrics
Track detection rates and performance:
metrics = {
"PathTraversalDetector": {
"execution_count": 1024,
"detection_count": 42,
"avg_execution_ms": 2.3
}
}
Conditional Plugins
Plugins that only run for certain contexts:
class FilesystemOnlyPlugin(ScannerPlugin):
def scan(self, user_id, server_key, ...):
if server_key != "filesystem":
return ScanResult(
detected=False,
risk_score=0.0,
reasons=["Not applicable"]
)
# Your detection logic
Comparison with Legacy System
| Aspect | Legacy | Plugin System |
|---|---|---|
| Modularity | Monolithic | Modular |
| Extensibility | Modify core | Add plugins |
| Runtime Control | Recompile | Enable/disable |
| Testability | Coupled | Independent |
| Framework Lock-in | MCP only | Any framework |
| Learning Curve | N/A | Low (base class) |
Migration Path
Legacy System Only
β
Legacy + Plugins (Parallel) β You are here
β
Plugins Only (Recommended)
β
Plugins + Custom Detectors (Advanced)
Troubleshooting
Plugins not loading?
from security_gateway.plugins.base import get_registry
registry = get_registry()
print(f"Plugins: {list(registry.get_all_plugins().keys())}")
Wrong risk scores?
result = compute_risk_hybrid(use_plugins=True)
print(f"Legacy: {result['legacy_result']['score']}")
print(f"Plugins: {result['plugin_result']['total_score']}")
Plugin execution error?
try:
result = compute_risk_with_plugins(...)
except Exception as e:
print(f"Error: {e}")
# Fall back to legacy system
result = compute_risk(...)
Documentation
PLUGIN_SYSTEM.md- Complete API referenceINTEGRATION_GUIDE.md- Step-by-step integrationbuiltin/TEMPLATE.py- Custom plugin template with examplesbase.py- Inline documentation of core classes
Contributing
- Create plugin in
builtin/orcustom/ - Extend
ScannerPlugin - Export as
plugin = YourDetector() - Add unit tests
- Document in
PLUGIN_SYSTEM.md
Real-World Usage
E-commerce Platform
# Add custom plugin for payment info detection
class PaymentInfoDetector(ScannerPlugin):
"""Blocks attempts to log payment information."""
# Detects credit cards, billing addresses, etc.
registry.register(PaymentInfoDetector())
Enterprise Security
# Fine-tune for corporate policies
registry.disable_plugin("SSRFDetector") # Trust network
registry.register(IPBlocklistPlugin()) # Add custom blocklist
Multi-tenant SaaS
# Different plugins per tenant
if tenant.security_level == "high":
registry.enable_plugin("JailbreakDetector")
registry.register(TenantCustomDetector())
Support & Issues
For issues, bugs, or feature requests:
- Check
PLUGIN_SYSTEM.mdfor API details - Review examples in
builtin/TEMPLATE.py - Test with
compute_risk_hybrid(use_plugins=True) - Check console output from
initialize_plugins()
License
Same as Security MCP project.
Ready to get started?
- Start with
INTEGRATION_GUIDE.mdfor setup - Reference
PLUGIN_SYSTEM.mdfor API details - Copy
builtin/TEMPLATE.pyfor custom plugins - Enable hybrid mode:
compute_risk_hybrid(use_plugins=True)