Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import torch | |
| from tqdm import tqdm | |
| from peft import PeftModel, PeftConfig | |
| from transformers import AutoModelForSeq2SeqLM, AutoModelForCausalLM | |
| from transformers import AutoTokenizer | |
| import numpy as np | |
| import time | |
| import string | |
| # JS | |
| import nltk | |
| nltk.download('wordnet') | |
| from nltk.corpus import wordnet as wn | |
| from nltk.tokenize import word_tokenize | |
@st.cache_resource
def get_models(llama=False):
    """Load the reverse-dictionary model and, optionally, the Llama chat model.

    The function is wrapped in ``st.cache_resource`` so the weights are loaded
    once per distinct ``llama`` value instead of on every Streamlit rerun.

    Args:
        llama: When true, additionally load 'meta-llama/Llama-2-7b-chat-hf'
            and its tokenizer. The Hugging Face access token is read from the
            ``HF_TOKEN`` environment variable; it must never be committed to
            the source file.

    Returns:
        ``(model, tokenizer)`` when ``llama`` is false, otherwise
        ``(model, tokenizer, llama_model, llama_tokenizer)``.
    """
    import os  # local import: only needed to look up the access token

    base_name = "google/flan-t5-large"
    adapter_name = "YouNameIt/T5ForReverseDictionary_prefix_tuned"

    st.write('Loading the model...')
    model = AutoModelForSeq2SeqLM.from_pretrained(base_name)
    # Wrap the base seq2seq model with the adapter weights.
    model = PeftModel.from_pretrained(model, adapter_name)
    tokenizer = AutoTokenizer.from_pretrained(base_name)

    if not llama:
        st.write("_The assistant is loaded and ready to use! :tada:_")
        return model, tokenizer

    # JS
    model_name = 'meta-llama/Llama-2-7b-chat-hf'
    # None makes the library fall back to whatever credentials it finds itself.
    access_token = os.environ.get("HF_TOKEN")
    llama_tokenizer = AutoTokenizer.from_pretrained(
        model_name, use_auth_token=access_token, use_fast=True
    )
    llama_model = AutoModelForCausalLM.from_pretrained(
        model_name, use_auth_token=access_token, device_map={'': 0}
    )
    st.write("The assistant is loaded and ready to use!")
    return model, tokenizer, llama_model, llama_tokenizer
# Called with the default llama=False, so only the (model, tokenizer) pair is
# returned. These two module-level names are the globals that the generation
# helpers defined below read.
model, tokenizer = get_models()
def remove_punctuation(word):
    """Return ``word`` with every ``string.punctuation`` character removed."""
    # Keep only the characters that are not ASCII punctuation.
    return "".join(char for char in word if char not in string.punctuation)
def return_top_k(sentence, k=10, word=None, rels=False):
    """Generate up to ``k`` candidate words for a description.

    Args:
        sentence: The user's description. A trailing "." is appended when
            missing (an empty string is accepted and becomes ".").
        k: Maximum number of candidates to return. ``k + 5`` beams are
            generated so that some can be filtered out.
        word: Only used when ``rels`` is true: the word whose related words
            are requested.
        rels: When true, the prompt asks for words related to ``word`` instead
            of a word matching ``sentence``. ``sentence`` is still used to
            filter the candidates.

    Returns:
        A list of at most ``k`` decoded predictions. Candidates shorter than
        two characters, or equal to a (lower-cased, punctuation-stripped) word
        of ``sentence``, are dropped.
    """
    if not sentence.endswith("."):
        sentence = sentence + "."

    if rels:
        prompts = [f"Description : It is related to '{word}' but not '{word}'. Word : "]
    else:
        prompts = [f"Description : {sentence} Word : "]

    encoded = tokenizer(
        prompts,
        padding=True, truncation=True,
        return_tensors="pt",
    )

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    with torch.no_grad():
        encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
        # The former `top_p=50` argument was dropped: it lies outside the
        # valid range and sampling is not enabled for this beam search.
        output_sequences = model.generate(
            input_ids=encoded["input_ids"],
            max_new_tokens=10,
            num_beams=k + 5,
            num_return_sequences=k + 5,
            return_dict_in_generate=True,
        )

    # all word predictions
    predictions = [
        tokenizer.decode(tokens, skip_special_tokens=True)
        for tokens in output_sequences['sequences']
    ]

    description_words = {remove_punctuation(token.lower()) for token in sentence.split()}
    # Build a new list instead of popping from `predictions` while iterating
    # over it, which silently skipped the element after each removal.
    kept = [
        pred for pred in predictions
        if len(pred) >= 2 and pred not in description_words
    ]
    return kept[:k]
