Spaces:
Runtime error
Runtime error
| """Security utilities for input validation and sanitization.""" | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import logging | |
| from typing import Optional, List, Set | |
| from urllib.parse import urlparse | |
| import hashlib | |
| import secrets | |
| logger = logging.getLogger(__name__) | |
# Hostnames that may be fetched when a caller does not supply its own
# allow-list. Entries are plain hostnames, grouped by site.
ALLOWED_DOMAINS: Set[str] = {
    # CareerAddict
    "careeraddict.com",
    "www.careeraddict.com",
    # LinkedIn
    "linkedin.com",
    "www.linkedin.com",
    "api.linkedin.com",
    # GitHub
    "github.com",
    "www.github.com",
}

# URL schemes that are acceptable for external fetches.
ALLOWED_SCHEMES: Set[str] = {"https", "http"}
def sanitize_path_component(component: str) -> str:
    """
    Sanitize a single path component to prevent directory traversal attacks.

    The result is meant to be used as ONE file or directory name, never as a
    multi-segment path: every path separator is replaced with "_".

    Args:
        component: The untrusted path component to sanitize
    Returns:
        A non-empty name that contains no null bytes, no ".." sequence, no
        path separators and none of the characters <>:"|?*. The literal
        "default" is returned when nothing usable is left.
    """
    if not component:
        return "default"

    # Strip null bytes FIRST. Removing them after the ".." pass would let an
    # input such as ".\x00." collapse into ".." and survive sanitization.
    component = component.replace("\x00", "")

    # Remove directory traversal sequences. str.replace consumes a run of
    # dots pairwise, so at most one dot of any run is left behind and no
    # ".." can remain after this pass.
    component = component.replace("..", "")
    component = component.replace("./", "")

    # Neutralise path separators (both styles, regardless of platform).
    for separator in ("/", "\\", os.sep):
        component = component.replace(separator, "_")

    # Replace characters that are reserved or dangerous in file names.
    component = re.sub(r'[<>:"|?*]', "_", component)

    # Limit length to prevent filesystem issues; keep a short hash of the
    # full name so that distinct long names stay distinct after truncation.
    if len(component) > 255:
        hash_suffix = hashlib.sha256(component.encode()).hexdigest()[:8]
        component = component[:240] + "_" + hash_suffix

    # Blank names, and names made only of dots/whitespace (e.g. "."), would
    # refer to the containing directory itself rather than to a new entry.
    if not component.strip() or re.fullmatch(r"[.\s]+", component):
        return "default"
    return component
def _parse_ip_literal(hostname: str):
    """
    Return the IP address that *hostname* spells out, or None for a DNS name.

    Returns an ipaddress.IPv4Address / IPv6Address. IPv4-mapped IPv6
    addresses (::ffff:a.b.c.d) are unwrapped to the embedded IPv4 address so
    that they are classified like the address they actually reach.
    """
    # Local imports: these modules are only needed by this helper.
    import ipaddress
    import socket

    # Drop an IPv6 zone id such as "%eth0"; it does not change the address.
    candidate = hostname.split("%", 1)[0]
    try:
        address = ipaddress.ip_address(candidate)
    except ValueError:
        # ipaddress rejects legacy IPv4 spellings ("127.1", "2130706433",
        # "0x7f000001"), but inet_aton - and therefore typical resolvers -
        # still accept them, so they must be classified as well.
        try:
            address = ipaddress.IPv4Address(socket.inet_aton(candidate))
        except (OSError, ValueError):
            return None
    mapped = getattr(address, "ipv4_mapped", None)
    return mapped if mapped is not None else address


