File size: 6,364 Bytes
a646649
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Performance optimization utilities for the caption generation tool."""

import os
import hashlib
import logging
from pathlib import Path
from typing import Dict, List, Optional, Union
from contextlib import contextmanager

from config import MODEL_CACHE_DIR, MAX_AUDIO_LENGTH_SEC, TEMP_FILE_PREFIX

logger = logging.getLogger(__name__)


class ModelCacheManager:
    """Manages local model caching to avoid repeated downloads."""
    
    def __init__(self, cache_dir: str = MODEL_CACHE_DIR):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        
    def get_model_path(self, model_id: str) -> Optional[Path]:
        """Check if model is cached locally."""
        model_hash = hashlib.md5(model_id.encode()).hexdigest()[:8]
        model_path = self.cache_dir / f"model_{model_hash}"
        return model_path if model_path.exists() else None
    
    def cache_model(self, model_id: str, model_data: bytes) -> Path:
        """Cache model data locally."""
        model_hash = hashlib.md5(model_id.encode()).hexdigest()[:8] 
        model_path = self.cache_dir / f"model_{model_hash}"
        
        with open(model_path, 'wb') as f:
            f.write(model_data)
        
        logger.info(f"Cached model {model_id} to {model_path}")
        return model_path


class AudioValidator:
    """Enhanced audio validation with performance checks."""
    
    @staticmethod
    def validate_audio_duration(audio_path: Union[str, Path]) -> float:
        """Validate audio duration is within processing limits."""
        import subprocess
        
        audio_path = Path(audio_path)
        
        # Use ffprobe to get duration quickly without loading audio
        cmd = [
            'ffprobe', '-v', 'quiet', '-show_entries', 
            'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1',
            str(audio_path)
        ]
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            duration = float(result.stdout.strip())
            
            if duration > MAX_AUDIO_LENGTH_SEC:
                raise ValueError(
                    f"Audio too long: {duration:.1f}s (max: {MAX_AUDIO_LENGTH_SEC}s). "
                    "Consider splitting into smaller segments."
                )
            
            return duration
            
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError, ValueError) as e:
            raise RuntimeError(f"Failed to validate audio duration: {e}")


@contextmanager
def temp_file_manager(suffix: str = ".tmp", prefix: str = TEMP_FILE_PREFIX):
    """Context manager for safe temporary file handling."""
    import tempfile
    
    temp_files = []
    try:
        with tempfile.NamedTemporaryFile(
            suffix=suffix, prefix=prefix, delete=False
        ) as f:
            temp_files.append(f.name)
            yield f.name
    finally:
        # Clean up all temp files
        for temp_file in temp_files:
            try:
                Path(temp_file).unlink()
            except OSError:
                logger.warning(f"Failed to clean up temp file: {temp_file}")


class MemoryOptimizer:
    """Memory usage optimization utilities."""
    
    @staticmethod
    def estimate_memory_usage(audio_duration: float, word_count: int) -> Dict[str, float]:
        """Estimate memory requirements for processing."""
        # Rough estimates based on typical usage patterns
        audio_mb = audio_duration * 0.5  # ~500KB per second for 16kHz mono
        model_mb = 1200  # facebook/mms-300m model size
        alignment_mb = word_count * 0.01  # Alignment metadata
        
        total_mb = audio_mb + model_mb + alignment_mb
        
        return {
            "audio_mb": audio_mb,
            "model_mb": model_mb,
            "alignment_mb": alignment_mb,
            "total_mb": total_mb,
            "recommended_ram_gb": max(4.0, total_mb / 1024 * 1.5)
        }
    
    @staticmethod
    def check_available_memory() -> float:
        """Check available system memory in GB."""
        import psutil
        memory = psutil.virtual_memory()
        return memory.available / (1024**3)


class BatchProcessor:
    """Optimized batch processing with concurrency control."""
    
    def __init__(self, max_concurrent: int = 4):
        self.max_concurrent = max_concurrent
        
    def process_batch_optimized(self, audio_script_pairs: List[tuple], 
                              output_dir: Path) -> List[Dict]:
        """Process multiple files with optimal resource usage."""
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        results = []
        
        # Sort by file size for better load balancing
        pairs_with_size = []
        for audio_path, script_path in audio_script_pairs:
            audio_size = Path(audio_path).stat().st_size
            pairs_with_size.append((audio_size, audio_path, script_path))
        
        # Process largest files first to minimize idle time
        pairs_with_size.sort(reverse=True)
        
        with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor:
            futures = []
            
            for _, audio_path, script_path in pairs_with_size:
                future = executor.submit(
                    self._process_single_optimized, 
                    audio_path, script_path, output_dir
                )
                futures.append(future)
            
            for future in as_completed(futures):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    logger.error(f"Batch processing error: {e}")
                    results.append({"error": str(e)})
        
        return results
    
    def _process_single_optimized(self, audio_path: str, script_path: str, 
                                output_dir: Path) -> Dict:
        """Process single file with optimizations."""
        # This would call the main align function with optimizations
        # Implementation would go here
        return {
            "audio_path": audio_path,
            "script_path": script_path,
            "status": "processed",
            "output_path": output_dir / f"{Path(audio_path).stem}.srt"
        }