AVeri / src /features_ngram.py
salirafi's picture
Upload 14 files
66242b8 verified
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Optional
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from textstat import textstat
from tqdm.auto import tqdm
@dataclass(slots=True)
class Config:
verbose: bool = True
char_ngram_n: int = 4
char_tfidf_min_df: int | float = 2
char_tfidf_max_df: int | float = 0.95
char_tfidf_max_features: Optional[int] = 100000
pos_ngram_range: tuple[int, int] = (3, 3)
pos_tfidf_min_df: int | float = 2
pos_tfidf_max_df: int | float = 0.95
pos_tfidf_max_features: Optional[int] = 50000
sublinear_tf: bool = True
norm: str = "l2"
include_readability: bool = True
dense_output: bool = True
config = Config()
# ================== CAHARCTER 4-GRAM =======================
# bsaed on definition in https://asistdl.onlinelibrary.wiley.com/doi/10.1002/asi.22954
def build_space_free_char_ngrams(text: str, n: int = 4) -> list[str]:
spans = text.split()
grams: list[str] = []
for span in spans:
if not span:
continue
if len(span) < n:
grams.append(span)
continue
for index in range(len(span) - n + 1):
grams.append(span[index:index + n])
return grams
def build_char_corpus(
dict_df: dict[str, pd.DataFrame],
split_name: str,
config: Config = config,
) -> dict[str, list[str]]:
corpus_by_column: dict[str, list[str]] = {}
split_df = dict_df[split_name]
for column in ["text1", "text2"]:
texts = split_df[column].tolist()
iterator = tqdm(texts, total=len(texts), desc=f"Char4 prep [{split_name}:{column}]")
corpus_by_column[column] = [
" ".join(build_space_free_char_ngrams(text, n=config.char_ngram_n))
for text in iterator
]
return corpus_by_column
def fit_char_vectorizer(
dict_df: dict[str, pd.DataFrame],
config: Config = config,
) -> tuple[TfidfVectorizer, dict[str, list[str]]]:
if config.verbose:
print("\nBuilding character 4-gram training corpus from train/text1 + train/text2...")
train_corpus_by_column = build_char_corpus(dict_df, split_name="train", config=config)
fit_corpus: list[str] = []
for column in ["text1", "text2"]:
fit_corpus.extend(train_corpus_by_column[column]) # character-level features
vectorizer = TfidfVectorizer(
analyzer="word",
# token_pattern=r"(?u)\b\S+\b",
lowercase=False,
preprocessor=None,
tokenizer=str.split,
ngram_range=(1, 1),
min_df=config.char_tfidf_min_df,
max_df=config.char_tfidf_max_df,
max_features=config.char_tfidf_max_features,
sublinear_tf=config.sublinear_tf,
norm=config.norm,
)
vectorizer.fit(fit_corpus)
if config.verbose:
print(f"\nFitted character 4-gram vocabulary size: {len(vectorizer.get_feature_names_out()):,}")
return vectorizer, train_corpus_by_column
# =================== POS n-GRAM =========================
def record_to_pos_sequence(record: dict[str, Any]) -> list[str]:
return [
token_pos
for token_pos, is_punct, is_space in zip(
record["token_pos"],
record["token_is_punct"],
record["token_is_space"],
strict=False,
) if not is_punct and not is_space]
def build_pos_corpus(
split_cache: dict[str, list[dict[str, Any]]],
split_name: str,
config: Config = config,
) -> dict[str, list[str]]:
corpus_by_column: dict[str, list[str]] = {}
for column in ["text1", "text2"]:
records = split_cache[column]
iterator = tqdm(records, total=len(records), desc=f"POS prep [{split_name}:{column}]")
corpus_by_column[column] = [
" ".join(record_to_pos_sequence(record))
for record in iterator
]
return corpus_by_column
def fit_pos_vectorizer(
linguistic_cache: dict[str, dict[str, list[dict[str, Any]]]],
config: Config = config,
) -> tuple[TfidfVectorizer, dict[str, list[str]]]:
if config.verbose:
print("\nBuilding POS n-gram training corpus from train/text1 + train/text2...")
train_corpus_by_column = build_pos_corpus(linguistic_cache["train"], split_name="train", config=config)
fit_corpus: list[str] = []
for column in ["text1", "text2"]:
fit_corpus.extend(train_corpus_by_column[column])
vectorizer = TfidfVectorizer(
analyzer="word",
# token_pattern=r"(?u)\b\S+\b",
lowercase=False,
preprocessor=None,
tokenizer=str.split,
ngram_range=config.pos_ngram_range,
min_df=config.pos_tfidf_min_df,
max_df=config.pos_tfidf_max_df,
max_features=config.pos_tfidf_max_features,
sublinear_tf=config.sublinear_tf,
norm=config.norm,
)
vectorizer.fit(fit_corpus)
if config.verbose:
print(f"\nFitted POS {config.pos_ngram_range}-gram vocabulary size: {len(vectorizer.get_feature_names_out()):,}")
return vectorizer, train_corpus_by_column
# =================== MAIN =========================
def _matrix_to_feature_df(
matrix: Any,
column_prefix: str,
feature_prefix: str,
config: Config = config,
) -> pd.DataFrame:
values = matrix.toarray() if config.dense_output else matrix
columns = [
f"{column_prefix}_{feature_prefix}_{index:05d}"
for index in range(matrix.shape[1])
]
return pd.DataFrame(values, columns=columns)
def build_readability_df(
df: pd.DataFrame,
) -> pd.DataFrame:
rows: list[dict[str, float]] = []
for _, row in df.iterrows():
feature_row: dict[str, float] = {}
for column in ["text1", "text2"]:
text = row[column]
feature_row[f"{column}_readability_flesch_kincaid_grade"] = round(textstat.flesch_kincaid_grade(text), 5)
feature_row[f"{column}_readability_gunning_fog"] = round(textstat.gunning_fog(text), 5)
feature_row[f"{column}_readability_smog"] = round(textstat.smog_index(text), 5)
feature_row[f"{column}_readability_coleman_liau"] = round(textstat.coleman_liau_index(text), 5)
rows.append(feature_row)
return pd.DataFrame(rows)
def transform_split(
base_df: pd.DataFrame,
dict_df: dict[str, pd.DataFrame],
linguistic_cache: dict[str, dict[str, list[dict[str, Any]]]],
split_name: str,
char_vectorizer: TfidfVectorizer,
pos_vectorizer: TfidfVectorizer,
config: Config = config,
) -> tuple[pd.DataFrame, dict[str, list[str]], dict[str, list[str]], dict[str, float]]:
result = base_df.copy().reset_index(drop=True)
char_corpus_by_column = build_char_corpus(dict_df, split_name=split_name, config=config)
pos_corpus_by_column = build_pos_corpus(linguistic_cache[split_name], split_name=split_name, config=config)
density_stats: dict[str, float] = {}
for column in ["text1", "text2"]:
char_matrix = char_vectorizer.transform(char_corpus_by_column[column])
density_stats[f"{column}_char_avg_nonzero_features"] = round(
char_matrix.getnnz(axis=1).mean() if char_matrix.shape[0] else 0.0,
6,
)
char_df = _matrix_to_feature_df(
char_matrix,
column_prefix=column,
feature_prefix=f"char{config.char_ngram_n}_tfidf",
config=config,
)
result = pd.concat([result, char_df.reset_index(drop=True)], axis=1)
pos_matrix = pos_vectorizer.transform(pos_corpus_by_column[column])
density_stats[f"{column}_pos_avg_nonzero_features"] = round(
pos_matrix.getnnz(axis=1).mean() if pos_matrix.shape[0] else 0.0,
6,
)
pos_df = _matrix_to_feature_df(
pos_matrix,
column_prefix=column,
feature_prefix=f"pos{config.pos_ngram_range}_tfidf",
config=config,
)
result = pd.concat([result, pos_df.reset_index(drop=True)], axis=1)
if config.include_readability:
readability_df = build_readability_df(dict_df[split_name])
result = pd.concat([result, readability_df.reset_index(drop=True)], axis=1)
return result, char_corpus_by_column, pos_corpus_by_column, density_stats
def build_summary(
dict_df: dict[str, pd.DataFrame],
density_stats_by_split: dict[str, dict[str, float]],
char_vocabulary_size: int,
pos_vocabulary_size: int,
) -> pd.DataFrame:
rows: list[dict[str, Any]] = []
for split, df in dict_df.items():
row: dict[str, Any] = {
"split": split,
"num_rows": len(df),
"char_vocabulary_size": char_vocabulary_size,
"pos_vocabulary_size": pos_vocabulary_size,
}
row.update(density_stats_by_split.get(split, {}))
rows.append(row)
return pd.DataFrame(rows)
def ngram_features_wrapper(
dict_df: dict[str, pd.DataFrame],
linguistic_cache: dict[str, dict[str, list[dict[str, Any]]]],
config: Config = config,
) -> tuple[dict[str, pd.DataFrame], pd.DataFrame, dict[str, Any]]:
if config.verbose:
print("======= N-GRAM FEATURES START =======")
char_vectorizer, train_char_corpus = fit_char_vectorizer(dict_df, config=config)
pos_vectorizer, train_pos_corpus = fit_pos_vectorizer(linguistic_cache, config=config)
ngram_dict_df: dict[str, pd.DataFrame] = {}
char_corpus_by_split: dict[str, dict[str, list[str]]] = {}
pos_corpus_by_split: dict[str, dict[str, list[str]]] = {}
density_stats_by_split: dict[str, dict[str, float]] = {}
for split, df in dict_df.items():
if config.verbose:
print(f"\nTransforming split='{split}' ({len(df):,} rows)")
transformed_df, split_char_corpus, split_pos_corpus, density_stats = transform_split(
base_df=df,
dict_df=dict_df,
linguistic_cache=linguistic_cache,
split_name=split,
char_vectorizer=char_vectorizer,
pos_vectorizer=pos_vectorizer,
config=config,
)
ngram_dict_df[split] = transformed_df
char_corpus_by_split[split] = split_char_corpus
pos_corpus_by_split[split] = split_pos_corpus
density_stats_by_split[split] = density_stats
ngram_summary_df = build_summary(
ngram_dict_df,
density_stats_by_split=density_stats_by_split,
char_vocabulary_size=len(char_vectorizer.get_feature_names_out()),
pos_vocabulary_size=len(pos_vectorizer.get_feature_names_out()),
)
ngram_artifacts = {
"char_vectorizer": char_vectorizer,
"char_feature_names": char_vectorizer.get_feature_names_out().tolist(),
"train_char_corpus": train_char_corpus,
"char_corpus_by_split": char_corpus_by_split,
"pos_vectorizer": pos_vectorizer,
"pos_feature_names": pos_vectorizer.get_feature_names_out().tolist(),
"train_pos_corpus": train_pos_corpus,
"pos_corpus_by_split": pos_corpus_by_split,
"config": config,
}
if config.verbose:
print("\nN-gram summary:")
print(ngram_summary_df)
print("")
print("======= N-GRAM FEATURES END =======")
print("")
return ngram_dict_df, ngram_summary_df, ngram_artifacts