"""
Audio utilities for Hugging Face Spaces integration.

This module provides audio streaming for Hugging Face Spaces environments.
"""
import os
import asyncio
import numpy as np
import random
import time
import threading
import base64
import json
import tempfile
import soundfile as sf
from concurrent.futures import ThreadPoolExecutor

# Try to import the Hugging Face-specific audio utilities
try:
    from transformers.pipelines.audio_utils import ffmpeg_microphone_live
    HF_AUDIO_AVAILABLE = True
except ImportError:
    HF_AUDIO_AVAILABLE = False
    print("Warning: transformers.pipelines.audio_utils not available, will use fallback audio simulation")


class HFAudioStreamer:
    """Audio streamer for Hugging Face Spaces that works with or without real audio devices.

    Bridges a Bedrock/Nova stream manager with either a real ffmpeg-backed
    microphone (via transformers) or a simulated audio source, and persists
    Nova's audio responses to WAV files since HF Spaces has no audio output.
    """

    def __init__(self, stream_manager):
        """Initialize the HF Audio Streamer.

        Args:
            stream_manager: Object exposing the Nova Sonic streaming API
                (add_audio_chunk, send_raw_event, output_queue,
                audio_output_queue, event templates, close, ...).
        """
        self.stream_manager = stream_manager
        self.is_streaming = False
        # Real microphone capture is only attempted when transformers' helper imported.
        self.use_ffmpeg = HF_AUDIO_AVAILABLE
        self.mic_stream = None
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

        # Input/output pump tasks, created by start_streaming().
        self.input_task = None
        self.output_task = None

        # Detect Hugging Face Spaces via its well-known environment variables.
        # (equivalent to the original `"SYSTEM" in os.environ and ...` double check)
        self.is_hf_spaces = "SPACE_ID" in os.environ or os.environ.get("SYSTEM") == "spaces"

        # Nova's audio responses are written here because Spaces cannot play audio.
        self.output_dir = os.path.join(tempfile.gettempdir(), "nova_output")
        os.makedirs(self.output_dir, exist_ok=True)

        print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
        print(f"Audio output will be saved to: {self.output_dir}")

    async def initialize_ffmpeg_mic(self):
        """Initialize the FFMPEG microphone if available.

        Returns:
            bool: True if the microphone stream was created, False otherwise
            (in which case ``use_ffmpeg`` is cleared so callers fall back to
            simulated audio).
        """
        if not self.use_ffmpeg:
            return False

        # If we're in HF Spaces, expect ALSA errors and handle them gracefully
        if self.is_hf_spaces:
            print("HF Spaces detected - ALSA errors are expected and will be handled")
            # Set environment variable to suppress ALSA errors
            os.environ['AUDIODEV'] = 'null'

        try:
            # Create in a thread to avoid blocking
            sampling_rate = 16000   # 16kHz as required by Nova Sonic
            chunk_length_s = 0.5    # Process 0.5 seconds at a time
            stream_chunk_s = 0.25   # Stream in 0.25 second chunks

            # In HF Spaces, we expect this to fail with ALSA errors
            # But we'll try anyway in case they add audio support later
            self.mic_stream = await self.loop.run_in_executor(
                self.executor,
                lambda: ffmpeg_microphone_live(
                    sampling_rate=sampling_rate,
                    chunk_length_s=chunk_length_s,
                    stream_chunk_s=stream_chunk_s
                )
            )
            print("Successfully initialized FFMPEG microphone")
            return True
        except Exception as e:
            # Check for ALSA errors which are expected in Hugging Face Spaces
            error_str = str(e)
            if "ALSA" in error_str and "PCM" in error_str:
                print("ALSA audio device errors detected - this is expected in cloud environments")
                print("Switching to simulated audio input (no real microphone will be used)")
            else:
                print(f"Error initializing FFMPEG microphone: {e}")

            # Always fall back to simulated audio in HF Spaces
            self.use_ffmpeg = False
            return False

    async def ffmpeg_audio_processor(self):
        """Process audio from the ffmpeg microphone and forward it to Nova Sonic.

        Iterates the microphone stream, converting float32 [-1.0, 1.0] chunks
        to int16 PCM. On failure, falls back to simulated input if still
        streaming.
        """
        if not self.mic_stream:
            print("FFMPEG microphone not initialized")
            self.use_ffmpeg = False
            return

        print("Starting FFMPEG audio processing")
        try:
            # Track for logging
            chunks_processed = 0
            last_log_time = time.time()

            # Use the mic_stream as an iterator
            # NOTE(review): this is a synchronous iterator driven inside a
            # coroutine; each `next()` may block the event loop while waiting
            # for audio - confirm whether it should run via run_in_executor.
            for audio_chunk in self.mic_stream:
                if not self.is_streaming:
                    break

                # Process the chunk
                if isinstance(audio_chunk, np.ndarray):
                    # Convert float32 [-1.0, 1.0] to int16 for Nova Sonic
                    audio_int16 = (audio_chunk * 32767).astype(np.int16)
                    audio_bytes = audio_int16.tobytes()

                    # Send to Bedrock
                    self.stream_manager.add_audio_chunk(audio_bytes)

                    # Log periodically to show activity
                    chunks_processed += 1
                    current_time = time.time()
                    if current_time - last_log_time > 2.0:
                        print(f"FFMPEG audio: processed {chunks_processed} chunks")
                        chunks_processed = 0
                        last_log_time = current_time

                # Add a small sleep to prevent tight loops
                await asyncio.sleep(0.01)
        except Exception as e:
            print(f"Error in FFMPEG audio processor: {e}")
            # If the ffmpeg processor fails, fall back to simulated audio
            self.use_ffmpeg = False

            # Start simulated input if we're still streaming
            if self.is_streaming:
                print("Falling back to simulated audio input")
                asyncio.create_task(self.generate_simulated_input())
        finally:
            # Cleanup: close the mic stream if it supports it (best-effort)
            if hasattr(self.mic_stream, 'close'):
                try:
                    self.mic_stream.close()
                except Exception:
                    pass
            self.mic_stream = None
            print("FFMPEG audio processor stopped")

    async def generate_simulated_input(self):
        """Generate simulated audio input when a real microphone isn't available.

        Sends low-level noise chunks to Nova, and every ~10 chunks injects a
        canned text message so the conversation keeps progressing. Temp WAV
        files are cleaned up on exit.
        """
        print("Starting simulated audio input")

        # Create a few temporary audio files with silence/noise
        audio_files = []
        for i in range(3):
            noise_level = 0.01 * (i + 1)  # Vary noise level
            duration = 1.0  # 1 second of audio
            samples = np.random.normal(0, noise_level, int(16000 * duration))

            # Create temporary file
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                sf.write(f.name, samples, 16000)
                audio_files.append(f.name)

        try:
            # Send simulated speech in a pattern
            sequence_count = 0

            while self.is_streaming:
                # Choose a random file
                file_path = np.random.choice(audio_files)

                # Load the audio
                try:
                    audio_data, _ = sf.read(file_path)
                    audio_int16 = (audio_data * 32767).astype(np.int16)
                    audio_bytes = audio_int16.tobytes()

                    # Send to Bedrock
                    self.stream_manager.add_audio_chunk(audio_bytes)
                except Exception as e:
                    print(f"Error processing simulated audio file: {e}")

                # Wait between chunks
                await asyncio.sleep(0.2)

                # Increment sequence counter
                sequence_count += 1

                # After a sequence of noise, send text to get a response
                if sequence_count >= 10:  # After 10 chunks (about 2 seconds)
                    sequence_count = 0

                    # Send text instead of more simulated audio
                    messages = [
                        "Hello there",
                        "How are you today?",
                        "Tell me something interesting",
                        "What's the weather like?",
                        "I'm learning to speak more fluently"
                    ]
                    message = np.random.choice(messages)
                    await self.send_text_message(message)

                    # Add transcription to the output queue for UI
                    await self.stream_manager.output_queue.put({
                        "event": {
                            "textOutput": {
                                "content": message,
                                "role": "USER"
                            }
                        }
                    })

                    # Wait for Nova to respond
                    await asyncio.sleep(3.0)
        except Exception as e:
            print(f"Error in simulated audio generator: {e}")
            import traceback
            traceback.print_exc()
        finally:
            # Clean up temp files (best-effort; files may already be gone)
            for file_path in audio_files:
                try:
                    os.unlink(file_path)
                except Exception:
                    pass

    async def play_output_audio(self):
        """Handle audio output from Nova Sonic.

        Drains the stream manager's audio output queue and saves each
        response as a WAV file (HF Spaces cannot play audio directly).
        """
        while self.is_streaming:
            try:
                # Get audio data from the stream manager's queue
                audio_data = await asyncio.wait_for(
                    self.stream_manager.audio_output_queue.get(),
                    timeout=0.5
                )

                if audio_data and self.is_streaming:
                    # Store info in output queue for other parts of the app
                    self.stream_manager.output_queue.put_nowait({
                        "event": {
                            "audioOutput": {
                                "content": "Audio received from Nova"
                            }
                        }
                    })

                    # In HF Spaces, we can't play audio directly, but we can save it
                    timestamp = int(time.time())
                    output_path = os.path.join(self.output_dir, f"nova_response_{timestamp}.wav")

                    try:
                        # Convert from raw PCM to numpy for soundfile
                        audio_np = np.frombuffer(audio_data, dtype=np.int16)
                        sf.write(output_path, audio_np, 24000)  # Nova outputs at 24kHz
                        print(f"Saved Nova audio response to {output_path}")
                    except Exception as e:
                        print(f"Error saving audio response: {e}")
            except asyncio.TimeoutError:
                # No data available within timeout
                continue
            except Exception as e:
                if self.is_streaming:
                    print(f"Error handling output audio: {e}")
                    import traceback
                    traceback.print_exc()
                await asyncio.sleep(0.1)

    async def start_streaming(self):
        """Start streaming audio.

        Sends the audio-content-start event, greets the user, then launches
        the appropriate input pump (real ffmpeg mic or simulated audio) and
        the output pump. Idempotent while already streaming.
        """
        if self.is_streaming:
            return

        print(f"Starting audio streaming in HF mode...")

        # For HF Spaces, we'll use our enhanced error handling
        if self.is_hf_spaces:
            # Set environment variables to help with audio issues
            os.environ['AUDIODEV'] = 'null'
            os.environ['SDL_AUDIODRIVER'] = 'dummy'

        # Send audio content start event
        await self.stream_manager.send_audio_content_start_event()
        self.is_streaming = True

        # Start with a welcome message from Nova
        await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")

        # In HF Spaces, just go straight to simulated audio to avoid ALSA errors
        if self.is_hf_spaces:
            print("Running in Hugging Face Spaces - using simulated audio")
            self.use_ffmpeg = False
            self.input_task = asyncio.create_task(self.generate_simulated_input())
            self.output_task = asyncio.create_task(self.play_output_audio())

            # Let the user know what's happening
            print("Speech-to-speech functionality is active:")
            print("- Simulated audio is being sent to Nova Sonic")
            print("- Nova's responses will be saved as WAV files")
            print("- Conversation will be shown as text transcriptions")
            return

        # For non-HF environments, try the ffmpeg approach
        tasks = []

        # Initialize FFMPEG mic if available and create audio input task
        if self.use_ffmpeg:
            ffmpeg_available = await self.initialize_ffmpeg_mic()
            if ffmpeg_available:
                self.input_task = asyncio.create_task(self.ffmpeg_audio_processor())
                tasks.append(self.input_task)
            else:
                self.use_ffmpeg = False

        # Fall back to simulated audio if FFMPEG isn't available
        if not self.use_ffmpeg:
            self.input_task = asyncio.create_task(self.generate_simulated_input())
            tasks.append(self.input_task)

        # Start output processing
        self.output_task = asyncio.create_task(self.play_output_audio())
        tasks.append(self.output_task)

        # Let the tasks run - we won't wait for input() here because that's handled in the UI
        # This will allow the tasks to continue running until stop_streaming is called

    async def send_text_message(self, text):
        """Send a text message to Nova to simulate user input.

        Emits the content-start / text-input / content-end event triple via
        the stream manager's `%`-style templates, then mirrors the message
        into the output queue so the UI shows it.

        Args:
            text: The user message to send.

        Returns:
            bool: True on success, False if any send step raised.
        """
        try:
            # Create text content start event
            content_name = str(time.time())
            text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                "USER"
            )
            await self.stream_manager.send_raw_event(text_content_start)

            # Create text input event
            text_input = self.stream_manager.TEXT_INPUT_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                text
            )
            await self.stream_manager.send_raw_event(text_input)

            # Create content end event
            content_end = self.stream_manager.CONTENT_END_EVENT % (
                self.stream_manager.prompt_name,
                content_name
            )
            await self.stream_manager.send_raw_event(content_end)

            print(f"Sent text message to Nova: {text}")

            # Also add message to output queue for UI
            await self.stream_manager.output_queue.put({
                "event": {
                    "textOutput": {
                        "content": text,
                        "role": "USER"
                    }
                }
            })

            return True
        except Exception as e:
            print(f"Error sending text message: {e}")
            return False

    async def stop_streaming(self):
        """Stop streaming audio.

        Cancels the input/output tasks, closes the mic stream and executor,
        and closes the stream manager. Safe to call when not streaming.
        """
        if not self.is_streaming:
            return

        self.is_streaming = False
        print("Stopping HF audio streaming...")

        # Cancel all tasks
        tasks = []
        if self.input_task and not self.input_task.done():
            self.input_task.cancel()
            tasks.append(self.input_task)

        if self.output_task and not self.output_task.done():
            self.output_task.cancel()
            tasks.append(self.output_task)

        if tasks:
            # return_exceptions=True swallows the CancelledErrors from above
            await asyncio.gather(*tasks, return_exceptions=True)

        # Close ffmpeg mic if open (best-effort)
        if self.mic_stream and hasattr(self.mic_stream, 'close'):
            try:
                self.mic_stream.close()
            except Exception:
                pass
            self.mic_stream = None

        # Shutdown executor without waiting for in-flight work
        self.executor.shutdown(wait=False)

        # Always close the stream manager
        await self.stream_manager.close()
        print("HF audio streaming stopped")