Data-Science-Agent / src / tools / autogluon_training.py
Pulastya B
Fixed bugs where SSE streaming was improper, added support for AutoGluon, and fixed instances where stale schemas were causing EDA plot generation to fail
1ca2e0e
"""
AutoGluon-Powered Training Tools
Replaces manual model training with AutoGluon's automated ML for better accuracy,
automatic ensembling, and built-in handling of raw data (no pre-encoding needed).
Supports:
- Classification (binary + multiclass)
- Regression
- Time Series Forecasting (NEW capability)
Scalability safeguards:
- time_limit prevents runaway training
- presets control compute budget
- num_cpus capped to avoid hogging shared resources
- Memory-aware: excludes heavy models on limited RAM
"""
import os
import json
import time
import shutil
import warnings
from typing import Dict, Any, Optional, List
from pathlib import Path
import pandas as pd
import numpy as np
warnings.filterwarnings('ignore')
# Lazy import AutoGluon to avoid slow startup
AUTOGLUON_TABULAR_AVAILABLE = False
AUTOGLUON_TIMESERIES_AVAILABLE = False
def _ensure_autogluon_tabular():
global AUTOGLUON_TABULAR_AVAILABLE
try:
from autogluon.tabular import TabularPredictor, TabularDataset
AUTOGLUON_TABULAR_AVAILABLE = True
return TabularPredictor, TabularDataset
except ImportError:
raise ImportError(
"AutoGluon tabular not installed. Run: pip install autogluon.tabular"
)
def _ensure_autogluon_timeseries():
global AUTOGLUON_TIMESERIES_AVAILABLE
try:
from autogluon.timeseries import TimeSeriesPredictor, TimeSeriesDataFrame
AUTOGLUON_TIMESERIES_AVAILABLE = True
return TimeSeriesPredictor, TimeSeriesDataFrame
except ImportError:
raise ImportError(
"AutoGluon timeseries not installed. Run: pip install autogluon.timeseries"
)
# ============================================================
# RESOURCE CONFIGURATION
# Adapt to deployment environment (HF Spaces, local, cloud)
# ============================================================
def _get_resource_config() -> Dict[str, Any]:
    """Probe host RAM/CPU and build a conservative AutoGluon training config.

    Keeps training polite on shared infrastructure (e.g. HF Spaces) by
    capping CPUs, disabling GPUs, and scaling presets, excluded model
    families, and the time limit with available memory.
    """
    import psutil

    ram_gb = psutil.virtual_memory().total / (1024 ** 3)
    cpus = os.cpu_count() or 2

    # Tiered budgets, highest RAM floor first:
    # (min RAM in GB, presets, excluded model families, time limit in seconds)
    tiers = [
        (16, "best_quality", ["NN_TORCH"], 180),  # roomy box; still skip torch NNs for speed
        (8, "medium_quality", ["NN_TORCH", "FASTAI"], 120),
        (0, "medium_quality", ["NN_TORCH", "FASTAI", "KNN"], 60),
    ]
    for min_ram, presets, excluded, limit in tiers:
        if ram_gb >= min_ram:
            return {
                "num_cpus": min(cpus, 4),  # cap at 4 to leave room for other users
                "num_gpus": 0,             # no GPU on free HF Spaces
                "presets": presets,
                "excluded_model_types": excluded,
                "time_limit": limit,
            }
