# nova_sonic.py (app_trial_current)
# Last change: "Update nova_sonic.py" by SreekarB, commit d2eb293.
import boto3
import base64
import json
import time
from config import MODEL_ID, REGION, INPUT_SAMPLE_RATE, BIT_DEPTH, CHANNELS, GREETING_TEXT
class NovaSonicClient:
    """Client for one Amazon Nova Sonic session over a Bedrock bidirectional stream.

    Lifecycle: ``start_stream()`` opens the stream, sends the session-start and
    system events, then sends a text greeting; ``send_audio()`` / ``send_text()``
    each send one input event and block until the matching response has been
    read; ``close()`` sends ``sessionEnd`` and releases the stream.

    NOTE(review): this class assumes the object returned by
    ``invoke_model_with_bidirectional_stream`` exposes ``send_chunk``,
    ``read_chunk`` and ``done``, and that the JSON event shapes used below match
    what the model expects -- confirm both against the SDK/API version in use.
    """

    # Model invoked when no explicit model_id is given to __init__.
    DEFAULT_MODEL_ID = "amazon.nova-sonic-v1:0"  # official Nova Sonic model ID
    # Seconds to pause after each setup event. Nothing is read from the stream
    # during this pause: it is a fixed delay, not a real acknowledgment.
    ack_delay = 0.5

    def __init__(self, model_id=None, ack_delay=None):
        """Create the Bedrock runtime client; no stream is opened yet.

        Args:
            model_id: Bedrock model ID to invoke; defaults to DEFAULT_MODEL_ID.
                NOTE(review): ``config.MODEL_ID`` is imported by this module but
                not used as the default -- confirm which one is intended.
            ack_delay: Optional override (seconds) for the pause after each
                setup event; defaults to the class attribute ``ack_delay``.
        """
        self.client = boto3.client('bedrock-runtime', region_name=REGION)
        self.model_id = self.DEFAULT_MODEL_ID if model_id is None else model_id
        self.stream = None
        self.session_active = False
        if ack_delay is not None:
            self.ack_delay = ack_delay

    def start_stream(self, session_manager=None, language_coach=None, greeting_text=None):
        """Open the bidirectional stream, configure it, and send the greeting.

        Args:
            session_manager: Optional object providing ``get_conversation_context()``,
                ``session_id`` and ``set_last_response(audio)``.
            language_coach: Optional object providing ``get_coaching_context(session_id)``;
                only consulted when ``session_manager`` is also given.
            greeting_text: Text sent as the first message; defaults to
                ``config.GREETING_TEXT``.

        Returns:
            The greeting response dict (``{"audio": bytes, "transcript": str}``),
            or ``None`` if opening/configuring the stream or the greeting failed.
        """
        try:
            self.stream = self.client.invoke_model_with_bidirectional_stream(
                modelId=self.model_id,
                contentType="application/vnd.amazon.eventstream",
                accept="application/vnd.amazon.eventstream"
            )
            # Both setup events are required; the helpers print their own error.
            if not self._send_session_start():
                return None
            if not self._send_system_message(session_manager, language_coach):
                return None
            # Must be set BEFORE send_text(): send_text() refuses to run on an
            # inactive session, so setting this afterwards made the greeting
            # always return None.
            self.session_active = True
            greeting = GREETING_TEXT if greeting_text is None else greeting_text
            greeting_result = self.send_text(greeting)
            if session_manager and greeting_result:
                session_manager.set_last_response(greeting_result['audio'])
            print("Connected to Nova Sonic")
            return greeting_result
        except Exception as e:
            print(f"Error starting Nova stream: {e}")
            return None

    def _send_event(self, event):
        """Serialize ``event`` as UTF-8 JSON and write it to the stream as one chunk."""
        self.stream.send_chunk({
            "chunk": {
                "bytes": json.dumps(event).encode('utf-8')
            }
        })

    def _read_response(self):
        """Read stream chunks until an event containing ``endpointResponse`` arrives.

        Returns:
            ``{"audio": bytes, "transcript": str}`` where ``audio`` is the
            concatenation of every base64-decoded ``audioOutput`` payload and
            ``transcript`` is the text of the last ``transcript`` event seen
            (``""`` if none). Returns ``None`` if ``read_chunk()`` returns
            ``None`` before the end-of-response event (assumed to mean the
            stream ended -- confirm against the SDK).

        Chunks without a ``"chunk"`` key are ignored; chunks that fail to
        decode/parse are reported and skipped.
        """
        audio_parts = []  # joined once at the end instead of quadratic bytes +=
        transcript = ""
        while True:
            chunk = self.stream.read_chunk()
            if chunk is None:
                print("Stream ended before the response was complete")
                return None
            if "chunk" not in chunk:
                continue
            try:
                payload = json.loads(chunk["chunk"]["bytes"].decode('utf-8'))
                event = payload.get("event", {}) if isinstance(payload, dict) else {}
                if "audioOutput" in event:
                    audio_parts.append(base64.b64decode(event["audioOutput"]["data"]))
                if "transcript" in event:
                    transcript = event["transcript"]["text"]
                if "endpointResponse" in event:
                    return {
                        "audio": b"".join(audio_parts),
                        "transcript": transcript
                    }
            except (KeyError, TypeError, ValueError, AttributeError) as e:
                # Malformed chunk (bad UTF-8/JSON/base64 or unexpected shape):
                # report it and keep reading the rest of the response.
                print(f"Error processing response chunk: {e}")

    def _send_session_start(self):
        """Send the ``sessionStart`` event with inference and audio-output settings.

        Returns:
            True if the event was written, False if sending raised (the error is printed).
        """
        start_session_event = {
            "event": {
                "sessionStart": {
                    "inferenceConfiguration": {
                        "maxTokens": 1024,
                        "topP": 0.9,
                        "temperature": 0.7
                    },
                    "audioOutputConfiguration": {
                        "sampleRateHertz": 24000,
                        "sampleSizeBits": 16,
                        "channelCount": 1,
                        "encoding": "base64",
                        "audioType": "SPEECH"
                    }
                }
            }
        }
        try:
            self._send_event(start_session_event)
        except Exception as e:
            print(f"Error sending session start: {e}")
            return False
        # Fixed pause; no acknowledgment is actually read from the stream here.
        time.sleep(self.ack_delay)
        return True

    def send_audio(self, audio_chunk):
        """Send one chunk of raw audio bytes and block until the response is read.

        Args:
            audio_chunk: Raw audio bytes; base64-encoded before sending.

        Returns:
            ``{"audio": bytes, "transcript": str}``, or ``None`` if the session is
            not active or sending/reading failed.
        """
        if not self.stream or not self.session_active:
            print("Stream not initialized or session not active")
            return None
        try:
            self._send_event({
                "event": {
                    "audioInput": {
                        "data": base64.b64encode(audio_chunk).decode('utf-8'),
                        "endpointResponse": True
                    }
                }
            })
            return self._read_response()
        except Exception as e:
            print(f"Error in audio streaming: {e}")
            return None

    def send_text(self, text_message):
        """Send a text message instead of audio and block until the response is read.

        Returns:
            ``{"audio": bytes, "transcript": str}``, or ``None`` if the session is
            not active or sending/reading failed.
        """
        if not self.stream or not self.session_active:
            print("Stream not initialized or session not active")
            return None
        try:
            self._send_event({
                "event": {
                    "textInput": {
                        "text": text_message,
                        "endpointResponse": True
                    }
                }
            })
            return self._read_response()
        except Exception as e:
            print(f"Error sending text message: {e}")
            return None

    def _send_system_message(self, session_manager=None, language_coach=None):
        """Send the ``systemInput`` event that sets Nova's persona and context.

        The base persona prompt is extended with the session's conversation
        history (if non-empty) and the language coach's context (if both a coach
        and a session manager are given and the context is non-empty).

        Returns:
            True if the event was written, False if sending raised (the error is printed).
        """
        system_content = (
            "You are Nova, a friendly and supportive conversation partner. "
            "Your goal is to engage in natural, flowing conversation that feels human and authentic. "
            "Respond thoughtfully and maintain the conversation context. "
            "Keep your responses concise and conversational. "
            "Ask open-ended questions to encourage the user to practice speaking. "
            "Never mention that you are an AI unless explicitly asked."
        )
        if session_manager:
            conversation_context = session_manager.get_conversation_context()
            if conversation_context:
                system_content += f"\n\nConversation history:\n{conversation_context}"
        if language_coach and session_manager:
            coaching_context = language_coach.get_coaching_context(session_manager.session_id)
            if coaching_context:
                system_content += f"\n\n{coaching_context}"
        system_event = {
            "event": {
                "systemInput": {
                    "content": system_content
                }
            }
        }
        try:
            self._send_event(system_event)
        except Exception as e:
            print(f"Error sending system message: {e}")
            return False
        # Fixed pause; no acknowledgment is actually read from the stream here.
        time.sleep(self.ack_delay)
        return True

    def close(self):
        """Send ``sessionEnd`` and release the stream.

        The stream's ``done()`` is called even if sending ``sessionEnd`` fails,
        and the client is always left inactive with no stream. Errors are
        printed, not raised. Calling this with no open stream does nothing.
        """
        if not self.stream:
            return
        try:
            try:
                self._send_event({"event": {"sessionEnd": {}}})
            finally:
                # Release the stream even if the sessionEnd event could not be sent.
                self.stream.done()
            print("Nova stream closed")
        except Exception as e:
            print(f"Error closing stream: {e}")
        finally:
            self.session_active = False
            self.stream = None