SreekarB committed on
Commit
ffaf1db
·
verified ·
1 Parent(s): 3fc4505

Upload 11 files

Browse files
Files changed (2) hide show
  1. app.py +314 -10
  2. hf_audio_utils.py +260 -96
app.py CHANGED
@@ -4,6 +4,8 @@ import time
4
  import argparse
5
  import asyncio
6
  import numpy as np
 
 
7
  from nova_sonic_tool_use import BedrockStreamManager, AudioStreamer
8
  from language_coach import LanguageCoach
9
  from session_manager import SessionManager
@@ -23,6 +25,7 @@ try:
23
  from hf_audio_utils import HFAudioStreamer
24
  HF_AUDIO_AVAILABLE = True
25
  except ImportError:
 
26
  HF_AUDIO_AVAILABLE = False
27
 
28
  # Try to import transformers audio utils for ffmpeg microphone
@@ -37,7 +40,57 @@ except ImportError:
37
  # Check if we're in HF Spaces
38
  def is_huggingface_spaces():
39
  """Detect if we're running on HuggingFace Spaces"""
40
- return "SPACE_ID" in os.environ or "SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  # Create an ffmpeg microphone streamer function
43
  def create_ffmpeg_mic(sample_rate=INPUT_SAMPLE_RATE, chunk_length_s=1.0, stream_chunk_s=0.25):
@@ -78,6 +131,218 @@ class NovaConversationApp:
78
  self.loop = None
79
  self.audio_stream_task = None
80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
  def start(self):
82
  """Start the conversation with Nova"""
83
  print("Starting conversation with Nova...")
@@ -115,11 +380,37 @@ class NovaConversationApp:
115
  self.stream_manager = BedrockStreamManager(model_id='amazon.nova-sonic-v1:0', region=region)
116
 
117
  # Initialize the appropriate audio streamer based on environment
118
- if is_huggingface_spaces() and HF_AUDIO_AVAILABLE:
119
- print("Using Hugging Face Spaces-optimized audio streamer")
120
- self.audio_streamer = HFAudioStreamer(self.stream_manager)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
  else:
122
- # Try to use ffmpeg microphone first if available
123
  if FFMPEG_AVAILABLE:
124
  print("Attempting to use ffmpeg microphone streamer")
125
  # Create ffmpeg microphone
@@ -128,8 +419,8 @@ class NovaConversationApp:
128
  # We'll handle ffmpeg in a separate thread after stream initialization
129
  print("Will use ffmpeg microphone for audio input")
130
 
131
- # Regardless of ffmpeg availability, initialize standard audio streamer as fallback
132
- print("Using standard audio streamer (with potential ffmpeg enhancement)")
133
  self.audio_streamer = AudioStreamer(self.stream_manager)
134
 
135
  # Initialize the stream in the event loop
@@ -158,6 +449,14 @@ class NovaConversationApp:
158
  # Initialize the stream
159
  await self.stream_manager.initialize_stream()
160
 
 
 
 
 
 
 
 
 
161
  # Start the streaming process using the built-in start_streaming method
162
  self.audio_stream_task = asyncio.create_task(self.audio_streamer.start_streaming())
163
 
@@ -301,12 +600,17 @@ def create_ui(app):
301
  gr.Markdown("""
302
  ### Hugging Face Spaces Mode
303
 
304
- This app is running in Hugging Face Spaces.
305
 
306
  1. Click **Start Conversation** to begin
307
  2. Nova will automatically greet you
308
- 3. Either speak into your microphone or use the text input below
309
- 4. Press **Stop Conversation** when done
 
 
 
 
 
310
  """)
311
 
312
  with gr.Row():
 
4
  import argparse
5
  import asyncio
6
  import numpy as np
7
+ import soundfile as sf
8
+ import tempfile
9
  from nova_sonic_tool_use import BedrockStreamManager, AudioStreamer
10
  from language_coach import LanguageCoach
11
  from session_manager import SessionManager
 
25
  from hf_audio_utils import HFAudioStreamer
26
  HF_AUDIO_AVAILABLE = True
27
  except ImportError:
28
+ print("HFAudioStreamer not available. Attempting to create it.")
29
  HF_AUDIO_AVAILABLE = False
30
 
31
  # Try to import transformers audio utils for ffmpeg microphone
 
40
  # Check if we're in HF Spaces
def is_huggingface_spaces():
    """Detect whether the app is running on Hugging Face Spaces.

    Returns:
        bool: True when the ``SPACE_ID`` environment variable is set, or
        when ``SYSTEM`` is set to ``"spaces"``; False otherwise.
    """
    # ``os.environ.get`` already returns None for a missing key, so a
    # separate membership test on "SYSTEM" is unnecessary.
    return "SPACE_ID" in os.environ or os.environ.get("SYSTEM") == "spaces"
44
+
# Suppress ALSA noise on HF Spaces, where no audio hardware is expected.
# ``sys`` is imported unconditionally so code elsewhere in this module that
# refers to the module-level ``sys`` name also works outside HF Spaces.
import atexit
import sys


