test1 / app.py
AkashKhamkar's picture
Update app.py
4398f74
import sentence_transformers
from transformers import AutoTokenizer
from youtube_transcript_api import YouTubeTranscriptApi
import os
import ast
import pandas as pd
import nltk
nltk.download('stopwords')
from segmentation import SemanticTextSegmentation
import random
import re
import string
from symspellpy import SymSpell, Verbosity
import pkg_resources
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from torch import cuda
from transformers import pipeline
import gradio as gr
device = 'cuda' if cuda.is_available() else 'cpu'
tokenizer = AutoTokenizer.from_pretrained("CareerNinja/t5_large_3_1_3e_4_v3_dataset")
os.makedirs('./transcripts/')
def clean_text(link,start,end):
sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
dictionary_path = pkg_resources.resource_filename(
"symspellpy", "frequency_dictionary_en_82_765.txt"
)
sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)
def id_ts_grabber(link):
youtube_video = link.split("=")
video_id = youtube_video[1]
#print(f""" This is the video ID: {video_id} and this is the Timestamp: {time_stamp}""")
return video_id
#print(f""" This is the video ID: {video_id} and no Timestamp was found""")
def seg_getter(data,ts,es):
starts = []
for line in data:
ccs = ast.literal_eval(line)
starts.append(float(ccs['start']))
#print(starts)
ts_ = float(ts.strip("s&end"))
#es_ = float(es.strip(es[-1]))
t_val = starts[min(range(len(starts)), key = lambda i: abs(starts[i]-ts_))]
e_val = starts[min(range(len(starts)), key = lambda i: abs(starts[i]-float(es)))]
tid = starts.index(t_val)
eid = starts.index(e_val)
ts_list_len = len(starts[tid:eid])
return tid, ts_list_len
def get_cc(video_id):
try:
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
try:
# filter for manually created transcripts
transcript = transcript_list.find_manually_created_transcript(['en','en-US','en-GB','en-IN'])
except Exception as e:
# print(e)
transcript = None
manual = True
if not transcript:
try:
# or automatically generated ones
transcript = transcript_list.find_generated_transcript(['en'])
manual = False
except Exception as e:
# print(e)
transcript = None
if transcript:
if manual: file_name = os.path.join('transcripts', str(video_id) + "_cc_manual" + ".txt")
else: file_name = os.path.join('transcripts', str(video_id) + "_cc_auto" + ".txt")
with open(file_name, 'w') as file:
for line in transcript.fetch():
file.write(str(line).replace(r'\xa0', ' ').replace(r'\n', '') + '\n')
# print(f"CC downloaded in {file_name}")
return file_name
else:
#print("No transcript found")
return None
except Exception as e:
#print(e)
return None
def transcript_creator(filename,timestamp,end_pt):
#print(filename)
with open(filename, 'r') as f:
data = f.readlines()
#print("This is data: ", data)
transcripts = []
#print("this is ts: ",timestamp)
if timestamp == None:
#print("executing 1 ")
for line in data:
ccs = ast.literal_eval(line)
transcripts.append(ccs['text'])
return transcripts
else :
#print("executing 2")
start,lenlist = seg_getter(data,timestamp,end_pt)
#print(f""" This is the ts list{ts_len}""")
for t in range(lenlist):
ccs = ast.literal_eval(data[start+t])
transcripts.append(ccs['text'])
return transcripts
def transcript_collector(link,ts,es):
vid = id_ts_grabber(link)
print(f""" Fetching the transcript """)
filename = get_cc(vid)
return transcript_creator(filename, ts, es), vid
transcript = pd.DataFrame(columns=['text', 'video_id'])
transcript.loc[0,'text'],transcript.loc[0,'video_id'] = transcript_collector(link,start,end)
def segment(corpus):
text_data = [re.sub(r'\[.*?\]', '', x).strip() for x in corpus]
text_data = [x for x in text_data if x != '']
df = pd.DataFrame(text_data, columns=["utterance"])
# remove new line, tab, return
df["utterance"] = df["utterance"].apply(lambda x: x.replace("\n", " ").replace("\r", " ").replace("\t", " "))
# remove Nan
df.dropna(inplace=True)
sts = SemanticTextSegmentation(df)
texts = sts.get_segments()
return texts
sf = pd.DataFrame(columns=['Segmented_Text','video_id'])
text = segment(transcript.at[0,'text'])
for i in range(len(text)):
sf.loc[i, 'Segmented_Text'] = text[i]
sf.loc[i, 'video_id'] = transcript.at[0,'video_id']
def word_seg(text):
text = text.replace("\n", " ").replace("\r", " ").replace("\t", " ").replace("\xa0", " ")
results = sym_spell.word_segmentation(text, max_edit_distance=0)
texts = results.segmented_string
#result = re.sub(r'[^\w\s]', '',texts).lower()
return texts
for i in range(len(sf)):
sf.loc[i, 'Segmented_Text'] = word_seg(sf.at[i, 'Segmented_Text'])
sf.loc[i, 'Lengths'] = len(tokenizer(sf.at[i, 'Segmented_Text'])['input_ids'])
texts = pd.DataFrame(columns=['texts'])
def segment_loader(dataframe):
flag = 0
for i in range(len(dataframe)):
if flag > 0:
flag -= 1
continue
m = 512
iter = 0
texts.loc[i, 'texts'] = dataframe.at[i+iter, 'Segmented_Text']
length = dataframe.at[i+iter, 'Lengths']
texts.loc[i,'video_id'] = dataframe.at[i, 'video_id']
while i+iter < len(dataframe)-1 and dataframe.at[i, 'video_id'] == dataframe.at[i+iter+1, 'video_id']:
if length + dataframe.at[i + iter + 1, 'Lengths'] <= m :
texts.loc[i,'texts'] += " " + dataframe.at[i+iter+1, 'Segmented_Text']
length += dataframe.at[i+iter + 1,'Lengths']
iter += 1
else:
break
flag = iter
return texts
cleaned_text = segment_loader(sf)
cleaned_text.reset_index(drop=True, inplace=True)
return cleaned_text
def t5_summarizer(link,start, end):
input_text = clean_text(link,start,end)
model1 = AutoModelForSeq2SeqLM.from_pretrained("CareerNinja/t5_large_3_1_3e_4_v3_dataset")
summarizer1 = pipeline("summarization", model=model1, tokenizer=tokenizer)
print(f""" Entered summarizer ! """)
out = []
for i in range(len(input_text)):
summary = summarizer1(input_text.at[i,'texts'], min_length=64, max_length=128)
sumry = list(summary[0].values())
input_text.loc[i,'Generated Summary'] = sumry[0]
return (input_text.at[i, 'Generated Summary'])
outbox = gr.Textbox(label = "Below is the generated summary !", placeholder="Enter a link to see a summary over here !", lines =5)
interface = gr.Interface(fn=t5_summarizer,inputs=["text","text","text"],outputs=outbox).launch(debug=True)
interface.launch()