"""Gradio app: transcribe a YouTube video (or uploaded file) with whisper and
chat over the transcript via a LangChain ConversationalRetrievalChain."""

import os
import datetime as dt
from typing import Any, Generator

import gradio as gr
import whisper
from langchain.chains import ConversationalRetrievalChain
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from pytube import YouTube

# --- Mutable app-wide state (cleared by reset_vars) ---
chat_history = []        # (question, answer) pairs fed back into the chain
result = None            # cached whisper transcription dict
chain = None             # cached ConversationalRetrievalChain
run_once_flag = False    # guards make_chain from rebuilding the vector store
call_to_load_video = 0   # guards process_text from re-transcribing

# Pre-built component updates reused by the UI callbacks below.
enable_box = gr.Textbox.update(value=None, placeholder='Upload your OpenAI API key', interactive=True)
disable_box = gr.Textbox.update(value='OpenAI API key is Set', interactive=False)
remove_box = gr.Textbox.update(value='Your API key successfully removed', interactive=False)
pause = gr.Button.update(interactive=False)
resume = gr.Button.update(interactive=True)


def set_apikey(api_key):
    """Store the user's OpenAI API key in the environment and lock the textbox."""
    os.environ['OPENAI_API_KEY'] = api_key
    return disable_box


def enable_api_box():
    """Re-enable the API-key textbox so a new key can be entered."""
    return enable_box


def remove_key_box():
    """Clear the stored OpenAI API key and confirm removal in the textbox."""
    os.environ['OPENAI_API_KEY'] = ''
    return remove_box


def reset_vars():
    """Reset all global state and clear every UI component (chat, query, video, embed)."""
    global chat_history, result, chain, run_once_flag, call_to_load_video
    os.environ['OPENAI_API_KEY'] = ''
    # FIX: was `chat_history = None`, but QuestionAnswer extends it with `+= [...]`,
    # which would raise TypeError after a reset. Reset to an empty list instead.
    chat_history = []
    result, chain = None, None
    run_once_flag, call_to_load_video = False, 0
    return [], '', gr.Video.update(value=None), gr.HTML.update(value=None)


def load_video(url: str) -> str:
    """Download the audio-only stream of a YouTube video into /tmp/Youtube.

    Returns the local file path; reuses a previously downloaded file when present.
    Raises gr.Error if the download fails.
    """
    yt = YouTube(url)
    target_dir = os.path.join('/tmp', 'Youtube')
    # FIX: os.mkdir after a non-atomic exists() check could race; makedirs is idempotent.
    os.makedirs(target_dir, exist_ok=True)
    file_path = os.path.join(target_dir, yt.title + '.mp4')
    if os.path.exists(file_path):
        return file_path
    try:
        stream = yt.streams.get_audio_only()
        print('----DOWNLOADING AUDIO FILE----')
        stream.download(output_path=target_dir)
    except Exception as err:
        # FIX: was a bare `except:` which also swallowed KeyboardInterrupt/SystemExit.
        raise gr.Error('Issue in Downloading video') from err
    return file_path


def process_video(video=None, url=None) -> dict[str, str | list]:
    """Transcribe a local video file (or a YouTube URL) with whisper's base model."""
    file_dir = load_video(url) if url else video
    print('Transcribing Video with whisper base model')
    model = whisper.load_model("base")
    return model.transcribe(file_dir)


def process_text(video=None, url=None) -> tuple[list, list[dt.datetime]]:
    """Transcribe once, then merge transcript segments into ~30-second groups.

    Returns (grouped_texts, time_list) where time_list[i] is the start time of
    grouped_texts[i] as a datetime (date part is the epoch date; only the time
    component is meaningful).
    """
    # FIX: `result` was assigned locally only when call_to_load_video == 0, so any
    # later call raised UnboundLocalError instead of reusing the cached transcript.
    global call_to_load_video, result
    if call_to_load_video == 0:
        print('yes')
        result = process_video(url=url) if url else process_video(video=video)
        call_to_load_video += 1

    texts, start_time_list = [], []
    for seg in result['segments']:
        start_time = dt.datetime.fromtimestamp(seg['start'])
        texts.append(seg['text'])
        start_time_list.append(start_time.strftime("%H:%M:%S"))

    # Map each segment text to its parsed start timestamp. NOTE(review): using the
    # text as dict key silently drops duplicate segment texts — confirm acceptable.
    texts_with_timestamps = dict(zip(texts, start_time_list))
    formatted_texts = {
        text: dt.datetime.strptime(str(timestamp), '%H:%M:%S')
        for text, timestamp in texts_with_timestamps.items()
    }

    grouped_texts = []
    current_group = ''
    time_list = [list(formatted_texts.values())[0]]
    previous_time = None
    time_difference = dt.timedelta(seconds=30)

    for text, timestamp in formatted_texts.items():
        if previous_time is None or timestamp - previous_time <= time_difference:
            current_group += text
        else:
            grouped_texts.append(current_group)
            time_list.append(timestamp)
            current_group = text
        # NOTE(review): tracks the start of the *current group* (time_list[-1]),
        # not the last segment's timestamp — groups therefore cap at ~30s total.
        previous_time = time_list[-1]

    # Flush the final (possibly only) group.
    if current_group:
        grouped_texts.append(current_group)
    return grouped_texts, time_list


def get_title(url, video):
    """Return a display title: the YouTube title for a URL, else the upload's basename."""
    print(url, video)
    if url is not None:
        return YouTube(url).title
    title = os.path.basename(video)
    return title[:-4]  # strip the 4-character extension, e.g. '.mp4'


