sunilkumardash9 committed on
Commit
bf79cde
·
1 Parent(s): ed7eec4

upload app.py

Browse files
Files changed (1) hide show
  1. app.py +364 -0
app.py ADDED
@@ -0,0 +1,364 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import tempfile
3
+ import whisper
4
+ import datetime as dt
5
+ import gradio as gr
6
+ from langchain.embeddings import OpenAIEmbeddings
7
+ from langchain.vectorstores import Chroma
8
+ from langchain.chat_models import ChatOpenAI
9
+ from langchain.chains import ConversationalRetrievalChain
10
+ from pytube import YouTube
11
+ from typing import TYPE_CHECKING, Any, Generator, List
12
+
13
+
14
+
15
+ #os.environ['OPENAI_API_KEY'] = '<YOUR_OPENAI_API_KEY>'  # placeholder only - never commit a real key
16
+
17
# Module-level state shared by the Gradio callbacks defined below.
chat_history = []        # (question, answer) pairs passed back to the chain on each query
result = None
chain = None             # assigned by make_chain()
run_once_flag = False    # True once make_chain() has run for the current media
call_to_load_video = 0   # incremented by process_text() after a transcription

# Pre-built component updates returned by the callbacks.
enable_box = gr.Textbox.update(
    value=None,
    placeholder='Upload your OpenAI API key',
    interactive=True,
)
disable_box = gr.Textbox.update(
    value='OpenAI API key is Set',
    interactive=False,
)
remove_box = gr.Textbox.update(
    value='Your API key successfully removed',
    interactive=False,
)
pause = gr.Button.update(interactive=False)
resume = gr.Button.update(interactive=True)
28
+
29
def set_apikey(api_key):
    """Store the submitted OpenAI API key and lock the key textbox.

    The key is written to ``os.environ['OPENAI_API_KEY']``, which is
    process-wide, so it is visible to every callback running in this process.

    Args:
        api_key: Text submitted from the API-key textbox.

    Returns:
        The ``disable_box`` update (read-only textbox showing that a key is set).

    Raises:
        gr.Error: If the submitted key is empty or only whitespace.
    """
    # Pasted keys often carry stray whitespace or a trailing newline.
    cleaned_key = (api_key or '').strip()
    if not cleaned_key:
        # Previously an empty key was stored and reported as "Set".
        raise gr.Error('Enter a valid OpenAI API key')

    os.environ['OPENAI_API_KEY'] = cleaned_key
    return disable_box
32
def enable_api_box():
    """Return the update that makes the API-key textbox editable again."""
    editable_box_update = enable_box
    return editable_box_update
34
def remove_key_box():
    """Blank the stored OpenAI API key and return the 'key removed' textbox update."""
    # The variable is set to an empty string rather than deleted.
    os.environ.update(OPENAI_API_KEY='')
    return remove_box
37
+
38
def reset_vars():
    """Reset all module-level state and clear the UI components.

    Blanks the stored OpenAI API key, drops the chat history, the cached
    transcription result and the retrieval chain, and re-arms the
    ``run_once_flag`` / ``call_to_load_video`` guards.

    Returns:
        A 4-tuple of values for the chatbot, query box, video player and
        YouTube HTML panel: an empty chat, an empty string, and two updates
        that clear the media components.
    """
    global chat_history, result, chain, run_once_flag, call_to_load_video

    os.environ['OPENAI_API_KEY'] = ''
    # Must stay a list: QuestionAnswer extends it in place, which fails on None.
    chat_history = []
    result, chain = None, None
    run_once_flag, call_to_load_video = False, 0

    return [], '', gr.Video.update(value=None), gr.HTML.update(value=None)
47
+
48
+
49
def load_video(url: str) -> str:
    """Download the audio-only stream of a YouTube video and return its path.

    Files are stored under ``/tmp/Youtube``. If the stream's file is already
    there, the download is skipped and the existing path is returned.

    Args:
        url: YouTube video URL.

    Returns:
        Path of the downloaded (or previously downloaded) audio file.

    Raises:
        gr.Error: If the video cannot be resolved or downloaded.
    """
    target_dir = os.path.join('/tmp', 'Youtube')
    os.makedirs(target_dir, exist_ok=True)

    try:
        stream = YouTube(url).streams.get_audio_only()
        if stream is None:
            raise ValueError('no audio-only stream available')

        # pytube saves under a sanitised filename, so build the path from the
        # stream's own default filename instead of the raw video title.
        file_path = os.path.join(target_dir, stream.default_filename)
        if not os.path.exists(file_path):
            print('----DOWNLOADING AUDIO FILE----')
            file_path = stream.download(output_path=target_dir)
    except Exception as err:
        # Surface a user-facing message but keep the original cause chained.
        raise gr.Error('Issue in Downloading video') from err

    return file_path
