Noo88ear's picture
🚀 Initial deployment of Multi-Agent Job Application Assistant
7498f2c
"""Security utilities for input validation and sanitization."""
from __future__ import annotations

import hashlib
import ipaddress
import logging
import os
import re
import secrets
from typing import Optional, List, Set
from urllib.parse import urlparse
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)

# Hostnames that external fetches may target. Matching elsewhere in this
# module is an exact string comparison, so each "www." / "api." variant
# has to be listed explicitly.
ALLOWED_DOMAINS: Set[str] = {
    # CareerAddict
    "careeraddict.com",
    "www.careeraddict.com",
    # LinkedIn
    "linkedin.com",
    "www.linkedin.com",
    "api.linkedin.com",
    # GitHub
    "github.com",
    "www.github.com",
}

# URL schemes accepted for external fetches.
ALLOWED_SCHEMES: Set[str] = {"https", "http"}
def sanitize_path_component(component: str) -> str:
    """
    Sanitize a path component to prevent directory traversal attacks.

    The result never contains a path separator, a NUL byte or a ".."
    sequence, and is never the bare current-directory name ".".

    Args:
        component: The path component to sanitize

    Returns:
        Sanitized path component, or "default" when nothing usable is left
    """
    if not component:
        return "default"

    # Strip NUL bytes FIRST. If they were removed after the traversal
    # filter, an input such as ".\x00." would pass the filter and then
    # collapse into ".." once the NUL disappears.
    component = component.replace("\x00", "")

    # Remove directory traversal attempts. Dropping ".." also covers "../",
    # so only the "./" form needs a separate pass.
    component = component.replace("..", "")
    component = component.replace("./", "")

    # Neutralize path separators (both styles, plus the platform's own).
    component = component.replace("/", "_")
    component = component.replace("\\", "_")
    component = component.replace(os.sep, "_")

    # Replace other characters that are unsafe in file names.
    component = re.sub(r'[<>:"|?*]', "_", component)

    # Limit length to prevent filesystem issues; the hash suffix keeps two
    # long names that share a 240-character prefix from colliding.
    if len(component) > 255:
        hash_suffix = hashlib.sha256(component.encode()).hexdigest()[:8]
        component = component[:240] + "_" + hash_suffix

    # An empty/blank result is unusable, and a lone "." would resolve to
    # the parent directory itself when joined onto a base path.
    if component.strip() in ("", "."):
        component = "default"

    return component
# Hosts made only of decimal/hex numbers and dots (e.g. "2130706433",
# "0x7f000001", "127.1"). They are not valid DNS names, but some HTTP
# clients interpret them as IPv4 addresses.
_NUMERIC_HOST_RE = re.compile(
    r"(?:0x[0-9a-f]+|\d+)(?:\.(?:0x[0-9a-f]+|\d+))*",
    re.IGNORECASE,
)


def _is_blocked_ip_host(host: str) -> bool:
    """Return True if host is an IP literal (or numeric look-alike) that must not be fetched."""
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not a well-formed IP literal. Reject legacy numeric spellings
        # outright, since they may still resolve to an internal address.
        return _NUMERIC_HOST_RE.fullmatch(host) is not None

    # Judge IPv4-mapped IPv6 (e.g. ::ffff:127.0.0.1) by the embedded IPv4.
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped

    return any(
        (
            address.is_private,
            address.is_loopback,
            address.is_link_local,
            address.is_multicast,
            address.is_reserved,
            address.is_unspecified,
            not address.is_global,
        )
    )


def validate_url(url: str, allowed_domains: Optional[Set[str]] = None) -> bool:
    """
    Validate a URL for safety before fetching.

    A URL is accepted only if its scheme is in ALLOWED_SCHEMES, it has a
    hostname, the hostname is not localhost or a non-public IP address
    (SSRF protection), and the hostname is in the allow-list. IP literals
    are classified with the ``ipaddress`` module, so loopback, link-local,
    private, reserved and multicast ranges are rejected for both IPv4 and
    IPv6, while public addresses are no longer rejected by prefix alone.

    This check looks only at the URL text; it does not resolve DNS names.

    Args:
        url: The URL to validate
        allowed_domains: Optional set of allowed domains (uses
            ALLOWED_DOMAINS if None). An empty set disables the
            allow-list check.

    Returns:
        True if the URL is safe to fetch, False otherwise
    """
    if not url:
        logger.warning("Empty URL provided for validation")
        return False

    try:
        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme not in ALLOWED_SCHEMES:
            logger.warning(f"Invalid URL scheme: {parsed.scheme}")
            return False

        hostname = parsed.hostname
        if not hostname:
            logger.warning("URL has no hostname")
            return False

        # Compare without a trailing root dot so "localhost." is caught too.
        bare_host = hostname.rstrip(".")

        # Block localhost names and the classic loopback/unspecified literals
        if (
            bare_host in ("localhost", "127.0.0.1", "0.0.0.0")
            or bare_host.endswith(".localhost")
        ):
            logger.warning(f"Blocked localhost URL: {hostname}")
            return False

        # Block every other non-public IP literal (prevent SSRF)
        if _is_blocked_ip_host(bare_host):
            logger.warning(f"Blocked private IP: {hostname}")
            return False

        # Check against allowed domains if specified
        domains_to_check = allowed_domains if allowed_domains is not None else ALLOWED_DOMAINS
        if domains_to_check and hostname not in domains_to_check:
            logger.warning(f"Domain not in allowed list: {hostname}")
            return False

        return True

    except Exception as e:
        # Fail closed: any parsing problem means the URL is not fetched.
        logger.error(f"Error validating URL {url}: {e}")
        return False
