import gradio as gr
import torch
import numpy as np
import librosa
import soundfile as sf
import threading
import time
import queue
import warnings
from typing import Optional, List, Dict, Tuple
from dataclasses import dataclass
from collections import deque
import psutil
import gc

# Import models
from dia.model import Dia
from transformers import pipeline
import webrtcvad

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
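
# A single user/AI exchange; the @dataclass decorator generates __init__ and
# __repr__ for the annotated fields below.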
@dataclass
class ConversationTurn:
    user_audio: np.ndarray
    user_text: str
    ai_response_text: str
    ai_response_audio: np.ndarray
    timestamp: float
    emotion: str
    speaker_id: str
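
# Wraps a Hugging Face audio-classification pipeline for speech emotion
# recognition; falls back to "neutral" on any failure.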
class EmotionRecognizer:
    def __init__(self):
        self.emotion_pipeline = pipeline(
            "audio-classification",
            model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
            device=0 if torch.cuda.is_available() else -1
        )

    def detect_emotion(self, audio: np.ndarray, sample_rate: int = 16000) -> str:
        try:
            result = self.emotion_pipeline({"array": audio, "sampling_rate": sample_rate})
            return result[0]["label"] if result else "neutral"
        except Exception as e:
            print(f"Emotion detection error: {e}")
            return "neutral"
class VADProcessor:
    def __init__(self, aggressiveness: int = 2):
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = 16000
        self.frame_duration = 30  # ms
        self.frame_size = int(self.sample_rate * self.frame_duration / 1000)

    def is_speech(self, audio: np.ndarray) -> bool:
        try:
            # Convert float audio in [-1, 1] to 16-bit PCM
            audio_int16 = (audio * 32767).astype(np.int16)
            # Process in 30 ms frames
            frames = []
            for i in range(0, len(audio_int16) - self.frame_size, self.frame_size):
                frame = audio_int16[i:i + self.frame_size].tobytes()
                frames.append(self.vad.is_speech(frame, self.sample_rate))
            # Return True if more than 30% of frames contain speech
            return sum(frames) > len(frames) * 0.3
        except Exception:
            return True  # Default to treating the input as speech
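
# Thread-safe per-session conversation history; deque(maxlen=...) silently
# drops the oldest turns once the limit is reached.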
class ConversationManager:
    def __init__(self, max_exchanges: int = 50):
        self.conversations: Dict[str, deque] = {}
        self.max_exchanges = max_exchanges
        self.lock = threading.RLock()

    def add_turn(self, session_id: str, turn: ConversationTurn):
        with self.lock:
            if session_id not in self.conversations:
                self.conversations[session_id] = deque(maxlen=self.max_exchanges)
            self.conversations[session_id].append(turn)

    def get_context(self, session_id: str, last_n: int = 5) -> List[ConversationTurn]:
        with self.lock:
            if session_id not in self.conversations:
                return []
            return list(self.conversations[session_id])[-last_n:]

    def clear_session(self, session_id: str):
        with self.lock:
            if session_id in self.conversations:
                del self.conversations[session_id]
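
# Top-level orchestrator: owns the ASR, TTS, emotion, and VAD components and
# runs the full speech-to-speech pipeline per request.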
class SupernaturalAI:
    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.models_loaded = False
        self.processing_queue = queue.Queue()
        self.conversation_manager = ConversationManager()
        self.emotion_recognizer = None
        self.vad_processor = VADProcessor()
        # Models
        self.ultravox_model = None
        self.dia_model = None
        # Performance tracking
        self.active_sessions = set()
        self.processing_times = deque(maxlen=100)
        print("Initializing Supernatural AI...")
        self._initialize_models()

    def _initialize_models(self):
        try:
            print("Loading Ultravox model...")
            self.ultravox_model = pipeline(
                'automatic-speech-recognition',
                model='fixie-ai/ultravox-v0_2',
                trust_remote_code=True,
                device=0 if torch.cuda.is_available() else -1,
                # float16 is only safe on GPU; fall back to float32 on CPU
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
            )
            print("Loading Dia TTS model...")
            self.dia_model = Dia.from_pretrained(
                "nari-labs/Dia-1.6B",
                compute_dtype="float16"
            )
            print("Loading emotion recognition...")
            self.emotion_recognizer = EmotionRecognizer()
            self.models_loaded = True
            print("✅ All models loaded successfully!")
            # Memory cleanup
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except Exception as e:
            print(f"❌ Error loading models: {e}")
            self.models_loaded = False

    def _get_memory_usage(self) -> Dict[str, object]:
        """Get current memory usage statistics"""
        memory = psutil.virtual_memory()
        gpu_memory = {}
        if torch.cuda.is_available():
            for i in range(torch.cuda.device_count()):
                gpu_memory[f"GPU_{i}"] = {
                    "allocated": torch.cuda.memory_allocated(i) / 1024**3,  # GiB
                    "cached": torch.cuda.memory_reserved(i) / 1024**3  # GiB
                }
        return {
            "RAM": memory.percent,
            "GPU": gpu_memory
        }

    def _generate_contextual_prompt(self,
                                    user_text: str,
                                    emotion: str,
                                    context: List[ConversationTurn]) -> str:
        """Generate contextual prompt with emotion and conversation history"""
        # Build context from previous turns
        context_text = ""
        if context:
            for turn in context[-3:]:  # Last 3 exchanges
                context_text += f"[S1] {turn.user_text} [S2] {turn.ai_response_text} "
        # Emotion-aware response generation
        emotion_modifiers = {
            "happy": "(cheerful)",
            "sad": "(sympathetic)",
            "angry": "(calming)",
            "fear": "(reassuring)",
            "surprise": "(excited)",
            "neutral": ""
        }
        modifier = emotion_modifiers.get(emotion.lower(), "")
        # Create supernatural AI personality
        prompt = f"{context_text}[S1] {user_text} [S2] {modifier} As a supernatural AI with deep emotional understanding, I sense your {emotion} energy. "
        return prompt
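
    # Pipeline: normalize -> resample to 16 kHz -> VAD -> ASR -> emotion
    # detection -> prompt building -> Dia TTS.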
    def process_audio_input(self,
                            audio_data: Tuple[int, np.ndarray],
                            session_id: str) -> Tuple[Optional[Tuple[int, np.ndarray]], str, str]:
        """Main processing pipeline for audio input"""
        if not self.models_loaded:
            return None, "❌ Models not loaded", "Please wait for initialization"
        if audio_data is None:
            return None, "❌ No audio received", "Please record some audio"
        start_time = time.time()
        try:
            sample_rate, audio = audio_data
            # Ensure audio is mono
            if len(audio.shape) > 1:
                audio = np.mean(audio, axis=1)
            # Normalize audio
            audio = audio.astype(np.float32)
            if np.max(np.abs(audio)) > 0:
                audio = audio / np.max(np.abs(audio)) * 0.95
            # Resample first: the VAD and both models expect 16 kHz
            if sample_rate != 16000:
                audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=16000)
                sample_rate = 16000
            # Voice Activity Detection (after resampling, since webrtcvad
            # is configured for 16 kHz input)
            if not self.vad_processor.is_speech(audio):
                return None, "🔇 No speech detected", "Please speak clearly"
            # Speech Recognition with Ultravox
            try:
                speech_result = self.ultravox_model({
                    'array': audio,
                    'sampling_rate': sample_rate
                })
                user_text = speech_result.get('text', '').strip()
                if not user_text:
                    return None, "❌ Could not understand speech", "Please speak more clearly"
            except Exception as e:
                print(f"ASR Error: {e}")
                return None, f"❌ Speech recognition failed: {str(e)}", "Please try again"
            # Emotion Recognition
            emotion = self.emotion_recognizer.detect_emotion(audio, sample_rate)
            # Get conversation context
            context = self.conversation_manager.get_context(session_id)
            # Generate contextual response
            prompt = self._generate_contextual_prompt(user_text, emotion, context)
            # Generate speech with Dia TTS
            try:
                with torch.no_grad():
                    audio_output = self.dia_model.generate(
                        prompt,
                        use_torch_compile=False,  # Better stability
                        verbose=False
                    )
                # Ensure audio output is a NumPy array
                if isinstance(audio_output, torch.Tensor):
                    audio_output = audio_output.cpu().numpy()
                # Normalize output
                if len(audio_output) > 0:
                    max_val = np.max(np.abs(audio_output))
                    if max_val > 1.0:
                        audio_output = audio_output / max_val * 0.95
            except Exception as e:
                print(f"TTS Error: {e}")
                return None, f"❌ Speech generation failed: {str(e)}", "Please try again"
            # Extract AI response text (remove speaker tags and emotion modifiers)
            ai_response = prompt.split('[S2]')[-1].strip()
            for tag in ("(cheerful)", "(sympathetic)", "(calming)", "(reassuring)", "(excited)"):
                ai_response = ai_response.replace(tag, "")
            ai_response = ai_response.strip()
            # Store conversation turn
            turn = ConversationTurn(
                user_audio=audio,
                user_text=user_text,
                ai_response_text=ai_response,
                ai_response_audio=audio_output,
                timestamp=time.time(),
                emotion=emotion,
                speaker_id=session_id
            )
            self.conversation_manager.add_turn(session_id, turn)
            # Track performance
            processing_time = time.time() - start_time
            self.processing_times.append(processing_time)
            # Memory cleanup
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
            status = f"✅ Processed in {processing_time:.2f}s | Emotion: {emotion} | Users: {len(self.active_sessions)}"
            # Dia generates audio at 44.1 kHz
            return (44100, audio_output), status, f"**You said:** {user_text}\n\n**AI Response:** {ai_response}"
        except Exception as e:
            print(f"Processing error: {e}")
            return None, f"❌ Processing failed: {str(e)}", "Please try again"

    def get_conversation_history(self, session_id: str) -> str:
        """Get formatted conversation history"""
        context = self.conversation_manager.get_context(session_id, last_n=10)
        if not context:
            return "No conversation history yet."
        history = "## Conversation History\n\n"
        for i, turn in enumerate(context, 1):
            history += f"**Turn {i}:**\n"
            history += f"- **You:** {turn.user_text}\n"
            history += f"- **AI:** {turn.ai_response_text}\n"
            history += f"- **Emotion Detected:** {turn.emotion}\n\n"
        return history

    def clear_conversation(self, session_id: str) -> str:
        """Clear conversation history for session"""
        self.conversation_manager.clear_session(session_id)
        return "Conversation history cleared."

    def get_system_status(self) -> str:
        """Get system status information"""
        memory = self._get_memory_usage()
        avg_processing = np.mean(self.processing_times) if self.processing_times else 0
        status = f"""## System Status

**Performance:**
- Average Processing Time: {avg_processing:.2f}s
- Active Sessions: {len(self.active_sessions)}
- Total Conversations: {len(self.conversation_manager.conversations)}

**Memory Usage:**
- RAM: {memory['RAM']:.1f}%
- GPU Memory: {memory.get('GPU', {})}

**Models Status:**
- Models Loaded: {"✅" if self.models_loaded else "❌"}
- Device: {self.device}
"""
        return status

# Initialize the AI system
print("Starting Supernatural AI system...")
ai_system = SupernaturalAI()

# Gradio Interface
def process_audio_interface(audio, session_id):
    """Interface function for Gradio"""
    if not session_id:
        session_id = f"user_{int(time.time())}"
    ai_system.active_sessions.add(session_id)
    result = ai_system.process_audio_input(audio, session_id)
    return result + (session_id,)

def get_history_interface(session_id):
    """Get conversation history interface"""
    if not session_id:
        return "No session ID provided"
    return ai_system.get_conversation_history(session_id)

def clear_history_interface(session_id):
    """Clear history interface"""
    if not session_id:
        return "No session ID provided"
    return ai_system.clear_conversation(session_id)

# Create Gradio interface
with gr.Blocks(title="Supernatural Conversational AI", theme=gr.themes.Soft()) as demo:
    gr.HTML("""
    <div style="text-align: center; padding: 20px;">
        <h1>🧙‍♂️ Supernatural Conversational AI</h1>
        <p style="font-size: 18px; color: #666;">
            Advanced Speech-to-Speech AI with Emotional Intelligence
        </p>
        <p style="color: #888;">
            Powered by Ultravox + Dia TTS | Optimized for 4x L4 GPUs
        </p>
    </div>
    """)
    with gr.Row():
        with gr.Column(scale=2):
            # Audio input/output
            audio_input = gr.Audio(
                label="🎤 Speak to the AI",
                sources=["microphone"],
                type="numpy",
                streaming=False
            )
            audio_output = gr.Audio(
                label="🔊 AI Response",
                type="numpy",
                autoplay=True
            )
            # Session management
            session_id = gr.Textbox(
                label="Session ID",
                placeholder="Auto-generated if empty",
                value="",
                interactive=True
            )
            # Process button
            process_btn = gr.Button("🎯 Process Audio", variant="primary", size="lg")
        with gr.Column(scale=1):
            # Status and conversation
            status_display = gr.Textbox(
                label="📊 Status",
                interactive=False,
                lines=3
            )
            conversation_display = gr.Markdown(
                label="💬 Conversation",
                value="Start speaking to begin..."
            )
            # History management
            with gr.Row():
                history_btn = gr.Button("📜 Show History", size="sm")
                clear_btn = gr.Button("🗑️ Clear History", size="sm")
                status_btn = gr.Button("⚡ System Status", size="sm")
            # History and status display
            history_display = gr.Markdown(
                label="📜 Conversation History",
                value="No history yet."
            )

    # Event handlers
    process_btn.click(
        fn=process_audio_interface,
        inputs=[audio_input, session_id],
        outputs=[audio_output, status_display, conversation_display, session_id]
    )
    history_btn.click(
        fn=get_history_interface,
        inputs=[session_id],
        outputs=[history_display]
    )
    clear_btn.click(
        fn=clear_history_interface,
        inputs=[session_id],
        outputs=[history_display]
    )
    status_btn.click(
        fn=lambda: ai_system.get_system_status(),
        outputs=[history_display]
    )
    # Auto-process on audio input
    audio_input.change(
        fn=process_audio_interface,
        inputs=[audio_input, session_id],
        outputs=[audio_output, status_display, conversation_display, session_id]
    )

    # Usage instructions
    gr.HTML("""
    <div style="margin-top: 20px; padding: 15px; background: #f0f8ff; border-radius: 8px;">
        <h3>💡 Usage Instructions:</h3>
        <ul>
            <li><strong>Record Audio:</strong> Click the microphone and speak naturally</li>
            <li><strong>Emotional AI:</strong> The AI detects and responds to your emotions</li>
            <li><strong>Conversation Memory:</strong> Up to 50 exchanges are remembered</li>
            <li><strong>Session Management:</strong> Use Session ID to maintain separate conversations</li>
            <li><strong>Performance:</strong> Optimized for sub-500ms latency</li>
        </ul>
        <p><strong>Supported Features:</strong> Emotion recognition, voice activity detection,
        contextual responses, conversation history, concurrent users (15-20), memory management</p>
    </div>
    """)

# Configure for optimal performance.
# Note: `concurrency_count` was removed in Gradio 4; `default_concurrency_limit`
# is its replacement and matches the Gradio 4 API (e.g. `sources=`) used above.
demo.queue(
    default_concurrency_limit=20,  # Support 20 concurrent users
    max_size=100,
    api_open=False
)

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        quiet=False,
        # `enable_queue` was removed in Gradio 4; queuing is configured via
        # demo.queue() above.
        max_threads=40
    )
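
# To run locally (assuming the Space's dependencies, e.g. from a
# requirements.txt, are installed), start the app and open
# http://localhost:7860 in a browser:
#   python app.py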