Spaces:
Runtime error
Runtime error
| from elevenlabs import VoiceSettings | |
| from elevenlabs.client import ElevenLabs | |
| from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer | |
| from ai71 import AI71 | |
| from datetime import datetime | |
| import os | |
| import time | |
| from pydub import AudioSegment | |
| from base64 import b64encode | |
| import gradio as gr | |
| import concurrent.futures | |
# --- Credentials -----------------------------------------------------------
# Both keys are read from the environment; os.getenv yields None when a
# variable is not set.
AI71_API_KEY = os.getenv('AI71_API_KEY')
XI_API_KEY = os.getenv('ELEVEN_LABS_API_KEY')

# --- Service clients and models (created once, at import time) --------------
client = ElevenLabs(api_key=XI_API_KEY)  # used by the dubbing helpers
translator = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_1.2B")
tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_1.2B")
# Speech-to-text interface loaded through Gradio
# (a local `whisper.load_model("turbo")` was used previously).
transcriber = gr.load("models/openai/whisper-large-v3-turbo")

# --- Language table ---------------------------------------------------------
# Display name -> language code passed to translate_text / synthesize_speech.
language_codes = {
    "English": "en",
    "Hindi": "hi",
    "Portuguese": "pt",
    "Chinese": "zh",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Japanese": "ja",
    "Arabic": "ar",
    "Russian": "ru",
    "Korean": "ko",
    "Indonesian": "id",
    "Italian": "it",
    "Dutch": "nl",
    "Turkish": "tr",
    "Polish": "pl",
    "Swedish": "sv",
    "Filipino": "fil",
    "Malay": "ms",
    "Romanian": "ro",
    "Ukrainian": "uk",
    "Greek": "el",
    "Czech": "cs",
    "Danish": "da",
    "Finnish": "fi",
    "Bulgarian": "bg",
    "Croatian": "hr",
    "Slovak": "sk",
}

# --- Call layout ------------------------------------------------------------
# The running list of meeting texts is no longer a module global; it is kept
# in a gr.State created inside create_gradio_interface.
n_participants = 4  # This can be adjusted based on the number of people in the call
language_choices = ["English", "Polish", "Hindi", "Arabic"]
def clear_all():
    """Reset every widget wired to the "Clear All" button.

    The button's output list is, in order: the participant videos, the
    transcripts, the translated texts, the dubbed videos, the minutes textbox
    and finally the ``meeting_texts`` state.  That is ``n_participants * 4 + 1``
    visual components plus one state value, so this function must return
    ``n_participants * 4 + 2`` items.

    Returns:
        list: ``None`` for each visual component (which clears it), followed
        by a fresh empty list for the ``meeting_texts`` state so that later
        ``append`` calls keep working.
    """
    global meeting_texts
    # Legacy module-level copy; kept so code that still reads the global sees
    # an empty list after a reset.
    meeting_texts = []

    cleared_components = [None] * (n_participants * 4 + 1)
    # The state must be reset to an empty list, not None: process_speaker
    # appends to it and summarize iterates over it.
    return cleared_components + [[]]
def wait_for_dubbing_completion(dubbing_id: str) -> bool:
    """
    Poll a dubbing project until it finishes, fails, or the poll budget runs out.

    Args:
        dubbing_id (str): The dubbing project id.

    Returns:
        bool: True once the project reports the status "dubbed"; False when it
        reports any status other than "dubbed"/"dubbing", or when it is still
        "dubbing" after the last poll.
    """
    max_polls = 120
    poll_delay = 10  # In seconds

    polls_done = 0
    while polls_done < max_polls:
        polls_done += 1
        project = client.dubbing.get_dubbing_project_metadata(dubbing_id)
        state = project.status

        if state == "dubbed":
            return True

        if state != "dubbing":
            # Anything that is neither finished nor in progress is a failure.
            print("Dubbing failed:", project.error_message)
            return False

        print(
            "Dubbing in progress... Will check status again in",
            poll_delay,
            "seconds.",
        )
        time.sleep(poll_delay)

    print("Dubbing timed out")
    return False