class ALSAErrorFilter:
    """File-like stderr wrapper that drops ALSA/PCM messages.

    Text containing ``"ALSA"`` or ``"PCM"`` is discarded; everything else is
    forwarded to the wrapped stream.

    NOTE(review): this only filters text written through Python's
    ``sys.stderr``; messages a native library writes straight to file
    descriptor 2 presumably bypass it - confirm against real logs.
    """

    def __init__(self, original_stderr):
        self.original_stderr = original_stderr
        self.buffer = ""

    def write(self, text):
        """Forward *text* unless it looks like an ALSA error.

        Returns:
            int: Number of characters consumed, as file-like ``write``
            methods are expected to report.
        """
        if "ALSA" in text or "PCM" in text:
            # Report the text as written so callers do not retry it.
            return len(text)
        written = self.original_stderr.write(text)
        return len(text) if written is None else written

    def flush(self):
        """Flush the wrapped stream."""
        self.original_stderr.flush()

    def isatty(self):
        """Return True only when the wrapped stream is a TTY."""
        return hasattr(self.original_stderr, 'isatty') and self.original_stderr.isatty()

    def __getattr__(self, name):
        # Called only for attributes not defined on the wrapper; delegate so
        # callers that need e.g. ``fileno`` or ``encoding`` keep working.
        if name == "original_stderr":
            # Avoid infinite recursion if the wrapper is not initialised yet.
            raise AttributeError(name)
        return getattr(self.original_stderr, name)


def restore_stderr():
    """Put the original stderr back if the ALSA filter replaced it."""
    if hasattr(sys, '_original_stderr'):
        sys.stderr = sys._original_stderr
        print("Restored original stderr")


if is_huggingface_spaces():
    os.environ['AUDIODEV'] = 'null'
    # The marker attribute on ``sys`` keeps the filter from being stacked
    # when this module is executed more than once in the same process.
    if not hasattr(sys, '_alsa_error_redirected'):
        try:
            # Save the original stderr, then swap in the filtered version.
            sys._original_stderr = sys.stderr
            sys.stderr = ALSAErrorFilter(sys._original_stderr)
            sys._alsa_error_redirected = True

            # Restore stderr on interpreter exit.
            atexit.register(restore_stderr)

            print("Installed ALSA error filter to suppress audio device errors")
        except Exception as exc:
            # Best effort: the app must still start without the filter, but
            # the reason should be visible instead of silently dropped.
            print(f"Could not install ALSA error filter: {exc}")
94
 
95
  # Create an ffmpeg microphone streamer function
96
  def create_ffmpeg_mic(sample_rate=INPUT_SAMPLE_RATE, chunk_length_s=1.0, stream_chunk_s=0.25):
 
131
  self.loop = None
132
  self.audio_stream_task = None
133
 