# ============================================================
# TABULAR: Classification + Regression
# ============================================================
def train_with_autogluon(
    file_path: str,
    target_col: str,
    task_type: str = "auto",
    time_limit: int = 120,
    presets: str = "medium_quality",
    eval_metric: Optional[str] = None,
    output_dir: Optional[str] = None,
    infer_limit: Optional[float] = None
) -> Dict[str, Any]:
    """
    Train ML models using AutoGluon's automated approach.
    Handles raw data directly — no need to pre-encode categoricals or impute missing values.
    Automatically trains multiple models, performs stacking, and returns the best ensemble.
    Supports: classification (binary/multiclass), regression.
    Args:
        file_path: Path to CSV/Parquet dataset
        target_col: Column to predict
        task_type: 'classification', 'regression', or 'auto' (auto-detected)
        time_limit: Max training time in seconds (default 120 = 2 minutes);
            capped by the resource-aware limit from _get_resource_config()
        presets: Quality preset - 'medium_quality' (fast), 'best_quality' (slower, better),
            'good_quality' (balanced)
        eval_metric: Metric to optimize (auto-selected if None).
            Classification: 'accuracy', 'f1', 'roc_auc', 'log_loss'
            Regression: 'rmse', 'mae', 'r2', 'mape'
        output_dir: Where to save trained model (default: ./outputs/autogluon_model)
        infer_limit: Optional per-row inference time budget forwarded to fit()
    Returns:
        Dictionary with training results, leaderboard, best model info, and feature importance.
        On any failure returns {"status": "error", "message": ...} instead of raising.
    """
    TabularPredictor, TabularDataset = _ensure_autogluon_tabular()
    start_time = time.time()
    output_dir = output_dir or "./outputs/autogluon_model"

    # ── Validate input ──
    if not Path(file_path).exists():
        return {"status": "error", "message": f"File not found: {file_path}"}

    # ── Load data ──
    print(f"\n🚀 AutoGluon Training Starting...")
    print(f" 📁 Dataset: {file_path}")
    print(f" 🎯 Target: {target_col}")
    print(f" ⏱️ Time limit: {time_limit}s")
    print(f" 📊 Presets: {presets}")
    try:
        train_data = TabularDataset(file_path)
    except Exception as e:
        return {"status": "error", "message": f"Failed to load data: {str(e)}"}
    if target_col not in train_data.columns:
        return {
            "status": "error",
            "message": f"Target column '{target_col}' not found. Available: {list(train_data.columns)}"
        }
    n_rows, n_cols = train_data.shape
    print(f" 📐 Shape: {n_rows:,} rows × {n_cols} columns")

    # ── Get resource-aware config ──
    resource_config = _get_resource_config()
    # The user's time limit is honored only up to the safe cap for this host;
    # the user's presets choice is kept as-is by design.
    effective_time_limit = min(time_limit, resource_config["time_limit"])
    effective_presets = presets

    # ── Auto-detect task type ──
    if task_type == "auto":
        n_unique = train_data[target_col].nunique()
        # Few distinct values or a string dtype → treat as classification.
        if n_unique <= 20 or train_data[target_col].dtype == 'object':
            task_type = "classification"
            task_type_detail = "binary" if n_unique == 2 else "multiclass"
        else:
            task_type = "regression"
            task_type_detail = "regression"
    else:
        task_type_detail = task_type

    # ── Select eval metric ──
    if eval_metric is None:
        if task_type == "classification":
            eval_metric = "f1_weighted" if task_type_detail == "multiclass" else "f1"
        else:
            eval_metric = "root_mean_squared_error"

    # BUG FIX: TabularPredictor only accepts 'binary', 'multiclass', 'regression'
    # (or None for auto-inference) as problem_type — never 'classification' or
    # 'auto'. The previous code passed task_type directly, which crashed
    # training whenever classification was detected or requested.
    _VALID_PROBLEM_TYPES = {"binary", "multiclass", "regression"}
    problem_type = task_type_detail if task_type_detail in _VALID_PROBLEM_TYPES else None

    print(f" 🔍 Task type: {task_type_detail}")
    print(f" 📏 Eval metric: {eval_metric}")
    print(f" 🔧 Excluded models: {resource_config.get('excluded_model_types', [])}")

    # ── Clean output directory (AutoGluon needs fresh dir) ──
    if Path(output_dir).exists():
        shutil.rmtree(output_dir, ignore_errors=True)

    # ── Train ──
    try:
        predictor = TabularPredictor(
            label=target_col,
            eval_metric=eval_metric,
            path=output_dir,
            problem_type=problem_type
        )
        fit_kwargs = dict(
            train_data=train_data,
            time_limit=effective_time_limit,
            presets=effective_presets,
            excluded_model_types=resource_config.get("excluded_model_types", []),
            num_cpus=resource_config["num_cpus"],
            num_gpus=resource_config["num_gpus"],
            verbosity=1
        )
        if infer_limit is not None:
            fit_kwargs["infer_limit"] = infer_limit
        predictor.fit(**fit_kwargs)
    except Exception as e:
        return {"status": "error", "message": f"Training failed: {str(e)}"}
    elapsed = time.time() - start_time

    # ── Extract results ──
    leaderboard = predictor.leaderboard(silent=True)
    # Convert leaderboard to serializable format (top 10 models only)
    leaderboard_data = []
    for _, row in leaderboard.head(10).iterrows():
        entry = {
            "model": str(row.get("model", "")),
            "score_val": round(float(row.get("score_val", 0)), 4),
            "fit_time": round(float(row.get("fit_time", 0)), 1),
            "pred_time_val": round(float(row.get("pred_time_val", 0)), 3),
        }
        if "stack_level" in row:
            entry["stack_level"] = int(row["stack_level"])
        leaderboard_data.append(entry)

    # Best model info (leaderboard row 0 is the best by validation score)
    best_model = predictor.model_best
    best_score = float(leaderboard.iloc[0]["score_val"]) if len(leaderboard) > 0 else None

    # Feature importance (top 20). Computed on the training data, so treat the
    # values as indicative rather than unbiased.
    feature_importance_data = []
    try:
        fi = predictor.feature_importance(train_data, silent=True)
        for feat, row in fi.head(20).iterrows():
            feature_importance_data.append({
                "feature": str(feat),
                "importance": round(float(row.get("importance", 0)), 4),
                "p_value": round(float(row.get("p_value", 1)), 4) if "p_value" in row else None
            })
    except Exception:
        # feature_importance can fail on some model types
        pass

    # Model count
    n_models = len(leaderboard)

    # Summary
    results = {
        "status": "success",
        "task_type": task_type_detail,
        "eval_metric": eval_metric,
        "best_model": best_model,
        "best_score": best_score,
        "n_models_trained": n_models,
        "n_rows": n_rows,
        "n_features": n_cols - 1,
        "training_time_seconds": round(elapsed, 1),
        "time_limit_used": effective_time_limit,
        "presets": effective_presets,
        "leaderboard": leaderboard_data,
        "feature_importance": feature_importance_data,
        "model_path": output_dir,
        "output_path": output_dir,
    }

    # ── Print summary ──
    print(f"\n{'='*60}")
    print(f"✅ AUTOGLUON TRAINING COMPLETE")
    print(f"{'='*60}")
    print(f"📊 Models trained: {n_models}")
    print(f"🏆 Best model: {best_model}")
    # BUG FIX: the old truthiness check also skipped printing a legitimate 0.0
    # score; only skip when no score exists at all.
    if best_score is not None:
        print(f"📈 Best {eval_metric}: {best_score:.4f}")
    print(f"⏱️ Total time: {elapsed:.1f}s")
    print(f"💾 Model saved: {output_dir}")
    if leaderboard_data:
        print(f"\n📋 Top 5 Leaderboard:")
        for i, entry in enumerate(leaderboard_data[:5], 1):
            print(f" {i}. {entry['model']}: {entry['score_val']:.4f} (fit: {entry['fit_time']:.1f}s)")
    if feature_importance_data:
        print(f"\n🔑 Top 5 Features:")
        for fi_entry in feature_importance_data[:5]:
            print(f" • {fi_entry['feature']}: {fi_entry['importance']:.4f}")
    print(f"{'='*60}\n")
    return results
