File size: 6,043 Bytes
7498f2c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""Security utilities for input validation and sanitization."""
from __future__ import annotations

import hashlib
import ipaddress
import logging
import os
import re
import secrets
from typing import Optional, List, Set
from urllib.parse import urlparse

# Module-level logger used by the validation helpers below.
logger = logging.getLogger(__name__)

# Hostnames that external URL fetching may target. Entries are compared as
# exact strings, so the bare and "www." forms are listed separately.
ALLOWED_DOMAINS: Set[str] = {
    # CareerAddict
    "careeraddict.com",
    "www.careeraddict.com",
    # LinkedIn
    "linkedin.com",
    "www.linkedin.com",
    "api.linkedin.com",
    # GitHub
    "github.com",
    "www.github.com",
}

# URL schemes accepted for external fetches.
ALLOWED_SCHEMES: Set[str] = {"https", "http"}


def sanitize_path_component(component: str) -> str:
    """
    Sanitize a single path component to prevent directory traversal attacks.

    The result never contains a path separator, a NUL byte, or a ".."
    sequence, and is never the bare "." entry, so it can be joined onto a
    base directory without escaping it.

    Args:
        component: The path component to sanitize

    Returns:
        Sanitized path component, or "default" when nothing usable remains
    """
    if not component:
        return "default"

    # Strip NUL bytes FIRST. Doing it after the ".." removal would let an
    # input such as ".\x00." collapse back into ".." once the NUL is dropped.
    component = component.replace("\x00", "")

    # Remove directory traversal sequences. A single pass is sufficient:
    # removing non-overlapping ".." leaves at most one "." per run of dots,
    # and removing "./" afterwards cannot bring two dots together.
    component = component.replace("..", "")
    component = component.replace("./", "")

    # Replace both kinds of path separator and other characters that are
    # unsafe in file names on common filesystems.
    component = re.sub(r'[/\\<>:"|?*]', "_", component)

    # Limit length to prevent filesystem issues; a short hash of the full
    # value keeps distinct long inputs from colliding after truncation.
    if len(component) > 255:
        hash_suffix = hashlib.sha256(component.encode()).hexdigest()[:8]
        component = component[:240] + "_" + hash_suffix

    # Empty / whitespace-only results are unusable, and "." (or "..") would
    # resolve to the base directory itself (or its parent) when joined.
    if not component.strip() or component in (".", ".."):
        return "default"

    return component


def validate_url(url: str, allowed_domains: Optional[Set[str]] = None) -> bool:
    """
    Validate a URL for safety before fetching.

    A URL is accepted only if its scheme is in ALLOWED_SCHEMES, it has a
    hostname, the hostname is not localhost or a loopback / private /
    link-local / reserved / multicast / unspecified IP literal (IPv4 or
    IPv6), and the hostname is in the allow-list.

    NOTE: the check is purely syntactic. No DNS resolution is performed, so
    a DNS name that resolves to an internal address is not detected here.

    Args:
        url: The URL to validate
        allowed_domains: Optional set of allowed hostnames, compared by exact
            match against the lower-cased hostname (uses ALLOWED_DOMAINS if
            None; an empty set disables the allow-list check)

    Returns:
        True if the URL is safe to fetch, False otherwise
    """
    if not url:
        logger.warning("Empty URL provided for validation")
        return False
    if not isinstance(url, str):
        logger.warning("URL must be a string, got %s", type(url).__name__)
        return False

    # urlparse raises ValueError for malformed input such as an unbalanced
    # IPv6 bracket; nothing else in this function is expected to raise.
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError as e:
        logger.error("Error validating URL %s: %s", url, e)
        return False

    # Check scheme
    if parsed.scheme not in ALLOWED_SCHEMES:
        logger.warning("Invalid URL scheme: %s", parsed.scheme)
        return False

    if not hostname:
        logger.warning("URL has no hostname")
        return False

    # A fully-qualified name may end in "." ("localhost."); ignore it for the
    # SSRF checks so that form cannot be used to dodge them.
    bare_host = hostname.rstrip(".")

    # Block localhost names (prevent SSRF)
    if bare_host == "localhost" or bare_host.endswith(".localhost"):
        logger.warning("Blocked localhost URL: %s", hostname)
        return False

    # Classify IP literals with the ipaddress module instead of string
    # prefixes: this covers all of 127.0.0.0/8, ::1, 169.254.0.0/16 and only
    # the private part of 172.0.0.0/8, and leaves DNS names that merely start
    # with digits (e.g. "10.example.com") alone.
    try:
        address = ipaddress.ip_address(bare_host)
    except ValueError:
        address = None  # not an IP literal, i.e. a DNS name

    if address is not None:
        if address.is_loopback or address.is_unspecified:
            logger.warning("Blocked localhost URL: %s", hostname)
            return False
        if (
            address.is_private
            or address.is_link_local
            or address.is_reserved
            or address.is_multicast
        ):
            logger.warning("Blocked private IP: %s", hostname)
            return False

    # Check against allowed domains if specified
    domains_to_check = allowed_domains if allowed_domains is not None else ALLOWED_DOMAINS
    if domains_to_check and hostname not in domains_to_check:
        logger.warning("Domain not in allowed list: %s", hostname)
        return False

    return True


def sanitize_user_input(text: str, max_length: int = 10000) -> str:
    """
    Truncate user-supplied text and strip control characters from it.

    Truncation happens before stripping, so the result can be shorter than
    max_length. Tab, line feed and carriage return are preserved.

    Args:
        text: The user input text
        max_length: Maximum number of characters kept from the start of text

    Returns:
        Sanitized text ("" for empty input)
    """
    if text:
        # Cut to size first, then drop NUL bytes.
        clipped = text[:max_length].replace("\x00", "")
        # Drop the remaining C0 control characters and DEL.
        return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', clipped)
    return ""


def generate_secure_token(length: int = 32) -> str:
    """
    Generate a cryptographically secure random token.

    Args:
        length: Number of random bytes to draw (forwarded to
            secrets.token_urlsafe); the returned text is longer than this
            because the bytes are Base64-encoded

    Returns:
        URL-safe text token
    """
    token = secrets.token_urlsafe(nbytes=length)
    return token


# Compiled once at import time. Each pattern captures the key plus its
# separator / opening quote as group 1 and the secret value as group 2; only
# group 1 is kept in the output.
_MASK_RULES = tuple(
    (re.compile(pattern, re.IGNORECASE), r'\1***MASKED***')
    for pattern in (
        r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(token["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(secret["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        r'(password["\']?\s*[:=]\s*["\']?)([^"\'\s]+)',
        # Authorization headers, both as a raw header line and as a quoted
        # dict/JSON entry ('Authorization': 'Bearer ...'), for the Bearer and
        # Basic schemes.
        r'(Authorization["\']?\s*[:=]\s*["\']?(?:Bearer|Basic)\s+)([^"\'\s]+)',
    )
)


def mask_sensitive_data(text: str) -> str:
    """
    Mask sensitive data like API keys in logs.

    Values following an api key / token / secret / password key, and the
    credentials of Bearer or Basic Authorization headers, are replaced with
    "***MASKED***". Matching is case-insensitive. A value is taken to end at
    the first whitespace or quote character.

    Args:
        text: Text that might contain sensitive data

    Returns:
        Text with sensitive data masked ("" for empty or None input)
    """
    # A logging helper should not raise on a missing message.
    if not text:
        return ""

    masked_text = text
    for pattern, replacement in _MASK_RULES:
        masked_text = pattern.sub(replacement, masked_text)

    return masked_text


def validate_job_id(job_id: str) -> bool:
    """
    Validate a job ID to ensure it's safe to use.

    A valid job ID is a non-empty string of at most 100 characters made up
    only of ASCII letters, digits, underscores and hyphens.

    Args:
        job_id: The job ID to validate

    Returns:
        True if valid, False otherwise
    """
    if not job_id or not isinstance(job_id, str):
        return False

    # Reasonable length limit. Checked first so that an oversized value is
    # never written to the log in full.
    if len(job_id) > 100:
        logger.warning("Job ID too long: %d characters", len(job_id))
        return False

    # Allow alphanumeric, underscore, and hyphen only. fullmatch is required:
    # with re.match, a trailing "$" also matches just before a final newline,
    # so "abc\n" would be accepted.
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', job_id):
        # %r escapes control characters so a hostile ID cannot forge log lines.
        logger.warning("Invalid job ID format: %r", job_id)
        return False

    return True