| # JS | |
def get_related_words(word, num=5):
    """Return up to ``num`` generated words related to ``word``.

    Args:
        word: The word to find related words for.
        num: Maximum number of related words; ``num + 2`` beams are generated
            so that a few can be filtered out.

    Returns:
        The kept predictions joined with ", ". A prediction is dropped when it
        occurs as a substring of the prompt, which covers ``word`` itself.
    """
    model.eval()
    # "Description" was misspelled here; the prompt now matches the
    # "Description : ... Word : " format used by return_top_k.
    sentence = [f"Description : It is related to {word} but not {word}. Word : "]
    inputs = tokenizer(sentence, padding=True, truncation=True, return_tensors="pt")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    with torch.no_grad():
        batch = {name: tensor.to(device) for name, tensor in inputs.items()}
        beam_outputs = model.generate(
            input_ids=batch['input_ids'],
            max_new_tokens=10,
            num_beams=num + 2,
            num_return_sequences=num + 2,
            early_stopping=True,
        )

    prompt_text = " ".join(sentence)  # loop-invariant, so computed once
    beam_preds = []
    for beam_output in beam_outputs:
        prediction = tokenizer.decode(
            beam_output.detach().cpu().numpy(), skip_special_tokens=True
        ).strip()
        if prediction not in prompt_text:
            beam_preds.append(prediction)
    return ", ".join(beam_preds[:num])
| #if 'messages' not in st.session_state: | |
def get_text():
    """Render the chat input widget and return the value it reports."""
    return st.chat_input()
def write_bot(input, remember=True, blink=True):
    """Show ``input`` as an assistant chat message.

    Args:
        input: The markdown text to display.
        remember: When truthy, append the message to
            ``st.session_state.messages`` so it is part of the stored history.
        blink: When truthy, reveal the text word by word with a cursor glyph
            before showing the final text.
    """
    full_response = input
    with st.chat_message('assistant'):
        message_placeholder = st.empty()
        if blink:
            shown = ''
            for chunk in full_response.split():
                shown += chunk + " "
                time.sleep(0.05)
                # Add a blinking cursor to simulate typing
                message_placeholder.markdown(shown + "▌")
            # Short pause with the cursor still visible before the final text.
            time.sleep(0.5)
        message_placeholder.markdown(full_response)
    if remember:
        st.session_state.messages.append({'role': 'assistant', 'content': full_response})
