Spaces:
Sleeping
Sleeping
| # helper functions | |
| from typing import List, Dict, Tuple | |
| import re | |
| from sklearn.feature_extraction.text import CountVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import pandas as pd | |
| import numpy as np | |
| import pickle, json | |
| # from IPython.display import clear_output | |
| import spacy | |
| from spacy.tokens import DocBin | |
| from spacy.training import offsets_to_biluo_tags | |
| import en_fetch_ner_spacy_tsf | |
| nlp = en_fetch_ner_spacy_tsf.load() | |
| # clear_output() | |
| import nltk | |
| nltk.download('stopwords') | |
| from nltk.corpus import stopwords | |
| stop_words = set(stopwords.words('english')) | |
| additional_stop_words = {'pack'} | |
| stop_words.update(additional_stop_words) | |
| # clear_output() | |
# Load operation data. All paths are relative to the current working directory.
path1 = "data/brand_belong_category_dict.json"
path2 = "data/product_upper_category_dict.json"
path3 = "data/offered_brands.pkl"
path4 = "data/offer_retailer.csv"

# Both JSON files are opened in binary mode so that json.load detects the
# encoding (UTF-8/16/32) itself instead of relying on the platform's default
# text encoding, and so that the two files are read the same way.
with open(path1, 'rb') as f:
    brand_belong_category_dict = json.load(f)
with open(path2, 'rb') as f:
    category_dict = json.load(f)

# SECURITY NOTE: pickle.load can execute arbitrary code contained in the file.
# Only load this pickle if it comes from a trusted source.
with open(path3, 'rb') as f:
    offered_brands = pickle.load(f)

df_offers_brand_retailer = pd.read_csv(path4)

# Default query used by search_offers() when no search input is supplied.
example_search = "Simply Spiked Lemonade 12 pack at Walmart"
| # helper functions | |
def single_text_cleaner(text: str, remove_stopwords: bool=False, upper_case: bool = False, remove_punctuation: bool=True) -> str:
    """Clean one single text input.

    Args:
    - text: the string to clean
    - remove_stopwords: drop every word found in the module-level ``stop_words`` set
    - upper_case: convert to upper case; by default the text is lower-cased
    - remove_punctuation: strip every character that is not an ASCII letter or
      whitespace (digits are removed as well)

    Returns the cleaned string.
    """
    text = text.upper() if upper_case else text.lower()
    if remove_punctuation:
        # Keep letters of either case: a lower-case-only character class would
        # erase every letter of text that has just been upper-cased.
        text = re.sub(r'[^a-zA-Z\s]', '', text)
    if remove_stopwords:
        # stop_words holds lower-case entries, so compare case-insensitively to
        # make the filter work for upper-cased text too.
        text = ' '.join(word for word in text.split() if word.lower() not in stop_words)
    return text
def list_text_cleaner(texts: List[str], upper_case: bool = False, remove_stopwords: bool = False, remove_punctuation: bool=True) -> List[str]:
    """Clean every string of a list with single_text_cleaner.

    Each element is passed through single_text_cleaner with the given flags:
    - upper_case: upper-case instead of the default lower-casing
    - remove_stopwords: drop stop words (optional, off by default)
    - remove_punctuation: strip non-alphabetical characters (on by default)

    Returns a new list with the cleaned strings, in the same order as ``texts``.
    """
    cleaned = []
    for raw in texts:
        cleaned.append(
            single_text_cleaner(
                raw,
                remove_stopwords=remove_stopwords,
                upper_case=upper_case,
                remove_punctuation=remove_punctuation,
            )
        )
    return cleaned
def match_product_category(s1: list[str], s2: list[str]) -> str:
    """Return the first entry of s2 that contains an item of s1 as a substring.

    Items of s1 are tried in order, and for each one the entries of s2 are
    scanned in order; the search stops at the first hit. Returns None when no
    item of s1 occurs in any entry of s2.
    """
    for needle in s1:
        for candidate in s2:
            if needle in candidate:
                # stop at the first match, which saves time
                return candidate
    return None
def find_category(search_input: str, search_dict: Dict) -> str:
    """Find the category of a search input based on a dictionary of categories.

    Args:
    - search_input: a string
    - search_dict: a dictionary of product categories; each value is a list of
      strings that is cleaned (punctuation kept) before matching

    Returns a tuple ``(key, matched_value)`` for the first dictionary entry in
    which one of the cleaned search words occurs, or None (after printing a
    message) when nothing matches.
    """
    raw_tokens = re.split(r'[,\s]+', search_input)
    cleaned_tokens = list_text_cleaner(raw_tokens, remove_stopwords=True)
    # cleaning can leave empty strings behind; drop them
    search_terms = [term for term in cleaned_tokens if len(term) > 0]

    for key, values in search_dict.items():
        candidates = list_text_cleaner(values, remove_punctuation=False)
        hit = match_product_category(search_terms, candidates)
        if hit is not None:
            return key, hit

    # Reaching this point means no dictionary entry matched.
    print(f'Function find_category: No category has matched for input: {search_input}')
    return None
