File size: 15,467 Bytes
6db931d
 
 
 
 
 
 
60ac2d1
22a112f
6db931d
 
 
 
ee7e4ac
 
22a112f
6db931d
3fa408a
 
6db931d
3fa408a
 
 
 
22a112f
6db931d
 
43e574d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6db931d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d564690
6db931d
 
 
 
 
 
03a395a
 
6db931d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
329b6b4
6db931d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8389568
43e574d
 
6db931d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03a395a
 
6db931d
 
 
8389568
6db931d
 
 
 
 
249638f
 
 
 
03a395a
 
249638f
 
8389568
6db931d
35fe4a4
 
43e574d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# helper functions
from typing import List, Dict, Tuple
import re
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
import numpy as np
import pickle, json
# from IPython.display import clear_output

import spacy
from spacy.tokens import DocBin
from spacy.training import offsets_to_biluo_tags
import en_fetch_ner_spacy_tsf
# Load the NER pipeline once at import time; check_entity() calls `nlp` on every search input.
# NOTE(review): presumably a spaCy pipeline package (the file imports spacy and reads `.ents`/`.label_` from its output) - confirm.
nlp = en_fetch_ner_spacy_tsf.load()
# clear_output()

import nltk
# Runs on every import of this module.
# NOTE(review): this may need network access when the 'stopwords' corpus is not already available locally - confirm for deployment.
nltk.download('stopwords')

from nltk.corpus import stopwords
# Stop-word set consulted by single_text_cleaner() when remove_stopwords=True:
# NLTK's English list plus project-specific noise words.
stop_words = set(stopwords.words('english'))
additional_stop_words = {'pack'}
stop_words.update(additional_stop_words)
# clear_output()


# load operation data
# All four paths are relative, so they are resolved against the process's current working directory.
# The loaded objects double as the default arguments of search_offers().
path1 = "data/brand_belong_category_dict.json"
path2 = "data/product_upper_category_dict.json"
path3 = "data/offered_brands.pkl"
path4 = "data/offer_retailer.csv"

# Looked up by category_to_brand() with an upper-cased category name as key; each value is a collection of brands.
with open(path1, 'r') as f:
    brand_belong_category_dict = json.load(f)

# Iterated by find_category(): keys are upper categories, values are lists of product categories.
with open(path2, 'rb') as f:
    category_dict = json.load(f)

# NOTE(review): pickle.load can execute arbitrary code embedded in the file; this is only safe
# as long as data/offered_brands.pkl is a trusted artifact that ships with the project.
with open(path3, 'rb') as f:
    offered_brands = pickle.load(f)

# Offers table; the finder functions read its 'OFFER' and 'BRAND' columns and filter on a column
# named after the upper-cased entity label (presumably BRAND / RETAILER - TODO confirm).
df_offers_brand_retailer = pd.read_csv(path4)

# Default query used by search_offers() when no search_input is given.
example_search = "Simply Spiked Lemonade 12 pack at Walmart"

# helper functions

def single_text_cleaner(text: str, remove_stopwords: bool=False, upper_case: bool = False, remove_punctuation: bool=True) -> str:
    """Clean one single text input. By default the result is lower case.

    Args:
    - text: the raw string to clean
    - remove_stopwords: drop every whitespace-separated word that is in the module-level `stop_words` set
    - upper_case: return the cleaned text in upper case instead of lower case
    - remove_punctuation: delete every character that is not an ASCII letter or whitespace
      (digits are removed as well)

    Returns the cleaned string.
    """
    # Always clean in lower case: the punctuation pattern only keeps a-z and the stop-word set is
    # lower case, so upper-casing first would strip every letter and never match a stop word.
    text = text.lower()
    if remove_punctuation:
        text = re.sub(r'[^a-z\s]', '', text)
    if remove_stopwords:
        text = ' '.join(word for word in text.split() if word not in stop_words)
    # Apply the requested casing last so it cannot interfere with the cleaning steps above.
    return text.upper() if upper_case else text

