Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import pandas as pd | |
| from urllib.parse import unquote | |
| import nltk | |
| nltk.download('popular') | |
| import re | |
| import math | |
| import configparser | |
| from bs4 import BeautifulSoup | |
| import unicodedata | |
| import string | |
| from gensim.models import Word2Vec | |
| from gensim.models.phrases import Phraser, Phrases | |
| from nltk.corpus import stopwords | |
| from nltk.stem import WordNetLemmatizer | |
| from nltk.corpus import wordnet as wn | |
| from nltk import pos_tag | |
| from collections import defaultdict | |
| from nltk.corpus import wordnet as wn | |
# Lookup from the first letter of a POS tag (as produced by nltk's pos_tag)
# to the matching WordNet part-of-speech constant. Any letter that is not
# listed here falls back to NOUN through the defaultdict factory.
tag_map = defaultdict(lambda: wn.NOUN)
tag_map.update({
    'J': wn.ADJ,
    'V': wn.VERB,
    'R': wn.ADV,
})
| import spacy | |
# Name of the spaCy pipeline used for POS tagging. The model that is
# downloaded and the model that is loaded must be the same package; this
# constant keeps the two calls in sync (the load previously asked for
# "en_core_web_trf" while a different package was being downloaded).
SPACY_MODEL = "en_core_web_trf"
try:
    nlp = spacy.load(SPACY_MODEL)
except OSError:
    # spacy.load raises OSError when the model package is not installed;
    # fetch it once and retry instead of downloading on every import.
    spacy.cli.download(SPACY_MODEL)
    nlp = spacy.load(SPACY_MODEL)

# WordNet lemmatizer shared by nlk_lemma.
lemmatizer = WordNetLemmatizer()
# English stopwords removed from the token stream in preprocess.
evType_stop = set(nltk.corpus.stopwords.words('english'))
# Locations of the trained artifacts. They are hard-coded for now; the
# commented-out lines show the configparser lookups they stand in for.
# config = configparser.ConfigParser()
# config.read('myproject.ini')

### files to be loaded
tag_similarModel_path = "word2vec.model"  # config['path']['tag_similar_model']  (word2vec model)
tag_trigram_path = "tri_phrases.txt"  # config['path']['tag_trigram']  (trigram phraser)
tag_bigram_path = "bi_phrases.txt"  # config['path']['tag_bigram']  (bigram phraser)

# Load through the path variables above so that changing a location (or
# re-enabling the config file) only has to be done in one place.
tag_similarModel = Word2Vec.load(tag_similarModel_path)
tag_trigram_phraser = Phraser.load(tag_trigram_path)
tag_bigram_phraser = Phraser.load(tag_bigram_path)
def _read_comma_separated(path):
    """Return the entries of the comma-separated text file at *path* as a list."""
    # Opened read-only inside a context manager: the files are never written
    # to, and the handle is closed as soon as the contents have been read.
    with open(path, "r") as handle:
        return handle.read().split(",")


# load stopword file
stopword_tag = _read_comma_separated("stopwords_tag.txt")
# load vocabulary of single words
vocab1 = _read_comma_separated("vocabSingle.txt")
# load vocabulary of words of length more than 2
vocab2 = _read_comma_separated("vocabMulti.txt")
# load vocabulary of words of length 2
vocab4 = _read_comma_separated("vocabDouble.txt")
def pre_process(text):
    """Clean a raw text string: strip markup, URLs, e-mail ids and punctuation.

    Steps, in order:
      1. Drop HTML markup with BeautifulSoup and keep only the text.
      2. NFKD-normalise and discard every non-ASCII character.
      3. Expand a standalone upper-case "IT" to "information technology"
         (done before lower-casing so the pronoun "it" is not affected).
      4. Lower-case the text and blank out URLs.
      5. Blank out any whitespace-delimited chunk that contains "@".
      6. Blank out every character that is not an ASCII letter, digit or
         hyphen, plus ordinal numbers (1st, 2nd, ...), "null" and "nan".
      7. Collapse runs of whitespace to a single space.
      8. Remove a leading word that starts with "all" and has at least one
         more word character.

    Args:
        text: The raw input string.

    Returns:
        The cleaned string. If any step raises, the value of ``text`` at
        that point is returned instead (best effort, never raises).
    """
    try:
        soup = BeautifulSoup(text, "html.parser")
        text = soup.get_text()
        text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8', 'ignore')
        convert_IT = re.sub(r'^IT$| IT ', " information technology ", text)
        remove_urls = re.sub(r'''(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))''', " ", convert_IT.lower())
        remove_ids = re.sub(r'\S*@\S*\S?', " ", remove_urls)
        # The letter range is A-Z, not A-z: the latter also spans the ASCII
        # characters [ \ ] ^ _ and backtick, which would then escape the
        # punctuation strip.
        remove_digit_punct = re.sub(r'[^a-zA-Z0-9-]|\d+nd|\d+st|\d+th|\d+rd|null|^nan| nan ', ' ', remove_ids)
        remove_spaces = re.sub(r'\s{2,}', ' ', remove_digit_punct)
        remove_sufix = re.sub(r'^all\w+', '', remove_spaces)
        return remove_sufix
    except Exception:
        # Deliberate best effort: hand back whatever we have rather than fail.
        return text
