import gradio as gr
import random
from transformers import pipeline
from moviepy import (
    VideoFileClip,
    TextClip,
    CompositeVideoClip,
    AudioFileClip,
)
import subprocess
import logging
from textblob import TextBlob
import whisper
import time
import sqlite3


# Entering this passcode in the feedback box returns the SQLite database
# file for download instead of storing a new comment.
PASSCODE = "show_feedback_db"

def handle_feedback(feedback):
    """Store a comment in feedback.db, or hand back the database file
    when the passcode is entered."""
    feedback = feedback.strip()
    if not feedback:
        return "Feedback cannot be empty.", None

    if feedback == PASSCODE:
        # Passcode entered: expose the database file for download.
        return "Access granted! Download the database file below.", "feedback.db"
    else:
        # Regular feedback: persist it locally.
        with sqlite3.connect("feedback.db") as conn:
            cursor = conn.cursor()
            cursor.execute("CREATE TABLE IF NOT EXISTS studio_feedback (id INTEGER PRIMARY KEY, comment TEXT)")
            cursor.execute("INSERT INTO studio_feedback (comment) VALUES (?)", (feedback,))
            conn.commit()
        return "Thank you for your feedback!", None

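# Example: handle_feedback("Great tool!") inserts the comment into
# feedback.db and returns ("Thank you for your feedback!", None).
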
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

def list_available_fonts():
    """List font file paths known to fontconfig; useful when debugging
    missing-glyph issues in TextClip."""
    try:
        result = subprocess.run(
            ["fc-list", "--format", "%{file}\\n"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True
        )
        fonts = result.stdout.splitlines()
        logger.debug(f"Available fonts:\n{fonts}")
        return fonts
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # fc-list may be missing or fail on non-Linux hosts; fail soft.
        logger.error(f"Error while listing fonts: {e}")
        return []

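# Any path returned above can be handed to TextClip(font=...); the subtitle
# step below instead expects ./NotoSansSC-Regular.ttf next to this script.
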
def split_into_sentences(text):
    """Split text into sentences with TextBlob (helper; not currently
    called anywhere in the pipeline)."""
    blob = TextBlob(text)
    return [str(sentence) for sentence in blob.sentences]

def transcribe_video(video_path):
    # Extract the audio track to a WAV file that Whisper can read.
    video = VideoFileClip(video_path)
    if video.audio is None:
        raise ValueError("The uploaded file has no audio track to transcribe.")
    audio_path = "audio.wav"
    video.audio.write_audiofile(audio_path)

    # The small multilingual "base" model keeps load time reasonable.
    model = whisper.load_model("base")
    result = model.transcribe(audio_path, word_timestamps=True)

    # Keep only segment-level timing; that is all the subtitle step needs.
    transcript_with_timestamps = [
        {
            "start": segment["start"],
            "end": segment["end"],
            "text": segment["text"]
        }
        for segment in result["segments"]
    ]
    return transcript_with_timestamps

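# For illustration only, a returned transcript has this shape (timings are
# in seconds; the values here are made up):
# [{"start": 0.0, "end": 3.4, "text": " Welcome to the studio."}, ...]
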
def get_translation_model(target_language):
    # Map UI language codes to Helsinki-NLP MarianMT checkpoints.
    model_map = {
        "es": "Helsinki-NLP/opus-mt-en-es",
        "fr": "Helsinki-NLP/opus-mt-en-fr",
        "zh": "Helsinki-NLP/opus-mt-en-zh",
    }
    # Unknown codes currently fall back to the English->Chinese model.
    return model_map.get(target_language, "Helsinki-NLP/opus-mt-en-zh")

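# Other Helsinki-NLP pairs follow the same naming scheme; German, for
# instance, could be added as "de": "Helsinki-NLP/opus-mt-en-de".
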
def translate_text(transcription_json, target_language):
    # Load a translation pipeline for the requested target language.
    translation_model_id = get_translation_model(target_language)
    logger.debug(f"Translation model: {translation_model_id}")
    translator = pipeline("translation", model=translation_model_id)

    translated_json = []

    # Translate each segment while preserving its original timing.
    for entry in transcription_json:
        original_text = entry["text"]
        translated_text = translator(original_text)[0]['translation_text']
        translated_json.append({
            "start": entry["start"],
            "original": original_text,
            "translated": translated_text,
            "end": entry["end"]
        })
        logger.debug("Adding to translated_json: start=%s, original=%s, translated=%s, end=%s",
                     entry["start"], original_text, translated_text, entry["end"])

    return translated_json

def add_transcript_to_video(video_path, translated_json, output_path):
    video = VideoFileClip(video_path)
    text_clips = []

    logger.debug("Full translated_json: %s", translated_json)

    # A CJK-capable font is required for Chinese subtitles to render.
    font_path = "./NotoSansSC-Regular.ttf"

    for entry in translated_json:
        logger.debug("Processing entry: %s", entry)
        if isinstance(entry, dict) and "translated" in entry:
            txt_clip = TextClip(
                text=entry["translated"], font=font_path, method='caption', color='yellow', size=video.size
            ).with_start(entry["start"]).with_duration(entry["end"] - entry["start"]).with_position('bottom').with_opacity(0.7)
            text_clips.append(txt_clip)
        else:
            raise ValueError(f"Invalid entry format: {entry}")

    final_video = CompositeVideoClip([video] + text_clips)
    final_video.write_videofile(output_path, codec='libx264', audio_codec='aac')

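# CompositeVideoClip layers clips in list order, so each subtitle clip sits
# on top of the base video for exactly its [start, end] window.
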
def mock_post_to_platform(platform, content_title):
    return f"Content '{content_title}' successfully posted on {platform}!"


def mock_analytics():
    # Placeholder metrics; a real integration would query platform APIs.
    return {
        "YouTube": {"Views": random.randint(1000, 5000), "Engagement Rate": f"{random.uniform(5, 15):.2f}%"},
        "Instagram": {"Views": random.randint(500, 3000), "Engagement Rate": f"{random.uniform(10, 20):.2f}%"},
    }

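# Neither mock is wired into the Gradio interface yet; they are stand-ins
# for future publishing and analytics integrations.
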
def update_translations(file, edited_table):
    """
    Re-render the subtitled video using the user's edits from the Gradio
    Dataframe.
    """
    output_video_path = "output_video.mp4"
    logger.debug(f"Editable Table: {edited_table}")

    try:
        start_time = time.time()

        # Convert the edited pandas DataFrame back into the segment format
        # expected by add_transcript_to_video.
        updated_translations = [
            {
                "start": row["start"],
                "original": row["original"],
                "translated": row["translated"],
                "end": row["end"]
            }
            for _, row in edited_table.iterrows()
        ]

        add_transcript_to_video(file.name, updated_translations, output_video_path)

        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Updates applied successfully in {elapsed_time:.2f} seconds."

        return output_video_path, elapsed_time_display

    except Exception as e:
        raise ValueError(f"Error updating translations: {e}") from e

def generate_voiceover(translated_json, language, output_audio_path):
    from gtts import gTTS

    # Join all translated segments into one utterance. This ignores the
    # original segment timing, so the voiceover can drift out of sync
    # with the visuals.
    full_text = " ".join(entry["translated"] for entry in translated_json)

    # Note: gTTS always writes MP3 data, whatever the file extension.
    tts = gTTS(text=full_text, lang=language)
    tts.save(output_audio_path)

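# A possible refinement, sketched here but not wired into the UI: synthesize
# each segment separately and place it at its original start time so the dub
# stays roughly aligned with the video. Assumes moviepy 2.x (top-level
# CompositeAudioClip, with_start); the segment file names are illustrative.
def generate_timed_voiceover(translated_json, language, output_audio_path):
    from gtts import gTTS
    from moviepy import CompositeAudioClip

    clips = []
    for i, entry in enumerate(translated_json):
        segment_path = f"voiceover_segment_{i}.mp3"
        gTTS(text=entry["translated"], lang=language).save(segment_path)
        clips.append(AudioFileClip(segment_path).with_start(entry["start"]))

    # CompositeAudioClip has no sample rate of its own, so pass fps explicitly.
    CompositeAudioClip(clips).write_audiofile(output_audio_path, fps=44100)
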
def replace_audio_in_video(video_path, new_audio_path, final_video_path):
    # Use the top-level moviepy API: `moviepy.editor` and `set_audio` were
    # removed in moviepy 2.x in favor of direct imports and `with_audio`.
    video = VideoFileClip(video_path)
    new_audio = AudioFileClip(new_audio_path)

    video = video.with_audio(new_audio)
    video.write_videofile(final_video_path, codec="libx264", audio_codec="aac")

def upload_and_manage(file, language, mode="Transcription"):
    if file is None:
        return None, [], None, "No file uploaded. Please upload a video/audio file."

    try:
        start_time = time.time()

        output_video_path = "output_video.mp4"
        voiceover_path = "voiceover.mp3"  # gTTS produces MP3 data

        list_available_fonts()

        # Step 1: transcribe the uploaded file with Whisper.
        transcription_json = transcribe_video(file.name)

        # Step 2: translate each segment into the target language.
        translated_json = translate_text(transcription_json, language)

        # Step 3: burn the translated subtitles into the video.
        add_transcript_to_video(file.name, translated_json, output_video_path)

        # Step 4 (optional): generate a voiceover and swap the audio track.
        # Compare against the Radio component's display string, and write the
        # re-dubbed video to a separate file so the video being read is never
        # overwritten mid-render.
        if mode == "Transcription with Voiceover":
            final_video_path = "final_video.mp4"
            generate_voiceover(translated_json, language, voiceover_path)
            replace_audio_in_video(output_video_path, voiceover_path, final_video_path)
            output_video_path = final_video_path

        # Rows for the editable Dataframe: start, original, translated, end.
        editable_table = [
            [float(entry["start"]), entry["original"], entry["translated"], float(entry["end"])]
            for entry in translated_json
        ]

        elapsed_time = time.time() - start_time
        elapsed_time_display = f"Processing completed in {elapsed_time:.2f} seconds."

        return translated_json, editable_table, output_video_path, elapsed_time_display

    except Exception as e:
        return None, [], None, f"An error occurred: {str(e)}"

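# upload_and_manage's four return values line up with submit_button's
# outputs in build_interface below: the translations State, the editable
# Dataframe rows, the processed video file, and the status text.
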
def build_interface():
    css = """
    /* Adjust row height */
    .dataframe-container tr {
        height: 50px !important;
    }

    /* Ensure text wrapping and prevent overflow */
    .dataframe-container td {
        white-space: normal !important;
        word-break: break-word !important;
    }

    /* Set column widths */
    [data-testid="block-container"] .scrolling-dataframe th:nth-child(1),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(1) {
        width: 5%; /* Start column */
    }

    [data-testid="block-container"] .scrolling-dataframe th:nth-child(2),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(2) {
        width: 45%; /* Original text */
    }

    [data-testid="block-container"] .scrolling-dataframe th:nth-child(3),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(3) {
        width: 45%; /* Translated text */
    }

    [data-testid="block-container"] .scrolling-dataframe th:nth-child(4),
    [data-testid="block-container"] .scrolling-dataframe td:nth-child(4) {
        width: 5%; /* End column */
    }
    """
    with gr.Blocks(css=css) as demo:
        gr.Markdown("## Video Localization")
        with gr.Row():
            with gr.Column(scale=4):
                file_input = gr.File(label="Upload Video/Audio File")
                language_input = gr.Dropdown(["en", "es", "fr", "zh"], label="Select Language")
                process_mode = gr.Radio(
                    choices=["Transcription", "Transcription with Voiceover"],
                    label="Choose Processing Type",
                    value="Transcription"
                )
                submit_button = gr.Button("Post and Process")
                editable_translations = gr.State(value=[])

            with gr.Column(scale=8):
                gr.Markdown("## Edit Translations")
                editable_table = gr.Dataframe(
                    value=[],
                    headers=["start", "original", "translated", "end"],
                    datatype=["number", "str", "str", "number"],
                    row_count=1,
                    col_count=4,
                    # gr.Dataframe takes a single boolean here; per-column
                    # read-only control is not supported.
                    interactive=True,
                    label="Edit Translations",
                    wrap=True
                )
                save_changes_button = gr.Button("Save Changes")
                processed_video_output = gr.File(label="Download Processed Video", interactive=True)
                elapsed_time_display = gr.Textbox(label="Elapsed Time", lines=1, interactive=False)

            with gr.Column(scale=1):
                gr.Markdown("**Feedback**")
                feedback_input = gr.Textbox(
                    placeholder="Leave your feedback here...",
                    show_label=False,
                    lines=3,
                )
                feedback_btn = gr.Button("Submit Feedback")
                response_message = gr.Textbox(show_label=False, lines=1, interactive=False)
                db_download = gr.File(label="Download Database File", visible=False)

        def feedback_submission(feedback):
            # Reveal the database download only when the passcode matched.
            message, file_path = handle_feedback(feedback)
            if file_path:
                return message, gr.update(value=file_path, visible=True)
            return message, gr.update(visible=False)

        save_changes_button.click(
            update_translations,
            inputs=[file_input, editable_table],
            outputs=[processed_video_output, elapsed_time_display]
        )

        submit_button.click(
            upload_and_manage,
            inputs=[file_input, language_input, process_mode],
            outputs=[editable_translations, editable_table, processed_video_output, elapsed_time_display]
        )

        feedback_btn.click(
            feedback_submission,
            inputs=[feedback_input],
            outputs=[response_message, db_download]
        )

    return demo

demo = build_interface()

if __name__ == "__main__":
    demo.launch()
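# For a temporary public URL while testing, launch with
# demo.launch(share=True) instead.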