| #def ask_if_helped(): | |
| #y = st.button('Yes!', key=60) | |
| #n = st.button('No...', key=61) | |
| #new = st.button('I have a new word', key=62) | |
| #if y: | |
| # write_bot("I am happy to help!") | |
| # again = st.button('Play again') | |
| # if again: | |
| # write_bot("Please describe your word!") | |
| # st.session_state.is_helpful['ask'] = False | |
| #elif n: | |
| # st.session_state.actions.append('cue') | |
| # st.session_state.is_helpful['ask'] = False | |
| # #cue_generation() | |
| #elif new: | |
| # write_bot("Please describe your word!") | |
| # st.session_state.is_helpful['ask'] = False | |
| ## removed: if st.session_state.actions[-1] == "result": | |
| # JS | |
def get_related_words_llama(relation, target, device, num=5, llm=None, llm_tokenizer=None):
    """Ask a chat model for ``num`` words standing in ``relation`` to ``target``.

    Args:
        relation: Name of the relation, used in the prompt as "{relation}s"
            (e.g. "synonym").
        target: The word to find related words for.
        device: Device the tokenized prompt is moved to.
        num: Number of words requested and the maximum number returned.
        llm: Model used for generation. Defaults to the module-level ``model``
            so existing calls behave as before; pass the Llama model returned
            by ``get_models(llama=True)`` to query that model instead.
        llm_tokenizer: Tokenizer matching ``llm``; defaults to the module-level
            ``tokenizer``.

    Returns:
        The last ``num`` tokens of the answer that are at least three
        characters long (all of them when there are fewer than ``num``).
    """
    generator = model if llm is None else llm
    text_tokenizer = tokenizer if llm_tokenizer is None else llm_tokenizer

    prompt = f"Provide {num} {relation}s for the word '{target}'. Your answer consists of these {num} words only. Do not include the word '{target}' itself in your answer"
    inputs = text_tokenizer([prompt], return_tensors='pt').to(device)
    output = generator.generate(
        **inputs, max_new_tokens=40, temperature=.75, early_stopping=True,
    )
    # Skip the first `prompt_length` positions of the output before decoding.
    # NOTE(review): this presumes the output starts with an echo of the prompt
    # (causal-LM behaviour) - confirm for the model actually passed in.
    prompt_length = inputs['input_ids'].shape[-1]
    chatbot_response = text_tokenizer.decode(
        output[0, prompt_length:], skip_special_tokens=True
    ).strip()
    postproc = [token for token in word_tokenize(chatbot_response) if len(token) >= 3]
    # A negative-start slice already yields the whole list when it is shorter
    # than `num`, so no separate length check is needed.
    return postproc[-num:]
def postproc_wn(related_words, syns=False):
    """Turn WordNet-style dotted names into plain, human-readable words.

    Args:
        related_words: Dotted name strings when ``syns`` is true, otherwise
            objects whose ``name()`` method returns such a string.
        syns: Whether ``related_words`` already holds strings.

    Returns:
        For each entry, the part before the first "." (or the second part when
        the name itself starts with "."), with underscores replaced by spaces.
    """
    if syns:
        dotted_names = related_words
    else:
        dotted_names = [entry.name() for entry in related_words]

    heads = []
    for dotted in dotted_names:
        parts = dotted.split('.')
        # A leading "." leaves an empty first part, so take the next one.
        heads.append(parts[0] if dotted[0] != "." else parts[1])
    return [head.replace("_", " ") for head in heads]
| # JS | |
def get_available_cues(target):
    """Collect the WordNet cues available for the noun ``target``.

    Args:
        target: The word to look up.

    Returns:
        ``None`` when ``target`` is not the head name of any noun synset (or
        no noun synset can be looked up for it). Otherwise a dict that holds
        an 'Examples' list, with ``target`` masked as "...", when the first
        noun synset has usage examples; the dict is empty when it has none.
    """
    def _head(dotted):
        # Part before the first "." - or the second part when the name itself
        # starts with ".".
        parts = dotted.split('.')
        return parts[0] if dotted[0] != "." else parts[1]

    # Short-circuiting scan instead of materialising two lists of every noun
    # synset on each call.
    is_known_noun = any(
        _head(synset.name()) == target for synset in wn.all_synsets(pos='n')
    )
    if not is_known_noun:
        return None

    noun_synsets = wn.synsets(target, pos=wn.NOUN)
    if not noun_synsets:
        # Guards the [0] lookup below.
        return None
    synset_target = noun_synsets[0]

    available_cues = {}
    examples = [example.replace(target, "...") for example in synset_target.examples()]
    if examples:
        available_cues['Examples'] = examples
    return available_cues
