# Source: SignalMod/src/models/metadata_lr.py (6.59 kB)
# Last commit: 46cc63a — "feat: implement new models and improve UI, #23" (Mirae Kang)
"""
Logistic regression on TF-IDF(clean_text) + scaled metadata features.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.sparse import hstack
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.preprocessing import StandardScaler
from src.features.metadata_features import DEFAULT_METADATA_COLUMNS
from src.utils.logger import get_logger
logger = get_logger(__name__)
class MetadataLRModel:
    """Logistic regression over TF-IDF text features plus scaled metadata.

    The text column is vectorised with ``TfidfVectorizer``, the numeric
    metadata columns are standardised with ``StandardScaler``, and the two
    feature groups are stacked column-wise before being passed to
    ``LogisticRegression``.
    """

    def __init__(
        self,
        lr_cfg: dict,
        tfidf_cfg: dict,
        *,
        metadata_columns: list[str] | None = None,
        C: float | None = None,
    ):
        """Build the (unfitted) vectoriser, scaler and classifier.

        Args:
            lr_cfg: Classifier settings; the optional keys ``C``,
                ``max_iter``, ``class_weight`` and ``solver`` are read.
            tfidf_cfg: Vectoriser settings; the optional keys
                ``ngram_range``, ``max_features``, ``sublinear_tf`` and
                ``min_df`` are read.
            metadata_columns: Metadata columns to use. ``None`` or an empty
                list falls back to ``DEFAULT_METADATA_COLUMNS``.
            C: Inverse regularisation strength; takes precedence over
                ``lr_cfg["C"]`` when given.
        """
        # Copy so that later mutation of the caller's list cannot silently
        # change which columns this model selects.
        self.metadata_columns = list(metadata_columns or DEFAULT_METADATA_COLUMNS)

        ngram = tuple(tfidf_cfg.get("ngram_range", [1, 2]))
        self.tfidf = TfidfVectorizer(
            max_features=int(tfidf_cfg.get("max_features", 5000)),
            ngram_range=ngram,
            sublinear_tf=bool(tfidf_cfg.get("sublinear_tf", True)),
            min_df=int(tfidf_cfg.get("min_df", 3)),
            analyzer="word",
            strip_accents="unicode",
        )
        self.scaler = StandardScaler()
        self.clf = LogisticRegression(
            C=float(C if C is not None else lr_cfg.get("C", 0.05)),
            max_iter=int(lr_cfg.get("max_iter", 2000)),
            class_weight=lr_cfg.get("class_weight", "balanced"),
            solver=lr_cfg.get("solver", "lbfgs"),
            random_state=42,
        )
        self.is_fitted = False

    @property
    def C(self) -> float:
        """Inverse regularisation strength of the underlying classifier."""
        return float(self.clf.C)

    @staticmethod
    def _labels_from_proba(classes, proba) -> np.ndarray:
        """Map each row of ``proba`` to the label of its most probable class.

        ``classes`` gives the label of every probability column, in column
        order; the result has one label per row of ``proba``.
        """
        return np.asarray(classes)[np.asarray(proba).argmax(axis=1)]

    def _meta_matrix(self, meta: pd.DataFrame) -> np.ndarray:
        """Return the configured metadata columns of ``meta`` as a float array.

        Columns are emitted in ``self.metadata_columns`` order. Configured
        columns that are absent from ``meta`` are skipped (best effort), so
        the frames used for fitting and predicting must expose the same
        subset of columns.
        """
        present = [col for col in self.metadata_columns if col in meta.columns]
        return meta[present].astype(float).to_numpy()

    def _features(self, X_clean: pd.Series, meta: pd.DataFrame, *, fit: bool):
        """Build the stacked ``[TF-IDF | scaled metadata]`` feature matrix.

        Args:
            X_clean: Text per sample; values are cast to ``str``.
            meta: Metadata frame, row-aligned with ``X_clean``.
            fit: When true the vectoriser and scaler are (re)fitted on this
                data; otherwise the already-fitted transforms are applied.

        Returns:
            A SciPy sparse matrix in CSR format.
        """
        texts = X_clean.astype(str)
        meta_values = self._meta_matrix(meta)
        if fit:
            X_t = self.tfidf.fit_transform(texts)
            X_m = self.scaler.fit_transform(meta_values)
        else:
            X_t = self.tfidf.transform(texts)
            X_m = self.scaler.transform(meta_values)
        # hstack yields COO by default; ask for CSR, the row-oriented format
        # estimators consume, instead of leaving the conversion implicit.
        return hstack([X_t, X_m], format="csr")

    def fit(
        self,
        X_clean: pd.Series,
        meta: pd.DataFrame,
        y,
    ) -> "MetadataLRModel":
        """Fit the vectoriser, scaler and classifier; return ``self``."""
        X = self._features(X_clean, meta, fit=True)
        self.clf.fit(X, y)
        self.is_fitted = True
        logger.info(
            f"Metadata LR trained — C={self.C} | "
            f"tfidf_dim={len(self.tfidf.vocabulary_)} | meta_dim={len(self.metadata_columns)}"
        )
        return self

    def predict_proba(self, X_clean: pd.Series, meta: pd.DataFrame) -> np.ndarray:
        """Return the classifier's per-class probabilities for each sample."""
        X = self._features(X_clean, meta, fit=False)
        return self.clf.predict_proba(X)

    def predict(self, X_clean: pd.Series, meta: pd.DataFrame) -> np.ndarray:
        """Return the most probable class label for each sample."""
        proba = self.predict_proba(X_clean, meta)
        # argmax yields a column index; translate it through classes_ so the
        # result is a real label even when labels are not exactly 0..K-1.
        return self._labels_from_proba(self.clf.classes_, proba)

    def train_test_gap(
        self,
        X_train_clean,
        meta_train,
        y_train,
        X_test_clean,
        meta_test,
        y_test,
    ) -> tuple[float, float, float]:
        """Compute weighted F1 on two splits and their absolute difference.

        Labels are cast to ``int`` before scoring.

        Returns:
            ``(f1_train, f1_test, abs(f1_train - f1_test))``.
        """
        preds_train = self.predict(X_train_clean, meta_train)
        preds_test = self.predict(X_test_clean, meta_test)
        y_tr = np.asarray(y_train).astype(int)
        y_te = np.asarray(y_test).astype(int)
        f1_train = float(f1_score(y_tr, preds_train, average="weighted", zero_division=0))
        f1_test = float(f1_score(y_te, preds_test, average="weighted", zero_division=0))
        return f1_train, f1_test, abs(f1_train - f1_test)

    def save(self, path: str | Path) -> None:
        """Persist the fitted components to ``path``, creating parent dirs."""
        import joblib

        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(
            {
                "tfidf": self.tfidf,
                "scaler": self.scaler,
                "clf": self.clf,
                "metadata_columns": self.metadata_columns,
            },
            path,
        )
        logger.info(f"Metadata LR saved: {path}")

    @classmethod
    def load(cls, path: str | Path) -> "MetadataLRModel":
        """Restore a model previously written by :meth:`save`.

        ``__init__`` is bypassed; the stored components are attached to a
        bare instance which is marked as fitted.
        """
        import joblib

        # NOTE(security): joblib.load unpickles the file and can execute
        # arbitrary code — only load artefacts from trusted locations.
        blob = joblib.load(path)
        inst = cls.__new__(cls)
        inst.tfidf = blob["tfidf"]
        inst.scaler = blob["scaler"]
        inst.clf = blob["clf"]
        inst.metadata_columns = blob["metadata_columns"]
        inst.is_fitted = True
        return inst
