Spaces:
Sleeping
Sleeping
| """Microphone capture with overlapping sliding windows using sounddevice.""" | |
| from __future__ import annotations | |
| import queue | |
| import threading | |
| import numpy as np | |
| import sounddevice as sd | |
| from audio.preprocess import SAMPLE_RATE, WINDOW_SECONDS | |
class MicCapture:
    """Capture audio from the default input device and emit overlapping windows.

    The most recent ``window_seconds`` of audio are kept in a pre-allocated
    buffer.  A copy of that buffer is pushed onto ``window_queue`` once it has
    filled for the first time, and again after every ``hop_seconds`` of new
    audio, so a short sound that straddles one window boundary is still fully
    contained in a neighbouring window.

    Parameters
    ----------
    sample_rate : int
        Sample rate requested from the input stream (defaults to
        ``SAMPLE_RATE`` from ``audio.preprocess``).
    window_seconds : float
        Length of each emitted window in seconds (defaults to
        ``WINDOW_SECONDS`` from ``audio.preprocess``).
    hop_seconds : float
        Amount of new audio between consecutive emitted windows
        (default 0.5 s).  Consecutive windows overlap by
        ``window_seconds - hop_seconds``.
    """

    def __init__(
        self,
        sample_rate: int = SAMPLE_RATE,
        window_seconds: float = WINDOW_SECONDS,
        hop_seconds: float = 0.5,
    ) -> None:
        self.sample_rate = sample_rate
        self.window_seconds = window_seconds
        self.hop_seconds = hop_seconds

        self._window_samples = int(sample_rate * window_seconds)
        # Clamp to one sample so the hop arithmetic in the callback can never
        # divide by zero; a hop of zero then simply means "emit on every
        # callback", which is what an unclamped zero hop did as well.
        self._hop_samples = max(1, int(sample_rate * hop_seconds))

        # Sliding buffer holding the newest window -- pre-allocated numpy array
        self._buf = np.zeros(self._window_samples, dtype=np.float32)
        self._write_pos = 0  # samples received since the last emit
        self._buf_filled = False  # True once one full window has been seen

        # Bounded queue the consumer pulls complete windows from
        self.window_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=16)

        self._stream: sd.InputStream | None = None
        self._running = threading.Event()

    # -- sounddevice callback ------------------------------------------------

    def _audio_callback(
        self,
        indata: np.ndarray,
        frames: int,
        time_info: object,
        status: sd.CallbackFlags,
    ) -> None:
        """Fold one block of input into the buffer and emit a window when due.

        Only the first channel of ``indata`` is used.  ``frames``,
        ``time_info`` and ``status`` are accepted because the stream passes
        them to the callback, but they are not inspected here.
        """
        # A view is enough: the slice assignments below copy the samples into
        # ``self._buf`` before this callback returns.
        mono = indata[:, 0]
        n = len(mono)
        if n == 0:
            # ``buf[:-0]`` is an empty slice, so the shift below would raise a
            # broadcast error; an empty block carries no audio anyway.
            return

        if n >= self._window_samples:
            # Block at least as long as a window -- keep only its tail
            self._buf[:] = mono[-self._window_samples:]
            self._write_pos = self._window_samples
            self._buf_filled = True
        else:
            # Shift the buffer left by n and append the new samples at the end
            self._buf[:-n] = self._buf[n:]
            self._buf[-n:] = mono
            self._write_pos += n

        if not self._buf_filled:
            # Initial fill: wait for one complete window before emitting
            if self._write_pos >= self._window_samples:
                self._buf_filled = True
                self._write_pos = 0
                self._emit()
        elif self._write_pos >= self._hop_samples:
            # Keep only the remainder.  Subtracting a single hop would leave a
            # backlog whenever a block is longer than the hop, which then
            # triggers extra emits on the following callbacks.
            self._write_pos %= self._hop_samples
            self._emit()

    def _emit(self) -> None:
        """Queue a copy of the current window, dropping the oldest if full."""
        window = self._buf.copy()
        try:
            self.window_queue.put_nowait(window)
            return
        except queue.Full:
            pass

        # Drop the oldest unprocessed window to keep latency low
        try:
            self.window_queue.get_nowait()
        except queue.Empty:
            pass
        try:
            self.window_queue.put_nowait(window)
        except queue.Full:
            # The queue was refilled in between; give up on this window
            # rather than block the audio callback.
            pass

    # -- public API ----------------------------------------------------------

    def start(self) -> None:
        """Open the input stream and begin capturing.

        Does nothing if a stream is already open.  If the stream cannot be
        opened or started, the exception propagates and the object is left in
        the stopped state.
        """
        if self._stream is not None:
            # Already capturing; opening a second stream would leak the first.
            return

        # Start from an empty buffer so windows emitted after a restart do not
        # contain audio recorded before the previous stop().
        self._buf.fill(0.0)
        self._write_pos = 0
        self._buf_filled = False

        stream = sd.InputStream(
            samplerate=self.sample_rate,
            channels=1,
            dtype="float32",
            blocksize=int(self.sample_rate * 0.1),  # 100 ms blocks
            callback=self._audio_callback,
        )
        try:
            stream.start()
        except Exception:
            stream.close()
            raise

        # Only report "running" once the stream has actually started.
        self._stream = stream
        self._running.set()

    def stop(self) -> None:
        """Stop capturing and close the stream (no-op if not started)."""
        self._running.clear()
        stream, self._stream = self._stream, None
        if stream is None:
            return
        try:
            stream.stop()
        finally:
            # Release the device even if stopping raised.
            stream.close()

    def is_running(self) -> bool:
        """Return True between a successful start() and the next stop()."""
        return self._running.is_set()
