"""Training pipeline components.

Each component implements one ``PipelineStep`` and communicates with the others
only through the shared ``PipelineContext`` (``ctx``):

* ``DataValidationComponent``     - load the CSV, drop unusable / leaking columns,
  save the data contract and drift baseline.
* ``FeatureEngineeringComponent`` - clean data, optionally synthesize features,
  clean the target, and split train/test.
* ``ModelSelectionComponent``     - choose the candidate model pool and preprocessor.
* ``TrainingComponent``           - cheap exploration sweep, optional Optuna tuning,
  final fit on the training split.
* ``EvaluationComponent``         - holdout scoring, SHAP summary, leaderboard,
  persistence and run tracking.
"""

import importlib
import json
import platform
import sys
import traceback
from typing import Any, Dict

import numpy as np
import optuna
import pandas as pd
import shap
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    mean_absolute_error,
    mean_squared_error,
    precision_score,
    r2_score,
    recall_score,
)
from sklearn.model_selection import (
    KFold,
    StratifiedKFold,
    cross_val_score,
    train_test_split,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder

try:
    from lightgbm import LGBMClassifier, LGBMRegressor

    LGBM_TYPES = (LGBMClassifier, LGBMRegressor)
except Exception:  # ImportError, or e.g. OSError if the native library fails to load
    LGBMClassifier = None
    LGBMRegressor = None
    LGBM_TYPES = tuple()

from core.pipeline_engine import PipelineComponent, PipelineContext, PipelineStep
from core.feature_engine import ManagedFeatureEngine
from core.integrations import MLTracking
from infra.database import DatasetModel, get_db
from infra.storage import (
    ModelRegistry,
    DataContract,
    get_model_path,
    get_schema_path,
    save_metrics,
)
from services.training.preprocessing import (
    make_lite_preprocessor,
    make_preprocessor,
    DataAgent,
)
from services.training.evaluator import _resolve_scoring, stability_check
from services.training.model_selector import ModelSelector


# Numeric targets with at most this many distinct values are treated as class labels.
_CLASSIFICATION_MAX_UNIQUE = 20

# Columns whose fraction of missing values exceeds this are dropped during validation.
_MAX_NAN_FRACTION = 0.9

# Target values that, after str() + strip() + lower(), are treated as missing labels.
_INVALID_TARGET_TOKENS = (
    "nan",
    "none",
    "",
    "na",
    "n/a",
    "null",
    "?",
    "unknown",
    "??",
    "invalid",
)


def _infer_task_type(y: pd.Series) -> str:
    """Return ``"classification"`` or ``"regression"`` for the target series ``y``.

    Non-numeric targets, and numeric targets with at most
    ``_CLASSIFICATION_MAX_UNIQUE`` distinct non-null values, are classification.
    """
    observed = y.dropna()
    if (
        not pd.api.types.is_numeric_dtype(observed)
        or observed.nunique() <= _CLASSIFICATION_MAX_UNIQUE
    ):
        return "classification"
    return "regression"


class DataValidationComponent(PipelineComponent):
    """Load the uploaded CSV, enforce column-level constraints and save the data contract."""

    def get_step_type(self) -> PipelineStep:
        return PipelineStep.VALIDATE

    def execute(self, ctx: PipelineContext):
        """Populate ``ctx.df`` with the validated dataset read from ``ctx.file_path``.

        Steps: optional feature selection, drop columns that are >90% missing,
        best-effort numeric parsing of object columns, early leakage removal, then
        persist the data contract and fit the drift baseline.

        Raises:
            ValueError: if ``ctx.target_column`` is not a column of the CSV.
        """
        df = pd.read_csv(ctx.file_path)
        ctx.reasoning.append(
            f"DataValidation: Loaded dataset with {df.shape[1]} columns: {list(df.columns)}"
        )
        if ctx.target_column not in df.columns:
            raise ValueError(
                f"Target column '{ctx.target_column}' not found in dataset"
            )

        selected_features = ctx.config.get("selected_features")
        if selected_features:
            # dict.fromkeys de-duplicates while preserving order, so listing the
            # target among the selected features cannot yield a duplicated column.
            keep_cols = list(dict.fromkeys([*selected_features, ctx.target_column]))
            available = [c for c in keep_cols if c in df.columns]
            # .copy() so the in-place column assignments below act on an owned frame.
            df = df[available].copy()
            ctx.reasoning.append(
                f"DataValidation: Feature selection applied, keeping {len(available)} columns: {available}"
            )

        # 1. Null value thresholds (NaN > 90% is critical failure). The target is
        #    exempt: dropping it would only resurface later as a KeyError.
        nan_share = df.isna().mean()
        high_nan_cols = [
            col
            for col in nan_share[nan_share > _MAX_NAN_FRACTION].index
            if col != ctx.target_column
        ]
        if high_nan_cols:
            ctx.reasoning.append(
                f"DataValidation: Warning - dropping columns with >90% missing values: {high_nan_cols}"
            )
            df = df.drop(columns=high_nan_cols)

        # 2. Strict type parsing: object columns whose values all parse as numbers
        #    become numeric; anything else is left untouched.
        for col in df.columns:
            if df[col].dtype == object:
                try:
                    df[col] = pd.to_numeric(df[col])
                except (ValueError, TypeError, OverflowError):
                    pass  # genuinely non-numeric column; keep it as object

        # 3. Target leakage detection.
        fe = ManagedFeatureEngine(
            target_col=ctx.target_column,
            task_type=_infer_task_type(df[ctx.target_column]),
        )
        # Never treat the target itself as a leak; removing it would break training.
        leaks = [
            col for col in (fe.detect_leakage(df) or []) if col != ctx.target_column
        ]
        if leaks:
            ctx.reasoning.append(
                f"DataValidation: LeakageGuard detected and dropped suspicious target-leaking columns early: {leaks}"
            )
            df = df.drop(columns=leaks)

        ctx.reasoning.append(
            f"DataValidation: After validation, {df.shape[1]} columns remain: {list(df.columns)}"
        )

        # Enforce Data Contract immediately
        DataContract.save_contract(ctx.job_id, df)

        # Fit statistical baseline
        from core.drift_detector import DriftDetector

        DriftDetector(ctx.job_id).fit_baseline(df)

        ctx.df = df
        ctx.reasoning.append(
            "DataValidation: Uploaded data matches contract. Constraints enforced."
        )


class FeatureEngineeringComponent(PipelineComponent):
    """Clean ``ctx.df``, optionally synthesize features, clean the target and split train/test."""

    def get_step_type(self) -> PipelineStep:
        return PipelineStep.FEATURE_ENG

    @staticmethod
    def _invalid_target_mask(y_raw: pd.Series) -> pd.Series:
        """Return a boolean mask of target rows that are missing or placeholder text."""
        invalid = y_raw.isna()
        if y_raw.dtype == object or pd.api.types.is_string_dtype(y_raw):
            normalized = y_raw.astype(str).str.strip().str.lower()
            invalid = invalid | normalized.isin(_INVALID_TARGET_TOKENS)
        return invalid

    def execute(self, ctx: PipelineContext):
        """Produce ``ctx.X``/``ctx.y``, the train/test split, column lists and ``ctx.eda_summary``.

        Classification targets are label-encoded to integers before splitting.

        Raises:
            ValueError: if no rows survive target cleaning, or no numeric /
                categorical feature columns remain.
        """
        df = ctx.df
        ctx.reasoning.append(
            f"FeatureEngineer: Starting with {df.shape[1]} columns: {list(df.columns)}"
        )

        if ctx.config.get("auto_clean", True):
            df, logs = DataAgent().clean(df, ctx.target_column)
            ctx.reasoning.extend(logs)
            ctx.reasoning.append(
                f"DataCleaner: After cleaning, {df.shape[1]} columns remain: {list(df.columns)}"
            )
        else:
            ctx.reasoning.append("DataCleaner: Auto-clean disabled.")

        fe = ManagedFeatureEngine(
            target_col=ctx.target_column,
            task_type=_infer_task_type(df[ctx.target_column]),
        )
        # Feature synthesis is only attempted on modest datasets to bound runtime.
        should_generate_features = (
            ctx.mode in ["Balanced", "Full"]
            and len(df) <= 25000
            and len(df.columns) <= 120
        )
        if should_generate_features:
            df = fe.generate_features(df)
            ctx.reasoning.append(f"FeatureEngine: Generated {df.shape[1]} features.")
        elif ctx.mode in ["Balanced", "Full"]:
            ctx.reasoning.append(
                "FeatureEngine: Skipped heavy automated feature synthesis because the dataset is large or wide."
            )

        y_raw = df[ctx.target_column]
        X = df.drop(columns=[ctx.target_column])

        invalid_target = self._invalid_target_mask(y_raw)
        dropped_target = int(invalid_target.sum())
        if dropped_target > 0:
            ctx.reasoning.append(
                f"TargetCleaner: Removed {dropped_target} invalid target rows."
            )

        X = X.loc[~invalid_target].reset_index(drop=True)
        y = y_raw.loc[~invalid_target].reset_index(drop=True)
        if len(y) == 0:
            raise ValueError(
                "No rows left after dropping invalid target rows. Cannot train."
            )

        ctx.eda_summary = {
            "rows_after_target_cleaning": int(len(y)),
            "columns_after_feature_engineering": int(X.shape[1]),
            "target_missing_removed": dropped_target,
            "numeric_features": int(X.select_dtypes(include=[np.number]).shape[1]),
            "categorical_features": int(
                X.select_dtypes(include=["object", "category", "bool"]).shape[1]
            ),
            "feature_synthesis_applied": bool(should_generate_features),
        }

        ctx.is_classification = _infer_task_type(y) == "classification"
        if ctx.is_classification:
            # Encode labels as strings first so mixed-type labels sort consistently.
            y = pd.Series(LabelEncoder().fit_transform(y.astype(str)))

        ctx.num_cols = X.select_dtypes(include=[np.number]).columns.tolist()
        ctx.cat_cols = X.select_dtypes(include=["object", "category"]).columns.tolist()

        # Validate that we have at least some columns
        if len(ctx.num_cols) == 0 and len(ctx.cat_cols) == 0:
            error_msg = (
                f"No usable columns found after feature engineering. X shape: {X.shape}, dtypes:\n{X.dtypes}\n\n"
                "This usually means:\n"
                "1. All features were dropped during data cleaning (identifiers, constants, high missing values)\n"
                "2. Target leakage detection removed all features\n"
                "3. Feature selection was too restrictive\n"
                "4. The dataset only contains the target column\n\n"
                "Check the reasoning logs above for details on what columns were dropped and why."
            )
            raise ValueError(error_msg)

        ctx.reasoning.append(
            f"FeatureEngineer: Identified {len(ctx.num_cols)} numeric and {len(ctx.cat_cols)} categorical features."
        )

        split_kwargs = {"test_size": 0.2, "random_state": 42}
        if ctx.is_classification:
            split_kwargs["stratify"] = y
        try:
            X_train, X_test, y_train, y_test = train_test_split(X, y, **split_kwargs)
        except ValueError:
            # Stratification fails e.g. when a class has a single member; retry unstratified.
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )

        ctx.X_train, ctx.X_test = X_train, X_test
        ctx.y_train, ctx.y_test = y_train, y_test
        ctx.X, ctx.y = X, y


