# SyncDub / text_to_speech.py
# Uploaded to Hugging Face by pranavinani ("Upload folder using huggingface_hub"),
# commit e50cbd9 (verified).
import numpy as np
import os
import re
import tempfile
import logging
import torch
from pydub import AudioSegment
from pathlib import Path
import subprocess
import librosa
import soundfile as sf
# Set up basic logging: INFO level, each record prefixed with timestamp and level name
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by the functions defined below
logger = logging.getLogger(__name__)
# Working directories used by this module (paths are relative to the CWD)
def ensure_directories():
    """Create the working directories, leaving existing ones untouched."""
    for folder in ("audio", "audio2", "reference_audio"):
        os.makedirs(folder, exist_ok=True)


# Run once at import time so later code can assume the folders exist
ensure_directories()
# Install fallback speed helpers on pydub when it does not provide them
def setup_audio_effects():
    """Add ``speedup`` / ``time_stretch`` fallbacks to pydub and return its effects module.

    ``AudioSegment.speedup`` and ``pydub.effects.time_stretch`` are only
    assigned when no attribute of that name exists yet. Both fallbacks
    reinterpret the raw samples at a scaled frame rate and then convert the
    result back to the segment's original frame rate.

    Returns:
        The ``pydub.effects`` module.
    """
    from pydub import effects as pydub_effects

    def _reinterpret_at_rate(segment, frame_rate):
        # Same raw bytes, different declared frame rate, then resample back
        respawned = segment._spawn(segment.raw_data, overrides={'frame_rate': frame_rate})
        return respawned.set_frame_rate(segment.frame_rate)

    if not hasattr(AudioSegment, "speedup"):
        def speedup(audio_segment, playback_speed=1.5):
            # Non-positive or unit speed means "leave the audio alone"
            if playback_speed <= 0 or playback_speed == 1.0:
                return audio_segment
            scaled_rate = int(audio_segment.frame_rate * playback_speed)
            return _reinterpret_at_rate(audio_segment, scaled_rate)

        AudioSegment.speedup = speedup

    if not hasattr(pydub_effects, "time_stretch"):
        def time_stretch(audio_segment, stretch_factor):
            # Non-positive or unit factor means "leave the audio alone"
            if stretch_factor <= 0 or stretch_factor == 1.0:
                return audio_segment
            scaled_rate = int(audio_segment.frame_rate / stretch_factor)
            return _reinterpret_at_rate(audio_segment, scaled_rate)

        pydub_effects.time_stretch = time_stretch

    return pydub_effects


# Patched effects module, resolved once at import time
effects = setup_audio_effects()
def adjust_audio_duration(audio_segment, target_duration):
    """Adjust audio to target duration by adding silence or trimming.

    Args:
        audio_segment: Audio whose ``len()`` is its length in milliseconds
            and which supports slicing and ``+`` (a pydub ``AudioSegment``).
        target_duration: Desired length in seconds. Negative values are
            treated as 0 instead of being used as a negative slice index.

    Returns:
        The audio padded with trailing silence when it is shorter than the
        target, otherwise the audio cut to the target length.
    """
    # Work in whole milliseconds so padding and trimming land on the same
    # length; a float difference in seconds can truncate to 1 ms short.
    target_ms = max(0, int(target_duration * 1000))
    current_ms = len(audio_segment)

    if current_ms < target_ms:
        padding = AudioSegment.silent(duration=target_ms - current_ms)
        return audio_segment + padding
    return audio_segment[:target_ms]
# Lazily-loaded XTTS model, cached on the class
class XTTSModelLoader:
    """Hold one XTTS model on the class and load it on first request."""

    _instance = None
    model = None

    @classmethod
    def get_model(cls):
        """Return the cached XTTS model, loading it first if necessary.

        Returns:
            The loaded model, or ``None`` when importing or loading it raised
            (the error is logged rather than propagated).
        """
        # Fast path: a previous call already loaded the model
        if cls.model is not None:
            return cls.model

        try:
            from TTS.api import TTS

            # Prefer the GPU when torch reports one
            device = "cpu"
            if torch.cuda.is_available():
                device = "cuda"
            logger.info(f"Loading XTTS model on {device}...")

            loaded = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
            cls.model = loaded
            logger.info("XTTS model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading XTTS model: {e}")
            return None

        return cls.model