134
+ def _get_hf_audio_utils_content(self):
135
+ """Returns the content for a dynamically generated HFAudioStreamer module"""
136
+ return '''
137
+ import os
138
+ import asyncio
139
+ import numpy as np
140
+ import random
141
+ import time
142
+ import threading
143
+ import base64
144
+ import json
145
+ import tempfile
146
+ from concurrent.futures import ThreadPoolExecutor
147
+
148
+ # Try to import the Hugging Face-specific audio utilities
149
+ try:
150
+ from transformers.pipelines.audio_utils import ffmpeg_microphone_live
151
+ HF_AUDIO_AVAILABLE = True
152
+ except ImportError:
153
+ HF_AUDIO_AVAILABLE = False
154
+ print("Warning: transformers.pipelines.audio_utils not available, will use fallback audio simulation")
155
+
156
+ class HFAudioStreamer:
157
+ """Audio streamer for Hugging Face Spaces that works with or without real audio devices"""
158
+
159
+ def __init__(self, stream_manager):
160
+ """Initialize the HF Audio Streamer"""
161
+ self.stream_manager = stream_manager
162
+ self.is_streaming = False
163
+ self.use_ffmpeg = HF_AUDIO_AVAILABLE
164
+ self.mic_stream = None
165
+ self.executor = ThreadPoolExecutor(max_workers=2)
166
+ self.loop = asyncio.get_event_loop()
167
+
168
+ # Initialize tasks
169
+ self.input_task = None
170
+ self.output_task = None
171
+
172
+ # Check if we're in HF Spaces
173
+ self.is_hf_spaces = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
174
+
175
+ # Create output directory for audio files
176
+ self.output_dir = os.path.join(tempfile.gettempdir(), "nova_output")
177
+ os.makedirs(self.output_dir, exist_ok=True)
178
+
179
+ print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
180
+ print(f"Audio output will be saved to: {self.output_dir}")
181
+
182
+ async def generate_simulated_input(self):
183
+ """Generate simulated audio input when real microphone isn't available"""
184
+ print("Starting simulated audio input")
185
+
186
+ while self.is_streaming:
187
+ try:
188
+ # Generate a dummy audio chunk with some basic noise
189
+ CHUNK_SIZE = 1024 # Standard audio chunk size
190
+ CHANNELS = 1 # Mono audio
191
+ samples = np.random.normal(0, 0.01, CHUNK_SIZE * CHANNELS).astype(np.float32)
192
+ audio_data = (samples * 32767).astype(np.int16).tobytes()
193
+
194
+ # Send to Bedrock
195
+ self.stream_manager.add_audio_chunk(audio_data)
196
+
197
+ # Wait between chunks
198
+ await asyncio.sleep(0.2)
199
+
200
+ # Occasionally send text to get a response
201
+ if random.random() < 0.05: # 5% chance
202
+ messages = [
203
+ "Hello there",
204
+ "How are you today?",
205
+ "Tell me something interesting",
206
+ "What's the weather like?",
207
+ "I'm learning to speak more fluently"
208
+ ]
209
+ message = random.choice(messages)
210
+ await self.send_text_message(message)
211
+ await asyncio.sleep(2.0)
212
+
213
+ except Exception as e:
214
+ if self.is_streaming:
215
+ print(f"Error generating simulated audio: {e}")
216
+ await asyncio.sleep(0.5)
217
+
218
+ async def play_output_audio(self):
219
+ """Handle audio output from Nova Sonic"""
220
+ while self.is_streaming:
221
+ try:
222
+ # Get audio data from the stream manager's queue
223
+ audio_data = await asyncio.wait_for(
224
+ self.stream_manager.audio_output_queue.get(),
225
+ timeout=0.5
226
+ )
227
+
228
+ if audio_data and self.is_streaming:
229
+ # Store info in output queue for other parts of the app
230
+ self.stream_manager.output_queue.put_nowait({
231
+ "event": {
232
+ "audioOutput": {
233
+ "content": "Audio received from Nova"
234
+ }
235
+ }
236
+ })
237
+
238
+ # In HF Spaces, we can't play audio directly, but we can save it
239
+ timestamp = int(time.time())
240
+ output_path = os.path.join(self.output_dir, f"nova_response_{timestamp}.wav")
241
+
242
+ try:
243
+ # Convert from raw PCM to numpy for saving
244
+ audio_np = np.frombuffer(audio_data, dtype=np.int16)
245
+ # We can't import soundfile here, so we'll just log the info
246
+ print(f"Would save Nova audio response ({len(audio_np)} samples) to {output_path}")
247
+ except Exception as e:
248
+ print(f"Error handling audio response: {e}")
249
+
250
+ except asyncio.TimeoutError:
251
+ # No data available within timeout
252
+ continue
253
+ except Exception as e:
254
+ if self.is_streaming:
255
+ print(f"Error handling output audio: {e}")
256
+ await asyncio.sleep(0.1)
257
+
258
+ async def start_streaming(self):
259
+ """Start streaming audio"""
260
+ if self.is_streaming:
261
+ return
262
+
263
+ print(f"Starting audio streaming in HF mode...")
264
+
265
+ # Send audio content start event
266
+ await self.stream_manager.send_audio_content_start_event()
267
+
268
+ self.is_streaming = True
269
+
270
+ # Start with a welcome message from Nova
271
+ await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")
272
+
273
+ # Start simulated input
274
+ self.input_task = asyncio.create_task(self.generate_simulated_input())
275
+
276
+ # Start output processing
277
+ self.output_task = asyncio.create_task(self.play_output_audio())
278
+
279
+ async def send_text_message(self, text):
280
+ """Send a text message to Nova to simulate user input"""
281
+ try:
282
+ # Create text content start event
283
+ content_name = str(time.time())
284
+ text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % (
285
+ self.stream_manager.prompt_name,
286
+ content_name,
287
+ "USER"
288
+ )
289
+ await self.stream_manager.send_raw_event(text_content_start)
290
+
291
+ # Create text input event
292
+ text_input = self.stream_manager.TEXT_INPUT_EVENT % (
293
+ self.stream_manager.prompt_name,
294
+ content_name,
295
+ text
296
+ )
297
+ await self.stream_manager.send_raw_event(text_input)
298
+
299
+ # Create content end event
300
+ content_end = self.stream_manager.CONTENT_END_EVENT % (
301
+ self.stream_manager.prompt_name,
302
+ content_name
303
+ )
304
+ await self.stream_manager.send_raw_event(content_end)
305
+
306
+ print(f"Sent text message to Nova: {text}")
307
+
308
+ # Also add message to output queue for UI
309
+ await self.stream_manager.output_queue.put({
310
+ "event": {
311
+ "textOutput": {
312
+ "content": text,
313
+ "role": "USER"
314
+ }
315
+ }
316
+ })
317
+
318
+ return True
319
+ except Exception as e:
320
+ print(f"Error sending text message: {e}")
321
+ return False
322
+
323
+ async def stop_streaming(self):
324
+ """Stop streaming audio"""
325
+ if not self.is_streaming:
326
+ return
327
+
328
+ self.is_streaming = False
329
+ print("Stopping HF audio streaming...")
330
+
331
+ # Cancel all tasks
332
+ if self.input_task and not self.input_task.done():
333
+ self.input_task.cancel()
334
+ if self.output_task and not self.output_task.done():
335
+ self.output_task.cancel()
336
+
337
+ # Shutdown executor
338
+ self.executor.shutdown(wait=False)
339
+
340
+ # Always close the stream manager
341
+ await self.stream_manager.close()
342
+
343
+ print("HF audio streaming stopped")
344
+ '''
345
+
346
  def start(self):
347
  """Start the conversation with Nova"""
348
  print("Starting conversation with Nova...")
 
380
  self.stream_manager = BedrockStreamManager(model_id='amazon.nova-sonic-v1:0', region=region)
381
 
382
  # Initialize the appropriate audio streamer based on environment