def nlk_lemma(tokns):
    """Lemmatize a list of tokens using their POS tags.

    Each token is tagged with ``pos_tag``; the first letter of the tag picks
    the WordNet part of speech from ``tag_map``. The token 'media' is passed
    through unchanged.

    Args:
        tokns: List of token strings.

    Returns:
        List of lemmas, in the same order as the input tokens.
    """
    return [
        word if word == 'media' else lemmatizer.lemmatize(word, tag_map[pos[0]])
        for word, pos in pos_tag(tokns)
    ]
def preprocess(text):
    """Tokenize a cleaned string and lemmatize the surviving tokens.

    "information technology" is first collapsed back to the upper-case token
    "IT". The string is then split on single spaces, tokens found in
    ``evType_stop`` or shorter than two characters are dropped, and the rest
    is lemmatized with ``nlk_lemma``.

    Args:
        text: The cleaned input string.

    Returns:
        List of lemmatized tokens.
    """
    collapsed = re.sub(r'^information technology$| information technology ', ' IT ', text)
    kept = [
        word
        for word in collapsed.split(" ")
        if word not in evType_stop and len(word) > 1
    ]
    return nlk_lemma(kept)
def phrasers(token_sent):
    """Apply the bigram and then the trigram phraser to a token list.

    Args:
        token_sent: List of tokens for one piece of text.

    Returns:
        The phrased tokens joined by single spaces, with
        "information technology" collapsed to "IT".
    """
    with_bigrams = tag_bigram_phraser[token_sent]
    with_trigrams = tag_trigram_phraser[with_bigrams]
    joined = " ".join(with_trigrams)
    return re.sub(r'^information technology$| information technology ', ' IT ', joined)
def posTags(text):
    """Return the tokens of *text* whose POS tag is in the accepted list.

    Hyphens are replaced by the word 'hyphen' before the text is handed to
    the spaCy pipeline and restored in the tokens that are kept.

    Args:
        text: The input string.

    Returns:
        List of token strings whose ``tag_`` is one of the accepted tags.
    """
    accepted = ('NN', 'PRP', 'NNP', 'NNS', 'NNPS', 'JJ', 'JJR', 'VB', 'VBD',
                'VBZ', 'FW', 'XX', 'VBG', 'VBP', 'VBN')
    parsed = nlp(re.sub('-', 'hyphen', text))
    return [
        re.sub('hyphen', '-', str(tok))
        for tok in parsed
        if tok.tag_ in accepted
    ]
def posTags1(text):
    """Split *text* into tokens with the spaCy pipeline, keeping every token.

    Hyphens are replaced by the word 'hyphen' first; a 'hyphen' that is
    directly followed or preceded by a space is replaced by a single space,
    and the remaining ones are turned back into '-' in the returned tokens.
    No POS filtering is applied.

    Args:
        text: The phrased input string.

    Returns:
        List of token strings.
    """
    masked = re.sub('-', 'hyphen', text)
    masked = re.sub("hyphen | hyphen", ' ', masked)
    return [re.sub('hyphen', '-', str(tok)) for tok in nlp(masked)]
def get_keys(keys1):
    """Filter a list of candidate keys.

    A key is dropped when it starts with 'accessory', when it is at most one
    character long, when the key itself or its first or last '_'-separated
    part is in ``stopword_tag``, or when its first or last part consists of
    digits only.

    Args:
        keys1: List of candidate key strings.

    Returns:
        List of the keys that survive, in their original order.
    """
    imp_keys = []
    for candidate in keys1:
        if re.search("^accessory", candidate) or len(candidate) <= 1:
            continue
        parts = candidate.split("_")
        head, tail = parts[0], parts[-1]
        rejected = (
            head in stopword_tag
            or tail in stopword_tag
            or candidate in stopword_tag
            or head.isdigit()
            or tail.isdigit()
        )
        if not rejected:
            imp_keys.append(candidate)
    return imp_keys
