Carrefourrefbem / App /functions_rupture /functions_gestion.py
COULIBALY Bourahima
update
2c49a88
raw
history blame
12.5 kB
import numpy as np
import pandas as pd
import streamlit as st
from App.utils.priorite_pays import dico
# from App.utils.divers_function import data_cleaning_func
import nltk
from typing import Tuple, List
# Download the NLTK "stopwords" corpus as a side effect of importing this module.
# NOTE(review): nothing in the visible part of this module reads the corpus —
# confirm that a caller relies on it before keeping this import-time download.
nltk.download("stopwords")
def filter_data_with_valid_keys(
    data: pd.DataFrame,
    product_id_col: str,
    class_id_col: str,
    min_product_id_length: int,
    valid_class_id_prefixes: List[str],
) -> pd.DataFrame:
    """
    Filter the dataframe on product ID length and on the class ID's first character.

    Two filters are applied in sequence:

    1. Keep only rows whose product ID is strictly longer than
       ``min_product_id_length`` characters. Rows with a missing product ID
       are dropped because the length comparison is False for them.
    2. Drop rows whose class ID starts with one of the characters listed in
       ``valid_class_id_prefixes``. Despite the parameter name, the listed
       prefixes are the ones being excluded.

    The second filter is best-effort: when it cannot be evaluated, the result
    of the first filter is returned unchanged.

    Args:
        data (pd.DataFrame): Input dataframe
        product_id_col (str): Name of the product ID column (string values)
        class_id_col (str): Name of the class ID column
        min_product_id_length (int): Product IDs must be strictly longer than this
        valid_class_id_prefixes (List[str]): First characters of the class IDs to exclude
    Returns:
        pd.DataFrame: Filtered dataframe
    Raises:
        KeyError: If ``product_id_col`` is not a column of ``data``
        AttributeError: If the product ID column does not hold string values
    """
    long_enough = data[product_id_col].str.len() > min_product_id_length
    filtered_data = data[long_enough]
    try:
        excluded = filtered_data[class_id_col].str[0].isin(valid_class_id_prefixes)
    except (AttributeError, KeyError, TypeError):
        # Best-effort by design: a missing class column (KeyError), a
        # non-string class column (AttributeError) or a non list-like prefix
        # argument (TypeError) skips the prefix filter instead of failing.
        return filtered_data
    return filtered_data[~excluded]
