Spaces:
Sleeping
Sleeping
| from Constants import * | |
| from rapidfuzz import fuzz, process | |
| import pandas as pd | |
| import re | |
| from ahocorasick import Automaton | |
| from unidecode import unidecode | |
| import pandas as pd | |
| import json | |
| import numpy as np | |
| from tqdm import tqdm | |
| import re | |
| from pqdm.threads import pqdm | |
| import math | |
| import pandas as pd | |
| import numpy as np | |
| import pickle as pk | |
| import Levenshtein | |
| from transliterate import translit, detect_language | |
| from collections import defaultdict | |
| from collections import Counter | |
| def check_spark(row, col_name='name', types=['Игристое', 'игр']): | |
| if col_name in row.keys(): | |
| for t in types: | |
| if t.lower() in row[col_name].lower() and 'Пилигрим' not in row[col_name].lower(): | |
| return 'Игристое' | |
| return None | |
| def check_color_and_sour(row, col_name='type_wine', types=['Белое', 'Розовое', 'Красное']): | |
| if col_name in row.keys(): | |
| for t in types: | |
| if t.lower() in row[col_name].lower(): | |
| return 'Вино' | |
| return None | |
| def is_type_exist(row, types): | |
| for t in types: | |
| if t.lower() in row['type'].lower(): # Сравнение без учета регистра | |
| return t | |
| return None | |
| def check_type(row, types): | |
| #checker=False | |
| for t in types: | |
| if t.lower() in row['name'].lower(): # Сравнение без учета регистра | |
| return t | |
| return None | |
| def get_type(row, types): | |
| if 'type' not in row.keys(): | |
| return check_type(row, types) | |
| elif 'type' in row.keys(): | |
| semi_res=is_type_exist(row, types) | |
| if semi_res!=None: | |
| return semi_res | |
| else: | |
| return check_type(row, types) | |
| return None | |
| def extract_years(text): | |
| """ | |
| Извлекает сочетание числа и слова, указывающего возраст (например: '50 лет', '21 years'). | |
| """ | |
| # Регулярное выражение ищет числа и слова 'лет' или 'years' с учетом регистра | |
| match = re.search(r'\b(?<!\d)(\d{1,2})\s*(лет|years)\b', text, re.IGNORECASE) | |
| if match: | |
| # Приводим слово 'лет' или 'years' к исходному регистру | |
| return f"{match.group(1)} {match.group(2)}" | |
| return None | |
| def extract_production_year(text): | |
| """ | |
| Извлекает год производства (четырехзначное число в диапазоне 1900–2099) из строки. | |
| Например: '2019'. | |
| """ | |
| match = re.search(r'\b(19\d{2}|20\d{2})\b', text) | |
| if match: | |
| return match.group(1) | |
| return None | |
| def extract_alcohol_content(text): | |
| """ | |
| Извлекает содержание алкоголя из строки. | |
| Например: '40%'. | |
| """ | |
| match = re.search(r'(\d{1,2}(?:[.,]\d+)?\s*%)', text) | |
| if match: | |
| # Заменяем запятую на точку для единообразия (если нужно) | |
| return match.group(1).replace(' ', '').replace(',', '.') | |
| return None | |
| def is_volume(value): | |
| """ | |
| Проверяет, является ли значение валидным объемом (<= 10 литров). | |
| """ | |
| try: | |
| volume = float(value) | |
| return volume if volume <= 10 else None | |
| except ValueError: | |
| return None | |
| def extract_volume_or_number(text): | |
| """ | |
| Извлекает объем в литрах или число с плавающей точкой из строки. | |
| Например: '0,75л', '0.5', или '1,5 л'. | |
| """ | |
| # Попытка найти объем с буквой 'л' или без пробела перед ней | |
| match_with_l = re.search(r'(\d+(?:[\.,]\d+)?\s*[лЛ]|(?:\d+(?:[\.,]\d+)?[лЛ]))', text) | |
| if match_with_l: | |
| return is_volume(match_with_l.group(1).replace(',', '.').replace('л', '').replace('Л', '').strip()) | |
| # Если не найдено, ищем просто число с плавающей точкой | |
| match_number = re.search(r'(?<!№)\b(\d{1,2}(?:[\.,]\d+))\b(?!\s*(№|-er|er|\d{3,}))', text) | |
| if match_number: | |
| return is_volume(match_number.group(1).replace(',', '.')) | |
| return None | |
| def get_sour(s): | |
| """ | |
| Извлекает из строки ключевое слово, если оно присутствует как отдельное слово. | |
| Использует отрицательные просмотр назад/вперёд для проверки, что перед и после найденного | |
| ключевого слова нет буквенно-цифровых символов. | |
| Args: | |
| s (str): Исходная строка. | |
| Returns: | |
| str or None: Найденное ключевое слово, если оно присутствует как отдельное слово, иначе None. | |
| """ | |
| # Список ключевых слов | |
| keywords = [ | |
| r'brut', | |
| r'semi-sweet', | |
| r'sweet', | |
| r'брют', | |
| r'сухое', | |
| r'полусухое', | |
| r'полусладкое', | |
| r'сладкое', | |
| r'п/сух', | |
| r'п/сл', | |
| r'п/с', | |
| r'сл', | |
| r'сух' | |
| ] | |
| # Собираем шаблон с использованием негативных просмотр назад и вперёд, | |
| # чтобы убедиться, что совпадение не является частью более длинного слова. | |
| # (?<!\w) - перед совпадением не должно быть символа [a-zA-Z0-9_] | |
| # (?!\w) - после совпадения не должно быть символа [a-zA-Z0-9_] | |
| pattern = re.compile(r'(?<!\w)(?:' + '|'.join(keywords) + r')(?!\w)', re.IGNORECASE) | |
| match = pattern.search(s) | |
| if match: | |
| return match.group() | |
| else: | |
| return None | |
| def get_color(s): | |
| """ | |
| Извлекает строки, содержащие упоминания о подарочной упаковке, | |
| и возвращает их в виде словаря с индексами. | |
| Args: | |
| strings (list): Список строк. | |
| Returns: | |
| dict: Словарь, где ключи — индексы строк, а значения — строки с упоминаниями о подарочной упаковке. | |
| """ | |
| # Список ключевых слов и фраз для поиска | |
| keywords = [r'красное', | |
| r'белое', | |
| r'розовое' | |
| r'кр', | |
| r'бел', | |
| r'розе', | |
| r'rosso', | |
| r'roso', | |
| r'roseto', | |
| r'rosetto', | |
| r'red', | |
| r'white'] | |
| # Создаем шаблон регулярного выражения | |
| pattern = re.compile('|'.join(keywords), re.IGNORECASE) | |
| #gift_box_phrases={} | |
| #for idx, s in enumerate(strings): | |
| #s=str(s).lower() | |
| a=pattern.search(s) | |
| if a: | |
| return str(a.group()) | |
| else: return None | |
| def get_GB(s): | |
| """ | |
| Извлекает строки, содержащие упоминания о подарочной упаковке, | |
| и возвращает их в виде словаря с индексами. | |
| Args: | |
| strings (list): Список строк. | |
| Returns: | |
| dict: Словарь, где ключи — индексы строк, а значения — строки с упоминаниями о подарочной упаковке. | |
| """ | |
| # Список ключевых слов и фраз для поиска | |
| keywords = [r'cristal decanter in oak gift box', | |
| r'in the carton gift box with 2 glasses', | |
| r'decanter in the carton gift box', | |
| r'in the carton gift box', | |
| r'in the wooden gift box', | |
| r'in gift box in the carton', | |
| r'in gift box in carton', | |
| r'gift box in the carton', | |
| r'gift box in carton', | |
| r'in gift box in the wood', | |
| r'in gift box in wood', | |
| r'gift box in the wood', | |
| r'gift box in wood', | |
| r'gift box with 2 glasses', | |
| r'in gift box', | |
| r'gift box', | |
| r'in carton', | |
| r'in wooden case', | |
| r'in wooden box', | |
| r'in wood case' | |
| r'in wood box', | |
| r'in wood', | |
| r'хрустальный декантер в подарочной упаковке из дуба', | |
| r'декантер в подарочной упаковке из картона', | |
| r'в подарочной упаковке из картона с 2 бокалами' | |
| r'в подарочной упаковке из картона', | |
| r'в подарочной упаковке из Дуба', | |
| r'в П У графин и деревянная коробка', | |
| r'в подарочной упаковке', | |
| r'подарочная упаковка', | |
| r'подарочный набор', | |
| r'в деревянной коробке', | |
| r'деревянная коробка', | |
| r'в п/у+2 бокаланов', | |
| r'в п/у из картона', | |
| r'в п/у+бокал', | |
| r'в п/у (дер.коробке)', | |
| r'в п/у солома', | |
| r'в п/у', | |
| r'в п у', | |
| r'п/уп', | |
| r'п/у', | |
| r'в тубе', | |
| r'туба', | |
| r'ПУ'] | |
| # Создаем шаблон регулярного выражения | |
| pattern = re.compile('|'.join(keywords), re.IGNORECASE) | |
| #gift_box_phrases={} | |
| #for idx, s in enumerate(strings): | |
| #s=str(s).lower() | |
| a=pattern.search(s) | |
| if a: | |
| return str(a.group()) | |
| else: return None | |
| def remove_quotes(text): | |
| return re.sub(r'["\']', '', text) | |
| def prcess_text(origin): | |
| text=''+origin | |
| #text=str(split_russian_and_english(text)) | |
| gb=get_GB(text) | |
| if gb is not None: | |
| text=text.replace(str(gb), '') | |
| alcohol = extract_alcohol_content(text) | |
| if alcohol is not None: | |
| alco_w_comma=alcohol.replace('.', ',') | |
| text=text.replace(str(alcohol), '').replace(str(alco_w_comma), '') | |
| volume_or_number = extract_volume_or_number(text) | |
| if volume_or_number is not None: | |
| volume_with_comma=str(volume_or_number).replace('.', ',') | |
| text=text.replace(str(volume_or_number), '').replace(str(volume_with_comma), '') | |
| text=text.replace(str(volume_or_number)+' л', '').replace(str(volume_with_comma)+' л', '') | |
| # else: | |
| # volume_or_number=re_extract_volume(text) | |
| # if volume_or_number is not None: | |
| # volume_with_comma=volume_or_number.replace('.', ',') | |
| # text=text.replace(str(volume_or_number), '').replace(str(volume_with_comma), '') | |
| years = extract_years(text) | |
| if years is not None: | |
| text=text.replace(str(years), '').replace(str('выдержка'), '').replace(str('Выдержка'), '').replace(str('aging'), '') | |
| production_year = extract_production_year(text) | |
| if production_year is not None: | |
| text=text.replace(str(production_year), '') | |
| color=get_color(text) | |
| if color is not None: | |
| text=text.replace(str(color), '') | |
| sour=get_sour(text) | |
| if sour is not None: | |
| text=text.replace(str(sour), '') | |
| # re_extracted_volume=re_extract_volume(text) | |
| # if re_extracted_volume is not None: | |
| # volume_with_comma=re_extracted_volume.replace('.', ',') | |
| # text=text.replace(str(re_extracted_volume), '').replace(str(volume_with_comma), '') | |
| # else: | |
| # re_extracted_volume=re_extract_volume(str(volume_or_number)) | |
| # volume_or_number=re_extracted_volume | |
| return remove_quotes(text), alcohol, volume_or_number, years, production_year, gb, color, sour | |
| def remove_l(text): | |
| result = re.sub(r'\bл\b', '', text, flags=re.IGNORECASE) | |
| # Убираем возможные лишние пробелы, возникающие после удаления | |
| result = re.sub(r'\s{2,}', ' ', result).strip() | |
| return result | |
| def trim_name(text, words_to_remove): | |
| """ | |
| Удаляет из текста только те слова, которые полностью совпадают с элементами списка words_to_remove. | |
| :param text: Исходная строка. | |
| :param words_to_remove: Список слов, которые необходимо удалить. | |
| :return: Обновлённая строка с удалёнными словами. | |
| """ | |
| # Создаём регулярное выражение, которое ищет любое из указанных слов как отдельное слово. | |
| # Используем re.escape, чтобы экранировать спецсимволы в словах. | |
| pattern = r'\b(?:' + '|'.join(re.escape(word) for word in words_to_remove) + r')\b' | |
| #print(pattern) | |
| # Заменяем найденные полные слова на пустую строку. | |
| new_text = re.sub(pattern, '', text, flags=re.IGNORECASE) | |
| # Убираем лишние пробелы, возникающие после удаления слов. | |
| new_text = re.sub(r'\s+', ' ', new_text).strip() | |
| return new_text | |
| def name_trimmer(df): | |
| result={} | |
| gbs=[] | |
| sours=[] | |
| for idx, row in tqdm(df.iterrows()): | |
| text, alcohol, volume_or_number, years, production_year, gb, color, sour=prcess_text(str(row['name'])) | |
| text=trim_name(text, ALCO_PRODUCTS_TYPES).replace(',','').replace('.','') | |
| result[row['id']]=remove_l(text).lower().strip() | |
| gbs.append(gb) | |
| sours.append(sour) | |
| return result, gbs, sours | |
| def process_items(df, ru_types=None, ru_sour_list=None): | |
| if ru_types==None: | |
| ru_types=RU_TYPES | |
| if ru_sour_list==None: | |
| ru_sour_list=RU_SOUR_LIST | |
| dd={'id':[], 'brand':[], 'name':[], 'type':[], "type_wine":[], "volume":[], "year":[], 'alco':[]}#, 'embeddings':[]} | |
| #counter=0 | |
| for idf, i in tqdm(zip(df['id'].values, df['attrs'].values)): | |
| try: | |
| i=json.loads(i) | |
| dd['id'].append(idf) | |
| if 'brand' in i.keys(): | |
| dd['brand'].append(i['brand']) | |
| else: dd['brand'].append(None) | |
| dd['name'].append(i['name']) | |
| drink_type=get_type(i, ru_types) | |
| if drink_type is None: | |
| drink_type=check_spark(i) | |
| if drink_type is None: | |
| drink_type=check_color_and_sour(i) | |
| if drink_type is None: | |
| drink_type=check_spark(i, col_name='type_wine') | |
| if drink_type is None: | |
| drink_type=check_color_and_sour(i, types=ru_sour_list) | |
| #if 'type' in i.keys(): | |
| dd['type'].append(drink_type)#i['type']) | |
| #else: dd['type'].append(None) | |
| if 'volume' in i.keys(): | |
| dd['volume'].append(i['volume']) | |
| else: | |
| vol=extract_volume_or_number(i['name']) | |
| dd['volume'].append(vol) | |
| if 'year' in i.keys(): | |
| dd['year'].append(i['year']) | |
| else: | |
| year=extract_production_year(i['name']) | |
| dd['year'].append(year) | |
| alco=extract_alcohol_content(i['name']) | |
| if 'type_wine' in i.keys(): | |
| dd['type_wine'].append(i['type_wine']) | |
| else: dd['type_wine'].append(None) | |
| #f alco is not None: | |
| dd['alco'].append(alco) | |
| #else: dd['type_wine'].append(None) | |
| except Exception as ex: | |
| print(idf, ex) | |
| return pd.DataFrame(dd) | |
| def process_products(products): | |
| rr={'id':[], 'brand':[], 'name':[], 'type':[], "type_wine":[], "volume":[], "year":[], 'alco':[]}#, 'embeddings':[]} | |
| for idx, row in tqdm(products.iterrows()): | |
| try: | |
| rr['id'].append(row['id']) | |
| rr['brand'].append(row['brand']) | |
| rr['type_wine'].append(row['category']) | |
| rr['type'].append(row['product_type']) | |
| rr['name'].append(row['name_long']) | |
| vol=extract_volume_or_number(row['name']) | |
| rr['volume'].append(vol) | |
| #year=extract_production_year(row['name']) | |
| year=extract_production_year(str(row['name_postfix'])) | |
| rr['year'].append(year) | |
| #rr['year'].append(row['name_postfix']) | |
| alco=extract_alcohol_content(row['name']) | |
| #f alco is not None: | |
| rr['alco'].append(alco) | |
| except Exception as ex: | |
| print(ex) | |
| return pd.DataFrame(rr) | |
| def fill_brands_in_dataframe(brands, df, col_name='new_brand', is_brand=True): | |
| """ | |
| Заполняет колонку 'brand' в DataFrame найденными брендами. | |
| :param brands: Список брендов. | |
| :param df: DataFrame с колонками ['id', 'brand', 'name', ...]. | |
| :return: DataFrame с обновлённой колонкой 'brand'. | |
| """ | |
| # Инициализируем автомат для быстрого поиска брендов | |
| automaton = Automaton() | |
| # Добавляем бренды в автомат | |
| for idx, brand in enumerate(brands): | |
| if isinstance(brand, str) and brand: | |
| automaton.add_word(brand.lower(), (idx, brand)) | |
| automaton.make_automaton() | |
| def find_brand(name): | |
| """ | |
| Находит лучший бренд для данного имени. | |
| """ | |
| matched_brands = set() | |
| for _, (_, brand) in automaton.iter(name.lower()): | |
| # Проверяем, что бренд встречается как отдельное слово | |
| if re.search(rf'\b{re.escape(brand.lower())}\b', name.lower()): | |
| matched_brands.add(brand) | |
| # Возвращаем бренд с максимальной длиной (более точное совпадение) | |
| return max(matched_brands, key=len) if matched_brands else None | |
| # Обновляем колонку brand только для пустых значений | |
| # df['new_brand'] = df.apply( | |
| # lambda row: find_brand(row['name']), #if pd.isna(row['brand']) else row['brand'], | |
| # axis=1 | |
| # ) | |
| if is_brand==True: | |
| df[col_name] = df.apply(lambda row: find_brand(row['name']) or row['brand'], axis=1) | |
| else: | |
| df[col_name] = df.apply(lambda row: find_brand(row['name']) or None, axis=1) | |
| def brend_counter(u_nn, dff, th=None): | |
| res={} | |
| for i in tqdm(u_nn): | |
| lenta=len(dff[dff['new_name']==i]) | |
| if lenta>1: | |
| res[i]=lenta | |
| if th is None: | |
| th=math.sqrt(((np.array(list(res.values())).mean()+np.array(list(res.values())).std())**2)//2) | |
| other_brands=[i for i,j in res.items() if j>th] | |
| return other_brands | |
| def get_same_brands(products, items): | |
| comp_list=[] | |
| #not_comp_prods=[] | |
| #not_comp_items=[] | |
| prod_brand_list=list(products['brand'].unique()) | |
| items_brand_list=list(items['new_brand'].unique()) | |
| for i in tqdm(prod_brand_list): | |
| if i in items_brand_list: | |
| comp_list.append(i) | |
| return comp_list, prod_brand_list, items_brand_list | |
| def match_brands_improved(items_brands, prods_brands, threshold=85): | |
| """ | |
| Улучшенный алгоритм сопоставления брендов с учётом нечёткого поиска и фильтрации ошибок. | |
| :param items_brands: Список брендов из датафрейма items. | |
| :param prods_brands: Список брендов из датафрейма prods. | |
| :param threshold: Порог сходства для нечёткого поиска. | |
| :return: Словарь соответствий {бренд из items: ближайший бренд из prods}. | |
| """ | |
| brand_mapping = {} | |
| for item_brand in tqdm(items_brands): | |
| if isinstance(item_brand, str): | |
| # Разделяем бренд на части | |
| parts = [part.strip() for part in re.split(r"[\/\(\)]", item_brand) if part.strip()] | |
| best_match = None | |
| best_score = 0 | |
| for part in parts: | |
| match, score, _ = process.extractOne(part, prods_brands, scorer=fuzz.ratio) | |
| # Фильтрация по длине строк и порогу | |
| if score >= threshold and abs(len(part) - len(match)) / len(part) <= 0.3: | |
| if score > best_score: | |
| best_match = match | |
| best_score = score | |
| # Сохранение результата | |
| if best_match: | |
| brand_mapping[item_brand] = best_match#, best_score) | |
| return brand_mapping | |
| def normalize(text): | |
| """ | |
| Приводит текст к нижнему регистру и транслитерирует его в латиницу. | |
| """ | |
| return unidecode(text.lower()) | |
| def build_regex_for_brands(brands): | |
| """ | |
| Нормализует бренды и создаёт одно регулярное выражение для точного поиска. | |
| Возвращает скомпилированный паттерн и словарь: нормализованное название -> оригинальное название. | |
| """ | |
| norm_to_brand = {} | |
| for brand in brands: | |
| norm_brand = normalize(brand) | |
| if norm_brand not in norm_to_brand: | |
| norm_to_brand[norm_brand] = brand | |
| pattern = re.compile(r'\b(?:' + '|'.join(re.escape(nb) for nb in norm_to_brand.keys()) + r')\b') | |
| return pattern, norm_to_brand | |
| def process_string(s, regex_pattern, norm_to_brand, norm_brand_list, index_to_brand, threshold): | |
| """ | |
| Обрабатывает одну строку: | |
| 1. Пытается найти бренд через регулярное выражение. | |
| 2. Если точного совпадения нет – разбивает строку и выполняет нечёткий поиск. | |
| Возвращает кортеж: (исходная строка, найденный бренд или None). | |
| """ | |
| norm_s = normalize(s) | |
| # Пытаемся найти бренд через регулярное выражение | |
| match = regex_pattern.search(norm_s) | |
| if match: | |
| return s, norm_to_brand[match.group(0)] | |
| # Если точного совпадения нет, разбиваем строку по разделителям и анализируем части | |
| parts = [part.strip() for part in re.split(r"[\/\(\)]", s) if part.strip()] | |
| parts.append(s) # анализ всей строки | |
| best_match = None | |
| best_score = 0 | |
| for part in parts: | |
| norm_part = normalize(part) | |
| res = process.extractOne(norm_part, norm_brand_list, scorer=fuzz.ratio, score_cutoff=threshold) | |
| if res is not None: | |
| match_norm, score, idx = res | |
| if score > best_score: | |
| best_match = index_to_brand[idx] | |
| best_score = score | |
| if best_score == 100: | |
| break | |
| if best_match: | |
| return s, best_match | |
| return s, None | |
| def check_brands_in_strings_pqdm(strings, brands, threshold=85, n_jobs=8): | |
| """ | |
| Поиск брендов в строках с учетом вариантов написания и транслитерации. | |
| Использует предварительный поиск через регулярное выражение и, при необходимости, | |
| нечёткий поиск. Обработка выполняется параллельно с отображением прогресса с помощью pqdm. | |
| :param strings: Список строк для поиска брендов. | |
| :param brands: Список брендов для поиска. | |
| :param threshold: Порог сходства для нечёткого поиска. | |
| :param n_jobs: Число рабочих потоков (или процессов, если использовать pqdm.processes). | |
| :return: Словарь вида {строка: найденный бренд}. | |
| """ | |
| # Подготавливаем список нормализованных брендов и сопоставление индексов с оригинальными брендами. | |
| norm_brand_list = [] | |
| index_to_brand = [] | |
| for brand in brands: | |
| norm_brand = normalize(brand) | |
| norm_brand_list.append(norm_brand) | |
| index_to_brand.append(brand) | |
| # Создаем комбинированный паттерн для точного поиска. | |
| regex_pattern, norm_to_brand = build_regex_for_brands(brands) | |
| # Определяем вспомогательную функцию, закрывающую необходимые параметры. | |
| def process_string_wrapper(s): | |
| return process_string(s, regex_pattern, norm_to_brand, norm_brand_list, index_to_brand, threshold) | |
| # Обрабатываем строки параллельно с отображением прогресса. | |
| results = pqdm(strings, process_string_wrapper, n_jobs=n_jobs) | |
| brand_mapping = {} | |
| for s, matched_brand in results: | |
| if matched_brand: | |
| brand_mapping[s] = matched_brand | |
| return brand_mapping | |
| def clean_wine_name(name): | |
| """ | |
| Удаляет в конце строки отдельно стоящие буквы (однобуквенные слова), не входящие в состав других слов. | |
| Например, "токай л" превратится в "токай". | |
| """ | |
| # Регулярное выражение ищет: | |
| # \s+ – один или несколько пробельных символов; | |
| # \b – граница слова; | |
| # [A-Za-zА-ЯЁа-яё] – ровно одна буква (латинская или кириллическая); | |
| # \b – граница слова; | |
| # \s*$ – любые пробелы до конца строки. | |
| return re.sub(r'\s+\b[A-Za-zА-ЯЁа-яё]\b\s*$', '', name) | |
| def most_common_words(strings, top_n=None): | |
| """ | |
| Возвращает список наиболее часто повторяющихся слов из списка строк. | |
| Параметры: | |
| - strings: список строк | |
| - top_n: количество наиболее часто встречающихся слов, которые необходимо вернуть. | |
| Если None, возвращаются все слова, отсортированные по частоте. | |
| Возвращает: | |
| - Список кортежей (слово, частота) | |
| """ | |
| all_words = [] | |
| for s in tqdm(strings): | |
| s=str(s) | |
| # Извлекаем слова, приводим их к нижнему регистру и удаляем пунктуацию | |
| words = re.findall(r'\w+', s.lower()) | |
| all_words.extend(words) | |
| counter = Counter(all_words) | |
| return counter.most_common(top_n) | |
| def top_inserts_matching(other_brands, p_brands, items, th=65): | |
| replaced={} | |
| for i in other_brands: | |
| l=i.split('/') | |
| if len(l)>2: | |
| replaced[l[0].replace('Шато','')]=i | |
| else: | |
| if 'Шато' in i: | |
| replaced[i.replace('Шато','')]=i | |
| ob=[i.split('/')[0].replace('Шато','') for i in other_brands] | |
| rr60_ob=check_brands_in_strings_pqdm(ob, p_brands, threshold=th) | |
| result={} | |
| for k in rr60_ob.keys(): | |
| if k in replaced.keys(): | |
| result[replaced[k]]=rr60_ob[k] | |
| else: | |
| result[k]=rr60_ob[k] | |
| items.loc[items['new_name'].isin(result.keys()), 'new_brand'] = items['new_name'].map(result) | |
| def process_unbrended_names(items, p_brands, types, grape_varieties, onther_words): | |
| result={} | |
| for n in tqdm(items[items['new_brand'].isna()]['name'].values): | |
| name, alcohol, volume_or_number, years, production_year, gb, color, sour=prcess_text(n) | |
| #name, alcohol, volume_or_number, years, production_year, gb, color, sour=prcess_text('Вино Токай Фурминт п/сл. бел.0.75л') | |
| name=trim_name(name, types) | |
| name=trim_name(name, grape_varieties) | |
| name=trim_name(name, onther_words) | |
| name=name.replace('.','').replace(',','').replace('(','').replace(')','') | |
| #result.append(clean_wine_name(name).strip()) | |
| result[n]=clean_wine_name(name).strip() | |
| items['new_name']=None | |
| items.loc[items['name'].isin(result.keys()), 'new_name'] = items['name'].map(result) | |
| u_nn=list(items[~items['new_name'].isna()]['new_name'].unique()) | |
| res={} | |
| for i in tqdm(u_nn): | |
| lenta=len(items[items['new_name']==i]) | |
| if lenta>1: | |
| res[i]=lenta | |
| th=math.sqrt(((np.array(list(res.values())).mean()+np.array(list(res.values())).std())**2)//2) | |
| other_brands=[i for i,j in res.items() if j>th] | |
| reess=check_brands_in_strings_pqdm(other_brands, p_brands) | |
| items.loc[items['new_name'].isin(reess.keys()), 'new_brand'] = items['new_name'].map(reess) | |
| top_inserts_matching(other_brands, p_brands, items) | |
| def find_full_word(text, word_list): | |
| """ | |
| Ищет первое полное вхождение слова из word_list в строке text. | |
| Возвращает найденное слово или None, если совпадение не найдено. | |
| """ | |
| for word in word_list: | |
| pattern = r'\b' + re.escape(word) + r'\b' | |
| if re.search(pattern, text, re.IGNORECASE): | |
| return word | |
| return None | |
| def merge_wine_type(items, colors=None, color_merge_dict=None): | |
| if colors==None: | |
| colors=COLORS | |
| if color_merge_dict==None: | |
| color_merge_dict=COLOR_MERGE_DICT | |
| result=[] | |
| for row in tqdm(items.iterrows()): | |
| try: | |
| if row[1]['type_wine'] is not None: | |
| color=find_full_word(row[1]['type_wine'], colors) | |
| if color is not None: | |
| result.append(color) | |
| else: | |
| color=find_full_word(row[1]['name'], colors) | |
| if color is not None: | |
| result.append(color) | |
| else: | |
| result.append(None) | |
| else: | |
| color=find_full_word(row[1]['name'], colors) | |
| if color is not None: | |
| result.append(color) | |
| else: | |
| result.append(None) | |
| except Exception as ex: | |
| print(ex) | |
| result.append(None) | |
| items['new_type_wine']=result | |
| items['new_type_wine']=items['new_type_wine'].replace(color_merge_dict) | |
| def merge_types(items, products): | |
| alco_types=[i.strip().lower() for i in products['type'].unique()] | |
| alco_types.append('ликёр') | |
| result=[] | |
| for row in tqdm(items.iterrows()): | |
| try: | |
| type_in_name=find_full_word(row[1]['name'], alco_types) | |
| if type_in_name is not None: | |
| result.append(type_in_name) | |
| continue | |
| if row[1]['type'] is not None: | |
| type_in_type=find_full_word(row[1]['type'], alco_types) | |
| if type_in_type is not None: | |
| result.append(type_in_type) | |
| else: | |
| result.append(row[1]['type']) | |
| else: | |
| result.append(None) | |
| except Exception as ex: | |
| print(ex) | |
| result.append(None) | |
| items['new_type']=result | |
| items['new_type']=items['new_type'].replace({'ликёр': 'ликер', None: 'unmatched'}) | |
| def normalize_name(name): | |
| """ | |
| Нормализует строку: если обнаруживается русский язык, транслитерирует её в латиницу, | |
| приводит к нижнему регистру. | |
| """ | |
| try: | |
| if detect_language(name) == 'ru': | |
| return translit(name, 'ru', reversed=True).lower() | |
| except Exception: | |
| pass | |
| return name.lower() | |
| def prepare_groups_with_ids(items_df): | |
| """ | |
| Предварительная группировка данных из items по (new_brand, type, volume, new_type_wine, sour) | |
| с учетом нормализованного названия. | |
| Добавляем столбец 'norm_name', чтобы нормализовать значение name один раз заранее. | |
| :param items_df: DataFrame с колонками 'new_brand', 'type', 'name', 'id', 'volume', 'new_type_wine', 'sour'. | |
| :return: Словарь {(new_brand, type, volume, new_type_wine, sour): [(id, name, norm_name, volume, new_type_wine, sour)]}. | |
| """ | |
| items_df = items_df.copy() | |
| items_df['norm_name'] = items_df['name'].apply(normalize_name) | |
| grouped = items_df.groupby(['new_brand', 'type', 'volume', 'new_type_wine', 'sour']).apply( | |
| lambda x: list(zip(x['id'], x['name'], x['norm_name'], x['volume'], x['new_type_wine'], x['sour'])) | |
| ).to_dict() | |
| return grouped | |
| def prepare_groups_by_alternative_keys(items_df): | |
| """ | |
| Группировка данных из items по (new_type_wine, new_type, volume, sour) с сохранением id, new_brand, | |
| оригинального и нормализованного имени. | |
| :param items_df: DataFrame с колонками 'new_brand', 'new_type_wine', 'new_type', 'volume', 'name', 'id', 'sour'. | |
| :return: Словарь {(new_type_wine, new_type, volume, sour): [(id, new_brand, name, norm_name, volume, new_type_wine, sour)]}. | |
| """ | |
| items_df = items_df.copy() | |
| items_df['norm_name'] = items_df['name'].apply(normalize_name) | |
| grouped = items_df.groupby(['new_type_wine', 'new_type', 'volume', 'sour']).apply( | |
| lambda x: list(zip(x['id'], x['new_brand'], x['name'], x['norm_name'], x['volume'], x['new_type_wine'], x['sour'])) | |
| ).to_dict() | |
| return grouped | |
| def new_find_matches_with_ids(products_df, items_groups, items_df, name_threshold=85): | |
| """ | |
| Поиск совпадений с сохранением id найденных итемов, используя заранее подготовленные | |
| нормализованные группы. | |
| Производится два прохода: | |
| - Первый: поиск по группам (brand, type, volume, new_type_wine, sour); | |
| - Второй: для продуктов без совпадения ищем по альтернативным группам (new_type_wine, new_type, volume, sour), | |
| исключая итемы с исходным брендом. | |
| Сравнение производится по столбцу norm_name, а для вывода используется оригинальное name. | |
| :param products_df: DataFrame с колонками 'id', 'brand', 'type', 'name', 'volume', 'new_type_wine', 'sour', 'new_type'. | |
| :param items_groups: Словарь, сформированный функцией prepare_groups_with_ids. | |
| :param items_df: DataFrame итемов с колонками 'id', 'new_brand', 'new_type_wine', 'new_type', 'volume', 'name', 'sour'. | |
| :param name_threshold: Порог сходства для fuzzy matching. | |
| :return: DataFrame с добавленными столбцами 'matched_items' (список совпадений) и 'alternative' (альтернативные совпадения). | |
| """ | |
| results = [] | |
| no_match_products = [] # Список для хранения продуктов без совпадения в исходной группе | |
| # Первый проход: поиск по группам (brand, type, volume, new_type_wine, sour) | |
| for idx, product in tqdm(products_df.iterrows(), total=len(products_df)): | |
| product_brand = product['brand'] | |
| product_type = product['type'] | |
| product_name = product['name'] | |
| product_volume = product['volume'] | |
| product_type_wine = product['new_type_wine'] | |
| product_sour = product['sour'] | |
| key = (product_brand, product_type, product_volume, product_type_wine, product_sour) | |
| items_data = items_groups.get(key, []) | |
| if items_data: | |
| # Распаковываем: id, оригинальное имя, нормализованное имя, volume, new_type_wine, sour | |
| items_ids, items_names, items_norm_names, items_volumes, item_type_wine, items_sour = zip(*items_data) | |
| else: | |
| items_ids, items_names, items_norm_names, items_volumes, item_type_wine, items_sour = ([], [], [], [], [], []) | |
| norm_product_name = normalize_name(product_name) | |
| matches = process.extract( | |
| norm_product_name, list(items_norm_names), scorer=fuzz.ratio, score_cutoff=name_threshold | |
| ) | |
| matched_items = [ | |
| { | |
| 'item_id': items_ids[idx_candidate], | |
| 'item_name': items_names[idx_candidate], | |
| 'score': score, | |
| 'volume': items_volumes[idx_candidate], | |
| 'color': item_type_wine[idx_candidate], | |
| 'sour': items_sour[idx_candidate] | |
| } | |
| for match, score, idx_candidate in matches | |
| ] | |
| if not matched_items: | |
| no_match_products.append((idx, product)) | |
| results.append({ | |
| 'product_id': product['id'], | |
| 'matched_items': matched_items, | |
| 'alternative': [] # Заполняется во втором проходе | |
| }) | |
| # Подготовка альтернативной группировки по (new_type_wine, new_type, volume, sour) | |
| groups_by_alternative_keys = prepare_groups_by_alternative_keys(items_df) | |
| # Второй проход: для продуктов без совпадений ищем по альтернативным группам | |
| for idx, product in tqdm(no_match_products): | |
| product_brand = product['brand'] | |
| product_type_wine = product['new_type_wine'] | |
| product_type = product['new_type'] | |
| product_volume = product['volume'] | |
| product_name = product['name'] | |
| product_sour = product['sour'] | |
| alt_key = (product_type_wine, product_type, product_volume, product_sour) | |
| type_items = groups_by_alternative_keys.get(alt_key, []) | |
| # Фильтруем, исключая итемы с исходным брендом | |
| filtered_items = [item for item in type_items if item[1] != product_brand] | |
| if filtered_items: | |
| alt_ids, alt_brands, alt_names, alt_norm_names, alt_volumes, alt_type_wine, alt_sour = zip(*filtered_items) | |
| else: | |
| alt_ids, alt_brands, alt_names, alt_norm_names, alt_volumes, alt_type_wine, alt_sour = ([], [], [], [], [], [], []) | |
| norm_product_name = normalize_name(product_name) | |
| alt_matches = process.extract( | |
| norm_product_name, list(alt_norm_names), scorer=fuzz.ratio, score_cutoff=name_threshold | |
| ) | |
| alt_matched_items = [ | |
| { | |
| 'item_id': alt_ids[idx_candidate], | |
| 'item_name': alt_names[idx_candidate], | |
| 'score': score, | |
| 'volume': alt_volumes[idx_candidate], | |
| 'color': alt_type_wine[idx_candidate], | |
| 'sour': alt_sour[idx_candidate] | |
| } | |
| for match, score, idx_candidate in alt_matches | |
| ] | |
| results[idx]['alternative'] = alt_matched_items | |
| results_df = pd.DataFrame(results) | |
| merged_df = products_df.merge(results_df, left_on='id', right_on='product_id').drop(columns=['product_id']) | |
| return merged_df | |
| def contains_full_word(word, text, case_sensitive=True): | |
| """ | |
| Проверяет, содержится ли слово word в строке text как отдельное слово. | |
| Параметр case_sensitive задаёт, учитывать ли регистр. | |
| """ | |
| flags = 0 if case_sensitive else re.IGNORECASE | |
| pattern = r'\b' + re.escape(word) + r'\b' | |
| return re.search(pattern, text, flags) is not None | |
| def unwrap_brands(products): | |
| res={} | |
| #brands=items['brand'].unique() | |
| new_brands=sorted([x for x in products['brand'].unique() if isinstance(x, str)], key=len) | |
| #items['new_brand'].unique() if isinstance(x, str)], key=len) | |
| for i in tqdm(new_brands): | |
| for j in new_brands: | |
| if contains_full_word(i, j, case_sensitive=False): | |
| if i != j: | |
| #if len(i)>1:#i != 'А' and i != "Я": | |
| res[j]=i | |
| return res | |
| def split_n_match(products, items, th_len=3): | |
| result={} | |
| conditionally_spited=[] | |
| for i in tqdm(items['brand'].unique()): | |
| if '/' in i: | |
| conditionally_spited.append(i) | |
| for i in tqdm(products['brand'].unique()): | |
| for j in conditionally_spited: | |
| if len(i)>th_len and contains_full_word(i,j): | |
| result[j]=i | |
| return result | |
| def new_run(products, items, types=None, th=50): | |
| if types==None: | |
| types=TYPES_WINES | |
| print('------*-----Prepare items catalogue-----*-----') | |
| items=process_items(items.copy()) | |
| print('-----*-----Prepare products catalogue-----*-----') | |
| products=process_products(products.copy()) | |
| items['brand']=items['brand'].apply(lambda x: str(x).strip().lower()) | |
| products['brand']=products['brand'].apply(lambda x: str(x).strip().lower()) | |
| print('-----*-----Split n match-----*-----') | |
| splited=split_n_match(products, items) | |
| items["brand"] = items["brand"].replace(splited) | |
| print('-----*-----Fill brands in items-----*-----') | |
| fill_brands_in_dataframe(products['brand'].unique(), items) | |
| print('-----*-----Brand matching-----*-----') | |
| comp_list, prod_brand_list, items_brand_list=get_same_brands(products, items) | |
| out_prods=list(set(prod_brand_list)-set(comp_list)) | |
| out_items=list(set(items_brand_list)-set(comp_list)) | |
| brand_map_improved=match_brands_improved(out_items, list(products['brand'].unique())) | |
| items["new_brand"] = items["new_brand"].replace(brand_map_improved) | |
| items['type']=items['type'].replace(types) | |
| print('-----*-----Unwrap brend cats step 1-----*-----') | |
| unwrap_b_match=unwrap_brands(products) | |
| items["new_brand"] = items["new_brand"].replace(unwrap_b_match) | |
| products["brand"] = products["brand"].replace(unwrap_b_match) | |
| print('-----*-----Unwrap brend cats step 2-----*-----') | |
| unwrap_b_match=unwrap_brands(products) | |
| items["new_brand"] = items["new_brand"].replace(unwrap_b_match) | |
| products["brand"] = products["brand"].replace(unwrap_b_match) | |
| # print('-----*-----Split n match-----*-----') | |
| # splited=split_n_match(products, items) | |
| # items["new_brand"] = items["new_brand"].replace(splited) | |
| #fill_brands_in_dataframe(splited.values(), items) | |
| print('-----*-----Finding brands in names-----*-----') | |
| items['new_brand']=items['new_brand'].replace('none', None) | |
| i_brands=items[items['new_brand'].isna()]['name'].values | |
| p_brands=[i for i in products['brand'].unique() if i is not None and len(i)>3] | |
| rr=check_brands_in_strings_pqdm(i_brands, p_brands) | |
| items.loc[items['name'].isin(rr.keys()), 'new_brand'] = items['name'].map(rr) | |
| print('-----*-----Top inserts-----*-----') | |
| process_unbrended_names(items, p_brands, TYPES, GRAPES, OTHER_WORDS) | |
| print('-----*-----Adding service categories-----*-----') | |
| merge_wine_type(items, colors=COLORS) | |
| merge_types(items, products) | |
| merge_wine_type(products, colors=COLORS) | |
| merge_types(products, products) | |
| print('-----*-----Name trimming-----*-----') | |
| item_timed_names, gb, sour=name_trimmer(items) | |
| #items['name']=items['id'].replace(item_timed_names) | |
| items.loc[items['id'].isin(item_timed_names.keys()), 'name'] = items['id'].map(item_timed_names) | |
| items['gb']=gb | |
| items['sour']=sour | |
| items['sour']=items['sour'].replace(SOUR_MERGE_DICT) | |
| priducts_trimed_names, gb, sour=name_trimmer(products) | |
| products.loc[products['id'].isin(priducts_trimed_names.keys()), 'name'] = products['id'].map(priducts_trimed_names) | |
| products['gb']=gb | |
| products['sour']=sour | |
| products['sour']=products['sour'].replace(SOUR_MERGE_DICT) | |
| print('-----*-----Replacing product types-----*-----') | |
| products['type']=products['type'].replace(types) | |
| print('-----*-----Matching-----*-----') | |
| items_groups = prepare_groups_with_ids(items) | |
| res=new_find_matches_with_ids(products, items_groups, items, name_threshold=th) | |
| return res.drop(['type','type_wine','year','alco','gb'], axis=1), items, products | |