def fit_metadata_lr_with_gap_control(
    X_train_clean,
    meta_train,
    y_train,
    X_test_clean,
    meta_test,
    y_test,
    lr_cfg: dict,
    tfidf_cfg: dict,
    *,
    max_gap: float = 0.05,
    X_train_gap_clean=None,
    meta_train_gap=None,
    y_train_gap=None,
) -> tuple[MetadataLRModel, dict]:
    """Fit candidate models until one has an acceptable train/test F1 gap.

    Candidates come from ``lr_cfg["gap_search"]["param_grid"]``. Each entry
    may carry ``C`` plus TF-IDF overrides that are layered on top of
    ``tfidf_cfg``. Candidates are tried in order and the search stops at the
    first whose gap is below ``max_gap``; if none qualifies, the candidate
    with the smallest gap is returned.

    When the search is disabled, or no ``param_grid`` is configured, a single
    model is fitted with ``lr_cfg["C"]`` (default ``0.05``) and ``tfidf_cfg``.

    Args:
        X_train_clean, meta_train, y_train: Data every candidate is fitted on.
        X_test_clean, meta_test, y_test: Data the test-side F1 is measured on.
        lr_cfg: Classifier config; may contain a ``gap_search`` mapping with
            the keys ``enabled`` (default ``True``) and ``param_grid``.
        tfidf_cfg: Base TF-IDF config.
        max_gap: Largest acceptable ``abs(f1_train - f1_test)`` (exclusive).
        X_train_gap_clean, meta_train_gap, y_train_gap: Optional data on
            which the train-side F1 is measured; each defaults to the
            corresponding training argument.

    Returns:
        ``(model, info)`` where ``info`` records the selected parameters, the
        rounded scores and whether the gap criterion was met (``gap_ok``).

    Raises:
        RuntimeError: If no candidate could be selected.
    """
    # NOTE(review): the split passed as "test" steers both early stopping and
    # model selection here, so scores on it are optimistic — confirm the
    # caller passes a validation split rather than the final hold-out.
    gap_cfg = lr_cfg.get("gap_search") or {}  # tolerate `gap_search: null`

    X_gap = X_train_gap_clean if X_train_gap_clean is not None else X_train_clean
    meta_gap = meta_train_gap if meta_train_gap is not None else meta_train
    y_gap = y_train_gap if y_train_gap is not None else y_train

    if gap_cfg.get("enabled", True):
        grid = list(gap_cfg.get("param_grid") or [])
        if not grid:
            logger.info(
                "Metadata LR gap search enabled but param_grid is empty; "
                "fitting the default configuration only"
            )
    else:
        grid = []
    if not grid:
        # Single default candidate: avoids iterating over None and
        # guarantees that a fitted model is always returned.
        grid = [{"C": float(lr_cfg.get("C", 0.05)), **tfidf_cfg}]

    best: MetadataLRModel | None = None
    best_meta: dict = {}
    best_gap = float("inf")
    for params in grid:
        # Every key except "C" is treated as a TF-IDF override.
        merged = {**tfidf_cfg, **{k: v for k, v in params.items() if k != "C"}}
        c = float(params.get("C", lr_cfg.get("C", 0.05)))
        model = MetadataLRModel(lr_cfg, merged, C=c)
        model.fit(X_train_clean, meta_train, y_train)
        f1_train, f1_test, gap = model.train_test_gap(
            X_gap, meta_gap, y_gap, X_test_clean, meta_test, y_test
        )
        logger.info(
            f"Metadata LR gap — C={c} max_features={merged.get('max_features')} "
            f"train_f1={f1_train:.4f} test_f1={f1_test:.4f} gap={gap:.4f}"
        )
        meta = {
            "C": c,
            "max_features": int(merged.get("max_features", 5000)),
            "min_df": int(merged.get("min_df", 3)),
            "f1_train": round(f1_train, 4),
            "f1_test": round(f1_test, 4),
            "train_test_gap": round(gap, 4),
            "train_test_gap_pp": round(gap * 100, 2),
            "gap_ok": gap < max_gap,
        }
        if gap < best_gap:
            best, best_meta = model, meta
            best_gap = gap
        if gap < max_gap:
            # First acceptable candidate wins; later grid entries are skipped.
            break

    if best is None:
        # Only reachable if every gap was non-comparable (e.g. NaN).
        raise RuntimeError("Metadata LR gap search produced no usable model")
    return best, best_meta