| | from elevenlabs import VoiceSettings |
| | from elevenlabs.client import ElevenLabs |
| | from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer |
| | from ai71 import AI71 |
| | from datetime import datetime |
| | import os |
| | import time |
| | from pydub import AudioSegment |
| | from base64 import b64encode |
| | import gradio as gr |
| | import concurrent.futures |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
# --- API credentials and clients -----------------------------------------
# Keys come from the environment; either may be None if unset, in which
# case the corresponding API call fails at request time, not here.
AI71_API_KEY = os.getenv('AI71_API_KEY')
XI_API_KEY = os.getenv('ELEVEN_LABS_API_KEY')
client = ElevenLabs(api_key=XI_API_KEY)

# --- Translation model ----------------------------------------------------
# Facebook M2M100 (1.2B) many-to-many multilingual translation model and
# its tokenizer; downloaded from the Hugging Face hub on first run.
translator = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_1.2B")
tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_1.2B")

# --- Speech-to-text -------------------------------------------------------
# Whisper large-v3-turbo loaded through Gradio's hosted-model wrapper;
# calling `transcriber(path)` runs remote inference.
transcriber = gr.load("models/openai/whisper-large-v3-turbo")

# Display name -> ISO-style language code, shared by the M2M100 translator
# and the ElevenLabs dubbing API.
language_codes = {"English":"en", "Hindi":"hi", "Portuguese":"pt", "Chinese":"zh", "Spanish":"es",
                  "French":"fr", "German":"de", "Japanese":"ja", "Arabic":"ar", "Russian":"ru",
                  "Korean":"ko", "Indonesian":"id", "Italian":"it", "Dutch":"nl","Turkish":"tr",
                  "Polish":"pl", "Swedish":"sv", "Filipino":"fil", "Malay":"ms", "Romanian":"ro",
                  "Ukrainian":"uk", "Greek":"el", "Czech":"cs", "Danish":"da", "Finnish":"fi",
                  "Bulgarian":"bg", "Croatian":"hr", "Slovak":"sk"}

# UI layout: fixed number of participant rows and their default languages.
n_participants = 4
language_choices = ["English", "Polish", "Hindi", "Arabic"]
| |
|
def clear_all(n_outputs=None):
    """Blank every output component of the interface.

    Args:
        n_outputs: Number of components to clear. Defaults to the
            module-level layout: four components per participant (video,
            transcript, translated text, dubbed video) plus the
            minutes-of-meeting textbox.

    Returns:
        list: ``None`` per component; Gradio maps each ``None`` onto one
        output component, resetting it.
    """
    # NOTE(review): the previous local ``meeting_texts = []`` was a no-op —
    # it never touched the shared meeting log, so it has been removed.
    # The log itself is still NOT cleared here; clearing it would require
    # access to the list created in create_gradio_interface.
    if n_outputs is None:
        n_outputs = n_participants * 4 + 1
    return [None] * n_outputs
| |
|
| |
|
def wait_for_dubbing_completion(dubbing_id: str) -> bool:
    """Poll the ElevenLabs API until the dubbing project finishes.

    Args:
        dubbing_id (str): The dubbing project id.

    Returns:
        bool: True when the project reaches the "dubbed" state; False if
        the API reports any other terminal state or the polling budget
        (120 checks, 10 s apart) is exhausted.
    """
    max_attempts = 120
    check_interval = 10  # seconds between status polls

    attempts = 0
    while attempts < max_attempts:
        metadata = client.dubbing.get_dubbing_project_metadata(dubbing_id)
        if metadata.status == "dubbed":
            return True
        if metadata.status != "dubbing":
            # Any status other than dubbed/dubbing is treated as failure.
            print("Dubbing failed:", metadata.error_message)
            return False
        print(f"Dubbing in progress... Will check status again in {check_interval} seconds.")
        time.sleep(check_interval)
        attempts += 1

    print("Dubbing timed out")
    return False
