import os import shutil from typing import List, Tuple, Optional, Dict, Any, Callable from .motion_processor import MotionProcessor from .file_handler import FileHandler from .utils import unique_name, size_mb class VideoProcessor: def __init__(self, motion: MotionProcessor, files: FileHandler) -> None: self.motion = motion self.files = files def process_one(self, input: str, output: str, aspect: str, template: Optional[Dict[str, Any]] = None, quality: str = 'balanced') -> str: return self.motion.apply(input, output, template, aspect, quality) def create(self, input_files: List[Any], aspect: str = 'youtube', fmt: str = 'mp4', progress: Optional[Callable[[float, str], None]] = None, template: Optional[Dict[str, Any]] = None, quality: str = 'balanced') -> Tuple[Optional[str], str]: # Get files files = self.files.get_files(input_files) if not files: return None, "No valid files found" # Setup seg_dir = self._setup_segments() processed = [] if progress: progress(0.1, f"Processing {len(files)} files with {quality} quality...") try: # Process each file processed = self._process_all(files, seg_dir, aspect, template, quality, progress) if not processed: raise RuntimeError("Failed to process any files") # Combine final = self._combine(processed, fmt, progress) # Info out_size = size_mb(final) if progress: progress(1.0, "Complete!") return final, f"Created {fmt.upper()} from {len(processed)} segments ({out_size} MB)" finally: self._cleanup(processed, seg_dir) def _setup_segments(self) -> str: seg_dir = os.path.join(os.getcwd(), unique_name("segments", "")) os.makedirs(seg_dir, exist_ok=True) return seg_dir def _process_all(self, files: List[str], seg_dir: str, aspect: str, template: Optional[Dict[str, Any]], quality: str, progress: Optional[Callable[[float, str], None]]) -> List[str]: processed = [] for i, path in enumerate(files): output = os.path.join(seg_dir, f"seg_{i:03d}.mp4") result = self.process_one(path, output, aspect, template, quality) if not os.path.exists(result): raise RuntimeError(f"Failed to process {path}") processed.append(result) if progress: progress(0.1 + 0.6 * (i + 1) / len(files), f"Processed {i+1}/{len(files)} files") return processed def _combine(self, processed: List[str], fmt: str, progress: Optional[Callable[[float, str], None]]) -> str: if progress: progress(0.8, f"Combining {len(processed)} segments...") ext = 'gif' if fmt == 'gif' else 'mp4' final = os.path.join(os.getcwd(), unique_name("final", f".{ext}")) self.motion.concat(processed, final, fmt) if not os.path.exists(final): raise RuntimeError("Failed to create final output") return final def _cleanup(self, processed: List[str], seg_dir: str) -> None: for seg in processed: if os.path.exists(seg): os.unlink(seg) if os.path.exists(seg_dir): shutil.rmtree(seg_dir)