Spaces:
Build error
Build error
| import re | |
| from transliterate import translit, detect_language | |
| from constants.constants import * | |
| from tqdm import tqdm | |
def get_delimiter(file_path):
    """
    Guess the column delimiter of a CSV file from its first line.

    Candidates are tried in a fixed order (comma, semicolon, tab, pipe) and the
    first one that occurs in the first line is returned.

    :param file_path: Path to a UTF-8 encoded text file.
    :return: One of ',', ';', '\\t' or '|'.
    :raises ValueError: If the first line contains none of the candidates.
    """
    with open(file_path, 'r', encoding="utf-8") as source:
        first_line = source.readline()
    for candidate in (',', ';', '\t', '|'):
        if candidate in first_line:
            return candidate
    raise ValueError(None, "Error parsing CSV file. Cannot detect delimiter")
def verify_csv(csv_file, fixed_csv=None):
    """
    Report the data lines of a CSV file that contain an odd number of double quotes.

    The first line (the header) is copied as-is and never checked. Lines are
    examined one physical line at a time.

    :param csv_file: Path to the UTF-8 encoded CSV file to check.
    :param fixed_csv: Optional output path. When given, a copy of the input is
        written there in which the first double quote of every offending line
        is doubled.
    :return: List of strings of the form "(<n>): <line>", where <n> counts data
        lines starting at 1 (the header line is not counted).
    """
    bad_lines = []
    writer = open(fixed_csv, "w", encoding="utf-8") if fixed_csv else None
    try:
        with open(csv_file, "r", encoding="utf-8") as reader:
            header = reader.readline()
            if writer:
                writer.write(header)
            for lnnum, ln in enumerate(reader, start=1):
                if ln.count('"') % 2 == 0:
                    fixed = ln
                else:
                    bad_lines.append("(" + str(lnnum) + "): " + ln)
                    fixed = ln.replace('"', '""', 1)
                if writer:
                    writer.write(fixed)
    finally:
        # Close the output even when opening or reading the input fails.
        if writer:
            writer.close()
    return bad_lines
def normalize_name(name):
    """
    Lower-case a string, transliterating it to Latin script first when it is
    detected as Russian.

    Non-string values are returned unchanged. Any error raised while detecting
    the language or transliterating is ignored, and the plain lower-cased input
    is returned instead.
    """
    if not isinstance(name, str):
        return name
    try:
        is_russian = detect_language(name) == 'ru'
        if is_russian:
            return translit(name, 'ru', reversed=True).lower()
    except Exception:
        pass
    return name.lower()
def preprocess_product_brand(name):
    """
    Lower-case a brand string and strip inter-word separators from it.

    Non-string values are converted with ``str()`` and returned without any
    further processing.
    """
    if isinstance(name, str):
        return replace_interword_separators(name.lower())
    return str(name)
def preprocess_product_name(name):
    """
    Return the lower-cased product name; non-string values are converted with
    ``str()`` and returned as they are.
    """
    return name.lower() if isinstance(name, str) else str(name)
def replace_interword_separators(name):
    """
    Delete every apostrophe, backtick, right single quotation mark and hyphen
    from ``name``, joining the parts they separated.
    """
    for separator in ("'", "`", "’", '-'):
        name = name.replace(separator, "")
    return name
def clean_text(text):
    """
    Strip quotes and leftover punctuation from ``text`` and normalise its
    whitespace.

    :param text: String to clean.
    :return: Cleaned string without leading or trailing whitespace.
    """
    cleaned = remove_quotes(text)
    # Drop a stand-alone asterisk or semicolon, such as the one left over after
    # a volume written like "0.75*".
    cleaned = re.sub(r'(\s|^)[\*;](\s|$)', ' ', cleaned)
    # Drop empty parentheses "()".
    cleaned = re.sub(r'\(\s*\)', ' ', cleaned)
    # Collapse two slashes (optionally separated by whitespace) into one.
    cleaned = re.sub(r'/\s*/', '/', cleaned).strip()
    # Replace a slash surrounded by whitespace with a single space.
    cleaned = re.sub(r'\s+/\s+', ' ', cleaned).strip()
    # Collapse runs of whitespace.
    return re.sub(r'\s+', ' ', cleaned).strip()
def normalize_and_clean_brand(brand):
    """
    Run a brand through ``normalize_name``, ``replace_interword_separators``
    and ``clean_text``, in that order.

    Falsy values (``None``, empty string) are returned unchanged.
    """
    if brand:
        for step in (normalize_name, replace_interword_separators, clean_text):
            brand = step(brand)
    return brand
def normalize_terms_and_attributes(name):
    """
    Replace known term/attribute variations in ``name`` with the lower-cased
    key they are listed under in ``TERMS_AND_ATTRIBUTES_VARIATIONS``.

    For each key, the first variation found as a whole word is looked up with
    ``find_full_word``; the matched text is then substituted with
    ``str.replace``, i.e. everywhere that exact text occurs in ``name``.
    Falsy values are returned unchanged.
    """
    if not name:
        return name
    for term, variations in TERMS_AND_ATTRIBUTES_VARIATIONS.items():
        matched = find_full_word(name, variations)
        if matched:
            name = name.replace(matched, term.lower())
    return name
