| """ |
| NeuralKarma — ML Model Training Pipeline |
| Trains real classifiers on real data from HuggingFace: |
| 1. Prosociality classifier (ProsocialDialog safety labels) |
| 2. Five ETHICS-axis classifiers (commonsense, deontology, justice, virtue, utilitarianism) |
| |
| All models use TF-IDF vectorization + Logistic Regression with cross-validation. |
| Models are saved as .joblib files for production inference. |
| """ |
|
|
| import os |
| import sys |
| import time |
| import warnings |
| from pathlib import Path |
|
|
| import joblib |
| import numpy as np |
| import pandas as pd |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.model_selection import cross_val_score, StratifiedKFold |
| from sklearn.pipeline import Pipeline |
| from sklearn.metrics import classification_report |
| from sklearn.utils import resample |
|
|
# Silence every UserWarning process-wide for the lifetime of this process.
warnings.filterwarnings(action="ignore", category=UserWarning)

# Directory two levels above this file. It is put at the front of sys.path
# so project-level packages (e.g. `data.download_datasets`) resolve when this
# file is executed as a script.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path[0:0] = [str(PROJECT_ROOT)]

# Trained .joblib artifacts go into a `models/` folder beside this file.
MODEL_DIR = Path(__file__).parent / "models"
DATA_DIR = PROJECT_ROOT.joinpath("data", "cache")
|
|
|
|
def ensure_model_dir():
    """Create MODEL_DIR, including any missing parent folders; no-op if it exists."""
    os.makedirs(MODEL_DIR, exist_ok=True)
|
|
|
|
def preprocess_utilitarianism(df):
    """
    Flatten the pairwise utilitarianism format into a binary text/label frame.

    Every 'baseline' entry becomes a row with label 1 (treated as the
    preferred option) and every 'less_pleasant' entry a row with label 0.
    The two groups are stacked (label-1 rows first), shuffled with a fixed
    seed (42), and returned with a fresh 0..n-1 index.

    Args:
        df: DataFrame with 'baseline' and 'less_pleasant' columns.

    Returns:
        DataFrame with columns 'text' (str) and 'label' (int).
    """
    frames = [
        pd.DataFrame({"text": df[column].astype(str), "label": value})
        for column, value in (("baseline", 1), ("less_pleasant", 0))
    ]
    stacked = pd.concat(frames, ignore_index=True)
    shuffled = stacked.sample(frac=1, random_state=42)
    return shuffled.reset_index(drop=True)
|
|
|
|
def get_text_column(df, subset_name):
    """
    Locate (building it if needed) the text column of an ETHICS subset DataFrame.

    Different ETHICS subsets use different column layouts, so candidates are
    tried in priority order:

    1. ``input``
    2. ``scenario`` -- joined with ``excuse`` by one space (then stripped)
       when an ``excuse`` column is present
    3. ``text``
    4. ``baseline``
    5. the first ``object``-dtype column

    Missing values are rendered as empty strings instead of the literal
    ``"nan"`` / ``"None"`` that ``astype(str)`` would produce, so they do not
    inject a spurious token into the features and are removed by callers
    that drop empty texts.

    Args:
        df: the subset DataFrame.
        subset_name: subset name, used only in the error message.

    Returns:
        pandas Series of str aligned with ``df``'s index.

    Raises:
        ValueError: if no usable text column is found.
    """
    def as_text(column):
        values = df[column]
        # astype(str) alone would turn missing cells into "nan"/"None" text.
        return values.astype(str).where(values.notna(), "")

    if "input" in df.columns:
        return as_text("input")
    if "scenario" in df.columns:
        if "excuse" in df.columns:
            return (as_text("scenario") + " " + as_text("excuse")).str.strip()
        return as_text("scenario")
    if "text" in df.columns:
        return as_text("text")
    if "baseline" in df.columns:
        # "text" is handled by the branch above, so only "baseline" applies here.
        return as_text("baseline")

    # Last resort: the first string-like (object dtype) column.
    str_cols = df.select_dtypes(include=["object"]).columns
    if len(str_cols) > 0:
        return as_text(str_cols[0])
    raise ValueError(f"Cannot find text column in {subset_name}: {df.columns.tolist()}")
