import json
import datetime
import settings
from preprocess.utils.common.extracters import *
from multiprocessing import Process, Queue
import pandas as pd
from rapidfuzz import fuzz, process
from math import isnan
from preprocess.utils.common.utils import *
from time import perf_counter
import numbers

# Template and slot positions of the per-match "extended score" vector.
# Each match carries a copy of SCORE_EX_EMPTY with individual slots filled in.
SCORE_EX_EMPTY = [0, 0, 0, 0, 0, 0, 0, 0]
SCORE_EX_BRAND_INDEX = 0
SCORE_EX_NAME_INDEX = 1
SCORE_EX_SIMILARITY_INDEX = 2
SCORE_EX_TYPE_INDEX = 3
SCORE_EX_COLORSOUR_INDEX = 4
SCORE_EX_VOLUME_INDEX = 5
SCORE_EX_YEAR_INDEX = 6
SCORE_EX_GB_INDEX = 7


def split_name(name):
    """Split *name* into words on single spaces.

    Non-string values (``None``, ``NaN``, numbers) yield an empty list instead
    of raising, so the function can be applied to columns with missing names.
    """
    if not isinstance(name, str):
        return []
    return name.split(" ")


def prepare_groups_with_ids_ex(items_df, key_cols, name_col="name"):
    """Group the rows of *items_df* by *key_cols* into parallel candidate lists.

    NOTE: *items_df* is modified in place: the columns ``<name_col>_splitted``
    and ``name_2_splitted`` (word lists of the respective name columns) are
    added to it before grouping.

    :param items_df: DataFrame containing at least the columns ``index``,
        ``id``, ``name_2``, *name_col* and every column listed in *key_cols*.
    :param key_cols: Column names to group by.
    :param name_col: Name of the primary name column.
    :return: Dict mapping each group key to a list of six parallel lists:
        ``[indexes, ids, names, names_splitted, names_2, names_2_splitted]``.
    """
    splitted_col = name_col + "_splitted"
    items_df[splitted_col] = items_df[name_col].apply(split_name)
    items_df["name_2_splitted"] = items_df["name_2"].apply(split_name)

    def collect(group):
        return [
            list(group['index']),
            list(group['id']),
            list(group[name_col]),
            list(group[splitted_col]),
            list(group["name_2"]),
            list(group["name_2_splitted"]),
        ]

    return items_df.groupby(key_cols).apply(collect).to_dict()


def parse_year(year):
    """Convert *year* to ``int``; return ``False`` when it is missing or invalid.

    Accepts strings (including forms such as ``"2015.0"``) and any real number
    type, including numpy scalars. Empty values, ``NaN`` and unparsable strings
    yield ``False``.
    """
    if isinstance(year, str):
        try:
            return int(float(year.strip()))
        except (ValueError, OverflowError):
            # Empty or non-numeric text such as "NV".
            return False
    if isinstance(year, numbers.Real):
        try:
            if isnan(year):
                return False
            return int(year)
        except (ValueError, OverflowError):
            return False
    return False


def order_by_best_year(matched_items, year):
    """Assign ``score_year`` to every matched item; the list order is unchanged.

    When *year* (the vintage of the item being matched) is known:
      * 3 - the candidate has exactly the same year;
      * 2 - the candidate has the most recent year among the non-exact ones;
      * 0 - everything else (including candidates without a usable year).
    When *year* is unknown every candidate gets 3.

    :param matched_items: List of dicts with at least ``id`` and ``year`` keys;
        the dicts are updated in place.
    :param year: Year of the original item (str, number or empty).
    :return: The same *matched_items* list.
    """
    year = parse_year(year)
    if not year:
        for candidate in matched_items:
            candidate['score_year'] = 3
        return matched_items

    exact_ids = set()
    newest_ids = set()
    newest_year = 0
    for candidate in matched_items:
        candidate_year = parse_year(candidate.get('year'))
        if not candidate_year:
            continue
        if candidate_year == year:
            exact_ids.add(candidate['id'])
        elif candidate_year > newest_year:
            newest_ids = {candidate['id']}
            newest_year = candidate_year
        elif candidate_year == newest_year:
            newest_ids.add(candidate['id'])

    for candidate in matched_items:
        # Membership is tested on ids; testing an id against a list of
        # candidate dicts can never succeed.
        if candidate['id'] in exact_ids:
            candidate['score_year'] = 3
        elif candidate['id'] in newest_ids:
            candidate['score_year'] = 2
        else:
            candidate['score_year'] = 0
    return matched_items