def download_dubbed_file(dubbing_id: str, language_code: str) -> str:
    """
    Save the dubbed output of a project to ``data/<dubbing_id>/<language_code>.mp4``.

    Args:
        dubbing_id: The ID of the dubbing project.
        language_code: The language code of the dubbed track to fetch.

    Returns:
        The path of the file that was written.
    """
    target_dir = f"data/{dubbing_id}"
    os.makedirs(target_dir, exist_ok=True)

    destination = f"{target_dir}/{language_code}.mp4"
    with open(destination, "wb") as sink:
        # The client yields the file as a sequence of byte chunks.
        sink.writelines(client.dubbing.get_dubbed_file(dubbing_id, language_code))

    return destination
def create_dub_from_file(
    input_file_path: str,
    file_format: str,
    source_language: str,
    target_language: str,
):
    """
    Submit a media file for dubbing, wait for the result and download it.

    Args:
        input_file_path (str): Path of the audio or video file to dub.
        file_format (str): Format string sent along with the uploaded file.
        source_language (str): Language code of the input file.
        target_language (str): Language code to dub into.

    Returns:
        The path of the downloaded dubbed file, or None when
        wait_for_dubbing_completion reports that dubbing did not succeed.

    Raises:
        FileNotFoundError: If ``input_file_path`` is not an existing file.
    """
    if not os.path.isfile(input_file_path):
        raise FileNotFoundError(f"The input file does not exist: {input_file_path}")

    upload_name = os.path.basename(input_file_path)
    with open(input_file_path, "rb") as media:
        job = client.dubbing.dub_a_video_or_an_audio_file(
            file=(upload_name, media, file_format),
            target_lang=target_language,
            source_lang=source_language,
            num_speakers=1,  # a single speaker per submitted clip
            watermark=True,  # request a watermarked output video
        )

    project_id = job.dubbing_id
    if not wait_for_dubbing_completion(project_id):
        return None
    return download_dubbed_file(project_id, target_language)
def summarize(meeting_texts):
    """Produce minutes of meeting from the recorded speaker texts.

    Args:
        meeting_texts: List of dicts, each mapping a speaker label to what
            that speaker said (process_speaker appends one
            ``{"Speaker_N": text}`` dict per submission).  May be empty or
            None when nothing has been recorded.

    Returns:
        str: The minutes generated by the chat model with any ``User:``
        markers removed, or a short notice when there is nothing to
        summarize.
    """
    # Nothing recorded yet: do not send an empty conversation to the model.
    if not meeting_texts:
        return "No meeting discussion has been recorded yet."

    conversation = ', '.join(
        f"{speaker}: {text}"
        for entry in meeting_texts
        for speaker, text in entry.items()
    )
    meeting_date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    system_prompt = (
        "You are an experienced Secretary who can summarize meeting discussions into minutes of meeting.\n"
        "Summarize the meeting discussions provided in json format as Speakerwise conversation.\n"
        "Strictly consider ONLY the context given in user content for summarization. "
        "Do not generalize the summary with irrelevant content.\n"
        f"Ensure to mention the title as 'Minutes of Meeting held on {meeting_date_time}' and\n"
        "present the summary with better viewing format and title in bold letters"
    )

    stream = AI71(AI71_API_KEY.strip()).chat.completions.create(
        model="tiiuae/falcon-180b-chat",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": conversation},
        ],
        stream=True,
    )

    # Collect the streamed pieces first and clean up once at the end;
    # stripping after every chunk would eat the spaces between chunks.
    pieces = []
    for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece:
            pieces.append(piece)
    minutes_of_meeting = "".join(pieces).replace('User:', '').strip()

    print("\n")
    print("minutes_of_meeting:", minutes_of_meeting)
    return minutes_of_meeting
def _extract_transcript(raw_output):
    """Return the spoken text contained in the transcriber's raw output."""
    import ast
    import re

    raw_text = str(raw_output)

    # NOTE(review): assumes the loaded interface returns the repr of a result
    # object, e.g. "...(text=' Hello', chunks=None)" - confirm against the
    # installed Gradio version.  Parsing the quoted literal (instead of
    # splitting on "'") keeps transcripts that contain apostrophes intact.
    single_quoted = r"'(?:[^'\\]|\\.)*'"
    double_quoted = r'"(?:[^"\\]|\\.)*"'
    literal = re.search(rf"text=({single_quoted}|{double_quoted})", raw_text, re.DOTALL)
    if literal is not None:
        quoted = literal.group(1)
        try:
            return ast.literal_eval(quoted).strip()
        except (ValueError, SyntaxError):
            # Not a valid Python literal after all: just drop the quotes.
            return quoted[1:-1].strip()

    # Unknown shape: keep the previous "text between the first two single
    # quotes" behaviour when possible, otherwise treat the output as plain text.
    parts = raw_text.split("'")
    if len(parts) >= 3:
        return parts[1].strip()
    return raw_text.strip()