class ModelSelectionComponent(PipelineComponent):
    """Choose the candidate model pool and the preprocessor for the training step."""

    def get_step_type(self) -> PipelineStep:
        return PipelineStep.TRAIN

    def execute(self, ctx: PipelineContext):
        """Set ``ctx.model_pool`` (via ``ModelSelector``) and ``ctx.preprocessor`` (by ``ctx.mode``)."""
        profile = {
            "rows": len(ctx.X),
            "cols": len(ctx.X.columns),
            "num_cols": ctx.num_cols,
            "cat_cols": ctx.cat_cols,
            "column_stats": {},
        }
        model_pool, meta_rec = ModelSelector.select_pool(
            len(ctx.X), ctx.is_classification, ctx.goal, profile
        )
        ctx.reasoning.append(
            f"Meta-Learner: {meta_rec['reason']} (Source: {meta_rec['source']})"
        )
        ctx.model_pool = model_pool

        if ctx.mode == "Full":
            ctx.preprocessor = make_preprocessor(ctx.num_cols, ctx.cat_cols)
            ctx.reasoning.append(
                "Preprocessor: Full mode selected richer preprocessing with skew/outlier/interactions."
            )
        else:
            ctx.preprocessor = make_lite_preprocessor(ctx.num_cols, ctx.cat_cols)
            ctx.reasoning.append(
                "Preprocessor: Lite preprocessing selected for faster iteration."
            )

        # NOTE(review): this only logs; whether PCA is actually applied is decided
        # inside the preprocessor factories (not visible here).
        if len(ctx.num_cols) >= 40:
            ctx.reasoning.append(
                "Dimensionality: PCA compression enabled for the numeric branch to keep wide datasets responsive."
            )


