"""Security utilities for input validation and sanitization."""

from __future__ import annotations

import hashlib
import ipaddress
import logging
import os
import re
import secrets
from typing import List, Optional, Set, Union
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

# Allowed domains for external URL fetching
ALLOWED_DOMAINS: Set[str] = {
    "www.careeraddict.com",
    "careeraddict.com",
    "linkedin.com",
    "www.linkedin.com",
    "api.linkedin.com",
    "github.com",
    "www.github.com",
}

# Allowed URL schemes
ALLOWED_SCHEMES: Set[str] = {"http", "https"}

# Components longer than this are truncated and given a hash suffix.
_MAX_COMPONENT_LENGTH = 255

# Characters replaced with "_" in path components.
_RESERVED_PATH_CHARS_RE = re.compile(r'[<>:"|?*]')

# Control characters stripped from user input (tab, LF and CR are kept).
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')

# Job IDs: alphanumeric, underscore and hyphen only (used with fullmatch).
_JOB_ID_RE = re.compile(r'[a-zA-Z0-9_-]+')

# (pattern, replacement) pairs used by mask_sensitive_data; compiled once.
_SENSITIVE_PATTERNS = [
    (re.compile(pattern, re.IGNORECASE), replacement)
    for pattern, replacement in (
        (r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([^"\'\s]+)', r'\1***MASKED***'),
        (r'(token["\']?\s*[:=]\s*["\']?)([^"\'\s]+)', r'\1***MASKED***'),
        (r'(secret["\']?\s*[:=]\s*["\']?)([^"\'\s]+)', r'\1***MASKED***'),
        (r'(password["\']?\s*[:=]\s*["\']?)([^"\'\s]+)', r'\1***MASKED***'),
        (r'(Authorization:\s*Bearer\s+)([^\s]+)', r'\1***MASKED***'),
    )
]


def sanitize_path_component(component: str) -> str:
    """
    Sanitize a path component to prevent directory traversal attacks.

    Args:
        component: The path component to sanitize

    Returns:
        Sanitized path component, or "default" when nothing usable remains
    """
    if not component:
        return "default"

    # Strip null bytes FIRST: removing them later would let an input such as
    # ".\x00." collapse into ".." after the traversal check has already run.
    component = component.replace("\x00", "")

    # Remove directory traversal sequences.
    component = component.replace("..", "").replace("./", "")

    # Neutralise path separators (os.sep covers any platform-specific one).
    for separator in ("/", "\\", os.sep):
        component = component.replace(separator, "_")

    # Replace other potentially dangerous characters.
    component = _RESERVED_PATH_CHARS_RE.sub("_", component)

    # Limit length to prevent filesystem issues; the hash suffix keeps
    # distinct long inputs from colliding after truncation.
    if len(component) > _MAX_COMPONENT_LENGTH:
        hash_suffix = hashlib.sha256(component.encode()).hexdigest()[:8]
        component = component[:240] + "_" + hash_suffix

    # An empty/blank result is unusable, and a lone "." (left over from an
    # odd run of dots) would name the current directory.
    if component.strip() in ("", "."):
        return "default"

    return component


def _parse_ip_literal(
    host: str,
) -> Optional[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]:
    """Return the IP address *host* spells out, or None if it is not an IP literal."""
    # Drop an IPv6 zone identifier ("fe80::1%eth0") before parsing.
    candidate = host.split("%", 1)[0]
    try:
        address = ipaddress.ip_address(candidate)
    except ValueError:
        return None
    # Classify IPv4-mapped IPv6 addresses ("::ffff:127.0.0.1") as their IPv4 form.
    mapped = getattr(address, "ipv4_mapped", None)
    return mapped if mapped is not None else address


def validate_url(url: str, allowed_domains: Optional[Set[str]] = None) -> bool:
    """
    Validate a URL for safety before fetching.

    Only the URL text is inspected; the hostname is not resolved, so this
    does not protect against a public name that resolves to a private address.

    Args:
        url: The URL to validate
        allowed_domains: Optional set of allowed domains (uses default if
            None; an empty set disables the domain allowlist check)

    Returns:
        True if the URL is safe to fetch, False otherwise
    """
    if not url:
        logger.warning("Empty URL provided for validation")
        return False

    try:
        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme not in ALLOWED_SCHEMES:
            logger.warning("Invalid URL scheme: %s", parsed.scheme)
            return False

        hostname = parsed.hostname
        if not hostname:
            logger.warning("URL has no hostname")
            return False

        # SSRF checks run on the name without a trailing root dot so that
        # "localhost." is treated like "localhost".
        bare_host = hostname.rstrip(".")
        address = _parse_ip_literal(bare_host)

        # Block localhost names and loopback/unspecified addresses.
        if (
            bare_host == "localhost"
            or bare_host.endswith(".localhost")
            or (address is not None and (address.is_loopback or address.is_unspecified))
        ):
            logger.warning("Blocked localhost URL: %s", hostname)
            return False

        # Block every other non-public address class. Real address parsing
        # (instead of string prefixes) catches link-local/metadata and IPv6
        # addresses, and avoids rejecting public hosts such as
        # "10.example.com".
        if address is not None and (
            address.is_private
            or address.is_link_local
            or address.is_multicast
            or address.is_reserved
        ):
            logger.warning("Blocked private IP: %s", hostname)
            return False

        # Check against allowed domains if specified (exact hostname match).
        domains_to_check = allowed_domains if allowed_domains is not None else ALLOWED_DOMAINS
        if domains_to_check and hostname not in domains_to_check:
            logger.warning("Domain not in allowed list: %s", hostname)
            return False

        return True

    except Exception as e:
        # Deliberately broad: a validator must fail closed on any parse error.
        logger.error("Error validating URL %s: %s", url, e)
        return False


def sanitize_user_input(text: str, max_length: int = 10000) -> str:
    """
    Sanitize user text input to prevent injection attacks.

    The text is truncated first and control characters are removed
    afterwards, so the result may be shorter than ``max_length``.

    Args:
        text: The user input text
        max_length: Maximum allowed length

    Returns:
        Sanitized text
    """
    if not text:
        return ""

    # Truncate to max length
    text = text[:max_length]

    # Remove null bytes and other control characters, keeping tab, newline
    # and carriage return.
    return _CONTROL_CHARS_RE.sub('', text)


def generate_secure_token(length: int = 32) -> str:
    """
    Generate a cryptographically secure random token.

    Args:
        length: Number of random bytes; the URL-safe text returned is longer
            than this (roughly 1.3 characters per byte)

    Returns:
        URL-safe random token string
    """
    return secrets.token_urlsafe(length)


def mask_sensitive_data(text: str) -> str:
    """
    Mask sensitive data like API keys in logs.

    Values following ``api_key``, ``token``, ``secret``, ``password`` (with
    ``:`` or ``=``) and ``Authorization: Bearer`` are replaced, matching
    case-insensitively. A value ends at the first quote or whitespace.

    Args:
        text: Text that might contain sensitive data

    Returns:
        Text with sensitive data masked
    """
    masked_text = text
    for pattern, replacement in _SENSITIVE_PATTERNS:
        masked_text = pattern.sub(replacement, masked_text)
    return masked_text


def validate_job_id(job_id: str) -> bool:
    """
    Validate a job ID to ensure it's safe to use.

    Args:
        job_id: The job ID to validate

    Returns:
        True if valid, False otherwise
    """
    if not job_id:
        return False

    # Allow alphanumeric, underscore, and hyphen only. fullmatch is required:
    # with match() and "$" an ID ending in "\n" would be accepted.
    if not _JOB_ID_RE.fullmatch(job_id):
        # %r keeps control characters in a rejected ID from forging log lines.
        logger.warning("Invalid job ID format: %r", job_id)
        return False

    # Reasonable length limit
    if len(job_id) > 100:
        logger.warning("Job ID too long: %d characters", len(job_id))
        return False

    return True