def normalize_and_clean_name(name):
    """
    Run a product name through the full normalisation pipeline:
    ``normalize_name``, ``normalize_terms_and_attributes``,
    ``replace_interword_separators``, ``replace_brands_and_names_alternatives``
    and ``clean_text``, in that order.

    Falsy values (``None``, empty string) are returned unchanged.
    """
    if name:
        pipeline = (
            normalize_name,
            normalize_terms_and_attributes,
            replace_interword_separators,
            replace_brands_and_names_alternatives,
            clean_text,
        )
        for step in pipeline:
            name = step(name)
    return name
def replace_brands_and_names_alternatives(name):
    """
    Replace alternative spellings in ``name`` with the lower-cased key they are
    listed under in ``BRANDS_AND_NAMES_ALTERNATIVES_DICT``.

    For each key, the first alternative found as a whole word is looked up with
    ``find_full_word``; the matched text is then substituted with
    ``str.replace``, i.e. everywhere that exact text occurs in ``name``.
    Falsy values are returned unchanged.
    """
    if not name:
        return name
    for canonical, alternatives in BRANDS_AND_NAMES_ALTERNATIVES_DICT.items():
        matched = find_full_word(name, alternatives)
        if matched:
            name = name.replace(matched, canonical.lower())
    return name
def remove_quotes(text):
    """
    Replace every double quote, apostrophe and guillemet («, ») in ``text``
    with a single space.
    """
    quote_chars = re.compile(r'["\'«»]')
    return quote_chars.sub(' ', text)
def clean_wine_name(name):
    """
    Remove a stand-alone single letter (a one-letter word) from the end of the
    string, together with the whitespace around it.

    For example, "токай л" becomes "токай".
    """
    # Pattern parts:
    #   \s+      one or more whitespace characters;
    #   \b       word boundary;
    #   [...]    exactly one character from the class (Latin or Cyrillic letter);
    #   \b       word boundary;
    #   \s*$     optional whitespace up to the end of the string.
    trailing_letter = re.compile(r'\s+\b[A-Za-zА-ЯЁа-яё\-]\b\s*$')
    return trailing_letter.sub('', name)
def find_full_word(text, word_list):
    """
    Find the first entry of ``word_list`` that occurs in ``text`` as a whole
    word (case-insensitive).

    Entries are tried in list order. An occurrence counts as a whole word when
    it is neither preceded nor followed by a word character. Lookarounds are
    used instead of ``\\b`` so that entries which begin or end with punctuation
    (e.g. "v.s.o.p.") can match as well. Empty entries are skipped.

    :param text: String to search in.
    :param word_list: Iterable of candidate words.
    :return: The matched text exactly as it appears in ``text``, or None when
        nothing matches.
    """
    for word in word_list:
        if not word:
            # An empty pattern would "match" with '', which is not a real hit.
            continue
        pattern = r'(?<!\w)' + re.escape(word) + r'(?!\w)'
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(0)
    return None
def find_word(text, word_list):
    """
    Return the first entry of ``word_list`` that is contained in ``text``
    (plain, case-sensitive containment test), or None when there is none.
    """
    return next((candidate for candidate in word_list if candidate in text), None)
def remove_full_words(text, word_list):
    """
    Replace every whole-word occurrence (case-insensitive) of each entry of
    ``word_list`` in ``text`` with a single space.

    An occurrence counts as a whole word when it is neither preceded nor
    followed by a word character. Lookarounds are used instead of ``\\b`` so
    that entries which begin or end with punctuation (e.g. "v.s.o.p.") are
    removed as well. Empty entries are skipped.

    :param text: String to process.
    :param word_list: Iterable of words to remove.
    :return: The processed string; whitespace is not normalised.
    """
    for word in word_list:
        if word:
            pattern = r'(?<!\w)' + re.escape(word) + r'(?!\w)'
            text = re.sub(pattern, ' ', text, flags=re.IGNORECASE)
    return text
def merge_wine_type(items, colors=None, color_merge_dict=None):
    """
    Add a ``new_type_wine`` column to ``items`` holding the colour found for
    each row.

    The colour is looked up with ``find_full_word`` first in the row's
    ``type_wine`` value and, when that is not a string or contains no colour,
    in the row's ``name``. Rows without a match, or rows whose processing
    raises, get None. The column is finally passed through
    ``Series.replace(color_merge_dict)`` when a mapping is given.

    :param items: DataFrame with ``type_wine`` and ``name`` columns; modified
        in place.
    :param colors: Iterable of colour words to search for; None is treated as
        an empty collection.
    :param color_merge_dict: Optional mapping passed to ``Series.replace``.
    :return: None.
    """
    if colors is None:
        colors = ()
    result = []
    for _, row in tqdm(items.iterrows()):
        try:
            color = None
            type_wine = row['type_wine']
            # Missing values may arrive as NaN (a float) rather than None; only
            # strings can be searched, everything else falls back to the name.
            if isinstance(type_wine, str):
                color = find_full_word(type_wine, colors)
            if color is None:
                color = find_full_word(row['name'], colors)
            result.append(color)
        except Exception as ex:
            print("Error in merge_wine_type: " + str(ex))
            result.append(None)
    items['new_type_wine'] = result
    # Series.replace cannot be called with None as its only argument.
    if color_merge_dict:
        items['new_type_wine'] = items['new_type_wine'].replace(color_merge_dict)