def list_text_cleaner(texts: List[str], upper_case: bool = False, remove_stopwords: bool = False, remove_punctuation: bool=True) -> List[str]:
    """Run single_text_cleaner() over every string of a list.

    Args:
    - texts: the strings to clean
    - upper_case, remove_stopwords, remove_punctuation: forwarded unchanged to single_text_cleaner()
      for every element (note the different parameter order of the two functions)

    Returns a new list with one cleaned string per input string, in the same order.
    """
    cleaned = []
    for raw_text in texts:
        cleaned.append(
            single_text_cleaner(
                raw_text,
                remove_stopwords=remove_stopwords,
                upper_case=upper_case,
                remove_punctuation=remove_punctuation,
            )
        )
    return cleaned

def match_product_category(s1: list[str], s2: list[str]) -> str:
    """Return the first entry of `s2` that contains an item of `s1` as a substring.

    Items of `s1` are tried in order, each one against the entries of `s2` in order,
    and the search stops at the first hit. Returns None when nothing matches.
    """
    for keyword in s1:
        for product in s2:
            if keyword in product:
                return product  # first hit wins; no need to scan the rest
    return None

def find_category(search_input: str, search_dict: Dict) -> str:
    """Find the first category of `search_dict` that matches a word of the search input.

    Args:
    - search_input: a string
    - search_dict: a dictionary of product categories; every value is a list of strings

    Returns a tuple (key of search_dict, matched cleaned product category) for the first key
    whose values contain one of the cleaned search words as a substring. When nothing matches,
    a message is printed and None is returned.
    """
    raw_tokens = re.split(r'[,\s]+', search_input)
    # cleaning can leave empty strings behind, which would match every category
    keywords = [token for token in list_text_cleaner(raw_tokens, remove_stopwords=True) if len(token) > 0]
    for category_key, product_names in search_dict.items():
        cleaned_products = list_text_cleaner(product_names, remove_punctuation=False)
        hit = match_product_category(keywords, cleaned_products)
        if hit is not None:
            return category_key, hit
    print(f'Function find_category: No category has matched for input: {search_input}')
    return None
    

def check_entity(search_input) -> bool:
    """Run the module-level `nlp` pipeline on a search input and report whether it found entities.

    Returns the object produced by `nlp(search_input)` when its `ents` is non-empty,
    otherwise False.
    """
    parsed = nlp(search_input)
    return parsed if len(parsed.ents) > 0 else False

def get_cosine_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
    """Score a list of texts by cosine similarity against the input text.

    Takes in:
    - input_text: a string
    - texts: a list of strings

    Both sides are cleaned with stop words removed before being vectorised by get_vectors().
    Returns a dataframe with two columns, 'OFFER' (the original, uncleaned texts) and
    'Cosine Similarity Score', sorted by score in descending order.
    """
    query = list_text_cleaner([input_text], remove_stopwords=True)[0]
    candidates = list_text_cleaner(texts, remove_stopwords=True)
    # Row 0 of the vector matrix is the query, rows 1.. are the candidates.
    count_vectors = get_vectors(query, *candidates)
    # First row of the pairwise matrix, minus the query's similarity with itself.
    scores = cosine_similarity(count_vectors)[0, 1:]
    ranked = pd.DataFrame({'OFFER': texts, 'Cosine Similarity Score': scores})
    return ranked.sort_values(by='Cosine Similarity Score', ascending=False).reset_index(drop=True)

def get_vectors(*strs: str) -> np.ndarray:
    """Fit a CountVectorizer on the given strings and return their count vectors as a dense array.

    The result has one row per input string, in the order the strings were passed.
    """
    corpus = list(strs)
    fitted = CountVectorizer().fit(corpus)
    return fitted.transform(corpus).toarray()