class FileCapture:
    """Read an audio file and emit sliding windows into a queue at real-time pace.

    The whole file is decoded up front with ``librosa.load`` (resampled to
    ``sample_rate`` and mixed down to mono).  A background thread then puts
    one ``window_seconds``-long slice onto ``window_queue`` and waits
    ``hop_seconds`` before advancing by ``hop_seconds`` of audio.

    Parameters
    ----------
    path : str
        Path to an audio file readable by ``librosa.load``.
    sample_rate : int
        Target sample rate (defaults to ``SAMPLE_RATE`` from
        ``audio.preprocess``).
    window_seconds : float
        Window size in seconds (defaults to ``WINDOW_SECONDS`` from
        ``audio.preprocess``).
    hop_seconds : float
        Hop between consecutive windows, and also the wall-clock delay
        between emits (default 1.0 s).
    loop : bool
        Whether to restart from the beginning when the end of the file is
        reached.  When False the emit thread ends with the file and
        ``is_running()`` becomes False.
    """

    def __init__(
        self,
        path: str,
        sample_rate: int = SAMPLE_RATE,
        window_seconds: float = WINDOW_SECONDS,
        hop_seconds: float = 1.0,
        loop: bool = True,
    ) -> None:
        # Imported here rather than at module level, as in the original code.
        import librosa

        self.path = path
        self.sample_rate = sample_rate
        self.window_seconds = window_seconds
        self.hop_seconds = hop_seconds
        self.loop = loop

        self._audio, _ = librosa.load(path, sr=sample_rate, mono=True)
        self._window_samples = int(sample_rate * window_seconds)
        self._hop_samples = int(sample_rate * hop_seconds)
        self._total_samples = len(self._audio)

        self.window_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=16)
        self._thread: threading.Thread | None = None
        self._running = threading.Event()
        # Set by stop(); lets the pacing wait in the emit loop end early.
        self._stop_event = threading.Event()
        # Start offset (in samples) of the window most recently emitted.
        # Written only by the emit thread; readers may see a slightly stale
        # value.
        self._offset_samples = 0

    def _emit_loop(self) -> None:
        """Thread body: emit windows until stopped or the file is exhausted."""
        try:
            if self._total_samples < self._window_samples:
                # Not even one full window fits.  Without this guard a looping
                # capture would reset the offset and retry forever without
                # ever sleeping or emitting.
                return

            offset = 0
            while not self._stop_event.is_set():
                end = offset + self._window_samples
                if end > self._total_samples:
                    if not self.loop:
                        break
                    offset = 0
                    continue

                self._offset_samples = offset
                window = self._audio[offset:end].copy()
                try:
                    self.window_queue.put(window, timeout=0.5)
                except queue.Full:
                    # Consumer is not keeping up: skip this window and move on.
                    pass
                offset += self._hop_samples

                # Simulate real-time pacing; returns True (and we leave) as
                # soon as stop() is called instead of sleeping the full hop.
                if self._stop_event.wait(self.hop_seconds):
                    break
        finally:
            # Reflect that nothing is being emitted any more, including when
            # a non-looping file simply ran out.
            self._running.clear()

    def current_position_seconds(self) -> float:
        """Return the approximate start time of the most recently emitted window.

        Not perfectly precise (the emit thread updates it once per window) but
        useful for display.  Returns 0.0 before the first window is emitted.
        """
        return self._offset_samples / self.sample_rate

    def start(self) -> None:
        """Start the background emit thread (no-op if it is already alive)."""
        if self._thread is not None and self._thread.is_alive():
            # A second thread would emit every window twice.
            return
        self._stop_event.clear()
        self._running.set()
        self._thread = threading.Thread(target=self._emit_loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the emit thread to finish and wait up to 3 s for it."""
        self._running.clear()
        self._stop_event.set()
        thread, self._thread = self._thread, None
        if thread is not None:
            thread.join(timeout=3)

    def is_running(self) -> bool:
        """Return True while the emit thread is expected to produce windows."""
        return self._running.is_set()