# kpi_analysis/correlation_analysis_core.py
# Deploy KPI snapshot 2025-06-12 (commit 4e67a93, zh3036)
#!/usr/bin/env python3
"""
Core correlation analysis functionality shared between Gradio app and CLI interface
"""
import pandas as pd
import numpy as np
from scipy import stats
import os
from datetime import datetime
# Import utility functions
from csv_utils import robust_csv_loader, find_email_column, find_ipm_columns
def convert_percentage_to_numeric(series):
    """Coerce a series of percentage strings (e.g. '85%') to numeric fractions.

    Non-object (already numeric) series are passed through pd.to_numeric
    unchanged. Unparseable values become NaN.

    NOTE(review): object values WITHOUT a trailing '%' are still divided by
    100 — assumes object-dtype columns always hold percentages; TODO confirm.
    """
    if series.dtype != 'object':
        return pd.to_numeric(series, errors='coerce')
    stripped = series.astype(str).str.rstrip('%')
    return pd.to_numeric(stripped, errors='coerce') / 100
def load_and_merge_data(kpi_file, scores_file):
    """
    Load KPI and scores files and merge them on email.

    Args:
        kpi_file: Path to KPI file (CSV or Excel)
        scores_file: Path to scores file (CSV)
    Returns:
        tuple: (kpi_df, scores_df, merged_df, kpi_email_col, fy2425_ipm_col, fy2324_ipm_col)
    Raises:
        ValueError: If no email column can be found in the KPI file.
    """
    # Load KPI file
    kpi_df = robust_csv_loader(kpi_file, required_columns=['Email'])
    # Load scores file
    scores_df = robust_csv_loader(scores_file, required_columns=['email', 'problem_score', 'ability_score'])
    # Find email column in KPI dataframe
    kpi_email_col = find_email_column(kpi_df)
    if not kpi_email_col:
        raise ValueError("Could not find email column in KPI file")
    # Find IPM columns
    fy2425_ipm_col, fy2324_ipm_col = find_ipm_columns(kpi_df)
    # Normalize join keys: strip stray whitespace (common in exported CSVs)
    # before lower-casing so e.g. " A@x.com " still matches "a@x.com".
    kpi_df['email_normalized'] = kpi_df[kpi_email_col].str.strip().str.lower()
    scores_df['email_normalized'] = scores_df['email'].str.strip().str.lower()
    # NOTE(review): rows with missing emails yield NaN join keys, and pandas
    # merge can pair NaN keys with each other — confirm the loaders drop blanks.
    merged_df = pd.merge(scores_df, kpi_df, on='email_normalized', how='inner')
    return kpi_df, scores_df, merged_df, kpi_email_col, fy2425_ipm_col, fy2324_ipm_col
def analyze_data_quality(kpi_df, scores_df, merged_df, kpi_email_col='Email'):
    """
    Analyze and report data quality statistics for the email-based merge.

    Args:
        kpi_df: KPI dataframe.
        scores_df: Scores dataframe (must have an 'email' column).
        merged_df: Result of merging the two on normalized email.
        kpi_email_col: Name of the email column in kpi_df.
    Returns:
        dict: Record counts, email overlap counts, and match-rate percentages.
    """
    # Named `quality_stats` (not `stats`) to avoid shadowing the module-level
    # `from scipy import stats` import.
    quality_stats = {
        'kpi_records': len(kpi_df),
        'scores_records': len(scores_df),
        'matched_emails': len(merged_df)
    }
    # Compare case-insensitive email sets to explain the match rate.
    kpi_emails = set(kpi_df[kpi_email_col].str.lower().dropna())
    scores_emails = set(scores_df['email'].str.lower().dropna())
    common_emails = kpi_emails.intersection(scores_emails)
    kpi_only = kpi_emails - scores_emails
    scores_only = scores_emails - kpi_emails
    quality_stats.update({
        'common_emails': len(common_emails),
        'emails_only_in_kpi': len(kpi_only),
        'emails_only_in_scores': len(scores_only),
        # Guard against empty inputs so we never divide by zero.
        'match_rate_kpi': len(common_emails)/len(kpi_emails)*100 if len(kpi_emails) > 0 else 0,
        'match_rate_scores': len(common_emails)/len(scores_emails)*100 if len(scores_emails) > 0 else 0
    })
    return quality_stats
