import numpy as np
import os
import re
import tempfile
import logging
import time
import subprocess
import torch
from pathlib import Path

from pydub import AudioSegment
import librosa
import soundfile as sf

# Set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Create directory structure
def ensure_directories():
    """Ensure the required directories exist."""
    directories = ["audio", "audio2", "reference_audio"]
    for directory in directories:
        os.makedirs(directory, exist_ok=True)

ensure_directories()  # Call immediately so the directories exist at import time

# Set up audio effects for pydub
def setup_audio_effects():
    """Install fallback speedup/time_stretch helpers if pydub lacks them.

    Both fallbacks work by resampling (changing the frame rate), which shifts
    pitch along with speed; they are rough substitutes, not true time stretching.
    """
    from pydub import effects

    # Add speedup if it's missing
    if not hasattr(AudioSegment, "speedup"):
        def speedup(audio_segment, playback_speed=1.5):
            if playback_speed <= 0 or playback_speed == 1.0:
                return audio_segment
            new_frame_rate = int(audio_segment.frame_rate * playback_speed)
            adjusted = audio_segment._spawn(
                audio_segment.raw_data,
                overrides={'frame_rate': new_frame_rate}
            )
            return adjusted.set_frame_rate(audio_segment.frame_rate)
        AudioSegment.speedup = speedup

    # Add time_stretch if it's missing
    if not hasattr(effects, "time_stretch"):
        def time_stretch(audio_segment, stretch_factor):
            if stretch_factor <= 0 or stretch_factor == 1.0:
                return audio_segment
            original_frame_rate = audio_segment.frame_rate
            new_frame_rate = int(original_frame_rate / stretch_factor)
            stretched = audio_segment._spawn(
                audio_segment.raw_data,
                overrides={'frame_rate': new_frame_rate}
            )
            return stretched.set_frame_rate(original_frame_rate)
        effects.time_stretch = time_stretch

    return effects

effects = setup_audio_effects()

def adjust_audio_duration(audio_segment, target_duration):
    """Adjust audio to the target duration by padding with silence or trimming."""
    current_duration = len(audio_segment) / 1000  # ms to seconds
    if current_duration < target_duration:
        silence_duration_ms = int((target_duration - current_duration) * 1000)
        silence = AudioSegment.silent(duration=silence_duration_ms)
        return audio_segment + silence
    else:
        return audio_segment[:int(target_duration * 1000)]

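# Illustrative example (not part of the pipeline): a 2 s clip is padded to 3 s,
# and a 4 s clip is trimmed down to 3 s. pydub measures length in milliseconds.
#
#   clip = AudioSegment.silent(duration=2000)            # 2.0 s
#   assert len(adjust_audio_duration(clip, 3.0)) == 3000
#   clip = AudioSegment.silent(duration=4000)            # 4.0 s
#   assert len(adjust_audio_duration(clip, 3.0)) == 3000
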
# XTTS model loader (singleton-style: one shared model per process)
class XTTSModelLoader:
    model = None

    @classmethod
    def get_model(cls):
        """Get or lazily initialize the shared XTTS model."""
        if cls.model is None:
            try:
                from TTS.api import TTS

                # Determine device
                device = "cuda" if torch.cuda.is_available() else "cpu"
                logger.info(f"Loading XTTS model on {device}...")
                # Load the model
                cls.model = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
                logger.info("XTTS model loaded successfully")
            except Exception as e:
                logger.error(f"Error loading XTTS model: {e}")
                return None
        return cls.model

def smooth_speed_change(audio_path, target_duration):
    """
    Adjust audio speed with a single-pass time stretch to match a target duration.

    Args:
        audio_path: Path to the audio file to adjust
        target_duration: Target duration in seconds

    Returns:
        Path to the adjusted audio file (a temporary file), or the original
        path if no adjustment was needed or the adjustment failed.
    """
    try:
        # Debug start
        print("\n[DEBUG] Starting audio speed adjustment:")
        print(f"[DEBUG] Input file: {audio_path}")
        print(f"[DEBUG] Target duration: {target_duration:.2f}s")

        # Load audio with librosa
        y, sr = librosa.load(audio_path, sr=None)

        # Calculate the current duration and speed factor
        current_duration = librosa.get_duration(y=y, sr=sr)
        speed_factor = current_duration / target_duration
        print(f"[DEBUG] Current duration: {current_duration:.2f}s")
        print(f"[DEBUG] Calculated speed factor: {speed_factor:.3f}")

        # If the difference is minimal, return the original path
        if abs(speed_factor - 1) < 0.05:
            print(f"[DEBUG] Speed factor {speed_factor:.3f} is within the 5% threshold, skipping adjustment")
            return audio_path

        # Dynamic speed-factor limits based on audio duration:
        # allow more aggressive speed-up for short audio
        if current_duration < 10.0:  # Short audio under 10 seconds
            max_speed = 3.0  # More aggressive for short segments
        else:
            max_speed = 2.7  # Standard limit for longer audio
        min_speed = 0.5  # Allow more slowdown when needed

        # Check whether an extreme speed change is needed
        extreme_adjustment = speed_factor > max_speed

        # Clamp the speed factor to a reasonable range
        original_speed_factor = speed_factor
        speed_factor = min(max(speed_factor, min_speed), max_speed)
        if original_speed_factor != speed_factor:
            print(f"[DEBUG] Speed factor clamped from {original_speed_factor:.3f} to {speed_factor:.3f}")
        if extreme_adjustment:
            print("[DEBUG] Extreme adjustment needed - will apply max speed and then trim")

        # Track processing time
        start_time = time.time()

        # Apply direct time stretching to the entire audio in a single pass
        print(f"[DEBUG] Applying single-pass time stretching with factor {speed_factor:.3f}")
        stretched_audio = librosa.effects.time_stretch(y=y, rate=speed_factor)

        # Calculate the expected new duration
        expected_duration = len(stretched_audio) / sr

        # Save to a temporary file (close the handle first so soundfile owns the path)
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        temp_file.close()
        sf.write(temp_file.name, stretched_audio, sr)

        # Calculate processing time
        process_time = time.time() - start_time

        # Verify the actual duration after processing
        y_check, sr_check = librosa.load(temp_file.name, sr=None)
        actual_duration = librosa.get_duration(y=y_check, sr=sr_check)
        method = "direct"

        # For extreme cases, perform additional trimming
        if extreme_adjustment and actual_duration > target_duration:
            print("[DEBUG] Performing additional trim for extreme case")
            # Calculate how many samples to keep
            samples_to_keep = int(target_duration * sr_check)
            # Use a short fade-out to avoid clicks (100 ms or less)
            fade_samples = min(int(0.1 * sr_check), samples_to_keep // 4)
            # Keep only the needed samples
            trimmed_audio = y_check[:samples_to_keep]
            # Apply the fade-out
            if fade_samples > 0:
                fade_env = np.linspace(1.0, 0.0, fade_samples)
                trimmed_audio[-fade_samples:] *= fade_env
            # Save the trimmed version
            sf.write(temp_file.name, trimmed_audio, sr_check)
            # Update the actual duration
            actual_duration = librosa.get_duration(y=trimmed_audio, sr=sr_check)
            method += "+trim"

        print(f"[DEBUG] Method used: {method}")
        print(f"[DEBUG] Processing completed in {process_time:.2f} seconds")
        print(f"[DEBUG] Expected new duration: {expected_duration:.2f}s")
        print(f"[DEBUG] Actual new duration: {actual_duration:.2f}s")
        print(f"[DEBUG] Target was: {target_duration:.2f}s")
        print(f"[DEBUG] Difference from target: {abs(actual_duration - target_duration):.3f}s")
        print(f"[DEBUG] Output file: {temp_file.name}")

        return temp_file.name
    except Exception as e:
        import traceback
        print(f"[DEBUG ERROR] Audio speed adjustment failed: {e}")
        print(traceback.format_exc())
        logger.warning(f"Audio speed adjustment failed: {e}")
        return audio_path

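# Note on librosa's rate semantics (illustrative sanity check, not part of the
# pipeline): rate = current / target, so rate > 1 shortens the audio. Stretching
# a 2 s sine at rate 2.0 should yield roughly 1 s of audio.
#
#   sr_demo = 22050
#   y_demo = np.sin(2 * np.pi * 440 * np.arange(2 * sr_demo) / sr_demo)
#   y_fast = librosa.effects.time_stretch(y=y_demo, rate=2.0)
#   assert abs(len(y_fast) / sr_demo - 1.0) < 0.1
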
def create_segmented_edge_tts(text, pitch, voice, output_path, target_duration=None):
    """Create speech with a specific voice, pitch, and timing using Edge TTS."""
    # Create a temporary file
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
    temp_filename = temp_file.name  # Store the filename before closing
    temp_file.close()

    # Format the pitch argument with an explicit sign (e.g. "+50Hz" / "-50Hz")
    pitch_param = f"+{pitch}Hz" if pitch >= 0 else f"{pitch}Hz"
    command = [
        "edge-tts",
        f"--pitch={pitch_param}",
        "--voice", voice,
        "--text", text,
        "--write-media", temp_filename
    ]
    subprocess.run(command, check=True)

    # Load the generated audio
    audio = AudioSegment.from_file(temp_filename, format="mp3")

    # Time-constraint adjustment
    if target_duration is not None:
        current_duration = len(audio) / 1000  # ms to seconds
        if abs(current_duration - target_duration) > 0.1:  # 100 ms threshold
            speed_factor = current_duration / target_duration
            speed_factor = min(max(speed_factor, 0.7), 3)  # Keep within bounds
            logger.info(f"  Adjusting timing: {current_duration:.2f}s → {target_duration:.2f}s (factor: {speed_factor:.2f})")

            # Instead of post-processing the generated audio, regenerate with the
            # Edge TTS rate parameter, which preserves pitch and naturalness
            if speed_factor < 1:
                rate_adjustment = f"-{int((1 - speed_factor) * 100)}%"
            else:
                rate_adjustment = f"+{int((speed_factor - 1) * 100)}%"

            # Remove the previous temp file, then regenerate at the adjusted rate
            os.unlink(temp_filename)
            command = [
                "edge-tts",
                f"--pitch={pitch_param}",
                f"--rate={rate_adjustment}",
                "--voice", voice,
                "--text", text,
                "--write-media", temp_filename
            ]
            subprocess.run(command, check=True)

            # Reload the rate-adjusted audio
            audio = AudioSegment.from_file(temp_filename, format="mp3")

            # Fine-tune with silence padding / trimming if still off target
            new_duration = len(audio) / 1000
            if abs(new_duration - target_duration) > 0.1:
                audio = adjust_audio_duration(audio, target_duration)

    # Save the final audio
    audio.export(output_path, format="wav")

    # Clean up the temporary file
    os.unlink(temp_filename)

    # Log the final duration
    final_audio = AudioSegment.from_file(output_path)
    final_duration = len(final_audio) / 1000
    target_str = f"{target_duration:.2f}s" if target_duration is not None else "none"
    logger.info(f"  Final duration: {final_duration:.2f}s (target: {target_str})")
    return output_path

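# The underlying CLI invocation assembled above looks like this (illustrative
# values; only the flags already used in the command list are shown):
#
#   edge-tts --pitch=+50Hz --rate=+20% --voice hi-IN-MadhurNeural \
#       --text "..." --write-media /tmp/xxxx.mp3
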
def create_segmented_xtts(text, reference_audio, language, output_path, target_duration=None):
    """Create voice-cloned speech using XTTS with the speaker's reference audio and duration control."""
    # Get the model (loaded on first call)
    tts_model = XTTSModelLoader.get_model()
    if tts_model is None:
        raise RuntimeError("XTTS model could not be loaded. Ensure TTS is installed.")

    # Verify that the reference audio exists
    if not os.path.exists(reference_audio):
        raise FileNotFoundError(f"Reference audio file not found: {reference_audio}")

    # Generate speech into a temporary file
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
    temp_filename = temp_file.name
    temp_file.close()

    logger.info(f"Generating XTTS speech using reference: {os.path.basename(reference_audio)}")

    # Step 1: Try to optimize the generation parameters based on text length and
    # target duration. Short text may need special handling to avoid excessive padding.
    is_short_text = len(text.strip()) < 10

    # XTTS generation options
    generation_kwargs = {}

    # Note: these are example parameters - actual support depends on the XTTS version
    if is_short_text and target_duration is not None and target_duration < 2.0:
        logger.info("  Short text detected, attempting to minimize padding")
        # These parameters may or may not be supported by the TTS model being used
        generation_kwargs = {
            'enable_text_splitting': False,  # Avoid splitting short text
            'no_silence_end': True,          # Reduce trailing silence
        }
        # Some models may support a 'speed' parameter
        if hasattr(tts_model, 'tts_with_speed'):
            generation_kwargs['speed'] = 1.2  # Slightly faster for short text

    try:
        # Try generating with the optional parameters if supported
        if generation_kwargs:
            try:
                tts_model.tts_to_file(
                    text=text,
                    speaker_wav=reference_audio,
                    language=language,
                    file_path=temp_filename,
                    **generation_kwargs
                )
            except (TypeError, ValueError):
                # If the parameters aren't supported, fall back to the standard call
                logger.info("  Advanced parameters not supported, using standard generation")
                tts_model.tts_to_file(
                    text=text,
                    speaker_wav=reference_audio,
                    language=language,
                    file_path=temp_filename
                )
        else:
            # Standard generation
            tts_model.tts_to_file(
                text=text,
                speaker_wav=reference_audio,
                language=language,
                file_path=temp_filename
            )

        # Load the generated audio
        audio = AudioSegment.from_file(temp_filename)

        # Step 2: Apply duration adjustment if needed
        if target_duration is not None:
            current_duration = len(audio) / 1000  # ms to seconds
            if abs(current_duration - target_duration) > 0.1:  # 100 ms threshold
                # Speed factor is the inverse of the duration ratio
                speed_factor = current_duration / target_duration
                speed_factor = min(max(speed_factor, 0.7), 3)  # Allow a wide range for better adjustment
                logger.info(f"  Adjusting timing: {current_duration:.2f}s → {target_duration:.2f}s (speed factor: {speed_factor:.2f})")
                try:
                    # Attempt a smooth speed change (XTTS cannot regenerate at a given rate)
                    logger.info("  Applying smooth speed adjustment...")
                    adjusted_path = smooth_speed_change(temp_filename, target_duration)
                    if adjusted_path != temp_filename:  # A different path means an adjustment was made
                        # Load the adjusted audio
                        audio = AudioSegment.from_file(adjusted_path)
                        # Check whether the adjustment was successful
                        new_duration = len(audio) / 1000
                        if abs(new_duration - target_duration) <= 0.15:  # 150 ms tolerance
                            logger.info(f"  Smooth adjustment successful: {new_duration:.2f}s")
                            # Clean up the original file and use the adjusted one
                            os.unlink(temp_filename)
                            temp_filename = adjusted_path
                        else:
                            # Clean up the adjusted file; fall through to the final duration adjustment
                            logger.info(f"  Smooth adjustment not precise enough ({new_duration:.2f}s), will fine-tune with duration adjustment")
                            os.unlink(adjusted_path)
                except Exception as e:
                    logger.warning(f"  Smooth speed adjustment failed: {str(e)}")
                    # Fall through to the final duration adjustment step

                # Always perform a final duration adjustment to ensure exact timing
                new_duration = len(audio) / 1000
                if abs(new_duration - target_duration) > 0.1:
                    logger.info(f"  Fine-tuning with duration adjustment: {new_duration:.2f}s → {target_duration:.2f}s")
                    audio = adjust_audio_duration(audio, target_duration)

        # Save the final audio
        audio.export(output_path, format="wav")

        # Clean up
        os.unlink(temp_filename)

        # Log the final duration
        final_audio = AudioSegment.from_file(output_path)
        final_duration = len(final_audio) / 1000
        target_str = f"{target_duration:.2f}s" if target_duration is not None else "none"
        logger.info(f"  Final duration: {final_duration:.2f}s (target: {target_str})")
        return output_path
    except Exception as e:
        logger.error(f"XTTS generation failed: {e}")
        if os.path.exists(temp_filename):
            os.unlink(temp_filename)
        raise

def process_voice_config(voice_config):
    """
    Process a voice configuration supporting both Edge TTS and XTTS.

    Args:
        voice_config: Dict keyed by speaker_id. Values:
            For Edge TTS: {'engine': 'edge_tts', 'gender': 'male'/'female'} or simply 'male'/'female'
            For XTTS: {'engine': 'xtts', 'reference_audio': '/path/to/audio.wav', 'language': 'hi'}

    Returns:
        Processed configuration dictionary
    """
    processed_config = {}

    # Handle an empty config
    if not voice_config:
        return {0: {'engine': 'edge_tts', 'voice': "hi-IN-MadhurNeural", 'pitch': 0}}

    # Track Edge TTS speaker counts for pitch variations
    edge_male_count = 0
    edge_female_count = 0

    # Pitch variations for multiple Edge TTS speakers of the same gender
    male_pitches = [0, -50, 50]    # Default, deeper, higher
    female_pitches = [0, 45, -45]  # Default, higher, deeper

    for speaker_id, config in voice_config.items():
        # Convert string speaker_ids to int if needed
        if isinstance(speaker_id, str) and speaker_id.isdigit():
            speaker_id = int(speaker_id)

        # Determine which engine to use (default is edge_tts)
        if isinstance(config, dict):
            engine = config.get('engine', 'edge_tts')
        else:
            # Handle plain gender strings for backwards compatibility
            engine = 'edge_tts'
            config = {'gender': config} if config in ['male', 'female'] else {'gender': 'male'}

        if engine == 'xtts':
            # XTTS configuration - each speaker needs their own reference audio
            if 'reference_audio' not in config:
                # Fall back to Edge TTS if no reference audio was provided
                logger.warning(f"No reference audio provided for XTTS speaker {speaker_id}, falling back to Edge TTS")
                engine = 'edge_tts'
            else:
                # Valid XTTS configuration
                processed_config[speaker_id] = {
                    'engine': 'xtts',
                    'reference_audio': config['reference_audio'],
                    'language': config.get('language', 'hi')  # Default to Hindi
                }
                continue  # Skip the Edge TTS processing below

        # Edge TTS configuration (engine is edge_tts, or this is an XTTS fallback)
        gender = config.get('gender', 'male')
        if gender == 'male':
            # Assign a male voice and pitch
            pitch = male_pitches[edge_male_count % len(male_pitches)]
            processed_config[speaker_id] = {
                'engine': 'edge_tts',
                'voice': "hi-IN-MadhurNeural",
                'pitch': pitch
            }
            edge_male_count += 1
        else:
            # Assign a female voice and pitch
            pitch = female_pitches[edge_female_count % len(female_pitches)]
            processed_config[speaker_id] = {
                'engine': 'edge_tts',
                'voice': "hi-IN-SwaraNeural",
                'pitch': pitch
            }
            edge_female_count += 1

    return processed_config

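# Illustrative example (hypothetical reference path): a two-speaker config that
# mixes engines, and the mapping it produces.
#
#   process_voice_config({
#       0: {'engine': 'xtts', 'reference_audio': 'reference_audio/speaker0.wav'},
#       1: 'female',
#   })
#   # -> {0: {'engine': 'xtts', 'reference_audio': 'reference_audio/speaker0.wav',
#   #         'language': 'hi'},
#   #     1: {'engine': 'edge_tts', 'voice': 'hi-IN-SwaraNeural', 'pitch': 0}}
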
def generate_tts(segments, target_language, voice_config=None, output_dir="audio2"):
    """
    Generate speech for all segments using the appropriate TTS engine per speaker.

    Args:
        segments: List of segments with text, speaker, start and end times
        target_language: Language code for TTS
        voice_config: Dictionary with speaker configurations
            - For Edge TTS: {'gender': 'male'/'female'} or just 'male'/'female'
            - For XTTS: {'engine': 'xtts', 'reference_audio': '/path/to/audio.wav'}
        output_dir: Directory for the final combined audio

    Returns:
        Path to the final combined audio file
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, "dubbed_conversation.wav")

    # Create silent audio spanning the full conversation
    max_end_time = max(segment['end'] for segment in segments)
    combined = AudioSegment.silent(duration=int(max_end_time * 1000) + 100)

    ensure_directories()
    audio_files = []

    # Process the voice configuration
    processed_config = process_voice_config(voice_config or {})
    logger.info(f"Processed voice config: {processed_config}")

    # Process each segment
    for i, segment in enumerate(segments):
        # Extract the speaker ID (e.g. "SPEAKER_01" -> 1)
        speaker = segment.get('speaker', 'SPEAKER_00')
        match = re.search(r'SPEAKER_(\d+)', speaker)
        speaker_id = int(match.group(1)) if match else 0

        # Get the speaker configuration
        speaker_config = processed_config.get(
            speaker_id,
            {'engine': 'edge_tts', 'voice': "hi-IN-SwaraNeural", 'pitch': 0}
        )

        # Get text and timing information
        text = segment['text']
        start = segment['start']
        end = segment['end']
        duration = end - start

        # Per-segment output filename, keyed by start time
        output_file = f"audio/{start}.wav"

        logger.info(f"Processing segment {i+1} (Speaker {speaker_id}, Engine: {speaker_config['engine']}):")
        logger.info(f"  Text: {text[:50]}{'...' if len(text) > 50 else ''}")
        logger.info(f"  Duration: {duration:.2f}s")

        # Choose the appropriate TTS engine
        if speaker_config['engine'] == 'xtts':
            # XTTS generation with the speaker's reference audio
            try:
                create_segmented_xtts(
                    text=text,
                    reference_audio=speaker_config['reference_audio'],
                    language=speaker_config.get('language', target_language),
                    output_path=output_file,
                    target_duration=duration,
                )
            except Exception as e:
                logger.error(f"Error using XTTS for speaker {speaker_id}: {e}")
                logger.warning("Falling back to Edge TTS for this segment")
                # Fall back to Edge TTS
                create_segmented_edge_tts(
                    text=text,
                    pitch=0,
                    voice="hi-IN-SwaraNeural",
                    output_path=output_file,
                    target_duration=duration,
                )
        else:
            # Edge TTS generation
            create_segmented_edge_tts(
                text=text,
                pitch=speaker_config.get('pitch', 0),
                voice=speaker_config.get('voice', "hi-IN-SwaraNeural"),
                output_path=output_file,
                target_duration=duration,
            )

        audio_files.append(output_file)

        # Overlay the segment onto the combined audio at its exact timestamp
        segment_audio = AudioSegment.from_file(output_file)
        position_ms = int(segment['start'] * 1000)
        combined = combined.overlay(segment_audio, position=position_ms)

    # Export the final combined audio
    combined.export(output_path, format="wav")
    logger.info(f"  Final combined duration: {len(combined) / 1000:.2f}s")

    # Clean up the per-segment files
    for file in audio_files:
        try:
            os.remove(file)
        except OSError:
            pass

    # Verify the final duration
    final_audio = AudioSegment.from_file(output_path)
    final_duration_sec = len(final_audio) / 1000
    print(f"\nTarget duration: {max_end_time:.2f} seconds")
    print(f"Actual duration: {final_duration_sec:.2f} seconds")

    # If the final audio is still too long, trim it (allow a 100 ms grace)
    if final_duration_sec > max_end_time + 0.1:
        trimmed = final_audio[:int(max_end_time * 1000)]
        trimmed.export(output_path, format="wav")
        print(f"Trimmed to exactly {max_end_time:.2f} seconds")

    return output_path

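# Minimal usage sketch (illustrative): the segment dicts and the reference-audio
# path below are made-up examples, not files shipped with this Space. Speaker 0
# is cloned with XTTS from a reference clip; speaker 1 uses an Edge TTS voice.
if __name__ == "__main__":
    demo_segments = [
        {'text': 'नमस्ते, आप कैसे हैं?', 'speaker': 'SPEAKER_00', 'start': 0.0, 'end': 2.5},   # "Hello, how are you?"
        {'text': 'मैं ठीक हूँ, धन्यवाद।', 'speaker': 'SPEAKER_01', 'start': 3.0, 'end': 5.0},  # "I'm fine, thank you."
    ]
    demo_voice_config = {
        0: {'engine': 'xtts', 'reference_audio': 'reference_audio/speaker0.wav'},  # hypothetical path
        1: 'female',  # shorthand for an Edge TTS female voice
    }
    final_path = generate_tts(demo_segments, target_language='hi', voice_config=demo_voice_config)
    print(f"Dubbed audio written to {final_path}")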