mariesig commited on
Commit
990f149
·
1 Parent(s): aabdfb2

Refactor online streaming functionality and enhance documentation

Browse files
app.py CHANGED
@@ -1,7 +1,9 @@
 
1
  import gradio as gr
 
2
  from hf_dataset_utils import ALL_FILES
3
 
4
- from online_pipeline import transcribe_stream, reset_streamers, stop_streaming, change_stt_model
5
  from offline_pipeline import load_file_from_dataset, load_local_file, denoise_audio, retrieve_audio_information
6
  from clean_up import purge_tmp_directory, cleanup_previous_run
7
 
@@ -33,7 +35,7 @@ with gr.Blocks() as demo:
33
  )
34
 
35
  # Online STT streamer swap uses the same global control
36
- stt_model.change(fn=change_stt_model, inputs=stt_model, outputs=[])
37
 
38
  with gr.Tabs(elem_classes="main-tabs"):
39
  # =========================
@@ -42,10 +44,14 @@ with gr.Blocks() as demo:
42
  with gr.Tab("Offline", elem_classes="tab-offline") as offline_tab:
43
  with gr.Group(elem_classes="panel"):
44
  with gr.Tab("Upload local file", elem_classes="upload-tab") as upload_tab:
 
 
45
  audio_file_upload = gr.Audio(type="filepath", sources=["upload"])
46
  enhance_btn_for_upload = gr.Button("Enhance", scale=2)
47
 
48
  with gr.Tab("Dataset: Dawn Chorus", elem_classes="dataset-tab") as dataset_tab:
 
 
49
  dataset_dropdown = gr.Dropdown(choices=ALL_FILES, value=ALL_FILES[0], label="Select a sample from the Dawn Chorus dataset")
50
  audio_file_from_dataset = gr.Audio(type="filepath", interactive=False)
51
  enhance_btn_for_dataset = gr.Button("Enhance", scale=2)
@@ -114,6 +120,7 @@ with gr.Blocks() as demo:
114
  # ONLINE TAB
115
  # =========================
116
  with gr.Tab("Online", elem_classes="tab-online"):
 
117
  with gr.Group(elem_classes="panel"):
118
  stream_state = gr.State(None)
119
  audio_stream = gr.Audio(sources=["microphone"], streaming=True)
@@ -123,26 +130,32 @@ with gr.Blocks() as demo:
123
  with gr.Column(scale=5, min_width=320):
124
  raw_text = gr.Textbox(label="Raw Transcribed Text", lines=6)
125
  clear_btn = gr.Button("Clear")
126
-
127
-
128
- audio_stream.stream(
129
  fn=transcribe_stream,
130
  inputs=[stream_state, audio_stream, enhancement_level],
131
  outputs=[stream_state, enhanced_text, raw_text],
132
- stream_every=0.05,
 
 
 
133
  )
134
 
135
  clear_btn.click(
136
- fn=reset_streamers,
137
- outputs=[stream_state, enhanced_text, raw_text],
 
 
 
 
 
138
  )
139
 
140
- offline_tab.select(
141
- fn=stop_streaming,
142
- inputs=None,
143
- outputs=[audio_stream, stream_state, enhanced_text, raw_text],
144
- )
145
-
146
 
147
  purge_tmp_directory(max_age_minutes=0, skip_substrings=[])
148
  demo.launch(allowed_paths=["/tmp", "/"])
 
1
+
2
  import gradio as gr
3
+ from constants import STREAM_EVERY
4
  from hf_dataset_utils import ALL_FILES
5
 
6
+ from online_pipeline import set_stt_streamer, transcribe_stream, stop_streaming, set_stt_streamer
7
  from offline_pipeline import load_file_from_dataset, load_local_file, denoise_audio, retrieve_audio_information
8
  from clean_up import purge_tmp_directory, cleanup_previous_run
9
 
 
35
  )
36
 
37
  # Online STT streamer swap uses the same global control
38
+ stt_model.change(fn=set_stt_streamer, inputs=stt_model, outputs=[])
39
 
