# CSS_EDA_Dashboard/src/utils/correlation.py
# Uploaded by arash7920 ("Upload 38 files", commit e869d90, verified).
"""
Correlation matrix generation module for mixed data types.
This module provides the CorrelationMatrixGenerator class which computes
correlation/association matrices for DataFrames containing mixed data types
(Continuous, Binary, Categorical). It automatically selects appropriate
correlation measures based on feature type pairs.
"""
import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, pointbiserialr
from tqdm import tqdm
class CorrelationMatrixGenerator:
    """
    Build a correlation/association matrix for a DataFrame with mixed feature types.

    Each pair of features is scored with a measure chosen from the pair's
    declared types:

    ======================== ==========================================
    Pair                     Measure
    ======================== ==========================================
    Continuous / Continuous  ``Series.corr`` with the configured method
    Binary / Binary          Phi coefficient (Pearson on 0/1 codes)
    Categorical / Categorical Cramér's V
    Binary / Continuous      Point-biserial correlation
    Categorical / Continuous Correlation ratio η (one-way ANOVA)
    Binary / Categorical     Cramér's V
    ======================== ==========================================

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing the features to analyse.
    feature_classes : dict
        Maps column names of ``df`` to one of ``'Continuous'``, ``'Binary'``
        or ``'Categorical'``. Pairs involving any other type label yield NaN.
    continuous_vs_continuous_method : str, optional
        Method forwarded to ``pandas.Series.corr`` for Continuous/Continuous
        pairs. Default is ``'pearson'``.

    Methods
    -------
    generate_matrix() -> pd.DataFrame
        Generates and returns the symmetric correlation/association matrix.
    """

    def __init__(self, df, feature_classes, continuous_vs_continuous_method='pearson'):
        """
        Store the data and configuration; no computation happens here.

        Parameters
        ----------
        df : pandas.DataFrame
            The DataFrame containing the data.
        feature_classes : dict
            Keys are column names in ``df``; values are their data types
            (``'Continuous'``, ``'Binary'`` or ``'Categorical'``).
        continuous_vs_continuous_method : str
            Correlation method used for two continuous features.
        """
        self.df = df
        self.feature_classes = feature_classes
        self.continuous_vs_continuous_method = continuous_vs_continuous_method

    @staticmethod
    def recode_binary(series):
        """
        Ensure a binary series is coded as 0 and 1.

        A numeric series whose non-missing values are all in {0, 1} is
        returned unchanged. Otherwise the two distinct non-missing values are
        mapped to 0 and 1 in order of first appearance, so the sign of any
        correlation computed from the result depends on which value occurs
        first in the data. Missing values stay missing.

        Parameters
        ----------
        series : pd.Series
            A binary series to recode.

        Returns
        -------
        pd.Series
            Binary series with values {0, 1}.

        Raises
        ------
        ValueError
            If the series needs recoding but does not have exactly two
            distinct non-missing values.
        """
        observed = series.dropna().unique()
        if pd.api.types.is_numeric_dtype(series) and set(observed) <= {0, 1}:
            return series
        if len(observed) != 2:
            raise ValueError("Series does not appear to be binary")
        return series.map({observed[0]: 0, observed[1]: 1})

    @staticmethod
    def cramers_v(x, y):
        """
        Calculate Cramér's V for the association between two nominal variables.

        V = sqrt(chi2 / (n * (min(rows, cols) - 1))) and ranges from 0
        (no association) to 1 (perfect association). The chi-square statistic
        is computed without Yates' continuity correction; with the correction
        a perfectly associated 2x2 table would not reach 1.

        Parameters
        ----------
        x, y : pd.Series
            Two categorical variables; they are cross-tabulated with
            ``pd.crosstab``, which ignores rows where either value is missing.

        Returns
        -------
        float
            Cramér's V, or ``np.nan`` when the table is empty or either
            variable has fewer than two observed levels.
        """
        table = pd.crosstab(x, y)
        if table.size == 0:
            return np.nan
        # Levels that never occur would give zero expected frequencies,
        # which chi2_contingency rejects, so drop them first.
        table = table.loc[table.sum(axis=1) > 0, table.sum(axis=0) > 0]
        n = table.to_numpy().sum()
        min_dim = min(table.shape) - 1
        # Guard before calling chi2_contingency: it raises on empty tables.
        if n == 0 or min_dim <= 0:
            return np.nan
        chi2 = chi2_contingency(table, correction=False)[0]
        return float(np.sqrt(chi2 / (n * min_dim)))

    @staticmethod
    def anova_eta(categories, measurements):
        """
        Compute the correlation ratio η from a one-way ANOVA decomposition.

        η = sqrt(SS_between / SS_total); its square is the proportion of
        variance in ``measurements`` explained by the grouping. Observations
        where the category or the measurement is missing are excluded from
        every sum, so group sizes, the overall mean and SS_total all refer to
        the same set of rows.

        Parameters
        ----------
        categories : array-like
            Categorical grouping, one label per observation.
        measurements : array-like
            Continuous values, positionally paired with ``categories``.

        Returns
        -------
        float
            η in [0, 1], or ``np.nan`` when no complete observation exists or
            the measurements have zero variance.
        """
        # factorize marks missing labels with code -1.
        codes, _ = pd.factorize(pd.Series(categories))
        values = pd.Series(measurements).to_numpy(dtype=float, na_value=np.nan)

        complete = (codes >= 0) & ~np.isnan(values)
        codes = codes[complete]
        values = values[complete]
        if values.size == 0:
            return np.nan

        overall_mean = values.mean()
        ss_total = np.sum((values - overall_mean) ** 2)
        if ss_total == 0:
            return np.nan

        group_sizes = np.bincount(codes)
        group_sums = np.bincount(codes, weights=values)
        # Codes of groups that lost all rows to missing values have size 0.
        populated = group_sizes > 0
        group_means = group_sums[populated] / group_sizes[populated]
        ss_between = np.sum(group_sizes[populated] * (group_means - overall_mean) ** 2)

        # Rounding can push the ratio marginally above 1.
        return float(np.sqrt(min(ss_between / ss_total, 1.0)))

    def compute_pairwise_correlation(self, series1, type1, series2, type2):
        """
        Compute the correlation/association between two series based on their types.

        Parameters
        ----------
        series1, series2 : pandas.Series
            The two features to compare.
        type1, type2 : str
            Declared type of each series: ``'Continuous'``, ``'Binary'`` or
            ``'Categorical'``.

        Returns
        -------
        float
            The association measure for the pair, or ``np.nan`` when it is
            undefined (unknown type combination, a 'Binary' series that is not
            binary, constant input, too few complete observations).
        """
        kinds = {type1, type2}

        # ------------- Homogeneous data types -------------
        # Continuous vs. Continuous: configurable correlation method.
        if kinds == {'Continuous'}:
            return series1.corr(series2, method=self.continuous_vs_continuous_method)

        # Binary vs. Binary: phi coefficient (Pearson on recoded binaries).
        if kinds == {'Binary'}:
            try:
                first = self.recode_binary(series1)
                second = self.recode_binary(series2)
            except (ValueError, TypeError):
                return np.nan
            return first.corr(second, method='pearson')

        # Categorical vs. Categorical, and Binary vs. Categorical (binary is
        # treated as nominal): Cramér's V.
        if kinds == {'Categorical'} or kinds == {'Binary', 'Categorical'}:
            return self.cramers_v(series1, series2)

        # ------------- Heterogeneous data types -------------
        # Binary vs. Continuous: point-biserial correlation coefficient.
        if kinds == {'Continuous', 'Binary'}:
            if type1 == 'Binary':
                binary_series, continuous_series = series1, series2
            else:
                binary_series, continuous_series = series2, series1
            try:
                binary_series = self.recode_binary(binary_series)
            except (ValueError, TypeError):
                return np.nan

            binary = binary_series.to_numpy(dtype=float, na_value=np.nan)
            continuous = pd.Series(continuous_series).to_numpy(dtype=float, na_value=np.nan)
            # pointbiserialr does not skip NaN; use pairwise-complete rows so
            # this branch treats missing data like Series.corr does.
            complete = ~(np.isnan(binary) | np.isnan(continuous))
            binary = binary[complete]
            continuous = continuous[complete]
            if binary.size < 2 or np.ptp(binary) == 0 or np.ptp(continuous) == 0:
                return np.nan
            corr, _ = pointbiserialr(binary, continuous)
            return float(corr)

        # Categorical vs. Continuous: ANOVA-based correlation ratio (η).
        if kinds == {'Continuous', 'Categorical'}:
            if type1 == 'Categorical':
                return self.anova_eta(series1, series2)
            return self.anova_eta(series2, series1)

        return np.nan

    def generate_matrix(self):
        """
        Generate the symmetric correlation/association matrix.

        Every unordered pair of features listed in ``feature_classes`` is
        evaluated exactly once with ``compute_pairwise_correlation`` and the
        result is mirrored across the diagonal; the diagonal is set to 1.0.
        Progress over the outer loop is reported with ``tqdm``.

        Returns
        -------
        pd.DataFrame
            A symmetric matrix with feature names as both index and columns.
            Values are rounded to 4 decimal places; undefined pairs are NaN.
        """
        features = list(self.feature_classes)
        n_features = len(features)

        values = np.full((n_features, n_features), np.nan, dtype=float)
        np.fill_diagonal(values, 1.0)

        for i, var1 in tqdm(list(enumerate(features))):
            # Only the upper triangle is computed; a NaN result is therefore
            # never mistaken for "not computed yet" and recomputed.
            for j in range(i + 1, n_features):
                var2 = features[j]
                corr_value = self.compute_pairwise_correlation(
                    self.df[var1],
                    self.feature_classes[var1],
                    self.df[var2],
                    self.feature_classes[var2],
                )
                values[i, j] = corr_value
                values[j, i] = corr_value

        return pd.DataFrame(values, index=features, columns=features).round(4)