Spaces:
Build error
Build error
| """ | |
| upif.modules.input_protection | |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
| The First Line of Defense. | |
| Implements heuristic analysis using massive regex pattern matching | |
| to detect SQL Injection, XSS, Jailbreaks, and Prompt Manipulations. | |
| :copyright: (c) 2025 Yash Dhone. | |
| :license: Proprietary, see LICENSE for details. | |
| """ | |
| import re | |
| import json | |
| import os | |
| from typing import Any, List, Optional, Dict | |
| from upif.core.interfaces import SecurityModule | |
class InputGuard(SecurityModule):
    """
    Heuristic Input Guard.

    Capabilities:
    - Case-insensitive matching of input against literal attack signatures
      loaded from the bundled JSON pattern database.
    - JSON-based pattern loading for easy updates.
    - Configurable refusal message (Internationalization ready).
    """

    # Minimal built-in signatures used when the JSON database is missing,
    # malformed, or contains no usable entries.
    _FALLBACK_SIGNATURES = ("ignore previous instructions", "system override")

    def __init__(self, refusal_message: str = "Input unsafe. Action blocked."):
        """
        Initialize the Input Guard.

        Args:
            refusal_message (str): The message returned to the host application
                when an attack is detected.
        """
        self.refusal_message = refusal_message
        self.patterns: List[str] = []
        self._load_patterns()

        # Pre-compile once at construction time so scan() does no compilation.
        # IGNORECASE gives broad matching regardless of attacker casing.
        self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.patterns]

    def _load_patterns(self) -> None:
        """
        Internal: Loads attack signatures from the bundled JSON database.

        The database is expected at ``<package>/data/patterns.json`` with a
        top-level ``"categories"`` mapping of category name -> list of
        signature strings. Category values that are not lists are ignored.

        Entries that are not strings, or that are empty/whitespace-only, are
        skipped individually: a blank signature would compile to a pattern
        that matches (nearly) every input and block all traffic.

        Fail-Safe: If the file is missing, cannot be parsed, has an unexpected
        shape, or yields no usable signatures, falls back to a minimal
        hardcoded set so that BASIC protection remains.
        """
        # Relative path resolution for self-contained distribution.
        base_dir = os.path.dirname(os.path.dirname(__file__))
        data_path = os.path.join(base_dir, "data", "patterns.json")

        try:
            with open(data_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            # Collect signatures from every category, dropping unusable ones
            # instead of letting a single bad entry discard the whole database.
            signatures: List[str] = []
            for pattern_list in data.get("categories", {}).values():
                if not isinstance(pattern_list, list):
                    continue
                signatures.extend(
                    p for p in pattern_list if isinstance(p, str) and p.strip()
                )

            if not signatures:
                # An empty database would silently disable the guard; treat it
                # like any other load failure so the fallback set is used.
                raise ValueError("no usable signatures found in pattern database")

            # Critical: the JSON entries are "Signatures" (literals), not
            # regexes. Escaping prevents a malformed entry from breaking
            # compilation or acting as an unintended wildcard.
            self.patterns.extend(re.escape(p) for p in signatures)
        except Exception as e:
            # Deliberately broad: this guard must never fail to construct.
            # The project logger may not be ready at this point, so the
            # warning goes to stderr to keep the host application's stdout clean.
            import sys

            print(f"UPIF WARNING: Pattern Logic Fallback due to: {e}", file=sys.stderr)
            self.patterns = [re.escape(s) for s in self._FALLBACK_SIGNATURES]

    def scan(self, content: Any, metadata: Optional[Dict[str, Any]] = None) -> Any:
        """
        Scans an input string for known attack signatures.

        Args:
            content (Any): Payload. If not a string, it is returned unchanged
                (pass-through).
            metadata (dict): Unused by the heuristic scan.

        Returns:
            Any: The original content when no signature matches (or when the
            content is not a string); otherwise ``self.refusal_message``.
        """
        if not isinstance(content, str):
            return content

        # Linear scan over the pre-compiled signatures; stops at the first hit.
        # (Original author's note: Aho-Corasick could replace this in v2.)
        if any(pattern.search(content) for pattern in self.compiled_patterns):
            # Attack detected.
            return self.refusal_message

        return content