| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import Any, Optional |
|
|
| import pandas as pd |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from textstat import textstat |
| from tqdm.auto import tqdm |
|
|
|
|
@dataclass(slots=True)
class Config:
    """Knobs for character-level and POS-level TF-IDF n-gram feature extraction."""

    # Print progress/diagnostic messages throughout the pipeline.
    verbose: bool = True
    # Length of the space-free character n-grams (see build_space_free_char_ngrams).
    char_ngram_n: int = 4
    # TfidfVectorizer document-frequency bounds and feature cap for char n-grams.
    char_tfidf_min_df: int | float = 2
    char_tfidf_max_df: int | float = 0.95
    char_tfidf_max_features: Optional[int] = 100000
    # POS-tag n-gram range passed to TfidfVectorizer (default: trigrams only).
    pos_ngram_range: tuple[int, int] = (3, 3)
    # TfidfVectorizer document-frequency bounds and feature cap for POS n-grams.
    pos_tfidf_min_df: int | float = 2
    pos_tfidf_max_df: int | float = 0.95
    pos_tfidf_max_features: Optional[int] = 50000
    # TF-IDF options shared by both vectorizers.
    sublinear_tf: bool = True
    norm: str = "l2"
    # Whether transform_split appends textstat readability columns.
    include_readability: bool = True
    # Densify sparse TF-IDF matrices before building feature DataFrames.
    dense_output: bool = True
|
|
|
|
# Module-level default configuration; the functions below take it as the
# default value of their `config` parameter.
config = Config()
|
|
|
|
| |
|
|
| |
def build_space_free_char_ngrams(text: str, n: int = 4) -> list[str]:
    """Return character n-grams taken within each whitespace-separated span.

    Grams never cross whitespace because they are extracted per span.
    A span shorter than ``n`` is kept whole as a single gram.
    """
    grams: list[str] = []
    for span in text.split():
        if len(span) < n:
            grams.append(span)
        else:
            grams.extend(span[start:start + n] for start in range(len(span) - n + 1))
    return grams
|
|
def build_char_corpus(
    dict_df: dict[str, pd.DataFrame],
    split_name: str,
    config: Config = config,
) -> dict[str, list[str]]:
    """Build space-joined char n-gram documents for one split, per text column."""
    split_df = dict_df[split_name]
    corpus_by_column: dict[str, list[str]] = {}

    for column in ("text1", "text2"):
        texts = split_df[column].tolist()
        documents: list[str] = []
        for text in tqdm(texts, total=len(texts), desc=f"Char4 prep [{split_name}:{column}]"):
            grams = build_space_free_char_ngrams(text, n=config.char_ngram_n)
            documents.append(" ".join(grams))
        corpus_by_column[column] = documents

    return corpus_by_column
|
|
def fit_char_vectorizer(
    dict_df: dict[str, pd.DataFrame],
    config: Config = config,
) -> tuple[TfidfVectorizer, dict[str, list[str]]]:
    """Fit a TF-IDF vectorizer on char n-gram documents from the train split.

    Returns the fitted vectorizer plus the train corpus so callers can reuse it.
    """
    if config.verbose:
        print("\nBuilding character 4-gram training corpus from train/text1 + train/text2...")

    train_corpus_by_column = build_char_corpus(dict_df, split_name="train", config=config)
    # Stack text1 and text2 documents so both columns share one vocabulary.
    fit_corpus = train_corpus_by_column["text1"] + train_corpus_by_column["text2"]

    vectorizer = TfidfVectorizer(
        analyzer="word",
        # Documents are pre-tokenized, space-joined grams; just split on whitespace.
        lowercase=False,
        preprocessor=None,
        tokenizer=str.split,
        ngram_range=(1, 1),
        min_df=config.char_tfidf_min_df,
        max_df=config.char_tfidf_max_df,
        max_features=config.char_tfidf_max_features,
        sublinear_tf=config.sublinear_tf,
        norm=config.norm,
    )
    vectorizer.fit(fit_corpus)

    if config.verbose:
        print(f"\nFitted character 4-gram vocabulary size: {len(vectorizer.get_feature_names_out()):,}")

    return vectorizer, train_corpus_by_column
|
|
|
|
| |
|
|
|
|
| def record_to_pos_sequence(record: dict[str, Any]) -> list[str]: |
| return [ |
| token_pos |
| for token_pos, is_punct, is_space in zip( |
| record["token_pos"], |
| record["token_is_punct"], |
| record["token_is_space"], |
| strict=False, |
| ) if not is_punct and not is_space] |
|
|
def build_pos_corpus(
    split_cache: dict[str, list[dict[str, Any]]],
    split_name: str,
    config: Config = config,
) -> dict[str, list[str]]:
    """Build space-joined POS-tag documents for one split, per text column."""
    corpus_by_column: dict[str, list[str]] = {}

    for column in ("text1", "text2"):
        records = split_cache[column]
        documents: list[str] = []
        for record in tqdm(records, total=len(records), desc=f"POS prep [{split_name}:{column}]"):
            documents.append(" ".join(record_to_pos_sequence(record)))
        corpus_by_column[column] = documents

    return corpus_by_column
