File size: 3,760 Bytes
32e1e14
a5a3530
f09864d
 
 
ed85b4d
32e1e14
 
ed85b4d
 
 
32e1e14
ed85b4d
 
 
 
32e1e14
ed85b4d
 
 
 
 
 
 
 
32e1e14
 
 
ed85b4d
 
 
 
 
 
a5a3530
 
ed85b4d
 
 
a5a3530
ed85b4d
 
 
 
 
 
 
 
 
 
 
 
a5a3530
 
ed85b4d
 
 
 
 
 
 
 
 
 
 
 
 
 
a5a3530
ed85b4d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
import shutil
from typing import List, Tuple, Optional, Dict, Any, Callable
from .motion_processor import MotionProcessor
from .file_handler import FileHandler
from .utils import unique_name, size_mb

class VideoProcessor:
    def __init__(self, motion: MotionProcessor, files: FileHandler) -> None:
        self.motion = motion
        self.files = files
    
    def process_one(self, input: str, output: str, aspect: str, 
                   template: Optional[Dict[str, Any]] = None, 
                   quality: str = 'balanced') -> str:
        return self.motion.apply(input, output, template, aspect, quality)
    
    def create(self, input_files: List[Any], aspect: str = 'youtube', 
              fmt: str = 'mp4', 
              progress: Optional[Callable[[float, str], None]] = None, 
              template: Optional[Dict[str, Any]] = None,
              quality: str = 'balanced') -> Tuple[Optional[str], str]:
        
        # Get files
        files = self.files.get_files(input_files)
        if not files:
            return None, "No valid files found"

        # Setup
        seg_dir = self._setup_segments()
        processed = []
        
        if progress:
            progress(0.1, f"Processing {len(files)} files with {quality} quality...")
        
        try:
            # Process each file
            processed = self._process_all(files, seg_dir, aspect, template, quality, progress)
            
            if not processed:
                raise RuntimeError("Failed to process any files")
            
            # Combine
            final = self._combine(processed, fmt, progress)
            
            # Info
            out_size = size_mb(final)
            
            if progress:
                progress(1.0, "Complete!")
            
            return final, f"Created {fmt.upper()} from {len(processed)} segments ({out_size} MB)"
            
        finally:
            self._cleanup(processed, seg_dir)
    
    def _setup_segments(self) -> str:
        seg_dir = os.path.join(os.getcwd(), unique_name("segments", ""))
        os.makedirs(seg_dir, exist_ok=True)
        return seg_dir
    
    def _process_all(self, files: List[str], seg_dir: str, aspect: str,
                    template: Optional[Dict[str, Any]], quality: str,
                    progress: Optional[Callable[[float, str], None]]) -> List[str]:
        processed = []
        
        for i, path in enumerate(files):
            output = os.path.join(seg_dir, f"seg_{i:03d}.mp4")
            
            result = self.process_one(path, output, aspect, template, quality)
            
            if not os.path.exists(result):
                raise RuntimeError(f"Failed to process {path}")
            
            processed.append(result)
            
            if progress:
                progress(0.1 + 0.6 * (i + 1) / len(files), f"Processed {i+1}/{len(files)} files")
        
        return processed
    
    def _combine(self, processed: List[str], fmt: str,
                progress: Optional[Callable[[float, str], None]]) -> str:
        if progress:
            progress(0.8, f"Combining {len(processed)} segments...")
        
        ext = 'gif' if fmt == 'gif' else 'mp4'
        final = os.path.join(os.getcwd(), unique_name("final", f".{ext}"))
        
        self.motion.concat(processed, final, fmt)
        
        if not os.path.exists(final):
            raise RuntimeError("Failed to create final output")
        
        return final
    
    def _cleanup(self, processed: List[str], seg_dir: str) -> None:
        for seg in processed:
            if os.path.exists(seg):
                os.unlink(seg)
        
        if os.path.exists(seg_dir):
            shutil.rmtree(seg_dir)