SreekarB commited on
Commit
696b8b0
·
verified ·
1 Parent(s): cc0e850

Upload 10 files

Browse files
Files changed (3) hide show
  1. language_coach.py +182 -10
  2. nova_sonic.py +158 -70
  3. nova_sonic_tool_use.py +882 -0
language_coach.py CHANGED
@@ -1,6 +1,9 @@
1
  import re
2
  import json
3
  import os
 
 
 
4
 
5
  class LanguageCoach:
6
  def __init__(self):
@@ -9,21 +12,24 @@ class LanguageCoach:
9
  self.feedback_provided = set() # Track which issues we've already given feedback on
10
  self.analysis_dir = "speech_analysis"
11
 
12
- # Make sure analytics directory exists
13
  if not os.path.exists(self.analysis_dir):
14
  os.makedirs(self.analysis_dir)
15
 
 
 
 
16
  # Common speech disfluencies and patterns to track
17
  self.pattern_categories = {
18
  "filler_words": ["um", "uh", "er", "like", "you know", "sort of", "kind of"],
19
  "repetition": [], # Will be filled dynamically
20
- "incomplete_sentences": [], # Detected by sentence structure analysis
21
  "extended_pauses": [] # Detected in audio analysis
22
  }
23
 
24
  def analyze(self, transcript, session_id=None):
25
  """Analyze speech patterns in the transcript"""
26
- if not transcript:
27
  return None
28
 
29
  # Convert to lowercase for pattern matching
@@ -37,7 +43,8 @@ class LanguageCoach:
37
  "incomplete_sentences": 0,
38
  "extended_pauses": 0,
39
  "total_words": 0,
40
- "total_sentences": 0
 
41
  }
42
 
43
  # Count total words and sentences
@@ -49,8 +56,26 @@ class LanguageCoach:
49
  self.user_patterns[session_id]["total_words"] += len(words)
50
  self.user_patterns[session_id]["total_sentences"] += len(sentences)
51
 
52
- # Analyze filler words
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  detected_patterns = []
 
 
54
  for filler in self.pattern_categories["filler_words"]:
55
  filler_count = text.count(filler)
56
  if filler_count > 0:
@@ -91,12 +116,88 @@ class LanguageCoach:
91
  "suggestion": f"You sometimes repeat '{word}' when speaking"
92
  })
93
  self.feedback_provided.add(pattern_key)
94
-
95
- # If session_id is provided, save the analysis
96
- if session_id:
97
- self._save_analysis(session_id)
98
-
99
  return detected_patterns
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
100
 
101
  def get_coaching_context(self, session_id):
102
  """Get contextual coaching information for the Nova assistant"""
@@ -116,15 +217,86 @@ class LanguageCoach:
116
 
117
  if top_repetitions:
118
  coaching_context += "\n- Word repetitions: " + ", ".join([f"'{w}' ({c} times)" for w, c in top_repetitions])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
 
120
  # Add coaching instructions
121
  coaching_context += "\n\nCoaching approach:"
122
  coaching_context += "\n- Model clear speech without these patterns"
123
  coaching_context += "\n- Maintain natural conversation flow"
124
  coaching_context += "\n- Don't explicitly point out errors"
 
125
 
126
  return coaching_context
127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
  def _save_analysis(self, session_id):
129
  """Save the speech analysis for this session"""
130
  analysis_file = os.path.join(self.analysis_dir, f"{session_id}_analysis.json")
 
1
  import re
2
  import json
3
  import os
4
+ import boto3
5
+ from datetime import datetime
6
+ from config import REGION
7
 
8
  class LanguageCoach:
9
  def __init__(self):
 
12
  self.feedback_provided = set() # Track which issues we've already given feedback on
13
  self.analysis_dir = "speech_analysis"
14
 
15
+ # Create analytics directory if it doesn't exist
16
  if not os.path.exists(self.analysis_dir):
17
  os.makedirs(self.analysis_dir)
18
 
19
+ # Initialize Bedrock client for Claude analysis
20
+ self.bedrock = boto3.client('bedrock-runtime', region_name=REGION)
21
+
22
  # Common speech disfluencies and patterns to track
23
  self.pattern_categories = {
24
  "filler_words": ["um", "uh", "er", "like", "you know", "sort of", "kind of"],
25
  "repetition": [], # Will be filled dynamically
26
+ "incomplete_sentences": [], # Detected by LLM analysis
27
  "extended_pauses": [] # Detected in audio analysis
28
  }
29
 
30
  def analyze(self, transcript, session_id=None):
31
  """Analyze speech patterns in the transcript"""
32
+ if not transcript or transcript.strip() == "":
33
  return None
34
 
35
  # Convert to lowercase for pattern matching
 
43
  "incomplete_sentences": 0,
44
  "extended_pauses": 0,
45
  "total_words": 0,
46
+ "total_sentences": 0,
47
+ "llm_analyses": []
48
  }
49
 
50
  # Count total words and sentences
 
56
  self.user_patterns[session_id]["total_words"] += len(words)
57
  self.user_patterns[session_id]["total_sentences"] += len(sentences)
58
 
59
+ # Basic pattern analysis
60
+ detected_patterns = self._analyze_basic_patterns(text, words, session_id)
61
+
62
+ # Add LLM analysis if there's enough speech content
63
+ if len(words) > 5 and session_id:
64
+ llm_patterns = self._perform_llm_analysis(transcript, session_id)
65
+ if llm_patterns:
66
+ detected_patterns.extend(llm_patterns)
67
+
68
+ # If session_id is provided, save the analysis
69
+ if session_id:
70
+ self._save_analysis(session_id)
71
+
72
+ return detected_patterns
73
+
74
+ def _analyze_basic_patterns(self, text, words, session_id=None):
75
+ """Analyze basic speech patterns like fillers and repetitions"""
76
  detected_patterns = []
77
+
78
+ # Analyze filler words
79
  for filler in self.pattern_categories["filler_words"]:
80
  filler_count = text.count(filler)
81
  if filler_count > 0:
 
116
  "suggestion": f"You sometimes repeat '{word}' when speaking"
117
  })
118
  self.feedback_provided.add(pattern_key)
119
+
 
 
 
 
120
  return detected_patterns