# Module-level profiling accumulators (seconds) and their start timestamps.
time_11s = time_11 = 0
time_12s = time_12 = 0
time_13s = time_13 = 0
# Module-level profiling accumulators (seconds) and their start timestamps.
time_14s = time_14 = 0
time_20s = time_20 = 0
time_21s = time_21 = 0
time_22s = time_22 = 0
time_23s = time_23 = 0
time_24s = time_24 = 0
time_25s = time_25 = 0


def compare_names_for_same_brand(name, name_candidates, name_candidates_splitted, scorer, score_cutoff, limit):
    """Match *name* against candidates by counting shared whole words.

    A candidate is reported when it shares at least one word with *name*; its
    score is then a flat 100 (subject to *score_cutoff*). Similarity is 3 when
    every word of *name* occurs in the candidate and 2 otherwise.

    :param name: Space-separated name to look up; empty/None yields ``[]``.
    :param name_candidates: Candidate names.
    :param name_candidates_splitted: Word lists parallel to *name_candidates*.
    :param scorer: Unused; kept for signature compatibility.
    :param score_cutoff: Minimal score for a candidate to be reported.
    :param limit: Unused; kept for signature compatibility.
    :return: List of ``(candidate_name, score, similarity, candidate_index)``.
    """
    if not name:
        return []
    parts = name.split(" ")
    result = []
    for idx_candidate, candidate in enumerate(name_candidates):
        parts_c = name_candidates_splitted[idx_candidate]
        similar_words_count = sum(1 for part in parts if part in parts_c)
        if similar_words_count == 0:
            continue
        score = 100
        similarity = 3 if similar_words_count == len(parts) else 2
        if score >= score_cutoff:
            result.append((candidate, score, similarity, idx_candidate))
    return result


def compare_names_invariant_order(name, name_candidates, name_candidates_splitted, scorer, score_cutoff, limit):
    """Match *name* against candidates regardless of word order.

    The score is the percentage of words of *name* that occur verbatim in the
    candidate's word list. Similarity is 3 when both names have the same number
    of words and 2 otherwise.

    :param name: Space-separated name to look up; empty/None yields ``[]``.
    :param name_candidates: Candidate names.
    :param name_candidates_splitted: Word lists parallel to *name_candidates*.
    :param scorer: Unused; kept for signature compatibility.
    :param score_cutoff: Minimal score for a candidate to be reported.
    :param limit: Unused; kept for signature compatibility.
    :return: List of ``(candidate_name, score, similarity, candidate_index)``.
    """
    if not name:
        return []
    parts = name.split(" ")
    result = []
    for idx_candidate, candidate in enumerate(name_candidates):
        parts_c = name_candidates_splitted[idx_candidate]
        total_score = sum(100 for part in parts if part in parts_c)
        score = total_score / len(parts)
        similarity = 3 if len(parts) == len(parts_c) else 2
        if score >= score_cutoff:
            result.append((candidate, score, similarity, idx_candidate))
    return result


def show_stat():
    """Print the module-level profiling accumulators ``time_20`` .. ``time_25``."""
    print("20 : " + str(time_20) + "\n" +
          "21 : " + str(time_21) + "\n" +
          "22 : " + str(time_22) + "\n" +
          "23 : " + str(time_23) + "\n" +
          "24 : " + str(time_24) + "\n" +
          "25 : " + str(time_25) + "\n")


def _as_list(values):
    """Return *values* unchanged when it already is a list, else a list copy."""
    return values if isinstance(values, list) else list(values)


