Spaces:
Sleeping
Sleeping
| """Difficulty Model training pipeline. | |
| Trains a RandomForestRegressor on question features for difficulty estimation. | |
| Target: difficulty_score (continuous [0, 1]). | |
| Features: bloom_score, grade, subject (encoded), question_type (encoded). | |
| Primary metric: MAE. | |
| """ | |
| import logging | |
| from datetime import datetime, timezone | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.ensemble import RandomForestRegressor | |
| from sklearn.metrics import mean_absolute_error, r2_score | |
| from sklearn.preprocessing import OrdinalEncoder | |
| from app.core.config import settings | |
| from app.core.exceptions import TrainingError | |
| from training.base_trainer import BaseTrainer, TrainingResult | |
logger = logging.getLogger(__name__)

# Column groups used to assemble the model input.
# The feature matrix is stacked as numeric columns followed by the
# ordinal-encoded categorical columns, so FEATURE_COLUMNS is derived in that
# same order instead of being maintained by hand.
NUMERIC_COLUMNS = ["bloom_score", "grade"]
CATEGORICAL_COLUMNS = ["subject", "question_type"]
FEATURE_COLUMNS = NUMERIC_COLUMNS + CATEGORICAL_COLUMNS

# Regression target column.
TARGET_COLUMN = "difficulty_score"
class DifficultyModelTrainer(BaseTrainer):
    """Train a RandomForestRegressor that estimates question difficulty.

    Target: difficulty_score (continuous [0, 1]).
    Features: bloom_score, grade, subject (encoded), question_type (encoded).
    Primary metric: MAE.

    ``self._loader`` and ``self._seed`` are read by this class but not assigned
    in it; they are presumably provided by ``BaseTrainer`` (not visible from
    this file) — TODO confirm.
    """

    # Forest size. Shared by train() and _build_training_config() so the
    # recorded hyperparameters cannot drift from the fitted model.
    N_ESTIMATORS = 100

    # Baseline gate: MAE must be strictly below this value.
    BASELINE_MAE_THRESHOLD = 0.5

    # NOTE(review): the three identifiers below are plain methods in this file,
    # so every use inside this class calls them. If BaseTrainer declares them
    # as properties, add @property here and drop the call parentheses.
    def model_name(self) -> str:
        """Return the model identifier."""
        return "difficulty_model"

    def model_version(self) -> str:
        """Return the model version string."""
        return "difficulty_model_v2_baseline_001"

    def table_name(self) -> str:
        """Return the name of the training table."""
        return "training_lo_tagging"

    def _load_with_question_type(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` with a ``question_type`` column.

        The training_lo_tagging table does not include question_type, but the
        design requires it as a feature, so it is joined on ``question_id``
        from the ``questions`` table. Rows without a match get the value
        ``"unknown"``.

        Args:
            df: Frame containing a ``question_id`` column.

        Returns:
            A new frame with the same rows as ``df`` plus ``question_type``.
        """
        if "question_type" in df.columns:
            # Merging again would create question_type_x / question_type_y and
            # make the lookups below raise KeyError. Copy so the fill below
            # does not mutate the caller's frame.
            df = df.copy()
        else:
            questions_df = self._loader.load_table("questions")
            type_map = (
                questions_df[["question_id", "question_type"]]
                .dropna(subset=["question_type"])
                .drop_duplicates()
            )
            # A question_id listed with several types would duplicate rows in
            # the left merge; keep exactly one mapping per id.
            conflicting = type_map["question_id"].duplicated(keep="first")
            if conflicting.any():
                logger.warning(
                    "Found %d conflicting question_type rows for already-mapped "
                    "question_id values; keeping the first",
                    int(conflicting.sum()),
                )
                type_map = type_map[~conflicting]
            df = df.merge(type_map, on="question_id", how="left")

        # Fill any missing question_type with a default.
        missing_count = int(df["question_type"].isna().sum())
        if missing_count:
            logger.warning(
                "Found %d rows with missing question_type after join; filling with 'unknown'",
                missing_count,
            )
            df["question_type"] = df["question_type"].fillna("unknown")
        return df

    @staticmethod
    def _build_feature_matrix(df: pd.DataFrame, encoder) -> np.ndarray:
        """Stack numeric columns and encoded categorical columns, in that order."""
        encoded = encoder.transform(df[CATEGORICAL_COLUMNS])
        numeric = df[NUMERIC_COLUMNS].values
        return np.hstack([numeric, encoded])

    @staticmethod
    def _split_counts(
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        test_df: pd.DataFrame,
    ) -> dict:
        """Return the row count of each split."""
        return {
            "train": len(train_df),
            "validation": len(val_df),
            "test": len(test_df),
        }

    def train(self, train_df: pd.DataFrame, val_df: pd.DataFrame) -> dict:
        """Train RandomForestRegressor on question features.

        Algorithm:
            1. Join question_type from the questions table.
            2. Encode categorical columns (subject, question_type) with
               OrdinalEncoder; categories unseen at fit time encode to -1.
            3. Build the feature matrix:
               [bloom_score, grade, subject_encoded, question_type_encoded].
            4. Target: difficulty_score.
            5. Fit RandomForestRegressor(n_estimators=N_ESTIMATORS,
               random_state=seed).

        Args:
            train_df: Training split.
            val_df: Validation split. Not used during fitting.

        Returns:
            ``{"model": rf, "encoder": ordinal_enc,
            "feature_columns.json": feature_list}``.
        """
        # Only the training frame is needed here; evaluate() performs its own
        # join for the frame it is given.
        train_df = self._load_with_question_type(train_df)

        ordinal_enc = OrdinalEncoder(
            handle_unknown="use_encoded_value",
            unknown_value=-1,
        )
        ordinal_enc.fit(train_df[CATEGORICAL_COLUMNS])

        X_train = self._build_feature_matrix(train_df, ordinal_enc)
        y_train = train_df[TARGET_COLUMN].values

        rf = RandomForestRegressor(
            n_estimators=self.N_ESTIMATORS,
            random_state=self._seed,
        )
        rf.fit(X_train, y_train)

        logger.info(
            "Difficulty model trained — %d samples, %d features",
            X_train.shape[0],
            X_train.shape[1],
        )
        return {
            "model": rf,
            "encoder": ordinal_enc,
            # Copy so artifact consumers cannot mutate the module constant.
            "feature_columns.json": list(FEATURE_COLUMNS),
        }

    def evaluate(self, artifacts: dict, df: pd.DataFrame, split_name: str) -> dict:
        """Evaluate the model on one split.

        Computes MAE, R-squared and, when a ``difficulty`` column is present,
        MAE per difficulty bucket.

        Args:
            artifacts: Mapping with the fitted ``"model"`` and ``"encoder"``.
            df: Split to evaluate.
            split_name: Label used in the log line.

        Returns:
            ``{"mae": ..., "r_squared": ..., "per_bucket_mae": {...}}`` with
            values rounded to 4 decimals. Bucket keys are lower-cased labels.
        """
        model = artifacts["model"]
        encoder = artifacts["encoder"]

        df = self._load_with_question_type(df)
        X = self._build_feature_matrix(df, encoder)
        y_true = df[TARGET_COLUMN].values
        y_pred = model.predict(X)

        mae = mean_absolute_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)

        per_bucket_mae = {}
        if "difficulty" in df.columns:
            # Group on the lower-cased label so "Easy" and "easy" share one
            # bucket instead of overwriting each other. Missing labels are
            # excluded, and non-string labels are stringified, not crashed on.
            known = df["difficulty"].notna().to_numpy()
            labels = df["difficulty"].astype(str).str.lower().to_numpy()
            for bucket in pd.unique(labels[known]):
                mask = known & (labels == bucket)
                bucket_mae = mean_absolute_error(y_true[mask], y_pred[mask])
                per_bucket_mae[bucket] = round(float(bucket_mae), 4)

        metrics = {
            "mae": round(float(mae), 4),
            "r_squared": round(float(r2), 4),
            "per_bucket_mae": per_bucket_mae,
        }
        logger.info(
            "%s metrics — MAE: %.4f, R²: %.4f",
            split_name, mae, r2,
        )
        return metrics

    def _check_baseline(self, metrics: dict) -> None:
        """Verify MAE < BASELINE_MAE_THRESHOLD (lenient baseline for synthetic data).

        Uses the test MAE, falling back to the validation MAE when the test
        value is absent.

        Raises:
            TrainingError: If no MAE is recorded, or if the MAE is not strictly
                below the threshold (this includes NaN).
        """
        recorded = metrics.get("metrics", {})
        mae = recorded.get("test", {}).get("mae")
        if mae is None:
            mae = recorded.get("validation", {}).get("mae")
        if mae is None:
            raise TrainingError(
                "Cannot compute baseline: MAE not found in metrics.",
                model_name=self.model_name(),
            )

        threshold = self.BASELINE_MAE_THRESHOLD
        # "not mae < threshold" instead of "mae >= threshold": every comparison
        # with NaN is False, so the latter would let a NaN MAE pass.
        if not mae < threshold:
            raise TrainingError(
                f"MAE ({mae:.4f}) does not meet baseline threshold (< {threshold}). "
                f"Model performance is insufficient.",
                model_name=self.model_name(),
            )
        logger.info("Baseline check passed — MAE %.4f < %s", mae, threshold)

    def _build_metrics(
        self,
        val_metrics: dict,
        test_metrics: dict,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        test_df: pd.DataFrame,
    ) -> dict:
        """Assemble full metrics.json content."""
        return {
            "model_name": self.model_name(),
            "model_version": self.model_version(),
            # NOTE(review): dataset_version is populated from
            # settings.ai_service_version — confirm this is the intended source.
            "dataset_version": settings.ai_service_version,
            "trained_at": datetime.now(timezone.utc).isoformat(),
            "seed": self._seed,
            "split_counts": self._split_counts(train_df, val_df, test_df),
            "metrics": {
                "validation": val_metrics,
                "test": test_metrics,
            },
            "limitations": [
                "Trained on synthetic data only.",
                "difficulty_score distribution may not reflect real-world difficulty.",
                "OrdinalEncoder assumes an ordering that may not be meaningful for subject/question_type.",
                "Per-bucket MAE depends on the quality of the difficulty string labels.",
            ],
        }

    def _build_training_config(
        self,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        test_df: pd.DataFrame,
    ) -> dict:
        """Build training_config.json with hyperparameters."""
        return {
            "model_name": self.model_name(),
            "model_version": self.model_version(),
            "dataset_version": settings.ai_service_version,
            "seed": self._seed,
            "split_counts": self._split_counts(train_df, val_df, test_df),
            "hyperparameters": {
                "n_estimators": self.N_ESTIMATORS,
                "random_state": self._seed,
                "algorithm": "RandomForestRegressor",
                "encoder": "OrdinalEncoder",
            },
            "feature_columns": FEATURE_COLUMNS,
            "categorical_columns": CATEGORICAL_COLUMNS,
            "numeric_columns": NUMERIC_COLUMNS,
            "target_column": TARGET_COLUMN,
            # Kept at top level as well as under "hyperparameters" so existing
            # readers of either key keep working.
            "algorithm": "RandomForestRegressor",
        }

    def _build_model_card(self, metrics: dict) -> str:
        """Generate model_card.md content.

        Args:
            metrics: Mapping shaped like the output of ``_build_metrics``.
                Missing entries are rendered as ``N/A``.

        Returns:
            The Markdown text of the model card, ending with a newline.
        """
        split_metrics = metrics.get("metrics", {})
        split_counts = metrics.get("split_counts", {})

        def metric_section(title: str, values: dict) -> list:
            # One "### <split>" subsection listing the three reported metrics.
            return [
                f"### {title}",
                f"- MAE: {values.get('mae', 'N/A')}",
                f"- R-squared: {values.get('r_squared', 'N/A')}",
                f"- Per-bucket MAE: {values.get('per_bucket_mae', 'N/A')}",
            ]

        split_counts_line = (
            "- **Split Counts:** "
            f"train={split_counts.get('train', 'N/A')}, "
            f"validation={split_counts.get('validation', 'N/A')}, "
            f"test={split_counts.get('test', 'N/A')}"
        )
        features_line = (
            "- **Features:** bloom_score (numeric), grade (numeric), "
            "subject (OrdinalEncoded), question_type (OrdinalEncoded)"
        )

        lines = [
            "# Model Card: Difficulty Model",
            "## Model Details",
            f"- **Model Name:** {self.model_name()}",
            f"- **Model Version:** {self.model_version()}",
            "- **Algorithm:** RandomForestRegressor",
            "- **Framework:** scikit-learn",
            f"- **Trained At:** {metrics.get('trained_at', 'N/A')}",
            f"- **Seed:** {self._seed}",
            "## Intended Use",
            "Estimate question difficulty as a continuous score in [0, 1] based on",
            "question features (bloom_score, grade, subject, question_type). Used in",
            "the difficulty estimation endpoint to predict how hard a question is for",
            "a given grade level.",
            "## Training Data",
            "- **Source:** training_lo_tagging.csv + questions.csv (for question_type)",
            split_counts_line,
            features_line,
            "- **Target:** difficulty_score (continuous [0, 1])",
            "## Metrics",
            *metric_section("Validation Set", split_metrics.get("validation", {})),
            *metric_section("Test Set", split_metrics.get("test", {})),
            "## Known Limitations",
            "- Trained on synthetic data only — performance on real questions is unknown.",
            "- difficulty_score distribution may not reflect real-world difficulty.",
            "- OrdinalEncoder assumes an ordering that may not be meaningful for subject/question_type.",
            "- Per-bucket MAE depends on the quality of the difficulty string labels.",
            "- Limited feature set (4 features); text-based features could improve performance.",
            "## Fallback Behavior",
            "When the model is not loaded or confidence is below threshold, the system",
            "falls back to a rule-based difficulty estimation using bloom_score and",
            "grade-level heuristics.",
        ]
        return "\n".join(lines) + "\n"