121
+
122
+ def _perform_llm_analysis(self, transcript, session_id):
123
+ """Use an LLM to analyze speech patterns more deeply"""
124
+ try:
125
+ # Prepare the prompt for LLM analysis
126
+ prompt = f"""Analyze the following speech transcript for language patterns and provide actionable feedback:
127
+
128
+ Speech: "{transcript}"
129
+
130
+ Focus on:
131
+ 1. Speech fluency (hesitations, incomplete sentences, abrupt transitions)
132
+ 2. Grammar issues (tense, agreement, word order)
133
+ 3. Vocabulary usage (appropriate word choice, variety)
134
+ 4. Pronunciation indicators (as inferred from transcription)
135
+ 5. Organization of thoughts
136
+
137
+ Respond with a JSON structure containing:
138
+ - Overall assessment (short paragraph)
139
+ - Detailed pattern analysis (array of specific issues found)
140
+ - Suggestions for improvement (actionable items)
141
+
142
+ Format your response as valid JSON only, no preamble or explanation.
143
+ """
144
+
145
+ # Call Claude via Amazon Bedrock
146
+ response = self.bedrock.invoke_model(
147
+ modelId="anthropic.claude-3-sonnet-20240229-v1:0",
148
+ body=json.dumps({
149
+ "anthropic_version": "bedrock-2023-05-31",
150
+ "max_tokens": 1000,
151
+ "messages": [
152
+ {
153
+ "role": "user",
154
+ "content": prompt
155
+ }
156
+ ],
157
+ "temperature": 0.3
158
+ }),
159
+ contentType="application/json",
160
+ accept="application/json"
161
+ )
162
+
163
+ # Parse the response
164
+ response_body = json.loads(response['body'].read())
165
+ content = response_body.get('content', [])
166
+ llm_text = ''.join([item.get('text', '') for item in content if item.get('type') == 'text'])
167
+
168
+ # Extract JSON from the response
169
+ try:
170
+ analysis = json.loads(llm_text.strip())
171
+
172
+ # Store the analysis in the user patterns
173
+ if session_id:
174
+ timestamp = datetime.now().isoformat()
175
+ self.user_patterns[session_id]["llm_analyses"].append({
176
+ "timestamp": timestamp,
177
+ "transcript": transcript,
178
+ "analysis": analysis
179
+ })
180
+
181
+ # Convert LLM analysis to our pattern format
182
+ patterns = []
183
+ if "detailed_pattern_analysis" in analysis:
184
+ for issue in analysis["detailed_pattern_analysis"]:
185
+ if isinstance(issue, dict) and "type" in issue and "description" in issue:
186
+ patterns.append({
187
+ "type": "llm_" + issue["type"].lower().replace(" ", "_"),
188
+ "pattern": issue["description"],
189
+ "suggestion": issue.get("suggestion", "Consider working on this pattern")
190
+ })
191
+
192
+ return patterns
193
+
194
+ except json.JSONDecodeError:
195
+ print("LLM response did not contain valid JSON")
196
+ return []
197
+
198
+ except Exception as e:
199
+ print(f"Error during LLM analysis: {e}")
200
+ return []
201
 
202
  def get_coaching_context(self, session_id):
203
  """Get contextual coaching information for the Nova assistant"""
 
217
 
218
  if top_repetitions:
219
  coaching_context += "\n- Word repetitions: " + ", ".join([f"'{w}' ({c} times)" for w, c in top_repetitions])
220
+
221
+ # Add LLM insights if available
222
+ if patterns.get("llm_analyses") and len(patterns["llm_analyses"]) > 0:
223
+ latest_analysis = patterns["llm_analyses"][-1]["analysis"]
224
+ if "overall_assessment" in latest_analysis:
225
+ coaching_context += f"\n\nSpeech assessment: {latest_analysis['overall_assessment']}"
226
+
227
+ if "suggestions_for_improvement" in latest_analysis:
228
+ suggestions = latest_analysis["suggestions_for_improvement"]
229
+ if isinstance(suggestions, list) and len(suggestions) > 0:
230
+ coaching_context += "\n\nSuggested areas to work on:"
231
+ for idx, suggestion in enumerate(suggestions[:3]): # Limit to top 3 suggestions
232
+ if isinstance(suggestion, str):
233
+ coaching_context += f"\n- {suggestion}"
234
+ elif isinstance(suggestion, dict) and "text" in suggestion:
235
+ coaching_context += f"\n- {suggestion['text']}"
236
 
237
  # Add coaching instructions
238
  coaching_context += "\n\nCoaching approach:"
239
  coaching_context += "\n- Model clear speech without these patterns"
240
  coaching_context += "\n- Maintain natural conversation flow"
241
  coaching_context += "\n- Don't explicitly point out errors"
242
+ coaching_context += "\n- Rephrase user's sentences occasionally with correct patterns"
243
 
244
  return coaching_context
245
 
246
+ def get_coaching_report(self, session_id):
247
+ """Generate a coaching report for the user"""
248
+ if session_id not in self.user_patterns:
249
+ return "No speech data available for analysis."
250
+
251
+ patterns = self.user_patterns[session_id]
252
+
253
+ report = "## Speech Analysis Report\n\n"
254
+
255
+ # Basic statistics
256
+ report += "### Conversation Statistics\n"
257
+ report += f"- Total words spoken: {patterns['total_words']}\n"
258
+ report += f"- Total sentences: {patterns['total_sentences']}\n"
259
+
260
+ # Filler word usage
261
+ if patterns["filler_words"]:
262
+ report += "\n### Filler Word Usage\n"
263
+ for word, count in sorted(patterns["filler_words"].items(), key=lambda x: x[1], reverse=True):
264
+ percentage = (count / patterns['total_words']) * 100 if patterns['total_words'] > 0 else 0
265
+ report += f"- '{word}': {count} times ({percentage:.1f}% of total words)\n"
266
+
267
+ # Word repetitions
268
+ if patterns["repetition"]:
269
+ report += "\n### Word Repetitions\n"
270
+ for word, count in sorted(patterns["repetition"].items(), key=lambda x: x[1], reverse=True)[:5]:
271
+ report += f"- '{word}': repeated {count} times\n"
272
+
273
+ # LLM insights
274
+ if patterns.get("llm_analyses") and len(patterns["llm_analyses"]) > 0:
275
+ report += "\n### Language Model Analysis\n"
276
+
277
+ # Get the latest analysis
278
+ latest = patterns["llm_analyses"][-1]["analysis"]
279
+
280
+ if "overall_assessment" in latest:
281
+ report += f"{latest['overall_assessment']}\n\n"
282
+
283
+ if "detailed_pattern_analysis" in latest and isinstance(latest["detailed_pattern_analysis"], list):
284
+ report += "#### Detailed Pattern Analysis\n"
285
+ for item in latest["detailed_pattern_analysis"]:
286
+ if isinstance(item, dict):
287
+ if "type" in item and "description" in item:
288
+ report += f"- **{item['type']}**: {item['description']}\n"
289
+
290
+ if "suggestions_for_improvement" in latest and isinstance(latest["suggestions_for_improvement"], list):
291
+ report += "\n#### Suggestions for Improvement\n"
292
+ for item in latest["suggestions_for_improvement"]:
293
+ if isinstance(item, str):
294
+ report += f"- {item}\n"
295
+ elif isinstance(item, dict) and "text" in item:
296
+ report += f"- {item['text']}\n"
297
+
298
+ return report
299
+
300
  def _save_analysis(self, session_id):
301
  """Save the speech analysis for this session"""
302
  analysis_file = os.path.join(self.analysis_dir, f"{session_id}_analysis.json")
nova_sonic.py CHANGED
@@ -7,49 +7,99 @@ from config import MODEL_ID, REGION, INPUT_SAMPLE_RATE, BIT_DEPTH, CHANNELS, GRE
7
  class NovaSonicClient:
8
  def __init__(self):
9
  self.client = boto3.client('bedrock-runtime', region_name=REGION)
10
- self.model_id = MODEL_ID
11
- self.audio_content_type = f"application/vnd.amazon.models.audio.lpcm;sample-rate={INPUT_SAMPLE_RATE};bit-depth={BIT_DEPTH};channels={CHANNELS}"
12
  self.stream = None
13
- self.buffer = b""
14
- self.transcripts = []
15
 
16
  def start_stream(self, session_manager=None, language_coach=None):
17
- """Initialize a bidirectional stream with Nova"""
18
  try:
 
19
  self.stream = self.client.invoke_model_with_bidirectional_stream(
20
  modelId=self.model_id,
21
- contentType=self.audio_content_type,
22
- accept="application/json"
23
  )
24
 
25
- print("Connected to Nova Sonic")
 
26
 
27
- # Send a system message to set up Nova's role
28
  self._send_system_message(session_manager, language_coach)
29
 
30
- # Send initial greeting to start the conversation
31
  greeting_result = self.send_text(GREETING_TEXT)
32
 
33
  if session_manager and greeting_result:
34
  session_manager.set_last_response(greeting_result['audio'])
35
 
 
 
36
  return greeting_result
37
-
38
  except Exception as e:
39
  print(f"Error starting Nova stream: {e}")
