# Audio Dubbing AI — Hugging Face Spaces app (text translation + voice-cloned TTS)
| import gradio as gr | |
| import spaces | |
| import uuid | |
| import os | |
| import asyncio | |
| import edge_tts | |
| from deep_translator import GoogleTranslator | |
| from patch_tts import tts | |
| import logging | |
| import torch | |
| import zipfile | |
| from pathlib import Path | |
| import tempfile | |
| import shutil | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Maps the UI display name of each language to a tuple of:
#   (ISO 639-1 code, edge-tts neural voice name).
# The ISO code is used both for GoogleTranslator's target language and as the
# `language` argument to the TTS backends; the voice name is only used by the
# edge-tts fallback path in text_to_speech (when no speaker sample is given).
language_mapping = {
    "English": ("en", "en-US-ChristopherNeural"),
    "Spanish": ("es", "es-ES-AlvaroNeural"),
    "French": ("fr", "fr-FR-DeniseNeural"),
    "German": ("de", "de-DE-KatjaNeural"),
    "Italian": ("it", "it-IT-IsabellaNeural"),
    "Portuguese": ("pt", "pt-PT-DuarteNeural"),
    "Polish": ("pl", "pl-PL-AgnieszkaNeural"),
    "Turkish": ("tr", "tr-TR-AhmetNeural"),
    "Russian": ("ru", "ru-RU-DmitryNeural"),
    "Dutch": ("nl", "nl-NL-ColetteNeural"),
    "Czech": ("cs", "cs-CZ-VlastaNeural"),
    "Arabic": ("ar", "ar-SA-HamedNeural"),
    "Chinese": ("zh", "zh-CN-XiaoxiaoNeural"),
    "Japanese": ("ja", "ja-JP-NanamiNeural"),
    "Hungarian": ("hu", "hu-HU-TamasNeural"),
    "Korean": ("ko", "ko-KR-SunHiNeural")
}
def text_to_speech(text, voice, output_file, speaker_wav=None, language="en"):
    """Synthesize `text` to `output_file` as audio.

    When a `speaker_wav` reference sample is provided, the patched Coqui
    XTTS-v2 model is used for voice cloning; otherwise edge-tts is used as a
    plain (non-cloned) fallback with the given neural `voice`.

    Args:
        text: Text to synthesize (already translated by the caller).
        voice: edge-tts neural voice name; only used when `speaker_wav` is None.
        output_file: Path the generated audio file is written to.
        speaker_wav: Optional path to a short voice sample used for cloning.
        language: Language code forwarded to XTTS (lowercased before use).

    Raises:
        RuntimeError: If Coqui TTS synthesis fails (chained to the original
            error so the full traceback is preserved).
    """
    if speaker_wav:
        try:
            logger.info("Using patched Coqui TTS with XTTS-v2 model")
            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info(f"Using device: {device}")
            logger.info(f"Generating speech with text: {text[:50]}... and speaker_wav: {speaker_wav}")
            tts.tts_to_file(
                text=text,
                speaker_wav=speaker_wav,
                language=language.lower(),
                file_path=output_file,
                speed=1.0
            )
            logger.info(f"Generated audio saved to {output_file}")
        except Exception as e:
            logger.error(f"Coqui TTS error: {str(e)}")
            # Chain with `from e` so the original traceback is not discarded;
            # RuntimeError is still caught by callers' `except Exception`.
            raise RuntimeError(f"Coqui TTS error: {str(e)}") from e
    else:
        logger.info("Using edge-tts as fallback")
        communicate = edge_tts.Communicate(text, voice)
        # edge_tts is async-only; drive the coroutine from this sync helper.
        asyncio.run(communicate.save(output_file))
def process_audio(input_text, target_language, speaker_wav=None):
    """Translate `input_text` into `target_language` and voice-clone it.

    Returns a pair ``(output_path, message)``: on success the path to the
    synthesized WAV plus an empty message, on any failure ``(None, "Error: ...")``
    so the Gradio status textbox can display it.
    """
    try:
        # Guard clauses — each message is surfaced verbatim in the UI.
        if target_language is None:
            raise ValueError("Please select a Target Language.")
        if not input_text:
            raise ValueError("Please provide text to synthesize.")
        if not speaker_wav:
            raise ValueError("Please upload a voice sample for cloning.")

        lang_code, fallback_voice = language_mapping[target_language]
        output_filename = f"{uuid.uuid4().hex[:6]}_output_synth.wav"

        translated_text = GoogleTranslator(source='auto', target=lang_code).translate(input_text)
        logger.info(f"Translated text: {translated_text}")

        text_to_speech(translated_text, fallback_voice, output_filename,
                       speaker_wav=speaker_wav, language=lang_code)

        if not os.path.exists(output_filename):
            raise FileNotFoundError(f"Error: {output_filename} was not generated.")
        return output_filename, ""
    except Exception as e:
        logger.error(f"Error in process_audio: {str(e)}")
        return None, f"Error: {str(e)}"