69
+
70
def process_video(video=None, url=None) -> dict[str, str | list]:
    """Transcribe a YouTube video or a local video file with Whisper.

    Args:
        video: Path of a local media file; used only when ``url`` is falsy.
        url: YouTube URL; when given, the audio is fetched via ``load_video``.

    Returns:
        The object returned by the Whisper model's ``transcribe`` call.
    """
    # A URL takes precedence over an uploaded file.
    media_path = load_video(url) if url else video

    print('Transcribing Video with whisper base model')
    transcriber = whisper.load_model("base")
    return transcriber.transcribe(media_path)
82
+
83
+
84
def process_text(video=None, url=None) -> tuple[list, list[dt.datetime]]:
    """Transcribe the media and merge its segments into 30-second text groups.

    The transcription is stored in the module-level ``result`` and reused
    while ``call_to_load_video`` is non-zero. A segment joins the current
    group when it starts at most 30 seconds after that group's first segment;
    otherwise it opens a new group.

    Args:
        video: Path of a local media file (used when ``url`` is falsy).
        url: YouTube URL.

    Returns:
        ``(grouped_texts, time_list)``: the concatenated text of each group
        and, at the same index, the group's start offset as a ``datetime``
        on 1900-01-01 (so ``.time()`` yields ``HH:MM:SS``). Both lists are
        empty when the transcript has no segments.
    """
    global result, call_to_load_video

    # ``result`` must be the module-level cache; as a plain local it was
    # unbound whenever the counter was already non-zero.
    if call_to_load_video == 0 or result is None:
        result = process_video(url=url) if url else process_video(video=video)
        call_to_load_video += 1

    window = dt.timedelta(seconds=30)
    # Segment starts are offsets from the beginning of the media, so build
    # them from a fixed base instead of datetime.fromtimestamp(), which
    # shifts them by the local timezone.
    base = dt.datetime(1900, 1, 1)

    # Each entry is [group_start, group_text]. Using a list rather than a
    # dict keyed by text keeps repeated phrases and their own timestamps.
    groups = []
    for segment in result['segments']:
        started = base + dt.timedelta(seconds=int(segment['start']))
        if groups and started - groups[-1][0] <= window:
            groups[-1][1] += segment['text']
        else:
            groups.append([started, segment['text']])

    # Drop empty groups together with their timestamps so both lists stay aligned.
    kept = [(started, text) for started, text in groups if text]
    grouped_texts = [text for _, text in kept]
    time_list = [started for started, _ in kept]

    return grouped_texts, time_list
131
+
132
+ # def process_text(video=None, url=None) -> tuple[list, list[dt.datetime]]:
133
+ # # This function processes the text of a YouTube video or a local video file.
134
+
135
+ # # Check if a YouTube link or a local video file is provided.
136
+ # if not url and not video:
137
+ # # Raise an error if no input is provided.
138
+ # raise ValueError('Please provide a Youtube link or Upload a video')
139
+
140
+ # # Get the result of processing the video.
141
+ # global call_to_load_video
142
+ # if call_to_load_video == 0:
143
+ # print('yes')
144
+ # result = process_video(url=url) if url else process_video(video=video)
145
+ # call_to_load_video += 1
146
+
147
+ # # Get the text and start time of each segment of the video.
148
+ # texts, start_time_list = [], []
149
+ # for res in result['segments']:
150
+ # start = res['start']
151
+ # text = res['text']
152
+
153
+ # start_time = dt.datetime.fromtimestamp(start)
154
+ # start_time_formatted = start_time.strftime("%H:%M:%S")
155
+
156
+ # texts.append(''.join(text))
157
+ # start_time_list.append(start_time_formatted)
158
+
159
+ # # Convert the text and start time to a dictionary.
160
+ # texts_with_timestamps = dict(zip(texts, start_time_list))
161
+
162
+ # # Convert the dictionary to a list of tuples, where each tuple contains a text and its start time.
163
+ # formatted_texts = {
164
+ # text: dt.datetime.strptime(str(timestamp), '%H:%M:%S')
165
+ # for text, timestamp in texts_with_timestamps.items()
166
+ # }
167
+
168
+ # # Group the texts by their start time.
169
+ # grouped_texts = []
170
+ # current_group = ''
171
+ # time_list = [list(formatted_texts.values())[0]]
172
+ # previous_time = None
173
+ # time_difference = dt.timedelta(seconds=30)
174
+
175
+ # for text, timestamp in formatted_texts:
176
+
177
+ # if previous_time is None or timestamp - previous_time <= time_difference:
178
+ # current_group += text
179
+ # else:
180
+ # grouped_texts.append(current_group)
181
+ # time_list.append(timestamp)
182
+ # current_group = text
183
+ # previous_time = time_list[-1]
184
+
185
+ # # Append the last group of texts.
186
+ # if current_group:
187
+ # grouped_texts.append(current_group)
188
+
189
+ # # Return the list of groups of texts and the list of start times.
190
+ # return grouped_texts, time_list
191
+
192
+
193
def get_title(url, video):
    """Return a display title for the selected media.

    Args:
        url: YouTube URL, or a falsy value when a local file is used.
        video: Path of the local video file (used when ``url`` is falsy).

    Returns:
        The YouTube video's title, or the local file's name without its
        extension.
    """
    # Truthiness check: an empty textbox value ('') is not a URL.
    if url:
        return YouTube(url).title

    # splitext handles extensions of any length (.webm, .mpeg, ...).
    return os.path.splitext(os.path.basename(video))[0]