def merge_types(items, products, type_merge_dict={}, sub_alco_types=["Бренди", "Шампань", "Шампанское"], product_types = None):
    """
    Add a ``new_type`` column to ``items`` holding the alcohol type resolved
    for each row.

    Resolution order per row:
      1. a ``sub_alco_types`` entry found as a whole word in the row's name;
      2. a known type found as a whole word in the row's name;
      3. a known type found as a whole word in the row's ``type`` value;
      4. the row's ``type`` value itself when it is a string;
      5. None.
    Rows whose processing raises get None. The column is finally passed
    through ``Series.replace(type_merge_dict)`` when the mapping is non-empty.

    :param items: DataFrame with ``name`` and ``type`` columns; modified in place.
    :param products: DataFrame whose ``type`` column supplies the known types
        when ``product_types`` is not given.
    :param type_merge_dict: Mapping passed to ``Series.replace``. The default
        object is shared between calls but is never mutated here.
    :param sub_alco_types: Types that take priority when they appear in the
        name next to a "parent" type, e.g. "Бренди де Херес". A falsy value
        disables this step. The default list is never mutated here.
    :param product_types: Optional explicit collection of known types.
    :return: None.
    """
    pt = product_types if product_types else products['type'].unique()
    # unique() can yield NaN for missing values; only strings can be normalised.
    alco_types = [i.strip().lower() for i in pt if isinstance(i, str)]
    alco_types.append('ликёр')
    # Longest first: find_full_word returns the first hit in list order, so
    # longer type names take precedence over shorter ones.
    alco_types = sorted(alco_types, reverse=True, key=len)
    result = []
    for _, row in tqdm(items.iterrows()):
        try:
            name = row['name']
            # A non-string name (e.g. NaN) cannot be searched; fall through to
            # the type column instead of giving up on the row.
            if isinstance(name, str):
                found = None
                if sub_alco_types:
                    found = find_full_word(name, sub_alco_types)
                if found is None:
                    found = find_full_word(name, alco_types)
                if found is not None:
                    result.append(found)
                    continue
            row_type = row['type']
            if isinstance(row_type, str):
                found = find_full_word(row_type, alco_types)
                result.append(found if found is not None else row_type)
            else:
                result.append(None)
        except Exception as ex:
            print(ex)
            result.append(None)
    items['new_type'] = result
    if type_merge_dict:
        items['new_type'] = items['new_type'].replace(type_merge_dict)
def trim_name(text, words_to_remove):
    """
    Remove from ``text`` only those words that fully match an entry of
    ``words_to_remove`` (case-insensitive), then normalise whitespace.

    An occurrence counts as a whole word when it is neither preceded nor
    followed by a word character. Lookarounds are used instead of ``\\b`` so
    that entries which begin or end with punctuation (e.g. "v.s.o.p.") are
    removed as well. Empty entries are ignored.

    :param text: Source string.
    :param words_to_remove: Iterable of words to delete.
    :return: The string with the words removed, whitespace runs collapsed to a
        single space and leading/trailing whitespace stripped.
    """
    words = [word for word in words_to_remove if word]
    if words:
        # One alternation over all words; re.escape neutralises any regex
        # metacharacters they contain.
        pattern = r'(?<!\w)(?:' + '|'.join(re.escape(word) for word in words) + r')(?!\w)'
        text = re.sub(pattern, '', text, flags=re.IGNORECASE)
    # Collapse the gaps left behind by the removed words.
    return re.sub(r'\s+', ' ', text).strip()
| '''def is_string(val): | |
| if not val: | |
| return False | |
| elif isinstance(val, str): | |
| return True | |
| return False | |
| ''' | |
def detect_language_simple(text):
    """
    Classify ``text`` by its first alphabetic character.

    :param text: String to inspect.
    :return: 'ru' when the first Cyrillic-or-Latin letter is a Russian letter
        (А-я, Ё, ё), 'en' when it is an ASCII Latin letter, and 'xx' when the
        text contains neither.
    """
    for ch in text:
        # 'А'..'я' is one contiguous code-point range; 'Ё' and 'ё' lie outside
        # it and have to be tested separately.
        if 'А' <= ch <= 'я' or ch in 'Ёё':
            return 'ru'
        if 'A' <= ch <= 'Z' or 'a' <= ch <= 'z':
            return 'en'
    return 'xx'