|
|
|
|
def get_label_column(df, subset_name):
    """
    Return a subset's label column cast to int.

    Lookup order: an exact 'label' column, then 'is_short', then the first
    column whose name contains 'label' (case-insensitive).

    Args:
        df: the subset DataFrame.
        subset_name: subset name, used only in the error message.

    Raises:
        ValueError: if no label-like column exists.
    """
    for preferred in ("label", "is_short"):
        if preferred in df.columns:
            return df[preferred].astype(int)

    fallback = next((col for col in df.columns if "label" in col.lower()), None)
    if fallback is None:
        raise ValueError(f"Cannot find label column in {subset_name}: {df.columns.tolist()}")
    return df[fallback].astype(int)
|
|
|
|
def balance_dataset(texts, labels, max_per_class=5000, random_state=42):
    """
    Cap large classes and upsample small ones to reduce class imbalance.

    Each class (in order of first appearance) is handled as follows:

    * more than ``max_per_class`` rows -> downsampled without replacement to
      exactly ``max_per_class`` rows;
    * fewer than ``max_per_class // 2`` rows but more than 10 -> upsampled
      with replacement to ``min(max_per_class, 3 * size)`` rows;
    * otherwise (including classes of 10 rows or fewer) -> kept unchanged.

    The result is therefore only approximately balanced. Upsampled rows are
    exact duplicates, so cross-validating on the output can put copies of the
    same row in both training and validation folds.

    Args:
        texts: sequence of texts.
        labels: sequence of labels, same length as ``texts``.
        max_per_class: per-class cap; also sets the upsampling threshold.
        random_state: seed for every sampling step and the final shuffle.

    Returns:
        (texts, labels) as numpy arrays, shuffled. Empty input yields empty
        arrays.
    """
    df = pd.DataFrame({"text": texts, "label": labels})
    if df.empty:
        # pd.concat([]) would raise "No objects to concatenate".
        return df["text"].values, df["label"].values

    balanced_parts = []
    for c in df["label"].unique():
        class_df = df[df["label"] == c]
        if len(class_df) > max_per_class:
            class_df = class_df.sample(n=max_per_class, random_state=random_state)
        elif len(class_df) < max_per_class // 2 and len(class_df) > 10:
            class_df = resample(
                class_df,
                n_samples=min(max_per_class, len(class_df) * 3),
                random_state=random_state,
                replace=True,
            )
        balanced_parts.append(class_df)

    result = pd.concat(balanced_parts, ignore_index=True).sample(frac=1, random_state=random_state)
    return result["text"].values, result["label"].values
