Spaces:
Sleeping
Sleeping
| import boto3 | |
| import base64 | |
| import json | |
| import time | |
| from config import MODEL_ID, REGION, INPUT_SAMPLE_RATE, BIT_DEPTH, CHANNELS, GREETING_TEXT | |
class NovaSonicClient:
    """Client for a Nova Sonic speech session over a bidirectional stream.

    Lifecycle: ``start_stream()`` opens the stream and sends, in order, a
    sessionStart event, a system prompt and a text greeting. ``send_audio()``
    and ``send_text()`` each send one input event and block until the response
    has been read. ``close()`` sends sessionEnd and shuts the stream down.

    Attributes:
        client: boto3 ``bedrock-runtime`` client used to open the stream.
        model_id: Model identifier passed when opening the stream.
        stream: The open stream object, or ``None`` when not connected.
        session_active: True from successful setup until ``close()``.
    """

    # Fixed pause (seconds) after each setup event. No acknowledgment is read
    # from the stream; the code simply waits. Override (e.g. 0) in tests.
    ack_delay = 0.5

    def __init__(self):
        self.client = boto3.client('bedrock-runtime', region_name=REGION)
        # NOTE(review): MODEL_ID is imported from config but this hard-coded
        # value overrides it -- confirm which one is intended.
        self.model_id = "amazon.nova-sonic-v1:0"  # Use the official Nova Sonic model ID
        self.stream = None
        self.session_active = False

    def start_stream(self, session_manager=None, language_coach=None):
        """Open the stream, configure the session and send the greeting.

        Args:
            session_manager: Optional object exposing
                ``get_conversation_context()``, ``session_id`` and
                ``set_last_response(audio)``.
            language_coach: Optional object exposing
                ``get_coaching_context(session_id)``. It is only consulted
                when ``session_manager`` is also given.

        Returns:
            The greeting response ``{"audio": bytes, "transcript": str}``, or
            ``None`` if any setup step failed.
        """
        try:
            # NOTE(review): verify that the bedrock-runtime client in use
            # actually exposes invoke_model_with_bidirectional_stream.
            self.stream = self.client.invoke_model_with_bidirectional_stream(
                modelId=self.model_id,
                contentType="application/vnd.amazon.eventstream",
                accept="application/vnd.amazon.eventstream",
            )
            self._send_session_start()
            self._send_system_message(session_manager, language_coach)
            # Mark the session active BEFORE sending the greeting: send_text()
            # refuses to run while session_active is False, so setting the
            # flag afterwards meant the greeting was never sent.
            self.session_active = True
            greeting_result = self.send_text(GREETING_TEXT)
            if session_manager and greeting_result:
                session_manager.set_last_response(greeting_result['audio'])
            print("Connected to Nova Sonic")
            return greeting_result
        except Exception as e:
            # Public boundary: report and signal failure with None.
            self.session_active = False
            print(f"Error starting Nova stream: {e}")
            return None

    def _send_event(self, event):
        """Serialize ``event`` as UTF-8 JSON and send it as one stream chunk."""
        self.stream.send_chunk({
            "chunk": {
                "bytes": json.dumps(event).encode('utf-8')
            }
        })

    def _ready(self):
        """Return True if a stream is open and the session is active; else log and return False."""
        if not self.stream or not self.session_active:
            print("Stream not initialized or session not active")
            return False
        return True

    def _read_response(self):
        """Read stream chunks until an event containing ``endpointResponse`` arrives.

        Chunks without a ``"chunk"`` key are ignored. Malformed chunks are
        logged and skipped. ``read_chunk()`` returning ``None`` is treated as
        end of stream (assumption -- confirm against the SDK), so the loop
        cannot spin forever.

        Returns:
            ``{"audio": bytes, "transcript": str}``. ``audio`` is every
            decoded ``audioOutput`` payload concatenated in arrival order.
            ``transcript`` is the text of the last ``transcript`` event seen,
            or ``""`` if there was none.
        """
        audio_parts = []
        transcript = ""
        while True:
            chunk = self.stream.read_chunk()
            if chunk is None:
                break
            if "chunk" not in chunk:
                continue
            try:
                payload = json.loads(chunk["chunk"]["bytes"].decode('utf-8'))
                event = payload.get("event", {})
                if "audioOutput" in event:
                    audio_parts.append(base64.b64decode(event["audioOutput"]["data"]))
                if "transcript" in event:
                    transcript = event["transcript"]["text"]
                # Checked last so audio/transcript carried by the final
                # event are still collected.
                if "endpointResponse" in event:
                    break
            except (AttributeError, KeyError, TypeError, ValueError) as e:
                # Skip the bad chunk but keep reading the rest of the response.
                print(f"Error processing response chunk: {e}")
        return {
            "audio": b"".join(audio_parts),
            "transcript": transcript
        }

    def _send_session_start(self):
        """Send the sessionStart event with inference and audio-output settings.

        Errors propagate to ``start_stream`` so a failed setup is reported
        rather than silently continued.
        """
        self._send_event({
            "event": {
                "sessionStart": {
                    "inferenceConfiguration": {
                        "maxTokens": 1024,
                        "topP": 0.9,
                        "temperature": 0.7
                    },
                    "audioOutputConfiguration": {
                        "sampleRateHertz": 24000,
                        "sampleSizeBits": 16,
                        "channelCount": 1,
                        "encoding": "base64",
                        "audioType": "SPEECH"
                    }
                }
            }
        })
        time.sleep(self.ack_delay)

    def send_audio(self, audio_chunk):
        """Send one audio chunk (base64-encoded) and return the model's response.

        Args:
            audio_chunk: Raw audio bytes.

        Returns:
            ``{"audio": bytes, "transcript": str}``, or ``None`` if the session
            is not active or sending/reading fails.
        """
        if not self._ready():
            return None
        try:
            self._send_event({
                "event": {
                    "audioInput": {
                        "data": base64.b64encode(audio_chunk).decode('utf-8'),
                        "endpointResponse": True
                    }
                }
            })
            return self._read_response()
        except Exception as e:
            print(f"Error in audio streaming: {e}")
            return None

    def send_text(self, text_message):
        """Send a text message instead of audio and return the model's response.

        Returns:
            ``{"audio": bytes, "transcript": str}``, or ``None`` if the session
            is not active or sending/reading fails.
        """
        if not self._ready():
            return None
        try:
            self._send_event({
                "event": {
                    "textInput": {
                        "text": text_message,
                        "endpointResponse": True
                    }
                }
            })
            return self._read_response()
        except Exception as e:
            print(f"Error sending text message: {e}")
            return None

    def _send_system_message(self, session_manager=None, language_coach=None):
        """Build the system prompt and send it as a systemInput event.

        The prompt is a fixed persona text. Conversation history is appended
        when ``session_manager`` provides it. Coaching context is appended
        when both ``language_coach`` and ``session_manager`` are given.
        Errors propagate to ``start_stream``.
        """
        system_content = (
            "You are Nova, a friendly and supportive conversation partner. "
            "Your goal is to engage in natural, flowing conversation that feels human and authentic. "
            "Respond thoughtfully and maintain the conversation context. "
            "Keep your responses concise and conversational. "
            "Ask open-ended questions to encourage the user to practice speaking. "
            "Never mention that you are an AI unless explicitly asked."
        )
        if session_manager:
            conversation_context = session_manager.get_conversation_context()
            if conversation_context:
                system_content += f"\n\nConversation history:\n{conversation_context}"
        if language_coach and session_manager:
            coaching_context = language_coach.get_coaching_context(session_manager.session_id)
            if coaching_context:
                system_content += f"\n\n{coaching_context}"
        self._send_event({
            "event": {
                "systemInput": {
                    "content": system_content
                }
            }
        })
        time.sleep(self.ack_delay)

    def close(self):
        """Send sessionEnd and close the stream.

        The client is always left disconnected (``stream`` is ``None`` and
        ``session_active`` is False), even if sending or closing fails, so
        later send_* calls are rejected instead of writing to a dead stream.
        """
        if not self.stream:
            return
        try:
            self._send_event({
                "event": {
                    "sessionEnd": {}
                }
            })
            self.stream.done()
            print("Nova stream closed")
        except Exception as e:
            # Best-effort shutdown: report but do not raise.
            print(f"Error closing stream: {e}")
        finally:
            self.stream = None
            self.session_active = False