| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import Any, Optional |
|
|
| import numpy as np |
| import pandas as pd |
| from scipy import sparse |
| from sklearn.decomposition import TruncatedSVD |
|
|
|
|
@dataclass(slots=True)
class Config:
    """Settings for SVD-based dimensionality reduction of paired-text features."""

    # Print progress messages while fitting/transforming.
    verbose: bool = True
    # Prefixes of the two paired text fields; feature columns look like "text1_<suffix>".
    text_columns: tuple[str, ...] = ("text1", "text2")
    # If True, keep the high-dimensional source columns alongside the SVD components.
    keep_original_columns: bool = False
    # Per-family toggles: a family is skipped entirely when its flag is False.
    reduce_tfidf: bool = True
    reduce_char_ngrams: bool = True
    reduce_pos_ngrams: bool = True
    # Requested component counts per family (capped to the train matrix size at fit time).
    tfidf_components: int = 300
    char_components: int = 300
    pos_components: int = 100
    # Seed forwarded to TruncatedSVD for reproducible fits.
    random_state: int = 42


# Module-level default configuration used as the default argument below.
config = Config()
|
|
|
|
| |
# Declarative description of each reducible feature family.
# Spec keys:
#   suffix_prefix   - a column suffix must start with this to belong to the family
#   must_contain    - (char/pos families only) substring the suffix must also contain
#   config_flag     - Config attribute that enables/disables reduction of this family
#   components_attr - Config attribute holding the requested component count
#   reduced_prefix  - prefix used when naming the generated SVD component columns
FAMILY_SPECS = {
    "tfidf": {
        "suffix_prefix": "tfidf_",
        "config_flag": "reduce_tfidf",
        "components_attr": "tfidf_components",
        "reduced_prefix": "tfidf_svd",
    },
    "char_ngrams": {
        "suffix_prefix": "char",
        "must_contain": "_tfidf_",
        "config_flag": "reduce_char_ngrams",
        "components_attr": "char_components",
        "reduced_prefix": "char_tfidf_svd",
    },
    "pos_ngrams": {
        "suffix_prefix": "pos",
        "must_contain": "_tfidf_",
        "config_flag": "reduce_pos_ngrams",
        "components_attr": "pos_components",
        "reduced_prefix": "pos_tfidf_svd",
    }}
|
|
|
|
|
|
def _match_family_suffix(suffix: str, family_name: str) -> bool:
    """Return True when *suffix* belongs to the feature family *family_name*.

    A suffix matches when it starts with the family's ``suffix_prefix`` and,
    for families that declare one, contains the ``must_contain`` substring.
    """
    spec = FAMILY_SPECS[family_name]
    if not suffix.startswith(spec["suffix_prefix"]):
        return False
    required = spec.get("must_contain")
    # Families without a "must_contain" key (tfidf) match on the prefix alone.
    return required is None or required in suffix
|
|
| |
def discover_family_suffixes(df: pd.DataFrame, family_name: str) -> list[str]:
    """Collect feature suffixes in *df* that belong to the given family.

    Only ``text1_*`` columns are scanned; each suffix is the part after the
    ``text1_`` prefix.  Matching ``text2_*`` columns are assumed to exist —
    downstream code builds both column names from the same suffix list.
    """
    prefix = "text1_"
    candidates = (name[len(prefix):] for name in df.columns if name.startswith(prefix))
    return [suffix for suffix in candidates if _match_family_suffix(suffix, family_name)]
|
|
|
|
| |
|
|
| def _shared_train_matrix(train_df: pd.DataFrame, suffixes: list[str]) -> sparse.csr_matrix: |
| text1_columns = [f"text1_{suffix}" for suffix in suffixes] |
| text2_columns = [f"text2_{suffix}" for suffix in suffixes] |
| train_text1 = sparse.csr_matrix(train_df[text1_columns].to_numpy(dtype=np.float32, copy=False)) |
| train_text2 = sparse.csr_matrix(train_df[text2_columns].to_numpy(dtype=np.float32, copy=False)) |
| return sparse.vstack([train_text1, train_text2], format="csr") |
|
|
| |
| |
| def _effective_components(requested_components: int, train_matrix: sparse.csr_matrix) -> int: |
| max_components = min(train_matrix.shape[0] - 1, train_matrix.shape[1] - 1) |
| if max_components < 1: |
| return 1 |
| return min(requested_components, max_components) |
|
|
def fit_family_svd(
    train_df: pd.DataFrame,
    family_name: str,
    suffixes: list[str],
    config: Config = config,
) -> tuple[TruncatedSVD, int]:
    """Fit a TruncatedSVD on the stacked text1/text2 train features of one family.

    Returns the fitted model together with the effective component count,
    which may be smaller than the configured request for small matrices.
    """
    stacked = _shared_train_matrix(train_df, suffixes)
    requested = getattr(config, FAMILY_SPECS[family_name]["components_attr"])
    effective = _effective_components(requested, stacked)

    model = TruncatedSVD(n_components=effective, random_state=config.random_state)
    model.fit(stacked)
    return model, effective
