Spaces:
Sleeping
Sleeping
File size: 15,467 Bytes
6db931d 60ac2d1 22a112f 6db931d ee7e4ac 22a112f 6db931d 3fa408a 6db931d 3fa408a 22a112f 6db931d 43e574d 6db931d d564690 6db931d 03a395a 6db931d 329b6b4 6db931d 8389568 43e574d 6db931d 03a395a 6db931d 8389568 6db931d 249638f 03a395a 249638f 8389568 6db931d 35fe4a4 43e574d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 | # helper functions
from typing import List, Dict, Tuple
import re
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import numpy as np
import pickle, json
# from IPython.display import clear_output
import spacy
from spacy.tokens import DocBin
from spacy.training import offsets_to_biluo_tags
import en_fetch_ner_spacy_tsf
nlp = en_fetch_ner_spacy_tsf.load()
# clear_output()
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))
additional_stop_words = {'pack'}
stop_words.update(additional_stop_words)
# clear_output()
# load operation data
path1 = "data/brand_belong_category_dict.json"
path2 = "data/product_upper_category_dict.json"
path3 = "data/offered_brands.pkl"
path4 = "data/offer_retailer.csv"
with open(path1, 'r') as f:
brand_belong_category_dict = json.load(f)
with open(path2, 'rb') as f:
category_dict = json.load(f)
with open(path3, 'rb') as f:
offered_brands = pickle.load(f)
df_offers_brand_retailer = pd.read_csv(path4)
example_search = "Simply Spiked Lemonade 12 pack at Walmart"
# helper functions
def single_text_cleaner(text: str, remove_stopwords: bool=False, upper_case: bool = False, remove_punctuation: bool=True) -> str:
"""Clean one single text input. By default it will convert text to lower case"""
if upper_case:
text = text.upper()
else:
text = text.lower()
if remove_punctuation:
text = re.sub(r'[^a-z\s]', '', text)
if remove_stopwords:
words = text.split()
words = [word for word in words if word not in stop_words]
text = ' '.join(words)
return text
def list_text_cleaner(texts: List[str], upper_case: bool = False, remove_stopwords: bool = False, remove_punctuation: bool=True) -> List[str]:
"""Takes in a list of strings and returns a list of cleaned strings without stop words.
Current tasks:
- remove non-alphabetical characters
- converting to lower cases
- remove stop words (optional)"""
cleaned_texts = [single_text_cleaner(text, remove_stopwords, upper_case, remove_punctuation) for text in texts]
return cleaned_texts
def match_product_category(s1: list[str], s2: list[str]) -> str:
"""Find if items of a list is in one list of product categories"""
return next((p for c in s1 for p in s2 if c in p), None) # this will stop after finding first match, which saves time
def find_category(search_input: str, search_dict: Dict) -> str:
"""Find the category of a search input based on a dictionary of categories
Args:
- search_input: a string
- search_dict: a dictionary of product categories
"""
search_list = list_text_cleaner(re.split(r'[,\s]+', search_input), remove_stopwords=True)
search_list = [c for c in search_list if len(c)>0] # sometimes there are empty strings
matched_category = False
for k, v in search_dict.items():
v = list_text_cleaner(v, remove_punctuation=False)
search_results = match_product_category(search_list, v)
if search_results is not None:
matched_category = True
return k, search_results
else:
# print(f'Function find_category: No category {k} has matched for input: {search_input}')
continue
if not matched_category:
print(f'Function find_category: No category has matched for input: {search_input}')
return None
def check_entity(search_input) -> bool:
"""Takes in a search input and checks if it contains any entities"""
doc = nlp(search_input)
if len(doc.ents) > 0:
return doc
else:
return False
def get_cosine_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
"""Calculate the cosine similarity of the input text against a list of texts
Takes in:
- input_text: a string
- texts: a list of strings
Returns a dataframe with two columns: Sentence Text and Cosine Similarity Score
"""
input_text_cleaned = list_text_cleaner([input_text], remove_stopwords=True)[0]
cleaned_texts = list_text_cleaner(texts, remove_stopwords=True)
all_texts = [input_text_cleaned] + cleaned_texts
vectors = get_vectors(*all_texts)
sim_matrix = cosine_similarity(vectors)
# Get the similarity scores of the input_text against all other texts
sim_scores = sim_matrix[0, 1:]
data = {'OFFER': texts, 'Cosine Similarity Score': sim_scores}
df = pd.DataFrame(data)
df = df.sort_values(by='Cosine Similarity Score', ascending=False).reset_index(drop=True)
return df
def get_vectors(*strs: str) -> np.ndarray:
text = list(strs)
vectorizer = CountVectorizer()
vectorizer.fit(text)
return vectorizer.transform(text).toarray()
def jaccard_similarity(s1: List[str], s2: List[str]) -> float:
"""Takes in two lists and returns the Jaccard similarity score (3 digits)"""
intersection = set(s1).intersection(set(s2))
n = len(intersection)
score = round(n / (len(s1) + len(s2) - n), 3)
return score
def get_jaccard_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
"""Calculate the Jaccard similarity of the input text against a list of texts
Takes in:
- input_text: a string
- texts: a list of strings
Returns a dataframe with two columns: Sentence Text and Jaccard Similarity Score
"""
cleaned_input_text = list_text_cleaner([input_text], remove_stopwords=True)[0].split()
cleaned_texts = list_text_cleaner(texts, remove_stopwords=True)
jaccard_scores = [jaccard_similarity(cleaned_input_text, text.split()) for text in cleaned_texts]
data = {'OFFER': texts, 'Jaccard Similarity Score': jaccard_scores}
df = pd.DataFrame(data)
# sort based on the similarity score
df = df.sort_values(by='Jaccard Similarity Score', ascending=False).reset_index(drop=True)
return df
def find_column(df: pd.DataFrame, keyword: str) -> str:
"""Function to find the first column containing a specific keyword. Note that we assume there will only be one score at most for a similarity score dataframe"""
cols = [col for col in df.columns if keyword.lower() in col.lower()]
return cols[0] if cols else None
def extract_similar_offers(data: pd.DataFrame, threshold: float = 0.0) -> pd.DataFrame:
"""Takes in the results from get_cosine_sim() and get_jaccard_sim(); returns a dataframe of similar offers with scores > threshold"""
score = find_column(data, 'score')
similar_offers = data[data[score] >= threshold]
similar_offers[score] = similar_offers[score].apply(lambda x: round(x, 3)) # round to 3 digits
return similar_offers
def category_to_brand(category: str, offered_brands: List, brand_belong_category_dict: Dict) -> List[str]:
"""Use case: when a user searches for a category, we return a list of brands in that category"""
# checks if the category is in the dictionary keys
if category.upper() in brand_belong_category_dict.keys():
search_brands = brand_belong_category_dict[category.upper()] # becase all keys are in upper case
result = list(set(search_brands) & set(offered_brands))
print(f"Function category_to_brand | Found {category} in offered brand")
return result
else:
print(f"Function category_to_brand | No offered brand is found in {category}")
return None
class CatchErros(Exception):
class ParamsInputError(Exception):
pass
class SearchFailedError(Exception):
pass
class UnknownError(Exception):
pass
def offer_finder_by_category(search_input: str, search_category_tuple: Tuple, category_dict: Dict, offers: pd.DataFrame, offered_brands: List,
brand_belong_category_dict: Dict, score: str, threshold: float = 0.0) -> pd.DataFrame:
"""Find offers based on a category identified from search input.
Args:
- search_input: a string
- search_category_tuple: a tuple of (upper_category, product_category)
- category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
- offers: a dataframe of offers (OFFER, BRAND, RETAILER) that are avaialble in our database
- offered_brands: a list of offers from offer.df
- brand_belong_category_dict: a dictionary of brands and the categories they belong to
- score: a string of either 'cosine' or 'jaccard'
- threshold: a float between 0 and 1
Returns a dataframe of similar offers, ordered by highest score
"""
# we assume people just search one category at a time
# search_category_tuple = find_category(search_input, category_dict)
product_category, upper_category = search_category_tuple[1], search_category_tuple[0] # ('Alcohol', 'beer')
print(f'Function offer_finder_by_category | Found items:\n- Search input: {search_input}\n- Product category: {product_category}\n- Upper category: {upper_category}')
potential_brands = category_to_brand(product_category, offered_brands, brand_belong_category_dict)
if potential_brands is not None:
potential_offers = offers[offers['BRAND'].isin(potential_brands)]['OFFER'].tolist()
if score == 'cosine':
cos_sim_score = get_cosine_sim(search_input, potential_offers)
output = extract_similar_offers(cos_sim_score, threshold)
elif score == 'jaccard':
jaccard_sim_score = get_jaccard_sim(search_input, potential_offers)
output = extract_similar_offers(jaccard_sim_score, threshold)
elif score not in ['cosine', 'jaccard']:
raise ParamsInputError(f'Please enter a valid score: cosine or jaccard; Not {score}')
else: # this means something else is worng
raise UnknownError(f'Something must be broken. Please try again.')
return output
else:
potential_product_categories = category_dict[upper_category]
msg = f'{product_category} is not found. Do you wanna take a look at these similar offers in {upper_category}?\n We have: {potential_product_categories}' # we can still calculate similarity but this is computationally expensive
print(msg)
return None
def offer_finder_by_entity(search_input: str, entities: Tuple, offers_data: pd.DataFrame, score: str, threshold: float=0.0) -> pd.DataFrame:
"""Find offers based on entities identified from search input.
Args:
- search_input: a string
- entities: a tuple of entities
- offers_data: a dataframe of offers (OFFER, BRAND, RETAILER) that are avaialble in our database
- score: a string of either 'cosine' or 'jaccard'
- threshold: a float between 0 and 1
Returns a dataframe of similar offers, ordered by highest score
"""
collects = [] # collect all the results if there are more than one entity
for ent in entities:
ent_name, ent_label = ent.text, ent.label_
print(f'Function offer_finder_by_entity | Found entity: {ent_name} with label: {ent_label}')
# filter offers by entity
df_tmp = offers_data[offers_data[ent_label.upper()] == ent_name.upper()]
if df_tmp.shape[0] > 0:
print(f'Function offer_finder_by_entity | Found {df_tmp.shape[0]} offer(s) for the brand/retailer: {ent_name}')
potential_offers = df_tmp['OFFER'].drop_duplicates().tolist()
if score == 'cosine':
cos_sim_score = get_cosine_sim(search_input, potential_offers)
output = extract_similar_offers(cos_sim_score, threshold)
elif score == 'jaccard':
jaccard_sim_score = get_jaccard_sim(search_input, potential_offers)
output = extract_similar_offers(jaccard_sim_score, threshold)
elif score not in ['cosine', 'jaccard']:
raise ValueError(f'Please enter a valid score: cosine or jaccard; Not {score}')
else: # this means something else is worng
raise UnknownError(f'Something must be broken. Please try again.')
collects.append(output)
else:
print(f'Function offer_finder_by_entity | No offer is found for the brand/retailer: {ent_name}')
if len(collects) > 0:
final_output = pd.concat(collects, ignore_index=True)# they should be using the same similarity score
score = find_column(collects[0], 'score')
final_output = final_output.sort_values(by=score, ascending=False).reset_index(drop=True) # sort final_output by score
return final_output
elif len(collects) == 1:
return collects[0]
else:
print('###'*5 + 'FINAL SEARCH RESULTS' + '###'*5)
print('Function offer_finder_by_entity | No offer is found for any of the entities.')
return None
def search_offers(search_input: str=example_search, offers: pd.DataFrame=df_offers_brand_retailer, offer_brands: List=offered_brands,
category_dict: Dict=category_dict, brand_belong_category_dict: Dict=brand_belong_category_dict,
score: str="jaccard", score_threshold: float = 0.0):
"""Main function. Takes in a serach_input and decide whether it can find entities or not. Then excecute the appropriate functions
Inputs:
- search_input: a string that a user enters
- offers: a dataframe of offers (OFFER, BRAND, RETAILER) that are avaialble in our database
- category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
- brand_belong_category_dict: a dictionary of brands and the categories they belong to
- score: a string of either 'cosine' or 'jaccard'
- score_threshold: a float between 0 and 1
Returns a dataframe of similar offers, ordered by highest score
"""
print(f'Function main | Search input: {search_input}')
check_ent = check_entity(search_input)
if not check_entity(search_input): # no entities found
# check category
cat_check = find_category(search_input, category_dict)
if cat_check is None:
print('No brand/retailer/category is found. Please try again.')
return None
else:
# we assume people just search one category at a time
cat_tuple = cat_check # ('Alcohol', 'beer')
search_results = offer_finder_by_category(search_input, cat_tuple, category_dict, offers, offered_brands, brand_belong_category_dict, score, score_threshold)
return search_results
else:
entities = check_ent.ents # entities will be a tuple anyways
print(f'Found {len(entities)} entity object(s) in the search input.')
search_results = offer_finder_by_entity(search_input, entities, offers, score, score_threshold)
if search_results is None:
print('No offers matched retailer/category is found. Now trying to recommend based on category.')
cat_check = find_category(search_input, category_dict)
if cat_check is None:
print('No brand/retailer/category is found. Please try again.')
return None
else:
cat_tuple = cat_check
search_results = offer_finder_by_category(search_input, cat_tuple, category_dict, offers, offered_brands, brand_belong_category_dict, score, score_threshold)
return search_results
if __name__ == "__main__":
search_offers() |