import os
import shutil
from typing import List, Tuple, Optional, Dict, Any, Callable
from .motion_processor import MotionProcessor
from .file_handler import FileHandler
from .utils import unique_name, size_mb
class VideoProcessor:
    """Turn a batch of input files into one combined output file.

    Every input is rendered to its own segment through ``motion.apply``; the
    segments are then joined with ``motion.concat`` into a single file in the
    current working directory, and the intermediate segments are removed.
    """

    def __init__(self, motion: "MotionProcessor", files: "FileHandler") -> None:
        """Store the collaborators.

        Args:
            motion: Object providing ``apply(...)`` and ``concat(...)``.
            files: Object providing ``get_files(...)``.
        """
        self.motion = motion
        self.files = files

    def process_one(self, input: str, output: str, aspect: str,
                    template: Optional[Dict[str, Any]] = None,
                    quality: str = 'balanced') -> str:
        """Render a single input by delegating to ``self.motion.apply``.

        Args:
            input: Path of the source file.
            output: Path the rendered segment should be written to.
            aspect: Aspect preset, forwarded unchanged.
            template: Optional template settings, forwarded unchanged.
            quality: Quality preset, forwarded unchanged.

        Returns:
            Whatever ``motion.apply`` returns; callers in this class treat it
            as the path of the rendered segment.
        """
        # NOTE: ``input`` shadows the builtin, but it is part of the public
        # signature (callers may pass it by keyword), so the name is kept.
        return self.motion.apply(input, output, template, aspect, quality)

    def create(self, input_files: List[Any], aspect: str = 'youtube',
               fmt: str = 'mp4',
               progress: Optional[Callable[[float, str], None]] = None,
               template: Optional[Dict[str, Any]] = None,
               quality: str = 'balanced') -> Tuple[Optional[str], str]:
        """Process every input and combine the results into one file.

        Args:
            input_files: Raw inputs, resolved through ``self.files.get_files``.
            aspect: Aspect preset forwarded to each segment render.
            fmt: Output format; ``'gif'`` yields a ``.gif`` file, anything
                else a ``.mp4`` file.
            progress: Optional callback receiving ``(fraction, message)``.
            template: Optional template settings forwarded to each render.
            quality: Quality preset forwarded to each render.

        Returns:
            ``(path, message)`` on success, or ``(None, "No valid files
            found")`` when the inputs resolve to nothing.

        Raises:
            RuntimeError: If a segment or the final output is not produced.
        """
        files = self.files.get_files(input_files)
        if not files:
            return None, "No valid files found"

        seg_dir = self._setup_segments()
        processed: List[str] = []
        try:
            # The first progress call lives inside the ``try`` so that a
            # raising callback cannot leave the segment directory behind.
            if progress:
                progress(0.1, f"Processing {len(files)} files with {quality} quality...")

            processed = self._process_all(files, seg_dir, aspect, template, quality, progress)
            if not processed:
                raise RuntimeError("Failed to process any files")

            final = self._combine(processed, fmt, progress)
            out_size = size_mb(final)  # presumably the size in megabytes

            if progress:
                progress(1.0, "Complete!")
            return final, f"Created {fmt.upper()} from {len(processed)} segments ({out_size} MB)"
        finally:
            # Runs on success and on failure; intermediates are never kept.
            self._cleanup(processed, seg_dir)

    def _setup_segments(self) -> str:
        """Create the working directory for segments and return its path.

        The directory is placed under the current working directory; its name
        comes from ``unique_name("segments", "")`` (presumably collision-free
        — confirm against ``utils``).
        """
        seg_dir = os.path.join(os.getcwd(), unique_name("segments", ""))
        os.makedirs(seg_dir, exist_ok=True)
        return seg_dir

    def _process_all(self, files: List[str], seg_dir: str, aspect: str,
                     template: Optional[Dict[str, Any]], quality: str,
                     progress: Optional[Callable[[float, str], None]]) -> List[str]:
        """Render each file to ``seg_dir/seg_NNN.mp4`` in input order.

        Progress is reported from 0.1 up to 0.7 as files complete.

        Returns:
            The paths returned by ``process_one``, one per input.

        Raises:
            RuntimeError: If a render returns nothing or a path that does not
                exist on disk.
        """
        total = len(files)
        processed: List[str] = []
        for index, path in enumerate(files):
            output = os.path.join(seg_dir, f"seg_{index:03d}.mp4")
            result = self.process_one(path, output, aspect, template, quality)
            # A falsy result (e.g. ``None``) is a failed render; checking it
            # first avoids a TypeError from ``os.path.exists(None)``.
            if not result or not os.path.exists(result):
                raise RuntimeError(f"Failed to process {path}")
            processed.append(result)
            if progress:
                done = index + 1
                progress(0.1 + 0.6 * done / total, f"Processed {done}/{total} files")
        return processed

    def _combine(self, processed: List[str], fmt: str,
                 progress: Optional[Callable[[float, str], None]]) -> str:
        """Concatenate the segments into the final output file.

        Returns:
            Path of the created file in the current working directory.

        Raises:
            RuntimeError: If the output file does not exist after
                ``motion.concat`` returns.
        """
        if progress:
            progress(0.8, f"Combining {len(processed)} segments...")
        ext = 'gif' if fmt == 'gif' else 'mp4'
        final = os.path.join(os.getcwd(), unique_name("final", f".{ext}"))
        self.motion.concat(processed, final, fmt)
        if not os.path.exists(final):
            raise RuntimeError("Failed to create final output")
        return final

    def _cleanup(self, processed: List[str], seg_dir: str) -> None:
        """Remove segment files and the segment directory, best-effort.

        This is called from a ``finally`` block, so it must not raise: an
        exception here would replace the real processing error, or discard a
        result that was already produced successfully.
        """
        for seg in processed:
            try:
                os.unlink(seg)
            except OSError:
                # Already gone or not removable; leftovers inside ``seg_dir``
                # are still handled by the directory removal below.
                continue
        shutil.rmtree(seg_dir, ignore_errors=True)