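# End-to-end MCQ generator: summarize the input with T5, extract answer
# spans with MultipartiteRank, generate one question per answer with a
# SQuAD-fine-tuned T5, build distractors with sense2vec + MMR, and serve
# the whole pipeline through a Gradio UI.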
from textwrap3 import wrap

text = """Elon Musk has shown again he can influence the digital currency market with just his tweets. After saying that his electric vehicle-making company
Tesla will not accept payments in Bitcoin because of environmental concerns, he tweeted that he was working with developers of Dogecoin to improve
system transaction efficiency. Following the two distinct statements from him, the world's largest cryptocurrency hit a two-month low, while Dogecoin
rallied by about 20 percent. The SpaceX CEO has in recent months often tweeted in support of Dogecoin, but rarely for Bitcoin. In a recent tweet,
Musk put out a statement from Tesla that it was “concerned” about the rapidly increasing use of fossil fuels for Bitcoin (price in India) mining and
transaction, and hence was suspending vehicle purchases using the cryptocurrency. A day later he again tweeted saying, “To be clear, I strongly
believe in crypto, but it can't drive a massive increase in fossil fuel use, especially coal”. It triggered a downward spiral for Bitcoin value but
the cryptocurrency has stabilised since. A number of Twitter users welcomed Musk's statement. One of them said it's time people started realising
that Dogecoin “is here to stay” and another referred to Musk's previous assertion that crypto could become the world's future currency."""

for wrp in wrap(text, 150):
    print(wrp)
print("\n")

"""## Example 2"""
import torch
from transformers import T5ForConditionalGeneration, T5Tokenizer

summary_model = T5ForConditionalGeneration.from_pretrained('t5-base')
summary_tokenizer = T5Tokenizer.from_pretrained('t5-base')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
summary_model = summary_model.to(device)

import random
import numpy as np

def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(42)
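# Seeding random, numpy and torch keeps the later keyword/answer shuffling
# (and any stochastic torch ops) reproducible across runs.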
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')  # required by sent_tokenize on newer NLTK releases
nltk.download('brown')
nltk.download('wordnet')
from nltk.corpus import wordnet as wn
from nltk.tokenize import sent_tokenize

def postprocesstext(content):
    # Capitalize the first letter of each sentence in the generated summary.
    final = ""
    for sent in sent_tokenize(content):
        sent = sent.capitalize()
        final = final + " " + sent
    return final
def summarizer(text, model, tokenizer):
    text = text.strip().replace("\n", " ")
    text = "summarize: " + text  # T5 task prefix
    max_len = 512
    encoding = tokenizer.encode_plus(text, max_length=max_len, padding=False,
                                     truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=3,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          min_length=75,
                          max_length=300)
    dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    summary = dec[0]
    summary = postprocesstext(summary)
    summary = summary.strip()
    return summary
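# Sanity check: summarize the sample article and print the original and the
# summary side by side.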
summarized_text = summarizer(text, summary_model, summary_tokenizer)

print("\nOriginal Text >>")
for wrp in wrap(text, 150):
    print(wrp)
print("\n")
print("Summarized Text >>")
for wrp in wrap(summarized_text, 150):
    print(wrp)
print("\n")

"""# **Answer Span Extraction (Keywords and Noun Phrases)**"""
total = 10

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
import string
import pke
import traceback
def get_nouns_multipartite(content):
    out = []
    try:
        extractor = pke.unsupervised.MultipartiteRank()
        extractor.load_document(input=content, language='en')
        # Select only nouns and proper nouns as candidates; punctuation and
        # stopwords are excluded.
        pos = {'PROPN', 'NOUN'}
        stoplist = list(string.punctuation)
        stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
        stoplist += stopwords.words('english')
        # extractor.candidate_selection(pos=pos, stoplist=stoplist)
        extractor.candidate_selection(pos=pos)
        # Build the Multipartite graph and rank candidates using a random walk;
        # alpha controls the weight adjustment mechanism, see TopicRank for the
        # threshold/method parameters.
        extractor.candidate_weighting(alpha=1.1,
                                      threshold=0.75,
                                      method='average')
        keyphrases = extractor.get_n_best(n=15)
        for val in keyphrases:
            out.append(val[0])
    except Exception:
        out = []
        traceback.print_exc()
    return out
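# get_n_best returns (phrase, score) pairs; only the phrases are kept here,
# and get_keywords below decides which of them survive summarization.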
from flashtext import KeywordProcessor

def get_keywords(originaltext, summarytext, total):
    keywords = get_nouns_multipartite(originaltext)
    print("keywords unsummarized: ", keywords)
    keyword_processor = KeywordProcessor()
    for keyword in keywords:
        keyword_processor.add_keyword(keyword)
    keywords_found = keyword_processor.extract_keywords(summarytext)
    keywords_found = list(set(keywords_found))
    print("keywords_found in summarized: ", keywords_found)
    # Keep only the keyphrases that also appear in the summary, so every
    # answer span is guaranteed to exist in the question context.
    important_keywords = []
    for keyword in keywords:
        if keyword in keywords_found:
            important_keywords.append(keyword)
    return important_keywords[:total]

imp_keywords = get_keywords(text, summarized_text, total)
print(imp_keywords)

"""# **Question generation with T5**"""
question_model = T5ForConditionalGeneration.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_tokenizer = T5Tokenizer.from_pretrained('ramsrigouthamg/t5_squad_v1')
question_model = question_model.to(device)
def get_question(context, answer, model, tokenizer):
    text = "context: {} answer: {}".format(context, answer)
    encoding = tokenizer.encode_plus(text, max_length=384, padding=False,
                                     truncation=True, return_tensors="pt").to(device)
    input_ids, attention_mask = encoding["input_ids"], encoding["attention_mask"]
    outs = model.generate(input_ids=input_ids,
                          attention_mask=attention_mask,
                          early_stopping=True,
                          num_beams=5,
                          num_return_sequences=1,
                          no_repeat_ngram_size=2,
                          max_length=72)
    dec = [tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]
    question = dec[0].replace("question:", "")
    question = question.strip()
    return question
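# The "context: {} answer: {}" prompt is the format this pipeline feeds the
# ramsrigouthamg/t5_squad_v1 checkpoint; the decoded output's "question:"
# prefix is stripped above.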
for wrp in wrap(summarized_text, 150):
    print(wrp)
print("\n")

for answer in imp_keywords:
    ques = get_question(summarized_text, answer, question_model, question_tokenizer)
    print(ques)
    print(answer.capitalize())
    print("\n")

"""# **Gradio UI Visualization**"""
# wget https://github.com/explosion/sense2vec/releases/download/v1.0.0/s2v_reddit_2015_md.tar.gz
# tar -xvf s2v_reddit_2015_md.tar.gz

import numpy as np
from sense2vec import Sense2Vec
s2v = Sense2Vec().from_disk('s2v_old')

from sentence_transformers import SentenceTransformer
# Alternative model: 'paraphrase-distilroberta-base-v1'
sentence_transformer_model = SentenceTransformer('msmarco-distilbert-base-v3')

from similarity.normalized_levenshtein import NormalizedLevenshtein
normalized_levenshtein = NormalizedLevenshtein()
def filter_same_sense_words(original, wordlist):
    # Keep only candidates whose sense2vec sense tag (e.g. '|NOUN') matches
    # the sense of the original word.
    filtered_words = []
    base_sense = original.split('|')[1]
    print(base_sense)
    for eachword in wordlist:
        if eachword[0].split('|')[1] == base_sense:
            filtered_words.append(eachword[0].split('|')[0].replace("_", " ").title().strip())
    return filtered_words

def get_highest_similarity_score(wordlist, wrd):
    score = []
    for each in wordlist:
        score.append(normalized_levenshtein.similarity(each.lower(), wrd.lower()))
    return max(score)
def sense2vec_get_words(word, s2v, topn, question):
    output = []
    print("word ", word)
    try:
        sense = s2v.get_best_sense(word, senses=["NOUN", "PERSON", "PRODUCT", "LOC", "ORG", "EVENT",
                                                 "NORP", "WORK OF ART", "FAC", "GPE", "NUM", "FACILITY"])
        most_similar = s2v.most_similar(sense, n=topn)
        output = filter_same_sense_words(sense, most_similar)
        print("Similar ", output)
    except Exception:
        output = []
    threshold = 0.6
    final = [word]
    checklist = question.split()
    for x in output:
        if get_highest_similarity_score(final, x) < threshold and x not in final and x not in checklist:
            final.append(x)
    return final[1:]
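# A candidate is accepted only if its normalized Levenshtein similarity to
# everything accepted so far stays below 0.6, which weeds out near-duplicate
# surface forms (plurals, case variants) of the answer.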
def mmr(doc_embedding, word_embeddings, words, top_n, lambda_param):
    # Similarity of each candidate to the document, and between candidates.
    word_doc_similarity = cosine_similarity(word_embeddings, doc_embedding)
    word_similarity = cosine_similarity(word_embeddings)
    # Initialize candidates and pick the single best keyword/keyphrase first.
    keywords_idx = [np.argmax(word_doc_similarity)]
    candidates_idx = [i for i in range(len(words)) if i != keywords_idx[0]]
    for _ in range(top_n - 1):
        # Similarities between remaining candidates and the document, and
        # between candidates and the keywords selected so far.
        candidate_similarities = word_doc_similarity[candidates_idx, :]
        target_similarities = np.max(word_similarity[candidates_idx][:, keywords_idx], axis=1)
        # Calculate MMR and pick the best candidate.
        mmr = (lambda_param) * candidate_similarities - (1 - lambda_param) * target_similarities.reshape(-1, 1)
        mmr_idx = candidates_idx[np.argmax(mmr)]
        # Update keywords & candidates.
        keywords_idx.append(mmr_idx)
        candidates_idx.remove(mmr_idx)
    return [words[idx] for idx in keywords_idx]
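# Maximal Marginal Relevance: each pick maximizes
# lambda * relevance(candidate, document) - (1 - lambda) * max-similarity to
# already-picked items, trading topical fit against diversity among distractors.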
from sklearn.metrics.pairwise import cosine_similarity  # used by mmr() above; resolved at call time
def get_distractors_wordnet(word):
    distractors = []
    try:
        syn = wn.synsets(word, 'n')[0]
        word = word.lower()
        orig_word = word
        if len(word.split()) > 1:
            word = word.replace(" ", "_")
        hypernym = syn.hypernyms()
        if len(hypernym) == 0:
            return distractors
        for item in hypernym[0].hyponyms():
            name = item.lemmas()[0].name()
            if name == orig_word:
                continue
            name = name.replace("_", " ")
            name = " ".join(w.capitalize() for w in name.split())
            if name not in distractors:
                distractors.append(name)
    except Exception:
        print("Wordnet distractors not found")
    return distractors
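# WordNet fallback: sibling hyponyms of the answer's first hypernym make
# natural distractors. Note this helper is not wired into the Gradio flow
# below, which relies on sense2vec only.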
def get_distractors(word, origsentence, sense2vecmodel, sentencemodel, top_n, lambdaval):
    distractors = sense2vec_get_words(word, sense2vecmodel, top_n, origsentence)
    print("distractors ", distractors)
    if len(distractors) == 0:
        return distractors
    distractors_new = [word.capitalize()]
    distractors_new.extend(distractors)
    # Embed the question context together with the answer, then re-rank the
    # distractors with MMR against that embedding.
    embedding_sentence = origsentence + " " + word.capitalize()
    keyword_embedding = sentencemodel.encode([embedding_sentence])
    distractor_embeddings = sentencemodel.encode(distractors_new)
    max_keywords = min(len(distractors_new), 5)
    filtered_keywords = mmr(keyword_embedding, distractor_embeddings, distractors_new, max_keywords, lambdaval)
    # Drop the answer itself from the final distractor list.
    final = [word.capitalize()]
    for wrd in filtered_keywords:
        if wrd.lower() != word.lower():
            final.append(wrd.capitalize())
    final = final[1:]
    return final
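# Smoke test: distractors for "Bitcoin" given a sample question; the
# commented pairs exercise other answers from the same article.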
sent = "What cryptocurrency did Musk rarely tweet about?"
keyword = "Bitcoin"
# sent = "What did Musk say he was working with to improve system transaction efficiency?"
# keyword = "Dogecoin"
# sent = "What company did Musk say would not accept bitcoin payments?"
# keyword = "Tesla"
# sent = "What has Musk often tweeted in support of?"
# keyword = "Cryptocurrency"
print(get_distractors(keyword, sent, s2v, sentence_transformer_model, 40, 0.2))

"""# **Gradio Visualization with MCQs**"""
# import mysql.connector
# import datetime
# mydb = mysql.connector.connect(
#     host="qtechdb-1.cexugk1h8rui.ap-northeast-1.rds.amazonaws.com",
#     user="admin",
#     password="F3v2vGWzb8vaniE3nqzi",
#     database="spring_social"
# )

import gradio as gr
import re

context = gr.Textbox(lines=10, placeholder="Enter paragraph/content here...",
                     label="Enter your content (must contain at least 150 words).")
total = gr.Slider(1, 10, value=1, step=1, label="Total Number Of Questions")
subject = gr.Textbox(placeholder="Enter subject/title here...",
                     label="Enter your title (must contain at least 1 word).")
output = gr.HTML(label="Question and Answers")
def generate_question_text(context, subject, total):
    words_text = len(re.findall(r'\w+', context))
    words_subject = len(re.findall(r'\w+', subject))
    if words_text < 150:
        raise gr.Error("Invalid input: the content must contain at least 150 words.")
    elif words_subject < 1:
        raise gr.Error("Invalid input: the title must contain at least one word.")
    else:
        summary_text = summarizer(context, summary_model, summary_tokenizer)
        for wrp in wrap(summary_text, 150):
            print(wrp)
        # Renamed from `np` to avoid shadowing the numpy import.
        noun_phrases = get_keywords(context, summary_text, total)
        random.shuffle(noun_phrases)
        print("\n\nNoun phrases", noun_phrases)
        output = "<b style='color:black;'>Select/Tick the correct answer.</b><br><br>"
        i = 1
        for answer in noun_phrases:
            ques = get_question(summary_text, answer, question_model, question_tokenizer)
            distractors = get_distractors(answer.capitalize(), ques, s2v, sentence_transformer_model, 40, 0.2)
            output = output + "<b style='color:black;'>Q" + str(i) + ") " + ques + "</b><br/>"
            i += 1
            answerlist = [answer.capitalize()]
            for distractor in distractors[:3]:
                answerlist.append(distractor)
            random.shuffle(answerlist)
            print(answerlist)
            for option in answerlist:
                output = output + option.capitalize() + "<br/>"
        output = output + "<br><b style='color:black;'>Correct Answer Key:</b><br>"
        i = 1
        for answer in noun_phrases:
            output = output + "<b style='color:green;'>Ans" + str(i) + ") " + answer.capitalize() + "</b>"
            output = output + "<br>"
            i += 1
        # mycursor = mydb.cursor()
        # timedate = datetime.datetime.now()
        # sql = "INSERT INTO mcqstexts (subject, input, output, timedate) VALUES (%s, %s, %s, %s)"
        # val = (subject, context, output, timedate)
        # mycursor.execute(sql, val)
        # mydb.commit()
        # print(mycursor.rowcount, "record inserted.")
        return output
iface = gr.Interface(
    fn=generate_question_text,
    inputs=[context, subject, total],
    outputs=output,
    allow_flagging="never")
# iface.launch(debug=True, share=True)
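# The file-upload tab below reuses the pipeline via generate_question, which
# skips input validation; filecreate validates the uploaded file's contents
# instead.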
def generate_question(context, subject, total):
    summary_text = summarizer(context, summary_model, summary_tokenizer)
    for wrp in wrap(summary_text, 150):
        print(wrp)
    # Renamed from `np` to avoid shadowing the numpy import.
    noun_phrases = get_keywords(context, summary_text, total)
    random.shuffle(noun_phrases)
    print("\n\nNoun phrases", noun_phrases)
    output = "<b style='color:black;'>Select/Tick the correct answer.</b><br><br>"
    i = 1
    for answer in noun_phrases:
        ques = get_question(summary_text, answer, question_model, question_tokenizer)
        distractors = get_distractors(answer.capitalize(), ques, s2v, sentence_transformer_model, 40, 0.2)
        output = output + "<b style='color:black;'>Q" + str(i) + ") " + ques + "</b><br/>"
        i += 1
        answerlist = [answer.capitalize()]
        for distractor in distractors[:3]:
            answerlist.append(distractor)
        random.shuffle(answerlist)
        for option in answerlist:
            output = output + option.capitalize() + "<br/>"
    output = output + "<br><b style='color:black;'>Correct Answer Key:</b><br>"
    i = 1
    for answer in noun_phrases:
        output = output + "<b style='color:green;'>Ans" + str(i) + ") " + answer.capitalize() + "</b>"
        output = output + "<br/>"
        i += 1
    # mycursor = mydb.cursor()
    # timedate = datetime.datetime.now()
    # sql = "INSERT INTO mcqstexts (subject, input, output, timedate) VALUES (%s, %s, %s, %s)"
    # val = (subject, context, output, timedate)
    # mycursor.execute(sql, val)
    # mydb.commit()
    # print(mycursor.rowcount, "record inserted.")
    return output
def filecreate(x, subject, total):
    with open(x.name) as fo:
        text = fo.read()
    words_text = len(re.findall(r'\w+', text))
    words_subject = len(re.findall(r'\w+', subject))
    if words_text < 150:
        raise gr.Error("Invalid input: the file must contain at least 150 words.")
    elif words_subject < 1:
        raise gr.Error("Invalid input: the title must contain at least one word.")
    else:
        generated = generate_question(text, subject, total)
        return generated
context = gr.HTML(label="Text")
file = gr.File(label="Upload your *.txt file (must contain at least 150 words).")
total = gr.Slider(1, 10, value=1, step=1, label="Total Number Of Questions")
subject = gr.Textbox(placeholder="Enter subject/title here...", label="Enter your title (must contain at least 1 word).")

fface = gr.Interface(
    fn=filecreate,
    inputs=[file, subject, total],
    outputs=context,
    # css=".gradio-container {background-image: url('file=blue.jpg')}",
    allow_flagging="never")
# fface.launch(debug=True, share=True)

demo = gr.TabbedInterface([iface, fface], ["Text", "Upload File"])
demo.launch(debug=True, show_api=False)