Spaces:
Sleeping
Sleeping
| import os | |
| import threading | |
| import time | |
| import argparse | |
| import asyncio | |
| import numpy as np | |
| import soundfile as sf | |
| import tempfile | |
| from nova_sonic_tool_use import BedrockStreamManager, AudioStreamer | |
| from language_coach import LanguageCoach | |
| from session_manager import SessionManager | |
| from config import UI_TITLE, UI_SUBTITLE, INPUT_SAMPLE_RATE | |
| import gradio as gr | |
# Optional dependency: python-dotenv. When it is installed, variables from a
# local .env file (if one exists) are loaded into the environment; when it is
# missing the app simply relies on the environment as-is.
try:
    from dotenv import load_dotenv
    # Load environment variables from .env file if it exists
    load_dotenv()
except ImportError:
    pass

# Optional dependency: the project's Hugging Face specific audio streamer.
# HF_AUDIO_AVAILABLE records whether HFAudioStreamer could be imported; code
# further down checks this flag before referencing the name.
try:
    from hf_audio_utils import HFAudioStreamer
    HF_AUDIO_AVAILABLE = True
except ImportError:
    print("HFAudioStreamer not available. Attempting to create it.")
    HF_AUDIO_AVAILABLE = False

# Optional dependency: the ffmpeg-backed live microphone helper shipped with
# transformers. FFMPEG_AVAILABLE records whether the import succeeded; it says
# nothing about whether an ffmpeg binary or a capture device is present.
try:
    from transformers.pipelines.audio_utils import ffmpeg_microphone_live
    FFMPEG_AVAILABLE = True
    print("ffmpeg_microphone_live is available!")
except ImportError:
    FFMPEG_AVAILABLE = False
    print("ffmpeg_microphone_live is not available. Using fallback audio handling.")
# Environment probe used throughout the module to pick Spaces-specific paths
def is_huggingface_spaces():
    """Return True when the process appears to run inside Hugging Face Spaces.

    The check is purely environment based: either a ``SPACE_ID`` variable is
    present, or ``SYSTEM`` is set to exactly ``"spaces"``.
    """
    env = os.environ
    if "SPACE_ID" in env:
        return True
    return env.get("SYSTEM") == "spaces"
# Suppress ALSA noise when running in HF Spaces, where no audio device exists.
if is_huggingface_spaces():
    os.environ['AUDIODEV'] = 'null'
    # Redirect stderr through a filter that drops ALSA/PCM messages.
    try:
        import sys
        import io
        # The marker attribute makes this idempotent if the module is
        # imported or reloaded more than once.
        if not hasattr(sys, '_alsa_error_redirected'):
            # Save the original stderr so it can be restored later
            sys._original_stderr = sys.stderr

            class ALSAErrorFilter:
                """File-like wrapper around stderr that drops ALSA/PCM messages.

                Everything that is not explicitly overridden here (``fileno``,
                ``encoding``, ``buffer``, ``writable`` ...) is delegated to the
                wrapped stream so code expecting a real text stream keeps
                working.
                """

                def __init__(self, original_stderr):
                    self.original_stderr = original_stderr

                def write(self, text):
                    """Write *text* unless it mentions ALSA or PCM.

                    Returns the number of characters accepted, as the
                    text-stream ``write`` contract requires, also for
                    suppressed text.
                    """
                    if "ALSA" in text or "PCM" in text:
                        return len(text)
                    return self.original_stderr.write(text)

                def flush(self):
                    self.original_stderr.flush()

                def isatty(self):
                    return hasattr(self.original_stderr, 'isatty') and self.original_stderr.isatty()

                def __getattr__(self, name):
                    # Only reached for attributes not defined on the filter.
                    # The guard avoids infinite recursion if original_stderr
                    # itself has not been set yet.
                    if name == "original_stderr":
                        raise AttributeError(name)
                    return getattr(self.original_stderr, name)

            # Replace stderr with our filtered version
            sys.stderr = ALSAErrorFilter(sys._original_stderr)

            def restore_stderr():
                """Put the original stderr back (registered to run at exit)."""
                if hasattr(sys, '_original_stderr'):
                    sys.stderr = sys._original_stderr
                    print("Restored original stderr")

            # Mark that we've handled this
            sys._alsa_error_redirected = True
            # Restore stderr on exit
            import atexit
            atexit.register(restore_stderr)
            print("Installed ALSA error filter to suppress audio device errors")
    except Exception as e:
        # Filtering is best-effort; report the failure instead of hiding it.
        print(f"Could not install ALSA error filter: {e}")
