# CSS_EDA_Dashboard/src/utils/dimension_reduction.py
from itertools import chain
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler
from src.utils.correlation import CorrelationMatrixGenerator
class DimensionReduction:
    """
    Correlation-driven clustering of features with a STRICT pairwise constraint:
    every pair of features in a cluster must have correlation within
    [lower_bound, upper_bound].

    Clusters are found as (maximal) cliques in the graph where an edge connects
    two features iff their correlation lies in the requested band. Each cluster
    is then replaced in the dataframe by its leading PCA component(s).
    """

    def __init__(self, dataframe, feature_classes, method="pearson", projection_dimension=1):
        """
        Parameters
        ----------
        dataframe : pd.DataFrame
            Input data; a defensive copy is taken so the caller's frame is
            never mutated.
        feature_classes :
            Passed through to CorrelationMatrixGenerator (per-feature type info).
        method : str
            Continuous-vs-continuous correlation method (default "pearson").
        projection_dimension : int
            Number of PCA components to keep per cluster; must be >= 1.

        Raises
        ------
        ValueError
            If projection_dimension < 1.
        TypeError
            If the generated correlation matrix is not a pandas DataFrame.
        """
        # Validate the cheap argument BEFORE running the (potentially
        # expensive) correlation-matrix computation.
        if projection_dimension < 1:
            raise ValueError("projection_dimension must be >= 1")
        self.k = int(projection_dimension)
        self.dataframe = dataframe.copy()
        self.correlation_matrix = CorrelationMatrixGenerator(
            df=self.dataframe,
            feature_classes=feature_classes,
            continuous_vs_continuous_method=method
        ).generate_matrix()
        if not isinstance(self.correlation_matrix, pd.DataFrame):
            raise TypeError("CorrelationMatrixGenerator.generate_matrix() must return a pandas.DataFrame")

    # ---------------------------
    # Strict clique-based clustering
    # ---------------------------
    def _cluster_features(self, lower_bound, upper_bound):
        """
        Return DISJOINT clusters where every pair is within [lower_bound, upper_bound].
        Implemented via maximal cliques + greedy disjoint selection.

        Returns a list of sorted column-name lists; empty list when no
        cluster of size >= 2 exists in the band.
        """
        if not (0 <= lower_bound <= upper_bound <= 1):
            raise ValueError("Bounds must satisfy 0 <= lower_bound <= upper_bound <= 1")
        cm = self.correlation_matrix
        # Only features appearing both as a row and a column are usable.
        features = [c for c in cm.columns if c in cm.index]
        if not features:
            return []

        def in_band(x):
            # NaN correlations (e.g. from constant columns) never form an edge.
            return pd.notna(x) and (lower_bound <= x <= upper_bound)

        # Build the band graph as adjacency sets (self-loops excluded).
        adj = {f: set() for f in features}
        for f in features:
            row = cm.loc[f, features]
            for g, val in row.items():
                if g != f and in_band(val):
                    adj[f].add(g)

        # Bron-Kerbosch with pivoting to enumerate maximal cliques.
        cliques = []

        def bron_kerbosch(R, P, X):
            if not P and not X:
                # R is a maximal clique; singletons are not useful clusters.
                if len(R) >= 2:
                    cliques.append(set(R))
                return
            # P | X is non-empty here (the guard above returned otherwise),
            # and every vertex is a key of `adj` by construction.
            # The pivot u maximizes |adj[u] & P|, shrinking the branch set.
            u = max(P | X, key=lambda v: len(adj[v] & P))
            for v in list(P - adj[u]):
                bron_kerbosch(R | {v}, P & adj[v], X & adj[v])
                P.remove(v)
                X.add(v)

        bron_kerbosch(set(), set(features), set())
        if not cliques:
            return []

        def avg_corr(clique_set):
            # Mean of the upper-triangle (pairwise) correlations in the clique.
            cols = sorted(clique_set)
            sub = cm.loc[cols, cols].to_numpy(dtype=float)
            tri = sub[np.triu_indices_from(sub, k=1)]
            tri = tri[~np.isnan(tri)]
            return float(tri.mean()) if tri.size else -np.inf

        # Prefer larger cliques, then higher average correlation; the sorted
        # name tuple makes the ordering fully deterministic across runs.
        cliques_sorted = sorted(
            cliques,
            key=lambda c: (-len(c), -avg_corr(c), tuple(sorted(c)))
        )

        # Greedily produce DISJOINT clusters (otherwise the PCA/drop step
        # would try to consume the same column twice). A subset of a clique
        # is still a clique, so the pairwise constraint remains valid.
        used = set()
        final_clusters = []
        for c in cliques_sorted:
            remaining = sorted(set(c) - used)
            if len(remaining) >= 2:
                final_clusters.append(remaining)
                used.update(remaining)
        return final_clusters

    @staticmethod
    def _solve_conflict(clusters_dictionary):
        """
        Safe conflict resolver across correlation bands:
        later keys win, features removed from earlier clusters.
        Removing elements from a clique keeps it a clique, so the pairwise
        constraint is preserved. Mutates and returns `clusters_dictionary`.
        """
        keys = list(clusters_dictionary.keys())
        used = set()
        for key in reversed(keys):  # later bands win
            cleaned = []
            for cluster in clusters_dictionary[key]:
                remaining = [f for f in cluster if f not in used]
                # Clusters that shrink below 2 members are dropped entirely.
                if len(remaining) >= 2:
                    cleaned.append(remaining)
                    used.update(remaining)
            clusters_dictionary[key] = cleaned
        return clusters_dictionary

    def find_clusters(self, lower_bound, upper_bound):
        """Public alias for the strict band clustering (no PCA replacement)."""
        return self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)

    # ---------------------------
    # PCA projection / replacement
    # ---------------------------
    def _assign_pca_components(self, cluster_index, comps, index):
        """
        Assign PCA components into group.* columns, supporting k==1 and k>1.

        A single component is stored as "group.<i>", multiple components as
        "group.<i>.<c>" so the naming stays flat for the common k==1 case.
        """
        if comps.ndim != 2:
            raise ValueError("PCA output must be 2D")
        k_eff = comps.shape[1]
        if k_eff == 1:
            self.dataframe[f"group.{cluster_index}"] = pd.Series(comps[:, 0], index=index)
        else:
            for c in range(k_eff):
                self.dataframe[f"group.{cluster_index}.{c}"] = pd.Series(comps[:, c], index=index)

    def _project_clusters(self, clusters, scale):
        """
        Replace each cluster of columns with its PCA component(s), in-place.

        Shared implementation for reduce_dimension() and
        reduce_dimension_by_grouping(). Clusters whose columns are no longer
        present (or that shrank below 2 members) are skipped silently.
        Returns the list of column groups actually projected.
        """
        projected = []
        for cluster_index, cols in enumerate(clusters):
            # Guard: only keep columns still present in the dataframe.
            cols = [c for c in cols if c in self.dataframe.columns]
            if len(cols) < 2:
                continue
            subset = self.dataframe[cols]
            # PCA needs a numeric matrix; non-numerics must be encoded upstream.
            if not all(pd.api.types.is_numeric_dtype(subset[c]) for c in subset.columns):
                raise TypeError(
                    f"Non-numeric columns found in cluster {cluster_index}: {cols}. "
                    "Encode them before PCA or restrict clustering to numeric features."
                )
            X = subset.to_numpy()
            if scale:
                X = MinMaxScaler().fit_transform(X)
            # Never request more components than the cluster has columns.
            pca = PCA(n_components=min(self.k, X.shape[1]))
            comps = pca.fit_transform(X)
            self._assign_pca_components(cluster_index, comps, index=subset.index)
            self.dataframe.drop(columns=cols, inplace=True)
            projected.append(cols)
        return projected

    def reduce_dimension(self, lower_bound=0.95, upper_bound=1.0, scale=True):
        """
        Cluster features within one correlation band and replace each cluster
        by its PCA component(s). Returns the (mutated) dataframe.
        """
        clusters = self._cluster_features(lower_bound=lower_bound, upper_bound=upper_bound)
        self._project_clusters(clusters, scale=scale)
        return self.dataframe

    def reduce_dimension_by_grouping(self, threshold=0.8, group_count=4, scale=True):
        """
        Split [threshold, 1.0] into `group_count` correlation bands, cluster
        each band, resolve cross-band conflicts (later/higher bands win), and
        PCA-replace the resulting clusters.

        Returns (dataframe, final_clusters) where final_clusters is the flat
        list of disjoint clusters after conflict resolution.
        """
        clusters = {}
        steps = np.round(np.linspace(threshold, 1.0, group_count + 1), 4)
        for i in range(len(steps) - 1):
            lb, ub = float(steps[i]), float(steps[i + 1])
            clusters[(lb, ub)] = self._cluster_features(lower_bound=lb, upper_bound=ub)
        clusters = self._solve_conflict(clusters_dictionary=clusters)
        final_clusters = list(chain(*clusters.values()))
        self._project_clusters(final_clusters, scale=scale)
        return self.dataframe, final_clusters