import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Optional, Tuple

from PIL import Image

logger = logging.getLogger(__name__)

# Largest source video accepted by validate_inputs (500 MB).
MAX_VIDEO_SIZE_BYTES = 500 * 1024 * 1024

# Source extensions that save_uploaded_files tries to re-encode to MP4.
_CONVERTIBLE_VIDEO_EXTENSIONS = ('.avi', '.mov', '.mkv')


def validate_inputs(reference_image, source_video) -> Tuple[Image.Image, str]:
    """
    Validate and process input files.

    Args:
        reference_image: PIL Image, a filesystem path (str or os.PathLike),
            or an object whose ``name`` attribute is a path to an image file
        source_video: Filesystem path (str or os.PathLike) or an object whose
            ``name`` attribute is a path to the video file

    Returns:
        Tuple of (RGB PIL Image, video file path)

    Raises:
        ValueError: If an input is missing or of an unsupported type, the
            video is not an existing file, or it exceeds
            MAX_VIDEO_SIZE_BYTES
    """
    if reference_image is None:
        raise ValueError("Reference image is required")

    if not isinstance(reference_image, Image.Image):
        # Paths are checked before the generic ``name`` attribute because
        # pathlib.Path.name is only the final path component, not a usable
        # location.
        if isinstance(reference_image, (str, os.PathLike)):
            reference_image = Image.open(os.fsdecode(reference_image))
        elif hasattr(reference_image, 'name'):
            reference_image = Image.open(reference_image.name)
        else:
            raise ValueError("Invalid reference image format")

    if reference_image.mode != 'RGB':
        reference_image = reference_image.convert('RGB')

    # Same ordering concern as above: resolve real paths first.
    if isinstance(source_video, (str, os.PathLike)):
        video_path = os.fsdecode(source_video)
    elif hasattr(source_video, 'name'):
        video_path = source_video.name
    else:
        raise ValueError("Invalid video file format")

    # isfile (not exists) so that a directory is rejected as well.
    if not os.path.isfile(video_path):
        raise ValueError("Video file not found")

    if os.path.getsize(video_path) > MAX_VIDEO_SIZE_BYTES:
        raise ValueError("Video file too large. Maximum size is 500MB")

    logger.info("Inputs validated successfully")
    return reference_image, video_path


def _convert_to_mp4(source_path: str, target_path: str) -> None:
    """
    Re-encode a video to MP4 (mp4v codec) frame by frame using OpenCV.

    Only decoded frames are written; OpenCV's VideoWriter does not carry an
    audio track over.

    Args:
        source_path: Path of the video to read
        target_path: Path of the MP4 file to write

    Raises:
        ImportError: If OpenCV (cv2) is not installed
        RuntimeError: If the source cannot be opened, reports unusable
            metadata, yields no frames, or the writer cannot be opened
    """
    # Imported lazily so the module stays usable without OpenCV.
    import cv2

    cap = cv2.VideoCapture(source_path)
    out = None
    try:
        if not cap.isOpened():
            raise RuntimeError(f"cannot open video: {source_path}")

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 would
        # change the playback speed of the converted file.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # "not fps > 0" also rejects NaN.
        if not fps > 0 or width <= 0 or height <= 0:
            raise RuntimeError(
                f"unusable video metadata (fps={fps}, size={width}x{height})"
            )

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(target_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise RuntimeError("cannot open MP4 writer")

        frames_written = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
            frames_written += 1

        # An empty output would otherwise be passed on as a valid video.
        if frames_written == 0:
            raise RuntimeError("no frames could be decoded")
    finally:
        cap.release()
        if out is not None:
            out.release()


def save_uploaded_files(reference_image, source_video) -> Tuple[str, str]:
    """
    Save uploaded files to a new temporary directory.

    The reference image is written as ``reference_image.jpg`` and the video
    as ``source_video.mp4``. Videos with an .avi/.mov/.mkv extension (any
    letter case) are re-encoded to MP4; if that fails, or for any other
    extension, the file is copied unchanged under the ``.mp4`` name.

    Args:
        reference_image: PIL Image object
        source_video: Video file path (str or os.PathLike)

    Returns:
        Tuple of (reference_image_path, video_path)

    Raises:
        Exception: Any error from saving the image or copying the video is
            re-raised after the temporary directory has been removed
    """
    # Normalise before creating the directory so a bad argument leaks nothing.
    source_video = os.fsdecode(source_video)

    temp_dir = tempfile.mkdtemp(prefix='video_char_replacement_')
    try:
        ref_path = os.path.join(temp_dir, 'reference_image.jpg')
        # JPEG cannot store alpha or palette modes; convert instead of failing.
        if reference_image.mode != 'RGB':
            reference_image = reference_image.convert('RGB')
        reference_image.save(ref_path, 'JPEG', quality=95)

        video_path = os.path.join(temp_dir, 'source_video.mp4')

        converted = False
        if source_video.lower().endswith(_CONVERTIBLE_VIDEO_EXTENSIONS):
            try:
                _convert_to_mp4(source_video, video_path)
                converted = True
            except Exception as e:
                # Best effort: conversion is optional, fall back to a copy.
                logger.warning(f"Could not convert video format: {e}")

        if not converted:
            # Overwrites any partial output left by a failed conversion.
            shutil.copy2(source_video, video_path)
    except Exception:
        # Do not leave a half-populated temporary directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    logger.info("Files saved to temporary directory: %s", temp_dir)
    return ref_path, video_path


def create_output_directory() -> str:
    """
    Create output directory for processed videos.

    Returns:
        Path to the newly created temporary output directory
    """
    # mkdtemp already creates the directory, so no extra makedirs is needed.
    output_dir = tempfile.mkdtemp(prefix='video_char_replacement_output_')

    logger.info("Output directory created: %s", output_dir)
    return output_dir


def cleanup_temp_files(temp_dir: str) -> None:
    """
    Clean up temporary files.

    Removal is best effort: failures are logged as warnings and never raised.

    Args:
        temp_dir: Directory to remove together with its contents
    """
    try:
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
            logger.info("Cleaned up temporary directory: %s", temp_dir)
    except Exception as e:
        # Deliberately broad: cleanup must not break the caller.
        logger.warning(f"Could not clean up temporary directory: {e}")


def format_file_size(size_bytes: int) -> str:
    """
    Format file size in human-readable format.

    Uses 1024-based units from B up to PB with one decimal place. Negative
    values are scaled by magnitude and keep their sign.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string (e.g., "1.5 MB")
    """
    if size_bytes == 0:
        return "0 B"

    size_names = ("B", "KB", "MB", "GB", "TB", "PB")
    sign = "-" if size_bytes < 0 else ""
    size = float(abs(size_bytes))

    i = 0
    while size >= 1024.0 and i < len(size_names) - 1:
        size /= 1024.0
        i += 1

    return f"{sign}{size:.1f} {size_names[i]}"