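# YouTube transcript summarizer: a Gradio app (Hugging Face Space) that fetches
# a video's captions, optionally slices them to a (start, end) window, segments
# them semantically, and summarizes each chunk with a fine-tuned T5 model.
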
import ast
import os
import re

import gradio as gr
import nltk
import pandas as pd
import pkg_resources
import sentence_transformers  # not used directly; likely a dependency of the segmentation module
from symspellpy import SymSpell
from torch import cuda
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline
from youtube_transcript_api import YouTubeTranscriptApi

from segmentation import SemanticTextSegmentation

nltk.download('stopwords')

device = 'cuda' if cuda.is_available() else 'cpu'
tokenizer = AutoTokenizer.from_pretrained("CareerNinja/t5_large_3_1_3e_4_v3_dataset")
os.makedirs('./transcripts/', exist_ok=True)  # don't crash if the folder already exists
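
# clean_text downloads the captions for `link`, optionally restricted to the
# (start, end) window in seconds, and returns a DataFrame of model-ready text
# chunks of at most 512 tokens each.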
def clean_text(link, start, end):
    sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
    dictionary_path = pkg_resources.resource_filename(
        "symspellpy", "frequency_dictionary_en_82_765.txt"
    )
    sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)

    def id_ts_grabber(link):
        # take the value of the "v=" query parameter, dropping any trailing "&..." parameters
        video_id = link.split("=")[1].split("&")[0]
        return video_id

    def seg_getter(data, ts, es):
        # collect the start time of every caption line
        starts = []
        for line in data:
            ccs = ast.literal_eval(line)
            starts.append(float(ccs['start']))
        ts_ = float(ts.strip("s&end"))  # tolerate values such as "90s" copied from a URL
        # snap each requested boundary to the caption whose start time is closest
        t_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - ts_))]
        e_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - float(es)))]
        tid = starts.index(t_val)
        eid = starts.index(e_val)
        ts_list_len = len(starts[tid:eid])
        return tid, ts_list_len
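
    # Illustration: with starts = [0.0, 4.2, 9.8] and ts = "5s", the start
    # boundary snaps to the caption at 4.2 s, i.e. tid = 1.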

    def get_cc(video_id):
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            try:
                # prefer a manually created English transcript
                transcript = transcript_list.find_manually_created_transcript(['en', 'en-US', 'en-GB', 'en-IN'])
            except Exception:
                transcript = None
            manual = True
            if not transcript:
                try:
                    # fall back to an automatically generated one
                    transcript = transcript_list.find_generated_transcript(['en'])
                    manual = False
                except Exception:
                    transcript = None
            if transcript:
                suffix = "_cc_manual" if manual else "_cc_auto"
                file_name = os.path.join('transcripts', str(video_id) + suffix + ".txt")
                with open(file_name, 'w') as file:
                    # write one caption dict per line, stripping non-breaking spaces and newlines
                    for line in transcript.fetch():
                        file.write(str(line).replace(r'\xa0', ' ').replace(r'\n', '') + '\n')
                return file_name
            else:
                # no usable transcript found
                return None
        except Exception:
            return None
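
    # Each stored line is the repr of one caption dict, e.g. (illustrative):
    # {'text': 'hello world', 'start': 1.23, 'duration': 4.56}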

    def transcript_creator(filename, timestamp, end_pt):
        with open(filename, 'r') as f:
            data = f.readlines()
        transcripts = []
        if timestamp is None:
            # no window given: keep the whole transcript
            for line in data:
                ccs = ast.literal_eval(line)
                transcripts.append(ccs['text'])
            return transcripts
        else:
            # keep only the caption lines between the two boundaries
            start_idx, lenlist = seg_getter(data, timestamp, end_pt)
            for t in range(lenlist):
                ccs = ast.literal_eval(data[start_idx + t])
                transcripts.append(ccs['text'])
            return transcripts

    def transcript_collector(link, ts, es):
        vid = id_ts_grabber(link)
        print("Fetching the transcript")
        filename = get_cc(vid)
        if filename is None:
            raise ValueError("Could not fetch an English transcript for this video.")
        return transcript_creator(filename, ts, es), vid

    # store the caption list and video id as a single-row DataFrame
    caption_texts, vid = transcript_collector(link, start, end)
    transcript = pd.DataFrame({'text': [caption_texts], 'video_id': [vid]})

    def segment(corpus):
        # drop bracketed annotations such as [Music], then drop empty lines
        text_data = [re.sub(r'\[.*?\]', '', x).strip() for x in corpus]
        text_data = [x for x in text_data if x != '']
        df = pd.DataFrame(text_data, columns=["utterance"])
        # remove newlines, carriage returns and tabs
        df["utterance"] = df["utterance"].apply(
            lambda x: x.replace("\n", " ").replace("\r", " ").replace("\t", " ")
        )
        df.dropna(inplace=True)  # remove NaN rows, if any
        sts = SemanticTextSegmentation(df)
        texts = sts.get_segments()
        return texts
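
    # SemanticTextSegmentation comes from the local segmentation module; it
    # presumably uses sentence embeddings (hence the sentence_transformers
    # import above) to group utterances into topically coherent blocks.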

    sf = pd.DataFrame(columns=['Segmented_Text', 'video_id'])
    text = segment(transcript.at[0, 'text'])
    for i in range(len(text)):
        sf.loc[i, 'Segmented_Text'] = text[i]
        sf.loc[i, 'video_id'] = transcript.at[0, 'video_id']

    def word_seg(text):
        # normalize whitespace, then let SymSpell restore missing word boundaries
        text = text.replace("\n", " ").replace("\r", " ").replace("\t", " ").replace("\xa0", " ")
        results = sym_spell.word_segmentation(text, max_edit_distance=0)
        return results.segmented_string

    for i in range(len(sf)):
        sf.loc[i, 'Segmented_Text'] = word_seg(sf.at[i, 'Segmented_Text'])
        sf.loc[i, 'Lengths'] = len(tokenizer(sf.at[i, 'Segmented_Text'])['input_ids'])
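
    # Pack consecutive segments of the same video into chunks that fit the
    # summarizer's 512-token input window.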
    texts = pd.DataFrame(columns=['texts'])

    def segment_loader(dataframe):
        flag = 0
        for i in range(len(dataframe)):
            if flag > 0:
                # this row was already merged into an earlier chunk
                flag -= 1
                continue
            max_len = 512
            step = 0
            texts.loc[i, 'texts'] = dataframe.at[i, 'Segmented_Text']
            length = dataframe.at[i, 'Lengths']
            texts.loc[i, 'video_id'] = dataframe.at[i, 'video_id']
            # greedily append following segments while they still fit
            while i + step < len(dataframe) - 1 and dataframe.at[i, 'video_id'] == dataframe.at[i + step + 1, 'video_id']:
                if length + dataframe.at[i + step + 1, 'Lengths'] <= max_len:
                    texts.loc[i, 'texts'] += " " + dataframe.at[i + step + 1, 'Segmented_Text']
                    length += dataframe.at[i + step + 1, 'Lengths']
                    step += 1
                else:
                    break
            flag = step
        return texts

    cleaned_text = segment_loader(sf)
    cleaned_text.reset_index(drop=True, inplace=True)
    return cleaned_text


def t5_summarizer(link, start, end):
    input_text = clean_text(link, start, end)
    model1 = AutoModelForSeq2SeqLM.from_pretrained("CareerNinja/t5_large_3_1_3e_4_v3_dataset")
    summarizer1 = pipeline("summarization", model=model1, tokenizer=tokenizer)
    print("Entered summarizer!")
    for i in range(len(input_text)):
        summary = summarizer1(input_text.at[i, 'texts'], min_length=64, max_length=128)
        input_text.loc[i, 'Generated Summary'] = summary[0]['summary_text']
    # join the per-chunk summaries rather than returning only the last one
    return "\n".join(input_text['Generated Summary'].tolist())


outbox = gr.Textbox(label="Below is the generated summary!", placeholder="Enter a link to see a summary over here!", lines=5)
interface = gr.Interface(fn=t5_summarizer, inputs=["text", "text", "text"], outputs=outbox)
interface.launch(debug=True)
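
# Example inputs (hypothetical values, for illustration only):
#   link:  https://www.youtube.com/watch?v=dQw4w9WgXcQ
#   start: "90s"   (seconds; a trailing "s" is tolerated by seg_getter)
#   end:   "240"   (seconds)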