# Importing necessary libraries
import streamlit as st

st.set_page_config(
    page_title="Data Import",
    page_icon="⚖️",
    layout="wide",
    initial_sidebar_state="collapsed",
)

import re
import sys
import pickle
import numbers
import traceback
import pandas as pd
from scenario import numerize
from post_gres_cred import db_cred
from collections import OrderedDict
from log_application import log_message
from utilities import set_header, load_local_css, update_db, project_selection
from constants import (
    upload_rows_limit,
    upload_column_limit,
    word_length_limit_lower,
    word_length_limit_upper,
    minimum_percent_overlap,
    minimum_row_req,
    percent_drop_col_threshold,
)

schema = db_cred["schema"]

load_local_css("styles.css")
set_header()
# Initialize project name session state
if "project_name" not in st.session_state:
    st.session_state["project_name"] = None

# Fetch project dictionary
if "project_dct" not in st.session_state:
    project_selection()
    st.stop()

# Display username and project name
if "username" in st.session_state and st.session_state["username"] is not None:
    cols1 = st.columns([2, 1])
    with cols1[0]:
        st.markdown(f"**Welcome {st.session_state['username']}**")
    with cols1[1]:
        st.markdown(f"**Current Project: {st.session_state['project_name']}**")

# Initialize session state keys
if "granularity_selection_key" not in st.session_state:
    st.session_state["granularity_selection_key"] = st.session_state["project_dct"][
        "data_import"
    ]["granularity_selection"]

# Function to format a name for display
def name_format_func(name):
    return str(name).strip().title()


# Function to get columns with the specified prefix, with the prefix removed
def get_columns_with_prefix(df, prefix):
    return [
        col.replace(prefix, "")
        for col in df.columns
        if col.startswith(prefix) and str(col) != str(prefix)
    ]
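
# Illustrative example (hypothetical columns; not executed by the app):
#   df = pd.DataFrame(columns=["spends_facebook", "spends_tv", "date"])
#   get_columns_with_prefix(df, "spends_")  # -> ["facebook", "tv"]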

# Function to fetch columns info
def fetch_columns(gold_layer_df, data_upload_df):
    # Get lists of columns starting with 'spends_' and 'response_metric_' from gold_layer_df
    spends_columns_gold_layer = get_columns_with_prefix(gold_layer_df, "spends_")
    response_metric_columns_gold_layer = get_columns_with_prefix(
        gold_layer_df, "response_metric_"
    )

    # Get lists of columns starting with 'spends_' and 'response_metric_' from data_upload_df
    spends_columns_upload = get_columns_with_prefix(data_upload_df, "spends_")
    response_metric_columns_upload = get_columns_with_prefix(
        data_upload_df, "response_metric_"
    )

    # Combine lists from both DataFrames
    spends_columns = spends_columns_gold_layer + spends_columns_upload

    # Drop columns ending with '_total' and de-duplicate the spends list
    spends_columns = list(
        set([col for col in spends_columns if not col.endswith("_total")])
    )

    response_metric_columns = (
        response_metric_columns_gold_layer + response_metric_columns_upload
    )

    # Keep only columns ending with '_total' and strip the '_total' suffix
    response_metric_columns = list(
        set(
            [
                col[:-6]
                for col in response_metric_columns
                if col.endswith("_total") and len(col[:-6]) != 0
            ]
        )
    )

    # Get lists of all columns from both DataFrames
    gold_layer_columns = list(gold_layer_df.columns)
    data_upload_columns = list(data_upload_df.columns)

    # Combine all columns and keep unique names
    all_columns = list(set(gold_layer_columns + data_upload_columns))

    return (
        spends_columns,
        response_metric_columns,
        all_columns,
        gold_layer_columns,
        data_upload_columns,
    )
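
# Illustrative example (hypothetical columns): with gold layer columns
# ["spends_facebook", "response_metric_revenue_total"] and uploaded columns
# ["spends_tv", "date", "panel"], fetch_columns returns
# spends_columns = ["facebook", "tv"] and response_metric_columns = ["revenue"].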

# Function to format values for display
def format_values_for_display(values_list):
    # Format each value
    formatted_list = [value.lower().strip() for value in values_list]

    # Join values with commas and 'and' before the last value
    if len(formatted_list) > 1:
        return ", ".join(formatted_list[:-1]) + ", and " + formatted_list[-1]
    elif formatted_list:
        return formatted_list[0]
    return "No values available"
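
# Illustrative example:
#   format_values_for_display(["Facebook ", "TV", "Radio"])
#   -> "facebook, tv, and radio"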

# Function to validate the input DataFrame
def valid_input_df(
    df,
    spends_columns,
    response_metric_columns,
    total_columns,
    gold_layer_columns,
    data_upload_columns,
):
    # Check if DataFrame is empty
    if df.empty:
        return (True, None)

    # Check for invalid column names
    invalid_columns = [
        col
        for col in df.columns
        if not re.match(r"^[A-Za-z0-9_]+$", col)
        or not (word_length_limit_lower <= len(col) <= word_length_limit_upper)
    ]
    if invalid_columns:
        return (
            False,
            f"Invalid column names: {format_values_for_display(invalid_columns)}. Use only letters, numbers, and underscores. Column names should be {word_length_limit_lower} to {word_length_limit_upper} characters long.",
        )

    # Ensure 'panel' column values are strings that conform to the allowed pattern and length
    if "panel" in df.columns:
        df["panel"] = df["panel"].astype(str).str.strip()
        invalid_panel_values = [
            val
            for val in df["panel"].unique()
            if not re.match(r"^[A-Za-z0-9_]+$", val)
            or not (word_length_limit_lower <= len(val) <= word_length_limit_upper)
        ]
        if invalid_panel_values:
            return (
                False,
                f"Invalid panel values: {format_values_for_display(invalid_panel_values)}. Use only letters, numbers, and underscores. Panel names should be {word_length_limit_lower} to {word_length_limit_upper} characters long.",
            )

    # Check for missing required columns
    required_columns = ["date", "panel"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return (
            False,
            f"Missing compulsory columns: {format_values_for_display(missing_columns)}.",
        )

    # Check that all other columns are numeric
    non_numeric_columns = [
        col
        for col in df.columns
        if col not in required_columns and not pd.api.types.is_numeric_dtype(df[col])
    ]
    if non_numeric_columns:
        return (
            False,
            f"Non-numeric columns: {format_values_for_display(non_numeric_columns)}. All columns except {format_values_for_display(required_columns)} should be numeric.",
        )

    # Ensure all columns in data_upload_columns are unique
    duplicate_columns_in_upload = [
        col for col in data_upload_columns if data_upload_columns.count(col) > 1
    ]
    if duplicate_columns_in_upload:
        return (
            False,
            f"Duplicate columns found in the uploaded data: {format_values_for_display(set(duplicate_columns_in_upload))}.",
        )

    # Convert 'date' column to datetime format
    try:
        df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
    except Exception:
        return False, "The 'date' column is not in the correct format 'YYYY-MM-DD'."

    # Check date frequency per panel
    unique_panels = df["panel"].unique()
    for panel in unique_panels:
        date_diff = df[df["panel"] == panel]["date"].sort_values().diff().dropna()
        if not (
            (date_diff == pd.Timedelta(days=1)).all()
            or (date_diff == pd.Timedelta(weeks=1)).all()
        ):
            return False, "The 'date' column does not have a daily or weekly frequency."

    # Check for null values in 'date' or 'panel' columns
    if df[required_columns].isnull().any().any():
        return (
            False,
            f"The {format_values_for_display(required_columns)} columns should not contain null values.",
        )

    # Check for panels whose dates overlap less than the minimum required percentage
    # NOTE: relies on the module-level gold_layer_df rather than a parameter
    if not gold_layer_df.empty:
        panels_with_low_overlap = []
        unique_panels = list(
            set(df["panel"].unique()).union(set(gold_layer_df["panel"].unique()))
        )
        for panel in unique_panels:
            gold_layer_dates = set(
                gold_layer_df[gold_layer_df["panel"] == panel]["date"]
            )
            data_upload_dates = set(df[df["panel"] == panel]["date"])
            if gold_layer_dates and data_upload_dates:
                overlap = len(gold_layer_dates & data_upload_dates) / len(
                    gold_layer_dates | data_upload_dates
                )
            else:
                overlap = 0
            if overlap < (minimum_percent_overlap / 100):
                panels_with_low_overlap.append(panel)
        if panels_with_low_overlap:
            return (
                False,
                f"Date columns in the gold layer and uploaded data do not have at least {minimum_percent_overlap}% overlap for panels: {format_values_for_display(panels_with_low_overlap)}.",
            )

    # Require at least two spends columns
    if len(spends_columns) < 2:
        return False, "Please add at least two spends columns."

    # Require at least one response metric column
    if len(response_metric_columns) < 1:
        return False, "Please add response metric columns."

    # Check that all numeric columns are non-negative, except those starting with 'exogenous_' or 'internal_'
    valid_prefixes = ["exogenous_", "internal_"]
    negative_values_columns = [
        col
        for col in df.select_dtypes(include=[float, int]).columns
        if not any(col.startswith(prefix) for prefix in valid_prefixes)
        and (df[col] < 0).any()
    ]
    if negative_values_columns:
        return (
            False,
            f"Negative values detected in columns: {format_values_for_display(negative_values_columns)}. Ensure all media and response metric columns are positive.",
        )

    # Check for columns that do not match any recognized naming pattern
    detected_channels = spends_columns + ["total"]
    unassociated_columns = []
    for col in df.columns:
        if (col.startswith("_") or col.endswith("_")) or not (
            col.startswith("exogenous_")  # "exogenous_<variable_name>"
            or col.startswith("internal_")  # "internal_<variable_name>"
            or any(
                col == f"spends_{channel}" for channel in spends_columns
            )  # "spends_<channel>"
            or any(
                col == f"response_metric_{metric}_{channel}"
                for metric in response_metric_columns
                for channel in detected_channels
            )  # "response_metric_<metric>_<channel>"
            or any(
                col.startswith("media_")
                and col.endswith(f"_{channel}")
                and len(col) > len(f"media__{channel}")
                for channel in spends_columns
            )  # "media_<media_variable_name>_<channel>"
            or col in ["date", "panel"]
        ):
            unassociated_columns.append(col)
    if unassociated_columns:
        return (
            False,
            f"Columns with incorrect format detected: {format_values_for_display(unassociated_columns)}.",
        )

    return True, "The data is valid and meets all requirements."
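
# Illustrative outcomes (hypothetical inputs): a DataFrame missing the 'panel'
# column yields (False, "Missing compulsory columns: panel."), while a fully
# conforming DataFrame yields (True, "The data is valid and meets all requirements.").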

# Function to load and transform the uploaded Excel file
def load_and_transform_data(uploaded_file):
    # Load the uploaded file into a DataFrame if a file is uploaded
    if uploaded_file is not None:
        df = pd.read_excel(uploaded_file)
    else:
        df = pd.DataFrame()
        return df

    # Check if the DataFrame exceeds the row or column limits
    if len(df) > upload_rows_limit or len(df.columns) > upload_column_limit:
        st.warning(
            f"Data exceeds the row limit of {numerize(upload_rows_limit)} or column limit of {numerize(upload_column_limit)}. Please upload a smaller file.",
            icon="⚠️",
        )

        # Log message
        log_message(
            "warning",
            f"Data exceeds the row limit of {numerize(upload_rows_limit)} or column limit of {numerize(upload_column_limit)}. Please upload a smaller file.",
            "Data Import",
        )

        return pd.DataFrame()

    # If the DataFrame contains only 'panel' and 'date' columns, return an empty DataFrame
    if set(df.columns) == {"date", "panel"}:
        return pd.DataFrame()

    # Transform column names: lowercase, strip whitespace, replace spaces with underscores
    df.columns = [str(col).strip().lower().replace(" ", "_") for col in df.columns]

    # If a 'panel' column exists, clean its values
    try:
        if "panel" in df.columns:
            df["panel"] = (
                df["panel"].astype(str).str.lower().str.strip().str.replace(" ", "_")
            )
    except Exception:
        return df

    try:
        df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
    except Exception:
        # The 'date' column is not in the correct format 'YYYY-MM-DD'
        return df

    # Check date frequency and convert to daily if needed
    date_diff = df["date"].diff().dropna()
    if (date_diff == pd.Timedelta(days=1)).all():
        # Data is already at daily level
        return df
    elif (date_diff == pd.Timedelta(weeks=1)).all():
        # Data is at weekly level; convert to daily by splitting each week into 7 days
        weekly_data = df.copy()
        daily_data = []
        for index, row in weekly_data.iterrows():
            week_start = row["date"] - pd.to_timedelta(row["date"].weekday(), unit="D")
            for i in range(7):
                daily_date = week_start + pd.DateOffset(days=i)
                new_row = row.copy()
                new_row["date"] = daily_date
                for col in df.columns:
                    if isinstance(new_row[col], numbers.Number):
                        new_row[col] = new_row[col] / 7
                daily_data.append(new_row)
        daily_data_df = pd.DataFrame(daily_data)
        return daily_data_df
    else:
        # The 'date' column does not have a daily or weekly frequency
        return df
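
# Illustrative arithmetic (hypothetical row): a weekly row dated 2024-08-05 with
# spends_facebook = 700 expands to seven daily rows (2024-08-05 .. 2024-08-11),
# each carrying spends_facebook = 100, so weekly totals are preserved.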

# Function to merge the DataFrames if present
def merge_dataframes(gold_layer_df, data_upload_df):
    if gold_layer_df.empty and data_upload_df.empty:
        return pd.DataFrame()

    if not gold_layer_df.empty and not data_upload_df.empty:
        # Merge gold_layer_df and data_upload_df on 'panel' and 'date'
        merged_df = pd.merge(
            gold_layer_df,
            data_upload_df,
            on=["panel", "date"],
            how="outer",
            suffixes=("_gold", "_upload"),
        )

        # Handle duplicate columns
        for col in merged_df.columns:
            if col.endswith("_gold"):
                base_col = col[:-5]  # Remove '_gold' suffix
                upload_col = base_col + "_upload"  # Column name in data_upload_df

                if upload_col in merged_df.columns:
                    # Prefer values from data_upload_df
                    merged_df[base_col] = merged_df[upload_col].combine_first(
                        merged_df[col]
                    )
                    merged_df.drop(columns=[col, upload_col], inplace=True)
                else:
                    # Rename the column to remove the suffix
                    merged_df.rename(columns={col: base_col}, inplace=True)
    elif data_upload_df.empty:
        merged_df = gold_layer_df.copy()
    elif gold_layer_df.empty:
        merged_df = data_upload_df.copy()

    return merged_df
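
# Illustrative example (hypothetical overlap): if both frames carry
# spends_facebook for 2024-08-05 / panel "aggregated", the merged frame keeps
# the uploaded value and falls back to the gold layer value only where the
# uploaded value is null.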

# Function to check whether all required columns are present in the uploaded DataFrame
def check_required_columns(df, detected_channels, detected_response_metric):
    required_columns = []

    # Add a 'spends_' column for every detected channel
    for channel in detected_channels:
        required_columns.append(f"spends_{channel}")

    # Add a 'response_metric_' column for every detected response metric and channel
    for response_metric in detected_response_metric:
        for channel in detected_channels + ["total"]:
            required_columns.append(f"response_metric_{response_metric}_{channel}")

    # Check for missing columns
    missing_columns = [col for col in required_columns if col not in df.columns]

    # Channel groupings
    no_media_data = []
    channel_columns_dict = {}
    for channel in detected_channels:
        channel_columns = [
            col
            for col in df.columns
            if channel in col
            and not (
                col.startswith("response_metric_")
                or col.startswith("exogenous_")
                or col.startswith("internal_")
            )
            and col.endswith(channel)
        ]
        channel_columns_dict[channel] = channel_columns
        if len(channel_columns) <= 1:
            no_media_data.append(channel)

    return missing_columns, no_media_data, channel_columns_dict
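
# Illustrative example (hypothetical input): with detected_channels = ["facebook"]
# and detected_response_metric = ["revenue"], the required columns are
# spends_facebook, response_metric_revenue_facebook, and
# response_metric_revenue_total; "facebook" lands in no_media_data if
# spends_facebook is its only associated column.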

# Function to prepare the tool DataFrame
def prepare_tool_df(merged_df, granularity_selection):
    # Drop all response metric columns that do not end with '_total'
    cols_to_drop = [
        col
        for col in merged_df.columns
        if col.startswith("response_metric_") and not col.endswith("_total")
    ]

    # Create the DataFrame to be used by the tool
    tool_df = merged_df.drop(columns=cols_to_drop)

    # Convert to weekly granularity by aggregating all data for a given panel and week
    if granularity_selection.lower() == "weekly":
        tool_df.set_index("date", inplace=True)
        tool_df = (
            tool_df.groupby(
                [pd.Grouper(freq="W-MON", closed="left", label="left"), "panel"]
            )
            .sum()
            .reset_index()
        )

    return tool_df
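
# Illustrative grouping (hypothetical rows): with weekly granularity, daily rows
# for 2024-08-05 .. 2024-08-11 and panel "aggregated" are summed into a single
# row labelled 2024-08-05, matching the Monday-start weeks described in the
# upload guidelines.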

# Function to generate the imputation DataFrame
def generate_imputation_df(tool_df):
    # Initialize lists to store the column details
    column_names = []
    categories = []
    missing_values_info = []
    zero_values_info = []
    imputation_methods = []

    # Calculate the count and percentage of missing values in a series
    def calculate_missing_percentage(series):
        return series.isnull().sum(), (series.isnull().mean() * 100)

    # Calculate the count and percentage of zero values in a series
    def calculate_zero_percentage(series):
        return (series == 0).sum(), ((series == 0).mean() * 100)

    # Iterate over each column to categorize it and compute missing and zero values
    for col in tool_df.columns:
        # Determine the category based on the column name prefix
        if col == "date" or col == "panel":
            continue
        elif col.startswith("response_metric_"):
            categories.append("Response Metrics")
        elif col.startswith("spends_"):
            categories.append("Spends")
        elif col.startswith("exogenous_"):
            categories.append("Exogenous")
        elif col.startswith("internal_"):
            categories.append("Internal")
        else:
            categories.append("Media")

        # Calculate missing value count and percentage
        missing_count, missing_percentage = calculate_missing_percentage(tool_df[col])
        missing_values_info.append(f"{missing_count} ({missing_percentage:.1f}%)")

        # Calculate zero value count and percentage
        zero_count, zero_percentage = calculate_zero_percentage(tool_df[col])
        zero_values_info.append(f"{zero_count} ({zero_percentage:.1f}%)")

        # Determine the default imputation method
        if col.startswith("spends_"):
            imputation_methods.append("Fill with 0")
        elif col.startswith("response_metric_"):
            imputation_methods.append("Fill with Mean")
        elif zero_percentage + missing_percentage > percent_drop_col_threshold:
            imputation_methods.append("Drop Column")
        else:
            imputation_methods.append("Fill with Mean")

        column_names.append(col)

    # Create the DataFrame
    imputation_df = pd.DataFrame(
        {
            "Column Name": column_names,
            "Category": categories,
            "Missing Values": missing_values_info,
            "Zero Values": zero_values_info,
            "Imputation Method": imputation_methods,
        }
    )

    # Define the category order for sorting
    category_order = {
        "Response Metrics": 1,
        "Spends": 2,
        "Media": 3,
        "Exogenous": 4,
        "Internal": 5,
    }

    # Add a temporary column for sorting based on the category order
    imputation_df["Category Order"] = imputation_df["Category"].map(category_order)

    # Sort the DataFrame by category order, then drop the temporary column
    imputation_df = imputation_df.sort_values(
        by=["Category Order", "Column Name"]
    ).drop(columns=["Category Order"])

    return imputation_df
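
# Illustrative row (hypothetical values, assuming percent_drop_col_threshold = 60):
# a media column with 10% missing and 55% zeros (65% combined) would default to
# "Drop Column", while spends_ columns always default to "Fill with 0".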

# Function to perform imputation as requested by the user
def perform_imputation(imputation_df, tool_df):
    # Detect channels associated with spends
    detected_channels = [
        col.replace("spends_", "")
        for col in tool_df.columns
        if col.startswith("spends_")
    ]

    # Create a dictionary mapping each channel to its associated columns
    group_dict = {
        channel: [
            col
            for col in tool_df.columns
            if channel in col
            and not (
                col.startswith("response_metric_")
                or col.startswith("exogenous_")
                or col.startswith("internal_")
            )
        ]
        for channel in detected_channels
    }

    # Create a reverse dictionary mapping each column to its channel
    column_to_channel_dict = {
        col: channel for channel, cols in group_dict.items() for col in cols
    }

    # Perform imputation
    already_dropped = []
    for index, row in imputation_df.iterrows():
        col_name = row["Column Name"]
        impute_method = row["Imputation Method"]

        # Skip columns that were already dropped
        if col_name in already_dropped:
            continue

        # Refuse to drop response metric columns
        if impute_method == "Drop Column" and col_name.startswith("response_metric_"):
            return None, {}, f"Cannot drop response metric column: {col_name}"

        # Drop the column if requested
        if impute_method == "Drop Column":
            # If a spends column is dropped, drop all related columns
            if col_name.startswith("spends_"):
                tool_df.drop(
                    columns=group_dict[col_name.replace("spends_", "")],
                    inplace=True,
                )
                already_dropped += group_dict[col_name.replace("spends_", "")]
                del group_dict[col_name.replace("spends_", "")]
            else:
                tool_df.drop(columns=[col_name], inplace=True)
                if not (
                    col_name.startswith("exogenous_")
                    or col_name.startswith("internal_")
                ):
                    group_name = column_to_channel_dict[col_name]
                    group_dict[group_name].remove(col_name)

                    # Warn if a channel is left with one or fewer associated columns
                    if len(group_dict[group_name]) <= 1:
                        return (
                            None,
                            {},
                            f"No media variable associated with category {col_name.replace('spends_', '')}.",
                        )
            continue

        # Check each panel
        for panel in tool_df["panel"].unique():
            panel_df = tool_df[tool_df["panel"] == panel]

            # Check if the column is entirely null or empty for the current panel
            if panel_df[col_name].isnull().all():
                if impute_method in ["Fill with Mean", "Fill with Median"]:
                    return (
                        None,
                        {},
                        f"Cannot impute empty column(s) with mean or median. Select 'Fill with 0'. Details: Panel: {panel}, Column: {col_name}",
                    )

        # Fill missing values as requested
        if impute_method == "Fill with Mean":
            tool_df[col_name] = tool_df.groupby("panel")[col_name].transform(
                lambda x: x.fillna(x.mean())
            )
        elif impute_method == "Fill with Median":
            tool_df[col_name] = tool_df.groupby("panel")[col_name].transform(
                lambda x: x.fillna(x.median())
            )
        elif impute_method == "Fill with 0":
            tool_df[col_name] = tool_df[col_name].fillna(0)

    # Ensure the final DataFrame has at least one response metric and two spends categories
    response_metrics = [
        col for col in tool_df.columns if col.startswith("response_metric_")
    ]
    spends_categories = [col for col in tool_df.columns if col.startswith("spends_")]
    if len(response_metrics) < 1:
        return (None, {}, "The final DataFrame must have at least one response metric.")
    if len(spends_categories) < 2:
        return (
            None,
            {},
            "The final DataFrame must have at least two spends categories.",
        )

    return tool_df, group_dict, "Imputed Successfully!"
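
# Illustrative flow (hypothetical selections): choosing "Drop Column" for
# spends_facebook also drops associated columns such as media_clicks_facebook
# via group_dict, while "Fill with Mean" on media_clicks_tv fills nulls with
# the per-panel mean.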

# Function to display groups with custom styling
def display_groups(input_dict):
    # Define custom CSS for a pastel orange rounded rectangle
    custom_css = """
    <style>
    .group-box {
        background-color: #ffdaab;
        border-radius: 10px;
        padding: 10px;
        margin: 5px 0;
    }
    </style>
    """
    st.markdown(custom_css, unsafe_allow_html=True)

    for group_name, values in input_dict.items():
        group_html = f"<div class='group-box'><strong>{group_name}:</strong> {format_values_for_display(values)}</div>"
        st.markdown(group_html, unsafe_allow_html=True)

# Function to categorize columns and create an ordered dictionary
def create_ordered_category_dict(df):
    category_dict = {
        "Response Metrics": [],
        "Spends": [],
        "Media": [],
        "Exogenous": [],
        "Internal": [],
    }

    # Define the category order for sorting
    category_order = {
        "Response Metrics": 1,
        "Spends": 2,
        "Media": 3,
        "Exogenous": 4,
        "Internal": 5,
    }

    for column in df.columns:
        if column == "date" or column == "panel":
            continue  # Skip 'date' and 'panel' columns

        if column.startswith("response_metric_"):
            category_dict["Response Metrics"].append(column)
        elif column.startswith("spends_"):
            category_dict["Spends"].append(column)
        elif column.startswith("exogenous_"):
            category_dict["Exogenous"].append(column)
        elif column.startswith("internal_"):
            category_dict["Internal"].append(column)
        else:
            category_dict["Media"].append(column)

    # Sort the dictionary based on the defined category order
    sorted_category_dict = OrderedDict(
        sorted(category_dict.items(), key=lambda item: category_order[item[0]])
    )

    return sorted_category_dict
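
# Illustrative output (hypothetical columns): for
# ["response_metric_revenue_total", "spends_tv", "media_clicks_tv"], the
# ordered dict is {"Response Metrics": ["response_metric_revenue_total"],
# "Spends": ["spends_tv"], "Media": ["media_clicks_tv"], "Exogenous": [],
# "Internal": []}.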

try:
    # Page Title
    st.title("Data Import")

    # Create file uploader
    uploaded_file = st.file_uploader(
        "Upload Data", type=["xlsx"], accept_multiple_files=False
    )

    # Expander with markdown for upload rules
    with st.expander("Upload Rules and Guidelines"):
        st.markdown(
            """
            ### Upload Guidelines
            Please ensure your data adheres to the following rules:

            1. **File Format**:
               - Upload all data in a single Excel file.

            2. **Compulsory Columns**:
               - **Date**: Must be in the format `YYYY-MM-DD` only.
               - **Panel**: If no panel data exists, use `aggregated` as a single panel.

            3. **Column Naming Conventions**:
               - All columns should start with the associated category prefix.

               **Examples**:
               - **Response Metric Column**:
                 - Format: `response_metric_<response_metric_name>_<channel_name>`
                 - Example: `response_metric_revenue_facebook`
               - **Total Response Metric**:
                 - Format: `response_metric_<response_metric_name>_total`
                 - Example: `response_metric_revenue_total`
               - **Spend Column**:
                 - Format: `spends_<channel_name>`
                 - Example: `spends_facebook`
               - **Media Column**:
                 - Format: `media_<media_variable_name>_<channel_name>`
                 - Example: `media_clicks_facebook`
               - **Exogenous Column**:
                 - Format: `exogenous_<variable_name>`
                 - Example: `exogenous_unemployment_rate`
               - **Internal Column**:
                 - Format: `internal_<variable_name>`
                 - Example: `internal_discount`

            **Notes**:
            - The `total` response metric should represent the total for a particular date and panel, including all channels and organic contributions.
            - The `date` column for weekly data should be the Monday of that week, representing the data from that Monday to the following Sunday. Example: if the week runs from Monday, August 5th, 2024 to Sunday, August 11th, 2024, the date column for that week should display 2024-08-05.
            """
        )

    # Upload warning placeholder
    upload_warning_placeholder = st.container()

    # Load the uploaded file into a DataFrame if a file is uploaded
    data_upload_df = load_and_transform_data(uploaded_file)

    # Columns for user input
    granularity_col, validate_process_col = st.columns(2)

    # Dropdown for data granularity
    granularity_selection = granularity_col.selectbox(
        "Select data granularity",
        options=["daily", "weekly"],
        format_func=name_format_func,
        key="granularity_selection_key",
    )

    # Gold Layer DataFrame
    gold_layer_df = st.session_state["project_dct"]["data_import"]["gold_layer_df"]
    if not gold_layer_df.empty:
        st.subheader("Gold Layer DataFrame")
        with st.expander("Gold Layer DataFrame"):
            st.dataframe(
                gold_layer_df,
                hide_index=True,
                column_config={
                    "date": st.column_config.DateColumn("date", format="YYYY-MM-DD")
                },
            )
    else:
        st.info(
            "No gold layer data is selected for this project. Please upload data manually.",
            icon="📊",
        )

    # Check input data
    with validate_process_col:
        st.write("##")  # Padding

    if validate_process_col.button("Validate and Process", use_container_width=True):
        with st.spinner("Processing ..."):
            # Check if both DataFrames are empty
            valid_input = True
            if gold_layer_df.empty and data_upload_df.empty:
                # If both gold_layer_df and data_upload_df are empty, display a warning and stop
                st.warning(
                    "Both the Gold Layer data and the uploaded data are empty. Please provide at least one data source.",
                    icon="⚠️",
                )

                # Log message
                log_message(
                    "warning",
                    "Both the Gold Layer data and the uploaded data are empty. Please provide at least one data source.",
                    "Data Import",
                )

                valid_input = False

            # If the uploaded DataFrame is empty and the Gold Layer is not, swap them so all validation checks still run
            elif not gold_layer_df.empty and data_upload_df.empty:
                data_upload_df, gold_layer_df = (
                    gold_layer_df.copy(),
                    data_upload_df.copy(),
                )
                valid_input = True

            if valid_input:
                # Fetch all necessary column lists
                (
                    spends_columns,
                    response_metric_columns,
                    total_columns,
                    gold_layer_columns,
                    data_upload_columns,
                ) = fetch_columns(gold_layer_df, data_upload_df)

                with upload_warning_placeholder:
                    valid_input, message = valid_input_df(
                        data_upload_df,
                        spends_columns,
                        response_metric_columns,
                        total_columns,
                        gold_layer_columns,
                        data_upload_columns,
                    )
                    if not valid_input:
                        st.warning(message, icon="⚠️")

                        # Log message
                        log_message("warning", message, "Data Import")

            # Merge gold_layer_df and data_upload_df on 'panel' and 'date'
            if valid_input:
                merged_df = merge_dataframes(gold_layer_df, data_upload_df)
                missing_columns, no_media_data, channel_columns_dict = (
                    check_required_columns(
                        merged_df, spends_columns, response_metric_columns
                    )
                )

                with upload_warning_placeholder:
                    # Warning for categories with no media data
                    if no_media_data:
                        st.warning(
                            f"Categories without media data: {format_values_for_display(no_media_data)}. Please upload at least one media column to proceed.",
                            icon="⚠️",
                        )
                        valid_input = False

                        # Log message
                        log_message(
                            "warning",
                            f"Categories without media data: {format_values_for_display(no_media_data)}. Please upload at least one media column to proceed.",
                            "Data Import",
                        )

                    # Warning for insufficient rows
                    elif granularity_selection == "daily" and any(
                        len(merged_df[merged_df["panel"] == panel]) < minimum_row_req
                        for panel in merged_df["panel"].unique()
                    ):
                        st.warning(
                            f"Insufficient data. Please provide at least {minimum_row_req} days of data for every panel.",
                            icon="⚠️",
                        )
                        valid_input = False

                        # Log message
                        log_message(
                            "warning",
                            f"Insufficient data. Please provide at least {minimum_row_req} days of data for every panel.",
                            "Data Import",
                        )

                    elif granularity_selection == "weekly" and any(
                        len(merged_df[merged_df["panel"] == panel])
                        < minimum_row_req * 7
                        for panel in merged_df["panel"].unique()
                    ):
                        st.warning(
                            f"Insufficient data. Please provide at least {minimum_row_req} weeks of data for every panel.",
                            icon="⚠️",
                        )
                        valid_input = False

                        # Log message
                        log_message(
                            "warning",
                            f"Insufficient data. Please provide at least {minimum_row_req} weeks of data for every panel.",
                            "Data Import",
                        )

                    # Info for missing columns
                    elif missing_columns:
                        st.info(
                            f"Missing columns: {format_values_for_display(missing_columns)}. Please upload all required columns.",
                            icon="💡",
                        )

            if valid_input:
                # Create a copy of the merged DataFrame for dashboard purposes
                dashboard_df = merged_df.copy()

                # Create a DataFrame for tool purposes
                tool_df = prepare_tool_df(merged_df, granularity_selection)

                # Create the imputation DataFrame
                imputation_df = generate_imputation_df(tool_df)

                # Save data to the project dictionary
                st.session_state["project_dct"]["data_import"][
                    "granularity_selection"
                ] = st.session_state["granularity_selection_key"]
                st.session_state["project_dct"]["data_import"][
                    "dashboard_df"
                ] = dashboard_df
                st.session_state["project_dct"]["data_import"]["tool_df"] = tool_df
                st.session_state["project_dct"]["data_import"]["unique_panels"] = (
                    tool_df["panel"].unique()
                )
                st.session_state["project_dct"]["data_import"][
                    "imputation_df"
                ] = imputation_df

                # Success message
                with upload_warning_placeholder:
                    st.success("Processed Successfully!", icon="🗂️")
                st.toast("Processed Successfully!", icon="🗂️")

                # Log message
                log_message("info", "Processed Successfully!", "Data Import")

    # Load saved data from the project dictionary
    if st.session_state["project_dct"]["data_import"]["tool_df"] is None:
        st.stop()
    else:
        tool_df = st.session_state["project_dct"]["data_import"]["tool_df"]
        imputation_df = st.session_state["project_dct"]["data_import"]["imputation_df"]
        unique_panels = st.session_state["project_dct"]["data_import"]["unique_panels"]

    # Unique Panels
    st.subheader("Unique Panels")

    # Get unique panel count
    total_count = len(unique_panels)

    # Define custom CSS for a pastel orange rounded rectangle
    custom_css = """
    <style>
    .panel-box {
        background-color: #ffdaab;
        border-radius: 10px;
        padding: 10px;
        margin: 0 0;
    }
    </style>
    """

    # Display unique panels with the total count
    st.markdown(custom_css, unsafe_allow_html=True)
    panel_html = f"<div class='panel-box'><strong>Unique Panels:</strong> {format_values_for_display(unique_panels)}<br><strong>Total Count:</strong> {total_count}</div>"
    st.markdown(panel_html, unsafe_allow_html=True)
    st.write("##")  # Padding

    # Impute Missing Values
    st.subheader("Impute Missing Values")
    edited_imputation_df = st.data_editor(
        imputation_df,
        column_config={
            "Imputation Method": st.column_config.SelectboxColumn(
                options=[
                    "Drop Column",
                    "Fill with Mean",
                    "Fill with Median",
                    "Fill with 0",
                ],
                required=True,
                default="Fill with 0",
            ),
        },
        column_order=[
            "Column Name",
            "Category",
            "Missing Values",
            "Zero Values",
            "Imputation Method",
        ],
        disabled=["Column Name", "Category", "Missing Values", "Zero Values"],
        hide_index=True,
        use_container_width=True,
        key="imputation_df_key",
    )

    # Expander with markdown for imputation rules
    with st.expander("Impute Missing Values Guidelines"):
        st.markdown(
            f"""
            ### Imputation Guidelines
            Please adhere to the following rules when handling missing values:

            1. **Default Imputation Strategies**:
               - **Response Metrics**: Imputed using the **mean** value of the column.
               - **Spends**: Imputed with **zero** values.
               - **Media, Exogenous, Internal**: Imputation strategy is **dynamic** based on the data.

            2. **Drop Threshold**:
               - If the combined percentage of **zeros** and **null values** in any column exceeds `{percent_drop_col_threshold}%`, the column will be **marked for dropping** by default; the user can change this manually.
               - **Example**: If `spends_facebook` has more than `{percent_drop_col_threshold}%` of zeros and nulls combined, it will be marked for dropping.

            3. **Category Generation and Association**:
               - Categories are automatically generated from the **Spends** columns.
               - **Example**: The column `spends_facebook` will generate the **facebook** category. Columns like `spends_facebook`, `media_impression_facebook`, and `media_clicks_facebook` will be associated with this category.

            4. **Column Association and Imputation**:
               - Each category must have at least **one Media column** associated with it for imputation to proceed.
               - **Example**: If the **facebook** category does not have any media columns like `media_impression_facebook`, imputation will not be allowed for that category.
               - Solution: Either **drop the entire category** if it is empty, or **impute the columns** associated with the category instead of dropping them.

            5. **Response Metrics and Category Count**:
               - Dropping **Response Metric** columns is **not allowed** under any circumstances.
               - At least **two categories** must exist after imputation, or the imputation will not proceed.
               - **Example**: If only **facebook** remains after selection, imputation will be halted.

            **Notes**:
            - Dropping a spends column will result in all associated columns being dropped.
            - **Example**: Dropping `spends_facebook` will also drop related columns like `media_impression_facebook` and `media_clicks_facebook`.
            """
        )

    # Imputation warning placeholder
    imputation_warning_placeholder = st.container()

    # Save the DataFrame and dictionary from the current session
    if st.button("Impute and Save", use_container_width=True):
        with st.spinner("Imputing ..."):
            with imputation_warning_placeholder:
                # Perform imputation
                imputed_tool_df, group_dict, message = perform_imputation(
                    edited_imputation_df.copy(), tool_df.copy()
                )

                if imputed_tool_df is None:
                    st.warning(message, icon="⚠️")

                    # Log message
                    log_message("warning", message, "Data Import")
                else:
                    st.session_state["project_dct"]["data_import"][
                        "imputed_tool_df"
                    ] = imputed_tool_df
                    st.session_state["project_dct"]["data_import"][
                        "imputation_df"
                    ] = edited_imputation_df
                    st.session_state["project_dct"]["data_import"][
                        "group_dict"
                    ] = group_dict
                    st.session_state["project_dct"]["data_import"]["category_dict"] = (
                        create_ordered_category_dict(imputed_tool_df)
                    )

        if imputed_tool_df is not None:
            # Update the DB
            update_db(
                prj_id=st.session_state["project_number"],
                page_nam="Data Import",
                file_nam="project_dct",
                pkl_obj=pickle.dumps(st.session_state["project_dct"]),
                schema=schema,
            )

            # Success message
            st.success("Saved Successfully!", icon="💾")
            st.toast("Saved Successfully!", icon="💾")

            # Log message
            log_message("info", "Saved Successfully!", "Data Import")

    # Load saved data from the project dictionary
    if st.session_state["project_dct"]["data_import"]["imputed_tool_df"] is None:
        st.stop()
    else:
        imputed_tool_df = st.session_state["project_dct"]["data_import"][
            "imputed_tool_df"
        ]
        group_dict = st.session_state["project_dct"]["data_import"]["group_dict"]
        category_dict = st.session_state["project_dct"]["data_import"]["category_dict"]

    # Channel Groupings
    st.subheader("Channel Groupings")
    display_groups(group_dict)
    st.write("##")  # Padding

    # Variable Categorization
    st.subheader("Variable Categorization")
    display_groups(category_dict)
    st.write("##")  # Padding

    # Final DataFrame
    st.subheader("Final DataFrame")
    st.dataframe(
        imputed_tool_df,
        hide_index=True,
        column_config={
            "date": st.column_config.DateColumn("date", format="YYYY-MM-DD")
        },
    )
    st.write("##")  # Padding

except Exception as e:
    # Capture the error details
    exc_type, exc_value, exc_traceback = sys.exc_info()
    error_message = "".join(
        traceback.format_exception(exc_type, exc_value, exc_traceback)
    )

    # Log message
    log_message("error", f"An error occurred: {error_message}.", "Data Import")

    # Display a warning message
    st.warning(
        "Oops! Something went wrong. Please try refreshing the tool or creating a new project.",
        icon="⚠️",
    )