Security Gateway Plugin System

A modular, extensible threat detection framework for AI agent security.

What is This?

The plugin system transforms your security gateway from a monolithic risk assessment engine into a modular, composable architecture where:

  • Each threat detector is an independent plugin
  • Plugins can be enabled/disabled at runtime
  • New threats can be added without modifying core code
  • The system works with any LLM-powered agent, not just MCP

Quick Examples

Run Plugin System

from security_gateway.risk_model import initialize_plugins, compute_risk_with_plugins

# Load all built-in plugins
initialize_plugins()

# Scan a tool call
result = compute_risk_with_plugins(
    user_id="user1",
    server_key="filesystem",
    tool="read",
    arguments={"path": "/etc/passwd"}
)

print(f"Risk: {result['total_score']}")  # e.g. 0.55 (sensitive path detected)
print(f"Threats: {result['detected_threats']}")  # ['PathTraversalDetector']

Use Hybrid (Legacy + Plugins)

from security_gateway.risk_model import compute_risk_hybrid

result = compute_risk_hybrid(
    user_id="user1",
    server_key="fetch",
    tool="fetch",
    arguments={"url": "http://localhost:8080"},
    use_plugins=True
)

print(f"Combined: {result['combined_score']}")  # Best of both systems

Create Custom Plugin

from security_gateway.plugins.base import ScannerPlugin, ScanResult, PluginMetadata

class MyDetector(ScannerPlugin):
    def __init__(self):
        super().__init__(PluginMetadata(
            name="MyDetector",
            description="Detects my custom threat"
        ))

    def scan(self, user_id, server_key, tool, arguments, llm_context=None):
        detected = self._check_threat(arguments)
        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=detected,
            risk_score=0.5 if detected else 0.0,
            reasons=["Custom threat found"] if detected else []
        )

    def _check_threat(self, arguments):
        # Your detection logic here
        return False

# Register it
from security_gateway.plugins.base import get_registry
registry = get_registry()
registry.register(MyDetector())

Manage Plugins

from security_gateway.plugins.base import get_registry

registry = get_registry()

# List all plugins
for name, plugin in registry.get_all_plugins().items():
    print(f"{name}: {'enabled' if plugin.is_enabled() else 'disabled'}")

# Control plugins
registry.disable_plugin("PathTraversalDetector")
registry.enable_plugin("PathTraversalDetector")

# Get only active plugins
active = registry.get_enabled_plugins()

Built-in Plugins

| Plugin | Detects | Score |
|---|---|---|
| PathTraversalDetector | Directory traversal, sensitive paths | 0.35-0.55 |
| SQLInjectionDetector | SQL injection patterns | 0.35 |
| JailbreakDetector | Override attempts, prompt injection | 0.3-0.5 |
| SSRFDetector | Internal IP access, metadata endpoints | 0.5-0.7 |
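
To give a sense of what a detector in this table checks, here is a minimal standard-library sketch of an SSRF-style test. It is not the shipped SSRFDetector: the function name `ssrf_score`, the host list, and the choice of which condition maps to 0.5 versus 0.7 are assumptions picked only to stay inside the range listed above.

```python
import ipaddress
from urllib.parse import urlparse

# Hypothetical host list; the real detector's patterns may differ.
METADATA_HOSTS = {"169.254.169.254", "metadata.google.internal"}


def ssrf_score(url: str) -> float:
    """Return an illustrative risk score for a URL (0.0 means no signal)."""
    host = urlparse(url).hostname or ""
    if host in METADATA_HOSTS:
        return 0.7  # assumed: metadata endpoints sit at the top of the range
    if host == "localhost":
        return 0.5
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return 0.0  # not an IP literal; a fuller check would also resolve DNS
    if ip.is_private or ip.is_loopback or ip.is_link_local:
        return 0.5  # assumed: internal addresses sit at the bottom of the range
    return 0.0
```

A real detector would also need to handle redirects and hostnames that resolve to internal addresses, which this sketch deliberately leaves out.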

File Structure

plugins/
├── __init__.py              # Package exports
├── base.py                  # Core classes (ScannerPlugin, PluginRegistry)
├── loader.py                # Plugin discovery and loading
├── builtin/
│   ├── path_traversal.py    # Path traversal detector
│   ├── sql_injection.py     # SQL injection detector
│   ├── jailbreak.py         # Jailbreak/prompt injection detector
│   ├── ssrf.py              # SSRF detector
│   └── TEMPLATE.py          # Template for custom plugins
├── PLUGIN_SYSTEM.md         # Complete API documentation
└── README.md                # This file

../
├── risk_model.py            # Integration with legacy system
└── INTEGRATION_GUIDE.md     # Step-by-step integration instructions

Getting Started

1. Initialize at Startup

# In server.py
from security_gateway.risk_model import initialize_plugins

async def _on_startup() -> None:
    await discover_downstream_tools()
    initialize_plugins()  # Load all plugins

2. Use in Risk Assessment

# Replace or supplement existing risk computation
from security_gateway.risk_model import compute_risk_with_plugins

result = compute_risk_with_plugins(
    user_id=data.user_id,
    server_key=server_key,
    tool=tool_name,
    arguments=raw_args,
    llm_context=data.llm_context,
)

3. Act on Results

if result['total_score'] > 0.75:
    # Block request
    policy.allow = False
elif result['detected_threats']:
    # Redact output
    policy.redact_output = True

Kaizen Approach: Gradual Integration

This system is designed for incremental improvement:

Phase 1 (Day 1): Initialize plugins alongside legacy system

# Both systems run in parallel
result = compute_risk_hybrid(..., use_plugins=True)  # other arguments as in Quick Examples

Phase 2 (Days 1-3): Monitor and compare

  • Compare legacy and plugin results
  • Adjust plugin patterns as needed
  • Build confidence

Phase 3 (Days 3-5): Switch to plugin-only

# Use new modular system exclusively
result = compute_risk_with_plugins(...)

Phase 4 (Days 5+): Extend with custom plugins

  • Add domain-specific detectors
  • Fine-tune for your use cases
  • Build on solid foundation

See INTEGRATION_GUIDE.md for detailed migration steps.

Key Concepts

ScannerPlugin

Abstract base class that all plugins extend. Implement scan() to detect threats.

ScanResult

Output of a plugin scan:

@dataclass
class ScanResult:
    plugin_name: str         # Which plugin detected this
    detected: bool           # Was threat found?
    risk_score: float        # 0.0-1.0, higher = worse
    reasons: List[str]       # Explanations
    flags: Dict[str, bool]   # Detailed signal flags
    metadata: Dict[str, Any] # Extra data
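
The examples in this README construct a ScanResult without `flags` or `metadata`, so those fields presumably have defaults. A self-contained sketch of one way to declare that is shown below; the defaults and the range check in `__post_init__` are assumptions, and the actual definition in base.py may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ScanResult:
    plugin_name: str
    detected: bool
    risk_score: float
    # default_factory gives every instance its own list/dict
    reasons: List[str] = field(default_factory=list)
    flags: Dict[str, bool] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Assumed validation: scores are documented as 0.0-1.0
        if not 0.0 <= self.risk_score <= 1.0:
            raise ValueError(f"risk_score out of range: {self.risk_score}")
```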

PluginRegistry

Central registry for all plugins:

  • Register/unregister plugins
  • Enable/disable at runtime
  • Run all plugins at once
  • Aggregate results
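
The four responsibilities above can be illustrated with a small stand-alone sketch. Everything here (`Result`, `KeywordPlugin`, `SketchRegistry`) is a hypothetical stand-in rather than the real PluginRegistry, and the aggregation rule (sum of detected scores, capped at 1.0) is an assumption; PLUGIN_SYSTEM.md is the place to check how the real registry combines scores.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Result:
    """Cut-down stand-in for ScanResult."""
    plugin_name: str
    detected: bool
    risk_score: float
    reasons: List[str] = field(default_factory=list)


class KeywordPlugin:
    """Hypothetical plugin: flags any argument value containing a keyword."""

    def __init__(self, name: str, keyword: str, score: float) -> None:
        self.name, self.keyword, self.score = name, keyword, score
        self.enabled = True

    def scan(self, user_id, server_key, tool, arguments, llm_context=None):
        hit = any(self.keyword in str(v) for v in arguments.values())
        reasons = [f"found {self.keyword!r}"] if hit else []
        return Result(self.name, hit, self.score if hit else 0.0, reasons)


class SketchRegistry:
    def __init__(self) -> None:
        self._plugins: Dict[str, KeywordPlugin] = {}

    def register(self, plugin: KeywordPlugin) -> None:
        self._plugins[plugin.name] = plugin

    def unregister(self, name: str) -> None:
        self._plugins.pop(name, None)

    def set_enabled(self, name: str, enabled: bool) -> None:
        self._plugins[name].enabled = enabled

    def scan_all(self, *args, **kwargs) -> Dict[str, Result]:
        # Only enabled plugins run
        return {n: p.scan(*args, **kwargs)
                for n, p in self._plugins.items() if p.enabled}

    @staticmethod
    def aggregate(results: Dict[str, Result]) -> dict:
        hits = [r for r in results.values() if r.detected]
        return {
            # Assumed rule: capped sum of detected scores
            "total_score": min(1.0, sum(r.risk_score for r in hits)),
            "detected_threats": [r.plugin_name for r in hits],
        }
```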

PluginLoader

Auto-discover and load plugins from:

  • Built-in directory
  • Custom directories
  • Module paths
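
Directory discovery of this kind is usually built on `importlib`. The following sketch shows the general idea under the convention this README describes (each plugin file exports a module-level `plugin` object); the function name `discover_plugins` and the rule of skipping underscore-prefixed files and TEMPLATE.py are assumptions, not the behavior of the real PluginLoader.

```python
import importlib.util
from pathlib import Path


def discover_plugins(directory: str) -> dict:
    """Import each .py file in `directory` and collect module-level `plugin` objects."""
    found = {}
    for path in sorted(Path(directory).glob("*.py")):
        # Assumed convention: skip private files and the template
        if path.name.startswith("_") or path.stem == "TEMPLATE":
            continue
        spec = importlib.util.spec_from_file_location(f"sketch_plugin_{path.stem}", path)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except Exception as exc:
            # A broken plugin file should not stop discovery of the others
            print(f"Skipping {path.name}: {exc}")
            continue
        plugin = getattr(module, "plugin", None)
        if plugin is not None:
            found[path.stem] = plugin
    return found
```

Note that importing a file executes it, so a plugin directory should only contain code you trust.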

Creating Custom Plugins

  1. Copy the template:

    cp plugins/builtin/TEMPLATE.py plugins/custom/my_detector.py
    
  2. Implement detection logic:

    def scan(self, user_id, server_key, tool, arguments, llm_context=None):
        detected = self._check_threat(arguments)
        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=detected,
            risk_score=0.5 if detected else 0.0,
            reasons=["Threat found"] if detected else []
        )
    
  3. Export the plugin:

    plugin = MyDetector()  # Auto-discovered!
    
  4. It's loaded automatically on next startup or via:

    loader.load_and_register_from_directory("./plugins/custom")
    

See PLUGIN_SYSTEM.md for detailed patterns and examples.

Integration with server.py

Two small changes are required: initialize the plugins at startup, and call compute_risk_hybrid where the risk is computed:

# At startup
async def _on_startup() -> None:
    await discover_downstream_tools()
    initialize_plugins()  # Add this line

# In secure_call function
from security_gateway.risk_model import compute_risk_hybrid

risk_assessment = compute_risk_hybrid(
    user_id=data.user_id,
    server_key=server_key,
    tool=tool_name,
    arguments=raw_args,
    llm_context=data.llm_context,
    use_plugins=True
)

# Use combined score
from security_gateway.risk_model import RiskResult
risk = RiskResult(
    score=risk_assessment['combined_score'],
    reasons=risk_assessment['combined_reasons'],
    flags=risk_assessment['combined_flags']
)

See INTEGRATION_GUIDE.md for complete implementation details.
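
How the hybrid call merges the two results is not spelled out in this README. One plausible scheme, shown purely as an illustration, is to take the higher score, concatenate the reasons without duplicates, and OR the flags. The function name `combine` and the `reasons` and `flags` keys on the plugin result are assumptions; only `combined_score`, `combined_reasons`, and `combined_flags` come from the snippet above.

```python
def combine(legacy: dict, plugin: dict) -> dict:
    """Merge a legacy result and a plugin result (assumed merge rules)."""
    # dict.fromkeys de-duplicates while keeping first-seen order
    reasons = list(dict.fromkeys(legacy["reasons"] + plugin["reasons"]))
    flags = dict(legacy["flags"])
    for key, value in plugin["flags"].items():
        flags[key] = flags.get(key, False) or value  # a flag set by either side stays set
    return {
        "combined_score": max(legacy["score"], plugin["total_score"]),
        "combined_reasons": reasons,
        "combined_flags": flags,
    }
```

Taking the maximum is the conservative choice: neither system can lower a risk the other one raised.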

Configuration

Enable/Disable Plugins

from security_gateway.plugins.base import get_registry

registry = get_registry()
registry.disable_plugin("PathTraversalDetector")
registry.enable_plugin("PathTraversalDetector")

Load Custom Plugins

from security_gateway.plugins.loader import PluginLoader

loader = PluginLoader()
loader.load_and_register_from_directory("./custom_plugins")

Environment Variables

# Disable specific plugins
DISABLED_PLUGINS=SQLInjectionDetector,JailbreakDetector

# Enable only these
ENABLED_PLUGINS=PathTraversalDetector,SSRFDetector
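
This README does not say how the two variables interact when both are set. The sketch below shows one reasonable interpretation: ENABLED_PLUGINS acts as an allow-list when present, and DISABLED_PLUGINS is applied afterwards. The helper name `select_plugins` and this precedence are assumptions.

```python
import os


def _names(var: str, env) -> set:
    """Parse a comma-separated variable into a set of plugin names."""
    return {n.strip() for n in env.get(var, "").split(",") if n.strip()}


def select_plugins(available, env=None):
    """Return the plugin names that should stay enabled, in their original order."""
    env = os.environ if env is None else env
    enabled = _names("ENABLED_PLUGINS", env)
    disabled = _names("DISABLED_PLUGINS", env)
    # An empty allow-list means "everything is allowed"
    chosen = [n for n in available if not enabled or n in enabled]
    # Assumed precedence: the deny-list wins over the allow-list
    return [n for n in chosen if n not in disabled]
```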

Performance

  • Execution: O(n) where n = number of enabled plugins
  • Memory: ~1-2MB per plugin instance
  • Latency: Typically <50ms for all plugins combined

Optimize by:

  • Disabling unused plugins
  • Implementing efficient detection logic
  • Caching results for repeated calls
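
For the caching suggestion, one simple approach is to memoize on a canonical JSON form of the arguments. This is a sketch, not part of the plugin system: `make_cached_scanner` is a hypothetical helper, and it deliberately leaves `user_id` and `llm_context` out of the cache key, so it only suits detectors whose verdict depends on the server, tool, and arguments alone.

```python
import json
from functools import lru_cache


def make_cached_scanner(scan_fn, maxsize: int = 1024):
    """Wrap scan_fn(server_key, tool, arguments) with an LRU cache."""

    @lru_cache(maxsize=maxsize)
    def _cached(server_key: str, tool: str, args_json: str):
        return scan_fn(server_key, tool, json.loads(args_json))

    def scan(server_key: str, tool: str, arguments: dict):
        # sort_keys makes equal dicts produce the same cache key
        return _cached(server_key, tool, json.dumps(arguments, sort_keys=True))

    scan.cache_info = _cached.cache_info  # expose hit/miss counters
    return scan
```

Cached results are shared between callers, so treat them as read-only. Arguments must also be JSON-serializable for this key scheme to work.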

Testing

Test Individual Plugin

from security_gateway.plugins.builtin.path_traversal import PathTraversalDetector

plugin = PathTraversalDetector()
result = plugin.scan(
    "user1", "filesystem", "read",
    {"path": "../../etc/passwd"}
)

assert result.detected
assert result.risk_score > 0.3

Test Plugin Registry

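from security_gateway.plugins.base import PluginRegistry
from security_gateway.plugins.builtin.path_traversal import PathTraversalDetector

# Use a fresh registry so the test does not touch the global one
registry = PluginRegistry()
registry.register(PathTraversalDetector())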

results = registry.scan_all(
    "user1", "filesystem", "read",
    {"path": "../../etc/passwd"}
)

assert len(results) == 1
assert list(results.values())[0].detected

See PLUGIN_SYSTEM.md for more examples.

Advanced Features

Dynamic Plugin Management

Monitor directory for new plugins and auto-load:

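import asyncio

async def watch_plugins(directory, interval=60):
    loader = PluginLoader(get_registry())
    while True:
        loader.load_and_register_from_directory(directory)  # Re-scan to pick up new plugin files
        await asyncio.sleep(interval)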

Plugin Metrics

Track detection rates and performance:

metrics = {
    "PathTraversalDetector": {
        "execution_count": 1024,
        "detection_count": 42,
        "avg_execution_ms": 2.3
    }
}
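
A dictionary of this shape could be produced by a small collector wrapped around each scan. The class below is a hypothetical sketch (the name `PluginMetrics` and its methods are not part of the plugin system); it assumes the scan result has a `detected` attribute, as ScanResult does.

```python
import time
from collections import defaultdict


class PluginMetrics:
    """Accumulate per-plugin execution and detection counts."""

    def __init__(self) -> None:
        self._stats = defaultdict(
            lambda: {"execution_count": 0, "detection_count": 0, "total_ms": 0.0}
        )

    def record(self, name: str, detected: bool, elapsed_ms: float) -> None:
        stats = self._stats[name]
        stats["execution_count"] += 1
        stats["detection_count"] += int(detected)
        stats["total_ms"] += elapsed_ms

    def timed_scan(self, name: str, scan_fn, *args, **kwargs):
        """Run scan_fn, time it, and record the outcome."""
        start = time.perf_counter()
        result = scan_fn(*args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        self.record(name, result.detected, elapsed_ms)
        return result

    def snapshot(self) -> dict:
        """Return metrics in the shape shown above."""
        return {
            name: {
                "execution_count": s["execution_count"],
                "detection_count": s["detection_count"],
                "avg_execution_ms": s["total_ms"] / s["execution_count"],
            }
            for name, s in self._stats.items()
        }
```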

Conditional Plugins

Plugins that only run for certain contexts:

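class FilesystemOnlyPlugin(ScannerPlugin):
    def scan(self, user_id, server_key, tool, arguments, llm_context=None):
        if server_key != "filesystem":
            # Skip contexts this plugin does not cover
            return ScanResult(
                plugin_name=self.get_metadata().name,
                detected=False,
                risk_score=0.0,
                reasons=["Not applicable"]
            )
        # Your detection logic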

Comparison with Legacy System

| Aspect | Legacy | Plugin System |
|---|---|---|
| Modularity | Monolithic | Modular |
| Extensibility | Modify core | Add plugins |
| Runtime Control | Recompile | Enable/disable |
| Testability | Coupled | Independent |
| Framework Lock-in | MCP only | Any framework |
| Learning Curve | N/A | Low (base class) |

Migration Path

Legacy System Only
       ↓
Legacy + Plugins (Parallel) ← You are here
       ↓
Plugins Only (Recommended)
       ↓
Plugins + Custom Detectors (Advanced)

Troubleshooting

Plugins not loading?

from security_gateway.plugins.base import get_registry
registry = get_registry()
print(f"Plugins: {list(registry.get_all_plugins().keys())}")

Wrong risk scores?

result = compute_risk_hybrid(..., use_plugins=True)  # pass the same arguments as the failing call
print(f"Legacy: {result['legacy_result']['score']}")
print(f"Plugins: {result['plugin_result']['total_score']}")

Plugin execution error?

try:
    result = compute_risk_with_plugins(...)
except Exception as e:
    print(f"Error: {e}")
    # Fall back to legacy system
    result = compute_risk(...)

Documentation

  • PLUGIN_SYSTEM.md - Complete API reference
  • INTEGRATION_GUIDE.md - Step-by-step integration
  • builtin/TEMPLATE.py - Custom plugin template with examples
  • base.py - Inline documentation of core classes

Contributing

  1. Create plugin in builtin/ or custom/
  2. Extend ScannerPlugin
  3. Export as plugin = YourDetector()
  4. Add unit tests
  5. Document in PLUGIN_SYSTEM.md

Real-World Usage

E-commerce Platform

# Add custom plugin for payment info detection
class PaymentInfoDetector(ScannerPlugin):
    """Blocks attempts to log payment information."""
    # Detects credit cards, billing addresses, etc.

registry.register(PaymentInfoDetector())
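
The detection logic for such a plugin might look like the sketch below, which searches argument values for digit runs that pass the Luhn checksum. This is an illustrative helper, not part of the plugin system: the names `luhn_valid` and `contains_card_number` and the regular expression are assumptions, and it covers card numbers only, not billing addresses.

```python
import re

# 13-19 digits, optionally separated by single spaces or hyphens
CARD_RE = re.compile(r"\b(?:\d[ -]?){13,19}\b")


def luhn_valid(digits: str) -> bool:
    """Return True if the digit string passes the Luhn checksum."""
    total = 0
    for i, ch in enumerate(reversed(digits)):
        d = int(ch)
        if i % 2 == 1:  # double every second digit from the right
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def contains_card_number(arguments: dict) -> bool:
    """Return True if any argument value contains a Luhn-valid card-like number."""
    for value in arguments.values():
        for match in CARD_RE.finditer(str(value)):
            digits = re.sub(r"\D", "", match.group())
            if 13 <= len(digits) <= 19 and luhn_valid(digits):
                return True
    return False
```

A `_check_threat` method could delegate to `contains_card_number(arguments)`. The Luhn check reduces false positives from order numbers and timestamps but does not eliminate them.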

Enterprise Security

# Fine-tune for corporate policies
registry.disable_plugin("SSRFDetector")  # Trust network
registry.register(IPBlocklistPlugin())    # Add custom blocklist

Multi-tenant SaaS

# Different plugins per tenant
if tenant.security_level == "high":
    registry.enable_plugin("JailbreakDetector")
    registry.register(TenantCustomDetector())

Support & Issues

For issues, bugs, or feature requests:

  1. Check PLUGIN_SYSTEM.md for API details
  2. Review examples in builtin/TEMPLATE.py
  3. Test with compute_risk_hybrid(use_plugins=True)
  4. Check console output from initialize_plugins()

License

Same as Security MCP project.


Ready to get started?

  1. Start with INTEGRATION_GUIDE.md for setup
  2. Reference PLUGIN_SYSTEM.md for API details
  3. Copy builtin/TEMPLATE.py for custom plugins
  4. Enable hybrid mode: compute_risk_hybrid(use_plugins=True)