File size: 4,842 Bytes
a34d96e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import os
import tempfile
from PIL import Image
from typing import Tuple, Optional
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

def validate_inputs(reference_image, source_video) -> Tuple[Image.Image, str]:
    """
    Validate and normalize the reference image and source video inputs.

    Args:
        reference_image: PIL Image object, or an object whose ``name``
            attribute is a path that PIL can open
        source_video: Path to the video (``str`` or ``os.PathLike``), or an
            uploaded-file object whose ``name`` attribute is the file path

    Returns:
        Tuple of (PIL Image in RGB mode, video file path)

    Raises:
        ValueError: If the reference image is missing or of an unsupported
            type, or if the video argument is of an unsupported type, does
            not point to an existing file, or is larger than 500MB
    """
    # Validate reference image
    if reference_image is None:
        raise ValueError("Reference image is required")

    # Ensure reference image is PIL Image
    if not isinstance(reference_image, Image.Image):
        if hasattr(reference_image, 'name'):
            reference_image = Image.open(reference_image.name)
        else:
            raise ValueError("Invalid reference image format")

    # Convert to RGB if needed
    if reference_image.mode != 'RGB':
        reference_image = reference_image.convert('RGB')

    # Handle video file. Path-like objects must be checked before the generic
    # ``name`` attribute test: pathlib.Path also exposes ``.name``, but there
    # it is only the final path component, not a usable file path.
    if isinstance(source_video, (str, os.PathLike)):
        video_path = os.fspath(source_video)
    elif hasattr(source_video, 'name'):
        video_path = source_video.name
    else:
        raise ValueError("Invalid video file format")

    # Validate video file exists (a directory is not an acceptable video)
    if not os.path.isfile(video_path):
        raise ValueError("Video file not found")

    # Check file size (limit to 500MB)
    max_size_bytes = 500 * 1024 * 1024
    if os.path.getsize(video_path) > max_size_bytes:
        raise ValueError("Video file too large. Maximum size is 500MB")

    logger.info("Inputs validated successfully")
    return reference_image, video_path

def _transcode_to_mp4(src_path: str, dst_path: str) -> None:
    """
    Re-encode a video into an MP4 container using OpenCV.

    NOTE: frames are copied one by one through OpenCV's video writer, so an
    audio track in the source is not carried over.

    Args:
        src_path: Path of the video to read
        dst_path: Path of the MP4 file to write

    Raises:
        ImportError: If OpenCV (``cv2``) is not installed
        RuntimeError: If the source cannot be opened, reports unusable
            properties, the writer cannot be opened, or no frame was written
    """
    import cv2

    cap = cv2.VideoCapture(src_path)
    out = None
    try:
        if not cap.isOpened():
            raise RuntimeError(f"cannot open video: {src_path}")

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 would
        # change the playback speed/duration of the converted file.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # ``not (fps > 0)`` also rejects NaN
        if not (fps > 0) or width <= 0 or height <= 0:
            raise RuntimeError(
                f"invalid video properties (fps={fps}, size={width}x{height})"
            )

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(dst_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise RuntimeError("cannot open MP4 writer")

        frames_written = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
            frames_written += 1

        # An unreadable stream would otherwise leave an empty output file
        if frames_written == 0:
            raise RuntimeError("no frames could be read from the source video")
    finally:
        # Always release the handles so the destination can be overwritten
        cap.release()
        if out is not None:
            out.release()


def save_uploaded_files(reference_image, source_video) -> Tuple[str, str]:
    """
    Save uploaded files to temporary directory.

    The reference image is written as a JPEG and the video is stored as
    ``source_video.mp4``. Videos with an ``.avi``, ``.mov`` or ``.mkv``
    extension (case-insensitive) are converted to MP4; if the conversion is
    not possible the original file is copied unchanged instead.

    Args:
        reference_image: PIL Image object
        source_video: Video file path

    Returns:
        Tuple of (reference_image_path, video_path)
    """
    import shutil

    # Create temporary directory
    temp_dir = tempfile.mkdtemp(prefix='video_char_replacement_')

    # Save reference image
    ref_path = os.path.join(temp_dir, 'reference_image.jpg')
    reference_image.save(ref_path, 'JPEG', quality=95)

    video_path = os.path.join(temp_dir, 'source_video.mp4')
    source_video = os.fspath(source_video)

    # Handle different video formats (match the extension case-insensitively
    # so that e.g. "clip.MOV" is converted as well)
    converted = False
    if source_video.lower().endswith(('.avi', '.mov', '.mkv')):
        try:
            _transcode_to_mp4(source_video, video_path)
            converted = True
        except Exception as e:
            # Best effort: any conversion problem falls back to a plain copy
            logger.warning(f"Could not convert video format: {e}")

    if not converted:
        # Copy the file as-is (overwrites any partial conversion output)
        shutil.copy2(source_video, video_path)

    logger.info(f"Files saved to temporary directory: {temp_dir}")
    return ref_path, video_path

def create_output_directory() -> str:
    """
    Make a fresh, uniquely named temporary directory for processed videos.

    Returns:
        Path to the newly created output directory
    """
    out_dir = tempfile.mkdtemp(prefix='video_char_replacement_output_')

    # mkdtemp has already created the directory; this is only a safeguard
    os.makedirs(out_dir, exist_ok=True)

    logger.info(f"Output directory created: {out_dir}")
    return out_dir

def cleanup_temp_files(temp_dir: str):
    """
    Remove a temporary directory tree on a best-effort basis.

    Failures are logged as warnings and never propagated to the caller. The
    success message is logged even when the directory did not exist.

    Args:
        temp_dir: Directory to remove together with its contents
    """
    try:
        from shutil import rmtree

        directory_present = os.path.exists(temp_dir)
        if directory_present:
            rmtree(temp_dir)
        logger.info(f"Cleaned up temporary directory: {temp_dir}")
    except Exception as error:
        logger.warning(f"Could not clean up temporary directory: {error}")

def format_file_size(size_bytes: int) -> str:
    """
    Format file size in human-readable format.

    The value is scaled by factors of 1024 through B, KB, MB and GB and
    rendered with one decimal place. Sizes beyond the GB range stay in GB.
    Negative values are scaled by magnitude and keep their sign.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string (e.g., "1.5 MB")
    """
    if size_bytes == 0:
        return "0 B"

    size_names = ("B", "KB", "MB", "GB")
    last_index = len(size_names) - 1
    size = float(size_bytes)
    i = 0

    # Compare the value as it will be displayed (one decimal place): without
    # the rounding, 1048575 bytes would print as "1024.0 KB" rather than
    # "1.0 MB".
    while i < last_index and round(abs(size), 1) >= 1024.0:
        size /= 1024.0
        i += 1

    return f"{size:.1f} {size_names[i]}"