Spaces:
Running
Running
| """ | |
| Media Processor Module | |
| This module handles extracting frames from different media formats: | |
| - GIFs: Animated images | |
| - MP4, WebM, AVI, etc.: Video files | |
| - WEBP: Modern image format that can be animated | |
| - PNG, JPG: Static images | |
| For Non-Technical Developers: | |
| - Converts different video/image formats into individual frames we can process | |
| - Keeps track of timing information so we can rebuild videos with correct speed | |
| - Handles memory efficiently so we don't run out of RAM with large files | |
| - Returns frames in BGR format (what our face-swapping AI expects) | |
| """ | |
| import base64 | |
| import cv2 | |
| import io | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import numpy as np | |
| from PIL import Image, ImageSequence | |
| import imageio.v2 as imageio | |
| from functools import lru_cache | |
| from typing import List, Optional, Tuple | |
| import requests | |
| from urllib.parse import urlparse | |
| from src.config import ( | |
| SUPPORTED_IMAGE_FORMATS, SUPPORTED_VIDEO_FORMATS, SUPPORTED_GIF_FORMATS, | |
| FRAME_CACHE_SIZE, DEFAULT_HEADERS, DOWNLOAD_TIMEOUT, DEFAULT_GIF_DURATION, | |
| GIF_QUALITY, DEBUG_MODE, VIDEO_CODEC, INSTAGRAM_HEADERS | |
| ) | |
| from src.media_handler import download_media_from_url, load_image_from_bytes, is_instagram_cdn_url | |
| from src.logger import debug_log, log_start, log_success, log_error | |
| # ==================== GIF PROCESSING ==================== | |
| def _extract_frames_from_gif_cached(url: str, max_frames: int = None) -> Tuple[tuple, tuple]: | |
| """ | |
| Download a GIF from URL and extract all frames. | |
| This is cached (remembered) so we don't download the same GIF twice. | |
| Args: | |
| url: URL pointing to the GIF file | |
| max_frames: Maximum number of frames to extract (None = all frames) | |
| Returns: | |
| Tuple of (frames, durations) where: | |
| - frames: Tuple of numpy BGR arrays (the actual frame data) | |
| - durations: Tuple of integers (how long each frame displays in ms) | |
| """ | |
| try: | |
| # Download the GIF | |
| gif_bytes = download_media_from_url(url) | |
| # Open it as a PIL Image (PIL handles GIF formats) | |
| gif_image = Image.open(io.BytesIO(gif_bytes)) | |
| frames = [] # Will hold all frames | |
| durations = [] # Will hold timing for each frame | |
| # Extract each frame from the GIF | |
| while True: | |
| try: | |
| # Convert current frame to numpy BGR format | |
| frame_rgb = np.array(gif_image.convert('RGB')) | |
| frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR) | |
| frames.append(frame_bgr) | |
| # Get duration of this frame (how long to display it) | |
| # Default to 67ms (roughly 15 FPS) if not specified | |
| duration = gif_image.info.get('duration', DEFAULT_GIF_DURATION) | |
| durations.append(duration) | |
| # Check if we've hit the frame limit | |
| if max_frames and len(frames) >= max_frames: | |
| break | |
| # Move to next frame | |
| gif_image.seek(gif_image.tell() + 1) | |
| except EOFError: | |
| # Reached end of GIF (normal, not an error) | |
| break | |
| if DEBUG_MODE: | |
| debug_log(f"✓ Extracted {len(frames)} frames from GIF") | |
| return tuple(frames), tuple(durations) | |
| except Exception as e: | |
| raise ValueError(f"Failed to process GIF: {str(e)}") | |
| def extract_frames_from_gif(url: str, max_frames: int = None) -> Tuple[List[np.ndarray], List[int]]: | |
| """ | |
| Extract frames from a GIF (public wrapper around cached function). | |
| Args: | |
| url: URL of the GIF file | |
| max_frames: Maximum frames to extract | |
| Returns: | |
| Tuple of (frames_list, durations_list) | |
| """ | |
| frames, durations = _extract_frames_from_gif_cached(url, max_frames) | |
| return list(frames), list(durations) | |
| # ==================== VIDEO PROCESSING ==================== | |
| def _open_video_file(url: str) -> Tuple[cv2.VideoCapture, dict]: | |
| """ | |
| Open a video file from URL and get its properties. | |
| Args: | |
| url: URL or local path to video file | |
| Returns: | |
| Tuple of (video_capture_object, video_info_dict, temp_file_path) | |
| Raises: | |
| ValueError: If video can't be opened | |
| """ | |
| import tempfile | |
| temp_file_path = None | |
| try: | |
| if DEBUG_MODE: | |
| debug_log(f"Opening video: {url}") | |
| # For URLs, we need to download to a temporary file first | |
| # cv2.VideoCapture doesn't work well with BytesIO objects | |
| if url.startswith('http'): | |
| if DEBUG_MODE: | |
| debug_log(f"Downloading video from URL...") | |
| from src.media_handler import is_instagram_cdn_url | |
| from src.config import INSTAGRAM_HEADERS | |
| headers = INSTAGRAM_HEADERS if is_instagram_cdn_url(url) else DEFAULT_HEADERS | |
| response = requests.get( | |
| url, | |
| headers=headers, | |
| timeout=DOWNLOAD_TIMEOUT, | |
| allow_redirects=True, | |
| stream=True | |
| ) | |
| response.raise_for_status() | |
| # Create a temporary file with proper extension | |
| _, ext = os.path.splitext(url.split('?')[0]) # Remove query params | |
| if not ext or ext not in {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv'}: | |
| ext = '.mp4' # Default to mp4 | |
| temp_file = tempfile.NamedTemporaryFile(suffix=ext, delete=False) | |
| temp_file_path = temp_file.name | |
| for chunk in response.iter_content(chunk_size=1024 * 1024): | |
| if chunk: | |
| temp_file.write(chunk) | |
| temp_file.close() | |
| if DEBUG_MODE: | |
| debug_log(f"Saved video to temp file: {temp_file_path}") | |
| video_file = cv2.VideoCapture(temp_file_path) | |
| else: | |
| # Local file path | |
| video_file = cv2.VideoCapture(url) | |
| if not video_file.isOpened(): | |
| raise ValueError("Cannot open video file - may be corrupted or unsupported format") | |
| # Extract video properties | |
| fps = video_file.get(cv2.CAP_PROP_FPS) | |
| frame_count = int(video_file.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(video_file.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(video_file.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| if DEBUG_MODE: | |
| debug_log(f"Video properties: {frame_count} frames, {fps}fps, {width}x{height}") | |
| if frame_count == 0 or width == 0 or height == 0: | |
| raise ValueError(f"Invalid video properties: frames={frame_count}, fps={fps}, resolution={width}x{height}") | |
| # Calculate frame durations based on FPS | |
| frame_duration = int(1000 / fps) if fps > 0 else DEFAULT_GIF_DURATION | |
| info = { | |
| 'fps': fps, | |
| 'frame_count': frame_count, | |
| 'width': width, | |
| 'height': height, | |
| 'frame_duration': frame_duration, | |
| } | |
| return video_file, info, temp_file_path | |
| except Exception as e: | |
| if DEBUG_MODE: | |
| import traceback | |
| log_error("Video open error", detail=str(e), exc=e) | |
| print(traceback.format_exc()) | |
| # Clean up temp file if it was created | |
| if temp_file_path and os.path.exists(temp_file_path): | |
| try: | |
| os.unlink(temp_file_path) | |
| except: | |
| pass | |
| raise ValueError(f"Failed to open video: {str(e)}") | |
| def _ensure_ffmpeg_installed() -> Optional[str]: | |
| """Return the path to ffmpeg if installed, otherwise None.""" | |
| return shutil.which('ffmpeg') | |
| def download_video_to_temp_file(url: str) -> str: | |
| """ | |
| Download a video URL to a temporary local file. | |
| Args: | |
| url: Video URL | |
| Returns: | |
| Local path to downloaded video file | |
| """ | |
| headers = INSTAGRAM_HEADERS if is_instagram_cdn_url(url) else DEFAULT_HEADERS | |
| response = requests.get( | |
| url, | |
| headers=headers, | |
| timeout=DOWNLOAD_TIMEOUT, | |
| allow_redirects=True, | |
| stream=True | |
| ) | |
| response.raise_for_status() | |
| parsed = urlparse(url) | |
| ext = os.path.splitext(parsed.path)[1].lower() | |
| if not ext or ext not in {'.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv'}: | |
| ext = '.mp4' | |
| temp_file = tempfile.NamedTemporaryFile(suffix=ext, delete=False) | |
| temp_file_path = temp_file.name | |
| for chunk in response.iter_content(chunk_size=1024 * 1024): | |
| if chunk: | |
| temp_file.write(chunk) | |
| temp_file.close() | |
| if DEBUG_MODE: | |
| debug_log(f"Downloaded video to temp file: {temp_file_path}") | |
| return temp_file_path | |
| def _extract_audio_from_video_file(video_path: str, audio_path: str) -> Optional[str]: | |
| """ | |
| Extract the audio track from a local video file using ffmpeg. | |
| Args: | |
| video_path: Local video file path | |
| audio_path: Desired output audio path | |
| Returns: | |
| Path to extracted audio file if successful, otherwise None | |
| """ | |
| ffmpeg = _ensure_ffmpeg_installed() | |
| if ffmpeg is None: | |
| debug_log("FFmpeg is not installed; audio extraction unavailable") | |
| return None | |
| cmd = [ffmpeg, '-y', '-i', video_path, '-vn', '-acodec', 'copy', audio_path] | |
| proc = subprocess.run(cmd, capture_output=True, text=True) | |
| if proc.returncode == 0 and os.path.exists(audio_path) and os.path.getsize(audio_path) > 0: | |
| return audio_path | |
| if os.path.exists(audio_path): | |
| os.unlink(audio_path) | |
| cmd = [ffmpeg, '-y', '-i', video_path, '-vn', '-acodec', 'aac', '-b:a', '128k', audio_path] | |
| proc = subprocess.run(cmd, capture_output=True, text=True) | |
| if proc.returncode == 0 and os.path.exists(audio_path) and os.path.getsize(audio_path) > 0: | |
| return audio_path | |
| if DEBUG_MODE: | |
| log_error("Audio extraction failed", detail=f"ffmpeg stderr: {proc.stderr}") | |
| if os.path.exists(audio_path): | |
| os.unlink(audio_path) | |
| return None | |
| def _merge_audio_into_video(video_path: str, audio_path: str, output_path: str) -> str: | |
| """ | |
| Merge an audio track into an existing MP4 video using ffmpeg. | |
| Args: | |
| video_path: Path to the video file | |
| audio_path: Path to the extracted audio file | |
| output_path: Path to write the merged output file | |
| Returns: | |
| Path to the final merged video file | |
| """ | |
| ffmpeg = _ensure_ffmpeg_installed() | |
| if ffmpeg is None: | |
| debug_log("FFmpeg is not installed; cannot merge audio") | |
| return video_path | |
| cmd = [ | |
| ffmpeg, '-y', | |
| '-i', video_path, | |
| '-i', audio_path, | |
| '-c:v', 'copy', | |
| '-c:a', 'aac', | |
| '-map', '0:v:0', | |
| '-map', '1:a:0', | |
| '-shortest', | |
| output_path | |
| ] | |
| proc = subprocess.run(cmd, capture_output=True, text=True) | |
| if proc.returncode != 0: | |
| if DEBUG_MODE: | |
| log_error("Audio merge failed", detail=f"ffmpeg stderr: {proc.stderr}") | |
| return video_path | |
| return output_path | |
| def encode_frames_to_mp4(frames: List[np.ndarray], fps: float, output_path: str, audio_path: str = None) -> str: | |
| """ | |
| Encode a series of RGB frames to an MP4 video file and optionally merge audio. | |
| Args: | |
| frames: RGB frames | |
| fps: Frames per second for output video | |
| output_path: Local path to write the MP4 file | |
| audio_path: Optional local path to an audio file to merge | |
| Returns: | |
| Path to final MP4 file | |
| """ | |
| if not frames: | |
| raise ValueError("No frames to encode to MP4") | |
| height, width = frames[0].shape[:2] | |
| fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODEC) | |
| writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) | |
| if not writer.isOpened(): | |
| raise ValueError("Failed to open video writer for MP4 output") | |
| for frame in frames: | |
| if frame is None or frame.size == 0: | |
| continue | |
| if frame.ndim == 3 and frame.shape[2] == 3: | |
| frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) | |
| else: | |
| raise ValueError("Unexpected frame format for MP4 encoding") | |
| writer.write(frame_bgr) | |
| writer.release() | |
| if audio_path: | |
| merged_path = f"{output_path}.with_audio.mp4" | |
| merged_path = _merge_audio_into_video(output_path, audio_path, merged_path) | |
| if merged_path != output_path and os.path.exists(merged_path): | |
| os.replace(merged_path, output_path) | |
| return output_path | |
| def _extract_audio_from_video_url(url: str) -> Optional[str]: | |
| temp_video_path = None | |
| temp_audio_path = None | |
| try: | |
| temp_video_path = download_video_to_temp_file(url) | |
| temp_audio_path = tempfile.NamedTemporaryFile(suffix='.m4a', delete=False).name | |
| return _extract_audio_from_video_file(temp_video_path, temp_audio_path) | |
| finally: | |
| if temp_video_path and os.path.exists(temp_video_path): | |
| try: | |
| os.unlink(temp_video_path) | |
| except Exception: | |
| pass | |
| def encode_frames_to_mp4_base64(frames: List[np.ndarray], fps: float, audio_url: str = None) -> str: | |
| """ | |
| Encode frames to MP4 and return the file as a base64 string. | |
| Args: | |
| frames: RGB frames | |
| fps: Frames per second | |
| audio_url: Optional URL to a source video with audio to merge | |
| Returns: | |
| Base64-encoded MP4 content | |
| """ | |
| temp_dir = tempfile.mkdtemp(prefix='faceswap_mp4_') | |
| video_path = os.path.join(temp_dir, 'output.mp4') | |
| audio_path = None | |
| try: | |
| if audio_url: | |
| audio_path = _extract_audio_from_video_url(audio_url) | |
| encode_frames_to_mp4(frames, fps, video_path, audio_path=audio_path) | |
| with open(video_path, 'rb') as f: | |
| return base64.b64encode(f.read()).decode() | |
| finally: | |
| if audio_path and os.path.exists(audio_path): | |
| try: | |
| os.unlink(audio_path) | |
| except Exception: | |
| pass | |
| if os.path.exists(video_path): | |
| try: | |
| os.unlink(video_path) | |
| except Exception: | |
| pass | |
| if os.path.isdir(temp_dir): | |
| try: | |
| os.rmdir(temp_dir) | |
| except Exception: | |
| pass | |
| def extract_frames_from_video(url: str, max_frames: int = None, start_time: float = None, end_time: float = None) -> Tuple[List[np.ndarray], List[int]]: | |
| """ | |
| Extract frames from a video file (MP4, AVI, etc.). | |
| Args: | |
| url: URL or path to video file | |
| max_frames: Maximum frames to extract (None = all) | |
| start_time: Optional start time in seconds to trim the video | |
| end_time: Optional end time in seconds to trim the video | |
| Returns: | |
| Tuple of (frames_list, durations_list) where each duration is in milliseconds | |
| """ | |
| video_file = None | |
| temp_file_path = None | |
| try: | |
| video_file, info, temp_file_path = _open_video_file(url) | |
| fps = info.get('fps', 30.0) | |
| frame_count = int(info.get('frame_count', 0)) | |
| if frame_count <= 0: | |
| raise ValueError("Invalid video frame count") | |
| start_frame = 0 | |
| end_frame = frame_count | |
| if start_time is not None or end_time is not None: | |
| duration_seconds = frame_count / fps if fps > 0 else frame_count / 30.0 | |
| if start_time is None: | |
| start_time = 0.0 | |
| if end_time is None: | |
| end_time = duration_seconds | |
| if end_time > duration_seconds: | |
| end_time = duration_seconds | |
| start_frame = max(0, min(frame_count - 1, int(start_time * fps))) | |
| end_frame = max(start_frame + 1, min(frame_count, int(end_time * fps))) | |
| if start_frame >= frame_count: | |
| raise ValueError("start_time is beyond the end of the video") | |
| if end_frame <= start_frame: | |
| raise ValueError("end_time must be greater than start_time") | |
| if DEBUG_MODE: | |
| debug_log(f"Video trim window: start_time={start_time}s, end_time={end_time}s, start_frame={start_frame}, end_frame={end_frame}") | |
| video_file.set(cv2.CAP_PROP_POS_FRAMES, start_frame) | |
| frames = [] | |
| current_frame_index = start_frame | |
| while current_frame_index < end_frame: | |
| ret, frame_bgr = video_file.read() | |
| if not ret: # End of video or unable to read | |
| break | |
| if frame_bgr is None or frame_bgr.size == 0: | |
| if DEBUG_MODE: | |
| debug_log(f"Skipping empty frame at index {current_frame_index}") | |
| current_frame_index += 1 | |
| continue | |
| frames.append(frame_bgr) | |
| current_frame_index += 1 | |
| if max_frames and len(frames) >= max_frames: | |
| if DEBUG_MODE: | |
| debug_log(f"Stopped at max_frames={max_frames}") | |
| break | |
| if not frames: | |
| raise ValueError("No valid frames could be extracted from video") | |
| if DEBUG_MODE: | |
| debug_log(f"✓ Extracted {len(frames)} frames from video (duration per frame: {info['frame_duration']}ms)") | |
| # Create durations list (all frames have same duration) | |
| durations = [info['frame_duration']] * len(frames) | |
| return frames, durations | |
| except Exception as e: | |
| if DEBUG_MODE: | |
| import traceback | |
| log_error("Video extraction error", detail=str(e), exc=e) | |
| print(traceback.format_exc()) | |
| raise | |
| finally: | |
| # Clean up | |
| if video_file: | |
| video_file.release() | |
| # Remove temporary file if created | |
| if temp_file_path and os.path.exists(temp_file_path): | |
| try: | |
| os.unlink(temp_file_path) | |
| if DEBUG_MODE: | |
| debug_log(f"Cleaned up temp file: {temp_file_path}") | |
| except Exception as e: | |
| if DEBUG_MODE: | |
| log_error("Failed to clean up temp file", detail=str(e), exc=e) | |
| # ==================== WEBP PROCESSING ==================== | |
| def extract_frames_from_webp(url: str, max_frames: int = None) -> Tuple[List[np.ndarray], List[int]]: | |
| """ | |
| Extract frames from a WEBP file (can be static or animated). | |
| WEBP is a modern web image format. Animated WEBP works like GIFs. | |
| Args: | |
| url: URL to WEBP file | |
| max_frames: Maximum frames to extract | |
| Returns: | |
| Tuple of (frames_list, durations_list) | |
| """ | |
| try: | |
| log_start("Extracting WEBP frames") | |
| webp_bytes = download_media_from_url(url) | |
| webp_image = Image.open(io.BytesIO(webp_bytes)) | |
| # Best-effort detection for animated WEBP: PIL flags or raw header check | |
| is_animated_pil = getattr(webp_image, 'is_animated', False) or getattr(webp_image, 'n_frames', 1) > 1 | |
| is_animated_chunk = (b'ANIM' in webp_bytes[:65536]) or (b'ANMF' in webp_bytes[:65536]) | |
| is_animated = is_animated_pil or is_animated_chunk | |
| debug_log(f"WEBP info: format={webp_image.format}, n_frames={getattr(webp_image, 'n_frames', 1)}, is_animated_pil={is_animated_pil}, is_animated_chunk={is_animated_chunk}") | |
| frames = [] | |
| durations = [] | |
| def _append_frame(frame, duration=None): | |
| frame_rgb = frame if isinstance(frame, np.ndarray) else np.array(Image.fromarray(frame).convert('RGB')) | |
| frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR) | |
| frames.append(frame_bgr) | |
| durations.append(duration if duration is not None else webp_image.info.get('duration', DEFAULT_GIF_DURATION)) | |
| # Try PIL animated WEBP extraction first. | |
| if is_animated_pil: | |
| try: | |
| for frame in ImageSequence.Iterator(webp_image): | |
| _append_frame(frame) | |
| if max_frames and len(frames) >= max_frames: | |
| break | |
| except Exception as e: | |
| debug_log(f"PIL animated WEBP extraction failed: {e}") | |
| # If PIL did not produce multiple frames, use imageio as a fallback. | |
| if len(frames) <= 1 and (is_animated_chunk or is_animated_pil): | |
| try: | |
| debug_log("Attempting animated WEBP fallback with imageio") | |
| animated_frames = imageio.mimread(io.BytesIO(webp_bytes), format='webp') | |
| if animated_frames: | |
| frames = [] | |
| durations = [] | |
| for frame in animated_frames: | |
| _append_frame(frame) | |
| if max_frames and len(frames) >= max_frames: | |
| break | |
| except Exception as e: | |
| debug_log(f"imageio animated WEBP fallback failed: {e}") | |
| if not frames: | |
| if DEBUG_MODE: | |
| debug_log("Processing static WEBP") | |
| frame_rgb = np.array(webp_image.convert('RGB')) | |
| frame_bgr = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR) | |
| frames.append(frame_bgr) | |
| durations.append(DEFAULT_GIF_DURATION) | |
| if len(frames) == 1 and (is_animated_pil or is_animated_chunk): | |
| debug_log("WEBP looks animated but only one frame could be extracted") | |
| log_success("Extracted WEBP frames", detail=f"{len(frames)} frames") | |
| return frames, durations | |
| except Exception as e: | |
| log_error("WEBP extraction error", detail=str(e), exc=e) | |
| raise ValueError(f"Failed to process WEBP: {str(e)}") | |
| # ==================== STATIC IMAGE PROCESSING ==================== | |
| def extract_frames_from_image(url: str, max_frames: int = None) -> Tuple[List[np.ndarray], List[int]]: | |
| """ | |
| Extract frames from a static image (PNG, JPG, etc.). | |
| Even though it's just one image, we return it as a single-frame "video" | |
| so the processing pipeline is consistent. | |
| Args: | |
| url: URL to image file | |
| max_frames: Maximum frames (ignored for static images) | |
| Returns: | |
| Tuple of ([single_frame], [duration]) | |
| """ | |
| try: | |
| image_bytes = download_media_from_url(url) | |
| bgr_image = load_image_from_bytes(image_bytes) | |
| if DEBUG_MODE: | |
| debug_log(f"✓ Loaded static image") | |
| # Return as a single-frame sequence | |
| return [bgr_image], [DEFAULT_GIF_DURATION] | |
| except Exception as e: | |
| raise ValueError(f"Failed to process image: {str(e)}") | |
| # ==================== UNIFIED FRAME EXTRACTION ==================== | |
| def extract_frames(url: str, media_type: str = None, max_frames: int = None, start_time: float = None, end_time: float = None) -> Tuple[List[np.ndarray], List[int]]: | |
| """ | |
| Smart function that extracts frames from ANY supported media format. | |
| This is your main entry point. Give it a URL and it figures out what type | |
| of file it is and extracts frames appropriately. | |
| Args: | |
| url: URL to video/image/GIF/WEBP file | |
| media_type: Optional hint - 'gif', 'video', 'webp', 'image' | |
| If not provided, we'll try to detect it | |
| max_frames: Maximum frames to extract (None = all) | |
| start_time: Optional start time in seconds for video trimming | |
| end_time: Optional end time in seconds for video trimming | |
| Returns: | |
| Tuple of (frames_list, durations_list) | |
| Raises: | |
| ValueError: If extraction fails | |
| """ | |
| # If media type not provided, try to detect from URL | |
| if media_type is None: | |
| # Check file extension from the path only (ignore query strings) | |
| parsed_url = urlparse(url.lower()) | |
| path = parsed_url.path | |
| if any(path.endswith(ext) for ext in SUPPORTED_GIF_FORMATS): | |
| media_type = 'gif' | |
| elif any(path.endswith(ext) for ext in SUPPORTED_VIDEO_FORMATS): | |
| media_type = 'video' | |
| elif path.endswith('.webp'): | |
| media_type = 'webp' | |
| elif any(path.endswith(ext) for ext in SUPPORTED_IMAGE_FORMATS): | |
| media_type = 'image' | |
| else: | |
| media_type = 'unknown' | |
| debug_log(f"extract_frames selected media_type={media_type} for URL={url[:80]}") | |
| # Route to appropriate handler | |
| if media_type == 'gif': | |
| if start_time is not None or end_time is not None: | |
| debug_log("Ignoring start_time/end_time for GIF input") | |
| return extract_frames_from_gif(url, max_frames) | |
| elif media_type == 'video': | |
| return extract_frames_from_video(url, max_frames, start_time=start_time, end_time=end_time) | |
| elif media_type == 'webp': | |
| if start_time is not None or end_time is not None: | |
| debug_log("Ignoring start_time/end_time for WEBP input") | |
| return extract_frames_from_webp(url, max_frames) | |
| elif media_type == 'image': | |
| if start_time is not None or end_time is not None: | |
| debug_log("Ignoring start_time/end_time for image input") | |
| return extract_frames_from_image(url, max_frames) | |
| else: | |
| raise ValueError(f"Unknown or unsupported media type: {media_type}") | |
| # ==================== OUTPUT ENCODING ==================== | |
| def encode_frames_to_gif(frames: List[np.ndarray], durations: List[int]) -> str: | |
| """ | |
| Convert processed frames back into a GIF and return as base64. | |
| This is used to return the result to the user. | |
| Args: | |
| frames: List of RGB frames (note: RGB not BGR) | |
| durations: List of frame durations in milliseconds | |
| Returns: | |
| Base64-encoded GIF data (can be embedded in web pages) | |
| """ | |
| if not frames: | |
| raise ValueError("No frames to encode") | |
| try: | |
| # Convert frames to PIL Images (PIL needs RGB format) | |
| pil_frames = [Image.fromarray(frame) for frame in frames] | |
| # Create GIF | |
| output_buffer = io.BytesIO() | |
| pil_frames[0].save( | |
| output_buffer, | |
| format='GIF', | |
| save_all=True, | |
| append_images=pil_frames[1:] if len(pil_frames) > 1 else [], | |
| loop=0, # Loop infinitely | |
| duration=int(np.mean(durations)), # Use average frame duration | |
| quality=GIF_QUALITY, | |
| optimize=False # Don't optimize to maintain quality | |
| ) | |
| # Convert to base64 (this can be sent over internet as text) | |
| output_buffer.seek(0) | |
| gif_base64 = base64.b64encode(output_buffer.getvalue()).decode() | |
| if DEBUG_MODE: | |
| size_mb = len(gif_base64) / (1024 * 1024) | |
| debug_log(f"✓ Encoded GIF ({len(frames)} frames, ~{size_mb:.1f} MB)") | |
| return gif_base64 | |
| except Exception as e: | |
| raise ValueError(f"Failed to encode GIF: {str(e)}") | |
| def clear_frame_cache() -> None: | |
| """ | |
| Clear the cached extracted frames. | |
| Use this to free up memory when you no longer need cached frames. | |
| """ | |
| _extract_frames_from_gif_cached.cache_clear() | |