|
|
| |
|
|
|
|
def transform_family_split(
    df: pd.DataFrame,
    family_name: str,
    suffixes: list[str],
    svd: TruncatedSVD,
    config: Config = config,
) -> pd.DataFrame:
    """Project one split's family features through a fitted SVD.

    Produces ``{side}_{reduced_prefix}_NNNN`` component columns for both text
    sides; the original wide columns are dropped unless
    ``config.keep_original_columns`` is set.  The returned frame has a fresh
    RangeIndex.
    """
    frame = df.copy().reset_index(drop=True)
    side_columns = {
        side: [f"{side}_{suffix}" for suffix in suffixes]
        for side in ("text1", "text2")
    }
    reduced_prefix = FAMILY_SPECS[family_name]["reduced_prefix"]

    reduced_frames = []
    for side, columns in side_columns.items():
        matrix = sparse.csr_matrix(frame[columns].to_numpy(dtype=np.float32, copy=False))
        projected = svd.transform(matrix)
        reduced_frames.append(pd.DataFrame(
            projected,
            columns=[f"{side}_{reduced_prefix}_{index:04d}" for index in range(projected.shape[1])],
        ))

    if not config.keep_original_columns:
        frame = frame.drop(columns=side_columns["text1"] + side_columns["text2"])

    pieces = [frame.reset_index(drop=True)]
    pieces.extend(piece.reset_index(drop=True) for piece in reduced_frames)
    return pd.concat(pieces, axis=1)
|
|
|
|
|
|
def dimensionality_reduction_wrapper(
    dict_df: dict[str, pd.DataFrame],
    config: Config = config,
) -> tuple[dict[str, pd.DataFrame], pd.DataFrame, dict[str, Any]]:
    """Reduce every enabled feature family in every split with TruncatedSVD.

    Fits one SVD per family on the "train" split only, then transforms every
    split with the same model so all splits share a component space.

    Parameters
    ----------
    dict_df:
        Mapping of split name -> DataFrame.  Must contain a "train" key.
    config:
        Settings controlling which families are reduced and to how many
        components.

    Returns
    -------
    Tuple of (reduced split DataFrames, per-family summary DataFrame,
    artifacts dict with fitted ``svd_models`` and ``family_suffixes``).

    Raises
    ------
    KeyError
        If *dict_df* has no "train" split.
    """
    # Fail fast with a clear message instead of a bare KeyError mid-run.
    if "train" not in dict_df:
        raise KeyError("dict_df must contain a 'train' split to fit the SVD models on")

    if config.verbose:
        print("======= DIMENSIONALITY REDUCTION START =======")

    reduced_dict_df = {split: df.copy().reset_index(drop=True) for split, df in dict_df.items()}
    summary_rows: list[dict[str, Any]] = []
    artifacts: dict[str, Any] = {"svd_models": {}, "family_suffixes": {}}

    for family_name, spec in FAMILY_SPECS.items():
        if not getattr(config, spec["config_flag"]):
            continue

        suffixes = discover_family_suffixes(reduced_dict_df["train"], family_name)

        # Fix: a family with no matching columns previously reached
        # TruncatedSVD.fit with a zero-width matrix and crashed; skip it.
        if not suffixes:
            if config.verbose:
                print(f"\nSkipping family='{family_name}': no matching columns found")
            continue

        if config.verbose:
            print(f"\nFitting TruncatedSVD for family='{family_name}' with {len(suffixes):,} base features")

        svd, n_components = fit_family_svd(reduced_dict_df["train"], family_name=family_name, suffixes=suffixes, config=config)

        artifacts["svd_models"][family_name] = svd
        artifacts["family_suffixes"][family_name] = suffixes

        # list(...) so reassigning values while iterating is safe.
        for split, df in list(reduced_dict_df.items()):
            if config.verbose:
                print(f"  Transforming split='{split}' for family='{family_name}'")
            reduced_dict_df[split] = transform_family_split(
                df,
                family_name=family_name,
                suffixes=suffixes,
                svd=svd,
                config=config,
            )

        explained_variance = float(svd.explained_variance_ratio_.sum())
        summary_rows.append({
            "family": family_name,
            "original_base_features": len(suffixes),
            "reduced_components": n_components,
            "explained_variance_ratio_sum": round(explained_variance, 6),
        })

        if config.verbose:
            print(
                f"  Reduced family='{family_name}' to {n_components} components "
                f"(explained variance sum={explained_variance:.4f})"
            )

    # Empty when every family was disabled or had no columns.
    reduction_summary_df = pd.DataFrame(summary_rows)

    if config.verbose:
        print("\nDimensionality reduction summary:")
        print(reduction_summary_df)
        print("")
        print("======= DIMENSIONALITY REDUCTION END =======")
        print("")

    return reduced_dict_df, reduction_summary_df, artifacts
|
|