import os
import ast
import re

import gradio as gr
import nltk
import pandas as pd
import pkg_resources
from symspellpy import SymSpell
from torch import cuda
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from youtube_transcript_api import YouTubeTranscriptApi

from segmentation import SemanticTextSegmentation

nltk.download('stopwords')

# Load the model once at startup instead of on every request.
MODEL_NAME = "CareerNinja/t5_large_3_1_3e_4_v3_dataset"
device = 0 if cuda.is_available() else -1  # pipeline device index: GPU 0 or CPU
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)
summarizer = pipeline("summarization", model=model, tokenizer=tokenizer, device=device)

os.makedirs('./transcripts/', exist_ok=True)


def clean_text(link, start, end):
    """Fetch a YouTube transcript, segment it, repair spacing, and chunk it
    into pieces that fit the model's context window."""
    sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
    dictionary_path = pkg_resources.resource_filename(
        "symspellpy", "frequency_dictionary_en_82_765.txt"
    )
    sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)

    def id_ts_grabber(link):
        # Pull the video id out of a link such as
        # https://www.youtube.com/watch?v=<id>&t=90s
        return link.split("v=")[1].split("&")[0]

    def seg_getter(data, ts, es):
        # Map the requested start/end offsets (seconds) to the closest caption
        # lines; return the index of the first line and the number of lines.
        starts = []
        for line in data:
            ccs = ast.literal_eval(line)
            starts.append(float(ccs['start']))
        ts_ = float(ts.strip("s&end"))  # tolerate YouTube-style inputs such as "90s"
        t_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - ts_))]
        e_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - float(es)))]
        tid = starts.index(t_val)
        eid = starts.index(e_val)
        return tid, len(starts[tid:eid])

    def get_cc(video_id):
        # Download the English transcript, preferring manually created
        # captions over auto-generated ones, and dump it line by line.
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            try:
                transcript = transcript_list.find_manually_created_transcript(
                    ['en', 'en-US', 'en-GB', 'en-IN'])
            except Exception:
                transcript = None
            manual = True
            if not transcript:
                try:
                    transcript = transcript_list.find_generated_transcript(['en'])
                    manual = False
                except Exception:
                    transcript = None
            if not transcript:
                return None
            suffix = "_cc_manual" if manual else "_cc_auto"
            file_name = os.path.join('transcripts', str(video_id) + suffix + ".txt")
            with open(file_name, 'w') as file:
                for line in transcript.fetch():
                    file.write(str(line).replace(r'\xa0', ' ').replace(r'\n', '') + '\n')
            return file_name
        except Exception:
            return None

    def transcript_creator(filename, timestamp, end_pt):
        # Re-read the dumped caption dicts and keep only their 'text' fields,
        # optionally restricted to the requested start/end window.
        with open(filename, 'r') as f:
            data = f.readlines()
        transcripts = []
        if not timestamp:  # no window given: take the whole transcript
            for line in data:
                transcripts.append(ast.literal_eval(line)['text'])
            return transcripts
        start_idx, n_lines = seg_getter(data, timestamp, end_pt)
        for t in range(n_lines):
            transcripts.append(ast.literal_eval(data[start_idx + t])['text'])
        return transcripts

    def transcript_collector(link, ts, es):
        vid = id_ts_grabber(link)
        print("Fetching the transcript...")
        filename = get_cc(vid)
        return transcript_creator(filename, ts, es), vid

    corpus, vid = transcript_collector(link, start, end)
    def segment(corpus):
        # Drop bracketed annotations such as [Music], normalise whitespace,
        # and split the transcript into semantically coherent segments.
        text_data = [re.sub(r'\[.*?\]', '', x).strip() for x in corpus]
        text_data = [x for x in text_data if x != '']
        df = pd.DataFrame(text_data, columns=["utterance"])
        df["utterance"] = df["utterance"].apply(
            lambda x: x.replace("\n", " ").replace("\r", " ").replace("\t", " "))
        df.dropna(inplace=True)
        sts = SemanticTextSegmentation(df)
        return sts.get_segments()

    segments = segment(corpus)
    sf = pd.DataFrame({'Segmented_Text': segments})
    sf['video_id'] = vid

    def word_seg(text):
        # SymSpell word segmentation restores spaces the captions lost;
        # max_edit_distance=0 segments without changing any spelling.
        text = (text.replace("\n", " ").replace("\r", " ")
                    .replace("\t", " ").replace("\xa0", " "))
        result = sym_spell.word_segmentation(text, max_edit_distance=0)
        return result.segmented_string

    for i in range(len(sf)):
        sf.loc[i, 'Segmented_Text'] = word_seg(sf.at[i, 'Segmented_Text'])
        sf.loc[i, 'Lengths'] = len(tokenizer(sf.at[i, 'Segmented_Text'])['input_ids'])

    def segment_loader(dataframe):
        # Greedily merge consecutive segments of the same video until the
        # combined token count would exceed the model's 512-token limit.
        texts = pd.DataFrame(columns=['texts'])
        max_tokens = 512
        skip = 0
        for i in range(len(dataframe)):
            if skip > 0:
                skip -= 1
                continue
            step = 0
            texts.loc[i, 'texts'] = dataframe.at[i, 'Segmented_Text']
            texts.loc[i, 'video_id'] = dataframe.at[i, 'video_id']
            length = dataframe.at[i, 'Lengths']
            while (i + step < len(dataframe) - 1
                   and dataframe.at[i, 'video_id'] == dataframe.at[i + step + 1, 'video_id']):
                if length + dataframe.at[i + step + 1, 'Lengths'] <= max_tokens:
                    texts.loc[i, 'texts'] += " " + dataframe.at[i + step + 1, 'Segmented_Text']
                    length += dataframe.at[i + step + 1, 'Lengths']
                    step += 1
                else:
                    break
            skip = step  # rows already merged into this chunk get skipped
        return texts

    cleaned_text = segment_loader(sf)
    cleaned_text.reset_index(drop=True, inplace=True)
    return cleaned_text


def t5_summarizer(link, start, end):
    input_text = clean_text(link, start, end)
    print("Entered summarizer!")
    for i in range(len(input_text)):
        summary = summarizer(input_text.at[i, 'texts'], min_length=64, max_length=128)
        input_text.loc[i, 'Generated Summary'] = summary[0]['summary_text']
    # Join the per-chunk summaries into the single text shown in the UI.
    return "\n\n".join(input_text['Generated Summary'].tolist())


outbox = gr.Textbox(label="Below is the generated summary!",
                    placeholder="Enter a link to see a summary over here!",
                    lines=5)
interface = gr.Interface(fn=t5_summarizer, inputs=["text", "text", "text"], outputs=outbox)
interface.launch(debug=True)
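# Example usage (hypothetical link and offsets): paste a full YouTube URL,
# a start offset ("90s" or "90"), and an end offset in seconds, then submit:
#   t5_summarizer("https://www.youtube.com/watch?v=VIDEO_ID", "90s", "300")
# Leaving the start field empty summarizes the entire transcript.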