yuki-sui's picture
Update security_gateway/README.md
8fbeaad verified

A newer version of the Gradio SDK is available: 6.13.0

Upgrade

Security Gateway - Comprehensive Documentation

A production-grade MCP security layer providing intelligent threat detection, policy enforcement, input/output sanitization, rate limiting, and comprehensive audit logging for all access to downstream MCP servers.

Table of Contents

  1. System Overview
  2. Architecture
  3. Core Components
  4. Security Features
  5. Threat Detection Plugins
  6. Policy Enforcement
  7. Input/Output Sanitization
  8. Rate Limiting
  9. Audit Logging
  10. Configuration
  11. API Reference
  12. Integration Guide
  13. Deployment
  14. Performance & Optimization
  15. Troubleshooting

👥 Team

Team Name: MemKrew

Team Members:

System Overview

Purpose

The Security Gateway acts as a security checkpoint between LLM clients (Claude, OpenAI, etc.) and downstream MCP servers. It:

  • Analyzes every request for threat patterns (10 specialized detectors)
  • Makes decisions using multi-threshold policy (block, redact, or allow)
  • Sanitizes inputs (removes PII, detects path traversal) and outputs
  • Limits rate of requests per user (sliding window algorithm)
  • Logs complete audit trail in JSONL format for forensics

Key Innovation

Instead of simple allow/deny rules, the gateway uses a risk scoring engine with 10 specialized plugins that detect:

  • Sophisticated jailbreak attempts
  • SSRF attacks targeting cloud metadata
  • Data theft disguised as legitimate operations
  • Competitive intelligence harvesting
  • Credential exfiltration with operational cover stories
  • Broad-scope data enumeration

Threats Protected Against

Threat Category Detection Mechanism Risk If Found Action
Jailbreak/Prompt Injection 30+ regex patterns HIGH (0.5-0.8) BLOCK
SSRF Attacks IP range + metadata endpoint blocking CRITICAL (0.7) BLOCK
SQL Injection SQL keyword detection MEDIUM (0.35) Aggregate score
Path Traversal ../ detection + sensitive path list MEDIUM-HIGH (0.35-0.55) BLOCK at gateway
Data Exfiltration Intent patterns + large payloads MEDIUM-HIGH (0.3-0.7) Flag & redact
Competitive Intelligence Competitor + harvesting pattern detection HIGH (0.5+) BLOCK
Code Extraction Source code harvesting patterns HIGH (0.45-0.55) Block/Flag
Broad Enumeration "all X across all Y" patterns MEDIUM (0.15-0.35) Aggregate
Credential Theft Operational disguise patterns CRITICAL (0.35-0.65) BLOCK if fetch
DoS/Rate Abuse Sliding window per-user limits HIGH THROTTLE

Architecture

High-Level Flow

┌──────────────────────────────────────────────────────────┐
│                    LLM CLIENT                            │
│            (Claude, OpenAI, Gemini, etc.)               │
└────────────────────┬─────────────────────────────────────┘
                     │
                     │ HTTP or Stdio Transport
                     │
    ┌────────────────▼─────────────────────┐
    │   SECURITY GATEWAY (Main Endpoint)   │
    │   POST /tools/secure_call            │
    └────────────────┬─────────────────────┘
                     │
    ┌────────────────▼─────────────────────────────────────┐
    │ 1. RATE LIMIT CHECK                                 │
    │    • Sliding window (default: 60 calls/60s)         │
    │    • Per-user tracking                              │
    │    • Returns 429 if exceeded                         │
    └────────────────┬─────────────────────────────────────┘
                     │
    ┌────────────────▼─────────────────────────────────────┐
    │ 2. SERVER/TOOL VALIDATION                            │
    │    • Check DISCOVERED_TOOLS registry                │
    │    • Fallback to configured servers                 │
    └────────────────┬─────────────────────────────────────┘
                     │
    ┌────────────────▼─────────────────────────────────────┐
    │ 3. RISK SCORING (Plugin System)                     │
    │    • 10 specialized threat detectors                │
    │    • Plugin-based architecture                      │
    │    • Produces risk_score (0.0-1.0)                  │
    │    • Lists all detected threats                      │
    └────────────────┬─────────────────────────────────────┘
                     │
    ┌────────────────▼─────────────────────────────────────┐
    │ 4. POLICY DECISION                                   │
    │    • score >= 0.75: BLOCK                           │
    │    • score 0.40-0.74: ALLOW + REDACT OUTPUT        │
    │    • score < 0.40: ALLOW                            │
    │    • Hard-blocks: SSRF, jailbreak, data theft      │
    └────────────────┬─────────────────────────────────────┘
                     │
    ┌────────────────▼─────────────────────────────────────┐
    │ 5. INPUT SANITIZATION                               │
    │    • Detect + redact PII (emails, phones, keys)    │
    │    • Detect path traversal (../)                    │
    │    • Clamp fetch max_length to prevent exfil      │
    │    • Block if critical path args modified          │
    └────────────────┬─────────────────────────────────────┘
                     │
         (If ALLOWED)│
                     │
    ┌────────────────▼──────────────────────────────────┐
    │ 6. DOWNSTREAM EXECUTION                           │
    │    • Call MCP server with sanitized args          │
    │    • 60 second timeout per call                   │
    │    • Automatic auth from servers.yaml             │
    └────────────────┬──────────────────────────────────┘
                     │
    ┌────────────────▼──────────────────────────────────┐
    │ 7. OUTPUT SANITIZATION                            │
    │    • Optionally redact output based on policy     │
    │    • Apply same PII patterns                      │
    └────────────────┬──────────────────────────────────┘
                     │
    ┌────────────────▼──────────────────────────────────┐
    │ 8. COMPREHENSIVE AUDITING                         │
    │    • Log to JSONL audit file                      │
    │    • Capture raw args, sanitized args             │
    │    • Store risk assessment, policy decision       │
    │    • Include timing and outcome                   │
    └────────────────┬──────────────────────────────────┘
                     │
                     │ Response with decision metadata
                     │
    ┌────────────────▼──────────────────────────────────┐
    │       Return to LLM Client                        │
    │  (allowed/redacted/blocked, metadata, result)    │
    └───────────────────────────────────────────────────┘