def smooth_speed_change(audio_path, target_duration):
    """
    Adjust audio speed with instantaneous time stretching to match target duration

    The whole clip is stretched in one ``librosa.effects.time_stretch`` call.
    The speed factor is clamped to [0.5, 3.0] for clips shorter than 10 seconds
    and to [0.5, 2.7] otherwise. When the required speed-up exceeds the upper
    limit, the stretched audio is additionally trimmed to the target length
    with a short fade-out.

    Args:
        audio_path: Path to audio file to adjust
        target_duration: Target duration in seconds

    Returns:
        Path to adjusted audio file (temporary file). ``audio_path`` itself is
        returned when the target is missing or not positive, when the required
        change is under 5%, or when processing fails.
    """
    # A missing or non-positive target cannot be matched; without this guard
    # a negative target would be clamped to the minimum speed and slow the
    # audio down.
    if target_duration is None or target_duration <= 0:
        print(f"[DEBUG] Invalid target duration {target_duration!r}, skipping speed adjustment")
        return audio_path

    temp_path = None  # set once the output file exists, so failures can remove it
    try:
        import time

        # Debug start
        print(f"\n[DEBUG] Starting audio speed adjustment:")
        print(f"[DEBUG] Input file: {audio_path}")
        print(f"[DEBUG] Target duration: {target_duration:.2f}s")

        # Load audio with librosa at its native sample rate
        y, sr = librosa.load(audio_path, sr=None)

        # Calculate current duration and speed factor
        current_duration = librosa.get_duration(y=y, sr=sr)
        speed_factor = current_duration / target_duration
        print(f"[DEBUG] Current duration: {current_duration:.2f}s")
        print(f"[DEBUG] Calculated speed factor: {speed_factor:.3f}")

        # If the difference is minimal, return original path
        if abs(speed_factor - 1) < 0.05:
            print(f"[DEBUG] Speed factor {speed_factor:.3f} is within 5% threshold, skipping adjustment")
            return audio_path

        # Allow more aggressive speed factors for short audio
        max_speed = 3.0 if current_duration < 10.0 else 2.7
        min_speed = 0.5

        # Beyond max_speed the stretch alone cannot reach the target
        extreme_adjustment = speed_factor > max_speed

        # Limit speed factor to reasonable range
        original_speed_factor = speed_factor
        speed_factor = min(max(speed_factor, min_speed), max_speed)
        if original_speed_factor != speed_factor:
            print(f"[DEBUG] Speed factor clamped from {original_speed_factor:.3f} to {speed_factor:.3f}")
        if extreme_adjustment:
            print(f"[DEBUG] Extreme adjustment needed - will apply max speed and then trim")

        # Track processing time
        start_time = time.time()

        # Apply direct time stretching to the entire audio at once
        print(f"[DEBUG] Applying instantaneous time stretching with factor {speed_factor:.3f}")
        stretched_audio = librosa.effects.time_stretch(y=y, rate=speed_factor)
        expected_duration = len(stretched_audio) / sr

        # Reserve a temp path and close the handle before writing, so no file
        # descriptor is leaked and the path can be reopened by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
            temp_path = temp_file.name
        sf.write(temp_path, stretched_audio, sr)

        process_time = time.time() - start_time

        # Verify the actual duration after processing
        y_check, sr_check = librosa.load(temp_path, sr=None)
        actual_duration = librosa.get_duration(y=y_check, sr=sr_check)
        method = "direct"

        # For extreme cases, perform additional trimming
        if extreme_adjustment and actual_duration > target_duration:
            print(f"[DEBUG] Performing additional trim for extreme case")
            samples_to_keep = int(target_duration * sr_check)
            # 100ms fade or a quarter of the clip, whichever is shorter
            fade_samples = min(int(0.1 * sr_check), samples_to_keep // 4)
            trimmed_audio = y_check[:samples_to_keep]
            # Apply fade out to avoid clicks
            if fade_samples > 0:
                fade_env = np.linspace(1.0, 0.0, fade_samples)
                trimmed_audio[-fade_samples:] *= fade_env
            sf.write(temp_path, trimmed_audio, sr_check)
            actual_duration = librosa.get_duration(y=trimmed_audio, sr=sr_check)
            method += "+trim"

        print(f"[DEBUG] Method used: {method}")
        print(f"[DEBUG] Processing completed in {process_time:.2f} seconds")
        print(f"[DEBUG] Expected new duration: {expected_duration:.2f}s")
        print(f"[DEBUG] Actual new duration: {actual_duration:.2f}s")
        print(f"[DEBUG] Target was: {target_duration:.2f}s")
        print(f"[DEBUG] Difference from target: {abs(actual_duration - target_duration):.3f}s")
        print(f"[DEBUG] Output file: {temp_path}")
        return temp_path
    except Exception as e:
        import traceback
        print(f"[DEBUG ERROR] Audio speed adjustment failed: {e}")
        print(traceback.format_exc())
        logger.warning(f"Audio speed adjustment failed: {e}")
        # The caller receives the original path, so a partial output file
        # would never be cleaned up by anyone else.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.debug(f"Could not remove temporary file {temp_path}: {cleanup_error}")
        return audio_path
def create_segmented_edge_tts(text, pitch, voice, output_path, target_duration=None):
    """Create speech with specific characteristics and timing using Edge TTS.

    The text is synthesized with the ``edge-tts`` command line tool. When a
    target duration is given and the result is more than 100 ms off, the text
    is synthesized again with a ``--rate`` adjustment, and any remaining
    difference is removed by padding with silence or trimming.

    Args:
        text: Text to speak.
        pitch: Pitch offset in Hz (signed number).
        voice: Edge TTS voice name.
        output_path: Destination path; the audio is exported as WAV.
        target_duration: Desired length in seconds. ``None`` or a
            non-positive value disables timing adjustment.

    Returns:
        ``output_path``.

    Raises:
        subprocess.CalledProcessError: If ``edge-tts`` exits with an error.
    """
    # A zero or negative target would divide by zero below; treat it like
    # "no target", which is also how the final log line reports it.
    has_target = target_duration is not None and target_duration > 0

    # Reserve a temp path; the handle is closed so edge-tts can write to it
    with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
        temp_filename = temp_file.name

    # Edge TTS expects an explicit sign on the pitch value
    pitch_param = f"+{pitch}Hz" if pitch >= 0 else f"{pitch}Hz"

    def synthesize(rate=None):
        """Run edge-tts into the temp file and return the loaded audio."""
        command = ["edge-tts", f"--pitch={pitch_param}"]
        if rate is not None:
            command.append(f"--rate={rate}")
        command += [
            "--voice", voice,
            # "--text=<value>" keeps text that starts with "-" from being
            # parsed as a command-line option.
            f"--text={text}",
            "--write-media", temp_filename,
        ]
        subprocess.run(command, check=True)
        return AudioSegment.from_file(temp_filename, format="mp3")

    try:
        audio = synthesize()

        # Time constraint adjustment
        if has_target:
            current_duration = len(audio) / 1000  # ms to seconds
            if abs(current_duration - target_duration) > 0.1:  # 100ms threshold
                speed_factor = current_duration / target_duration
                speed_factor = min(max(speed_factor, 0.7), 3)  # Keep within bounds
                logger.info(f" Adjusting timing: {current_duration:.2f}s → {target_duration:.2f}s (factor: {speed_factor:.2f})")

                # Use the Edge TTS rate parameter instead of stretching afterwards
                if speed_factor < 1:
                    rate_adjustment = f"-{int((1 - speed_factor) * 100)}%"
                else:
                    rate_adjustment = f"+{int((speed_factor - 1) * 100)}%"

                # Regenerate with adjusted rate, starting from a clean file
                if os.path.exists(temp_filename):
                    os.unlink(temp_filename)
                audio = synthesize(rate_adjustment)

            # Fine-tune if needed
            new_duration = len(audio) / 1000
            if abs(new_duration - target_duration) > 0.1:
                audio = adjust_audio_duration(audio, target_duration)

        # Save the modified audio
        audio.export(output_path, format="wav")
    finally:
        # Remove the temp file even when edge-tts or decoding failed
        if os.path.exists(temp_filename):
            os.unlink(temp_filename)

    # Log final duration as read back from the exported file
    final_audio = AudioSegment.from_file(output_path)
    final_duration = len(final_audio) / 1000
    logger.info(f" Final duration: {final_duration:.2f}s (target: {target_duration if target_duration else 'None'}s)")
    return output_path
def create_segmented_xtts(text, reference_audio, language, output_path, target_duration=None):
    """Create voice-cloned speech using XTTS with reference audio and duration control.

    The speech is generated into a temporary WAV file. When a target duration
    is given and the result is more than 100 ms off, ``smooth_speed_change``
    is tried first; any remaining difference above 100 ms is removed by
    padding with silence or trimming.

    Args:
        text: Text to speak.
        reference_audio: Path to the speaker's reference audio file.
        language: Language code passed to the XTTS model.
        output_path: Destination path; the audio is exported as WAV.
        target_duration: Desired length in seconds. ``None`` or a
            non-positive value disables timing adjustment.

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If the XTTS model could not be loaded.
        FileNotFoundError: If ``reference_audio`` does not exist.
        Exception: Any error raised during generation or export is logged and
            re-raised.
    """
    # Get the model (will be loaded on first call)
    tts_model = XTTSModelLoader.get_model()
    if tts_model is None:
        raise RuntimeError("XTTS model could not be loaded. Ensure TTS is installed.")

    # Verify reference audio exists
    if not os.path.exists(reference_audio):
        raise FileNotFoundError(f"Reference audio file not found: {reference_audio}")

    # A zero or negative target would divide by zero below; treat it like
    # "no target", which is also how the final log line reports it.
    has_target = target_duration is not None and target_duration > 0

    # Reserve a temp path for the raw generation output
    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
        temp_filename = temp_file.name
    # Every temp file created here; all are removed in the finally block
    temp_paths = {temp_filename}

    logger.info(f"Generating XTTS speech using reference: {os.path.basename(reference_audio)}")

    # Short text might need special handling to avoid excessive padding
    is_short_text = len(text.strip()) < 10

    # Optional generation parameters; support depends on the XTTS version
    generation_kwargs = {}
    if is_short_text and has_target and target_duration < 2.0:
        logger.info(f" Short text detected, attempting to minimize padding")
        generation_kwargs = {
            'enable_text_splitting': False,  # Avoid splitting short text
            'no_silence_end': True,  # Reduce trailing silence
        }
        # Some models may support 'speed' parameter
        if hasattr(tts_model, 'tts_with_speed'):
            generation_kwargs['speed'] = 1.2  # Slightly faster for short text

    base_kwargs = {
        'text': text,
        'speaker_wav': reference_audio,
        'language': language,
        'file_path': temp_filename,
    }

    try:
        if generation_kwargs:
            try:
                tts_model.tts_to_file(**base_kwargs, **generation_kwargs)
            except (TypeError, ValueError):
                # If parameters aren't supported, fall back to standard call
                logger.info(" Advanced parameters not supported, using standard generation")
                tts_model.tts_to_file(**base_kwargs)
        else:
            # Standard generation
            tts_model.tts_to_file(**base_kwargs)

        # Load generated audio
        audio = AudioSegment.from_file(temp_filename)

        # Apply duration adjustment if needed
        if has_target:
            current_duration = len(audio) / 1000  # ms to seconds
            if abs(current_duration - target_duration) > 0.1:  # 100ms threshold
                # Only used for the log line; smooth_speed_change computes its own factor
                speed_factor = current_duration / target_duration
                speed_factor = min(max(speed_factor, 0.7), 3)
                logger.info(f" Adjusting timing: {current_duration:.2f}s → {target_duration:.2f}s (speed factor: {speed_factor:.2f})")
                try:
                    logger.info(" Applying smooth speed adjustment...")
                    adjusted_path = smooth_speed_change(temp_filename, target_duration)
                    if adjusted_path != temp_filename:  # A different path means an adjustment was made
                        temp_paths.add(adjusted_path)
                        audio = AudioSegment.from_file(adjusted_path)
                        new_duration = len(audio) / 1000
                        if abs(new_duration - target_duration) <= 0.15:  # 150ms tolerance
                            logger.info(f" Smooth adjustment successful: {new_duration:.2f}s")
                        else:
                            logger.info(f" Smooth adjustment not precise enough ({new_duration:.2f}s), will fine-tune with duration adjustment")
                except Exception as e:
                    # Best effort: the pad/trim step below still enforces the target
                    logger.warning(f" Smooth speed adjustment failed: {str(e)}")

            # Always perform final duration adjustment to ensure exact timing
            new_duration = len(audio) / 1000
            if abs(new_duration - target_duration) > 0.1:
                logger.info(f" Fine-tuning with duration adjustment: {new_duration:.2f}s → {target_duration:.2f}s")
                audio = adjust_audio_duration(audio, target_duration)

        # Save the final audio
        audio.export(output_path, format="wav")

        # Log final duration as read back from the exported file
        final_audio = AudioSegment.from_file(output_path)
        final_duration = len(final_audio) / 1000
        logger.info(f" Final duration: {final_duration:.2f}s (target: {target_duration if target_duration else 'None'}s)")
        return output_path
    except Exception as e:
        logger.error(f"XTTS generation failed: {e}")
        raise
    finally:
        # Remove every temp file on success and on failure alike
        for path in temp_paths:
            if os.path.exists(path):
                os.unlink(path)
def process_voice_config(voice_config):
    """
    Process voice configuration to support both Edge TTS and XTTS

    Gender values are matched case-insensitively and ignoring surrounding
    whitespace, so ``'Male'`` and ``'male'`` select the same voice whether
    given as a bare string or inside a dict.

    Args:
        voice_config: Dict with speaker_id keys and configuration values
            For Edge TTS: {'engine': 'edge_tts', 'gender': 'male'/'female'} or simply 'male'/'female'
            For XTTS: {'engine': 'xtts', 'reference_audio': '/path/to/audio.wav', 'language': 'hi'}

    Returns:
        Processed configuration dictionary keyed by speaker id (digit-only
        string ids are converted to int). An empty or missing config yields a
        single default male Edge TTS speaker under key 0.
    """
    # Handle empty config
    if not voice_config:
        return {0: {'engine': 'edge_tts', 'voice': "hi-IN-MadhurNeural", 'pitch': 0}}

    def normalize_gender(value):
        # Only strings can be normalized; anything else is passed through
        return value.strip().lower() if isinstance(value, str) else value

    # Pitch variations for multiple Edge TTS speakers of same gender
    male_pitches = [0, -50, 50]  # Default, deeper, higher
    female_pitches = [0, 45, -45]  # Default, higher, deeper

    # Track Edge TTS speaker counts for pitch variations
    edge_male_count = 0
    edge_female_count = 0

    processed_config = {}
    for speaker_id, config in voice_config.items():
        # Convert string speaker_id to int if needed
        if isinstance(speaker_id, str) and speaker_id.isdigit():
            speaker_id = int(speaker_id)

        # Determine which engine to use (default is edge_tts)
        if isinstance(config, dict):
            engine = config.get('engine', 'edge_tts')
        else:
            # Simple gender strings for backwards compatibility; anything
            # unrecognized falls back to male.
            engine = 'edge_tts'
            simple_gender = normalize_gender(config)
            if not (isinstance(simple_gender, str) and simple_gender in ('male', 'female')):
                simple_gender = 'male'
            config = {'gender': simple_gender}

        if engine == 'xtts':
            if 'reference_audio' in config:
                # Valid XTTS configuration - each speaker has their own reference audio
                processed_config[speaker_id] = {
                    'engine': 'xtts',
                    'reference_audio': config['reference_audio'],
                    'language': config.get('language', 'hi')  # Default to Hindi
                }
                continue  # Skip the Edge TTS processing below
            # No reference audio: fall through to the Edge TTS branch
            logger.warning(f"No reference audio provided for XTTS speaker {speaker_id}, falling back to Edge TTS")

        # Edge TTS configuration (engine is edge_tts, unknown, or XTTS fallback)
        gender = normalize_gender(config.get('gender', 'male'))
        if gender == 'male':
            pitch = male_pitches[edge_male_count % len(male_pitches)]
            processed_config[speaker_id] = {
                'engine': 'edge_tts',
                'voice': "hi-IN-MadhurNeural",
                'pitch': pitch
            }
            edge_male_count += 1
        else:
            # Any non-male gender value gets the female voice
            pitch = female_pitches[edge_female_count % len(female_pitches)]
            processed_config[speaker_id] = {
                'engine': 'edge_tts',
                'voice': "hi-IN-SwaraNeural",
                'pitch': pitch
            }
            edge_female_count += 1

    return processed_config
def generate_tts(segments, target_language, voice_config=None, output_dir="audio2"):
    """
    Generate speech for all segments using appropriate TTS engine per speaker

    Each segment is synthesized to a temporary WAV file and overlaid onto a
    silent track at the segment's start time. Segments with a non-positive
    duration or blank text are skipped with a warning.

    Args:
        segments: List of segments with text, speaker, start and end times
        target_language: Language code for TTS (used as the XTTS language when
            the speaker configuration does not specify one)
        voice_config: Dictionary with speaker configurations
            - For Edge TTS: {'gender': 'male'/'female'} or just 'male'/'female'
            - For XTTS: {'engine': 'xtts', 'reference_audio': '/path/to/audio.wav'}
        output_dir: Directory to save the final audio

    Returns:
        Path to the final combined audio file

    Raises:
        ValueError: If ``segments`` is empty.
    """
    # Fail early with a clear message instead of max() on an empty sequence
    if not segments:
        raise ValueError("generate_tts requires at least one segment")

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, "dubbed_conversation.wav")

    max_end_time = max(segment['end'] for segment in segments)
    # Silent base track covering the whole conversation plus 100 ms
    combined = AudioSegment.silent(duration=int(max_end_time * 1000) + 100)

    ensure_directories()
    audio_files = []

    # Process voice configuration
    processed_config = process_voice_config(voice_config or {})
    print(processed_config)

    # Used for speakers that have no entry in the processed configuration
    default_config = {'engine': 'edge_tts', 'voice': "hi-IN-SwaraNeural", 'pitch': 0}

    # Process each segment
    for i, segment in enumerate(segments):
        # Extract speaker ID from labels such as "SPEAKER_01"
        speaker = segment.get('speaker', 'SPEAKER_00')
        match = re.search(r'SPEAKER_(\d+)', speaker)
        speaker_id = int(match.group(1)) if match else 0

        speaker_config = processed_config.get(speaker_id, default_config)

        # Get text and timing information
        text = segment['text']
        start = segment['start']
        end = segment['end']
        duration = end - start

        # Nothing can be placed in an empty time slot and there is nothing to
        # speak for blank text; skip instead of failing inside the TTS helpers.
        if duration <= 0 or not text or not text.strip():
            logger.warning(f"Skipping segment {i+1}: empty text or non-positive duration ({duration:.2f}s)")
            continue

        # Create output filename
        output_file = f"audio/{start}.wav"

        logger.info(f"Processing segment {i+1} (Speaker {speaker_id}, Engine: {speaker_config['engine']}):")
        logger.info(f" Text: {text[:50]}{'...' if len(text) > 50 else ''}")
        logger.info(f" Duration: {duration:.2f}s")

        # Choose appropriate TTS engine
        if speaker_config['engine'] == 'xtts':
            # XTTS generation with speaker's reference audio
            try:
                create_segmented_xtts(
                    text=text,
                    reference_audio=speaker_config['reference_audio'],
                    language=speaker_config.get('language', target_language),
                    output_path=output_file,
                    target_duration=duration,
                )
            except Exception as e:
                logger.error(f"Error using XTTS for speaker {speaker_id}: {e}")
                logger.warning(f"Falling back to Edge TTS for this segment")
                # Fallback to Edge TTS
                create_segmented_edge_tts(
                    text=text,
                    pitch=0,
                    voice="hi-IN-SwaraNeural",
                    output_path=output_file,
                    target_duration=duration,
                )
        else:
            # Edge TTS generation
            create_segmented_edge_tts(
                text=text,
                pitch=speaker_config.get('pitch', 0),
                voice=speaker_config.get('voice', "hi-IN-SwaraNeural"),
                output_path=output_file,
                target_duration=duration,
            )

        audio_files.append(output_file)

        # Add segment to combined audio at the exact timestamp
        segment_audio = AudioSegment.from_file(output_file)
        position_ms = int(start * 1000)
        combined = combined.overlay(segment_audio, position=position_ms)

    # Export the final combined audio
    combined.export(output_path, format="wav")
    logger.info(f" Final combined duration: {len(combined) / 1000:.2f}s")

    # Clean up segment files (best effort: a leftover file is not fatal)
    for file in audio_files:
        try:
            os.remove(file)
        except OSError as cleanup_error:
            logger.debug(f"Could not remove segment file {file}: {cleanup_error}")

    # Verify the final duration
    final_audio = AudioSegment.from_file(output_path)
    final_duration_sec = len(final_audio) / 1000
    print(f"\nTarget duration: {max_end_time:.2f} seconds")
    print(f"Actual duration: {final_duration_sec:.2f} seconds")

    # If the final audio is still too long, trim it
    if final_duration_sec > max_end_time + 0.1:  # Allow 100ms grace
        trimmed = final_audio[:int(max_end_time * 1000)]
        trimmed.export(output_path, format="wav")
        print(f"Trimmed to exactly {max_end_time:.2f} seconds")

    return output_path