def sanitize_user_input(text: str, max_length: int = 10000) -> str:
    """
    Clean up free-form user text before it is used elsewhere.

    The input is cut to ``max_length`` characters first; NUL bytes and
    ASCII control characters are then dropped. Tab, line feed and
    carriage return are kept.

    Args:
        text: The user input text
        max_length: Maximum allowed length

    Returns:
        Sanitized text ("" for empty or missing input)
    """
    if text:
        clipped = text[:max_length]
        without_nul = clipped.replace("\x00", "")
        # Control characters other than \t (0x09), \n (0x0A) and \r (0x0D).
        return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', without_nul)
    return ""
def generate_secure_token(length: int = 32) -> str:
    """
    Create a cryptographically secure, URL-safe random token.

    Args:
        length: Number of random bytes to draw. The returned text is the
            URL-safe Base64 encoding of those bytes, so it is longer than
            ``length`` characters.

    Returns:
        The random token as a string
    """
    token = secrets.token_urlsafe(length)
    return token
# key/value secrets such as api_key=..., "token": "...", password: '...'.
# Group 1 is the key with its separator; group 2 is the value, tried in
# this order: a complete double-quoted string, a complete single-quoted
# string, then a bare run (optionally preceded by a stray opening quote).
_SENSITIVE_KEY_VALUE_RE = re.compile(
    r"""((?:api[_-]?key|token|secret|password)["']?\s*[:=]\s*)"""
    r"""("[^"\n]+"|'[^'\n]+'|["']?[^"'\s]+)""",
    re.IGNORECASE,
)

# HTTP bearer credentials, in raw header form or as a quoted dict/JSON entry.
_BEARER_HEADER_RE = re.compile(
    r"""(Authorization["']?\s*:\s*["']?Bearer\s+)([^\s"']+|\S+)""",
    re.IGNORECASE,
)


def _mask_key_value(match: "re.Match[str]") -> str:
    """Rebuild a key/value match with the value masked, keeping its quotes."""
    prefix, value = match.group(1), match.group(2)
    quote = value[0] if value[0] in "\"'" else ""
    # Only re-emit a closing quote when the matched value actually had one.
    closing = quote if quote and len(value) > 1 and value.endswith(quote) else ""
    return f"{prefix}{quote}***MASKED***{closing}"


def mask_sensitive_data(text: str) -> str:
    """
    Mask sensitive data like API keys in logs.

    Values following ``api_key``/``api-key``/``apikey``, ``token``,
    ``secret`` or ``password`` (case-insensitive, ``:`` or ``=`` separated)
    and ``Authorization: Bearer`` credentials are replaced by
    ``***MASKED***``. A quoted value is masked in full, even when it
    contains whitespace.

    Args:
        text: Text that might contain sensitive data

    Returns:
        Text with sensitive data masked ("" for empty or missing input)
    """
    if not text:
        return ""

    masked_text = _SENSITIVE_KEY_VALUE_RE.sub(_mask_key_value, text)
    return _BEARER_HEADER_RE.sub(r"\1***MASKED***", masked_text)
def validate_job_id(job_id: str) -> bool:
    """
    Validate a job ID to ensure it's safe to use.

    A valid ID is a non-empty string of at most 100 characters consisting
    only of ASCII letters, digits, underscores and hyphens.

    Args:
        job_id: The job ID to validate

    Returns:
        True if valid, False otherwise
    """
    if not job_id or not isinstance(job_id, str):
        return False

    # Reasonable length limit. Checked first so an oversized value is
    # never written to the log in full.
    if len(job_id) > 100:
        logger.warning(f"Job ID too long: {len(job_id)} characters")
        return False

    # Allow alphanumeric, underscore, and hyphen only. fullmatch is used
    # because "$" in re.match also matches just before a trailing newline,
    # which would let "abc\n" through.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', job_id):
        # %r escapes control characters so a hostile ID cannot forge log lines.
        logger.warning("Invalid job ID format: %r", job_id)
        return False

    return True