# hf_audio_utils.py (app_trial_current)
# Uploaded by SreekarB via the Hugging Face Hub ("Upload 11 files", commit ffaf1db).
"""
Audio utilities for Hugging Face Spaces integration.
This module provides audio streaming for Hugging Face Spaces environments.
"""
import os
import asyncio
import numpy as np
import random
import time
import threading
import base64
import json
import tempfile
import soundfile as sf
from concurrent.futures import ThreadPoolExecutor
# Try to import the Hugging Face-specific audio utilities
try:
from transformers.pipelines.audio_utils import ffmpeg_microphone_live
HF_AUDIO_AVAILABLE = True
except ImportError:
HF_AUDIO_AVAILABLE = False
print("Warning: transformers.pipelines.audio_utils not available, will use fallback audio simulation")
class HFAudioStreamer:
    """Audio streamer for Hugging Face Spaces that works with or without real audio devices.

    Bridges a ``stream_manager`` (the Nova Sonic / Bedrock bidirectional stream --
    assumed to expose ``add_audio_chunk``, ``send_raw_event``, the ``*_EVENT``
    templates, ``prompt_name``, ``output_queue``, ``audio_output_queue`` and
    ``close``; confirm against the stream manager implementation) with either a
    real ffmpeg microphone or, when no audio device exists (typical in HF
    Spaces), simulated noise input plus periodic canned text prompts.
    """

    def __init__(self, stream_manager):
        """Initialize the HF Audio Streamer.

        Args:
            stream_manager: Object managing the bidirectional Nova Sonic stream.
        """
        self.stream_manager = stream_manager
        self.is_streaming = False             # toggled by start_streaming / stop_streaming
        self.use_ffmpeg = HF_AUDIO_AVAILABLE  # may be switched off later if ALSA init fails
        self.mic_stream = None                # ffmpeg microphone iterator, when available
        self.executor = ThreadPoolExecutor(max_workers=2)
        # Prefer the running loop when one exists; asyncio.get_event_loop() is
        # deprecated when called with no running event loop in modern Python.
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = asyncio.get_event_loop()
        # Background tasks created by start_streaming
        self.input_task = None
        self.output_task = None
        # Check if we're in HF Spaces
        self.is_hf_spaces = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
        # Create output directory for audio files (Nova responses are saved as WAVs)
        self.output_dir = os.path.join(tempfile.gettempdir(), "nova_output")
        os.makedirs(self.output_dir, exist_ok=True)
        print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
        print(f"Audio output will be saved to: {self.output_dir}")

    async def initialize_ffmpeg_mic(self):
        """Initialize the FFMPEG microphone if available.

        Returns:
            bool: True when the microphone stream was created; False otherwise
            (in which case ``self.use_ffmpeg`` is switched off).
        """
        if not self.use_ffmpeg:
            return False
        # If we're in HF Spaces, expect ALSA errors and handle them gracefully
        if self.is_hf_spaces:
            print("HF Spaces detected - ALSA errors are expected and will be handled")
            # Set environment variable to suppress ALSA errors
            os.environ['AUDIODEV'] = 'null'
        try:
            # Create in a worker thread to avoid blocking the event loop
            sampling_rate = 16000   # 16kHz as required by Nova Sonic
            chunk_length_s = 0.5    # Process 0.5 seconds at a time
            stream_chunk_s = 0.25   # Stream in 0.25 second chunks
            # In HF Spaces, we expect this to fail with ALSA errors,
            # but we'll try anyway in case they add audio support later
            self.mic_stream = await self.loop.run_in_executor(
                self.executor,
                lambda: ffmpeg_microphone_live(
                    sampling_rate=sampling_rate,
                    chunk_length_s=chunk_length_s,
                    stream_chunk_s=stream_chunk_s
                )
            )
            print("Successfully initialized FFMPEG microphone")
            return True
        except Exception as e:
            # Check for ALSA errors which are expected in Hugging Face Spaces
            error_str = str(e)
            if "ALSA" in error_str and "PCM" in error_str:
                print("ALSA audio device errors detected - this is expected in cloud environments")
                print("Switching to simulated audio input (no real microphone will be used)")
            else:
                print(f"Error initializing FFMPEG microphone: {e}")
            # Always fall back to simulated audio in HF Spaces
            self.use_ffmpeg = False
            return False

    async def ffmpeg_audio_processor(self):
        """Pull chunks from the ffmpeg microphone and forward them to Bedrock.

        Falls back to simulated input if the microphone iterator fails while
        streaming is still active.
        """
        if not self.mic_stream:
            print("FFMPEG microphone not initialized")
            self.use_ffmpeg = False
            return
        print("Starting FFMPEG audio processing")
        try:
            # Track for periodic logging
            chunks_processed = 0
            last_log_time = time.time()
            # Use the mic_stream as an iterator
            for audio_chunk in self.mic_stream:
                if not self.is_streaming:
                    break
                # Process the chunk
                if isinstance(audio_chunk, np.ndarray):
                    # Convert float32 [-1.0, 1.0] to int16 for Nova Sonic
                    audio_int16 = (audio_chunk * 32767).astype(np.int16)
                    audio_bytes = audio_int16.tobytes()
                    # Send to Bedrock
                    self.stream_manager.add_audio_chunk(audio_bytes)
                    # Log periodically to show activity
                    chunks_processed += 1
                    current_time = time.time()
                    if current_time - last_log_time > 2.0:
                        print(f"FFMPEG audio: processed {chunks_processed} chunks")
                        chunks_processed = 0
                        last_log_time = current_time
                # Add a small sleep to prevent tight loops
                await asyncio.sleep(0.01)
        except Exception as e:
            print(f"Error in FFMPEG audio processor: {e}")
            # If the ffmpeg processor fails, fall back to simulated audio
            self.use_ffmpeg = False
            # Start simulated input if we're still streaming. Keep the task
            # referenced on self so it is not garbage-collected mid-flight
            # (a bare create_task() result may be collected at any time).
            if self.is_streaming:
                print("Falling back to simulated audio input")
                self.input_task = asyncio.create_task(self.generate_simulated_input())
        finally:
            # Cleanup: close the mic stream best-effort, never masking errors
            if hasattr(self.mic_stream, 'close'):
                try:
                    self.mic_stream.close()
                except Exception:
                    pass
            self.mic_stream = None
            print("FFMPEG audio processor stopped")

    async def generate_simulated_input(self):
        """Generate simulated audio input when a real microphone isn't available.

        Sends low-level noise chunks to Bedrock and, roughly every 2 seconds,
        a canned text message instead so Nova produces a response.
        """
        print("Starting simulated audio input")
        # Create a few temporary audio files with silence/noise
        audio_files = []
        for i in range(3):
            noise_level = 0.01 * (i + 1)  # Vary noise level per file
            duration = 1.0                # 1 second of audio
            samples = np.random.normal(0, noise_level, int(16000 * duration))
            # Create temporary file (delete=False: cleaned up in the finally below)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                sf.write(f.name, samples, 16000)
                audio_files.append(f.name)
        try:
            # Send simulated speech in a pattern
            sequence_count = 0
            while self.is_streaming:
                # Choose a random file
                file_path = np.random.choice(audio_files)
                # Load the audio
                try:
                    audio_data, _ = sf.read(file_path)
                    audio_int16 = (audio_data * 32767).astype(np.int16)
                    audio_bytes = audio_int16.tobytes()
                    # Send to Bedrock
                    self.stream_manager.add_audio_chunk(audio_bytes)
                except Exception as e:
                    print(f"Error processing simulated audio file: {e}")
                # Wait between chunks
                await asyncio.sleep(0.2)
                # Increment sequence counter
                sequence_count += 1
                # After a sequence of noise, send text to get a response
                if sequence_count >= 10:  # After 10 chunks (about 2 seconds)
                    sequence_count = 0
                    # Send text instead of more simulated audio
                    messages = [
                        "Hello there",
                        "How are you today?",
                        "Tell me something interesting",
                        "What's the weather like?",
                        "I'm learning to speak more fluently"
                    ]
                    message = np.random.choice(messages)
                    # send_text_message already mirrors the transcription onto
                    # stream_manager.output_queue for the UI, so no second put here
                    # (previously the message appeared twice in the transcript).
                    await self.send_text_message(message)
                    # Wait for Nova to respond
                    await asyncio.sleep(3.0)
        except Exception as e:
            print(f"Error in simulated audio generator: {e}")
            import traceback
            traceback.print_exc()
        finally:
            # Clean up temp files; they may already be gone, so best-effort only
            for file_path in audio_files:
                try:
                    os.unlink(file_path)
                except OSError:
                    pass

    async def play_output_audio(self):
        """Consume Nova's audio output queue and persist responses as WAV files.

        In HF Spaces there is no audio device, so responses are written to
        ``self.output_dir`` instead of being played back.
        """
        while self.is_streaming:
            try:
                # Get audio data from the stream manager's queue; short timeout
                # so the loop re-checks is_streaming regularly
                audio_data = await asyncio.wait_for(
                    self.stream_manager.audio_output_queue.get(),
                    timeout=0.5
                )
                if audio_data and self.is_streaming:
                    # Store info in output queue for other parts of the app
                    self.stream_manager.output_queue.put_nowait({
                        "event": {
                            "audioOutput": {
                                "content": "Audio received from Nova"
                            }
                        }
                    })
                    # In HF Spaces, we can't play audio directly, but we can save it
                    timestamp = int(time.time())
                    output_path = os.path.join(self.output_dir, f"nova_response_{timestamp}.wav")
                    try:
                        # Convert from raw PCM to numpy for soundfile
                        audio_np = np.frombuffer(audio_data, dtype=np.int16)
                        sf.write(output_path, audio_np, 24000)  # Nova outputs at 24kHz
                        print(f"Saved Nova audio response to {output_path}")
                    except Exception as e:
                        print(f"Error saving audio response: {e}")
            except asyncio.TimeoutError:
                # No data available within timeout
                continue
            except Exception as e:
                if self.is_streaming:
                    print(f"Error handling output audio: {e}")
                    import traceback
                    traceback.print_exc()
                await asyncio.sleep(0.1)

    async def start_streaming(self):
        """Start streaming audio to/from Nova Sonic.

        In HF Spaces this goes straight to simulated input; elsewhere it tries
        the ffmpeg microphone first and falls back to simulation. Returns once
        the background tasks are launched; they run until stop_streaming().
        """
        if self.is_streaming:
            return
        print("Starting audio streaming in HF mode...")
        # For HF Spaces, we'll use our enhanced error handling
        if self.is_hf_spaces:
            # Set environment variables to help with audio issues
            os.environ['AUDIODEV'] = 'null'
            os.environ['SDL_AUDIODRIVER'] = 'dummy'
        # Send audio content start event
        await self.stream_manager.send_audio_content_start_event()
        self.is_streaming = True
        # Start with a welcome message from Nova
        await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")
        # In HF Spaces, just go straight to simulated audio to avoid ALSA errors
        if self.is_hf_spaces:
            print("Running in Hugging Face Spaces - using simulated audio")
            self.use_ffmpeg = False
            self.input_task = asyncio.create_task(self.generate_simulated_input())
            self.output_task = asyncio.create_task(self.play_output_audio())
            # Let the user know what's happening
            print("Speech-to-speech functionality is active:")
            print("- Simulated audio is being sent to Nova Sonic")
            print("- Nova's responses will be saved as WAV files")
            print("- Conversation will be shown as text transcriptions")
            return
        # For non-HF environments, try the ffmpeg approach
        tasks = []
        # Initialize FFMPEG mic if available and create audio input task
        if self.use_ffmpeg:
            ffmpeg_available = await self.initialize_ffmpeg_mic()
            if ffmpeg_available:
                self.input_task = asyncio.create_task(self.ffmpeg_audio_processor())
                tasks.append(self.input_task)
            else:
                self.use_ffmpeg = False
        # Fall back to simulated audio if FFMPEG isn't available
        if not self.use_ffmpeg:
            self.input_task = asyncio.create_task(self.generate_simulated_input())
            tasks.append(self.input_task)
        # Start output processing
        self.output_task = asyncio.create_task(self.play_output_audio())
        tasks.append(self.output_task)
        # Let the tasks run - we won't wait for input() here because that's handled in the UI
        # This will allow the tasks to continue running until stop_streaming is called

    async def send_text_message(self, text):
        """Send a text message to Nova to simulate user input.

        Emits the content-start / text-input / content-end event triplet via
        the stream manager and mirrors the message onto the UI output queue.

        Args:
            text: The user message to send.

        Returns:
            bool: True on success, False if any event failed to send.
        """
        try:
            # Create text content start event; the content name only needs to
            # be unique per message, so a timestamp suffices
            content_name = str(time.time())
            text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                "USER"
            )
            await self.stream_manager.send_raw_event(text_content_start)
            # Create text input event
            text_input = self.stream_manager.TEXT_INPUT_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                text
            )
            await self.stream_manager.send_raw_event(text_input)
            # Create content end event
            content_end = self.stream_manager.CONTENT_END_EVENT % (
                self.stream_manager.prompt_name,
                content_name
            )
            await self.stream_manager.send_raw_event(content_end)
            print(f"Sent text message to Nova: {text}")
            # Also add message to output queue for UI
            await self.stream_manager.output_queue.put({
                "event": {
                    "textOutput": {
                        "content": text,
                        "role": "USER"
                    }
                }
            })
            return True
        except Exception as e:
            print(f"Error sending text message: {e}")
            return False

    async def stop_streaming(self):
        """Stop streaming: cancel tasks, close the mic, and shut everything down."""
        if not self.is_streaming:
            return
        self.is_streaming = False
        print("Stopping HF audio streaming...")
        # Cancel all tasks and wait for them to unwind
        tasks = []
        if self.input_task and not self.input_task.done():
            self.input_task.cancel()
            tasks.append(self.input_task)
        if self.output_task and not self.output_task.done():
            self.output_task.cancel()
            tasks.append(self.output_task)
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        # Close ffmpeg mic if open (best-effort)
        if self.mic_stream and hasattr(self.mic_stream, 'close'):
            try:
                self.mic_stream.close()
            except Exception:
                pass
        self.mic_stream = None
        # Shutdown executor without blocking the event loop on worker threads
        self.executor.shutdown(wait=False)
        # Always close the stream manager
        await self.stream_manager.close()
        print("HF audio streaming stopped")