def process_batch_audio(audio_files, text_input, target_language, progress=gr.Progress()):
    """Voice-clone one line of text per uploaded audio file and ZIP the results.

    Expects `text_input` to contain exactly one non-empty line per uploaded
    file, in the same order. Each line is translated to `target_language` and
    synthesized with the corresponding upload as the speaker reference.

    Args:
        audio_files: List of uploaded WAV file paths (max 100).
        text_input: Newline-separated texts, one per audio file.
        target_language: Display name; must be a key of `language_mapping`.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        ``(zip_path, status_message)`` on success, ``(None, "Error: ...")``
        on any failure.
    """
    try:
        if not audio_files:
            return None, "Error: No audio files uploaded."
        if not text_input:
            return None, "Error: No text provided."
        if target_language is None:
            return None, "Error: Please select a Target Language."
        # Parse text input (expecting one text per line); drop empty lines.
        texts = [t.strip() for t in text_input.strip().split("\n") if t.strip()]
        if len(audio_files) != len(texts):
            return None, f"Error: Number of audio files ({len(audio_files)}) does not match number of text lines ({len(texts)})."
        if len(audio_files) > 100:
            return None, "Error: Maximum 100 audio files allowed."
        target_language_code, _ = language_mapping[target_language]
        translator = GoogleTranslator(source='auto', target=target_language_code)
        # Intermediate WAVs live in a temp dir that is removed automatically;
        # only the final ZIP (written to the CWD) survives.
        with tempfile.TemporaryDirectory() as temp_dir:
            output_files = []
            seen_filenames = set()  # Track filenames to handle duplicates
            for idx, (audio_file, text) in enumerate(zip(audio_files, texts), 1):
                progress(idx / len(audio_files), desc=f"Processing file {idx}/{len(audio_files)}")
                # Bind the name BEFORE the try block: the except handler below
                # logs it, and assigning it here avoids a NameError when
                # translation fails before the name would otherwise be set.
                original_filename = Path(audio_file).stem
                try:
                    translated_text = translator.translate(text)
                    if not translated_text:
                        raise ValueError(f"Translation failed for text: {text[:50]}...")
                    # Handle duplicate filenames by appending an index suffix.
                    base_filename = original_filename
                    suffix = 0
                    while base_filename in seen_filenames:
                        suffix += 1
                        base_filename = f"{original_filename}_{suffix}"
                    seen_filenames.add(base_filename)
                    output_filename = os.path.join(temp_dir, f"{base_filename}.wav")
                    text_to_speech(
                        text=translated_text,
                        voice=None,  # Not used with speaker_wav
                        output_file=output_filename,
                        speaker_wav=audio_file,
                        language=target_language_code
                    )
                    if not os.path.exists(output_filename):
                        raise FileNotFoundError(f"Output file {output_filename} was not generated.")
                    output_files.append((output_filename, f"{base_filename}.wav"))
                except Exception as e:
                    logger.error(f"Error processing file {idx} ({original_filename}): {str(e)}")
                    return None, f"Error processing file {idx} ({original_filename}): {str(e)}"
            # Create ZIP archive while the temp files still exist.
            zip_filename = f"batch_output_{uuid.uuid4().hex[:6]}.zip"
            with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for output_file, zip_name in output_files:
                    zipf.write(output_file, zip_name)
            return zip_filename, f"Successfully processed {len(output_files)} files. Download the ZIP archive."
    except Exception as e:
        logger.error(f"Error in batch processing: {str(e)}")
        return None, f"Error: {str(e)}"
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Audio Dubbing AI")
    gr.Markdown("Upload voice samples and provide text to synthesize. Supports single or batch processing (up to 100 files).")
    with gr.Tabs():
        # Tab 1: one voice sample + one text -> one synthesized WAV.
        with gr.Tab("Single Audio"):
            gr.Markdown("Process one audio file with one text.")
            with gr.Row():
                with gr.Column(scale=2):
                    txt_single = gr.Textbox(label="Text to Synthesize", placeholder="Enter the text you want to synthesize")
                    wav_single = gr.Audio(label="Upload Voice Sample (2-3 seconds)", type="filepath")
                    lang_single = gr.Dropdown(
                        choices=list(language_mapping.keys()),
                        label="Target Language",
                        value="Russian"
                    )
                    btn_single = gr.Button("Generate Audio", variant="primary")
                with gr.Column(scale=3):
                    audio_single = gr.Audio(label="Synthesized Audio")
                    status_single = gr.Textbox(label="Status / Error Message", interactive=False)
            btn_single.click(
                process_audio,
                inputs=[txt_single, lang_single, wav_single],
                outputs=[audio_single, status_single]
            )
        # Tab 2: N voice samples + N text lines -> ZIP of N cloned WAVs.
        with gr.Tab("Batch Audio"):
            gr.Markdown("Upload multiple WAV files and provide one text per file (one per line).")
            with gr.Row():
                with gr.Column(scale=2):
                    files_batch = gr.Files(label="Upload WAV Files (up to 100)", file_types=[".wav"], file_count="multiple")
                    txt_batch = gr.Textbox(
                        label="Text for Each File (one per line)",
                        placeholder="Text for file 1\nText for file 2\n...",
                        lines=5
                    )
                    lang_batch = gr.Dropdown(
                        choices=list(language_mapping.keys()),
                        label="Target Language",
                        value="Russian"
                    )
                    btn_batch = gr.Button("Generate Batch Audio", variant="primary")
                with gr.Column(scale=3):
                    zip_batch = gr.File(label="Download ZIP Archive")
                    status_batch = gr.Textbox(label="Status / Error Message", interactive=False)
            btn_batch.click(
                process_batch_audio,
                inputs=[files_batch, txt_batch, lang_batch],
                outputs=[zip_batch, status_batch]
            )

if __name__ == "__main__":
    demo.launch()