# Page header captured together with this file (not part of the module):
#   Spaces status: Runtime error
#   File size: 6,043 bytes
#   Identifier shown next to the file (presumably a revision hash): 7498f2c
"""Security utilities for input validation and sanitization."""
from __future__ import annotations
import hashlib
import ipaddress
import logging
import os
import re
import secrets
from typing import Optional, List, Set
from urllib.parse import urlparse
# Module-level logger used by the validators below to report rejected input.
logger = logging.getLogger(__name__)
# Hostnames that validate_url() accepts when the caller passes no allow-list.
# The check is an exact set-membership test on the parsed hostname, so every
# accepted subdomain (e.g. the "www." variants) has to be listed explicitly.
ALLOWED_DOMAINS: Set[str] = {
"www.careeraddict.com",
"careeraddict.com",
"linkedin.com",
"www.linkedin.com",
"api.linkedin.com",
"github.com",
"www.github.com",
}
# URL schemes that validate_url() accepts; any other scheme is rejected.
ALLOWED_SCHEMES: Set[str] = {"http", "https"}
def sanitize_path_component(component: str) -> str:
    """
    Sanitize a single path component to prevent directory traversal attacks.

    The returned value contains no path separator and no NUL byte, and is
    never a dot-only name such as "." or "..".

    Args:
        component: The untrusted path component to sanitize

    Returns:
        Sanitized path component, or "default" when nothing usable remains
    """
    if not component:
        return "default"

    # Strip NUL bytes first. Removing them after the traversal check could
    # splice two dots back together (".\x00." would become "..").
    component = component.replace("\x00", "")

    # Remove any directory traversal attempts
    component = component.replace("..", "")
    component = component.replace("./", "")

    # Replace path separators: POSIX, Windows, and the running platform's own
    for separator in ("/", "\\", os.sep):
        component = component.replace(separator, "_")

    # Replace other potentially dangerous characters
    component = re.sub(r'[<>:"|?*]', "_", component)

    # Limit length to prevent filesystem issues
    if len(component) > 255:
        # Keep a prefix and append a short hash of the full sanitized value,
        # which helps keep different long inputs from colliding.
        hash_suffix = hashlib.sha256(component.encode()).hexdigest()[:8]
        component = component[:240] + "_" + hash_suffix

    # Fall back when nothing meaningful is left: empty, whitespace-only, or
    # made up solely of dots ("." would refer to the current directory).
    if not component.strip().strip("."):
        component = "default"
    return component
def validate_url(url: str, allowed_domains: Optional[Set[str]] = None) -> bool:
    """
    Validate a URL for safety before fetching.

    A URL is accepted only when its scheme is in ALLOWED_SCHEMES, it has a
    hostname, the hostname is not localhost or an internal IP literal, and
    the hostname is in the allow-list. Hostnames are not resolved here, so a
    DNS name that points at an internal address is only stopped by the
    allow-list.

    Args:
        url: The URL to validate
        allowed_domains: Optional set of allowed hostnames (ALLOWED_DOMAINS is
            used if None; an empty set disables the allow-list check)

    Returns:
        True if the URL is safe to fetch, False otherwise
    """
    if not url:
        logger.warning("Empty URL provided for validation")
        return False

    try:
        parsed = urlparse(url)

        # Check scheme
        if parsed.scheme not in ALLOWED_SCHEMES:
            logger.warning(f"Invalid URL scheme: {parsed.scheme}")
            return False

        # Check for localhost/private IPs (prevent SSRF)
        hostname = parsed.hostname
        if not hostname:
            logger.warning("URL has no hostname")
            return False

        # Parse IP literals properly instead of matching string prefixes, so
        # whole ranges (127.0.0.0/8, 169.254.0.0/16, IPv6 ::1, fc00::/7, ...)
        # are covered and DNS names like "10.example.com" are not mistaken
        # for private addresses.
        try:
            address = ipaddress.ip_address(hostname)
        except ValueError:
            address = None  # hostname is a DNS name, not an IP literal

        # Block localhost and loopback/unspecified addresses
        if hostname == "localhost" or (
            address is not None
            and (address.is_loopback or address.is_unspecified)
        ):
            logger.warning(f"Blocked localhost URL: {hostname}")
            return False

        # Block private and otherwise non-public IP ranges
        if address is not None and (
            address.is_private
            or address.is_link_local
            or address.is_multicast
            or address.is_reserved
        ):
            logger.warning(f"Blocked private IP: {hostname}")
            return False

        # Check against allowed domains if specified
        domains_to_check = allowed_domains if allowed_domains is not None else ALLOWED_DOMAINS
        if domains_to_check and hostname not in domains_to_check:
            logger.warning(f"Domain not in allowed list: {hostname}")
            return False

        return True
    except Exception as e:
        # Deliberately broad: a validator must fail closed, so any parsing
        # error is logged and treated as "not safe".
        logger.error(f"Error validating URL {url}: {e}")
        return False
