Spaces:
Sleeping
Sleeping
| from itertools import chain | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.decomposition import PCA | |
| from sklearn.preprocessing import MinMaxScaler | |
| from src.utils.correlation import CorrelationMatrixGenerator | |
class DimensionReduction:
    """
    Correlation-driven clustering of features with a STRICT pairwise constraint:
    every pair of features in a cluster must have correlation within
    [lower_bound, upper_bound].

    Clusters are found as (maximal) cliques in the graph where an edge connects
    two features iff their correlation lies in the requested band. Each cluster
    is then replaced in the dataframe by its PCA projection (columns named
    ``group.<i>`` or ``group.<i>.<c>``).
    """

    def __init__(self, dataframe, feature_classes, method="pearson", projection_dimension=1):
        """
        Parameters
        ----------
        dataframe : pd.DataFrame
            Input data; copied, the caller's frame is never mutated.
        feature_classes : object
            Passed through to CorrelationMatrixGenerator.
        method : str
            Continuous-vs-continuous correlation method (default "pearson").
        projection_dimension : int
            Number of PCA components to keep per cluster (>= 1).

        Raises
        ------
        TypeError
            If the generated correlation matrix is not a DataFrame.
        ValueError
            If projection_dimension < 1.
        """
        self.dataframe = dataframe.copy()
        self.correlation_matrix = CorrelationMatrixGenerator(
            df=self.dataframe,
            feature_classes=feature_classes,
            continuous_vs_continuous_method=method,
        ).generate_matrix()
        if not isinstance(self.correlation_matrix, pd.DataFrame):
            raise TypeError("CorrelationMatrixGenerator.generate_matrix() must return a pandas.DataFrame")
        if projection_dimension < 1:
            raise ValueError("projection_dimension must be >= 1")
        self.k = int(projection_dimension)

    # ---------------------------
    # Strict clique-based clustering
    # ---------------------------
    def _cluster_features(self, lower_bound, upper_bound):
        """
        Return DISJOINT clusters (lists of sorted column names) where every
        pair of members has correlation within [lower_bound, upper_bound].

        Implemented via Bron–Kerbosch maximal-clique enumeration followed by a
        greedy disjoint selection (larger cliques first, then higher average
        correlation). NaN correlations never form an edge.

        Raises
        ------
        ValueError
            If the bounds are not ordered within [0, 1].
        """
        if not (0 <= lower_bound <= upper_bound <= 1):
            raise ValueError("Bounds must satisfy 0 <= lower_bound <= upper_bound <= 1")
        cm = self.correlation_matrix
        # Use only features present in both axes of the correlation matrix.
        features = [c for c in cm.columns if c in cm.index]
        if not features:
            return []

        def in_band(x):
            # NaN (e.g. undefined correlation) is treated as "no edge".
            return pd.notna(x) and (lower_bound <= x <= upper_bound)

        # Build adjacency sets: edge iff the pairwise correlation is in-band.
        adj = {f: set() for f in features}
        for f in features:
            row = cm.loc[f, features]
            for g, val in row.items():
                if g == f:
                    continue
                if in_band(val):
                    adj[f].add(g)

        # Bron–Kerbosch with pivoting to enumerate maximal cliques.
        cliques = []

        def bron_kerbosch(R, P, X):
            if not P and not X:
                # R is a maximal clique; singletons are useless for reduction.
                if len(R) >= 2:
                    cliques.append(set(R))
                return
            # P | X is guaranteed non-empty here (we returned above otherwise),
            # and both are subsets of adj's keys, so the pivot is always valid.
            u = max(P | X, key=lambda v: len(adj[v] & P))
            # Only branch on candidates outside the pivot's neighborhood.
            for v in list(P - adj[u]):
                bron_kerbosch(R | {v}, P & adj[v], X & adj[v])
                P.remove(v)
                X.add(v)

        bron_kerbosch(set(), set(features), set())
        if not cliques:
            return []

        # Score cliques: prefer larger, then higher average correlation,
        # with a deterministic lexicographic tie-break.
        def avg_corr(clique_set):
            cols = sorted(clique_set)
            sub = cm.loc[cols, cols].to_numpy(dtype=float)
            tri = sub[np.triu_indices_from(sub, k=1)]
            tri = tri[~np.isnan(tri)]
            return float(tri.mean()) if tri.size else -np.inf

        cliques_sorted = sorted(
            cliques,
            key=lambda c: (-len(c), -avg_corr(c), tuple(sorted(c))),
        )

        # Greedily produce DISJOINT clusters (otherwise PCA/drop would conflict).
        # A subset of a clique is still a clique, so removing already-used
        # features keeps the pairwise constraint valid.
        used = set()
        final_clusters = []
        for c in cliques_sorted:
            remaining = sorted(set(c) - used)
            if len(remaining) >= 2:
                final_clusters.append(remaining)
                used.update(remaining)
        return final_clusters

    def _solve_conflict(self, clusters_dictionary):
        """
        Safe conflict resolver across correlation bands: later keys win, and
        shared features are removed from earlier bands' clusters. Removing
        elements from a clique keeps it a clique, so the pairwise constraint
        is preserved. Mutates and returns `clusters_dictionary`.

        NOTE: `self` was previously missing from this signature, which made
        every bound call raise TypeError.
        """
        keys = list(clusters_dictionary.keys())
        used = set()
        for key in reversed(keys):  # later bands win
            cleaned = []
            for cluster in clusters_dictionary[key]:
                remaining = [f for f in cluster if f not in used]
                if len(remaining) >= 2:
                    cleaned.append(remaining)
                    used.update(remaining)
            clusters_dictionary[key] = cleaned
        return clusters_dictionary

    def find_clusters(self, lower_bound, upper_bound):
        """Public alias for `_cluster_features`."""
        return self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)

    # ---------------------------
    # PCA projection / replacement
    # ---------------------------
    def _assign_pca_components(self, cluster_index, comps, index):
        """
        Assign PCA components into group.* columns, supporting k==1 and k>1.

        For a single component the column is ``group.<i>``; for several it is
        ``group.<i>.<c>`` per component c.
        """
        if comps.ndim != 2:
            raise ValueError("PCA output must be 2D")
        k_eff = comps.shape[1]
        if k_eff == 1:
            self.dataframe[f"group.{cluster_index}"] = pd.Series(comps[:, 0], index=index)
        else:
            for c in range(k_eff):
                self.dataframe[f"group.{cluster_index}.{c}"] = pd.Series(comps[:, c], index=index)

    def _project_clusters(self, clusters, scale):
        """
        Replace each cluster's columns in `self.dataframe` with its PCA
        projection (shared implementation for both public reduce_* methods).

        Raises
        ------
        TypeError
            If a cluster contains non-numeric columns (PCA needs a numeric
            matrix; encoding must happen upstream).
        """
        for cluster_index, cols in enumerate(clusters):
            # Guard: only keep columns still present in the dataframe.
            cols = [c for c in cols if c in self.dataframe.columns]
            if len(cols) < 2:
                continue
            subset = self.dataframe[cols]
            if not all(pd.api.types.is_numeric_dtype(subset[c]) for c in subset.columns):
                raise TypeError(
                    f"Non-numeric columns found in cluster {cluster_index}: {cols}. "
                    "Encode them before PCA or restrict clustering to numeric features."
                )
            X = subset.to_numpy()
            if scale:
                # MinMax scaling keeps features comparable before PCA.
                X = MinMaxScaler().fit_transform(X)
            # Never request more components than the cluster has columns.
            pca = PCA(n_components=min(self.k, X.shape[1]))
            comps = pca.fit_transform(X)
            self._assign_pca_components(cluster_index, comps, index=subset.index)
            self.dataframe.drop(columns=cols, inplace=True)

    def reduce_dimension(self, lower_bound=0.95, upper_bound=1.0, scale=True):
        """
        Cluster features within one correlation band and replace each cluster
        by its PCA projection. Returns the (mutated) internal dataframe.
        """
        clusters = self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)
        self._project_clusters(clusters, scale)
        return self.dataframe

    def reduce_dimension_by_grouping(self, threshold=0.8, group_count=4, scale=True):
        """
        Split [threshold, 1.0] into `group_count` correlation bands, cluster
        within each band, resolve cross-band conflicts (later/higher bands
        win), then replace every surviving cluster by its PCA projection.

        Returns
        -------
        (pd.DataFrame, list[list[str]])
            The reduced dataframe and the final disjoint clusters.
        """
        clusters = {}
        steps = np.round(np.linspace(threshold, 1.0, group_count + 1), 4)
        for i in range(len(steps) - 1):
            lb, ub = float(steps[i]), float(steps[i + 1])
            clusters[(lb, ub)] = self._cluster_features(lower_bound=lb, upper_bound=ub)
        clusters = self._solve_conflict(clusters_dictionary=clusters)
        final_clusters = list(chain(*clusters.values()))
        self._project_clusters(final_clusters, scale)
        return self.dataframe, final_clusters