File size: 3,785 Bytes
5e56bcf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
upif.modules.input_protection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The First Line of Defense.
Implements heuristic analysis using massive regex pattern matching
to detect SQL Injection, XSS, Jailbreaks, and Prompt Manipulations.

:copyright: (c) 2025 Yash Dhone.
:license: Proprietary, see LICENSE for details.
"""

import re
import json
import os
from typing import Any, List, Optional, Dict
from upif.core.interfaces import SecurityModule

class InputGuard(SecurityModule):
    """
    Heuristic Input Guard.
    
    Capabilities:
    - Regex matching against 250+ known attack vectors.
    - JSON-based pattern loading for easy updates.
    - Configurable refusal messages (Internationalization ready).
    """

    def __init__(self, refusal_message: str = "Input unsafe. Action blocked."):
        """
        Initialize the Input Guard.

        Args:
            refusal_message (str): The message returned to the host application
                                   when an attack is detected.
        """
        self.refusal_message = refusal_message
        self.patterns: List[str] = []
        self._load_patterns()
        
        # Pre-compile regexes for performance (compilation happens once at startup)
        # Using IGNORECASE for broad matching
        self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.patterns]

    def _load_patterns(self) -> None:
        """
        Internal: Loads attack signatures from the bundled JSON database.
        
        Fail-Safe: If JSON allows parsing errors or is missing, falls back to
                   a minimal hardcoded set to ensure BASIC protection remains.
        """
        # Relative path resolution for self-contained distribution
        base_dir = os.path.dirname(os.path.dirname(__file__))
        data_path = os.path.join(base_dir, "data", "patterns.json")
        
        try:
            with open(data_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            
            # Extract patterns from all categories
            raw_patterns = []
            for category, pattern_list in data.get("categories", {}).items():
                if isinstance(pattern_list, list):
                    raw_patterns.extend(pattern_list)
                    
            # Critical: Escape special regex characters in the strings
            # We treat the JSON entries as "Signatures" (Literals), not "Regexes"
            # This prevents a malformed user string in JSON from crashing the engine.
            self.patterns.extend([re.escape(p) for p in raw_patterns])
            
        except Exception as e:
            # Silent Fail-Safe (Logged via Coordinator if this instantiates, 
            # but ideally we print here since Logger might not be ready)
            # In production, we assume standard library logging or print to stderr
            print(f"UPIF WARNING: Pattern Logic Fallback due to: {e}")
            self.patterns = [re.escape("ignore previous instructions"), re.escape("system override")]

    def scan(self, content: Any, metadata: Optional[Dict[str, Any]] = None) -> Any:
        """
        Scans input string for known attack patterns.

        Args:
            content (Any): Payload. If not string, it is ignored (Pass-through).
            metadata (dict): Unused in Heuristic scan.

        Returns:
            str: Original content or self.refusal_message.
        """
        if not isinstance(content, str):
            return content
            
        # Linear Scan (Optimization: Could use Aho-Corasick for O(n) in v2)
        for pattern in self.compiled_patterns:
            if pattern.search(content):
                # Attack Detected
                return self.refusal_message
                
        return content