File size: 9,346 Bytes
3ff2f18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""
Memory optimization utilities.

Provides utilities for processing large audio files (>1 hour) efficiently
without running out of memory.
"""

import gc
import logging
from pathlib import Path
from typing import Iterator, List, Optional, Tuple

import numpy as np

from src.lib.audio_io import AudioIOError, read_audio

logger = logging.getLogger(__name__)


class AudioChunker:
    """
    Utility for processing large audio files in chunks.

    Allows processing audio files that are too large to fit in memory
    by streaming them in manageable chunks.
    """

    def __init__(self, chunk_duration: float = 60.0, overlap: float = 5.0):
        """
        Initialize audio chunker.

        Args:
            chunk_duration: Duration of each chunk in seconds (default: 60s)
            overlap: Overlap between chunks in seconds (default: 5s)

        Raises:
            ValueError: If chunk_duration is not positive, or overlap is
                negative or not strictly smaller than chunk_duration.
                (overlap >= chunk_duration would make the chunking loop
                step by <= 0 samples and spin forever.)
        """
        if chunk_duration <= 0:
            raise ValueError(f"chunk_duration must be positive, got {chunk_duration}")
        if not 0 <= overlap < chunk_duration:
            raise ValueError(
                f"overlap must be in [0, chunk_duration), got {overlap}"
            )

        self.chunk_duration = chunk_duration
        self.overlap = overlap

        logger.debug(f"AudioChunker initialized (chunk: {chunk_duration}s, overlap: {overlap}s)")

    def iter_chunks(
        self, file_path: str, target_sr: int = 16000
    ) -> Iterator[Tuple[np.ndarray, int, float, float]]:
        """
        Iterate over audio file in chunks.

        Args:
            file_path: Path to audio file
            target_sr: Target sample rate

        Yields:
            Tuples of (audio_chunk, sample_rate, start_time, end_time)

        Raises:
            AudioIOError: If file cannot be read
        """
        # Only the read is wrapped: exceptions raised by the *consumer*
        # while iterating must not be swallowed into AudioIOError.
        try:
            # Read full audio (we'll optimize this for truly large files later)
            audio, sr = read_audio(file_path, target_sr=target_sr)
        except Exception as e:
            logger.error(f"Failed to process chunks: {e}")
            # Chain the cause so the original traceback is preserved.
            raise AudioIOError(f"Chunking failed: {e}") from e

        total_duration = len(audio) / sr

        logger.info(
            f"Processing {Path(file_path).name} in chunks "
            f"(duration: {total_duration:.1f}s, chunk size: {self.chunk_duration}s)"
        )

        # Calculate chunk parameters. __init__ guarantees overlap <
        # chunk_duration; max(1, ...) additionally guards against int
        # truncation collapsing the step to 0 at tiny sample rates.
        chunk_samples = int(self.chunk_duration * sr)
        overlap_samples = int(self.overlap * sr)
        step_samples = max(1, chunk_samples - overlap_samples)

        n_samples = len(audio)
        position = 0
        chunk_idx = 0

        while position < n_samples:
            # Extract chunk
            chunk_end = min(position + chunk_samples, n_samples)
            chunk = audio[position:chunk_end]

            # Calculate time boundaries
            start_time = position / sr
            end_time = chunk_end / sr

            logger.debug(
                f"Chunk {chunk_idx}: {start_time:.1f}s - {end_time:.1f}s "
                f"({len(chunk) / sr:.1f}s)"
            )

            yield chunk, sr, start_time, end_time
            chunk_idx += 1

            if chunk_end >= n_samples:
                # Last chunk already reached end-of-audio; stepping again
                # would only re-yield a suffix of this chunk.
                break

            # Move to next chunk
            position += step_samples

            # Force garbage collection between chunks
            gc.collect()

        logger.info(f"Processed {chunk_idx} chunks")

    def process_file_in_chunks(
        self, file_path: str, processor_func, target_sr: int = 16000, **processor_kwargs
    ) -> List:
        """
        Process audio file in chunks with custom processor function.

        Args:
            file_path: Path to audio file
            processor_func: Function to process each chunk
                           Should accept (audio, sr, start_time, end_time, **kwargs)
            target_sr: Target sample rate
            **processor_kwargs: Additional arguments for processor function

        Returns:
            List of processing results from each chunk. Chunks whose
            processing raised are skipped (best-effort), so the list may
            be shorter than the number of chunks.

        Example:
            >>> def detect_segments(audio, sr, start_time, end_time):
            ...     # Process audio chunk
            ...     return segments
            >>>
            >>> chunker = AudioChunker(chunk_duration=60.0)
            >>> results = chunker.process_file_in_chunks(
            ...     "long_file.m4a",
            ...     detect_segments
            ... )
        """
        results = []

        for chunk, sr, start_time, end_time in self.iter_chunks(file_path, target_sr):
            try:
                result = processor_func(chunk, sr, start_time, end_time, **processor_kwargs)
                results.append(result)

            except Exception as e:
                logger.error(f"Chunk processing failed at {start_time:.1f}s: {e}")
                # Continue with next chunk
                continue

        return results


class MemoryMonitor:
    """
    Monitor and manage memory usage during processing.
    """

    def __init__(self, max_memory_mb: Optional[float] = None):
        """
        Initialize memory monitor.

        Args:
            max_memory_mb: Maximum memory usage in MB (None = no limit)
        """
        self.max_memory_mb = max_memory_mb

        # psutil is an optional dependency; degrade gracefully without it.
        try:
            import os

            import psutil

            self.process = psutil.Process(os.getpid())
            self.psutil_available = True
        except ImportError:
            logger.warning("psutil not available, memory monitoring disabled")
            self.psutil_available = False

    def get_current_memory_mb(self) -> float:
        """
        Get current memory usage (RSS) in MB.

        Returns:
            Memory usage in MB, or 0 if unavailable
        """
        if not self.psutil_available:
            return 0.0

        try:
            return self.process.memory_info().rss / 1024 / 1024
        except Exception:
            # Best-effort: a dead/denied process query reports 0 rather
            # than crashing the caller.
            return 0.0

    def check_memory_limit(self) -> bool:
        """
        Check if memory usage is below limit.

        Returns:
            True if within limit (or no limit set), False if exceeded
        """
        if self.max_memory_mb is None:
            return True

        current_mb = self.get_current_memory_mb()

        if current_mb > self.max_memory_mb:
            logger.warning(
                f"Memory limit exceeded: {current_mb:.1f}MB > {self.max_memory_mb:.1f}MB"
            )
            return False

        return True

    def force_cleanup(self):
        """Force garbage collection and release cached GPU memory if possible."""
        gc.collect()

        # Bug fix: CUDA cache clearing was previously gated on
        # self.psutil_available, which is unrelated to torch — with psutil
        # missing, the CUDA cache was never cleared. Attempt it regardless.
        try:
            import torch

            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                logger.debug("Cleared CUDA cache")
        except ImportError:
            pass

        logger.debug("Forced garbage collection")


def optimize_for_large_files(audio_duration: float) -> dict:
    """
    Get optimization recommendations for large files.

    Args:
        audio_duration: Duration of audio file in seconds

    Returns:
        Dictionary with optimization parameters
    """
    # Duration thresholds above which chunked processing kicks in.
    LARGE_FILE_THRESHOLD = 3600  # 1 hour
    VERY_LARGE_FILE_THRESHOLD = 7200  # 2 hours

    # Baseline: small files need no chunking.
    settings = {
        "use_chunking": False,
        "chunk_duration": 60.0,
        "chunk_overlap": 5.0,
        "force_gc_frequency": 10,  # Force GC every N chunks
        "recommended_batch_size": 32,
    }

    hours = audio_duration / 3600

    if audio_duration > VERY_LARGE_FILE_THRESHOLD:
        # Very large file (>2 hours): smaller chunks, smaller batches,
        # more frequent garbage collection.
        overrides = {
            "use_chunking": True,
            "chunk_duration": 30.0,
            "chunk_overlap": 3.0,
            "force_gc_frequency": 5,
            "recommended_batch_size": 16,
        }
        settings = {**settings, **overrides}
        logger.info(
            f"Large file detected ({hours:.1f}h), "
            "using aggressive memory optimization"
        )
    elif audio_duration > LARGE_FILE_THRESHOLD:
        # Large file (>1 hour): enable chunking with moderate settings.
        overrides = {
            "use_chunking": True,
            "chunk_duration": 60.0,
            "chunk_overlap": 5.0,
            "force_gc_frequency": 10,
            "recommended_batch_size": 24,
        }
        settings = {**settings, **overrides}
        logger.info(
            f"Large file detected ({hours:.1f}h), using memory optimization"
        )

    return settings


def estimate_memory_requirements(
    audio_duration: float, sample_rate: int = 16000, num_models: int = 3, safety_factor: float = 2.0
) -> float:
    """
    Estimate memory requirements for processing.

    Args:
        audio_duration: Duration in seconds
        sample_rate: Sample rate in Hz
        num_models: Number of ML models to load
        safety_factor: Safety multiplier (default: 2.0)

    Returns:
        Estimated memory requirement in MB
    """
    BYTES_PER_SAMPLE = 4  # float32 samples

    # Raw audio footprint in MB.
    audio_mb = (audio_duration * sample_rate * BYTES_PER_SAMPLE) / 1024 / 1024

    # Rough per-model overhead estimate (~500MB each).
    model_mb = num_models * 500

    # Intermediate buffers, embeddings, etc. — roughly twice the audio.
    processing_mb = audio_mb * 2

    # Apply the safety multiplier to the combined estimate.
    total_mb = (audio_mb + model_mb + processing_mb) * safety_factor

    logger.debug(
        f"Estimated memory: audio={audio_mb:.1f}MB, "
        f"models={model_mb:.1f}MB, processing={processing_mb:.1f}MB, "
        f"total={total_mb:.1f}MB (with {safety_factor}x safety factor)"
    )

    return total_mb