Spaces:
Sleeping
Sleeping
| """ | |
| Train the fraud-text classifier. | |
| Pipeline (senior-grade upgrade over the previous single-LogReg version): | |
| 1. Text normalisation (``app.services.preprocess.normalize_for_classifier``) | |
| replaces digits and URLs with sentinel tokens so the model learns the | |
| *pattern of manipulation*, not memorised amounts or specific phishing | |
| domains. | |
| 2. Two TF-IDF representations are concatenated: | |
| * word n-grams (1–3) — captures phrase-level signals | |
| ("сообщите код", "не разглашайте") | |
| * char-word-boundary n-grams (2–5) — robust to Russian/Kazakh | |
| morphology and to ASR-style spelling errors | |
| (e.g. "бұғатталады" → "бугаталады" still matches at char level). | |
| 3. Three diverse base classifiers are calibrated with isotonic regression | |
| on a held-out split and combined via soft voting: | |
| * Logistic Regression (linear, well-regularised) | |
| * Linear SVC (large-margin classifier, complementary errors) | |
| * Multinomial Naive Bayes (different inductive bias) | |
| Calibration is essential for the downstream fusion formula | |
| (0.6·ml + 0.4·rules) to behave like real probability arithmetic. | |
| 4. 5-fold stratified CV reports F1 / ROC-AUC so we know what to expect | |
| on production data, not just on the test split which is inflated by | |
| template similarity. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import os | |
| import sys | |
| from pathlib import Path | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.sparse import hstack | |
| from sklearn.calibration import CalibratedClassifierCV | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.metrics import classification_report, roc_auc_score | |
| from sklearn.model_selection import StratifiedKFold, train_test_split | |
| from sklearn.naive_bayes import MultinomialNB | |
| from sklearn.svm import LinearSVC | |
| ROOT = Path(__file__).resolve().parent.parent | |
| sys.path.insert(0, str(ROOT)) | |
| from app.services.preprocess import normalize_for_classifier # noqa: E402 | |
def build_vectorisers():
    """Create the two unfitted TF-IDF vectorisers used by the pipeline.

    Returns:
        A ``(word_vec, char_vec)`` tuple:

        * ``word_vec`` -- word-level 1-3-grams, keeping terms that occur in
          at least 2 documents and in at most 95% of them, capped at
          80,000 features.
        * ``char_vec`` -- ``char_wb`` 2-5-grams, keeping n-grams that occur
          in at least 2 documents, capped at 120,000 features.

        Both use sublinear TF scaling and leave the case of the input
        untouched.
    """
    word_vec = TfidfVectorizer(
        ngram_range=(1, 3),
        min_df=2,
        max_df=0.95,
        max_features=80_000,
        sublinear_tf=True,
        lowercase=False,  # already lowercased in preprocessing
    )
    char_vec = TfidfVectorizer(
        analyzer="char_wb",
        ngram_range=(2, 5),
        min_df=2,
        max_features=120_000,
        sublinear_tf=True,
        lowercase=False,
    )
    return word_vec, char_vec
def build_features(word_vec, char_vec, texts, fit=False):
    """Vectorise *texts* with both vectorisers and join the results.

    Args:
        word_vec: Vectoriser providing ``fit_transform`` / ``transform``;
            its columns come first in the output.
        char_vec: Second vectoriser with the same interface; its columns
            are appended after those of ``word_vec``.
        texts: Iterable of documents handed to both vectorisers.
        fit: When true, both vectorisers are fitted on *texts*
            (``fit_transform``); otherwise the already fitted state is
            reused (``transform``).

    Returns:
        A CSR sparse matrix holding the horizontally stacked features.
    """
    if fit:
        word = word_vec.fit_transform(texts)
        char = char_vec.fit_transform(texts)
    else:
        word = word_vec.transform(texts)
        char = char_vec.transform(texts)
    # hstack does not promise CSR output; convert so downstream row slicing
    # and estimators get the format they handle efficiently.
    return hstack([word, char]).tocsr()
def build_ensemble(random_state: int = 42):
    """Return the named factories for the three base classifiers.

    Factories (zero-argument callables) are returned instead of estimator
    instances so that every fold / training run gets a fresh, unfitted
    estimator.

    Args:
        random_state: Seed passed to the estimators that accept one
            (LogisticRegression and LinearSVC).

    Returns:
        A list of ``(name, factory)`` tuples in the order ``"logreg"``,
        ``"linsvc"``, ``"mnb"``. The ``"linsvc"`` factory already wraps
        LinearSVC in an isotonic ``CalibratedClassifierCV`` (LinearSVC has
        no ``predict_proba`` of its own); the other two return the bare
        estimator.
    """
    return [
        ("logreg",
         lambda: LogisticRegression(
             C=4.0, max_iter=4000, class_weight="balanced",
             solver="liblinear", random_state=random_state)),
        ("linsvc",
         lambda: CalibratedClassifierCV(
             estimator=LinearSVC(C=1.0, class_weight="balanced",
                                 random_state=random_state),
             method="isotonic", cv=3)),
        ("mnb",
         lambda: MultinomialNB(alpha=0.3)),
    ]
def calibrate(estimator, X, y, method: str = "isotonic"):
    """Fit *estimator* on ``(X, y)`` and return a calibrated classifier.

    Args:
        estimator: Unfitted estimator. If it is already a
            ``CalibratedClassifierCV`` it is fitted and returned as is;
            otherwise it is wrapped in a new ``CalibratedClassifierCV``
            with ``cv=3``.
        X: Training feature matrix.
        y: Training labels.
        method: Calibration method for the newly created wrapper. Ignored
            when *estimator* is already a ``CalibratedClassifierCV``,
            which keeps its own setting.

    Returns:
        The fitted ``CalibratedClassifierCV``.
    """
    if isinstance(estimator, CalibratedClassifierCV):
        # Avoid calibrating twice: the wrapper only needs fitting.
        estimator.fit(X, y)
        return estimator
    cal = CalibratedClassifierCV(estimator=estimator, method=method, cv=3)
    cal.fit(X, y)
    return cal
def cross_validate_ensemble(X_text: pd.Series, y: pd.Series, seed: int,
                            folds: int = 5) -> dict[str, float]:
    """Run stratified k-fold cross-validation of the soft-voting ensemble.

    For every fold the vectorisers are rebuilt and fitted on the training
    part only, each base classifier is calibrated on that part, and the
    positive-class probabilities of all members are averaged. Predictions
    use a 0.5 threshold on the averaged probability. Per-fold F1 and
    ROC-AUC are printed as they are computed.

    Args:
        X_text: Documents; indexed positionally with ``.iloc``.
        y: Labels aligned with ``X_text``; indexed positionally with
            ``.iloc``.
        seed: Seed for the fold shuffling and for the base classifiers.
        folds: Number of folds.

    Returns:
        A dict with the keys ``"f1_mean"``, ``"f1_std"``, ``"auc_mean"``
        and ``"auc_std"`` (mean and ``np.std`` over the folds).
    """
    # Imported once per call (not once per fold): the module-level
    # sklearn.metrics import does not include f1_score.
    from sklearn.metrics import f1_score

    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=seed)
    # The factories create a new estimator on every call, so a single list
    # can be shared by all folds.
    ensemble = build_ensemble(seed)
    f1s, aucs = [], []
    for fold, (tr_idx, va_idx) in enumerate(skf.split(X_text, y), start=1):
        X_tr_text = X_text.iloc[tr_idx]
        X_va_text = X_text.iloc[va_idx]
        y_tr = y.iloc[tr_idx]
        y_va = y.iloc[va_idx]

        # Fresh vectorisers per fold: the vocabulary must come from the
        # training part only, otherwise validation text leaks into it.
        word_vec, char_vec = build_vectorisers()
        X_tr = build_features(word_vec, char_vec, X_tr_text, fit=True)
        X_va = build_features(word_vec, char_vec, X_va_text, fit=False)

        # Soft voting: average the positive-class probability of all members.
        probas = np.zeros(len(va_idx), dtype=float)
        for _, factory in ensemble:
            clf = calibrate(factory(), X_tr, y_tr)
            probas += clf.predict_proba(X_va)[:, 1]
        probas /= len(ensemble)
        preds = (probas >= 0.5).astype(int)

        f1s.append(f1_score(y_va, preds))
        aucs.append(roc_auc_score(y_va, probas))
        print(f" fold {fold}: f1={f1s[-1]:.4f} auc={aucs[-1]:.4f}")
    return {"f1_mean": float(np.mean(f1s)), "f1_std": float(np.std(f1s)),
            "auc_mean": float(np.mean(aucs)), "auc_std": float(np.std(aucs))}
def main() -> int:
    """Train, evaluate and save the ensemble classifier.

    Command-line options:
        --dataset    CSV with ``text`` and ``label`` columns
                     (default: ``seed_dataset.csv`` next to this script).
        --out        Output path of the joblib bundle
                     (default: ``models/clf.pkl`` one directory above the
                     script directory).
        --test-size  Fraction of the data held out for the final report.
        --seed       Seed for the split, the CV folds and the classifiers.
        --no-cv      Skip the cross-validation report.

    Returns:
        ``0`` on success; ``1`` when the dataset file is missing, lacks a
        required column, or contains no usable rows after cleaning.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset", default=None)
    parser.add_argument("--out", default=None)
    parser.add_argument("--test-size", type=float, default=0.2)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--no-cv", action="store_true",
                        help="Skip 5-fold CV report (faster in CI)")
    args = parser.parse_args()

    here = Path(__file__).resolve().parent
    server_root = here.parent
    dataset_path = Path(args.dataset) if args.dataset else here / "seed_dataset.csv"
    out_path = Path(args.out) if args.out else server_root / "models" / "clf.pkl"

    if not dataset_path.exists():
        print(f"Dataset not found: {dataset_path}", file=sys.stderr)
        return 1

    df = pd.read_csv(dataset_path)
    # Report a malformed CSV the same way as a missing file instead of
    # failing with a KeyError traceback in dropna().
    missing = [col for col in ("text", "label") if col not in df.columns]
    if missing:
        print(f"Dataset {dataset_path} is missing required column(s): "
              f"{', '.join(missing)}", file=sys.stderr)
        return 1

    df = df.dropna(subset=["text", "label"]).reset_index(drop=True)
    df["text"] = df["text"].astype(str).map(normalize_for_classifier)
    # Drop rows whose text is empty after normalisation.
    df = df[df["text"].str.len() > 0].reset_index(drop=True)
    if df.empty:
        print(f"Dataset has no usable rows after cleaning: {dataset_path}",
              file=sys.stderr)
        return 1
    df["label"] = df["label"].astype(int)

    print(f"Loaded {len(df)} examples. Label distribution:")
    print(df["label"].value_counts().to_string())

    if not args.no_cv:
        print("\n5-fold stratified CV (ensemble):")
        cv = cross_validate_ensemble(df["text"], df["label"], seed=args.seed)
        print(f" ▸ F1 {cv['f1_mean']:.4f} ± {cv['f1_std']:.4f}")
        print(f" ▸ AUC {cv['auc_mean']:.4f} ± {cv['auc_std']:.4f}")

    X_train_text, X_test_text, y_train, y_test = train_test_split(
        df["text"], df["label"], test_size=args.test_size,
        stratify=df["label"], random_state=args.seed,
    )
    word_vec, char_vec = build_vectorisers()
    X_train = build_features(word_vec, char_vec, X_train_text, fit=True)
    X_test = build_features(word_vec, char_vec, X_test_text, fit=False)

    # Fit each calibrated base learner on the training split
    members = []
    for name, factory in build_ensemble(args.seed):
        print(f"\nFitting {name}…")
        clf = calibrate(factory(), X_train, y_train)
        members.append((name, clf))

    # Evaluate the soft-voting ensemble on the held-out test split
    probas = np.zeros(X_test.shape[0], dtype=float)
    for _, clf in members:
        probas += clf.predict_proba(X_test)[:, 1]
    probas /= len(members)
    y_pred = (probas >= 0.5).astype(int)

    print("\nHeld-out test split — ensemble report:")
    print(classification_report(y_test, y_pred, digits=3, zero_division=0))
    print(f"AUC: {roc_auc_score(y_test, probas):.4f}")

    # Persist the fitted vectorisers together with the fitted members so
    # that inference can rebuild exactly the same feature space.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    bundle = {
        "word": word_vec,
        "char": char_vec,
        "members": members,
        "version": 2,
    }
    joblib.dump(bundle, out_path)
    size_kb = os.path.getsize(out_path) / 1024
    print(f"\nSaved ensemble classifier to {out_path} ({size_kb:.1f} KB)")
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())