"""Difficulty Model training pipeline. Trains a RandomForestRegressor on question features for difficulty estimation. Target: difficulty_score (continuous [0, 1]). Features: bloom_score, grade, subject (encoded), question_type (encoded). Primary metric: MAE. """ import logging from datetime import datetime, timezone import numpy as np import pandas as pd from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import mean_absolute_error, r2_score from sklearn.preprocessing import OrdinalEncoder from app.core.config import settings from app.core.exceptions import TrainingError from training.base_trainer import BaseTrainer, TrainingResult logger = logging.getLogger(__name__) FEATURE_COLUMNS = ["bloom_score", "grade", "subject", "question_type"] CATEGORICAL_COLUMNS = ["subject", "question_type"] NUMERIC_COLUMNS = ["bloom_score", "grade"] TARGET_COLUMN = "difficulty_score" class DifficultyModelTrainer(BaseTrainer): """RandomForestRegressor for question difficulty estimation. Target: difficulty_score (continuous [0, 1]) Features: bloom_score, grade, subject (encoded), question_type (encoded) Primary metric: MAE """ @property def model_name(self) -> str: return "difficulty_model" @property def model_version(self) -> str: return "difficulty_model_v2_baseline_001" @property def table_name(self) -> str: return "training_lo_tagging" def _load_with_question_type(self, df: pd.DataFrame) -> pd.DataFrame: """Join question_type from questions.csv since training_lo_tagging lacks it. The training_lo_tagging table does not include question_type, but the design requires it as a feature. We join on question_id from questions.csv. """ questions_df = self._loader.load_table("questions") question_type_map = questions_df[["question_id", "question_type"]].drop_duplicates() df = df.merge(question_type_map, on="question_id", how="left") # Fill any missing question_type with a default if df["question_type"].isna().any(): missing_count = df["question_type"].isna().sum() logger.warning( "Found %d rows with missing question_type after join; filling with 'unknown'", missing_count, ) df["question_type"] = df["question_type"].fillna("unknown") return df def train(self, train_df: pd.DataFrame, val_df: pd.DataFrame) -> dict: """Train RandomForestRegressor on question features. Algorithm: 1. Join question_type from questions table 2. Encode categorical columns (subject, question_type) with OrdinalEncoder 3. Build numeric feature matrix: [bloom_score, grade, subject_encoded, question_type_encoded] 4. Target: difficulty_score 5. Fit RandomForestRegressor(n_estimators=100, random_state=seed) 6. Return {"model": rf, "encoder": ordinal_enc, "feature_columns.json": feature_list} """ # Join question_type for both train and val train_df = self._load_with_question_type(train_df) # Fit OrdinalEncoder on categorical columns ordinal_enc = OrdinalEncoder( handle_unknown="use_encoded_value", unknown_value=-1, ) ordinal_enc.fit(train_df[CATEGORICAL_COLUMNS]) # Build feature matrix X_cat = ordinal_enc.transform(train_df[CATEGORICAL_COLUMNS]) X_num = train_df[NUMERIC_COLUMNS].values X_train = np.hstack([X_num, X_cat]) y_train = train_df[TARGET_COLUMN].values # Fit RandomForestRegressor rf = RandomForestRegressor( n_estimators=100, random_state=self._seed, ) rf.fit(X_train, y_train) logger.info( "Difficulty model trained — %d samples, %d features", X_train.shape[0], X_train.shape[1], ) return { "model": rf, "encoder": ordinal_enc, "feature_columns.json": FEATURE_COLUMNS, } def evaluate(self, artifacts: dict, df: pd.DataFrame, split_name: str) -> dict: """Evaluate model on a split. Computes: MAE, R-squared, per-bucket MAE (easy/medium/hard based on difficulty column). """ model = artifacts["model"] encoder = artifacts["encoder"] # Join question_type for evaluation data df = self._load_with_question_type(df) # Build feature matrix X_cat = encoder.transform(df[CATEGORICAL_COLUMNS]) X_num = df[NUMERIC_COLUMNS].values X = np.hstack([X_num, X_cat]) y_true = df[TARGET_COLUMN].values y_pred = model.predict(X) # Overall metrics mae = mean_absolute_error(y_true, y_pred) r2 = r2_score(y_true, y_pred) # Per-bucket MAE (easy/medium/hard based on difficulty column) per_bucket_mae = {} if "difficulty" in df.columns: for bucket in df["difficulty"].unique(): mask = df["difficulty"] == bucket if mask.sum() > 0: bucket_mae = mean_absolute_error( y_true[mask], y_pred[mask] ) per_bucket_mae[bucket.lower()] = round(bucket_mae, 4) metrics = { "mae": round(mae, 4), "r_squared": round(r2, 4), "per_bucket_mae": per_bucket_mae, } logger.info( "%s metrics — MAE: %.4f, R²: %.4f", split_name, mae, r2, ) return metrics def _check_baseline(self, metrics: dict) -> None: """Verify MAE < 0.5 (very lenient baseline for synthetic data). Raises TrainingError if not met. """ test_metrics = metrics.get("metrics", {}).get("test", {}) mae = test_metrics.get("mae") # Fallback to validation metrics if test not available if mae is None: val_metrics = metrics.get("metrics", {}).get("validation", {}) mae = val_metrics.get("mae") if mae is None: raise TrainingError( "Cannot compute baseline: MAE not found in metrics.", model_name=self.model_name, ) if mae >= 0.5: raise TrainingError( f"MAE ({mae:.4f}) does not meet baseline threshold (< 0.5). " f"Model performance is insufficient.", model_name=self.model_name, ) logger.info("Baseline check passed — MAE %.4f < 0.5", mae) def _build_metrics( self, val_metrics: dict, test_metrics: dict, train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame, ) -> dict: """Assemble full metrics.json content.""" return { "model_name": self.model_name, "model_version": self.model_version, "dataset_version": settings.ai_service_version, "trained_at": datetime.now(timezone.utc).isoformat(), "seed": self._seed, "split_counts": { "train": len(train_df), "validation": len(val_df), "test": len(test_df), }, "metrics": { "validation": val_metrics, "test": test_metrics, }, "limitations": [ "Trained on synthetic data only.", "difficulty_score distribution may not reflect real-world difficulty.", "OrdinalEncoder assumes an ordering that may not be meaningful for subject/question_type.", "Per-bucket MAE depends on the quality of the difficulty string labels.", ], } def _build_training_config( self, train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame, ) -> dict: """Build training_config.json with hyperparameters.""" return { "model_name": self.model_name, "model_version": self.model_version, "dataset_version": settings.ai_service_version, "seed": self._seed, "split_counts": { "train": len(train_df), "validation": len(val_df), "test": len(test_df), }, "hyperparameters": { "n_estimators": 100, "random_state": self._seed, "algorithm": "RandomForestRegressor", "encoder": "OrdinalEncoder", }, "feature_columns": FEATURE_COLUMNS, "categorical_columns": CATEGORICAL_COLUMNS, "numeric_columns": NUMERIC_COLUMNS, "target_column": TARGET_COLUMN, "algorithm": "RandomForestRegressor", } def _build_model_card(self, metrics: dict) -> str: """Generate model_card.md content.""" val_metrics = metrics.get("metrics", {}).get("validation", {}) test_metrics = metrics.get("metrics", {}).get("test", {}) card = f"""# Model Card: Difficulty Model ## Model Details - **Model Name:** {self.model_name} - **Model Version:** {self.model_version} - **Algorithm:** RandomForestRegressor - **Framework:** scikit-learn - **Trained At:** {metrics.get("trained_at", "N/A")} - **Seed:** {self._seed} ## Intended Use Estimate question difficulty as a continuous score in [0, 1] based on question features (bloom_score, grade, subject, question_type). Used in the difficulty estimation endpoint to predict how hard a question is for a given grade level. ## Training Data - **Source:** training_lo_tagging.csv + questions.csv (for question_type) - **Split Counts:** train={metrics.get("split_counts", {}).get("train", "N/A")}, \ validation={metrics.get("split_counts", {}).get("validation", "N/A")}, \ test={metrics.get("split_counts", {}).get("test", "N/A")} - **Features:** bloom_score (numeric), grade (numeric), subject (OrdinalEncoded), \ question_type (OrdinalEncoded) - **Target:** difficulty_score (continuous [0, 1]) ## Metrics ### Validation Set - MAE: {val_metrics.get("mae", "N/A")} - R-squared: {val_metrics.get("r_squared", "N/A")} - Per-bucket MAE: {val_metrics.get("per_bucket_mae", "N/A")} ### Test Set - MAE: {test_metrics.get("mae", "N/A")} - R-squared: {test_metrics.get("r_squared", "N/A")} - Per-bucket MAE: {test_metrics.get("per_bucket_mae", "N/A")} ## Known Limitations - Trained on synthetic data only — performance on real questions is unknown. - difficulty_score distribution may not reflect real-world difficulty. - OrdinalEncoder assumes an ordering that may not be meaningful for subject/question_type. - Per-bucket MAE depends on the quality of the difficulty string labels. - Limited feature set (4 features); text-based features could improve performance. ## Fallback Behavior When the model is not loaded or confidence is below threshold, the system falls back to a rule-based difficulty estimation using bloom_score and grade-level heuristics. """ return card