import gradio as gr
import tempfile
import os
import shutil
from moviepy.editor import VideoFileClip, AudioFileClip
from faster_whisper import WhisperModel
import torch
import torchaudio as ta
import torchaudio.transforms as transforms
from chatterbox.mtl_tts import ChatterboxMultilingualTTS, SUPPORTED_LANGUAGES
import logging
from typing import List, Dict
from deep_translator import GoogleTranslator
# Configure logging first so early import messages are actually emitted
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Try to import spaces for ZeroGPU support (Hugging Face Spaces)
try:
    import spaces
    SPACES_AVAILABLE = True
except ImportError:
    SPACES_AVAILABLE = False
    logger.info("spaces library not available - running without ZeroGPU support")
# Configuration - Auto-detect GPU
# Note: faster-whisper uses ctranslate2, which doesn't work well with ZeroGPU,
# so we always use CPU for Whisper. TTS will use GPU when available.
if torch.cuda.is_available() and not SPACES_AVAILABLE:
    # Only use GPU for local CUDA setups, not ZeroGPU
    TTS_DEVICE = "cuda"
    logger.info(f"πŸš€ GPU detected! Using CUDA with {torch.cuda.get_device_name(0)} for TTS")
else:
    TTS_DEVICE = "cpu"
    if SPACES_AVAILABLE:
        logger.info("πŸš€ Running on ZeroGPU - TTS will use GPU inside the decorated function")
    else:
        logger.info("Running on CPU")

# Whisper always uses CPU (ctranslate2 compatibility)
WHISPER_DEVICE = "cpu"
WHISPER_COMPUTE_TYPE = "int8"  # int8 quantization keeps CPU inference fast and memory-light
# Set temp directory to a writable location
os.environ['TMPDIR'] = '/tmp'
tempfile.tempdir = '/tmp'

# Patch torch.load to force CPU mapping
torch_load_orig = torch.load

def torch_load_cpu(*args, **kwargs):
    kwargs["map_location"] = torch.device("cpu")
    return torch_load_orig(*args, **kwargs)

torch.load = torch_load_cpu
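# Sketch of the effect (hypothetical checkpoint path): any later call such as
#   state = torch.load("/tmp/some_checkpoint.pt")
# now behaves as if map_location=torch.device("cpu") had been passed explicitly,
# so checkpoints saved from a CUDA process still load on CPU-only hosts.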
# Global models (loaded once)
whisper_model = None
tts_model = None

# ==================== Model Loading ====================

def load_models():
    """Load models (lazy loading for ZeroGPU compatibility)"""
    global whisper_model, tts_model
    if whisper_model is None:
        logger.info("Loading Whisper model...")
        whisper_model = WhisperModel(
            "small",
            device=WHISPER_DEVICE,
            compute_type=WHISPER_COMPUTE_TYPE,
            cpu_threads=4
        )
        logger.info("βœ… Whisper model loaded!")
    if tts_model is None:
        logger.info("Loading TTS model...")
        # In ZeroGPU, determine the device at runtime
        tts_device = "cuda" if (SPACES_AVAILABLE and torch.cuda.is_available()) else TTS_DEVICE
        tts_model = ChatterboxMultilingualTTS.from_pretrained(device=tts_device)
        logger.info(f"βœ… TTS model loaded on {tts_device}!")
    return whisper_model, tts_model
# ==================== TTS Processing ====================

def generate_translated_audio(
    reference_audio_path: str,
    segments: List[Dict],
    output_path: str,
    tts_model,
    progress=gr.Progress(),
    silence_duration: float = 0.5,  # unused here; gaps are reconstructed from segment timestamps
    target_language: str = "en"
) -> str:
    """Generate translated audio using Chatterbox TTS with progress updates"""
    try:
        progress(0, desc=f"Generating TTS for {len(segments)} segments...")
        all_wavs = []
        total_segments = len(segments)
        for counter, segment in enumerate(segments):
            # Update progress
            prog = (counter + 1) / total_segments
            text_preview = segment['translated_text'][:50]
            progress(prog, desc=f"Processing segment {counter + 1}/{total_segments}: {text_preview}...")
            original_duration = segment['end'] - segment['start']
            logger.info(f"Generating audio for text: {segment['translated_text']}")

            # Send a heartbeat progress update before generation
            progress(prog, desc=f"πŸŽ™οΈ Generating audio for segment {counter + 1}/{total_segments}...")

            # Generate audio for this segment, cloning the voice from the reference audio
            wav = tts_model.generate(
                segment['translated_text'],
                language_id=target_language,
                audio_prompt_path=reference_audio_path,
                exaggeration=0.2,
                cfg_weight=0.8,
                temperature=0.4,
                repetition_penalty=1.2,
                min_p=0.05,
                top_p=0.9
            )
            generated_duration = wav.shape[-1] / tts_model.sr

            # Add leading silence for the first segment (from 0.0 to segment start)
            if counter == 0 and segment['start'] > 0:
                leading_silence_duration = segment['start']
                leading_silence_samples = int(leading_silence_duration * tts_model.sr)
                leading_silence = torch.zeros((wav.shape[0], leading_silence_samples), dtype=wav.dtype, device=wav.device)
                all_wavs.append(leading_silence)

            # Handle duration matching
            if generated_duration < original_duration:
                # Generated audio is shorter - add it as is,
                # then pad with trailing silence to match the original segment duration
                all_wavs.append(wav)
                trailing_silence_duration = original_duration - generated_duration
                trailing_silence_samples = int(trailing_silence_duration * tts_model.sr)
                if trailing_silence_samples > 0:
                    trailing_silence = torch.zeros((wav.shape[0], trailing_silence_samples), dtype=wav.dtype, device=wav.device)
                    all_wavs.append(trailing_silence)
            elif generated_duration > original_duration:
                # Generated audio is longer - speed it up to fit
                speed_factor = generated_duration / original_duration
                speed_transform = transforms.Speed(tts_model.sr, speed_factor)
                wav_adjusted, _ = speed_transform(wav)
                all_wavs.append(wav_adjusted)
            else:
                # Duration matches perfectly
                all_wavs.append(wav)

            # Preserve the original inter-segment gap as silence (not after the last segment)
            if counter < len(segments) - 1:
                next_segment = segments[counter + 1]
                gap_duration = next_segment['start'] - segment['end']
                if gap_duration > 0:
                    gap_samples = int(gap_duration * tts_model.sr)
                    gap_silence = torch.zeros((wav.shape[0], gap_samples), dtype=wav.dtype, device=wav.device)
                    all_wavs.append(gap_silence)

        # Save output
        progress(0.95, desc="Combining audio segments...")
        combined_wav = torch.cat(all_wavs, dim=-1)
        ta.save(output_path, combined_wav, tts_model.sr)
        total_duration = combined_wav.shape[-1] / tts_model.sr
        logger.info(f"TTS completed! Total duration: {total_duration:.2f}s")
        progress(1.0, desc="TTS generation completed!")
        return output_path
    except Exception:
        logger.exception("Error generating TTS audio")
        raise
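# Usage sketch (hypothetical paths; assumes `segments` came from translate_segments()
# and `tts` from load_models()):
#   generate_translated_audio(
#       reference_audio_path="/tmp/source_voice.wav",
#       segments=segments,
#       output_path="/tmp/dubbed.wav",
#       tts_model=tts,
#       target_language="en",
#   )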
# ==================== Helper Functions ====================

def audio_extractor(video_path):
    """Extract audio from video"""
    video_clip = VideoFileClip(video_path)
    audio_clip = video_clip.audio
    temp_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False, dir='/tmp')
    full_audio_path = temp_file.name
    temp_file.close()
    audio_clip.write_audiofile(full_audio_path, codec='pcm_s16le', logger=None)
    audio_clip.close()
    video_clip.close()
    return full_audio_path
def transcribe(full_audio_path, whisper_model, progress=None):
    """Transcribe audio using faster-whisper"""
    if progress:
        progress(0, desc="Transcribing audio...")
    # faster-whisper transcription
    segments_generator, info = whisper_model.transcribe(
        full_audio_path,
        beam_size=5,
        word_timestamps=True,
        vad_filter=False,
        # vad_parameters=dict(min_silence_duration_ms=500)
    )
    detected_language = info.language
    if progress:
        progress(0, desc=f"Detected language: {detected_language}")
    # Convert generator to list and format segments
    segments = []
    for segment in segments_generator:
        seg_dict = {
            "start": segment.start,
            "end": segment.end,
            "text": segment.text.strip(),
            "words": []
        }
        # Add word-level timestamps if available
        if segment.words:
            for word in segment.words:
                seg_dict["words"].append({
                    "word": word.word,
                    "start": word.start,
                    "end": word.end
                })
        segments.append(seg_dict)
    result = {
        "segments": segments,
        "language": detected_language,
        "language_code": detected_language
    }
    if progress:
        progress(0, desc=f"Transcribed {len(segments)} segments")
    return result
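# Illustrative shape of the returned dict (values are made up, not from a real run):
#   {
#       "segments": [
#           {"start": 0.0, "end": 2.4, "text": "Hello there",
#            "words": [{"word": "Hello", "start": 0.0, "end": 0.6}, ...]},
#       ],
#       "language": "en",
#       "language_code": "en",
#   }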
def translate_segments(segments: List[Dict], target_lang: str) -> List[Dict]:
    """Translate segments to target language using deep-translator"""
    results = []
    translator = GoogleTranslator(source='auto', target=target_lang)
    for seg in segments:
        clean_seg = {k: v for k, v in seg.items() if k != "words"}
        if not clean_seg["text"] or clean_seg["text"].isspace():
            translated_text = ""
        else:
            translated_text = translator.translate(clean_seg["text"])
        clean_seg["translated_text"] = translated_text
        results.append(clean_seg)
    return results
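# Example (illustrative values): each output segment keeps its timestamps, drops
# the word-level entries, and gains a "translated_text" key, e.g. for target_lang="es":
#   [{"start": 0.0, "end": 2.4, "text": "Hello there", "translated_text": "Hola"}]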
def replace_video_audio(video_path, new_audio_path, output_video_path):
    """Replace video audio with proper temp file handling"""
    # Point MoviePy at the system ffmpeg binary
    os.environ['FFMPEG_BINARY'] = 'ffmpeg'
    video_clip = VideoFileClip(video_path)
    new_audio_clip = AudioFileClip(new_audio_path)
    video_duration = video_clip.duration
    audio_duration = new_audio_clip.duration
    # Trim whichever track is longer so the two always line up
    if audio_duration < video_duration:
        final_video = video_clip.subclip(0, audio_duration)
        final_audio = new_audio_clip
    elif audio_duration > video_duration:
        final_video = video_clip
        final_audio = new_audio_clip.subclip(0, video_duration)
    else:
        final_video = video_clip
        final_audio = new_audio_clip
    final_clip = final_video.set_audio(final_audio)
    # Write with an explicit temp audio file location
    final_clip.write_videofile(
        output_video_path,
        codec='libx264',
        audio_codec='aac',
        temp_audiofile=f'/tmp/temp-audio-{os.getpid()}.m4a',
        remove_temp=True,
        logger=None
    )
    video_clip.close()
    new_audio_clip.close()
    final_audio.close()
    final_video.close()
    final_clip.close()
def format_transcription(transcription, translated_segments):
    """Format transcription for display"""
    output = ""
    for i, seg in enumerate(translated_segments):
        output += f"**Segment {i+1}** ({seg['start']:.2f}s - {seg['end']:.2f}s)\n"
        output += f"*Original:* {transcription['segments'][i]['text']}\n"
        output += f"*Translated:* {seg['translated_text']}\n"
        output += "---\n"
    return output
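# Produces markdown along these lines (illustrative values):
#   **Segment 1** (0.00s - 2.40s)
#   *Original:* Hello there
#   *Translated:* Hola
#   ---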
# ==================== Main Processing Function ====================

def process_video(video_file, target_language, progress=gr.Progress()):
    """Main processing function for Gradio"""
    if video_file is None:
        return None, "Please upload a video file.", ""
    temp_dir = tempfile.mkdtemp(dir='/tmp')
    try:
        # Load models
        progress(0.05, desc="Loading models...")
        whisper_mdl, tts_mdl = load_models()

        # Copy uploaded video to temp directory
        input_video_path = os.path.join(temp_dir, "input_video.mp4")
        shutil.copy(video_file, input_video_path)

        # Extract audio
        progress(0.1, desc="Extracting audio from video...")
        audio_path = audio_extractor(input_video_path)

        # Transcribe
        progress(0.2, desc="Transcribing audio...")
        transcription = transcribe(audio_path, whisper_mdl, progress)
        status_msg = f"βœ… Transcribed {len(transcription['segments'])} segments\n"

        # Translate
        progress(0.4, desc="Translating segments...")
        translated_segments = translate_segments(transcription['segments'], target_language)
        status_msg += f"βœ… Translated {len(translated_segments)} segments\n"

        # Generate TTS
        progress(0.5, desc="Generating voice-cloned audio...")
        output_audio_path = os.path.join(temp_dir, "translated_audio.wav")
        generate_translated_audio(
            reference_audio_path=audio_path,
            segments=translated_segments,
            output_path=output_audio_path,
            tts_model=tts_mdl,
            progress=progress,
            silence_duration=0.5,
            target_language=target_language
        )
        status_msg += "βœ… TTS audio generated successfully!\n"

        # Merge audio with video
        progress(0.9, desc="Merging audio with video...")
        output_video_path = os.path.join(temp_dir, "translated_video.mp4")
        replace_video_audio(input_video_path, output_audio_path, output_video_path)
        status_msg += "βœ… Video translation completed successfully!"

        # Format transcription
        transcription_text = format_transcription(transcription, translated_segments)
        progress(1.0, desc="Complete!")
        return output_video_path, status_msg, transcription_text
    except Exception as e:
        logger.exception("Error in translation pipeline")
        return None, f"❌ Error: {str(e)}", ""
    finally:
        # Clean up the extracted audio file if it exists
        try:
            if 'audio_path' in locals() and os.path.exists(audio_path):
                os.remove(audio_path)
        except OSError:
            pass

# Apply the ZeroGPU decorator if available (for Hugging Face Spaces);
# the same function runs undecorated everywhere else
if SPACES_AVAILABLE:
    process_video = spaces.GPU(process_video)
# ==================== Gradio Interface ====================

def create_interface():
    """Create Gradio interface"""
    with gr.Blocks(title="Video Voice Translator", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # 🎬 Video Voice Translator
            Upload a video, and we'll translate it to your target language while preserving the voice!
            """
        )
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### πŸ“€ Upload Video")
                video_input = gr.Video(label="Choose a video file", height=550)
                target_language = gr.Dropdown(
                    choices=[(name, code) for code, name in ChatterboxMultilingualTTS.get_supported_languages().items()],
                    value="en",
                    label="Target Language",
                    info="Select the target language for text-to-speech synthesis"
                )
                translate_btn = gr.Button("πŸš€ Start Translation", variant="primary", size="lg")
                gr.Markdown(
                    """
                    ### About
                    This app uses:
                    - **faster-whisper** for transcription
                    - **Google Translate** for translation
                    - **Chatterbox** for voice-cloning TTS

                    Transcription and TTS run locally in this app; translation calls the Google Translate service.
                    """
                )
            with gr.Column(scale=1):
                gr.Markdown("### πŸ“₯ Output")
                status_output = gr.Textbox(label="Status", lines=5, interactive=False)
                video_output = gr.Video(label="Translated Video", height=550)
                with gr.Accordion("πŸ“ View Transcription & Translation", open=False):
                    transcription_output = gr.Markdown()

        # Connect the button to the processing function, re-enabling it when done
        translate_btn.click(
            fn=process_video,
            inputs=[video_input, target_language],
            outputs=[video_output, status_output, transcription_output]
        ).then(
            fn=lambda: gr.Button(interactive=True),
            outputs=[translate_btn]
        )
        # Disable the button as soon as it is clicked (bypasses the queue)
        translate_btn.click(
            fn=lambda: gr.Button(interactive=False),
            outputs=[translate_btn],
            queue=False
        )
        gr.Markdown(
            """
            ---
            **Note:** Processing time depends on video length and number of segments.
            Large videos may take several minutes to process.
            """
        )
    return demo
# ==================== Main ====================

if __name__ == "__main__":
    # Load models at startup (except in ZeroGPU, where the GPU isn't available yet)
    if not SPACES_AVAILABLE:
        logger.info("Initializing models...")
        load_models()
        logger.info("Models loaded successfully!")
    else:
        logger.info("Running in ZeroGPU mode - models will be loaded on first request")

    # Create and launch the interface
    # .queue() is essential for long-running tasks like model generation
    demo = create_interface()
    demo.queue(max_size=20, default_concurrency_limit=2).launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )