File size: 10,829 Bytes
4ff9d22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27816c1
4ff9d22
 
 
cbba89e
4ff9d22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eaa4855
 
 
4ff9d22
eaa4855
 
 
 
 
 
 
 
 
 
4ff9d22
eaa4855
 
 
4ff9d22
 
27816c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ff9d22
 
 
 
 
 
 
 
 
 
 
 
3c38f93
 
4ff9d22
 
3c38f93
2eaa429
 
4ff9d22
 
 
 
3c38f93
cbba89e
4ff9d22
 
3c38f93
27816c1
 
4ff9d22
3c38f93
4ff9d22
3c38f93
 
 
cbba89e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c38f93
cbba89e
4ff9d22
3c38f93
4ff9d22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27816c1
4ff9d22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cbba89e
4ff9d22
 
 
cbba89e
 
 
 
 
4ff9d22
27816c1
4ff9d22
 
27816c1
4ff9d22
 
 
cbba89e
 
 
 
 
 
 
 
 
27816c1
cbba89e
 
 
 
 
 
 
 
 
4ff9d22
27816c1
 
 
 
 
 
 
 
 
 
 
 
4ff9d22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cbba89e
 
 
 
 
 
 
 
 
 
4ff9d22
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
Media Handler Module

This module determines what type of file a URL points to (image, video, GIF, Instagram link, etc.)
and prepares it for processing.