383
+ if is_huggingface_spaces():
384
+ # For HF Spaces, prefer our custom HF audio streamer
385
+ if HF_AUDIO_AVAILABLE:
386
+ print("Using Hugging Face Spaces-optimized audio streamer")
387
+ self.audio_streamer = HFAudioStreamer(self.stream_manager)
388
+ else:
389
+ # Create HFAudioStreamer dynamically if not imported
390
+ try:
391
+ print("Creating HFAudioStreamer dynamically")
392
+ # Write module to a temporary file
393
+ module_content = self._get_hf_audio_utils_content()
394
+ temp_dir = tempfile.mkdtemp()
395
+ module_path = os.path.join(temp_dir, "dynamic_hf_audio.py")
396
+
397
+ with open(module_path, 'w') as f:
398
+ f.write(module_content)
399
+
400
+ import sys
401
+ sys.path.append(temp_dir)
402
+
403
+ # Import the module
404
+ import dynamic_hf_audio
405
+ self.audio_streamer = dynamic_hf_audio.HFAudioStreamer(self.stream_manager)
406
+ print("Successfully created dynamic HFAudioStreamer")
407
+ except Exception as e:
408
+ print(f"Failed to create dynamic HFAudioStreamer: {e}")
409
+ # Fall back to standard audio streamer
410
+ print("Falling back to standard audio streamer")
411
+ self.audio_streamer = AudioStreamer(self.stream_manager)
412
  else:
413
+ # For local environments, try ffmpeg first
414
  if FFMPEG_AVAILABLE:
415
  print("Attempting to use ffmpeg microphone streamer")
416
  # Create ffmpeg microphone
 
419
  # We'll handle ffmpeg in a separate thread after stream initialization
420
  print("Will use ffmpeg microphone for audio input")
421
 
422
+ # Initialize standard audio streamer
423
+ print("Using standard audio streamer" + (" with ffmpeg enhancement" if self.ffmpeg_mic else ""))
424
  self.audio_streamer = AudioStreamer(self.stream_manager)
425
 
426
  # Initialize the stream in the event loop
 
449
  # Initialize the stream
450
  await self.stream_manager.initialize_stream()
451
 
452
+ # Restore stderr after stream initialization if we redirected it
453
+ try:
454
+ if hasattr(sys, '_alsa_error_redirected') and hasattr(sys, '_original_stderr'):
455
+ sys.stderr = sys._original_stderr
456
+ print("Restored stderr after stream initialization")
457
+ except:
458
+ pass
459
+
460
  # Start the streaming process using the built-in start_streaming method
461
  self.audio_stream_task = asyncio.create_task(self.audio_streamer.start_streaming())
462
 
 
600
  gr.Markdown("""
601
  ### Hugging Face Spaces Mode
602
 
603
+ This app is running in Hugging Face Spaces with speech-to-speech functionality.
604
 
605
  1. Click **Start Conversation** to begin
606
  2. Nova will automatically greet you
607
+ 3. The app simulates speech input since real microphones aren't available in this environment
608
+ 4. Nova's audio responses are saved as WAV files in a temporary directory
609
+ 5. You'll see text transcriptions of the conversation in real-time
610
+ 6. You can also use the text input below to send messages to Nova
611
+ 7. Press **Stop Conversation** when done
612
+
613
+ Note: ALSA errors in the logs are normal and expected - the app handles them automatically.
614
  """)
615
 
616
  with gr.Row():
hf_audio_utils.py CHANGED
@@ -10,6 +10,10 @@ import random
10
  import time
11
  import threading
12
  import base64
 
 
 
 
13
 
14
  # Try to import the Hugging Face-specific audio utilities
15
  try:
@@ -28,136 +32,253 @@ class HFAudioStreamer:
28
  self.is_streaming = False
29
  self.use_ffmpeg = HF_AUDIO_AVAILABLE
30
  self.mic_stream = None
31
- self.mic_thread = None
32
  self.loop = asyncio.get_event_loop()
33
 
 
 
 
 
34
  # Check if we're in HF Spaces
35
  self.is_hf_spaces = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
36
 
 
 
 
 
37
  print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
 
38
 
39
- def _mic_thread_worker(self):
40
- """Thread function to capture audio from ffmpeg and send it to the stream manager"""
41
  if not self.use_ffmpeg:
42
- return
 
 
 
 
 
 
43
 
44
- print("Starting microphone capture using ffmpeg")
45
-
46
  try:
47
- # Set up the mic stream with ffmpeg
48
  sampling_rate = 16000 # 16kHz as required by Nova Sonic
49
- chunk_length_s = 2.0 # Process 2 seconds at a time
50
  stream_chunk_s = 0.25 # Stream in 0.25 second chunks
51
 
