j-s-v's picture
2025-07-28
d4bade4
import re
from transliterate import translit, detect_language
from constants.constants import *
from tqdm import tqdm
def get_delimiter(file_path):
with open(file_path, 'r', encoding="utf-8") as f:
ln = f.readline()
if ',' in ln:
return ','
if ';' in ln:
return ';'
if '\t' in ln:
return '\t'
if '|' in ln:
return '|'
raise ValueError(None, "Error parsing CSV file. Cannot detect delimiter")
def verify_csv(csv_file, fixed_csv=None):
bad_lines = []
lnnum = 1
if fixed_csv:
w = open(fixed_csv, "w", encoding="utf-8")
with open(csv_file, "r", encoding="utf-8") as f:
ln = f.readline()
if fixed_csv:
w.write(ln)
while True:
ln = f.readline()
if len(ln) == 0:
break
if not ln.count('"') % 2 == 0:
bad_lines.append("(" + str(lnnum) + "): " + ln)
if fixed_csv:
w.write(ln.replace('"', '""', 1))
#raise Exception("Incorrect quotes at line " + str(lnnum) + " in file [" + csv_file + "]")
else:
if fixed_csv:
w.write(ln)
lnnum = lnnum + 1
if fixed_csv:
w.close()
return bad_lines
def normalize_name(name):
"""
Нормализует строку: если обнаруживается русский язык, транслитерирует её в латиницу,
приводит к нижнему регистру.
"""
try:
if not isinstance(name, str):
return name
if detect_language(name) == 'ru':
return translit(name, 'ru', reversed=True).lower()
except Exception:
pass
return name.lower()
def preprocess_product_brand(name):
if not isinstance(name, str):
return str(name)
name = name.lower()
name = replace_interword_separators(name)
return name
def preprocess_product_name(name):
if not isinstance(name, str):
return str(name)
return name.lower()
def replace_interword_separators(name):
return name.replace("'", "").replace("`", "").replace("’", "").replace('-', '')
def clean_text(text):
text = remove_quotes(text)
# Удаляем одиночную звездочку, остающуюся после объема вида "0.75*'
text = re.sub(r'(\s|^)[\*;](\s|$)', ' ', text)
# Убираем '()'
text = re.sub(r'\(\s*\)', ' ', text)
#text = clean_wine_name(text)
# Убираем //
text = re.sub(r'/\s*/', '/', text).strip()
# Убираем /
text = re.sub(r'\s+/\s+', ' ', text).strip()
# Убираем дублирующиеся пробелы
text = re.sub(r'\s+', ' ', text).strip()
return text
def normalize_and_clean_brand(brand):
if not brand:
return brand
brand = normalize_name(brand)
brand = replace_interword_separators(brand)
brand = clean_text(brand)
return brand
def normalize_terms_and_attributes(name):
if not name:
return name
for term in TERMS_AND_ATTRIBUTES_VARIATIONS:
word = find_full_word(name, TERMS_AND_ATTRIBUTES_VARIATIONS[term])
if word:
name = name.replace(word, term.lower())
return name
def normalize_and_clean_name(name):
if not name:
return name
name = normalize_name(name)
name = normalize_terms_and_attributes(name)
name = replace_interword_separators(name)
name = replace_brands_and_names_alternatives(name)
name = clean_text(name)
return name
def replace_brands_and_names_alternatives(name):
if not name:
return name
for nnk in BRANDS_AND_NAMES_ALTERNATIVES_DICT:
word = find_full_word(name, BRANDS_AND_NAMES_ALTERNATIVES_DICT[nnk])
if word:
name = name.replace(word, nnk.lower())
return name
def remove_quotes(text):
return re.sub(r'["\'«»]', ' ', text)
def clean_wine_name(name):
"""
Удаляет в конце строки отдельно стоящие буквы (однобуквенные слова), не входящие в состав других слов.
Например, "токай л" превратится в "токай".
"""
# Регулярное выражение ищет:
# \s+ – один или несколько пробельных символов;
# \b – граница слова;
# [A-Za-zА-ЯЁа-яё] – ровно одна буква (латинская или кириллическая);
# \b – граница слова;
# \s*$ – любые пробелы до конца строки.
return re.sub(r'\s+\b[A-Za-zА-ЯЁа-яё\-]\b\s*$', '', name)
def find_full_word(text, word_list):
"""
Ищет первое полное вхождение слова из word_list в строке text.
Возвращает найденное слово или None, если совпадение не найдено.
"""
for word in word_list:
pattern = r'\b' + re.escape(word) + r'\b'
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(0)
return None
def find_word(text, word_list):
for word in word_list:
if word in text:
return word
return None
def remove_full_words(text, word_list):
"""
Ищет все полное вхождение слов из word_list в строке text и удаляет их
"""
for word in word_list:
if word:
pattern = r'\b' + re.escape(word) + r'\b'
text = re.sub(pattern, ' ', text, flags=re.IGNORECASE)
return text
def merge_wine_type(items, colors=None, color_merge_dict=None):
result=[]
for row in tqdm(items.iterrows()):
try:
#print("merge_wine_type:" + str(row))
if row[1]['type_wine'] is not None:
color=find_full_word(row[1]['type_wine'], colors)
if color is not None:
result.append(color)
else:
color=find_full_word(row[1]['name'], colors)
if color is not None:
result.append(color)
else:
result.append(None)
else:
color=find_full_word(row[1]['name'], colors)
if color is not None:
result.append(color)
else:
result.append(None)
except Exception as ex:
print("Error in merge_wine_type: " + str(ex))
result.append(None)
items['new_type_wine']=result
items['new_type_wine']=items['new_type_wine'].replace(color_merge_dict)
def merge_types(items, products, type_merge_dict={}, sub_alco_types=["Бренди", "Шампань", "Шампанское"], product_types = None):
pt = product_types if product_types else products['type'].unique()
alco_types=[i.strip().lower() for i in pt]
alco_types.append('ликёр')
#alco_types.sort(reverse=True)
alco_types = sorted(alco_types, reverse=True, key=len)
result=[]
for row in tqdm(items.iterrows()):
try:
# Parameter 'sub_alco_types' specifies specific alcohol types that usually specified
# in product / item name along with "parent" type and in this case this subtype should have priority
# For example, "Вино Шампано Ле Брён де Нёвиль", or "Бренди де Херес"
if sub_alco_types:
type_in_name=find_full_word(row[1]['name'], sub_alco_types)
if type_in_name is not None:
result.append(type_in_name)
continue
type_in_name=find_full_word(row[1]['name'], alco_types)
if type_in_name is not None:
result.append(type_in_name)
continue
if row[1]['type'] is not None:
type_in_type=find_full_word(row[1]['type'], alco_types)
if type_in_type is not None:
result.append(type_in_type)
else:
result.append(row[1]['type'])
else:
result.append(None)
except Exception as ex:
print(ex)
result.append(None)
items['new_type']=result
#items['new_type']=items['new_type'].replace({'ликёр': 'ликер', None: 'unmatched'})
items['new_type'] = items['new_type'].replace(type_merge_dict)
def trim_name(text, words_to_remove):
"""
Удаляет из текста только те слова, которые полностью совпадают с элементами списка words_to_remove.
:param text: Исходная строка.
:param words_to_remove: Список слов, которые необходимо удалить.
:return: Обновлённая строка с удалёнными словами.
"""
# Создаём регулярное выражение, которое ищет любое из указанных слов как отдельное слово.
# Используем re.escape, чтобы экранировать спецсимволы в словах.
pattern = r'\b(?:' + '|'.join(re.escape(word) for word in words_to_remove) + r')\b'
#print("Pattern: " + pattern)
# Заменяем найденные полные слова на пустую строку.
new_text = re.sub(pattern, '', text, flags=re.IGNORECASE)
# Убираем лишние пробелы, возникающие после удаления слов.
new_text = re.sub(r'\s+', ' ', new_text).strip()
return new_text
'''def is_string(val):
if not val:
return False
elif isinstance(val, str):
return True
return False
'''
def detect_language_simple(text):
for ch in text:
if (ord(ch) >= ord('А') and ord(ch) <= ord('Я')) or \
(ord(ch) >= ord('а') and ord(ch) <= ord('я')):
return 'ru'
elif (ord(ch) >= ord('A') and ord(ch) <= ord('Z')) or \
(ord(ch) >= ord('a') and ord(ch) <= ord('z')):
return 'en'
return 'xx'