def find_matches_from_candidates(item_name, candidates, order_invariant_names_matching, name_threshold, limit,
                                 brand_score):
    """Fuzzy-match *item_name* against one candidate group.

    :param item_name: Name to look up.
    :param candidates: Six parallel sequences ``[indexes, ids, names,
        names_splitted, names_2, names_2_splitted]``; empty/None yields ``[]``.
    :param order_invariant_names_matching: Also run the word-order-invariant
        matcher and append its results.
    :param name_threshold: Minimal fuzzy score (ignored when *brand_score* > 0).
    :param limit: Maximal number of fuzzy results per name column.
    :param brand_score: Brand score stored in every match; when positive the
        brand already matched, so the name threshold is dropped to 0 and the
        limit is raised to 100.
    :return: List of ``(product_index, product_id, score, matched_name,
        score_ex)`` tuples; may contain the same product more than once.
    """
    global time_11s, time_11, time_12s, time_12, time_13s, time_13, time_14s, time_14
    if not candidates:
        return []

    time_11s = perf_counter()
    products_indexes = candidates[0]
    products_ids = candidates[1]
    products_names = _as_list(candidates[2])
    products_names_splitted = _as_list(candidates[3])
    products_names_2 = _as_list(candidates[4])
    products_names_2_splitted = _as_list(candidates[5])
    time_11 += perf_counter() - time_11s

    time_12s = perf_counter()
    if brand_score > 0:
        name_threshold = 0
        limit = 100
    matches = process.extract(item_name, products_names, scorer=fuzz.ratio,
                              score_cutoff=name_threshold, limit=limit)
    matches.extend(process.extract(item_name, products_names_2, scorer=fuzz.ratio,
                                   score_cutoff=name_threshold, limit=limit))
    alt_matches = []
    if order_invariant_names_matching:
        alt_matches = compare_names_invariant_order(item_name, products_names, products_names_splitted,
                                                    scorer=fuzz.ratio, score_cutoff=name_threshold, limit=limit)
        alt_matches.extend(compare_names_invariant_order(item_name, products_names_2, products_names_2_splitted,
                                                         scorer=fuzz.ratio, score_cutoff=name_threshold,
                                                         limit=limit))
    time_12 += perf_counter() - time_12s

    time_14s = perf_counter()
    matched_products = []
    # Plain fuzzy matches always get the top similarity grade.
    for match, score, idx_candidate in matches:
        score_ex = SCORE_EX_EMPTY.copy()
        score_ex[SCORE_EX_BRAND_INDEX] = brand_score
        score_ex[SCORE_EX_NAME_INDEX] = score
        score_ex[SCORE_EX_SIMILARITY_INDEX] = 3
        matched_products.append((products_indexes[idx_candidate], products_ids[idx_candidate],
                                 score, match, score_ex))
    for match, score, similarity, idx_candidate in alt_matches:
        score_ex = SCORE_EX_EMPTY.copy()
        score_ex[SCORE_EX_BRAND_INDEX] = brand_score
        score_ex[SCORE_EX_NAME_INDEX] = score
        score_ex[SCORE_EX_SIMILARITY_INDEX] = similarity
        matched_products.append((products_indexes[idx_candidate], products_ids[idx_candidate],
                                 score, match, score_ex))
    time_14 += perf_counter() - time_14s
    return matched_products


def _optional_attribute_match(item_value, candidate_value):
    """Grade two optional attributes: 2 - equal or both empty, 1 - one empty, 0 - different."""
    if not item_value and not candidate_value:
        return 2
    if not item_value or not candidate_value:
        return 1
    return 2 if item_value == candidate_value else 0


