# Spaces: Runtime error
import ast
import logging
import os
import re
from urllib.parse import parse_qs, urlparse

import nltk
import pandas as pd
import pkg_resources
import sentence_transformers
import streamlit as st
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
from symspellpy import SymSpell
from torch import cuda
from transformers import AutoTokenizer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import pipeline
from youtube_transcript_api import YouTubeTranscriptApi

from segmentation import SemanticTextSegmentation

nltk.download('stopwords')
# Directory the downloaded caption files are written to. ``exist_ok`` avoids
# the check-then-create race of ``os.path.exists`` followed by ``os.mkdir``.
os.makedirs('./transcripts', exist_ok=True)

# Torch device name: 'cuda' when a CUDA device is available, otherwise 'cpu'.
device = 'cuda' if cuda.is_available() else 'cpu'
def _extract_video_id(link):
    """Return the YouTube video id found in ``link``.

    The id is read from the ``v`` query parameter when there is one
    (``.../watch?v=<id>&t=30s``); otherwise the last path segment is used,
    which covers ``youtu.be/<id>`` style links.

    Raises:
        ValueError: if neither location yields an id.
    """
    parsed = urlparse(link.strip())
    ids = parse_qs(parsed.query).get("v")
    if ids:
        return ids[0]
    tail = parsed.path.rstrip("/").rsplit("/", 1)[-1]
    if not tail:
        raise ValueError(f"Could not find a video id in {link!r}")
    return tail


def _download_transcript(video_id):
    """Store the English captions of ``video_id`` and return the file path.

    A manually created transcript ('en', 'en-US', 'en-GB', 'en-IN') is
    preferred; an auto-generated 'en' transcript is the fallback. Every
    caption entry is written as its ``str()`` on one line of
    ``transcripts/<video_id>_cc_manual.txt`` or ``..._cc_auto.txt``.

    Returns:
        The path of the written file, or ``None`` when no transcript could
        be fetched or stored (the failure is logged).
    """
    logger = logging.getLogger(__name__)
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        transcript = None
        suffix = "_cc_manual"
        try:
            transcript = transcript_list.find_manually_created_transcript(
                ['en', 'en-US', 'en-GB', 'en-IN']
            )
        except Exception:
            logger.debug("No manual transcript for %s", video_id, exc_info=True)

        if not transcript:
            suffix = "_cc_auto"
            try:
                transcript = transcript_list.find_generated_transcript(['en'])
            except Exception:
                logger.debug("No generated transcript for %s", video_id, exc_info=True)
                transcript = None

        if not transcript:
            return None

        os.makedirs('transcripts', exist_ok=True)
        file_name = os.path.join('transcripts', str(video_id) + suffix + ".txt")
        # Explicit UTF-8 so captions with non-ASCII characters can be written
        # regardless of the platform's default encoding.
        with open(file_name, 'w', encoding='utf-8') as file:
            for line in transcript.fetch():
                file.write(str(line).replace(r'\xa0', ' ').replace(r'\n', '') + '\n')
        return file_name
    except Exception:
        # Best effort: the caller decides what a missing transcript means.
        logger.exception("Could not download a transcript for %s", video_id)
        return None


