from itertools import chain
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler
from src.utils.correlation import CorrelationMatrixGenerator
class DimensionReduction:
    """
    Correlation-driven clustering of features with a STRICT pairwise constraint:
    every pair of features in a cluster must have correlation within
    [lower_bound, upper_bound].

    Clusters are found as (maximal) cliques in the graph where an edge connects
    two features iff their correlation lies in the requested band. Each cluster
    is then replaced in the dataframe by its leading PCA component(s)
    (``group.<i>`` / ``group.<i>.<c>`` columns).
    """

    def __init__(self, dataframe, feature_classes, method="pearson", projection_dimension=1):
        """
        Parameters
        ----------
        dataframe : pandas.DataFrame
            Input data. Copied so the caller's frame is never mutated.
        feature_classes : mapping
            Forwarded to CorrelationMatrixGenerator (schema defined there).
        method : str
            Correlation method used for continuous-vs-continuous pairs.
        projection_dimension : int
            Number of PCA components kept per cluster; must be >= 1.

        Raises
        ------
        TypeError
            If the generated correlation matrix is not a DataFrame.
        ValueError
            If projection_dimension < 1.
        """
        self.dataframe = dataframe.copy()
        self.correlation_matrix = CorrelationMatrixGenerator(
            df=self.dataframe,
            feature_classes=feature_classes,
            continuous_vs_continuous_method=method
        ).generate_matrix()
        if not isinstance(self.correlation_matrix, pd.DataFrame):
            raise TypeError("CorrelationMatrixGenerator.generate_matrix() must return a pandas.DataFrame")
        if projection_dimension < 1:
            raise ValueError("projection_dimension must be >= 1")
        self.k = int(projection_dimension)

    # ---------------------------
    # Strict clique-based clustering
    # ---------------------------
    def _cluster_features(self, lower_bound, upper_bound):
        """
        Return DISJOINT clusters where every pair is within [lower_bound, upper_bound].

        Implemented via Bron–Kerbosch maximal-clique enumeration followed by a
        greedy disjoint selection (larger cliques first, then higher average
        correlation, then a deterministic lexicographic tie-break).

        Raises
        ------
        ValueError
            If the bounds do not satisfy 0 <= lower_bound <= upper_bound <= 1.
        """
        if not (0 <= lower_bound <= upper_bound <= 1):
            raise ValueError("Bounds must satisfy 0 <= lower_bound <= upper_bound <= 1")
        cm = self.correlation_matrix
        # Only features present both as a row and a column can be clustered.
        features = [c for c in cm.columns if c in cm.index]
        if not features:
            return []

        def in_band(x):
            # NaN correlations never create an edge.
            return pd.notna(x) and (lower_bound <= x <= upper_bound)

        # Build the band graph as adjacency sets.
        adj = {f: set() for f in features}
        for f in features:
            row = cm.loc[f, features]
            for g, val in row.items():
                if g == f:
                    continue
                if in_band(val):
                    adj[f].add(g)

        # Bron–Kerbosch with pivoting: enumerate maximal cliques of size >= 2.
        cliques = []

        def bron_kerbosch(clique, candidates, excluded):
            if not candidates and not excluded:
                if len(clique) >= 2:
                    cliques.append(set(clique))
                return
            # Pivot on the vertex covering the most candidates to cut branching.
            # (candidates | excluded is non-empty here: the empty/empty case
            # returned above.)
            pivot = max(candidates | excluded, key=lambda v: len(adj[v] & candidates))
            for v in list(candidates - adj[pivot]):
                bron_kerbosch(clique | {v}, candidates & adj[v], excluded & adj[v])
                candidates.remove(v)
                excluded.add(v)

        bron_kerbosch(set(), set(features), set())
        if not cliques:
            return []

        # Score cliques: prefer larger, then higher average correlation
        # (tie-break deterministic via sorted member tuple).
        def avg_corr(clique_set):
            cols = sorted(clique_set)
            sub = cm.loc[cols, cols].to_numpy(dtype=float)
            tri = sub[np.triu_indices_from(sub, k=1)]
            tri = tri[~np.isnan(tri)]
            return float(tri.mean()) if tri.size else -np.inf

        cliques_sorted = sorted(
            cliques,
            key=lambda c: (-len(c), -avg_corr(c), tuple(sorted(c)))
        )

        # Greedily produce DISJOINT clusters (otherwise PCA/drop would conflict).
        used = set()
        final_clusters = []
        for c in cliques_sorted:
            # A subset of a clique is still a clique, so the pairwise
            # constraint remains valid after removing used features.
            remaining = sorted(set(c) - used)
            if len(remaining) >= 2:
                final_clusters.append(remaining)
                used.update(remaining)
        return final_clusters

    @staticmethod
    def _solve_conflict(clusters_dictionary):
        """
        Safe conflict resolver across correlation bands:
        later keys win, features removed from earlier clusters.
        Removing elements from a clique keeps it a clique, so the pairwise
        constraint is preserved. Mutates and returns the input dictionary.
        """
        keys = list(clusters_dictionary.keys())
        used = set()
        for key in reversed(keys):  # later bands win
            cleaned = []
            for cluster in clusters_dictionary[key]:
                remaining = [f for f in cluster if f not in used]
                if len(remaining) >= 2:
                    cleaned.append(remaining)
                    used.update(remaining)
            clusters_dictionary[key] = cleaned
        return clusters_dictionary

    def find_clusters(self, lower_bound, upper_bound):
        """Public wrapper around the strict clique-based clustering."""
        return self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)

    # ---------------------------
    # PCA projection / replacement
    # ---------------------------
    def _assign_pca_components(self, cluster_index, comps, index):
        """
        Assign PCA components into group.* columns, supporting k==1 and k>1.

        With a single component the column is ``group.<i>``; with several,
        ``group.<i>.<c>`` for each component c.
        """
        if comps.ndim != 2:
            raise ValueError("PCA output must be 2D")
        k_eff = comps.shape[1]
        if k_eff == 1:
            self.dataframe[f"group.{cluster_index}"] = pd.Series(comps[:, 0], index=index)
        else:
            for c in range(k_eff):
                self.dataframe[f"group.{cluster_index}.{c}"] = pd.Series(comps[:, c], index=index)

    def _project_clusters(self, clusters, scale):
        """
        Shared projection loop: replace each cluster's columns by PCA components.

        For every cluster (list of column names) still present in the
        dataframe with >= 2 columns: optionally MinMax-scale, fit PCA with
        min(self.k, n_cols) components, write group.* columns and drop the
        originals.

        Raises
        ------
        TypeError
            If a cluster contains non-numeric columns (encode upstream).
        """
        for cluster_index, cols in enumerate(clusters):
            # Guard: only keep columns still present.
            cols = [c for c in cols if c in self.dataframe.columns]
            if len(cols) < 2:
                continue
            subset = self.dataframe[cols]
            # PCA needs a numeric matrix; non-numerics must be encoded upstream.
            if not all(pd.api.types.is_numeric_dtype(subset[c]) for c in subset.columns):
                raise TypeError(
                    f"Non-numeric columns found in cluster {cluster_index}: {cols}. "
                    "Encode them before PCA or restrict clustering to numeric features."
                )
            X = subset.to_numpy()
            if scale:
                X = MinMaxScaler().fit_transform(X)
            pca = PCA(n_components=min(self.k, X.shape[1]))
            comps = pca.fit_transform(X)
            self._assign_pca_components(cluster_index, comps, index=subset.index)
            self.dataframe.drop(columns=cols, inplace=True)

    def reduce_dimension(self, lower_bound=0.95, upper_bound=1.0, scale=True):
        """
        Cluster features in a single correlation band and replace each cluster
        by its PCA components. Returns the (mutated) dataframe.
        """
        clusters = self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)
        self._project_clusters(clusters, scale)
        return self.dataframe

    def reduce_dimension_by_grouping(self, threshold=0.8, group_count=4, scale=True):
        """
        Cluster features over `group_count` consecutive correlation bands
        between `threshold` and 1.0, resolve cross-band conflicts (higher
        bands win), then replace each surviving cluster by PCA components.

        Returns
        -------
        (pandas.DataFrame, list[list[str]])
            The reduced dataframe and the final disjoint clusters.
        """
        clusters = {}
        steps = np.round(np.linspace(threshold, 1.0, group_count + 1), 4)
        for i in range(len(steps) - 1):
            lb, ub = float(steps[i]), float(steps[i + 1])
            clusters[(lb, ub)] = self._cluster_features(lower_bound=lb, upper_bound=ub)
        clusters = self._solve_conflict(clusters_dictionary=clusters)
        final_clusters = list(chain.from_iterable(clusters.values()))
        self._project_clusters(final_clusters, scale)
        return self.dataframe, final_clusters