def score_and_filter_matched_items_by_attributes(matched_items, item):
    """Score candidates by volume, type, colour/sweetness and gift-box flag.

    Sets ``score_volume``, ``score_type`` (only when some type level matches),
    ``score_colorsour`` and ``score_gb`` on every candidate. A candidate whose
    volume, colour/sweetness or ``gb`` flag contradicts the item is marked
    ``alternative = 1`` and is dropped unless
    ``SETTINGS_MATCHING_INCLUDE_ALTERNATIVES`` is truthy.

    :param matched_items: Candidate dicts (updated in place); must already
        contain the ``alternative`` key.
    :param item: Row of the item being matched (keys ``volume``, ``type``,
        ``type_l1``, ``type_l0``, ``type_wine``, ``sour``, ``gb``).
    :return: List of the candidates that were kept.
    """
    filtered_matched_items = []
    for mi in matched_items:
        # Volume: equal within 15 % is a match, one side missing is neutral.
        if not item['volume'] and not mi['volume']:
            mi['score_volume'] = 3
        elif not item['volume'] or not mi['volume']:
            mi['score_volume'] = 2
        else:
            mi_vol = float(mi['volume'])
            i_vol = float(item['volume'])
            largest = max(mi_vol, i_vol)
            # The equality test also covers two zero volumes (no division by 0).
            if mi_vol == i_vol or (largest and abs(mi_vol - i_vol) / largest < 0.15):
                mi['score_volume'] = 3
            else:
                mi['score_volume'] = 0
                mi['alternative'] = 1

        # Type: the more specific the matching level, the higher the score.
        if item['type'] == mi['type']:
            mi['score_type'] = 3
        elif item['type_l1'] == mi['type_l1'] or item['type_l0'] == "unmatched":
            mi['score_type'] = 2
        elif item['type_l0'] == mi['type_l0']:
            mi['score_type'] = 1

        type_wine_match = _optional_attribute_match(item['type_wine'], mi['color'])
        sour_match = _optional_attribute_match(item['sour'], mi['sour'])
        if type_wine_match and sour_match:
            mi['score_colorsour'] = 3
        elif type_wine_match:
            mi['score_colorsour'] = 2
        elif sour_match:
            mi['score_colorsour'] = 1
        else:
            mi['score_colorsour'] = 0
            mi['alternative'] = 1

        if bool(item['gb']) == bool(mi['gb']):
            mi['score_gb'] = 3
        else:
            mi['score_gb'] = 0
            mi['alternative'] = 1

        if mi['alternative'] and not SETTINGS_MATCHING_INCLUDE_ALTERNATIVES:
            continue
        filtered_matched_items.append(mi)
    return filtered_matched_items


def _strip_brand(name, brand):
    """Remove *brand* from *name* unless that would leave fewer than 3 characters."""
    if not brand or not isinstance(name, str) or brand not in name:
        return name
    stripped = name.replace(brand, '').strip()
    return stripped if len(stripped) > 2 else name


def find_matches_for_brand(brand, item, products_groups_brand_type_vol, products_groups_brand_typel1_vol,
                           products_groups_brand_typel0_vol, order_invariant_names_matching, name_threshold,
                           brand_score):
    """Search the three (brand, type level, volume) groups for one brand.

    The brand is removed from the item names before matching. Both ``name``
    and, when present, ``name_2`` are searched in the exact-type group, then
    the level-1 type group, then the level-0 type group.

    :param brand: Brand used as the first element of the group keys.
    :param item: Row of the item being matched (keys ``type``, ``type_l1``,
        ``type_l0``, ``name``, ``name_2``, ``volume``).
    :param products_groups_brand_type_vol: Groups keyed by (brand, type, volume).
    :param products_groups_brand_typel1_vol: Groups keyed by (brand, type_l1, volume).
    :param products_groups_brand_typel0_vol: Groups keyed by (brand, type_l0, volume).
    :param order_invariant_names_matching: Passed to ``find_matches_from_candidates``.
    :param name_threshold: Passed to ``find_matches_from_candidates``.
    :param brand_score: Brand score stored in every match.
    :return: Concatenated matches from all three groups.
    """
    item_volume = item['volume']
    item_name_wo_brand = _strip_brand(item['name'], brand)
    # name_2 may be missing; it is then skipped instead of raising.
    item_name_2_wo_brand = _strip_brand(item['name_2'], brand)

    lookups = (
        (item['type'], products_groups_brand_type_vol),
        (item['type_l1'], products_groups_brand_typel1_vol),
        (item['type_l0'], products_groups_brand_typel0_vol),
    )
    matches = []
    for type_value, groups in lookups:
        products_candidates = (groups or {}).get((brand, type_value, item_volume), [])
        matches.extend(find_matches_from_candidates(item_name_wo_brand, products_candidates,
                                                    order_invariant_names_matching, name_threshold, 100,
                                                    brand_score))
        if item_name_2_wo_brand:
            matches.extend(find_matches_from_candidates(item_name_2_wo_brand, products_candidates,
                                                        order_invariant_names_matching, name_threshold, 100,
                                                        brand_score))
    return matches