def predict_with_autogluon(
    model_path: str,
    data_path: str,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """Run batch inference with a previously trained AutoGluon model.

    Loads the saved predictor, scores the new dataset, writes the input rows
    plus a 'prediction' column to CSV, and (for classifiers) attaches a small
    sample of class probabilities.

    Args:
        model_path: Path to saved AutoGluon model directory
        data_path: Path to new data for prediction
        output_path: Path to save predictions CSV (optional)
    Returns:
        Dictionary with predictions and metadata, or an error payload.
    """
    TabularPredictor, TabularDataset = _ensure_autogluon_tabular()

    # Validate both input paths up front with identical error shapes.
    for candidate, label in ((model_path, "Model"), (data_path, "Data")):
        if not Path(candidate).exists():
            return {"status": "error", "message": f"{label} not found: {candidate}"}

    try:
        predictor = TabularPredictor.load(model_path)
        frame = TabularDataset(data_path)
        preds = predictor.predict(frame)

        output_path = output_path or "./outputs/autogluon_predictions.csv"
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        augmented = frame.copy()
        augmented["prediction"] = preds.values
        augmented.to_csv(output_path, index=False)

        # predict_proba raises for regression models; treat that as "no
        # probabilities available" rather than an error.
        probabilities = None
        try:
            proba = predictor.predict_proba(frame)
        except Exception:
            pass
        else:
            probabilities = {
                "columns": list(proba.columns),
                "sample": proba.head(5).to_dict()
            }

        return {
            "status": "success",
            "n_predictions": len(preds),
            "prediction_sample": preds.head(10).tolist(),
            "output_path": output_path,
            "model_used": predictor.model_best,
            "probabilities": probabilities
        }
    except Exception as e:
        return {"status": "error", "message": f"Prediction failed: {str(e)}"}
# ============================================================
# TIME SERIES FORECASTING
# ============================================================
def forecast_with_autogluon(
    file_path: str,
    target_col: str,
    time_col: str,
    forecast_horizon: int = 30,
    id_col: Optional[str] = None,
    freq: Optional[str] = None,
    time_limit: int = 120,
    presets: str = "medium_quality",
    output_path: Optional[str] = None,
    static_features_path: Optional[str] = None,
    known_covariates_cols: Optional[List[str]] = None,
    holiday_country: Optional[str] = None,
    fill_missing: bool = True,
    models: Optional[List[str]] = None,
    quantile_levels: Optional[List[float]] = None
) -> Dict[str, Any]:
    """
    Forecast time series using AutoGluon's TimeSeriesPredictor.
    Supports multiple forecasting models automatically: DeepAR, ETS, ARIMA, Theta,
    Chronos (foundation model), and statistical ensembles.
    Enhanced with covariates, holiday features, model selection, and quantile forecasting.
    Args:
        file_path: Path to time series CSV/Parquet
        target_col: Column with values to forecast
        time_col: Column with timestamps/dates
        forecast_horizon: Number of future periods to predict
        id_col: Column identifying different series (for multi-series)
        freq: Frequency string ('D'=daily, 'h'=hourly, 'MS'=monthly, 'W'=weekly)
        time_limit: Max training time in seconds
        presets: 'fast_training', 'medium_quality', 'best_quality', or 'chronos_tiny'
        output_path: Path to save forecast CSV
        static_features_path: CSV with per-series metadata (one row per series)
        known_covariates_cols: Columns with future-known values (holidays, promotions)
        holiday_country: Country code for auto holiday features (e.g. 'US', 'UK', 'IN')
        fill_missing: Whether to auto-fill missing values in time series
        models: Specific models to train (e.g. ['ETS', 'DeepAR', 'AutoARIMA'])
        quantile_levels: Quantile levels for probabilistic forecasts (e.g. [0.1, 0.5, 0.9])
    Returns:
        Dictionary with forecasts, model performance, and leaderboard.
        Any failure short-circuits to {"status": "error", "message": ...}.
    """
    TimeSeriesPredictor, TimeSeriesDataFrame = _ensure_autogluon_timeseries()
    start_time = time.time()
    # Model artifacts always land in a fixed directory; only the forecast CSV
    # location is configurable via output_path.
    output_dir = "./outputs/autogluon_ts_model"
    output_path = output_path or "./outputs/autogluon_forecast.csv"
    # ── Validate ──
    if not Path(file_path).exists():
        return {"status": "error", "message": f"File not found: {file_path}"}
    print(f"\n🚀 AutoGluon Time Series Forecasting...")
    print(f" 📁 Dataset: {file_path}")
    print(f" 🎯 Target: {target_col}")
    print(f" 📅 Time column: {time_col}")
    print(f" 🔮 Forecast horizon: {forecast_horizon} periods")
    # ── Load and prepare data ──
    # CSV is attempted first regardless of extension; any CSV failure falls
    # through to parquet before giving up.
    try:
        df = pd.read_csv(file_path)
    except Exception:
        try:
            df = pd.read_parquet(file_path)
        except Exception as e:
            return {"status": "error", "message": f"Failed to load data: {str(e)}"}
    if target_col not in df.columns:
        return {
            "status": "error",
            "message": f"Target column '{target_col}' not found. Available: {list(df.columns)}"
        }
    if time_col not in df.columns:
        return {
            "status": "error",
            "message": f"Time column '{time_col}' not found. Available: {list(df.columns)}"
        }
    # Parse datetime and sort chronologically (frequency detection below
    # relies on consecutive-row time differences).
    df[time_col] = pd.to_datetime(df[time_col])
    df = df.sort_values(time_col)
    # If no id_col, create a dummy one (single series)
    if id_col is None or id_col not in df.columns:
        id_col = "__series_id"
        df[id_col] = "series_0"
    # Auto-detect frequency from the median gap between consecutive
    # timestamps; the thresholds leave slack for irregular spacing.
    if freq is None:
        time_diffs = df[time_col].diff().dropna()
        median_diff = time_diffs.median()
        if median_diff <= pd.Timedelta(hours=2):
            freq = "h"
        elif median_diff <= pd.Timedelta(days=1.5):
            freq = "D"
        elif median_diff <= pd.Timedelta(days=8):
            freq = "W"
        elif median_diff <= pd.Timedelta(days=35):
            freq = "MS"
        else:
            freq = "D"  # Default
    print(f" 📊 Frequency: {freq}")
    print(f" 📐 Shape: {df.shape[0]:,} rows")
    # ── Add holiday features (#29) ──
    # Adds a binary 'is_holiday' column and registers it as a known covariate
    # so the same flag can be regenerated for future dates at predict time.
    if holiday_country:
        try:
            import holidays as holidays_lib
            country_holidays = holidays_lib.country_holidays(holiday_country)
            df['is_holiday'] = df[time_col].dt.date.apply(
                lambda d: 1 if d in country_holidays else 0
            ).astype(float)
            if known_covariates_cols is None:
                known_covariates_cols = []
            if 'is_holiday' not in known_covariates_cols:
                known_covariates_cols.append('is_holiday')
            print(f" 🎄 Holiday features added for: {holiday_country}")
        except ImportError:
            print(f" ⚠️ 'holidays' package not installed. Skipping holiday features.")
        except Exception as e:
            print(f" ⚠️ Could not add holiday features: {e}")
    # ── Convert to TimeSeriesDataFrame ──
    try:
        ts_df = TimeSeriesDataFrame.from_data_frame(
            df,
            id_column=id_col,
            timestamp_column=time_col
        )
    except Exception as e:
        return {"status": "error", "message": f"Failed to create time series: {str(e)}"}
    # ── Attach static features (#26) ──
    # NOTE(review): AutoGluon expects the static-features frame to reference
    # the same series ids as ts_df — confirm the CSV schema matches callers.
    if static_features_path and Path(static_features_path).exists():
        try:
            static_df = pd.read_csv(static_features_path)
            ts_df.static_features = static_df
            print(f" 📌 Static features loaded: {list(static_df.columns)}")
        except Exception as e:
            print(f" ⚠️ Could not load static features: {e}")
    # ── Fill missing values (#36) ──
    # Best-effort: failures are silently ignored and training proceeds.
    if fill_missing:
        try:
            ts_df = ts_df.fill_missing_values()
            print(f" 🔧 Missing values filled")
        except Exception:
            pass
    # ── Clean output dir ──
    if Path(output_dir).exists():
        shutil.rmtree(output_dir, ignore_errors=True)
    # ── Get resource config ──
    resource_config = _get_resource_config()
    effective_time_limit = min(time_limit, resource_config["time_limit"])
    # ── Train forecasting models ──
    try:
        predictor_kwargs = dict(
            target=target_col,
            prediction_length=forecast_horizon,
            path=output_dir,
            freq=freq
        )
        if known_covariates_cols:
            predictor_kwargs["known_covariates_names"] = known_covariates_cols
        if quantile_levels:
            predictor_kwargs["quantile_levels"] = quantile_levels
        predictor = TimeSeriesPredictor(**predictor_kwargs)
        ts_fit_kwargs = dict(
            train_data=ts_df,
            time_limit=effective_time_limit,
            presets=presets,
        )
        # Passing an explicit hyperparameters mapping (model name -> empty
        # config) restricts training to exactly the requested model families.
        if models:
            ts_fit_kwargs["hyperparameters"] = {m: {} for m in models}
        predictor.fit(**ts_fit_kwargs)
    except Exception as e:
        return {"status": "error", "message": f"Time series training failed: {str(e)}"}
    elapsed = time.time() - start_time
    # ── Generate forecasts ──
    try:
        predict_kwargs = {}
        # When known covariates were declared, the predictor needs their values
        # over the forecast horizon. Only the holiday flag can be regenerated
        # here; building the future frame is best-effort, and on failure we
        # fall back to predicting without known covariates (which may raise).
        if known_covariates_cols:
            try:
                future_known = predictor.make_future_data_frame(ts_df)
                if holiday_country:
                    import holidays as holidays_lib
                    country_holidays = holidays_lib.country_holidays(holiday_country)
                    dates = future_known.index.get_level_values('timestamp')
                    future_known['is_holiday'] = [
                        1.0 if d.date() in country_holidays else 0.0 for d in dates
                    ]
                predict_kwargs["known_covariates"] = future_known
            except Exception:
                pass
        forecasts = predictor.predict(ts_df, **predict_kwargs)
    except Exception as e:
        return {"status": "error", "message": f"Forecasting failed: {str(e)}"}
    # ── Leaderboard ──
    leaderboard = predictor.leaderboard(silent=True)
    leaderboard_data = []
    for _, row in leaderboard.head(10).iterrows():
        leaderboard_data.append({
            "model": str(row.get("model", "")),
            "score_val": round(float(row.get("score_val", 0)), 4),
            "fit_time": round(float(row.get("fit_time", 0)), 1),
        })
    # Prefer the predictor's own notion of best model; fall back to the top
    # leaderboard row, then to a placeholder.
    best_model = predictor.model_best if hasattr(predictor, 'model_best') else leaderboard_data[0]["model"] if leaderboard_data else "unknown"
    best_score = leaderboard_data[0]["score_val"] if leaderboard_data else None
    # ── Save forecasts ──
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    try:
        forecast_df = forecasts.reset_index()
        forecast_df.to_csv(output_path, index=False)
    except Exception:
        # NOTE(review): intentional no-op — the path is still reported even if
        # the CSV write failed, so the caller may see a missing file.
        output_path = output_path  # Keep path but note it may not have saved
    # ── Forecast summary ──
    # Summary statistics over the mean forecast column (falls back to the
    # first column when no 'mean' column exists, e.g. quantile-only output).
    forecast_summary = {}
    try:
        mean_col = "mean" if "mean" in forecasts.columns else forecasts.columns[0]
        forecast_values = forecasts[mean_col].values
        forecast_summary = {
            "mean_forecast": round(float(np.mean(forecast_values)), 2),
            "min_forecast": round(float(np.min(forecast_values)), 2),
            "max_forecast": round(float(np.max(forecast_values)), 2),
            "forecast_std": round(float(np.std(forecast_values)), 2),
        }
    except Exception:
        pass
    results = {
        "status": "success",
        "task_type": "time_series_forecasting",
        "target_col": target_col,
        "time_col": time_col,
        "forecast_horizon": forecast_horizon,
        "frequency": freq,
        "n_series": df[id_col].nunique() if id_col != "__series_id" else 1,
        "n_data_points": len(df),
        "best_model": best_model,
        "best_score": best_score,
        "n_models_trained": len(leaderboard),
        "training_time_seconds": round(elapsed, 1),
        "leaderboard": leaderboard_data,
        "forecast_summary": forecast_summary,
        "output_path": output_path,
        "model_path": output_dir,
    }
    # ── Print summary ──
    print(f"\n{'='*60}")
    print(f"✅ TIME SERIES FORECASTING COMPLETE")
    print(f"{'='*60}")
    print(f"📊 Models trained: {len(leaderboard)}")
    print(f"🏆 Best model: {best_model}")
    print(f"📈 Best score: {best_score}")
    print(f"🔮 Forecast: {forecast_horizon} periods ahead")
    if forecast_summary:
        print(f"📉 Forecast range: {forecast_summary.get('min_forecast')} to {forecast_summary.get('max_forecast')}")
    print(f"⏱️ Total time: {elapsed:.1f}s")
    print(f"💾 Forecasts saved: {output_path}")
    if leaderboard_data:
        print(f"\n📋 Leaderboard:")
        for i, entry in enumerate(leaderboard_data[:5], 1):
            print(f" {i}. {entry['model']}: {entry['score_val']:.4f}")
    print(f"{'='*60}\n")
    return results
# ============================================================
# POST-TRAINING OPTIMIZATION (#1, #2, #6, #8, #9, #24)
# ============================================================
def optimize_autogluon_model(
    model_path: str,
    operation: str,
    data_path: Optional[str] = None,
    metric: Optional[str] = None,
    models_to_delete: Optional[List[str]] = None,
    output_dir: Optional[str] = None
) -> Dict[str, Any]:
    """
    Post-training optimization on a trained AutoGluon model.
    Operations:
    - refit_full: Re-train best models on 100% data (no held-out fold) for deployment
    - distill: Compress ensemble into a single lighter model via knowledge distillation
    - calibrate_threshold: Optimize binary classification threshold for best F1/precision/recall
    - deploy_optimize: Strip training artifacts for minimal deployment footprint
    - delete_models: Remove specific models to free resources
    Args:
        model_path: Path to saved AutoGluon model directory
        operation: One of 'refit_full', 'distill', 'calibrate_threshold', 'deploy_optimize', 'delete_models'
        data_path: Path to dataset (required for distill, calibrate_threshold)
        metric: Metric to optimize for calibrate_threshold: 'f1', 'balanced_accuracy', 'precision', 'recall'
        models_to_delete: List of model names to delete (for delete_models operation)
        output_dir: Directory for optimized model output (for deploy_optimize)
    Returns:
        Dictionary with optimization results; any exception from AutoGluon is
        converted into an {"status": "error", ...} payload.
    """
    TabularPredictor, TabularDataset = _ensure_autogluon_tabular()
    if not Path(model_path).exists():
        return {"status": "error", "message": f"Model not found: {model_path}"}
    try:
        predictor = TabularPredictor.load(model_path)
    except Exception as e:
        return {"status": "error", "message": f"Failed to load model: {str(e)}"}
    print(f"\n🔧 AutoGluon Model Optimization: {operation}")
    print(f" 📁 Model: {model_path}")
    try:
        if operation == "refit_full":
            # refit_full() returns a mapping of original model name -> refit
            # ("_FULL") model name; we report the new names.
            refit_map = predictor.refit_full()
            refit_models = list(refit_map.values())
            new_leaderboard = predictor.leaderboard(silent=True)
            leaderboard_data = []
            for _, row in new_leaderboard.head(10).iterrows():
                leaderboard_data.append({
                    "model": str(row.get("model", "")),
                    "score_val": round(float(row.get("score_val", 0)), 4),
                })
            print(f" ✅ Models refit on 100% data: {refit_models}")
            return {
                "status": "success",
                "operation": "refit_full",
                "message": "Models re-trained on 100% data (no held-out folds) for deployment",
                "refit_models": refit_models,
                "original_best": predictor.model_best,
                "leaderboard": leaderboard_data,
                "model_path": model_path
            }
        elif operation == "distill":
            if not data_path or not Path(data_path).exists():
                return {"status": "error", "message": "data_path required for distillation"}
            train_data = TabularDataset(data_path)
            resource_config = _get_resource_config()
            # 'spunge' augmentation synthesizes extra training rows so the
            # student model can better mimic the teacher ensemble.
            distilled = predictor.distill(
                train_data=train_data,
                time_limit=resource_config["time_limit"],
                augment_method='spunge'
            )
            new_leaderboard = predictor.leaderboard(silent=True)
            leaderboard_data = []
            for _, row in new_leaderboard.head(10).iterrows():
                leaderboard_data.append({
                    "model": str(row.get("model", "")),
                    "score_val": round(float(row.get("score_val", 0)), 4),
                })
            print(f" ✅ Ensemble distilled into: {distilled}")
            return {
                "status": "success",
                "operation": "distill",
                "message": "Ensemble distilled into lighter model(s) via knowledge distillation",
                "distilled_models": distilled,
                "best_model": predictor.model_best,
                "leaderboard": leaderboard_data,
                "model_path": model_path
            }
        elif operation == "calibrate_threshold":
            if not data_path or not Path(data_path).exists():
                return {"status": "error", "message": "data_path required for threshold calibration"}
            if predictor.problem_type != 'binary':
                return {"status": "error", "message": "Threshold calibration only works for binary classification"}
            test_data = TabularDataset(data_path)
            metric = metric or "f1"
            # NOTE(review): recent AutoGluon releases return only the threshold
            # (a bare float) from calibrate_decision_threshold — confirm this
            # tuple unpacking against the installed AutoGluon version.
            threshold, score = predictor.calibrate_decision_threshold(
                data=test_data,
                metric=metric
            )
            print(f" ✅ Optimal threshold: {threshold:.4f} ({metric}={score:.4f})")
            return {
                "status": "success",
                "operation": "calibrate_threshold",
                "optimal_threshold": round(float(threshold), 4),
                "score_at_threshold": round(float(score), 4),
                "metric": metric,
                "message": f"Optimal threshold: {threshold:.4f} (default was 0.5), {metric}={score:.4f}",
                "model_path": model_path
            }
        elif operation == "deploy_optimize":
            output_dir = output_dir or model_path + "_deploy"
            # Directory size in MB before cloning, for the reduction report.
            size_before = sum(
                f.stat().st_size for f in Path(model_path).rglob('*') if f.is_file()
            ) / (1024 * 1024)
            # clone_for_deployment copies only what inference needs;
            # save_space() then strips remaining training artifacts in place.
            deploy_path = predictor.clone_for_deployment(output_dir)
            deploy_predictor = TabularPredictor.load(deploy_path)
            deploy_predictor.save_space()
            size_after = sum(
                f.stat().st_size for f in Path(deploy_path).rglob('*') if f.is_file()
            ) / (1024 * 1024)
            print(f" ✅ Optimized: {size_before:.1f}MB → {size_after:.1f}MB")
            return {
                "status": "success",
                "operation": "deploy_optimize",
                "message": f"Model optimized for deployment: {size_before:.1f}MB → {size_after:.1f}MB ({(1-size_after/max(size_before,0.01))*100:.0f}% reduction)",
                "size_before_mb": round(size_before, 1),
                "size_after_mb": round(size_after, 1),
                "deploy_path": str(deploy_path),
                "best_model": deploy_predictor.model_best
            }
        elif operation == "delete_models":
            if not models_to_delete:
                return {"status": "error", "message": "models_to_delete list required"}
            before_count = len(predictor.model_names())
            # dry_run=False actually removes the model files from disk.
            predictor.delete_models(models_to_delete=models_to_delete, dry_run=False)
            after_count = len(predictor.model_names())
            print(f" ✅ Deleted {before_count - after_count} models")
            return {
                "status": "success",
                "operation": "delete_models",
                "message": f"Deleted {before_count - after_count} models ({before_count}{after_count})",
                "remaining_models": predictor.model_names(),
                "best_model": predictor.model_best,
                "model_path": model_path
            }
        else:
            return {
                "status": "error",
                "message": f"Unknown operation '{operation}'. Choose: refit_full, distill, calibrate_threshold, deploy_optimize, delete_models"
            }
    except Exception as e:
        return {"status": "error", "message": f"Optimization failed: {str(e)}"}
# ============================================================
# MODEL ANALYSIS & INSPECTION (#19 + extended leaderboard)
# ============================================================
def analyze_autogluon_model(
    model_path: str,
    data_path: Optional[str] = None,
    operation: str = "summary"
) -> Dict[str, Any]:
    """
    Inspect and analyze a trained AutoGluon model.
    Operations:
    - summary: Extended leaderboard with detailed model info (stack levels, memory, etc.)
    - transform_features: Returns the internally transformed feature matrix
    - info: Comprehensive model metadata and training summary
    Args:
        model_path: Path to saved AutoGluon model directory
        data_path: Path to dataset (required for transform_features)
        operation: One of 'summary', 'transform_features', 'info'
    Returns:
        Dictionary with analysis results; AutoGluon exceptions are converted
        into an {"status": "error", ...} payload.
    """
    TabularPredictor, TabularDataset = _ensure_autogluon_tabular()
    if not Path(model_path).exists():
        return {"status": "error", "message": f"Model not found: {model_path}"}
    try:
        predictor = TabularPredictor.load(model_path)
    except Exception as e:
        return {"status": "error", "message": f"Failed to load model: {str(e)}"}
    try:
        if operation == "summary":
            # extra_info=True adds per-model details (stack level, memory, ...)
            leaderboard = predictor.leaderboard(extra_info=True, silent=True)
            leaderboard_data = []
            for _, row in leaderboard.iterrows():
                entry = {"model": str(row.get("model", ""))}
                for col in leaderboard.columns:
                    if col == "model":
                        continue
                    val = row[col]
                    try:
                        # BUG FIX: numpy integers (np.int64 etc.) failed the
                        # original isinstance check and were stringified;
                        # include np.integer so numeric columns stay numeric.
                        if isinstance(val, (int, float, np.integer, np.floating)):
                            entry[str(col)] = round(float(val), 4)
                        else:
                            entry[str(col)] = str(val)
                    except (ValueError, TypeError):
                        entry[str(col)] = str(val)
                leaderboard_data.append(entry)
            return {
                "status": "success",
                "operation": "summary",
                "best_model": predictor.model_best,
                "problem_type": predictor.problem_type,
                "eval_metric": str(predictor.eval_metric),
                "n_models": len(leaderboard),
                "model_names": predictor.model_names(),
                "leaderboard": leaderboard_data
            }
        elif operation == "transform_features":
            if not data_path or not Path(data_path).exists():
                return {"status": "error", "message": "data_path required for transform_features"}
            data = TabularDataset(data_path)
            # Apply the predictor's internal feature-engineering pipeline.
            transformed = predictor.transform_features(data)
            output_path = "./outputs/autogluon_transformed_features.csv"
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            transformed.to_csv(output_path, index=False)
            return {
                "status": "success",
                "operation": "transform_features",
                "original_shape": list(data.shape),
                "transformed_shape": list(transformed.shape),
                "original_columns": list(data.columns[:20]),
                "transformed_columns": list(transformed.columns[:30]),
                "output_path": output_path,
                "message": f"Features transformed: {data.shape[1]} original → {transformed.shape[1]} engineered"
            }
        elif operation == "info":
            info = predictor.info()
            # Keep only JSON-serializable values; stringify everything else so
            # the payload can always be serialized downstream.
            safe_info = {}
            for key, val in info.items():
                try:
                    json.dumps(val)
                    safe_info[key] = val
                except (TypeError, ValueError):
                    safe_info[key] = str(val)
            return {
                "status": "success",
                "operation": "info",
                "model_info": safe_info
            }
        else:
            return {
                "status": "error",
                "message": f"Unknown operation '{operation}'. Choose: summary, transform_features, info"
            }
    except Exception as e:
        return {"status": "error", "message": f"Analysis failed: {str(e)}"}
# ============================================================
# INCREMENTAL TRAINING (#3, #5)
# ============================================================
def extend_autogluon_training(
    model_path: str,
    operation: str = "fit_extra",
    data_path: Optional[str] = None,
    time_limit: int = 60,
    hyperparameters: Optional[Dict] = None
) -> Dict[str, Any]:
    """Extend an already-trained AutoGluon predictor without retraining from scratch.

    Operations:
    - fit_extra: Train additional models/hyperparameters on top of the existing ones
    - fit_weighted_ensemble: Re-fit the weighted ensemble layer over existing base models

    Args:
        model_path: Path to saved AutoGluon model directory
        operation: 'fit_extra' or 'fit_weighted_ensemble'
        data_path: Path to training data (required for fit_extra)
        time_limit: Additional training time in seconds
        hyperparameters: Model hyperparameters dict for fit_extra,
            e.g. {"GBM": {"num_boost_round": 500}, "RF": {}}
    Returns:
        Dictionary with before/after model counts, new models, and leaderboard,
        or an error payload.
    """
    TabularPredictor, TabularDataset = _ensure_autogluon_tabular()
    if not Path(model_path).exists():
        return {"status": "error", "message": f"Model not found: {model_path}"}
    try:
        predictor = TabularPredictor.load(model_path)
    except Exception as e:
        return {"status": "error", "message": f"Failed to load model: {str(e)}"}

    existing = predictor.model_names()
    print(f"\n🔧 Extending AutoGluon Model: {operation}")
    print(f" 📁 Model: {model_path}")
    print(f" 📊 Current models: {len(existing)}")
    try:
        if operation == "fit_extra":
            if not data_path or not Path(data_path).exists():
                return {"status": "error", "message": "data_path required for fit_extra"}
            budget = _get_resource_config()
            # Default grid: gradient-boosting and random-forest variants with
            # distinguishing name suffixes.
            default_hp = {
                "GBM": [
                    {"extra_trees": True, "ag_args": {"name_suffix": "XT"}},
                    {"num_boost_round": 500},
                ],
                "RF": [
                    {"criterion": "gini", "ag_args": {"name_suffix": "Gini"}},
                    {"criterion": "entropy", "ag_args": {"name_suffix": "Entr"}},
                ],
            }
            predictor.fit_extra(
                hyperparameters=hyperparameters or default_hp,
                time_limit=min(time_limit, budget["time_limit"]),
                num_cpus=budget["num_cpus"],
                num_gpus=0
            )
        elif operation == "fit_weighted_ensemble":
            predictor.fit_weighted_ensemble()
        else:
            return {
                "status": "error",
                "message": f"Unknown operation '{operation}'. Choose: fit_extra, fit_weighted_ensemble"
            }

        updated = predictor.model_names()
        board = predictor.leaderboard(silent=True)
        top_rows = [
            {
                "model": str(row.get("model", "")),
                "score_val": round(float(row.get("score_val", 0)), 4),
                "fit_time": round(float(row.get("fit_time", 0)), 1),
            }
            for _, row in board.head(10).iterrows()
        ]
        added = [name for name in updated if name not in existing]
        print(f" ✅ New models added: {len(added)}")
        print(f" 🏆 Best model: {predictor.model_best}")
        return {
            "status": "success",
            "operation": operation,
            "models_before": len(existing),
            "models_after": len(updated),
            "new_models": added,
            "best_model": predictor.model_best,
            "leaderboard": top_rows,
            "model_path": model_path
        }
    except Exception as e:
        return {"status": "error", "message": f"Extension failed: {str(e)}"}
# ============================================================
# MULTI-LABEL PREDICTION (#14)
# ============================================================
def train_multilabel_autogluon(
    file_path: str,
    target_cols: List[str],
    time_limit: int = 120,
    presets: str = "medium_quality",
    output_dir: Optional[str] = None
) -> Dict[str, Any]:
    """
    Fit an AutoGluon MultilabelPredictor that predicts several target
    columns from one dataset in a single call.

    AutoGluon trains one TabularPredictor per label over the shared
    feature table, so each label ends up with its own leaderboard.

    Args:
        file_path: Path to CSV/Parquet dataset
        target_cols: List of columns to predict (e.g. ['label1', 'label2', 'label3'])
        time_limit: Max training time per label in seconds
        presets: Quality preset
        output_dir: Where to save trained model

    Returns:
        Dictionary with per-label results and overall performance
    """
    try:
        from autogluon.tabular import TabularDataset, MultilabelPredictor
    except ImportError:
        return {
            "status": "error",
            "message": "MultilabelPredictor not available. Ensure autogluon.tabular>=1.2 is installed."
        }

    t0 = time.time()
    output_dir = output_dir or "./outputs/autogluon_multilabel"

    # Fail fast on bad inputs before doing any heavy work.
    if not Path(file_path).exists():
        return {"status": "error", "message": f"File not found: {file_path}"}
    try:
        data = TabularDataset(file_path)
    except Exception as e:
        return {"status": "error", "message": f"Failed to load data: {str(e)}"}

    missing_cols = [c for c in target_cols if c not in data.columns]
    if missing_cols:
        return {
            "status": "error",
            "message": f"Target columns not found: {missing_cols}. Available: {list(data.columns)}"
        }

    print(f"\n🚀 AutoGluon Multi-Label Training...")
    print(f" 📁 Dataset: {file_path}")
    print(f" 🎯 Targets: {target_cols}")
    print(f" 📐 Shape: {data.shape[0]:,} rows × {data.shape[1]} columns")

    # Cap training time to the deployment's resource budget (shared CPUs).
    effective_time_limit = min(time_limit, _get_resource_config()["time_limit"])

    # A stale model directory would make AutoGluon refuse to overwrite it.
    if Path(output_dir).exists():
        shutil.rmtree(output_dir, ignore_errors=True)

    try:
        multi_predictor = MultilabelPredictor(labels=target_cols, path=output_dir)
        multi_predictor.fit(
            train_data=data,
            time_limit=effective_time_limit,
            presets=presets
        )
    except Exception as e:
        return {"status": "error", "message": f"Multi-label training failed: {str(e)}"}

    elapsed = time.time() - t0

    # Summarize each label's internal predictor; tolerate partial failures
    # so one broken label does not hide the others' results.
    per_label_results = {}
    for label in target_cols:
        try:
            sub_predictor = multi_predictor.get_predictor(label)
            lb = sub_predictor.leaderboard(silent=True)
            per_label_results[label] = {
                "best_model": sub_predictor.model_best,
                "best_score": round(float(lb.iloc[0]["score_val"]), 4) if len(lb) > 0 else None,
                "n_models": len(lb),
                "problem_type": sub_predictor.problem_type
            }
        except Exception:
            per_label_results[label] = {"error": "Could not retrieve results"}

    print(f"\n{'='*60}")
    print(f"✅ MULTI-LABEL TRAINING COMPLETE")
    print(f"{'='*60}")
    for label, result in per_label_results.items():
        print(f" 🎯 {label}: {result.get('best_model', 'N/A')} (score: {result.get('best_score', 'N/A')})")
    print(f" ⏱️ Total time: {elapsed:.1f}s")
    print(f"{'='*60}\n")

    return {
        "status": "success",
        "task_type": "multilabel",
        "n_labels": len(target_cols),
        "labels": target_cols,
        "per_label_results": per_label_results,
        "training_time_seconds": round(elapsed, 1),
        "model_path": output_dir,
        "output_path": output_dir
    }
# ============================================================
# TIME SERIES BACKTESTING (#33)
# ============================================================
def backtest_timeseries(
    file_path: str,
    target_col: str,
    time_col: str,
    forecast_horizon: int = 30,
    id_col: Optional[str] = None,
    freq: Optional[str] = None,
    num_val_windows: int = 3,
    time_limit: int = 120,
    presets: str = "medium_quality",
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Backtest time series models using multiple validation windows.

    Trains models with multi-window cross-validation for robust performance
    estimates. More reliable than a single train/test split.

    Args:
        file_path: Path to time series CSV/Parquet
        target_col: Column with values to forecast
        time_col: Column with timestamps/dates
        forecast_horizon: Periods to predict per window
        id_col: Column identifying different series
        freq: Frequency string ('D', 'h', 'W', 'MS'); auto-detected if None
        num_val_windows: Number of backtesting windows (default: 3)
        time_limit: Max training time in seconds
        presets: Quality preset
        output_path: Path to save backtest predictions CSV

    Returns:
        Dictionary with per-window evaluation and aggregate metrics.
        'output_path' is None when the backtest predictions CSV could
        not be written.

    Raises:
        ImportError: If autogluon.timeseries is not installed.
    """
    TimeSeriesPredictor, TimeSeriesDataFrame = _ensure_autogluon_timeseries()
    start_time = time.time()
    output_dir = "./outputs/autogluon_ts_backtest"
    output_path = output_path or "./outputs/autogluon_backtest.csv"
    if not Path(file_path).exists():
        return {"status": "error", "message": f"File not found: {file_path}"}
    print(f"\n📊 Time Series Backtesting ({num_val_windows} windows)...")
    print(f" 📁 Dataset: {file_path}")
    print(f" 🎯 Target: {target_col}")
    print(f" 🔮 Horizon: {forecast_horizon} periods × {num_val_windows} windows")
    # Load data: try CSV first, fall back to parquet.
    try:
        df = pd.read_csv(file_path)
    except Exception:
        try:
            df = pd.read_parquet(file_path)
        except Exception as e:
            return {"status": "error", "message": f"Failed to load data: {str(e)}"}
    if target_col not in df.columns or time_col not in df.columns:
        return {"status": "error", "message": f"Columns not found. Available: {list(df.columns)}"}
    df[time_col] = pd.to_datetime(df[time_col])
    df = df.sort_values(time_col)
    # Single-series data gets a synthetic constant id so AutoGluon's
    # (item_id, timestamp) multi-index can be built.
    if id_col is None or id_col not in df.columns:
        id_col_name = "__series_id"
        df[id_col_name] = "series_0"
    else:
        id_col_name = id_col
    # Auto-detect frequency from median timestamp spacing; thresholds are
    # deliberately loose to tolerate irregular sampling. Falls back to daily.
    if freq is None:
        time_diffs = df[time_col].diff().dropna()
        median_diff = time_diffs.median()
        if median_diff <= pd.Timedelta(hours=2):
            freq = "h"
        elif median_diff <= pd.Timedelta(days=1.5):
            freq = "D"
        elif median_diff <= pd.Timedelta(days=8):
            freq = "W"
        elif median_diff <= pd.Timedelta(days=35):
            freq = "MS"
        else:
            freq = "D"
    try:
        ts_df = TimeSeriesDataFrame.from_data_frame(
            df, id_column=id_col_name, timestamp_column=time_col
        )
    except Exception as e:
        return {"status": "error", "message": f"Failed to create time series: {str(e)}"}
    # AutoGluon refuses to train into a non-empty model directory.
    if Path(output_dir).exists():
        shutil.rmtree(output_dir, ignore_errors=True)
    resource_config = _get_resource_config()
    try:
        predictor = TimeSeriesPredictor(
            target=target_col,
            prediction_length=forecast_horizon,
            path=output_dir,
            freq=freq
        )
        # num_val_windows drives the multi-window (rolling-origin) validation.
        predictor.fit(
            train_data=ts_df,
            time_limit=min(time_limit, resource_config["time_limit"]),
            presets=presets,
            num_val_windows=num_val_windows
        )
    except Exception as e:
        return {"status": "error", "message": f"Backtest training failed: {str(e)}"}
    elapsed = time.time() - start_time
    # Save backtest predictions (best-effort: not every AutoGluon version
    # exposes backtest_predictions(), so failure must not fail the run).
    backtest_saved = False
    try:
        bt_preds = predictor.backtest_predictions()
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        bt_preds.reset_index().to_csv(output_path, index=False)
        backtest_saved = True
    except Exception:
        pass
    # Leaderboard (AutoGluon returns it sorted best-first).
    leaderboard = predictor.leaderboard(silent=True)
    leaderboard_data = []
    for _, row in leaderboard.head(10).iterrows():
        leaderboard_data.append({
            "model": str(row.get("model", "")),
            "score_val": round(float(row.get("score_val", 0)), 4),
            "fit_time": round(float(row.get("fit_time", 0)), 1),
        })
    best_model = predictor.model_best if hasattr(predictor, 'model_best') else "unknown"
    best_score = leaderboard_data[0]["score_val"] if leaderboard_data else None
    print(f"\n{'='*60}")
    print(f"✅ BACKTESTING COMPLETE ({num_val_windows} windows)")
    print(f"{'='*60}")
    print(f"🏆 Best: {best_model} (score: {best_score})")
    print(f"⏱️ Time: {elapsed:.1f}s")
    print(f"{'='*60}\n")
    return {
        "status": "success",
        "task_type": "backtesting",
        "num_val_windows": num_val_windows,
        "forecast_horizon": forecast_horizon,
        "best_model": best_model,
        "best_score": best_score,
        "n_models_trained": len(leaderboard),
        "training_time_seconds": round(elapsed, 1),
        "leaderboard": leaderboard_data,
        # BUGFIX: previously the path was reported even when writing the
        # backtest CSV failed, pointing callers at a file that does not
        # exist; None now signals "no predictions file was written".
        "output_path": output_path if backtest_saved else None,
        "model_path": output_dir
    }
# ============================================================
# TIME SERIES ANALYSIS (#34, #35, #37)
# ============================================================
def analyze_timeseries_model(
    model_path: str,
    data_path: str,
    time_col: str,
    id_col: Optional[str] = None,
    operation: str = "feature_importance",
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Analyze a trained AutoGluon time series model.

    Operations:
    - feature_importance: Permutation importance of covariates
    - plot: Generate forecast vs actuals visualization
    - make_future_dataframe: Generate future timestamp skeleton for prediction

    Args:
        model_path: Path to saved AutoGluon TimeSeriesPredictor
        data_path: Path to time series data (CSV only in this function)
        time_col: Column with timestamps/dates
        id_col: Column identifying different series
        operation: One of 'feature_importance', 'plot', 'make_future_dataframe'
        output_path: Path to save output

    Returns:
        Dictionary with analysis results

    Raises:
        ImportError: If autogluon.timeseries is not installed (raised by
            _ensure_autogluon_timeseries before any validation happens).
    """
    TimeSeriesPredictor, TimeSeriesDataFrame = _ensure_autogluon_timeseries()
    # Validate both paths up front so we fail with a clear message instead
    # of a deep AutoGluon traceback.
    if not Path(model_path).exists():
        return {"status": "error", "message": f"Model not found: {model_path}"}
    if not Path(data_path).exists():
        return {"status": "error", "message": f"Data not found: {data_path}"}
    try:
        predictor = TimeSeriesPredictor.load(model_path)
    except Exception as e:
        return {"status": "error", "message": f"Failed to load model: {str(e)}"}
    # Reconstruct TimeSeriesDataFrame
    # NOTE(review): only CSV is supported here (pd.read_csv), unlike
    # backtest_timeseries which also falls back to parquet — confirm intentional.
    try:
        df = pd.read_csv(data_path)
        df[time_col] = pd.to_datetime(df[time_col])
        df = df.sort_values(time_col)
        # Single-series data gets a synthetic constant id so AutoGluon's
        # (item_id, timestamp) index can be built.
        if id_col is None or id_col not in df.columns:
            id_col_name = "__series_id"
            df[id_col_name] = "series_0"
        else:
            id_col_name = id_col
        ts_df = TimeSeriesDataFrame.from_data_frame(
            df, id_column=id_col_name, timestamp_column=time_col
        )
    except Exception as e:
        return {"status": "error", "message": f"Failed to create time series data: {str(e)}"}
    try:
        if operation == "feature_importance":
            # Permutation importance; presumably a DataFrame indexed by
            # feature name — the isinstance guard below tolerates other shapes.
            fi = predictor.feature_importance(ts_df)
            fi_data = []
            if isinstance(fi, pd.DataFrame):
                for feat in fi.index:
                    row_data = {"feature": str(feat)}
                    for col in fi.columns:
                        try:
                            # Values are usually numeric; fall back to the
                            # string form for anything non-castable so the
                            # result stays JSON-serializable.
                            row_data[str(col)] = round(float(fi.loc[feat, col]), 4)
                        except (TypeError, ValueError):
                            row_data[str(col)] = str(fi.loc[feat, col])
                    fi_data.append(row_data)
            return {
                "status": "success",
                "operation": "feature_importance",
                "features": fi_data,
                "model_path": model_path,
                "message": f"Feature importance computed for {len(fi_data)} features"
            }
        elif operation == "plot":
            output_path = output_path or "./outputs/plots/ts_forecast_plot.png"
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            # Agg backend: render to file without a display (headless server).
            import matplotlib
            matplotlib.use('Agg')
            import matplotlib.pyplot as plt
            predictions = predictor.predict(ts_df)
            try:
                # Preferred path: AutoGluon's built-in plot with 10%/90%
                # quantile bands.
                predictor.plot(ts_df, predictions, quantile_levels=[0.1, 0.9])
                plt.savefig(output_path, dpi=150, bbox_inches='tight')
                plt.close()
            except Exception:
                # Fallback: manual plot — first 3 series only, last 100
                # actual points each, to keep the figure readable.
                fig, ax = plt.subplots(figsize=(12, 6))
                target = predictor.target
                for item_id in list(ts_df.item_ids)[:3]:
                    actual = ts_df.loc[item_id][target].tail(100)
                    ax.plot(actual.index, actual.values, label=f'Actual ({item_id})', linewidth=1.5)
                    if item_id in predictions.item_ids:
                        pred = predictions.loc[item_id]
                        # Use the 'mean' forecast column when present,
                        # otherwise whatever column comes first.
                        mean_col = "mean" if "mean" in pred.columns else pred.columns[0]
                        ax.plot(pred.index, pred[mean_col].values, '--', label=f'Forecast ({item_id})', linewidth=1.5)
                ax.set_title(f'Time Series Forecast - {predictor.model_best}')
                ax.legend()
                ax.grid(True, alpha=0.3)
                plt.tight_layout()
                plt.savefig(output_path, dpi=150, bbox_inches='tight')
                plt.close()
            return {
                "status": "success",
                "operation": "plot",
                "output_path": output_path,
                "message": f"Forecast plot saved to {output_path}"
            }
        elif operation == "make_future_dataframe":
            output_path = output_path or "./outputs/future_dataframe.csv"
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)
            # Skeleton of future (item_id, timestamp) rows for the
            # predictor's configured horizon; values are left to the caller.
            future_df = predictor.make_future_data_frame(ts_df)
            future_df.reset_index().to_csv(output_path, index=False)
            return {
                "status": "success",
                "operation": "make_future_dataframe",
                "shape": list(future_df.shape),
                "columns": list(future_df.columns) if hasattr(future_df, 'columns') else [],
                "output_path": output_path,
                "message": f"Future dataframe generated: {len(future_df)} rows"
            }
        else:
            return {
                "status": "error",
                "message": f"Unknown operation '{operation}'. Choose: feature_importance, plot, make_future_dataframe"
            }
    except Exception as e:
        return {"status": "error", "message": f"Analysis failed: {str(e)}"}