| # JS: moved the cue generation further down | |
| #def cue_generation(): | |
| # if st.session_state.actions[-1] == 'cue': | |
# Seed every session-state entry the app relies on. Entries that already
# exist are left untouched.
_session_defaults = {
    'messages': [],
    'results': {'results': False, 'results_print': False},
    'actions': [""],
    'counters': {"letter_count": 0, "word_count": 0},
    'is_helpful': {'ask': False},
    'descriptions': [],
}
for _state_key, _initial_value in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
st.title("You name it! 🗣")

# Fixed opening exchange shown above the stored history.
with st.chat_message('user', avatar='nursulu.jpg'):
    st.write("Hey assistant!")

bot = st.chat_message('assistant')
bot.write("Hello human! Wanna practice naming some words?")

# Replay the stored conversation; only user messages get the custom avatar.
for message in st.session_state.messages:
    role = message['role']
    avatar = 'nursulu.jpg' if role == 'user' else None
    with st.chat_message(role, avatar=avatar):
        st.markdown(message['content'])
# Replies that are matched literally, compared against the lower-cased input.
# TODO: replace it with zero-shot classifier
yes = ['yes', 'again', 'sure', 'new word', 'yes!', 'yep', 'yeah']
no = ['no', 'nope', 'nah']

# Display the user message in a chat message container.
prompt = get_text()
if prompt:
    with st.chat_message('user', avatar='nursulu.jpg'):
        st.markdown(prompt)
    # Add to history.
    st.session_state.messages.append({'role': 'user', 'content': prompt})

    try:
        if prompt.lower() in yes:
            write_bot("Please describe your word!")
        elif prompt.lower() in no:
            write_bot("Okay, see you next time then! :innocent:")
        elif prompt == 'it is similar to the best place on earth':
            write_bot("Great! Let me think what it could be...")
            time.sleep(3)
            write_bot("Do you mean Saarland?")
        # If previously we asked to give a prompt. With a single stored
        # message the [-2] lookup raises IndexError, which is answered by the
        # fallback below.
        elif (st.session_state.messages[-2]['content'] == "Please describe your word!") and (st.session_state.messages[-1]['content'] != "no"):
            write_bot("Great! Let me think what it could be...")
            st.session_state.descriptions.append(prompt)
            st.session_state.results['results'] = return_top_k(st.session_state.descriptions[-1])
            st.session_state.results['results_print'] = dict(zip(range(1, 11), st.session_state.results['results']))
            write_bot("I think I have some ideas. Do you want to see my guesses or do you want a cue?")
            st.session_state.actions.append("result")
    except Exception:
        # Best-effort fallback for ordinary errors only: a bare `except:` would
        # also swallow BaseException subclasses such as KeyboardInterrupt.
        write_bot("Sorry, I didn't understand you... I am still learning :sob: For now, could you respond with 'yes' or 'no'? ")
# Once guesses exist for the latest description, offer to show them or to
# switch to cue mode.
if st.session_state.actions[-1] == "result":
    # Five columns are created but only the first two hold a button.
    results_col, cue_col, *_spare_cols = st.columns(5)
    show_results = results_col.button('Results', key=10)
    wants_cue = cue_col.button('Cue', key=11)

    if show_results:
        write_bot("Here are my guesses about your word:")
        st.write(st.session_state.results['results_print'])
        time.sleep(1)
        write_bot('Does it help you remember the word?', remember=False)
        st.session_state.is_helpful['ask'] = True
    elif wants_cue:
        st.session_state.actions.append('cue')