def calculate_total_score(all_matched_items):
    """Store the weighted total (0..100) of all partial scores in ``mi['score']``.

    Weights: brand 28, name 45, similarity 0, year 10, volume 4, type 4,
    colour/sweetness 5, gift box 4. Items are updated in place.
    """
    for mi in all_matched_items:
        total_score = 28.0 * mi['score_brand'] / 3
        total_score += 45.0 * mi['score_name'] / 100
        total_score += 0.0 * mi['score_similarity'] / 3
        total_score += 10.0 * mi['score_year'] / 3
        total_score += 4.0 * mi['score_volume'] / 3
        total_score += 4.0 * mi['score_type'] / 3
        total_score += 5.0 * mi['score_colorsour'] / 3
        total_score += 4.0 * mi['score_gb'] / 3
        mi['score'] = total_score


def _format_score_ex(product):
    """Render the partial scores of *product* as ``B..,N..,S..,T..,C..,V..,Y..,G..``."""
    return (f"B{product['score_brand']},N{product['score_name']},S{product['score_similarity']},"
            f"T{product['score_type']},C{product['score_colorsour']},V{product['score_volume']},"
            f"Y{product['score_year']},G{product['score_gb']}")


def new_find_matches_with_ids_func(items_df, products_df, name_threshold=85, products_groups_brand_type_vol=None,
                                   products_groups_brand_typel1_vol=None, products_groups_brand_typel0_vol=None,
                                   products_groups_typewine_type_vol=None, order_invariant_names_matching=False,
                                   index=None, qresult=None):
    """Find the best matching products for every row of *items_df*.

    For each item the candidates are collected from, in order:
      1. the brand groups for ``brand``, ``brand_2`` (brand score 3), then
         ``new_brand`` and every entry of ``alt_brands`` (brand score 2);
      2. the (new_type_wine, type, volume) group, using the name with brand;
      3. all products, compared by ``name_with_brand``;
      4. all products, compared by ``name_wo_brand`` - only when the item has
         no ``brand``.
    Candidates are then scored by attributes and year, sorted by total score,
    de-duplicated by product id and cut to the top 10.

    NOTE(review): the ``index`` column of *products_df* is used as a position
    into the product lists, so it is assumed to equal the row position -
    confirm against the caller.

    :param items_df: DataFrame of items to match.
    :param products_df: DataFrame of products to match against.
    :param name_threshold: Minimal fuzzy name score; values below 50 are raised to 50.
    :param products_groups_brand_type_vol: Groups keyed by (brand, type, volume).
    :param products_groups_brand_typel1_vol: Groups keyed by (brand, type_l1, volume).
    :param products_groups_brand_typel0_vol: Groups keyed by (brand, type_l0, volume).
    :param products_groups_typewine_type_vol: Groups keyed by (type_wine, type, volume).
    :param order_invariant_names_matching: Also use word-order-invariant matching.
    :param index: Worker number, used only in log output.
    :param qresult: Optional queue; when given the results are also put on it.
    :return: List of dicts with keys ``item_id``, ``best_score_ex`` and
        ``matched_items`` (at most 10 candidate dicts, best first).
    """
    results = []
    print("starting [" + str(index) + "]")
    if name_threshold < 50:
        name_threshold = 50

    # Missing group dictionaries behave like empty ones.
    products_groups_brand_type_vol = products_groups_brand_type_vol or {}
    products_groups_brand_typel1_vol = products_groups_brand_typel1_vol or {}
    products_groups_brand_typel0_vol = products_groups_brand_typel0_vol or {}
    products_groups_typewine_type_vol = products_groups_typewine_type_vol or {}

    all_products_indexes = list(products_df["index"])
    all_products_ids = list(products_df["id"])
    all_products_brands = list(products_df["new_brand"])
    all_products_names = list(products_df["name_wo_brand"])
    all_products_names_splitted = list(products_df['name_wo_brand'].apply(split_name))
    all_products_names_with_brand = list(products_df["name_with_brand"])
    all_products_names_with_brand_splitted = list(products_df['name_with_brand'].apply(split_name))
    all_products_names_2 = list(products_df["name_2"])
    all_products_names_2_splitted = list(products_df['name_2'].apply(split_name))
    all_products_orig_names = list(products_df["orig_name"])
    all_products_volumes = list(products_df["volume"])
    all_products_types = list(products_df["type"])
    all_products_types_l1 = list(products_df["type_l1"])
    all_products_types_l0 = list(products_df["type_l0"])
    all_products_type_wine = list(products_df["new_type_wine"])
    all_products_sour = list(products_df["sour"])
    all_products_year = list(products_df["year"])
    all_products_gbs = list(products_df["gb"])

    # Pseudo-groups covering the whole catalogue, in the candidate-group layout.
    all_products = [all_products_indexes, all_products_ids, all_products_names, all_products_names_splitted,
                    all_products_names_2, all_products_names_2_splitted]
    all_products_with_brands = [all_products_indexes, all_products_ids, all_products_names_with_brand,
                                all_products_names_with_brand_splitted, all_products_names_2,
                                all_products_names_2_splitted]

    for _, item in items_df.iterrows():
        item_brand = item['brand']
        item_name = item['name']
        item_name_2 = item['name_2']
        all_matches = []

        # First find matches for every brand known for the item so far.
        alt_brands = item['alt_brands']
        if alt_brands is None:
            alt_brands = []
        brand_queries = [(item_brand, 3), (item['brand_2'], 3), (item['new_brand'], 2)]
        brand_queries.extend((alt_brand, 2) for alt_brand in alt_brands)
        used_brands = set()
        for brand, brand_score in brand_queries:
            # Searching the same brand twice only yields duplicates that are
            # removed below anyway.
            if not brand or brand in used_brands:
                continue
            used_brands.add(brand)
            all_matches.extend(find_matches_for_brand(brand, item, products_groups_brand_type_vol,
                                                      products_groups_brand_typel1_vol,
                                                      products_groups_brand_typel0_vol,
                                                      order_invariant_names_matching, name_threshold,
                                                      brand_score))

        # All further searches use the full name with the brand.
        item_name_with_brand = item_name
        if item_brand and item_brand not in item_name:
            item_name_with_brand = item_brand + " " + item_name
        item_name_2_with_brand = item_name_2
        if item_name_2 and item_brand and item_brand not in item_name_2:
            item_name_2_with_brand = item_brand + " " + item_name_2

        alt_key = (item['new_type_wine'], item['type'], item['volume'])
        search_groups = [
            products_groups_typewine_type_vol.get(alt_key, []),
            all_products_with_brands,  # finally search among all products
        ]
        if not item_brand:
            search_groups.append(all_products)
        for products_candidates in search_groups:
            all_matches.extend(find_matches_from_candidates(item_name_with_brand, products_candidates,
                                                            order_invariant_names_matching, name_threshold,
                                                            30, 0))
            if item_name_2_with_brand:
                all_matches.extend(find_matches_from_candidates(item_name_2_with_brand, products_candidates,
                                                                order_invariant_names_matching, name_threshold,
                                                                30, 0))

        all_matched_items = [
            {
                'id': all_products_ids[product_index],
                'brand': all_products_brands[product_index],
                'item_name': all_products_names[product_index],
                'score': 0,
                'alternative': 0,
                'score_ex': score_ex,
                'score_brand': score_ex[SCORE_EX_BRAND_INDEX],
                'score_name': int(score_ex[SCORE_EX_NAME_INDEX]),
                'score_similarity': score_ex[SCORE_EX_SIMILARITY_INDEX],
                'score_type': score_ex[SCORE_EX_TYPE_INDEX],
                'score_colorsour': score_ex[SCORE_EX_COLORSOUR_INDEX],
                'score_volume': score_ex[SCORE_EX_VOLUME_INDEX],
                'score_year': score_ex[SCORE_EX_YEAR_INDEX],
                'score_gb': score_ex[SCORE_EX_GB_INDEX],
                'item_orig_name': all_products_orig_names[product_index],
                'volume': all_products_volumes[product_index],
                'type': all_products_types[product_index],
                'type_l1': all_products_types_l1[product_index],
                'type_l0': all_products_types_l0[product_index],
                'color': all_products_type_wine[product_index],
                'sour': all_products_sour[product_index],
                'year': all_products_year[product_index],
                'gb': all_products_gbs[product_index],
            }
            for product_index, product_id, score, match, score_ex in all_matches
        ]
        all_matched_items = score_and_filter_matched_items_by_attributes(all_matched_items, item)
        all_matched_items = order_by_best_year(all_matched_items, item['year'])
        calculate_total_score(all_matched_items)

        # Sort by total score, then keep only the best entry per product id.
        all_matched_items = sorted(all_matched_items, key=lambda d: d['score'], reverse=True)
        seen_ids = set()
        best_score_ex = ''
        unique_matched_items = []
        for product in all_matched_items:
            if product['id'] in seen_ids:
                continue
            seen_ids.add(product['id'])
            score_ex = _format_score_ex(product)
            if not best_score_ex:
                best_score_ex = score_ex
            product['score_ex'] = score_ex
            unique_matched_items.append(product)

        results.append({
            'item_id': item['id'],
            'best_score_ex': best_score_ex,
            'matched_items': unique_matched_items[:10],
        })

    print("finished [" + str(index) + "]")
    if qresult is not None:
        qresult.put(results)
    return results