def check_entity(search_input) -> bool:
    """Run the module-level ``nlp`` pipeline on a search input and look for entities.

    Returns the processed document when it has at least one entity in
    ``.ents``; otherwise returns False.
    """
    parsed = nlp(search_input)
    if len(parsed.ents) == 0:
        return False
    return parsed
def get_cosine_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
    """Score a list of texts by cosine similarity against the input text.

    Takes in:
    - input_text: a string
    - texts: a list of strings

    Both are cleaned with stop words removed, vectorized together by
    get_vectors, and compared with cosine_similarity.

    Returns a dataframe with two columns, OFFER (the original, uncleaned
    texts) and Cosine Similarity Score, sorted by score in descending order.
    """
    query = list_text_cleaner([input_text], remove_stopwords=True)[0]
    corpus = list_text_cleaner(texts, remove_stopwords=True)

    # Row 0 of the similarity matrix belongs to the query; columns 1.. are the texts.
    similarity = cosine_similarity(get_vectors(query, *corpus))
    scores = similarity[0, 1:]

    ranked = pd.DataFrame({'OFFER': texts, 'Cosine Similarity Score': scores})
    ranked = ranked.sort_values(by='Cosine Similarity Score', ascending=False)
    return ranked.reset_index(drop=True)
def get_vectors(*strs: str) -> np.ndarray:
    """Fit a CountVectorizer on the given strings and return their count vectors.

    The result is a dense array with one row per input string, in input order.
    """
    corpus = [*strs]
    fitted = CountVectorizer().fit(corpus)
    counts = fitted.transform(corpus)
    return counts.toarray()
def jaccard_similarity(s1: List[str], s2: List[str]) -> float:
    """Takes in two lists and returns the Jaccard similarity score (3 digits).

    The score is ``|A & B| / |A | B|`` computed on the sets of distinct items,
    so repeated items in either list do not change the result.

    Returns 0.0 when both lists are empty (the ratio is undefined there).
    """
    first, second = set(s1), set(s2)
    union = first | second
    if not union:
        # Both inputs are empty: avoid dividing by zero.
        return 0.0
    return round(len(first & second) / len(union), 3)
def get_jaccard_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
    """Score a list of texts by Jaccard similarity against the input text.

    Takes in:
    - input_text: a string
    - texts: a list of strings

    Both are cleaned with stop words removed and split on whitespace before
    being compared word by word with jaccard_similarity.

    Returns a dataframe with two columns, OFFER (the original, uncleaned
    texts) and Jaccard Similarity Score, sorted by score in descending order.
    """
    query_tokens = list_text_cleaner([input_text], remove_stopwords=True)[0].split()

    scores = []
    for cleaned in list_text_cleaner(texts, remove_stopwords=True):
        scores.append(jaccard_similarity(query_tokens, cleaned.split()))

    ranked = pd.DataFrame({'OFFER': texts, 'Jaccard Similarity Score': scores})
    # highest similarity first
    ranked = ranked.sort_values(by='Jaccard Similarity Score', ascending=False)
    return ranked.reset_index(drop=True)
def find_column(df: pd.DataFrame, keyword: str) -> str:
    """Return the name of the first column whose name contains ``keyword``.

    The comparison is case-insensitive. Returns None when no column matches.
    Similarity dataframes are assumed to carry at most one score column.
    """
    needle = keyword.lower()
    matches = []
    for column in df.columns:
        if needle in column.lower():
            matches.append(column)
    if not matches:
        return None
    return matches[0]
def extract_similar_offers(data: pd.DataFrame, threshold: float = 0.0) -> pd.DataFrame:
    """Filter a similarity dataframe down to the rows that reach a threshold.

    Args:
    - data: the result of get_cosine_sim() or get_jaccard_sim(); the score
      column is located with find_column(data, 'score')
    - threshold: rows with score >= threshold are kept

    Returns a new dataframe (the input is not modified) with the scores
    rounded to 3 digits.

    Raises KeyError when ``data`` has no column whose name contains 'score'.
    """
    score = find_column(data, 'score')
    if score is None:
        raise KeyError("extract_similar_offers: no column containing 'score' was found")
    # Copy the filtered rows so the rounding below writes to an independent
    # frame rather than to a slice of ``data``.
    similar_offers = data[data[score] >= threshold].copy()
    similar_offers[score] = similar_offers[score].round(3)  # round to 3 digits
    return similar_offers
