Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| import sys | |
| import io | |
| import os | |
| import locale | |
# Comprehensive UTF-8 encoding setup for Windows.
# Everything here is best-effort: a failure must never stop the app from
# starting, so the outer handler deliberately ignores ordinary errors.
if sys.platform.startswith('win'):
    try:
        # Try the UTF-8 locales in order of preference and keep the first
        # one the platform accepts. Only locale.Error is expected here; a
        # bare except would also swallow KeyboardInterrupt/SystemExit.
        for _locale_name in ('en_US.UTF-8', 'C.UTF-8'):
            try:
                locale.setlocale(locale.LC_ALL, _locale_name)
                break
            except locale.Error:
                continue

        # Set console to UTF-8 mode
        os.system('chcp 65001 > nul 2>&1')

        # Set environment variables for UTF-8.
        # NOTE(review): the interpreter reads these at start-up, so they are
        # expected to matter only for child Python processes - confirm.
        os.environ['PYTHONIOENCODING'] = 'utf-8:replace'
        os.environ['PYTHONUTF8'] = '1'

        # Force UTF-8 encoding for stdout/stderr with error handling
        try:
            if hasattr(sys.stdout, 'reconfigure'):
                sys.stdout.reconfigure(encoding='utf-8', errors='replace')
                sys.stderr.reconfigure(encoding='utf-8', errors='replace')
            else:
                # Fallback for streams without reconfigure()
                sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
                sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
        except Exception:
            # Final fallback: re-wrap the detached underlying streams
            sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', errors='replace')
            sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', errors='replace')
    except Exception:
        # Silently continue if encoding setup fails
        pass
# Keep a handle on the real print before it is replaced below
import builtins

_original_print = builtins.print


def safe_print(*args, **kwargs):
    """Print like the built-in ``print`` while tolerating unencodable text.

    Positional arguments are converted to strings up front. Strings that
    cannot be encoded as UTF-8 have the offending characters replaced. If the
    underlying print still raises a Unicode error, the arguments are printed
    again reduced to ASCII (with replacement). Any other exception produces a
    fixed placeholder line instead.
    """

    def _utf8_clean(value):
        # Non-strings are simply stringified, exactly as print would do.
        if not isinstance(value, str):
            return str(value)
        try:
            value.encode('utf-8')
        except UnicodeEncodeError:
            return value.encode('utf-8', errors='replace').decode('utf-8')
        return value

    def _ascii_only(value):
        text = value if isinstance(value, str) else str(value)
        return text.encode('ascii', errors='replace').decode('ascii')

    try:
        _original_print(*[_utf8_clean(item) for item in args], **kwargs)
    except (UnicodeEncodeError, UnicodeDecodeError):
        # Last resort: convert to ASCII with replacement
        _original_print(*[_ascii_only(item) for item in args], **kwargs)
    except Exception:
        # Ultimate fallback
        _original_print("[Encoding Error in Print]")


