"""Microphone capture and speaker playback helpers for streaming 16-bit PCM audio."""

import base64
import io
import threading
import time

import numpy as np
import sounddevice as sd
import soundfile as sf

from config import INPUT_SAMPLE_RATE, OUTPUT_SAMPLE_RATE, BIT_DEPTH, CHANNELS, CHUNK_SIZE


class AudioStreamer:
    """Record microphone audio in chunks and play back PCM audio responses.

    Sample rates, channel count and chunk size come from ``config``.
    """

    def __init__(self):
        self.input_rate = INPUT_SAMPLE_RATE
        self.output_rate = OUTPUT_SAMPLE_RATE
        self.channels = CHANNELS
        self.chunk_size = CHUNK_SIZE  # Samples (frames) per chunk
        self.last_audio = None  # Most recent payload given to play_audio_stream
        self.recording = False  # Loop flag for record_audio_stream
        self.stream = None  # Open input stream while recording, else None

    @staticmethod
    def _strip_bytes_repr(text):
        """Return the payload inside a ``b'...'`` wrapper, or None.

        Some callers hand over ``str(some_bytes)``, i.e. base64 text wrapped
        in a bytes repr. Surrounding whitespace is ignored. A missing closing
        quote is tolerated: everything after the ``b'`` prefix is returned.

        Args:
            text: Candidate string.

        Returns:
            The text between ``b'`` and the last ``'``, or None when *text*
            does not start with ``b'``.
        """
        stripped = text.strip()
        if not stripped.startswith("b'"):
            return None
        return stripped[2:].rsplit("'", 1)[0]

    def record_audio_stream(self):
        """Yield microphone audio as raw int16 PCM byte chunks.

        A single input stream stays open for the whole session so that no
        audio is lost between chunks (opening a new recording per chunk
        leaves a gap every time the stream is torn down and recreated).

        The loop ends when ``self.recording`` is cleared (see ``cleanup``),
        when the generator is closed, or on Ctrl+C while a chunk is being
        read.

        Yields:
            bytes: ``chunk_size`` frames of interleaved int16 samples.
        """
        self.recording = True
        frames_per_chunk = int(self.chunk_size)

        print("Recording... Speak now. (Press Ctrl+C to stop)")

        try:
            # The context manager starts the stream on entry and stops and
            # closes it on exit, including when the generator is closed early.
            with sd.InputStream(
                samplerate=self.input_rate,
                channels=self.channels,
                dtype='int16',
            ) as stream:
                self.stream = stream
                while self.recording:
                    # Blocks until a full chunk is available; the overflow
                    # flag is ignored, matching the original best-effort
                    # capture.
                    frames, _overflowed = stream.read(frames_per_chunk)
                    yield frames.tobytes()
        except KeyboardInterrupt:
            print("Recording stopped.")
        finally:
            self.stream = None
            self.recording = False

    def play_audio_stream(self, audio_data):
        """Play 16-bit PCM audio and block until playback finishes.

        Args:
            audio_data: Raw PCM bytes, a base64 string, or a base64 string
                wrapped in a ``b'...'`` bytes repr. Empty or None input is
                reported and ignored.

        The payload is remembered (as passed in) for ``replay_last_audio``.
        Decode and playback failures are printed, not raised.
        """
        if not audio_data:
            print("No audio data to play")
            return

        # Store last audio for replay feature
        self.last_audio = audio_data

        if isinstance(audio_data, str):
            payload = self._strip_bytes_repr(audio_data)
            if payload is None:
                # Plain base64 text; a str can never go to np.frombuffer
                # directly, so decode it here instead of failing below.
                payload = audio_data
            try:
                audio_data = base64.b64decode(payload)
            except ValueError as e:  # binascii.Error is a ValueError
                print(f"Error decoding base64 audio: {e}")
                return

        try:
            # Interpret the buffer as 16-bit PCM samples
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            sd.play(audio_array, self.output_rate)
            sd.wait()  # Wait until audio is finished playing
        except Exception as e:
            # Best-effort playback: report device/format errors and continue.
            print(f"Error playing audio: {e}")

    def replay_last_audio(self):
        """Replay the payload most recently passed to ``play_audio_stream``."""
        if self.last_audio:
            self.play_audio_stream(self.last_audio)
        else:
            print("No previous audio to replay")

    def encode_audio_for_nova(self, audio_data):
        """Convert audio to raw bytes where a conversion is known.

        Args:
            audio_data: bytes, a numpy array, or a string.

        Returns:
            - decoded bytes for a ``b'...'``-wrapped base64 string;
            - ``tobytes()`` output for a numpy array;
            - the input unchanged otherwise (bytes, plain strings, other
              types).

        Raises:
            binascii.Error: If a ``b'...'``-wrapped payload is not valid
                base64.
        """
        if isinstance(audio_data, str):
            payload = self._strip_bytes_repr(audio_data)
            # Plain strings are passed through untouched.
            return audio_data if payload is None else base64.b64decode(payload)

        if isinstance(audio_data, np.ndarray):
            return audio_data.tobytes()

        return audio_data

    def decode_nova_audio(self, audio_data):
        """Convert Nova's audio response to raw PCM bytes.

        Args:
            audio_data: A base64 string (optionally wrapped in a ``b'...'``
                bytes repr) or already-decoded bytes.

        Returns:
            Decoded bytes for string input; non-string input unchanged.

        Raises:
            binascii.Error: If string input is not valid base64.
        """
        if not isinstance(audio_data, str):
            # Already decoded (or not something we know how to decode)
            return audio_data

        payload = self._strip_bytes_repr(audio_data)
        if payload is None:
            payload = audio_data
        return base64.b64decode(payload)

    def cleanup(self):
        """Ask the recording loop to stop.

        Only clears ``self.recording``; ``record_audio_stream`` notices the
        flag after the chunk currently being read and then closes its stream.
        """
        self.recording = False