For Non-Technical Developers:
- Figures out what kind of media the user gave us (image vs video vs Instagram link)
- Downloads the media from the internet or extracts it from Instagram
- Validates that it's something we can actually process
- Returns the media in a format our face-swapping AI can use
"""

import os
import io
import requests
import cv2
import numpy as np
from PIL import Image
from urllib.parse import urlparse
from src.config import (
    DEFAULT_HEADERS, INSTAGRAM_HEADERS, DOWNLOAD_TIMEOUT, SUPPORTED_FORMATS,
    SUPPORTED_IMAGE_FORMATS, SUPPORTED_VIDEO_FORMATS, SUPPORTED_GIF_FORMATS,
    ERROR_MESSAGES
)
from src.logger import debug_log

# ==================== HELPER FUNCTIONS ====================

def get_file_extension_from_url(url: str) -> str:
    """
    Return the lower-cased file extension found in a URL's path.

    Only the path component is inspected, so query strings, fragments and
    the host name never contribute to the result.

    Example: "https://example.com/image.jpg" -> ".jpg"

    Args:
        url: The web address we're downloading from

    Returns:
        The extension including its leading dot (like .jpg, .mp4, .gif),
        or an empty string when the path has none
    """
    lowered_path = urlparse(url).path.lower()

    # splitext already yields '' when the last path segment has no extension.
    _, extension = os.path.splitext(lowered_path)
    return extension


def is_instagram_url(url: str) -> bool:
    """
    Check if a URL points to an Instagram page/post/reel URL.

    This should only match Instagram page domains, not CDN or media delivery hosts.
    The comparison is an exact match against the URL's host name, so an
    explicit port ("www.instagram.com:443") or userinfo prefix does not
    prevent a match, and look-alike hosts such as
    "instagram.com.example.net" are rejected.

    Args:
        url: The web address to inspect

    Returns:
        True if the URL's host is one of the known Instagram page domains
    """
    instagram_page_domains = {
        'instagram.com',
        'www.instagram.com',
        'm.instagram.com',
        'l.instagram.com',
        'instagr.am',
        'www.instagr.am',
        'ig.me',
        'www.ig.me',
    }

    # `hostname` is already lower-cased and excludes any port / credentials;
    # it is None when the URL has no network location (e.g. a bare string).
    hostname = urlparse(url).hostname or ''

    return hostname in instagram_page_domains


def is_instagram_cdn_url(url: str) -> bool:
    """
    Detect Instagram CDN/media delivery hosts.

    This helps when the user supplies a direct Instagram CDN URL such as
    scontent-iad6-1.cdninstagram.com or cdninstagram.com.

    A URL matches when its host name:
    - is "cdninstagram.com" / "instagram.com" or a sub-domain of either, or
    - contains the marker "scontent" (loose heuristic kept from the
      original implementation).

    Only the host name is inspected (never credentials, port or path), and
    the two domains are matched as real domain suffixes, so look-alikes such
    as "instagram.com.evil.example" or "notinstagram.com" are not treated
    as Instagram hosts.

    Args:
        url: The web address to inspect

    Returns:
        True if the host looks like an Instagram media/CDN host
    """
    # `hostname` is lower-cased and excludes userinfo/port; None -> ''.
    hostname = urlparse(url).hostname or ''

    if 'scontent' in hostname:
        return True

    instagram_domains = ('cdninstagram.com', 'instagram.com')
    return any(
        hostname == domain or hostname.endswith('.' + domain)
        for domain in instagram_domains
    )


def detect_media_type(url: str) -> str:
    """
    Determine what type of media a URL points to.

    This is like asking: "Is this a photo, a video, a GIF, or an Instagram link?"

    The decision is made in three steps:
    1. Instagram page URLs are reported as 'instagram' immediately.
    2. Otherwise the file extension in the URL path is checked.
    3. If the extension is not recognised, a HEAD request is sent and the
       response's Content-Type header is used as a fallback. This probe is
       best-effort: any failure leaves the result as 'unknown'.

    Args:
        url: The web address or Instagram link

    Returns:
        One of: 'instagram', 'gif', 'webp', 'video', 'image', or 'unknown'
    """
    from src.config import DEBUG_MODE

    # Check if it's Instagram first
    if is_instagram_url(url):
        if DEBUG_MODE:
            debug_log("[detect_media_type] Detected Instagram URL")
        return 'instagram'

    # Get file extension
    ext = get_file_extension_from_url(url)

    if DEBUG_MODE:
        debug_log(f"[detect_media_type] URL extension: {ext}")

    # Categorize based on extension. '.webp' is tested before the generic
    # video/image lists so it always gets its own category.
    if ext in SUPPORTED_GIF_FORMATS:
        media_type = 'gif'
    elif ext == '.webp':
        media_type = 'webp'
    elif ext in SUPPORTED_VIDEO_FORMATS:
        media_type = 'video'
    elif ext in SUPPORTED_IMAGE_FORMATS:
        media_type = 'image'
    else:
        media_type = 'unknown'

    # If the extension is unknown, attempt to infer from the remote content type.
    # (Instagram URLs have already returned above, so no extra check is needed.)
    if media_type == 'unknown':
        try:
            response = requests.head(
                url,
                headers=DEFAULT_HEADERS,
                timeout=DOWNLOAD_TIMEOUT,
                allow_redirects=True
            )
            content_type = response.headers.get('Content-Type', '').lower()
            if 'video' in content_type:
                media_type = 'video'
            elif 'gif' in content_type:
                media_type = 'gif'
            elif 'image/webp' in content_type:
                media_type = 'webp'
            elif 'image' in content_type:
                media_type = 'image'
        except Exception as exc:
            # Deliberately best-effort: a failed probe must not abort
            # detection, but leave a trace instead of swallowing it silently.
            if DEBUG_MODE:
                debug_log(f"[detect_media_type] Content-Type probe failed: {exc}")

    if DEBUG_MODE:
        debug_log(f"[detect_media_type] Detected type: {media_type}")

    return media_type


# ==================== MEDIA DOWNLOADING ====================

def download_media_from_url(url: str) -> bytes:
    """
    Download a file from the internet.

    This function handles the boring stuff like headers, timeouts and
    redirects. It's like a smart downloader that pretends to be a browser.
    A single GET request is made (no retries); INSTAGRAM_HEADERS are sent
    when the URL looks like an Instagram CDN host, DEFAULT_HEADERS otherwise.

    Args:
        url: The web address to download from

    Returns:
        The file contents as binary data

    Raises:
        ValueError: If the request times out, the server answers with an
            HTTP error status, or the request fails for any other reason.
            The underlying requests exception is attached as __cause__.
    """
    request_headers = INSTAGRAM_HEADERS if is_instagram_cdn_url(url) else DEFAULT_HEADERS

    try:
        # Make the internet request with browser headers
        response = requests.get(
            url,
            headers=request_headers,
            timeout=DOWNLOAD_TIMEOUT,
            allow_redirects=True,
            stream=False  # Download the entire file at once
        )
        response.raise_for_status()  # Raise an error if server returned an error

        return response.content

    except requests.exceptions.Timeout as exc:
        raise ValueError("Download timed out. The server took too long to respond.") from exc
    except requests.exceptions.HTTPError as exc:
        raise ValueError(f"Server returned error: {exc.response.status_code}") from exc
    except requests.exceptions.RequestException as exc:
        # Catch-all for connection errors, invalid URLs, too many redirects, ...
        raise ValueError(f"Download failed: {ERROR_MESSAGES['download_failed']}") from exc


def load_image_from_bytes(image_bytes: bytes) -> np.ndarray:
    """
    Decode raw image file bytes into an OpenCV-style BGR image array.

    The bytes are viewed as a uint8 buffer and handed to cv2.imdecode with
    IMREAD_COLOR; OpenCV works out the actual file format on its own.

    Args:
        image_bytes: The raw image file data

    Returns:
        The decoded image as a BGR numpy array

    Raises:
        ValueError: If decoding fails or produces an empty image
    """
    try:
        raw_buffer = np.frombuffer(image_bytes, np.uint8)
        decoded = cv2.imdecode(raw_buffer, cv2.IMREAD_COLOR)

        # imdecode signals failure by returning None rather than raising.
        if decoded is not None and decoded.size != 0:
            return decoded

        raise ValueError("Image data is empty or invalid")

    except Exception as err:
        # Every failure (including the ValueError above) is reported uniformly.
        raise ValueError(f"Failed to load image: {str(err)}")


# ==================== VALIDATION ====================

def _responds_ok_to_get(url: str, headers: dict) -> bool:
    """Return True if a streamed GET to *url* answers 200; never raises on request failure."""
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=DOWNLOAD_TIMEOUT,
            allow_redirects=True,
            stream=True  # Only the status line/headers are needed, not the body
        )
    except Exception:
        return False

    try:
        return response.status_code == 200
    finally:
        # A streamed response keeps its connection checked out until it is
        # consumed or closed; close it explicitly so probes do not leak.
        response.close()


def validate_url_accessibility(url: str) -> bool:
    """
    Check if we can actually reach the URL before processing.

    This is like knocking on the door before trying to download - saves time if the door is locked!

    Instagram page URLs are always reported as accessible without any
    request. For everything else a HEAD request is tried first; if it raises,
    or the server answers 400/403/405/429, a streamed GET is tried as a
    fallback. Only a final status of 200 counts as accessible.

    Args:
        url: The URL to check

    Returns:
        True if URL is accessible, False otherwise
    """
    if is_instagram_url(url):
        # Instagram may block HEAD requests, so skip direct accessibility checks.
        # Extraction will determine if the link is valid.
        return True

    headers = INSTAGRAM_HEADERS if is_instagram_cdn_url(url) else DEFAULT_HEADERS

    try:
        response = requests.head(
            url,
            headers=headers,
            timeout=DOWNLOAD_TIMEOUT,
            allow_redirects=True
        )
    except Exception:
        # Some hosts (including CDN links) reject HEAD requests but still allow GET.
        return _responds_ok_to_get(url, headers)

    if response.status_code == 200:
        return True

    # Some servers reject HEAD requests even though GET works.
    if response.status_code in {400, 403, 405, 429}:
        return _responds_ok_to_get(url, headers)

    return False


def validate_media_format(url: str) -> dict:
    """
    Decide whether a media URL is something we know how to process.

    Instagram page URLs are accepted as-is (they are handled by a separate
    extraction path). Every other URL is classified with detect_media_type
    and rejected only when that classification is 'unknown'.

    Args:
        url: The media URL to validate

    Returns:
        Dictionary with keys:
        - 'valid': True/False - can we process this?
        - 'media_type': What kind of media is it?
        - 'error': Error message if not valid, otherwise None
    """
    # Instagram short-circuits so no detection work is done for it.
    kind = 'instagram' if is_instagram_url(url) else detect_media_type(url)
    supported = kind != 'unknown'

    return {
        'valid': supported,
        'media_type': kind,
        'error': None if supported else ERROR_MESSAGES['unsupported_format'],
    }


# ==================== PUBLIC API ====================

def get_media_handler(url: str) -> dict:
    """
    Smart function that figures out what to do with a URL.

    This is your main entry point - give it a URL and it tells you what it is
    and what to do with it.

    The URL's format is validated first, then its reachability is checked
    exactly once (the reachability check itself skips network access for
    Instagram page URLs).

    Args:
        url: The media URL provided by the user

    Returns:
        Dictionary with:
        - 'accessible': Can we reach this URL? (always True on return)
        - 'media_type': What kind of media?
        - 'validation': Full validation results

    Raises:
        ValueError: If the format is unsupported or the URL is unreachable
    """
    # First, validate the format. Instagram URLs always validate as
    # {'valid': True, 'media_type': 'instagram'}, so no special case is needed.
    validation = validate_media_format(url)

    if not validation['valid']:
        raise ValueError(validation['error'])

    # Check if URL is actually accessible (one probe only - each call may
    # issue network requests, so it must not be repeated).
    if not validate_url_accessibility(url):
        raise ValueError(ERROR_MESSAGES['download_failed'])

    return {
        'accessible': True,
        'media_type': validation['media_type'],
        'validation': validation
    }