SreekarB commited on
Commit
288e434
·
verified ·
1 Parent(s): f7b85fd

Upload 11 files

Browse files
Files changed (5) hide show
  1. app.py +241 -7
  2. hf_audio_utils.py +36 -0
  3. nova_sonic_tool_use.py +36 -0
  4. old_sonic_use.py +885 -0
  5. session_manager.py +8 -0
app.py CHANGED
@@ -3,10 +3,11 @@ import threading
3
  import time
4
  import argparse
5
  import asyncio
 
6
  from nova_sonic_tool_use import BedrockStreamManager, AudioStreamer
7
  from language_coach import LanguageCoach
8
  from session_manager import SessionManager
9
- from config import UI_TITLE, UI_SUBTITLE
10
  import gradio as gr
11
 
12
  # Import dotenv for environment variables if available
@@ -24,11 +25,38 @@ try:
24
  except ImportError:
25
  HF_AUDIO_AVAILABLE = False
26
 
 
 
 
 
 
 
 
 
 
27
  # Check if we're in HF Spaces
28
  def is_huggingface_spaces():
29
  """Detect if we're running on HuggingFace Spaces"""
30
  return "SPACE_ID" in os.environ or "SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces"
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  class NovaConversationApp:
33
  def __init__(self, session_id=None):
34
  # Initialize core components
@@ -54,6 +82,8 @@ class NovaConversationApp:
54
  """Start the conversation with Nova"""
55
  print("Starting conversation with Nova...")
56
  self.is_running = True
 
 
57
 
58
  # Create event loop in the current thread if needed
59
  try:
@@ -74,7 +104,7 @@ class NovaConversationApp:
74
 
75
  error_msg = f"Missing AWS credentials: {', '.join(missing)}"
76
  # Check if running in Hugging Face Spaces
77
- if "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces"):
78
  error_msg += "\nPlease add these as secrets in your Hugging Face Space settings."
79
  else:
80
  error_msg += "\nPlease set these environment variables or add them to a .env file."
@@ -89,12 +119,31 @@ class NovaConversationApp:
89
  print("Using Hugging Face Spaces-optimized audio streamer")
90
  self.audio_streamer = HFAudioStreamer(self.stream_manager)
91
  else:
92
- print("Using standard audio streamer")
 
 
 
 
 
 
 
 
 
 
93
  self.audio_streamer = AudioStreamer(self.stream_manager)
94
 
95
  # Initialize the stream in the event loop
96
  self.loop.run_until_complete(self._initialize_streaming())
97
 
 
 
 
 
 
 
 
 
 
98
  # Monitor output text for session history and language coaching
99
  asyncio.run_coroutine_threadsafe(self._monitor_output(), self.loop)
100
 
@@ -179,6 +228,44 @@ class NovaConversationApp:
179
  return True
180
  return False
181
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
  def stop(self):
183
  """Stop the conversation and clean up resources"""
184
  if not self.is_running:
@@ -186,6 +273,14 @@ class NovaConversationApp:
186
 
187
  self.is_running = False
188
 
 
 
 
 
 
 
 
 
189
  # Clean up the audio streamer and stream manager
190
  if self.loop and self.audio_streamer:
