Gainward777's picture
Upload 22 files
1f22e94 verified
import json
from constants.constants import *
from tqdm import tqdm
from transliterate import translit, detect_language
import pandas as pd
from rapidfuzz import fuzz, process
import numpy as np
from math import isnan
from preprocess.utils.common.utils import *
def normalize_name(name):
"""
Нормализует строку: если обнаруживается русский язык, транслитерирует её в латиницу,
приводит к нижнему регистру.
"""
try:
if detect_language(name) == 'ru':
return translit(name, 'ru', reversed=True).lower()
except Exception:
pass
return name.lower()
def normalize_name_ex(name):
name = normalize_name(name)
for nnk in NORMALIZED_NAMES_ALTERNATIVES_DICT:
word = find_full_word(name, NORMALIZED_NAMES_ALTERNATIVES_DICT[nnk])
if word:
name = name.replace(word, nnk)
return name
def compare_names(name1, name2, scorer=fuzz.ratio, score_cutoff=50):
print("Scoring: " + name1 + " vs " + name2)
words1 = name1.split(" ")
words2 = name2.split(" ")
score = 0
for w1 in words1:
for w2 in words2:
r = scorer(w1, w2)
print("\t " + w1 + " - " + w2 + " ; " + str(r))
if r >= score_cutoff:
score = score + r
print("Score result: " + str(score / (100*len(words1))))
return score / (100*len(words1))
def compare_name_with_list(name, names_list, scorer=fuzz.ratio, score_cutoff=50):
result = []
index = 0
for name2 in names_list:
result.append((name2, compare_names(name, name2, scorer, score_cutoff), index))
index = index + 1
return result
def prepare_groups_with_ids(items_df):
"""
Предварительная группировка данных из items по (new_brand, type, volume, new_type_wine, sour)
с учетом нормализованного названия.
Добавляем столбец 'norm_name', чтобы нормализовать значение name один раз заранее.
:param items_df: DataFrame с колонками 'new_brand', 'type', 'name', 'id', 'volume', 'new_type_wine', 'sour'.
:return: Словарь {(new_brand, type, volume, new_type_wine, sour): [(id, name, norm_name, volume, new_type_wine, sour)]}.
"""
items_df = items_df.copy()
items_df['norm_name'] = items_df['name'].apply(normalize_name_ex)
grouped = items_df.groupby(['new_brand', 'type', 'volume', 'new_type_wine', 'sour']).apply(
lambda x: list(zip(x['id'], x['name'], x['fullname'], x['norm_name'], x['volume'], x['new_type_wine'], x['sour'], x['year']))
).to_dict()
#print(grouped)
return grouped
def prepare_groups_by_alternative_keys(items_df):
"""
Группировка данных из items по (new_type_wine, new_type, volume, sour) с сохранением id, new_brand,
оригинального и нормализованного имени.
:param items_df: DataFrame с колонками 'new_brand', 'new_type_wine', 'new_type', 'volume', 'name', 'id', 'sour'.
:return: Словарь {(new_type_wine, new_type, volume, sour): [(id, new_brand, name, norm_name, volume, new_type_wine, sour)]}.
"""
items_df = items_df.copy()
items_df['norm_name'] = items_df['name'].apply(normalize_name_ex)
#grouped = items_df.groupby(['new_type_wine', 'new_type', 'volume' ''', 'sour''''']).apply(
grouped = items_df.groupby(['new_type_wine', 'new_type', 'volume']).apply(
lambda x: list(zip(x['id'], x['new_brand'], x['name'], x['fullname'], x['norm_name'], x['volume'], x['new_type_wine'], x['sour'], x['year']))
).to_dict()
return grouped
def parse_year(year):
if not year:
return False
elif isinstance(year, str):
return int(year)
elif isinstance(year, (int, float)) and not isnan(year):
return int(year)
return False
def order_by_best_year(matched_items, year):
best_matched_items = []
max_year_matched_items = []
other_matched_items = []
max_year = 0
year = parse_year(year)
for mi in matched_items:
# Если в оригинале указан год, то ищем точное совпадение, иначе сортируем по году в обратном порядке
try:
if isinstance(mi['year'], (int, float, str)):
mi_year = int(mi['year'])
else:
mi_year = False
if year and mi_year and (mi_year == year):
best_matched_items.append(mi['item_id'])
elif mi_year:
if mi_year > max_year:
max_year_matched_items = [mi]
max_year = mi_year
elif mi_year == max_year:
max_year_matched_items.append(mi)
else:
other_matched_items.append(mi['item_id'])
else:
other_matched_items.append(mi['item_id'])
except Exception as ex:
print("Error processing best year for item " + str(mi["item_id"]) + " value " + str(mi['year']) + ": " + str(ex))
if len(best_matched_items) > 0:
for m in matched_items:
if not m['item_id'] in best_matched_items:
m['score'] = m['score']*0.8
return matched_items
def new_find_matches_with_ids(products_df, items_groups, items_df, name_threshold=85, include_alternatives=True):
"""
Поиск совпадений с сохранением id найденных итемов, используя заранее подготовленные
нормализованные группы.
Производится два прохода:
- Первый: поиск по группам (brand, type, volume, new_type_wine, sour);
- Второй: для продуктов без совпадения ищем по альтернативным группам (new_type_wine, new_type, volume, sour),
исключая итемы с исходным брендом.
Сравнение производится по столбцу norm_name, а для вывода используется оригинальное name.
:param products_df: DataFrame с колонками 'id', 'brand', 'type', 'name', 'volume', 'new_type_wine', 'sour', 'new_type'.
:param items_groups: Словарь, сформированный функцией prepare_groups_with_ids.
:param items_df: DataFrame итемов с колонками 'id', 'new_brand', 'new_type_wine', 'new_type', 'volume', 'name', 'sour'.
:param name_threshold: Порог сходства для fuzzy matching.
:return: DataFrame с добавленными столбцами 'matched_items' (список совпадений) и 'alternative' (альтернативные совпадения).
"""
results = []
no_match_products = [] # Список для хранения продуктов без совпадения в исходной группе
if name_threshold < 50:
name_threshold = 50
# Первый проход: поиск по группам (brand, type, volume, new_type_wine, sour)
for idx, product in tqdm(products_df.iterrows(), total=len(products_df)):
product_brand = product['brand']
product_type = product['type']
product_name = product['name']
product_volume = product['volume']
product_type_wine = product['new_type_wine']
product_sour = product['sour']
key = (product_brand, product_type, product_volume, product_type_wine, product_sour)
#print("Name: " + product_name)
#print("Key: " + str(key))
#print("Groups: " + str(items_groups))
items_data = items_groups.get(key, [])
if items_data:
# Распаковываем: id, оригинальное имя, нормализованное имя, volume, new_type_wine, sour
#print("Data: " + str(items_data))
items_ids, items_names, items_full_names, items_norm_names, items_volumes, item_type_wine, items_sour, items_year = zip(*items_data)
else:
#print("Data: No")
items_ids, items_names,items_full_names, items_norm_names, items_volumes, item_type_wine, items_sour, items_year = ([], [], [], [], [],[], [], [])
norm_product_name = normalize_name_ex(product_name)
matches = process.extract(
norm_product_name, list(items_norm_names), scorer=fuzz.ratio, score_cutoff=name_threshold, limit=20
)
matched_items = [
{
'item_id': items_ids[idx_candidate],
'brand': product_brand,
'item_name': items_full_names[idx_candidate],
#'item_name': items_names[idx_candidate],
'score': score,
'volume': items_volumes[idx_candidate],
'color': item_type_wine[idx_candidate],
'sour': items_sour[idx_candidate],
'year': items_year[idx_candidate],
}
for match, score, idx_candidate in matches
]
if matched_items:
matched_items = order_by_best_year(matched_items, product['year'])
matched_items = matched_items[:5]
else:
no_match_products.append((idx, product))
results.append({
'product_id': product['id'],
#"matched_top_id": top_matched_id,
'matched_items': matched_items,
#"alternative_top_id": "",
#'alternative': [] # Заполняется во втором проходе
})
if include_alternatives:
# Подготовка альтернативной группировки по (new_type_wine, new_type, volume, sour)
groups_by_alternative_keys = prepare_groups_by_alternative_keys(items_df)
# Второй проход: для продуктов без совпадений ищем по альтернативным группам
for idx, product in tqdm(no_match_products):
#print("Product: " + str(product))
product_brand = product['brand']
product_type_wine = product['new_type_wine']
product_type = product['new_type']
product_volume = product['volume']
product_name = product['name']
product_sour = product['sour']
#alt_key = (product_type_wine, product_type, product_volume, product_sour)
alt_key = (product_type_wine, product_type, product_volume)
#print("AltName: " + str(product))
#print("AltKey: " + str(alt_key))
#print("AltGroups: " + str(groups_by_alternative_keys))
#print("AltGroups Keys: " + str(groups_by_alternative_keys.keys()))
type_items = groups_by_alternative_keys.get(alt_key, [])
#print("AltGroups2: " + str(type_items))
# Фильтруем, исключая итемы с исходным брендом
filtered_items = [item for item in type_items if item[1] != product_brand]
if filtered_items:
#print("AltData: " + str(filtered_items))
alt_ids, alt_brands, alt_names, alt_full_names, alt_norm_names, alt_volumes, alt_type_wine, alt_sour, alt_year = zip(*filtered_items)
else:
#print("AltData: No")
alt_ids, alt_brands, alt_names, alt_full_names, alt_norm_names, alt_volumes, alt_type_wine, alt_sour, alt_year = ([], [], [], [], [], [], [],[], [])
norm_product_name = normalize_name_ex(product_name)
#print("norm_product_name: " + str(norm_product_name))
#print("alt_norm_names: " + str(alt_norm_names))
alt_matches = process.extract(
norm_product_name, list(alt_norm_names), scorer=fuzz.ratio, score_cutoff=50
)
#alt_matches = compare_name_with_list(
# norm_product_name, list(alt_norm_names), scorer=fuzz.ratio, score_cutoff=70
#)
#print("alt_matches: " + str(alt_matches))
alt_matched_items = [
{
'item_id': alt_ids[idx_candidate],
'brand': alt_brands[idx_candidate],
#'item_name': alt_names[idx_candidate],
'item_name': alt_full_names[idx_candidate],
'score': score / 2,
'volume': alt_volumes[idx_candidate],
'color': alt_type_wine[idx_candidate],
'sour': alt_sour[idx_candidate],
'year': alt_year[idx_candidate],
}
for match, score, idx_candidate in alt_matches
]
alt_matched_items = order_by_best_year(alt_matched_items, product['year'])
alt_matched_items = alt_matched_items[:5]
results[idx]['matched_items'].extend(alt_matched_items)
for r in results:
r['matched_items'] = json.dumps(r['matched_items'], ensure_ascii=False)
#if alt_matched_items:
# results[idx]['alternative_top_id'] = alt_matched_items[0]["item_id"]
#results[idx]['alternative'] = alt_matched_items
results_df = pd.DataFrame(results)
merged_df = products_df.merge(results_df, left_on='id', right_on='product_id').drop(columns=['product_id'])
return merged_df