yuki-sui's picture
Upload 169 files
ed71b0e verified
"""
Enumeration and broad-scope data collection detector.
Detects attempts to enumerate large amounts of data across multiple targets,
locations, or categories. Flags requests that indicate mass data collection
even when using vague or legitimate-sounding language.
"""
from __future__ import annotations
import re
from typing import Any, Dict, Optional
from ..base import ScannerPlugin, ScanResult, PluginMetadata
# Regular expressions that signal broad-scope enumeration ("all X", "every X",
# "across all Y", bulk collection, pagination bypass, ...).
# They are applied with re.search and re.IGNORECASE by
# EnumerationDetector._contains_pattern; a single hit is enough to flag a request.
# NOTE: "." does not match newlines here (no re.DOTALL), so every pattern only
# matches within a single line of the scanned text.
ENUMERATION_PATTERNS = [
    # "all X" or "every X" patterns with broad scope
    r"\b(all|every|complete|full|entire|every single)\b.*\b(cities|locations|regions|countries|states|areas|zones|venues|events|attendees|users|members)\b",
    # Aggregation + breadth indicators
    r"\b(comprehensive|complete|full|exhaustive|entire|all-inclusive|massive|huge)\b.*\b(dataset|map|database|collection|archive|catalog|list|inventory)\b",
    r"\b(dataset|archive|catalog|collection|map)\b.*\b(all|every|complete|full|entire)\b",
    # "Across all/every" pattern (geographic enumeration)
    r"\b(across|throughout|over)\s+(all|every)\b.*\b(cities|regions|locations|countries|states|areas)\b",
    r"\b(all|every)\b.*\b(US|United States|cities|locations|regions)\b.*\b(across|throughout)\b",
    # Multiple location collection
    r"\b(scrape|fetch|download|extract|collect|harvest|gather|aggregate)\b.*\b(multiple|many|numerous|all)\b.*\b(cities|locations|venues|regions|areas)\b",
    # Unlimited result collection
    r"\b(all|every|complete|every single|everything|without.*limit|no.*limit|unlimited)\b.*\b(results?|events?|records?|entries?|items?|data|information)\b",
    r"\b(return|get|fetch|retrieve)\b.*\b(all|every|complete|entire|total|unlimited)\b.*\b(results?|records?|events?|data)\b",
    # "Create X from Y" where X=aggregated view and Y=multiple sources
    # (the alternation previously listed "every" twice; the duplicate is dropped)
    r"\b(create|build|generate|compile|assemble)\b.*\b(map|dataset|database|collection|inventory|catalog)\b.*\b(from|of|for)\b.*\b(all|every|multiple|each)\b.*\b(cities|locations|events|regions)\b",
    r"\b(map|database|dataset|collection|inventory)\b.*\b(of|for)\b.*\b(all|every|complete|comprehensive)\b.*\b(events?|attendees?|users?|venues?)\b",
    # Geographic scope with vague intent
    r"\b(map|collection|archive|inventory|catalog)\b.*\b(all|every)\b.*\b(US|city|cities|location|venue|region)\b.*\b(events?|data)\b",
    # Pagination bypass (requesting without normal pagination limits).
    # A bare "skip" alternative is deliberately NOT included: on its own it
    # matched any text containing the word "skip" (e.g. "skip the intro"),
    # which has nothing to do with pagination. "skip ... limit" is still caught.
    r"\b(skip.*limit|ignore.*pagination|all.*pages?|complete.*pages?|entire.*page|every.*page)\b",
    r"\b(all|every|complete)\b.*\b(pages?|results?|records?)\b.*\b(without|no|ignore)\b.*\b(limit|pagination|offset)\b",
    # Mass scanning/enumeration terminology
    r"\b(enumerate|enumerate.*all|discover.*all|find.*all|locate.*all)\b.*\b(events?|cities|locations|venues|users?|attendees?)\b",
    r"\b(mass|bulk|batch|wholesale)\b.*\b(data|collection|fetch|scrape|gather|download)\b",
]
# Qualifiers that suggest a request covers "everything" rather than a bounded
# subset. Kept in alphabetical order; matched as whole words by the detector's
# keyword counter.
BROAD_SCOPE_KEYWORDS = {
    "all",
    "batch",
    "bulk",
    "complete",
    "comprehensive",
    "each",
    "entire",
    "every",
    "everything",
    "exhaustive",
    "full",
    "global",
    "mass",
    "nationwide",
    "overall",
    "total",
    "unlimited",
    "wholesale",
    "worldwide",
}
# Nouns naming the kind of thing being collected (people, records, places,
# result pages). Kept in alphabetical order; matched as whole words by the
# detector's keyword counter.
TARGET_KEYWORDS = {
    "areas",
    "attendees",
    "cities",
    "countries",
    "data",
    "entries",
    "events",
    "information",
    "locations",
    "members",
    "pages",
    "platforms",
    "records",
    "regions",
    "results",
    "sites",
    "states",
    "users",
    "venues",
    "zones",
}
# Words that signal intent to consolidate collected data into one artefact
# (a map, a dataset, a merged archive, ...). Kept in alphabetical order;
# matched as whole words by the detector's keyword counter.
AGGREGATION_KEYWORDS = {
    "accumulate",
    "aggregate",
    "archive",
    "assemble",
    "catalog",
    "centralize",
    "collection",
    "combine",
    "compile",
    "consolidate",
    "database",
    "dataset",
    "gather",
    "inventory",
    "map",
    "merge",
}
# Verbs (plus two bulk qualifiers) describing the act of pulling data out of a
# source. Kept in alphabetical order; matched as whole words by the detector's
# keyword counter.
# NOTE(review): "bulk" and "mass" also appear in BROAD_SCOPE_KEYWORDS, so a
# request containing either word always counts as having an action — confirm
# this overlap is intended.
ACTION_KEYWORDS = {
    "access",
    "bulk",
    "collect",
    "crawl",
    "discover",
    "download",
    "enumerate",
    "extract",
    "fetch",
    "find",
    "gather",
    "harvest",
    "locate",
    "mass",
    "query",
    "retrieve",
    "scan",
    "scrape",
}
class EnumerationDetector(ScannerPlugin):
    """Detects broad-scope enumeration and mass data collection attempts.

    Detection is purely lexical: the tool arguments and the optional LLM
    context are flattened into one string, which is then checked against
    ``ENUMERATION_PATTERNS`` and the module-level keyword sets. Every heuristic
    that fires appends a reason, usually sets a flag, and adds a fixed
    increment to the risk score (the final score is capped at 1.0).
    """

    # The term lists below are matched as whole words/phrases, case-insensitively
    # (see _contains_any_term). Plain substring tests would let "all" fire on
    # "install", "each" on "reach" and "mass" on "biomass".
    _LOCATION_TERMS = (
        "cities", "locations", "regions", "states", "countries", "venues",
        "areas", "usa", "united states", "worldwide", "global", "nationwide",
    )
    # Upper-case "US" is matched case-sensitively against the ORIGINAL text:
    # a case-insensitive match would fire on the pronoun "us".
    _US_ABBREVIATION = r"\bUS\b"
    _ENUMERATION_TERMS = (
        "all", "every", "each", "enumerate", "discover all", "find all",
    )
    _PAGINATION_BYPASS_TERMS = (
        "skip limit", "ignore pagination", "all pages", "no limit", "unlimited",
    )
    _MASS_OPERATION_TERMS = ("mass", "bulk", "batch", "wholesale")
    # Platform names are matched as substrings so that plurals and URLs
    # ("meetups", "eventbrite.com") are still recognised.
    _EVENT_PLATFORMS = ("eventbrite", "ticketmaster", "meetup", "eventful")
    _EVENT_ENUMERATION_TERMS = (
        "all events", "every event", "all cities", "all venues",
    )

    def __init__(self):
        super().__init__(
            metadata=PluginMetadata(
                name="EnumerationDetector",
                version="1.0.0",
                description="Detects enumeration of large-scale datasets and broad-scope data collection even with vague language",
                author="SecurityGateway",
            )
        )

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """Scan a tool call for enumeration and mass-collection patterns.

        Detects requests that indicate:
        - "All X across all Y" patterns (geographic/scope enumeration)
        - Aggregation intent with broad scope ("create comprehensive map of all events")
        - Mass data collection terminology
        - Scope indicators without normal pagination/limits
        - Bulk collection across multiple targets/locations

        Args:
            user_id: User identifier (not used by this detector).
            server_key: Server key (not used by this detector).
            tool: Tool name (not used by this detector).
            arguments: Tool arguments; their values are flattened to text
                and scanned.
            llm_context: Optional free-text context, scanned together with
                the flattened arguments.

        Returns:
            ScanResult whose ``detected`` is True when at least one heuristic
            fired, with the accumulated ``risk_score`` capped at 1.0, the
            list of reasons, and the per-heuristic flags.
        """
        context = (llm_context or "") + " " + self._flatten_json(arguments)
        context_lower = context.lower()

        reasons = []
        flags = {}
        risk_score = 0.0

        # 1) Explicit enumeration regexes.
        if self._contains_pattern(context, ENUMERATION_PATTERNS):
            reasons.append("Broad-scope enumeration pattern detected (all X across all Y).")
            flags["enumeration_pattern"] = True
            risk_score += 0.35

        # 2) Keyword heuristic: scope + target + action/aggregation.
        scope_hits = self._count_keywords(context_lower, BROAD_SCOPE_KEYWORDS)
        target_hits = self._count_keywords(context_lower, TARGET_KEYWORDS)
        action_found = self._count_keywords(context_lower, ACTION_KEYWORDS) > 0
        aggregation_found = self._count_keywords(context_lower, AGGREGATION_KEYWORDS) > 0
        has_target = target_hits >= 1

        # Accumulated separately and added once, so the floating-point sum is
        # formed in the same order regardless of which sub-rules fire.
        keyword_risk = 0.0
        if scope_hits >= 2 and has_target:
            # "all cities" + "events" = broad scope
            keyword_risk += 0.20
            reasons.append("Multiple broad-scope qualifiers with collection targets.")
        if aggregation_found and scope_hits >= 1 and has_target:
            # "comprehensive map" + "of all events" = mass collection
            keyword_risk += 0.25
            reasons.append("Aggregation intent with broad-scope data collection.")
            flags["aggregation_with_enumeration"] = True
        if action_found and scope_hits >= 2 and has_target:
            # "scrape" + "all cities" + "events" = enumeration attack
            keyword_risk += 0.20
            reasons.append("Collection action targeting broad scope across multiple targets.")
            flags["collection_across_scope"] = True
        risk_score += keyword_risk

        # 3) Geographic/location-based enumeration.
        has_location_qualifier = (
            self._contains_any_term(context_lower, self._LOCATION_TERMS)
            or re.search(self._US_ABBREVIATION, context) is not None
        )
        has_enumeration_verb = self._contains_any_term(
            context_lower, self._ENUMERATION_TERMS
        )
        if has_location_qualifier and has_enumeration_verb:
            reasons.append("Geographic enumeration: targeting all locations or cities.")
            flags["geographic_enumeration"] = True
            risk_score += 0.25

        # 4) Pagination bypass attempts.
        if self._contains_any_term(context_lower, self._PAGINATION_BYPASS_TERMS):
            reasons.append("Pagination bypass or unlimited result collection detected.")
            flags["pagination_bypass"] = True
            risk_score += 0.20

        # 5) Mass operation indicators, only when paired with a collection
        #    action or aggregation intent.
        if self._contains_any_term(context_lower, self._MASS_OPERATION_TERMS) and (
            action_found or aggregation_found
        ):
            reasons.append("Mass operation with data collection intent.")
            flags["mass_operation"] = True
            risk_score += 0.15

        # 6) Event/ticketing platforms specifically - sensitive scope.
        mentions_platform = any(
            platform in context_lower for platform in self._EVENT_PLATFORMS
        )
        if mentions_platform and self._contains_any_term(
            context_lower, self._EVENT_ENUMERATION_TERMS
        ):
            reasons.append("Broad event enumeration from ticketing/event platform.")
            flags["event_platform_enumeration"] = True
            risk_score += 0.25

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=bool(reasons),
            risk_score=min(1.0, risk_score),
            reasons=reasons if reasons else ["No enumeration patterns detected."],
            flags=flags,
        )

    def _flatten_json(self, value: Any) -> str:
        """Flatten a nested structure into one space-separated string.

        Dict values (not keys) and the items of lists, tuples and sets are
        flattened recursively; ``None`` contributes nothing instead of the
        literal token "None"; any other value is converted with ``str``.
        """
        if value is None:
            return ""
        if isinstance(value, dict):
            return " ".join(self._flatten_json(v) for v in value.values())
        if isinstance(value, (list, tuple, set, frozenset)):
            return " ".join(self._flatten_json(v) for v in value)
        return str(value)

    def _contains_pattern(self, text: str, patterns: list) -> bool:
        """Return True if any regex in *patterns* matches *text* (case-insensitive)."""
        return any(re.search(pat, text, flags=re.IGNORECASE) for pat in patterns)

    def _count_keywords(self, text: str, keywords: set) -> int:
        """Count how many distinct keywords occur in *text* as whole words.

        Matching is case-insensitive; each keyword counts at most once no
        matter how often it appears.
        """
        return sum(
            1
            for keyword in keywords
            if re.search(self._term_pattern(keyword), text, flags=re.IGNORECASE)
        )

    def _contains_any_term(self, text: str, terms) -> bool:
        """Return True if any word or phrase in *terms* occurs in *text* as a whole word/phrase."""
        return any(
            re.search(self._term_pattern(term), text, flags=re.IGNORECASE)
            for term in terms
        )

    @staticmethod
    def _term_pattern(term: str) -> str:
        """Build a whole-word regex for *term*.

        The words of a multi-word phrase may be separated by any run of
        whitespace, so "no  limit" and "no\\tlimit" match the phrase "no limit".
        """
        return r"\b" + r"\s+".join(re.escape(word) for word in term.split()) + r"\b"
# Export as module-level plugin for auto-loading.
# The detector is instantiated once, at import time.
# NOTE(review): presumably the plugin loader discovers this object through the
# module attribute name `plugin` — confirm against the loader before renaming.
plugin = EnumerationDetector()