191
  asyncio.run_coroutine_threadsafe(
@@ -201,6 +296,19 @@ def create_ui(app):
201
  gr.Markdown(f"# {UI_TITLE}")
202
  gr.Markdown(f"## {UI_SUBTITLE}")
203
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  with gr.Row():
205
  status_indicator = gr.Textbox(
206
  value="Ready to start",
@@ -208,15 +316,53 @@ def create_ui(app):
208
  interactive=False
209
  )
210
 
 
211
  with gr.Row():
212
- start_button = gr.Button("Start Conversation")
213
- stop_button = gr.Button("Stop Conversation")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
  replay_button = gr.Button("Replay Last Response")
215
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
  # Define UI interactions
217
  def start_conversation():
218
  if app.start():
219
- return "Conversation started - Speak into your microphone"
220
  return "Failed to start conversation"
221
 
222
  def stop_conversation():
@@ -227,12 +373,100 @@ def create_ui(app):
227
  if app.replay_last_response():
228
  return "Replaying last response"
229
  return "No response to replay"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
230
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
231
  # Wire up the UI interactions
232
  start_button.click(start_conversation, outputs=status_indicator)
233
  stop_button.click(stop_conversation, outputs=status_indicator)
234
  replay_button.click(replay_last, outputs=status_indicator)
235
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
236
  return ui
237
 
238
  if __name__ == "__main__":
 
3
  import time
4
  import argparse
5
  import asyncio
6
+ import numpy as np
7
  from nova_sonic_tool_use import BedrockStreamManager, AudioStreamer
8
  from language_coach import LanguageCoach
9
  from session_manager import SessionManager
10
+ from config import UI_TITLE, UI_SUBTITLE, INPUT_SAMPLE_RATE
11
  import gradio as gr
12
 
13
  # Import dotenv for environment variables if available
 
25
  except ImportError:
26
  HF_AUDIO_AVAILABLE = False
27
 
28
+ # Try to import transformers audio utils for ffmpeg microphone
29
+ try:
30
+ from transformers.pipelines.audio_utils import ffmpeg_microphone_live
31
+ FFMPEG_AVAILABLE = True
32
+ print("ffmpeg_microphone_live is available!")
33
+ except ImportError:
34
+ FFMPEG_AVAILABLE = False
35
+ print("ffmpeg_microphone_live is not available. Using fallback audio handling.")
36
+
37
  # Check if we're in HF Spaces
38
  def is_huggingface_spaces():
39
  """Detect if we're running on HuggingFace Spaces"""
40
  return "SPACE_ID" in os.environ or "SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces"
41
 
42
# Create an ffmpeg microphone streamer function
def create_ffmpeg_mic(sample_rate=INPUT_SAMPLE_RATE, chunk_length_s=1.0, stream_chunk_s=0.25):
    """Creates an ffmpeg-based microphone stream if available.

    Returns the live chunk generator, or None when the transformers audio
    utility is missing or the stream cannot be opened (no ffmpeg binary,
    no microphone device, etc.).
    """
    if not FFMPEG_AVAILABLE:
        return None

    try:
        stream = ffmpeg_microphone_live(
            sampling_rate=sample_rate,
            chunk_length_s=chunk_length_s,
            stream_chunk_s=stream_chunk_s,
        )
    except Exception as exc:
        print(f"Error creating ffmpeg microphone: {exc}")
        return None

    print(f"Successfully created ffmpeg microphone with sample rate {sample_rate}")
    return stream
59
+
60
  class NovaConversationApp:
61
  def __init__(self, session_id=None):
62
  # Initialize core components
 
82
  """Start the conversation with Nova"""
83
  print("Starting conversation with Nova...")
84
  self.is_running = True
85
+ self.ffmpeg_mic = None
86
+ self.ffmpeg_thread = None
87
 
88
  # Create event loop in the current thread if needed
89
  try:
 
104
 
105
  error_msg = f"Missing AWS credentials: {', '.join(missing)}"
106
  # Check if running in Hugging Face Spaces
107
+ if is_huggingface_spaces():
108
  error_msg += "\nPlease add these as secrets in your Hugging Face Space settings."
109
  else:
110
  error_msg += "\nPlease set these environment variables or add them to a .env file."
 
119
  print("Using Hugging Face Spaces-optimized audio streamer")
120
  self.audio_streamer = HFAudioStreamer(self.stream_manager)
121
  else:
122
+ # Try to use ffmpeg microphone first if available
123
+ if FFMPEG_AVAILABLE:
124
+ print("Attempting to use ffmpeg microphone streamer")
125
+ # Create ffmpeg microphone
126
+ self.ffmpeg_mic = create_ffmpeg_mic()
127
+ if self.ffmpeg_mic:
128
+ # We'll handle ffmpeg in a separate thread after stream initialization
129
+ print("Will use ffmpeg microphone for audio input")
130
+
131
+ # Regardless of ffmpeg availability, initialize standard audio streamer as fallback
132
+ print("Using standard audio streamer (with potential ffmpeg enhancement)")
133
  self.audio_streamer = AudioStreamer(self.stream_manager)
134
 
135
  # Initialize the stream in the event loop
136
  self.loop.run_until_complete(self._initialize_streaming())
137
 
138
+ # If ffmpeg mic is available, start a thread to process its input
139
+ if self.ffmpeg_mic:
140
+ self.ffmpeg_thread = threading.Thread(
141
+ target=self._process_ffmpeg_mic,
142
+ daemon=True
143
+ )
144
+ self.ffmpeg_thread.start()
145
+ print("Started ffmpeg microphone processing thread")
146
+
147
  # Monitor output text for session history and language coaching
148
  asyncio.run_coroutine_threadsafe(self._monitor_output(), self.loop)
149
 
 
228
  return True
229
  return False
230
 
231
+ def _process_ffmpeg_mic(self):
232
+ """Process audio from ffmpeg microphone in a separate thread"""
233
+ try:
234
+ # Log the start of processing
235
+ print("Starting ffmpeg microphone processing...")
236
+
237
+ # Track transcription for visual feedback
238
+ current_transcription = ""
239
+ last_transcription_time = time.time()
240
+
241
+ # Process each chunk from the ffmpeg microphone
242
+ for audio_chunk in self.ffmpeg_mic:
243
+ if not self.is_running:
244
+ break
245
+
246
+ # Convert from float32 [-1.0, 1.0] to int16 for Nova Sonic
247
+ if isinstance(audio_chunk, np.ndarray):
248
+ # Scale from [-1.0, 1.0] to int16 range
249
+ audio_int16 = (audio_chunk * 32767).astype(np.int16)
250
+ audio_bytes = audio_int16.tobytes()
251
+
252
+ # Send to Bedrock via the stream manager
253
+ if self.stream_manager and self.is_running:
254
+ self.stream_manager.add_audio_chunk(audio_bytes)
255
+
256
+ # Log periodically to show that audio is being processed
257
+ current_time = time.time()
258
+ if current_time - last_transcription_time > 2.0: # Every 2 seconds
259
+ print("Processing audio from ffmpeg microphone...")
260
+ last_transcription_time = current_time
261
+
262
+ print("Finished ffmpeg microphone processing")
263
+
264
+ except Exception as e:
265
+ print(f"Error in ffmpeg microphone thread: {e}")
266
+ import traceback
267
+ traceback.print_exc()
268
+
269
  def stop(self):
270
  """Stop the conversation and clean up resources"""
271
  if not self.is_running:
 
273
 
274
  self.is_running = False
275
 
276
+ # Stop the ffmpeg thread if it's running
277
+ if self.ffmpeg_mic:
278
+ try:
279
+ self.ffmpeg_mic.close()
280
+ except:
281
+ pass
282
+ self.ffmpeg_mic = None
283
+
284
  # Clean up the audio streamer and stream manager
285
  if self.loop and self.audio_streamer:
286
  asyncio.run_coroutine_threadsafe(
 
296
  gr.Markdown(f"# {UI_TITLE}")
297
  gr.Markdown(f"## {UI_SUBTITLE}")
298
 
299
+ # Check if we're in HF Spaces to provide appropriate instructions
300
+ if is_huggingface_spaces():
301
+ gr.Markdown("""
302
+ ### Hugging Face Spaces Mode
303
+
304
+ This app is running in Hugging Face Spaces.
305
+
306
+ 1. Click **Start Conversation** to begin
307
+ 2. Nova will automatically greet you
308
+ 3. Either speak into your microphone or use the text input below
309
+ 4. Press **Stop Conversation** when done
310
+ """)
311
+
312
  with gr.Row():
313
  status_indicator = gr.Textbox(
314
  value="Ready to start",
 
316
  interactive=False
317
  )
318
 
319
+ # Live transcription display
320
  with gr.Row():
321
+ live_transcription = gr.Textbox(
322
+ value="",
323
+ label="Live Transcription",
324
+ placeholder="Your speech will appear here as you speak...",
325
+ interactive=False
326
+ )
327
+
328
+ # Conversation history display
329
+ conversation_display = gr.Textbox(
330
+ value="",
331
+ label="Conversation History",
332
+ lines=10,
333
+ max_lines=20,
334
+ interactive=False
335
+ )
336
+
337
+ with gr.Row():
338
+ start_button = gr.Button("Start Conversation", variant="primary")
339
+ stop_button = gr.Button("Stop Conversation", variant="stop")
340
  replay_button = gr.Button("Replay Last Response")
341
+
342
+ # Add microphone component
343
+ with gr.Row():
344
+ audio_input = gr.Audio(
345
+ source="microphone",
346
+ type="filepath",
347
+ streaming=True,
348
+ label="Speak here (if your browser supports it)",
349
+ visible=not is_huggingface_spaces() # Hide in HF Spaces by default
350
+ )
351
+
352
+ # Text input for all users
353
+ with gr.Row():
354
+ user_message = gr.Textbox(
355
+ placeholder="Type your message here and press Enter",
356
+ label="Your Message",
357
+ interactive=True,
358
+ show_label=True
359
+ )
360
+ send_button = gr.Button("Send", variant="primary")
361
+
362
  # Define UI interactions
363
  def start_conversation():
364
  if app.start():
365
+ return "Conversation started - Nova will say hello shortly"
366
  return "Failed to start conversation"
367
 
368
  def stop_conversation():
 
373
  if app.replay_last_response():
374
  return "Replaying last response"
375
  return "No response to replay"
376
+
377
+ # Function to handle audio from microphone
378
+ def process_audio(audio_path):
379
+ try:
380
+ if app.is_running and app.audio_streamer and audio_path:
381
+ # Not returning anything here as this is processed in stream mode
382
+ # Update will be shown in live transcription
383
+ pass
384
+ return None
385
+ except Exception as e:
386
+ print(f"Error processing audio: {e}")
387
+ return None
388
+
389
+ # Function to send text messages
390
+ def send_text_message(text):
391
+ if not text.strip():
392
+ return "Please type a message first", live_transcription.value, None
393
 
394
+ if app.is_running and app.audio_streamer:
395
+ # Update the live transcription to show what user said
396
+ new_transcription = f"You: {text}"
397
+
398
+ # Add text to the conversation display
399
+ history = conversation_display.value
400
+ new_history = f"{history}\nYou: {text}\n"
401
+
402
+ # Use the appropriate method based on the streamer type
403
+ if hasattr(app.audio_streamer, 'send_text_message'):
404
+ # Schedule the text message to be sent
405
+ asyncio.run_coroutine_threadsafe(
406
+ app.audio_streamer.send_text_message(text),
407
+ app.loop
408
+ )
409
+ return "Message sent", new_transcription, new_history, ""
410
+ else:
411
+ return "Audio streamer doesn't support text messages", live_transcription.value, history, text
412
+ else:
413
+ return "Please start the conversation first", live_transcription.value, None, text
414
+
415
+ # Connect the audio input to processing
416
+ if not is_huggingface_spaces():
417
+ audio_input.stream(
418
+ process_audio,
419
+ inputs=[audio_input],
420
+ outputs=None
421
+ )
422
+
423
+ # Connect the text input to the send function
424
+ send_button.click(
425
+ send_text_message,
426
+ inputs=[user_message],
427
+ outputs=[status_indicator, live_transcription, conversation_display, user_message]
428
+ )
429
+ user_message.submit(
430
+ send_text_message,
431
+ inputs=[user_message],
432
+ outputs=[status_indicator, live_transcription, conversation_display, user_message]
433
+ )
434
+
435
  # Wire up the UI interactions
436
  start_button.click(start_conversation, outputs=status_indicator)
437
  stop_button.click(stop_conversation, outputs=status_indicator)
438
  replay_button.click(replay_last, outputs=status_indicator)
439
 
440
    # Function to update the live transcription
    def update_live_transcription():
        """Poll the stream manager's output queue for the latest USER text.

        Returns a "You (live): ..." string when a USER textOutput event is
        found, otherwise the component's existing value (no change).

        NOTE(review): get_nowait() consumes the message, so any other
        consumer of output_queue (e.g. the output monitor) will never see
        it — confirm the queue is not shared.
        """
        if app.is_running and app.stream_manager and app.stream_manager.output_queue:
            # Try to get the most recent user speech transcription if available
            try:
                # This is non-blocking
                if not app.stream_manager.output_queue.empty():
                    message = app.stream_manager.output_queue.get_nowait()
                    if "event" in message and "textOutput" in message["event"]:
                        content = message["event"]["textOutput"]["content"]
                        role = message["event"]["textOutput"]["role"]
                        if role == "USER":
                            return f"You (live): {content}"
            except Exception as e:
                print(f"Error updating live transcription: {e}")
        # Fall through: keep the currently displayed text.
        return live_transcription.value
456
+
457
+ # Update the conversation history from the app
458
+ def update_conversation():
459
+ if app.session_manager and app.is_running:
460
+ history = app.session_manager.get_conversation_context()
461
+ # Replace the format to make it more readable
462
+ history = history.replace("User: ", "You: ").replace("Nova: ", "Nova: ")
463
+ return history
464
+ return conversation_display.value
465
+
466
+ # Set up a periodic updates
467
+ live_transcription.every(0.5, update_live_transcription) # Update more frequently
468
+ conversation_display.every(1, update_conversation)
469
+
470
  return ui
471
 
472
  if __name__ == "__main__":
hf_audio_utils.py CHANGED
@@ -164,6 +164,9 @@ class HFAudioStreamer:
164
 
165
  self.is_streaming = True
166
 
 
 
 
167
  # Set up tasks based on mode
168
  if self.use_ffmpeg:
169
  # Start the ffmpeg microphone thread
@@ -182,6 +185,39 @@ class HFAudioStreamer:
182
 
183
  # Once input() returns, stop streaming
184
  await self.stop_streaming()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
 
186
  async def stop_streaming(self):
187
  """Stop streaming audio"""
 
164
 
165
  self.is_streaming = True
166
 
167
+ # Start with a welcome message from Nova
168
+ await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")
169
+
170
  # Set up tasks based on mode
171
  if self.use_ffmpeg:
172
  # Start the ffmpeg microphone thread
 
185
 
186
  # Once input() returns, stop streaming
187
  await self.stop_streaming()
188
+
189
+ async def send_text_message(self, text):
190
+ """Send a text message to Nova to simulate user input"""
191
+ try:
192
+ # Create text content start event
193
+ content_name = str(time.time())
194
+ text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % (
195
+ self.stream_manager.prompt_name,
196
+ content_name,
197
+ "USER"
198
+ )
199
+ await self.stream_manager.send_raw_event(text_content_start)
200
+
201
+ # Create text input event
202
+ text_input = self.stream_manager.TEXT_INPUT_EVENT % (
203
+ self.stream_manager.prompt_name,
204
+ content_name,
205
+ text
206
+ )
207
+ await self.stream_manager.send_raw_event(text_input)
208
+
209
+ # Create content end event
210
+ content_end = self.stream_manager.CONTENT_END_EVENT % (
211
+ self.stream_manager.prompt_name,
212
+ content_name
213
+ )
214
+ await self.stream_manager.send_raw_event(content_end)
215
+
216
+ print(f"Sent text message to Nova: {text}")
217
+ return True
218
+ except Exception as e:
219
+ print(f"Error sending text message: {e}")
220
+ return False
221
 
222
  async def stop_streaming(self):
223
  """Stop streaming audio"""
nova_sonic_tool_use.py CHANGED
@@ -929,6 +929,9 @@ class AudioStreamer:
929
 
930
  self.is_streaming = True
931
 
 
 
 
932
  # Set up tasks based on mode
933
  tasks = []
934
 
@@ -950,6 +953,39 @@ class AudioStreamer:
950
 
951
  # Once input() returns, stop streaming
952
  await self.stop_streaming()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
953
 
954
  async def stop_streaming(self):
955
  """Stop streaming audio."""
 
929
 
930
  self.is_streaming = True
931
 
932
+ # Start with a welcome message from Nova
933
+ await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")
934
+
935
  # Set up tasks based on mode
936
  tasks = []
937
 
 
953
 
954
  # Once input() returns, stop streaming
955
  await self.stop_streaming()
956
+
957
+ async def send_text_message(self, text):
958
+ """Send a text message to Nova to simulate user input"""
959
+ try:
960
+ # Create text content start event
961
+ content_name = str(time.time())
962
+ text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % (
963
+ self.stream_manager.prompt_name,
964
+ content_name,
965
+ "USER"
966
+ )
967
+ await self.stream_manager.send_raw_event(text_content_start)
968
+
969
+ # Create text input event
970
+ text_input = self.stream_manager.TEXT_INPUT_EVENT % (
971
+ self.stream_manager.prompt_name,
972
+ content_name,
973
+ text
974
+ )
975
+ await self.stream_manager.send_raw_event(text_input)
976
+
977
+ # Create content end event
978
+ content_end = self.stream_manager.CONTENT_END_EVENT % (
979
+ self.stream_manager.prompt_name,
980
+ content_name
981
+ )
982
+ await self.stream_manager.send_raw_event(content_end)
983
+
984
+ print(f"Sent text message to Nova: {text}")
985
+ return True
986
+ except Exception as e:
987
+ print(f"Error sending text message: {e}")
988
+ return False
989
 
990
  async def stop_streaming(self):
991
  """Stop streaming audio."""
old_sonic_use.py ADDED
@@ -0,0 +1,885 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import asyncio
3
+ import base64
4
+ import json
5
+ import uuid
6
+ import warnings
7
+ import pyaudio
8
+ import pytz
9
+ import random
10
+ import hashlib
11
+ import datetime
12
+ import time
13
+ import inspect
14
+ from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
15
+ from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
16
+ from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
17
+ from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
18
+
19
# Suppress warnings
# NOTE(review): blanket suppression hides all library warnings — confirm intended.
warnings.filterwarnings("ignore")

# Audio configuration
INPUT_SAMPLE_RATE = 16000   # Hz — microphone capture rate sent to Nova Sonic
OUTPUT_SAMPLE_RATE = 24000  # Hz — playback rate of Nova's synthesized speech
CHANNELS = 1                # mono audio
FORMAT = pyaudio.paInt16    # 16-bit signed PCM samples
CHUNK_SIZE = 1024  # Number of frames per buffer

# Debug mode flag
DEBUG = False  # set True to enable timestamped debug_print logging
31
+
32
def debug_print(message):
    """Print *message* with a timestamp and caller name, only when DEBUG is on."""
    if not DEBUG:
        return
    caller = inspect.stack()[1].function
    # Attribute timing-wrapper calls to their real caller, not the wrapper.
    if caller in ('time_it', 'time_it_async'):
        caller = inspect.stack()[2].function
    stamp = '{:%Y-%m-%d %H:%M:%S.%f}'.format(datetime.datetime.now())[:-3]
    print(stamp + ' ' + caller + ' ' + message)
39
+
40
def time_it(label, methodToRun):
    """Run *methodToRun*, log its wall-clock duration under *label*, return its result."""
    started = time.perf_counter()
    result = methodToRun()
    elapsed = time.perf_counter() - started
    debug_print(f"Execution time for {label}: {elapsed:.4f} seconds")
    return result
46
+
47
async def time_it_async(label, methodToRun):
    """Await *methodToRun*, log its wall-clock duration under *label*, return its result."""
    started = time.perf_counter()
    result = await methodToRun()
    elapsed = time.perf_counter() - started
    debug_print(f"Execution time for {label}: {elapsed:.4f} seconds")
    return result
53
+
54
+ class BedrockStreamManager:
55
+ """Manages bidirectional streaming with AWS Bedrock using asyncio"""
56
+
57
+ # Event templates
58
+ START_SESSION_EVENT = '''{
59
+ "event": {
60
+ "sessionStart": {
61
+ "inferenceConfiguration": {
62
+ "maxTokens": 1024,
63
+ "topP": 0.9,
64
+ "temperature": 0.7
65
+ }
66
+ }
67
+ }
68
+ }'''
69
+
70
+ CONTENT_START_EVENT = '''{
71
+ "event": {
72
+ "contentStart": {
73
+ "promptName": "%s",
74
+ "contentName": "%s",
75
+ "type": "AUDIO",
76
+ "interactive": true,
77
+ "role": "USER",
78
+ "audioInputConfiguration": {
79
+ "mediaType": "audio/lpcm",
80
+ "sampleRateHertz": 16000,
81
+ "sampleSizeBits": 16,
82
+ "channelCount": 1,
83
+ "audioType": "SPEECH",
84
+ "encoding": "base64"
85
+ }
86
+ }
87
+ }
88
+ }'''
89
+
90
+ AUDIO_EVENT_TEMPLATE = '''{
91
+ "event": {
92
+ "audioInput": {
93
+ "promptName": "%s",
94
+ "contentName": "%s",
95
+ "content": "%s"
96
+ }
97
+ }
98
+ }'''
99
+
100
+ TEXT_CONTENT_START_EVENT = '''{
101
+ "event": {
102
+ "contentStart": {
103
+ "promptName": "%s",
104
+ "contentName": "%s",
105
+ "type": "TEXT",
106
+ "role": "%s",
107
+ "interactive": true,
108
+ "textInputConfiguration": {
109
+ "mediaType": "text/plain"
110
+ }
111
+ }
112
+ }
113
+ }'''
114
+
115
+ TEXT_INPUT_EVENT = '''{
116
+ "event": {
117
+ "textInput": {
118
+ "promptName": "%s",
119
+ "contentName": "%s",
120
+ "content": "%s"
121
+ }
122
+ }
123
+ }'''
124
+
125
+ TOOL_CONTENT_START_EVENT = '''{
126
+ "event": {
127
+ "contentStart": {
128
+ "promptName": "%s",
129
+ "contentName": "%s",
130
+ "interactive": false,
131
+ "type": "TOOL",
132
+ "role": "TOOL",
133
+ "toolResultInputConfiguration": {
134
+ "toolUseId": "%s",
135
+ "type": "TEXT",
136
+ "textInputConfiguration": {
137
+ "mediaType": "text/plain"
138
+ }
139
+ }
140
+ }
141
+ }
142
+ }'''
143
+
144
+ CONTENT_END_EVENT = '''{
145
+ "event": {
146
+ "contentEnd": {
147
+ "promptName": "%s",
148
+ "contentName": "%s"
149
+ }
150
+ }
151
+ }'''
152
+
153
+ PROMPT_END_EVENT = '''{
154
+ "event": {
155
+ "promptEnd": {
156
+ "promptName": "%s"
157
+ }
158
+ }
159
+ }'''
160
+
161
+ SESSION_END_EVENT = '''{
162
+ "event": {
163
+ "sessionEnd": {}
164
+ }
165
+ }'''
166
+
167
+ def start_prompt(self):
168
+ """Create a promptStart event"""
169
+ get_default_tool_schema = json.dumps({
170
+ "type": "object",
171
+ "properties": {},
172
+ "required": []
173
+ })
174
+
175
+ get_order_tracking_schema = json.dumps({
176
+ "type": "object",
177
+ "properties": {
178
+ "orderId": {
179
+ "type": "string",
180
+ "description": "The order number or ID to track"
181
+ },
182
+ "requestNotifications": {
183
+ "type": "boolean",
184
+ "description": "Whether to set up notifications for this order",
185
+ "default": False
186
+ }
187
+ },
188
+ "required": ["orderId"]
189
+ })
190
+
191
+
192
+ prompt_start_event = {
193
+ "event": {
194
+ "promptStart": {
195
+ "promptName": self.prompt_name,
196
+ "textOutputConfiguration": {
197
+ "mediaType": "text/plain"
198
+ },
199
+ "audioOutputConfiguration": {
200
+ "mediaType": "audio/lpcm",
201
+ "sampleRateHertz": 24000,
202
+ "sampleSizeBits": 16,
203
+ "channelCount": 1,
204
+ "voiceId": "matthew",
205
+ "encoding": "base64",
206
+ "audioType": "SPEECH"
207
+ },
208
+ "toolUseOutputConfiguration": {
209
+ "mediaType": "application/json"
210
+ },
211
+ "toolConfiguration": {
212
+ "tools": [
213
+ {
214
+ "toolSpec": {
215
+ "name": "getDateAndTimeTool",
216
+ "description": "get information about the current date and time",
217
+ "inputSchema": {
218
+ "json": get_default_tool_schema
219
+ }
220
+ }
221
+ },
222
+ {
223
+ "toolSpec": {
224
+ "name": "trackOrderTool",
225
+ "description": "Retrieves real-time order tracking information and detailed status updates for customer orders by order ID. Provides estimated delivery dates. Use this tool when customers ask about their order status or delivery timeline.",
226
+ "inputSchema": {
227
+ "json": get_order_tracking_schema
228
+ }
229
+ }
230
+ }
231
+ ]
232
+ }
233
+ }
234
+ }
235
+ }
236
+
237
+ return json.dumps(prompt_start_event)
238
+
239
+ def tool_result_event(self, content_name, content, role):
240
+ """Create a tool result event"""
241
+
242
+ if isinstance(content, dict):
243
+ content_json_string = json.dumps(content)
244
+ else:
245
+ content_json_string = content
246
+
247
+ tool_result_event = {
248
+ "event": {
249
+ "toolResult": {
250
+ "promptName": self.prompt_name,
251
+ "contentName": content_name,
252
+ "content": content_json_string
253
+ }
254
+ }
255
+ }
256
+ return json.dumps(tool_result_event)
257
+
258
    def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
        """Initialize the stream manager.

        Args:
            model_id: Bedrock model identifier to invoke.
            region: AWS region of the Bedrock runtime endpoint.
        """
        self.model_id = model_id
        self.region = region

        # Replace RxPy subjects with asyncio queues
        self.audio_input_queue = asyncio.Queue()   # mic audio chunks awaiting send
        self.audio_output_queue = asyncio.Queue()  # audio from Nova for playback
        self.output_queue = asyncio.Queue()        # decoded events for consumers

        self.response_task = None     # task created in initialize_stream (_process_responses)
        self.stream_response = None   # bidirectional stream handle from Bedrock
        self.is_active = False        # True once the stream is initialized
        self.barge_in = False         # presumably set when user interrupts playback — TODO confirm
        self.bedrock_client = None    # lazily created by _initialize_client

        # Audio playback components
        self.audio_player = None

        # Text response components
        self.display_assistant_text = False
        self.role = None

        # Session information — identifiers must stay stable for the whole session
        self.prompt_name = str(uuid.uuid4())
        self.content_name = str(uuid.uuid4())
        self.audio_content_name = str(uuid.uuid4())
        self.toolUseContent = ""
        self.toolUseId = ""
        self.toolName = ""
288
+
289
+ def _initialize_client(self):
290
+ """Initialize the Bedrock client."""
291
+ config = Config(
292
+ endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
293
+ region=self.region,
294
+ aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
295
+ http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
296
+ http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
297
+ )
298
+ self.bedrock_client = BedrockRuntimeClient(config=config)
299
+
300
+ async def initialize_stream(self):
301
+ """Initialize the bidirectional stream with Bedrock."""
302
+ if not self.bedrock_client:
303
+ self._initialize_client()
304
+
305
+ try:
306
+ self.stream_response = await time_it_async("invoke_model_with_bidirectional_stream", lambda : self.bedrock_client.invoke_model_with_bidirectional_stream( InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)))
307
+ self.is_active = True
308
+ default_system_prompt = "You are a friend. The user and you will engage in a spoken dialog exchanging the transcripts of a natural real-time conversation." \
309
+ "When reading order numbers, please read each digit individually, separated by pauses. For example, order #1234 should be read as 'order number one-two-three-four' rather than 'order number one thousand two hundred thirty-four'."
310
+
311
+ # Send initialization events
312
+ prompt_event = self.start_prompt()
313
+ text_content_start = self.TEXT_CONTENT_START_EVENT % (self.prompt_name, self.content_name, "SYSTEM")
314
+ text_content = self.TEXT_INPUT_EVENT % (self.prompt_name, self.content_name, default_system_prompt)
315
+ text_content_end = self.CONTENT_END_EVENT % (self.prompt_name, self.content_name)
316
+
317
+ init_events = [self.START_SESSION_EVENT, prompt_event, text_content_start, text_content, text_content_end]
318
+
319
+ for event in init_events:
320
+ await self.send_raw_event(event)
321
+ # Small delay between init events
322
+ await asyncio.sleep(0.1)
323
+
324
+ # Start listening for responses
325
+ self.response_task = asyncio.create_task(self._process_responses())
326
+
327
+ # Start processing audio input
328
+ asyncio.create_task(self._process_audio_input())
329
+
330
+ # Wait a bit to ensure everything is set up
331
+ await asyncio.sleep(0.1)
332
+
333
+ debug_print("Stream initialized successfully")
334
+ return self
335
+ except Exception as e:
336
+ self.is_active = False
337
+ print(f"Failed to initialize stream: {str(e)}")
338
+ raise
339
+
340
+ async def send_raw_event(self, event_json):
341
+ """Send a raw event JSON to the Bedrock stream."""
342
+ if not self.stream_response or not self.is_active:
343
+ debug_print("Stream not initialized or closed")
344
+ return
345
+
346
+ event = InvokeModelWithBidirectionalStreamInputChunk(
347
+ value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
348
+ )
349
+
350
+ try:
351
+ await self.stream_response.input_stream.send(event)
352
+ # For debugging large events, you might want to log just the type
353
+ if DEBUG:
354
+ if len(event_json) > 200:
355
+ event_type = json.loads(event_json).get("event", {}).keys()
356
+ debug_print(f"Sent event type: {list(event_type)}")
357
+ else:
358
+ debug_print(f"Sent event: {event_json}")
359
+ except Exception as e:
360
+ debug_print(f"Error sending event: {str(e)}")
361
+ if DEBUG:
362
+ import traceback
363
+ traceback.print_exc()
364
+
365
+ async def send_audio_content_start_event(self):
366
+ """Send a content start event to the Bedrock stream."""
367
+ content_start_event = self.CONTENT_START_EVENT % (self.prompt_name, self.audio_content_name)
368
+ await self.send_raw_event(content_start_event)
369
+
370
+ async def _process_audio_input(self):
371
+ """Process audio input from the queue and send to Bedrock."""
372
+ while self.is_active:
373
+ try:
374
+ # Get audio data from the queue
375
+ data = await self.audio_input_queue.get()
376
+
377
+ audio_bytes = data.get('audio_bytes')
378
+ if not audio_bytes:
379
+ debug_print("No audio bytes received")
380
+ continue
381
+
382
+ # Base64 encode the audio data
383
+ blob = base64.b64encode(audio_bytes)
384
+ audio_event = self.AUDIO_EVENT_TEMPLATE % (
385
+ self.prompt_name,
386
+ self.audio_content_name,
387
+ blob.decode('utf-8')
388
+ )
389
+
390
+ # Send the event
391
+ await self.send_raw_event(audio_event)
392
+
393
+ except asyncio.CancelledError:
394
+ break
395
+ except Exception as e:
396
+ debug_print(f"Error processing audio: {e}")
397
+ if DEBUG:
398
+ import traceback
399
+ traceback.print_exc()
400
+
401
+ def add_audio_chunk(self, audio_bytes):
402
+ """Add an audio chunk to the queue."""
403
+ self.audio_input_queue.put_nowait({
404
+ 'audio_bytes': audio_bytes,
405
+ 'prompt_name': self.prompt_name,
406
+ 'content_name': self.audio_content_name
407
+ })
408
+
409
+ async def send_audio_content_end_event(self):
410
+ """Send a content end event to the Bedrock stream."""
411
+ if not self.is_active:
412
+ debug_print("Stream is not active")
413
+ return
414
+
415
+ content_end_event = self.CONTENT_END_EVENT % (self.prompt_name, self.audio_content_name)
416
+ await self.send_raw_event(content_end_event)
417
+ debug_print("Audio ended")
418
+
419
+ async def send_tool_start_event(self, content_name):
420
+ """Send a tool content start event to the Bedrock stream."""
421
+ content_start_event = self.TOOL_CONTENT_START_EVENT % (self.prompt_name, content_name, self.toolUseId)
422
+ debug_print(f"Sending tool start event: {content_start_event}")
423
+ await self.send_raw_event(content_start_event)
424
+
425
+ async def send_tool_result_event(self, content_name, tool_result):
426
+ """Send a tool content event to the Bedrock stream."""
427
+ # Use the actual tool result from processToolUse
428
+ tool_result_event = self.tool_result_event(content_name=content_name, content=tool_result, role="TOOL")
429
+ debug_print(f"Sending tool result event: {tool_result_event}")
430
+ await self.send_raw_event(tool_result_event)
431
+
432
+ async def send_tool_content_end_event(self, content_name):
433
+ """Send a tool content end event to the Bedrock stream."""
434
+ tool_content_end_event = self.CONTENT_END_EVENT % (self.prompt_name, content_name)
435
+ debug_print(f"Sending tool content event: {tool_content_end_event}")
436
+ await self.send_raw_event(tool_content_end_event)
437
+
438
+ async def send_prompt_end_event(self):
439
+ """Close the stream and clean up resources."""
440
+ if not self.is_active:
441
+ debug_print("Stream is not active")
442
+ return
443
+
444
+ prompt_end_event = self.PROMPT_END_EVENT % (self.prompt_name)
445
+ await self.send_raw_event(prompt_end_event)
446
+ debug_print("Prompt ended")
447
+
448
+ async def send_session_end_event(self):
449
+ """Send a session end event to the Bedrock stream."""
450
+ if not self.is_active:
451
+ debug_print("Stream is not active")
452
+ return
453
+
454
+ await self.send_raw_event(self.SESSION_END_EVENT)
455
+ self.is_active = False
456
+ debug_print("Session ended")
457
+
458
    async def _process_responses(self):
        """Receive and dispatch events arriving from the Bedrock bidirectional stream.

        Runs as a background task for the lifetime of the session. Each decoded
        JSON event is routed by type (contentStart, textOutput, audioOutput,
        toolUse, contentEnd/TOOL, completionEnd) and then forwarded unmodified
        to ``self.output_queue`` for any other consumers. Exits — and flips
        ``self.is_active`` off — when the stream ends or a receive error occurs.
        """
        try:
            while self.is_active:
                try:
                    output = await self.stream_response.await_output()
                    result = await output[1].receive()
                    if result.value and result.value.bytes_:
                        try:
                            response_data = result.value.bytes_.decode('utf-8')
                            json_data = json.loads(response_data)

                            # Handle different response types
                            if 'event' in json_data:
                                if 'contentStart' in json_data['event']:
                                    debug_print("Content start detected")
                                    content_start = json_data['event']['contentStart']
                                    # Remember who is speaking; later textOutput
                                    # events are attributed to this role.
                                    self.role = content_start['role']
                                    # Check for speculative content: only
                                    # SPECULATIVE assistant text is echoed to
                                    # the console.
                                    if 'additionalModelFields' in content_start:
                                        try:
                                            additional_fields = json.loads(content_start['additionalModelFields'])
                                            if additional_fields.get('generationStage') == 'SPECULATIVE':
                                                debug_print("Speculative content detected")
                                                self.display_assistant_text = True
                                            else:
                                                self.display_assistant_text = False
                                        except json.JSONDecodeError:
                                            debug_print("Error parsing additionalModelFields")
                                elif 'textOutput' in json_data['event']:
                                    text_content = json_data['event']['textOutput']['content']
                                    role = json_data['event']['textOutput']['role']
                                    # Barge-in: the service embeds this literal
                                    # marker in the text when the user talks
                                    # over the assistant; the player watches
                                    # self.barge_in and flushes its queue.
                                    if '{ "interrupted" : true }' in text_content:
                                        debug_print("Barge-in detected. Stopping audio output.")
                                        self.barge_in = True

                                    if (self.role == "ASSISTANT" and self.display_assistant_text):
                                        print(f"Assistant: {text_content}")
                                    elif (self.role == "USER"):
                                        print(f"User: {text_content}")

                                elif 'audioOutput' in json_data['event']:
                                    # Synthesized speech arrives base64-encoded;
                                    # decode and hand it to the playback queue.
                                    audio_content = json_data['event']['audioOutput']['content']
                                    audio_bytes = base64.b64decode(audio_content)
                                    await self.audio_output_queue.put(audio_bytes)
                                elif 'toolUse' in json_data['event']:
                                    # Stash the tool request; it is executed when
                                    # the matching contentEnd(type=TOOL) arrives.
                                    self.toolUseContent = json_data['event']['toolUse']
                                    self.toolName = json_data['event']['toolUse']['toolName']
                                    self.toolUseId = json_data['event']['toolUse']['toolUseId']
                                    debug_print(f"Tool use detected: {self.toolName}, ID: {self.toolUseId}")
                                elif 'contentEnd' in json_data['event'] and json_data['event'].get('contentEnd', {}).get('type') == 'TOOL':
                                    debug_print("Processing tool use and sending result")
                                    toolResult = await self.processToolUse(self.toolName, self.toolUseContent)
                                    # Fresh content name for the tool-result
                                    # start/result/end event triplet.
                                    toolContent = str(uuid.uuid4())
                                    await self.send_tool_start_event(toolContent)
                                    await self.send_tool_result_event(toolContent, toolResult)
                                    await self.send_tool_content_end_event(toolContent)

                                elif 'completionEnd' in json_data['event']:
                                    # Handle end of conversation, no more response will be generated
                                    print("End of response sequence")

                            # Put the response in the output queue for other components
                            await self.output_queue.put(json_data)
                        except json.JSONDecodeError:
                            # Non-JSON payloads are still forwarded, wrapped.
                            await self.output_queue.put({"raw_data": response_data})
                except StopAsyncIteration:
                    # Stream has ended
                    break
                except Exception as e:
                    # Handle ValidationException properly
                    if "ValidationException" in str(e):
                        error_message = str(e)
                        print(f"Validation error: {error_message}")
                    else:
                        print(f"Error receiving response: {e}")
                    break

        except Exception as e:
            print(f"Response processing error: {e}")
        finally:
            self.is_active = False
+ async def processToolUse(self, toolName, toolUseContent):
544
+ """Return the tool result"""
545
+ tool = toolName.lower()
546
+ debug_print(f"Tool Use Content: {toolUseContent}")
547
+
548
+ if tool == "getdateandtimetool":
549
+ # Get current date in PST timezone
550
+ pst_timezone = pytz.timezone("America/Los_Angeles")
551
+ pst_date = datetime.datetime.now(pst_timezone)
552
+
553
+ return {
554
+ "formattedTime": pst_date.strftime("%I:%M %p"),
555
+ "date": pst_date.strftime("%Y-%m-%d"),
556
+ "year": pst_date.year,
557
+ "month": pst_date.month,
558
+ "day": pst_date.day,
559
+ "dayOfWeek": pst_date.strftime("%A").upper(),
560
+ "timezone": "PST"
561
+ }
562
+
563
+ elif tool == "trackordertool":
564
+
565
+ # Extract order ID from toolUseContent
566
+ content = toolUseContent.get("content", {})
567
+ content_data = json.loads(content)
568
+ order_id = content_data.get("orderId", "")
569
+ request_notifications = toolUseContent.get("requestNotifications", False)
570
+
571
+ # Convert order_id to string if it's an integer
572
+ if isinstance(order_id, int):
573
+ order_id = str(order_id)
574
+ # Validate order ID format
575
+ if not order_id or not isinstance(order_id, str):
576
+ return {
577
+ "error": "Invalid order ID format",
578
+ "orderStatus": "",
579
+ "estimatedDelivery": "",
580
+ "lastUpdate": ""
581
+ }
582
+
583
+ # Create deterministic randomness based on order ID
584
+ # This ensures the same order ID always returns the same status
585
+ seed = int(hashlib.md5(order_id.encode(), usedforsecurity=False).hexdigest(), 16) % 10000
586
+ random.seed(seed)
587
+
588
+ # Possible statuses with appropriate weights
589
+ statuses = [
590
+ "Order received",
591
+ "Processing",
592
+ "Preparing for shipment",
593
+ "Shipped",
594
+ "In transit",
595
+ "Out for delivery",
596
+ "Delivered",
597
+ "Delayed"
598
+ ]
599
+
600
+ weights = [10, 15, 15, 20, 20, 10, 5, 3]
601
+
602
+ # Select a status based on the weights
603
+ status = random.choices(statuses, weights=weights, k=1)[0]
604
+
605
+ # Generate a realistic estimated delivery date
606
+ today = datetime.datetime.now()
607
+ # Handle estimated delivery date based on status
608
+ if status == "Delivered":
609
+ # For delivered items, delivery date is in the past
610
+ delivery_days = -random.randint(0, 3)
611
+ estimated_delivery = (today + datetime.timedelta(days=delivery_days)).strftime("%Y-%m-%d")
612
+ elif status == "Out for delivery":
613
+ # For out for delivery, delivery is today
614
+ estimated_delivery = today.strftime("%Y-%m-%d")
615
+ else:
616
+ # For other statuses, delivery is in the future
617
+ delivery_days = random.randint(1, 10)
618
+ estimated_delivery = (today + datetime.timedelta(days=delivery_days)).strftime("%Y-%m-%d")
619
+
620
+ # Handle notification request if enabled
621
+ notification_message = ""
622
+ if request_notifications and status != "Delivered":
623
+ notification_message = f"You will receive notifications for order {order_id}"
624
+
625
+ # Return comprehensive tracking information
626
+ tracking_info = {
627
+ "orderStatus": status,
628
+ "orderNumber": order_id,
629
+ "notificationStatus": notification_message
630
+ }
631
+
632
+ # Add appropriate fields based on status
633
+ if status == "Delivered":
634
+ tracking_info["deliveredOn"] = estimated_delivery
635
+ elif status == "Out for delivery":
636
+ tracking_info["expectedDelivery"] = "Today"
637
+ else:
638
+ tracking_info["estimatedDelivery"] = estimated_delivery
639
+
640
+ # Add location information based on status
641
+ if status == "In transit":
642
+ tracking_info["currentLocation"] = "Distribution Center"
643
+ elif status == "Delivered":
644
+ tracking_info["deliveryLocation"] = "Front Door"
645
+
646
+ # Add additional info for delayed status
647
+ if status == "Delayed":
648
+ tracking_info["additionalInfo"] = "Weather delays possible"
649
+
650
+ return tracking_info
651
+
652
+ async def close(self):
653
+ """Close the stream properly."""
654
+ if not self.is_active:
655
+ return
656
+
657
+ self.is_active = False
658
+ if self.response_task and not self.response_task.done():
659
+ self.response_task.cancel()
660
+
661
+ await self.send_audio_content_end_event()
662
+ await self.send_prompt_end_event()
663
+ await self.send_session_end_event()
664
+
665
+ if self.stream_response:
666
+ await self.stream_response.input_stream.close()
667
+
668
class AudioStreamer:
    """Handles continuous microphone input and audio output using separate streams.

    Microphone capture runs on PyAudio's callback thread and is bridged into
    the asyncio event loop; playback pulls synthesized audio from the stream
    manager's output queue and writes it to a dedicated output stream.
    """

    def __init__(self, stream_manager):
        # Bridge to the Bedrock stream manager (audio in/out queues, barge-in flag).
        self.stream_manager = stream_manager
        self.is_streaming = False
        # Event loop captured here so the PyAudio callback thread can schedule
        # coroutines onto it via run_coroutine_threadsafe.
        self.loop = asyncio.get_event_loop()

        # Initialize PyAudio
        debug_print("AudioStreamer Initializing PyAudio...")
        self.p = time_it("AudioStreamerInitPyAudio", pyaudio.PyAudio)
        debug_print("AudioStreamer PyAudio initialized")

        # Initialize separate streams for input and output
        # Input stream with callback for microphone
        debug_print("Opening input audio stream...")
        self.input_stream = time_it("AudioStreamerOpenAudio", lambda : self.p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=INPUT_SAMPLE_RATE,
            input=True,
            frames_per_buffer=CHUNK_SIZE,
            stream_callback=self.input_callback
        ))
        debug_print("input audio stream opened")

        # Output stream for direct writing (no callback)
        debug_print("Opening output audio stream...")
        self.output_stream = time_it("AudioStreamerOpenAudio", lambda : self.p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=OUTPUT_SAMPLE_RATE,
            output=True,
            frames_per_buffer=CHUNK_SIZE
        ))

        debug_print("output audio stream opened")

    def input_callback(self, in_data, frame_count, time_info, status):
        """Callback function that schedules audio processing in the asyncio event loop.

        Runs on PyAudio's internal thread — never touch asyncio objects here
        directly; hand the chunk to the loop instead.
        """
        if self.is_streaming and in_data:
            # Schedule the task in the event loop
            asyncio.run_coroutine_threadsafe(
                self.process_input_audio(in_data),
                self.loop
            )
        # Returning paContinue keeps the capture stream running.
        return (None, pyaudio.paContinue)

    async def process_input_audio(self, audio_data):
        """Process a single audio chunk directly."""
        try:
            # Send audio to Bedrock immediately
            self.stream_manager.add_audio_chunk(audio_data)
        except Exception as e:
            if self.is_streaming:
                print(f"Error processing input audio: {e}")

    async def play_output_audio(self):
        """Play audio responses from Nova Sonic.

        Loops while streaming: drains the manager's audio_output_queue and
        writes to the PyAudio output stream in CHUNK_SIZE slices via an
        executor so playback never blocks the event loop. A barge-in flag
        from the manager flushes any queued audio.
        """
        while self.is_streaming:
            try:
                # Check for barge-in flag
                if self.stream_manager.barge_in:
                    # Clear the audio queue
                    while not self.stream_manager.audio_output_queue.empty():
                        try:
                            self.stream_manager.audio_output_queue.get_nowait()
                        except asyncio.QueueEmpty:
                            break
                    self.stream_manager.barge_in = False
                    # Small sleep after clearing
                    await asyncio.sleep(0.05)
                    continue

                # Get audio data from the stream manager's queue
                audio_data = await asyncio.wait_for(
                    self.stream_manager.audio_output_queue.get(),
                    timeout=0.1
                )

                if audio_data and self.is_streaming:
                    # Write directly to the output stream in smaller chunks
                    chunk_size = CHUNK_SIZE  # Use the same chunk size as the stream

                    # Write the audio data in chunks to avoid blocking too long
                    for i in range(0, len(audio_data), chunk_size):
                        if not self.is_streaming:
                            break

                        end = min(i + chunk_size, len(audio_data))
                        chunk = audio_data[i:end]

                        # Create a new function that captures the chunk by value
                        def write_chunk(data):
                            return self.output_stream.write(data)

                        # Pass the chunk to the function
                        await asyncio.get_event_loop().run_in_executor(None, write_chunk, chunk)

                        # Brief yield to allow other tasks to run
                        await asyncio.sleep(0.001)

            except asyncio.TimeoutError:
                # No data available within timeout, just continue
                continue
            except Exception as e:
                if self.is_streaming:
                    print(f"Error playing output audio: {str(e)}")
                    import traceback
                    traceback.print_exc()
                await asyncio.sleep(0.05)

    async def start_streaming(self):
        """Start streaming audio.

        Blocks until the user presses Enter (read in an executor), then stops.
        """
        if self.is_streaming:
            return

        print("Starting audio streaming. Speak into your microphone...")
        print("Press Enter to stop streaming...")

        # Send audio content start event
        await time_it_async("send_audio_content_start_event", lambda : self.stream_manager.send_audio_content_start_event())

        self.is_streaming = True

        # Start the input stream if not already started
        if not self.input_stream.is_active():
            self.input_stream.start_stream()

        # Start processing tasks
        #self.input_task = asyncio.create_task(self.process_input_audio())
        self.output_task = asyncio.create_task(self.play_output_audio())

        # Wait for user to press Enter to stop
        await asyncio.get_event_loop().run_in_executor(None, input)

        # Once input() returns, stop streaming
        await self.stop_streaming()

    async def stop_streaming(self):
        """Stop streaming audio.

        Cancels playback tasks, closes both PyAudio streams, terminates
        PyAudio, and closes the Bedrock stream manager.
        """
        if not self.is_streaming:
            return

        self.is_streaming = False

        # Cancel the tasks
        tasks = []
        if hasattr(self, 'input_task') and not self.input_task.done():
            tasks.append(self.input_task)
        if hasattr(self, 'output_task') and not self.output_task.done():
            tasks.append(self.output_task)
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        # Stop and close the streams
        if self.input_stream:
            if self.input_stream.is_active():
                self.input_stream.stop_stream()
            self.input_stream.close()
        if self.output_stream:
            if self.output_stream.is_active():
                self.output_stream.stop_stream()
            self.output_stream.close()
        if self.p:
            self.p.terminate()

        await self.stream_manager.close()
