Spaces:
Running
Running
| """ | |
| Text-to-speech audio generation for translated subtitles. | |
| """ | |
| import os | |
| import time | |
| import shutil | |
| import tempfile | |
| from pathlib import Path | |
| from tqdm import tqdm | |
| import subprocess | |
| from gtts import gTTS | |
| import pysrt | |
| from src.utils.logger import get_logger | |
| from src.audio.extractor import create_silent_audio | |
| from config import OUTPUT_DIR, TTS_VOICES, MAX_RETRY_ATTEMPTS | |
| logger = get_logger(__name__) | |
def _subtitle_time_to_seconds(stamp):
    """Convert a subtitle timestamp (hours/minutes/seconds/milliseconds) to seconds."""
    return (stamp.hours * 3600
            + stamp.minutes * 60
            + stamp.seconds
            + stamp.milliseconds / 1000)


def _write_silent_fallback(target_lang, video_duration):
    """Write a silent track of ``video_duration`` seconds and return its path."""
    silent_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.wav"
    create_silent_audio(video_duration, silent_audio)
    return silent_audio


def _synthesize_chunk(text, target_lang, audio_file, index):
    """Run gTTS for one subtitle, writing ``audio_file``; return True on success.

    The full text is attempted up to MAX_RETRY_ATTEMPTS times. If every attempt
    fails and the text is longer than 100 characters, one last attempt is made
    with the text truncated to 100 characters. Failures are logged and never
    raised, so a single bad subtitle cannot abort the whole generation run.
    """
    # NOTE(review): the file imports TTS_VOICES, and the previous revision
    # computed TTS_VOICES.get(target_lang, target_lang) without using it.
    # gTTS is still driven by target_lang here; confirm whether the mapped
    # value should be passed instead.
    slow_option = target_lang in ("hi", "ja", "zh-CN", "ar")

    for attempt in range(1, MAX_RETRY_ATTEMPTS + 1):
        try:
            gTTS(text=text, lang=target_lang, slow=slow_option).save(str(audio_file))
            size = audio_file.stat().st_size if audio_file.exists() else 0
        except Exception as e:
            failure = str(e)
        else:
            logger.info(f"Generated TTS file size for chunk {index}: {size} bytes")
            if size > 0:
                return True
            failure = "Generated audio file is empty"

        logger.warning(f"TTS attempt {attempt} failed for {target_lang}: {failure}")
        # Back off only when another attempt will follow.
        if attempt < MAX_RETRY_ATTEMPTS:
            time.sleep(1)

    # Last resort: shortened text, spoken slowly. Runs after the retries so a
    # later full-text attempt cannot overwrite a successful fallback file.
    if len(text) > 100:
        logger.warning(f"Trying with shortened text for {target_lang}")
        shortened_text = text[:100] + "..."
        try:
            gTTS(text=shortened_text, lang=target_lang, slow=True).save(str(audio_file))
            return audio_file.exists() and audio_file.stat().st_size > 0
        except Exception as e:
            logger.warning(f"Shortened-text TTS failed for {target_lang}: {str(e)}")

    return False


def _build_mix_command(silence_file, chunks, video_duration, output_audio):
    """Build the FFmpeg argument list that places every chunk at its start time.

    ``chunks`` is a list of ``(start_seconds, audio_path)`` tuples. Each chunk
    is boosted by 12 dB, delayed to its start time and padded to
    ``video_duration``; the results are mixed without normalization, boosted by
    a further 3 dB and encoded as 192k MP3.
    """
    # The silence file is input 0, so chunk inputs are numbered from 1. It is
    # not referenced by the filter graph below; it is kept so the command line
    # stays the same as before.
    cmd = ['ffmpeg', '-y', '-i', str(silence_file)]
    for _, audio_file in chunks:
        cmd += ['-i', str(audio_file)]

    filter_chains = []
    for i, (start_time, _) in enumerate(chunks):
        delay_ms = int(start_time * 1000)
        filter_chains.append(
            f"[{i+1}:a]volume=12dB,adelay={delay_ms}|{delay_ms},apad=whole_dur={video_duration}[a{i}]"
        )

    mix_inputs = ''.join(f"[a{i}]" for i in range(len(chunks)))
    filter_complex = ";".join(filter_chains) + \
        f";{mix_inputs}amix=inputs={len(chunks)}:duration=longest:normalize=0,volume=3dB[aout]"

    cmd += [
        '-filter_complex', filter_complex,
        '-map', '[aout]',
        '-c:a', 'libmp3lame',
        '-b:a', '192k',
        str(output_audio)
    ]
    return cmd


