# app_trial_current / audio_utils.py
# SreekarB's picture
# Upload 9 files
# e2cd21d verified
import sounddevice as sd
import soundfile as sf
import numpy as np
import base64
import io
import time
import threading
from config import INPUT_SAMPLE_RATE, OUTPUT_SAMPLE_RATE, BIT_DEPTH, CHANNELS, CHUNK_SIZE
class AudioStreamer:
    """Capture microphone audio and play back 16-bit PCM audio.

    Recording yields raw ``int16`` byte chunks; playback accepts raw PCM
    bytes or base64 text.  Sample rates, channel count and chunk size come
    from the ``config`` module constants imported at the top of this file.
    """

    def __init__(self):
        self.input_rate = INPUT_SAMPLE_RATE
        self.output_rate = OUTPUT_SAMPLE_RATE
        self.channels = CHANNELS
        self.chunk_size = CHUNK_SIZE  # Samples (frames) per chunk
        self.last_audio = None  # Most recent payload given to play_audio_stream
        self.recording = False  # Loop flag for record_audio_stream
        self.stream = None  # Open input stream while recording, else None

    @staticmethod
    def _decode_wrapped_base64(text):
        """Decode text of the form ``b'<base64>'`` (a stringified bytes literal).

        Leading/trailing whitespace around the wrapper is ignored.

        Returns:
            The decoded bytes, or ``None`` when *text* is not wrapped this way.

        Raises:
            binascii.Error: (a ``ValueError``) if the payload is not valid base64.
        """
        stripped = text.strip()
        if not stripped.startswith("b'"):
            return None
        # Keep what lies between the opening b' and the last closing quote.
        payload = stripped.split("b'", 1)[1].rsplit("'", 1)[0]
        return base64.b64decode(payload)

    @staticmethod
    def _is_empty(audio_data):
        """Return True when *audio_data* carries no audio.

        NumPy arrays are checked by size because their truth value is
        ambiguous (``not array`` raises for multi-element arrays).
        """
        if isinstance(audio_data, np.ndarray):
            return audio_data.size == 0
        return not audio_data

    def record_audio_stream(self):
        """Generator that yields audio chunks from the microphone.

        Each yielded item is the raw ``int16`` bytes of ``chunk_size`` frames.
        The generator runs until ``self.recording`` is set to False (see
        ``cleanup``), the consumer closes it, or Ctrl+C is pressed.

        Yields:
            bytes: one chunk of interleaved 16-bit PCM samples.
        """
        self.recording = True
        frames_per_chunk = int(self.chunk_size)
        print("Recording... Speak now. (Press Ctrl+C to stop)")
        try:
            # One continuously running input stream: consecutive reads are
            # contiguous, whereas starting a fresh sd.rec() per chunk drops
            # the audio that arrives between recordings.
            with sd.InputStream(
                samplerate=self.input_rate,
                channels=self.channels,
                dtype='int16',
            ) as stream:
                self.stream = stream
                while self.recording:
                    # Blocks until the requested number of frames is available.
                    audio_chunk, _overflowed = stream.read(frames_per_chunk)
                    # Convert to bytes for streaming to AWS
                    yield audio_chunk.tobytes()
        except KeyboardInterrupt:
            print("Recording stopped.")
        finally:
            self.recording = False
            self.stream = None

    def play_audio_stream(self, audio_data):
        """Play back 16-bit PCM audio at ``self.output_rate`` and wait for it.

        Args:
            audio_data: raw PCM bytes (or another buffer object), or a string
                holding base64 -- either plain or wrapped as ``b'<base64>'``.

        Errors are reported with ``print`` rather than raised, so a bad clip
        does not abort the caller.
        """
        if self._is_empty(audio_data):
            print("No audio data to play")
            return

        # Store the caller's original payload for the replay feature.
        self.last_audio = audio_data

        if isinstance(audio_data, str):
            try:
                decoded = self._decode_wrapped_base64(audio_data)
                if decoded is None:
                    # Plain base64 text without the b'...' wrapper.
                    decoded = base64.b64decode(audio_data)
            except ValueError as e:  # binascii.Error is a ValueError
                print(f"Error decoding base64 audio: {e}")
                return
            audio_data = decoded

        if isinstance(audio_data, (bytes, bytearray)):
            # int16 samples are two bytes each; drop a dangling trailing byte
            # instead of letting np.frombuffer reject the whole clip.
            audio_data = audio_data[: len(audio_data) - len(audio_data) % 2]
            if not audio_data:
                print("No audio data to play")
                return

        try:
            # Interpret the buffer as 16-bit PCM samples.
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            sd.play(audio_array, self.output_rate)
            sd.wait()  # Wait until audio is finished playing
        except Exception as e:
            print(f"Error playing audio: {e}")

    def replay_last_audio(self):
        """Replay the payload most recently passed to ``play_audio_stream``."""
        # last_audio is only ever None or a non-empty payload; test identity
        # so a stored NumPy array does not hit the ambiguous-truth-value error.
        if self.last_audio is not None:
            self.play_audio_stream(self.last_audio)
        else:
            print("No previous audio to replay")

    def encode_audio_for_nova(self, audio_data):
        """Convert audio to raw bytes for sending to Nova.

        Args:
            audio_data: ``b'<base64>'`` text, bytes, or a NumPy array.

        Returns:
            Decoded bytes for wrapped base64 text, ``tobytes()`` for an
            array; anything else (including bytes and unwrapped strings) is
            returned unchanged.

        Raises:
            binascii.Error: if wrapped text is not valid base64.
        """
        if isinstance(audio_data, str):
            decoded = self._decode_wrapped_base64(audio_data)
            if decoded is not None:
                audio_data = decoded
        if isinstance(audio_data, np.ndarray):
            return audio_data.tobytes()
        return audio_data

    def decode_nova_audio(self, audio_data):
        """Convert Nova's audio response to playable bytes.

        Original author's note: Nova returns base64-encoded LPCM at 24kHz.

        Returns:
            Decoded bytes for ``b'<base64>'`` text; any other input
            (bytes, or a string without that wrapper) is returned unchanged.

        Raises:
            binascii.Error: if wrapped text is not valid base64.
        """
        if isinstance(audio_data, str):
            decoded = self._decode_wrapped_base64(audio_data)
            if decoded is not None:
                return decoded
        return audio_data

    def cleanup(self):
        """Stop recording.

        Clears the flag polled by ``record_audio_stream``; that generator
        closes its own input stream once its loop exits.
        """
        self.recording = False