# AVeri/src/model_training.py
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
from scipy import sparse
from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from xgboost import XGBClassifier

from helpers import save_json
@dataclass(slots=True)
class Config:
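    """Feature-family toggles, threshold-search settings, and model hyperparameters."""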
    # feature-family toggles (control which column families are used at all)
include_statistical: bool = True
include_tfidf: bool = True
include_char_ngrams: bool = True
include_pos_ngrams: bool = True
include_readability: bool = True
    include_local_pairwise: bool = True  # pairwise ops (abs diff, product) per local feature column; when False, no per-column features are built regardless of the family toggles above
    include_global_pairwise: bool = True  # per-family global summaries (cosine similarity, L1/L2 distances)
    threshold_metric: str = "youden_j"  # metric maximized in the threshold search; Youden's J balances sensitivity against specificity, which helps curb false positives
threshold_grid_step: float = 0.01
model_params: dict[str, Any] | None = None
def __post_init__(self) -> None:
        if self.model_params is None:
            # default XGBoost hyperparameters (tuned with local features included)
self.model_params = {
"objective": "binary:logistic",
"eval_metric": "logloss",
"n_estimators": 500,
"max_depth": 4,
"learning_rate": 0.05,
"subsample": 0.8,
"colsample_bytree": 0.3,
"min_child_weight": 3,
"reg_lambda": 5.0,
"reg_alpha": 1.0,
"random_state": 42,
"n_jobs": 2,
"tree_method": "hist",
}
        # # alternative hyperparameters for global features only
# self.model_params = {
# "objective": "binary:logistic",
# "eval_metric": "logloss",
# "n_estimators": 200,
# "max_depth": 2,
# "learning_rate": 0.03,
# "subsample": 0.9,
# "colsample_bytree": 1.0,
# "min_child_weight": 5,
# "reg_lambda": 10.0,
# "reg_alpha": 0.5,
# "random_state": 42,
# "n_jobs": 2,
# "tree_method": "hist",
# }
def _feature_family_from_suffix(suffix: str) -> str:
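    """Map a feature-column suffix to its feature family based on its prefix."""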
if suffix.startswith("tfidf_"):
return "tfidf"
if suffix.startswith("char") and "_tfidf_" in suffix:
return "char_ngrams"
if suffix.startswith("pos") and "_tfidf_" in suffix:
return "pos_ngrams"
if suffix.startswith("readability_"):
return "readability"
return "statistical"
def _include_family(family: str, config: Config) -> bool:
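    """Check whether the given feature family is enabled in the config."""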
return {
"statistical": config.include_statistical,
"tfidf": config.include_tfidf,
"char_ngrams": config.include_char_ngrams,
"pos_ngrams": config.include_pos_ngrams,
"readability": config.include_readability,
}[family]
# hand-rolled row-wise cosine similarity (keeps the matrices sparse)
def _safe_cosine_from_columns(left: sparse.csr_matrix, right: sparse.csr_matrix) -> np.ndarray:
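    """Row-wise cosine similarity between corresponding rows of two sparse matrices.

    Rows where either vector has zero norm yield 0.0 instead of NaN.
    """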
numerator = np.asarray(left.multiply(right).sum(axis=1)).ravel()
left_norm = np.sqrt(np.asarray(left.multiply(left).sum(axis=1)).ravel())
right_norm = np.sqrt(np.asarray(right.multiply(right).sum(axis=1)).ravel())
denominator = left_norm * right_norm
result = np.divide(numerator, denominator, out=np.zeros_like(numerator, dtype=np.float32), where=denominator > 0)
return result.astype(np.float32)
def discover_suffixes(train_df: pd.DataFrame, config: Config) -> list[str]:
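    """Collect feature suffixes from the train split's text1_* columns, keeping only enabled families."""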
suffixes: list[str] = []
for column in train_df.columns:
if not column.startswith("text1_"):
continue
suffix = column[len("text1_") :]
if _include_family(_feature_family_from_suffix(suffix), config):
suffixes.append(suffix)
return suffixes
# global features: per-family summary comparisons between text1 and text2
def build_global_pairwise_features(df: pd.DataFrame, suffixes: list[str]) -> tuple[Any, list[str]]:
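    """Summarize each feature family into three pairwise features: cosine similarity, L1 and L2 distance."""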
dtype = np.float32
blocks = []
feature_names = []
families = {
"tfidf": [s for s in suffixes if s.startswith("tfidf_")],
"char_ngrams": [s for s in suffixes if s.startswith("char") and "_tfidf_" in s],
"pos_ngrams": [s for s in suffixes if s.startswith("pos") and "_tfidf_" in s],
"scalar": [s for s in suffixes if not (
s.startswith("tfidf_")
or (s.startswith("char") and "_tfidf_" in s)
or (s.startswith("pos") and "_tfidf_" in s)
)
]}
for family_name, family_suffixes in families.items():
if not family_suffixes:
continue
left_cols = [f"text1_{s}" for s in family_suffixes]
right_cols = [f"text2_{s}" for s in family_suffixes]
left = sparse.csr_matrix(df[left_cols].to_numpy(dtype=dtype))
right = sparse.csr_matrix(df[right_cols].to_numpy(dtype=dtype))
diff = left - right
cosine = _safe_cosine_from_columns(left, right).reshape(-1, 1) # cosine similarity
l1 = np.asarray(np.abs(diff).sum(axis=1)).ravel().astype(dtype).reshape(-1, 1) # l1 distance
l2 = np.sqrt(np.asarray(diff.multiply(diff).sum(axis=1)).ravel()).astype(dtype).reshape(-1, 1) # l2 distance
family_block = sparse.csr_matrix(np.hstack([cosine, l1, l2]), dtype=dtype)
blocks.append(family_block)
feature_names.extend([
f"{family_name}_cosine_similarity",
f"{family_name}_l1_distance",
f"{family_name}_l2_distance",
])
if not blocks:
return sparse.csr_matrix((len(df), 0), dtype=dtype), []
return sparse.hstack(blocks, format="csr", dtype=dtype), feature_names
# local features (pairwise operations done for each feature column)
def build_pairwise_matrix(df: pd.DataFrame, suffixes: list[str]) -> tuple[Any, np.ndarray, list[str]]:
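    """Build |a - b| and a * b features for every individual feature column, plus the binary labels."""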
dtype = np.float32
feature_names: list[str] = []
columns: list[sparse.csr_matrix] = []
for suffix in suffixes:
        # pairwise operations applied to each feature column individually
left = sparse.csr_matrix(df[f"text1_{suffix}"].to_numpy(dtype=dtype).reshape(-1, 1))
right = sparse.csr_matrix(df[f"text2_{suffix}"].to_numpy(dtype=dtype).reshape(-1, 1))
diff = left - right
        # absolute difference
columns.append(abs(diff))
feature_names.append(f"{suffix}_abs_diff")
# dot product
columns.append(left.multiply(right))
feature_names.append(f"{suffix}_product")
X = sparse.hstack(columns, format="csr", dtype=dtype)
y = df["same"].to_numpy(dtype=np.int8, copy=False) # binary label
return X, y, feature_names
def compute_metrics(y_true: np.ndarray, y_proba: np.ndarray, threshold: float) -> dict[str, Any]:
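    """Binarize probabilities at the given threshold and report classification metrics and confusion counts."""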
y_pred = (y_proba >= threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0.0
sensitivity = recall_score(y_true, y_pred, zero_division=0)
balanced_accuracy = balanced_accuracy_score(y_true, y_pred)
youden_j = sensitivity + specificity - 1.0
return {
"threshold": round(threshold, 5),
"accuracy": round(accuracy_score(y_true, y_pred), 5),
"precision": round(precision_score(y_true, y_pred, zero_division=0), 5),
"recall": round(sensitivity, 5),
"f1": round(f1_score(y_true, y_pred, zero_division=0), 5),
"balanced_accuracy": round(balanced_accuracy, 5),
"specificity": round(specificity, 5),
"youden_j": round(youden_j, 5),
"roc_auc": round(roc_auc_score(y_true, y_proba), 5),
"tn": int(tn),
"fp": int(fp),
"fn": int(fn),
"tp": int(tp),
}
# grid-search the decision threshold that maximizes config.threshold_metric;
# the metric choice changes the hard classifications, not the predicted probabilities
def find_best_threshold(
y_true: np.ndarray,
y_proba: np.ndarray,
config: Config,
) -> tuple[float, dict[str, Any]]:
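    """Grid-search thresholds over [0, 1] and return the best one with its metrics."""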
    thresholds = np.arange(0.0, 1.0 + config.threshold_grid_step, config.threshold_grid_step, dtype=np.float32)  # include 1.0 in the grid
if thresholds.size == 0:
thresholds = np.array([0.5], dtype=np.float32)
best_threshold = 0.5
best_metrics = compute_metrics(y_true, y_proba, threshold=best_threshold)
best_score = float(best_metrics[config.threshold_metric])
for threshold in thresholds:
        metrics = compute_metrics(y_true, y_proba, threshold=float(threshold))  # evaluate each candidate threshold
        score = float(metrics[config.threshold_metric])
        if score > best_score:
best_threshold = float(threshold)
best_metrics = metrics
best_score = score
return best_threshold, best_metrics
def train_and_save_model(save_root: str | Path | None = None, config: Config | None = None) -> dict[str, Any]:
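    """Train the XGBoost classifier, tune the decision threshold on validation, evaluate on test, and save the bundle."""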
config = config or Config()
    project_root = Path(__file__).resolve().parents[1]  # assumes this file sits in a subfolder directly under the project root
saved_dir = project_root / "saved"
ngram_dir = saved_dir / "ngram_features" / "dataframes"
model_dir = Path(save_root) if save_root is not None else saved_dir / "model"
    ngram_dict_df = {
        split: pd.read_parquet(ngram_dir / f"{split}_ngram.parquet")
        for split in ("train", "validation", "test")
    }
suffixes = discover_suffixes(ngram_dict_df["train"], config)
X_by_split: dict[str, Any] = {}
y_by_split: dict[str, np.ndarray] = {}
feature_names: list[str] = []
for split, df in ngram_dict_df.items():
blocks = []
feature_names = []
if config.include_local_pairwise:
X_pairwise, y, local_names = build_pairwise_matrix(df, suffixes)
blocks.append(X_pairwise)
feature_names.extend(local_names)
else:
y = df["same"].to_numpy(dtype=np.int8, copy=False)
if config.include_global_pairwise:
X_global, global_names = build_global_pairwise_features(df, suffixes)
blocks.append(X_global)
feature_names.extend(global_names)
if not blocks: # if both local and global features are set to false
raise ValueError("At least one of include_local_pairwise or include_global_pairwise must be True.")
X = sparse.hstack(blocks, format="csr", dtype=np.float32)
X_by_split[split] = X
y_by_split[split] = y
    model = XGBClassifier(**config.model_params)  # any sklearn-compatible classifier could be swapped in here
    model.fit(X_by_split["train"], y_by_split["train"])
validation_proba = model.predict_proba(X_by_split["validation"])[:, 1]
best_threshold, validation_metrics = find_best_threshold(
y_by_split["validation"],
validation_proba,
config)
test_proba = model.predict_proba(X_by_split["test"])[:, 1]
test_metrics = compute_metrics(y_by_split["test"], test_proba, threshold=best_threshold)
# saving
model_dir.mkdir(parents=True, exist_ok=True)
model.save_model(model_dir / "model.json")
save_json({"threshold": best_threshold}, model_dir / "threshold.json") # saving best threshold
    save_json({  # save the feature spec: raw column suffixes plus derived feature names
"suffixes": suffixes,
"feature_names": feature_names,
},
model_dir / "feature_spec.json")
save_json({ # saving model performance metrics
"validation": validation_metrics,
"test": test_metrics,
},
model_dir / "metrics.json")
save_json(asdict(config), model_dir / "training_config.json") # saving model config
return {
"model_dir": str(model_dir),
"model_path": str(model_dir / "model.json"),
"threshold": best_threshold,
"suffixes": suffixes,
"feature_names": feature_names,
"metrics": {
"validation": validation_metrics,
"test": test_metrics,
}}
if __name__ == "__main__":
outputs = train_and_save_model()
print(f"Saved model bundle to: {outputs['model_dir']}")
print(json.dumps(outputs["metrics"], indent=2))
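    # a minimal sketch of a non-default run, e.g. training on global features only:
    # outputs = train_and_save_model(config=Config(include_local_pairwise=False))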