# Follow-up buttons, shown while the 'ask' flag is set (the result and cue
# handlers set it after giving an answer).
if st.session_state.is_helpful['ask']:
    y = st.button('Yes!', key=60)
    n = st.button('No...', key=61)
    new = st.button('I have a new word', key=62)
    if y:
        write_bot("I am happy to help!")
        # NOTE(review): this button is only created in a run where 'Yes!' was
        # just clicked. The original indentation was lost, so placing the flag
        # reset under `if again` is a reconstruction - confirm.
        again = st.button('Play again')
        if again:
            write_bot("Please describe your word!")
            st.session_state.is_helpful['ask'] = False
    elif n:
        # Stop asking and append 'cue', which the `actions[-1] == 'cue'`
        # check reads.
        st.session_state.is_helpful['ask'] = False
        st.session_state.actions.append('cue')
        #cue_generation()
    elif new:
        write_bot("Please describe your word!")
        st.session_state.is_helpful['ask'] = False
# Cue mode: reveal the current guess step by step. This used to be a `while`
# loop that always ended after one pass; it is written as straight-line code
# because the buttons have fixed keys and can only be created once per run.
# 'cue' is not re-appended to `actions`: its last entry is already 'cue' here,
# so appending only grew the list.
if st.session_state.actions[-1] == 'cue':
    write_bot('What do you want to see?', remember=False, blink=False)

    # JS
    counters = st.session_state.counters
    results = st.session_state.results["results"]
    # Clamp the stored index so it stays valid for the current guess list.
    # NOTE(review): assumes at least one guess is stored - confirm.
    word_count = min(counters["word_count"], len(results) - 1)
    target = results[word_count]

    col1, col2, col3, col4, col5 = st.columns(5)
    with col1:
        b1 = st.button("Next letter", key="1")
    with col2:
        b2 = st.button("Related words")
    with col3:
        b3 = st.button("Next word", key="2")
    with col4:
        b4 = st.button("All words", key="3")
    b5 = st.button("I remembered the word!", key="4", type='primary')
    b6 = st.button("Exit", key="5", type='primary')
    new = st.button('Play again', key=64, type='primary')

    if b1:
        # Reveal one more letter, or the whole word once the next prefix
        # would be the word itself.
        counters["letter_count"] += 1
        letter_count = counters["letter_count"]
        if letter_count < len(target):
            write_bot(f'The word starts with {target[:letter_count]}.', remember=False)
        else:
            write_bot(f'This is my predicted word: "{target}". Does this match your query?')
        st.session_state.is_helpful['ask'] = True
    elif b2:
        rels = return_top_k(st.session_state.descriptions[-1], word=target, rels=True)
        write_bot(f'Here are words that are related to your word: {", ".join(rels)}.', remember=False)
        st.session_state.is_helpful['ask'] = True
    elif b3:
        # Move to the next guess, wrapping to the first one after the last so
        # the index can never run past the end of the list.
        counters["letter_count"] = 1
        letter_count = counters["letter_count"]
        word_count = (word_count + 1) % len(results)
        counters["word_count"] = word_count
        # Re-read the target: the length check and the fallback message must
        # refer to the new word, not the one shown before the click.
        target = results[word_count]
        if letter_count < len(target):
            write_bot(f'The next word starts with {target[:letter_count]}.', remember=False)
        else:
            write_bot(f'This is my predicted word: "{target}". Does this match your query?')
        st.session_state.is_helpful['ask'] = True
    elif b4:
        write_bot(f"Here are all my guesses about your word: {st.session_state.results['results_print']}")
    elif b5:
        write_bot("Yay! I am happy I could be of help!")
        counters["word_count"] = 0
        counters["letter_count"] = 0
        # NOTE(review): this button only exists in a run where b5 was just
        # clicked, so a click on it presumably never reaches the branch
        # below - confirm.
        if st.button('Play again', key=63):
            write_bot("Please describe your word!")
    elif b6:
        write_bot("I am sorry I couldn't help you this time. See you soon!")
        counters["word_count"] = 0
        counters["letter_count"] = 0

    if new:
        write_bot("Please describe your word!")
        counters["word_count"] = 0
        counters["letter_count"] = 0