|
|
|
|
def train_prosociality_model(prosocial_df):
    """
    Train (or load from cache) the prosociality classifier on ProsocialDialog.

    Safety labels are mapped to a binary target:
    ``__ok__`` / ``__casual__`` -> 1 (prosocial);
    ``__probably_needs_caution__`` / ``__needs_caution__`` /
    ``__needs_intervention__`` -> 0. Rows with any other safety label or a
    missing ``context`` are dropped. Texts are taken from ``context``.

    If both artifacts already exist in MODEL_DIR they are loaded and returned
    without retraining.

    Cross-validation runs on a TF-IDF + LogisticRegression Pipeline, so the
    vocabulary and IDF weights are learned from the training folds only and
    the reported accuracy is not inflated by validation-fold text. The saved
    vectorizer and model are then fitted on the full balanced data set.

    Args:
        prosocial_df: DataFrame with ``safety_label`` and ``context`` columns.

    Returns:
        (model, vectorizer): fitted LogisticRegression and TfidfVectorizer.

    Raises:
        ValueError: if fewer than two classes remain after label mapping.
    """
    model_path = MODEL_DIR / "prosociality_model.joblib"
    vectorizer_path = MODEL_DIR / "prosociality_vectorizer.joblib"

    if model_path.exists() and vectorizer_path.exists():
        print(" [OK] Prosociality model already trained, loading from cache...")
        return joblib.load(model_path), joblib.load(vectorizer_path)

    print(" ⚙ Training Prosociality Classifier...")
    start = time.time()

    # ProsocialDialog safety label -> binary target (1 = prosocial).
    label_map = {
        "__ok__": 1,
        "__casual__": 1,
        "__probably_needs_caution__": 0,
        "__needs_caution__": 0,
        "__needs_intervention__": 0,
    }

    df = prosocial_df.copy()
    df["binary_label"] = df["safety_label"].map(label_map)
    # Unmapped safety labels become NaN and are dropped together with missing contexts.
    df = df.dropna(subset=["binary_label", "context"])
    df["binary_label"] = df["binary_label"].astype(int)

    texts = df["context"].astype(str).values
    labels = df["binary_label"].values

    present = np.unique(labels)
    if present.size < 2:
        raise ValueError(
            "Prosociality training needs both prosocial and non-prosocial "
            f"examples; found labels {present.tolist()}"
        )

    texts, labels = balance_dataset(texts, labels, max_per_class=8000)
    print(f" Training on {len(texts):,} samples (balanced)...")

    vectorizer = TfidfVectorizer(
        max_features=15000,
        ngram_range=(1, 2),
        min_df=2,
        max_df=0.95,
        strip_accents="unicode",
        sublinear_tf=True,
    )
    model = LogisticRegression(
        C=1.0,
        max_iter=1000,
        solver="lbfgs",
        class_weight="balanced",
        n_jobs=-1,
    )

    # cross_val_score clones the pipeline for every fold, so `vectorizer` and
    # `model` stay unfitted here and TF-IDF never sees the validation fold.
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_pipeline = Pipeline([("tfidf", vectorizer), ("clf", model)])
    scores = cross_val_score(cv_pipeline, texts, labels, cv=cv, scoring="accuracy", n_jobs=-1)
    print(f" CV Accuracy: {scores.mean():.4f} (±{scores.std():.4f})")

    # Final artifacts: fit on all balanced data.
    X = vectorizer.fit_transform(texts)
    model.fit(X, labels)

    joblib.dump(model, model_path)
    joblib.dump(vectorizer, vectorizer_path)

    elapsed = time.time() - start
    print(f" [OK] Prosociality model trained in {elapsed:.1f}s")

    return model, vectorizer
|
|
|
|
def train_ethics_model(subset_name, subset_df):
    """
    Train (or load from cache) a binary classifier for one ETHICS subset.

    Steps:

    1. Resolve the text and label columns, skipping the subset if either is
       missing.
    2. Drop rows with blank text.
    3. Skip subsets with fewer than 10 rows or only one class.
    4. Balance with ``balance_dataset(max_per_class=5000)``.
    5. Cross-validate a TF-IDF + LogisticRegression Pipeline. The vectorizer
       is fitted per training fold, so validation text does not leak into
       the features.
    6. Fit the final vectorizer and model on all balanced data and save both
       to MODEL_DIR.

    Args:
        subset_name: ETHICS subset name; used in artifact filenames and logs.
        subset_df: the subset's DataFrame.

    Returns:
        (model, vectorizer), or (None, None) when the subset is skipped.
    """
    model_path = MODEL_DIR / f"ethics_{subset_name}_model.joblib"
    vectorizer_path = MODEL_DIR / f"ethics_{subset_name}_vectorizer.joblib"

    if model_path.exists() and vectorizer_path.exists():
        print(f" [OK] ETHICS/{subset_name} model already trained, loading from cache...")
        return joblib.load(model_path), joblib.load(vectorizer_path)

    print(f" ⚙ Training ETHICS/{subset_name} Classifier...")
    start = time.time()

    try:
        texts = get_text_column(subset_df, subset_name)
        labels = get_label_column(subset_df, subset_name)
    except ValueError as e:
        print(f" [WARNING] Skipping {subset_name}: {e}")
        return None, None

    # Drop rows whose text is empty or whitespace-only.
    mask = texts.str.strip().str.len() > 0
    texts = texts[mask].values
    labels = labels[mask].values

    if len(texts) < 10:
        print(f" [WARNING] Too few samples ({len(texts)}), skipping {subset_name}")
        return None, None

    # LogisticRegression cannot be fitted on a single class; skip this subset
    # instead of letting model.fit abort the whole pipeline.
    present = np.unique(labels)
    if present.size < 2:
        print(f" [WARNING] Only one class ({present.tolist()}) present, skipping {subset_name}")
        return None, None

    texts, labels = balance_dataset(texts, labels, max_per_class=5000)
    print(f" Training on {len(texts):,} samples...")

    vectorizer = TfidfVectorizer(
        max_features=10000,
        ngram_range=(1, 2),
        min_df=2 if len(texts) > 100 else 1,
        max_df=0.95,
        strip_accents="unicode",
        sublinear_tf=True,
    )
    model = LogisticRegression(
        C=1.0,
        max_iter=1000,
        solver="lbfgs",
        class_weight="balanced",
        n_jobs=-1,
    )

    # Folds: at most 5, no more than the smallest class, at least 2.
    # np.unique counts only labels that occur. np.bincount would also report
    # absent values as 0 and rejects negative labels.
    _, class_counts = np.unique(labels, return_counts=True)
    n_splits = max(2, min(5, int(class_counts.min())))
    try:
        cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
        # cross_val_score clones the pipeline per fold; `vectorizer`/`model` stay unfitted.
        cv_pipeline = Pipeline([("tfidf", vectorizer), ("clf", model)])
        scores = cross_val_score(cv_pipeline, texts, labels, cv=cv, scoring="accuracy", n_jobs=-1)
        print(f" CV Accuracy: {scores.mean():.4f} (±{scores.std():.4f})")
    except Exception as exc:
        # CV is only a diagnostic: report the real reason and still train.
        print(f" (CV skipped: {type(exc).__name__}: {exc})")

    X = vectorizer.fit_transform(texts)
    model.fit(X, labels)

    joblib.dump(model, model_path)
    joblib.dump(vectorizer, vectorizer_path)

    elapsed = time.time() - start
    print(f" [OK] ETHICS/{subset_name} model trained in {elapsed:.1f}s")

    return model, vectorizer