def generate_translated_audio(srt_path, target_lang, video_duration=180):
    """
    Generate translated audio using text-to-speech for each subtitle.

    Each non-empty subtitle is synthesized with gTTS into its own chunk file,
    and FFmpeg then mixes the chunks into one track at their start times.

    Args:
        srt_path (str): Path to the SRT subtitle file
        target_lang (str): Target language code (e.g., 'en', 'es')
        video_duration (float): Duration of the original video in seconds

    Returns:
        Path: ``OUTPUT_DIR/translated_audio_<lang>.mp3`` on success. If no
        chunk could be synthesized, an input is missing, FFmpeg fails, or any
        other error occurs, a silent ``OUTPUT_DIR/translated_audio_<lang>.wav``
        of ``video_duration`` seconds is written and returned instead.

    Raises:
        Exception: Only if writing the silent fallback file itself fails;
            every other failure is logged and converted into the fallback.
    """
    temp_dir = None
    try:
        srt_path = Path(srt_path)
        logger.info(f"Generating translated audio for {target_lang} from {srt_path}")

        # Load subtitles
        subs = pysrt.open(srt_path, encoding="utf-8")
        logger.info(f"Loaded {len(subs)} subtitles from SRT file")

        # Create temporary directory for audio chunks; make sure its parent
        # exists first, because mkdtemp does not create missing parents.
        temp_root = OUTPUT_DIR / "temp"
        temp_root.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix=f"audio_{target_lang}_", dir=temp_root))
        logger.debug(f"Created temporary directory: {temp_dir}")

        # Generate TTS for each subtitle
        chunks = []  # (start_seconds, audio_path) for every synthesized subtitle
        logger.info(f"Generating speech for {len(subs)} subtitles in {target_lang}")
        for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} speech")):
            text = sub.text.strip()
            if not text:
                continue

            audio_file = temp_dir / f"chunk_{i:04d}.mp3"
            if _synthesize_chunk(text, target_lang, audio_file, i):
                chunks.append((_subtitle_time_to_seconds(sub.start), audio_file))
            else:
                logger.warning(f"Failed to generate audio for subtitle {i}")

        # Fallback if no audio generated
        if not chunks:
            logger.warning(f"No audio files generated for {target_lang}")
            return _write_silent_fallback(target_lang, video_duration)

        # Output configuration
        output_audio = OUTPUT_DIR / f"translated_audio_{target_lang}.mp3"
        silence_file = temp_dir / "silence.wav"
        create_silent_audio(video_duration, silence_file)

        # Validate input files
        for f in [silence_file, *(path for _, path in chunks)]:
            if not f.exists():
                logger.error(f"Missing input file: {f}")
                return _write_silent_fallback(target_lang, video_duration)

        # Build FFmpeg command with volume boost and timing
        cmd = _build_mix_command(silence_file, chunks, video_duration, output_audio)
        logger.debug(f"Running FFmpeg command: {' '.join(cmd)}")

        # Execute audio mixing
        process = subprocess.run(cmd, capture_output=True, text=True)
        if process.returncode != 0:
            logger.error(f"Audio mixing failed: {process.stderr}")
            return _write_silent_fallback(target_lang, video_duration)

        logger.info(f"Final audio file size: {output_audio.stat().st_size} bytes")
        return output_audio
    except Exception as e:
        logger.error(f"Audio generation failed: {str(e)}", exc_info=True)
        return _write_silent_fallback(target_lang, video_duration)
    finally:
        # Cleanup temporary files on every exit path, not just on success.
        if temp_dir is not None:
            try:
                shutil.rmtree(temp_dir)
                logger.debug(f"Cleaned temporary directory: {temp_dir}")
            except Exception as e:
                logger.warning(f"Temp cleanup failed: {str(e)}")