File size: 9,539 Bytes
e869d90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""
Correlation matrix generation module for mixed data types.

This module provides the CorrelationMatrixGenerator class which computes
correlation/association matrices for DataFrames containing mixed data types
(Continuous, Binary, Categorical). It automatically selects appropriate
correlation measures based on feature type pairs.
"""

import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, pointbiserialr
from tqdm import tqdm


class CorrelationMatrixGenerator:
    """
    Build a correlation/association matrix for a DataFrame with mixed feature types.

    Each pair of features is scored with a measure chosen from the declared
    types of the two features:

    * Continuous vs. Continuous   -> ``Series.corr`` with the configured method
    * Binary vs. Binary           -> phi coefficient (Pearson on 0/1 recoded data)
    * Categorical vs. Categorical -> Cramér's V
    * Binary vs. Continuous       -> point-biserial correlation
    * Categorical vs. Continuous  -> eta (one-way ANOVA effect size)
    * Binary vs. Categorical      -> Cramér's V

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing features for correlation analysis.
    feature_classes : dict
        A dictionary mapping column names to their data types
        ('Continuous', 'Binary', 'Categorical').
    continuous_vs_continuous_method : str, optional
        Method passed to ``Series.corr`` for two continuous features.
        Default is 'pearson'.

    Methods
    -------
    generate_matrix() -> pd.DataFrame
        Generates and returns a symmetric correlation/association matrix.
    """

    def __init__(self, df, feature_classes, continuous_vs_continuous_method='pearson'):
        """
        Store the data, the feature-type mapping and the continuous method.

        Parameters
        ----------
        df : pandas.DataFrame
            The DataFrame containing your data.
        feature_classes : dict
            Keys are column names in ``df``; values are their data types.
            Recognised types are 'Continuous', 'Binary' and 'Categorical'
            (pairs involving any other type evaluate to NaN).
        continuous_vs_continuous_method : str
            Method used for the correlation of two continuous features.
        """
        self.df = df
        self.feature_classes = feature_classes
        self.continuous_vs_continuous_method = continuous_vs_continuous_method

    @staticmethod
    def recode_binary(series):
        """
        Ensure a binary series is coded as 0 and 1.

        A numeric series whose non-missing values are a subset of {0, 1} is
        returned as is. Otherwise the two distinct non-missing values are
        mapped to 0 and 1. When the two values can be ordered, the smaller
        one becomes 0, so the coding (and therefore the sign of any
        correlation computed from it) does not depend on row order. Values
        that cannot be compared fall back to order of first appearance.
        Missing values stay missing.

        Parameters
        ----------
        series : pd.Series
            A binary series to recode.

        Returns
        -------
        pd.Series
            Binary series with values {0, 1}.

        Raises
        ------
        ValueError
            If the series does not have exactly two distinct non-missing
            values (and is not already numeric 0/1).
        """
        unique_vals = series.dropna().unique()

        # Already numeric 0/1 (this also covers constant or empty numeric series).
        if pd.api.types.is_numeric_dtype(series) and set(unique_vals) <= {0, 1}:
            return series

        if len(unique_vals) != 2:
            raise ValueError("Series does not appear to be binary")

        try:
            low, high = sorted(unique_vals)
        except TypeError:
            # Labels of incomparable types: keep order of first appearance.
            low, high = unique_vals[0], unique_vals[1]
        return series.map({low: 0, high: 1})

    @staticmethod
    def cramers_v(x, y):
        """
        Calculate Cramér's V statistic for a categorical-categorical association.

        Cramér's V is a measure of association between two nominal variables,
        ranging from 0 (no association) to 1 (perfect association). The
        chi-square statistic is computed without Yates' continuity
        correction; with the correction a perfectly associated 2x2 table
        would not reach 1.

        Parameters
        ----------
        x, y : array-like
            Two categorical variables.

        Returns
        -------
        float
            Cramér's V statistic, or np.nan if computation is not possible
            (no jointly observed rows, or one variable has a single level).
        """
        contingency_table = pd.crosstab(x, y)
        n = contingency_table.to_numpy().sum()
        min_dim = min(contingency_table.shape) - 1
        # Guard before the chi-square call: chi2_contingency raises on an
        # empty table, and V is undefined when a variable has one level.
        if n == 0 or min_dim <= 0:
            return np.nan
        chi2 = chi2_contingency(contingency_table, correction=False)[0]
        return np.sqrt(chi2 / (n * min_dim))

    @staticmethod
    def anova_eta(categories, measurements):
        """
        Compute eta (η), the correlation ratio derived from one-way ANOVA.

        Eta is the square root of the share of the total sum of squares of
        ``measurements`` that lies between the groups defined by
        ``categories`` (eta squared is the proportion of variance explained).
        Rows where either the category or the measurement is missing are
        excluded from every term of the computation.

        Parameters
        ----------
        categories : array-like
            Categorical grouping.
        measurements : array-like
            Continuous values, same length as ``categories``.

        Returns
        -------
        float
            Eta between 0 and 1, or np.nan when there are no complete rows
            or the measurements have zero total variation.
        """
        # factorize marks missing categories with the code -1.
        codes, _ = pd.factorize(pd.Series(categories))
        values = pd.Series(measurements).to_numpy(dtype=float, na_value=np.nan)

        complete = (codes >= 0) & ~np.isnan(values)
        codes = codes[complete]
        values = values[complete]
        if values.size == 0:
            return np.nan

        overall_mean = values.mean()
        ss_total = np.sum((values - overall_mean) ** 2)
        if ss_total == 0:
            return np.nan

        # Per-group sizes and sums in one pass; groups emptied by the
        # missing-value filter are skipped.
        group_sizes = np.bincount(codes)
        group_sums = np.bincount(codes, weights=values)
        populated = group_sizes > 0
        group_means = group_sums[populated] / group_sizes[populated]
        ss_between = np.sum(group_sizes[populated] * (group_means - overall_mean) ** 2)

        # Cap at 1 to absorb floating-point overshoot.
        return np.sqrt(min(ss_between / ss_total, 1.0))

    def compute_pairwise_correlation(self, series1, type1, series2, type2):
        """
        Compute the correlation/association between two series based on their data types.

        Parameters
        ----------
        series1, series2 : pandas.Series
            The two features, expected to have the same length.
        type1, type2 : str
            One of 'Continuous', 'Binary', 'Categorical'.

        Returns
        -------
        float
            A correlation/association measure, or np.nan if it is not
            defined (unknown type pair, a 'Binary' feature that cannot be
            recoded to 0/1, or too few complete observations).
        """
        kinds = {type1, type2}

        # ------------- Homogeneous data types -------------

        # Continuous vs. Continuous: configured method (Pearson by default)
        if kinds == {'Continuous'}:
            return series1.corr(series2, method=self.continuous_vs_continuous_method)

        # Binary vs. Binary: phi coefficient (Pearson on recoded binaries)
        if kinds == {'Binary'}:
            try:
                s1 = self.recode_binary(series1)
                s2 = self.recode_binary(series2)
            except (ValueError, TypeError):
                return np.nan
            return s1.corr(s2, method='pearson')

        # Categorical vs. Categorical: Cramér's V
        if kinds == {'Categorical'}:
            return self.cramers_v(series1, series2)

        # ------------- Heterogeneous data types -------------

        # Binary & Continuous: point-biserial correlation coefficient
        if kinds == {'Continuous', 'Binary'}:
            if type1 == 'Binary':
                binary_series, continuous_series = series1, series2
            else:
                binary_series, continuous_series = series2, series1

            try:
                binary_series = self.recode_binary(binary_series)
            except (ValueError, TypeError):
                return np.nan

            binary_values = binary_series.to_numpy(dtype=float, na_value=np.nan)
            continuous_values = continuous_series.to_numpy(dtype=float, na_value=np.nan)

            # pointbiserialr does not drop missing values itself, so keep
            # only rows where both observations are present.
            complete = ~(np.isnan(binary_values) | np.isnan(continuous_values))
            if complete.sum() < 2:
                return np.nan

            corr, _ = pointbiserialr(binary_values[complete], continuous_values[complete])
            return corr

        # Categorical vs. Continuous: ANOVA-based effect size (η)
        if kinds == {'Continuous', 'Categorical'}:
            if type1 == 'Categorical':
                return self.anova_eta(series1, series2)
            return self.anova_eta(series2, series1)

        # Binary vs. Categorical: treat both as nominal and use Cramér's V
        if kinds == {'Binary', 'Categorical'}:
            return self.cramers_v(series1, series2)

        # Unknown type combination
        return np.nan

    def generate_matrix(self):
        """
        Generate a symmetric correlation/association matrix for the specified columns,
        using the appropriate method based on their data types.

        Every unordered pair of features is evaluated exactly once (upper
        triangle) and the value is mirrored into the lower triangle, so the
        matrix is symmetric (corr(A, B) = corr(B, A)). The diagonal is 1.0.

        Returns
        -------
        pd.DataFrame
            A symmetric correlation/association matrix with feature names as
            both index and columns. Values are rounded to 4 decimal places.
        """
        factors = list(self.feature_classes.keys())
        size = len(factors)

        # Plain float array: cheaper to fill cell by cell than DataFrame.loc.
        values = np.full((size, size), np.nan, dtype=float)
        np.fill_diagonal(values, 1.0)

        for i, var1 in tqdm(list(enumerate(factors))):
            series1 = self.df[var1]
            type1 = self.feature_classes[var1]
            for j in range(i + 1, size):
                var2 = factors[j]
                corr_value = self.compute_pairwise_correlation(
                    series1, type1, self.df[var2], self.feature_classes[var2]
                )
                # Mirror the value to keep the matrix symmetric.
                values[i, j] = corr_value
                values[j, i] = corr_value

        corr_matrix = pd.DataFrame(values, index=factors, columns=factors)
        return corr_matrix.round(4)