LaxmiVarshithaCH
Deploy NeuralKarma to HuggingFace Spaces
553a798
"""
NeuralKarma — ML Model Training Pipeline
Trains real classifiers on real data from HuggingFace:
1. Prosociality classifier (ProsocialDialog safety labels)
2. Five ETHICS-axis classifiers (commonsense, deontology, justice, virtue, utilitarianism)
All models use TF-IDF vectorization + Logistic Regression with cross-validation.
Models are saved as .joblib files for production inference.
"""
import os
import sys
import time
import warnings
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
from sklearn.utils import resample
warnings.filterwarnings("ignore", category=UserWarning)
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
MODEL_DIR = Path(__file__).parent / "models"
DATA_DIR = PROJECT_ROOT / "data" / "cache"
def ensure_model_dir():
"""Create model directory if it doesn't exist."""
MODEL_DIR.mkdir(parents=True, exist_ok=True)
def preprocess_utilitarianism(df):
"""
Convert utilitarianism pairwise comparison format to binary classification.
'baseline' is the more ethical option (label=1), 'less_pleasant' is less ethical (label=0).
We interleave both into a single text+label DataFrame.
"""
positive = pd.DataFrame({
"text": df["baseline"].astype(str),
"label": 1,
})
negative = pd.DataFrame({
"text": df["less_pleasant"].astype(str),
"label": 0,
})
combined = pd.concat([positive, negative], ignore_index=True)
return combined.sample(frac=1, random_state=42).reset_index(drop=True)
def get_text_column(df, subset_name):
"""
Intelligently find the text column in an ETHICS subset DataFrame.
Different ETHICS subsets have different column structures.
"""
if "input" in df.columns:
return df["input"].astype(str)
elif "scenario" in df.columns:
# Deontology / Justice / Virtue
if "excuse" in df.columns:
return (df["scenario"].astype(str) + " " + df["excuse"].astype(str)).str.strip()
return df["scenario"].astype(str)
elif "text" in df.columns:
return df["text"].astype(str)
elif "baseline" in df.columns:
# Utilitarianism — already preprocessed
return df["text"].astype(str) if "text" in df.columns else df["baseline"].astype(str)
else:
# Fallback: concatenate all string columns
str_cols = df.select_dtypes(include=["object"]).columns
if len(str_cols) > 0:
return df[str_cols[0]].astype(str)
raise ValueError(f"Cannot find text column in {subset_name}: {df.columns.tolist()}")
def get_label_column(df, subset_name):
"""Find the label column in a dataset."""
if "label" in df.columns:
return df["label"].astype(int)
elif "is_short" in df.columns:
return df["is_short"].astype(int)
else:
label_cols = [c for c in df.columns if "label" in c.lower()]
if label_cols:
return df[label_cols[0]].astype(int)
raise ValueError(f"Cannot find label column in {subset_name}: {df.columns.tolist()}")
def balance_dataset(texts, labels, max_per_class=5000, random_state=42):
"""
Balance a dataset by upsampling minority and downsampling majority classes.
Caps at max_per_class to keep training fast.
"""
df = pd.DataFrame({"text": texts, "label": labels})
classes = df["label"].unique()
balanced_parts = []
for c in classes:
class_df = df[df["label"] == c]
if len(class_df) > max_per_class:
class_df = class_df.sample(n=max_per_class, random_state=random_state)
elif len(class_df) < max_per_class // 2 and len(class_df) > 10:
class_df = resample(class_df, n_samples=min(max_per_class, len(class_df) * 3),
random_state=random_state, replace=True)
balanced_parts.append(class_df)
result = pd.concat(balanced_parts, ignore_index=True).sample(frac=1, random_state=random_state)
return result["text"].values, result["label"].values
def train_prosociality_model(prosocial_df):
"""
Train prosociality classifier on ProsocialDialog safety labels.
Maps safety labels to binary: safe/casual → prosocial (1), caution/intervention → not prosocial (0)
"""
model_path = MODEL_DIR / "prosociality_model.joblib"
vectorizer_path = MODEL_DIR / "prosociality_vectorizer.joblib"
if model_path.exists() and vectorizer_path.exists():
print(" [OK] Prosociality model already trained, loading from cache...")
return joblib.load(model_path), joblib.load(vectorizer_path)
print(" ⚙ Training Prosociality Classifier...")
start = time.time()
# Binary mapping: safe/casual → 1 (prosocial), caution/intervention → 0 (not)
label_map = {
"__ok__": 1,
"__casual__": 1,
"__probably_needs_caution__": 0,
"__needs_caution__": 0,
"__needs_intervention__": 0,
}
df = prosocial_df.copy()
df["binary_label"] = df["safety_label"].map(label_map)
df = df.dropna(subset=["binary_label", "context"])
df["binary_label"] = df["binary_label"].astype(int)
# Use context as input text
texts = df["context"].astype(str).values
labels = df["binary_label"].values
# Balance and cap
texts, labels = balance_dataset(texts, labels, max_per_class=8000)
print(f" Training on {len(texts):,} samples (balanced)...")
# TF-IDF vectorizer
vectorizer = TfidfVectorizer(
max_features=15000,
ngram_range=(1, 2),
min_df=2,
max_df=0.95,
strip_accents="unicode",
sublinear_tf=True,
)
X = vectorizer.fit_transform(texts)
# Logistic Regression with cross-validation
model = LogisticRegression(
C=1.0,
max_iter=1000,
solver="lbfgs",
class_weight="balanced",
n_jobs=-1,
)
# Cross-validation
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X, labels, cv=cv, scoring="accuracy", n_jobs=-1)
print(f" CV Accuracy: {scores.mean():.4f}{scores.std():.4f})")
# Final fit on all data
model.fit(X, labels)
# Save
joblib.dump(model, model_path)
joblib.dump(vectorizer, vectorizer_path)
elapsed = time.time() - start
print(f" [OK] Prosociality model trained in {elapsed:.1f}s")
return model, vectorizer
def train_ethics_model(subset_name, subset_df):
"""
Train a classifier for one ETHICS benchmark subset.
"""
model_path = MODEL_DIR / f"ethics_{subset_name}_model.joblib"
vectorizer_path = MODEL_DIR / f"ethics_{subset_name}_vectorizer.joblib"
if model_path.exists() and vectorizer_path.exists():
print(f" [OK] ETHICS/{subset_name} model already trained, loading from cache...")
return joblib.load(model_path), joblib.load(vectorizer_path)
print(f" ⚙ Training ETHICS/{subset_name} Classifier...")
start = time.time()
try:
texts = get_text_column(subset_df, subset_name)
labels = get_label_column(subset_df, subset_name)
except ValueError as e:
print(f" [WARNING] Skipping {subset_name}: {e}")
return None, None
# Clean
mask = texts.str.strip().str.len() > 0
texts = texts[mask].values
labels = labels[mask].values
if len(texts) < 10:
print(f" [WARNING] Too few samples ({len(texts)}), skipping {subset_name}")
return None, None
# Balance
texts, labels = balance_dataset(texts, labels, max_per_class=5000)
print(f" Training on {len(texts):,} samples...")
# TF-IDF
vectorizer = TfidfVectorizer(
max_features=10000,
ngram_range=(1, 2),
min_df=2 if len(texts) > 100 else 1,
max_df=0.95,
strip_accents="unicode",
sublinear_tf=True,
)
X = vectorizer.fit_transform(texts)
model = LogisticRegression(
C=1.0,
max_iter=1000,
solver="lbfgs",
class_weight="balanced",
n_jobs=-1,
)
# Cross-validation
n_splits = min(5, min(np.bincount(labels)))
n_splits = max(2, n_splits)
try:
cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
scores = cross_val_score(model, X, labels, cv=cv, scoring="accuracy", n_jobs=-1)
print(f" CV Accuracy: {scores.mean():.4f}{scores.std():.4f})")
except Exception:
print(f" (CV skipped due to small dataset)")
model.fit(X, labels)
joblib.dump(model, model_path)
joblib.dump(vectorizer, vectorizer_path)
elapsed = time.time() - start
print(f" [OK] ETHICS/{subset_name} model trained in {elapsed:.1f}s")
return model, vectorizer
def train_all_models():
"""
Train all NeuralKarma models. Main entry point for ML pipeline.
Returns dict of all trained models and vectorizers.
"""
ensure_model_dir()
print("=" * 60)
print("NeuralKarma — ML Model Training Pipeline")
print("=" * 60)
# Load cached datasets
from data.download_datasets import download_all
prosocial_df, ethics_data, norms_df = download_all()
models = {}
print("\n[1/6] Prosociality Model")
model, vec = train_prosociality_model(prosocial_df)
models["prosociality"] = {"model": model, "vectorizer": vec}
axis_names = ["commonsense", "deontology", "justice", "virtue", "utilitarianism"]
for i, subset_name in enumerate(axis_names):
print(f"\n[{i+2}/6] ETHICS/{subset_name} Model")
if subset_name in ethics_data:
subset_df = ethics_data[subset_name]
# Utilitarianism needs special preprocessing (pairwise → classification)
if subset_name == "utilitarianism" and "baseline" in subset_df.columns and "label" not in subset_df.columns:
subset_df = preprocess_utilitarianism(subset_df)
model, vec = train_ethics_model(subset_name, subset_df)
models[subset_name] = {"model": model, "vectorizer": vec}
else:
print(f" [WARNING] No data for {subset_name}")
print("\n" + "=" * 60)
print("Training Summary:")
for name, m in models.items():
status = "[OK] Ready" if m.get("model") is not None else "[FAILED] Failed"
print(f" • {name}: {status}")
print(f" • Models Dir: {MODEL_DIR.resolve()}")
print("=" * 60)
return models
if __name__ == "__main__":
train_all_models()