40
  return None
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  def send_audio(self, audio_chunk):
43
- """Send an audio chunk to Nova and get response"""
44
- if not self.stream:
45
- print("Stream not initialized. Call start_stream() first.")
46
  return None
47
 
48
  try:
49
- # Send the audio chunk
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  self.stream.send_chunk({
51
  "chunk": {
52
- "bytes": audio_chunk
53
  }
54
  })
55
 
@@ -57,24 +107,29 @@ class NovaSonicClient:
57
  audio_data = b""
58
  transcript = ""
59
 
60
- response_ready = False
61
- while not response_ready:
 
62
  chunk = self.stream.read_chunk()
63
  if "chunk" in chunk:
64
- payload = json.loads(chunk["chunk"]["bytes"].decode())
65
-
66
- # Extract audio if present
67
- if "output_audio" in payload:
68
- audio_base64 = payload["output_audio"]
69
- audio_data += base64.b64decode(audio_base64)
70
-
71
- # Extract transcript if present
72
- if "transcript" in payload:
73
- transcript = payload["transcript"]
74
-
75
- # Check if this is the last chunk
76
- if "is_final" in payload and payload["is_final"]:
77
- response_ready = True
 
 
 
 
78
 
79
  return {
80
  "audio": audio_data,
@@ -87,20 +142,25 @@ class NovaSonicClient:
87
 
88
  def send_text(self, text_message):
89
  """Send a text message instead of audio and get response"""
90
- if not self.stream:
91
- print("Stream not initialized. Call start_stream() first.")
92
  return None
93
 
94
  try:
95
- # Create the text message payload
96
- message_payload = {
97
- "message": text_message
 
 
 
 
 
98
  }
99
 
100
- # Send the message
101
  self.stream.send_chunk({
102
  "chunk": {
103
- "bytes": json.dumps(message_payload).encode()
104
  }
105
  })
106
 
@@ -108,24 +168,29 @@ class NovaSonicClient:
108
  audio_data = b""
109
  transcript = ""
110
 
111
- response_ready = False
112
- while not response_ready:
 
113
  chunk = self.stream.read_chunk()
114
  if "chunk" in chunk:
115
- payload = json.loads(chunk["chunk"]["bytes"].decode())
116
-
117
- # Extract audio if present
118
- if "output_audio" in payload:
119
- audio_base64 = payload["output_audio"]
120
- audio_data += base64.b64decode(audio_base64)
121
-
122
- # Extract transcript if present
123
- if "response" in payload:
124
- transcript = payload["response"]
125
-
126
- # Check if this is the last chunk
127
- if "is_final" in payload and payload["is_final"]:
128
- response_ready = True
 
 
 
 
129
 
130
  return {
131
  "audio": audio_data,
@@ -139,35 +204,41 @@ class NovaSonicClient:
139
  def _send_system_message(self, session_manager=None, language_coach=None):
140
  """Send a system message to configure Nova's behavior"""
141
  # Build system message with context from session and language coach
142
- system_message = {
143
- "role": "system",
144
- "content": (
145
- "You are Nova, a friendly and supportive conversation partner. "
146
- "Your goal is to engage in natural, flowing conversation that feels human and authentic. "
147
- "Respond thoughtfully and maintain the conversation context. "
148
- "Keep your responses concise and conversational. "
149
- "Ask open-ended questions to encourage the user to practice speaking. "
150
- "Never mention that you are an AI unless explicitly asked."
151
- )
152
- }
153
 
154
  # Add conversation history context if available
155
  if session_manager:
156
  conversation_context = session_manager.get_conversation_context()
157
  if conversation_context:
158
- system_message["content"] += f"\n\nConversation history:\n{conversation_context}"
159
 
160
  # Add language coaching context if available
161
  if language_coach and session_manager:
162
  coaching_context = language_coach.get_coaching_context(session_manager.session_id)
163
  if coaching_context:
164
- system_message["content"] += f"\n\n{coaching_context}"
 
 
 
 
 
 
 
 
 
165
 
166
- # Send the system message
167
  try:
 
168
  self.stream.send_chunk({
169
  "chunk": {
170
- "bytes": json.dumps(system_message).encode()
171
  }
172
  })
173
 
@@ -181,7 +252,24 @@ class NovaSonicClient:
181
  """Close the streaming connection"""
182
  if self.stream:
183
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
184
  self.stream.done()
 
185
  print("Nova stream closed")
 
186
  except Exception as e:
187
  print(f"Error closing stream: {e}")
 
 
7
  class NovaSonicClient:
8
  def __init__(self):
9
  self.client = boto3.client('bedrock-runtime', region_name=REGION)
10
+ self.model_id = "amazon.nova-sonic-v1:0" # Use the official Nova Sonic model ID
 
11
  self.stream = None
12
+ self.session_active = False
 
13
 
14
  def start_stream(self, session_manager=None, language_coach=None):
15
+ """Initialize a bidirectional stream with Nova Sonic"""
16
  try:
17
+ # Initialize the stream
18
  self.stream = self.client.invoke_model_with_bidirectional_stream(
19
  modelId=self.model_id,
20
+ contentType="application/vnd.amazon.eventstream",
21
+ accept="application/vnd.amazon.eventstream"
22
  )
23
 
24
+ # Send session start event with configuration
25
+ self._send_session_start()
26
 
27
+ # Send system message to configure Nova's behavior
28
  self._send_system_message(session_manager, language_coach)
29
 
30
+ # Send initial greeting as a text message
31
  greeting_result = self.send_text(GREETING_TEXT)
32
 
33
  if session_manager and greeting_result:
34
  session_manager.set_last_response(greeting_result['audio'])
35
 
36
+ self.session_active = True
37
+ print("Connected to Nova Sonic")
38
  return greeting_result
39
+
40
  except Exception as e:
41
  print(f"Error starting Nova stream: {e}")
42
  return None
43
 
44
+ def _send_session_start(self):
45
+ """Send session start event to initialize the stream"""
46
+ start_session_event = {
47
+ "event": {
48
+ "sessionStart": {
49
+ "inferenceConfiguration": {
50
+ "maxTokens": 1024,
51
+ "topP": 0.9,
52
+ "temperature": 0.7
53
+ },
54
+ "audioOutputConfiguration": {
55
+ "sampleRateHertz": 24000,
56
+ "sampleSizeBits": 16,
57
+ "channelCount": 1,
58
+ "encoding": "base64",
59
+ "audioType": "SPEECH"
60
+ }
61
+ }
62
+ }
63
+ }
64
+
65
+ try:
66
+ # Send the event as bytes
67
+ self.stream.send_chunk({
68
+ "chunk": {
69
+ "bytes": json.dumps(start_session_event).encode('utf-8')
70
+ }
71
+ })
72
+
73
+ # Wait for acknowledgment
74
+ time.sleep(0.5)
75
+
76
+ except Exception as e:
77
+ print(f"Error sending session start: {e}")
78
+
79
  def send_audio(self, audio_chunk):
80
+ """Send an audio chunk to Nova Sonic and get response"""
81
+ if not self.stream or not self.session_active:
82
+ print("Stream not initialized or session not active")
83
  return None
84
 
85
  try:
86
+ # Encode the audio as base64
87
+ audio_base64 = base64.b64encode(audio_chunk).decode('utf-8')
88
+
89
+ # Create the audio event
90
+ audio_event = {
91
+ "event": {
92
+ "audioInput": {
93
+ "data": audio_base64,
94
+ "endpointResponse": True
95
+ }
96
+ }
97
+ }
98
+
99
+ # Send the audio event
100
  self.stream.send_chunk({
101
  "chunk": {
102
+ "bytes": json.dumps(audio_event).encode('utf-8')
103
  }
104
  })
105
 
 
107
  audio_data = b""
108
  transcript = ""
109
 
110
+ # Process response chunks until we get the final one
111
+ response_complete = False
112
+ while not response_complete:
113
  chunk = self.stream.read_chunk()
114
  if "chunk" in chunk:
115
+ try:
116
+ payload = json.loads(chunk["chunk"]["bytes"].decode('utf-8'))
117
+
118
+ # Check for audio output
119
+ if "event" in payload and "audioOutput" in payload["event"]:
120
+ audio_base64 = payload["event"]["audioOutput"]["data"]
121
+ audio_data += base64.b64decode(audio_base64)
122
+
123
+ # Check for transcript
124
+ if "event" in payload and "transcript" in payload["event"]:
125
+ transcript = payload["event"]["transcript"]["text"]
126
+
127
+ # Check if this is the end of the response
128
+ if "event" in payload and "endpointResponse" in payload["event"]:
129
+ response_complete = True
130
+
131
+ except Exception as e:
132
+ print(f"Error processing response chunk: {e}")
133
 
134
  return {
135
  "audio": audio_data,
 
142
 
143
  def send_text(self, text_message):
144
  """Send a text message instead of audio and get response"""
145
+ if not self.stream or not self.session_active:
146
+ print("Stream not initialized or session not active")
147
  return None
148
 
149
  try:
150
+ # Create the text input event
151
+ text_event = {
152
+ "event": {
153
+ "textInput": {
154
+ "text": text_message,
155
+ "endpointResponse": True
156
+ }
157
+ }
158
  }
159
 
160
+ # Send the text event
161
  self.stream.send_chunk({
162
  "chunk": {
163
+ "bytes": json.dumps(text_event).encode('utf-8')
164
  }
165
  })
166
 
 
168
  audio_data = b""
169
  transcript = ""
170
 
171
+ # Process response chunks until we get the final one
172
+ response_complete = False
173
+ while not response_complete:
174
  chunk = self.stream.read_chunk()
175
  if "chunk" in chunk:
176
+ try:
177
+ payload = json.loads(chunk["chunk"]["bytes"].decode('utf-8'))
178
+
179
+ # Check for audio output
180
+ if "event" in payload and "audioOutput" in payload["event"]:
181
+ audio_base64 = payload["event"]["audioOutput"]["data"]
182
+ audio_data += base64.b64decode(audio_base64)
183
+
184
+ # Check for transcript
185
+ if "event" in payload and "transcript" in payload["event"]:
186
+ transcript = payload["event"]["transcript"]["text"]
187
+
188
+ # Check if this is the end of the response
189
+ if "event" in payload and "endpointResponse" in payload["event"]:
190
+ response_complete = True
191
+
192
+ except Exception as e:
193
+ print(f"Error processing response chunk: {e}")
194
 
195
  return {
196
  "audio": audio_data,
 
204
  def _send_system_message(self, session_manager=None, language_coach=None):
205
  """Send a system message to configure Nova's behavior"""
206
  # Build system message with context from session and language coach
207
+ system_content = (
208
+ "You are Nova, a friendly and supportive conversation partner. "
209
+ "Your goal is to engage in natural, flowing conversation that feels human and authentic. "
210
+ "Respond thoughtfully and maintain the conversation context. "
211
+ "Keep your responses concise and conversational. "
212
+ "Ask open-ended questions to encourage the user to practice speaking. "
213
+ "Never mention that you are an AI unless explicitly asked."
214
+ )
 
 
 
215
 
216
  # Add conversation history context if available
217
  if session_manager:
218
  conversation_context = session_manager.get_conversation_context()
219
  if conversation_context:
220
+ system_content += f"\n\nConversation history:\n{conversation_context}"
221
 
222
  # Add language coaching context if available
223
  if language_coach and session_manager:
224
  coaching_context = language_coach.get_coaching_context(session_manager.session_id)
225
  if coaching_context:
226
+ system_content += f"\n\n{coaching_context}"
227
+
228
+ # Create the system event
229
+ system_event = {
230
+ "event": {
231
+ "systemInput": {
232
+ "content": system_content
233
+ }
234
+ }
235
+ }
236
 
 
237
  try:
238
+ # Send the system event
239
  self.stream.send_chunk({
240
  "chunk": {
241
+ "bytes": json.dumps(system_event).encode('utf-8')
242
  }
243
  })
244
 
 
252
  """Close the streaming connection"""
253
  if self.stream:
254
  try:
255
+ # Send session end event
256
+ end_event = {
257
+ "event": {
258
+ "sessionEnd": {}
259
+ }
260
+ }
261
+
262
+ self.stream.send_chunk({
263
+ "chunk": {
264
+ "bytes": json.dumps(end_event).encode('utf-8')
265
+ }
266
+ })
267
+
268
+ # Close the stream
269
  self.stream.done()
270
+ self.session_active = False
271
  print("Nova stream closed")
272
+
273
  except Exception as e:
274
  print(f"Error closing stream: {e}")
275
+ EOF < /dev/null
nova_sonic_tool_use.py ADDED
@@ -0,0 +1,882 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import asyncio
3
+ import base64
4
+ import json
5
+ import uuid
6
+ import warnings
7
+ import pyaudio
8
+ import pytz
9
+ import random
10
+ import hashlib
11
+ import datetime
12
+ import time
13
+ import inspect
14
+ from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput
15
+ from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart
16
+ from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme
17
+ from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver
18
+
19
+ # Suppress warnings
20
+ warnings.filterwarnings("ignore")
21
+
22
+ # Audio configuration
23
+ INPUT_SAMPLE_RATE = 16000
24
+ OUTPUT_SAMPLE_RATE = 24000
25
+ CHANNELS = 1
26
+ FORMAT = pyaudio.paInt16
27
+ CHUNK_SIZE = 1024 # Number of frames per buffer
28
+
29
+ # Debug mode flag
30
+ DEBUG = False
31
+
32
+ def debug_print(message):
33
+ """Print only if debug mode is enabled"""
34
+ if DEBUG:
35
+ functionName = inspect.stack()[1].function
36
+ if functionName == 'time_it' or functionName == 'time_it_async':
37
+ functionName = inspect.stack()[2].function
38
+ print('{:%Y-%m-%d %H:%M:%S.%f}'.format(datetime.datetime.now())[:-3] + ' ' + functionName + ' ' + message)
39
+
40
+ def time_it(label, methodToRun):
41
+ start_time = time.perf_counter()
42
+ result = methodToRun()
43
+ end_time = time.perf_counter()
44
+ debug_print(f"Execution time for {label}: {end_time - start_time:.4f} seconds")
45
+ return result
46
+
47
+ async def time_it_async(label, methodToRun):
48
+ start_time = time.perf_counter()
49
+ result = await methodToRun()
50
+ end_time = time.perf_counter()
51
+ debug_print(f"Execution time for {label}: {end_time - start_time:.4f} seconds")
52
+ return result
53
+
54
+ class BedrockStreamManager:
55
+ """Manages bidirectional streaming with AWS Bedrock using asyncio"""
56
+
57
+ # Event templates
58
+ START_SESSION_EVENT = '''{
59
+ "event": {
60
+ "sessionStart": {
61
+ "inferenceConfiguration": {
62
+ "maxTokens": 1024,
63
+ "topP": 0.9,
64
+ "temperature": 0.7
65
+ }
66
+ }
67
+ }
68
+ }'''
69
+
70
+ CONTENT_START_EVENT = '''{
71
+ "event": {
72
+ "contentStart": {
73
+ "promptName": "%s",
74
+ "contentName": "%s",
75
+ "type": "AUDIO",
76
+ "interactive": true,
77
+ "role": "USER",
78
+ "audioInputConfiguration": {
79
+ "mediaType": "audio/lpcm",
80
+ "sampleRateHertz": 16000,
81
+ "sampleSizeBits": 16,
82
+ "channelCount": 1,
83
+ "audioType": "SPEECH",
84
+ "encoding": "base64"
85
+ }
86
+ }
87
+ }
88
+ }'''
89
+
90
+ AUDIO_EVENT_TEMPLATE = '''{
91
+ "event": {
92
+ "audioInput": {
93
+ "promptName": "%s",
94
+ "contentName": "%s",
95
+ "content": "%s"
96
+ }
97
+ }
98
+ }'''
99
+
100
+ TEXT_CONTENT_START_EVENT = '''{
101
+ "event": {
102
+ "contentStart": {
103
+ "promptName": "%s",
104
+ "contentName": "%s",
105
+ "type": "TEXT",
106
+ "role": "%s",
107
+ "interactive": true,
108
+ "textInputConfiguration": {
109
+ "mediaType": "text/plain"
110
+ }
111
+ }
112
+ }
113
+ }'''
114
+
115
+ TEXT_INPUT_EVENT = '''{
116
+ "event": {
117
+ "textInput": {
118
+ "promptName": "%s",
119
+ "contentName": "%s",
120
+ "content": "%s"
121
+ }
122
+ }
123
+ }'''
124
+
125
+ TOOL_CONTENT_START_EVENT = '''{
126
+ "event": {
127
+ "contentStart": {
128
+ "promptName": "%s",
129
+ "contentName": "%s",
130
+ "interactive": false,
131
+ "type": "TOOL",
132
+ "role": "TOOL",
133
+ "toolResultInputConfiguration": {
134
+ "toolUseId": "%s",
135
+ "type": "TEXT",
136
+ "textInputConfiguration": {
137
+ "mediaType": "text/plain"
138
+ }
139
+ }
140
+ }
141
+ }
142
+ }'''
143
+
144
+ CONTENT_END_EVENT = '''{
145
+ "event": {
146
+ "contentEnd": {
147
+ "promptName": "%s",
148
+ "contentName": "%s"
149
+ }
150
+ }
151
+ }'''
152
+
153
+ PROMPT_END_EVENT = '''{
154
+ "event": {
155
+ "promptEnd": {
156
+ "promptName": "%s"
157
+ }
158
+ }
159
+ }'''
160
+
161
+ SESSION_END_EVENT = '''{
162
+ "event": {
163
+ "sessionEnd": {}
164
+ }
165
+ }'''
166
+
167
+ def start_prompt(self):
168
+ """Create a promptStart event"""
169
+ get_default_tool_schema = json.dumps({
170
+ "type": "object",
171
+ "properties": {},
172
+ "required": []
173
+ })
174
+
175
+ get_order_tracking_schema = json.dumps({
176
+ "type": "object",
177
+ "properties": {
178
+ "orderId": {
179
+ "type": "string",
180
+ "description": "The order number or ID to track"
181
+ },
182
+ "requestNotifications": {
183
+ "type": "boolean",
184
+ "description": "Whether to set up notifications for this order",
185
+ "default": False
186
+ }
187
+ },
188
+ "required": ["orderId"]
189
+ })
190
+
191
+
192
+ prompt_start_event = {
193
+ "event": {
194
+ "promptStart": {
195
+ "promptName": self.prompt_name,
196
+ "textOutputConfiguration": {
197
+ "mediaType": "text/plain"
198
+ },
199
+ "audioOutputConfiguration": {
200
+ "mediaType": "audio/lpcm",
201
+ "sampleRateHertz": 24000,
202
+ "sampleSizeBits": 16,
203
+ "channelCount": 1,
204
+ "voiceId": "matthew",
205
+ "encoding": "base64",
206
+ "audioType": "SPEECH"
207
+ },
208
+ "toolUseOutputConfiguration": {
209
+ "mediaType": "application/json"
210
+ },
211
+ "toolConfiguration": {
212
+ "tools": [
213
+ {
214
+ "toolSpec": {
215
+ "name": "getDateAndTimeTool",
216
+ "description": "get information about the current date and time",
217
+ "inputSchema": {
218
+ "json": get_default_tool_schema
219
+ }
220
+ }
221
+ },
222
+ {
223
+ "toolSpec": {
224
+ "name": "trackOrderTool",
225
+ "description": "Retrieves real-time order tracking information and detailed status updates for customer orders by order ID. Provides estimated delivery dates. Use this tool when customers ask about their order status or delivery timeline.",
226
+ "inputSchema": {
227
+ "json": get_order_tracking_schema
228
+ }
229
+ }
230
+ }
231
+ ]
232
+ }
233
+ }
234
+ }
235
+ }
236
+
237
+ return json.dumps(prompt_start_event)
238
+
239
+ def tool_result_event(self, content_name, content, role):
240
+ """Create a tool result event"""
241
+
242
+ if isinstance(content, dict):
243
+ content_json_string = json.dumps(content)
244
+ else:
245
+ content_json_string = content
246
+
247
+ tool_result_event = {
248
+ "event": {
249
+ "toolResult": {
250
+ "promptName": self.prompt_name,
251
+ "contentName": content_name,
252
+ "content": content_json_string
253
+ }
254
+ }
255
+ }
256
+ return json.dumps(tool_result_event)
257
+
258
    def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'):
        """Initialize the stream manager.

        Args:
            model_id: Bedrock model identifier to invoke.
            region: AWS region for the Bedrock runtime endpoint.
        """
        self.model_id = model_id
        self.region = region

        # Replace RxPy subjects with asyncio queues:
        # mic chunks in, synthesized PCM out, parsed events out.
        self.audio_input_queue = asyncio.Queue()
        self.audio_output_queue = asyncio.Queue()
        self.output_queue = asyncio.Queue()

        # Stream/session state; client is created lazily in _initialize_client.
        self.response_task = None
        self.stream_response = None
        self.is_active = False
        self.barge_in = False
        self.bedrock_client = None

        # Audio playback components (set up by the audio streamer, if used)
        self.audio_player = None

        # Text response components: whether ASSISTANT text should be printed
        # (speculative content only) and the current speaker role.
        self.display_assistant_text = False
        self.role = None

        # Session information: unique ids for this session's prompt and
        # content blocks.
        self.prompt_name = str(uuid.uuid4())
        self.content_name = str(uuid.uuid4())
        self.audio_content_name = str(uuid.uuid4())

        # Most recent toolUse event details, filled by _process_responses.
        self.toolUseContent = ""
        self.toolUseId = ""
        self.toolName = ""
288
+
289
    def _initialize_client(self):
        """Initialize the Bedrock runtime client.

        Uses SigV4 auth with credentials resolved from the environment
        (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY).
        """
        config = Config(
            endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com",
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
            http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()}
        )
        self.bedrock_client = BedrockRuntimeClient(config=config)
299
+
300
    async def initialize_stream(self):
        """Open the bidirectional stream and send the session bootstrap events.

        Sends, in order: sessionStart, promptStart, then a SYSTEM text content
        block carrying the system prompt. Spawns the response-listener and
        audio-input background tasks.

        Returns:
            self, once the stream is ready.

        Raises:
            Exception: re-raised after marking the stream inactive if any
                bootstrap step fails.
        """
        if not self.bedrock_client:
            self._initialize_client()

        try:
            self.stream_response = await time_it_async("invoke_model_with_bidirectional_stream", lambda : self.bedrock_client.invoke_model_with_bidirectional_stream( InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id)))
            self.is_active = True
            # NOTE(review): the two concatenated literals join without a space
            # ("...conversation.When reading...") — confirm that is intended.
            default_system_prompt = "You are a friend. The user and you will engage in a spoken dialog exchanging the transcripts of a natural real-time conversation." \
                "When reading order numbers, please read each digit individually, separated by pauses. For example, order #1234 should be read as 'order number one-two-three-four' rather than 'order number one thousand two hundred thirty-four'."

            # Send initialization events: session start, prompt start, then
            # the SYSTEM text content block (start / text / end).
            prompt_event = self.start_prompt()
            text_content_start = self.TEXT_CONTENT_START_EVENT % (self.prompt_name, self.content_name, "SYSTEM")
            text_content = self.TEXT_INPUT_EVENT % (self.prompt_name, self.content_name, default_system_prompt)
            text_content_end = self.CONTENT_END_EVENT % (self.prompt_name, self.content_name)

            init_events = [self.START_SESSION_EVENT, prompt_event, text_content_start, text_content, text_content_end]

            for event in init_events:
                await self.send_raw_event(event)
                # Small delay between init events
                await asyncio.sleep(0.1)

            # Start listening for responses
            self.response_task = asyncio.create_task(self._process_responses())

            # Start processing audio input
            asyncio.create_task(self._process_audio_input())

            # Wait a bit to ensure everything is set up
            await asyncio.sleep(0.1)

            debug_print("Stream initialized successfully")
            return self
        except Exception as e:
            self.is_active = False
            print(f"Failed to initialize stream: {str(e)}")
            raise
339
+
340
    async def send_raw_event(self, event_json):
        """Send a raw event JSON string to the Bedrock stream.

        Silently returns if the stream is not initialized or no longer active;
        send failures are logged (with traceback when DEBUG) rather than raised.

        Args:
            event_json: complete event as a JSON-encoded string.
        """
        if not self.stream_response or not self.is_active:
            debug_print("Stream not initialized or closed")
            return

        event = InvokeModelWithBidirectionalStreamInputChunk(
            value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8'))
        )

        try:
            await self.stream_response.input_stream.send(event)
            # For debugging large events (e.g. base64 audio), log just the
            # event type instead of the full payload.
            if DEBUG:
                if len(event_json) > 200:
                    event_type = json.loads(event_json).get("event", {}).keys()
                    debug_print(f"Sent event type: {list(event_type)}")
                else:
                    debug_print(f"Sent event: {event_json}")
        except Exception as e:
            debug_print(f"Error sending event: {str(e)}")
            if DEBUG:
                import traceback
                traceback.print_exc()
364
+
365
    async def send_audio_content_start_event(self):
        """Open the audio content block that subsequent audioInput events join."""
        content_start_event = self.CONTENT_START_EVENT % (self.prompt_name, self.audio_content_name)
        await self.send_raw_event(content_start_event)
369
+
370
    async def _process_audio_input(self):
        """Background task: base64-encode queued mic chunks and send them.

        Runs until the stream goes inactive or the task is cancelled; per-chunk
        errors are logged and the loop continues.
        """
        while self.is_active:
            try:
                # Get audio data from the queue (blocks until a chunk arrives)
                data = await self.audio_input_queue.get()

                audio_bytes = data.get('audio_bytes')
                if not audio_bytes:
                    debug_print("No audio bytes received")
                    continue

                # Base64 encode the audio data for the JSON audioInput event
                blob = base64.b64encode(audio_bytes)
                audio_event = self.AUDIO_EVENT_TEMPLATE % (
                    self.prompt_name,
                    self.audio_content_name,
                    blob.decode('utf-8')
                )

                # Send the event
                await self.send_raw_event(audio_event)

            except asyncio.CancelledError:
                break
            except Exception as e:
                debug_print(f"Error processing audio: {e}")
                if DEBUG:
                    import traceback
                    traceback.print_exc()
400
+
401
+ def add_audio_chunk(self, audio_bytes):
402
+ """Add an audio chunk to the queue."""
403
+ self.audio_input_queue.put_nowait({
404
+ 'audio_bytes': audio_bytes,
405
+ 'prompt_name': self.prompt_name,
406
+ 'content_name': self.audio_content_name
407
+ })
408
+
409
    async def send_audio_content_end_event(self):
        """Close the audio content block (no-op if the stream is inactive)."""
        if not self.is_active:
            debug_print("Stream is not active")
            return

        content_end_event = self.CONTENT_END_EVENT % (self.prompt_name, self.audio_content_name)
        await self.send_raw_event(content_end_event)
        debug_print("Audio ended")
418
+
419
    async def send_tool_start_event(self, content_name):
        """Open a TOOL content block tied to the pending self.toolUseId."""
        content_start_event = self.TOOL_CONTENT_START_EVENT % (self.prompt_name, content_name, self.toolUseId)
        debug_print(f"Sending tool start event: {content_start_event}")
        await self.send_raw_event(content_start_event)
424
+
425
    async def send_tool_result_event(self, content_name, tool_result):
        """Send the result from processToolUse inside the open TOOL block."""
        tool_result_event = self.tool_result_event(content_name=content_name, content=tool_result, role="TOOL")
        debug_print(f"Sending tool result event: {tool_result_event}")
        await self.send_raw_event(tool_result_event)
431
+
432
    async def send_tool_content_end_event(self, content_name):
        """Close the TOOL content block."""
        tool_content_end_event = self.CONTENT_END_EVENT % (self.prompt_name, content_name)
        debug_print(f"Sending tool content event: {tool_content_end_event}")
        await self.send_raw_event(tool_content_end_event)
437
+
438
    async def send_prompt_end_event(self):
        """Send promptEnd for this session's prompt (no-op if inactive)."""
        if not self.is_active:
            debug_print("Stream is not active")
            return

        prompt_end_event = self.PROMPT_END_EVENT % (self.prompt_name)
        await self.send_raw_event(prompt_end_event)
        debug_print("Prompt ended")
447
+
448
    async def send_session_end_event(self):
        """Send sessionEnd and mark the stream inactive (no-op if already inactive)."""
        if not self.is_active:
            debug_print("Stream is not active")
            return

        await self.send_raw_event(self.SESSION_END_EVENT)
        self.is_active = False
        debug_print("Session ended")
457
+
458
    async def _process_responses(self):
        """Consume events from the Bedrock stream and dispatch them.

        Handles: contentStart (tracks speaker role and whether speculative
        ASSISTANT text should be displayed), textOutput (transcripts and
        barge-in detection), audioOutput (base64 PCM -> playback queue),
        toolUse (records tool details), TOOL-type contentEnd (runs the tool
        and sends its result back), and completionEnd. Parsed events are also
        forwarded to self.output_queue for other components. Marks the stream
        inactive on exit.
        """
        try:
            while self.is_active:
                try:
                    output = await self.stream_response.await_output()
                    result = await output[1].receive()
                    if result.value and result.value.bytes_:
                        try:
                            response_data = result.value.bytes_.decode('utf-8')
                            json_data = json.loads(response_data)

                            # Handle different response types
                            if 'event' in json_data:
                                if 'contentStart' in json_data['event']:
                                    debug_print("Content start detected")
                                    content_start = json_data['event']['contentStart']
                                    # set role for subsequent textOutput events
                                    self.role = content_start['role']
                                    # Check for speculative content — only that
                                    # is echoed to the console for ASSISTANT.
                                    if 'additionalModelFields' in content_start:
                                        try:
                                            additional_fields = json.loads(content_start['additionalModelFields'])
                                            if additional_fields.get('generationStage') == 'SPECULATIVE':
                                                debug_print("Speculative content detected")
                                                self.display_assistant_text = True
                                            else:
                                                self.display_assistant_text = False
                                        except json.JSONDecodeError:
                                            debug_print("Error parsing additionalModelFields")
                                elif 'textOutput' in json_data['event']:
                                    text_content = json_data['event']['textOutput']['content']
                                    # NOTE(review): local 'role' is read from the
                                    # event but unused; display is driven by
                                    # self.role set at contentStart — confirm.
                                    role = json_data['event']['textOutput']['role']
                                    # Check if there is a barge-in (user spoke over output)
                                    if '{ "interrupted" : true }' in text_content:
                                        debug_print("Barge-in detected. Stopping audio output.")
                                        self.barge_in = True

                                    if (self.role == "ASSISTANT" and self.display_assistant_text):
                                        print(f"Assistant: {text_content}")
                                    elif (self.role == "USER"):
                                        print(f"User: {text_content}")

                                elif 'audioOutput' in json_data['event']:
                                    audio_content = json_data['event']['audioOutput']['content']
                                    audio_bytes = base64.b64decode(audio_content)
                                    await self.audio_output_queue.put(audio_bytes)
                                elif 'toolUse' in json_data['event']:
                                    # Record tool details; execution happens when
                                    # the matching TOOL contentEnd arrives below.
                                    self.toolUseContent = json_data['event']['toolUse']
                                    self.toolName = json_data['event']['toolUse']['toolName']
                                    self.toolUseId = json_data['event']['toolUse']['toolUseId']
                                    debug_print(f"Tool use detected: {self.toolName}, ID: {self.toolUseId}")
                                elif 'contentEnd' in json_data['event'] and json_data['event'].get('contentEnd', {}).get('type') == 'TOOL':
                                    debug_print("Processing tool use and sending result")
                                    toolResult = await self.processToolUse(self.toolName, self.toolUseContent)
                                    toolContent = str(uuid.uuid4())
                                    await self.send_tool_start_event(toolContent)
                                    await self.send_tool_result_event(toolContent, toolResult)
                                    await self.send_tool_content_end_event(toolContent)

                                elif 'completionEnd' in json_data['event']:
                                    # Handle end of conversation, no more response will be generated
                                    print("End of response sequence")

                            # Put the response in the output queue for other components
                            await self.output_queue.put(json_data)
                        except json.JSONDecodeError:
                            await self.output_queue.put({"raw_data": response_data})
                except StopAsyncIteration:
                    # Stream has ended
                    break
                except Exception as e:
                    # Handle ValidationException properly
                    if "ValidationException" in str(e):
                        error_message = str(e)
                        print(f"Validation error: {error_message}")
                    else:
                        print(f"Error receiving response: {e}")
                    break

        except Exception as e:
            print(f"Response processing error: {e}")
        finally:
            self.is_active = False
542
+
543
+ async def processToolUse(self, toolName, toolUseContent):
544
+ """Return the tool result"""
545
+ tool = toolName.lower()
546
+ debug_print(f"Tool Use Content: {toolUseContent}")
547
+
548
+ if tool == "getdateandtimetool":
549
+ # Get current date in PST timezone
550
+ pst_timezone = pytz.timezone("America/Los_Angeles")
551
+ pst_date = datetime.datetime.now(pst_timezone)
552
+
553
+ return {
554
+ "formattedTime": pst_date.strftime("%I:%M %p"),
555
+ "date": pst_date.strftime("%Y-%m-%d"),
556
+ "year": pst_date.year,
557
+ "month": pst_date.month,
558
+ "day": pst_date.day,
559
+ "dayOfWeek": pst_date.strftime("%A").upper(),
560
+ "timezone": "PST"
561
+ }
562
+
563
+ elif tool == "trackordertool":
564
+
565
+ # Extract order ID from toolUseContent
566
+ content = toolUseContent.get("content", {})
567
+ content_data = json.loads(content)
568
+ order_id = content_data.get("orderId", "")
569
+ request_notifications = toolUseContent.get("requestNotifications", False)
570
+
571
+ # Convert order_id to string if it's an integer
572
+ if isinstance(order_id, int):
573
+ order_id = str(order_id)
574
+ # Validate order ID format
575
+ if not order_id or not isinstance(order_id, str):
576
+ return {
577
+ "error": "Invalid order ID format",
578
+ "orderStatus": "",
579
+ "estimatedDelivery": "",
580
+ "lastUpdate": ""
581
+ }
582
+
583
+ # Create deterministic randomness based on order ID
584
+ # This ensures the same order ID always returns the same status
585
+ seed = int(hashlib.md5(order_id.encode(), usedforsecurity=False).hexdigest(), 16) % 10000
586
+ random.seed(seed)
587
+
588
+ # Possible statuses with appropriate weights
589
+ statuses = [
590
+ "Order received",
591
+ "Processing",
592
+ "Preparing for shipment",
593
+ "Shipped",
594
+ "In transit",
595
+ "Out for delivery",
596
+ "Delivered",
597
+ "Delayed"
598
+ ]
599
+
600
+ weights = [10, 15, 15, 20, 20, 10, 5, 3]
601
+
602
+ # Select a status based on the weights
603
+ status = random.choices(statuses, weights=weights, k=1)[0]
604
+
605
+ # Generate a realistic estimated delivery date
606
+ today = datetime.datetime.now()
607
+ # Handle estimated delivery date based on status
608
+ if status == "Delivered":
609
+ # For delivered items, delivery date is in the past
610
+ delivery_days = -random.randint(0, 3)
611
+ estimated_delivery = (today + datetime.timedelta(days=delivery_days)).strftime("%Y-%m-%d")
612
+ elif status == "Out for delivery":
613
+ # For out for delivery, delivery is today
614
+ estimated_delivery = today.strftime("%Y-%m-%d")
615
+ else:
616
+ # For other statuses, delivery is in the future
617
+ delivery_days = random.randint(1, 10)
618
+ estimated_delivery = (today + datetime.timedelta(days=delivery_days)).strftime("%Y-%m-%d")
619
+
620
+ # Handle notification request if enabled
621
+ notification_message = ""
622
+ if request_notifications and status != "Delivered":
623
+ notification_message = f"You will receive notifications for order {order_id}"
624
+
625
+ # Return comprehensive tracking information
626
+ tracking_info = {
627
+ "orderStatus": status,
628
+ "orderNumber": order_id,
629
+ "notificationStatus": notification_message
630
+ }
631
+
632
+ # Add appropriate fields based on status
633
+ if status == "Delivered":
634
+ tracking_info["deliveredOn"] = estimated_delivery
635
+ elif status == "Out for delivery":
636
+ tracking_info["expectedDelivery"] = "Today"
637
+ else:
638
+ tracking_info["estimatedDelivery"] = estimated_delivery
639
+
640
+ # Add location information based on status
641
+ if status == "In transit":
642
+ tracking_info["currentLocation"] = "Distribution Center"
643
+ elif status == "Delivered":
644
+ tracking_info["deliveryLocation"] = "Front Door"
645
+
646
+ # Add additional info for delayed status
647
+ if status == "Delayed":
648
+ tracking_info["additionalInfo"] = "Weather delays possible"
649
+
650
+ return tracking_info
651
+
652
+ async def close(self):
653
+ """Close the stream properly."""
654
+ if not self.is_active:
655
+ return
656
+
657
+ self.is_active = False
658
+ if self.response_task and not self.response_task.done():
659
+ self.response_task.cancel()
660
+
661
+ await self.send_audio_content_end_event()
662
+ await self.send_prompt_end_event()
663
+ await self.send_session_end_event()
664
+
665
+ if self.stream_response:
666
+ await self.stream_response.input_stream.close()
667
+
668
class AudioStreamer:
    """Handles continuous microphone input and audio output using separate streams."""

    def __init__(self, stream_manager):
        """Open PyAudio input (callback-driven) and output (blocking-write) streams.

        Args:
            stream_manager: BedrockStreamManager that receives mic chunks and
                supplies synthesized audio via its audio_output_queue.
        """
        self.stream_manager = stream_manager
        self.is_streaming = False
        # Captured so the PyAudio callback thread can schedule coroutines
        # back onto this asyncio loop.
        self.loop = asyncio.get_event_loop()

        # Initialize PyAudio
        debug_print("AudioStreamer Initializing PyAudio...")
        self.p = time_it("AudioStreamerInitPyAudio", pyaudio.PyAudio)
        debug_print("AudioStreamer PyAudio initialized")

        # Initialize separate streams for input and output.
        # Input stream with callback for microphone capture.
        debug_print("Opening input audio stream...")
        self.input_stream = time_it("AudioStreamerOpenAudio", lambda : self.p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=INPUT_SAMPLE_RATE,
            input=True,
            frames_per_buffer=CHUNK_SIZE,
            stream_callback=self.input_callback
        ))
        debug_print("input audio stream opened")

        # Output stream for direct writing (no callback).
        debug_print("Opening output audio stream...")
        self.output_stream = time_it("AudioStreamerOpenAudio", lambda : self.p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=OUTPUT_SAMPLE_RATE,
            output=True,
            frames_per_buffer=CHUNK_SIZE
        ))

        debug_print("output audio stream opened")

    def input_callback(self, in_data, frame_count, time_info, status):
        """PyAudio callback (runs on PyAudio's own thread).

        Schedules async processing of the captured chunk on the asyncio loop
        and returns paContinue so capture keeps running.
        """
        if self.is_streaming and in_data:
            # Schedule the task in the event loop
            asyncio.run_coroutine_threadsafe(
                self.process_input_audio(in_data),
                self.loop
            )
        return (None, pyaudio.paContinue)

    async def process_input_audio(self, audio_data):
        """Forward a single mic chunk to the stream manager's input queue."""
        try:
            # Send audio to Bedrock immediately
            self.stream_manager.add_audio_chunk(audio_data)
        except Exception as e:
            if self.is_streaming:
                print(f"Error processing input audio: {e}")

    async def play_output_audio(self):
        """Play audio responses from Nova Sonic.

        Drains the stream manager's audio_output_queue and writes PCM to the
        speaker stream; honors barge-in by flushing queued audio, and writes
        in CHUNK_SIZE slices via an executor so blocking writes don't stall
        the event loop.
        """
        while self.is_streaming:
            try:
                # Check for barge-in flag
                if self.stream_manager.barge_in:
                    # Clear the audio queue so the assistant stops speaking
                    while not self.stream_manager.audio_output_queue.empty():
                        try:
                            self.stream_manager.audio_output_queue.get_nowait()
                        except asyncio.QueueEmpty:
                            break
                    self.stream_manager.barge_in = False
                    # Small sleep after clearing
                    await asyncio.sleep(0.05)
                    continue

                # Short timeout keeps the loop responsive to is_streaming
                # and barge_in changes.
                audio_data = await asyncio.wait_for(
                    self.stream_manager.audio_output_queue.get(),
                    timeout=0.1
                )

                if audio_data and self.is_streaming:
                    # Write directly to the output stream in smaller chunks
                    chunk_size = CHUNK_SIZE  # Use the same chunk size as the stream

                    # Write the audio data in chunks to avoid blocking too long
                    for i in range(0, len(audio_data), chunk_size):
                        if not self.is_streaming:
                            break

                        end = min(i + chunk_size, len(audio_data))
                        chunk = audio_data[i:end]

                        # Create a new function that captures the chunk by value
                        def write_chunk(data):
                            return self.output_stream.write(data)

                        # Blocking write runs in the default executor
                        await asyncio.get_event_loop().run_in_executor(None, write_chunk, chunk)

                        # Brief yield to allow other tasks to run
                        await asyncio.sleep(0.001)

            except asyncio.TimeoutError:
                # No data available within timeout, just continue
                continue
            except Exception as e:
                if self.is_streaming:
                    print(f"Error playing output audio: {str(e)}")
                    import traceback
                    traceback.print_exc()
                await asyncio.sleep(0.05)

    async def start_streaming(self):
        """Start streaming audio; blocks until the user presses Enter."""
        if self.is_streaming:
            return

        print("Starting audio streaming. Speak into your microphone...")
        print("Press Enter to stop streaming...")

        # Send audio content start event
        await time_it_async("send_audio_content_start_event", lambda : self.stream_manager.send_audio_content_start_event())

        self.is_streaming = True

        # Start the input stream if not already started
        if not self.input_stream.is_active():
            self.input_stream.start_stream()

        # Start processing tasks
        #self.input_task = asyncio.create_task(self.process_input_audio())
        self.output_task = asyncio.create_task(self.play_output_audio())

        # Wait for user to press Enter to stop (blocking stdin read runs in
        # the default executor so the loop keeps running)
        await asyncio.get_event_loop().run_in_executor(None, input)

        # Once input() returns, stop streaming
        await self.stop_streaming()

    async def stop_streaming(self):
        """Stop tasks, close PyAudio streams, and shut down the Bedrock session.

        Idempotent: guarded by is_streaming, so a second call is a no-op.
        """
        if not self.is_streaming:
            return

        self.is_streaming = False

        # Cancel the tasks. input_task only exists if it was started — it is
        # currently commented out in start_streaming, hence the hasattr guard.
        tasks = []
        if hasattr(self, 'input_task') and not self.input_task.done():
            tasks.append(self.input_task)
        if hasattr(self, 'output_task') and not self.output_task.done():
            tasks.append(self.output_task)
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        # Stop and close the streams
        if self.input_stream:
            if self.input_stream.is_active():
                self.input_stream.stop_stream()
            self.input_stream.close()
        if self.output_stream:
            if self.output_stream.is_active():
                self.output_stream.stop_stream()
            self.output_stream.close()
        if self.p:
            self.p.terminate()

        await self.stream_manager.close()