System Layers

┌─────────────────────────────────────────────────────────┐
│  Policy Layer                                           │
│  (Decide: block, redact, allow based on risk score)    │
└─────────────────────────────────────────────────────────┘
              ↑
┌─────────────────────────────────────────────────────────┐
│  Risk Scoring Layer                                     │
│  (10 plugins analyze for threat patterns)              │
│  - JailbreakDetector                                   │
│  - SSRFDetector                                        │
│  - SQLInjectionDetector                               │
│  - PathTraversalDetector                              │
│  - ExfiltrationDetector                               │
│  - DataTheftDetector                                  │
│  - CodeExtractionDetector                             │
│  - EnumerationDetector                                │
│  - OperationalDisguiseDetector                        │
│  - PayloadSizeDetector                                │
└─────────────────────────────────────────────────────────┘
              ↑
┌─────────────────────────────────────────────────────────┐
│  Input/Output Sanitization Layer                        │
│  (Redact PII, detect traversal, clamp sizes)           │
└─────────────────────────────────────────────────────────┘
              ↑
┌─────────────────────────────────────────────────────────┐
│  Rate Limiting Layer                                    │
│  (Sliding window per user, 60 calls/60s default)       │
└─────────────────────────────────────────────────────────┘
              ↑
┌─────────────────────────────────────────────────────────┐
│  Audit & Logging Layer                                  │
│  (JSONL format, complete request/response trace)       │
└─────────────────────────────────────────────────────────┘

Core Components

1. Main Gateway Server (server.py)

Lines: ~600 | Purpose: HTTP MCP server with main secure_call tool

Key Method:

@mcp.tool()
async def secure_call(
    user_id: str,                    # Logical user (e.g., "admin", "judge-1")
    server: str,                     # Downstream server key (e.g., "ultimate_scraper")
    tool: str,                       # Tool name (e.g., "searchEventListings")
    arguments: dict,                 # Tool arguments
    llm_context: Optional[str] = None # Optional prompt for risk analysis
) -> SecureCallOutput

Request Processing:

  1. Rate limit check (per-user)
  2. Server/tool validation
  3. Risk scoring (all 10 plugins)
  4. Policy decision (block, redact, allow)
  5. Input sanitization
  6. Downstream execution
  7. Output sanitization
  8. Audit logging

Response Fields:

  • allowed - Security decision (boolean)
  • redacted - Output was sanitized (boolean)
  • reason - Decision explanation
  • risk_score - Computed risk (0.0-1.0)
  • risk_factors - List of detected threats
  • policy_decision - Label: "allow", "redacted", "blocked", "error", "timeout"
  • execution_time_ms - Performance metric
  • downstream_result - Actual tool output

2. Risk Scoring Engine (risk_model.py)

Lines: ~150 | Purpose: Plugin-based threat detection and risk aggregation

Architecture:

class PluginRegistry:
    def scan_all(user_id, server, tool, args, llm_context) -> Dict:
        # Run all enabled plugins in sequence
        # Aggregate results into composite risk score
        # Combine threat reasons and flags

Plugin Execution:

  1. Each plugin runs independently
  2. Produces individual risk_score (0.0-1.0)
  3. Returns reasons and flags
  4. Results aggregated:
    • Sum all scores (capped at 1.0)
    • Merge all reasons into single list
    • Combine all flags into single dict

Special Handling:

  • native/code_interpreter → Base risk 0.8 (inherently dangerous)
  • web-search → Risk capped at 0.35 (read-only access)
  • ultimate_scraper + competitor language → Flagged as data theft

3. Threat Detection Plugins (plugins/builtin/)

See Threat Detection Plugins section below.

4. Policy Enforcement (policy.py)

Lines: ~200 | Purpose: Multi-threshold decision making

Decision Logic:

if risk_score >= 0.75:
    # HARD BLOCKS (irrespective of score)
    if flags["ssrf_attempt"]: BLOCK
    if flags["malicious_url"]: BLOCK
    if flags["jailbreak_like"]: BLOCK
    if flags["data_theft_like"]: BLOCK

    # Score-based blocks
    BLOCK with reason (e.g., "credential theft detected")

elif risk_score >= 0.40:
    # Medium risk
    ALLOW but REDACT OUTPUT

else:  # risk_score < 0.40
    # Low risk
    ALLOW

Configuration:

HIGH_RISK_BLOCK_THRESHOLD = 0.75        # Environment: HIGH_RISK_BLOCK_THRESHOLD
MEDIUM_RISK_REDACT_THRESHOLD = 0.40    # Environment: MEDIUM_RISK_REDACT_THRESHOLD

5. Input/Output Sanitization (sanitizer.py)

Lines: ~300 | Purpose: PII redaction and path traversal detection

PII Patterns Redacted:

  • Email addresses: [A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+
  • Phone numbers: \+?\d[\d\s\-]{7,}\d
  • Credit card numbers: 13-16 digit sequences
  • API keys: sk-[A-Za-z0-9]{20,} (OpenAI format)
  • Generic long tokens: 32+ character alphanumeric

Path Traversal Detection:

  • Pattern: (\.\.[\\/])+ (catches ../, ..\, etc.)
  • Blocked: [REDACTED_PATH_TRAVERSAL]

Sensitive Path Blocking:

Unix/Linux Paths:

  • /etc (system config)
  • /var/log (logs)
  • /root, /home (user homes)
  • /sys, /proc (kernel interfaces)

Windows Paths:

  • C:\Windows (system directory)
  • C:\ProgramData (application data)
  • C:\Users\Administrator (admin home)
  • C:\config (configuration)

Size Clamping:

  • fetch max_length clamped to 999,999 characters
  • Prevents exfiltration via massive downloads

Blocking Conditions: If critical path arguments (path, filepath, target) are modified by sanitization, block request with:

"Blocked: sanitized critical argument (path traversal or sensitive path detected)."

6. Rate Limiting (rate_limiter.py)

Lines: ~100 | Purpose: Sliding window rate limiting per user

Algorithm: Timestamp-based sliding window

Configuration:

max_calls: int = 60                  # Environment: RATE_LIMIT_MAX_CALLS
window_seconds: int = 60             # Environment: RATE_LIMIT_WINDOW_SECONDS

Implementation:

class RateLimiter:
    def __init__(self):
        self.user_requests = {}  # Dict[user_id] → deque of timestamps
        self.lock = threading.Lock()

    def check_and_increment(user_id: str) -> Tuple[bool, Dict]:
        # Remove timestamps outside window
        # Count remaining in window
        # Return (allowed, info)

Response Info:

{
    "limit": 60,
    "remaining": 42,
    "reset_in_seconds": 45.23,
    "window_seconds": 60,
    "current_count": 18
}

HTTP Headers (on 429):

X-RateLimit-Limit: 60
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1705316445
Retry-After: 45

7. Audit Logging (audit.py)

Lines: ~200 | Purpose: JSONL audit trail for forensics

Log Format:

{
  "timestamp": "2025-01-15T10:30:45.123456+00:00",
  "user_id": "admin",
  "server": "ultimate_scraper",
  "tool": "searchEventListings",
  "raw_arguments": {...},
  "sanitized_arguments": {...},
  "policy": {
    "allow": true,
    "redact_output": true,
    "reason": "Allowed with redaction: medium risk (0.45)",
    "risk_score": 0.45,
    "flags": {"enumeration_pattern": true}
  },
  "risk": {
    "score": 0.45,
    "reasons": [
      "Broad-scope enumeration pattern detected: 'all cities' + 'eventbrite'"
    ],
    "flags": {"enumeration_pattern": true}
  },
  "outcome": {
    "success": true,
    "redacted": true
  },
  "execution_time_ms": 234.56
}

Append Locations:

  • Modal environment: /mnt/audit/audit.log.jsonl (volume-mounted)
  • Local development: ./audit.log.jsonl, with fallback to parent directory

API Endpoint:

GET /audit/latest → Returns last 100 entries (reverse chronological)

Security Features

Feature 1: Plugin-Based Risk Scoring

Advantage: Extensible, testable, disableable per-plugin

Architecture:

# Each plugin independently detects a threat category
# All plugins run, results aggregated
# Individual plugins can be enabled/disabled at runtime

class JailbreakDetector(ScannerPlugin):
    def scan(...) -> ScanResult:
        # Detects prompt injection, instruction override, etc.
        return ScanResult(
            detected=True,
            risk_score=0.5,
            reasons=["Instruction override pattern: 'ignore all previous'"],
            flags={"jailbreak_like": True}
        )

Feature 2: Multi-Threshold Policy

Advantage: Nuanced decisions beyond simple allow/deny

Thresholds:

  • HIGH (≥0.75): BLOCK - Critical threats detected
  • MEDIUM (0.40-0.74): ALLOW but REDACT OUTPUT - Concerning but not blocking
  • LOW (<0.40): ALLOW - Low risk, no redaction

Hard Blocks (Irrespective of Score):

  • SSRF attempts (internal IPs, cloud metadata)
  • Malicious URLs detected
  • Jailbreak patterns
  • Explicit data theft flags

Feature 3: Input Sanitization

Advantage: Prevents attacks even if they slip past detection

Operations:

  1. Redact PII (emails, phones, API keys)
  2. Detect path traversal
  3. Clamp large fetch requests
  4. Block if critical args modified

Feature 4: Output Sanitization

Advantage: Prevents accidental data leakage in responses

Operations:

  • Apply same PII redaction patterns to response
  • Conditional redaction based on policy decision
  • Logs whether output was redacted

Feature 5: Comprehensive Audit Trail

Advantage: Complete forensics capability

Logged Information:

  • Raw and sanitized arguments
  • Risk score and threat reasons
  • Policy decision and threshold
  • Actual tool output (or error)
  • Execution timing
  • Outcome (success, error, timeout, blocked)

Feature 6: Rate Limiting

Advantage: Prevents abuse and DoS attacks

Mechanism: Per-user sliding window

  • Track request timestamps per user
  • Maintain deque of timestamps within window
  • Remove old timestamps on each check
  • Count remaining requests

Configuration: 60 calls/60 seconds (configurable)


Threat Detection Plugins

Plugin 1: JailbreakDetector (plugins/builtin/jailbreak.py)

Detects: Prompt injection, instruction override, role-play attacks

Patterns:

  • Override/ignore directives: ignore (all|previous|your) (instructions|rules)
  • Bypass attempts: bypass (safety|security|restrictions)
  • Role-play attacks: act as (malware|hacker|attacker|developer)
  • Secret disclosure: reveal.*secret, tell me.*api key
  • Instruction replacement: forget.*previous instructions

Token Detection:

  • OpenAI tokens: sk-[A-Za-z0-9]{20,}
  • Generic long tokens: 32+ character sequences

Risk Score:

  • Jailbreak pattern detected: +0.5
  • Secrets found: +0.3
  • Combined max: 0.8

Response Example:

ScanResult(
    detected=True,
    risk_score=0.6,
    reasons=["Instruction override pattern detected: 'ignore all previous instructions'",
             "Potential secret token found in arguments"],
    flags={"jailbreak_like": True, "potential_secrets": True}
)

Plugin 2: SSRFDetector (plugins/builtin/ssrf.py)

Detects: Server-Side Request Forgery targeting internal/cloud infrastructure

Applicable Servers: fetch, scraper, ultimate_scraper, web-search, jina-ai

Protected IP Ranges:

  • Loopback: 127.0.0.1, ::1
  • Private: 10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12
  • Link-local: 169.254.0.0/16

Cloud Metadata Endpoints:

  • AWS: 169.254.169.254/latest/meta-data
  • GCP: metadata.google.com, 169.254.169.254
  • Azure: metadata.internal
  • Alibaba: imds.aliyuncs.com

Malicious URL Patterns:

  • Credential injection: http://attacker.com@example.com (@ tricks)
  • Known malicious domains: evil.com, attacker.com
  • Executable downloads: URLs ending in .exe
  • Phishing domains: phishing.*

Risk Score:

  • SSRF to internal IP: +0.70 (flag: ssrf_attempt)
  • SSRF to metadata: +0.70
  • Malicious URL: +0.50 (flag: malicious_url)
  • Combined max: 1.0

Response Example:

ScanResult(
    detected=True,
    risk_score=0.85,
    reasons=["SSRF attempt to AWS metadata endpoint detected",
             "URL pattern matches known malicious domain"],
    flags={"ssrf_attempt": True, "malicious_url": True}
)

Plugin 3: SQLInjectionDetector (plugins/builtin/sql_injection.py)

Detects: SQL injection attacks

Destructive Patterns:

  • DROP TABLE, DROP DATABASE
  • DELETE FROM, TRUNCATE
  • INSERT INTO, UPDATE (modify data)
  • UNION SELECT, UNION ALL

Authentication Bypass:

  • admin'--, admin' OR 1=1
  • Comment syntax: --, /**/

Risk Score:

  • SQL injection pattern: 0.35 (no hard block, aggregates with other threats)

Plugin 4: PathTraversalDetector (plugins/builtin/path_traversal.py)

Detects: Directory traversal attacks

Traversal Patterns:

  • Unix: ../, ../../, etc.
  • Windows: ..\, ..\\, etc.
  • Encoded: %2e%2e, ..%5c

Sensitive Paths Blocked:

Unix/Linux:

  • /etc (system config)
  • /var/log (logs)
  • /root, /home (user homes)
  • /sys, /proc (kernel)
  • /dev (devices)

Windows:

  • C:\Windows (system)
  • C:\ProgramData
  • C:\Users\Administrator
  • C:\config
  • C:\Program Files

Risk Score:

  • Traversal detected: +0.35
  • Sensitive path access: +0.20
  • Combined max: 0.55

Plugin 5: ExfiltrationDetector (plugins/builtin/exfiltration.py)

Detects: Data exfiltration attacks

Exfiltration Intent:

  • exfiltrate, send secrets, upload to webhook
  • copy to external, export sensitive
  • email me the results

Sensitive Servers:

  • filesystem, database, api, auth_service, credentials
  • Any server + exfiltration intent = flag

Dangerous Tools:

  • delete, drop, truncate, chmod, chown
  • delete + exfiltration intent = flag

Network-Based Exfiltration:

  • fetch + exfiltration intent = +0.40 (using fetch to exfiltrate)

Risk Score:

  • Exfiltration intent: +0.30
  • Sensitive server + intent: +0.15
  • Dangerous tool + intent: +0.15
  • Network exfiltration: +0.40
  • Combined max: 1.0

Plugin 6: DataTheftDetector (plugins/builtin/data_theft.py)

Detects: Competitive intelligence and unauthorized data harvesting

Competitor Targeting:

  • competitor.*private, rival's.*emails
  • competitor's events, private competitor data
  • Patterns: competitor + private/confidential = flag

Private Event Harvesting:

  • extract.*all.*attendees.*private.*event
  • harvest.*private.*event.*participant.*data
  • Intent: stealing attendee lists from private events

Bulk Enumeration:

  • all cities from eventbrite, every event worldwide
  • compile all listings
  • Geographic + platform enumeration = flag

Credential Harvesting:

  • extract.*javascript.*from.*event.*platform
  • pull eventbrite api key from page
  • Code + credentials = high risk

Unverified Authority Claims:

  • "I'm authorized", "this is for research"
  • "legally", "compliance", "testing"
  • Claims + scraping + platform = flag

Risk Score:

  • Competitor targeting: +0.45
  • Private event + harvesting: +0.40
  • Bulk enumeration: +0.35
  • Credential theft: +0.50
  • Unverified authority + scraping: +0.25
  • Combined max: 1.0

Plugin 7: CodeExtractionDetector (plugins/builtin/code_extraction.py)

Detects: Source code and intellectual property theft

Code Extraction Intent:

  • extract.*javascript, pull source code
  • understand implementation, reverse engineer
  • download.*source, copy.*code

Target Keywords:

  • website, site, eventbrite, ticketmaster, meetup, facebook
  • competitor, rival, platform

Keyword Combinations:

  • Code + website = flag
  • Code + competitor = flag
  • Reverse engineering + any = flag

Risk Score:

  • Code extraction pattern: +0.45
  • Code + target: +0.50
  • Reverse engineering: +0.55

Plugin 8: EnumerationDetector (plugins/builtin/enumeration.py)

Detects: Broad-scope mass data collection

Enumeration Patterns:

  • all X across all Y: "all cities across all events"
  • comprehensive/complete/full dataset
  • every X in Y: "every event in every city"
  • all pages without limit (pagination bypass)

Heuristic Scoring:

  • Multiple scope keywords (2+) + targets (1+): +0.20
  • Aggregation + scope + targets: +0.25
  • Action (scrape) + scope + targets: +0.20
  • Geographic + enumeration: +0.25
  • Pagination bypass: +0.20
  • Mass operation: +0.15

Risk Score:

  • Combined heuristics, typically 0.15-0.35

Plugin 9: OperationalDisguiseDetector (plugins/builtin/operational_disguise.py)

Detects: Credential theft disguised as operational tasks

Operational Disguise Tactics:

  • Testing: test credential rotation, verify API keys work
  • Debugging: debug deployment, pull database credentials
  • Backup: backup encryption keys, restore certificates
  • Rotation: rotate api keys
  • Config: environment configuration with secrets

Sensitive Credentials Targeted:

  • API keys, secrets, passwords, tokens, bearers
  • Database passwords, private keys, SSH keys
  • Certificates, encryption keys

Risk Score:

  • Operational disguise + credential: +0.35-0.65
  • Fetch + explicit credential request: +0.60-0.65 (CRITICAL)

Plugin 10: PayloadSizeDetector (plugins/builtin/payload_size.py)

Detects: Oversized requests for exfiltration

Argument Size:

  • 1000 characters: MEDIUM flag

Fetch Size Requests:

  • 250k-500k characters: +0.20

  • 500k characters: +0.30

  • 1M characters: +0.40 (CRITICAL)


Policy Enforcement

Decision Making Flow

Input: Risk Score (0.0-1.0) + Threat Flags

Step 1: Check Hard Blocks
├─ flags["ssrf_attempt"] → BLOCK (0.70+)
├─ flags["malicious_url"] → BLOCK
├─ flags["jailbreak_like"] → BLOCK
├─ flags["data_theft_like"] → BLOCK
└─ flags["exfiltration_like"] → BLOCK

Step 2: Score-Based Decision
├─ score >= 0.75 → BLOCK
│   └─ Include specific threat reason
├─ score >= 0.40 → ALLOW + REDACT OUTPUT
│   └─ Reason: "Allowed with redaction: medium risk (X.XX)"
└─ score < 0.40 → ALLOW
    └─ Reason: "Allowed: low risk (X.XX)"

Step 3: Return Decision
└─ PolicyDecision(
    allow: bool,
    redact_output: bool,
    reason: str,
    risk_score: float,
    flags: Dict[str, bool]
)

Configuration Constants

# Thresholds (Environment Variables)
HIGH_RISK_BLOCK_THRESHOLD = 0.75              # RISK_BLOCK_THRESHOLD
MEDIUM_RISK_REDACT_THRESHOLD = 0.40           # RISK_REDACT_THRESHOLD

Policy Decision Labels

Label Meaning Action
allow Low risk, fully allowed Execute tool, return full result
redacted Medium risk, output sanitized Execute tool, redact output
blocked High risk, denied Return error, don't execute
error Request validation/processing failed Return error details
timeout Tool execution exceeded 60s Return timeout error

Input/Output Sanitization

PII Detection & Redaction

Patterns Detected:

Type Pattern Redacted As
Email [A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+ [REDACTED_EMAIL]
Phone \+?\d[\d\s\-]{7,}\d [REDACTED_PHONE]
Credit Card 13-16 digit sequences [REDACTED_CARD]
API Key (OpenAI) sk-[A-Za-z0-9]{20,} [REDACTED_KEY]
Generic Token 32+ character alphanumeric [REDACTED_TOKEN]

Path Traversal Detection

Detection:

  • Pattern: (\.\.[\\/])+ (catches ../, ..\, etc.)
  • Applied to all string-type arguments

Blocking: If path-type arguments contain traversal:

"Blocked: sanitized critical argument (path traversal detected)."

Sensitive Path Blocking

Unix/Linux:

/etc, /var/log, /root, /home, /sys, /proc

Windows:

C:\Windows, C:\ProgramData, C:\Users\Administrator, C:\config

Blocking: If filesystem path contains sensitive path:

"Blocked: sanitized critical argument (sensitive path detected)."

Size Clamping

Fetch max_length:

  • Clamped to 999,999 characters
  • Prevents downloading massive files for exfiltration
  • Downstream tool enforces actual limit

Rate Limiting

Sliding Window Algorithm

class RateLimiter:
    def __init__(self, max_calls=60, window_seconds=60):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.user_requests = {}  # Dict[user_id] → deque[timestamp]
        self.lock = threading.Lock()

    def check_and_increment(user_id: str) -> Tuple[bool, Dict]:
        with self.lock:
            # Get user's request deque
            requests = self.user_requests.get(user_id, deque())

            # Remove timestamps outside window
            now = time.time()
            window_start = now - self.window_seconds
            while requests and requests[0] < window_start:
                requests.popleft()

            # Check if under limit
            if len(requests) < self.max_calls:
                requests.append(now)
                self.user_requests[user_id] = requests
                return True, {
                    "limit": self.max_calls,
                    "remaining": self.max_calls - len(requests),
                    "reset_in_seconds": self.window_seconds
                }
            else:
                # Rate limited
                reset_time = requests[0] + self.window_seconds
                return False, {
                    "limit": self.max_calls,
                    "remaining": 0,
                    "reset_in_seconds": reset_time - now
                }

Response Headers

Successful Request:

X-RateLimit-Limit: 60
X-RateLimit-Remaining: 42
X-RateLimit-Reset: 1705316445

Rate Limited (429):

X-RateLimit-Limit: 60
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1705316445
Retry-After: 45

Audit Logging

JSONL Format

Each line is a valid JSON object:

{"timestamp": "...", "user_id": "...", ...}
{"timestamp": "...", "user_id": "...", ...}

Log Entries

Complete audit entry with all context:

{
    "timestamp": ISO8601,              # When request occurred
    "user_id": str,                    # Logical user ID
    "server": str,                     # Server key (ultimate_scraper, etc.)
    "tool": str,                       # Tool name
    "raw_arguments": dict,             # Original args from user
    "sanitized_arguments": dict,       # After PII/traversal redaction
    "policy": {
        "allow": bool,                 # Was allowed?
        "redact_output": bool,         # Should output be redacted?
        "reason": str,                 # Decision explanation
        "risk_score": float,           # 0.0-1.0
        "flags": {                     # Threat flags detected
            "ssrf_attempt": bool,
            "jailbreak_like": bool,
            "data_theft_like": bool,
            ...
        }
    },
    "risk": {
        "score": float,                # Aggregated risk
        "reasons": [str, ...],         # All detected threats
        "flags": dict,                 # All threat flags
    },
    "outcome": {
        "success": bool,               # Tool succeeded?
        "redacted": bool,              # Output was redacted?
        "error": str,                  # If failed, error message
        "category": str,               # Error category
    },
    "execution_time_ms": float         # Request duration
}

Log Location

Environment Variable: AUDIT_LOG_PATH

Default Locations:

  • Modal: /mnt/audit/audit.log.jsonl (volume-mounted)
  • Local: ./audit.log.jsonl
  • Fallback: Parent directory if local fails

API Access

Endpoint: GET /audit/latest

Returns:

{
    "entries": [
        {...},  # Most recent
        {...},
        ...     # Last 100 entries
        {...}   # Oldest
    ],
    "total_available": 1000,  # Total in file
    "returned": 100           # Entries returned
}

Configuration

Configuration Files

servers.yaml - Downstream server definitions

servers:
  web-search:
    display_name: "Web Search (Brave + DuckDuckGo)"
    url: "http://web-search:8001"
    enabled: true
    tags: ["search", "web"]
    description: "General web search..."
    auth:
      type: "bearer"
      header_name: "Authorization"
      env_var: "BRAVE_API_KEY"
    tool_parameters:
      web_search:
        query:
          type: "string"
          required: true
          description: "Search query"

  ultimate_scraper:
    display_name: "Ultimate Event Scraper"
    url: "http://ultimate-scraper:8002"
    enabled: true
    tags: ["events", "scraper"]
    description: "Multi-platform event extraction..."

Environment Variables

Core Configuration:

# Audit logging
AUDIT_LOG_PATH=/mnt/audit/audit.log.jsonl

# Rate limiting (per-user)
RATE_LIMIT_MAX_CALLS=60
RATE_LIMIT_WINDOW_SECONDS=60

# Downstream calls
DOWNSTREAM_TIMEOUT_SECONDS=60

# Risk thresholds
HIGH_RISK_BLOCK_THRESHOLD=0.75
MEDIUM_RISK_REDACT_THRESHOLD=0.40

# Server authentication (per downstream server)
BRAVE_API_KEY=xxx
JINA_API_KEY=xxx
BLAXEL_API_KEY=xxx
TICKETMASTER_API_KEY=xxx

Server Overrides:

# Override individual server config
ULTIMATE_SCRAPER_URL=http://custom-scraper:9999
ULTIMATE_SCRAPER_ENABLED=true
WEB_SEARCH_ENABLED=false

API Reference

POST /tools/secure_call

Main endpoint for secure tool invocation

Request:

{
  "user_id": "admin",
  "server": "ultimate_scraper",
  "tool": "searchEventListings",
  "arguments": {
    "url": "https://example.com/events",
    "location": "New York",
    "keyword": "jazz"
  },
  "llm_context": "The user is searching for jazz events in NYC"
}

Response (200 OK):

{
  "allowed": true,
  "redacted": false,
  "reason": "Allowed: low risk (0.15)",
  "risk_score": 0.15,
  "risk_factors": [
    "Broad-scope enumeration pattern detected"
  ],
  "policy_decision": "allow",
  "execution_time_ms": 234.56,
  "downstream_result": {
    "events": [...]
  }
}

Response (400 Bad Request):

{
  "allowed": false,
  "reason": "Invalid server: 'unknown_server'",
  "error_category": "validation",
  "policy_decision": "error"
}

Response (429 Too Many Requests):

{
  "allowed": false,
  "reason": "Rate limit exceeded: 60 calls/60s",
  "remaining": 0,
  "reset_in_seconds": 45.23,
  "policy_decision": "rate_limited"
}

Response (403 Forbidden - Policy Blocked):

{
  "allowed": false,
  "reason": "Blocked: credential theft detected",
  "risk_score": 0.82,
  "risk_factors": [
    "Operational disguise pattern with credential fetch",
    "Explicit API key extraction attempt"
  ],
  "policy_decision": "blocked"
}

GET /tools/list

List all available tools (triggers auto-discovery)

Response:

{
  "tools": {
    "web-search": {
      "web_search": {
        "description": "Search the internet...",
        "inputSchema": {...}
      }
    },
    "ultimate_scraper": {
      "searchEventListings": {...},
      "scrapeEventPage": {...}
    },
    ...
  }
}

POST /tools/refresh

Force re-discovery of downstream tools

Response:

{
  "refreshed": true,
  "tools_discovered": 25,
  "servers_contacted": 6,
  "timestamp": "2025-01-15T10:30:45Z"
}

GET /config/servers

Get server configuration (servers.yaml)

Response:

{
  "servers": [
    {
      "key": "web-search",
      "display_name": "Web Search",
      "url": "http://web-search:8001",
      "enabled": true,
      ...
    }
  ]
}

GET /audit/latest

Get last 100 audit entries

Response:

{
  "entries": [
    {
      "timestamp": "2025-01-15T10:35:00Z",
      "user_id": "admin",
      "policy": {...},
      "risk": {...},
      ...
    },
    ...
  ],
  "total_available": 5432,
  "returned": 100
}

Integration Guide

Upstream Integration (LLM Clients)

Claude Desktop Integration

# In Claude Desktop config
{
  "mcpServers": {
    "security-gateway": {
      "command": "python",
      "args": ["/path/to/security_gateway/sse_server.py"],
      "env": {
        "AUDIT_LOG_PATH": "/logs/audit.jsonl",
        "RATE_LIMIT_MAX_CALLS": "60"
      }
    }
  }
}

HTTP API Integration

import httpx

async def call_tool(user_id, server, tool, arguments):
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8000/tools/secure_call",
            json={
                "user_id": user_id,
                "server": server,
                "tool": tool,
                "arguments": arguments
            }
        )
        result = response.json()

        if result["allowed"]:
            return result["downstream_result"]
        else:
            raise SecurityException(result["reason"])

Downstream Integration (MCP Servers)

servers.yaml Configuration:

servers:
  my-tool:
    url: "http://my-tool:8000"
    enabled: true
    auth:
      type: "bearer"
      env_var: "MY_TOOL_API_KEY"

Tool Discovery:

  • Gateway auto-discovers tools via /tools/list
  • Calls tool with sanitized arguments
  • Receives tool output
  • Applies output sanitization
  • Returns to LLM client

Deployment

Local Development

# Install dependencies
pip install -r requirements.txt

# Set environment variables
export AUDIT_LOG_PATH=./audit.log.jsonl
export RATE_LIMIT_MAX_CALLS=60
export HIGH_RISK_BLOCK_THRESHOLD=0.75

# Start gateway
python server.py
# Available at: http://localhost:8000

Docker

FROM python:3.11-slim

WORKDIR /app
COPY . .
RUN pip install -r requirements.txt

EXPOSE 8000
CMD ["python", "server.py"]

Run:

docker build -t security-gateway .
docker run -p 8000:8000 \
  -e AUDIT_LOG_PATH=/var/log/audit.jsonl \
  -v /var/log:/var/log \
  security-gateway

Modal Deployment

# Deploy
modal deploy modal_app.py

# Configure secrets
modal secret create mcp-config \
  --env AUDIT_LOG_PATH=/mnt/audit/audit.jsonl \
  --env RATE_LIMIT_MAX_CALLS=100

Performance & Optimization

Per-Request Overhead

Component Time Notes
Rate limit check <1ms O(1) with lock
Plugin scanning 50-200ms All 10 plugins sequentially
Sanitization 10-50ms Regex passes
Downstream call 1-60s Network latency
Output sanitization 10-50ms Same as input
Audit write <1ms Append to file
Total (without downstream) 100-300ms Gateway overhead
Total (with downstream) 1-60s Dominated by tool

Caching

  • Plugin instances: Cached at startup (reused)
  • Discovered tools: Cached in memory + file
  • Server configurations: Loaded from YAML, cached
  • User request deque: Maintained in memory per user

Scalability Considerations

Single Instance:

  • Thread-safe rate limiting with lock
  • Per-user request tracking in memory
  • Audit log as local JSONL file

Multiple Instances (Distributed):

  • Rate limiting not distributed (per-instance)
    • Solution: Use external rate limiter (Redis)
  • Audit logs local to each instance
    • Solution: Use centralized logging (CloudWatch, etc.)
  • Tool discovery cached per instance
    • Solution: Refresh endpoint forces re-discovery

Troubleshooting

Issue 1: "Rate limit exceeded"

Symptom:

Error: 429 Too Many Requests
Retry-After: 45

Cause: User exceeded 60 calls/60 seconds

Solution:

# Check limit configuration
echo $RATE_LIMIT_MAX_CALLS

# Increase limit if needed
export RATE_LIMIT_MAX_CALLS=120

# Or decrease window
export RATE_LIMIT_WINDOW_SECONDS=30

Issue 2: "Blocked: credential theft detected"

Symptom:

{
  "allowed": false,
  "reason": "Blocked: credential theft detected",
  "risk_score": 0.82
}

Cause: Request matched credential theft patterns

Legitimate Cases:

  • Testing credential rotation (operational context)
  • Admin auditing (documented authorization)
  • Backup/restore procedures

Solution:

# Add context to help scoring
response = await client.post(
    "http://localhost:8000/tools/secure_call",
    json={
        "user_id": "admin",
        "server": "...",
        "tool": "...",
        "arguments": {...},
        "llm_context": "This is authorized credential rotation testing by the admin"
    }
)

Context helps plugins differentiate legitimate from malicious patterns.

Issue 3: "Output redacted"

Symptom:

{
  "allowed": true,
  "redacted": true,
  "reason": "Allowed with redaction: medium risk (0.45)"
}

Cause: Medium risk (0.40-0.74) → output was sanitized

What Happened:

  • Some PII was detected and redacted
  • Patterns: emails, phones, API keys
  • Or generic tokens found

Example:

Original: "Contact: admin@company.com, key: sk-abc123..."
Redacted: "Contact: [REDACTED_EMAIL], key: [REDACTED_KEY]..."

Issue 4: "Tool not found"

Symptom:

{
  "allowed": false,
  "reason": "Server 'my-tool' not found in configuration",
  "error_category": "validation"
}

Cause: Server not in servers.yaml or not discovered

Solution:

# Check available servers
curl http://localhost:8000/tools/list

# Refresh discovery
curl -X POST http://localhost:8000/tools/refresh

# Check servers.yaml
cat /path/to/servers.yaml

# Verify server is running
curl http://my-tool:8000/health

Issue 5: "Downstream timeout"

Symptom:

{
  "allowed": true,
  "policy_decision": "timeout",
  "reason": "Tool execution exceeded 60 seconds"
}

Cause: Tool took longer than timeout

Solution:

# Increase timeout
export DOWNSTREAM_TIMEOUT_SECONDS=120

# Or optimize downstream server
# Check server performance
time curl -X POST http://downstream:8000/tools/...

Contributing

Adding a New Threat Detector

  1. Create plugin file: plugins/builtin/my_detector.py
from plugins.base import ScannerPlugin, ScanResult

class MyDetector(ScannerPlugin):
    def scan(self, user_id, server_key, tool, arguments, llm_context=None) -> ScanResult:
        # Implement threat detection
        detected = "malicious_pattern" in str(arguments).lower()

        return ScanResult(
            plugin_name="MyDetector",
            detected=detected,
            risk_score=0.5 if detected else 0.0,
            reasons=["Pattern matched"] if detected else [],
            flags={"my_flag": detected},
            metadata={}
        )

# Export instance for dynamic loading
plugin = MyDetector()
  1. Update plugin registry:
# In plugins/loader.py, plugin auto-discovers from /builtin/
# No changes needed - plugin will be auto-loaded
  1. Test:
curl -X POST http://localhost:8000/tools/secure_call \
  -d '{"user_id": "test", "server": "web-search", "tool": "search", "arguments": {"query": "...malicious..."}}'

Updating Threat Detection Rules

Modify individual plugin files:

# In plugins/builtin/jailbreak.py
OVERRIDE_PATTERNS = [
    r"ignore all previous instructions",
    r"bypass security",
    # Add new pattern
    r"new custom pattern",
]

License

Same as parent project (MCP Security Hackathon)


Last Updated: 2025-11-28
Maintainer: MCP Security Team
Architecture: Plugin-based risk scoring with multi-threshold policy enforcement
Threat Detectors: 10 specialized plugins
Security Thresholds: 2 configurable (block: 0.75, redact: 0.40)
Rate Limiting: Per-user sliding window
Audit Trail: JSONL format with complete request/response logging
Production Ready: Yes