class TrainingComponent(PipelineComponent):
    """Two-stage training: cheap sweep over the pool, then optional Optuna tuning of the top-k."""

    def get_step_type(self) -> PipelineStep:
        return PipelineStep.TRAIN

    def _apply_imbalance_strategy(self, model, ctx: PipelineContext):
        """Best-effort: set balanced class weighting on ``model`` when ``handle_imbalance`` is on.

        Uses ``class_weight="balanced"`` and, for binary targets, a
        majority/minority ``scale_pos_weight`` if the estimator exposes those
        params. Returns ``model`` (mutated in place via ``set_params``).
        """
        if not ctx.is_classification or not ctx.config.get("handle_imbalance"):
            return model
        try:
            if hasattr(model, "get_params") and "class_weight" in model.get_params():
                model.set_params(class_weight="balanced")
        except Exception:
            pass  # deliberate best-effort: unsupported estimators keep their defaults
        try:
            value_counts = pd.Series(ctx.y_train).value_counts()
            if len(value_counts) == 2 and hasattr(model, "get_params"):
                params = model.get_params()
                if "scale_pos_weight" in params:
                    majority = max(value_counts.max(), 1)
                    minority = max(value_counts.min(), 1)
                    model.set_params(scale_pos_weight=float(majority / minority))
        except Exception:
            pass  # deliberate best-effort, as above
        return model

    def _execution_profile(self, ctx: PipelineContext) -> Dict[str, Any]:
        """Return sweep/tuning budgets for ``ctx.mode`` ("Fast", "Balanced", otherwise Full)."""
        rows = len(ctx.X_train)
        if ctx.mode == "Fast":
            return {
                "sweep_size": 0.2 if rows < 5000 else 0.08,
                "top_k": 1,
                "n_trials": 0,
                "timeout": 0,
                "run_optuna": False,
                "use_full_preprocessor": False,
            }
        if ctx.mode == "Balanced":
            return {
                "sweep_size": 0.35 if rows < 5000 else 0.12,
                "top_k": 2,
                "n_trials": 12,
                "timeout": 120,
                "run_optuna": True,
                "use_full_preprocessor": False,
            }
        return {
            "sweep_size": 0.5 if rows < 5000 else 0.2,
            "top_k": 3,
            "n_trials": 32,
            "timeout": 360,
            "run_optuna": True,
            "use_full_preprocessor": True,
        }

    def _safe_train_subset(
        self, X, y, train_size: float, is_classification: bool, random_state: int = 42
    ):
        """Return a ``train_size`` fraction of ``(X, y)``, stratified when feasible.

        Falls back to an unstratified subset, and finally to the full data, if
        splitting fails.
        """
        y_series = pd.Series(y)
        total_rows = len(y_series)
        if total_rows <= 2:
            return X, y

        desired_train = max(2, min(total_rows - 1, int(round(total_rows * train_size))))
        test_size = max(1, total_rows - desired_train)
        split_kwargs = {"train_size": desired_train, "random_state": random_state}

        if is_classification and y_series.nunique() > 1:
            min_class_count = int(y_series.value_counts().min())
            if test_size >= y_series.nunique() and min_class_count >= 2:
                split_kwargs["stratify"] = y

        try:
            X_train_slice, _, y_train_slice, _ = train_test_split(X, y, **split_kwargs)
            return X_train_slice, y_train_slice
        except Exception:
            try:
                X_train_slice, _, y_train_slice, _ = train_test_split(
                    X, y, train_size=desired_train, random_state=random_state
                )
                return X_train_slice, y_train_slice
            except Exception:
                return X, y

    def _safe_holdout_split(
        self, X, y, is_classification: bool, random_state: int = 42
    ):
        """Split ``(X, y)`` into ~80/20 train/validation, stratified when feasible.

        With two rows or fewer the full data is returned as both train and
        validation.
        """
        y_series = pd.Series(y)
        total_rows = len(y_series)
        if total_rows <= 2:
            return X, X, y, y

        test_size = max(1, int(round(total_rows * 0.2)))
        if test_size >= total_rows:
            test_size = 1

        split_kwargs = {"test_size": test_size, "random_state": random_state}
        if is_classification and y_series.nunique() > 1:
            min_class_count = int(y_series.value_counts().min())
            if test_size >= y_series.nunique() and min_class_count >= 2:
                split_kwargs["stratify"] = y

        try:
            return train_test_split(X, y, **split_kwargs)
        except Exception:
            return train_test_split(
                X, y, test_size=test_size, random_state=random_state
            )

    def _safe_cv(self, y, requested_folds: int, is_classification: bool):
        """Return a CV splitter with at most ``requested_folds`` folds, or ``None``.

        Classification uses ``StratifiedKFold`` capped at the smallest class count;
        ``None`` means fewer than 2 folds are possible.
        """
        if requested_folds < 2:
            return None
        if not is_classification:
            return KFold(
                n_splits=min(requested_folds, max(2, len(pd.Series(y)) // 2)),
                shuffle=True,
                random_state=42,
            )

        y_series = pd.Series(y)
        min_class_count = (
            int(y_series.value_counts().min()) if not y_series.empty else 0
        )
        safe_folds = min(requested_folds, min_class_count)
        if safe_folds < 2:
            return None
        return StratifiedKFold(n_splits=safe_folds, shuffle=True, random_state=42)

    @staticmethod
    def _holdout_score(y_true, preds, metric, is_classification: bool) -> float:
        """Score predictions with the metric ``_resolve_scoring`` maps ``metric`` to.

        Error metrics are negated so larger is always better (the study maximizes).
        """
        if is_classification:
            scoring_name = _resolve_scoring(metric, True)
            if scoring_name == "f1_weighted":
                return f1_score(y_true, preds, average="weighted", zero_division=0)
            if scoring_name == "precision_weighted":
                return precision_score(
                    y_true, preds, average="weighted", zero_division=0
                )
            if scoring_name == "recall_weighted":
                return recall_score(y_true, preds, average="weighted", zero_division=0)
            return accuracy_score(y_true, preds)

        scoring_name = _resolve_scoring(metric, False)
        if scoring_name == "neg_mean_absolute_error":
            return -mean_absolute_error(y_true, preds)
        if scoring_name == "neg_mean_squared_error":
            return -mean_squared_error(y_true, preds)
        if scoring_name == "neg_root_mean_squared_error":
            return -float(np.sqrt(mean_squared_error(y_true, preds)))
        return r2_score(y_true, preds)

    def _make_objective(self, ctx: PipelineContext, name: str):
        """Build the Optuna objective that tunes pool model ``name``.

        Uses cross-validation when ``cv_folds >= 2`` and the labels permit it,
        otherwise one holdout split of ``ctx.X_train``. Exceptions propagate so
        that Optuna records the trial as failed instead of scoring it.
        """

        def objective(trial):
            params = ModelSelector.get_bayesian_space(trial, name)
            model = ctx.model_pool[name].__class__(**params)
            model = self._apply_imbalance_strategy(model, ctx)
            if LGBM_TYPES and isinstance(model, LGBM_TYPES):
                try:
                    model.set_params(verbose=-1)
                except Exception:
                    pass  # cosmetic only (LightGBM log verbosity)

            pipe = Pipeline([("pre", ctx.preprocessor), ("m", model)])
            metric = ctx.config.get("eval_metric", "")
            cv_folds = int(ctx.config.get("cv_folds", 0) or 0)

            if cv_folds >= 2:
                cv = self._safe_cv(ctx.y_train, cv_folds, ctx.is_classification)
                if cv is not None:
                    return cross_val_score(
                        pipe,
                        ctx.X_train,
                        ctx.y_train,
                        cv=cv,
                        scoring=_resolve_scoring(metric, ctx.is_classification),
                    ).mean()

            X_tr, X_val, y_tr, y_val = self._safe_holdout_split(
                ctx.X_train,
                ctx.y_train,
                is_classification=ctx.is_classification,
                random_state=42,
            )
            pipe.fit(X_tr, y_tr)
            return self._holdout_score(
                y_val, pipe.predict(X_val), metric, ctx.is_classification
            )

        return objective

    def _run_sweep(self, ctx: PipelineContext, execution_profile: Dict[str, Any]):
        """Stage 1: fit every pool model with its cheap config on a training subsample.

        Writes ``execution_profile["effective_sweep_rows"]``. Returns
        ``(sweep_results, model_debug_rows)``; ``sweep_results`` is sorted best
        first. A model that fails is logged and skipped, not raised.

        Raises:
            ValueError: if the subsample is empty or the preprocessor cannot fit it.
        """
        ctx.reasoning.append("🏁 Stage 1: Starting Exploration Sweep")
        ctx.record_history("Sweep Start", "Running", phase="sweep")

        sweep_size = execution_profile["sweep_size"]
        requested_rows = max(64, int(len(ctx.X_train) * sweep_size))
        sample_rows = min(requested_rows, 5000 if ctx.mode == "Fast" else 8000)
        effective_train_size = min(sample_rows / max(len(ctx.X_train), 1), 0.95)
        execution_profile["effective_sweep_rows"] = sample_rows

        X_swp, y_swp = self._safe_train_subset(
            ctx.X_train,
            ctx.y_train,
            train_size=effective_train_size,
            is_classification=ctx.is_classification,
            random_state=42,
        )

        # Validate sweep data
        ctx.reasoning.append(f"Sweep Data Shape: X={X_swp.shape}, y={y_swp.shape}")
        if len(X_swp) == 0:
            raise ValueError("Sweep subset is empty after train/test split.")
        if len(y_swp) == 0:
            raise ValueError("Sweep target is empty after train/test split.")

        try:
            X_swp_proc = ctx.preprocessor.fit_transform(X_swp, y_swp)
            ctx.reasoning.append(f"Preprocessed Shape: {X_swp_proc.shape}")
        except Exception as e:
            raise ValueError(
                f"Preprocessor failed to transform sweep data: {type(e).__name__}: {str(e)}\n{traceback.format_exc()}"
            ) from e

        sweep_results = []
        model_debug_rows = []
        for name, model in ctx.model_pool.items():
            cheap_config = ModelSelector.get_cheap_config(name, ctx.is_classification)
            try:
                model.set_params(**cheap_config)
                model = self._apply_imbalance_strategy(model, ctx)
                model.fit(X_swp_proc, y_swp)
                score, std_score, metric_extras = stability_check(
                    model, X_swp_proc, y_swp, ctx.is_classification
                )
                row = {
                    "name": name,
                    "score": score,
                    "stability_std": round(std_score * 100, 2),
                    "model": model,
                    "phase": "sweep",
                    "cheap_config": cheap_config,
                }
                row.update(metric_extras)
                sweep_results.append(row)
                model_debug_rows.append(
                    {
                        "model": name,
                        "phase": "sweep",
                        "status": "ok",
                        "sweep_score": round(score * 100, 2),
                        "stability_std": round(std_score * 100, 2),
                        "cheap_config": cheap_config,
                        "optimized": False,
                        "error": None,
                        **metric_extras,
                    }
                )
                ctx.reasoning.append(f"Sweep: {name} scored {score:.3f}")
                ctx.record_history(
                    name, round(score * 100, 2), phase="sweep", status="ok"
                )
            except Exception as e:
                # One failing model must not abort the sweep; record it and continue.
                error_detail = f"{type(e).__name__}: {str(e)}"
                tb_str = traceback.format_exc()
                model_debug_rows.append(
                    {
                        "model": name,
                        "phase": "sweep",
                        "status": "failed",
                        "sweep_score": None,
                        "stability_std": None,
                        "cheap_config": cheap_config,
                        "optimized": False,
                        "error": error_detail,
                    }
                )
                ctx.reasoning.append(f"Sweep Failed for {name}: {error_detail}")
                ctx.reasoning.append(f"Traceback: {tb_str}")
                ctx.record_history(
                    name, f"failed: {error_detail}", phase="sweep", status="failed"
                )

        sweep_results.sort(key=lambda r: r["score"], reverse=True)
        return sweep_results, model_debug_rows

    def _optimize_candidates(
        self,
        ctx: PipelineContext,
        top_candidates,
        model_debug_rows,
        execution_profile: Dict[str, Any],
    ):
        """Stage 2: run an Optuna study per candidate; return ``(best_model, pool_name)``.

        The returned model is an unfitted instance built from the best trial's
        params, or ``(None, None)`` if no candidate completed a trial.
        """
        ctx.reasoning.append(
            f"🚀 Stage 2: Deep Dive optimization for: {[c['name'] for c in top_candidates]}"
        )
        ctx.record_history("Optimization", "Running", phase="optuna")

        n_trials = execution_profile["n_trials"]
        # -inf, not -1: negated error metrics (e.g. -MAE) are routinely below -1.
        best_overall_score = float("-inf")
        final_model = None
        winner_pool_name = None

        for candidate in top_candidates:
            name = candidate["name"]
            study = optuna.create_study(direction="maximize")
            study.optimize(
                self._make_objective(ctx, name),
                n_trials=n_trials,
                timeout=execution_profile["timeout"],
                # A configuration that raises becomes a FAILED trial rather than a
                # fake score; returning 0 used to outrank every negated error metric.
                catch=(Exception,),
            )

            completed = [
                t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE
            ]
            best_value = float(study.best_value) if completed else None
            best_params = study.best_params if completed else None

            opt_row = next((r for r in model_debug_rows if r["model"] == name), None)
            if opt_row is not None:
                opt_row["optimized"] = True
                opt_row["optuna_trials"] = n_trials
                opt_row["best_cv_score"] = (
                    round(best_value * 100, 2) if best_value is not None else None
                )
                opt_row["best_params"] = best_params

            if best_value is None:
                ctx.reasoning.append(
                    f"Optuna: no trial completed for {name}; it cannot win the optimization stage."
                )
                continue

            ctx.record_history(
                f"{name} CV",
                round(best_value * 100, 2),
                phase="optuna",
                status="ok",
            )
            if best_value > best_overall_score:
                best_overall_score = best_value
                final_model = ctx.model_pool[name].__class__(**best_params)
                final_model = self._apply_imbalance_strategy(final_model, ctx)
                winner_pool_name = name

        return final_model, winner_pool_name

    def execute(self, ctx: PipelineContext):
        """Select and fit the final model; sets ``ctx.final_model`` to a fitted Pipeline.

        Also sets ``ctx.sweep_results``, ``ctx.tested_models``,
        ``ctx.winner_pool_name`` and ``ctx.execution_profile``.

        Raises:
            ValueError: if the model pool is empty or every sweep model failed.
        """
        execution_profile = self._execution_profile(ctx)

        # Validate model pool
        if not ctx.model_pool:
            raise ValueError(
                "ModelSelectionComponent failed to populate model_pool. No models available for training."
            )

        ctx.reasoning.append(
            f"ExecutionProfile: goal={ctx.goal}, mode={ctx.mode}, "
            f"models={list(ctx.model_pool.keys())}, sweep_size={execution_profile['sweep_size']}, "
            f"top_k={execution_profile['top_k']}, optuna={execution_profile['run_optuna']}"
        )

        sweep_results, model_debug_rows = self._run_sweep(ctx, execution_profile)
        top_candidates = sweep_results[: execution_profile["top_k"]]
        ctx.sweep_results = sweep_results
        ctx.tested_models = model_debug_rows

        if not top_candidates:
            failed_models = [
                (r["model"], r["error"])
                for r in model_debug_rows
                if r["status"] == "failed"
            ]
            error_summary = "\n".join(
                f"  {name}: {error}" for name, error in failed_models
            )
            raise ValueError(
                f"No candidate models completed the exploration sweep.\nFailed models:\n{error_summary}"
            )

        if not execution_profile["run_optuna"]:
            ctx.reasoning.append(
                "Execution: Fast mode selected. Skipping Bayesian Opt."
            )
            final_model = top_candidates[0]["model"]
            winner_pool_name = top_candidates[0]["name"]
        else:
            final_model, winner_pool_name = self._optimize_candidates(
                ctx, top_candidates, model_debug_rows, execution_profile
            )

        # `is None`, not truthiness: some estimators (e.g. sklearn ensembles) define
        # __len__, which raises on an unfitted instance.
        if final_model is None:
            final_model = top_candidates[0]["model"]
            winner_pool_name = top_candidates[0]["name"]

        ctx.final_model = final_model
        ctx.winner_pool_name = winner_pool_name

        if ctx.config.get("handle_imbalance") and ctx.is_classification:
            ctx.reasoning.append(
                "ImbalanceStrategy: Enabled balanced weighting for supported classifiers."
            )

        ctx.reasoning.append(
            "🏁 Training final production pipe on full training split..."
        )
        final_pipe = Pipeline(
            [("preprocessor", ctx.preprocessor), ("model", final_model)]
        )
        final_pipe.fit(ctx.X_train, ctx.y_train)
        ctx.final_model = final_pipe
        ctx.execution_profile = execution_profile


class EvaluationComponent(PipelineComponent):
    """Score the final pipeline on the test split, explain it, and persist/track the run."""

    def get_step_type(self) -> PipelineStep:
        return PipelineStep.EVALUATE

    @staticmethod
    def _optional_version(module_name: str) -> Any:
        """Return ``module.__version__``, or ``"not_installed"`` if the import fails."""
        try:
            module = importlib.import_module(module_name)
        except Exception:  # ImportError, or native-library load errors
            return "not_installed"
        return getattr(module, "__version__", None)

    def _package_versions(self) -> Dict[str, Any]:
        """Collect interpreter, platform and library versions for reproducibility."""
        return {
            "python": sys.version.split()[0],
            "platform": platform.platform(),
            "pandas": getattr(pd, "__version__", None),
            "numpy": getattr(np, "__version__", None),
            "scikit_learn": self._optional_version("sklearn"),
            # Optional dependencies: a missing package must not crash evaluation.
            "xgboost": self._optional_version("xgboost"),
            "optuna": getattr(optuna, "__version__", None),
            "shap": getattr(shap, "__version__", None),
            "lightgbm": self._optional_version("lightgbm"),
        }

    def _shap_summary(self, ctx: PipelineContext, top_n: int = 8) -> Dict[str, float]:
        """Return the ``top_n`` features ranked by mean |SHAP| on up to 50 test rows.

        Best-effort: on any failure the reason is logged to ``ctx.reasoning`` and
        whatever was collected so far is returned.
        """
        summary: Dict[str, float] = {}
        try:
            X_test_proc = ctx.preprocessor.transform(ctx.X_test)
            underlying_model = ctx.final_model.named_steps["model"]
            explainer = shap.Explainer(underlying_model, X_test_proc)
            shap_vals = explainer(X_test_proc[:50])
            importances = np.abs(shap_vals.values).mean(axis=0)
            if importances.ndim > 1:
                # Multi-output models yield one column per output; average them.
                importances = importances.mean(axis=1)
            f_names = ctx.preprocessor.get_feature_names_out()
            for i in np.argsort(importances)[::-1][:top_n]:
                # Drop the ColumnTransformer "<branch>__" prefix.
                summary[str(f_names[i]).split("__")[-1]] = float(importances[i])
        except Exception as e:
            ctx.reasoning.append(f"Explainability SHAP skipped ({e})")
        return summary

    def _build_leaderboard(self, ctx: PipelineContext, preds, final_sc):
        """Winner (holdout metrics) first, followed by the other sweep results."""
        lb = [
            {"model": ctx.winner_pool_name, "score": final_sc, "phase": "holdout_test"}
        ]
        for r in ctx.sweep_results:
            if r["name"] == ctx.winner_pool_name:
                continue
            row = {
                "model": r["name"],
                "score": round(r["score"] * 100, 1),
                "phase": "sweep",
            }
            for k in ("precision", "recall", "f1", "mse", "mae"):
                if k in r:
                    row[k] = r[k]
            lb.append(row)

        winner = lb[0]
        if ctx.is_classification:
            for key, metric_fn in (
                ("precision", precision_score),
                ("recall", recall_score),
                ("f1", f1_score),
            ):
                winner[key] = round(
                    float(
                        metric_fn(
                            ctx.y_test, preds, average="weighted", zero_division=0
                        )
                    )
                    * 100,
                    1,
                )
        else:
            winner["mse"] = round(float(mean_squared_error(ctx.y_test, preds)), 6)
            winner["mae"] = round(float(mean_absolute_error(ctx.y_test, preds)), 6)
        return lb

    def execute(self, ctx: PipelineContext):
        """Evaluate ``ctx.final_model`` on the test split and persist all artifacts.

        Sets ``ctx.final_score``, ``ctx.shap_summary``, ``ctx.leaderboard`` and
        ``ctx.metrics``; saves the model and metrics and logs the run to tracking.
        """
        preds = ctx.final_model.predict(ctx.X_test)
        execution_profile = getattr(
            ctx, "execution_profile", None
        ) or TrainingComponent()._execution_profile(ctx)
        sweep_size = execution_profile["sweep_size"]

        score = (
            accuracy_score(ctx.y_test, preds)
            if ctx.is_classification
            else r2_score(ctx.y_test, preds)
        )
        ctx.final_score = score

        # Explainability
        shap_summary = self._shap_summary(ctx)
        ctx.shap_summary = shap_summary

        final_sc = round(score * 100, 1)
        ctx.record_history("Final", final_sc, phase="holdout_test", status="ok")

        lb = self._build_leaderboard(ctx, preds, final_sc)

        winner_debug = next(
            (r for r in ctx.tested_models if r["model"] == ctx.winner_pool_name), None
        )
        if winner_debug is not None:
            winner_debug["phase"] = "holdout_test"
            winner_debug["holdout_score"] = final_sc
            winner_debug["winner"] = True
            if ctx.is_classification:
                winner_debug["precision"] = lb[0].get("precision")
                winner_debug["recall"] = lb[0].get("recall")
                winner_debug["f1"] = lb[0].get("f1")
            else:
                winner_debug["mse"] = lb[0].get("mse")
                winner_debug["mae"] = lb[0].get("mae")

        ctx.leaderboard = lb

        # Best-effort: a missing/unreadable schema file only blanks the snapshot.
        schema_hash = None
        schema_snapshot = {}
        try:
            schema_path = get_schema_path(ctx.job_id)
            with open(schema_path, "r", encoding="utf-8") as handle:
                schema_snapshot = json.load(handle)
            schema_hash = schema_snapshot.get("hash")
        except Exception:
            schema_snapshot = {}

        with get_db() as db:
            dataset_row = (
                db.query(DatasetModel).filter(DatasetModel.id == ctx.dataset_id).first()
            )
            # Copy attributes while the session is open, in case the ORM instance
            # is expired/detached once the context manager closes it.
            parent_dataset_id = dataset_row.parent_dataset_id if dataset_row else None
            dataset_source_type = dataset_row.source_type if dataset_row else None

        eval_metric_requested = ctx.config.get("eval_metric") or (
            "Accuracy" if ctx.is_classification else "R²"
        )
        cv_folds_used = int(ctx.config.get("cv_folds", 0) or 0)
        selected_features = list(ctx.config.get("selected_features") or [])

        reproducibility = {
            "job_id": ctx.job_id,
            "dataset_id": ctx.dataset_id,
            "parent_dataset_id": parent_dataset_id,
            "dataset_source_type": dataset_source_type,
            "schema_hash": schema_hash,
            "selected_features": selected_features,
            "selected_feature_count": len(selected_features)
            or len(ctx.num_cols + ctx.cat_cols),
            "auto_clean": bool(ctx.config.get("auto_clean", True)),
            "handle_imbalance": bool(ctx.config.get("handle_imbalance", False)),
            "cv_folds_used": cv_folds_used,
            "eval_metric_requested": eval_metric_requested,
            "train_test_split_random_state": 42,
            "stability_seeds": [42, 123, 999],
            "schema_column_count": len((schema_snapshot.get("schema") or {}).keys()),
            "package_versions": self._package_versions(),
        }

        metadata = {
            "task_type": "classification" if ctx.is_classification else "regression",
            "eval_metric_requested": eval_metric_requested,
            "cv_folds_used": cv_folds_used,
            "preprocessor": (
                "full_column_transformer"
                if execution_profile["use_full_preprocessor"]
                else "lite_column_transformer"
            ),
            "feature_names": ctx.num_cols + ctx.cat_cols,
            "pca_applied": False,
            "reproducibility": reproducibility,
        }

        # Detect a fitted PCA step in the numeric branch, if the preprocessor has one.
        pca_components_used = None
        try:
            num_pipeline = ctx.preprocessor.named_transformers_.get("num")
            if num_pipeline and "pca" in getattr(num_pipeline, "named_steps", {}):
                pca_components_used = int(
                    num_pipeline.named_steps["pca"].n_components_
                )
        except Exception:
            pca_components_used = None
        ctx.eda_summary["pca_applied"] = bool(pca_components_used)
        ctx.eda_summary["pca_components_used"] = pca_components_used
        metadata["pca_applied"] = bool(pca_components_used)
        metadata["pca_components_used"] = pca_components_used

        # Save explicit model using ModelRegistry
        ModelRegistry.save_model(ctx.job_id, ctx.final_model, metadata)

        ctx.metrics = {
            "best_model": ctx.winner_pool_name,
            "metric_name": "Accuracy" if ctx.is_classification else "R² Score",
            "score": final_sc,
            "leaderboard": lb,
            "is_classification": ctx.is_classification,
            "shap_summary": shap_summary,
            "model_path": get_model_path(ctx.job_id),
            "feature_names": ctx.num_cols + ctx.cat_cols,
            "target": ctx.target_column,
            "eda_summary": ctx.eda_summary,
            "model_metadata": metadata,
            "reasoning": ctx.reasoning,
            "goal": ctx.goal,
            "mode": ctx.mode,
            "execution_profile": execution_profile,
            "tested_models": ctx.tested_models,
            "reproducibility_snapshot": reproducibility,
            "dataset_lineage_snapshot": {
                "dataset_id": ctx.dataset_id,
                "parent_dataset_id": parent_dataset_id,
                "source_type": dataset_source_type,
            },
        }
        save_metrics(ctx.job_id, ctx.metrics)

        MLTracking.log_run(
            job_id=ctx.job_id,
            params={
                "best_model": ctx.winner_pool_name,
                "mode": ctx.mode,
                "goal": ctx.goal,
                "sweep_size": sweep_size,
                "effective_sweep_rows": execution_profile.get("effective_sweep_rows"),
                "top_k": execution_profile["top_k"],
                "optuna_trials": execution_profile["n_trials"],
                "cv_folds": ctx.config.get("cv_folds", 0),
                "metric_name": ctx.metrics["metric_name"],
            },
            metrics=ctx.metrics,
            model=ctx.final_model,
            artifact_path=get_model_path(ctx.job_id),
        )