| |
|
def download_dubbed_file(dubbing_id: str, language_code: str) -> str:
    """Stream the dubbed media for a finished project to local disk.

    Args:
        dubbing_id: The ID of the dubbing project.
        language_code: The language code of the requested dub.

    Returns:
        The file path the dubbed media was written to
        (``data/<dubbing_id>/<language_code>.mp4``).
    """
    target_dir = f"data/{dubbing_id}"
    os.makedirs(target_dir, exist_ok=True)

    target_path = f"{target_dir}/{language_code}.mp4"
    # The API yields the media as an iterable of byte chunks; write them
    # straight through without buffering the whole file in memory.
    with open(target_path, "wb") as out:
        out.writelines(client.dubbing.get_dubbed_file(dubbing_id, language_code))

    return target_path
| |
|
def create_dub_from_file(
    input_file_path: str,
    file_format: str,
    source_language: str,
    target_language: str,
):
    """Dub an audio or video file into another language via ElevenLabs.

    Args:
        input_file_path (str): The file path of the audio or video to dub.
        file_format (str): MIME type of the input file (e.g. "audio/mpeg").
        source_language (str): Language code of the input audio.
        target_language (str): Language code to dub into.

    Returns:
        Optional[str]: Path of the downloaded dubbed file, or None if the
        dubbing job failed or timed out.

    Raises:
        FileNotFoundError: If ``input_file_path`` does not exist.
    """
    if not os.path.isfile(input_file_path):
        raise FileNotFoundError(f"The input file does not exist: {input_file_path}")

    # Submit the dubbing job; the file handle only needs to stay open for
    # the upload itself.
    with open(input_file_path, "rb") as media:
        job = client.dubbing.dub_a_video_or_an_audio_file(
            file=(os.path.basename(input_file_path), media, file_format),
            target_lang=target_language,
            source_lang=source_language,
            num_speakers=1,
            watermark=True,
        )

    # Block until the remote job settles, then fetch the result.
    if not wait_for_dubbing_completion(job.dubbing_id):
        return None
    return download_dubbed_file(job.dubbing_id, target_language)
| |
|
| |
|
def summarize(meeting_texts):
    """Generate minutes of meeting from the collected speaker turns.

    Args:
        meeting_texts: List of single-entry dicts mapping a speaker label
            (e.g. "Speaker_1") to that speaker's text for one turn.

    Returns:
        str: The minutes, streamed from the Falcon-180B chat model, with
        any echoed "User:" prefixes removed.
    """
    # Flatten the per-turn dicts into one "Speaker_k: text, ..." transcript.
    mt = ', '.join([f"{k}: {v}" for i in meeting_texts for k, v in i.items()])
    meeting_date_time = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    meeting_texts = meeting_date_time + '\n' + mt

    minutes_of_meeting = ""
    # Stream the completion; accumulate only non-empty delta chunks.
    # (Fixed vs. original: "expereiced" typo and the unclosed quote around
    # the title template in the system prompt; the original also re-appended
    # a stale `summary` variable on empty chunks depending on indentation.)
    for chunk in AI71(AI71_API_KEY.strip()).chat.completions.create(
        model="tiiuae/falcon-180b-chat",
        messages=[
            {"role": "system", "content": f"""You are an experienced Secretary who can summarize meeting discussions into minutes of meeting.
            Summarize the meetings discussions provided as Speakerwise conversation.
            Strictly consider only the context given in user content {meeting_texts} for summarization.
            Ensure to mention the title as 'Minutes of Meeting held on {meeting_date_time}' and present the summary with better viewing format and title in bold letters"""},
            {"role": "user", "content": meeting_texts},
        ],
        stream=True,
    ):
        content = chunk.choices[0].delta.content
        if content:
            minutes_of_meeting += content

    minutes_of_meeting = minutes_of_meeting.replace('User:', '').strip()
    print("\n")
    print("minutes_of_meeting:", minutes_of_meeting)
    return minutes_of_meeting