def validate_url(url: str, allowed_domains: Optional[Set[str]] = None) -> bool:
    """
    Validate a URL for safety before fetching.

    A URL is accepted only if its scheme is in ALLOWED_SCHEMES, it has a
    hostname, the hostname is neither "localhost" nor an IP literal in a
    loopback / unspecified / private / link-local / multicast / reserved
    range, and the hostname is a member of the allow-list. An empty
    allow-list disables the allow-list check.

    NOTE: hostnames are not resolved here, so a DNS name that points at an
    internal address is only stopped by the allow-list.

    Args:
        url: The URL to validate
        allowed_domains: Optional set of allowed domains (uses
            ALLOWED_DOMAINS if None)
    Returns:
        True if the URL is safe to fetch, False otherwise (including when
        validation itself fails with an exception)
    """
    if not url:
        logger.warning("Empty URL provided for validation")
        return False

    try:
        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme not in ALLOWED_SCHEMES:
            logger.warning("Invalid URL scheme: %s", parsed.scheme)
            return False

        hostname = parsed.hostname
        # A trailing dot names the same host ("localhost." == "localhost"),
        # so ignore it for the internal-address checks.
        bare_host = hostname.rstrip(".") if hostname else ""
        if not bare_host:
            logger.warning("URL has no hostname")
            return False

        # Block localhost by name (prevent SSRF)
        if bare_host == "localhost" or bare_host.endswith(".localhost"):
            logger.warning("Blocked localhost URL: %s", hostname)
            return False

        # Block internal addresses by range instead of by string prefix, so
        # that e.g. 127.0.0.2, 169.254.169.254 and [::1] are caught while
        # public addresses such as 172.32.0.1 are not rejected by accident.
        address = _parse_ip_literal(bare_host)
        if address is not None:
            if address.is_loopback or address.is_unspecified:
                logger.warning("Blocked localhost URL: %s", hostname)
                return False
            if (
                address.is_private
                or address.is_link_local
                or address.is_multicast
                or address.is_reserved
            ):
                logger.warning("Blocked private IP: %s", hostname)
                return False

        # Check against allowed domains (exact hostname match)
        domains_to_check = allowed_domains if allowed_domains is not None else ALLOWED_DOMAINS
        if domains_to_check and hostname not in domains_to_check:
            logger.warning("Domain not in allowed list: %s", hostname)
            return False

        return True
    except Exception as e:
        # Fail closed: anything that cannot be validated is not fetched.
        logger.error("Error validating URL %s: %s", url, e)
        return False
def sanitize_user_input(text: str, max_length: int = 10000) -> str:
    """
    Clean up user-supplied text before it is processed further.

    The text is first cut to ``max_length`` characters and then stripped of
    ASCII control characters. Tab, line feed and carriage return are kept.

    Args:
        text: The user input text
        max_length: Maximum number of characters kept before filtering
    Returns:
        The truncated, filtered text; "" for empty input
    """
    if not text:
        return ""

    # The character class starts at \x00, so null bytes are removed by the
    # same pass as the other control characters.
    control_chars = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')
    clipped = text[:max_length]
    return control_chars.sub('', clipped)
def generate_secure_token(length: int = 32) -> str:
    """
    Generate a cryptographically secure random token.

    Args:
        length: Number of random bytes handed to ``secrets.token_urlsafe``;
            the returned text is longer than this because it is
            Base64-encoded
    Returns:
        A URL-safe text token
    """
    token = secrets.token_urlsafe(length)
    return token
def mask_sensitive_data(text: str) -> str:
    """
    Replace secret values in a piece of text so it can be logged safely.

    Recognised forms (case-insensitive): ``api_key`` / ``api-key`` /
    ``apikey``, ``token``, ``secret`` and ``password`` followed by ``:`` or
    ``=`` and a value, plus ``Authorization: Bearer <value>``.

    Args:
        text: Text that might contain sensitive data
    Returns:
        The text with each recognised value replaced by ``***MASKED***``
    """
    # Group 1 of every rule is the label/prefix, which is kept; group 2 is
    # the secret value, which is dropped in favour of the mask.
    replacement = r'\1***MASKED***'
    rules = (
        r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(token["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(secret["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(password["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(Authorization:\s*Bearer\s+)([^\s]+)',
    )

    result = text
    # Rules are applied one after another, each on the previous rule's output.
    for rule in rules:
        result = re.compile(rule, re.IGNORECASE).sub(replacement, result)
    return result
def validate_job_id(job_id: str) -> bool:
    """
    Validate a job ID to ensure it's safe to use.

    A valid ID is a non-empty string of at most 100 characters made up only
    of ASCII letters, digits, underscores and hyphens.

    Args:
        job_id: The job ID to validate
    Returns:
        True if valid, False otherwise (including for non-string input)
    """
    # Reject non-strings up front instead of letting the regex call raise.
    if not isinstance(job_id, str) or not job_id:
        return False

    # Allow alphanumeric, underscore, and hyphen only. fullmatch is required
    # here: with re.match, a trailing "$" also matches just before a final
    # newline, so "abc\n" would be accepted.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', job_id):
        # %r escapes control characters so a crafted ID cannot forge log lines.
        logger.warning("Invalid job ID format: %r", job_id)
        return False

    # Reasonable length limit
    if len(job_id) > 100:
        logger.warning("Job ID too long: %d characters", len(job_id))
        return False

    return True