Spaces:
Runtime error
Runtime error
| from elevenlabs import VoiceSettings | |
| from elevenlabs.client import ElevenLabs | |
| from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer | |
| import whisper | |
| from ai71 import AI71 | |
| from datetime import datetime | |
| import os | |
| import time | |
| from pydub import AudioSegment | |
| # from IPython.display import Audio, display, Video, HTML | |
| # import assemblyai as aai | |
| from base64 import b64encode | |
| import gradio as gr | |
| import concurrent.futures | |
| import assemblyai as aai | |
# AssemblyAI speech-to-text client. The key is read from the environment so
# that no credential is committed together with the source.
# NOTE(review): the key that was previously hard-coded on this line has been
# published with the file and should be rotated.
aai.settings.api_key = os.getenv('ASSEMBLYAI_API_KEY')
transcriber = aai.Transcriber()

# Credentials for the AI71 (summarisation) and ElevenLabs (dubbing) services.
AI71_API_KEY = os.getenv('AI71_API_KEY')
XI_API_KEY = os.getenv('ELEVEN_LABS_API_KEY')
client = ElevenLabs(api_key=XI_API_KEY)

# M2M100 model and tokenizer used by translate_text().
model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_1.2B")
tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_1.2B")
# Alternative local transcriber, kept for reference:
# transcriber = whisper.load_model("turbo")
# Display name -> language code handed to the translation and dubbing calls.
language_codes = {
    "English": "en",
    "Hindi": "hi",
    "Portuguese": "pt",
    "Chinese": "zh",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Japanese": "ja",
    "Arabic": "ar",
    "Russian": "ru",
    "Korean": "ko",
    "Indonesian": "id",
    "Italian": "it",
    "Dutch": "nl",
    "Turkish": "tr",
    "Polish": "pl",
    "Swedish": "sv",
    "Filipino": "fil",
    "Malay": "ms",
    "Romanian": "ro",
    "Ukrainian": "uk",
    "Greek": "el",
    "Czech": "cs",
    "Danish": "da",
    "Finnish": "fi",
    "Bulgarian": "bg",
    "Croatian": "hr",
    "Slovak": "sk",
}

# One {"Speaker_N": text} entry is appended per processed speech.
meeting_texts = list()

# This can be adjusted based on the number of people in the call
n_participants = 4

# Languages offered in each participant's dropdown; entry i is the default
# selection for participant i.
language_choices = [
    "English",
    "Polish",
    "Hindi",
    "Arabic",
]
def wait_for_dubbing_completion(
    dubbing_id: str,
    max_attempts: int = 120,
    check_interval: float = 10,
) -> bool:
    """
    Waits for the dubbing process to complete by periodically checking the status.

    Args:
        dubbing_id (str): The dubbing project id.
        max_attempts (int): Maximum number of status checks before giving up.
        check_interval (float): Seconds to sleep between two status checks.

    Returns:
        bool: True if the dubbing is successful, False if the project reports
        any status other than "dubbed"/"dubbing" or the attempts run out.
    """
    for _ in range(max_attempts):
        metadata = client.dubbing.get_dubbing_project_metadata(dubbing_id)
        if metadata.status == "dubbed":
            return True
        if metadata.status != "dubbing":
            # Any status other than "dubbed"/"dubbing" is treated as a failure.
            print("Dubbing failed:", metadata.error_message)
            return False
        print(
            "Dubbing in progress... Will check status again in",
            check_interval,
            "seconds.",
        )
        time.sleep(check_interval)

    print("Dubbing timed out")
    return False
def download_dubbed_file(dubbing_id: str, language_code: str) -> str:
    """
    Streams the dubbed output of a dubbing project to a local file.

    The file is written to ``data/<dubbing_id>/<language_code>.mp4``; the
    directory is created when it does not exist yet.

    Args:
        dubbing_id: The ID of the dubbing project.
        language_code: The language code for the dubbing.

    Returns:
        The file path to the downloaded dubbed file.
    """
    target_dir = f"data/{dubbing_id}"
    os.makedirs(target_dir, exist_ok=True)

    destination = f"{target_dir}/{language_code}.mp4"
    with open(destination, "wb") as out:
        # The client yields the file in chunks; write them as they arrive.
        out.writelines(client.dubbing.get_dubbed_file(dubbing_id, language_code))

    return destination