|
|
def fit_pos_vectorizer(
    linguistic_cache: dict[str, dict[str, list[dict[str, Any]]]],
    config: Config = config,
) -> tuple[TfidfVectorizer, dict[str, list[str]]]:
    """Fit a TF-IDF vectorizer on POS-tag n-grams from the train split.

    Returns the fitted vectorizer plus the train POS corpus so callers can reuse it.
    """
    if config.verbose:
        print("\nBuilding POS n-gram training corpus from train/text1 + train/text2...")

    train_corpus_by_column = build_pos_corpus(linguistic_cache["train"], split_name="train", config=config)
    # Stack text1 and text2 documents so both columns share one vocabulary.
    fit_corpus = train_corpus_by_column["text1"] + train_corpus_by_column["text2"]

    vectorizer = TfidfVectorizer(
        analyzer="word",
        # Documents are pre-tokenized, space-joined POS tags; just split on whitespace.
        lowercase=False,
        preprocessor=None,
        tokenizer=str.split,
        ngram_range=config.pos_ngram_range,
        min_df=config.pos_tfidf_min_df,
        max_df=config.pos_tfidf_max_df,
        max_features=config.pos_tfidf_max_features,
        sublinear_tf=config.sublinear_tf,
        norm=config.norm,
    )
    vectorizer.fit(fit_corpus)

    if config.verbose:
        print(f"\nFitted POS {config.pos_ngram_range}-gram vocabulary size: {len(vectorizer.get_feature_names_out()):,}")

    return vectorizer, train_corpus_by_column
|
|
|
|
| |
|
|
|
|
def _matrix_to_feature_df(
    matrix: Any,
    column_prefix: str,
    feature_prefix: str,
    config: Config = config,
) -> pd.DataFrame:
    """Convert a scipy sparse TF-IDF matrix into a DataFrame with stable names.

    Columns are named ``{column_prefix}_{feature_prefix}_{index:05d}`` so the
    feature order is reproducible across splits.

    Parameters: `matrix` is the output of TfidfVectorizer.transform;
    `config.dense_output` selects a dense or sparse-backed frame.
    """
    columns = [
        f"{column_prefix}_{feature_prefix}_{index:05d}"
        for index in range(matrix.shape[1])
    ]
    if config.dense_output:
        return pd.DataFrame(matrix.toarray(), columns=columns)
    # Bug fix: pd.DataFrame(...) cannot be constructed directly from a scipy
    # sparse matrix (the dense_output=False path previously raised). Use the
    # dedicated sparse constructor to keep the data sparse.
    return pd.DataFrame.sparse.from_spmatrix(matrix, columns=columns)
