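# Streamlit app: generate question-answer pairs from a YouTube video.
# Pipeline: fetch the transcript, summarize it with T5, extract keyphrases,
# generate one question per keyphrase, and optionally POST selected pairs to a
# companion question API.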
import streamlit as st
from youtube_transcript_api import YouTubeTranscriptApi
# AutoModelWithLMHead is deprecated; both models used here are seq2seq (T5).
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import nltk
import before_run  # project-local module, presumably prepares NLTK data on startup
# Uncomment on first run if before_run does not download these corpora:
# nltk.download('wordnet')
# nltk.download('punkt')
# nltk.download('brown')
# nltk.download('stopwords')
from nltk.tokenize import sent_tokenize
from flashtext import KeywordProcessor
from nltk.corpus import stopwords
import requests
import string
import traceback
import pke
| link = "http://127.0.0.1:8000/question" | |
| summary_tokenizer = AutoTokenizer.from_pretrained("t5-base") | |
| summary_model = AutoModelWithLMHead.from_pretrained("t5-base") | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| summary_model = summary_model.to(device) | |
| question_model = AutoModelWithLMHead.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
| question_tokenizer = AutoTokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1') | |
| question_model = question_model.to(device) | |
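# POST a JSON payload to the question API and return the response.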
def query(url, payload):
    return requests.post(url, json=payload)
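# Fetch the transcript for a YouTube video and join the segments into one string.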
def fetch_transcript(url):
    # Assumes a standard watch URL of the form https://www.youtube.com/watch?v=<id>.
    vid = url.split("=")[1]
    transcript = YouTubeTranscriptApi.get_transcript(vid)
    return " ".join(segment["text"] for segment in transcript)
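# Capitalize the first letter of every sentence in the generated summary.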
def postprocesstext(content):
    final = ""
    for sent in sent_tokenize(content):
        final = final + " " + sent.capitalize()
    return final
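# Summarize text with T5; "summarize:" is the task prefix the model was trained on.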
def summarizer(text, model, tokenizer):
    text = "summarize: " + text.strip().replace("\n", " ")
    max_len = 512
    # pad_to_max_length is deprecated; padding=False is the current equivalent.
    encoding = tokenizer.encode_plus(text, max_length=max_len, padding=False,
                                     truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=3,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          min_length=75,
                          max_length=300)
    summary = tokenizer.decode(outs[0], skip_special_tokens=True)
    summary = postprocesstext(summary)
    return summary.strip()
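# Extract up to 15 noun/proper-noun keyphrases with pke's MultipartiteRank.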
def get_nouns_multipartite(content):
    out = []
    try:
        extractor = pke.unsupervised.MultipartiteRank()
        # Exclude punctuation, bracket tokens, and English stopwords from candidates.
        stoplist = list(string.punctuation)
        stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
        stoplist += stopwords.words('english')
        extractor.load_document(input=content, stoplist=stoplist)
        # Only nouns and proper nouns may become keyphrase candidates.
        pos = {'PROPN', 'NOUN'}
        extractor.candidate_selection(pos=pos)
        extractor.candidate_weighting(alpha=1.1, threshold=0.75, method='average')
        keyphrases = extractor.get_n_best(n=15)
        out = [phrase for phrase, _score in keyphrases]
    except Exception:
        out = []
        traceback.print_exc()
    return out
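# Keep only the transcript keyphrases that also appear in the summary, capped at
# the requested count; these become the answers to generate questions for.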
def get_keywords(originaltext, summarytext, count):
    keywords = get_nouns_multipartite(originaltext)
    print("keywords unsummarized: ", keywords)
    keyword_processor = KeywordProcessor()
    for keyword in keywords:
        keyword_processor.add_keyword(keyword)
    keywords_found = list(set(keyword_processor.extract_keywords(summarytext)))
    print("keywords found in summarized: ", keywords_found)
    # Filter while preserving the original ranking order.
    important_keywords = [kw for kw in keywords if kw in keywords_found]
    return important_keywords[:int(count)]
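# Generate a question for a (context, answer) pair with a T5 model fine-tuned on
# SQuAD; "context: ... answer: ..." is the prompt format that model expects.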
def get_question(context, answer, model, tokenizer):
    text = "context: {} answer: {}".format(context, answer)
    encoding = tokenizer.encode_plus(text, max_length=384, padding=False,
                                     truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=5,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          max_length=72)
    question = tokenizer.decode(outs[0], skip_special_tokens=True)
    return question.replace("question:", "").strip()
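# Full pipeline: transcript -> summary -> keyword extraction -> one
# "question : answer" string per keyword.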
def generate_qna(url, count):
    transcript = fetch_transcript(url)
    summarized_text = summarizer(transcript, summary_model, summary_tokenizer)
    keywords = get_keywords(transcript, summarized_text, count)
    qna = []
    for answer in keywords:
        question = get_question(summarized_text, answer, question_model, question_tokenizer)
        qna.append(question + ' : ' + answer)
    return qna
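# Streamlit UI: collect a video URL and question count, display the generated
# pairs, and POST the user's selections to the question API.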
def main():
    if 'submitted' not in st.session_state:
        st.session_state.submitted = False
    if 'opt' not in st.session_state:
        st.session_state.opt = []

    def callback():
        st.session_state.submitted = True

    st.title('QnA Pair Generator')
    url = st.text_input('Enter the video link')
    count = st.text_input('Enter the number of questions you want to generate')

    if st.button("Submit URL", on_click=callback) and url and count:
        st.write("Thanks for the submission!")
        st.session_state.opt = generate_qna(url, count)

    if st.session_state.submitted and st.session_state.opt:
        option = st.multiselect('Select the questions you want to add to the database',
                                st.session_state.opt)
        if option and st.button("Add question"):
            for pair in option:
                # Pairs are "<question> : <answer>"; split on the last " : " so
                # any colon inside the question text is preserved.
                question_text, answer_text = pair.rsplit(" : ", 1)
                payload = {"question": question_text, "answer": answer_text}
                response = query(link, payload)
                st.write(response.text)

main()
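# Launch with: streamlit run <this file>.py
# The question API behind `link` (assumed to be a separate, locally running
# service) must be up for "Add question" to succeed.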