# Route every print() call in the process through the safe wrapper
builtins.print = safe_print
| """ | |
| Live Translation AI Agent - Two Person Mode | |
| Real-time Cross-Translation between Person A & Person B | |
| """ | |
| import gradio as gr | |
| import numpy as np | |
| import librosa | |
| import soundfile as sf | |
| import tempfile | |
| import os | |
| import time | |
| import logging | |
| import json | |
| from typing import Optional, Tuple, Dict, List | |
| import asyncio | |
| import threading | |
| from pathlib import Path | |
| # Load environment variables from .env file | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| print("Environment variables loaded from .env file") | |
| except ImportError: | |
| print("python-dotenv not available, using system environment variables") | |
| # Google Gemini integration | |
| try: | |
| import google.generativeai as genai | |
| GEMINI_AVAILABLE = True | |
| print("Google Gemini library loaded successfully") | |
| except ImportError: | |
| GEMINI_AVAILABLE = False | |
| print("Google Gemini library not available") | |
| # Google Speech Recognition integration | |
| try: | |
| import speech_recognition as sr | |
| SPEECH_RECOGNITION_AVAILABLE = True | |
| print("SpeechRecognition library loaded successfully") | |
| except ImportError: | |
| SPEECH_RECOGNITION_AVAILABLE = False | |
| print("SpeechRecognition library not available") | |
| # Edge TTS for speech synthesis | |
| try: | |
| import edge_tts | |
| EDGE_TTS_AVAILABLE = True | |
| print("Edge TTS loaded successfully") | |
| except ImportError: | |
| EDGE_TTS_AVAILABLE = False | |
| print("Edge TTS not available") | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class TranslationAIAgent:
    """Speech translation agent: transcribe, translate, then speak.

    Transcription, language detection and translation go through Google
    Gemini; speech synthesis uses Edge TTS. The third-party libraries are
    optional at import time, so each entry point checks the corresponding
    module-level availability flag before using them.
    """

    # Language names accepted from Gemini's language detection, mapped to the
    # short code passed to translate_text() as the source language.
    _LANGUAGE_CODES = {
        'English': 'en', 'Vietnamese': 'vi', 'Spanish': 'es', 'French': 'fr',
        'German': 'de', 'Italian': 'it', 'Portuguese': 'pt', 'Russian': 'ru',
        'Japanese': 'ja', 'Korean': 'ko', 'Chinese': 'zh', 'Arabic': 'ar',
        'Hindi': 'hi', 'Thai': 'th', 'Turkish': 'tr',
    }

    # Input locale codes mapped to the language name inserted into the
    # transcription prompt.
    _LOCALE_LANGUAGE_NAMES = {
        'en-US': 'English', 'vi-VN': 'Vietnamese', 'es-ES': 'Spanish',
        'fr-FR': 'French', 'de-DE': 'German', 'ja-JP': 'Japanese',
        'zh-CN': 'Chinese', 'ko-KR': 'Korean', 'it-IT': 'Italian',
        'pt-PT': 'Portuguese', 'ru-RU': 'Russian', 'ar-SA': 'Arabic',
        'hi-IN': 'Hindi', 'th-TH': 'Thai', 'tr-TR': 'Turkish',
    }

    # Language value speech_to_text() returns together with a failure message.
    _UNKNOWN_LANGUAGE = "unknown"

    # Prefixes marking a translate_text() result as a notice, not a translation.
    _TRANSLATION_NOTICE_PREFIXES = ("[Translation Error]", "[API Quota Exceeded]")

    def __init__(self):
        """Build the language/voice tables and configure the Gemini and speech clients."""
        # Per language: display name plus one entry per locale with its
        # primary and alternate Edge TTS voice.
        self.language_voice_options = {
            'en': {
                'name': 'English',
                'options': [
                    {'code': 'en-us', 'display': 'English (United States)', 'voice': 'en-US-JennyNeural', 'alt_voice': 'en-US-GuyNeural'},
                    {'code': 'en-gb', 'display': 'English (United Kingdom)', 'voice': 'en-GB-LibbyNeural', 'alt_voice': 'en-GB-RyanNeural'},
                ]
            },
            'es': {
                'name': 'Spanish',
                'options': [
                    {'code': 'es-es', 'display': 'Spanish (Spain)', 'voice': 'es-ES-ElviraNeural', 'alt_voice': 'es-ES-AlvaroNeural'},
                    {'code': 'es-mx', 'display': 'Spanish (Mexico)', 'voice': 'es-MX-DaliaNeural', 'alt_voice': 'es-MX-JorgeNeural'},
                ]
            },
            'fr': {
                'name': 'French',
                'options': [
                    {'code': 'fr-fr', 'display': 'French (France)', 'voice': 'fr-FR-DeniseNeural', 'alt_voice': 'fr-FR-HenriNeural'},
                    {'code': 'fr-ca', 'display': 'French (Canada)', 'voice': 'fr-CA-SylvieNeural', 'alt_voice': 'fr-CA-AntoineNeural'},
                ]
            },
            'de': {
                'name': 'German',
                'options': [
                    {'code': 'de-de', 'display': 'German (Germany)', 'voice': 'de-DE-KatjaNeural', 'alt_voice': 'de-DE-ConradNeural'},
                ]
            },
            'vi': {
                'name': 'Vietnamese',
                'options': [
                    {'code': 'vi-vn', 'display': 'Vietnamese (Vietnam)', 'voice': 'vi-VN-HoaiMyNeural', 'alt_voice': 'vi-VN-NamMinhNeural'}
                ]
            },
            'ja': {
                'name': 'Japanese',
                'options': [
                    {'code': 'ja-jp', 'display': 'Japanese (Japan)', 'voice': 'ja-JP-NanamiNeural', 'alt_voice': 'ja-JP-KeitaNeural'}
                ]
            },
            'zh': {
                'name': 'Chinese',
                'options': [
                    {'code': 'zh-cn', 'display': 'Chinese (Simplified)', 'voice': 'zh-CN-XiaoxiaoNeural', 'alt_voice': 'zh-CN-YunxiNeural'},
                ]
            }
        }
        # Create simple supported languages mapping for backward compatibility
        self.supported_languages = {
            lang_code: lang_info['name']
            for lang_code, lang_info in self.language_voice_options.items()
        }
        # Create default voice mapping for backward compatibility
        self.voice_map = {
            lang_code: lang_info['options'][0]['voice']
            for lang_code, lang_info in self.language_voice_options.items()
        }
        self.setup_gemini_client()
        self.setup_speech_recognizer()

    def setup_gemini_client(self):
        """Configure the Gemini model from GOOGLE_API_KEY / GEMINI_API_KEY.

        Sets ``self.gemini_model`` and ``self.gemini_configured``. Never
        raises: any failure is logged and leaves the agent unconfigured.
        """
        self.gemini_model = None
        self.gemini_configured = False
        try:
            if not GEMINI_AVAILABLE:
                logger.error("Google Gemini library not available - please install: pip install google-generativeai")
                return
            # Get Google API key from environment
            api_key = (os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY") or "").strip()
            # Keys starting with "your-" are treated as unfilled placeholders.
            if api_key and not api_key.startswith("your-"):
                genai.configure(api_key=api_key)
                self.gemini_model = genai.GenerativeModel('gemini-2.0-flash-exp')
                self.gemini_configured = True
                logger.info("[SUCCESS] Google Gemini client configured successfully - Real translation mode enabled")
            else:
                logger.error("[ERROR] Google API key not found or invalid in environment variables")
                logger.error("Please set GOOGLE_API_KEY in your .env file with a valid API key")
        except Exception as e:
            logger.error(f"Gemini setup failed: {e}")
            self.gemini_model = None
            self.gemini_configured = False

    def setup_speech_recognizer(self):
        """Create and tune the SpeechRecognition recognizer.

        Sets ``self.recognizer`` and ``self.speech_configured``. Never raises:
        any failure is logged and leaves the recognizer as ``None``.
        """
        self.recognizer = None
        self.speech_configured = False
        try:
            if not SPEECH_RECOGNITION_AVAILABLE:
                logger.error("SpeechRecognition library not available - please install: pip install SpeechRecognition")
                return
            recognizer = sr.Recognizer()
            # Fixed (non-adaptive) energy threshold for consistent behaviour
            recognizer.energy_threshold = 1000
            recognizer.dynamic_energy_threshold = False
            recognizer.pause_threshold = 0.5
            recognizer.operation_timeout = 15
            recognizer.phrase_threshold = 0.3
            recognizer.non_speaking_duration = 0.2
            self.recognizer = recognizer
            self.speech_configured = True
            logger.info("[SUCCESS] Speech recognizer configured successfully")
        except Exception as e:
            logger.error(f"Speech recognizer setup failed: {e}")
            self.recognizer = None
            self.speech_configured = False

    def _load_audio(self, audio_path, attempts=3, delay=0.5):
        """Load up to 30 s of audio resampled to 16 kHz, retrying while the file settles."""
        # Wait a bit to ensure the recording is fully written
        time.sleep(delay)
        for attempt in range(1, attempts + 1):
            try:
                return librosa.load(audio_path, sr=16000, duration=30)
            except Exception as load_error:
                if attempt == attempts:
                    raise
                logger.warning(f"Audio loading attempt {attempt} failed: {load_error}, retrying...")
                time.sleep(delay)

    @staticmethod
    def _normalize_levels(y):
        """Bring samples to a safe level; return ``(samples, notes)``.

        ``notes`` lists the corrections applied so the caller can log them.
        The result always peaks at 0.7 unless the input is pure silence,
        which is returned unchanged.
        """
        notes = []
        # Amplitude at (or next to) 1.0 means the recording clipped
        if float(np.max(np.abs(y))) >= 0.99:
            notes.append("Audio appears to be clipped - applying de-clipping")
            y = np.tanh(y * 0.8) * 0.9  # soft compression

        rms_level = float(np.sqrt(np.mean(y ** 2)))
        # rms_level == 0 is pure silence: nothing to boost, and dividing by it
        # would fill the buffer with NaN/inf.
        if 0 < rms_level < 0.01:
            notes.append("Audio level very low - boosting signal")
            y = np.clip(y * (0.1 / rms_level), -0.95, 0.95)  # prevent new clipping
        elif rms_level > 0.5:
            notes.append("Audio level very high - reducing signal")
            y = y * (0.3 / rms_level)

        # Measure the peak again: the steps above changed it, and scaling by
        # the pre-correction peak would push boosted audio far past 1.0.
        peak = float(np.max(np.abs(y)))
        if peak > 0:
            y = y * (0.7 / peak)  # normalize to 70% to avoid clipping
        return y, notes

    @staticmethod
    def _trim_silence(y, sr_rate):
        """Cut leading/trailing low-energy audio from clips longer than one second."""
        if len(y) <= sr_rate:
            return y
        frame_length = int(0.025 * sr_rate)  # 25ms frames
        hop_length = int(0.010 * sr_rate)  # 10ms hop
        rms = librosa.feature.rms(y=y, frame_length=frame_length, hop_length=hop_length)[0]
        # Frames in the bottom 30% of energy are treated as silence
        rms_threshold = np.percentile(rms, 30)
        voiced = np.where(rms > rms_threshold)[0]
        if len(voiced) == 0:
            return y
        # Keep two frames of padding on each side of the voiced span
        start_frame = max(0, voiced[0] - 2)
        end_frame = min(len(rms) - 1, voiced[-1] + 2)
        start_sample = start_frame * hop_length
        end_sample = min(len(y), end_frame * hop_length + frame_length)
        return y[start_sample:end_sample]

    def _build_transcription_prompt(self, language):
        """Return the Gemini transcription prompt for ``'auto'`` or a locale code."""
        if language == 'auto':
            headline = "Please transcribe this audio file."
            language_rules = [
                "2. Detect the language automatically",
                "3. Provide the transcription in the original language",
            ]
        else:
            # Unknown locale codes fall back to English
            expected_lang = self._LOCALE_LANGUAGE_NAMES.get(language, 'English')
            headline = f"Please transcribe this audio file in {expected_lang}."
            language_rules = [
                f"2. The audio should be in {expected_lang}",
                f"3. Provide the transcription in {expected_lang}",
            ]
        return "\n".join([
            headline,
            "Instructions:",
            "1. Listen to the audio and transcribe exactly what is spoken",
            *language_rules,
            "4. Return ONLY the transcribed text, no explanations",
            '5. If you cannot understand the audio, respond with "RECOGNITION_FAILED"',
            "Transcription:",
        ])

    def _transcribe_with_gemini(self, wav_path, language):
        """Upload ``wav_path`` to Gemini and return ``(transcription, language name)``."""
        audio_file = genai.upload_file(wav_path, mime_type="audio/wav")
        logger.info(f"Audio uploaded to Gemini: {audio_file.name}")
        try:
            response = self.gemini_model.generate_content(
                [self._build_transcription_prompt(language), audio_file]
            )
            transcription = response.text.strip()
        finally:
            # Delete the upload even when generation fails, so it is not leaked
            try:
                genai.delete_file(audio_file.name)
                logger.info("Cleaned up uploaded file from Gemini")
            except Exception as cleanup_error:
                logger.warning(f"Failed to delete uploaded Gemini file: {cleanup_error}")
        logger.info(f"Gemini transcription result: {transcription[:100]}...")

        # An empty reply is as unusable as an explicit failure marker
        if (
            not transcription
            or transcription == "RECOGNITION_FAILED"
            or "cannot understand" in transcription.lower()
        ):
            logger.warning("[WARNING] Gemini could not understand the audio")
            return (
                "Could not understand speech - please try speaking more clearly or check your microphone",
                self._UNKNOWN_LANGUAGE,
            )

        detected_language = self.detect_language_with_gemini(transcription)
        logger.info(f"[SUCCESS] Gemini transcription successful: {transcription[:50]}...")
        logger.info(f"[SUCCESS] Detected language: {detected_language}")
        return transcription, detected_language

    def speech_to_text(self, audio_path: str, language: str = 'auto') -> tuple[str, str]:
        """Transcribe an audio file with Gemini.

        Args:
            audio_path: Path of the recording to transcribe.
            language: ``'auto'`` or a locale code such as ``'vi-VN'`` naming
                the language the speaker is expected to use.

        Returns:
            ``(text, language_name)``. When processing or recognition fails,
            ``text`` is a human-readable message and ``language_name`` is
            ``"unknown"``.

        Raises:
            Exception: If Gemini is not configured or ``audio_path`` does not
                exist (message prefixed with "Speech recognition failed:").
        """
        try:
            if not self.gemini_configured:
                raise Exception("Gemini client not configured. Please check your API key.")
            if not os.path.exists(audio_path):
                raise Exception(f"Audio file not found: {audio_path}")
            try:
                y, sr_rate = self._load_audio(audio_path)
                if len(y) == 0:
                    return "No audio data found", self._UNKNOWN_LANGUAGE

                logger.info(
                    f"Audio quality check - Max: {np.max(np.abs(y)):.4f}, "
                    f"RMS: {np.sqrt(np.mean(y**2)):.4f}, Duration: {len(y)/sr_rate:.2f}s"
                )
                y, level_notes = self._normalize_levels(y)
                for note in level_notes:
                    logger.warning(f"[WARNING] {note}")
                logger.info(f"After processing - Max: {np.max(np.abs(y)):.4f}, RMS: {np.sqrt(np.mean(y**2)):.4f}")

                y = self._trim_silence(y, sr_rate)

                # mkstemp yields a name that cannot collide with a concurrent request
                handle, temp_audio = tempfile.mkstemp(prefix="speech_audio_", suffix=".wav")
                os.close(handle)
                try:
                    try:
                        sf.write(temp_audio, y, sr_rate, format='WAV', subtype='PCM_16')
                        if os.path.getsize(temp_audio) == 0:
                            raise Exception("Failed to write temporary audio file")
                    except Exception as e:
                        logger.error(f"Failed to save temporary audio: {e}")
                        return f"Audio processing failed: {str(e)}", self._UNKNOWN_LANGUAGE

                    logger.info("Using Gemini Flash 2.0 for speech recognition...")
                    try:
                        return self._transcribe_with_gemini(temp_audio, language)
                    except Exception as gemini_error:
                        logger.error(f"[ERROR] Gemini transcription failed: {gemini_error}")
                        return f"Gemini transcription failed: {str(gemini_error)}", self._UNKNOWN_LANGUAGE
                finally:
                    # Clean up temp file
                    try:
                        os.remove(temp_audio)
                    except OSError as e:
                        logger.warning(f"Failed to cleanup temp file: {e}")
            except Exception as audio_error:
                logger.error(f"Audio processing error: {audio_error}")
                return f"Audio processing failed: {str(audio_error)}", self._UNKNOWN_LANGUAGE
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Speech recognition error: {error_msg}")
            raise Exception(f"Speech recognition failed: {error_msg}") from e

    def detect_language_with_gemini(self, text: str) -> str:
        """Return the English name of the language ``text`` is written in.

        Falls back to ``"English"`` when Gemini is not configured, the text is
        blank, the request fails, or the reply is not a recognised language.
        """
        try:
            if not self.gemini_configured or not text.strip():
                return "English"
            prompt = (
                'Analyze this text and identify the language. Respond with just the language name '
                'in English (e.g., "English", "Vietnamese", "Spanish", etc.):\n'
                f"{text[:200]}"
            )
            response = self.gemini_model.generate_content(prompt)
            # Tolerate quoting, trailing punctuation and case differences, so
            # a reply such as '"vietnamese."' is not mistaken for English.
            reply = response.text.strip().strip("\"'`*. ").casefold()
            for language_name in self._LANGUAGE_CODES:
                if language_name.casefold() == reply:
                    return language_name
            return "English"
        except Exception as e:
            logger.warning(f"Gemini language detection failed: {e}")
            return "English"

    def get_audio_duration(self, audio_path: str) -> float:
        """Return the duration of ``audio_path`` in seconds, or 0.0 if it cannot be loaded."""
        try:
            samples, sample_rate = librosa.load(audio_path)
            return len(samples) / sample_rate
        except Exception:
            return 0.0

    def translate_text(self, text: str, source_lang: str, target_lang: str) -> str:
        """Translate ``text`` with Gemini.

        Args:
            text: Text to translate.
            source_lang: Source language code; codes missing from
                ``supported_languages`` are passed to the prompt as-is.
            target_lang: Target language code, resolved the same way.

        Returns:
            The translation, or a message starting with
            ``"[API Quota Exceeded]"`` when the API reports a quota error.

        Raises:
            Exception: For any other failure ("Translation failed: ...").
        """
        try:
            if not self.gemini_configured:
                raise Exception("Google Gemini client not configured. Please check your API key.")
            # Create translation prompt
            source_name = self.supported_languages.get(source_lang, source_lang)
            target_name = self.supported_languages.get(target_lang, target_lang)
            prompt = (
                f"Translate the following {source_name} text to {target_name}. "
                "Provide only the translation, no explanations or additional text:\n"
                f"{text}"
            )
            response = self.gemini_model.generate_content(prompt)
            translated_text = response.text.strip()
            if not translated_text:
                raise Exception("Empty translation response from Gemini")
            logger.info(f"Gemini Flash 2.0 translation successful: {translated_text[:100]}...")
            return translated_text
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Translation error: {error_msg}")
            # Check for quota exceeded error
            lowered = error_msg.lower()
            if "429" in error_msg or "quota" in lowered:
                logger.warning("[WARNING] Google Gemini API quota exceeded - using fallback translation")
                return f"[API Quota Exceeded] Please add credits to your Google account. Original text: {text}"
            raise Exception(f"Translation failed: {error_msg}") from e

    async def generate_speech_with_custom_voice(self, text: str, voice: str) -> Optional[str]:
        """Synthesize ``text`` with the given Edge TTS voice.

        Returns:
            Path of the generated audio file, or ``None`` when Edge TTS is
            unavailable, there is nothing to speak, or synthesis fails.
        """
        output_path = None
        try:
            if not EDGE_TTS_AVAILABLE:
                logger.warning("Edge TTS not available")
                return None
            if not text or not text.strip():
                logger.warning("TTS skipped: no text to synthesize")
                return None
            # A second-resolution timestamp let two requests in the same
            # second overwrite each other's output; mkstemp is collision-free.
            # NOTE(review): the ".wav" suffix is kept from the original code;
            # confirm the container Edge TTS actually writes.
            handle, output_path = tempfile.mkstemp(prefix="tts_output_", suffix=".wav")
            os.close(handle)
            communicate = edge_tts.Communicate(text, voice)
            await communicate.save(output_path)
            if os.path.getsize(output_path) > 0:
                logger.info(f"Edge TTS generated with {voice}: {output_path}")
                return output_path
            logger.warning("Edge TTS produced an empty audio file")
        except Exception as e:
            logger.error(f"TTS Error: {e}")
        # Failure path: do not leave an empty/partial file behind
        if output_path and os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                pass
        return None

    def process_audio_translation_with_voice(
        self,
        audio_path: str,
        target_lang: str,
        voice: str,
        input_language: str = 'auto'
    ) -> Tuple[str, str, str, Optional[str]]:
        """Run the full pipeline: transcribe, translate, synthesize.

        Args:
            audio_path: Recording to translate.
            target_lang: Target language code for the translation.
            voice: Edge TTS voice used for the spoken translation.
            input_language: ``'auto'`` or the speaker's locale code.

        Returns:
            ``(transcription, detected language, translation, audio path)``.
            When a stage cannot complete, the first element carries the
            status message, later elements are empty and the audio path is
            ``None``.

        Raises:
            Exception: Propagated from ``speech_to_text`` / ``translate_text``.
        """
        if not audio_path:
            return "Please upload an audio file", "", "", None
        input_desc = "auto-detection" if input_language == 'auto' else f"specified language ({input_language})"
        logger.info(f"Processing audio translation with {input_desc} -> {target_lang} (voice: {voice})")

        # Step 1: Speech to text with language detection or specified language
        logger.info(f"Step 1: Transcribing audio with {input_desc}...")
        transcribed_text, detected_language = self.speech_to_text(audio_path, input_language)
        # speech_to_text reports failures as (message, "unknown"); such a
        # message must not be translated and spoken as if it were speech.
        if detected_language == self._UNKNOWN_LANGUAGE or transcribed_text.startswith("Error"):
            return transcribed_text, "", "", None
        logger.info(f"Transcription: {transcribed_text[:100]}...")
        logger.info(f"Language: {detected_language}")

        # Step 2: Translate text using detected/specified language
        logger.info("Step 2: Translating text...")
        detected_lang_code = self._LANGUAGE_CODES.get(detected_language, 'en')
        translated_text = self.translate_text(transcribed_text, detected_lang_code, target_lang)
        # Notices (e.g. quota exhausted) are shown as text but never voiced
        if translated_text.startswith(self._TRANSLATION_NOTICE_PREFIXES):
            return transcribed_text, detected_language, translated_text, None
        logger.info(f"Translation: {translated_text[:100]}...")

        # Step 3: Generate speech with Edge TTS using custom voice
        logger.info(f"Step 3: Generating speech with voice: {voice}")
        audio_output = asyncio.run(self.generate_speech_with_custom_voice(translated_text, voice))
        if audio_output:
            logger.info("Complete translation pipeline successful!")
        else:
            logger.warning("TTS generation failed, returning text only")
        return transcribed_text, detected_language, translated_text, audio_output
| # Initialize AI Agent | |
| agent = TranslationAIAgent() | |
| # Interface Functions | |
def get_country_options() -> List[str]:
    """Return every target locale as a sorted list of "code | display" strings."""
    return sorted(
        f"{option['code']} | {option['display']}"
        for lang_info in agent.language_voice_options.values()
        for option in lang_info['options']
    )
def get_input_language_options() -> List[str]:
    """Return the speech-input choices: auto-detect first, then fixed locales.

    Every entry has the form "code | display name".
    """
    specific_languages = (
        ("en-US", "English (United States)"),
        ("vi-VN", "Vietnamese (Vietnam)"),
        ("es-ES", "Spanish (Spain)"),
        ("fr-FR", "French (France)"),
        ("de-DE", "German (Germany)"),
        ("ja-JP", "Japanese (Japan)"),
        ("zh-CN", "Chinese (Simplified)"),
        ("ko-KR", "Korean (South Korea)"),
        ("it-IT", "Italian (Italy)"),
        ("pt-PT", "Portuguese (Portugal)"),
        ("ru-RU", "Russian (Russia)"),
        ("ar-SA", "Arabic (Saudi Arabia)"),
        ("hi-IN", "Hindi (India)"),
        ("th-TH", "Thai (Thailand)"),
        ("tr-TR", "Turkish (Turkey)"),
    )
    return ["auto | Auto-detect Language (Recommended)"] + [
        f"{code} | {display}" for code, display in specific_languages
    ]
def get_voice_options_for_country(country_selection: str) -> List[str]:
    """Return the two voice labels ("Name (Gender)") for a "code | display" selection.

    The English (US) pair is returned when the selection is empty, has no
    "|" separator, or names a locale that is not configured.
    """
    fallback = ["Jenny (Female)", "Guy (Male)"]
    if not country_selection or '|' not in country_selection:
        return fallback

    female_names = ('Jenny', 'Libby', 'Natasha', 'Clara', 'Elvira', 'Dalia', 'Denise', 'Sylvie', 'Katja', 'Elsa', 'Raquel', 'Francisca', 'Svetlana', 'Nanami', 'SunHi', 'Xiaoxiao', 'HoaiMy')

    def label(voice_id):
        # "en-US-JennyNeural" -> "Jenny", then tag the gender by known name
        short_name = voice_id.replace('Neural', '').split('-')[-1]
        is_female = any(name in short_name for name in female_names)
        return f"{short_name} {'(Female)' if is_female else '(Male)'}"

    wanted_code = country_selection.split(' | ')[0].strip()
    all_options = (
        option
        for lang_info in agent.language_voice_options.values()
        for option in lang_info['options']
    )
    for option in all_options:
        if option['code'] == wanted_code:
            return [label(option['voice']), label(option['alt_voice'])]
    return fallback
def get_voice_code_from_selections(country_selection: str, voice_selection: str) -> str:
    """Resolve the dropdown selections to a full Edge TTS voice id.

    Args:
        country_selection: "code | display" string from the country dropdown.
        voice_selection: "Name (Gender)" string from the voice dropdown; may
            be empty or ``None``.

    Returns:
        The matching voice id. If the locale is known but the voice name does
        not belong to it (for example a stale value left over from a previous
        locale), the locale's primary voice is returned so the speech is
        still in the target language. ``'en-US-JennyNeural'`` is returned
        only when the locale itself cannot be resolved.
    """
    default_voice = 'en-US-JennyNeural'
    if not country_selection or '|' not in country_selection:
        return default_voice

    code = country_selection.split(' | ')[0].strip()
    # Tolerate a missing voice selection instead of failing on None.split
    voice_name = (voice_selection or '').split(' (')[0].strip()

    for lang_info in agent.language_voice_options.values():
        for option in lang_info['options']:
            if option['code'] != code:
                continue
            for candidate in (option['voice'], option['alt_voice']):
                if candidate.replace('Neural', '').split('-')[-1] == voice_name:
                    return candidate
            # Known locale, unknown voice: stay within the locale
            return option['voice']
    return default_voice
def get_language_code_from_country(country_selection: str) -> str:
    """Return the language part of a "code | display" selection (e.g. "en-us | ..." -> "en").

    Falls back to 'en' when the selection is empty or has no "|" separator.
    """
    if country_selection and '|' in country_selection:
        locale_code = country_selection.partition(' | ')[0].strip()
        return locale_code.partition('-')[0]
    return 'en'
def update_voice_options(country_selection: str) -> gr.Dropdown:
    """Build the voice dropdown for the chosen country, preselecting its first voice."""
    voice_options = get_voice_options_for_country(country_selection)
    selected = voice_options[0] if voice_options else "Jenny (Female)"
    return gr.Dropdown(choices=voice_options, value=selected)
def get_input_language_code_from_selection(input_lang_selection: str) -> str:
    """Return the code part of a "code | display" input-language selection.

    'auto' is returned when the selection is empty or has no "|" separator.
    """
    if input_lang_selection and '|' in input_lang_selection:
        return input_lang_selection.partition(' | ')[0].strip()
    return 'auto'
| # Global conversation and audio state | |
| conversation_state = { | |
| "person_a_messages": [], | |
| "person_b_messages": [], | |
| "person_a_translations": [], | |
| "person_b_translations": [], | |
| "latest_audio_for_a": None, # Audio that Person A should hear | |
| "latest_audio_for_b": None # Audio that Person B should hear | |
| } | |
def add_message_to_conversation(person, original, detected_lang, translation, target_person):
    """Record one utterance and its translation in the global conversation state.

    Nothing is stored unless both ``original`` and ``translation`` are
    non-empty. ``person`` "A" is logged as Person A speaking to Person B; any
    other value is logged as Person B speaking to Person A.
    ``target_person`` is accepted but not used.
    """
    if not (original and translation):
        return

    timestamp = time.strftime("%H:%M")
    if person == "A":
        speaker, listener = "A", "B"
        spoken_log = conversation_state["person_a_messages"]
        heard_log = conversation_state["person_b_translations"]
    else:
        speaker, listener = "B", "A"
        spoken_log = conversation_state["person_b_messages"]
        heard_log = conversation_state["person_a_translations"]

    spoken_log.append(f"[{timestamp}] Person {speaker} ({detected_lang}): {original}")
    heard_log.append(f"[{timestamp}] -> Person {listener}: {translation}")
def get_full_conversation():
    """Return the last 10 conversation lines, newline-joined.

    Lines are interleaved by index, not by time: for each index the order is
    Person A's message, its translation for B, Person B's message, then its
    translation for A.
    """
    lanes = [
        conversation_state["person_a_messages"],
        conversation_state["person_b_translations"],
        conversation_state["person_b_messages"],
        conversation_state["person_a_translations"],
    ]
    depth = max(len(lane) for lane in lanes)
    merged = [
        lane[index]
        for index in range(depth)
        for lane in lanes
        if index < len(lane)
    ]
    return "\n".join(merged[-10:])  # Show last 10 messages
def translate_person_a_to_b(audio_file, country_b: str, voice_b: str, input_lang_a: str) -> tuple[str, Optional[str]]:
    """Person A speaks -> results appear in Person B's tab.

    Args:
        audio_file: Path of Person A's recording, or ``None`` if nothing was recorded.
        country_b: Person B's "code | display" target locale selection.
        voice_b: Person B's "Name (Gender)" voice selection.
        input_lang_a: Person A's "code | display" input-language selection.

    Returns:
        ``(conversation text, audio path or None)``. Failures are reported in
        the text ("Error: ..." or a "[Notice] ..." line); this function does
        not raise.
    """
    if audio_file is None:
        return "", None
    try:
        print(f"[DEBUG] Person A recording: {audio_file}")
        tgt_code = get_language_code_from_country(country_b)
        selected_voice = get_voice_code_from_selections(country_b, voice_b)
        input_language = get_input_language_code_from_selection(input_lang_a)
        print(f"[DEBUG] Input Language: {input_language}, Target: {tgt_code}, Voice: {selected_voice}")
        original_text, detected_lang, translated_text, audio_output = agent.process_audio_translation_with_voice(
            audio_file, tgt_code, selected_voice, input_language
        )
        print(f"[DEBUG] Results: {original_text[:50]}... -> {translated_text[:50]}...")
        print(f"[DEBUG] Audio output: {audio_output}")

        if not translated_text:
            # The pipeline stopped before translating and put its status
            # message in original_text. Show it instead of silently returning
            # an unchanged conversation.
            notice = f"[Notice] {original_text}" if original_text else "[Notice] No translation was produced"
            shown = "\n".join(part for part in (get_full_conversation(), notice) if part)
            return shown, None

        # Add to conversation
        add_message_to_conversation("A", original_text, detected_lang, translated_text, "B")
        # Return conversation for Person B's tab and audio
        conversation_history = get_full_conversation()
        print(f"[DEBUG] Conversation length: {len(conversation_history)}")
        return conversation_history, audio_output
    except Exception as e:
        print(f"[ERROR] translate_person_a_to_b: {e}")
        return f"Error: {str(e)}", None
def translate_person_b_to_a(audio_file, country_a: str, voice_a: str, input_lang_b: str) -> tuple[str, Optional[str]]:
    """Person B speaks -> results appear in Person A's tab.

    Args:
        audio_file: Path of Person B's recording, or ``None`` if nothing was recorded.
        country_a: Person A's "code | display" target locale selection.
        voice_a: Person A's "Name (Gender)" voice selection.
        input_lang_b: Person B's "code | display" input-language selection.

    Returns:
        ``(conversation text, audio path or None)``. Failures are reported in
        the text ("Error: ..." or a "[Notice] ..." line); this function does
        not raise.
    """
    if audio_file is None:
        return "", None
    try:
        print(f"[DEBUG] Person B recording: {audio_file}")
        tgt_code = get_language_code_from_country(country_a)
        selected_voice = get_voice_code_from_selections(country_a, voice_a)
        input_language = get_input_language_code_from_selection(input_lang_b)
        print(f"[DEBUG] Input Language: {input_language}, Target: {tgt_code}, Voice: {selected_voice}")
        original_text, detected_lang, translated_text, audio_output = agent.process_audio_translation_with_voice(
            audio_file, tgt_code, selected_voice, input_language
        )
        print(f"[DEBUG] Results: {original_text[:50]}... -> {translated_text[:50]}...")
        print(f"[DEBUG] Audio output: {audio_output}")

        if not translated_text:
            # The pipeline stopped before translating and put its status
            # message in original_text. Show it instead of silently returning
            # an unchanged conversation.
            notice = f"[Notice] {original_text}" if original_text else "[Notice] No translation was produced"
            shown = "\n".join(part for part in (get_full_conversation(), notice) if part)
            return shown, None

        # Add to conversation
        add_message_to_conversation("B", original_text, detected_lang, translated_text, "A")
        # Return conversation for Person A's tab and audio
        conversation_history = get_full_conversation()
        print(f"[DEBUG] Conversation length: {len(conversation_history)}")
        return conversation_history, audio_output
    except Exception as e:
        print(f"[ERROR] translate_person_b_to_a: {e}")
        return f"Error: {str(e)}", None
def get_audio_for_person_a() -> Optional[str]:
    """Return the value stored under "latest_audio_for_a" in the conversation state (None if unset)."""
    latest_for_a = conversation_state.get("latest_audio_for_a")
    return latest_for_a
def get_audio_for_person_b() -> Optional[str]:
    """Return the value stored under "latest_audio_for_b" in the conversation state (None if unset)."""
    latest_for_b = conversation_state.get("latest_audio_for_b")
    return latest_for_b
# Two-person translation interface.
# Statements below are kept in their original sequence: components are created
# and event listeners registered in the same order as before.
# NOTE(review): the nesting depth of each component was reconstructed from
# context because the source indentation was lost -- confirm against the
# intended layout (in particular where the playback Audio widgets sit).
with gr.Blocks(
    title="ποΈ Two-Person Live Translation",
    theme=gr.themes.Soft(),
    css="""
.gradio-container { max-width: 1400px !important; margin: 0 auto !important; }
.header {
text-align: center;
background: linear-gradient(135deg, #4A90E2 0%, #FF6B9D 100%);
color: white;
padding: 20px;
border-radius: 10px;
margin-bottom: 20px;
}
.status-box {
background: rgba(78, 205, 196, 0.1);
border: 2px solid rgba(78, 205, 196, 0.3);
border-radius: 10px;
padding: 15px;
text-align: center;
margin: 15px 0;
}
.footer {
text-align: center;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px;
border-radius: 10px;
margin-top: 30px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
.guide-box {
background: rgba(255, 255, 255, 0.05);
border: 1px solid rgba(255, 255, 255, 0.2);
border-radius: 10px;
padding: 20px;
margin: 15px 0;
}
.step-card {
background: rgba(78, 205, 196, 0.1);
border-left: 4px solid #4ECDCC;
padding: 15px;
margin: 10px 0;
border-radius: 5px;
}
.tips-card {
background: rgba(255, 193, 7, 0.1);
border-left: 4px solid #FFC107;
padding: 15px;
margin: 10px 0;
border-radius: 5px;
}
""",
) as demo:
    # ---- Header banner: short readiness labels for Gemini and Edge TTS ----
    api_status = "Missing API Key" if not agent.gemini_configured else "Ready"
    edge_tts_status = "Not Available" if not EDGE_TTS_AVAILABLE else "Ready"
    gr.HTML(f"""
<div class="header">
<h1>ποΈ Two-Person Live Translation</h1>
<p>Real-time Cross-Translation between Person A & Person B</p>
<div style="margin-top: 15px;">
<span style="background: rgba(255,255,255,0.2); padding: 6px 12px; border-radius: 15px; margin: 0 5px;">
<strong>Gemini:</strong> {api_status}
</span>
<span style="background: rgba(255,255,255,0.2); padding: 6px 12px; border-radius: 15px; margin: 0 5px;">
<strong>Edge TTS:</strong> {edge_tts_status}
</span>
</div>
<div style="margin-top: 10px;">π§ <strong>Digitized Brains</strong></div>
</div>
""")

    # ---- Pipeline status box: one indicator per processing stage ----
    gemini_indicator = 'π’ Ready' if agent.gemini_configured else 'π΄ Not Ready'
    tts_indicator = 'π’ Ready' if EDGE_TTS_AVAILABLE else 'π΄ Not Ready'
    gr.HTML(f"""
<div class="status-box">
<h4>π€ AI Pipeline Status</h4>
<div style="display: flex; justify-content: center; gap: 20px; flex-wrap: wrap;">
<span><strong>π§ Gemini Speech Recognition:</strong> {gemini_indicator}</span>
<span><strong>π§ Gemini Translation:</strong> {gemini_indicator}</span>
<span><strong>π Edge TTS:</strong> {tts_indicator}</span>
</div>
</div>
""")

    # Settings shared by both tabs' read-only conversation boxes and
    # input-language dropdowns (only the labels differ per person).
    history_box_options = dict(
        label="Full Conversation",
        lines=8,
        interactive=False,
        placeholder="Conversation will appear here...",
        value="",
    )
    auto_detect_choice = "auto | Auto-detect Language (Recommended)"
    input_language_hint = "Select your speaking language or use auto-detect"

    with gr.Tabs():
        # ---- Person A's tab: A records here and hears audio coming from B ----
        with gr.TabItem("Person A View"):
            gr.Markdown("### Person A receives translations from Person B")

            # Conversation text shown to Person A
            conversation_display_a = gr.Textbox(**history_box_options)

            with gr.Row():
                with gr.Column(scale=2):
                    mic_a = gr.Audio(
                        sources=["microphone"],
                        type="filepath",
                        label="Person A: Record Your Voice",
                    )
                with gr.Column(scale=1):
                    # Language Person A speaks
                    input_lang_a = gr.Dropdown(
                        choices=get_input_language_options(),
                        label="Person A's Input Language",
                        value=auto_detect_choice,
                        info=input_language_hint,
                    )
                    # Output language and voice used when translating for Person B
                    country_b_for_a = gr.Dropdown(
                        choices=get_country_options(),
                        label="Person B's Language",
                        value="vi-vn | Vietnamese (Vietnam)",
                    )
                    voice_b_for_a = gr.Dropdown(
                        choices=["HoaiMy (Female)", "NamMinh (Male)"],
                        label="Person B's Voice",
                        value="HoaiMy (Female)",
                    )

            # Playback-only widget: audio produced from Person B's recordings
            audio_from_b = gr.Audio(
                label="π Translation Audio from Person B",
                interactive=False,
                value=None,
            )

        # ---- Person B's tab: B records here and hears audio coming from A ----
        with gr.TabItem("Person B View"):
            gr.Markdown("### Person B receives translations from Person A")

            # Conversation text shown to Person B
            conversation_display_b = gr.Textbox(**history_box_options)

            with gr.Row():
                with gr.Column(scale=2):
                    mic_b = gr.Audio(
                        sources=["microphone"],
                        type="filepath",
                        label="Person B: Record Your Voice",
                    )
                with gr.Column(scale=1):
                    # Language Person B speaks
                    input_lang_b = gr.Dropdown(
                        choices=get_input_language_options(),
                        label="Person B's Input Language",
                        value=auto_detect_choice,
                        info=input_language_hint,
                    )
                    # Output language and voice used when translating for Person A
                    country_a_for_b = gr.Dropdown(
                        choices=get_country_options(),
                        label="Person A's Language",
                        value="en-us | English (United States)",
                    )
                    voice_a_for_b = gr.Dropdown(
                        choices=["Jenny (Female)", "Guy (Male)"],
                        label="Person A's Voice",
                        value="Jenny (Female)",
                    )

            # Playback-only widget: audio produced from Person A's recordings
            audio_from_a = gr.Audio(
                label="π Translation Audio from Person A",
                interactive=False,
                value=None,
            )

        # ---- Static user guide: one HTML card per section, emitted in order ----
        with gr.TabItem("π User Guide"):
            guide_sections = (
                """
<div class="guide-box">
<h2 style="color: #4A90E2; margin-bottom: 20px;">ποΈ Two-Way Translation App User Guide</h2>
<p style="font-size: 16px; margin-bottom: 20px;">This application enables two people to communicate in different languages through automatic translation.</p>
</div>
""",
                """
<div class="step-card">
<h3>π Step 1: Preparation</h3>
<ul>
<li><strong>Check microphone:</strong> Ensure your microphone works properly</li>
<li><strong>Quiet environment:</strong> Find a location with minimal background noise</li>
<li><strong>Stable internet:</strong> Internet connection required for AI processing</li>
<li><strong>Speakers/headphones:</strong> To hear translated audio output</li>
</ul>
</div>
""",
                """
<div class="step-card">
<h3>π₯ Step 2: Choose Your Tab</h3>
<ul>
<li><strong>Person A View:</strong> For the first person</li>
<li><strong>Person B View:</strong> For the second person</li>
<li><strong>Each person only needs to focus on their own tab</strong></li>
</ul>
</div>
""",
                """
<div class="step-card">
<h3>π£οΈ Step 3: Language Setup</h3>
<ul>
<li><strong>Input Language:</strong> Select the language you will speak (or Auto-detect)</li>
<li><strong>Target Language:</strong> Choose the language to translate to</li>
<li><strong>Voice:</strong> Select voice for translated audio output</li>
<li><strong>Recommendation:</strong> Choose specific language instead of Auto-detect for better accuracy</li>
</ul>
</div>
""",
                """
<div class="step-card">
<h3>π€ Step 4: Record and Translate</h3>
<ul>
<li><strong>Click the microphone</strong> to start recording</li>
<li><strong>Speak clearly for 3-7 seconds</strong></li>
<li><strong>Wait for results:</strong> AI will recognize β translate β generate voice</li>
<li><strong>Check results</strong> in the conversation history</li>
</ul>
</div>
""",
                """
<div class="tips-card">
<h3>π‘ Tips for Best Results</h3>
<ul>
<li><strong>π€ Microphone:</strong> Speak close to mic, not too loud or quiet</li>
<li><strong>β±οΈ Duration:</strong> 3-7 seconds is ideal (not too short/long)</li>
<li><strong>π£οΈ Speaking style:</strong> Clear, not too fast, natural punctuation</li>
<li><strong>π Environment:</strong> Minimize background noise</li>
<li><strong>π Language:</strong> Select correct input language instead of auto-detect</li>
<li><strong>π Retry:</strong> If unsuccessful, try again with different approach</li>
</ul>
</div>
""",
                """
<div class="step-card">
<h3>π§ Common Troubleshooting</h3>
<ul>
<li><strong>"Could not understand speech":</strong> Speak more clearly, check microphone</li>
<li><strong>No audio output:</strong> Check speakers/headphones</li>
<li><strong>Incorrect translation:</strong> Select specific input language</li>
<li><strong>Slow processing:</strong> Check internet connection</li>
</ul>
</div>
""",
            )
            for section_html in guide_sections:
                gr.HTML(section_html)

    # ---- Event wiring ----
    # Direction A -> B: changing B's language refreshes B's voice choices.
    country_b_for_a.change(
        update_voice_options,
        inputs=[country_b_for_a],
        outputs=[voice_b_for_a],
    )
    # A's recording is translated; text and audio are routed to Person B's tab.
    mic_a.change(
        translate_person_a_to_b,
        inputs=[mic_a, country_b_for_a, voice_b_for_a, input_lang_a],
        outputs=[conversation_display_b, audio_from_a],
    )
    # Direction B -> A: changing A's language refreshes A's voice choices.
    country_a_for_b.change(
        update_voice_options,
        inputs=[country_a_for_b],
        outputs=[voice_a_for_b],
    )
    # B's recording is translated; text and audio are routed to Person A's tab.
    mic_b.change(
        translate_person_b_to_a,
        inputs=[mic_b, country_a_for_b, voice_a_for_b, input_lang_b],
        outputs=[conversation_display_a, audio_from_b],
    )

    # ---- Footer ----
    gr.HTML("""
<div class="footer">
<div style="display: flex; align-items: center; justify-content: center; gap: 10px; margin-bottom: 10px;">
<span style="font-size: 24px;">π§ </span>
<h3 style="margin: 0; font-size: 20px; font-weight: 600; background: linear-gradient(45deg, #fff, #e0e0e0); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text;">
Digitized Brains - AI Translation
</h3>
</div>
<div style="height: 1px; background: linear-gradient(90deg, transparent, rgba(255,255,255,0.3), transparent); margin: 15px 0;"></div>
<p style="margin: 0; font-size: 14px; opacity: 0.8; font-style: italic;">
Intelligent Communication Solutions
</p>
</div>
""")
if __name__ == "__main__":
    # Startup banner: report which services are configured before launching.
    print("===== Two-Person Live Translation Startup =====")
    print("Starting Two-Person Live Translation with Google Gemini")

    gemini_status = "Ready" if agent.gemini_configured else "Missing - Set GOOGLE_API_KEY"
    print(f"Google Gemini API Status: {gemini_status}")
    tts_status = "Ready" if EDGE_TTS_AVAILABLE else "Not Available"
    print(f"Edge TTS Status: {tts_status}")

    if not agent.gemini_configured:
        print("Demo Mode - Configure GOOGLE_API_KEY for full functionality")
    else:
        production_banner = (
            "Production Mode - Full Gemini AI Translation enabled",
            "Speech Recognition: Google Gemini Flash 2.0",
            "Language Detection: Google Gemini Flash 2.0",
            "Translation Model: Google Gemini Flash 2.0",
            "π§ All AI processing powered by Gemini Flash 2.0!",
        )
        for banner_line in production_banner:
            print(banner_line)

    # Port comes from GRADIO_SERVER_PORT when set; 7906 is the testing default.
    port = int(os.environ.get("GRADIO_SERVER_PORT", 7906))

    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": port,
        "share": False,
        "show_error": True,
    }
    demo.launch(**launch_options)