"""
Professional Voice Agent - GPU Optimized
High-quality voice assistant with speech recognition and synthesis
Designed for best user experience on GPU hardware
"""
import gradio as gr
import torch
import numpy as np
from transformers import (
    pipeline,
    AutoModelForCausalLM,
    AutoTokenizer,
    WhisperProcessor,
    WhisperForConditionalGeneration,
    SpeechT5Processor,
    SpeechT5ForTextToSpeech,
    SpeechT5HifiGan
)
from datasets import load_dataset
import soundfile as sf
import io
import time
import logging
from typing import Tuple, Optional
import warnings
warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ProfessionalVoiceAgent:
"""High-quality voice agent optimized for GPU"""
def __init__(self, use_large_models=True):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.use_large_models = use_large_models and torch.cuda.is_available()
logger.info(f"Initializing on {self.device}")
logger.info(f"GPU Available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
logger.info(f"GPU Name: {torch.cuda.get_device_name(0)}")
logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
# Model components
self.whisper_model = None
self.whisper_processor = None
self.chat_model = None
self.chat_tokenizer = None
self.tts_model = None
self.tts_processor = None
self.vocoder = None
self.speaker_embeddings = None
# Load models
self.load_all_models()

    def load_all_models(self):
        """Load all models with GPU optimization"""
        logger.info("Loading models... This will take a moment for best quality.")
        # Load Whisper for speech recognition
        self.load_whisper()
        # Load chat model
        self.load_chat_model()
        # Load TTS
        self.load_tts()
        logger.info("All models loaded successfully!")

    def load_whisper(self):
        """Load Whisper model for speech recognition"""
        try:
            # Use the tiny model for speed - small is too slow for interactive use
            model_name = "openai/whisper-tiny"
            logger.info("Loading Whisper Tiny for fast processing...")
            self.whisper_processor = WhisperProcessor.from_pretrained(model_name)
            self.whisper_model = WhisperForConditionalGeneration.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32,
                low_cpu_mem_usage=True
            ).to(self.device)
            # Set to eval mode for inference
            self.whisper_model.eval()
            logger.info(f"βœ“ Whisper loaded on {self.device}")
        except Exception as e:
            logger.error(f"Failed to load Whisper: {e}")
            # Fallback to pipeline
            self.whisper_model = pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-tiny",
                device=0 if self.device.type == "cuda" else -1
            )
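
    # Note: the pipeline fallback above could also be created with chunked
    # long-form decoding (a sketch, not what this app does):
    #   pipeline("automatic-speech-recognition", model="openai/whisper-tiny",
    #            chunk_length_s=30, device=0 if torch.cuda.is_available() else -1)
    # which lets Whisper handle clips longer than its 30 s window; it is left
    # out here to keep per-request latency low.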

    def load_chat_model(self):
        """Load conversational AI model"""
        try:
            if self.use_large_models:
                # Use larger model for better conversations
                model_name = "microsoft/DialoGPT-medium"
                logger.info("Loading DialoGPT-medium for better conversations...")
            else:
                model_name = "microsoft/DialoGPT-small"
                logger.info("Loading DialoGPT-small...")
            self.chat_tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.chat_model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32,
                low_cpu_mem_usage=True
            ).to(self.device)
            # Add padding token (DialoGPT has none by default)
            self.chat_tokenizer.pad_token = self.chat_tokenizer.eos_token
            # Set to eval mode
            self.chat_model.eval()
            logger.info(f"βœ“ Chat model loaded on {self.device}")
        except Exception as e:
            logger.error(f"Failed to load chat model: {e}")
            # Fallback
            self.chat_model = pipeline(
                "text-generation",
                model="microsoft/DialoGPT-small",
                device=0 if self.device.type == "cuda" else -1
            )
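
    # Note: DialoGPT was trained on Reddit turns joined by the EOS token rather
    # than explicit "User:"/"Assistant:" tags, so an alternative prompting
    # scheme (sketch only) would feed
    #   prev_ids + tokenizer.encode(user_msg + tokenizer.eos_token)
    # and let generate() continue; the tagged prompt in generate_response()
    # below is a simpler approximation of that format.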

    def load_tts(self):
        """Load Text-to-Speech model"""
        try:
            logger.info("Loading SpeechT5 TTS model...")
            self.tts_processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
            self.tts_model = SpeechT5ForTextToSpeech.from_pretrained(
                "microsoft/speecht5_tts",
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32
            ).to(self.device)
            self.vocoder = SpeechT5HifiGan.from_pretrained(
                "microsoft/speecht5_hifigan",
                torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32
            ).to(self.device)
            # Set to eval mode
            self.tts_model.eval()
            self.vocoder.eval()
            # Load speaker embeddings for the voice
            try:
                logger.info("Loading speaker embeddings dataset...")
                embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
                # Use a pleasant voice (you can experiment with different indices);
                # cast to the model's dtype so fp16 inference doesn't hit a type mismatch
                self.speaker_embeddings = torch.tensor(
                    embeddings_dataset[7306]["xvector"]
                ).unsqueeze(0).to(self.device, dtype=self.tts_model.dtype)
                logger.info("βœ“ Speaker embeddings loaded from dataset")
            except Exception as e:
                logger.warning(f"Failed to load speaker embeddings from dataset: {e}")
                logger.info("Creating default speaker embeddings...")
                # Fallback: SpeechT5 expects 512-dimensional speaker embeddings.
                # A random vector yields an arbitrary (and possibly unnatural) voice.
                self.speaker_embeddings = torch.randn(
                    1, 512, device=self.device, dtype=self.tts_model.dtype
                )
                logger.info("βœ“ Using default speaker embeddings")
            logger.info("βœ“ TTS models loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load TTS: {e}")
            self.tts_model = None
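
    # Note: index 7306 appears to be the US female voice used in the Hugging
    # Face SpeechT5 examples; other rows of the same validation split select
    # different CMU ARCTIC speakers, so swapping the index changes the voice.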

    def transcribe_audio(self, audio) -> str:
        """Convert speech to text using Whisper"""
        if audio is None:
            logger.warning("No audio input received")
            return ""
        try:
            # Gradio's numpy type delivers a (sample_rate, array) tuple; the dict
            # branch is a defensive fallback for clients that send
            # {'array'/'data', 'sample_rate'} payloads instead
            if isinstance(audio, dict):
                sample_rate = audio.get("sample_rate", audio.get("sampling_rate", 16000))
                audio_data = audio.get("array", audio.get("data", None))
                if audio_data is None:
                    logger.error("Audio dict missing 'array' or 'data' key")
                    return "Could not process audio format."
                audio_data = np.asarray(audio_data)
                logger.info(f"Audio format: dict, sample_rate={sample_rate}, data shape={audio_data.shape}")
            elif isinstance(audio, tuple):
                sample_rate, audio_data = audio
                audio_data = np.asarray(audio_data)
                logger.info(f"Audio format: tuple, sample_rate={sample_rate}, data shape={audio_data.shape}")
            else:
                audio_data = np.asarray(audio)
                sample_rate = 16000
                logger.info(f"Audio format: raw array, shape={audio_data.shape}")
            # Ensure we have audio data
            if audio_data.size == 0:
                logger.warning("Empty audio data")
                return "No audio data received."
            # Log audio stats
            duration_seconds = len(audio_data) / sample_rate
            logger.info(f"Audio duration: {duration_seconds:.2f}s, sample_rate: {sample_rate}Hz")
            # Convert to float32 if needed
            logger.info(f"Audio dtype before conversion: {audio_data.dtype}")
            if audio_data.dtype == np.int16:
                logger.info("Converting from int16 to float32")
                audio_data = audio_data.astype(np.float32) / 32768.0
            elif audio_data.dtype == np.int32:
                logger.info("Converting from int32 to float32")
                audio_data = audio_data.astype(np.float32) / 2147483648.0
            elif audio_data.dtype == np.float64:
                logger.info("Converting from float64 to float32")
                audio_data = audio_data.astype(np.float32)
            logger.info(f"Audio dtype after conversion: {audio_data.dtype}")
            # Collapse multi-channel audio to mono
            if audio_data.ndim > 1:
                audio_data = np.mean(audio_data, axis=1)
                logger.info(f"Converted to mono, new shape: {audio_data.shape}")
            # Check audio statistics before resampling
            logger.info(f"Audio stats - min: {audio_data.min():.4f}, max: {audio_data.max():.4f}, mean: {audio_data.mean():.4f}")
            # Resample to 16kHz if needed (Whisper requirement)
            if sample_rate != 16000:
                import librosa
                audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)
                logger.info(f"Resampled to 16kHz, new length: {len(audio_data)} samples ({len(audio_data)/16000:.2f}s)")
            # Check if audio is too quiet or silent
            audio_abs_mean = np.abs(audio_data).mean()
            if audio_abs_mean < 0.001:
                logger.warning(f"Audio might be too quiet! Abs mean: {audio_abs_mean}")
            # Limit audio length for speed (max 30 seconds)
            max_samples = 16000 * 30  # 30 seconds at 16kHz
            if len(audio_data) > max_samples:
                logger.warning(f"Audio trimmed from {len(audio_data)/16000:.1f}s to 30s")
                audio_data = audio_data[:max_samples]
            if self.whisper_processor and hasattr(self.whisper_model, 'generate'):
                # Use the loaded model; match its dtype (fp16 on GPU) to avoid
                # an input/weight type mismatch
                input_features = self.whisper_processor(
                    audio_data,
                    sampling_rate=16000,
                    return_tensors="pt"
                ).input_features.to(self.device, dtype=self.whisper_model.dtype)
                logger.info(f"Whisper input_features shape: {input_features.shape}, device: {input_features.device}")
                # Generate token ids - optimized for speed
                with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                    with torch.no_grad():
                        # Force English to avoid language-detection overhead
                        forced_decoder_ids = self.whisper_processor.get_decoder_prompt_ids(
                            language="en",
                            task="transcribe"
                        )
                        logger.info(f"Forced decoder IDs: {forced_decoder_ids}")
                        predicted_ids = self.whisper_model.generate(
                            input_features,
                            forced_decoder_ids=forced_decoder_ids,
                            max_new_tokens=64,  # Reduced for faster processing
                            num_beams=1,        # Greedy decoding for speed
                            do_sample=False     # Deterministic
                        )
                logger.info(f"Predicted token IDs shape: {predicted_ids.shape}, first 10 IDs: {predicted_ids[0][:10].tolist()}")
                # Decode token ids to text
                transcription = self.whisper_processor.batch_decode(
                    predicted_ids,
                    skip_special_tokens=True
                )[0]
            else:
                # Use pipeline
                transcription = self.whisper_model(audio_data)["text"]
            # Clear CUDA cache to prevent memory buildup
            if self.device.type == "cuda":
                torch.cuda.empty_cache()
            logger.info(f"Transcribed: {transcription}")
            return transcription.strip()
        except Exception as e:
            logger.error(f"Transcription error: {e}")
            return "Could not transcribe audio. Please try again."

    def generate_response(self, text: str, conversation_history: list = None, temperature: float = 0.8) -> str:
        """Generate AI response with conversation context"""
        if not text:
            return "I didn't catch that. Could you please repeat?"
        try:
            # Build conversation context
            if conversation_history:
                context = ""
                for user_msg, bot_msg in conversation_history[-3:]:  # Last 3 exchanges
                    context += f"User: {user_msg}\nAssistant: {bot_msg}\n"
                context += f"User: {text}\nAssistant:"
                logger.info(f"Input text: '{text}' | History entries: {len(conversation_history)}")
            else:
                context = f"User: {text}\nAssistant:"
                logger.info(f"Input text: '{text}' | No history")
            logger.debug(f"Full context sent to model:\n{context}")
            if self.chat_tokenizer and hasattr(self.chat_model, 'generate'):
                # Tokenize input
                inputs = self.chat_tokenizer.encode(
                    context,
                    return_tensors="pt",
                    truncation=True,
                    max_length=512
                ).to(self.device)
                # Generate response - optimized for speed
                with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                    with torch.no_grad():
                        outputs = self.chat_model.generate(
                            inputs,
                            max_new_tokens=50,  # Shorter for faster response
                            temperature=temperature,
                            top_p=0.9,
                            do_sample=temperature > 0,
                            pad_token_id=self.chat_tokenizer.eos_token_id,
                            eos_token_id=self.chat_tokenizer.eos_token_id,
                            num_beams=1  # Greedy for speed
                        )
                full_response = self.chat_tokenizer.decode(outputs[0], skip_special_tokens=True)
                logger.debug(f"Raw model output: '{full_response}'")
                # Decode only the newly generated tokens; string-replacing the
                # prompt is brittle because decode() may not reproduce it exactly
                response = self.chat_tokenizer.decode(
                    outputs[0][inputs.shape[-1]:],
                    skip_special_tokens=True
                ).strip()
                # Stop at any hallucinated next turn
                response = response.split("\nUser:")[0].strip()
                logger.info(f"Generated response: '{response}'")
            else:
                # Use pipeline
                result = self.chat_model(
                    text,
                    max_new_tokens=100,
                    temperature=temperature,
                    do_sample=True
                )
                response = result[0]['generated_text'].replace(text, "").strip()
            # Clear CUDA cache
            if self.device.type == "cuda":
                torch.cuda.empty_cache()
            return response if response else "I understand. Tell me more!"
        except Exception as e:
            logger.error(f"Generation error: {e}")
            return "I had a moment of confusion. Could you rephrase that?"

    def synthesize_speech(self, text: str, speed: float = 1.0) -> Optional[Tuple[int, np.ndarray]]:
        """Convert text to speech"""
        if not text or not self.tts_model or self.speaker_embeddings is None:
            if not self.tts_model:
                logger.warning("TTS model not loaded")
            if self.speaker_embeddings is None:
                logger.warning("Speaker embeddings not available")
            return None
        try:
            logger.info(f"Synthesizing speech for text: '{text}'")
            # Truncate if too long and warn
            max_chars = 600
            if len(text) > max_chars:
                logger.warning(f"Text truncated from {len(text)} to {max_chars} characters for TTS")
                text = text[:max_chars] + "..."
            # Prepare text input
            inputs = self.tts_processor(
                text=text,
                return_tensors="pt",
                truncation=True,
                max_length=600  # SpeechT5 input limit
            )
            input_ids = inputs["input_ids"].to(self.device)
            # Generate speech
            with torch.cuda.amp.autocast(enabled=self.device.type == "cuda"):
                with torch.no_grad():
                    speech = self.tts_model.generate_speech(
                        input_ids,
                        self.speaker_embeddings,
                        vocoder=self.vocoder
                    )
            # Convert to float32 numpy (fp16 output breaks librosa and some players)
            speech_np = speech.detach().cpu().float().numpy()
            # Apply speed adjustment if needed
            if speed != 1.0:
                import librosa
                speech_np = librosa.effects.time_stretch(speech_np, rate=speed)
            # Clear CUDA cache
            if self.device.type == "cuda":
                torch.cuda.empty_cache()
            # SpeechT5's HiFi-GAN vocoder outputs 16 kHz audio
            return (16000, speech_np)
        except Exception as e:
            logger.error(f"TTS error: {e}")
            return None
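
    # Note: librosa.effects.time_stretch changes duration without changing
    # pitch; rate > 1.0 shortens the clip (faster speech) and rate < 1.0
    # lengthens it, which is why the voice-speed slider value is passed
    # straight through as `rate`.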

    def process_voice_to_voice(self, audio, conversation_history=None, temperature=0.8, speed=1.0) -> Tuple[str, str, Optional[Tuple[int, np.ndarray]]]:
        """Complete voice-to-voice pipeline"""
        start_time = time.time()
        # Step 1: Transcribe
        logger.info("Processing voice input...")
        user_text = self.transcribe_audio(audio)
        if any(err in user_text for err in ("Could not transcribe", "Could not process", "No audio data")):
            return user_text, "Please try speaking again.", None
        # Step 2: Generate response
        logger.info("Generating response...")
        response_text = self.generate_response(user_text, conversation_history, temperature)
        # Step 3: Synthesize speech
        logger.info("Generating voice output...")
        response_audio = self.synthesize_speech(response_text, speed)
        total_time = time.time() - start_time
        logger.info(f"Total processing time: {total_time:.2f}s")
        return user_text, response_text, response_audio
# Global instance
agent = ProfessionalVoiceAgent(use_large_models=True)
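
# Minimal programmatic usage sketch (assumes a 16 kHz mono float32 recording;
# the names below are illustrative, not part of the app):
#   wav = np.zeros(16000, dtype=np.float32)          # placeholder: 1 s of silence
#   heard, reply, audio = agent.process_voice_to_voice((16000, wav))
#   if audio is not None:
#       sr, samples = audio                          # 16 kHz reply waveform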
def create_professional_interface():
"""Create professional voice interface"""
custom_css = """
.container {max-width: 900px; margin: auto; padding: 20px;}
.main-button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border: none;
padding: 20px 40px;
border-radius: 50px;
font-size: 18px;
font-weight: bold;
cursor: pointer;
color: white;
transition: all 0.3s;
}
.main-button:hover {transform: scale(1.05);}
.status-box {
padding: 10px;
border-radius: 10px;
margin: 10px 0;
text-align: center;
}
"""
    with gr.Blocks(title="Professional Voice Agent", css=custom_css) as interface:
        # Store per-session conversation history
        conversation_history = gr.State([])
        gr.HTML("""
        <div class="container">
            <h1 style="text-align: center;">πŸŽ™οΈ Professional Voice Assistant</h1>
            <p style="text-align: center;">GPU-powered voice agent with high-quality speech recognition and synthesis</p>
        </div>
        """)
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 🎀 Voice Input")
                audio_input = gr.Audio(
                    sources=["microphone", "upload"],
                    type="numpy",
                    label="Click microphone to record",
                    elem_classes=["audio-input"]
                )
                with gr.Row():
                    clear_audio = gr.Button("πŸ—‘οΈ Clear", size="sm")
                    process_btn = gr.Button("πŸš€ Process Voice", variant="primary", size="lg", elem_classes=["main-button"])
                gr.Markdown("""
                **Tips for best results:**
                - Speak clearly and naturally
                - Avoid background noise
                - Keep messages concise
                - Wait for processing to complete
                """)
            with gr.Column(scale=1):
                gr.Markdown("### πŸ’¬ Conversation")
                user_text = gr.Textbox(
                    label="You said:",
                    lines=2,
                    interactive=False
                )
                response_text = gr.Textbox(
                    label="Assistant response:",
                    lines=3,
                    interactive=False
                )
                response_audio = gr.Audio(
                    label="πŸ”Š Voice Response",
                    type="numpy",
                    autoplay=True,
                    elem_classes=["audio-output"]
                )
                status = gr.Textbox(
                    label="Status",
                    value="Ready",
                    interactive=False,
                    elem_classes=["status-box"]
                )
        # Conversation history display
        with gr.Row():
            gr.Markdown("### πŸ“ Conversation History")
        chat_history = gr.Chatbot(
            height=300,
            bubble_full_width=False,
            # avatar_images expects image file paths or URLs, not emoji strings
            avatar_images=None
        )
        # Advanced settings
        with gr.Accordion("βš™οΈ Advanced Settings", open=False):
            with gr.Row():
                temperature = gr.Slider(0.1, 1.0, 0.8, label="Response Creativity (Temperature)")
                voice_speed = gr.Slider(0.5, 2.0, 1.0, label="Voice Speed")
            clear_history = gr.Button("Clear History")
        # Processing pipeline
        def process_audio_pipeline(audio, history, temp, speed):
            if audio is None:
                return (
                    "",
                    "Please record or upload audio first.",
                    None,
                    "No audio detected",
                    history if history else [],
                    history if history else []
                )
            # Initialize history if None
            if history is None:
                history = []
            # Process voice-to-voice
            user_text_result, bot_response, audio_response = agent.process_voice_to_voice(
                audio,
                history,
                temperature=temp,
                speed=speed
            )
            # Update history
            history.append((user_text_result, bot_response))
            # Copy for the chatbot display
            chat_display = [(u, b) for u, b in history]
            return (
                user_text_result,
                bot_response,
                audio_response,
                "βœ… Complete",
                history,
                chat_display
            )
        process_btn.click(
            fn=process_audio_pipeline,
            inputs=[audio_input, conversation_history, temperature, voice_speed],
            outputs=[
                user_text,
                response_text,
                response_audio,
                status,
                conversation_history,
                chat_history
            ]
        )
        clear_audio.click(
            lambda: None,
            outputs=[audio_input]
        )
        clear_history.click(
            lambda: ([], []),
            outputs=[conversation_history, chat_history]
        )
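
        # Note: gr.State is per-session, so each browser tab keeps its own
        # conversation_history; returning the mutated list from the handler is
        # what persists it between clicks.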
        # Examples
        gr.Markdown("### πŸ’‘ Example Phrases")
        gr.Examples(
            examples=[
                ["Hello, introduce yourself"],
                ["What's the weather like today?"],
                ["Tell me an interesting fact"],
                ["How can you help me?"],
                ["What are your capabilities?"]
            ],
            inputs=[user_text],
            examples_per_page=5
        )
        # System info
        with gr.Accordion("πŸ“Š System Information", open=False):
            system_info = f"""
            - **Device**: {agent.device}
            - **GPU Available**: {torch.cuda.is_available()}
            """
            if torch.cuda.is_available():
                system_info += f"""
            - **GPU Model**: {torch.cuda.get_device_name(0)}
            - **GPU Memory**: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB
            - **Models**: Large variants loaded for best quality
            """
            else:
                system_info += "\n- **Note**: Running on CPU (slower performance)"
            gr.Markdown(system_info)

    return interface
# Create the interface
demo = create_professional_interface()
if __name__ == "__main__":
print("="*50)
print("Professional Voice Agent - GPU Optimized")
print("="*50)
print(f"Device: {agent.device}")
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
print("="*50)
print("Starting server...")
demo.queue(max_size=5, default_concurrency_limit=1) # Manage GPU memory
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
max_threads=2 # Limit for GPU memory
)