@st.cache_data
def calculate_product_class_matrix(
    data: pd.DataFrame, product_id_col: str, class_id_col: str
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Build the product x class count matrix for products that occur more than once.

    A first cross-tabulation counts the rows of every (product, class) pair.
    Products whose counts sum to 1 are discarded, and the cross-tabulation is
    rebuilt on the remaining rows so that it only contains classes still in use.

    Args:
        data (pd.DataFrame): Input dataframe
        product_id_col (str): Name of the product ID column
        class_id_col (str): Name of the class ID column
    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: A two-column frame
        (``product_id_col``, ``"total_by_product"``) holding the row total of
        each kept product, and the product-class count matrix itself
    """
    # First pass: total number of counted rows per product.
    rows_per_product = pd.crosstab(data[product_id_col], data[class_id_col]).sum(
        axis=1
    )
    repeated_products = rows_per_product.loc[rows_per_product > 1].index

    # Second pass: same cross-tabulation, restricted to the repeated products.
    kept_rows = data[data[product_id_col].isin(repeated_products)]
    matrix = pd.crosstab(kept_rows[product_id_col], kept_rows[class_id_col])
    totals = matrix.sum(axis=1)

    total_by_product_df = pd.DataFrame(
        {product_id_col: totals.index, "total_by_product": totals.values}
    )
    return total_by_product_df, matrix
@st.cache_data
def create_sparse_matrix(
    matrix: pd.DataFrame, product_id_col: str, class_id_col: str
) -> pd.DataFrame:
    """
    Turn the product-class count matrix into a long table of its non-zero cells.

    Args:
        matrix (pd.DataFrame): Product-class matrix (products as index, classes as columns)
        product_id_col (str): Name given to the product ID column of the result
        class_id_col (str): Name given to the class ID column of the result
    Returns:
        pd.DataFrame: One row per non-zero cell, with the product ID and class
        ID converted to strings and the cell value stored in ``"count"``
    """
    cell_counts = matrix.stack()
    present = cell_counts.loc[cell_counts != 0]
    pairs = present.index  # level 0 = matrix index, level 1 = matrix columns
    return pd.DataFrame(
        {
            product_id_col: pairs.get_level_values(0).astype(str),
            class_id_col: pairs.get_level_values(1).astype(str),
            "count": present.values,
        }
    )
@st.cache_data
def process_new_data(
    data: pd.DataFrame, product_id_col: str, class_id_col: str
) -> Tuple[pd.Series, pd.DataFrame]:
    """
    Compute per-class proportions for each product and group countries per pair.

    The (product, class) counts are joined with the per-product totals to
    derive ``Proportion = count / total_by_product``, then joined back onto
    ``data``. The countries of every (class, product) pair are finally
    collected into lists, reading the first of the columns ``"Country"``,
    ``"COUNTRY_KEY"`` or ``"COUNTRY"`` that exists.

    Args:
        data (pd.DataFrame): Input dataframe
        product_id_col (str): Name of the product ID column
        class_id_col (str): Name of the class ID column
    Returns:
        Tuple[pd.Series, pd.DataFrame]: Country lists indexed by
        (``class_id_col``, ``product_id_col``), and the merged dataframe
    Raises:
        KeyError: If none of the accepted country columns is present
    """
    total_by_product_df, matrix = calculate_product_class_matrix(
        data, product_id_col, class_id_col
    )
    sparse_matrix = create_sparse_matrix(matrix, product_id_col, class_id_col)
    merged_data = pd.merge(sparse_matrix, total_by_product_df, on=[product_id_col])
    merged_data["Proportion"] = merged_data["count"] / merged_data["total_by_product"]
    # NOTE(review): create_sparse_matrix returns the key columns as strings, so
    # this join presumably expects `data` to hold them as strings too — confirm.
    final_merged = merged_data.merge(data, on=[class_id_col, product_id_col])

    # Same precedence as before: "Country", then "COUNTRY_KEY", then "COUNTRY".
    country_candidates = ("Country", "COUNTRY_KEY", "COUNTRY")
    country_col = next(
        (name for name in country_candidates if name in final_merged.columns), None
    )
    if country_col is None:
        raise KeyError(
            f"None of the country columns {country_candidates} is present in the data"
        )

    country_groups = final_merged.groupby([class_id_col, product_id_col])[
        country_col
    ].agg(lambda countries: countries.tolist())
    return country_groups, final_merged
def add_country(produit_id: str, class_id: str, Country) -> List[str]:
    """
    Return the entry of ``Country`` stored under the key ``(produit_id, class_id)``.

    The key is built in argument order, so callers must pass the two
    identifiers in the same order as the levels ``Country`` is indexed by.

    Args:
        produit_id (str): First component of the lookup key
        class_id (str): Second component of the lookup key
        Country: Container indexable by a two-element key, e.g. a pd.Series
            of country lists with a two-level index
    Returns:
        List[str]: The value found for the key (a list of countries when
        ``Country`` stores country lists)
    """
    lookup_key = (produit_id, class_id)
    return Country[lookup_key]
def finalize_merged_data(
    merged: pd.DataFrame,
    country_groups: pd.Series,
    product_id_col: str,
    class_id_col: str,
) -> pd.DataFrame:
    """
    Attach the country tuple of each (class, product) pair and drop duplicate rows.

    A ``"Countries"`` column is added to ``merged`` in place; each value is
    the tuple of the countries found in ``country_groups`` for the row's
    class ID and product ID. Rows are then de-duplicated on product ID,
    class ID and ``"Countries"``.

    Args:
        merged (pd.DataFrame): Merged dataframe (gains a "Countries" column)
        country_groups (pd.Series): Country lists indexed by (class ID, product ID)
        product_id_col (str): Name of the product ID column
        class_id_col (str): Name of the class ID column
    Returns:
        pd.DataFrame: De-duplicated dataframe, or None when any step fails
        (the error is then reported with ``st.warning``)
    """
    try:
        # Look the identifiers up by column name rather than by position:
        # positional integer access on a label-indexed row is deprecated in
        # recent pandas and silently depends on the column order of `merged`.
        countries = [
            tuple(country_groups[class_id, product_id])
            for class_id, product_id in zip(
                merged[class_id_col], merged[product_id_col]
            )
        ]
        # Tuples (unlike lists) are hashable, which drop_duplicates requires.
        merged["Countries"] = pd.Series(countries, index=merged.index, dtype=object)
        final_merged = merged.drop_duplicates(
            subset=[product_id_col, class_id_col, "Countries"]
        )
    except Exception as e:
        # Deliberate best-effort: surface the problem in the UI and let the
        # caller deal with a None result.
        st.warning(f"An error occurred: {e}")
        final_merged = None
    return final_merged
def filter_by_country_and_proportion(
    merged_data: pd.DataFrame,
    min_countries: int,
    min_proportion: float,
    product_id_col: str,
) -> pd.DataFrame:
    """
    Keep every row of the products that have at least one qualifying row.

    A row qualifies when its ``Proportion`` is at least ``min_proportion``
    and its ``total_by_product`` is at least ``min_countries``. All rows of a
    product with a qualifying row are returned, including the rows that do
    not qualify themselves.

    Args:
        merged_data (pd.DataFrame): Merged dataframe with "Proportion" and
            "total_by_product" columns
        min_countries (int): Lower bound applied to "total_by_product"
        min_proportion (float): Lower bound applied to "Proportion"
        product_id_col (str): Name of the product ID column
    Returns:
        pd.DataFrame: Rows of ``merged_data`` belonging to the selected products
    """
    meets_proportion = merged_data["Proportion"] >= min_proportion
    meets_total = merged_data["total_by_product"] >= min_countries
    selected_products = merged_data.loc[
        meets_proportion & meets_total, product_id_col
    ].unique()
    belongs_to_selection = merged_data[product_id_col].isin(selected_products)
    return merged_data[belongs_to_selection]
def process_country_priority(
    merged_data: pd.DataFrame, product_id_col: str
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Weight the tied products by country priority and split them by tie status.

    Products are selected when at least one of their rows has
    ``Proportion == 0.5`` and ``total_by_product >= 2``. Every row of those
    products receives a ``"Weight"``: the sum of ``dico[country]`` over the
    row's ``"Countries"``. Rows that share their (product, Weight) pair with
    another row and have ``Proportion == 0.5`` form the "equal" set; all
    remaining rows form the "non-equal" set.

    Args:
        merged_data (pd.DataFrame): Merged dataframe with "Proportion",
            "total_by_product" and "Countries" columns (left unmodified)
        product_id_col (str): Name of the product ID column
    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: Processed dataframes
        (all selected rows, equal weight, non-equal weight)
    Raises:
        KeyError: If a country of "Countries" has no entry in ``dico``
    """
    is_tied = (merged_data["Proportion"] == 0.5) & (
        merged_data["total_by_product"] >= 2
    )
    product_keys = merged_data.loc[is_tied, product_id_col].unique()

    # Work on a copy so that adding "Weight" neither writes into a slice of
    # the caller's frame nor triggers SettingWithCopyWarning.
    df = merged_data[merged_data[product_id_col].isin(product_keys)].copy()
    df["Weight"] = df["Countries"].apply(
        lambda countries: sum(dico[country] for country in countries)
    )

    shares_weight = df.duplicated(subset=[product_id_col, "Weight"], keep=False)
    equal_mask = shares_weight & (df["Proportion"] == 0.5)
    df_equal = df[equal_mask]
    # Exact complement of df_equal. The previous cell-wise isin()/dropna()
    # version also discarded any row holding a missing value in an unrelated
    # column and turned integer columns into floats.
    df_not_equal = df[~equal_mask]
    return df, df_equal, df_not_equal
def merge_final_data(
    original_data: pd.DataFrame,
    new_data: pd.DataFrame,
    product_id_col: str,
    class_id_col: str,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Apply the re-classified rows onto the original data and list what changed.

    ``new_data`` is left-joined onto ``original_data`` on "COUNTRY_KEY" and
    the product ID. Where a match exists, its class ID and French description
    replace the original ones. The description column name is derived from
    ``class_id_col`` by removing its last four characters and appending
    "_DESC_FR".

    Args:
        original_data (pd.DataFrame): Original dataframe
        new_data (pd.DataFrame): New processed dataframe
        product_id_col (str): Name of the product ID column
        class_id_col (str): Name of the class ID column
    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: The rows that differ between the
        original and the updated data, labelled "Before" / "After" in a
        "Changes" column, and the updated data restricted to product ID,
        "COUNTRY_KEY", class ID and description
    """
    desc_col = f"{class_id_col[:-4]}_DESC_FR"
    key_cols = [product_id_col, "COUNTRY_KEY", class_id_col, desc_col]

    merged_df = pd.merge(
        original_data,
        new_data,
        on=["COUNTRY_KEY", product_id_col],
        how="left",
        suffixes=("", "_y"),
    )
    # Prefer the value coming from new_data, fall back to the original one.
    for column in (class_id_col, desc_col):
        merged_df[column] = merged_df[f"{column}_y"].fillna(merged_df[column])
    df_final = merged_df[key_cols]

    # An outer join with an indicator exposes the rows present on one side only.
    comparison = pd.merge(original_data, df_final, how="outer", indicator=True)
    data_final = (
        comparison[comparison["_merge"] != "both"]
        .rename(columns={"_merge": "Changes"})
        .sort_values(by=[product_id_col], ascending=True)
    )
    data_final["Changes"] = data_final["Changes"].apply(
        lambda side: "Before" if side == "left_only" else "After"
    )
    data_final = data_final[key_cols + ["Changes"]].drop_duplicates()
    return data_final, df_final
def process_non_equal_data(
    df_not_equal: pd.DataFrame, product_id_col: str, class_id_col: str
) -> pd.DataFrame:
    """
    Give every multi-country row the classification of its product's heaviest row.

    Only rows whose ``"Countries"`` holds more than one country are kept.
    Within each product, the row with the highest ``"Weight"`` provides the
    class ID and French description that are written onto all rows of that
    product. The description column name is derived from ``class_id_col`` by
    removing its last four characters and appending "_DESC_FR".

    Args:
        df_not_equal (pd.DataFrame): Dataframe with non-equal weights; needs
            "Countries" and "Weight" columns (left unmodified)
        product_id_col (str): Name of the product ID column
        class_id_col (str): Name of the class ID column
    Returns:
        pd.DataFrame: Processed dataframe with the selected classifications,
        the countries joined into a comma-separated ``"Country"`` string, and
        duplicate rows removed
    """
    desc_col = f"{class_id_col[:-4]}_DESC_FR"

    has_several_countries = df_not_equal["Countries"].apply(len) > 1
    # Copy so the assignments below never write into the caller's frame.
    df_multi_country = df_not_equal[has_several_countries].copy()

    # One index label per product: the row carrying the highest weight.
    max_weight_index = df_multi_country.groupby(product_id_col)["Weight"].idxmax()
    winners = df_multi_country.loc[max_weight_index].set_index(product_id_col)

    # Propagate the winning values by product ID. Assigning the winners array
    # positionally only lines up when there is exactly one row per product
    # (or a single product); with several rows per product it fails or
    # misaligns.
    for column in (class_id_col, desc_col):
        df_multi_country[column] = df_multi_country[product_id_col].map(
            winners[column]
        )

    df_multi_country["Countries"] = df_multi_country["Countries"].str.join(",")
    # After the join each value is a plain string, so explode() leaves the
    # rows unchanged; it is kept to preserve the existing output.
    # NOTE(review): confirm whether one row per country was intended here.
    new_df = (
        df_multi_country.explode("Countries")
        .rename(columns={"Countries": "Country"})
        .drop_duplicates()
    )
    return new_df
def process_france_data(
    df: pd.DataFrame, product_id_col: str, class_id_col: str
) -> pd.DataFrame:
    """
    Align the classification of paired French items that share a product ID.

    Only rows with ``Country == "FRA"`` are processed. For every product ID
    that has exactly two French rows, the row whose ``item_key`` contains the
    letter "R" receives the class ID and French description of the other
    row. When both item keys contain "R", the first row is the one updated;
    when neither does, the pair is left untouched.

    The description column name is derived from ``class_id_col`` by removing
    its last four characters and appending "_DESC_FR", as in the other
    functions of this module.

    Args:
        df (pd.DataFrame): Input dataframe with "Country" and "item_key"
            columns (left unmodified)
        product_id_col (str): Name of the product ID column
        class_id_col (str): Name of the class ID column
    Returns:
        pd.DataFrame: Processed dataframe for France
    """
    # The slice used here was [:-3], which yields a different column name
    # than the [:-4] used everywhere else in this module for the same column.
    desc_col = f"{class_id_col[:-4]}_DESC_FR"
    class_cols = [class_id_col, desc_col]

    # Copy so the .loc assignments below do not write into a slice of `df`.
    df_france = df[df["Country"] == "FRA"].copy()

    for barcode in df_france[product_id_col].unique():
        same_barcode = df_france[product_id_col] == barcode
        items = df_france.loc[same_barcode, "item_key"].tolist()
        if len(items) != 2:
            continue

        # NOTE(review): the meaning of an "R" inside item_key is not visible
        # here; the item carrying it is the one that gets overwritten.
        if "R" in items[0]:
            target_item, source_item = items[0], items[1]
        elif "R" in items[1]:
            target_item, source_item = items[1], items[0]
        else:
            continue

        target_rows = same_barcode & (df_france["item_key"] == target_item)
        source_rows = same_barcode & (df_france["item_key"] == source_item)
        df_france.loc[target_rows, class_cols] = df_france.loc[
            source_rows, class_cols
        ].values
    return df_france