|
|
|
|
|
import re |
|
|
import html |
|
|
from typing import Optional |
|
|
from backend.config.logging_config import get_logger |
|
|
|
|
|
|
|
|
# Module-level logger used by DataSanitizer.sanitize_input for debug tracing;
# "sanitization" is the name handed to the project's get_logger helper.
logger = get_logger("sanitization")
|
|
|
|
|
|
|
|
|
|
|
class DataSanitizer:
    """Simple data sanitization utility for recipe chatbot inputs.

    Every method is a classmethod; the class only groups the length limits,
    the harmful-content patterns and the individual sanitization steps.
    """

    # Inclusive bounds on the length of the raw (unstripped) input.
    MAX_MESSAGE_LENGTH = 1000
    MIN_MESSAGE_LENGTH = 1

    # Regexes whose matches are deleted from the input. They are applied with
    # re.IGNORECASE | re.DOTALL by _remove_harmful_content.
    HARMFUL_PATTERNS = [
        r'<script[^>]*>.*?</script>',
        r'javascript:',
        r'on\w+\s*=',
    ]

    @classmethod
    def sanitize_input(cls, text: str) -> str:
        """
        Sanitize user input for recipe chatbot

        Args:
            text: Raw user input

        Returns:
            Sanitized text: harmful patterns removed, HTML special
            characters escaped, whitespace collapsed and trimmed

        Raises:
            ValueError: If input is empty, violates the length limits, or
                is empty once sanitized
        """
        if not text:
            raise ValueError("Input cannot be empty")

        logger.debug(f"🧼 Sanitizing input: '{text[:50]}...'")

        # Length is checked on the raw text, before anything is stripped.
        cls._validate_length(text)

        sanitized = cls._apply_filters(text)

        # _apply_filters returns trimmed text, so a plain truthiness test
        # is enough to detect "nothing left".
        if not sanitized:
            raise ValueError("Input cannot be empty after sanitization")

        logger.debug("✅ Input sanitized successfully")
        return sanitized

    @classmethod
    def _apply_filters(cls, text: str) -> str:
        """Run the pure text-transformation steps in their required order.

        Harmful content must be removed BEFORE HTML escaping: once ``<`` and
        ``>`` have become ``&lt;`` / ``&gt;`` the ``<script>`` pattern can no
        longer match. Escaping afterwards neutralizes any markup the
        patterns did not cover.
        """
        without_harmful = cls._remove_harmful_content(text.strip())
        escaped = html.escape(without_harmful)
        return cls._normalize_whitespace(escaped)

    @classmethod
    def _validate_length(cls, text: str) -> None:
        """Validate input length

        Raises:
            ValueError: If ``len(text)`` is below MIN_MESSAGE_LENGTH or
                above MAX_MESSAGE_LENGTH
        """
        if len(text) < cls.MIN_MESSAGE_LENGTH:
            raise ValueError(f"Input too short (minimum {cls.MIN_MESSAGE_LENGTH} character)")

        if len(text) > cls.MAX_MESSAGE_LENGTH:
            raise ValueError(f"Input too long (maximum {cls.MAX_MESSAGE_LENGTH} characters)")

    @classmethod
    def _remove_harmful_content(cls, text: str) -> str:
        """Remove basic harmful content

        Deletes every match of HARMFUL_PATTERNS and repeats until the text
        stops changing, so a payload that only appears after a first
        deletion (e.g. ``javajavascript:script:``) is removed as well.
        """
        previous = None
        # Each effective substitution shortens the text, so this terminates.
        while previous != text:
            previous = text
            for pattern in cls.HARMFUL_PATTERNS:
                text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
        return text

    @classmethod
    def _normalize_whitespace(cls, text: str) -> str:
        """Normalize whitespace in text

        Collapses every run of whitespace to a single space and trims the
        ends.
        """
        return re.sub(r'\s+', ' ', text).strip()
|
|
|
|
|
|
|
|
def sanitize_user_input(text: str) -> str:
    """Sanitize any user input (chat messages, demo prompts, etc.)

    Module-level convenience wrapper that delegates to
    ``DataSanitizer.sanitize_input``.

    Args:
        text: Raw user input

    Returns:
        The value returned by ``DataSanitizer.sanitize_input`` for ``text``

    Raises:
        ValueError: Propagated from ``DataSanitizer.sanitize_input`` when
            the input fails validation
    """
    return DataSanitizer.sanitize_input(text)
|
|
|
|
|
def clean(s: Optional[str]) -> Optional[str]:
    """Collapse whitespace and cut off a trailing "click here ..." tail.

    Args:
        s: Text to tidy; may be ``None``.

    Returns:
        The text with every whitespace run collapsed to a single space,
        everything from the first whole-word "click here" (matched
        case-insensitively) onwards removed, and surrounding whitespace
        stripped. ``None`` when ``s`` is ``None``/empty or nothing is left
        after cleaning.
    """
    if not s:
        return None

    collapsed = re.sub(r"\s+", " ", s)
    # Newlines were collapsed to spaces above, so ``.*`` runs to the end of
    # the text even without re.DOTALL.
    trimmed = re.sub(r"\bclick here\b.*", "", collapsed, flags=re.I)
    # Strip last: cutting the tail exposes the space that preceded
    # "click here", which an earlier strip would leave behind.
    return trimmed.strip() or None
|
|
|