import os
import ast
import random
import re
import string

import pandas as pd
import nltk
import pkg_resources
import torch
import gradio as gr
import sentence_transformers
from torch import cuda
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from youtube_transcript_api import YouTubeTranscriptApi
from symspellpy import SymSpell, Verbosity
from segmentation import SemanticTextSegmentation

nltk.download('stopwords')

device = 'cuda' if cuda.is_available() else 'cpu'
tokenizer = AutoTokenizer.from_pretrained("CareerNinja/t5_large_3_1_3e_4_v3_dataset")
# exist_ok avoids a crash when the Space restarts and the folder already exists
os.makedirs('./transcripts/', exist_ok=True)
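# Note: only the tokenizer is loaded up front (it is needed to measure segment
# lengths in clean_text); the seq2seq model itself is loaded lazily inside
# t5_summarizer, so the app starts faster.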
def clean_text(link, start, end):
    sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
    dictionary_path = pkg_resources.resource_filename(
        "symspellpy", "frequency_dictionary_en_82_765.txt"
    )
    sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)

    def id_ts_grabber(link):
        # Extract the video ID from the watch URL
        youtube_video = link.split("=")
        video_id = youtube_video[1]
        return video_id
    def seg_getter(data, ts, es):
        # Find the caption lines whose start times are closest to the
        # requested start (ts) and end (es) timestamps
        starts = []
        for line in data:
            ccs = ast.literal_eval(line)
            starts.append(float(ccs['start']))
        ts_ = float(ts.strip("s&end"))  # strip trailing characters like the "s" in "30s"
        t_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - ts_))]
        e_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - float(es)))]
        tid = starts.index(t_val)
        eid = starts.index(e_val)
        ts_list_len = len(starts[tid:eid])
        return tid, ts_list_len
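    # Sketch of the matching above: with caption start times
    # starts = [0.0, 5.2, 10.4, 15.1] and a requested start of ts_ = 6.0,
    # min(range(len(starts)), key=lambda i: abs(starts[i] - ts_)) picks index 1
    # (5.2 is closest), so summarization begins at the caption line nearest the
    # requested timestamp; the same is done for the end point.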
    def get_cc(video_id):
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            try:
                # Prefer manually created transcripts
                transcript = transcript_list.find_manually_created_transcript(
                    ['en', 'en-US', 'en-GB', 'en-IN'])
            except Exception:
                transcript = None
            manual = True
            if not transcript:
                try:
                    # Fall back to automatically generated ones
                    transcript = transcript_list.find_generated_transcript(['en'])
                    manual = False
                except Exception:
                    transcript = None
            if transcript:
                suffix = "_cc_manual" if manual else "_cc_auto"
                file_name = os.path.join('transcripts', str(video_id) + suffix + ".txt")
                with open(file_name, 'w') as file:
                    for line in transcript.fetch():
                        file.write(str(line).replace(r'\xa0', ' ').replace(r'\n', '') + '\n')
                return file_name
            else:
                # No usable transcript found
                return None
        except Exception:
            return None
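    # Each line written above is the repr of a caption dict as returned by
    # youtube_transcript_api at the time this Space was written, e.g.
    #   {'text': 'hello world', 'start': 1.54, 'duration': 3.2}
    # which is why transcript_creator below parses lines with ast.literal_eval.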
    def transcript_creator(filename, timestamp, end_pt):
        with open(filename, 'r') as f:
            data = f.readlines()
        transcripts = []
        if timestamp is None:
            # No start timestamp: use the whole transcript
            for line in data:
                ccs = ast.literal_eval(line)
                transcripts.append(ccs['text'])
            return transcripts
        else:
            # Use only the caption lines between the start and end timestamps
            start_idx, lenlist = seg_getter(data, timestamp, end_pt)
            for t in range(lenlist):
                ccs = ast.literal_eval(data[start_idx + t])
                transcripts.append(ccs['text'])
            return transcripts
    def transcript_collector(link, ts, es):
        vid = id_ts_grabber(link)
        print("Fetching the transcript")
        filename = get_cc(vid)
        if filename is None:
            raise ValueError(f"No English transcript found for video {vid}")
        return transcript_creator(filename, ts, es), vid

    # Assigning a list into a single cell via .loc is fragile across pandas
    # versions, so build the one-row frame directly
    text_list, vid = transcript_collector(link, start, end)
    transcript = pd.DataFrame({'text': [text_list], 'video_id': [vid]})
    def segment(corpus):
        # Drop bracketed annotations like [Music], then empty strings
        text_data = [re.sub(r'\[.*?\]', '', x).strip() for x in corpus]
        text_data = [x for x in text_data if x != '']
        df = pd.DataFrame(text_data, columns=["utterance"])
        # Remove newlines, carriage returns and tabs
        df["utterance"] = df["utterance"].apply(
            lambda x: x.replace("\n", " ").replace("\r", " ").replace("\t", " "))
        # Remove NaN rows
        df.dropna(inplace=True)
        sts = SemanticTextSegmentation(df)
        texts = sts.get_segments()
        return texts

    sf = pd.DataFrame(columns=['Segmented_Text', 'video_id'])
    text = segment(transcript.at[0, 'text'])
    for i in range(len(text)):
        sf.loc[i, 'Segmented_Text'] = text[i]
        sf.loc[i, 'video_id'] = transcript.at[0, 'video_id']
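    # SemanticTextSegmentation comes from the local segmentation module; it is
    # assumed to group consecutive utterances into topically coherent segments
    # (e.g. via sentence-transformer embeddings, which is why
    # sentence_transformers is imported above).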
    def word_seg(text):
        # Re-insert spaces into run-together caption text with SymSpell
        text = text.replace("\n", " ").replace("\r", " ").replace("\t", " ").replace("\xa0", " ")
        results = sym_spell.word_segmentation(text, max_edit_distance=0)
        return results.segmented_string
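    # For reference, SymSpell's word_segmentation fixes missing spaces, e.g.
    #   sym_spell.word_segmentation("thequickbrownfox", max_edit_distance=0)
    # returns a Composition whose segmented_string is "the quick brown fox";
    # max_edit_distance=0 means words are only split, never spell-corrected.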
    for i in range(len(sf)):
        sf.loc[i, 'Segmented_Text'] = word_seg(sf.at[i, 'Segmented_Text'])
        sf.loc[i, 'Lengths'] = len(tokenizer(sf.at[i, 'Segmented_Text'])['input_ids'])
    def segment_loader(dataframe):
        # Greedily pack consecutive segments of the same video into chunks of
        # at most m tokens, so each chunk fits the model's input window
        texts = pd.DataFrame(columns=['texts'])
        m = 512
        skip = 0
        for i in range(len(dataframe)):
            if skip > 0:
                skip -= 1
                continue
            step = 0
            texts.loc[i, 'texts'] = dataframe.at[i, 'Segmented_Text']
            length = dataframe.at[i, 'Lengths']
            texts.loc[i, 'video_id'] = dataframe.at[i, 'video_id']
            while (i + step < len(dataframe) - 1
                   and dataframe.at[i, 'video_id'] == dataframe.at[i + step + 1, 'video_id']):
                if length + dataframe.at[i + step + 1, 'Lengths'] <= m:
                    texts.loc[i, 'texts'] += " " + dataframe.at[i + step + 1, 'Segmented_Text']
                    length += dataframe.at[i + step + 1, 'Lengths']
                    step += 1
                else:
                    break
            skip = step
        return texts
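    # Packing sketch: with per-segment token counts [200, 250, 150] for one
    # video and m = 512, the loop merges segments 0 and 1 (200 + 250 <= 512),
    # stops before segment 2 (450 + 150 > 512), and starts a new chunk there,
    # so every chunk stays within the model's 512-token input window.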
    cleaned_text = segment_loader(sf)
    cleaned_text.reset_index(drop=True, inplace=True)
    return cleaned_text
def t5_summarizer(link, start, end):
    input_text = clean_text(link, start, end)
    model1 = AutoModelForSeq2SeqLM.from_pretrained("CareerNinja/t5_large_3_1_3e_4_v3_dataset")
    summarizer1 = pipeline("summarization", model=model1, tokenizer=tokenizer,
                           device=0 if device == 'cuda' else -1)
    print("Entered summarizer!")
    for i in range(len(input_text)):
        summary = summarizer1(input_text.at[i, 'texts'], min_length=64, max_length=128)
        input_text.loc[i, 'Generated Summary'] = summary[0]['summary_text']
    # Join the per-chunk summaries into one output string
    return "\n\n".join(input_text['Generated Summary'].tolist())
outbox = gr.Textbox(label="Below is the generated summary!",
                    placeholder="Enter a link to see a summary over here!", lines=5)
interface = gr.Interface(fn=t5_summarizer, inputs=["text", "text", "text"], outputs=outbox)
interface.launch(debug=True)
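# Quick local test without the UI (hypothetical video and timestamps; start is
# expected as something like "30s" and end as plain seconds, per seg_getter):
#   print(t5_summarizer("https://www.youtube.com/watch?v=dQw4w9WgXcQ", "30s", "120"))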