# my-demo-694 / utils.py
# ItsMpilo's picture
# Deploy Gradio app with multiple files
# a34d96e verified
import os
import tempfile
from PIL import Image
from typing import Tuple, Optional
import logging
from pathlib import Path
# Module-level logger, named after this module and shared by the helpers below.
logger = logging.getLogger(__name__)
def validate_inputs(reference_image, source_video) -> Tuple[Image.Image, str]:
    """
    Validate and normalize the reference image and the source video.

    Args:
        reference_image: PIL Image object, an ``os.PathLike`` pointing at an
            image file, or an uploaded-file object exposing its path through
            a ``name`` attribute
        source_video: Video file path (``str`` or ``os.PathLike``) or an
            uploaded-file object exposing its path through a ``name``
            attribute

    Returns:
        Tuple of (PIL Image in RGB mode, video file path)

    Raises:
        ValueError: If the reference image is missing or of an unsupported
            type, or if the video is of an unsupported type, is not an
            existing regular file, or is larger than 500MB.
    """
    max_video_bytes = 500 * 1024 * 1024  # 500MB

    # Validate reference image
    if reference_image is None:
        raise ValueError("Reference image is required")

    # Ensure reference image is a PIL Image
    if not isinstance(reference_image, Image.Image):
        # Path-like objects must be handled before the generic ``name``
        # check: pathlib.Path also has a ``name`` attribute, but it holds
        # only the final path component, not the full path.
        if isinstance(reference_image, os.PathLike):
            reference_image = Image.open(os.fsdecode(reference_image))
        elif hasattr(reference_image, 'name'):
            reference_image = Image.open(reference_image.name)
        else:
            raise ValueError("Invalid reference image format")

    # Convert to RGB if needed
    if reference_image.mode != 'RGB':
        reference_image = reference_image.convert('RGB')

    # Resolve the video path (paths first, for the same reason as above)
    if isinstance(source_video, (str, os.PathLike)):
        video_path = os.fsdecode(source_video)
    elif hasattr(source_video, 'name'):
        video_path = source_video.name
    else:
        raise ValueError("Invalid video file format")

    # The video must be an existing regular file; a directory would pass a
    # plain existence check and then report a meaningless size.
    if not os.path.isfile(video_path):
        raise ValueError("Video file not found")

    # Check file size (limit to 500MB)
    if os.path.getsize(video_path) > max_video_bytes:
        raise ValueError("Video file too large. Maximum size is 500MB")

    logger.info("Inputs validated successfully")
    return reference_image, video_path
def _convert_video_to_mp4(source_path: str, target_path: str) -> None:
    """Re-encode ``source_path`` into an MP4 at ``target_path`` with OpenCV, raising on any failure."""
    import cv2

    cap = cv2.VideoCapture(source_path)
    out = None
    try:
        # OpenCV reports failures through state, not exceptions, so every
        # step is checked explicitly to let the caller fall back to a copy.
        if not cap.isOpened():
            raise RuntimeError(f"cannot open video: {source_path}")

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29
        # changes the playback speed of the converted file.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if not fps > 0 or width <= 0 or height <= 0:
            raise RuntimeError("video reports an invalid frame rate or frame size")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(target_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise RuntimeError("cannot open MP4 writer")

        frames_written = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
            frames_written += 1

        if frames_written == 0:
            raise RuntimeError("no frames could be decoded")
    finally:
        # Always release the handles so a fallback copy can overwrite the
        # partially written target file.
        cap.release()
        if out is not None:
            out.release()


def save_uploaded_files(reference_image, source_video) -> Tuple[str, str]:
    """
    Save uploaded files to a new temporary directory.

    The reference image is written as ``reference_image.jpg`` and the video
    as ``source_video.mp4``. Videos with an ``.avi``, ``.mov`` or ``.mkv``
    extension (case-insensitive) are re-encoded to MP4 frame by frame with
    OpenCV; if that is not possible the file is copied unchanged. All other
    videos are copied unchanged.

    Args:
        reference_image: PIL Image object
        source_video: Video file path (``str`` or ``os.PathLike``)

    Returns:
        Tuple of (reference_image_path, video_path)
    """
    import shutil

    source_video = os.fsdecode(source_video)

    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix='video_char_replacement_')

    # Save reference image; it is converted to RGB first (as validate_inputs
    # does) because JPEG cannot store modes such as RGBA or P.
    ref_path = os.path.join(temp_dir, 'reference_image.jpg')
    if reference_image.mode != 'RGB':
        reference_image = reference_image.convert('RGB')
    reference_image.save(ref_path, 'JPEG', quality=95)

    video_path = os.path.join(temp_dir, 'source_video.mp4')

    if source_video.lower().endswith(('.avi', '.mov', '.mkv')):
        # NOTE(review): the frame-by-frame re-encode only writes video
        # frames; presumably any audio track is dropped - confirm this is
        # acceptable for downstream processing.
        try:
            _convert_video_to_mp4(source_video, video_path)
        except Exception as e:
            logger.warning(f"Could not convert video format: {e}")
            # Best effort: just copy the file as-is
            shutil.copy2(source_video, video_path)
    else:
        # Copy the file as-is
        shutil.copy2(source_video, video_path)

    logger.info(f"Files saved to temporary directory: {temp_dir}")
    return ref_path, video_path
def create_output_directory() -> str:
    """
    Make a fresh temporary directory that will hold processed videos.

    Returns:
        Path of the newly created directory
    """
    output_dir = tempfile.mkdtemp(prefix='video_char_replacement_output_')

    # mkdtemp has already created the directory; this keeps the explicit
    # "make sure it exists" step and is a no-op when the directory is there.
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info(f"Output directory created: {output_dir}")
    return output_dir
def cleanup_temp_files(temp_dir: str):
    """
    Remove a temporary directory and everything inside it (best effort).

    A directory that does not exist is silently ignored. Any error raised
    while checking or deleting is logged as a warning instead of being
    propagated.

    Args:
        temp_dir: Directory to clean up
    """
    try:
        import shutil

        # Nothing to do when the directory is already gone.
        if not os.path.exists(temp_dir):
            return

        shutil.rmtree(temp_dir)
        logger.info(f"Cleaned up temporary directory: {temp_dir}")
    except Exception as e:
        logger.warning(f"Could not clean up temporary directory: {e}")
def format_file_size(size_bytes: int) -> str:
    """
    Format file size in human-readable format.

    The value is scaled by factors of 1024 through B, KB, MB and GB and
    printed with one decimal place. GB is the largest unit, so larger
    values are expressed in GB. Negative sizes are scaled by magnitude and
    keep their sign.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string (e.g., "1.5 MB"); exactly "0 B" for a zero size
    """
    if size_bytes == 0:
        return "0 B"

    size_names = ("B", "KB", "MB", "GB")
    last_index = len(size_names) - 1

    sign = "-" if size_bytes < 0 else ""
    size = abs(float(size_bytes))

    i = 0
    while size >= 1024.0 and i < last_index:
        size /= 1024.0
        i += 1

    # A value just under a unit boundary (e.g. 1023.999 KB) would be printed
    # as "1024.0 KB" after rounding to one decimal; move it to the next unit.
    if i < last_index and float(f"{size:.1f}") >= 1024.0:
        size /= 1024.0
        i += 1

    return f"{sign}{size:.1f} {size_names[i]}"