| |
|
| |
|
| | |
def speech_to_text(video):
    """Extract the audio track from *video* and transcribe it with Whisper.

    Args:
        video: Path to a media file readable by pydub/ffmpeg.

    Returns:
        str: The transcribed text.
    """
    print(video, type(video))
    print('Started transcribing')
    # Re-encode the track to WAV, which the transcriber accepts.
    # NOTE(review): the fixed filename 'temp.wav' is not safe if two
    # speakers submit concurrently — consider tempfile.NamedTemporaryFile.
    audio = AudioSegment.from_file(video)
    audio.export('temp.wav', format="wav")
    # The gr.load() wrapper returns the model response as a string; this
    # takes the first single-quoted segment of it as the transcript.
    # NOTE(review): fragile — breaks if the transcript itself contains an
    # apostrophe or the response format changes; TODO confirm the wrapper's
    # actual output shape.
    transcript = transcriber("temp.wav").split("'")[1].strip()

    print('transcript:', transcript)
    return transcript
| |
|
| | |
def translate_text(text, source_language, target_language):
    """Translate *text* between two languages with M2M100.

    Args:
        text: Source-language text to translate.
        source_language: M2M100 language code of the input.
        target_language: M2M100 language code of the output.

    Returns:
        str: The translated text.
    """
    # Tell the tokenizer which language the input is in, then force the
    # decoder to start in the target language.
    tokenizer.src_lang = source_language
    model_inputs = tokenizer(text, return_tensors="pt")
    output_ids = translator.generate(
        **model_inputs,
        forced_bos_token_id=tokenizer.get_lang_id(target_language),
    )
    translated_text = tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0]
    print('translated_text:', translated_text)
    return translated_text
| |
|
| | |
def synthesize_speech(video, source_language, target_language):
    """Dub *video* from ``source_language`` into ``target_language``.

    Thin wrapper around :func:`create_dub_from_file` with the fixed
    "audio/mpeg" upload format.

    Returns:
        Optional[str]: Path of the dubbed file, or None on failure.
    """
    print('Started dubbing')
    return create_dub_from_file(
        input_file_path=video,
        file_format='audio/mpeg',
        source_language=source_language,
        target_language=target_language,
    )
| |
|
| | |
def process_speaker(video, speaker_idx, n_participants, meeting_texts, *language_list):
    """Handle one speaker's submission: transcribe the clip, then translate
    and dub it for every other participant.

    Args:
        video: Path of the speaker's recorded clip.
        speaker_idx: Zero-based index of the speaker among the participants.
        n_participants: Total number of participants.
        meeting_texts: Shared list of {speaker_label: text} dicts; this
            turn's text is appended in place.
        *language_list: Each participant's selected language name (a key of
            ``language_codes``), indexed by participant position.

    Returns:
        list: [transcript, then (translated_text, dubbed_video) pairs for
        the other participants in order] with the ``meeting_texts`` entries
        extended onto the end for the gr.State output.
    """
    transcript = speech_to_text(video)

    outputs = []

    def process_translation_dubbing(i):
        # The speaker's own slot gets no translation/dub.
        if i != speaker_idx:
            participant_language = language_codes[language_list[i]]
            speaker_language = language_codes[language_list[speaker_idx]]
            translated_text = translate_text(transcript, speaker_language, participant_language)
            dubbed_video = synthesize_speech(video, speaker_language, participant_language)
            return translated_text, dubbed_video
        return None, None

    # Run all translate+dub jobs concurrently; each dub blocks on a remote
    # ElevenLabs job, so threads overlap the waiting.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_translation_dubbing, i) for i in range(n_participants)]
        results = [f.result() for f in futures]

    # insert(0, ...) always lands the transcript at index 0; the (text,
    # video) pairs follow in participant order, skipping the speaker.
    for i, (translated_text, dubbed_video) in enumerate(results):
        if i == speaker_idx:
            outputs.insert(0, transcript)
        else:
            outputs.append(translated_text)
            outputs.append(dubbed_video)
    # Log this turn: the raw transcript for speaker 0, otherwise outputs[1]
    # — the translation into participant 0's language, presumably to keep
    # the log in a single language for summarization; TODO confirm intended.
    if speaker_idx == 0:
        meeting_texts.append({f"Speaker_{speaker_idx+1}":outputs[0]})
    else:
        meeting_texts.append({f"Speaker_{speaker_idx+1}":outputs[1]})

    print(len(outputs))
    print(outputs)
    # NOTE(review): extend() adds one element per logged turn, but the
    # click wiring maps only a single trailing gr.State output — verify the
    # returned length stays in sync with the output component list.
    outputs.extend(meeting_texts)
    print('meeting_texts: ',meeting_texts)
    return outputs