# Factory for the optional ffmpeg-backed microphone stream
def create_ffmpeg_mic(sample_rate=INPUT_SAMPLE_RATE, chunk_length_s=1.0, stream_chunk_s=0.25):
    """Build a live ffmpeg microphone stream, or return None when impossible.

    Args:
        sample_rate: Passed to ``ffmpeg_microphone_live`` as ``sampling_rate``.
        chunk_length_s: Passed through as ``chunk_length_s``.
        stream_chunk_s: Passed through as ``stream_chunk_s``.

    Returns:
        Whatever ``ffmpeg_microphone_live`` returns, or ``None`` when the
        helper was not importable or raised while being created.
    """
    mic_stream = None
    if FFMPEG_AVAILABLE:
        try:
            mic_stream = ffmpeg_microphone_live(
                sampling_rate=sample_rate,
                chunk_length_s=chunk_length_s,
                stream_chunk_s=stream_chunk_s,
            )
            print(f"Successfully created ffmpeg microphone with sample rate {sample_rate}")
        except Exception as e:
            print(f"Error creating ffmpeg microphone: {e}")
            mic_stream = None
    return mic_stream
class NovaConversationApp:
    """Drive one conversation with Nova Sonic.

    The app owns a session manager, a language coach, a Bedrock stream manager
    and an audio streamer. All asyncio work runs on a private event loop that
    is kept running in a daemon thread, so the synchronous entry points
    (``start``, ``stop``, ``replay_last_response``) can be called from any
    thread (CLI main thread or a Gradio worker) and hand coroutines to the
    loop with ``asyncio.run_coroutine_threadsafe``.
    """

    def __init__(self, session_id=None):
        """Create the app and start or resume a session.

        Args:
            session_id: Identifier passed to ``SessionManager.start_session``;
                ``None`` is passed through unchanged.
        """
        # Initialize core components
        self.session_manager = SessionManager()
        self.language_coach = LanguageCoach()
        # Start or resume session
        self.session_id = self.session_manager.start_session(session_id)
        # Status flags
        self.is_running = False
        self.is_listening = False
        self.is_processing = False
        # Streaming components; populated by start()
        self.stream_manager = None
        self.audio_streamer = None
        self.loop = None
        self.audio_stream_task = None
        # Declared here so stop() and friends never hit a missing attribute
        self.ffmpeg_mic = None
        self.ffmpeg_thread = None
        self._loop_thread = None

    def _get_hf_audio_utils_content(self):
        """Return the source text of a fallback module defining HFAudioStreamer.

        The text is written to a temporary ``dynamic_hf_audio.py`` and imported
        when the real ``hf_audio_utils`` module is unavailable.
        """
        return '''
import os
import asyncio
import numpy as np
import random
import time
import threading
import base64
import json
import tempfile
from concurrent.futures import ThreadPoolExecutor
# Try to import the Hugging Face-specific audio utilities
try:
    from transformers.pipelines.audio_utils import ffmpeg_microphone_live
    HF_AUDIO_AVAILABLE = True
except ImportError:
    HF_AUDIO_AVAILABLE = False
    print("Warning: transformers.pipelines.audio_utils not available, will use fallback audio simulation")
class HFAudioStreamer:
    """Audio streamer for Hugging Face Spaces that works with or without real audio devices"""
    def __init__(self, stream_manager):
        """Initialize the HF Audio Streamer"""
        self.stream_manager = stream_manager
        self.is_streaming = False
        self.use_ffmpeg = HF_AUDIO_AVAILABLE
        self.mic_stream = None
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()
        # Initialize tasks
        self.input_task = None
        self.output_task = None
        # Check if we're in HF Spaces
        self.is_hf_spaces = "SPACE_ID" in os.environ or ("SYSTEM" in os.environ and os.environ.get("SYSTEM") == "spaces")
        # Create output directory for audio files
        self.output_dir = os.path.join(tempfile.gettempdir(), "nova_output")
        os.makedirs(self.output_dir, exist_ok=True)
        print(f"HF Audio Streamer initialized. Using ffmpeg: {self.use_ffmpeg}, In HF Spaces: {self.is_hf_spaces}")
        print(f"Audio output will be saved to: {self.output_dir}")
    async def generate_simulated_input(self):
        """Generate simulated audio input when real microphone isn't available"""
        print("Starting simulated audio input")
        while self.is_streaming:
            try:
                # Generate a dummy audio chunk with some basic noise
                CHUNK_SIZE = 1024 # Standard audio chunk size
                CHANNELS = 1 # Mono audio
                samples = np.random.normal(0, 0.01, CHUNK_SIZE * CHANNELS).astype(np.float32)
                audio_data = (samples * 32767).astype(np.int16).tobytes()
                # Send to Bedrock
                self.stream_manager.add_audio_chunk(audio_data)
                # Wait between chunks
                await asyncio.sleep(0.2)
                # Occasionally send text to get a response
                if random.random() < 0.05: # 5% chance
                    messages = [
                        "Hello there",
                        "How are you today?",
                        "Tell me something interesting",
                        "What's the weather like?",
                        "I'm learning to speak more fluently"
                    ]
                    message = random.choice(messages)
                    await self.send_text_message(message)
                    await asyncio.sleep(2.0)
            except Exception as e:
                if self.is_streaming:
                    print(f"Error generating simulated audio: {e}")
                await asyncio.sleep(0.5)
    async def play_output_audio(self):
        """Handle audio output from Nova Sonic"""
        while self.is_streaming:
            try:
                # Get audio data from the stream manager's queue
                audio_data = await asyncio.wait_for(
                    self.stream_manager.audio_output_queue.get(),
                    timeout=0.5
                )
                if audio_data and self.is_streaming:
                    # Store info in output queue for other parts of the app
                    self.stream_manager.output_queue.put_nowait({
                        "event": {
                            "audioOutput": {
                                "content": "Audio received from Nova"
                            }
                        }
                    })
                    # In HF Spaces, we can't play audio directly, but we can save it
                    timestamp = int(time.time())
                    output_path = os.path.join(self.output_dir, f"nova_response_{timestamp}.wav")
                    try:
                        # Convert from raw PCM to numpy for saving
                        audio_np = np.frombuffer(audio_data, dtype=np.int16)
                        # We can't import soundfile here, so we'll just log the info
                        print(f"Would save Nova audio response ({len(audio_np)} samples) to {output_path}")
                    except Exception as e:
                        print(f"Error handling audio response: {e}")
            except asyncio.TimeoutError:
                # No data available within timeout
                continue
            except Exception as e:
                if self.is_streaming:
                    print(f"Error handling output audio: {e}")
                await asyncio.sleep(0.1)
    async def start_streaming(self):
        """Start streaming audio"""
        if self.is_streaming:
            return
        print(f"Starting audio streaming in HF mode...")
        # Send audio content start event
        await self.stream_manager.send_audio_content_start_event()
        self.is_streaming = True
        # Start with a welcome message from Nova
        await self.send_text_message("Hi there! I'm Nova, your conversation partner. How are you doing today?")
        # Start simulated input
        self.input_task = asyncio.create_task(self.generate_simulated_input())
        # Start output processing
        self.output_task = asyncio.create_task(self.play_output_audio())
    async def send_text_message(self, text):
        """Send a text message to Nova to simulate user input"""
        try:
            # Create text content start event
            content_name = str(time.time())
            text_content_start = self.stream_manager.TEXT_CONTENT_START_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                "USER"
            )
            await self.stream_manager.send_raw_event(text_content_start)
            # Create text input event
            text_input = self.stream_manager.TEXT_INPUT_EVENT % (
                self.stream_manager.prompt_name,
                content_name,
                text
            )
            await self.stream_manager.send_raw_event(text_input)
            # Create content end event
            content_end = self.stream_manager.CONTENT_END_EVENT % (
                self.stream_manager.prompt_name,
                content_name
            )
            await self.stream_manager.send_raw_event(content_end)
            print(f"Sent text message to Nova: {text}")
            # Also add message to output queue for UI
            await self.stream_manager.output_queue.put({
                "event": {
                    "textOutput": {
                        "content": text,
                        "role": "USER"
                    }
                }
            })
            return True
        except Exception as e:
            print(f"Error sending text message: {e}")
            return False
    async def stop_streaming(self):
        """Stop streaming audio"""
        if not self.is_streaming:
            return
        self.is_streaming = False
        print("Stopping HF audio streaming...")
        # Cancel all tasks
        if self.input_task and not self.input_task.done():
            self.input_task.cancel()
        if self.output_task and not self.output_task.done():
            self.output_task.cancel()
        # Shutdown executor
        self.executor.shutdown(wait=False)
        # Always close the stream manager
        await self.stream_manager.close()
        print("HF audio streaming stopped")
'''

    # ------------------------------------------------------------------
    # Start-up
    # ------------------------------------------------------------------
    def start(self):
        """Start the conversation with Nova.

        Checks AWS credentials, makes sure the background event loop is
        running, builds the stream manager and audio streamer on that loop,
        then starts output monitoring (and the ffmpeg reader thread when an
        ffmpeg microphone was created).

        Returns:
            bool: ``True`` when the conversation is running (including when it
            was already running), ``False`` when start-up failed. The failure
            reason is printed rather than raised.
        """
        if self.is_running:
            # A repeated click must not open a second Bedrock stream.
            print("Conversation already running")
            return True
        print("Starting conversation with Nova...")
        self.is_running = True
        self.ffmpeg_mic = None
        self.ffmpeg_thread = None
        try:
            self._check_aws_credentials()
            self._ensure_event_loop()
            # Build and initialise everything on the loop thread and wait for
            # it here, so failures surface in this try block.
            asyncio.run_coroutine_threadsafe(self._bootstrap(), self.loop).result()
            # If ffmpeg mic is available, start a thread to process its input
            if self.ffmpeg_mic:
                self.ffmpeg_thread = threading.Thread(
                    target=self._process_ffmpeg_mic,
                    daemon=True
                )
                self.ffmpeg_thread.start()
                print("Started ffmpeg microphone processing thread")
            # Monitor output text for session history and language coaching
            asyncio.run_coroutine_threadsafe(self._monitor_output(), self.loop)
            return True
        except Exception as e:
            print(f"Failed to start conversation with Nova: {e}")
            self.is_running = False
            return False

    @staticmethod
    def _check_aws_credentials():
        """Raise ValueError naming any AWS credential variable that is unset."""
        missing = [
            name
            for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
            if not os.environ.get(name)
        ]
        if not missing:
            return
        error_msg = f"Missing AWS credentials: {', '.join(missing)}"
        if is_huggingface_spaces():
            error_msg += "\nPlease add these as secrets in your Hugging Face Space settings."
        else:
            error_msg += "\nPlease set these environment variables or add them to a .env file."
        raise ValueError(error_msg)

    def _ensure_event_loop(self):
        """Make sure ``self.loop`` exists and is being run by a daemon thread.

        ``run_coroutine_threadsafe`` only makes progress while the target loop
        is running, so the loop is driven with ``run_forever`` in its own
        thread instead of one-off ``run_until_complete`` calls.
        """
        if self._loop_thread is not None and self._loop_thread.is_alive():
            return
        self.loop = asyncio.new_event_loop()
        self._loop_thread = threading.Thread(
            target=self._run_event_loop,
            args=(self.loop,),
            name="nova-event-loop",
            daemon=True,
        )
        self._loop_thread.start()

    @staticmethod
    def _run_event_loop(loop):
        """Thread target: install *loop* as this thread's loop and run it."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    async def _bootstrap(self):
        """Create the stream manager and audio streamer, then start streaming.

        Runs on the loop thread so objects that look up the current event
        loop in their constructors (the embedded HFAudioStreamer does) bind to
        ``self.loop``.
        """
        region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
        self.stream_manager = BedrockStreamManager(model_id='amazon.nova-sonic-v1:0', region=region)
        self.audio_streamer = self._create_audio_streamer()
        await self._initialize_streaming()

    def _create_audio_streamer(self):
        """Pick and construct the audio streamer for the current environment.

        In HF Spaces the imported HFAudioStreamer is preferred, then the
        dynamically generated one, then the standard AudioStreamer. Elsewhere
        the standard AudioStreamer is used and, when available, an ffmpeg
        microphone is created and stored in ``self.ffmpeg_mic``.
        """
        if is_huggingface_spaces():
            if HF_AUDIO_AVAILABLE:
                print("Using Hugging Face Spaces-optimized audio streamer")
                return HFAudioStreamer(self.stream_manager)
            try:
                print("Creating HFAudioStreamer dynamically")
                streamer = self._load_dynamic_hf_streamer()
                print("Successfully created dynamic HFAudioStreamer")
                return streamer
            except Exception as e:
                print(f"Failed to create dynamic HFAudioStreamer: {e}")
                print("Falling back to standard audio streamer")
                return AudioStreamer(self.stream_manager)
        # For local environments, try ffmpeg first
        if FFMPEG_AVAILABLE:
            print("Attempting to use ffmpeg microphone streamer")
            self.ffmpeg_mic = create_ffmpeg_mic()
            if self.ffmpeg_mic:
                print("Will use ffmpeg microphone for audio input")
        # NOTE(review): the standard AudioStreamer is started even when the
        # ffmpeg microphone is active - confirm both do not capture and
        # forward the same audio.
        print("Using standard audio streamer" + (" with ffmpeg enhancement" if self.ffmpeg_mic else ""))
        return AudioStreamer(self.stream_manager)

    def _load_dynamic_hf_streamer(self):
        """Write the fallback module to a temp dir, import it, build a streamer."""
        import sys
        temp_dir = tempfile.mkdtemp()
        module_path = os.path.join(temp_dir, "dynamic_hf_audio.py")
        with open(module_path, 'w', encoding="utf-8") as f:
            f.write(self._get_hf_audio_utils_content())
        sys.path.append(temp_dir)
        import dynamic_hf_audio
        return dynamic_hf_audio.HFAudioStreamer(self.stream_manager)

    async def _initialize_streaming(self):
        """Initialize the Bedrock stream and launch the streamer task."""
        # Imported locally: the module only imports sys at top level inside
        # the Spaces-only ALSA branch, so the name may otherwise be undefined.
        import sys
        await self.stream_manager.initialize_stream()
        # Restore stderr after stream initialization if we redirected it
        if hasattr(sys, '_alsa_error_redirected') and hasattr(sys, '_original_stderr'):
            sys.stderr = sys._original_stderr
            print("Restored stderr after stream initialization")
        # Start the streaming process using the built-in start_streaming method
        self.audio_stream_task = asyncio.create_task(self.audio_streamer.start_streaming())

    # ------------------------------------------------------------------
    # Runtime
    # ------------------------------------------------------------------
    async def _monitor_output(self):
        """Consume the stream manager's output queue while the app runs.

        Text output with role ``"ASSISTANT"`` is stored in the session history
        and handed to the language coach. Any unexpected error stops the
        conversation.
        """
        try:
            while self.is_running:
                try:
                    message = await asyncio.wait_for(
                        self.stream_manager.output_queue.get(),
                        timeout=0.5
                    )
                except asyncio.TimeoutError:
                    # Nothing arrived; loop again so is_running is re-checked
                    continue
                text_output = message.get("event", {}).get("textOutput") if "event" in message else None
                if text_output is None:
                    continue
                text_content = text_output["content"]
                role = text_output["role"]
                if role == "ASSISTANT":
                    # NOTE(review): the user side is recorded as the fixed
                    # placeholder "User speech", and the coach analyses
                    # Nova's text rather than the user's - confirm intended.
                    self.session_manager.add_interaction("User speech", text_content)
                    self.language_coach.analyze(text_content, self.session_id)
        except Exception as e:
            print(f"Error monitoring output: {e}")
            if self.is_running:
                self.stop()

    def conversation_loop(self):
        """Run a conversation from the command line until Enter or Ctrl+C."""
        # First, initialize the stream
        if not self.start():
            print("Error: Failed to initialize Nova stream")
            return
        # Keep the main thread alive
        try:
            print("\nListening... (Press Ctrl+C to exit)")
            # input() blocks this thread; streaming continues on the loop thread
            input("\nPress Enter to stop conversation...")
        except KeyboardInterrupt:
            print("\nExiting conversation")
        finally:
            self.stop()

    def replay_last_response(self):
        """Queue the session's last response for playback again.

        Returns:
            bool: ``True`` when something was queued, ``False`` when there is
            no active stream or no stored response.
        """
        if self.stream_manager and self.stream_manager.is_active:
            last_audio = self.session_manager.get_last_response()
            if last_audio:
                # Add the audio to the output queue
                asyncio.run_coroutine_threadsafe(
                    self.stream_manager.audio_output_queue.put(last_audio),
                    self.loop
                )
                return True
        return False

    @staticmethod
    def _chunk_to_pcm16(audio_chunk):
        """Convert one microphone chunk to 16-bit PCM bytes.

        Accepts either a bare float ndarray or the dict form that
        ``ffmpeg_microphone_live`` is documented to yield (samples under
        ``"raw"``, optional ``"partial"`` flag and ``"stride"`` pair).
        Partial chunks are skipped because the same audio is delivered again
        in the following complete chunk; stride samples are cropped because
        they overlap the neighbouring chunks.

        Returns:
            bytes | None: little-endian int16 PCM (native byte order), or
            ``None`` when the chunk carries nothing to send.
        """
        partial = False
        stride = None
        if isinstance(audio_chunk, dict):
            partial = bool(audio_chunk.get("partial", False))
            stride = audio_chunk.get("stride")
            audio_chunk = audio_chunk.get("raw")
        if partial or not isinstance(audio_chunk, np.ndarray):
            return None
        if stride:
            left, right = stride
            audio_chunk = audio_chunk[left:len(audio_chunk) - right]
        if audio_chunk.size == 0:
            return None
        # Clip first: values outside [-1.0, 1.0] would wrap around in int16
        scaled = np.clip(audio_chunk, -1.0, 1.0) * 32767
        return scaled.astype(np.int16).tobytes()

    def _process_ffmpeg_mic(self):
        """Thread target: forward ffmpeg microphone audio to the stream manager."""
        mic = self.ffmpeg_mic
        if mic is None:
            return
        try:
            print("Starting ffmpeg microphone processing...")
            last_log_time = time.time()
            for audio_chunk in mic:
                if not self.is_running:
                    break
                audio_bytes = self._chunk_to_pcm16(audio_chunk)
                if not audio_bytes:
                    continue
                if self.stream_manager and self.is_running:
                    # Hand over on the loop thread: the stream manager's
                    # queues belong to the event loop and are not thread-safe.
                    self.loop.call_soon_threadsafe(
                        self.stream_manager.add_audio_chunk, audio_bytes
                    )
                    # Log periodically to show that audio is being processed
                    current_time = time.time()
                    if current_time - last_log_time > 2.0:  # Every 2 seconds
                        print("Processing audio from ffmpeg microphone...")
                        last_log_time = current_time
            print("Finished ffmpeg microphone processing")
        except Exception as e:
            print(f"Error in ffmpeg microphone thread: {e}")
            import traceback
            traceback.print_exc()

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------
    def stop(self):
        """Stop the conversation and release streaming resources.

        Safe to call repeatedly and from any thread, including the loop
        thread; it never blocks on the loop.
        """
        if not self.is_running:
            return
        self.is_running = False
        # Stop the ffmpeg thread if it's running
        if self.ffmpeg_mic:
            try:
                self.ffmpeg_mic.close()
            except Exception:
                # Best effort: closing can fail while the reader thread is
                # mid-iteration; the is_running flag ends that loop anyway.
                pass
            self.ffmpeg_mic = None
        # Clean up the audio streamer and stream manager
        if self.loop and self.audio_streamer:
            future = asyncio.run_coroutine_threadsafe(
                self.audio_streamer.stop_streaming(),
                self.loop
            )
            future.add_done_callback(self._report_stop_failure)
        print("Conversation stopped")

    @staticmethod
    def _report_stop_failure(future):
        """Print the exception, if any, raised while stopping the streamer."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            print(f"Error stopping audio streamer: {error}")