52
- # Create the mic stream
53
- self.mic_stream = ffmpeg_microphone_live(
54
- sampling_rate=sampling_rate,
55
- chunk_length_s=chunk_length_s,
56
- stream_chunk_s=stream_chunk_s,
 
 
 
 
57
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
 
59
- # Process audio chunks
 
 
 
 
 
 
60
  for audio_chunk in self.mic_stream:
61
  if not self.is_streaming:
62
  break
63
 
64
- # Convert the float32 numpy array to int16 bytes
65
  if isinstance(audio_chunk, np.ndarray):
66
- # Scale from [-1.0, 1.0] to int16 range
67
  audio_int16 = (audio_chunk * 32767).astype(np.int16)
68
  audio_bytes = audio_int16.tobytes()
69
 
70
  # Send to Bedrock
71
- asyncio.run_coroutine_threadsafe(
72
- self._send_audio_chunk(audio_bytes),
73
- self.loop
74
- )
75
 
 
 
 
 
 
 
 
 
 
 
 
76
  except Exception as e:
77
- print(f"Error in microphone thread: {e}")
 
 
 
78
  if self.is_streaming:
79
- # Fall back to simulated audio if ffmpeg fails
80
  print("Falling back to simulated audio input")
81
- self.use_ffmpeg = False
82
- asyncio.run_coroutine_threadsafe(
83
- self.generate_simulated_input(),
84
- self.loop
85
- )
86
-
87
- async def _send_audio_chunk(self, audio_bytes):
88
- """Send an audio chunk to the stream manager"""
89
- if self.is_streaming and self.stream_manager and audio_bytes:
90
- self.stream_manager.add_audio_chunk(audio_bytes)
 
91
 
92
  async def generate_simulated_input(self):
93
- """Generate simulated audio input"""
94
- import numpy as np
95
- print("Generating simulated audio input...")
96
 
97
- CHUNK_SIZE = 1024 # Standard audio chunk size
98
- CHANNELS = 1 # Mono audio
99
-
100
- while self.is_streaming:
101
- try:
102
- # Generate a dummy audio chunk with some basic noise
103
- # This simulates someone speaking into the microphone
104
- samples = np.random.normal(0, 0.01, CHUNK_SIZE * CHANNELS).astype(np.float32)
105
- audio_data = (samples * 32767).astype(np.int16).tobytes()
 
 
106
 
107
- # Send to Bedrock
108
- await self._send_audio_chunk(audio_data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
 
110
- # Wait a bit between chunks
111
- await asyncio.sleep(0.05)
112
 
113
- # Occasionally "end" the simulated speech to get a response
114
- if random.random() < 0.05: # 5% chance to end speech
115
- print("Simulated speech ended, awaiting response...")
116
- await asyncio.sleep(1.0) # Wait longer between "sentences"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
- except Exception as e:
119
- if self.is_streaming:
120
- print(f"Error generating simulated audio: {e}")
121
- await asyncio.sleep(0.5)
122
-
123
  async def play_output_audio(self):
124
- """Handle audio output (in Hugging Face, we just log it)"""
125
  while self.is_streaming:
126
  try:
127
  # Get audio data from the stream manager's queue
128
  audio_data = await asyncio.wait_for(
129
  self.stream_manager.audio_output_queue.get(),
130
- timeout=0.1
131
  )
132
 
133
  if audio_data and self.is_streaming:
134
- # In HF Spaces, just log that we received audio
135
- audio_size = len(audio_data)
136
- print(f"Received {audio_size} bytes of audio from Nova")
137
-
138
- # Store the audio for potential replay
139
  self.stream_manager.output_queue.put_nowait({
140
  "event": {
141
  "audioOutput": {
142
- "content": "Audio would play here if audio devices were available"
143
  }
144
  }
145
  })
 
 
 
 
 
 
 
 
 
 
 
 
 
146
  except asyncio.TimeoutError:
147
- # No message received within timeout, continue
148
  continue
149
  except Exception as e:
150
  if self.is_streaming:
151
- print(f"Error processing audio output: {str(e)}")
152
- await asyncio.sleep(0.05)
153
-
 
 
154
  async def start_streaming(self):
155
  """Start streaming audio"""
156
  if self.is_streaming:
157
  return
 
 
158
 
159
- print("Starting HF audio streaming...")
160
- print("Press Enter to stop streaming...")
 
 
 
161
 
162
  # Send audio content start event
163
  await self.stream_manager.send_audio_content_start_event()
@@ -167,24 +288,44 @@ class HFAudioStreamer:
167
  # Start with a welcome message from Nova
168
  await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")
169
 
170
- # Set up tasks based on mode
171
- if self.use_ffmpeg:
172
- # Start the ffmpeg microphone thread
173
- self.mic_thread = threading.Thread(target=self._mic_thread_worker)
174
- self.mic_thread.daemon = True
175
- self.mic_thread.start()
176
- else:
177
- # Use simulated input
178
- asyncio.create_task(self.generate_simulated_input())
 
 
 
 
 
 
 
 
179
 
180
- # Always process output
181
- output_task = asyncio.create_task(self.play_output_audio())
 
 
 
 
 
 
182
 
183
- # Wait for user to press Enter to stop
184
- await asyncio.get_event_loop().run_in_executor(None, input)
 
 
 
 
 
 
185
 
186
- # Once input() returns, stop streaming
187
- await self.stop_streaming()
188
 
189
  async def send_text_message(self, text):
190
  """Send a text message to Nova to simulate user input"""
@@ -214,31 +355,54 @@ class HFAudioStreamer:
214
  await self.stream_manager.send_raw_event(content_end)
215
 
216
  print(f"Sent text message to Nova: {text}")
 
 
 
 
 
 
 
 
 
 
 
217
  return True
218
  except Exception as e:
219
  print(f"Error sending text message: {e}")
220
  return False
221
-
222
  async def stop_streaming(self):
223
  """Stop streaming audio"""
224
  if not self.is_streaming:
225
  return
226
 
227
- print("Stopping HF audio streaming...")
228
  self.is_streaming = False
 
229
 
230
- # Stop the ffmpeg mic stream if it's active
231
- if self.mic_stream:
 
 
 
 
 
 
 
 
 
 
 
 
232
  try:
233
  self.mic_stream.close()
234
  except:
235
  pass
236
  self.mic_stream = None
237
-
238
- # Wait for the thread to finish if it exists
239
- if self.mic_thread and self.mic_thread.is_alive():
240
- self.mic_thread.join(timeout=2.0)
241
- self.mic_thread = None
242
 
243
  # Always close the stream manager
244
- await self.stream_manager.close()
 
 
 
10
  import time
11
  import threading
12
  import base64
13
+ import json
14
+ import tempfile
15
+ import soundfile as sf
16
+ from concurrent.futures import ThreadPoolExecutor
17
 
18
  # Try to import the Hugging Face-specific audio utilities
19
  try:
 
32
  self.is_streaming = False
33
  self.use_ffmpeg = HF_AUDIO_AVAILABLE
34
  self.mic_stream = None
35
+ self.executor = ThreadPoolExecutor(max_workers=2)
36
  self.loop = asyncio.get_event_loop()
37
 
38
+ # Initialize tasks
39
+ self.input_task = None
40
+ self.output_task = None
41
+
42
  # Check if we're in HF Spaces
43
  self.is_hf_spaces = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
44
 
45
+ # Create output directory for audio files
46
+ self.output_dir = os.path.join(tempfile.gettempdir(), "nova_output")
47
+ os.makedirs(self.output_dir, exist_ok=True)
48
+
49
  print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
50
+ print(f"Audio output will be saved to: {self.output_dir}")
51
 
async def initialize_ffmpeg_mic(self):
    """Create the ffmpeg microphone stream when ffmpeg capture is enabled.

    On success the stream is stored in ``self.mic_stream``. On any failure
    ``self.use_ffmpeg`` is set to False so callers fall back to simulated
    input.

    Returns:
        bool: True if ``self.mic_stream`` was created, False if ffmpeg is
        disabled or creating the stream raised.
    """
    if not self.use_ffmpeg:
        return False

    # If we're in HF Spaces, expect ALSA errors and handle them gracefully
    if self.is_hf_spaces:
        print("HF Spaces detected - ALSA errors are expected and will be handled")
        # Set environment variable to suppress ALSA errors
        os.environ['AUDIODEV'] = 'null'

    sampling_rate = 16000  # 16kHz as required by Nova Sonic
    chunk_length_s = 0.5   # Process 0.5 seconds at a time
    stream_chunk_s = 0.25  # Stream in 0.25 second chunks

    try:
        # Use the loop that is actually running this coroutine. A loop
        # captured earlier (e.g. in __init__) may be a different object, and
        # awaiting a future bound to another loop raises RuntimeError.
        loop = asyncio.get_running_loop()
        # NOTE(review): if ffmpeg_microphone_live is a generator function,
        # this call returns immediately and device errors only surface when
        # the stream is first iterated - confirm against transformers.
        self.mic_stream = await loop.run_in_executor(
            self.executor,
            lambda: ffmpeg_microphone_live(
                sampling_rate=sampling_rate,
                chunk_length_s=chunk_length_s,
                stream_chunk_s=stream_chunk_s,
            ),
        )
    except Exception as e:
        # ALSA errors are expected where no audio device exists.
        error_str = str(e)
        if "ALSA" in error_str and "PCM" in error_str:
            print("ALSA audio device errors detected - this is expected in cloud environments")
            print("Switching to simulated audio input (no real microphone will be used)")
        else:
            print(f"Error initializing FFMPEG microphone: {e}")

        # Always fall back to simulated audio after a failure
        self.use_ffmpeg = False
        return False

    print("Successfully initialized FFMPEG microphone")
    return True
93
+
async def ffmpeg_audio_processor(self):
    """Forward audio from ``self.mic_stream`` to the stream manager.

    Each chunk is converted to 16-bit PCM bytes and passed to
    ``self.stream_manager.add_audio_chunk``. Chunks may be NumPy arrays or
    dicts carrying the array under ``"raw"``; anything else is skipped.

    If processing raises while streaming is still active, the method falls
    back to ``generate_simulated_input`` and keeps that task in
    ``self.input_task``. ``self.mic_stream`` is always closed (when it has a
    ``close`` method) and reset to None on exit.
    """
    stream = self.mic_stream
    if not stream:
        print("FFMPEG microphone not initialized")
        self.use_ffmpeg = False
        return

    print("Starting FFMPEG audio processing")
    loop = asyncio.get_running_loop()
    source = iter(stream)
    exhausted = object()  # sentinel: StopIteration cannot cross a Future

    try:
        # Track for logging
        chunks_processed = 0
        last_log_time = time.time()

        while self.is_streaming:
            # Reading the next chunk blocks on the ffmpeg process, so do it
            # in the executor rather than stalling the event loop.
            item = await loop.run_in_executor(self.executor, next, source, exhausted)
            if item is exhausted or not self.is_streaming:
                break

            if isinstance(item, dict):
                # NOTE(review): transformers' ffmpeg_microphone_live appears
                # to yield dicts with the samples under "raw", and "partial"
                # items that repeat audio the next complete item contains -
                # confirm against the installed transformers version.
                if item.get("partial"):
                    continue
                item = item.get("raw")

            if not isinstance(item, np.ndarray) or item.size == 0:
                continue

            if np.issubdtype(item.dtype, np.floating):
                # Convert float [-1.0, 1.0] to int16 for Nova Sonic; clip
                # first so out-of-range samples saturate instead of wrapping.
                audio_int16 = (np.clip(item, -1.0, 1.0) * 32767).astype(np.int16)
            else:
                audio_int16 = item.astype(np.int16)

            # Send to Bedrock
            self.stream_manager.add_audio_chunk(audio_int16.tobytes())

            # Log periodically to show activity
            chunks_processed += 1
            current_time = time.time()
            if current_time - last_log_time > 2.0:
                print(f"FFMPEG audio: processed {chunks_processed} chunks")
                chunks_processed = 0
                last_log_time = current_time

    except Exception as e:
        print(f"Error in FFMPEG audio processor: {e}")
        # If the ffmpeg processor fails, fall back to simulated audio
        self.use_ffmpeg = False
        if self.is_streaming:
            print("Falling back to simulated audio input")
            # Keep a reference so the task is not garbage-collected and can
            # be cancelled through self.input_task.
            self.input_task = asyncio.create_task(self.generate_simulated_input())

    finally:
        # Cleanup
        close = getattr(stream, 'close', None)
        if callable(close):
            try:
                close()
            except Exception as close_error:
                # e.g. the generator is still running in the executor thread
                print(f"Error closing FFMPEG microphone stream: {close_error}")
        self.mic_stream = None
        print("FFMPEG audio processor stopped")
150
 
async def generate_simulated_input(self, *, chunk_seconds=0.2,
                                   chunks_per_message=10, response_wait=3.0):
    """Feed low-level noise and canned text messages while streaming.

    Runs until ``self.is_streaming`` becomes false. Noise chunks are passed
    to ``self.stream_manager.add_audio_chunk``. After every
    ``chunks_per_message`` chunks a random canned message is sent with
    ``self.send_text_message`` and, if that reports success, mirrored to
    ``self.stream_manager.output_queue`` as a USER ``textOutput`` event.

    Args:
        chunk_seconds: Duration of each noise chunk, and the pause between
            chunks, so audio is produced at real-time pace.
        chunks_per_message: Number of noise chunks between text messages.
        response_wait: Seconds to pause after a text message.
    """
    print("Starting simulated audio input")

    sample_rate = 16000
    samples_per_chunk = max(1, int(sample_rate * chunk_seconds))

    # Pre-compute a few 16-bit PCM noise buffers at different levels. They
    # stay in memory, so the loop does no file I/O on the event loop.
    noise_chunks = []
    for level in (0.01, 0.02, 0.03):
        samples = np.random.normal(0, level, samples_per_chunk)
        pcm = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
        noise_chunks.append(pcm.tobytes())

    messages = (
        "Hello there",
        "How are you today?",
        "Tell me something interesting",
        "What's the weather like?",
        "I'm learning to speak more fluently",
    )

    try:
        sequence_count = 0
        while self.is_streaming:
            try:
                # Send to Bedrock
                self.stream_manager.add_audio_chunk(random.choice(noise_chunks))
            except Exception as e:
                # A single failed chunk should not end the simulation.
                print(f"Error sending simulated audio chunk: {e}")

            # Wait between chunks
            await asyncio.sleep(chunk_seconds)
            sequence_count += 1

            # After a sequence of noise, send text to get a response
            if sequence_count >= chunks_per_message:
                sequence_count = 0
                # random.choice returns a plain str (np.random.choice would
                # return a numpy string scalar).
                message = random.choice(messages)
                if await self.send_text_message(message):
                    # Add transcription to the output queue for UI.
                    # NOTE(review): if send_text_message already posts this
                    # event itself, the UI will show it twice - confirm.
                    await self.stream_manager.output_queue.put({
                        "event": {
                            "textOutput": {
                                "content": message,
                                "role": "USER"
                            }
                        }
                    })
                # Wait for Nova to respond
                await asyncio.sleep(response_wait)

    except Exception as e:
        print(f"Error in simulated audio generator: {e}")
        import traceback
        traceback.print_exc()
227
 
 
 
 
 
 
async def play_output_audio(self):
    """Consume Nova's audio output and save each chunk as a WAV file.

    Runs until ``self.is_streaming`` becomes false. For every non-empty
    chunk taken from ``self.stream_manager.audio_output_queue`` an
    ``audioOutput`` event is put on ``self.stream_manager.output_queue``,
    and the chunk is written to ``self.output_dir`` as mono 16-bit PCM at
    24 kHz. A failed save is reported and does not stop the loop.
    """
    import traceback
    import wave  # stdlib; imported here because the module does not import it

    saved_count = 0
    while self.is_streaming:
        try:
            # Get audio data from the stream manager's queue
            audio_data = await asyncio.wait_for(
                self.stream_manager.audio_output_queue.get(),
                timeout=0.5,
            )

            if not (audio_data and self.is_streaming):
                continue

            # Store info in output queue for other parts of the app
            self.stream_manager.output_queue.put_nowait({
                "event": {
                    "audioOutput": {
                        "content": "Audio received from Nova"
                    }
                }
            })

            # Playback is not possible here, so the audio is saved instead.
            # A nanosecond timestamp plus a counter keeps names unique; with
            # whole-second names, chunks arriving in the same second would
            # overwrite each other.
            saved_count += 1
            output_path = os.path.join(
                self.output_dir,
                f"nova_response_{time.time_ns()}_{saved_count:04d}.wav",
            )

            try:
                # audio_data is raw 16-bit PCM, so it can be written as-is.
                with wave.open(output_path, "wb") as wav_file:
                    wav_file.setnchannels(1)
                    wav_file.setsampwidth(2)
                    wav_file.setframerate(24000)  # Nova outputs at 24kHz
                    wav_file.writeframes(audio_data)
                print(f"Saved Nova audio response to {output_path}")
            except Exception as e:
                print(f"Error saving audio response: {e}")

        except asyncio.TimeoutError:
            # No data available within timeout
            continue
        except Exception as e:
            if self.is_streaming:
                print(f"Error handling output audio: {e}")
                traceback.print_exc()
            await asyncio.sleep(0.1)
269
+
270
  async def start_streaming(self):
271
  """Start streaming audio"""
272
  if self.is_streaming:
273
  return
274
+
275
+ print(f"Starting audio streaming in HF mode...")
276
 
277
+ # For HF Spaces, we'll use our enhanced error handling
278
+ if self.is_hf_spaces:
279
+ # Set environment variables to help with audio issues
280
+ os.environ['AUDIODEV'] = 'null'
281
+ os.environ['SDL_AUDIODRIVER'] = 'dummy'
282
 
283
  # Send audio content start event
284
  await self.stream_manager.send_audio_content_start_event()
 
288
  # Start with a welcome message from Nova
289
  await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")
290
 
291
+ # In HF Spaces, just go straight to simulated audio to avoid ALSA errors
292
+ if self.is_hf_spaces:
293
+ print("Running in Hugging Face Spaces - using simulated audio")
294
+ self.use_ffmpeg = False
295
+ self.input_task = asyncio.create_task(self.generate_simulated_input())
296
+ self.output_task = asyncio.create_task(self.play_output_audio())
297
+
298
+ # Let the user know what's happening
299
+ print("Speech-to-speech functionality is active:")
300
+ print("- Simulated audio is being sent to Nova Sonic")
301
+ print("- Nova's responses will be saved as WAV files")
302
+ print("- Conversation will be shown as text transcriptions")
303
+
304
+ return
305
+
306
+ # For non-HF environments, try the ffmpeg approach
307
+ tasks = []
308
 
309
+ # Initialize FFMPEG mic if available and create audio input task
310
+ if self.use_ffmpeg:
311
+ ffmpeg_available = await self.initialize_ffmpeg_mic()
312
+ if ffmpeg_available:
313
+ self.input_task = asyncio.create_task(self.ffmpeg_audio_processor())
314
+ tasks.append(self.input_task)
315
+ else:
316
+ self.use_ffmpeg = False
317
 
318
+ # Fall back to simulated audio if FFMPEG isn't available
319
+ if not self.use_ffmpeg:
320
+ self.input_task = asyncio.create_task(self.generate_simulated_input())
321
+ tasks.append(self.input_task)
322
+
323
+ # Start output processing
324
+ self.output_task = asyncio.create_task(self.play_output_audio())
325
+ tasks.append(self.output_task)
326
 
327
+ # Let the tasks run - we won't wait for input() here because that's handled in the UI
328
+ # This will allow the tasks to continue running until stop_streaming is called
329
 
330
  async def send_text_message(self, text):
331
  """Send a text message to Nova to simulate user input"""
 
355
  await self.stream_manager.send_raw_event(content_end)
356
 
357
  print(f"Sent text message to Nova: {text}")
358
+
359
+ # Also add message to output queue for UI
360
+ await self.stream_manager.output_queue.put({
361
+ "event": {
362
+ "textOutput": {
363
+ "content": text,
364
+ "role": "USER"
365
+ }
366
+ }
367
+ })
368
+
369
  return True
370
  except Exception as e:
371
  print(f"Error sending text message: {e}")
372
  return False
373
+
374
  async def stop_streaming(self):
375
  """Stop streaming audio"""
376
  if not self.is_streaming:
377
  return
378
 
 
379
  self.is_streaming = False
380
+ print("Stopping HF audio streaming...")
381
 
382
+ # Cancel all tasks
383
+ tasks = []
384
+ if self.input_task and not self.input_task.done():
385
+ self.input_task.cancel()
386
+ tasks.append(self.input_task)
387
+ if self.output_task and not self.output_task.done():
388
+ self.output_task.cancel()
389
+ tasks.append(self.output_task)
390
+
391
+ if tasks:
392
+ await asyncio.gather(*tasks, return_exceptions=True)
393
+
394
+ # Close ffmpeg mic if open
395
+ if self.mic_stream and hasattr(self.mic_stream, 'close'):
396
  try:
397
  self.mic_stream.close()
398
  except:
399
  pass
400
  self.mic_stream = None
401
+
402
+ # Shutdown executor
403
+ self.executor.shutdown(wait=False)
 
 
404
 
405
  # Always close the stream manager
406
+ await self.stream_manager.close()
407
+
408
+ print("HF audio streaming stopped")