File size: 6,593 Bytes
46cc63a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | """
Logistic regression on TF-IDF(clean_text) + scaled metadata features.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.sparse import hstack
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.preprocessing import StandardScaler
from src.features.metadata_features import DEFAULT_METADATA_COLUMNS
from src.utils.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's import name.
logger = get_logger(__name__)
class MetadataLRModel:
    """TF-IDF on clean text + numeric metadata → logistic regression.

    The text is vectorised with a word-level ``TfidfVectorizer``, the numeric
    metadata columns are standardised with a ``StandardScaler``, and both
    feature blocks are stacked side by side and fed to a
    ``LogisticRegression`` classifier.
    """

    def __init__(
        self,
        lr_cfg: dict,
        tfidf_cfg: dict,
        *,
        metadata_columns: list[str] | None = None,
        C: float | None = None,
    ):
        """Build the (unfitted) vectoriser, scaler and classifier.

        Args:
            lr_cfg: Logistic-regression settings. Keys read: ``C`` (default
                0.05), ``max_iter`` (2000), ``class_weight`` ("balanced"),
                ``solver`` ("lbfgs").
            tfidf_cfg: TF-IDF settings. Keys read: ``ngram_range`` ([1, 2]),
                ``max_features`` (5000), ``sublinear_tf`` (True),
                ``min_df`` (3).
            metadata_columns: Candidate metadata column names; falls back to
                ``DEFAULT_METADATA_COLUMNS`` when ``None`` or empty.
            C: Explicit inverse regularisation strength; overrides
                ``lr_cfg["C"]`` when given.
        """
        # Copy so later mutation of the caller's list cannot change the model.
        self.metadata_columns = (
            list(metadata_columns) if metadata_columns else list(DEFAULT_METADATA_COLUMNS)
        )
        # Columns actually present in the training frame; set by fit().
        self._fitted_meta_columns: list[str] | None = None

        ngram = tuple(tfidf_cfg.get("ngram_range", [1, 2]))
        self.tfidf = TfidfVectorizer(
            max_features=int(tfidf_cfg.get("max_features", 5000)),
            ngram_range=ngram,
            sublinear_tf=bool(tfidf_cfg.get("sublinear_tf", True)),
            min_df=int(tfidf_cfg.get("min_df", 3)),
            analyzer="word",
            strip_accents="unicode",
        )
        self.scaler = StandardScaler()
        self.clf = LogisticRegression(
            C=float(C if C is not None else lr_cfg.get("C", 0.05)),
            max_iter=int(lr_cfg.get("max_iter", 2000)),
            class_weight=lr_cfg.get("class_weight", "balanced"),
            solver=lr_cfg.get("solver", "lbfgs"),
            random_state=42,
        )
        self.is_fitted = False

    @property
    def C(self) -> float:
        """Inverse regularisation strength of the underlying classifier."""
        return float(self.clf.C)

    def _meta_matrix(self, meta: pd.DataFrame, *, fit: bool = False) -> np.ndarray:
        """Return the metadata columns of *meta* as a dense float array.

        Args:
            meta: Frame holding (a subset of) ``self.metadata_columns``.
            fit: When true, select the configured columns that are present in
                *meta* and remember that selection for later calls.

        Returns:
            A 2-D float array with one column per selected metadata column.

        Raises:
            ValueError: At inference, if a column that was used during
                fitting is absent from *meta*.
        """
        if fit:
            cols = [c for c in self.metadata_columns if c in meta.columns]
            self._fitted_meta_columns = cols
        else:
            # getattr: instances restored by other means may lack the attribute.
            cols = getattr(self, "_fitted_meta_columns", None)
            if cols is None:
                # No recorded selection (e.g. artefact saved by an older
                # version): keep the historical "use whatever is present".
                cols = [c for c in self.metadata_columns if c in meta.columns]
            else:
                # Enforce the exact training columns and order; otherwise a
                # different subset of equal width would be silently misaligned
                # with the fitted scaler/classifier.
                missing = [c for c in cols if c not in meta.columns]
                if missing:
                    raise ValueError(
                        f"Metadata columns used during fit are missing: {missing}"
                    )
        return meta[cols].astype(float).values

    def _features(self, X_clean: pd.Series, meta: pd.DataFrame, *, fit: bool):
        """Build the stacked [TF-IDF | scaled metadata] feature matrix.

        Args:
            X_clean: Text per sample; values are cast to ``str``.
            meta: Metadata frame, row-aligned with *X_clean*.
            fit: Fit the vectoriser and scaler on this data before
                transforming (training) or only transform (inference).

        Returns:
            A SciPy sparse matrix in CSR format.
        """
        text = X_clean.astype(str)
        if fit:
            X_t = self.tfidf.fit_transform(text)
            X_m = self.scaler.fit_transform(self._meta_matrix(meta, fit=True))
        else:
            X_t = self.tfidf.transform(text)
            X_m = self.scaler.transform(self._meta_matrix(meta))
        # Request CSR directly; hstack's default output format is COO.
        return hstack([X_t, X_m], format="csr")

    def fit(
        self,
        X_clean: pd.Series,
        meta: pd.DataFrame,
        y,
    ) -> "MetadataLRModel":
        """Fit vectoriser, scaler and classifier; return ``self``."""
        X = self._features(X_clean, meta, fit=True)
        self.clf.fit(X, y)
        self.is_fitted = True
        # Report the number of metadata columns actually used, which can be
        # smaller than the configured list when columns are absent.
        meta_dim = len(self._fitted_meta_columns or [])
        logger.info(
            f"Metadata LR trained — C={self.C} | "
            f"tfidf_dim={len(self.tfidf.vocabulary_)} | meta_dim={meta_dim}"
        )
        return self

    def predict_proba(self, X_clean: pd.Series, meta: pd.DataFrame) -> np.ndarray:
        """Return class probabilities, one column per ``clf.classes_`` entry."""
        X = self._features(X_clean, meta, fit=False)
        return self.clf.predict_proba(X)

    def predict(self, X_clean: pd.Series, meta: pd.DataFrame) -> np.ndarray:
        """Return the most probable class label for each sample."""
        proba = self.predict_proba(X_clean, meta)
        # argmax yields column positions; translate them to real labels so
        # label sets other than 0..K-1 are predicted correctly.
        return self.clf.classes_[proba.argmax(axis=1)]

    def train_test_gap(
        self,
        X_train_clean,
        meta_train,
        y_train,
        X_test_clean,
        meta_test,
        y_test,
    ) -> tuple[float, float, float]:
        """Compute weighted F1 on train and test data and their absolute gap.

        Labels are cast to ``int`` before scoring.

        Returns:
            ``(f1_train, f1_test, abs(f1_train - f1_test))``.
        """
        preds_train = self.predict(X_train_clean, meta_train)
        preds_test = self.predict(X_test_clean, meta_test)
        y_tr = np.asarray(y_train).astype(int)
        y_te = np.asarray(y_test).astype(int)
        f1_train = float(f1_score(y_tr, preds_train, average="weighted", zero_division=0))
        f1_test = float(f1_score(y_te, preds_test, average="weighted", zero_division=0))
        return f1_train, f1_test, abs(f1_train - f1_test)

    def save(self, path: str | Path) -> None:
        """Serialise the fitted components to *path* with joblib.

        Parent directories are created when missing.
        """
        import joblib

        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(
            {
                "tfidf": self.tfidf,
                "scaler": self.scaler,
                "clf": self.clf,
                "metadata_columns": self.metadata_columns,
                "fitted_meta_columns": getattr(self, "_fitted_meta_columns", None),
            },
            path,
        )
        logger.info(f"Metadata LR saved: {path}")

    @classmethod
    def load(cls, path: str | Path) -> "MetadataLRModel":
        """Restore a model previously written by :meth:`save`.

        NOTE(security): joblib.load unpickles arbitrary objects; only load
        artefacts from trusted sources.
        """
        import joblib

        blob = joblib.load(path)
        # Bypass __init__: the fitted components come straight from the blob.
        inst = cls.__new__(cls)
        inst.tfidf = blob["tfidf"]
        inst.scaler = blob["scaler"]
        inst.clf = blob["clf"]
        inst.metadata_columns = blob["metadata_columns"]
        # Key is absent in artefacts written before it was introduced.
        inst._fitted_meta_columns = blob.get("fitted_meta_columns")
        inst.is_fitted = True
        return inst
def fit_metadata_lr_with_gap_control(
    X_train_clean,
    meta_train,
    y_train,
    X_test_clean,
    meta_test,
    y_test,
    lr_cfg: dict,
    tfidf_cfg: dict,
    *,
    max_gap: float = 0.05,
    X_train_gap_clean=None,
    meta_train_gap=None,
    y_train_gap=None,
) -> tuple[MetadataLRModel, dict]:
    """Train metadata-LR candidates and keep the one with the smallest F1 gap.

    Each grid entry is fitted on the training data and scored with
    ``MetadataLRModel.train_test_gap``. The search stops at the first
    candidate whose train/test weighted-F1 gap is below *max_gap*; if none
    qualifies, the candidate with the smallest gap is returned.

    Args:
        X_train_clean, meta_train, y_train: Data every candidate is fitted on.
        X_test_clean, meta_test, y_test: Data used for the "test" F1.
        lr_cfg: Logistic-regression settings. ``lr_cfg["gap_search"]`` may
            hold ``enabled`` (default True) and ``param_grid``, a list of
            dicts whose ``C`` key sets the regularisation strength and whose
            other keys override *tfidf_cfg*.
        tfidf_cfg: Base TF-IDF settings.
        max_gap: Gap threshold below which a candidate is accepted at once.
        X_train_gap_clean, meta_train_gap, y_train_gap: Optional data used
            for the "train" F1 instead of the fitting data; each falls back
            to its training counterpart when ``None``.

    Returns:
        ``(model, summary)`` where *summary* holds the chosen ``C``,
        ``max_features``, ``min_df``, rounded F1 scores, the gap (absolute
        and in percentage points) and ``gap_ok``.

    Raises:
        ValueError: If the grid yields no candidate at all.
    """
    # `or {}` also covers an explicit `gap_search: null` in the config.
    gap_cfg = lr_cfg.get("gap_search") or {}
    X_gap = X_train_gap_clean if X_train_gap_clean is not None else X_train_clean
    meta_gap = meta_train_gap if meta_train_gap is not None else meta_train
    y_gap = y_train_gap if y_train_gap is not None else y_train

    default_c = float(lr_cfg.get("C", 0.05))
    grid = gap_cfg.get("param_grid") if gap_cfg.get("enabled", True) else None
    if not grid:
        # Search disabled, or enabled without a usable param_grid: train the
        # single baseline configuration instead of iterating over None.
        grid = [{"C": default_c, **tfidf_cfg}]

    best: MetadataLRModel | None = None
    best_meta: dict = {}
    best_gap = float("inf")
    for params in grid:
        # Every key except "C" is treated as a TF-IDF override.
        overrides = {k: v for k, v in params.items() if k != "C"}
        merged = {**tfidf_cfg, **overrides}
        c = float(params.get("C", default_c))

        model = MetadataLRModel(lr_cfg, merged, C=c)
        model.fit(X_train_clean, meta_train, y_train)
        f1_train, f1_test, gap = model.train_test_gap(
            X_gap, meta_gap, y_gap, X_test_clean, meta_test, y_test
        )
        logger.info(
            f"Metadata LR gap — C={c} max_features={merged.get('max_features')} "
            f"train_f1={f1_train:.4f} test_f1={f1_test:.4f} gap={gap:.4f}"
        )
        summary = {
            "C": c,
            "max_features": int(merged.get("max_features", 5000)),
            "min_df": int(merged.get("min_df", 3)),
            "f1_train": round(f1_train, 4),
            "f1_test": round(f1_test, 4),
            "train_test_gap": round(gap, 4),
            "train_test_gap_pp": round(gap * 100, 2),
            "gap_ok": gap < max_gap,
        }
        if gap < best_gap:
            best, best_meta = model, summary
            best_gap = gap
        if gap < max_gap:
            # First acceptable candidate wins; later grid entries are skipped.
            break

    if best is None:
        raise ValueError("Metadata LR gap search produced no candidate model")
    return best, best_meta
|