Spaces:
Build error
Build error
| import os | |
| import tempfile | |
| from PIL import Image | |
| from typing import Tuple, Optional | |
| import logging | |
| from pathlib import Path | |
| logger = logging.getLogger(__name__) | |
| def validate_inputs(reference_image, source_video) -> Tuple[Image.Image, str]: | |
| """ | |
| Validate and process input files. | |
| Args: | |
| reference_image: PIL Image object | |
| source_video: Path or UploadedFile object | |
| Returns: | |
| Tuple of (PIL Image, video file path) | |
| """ | |
| # Validate reference image | |
| if reference_image is None: | |
| raise ValueError("Reference image is required") | |
| # Ensure reference image is PIL Image | |
| if not isinstance(reference_image, Image.Image): | |
| if hasattr(reference_image, 'name'): | |
| reference_image = Image.open(reference_image.name) | |
| else: | |
| raise ValueError("Invalid reference image format") | |
| # Convert to RGB if needed | |
| if reference_image.mode != 'RGB': | |
| reference_image = reference_image.convert('RGB') | |
| # Handle video file | |
| if hasattr(source_video, 'name'): | |
| video_path = source_video.name | |
| elif isinstance(source_video, str): | |
| video_path = source_video | |
| else: | |
| raise ValueError("Invalid video file format") | |
| # Validate video file exists | |
| if not os.path.exists(video_path): | |
| raise ValueError("Video file not found") | |
| # Check file size (limit to 500MB) | |
| file_size = os.path.getsize(video_path) | |
| if file_size > 500 * 1024 * 1024: # 500MB | |
| raise ValueError("Video file too large. Maximum size is 500MB") | |
| logger.info(f"Inputs validated successfully") | |
| return reference_image, video_path | |
| def save_uploaded_files(reference_image, source_video) -> Tuple[str, str]: | |
| """ | |
| Save uploaded files to temporary directory. | |
| Args: | |
| reference_image: PIL Image object | |
| source_video: Video file path | |
| Returns: | |
| Tuple of (reference_image_path, video_path) | |
| """ | |
| # Create temporary directory | |
| temp_dir = tempfile.mkdtemp(prefix='video_char_replacement_') | |
| # Save reference image | |
| ref_path = os.path.join(temp_dir, 'reference_image.jpg') | |
| reference_image.save(ref_path, 'JPEG', quality=95) | |
| # Copy video file | |
| video_path = os.path.join(temp_dir, 'source_video.mp4') | |
| # Handle different video formats | |
| if source_video.endswith(('.avi', '.mov', '.mkv')): | |
| # Convert to MP4 if needed | |
| try: | |
| import cv2 | |
| cap = cv2.VideoCapture(source_video) | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(video_path, fourcc, fps, (width, height)) | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| out.write(frame) | |
| cap.release() | |
| out.release() | |
| except Exception as e: | |
| logger.warning(f"Could not convert video format: {e}") | |
| # Just copy the file as-is | |
| import shutil | |
| shutil.copy2(source_video, video_path) | |
| else: | |
| # Copy the file as-is | |
| import shutil | |
| shutil.copy2(source_video, video_path) | |
| logger.info(f"Files saved to temporary directory: {temp_dir}") | |
| return ref_path, video_path | |
| def create_output_directory() -> str: | |
| """ | |
| Create output directory for processed videos. | |
| Returns: | |
| Path to output directory | |
| """ | |
| # Create output directory in temporary folder | |
| output_dir = tempfile.mkdtemp(prefix='video_char_replacement_output_') | |
| # Ensure directory exists | |
| os.makedirs(output_dir, exist_ok=True) | |
| logger.info(f"Output directory created: {output_dir}") | |
| return output_dir | |
| def cleanup_temp_files(temp_dir: str): | |
| """ | |
| Clean up temporary files. | |
| Args: | |
| temp_dir: Directory to clean up | |
| """ | |
| try: | |
| import shutil | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir) | |
| logger.info(f"Cleaned up temporary directory: {temp_dir}") | |
| except Exception as e: | |
| logger.warning(f"Could not clean up temporary directory: {e}") | |
| def format_file_size(size_bytes: int) -> str: | |
| """ | |
| Format file size in human-readable format. | |
| Args: | |
| size_bytes: Size in bytes | |
| Returns: | |
| Formatted string (e.g., "1.5 MB") | |
| """ | |
| if size_bytes == 0: | |
| return "0 B" | |
| size_names = ["B", "KB", "MB", "GB"] | |
| i = 0 | |
| size = float(size_bytes) | |
| while size >= 1024.0 and i < len(size_names) - 1: | |
| size /= 1024.0 | |
| i += 1 | |
| return f"{size:.1f} {size_names[i]}" |