def sanitize_user_input(text: str, max_length: int = 10000) -> str:
    """
    Clean free-form user text by clipping it and dropping control characters.

    The text is first cut to max_length characters; control characters are
    then deleted from the clipped text, so the result may be shorter than
    max_length.

    Args:
        text: The user input text
        max_length: Maximum number of characters kept before cleaning

    Returns:
        Sanitized text ("" when the input is empty)
    """
    if not text:
        return ""

    # Code points to delete: NUL and the other C0 controls plus DEL. Tab
    # (0x09), line feed (0x0A) and carriage return (0x0D) are kept.
    dropped_code_points = dict.fromkeys(
        [*range(0x00, 0x09), 0x0B, 0x0C, *range(0x0E, 0x20), 0x7F]
    )

    clipped = text[:max_length]
    return clipped.translate(dropped_code_points)
def generate_secure_token(length: int = 32) -> str:
    """
    Generate a cryptographically secure random token.

    Args:
        length: Size argument forwarded to secrets.token_urlsafe

    Returns:
        The URL-safe text token produced by secrets.token_urlsafe
    """
    token = secrets.token_urlsafe(length)
    return token
def mask_sensitive_data(text: str) -> str:
    """
    Redact credential-looking values so the text is safer to write to logs.

    Each pattern captures a label (api key, token, secret, password, or an
    Authorization bearer header) and the value that follows it; the label is
    kept and the value is replaced with ***MASKED***. Matching ignores case.

    Args:
        text: Text that might contain sensitive data

    Returns:
        Text with sensitive data masked
    """
    # Keep the captured label (group 1) and replace only the value.
    masked_value = r'\1***MASKED***'

    credential_patterns = (
        r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(token["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(secret["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(password["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(Authorization:\s*Bearer\s+)([^\s]+)',
    )

    result = text
    for expression in credential_patterns:
        matcher = re.compile(expression, re.IGNORECASE)
        result = matcher.sub(masked_value, result)
    return result
def validate_job_id(job_id: str) -> bool:
    """
    Validate a job ID to ensure it's safe to use.

    A valid ID is 1 to 100 characters long and consists only of ASCII
    letters, digits, underscores, and hyphens.

    Args:
        job_id: The job ID to validate

    Returns:
        True if valid, False otherwise
    """
    if not job_id:
        return False

    # Reasonable length limit. Checked first so an oversized value is never
    # echoed into the log by the format check below.
    if len(job_id) > 100:
        logger.warning(f"Job ID too long: {len(job_id)} characters")
        return False

    # Allow alphanumeric, underscore, and hyphen only. fullmatch is required:
    # with re.match, a trailing `$` also matches just before a final newline,
    # so "abc\n" would be accepted.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', job_id):
        # !r escapes control characters so a rejected ID cannot forge log lines
        logger.warning(f"Invalid job ID format: {job_id!r}")
        return False

    return True