# Speech-to-text: extract the audio track and transcribe it
def speech_to_text(video):
    """Transcribe the speech in a recorded or uploaded media file.

    The audio is exported to ``temp.wav`` in the working directory and that
    file is handed to the module-level ``transcriber``.

    Args:
        video: Path of the media file to transcribe.

    Returns:
        str: The transcript with surrounding whitespace removed.
    """
    print(video, type(video))
    print('Started transcribing')

    audio = AudioSegment.from_file(video)
    # NOTE(review): the fixed file name is shared by every call; two
    # submissions handled at the same time would overwrite each other's audio.
    audio.export('temp.wav', format="wav")

    transcript = _extract_transcript(transcriber("temp.wav"))
    print('transcript:', transcript)
    return transcript
# Text translation with the M2M100 model
def translate_text(text, source_language, target_language):
    """Translate ``text`` from ``source_language`` into ``target_language``.

    Both languages are given as language codes (the values of
    ``language_codes``).  The source language is set on the shared
    module-level tokenizer before encoding.

    Returns:
        str: The first decoded sequence produced by the translator.
    """
    tokenizer.src_lang = source_language
    model_inputs = tokenizer(text, return_tensors="pt")

    # Forcing the first generated token selects the output language.
    target_token_id = tokenizer.get_lang_id(target_language)
    output_ids = translator.generate(**model_inputs, forced_bos_token_id=target_token_id)

    decoded = tokenizer.batch_decode(output_ids, skip_special_tokens=True)
    translated_text = decoded[0]
    print('translated_text:', translated_text)
    return translated_text
# Dubbing: re-voice the speaker's clip in another language
def synthesize_speech(video, source_language, target_language):
    """Dub ``video`` from ``source_language`` into ``target_language``.

    Thin wrapper around create_dub_from_file that always submits the file
    with the format string 'audio/mpeg'.

    Returns:
        Whatever create_dub_from_file returns: the path of the dubbed file,
        or None when dubbing did not succeed.
    """
    print('Started dubbing')
    dub_request = {
        "input_file_path": video,
        "file_format": 'audio/mpeg',
        "source_language": source_language,
        "target_language": target_language,
    }
    return create_dub_from_file(**dub_request)