# Gradio UI setup
def create_ui(app):
    """Build the Gradio Blocks interface for a conversation app.

    Args:
        app: The ``NovaConversationApp`` instance the controls operate on.

    Returns:
        The constructed ``gr.Blocks`` object; the caller launches it.
    """
    in_spaces = is_huggingface_spaces()
    # Set when the periodic refresh had to fall back to ``load(every=...)``,
    # which needs the queue enabled on older Gradio releases.
    needs_queue = False

    with gr.Blocks(title=UI_TITLE) as ui:
        gr.Markdown(f"# {UI_TITLE}")
        gr.Markdown(f"## {UI_SUBTITLE}")
        # Check if we're in HF Spaces to provide appropriate instructions
        if in_spaces:
            gr.Markdown("""
### Hugging Face Spaces Mode
This app is running in Hugging Face Spaces with speech-to-speech functionality.
1. Click **Start Conversation** to begin
2. Nova will automatically greet you
3. The app simulates speech input since real microphones aren't available in this environment
4. Nova's audio responses are saved as WAV files in a temporary directory
5. You'll see text transcriptions of the conversation in real-time
6. You can also use the text input below to send messages to Nova
7. Press **Stop Conversation** when done
Note: ALSA errors in the logs are normal and expected - the app handles them automatically.
""")
        with gr.Row():
            status_indicator = gr.Textbox(
                value="Ready to start",
                label="Status",
                interactive=False
            )
        # Live transcription display
        with gr.Row():
            live_transcription = gr.Textbox(
                value="",
                label="Live Transcription",
                placeholder="Your speech will appear here as you speak...",
                interactive=False
            )
        # Conversation history display
        conversation_display = gr.Textbox(
            value="",
            label="Conversation History",
            lines=10,
            max_lines=20,
            interactive=False
        )
        with gr.Row():
            start_button = gr.Button("Start Conversation", variant="primary")
            stop_button = gr.Button("Stop Conversation", variant="stop")
            replay_button = gr.Button("Replay Last Response")

        # Microphone component (skipped in HF Spaces). The keyword naming the
        # input source differs between Gradio major versions, so the known
        # spellings are tried in turn.
        audio_input = None
        with gr.Row():
            if not in_spaces:
                audio_kwargs = dict(
                    type="filepath",
                    streaming=True,
                    label="Speak here (if your browser supports it)",
                )
                source_variants = (
                    {"source": "microphone"},
                    {"sources": ["microphone"]},
                    {},
                )
                for source_kwargs in source_variants:
                    try:
                        audio_input = gr.Audio(**source_kwargs, **audio_kwargs)
                        break
                    except TypeError:
                        continue

        # Text input for all users
        with gr.Row():
            user_message = gr.Textbox(
                placeholder="Type your message here and press Enter",
                label="Your Message",
                interactive=True,
                show_label=True
            )
            send_button = gr.Button("Send", variant="primary")

        # Define UI interactions
        def start_conversation():
            if app.start():
                return "Conversation started - Nova will say hello shortly"
            return "Failed to start conversation"

        def stop_conversation():
            app.stop()
            return "Conversation stopped"

        def replay_last():
            if app.replay_last_response():
                return "Replaying last response"
            return "No response to replay"

        def process_audio(audio_path):
            """Streaming callback for the microphone component.

            Currently a placeholder: nothing is forwarded from here and the
            component receives no update.
            """
            try:
                if app.is_running and app.audio_streamer and audio_path:
                    pass
                return None
            except Exception as e:
                print(f"Error processing audio: {e}")
                return None

        def send_text_message(text, history):
            """Send typed text to Nova.

            Always returns exactly four values, matching the wired outputs:
            status, live transcription, conversation history, message box.
            ``gr.update()`` leaves a component untouched.
            """
            unchanged = gr.update()
            if not text or not text.strip():
                return "Please type a message first", unchanged, unchanged, text
            if not (app.is_running and app.audio_streamer):
                return "Please start the conversation first", unchanged, unchanged, text
            if not hasattr(app.audio_streamer, 'send_text_message'):
                return "Audio streamer doesn't support text messages", unchanged, unchanged, text
            # Schedule the text message on the app's event loop
            asyncio.run_coroutine_threadsafe(
                app.audio_streamer.send_text_message(text),
                app.loop
            )
            new_transcription = f"You: {text}"
            new_history = f"{history or ''}\nYou: {text}\n"
            return "Message sent", new_transcription, new_history, ""

        # Connect the audio input to processing if the component exists
        if audio_input is not None:
            try:
                audio_input.stream(
                    process_audio,
                    inputs=[audio_input],
                    outputs=None
                )
            except Exception as e:
                print(f"Warning: Could not set up audio streaming: {e}")
                print("Continuing with text input only")

        # Connect the text input to the send function. The current history is
        # passed in as an input because ``component.value`` only holds the
        # value the component was constructed with.
        text_outputs = [status_indicator, live_transcription, conversation_display, user_message]
        send_button.click(
            send_text_message,
            inputs=[user_message, conversation_display],
            outputs=text_outputs
        )
        user_message.submit(
            send_text_message,
            inputs=[user_message, conversation_display],
            outputs=text_outputs
        )
        # Wire up the UI interactions
        start_button.click(start_conversation, outputs=status_indicator)
        stop_button.click(stop_conversation, outputs=status_indicator)
        replay_button.click(replay_last, outputs=status_indicator)

        def update_live_transcription():
            """Return the newest USER transcript, or leave the box unchanged."""
            # NOTE(review): this takes messages off the same output queue the
            # app's own monitor reads, from a different thread - a message
            # consumed here never reaches the session history. Confirm.
            if app.is_running and app.stream_manager and app.stream_manager.output_queue:
                try:
                    # This is non-blocking
                    if not app.stream_manager.output_queue.empty():
                        message = app.stream_manager.output_queue.get_nowait()
                        if "event" in message and "textOutput" in message["event"]:
                            content = message["event"]["textOutput"]["content"]
                            role = message["event"]["textOutput"]["role"]
                            if role == "USER":
                                return f"You (live): {content}"
                except Exception as e:
                    print(f"Error updating live transcription: {e}")
            return gr.update()

        def update_conversation():
            """Return the session's conversation text, or leave the box unchanged."""
            if app.session_manager and app.is_running:
                history = app.session_manager.get_conversation_context()
                # Replace the format to make it more readable
                return history.replace("User: ", "You: ")
            return gr.update()

        # Periodic refresh. Newer Gradio provides gr.Timer; older releases
        # accept ``every=`` on the Blocks load event instead.
        if hasattr(gr, "Timer"):
            gr.Timer(0.5).tick(update_live_transcription, outputs=live_transcription)
            gr.Timer(1).tick(update_conversation, outputs=conversation_display)
        else:
            ui.load(update_live_transcription, inputs=None, outputs=live_transcription, every=0.5)
            ui.load(update_conversation, inputs=None, outputs=conversation_display, every=1)
            needs_queue = True

    if needs_queue:
        ui.queue()
    return ui
def _parse_cli_args():
    """Parse the command line options understood by the script."""
    cli = argparse.ArgumentParser(description="Nova Conversation Partner")
    cli.add_argument("--session", help="Resume an existing session by ID")
    cli.add_argument("--cli", action="store_true", help="Run in CLI mode (no UI)")
    cli.add_argument("--debug", action="store_true", help="Enable debug output")
    return cli.parse_args()


def main():
    """Script entry point: run the conversation in the terminal or via Gradio."""
    options = _parse_cli_args()

    # Propagate the debug switch to the nova_sonic_tool_use module
    import nova_sonic_tool_use
    nova_sonic_tool_use.DEBUG = options.debug

    conversation_app = NovaConversationApp(session_id=options.session)

    if options.cli:
        # Terminal mode: blocks until the user ends the conversation
        conversation_app.conversation_loop()
        return

    # Browser mode: build the Gradio interface and serve it
    create_ui(conversation_app).launch(share=True)


if __name__ == "__main__":
    main()