40
  with gr.Tabs(elem_classes="main-tabs"):
41
  # =========================
 
44
  with gr.Tab("Offline", elem_classes="tab-offline") as offline_tab:
45
  with gr.Group(elem_classes="panel"):
46
  with gr.Tab("Upload local file", elem_classes="upload-tab") as upload_tab:
47
+ with gr.Row():
48
+ gr.Markdown(open("docs/local_file.md", "r", encoding="utf-8").read())
49
  audio_file_upload = gr.Audio(type="filepath", sources=["upload"])
50
  enhance_btn_for_upload = gr.Button("Enhance", scale=2)
51
 
52
  with gr.Tab("Dataset: Dawn Chorus", elem_classes="dataset-tab") as dataset_tab:
53
+ with gr.Row():
54
+ gr.Markdown(open("docs/dawn_chorus.md", "r", encoding="utf-8").read())
55
  dataset_dropdown = gr.Dropdown(choices=ALL_FILES, value=ALL_FILES[0], label="Select a sample from the Dawn Chorus dataset")
56
  audio_file_from_dataset = gr.Audio(type="filepath", interactive=False)
57
  enhance_btn_for_dataset = gr.Button("Enhance", scale=2)
 
120
  # ONLINE TAB
121
  # =========================
122
  with gr.Tab("Online", elem_classes="tab-online"):
123
+ gr.Markdown(open("docs/online.md", "r", encoding="utf-8").read())
124
  with gr.Group(elem_classes="panel"):
125
  stream_state = gr.State(None)
126
  audio_stream = gr.Audio(sources=["microphone"], streaming=True)
 
130
  with gr.Column(scale=5, min_width=320):
131
  raw_text = gr.Textbox(label="Raw Transcribed Text", lines=6)
132
  clear_btn = gr.Button("Clear")
133
+ stream_evt = audio_stream.stream(
 
 
134
  fn=transcribe_stream,
135
  inputs=[stream_state, audio_stream, enhancement_level],
136
  outputs=[stream_state, enhanced_text, raw_text],
137
+ stream_every=STREAM_EVERY,
138
+ time_limit=60*2,
139
+ concurrency_limit=1,
140
+
141
  )
142
 
143
  clear_btn.click(
144
+ fn=stop_streaming,
145
+ outputs=[audio_stream,stream_state, enhanced_text, raw_text],
146
+ cancels=[stream_evt]
147
+ ).then(
148
+ set_stt_streamer,
149
+ inputs=stt_model,
150
+ outputs=None,
151
  )
152
 
153
+ offline_tab.select(
154
+ fn=stop_streaming,
155
+ outputs=[audio_stream,stream_state, enhanced_text, raw_text],
156
+ cancels=[stream_evt]
157
+ )
158
+
159
 
160
  purge_tmp_directory(max_age_minutes=0, skip_substrings=[])
161
  demo.launch(allowed_paths=["/tmp", "/"])
constants.py CHANGED
@@ -1,7 +1,7 @@
 
1
  from typing import Final
2
- import os
3
 
4
- from stt_streamers.soniox_streamer import SONIOX_WEBSOCKET_URL
5
 
6
  CHUNK_SIZE: Final = 1024
7
  TIMEOUT_FACTOR_MB: Final = 60
@@ -17,6 +17,10 @@ MIX_DIR: Final = "mix"
17
  SPEECH_DIR: Final = "speech"
18
  TRANS_DIR: Final = "transcripts"
19
 
20
- # Private access token from Space secrets:
21
  DEFAULT_SR: Final = 16000
 
22
 
 
 
 
 
 
1
+ from re import S
2
  from typing import Final
3
+ from stt_streamers import DeepgramStreamer, SonioxStreamer
4
 
 
5
 
6
  CHUNK_SIZE: Final = 1024
7
  TIMEOUT_FACTOR_MB: Final = 60
 
17
  SPEECH_DIR: Final = "speech"
18
  TRANS_DIR: Final = "transcripts"
19
 
 
20
  DEFAULT_SR: Final = 16000
