Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| import streamlit as st | |
| from App.utils.priorite_pays import dico | |
| # from App.utils.divers_function import data_cleaning_func | |
| import nltk | |
| from typing import Tuple, List | |
| nltk.download("stopwords") | |
| def filter_data_with_valid_keys( | |
| data: pd.DataFrame, | |
| product_id_col: str, | |
| class_id_col: str, | |
| min_product_id_length: int, | |
| valid_class_id_prefixes: List[str], | |
| ) -> pd.DataFrame: | |
| """ | |
| Filter the dataframe based on product ID length and class ID prefixes. | |
| Args: | |
| data (pd.DataFrame): Input dataframe | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| min_product_id_length (int): Minimum length for product IDs | |
| valid_class_id_prefixes (List[str]): List of valid prefixes for class IDs | |
| Returns: | |
| pd.DataFrame: Filtered dataframe | |
| """ | |
| filtered_data = data[data[product_id_col].str.len() > min_product_id_length] | |
| try: | |
| filtered_data = filtered_data[ | |
| ~filtered_data[class_id_col].str[0].isin(valid_class_id_prefixes) | |
| ] | |
| except Exception: | |
| pass | |
| return filtered_data | |
| def calculate_product_class_matrix( | |
| data: pd.DataFrame, product_id_col: str, class_id_col: str | |
| ) -> Tuple[pd.DataFrame, pd.DataFrame]: | |
| """ | |
| Calculate the product-class matrix and total counts per product. | |
| Args: | |
| data (pd.DataFrame): Input dataframe | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| Returns: | |
| Tuple[pd.DataFrame, pd.DataFrame]: Total counts per product and product-class matrix | |
| """ | |
| matrix = pd.crosstab(data[product_id_col], data[class_id_col]) | |
| total_by_product = matrix.sum(axis=1) | |
| products_with_multiple_classes = total_by_product[total_by_product > 1].index | |
| filtered_data = data[data[product_id_col].isin(products_with_multiple_classes)] | |
| matrix = pd.crosstab(filtered_data[product_id_col], filtered_data[class_id_col]) | |
| total_by_product = matrix.sum(axis=1) | |
| total_by_product_df = pd.DataFrame( | |
| { | |
| product_id_col: total_by_product.index, | |
| "total_by_product": total_by_product.values, | |
| } | |
| ) | |
| return total_by_product_df, matrix | |
| def create_sparse_matrix( | |
| matrix: pd.DataFrame, product_id_col: str, class_id_col: str | |
| ) -> pd.DataFrame: | |
| """ | |
| Create a sparse matrix representation from the product-class matrix. | |
| Args: | |
| matrix (pd.DataFrame): Product-class matrix | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| Returns: | |
| pd.DataFrame: Sparse matrix representation | |
| """ | |
| stacked = matrix.stack() | |
| non_zero = stacked[stacked != 0] | |
| sparse_matrix = pd.DataFrame( | |
| { | |
| product_id_col: non_zero.index.get_level_values(0).astype(str), | |
| class_id_col: non_zero.index.get_level_values(1).astype(str), | |
| "count": non_zero.values, | |
| } | |
| ) | |
| return sparse_matrix | |
| def process_new_data( | |
| data: pd.DataFrame, product_id_col: str, class_id_col: str | |
| ) -> Tuple[pd.Series, pd.DataFrame]: | |
| """ | |
| Process the data to create a new dataset with country groups and merged information. | |
| Args: | |
| data (pd.DataFrame): Input dataframe | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| Returns: | |
| Tuple[pd.Series, pd.DataFrame]: Country groups and merged dataframe | |
| """ | |
| total_by_product_df, matrix = calculate_product_class_matrix( | |
| data, product_id_col, class_id_col | |
| ) | |
| sparse_matrix = create_sparse_matrix(matrix, product_id_col, class_id_col) | |
| merged_data = pd.merge(sparse_matrix, total_by_product_df, on=[product_id_col]) | |
| merged_data["Proportion"] = merged_data["count"] / merged_data["total_by_product"] | |
| final_merged = merged_data.merge( | |
| data, | |
| left_on=[class_id_col, product_id_col], | |
| right_on=[class_id_col, product_id_col], | |
| ) | |
| try: | |
| country_groups = final_merged.groupby([class_id_col, product_id_col])[ | |
| "Country" | |
| ].agg(lambda x: x.tolist()) | |
| except KeyError: | |
| try: | |
| country_groups = final_merged.groupby([class_id_col, product_id_col])[ | |
| "COUNTRY_KEY" | |
| ].agg(lambda x: x.tolist()) | |
| except KeyError: | |
| country_groups = final_merged.groupby([class_id_col, product_id_col])[ | |
| "COUNTRY" | |
| ].agg(lambda x: x.tolist()) | |
| return country_groups, final_merged | |
| def add_country(produit_id: str, class_id: str, Country) -> List[str]: | |
| """ | |
| Retrieve the list of countries for a given product ID and class ID. | |
| Args: | |
| product_id (str): The product ID | |
| class_id (str): The class ID | |
| country_groups (pd.Series): Series containing country groups | |
| Returns: | |
| List[str]: List of countries for the given product and class | |
| """ | |
| return Country[produit_id, class_id] | |
| def finalize_merged_data( | |
| merged: pd.DataFrame, | |
| country_groups: pd.Series, | |
| product_id_col: str, | |
| class_id_col: str, | |
| ) -> pd.DataFrame: | |
| """ | |
| Finalize the merged data by adding country information and removing duplicates. | |
| Args: | |
| merged (pd.DataFrame): Merged dataframe | |
| country_groups (pd.Series): Series containing country groups | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| Returns: | |
| pd.DataFrame: Finalized merged dataframe | |
| """ | |
| try: | |
| merged["Countries"] = merged.apply( | |
| lambda row: add_country( | |
| row[1], row[0], country_groups | |
| ), | |
| axis=1, | |
| ) | |
| merged["Countries"] = merged["Countries"].apply(tuple) | |
| final_merged = merged.drop_duplicates( | |
| subset=[product_id_col, class_id_col, "Countries"] | |
| ) | |
| except Exception as e: | |
| st.warning(f"An error occurred: {e}") | |
| final_merged = None | |
| return final_merged | |
| def filter_by_country_and_proportion( | |
| merged_data: pd.DataFrame, | |
| min_countries: int, | |
| min_proportion: float, | |
| product_id_col: str, | |
| ) -> pd.DataFrame: | |
| """ | |
| Filter the merged data based on minimum number of countries and proportion. | |
| Args: | |
| merged_data (pd.DataFrame): Merged dataframe | |
| min_countries (int): Minimum number of countries required | |
| min_proportion (float): Minimum proportion required | |
| product_id_col (str): Name of the product ID column | |
| Returns: | |
| pd.DataFrame: Filtered dataframe | |
| """ | |
| filtered_data = merged_data[ | |
| (merged_data.Proportion >= min_proportion) | |
| & (merged_data.total_by_product >= min_countries) | |
| ] | |
| product_keys = filtered_data[product_id_col].unique() | |
| result_df = merged_data[merged_data[product_id_col].isin(product_keys)] | |
| return result_df | |
| def process_country_priority( | |
| merged_data: pd.DataFrame, product_id_col: str | |
| ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: | |
| """ | |
| Process the merged data based on country priority. | |
| Args: | |
| merged_data (pd.DataFrame): Merged dataframe | |
| product_id_col (str): Name of the product ID column | |
| Returns: | |
| Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: Processed dataframes (all, equal weight, non-equal weight) | |
| """ | |
| data = merged_data[ | |
| (merged_data.Proportion == 0.5) & (merged_data.total_by_product >= 2) | |
| ] | |
| product_keys = data[product_id_col].unique() | |
| df = merged_data[merged_data[product_id_col].isin(product_keys)] | |
| df["Weight"] = df["Countries"].apply(lambda x: sum(dico[y] for y in x)) | |
| duplicated_subclass = df.duplicated(subset=[product_id_col, "Weight"], keep=False) | |
| df_equal = df[duplicated_subclass & (df.Proportion == 0.5)] | |
| df_not_equal = df[~df.isin(df_equal)].dropna() | |
| return df, df_equal, df_not_equal | |
| def merge_final_data( | |
| original_data: pd.DataFrame, | |
| new_data: pd.DataFrame, | |
| product_id_col: str, | |
| class_id_col: str, | |
| ) -> Tuple[pd.DataFrame, pd.DataFrame]: | |
| """ | |
| Merge the original data with the new processed data. | |
| Args: | |
| original_data (pd.DataFrame): Original dataframe | |
| new_data (pd.DataFrame): New processed dataframe | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| Returns: | |
| Tuple[pd.DataFrame, pd.DataFrame]: Final merged data and changes summary | |
| """ | |
| merged_df = pd.merge( | |
| original_data, | |
| new_data, | |
| on=["COUNTRY_KEY", product_id_col], | |
| how="left", | |
| suffixes=("", "_y"), | |
| ) | |
| merged_df[class_id_col] = merged_df[f"{class_id_col}_y"].fillna( | |
| merged_df[class_id_col] | |
| ) | |
| merged_df[f"{class_id_col[:-4]}_DESC_FR"] = merged_df[ | |
| f"{class_id_col[:-4]}_DESC_FR_y" | |
| ].fillna(merged_df[f"{class_id_col[:-4]}_DESC_FR"]) | |
| df_final = merged_df[ | |
| [product_id_col, "COUNTRY_KEY", class_id_col, f"{class_id_col[:-4]}_DESC_FR"] | |
| ] | |
| merged = pd.merge(original_data, df_final, how="outer", indicator=True) | |
| data_final = merged[merged["_merge"] != "both"] | |
| data_final = data_final.rename(columns={"_merge": "Changes"}) | |
| data_final.sort_values(by=[product_id_col], ascending=True, inplace=True) | |
| data_final["Changes"] = data_final["Changes"].apply( | |
| lambda x: "Before" if x == "left_only" else "After" | |
| ) | |
| data_final = data_final[ | |
| [ | |
| product_id_col, | |
| "COUNTRY_KEY", | |
| class_id_col, | |
| f"{class_id_col[:-4]}_DESC_FR", | |
| "Changes", | |
| ] | |
| ] | |
| data_final.drop_duplicates(inplace=True) | |
| return data_final, df_final | |
| def process_non_equal_data( | |
| df_not_equal: pd.DataFrame, product_id_col: str, class_id_col: str | |
| ) -> pd.DataFrame: | |
| """ | |
| Process data with non-equal weights, selecting the classification with the highest weight. | |
| Args: | |
| df_not_equal (pd.DataFrame): Dataframe with non-equal weights | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| Returns: | |
| pd.DataFrame: Processed dataframe with selected classifications | |
| """ | |
| df_multi_country = df_not_equal[df_not_equal.Countries.apply(len) > 1] | |
| max_weight_index = df_multi_country.groupby(product_id_col)["Weight"].idxmax() | |
| df_multi_country.loc[:, [class_id_col, f"{class_id_col[:-4]}_DESC_FR"]] = ( | |
| df_multi_country.loc[ | |
| max_weight_index, [class_id_col, f"{class_id_col[:-4]}_DESC_FR"] | |
| ].values | |
| ) | |
| df_duplicate = df_multi_country.copy() | |
| df_duplicate.Countries = df_duplicate.Countries.str.join(",") | |
| new_df = ( | |
| df_duplicate.explode("Countries") | |
| .rename(columns={"Countries": "Country"}) | |
| .drop_duplicates() | |
| ) | |
| return new_df | |
| def process_france_data( | |
| df: pd.DataFrame, product_id_col: str, class_id_col: str | |
| ) -> pd.DataFrame: | |
| """ | |
| Process data specific to France, handling special cases for item keys. | |
| Args: | |
| df (pd.DataFrame): Input dataframe | |
| product_id_col (str): Name of the product ID column | |
| class_id_col (str): Name of the class ID column | |
| Returns: | |
| pd.DataFrame: Processed dataframe for France | |
| """ | |
| df_france = df[df.Country == "FRA"] | |
| barcodes = df_france[product_id_col].unique() | |
| for barcode in barcodes: | |
| items = df_france.item_key[df_france[product_id_col] == barcode].tolist() | |
| if len(items) == 2: | |
| if "R" in items[0]: | |
| df_france.loc[ | |
| (df_france[product_id_col] == barcode) | |
| & (df_france.item_key == items[0]), | |
| [class_id_col, f"{class_id_col[:-3]}_DESC_FR"], | |
| ] = df_france.loc[ | |
| (df_france[product_id_col] == barcode) | |
| & (df_france.item_key == items[1]), | |
| [class_id_col, f"{class_id_col[:-3]}_DESC_FR"], | |
| ].values | |
| elif "R" in items[1]: | |
| df_france.loc[ | |
| (df_france[product_id_col] == barcode) | |
| & (df_france.item_key == items[1]), | |
| [class_id_col, f"{class_id_col[:-3]}_DESC_FR"], | |
| ] = df_france.loc[ | |
| (df_france[product_id_col] == barcode) | |
| & (df_france.item_key == items[0]), | |
| [class_id_col, f"{class_id_col[:-3]}_DESC_FR"], | |
| ].values | |
| return df_france | |