fetch_ner / src /utils.py
hjianganthony's picture
Update src/utils.py
03a395a
# helper functions
from typing import List, Dict, Tuple
import re
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import numpy as np
import pickle, json
# from IPython.display import clear_output
import spacy
from spacy.tokens import DocBin
from spacy.training import offsets_to_biluo_tags
import en_fetch_ner_spacy_tsf
nlp = en_fetch_ner_spacy_tsf.load()
# clear_output()
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))
additional_stop_words = {'pack'}
stop_words.update(additional_stop_words)
# clear_output()
# load operation data
path1 = "data/brand_belong_category_dict.json"
path2 = "data/product_upper_category_dict.json"
path3 = "data/offered_brands.pkl"
path4 = "data/offer_retailer.csv"
with open(path1, 'r') as f:
brand_belong_category_dict = json.load(f)
with open(path2, 'rb') as f:
category_dict = json.load(f)
with open(path3, 'rb') as f:
offered_brands = pickle.load(f)
df_offers_brand_retailer = pd.read_csv(path4)
example_search = "Simply Spiked Lemonade 12 pack at Walmart"
# helper functions
def single_text_cleaner(text: str, remove_stopwords: bool=False, upper_case: bool = False, remove_punctuation: bool=True) -> str:
"""Clean one single text input. By default it will convert text to lower case"""
if upper_case:
text = text.upper()
else:
text = text.lower()
if remove_punctuation:
text = re.sub(r'[^a-z\s]', '', text)
if remove_stopwords:
words = text.split()
words = [word for word in words if word not in stop_words]
text = ' '.join(words)
return text
def list_text_cleaner(texts: List[str], upper_case: bool = False, remove_stopwords: bool = False, remove_punctuation: bool=True) -> List[str]:
"""Takes in a list of strings and returns a list of cleaned strings without stop words.
Current tasks:
- remove non-alphabetical characters
- converting to lower cases
- remove stop words (optional)"""
cleaned_texts = [single_text_cleaner(text, remove_stopwords, upper_case, remove_punctuation) for text in texts]
return cleaned_texts
def match_product_category(s1: list[str], s2: list[str]) -> str:
"""Find if items of a list is in one list of product categories"""
return next((p for c in s1 for p in s2 if c in p), None) # this will stop after finding first match, which saves time
def find_category(search_input: str, search_dict: Dict) -> str:
"""Find the category of a search input based on a dictionary of categories
Args:
- search_input: a string
- search_dict: a dictionary of product categories
"""
search_list = list_text_cleaner(re.split(r'[,\s]+', search_input), remove_stopwords=True)
search_list = [c for c in search_list if len(c)>0] # sometimes there are empty strings
matched_category = False
for k, v in search_dict.items():
v = list_text_cleaner(v, remove_punctuation=False)
search_results = match_product_category(search_list, v)
if search_results is not None:
matched_category = True
return k, search_results
else:
# print(f'Function find_category: No category {k} has matched for input: {search_input}')
continue
if not matched_category:
print(f'Function find_category: No category has matched for input: {search_input}')
return None
def check_entity(search_input) -> bool:
"""Takes in a search input and checks if it contains any entities"""
doc = nlp(search_input)
if len(doc.ents) > 0:
return doc
else:
return False
def get_cosine_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
"""Calculate the cosine similarity of the input text against a list of texts
Takes in:
- input_text: a string
- texts: a list of strings
Returns a dataframe with two columns: Sentence Text and Cosine Similarity Score
"""
input_text_cleaned = list_text_cleaner([input_text], remove_stopwords=True)[0]
cleaned_texts = list_text_cleaner(texts, remove_stopwords=True)
all_texts = [input_text_cleaned] + cleaned_texts
vectors = get_vectors(*all_texts)
sim_matrix = cosine_similarity(vectors)
# Get the similarity scores of the input_text against all other texts
sim_scores = sim_matrix[0, 1:]
data = {'OFFER': texts, 'Cosine Similarity Score': sim_scores}
df = pd.DataFrame(data)
df = df.sort_values(by='Cosine Similarity Score', ascending=False).reset_index(drop=True)
return df
def get_vectors(*strs: str) -> np.ndarray:
text = list(strs)
vectorizer = CountVectorizer()
vectorizer.fit(text)
return vectorizer.transform(text).toarray()
def jaccard_similarity(s1: List[str], s2: List[str]) -> float:
"""Takes in two lists and returns the Jaccard similarity score (3 digits)"""
intersection = set(s1).intersection(set(s2))
n = len(intersection)
score = round(n / (len(s1) + len(s2) - n), 3)
return score
def get_jaccard_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
"""Calculate the Jaccard similarity of the input text against a list of texts
Takes in:
- input_text: a string
- texts: a list of strings
Returns a dataframe with two columns: Sentence Text and Jaccard Similarity Score
"""
cleaned_input_text = list_text_cleaner([input_text], remove_stopwords=True)[0].split()
cleaned_texts = list_text_cleaner(texts, remove_stopwords=True)
jaccard_scores = [jaccard_similarity(cleaned_input_text, text.split()) for text in cleaned_texts]
data = {'OFFER': texts, 'Jaccard Similarity Score': jaccard_scores}
df = pd.DataFrame(data)
# sort based on the similarity score
df = df.sort_values(by='Jaccard Similarity Score', ascending=False).reset_index(drop=True)
return df
def find_column(df: pd.DataFrame, keyword: str) -> str:
"""Function to find the first column containing a specific keyword. Note that we assume there will only be one score at most for a similarity score dataframe"""
cols = [col for col in df.columns if keyword.lower() in col.lower()]
return cols[0] if cols else None
def extract_similar_offers(data: pd.DataFrame, threshold: float = 0.0) -> pd.DataFrame:
"""Takes in the results from get_cosine_sim() and get_jaccard_sim(); returns a dataframe of similar offers with scores > threshold"""
score = find_column(data, 'score')
similar_offers = data[data[score] >= threshold]
similar_offers[score] = similar_offers[score].apply(lambda x: round(x, 3)) # round to 3 digits
return similar_offers
def category_to_brand(category: str, offered_brands: List, brand_belong_category_dict: Dict) -> List[str]:
"""Use case: when a user searches for a category, we return a list of brands in that category"""
# checks if the category is in the dictionary keys
if category.upper() in brand_belong_category_dict.keys():
search_brands = brand_belong_category_dict[category.upper()] # becase all keys are in upper case
result = list(set(search_brands) & set(offered_brands))
print(f"Function category_to_brand | Found {category} in offered brand")
return result
else:
print(f"Function category_to_brand | No offered brand is found in {category}")
return None
class CatchErros(Exception):
class ParamsInputError(Exception):
pass
class SearchFailedError(Exception):
pass
class UnknownError(Exception):
pass
def offer_finder_by_category(search_input: str, search_category_tuple: Tuple, category_dict: Dict, offers: pd.DataFrame, offered_brands: List,
brand_belong_category_dict: Dict, score: str, threshold: float = 0.0) -> pd.DataFrame:
"""Find offers based on a category identified from search input.
Args:
- search_input: a string
- search_category_tuple: a tuple of (upper_category, product_category)
- category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
- offers: a dataframe of offers (OFFER, BRAND, RETAILER) that are avaialble in our database
- offered_brands: a list of offers from offer.df
- brand_belong_category_dict: a dictionary of brands and the categories they belong to
- score: a string of either 'cosine' or 'jaccard'
- threshold: a float between 0 and 1
Returns a dataframe of similar offers, ordered by highest score
"""
# we assume people just search one category at a time
# search_category_tuple = find_category(search_input, category_dict)
product_category, upper_category = search_category_tuple[1], search_category_tuple[0] # ('Alcohol', 'beer')
print(f'Function offer_finder_by_category | Found items:\n- Search input: {search_input}\n- Product category: {product_category}\n- Upper category: {upper_category}')
potential_brands = category_to_brand(product_category, offered_brands, brand_belong_category_dict)
if potential_brands is not None:
potential_offers = offers[offers['BRAND'].isin(potential_brands)]['OFFER'].tolist()
if score == 'cosine':
cos_sim_score = get_cosine_sim(search_input, potential_offers)
output = extract_similar_offers(cos_sim_score, threshold)
elif score == 'jaccard':
jaccard_sim_score = get_jaccard_sim(search_input, potential_offers)
output = extract_similar_offers(jaccard_sim_score, threshold)
elif score not in ['cosine', 'jaccard']:
raise ParamsInputError(f'Please enter a valid score: cosine or jaccard; Not {score}')
else: # this means something else is worng
raise UnknownError(f'Something must be broken. Please try again.')
return output
else:
potential_product_categories = category_dict[upper_category]
msg = f'{product_category} is not found. Do you wanna take a look at these similar offers in {upper_category}?\n We have: {potential_product_categories}' # we can still calculate similarity but this is computationally expensive
print(msg)
return None
def offer_finder_by_entity(search_input: str, entities: Tuple, offers_data: pd.DataFrame, score: str, threshold: float=0.0) -> pd.DataFrame:
"""Find offers based on entities identified from search input.
Args:
- search_input: a string
- entities: a tuple of entities
- offers_data: a dataframe of offers (OFFER, BRAND, RETAILER) that are avaialble in our database
- score: a string of either 'cosine' or 'jaccard'
- threshold: a float between 0 and 1
Returns a dataframe of similar offers, ordered by highest score
"""
collects = [] # collect all the results if there are more than one entity
for ent in entities:
ent_name, ent_label = ent.text, ent.label_
print(f'Function offer_finder_by_entity | Found entity: {ent_name} with label: {ent_label}')
# filter offers by entity
df_tmp = offers_data[offers_data[ent_label.upper()] == ent_name.upper()]
if df_tmp.shape[0] > 0:
print(f'Function offer_finder_by_entity | Found {df_tmp.shape[0]} offer(s) for the brand/retailer: {ent_name}')
potential_offers = df_tmp['OFFER'].drop_duplicates().tolist()
if score == 'cosine':
cos_sim_score = get_cosine_sim(search_input, potential_offers)
output = extract_similar_offers(cos_sim_score, threshold)
elif score == 'jaccard':
jaccard_sim_score = get_jaccard_sim(search_input, potential_offers)
output = extract_similar_offers(jaccard_sim_score, threshold)
elif score not in ['cosine', 'jaccard']:
raise ValueError(f'Please enter a valid score: cosine or jaccard; Not {score}')
else: # this means something else is worng
raise UnknownError(f'Something must be broken. Please try again.')
collects.append(output)
else:
print(f'Function offer_finder_by_entity | No offer is found for the brand/retailer: {ent_name}')
if len(collects) > 0:
final_output = pd.concat(collects, ignore_index=True)# they should be using the same similarity score
score = find_column(collects[0], 'score')
final_output = final_output.sort_values(by=score, ascending=False).reset_index(drop=True) # sort final_output by score
return final_output
elif len(collects) == 1:
return collects[0]
else:
print('###'*5 + 'FINAL SEARCH RESULTS' + '###'*5)
print('Function offer_finder_by_entity | No offer is found for any of the entities.')
return None
def search_offers(search_input: str=example_search, offers: pd.DataFrame=df_offers_brand_retailer, offer_brands: List=offered_brands,
category_dict: Dict=category_dict, brand_belong_category_dict: Dict=brand_belong_category_dict,
score: str="jaccard", score_threshold: float = 0.0):
"""Main function. Takes in a serach_input and decide whether it can find entities or not. Then excecute the appropriate functions
Inputs:
- search_input: a string that a user enters
- offers: a dataframe of offers (OFFER, BRAND, RETAILER) that are avaialble in our database
- category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
- brand_belong_category_dict: a dictionary of brands and the categories they belong to
- score: a string of either 'cosine' or 'jaccard'
- score_threshold: a float between 0 and 1
Returns a dataframe of similar offers, ordered by highest score
"""
print(f'Function main | Search input: {search_input}')
check_ent = check_entity(search_input)
if not check_entity(search_input): # no entities found
# check category
cat_check = find_category(search_input, category_dict)
if cat_check is None:
print('No brand/retailer/category is found. Please try again.')
return None
else:
# we assume people just search one category at a time
cat_tuple = cat_check # ('Alcohol', 'beer')
search_results = offer_finder_by_category(search_input, cat_tuple, category_dict, offers, offered_brands, brand_belong_category_dict, score, score_threshold)
return search_results
else:
entities = check_ent.ents # entities will be a tuple anyways
print(f'Found {len(entities)} entity object(s) in the search input.')
search_results = offer_finder_by_entity(search_input, entities, offers, score, score_threshold)
if search_results is None:
print('No offers matched retailer/category is found. Now trying to recommend based on category.')
cat_check = find_category(search_input, category_dict)
if cat_check is None:
print('No brand/retailer/category is found. Please try again.')
return None
else:
cat_tuple = cat_check
search_results = offer_finder_by_category(search_input, cat_tuple, category_dict, offers, offered_brands, brand_belong_category_dict, score, score_threshold)
return search_results
if __name__ == "__main__":
search_offers()