def category_to_brand(category: str, offered_brands: List, brand_belong_category_dict: Dict) -> List[str]:
    """Use case: when a user searches for a category, return the offered brands in it.

    The category is upper-cased before the lookup because all dictionary keys
    are in upper case. Returns the brands listed under that category that are
    also in ``offered_brands`` (in no particular order), or None when the
    category is not a key of the dictionary.
    """
    key = category.upper()
    if key not in brand_belong_category_dict:
        print(f"Function category_to_brand | No offered brand is found in {category}")
        return None

    brands_in_category = set(brand_belong_category_dict[key])
    result = list(brands_in_category.intersection(set(offered_brands)))
    print(f"Function category_to_brand | Found {category} in offered brand")
    return result
class CatchErros(Exception):
    """Container for the custom exceptions used by the offer search functions."""

    class ParamsInputError(Exception):
        """Raised when a parameter has an unsupported value (e.g. an unknown score name)."""

    class SearchFailedError(Exception):
        """Reserved for failed searches; not raised by the functions in this file's visible code."""

    class UnknownError(Exception):
        """Raised for failures that do not fit any other category."""


# The search functions raise these exceptions by their bare names, so they must
# also be reachable at module level; without these aliases such a `raise`
# fails with NameError instead of the intended exception.
ParamsInputError = CatchErros.ParamsInputError
SearchFailedError = CatchErros.SearchFailedError
UnknownError = CatchErros.UnknownError
def offer_finder_by_category(search_input: str, search_category_tuple: Tuple, category_dict: Dict, offers: pd.DataFrame, offered_brands: List,
                             brand_belong_category_dict: Dict, score: str, threshold: float = 0.0) -> pd.DataFrame:
    """Find offers based on a category identified from search input.

    Args:
    - search_input: a string
    - search_category_tuple: a tuple of (upper_category, product_category), e.g. ('Alcohol', 'beer')
    - category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
    - offers: a dataframe of offers; the BRAND and OFFER columns are read here
    - offered_brands: a list of the brands that currently have offers
    - brand_belong_category_dict: a dictionary mapping categories to the brands that belong to them
    - score: a string of either 'cosine' or 'jaccard'
    - threshold: a float between 0 and 1

    Returns a dataframe of similar offers, ordered by highest score, or None
    (after printing a suggestion) when the product category is not a key of
    brand_belong_category_dict.

    Raises CatchErros.ParamsInputError when ``score`` is neither 'cosine' nor 'jaccard'.
    """
    # we assume people just search one category at a time
    upper_category, product_category = search_category_tuple[0], search_category_tuple[1]
    print(f'Function offer_finder_by_category | Found items:\n- Search input: {search_input}\n- Product category: {product_category}\n- Upper category: {upper_category}')

    potential_brands = category_to_brand(product_category, offered_brands, brand_belong_category_dict)
    if potential_brands is None:
        # Similarity could still be computed over every offer, but that is
        # computationally expensive; point the user to related categories instead.
        potential_product_categories = category_dict[upper_category]
        msg = f'{product_category} is not found. Do you wanna take a look at these similar offers in {upper_category}?\n We have: {potential_product_categories}'
        print(msg)
        return None

    potential_offers = offers[offers['BRAND'].isin(potential_brands)]['OFFER'].tolist()
    if score == 'cosine':
        scored = get_cosine_sim(search_input, potential_offers)
    elif score == 'jaccard':
        scored = get_jaccard_sim(search_input, potential_offers)
    else:
        # Referenced through CatchErros so the name resolves at module level.
        raise CatchErros.ParamsInputError(f'Please enter a valid score: cosine or jaccard; Not {score}')
    return extract_similar_offers(scored, threshold)