def create_dub_from_file(
    input_file_path: str,
    file_format: str,
    source_language: str,
    target_language: str,
):
    """
    Submits a media file for dubbing, waits for the result and downloads it.

    Args:
        input_file_path (str): The file path of the audio or video to dub.
        file_format (str): The file format of the input file.
        source_language (str): The language of the input file.
        target_language (str): The target language to dub into.

    Returns:
        The file path of the dubbed file, or None if the dubbing did not
        complete successfully.

    Raises:
        FileNotFoundError: If ``input_file_path`` is not an existing file.
    """
    if not os.path.isfile(input_file_path):
        raise FileNotFoundError(f"The input file does not exist: {input_file_path}")

    upload_name = os.path.basename(input_file_path)
    with open(input_file_path, "rb") as media:
        job = client.dubbing.dub_a_video_or_an_audio_file(
            file=(upload_name, media, file_format),
            target_lang=target_language,
            source_lang=source_language,
            num_speakers=1,  # a single speaker is assumed per submission
            watermark=True,
        )

    job_id = job.dubbing_id
    if not wait_for_dubbing_completion(job_id):
        return None
    return download_dubbed_file(job_id, target_language)
def summarize(meeting_texts=meeting_texts):
    """
    Produces minutes of the meeting from the recorded speaker texts.

    The entries are flattened into one "Speaker: text" string, prefixed with
    the current date and time, and sent to the AI71 chat completion endpoint.
    The streamed answer is concatenated, stripped of any 'User:' marker,
    printed and returned.

    Args:
        meeting_texts: List of single-entry dicts mapping a speaker label to
            the text that speaker said. Defaults to the module-level list.

    Returns:
        str: The generated minutes of the meeting.

    Raises:
        RuntimeError: If the AI71 API key is not configured.
    """
    if not AI71_API_KEY:
        # Fail with an actionable message instead of an AttributeError on None.
        raise RuntimeError("AI71_API_KEY environment variable is not set")

    meeting_date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conversation = ', '.join(
        f"{speaker}: {text}"
        for entry in meeting_texts
        for speaker, text in entry.items()
    )
    # Kept in its own name so the list parameter is not rebound to a string.
    meeting_transcript = meeting_date_time + '\n' + conversation

    system_prompt = (
        "You are an expereiced Secretary who can summarize meeting discussions into minutes of meeting.\n"
        "Summarize the meetings discussions provided as Speakerwise conversation.\n"
        f"Strictly consider only the context given in user content {meeting_transcript} for summarization.\n"
        f"Ensure to mention the title as 'Minutes of Meeting held on {meeting_date_time}' "
        "and present the summary with better viewing format and title in bold letters"
    )

    stream = AI71(AI71_API_KEY.strip()).chat.completions.create(
        model="tiiuae/falcon-180b-chat",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": meeting_transcript},
        ],
        stream=True,
    )

    # Collect the streamed fragments and join once at the end.
    pieces = []
    for chunk in stream:
        fragment = chunk.choices[0].delta.content
        if fragment:
            pieces.append(fragment)

    minutes_of_meeting = ''.join(pieces).replace('User:', '').strip()
    print("\n")
    print("minutes_of_meeting:", minutes_of_meeting)
    return minutes_of_meeting
# Speech to text conversion through the AssemblyAI transcriber
def speech_to_text(video):
    """
    Transcribes the speech in a media file and returns the recognised text.

    Args:
        video: Path (or URL) of the media file handed to the transcriber.

    Returns:
        str: The transcript text.

    Raises:
        RuntimeError: If the transcription service reports an error.
    """
    print('Started transcribing')
    transcript = transcriber.transcribe(video)
    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(f"Transcription failed: {transcript.error}")
    # Callers need the plain text (for translation and for the Textbox),
    # not the Transcript object itself.
    text = transcript.text
    print('transcript:', text)
    return text
# Text translation with the module-level model and tokenizer
def translate_text(text, source_language,target_language):
    """
    Translates ``text`` from ``source_language`` into ``target_language``.

    Args:
        text: The text to translate.
        source_language: Language code of ``text``.
        target_language: Language code to translate into.

    Returns:
        The first decoded translation, which is also printed.
    """
    # The source language is configured on the shared tokenizer.
    tokenizer.src_lang = source_language
    model_inputs = tokenizer(text, return_tensors="pt")
    # Generation is forced to start with the target-language token.
    target_token_id = tokenizer.get_lang_id(target_language)
    output_ids = model.generate(**model_inputs, forced_bos_token_id=target_token_id)
    decoded = tokenizer.batch_decode(output_ids, skip_special_tokens=True)
    translated_text = decoded[0]
    print('translated_text:', translated_text)
    return translated_text
# Dubbing of the speaker's recording into another language
def synthesize_speech(video, source_language,target_language):
    """
    Dubs ``video`` from ``source_language`` into ``target_language``.

    Delegates to create_dub_from_file with the 'audio/mpeg' file format.

    Returns:
        Whatever create_dub_from_file returns: the path of the dubbed file,
        or None when dubbing did not succeed.
    """
    print('Started dubbing')
    return create_dub_from_file(video, 'audio/mpeg', source_language, target_language)
