# Spaces: Sleeping (stray Hugging Face Spaces status banner captured during page export; not module code)
"""
Audio utilities for Hugging Face Spaces integration.

This module provides audio streaming for Hugging Face Spaces environments.
"""
import asyncio
import base64
import contextlib
import json
import os
import random
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import soundfile as sf
# Prefer the Hugging Face-specific audio helper when transformers ships it;
# otherwise fall back to simulated audio input later on.
HF_AUDIO_AVAILABLE = False
try:
    from transformers.pipelines.audio_utils import ffmpeg_microphone_live
except ImportError:
    print("Warning: transformers.pipelines.audio_utils not available, will use fallback audio simulation")
else:
    HF_AUDIO_AVAILABLE = True
class HFAudioStreamer:
    """Audio streamer for Hugging Face Spaces that works with or without real audio devices"""

    def __init__(self, stream_manager):
        """Initialize the HF Audio Streamer.

        Args:
            stream_manager: Object bridging audio/text events to the Bedrock
                Nova Sonic stream (provides queues, event templates and send
                helpers used by the other methods).
        """
        self.stream_manager = stream_manager
        self.is_streaming = False
        # Use a real ffmpeg microphone only when transformers provides it.
        self.use_ffmpeg = HF_AUDIO_AVAILABLE
        self.mic_stream = None
        self.executor = ThreadPoolExecutor(max_workers=2)
        # Prefer the running loop; asyncio.get_event_loop() is deprecated when
        # called with no running event loop (Python 3.10+).
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = asyncio.get_event_loop()
        # Background tasks created by start_streaming()
        self.input_task = None
        self.output_task = None
        # Detect Hugging Face Spaces via its well-known environment variables.
        self.is_hf_spaces = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
        # Directory where Nova's audio responses are written as WAV files.
        self.output_dir = os.path.join(tempfile.gettempdir(), "nova_output")
        os.makedirs(self.output_dir, exist_ok=True)
        print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
        print(f"Audio output will be saved to: {self.output_dir}")