def jaccard_similarity(s1: List[str], s2: List[str]) -> float:
    """Takes in two lists and returns the Jaccard similarity score (3 digits).

    The score is |A intersect B| / |A union B| over the *distinct* items of both lists, so repeated
    tokens do not inflate the denominator. Returns 0.0 when both lists are empty
    instead of dividing by zero.
    """
    items_a, items_b = set(s1), set(s2)
    union_size = len(items_a | items_b)
    if union_size == 0:
        # Nothing to compare on either side (e.g. both texts were only stop words).
        return 0.0
    return round(len(items_a & items_b) / union_size, 3)

def get_jaccard_sim(input_text: str, texts: List[str]) -> pd.DataFrame:
    """Score a list of texts by Jaccard similarity against the input text.

    Takes in:
    - input_text: a string
    - texts: a list of strings

    Both sides are cleaned with stop words removed, split on whitespace and scored with
    jaccard_similarity(). Returns a dataframe with two columns, 'OFFER' (the original,
    uncleaned texts) and 'Jaccard Similarity Score', sorted by score in descending order.
    """
    query_tokens = list_text_cleaner([input_text], remove_stopwords=True)[0].split()

    scores = []
    for candidate in list_text_cleaner(texts, remove_stopwords=True):
        scores.append(jaccard_similarity(query_tokens, candidate.split()))

    ranked = pd.DataFrame({'OFFER': texts, 'Jaccard Similarity Score': scores})
    # highest score first
    return ranked.sort_values(by='Jaccard Similarity Score', ascending=False).reset_index(drop=True)

def find_column(df: pd.DataFrame, keyword: str) -> str:
    """Return the name of the first column whose name contains `keyword` (case-insensitive).

    Returns None when no column name contains the keyword. The similarity dataframes built
    in this module carry a single score column, so the first match is the one wanted.
    """
    needle = keyword.lower()
    matching = []
    for column in df.columns:
        if needle in column.lower():
            matching.append(column)
    if not matching:
        return None
    return matching[0]

def extract_similar_offers(data: pd.DataFrame, threshold: float = 0.0) -> pd.DataFrame:
    """Keep the rows of a similarity dataframe whose score is at least `threshold`.

    Args:
    - data: the result of get_cosine_sim() or get_jaccard_sim(); its score column is located
      with find_column(data, 'score')
    - threshold: minimum score (inclusive) a row needs to be kept

    Returns a new dataframe holding the rows with score >= threshold, scores rounded to 3 digits.
    The input dataframe is left untouched.

    Raises KeyError when `data` has no column whose name contains 'score'.
    """
    score = find_column(data, 'score')
    if score is None:
        # Same exception type the column lookup below would raise, with a usable message.
        raise KeyError(f"No score column found in columns: {list(data.columns)}")
    # .copy() makes the filtered frame independent of `data`, so the assignment below is a
    # plain column update rather than a write into a slice (SettingWithCopyWarning).
    similar_offers = data[data[score] >= threshold].copy()
    similar_offers[score] = similar_offers[score].apply(lambda x: round(x, 3)) # round to 3 digits
    return similar_offers

def category_to_brand(category: str, offered_brands: List, brand_belong_category_dict: Dict) -> List[str]:
    """Use case: a user searched for a category and we want the offered brands that belong to it.

    Args:
    - category: category name; it is upper-cased before the lookup because the dictionary keys are upper case
    - offered_brands: the brands that currently have offers
    - brand_belong_category_dict: maps an upper-case category to the brands belonging to it

    Returns the brands of the category that are also offered (unordered, possibly empty),
    or None when the category is not a key of the dictionary.
    """
    category_key = category.upper()
    if category_key not in brand_belong_category_dict:
        print(f"Function category_to_brand | No offered brand is found in {category}")
        return None
    brands_in_category = set(brand_belong_category_dict[category_key])
    matched_brands = list(brands_in_category.intersection(set(offered_brands)))
    print(f"Function category_to_brand | Found {category} in offered brand") 
    return matched_brands