def offer_finder_by_entity(search_input: str, entities: Tuple, offers_data: pd.DataFrame, score: str, threshold: float=0.0) -> pd.DataFrame:
    """Find offers based on entities identified from search input.

    Args:
    - search_input: a string
    - entities: a tuple of entities; each one must expose ``.text`` and ``.label_``
    - offers_data: a dataframe of offers. The upper-cased entity label selects
      the column to filter on, and the OFFER column supplies the candidate texts
    - score: a string of either 'cosine' or 'jaccard'
    - threshold: a float between 0 and 1

    Returns a dataframe of similar offers without duplicate rows, ordered by
    highest score, or None when no entity matches any offer.

    Raises ValueError when an entity matches offers and ``score`` is neither
    'cosine' nor 'jaccard'.
    """
    collects = []  # collect all the results if there are more than one entity
    for ent in entities:
        ent_name, ent_label = ent.text, ent.label_
        print(f'Function offer_finder_by_entity | Found entity: {ent_name} with label: {ent_label}')

        column = ent_label.upper()
        if column not in offers_data.columns:
            # An entity label with no matching column cannot be used as a
            # filter; skip it instead of failing the whole search with KeyError.
            print(f'Function offer_finder_by_entity | Entity label {ent_label} is not a column of the offers data; skipping: {ent_name}')
            continue

        # filter offers by entity
        df_tmp = offers_data[offers_data[column] == ent_name.upper()]
        if df_tmp.shape[0] == 0:
            print(f'Function offer_finder_by_entity | No offer is found for the brand/retailer: {ent_name}')
            continue

        print(f'Function offer_finder_by_entity | Found {df_tmp.shape[0]} offer(s) for the brand/retailer: {ent_name}')
        potential_offers = df_tmp['OFFER'].drop_duplicates().tolist()
        if score == 'cosine':
            scored = get_cosine_sim(search_input, potential_offers)
        elif score == 'jaccard':
            scored = get_jaccard_sim(search_input, potential_offers)
        else:
            raise ValueError(f'Please enter a valid score: cosine or jaccard; Not {score}')
        collects.append(extract_similar_offers(scored, threshold))

    if not collects:
        print('###'*5 + 'FINAL SEARCH RESULTS' + '###'*5)
        print('Function offer_finder_by_entity | No offer is found for any of the entities.')
        return None

    # All collected frames use the same similarity score. An offer matched by
    # several entities (e.g. its brand and its retailer) would otherwise be
    # listed once per entity, so identical rows are dropped.
    final_output = pd.concat(collects, ignore_index=True).drop_duplicates()
    score_column = find_column(collects[0], 'score')
    return final_output.sort_values(by=score_column, ascending=False).reset_index(drop=True)
def _search_by_category(search_input, offers, offer_brands, category_dict, brand_belong_category_dict, score, score_threshold):
    """Category-based search shared by both branches of search_offers."""
    # we assume people just search one category at a time
    cat_tuple = find_category(search_input, category_dict)  # e.g. ('Alcohol', 'beer')
    if cat_tuple is None:
        print('No brand/retailer/category is found. Please try again.')
        return None
    return offer_finder_by_category(search_input, cat_tuple, category_dict, offers, offer_brands, brand_belong_category_dict, score, score_threshold)


def search_offers(search_input: str=example_search, offers: pd.DataFrame=df_offers_brand_retailer, offer_brands: List=offered_brands,
                  category_dict: Dict=category_dict, brand_belong_category_dict: Dict=brand_belong_category_dict,
                  score: str="jaccard", score_threshold: float = 0.0):
    """Main function. Takes in a search_input, decides whether entities can be found, then executes the appropriate search.

    Inputs:
    - search_input: a string that a user enters
    - offers: a dataframe of offers (OFFER, BRAND, RETAILER) that are available in our database
    - offer_brands: a list of the brands that currently have offers
    - category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
    - brand_belong_category_dict: a dictionary of brands and the categories they belong to
    - score: a string of either 'cosine' or 'jaccard'
    - score_threshold: a float between 0 and 1

    When entities are found the search runs by entity first and falls back to
    the category search if that yields nothing; without entities only the
    category search runs.

    Returns a dataframe of similar offers, ordered by highest score, or None
    when nothing is found.
    """
    print(f'Function main | Search input: {search_input}')
    # Run the entity check once and reuse the result.
    check_ent = check_entity(search_input)
    if not check_ent:  # no entities found
        return _search_by_category(search_input, offers, offer_brands, category_dict, brand_belong_category_dict, score, score_threshold)

    entities = check_ent.ents  # entities will be a tuple anyways
    print(f'Found {len(entities)} entity object(s) in the search input.')
    search_results = offer_finder_by_entity(search_input, entities, offers, score, score_threshold)
    if search_results is None:
        print('No offers matched retailer/category is found. Now trying to recommend based on category.')
        search_results = _search_by_category(search_input, offers, offer_brands, category_dict, brand_belong_category_dict, score, score_threshold)
    return search_results


# Run the default example search when the file is executed as a script.
if __name__ == "__main__":
    search_offers()