| async def initialize_ffmpeg_mic(self): | |
| """Initialize the FFMPEG microphone if available""" | |
| if not self.use_ffmpeg: | |
| return False | |
| # If we're in HF Spaces, expect ALSA errors and handle them gracefully | |
| if self.is_hf_spaces: | |
| print("HF Spaces detected - ALSA errors are expected and will be handled") | |
| # Set environment variable to suppress ALSA errors | |
| os.environ['AUDIODEV'] = 'null' | |
| try: | |
| # Create in a thread to avoid blocking | |
| sampling_rate = 16000 # 16kHz as required by Nova Sonic | |
| chunk_length_s = 0.5 # Process 0.5 seconds at a time | |
| stream_chunk_s = 0.25 # Stream in 0.25 second chunks | |
| # In HF Spaces, we expect this to fail with ALSA errors | |
| # But we'll try anyway in case they add audio support later | |
| self.mic_stream = await self.loop.run_in_executor( | |
| self.executor, | |
| lambda: ffmpeg_microphone_live( | |
| sampling_rate=sampling_rate, | |
| chunk_length_s=chunk_length_s, | |
| stream_chunk_s=stream_chunk_s | |
| ) | |
| ) | |
| print("Successfully initialized FFMPEG microphone") | |
| return True | |
| except Exception as e: | |
| # Check for ALSA errors which are expected in Hugging Face Spaces | |
| error_str = str(e) | |
| if "ALSA" in error_str and "PCM" in error_str: | |
| print("ALSA audio device errors detected - this is expected in cloud environments") | |
| print("Switching to simulated audio input (no real microphone will be used)") | |
| else: | |
| print(f"Error initializing FFMPEG microphone: {e}") | |
| # Always fall back to simulated audio in HF Spaces | |
| self.use_ffmpeg = False | |
| return False | |
| async def ffmpeg_audio_processor(self): | |
| """Process audio from ffmpeg microphone""" | |
| if not self.mic_stream: | |
| print("FFMPEG microphone not initialized") | |
| self.use_ffmpeg = False | |
| return | |
| print("Starting FFMPEG audio processing") | |
| try: | |
| # Track for logging | |
| chunks_processed = 0 | |
| last_log_time = time.time() | |
| # Use the mic_stream as an iterator | |
| for audio_chunk in self.mic_stream: | |
| if not self.is_streaming: | |
| break | |
| # Process the chunk | |
| if isinstance(audio_chunk, np.ndarray): | |
| # Convert float32 [-1.0, 1.0] to int16 for Nova Sonic | |
| audio_int16 = (audio_chunk * 32767).astype(np.int16) | |
| audio_bytes = audio_int16.tobytes() | |
| # Send to Bedrock | |
| self.stream_manager.add_audio_chunk(audio_bytes) | |
| # Log periodically to show activity | |
| chunks_processed += 1 | |
| current_time = time.time() | |
| if current_time - last_log_time > 2.0: | |
| print(f"FFMPEG audio: processed {chunks_processed} chunks") | |
| chunks_processed = 0 | |
| last_log_time = current_time | |
| # Add a small sleep to prevent tight loops | |
| await asyncio.sleep(0.01) | |
| except Exception as e: | |
| print(f"Error in FFMPEG audio processor: {e}") | |
| # If the ffmpeg processor fails, fall back to simulated audio | |
| self.use_ffmpeg = False | |
| # Start simulated input if we're still streaming | |
| if self.is_streaming: | |
| print("Falling back to simulated audio input") | |
| asyncio.create_task(self.generate_simulated_input()) | |
| finally: | |
| # Cleanup | |
| if hasattr(self.mic_stream, 'close'): | |
| try: | |
| self.mic_stream.close() | |
| except: | |
| pass | |
| self.mic_stream = None | |
| print("FFMPEG audio processor stopped") | |
| async def generate_simulated_input(self): | |
| """Generate simulated audio input when real microphone isn't available""" | |
| print("Starting simulated audio input") | |
| # Create a few temporary audio files with silence/noise | |
| audio_files = [] | |
| for i in range(3): | |
| noise_level = 0.01 * (i + 1) # Vary noise level | |
| duration = 1.0 # 1 second of audio | |
| samples = np.random.normal(0, noise_level, int(16000 * duration)) | |
| # Create temporary file | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: | |
| sf.write(f.name, samples, 16000) | |
| audio_files.append(f.name) | |
| try: | |
| # Send simulated speech in a pattern | |
| sequence_count = 0 | |
| while self.is_streaming: | |
| # Choose a random file | |
| file_path = np.random.choice(audio_files) | |
| # Load the audio | |
| try: | |
| audio_data, _ = sf.read(file_path) | |
| audio_int16 = (audio_data * 32767).astype(np.int16) | |
| audio_bytes = audio_int16.tobytes() | |
| # Send to Bedrock | |
| self.stream_manager.add_audio_chunk(audio_bytes) | |
| except Exception as e: | |
| print(f"Error processing simulated audio file: {e}") | |
| # Wait between chunks | |
| await asyncio.sleep(0.2) | |
| # Increment sequence counter | |
| sequence_count += 1 | |
| # After a sequence of noise, send text to get a response | |
| if sequence_count >= 10: # After 10 chunks (about 2 seconds) | |
| sequence_count = 0 | |
| # Send text instead of more simulated audio | |
| messages = [ | |
| "Hello there", | |
| "How are you today?", | |
| "Tell me something interesting", | |
| "What's the weather like?", | |
| "I'm learning to speak more fluently" | |
| ] | |
| message = np.random.choice(messages) | |
| await self.send_text_message(message) | |
| # Add transcription to the output queue for UI | |
| await self.stream_manager.output_queue.put({ | |
| "event": { | |
| "textOutput": { | |
| "content": message, | |
| "role": "USER" | |
| } | |
| } | |
| }) | |
| # Wait for Nova to respond | |
| await asyncio.sleep(3.0) | |
| except Exception as e: | |
| print(f"Error in simulated audio generator: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| finally: | |
| # Clean up temp files | |
| for file_path in audio_files: | |
| try: | |
| os.unlink(file_path) | |
| except: | |
| pass | |
| async def play_output_audio(self): | |
| """Handle audio output from Nova Sonic""" | |
| while self.is_streaming: | |
| try: | |
| # Get audio data from the stream manager's queue | |
| audio_data = await asyncio.wait_for( | |
| self.stream_manager.audio_output_queue.get(), | |
| timeout=0.5 | |
| ) | |
| if audio_data and self.is_streaming: | |
| # Store info in output queue for other parts of the app | |
| self.stream_manager.output_queue.put_nowait({ | |
| "event": { | |
| "audioOutput": { | |
| "content": "Audio received from Nova" | |
| } | |
| } | |
| }) | |
| # In HF Spaces, we can't play audio directly, but we can save it | |
| timestamp = int(time.time()) | |
| output_path = os.path.join(self.output_dir, f"nova_response_{timestamp}.wav") | |
| try: | |
| # Convert from raw PCM to numpy for soundfile | |
| audio_np = np.frombuffer(audio_data, dtype=np.int16) | |
| sf.write(output_path, audio_np, 24000) # Nova outputs at 24kHz | |
| print(f"Saved Nova audio response to {output_path}") | |
| except Exception as e: | |
| print(f"Error saving audio response: {e}") | |
| except asyncio.TimeoutError: | |
| # No data available within timeout | |
| continue | |
| except Exception as e: | |
| if self.is_streaming: | |
| print(f"Error handling output audio: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| await asyncio.sleep(0.1) | |
| async def start_streaming(self): | |
| """Start streaming audio""" | |
| if self.is_streaming: | |
| return | |
| print(f"Starting audio streaming in HF mode...") | |
| # For HF Spaces, we'll use our enhanced error handling | |
| if self.is_hf_spaces: | |
| # Set environment variables to help with audio issues | |
| os.environ['AUDIODEV'] = 'null' | |
| os.environ['SDL_AUDIODRIVER'] = 'dummy' | |
| # Send audio content start event | |
| await self.stream_manager.send_audio_content_start_event() | |
| self.is_streaming = True | |
| # Start with a welcome message from Nova | |
| await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?") | |
| # In HF Spaces, just go straight to simulated audio to avoid ALSA errors | |
| if self.is_hf_spaces: | |
| print("Running in Hugging Face Spaces - using simulated audio") | |
| self.use_ffmpeg = False | |
| self.input_task = asyncio.create_task(self.generate_simulated_input()) | |
| self.output_task = asyncio.create_task(self.play_output_audio()) | |
| # Let the user know what's happening | |
| print("Speech-to-speech functionality is active:") | |
| print("- Simulated audio is being sent to Nova Sonic") | |
| print("- Nova's responses will be saved as WAV files") | |
| print("- Conversation will be shown as text transcriptions") | |
| return | |
| # For non-HF environments, try the ffmpeg approach | |
| tasks = [] | |
| # Initialize FFMPEG mic if available and create audio input task | |
| if self.use_ffmpeg: | |
| ffmpeg_available = await self.initialize_ffmpeg_mic() | |
| if ffmpeg_available: | |
| self.input_task = asyncio.create_task(self.ffmpeg_audio_processor()) | |
| tasks.append(self.input_task) | |
| else: | |
| self.use_ffmpeg = False | |
| # Fall back to simulated audio if FFMPEG isn't available | |
| if not self.use_ffmpeg: | |
| self.input_task = asyncio.create_task(self.generate_simulated_input()) | |
| tasks.append(self.input_task) | |
| # Start output processing | |
| self.output_task = asyncio.create_task(self.play_output_audio()) | |
| tasks.append(self.output_task) | |
| # Let the tasks run - we won't wait for input() here because that's handled in the UI | |
| # This will allow the tasks to continue running until stop_streaming is called | |
| async def send_text_message(self, text): | |
| """Send a text message to Nova to simulate user input""" | |
| try: | |
| # Create text content start event | |
| content_name = str(time.time()) | |
| text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % ( | |
| self.stream_manager.prompt_name, | |
| content_name, | |
| "USER" | |
| ) | |
| await self.stream_manager.send_raw_event(text_content_start) | |
| # Create text input event | |
| text_input = self.stream_manager.TEXT_INPUT_EVENT % ( | |
| self.stream_manager.prompt_name, | |
| content_name, | |
| text | |
| ) | |
| await self.stream_manager.send_raw_event(text_input) | |
| # Create content end event | |
| content_end = self.stream_manager.CONTENT_END_EVENT % ( | |
| self.stream_manager.prompt_name, | |
| content_name | |
| ) | |
| await self.stream_manager.send_raw_event(content_end) | |
| print(f"Sent text message to Nova: {text}") | |
| # Also add message to output queue for UI | |
| await self.stream_manager.output_queue.put({ | |
| "event": { | |
| "textOutput": { | |
| "content": text, | |
| "role": "USER" | |
| } | |
| } | |
| }) | |
| return True | |
| except Exception as e: | |
| print(f"Error sending text message: {e}") | |
| return False | |
| async def stop_streaming(self): | |
| """Stop streaming audio""" | |
| if not self.is_streaming: | |
| return | |
| self.is_streaming = False | |
| print("Stopping HF audio streaming...") | |
| # Cancel all tasks | |
| tasks = [] | |
| if self.input_task and not self.input_task.done(): | |
| self.input_task.cancel() | |
| tasks.append(self.input_task) | |
| if self.output_task and not self.output_task.done(): | |
| self.output_task.cancel() | |
| tasks.append(self.output_task) | |
| if tasks: | |
| await asyncio.gather(*tasks, return_exceptions=True) | |
| # Close ffmpeg mic if open | |
| if self.mic_stream and hasattr(self.mic_stream, 'close'): | |
| try: | |
| self.mic_stream.close() | |
| except: | |
| pass | |
| self.mic_stream = None | |
| # Shutdown executor | |
| self.executor.shutdown(wait=False) | |
| # Always close the stream manager | |
| await self.stream_manager.close() | |
| print("HF audio streaming stopped") |