# SyncDub / demo.py
# Uploaded by pranavinani via huggingface_hub (commit 8150719, verified)
import os
import sys
import logging
from dotenv import load_dotenv
import re
# Pre-accept the Coqui TTS Terms of Service so XTTS model downloads do not
# block on an interactive prompt. Must be set before the TTS package loads.
os.environ['COQUI_TOS_AGREED'] = '1'
# Make sibling modules (media_ingestion, translate, ...) importable when this
# script is run directly rather than installed as a package.
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.append(current_dir)
# Import the required modules
from media_ingestion import MediaIngester
from speech_recognition import SpeechRecognizer
from speech_diarization import SpeakerDiarizer
from translate import translate_text, generate_srt_subtitles
from text_to_speech import generate_tts # Import both TTS functions
from audio_to_video import create_video_with_mixed_audio
def create_directories(dirs):
    """Ensure every path in *dirs* exists.

    Intermediate directories are created as needed; paths that already
    exist are left untouched (no error is raised).
    """
    for path in dirs:
        os.makedirs(path, exist_ok=True)
def main():
    """Run the end-to-end video dubbing pipeline interactively.

    Prompts the user for a media source, a target language, a TTS engine
    (Edge TTS or XTTS voice cloning) and per-speaker voice settings, then:
    extracts and cleans the audio, transcribes it, diarizes speakers,
    translates the segments, writes an SRT subtitle file, synthesizes the
    dubbed speech, and muxes it with the background audio into a new video.

    Requires HUGGINGFACE_TOKEN in the environment (or a .env file) for the
    diarization models; logs an error and returns early if it is missing.
    """
    # Load environment variables (.env in the working directory, if present)
    load_dotenv()
    # Configure logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)
    # Create the working directories used by the pipeline stages below
    create_directories(["temp", "audio", "audio2", "reference_audio"])
    # Get API tokens — diarization cannot run without a Hugging Face token
    hf_token = os.getenv("HUGGINGFACE_TOKEN")
    if not hf_token:
        logger.error("Error: HUGGINGFACE_TOKEN not found in .env file")
        return
    # Get input from user
    media_source = input("Enter video URL or local file path: ")
    target_language = input("Enter target language code (e.g., en, es, fr, de): ")
    # Choose TTS engine
    print("\nSelect TTS engine:")
    print("1. Simple dubbing (Edge TTS)")
    print("2. Voice cloning (XTTS)")
    tts_choice = input("Enter choice (1/2): ").strip()
    use_voice_cloning = tts_choice == "2"
    # Initialize pipeline components
    logger.info("Initializing pipeline components...")
    ingester = MediaIngester(output_dir="temp")
    recognizer = SpeechRecognizer(model_size="base")
    diarizer = SpeakerDiarizer(hf_token=hf_token)
    # Step 1: Process input and extract audio
    logger.info("Processing media source...")
    video_path = ingester.process_input(media_source)
    audio_path = ingester.extract_audio(video_path)
    # Separate vocals from background so music/effects survive the dub
    clean_audio_path, bg_audio_path = ingester.separate_audio_sources(audio_path)
    logger.info("Extracted audio: %s", audio_path)
    logger.info("Cleaned audio: %s", clean_audio_path)
    logger.info("Background audio: %s", bg_audio_path)
    logger.info("Audio processing completed.")
    # Step 2: Perform speech recognition on the cleaned (vocals-only) track
    logger.info("Transcribing audio...")
    segments = recognizer.transcribe(clean_audio_path)
    # Step 3: Perform speaker diarization
    logger.info("Identifying speakers...")
    # Optional user-supplied cap on speaker count; blank means auto-detect
    max_speakers_str = input("Maximum number of speakers to detect (leave blank for auto): ")
    max_speakers = int(max_speakers_str) if max_speakers_str.strip() else None
    speakers = diarizer.diarize(clean_audio_path, max_speakers=max_speakers)
    # Step 4: Assign speakers to transcription segments
    logger.info("Assigning speakers to segments...")
    final_segments = diarizer.assign_speakers_to_segments(segments, speakers)
    # Step 5: Translate the segments
    logger.info(f"Translating to {target_language}...")
    translated_segments = translate_text(
        final_segments,
        target_lang=target_language,
        translation_method="batch"  # Can be "batch" or "iterative" or "groq"
    )
    # Write an SRT alongside the dub, named <video>_<lang>.srt under temp/
    subtitle_file = f"temp/{os.path.basename(video_path).split('.')[0]}_{target_language}.srt"
    generate_srt_subtitles(translated_segments, output_file=subtitle_file)
    logger.info(f"Generated subtitle file: {subtitle_file}")
    # Step 6: Configure voice characteristics for speakers
    voice_config = {}  # Map of numeric speaker_id -> TTS engine settings
    # Collect the set of speaker labels actually present in the segments
    unique_speakers = set()
    for segment in translated_segments:
        if 'speaker' in segment:
            unique_speakers.add(segment['speaker'])
    logger.info(f"Detected {len(unique_speakers)} speakers")
    if use_voice_cloning:
        # Extract per-speaker reference audio clips for XTTS voice cloning
        logger.info("Extracting speaker reference audio for voice cloning...")
        reference_files = diarizer.extract_speaker_references(
            clean_audio_path,
            speakers,
            output_dir="reference_audio"
        )
        # Create voice config for XTTS; labels look like "SPEAKER_00"
        for speaker in sorted(list(unique_speakers)):
            match = re.search(r'SPEAKER_(\d+)', speaker)
            if match:
                speaker_id = int(match.group(1))
                if speaker in reference_files:
                    voice_config[speaker_id] = {
                        'engine': 'xtts',
                        'reference_audio': reference_files[speaker],
                        'language': target_language
                    }
                    logger.info(f"Using voice cloning for Speaker {speaker_id+1} with reference file: {os.path.basename(reference_files[speaker])}")
                else:
                    # Fall back to Edge TTS when no reference clip was extracted
                    logger.warning(f"No reference audio found for Speaker {speaker_id+1}, falling back to Edge TTS")
                    gender = input(f"Select voice gender for Speaker {speaker_id+1} (m/f): ").lower()
                    voice_config[speaker_id] = {
                        'engine': 'edge_tts',
                        'gender': "female" if gender.startswith("f") else "male"
                    }
    else:
        # Standard Edge TTS configuration: ask the user for a gender per speaker
        if len(unique_speakers) > 0:
            for speaker in sorted(list(unique_speakers)):
                match = re.search(r'SPEAKER_(\d+)', speaker)
                if match:
                    speaker_id = int(match.group(1))
                    gender = input(f"Select voice gender for Speaker {speaker_id+1} (m/f): ").lower()
                    voice_config[speaker_id] = {
                        'engine': 'edge_tts',
                        'gender': "female" if gender.startswith("f") else "male"
                    }
    # Step 7: Generate speech in the target language
    logger.info("Generating speech...")
    dubbed_audio_path = generate_tts(translated_segments, target_language, voice_config, output_dir="audio2")
    # Step 8: Mux the dubbed speech and background audio back into the video
    logger.info("Creating video with translated audio...")
    create_video_with_mixed_audio(video_path, bg_audio_path, dubbed_audio_path)
    logger.info("Process completed successfully!")
if __name__ == "__main__":
main()