import json from constants.constants import * from tqdm import tqdm from transliterate import translit, detect_language import pandas as pd from rapidfuzz import fuzz, process import numpy as np from math import isnan from preprocess.utils.common.utils import * def normalize_name(name): """ Нормализует строку: если обнаруживается русский язык, транслитерирует её в латиницу, приводит к нижнему регистру. """ try: if detect_language(name) == 'ru': return translit(name, 'ru', reversed=True).lower() except Exception: pass return name.lower() def normalize_name_ex(name): name = normalize_name(name) for nnk in NORMALIZED_NAMES_ALTERNATIVES_DICT: word = find_full_word(name, NORMALIZED_NAMES_ALTERNATIVES_DICT[nnk]) if word: name = name.replace(word, nnk) return name def compare_names(name1, name2, scorer=fuzz.ratio, score_cutoff=50): print("Scoring: " + name1 + " vs " + name2) words1 = name1.split(" ") words2 = name2.split(" ") score = 0 for w1 in words1: for w2 in words2: r = scorer(w1, w2) print("\t " + w1 + " - " + w2 + " ; " + str(r)) if r >= score_cutoff: score = score + r print("Score result: " + str(score / (100*len(words1)))) return score / (100*len(words1)) def compare_name_with_list(name, names_list, scorer=fuzz.ratio, score_cutoff=50): result = [] index = 0 for name2 in names_list: result.append((name2, compare_names(name, name2, scorer, score_cutoff), index)) index = index + 1 return result def prepare_groups_with_ids(items_df): """ Предварительная группировка данных из items по (new_brand, type, volume, new_type_wine, sour) с учетом нормализованного названия. Добавляем столбец 'norm_name', чтобы нормализовать значение name один раз заранее. :param items_df: DataFrame с колонками 'new_brand', 'type', 'name', 'id', 'volume', 'new_type_wine', 'sour'. :return: Словарь {(new_brand, type, volume, new_type_wine, sour): [(id, name, norm_name, volume, new_type_wine, sour)]}. """ items_df = items_df.copy() items_df['norm_name'] = items_df['name'].apply(normalize_name_ex) grouped = items_df.groupby(['new_brand', 'type', 'volume', 'new_type_wine', 'sour']).apply( lambda x: list(zip(x['id'], x['name'], x['fullname'], x['norm_name'], x['volume'], x['new_type_wine'], x['sour'], x['year'])) ).to_dict() #print(grouped) return grouped def prepare_groups_by_alternative_keys(items_df): """ Группировка данных из items по (new_type_wine, new_type, volume, sour) с сохранением id, new_brand, оригинального и нормализованного имени. :param items_df: DataFrame с колонками 'new_brand', 'new_type_wine', 'new_type', 'volume', 'name', 'id', 'sour'. :return: Словарь {(new_type_wine, new_type, volume, sour): [(id, new_brand, name, norm_name, volume, new_type_wine, sour)]}. """ items_df = items_df.copy() items_df['norm_name'] = items_df['name'].apply(normalize_name_ex) #grouped = items_df.groupby(['new_type_wine', 'new_type', 'volume' ''', 'sour''''']).apply( grouped = items_df.groupby(['new_type_wine', 'new_type', 'volume']).apply( lambda x: list(zip(x['id'], x['new_brand'], x['name'], x['fullname'], x['norm_name'], x['volume'], x['new_type_wine'], x['sour'], x['year'])) ).to_dict() return grouped def parse_year(year): if not year: return False elif isinstance(year, str): return int(year) elif isinstance(year, (int, float)) and not isnan(year): return int(year) return False def order_by_best_year(matched_items, year): best_matched_items = [] max_year_matched_items = [] other_matched_items = [] max_year = 0 year = parse_year(year) for mi in matched_items: # Если в оригинале указан год, то ищем точное совпадение, иначе сортируем по году в обратном порядке try: if isinstance(mi['year'], (int, float, str)): mi_year = int(mi['year']) else: mi_year = False if year and mi_year and (mi_year == year): best_matched_items.append(mi['item_id']) elif mi_year: if mi_year > max_year: max_year_matched_items = [mi] max_year = mi_year elif mi_year == max_year: max_year_matched_items.append(mi) else: other_matched_items.append(mi['item_id']) else: other_matched_items.append(mi['item_id']) except Exception as ex: print("Error processing best year for item " + str(mi["item_id"]) + " value " + str(mi['year']) + ": " + str(ex)) if len(best_matched_items) > 0: for m in matched_items: if not m['item_id'] in best_matched_items: m['score'] = m['score']*0.8 return matched_items def new_find_matches_with_ids(products_df, items_groups, items_df, name_threshold=85, include_alternatives=True): """ Поиск совпадений с сохранением id найденных итемов, используя заранее подготовленные нормализованные группы. Производится два прохода: - Первый: поиск по группам (brand, type, volume, new_type_wine, sour); - Второй: для продуктов без совпадения ищем по альтернативным группам (new_type_wine, new_type, volume, sour), исключая итемы с исходным брендом. Сравнение производится по столбцу norm_name, а для вывода используется оригинальное name. :param products_df: DataFrame с колонками 'id', 'brand', 'type', 'name', 'volume', 'new_type_wine', 'sour', 'new_type'. :param items_groups: Словарь, сформированный функцией prepare_groups_with_ids. :param items_df: DataFrame итемов с колонками 'id', 'new_brand', 'new_type_wine', 'new_type', 'volume', 'name', 'sour'. :param name_threshold: Порог сходства для fuzzy matching. :return: DataFrame с добавленными столбцами 'matched_items' (список совпадений) и 'alternative' (альтернативные совпадения). """ results = [] no_match_products = [] # Список для хранения продуктов без совпадения в исходной группе if name_threshold < 50: name_threshold = 50 # Первый проход: поиск по группам (brand, type, volume, new_type_wine, sour) for idx, product in tqdm(products_df.iterrows(), total=len(products_df)): product_brand = product['brand'] product_type = product['type'] product_name = product['name'] product_volume = product['volume'] product_type_wine = product['new_type_wine'] product_sour = product['sour'] key = (product_brand, product_type, product_volume, product_type_wine, product_sour) #print("Name: " + product_name) #print("Key: " + str(key)) #print("Groups: " + str(items_groups)) items_data = items_groups.get(key, []) if items_data: # Распаковываем: id, оригинальное имя, нормализованное имя, volume, new_type_wine, sour #print("Data: " + str(items_data)) items_ids, items_names, items_full_names, items_norm_names, items_volumes, item_type_wine, items_sour, items_year = zip(*items_data) else: #print("Data: No") items_ids, items_names,items_full_names, items_norm_names, items_volumes, item_type_wine, items_sour, items_year = ([], [], [], [], [],[], [], []) norm_product_name = normalize_name_ex(product_name) matches = process.extract( norm_product_name, list(items_norm_names), scorer=fuzz.ratio, score_cutoff=name_threshold, limit=20 ) matched_items = [ { 'item_id': items_ids[idx_candidate], 'brand': product_brand, 'item_name': items_full_names[idx_candidate], #'item_name': items_names[idx_candidate], 'score': score, 'volume': items_volumes[idx_candidate], 'color': item_type_wine[idx_candidate], 'sour': items_sour[idx_candidate], 'year': items_year[idx_candidate], } for match, score, idx_candidate in matches ] if matched_items: matched_items = order_by_best_year(matched_items, product['year']) matched_items = matched_items[:5] else: no_match_products.append((idx, product)) results.append({ 'product_id': product['id'], #"matched_top_id": top_matched_id, 'matched_items': matched_items, #"alternative_top_id": "", #'alternative': [] # Заполняется во втором проходе }) if include_alternatives: # Подготовка альтернативной группировки по (new_type_wine, new_type, volume, sour) groups_by_alternative_keys = prepare_groups_by_alternative_keys(items_df) # Второй проход: для продуктов без совпадений ищем по альтернативным группам for idx, product in tqdm(no_match_products): #print("Product: " + str(product)) product_brand = product['brand'] product_type_wine = product['new_type_wine'] product_type = product['new_type'] product_volume = product['volume'] product_name = product['name'] product_sour = product['sour'] #alt_key = (product_type_wine, product_type, product_volume, product_sour) alt_key = (product_type_wine, product_type, product_volume) #print("AltName: " + str(product)) #print("AltKey: " + str(alt_key)) #print("AltGroups: " + str(groups_by_alternative_keys)) #print("AltGroups Keys: " + str(groups_by_alternative_keys.keys())) type_items = groups_by_alternative_keys.get(alt_key, []) #print("AltGroups2: " + str(type_items)) # Фильтруем, исключая итемы с исходным брендом filtered_items = [item for item in type_items if item[1] != product_brand] if filtered_items: #print("AltData: " + str(filtered_items)) alt_ids, alt_brands, alt_names, alt_full_names, alt_norm_names, alt_volumes, alt_type_wine, alt_sour, alt_year = zip(*filtered_items) else: #print("AltData: No") alt_ids, alt_brands, alt_names, alt_full_names, alt_norm_names, alt_volumes, alt_type_wine, alt_sour, alt_year = ([], [], [], [], [], [], [],[], []) norm_product_name = normalize_name_ex(product_name) #print("norm_product_name: " + str(norm_product_name)) #print("alt_norm_names: " + str(alt_norm_names)) alt_matches = process.extract( norm_product_name, list(alt_norm_names), scorer=fuzz.ratio, score_cutoff=50 ) #alt_matches = compare_name_with_list( # norm_product_name, list(alt_norm_names), scorer=fuzz.ratio, score_cutoff=70 #) #print("alt_matches: " + str(alt_matches)) alt_matched_items = [ { 'item_id': alt_ids[idx_candidate], 'brand': alt_brands[idx_candidate], #'item_name': alt_names[idx_candidate], 'item_name': alt_full_names[idx_candidate], 'score': score / 2, 'volume': alt_volumes[idx_candidate], 'color': alt_type_wine[idx_candidate], 'sour': alt_sour[idx_candidate], 'year': alt_year[idx_candidate], } for match, score, idx_candidate in alt_matches ] alt_matched_items = order_by_best_year(alt_matched_items, product['year']) alt_matched_items = alt_matched_items[:5] results[idx]['matched_items'].extend(alt_matched_items) for r in results: r['matched_items'] = json.dumps(r['matched_items'], ensure_ascii=False) #if alt_matched_items: # results[idx]['alternative_top_id'] = alt_matched_items[0]["item_id"] #results[idx]['alternative'] = alt_matched_items results_df = pd.DataFrame(results) merged_df = products_df.merge(results_df, left_on='id', right_on='product_id').drop(columns=['product_id']) return merged_df