| # # This function handles the processing when any participant speaks | |
| # def process_speaker(video, speaker_idx, n_participants, *language_list): | |
| # transcript = speech_to_text(video) | |
| # # Create outputs for each participant | |
| # outputs = [] | |
| # global meeting_texts | |
| # def process_translation_dubbing(i): | |
| # if i != speaker_idx: | |
| # participant_language = language_codes[language_list[i]] | |
| # speaker_language = language_codes[language_list[speaker_idx]] | |
| # translated_text = translate_text(transcript, speaker_language, participant_language) | |
| # dubbed_video = synthesize_speech(video, speaker_language, participant_language) | |
| # return translated_text, dubbed_video | |
| # return None, None | |
| # with concurrent.futures.ThreadPoolExecutor() as executor: | |
| # futures = [executor.submit(process_translation_dubbing, i) for i in range(n_participants)] | |
| # results = [f.result() for f in futures] | |
| # for i, (translated_text, dubbed_video) in enumerate(results): | |
| # if i == speaker_idx: | |
| # outputs.insert(0, transcript) | |
| # else: | |
| # outputs.append(translated_text) | |
| # outputs.append(dubbed_video) | |
| # if speaker_idx == 0: | |
| # meeting_texts.append({f"Speaker_{speaker_idx+1}":outputs[0]}) | |
| # else: | |
| # meeting_texts.append({f"Speaker_{speaker_idx+1}":outputs[1]}) | |
| # print(len(outputs)) | |
| # print(outputs) | |
| # print('meeting_texts: ',meeting_texts) | |
| # return outputs | |
| # def create_participant_row(i, language_choices): | |
| # """Creates the UI for a single participant.""" | |
| # with gr.Row(): | |
| # video_input = gr.Video(label=f"Participant {i+1} Video", interactive=True) | |
| # language_dropdown = gr.Dropdown(choices=language_choices, label=f"Participant {i+1} Language", value=language_choices[i]) | |
| # transcript_output = gr.Textbox(label=f"Participant {i+1} Transcript") | |
| # translated_text = gr.Textbox(label="Speaker's Translated Text") | |
| # dubbed_video = gr.Video(label="Speaker's Dubbed Video") | |
| # return video_input, language_dropdown, transcript_output, translated_text, dubbed_video | |
| # # Main dynamic Gradio interface | |
| # def create_gradio_interface(n_participants, language_choices): | |
| # with gr.Blocks() as demo: | |
| # gr.Markdown("""# LinguaPolis: Bridging Languages, Uniting Teams Globally - Multilingual Conference Call Simulation | |
| # ## Record your video or upload your video and press the corresponding Submit button at the bottom""") | |
| # video_inputs = [] | |
| # language_dropdowns = [] | |
| # transcript_outputs = [] | |
| # translated_texts = [] | |
| # dubbed_videos = [] | |
| # clear_button = gr.Button("Clear All") | |
| # # Create a row for each participant | |
| # for i in range(n_participants): | |
| # video_input, language_dropdown, transcript_output, translated_text, dubbed_video = create_participant_row(i, language_choices) | |
| # video_inputs.append(video_input) | |
| # language_dropdowns.append(language_dropdown) | |
| # transcript_outputs.append(transcript_output) | |
| # translated_texts.append(translated_text) | |
| # dubbed_videos.append(dubbed_video) | |
| # # Create dynamic processing buttons for each participant | |
| # for i in range(n_participants): | |
| # gr.Button(f"Submit Speaker {i+1}'s Speech").click( | |
| # process_speaker, | |
| # [video_inputs[i], gr.State(i), gr.State(n_participants)] + [language_dropdowns[j] for j in range(n_participants)], | |
| # [transcript_outputs[i]] + [k for j in zip(translated_texts[:i]+translated_texts[i+1:], dubbed_videos[:i]+dubbed_videos[i+1:]) for k in j] | |
| # ) | |
| # minutes = gr.Textbox(label="Minutes of Meeting") | |
| # gr.Button(f"Generate Minutes of meeting").click(summarize, None, minutes) | |
| # # Clear button to reset inputs and outputs | |
| # clear_button.click(clear_all, None, [*video_inputs, *transcript_outputs, *translated_texts, *dubbed_videos, minutes]) | |
| # # Launch with .queue() to keep it running properly in Jupyter | |
| # demo.queue().launch(debug=True, share=True) | |
| # create_gradio_interface(n_participants, language_choices) | |
| # def create_dub_from_file( | |
| # input_file_path: str, | |
| # file_format: str, | |
| # source_language: str, | |
| # target_language: str, | |
| # ): | |
| # # ) -> Optional[str]: | |
| # """ | |
| # Dubs an audio or video file from one language to another and saves the output. | |
| # Args: | |
| # input_file_path (str): The file path of the audio or video to dub. | |
| # file_format (str): The file format of the input file. | |
| # source_language (str): The language of the input file. | |
| # target_language (str): The target language to dub into. | |
| # Returns: | |
| # Optional[str]: The file path of the dubbed file or None if operation failed. | |
| # """ | |
| # if not os.path.isfile(input_file_path): | |
| # raise FileNotFoundError(f"The input file does not exist: {input_file_path}") | |
| # with open(input_file_path, "rb") as audio_file: | |
| # response = client.dubbing.dub_a_video_or_an_audio_file( | |
| # file=(os.path.basename(input_file_path), audio_file, file_format), # Optional file | |
| # target_lang=target_language, # The target language to dub the content into. Can be none if dubbing studio editor is enabled and running manual mode | |
| # # mode="automatic", # automatic or manual. | |
| # source_lang=source_language, # Source language | |
| # num_speakers=1, # Number of speakers to use for the dubbing. | |
| # watermark=True, # Whether to apply watermark to the output video. | |
| # ) | |
| # # rest of the code | |
| # dubbing_id = response.dubbing_id | |
| # if wait_for_dubbing_completion(dubbing_id): | |
| # output_file_path = download_dubbed_file(dubbing_id, target_language) | |
| # return output_file_path | |
| # else: | |
| # return None | |
| # # Modify the summarize function to accept and return meeting_texts | |
| # def summarize(meeting_texts): | |
| # meeting_texts = ', '.join([f"{k}: {v}" for i in meeting_texts for k, v in i.items()]) | |
| # meeting_date_time = str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) | |
| # # meeting_texts_str = meeting_date_time + '\n' + mt | |
| # minutes_of_meeting = "" | |
| # for chunk in AI71(AI71_API_KEY.strip()).chat.completions.create( | |
| # model="tiiuae/falcon-180b-chat", | |
| # messages=[ | |
| # {"role": "system", "content": f"""You are an experienced Secretary who can summarize meeting discussions into minutes of meeting. | |
| # Summarize the meetings discussions provided as Speakerwise conversation. | |
| # Strictly consider only the context given in user content for summarization. | |
| # Ensure to mention the title as 'Minutes of Meeting held on {meeting_date_time}' and present the summary with better viewing format and title in bold letters."""}, | |
| # {"role": "user", "content": meeting_texts}, | |
| # ], | |
| # stream=True, | |
| # ): | |
| # if chunk.choices[0].delta.content: | |
| # summary = chunk.choices[0].delta.content | |
| # minutes_of_meeting += summary | |
| # minutes_of_meeting = minutes_of_meeting.replace('User:', '').strip() | |
| # print("minutes_of_meeting:", minutes_of_meeting) | |
| # return minutes_of_meeting | |
| # # Placeholder function for speech to text conversion | |
| # def speech_to_text(video): | |
| # print(video, type(video)) | |
| # print('Started transcribing') | |
| # audio = AudioSegment.from_file(video) | |
| # audio.export('temp.wav', format="wav") | |
| # # transcript = transcriber.transcribe(video).text | |
| # # transcript = transcriber.transcribe(video).text | |
| # transcript = transcriber("temp.wav").split("'")[1].strip() | |
| # print('transcript:', transcript) | |
| # return transcript | |
| # # Placeholder function for translating text | |
| # def translate_text(text, source_language,target_language): | |
| # tokenizer.src_lang = source_language | |
| # encoded_ln = tokenizer(text, return_tensors="pt") | |
| # generated_tokens = translator.generate(**encoded_ln, forced_bos_token_id=tokenizer.get_lang_id(target_language)) | |
| # translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0] | |
| # print('translated_text:', translated_text) | |
| # return translated_text | |
| # # Placeholder function for dubbing (text-to-speech in another language) | |
| # def synthesize_speech(video, source_language,target_language): | |
| # print('Started dubbing') | |
| # dub_video = create_dub_from_file(input_file_path = video, | |
| # file_format = 'audio/mpeg', | |
| # source_language = source_language, | |
| # target_language = target_language) | |
| # return dub_video | |
# Handles one participant's submission; meeting_texts is passed in and returned
def process_speaker(video, speaker_idx, n_participants, meeting_texts, *language_list):
    """Transcribe one speaker's clip and translate/dub it for everyone else.

    Args:
        video: Path of the speaker's recorded or uploaded clip.
        speaker_idx (int): Zero-based index of the participant who spoke.
        n_participants (int): Number of participants in the call.
        meeting_texts (list | None): Running list of ``{"Speaker_N": text}``
            dicts; a None value is treated as an empty list.
        *language_list: Selected language name of every participant, in
            participant order (keys of ``language_codes``).

    Returns:
        list: ``[transcript, text_0, video_0, text_1, video_1, ...,
        meeting_texts]`` where the text/video pairs cover every participant
        except the speaker, in participant order.  This matches the output
        components wired to the speaker's submit button.

    Raises:
        gr.Error: If no video was supplied.
    """
    if video is None:
        # Give the user a readable message instead of a decoder traceback.
        raise gr.Error("Please record or upload a video before submitting.")
    if meeting_texts is None:
        # The state can arrive as None after a reset; start a fresh list.
        meeting_texts = []

    transcript = speech_to_text(video)
    speaker_language = language_codes[language_list[speaker_idx]]

    def process_translation_dubbing(i):
        """Translate and dub the clip into participant i's language."""
        participant_language = language_codes[language_list[i]]
        translated_text = translate_text(transcript, speaker_language, participant_language)
        dubbed_video = synthesize_speech(video, speaker_language, participant_language)
        return translated_text, dubbed_video

    # Only the listeners need work; the speaker just gets the transcript.
    listener_indices = [i for i in range(n_participants) if i != speaker_idx]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_translation_dubbing, i) for i in listener_indices]
        results = [future.result() for future in futures]

    outputs = [transcript]
    for translated_text, dubbed_video in results:
        outputs.extend((translated_text, dubbed_video))

    # Participant 1's own words are stored as spoken.  For any other speaker
    # outputs[1] is the translation into participant 1's language, so all
    # recorded texts end up in that one language.
    recorded_text = outputs[0] if speaker_idx == 0 else outputs[1]
    meeting_texts.append({f"Speaker_{speaker_idx+1}": recorded_text})

    print("meeting_texts:", meeting_texts)
    print('outputs:', outputs)

    outputs.append(meeting_texts)
    return outputs
