| import os |
| import shutil |
| from typing import List, Tuple, Optional, Dict, Any, Callable |
| from .motion_processor import MotionProcessor |
| from .file_handler import FileHandler |
| from .utils import unique_name, size_mb |
|
|
| class VideoProcessor: |
| def __init__(self, motion: MotionProcessor, files: FileHandler) -> None: |
| self.motion = motion |
| self.files = files |
| |
| def process_one(self, input: str, output: str, aspect: str, |
| template: Optional[Dict[str, Any]] = None, |
| quality: str = 'balanced') -> str: |
| return self.motion.apply(input, output, template, aspect, quality) |
| |
| def create(self, input_files: List[Any], aspect: str = 'youtube', |
| fmt: str = 'mp4', |
| progress: Optional[Callable[[float, str], None]] = None, |
| template: Optional[Dict[str, Any]] = None, |
| quality: str = 'balanced') -> Tuple[Optional[str], str]: |
| |
| |
| files = self.files.get_files(input_files) |
| if not files: |
| return None, "No valid files found" |
|
|
| |
| seg_dir = self._setup_segments() |
| processed = [] |
| |
| if progress: |
| progress(0.1, f"Processing {len(files)} files with {quality} quality...") |
| |
| try: |
| |
| processed = self._process_all(files, seg_dir, aspect, template, quality, progress) |
| |
| if not processed: |
| raise RuntimeError("Failed to process any files") |
| |
| |
| final = self._combine(processed, fmt, progress) |
| |
| |
| out_size = size_mb(final) |
| |
| if progress: |
| progress(1.0, "Complete!") |
| |
| return final, f"Created {fmt.upper()} from {len(processed)} segments ({out_size} MB)" |
| |
| finally: |
| self._cleanup(processed, seg_dir) |
| |
| def _setup_segments(self) -> str: |
| seg_dir = os.path.join(os.getcwd(), unique_name("segments", "")) |
| os.makedirs(seg_dir, exist_ok=True) |
| return seg_dir |
| |
| def _process_all(self, files: List[str], seg_dir: str, aspect: str, |
| template: Optional[Dict[str, Any]], quality: str, |
| progress: Optional[Callable[[float, str], None]]) -> List[str]: |
| processed = [] |
| |
| for i, path in enumerate(files): |
| output = os.path.join(seg_dir, f"seg_{i:03d}.mp4") |
| |
| result = self.process_one(path, output, aspect, template, quality) |
| |
| if not os.path.exists(result): |
| raise RuntimeError(f"Failed to process {path}") |
| |
| processed.append(result) |
| |
| if progress: |
| progress(0.1 + 0.6 * (i + 1) / len(files), f"Processed {i+1}/{len(files)} files") |
| |
| return processed |
| |
| def _combine(self, processed: List[str], fmt: str, |
| progress: Optional[Callable[[float, str], None]]) -> str: |
| if progress: |
| progress(0.8, f"Combining {len(processed)} segments...") |
| |
| ext = 'gif' if fmt == 'gif' else 'mp4' |
| final = os.path.join(os.getcwd(), unique_name("final", f".{ext}")) |
| |
| self.motion.concat(processed, final, fmt) |
| |
| if not os.path.exists(final): |
| raise RuntimeError("Failed to create final output") |
| |
| return final |
| |
| def _cleanup(self, processed: List[str], seg_dir: str) -> None: |
| for seg in processed: |
| if os.path.exists(seg): |
| os.unlink(seg) |
| |
| if os.path.exists(seg_dir): |
| shutil.rmtree(seg_dir) |