|
|
|
|
def build_readability_df(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """Compute four textstat readability scores for text1/text2 of every row."""
    # Suffix -> textstat scorer; iteration order fixes the output column order.
    metric_fns = {
        "flesch_kincaid_grade": textstat.flesch_kincaid_grade,
        "gunning_fog": textstat.gunning_fog,
        "smog": textstat.smog_index,
        "coleman_liau": textstat.coleman_liau_index,
    }

    rows: list[dict[str, float]] = []
    for _, row in df.iterrows():
        feature_row: dict[str, float] = {}
        for column in ("text1", "text2"):
            text = row[column]
            for metric_name, metric_fn in metric_fns.items():
                feature_row[f"{column}_readability_{metric_name}"] = round(metric_fn(text), 5)
        rows.append(feature_row)

    return pd.DataFrame(rows)
|
|
|
|
def transform_split(
    base_df: pd.DataFrame,
    dict_df: dict[str, pd.DataFrame],
    linguistic_cache: dict[str, dict[str, list[dict[str, Any]]]],
    split_name: str,
    char_vectorizer: TfidfVectorizer,
    pos_vectorizer: TfidfVectorizer,
    config: Config = config,
) -> tuple[pd.DataFrame, dict[str, list[str]], dict[str, list[str]], dict[str, float]]:
    """Append char/POS TF-IDF (and optional readability) features for one split.

    Returns the augmented frame, both corpora, and per-column density stats.
    """
    char_corpus_by_column = build_char_corpus(dict_df, split_name=split_name, config=config)
    pos_corpus_by_column = build_pos_corpus(linguistic_cache[split_name], split_name=split_name, config=config)

    # Collect all feature frames and concatenate once at the end; the order
    # (base, char-t1, pos-t1, char-t2, pos-t2, readability) fixes column order.
    frames: list[pd.DataFrame] = [base_df.copy().reset_index(drop=True)]
    density_stats: dict[str, float] = {}

    for column in ("text1", "text2"):
        feature_specs = (
            ("char", char_vectorizer, char_corpus_by_column[column], f"char{config.char_ngram_n}_tfidf"),
            ("pos", pos_vectorizer, pos_corpus_by_column[column], f"pos{config.pos_ngram_range}_tfidf"),
        )
        for kind, vectorizer, corpus, feature_prefix in feature_specs:
            matrix = vectorizer.transform(corpus)
            # Mean count of non-zero features per row; 0.0 for an empty split.
            avg_nonzero = matrix.getnnz(axis=1).mean() if matrix.shape[0] else 0.0
            density_stats[f"{column}_{kind}_avg_nonzero_features"] = round(avg_nonzero, 6)
            feature_df = _matrix_to_feature_df(
                matrix,
                column_prefix=column,
                feature_prefix=feature_prefix,
                config=config,
            )
            frames.append(feature_df.reset_index(drop=True))

    if config.include_readability:
        frames.append(build_readability_df(dict_df[split_name]).reset_index(drop=True))

    result = pd.concat(frames, axis=1)
    return result, char_corpus_by_column, pos_corpus_by_column, density_stats
|
|
|
|
def build_summary(
    dict_df: dict[str, pd.DataFrame],
    density_stats_by_split: dict[str, dict[str, float]],
    char_vocabulary_size: int,
    pos_vocabulary_size: int,
) -> pd.DataFrame:
    """Summarize per-split row counts, vocabulary sizes, and density stats.

    Splits without density stats simply omit those keys (NaN in the frame).
    """
    rows: list[dict[str, Any]] = [
        {
            "split": split,
            "num_rows": len(df),
            "char_vocabulary_size": char_vocabulary_size,
            "pos_vocabulary_size": pos_vocabulary_size,
            **density_stats_by_split.get(split, {}),
        }
        for split, df in dict_df.items()
    ]
    return pd.DataFrame(rows)
|
|
|
|
def ngram_features_wrapper(
    dict_df: dict[str, pd.DataFrame],
    linguistic_cache: dict[str, dict[str, list[dict[str, Any]]]],
    config: Config = config,
) -> tuple[dict[str, pd.DataFrame], pd.DataFrame, dict[str, Any]]:
    """Fit char/POS vectorizers on train, transform every split, and summarize.

    Returns the transformed frame per split, a summary frame, and an artifacts
    dict (vectorizers, feature names, corpora, config).
    """
    if config.verbose:
        print("======= N-GRAM FEATURES START =======")

    char_vectorizer, train_char_corpus = fit_char_vectorizer(dict_df, config=config)
    pos_vectorizer, train_pos_corpus = fit_pos_vectorizer(linguistic_cache, config=config)

    ngram_dict_df: dict[str, pd.DataFrame] = {}
    char_corpus_by_split: dict[str, dict[str, list[str]]] = {}
    pos_corpus_by_split: dict[str, dict[str, list[str]]] = {}
    density_stats_by_split: dict[str, dict[str, float]] = {}

    for split, df in dict_df.items():
        if config.verbose:
            print(f"\nTransforming split='{split}' ({len(df):,} rows)")

        (
            ngram_dict_df[split],
            char_corpus_by_split[split],
            pos_corpus_by_split[split],
            density_stats_by_split[split],
        ) = transform_split(
            base_df=df,
            dict_df=dict_df,
            linguistic_cache=linguistic_cache,
            split_name=split,
            char_vectorizer=char_vectorizer,
            pos_vectorizer=pos_vectorizer,
            config=config,
        )

    ngram_summary_df = build_summary(
        ngram_dict_df,
        density_stats_by_split=density_stats_by_split,
        char_vocabulary_size=len(char_vectorizer.get_feature_names_out()),
        pos_vocabulary_size=len(pos_vectorizer.get_feature_names_out()),
    )

    ngram_artifacts: dict[str, Any] = {
        "char_vectorizer": char_vectorizer,
        "char_feature_names": char_vectorizer.get_feature_names_out().tolist(),
        "train_char_corpus": train_char_corpus,
        "char_corpus_by_split": char_corpus_by_split,
        "pos_vectorizer": pos_vectorizer,
        "pos_feature_names": pos_vectorizer.get_feature_names_out().tolist(),
        "train_pos_corpus": train_pos_corpus,
        "pos_corpus_by_split": pos_corpus_by_split,
        "config": config,
    }

    if config.verbose:
        print("\nN-gram summary:")
        print(ngram_summary_df)
        print("")
        print("======= N-GRAM FEATURES END =======")
        print("")

    return ngram_dict_df, ngram_summary_df, ngram_artifacts
|
|