202
+
203
def check_path(url=None, video=None):
    """Report whether the media for ``url`` or ``video`` already exists on disk.

    Args:
        url: YouTube URL; when given, ``/tmp/Youtube/<title>.mp4`` is checked.
        video: Path of a local video file (used when ``url`` is falsy).

    Returns:
        True if the corresponding file exists, otherwise False (including
        when neither argument is supplied).
    """
    if url:
        # NOTE(review): this assumes the file was saved under the raw video
        # title; confirm against the name the downloader actually writes.
        candidate = os.path.join('/tmp', 'Youtube', YouTube(url).title + '.mp4')
    else:
        candidate = video

    # Guard against None/'' so os.path.exists is never called without a path.
    return bool(candidate) and os.path.exists(candidate)
212
+
213
def make_chain(url=None, video=None) -> (ConversationalRetrievalChain | Any | None):
    """Build (once) the conversational retrieval chain for the selected media.

    On the first call after ``run_once_flag`` is cleared, the media is
    transcribed and grouped via ``process_text``, the groups are embedded
    into a Chroma collection with their start times as ``source`` metadata,
    and a ``ConversationalRetrievalChain`` is created on top of it. Later
    calls return the stored chain.

    Args:
        url: YouTube URL.
        video: Path of a local video file (used when ``url`` is falsy).

    Returns:
        The module-level ``chain``.

    Raises:
        gr.Error: If neither ``url`` nor ``video`` is provided.
    """
    global chain, run_once_flag

    if not url and not video:
        raise gr.Error('Please provide a Youtube link or Upload a video')
    if run_once_flag:
        return chain

    grouped_texts, time_list = process_text(url=url) if url else process_text(video=video)
    metadatas = [{'source': str(t.time())} for t in time_list]

    # NOTE(review): the collection name is fixed; confirm that building it
    # again for a second video does not mix in the previous video's texts.
    vector_store = Chroma.from_texts(
        texts=grouped_texts,
        collection_name='test',
        embedding=OpenAIEmbeddings(),
        metadatas=metadatas,
    )
    chain = ConversationalRetrievalChain.from_llm(
        ChatOpenAI(temperature=0.0),
        retriever=vector_store.as_retriever(search_kwargs={"k": 5}),
        return_source_documents=True,
    )

    # Set the flag only after everything succeeded, so a failed attempt
    # (bad key, download error, ...) can be retried.
    run_once_flag = True
    return chain
234
+
235
+
236
+
237
+ def QuestionAnswer(history, query=None, url=None, video=None) -> Generator[Any | None, Any, None]:
238
+ global chat_history, chain
239
+
240
+ if video and url:
241
+ raise gr.Error('Upload a video or a Youtube link, not both')
242
+ elif not url and not video:
243
+ raise gr.Error('Provide a Youtube link or Upload a video')
244
+
245
+ result = chain({"question": query, 'chat_history':chat_history},return_only_outputs=True)
246
+ chat_history += [(query, result["answer"])]
247
+ for char in result['answer']:
248
+ history[-1][-1] += char
249
+ yield history,''
250
+
251
def add_text(history, text):
    """Append the user's message to the chat history with an empty answer slot.

    Raises:
        gr.Error: If ``text`` is empty.
    """
    if text:
        pending_turn = (text, '')
        return history + [pending_turn]
    raise gr.Error('enter text')
