""" Memory optimization utilities. Provides utilities for processing large audio files (>1 hour) efficiently without running out of memory. """ import gc import logging from pathlib import Path from typing import Iterator, List, Optional, Tuple import numpy as np from src.lib.audio_io import AudioIOError, read_audio logger = logging.getLogger(__name__) class AudioChunker: """ Utility for processing large audio files in chunks. Allows processing audio files that are too large to fit in memory by streaming them in manageable chunks. """ def __init__(self, chunk_duration: float = 60.0, overlap: float = 5.0): """ Initialize audio chunker. Args: chunk_duration: Duration of each chunk in seconds (default: 60s) overlap: Overlap between chunks in seconds (default: 5s) """ self.chunk_duration = chunk_duration self.overlap = overlap logger.debug(f"AudioChunker initialized (chunk: {chunk_duration}s, overlap: {overlap}s)") def iter_chunks( self, file_path: str, target_sr: int = 16000 ) -> Iterator[Tuple[np.ndarray, int, float, float]]: """ Iterate over audio file in chunks. Args: file_path: Path to audio file target_sr: Target sample rate Yields: Tuples of (audio_chunk, sample_rate, start_time, end_time) Raises: AudioIOError: If file cannot be read """ try: # Read full audio (we'll optimize this for truly large files later) audio, sr = read_audio(file_path, target_sr=target_sr) total_duration = len(audio) / sr logger.info( f"Processing {Path(file_path).name} in chunks " f"(duration: {total_duration:.1f}s, chunk size: {self.chunk_duration}s)" ) # Calculate chunk parameters chunk_samples = int(self.chunk_duration * sr) overlap_samples = int(self.overlap * sr) step_samples = chunk_samples - overlap_samples position = 0 chunk_idx = 0 while position < len(audio): # Extract chunk chunk_start = position chunk_end = min(position + chunk_samples, len(audio)) chunk = audio[chunk_start:chunk_end] # Calculate time boundaries start_time = chunk_start / sr end_time = chunk_end / sr logger.debug( f"Chunk {chunk_idx}: {start_time:.1f}s - {end_time:.1f}s " f"({len(chunk) / sr:.1f}s)" ) yield chunk, sr, start_time, end_time # Move to next chunk position += step_samples chunk_idx += 1 # Force garbage collection between chunks gc.collect() logger.info(f"Processed {chunk_idx} chunks") except Exception as e: logger.error(f"Failed to process chunks: {e}") raise AudioIOError(f"Chunking failed: {e}") def process_file_in_chunks( self, file_path: str, processor_func, target_sr: int = 16000, **processor_kwargs ) -> List: """ Process audio file in chunks with custom processor function. Args: file_path: Path to audio file processor_func: Function to process each chunk Should accept (audio, sr, start_time, end_time, **kwargs) target_sr: Target sample rate **processor_kwargs: Additional arguments for processor function Returns: List of processing results from each chunk Example: >>> def detect_segments(audio, sr, start_time, end_time): ... # Process audio chunk ... return segments >>> >>> chunker = AudioChunker(chunk_duration=60.0) >>> results = chunker.process_file_in_chunks( ... "long_file.m4a", ... detect_segments ... ) """ results = [] for chunk, sr, start_time, end_time in self.iter_chunks(file_path, target_sr): try: result = processor_func(chunk, sr, start_time, end_time, **processor_kwargs) results.append(result) except Exception as e: logger.error(f"Chunk processing failed at {start_time:.1f}s: {e}") # Continue with next chunk continue return results class MemoryMonitor: """ Monitor and manage memory usage during processing. """ def __init__(self, max_memory_mb: Optional[float] = None): """ Initialize memory monitor. Args: max_memory_mb: Maximum memory usage in MB (None = no limit) """ self.max_memory_mb = max_memory_mb try: import os import psutil self.process = psutil.Process(os.getpid()) self.psutil_available = True except ImportError: logger.warning("psutil not available, memory monitoring disabled") self.psutil_available = False def get_current_memory_mb(self) -> float: """ Get current memory usage in MB. Returns: Memory usage in MB, or 0 if unavailable """ if not self.psutil_available: return 0.0 try: return self.process.memory_info().rss / 1024 / 1024 except Exception: return 0.0 def check_memory_limit(self) -> bool: """ Check if memory usage is below limit. Returns: True if within limit (or no limit set), False if exceeded """ if self.max_memory_mb is None: return True current_mb = self.get_current_memory_mb() if current_mb > self.max_memory_mb: logger.warning( f"Memory limit exceeded: {current_mb:.1f}MB > {self.max_memory_mb:.1f}MB" ) return False return True def force_cleanup(self): """Force garbage collection and cleanup.""" gc.collect() if self.psutil_available: try: import torch if torch.cuda.is_available(): torch.cuda.empty_cache() logger.debug("Cleared CUDA cache") except ImportError: pass logger.debug("Forced garbage collection") def optimize_for_large_files(audio_duration: float) -> dict: """ Get optimization recommendations for large files. Args: audio_duration: Duration of audio file in seconds Returns: Dictionary with optimization parameters """ # Thresholds LARGE_FILE_THRESHOLD = 3600 # 1 hour VERY_LARGE_FILE_THRESHOLD = 7200 # 2 hours config = { "use_chunking": False, "chunk_duration": 60.0, "chunk_overlap": 5.0, "force_gc_frequency": 10, # Force GC every N chunks "recommended_batch_size": 32, } if audio_duration > VERY_LARGE_FILE_THRESHOLD: # Very large file (>2 hours) config.update( { "use_chunking": True, "chunk_duration": 30.0, # Smaller chunks "chunk_overlap": 3.0, "force_gc_frequency": 5, # More frequent GC "recommended_batch_size": 16, # Smaller batches } ) logger.info( f"Large file detected ({audio_duration / 3600:.1f}h), " "using aggressive memory optimization" ) elif audio_duration > LARGE_FILE_THRESHOLD: # Large file (>1 hour) config.update( { "use_chunking": True, "chunk_duration": 60.0, "chunk_overlap": 5.0, "force_gc_frequency": 10, "recommended_batch_size": 24, } ) logger.info( f"Large file detected ({audio_duration / 3600:.1f}h), using memory optimization" ) return config def estimate_memory_requirements( audio_duration: float, sample_rate: int = 16000, num_models: int = 3, safety_factor: float = 2.0 ) -> float: """ Estimate memory requirements for processing. Args: audio_duration: Duration in seconds sample_rate: Sample rate in Hz num_models: Number of ML models to load safety_factor: Safety multiplier (default: 2.0) Returns: Estimated memory requirement in MB """ # Audio data (float32 = 4 bytes) audio_mb = (audio_duration * sample_rate * 4) / 1024 / 1024 # Model overhead (rough estimate) model_mb = num_models * 500 # ~500MB per model # Processing overhead processing_mb = audio_mb * 2 # Intermediate buffers, embeddings, etc. total_mb = (audio_mb + model_mb + processing_mb) * safety_factor logger.debug( f"Estimated memory: audio={audio_mb:.1f}MB, " f"models={model_mb:.1f}MB, processing={processing_mb:.1f}MB, " f"total={total_mb:.1f}MB (with {safety_factor}x safety factor)" ) return total_mb