# tot-talk / audio/capture.py
# grungecoder - Initial commit: real-time multi-model baby cry classifier (ea2601f)
"""Microphone capture with overlapping sliding windows using sounddevice."""
from __future__ import annotations
import queue
import threading
import numpy as np
import sounddevice as sd
from audio.preprocess import SAMPLE_RATE, WINDOW_SECONDS
class MicCapture:
    """Capture audio from the default input device and emit overlapping windows.

    The most recent ``window_seconds`` of audio is kept in a fixed-size sliding
    buffer.  Once the buffer has filled for the first time, a copy of it is
    queued after every ``hop_seconds`` of newly captured audio, so consecutive
    windows overlap by ``window_seconds - hop_seconds``.  The overlap means
    short or quiet sounds that straddle a window boundary are still captured
    in at least one complete window.

    Complete windows are delivered through ``window_queue`` (bounded to 16
    entries); when the consumer falls behind, the oldest queued window is
    dropped in favour of the newest one.

    Parameters
    ----------
    sample_rate : int
        Capture sample rate in Hz (defaults to ``SAMPLE_RATE``).
    window_seconds : float
        Window length in seconds (defaults to ``WINDOW_SECONDS``).
    hop_seconds : float
        Hop between consecutive emitted windows in seconds (default 0.5).

    Raises
    ------
    ValueError
        If the window or the hop is shorter than one sample.
    """

    def __init__(
        self,
        sample_rate: int = SAMPLE_RATE,
        window_seconds: float = WINDOW_SECONDS,
        hop_seconds: float = 0.5,
    ) -> None:
        self.sample_rate = sample_rate
        self.window_seconds = window_seconds
        self.hop_seconds = hop_seconds
        self._window_samples = int(sample_rate * window_seconds)
        self._hop_samples = int(sample_rate * hop_seconds)
        # A zero-sample hop would emit on every callback and a zero-sample
        # window carries no audio, so reject both up front.
        if self._window_samples <= 0 or self._hop_samples <= 0:
            raise ValueError(
                "window_seconds and hop_seconds must each span at least one sample"
            )

        # Sliding buffer - pre-allocated numpy array holding the latest window
        self._buf = np.zeros(self._window_samples, dtype=np.float32)
        # Samples received since the last emit (or since start, before the
        # first window has been emitted)
        self._write_pos = 0
        self._buf_filled = False  # True once we have at least one full window

        # Thread-safe queue so the main loop can pull complete windows
        self.window_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=16)

        self._stream: sd.InputStream | None = None
        self._running = threading.Event()

    # -- sounddevice callback -------------------------------------------------

    def _audio_callback(
        self,
        indata: np.ndarray,
        frames: int,
        time_info: object,
        status: sd.CallbackFlags,
    ) -> None:
        """Fold one captured chunk into the buffer and emit a window when due.

        Only the first channel of ``indata`` is used.  ``frames``,
        ``time_info`` and ``status`` are accepted because the stream callback
        signature requires them; they are not inspected here.
        """
        mono = indata[:, 0].copy()
        n = len(mono)
        if n == 0:
            # Slicing with -0 would address the whole buffer and raise a
            # broadcasting error, so an empty chunk is simply ignored.
            return

        # Shift buffer left and append new samples at the end
        if n >= self._window_samples:
            # Chunk at least as long as the window - keep only its tail
            self._buf[:] = mono[-self._window_samples:]
        else:
            self._buf[:-n] = self._buf[n:]
            self._buf[-n:] = mono
        self._write_pos += n

        if not self._buf_filled:
            # Initial fill: wait until one whole window of real audio exists.
            if self._write_pos < self._window_samples:
                return
            self._buf_filled = True
            self._write_pos = 0
        else:
            # Steady state: emit once per hop of new audio.
            if self._write_pos < self._hop_samples:
                return
            # Keep only the remainder so a chunk spanning several hops does
            # not trigger a burst of emits on the following callbacks.
            self._write_pos %= self._hop_samples
        self._emit()

    def _emit(self) -> None:
        """Queue a copy of the current window without ever blocking."""
        window = self._buf.copy()
        try:
            self.window_queue.put_nowait(window)
        except queue.Full:
            # Drop the oldest unprocessed window to keep latency low
            try:
                self.window_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                self.window_queue.put_nowait(window)
            except queue.Full:
                # Another producer refilled the slot; give up on this window.
                pass

    # -- public API -----------------------------------------------------------

    def start(self) -> None:
        """Open the mic stream and begin capturing.

        Calling ``start`` while a stream is already open is a no-op.  Errors
        raised while opening or starting the stream propagate to the caller
        and leave the capture marked as not running.
        """
        if self._stream is not None:
            return

        # Begin from an empty buffer so a restart never emits windows that
        # mix audio from a previous session.
        self._buf.fill(0.0)
        self._write_pos = 0
        self._buf_filled = False

        stream = sd.InputStream(
            samplerate=self.sample_rate,
            channels=1,
            dtype="float32",
            blocksize=int(self.sample_rate * 0.1),  # 100 ms blocks
            callback=self._audio_callback,
        )
        try:
            stream.start()
        except Exception:
            # Release the device before re-raising the original error.
            stream.close()
            raise
        self._stream = stream
        self._running.set()

    def stop(self) -> None:
        """Stop capturing and close the stream."""
        self._running.clear()
        stream, self._stream = self._stream, None
        if stream is None:
            return
        try:
            stream.stop()
        finally:
            # Close even if stop() fails so the device is always released.
            stream.close()

    @property
    def is_running(self) -> bool:
        """Whether the capture stream has been started and not yet stopped."""
        return self._running.is_set()
class FileCapture:
    """Read an audio file and emit sliding fixed-length windows into a queue.

    The whole file is decoded up front.  A background thread then pushes one
    window into ``window_queue`` per hop, pausing ``hop_seconds`` between
    windows to simulate real-time playback.  A clip shorter than one window is
    zero-padded to exactly one window so it still produces output.

    Parameters
    ----------
    path : str
        Path to the audio file, loaded with ``librosa.load``.
    sample_rate : int
        Target sample rate in Hz (defaults to ``SAMPLE_RATE``).
    window_seconds : float
        Window size in seconds (defaults to ``WINDOW_SECONDS``).
    hop_seconds : float
        Hop between consecutive windows in seconds (default 1.0).
    loop : bool
        Whether to restart from the beginning when the end of the file is
        reached.  When False the worker stops at the end of the file and
        ``is_running`` becomes False.

    Raises
    ------
    ValueError
        If the window or the hop is shorter than one sample.
    """

    def __init__(
        self,
        path: str,
        sample_rate: int = SAMPLE_RATE,
        window_seconds: float = WINDOW_SECONDS,
        hop_seconds: float = 1.0,
        loop: bool = True,
    ) -> None:
        # Local import: librosa is only needed when a file source is created.
        import librosa

        self.path = path
        self.sample_rate = sample_rate
        self.window_seconds = window_seconds
        self.hop_seconds = hop_seconds
        self.loop = loop

        self._window_samples = int(sample_rate * window_seconds)
        self._hop_samples = int(sample_rate * hop_seconds)
        # A zero-sample hop would never advance through the file.
        if self._window_samples <= 0 or self._hop_samples <= 0:
            raise ValueError(
                "window_seconds and hop_seconds must each span at least one sample"
            )

        audio, _ = librosa.load(path, sr=sample_rate, mono=True)
        if 0 < len(audio) < self._window_samples:
            # Without padding no window would ever fit, and with loop=True the
            # worker would spin forever rewinding to the start.
            audio = np.pad(audio, (0, self._window_samples - len(audio)))
        self._audio = audio
        self._total_samples = len(audio)

        # Start offset (in samples) of the most recently emitted window;
        # written by the worker thread, read by current_position_seconds.
        self._offset = 0

        self.window_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=16)
        self._thread: threading.Thread | None = None
        self._running = threading.Event()
        # Set by stop() to interrupt the pacing wait immediately.
        self._stop_requested = threading.Event()

    def _emit_loop(self) -> None:
        """Worker body: push one window per hop until stopped or out of audio."""
        offset = 0
        self._offset = 0
        try:
            if self._total_samples < self._window_samples:
                # Only possible for empty audio; there is nothing to emit.
                return
            while self._running.is_set():
                end = offset + self._window_samples
                if end > self._total_samples:
                    if not self.loop:
                        break
                    offset = 0
                    continue
                window = self._audio[offset:end].copy()
                self._offset = offset
                try:
                    self.window_queue.put(window, timeout=0.5)
                except queue.Full:
                    # Consumer is behind; drop this window and keep pacing.
                    pass
                offset += self._hop_samples
                # Simulate real-time pacing; returns early when stop() is called
                if self._stop_requested.wait(self.hop_seconds):
                    break
        finally:
            # Make is_running reflect that the worker has finished, including
            # when a non-looping file reaches its end.
            self._running.clear()

    @property
    def current_position_seconds(self) -> float:
        """Approximate playback position in seconds.

        This is the start of the most recently emitted window, so it is useful
        for display but not sample-accurate.
        """
        return self._offset / self.sample_rate

    def start(self) -> None:
        """Start the background thread that emits windows.

        Calling ``start`` while the worker is still alive is a no-op.
        """
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_requested.clear()
        self._running.set()
        self._thread = threading.Thread(target=self._emit_loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the worker to stop and wait up to 3 seconds for it to exit."""
        self._running.clear()
        self._stop_requested.set()
        if self._thread is not None:
            self._thread.join(timeout=3)
            self._thread = None

    @property
    def is_running(self) -> bool:
        """Whether the worker has been started and has not yet finished."""
        return self._running.is_set()