editor / src /video_processor.py
Vo Hoang Minh
up
ed85b4d
import os
import shutil
from typing import List, Tuple, Optional, Dict, Any, Callable
from .motion_processor import MotionProcessor
from .file_handler import FileHandler
from .utils import unique_name, size_mb
class VideoProcessor:
    """Build one combined output file from a batch of input files.

    The work is delegated to two collaborators: ``files`` resolves the raw
    inputs to a list of paths, and ``motion`` renders each path to an
    intermediate segment and then concatenates the segments.  Intermediate
    segments are written to a scratch directory created under the current
    working directory and removed again when ``create`` finishes, whether it
    succeeded or failed.
    """

    def __init__(self, motion: "MotionProcessor", files: "FileHandler") -> None:
        """Store the collaborators used for rendering and for input lookup."""
        self.motion = motion
        self.files = files

    def process_one(self, input: str, output: str, aspect: str,
                    template: Optional[Dict[str, Any]] = None,
                    quality: str = 'balanced') -> str:
        """Render a single input file to ``output``.

        Thin wrapper around ``self.motion.apply``.  Note that ``apply`` takes
        ``template`` before ``aspect``, the reverse of this signature.

        Returns:
            Whatever ``self.motion.apply`` returns; the rest of this class
            treats that value as the path of the rendered segment.
        """
        # ``input`` shadows the builtin, but it is part of the public
        # signature (callers may pass it by keyword), so the name is kept.
        return self.motion.apply(input, output, template, aspect, quality)

    def create(self, input_files: List[Any], aspect: str = 'youtube',
               fmt: str = 'mp4',
               progress: Optional[Callable[[float, str], None]] = None,
               template: Optional[Dict[str, Any]] = None,
               quality: str = 'balanced') -> Tuple[Optional[str], str]:
        """Process every input file and combine the results into one output.

        Args:
            input_files: Raw inputs, handed to ``self.files.get_files``.
            aspect: Aspect preset forwarded to the motion processor.
            fmt: Output format; ``'gif'`` selects a ``.gif`` file, anything
                else a ``.mp4`` file.
            progress: Optional callback receiving ``(fraction, message)``.
                It is called with 0.1 before processing, values up to 0.7
                while segments are produced, 0.8 before combining and 1.0
                on completion.
            template: Optional template forwarded to the motion processor.
            quality: Quality preset forwarded to the motion processor.

        Returns:
            ``(None, "No valid files found")`` when the file handler yields
            nothing, otherwise ``(final_path, summary_message)``.

        Raises:
            RuntimeError: If a segment cannot be produced, no segments were
                produced at all, or the combined output file is missing.
        """
        files = self.files.get_files(input_files)
        if not files:
            return None, "No valid files found"

        seg_dir = self._setup_segments()
        processed: List[str] = []
        try:
            # The first progress report lives inside the ``try`` so that a
            # callback that raises still gets the scratch directory removed.
            if progress:
                progress(0.1, f"Processing {len(files)} files with {quality} quality...")

            processed = self._process_all(files, seg_dir, aspect, template, quality, progress)
            if not processed:
                raise RuntimeError("Failed to process any files")

            final = self._combine(processed, fmt, progress)
            out_size = size_mb(final)
            if progress:
                progress(1.0, "Complete!")
            return final, f"Created {fmt.upper()} from {len(processed)} segments ({out_size} MB)"
        finally:
            # Runs on success and on failure; ``processed`` is still empty if
            # ``_process_all`` raised, in which case removing ``seg_dir``
            # takes care of any partial segments.
            self._cleanup(processed, seg_dir)

    def _setup_segments(self) -> str:
        """Create the scratch directory for segments and return its path.

        The directory name comes from ``unique_name("segments", "")`` and is
        placed under the current working directory.
        """
        seg_dir = os.path.join(os.getcwd(), unique_name("segments", ""))
        os.makedirs(seg_dir, exist_ok=True)
        return seg_dir

    def _process_all(self, files: List[str], seg_dir: str, aspect: str,
                     template: Optional[Dict[str, Any]], quality: str,
                     progress: Optional[Callable[[float, str], None]]) -> List[str]:
        """Render each file to ``seg_dir/seg_NNN.mp4`` and return the results.

        Progress is reported after every file, mapped onto the 0.1-0.7 range.

        Raises:
            RuntimeError: If rendering a file yields no result or a path
                that does not exist on disk.
        """
        total = len(files)
        processed = []
        for i, path in enumerate(files):
            output = os.path.join(seg_dir, f"seg_{i:03d}.mp4")
            result = self.process_one(path, output, aspect, template, quality)
            # Guard against a falsy result (e.g. ``None``) first:
            # ``os.path.exists(None)`` would raise TypeError and hide the
            # real failure behind an unrelated error.
            if not result or not os.path.exists(result):
                raise RuntimeError(f"Failed to process {path}")
            processed.append(result)
            if progress:
                progress(0.1 + 0.6 * (i + 1) / total, f"Processed {i+1}/{total} files")
        return processed

    def _combine(self, processed: List[str], fmt: str,
                 progress: Optional[Callable[[float, str], None]]) -> str:
        """Concatenate the segments into one file in the working directory.

        Returns:
            Path of the combined file.

        Raises:
            RuntimeError: If the output file does not exist after
                ``self.motion.concat`` returns.
        """
        if progress:
            progress(0.8, f"Combining {len(processed)} segments...")
        ext = 'gif' if fmt == 'gif' else 'mp4'
        final = os.path.join(os.getcwd(), unique_name("final", f".{ext}"))
        self.motion.concat(processed, final, fmt)
        if not os.path.exists(final):
            raise RuntimeError("Failed to create final output")
        return final

    def _cleanup(self, processed: List[str], seg_dir: str) -> None:
        """Remove the segment files and the scratch directory, best-effort.

        This is called from a ``finally`` block, so it must not raise: an
        error here would replace the exception being propagated or discard
        an otherwise successful result.
        """
        for seg in processed:
            try:
                os.unlink(seg)
            except OSError:
                # Already gone or not removable; a leftover temporary file is
                # preferable to masking the outcome of ``create``.
                pass
        shutil.rmtree(seg_dir, ignore_errors=True)