Spaces:
Sleeping
Sleeping
| import sounddevice as sd | |
| import soundfile as sf | |
| import numpy as np | |
| import base64 | |
| import io | |
| import time | |
| import threading | |
| from config import INPUT_SAMPLE_RATE, OUTPUT_SAMPLE_RATE, BIT_DEPTH, CHANNELS, CHUNK_SIZE | |
class AudioStreamer:
    """Microphone capture and speaker playback helper for 16-bit PCM audio.

    Recording yields raw int16 PCM chunks as ``bytes``; playback accepts raw
    PCM bytes or base64 text (optionally wrapped as the ``repr`` of a bytes
    object, i.e. ``"b'...'"``).
    """

    def __init__(self):
        """Initialise stream parameters from the ``config`` module constants."""
        self.input_rate = INPUT_SAMPLE_RATE
        self.output_rate = OUTPUT_SAMPLE_RATE
        self.channels = CHANNELS
        self.chunk_size = CHUNK_SIZE  # Samples (frames) per chunk
        self.last_audio = None  # Last payload handed to play_audio_stream
        self.recording = False  # Loop flag polled by record_audio_stream
        self.stream = None  # Open input stream while recording, else None

    @staticmethod
    def _unwrap_bytes_repr(text):
        """Return the payload inside a ``"b'...'"`` string, or None.

        Args:
            text: String that may be the ``repr`` of a bytes object holding
                base64 data. Surrounding whitespace is ignored.

        Returns:
            The text between the leading ``b'`` and the last ``'``, or
            ``None`` when *text* does not start with ``b'``.
        """
        stripped = text.strip()
        if not stripped.startswith("b'"):
            return None
        return stripped.split("b'", 1)[1].rsplit("'", 1)[0]

    def record_audio_stream(self):
        """Yield microphone audio as raw int16 PCM byte chunks.

        A single input stream stays open for the whole recording so that
        consecutive chunks are contiguous; opening a new recording per chunk
        loses the samples that arrive between two recordings.

        Yields:
            bytes: ``chunk_size`` frames of int16 samples per iteration.

        The loop ends when ``self.recording`` becomes false (see ``cleanup``)
        or on ``KeyboardInterrupt``; the stream is closed in either case.
        """
        self.recording = True
        print("Recording... Speak now. (Press Ctrl+C to stop)")
        try:
            with sd.InputStream(
                samplerate=self.input_rate,
                channels=self.channels,
                dtype='int16',
            ) as stream:
                self.stream = stream
                while self.recording:
                    # read() blocks until the requested frames are available
                    audio_chunk, _overflowed = stream.read(int(self.chunk_size))
                    # Convert to bytes for streaming to AWS
                    yield audio_chunk.tobytes()
        except KeyboardInterrupt:
            print("Recording stopped.")
        finally:
            self.stream = None
            self.recording = False

    def play_audio_stream(self, audio_data):
        """Play audio and block until playback has finished.

        Args:
            audio_data: Raw 16-bit PCM bytes, or a base64 string (plain or
                wrapped as ``"b'...'"``) encoding such bytes.

        Empty input, undecodable base64 and playback failures are reported
        with ``print`` rather than raised. The payload is remembered (as
        passed in) for ``replay_last_audio``.
        """
        if not audio_data:
            print("No audio data to play")
            return

        # Store last audio for replay feature
        self.last_audio = audio_data

        # Handle base64 encoded audio
        if isinstance(audio_data, str):
            payload = self._unwrap_bytes_repr(audio_data)
            if payload is None:
                # Plain base64 text without the b'...' wrapper
                payload = audio_data
            try:
                audio_data = base64.b64decode(payload)
            except ValueError as e:  # binascii.Error subclasses ValueError
                print(f"Error decoding base64 audio: {e}")
                return

        try:
            # Interpret the bytes as 16-bit PCM samples
            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            sd.play(audio_array, self.output_rate)
            sd.wait()  # Wait until audio is finished playing
        except Exception as e:
            print(f"Error playing audio: {e}")

    def replay_last_audio(self):
        """Replay the payload most recently passed to ``play_audio_stream``."""
        if self.last_audio:
            self.play_audio_stream(self.last_audio)
        else:
            print("No previous audio to replay")

    def encode_audio_for_nova(self, audio_data):
        """Convert audio to the raw-bytes form sent to Nova.

        Args:
            audio_data: ``bytes``, a NumPy array, or a string.

        Returns:
            ``bytes`` for bytes input (unchanged), for NumPy arrays
            (``tobytes()``) and for ``"b'...'"`` strings (base64-decoded).
            Any other value, including a plain string, is returned unchanged.

        Raises:
            binascii.Error: If a ``"b'...'"`` payload is not valid base64.
        """
        if isinstance(audio_data, str):
            payload = self._unwrap_bytes_repr(audio_data)
            if payload is not None:
                audio_data = base64.b64decode(payload)

        # If it's a numpy array, convert to bytes
        if isinstance(audio_data, np.ndarray):
            return audio_data.tobytes()

        # Bytes (and anything unrecognised) pass through unchanged
        return audio_data

    def decode_nova_audio(self, audio_data):
        """Convert Nova's audio response to playable raw bytes.

        Args:
            audio_data: Base64 text (plain or wrapped as ``"b'...'"``), or
                data that is already decoded.

        Returns:
            The base64-decoded ``bytes`` for string input; non-string input
            is returned unchanged.

        Raises:
            binascii.Error: If the string is not valid base64.
        """
        # Nova returns base64-encoded LPCM at 24kHz
        if isinstance(audio_data, str):
            payload = self._unwrap_bytes_repr(audio_data)
            if payload is None:
                payload = audio_data
            return base64.b64decode(payload)

        # If it's already in bytes, return as is
        return audio_data

    def cleanup(self):
        """Stop recording; the record loop exits after its current chunk."""
        self.recording = False