def new_find_matches_with_ids(items_df, products_df, name_threshold=85, products_groups_brand_type_vol=None,
                              products_groups_brand_typel1_vol=None, products_groups_brand_typel0_vol=None,
                              products_groups_typewine_type_vol=None, order_invariant_names_matching=False,
                              thread_count=8):
    """Match all items against the products and merge the results into *items_df*.

    Fewer than 1000 items are processed in the current process. Larger inputs
    are split into consecutive chunks, each handled by its own worker process
    running ``new_find_matches_with_ids_func``.

    NOTE: a worker that dies before putting its result on the queue makes this
    function wait forever; no timeout is applied.

    :param items_df: DataFrame of items to match (must contain ``id``).
    :param products_df: DataFrame of products to match against.
    :param name_threshold: Minimal fuzzy name score.
    :param products_groups_brand_type_vol: Groups keyed by (brand, type, volume).
    :param products_groups_brand_typel1_vol: Groups keyed by (brand, type_l1, volume).
    :param products_groups_brand_typel0_vol: Groups keyed by (brand, type_l0, volume).
    :param products_groups_typewine_type_vol: Groups keyed by (type_wine, type, volume).
    :param order_invariant_names_matching: Also use word-order-invariant matching.
    :param thread_count: Desired number of worker processes (at least 1 is used).
    :return: *items_df* merged with the columns ``best_score_ex`` and
        ``matched_items`` (JSON string of the top candidates).
    """
    print("Started matching at " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n")
    total = len(items_df)
    if total < 1000:
        results = new_find_matches_with_ids_func(items_df, products_df, name_threshold,
                                                 products_groups_brand_type_vol,
                                                 products_groups_brand_typel1_vol,
                                                 products_groups_brand_typel0_vol,
                                                 products_groups_typewine_type_vol,
                                                 order_invariant_names_matching, 0)
        show_stat()
    else:
        chunk_size = total // max(1, thread_count) + 1
        # Ceiling division: never start a worker for an empty trailing chunk.
        num_chunks = -(-total // chunk_size)
        workers = []
        for i in range(num_chunks):
            chunk = items_df.iloc[i * chunk_size:(i + 1) * chunk_size]
            q = Queue()
            p = Process(target=new_find_matches_with_ids_func,
                        args=(chunk, products_df, name_threshold, products_groups_brand_type_vol,
                              products_groups_brand_typel1_vol, products_groups_brand_typel0_vol,
                              products_groups_typewine_type_vol, order_invariant_names_matching, i, q))
            p.start()
            workers.append((p, q))

        results = []
        # Drain every queue before joining: a process that still has data
        # buffered for a queue does not terminate until the data is consumed.
        chunk_results = [q.get() for _, q in workers]
        for p, _ in workers:
            p.join()
        for chunk_result in chunk_results:
            results.extend(chunk_result)

    for r in results:
        r['matched_items'] = json.dumps(r['matched_items'], ensure_ascii=False)
    results_df = pd.DataFrame(results)
    merged_df = items_df.merge(results_df, left_on='id', right_on='item_id').drop(columns=['item_id'])
    print("Finished matching at " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n")
    return merged_df