def check_path(url=None, video=None):
    """Report whether the media referenced by url/video already exists on disk."""
    if url:
        yt = YouTube(url)
        # FIX: original built '/tmp/Youtube' + title with no path separator, so the
        # check could never match the path load_video actually writes to.
        return os.path.exists(os.path.join('/tmp', 'Youtube', yt.title + '.mp4'))
    # Guard against video=None, which would make os.path.exists raise TypeError.
    return bool(video and os.path.exists(video))


def make_chain(url=None, video=None) -> (ConversationalRetrievalChain | Any | None):
    """Build (once) and cache the retrieval chain over the grouped transcript.

    Raises gr.Error when neither a URL nor a video is supplied.
    """
    global chain, run_once_flag
    if not url and not video:
        raise gr.Error('Please provide a Youtube link or Upload a video')
    if run_once_flag:
        return chain  # already built for this session

    run_once_flag = True
    grouped_texts, time_list = process_text(url=url) if url else process_text(video=video)
    # Each chunk's metadata records its start time so answers can cite timestamps.
    metadatas = [{'source': str(t.time())} for t in time_list]
    vector_store = Chroma.from_texts(
        texts=grouped_texts,
        collection_name='test',
        embedding=OpenAIEmbeddings(),
        metadatas=metadatas,
    )
    chain = ConversationalRetrievalChain.from_llm(
        ChatOpenAI(temperature=0.0),
        retriever=vector_store.as_retriever(search_kwargs={"k": 5}),
        return_source_documents=True,
    )
    return chain


def QuestionAnswer(history, query=None, url=None, video=None) -> Generator[Any | None, Any, None]:
    """Answer `query` against the transcript chain, streaming characters into the chat UI."""
    global chat_history, chain
    if video and url:
        raise gr.Error('Upload a video or a Youtube link, not both')
    elif not url and not video:
        raise gr.Error('Provide a Youtube link or Upload a video')
    result = chain({"question": query, 'chat_history': chat_history}, return_only_outputs=True)
    chat_history += [(query, result["answer"])]
    for char in result['answer']:
        history[-1][-1] += char
        yield history, ''


def add_text(history, text):
    """Append the user's message with an empty answer slot to the chat history."""
    if not text:
        raise gr.Error('enter text')
    # FIX: use a mutable list (not a tuple) so QuestionAnswer can stream into
    # history[-1][-1] without a TypeError.
    history = history + [[text, '']]
    return history


def embed_yt(yt_link: str):
    """Reset transcription state, build the chain for a YouTube link, return embed HTML."""
    if not yt_link:
        raise gr.Error('Paste a Youtube link')
    # FIX: without `global`, the original assignments created locals and the
    # module-level flags were never actually reset between videos.
    global run_once_flag, call_to_load_video
    run_once_flag = False
    call_to_load_video = 0
    make_chain(url=yt_link)
    url = yt_link.replace('watch?v=', '/embed/')
    # FIX: original emitted an empty f-string, so nothing was ever embedded.
    embed_html = (
        f'<iframe width="750" height="360" src="{url}" '
        f'title="YouTube video player" frameborder="0" '
        f'allow="accelerometer; autoplay; clipboard-write; encrypted-media; '
        f'gyroscope; picture-in-picture" allowfullscreen></iframe>'
    )
    return embed_html, []


def embed_video(video: str | None = None):
    """Reset the run flag, build the chain for an uploaded video, and return it."""
    # FIX: original signature was `video=str | None`, making the *type object* the
    # default value — truthy, so the missing-upload guard could never fire.
    if not video:
        raise gr.Error('Upload a Video')
    global run_once_flag
    run_once_flag = False
    make_chain(video=video)
    return video, []


update_video = gr.Video.update(value=None)
update_yt = gr.HTML.update(value=None)

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=0.70):
            api_key = gr.Textbox(placeholder='Enter OpenAI API key',
                                 show_label=False, interactive=True).style(container=False)
        with gr.Column(scale=0.15):
            change_api_key = gr.Button('Change Key')
        with gr.Column(scale=0.15):
            remove_key = gr.Button('Remove Key')

    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(value=[]).style(height=650)
            query = gr.Textbox(placeholder='Enter query here',
                               show_label=False).style(container=False)
        with gr.Column():
            video = gr.Video(interactive=True)
            start1 = gr.Button('Initiate Transcription')
            gr.HTML('OR')
            yt_link = gr.Textbox(placeholder='Paste a Youtube link here',
                                 show_label=False).style(container=False)
            yt_video = gr.HTML(label=True)
            start2 = gr.Button('Initiate Transcription')
            gr.HTML('Please reset the app after being done with the app to remove resources')
            reset = gr.Button('Reset App')

    # Each start button disables the other path first, then transcribes, then re-enables.
    start1.click(fn=lambda: (pause, update_yt), outputs=[start2, yt_video]).then(
        fn=embed_video, inputs=[video], outputs=[video, chatbot]).success(
        fn=lambda: resume, outputs=[start2])
    start2.click(fn=lambda: (pause, update_video), outputs=[start1, video]).then(
        fn=embed_yt, inputs=[yt_link], outputs=[yt_video, chatbot]).success(
        fn=lambda: resume, outputs=[start1])
    query.submit(fn=add_text, inputs=[chatbot, query], outputs=[chatbot]).success(
        fn=QuestionAnswer, inputs=[chatbot, query, yt_link, video], outputs=[chatbot, query])

    api_key.submit(fn=set_apikey, inputs=api_key, outputs=api_key)
    change_api_key.click(fn=enable_api_box, outputs=api_key)
    remove_key.click(fn=remove_key_box, outputs=api_key)
    reset.click(fn=reset_vars, outputs=[chatbot, query, video, yt_video])

demo.queue()
if __name__ == "__main__":
    demo.launch()