class CatchErros(Exception):
    """Namespace for the search-specific exception types of this module.

    The nested classes derive directly from Exception (not from CatchErros) and must be
    referenced through the outer class, e.g. `CatchErros.ParamsInputError`.
    """

    class ParamsInputError(Exception):
        """An argument has an unsupported value."""

    class SearchFailedError(Exception):
        """A search could not be completed."""

    class UnknownError(Exception):
        """An unexpected state was reached."""


def offer_finder_by_category(search_input: str, search_category_tuple: Tuple, category_dict: Dict, offers: pd.DataFrame, offered_brands: List,
                             brand_belong_category_dict: Dict, score: str, threshold: float = 0.0) -> pd.DataFrame:
    """Find offers based on a category identified from search input.
    Args:
    - search_input: a string
    - search_category_tuple: a tuple of (upper_category, product_category), e.g. ('Alcohol', 'beer')
    - category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
    - offers: a dataframe of offers; its 'BRAND' and 'OFFER' columns are read here
    - offered_brands: the brands that currently have offers
    - brand_belong_category_dict: maps an upper-case category to the brands belonging to it
    - score: a string of either 'cosine' or 'jaccard'
    - threshold: minimum similarity score (inclusive) an offer needs to be returned

    Returns a dataframe of similar offers, ordered by highest score, or None (after printing the
    product categories of the upper category) when the product category has no brand mapping.

    Raises CatchErros.ParamsInputError when `score` is neither 'cosine' nor 'jaccard'.
    """
    # we assume people just search one category at a time
    upper_category, product_category = search_category_tuple[0], search_category_tuple[1]
    print(f'Function offer_finder_by_category | Found items:\n- Search input: {search_input}\n- Product category: {product_category}\n- Upper category: {upper_category}')
    potential_brands = category_to_brand(product_category, offered_brands, brand_belong_category_dict)
    if potential_brands is None:
        # Scoring every offer of the upper category would be possible but is computationally
        # expensive, so only point the user at the sibling product categories.
        potential_product_categories = category_dict[upper_category]
        msg = f'{product_category} is not found. Do you wanna take a look at these similar offers in {upper_category}?\n We have: {potential_product_categories}'
        print(msg)
        return None

    potential_offers = offers[offers['BRAND'].isin(potential_brands)]['OFFER'].tolist()
    if score == 'cosine':
        similarity = get_cosine_sim(search_input, potential_offers)
    elif score == 'jaccard':
        similarity = get_jaccard_sim(search_input, potential_offers)
    else:
        # The exception classes live inside CatchErros; the bare name would be a NameError.
        raise CatchErros.ParamsInputError(f'Please enter a valid score: cosine or jaccard; Not {score}')
    return extract_similar_offers(similarity, threshold)