async def main(debug=False):
    """Wire up the stream manager and audio streamer, then run until the user stops.

    Args:
        debug: When True, enables module-level debug logging.
    """
    global DEBUG
    DEBUG = debug

    # Stream manager talks to Bedrock; the streamer handles mic/speaker I/O.
    manager = BedrockStreamManager(model_id='amazon.nova-sonic-v1:0', region='us-east-1')
    streamer = AudioStreamer(manager)

    # Open the bidirectional stream and send the initialization events.
    await time_it_async("initialize_stream", manager.initialize_stream)

    try:
        # Blocks until the user presses Enter.
        await streamer.start_streaming()
    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        # Always tear down audio devices and the stream.
        await streamer.stop_streaming()
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Nova Sonic Python Streaming')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode')
    args = parser.parse_args()

    # AWS credentials come from the environment (or Hugging Face secrets):
    # AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY; region defaults to us-east-1.
    os.environ.setdefault('AWS_DEFAULT_REGION', "us-east-1")

    try:
        asyncio.run(main(debug=args.debug))
    except Exception as e:
        print(f"Application error: {e}")
        if args.debug:
            import traceback
            traceback.print_exc()
session_manager.py CHANGED
@@ -29,12 +29,20 @@ class SessionManager:
29
 
30
  def add_interaction(self, user_text, nova_text):
31
  """Add a conversation turn to the history"""
 
 
 
 
32
  interaction = {
33
  "timestamp": datetime.now().isoformat(),
34
  "user": user_text,
35
  "nova": nova_text
36
  }
37
 
 
 
 
 
38
  self.conversation_history.append(interaction)
39
 
40
  # Keep conversation history within the limit
 
29
 
30
  def add_interaction(self, user_text, nova_text):
31
  """Add a conversation turn to the history"""
32
+ # Make sure we have valid content, especially for automated greetings
33
+ if not user_text or user_text.strip() == "":
34
+ user_text = "..." # Placeholder for empty user text (e.g., when Nova speaks first)
35
+
36
  interaction = {
37
  "timestamp": datetime.now().isoformat(),
38
  "user": user_text,
39
  "nova": nova_text
40
  }
41
 
42
+ # Print to console for visibility
43
+ print(f"\nUser: {user_text}")
44
+ print(f"Nova: {nova_text}\n")
45
+
46
  self.conversation_history.append(interaction)
47
 
48
  # Keep conversation history within the limit