# This function handles the processing when any participant speaks
def process_speaker(video, speaker_idx, n_participants, *language_list):
    """
    Transcribes one speaker's recording and translates/dubs it for everyone else.

    Args:
        video: The speaker's recording.
        speaker_idx: Zero-based index of the participant who spoke.
        n_participants: Number of participants in the call.
        *language_list: Selected language name of each participant, by index.

    Returns:
        list: The transcript first, followed by a (translated text, dubbed
        video) pair for every other participant in index order.

    Side effects:
        Appends {"Speaker_<n>": text} to the module-level ``meeting_texts``.
        The text is the transcript when the first participant speaks, and
        otherwise the translation made for the first participant.
    """
    global meeting_texts
    transcript = speech_to_text(video)

    def process_translation_dubbing(listener_idx):
        # The speaker needs neither a translation nor a dub of their own speech.
        if listener_idx == speaker_idx:
            return None, None
        participant_language = language_codes[language_list[listener_idx]]
        speaker_language = language_codes[language_list[speaker_idx]]
        translated_text = translate_text(transcript, speaker_language, participant_language)
        dubbed_video = synthesize_speech(video, speaker_language, participant_language)
        return translated_text, dubbed_video

    # One task per participant; results are collected in participant order.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        pending = [
            executor.submit(process_translation_dubbing, idx)
            for idx in range(n_participants)
        ]
        results = [job.result() for job in pending]

    # Create outputs for each participant
    outputs = []
    for idx, pair in enumerate(results):
        if idx == speaker_idx:
            # The transcript always goes to the front of the output list.
            outputs.insert(0, transcript)
            continue
        outputs.extend(pair)

    minutes_entry = outputs[0] if speaker_idx == 0 else outputs[1]
    meeting_texts.append({f"Speaker_{speaker_idx+1}": minutes_entry})

    print(len(outputs))
    print(outputs)
    print('meeting_texts: ',meeting_texts)
    return outputs
def create_participant_row(i, language_choices):
    """
    Creates the UI for a single participant.

    Args:
        i: Zero-based participant index, used in the labels and to pick the
            default language.
        language_choices: Language names offered in the dropdown.

    Returns:
        tuple: (video input, language dropdown, transcript textbox,
        translated-text textbox, dubbed-video player).
    """
    # Wrap around so that having more participants than language choices does
    # not raise IndexError; no preselection when there are no choices at all.
    if language_choices:
        default_language = language_choices[i % len(language_choices)]
    else:
        default_language = None

    with gr.Row():
        video_input = gr.Video(label=f"Participant {i+1} Video", interactive=True)
        language_dropdown = gr.Dropdown(
            choices=language_choices,
            label=f"Participant {i+1} Language",
            value=default_language,
        )
        transcript_output = gr.Textbox(label=f"Participant {i+1} Transcript")
        translated_text = gr.Textbox(label="Speaker's Translated Text")
        dubbed_video = gr.Video(label="Speaker's Dubbed Video")
    return video_input, language_dropdown, transcript_output, translated_text, dubbed_video
# Main dynamic Gradio interface
def create_gradio_interface(n_participants, language_choices):
    """
    Builds and launches the conference-call UI.

    One row of components is created per participant, followed by one submit
    button per participant and a button that generates the minutes.

    Args:
        n_participants: Number of participant rows and submit buttons.
        language_choices: Language names offered in every dropdown.
    """
    with gr.Blocks() as demo:
        gr.Markdown("# LinguaPolis: Bridging Languages, Uniting Teams Globally - Multilingual Conference Call Simulation")

        # Create a row for each participant
        rows = [
            create_participant_row(idx, language_choices)
            for idx in range(n_participants)
        ]
        video_inputs = [row[0] for row in rows]
        language_dropdowns = [row[1] for row in rows]
        transcript_outputs = [row[2] for row in rows]
        translated_texts = [row[3] for row in rows]
        dubbed_videos = [row[4] for row in rows]

        # Create dynamic processing buttons for each participant
        for speaker in range(n_participants):
            submit = gr.Button(f"Submit Speaker {speaker+1}'s Speech")

            # Inputs: the speaker's video, the speaker index, the participant
            # count, then every participant's language dropdown.
            inputs = [video_inputs[speaker], gr.State(speaker), gr.State(n_participants)]
            inputs.extend(language_dropdowns)

            # Outputs: the speaker's transcript box, then the translated-text
            # and dubbed-video components of every other participant. This is
            # the same order in which process_speaker returns its values.
            outputs = [transcript_outputs[speaker]]
            for listener in range(n_participants):
                if listener != speaker:
                    outputs.append(translated_texts[listener])
                    outputs.append(dubbed_videos[listener])

            submit.click(process_speaker, inputs, outputs)

        minutes = gr.Textbox(label="Minutes of Meeting")
        gr.Button("Generate Minutes of meeting").click(summarize, None, minutes)

    # Launch with .queue() to keep it running properly in Jupyter
    demo.queue().launch(debug=True, share=True)
# Build and launch the interface with the module-level configuration.
create_gradio_interface(
    n_participants=n_participants,
    language_choices=language_choices,
)