#!/usr/bin/env python3
"""
Core correlation analysis functionality shared between Gradio app and CLI interface
"""

import pandas as pd
import numpy as np
from scipy import stats
import os
from datetime import datetime

# Import utility functions
from csv_utils import robust_csv_loader, find_email_column, find_ipm_columns


def convert_percentage_to_numeric(series):
    """Convert a Series of percentage strings to numeric fractions.

    Object-dtype values like ``"85%"`` become ``0.85``; already-numeric
    series are coerced with :func:`pandas.to_numeric`. Unparseable values
    become NaN in either case.

    Args:
        series: pandas Series of percentages (strings or numbers).

    Returns:
        pandas Series of floats (fractions), NaN where conversion failed.
    """
    if series.dtype == 'object':
        # Strip a trailing '%' then scale so '85%' -> 0.85
        return pd.to_numeric(series.astype(str).str.rstrip('%'), errors='coerce') / 100
    return pd.to_numeric(series, errors='coerce')


def load_and_merge_data(kpi_file, scores_file):
    """
    Load KPI and scores files and merge them on email.

    Args:
        kpi_file: Path to KPI file (CSV or Excel)
        scores_file: Path to scores file (CSV)

    Returns:
        tuple: (kpi_df, scores_df, merged_df, kpi_email_col,
                fy2425_ipm_col, fy2324_ipm_col)

    Raises:
        ValueError: If no email column can be found in the KPI file.
    """
    # Load KPI file
    kpi_df = robust_csv_loader(kpi_file, required_columns=['Email'])

    # Load scores file
    scores_df = robust_csv_loader(
        scores_file,
        required_columns=['email', 'problem_score', 'ability_score']
    )

    # Find email column in KPI dataframe
    kpi_email_col = find_email_column(kpi_df)
    if not kpi_email_col:
        raise ValueError("Could not find email column in KPI file")

    # Find IPM columns
    fy2425_ipm_col, fy2324_ipm_col = find_ipm_columns(kpi_df)

    # Normalize email columns. Strip whitespace first: exported CSVs
    # frequently carry stray spaces that would silently break the join.
    # (.str ops preserve NaN, so missing emails stay unmatched.)
    kpi_df['email_normalized'] = kpi_df[kpi_email_col].str.strip().str.lower()
    scores_df['email_normalized'] = scores_df['email'].str.strip().str.lower()

    # Merge datasets (inner join keeps only emails present in both files)
    merged_df = pd.merge(scores_df, kpi_df, on='email_normalized', how='inner')

    return kpi_df, scores_df, merged_df, kpi_email_col, fy2425_ipm_col, fy2324_ipm_col


def analyze_data_quality(kpi_df, scores_df, merged_df, kpi_email_col='Email'):
    """
    Analyze and report data quality statistics.

    Args:
        kpi_df: KPI dataframe.
        scores_df: Scores dataframe (must contain an 'email' column).
        merged_df: Result of merging the two on normalized email.
        kpi_email_col: Name of the email column in ``kpi_df``.

    Returns:
        dict: Data quality statistics (record counts, overlap counts,
        and match rates as percentages).
    """
    # NOTE: named quality_stats (not `stats`) to avoid shadowing the
    # module-level `from scipy import stats` import.
    quality_stats = {
        'kpi_records': len(kpi_df),
        'scores_records': len(scores_df),
        'matched_emails': len(merged_df)
    }

    # Calculate match rate on normalized (stripped + lowercased) emails
    kpi_emails = set(kpi_df[kpi_email_col].str.strip().str.lower().dropna())
    scores_emails = set(scores_df['email'].str.strip().str.lower().dropna())

    common_emails = kpi_emails.intersection(scores_emails)
    kpi_only = kpi_emails - scores_emails
    scores_only = scores_emails - kpi_emails

    quality_stats.update({
        'common_emails': len(common_emails),
        'emails_only_in_kpi': len(kpi_only),
        'emails_only_in_scores': len(scores_only),
        # Guard against empty email sets to avoid ZeroDivisionError
        'match_rate_kpi': len(common_emails) / len(kpi_emails) * 100 if len(kpi_emails) > 0 else 0,
        'match_rate_scores': len(common_emails) / len(scores_emails) * 100 if len(scores_emails) > 0 else 0
    })

    return quality_stats


