"""Correlation matrix generation module for mixed data types.

This module provides the CorrelationMatrixGenerator class, which computes
correlation/association matrices for DataFrames containing mixed data types
(Continuous, Binary, Categorical). It automatically selects an appropriate
measure for each pair of feature types.
"""

import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, pointbiserialr

try:
    from tqdm import tqdm
except ImportError:  # the progress bar is cosmetic; work without it
    def tqdm(iterable, *args, **kwargs):
        """Fallback used when tqdm is not installed: return *iterable* as is."""
        return iterable


class CorrelationMatrixGenerator:
    """Generate a correlation/association matrix for a mixed-type DataFrame.

    Supported feature types are 'Continuous', 'Binary' and 'Categorical'.
    The measure used for a pair depends on the two types:

    * Continuous / Continuous   -> ``Series.corr`` with the configured method
    * Binary / Binary           -> phi coefficient (Pearson on 0/1 codes)
    * Categorical / Categorical -> Cramér's V
    * Binary / Continuous       -> point-biserial correlation
    * Categorical / Continuous  -> ANOVA-based eta
    * Binary / Categorical      -> Cramér's V

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing features for correlation analysis.
    feature_classes : dict
        Maps column names of ``df`` to their data type
        ('Continuous', 'Binary' or 'Categorical').
    continuous_vs_continuous_method : str, optional
        Method passed to ``Series.corr`` for two continuous features.
        Default is 'pearson'.

    Methods
    -------
    generate_matrix() -> pd.DataFrame
        Generate and return the symmetric correlation/association matrix.
    """

    def __init__(self, df, feature_classes, continuous_vs_continuous_method='pearson'):
        """Store the data, the column -> type mapping and the method name.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame containing the data.
        feature_classes : dict
            Keys are column names in ``df``; values are their data types
            ('Continuous', 'Binary' or 'Categorical').
        continuous_vs_continuous_method : str
            Method used for the correlation of two continuous features.
        """
        self.df = df
        self.feature_classes = feature_classes
        self.continuous_vs_continuous_method = continuous_vs_continuous_method

    @staticmethod
    def recode_binary(series):
        """Ensure a binary series is coded as 0 and 1.

        A numeric series whose non-missing values are already within {0, 1}
        is returned unchanged. Otherwise the two distinct non-missing values
        are mapped to 0 and 1 in sorted order, so the coding (and therefore
        the sign of any correlation built on it) does not depend on row
        order. Values that cannot be ordered fall back to order of first
        appearance.

        Parameters
        ----------
        series : pd.Series
            A binary series to recode.

        Returns
        -------
        pd.Series
            Series with values in {0, 1}; missing values stay missing.

        Raises
        ------
        ValueError
            If the series needs recoding but does not have exactly two
            distinct non-missing values.
        """
        observed = series.dropna().unique()

        # Already 0/1 coded (bool dtype counts as numeric): nothing to do.
        if pd.api.types.is_numeric_dtype(series) and set(observed) <= {0, 1}:
            return series

        if len(observed) != 2:
            raise ValueError("Series does not appear to be binary")

        try:
            low, high = sorted(observed)
        except TypeError:
            # Mixed, non-comparable labels: keep order of first appearance.
            low, high = observed
        return series.map({low: 0, high: 1})

    @staticmethod
    def cramers_v(x, y):
        """Calculate Cramér's V for a categorical-categorical association.

        Cramér's V measures the association between two nominal variables,
        ranging from 0 (no association) to 1 (perfect association). The
        chi-square statistic is computed without Yates' continuity
        correction so that a perfect 2x2 association yields exactly 1.

        Parameters
        ----------
        x, y : array-like
            Two categorical variables of equal length.

        Returns
        -------
        float
            Cramér's V, or np.nan when there are no observations or one of
            the variables has fewer than two observed levels.
        """
        observed = pd.crosstab(x, y).to_numpy()
        # Drop levels without observations; chi2_contingency rejects tables
        # that contain a zero expected frequency.
        observed = observed[observed.sum(axis=1) > 0][:, observed.sum(axis=0) > 0]

        n = observed.sum()
        min_dim = min(observed.shape) - 1
        # Guard before calling chi2_contingency, which raises on empty tables.
        if n == 0 or min_dim <= 0:
            return np.nan

        chi2 = chi2_contingency(observed, correction=False)[0]
        return float(np.sqrt(chi2 / (n * min_dim)))

    @staticmethod
    def anova_eta(categories, measurements):
        """Compute eta (η), the effect size derived from one-way ANOVA.

        Eta is the square root of the share of variance in ``measurements``
        explained by the grouping in ``categories``. Rows where either the
        category or the measurement is missing are excluded from every term
        of the computation.

        Parameters
        ----------
        categories : array-like
            Categorical grouping variable.
        measurements : array-like
            Continuous values, same length as ``categories``.

        Returns
        -------
        float
            Eta between 0 and 1, or np.nan when no complete rows remain or
            the measurements have zero variance.
        """
        # factorize assigns code -1 to missing categories.
        codes, _ = pd.factorize(pd.Series(categories))
        values = pd.Series(measurements).to_numpy(dtype=float, na_value=np.nan)

        complete = (codes >= 0) & ~np.isnan(values)
        codes = codes[complete]
        values = values[complete]
        if values.size == 0:
            return np.nan

        overall_mean = values.mean()
        ss_total = np.sum((values - overall_mean) ** 2)
        if ss_total == 0:
            return np.nan

        # Per-group sizes and sums in one pass each.
        counts = np.bincount(codes)
        sums = np.bincount(codes, weights=values)
        present = counts > 0  # levels emptied by the missing-value filter
        group_means = sums[present] / counts[present]
        ss_between = np.sum(counts[present] * (group_means - overall_mean) ** 2)

        # Clip guards against ratios a hair above 1 from float rounding.
        return float(np.sqrt(min(ss_between / ss_total, 1.0)))

    @staticmethod
    def _point_biserial(binary_series, continuous_series):
        """Point-biserial correlation on rows where both values are present.

        Returns np.nan when fewer than two complete rows are available.
        """
        binary = pd.Series(binary_series).to_numpy(dtype=float, na_value=np.nan)
        continuous = pd.Series(continuous_series).to_numpy(dtype=float, na_value=np.nan)

        complete = ~(np.isnan(binary) | np.isnan(continuous))
        if complete.sum() < 2:
            return np.nan
        return float(pointbiserialr(binary[complete], continuous[complete])[0])

    def compute_pairwise_correlation(self, series1, type1, series2, type2):
        """Compute the correlation/association between two series.

        Parameters
        ----------
        series1, series2 : pd.Series
            The two features to compare.
        type1, type2 : str
            Their data types: 'Continuous', 'Binary' or 'Categorical'.

        Returns
        -------
        float
            The measure selected for the type pair, or np.nan when it is not
            defined (unknown type, or a 'Binary' series that cannot be
            recoded to 0/1).
        """
        kinds = {type1, type2}

        # ------------- Homogeneous data types -------------
        # Continuous vs. Continuous: the configured method (Pearson by default)
        if kinds == {'Continuous'}:
            return series1.corr(series2, method=self.continuous_vs_continuous_method)

        # Binary vs. Binary: phi coefficient (Pearson on recoded binaries)
        if kinds == {'Binary'}:
            try:
                s1 = self.recode_binary(series1)
                s2 = self.recode_binary(series2)
            except (ValueError, TypeError):
                # Not binary / not recodable: the measure is undefined.
                return np.nan
            return s1.corr(s2, method='pearson')

        # Categorical vs. Categorical: Cramér's V
        if kinds == {'Categorical'}:
            return self.cramers_v(series1, series2)

        # ------------- Heterogeneous data types -------------
        # Binary & Continuous: point-biserial correlation coefficient
        if kinds == {'Continuous', 'Binary'}:
            if type1 == 'Binary':
                binary_series, continuous_series = series1, series2
            else:
                binary_series, continuous_series = series2, series1
            try:
                binary_series = self.recode_binary(binary_series)
            except (ValueError, TypeError):
                return np.nan
            return self._point_biserial(binary_series, continuous_series)

        # Categorical vs. Continuous: ANOVA-based effect size (η)
        if kinds == {'Continuous', 'Categorical'}:
            if type1 == 'Categorical':
                return self.anova_eta(series1, series2)
            return self.anova_eta(series2, series1)

        # Binary vs. Categorical: treat both as nominal and use Cramér's V
        if kinds == {'Binary', 'Categorical'}:
            return self.cramers_v(series1, series2)

        return np.nan

    def generate_matrix(self):
        """Generate the symmetric correlation/association matrix.

        Every unordered pair of features listed in ``feature_classes`` is
        evaluated once with the measure matching its types, and the result
        is mirrored across the diagonal. The diagonal is set to 1.0.

        Returns
        -------
        pd.DataFrame
            Symmetric matrix with feature names as both index and columns.
            Values are rounded to 4 decimal places.
        """
        factors = list(self.feature_classes)
        size = len(factors)

        values = np.full((size, size), np.nan)
        np.fill_diagonal(values, 1.0)

        for i, var1 in tqdm(enumerate(factors), total=size):
            series1 = self.df[var1]
            type1 = self.feature_classes[var1]
            # Upper triangle only; each result is mirrored below.
            for j in range(i + 1, size):
                var2 = factors[j]
                corr_value = self.compute_pairwise_correlation(
                    series1, type1, self.df[var2], self.feature_classes[var2]
                )
                values[i, j] = corr_value
                values[j, i] = corr_value

        return pd.DataFrame(values, index=factors, columns=factors).round(4)