| |
|
| | |
def create_participant_row(i, language_choices):
    """Build the widget row for participant *i*.

    Args:
        i: Zero-based participant index (used for labels and the default
            language selection).
        language_choices: Language names offered in the dropdown.

    Returns:
        tuple: (video_input, language_dropdown, transcript_output,
        translated_text, dubbed_video) Gradio components.
    """
    with gr.Row():
        video_input = gr.Video(label=f"Participant {i+1} Video", interactive=True)
        language_dropdown = gr.Dropdown(
            choices=language_choices,
            label=f"Participant {i+1} Language",
            value=language_choices[i],
        )
        transcript_output = gr.Textbox(label=f"Participant {i+1} Transcript")
        translated_text = gr.Textbox(label="Speaker's Translated Text")
        dubbed_video = gr.Video(label="Speaker's Dubbed Video")
    return video_input, language_dropdown, transcript_output, translated_text, dubbed_video
| |
|
| | |
| | |
def create_gradio_interface(n_participants, language_choices):
    """Build and launch the Gradio UI for the multilingual meeting demo.

    Args:
        n_participants: Number of participant rows to render.
        language_choices: Language names (keys of ``language_codes``) used
            as each participant's dropdown choices and defaults.
    """
    with gr.Blocks() as demo:
        gr.Markdown("""# LinguaPolis: Bridging Languages, Uniting Teams Globally - Multilingual Conference Call Simulation
        ## Record your video or upload your video and press the corresponding Submit button at the bottom""")
        # Shared conversation log, mutated in place by process_speaker.
        # NOTE(review): it is captured here as a plain Python list and also
        # wrapped in gr.State below — confirm the intended sharing semantics.
        meeting_texts = []

        video_inputs = []
        language_dropdowns = []
        transcript_outputs = []
        translated_texts = []
        dubbed_videos = []

        clear_button = gr.Button("Clear All")

        # One row of widgets per participant.
        for i in range(n_participants):
            video_input, language_dropdown, transcript_output, translated_text, dubbed_video = create_participant_row(i, language_choices)
            video_inputs.append(video_input)
            language_dropdowns.append(language_dropdown)
            transcript_outputs.append(transcript_output)
            translated_texts.append(translated_text)
            dubbed_videos.append(dubbed_video)

        # One submit button per speaker. Inputs: that speaker's video, the
        # fixed index/count as State, the shared log, and every language
        # dropdown. Outputs: the speaker's transcript, then interleaved
        # (translated text, dubbed video) pairs for all OTHER participants,
        # then the log State — mirroring process_speaker's return layout.
        for i in range(n_participants):
            gr.Button(f"Submit Speaker {i+1}'s Speech").click(
                process_speaker,

                [video_inputs[i], gr.State(i), gr.State(n_participants)] + [gr.State(meeting_texts)] + [language_dropdowns[j] for j in range(n_participants)],
                [transcript_outputs[i]] + [k for j in zip(translated_texts[:i]+translated_texts[i+1:], dubbed_videos[:i]+dubbed_videos[i+1:]) for k in j] + [gr.State(meeting_texts)]
            )
        minutes = gr.Textbox(label="Minutes of Meeting")
        # NOTE(review): `meeting_texts` is passed as the inputs argument as a
        # plain list rather than component(s) — verify this delivers the log
        # to summarize() as intended.
        gr.Button(f"Generate Minutes of meeting").click(summarize, meeting_texts, minutes)

        # Reset every participant widget plus the minutes textbox.
        clear_button.click(clear_all, None, [*video_inputs, *transcript_outputs, *translated_texts, *dubbed_videos, minutes])

    # queue() enables concurrent event handling; share=True opens a public
    # tunnel URL. Blocks here until the server exits (debug mode).
    demo.queue().launch(debug=True, share=True)
| |
|
| |
|
# Build and launch the UI at import time; blocks until the server exits.
create_gradio_interface(n_participants, language_choices)