| """
|
| Enumeration and broad-scope data collection detector.
|
|
|
| Detects attempts to enumerate large amounts of data across multiple targets,
|
| locations, or categories. Flags requests that indicate mass data collection
|
| even when using vague or legitimate-sounding language.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import re
|
| from typing import Any, Dict, Optional
|
|
|
| from ..base import ScannerPlugin, ScanResult, PluginMetadata
|
|
|
|
|
|
|
# Regular expressions (plain strings, compiled on use) describing phrasing
# that suggests enumeration or mass collection.  Every pattern chains word
# groups with ``.*``; since ``.`` does not match a newline by default, all
# groups of one pattern must occur, in order, on the same line of text.
ENUMERATION_PATTERNS = [
    # Scope qualifier followed by a plural collection target
    # ("all ... cities", "every ... users").
    r"\b(all|every|complete|full|entire|every single)\b.*\b(cities|locations|regions|countries|states|areas|zones|venues|events|attendees|users|members)\b",

    # Exhaustive dataset wording, in either order
    # ("comprehensive ... database", "archive ... entire").
    r"\b(comprehensive|complete|full|exhaustive|entire|all-inclusive|massive|huge)\b.*\b(dataset|map|database|collection|archive|catalog|list|inventory)\b",
    r"\b(dataset|archive|catalog|collection|map)\b.*\b(all|every|complete|full|entire)\b",

    # Geographic sweep ("across all ... regions", "every ... cities ... throughout").
    r"\b(across|throughout|over)\s+(all|every)\b.*\b(cities|regions|locations|countries|states|areas)\b",
    r"\b(all|every)\b.*\b(US|United States|cities|locations|regions)\b.*\b(across|throughout)\b",

    # Collection verb aimed at many places ("scrape ... many ... venues").
    r"\b(scrape|fetch|download|extract|collect|harvest|gather|aggregate)\b.*\b(multiple|many|numerous|all)\b.*\b(cities|locations|venues|regions|areas)\b",

    # Unbounded result sets ("all ... records", "get ... every ... results").
    r"\b(all|every|complete|every single|everything|without.*limit|no.*limit|unlimited)\b.*\b(results?|events?|records?|entries?|items?|data|information)\b",
    r"\b(return|get|fetch|retrieve)\b.*\b(all|every|complete|entire|total|unlimited)\b.*\b(results?|records?|events?|data)\b",

    # Building an aggregate from broad sources
    # ("build ... database ... of ... all ... events").
    r"\b(create|build|generate|compile|assemble)\b.*\b(map|dataset|database|collection|inventory|catalog)\b.*\b(from|of|for)\b.*\b(all|every|multiple|every|each)\b.*\b(cities|locations|events|regions)\b",
    r"\b(map|database|dataset|collection|inventory)\b.*\b(of|for)\b.*\b(all|every|complete|comprehensive)\b.*\b(events?|attendees?|users?|venues?)\b",

    # Aggregate of everything in a place ("map ... all ... city ... events").
    r"\b(map|collection|archive|inventory|catalog)\b.*\b(all|every)\b.*\b(US|city|cities|location|venue|region)\b.*\b(events?|data)\b",

    # Pagination / limit bypass ("skip", "ignore pagination", "all pages").
    r"\b(skip|skip.*limit|ignore.*pagination|all.*pages?|complete.*pages?|entire.*page|every.*page)\b",
    r"\b(all|every|complete)\b.*\b(pages?|results?|records?)\b.*\b(without|no|ignore)\b.*\b(limit|pagination|offset)\b",

    # Explicit enumeration or bulk wording ("enumerate ... users", "bulk ... download").
    r"\b(enumerate|enumerate.*all|discover.*all|find.*all|locate.*all)\b.*\b(events?|cities|locations|venues|users?|attendees?)\b",
    r"\b(mass|bulk|batch|wholesale)\b.*\b(data|collection|fetch|scrape|gather|download)\b",
]
|
|
|
|
|
# Qualifiers signalling that a request is unbounded in scope.
BROAD_SCOPE_KEYWORDS = {
    # quantifiers
    "all", "every", "each", "everything", "unlimited",
    # completeness
    "complete", "comprehensive", "full", "entire", "exhaustive", "total", "overall",
    # geographic reach
    "global", "worldwide", "nationwide",
    # volume
    "mass", "bulk", "batch", "wholesale",
}
|
|
|
|
|
# Nouns naming what is being collected or where it is collected from.
TARGET_KEYWORDS = {
    # people and records
    "events", "attendees", "users", "members", "records", "data", "information",
    "results", "entries",
    # places
    "cities", "locations", "venues", "regions", "areas", "zones", "states", "countries",
    # sources
    "sites", "platforms", "pages",
}
|
|
|
|
|
# Words indicating that collected data will be combined into one artifact.
AGGREGATION_KEYWORDS = {
    # the artifact being built
    "map", "dataset", "database", "collection", "archive", "catalog", "inventory",
    # the act of building it
    "aggregate", "compile", "consolidate", "combine", "merge", "centralize",
    "assemble", "gather", "accumulate",
}
|
|
|
|
|
# Verbs (plus the volume markers "bulk"/"mass") describing data retrieval.
ACTION_KEYWORDS = {
    # retrieval
    "scrape", "fetch", "download", "extract", "collect", "harvest", "gather",
    "retrieve", "access", "query",
    # discovery
    "crawl", "scan", "enumerate", "discover", "find", "locate",
    # volume markers
    "bulk", "mass",
}
|
|
|
|
|
class EnumerationDetector(ScannerPlugin):
    """Detects broad-scope enumeration and mass data collection attempts.

    The detector joins the optional LLM context with every value found in the
    tool arguments and runs a series of independent heuristics over that text
    (regex patterns, keyword combinations, geographic scope, pagination
    bypass, mass-operation wording, event-platform enumeration).  Each
    heuristic that fires appends a human-readable reason and adds its weight
    to the risk score, which is capped at 1.0 in the returned result.
    """

    def __init__(self):
        """Register this plugin's metadata with the base class."""
        super().__init__(
            metadata=PluginMetadata(
                name="EnumerationDetector",
                version="1.0.0",
                description="Detects enumeration of large-scale datasets and broad-scope data collection even with vague language",
                author="SecurityGateway",
            )
        )

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """
        Scan for enumeration and mass collection patterns.

        Detects requests that indicate:
        - "All X across all Y" patterns (geographic/scope enumeration)
        - Aggregation intent with broad scope ("create comprehensive map of all events")
        - Mass data collection terminology
        - Scope indicators without normal pagination/limits
        - Bulk collection across multiple targets/locations

        Args:
            user_id: User identifier (not inspected by this detector)
            server_key: Server key (not inspected by this detector)
            tool: Tool name (not inspected by this detector)
            arguments: Tool arguments; every nested value is flattened to text
            llm_context: Optional free-text context, scanned with the arguments

        Returns:
            ScanResult whose ``detected`` is True when at least one heuristic
            fired, with the accumulated ``risk_score`` (capped at 1.0), the
            list of ``reasons`` and the boolean ``flags`` that were set.
        """
        context = (llm_context or "") + " " + self._flatten_json(arguments)
        context_lower = context.lower()

        reasons = []
        flags = {}
        risk_score = 0.0

        # Heuristic 1: explicit enumeration phrasing (regexes are applied
        # case-insensitively by _contains_pattern).
        if self._contains_pattern(context, ENUMERATION_PATTERNS):
            reasons.append("Broad-scope enumeration pattern detected (all X across all Y).")
            flags["enumeration_pattern"] = True
            risk_score += 0.35

        # Heuristic 2: combinations of whole-word keywords from the four
        # vocabularies.  Counts are distinct keywords, not occurrences.
        scope_keywords_found = self._count_keywords(context_lower, BROAD_SCOPE_KEYWORDS)
        target_keywords_found = self._count_keywords(context_lower, TARGET_KEYWORDS)
        action_found = self._count_keywords(context_lower, ACTION_KEYWORDS) > 0
        aggregation_found = self._count_keywords(context_lower, AGGREGATION_KEYWORDS) > 0

        keyword_risk = 0.0

        if scope_keywords_found >= 2 and target_keywords_found >= 1:
            keyword_risk += 0.20
            reasons.append("Multiple broad-scope qualifiers with collection targets.")

        if aggregation_found and scope_keywords_found >= 1 and target_keywords_found >= 1:
            keyword_risk += 0.25
            reasons.append("Aggregation intent with broad-scope data collection.")
            flags["aggregation_with_enumeration"] = True

        if action_found and scope_keywords_found >= 2 and target_keywords_found >= 1:
            keyword_risk += 0.20
            reasons.append("Collection action targeting broad scope across multiple targets.")
            flags["collection_across_scope"] = True

        risk_score += keyword_risk

        # Heuristic 3: geographic enumeration.  Terms are matched as whole
        # words so that "all" does not fire inside "call" or "each" inside
        # "reach".
        has_location_qualifier = self._contains_any_word(context_lower, [
            "cities", "locations", "regions", "states", "countries", "venues", "areas",
            "united states", "worldwide", "global", "nationwide",
        ])
        if not has_location_qualifier:
            # "US"/"USA" are checked case-sensitively on the original text:
            # lower-casing them would confuse the abbreviation with the
            # pronoun "us".
            has_location_qualifier = re.search(r"\b(?:US|USA)\b", context) is not None

        has_enumeration_verb = self._contains_any_word(context_lower, [
            "all", "every", "each", "enumerate", "discover all", "find all",
        ])

        if has_location_qualifier and has_enumeration_verb:
            reasons.append("Geographic enumeration: targeting all locations or cities.")
            flags["geographic_enumeration"] = True
            risk_score += 0.25

        # Heuristic 4: wording that asks to bypass pagination or result limits
        # (plain substring match on the lower-cased text).
        if any(phrase in context_lower for phrase in [
            "skip limit", "ignore pagination", "all pages", "no limit", "unlimited"
        ]):
            reasons.append("Pagination bypass or unlimited result collection detected.")
            flags["pagination_bypass"] = True
            risk_score += 0.20

        # Heuristic 5: mass-operation wording, counted only together with a
        # collection action or aggregation keyword.  Whole-word matching also
        # catches the term at the end of the text or before punctuation.
        if self._contains_any_word(context_lower, ["mass", "bulk", "batch", "wholesale"]):
            if action_found or aggregation_found:
                reasons.append("Mass operation with data collection intent.")
                flags["mass_operation"] = True
                risk_score += 0.15

        # Heuristic 6: broad enumeration against a named event platform.
        event_platforms = ["eventbrite", "ticketmaster", "meetup", "eventful"]
        if any(platform in context_lower for platform in event_platforms):
            if any(phrase in context_lower for phrase in [
                "all events", "every event", "all cities", "all venues"
            ]):
                reasons.append("Broad event enumeration from ticketing/event platform.")
                flags["event_platform_enumeration"] = True
                risk_score += 0.25

        detected = bool(reasons)

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=detected,
            risk_score=min(1.0, risk_score),
            reasons=reasons if reasons else ["No enumeration patterns detected."],
            flags=flags,
        )

    def _flatten_json(self, value: Any) -> str:
        """Flatten nested structures to a space-separated string.

        Dict values and list items are flattened recursively; dict keys are
        not included.  Any other value is converted with ``str``.
        """
        if isinstance(value, dict):
            return " ".join(self._flatten_json(v) for v in value.values())
        if isinstance(value, list):
            return " ".join(self._flatten_json(v) for v in value)
        return str(value)

    def _contains_pattern(self, text: str, patterns: list) -> bool:
        """Return True if any regex in ``patterns`` matches ``text`` (case-insensitive)."""
        return any(re.search(pat, text, flags=re.IGNORECASE) for pat in patterns)

    def _contains_any_word(self, text: str, terms: list) -> bool:
        """Return True if any term occurs in ``text`` as a whole word or phrase.

        Each term is escaped and wrapped in ``\\b`` word boundaries, so it
        only matches when not embedded in a longer word.  Matching is
        case-insensitive.
        """
        return any(
            re.search(r"\b" + re.escape(term) + r"\b", text, flags=re.IGNORECASE)
            for term in terms
        )

    def _count_keywords(self, text: str, keywords: set) -> int:
        """Count how many distinct keywords appear in ``text`` as whole words (case-insensitive)."""
        count = 0
        for keyword in keywords:
            if re.search(r"\b" + re.escape(keyword) + r"\b", text, flags=re.IGNORECASE):
                count += 1
        return count
|
|
|
|
|
|
|
# Module-level detector instance, created once when this module is imported.
# NOTE(review): presumably looked up by name by the plugin loader — confirm
# against the loader before renaming.
plugin = EnumerationDetector()
|
|
|