Security Gateway - Comprehensive Documentation
A production-grade MCP security layer providing intelligent threat detection, policy enforcement, input/output sanitization, rate limiting, and comprehensive audit logging for all access to downstream MCP servers.
Table of Contents
- System Overview
- Architecture
- Core Components
- Security Features
- Threat Detection Plugins
- Policy Enforcement
- Input/Output Sanitization
- Rate Limiting
- Audit Logging
- Configuration
- API Reference
- Integration Guide
- Deployment
- Performance & Optimization
- Troubleshooting
👥 Team
Team Name: MemKrew
Team Members:
- Yuki Sui - @yukisui22 - Lead Developer & AI Architect
- Charles Plowman - @Plowmann - Deployment Engineer
- Long Truong - @LongTTruong - Developer
- Albert Nguyen - @TigerInATux - Developer
System Overview
Purpose
The Security Gateway acts as a security checkpoint between LLM clients (Claude, OpenAI, etc.) and downstream MCP servers. It:
- Analyzes every request for threat patterns (10 specialized detectors)
- Makes decisions using multi-threshold policy (block, redact, or allow)
- Sanitizes inputs (removes PII, detects path traversal) and outputs
- Limits rate of requests per user (sliding window algorithm)
- Logs complete audit trail in JSONL format for forensics
Key Innovation
Instead of simple allow/deny rules, the gateway uses a risk scoring engine with 10 specialized plugins that detect:
- Sophisticated jailbreak attempts
- SSRF attacks targeting cloud metadata
- Data theft disguised as legitimate operations
- Competitive intelligence harvesting
- Credential exfiltration with operational cover stories
- Broad-scope data enumeration
Threats Protected Against
| Threat Category | Detection Mechanism | Risk If Found | Action |
|---|---|---|---|
| Jailbreak/Prompt Injection | 30+ regex patterns | HIGH (0.5-0.8) | BLOCK |
| SSRF Attacks | IP range + metadata endpoint blocking | CRITICAL (0.7) | BLOCK |
| SQL Injection | SQL keyword detection | MEDIUM (0.35) | Aggregate score |
| Path Traversal | ../ detection + sensitive path list | MEDIUM-HIGH (0.35-0.55) | BLOCK at gateway |
| Data Exfiltration | Intent patterns + large payloads | MEDIUM-HIGH (0.3-0.7) | Flag & redact |
| Competitive Intelligence | Competitor + harvesting pattern detection | HIGH (0.5+) | BLOCK |
| Code Extraction | Source code harvesting patterns | HIGH (0.45-0.55) | Block/Flag |
| Broad Enumeration | "all X across all Y" patterns | MEDIUM (0.15-0.35) | Aggregate |
| Credential Theft | Operational disguise patterns | CRITICAL (0.35-0.65) | BLOCK if fetch |
| DoS/Rate Abuse | Sliding window per-user limits | HIGH | THROTTLE |
Architecture
High-Level Flow
┌──────────────────────────────────────────────────────────┐
│ LLM CLIENT │
│ (Claude, OpenAI, Gemini, etc.) │
└────────────────────┬─────────────────────────────────────┘
│
│ HTTP or Stdio Transport
│
┌────────────────▼─────────────────────┐
│ SECURITY GATEWAY (Main Endpoint) │
│ POST /tools/secure_call │
└────────────────┬─────────────────────┘
│
┌────────────────▼─────────────────────────────────────┐
│ 1. RATE LIMIT CHECK │
│ • Sliding window (default: 60 calls/60s) │
│ • Per-user tracking │
│ • Returns 429 if exceeded │
└────────────────┬─────────────────────────────────────┘
│
┌────────────────▼─────────────────────────────────────┐
│ 2. SERVER/TOOL VALIDATION │
│ • Check DISCOVERED_TOOLS registry │
│ • Fallback to configured servers │
└────────────────┬─────────────────────────────────────┘
│
┌────────────────▼─────────────────────────────────────┐
│ 3. RISK SCORING (Plugin System) │
│ • 10 specialized threat detectors │
│ • Plugin-based architecture │
│ • Produces risk_score (0.0-1.0) │
│ • Lists all detected threats │
└────────────────┬─────────────────────────────────────┘
│
┌────────────────▼─────────────────────────────────────┐
│ 4. POLICY DECISION │
│ • score >= 0.75: BLOCK │
│ • score 0.40-0.74: ALLOW + REDACT OUTPUT │
│ • score < 0.40: ALLOW │
│ • Hard-blocks: SSRF, jailbreak, data theft │
└────────────────┬─────────────────────────────────────┘
│
┌────────────────▼─────────────────────────────────────┐
│ 5. INPUT SANITIZATION │
│ • Detect + redact PII (emails, phones, keys) │
│ • Detect path traversal (../) │
│ • Clamp fetch max_length to prevent exfil │
│ • Block if critical path args modified │
└────────────────┬─────────────────────────────────────┘
│
(If ALLOWED)│
│
┌────────────────▼──────────────────────────────────┐
│ 6. DOWNSTREAM EXECUTION │
│ • Call MCP server with sanitized args │
│ • 60 second timeout per call │
│ • Automatic auth from servers.yaml │
└────────────────┬──────────────────────────────────┘
│
┌────────────────▼──────────────────────────────────┐
│ 7. OUTPUT SANITIZATION │
│ • Optionally redact output based on policy │
│ • Apply same PII patterns │
└────────────────┬──────────────────────────────────┘
│
┌────────────────▼──────────────────────────────────┐
│ 8. COMPREHENSIVE AUDITING │
│ • Log to JSONL audit file │
│ • Capture raw args, sanitized args │
│ • Store risk assessment, policy decision │
│ • Include timing and outcome │
└────────────────┬──────────────────────────────────┘
│
│ Response with decision metadata
│
┌────────────────▼──────────────────────────────────┐
│ Return to LLM Client │
│ (allowed/redacted/blocked, metadata, result) │
└───────────────────────────────────────────────────┘
System Layers
┌─────────────────────────────────────────────────────────┐
│ Policy Layer │
│ (Decide: block, redact, allow based on risk score) │
└─────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────┐
│ Risk Scoring Layer │
│ (10 plugins analyze for threat patterns) │
│ - JailbreakDetector │
│ - SSRFDetector │
│ - SQLInjectionDetector │
│ - PathTraversalDetector │
│ - ExfiltrationDetector │
│ - DataTheftDetector │
│ - CodeExtractionDetector │
│ - EnumerationDetector │
│ - OperationalDisguiseDetector │
│ - PayloadSizeDetector │
└─────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────┐
│ Input/Output Sanitization Layer │
│ (Redact PII, detect traversal, clamp sizes) │
└─────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────┐
│ Rate Limiting Layer │
│ (Sliding window per user, 60 calls/60s default) │
└─────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────┐
│ Audit & Logging Layer │
│ (JSONL format, complete request/response trace) │
└─────────────────────────────────────────────────────────┘
Core Components
1. Main Gateway Server (server.py)
Lines: ~600 | Purpose: HTTP MCP server with main secure_call tool
Key Method:
```python
@mcp.tool()
async def secure_call(
    user_id: str,                       # Logical user (e.g., "admin", "judge-1")
    server: str,                        # Downstream server key (e.g., "ultimate_scraper")
    tool: str,                          # Tool name (e.g., "searchEventListings")
    arguments: dict,                    # Tool arguments
    llm_context: Optional[str] = None,  # Optional prompt for risk analysis
) -> SecureCallOutput:
    ...
```
Request Processing:
- Rate limit check (per-user)
- Server/tool validation
- Risk scoring (all 10 plugins)
- Policy decision (block, redact, allow)
- Input sanitization
- Downstream execution
- Output sanitization
- Audit logging
Response Fields:
- allowed: security decision (boolean)
- redacted: output was sanitized (boolean)
- reason: decision explanation
- risk_score: computed risk (0.0-1.0)
- risk_factors: list of detected threats
- policy_decision: label, one of "allow", "redacted", "blocked", "error", "timeout"
- execution_time_ms: performance metric
- downstream_result: actual tool output
2. Risk Scoring Engine (risk_model.py)
Lines: ~150 | Purpose: Plugin-based threat detection and risk aggregation
Architecture:
```python
class PluginRegistry:
    def scan_all(self, user_id, server, tool, args, llm_context) -> Dict:
        # Run all enabled plugins in sequence
        # Aggregate results into composite risk score
        # Combine threat reasons and flags
        ...
```
Plugin Execution:
- Each plugin runs independently
- Produces individual risk_score (0.0-1.0)
- Returns reasons and flags
- Results aggregated:
  - Sum all scores (capped at 1.0)
  - Merge all reasons into a single list
  - Combine all flags into a single dict
Special Handling:
- native/code_interpreter → Base risk 0.8 (inherently dangerous)
- web-search → Risk capped at 0.35 (read-only access)
- ultimate_scraper + competitor language → Flagged as data theft
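The aggregation and special-handling rules above can be sketched in a few lines of Python. This is a minimal illustration, not the code in risk_model.py: the ScanResult fields follow the plugin example in the Contributing section, while `aggregate`, `BASE_RISK`, and `SERVER_RISK_CAP` are hypothetical names. Reading "native/code_interpreter" as server `native` plus tool `code_interpreter`, and adding the base risk before the plugin scores, are assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ScanResult:
    # Field names follow the ScanResult example in the Contributing section.
    plugin_name: str
    detected: bool
    risk_score: float
    reasons: List[str] = field(default_factory=list)
    flags: Dict[str, bool] = field(default_factory=dict)
    metadata: Dict = field(default_factory=dict)


# Hypothetical tables mirroring the "Special Handling" rules.
BASE_RISK = {("native", "code_interpreter"): 0.8}
SERVER_RISK_CAP = {"web-search": 0.35}


def aggregate(server: str, tool: str, results: List[ScanResult]) -> Dict:
    """Combine per-plugin results into one composite assessment."""
    score = BASE_RISK.get((server, tool), 0.0)
    reasons: List[str] = []
    flags: Dict[str, bool] = {}
    for r in results:
        score += r.risk_score
        reasons.extend(r.reasons)
        for name, value in r.flags.items():
            # A flag stays True if any plugin raised it.
            flags[name] = flags.get(name, False) or value
    score = min(score, 1.0)  # the sum is capped at 1.0
    if server in SERVER_RISK_CAP:
        score = min(score, SERVER_RISK_CAP[server])  # read-only servers get a lower cap
    return {"score": round(score, 2), "reasons": reasons, "flags": flags}
```

Summing rather than taking the maximum means several weak signals can add up to a block, which is the behavior the "Aggregate score" entries in the threat table rely on.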
3. Threat Detection Plugins (plugins/builtin/)
See Threat Detection Plugins section below.
4. Policy Enforcement (policy.py)
Lines: ~200 | Purpose: Multi-threshold decision making
Decision Logic:
```
# HARD BLOCKS (checked first, irrespective of score)
if flags["ssrf_attempt"]: BLOCK
if flags["malicious_url"]: BLOCK
if flags["jailbreak_like"]: BLOCK
if flags["data_theft_like"]: BLOCK

if risk_score >= 0.75:
    # Score-based block
    BLOCK with reason (e.g., "credential theft detected")
elif risk_score >= 0.40:
    # Medium risk
    ALLOW but REDACT OUTPUT
else:  # risk_score < 0.40
    # Low risk
    ALLOW
```
Configuration:
HIGH_RISK_BLOCK_THRESHOLD = 0.75 # Environment: HIGH_RISK_BLOCK_THRESHOLD
MEDIUM_RISK_REDACT_THRESHOLD = 0.40 # Environment: MEDIUM_RISK_REDACT_THRESHOLD
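A runnable sketch of this decision logic, assuming hard-block flags are evaluated before any threshold, as in the Decision Making Flow under Policy Enforcement. `decide` and `HARD_BLOCK_FLAGS` are hypothetical names; `PolicyDecision` mirrors the fields listed in that flow. The tuple holds the four hard blocks listed here; the later flow also lists `exfiltration_like`, which could be appended if the deployed policy treats it as a hard block.

```python
import os
from dataclasses import dataclass, field
from typing import Dict

# Thresholds come from the environment, defaulting to the documented values.
HIGH_RISK_BLOCK_THRESHOLD = float(os.environ.get("HIGH_RISK_BLOCK_THRESHOLD", "0.75"))
MEDIUM_RISK_REDACT_THRESHOLD = float(os.environ.get("MEDIUM_RISK_REDACT_THRESHOLD", "0.40"))

# Flags that block regardless of the numeric score.
HARD_BLOCK_FLAGS = ("ssrf_attempt", "malicious_url", "jailbreak_like", "data_theft_like")


@dataclass
class PolicyDecision:
    allow: bool
    redact_output: bool
    reason: str
    risk_score: float
    flags: Dict[str, bool] = field(default_factory=dict)


def decide(risk_score: float, flags: Dict[str, bool]) -> PolicyDecision:
    # Step 1: hard blocks win even when the score is low.
    for flag in HARD_BLOCK_FLAGS:
        if flags.get(flag):
            return PolicyDecision(False, False, f"Blocked: {flag} detected", risk_score, flags)
    # Step 2: score-based decision.
    if risk_score >= HIGH_RISK_BLOCK_THRESHOLD:
        return PolicyDecision(False, False, f"Blocked: high risk ({risk_score:.2f})", risk_score, flags)
    if risk_score >= MEDIUM_RISK_REDACT_THRESHOLD:
        return PolicyDecision(True, True, f"Allowed with redaction: medium risk ({risk_score:.2f})", risk_score, flags)
    return PolicyDecision(True, False, f"Allowed: low risk ({risk_score:.2f})", risk_score, flags)
```

For example, `decide(0.45, {"enumeration_pattern": True})` yields the "Allowed with redaction: medium risk (0.45)" decision shown in the audit log example.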
5. Input/Output Sanitization (sanitizer.py)
Lines: ~300 | Purpose: PII redaction and path traversal detection
PII Patterns Redacted:
- Email addresses: `[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+`
- Phone numbers: `\+?\d[\d\s\-]{7,}\d`
- Credit card numbers: 13-16 digit sequences
- API keys: `sk-[A-Za-z0-9]{20,}` (OpenAI format)
- Generic long tokens: 32+ character alphanumeric
Path Traversal Detection:
- Pattern: `(\.\.[\\/])+` (catches `../`, `..\`, etc.)
- Redacted as: [REDACTED_PATH_TRAVERSAL]
Sensitive Path Blocking:
Unix/Linux Paths:
- `/etc` (system config)
- `/var/log` (logs)
- `/root`, `/home` (user homes)
- `/sys`, `/proc` (kernel interfaces)

Windows Paths:
- `C:\Windows` (system directory)
- `C:\ProgramData` (application data)
- `C:\Users\Administrator` (admin home)
- `C:\config` (configuration)
Size Clamping:
- fetch `max_length` clamped to 999,999 characters
- Prevents exfiltration via massive downloads
Blocking Conditions: If critical path arguments (path, filepath, target) are modified by sanitization, block request with:
"Blocked: sanitized critical argument (path traversal or sensitive path detected)."
6. Rate Limiting (rate_limiter.py)
Lines: ~100 | Purpose: Sliding window rate limiting per user
Algorithm: Timestamp-based sliding window
Configuration:
max_calls: int = 60 # Environment: RATE_LIMIT_MAX_CALLS
window_seconds: int = 60 # Environment: RATE_LIMIT_WINDOW_SECONDS
Implementation:
```python
class RateLimiter:
    def __init__(self):
        self.user_requests = {}  # Dict[user_id] → deque of timestamps
        self.lock = threading.Lock()

    def check_and_increment(self, user_id: str) -> Tuple[bool, Dict]:
        # Remove timestamps outside window
        # Count remaining in window
        # Return (allowed, info)
        ...
```
Response Info:
{
"limit": 60,
"remaining": 42,
"reset_in_seconds": 45.23,
"window_seconds": 60,
"current_count": 18
}
HTTP Headers (on 429):
X-RateLimit-Limit: 60
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1705316445
Retry-After: 45
7. Audit Logging (audit.py)
Lines: ~200 | Purpose: JSONL audit trail for forensics
Log Format:
{
"timestamp": "2025-01-15T10:30:45.123456+00:00",
"user_id": "admin",
"server": "ultimate_scraper",
"tool": "searchEventListings",
"raw_arguments": {...},
"sanitized_arguments": {...},
"policy": {
"allow": true,
"redact_output": true,
"reason": "Allowed with redaction: medium risk (0.45)",
"risk_score": 0.45,
"flags": {"enumeration_pattern": true}
},
"risk": {
"score": 0.45,
"reasons": [
"Broad-scope enumeration pattern detected: 'all cities' + 'eventbrite'"
],
"flags": {"enumeration_pattern": true}
},
"outcome": {
"success": true,
"redacted": true
},
"execution_time_ms": 234.56
}
Append Locations:
- Modal environment: `/mnt/audit/audit.log.jsonl` (volume-mounted)
- Local development: `./audit.log.jsonl`, with fallback to the parent directory
API Endpoint:
GET /audit/latest → Returns last 100 entries (reverse chronological)
Security Features
Feature 1: Plugin-Based Risk Scoring
Advantage: Extensible, testable, disableable per-plugin
Architecture:
# Each plugin independently detects a threat category
# All plugins run, results aggregated
# Individual plugins can be enabled/disabled at runtime
class JailbreakDetector(ScannerPlugin):
def scan(...) -> ScanResult:
# Detects prompt injection, instruction override, etc.
return ScanResult(
detected=True,
risk_score=0.5,
reasons=["Instruction override pattern: 'ignore all previous'"],
flags={"jailbreak_like": True}
)
Feature 2: Multi-Threshold Policy
Advantage: Nuanced decisions beyond simple allow/deny
Thresholds:
- HIGH (≥0.75): BLOCK - Critical threats detected
- MEDIUM (0.40-0.74): ALLOW but REDACT OUTPUT - Concerning but not blocking
- LOW (<0.40): ALLOW - Low risk, no redaction
Hard Blocks (Irrespective of Score):
- SSRF attempts (internal IPs, cloud metadata)
- Malicious URLs detected
- Jailbreak patterns
- Explicit data theft flags
Feature 3: Input Sanitization
Advantage: Prevents attacks even if they slip past detection
Operations:
- Redact PII (emails, phones, API keys)
- Detect path traversal
- Clamp large fetch requests
- Block if critical args modified
Feature 4: Output Sanitization
Advantage: Prevents accidental data leakage in responses
Operations:
- Apply same PII redaction patterns to response
- Conditional redaction based on policy decision
- Logs whether output was redacted
Feature 5: Comprehensive Audit Trail
Advantage: Complete forensics capability
Logged Information:
- Raw and sanitized arguments
- Risk score and threat reasons
- Policy decision and threshold
- Actual tool output (or error)
- Execution timing
- Outcome (success, error, timeout, blocked)
Feature 6: Rate Limiting
Advantage: Prevents abuse and DoS attacks
Mechanism: Per-user sliding window
- Track request timestamps per user
- Maintain deque of timestamps within window
- Remove old timestamps on each check
- Count remaining requests
Configuration: 60 calls/60 seconds (configurable)
Threat Detection Plugins
Plugin 1: JailbreakDetector (plugins/builtin/jailbreak.py)
Detects: Prompt injection, instruction override, role-play attacks
Patterns:
- Override/ignore directives: `ignore (all|previous|your) (instructions|rules)`
- Bypass attempts: `bypass (safety|security|restrictions)`
- Role-play attacks: `act as (malware|hacker|attacker|developer)`
- Secret disclosure: `reveal.*secret`, `tell me.*api key`
- Instruction replacement: `forget.*previous instructions`
Token Detection:
- OpenAI tokens: `sk-[A-Za-z0-9]{20,}`
- Generic long tokens: 32+ character sequences
Risk Score:
- Jailbreak pattern detected: +0.5
- Secrets found: +0.3
- Combined max: 0.8
Response Example:
ScanResult(
detected=True,
risk_score=0.6,
reasons=["Instruction override pattern detected: 'ignore all previous instructions'",
"Potential secret token found in arguments"],
flags={"jailbreak_like": True, "potential_secrets": True}
)
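A minimal sketch of how this detector's scoring could work, using a subset of the patterns above. `scan_jailbreak` is a hypothetical name, and the override pattern is loosened (an assumption) so that chained qualifiers like "ignore all previous instructions" match; the literal pattern `ignore (all|previous|your) (instructions|rules)` would not match that phrase.

```python
import re
from typing import Dict

# Illustrative subset of the patterns listed above, not the plugin's full list.
JAILBREAK_PATTERNS = [
    r"ignore (?:(?:all|previous|your) )+(?:instructions|rules)",
    r"bypass (?:safety|security|restrictions)",
    r"act as (?:malware|hacker|attacker|developer)",
    r"reveal.*secret",
    r"tell me.*api key",
    r"forget.*previous instructions",
]
TOKEN_PATTERNS = [r"sk-[A-Za-z0-9]{20,}", r"\b[A-Za-z0-9]{32,}\b"]


def scan_jailbreak(arguments: Dict, llm_context: str = "") -> Dict:
    # Scan the stringified arguments together with the optional LLM context.
    text = f"{arguments} {llm_context}"
    score, reasons, flags = 0.0, [], {}
    if any(re.search(p, text, re.IGNORECASE) for p in JAILBREAK_PATTERNS):
        score += 0.5
        reasons.append("Instruction override pattern detected")
        flags["jailbreak_like"] = True
    if any(re.search(p, text) for p in TOKEN_PATTERNS):
        score += 0.3
        reasons.append("Potential secret token found in arguments")
        flags["potential_secrets"] = True
    return {"risk_score": round(min(score, 0.8), 2), "reasons": reasons, "flags": flags}
```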
Plugin 2: SSRFDetector (plugins/builtin/ssrf.py)
Detects: Server-Side Request Forgery targeting internal/cloud infrastructure
Applicable Servers: fetch, scraper, ultimate_scraper, web-search, jina-ai
Protected IP Ranges:
- Loopback: 127.0.0.1, ::1
- Private: 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12
- Link-local: 169.254.0.0/16
Cloud Metadata Endpoints:
- AWS: `169.254.169.254/latest/meta-data`
- GCP: `metadata.google.com`, `169.254.169.254`
- Azure: `metadata.internal`
- Alibaba: `imds.aliyuncs.com`
Malicious URL Patterns:
- Credential injection: `http://attacker.com@example.com` (@ tricks)
- Known malicious domains: `evil.com`, `attacker.com`
- Executable downloads: URLs ending in `.exe`
- Phishing domains: `phishing.*`
Risk Score:
- SSRF to internal IP: +0.70 (flag: `ssrf_attempt`)
- SSRF to metadata: +0.70
- Malicious URL: +0.50 (flag: `malicious_url`)
- Combined max: 1.0
Response Example:
ScanResult(
detected=True,
risk_score=0.85,
reasons=["SSRF attempt to AWS metadata endpoint detected",
"URL pattern matches known malicious domain"],
flags={"ssrf_attempt": True, "malicious_url": True}
)
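One way to implement the IP-range and metadata checks is with Python's standard `ipaddress` and `urllib.parse` modules. This sketch is an assumption, not the plugin's code: `check_url` is hypothetical, it widens the loopback check to all of 127.0.0.0/8, and of the malicious-URL heuristics it implements only the `user@host` trick and `.exe` downloads. It does not resolve DNS, so a hostname that resolves to a private address would pass this check.

```python
import ipaddress
from urllib.parse import urlsplit

# Hosts and ranges from the lists above (loopback widened to 127.0.0.0/8).
METADATA_HOSTS = {"169.254.169.254", "metadata.google.com", "metadata.internal", "imds.aliyuncs.com"}
BLOCKED_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in ("127.0.0.0/8", "::1/128", "10.0.0.0/8", "172.16.0.0/12",
                 "192.168.0.0/16", "169.254.0.0/16")
]


def check_url(url: str) -> dict:
    parts = urlsplit(url)
    host = (parts.hostname or "").lower()  # .hostname drops userinfo, port, and IPv6 brackets
    score, reasons, flags = 0.0, [], {}

    if host in METADATA_HOSTS:
        score += 0.70
        reasons.append(f"SSRF attempt to cloud metadata endpoint: {host}")
        flags["ssrf_attempt"] = True
    else:
        try:
            ip = ipaddress.ip_address(host)
        except ValueError:
            ip = None  # a DNS name rather than an IP literal
        # IPv4 vs IPv6 membership tests simply return False, so mixing is safe.
        if ip is not None and any(ip in net for net in BLOCKED_NETWORKS):
            score += 0.70
            reasons.append(f"SSRF attempt to internal address: {host}")
            flags["ssrf_attempt"] = True

    # "@" in the authority means everything before it is userinfo, not the host.
    if "@" in parts.netloc or parts.path.lower().endswith(".exe"):
        score += 0.50
        reasons.append("URL pattern matches malicious URL heuristics")
        flags["malicious_url"] = True

    return {"risk_score": round(min(score, 1.0), 2), "reasons": reasons, "flags": flags}
```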
Plugin 3: SQLInjectionDetector (plugins/builtin/sql_injection.py)
Detects: SQL injection attacks
Destructive Patterns:
- DROP TABLE, DROP DATABASE
- DELETE FROM, TRUNCATE
- INSERT INTO, UPDATE (modify data)
- UNION SELECT, UNION ALL
Authentication Bypass:
- `admin'--`, `admin' OR 1=1`
- Comment syntax: `--`, `/**/`
Risk Score:
- SQL injection pattern: 0.35 (no hard block, aggregates with other threats)
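A minimal regex sketch of this detector. The patterns are narrowed versions of the keywords listed (UPDATE only as `UPDATE <table> SET`, TRUNCATE only as `TRUNCATE TABLE`, and `--` only after a quote) to limit false positives on ordinary prose; those narrowings, `scan_sql`, and the `sql_injection` flag name are assumptions.

```python
import re

SQLI_PATTERNS = [
    r"\bdrop\s+(?:table|database)\b",
    r"\bdelete\s+from\b",
    r"\btruncate\s+table\b",
    r"\binsert\s+into\b",
    r"\bupdate\s+\w+\s+set\b",
    r"\bunion\s+(?:all\s+)?select\b",
    r"'\s*or\s+1\s*=\s*1",  # authentication bypass
    r"'\s*--",              # quote followed by a comment
    r"/\*.*?\*/",           # inline comment
]
SQLI_RE = re.compile("|".join(SQLI_PATTERNS), re.IGNORECASE)


def scan_sql(text: str) -> dict:
    hit = SQLI_RE.search(text)
    return {
        # Fixed 0.35 with no hard block; it only adds to the aggregate score.
        "risk_score": 0.35 if hit else 0.0,
        "reasons": [f"SQL injection pattern: {hit.group(0)!r}"] if hit else [],
        "flags": {"sql_injection": bool(hit)},
    }
```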
Plugin 4: PathTraversalDetector (plugins/builtin/path_traversal.py)
Detects: Directory traversal attacks
Traversal Patterns:
- Unix: `../`, `../../`, etc.
- Windows: `..\`, `..\\`, etc.
- Encoded: `%2e%2e`, `..%5c`
Sensitive Paths Blocked:
Unix/Linux:
- `/etc` (system config)
- `/var/log` (logs)
- `/root`, `/home` (user homes)
- `/sys`, `/proc` (kernel)
- `/dev` (devices)

Windows:
- `C:\Windows` (system)
- `C:\ProgramData`
- `C:\Users\Administrator`
- `C:\config`
- `C:\Program Files`
Risk Score:
- Traversal detected: +0.35
- Sensitive path access: +0.20
- Combined max: 0.55
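A sketch of the scoring rule, assuming values are URL-decoded before matching (so `%2e%2e` and `%5c` are caught) and that the sensitive-path check is a prefix match on absolute paths. `scan_path` and the flag names are hypothetical. Under this simple prefix rule, a relative path such as `../../etc/passwd` scores only for the traversal, because it does not start with a sensitive root.

```python
import re
from urllib.parse import unquote

TRAVERSAL = re.compile(r"(\.\.[\\/])+")
UNIX_ROOTS = ("/etc", "/var/log", "/root", "/home", "/sys", "/proc", "/dev")
WINDOWS_ROOTS = ("c:\\windows", "c:\\programdata", "c:\\users\\administrator",
                 "c:\\config", "c:\\program files")


def _under(path: str, roots: tuple, sep: str) -> bool:
    # Match the root itself or anything below it, but not e.g. "/homework".
    return any(path == r or path.startswith(r + sep) for r in roots)


def scan_path(value: str) -> dict:
    decoded = unquote(value)  # "%2e%2e" becomes "..", "%5c" becomes a backslash
    lowered = decoded.lower()
    score, flags = 0.0, {}
    if TRAVERSAL.search(decoded):
        score += 0.35
        flags["path_traversal"] = True
    if _under(lowered, UNIX_ROOTS, "/") or _under(lowered, WINDOWS_ROOTS, "\\"):
        score += 0.20
        flags["sensitive_path"] = True
    return {"risk_score": round(score, 2), "flags": flags}  # at most 0.55
```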
Plugin 5: ExfiltrationDetector (plugins/builtin/exfiltration.py)
Detects: Data exfiltration attacks
Exfiltration Intent:
- exfiltrate, send secrets, upload to webhook
- copy to external, export sensitive
- email me the results
Sensitive Servers:
- filesystem, database, api, auth_service, credentials
- Any server + exfiltration intent = flag
Dangerous Tools:
- delete, drop, truncate, chmod, chown
- delete + exfiltration intent = flag
Network-Based Exfiltration:
- fetch + exfiltration intent = +0.40 (using fetch to exfiltrate)
Risk Score:
- Exfiltration intent: +0.30
- Sensitive server + intent: +0.15
- Dangerous tool + intent: +0.15
- Network exfiltration: +0.40
- Combined max: 1.0
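A sketch of this scoring, assuming each bonus stacks on the +0.30 intent score. Under that assumption, fetch plus exfiltration intent totals 0.70, which matches the upper bound the threat table gives for this category. `scan_exfiltration` is hypothetical; the `exfiltration_like` flag name comes from the Decision Making Flow under Policy Enforcement, and substring matching on tool names is an assumption.

```python
import re

EXFIL_INTENT = re.compile(
    r"exfiltrate|send secrets|upload to webhook|copy to external|"
    r"export sensitive|email me the results",
    re.IGNORECASE,
)
SENSITIVE_SERVERS = {"filesystem", "database", "api", "auth_service", "credentials"}
DANGEROUS_TOOLS = ("delete", "drop", "truncate", "chmod", "chown")


def scan_exfiltration(server: str, tool: str, text: str) -> dict:
    if not EXFIL_INTENT.search(text):
        return {"risk_score": 0.0, "flags": {}}  # every bonus requires intent
    score = 0.30  # exfiltration intent
    if server in SENSITIVE_SERVERS:
        score += 0.15
    if any(t in tool.lower() for t in DANGEROUS_TOOLS):
        score += 0.15
    if server == "fetch":
        score += 0.40  # fetch used as the outbound channel
    return {"risk_score": round(min(score, 1.0), 2), "flags": {"exfiltration_like": True}}
```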
Plugin 6: DataTheftDetector (plugins/builtin/data_theft.py)
Detects: Competitive intelligence and unauthorized data harvesting
Competitor Targeting:
- `competitor.*private`, `rival's.*emails`
- competitor's events, private competitor data
- Patterns: competitor + private/confidential = flag
Private Event Harvesting:
- `extract.*all.*attendees.*private.*event`
- `harvest.*private.*event.*participant.*data`
- Intent: stealing attendee lists from private events
Bulk Enumeration:
- all cities from eventbrite, every event worldwide
- compile all listings
- Geographic + platform enumeration = flag
Credential Harvesting:
- `extract.*javascript.*from.*event.*platform`
- pull eventbrite api key from page
- Code + credentials = high risk
Unverified Authority Claims:
- "I'm authorized", "this is for research"
- "legally", "compliance", "testing"
- Claims + scraping + platform = flag
Risk Score:
- Competitor targeting: +0.45
- Private event + harvesting: +0.40
- Bulk enumeration: +0.35
- Credential theft: +0.50
- Unverified authority + scraping: +0.25
- Combined max: 1.0
Plugin 7: CodeExtractionDetector (plugins/builtin/code_extraction.py)
Detects: Source code and intellectual property theft
Code Extraction Intent:
- `extract.*javascript`, pull source code
- understand implementation, reverse engineer
- `download.*source`, `copy.*code`
Target Keywords:
- website, site, eventbrite, ticketmaster, meetup, facebook
- competitor, rival, platform
Keyword Combinations:
- Code + website = flag
- Code + competitor = flag
- Reverse engineering + any = flag
Risk Score:
- Code extraction pattern: +0.45
- Code + target: +0.50
- Reverse engineering: +0.55
Plugin 8: EnumerationDetector (plugins/builtin/enumeration.py)
Detects: Broad-scope mass data collection
Enumeration Patterns:
- all X across all Y: "all cities across all events"
- comprehensive/complete/full dataset
- every X in Y: "every event in every city"
- all pages without limit (pagination bypass)
Heuristic Scoring:
- Multiple scope keywords (2+) + targets (1+): +0.20
- Aggregation + scope + targets: +0.25
- Action (scrape) + scope + targets: +0.20
- Geographic + enumeration: +0.25
- Pagination bypass: +0.20
- Mass operation: +0.15
Risk Score:
- Combined heuristics, typically 0.15-0.35
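A sketch of how the additive heuristics could combine. The keyword vocabularies are hypothetical (the plugin's real lists are not documented), the mass-operation heuristic is omitted, and clamping the total to 0.35, the top of the typical band, is an assumption.

```python
import re

# Hypothetical vocabularies; the plugin's real keyword lists are not documented.
SCOPE = {"all", "every", "comprehensive", "complete", "full"}
TARGETS = {"cities", "city", "events", "event", "listings", "pages"}
ACTIONS = {"scrape", "crawl", "harvest"}
AGGREGATION = {"compile", "aggregate", "dataset"}
GEOGRAPHIC = {"worldwide", "cities", "countries", "globally"}


def score_enumeration(text: str) -> float:
    words = re.findall(r"[a-z]+", text.lower())
    vocab = set(words)
    scope = sum(w in SCOPE for w in words)  # counts repeats, e.g. "all ... all"
    targets = sum(w in TARGETS for w in words)
    score = 0.0
    if scope >= 2 and targets >= 1:
        score += 0.20  # multiple scope keywords + target
    if vocab & AGGREGATION and scope and targets:
        score += 0.25  # aggregation + scope + target
    if vocab & ACTIONS and scope and targets:
        score += 0.20  # action + scope + target
    if vocab & GEOGRAPHIC and scope:
        score += 0.25  # geographic + enumeration
    if "without limit" in text.lower():
        score += 0.20  # pagination bypass
    # Assumption: clamp to the upper end of the documented typical band.
    return round(min(score, 0.35), 2)
```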
Plugin 9: OperationalDisguiseDetector (plugins/builtin/operational_disguise.py)
Detects: Credential theft disguised as operational tasks
Operational Disguise Tactics:
- Testing: test credential rotation, verify API keys work
- Debugging: debug deployment, pull database credentials
- Backup: backup encryption keys, restore certificates
- Rotation: rotate api keys
- Config: environment configuration with secrets
Sensitive Credentials Targeted:
- API keys, secrets, passwords, tokens, bearers
- Database passwords, private keys, SSH keys
- Certificates, encryption keys
Risk Score:
- Operational disguise + credential: +0.35-0.65
- Fetch + explicit credential request: +0.60-0.65 (CRITICAL)
Plugin 10: PayloadSizeDetector (plugins/builtin/payload_size.py)
Detects: Oversized requests for exfiltration
Argument Size:
- Over 1,000 characters: MEDIUM flag

Fetch Size Requests:
- 250k-500k characters: +0.20
- Over 500k characters: +0.30
- Over 1M characters: +0.40 (CRITICAL)
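A sketch of the tiered check. `payload_size_score` and both flag names are hypothetical, and reading the fetch size from a `max_length` argument on the `fetch` server follows the sanitizer's size-clamping description. Scoring runs before input sanitization in the request flow, so requests above the 999,999-character clamp are still visible here.

```python
def payload_size_score(server: str, arguments: dict) -> dict:
    score, flags = 0.0, {}
    if len(str(arguments)) > 1_000:
        flags["large_arguments"] = True  # MEDIUM flag; no numeric score documented
    if server == "fetch":
        requested = arguments.get("max_length", 0)
        if isinstance(requested, int):
            if requested > 1_000_000:
                score = 0.40  # CRITICAL tier
            elif requested > 500_000:
                score = 0.30
            elif requested >= 250_000:
                score = 0.20
    if score:
        flags["large_fetch"] = True
    return {"risk_score": score, "flags": flags}
```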
Policy Enforcement
Decision Making Flow
Input: Risk Score (0.0-1.0) + Threat Flags
Step 1: Check Hard Blocks
├─ flags["ssrf_attempt"] → BLOCK (0.70+)
├─ flags["malicious_url"] → BLOCK
├─ flags["jailbreak_like"] → BLOCK
├─ flags["data_theft_like"] → BLOCK
└─ flags["exfiltration_like"] → BLOCK
Step 2: Score-Based Decision
├─ score >= 0.75 → BLOCK
│ └─ Include specific threat reason
├─ score >= 0.40 → ALLOW + REDACT OUTPUT
│ └─ Reason: "Allowed with redaction: medium risk (X.XX)"
└─ score < 0.40 → ALLOW
└─ Reason: "Allowed: low risk (X.XX)"
Step 3: Return Decision
└─ PolicyDecision(
allow: bool,
redact_output: bool,
reason: str,
risk_score: float,
flags: Dict[str, bool]
)
Configuration Constants
# Thresholds (Environment Variables)
HIGH_RISK_BLOCK_THRESHOLD = 0.75 # HIGH_RISK_BLOCK_THRESHOLD
MEDIUM_RISK_REDACT_THRESHOLD = 0.40 # MEDIUM_RISK_REDACT_THRESHOLD
Policy Decision Labels
| Label | Meaning | Action |
|---|---|---|
| allow | Low risk, fully allowed | Execute tool, return full result |
| redacted | Medium risk, output sanitized | Execute tool, redact output |
| blocked | High risk, denied | Return error, don't execute |
| error | Request validation/processing failed | Return error details |
| timeout | Tool execution exceeded DOWNSTREAM_TIMEOUT_SECONDS (default 60s) | Return timeout error |
| rate_limited | Per-user rate limit exceeded (HTTP 429) | Return error with reset time, don't execute |
Input/Output Sanitization
PII Detection & Redaction
Patterns Detected:
| Type | Pattern | Redacted As |
|---|---|---|
| Email | `[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+` | [REDACTED_EMAIL] |
| Phone | `\+?\d[\d\s\-]{7,}\d` | [REDACTED_PHONE] |
| Credit Card | 13-16 digit sequences | [REDACTED_CARD] |
| API Key (OpenAI) | `sk-[A-Za-z0-9]{20,}` | [REDACTED_KEY] |
| Generic Token | 32+ character alphanumeric | [REDACTED_TOKEN] |
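A minimal sketch of applying these patterns to a string and, recursively, to argument or output structures. `redact` and `redact_value` are hypothetical names, and the ordering (specific patterns such as keys and card numbers before the broader phone and token patterns) is an assumption chosen so that one value is not partially consumed by a looser pattern.

```python
import re

# Order matters: specific patterns run before broader ones.
PII_PATTERNS = [
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+"), "[REDACTED_EMAIL]"),
    (re.compile(r"sk-[A-Za-z0-9]{20,}"), "[REDACTED_KEY]"),
    (re.compile(r"\b\d{13,16}\b"), "[REDACTED_CARD]"),
    (re.compile(r"\+?\d[\d\s\-]{7,}\d"), "[REDACTED_PHONE]"),
    (re.compile(r"\b[A-Za-z0-9]{32,}\b"), "[REDACTED_TOKEN]"),
]


def redact(text: str) -> str:
    for pattern, label in PII_PATTERNS:
        text = pattern.sub(label, text)
    return text


def redact_value(value):
    """Recursively redact strings inside dicts and lists (tool arguments or output)."""
    if isinstance(value, str):
        return redact(value)
    if isinstance(value, dict):
        return {k: redact_value(v) for k, v in value.items()}
    if isinstance(value, list):
        return [redact_value(v) for v in value]
    return value  # numbers, booleans, None pass through unchanged
```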
Path Traversal Detection
Detection:
- Pattern: `(\.\.[\\/])+` (catches `../`, `..\`, etc.)
- Applied to all string-type arguments
Blocking: If path-type arguments contain traversal:
"Blocked: sanitized critical argument (path traversal detected)."
Sensitive Path Blocking
Unix/Linux:
/etc, /var/log, /root, /home, /sys, /proc
Windows:
C:\Windows, C:\ProgramData, C:\Users\Administrator, C:\config
Blocking: If filesystem path contains sensitive path:
"Blocked: sanitized critical argument (sensitive path detected)."
Size Clamping
Fetch max_length:
- Clamped to 999,999 characters
- Prevents downloading massive files for exfiltration
- Downstream tool enforces actual limit
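The blocking and clamping rules can be sketched together as one argument pass. `sanitize_arguments`, its `(sanitized_args, block_reason)` return shape, and `MAX_FETCH_LENGTH` are hypothetical; the critical argument names (path, filepath, target) and the block messages come from this document. The sensitive-path test is a simple prefix match, an assumption that can over-match (for example `/homework`).

```python
import re

TRAVERSAL = re.compile(r"(\.\.[\\/])+")
CRITICAL_ARGS = ("path", "filepath", "target")
SENSITIVE_PREFIXES = ("/etc", "/var/log", "/root", "/home", "/sys", "/proc",
                      "c:\\windows", "c:\\programdata", "c:\\users\\administrator", "c:\\config")
MAX_FETCH_LENGTH = 999_999


def sanitize_arguments(server: str, arguments: dict):
    """Return (sanitized_args, block_reason); block_reason is None when allowed."""
    sanitized, reason = {}, None
    for key, value in arguments.items():
        if isinstance(value, str):
            cleaned = TRAVERSAL.sub("[REDACTED_PATH_TRAVERSAL]", value)
            if key in CRITICAL_ARGS and reason is None:
                if cleaned != value:
                    reason = "Blocked: sanitized critical argument (path traversal detected)."
                elif cleaned.lower().startswith(SENSITIVE_PREFIXES):
                    reason = "Blocked: sanitized critical argument (sensitive path detected)."
            value = cleaned
        sanitized[key] = value
    # Clamp fetch sizes; the downstream tool still enforces its own limit.
    if server == "fetch" and isinstance(sanitized.get("max_length"), int):
        sanitized["max_length"] = min(sanitized["max_length"], MAX_FETCH_LENGTH)
    return sanitized, reason
```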
Rate Limiting
Sliding Window Algorithm
```python
import threading
import time
from collections import deque
from typing import Deque, Dict, Tuple


class RateLimiter:
    def __init__(self, max_calls: int = 60, window_seconds: int = 60):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.user_requests: Dict[str, Deque[float]] = {}  # user_id → deque of timestamps
        self.lock = threading.Lock()

    def check_and_increment(self, user_id: str) -> Tuple[bool, Dict]:
        with self.lock:
            # Get (or create) the user's request deque
            requests = self.user_requests.setdefault(user_id, deque())
            # Remove timestamps outside the window
            now = time.time()
            window_start = now - self.window_seconds
            while requests and requests[0] < window_start:
                requests.popleft()
            # Record the call only if the user is under the limit
            allowed = len(requests) < self.max_calls
            if allowed:
                requests.append(now)
            # A slot frees up when the oldest tracked request leaves the window
            reset_in = requests[0] + self.window_seconds - now if requests else 0.0
            return allowed, {
                "limit": self.max_calls,
                "remaining": self.max_calls - len(requests),
                "reset_in_seconds": round(reset_in, 2),
                "window_seconds": self.window_seconds,
                "current_count": len(requests),
            }
```
Response Headers
Successful Request:
X-RateLimit-Limit: 60
X-RateLimit-Remaining: 42
X-RateLimit-Reset: 1705316445
Rate Limited (429):
X-RateLimit-Limit: 60
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1705316445
Retry-After: 45
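A sketch of deriving these headers from the limiter's info dict. `rate_limit_headers` is a hypothetical helper; treating X-RateLimit-Reset as a Unix epoch timestamp is inferred from the example value above. This sketch rounds Retry-After up so a client never retries early, which would give 46 rather than the 45 shown above for a reset of 45.23 seconds; truncating instead is an equally valid design choice.

```python
import math
import time
from typing import Dict, Optional


def rate_limit_headers(allowed: bool, info: Dict, now: Optional[float] = None) -> Dict[str, str]:
    now = time.time() if now is None else now
    headers = {
        "X-RateLimit-Limit": str(info["limit"]),
        "X-RateLimit-Remaining": str(info["remaining"]),
        # Absolute reset time as a Unix epoch timestamp (seconds)
        "X-RateLimit-Reset": str(int(now + info["reset_in_seconds"])),
    }
    if not allowed:
        # Relative delay in whole seconds, rounded up
        headers["Retry-After"] = str(math.ceil(info["reset_in_seconds"]))
    return headers
```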
Audit Logging
JSONL Format
Each line is a valid JSON object:
{"timestamp": "...", "user_id": "...", ...}
{"timestamp": "...", "user_id": "...", ...}
Log Entries
Complete audit entry with all context:
{
"timestamp": ISO8601, # When request occurred
"user_id": str, # Logical user ID
"server": str, # Server key (ultimate_scraper, etc.)
"tool": str, # Tool name
"raw_arguments": dict, # Original args from user
"sanitized_arguments": dict, # After PII/traversal redaction
"policy": {
"allow": bool, # Was allowed?
"redact_output": bool, # Should output be redacted?
"reason": str, # Decision explanation
"risk_score": float, # 0.0-1.0
"flags": { # Threat flags detected
"ssrf_attempt": bool,
"jailbreak_like": bool,
"data_theft_like": bool,
...
}
},
"risk": {
"score": float, # Aggregated risk
"reasons": [str, ...], # All detected threats
"flags": dict, # All threat flags
},
"outcome": {
"success": bool, # Tool succeeded?
"redacted": bool, # Output was redacted?
"error": str, # If failed, error message
"category": str, # Error category
},
"execution_time_ms": float # Request duration
}
Log Location
Environment Variable: AUDIT_LOG_PATH
Default Locations:
- Modal: `/mnt/audit/audit.log.jsonl` (volume-mounted)
- Local: `./audit.log.jsonl`
- Fallback: parent directory if local fails
API Access
Endpoint: GET /audit/latest
Returns:
{
"entries": [
{...}, # Most recent
{...},
... # Last 100 entries
{...} # Oldest
],
"total_available": 1000, # Total in file
"returned": 100 # Entries returned
}
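A minimal sketch of writing and reading the JSONL trail with the standard library. `append_audit` and `latest_audit` are hypothetical; the `AUDIT_LOG_PATH` variable, the local default path, and the response shape follow this document. Reading through a bounded `deque` keeps only the last `limit` entries in memory while still counting every line for `total_available`.

```python
import json
import os
from collections import deque
from datetime import datetime, timezone

AUDIT_LOG_PATH = os.environ.get("AUDIT_LOG_PATH", "./audit.log.jsonl")


def append_audit(entry: dict, path: str = AUDIT_LOG_PATH) -> None:
    record = {"timestamp": datetime.now(timezone.utc).isoformat(), **entry}
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, default=str) + "\n")  # one JSON object per line


def latest_audit(path: str = AUDIT_LOG_PATH, limit: int = 100) -> dict:
    if not os.path.exists(path):
        return {"entries": [], "total_available": 0, "returned": 0}
    total = 0
    tail = deque(maxlen=limit)  # holds only the last `limit` entries
    with open(path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                total += 1
                tail.append(json.loads(line))
    entries = list(reversed(tail))  # most recent first
    return {"entries": entries, "total_available": total, "returned": len(entries)}
```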
Configuration
Configuration Files
servers.yaml - Downstream server definitions
servers:
web-search:
display_name: "Web Search (Brave + DuckDuckGo)"
url: "http://web-search:8001"
enabled: true
tags: ["search", "web"]
description: "General web search..."
auth:
type: "bearer"
header_name: "Authorization"
env_var: "BRAVE_API_KEY"
tool_parameters:
web_search:
query:
type: "string"
required: true
description: "Search query"
ultimate_scraper:
display_name: "Ultimate Event Scraper"
url: "http://ultimate-scraper:8002"
enabled: true
tags: ["events", "scraper"]
description: "Multi-platform event extraction..."
Environment Variables
Core Configuration:
# Audit logging
AUDIT_LOG_PATH=/mnt/audit/audit.log.jsonl
# Rate limiting (per-user)
RATE_LIMIT_MAX_CALLS=60
RATE_LIMIT_WINDOW_SECONDS=60
# Downstream calls
DOWNSTREAM_TIMEOUT_SECONDS=60
# Risk thresholds
HIGH_RISK_BLOCK_THRESHOLD=0.75
MEDIUM_RISK_REDACT_THRESHOLD=0.40
# Server authentication (per downstream server)
BRAVE_API_KEY=xxx
JINA_API_KEY=xxx
BLAXEL_API_KEY=xxx
TICKETMASTER_API_KEY=xxx
Server Overrides:
# Override individual server config
ULTIMATE_SCRAPER_URL=http://custom-scraper:9999
ULTIMATE_SCRAPER_ENABLED=true
WEB_SEARCH_ENABLED=false
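A sketch of how such overrides could be merged onto the parsed servers.yaml mapping. The naming convention (server key upper-cased, with `-` replaced by `_`, so `web-search` becomes `WEB_SEARCH`) is inferred from the examples above and is an assumption, as are the helper names and the accepted truthy strings.

```python
import os
from typing import Dict, Mapping


def env_prefix(server_key: str) -> str:
    # Assumed convention: "ultimate_scraper" -> "ULTIMATE_SCRAPER", "web-search" -> "WEB_SEARCH"
    return server_key.upper().replace("-", "_")


def apply_overrides(servers: Dict[str, dict], env: Mapping[str, str] = os.environ) -> Dict[str, dict]:
    """Return a copy of the servers mapping with <PREFIX>_URL / <PREFIX>_ENABLED applied."""
    merged = {}
    for key, cfg in servers.items():
        cfg = dict(cfg)  # do not mutate the loaded YAML data
        prefix = env_prefix(key)
        if f"{prefix}_URL" in env:
            cfg["url"] = env[f"{prefix}_URL"]
        if f"{prefix}_ENABLED" in env:
            cfg["enabled"] = env[f"{prefix}_ENABLED"].strip().lower() in ("1", "true", "yes")
        merged[key] = cfg
    return merged
```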
API Reference
POST /tools/secure_call
Main endpoint for secure tool invocation
Request:
{
"user_id": "admin",
"server": "ultimate_scraper",
"tool": "searchEventListings",
"arguments": {
"url": "https://example.com/events",
"location": "New York",
"keyword": "jazz"
},
"llm_context": "The user is searching for jazz events in NYC"
}
Response (200 OK):
{
"allowed": true,
"redacted": false,
"reason": "Allowed: low risk (0.15)",
"risk_score": 0.15,
"risk_factors": [
"Broad-scope enumeration pattern detected"
],
"policy_decision": "allow",
"execution_time_ms": 234.56,
"downstream_result": {
"events": [...]
}
}
Response (400 Bad Request):
{
"allowed": false,
"reason": "Invalid server: 'unknown_server'",
"error_category": "validation",
"policy_decision": "error"
}
Response (429 Too Many Requests):
{
"allowed": false,
"reason": "Rate limit exceeded: 60 calls/60s",
"remaining": 0,
"reset_in_seconds": 45.23,
"policy_decision": "rate_limited"
}
Response (403 Forbidden - Policy Blocked):
{
"allowed": false,
"reason": "Blocked: credential theft detected",
"risk_score": 0.82,
"risk_factors": [
"Operational disguise pattern with credential fetch",
"Explicit API key extraction attempt"
],
"policy_decision": "blocked"
}
GET /tools/list
List all available tools (triggers auto-discovery)
Response:
{
"tools": {
"web-search": {
"web_search": {
"description": "Search the internet...",
"inputSchema": {...}
}
},
"ultimate_scraper": {
"searchEventListings": {...},
"scrapeEventPage": {...}
},
...
}
}
POST /tools/refresh
Force re-discovery of downstream tools
Response:
{
"refreshed": true,
"tools_discovered": 25,
"servers_contacted": 6,
"timestamp": "2025-01-15T10:30:45Z"
}
GET /config/servers
Get server configuration (servers.yaml)
Response:
{
"servers": [
{
"key": "web-search",
"display_name": "Web Search",
"url": "http://web-search:8001",
"enabled": true,
...
}
]
}
GET /audit/latest
Get last 100 audit entries
Response:
{
"entries": [
{
"timestamp": "2025-01-15T10:35:00Z",
"user_id": "admin",
"policy": {...},
"risk": {...},
...
},
...
],
"total_available": 5432,
"returned": 100
}
Integration Guide
Upstream Integration (LLM Clients)
Claude Desktop Integration
In the Claude Desktop config file (claude_desktop_config.json):
{
"mcpServers": {
"security-gateway": {
"command": "python",
"args": ["/path/to/security_gateway/sse_server.py"],
"env": {
"AUDIT_LOG_PATH": "/logs/audit.jsonl",
"RATE_LIMIT_MAX_CALLS": "60"
}
}
}
}
HTTP API Integration
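```python
import httpx


class SecurityException(Exception):
    """Raised when the gateway refuses a call."""


async def call_tool(user_id, server, tool, arguments):
    # httpx defaults to a 5-second timeout; downstream calls may take up to 60 s.
    async with httpx.AsyncClient(timeout=90.0) as client:
        response = await client.post(
            "http://localhost:8000/tools/secure_call",
            json={
                "user_id": user_id,
                "server": server,
                "tool": tool,
                "arguments": arguments,
            },
        )
        result = response.json()
        if result.get("allowed"):
            return result.get("downstream_result")
        raise SecurityException(result.get("reason", "Request denied"))
```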
Downstream Integration (MCP Servers)
servers.yaml Configuration:
servers:
my-tool:
url: "http://my-tool:8000"
enabled: true
auth:
type: "bearer"
env_var: "MY_TOOL_API_KEY"
Tool Discovery:
- Gateway auto-discovers tools via /tools/list
- Calls tool with sanitized arguments
- Receives tool output
- Applies output sanitization
- Returns to LLM client
Deployment
Local Development
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export AUDIT_LOG_PATH=./audit.log.jsonl
export RATE_LIMIT_MAX_CALLS=60
export HIGH_RISK_BLOCK_THRESHOLD=0.75
# Start gateway
python server.py
# Available at: http://localhost:8000
Docker
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
EXPOSE 8000
CMD ["python", "server.py"]
Run:
docker build -t security-gateway .
docker run -p 8000:8000 \
-e AUDIT_LOG_PATH=/var/log/audit.jsonl \
-v /var/log:/var/log \
security-gateway
Modal Deployment
```bash
# Create the secret first (KEY=VALUE pairs)
modal secret create mcp-config \
  AUDIT_LOG_PATH=/mnt/audit/audit.jsonl \
  RATE_LIMIT_MAX_CALLS=100
# Deploy
modal deploy modal_app.py
```
Performance & Optimization
Per-Request Overhead
| Component | Time | Notes |
|---|---|---|
| Rate limit check | <1ms | O(1) with lock |
| Plugin scanning | 50-200ms | All 10 plugins sequentially |
| Sanitization | 10-50ms | Regex passes |
| Downstream call | 1-60s | Network latency |
| Output sanitization | 10-50ms | Same as input |
| Audit write | <1ms | Append to file |
| Total (without downstream) | 100-300ms | Gateway overhead |
| Total (with downstream) | 1-60s | Dominated by tool |
Caching
- Plugin instances: Cached at startup (reused)
- Discovered tools: Cached in memory + file
- Server configurations: Loaded from YAML, cached
- User request deque: Maintained in memory per user
Scalability Considerations
Single Instance:
- Thread-safe rate limiting with lock
- Per-user request tracking in memory
- Audit log as local JSONL file
Multiple Instances (Distributed):
- Rate limiting not distributed (per-instance)
- Solution: Use external rate limiter (Redis)
- Audit logs local to each instance
- Solution: Use centralized logging (CloudWatch, etc.)
- Tool discovery cached per instance
- Solution: Refresh endpoint forces re-discovery
Troubleshooting
Issue 1: "Rate limit exceeded"
Symptom:
Error: 429 Too Many Requests
Retry-After: 45
Cause: User exceeded 60 calls/60 seconds
Solution:
# Check limit configuration
echo $RATE_LIMIT_MAX_CALLS
# Increase limit if needed
export RATE_LIMIT_MAX_CALLS=120
# Or decrease window
export RATE_LIMIT_WINDOW_SECONDS=30
Issue 2: "Blocked: credential theft detected"
Symptom:
{
"allowed": false,
"reason": "Blocked: credential theft detected",
"risk_score": 0.82
}
Cause: Request matched credential theft patterns
Legitimate Cases:
- Testing credential rotation (operational context)
- Admin auditing (documented authorization)
- Backup/restore procedures
Solution:
# Add context to help scoring
response = await client.post(
"http://localhost:8000/tools/secure_call",
json={
"user_id": "admin",
"server": "...",
"tool": "...",
"arguments": {...},
"llm_context": "This is authorized credential rotation testing by the admin"
}
)
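Context can help plugins differentiate legitimate from malicious patterns. Note, however, that llm_context is scanned along with the arguments, and phrases such as "authorized", "testing", or "credential rotation" are themselves patterns the DataTheftDetector (unverified authority claims) and OperationalDisguiseDetector look for, so unsupported claims like these may raise the score rather than lower it.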
Issue 3: "Output redacted"
Symptom:
{
"allowed": true,
"redacted": true,
"reason": "Allowed with redaction: medium risk (0.45)"
}
Cause: Medium risk (0.40-0.74) → output was sanitized
What Happened:
- Some PII was detected and redacted
- Patterns: emails, phones, API keys
- Or generic tokens found
Example:
Original: "Contact: admin@company.com, key: sk-abc123..."
Redacted: "Contact: [REDACTED_EMAIL], key: [REDACTED_KEY]..."
Issue 4: "Tool not found"
Symptom:
{
"allowed": false,
"reason": "Server 'my-tool' not found in configuration",
"error_category": "validation"
}
Cause: Server not in servers.yaml or not discovered
Solution:
# Check available servers
curl http://localhost:8000/tools/list
# Refresh discovery
curl -X POST http://localhost:8000/tools/refresh
# Check servers.yaml
cat /path/to/servers.yaml
# Verify server is running
curl http://my-tool:8000/health
Issue 5: "Downstream timeout"
Symptom:
{
"allowed": true,
"policy_decision": "timeout",
"reason": "Tool execution exceeded 60 seconds"
}
Cause: Tool took longer than timeout
Solution:
# Increase timeout
export DOWNSTREAM_TIMEOUT_SECONDS=120
# Or optimize downstream server
# Check server performance
time curl -X POST http://downstream:8000/tools/...
Contributing
Adding a New Threat Detector
- Create plugin file:
plugins/builtin/my_detector.py
from plugins.base import ScannerPlugin, ScanResult
class MyDetector(ScannerPlugin):
def scan(self, user_id, server_key, tool, arguments, llm_context=None) -> ScanResult:
# Implement threat detection
detected = "malicious_pattern" in str(arguments).lower()
return ScanResult(
plugin_name="MyDetector",
detected=detected,
risk_score=0.5 if detected else 0.0,
reasons=["Pattern matched"] if detected else [],
flags={"my_flag": detected},
metadata={}
)
# Export instance for dynamic loading
plugin = MyDetector()
- Update plugin registry:
# In plugins/loader.py, plugin auto-discovers from /builtin/
# No changes needed - plugin will be auto-loaded
- Test:
curl -X POST http://localhost:8000/tools/secure_call \
  -H "Content-Type: application/json" \
  -d '{"user_id": "test", "server": "web-search", "tool": "web_search", "arguments": {"query": "...malicious..."}}'
Updating Threat Detection Rules
Modify individual plugin files:
# In plugins/builtin/jailbreak.py
OVERRIDE_PATTERNS = [
r"ignore all previous instructions",
r"bypass security",
# Add new pattern
r"new custom pattern",
]
License
Same as parent project (MCP Security Hackathon)
Last Updated: 2025-11-28
Maintainer: MCP Security Team
Architecture: Plugin-based risk scoring with multi-threshold policy enforcement
Threat Detectors: 10 specialized plugins
Security Thresholds: 2 configurable (block: 0.75, redact: 0.40)
Rate Limiting: Per-user sliding window
Audit Trail: JSONL format with complete request/response logging
Production Ready: Yes