# VoiceFocus / stream_pipeline.py
# NOTE(review): the lines here were repository-viewer chrome captured in the
# paste (author "Luis Küffner", commit e368e99, "raw history blame 7.18 kB");
# preserved as a comment so the file parses as Python.
import gradio as gr
import numpy as np
import soxr
from constants import DEFAULT_SR, STREAMER_CLASSES
from stt_streamers import DeepgramStreamer
from sdk import SDKWrapper
from dataclasses import dataclass
# ----------------------------
# Global transcript store (UI pulls from this)
# ----------------------------
# Latest transcript text per path. Written by the _set_transcript_* callbacks
# below and read by get_live_transcripts() for live UI polling.
_ENHANCED_TRANSCRIPT: str = ""
_RAW_TRANSCRIPT: str = ""
def _set_transcript_enhanced(text: str) -> None:
"""Deepgram callback: update latest transcript text (no printing)."""
global _ENHANCED_TRANSCRIPT
_ENHANCED_TRANSCRIPT = text
def _set_transcript_raw(text: str) -> None:
"""Deepgram callback: update latest transcript text (no printing)."""
global _RAW_TRANSCRIPT
_RAW_TRANSCRIPT = text
def get_live_transcripts() -> tuple[str, str]:
    """Expose the most recent (enhanced, raw) transcript pair to the UI poller."""
    return (_ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT)
# Shared enhancement SDK instance, configured once at import time.
SDK = SDKWrapper()
SDK.init_processor(
    sample_rate=DEFAULT_SR,
    enhancement_level=1.0,
    allow_variable_frames=True, # streaming chunks are variable-sized
    num_channels=1,
)
# Created on first start_recording (lazy) to avoid Soniox "No audio received" timeout at app load
Streamer_enhanced = None
Streamer_raw = None
# Generation counters let a delayed stop event be ignored when streamers were
# (re)created after that stop was issued — see shutdown_streamers().
_streamer_generation = 0
_last_stop_generation = 1 # so first stop doesn't skip (1 > 1 is False)
@dataclass
class EnhanceSession:
    """Buffered enhancement state (not referenced by the streaming path in this file)."""
    pending: np.ndarray # 1D float32 @ processor sample rate
    sr: int  # sample rate of `pending`
    num_frames: int  # presumably the processor's fixed frame size — TODO confirm against caller
@dataclass
class StreamSession:
    """Per-connection streaming state — only what is actually needed."""
    resampler: soxr.ResampleStream | None  # None when input already arrives at DEFAULT_SR
    sr_in: int | None  # sample rate of incoming chunks; session is rebuilt when it changes
    tail_16k: np.ndarray # ring buffer of recent audio at DEFAULT_SR (e.g. last 10 s)
    tail_max: int # max samples kept in tail_16k
def _get_or_init_session(session: StreamSession | None, sr_in: int) -> StreamSession:
    """Return *session* unchanged, or build a fresh one when it is missing or
    the input sample rate changed (e.g. a device/browser switch).

    A soxr.ResampleStream (designed for chunked real-time resampling) is only
    allocated when the input rate differs from the processor rate.
    """
    if session is not None and session.sr_in == sr_in:
        return session
    stream_resampler = None
    if sr_in != DEFAULT_SR:
        stream_resampler = soxr.ResampleStream(sr_in, DEFAULT_SR, num_channels=1, dtype="float32")
    return StreamSession(
        resampler=stream_resampler,
        sr_in=sr_in,
        tail_16k=np.zeros((0,), dtype=np.float32),
        tail_max=10 * DEFAULT_SR,
    )