256
+
257
def embed_yt(yt_link: str):
    """Build the chain for a YouTube link and return an embeddable player.

    Args:
        yt_link: YouTube URL pasted by the user.

    Returns:
        ``(embed_html, [])``: iframe markup for the HTML component and an
        empty chat history for the chatbot.

    Raises:
        gr.Error: If ``yt_link`` is empty.
    """
    # Without this declaration the two resets below only create locals and
    # the module-level guards are never cleared.
    global run_once_flag, call_to_load_video

    import html  # stdlib; imported locally so the block is self-contained

    if not yt_link:
        raise gr.Error('Paste a Youtube link')

    # Re-arm the guards so the new link is actually transcribed and indexed.
    run_once_flag = False
    call_to_load_video = 0

    make_chain(url=yt_link)

    # '.../watch?v=ID&extra=...' -> '.../embed/ID': no doubled slash, and
    # extra query parameters are dropped.
    embed_url = yt_link.replace('watch?v=', 'embed/').split('&', 1)[0]
    # The link is user input placed inside an HTML attribute: escape it.
    safe_url = html.escape(embed_url, quote=True)

    embed_html = f"""<iframe width="750" height="315" src="{safe_url}"
    title="YouTube video player" frameborder="0"
    allow="accelerometer; autoplay; clipboard-write;
    encrypted-media; gyroscope; picture-in-picture"
    allowfullscreen></iframe>"""

    return embed_html, []
287
+
288
+
289
+ def embed_video(video=str | None):
290
+ # This function embeds a video into the page.
291
+
292
+ # Check if the video is valid.
293
+ if not video:
294
+ raise gr.Error('Upload a Video')
295
+
296
+ # Set the global variable `run_once_flag` to False.
297
+ # This is used to prevent the function from being called more than once.
298
+ run_once_flag = False
299
+
300
+ # Create a chain using the video.
301
+ make_chain(video=video)
302
+
303
+ # Return the video and an empty list.
304
+ return video, []
305
+
306
# Updates that clear the opposite media component when a source is chosen.
update_video = gr.Video.update(value=None)
update_yt = gr.HTML.update(value=None)

# NOTE(review): the nesting below is reconstructed from a flattened copy of
# the source - confirm the placement of the reset controls.
with gr.Blocks() as demo:

    # Row 1: API key entry and its two buttons.
    with gr.Row():
        with gr.Column(scale=0.70):
            api_key = gr.Textbox(
                placeholder='Enter OpenAI API key',
                show_label=False,
                interactive=True,
            ).style(container=False)
        with gr.Column(scale=0.15):
            change_api_key = gr.Button('Change Key')
        with gr.Column(scale=0.15):
            remove_key = gr.Button('Remove Key')

    # Row 2: chat on the left, media sources on the right.
    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(value=[]).style(height=650)
            query = gr.Textbox(
                placeholder='Enter query here',
                show_label=False,
            ).style(container=False)

        with gr.Column():
            video = gr.Video(interactive=True)
            start1 = gr.Button('Initiate Transcription')
            gr.HTML('OR')
            yt_link = gr.Textbox(
                placeholder='Paste a Youtube link here',
                show_label=False,
            ).style(container=False)
            yt_video = gr.HTML(label=True)
            start2 = gr.Button('Initiate Transcription')
            gr.HTML('Please reset the app after being done with the app to remove resources')
            reset = gr.Button('Reset App')

    # Uploaded video: disable the YouTube button and clear its player,
    # process the upload, then re-enable the YouTube button.
    upload_started = start1.click(
        fn=lambda: (pause, update_yt),
        outputs=[start2, yt_video],
    )
    upload_processed = upload_started.then(
        fn=embed_video,
        inputs=[video],
        outputs=[video, chatbot],
    )
    # .then (not .success) so the button is re-enabled even when processing
    # fails; otherwise it would stay disabled after an error.
    upload_processed.then(fn=lambda: resume, outputs=[start2])

    # YouTube link: the mirror image of the flow above.
    link_started = start2.click(
        fn=lambda: (pause, update_video),
        outputs=[start1, video],
    )
    link_processed = link_started.then(
        fn=embed_yt,
        inputs=[yt_link],
        outputs=[yt_video, chatbot],
    )
    link_processed.then(fn=lambda: resume, outputs=[start1])

    # Chat: add the user's message first, then stream the answer.
    message_added = query.submit(
        fn=add_text,
        inputs=[chatbot, query],
        outputs=[chatbot],
    )
    message_added.success(
        fn=QuestionAnswer,
        inputs=[chatbot, query, yt_link, video],
        outputs=[chatbot, query],
    )

    api_key.submit(fn=set_apikey, inputs=api_key, outputs=api_key)
    change_api_key.click(fn=enable_api_box, outputs=api_key)
    remove_key.click(fn=remove_key_box, outputs=api_key)
    reset.click(fn=reset_vars, outputs=[chatbot, query, video, yt_video])

demo.queue()
if __name__ == "__main__":
    demo.launch()