def offer_finder_by_entity(search_input: str, entities: Tuple, offers_data: pd.DataFrame, score: str, threshold: float=0.0) -> pd.DataFrame:
    """Find offers based on entities identified from search input.
    Args:
    - search_input: a string
    - entities: a tuple of entities; each one must expose `.text` and `.label_`
    - offers_data: a dataframe of offers; it needs an 'OFFER' column plus one column per entity
      label, named after the upper-cased label
    - score: a string of either 'cosine' or 'jaccard'
    - threshold: minimum similarity score (inclusive) an offer needs to be returned

    Returns a dataframe with the similar offers of all matched entities, ordered by highest score,
    or None when no entity matched any offer.

    Raises ValueError when an entity has matching offers and `score` is neither 'cosine' nor 'jaccard'.
    """
    collects = [] # one scored dataframe per entity that matched at least one offer
    for ent in entities:
        ent_name, ent_label = ent.text, ent.label_
        print(f'Function offer_finder_by_entity | Found entity: {ent_name} with label: {ent_label}')
        entity_column = ent_label.upper()
        if entity_column not in offers_data.columns:
            # An entity label without a matching column cannot be used as a filter; skip it
            # instead of failing the whole search with a KeyError.
            print(f'Function offer_finder_by_entity | Label {ent_label} is not a column of the offers data; skipping entity: {ent_name}')
            continue
        # filter offers by entity
        df_tmp = offers_data[offers_data[entity_column] == ent_name.upper()]
        if df_tmp.shape[0] == 0:
            print(f'Function offer_finder_by_entity | No offer is found for the brand/retailer: {ent_name}')
            continue
        print(f'Function offer_finder_by_entity | Found {df_tmp.shape[0]} offer(s) for the brand/retailer: {ent_name}')
        potential_offers = df_tmp['OFFER'].drop_duplicates().tolist()
        if score == 'cosine':
            similarity = get_cosine_sim(search_input, potential_offers)
        elif score == 'jaccard':
            similarity = get_jaccard_sim(search_input, potential_offers)
        else:
            raise ValueError(f'Please enter a valid score: cosine or jaccard; Not {score}')
        collects.append(extract_similar_offers(similarity, threshold))

    if not collects:
        print('###'*5 + 'FINAL SEARCH RESULTS' + '###'*5)
        print('Function offer_finder_by_entity | No offer is found for any of the entities.')
        return None

    # All collected frames use the same similarity measure, so they share one score column.
    final_output = pd.concat(collects, ignore_index=True)
    score_column = find_column(collects[0], 'score')
    return final_output.sort_values(by=score_column, ascending=False).reset_index(drop=True)


def search_offers(search_input: str=example_search, offers: pd.DataFrame=df_offers_brand_retailer, offer_brands: List=offered_brands,
                  category_dict: Dict=category_dict, brand_belong_category_dict: Dict=brand_belong_category_dict,
                  score: str="jaccard", score_threshold: float = 0.0):
    """Main function. Takes in a search_input, decides whether it contains entities and runs the matching finder.

    When entities are found, offers are searched by entity first; if that yields nothing (or no
    entity was found at all), the search falls back to matching a product category.

    Inputs:
    - search_input: a string that a user enters
    - offers: a dataframe of offers (OFFER, BRAND, RETAILER) that are available in our database
    - offer_brands: the brands that currently have offers
    - category_dict: a dictionary of categories. Keys are upper categories and values are lists of product categories
    - brand_belong_category_dict: a dictionary of brands and the categories they belong to
    - score: a string of either 'cosine' or 'jaccard'
    - score_threshold: a float between 0 and 1

    Returns a dataframe of similar offers, ordered by highest score, or None when nothing is found.
    """
    print(f'Function main | Search input: {search_input}')
    # Run the entity check once and reuse the result.
    check_ent = check_entity(search_input)
    if check_ent:
        entities = check_ent.ents # entities will be a tuple anyways
        print(f'Found {len(entities)} entity object(s) in the search input.')
        search_results = offer_finder_by_entity(search_input, entities, offers, score, score_threshold)
        if search_results is not None:
            return search_results
        print('No offers matched retailer/category is found. Now trying to recommend based on category.')

    # Category fallback, shared by the "no entities" and "entities without offers" paths.
    # we assume people just search one category at a time
    cat_tuple = find_category(search_input, category_dict) # e.g. ('Alcohol', 'beer')
    if cat_tuple is None:
        print('No brand/retailer/category is found. Please try again.')
        return None
    # Pass the caller-supplied offer_brands rather than the module-level offered_brands.
    return offer_finder_by_category(search_input, cat_tuple, category_dict, offers, offer_brands, brand_belong_category_dict, score, score_threshold)

# Script entry point: run one search with all defaults (example_search, jaccard score).
# The returned dataframe is discarded; only the progress messages printed along the way are shown.
if __name__ == "__main__":
    search_offers()