def _read_cues(filename):
    """Parse a transcript file back into one caption entry per line.

    Each non-blank line is evaluated with ``ast.literal_eval``; the rest of
    the pipeline reads the ``'start'`` and ``'text'`` keys of every entry.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        return [ast.literal_eval(line) for line in f if line.strip()]


def _nearest_cue(starts, point):
    """Return the index of the start time in ``starts`` closest to ``point``."""
    target = float(point)
    return min(range(len(starts)), key=lambda i: abs(starts[i] - target))


def _select_texts(cues, start, end):
    """Return the caption texts between ``start`` and ``end`` (seconds).

    Empty/falsy ``start`` means "from the beginning" and empty/falsy ``end``
    means "to the end, including the final caption". When ``end`` is given,
    the caption nearest to it is excluded from the selection.

    Raises:
        ValueError: if ``start`` or ``end`` is not convertible to ``float``.
    """
    if not cues:
        return []
    if not start and not end:
        return [cue['text'] for cue in cues]

    starts = [float(cue['start']) for cue in cues]
    first = _nearest_cue(starts, start or 0)
    # Without an end point the slice runs through the last caption; the
    # previous implementation stopped one caption short in that case.
    last = _nearest_cue(starts, end) if end else len(cues)
    return [cue['text'] for cue in cues[first:last]]


def _segment_corpus(corpus):
    """Strip ``[...]`` annotations from the captions and segment them.

    Returns whatever ``SemanticTextSegmentation.get_segments()`` yields, or
    an empty list when nothing is left after cleaning.
    """
    text_data = [re.sub(r'\[.*?\]', '', x).strip() for x in corpus]
    text_data = [x for x in text_data if x != '']
    if not text_data:
        return []

    df = pd.DataFrame(text_data, columns=["utterance"])
    # remove new line, tab, return
    df["utterance"] = df["utterance"].apply(
        lambda x: x.replace("\n", " ").replace("\r", " ").replace("\t", " ")
    )
    # remove NaN
    df.dropna(inplace=True)
    return SemanticTextSegmentation(df).get_segments()


def _split_words(sym_spell, text):
    """Replace whitespace control characters and run SymSpell word segmentation."""
    text = text.replace("\n", " ").replace("\r", " ").replace("\t", " ").replace("\xa0", " ")
    return sym_spell.word_segmentation(text, max_edit_distance=0).segmented_string


def _pack_segments(segments, lengths, max_tokens=512):
    """Greedily join consecutive segments while the summed length fits.

    A chunk always starts with one segment (whatever its length) and absorbs
    the following segments as long as the running total of ``lengths`` stays
    at or below ``max_tokens``. Joined segments are separated by one space.
    """
    chunks = []
    parts = []
    total = 0
    for segment_text, length in zip(segments, lengths):
        if parts and total + length <= max_tokens:
            parts.append(segment_text)
            total += length
            continue
        if parts:
            chunks.append(" ".join(parts))
        parts = [segment_text]
        total = length
    if parts:
        chunks.append(" ".join(parts))
    return chunks


def clean_text(link, start, end):
    """Fetch a video's captions and prepare them for summarization.

    Steps: extract the video id from ``link``, download the English
    transcript, keep the captions between ``start`` and ``end`` (seconds,
    either may be empty), split them into semantic segments, fix word
    boundaries with SymSpell, and pack neighbouring segments into chunks of
    at most 512 tokens as counted by the summarization model's tokenizer.

    Args:
        link: URL of the YouTube video.
        start: start point in seconds (string or number); falsy for "from
            the beginning".
        end: end point in seconds (string or number); falsy for "to the end".

    Returns:
        A ``pandas.DataFrame`` with columns ``'texts'`` (one chunk per row)
        and ``'video_id'``, indexed from 0. It is empty when the selected
        range contains no text.

    Raises:
        ValueError: if no video id can be found in ``link``, no transcript
            is available, or ``start``/``end`` are not numeric.
    """
    columns = ['texts', 'video_id']

    # Do the cheap, failure-prone work before loading the tokenizer and
    # the spelling dictionary.
    video_id = _extract_video_id(link)
    print(" Fetching the transcript ")
    filename = _download_transcript(video_id)
    if filename is None:
        raise ValueError(f"No English transcript is available for video {video_id!r}")

    corpus = _select_texts(_read_cues(filename), start, end)
    raw_segments = _segment_corpus(corpus)
    if len(raw_segments) == 0:
        return pd.DataFrame(columns=columns)

    tokenizer = AutoTokenizer.from_pretrained("CareerNinja/t5_large_1e-4_on_V3dataset")
    sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
    dictionary_path = pkg_resources.resource_filename(
        "symspellpy", "frequency_dictionary_en_82_765.txt"
    )
    sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)

    segments = [_split_words(sym_spell, segment_text) for segment_text in raw_segments]
    lengths = [len(tokenizer(segment_text)['input_ids']) for segment_text in segments]
    chunks = _pack_segments(segments, lengths, max_tokens=512)

    return pd.DataFrame(
        {'texts': chunks, 'video_id': [video_id] * len(chunks)},
        columns=columns,
    )
def t5_summarizer(link, start, end):
    """Summarize a video's transcript chunk by chunk and show the result.

    The chunks come from ``clean_text(link, start, end)``. Each one is passed
    to a ``summarization`` pipeline built from the
    ``CareerNinja/t5_large_1e-4_on_V3dataset`` checkpoint with
    ``min_length=64`` and ``max_length=128``. Every summary is written to the
    Streamlit page, with a separator line between consecutive summaries.

    Args:
        link: URL of the video.
        start: start point in seconds; may be empty.
        end: end point in seconds; may be empty.

    Returns:
        The list of generated summaries, in chunk order (empty when there is
        nothing to summarize).
    """
    input_text = clean_text(link, start, end)

    model_name = "CareerNinja/t5_large_1e-4_on_V3dataset"
    tokenizer1 = AutoTokenizer.from_pretrained(model_name)
    model1 = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    # Run on the GPU when the module-level ``device`` says one is available
    # (pipeline convention: 0 = first CUDA device, -1 = CPU). Previously
    # ``device`` was computed but never used.
    summarizer1 = pipeline(
        "summarization",
        model=model1,
        tokenizer=tokenizer1,
        device=0 if device == 'cuda' else -1,
    )
    print(" Entered summarizer ! ")
    st.write('Below is the summary of the given URL: ')

    chunks = input_text['texts'].tolist()
    if not chunks:
        st.warning('No transcript text was found for the selected range.')
        return []

    separator = '====================================================================================='
    lst_outputs = []
    last = len(chunks) - 1
    for position, chunk in enumerate(chunks):
        summary = summarizer1(chunk, min_length=64, max_length=128)
        generated = summary[0]['summary_text']
        lst_outputs.append(generated)
        st.write(generated)
        if position != last:
            st.write(separator)
    return lst_outputs
def _text_extent(font, text):
    """Return ``(width, height)`` of ``text`` as the old ``font.getsize`` did.

    ``getsize`` is absent from newer Pillow releases, so the right and bottom
    edges of ``getbbox`` are used when available; ``getsize`` remains the
    fallback for Pillow versions that predate ``getbbox``.
    """
    if hasattr(font, "getbbox"):
        _, _, right, bottom = font.getbbox(text)
        return right, bottom
    return font.getsize(text)


def _wrap_text(text, font, max_width):
    """Split ``text`` into lines no wider than ``max_width`` pixels.

    Text that already fits is returned as a single line. Otherwise words
    (split on single spaces) are added to a line while it still fits; a word
    that is wider than ``max_width`` on its own gets a line to itself.

    Returns:
        list[str]: the wrapped lines, in order.
    """
    if _text_extent(font, text)[0] <= max_width:
        return [text]

    words = text.split(' ')
    lines = []
    i = 0
    while i < len(words):
        current = []
        while i < len(words) and _text_extent(font, " ".join(current + [words[i]]))[0] <= max_width:
            current.append(words[i])
            i += 1
        if not current:
            # The word alone exceeds the width: emit it anyway to make progress.
            current.append(words[i])
            i += 1
        lines.append(" ".join(current))
    return lines


def card_creator(path, text, y_value):
    """Draw ``text`` on the template image and show the resulting card.

    The text is wrapped to the image width minus 44 pixels, drawn in white
    with ``Montserrat-Regular.ttf`` at size 22 starting at ``(22, y_value)``,
    saved as ``card.png`` in the working directory and displayed with
    ``st.image``.

    Args:
        path: path of the template image.
        text: text to print on the card.
        y_value: vertical pixel offset of the first text line.
    """
    font_path = 'Montserrat-Regular.ttf'
    font = ImageFont.truetype(font=font_path, size=22)
    line_height = _text_extent(font, 'hg')[1]
    color = 'rgb(255,255,255)'  # white color
    x = 22

    # The context manager releases the template's file handle afterwards.
    with Image.open(path) as img:
        lines = _wrap_text(text, font, img.size[0] - 44)
        draw = ImageDraw.Draw(img)
        y = y_value
        for line in lines:
            draw.text((x, y), line, fill=color, font=font)
            y = y + line_height  # move down for the next line
        img.save("card.png")
        st.image(img, caption="Summary Card")
def main():
    """Render the Streamlit page: summarize a video, then build a card.

    The first form takes a video link plus optional start/end points in
    seconds and stores the summaries returned by ``t5_summarizer`` in
    ``st.session_state.opt``. Once summaries exist, the user can pick one
    together with a colour template and generate a downloadable
    ``card.png``.
    """
    # colour template -> (background image, y offset of the text, max summary length)
    templates = {
        'Elf Green': ('iteration5_empty.png', 335, 380),
        'Dark Pastel Green': ('X-93.png', 285, 430),
    }

    if 'submitted' not in st.session_state:
        st.session_state.submitted = False
    if 'opt' not in st.session_state:
        st.session_state.opt = []

    def callback():
        st.session_state.submitted = True

    st.title('Video Summarizer')
    url = st.text_input('Enter the Video Link')
    start_pt = st.text_input('Enter the Start point in secs')
    end_pt = st.text_input('Enter the end point in secs')

    if st.button("Submit URL", on_click=callback) and url:
        start_pt = start_pt.strip()
        end_pt = end_pt.strip()
        try:
            # Reject non-numeric points here instead of failing deep inside
            # the transcript handling.
            for point in (start_pt, end_pt):
                if point:
                    float(point)
        except ValueError:
            st.error('Start and end points must be numbers (seconds).')
        else:
            try:
                st.session_state.opt = t5_summarizer(url, start_pt, end_pt)
            except Exception as exc:
                # UI boundary: report the failure and keep the page usable.
                st.error('Could not summarize this video.')
                st.exception(exc)

    if st.session_state.submitted and st.session_state.opt:
        text = st.selectbox('Select the summary you want to creat a card of ', st.session_state.opt)
        option = st.selectbox('Which color template would you like to use ?', ('Elf Green', 'Dark Pastel Green'))
        if st.button("Generate Summary Card") and text and option:
            background, y_value, max_chars = templates.get(option, templates['Dark Pastel Green'])
            if len(text) > max_chars:
                st.error('Summary is too long !')
            else:
                card_creator(background, text, y_value)
                # Offer the download only after a card was actually written,
                # so a missing or stale card.png is never opened.
                with open("card.png", "rb") as file:
                    st.download_button(
                        label="Download card",
                        data=file,
                        file_name="card.png",
                        mime="image/png"
                    )
# Only start the app when this file is executed as a script, so importing the
# module does not render the page.
# NOTE(review): `streamlit run` is expected to execute this file as
# `__main__`; confirm on the deployed Streamlit version.
if __name__ == "__main__":
    main()