def create_participant_row(i, language_choices):
    """Create the UI row for a single participant.

    Args:
        i (int): Zero-based participant index, used in the labels.
        language_choices (list[str]): Language names offered in the dropdown.

    Returns:
        tuple: ``(video_input, language_dropdown, transcript_output,
        translated_text, dubbed_video)`` components of the row.
    """
    # Cycle through the choices so that having more participants than
    # languages does not raise an IndexError.
    if language_choices:
        default_language = language_choices[i % len(language_choices)]
    else:
        default_language = None

    with gr.Row():
        video_input = gr.Video(label=f"Participant {i+1} Video", interactive=True)
        language_dropdown = gr.Dropdown(
            choices=language_choices,
            label=f"Participant {i+1} Language",
            value=default_language,
        )
        transcript_output = gr.Textbox(label=f"Participant {i+1} Transcript")
        translated_text = gr.Textbox(label="Speaker's Translated Text")
        dubbed_video = gr.Video(label="Speaker's Dubbed Video")

    return video_input, language_dropdown, transcript_output, translated_text, dubbed_video
# Builds the Gradio UI; meeting_texts travels between calls as a gr.State
def create_gradio_interface(n_participants, language_choices):
    """Build and launch the conference-call demo.

    Layout, top to bottom: heading, "Clear All" button, one row per
    participant, one submit button per participant, the minutes textbox and
    the button that generates the minutes.

    Args:
        n_participants (int): Number of participant rows to create.
        language_choices (list[str]): Language names offered in each dropdown.
    """
    heading = (
        "# LinguaPolis: Bridging Languages, Uniting Teams Globally - Multilingual Conference Call Simulation\n"
        "## Record your video or upload your video and press the corresponding Submit button at the bottom"
    )

    with gr.Blocks() as demo:
        gr.Markdown(heading)

        clear_button = gr.Button("Clear All")
        meeting_texts = gr.State([])  # running list of what each speaker said

        # One row of components per participant, then regrouped by column.
        rows = [create_participant_row(idx, language_choices) for idx in range(n_participants)]
        columns = [[row[col] for row in rows] for col in range(5)]
        video_inputs, language_dropdowns, transcript_outputs, translated_texts, dubbed_videos = columns

        # One submit button per participant.
        for idx in range(n_participants):
            # Translated text and dubbed video of every *other* participant.
            listener_outputs = []
            for other in range(n_participants):
                if other != idx:
                    listener_outputs.append(translated_texts[other])
                    listener_outputs.append(dubbed_videos[other])

            submit_button = gr.Button(f"Submit Speaker {idx+1}'s Speech")
            submit_button.click(
                process_speaker,
                [video_inputs[idx], gr.State(idx), gr.State(n_participants), meeting_texts]
                + list(language_dropdowns),
                [transcript_outputs[idx]] + listener_outputs + [meeting_texts],
            )

        minutes = gr.Textbox(label="Minutes of Meeting")
        minutes_button = gr.Button("Generate Minutes of meeting")
        minutes_button.click(summarize, [meeting_texts], minutes)

        # Clear button resets the inputs, the outputs and the recorded texts.
        everything = video_inputs + transcript_outputs + translated_texts + dubbed_videos
        clear_button.click(clear_all, None, everything + [minutes, meeting_texts])

    demo.launch(debug=True, share=True)
# Use the shared participant count so the UI and clear_all (which sizes its
# return value from n_participants) cannot drift apart.
create_gradio_interface(n_participants, language_choices)