|
|
|
|
def train_all_models():
    """
    Run the complete NeuralKarma training pipeline.

    Ensures MODEL_DIR exists, obtains the datasets from
    ``data.download_datasets.download_all``, trains (or loads cached) the
    prosociality model and one model per ETHICS axis, and prints a summary.
    ETHICS axes with no data are reported and left out of the result.

    Returns:
        dict mapping model name -> {"model": ..., "vectorizer": ...}; either
        value may be None if training for that model was skipped.
    """
    ensure_model_dir()

    banner = "=" * 60
    print(banner)
    print("NeuralKarma — ML Model Training Pipeline")
    print(banner)

    # Deferred import of the project's dataset loader.
    from data.download_datasets import download_all
    prosocial_df, ethics_data, _norms_df = download_all()

    models = {}

    print("\n[1/6] Prosociality Model")
    prosocial_model, prosocial_vec = train_prosociality_model(prosocial_df)
    models["prosociality"] = {"model": prosocial_model, "vectorizer": prosocial_vec}

    axis_names = ("commonsense", "deontology", "justice", "virtue", "utilitarianism")
    for step, subset_name in enumerate(axis_names, start=2):
        print(f"\n[{step}/6] ETHICS/{subset_name} Model")
        if subset_name not in ethics_data:
            print(f" [WARNING] No data for {subset_name}")
            continue

        subset_df = ethics_data[subset_name]
        # Pairwise utilitarianism data has no label column; flatten it to text/label rows.
        needs_flattening = (
            subset_name == "utilitarianism"
            and "baseline" in subset_df.columns
            and "label" not in subset_df.columns
        )
        if needs_flattening:
            subset_df = preprocess_utilitarianism(subset_df)

        axis_model, axis_vec = train_ethics_model(subset_name, subset_df)
        models[subset_name] = {"model": axis_model, "vectorizer": axis_vec}

    print("\n" + banner)
    print("Training Summary:")
    for name, entry in models.items():
        status = "[OK] Ready" if entry.get("model") is not None else "[FAILED] Failed"
        print(f" • {name}: {status}")
    print(f" • Models Dir: {MODEL_DIR.resolve()}")
    print(banner)

    return models
|
|
|
|
if __name__ == "__main__":
    # Executing this file directly trains (or loads cached) every model;
    # the returned dict is not used at the command line.
    _ = train_all_models()
|
|