def tags(kys):
    """Select the eligible topics from a list of candidate words.

    The words are compared pairwise with the word2vec model; a pair counts as
    related when its similarity lies strictly between 0.15 and 0.99. Each
    word's score is the number of words it is related to, and words scoring
    at least the threshold (floor of the mean of the distinct non-zero
    scores) are kept. When some word occurs more than once, the most
    frequent words are added to the result as well.

    Args:
        kys: List of candidate words (may contain duplicates).

    Returns:
        List of selected words; it may contain duplicates. An empty input
        yields an empty list. When no pair of words is related at all, the
        result is the de-duplicated input if it has fewer than three words,
        otherwise the most frequent words (possibly an empty list).
    """
    if not kys:
        # Nothing to rank; max() below would raise on an empty mapping.
        return []

    kys = ['gis' if x == 'gi' else x for x in kys]

    # Frequency of every word, counted in a single pass.
    max_count = {}
    for word in kys:
        max_count[word] = max_count.get(word, 0) + 1
    if 'business' in max_count:
        # 'business' is the most frequent word overall, so it is never
        # allowed to win on frequency.
        max_count['business'] = 1

    # Most frequent words, only collected when some word actually repeats.
    max_words = []
    if len(max_count) != sum(max_count.values()):
        top_count = max(max_count.values())
        max_words = [word for word, count in max_count.items() if count == top_count]

    kys = list(set(kys))

    # temp[i][j] is 1 when kys[i] and kys[j] are related, else 0.
    temp = []
    for elem in kys:
        row = []
        for ek in kys:
            try:
                related = 0.99 > tag_similarModel.wv.similarity(elem, ek) > 0.15
            except Exception:
                # Best effort: a pair the model cannot score (e.g. a word
                # missing from its vocabulary) counts as unrelated.
                related = False
            row.append(1 if related else 0)
        temp.append(row)

    su = [sum(row) for row in temp]
    distinct_scores = set(su)
    # The mean is taken over the distinct non-zero scores only.
    divisor = len(distinct_scores) - 1 if 0 in distinct_scores else len(distinct_scores)
    if divisor == 0:
        # No related pair at all, so there is no threshold to compute.
        if len(kys) < 3:
            return kys
        return max_words

    thrhld = math.floor(sum(distinct_scores) / divisor)
    final_keys = [kys[i] for i, score in enumerate(su) if score >= thrhld]
    if not final_keys:
        return kys
    final_keys.extend(max_words)
    return final_keys
def checkSingleWord(single_list):
    """Return the words of *single_list* that are in the single-word vocabulary.

    Args:
        single_list: List of words to check against ``vocab1``.

    Returns:
        List of the matching words, in their original order.
    """
    return [candidate for candidate in single_list if candidate in vocab1]
def checkBigram(word_list):
    """Keep the words found in the two-word vocabulary, else their known parts.

    A word that is in ``vocab4`` is kept as is. Otherwise it is split on '_'
    and every part accepted by ``checkSingleWord`` is kept instead.

    Args:
        word_list: List of words, possibly joined with '_'.

    Returns:
        List of accepted words and single-word parts.
    """
    bi_list = []
    for word in word_list:
        known_parts = checkSingleWord(word.split("_"))
        if word in vocab4:
            bi_list.append(word)
        else:
            bi_list.extend(known_parts)
    return bi_list
def getFinalTags(tags):
    """Map candidate tags onto the known vocabularies.

    For each tag, split on '_':
      * more than two parts: keep the tag if it is in ``vocab2``; otherwise
        run its parts through the bigram phraser and ``checkBigram``;
      * exactly two parts: run the tag through ``checkBigram``;
      * otherwise: run the tag through ``checkSingleWord``.

    Args:
        tags: List of candidate tag strings.

    Returns:
        Flat list of all accepted words, in input order.
    """
    flat_list = []
    for element in tags:
        parts = element.split("_")
        part_count = len(parts)
        if part_count > 2:
            if element in vocab2:
                flat_list.append(element)
            else:
                flat_list.extend(checkBigram(tag_bigram_phraser[parts]))
        elif part_count == 2:
            flat_list.extend(checkBigram([element]))
        else:
            flat_list.extend(checkSingleWord([element]))
    return flat_list
def getTags(text):
    """Run the whole pipeline and return the topics extracted from *text*.

    The text is POS-filtered, cleaned, tokenized and lemmatized, phrased,
    re-tokenized, filtered against the stopword list, mapped onto the
    vocabularies and finally ranked by ``tags``. The phrased text and its
    tokens are printed along the way.

    Args:
        text: The input string.

    Returns:
        List of unique topics.
    """
    selected_words = " ".join(posTags(text))
    cleaned = pre_process(selected_words)
    phrased = phrasers(preprocess(cleaned))
    print(phrased)
    phrase_tokens = posTags1(phrased)  # convert it into list using split(" ")
    print(phrase_tokens)
    candidates = getFinalTags(get_keys(phrase_tokens))
    return list(set(tags(candidates)))
def classify(user_query):
    """Extract tags for a (possibly double URL-encoded) query string.

    Args:
        user_query: The query; it is URL-decoded twice before processing.

    Returns:
        A dict with the single key 'tags'. Its value is the list returned by
        ``getTags``, or an empty list when ``getTags`` raises ValueError,
        TypeError or AttributeError.
    """
    decoded_query = unquote(unquote(user_query))
    eventdict = {'tags': []}
    try:
        # Get Products
        found = getTags(decoded_query)
    except (ValueError, TypeError, AttributeError):
        return eventdict
    eventdict['tags'] = found
    return eventdict