837
+
838
+
839
async def main(debug=False):
    """Main function to run the application.

    Wires a BedrockStreamManager to an AudioStreamer and streams until the
    user presses Enter (or Ctrl-C).

    Args:
        debug: when True, enables module-level DEBUG logging.
    """
    global DEBUG
    DEBUG = debug

    # Create stream manager
    stream_manager = BedrockStreamManager(model_id='amazon.nova-sonic-v1:0', region='us-east-1')

    # Create audio streamer
    audio_streamer = AudioStreamer(stream_manager)

    # Initialize the stream
    await time_it_async("initialize_stream", stream_manager.initialize_stream)

    try:
        # This will run until the user presses Enter
        await audio_streamer.start_streaming()

    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        # stop_streaming is guarded by is_streaming, so calling it again
        # here after a normal Enter-triggered stop is safe.
        await audio_streamer.stop_streaming()
862
+
863
+
864
+ if __name__ == "__main__":
865
+ import argparse
866
+
867
+ parser = argparse.ArgumentParser(description='Nova Sonic Python Streaming')
868
+ parser.add_argument('--debug', action='store_true', help='Enable debug mode')
869
+ args = parser.parse_args()
870
+ # Set your AWS credentials here or use environment variables
871
+ # os.environ['AWS_ACCESS_KEY_ID'] = "AWS_ACCESS_KEY_ID"
872
+ # os.environ['AWS_SECRET_ACCESS_KEY'] = "AWS_SECRET_ACCESS_KEY"
873
+ # os.environ['AWS_DEFAULT_REGION'] = "us-east-1"
874
+
875
+ # Run the main function
876
+ try:
877
+ asyncio.run(main(debug=args.debug))
878
+ except Exception as e:
879
+ print(f"Application error: {e}")
880
+ if args.debug:
881
+ import traceback
882
+ traceback.print_exc()