Spaces:
Sleeping
Sleeping
| """ | |
| Correlation matrix generation module for mixed data types. | |
| This module provides the CorrelationMatrixGenerator class which computes | |
| correlation/association matrices for DataFrames containing mixed data types | |
| (Continuous, Binary, Categorical). It automatically selects appropriate | |
| correlation measures based on feature type pairs. | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.stats import chi2_contingency, pointbiserialr | |
| from tqdm import tqdm | |
class CorrelationMatrixGenerator:
    """
    Generate a correlation/association matrix for a DataFrame with mixed data types.

    Each pair of features is scored with a measure chosen from the declared
    types of the two features:

    ==========================  =========================================
    Pair of types               Measure
    ==========================  =========================================
    Continuous / Continuous     ``Series.corr`` (method is configurable)
    Binary / Binary             Phi coefficient (Pearson on 0/1 codes)
    Categorical / Categorical   Cramér's V
    Binary / Continuous         Point-biserial correlation
    Categorical / Continuous    Eta (one-way ANOVA effect size)
    Binary / Categorical        Cramér's V
    ==========================  =========================================

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing features for correlation analysis.
    feature_classes : dict
        A dictionary mapping column names to their data types
        ('Continuous', 'Binary', 'Categorical').
    continuous_vs_continuous_method : str, optional
        Method passed to ``Series.corr`` for two continuous features.
        Default is 'pearson'.

    Methods
    -------
    generate_matrix() -> pd.DataFrame
        Generates and returns a symmetric correlation/association matrix.
    """

    def __init__(self, df, feature_classes, continuous_vs_continuous_method='pearson'):
        """
        Store the data, the feature-type mapping and the continuous method.

        Parameters
        ----------
        df : pandas.DataFrame
            The DataFrame containing your data.
        feature_classes : dict
            Keys are column names in ``df``; values are their data types.
            Valid types are 'Continuous', 'Binary', or 'Categorical'.
        continuous_vs_continuous_method : str
            Method used for the correlation of two continuous features.
        """
        self.df = df
        self.feature_classes = feature_classes
        self.continuous_vs_continuous_method = continuous_vs_continuous_method

    # The three helpers below take no ``self``; they must be static methods so
    # that the ``self.helper(...)`` calls in ``compute_pairwise_correlation``
    # do not pass the instance as the first data argument.
    @staticmethod
    def recode_binary(series):
        """
        Ensure a binary series is coded as 0 and 1.

        A numeric series whose non-missing values are a subset of {0, 1} is
        returned unchanged. Otherwise the two distinct non-missing values are
        mapped to 0 and 1 in order of first appearance; missing values stay
        missing.

        Parameters
        ----------
        series : pd.Series
            A binary series to recode.

        Returns
        -------
        pd.Series
            Binary series with values {0, 1}.

        Raises
        ------
        ValueError
            If the series does not have exactly two distinct non-missing
            values (and is not already numeric 0/1).
        """
        distinct = series.dropna().unique()
        if pd.api.types.is_numeric_dtype(series) and set(distinct) <= {0, 1}:
            return series
        if len(distinct) != 2:
            raise ValueError("Series does not appear to be binary")
        return series.map({distinct[0]: 0, distinct[1]: 1})

    @staticmethod
    def cramers_v(x, y):
        """
        Calculate Cramér's V statistic for a categorical-categorical association.

        ``chi2_contingency`` is called with its defaults; per the SciPy
        documentation this applies Yates' continuity correction to 2x2
        tables, so V can stay below 1 for perfectly associated binary pairs.

        Parameters
        ----------
        x, y : array-like
            Two categorical variables.

        Returns
        -------
        float
            Cramér's V statistic, or np.nan when the contingency table is
            empty or either variable has a single observed level.
        """
        contingency_table = pd.crosstab(x, y)
        n = contingency_table.to_numpy().sum()
        min_dim = min(contingency_table.shape) - 1
        # Check degenerate tables *before* the chi-square test, which raises
        # on an empty table.
        if n == 0 or min_dim <= 0:
            return np.nan
        chi2 = chi2_contingency(contingency_table)[0]
        return np.sqrt(chi2 / (n * min_dim))

    @staticmethod
    def anova_eta(categories, measurements):
        """
        Compute eta (η), the effect size derived from a one-way ANOVA.

        Eta is the square root of the between-group sum of squares divided by
        the total sum of squares of ``measurements`` grouped by ``categories``.
        Observations are paired by position, and pairs where either value is
        missing are dropped before any statistic is computed.

        Parameters
        ----------
        categories : array-like
            Categorical grouping variable.
        measurements : array-like
            Continuous values.

        Returns
        -------
        float
            Eta between 0 and 1, or np.nan when no complete pairs remain or
            the measurements have zero variance.
        """
        labels = pd.Series(categories).reset_index(drop=True)
        values = pd.Series(measurements).reset_index(drop=True)
        complete = (labels.notna() & values.notna()).to_numpy()

        data = values[complete].to_numpy(dtype=float)
        if data.size == 0:
            return np.nan

        overall_mean = data.mean()
        ss_total = np.sum((data - overall_mean) ** 2)
        if ss_total == 0:
            return np.nan

        # After dropping missing labels every code is >= 0 and every group
        # has at least one member, so the division below is safe.
        codes, _ = pd.factorize(labels[complete])
        group_sizes = np.bincount(codes)
        group_means = np.bincount(codes, weights=data) / group_sizes
        ss_between = np.sum(group_sizes * (group_means - overall_mean) ** 2)

        # Guard against round-off pushing the ratio marginally above 1.
        return np.sqrt(min(ss_between / ss_total, 1.0))

    def compute_pairwise_correlation(self, series1, type1, series2, type2):
        """
        Compute the correlation/association between two series based on their data types.

        Parameters
        ----------
        series1, series2 : pandas.Series
            The two features to compare.
        type1, type2 : str
            One of 'Continuous', 'Binary', 'Categorical'.

        Returns
        -------
        float
            The association measure, or np.nan if it is not defined (unknown
            type, a 'Binary' feature that is not actually binary, or too
            little variation to compute the statistic).
        """
        types = {type1, type2}

        # ------------- Homogeneous data types -------------
        if types == {'Continuous'}:
            return series1.corr(series2, method=self.continuous_vs_continuous_method)

        if types == {'Binary'}:
            # Phi coefficient: Pearson correlation of the 0/1 recodings.
            try:
                first = self.recode_binary(series1)
                second = self.recode_binary(series2)
            except (ValueError, TypeError):
                return np.nan
            return first.corr(second, method='pearson')

        if types == {'Categorical'}:
            return self.cramers_v(series1, series2)

        # ------------- Heterogeneous data types -------------
        if types == {'Continuous', 'Binary'}:
            if type1 == 'Binary':
                binary_series, continuous_series = series1, series2
            else:
                binary_series, continuous_series = series2, series1
            try:
                binary_series = self.recode_binary(binary_series)
            except (ValueError, TypeError):
                return np.nan

            # pointbiserialr does not skip missing values, so drop incomplete
            # pairs here (mirrors the pairwise handling of Series.corr).
            flags = binary_series.to_numpy(dtype=float, na_value=np.nan)
            values = continuous_series.to_numpy(dtype=float, na_value=np.nan)
            complete = ~(np.isnan(flags) | np.isnan(values))
            flags, values = flags[complete], values[complete]
            if flags.size < 2:
                return np.nan
            # A constant input has no defined correlation.
            if flags.min() == flags.max() or values.min() == values.max():
                return np.nan
            corr, _ = pointbiserialr(flags, values)
            return corr

        if types == {'Continuous', 'Categorical'}:
            if type1 == 'Categorical':
                return self.anova_eta(series1, series2)
            return self.anova_eta(series2, series1)

        if types == {'Binary', 'Categorical'}:
            # Treat the binary feature as nominal.
            return self.cramers_v(series1, series2)

        return np.nan

    def generate_matrix(self):
        """
        Generate a symmetric correlation/association matrix for all declared features.

        Every unordered pair of features in ``feature_classes`` is scored once
        with ``compute_pairwise_correlation`` and the result is mirrored, so
        corr(A, B) == corr(B, A). The diagonal is set to 1.0.

        Returns
        -------
        pd.DataFrame
            A symmetric matrix with feature names as both index and columns.
            Values are rounded to 4 decimal places.
        """
        factors = list(self.feature_classes)
        size = len(factors)

        # Work on a plain float array: scalar .loc writes are slow, and a NaN
        # result must not be mistaken for "not yet computed".
        values = np.full((size, size), np.nan)
        np.fill_diagonal(values, 1.0)

        for i, var1 in enumerate(tqdm(factors)):
            series1 = self.df[var1]
            type1 = self.feature_classes[var1]
            for j in range(i + 1, size):
                var2 = factors[j]
                corr_value = self.compute_pairwise_correlation(
                    series1, type1, self.df[var2], self.feature_classes[var2]
                )
                values[i, j] = corr_value
                values[j, i] = corr_value  # mirror for symmetry

        return pd.DataFrame(values, index=factors, columns=factors).round(4)