from elevenlabs import VoiceSettings
from elevenlabs.client import ElevenLabs
from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer
import whisper
from ai71 import AI71
from datetime import datetime
import os
import time
from pydub import AudioSegment
from IPython.display import Audio, display, Video, HTML

from base64 import b64encode
import gradio as gr
import concurrent.futures

# API keys are read from environment variables
AI71_API_KEY = os.getenv('AI71_API_KEY')
XI_API_KEY = os.getenv('ELEVEN_LABS_API_KEY')
client = ElevenLabs(api_key=XI_API_KEY)
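# Example (illustrative): both keys are expected to be set in the environment
# before the script runs, e.g. from a shell. The values below are placeholders.
#
#   export AI71_API_KEY="your-ai71-key"
#   export ELEVEN_LABS_API_KEY="your-elevenlabs-key"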

# Translation model and tokenizer (M2M100) plus the Whisper transcriber
model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_1.2B")
tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_1.2B")
transcriber = whisper.load_model("turbo")

# Language names shown in the UI, mapped to the codes used for translation and dubbing
language_codes = {"English": "en", "Hindi": "hi", "Portuguese": "pt", "Chinese": "zh", "Spanish": "es",
                  "French": "fr", "German": "de", "Japanese": "ja", "Arabic": "ar", "Russian": "ru",
                  "Korean": "ko", "Indonesian": "id", "Italian": "it", "Dutch": "nl", "Turkish": "tr",
                  "Polish": "pl", "Swedish": "sv", "Filipino": "fil", "Malay": "ms", "Romanian": "ro",
                  "Ukrainian": "uk", "Greek": "el", "Czech": "cs", "Danish": "da", "Finnish": "fi",
                  "Bulgarian": "bg", "Croatian": "hr", "Slovak": "sk"}

# Global state: speaker-wise utterances collected for the minutes of meeting
meeting_texts = []
n_participants = 4
language_choices = ["English", "Polish", "Hindi", "Arabic"]

def wait_for_dubbing_completion(dubbing_id: str) -> bool:
    """
    Waits for the dubbing process to complete by periodically checking the status.

    Args:
        dubbing_id (str): The dubbing project id.

    Returns:
        bool: True if the dubbing is successful, False otherwise.
    """
    MAX_ATTEMPTS = 120
    CHECK_INTERVAL = 10

    for _ in range(MAX_ATTEMPTS):
        metadata = client.dubbing.get_dubbing_project_metadata(dubbing_id)
        if metadata.status == "dubbed":
            return True
        elif metadata.status == "dubbing":
            print(
                "Dubbing in progress... Will check status again in",
                CHECK_INTERVAL,
                "seconds.",
            )
            time.sleep(CHECK_INTERVAL)
        else:
            print("Dubbing failed:", metadata.error_message)
            return False

    print("Dubbing timed out")
    return False

def download_dubbed_file(dubbing_id: str, language_code: str) -> str:
    """
    Downloads the dubbed file for a given dubbing ID and language code.

    Args:
        dubbing_id: The ID of the dubbing project.
        language_code: The language code for the dubbing.

    Returns:
        The file path to the downloaded dubbed file.
    """
    dir_path = f"data/{dubbing_id}"
    os.makedirs(dir_path, exist_ok=True)

    file_path = f"{dir_path}/{language_code}.mp4"
    with open(file_path, "wb") as file:
        for chunk in client.dubbing.get_dubbed_file(dubbing_id, language_code):
            file.write(chunk)

    return file_path

def create_dub_from_file(
    input_file_path: str,
    file_format: str,
    source_language: str,
    target_language: str,
):
    """
    Dubs an audio or video file from one language to another and saves the output.

    Args:
        input_file_path (str): The file path of the audio or video to dub.
        file_format (str): The file format of the input file.
        source_language (str): The language of the input file.
        target_language (str): The target language to dub into.

    Returns:
        Optional[str]: The file path of the dubbed file or None if the operation failed.
    """
    if not os.path.isfile(input_file_path):
        raise FileNotFoundError(f"The input file does not exist: {input_file_path}")

    with open(input_file_path, "rb") as audio_file:
        response = client.dubbing.dub_a_video_or_an_audio_file(
            file=(os.path.basename(input_file_path), audio_file, file_format),
            target_lang=target_language,
            source_lang=source_language,
            num_speakers=1,
            watermark=True,
        )

    dubbing_id = response.dubbing_id
    if wait_for_dubbing_completion(dubbing_id):
        output_file_path = download_dubbed_file(dubbing_id, target_language)
        return output_file_path
    else:
        return None

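# Example (illustrative): dubbing a local recording end to end.
# "sample_recording.mp4" is a hypothetical file path; the language codes come from
# the language_codes mapping above. create_dub_from_file blocks until
# wait_for_dubbing_completion reports success, then returns the downloaded path.
#
# dubbed_path = create_dub_from_file(
#     input_file_path="sample_recording.mp4",
#     file_format="audio/mpeg",
#     source_language="en",
#     target_language="hi",
# )
# if dubbed_path:
#     print("Dubbed file saved to", dubbed_path)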
def summarize(meeting_texts=meeting_texts):
    """Generates the minutes of meeting from the speaker-wise conversation collected so far."""
    meeting_date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # One "Speaker_x: utterance" line per turn, prefixed with the meeting timestamp
    conversation_lines = [f"{speaker}: {text}" for turn in meeting_texts for speaker, text in turn.items()]
    meeting_conversation_processed = meeting_date_time + '\n' + '\n'.join(conversation_lines)

    minutes_of_meeting = ""
    for chunk in AI71(AI71_API_KEY.strip()).chat.completions.create(
        model="tiiuae/falcon-180b-chat",
        messages=[
            {"role": "system", "content": f"""You are an experienced secretary who can summarize meeting discussions into minutes of meeting.
            Summarize the meeting discussion provided as a speaker-wise conversation. Ensure to mention the title as 'Minutes of Meeting held on {meeting_date_time}' and present the summary in a readable format with the title in bold letters."""},
            {"role": "user", "content": meeting_conversation_processed},
        ],
        stream=True,
    ):
        if chunk.choices[0].delta.content:
            minutes_of_meeting += chunk.choices[0].delta.content
    minutes_of_meeting = minutes_of_meeting.replace('User:', '').strip()
    print("\n")
    print(minutes_of_meeting)
    return minutes_of_meeting

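# Example (illustrative): meeting_texts holds one dict per utterance, as appended by
# process_speaker below. The texts here are hypothetical.
#
# meeting_texts = [
#     {"Speaker_1": "Good morning, shall we review the roadmap?"},
#     {"Speaker_2": "Yes, let's start with the Q3 milestones."},
# ]
# minutes = summarize(meeting_texts)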
def speech_to_text(video):
    print('Started transcribing')
    # Extract the audio track from the uploaded video and transcribe it with Whisper
    audio = AudioSegment.from_file(video, format="mp4")
    audio.export('temp.mp3', format="mp3")
    transcript = transcriber.transcribe('temp.mp3')['text']
    print('transcript:', transcript)
    return transcript

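# Example (illustrative): transcribe a participant's uploaded clip.
# "participant1.mp4" is a hypothetical file path.
#
# transcript = speech_to_text("participant1.mp4")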
def translate_text(text, source_language, target_language):
    tokenizer.src_lang = source_language
    encoded_ln = tokenizer(text, return_tensors="pt")
    generated_tokens = model.generate(**encoded_ln, forced_bos_token_id=tokenizer.get_lang_id(target_language))
    translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
    print('translated_text:', translated_text)
    return translated_text

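# Example (illustrative): translate an English sentence into Hindi using the
# M2M100 codes from language_codes ("en" -> "hi"). The sentence is a placeholder.
#
# hindi_text = translate_text("The meeting starts at noon.", "en", "hi")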
def synthesize_speech(video, source_language, target_language):
    print('Started dubbing')
    dub_video = create_dub_from_file(input_file_path=video,
                                     file_format='audio/mpeg',
                                     source_language=source_language,
                                     target_language=target_language)
    return dub_video

def process_speaker(video, speaker_idx, n_participants, *language_list):
    transcript = speech_to_text(video)

    outputs = []
    global meeting_texts

    def process_translation_dubbing(i):
        # For every participant other than the speaker, translate the transcript
        # and dub the speaker's video into that participant's language.
        if i != speaker_idx:
            participant_language = language_codes[language_list[i]]
            speaker_language = language_codes[language_list[speaker_idx]]
            translated_text = translate_text(transcript, speaker_language, participant_language)
            dubbed_video = synthesize_speech(video, speaker_language, participant_language)
            return translated_text, dubbed_video
        return None, None

    # Translate and dub for all participants in parallel
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_translation_dubbing, i) for i in range(n_participants)]
        results = [f.result() for f in futures]

    # Output order expected by the Gradio callback: the speaker's transcript first,
    # then a (translated text, dubbed video) pair for each other participant.
    for i, (translated_text, dubbed_video) in enumerate(results):
        if i == speaker_idx:
            outputs.insert(0, transcript)
        else:
            outputs.append(translated_text)
            outputs.append(dubbed_video)

    # Record the utterance for the minutes: the raw transcript for Speaker 1,
    # otherwise the translation into Participant 1's language (outputs[1]),
    # so the minutes stay in a single language.
    if speaker_idx == 0:
        meeting_texts.append({f"Speaker_{speaker_idx+1}": outputs[0]})
    else:
        meeting_texts.append({f"Speaker_{speaker_idx+1}": outputs[1]})

    print(len(outputs))
    print(outputs)
    print('meeting_texts: ', meeting_texts)
    return outputs

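# Example (illustrative): process Participant 2's turn (speaker_idx=1) in a
# four-person call whose languages follow language_choices. "speaker2.mp4" is a
# hypothetical recording. outputs[0] is the speaker's transcript; the remaining
# items are (translated text, dubbed video) pairs for the other participants.
#
# outputs = process_speaker("speaker2.mp4", 1, 4, "English", "Polish", "Hindi", "Arabic")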
def create_participant_row(i, language_choices):
    """Creates the UI for a single participant."""
    with gr.Row():
        video_input = gr.Video(label=f"Participant {i+1} Video", interactive=True)
        language_dropdown = gr.Dropdown(choices=language_choices, label=f"Participant {i+1} Language", value=language_choices[i])
        transcript_output = gr.Textbox(label=f"Participant {i+1} Transcript")
        translated_text = gr.Textbox(label="Speaker's Translated Text")
        dubbed_video = gr.Video(label="Speaker's Dubbed Video")
    return video_input, language_dropdown, transcript_output, translated_text, dubbed_video

def create_gradio_interface(n_participants, language_choices):
    with gr.Blocks() as demo:
        gr.Markdown("# Multilingual Conference Call Simulation")

        video_inputs = []
        language_dropdowns = []
        transcript_outputs = []
        translated_texts = []
        dubbed_videos = []

        for i in range(n_participants):
            video_input, language_dropdown, transcript_output, translated_text, dubbed_video = create_participant_row(i, language_choices)
            video_inputs.append(video_input)
            language_dropdowns.append(language_dropdown)
            transcript_outputs.append(transcript_output)
            translated_texts.append(translated_text)
            dubbed_videos.append(dubbed_video)

        # Each submit button sends that speaker's video plus every participant's language
        # to process_speaker; outputs go to the speaker's transcript box and to the other
        # participants' translated-text and dubbed-video components.
        for i in range(n_participants):
            gr.Button(f"Submit Speaker {i+1}'s Speech").click(
                process_speaker,
                [video_inputs[i], gr.State(i), gr.State(n_participants)] + [language_dropdowns[j] for j in range(n_participants)],
                [transcript_outputs[i]] + [k for j in zip(translated_texts[:i] + translated_texts[i+1:], dubbed_videos[:i] + dubbed_videos[i+1:]) for k in j]
            )

        minutes = gr.Textbox(label="Minutes of Meeting")
        gr.Button("Generate Minutes of meeting").click(summarize, None, minutes)

    demo.queue().launch(debug=True, share=True)


create_gradio_interface(n_participants, language_choices)