from itertools import chain

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler

from src.utils.correlation import CorrelationMatrixGenerator


class DimensionReduction:
    """
    Correlation-driven clustering of features with a STRICT pairwise constraint:
    every pair of features in a cluster must have correlation within
    [lower_bound, upper_bound].

    Clusters are found as (maximal) cliques in the graph where an edge connects
    two features iff their correlation lies in the requested band.  Each cluster
    is then replaced in ``self.dataframe`` by its PCA projection, written to
    ``group.<i>`` (k == 1) or ``group.<i>.<c>`` (k > 1) columns.

    NOTE(review): because bounds must satisfy ``0 <= lower_bound``, strongly
    anti-correlated pairs never land in the same cluster — presumably intended;
    confirm if signed correlations should be banded by absolute value.
    """

    def __init__(self, dataframe, feature_classes, method="pearson", projection_dimension=1):
        """
        Parameters
        ----------
        dataframe : pd.DataFrame
            Input data.  A defensive copy is taken; the copy is mutated by the
            ``reduce_*`` methods.
        feature_classes
            Passed through to :class:`CorrelationMatrixGenerator`.
        method : str
            Correlation method for continuous-vs-continuous feature pairs.
        projection_dimension : int
            Number of PCA components (``k``) each cluster is projected onto.

        Raises
        ------
        TypeError
            If the generator does not return a DataFrame.
        ValueError
            If ``projection_dimension < 1``.
        """
        self.dataframe = dataframe.copy()
        self.correlation_matrix = CorrelationMatrixGenerator(
            df=self.dataframe,
            feature_classes=feature_classes,
            continuous_vs_continuous_method=method,
        ).generate_matrix()
        if not isinstance(self.correlation_matrix, pd.DataFrame):
            raise TypeError("CorrelationMatrixGenerator.generate_matrix() must return a pandas.DataFrame")
        if projection_dimension < 1:
            raise ValueError("projection_dimension must be >= 1")
        self.k = int(projection_dimension)

    # ---------------------------
    # Strict clique-based clustering
    # ---------------------------
    def _cluster_features(self, lower_bound, upper_bound):
        """
        Return DISJOINT clusters (lists of column names, each of size >= 2)
        where every pair is within [lower_bound, upper_bound].

        Implemented via Bron-Kerbosch maximal-clique enumeration followed by a
        greedy disjoint selection (larger cliques first, then higher average
        in-band correlation, then a deterministic lexical tie-break).

        Raises
        ------
        ValueError
            If the bounds do not satisfy 0 <= lower_bound <= upper_bound <= 1.
        """
        if not (0 <= lower_bound <= upper_bound <= 1):
            raise ValueError("Bounds must satisfy 0 <= lower_bound <= upper_bound <= 1")

        cm = self.correlation_matrix
        # Use only features present in BOTH axes of the correlation matrix.
        features = [c for c in cm.columns if c in cm.index]
        if not features:
            return []

        def in_band(x):
            # NaN correlations (e.g. constant columns) never form an edge.
            return pd.notna(x) and (lower_bound <= x <= upper_bound)

        # Build the band graph as adjacency sets.
        adj = {f: set() for f in features}
        for f in features:
            row = cm.loc[f, features]
            for g, val in row.items():
                if g == f:
                    continue
                if in_band(val):
                    adj[f].add(g)

        # Bron-Kerbosch with pivoting to enumerate maximal cliques.
        cliques = []

        def bron_kerbosch(R, P, X):
            if not P and not X:
                # R is maximal; singletons are not useful clusters.
                if len(R) >= 2:
                    cliques.append(set(R))
                return
            # Pivot on the vertex covering the most of P to prune branching.
            # (P | X is non-empty here: the guard above returned otherwise.)
            u = max(P | X, key=lambda v: len(adj[v] & P))
            for v in list(P - adj[u]):
                bron_kerbosch(R | {v}, P & adj[v], X & adj[v])
                P.remove(v)
                X.add(v)

        bron_kerbosch(set(), set(features), set())
        if not cliques:
            return []

        # Score cliques: prefer larger, then higher average correlation.
        def avg_corr(clique_set):
            cols = sorted(clique_set)
            sub = cm.loc[cols, cols].to_numpy(dtype=float)
            tri = sub[np.triu_indices_from(sub, k=1)]
            tri = tri[~np.isnan(tri)]
            return float(tri.mean()) if tri.size else -np.inf

        cliques_sorted = sorted(
            cliques,
            key=lambda c: (-len(c), -avg_corr(c), tuple(sorted(c))),
        )

        # Greedily produce DISJOINT clusters (otherwise PCA/drop would conflict).
        # A subset of a clique is still a clique, so the pairwise constraint
        # survives removing already-used features.
        used = set()
        final_clusters = []
        for c in cliques_sorted:
            remaining = sorted(list(set(c) - used))
            if len(remaining) >= 2:
                final_clusters.append(remaining)
                used.update(remaining)
        return final_clusters

    @staticmethod
    def _solve_conflict(clusters_dictionary):
        """
        Safe conflict resolver across correlation bands: later keys win and
        their features are removed from earlier clusters.

        Removing elements from a clique keeps it a clique, so the strict
        pairwise constraint is preserved.  Clusters shrinking below two
        features are dropped.  Mutates and returns ``clusters_dictionary``.
        """
        keys = list(clusters_dictionary.keys())
        used = set()
        for key in reversed(keys):  # later (higher) bands win
            cleaned = []
            for cluster in clusters_dictionary[key]:
                remaining = [f for f in cluster if f not in used]
                if len(remaining) >= 2:
                    cleaned.append(remaining)
                    used.update(remaining)
            clusters_dictionary[key] = cleaned
        return clusters_dictionary

    def find_clusters(self, lower_bound, upper_bound):
        """Public wrapper around :meth:`_cluster_features` (no mutation)."""
        return self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)

    # ---------------------------
    # PCA projection / replacement
    # ---------------------------
    def _assign_pca_components(self, cluster_index, comps, index):
        """
        Assign PCA components into ``group.*`` columns, supporting k==1
        (single column ``group.<i>``) and k>1 (``group.<i>.<c>`` per component).

        Raises
        ------
        ValueError
            If ``comps`` is not a 2-D array.
        """
        if comps.ndim != 2:
            raise ValueError("PCA output must be 2D")
        k_eff = comps.shape[1]
        if k_eff == 1:
            self.dataframe[f"group.{cluster_index}"] = pd.Series(comps[:, 0], index=index)
        else:
            for c in range(k_eff):
                self.dataframe[f"group.{cluster_index}.{c}"] = pd.Series(comps[:, c], index=index)

    def _project_and_replace(self, cluster_index, cols, scale):
        """
        Project one cluster onto its leading PCA component(s) and replace the
        original columns in ``self.dataframe``.

        Shared by :meth:`reduce_dimension` and
        :meth:`reduce_dimension_by_grouping` (previously duplicated inline).

        Returns the list of columns actually consumed; empty when fewer than
        two of ``cols`` are still present in the dataframe.

        Raises
        ------
        TypeError
            If any surviving column is non-numeric (PCA needs a numeric
            matrix; encode categorical features upstream).
        """
        # Guard: only keep columns still present.
        cols = [c for c in cols if c in self.dataframe.columns]
        if len(cols) < 2:
            return []
        subset = self.dataframe[cols]
        if not all(pd.api.types.is_numeric_dtype(subset[c]) for c in subset.columns):
            raise TypeError(
                f"Non-numeric columns found in cluster {cluster_index}: {cols}. "
                "Encode them before PCA or restrict clustering to numeric features."
            )
        X = subset.to_numpy()
        if scale:
            X = MinMaxScaler().fit_transform(X)
        # scikit-learn requires n_components <= min(n_samples, n_features);
        # bound by both so short dataframes do not crash.
        pca = PCA(n_components=min(self.k, X.shape[0], X.shape[1]))
        comps = pca.fit_transform(X)
        self._assign_pca_components(cluster_index, comps, index=subset.index)
        self.dataframe.drop(columns=cols, inplace=True)
        return cols

    def reduce_dimension(self, lower_bound=0.95, upper_bound=1.0, scale=True):
        """
        Cluster features within one correlation band and replace each cluster
        by its PCA projection.  Returns the mutated dataframe.
        """
        clusters = self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)
        for cluster_index, cols in enumerate(clusters):
            self._project_and_replace(cluster_index, cols, scale)
        return self.dataframe

    def reduce_dimension_by_grouping(self, threshold=0.8, group_count=4, scale=True):
        """
        Split [threshold, 1.0] into ``group_count`` correlation bands, cluster
        within each band, resolve cross-band conflicts (higher bands win), then
        PCA-replace every surviving cluster.

        Returns
        -------
        (pd.DataFrame, list[list[str]])
            The mutated dataframe and the conflict-resolved cluster list.
        """
        clusters = {}
        # Band edges are inclusive on both sides, so boundary correlations may
        # appear in two adjacent bands; _solve_conflict de-duplicates them.
        steps = np.round(np.linspace(threshold, 1.0, group_count + 1), 4)
        for i in range(len(steps) - 1):
            lb, ub = float(steps[i]), float(steps[i + 1])
            clusters[(lb, ub)] = self._cluster_features(lower_bound=lb, upper_bound=ub)
        clusters = self._solve_conflict(clusters_dictionary=clusters)
        final_clusters = list(chain(*clusters.values()))
        for cluster_index, cols in enumerate(final_clusters):
            self._project_and_replace(cluster_index, cols, scale)
        return self.dataframe, final_clusters