aaa / training /train_difficulty_model.py
work-sejal
Deploy AI service with FastAPI
70ea7be
Raw
History Blame Contribute Delete
11.2 kB
"""Difficulty Model training pipeline.
Trains a RandomForestRegressor on question features for difficulty estimation.
Target: difficulty_score (continuous [0, 1]).
Features: bloom_score, grade, subject (encoded), question_type (encoded).
Primary metric: MAE.
"""
import logging
from datetime import datetime, timezone
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.preprocessing import OrdinalEncoder
from app.core.config import settings
from app.core.exceptions import TrainingError
from training.base_trainer import BaseTrainer, TrainingResult
logger = logging.getLogger(__name__)

# Name of the regression target column.
TARGET_COLUMN = "difficulty_score"

# Columns used as-is, and columns that are ordinal-encoded before training.
NUMERIC_COLUMNS = ["bloom_score", "grade"]
CATEGORICAL_COLUMNS = ["subject", "question_type"]

# Full feature list in feature-matrix order: numeric columns first, then the
# encoded categorical columns (the trainer stacks them in that order).
FEATURE_COLUMNS = [*NUMERIC_COLUMNS, *CATEGORICAL_COLUMNS]
class DifficultyModelTrainer(BaseTrainer):
    """RandomForestRegressor for question difficulty estimation.

    Target: difficulty_score (continuous [0, 1])
    Features: bloom_score, grade, subject (encoded), question_type (encoded)
    Primary metric: MAE

    NOTE(review): ``self._loader`` and ``self._seed`` are not defined in this
    class; they are presumably provided by ``BaseTrainer`` — confirm.
    """

    # Number of trees in the forest; used by train() and recorded in
    # training_config.json so the two can never drift apart.
    N_ESTIMATORS = 100

    # Baseline gate: the model is accepted only when MAE is strictly below this.
    BASELINE_MAE_THRESHOLD = 0.5

    @property
    def model_name(self) -> str:
        """Short identifier of this model."""
        return "difficulty_model"

    @property
    def model_version(self) -> str:
        """Version string recorded in the metrics and the model card."""
        return "difficulty_model_v2_baseline_001"

    @property
    def table_name(self) -> str:
        """Name of the table the training rows come from."""
        return "training_lo_tagging"

    def _load_with_question_type(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` with a fully populated ``question_type`` column.

        The training_lo_tagging table does not include question_type, but the
        design requires it as a feature, so it is joined from the questions
        table on ``question_id``. Rows without a match get ``"unknown"``.

        Args:
            df: Frame with a ``question_id`` column (or an existing
                ``question_type`` column, in which case no join is done).

        Returns:
            A new frame with the same number of rows as ``df``; the input
            frame is not modified.
        """
        if "question_type" not in df.columns:
            questions_df = self._loader.load_table("questions")
            # De-duplicate on question_id alone: a question listed with two
            # different types would otherwise multiply rows in the left merge.
            question_type_map = questions_df[
                ["question_id", "question_type"]
            ].drop_duplicates(subset="question_id")
            df = df.merge(question_type_map, on="question_id", how="left")
        # else: joining again would create question_type_x / question_type_y
        # columns and make the lookup below fail, so the join is skipped.

        # Fill any missing question_type with a default
        missing_count = int(df["question_type"].isna().sum())
        if missing_count:
            logger.warning(
                "Found %d rows with missing question_type after join; filling with 'unknown'",
                missing_count,
            )
            # assign() builds a new frame, so a caller-owned frame that already
            # carried question_type is never mutated in place.
            df = df.assign(question_type=df["question_type"].fillna("unknown"))
        return df

    @staticmethod
    def _build_features(encoder: OrdinalEncoder, df: pd.DataFrame) -> np.ndarray:
        """Build the model input matrix: numeric columns, then encoded categoricals."""
        encoded = encoder.transform(df[CATEGORICAL_COLUMNS])
        numeric = df[NUMERIC_COLUMNS].values
        return np.hstack([numeric, encoded])

    def train(self, train_df: pd.DataFrame, val_df: pd.DataFrame) -> dict:
        """Train RandomForestRegressor on question features.

        Algorithm:
            1. Join question_type from questions table
            2. Encode categorical columns (subject, question_type) with OrdinalEncoder
            3. Build numeric feature matrix:
               [bloom_score, grade, subject_encoded, question_type_encoded]
            4. Target: difficulty_score
            5. Fit RandomForestRegressor(n_estimators=N_ESTIMATORS, random_state=seed)

        Args:
            train_df: Training rows.
            val_df: Validation rows. Not used for fitting; kept for interface
                compatibility (evaluate() performs its own join).

        Returns:
            ``{"model": rf, "encoder": ordinal_enc,
            "feature_columns.json": feature_list}``.
        """
        # Only the training split is needed here; val_df is intentionally unused.
        train_df = self._load_with_question_type(train_df)

        # Categories unseen at fit time are encoded as -1 instead of raising.
        ordinal_enc = OrdinalEncoder(
            handle_unknown="use_encoded_value",
            unknown_value=-1,
        )
        ordinal_enc.fit(train_df[CATEGORICAL_COLUMNS])

        X_train = self._build_features(ordinal_enc, train_df)
        y_train = train_df[TARGET_COLUMN].values

        rf = RandomForestRegressor(
            n_estimators=self.N_ESTIMATORS,
            random_state=self._seed,
        )
        rf.fit(X_train, y_train)

        logger.info(
            "Difficulty model trained — %d samples, %d features",
            X_train.shape[0],
            X_train.shape[1],
        )
        return {
            "model": rf,
            "encoder": ordinal_enc,
            # Copy so callers cannot mutate the module-level constant.
            "feature_columns.json": list(FEATURE_COLUMNS),
        }

    def evaluate(self, artifacts: dict, df: pd.DataFrame, split_name: str) -> dict:
        """Evaluate model on a split.

        Computes: MAE, R-squared, per-bucket MAE (easy/medium/hard based on
        difficulty column).

        Args:
            artifacts: Mapping returned by train(); ``"model"`` and
                ``"encoder"`` are read.
            df: Rows of the split to evaluate.
            split_name: Label used in the log line only.

        Returns:
            ``{"mae": float, "r_squared": float, "per_bucket_mae": dict}``
            with values rounded to 4 decimals. ``per_bucket_mae`` is empty
            when ``df`` has no ``difficulty`` column.
        """
        model = artifacts["model"]
        encoder = artifacts["encoder"]

        # Join question_type for evaluation data
        df = self._load_with_question_type(df)

        X = self._build_features(encoder, df)
        y_true = df[TARGET_COLUMN].values
        y_pred = model.predict(X)

        # Cast to built-in float: the sklearn metrics may return NumPy scalars,
        # which would show up as "np.float64(...)" when the metrics dict is
        # rendered into the model card.
        mae = float(mean_absolute_error(y_true, y_pred))
        r2 = float(r2_score(y_true, y_pred))

        # Per-bucket MAE (easy/medium/hard based on difficulty column)
        per_bucket_mae = {}
        if "difficulty" in df.columns:
            labels = df["difficulty"]
            # Work positionally on plain arrays so the masks line up with
            # y_true / y_pred regardless of the frame's index.
            has_label = labels.notna().to_numpy()
            # Normalise before grouping: labels that differ only in case share
            # one bucket, and non-string labels no longer break .lower().
            normalized = labels.astype(str).str.lower().to_numpy(dtype=object)
            for bucket in pd.unique(normalized[has_label]):
                mask = has_label & (normalized == bucket)
                bucket_mae = float(mean_absolute_error(y_true[mask], y_pred[mask]))
                per_bucket_mae[bucket] = round(bucket_mae, 4)

        metrics = {
            "mae": round(mae, 4),
            "r_squared": round(r2, 4),
            "per_bucket_mae": per_bucket_mae,
        }
        logger.info(
            "%s metrics — MAE: %.4f, R²: %.4f",
            split_name, mae, r2,
        )
        return metrics

    def _check_baseline(self, metrics: dict) -> None:
        """Verify MAE < BASELINE_MAE_THRESHOLD (very lenient baseline for synthetic data).

        The test-split MAE is used; the validation MAE is the fallback when
        the test MAE is absent.

        Args:
            metrics: Full metrics document as produced by _build_metrics().

        Raises:
            TrainingError: If no MAE is recorded, or the MAE is not strictly
                below the threshold (this includes a NaN MAE).
        """
        recorded = metrics.get("metrics") or {}
        mae = (recorded.get("test") or {}).get("mae")
        # Fallback to validation metrics if test not available
        if mae is None:
            mae = (recorded.get("validation") or {}).get("mae")
        if mae is None:
            raise TrainingError(
                "Cannot compute baseline: MAE not found in metrics.",
                model_name=self.model_name,
            )

        threshold = self.BASELINE_MAE_THRESHOLD
        # "not mae < threshold" rather than "mae >= threshold": every
        # comparison with NaN is False, so a NaN MAE must fail the gate too.
        if not mae < threshold:
            raise TrainingError(
                f"MAE ({mae:.4f}) does not meet baseline threshold (< {threshold}). "
                f"Model performance is insufficient.",
                model_name=self.model_name,
            )
        logger.info("Baseline check passed — MAE %.4f < %s", mae, threshold)

    def _build_metrics(
        self,
        val_metrics: dict,
        test_metrics: dict,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        test_df: pd.DataFrame,
    ) -> dict:
        """Assemble full metrics.json content.

        Args:
            val_metrics: Output of evaluate() on the validation split.
            test_metrics: Output of evaluate() on the test split.
            train_df: Training rows (only the row count is recorded).
            val_df: Validation rows (only the row count is recorded).
            test_df: Test rows (only the row count is recorded).

        Returns:
            Dict with model identity, timestamp (UTC, ISO 8601), seed, split
            counts, per-split metrics and known limitations.
        """
        return {
            "model_name": self.model_name,
            "model_version": self.model_version,
            # NOTE(review): the service version is recorded as the dataset
            # version — confirm this is intended.
            "dataset_version": settings.ai_service_version,
            "trained_at": datetime.now(timezone.utc).isoformat(),
            "seed": self._seed,
            "split_counts": {
                "train": len(train_df),
                "validation": len(val_df),
                "test": len(test_df),
            },
            "metrics": {
                "validation": val_metrics,
                "test": test_metrics,
            },
            "limitations": [
                "Trained on synthetic data only.",
                "difficulty_score distribution may not reflect real-world difficulty.",
                "OrdinalEncoder assumes an ordering that may not be meaningful for subject/question_type.",
                "Per-bucket MAE depends on the quality of the difficulty string labels.",
            ],
        }

    def _build_training_config(
        self,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        test_df: pd.DataFrame,
    ) -> dict:
        """Build training_config.json with hyperparameters.

        Args:
            train_df: Training rows (only the row count is recorded).
            val_df: Validation rows (only the row count is recorded).
            test_df: Test rows (only the row count is recorded).

        Returns:
            Dict describing model identity, seed, split counts,
            hyperparameters and the column layout used for training.
        """
        return {
            "model_name": self.model_name,
            "model_version": self.model_version,
            # NOTE(review): the service version is recorded as the dataset
            # version — confirm this is intended.
            "dataset_version": settings.ai_service_version,
            "seed": self._seed,
            "split_counts": {
                "train": len(train_df),
                "validation": len(val_df),
                "test": len(test_df),
            },
            "hyperparameters": {
                "n_estimators": self.N_ESTIMATORS,
                "random_state": self._seed,
                "algorithm": "RandomForestRegressor",
                "encoder": "OrdinalEncoder",
            },
            "feature_columns": FEATURE_COLUMNS,
            "categorical_columns": CATEGORICAL_COLUMNS,
            "numeric_columns": NUMERIC_COLUMNS,
            "target_column": TARGET_COLUMN,
            # Also kept at top level (duplicates the hyperparameters entry)
            # for consumers that read it from here.
            "algorithm": "RandomForestRegressor",
        }

    def _build_model_card(self, metrics: dict) -> str:
        """Generate model_card.md content.

        Args:
            metrics: Full metrics document as produced by _build_metrics();
                missing entries are rendered as ``N/A``.

        Returns:
            The model card as Markdown text.
        """
        recorded = metrics.get("metrics", {})
        val_metrics = recorded.get("validation", {})
        test_metrics = recorded.get("test", {})
        split_counts = metrics.get("split_counts", {})
        # The template lines start at column 0 on purpose: any indentation
        # would become part of the generated Markdown.
        card = f"""# Model Card: Difficulty Model
## Model Details
- **Model Name:** {self.model_name}
- **Model Version:** {self.model_version}
- **Algorithm:** RandomForestRegressor
- **Framework:** scikit-learn
- **Trained At:** {metrics.get("trained_at", "N/A")}
- **Seed:** {self._seed}
## Intended Use
Estimate question difficulty as a continuous score in [0, 1] based on
question features (bloom_score, grade, subject, question_type). Used in
the difficulty estimation endpoint to predict how hard a question is for
a given grade level.
## Training Data
- **Source:** training_lo_tagging.csv + questions.csv (for question_type)
- **Split Counts:** train={split_counts.get("train", "N/A")}, validation={split_counts.get("validation", "N/A")}, test={split_counts.get("test", "N/A")}
- **Features:** bloom_score (numeric), grade (numeric), subject (OrdinalEncoded), question_type (OrdinalEncoded)
- **Target:** difficulty_score (continuous [0, 1])
## Metrics
### Validation Set
- MAE: {val_metrics.get("mae", "N/A")}
- R-squared: {val_metrics.get("r_squared", "N/A")}
- Per-bucket MAE: {val_metrics.get("per_bucket_mae", "N/A")}
### Test Set
- MAE: {test_metrics.get("mae", "N/A")}
- R-squared: {test_metrics.get("r_squared", "N/A")}
- Per-bucket MAE: {test_metrics.get("per_bucket_mae", "N/A")}
## Known Limitations
- Trained on synthetic data only — performance on real questions is unknown.
- difficulty_score distribution may not reflect real-world difficulty.
- OrdinalEncoder assumes an ordering that may not be meaningful for subject/question_type.
- Per-bucket MAE depends on the quality of the difficulty string labels.
- Limited feature set (4 features); text-based features could improve performance.
## Fallback Behavior
When the model is not loaded or confidence is below threshold, the system
falls back to a rule-based difficulty estimation using bloom_score and
grade-level heuristics.
"""
        return card