import gradio as gr
from transformers import pipeline, AutoModel
import torch
import ffmpeg
import nltk
import re
from deep_translator import MyMemoryTranslator
from num2words import num2words
import soundfile as sf
from gradio_client import Client, handle_file
from openvoice_cli.__main__ import tune_one
import pyrubberband as rb
import librosa
import os
import numpy as np

# Sentence tokenizer data needed by nltk.sent_tokenize (used when chunking text for translation).
nltk.download('punkt_tab')

# Use the GPU (with float16) when available, otherwise fall back to CPU with float32.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

print(f"Using device: {device}")


def extract_audio_from_video(video_path, output_audio_path="temp_extracted_audio.wav"):
    """
    Extracts the audio track from a video file using ffmpeg-python.
    """
    print(f"\n[STEP 1/9] Extracting audio from video: {video_path}")

    try:
        (
            ffmpeg
            .input(video_path)
            .output(
                output_audio_path,
                vn=None,             # drop the video stream
                acodec='pcm_s16le',  # 16-bit PCM so soundfile/librosa can read the WAV directly
                ar='44100',
                ac=2,
                f='wav'
            )
            .run(overwrite_output=True, quiet=True)
        )
        print(f"✅ Audio extracted successfully to: {output_audio_path}")
        return output_audio_path
    except ffmpeg.Error as e:
        print(f"Error: Failed to extract audio from video. stderr: {e.stderr.decode('utf8')}")
        return None
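
# For reference, the ffmpeg call above is roughly equivalent to the command line
#   ffmpeg -y -i <video> -vn -acodec pcm_s16le -ar 44100 -ac 2 -f wav temp_extracted_audio.wav
# Example usage (hypothetical path, commented out so importing this module has no side effects):
# extracted = extract_audio_from_video("sample_input.mp4")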


def transcribe_audio(audio_path):
    """
    Transcribes an audio file with Whisper and returns the transcript together with
    a rough speaking-pace label.
    """

    # Upper WPM bound for each pace category (checked in order).
    THRESHOLDS = {
        "very_slow": 80,
        "slow": 110,
        "normal": 150,
        "fast": 200,
        "very_fast": float("inf")
    }

    def get_audio_duration(path: str) -> float:
        """Return duration of audio file in seconds."""
        with sf.SoundFile(path) as f:
            return len(f) / f.samplerate

    def compute_wpm(transcript: str, duration_s: float) -> float:
        """Compute words per minute."""
        if not transcript or duration_s == 0:
            return 0.0
        words = transcript.strip().split()
        return len(words) / (duration_s / 60.0)

    def categorize_wpm(wpm: float) -> str:
        """Map a WPM value to one of the pace categories."""
        for label, threshold in THRESHOLDS.items():
            if wpm < threshold:
                return label
        return "unknown"
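
    # Worked example: a 45-second clip containing 100 words gives
    # 100 / (45 / 60) ≈ 133 WPM, which is below the 150 cutoff for "normal",
    # so categorize_wpm returns "normal".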

    # The ASR pipeline is rebuilt on every call; acceptable for a low-traffic demo.
    transcriber = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-large-v3-turbo",
        torch_dtype=torch_dtype,
        device=device,
        generate_kwargs={"language": "english"},
    )

    if audio_path is None:
        return "No audio file provided. Please upload or record an audio file.", "unknown"

    print(f"Transcribing audio file: {audio_path}")

    result = transcriber(audio_path)
    transcription = result["text"]
    print(f"✅ Transcription successful: {transcription}")

    duration_s = get_audio_duration(audio_path)
    wpm = compute_wpm(transcription, duration_s)
    pace = categorize_wpm(wpm)
    print(f"✅ Pace detected: {pace.upper()} ({wpm:.1f} WPM)")

    return transcription, pace
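
# Example (hypothetical path, commented out): transcribe_audio("speech.wav") would return a
# tuple such as ("Hello there ...", "normal"): the transcript plus the detected pace label.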


def lang_select(target_lang):
    """Map a display language name to the locale code expected by MyMemory."""
    LANGUAGE_NAME_TO_CODE = {
        "Bengali": "bn-IN", "English": "en-IN", "Gujarati": "gu-IN",
        "Hindi": "hi-IN", "Kannada": "kn-IN", "Malayalam": "ml-IN",
        "Marathi": "mr-IN", "Odia": "or-IN", "Punjabi": "pa-IN",
        "Tamil": "ta-IN", "Telugu": "te-IN"
    }
    return LANGUAGE_NAME_TO_CODE[target_lang]


def translate_local(text_to_translate, target_lang='Tamil', device=None):
    """
    Translates text from English to a target language, handling texts longer
    than 500 characters by splitting them into sentence-based chunks.
    """
    # Spell out digits so the TTS stage does not have to read raw numbers.
    text_to_translate = re.sub(r'\d+', lambda match: num2words(int(match.group(0))), text_to_translate)
    target_lang = lang_select(target_lang.capitalize())

    sentences = nltk.sent_tokenize(text_to_translate)

    # Greedily pack whole sentences into chunks of fewer than 500 characters.
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        if len(current_chunk) + len(sentence) + 1 < 500:
            current_chunk += sentence + " "
        else:
            chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
    if current_chunk:
        chunks.append(current_chunk.strip())
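
    # The 500-character cap above reflects MyMemory's free-tier request limit
    # (roughly 500 characters per query); treat the exact figure as an assumption.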

    translator = MyMemoryTranslator(source='en-GB', target=target_lang)
    translated_chunks = []
    for chunk in chunks:
        try:
            translated_chunks.append(translator.translate(chunk))
        except Exception as e:
            print(f"Could not translate chunk: {chunk}\nError: {e}")
            translated_chunks.append("")

    translated_text = " ".join(translated_chunks)

    print(f"✅ Translated text to {target_lang} successfully")

    return translated_text


def synthesize_speech(synth_text, target_lang, pace="normal", output_path="temp_audio_synthesized.wav", device="cpu"):
    """
    Synthesizes speech for the given text with IndicF5, conditioned on a pace-matched
    reference prompt.
    """
    # NOTE: the reference prompts are currently taken from the Tamil male speaker set,
    # regardless of target_lang.
    ref_audio_path = 'reference/TAMIL/MALE_' + pace.upper() + '.wav'
    ref_text_path = 'reference/TAMIL/MALE_' + pace.upper() + '.txt'

    with open(ref_text_path, encoding='utf-8') as f:
        ref_text = f.read()

    print("> Loading IndicF5 TTS model (ai4bharat/IndicF5)...")
    indicf5_repo_id = "ai4bharat/IndicF5"
    token = os.environ.get("HF_TOKEN")
    tts_model = AutoModel.from_pretrained(indicf5_repo_id, trust_remote_code=True, token=token).to(device)

    audio = tts_model(synth_text, ref_audio_path=ref_audio_path, ref_text=ref_text)

    # Normalise int16 output to float32 in [-1, 1] before writing.
    if audio.dtype == np.int16:
        audio = audio.astype(np.float32) / 32768.0

    sf.write(output_path, np.array(audio, dtype=np.float32), samplerate=24000)
    print("✅ Speech synthesis complete.")
    print(f"> Final audio saved to: {output_path}")

    return output_path
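
# IndicF5 is a prompt-based TTS model: it conditions on a reference clip and its transcript,
# so the pace-matched reference selected above also steers the speaking rate of the output.
# Example usage (hypothetical text, commented out to avoid downloading the model on import):
# synthesize_speech("<translated Tamil text>", "Tamil", pace="normal")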


def match_audio_duration(original_path, translated_path, output_path="temp_audio_synced.wav"):
    """
    Matches the synthesized audio's duration to the original audio's duration.
    """
    print("\n[STEP 7/9] Syncing audio durations")

    original_audio, original_sr = librosa.load(original_path, sr=None)
    original_duration = librosa.get_duration(y=original_audio, sr=original_sr)
    print(f"Original audio duration: {original_duration:.2f} seconds")

    translated_audio, translated_sr = librosa.load(translated_path, sr=None)
    translated_duration = librosa.get_duration(y=translated_audio, sr=translated_sr)
    print(f"Translated audio duration: {translated_duration:.2f} seconds")

    # rate > 1 speeds the translated audio up (shortens it); rate < 1 slows it down.
    rate = translated_duration / original_duration
    print(f"Stretch rate: {rate:.4f}")

    adjusted_audio = rb.time_stretch(translated_audio, translated_sr, rate=rate)

    sf.write(output_path, adjusted_audio, translated_sr)
    print(f"✅ Duration-adjusted audio saved as: {output_path}")
    return output_path
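
# Worked example: a 12 s translated clip against a 10 s original gives rate = 1.2, and
# time-stretching by 1.2 compresses the translated audio to roughly 10 s.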


def clone_voice(translated_audio_path, original_audio_path, output_path="temp_audio_cloned.wav", device="cpu"):
    """
    Transfers the original speaker's voice characteristics onto the translated audio
    using OpenVoice's tone-colour conversion (tune_one).
    """
    print("Cloning Voice")

    tune_one(input_file=translated_audio_path, ref_file=original_audio_path, output_file=output_path, device=device)
    print(f"✅ Voice-cloned audio saved to {output_path}")
    return output_path
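
# As used here, tune_one is treated purely as a file-to-file operation: the function above
# relies only on the converted audio that tune_one writes to output_file and returns that path.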


def merge_audio_video(video_path, audio_path, output_path="temp_merged.mp4"):
    """
    Merges an audio file with a video file into a single output video.
    """
    print("\n[STEP] Merging audio and video...")
    video_input = ffmpeg.input(video_path)
    audio_input = ffmpeg.input(audio_path)
    (
        # Copy the video stream as-is, re-encode the audio to AAC, and stop at the shorter stream.
        ffmpeg
        .output(video_input.video, audio_input.audio, output_path, vcodec='copy', acodec='aac', shortest=None)
        .run(overwrite_output=True, quiet=True)
    )
    print(f"✅ Merged video saved to {output_path}")
    return output_path
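
# Roughly equivalent to the command line
#   ffmpeg -y -i <video> -i <audio> -map 0:v -map 1:a -c:v copy -c:a aac -shortest temp_merged.mp4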


def main_run(video_path, target_lang, user_transcript=None, user_translation=None):
    original_audio_file = extract_audio_from_video(video_path)
    if user_transcript:
        # Still transcribe to detect the speaking pace, then use the provided transcript text.
        original_text, pace = transcribe_audio(original_audio_file)
        original_text = user_transcript
        print(f"Using provided transcript: {original_text}")
    else:
        original_text, pace = transcribe_audio(original_audio_file)
    if user_translation:
        translated_text = user_translation
        print(f"Using provided translation: {translated_text}")
    else:
        translated_text = translate_local(original_text, target_lang)
        print(f"Translated Text: {translated_text}")
    translated_audio = synthesize_speech(translated_text, target_lang, pace)
    synced_translated_audio = match_audio_duration(original_audio_file, translated_audio)
    cloned_synced_translated_audio = clone_voice(synced_translated_audio, original_audio_file)
    final_video_nobgm = merge_audio_video(video_path, cloned_synced_translated_audio)
    print("✅ Pipeline finished")
    return final_video_nobgm
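
# End-to-end example (hypothetical path, commented out so importing this module does not
# trigger the full pipeline):
# main_run("sample_input.mp4", "Tamil")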


def audio_pipeline_run(audio_path, target_lang, user_transcript=None, user_translation=None):
    if user_transcript:
        # Still transcribe to detect the speaking pace, then use the provided transcript text.
        original_text, pace = transcribe_audio(audio_path)
        original_text = user_transcript
        print(f"Using provided transcript: {original_text}")
    else:
        original_text, pace = transcribe_audio(audio_path)
    if user_translation:
        translated_text = user_translation
        print(f"Using provided translation: {translated_text}")
    else:
        translated_text = translate_local(original_text, target_lang)
        print(f"Translated Text: {translated_text}")
    translated_audio = synthesize_speech(translated_text, target_lang, pace)
    synced_translated_audio = match_audio_duration(audio_path, translated_audio)
    cloned_synced_translated_audio = clone_voice(synced_translated_audio, audio_path)
    print("✅ Pipeline finished")
    return cloned_synced_translated_audio
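
# audio_pipeline_run mirrors main_run but skips the audio-extraction and video-muxing steps,
# so it can be applied directly to an uploaded audio file.
# Example (hypothetical path, commented out): audio_pipeline_run("speech.wav", "Hindi")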


title = "Custom Whisper Transcription App"
description = """
This is a custom Gradio app that uses the <b>openai/whisper-large-v3-turbo</b> model
from the Hugging Face Hub for transcription. Upload an audio file or record
directly from your microphone to get the transcript.
"""
article = "<p style='text-align: center'><a href='https://huggingface.co/openai/whisper-large-v3-turbo' target='_blank'>Model Card</a></p>"


with gr.Blocks(title="Audio/Video Translation Toolkit") as app_interface:
    gr.Markdown("# 🌐 Audio/Video Translation Toolkit")
    gr.Markdown("This may take a while to generate, as it runs on the free tier.")
    gr.Markdown("Please provide only English audio/video under 30 seconds.")
    gr.Markdown("Processing 10 seconds of audio/video takes roughly 5-10 minutes.")
    with gr.Tabs():
        with gr.Tab("🎬 Translate Video"):
            with gr.Column():
                with gr.Row():
                    video_in = gr.Video(label="Input Video", height=500)
                    video_out = gr.Video(label="Output Video", interactive=False, height=500)
                with gr.Row():
                    lang_radio_vid = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    option_select_vid = gr.Radio(choices=["Use my Transcript", "Use my Translation"], label="Optional Input")
                user_transcript_vid = gr.Textbox(label="Your English Transcript", lines=3, visible=False)
                user_translation_vid = gr.Textbox(label="Your Translated Text", lines=3, visible=False)
                submit_btn_vid = gr.Button("Translate Video", variant="primary")

            option_select_vid.change(
                fn=lambda choice: (
                    gr.update(visible=(choice == "Use my Transcript")),
                    gr.update(visible=(choice == "Use my Translation")),
                ),
                inputs=option_select_vid,
                outputs=[user_transcript_vid, user_translation_vid],
            )

            submit_btn_vid.click(
                fn=main_run,
                inputs=[video_in, lang_radio_vid, user_transcript_vid, user_translation_vid],
                outputs=[video_out],
            )
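
        # Both translation tabs use the same pattern: option_select_*.change() returns one
        # gr.update(visible=...) per optional textbox, so only the field matching the selected
        # mode ("Use my Transcript" / "Use my Translation") is shown before submission.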

        with gr.Tab("🎵 Translate Audio"):
            with gr.Column():
                with gr.Row():
                    audio_in = gr.Audio(type="filepath", label="Input Audio")
                    audio_out = gr.Audio(label="Output Audio", interactive=False)
                with gr.Row():
                    lang_radio_aud = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    option_select_aud = gr.Radio(choices=["Use my Transcript", "Use my Translation"], label="Optional Input")
                user_transcript_aud = gr.Textbox(label="Your English Transcript", lines=3, visible=False)
                user_translation_aud = gr.Textbox(label="Your Translated Text", lines=3, visible=False)
                submit_btn_aud = gr.Button("Translate Audio", variant="primary")

            option_select_aud.change(
                fn=lambda choice: (
                    gr.update(visible=(choice == "Use my Transcript")),
                    gr.update(visible=(choice == "Use my Translation")),
                ),
                inputs=option_select_aud,
                outputs=[user_transcript_aud, user_translation_aud],
            )
            submit_btn_aud.click(
                fn=audio_pipeline_run,
                inputs=[audio_in, lang_radio_aud, user_transcript_aud, user_translation_aud],
                outputs=[audio_out],
            )

        with gr.Tab("✂️ Extract Audio"):
            with gr.Row():
                video_in_ext = gr.Video(label="Input Video", height=500)
                audio_out_ext = gr.Audio(label="Extracted Audio")
            btn_ext = gr.Button("Extract", variant="secondary")
            btn_ext.click(fn=extract_audio_from_video, inputs=video_in_ext, outputs=audio_out_ext)

        with gr.Tab("✍️ Transcribe"):
            with gr.Row():
                audio_in_trans = gr.Audio(type="filepath", label="Input Audio")
                with gr.Column():
                    text_out_trans = gr.Textbox(label="Transcription")
                    text_out_pace = gr.Textbox(label="Detected Pace")
            btn_trans = gr.Button("Transcribe", variant="secondary")
            btn_trans.click(fn=transcribe_audio, inputs=audio_in_trans, outputs=[text_out_trans, text_out_pace])

        with gr.Tab("🌐 Translate Text"):
            with gr.Row():
                with gr.Column():
                    text_in_tran = gr.Textbox(label="Text to Translate", lines=5)
                    lang_radio_tran = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    btn_tran = gr.Button("Translate", variant="secondary")
                text_out_tran = gr.Textbox(label="Translated Text", lines=5, interactive=False)
            btn_tran.click(fn=translate_local, inputs=[text_in_tran, lang_radio_tran], outputs=text_out_tran)

        with gr.Tab("🔊 Synthesize Speech"):
            with gr.Column():
                with gr.Row():
                    text_in_synth = gr.Textbox(label="Text to Synthesize", lines=5)
                    audio_out_synth = gr.Audio(label="Synthesized Speech")
                with gr.Row():
                    lang_radio_synth = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    # Gender is collected but not yet wired into synthesize_speech, which
                    # currently uses the male Tamil reference prompts.
                    gender_radio_synth = gr.Radio(choices=["Male", "Female"], label="Speaker Gender", value="Male")
                    pace_radio_synth = gr.Radio(choices=["Very_Slow", "Slow", "Normal", "Fast", "Very_Fast"], label="Pace", value="Normal")
                btn_synth = gr.Button("Synthesize", variant="secondary")
            btn_synth.click(fn=synthesize_speech, inputs=[text_in_synth, lang_radio_synth, pace_radio_synth], outputs=audio_out_synth)

        with gr.Tab("⏱️ Sync Duration"):
            with gr.Row():
                audio_in_sync1 = gr.Audio(type="filepath", label="Original Audio (for duration reference)")
                audio_in_sync2 = gr.Audio(type="filepath", label="Translated Audio (to be resized)")
                audio_out_sync = gr.Audio(label="Duration-Synced Audio")
            btn_sync = gr.Button("Sync Duration", variant="secondary")
            btn_sync.click(fn=match_audio_duration, inputs=[audio_in_sync1, audio_in_sync2], outputs=audio_out_sync)

        with gr.Tab("🧬 Clone Voice"):
            with gr.Row():
                audio_in_clone1 = gr.Audio(type="filepath", label="Target Audio (e.g., Synthesized Speech)")
                audio_in_clone2 = gr.Audio(type="filepath", label="Reference Audio (Original Speaker's Voice)")
                audio_out_clone = gr.Audio(label="Cloned Voice Audio")
            btn_clone = gr.Button("Clone Voice", variant="secondary")
            btn_clone.click(fn=clone_voice, inputs=[audio_in_clone1, audio_in_clone2], outputs=audio_out_clone)

        with gr.Tab("🎞️ Replace Audio"):
            with gr.Row():
                video_in_rep = gr.Video(label="Input Video", height=500)
                audio_in_rep = gr.Audio(type="filepath", label="New Audio")
                video_out_rep = gr.Video(label="Video with Replaced Audio", height=500)
            btn_rep = gr.Button("Replace Audio", variant="secondary")
            btn_rep.click(fn=merge_audio_video, inputs=[video_in_rep, audio_in_rep], outputs=video_out_rep)


if __name__ == "__main__":
    app_interface.launch()