Spaces:
Sleeping
Sleeping
| import subprocess | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from matplotlib.ticker import FuncFormatter | |
| import gradio as gr | |
| import tempfile | |
| import logging | |
| from PIL import Image | |
| import os | |
| import io | |
| import numpy as np | |
| from itertools import zip_longest | |
| import openai | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| from langchain_openai import ChatOpenAI | |
| from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| import openpyxl | |
| from io import BytesIO | |
| import logging | |
# Module-level setup: logging, environment variables, OpenAI client, globals.
# (NOTE(review): `logging` is imported twice at the top of the file;
# basicConfig only takes effect on the first call, so this is harmless.)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Get the OpenAI API key from environment variables
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    logger.error("OPENAI_API_KEY is not set.")
else:
    logger.info("OpenAI API key loaded.")

try:
    # Initialize OpenAI client with the API key
    # NOTE(review): if this raises, `client` stays undefined and any later
    # use of it will raise NameError — confirm downstream code guards this.
    client = OpenAI(api_key=openai_api_key)
except Exception as e:
    logger.error(f"Error initializing OpenAI client: {e}")

# Maximum number of dataset output slots rendered by the Gradio UI.
max_outputs = 10
# Accumulator for generated output components.
outputs = []

# Global variable to store the selected dataset for AI computation
selected_dataset_ai = "Volkswagen Customers"  # Default value
def plot_model_results(results_df, average_value, title, model_type):
    """
    Plot model results as a horizontal diverging bar chart around the average.

    Predictors above the average extend right from the average baseline; those
    below it extend left.  Predictor order and colors are fixed per model.

    Args:
        results_df (DataFrame): Must contain 'Predictor' and
            'Importance_percent' columns.  The caller's frame is NOT modified
            (the function works on a copy).
        average_value (float): Average importance value (the baseline line).
        title (str): Title of the plot.
        model_type (str): "Trust" uses the six-bucket order; any other value
            (e.g. "NPS") additionally includes "Trust" at the top of the order.

    Returns:
        PIL.Image.Image: Rendered chart.

    Raises:
        Exception: Re-raises any plotting error after logging it.
    """
    logger.info(
        "Plotting model results for %s model with title '%s'.", model_type, title
    )
    try:
        # Fix: work on a copy — the original version reordered and added a
        # categorical dtype to the caller's DataFrame in place.
        results_df = results_df.copy()

        # Define color scheme (one fixed color per Trust Bucket®).
        color_map = {
            "Stability": "#375570",
            "Development": "#E3B05B",
            "Relationship": "#C63F48",
            "Benefit": "#418387",
            "Vision": "#DF8859",
            "Competence": "#6D93AB",
            "Trust": "#f5918a",
        }

        # Define the display order for each model type.
        if model_type == "Trust":
            order = [
                "Stability",
                "Development",
                "Relationship",
                "Benefit",
                "Vision",
                "Competence",
            ]
        else:  # KPI models (e.g. "NPS") additionally show Trust as a driver
            order = [
                "Trust",
                "Stability",
                "Development",
                "Relationship",
                "Benefit",
                "Vision",
                "Competence",
            ]

        # Apply the categorical ordering and sort descending so the first
        # category in `order` ends up at the top of the horizontal chart.
        results_df["Predictor"] = pd.Categorical(
            results_df["Predictor"], categories=order, ordered=True
        )
        results_df.sort_values("Predictor", ascending=False, inplace=True)

        fig, ax = plt.subplots(figsize=(10, 8))

        # Format x-axis tick labels as whole percentages ("25%").
        formatter = FuncFormatter(lambda x, _: f"{x:.0f}%")
        ax.xaxis.set_major_formatter(formatter)

        # Symmetric x-range around the average so the baseline sits centered.
        actual_min = results_df["Importance_percent"].min()
        actual_max = results_df["Importance_percent"].max()
        half_range = max(average_value - actual_min, actual_max - average_value)
        x_min = average_value - half_range - 3  # padding for value labels
        x_max = average_value + half_range + 3  # padding for value labels
        plt.xlim(x_min, x_max)

        # Dotted grid line at every 5% tick, covering the full range.
        x_ticks = np.arange(np.floor(x_min), np.ceil(x_max) + 5, 5)
        ax.set_xticks(x_ticks)
        for tick in x_ticks:
            ax.axvline(x=tick, color="grey", linestyle="--", linewidth=0.5, zorder=2)

        # Draw one bar per predictor, anchored at the average baseline.
        for i, row in enumerate(results_df.itertuples(index=False)):
            color = color_map[row.Predictor]
            if row.Importance_percent < average_value:
                # Below average: bar runs from the value up to the average;
                # the label sits just left of the bar.
                bar_length = average_value - row.Importance_percent
                left_edge = row.Importance_percent
                text_x = left_edge - 0.5
                ha = "right"
            else:
                # Above average: bar runs from the average out to the value;
                # the label sits just right of the bar.
                bar_length = row.Importance_percent - average_value
                left_edge = average_value
                text_x = row.Importance_percent + 0.5
                ha = "left"
            ax.barh(
                row.Predictor,
                bar_length,
                left=left_edge,
                color=color,
                edgecolor="white",
                height=0.6,
                zorder=3,  # above the dotted grid lines
            )
            ax.text(
                text_x,
                i,
                f"{row.Importance_percent:.1f}%",
                va="center",
                ha=ha,
                color="#8c8b8c",
            )

        # Solid vertical line marking the average, plus the title.
        ax.axvline(average_value, color="black", linewidth=1, linestyle="-", zorder=3)
        plt.title(title, fontsize=14)

        # Remove plot borders and soften the y-axis labels.
        ax.spines[["left", "top", "right"]].set_color("none")
        ax.tick_params(axis="y", colors="#8c8b8c", length=0)

        ax.set_axisbelow(True)
        plt.tight_layout()

        # Render to an in-memory PNG and hand back a PIL image.
        img_data = io.BytesIO()
        plt.savefig(
            img_data, format="png", facecolor=fig.get_facecolor(), edgecolor="none"
        )
        img_data.seek(0)
        img = Image.open(img_data)
        plt.close(fig)
        return img
    except Exception as e:
        logger.error("Error plotting model results: %s", e)
        raise
def plot_bucket_fullness(driver_df, title):
    """
    Draw a bar chart of the mean score ("fullness") of each Trust Bucket®.

    Args:
        driver_df (DataFrame): Must contain the six bucket columns.
        title (str): Chart title.

    Returns:
        PIL.Image.Image or None: The rendered chart, or None when any
        required bucket column is absent from ``driver_df``.
    """
    # The six Trust Bucket® columns that must all be present.
    buckets = [
        "Stability",
        "Development",
        "Relationship",
        "Benefit",
        "Vision",
        "Competence",
    ]

    # Bail out early (with a warning) when any bucket column is absent.
    missing_columns = [col for col in buckets if col not in driver_df.columns]
    if missing_columns:
        logger.warning(
            f"The following columns are missing in driver_df: {missing_columns}"
        )
        return None
    logger.info("All required columns are present in driver_df.")

    try:
        bucket_colors = {
            "Stability": "#375570",
            "Development": "#E3B05B",
            "Relationship": "#C63F48",
            "Benefit": "#418387",
            "Vision": "#DF8859",
            "Competence": "#6D93AB",
        }

        # Mean of every bucket column, kept in the canonical bucket order.
        summary = driver_df[buckets].mean().reset_index()
        summary.columns = ["Trust_Bucket", "Fullness_of_Bucket"]
        summary["Trust_Bucket"] = pd.Categorical(
            summary["Trust_Bucket"], categories=buckets, ordered=True
        )
        summary.sort_values("Trust_Bucket", inplace=True)

        fig, ax = plt.subplots(figsize=(10, 8))
        bar_colors = [bucket_colors[name] for name in summary["Trust_Bucket"]]
        ax.bar(
            summary["Trust_Bucket"],
            summary["Fullness_of_Bucket"],
            color=bar_colors,
            edgecolor="white",
            zorder=2,
        )

        # Annotate each bar with its value, just above the bar top.
        for position, (_bucket, fullness) in enumerate(
            summary.itertuples(index=False, name=None)
        ):
            ax.text(
                position,
                fullness + 0.5,  # slightly above the top of the bar
                f"{fullness:.1f}",
                ha="center",
                va="bottom",
                color="#8c8b8c",
            )

        plt.ylim(0, summary["Fullness_of_Bucket"].max() + 1)
        plt.ylabel("Fullness")
        plt.title(title, fontsize=14)
        ax.spines[["top", "right"]].set_color("none")

        # Grey dotted guide line along every y-axis tick.
        for level in ax.get_yticks():
            ax.axhline(y=level, color="grey", linestyle="--", linewidth=0.5, zorder=1)
        ax.set_axisbelow(True)
        plt.tight_layout()

        # Render to an in-memory PNG and return it as a PIL image.
        buffer = io.BytesIO()
        plt.savefig(
            buffer, format="png", facecolor=fig.get_facecolor(), edgecolor="none"
        )
        buffer.seek(0)
        image = Image.open(buffer)
        plt.close(fig)
        return image
    except Exception as e:
        logger.error("Error plotting bucket fullness: %s", e)
        raise
def call_r_script(
    input_file,
    text_output_path,
    csv_output_path_trust,
    csv_output_path_nps,
    csv_output_path_loyalty,
    csv_output_path_consideration,
    csv_output_path_satisfaction,
    csv_output_path_trustbuilder,
    nps_present,
    loyalty_present,
    consideration_present,
    satisfaction_present,
    trustbuilder_present,
):
    """
    Call the R script (process_data.R) for Shapley regression analysis.

    Args:
        input_file (str): Path to the input Excel file.
        text_output_path (str): Path to the output text file.
        csv_output_path_trust (str): Path to the output CSV file for Trust.
        csv_output_path_nps (str): Path to the output CSV file for NPS.
        csv_output_path_loyalty (str): Path to the output CSV file for Loyalty.
        csv_output_path_consideration (str): Path to the output CSV file for Consideration.
        csv_output_path_satisfaction (str): Path to the output CSV file for Satisfaction.
        csv_output_path_trustbuilder (str): Path to the output CSV file for Trust Builder.
        nps_present (bool): Whether the NPS column is present in the data.
        loyalty_present (bool): Whether the Loyalty column is present in the data.
        consideration_present (bool): Whether the Consideration column is present in the data.
        satisfaction_present (bool): Whether the Satisfaction column is present in the data.
        trustbuilder_present (bool): Whether the Trust Builder data is present.

    Raises:
        RuntimeError: If the R script exits with a non-zero status.
        Exception: Any other error launching the subprocess is re-raised.
    """
    command = [
        "Rscript",
        "process_data.R",
        input_file,
        text_output_path,
        csv_output_path_trust,
        csv_output_path_nps,
        csv_output_path_loyalty,
        csv_output_path_consideration,
        csv_output_path_satisfaction,
        csv_output_path_trustbuilder,
        # R expects the literal strings "TRUE"/"FALSE" for logical arguments.
        str(nps_present).upper(),
        str(loyalty_present).upper(),
        str(consideration_present).upper(),
        str(satisfaction_present).upper(),
        str(trustbuilder_present).upper(),
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        logger.error("R script failed with error: %s", e)
        # Chain the original error so the R failure stays in the traceback.
        raise RuntimeError(
            "Error executing R script. Please check the input file format."
        ) from e
    except Exception as e:
        logger.error("Error calling R script: %s", e)
        raise
def _placeholder_image(path):
    """Load a 'not available' placeholder image and scale it to plot size."""
    img = Image.open(path)
    return img.resize((1000, 800), Image.Resampling.LANCZOS)


def _kpi_column_present(df, column):
    """
    Return True when *column* exists and is usable (<= 80% missing values).

    A column that is present but more than 80% empty is dropped from *df*
    in place and treated as absent.
    """
    if column not in df.columns:
        return False
    if df[column].isna().mean() > 0.8:
        df.drop(columns=[column], inplace=True)
        return False
    return True


def _kpi_results(present, csv_path, kpi_name, file_name, placeholder_path):
    """
    Read one KPI result CSV and plot its drivers, or return a placeholder.

    Returns:
        tuple: (PIL image, results DataFrame or None).
    """
    if not present:
        return _placeholder_image(placeholder_path), None
    results_df = pd.read_csv(csv_path)
    results_df["Importance_percent"] = results_df["Importance"] * 100
    average_value = results_df["Importance_percent"].mean()
    img = plot_model_results(
        results_df, average_value, f"{kpi_name} Drivers: {file_name}", kpi_name
    )
    return img, results_df


def _build_trustbuilder_table(csv_path):
    """
    Build the consolidated Trust Builder proof-point table from the R output.

    One row per (bucket, message) pair with a positive score, sorted by
    bucket order and descending score within each bucket.
    """
    results_df_builder = pd.read_csv(csv_path)
    bucket_columns = [
        "Stability",
        "Development",
        "Relationship",
        "Benefit",
        "Vision",
        "Competence",
    ]
    buckets = []
    messages = []
    percentages = []
    for bucket in bucket_columns:
        for index, value in results_df_builder[bucket].items():
            if value > 0:
                buckets.append(bucket)
                messages.append(results_df_builder["Message"][index])
                percentages.append(int(round(value)))
    df_builder_pivot = pd.DataFrame(
        {
            "Trust Driver®": buckets,
            "Trust Proof Point®": messages,
            "%": percentages,
        }
    )
    # Categorical ordering so sorting follows the canonical bucket order.
    df_builder_pivot["Trust Driver®"] = pd.Categorical(
        df_builder_pivot["Trust Driver®"],
        categories=bucket_columns,
        ordered=True,
    )
    return df_builder_pivot.sort_values(
        by=["Trust Driver®", "%"], ascending=[True, False]
    )


def analyze_excel_single(file_path):
    """
    Analyze one Excel workbook and generate plots/tables for all models.

    The workbook must contain a "Driver" sheet (headers on row 4); an
    optional "Builder" sheet (headers on row 6) enables the Trust Builder
    analysis.  The heavy lifting (Shapley regression) is delegated to the
    R script via `call_r_script`.

    Args:
        file_path (str): Path to the Excel file.

    Returns:
        tuple: Always 13 elements —
            (img_bucketfull, img_trust, img_nps, img_loyalty,
             img_consideration, img_satisfaction, df_builder_pivot,
             output_text, results_df_trust, results_df_nps,
             results_df_loyalty, results_df_consideration,
             results_df_satisfaction).
            Elements are None (and output_text an error message) when a
            model was not computed.
    """
    logger.info("Analyzing Excel file: %s", file_path)
    # All R-script outputs live in a throwaway temp directory.
    temp_dir = tempfile.mkdtemp()
    logger.info("Created temporary directory: %s", temp_dir)
    try:
        text_output_path = os.path.join(temp_dir, "output.txt")
        csv_output_path_trust = text_output_path.replace(".txt", "_trust.csv")
        csv_output_path_nps = text_output_path.replace(".txt", "_nps.csv")
        csv_output_path_loyalty = text_output_path.replace(".txt", "_loyalty.csv")
        csv_output_path_consideration = text_output_path.replace(
            ".txt", "_consideration.csv"
        )
        csv_output_path_satisfaction = text_output_path.replace(
            ".txt", "_satisfaction.csv"
        )
        csv_output_path_trustbuilder = text_output_path.replace(
            ".txt", "_trustbuilder.csv"
        )

        # Load the mandatory "Driver" sheet; row 4 holds the column headers.
        excel_file = pd.ExcelFile(file_path)
        df = pd.read_excel(file_path, sheet_name="Driver", header=3)
        df = df.applymap(clean_and_convert)

        # Optional "Builder" sheet (headers on row 6).  The Trust Builder
        # analysis runs only when the sheet has more than 10 rows, all six
        # bucket columns, and at least one "TB*" message column.
        trustbuilder_present = False
        if "Builder" in excel_file.sheet_names:
            builder_data = pd.read_excel(file_path, sheet_name="Builder", header=5)
            builder_data = builder_data.applymap(clean_and_convert)
            required_builder_columns = [
                "Stability",
                "Development",
                "Relationship",
                "Benefit",
                "Vision",
                "Competence",
            ]
            trustbuilder_present = (
                len(builder_data) > 10
                and all(col in builder_data.columns for col in required_builder_columns)
                and any(col.startswith("TB") for col in builder_data.columns)
            )

        # Core columns the Trust model needs.
        required_columns = [
            "Trust",
            "Stability",
            "Development",
            "Relationship",
            "Benefit",
            "Vision",
            "Competence",
        ]
        missing_columns = set(required_columns) - set(df.columns)
        if missing_columns:
            logger.warning("Missing columns in dataset: %s", missing_columns)

        # KPI columns: drop any that exist but are > 80% empty.
        nps_present = _kpi_column_present(df, "NPS")
        loyalty_present = _kpi_column_present(df, "Loyalty")
        consideration_present = _kpi_column_present(df, "Consideration")
        satisfaction_present = _kpi_column_present(df, "Satisfaction")

        # Drop rows with missing values in the core columns.  Fix: restrict
        # the subset to columns that actually exist — dropna raises KeyError
        # on absent columns, which made the placeholder paths unreachable.
        available_core = [col for col in required_columns if col in df.columns]
        df.dropna(subset=available_core, inplace=True)

        # Ensure the dataset has more than 10 rows after preprocessing.
        if df.shape[0] <= 10:
            # Fix: keep the 13-element return shape so callers can unpack it
            # (the previous 7-element early return broke batch processing).
            return (
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                "Dataset must contain more than 10 rows after preprocessing.",
                None,
                None,
                None,
                None,
                None,
            )

        # Run the Shapley regression analyses in R.
        call_r_script(
            file_path,
            text_output_path,
            csv_output_path_trust,
            csv_output_path_nps,
            csv_output_path_loyalty,
            csv_output_path_consideration,
            csv_output_path_satisfaction,
            csv_output_path_trustbuilder,
            nps_present,
            loyalty_present,
            consideration_present,
            satisfaction_present,
            trustbuilder_present,
        )

        # Human-readable analysis summary produced by the R script.
        with open(text_output_path, "r") as file:
            output_text = file.read()

        # File name used in plot titles/labels.
        file_name = os.path.basename(file_path)

        # 1) Trust Profile (bucket fullness), or placeholder when the core
        # columns are missing.
        title = f"Trust Profile: {file_name}"
        if missing_columns:
            img_bucketfull = Image.open("./images/bucket_fullness_not_available.png")
        else:
            img_bucketfull = plot_bucket_fullness(df, title)

        # 2) Trust drivers.  Only read the trust CSV when the analysis could
        # actually run (with core columns missing, the CSV may not exist).
        results_df_trust = None
        if missing_columns:
            img_trust = _placeholder_image("./images/trust_not_available.png")
        else:
            results_df_trust = pd.read_csv(csv_output_path_trust)
            results_df_trust["Importance_percent"] = (
                results_df_trust["Importance"] * 100
            )
            img_trust = plot_model_results(
                results_df_trust,
                results_df_trust["Importance_percent"].mean(),
                f"Trust Drivers: {file_name}",
                "Trust",
            )

        # 3) KPI drivers (or placeholder images when not computed).
        img_nps, results_df_nps = _kpi_results(
            nps_present,
            csv_output_path_nps,
            "NPS",
            file_name,
            "./images/nps_not_available.png",
        )
        img_loyalty, results_df_loyalty = _kpi_results(
            loyalty_present,
            csv_output_path_loyalty,
            "Loyalty",
            file_name,
            "./images/loyalty_not_available.png",
        )
        img_consideration, results_df_consideration = _kpi_results(
            consideration_present,
            csv_output_path_consideration,
            "Consideration",
            file_name,
            "./images/consideration_not_available.png",
        )
        img_satisfaction, results_df_satisfaction = _kpi_results(
            satisfaction_present,
            csv_output_path_satisfaction,
            "Satisfaction",
            file_name,
            "./images/satisfaction_not_available.png",
        )

        # 4) Trust Builder consolidated proof-point table.
        df_builder_pivot = None
        if trustbuilder_present and os.path.exists(csv_output_path_trustbuilder):
            df_builder_pivot = _build_trustbuilder_table(csv_output_path_trustbuilder)

        return (
            img_bucketfull,
            img_trust,
            img_nps,
            img_loyalty,
            img_consideration,
            img_satisfaction,
            df_builder_pivot,
            output_text,
            results_df_trust,
            results_df_nps,
            results_df_loyalty,
            results_df_consideration,
            results_df_satisfaction,
        )
    except Exception as e:
        logger.error("Error analyzing Excel file: %s", e)
        raise
    finally:
        # Fix: remove the temp directory *and* any files still inside it —
        # os.rmdir failed whenever output files were left behind.
        import shutil

        shutil.rmtree(temp_dir, ignore_errors=True)
def batch_file_processing(file_paths):
    """
    Run the single-file analysis over every Excel file in *file_paths*.

    Files that fail to process are logged and skipped; their results are
    simply absent from the returned lists.

    Args:
        file_paths (List[str]): Paths to the Excel files.

    Returns:
        tuple: Eight parallel lists — bucket-fullness images, Trust images,
        NPS images, Loyalty images, Consideration images, Satisfaction
        images, builder tables, and analysis-summary texts.
    """
    bucketfull_imgs = []
    trust_imgs = []
    nps_imgs = []
    loyalty_imgs = []
    consideration_imgs = []
    satisfaction_imgs = []
    builder_tables = []
    summaries = []
    for path in file_paths:
        try:
            # The trailing per-model result frames are not needed here.
            (
                bucketfull,
                trust,
                nps,
                loyalty,
                consideration,
                satisfaction,
                builder_table,
                summary,
                _trust_df,
                _nps_df,
                _loyalty_df,
                _consideration_df,
                _satisfaction_df,
            ) = analyze_excel_single(path)
        except Exception as e:
            logger.error("Error processing file %s: %s", path, e)
            continue
        bucketfull_imgs.append(bucketfull)
        trust_imgs.append(trust)
        nps_imgs.append(nps)
        loyalty_imgs.append(loyalty)
        consideration_imgs.append(consideration)
        satisfaction_imgs.append(satisfaction)
        builder_tables.append(builder_table)
        summaries.append(summary)
    return (
        bucketfull_imgs,
        trust_imgs,
        nps_imgs,
        loyalty_imgs,
        consideration_imgs,
        satisfaction_imgs,
        builder_tables,
        summaries,
    )
def variable_outputs(file_inputs):
    """
    Build the Gradio output components for every uploaded dataset.

    Runs the batch analysis over *file_inputs* and emits, per dataset, a
    fixed group of 14 components (headings, explanatory text, the plots,
    the hidden analysis summary, and the proof-point table or invisible
    stand-ins).  The remaining slots up to `max_outputs` are filled with
    invisible placeholders so the total component count is constant.

    Args:
        file_inputs (List[str]): Paths of the uploaded Excel files.

    Returns:
        list: 14 * max_outputs Gradio components.
    """
    # Run the analysis for all uploaded files.
    (
        img_bucketfull_list,
        img_trust_list,
        img_nps_list,
        img_loyalty_list,
        img_consideration_list,
        img_satisfaction_list,
        df_builder_pivot_list,
        output_text_list,
    ) = batch_file_processing(file_inputs)

    # Number of datasets uploaded.
    # NOTE(review): if a file fails inside batch_file_processing, the result
    # lists are shorter than this count and the padding below under-fills —
    # confirm whether failed files should still consume an output slot.
    k = len(file_inputs)

    # Container for visible components.
    plots_visible = []
    # zip_longest pads any short list with None so unpacking never fails.
    for row, (
        img_bucketfull,
        img_trust,
        img_nps,
        img_loyalty,
        img_consideration,
        img_satisfaction,
        df_builder_pivot,
        output_text,
    ) in enumerate(
        zip_longest(
            img_bucketfull_list,
            img_trust_list,
            img_nps_list,
            img_loyalty_list,
            img_consideration_list,
            img_satisfaction_list,
            df_builder_pivot_list,
            output_text_list,
        )
    ):
        # Dataset name shown as the proof-point table label.
        dataset_name = file_inputs[row].split("/")[-1]

        # The fixed 11-component header/plot group for this dataset.
        plots = [
            gr.Markdown(
                "<span style='font-size:20px; font-weight:bold;'>1) Trust Profile</span>",
                visible=True,
            ),
            gr.Markdown(
                "This analysis shows you show strongly you are trusted in each of the six Trust Buckets®. You can also see this for any competitor.",
                visible=True,
            ),
            gr.Image(
                value=img_bucketfull,
                type="pil",
                label="Trust Profile",
                visible=True,
            ),
            gr.Markdown(
                "<span style='font-size:20px; font-weight:bold;'>2) Trust and KPI Drivers</span>",
                visible=True,
            ),
            gr.Markdown(
                "This analysis shows you which of the TrustLogic® dimensions are most effective in building more trust and improving your KPIs. "
                + "Here we display Trust and NPS, but in the full version you can include up to four KPIs (e.g. CSAT, Consideration, Loyalty). "
                + "<br>The Trust Buckets® extending to the right are the more important ones. We show how they over and under-index. "
                + "The average driver impact is 16.7% (100% divided by 6 trust dimensions). The higher the % above average, the more important. "
                + "That means that you need to ‘fill’ these Trust Buckets® with the right attributes and messages.",
                visible=True,
            ),
            gr.Image(
                value=img_trust,
                type="pil",
                label="Trust Drivers",
                visible=True,
            ),
            gr.Image(
                value=img_nps,
                type="pil",
                label="NPS Drivers",
                visible=True,
            ),
            gr.Image(
                value=img_loyalty,
                type="pil",
                label="Loyalty Drivers",
                visible=True,
            ),
            gr.Image(
                value=img_consideration,
                type="pil",
                label="Consideration Drivers",
                visible=True,
            ),
            gr.Image(
                value=img_satisfaction,
                type="pil",
                label="Satisfaction Drivers",
                visible=True,
            ),
            gr.Textbox(
                value=output_text,
                label="Analysis Summary",
                visible=False,
            ),
        ]
        plots_visible += plots

        # Proof-point section: real components when a builder table exists,
        # otherwise invisible stand-ins so the per-dataset count stays 14.
        if isinstance(df_builder_pivot, pd.DataFrame):
            logger.debug(f"df_builder_pivot: {df_builder_pivot}")
            plots_visible.append(
                gr.Markdown(
                    "<span style='font-size:20px; font-weight:bold;'>3) Proof Points</span>",
                    visible=True,
                )
            )
            plots_visible.append(
                gr.Markdown(
                    "These are the reasons to trust and recommend. They can be your brand values, features, attributes, programmes and messages. "
                    + "<br>In the first table, use the little arrow in each column to toggle the most to least effective proof points to fill each Trust Bucket®. Your focus is only on the Trust Bucket® with the highest driver impact. "
                    + "<br>In the second table you see the top scoring proof points ordered by Trust Bucket®. "
                    + "<br>Note: Even if Trust Buckets for Customers and Prospects overlap, the most effective statements are very different. This provides clear guidance for acquisition versus loyalty activities.",
                    visible=True,
                )
            )
            plots_visible.append(
                gr.Dataframe(
                    value=df_builder_pivot,
                    headers=list(df_builder_pivot.columns),
                    interactive=False,
                    label=f"{dataset_name}",
                    visible=True,
                    height=800,
                    wrap=True,
                )
            )
        else:
            plots_visible.append(gr.Markdown("", visible=False))
            plots_visible.append(gr.Markdown("", visible=False))
            plots_visible.append(gr.Dataframe(value=None, label="", visible=False))

    # One invisible 14-component group per unused output slot.
    plots_invisible = [
        gr.Markdown("", visible=False),
        gr.Markdown("", visible=False),
        gr.Image(label="Trust Buckets", visible=False),
        gr.Markdown("", visible=False),
        gr.Markdown("", visible=False),
        gr.Image(label="Trust Drivers", visible=False),
        gr.Image(label="NPS Drivers", visible=False),
        gr.Image(label="Loyalty Drivers", visible=False),
        gr.Image(label="Consideration Drivers", visible=False),
        gr.Image(label="Satisfaction Drivers", visible=False),
        gr.Textbox(label="Analysis Summary", visible=False),
        gr.Markdown("", visible=False),
        gr.Markdown("", visible=False),
        gr.Dataframe(value=None, label="", visible=False),
    ]
    return plots_visible + plots_invisible * (max_outputs - k)
def reset_outputs():
    """
    Build the initial/reset set of Gradio output components.

    The layout reserves ``max_outputs`` result "slots" of 14 components each
    (section headers, the Trust Buckets plot, five driver plots, a summary
    textbox and a proof-point table). The first slot is visible with default
    explanatory content; the remaining ``max_outputs - 1`` slots are hidden
    placeholders that ``variable_outputs`` fills in once datasets are
    processed.

    Returns:
        list: ``14 * max_outputs`` Gradio components, first slot visible,
        the rest hidden.
    """
    outputs = [
        gr.Markdown(
            "<span style='font-size:20px; font-weight:bold;'>1) Trust Profile</span>",
            visible=True,
        ),
        # Typo fix: "shows you show strongly" -> "shows you how strongly".
        gr.Markdown(
            "This analysis shows you how strongly you are trusted in each of the six Trust Buckets®. You can also see this for any competitor.",
            visible=True,
        ),
        gr.Image(value=None, label="Trust Buckets", visible=True),
        gr.Markdown(
            "<span style='font-size:20px; font-weight:bold;'>2) Trust and KPI Drivers</span>",
            visible=True,
        ),
        gr.Markdown(
            "This analysis shows you which of the TrustLogic® dimensions are most effective in building more trust and improving your KPIs. "
            + "Here we display Trust and NPS, but in the full version you can include up to four KPIs (e.g. CSAT, Consideration, Loyalty). "
            + "<br>The Trust Buckets® extending to the right are the more important ones. We show how they over and under-index. "
            + "The average driver impact is 16.7% (100% divided by 6 trust dimensions). The higher the % above average, the more important. "
            + "That means that you need to ‘fill’ these Trust Buckets® with the right attributes and messages.",
            visible=True,
        ),
        gr.Image(value=None, label="Trust Drivers", visible=True),
        gr.Image(value=None, label="NPS Drivers", visible=True),
        gr.Image(value=None, label="Loyalty Drivers", visible=True),
        gr.Image(value=None, label="Consideration Drivers", visible=True),
        gr.Image(value=None, label="Satisfaction Drivers", visible=True),
        gr.Textbox(value=None, label="Analysis Summary", visible=False),
        gr.Markdown(
            "<span style='font-size:20px; font-weight:bold;'>3) Proof Points</span>",
            visible=True,
        ),
        gr.Markdown(
            "These are the reasons to trust and recommend. They can be your brand values, features, attributes, programmes and messages. "
            + "<br>In the first table, use the little arrow in each column to toggle the most to least effective proof points to fill each Trust Bucket®. Your focus is only on the Trust Bucket® with the highest driver impact. "
            + "<br>In the second table you see the top scoring proof points ordered by Trust Bucket®. "
            + "<br>Note: Even if Trust Buckets for Customers and Prospects overlap, the most effective statements are very different. This provides clear guidance for acquisition versus loyalty activities.",
            visible=True,
        ),
        gr.Dataframe(value=None, label="", visible=True),
    ]
    # Hidden placeholders for slots 2..max_outputs, 14 components per slot
    # to mirror the visible slot above. The previous version also created
    # four extra throwaway components per iteration (markdown_empty,
    # plot_empty, df_empty, text_empty) that were never appended; inside a
    # gr.Blocks context every instantiated component is added to the layout,
    # so those have been removed.
    for _ in range(1, max_outputs):
        outputs.extend(
            [
                gr.Markdown("", visible=False),
                gr.Markdown("", visible=False),
                gr.Image(value=None, label="", visible=False),
                gr.Markdown("", visible=False),
                gr.Markdown("", visible=False),
                gr.Image(value=None, label="", visible=False),
                gr.Image(value=None, label="", visible=False),
                gr.Image(value=None, label="", visible=False),
                gr.Image(value=None, label="", visible=False),
                gr.Image(value=None, label="", visible=False),
                gr.Textbox(value=None, label="", visible=False),
                gr.Markdown("", visible=False),
                gr.Markdown("", visible=False),
                gr.Dataframe(value=None, label="", visible=False),
            ]
        )
    return outputs
def clean_and_convert(cell):
    """
    Normalize a single cell value.

    Strips tab characters from strings and converts numeric-looking strings
    to floats; every other value passes through unchanged.

    Args:
        cell: Raw cell value of any type.

    Returns:
        float if the tab-stripped string parses as a number, the cleaned
        string otherwise, or the original value for non-strings.
    """
    if not isinstance(cell, str):
        # Bug fix: previously non-string values (ints, floats, None) fell
        # through the isinstance check and the function implicitly returned
        # None, wiping numeric data in df.applymap / cell cleaning.
        return cell
    cell = cell.replace("\t", "")
    try:
        # Convert to float
        return float(cell)
    except ValueError:
        return cell
def data_processing(file_path):
    """
    Processes a single CSV file and generates required outputs.

    The raw survey CSV has a two-row header. The two rows are merged into
    single column names, ratings such as "4 - Agree" are reduced to their
    numeric prefix, one loyalty question is reverse-coded, and columns are
    relabelled to short codes (Q1, Q2, NPS, Trust, ..., TB1..TB60). Two
    sheets ("Driver" and "Builder") mirroring the Excel template layout are
    written to a temporary .xlsx file.

    Args:
        file_path (str): Path to the CSV file.
    Returns:
        str: Path to the processed Excel file.
    Raises:
        Exception: Any error during processing is logged and re-raised.
    """
    try:
        logger.info("Processing CSV file: %s", file_path)
        # Load the first two rows to get the column names
        header_df = pd.read_csv(file_path, header=None, nrows=2)
        # Fill NaN values in the rows with an empty string
        header_df.iloc[0] = header_df.iloc[0].fillna("")
        header_df.iloc[1] = header_df.iloc[1].fillna("")
        # Merge the two rows to create column names
        merged_columns = header_df.iloc[0] + " " + header_df.iloc[1]
        # Load the rest of the DataFrame (data) and rename columns using the merged column names
        df = pd.read_csv(file_path, skiprows=2, names=merged_columns)
        # NOTE(review): DataFrame.applymap is deprecated in pandas >= 2.1 in
        # favour of DataFrame.map — confirm the pinned pandas version.
        df = df.applymap(clean_and_convert)
        # NOTE(review): debug print left in; consider logger.debug instead.
        print(df)
        # Remove the "RID" column if it exists in header_df, merged_columns, and df
        rid_columns = [col for col in merged_columns if "RID" in col]
        if rid_columns:
            for rid_col in rid_columns:
                rid_index = merged_columns[merged_columns == rid_col].index[0]
                header_df.drop(columns=header_df.columns[rid_index], inplace=True)
                merged_columns = merged_columns.drop(rid_index)
                df.drop(columns=[rid_col], inplace=True)

        # For any value in all columns that contain " - " (rating),
        # split and only take the first part (digits)
        def split_value(val):
            if isinstance(val, str) and " - " in val:
                return val.split(" - ")[0]
            return val

        # Apply the function to all elements of the DataFrame
        df = df.applymap(split_value)
        # Convert the columns from the third column onwards to numeric
        df.iloc[:, 2:] = df.iloc[:, 2:].apply(pd.to_numeric, errors="coerce")
        # Context-based data processing
        # Search for the text in the column names, get column index
        search_text = "how likely are you to buy another".lower()
        col_index = [
            i for i, col in enumerate(df.columns) if search_text in col.lower()
        ]
        # If there is such column found (column index not empty)
        if col_index:
            col_index = col_index[
                0
            ]  # Get the column index instead of list (assume there's only one column)
            # Define the mapping dictionary for reverse replacement
            # 1 change to 5, 2 change to 4, and vice versa
            # (reverse-codes the loyalty question so higher always = better)
            replace_map = {1: 5, 2: 4, 4: 2, 5: 1}
            # Replace values in the chosen column
            df.iloc[:, col_index] = df.iloc[:, col_index].replace(replace_map)
        # Define column mapping for renaming: substring of the survey
        # question (case-insensitive) -> short label used downstream
        column_mapping = {
            "Did you own a": "Q1",
            "your age": "Q2",
            "How likely are you to recommend buying a": "NPS",
            "level of trust": "Trust",
            "buy another": "Loyalty",
            "consider buying": "Consideration",
            "Has built a strong and stable foundation": "Stability",
            "Will develop well in the future": "Development",
            "Relates well to people like me": "Relationship",
            "Is valuable to our lives": "Benefit",
            "Has vision and values I find appealing": "Vision",
            "Has what it takes to succeed": "Competence",
        }
        # Create a list to hold the new column names
        list_labels = []
        # Loop through each column in merged_columns
        # Define new column names (first mapping match wins)
        for col in merged_columns:
            label = None
            for key, value in column_mapping.items():
                if key.lower() in col.lower():
                    label = value
                    break
            if label:
                list_labels.append(label)
        # Determine the difference between the lengths of list_labels and merged_columns
        # NOTE(review): this assumes every unmapped column sits AFTER all
        # mapped ones; an unmapped column in between would shift labels.
        difference = len(merged_columns) - len(list_labels)
        # TRUST STATEMENTS TB1 - TB37 populate to the rest of columns
        # Append the next values ("TB1", "TB2", ...) until list_labels matches the length of merged_columns
        for i in range(difference):
            list_labels.append(f"TB{i + 1}")
        # Place list_labels at the first row after the column names
        df_labels = pd.DataFrame([list_labels], columns=df.columns)
        # Concatenate header_df, df_labels, and df
        # Ensure header_df has the same columns as df
        header_df.columns = df.columns
        # Create a DataFrame with 2 rows of NaNs (to follow the format of Excel template)
        nan_rows = pd.DataFrame(np.nan, index=range(2), columns=df.columns)
        # Pad 2 rows of NaNs, followed by survey questions to make it the same format as the input excel file
        df = pd.concat([nan_rows, header_df, df_labels, df]).reset_index(drop=True)
        # Make list labels the column names
        df.columns = list_labels
        # Remove columns beyond TB60 (template supports at most 60 statements)
        max_tb_label = 60
        tb_columns = [col for col in df.columns if col.startswith("TB")]
        tb_columns_to_keep = {f"TB{i}" for i in range(1, max_tb_label + 1)}
        tb_columns_to_drop = [
            col for col in tb_columns if col not in tb_columns_to_keep
        ]
        df.drop(columns=tb_columns_to_drop, inplace=True)
        # Take snippets from df as drivers
        kpis = [
            "Trust",
            "NPS",
            "Loyalty",
            "Consideration",
            "Satisfaction",
        ]
        drivers = [
            "Stability",
            "Development",
            "Relationship",
            "Benefit",
            "Vision",
            "Competence",
        ]
        # Create an empty list to store the selected columns
        selected_columns = []
        # Check each item in kpis and drivers and search in df.columns
        for kpi in kpis:
            for col in df.columns:
                if pd.notna(col) and kpi.lower() in col.lower():
                    selected_columns.append(col)
        for driver in drivers:
            for col in df.columns:
                if pd.notna(col) and driver.lower() in col.lower():
                    selected_columns.append(col)
        # Extract the selected columns into a new DataFrame df_drivers
        # (iloc[4:] skips the 2 NaN pad rows + 2 header rows added above)
        df_drivers = df[selected_columns].iloc[4:].reset_index(drop=True)
        # Create a DataFrame with 2 rows of NaNs
        nan_rows = pd.DataFrame(np.nan, index=range(2), columns=df_drivers.columns)
        # Pad 3 rows of NaNs to make it the same format as the input excel file
        df_drivers = pd.concat([nan_rows, df_drivers]).reset_index(drop=True)
        # Get dataset name
        # NOTE(review): splitting on "/" is not portable to Windows paths;
        # os.path.basename/splitext would be safer.
        dataset_name = file_path.split("/")[-1]
        dataset_name = dataset_name.split(".")[0]
        # Create a temporary directory
        temp_dir = tempfile.mkdtemp()
        logger.info("Created temporary directory for processed file: %s", temp_dir)
        # Save processed df as an Excel file in the temporary directory
        processed_file_path = os.path.join(temp_dir, f"{dataset_name}.xlsx")
        with pd.ExcelWriter(processed_file_path) as writer:
            df_drivers.to_excel(writer, sheet_name="Driver", index=False)
            df.to_excel(writer, sheet_name="Builder", index=False)
        return processed_file_path
    except Exception as e:
        logger.error("Error processing CSV file: %s", e)
        raise
def process_examples(file_name):
    """
    Run the standard output pipeline for a bundled example dataset.

    Args:
        file_name (list): Single-element list holding the example file's name
            (as stored by the gr.State components).

    Returns:
        list: Gradio components produced by variable_outputs.
    """
    example_paths = [f"example_files/{file_name[0]}"]
    return variable_outputs(example_paths)
def clean_and_convert(cell):
    """
    Normalize a single cell value (duplicate of the helper defined earlier;
    this later definition shadows it and is the one used at runtime).

    Strips tab characters from strings and converts numeric-looking strings
    to floats; every other value passes through unchanged.

    Args:
        cell: Raw cell value of any type (e.g. an openpyxl cell's value).

    Returns:
        float if the tab-stripped string parses as a number, the cleaned
        string otherwise, or the original value for non-strings.
    """
    if not isinstance(cell, str):  # Check if cell value is a string
        # Bug fix: previously non-string values (ints, floats, None) fell
        # through and the function implicitly returned None — in
        # clean_excel_file that wiped every numeric cell in the workbook.
        return cell
    cell = cell.replace("\t", "")
    try:
        return float(cell)
    except (ValueError, TypeError):
        return cell
def clean_excel_file(file_path):
    """
    Load an Excel workbook, normalize every cell value, and save the result
    to a fresh temporary file.

    Args:
        file_path (str): Path to the uploaded .xlsx file.

    Returns:
        str: Path to the cleaned temporary .xlsx file.
    """
    logger.info("Cleaning file...")
    # Load the workbook from the uploaded file
    with open(file_path, 'rb') as source:
        workbook = openpyxl.load_workbook(source)
    # Reserve a temporary path for the cleaned copy (delete=False so the
    # file survives after the handle closes and can be saved over).
    with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        cleaned_path = tmp.name
    # Normalize each cell in every sheet in place
    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]
        logger.info(f"Cleaning sheet: {sheet_name}")
        for row in sheet.iter_rows():
            for cell in row:
                cell.value = clean_and_convert(cell.value)
    # Persist the cleaned workbook to the reserved temporary path
    workbook.save(cleaned_path)
    logger.info("File cleaned successfully.")
    return cleaned_path
def process_datasets(file_paths):
    """
    Processes uploaded datasets and calls appropriate functions based on file type.

    .xlsx uploads are cleaned in place; .csv uploads go through the full CSV
    pipeline. Files that fail are logged and skipped rather than aborting
    the batch.

    Args:
        file_paths (List[str]): List of file paths.
    Returns:
        List: Gradio components produced by variable_outputs for the
        successfully processed files.
    """
    processed_paths = []
    for path in file_paths:
        extension = os.path.splitext(path)[-1].lower()
        logger.info(f"Processing file: {path}")
        if extension == ".xlsx":
            logger.info("Detected .xlsx file. Cleaning and processing...")
            try:
                cleaned = clean_excel_file(path)
                processed_paths.append(cleaned)
                logger.info(f"Cleaned file saved to: {cleaned}")
            except Exception as e:
                logger.error("Error cleaning file %s: %s", path, e)
        elif extension == ".csv":
            logger.info("Detected .csv file. Saving and processing...")
            try:
                converted = data_processing(path)
                if converted:
                    processed_paths.append(converted)
                    logger.info(f"Processed file saved to: {converted}")
            except Exception as e:
                logger.error("Error processing file %s: %s", path, e)
    return variable_outputs(processed_paths)
def chatbot_response(message):
    """
    Generate TrustAI sales content for a user prompt, grounded in the
    proof-point analysis of the currently selected example dataset.

    Args:
        message (str): Free-text request from the Gradio prompt textbox.

    Returns:
        str: The LLM's generated content (markdown).
    """
    global selected_dataset_ai
    # Load the selected dataset
    dataset_file_path = f"example_files/{selected_dataset_ai}.xlsx"
    try:
        (
            img_bucketfull,
            img_trust,
            img_nps,
            img_loyalty,
            img_consideration,
            img_satisfaction,
            df_builder_pivot,
            output_text,
            results_df_trust,
            results_df_nps,
            results_df_loyalty,
            results_df_consideration,
            results_df_satisfaction,
        ) = analyze_excel_single(dataset_file_path)
        if df_builder_pivot is not None:
            # Collect bucket names whose driver importance is >= 18% from
            # every available KPI model. A model may be None when its KPI
            # column is absent from the dataset, so guard all of them
            # uniformly — the previous code guarded every model except
            # Trust, which crashed when the Trust model was missing.
            qualified_bucket_names_list = []
            for results_df in (
                results_df_trust,
                results_df_nps,
                results_df_loyalty,
                results_df_consideration,
                results_df_satisfaction,
            ):
                if results_df is not None:
                    qualified_bucket_names_list.append(
                        results_df[results_df["Importance_percent"] >= 18][
                            "Predictor"
                        ].tolist()
                    )
            # Flatten the list of lists and deduplicate
            qualified_bucket_names_unique = list(
                {name for sublist in qualified_bucket_names_list for name in sublist}
            )
            # Keep only statements whose Trust Driver qualified above
            df_builder_pivot = df_builder_pivot[
                df_builder_pivot["Trust Driver®"].isin(qualified_bucket_names_unique)
            ]
            # Remove statements with values below 18%
            df_builder_pivot = df_builder_pivot[df_builder_pivot["%"] >= 18]
            df_builder_pivot_str = df_builder_pivot.to_string(index=False)
        else:
            df_builder_pivot_str = "Trust Builder information is not available."
    except FileNotFoundError:
        df_builder_pivot_str = "Dataset not found."
    except Exception as e:
        df_builder_pivot_str = f"An error occurred during analysis: {e}"
    # Read the domain knowledge base used to ground the LLM's response.
    # NOTE(review): a missing knowledge file raises here (unchanged
    # behavior) — confirm the file ships with the Space.
    knowledge_file_path = "./data_source/time_to_rethink_trust_book.md"
    with open(knowledge_file_path, "r", encoding="utf-8") as file:
        knowledge = file.read()
    # Create the prompt template
    prompt_message = f"""
    You are an expert copywriter that generates content based on the instruction from the user request.
    USER_REQUEST: {message}
    Equip yourself with domain knowledge in the field of Trust Analysis with the knowledge base.
    KNOWLEDGE_BASE: {knowledge}
    The user has selected the dataset: {selected_dataset_ai}.
    The user already computes his/her Trust Analysis and the result is displayed as DATAFRAME_PROOF_POINT: {df_builder_pivot_str}.
    There are 3 columns in DATAFRAME_PROOF_POINT: Trust Driver, Trust Proof Point, and %.
    Trust Driver: contains Trust indicators/buckets.
    Trust Buckets: contains 6 unique Trust Buckets: Stability, Development, Relationship, Benefit, Vision, and Competence.
    Trust Proof Point: contains Trust statements/messages associated with its Trust indicator/bucket.
    %: contains the percentage of how strong the Trust statements/messages contribute to their respective Trust indicators/buckets.
    The higher the % value is, the more important the Trust Proof Points are.
    Here's how you need to generate your response:
    1. If not explicitly mentioned, the user's default company name is Volkswagen.
    2. First, mention which dataset is selected.
    3. If DATAFRAME_PROOF_POINT is None or empty:
    - Respond to the user by saying Trust Builder information is not given and you will reply based on general knowledge.
    - Generate your response to the user prompt based on KNOWLEDGE_BASE and general knowledge.
    4. If DATAFRAME_PROOF_POINT is not None or empty:
    - For each Trust Bucket Filter in DATAFRAME_PROOF_POINT, select Trust Proof Points related to that Trust Bucket that have values 18% and above. They are considered as top scoring statements.
    - Display only the top scoring statements with values of 18% and above.
    - Then, respond to the user prompt based on these top scoring statements.
    You must adhere to generating the exact type of sales content required by the user based on USER_REQUEST.
    Use KNOWLEDGE_BASE as a reference in terms of definitions and examples.
    The sales content must be accurate, factual, and precise, based on the top scoring statements. Avoid making up new information.
    YOUR RESPONSE:
    """
    llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
    response = llm.invoke(prompt_message)
    return response.content
