srt-caption-generator / performance_optimizer.py
Your Name
fine v.1.0 enhanced with reflected .md
a646649
"""Performance optimization utilities for the caption generation tool."""
import os
import hashlib
import logging
from pathlib import Path
from typing import Dict, List, Optional, Union
from contextlib import contextmanager
from config import MODEL_CACHE_DIR, MAX_AUDIO_LENGTH_SEC, TEMP_FILE_PREFIX
logger = logging.getLogger(__name__)
class ModelCacheManager:
"""Manages local model caching to avoid repeated downloads."""
def __init__(self, cache_dir: str = MODEL_CACHE_DIR):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def get_model_path(self, model_id: str) -> Optional[Path]:
"""Check if model is cached locally."""
model_hash = hashlib.md5(model_id.encode()).hexdigest()[:8]
model_path = self.cache_dir / f"model_{model_hash}"
return model_path if model_path.exists() else None
def cache_model(self, model_id: str, model_data: bytes) -> Path:
"""Cache model data locally."""
model_hash = hashlib.md5(model_id.encode()).hexdigest()[:8]
model_path = self.cache_dir / f"model_{model_hash}"
with open(model_path, 'wb') as f:
f.write(model_data)
logger.info(f"Cached model {model_id} to {model_path}")
return model_path
class AudioValidator:
"""Enhanced audio validation with performance checks."""
@staticmethod
def validate_audio_duration(audio_path: Union[str, Path]) -> float:
"""Validate audio duration is within processing limits."""
import subprocess
audio_path = Path(audio_path)
# Use ffprobe to get duration quickly without loading audio
cmd = [
'ffprobe', '-v', 'quiet', '-show_entries',
'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1',
str(audio_path)
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
duration = float(result.stdout.strip())
if duration > MAX_AUDIO_LENGTH_SEC:
raise ValueError(
f"Audio too long: {duration:.1f}s (max: {MAX_AUDIO_LENGTH_SEC}s). "
"Consider splitting into smaller segments."
)
return duration
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, ValueError) as e:
raise RuntimeError(f"Failed to validate audio duration: {e}")
@contextmanager
def temp_file_manager(suffix: str = ".tmp", prefix: str = TEMP_FILE_PREFIX):
"""Context manager for safe temporary file handling."""
import tempfile
temp_files = []
try:
with tempfile.NamedTemporaryFile(
suffix=suffix, prefix=prefix, delete=False
) as f:
temp_files.append(f.name)
yield f.name
finally:
# Clean up all temp files
for temp_file in temp_files:
try:
Path(temp_file).unlink()
except OSError:
logger.warning(f"Failed to clean up temp file: {temp_file}")
class MemoryOptimizer:
"""Memory usage optimization utilities."""
@staticmethod
def estimate_memory_usage(audio_duration: float, word_count: int) -> Dict[str, float]:
"""Estimate memory requirements for processing."""
# Rough estimates based on typical usage patterns
audio_mb = audio_duration * 0.5 # ~500KB per second for 16kHz mono
model_mb = 1200 # facebook/mms-300m model size
alignment_mb = word_count * 0.01 # Alignment metadata
total_mb = audio_mb + model_mb + alignment_mb
return {
"audio_mb": audio_mb,
"model_mb": model_mb,
"alignment_mb": alignment_mb,
"total_mb": total_mb,
"recommended_ram_gb": max(4.0, total_mb / 1024 * 1.5)
}
@staticmethod
def check_available_memory() -> float:
"""Check available system memory in GB."""
import psutil
memory = psutil.virtual_memory()
return memory.available / (1024**3)
class BatchProcessor:
"""Optimized batch processing with concurrency control."""
def __init__(self, max_concurrent: int = 4):
self.max_concurrent = max_concurrent
def process_batch_optimized(self, audio_script_pairs: List[tuple],
output_dir: Path) -> List[Dict]:
"""Process multiple files with optimal resource usage."""
from concurrent.futures import ThreadPoolExecutor, as_completed
results = []
# Sort by file size for better load balancing
pairs_with_size = []
for audio_path, script_path in audio_script_pairs:
audio_size = Path(audio_path).stat().st_size
pairs_with_size.append((audio_size, audio_path, script_path))
# Process largest files first to minimize idle time
pairs_with_size.sort(reverse=True)
with ThreadPoolExecutor(max_workers=self.max_concurrent) as executor:
futures = []
for _, audio_path, script_path in pairs_with_size:
future = executor.submit(
self._process_single_optimized,
audio_path, script_path, output_dir
)
futures.append(future)
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
logger.error(f"Batch processing error: {e}")
results.append({"error": str(e)})
return results
def _process_single_optimized(self, audio_path: str, script_path: str,
output_dir: Path) -> Dict:
"""Process single file with optimizations."""
# This would call the main align function with optimizations
# Implementation would go here
return {
"audio_path": audio_path,
"script_path": script_path,
"status": "processed",
"output_path": output_dir / f"{Path(audio_path).stem}.srt"
}