# app.py
import gradio as gr
from transformers import pipeline, AutoModel
import torch
import ffmpeg
import nltk
import re
from deep_translator import MyMemoryTranslator
from num2words import num2words
import soundfile as sf
from gradio_client import Client, handle_file
from openvoice_cli.__main__ import tune_one
import pyrubberband as rb
import librosa
import os
import numpy as np

# You only need to run this download command once
nltk.download('punkt_tab')

# --- Device Setup ---
# The device and dtype are chosen once at startup. The individual models are
# loaded inside the functions that use them, on each call.
# We check for GPU availability to speed things up where possible.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

print(f"Using device: {device}")

# --- Audio Extraction ---
def extract_audio_from_video(video_path, output_audio_path="temp_extracted_audio.wav"):
    """
    Extracts audio from a video file using python-ffmpeg.
    """
    print(f"\n[STEP 1/9] Extracting audio from video: {video_path}")
    
    try:
        (
            ffmpeg
            .input(video_path)
            .output(
                output_audio_path,
                vn=None,             # Drop the video stream
                acodec='pcm_s16le',  # Uncompressed PCM, as expected in a .wav container
                ar='44100',          # Sample rate
                ac=2,                # Stereo
                f='wav'              # Output format
            )
            .run(overwrite_output=True, quiet=True)
        )
        print(f"βœ… Audio extracted successfully to: {output_audio_path}")
        return output_audio_path
    except ffmpeg.Error as e:
        print(f"Error: Failed to extract audio from video. stderr: {e.stderr.decode('utf8')}")
        return None
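
# The ffmpeg call above is roughly equivalent to this shell command
# (illustrative; the input path is a placeholder):
#   ffmpeg -y -i input.mp4 -vn -acodec pcm_s16le -ar 44100 -ac 2 -f wav temp_extracted_audio.wav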

def transcribe_audio(audio_path):
    """
    This function takes an audio file path, transcribes it using the Whisper model,
    and returns the transcribed text.
    """
    # Pace thresholds: a words-per-minute value is matched against the first
    # category bound it falls under.
    THRESHOLDS = {
        "very_slow": 80,
        "slow":      110,
        "normal":    150,
        "fast":      200,
        "very_fast": float("inf"),
    }

    def get_audio_duration(path: str) -> float:
        """Return duration of audio file in seconds."""
        with sf.SoundFile(path) as f:
            return len(f) / f.samplerate

    def compute_wpm(transcript: str, duration_s: float) -> float:
        """Compute words per minute."""
        if not transcript or duration_s == 0:
            return 0.0
        words = transcript.strip().split()
        return len(words) / (duration_s / 60.0)

    def categorize_wpm(wpm: float) -> str:
        """Map a WPM value to one of the pace categories."""
        for label, threshold in THRESHOLDS.items():
            if wpm < threshold:
                return label
        return "unknown"

    
    if audio_path is None:
        return "No audio file provided. Please upload or record an audio file.", ""

    # Initialize the ASR pipeline from Hugging Face Transformers.
    # (It is loaded on every call; a long-running app could cache it instead.)
    transcriber = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-large-v3-turbo",
        torch_dtype=torch_dtype,
        device=device,
        generate_kwargs={"language": "english"},
    )

    
    print(f"Transcribing audio file: {audio_path}")
    
    # The pipeline handles all the complex steps of loading and processing the audio
    result = transcriber(audio_path)
    # The result is a dictionary, and we need the 'text' key
    transcription = result["text"]
    
    print(f"βœ… Transcription successful: {transcription}")

    duration_s = get_audio_duration(audio_path)
    wpm = compute_wpm(transcription, duration_s)
    pace = categorize_wpm(wpm)
    print(f"βœ… > Pace detected: {pace.upper()} ({wpm:.1f} WPM)")
    
    return transcription, pace
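
# Illustrative usage (the file name is a placeholder):
#   text, pace = transcribe_audio("sample_clip.wav")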

def lang_select(target_lang):
    LANGUAGE_NAME_TO_CODE = {
        "Bengali": "bn-IN", "English": "en-IN", "Gujarati": "gu-IN",
        "Hindi": "hi-IN", "Kannada": "kn-IN", "Malayalam": "ml-IN",
        "Marathi": "mr-IN", "Odia": "or-IN", "Punjabi": "pa-IN",
        "Tamil": "ta-IN", "Telugu": "te-IN"
    }
    return LANGUAGE_NAME_TO_CODE[target_lang]
    
def translate_local(text_to_translate, target_lang='Tamil', device=None):
    """
    Translates text from English to a target language, handling texts longer
    than 500 characters by splitting them into sentence-based chunks.
    """
    # 1. Pre-process: expand digits into words so the translator handles them
    text_to_translate = re.sub(r'\d+', lambda match: num2words(int(match.group(0))), text_to_translate)
    target_lang = lang_select(target_lang.capitalize())
    
    # 2. Split the entire text into individual sentences
    sentences = nltk.sent_tokenize(text_to_translate)

    # 3. Group sentences into chunks under 500 characters
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        # Check if adding the next sentence exceeds the limit
        if len(current_chunk) + len(sentence) + 1 < 500:
            current_chunk += sentence + " "
        else:
            # If it exceeds, add the current chunk to the list and start a new one
            chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
    
    # Add the last remaining chunk to the list
    if current_chunk:
        chunks.append(current_chunk.strip())

    # 4. Translate each chunk and combine the results
    translator = MyMemoryTranslator(source='en-GB', target=target_lang)
    translated_chunks = []
    for chunk in chunks:
        try:
            translated_chunks.append(translator.translate(chunk))
        except Exception as e:
            print(f"Could not translate chunk: {chunk}\nError: {e}")
            translated_chunks.append("") # Add an empty string on error

    translated_text = " ".join(translated_chunks)
    
    print(f"βœ… Translated Text to {target_lang} Successfully")
    
    return translated_text
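
# Illustrative usage (requires network access for the MyMemory API):
#   translate_local("I bought 3 apples.", "Tamil")
#   -> digits are first expanded to words ("three"), the text is split into
#      sentence chunks under 500 characters, and each chunk is translated.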

def synthesize_speech(synth_text, target_lang, pace="normal", output_path="temp_audio_synthesized.wav", device="cpu"):
    # NOTE: the reference speaker is currently fixed to the Tamil male set;
    # target_lang is accepted for interface compatibility but not used yet.
    ref_audio_path = f'reference/TAMIL/MALE_{pace.upper()}.wav'
    ref_text_path = f'reference/TAMIL/MALE_{pace.upper()}.txt'

    with open(ref_text_path, encoding='utf-8') as f:
        ref_text = f.read()

    print("> Loading IndicF5 TTS model (ai4bharat/IndicF5)...")
    indicf5_repo_id = "ai4bharat/IndicF5"
    token = os.environ.get("HF_TOKEN")
    tts_model = AutoModel.from_pretrained(indicf5_repo_id, trust_remote_code=True, token=token).to(device)
    
    audio = tts_model(synth_text, ref_audio_path=ref_audio_path, ref_text=ref_text)

    if audio.dtype == np.int16:
        audio = audio.astype(np.float32) / 32768.0
    
    sf.write(output_path, np.array(audio, dtype=np.float32), samplerate=24000)
    print(f"βœ… Speech synthesis complete.")
    print(f"> Final audio saved to: {output_path}")
    
    return output_path
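
# Illustrative call (assumes reference/TAMIL/MALE_NORMAL.wav and its matching
# .txt transcript exist in the repo; the Tamil text is a placeholder):
#   synthesize_speech("வணக்கம், உலகம்!", "Tamil", pace="normal")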

def match_audio_duration(original_path, translated_path, output_path="temp_audio_synced.wav"):
    """
    Matches Synthesized Audio duration to Original Audio duration
    """
    print("\n[STEP 7/9] Syncing Audio durations")
    # Load original audio
    original_audio, original_sr = librosa.load(original_path, sr=None)
    original_duration = librosa.get_duration(y=original_audio, sr=original_sr)
    print(f"Original audio duration: {original_duration:.2f} seconds")

    # Load translated audio
    translated_audio, translated_sr = librosa.load(translated_path, sr=None)
    translated_duration = librosa.get_duration(y=translated_audio, sr=translated_sr)
    print(f"Translated audio duration: {translated_duration:.2f} seconds")

    # Compute the speed-up/slow-down rate
    # If rate > 1.0, audio is sped up. If rate < 1.0, audio is slowed down.
    rate = translated_duration / original_duration
    print(f"Stretch rate: {rate:.4f}")

    # Apply time-stretch using the high-quality rubberband library
    # The parameters are: audio_data, sample_rate, and the desired rate
    adjusted_audio = rb.time_stretch(translated_audio, translated_sr, rate=rate)

    # Save output
    # The sample rate remains the same as the translated audio's original rate
    sf.write(output_path, adjusted_audio, translated_sr)
    print(f"βœ… Duration Adjusted audio saved as: {output_path}")
    return output_path

def clone_voice(translated_audio_path, original_audio_path, output_path="temp_audio_cloned.wav", device="cpu"):
    print("Cloning Voice")
    # Convert the tone color of a single audio file
    tune_one(input_file=translated_audio_path, ref_file=original_audio_path, output_file=output_path, device=device)
    print(f"βœ… Voice cloned audio saved to {output_path}")
    return output_path

def merge_audio_video(video_path, audio_path, output_path="temp_merged.mp4"):
    """
    Merges an audio file with a video file into a single output video.
    """
    print("\n[STEP] Merging audio and video...")
    video_input = ffmpeg.input(video_path)
    audio_input = ffmpeg.input(audio_path)
    (
        ffmpeg.output(video_input.video, audio_input.audio, output_path, vcodec='copy', acodec='aac', shortest=None)
        .run(overwrite_output=True, quiet=True)
    )
    print(f"βœ… Merged video saved to {output_path}")
    return output_path
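
# Roughly the equivalent shell command (illustrative paths):
#   ffmpeg -y -i video.mp4 -i audio.wav -map 0:v -map 1:a -c:v copy -c:a aac -shortest temp_merged.mp4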

def main_run(video_path, target_lang, user_transcript=None, user_translation=None):
    original_audio_file = extract_audio_from_video(video_path)
    # Always transcribe once so the speaking pace is detected; a user-supplied
    # transcript then overrides the recognized text.
    original_text, pace = transcribe_audio(original_audio_file)
    if user_transcript:
        original_text = user_transcript
        print(f"Using provided transcript: {original_text}")
    if user_translation:
        translated_text = user_translation
        print(f"Using provided translation: {translated_text}")
    else:
        translated_text = translate_local(original_text, target_lang)
        print(f"Translated Text: {translated_text}")
    translated_audio = synthesize_speech(translated_text, target_lang, pace)
    synced_translated_audio = match_audio_duration(original_audio_file, translated_audio) 
    cloned_synced_translated_audio = clone_voice(synced_translated_audio, original_audio_file)
    final_video_nobgm = merge_audio_video(video_path, cloned_synced_translated_audio)
    print(f"βœ… Pipeline finished")
    return final_video_nobgm

def audio_pipeline_run(audio_path, target_lang, user_transcript=None, user_translation=None):
    # Same flow as main_run, minus the video extraction and merge steps.
    original_text, pace = transcribe_audio(audio_path)
    if user_transcript:
        original_text = user_transcript
        print(f"Using provided transcript: {original_text}")
    if user_translation:
        translated_text = user_translation
        print(f"Using provided translation: {translated_text}")
    else:
        translated_text = translate_local(original_text, target_lang)
        print(f"Translated Text: {translated_text}")
    translated_audio = synthesize_speech(translated_text, target_lang, pace)
    synced_translated_audio = match_audio_duration(audio_path, translated_audio) 
    cloned_synced_translated_audio = clone_voice(synced_translated_audio, audio_path)
    print(f"βœ… Pipeline finished")
    return cloned_synced_translated_audio


# --- Gradio Interface Definition ---
# Title and description for the Space
title = "Audio/Video Translation Toolkit"
description = """
A Gradio app that transcribes English speech with the
<b>openai/whisper-large-v3-turbo</b> model from the Hugging Face Hub,
translates it, synthesizes speech in the target language, and clones the
original speaker's voice onto the result.
"""
article = "<p style='text-align: center'><a href='https://huggingface.co/openai/whisper-large-v3-turbo' target='_blank'>Model Card</a></p>"


# A simpler single-function gr.Interface alternative, kept for reference
# (the Blocks layout below is what actually runs):
'''
app_interface = gr.Interface(
    fn=main_run,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil"),
    ],
    outputs=gr.Video(label="Translated Video"),
    title=title,
    description=description,
    article=article,
    allow_flagging="never",
)
'''

with gr.Blocks(title="Audio/Video Translation Toolkit") as app_interface:
    gr.Markdown("# πŸš€ Audio/Video Translation Toolkit")
    gr.Markdown("This might take a while to generate as it's running on the free tier.")
    gr.Markdown("Please input only English Audio/Video under 30secs.")
    gr.Markdown("Time taken for 10secs of audio/video is 5-10 mins.")
    with gr.Tabs():
        with gr.Tab("🎬 Translate Video"):
            with gr.Column():
                with gr.Row():
                    video_in = gr.Video(label="Input Video", height=500)
                    video_out = gr.Video(label="Output Video", interactive=False, height=500)
                with gr.Row():
                    # Radio buttons for selecting target language
                    # This allows users to choose one of the mutually exclusive options
                    lang_radio_vid = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    # Single-select option for mutually exclusive choices
                    option_select_vid = gr.Radio(choices=["Use my Transcript", "Use my Translation"], label="Optional Input")
                    # Textboxes for user input, initially hidden
                user_transcript_vid = gr.Textbox(label="Your English Transcript", lines=3, visible=False)
                user_translation_vid = gr.Textbox(label="Your Translated Text", lines=3, visible=False)
                submit_btn_vid = gr.Button("Translate Video", variant="primary")
                    
                # Toggle visibility based on selected option (only one can be active)
                option_select_vid.change(
                    fn=lambda choice: (
                        gr.update(visible=(choice == "Use my Transcript")),
                        gr.update(visible=(choice == "Use my Translation")),
                    ),
                    inputs=option_select_vid,
                    outputs=[user_transcript_vid, user_translation_vid],
                )

            # Include the optional transcript/translation textboxes as inputs (they may be hidden)
            submit_btn_vid.click(fn=main_run, inputs=[video_in, lang_radio_vid, user_transcript_vid, user_translation_vid], outputs=[video_out])

        with gr.Tab("🎡 Translate Audio"):
            with gr.Column():
                with gr.Row():
                    audio_in = gr.Audio(type="filepath", label="Input Audio")
                    audio_out = gr.Audio(label="Output Audio", interactive=False)
                with gr.Row():
                    # Radio buttons for selecting target language
                    # This allows users to choose one of the mutually exclusive options
                    lang_radio_aud = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    # Single-select option for mutually exclusive choices
                    option_select_aud = gr.Radio(choices=["Use my Transcript", "Use my Translation"], label="Optional Input")
                    # Textboxes for user input, initially hidden
                user_transcript_aud = gr.Textbox(label="Your English Transcript", lines=3, visible=False)
                user_translation_aud = gr.Textbox(label="Your Translated Text", lines=3, visible=False)
                submit_btn_aud = gr.Button("Translate Audio", variant="primary")
                    
                # Toggle visibility based on selected option (only one can be active)
                option_select_aud.change(
                    fn=lambda choice: (
                        gr.update(visible=(choice == "Use my Transcript")),
                        gr.update(visible=(choice == "Use my Translation")),
                    ),
                    inputs=option_select_aud,
                    outputs=[user_transcript_aud, user_translation_aud],
                )                                
            submit_btn_aud.click(fn=audio_pipeline_run, inputs=[audio_in, lang_radio_aud, user_transcript_aud, user_translation_aud], outputs=[audio_out])
            
        with gr.Tab("βœ‚οΈ Extract Audio"):
            with gr.Row():
                video_in_ext = gr.Video(label="Input Video", height=500)
                audio_out_ext = gr.Audio(label="Extracted Audio")
            btn_ext = gr.Button("Extract", variant="secondary")
            btn_ext.click(fn=extract_audio_from_video, inputs=video_in_ext, outputs=audio_out_ext)

        with gr.Tab("✍️ Transcribe"):
            with gr.Row():
                audio_in_trans = gr.Audio(type="filepath", label="Input Audio")
                with gr.Column():
                    text_out_trans = gr.Textbox(label="Transcription")
                    text_out_pace = gr.Textbox(label="Detected Pace")
            btn_trans = gr.Button("Transcribe", variant="secondary")
            btn_trans.click(fn=transcribe_audio, inputs=audio_in_trans, outputs=[text_out_trans, text_out_pace])

        with gr.Tab("🌐 Translate Text"):
            with gr.Row():
                with gr.Column():
                    text_in_tran = gr.Textbox(label="Text to Translate", lines=5)
                    lang_radio_tran = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    btn_tran = gr.Button("Translate", variant="secondary")
                text_out_tran = gr.Textbox(label="Translated Text", lines=5, interactive=False)
            btn_tran.click(fn=translate_local, inputs=[text_in_tran, lang_radio_tran], outputs=text_out_tran)

        with gr.Tab("πŸ”Š Synthesize Speech"):
            with gr.Column():
                with gr.Row():
                    text_in_synth = gr.Textbox(label="Text to Synthesize", lines=5)
                    audio_out_synth = gr.Audio(label="Synthesized Speech")
                with gr.Row():
                    lang_radio_tran = gr.Radio(choices=["Tamil", "Telugu", "Hindi"], label="Target Language", value="Tamil")
                    gender_radio_tran = gr.Radio(choices=["Male", "Female"], label="Speaker Gender", value="Male")
                    pace_radio_tran = gr.Radio(choices=["Very_Slow", "Slow", "Normal", "Fast", "Very_Fast"], label="Target Language", value="Normal")
            btn_synth = gr.Button("Synthesize", variant="secondary")
            btn_synth.click(fn=synthesize_speech, inputs=[text_in_synth,lang_radio_tran,pace_radio_tran], outputs=audio_out_synth)

        with gr.Tab("⏱️ Sync Duration"):
            with gr.Row():
                audio_in_sync1 = gr.Audio(type="filepath", label="Original Audio (for duration reference)")
                audio_in_sync2 = gr.Audio(type="filepath", label="Translated Audio (to be resized)")
                audio_out_sync = gr.Audio(label="Duration-Synced Audio")
            btn_sync = gr.Button("Sync Duration", variant="secondary")
            btn_sync.click(fn=match_audio_duration, inputs=[audio_in_sync1, audio_in_sync2], outputs=audio_out_sync)

        with gr.Tab("🧬 Clone Voice"):
            with gr.Row():
                audio_in_clone1 = gr.Audio(type="filepath", label="Target Audio (e.g., Synthesized Speech)")
                audio_in_clone2 = gr.Audio(type="filepath", label="Reference Audio (Original Speaker's Voice)")
                audio_out_clone = gr.Audio(label="Cloned Voice Audio")
            btn_clone = gr.Button("Clone Voice", variant="secondary")
            btn_clone.click(fn=clone_voice, inputs=[audio_in_clone1, audio_in_clone2], outputs=audio_out_clone)

        with gr.Tab("🎞️ Replace Audio"):
            with gr.Row():
                video_in_rep = gr.Video(label="Input Video", height=500)
                audio_in_rep = gr.Audio(type="filepath", label="New Audio")
                video_out_rep = gr.Video(label="Video with Replaced Audio", height=500)
            btn_rep = gr.Button("Replace Audio", variant="secondary")
            btn_rep.click(fn=merge_audio_video, inputs=[video_in_rep, audio_in_rep], outputs=video_out_rep)

# --- Launch the App ---
if __name__ == "__main__":
    # The launch() method creates a web server and makes the interface accessible.
    app_interface.launch()