def _to_float32_mono(y: np.ndarray) -> np.ndarray:
# Gradio liefert int16 (oder (samples, channels)). citeturn1view4
y = np.asarray(y)
if y.ndim > 1:
y = y.mean(axis=1)
if y.dtype == np.int16:
y = (y.astype(np.float32) / 32768.0)
else:
y = y.astype(np.float32)
return y
def transcribe_stream(session: StreamSession | None, new_chunk, enhancement_level, input_gain_db: float = 0.0):
    """Gradio streaming callback: forward one microphone chunk to both STT paths.

    The raw (resampled) audio is sent to the raw streamer first, then the
    chunk is enhanced via the SDK and sent to the enhanced streamer.

    Args:
        session: per-connection state, or None on the first chunk.
        new_chunk: Gradio audio tuple ``(sample_rate, samples)``.
        enhancement_level: UI slider value in 0-100, forwarded to the SDK as 0.0-1.0.
        input_gain_db: optional boost in dB; only strictly positive values are applied.

    Returns:
        ``(session, enhanced_transcript, raw_transcript)`` for the UI outputs.
    """
    # Drop chunks until both streamers exist and their websockets are open.
    if (
        Streamer_enhanced is None
        or Streamer_raw is None
        or Streamer_enhanced.ws is None
        or Streamer_raw.ws is None
    ):
        return session, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
    if new_chunk is None or new_chunk[1] is None:
        return session, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
    sr, y = new_chunk
    y = _to_float32_mono(y)
    # Apply input gain: linear = 10^(dB/20), clip to avoid overflow
    if input_gain_db is not None and input_gain_db > 0:
        gain_linear = np.float32(10.0 ** (float(input_gain_db) / 20.0))
        y = (y * gain_linear).astype(np.float32)
        y = np.clip(y, -1.0, 1.0)
    # Rebuilds the session (and resampler) if the input sample rate changed.
    session = _get_or_init_session(session, sr)
    SDK.change_enhancement_level(float(enhancement_level) / 100.0)
    if session.resampler is not None:
        y_16k = session.resampler.resample_chunk(y)
    else:
        y_16k = y
    # Ensure 1D float32 for SDK and streamers (resample_chunk can return 0 samples or 2D)
    y_16k = np.asarray(y_16k, dtype=np.float32).flatten()
    # Ring buffer: keep only the most recent tail_max samples (no unbounded concatenation).
    if y_16k.size > 0:
        tail = np.concatenate([session.tail_16k, y_16k])
        if tail.size > session.tail_max:
            tail = tail[-session.tail_max:]
        session.tail_16k = tail
    # Only send when we have samples (resample_chunk can return empty; SDK needs valid input)
    if y_16k.size == 0:
        return session, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
    # Parallel path: send raw to STT immediately, then enhance and send enhanced.
    # SDK requires fixed num_frames (AudioConfigMismatchError if we use process_chunk with variable size).
    Streamer_raw.process_chunk(y_16k)
    enhanced_chunk_16k = SDK.process_sync(y_16k)
    out_1d = np.asarray(enhanced_chunk_16k, dtype=np.float32).flatten()
    # Always send something to enhanced so Soniox doesn't close with "No audio received"
    if out_1d.size > 0:
        Streamer_enhanced.process_chunk(out_1d)
    else:
        # Keep-alive: 160 zero samples (10 ms at 16 kHz) when enhancement yields nothing.
        Streamer_enhanced.process_chunk(np.zeros(160, dtype=np.float32))
    return session, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
def shutdown_streamers(from_stop_recording: bool = False):
    """Shut down STT streamers. If from_stop_recording, skip when streamers were
    created after the last stop (avoids delayed stop killing new streamers).

    Fix: each streamer is shut down in its own try block — previously a
    failure in the enhanced streamer's shutdown skipped the raw streamer's
    shutdown entirely, leaking its websocket. Both references are always
    cleared and (on a stop event) the stop generation is recorded.
    """
    global Streamer_enhanced, Streamer_raw, _streamer_generation, _last_stop_generation
    if from_stop_recording and _streamer_generation > _last_stop_generation:
        return
    # Remember which generation this stop applies to before teardown.
    gen = _streamer_generation
    try:
        for streamer in (Streamer_enhanced, Streamer_raw):
            if streamer is not None and streamer.ws is not None:
                try:
                    streamer.shutdown()
                except Exception:
                    # Best-effort: keep going so the other streamer still closes.
                    print("Failed to shutdown streamers.")
    finally:
        Streamer_enhanced = None
        Streamer_raw = None
        if from_stop_recording:
            _last_stop_generation = gen
def on_stop_recording():
    """Gradio ``stop_recording`` hook: tear down the streamers when the user hits Stop."""
    shutdown_streamers(True)
def clear_ui():
    """Blank both transcript stores and return cleared values for the UI.

    Returns a 3-tuple matching the Gradio outputs: (session, enhanced, raw),
    with the session reset to None and both transcripts emptied.
    """
    global _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
    _ENHANCED_TRANSCRIPT = _RAW_TRANSCRIPT = ""
    return None, "", ""
def stop_online_backend():
    """Full stop: close the STT streamers and blank the transcripts.

    Deliberately returns a no-op ``gr.update()`` for the Audio component:
    toggling ``streaming=False`` and back to ``True`` can make the frontend
    lose the microphone (getUserMedia is not re-called), so it stays unchanged.
    """
    shutdown_streamers()
    session, enhanced, raw = clear_ui()
    return session, enhanced, raw, gr.update()
def set_stt_streamer(model_name):
    """Switch the STT backend: tear down any live streamers, then install a
    fresh enhanced/raw pair of the class registered for *model_name*
    (unknown names fall back to DeepgramStreamer).
    """
    streamer_cls = STREAMER_CLASSES.get(model_name, DeepgramStreamer)
    global Streamer_enhanced, Streamer_raw, _streamer_generation
    # Release the current pair first so websockets don't leak.
    if not (Streamer_enhanced is None and Streamer_raw is None):
        shutdown_streamers()
    # Construct both instances before publishing either, so transcribe_stream
    # never observes a mixed old/new pair.
    fresh = {
        name: streamer_cls(fs_hz=DEFAULT_SR, stream_name=name, on_update=callback)
        for name, callback in (
            ("enhanced", _set_transcript_enhanced),
            ("raw", _set_transcript_raw),
        )
    }
    _streamer_generation += 1
    Streamer_enhanced = fresh["enhanced"]
    Streamer_raw = fresh["raw"]