def _correlate_pair(score_series, kpi_series, x_label, y_label):
    """Compute Pearson/Spearman stats and plot payload for one score/KPI pair.

    Returns:
        tuple: (result_entry, plot_entry) dicts matching the schema documented
        on calculate_correlations.
    """
    # Coerce both sides to numeric; KPI values may be '85%'-style strings.
    x = pd.to_numeric(score_series, errors='coerce')
    y = convert_percentage_to_numeric(kpi_series)
    pair_df = pd.DataFrame({'var1': x, 'var2': y})
    initial_count = len(pair_df)
    pair_df_clean = pair_df.dropna()
    valid_points = len(pair_df_clean)
    result = {
        'pearson': {
            'correlation': None,
            'p_value': None,
            'n_samples': valid_points
        },
        'spearman': {
            'correlation': None,
            'p_value': None,
            'n_samples': valid_points
        },
        'data_quality': {
            'initial_records': initial_count,
            'valid_records': valid_points,
            'completion_rate': f"{valid_points/initial_count*100:.1f}%" if initial_count > 0 else "0%"
        }
    }
    plot = {
        'x_data': pair_df_clean['var1'].values if valid_points > 0 else [],
        'y_data': pair_df_clean['var2'].values if valid_points > 0 else [],
        'x_label': x_label,
        'y_label': y_label,
        'pearson_corr': None,
        'spearman_corr': None,
        'n_samples': valid_points
    }
    # Need at least 3 paired observations for a meaningful correlation.
    if valid_points >= 3:
        pearson_corr, pearson_p = stats.pearsonr(pair_df_clean['var1'], pair_df_clean['var2'])
        spearman_corr, spearman_p = stats.spearmanr(pair_df_clean['var1'], pair_df_clean['var2'])
        result['pearson']['correlation'] = float(pearson_corr)
        result['pearson']['p_value'] = float(pearson_p)
        result['spearman']['correlation'] = float(spearman_corr)
        result['spearman']['p_value'] = float(spearman_p)
        plot['pearson_corr'] = float(pearson_corr)
        plot['spearman_corr'] = float(spearman_corr)
    return result, plot


def calculate_correlations(merged_df, fy2324_ipm_col, fy2425_ipm_col):
    """
    Calculate correlations for all score/KPI pairs.

    Args:
        merged_df: Merged dataframe with both scores and KPI data
        fy2324_ipm_col: Column name for FY23/24 IPM
        fy2425_ipm_col: Column name for FY24/25 IPM
    Returns:
        tuple: (results_dict, pairs_plot_data) keyed by pair name
        ('AC', 'AD', 'BC', 'BD'). Correlations are None when fewer
        than 3 valid paired observations exist.
    """
    # Pair codes: A=problem_score, B=ability_score, C=FY23/24 IPM, D=FY24/25 IPM.
    pairs = [
        ('AC', 'problem_score', fy2324_ipm_col, 'problem_score', 'FY23/24 IPM'),
        ('AD', 'problem_score', fy2425_ipm_col, 'problem_score', 'FY24/25 IPM'),
        ('BC', 'ability_score', fy2324_ipm_col, 'ability_score', 'FY23/24 IPM'),
        ('BD', 'ability_score', fy2425_ipm_col, 'ability_score', 'FY24/25 IPM')
    ]
    results = {}
    plot_data = {}
    for pair_name, score_col, kpi_col, name1, name2 in pairs:
        results[pair_name], plot_data[pair_name] = _correlate_pair(
            merged_df[score_col], merged_df[kpi_col], name1, name2)
    return results, plot_data
def analyze_correlations_full(kpi_file, scores_file):
    """
    Run the end-to-end correlation analysis pipeline.

    Args:
        kpi_file: Path to KPI file
        scores_file: Path to scores file
    Returns:
        tuple: (data_quality_stats, correlation_results, plot_data, column_info)
    """
    # Step 1: load both files and join on normalized email.
    (kpi_df, scores_df, merged_df,
     email_col, ipm_2425, ipm_2324) = load_and_merge_data(kpi_file, scores_file)
    # Step 2: summarize how well the two datasets matched.
    quality = analyze_data_quality(kpi_df, scores_df, merged_df, email_col)
    # Step 3: compute per-pair correlations and plot payloads.
    corr_results, plots = calculate_correlations(merged_df, ipm_2324, ipm_2425)
    # Record which columns were auto-detected, for downstream reporting.
    columns = {
        'kpi_email_col': email_col,
        'fy2425_ipm_col': ipm_2425,
        'fy2324_ipm_col': ipm_2324
    }
    return quality, corr_results, plots, columns