import re from transliterate import translit, detect_language from constants.constants import * from tqdm import tqdm def get_delimiter(file_path): with open(file_path, 'r', encoding="utf-8") as f: ln = f.readline() if ',' in ln: return ',' if ';' in ln: return ';' if '\t' in ln: return '\t' if '|' in ln: return '|' raise ValueError(None, "Error parsing CSV file. Cannot detect delimiter") def verify_csv(csv_file, fixed_csv=None): bad_lines = [] lnnum = 1 if fixed_csv: w = open(fixed_csv, "w", encoding="utf-8") with open(csv_file, "r", encoding="utf-8") as f: ln = f.readline() if fixed_csv: w.write(ln) while True: ln = f.readline() if len(ln) == 0: break if not ln.count('"') % 2 == 0: bad_lines.append("(" + str(lnnum) + "): " + ln) if fixed_csv: w.write(ln.replace('"', '""', 1)) #raise Exception("Incorrect quotes at line " + str(lnnum) + " in file [" + csv_file + "]") else: if fixed_csv: w.write(ln) lnnum = lnnum + 1 if fixed_csv: w.close() return bad_lines def normalize_name(name): """ Нормализует строку: если обнаруживается русский язык, транслитерирует её в латиницу, приводит к нижнему регистру. """ try: if not isinstance(name, str): return name if detect_language(name) == 'ru': return translit(name, 'ru', reversed=True).lower() except Exception: pass return name.lower() def preprocess_product_brand(name): if not isinstance(name, str): return str(name) name = name.lower() name = replace_interword_separators(name) return name def preprocess_product_name(name): if not isinstance(name, str): return str(name) return name.lower() def replace_interword_separators(name): return name.replace("'", "").replace("`", "").replace("’", "").replace('-', '') def clean_text(text): text = remove_quotes(text) # Удаляем одиночную звездочку, остающуюся после объема вида "0.75*' text = re.sub(r'(\s|^)[\*;](\s|$)', ' ', text) # Убираем '()' text = re.sub(r'\(\s*\)', ' ', text) #text = clean_wine_name(text) # Убираем // text = re.sub(r'/\s*/', '/', text).strip() # Убираем / text = re.sub(r'\s+/\s+', ' ', text).strip() # Убираем дублирующиеся пробелы text = re.sub(r'\s+', ' ', text).strip() return text def normalize_and_clean_brand(brand): if not brand: return brand brand = normalize_name(brand) brand = replace_interword_separators(brand) brand = clean_text(brand) return brand def normalize_terms_and_attributes(name): if not name: return name for term in TERMS_AND_ATTRIBUTES_VARIATIONS: word = find_full_word(name, TERMS_AND_ATTRIBUTES_VARIATIONS[term]) if word: name = name.replace(word, term.lower()) return name def normalize_and_clean_name(name): if not name: return name name = normalize_name(name) name = normalize_terms_and_attributes(name) name = replace_interword_separators(name) name = replace_brands_and_names_alternatives(name) name = clean_text(name) return name def replace_brands_and_names_alternatives(name): if not name: return name for nnk in BRANDS_AND_NAMES_ALTERNATIVES_DICT: word = find_full_word(name, BRANDS_AND_NAMES_ALTERNATIVES_DICT[nnk]) if word: name = name.replace(word, nnk.lower()) return name def remove_quotes(text): return re.sub(r'["\'«»]', ' ', text) def clean_wine_name(name): """ Удаляет в конце строки отдельно стоящие буквы (однобуквенные слова), не входящие в состав других слов. Например, "токай л" превратится в "токай". """ # Регулярное выражение ищет: # \s+ – один или несколько пробельных символов; # \b – граница слова; # [A-Za-zА-ЯЁа-яё] – ровно одна буква (латинская или кириллическая); # \b – граница слова; # \s*$ – любые пробелы до конца строки. return re.sub(r'\s+\b[A-Za-zА-ЯЁа-яё\-]\b\s*$', '', name) def find_full_word(text, word_list): """ Ищет первое полное вхождение слова из word_list в строке text. Возвращает найденное слово или None, если совпадение не найдено. """ for word in word_list: pattern = r'\b' + re.escape(word) + r'\b' match = re.search(pattern, text, re.IGNORECASE) if match: return match.group(0) return None def find_word(text, word_list): for word in word_list: if word in text: return word return None def remove_full_words(text, word_list): """ Ищет все полное вхождение слов из word_list в строке text и удаляет их """ for word in word_list: if word: pattern = r'\b' + re.escape(word) + r'\b' text = re.sub(pattern, ' ', text, flags=re.IGNORECASE) return text def merge_wine_type(items, colors=None, color_merge_dict=None): result=[] for row in tqdm(items.iterrows()): try: #print("merge_wine_type:" + str(row)) if row[1]['type_wine'] is not None: color=find_full_word(row[1]['type_wine'], colors) if color is not None: result.append(color) else: color=find_full_word(row[1]['name'], colors) if color is not None: result.append(color) else: result.append(None) else: color=find_full_word(row[1]['name'], colors) if color is not None: result.append(color) else: result.append(None) except Exception as ex: print("Error in merge_wine_type: " + str(ex)) result.append(None) items['new_type_wine']=result items['new_type_wine']=items['new_type_wine'].replace(color_merge_dict) def merge_types(items, products, type_merge_dict={}, sub_alco_types=["Бренди", "Шампань", "Шампанское"], product_types = None): pt = product_types if product_types else products['type'].unique() alco_types=[i.strip().lower() for i in pt] alco_types.append('ликёр') #alco_types.sort(reverse=True) alco_types = sorted(alco_types, reverse=True, key=len) result=[] for row in tqdm(items.iterrows()): try: # Parameter 'sub_alco_types' specifies specific alcohol types that usually specified # in product / item name along with "parent" type and in this case this subtype should have priority # For example, "Вино Шампано Ле Брён де Нёвиль", or "Бренди де Херес" if sub_alco_types: type_in_name=find_full_word(row[1]['name'], sub_alco_types) if type_in_name is not None: result.append(type_in_name) continue type_in_name=find_full_word(row[1]['name'], alco_types) if type_in_name is not None: result.append(type_in_name) continue if row[1]['type'] is not None: type_in_type=find_full_word(row[1]['type'], alco_types) if type_in_type is not None: result.append(type_in_type) else: result.append(row[1]['type']) else: result.append(None) except Exception as ex: print(ex) result.append(None) items['new_type']=result #items['new_type']=items['new_type'].replace({'ликёр': 'ликер', None: 'unmatched'}) items['new_type'] = items['new_type'].replace(type_merge_dict) def trim_name(text, words_to_remove): """ Удаляет из текста только те слова, которые полностью совпадают с элементами списка words_to_remove. :param text: Исходная строка. :param words_to_remove: Список слов, которые необходимо удалить. :return: Обновлённая строка с удалёнными словами. """ # Создаём регулярное выражение, которое ищет любое из указанных слов как отдельное слово. # Используем re.escape, чтобы экранировать спецсимволы в словах. pattern = r'\b(?:' + '|'.join(re.escape(word) for word in words_to_remove) + r')\b' #print("Pattern: " + pattern) # Заменяем найденные полные слова на пустую строку. new_text = re.sub(pattern, '', text, flags=re.IGNORECASE) # Убираем лишние пробелы, возникающие после удаления слов. new_text = re.sub(r'\s+', ' ', new_text).strip() return new_text '''def is_string(val): if not val: return False elif isinstance(val, str): return True return False ''' def detect_language_simple(text): for ch in text: if (ord(ch) >= ord('А') and ord(ch) <= ord('Я')) or \ (ord(ch) >= ord('а') and ord(ch) <= ord('я')): return 'ru' elif (ord(ch) >= ord('A') and ord(ch) <= ord('Z')) or \ (ord(ch) >= ord('a') and ord(ch) <= ord('z')): return 'en' return 'xx'