def calculate_correlations(merged_df, fy2324_ipm_col, fy2425_ipm_col):
    """
    Calculate correlations for all pairs.

    Args:
        merged_df: Merged dataframe with both scores and KPI data
        fy2324_ipm_col: Column name for FY23/24 IPM
        fy2425_ipm_col: Column name for FY24/25 IPM

    Returns:
        tuple: (results_dict, pairs_plot_data)
    """
    # Extract columns
    A = merged_df['problem_score']
    B = merged_df['ability_score']
    C = merged_df[fy2324_ipm_col]
    D = merged_df[fy2425_ipm_col]

    # Define pairs: (key, score series, KPI series, score label, KPI label)
    pairs = [
        ('AC', A, C, 'problem_score', 'FY23/24 IPM'),
        ('AD', A, D, 'problem_score', 'FY24/25 IPM'),
        ('BC', B, C, 'ability_score', 'FY23/24 IPM'),
        ('BD', B, D, 'ability_score', 'FY24/25 IPM')
    ]

    results = {}
    plot_data = {}

    # Calculate correlations for each pair
    for pair_name, series1, series2, name1, name2 in pairs:
        # Clean data: scores are plain numerics, KPI values may be '85%' strings
        series1_clean = pd.to_numeric(series1, errors='coerce')
        series2_clean = convert_percentage_to_numeric(series2)

        # Create dataframe and drop rows where either value is missing
        pair_df = pd.DataFrame({
            'var1': series1_clean,
            'var2': series2_clean
        })

        initial_count = len(pair_df)
        pair_df_clean = pair_df.dropna()
        valid_points = len(pair_df_clean)

        # Initialize result with None correlations; filled in below if
        # there is enough data to compute them.
        results[pair_name] = {
            'pearson': {
                'correlation': None,
                'p_value': None,
                'n_samples': valid_points
            },
            'spearman': {
                'correlation': None,
                'p_value': None,
                'n_samples': valid_points
            },
            'data_quality': {
                'initial_records': initial_count,
                'valid_records': valid_points,
                'completion_rate': f"{valid_points/initial_count*100:.1f}%" if initial_count > 0 else "0%"
            }
        }

        # Store plot data
        plot_data[pair_name] = {
            'x_data': pair_df_clean['var1'].values if valid_points > 0 else [],
            'y_data': pair_df_clean['var2'].values if valid_points > 0 else [],
            'x_label': name1,
            'y_label': name2,
            'pearson_corr': None,
            'spearman_corr': None,
            'n_samples': valid_points
        }

        # Calculate correlations only when there are enough paired points
        # (pearsonr/spearmanr need at least 3 observations to be meaningful)
        if valid_points >= 3:
            pearson_corr, pearson_p = stats.pearsonr(
                pair_df_clean['var1'], pair_df_clean['var2'])
            spearman_corr, spearman_p = stats.spearmanr(
                pair_df_clean['var1'], pair_df_clean['var2'])

            results[pair_name]['pearson']['correlation'] = float(pearson_corr)
            results[pair_name]['pearson']['p_value'] = float(pearson_p)
            results[pair_name]['spearman']['correlation'] = float(spearman_corr)
            results[pair_name]['spearman']['p_value'] = float(spearman_p)

            plot_data[pair_name]['pearson_corr'] = float(pearson_corr)
            plot_data[pair_name]['spearman_corr'] = float(spearman_corr)

    return results, plot_data


def analyze_correlations_full(kpi_file, scores_file):
    """
    Complete correlation analysis pipeline.

    Args:
        kpi_file: Path to KPI file
        scores_file: Path to scores file

    Returns:
        tuple: (data_quality_stats, correlation_results, plot_data, column_info)
    """
    # Load and merge data
    (kpi_df, scores_df, merged_df, kpi_email_col,
     fy2425_ipm_col, fy2324_ipm_col) = load_and_merge_data(kpi_file, scores_file)

    # Get data quality stats
    data_quality_stats = analyze_data_quality(
        kpi_df, scores_df, merged_df, kpi_email_col)

    # Calculate correlations
    correlation_results, plot_data = calculate_correlations(
        merged_df, fy2324_ipm_col, fy2425_ipm_col)

    # Column info for reference
    column_info = {
        'kpi_email_col': kpi_email_col,
        'fy2425_ipm_col': fy2425_ipm_col,
        'fy2324_ipm_col': fy2324_ipm_col
    }

    return data_quality_stats, correlation_results, plot_data, column_info