def read_ai_dataset_selection():
    """Return the dataset name currently selected for the TrustAI section."""
    # Reading a module-level name needs no `global` declaration.
    return selected_dataset_ai
def update_ai_dataset_selection(selection):
    """
    Store the radio widget's dataset choice in the module-level state.

    Args:
        selection (str): Name of the dataset picked by the user.

    Returns:
        str: The same selection, echoed back for Gradio.
    """
    global selected_dataset_ai
    selected_dataset_ai = selection
    return selection
# ---------------------------------------------------------------------------
# Gradio UI: file upload + analysis output slots, followed by the TrustAI
# prompt section. Components render in the order they are instantiated.
# ---------------------------------------------------------------------------
with gr.Blocks() as demo:
    # with gr.Column():
    #     gr.Markdown(
    #         "<span style='font-size:20px; font-weight:bold;'>Click 'Volkswagen Customers' or 'Volkswagen Prospects' to see the full results and play with the TrustAI.</span>",
    #         visible=True,
    #     )
    #     gr.Markdown(
    #         "Our calculator will conduct the driver analysis from the underlying Excel file and display the results. "
    #         + "Scroll down to view them and interact with them. "
    #         + "In the full version you can link your survey directly to our calculator or export your data as CSV and drag & drop it into our calculator.",
    #         visible=True,
    #     )
    with gr.Column():
        # with gr.Row():
        #     vw_customers_btn = gr.Button("Volkswagen Customers")
        #     vw_prospects_btn = gr.Button("Volkswagen Prospects")
        # with gr.Row():
        #     gr.Markdown(
        #         "<span style='font-size:20px; font-weight:bold;'>Click any of the examples below to see top-line driver results in different categories.</span>",
        #         visible=True,
        #     )
        # with gr.Row():
        #     hsbc_btn = gr.Button("HSBC")
        #     cba_btn = gr.Button("Commonwealth Bank")
        #     bupa_btn = gr.Button("BUPA")
        #     health_insurance_btn = gr.Button("GMHBA")
        #     care_btn = gr.Button("CARE")
        #     red_cross_btn = gr.Button("Red Cross")
        with gr.Row():
            # set file upload widget
            file_inputs = gr.Files(label="Datasets")
        with gr.Row():
            # set clear and submit buttons
            clear_button = gr.ClearButton(file_inputs)
            submit_button = gr.Button("Submit", variant="primary")
    with gr.Column():
        # set default output widgets (14 components per slot, max_outputs
        # slots — must match what process_datasets/reset_outputs return)
        outputs = reset_outputs()
    # function for submit button click
    submit_button.click(fn=process_datasets, inputs=file_inputs, outputs=outputs)
    # function for clear button click
    # this only handles the outputs. Input reset is handled at button definition
    clear_button.click(fn=reset_outputs, inputs=[], outputs=outputs)
    # # Create gr.State components to store file names as lists
    # vw_customers_state = gr.State(value=["Volkswagen Customers.xlsx"])
    # vw_prospects_state = gr.State(value=["Volkswagen Prospects.xlsx"])
    # hsbc_state = gr.State(value=["HSBC.xlsx"])
    # cba_state = gr.State(value=["Commonwealth Bank.xlsx"])
    # bupa_state = gr.State(value=["BUPA.xlsx"])
    # health_insurance_state = gr.State(value=["GMHBA.xlsx"])
    # care_state = gr.State(value=["CARE.xlsx"])
    # red_cross_state = gr.State(value=["Red Cross.xlsx"])
    # vw_customers_btn.click(
    #     fn=process_examples,
    #     inputs=[vw_customers_state],
    #     outputs=outputs,
    # )
    # vw_prospects_btn.click(
    #     fn=process_examples,
    #     inputs=[vw_prospects_state],
    #     outputs=outputs,
    # )
    # hsbc_btn.click(
    #     fn=process_examples,
    #     inputs=[hsbc_state],
    #     outputs=outputs,
    # )
    # cba_btn.click(
    #     fn=process_examples,
    #     inputs=[cba_state],
    #     outputs=outputs,
    # )
    # bupa_btn.click(
    #     fn=process_examples,
    #     inputs=[bupa_state],
    #     outputs=outputs,
    # )
    # health_insurance_btn.click(
    #     fn=process_examples,
    #     inputs=[health_insurance_state],
    #     outputs=outputs,
    # )
    # care_btn.click(
    #     fn=process_examples,
    #     inputs=[care_state],
    #     outputs=outputs,
    # )
    # red_cross_btn.click(
    #     fn=process_examples,
    #     inputs=[red_cross_state],
    #     outputs=outputs,
    # )
    # --- TrustAI section: dataset selector + prompt -> LLM response ---
    with gr.Column():
        gr.Markdown(
            "<span style='font-size:20px; font-weight:bold;'>4) Instant Insight-2-Action</span>",
            visible=True,
        )
        gr.Markdown(
            "With <b>TrustAI</b> you go straight from insight to implementation ideas. "
            + "Select <b>the dataset you want to use.</b> ",
            visible=True,
        )
        radio = gr.Radio(
            choices=["Volkswagen Customers", "Volkswagen Prospects"],
            label="Select a dataset you want to use for the TrustAI",
            value=read_ai_dataset_selection(),  # Initialize with the current selection
            visible=True,
        )
        gr.Markdown(
            "Tell TrustAI what you want to generate and <b>create trust-enhanced ideas using the top trust and KPI proof points.</b> "
            + "<br><br>Or copy and paste this <b>sample prompt</b> to the AI input field below: <br>"
            + "<i>''Write a letter to get reader to want to come to showroom''</i>",
            visible=True,
        )
        # Persist the radio choice into the module-level selection state
        radio.change(fn=update_ai_dataset_selection, inputs=radio, outputs=[])
        # Text input box for the user to enter their prompt
        prompt_input = gr.Textbox(
            lines=2,
            value="",
            label="Enter your prompt",
            visible=True,
        )
        # with gr.Column():
        gr.Markdown(
            "Click <b>'Submit'</b> and our TrustAI will generate responses based on your input prompt.",
            visible=True,
        )
        # Submit button
        # NOTE(review): this rebinds the name `submit_button` from the upload
        # section; harmless because that button's click is already wired
        # above, but a distinct name would be clearer.
        submit_button = gr.Button("Submit")
        # Output display box to show the response
        output_display = gr.Markdown(label="Response")
        # Connect the submit button to the chatbot_response function
        submit_button.click(
            fn=chatbot_response, inputs=prompt_input, outputs=output_display
        )
# Bind to all interfaces so the app is reachable from outside the container
# (required on Hugging Face Spaces / Docker).
demo.launch(server_name="0.0.0.0")