21
+ STREAM_EVERY: Final = 0.2
22
 
23
+ STREAMER_CLASSES: Final = {
24
+ "Deepgram": DeepgramStreamer,
25
+ "Soniox": SonioxStreamer,
26
+ }
docs/dawn_chorus.md ADDED
@@ -0,0 +1 @@
 
 
1
+ Select a sample from our open-source [Dawn Chorus English](https://huggingface.co/datasets/ai-coustics/dawn_chorus_en), which features challenging cases with background voice activity.
docs/intro.md CHANGED
@@ -1,9 +1,2 @@
1
- Welcome! This Space lets you try **ai‑coustics VoiceFocus (Quail Voice Focus)** — a real‑time, STT‑oriented enhancement model that **isolates the foreground speaker** and **suppresses competing voices + background noise**. It’s tuned to keep the phonetic cues speech‑to‑text systems need, so the output isn’t always “prettier” — just cleaner for transcription.
2
-
3
- **Offline:** upload an audio file or pick a sample from the dataset, then listen to the enhanced result and compare raw vs enhanced transcripts.
4
-
5
- **Online:** stream from your microphone and watch raw vs enhanced text update live.
6
-
7
- Use **Enhancement level (0–100)** to dial in the strength, and switch the **STT backend (Deepgram / Soniox)** to see how different engines react to cleaner input.
8
-
9
- Tip: speak close to your mic (near‑field) and keep a steady level for best results. Please don’t upload sensitive or private audio—use test material only.
 
1
+ Welcome! This Space lets you try **ai‑coustics Quail Voice Focus** — a real‑time, STT‑oriented enhancement model that **isolates the foreground speaker** and suppresses competing voices and background noise. For more information visit our [docs](https://docs.ai-coustics.com/guides/models#quail).
2
+ The model is tuned to preserve the phonetic cues needed for speech‑to‑text systems, so the output isn’t always “prettier”—just cleaner for transcription.
 
 
 
 
 
 
 
docs/local_file.md ADDED
@@ -0,0 +1 @@
 
 
1
+ Upload an audio file from your computer. For best results, choose a recording with overlapping or background speech to observe how the model handles challenging scenarios.
docs/online.md ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ Use your microphone to stream live audio and see, in real time, how our enhancement technology suppresses voices in the background and excludes them from transcription.
2
+ Tip: speak close to your mic (near‑field) for best results. Please don’t upload sensitive or private audio—use test material only.
online_pipeline.py CHANGED
@@ -1,10 +1,10 @@
1
  import numpy as np
2
  import soxr
3
- from constants import DEFAULT_SR
4
-
5
- from stt_streamers import DeepgramStreamer, SonioxStreamer
6
  from sdk import SDKWrapper
7
- import gradio as gr
 
8
  # ----------------------------
9
  # Global transcript store (UI pulls from this)
10
  # ----------------------------
@@ -12,7 +12,6 @@ _ENHANCED_TRANSCRIPT: str = ""
12
  _RAW_TRANSCRIPT: str = ""
13
 
14
 
15
-
16
  def _set_transcript_enhanced(text: str) -> None:
17
  """Deepgram callback: update latest transcript text (no printing)."""
18
  global _ENHANCED_TRANSCRIPT
@@ -23,14 +22,8 @@ def _set_transcript_raw(text: str) -> None:
23
  global _RAW_TRANSCRIPT
24
  _RAW_TRANSCRIPT = text
25
 
26
- map_streamer_to_callback = {
27
- "Deepgram": DeepgramStreamer,
28
- "Soniox": SonioxStreamer,
29
- }
30
 
31
- # ----------------------------
32
- # Single global streamer (stays the same)
33
- # ----------------------------
34
  global Streamer_enhanced, Streamer_raw, SDK
35
 
36
  Streamer_enhanced = DeepgramStreamer(
@@ -44,82 +37,88 @@ Streamer_raw = DeepgramStreamer(
44
  on_update=_set_transcript_raw,
45
  )
46
 
47
- ResampleStream = soxr.ResampleStream(48000, DEFAULT_SR,1,dtype='float32')
48
  SDK = SDKWrapper()
49
- SDK.init_processor(sample_rate=DEFAULT_SR, enhancement_level=1.0, allow_variable_frames=True, num_frames=800) # 100% enhancement for online demo
50
-
51
-
52
- # ----------------------------
53
- # Gradio stream handler
54
- # ----------------------------
55
- def transcribe_stream(stream_16k, new_chunk, enhancement_level):
56
- """
57
- stream_16k: np.ndarray | None (we store the running buffer in 16 kHz)
58
- new_chunk: None OR (sr:int, y:np.ndarray)
59
- returns: (stream_16k_state, enhanced_text, raw_text)
60
- """
61
- # Gradio can send None when stream ends / resets / no audio yet
62
- if new_chunk is None:
63
- return stream_16k, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
64
-
65
- sr, y = new_chunk
66
- if y is None:
67
- return stream_16k, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
68
-
 
 
 
 
 
 
 
 
 
 
 
69
  y = np.asarray(y)
70
- if y.size == 0:
71
- return stream_16k, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
72
-
73
- # Convert to mono if stereo: y can be (frames,) or (frames, channels)
74
  if y.ndim > 1:
75
  y = y.mean(axis=1)
76
-
77
- # Convert dtype correctly
78
  if y.dtype == np.int16:
79
- y = y.astype(np.float32) / 32768.0
80
  else:
81
  y = y.astype(np.float32)
 
82
 
83
- if sr != 16000:
84
- y_16k = soxr.resample(y, sr, 16000).astype(np.float32)
85
- else:
86
- y_16k = y
87
 
88
- # Save stream in 16 kHz
89
- stream_16k = y_16k if stream_16k is None else np.concatenate([stream_16k, y_16k])
 
90
 
91
- # Enhance at 16k
 
 
92
  SDK.change_enhancement_level(float(enhancement_level) / 100.0)
93
- enhanced_chunk_16k = SDK.process_chunk(y_16k)
94
- Streamer_enhanced.process_chunk(enhanced_chunk_16k)
95
- Streamer_raw.process_chunk(y_16k)
 
 
 
 
 
 
 
 
 
 
 
96
 
97
- return stream_16k, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
98
 
99
- def reset_streamers():
100
- global _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
101
 
 
102
  try:
103
- Streamer_enhanced.clear_text()
104
- Streamer_raw.clear_text()
105
- ResampleStream.clear()
 
 
106
  except Exception:
107
  pass
108
-
109
- _ENHANCED_TRANSCRIPT = ""
110
- _RAW_TRANSCRIPT = ""
111
-
112
- return None, "", ""
113
-
114
-
115
- def stop_streaming():
116
- reset_streamers()
117
  return None, None, "", ""
118
 
119
 
120
- def change_stt_model(model_name):
121
- StreamerCls = map_streamer_to_callback.get(model_name, DeepgramStreamer)
122
- print(StreamerCls)
123
  global Streamer_enhanced, Streamer_raw
124
  Streamer_enhanced = StreamerCls(
125
  fs_hz=DEFAULT_SR,
@@ -131,5 +130,4 @@ def change_stt_model(model_name):
131
  stream_name="raw",
132
  on_update=_set_transcript_raw,
133
  )
134
-
135
-
 
1
  import numpy as np
2
  import soxr
3
+ from constants import DEFAULT_SR, STREAM_EVERY, STREAMER_CLASSES
4
+ from stt_streamers import DeepgramStreamer
 
5
  from sdk import SDKWrapper
6
+ from dataclasses import dataclass
7
+
8
  # ----------------------------
9
  # Global transcript store (UI pulls from this)
10
  # ----------------------------
 
12
  _RAW_TRANSCRIPT: str = ""
13
 
14
 
 
15
  def _set_transcript_enhanced(text: str) -> None:
16
  """Deepgram callback: update latest transcript text (no printing)."""
17
  global _ENHANCED_TRANSCRIPT
 
22
  global _RAW_TRANSCRIPT
23
  _RAW_TRANSCRIPT = text
24
 
 
 
 
 
25
 
26
+
 
 
27
  global Streamer_enhanced, Streamer_raw, SDK
28
 
29
  Streamer_enhanced = DeepgramStreamer(
 
37
  on_update=_set_transcript_raw,
38
  )
39
 
 
40
  SDK = SDKWrapper()
41
+ SDK.init_processor(
42
+ sample_rate=DEFAULT_SR,
43
+ enhancement_level=1.0,
44
+ allow_variable_frames=False,
45
+ num_channels=1,
46
+ )
47
+
48
+
49
+
50
@dataclass
class EnhanceSession:
    """State for a chunked enhancement pass at the processor sample rate.

    NOTE(review): this class is not referenced anywhere in the visible
    module — confirm it is used elsewhere or remove it.
    """
    pending: np.ndarray  # 1D float32 @ processor sample rate
    sr: int  # sample rate of `pending`
    num_frames: int  # processor frame size per chunk
55
@dataclass
class StreamSession:
    """Per-connection state for the live microphone stream.

    Kept intentionally minimal: only what the stream handler actually needs.
    """
    resampler: soxr.ResampleStream | None  # None when input is already 16 kHz
    sr_in: int | None  # input sample rate the resampler was built for
    tail_16k: np.ndarray  # ring buffer (e.g. last 10 s) of 16 kHz audio
    tail_max: int  # max samples
62
+
63
def _get_or_init_session(session: StreamSession | None, sr_in: int) -> StreamSession:
    """Return the current session, or build a fresh one when the input rate changed.

    A soxr.ResampleStream (designed for chunked real-time resampling) is only
    created when the microphone rate is not already 16 kHz.
    """
    if session is not None and session.sr_in == sr_in:
        return session
    resampler = (
        None
        if sr_in == 16000
        else soxr.ResampleStream(sr_in, 16000, num_channels=1, dtype="float32")
    )
    return StreamSession(
        resampler=resampler,
        sr_in=sr_in,
        tail_16k=np.zeros((0,), dtype=np.float32),
        tail_max=10 * 16000,
    )
69
+
70
+ def _to_float32_mono(y: np.ndarray) -> np.ndarray:
71
+ # Gradio liefert int16 (oder (samples, channels)). citeturn1view4
72
  y = np.asarray(y)
 
 
 
 
73
  if y.ndim > 1:
74
  y = y.mean(axis=1)
 
 
75
  if y.dtype == np.int16:
76
+ y = (y.astype(np.float32) / 32768.0)
77
  else:
78
  y = y.astype(np.float32)
79
+ return y
80
 
 
 
 
 
81
 
82
def transcribe_stream(session: StreamSession | None, new_chunk, enhancement_level):
    """Gradio streaming callback: enhance one mic chunk and feed both STT streamers.

    Args:
        session: per-stream StreamSession (None on the first chunk).
        new_chunk: (sample_rate, samples) tuple from gr.Audio, or None.
        enhancement_level: slider value in 0-100.

    Returns:
        (session, enhanced_transcript, raw_transcript)
    """
    if new_chunk is None or new_chunk[1] is None:
        return session, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT

    sr, samples = new_chunk
    samples = _to_float32_mono(samples)
    session = _get_or_init_session(session, sr)
    SDK.change_enhancement_level(float(enhancement_level) / 100.0)

    y_16k = samples if session.resampler is None else session.resampler.resample_chunk(samples)

    # Keep only a bounded tail of recent audio instead of concatenating forever.
    if y_16k.size > 0:
        combined = np.concatenate([session.tail_16k, y_16k])
        if combined.size > session.tail_max:
            combined = combined[-session.tail_max:]
        session.tail_16k = combined

    enhanced_chunk_16k = SDK.process_sync(y_16k)
    Streamer_enhanced.process_chunk(enhanced_chunk_16k.flatten())
    Streamer_raw.process_chunk(y_16k.flatten())

    return session, _ENHANCED_TRANSCRIPT, _RAW_TRANSCRIPT
106
 
 
 
107
 
108
def stop_streaming():
    """Best-effort shutdown of both global STT streamers.

    Returns cleared values for the audio widget, stream state, and both
    transcript textboxes: (None, None, "", "").
    """
    for streamer_name in ("Streamer_enhanced", "Streamer_raw"):
        try:
            # Module-level streamer may already be closed or never created.
            globals()[streamer_name].shutdown()
        except Exception:
            pass
    return None, None, "", ""
118
 
119
 
120
+ def set_stt_streamer(model_name):
121
+ StreamerCls = STREAMER_CLASSES.get(model_name, DeepgramStreamer)
 
122
  global Streamer_enhanced, Streamer_raw
123
  Streamer_enhanced = StreamerCls(
124
  fs_hz=DEFAULT_SR,
 
130
  stream_name="raw",
131
  on_update=_set_transcript_raw,
132
  )
133
+
 
sdk.py CHANGED
@@ -1,10 +1,4 @@
1
- # sdk_audio.py (or keep inline)
2
-
3
- from __future__ import annotations
4
-
5
  import numpy as np
6
- import librosa
7
- import soundfile as sf
8
  from dotenv import load_dotenv
9
  import aic_sdk as aic
10
  import os
@@ -22,16 +16,21 @@ class SDKWrapper:
22
  model_path = aic.Model.download(model_id, models_dir)
23
  self.model = aic.Model.from_file(model_path)
24
 
25
- def init_processor(self, sample_rate: int, enhancement_level: float, allow_variable_frames: bool = False, num_frames: int | None = None):
 
26
  self.processor_sample_rate = sample_rate
27
- self.processor_optimal_frames = self.model.get_optimal_num_frames(sample_rate)
 
28
  config = aic.ProcessorConfig(
29
  sample_rate=sample_rate,
30
- num_channels=1,
31
- num_frames=self.processor_optimal_frames if num_frames is None else num_frames,
32
  allow_variable_frames=allow_variable_frames,
33
  )
34
- processor = aic.Processor(self.model, self.sdk_key, config)
 
 
 
35
  processor.get_processor_context().set_parameter(
36
  aic.ProcessorParameter.EnhancementLevel, float(enhancement_level)
37
  )
@@ -44,6 +43,13 @@ class SDKWrapper:
44
  aic.ProcessorParameter.EnhancementLevel, float(enhancement_level)
45
  )
46
 
 
 
 
 
 
 
 
47
  def process_sync(
48
  self,
49
  audio: np.ndarray,
@@ -51,12 +57,9 @@ class SDKWrapper:
51
  """
52
  audio_array: 2D NumPy array with shape (num_channels, samples) containing audio data to be enhanced
53
  """
54
- if len(audio.shape) == 1:
55
- audio = audio.reshape(1, -1)
56
- if audio.shape[0] > 2 or len(audio.shape) != 2:
57
- raise ValueError("Expected audio with shape (n, frames)")
58
  out = np.zeros_like(audio)
59
- chunk_size = self.processor_optimal_frames
60
  n = audio.shape[1]
61
  for i in range(0, n, chunk_size):
62
  chunk = audio[:, i : i + chunk_size]
@@ -71,18 +74,8 @@ class SDKWrapper:
71
  out[:, i : i + chunk_size] = enhanced[:, :chunk_size]
72
  return out
73
 
74
- def process_chunk(
75
- self,
76
- chunk: np.ndarray,
77
- ) -> np.ndarray:
78
- """
79
- Realtime processing: process a single chunk of audio and return enhanced chunk.
80
- """
81
- if not hasattr(self, "processor"):
82
- raise ValueError("Processor not initialized")
83
- chunk = np.asarray(chunk, dtype=np.float32).flatten()
84
- if chunk.size == 0:
85
- return chunk
86
- chunk_planar = chunk.reshape(1, -1)
87
- enhanced_planar = self.processor.process(chunk_planar)
88
- return enhanced_planar.flatten()
 
 
 
 
 
1
  import numpy as np
 
 
2
  from dotenv import load_dotenv
3
  import aic_sdk as aic
4
  import os
 
16
  model_path = aic.Model.download(model_id, models_dir)
17
  self.model = aic.Model.from_file(model_path)
18
 
19
+
20
+ def init_processor(self, sample_rate: int, enhancement_level: float, allow_variable_frames: bool = False, num_frames: int | None = None,num_channels: int = 1, sync: bool = True):
21
  self.processor_sample_rate = sample_rate
22
+ processor_optimal_frames = self.model.get_optimal_num_frames(sample_rate)
23
+ self.num_frames = num_frames if num_frames else processor_optimal_frames
24
  config = aic.ProcessorConfig(
25
  sample_rate=sample_rate,
26
+ num_channels=num_channels,
27
+ num_frames=self.num_frames,
28
  allow_variable_frames=allow_variable_frames,
29
  )
30
+ if sync:
31
+ processor = aic.Processor(self.model, self.sdk_key, config)
32
+ else:
33
+ processor = aic.ProcessorAsync(self.model, self.sdk_key, config)
34
  processor.get_processor_context().set_parameter(
35
  aic.ProcessorParameter.EnhancementLevel, float(enhancement_level)
36
  )
 
43
  aic.ProcessorParameter.EnhancementLevel, float(enhancement_level)
44
  )
45
 
46
+ def _check_shape(self, audio: np.ndarray) -> np.ndarray:
47
+ if len(audio.shape) == 1:
48
+ audio = audio.reshape(1, -1)
49
+ if audio.shape[0] > 2 or len(audio.shape) != 2:
50
+ raise ValueError("Expected audio with shape (n, frames)")
51
+ return audio
52
+
53
  def process_sync(
54
  self,
55
  audio: np.ndarray,
 
57
  """
58
  audio_array: 2D NumPy array with shape (num_channels, samples) containing audio data to be enhanced
59
  """
60
+ audio = self._check_shape(audio)
 
 
 
61
  out = np.zeros_like(audio)
62
+ chunk_size = self.num_frames
63
  n = audio.shape[1]
64
  for i in range(0, n, chunk_size):
65
  chunk = audio[:, i : i + chunk_size]
 
74
  out[:, i : i + chunk_size] = enhanced[:, :chunk_size]
75
  return out
76
 
77
def process_chunk(self, audio: np.ndarray) -> np.ndarray:
    """Enhance one realtime chunk; accepts 1D or planar (channels, frames) input."""
    planar = self._check_shape(audio)
    return self.processor.process(planar)
81
+
 
 
 
 
 
 
 
 
 
 
stt_streamers/deepgram_streamer.py CHANGED
@@ -1,6 +1,7 @@
1
  import json
2
  import os
3
  import threading
 
4
  import urllib.parse
5
  import numpy as np
6
  from websockets.sync.client import connect
@@ -33,10 +34,16 @@ class DeepgramStreamer:
33
  # Deepgram requires the API key in the headers
34
  headers = {"Authorization": f"Token {api_key}"}
35
  self.ws = connect(url_with_params, additional_headers=headers)
36
-
 
 
 
37
  # 3. Start the receiving thread
38
  self.thread = threading.Thread(target=self._receive_loop, daemon=True)
39
  self.thread.start()
 
 
 
40
 
41
  def stream_array(self, pcm: np.ndarray) -> str:
42
  """
@@ -86,16 +93,16 @@ class DeepgramStreamer:
86
  }
87
 
88
  def process_chunk(self, chunk: np.ndarray) -> None:
89
- """
90
- Converts float32 numpy array to int16 bytes and sends to WebSocket.
91
- """
92
  chunk = np.clip(chunk, -1.0, 1.0)
93
  chunk_int16 = (chunk * 32767).astype(np.int16)
94
- if len(chunk_int16) > 0:
95
- try:
 
 
96
  self.ws.send(chunk_int16.tobytes())
97
- except Exception:
98
- pass
 
99
 
100
  def render_tokens(
101
  self, final_tokens: list[dict], non_final_tokens: list[dict]
@@ -170,18 +177,41 @@ class DeepgramStreamer:
170
  print(f"Deepgram receive loop error: {e}")
171
  finally:
172
  self.finished_event.set()
173
-
174
- def close(self) -> str:
175
- """
176
- Sends the specific JSON message Deepgram expects to close the stream.
177
- """
178
- if hasattr(self, "ws"):
179
- try:
180
- # Deepgram V1 expects this specific JSON to close the stream
 
 
 
 
 
 
 
 
 
 
181
  self.ws.send(json.dumps({"type": "CloseStream"}))
182
- except Exception:
183
- pass
184
- return self.render_tokens(self.final_tokens, [])
 
 
 
 
 
 
 
 
 
 
 
 
 
185
 
186
  def _ensure_closed(self) -> None:
187
  """
 
1
  import json
2
  import os
3
  import threading
4
+ import time
5
  import urllib.parse
6
  import numpy as np
7
  from websockets.sync.client import connect
 
34
  # Deepgram requires the API key in the headers
35
  headers = {"Authorization": f"Token {api_key}"}
36
  self.ws = connect(url_with_params, additional_headers=headers)
37
+ self._send_lock = threading.Lock()
38
+ self._stop_evt = threading.Event()
39
+ self._last_send_ts = time.monotonic()
40
+
41
  # 3. Start the receiving thread
42
  self.thread = threading.Thread(target=self._receive_loop, daemon=True)
43
  self.thread.start()
44
+
45
+ self.keepalive_thread = threading.Thread(target=self._keepalive_loop, daemon=True)
46
+ self.keepalive_thread.start()
47
 
48
  def stream_array(self, pcm: np.ndarray) -> str:
49
  """
 
93
  }
94
 
95
def process_chunk(self, chunk: np.ndarray) -> None:
    """Encode a float32 chunk as int16 PCM and push it over the websocket."""
    pcm = (np.clip(chunk, -1.0, 1.0) * 32767).astype(np.int16)
    if len(pcm) == 0:
        return
    try:
        with self._send_lock:
            self.ws.send(pcm.tobytes())
            self._last_send_ts = time.monotonic()
    except Exception as e:
        print(f"[{self.stream_name}] send failed: {e}")
106
 
107
  def render_tokens(
108
  self, final_tokens: list[dict], non_final_tokens: list[dict]
 
177
  print(f"Deepgram receive loop error: {e}")
178
  finally:
179
  self.finished_event.set()
180
+
181
+ def _keepalive_loop(self):
182
+ # Deepgram: KeepAlive als Text-Message senden citeturn4search25
183
+ while not self._stop_evt.is_set():
184
+ time.sleep(0.5)
185
+ if time.monotonic() - self._last_send_ts >= 3.0:
186
+ try:
187
+ with self._send_lock:
188
+ self.ws.send(json.dumps({"type": "KeepAlive"}))
189
+ self._last_send_ts = time.monotonic()
190
+ except Exception:
191
+ # bei Fehler: loop endet, send wird später reconnecten können
192
+ return
193
+
194
def close(self) -> None:
    """Send Deepgram's CloseStream control message to finalize the stream."""
    message = json.dumps({"type": "CloseStream"})
    try:
        with self._send_lock:
            self.ws.send(message)
    except Exception:
        # Socket may already be gone; closing is best-effort.
        pass
201
+
202
def shutdown(self) -> None:
    """Stop the keepalive loop, close the websocket, and join worker threads."""
    self._stop_evt.set()
    self.close()
    try:
        self.ws.close()
    except Exception:
        pass
    # Receive and keepalive threads are daemonic; give each a short join.
    for attr in ("thread", "keepalive_thread"):
        worker = getattr(self, attr, None)
        if worker is not None and worker.is_alive():
            worker.join(timeout=1.0)
213
+
214
+
215
 
216
  def _ensure_closed(self) -> None:
217
  """