# Auto_ML/backend/core/meta_learning.py
# Author: abhiraj12
# Commit: 2c29579 ("Initial commit")
import json
import logging
from collections import Counter
from typing import List, Dict, Optional, Any

import numpy as np
import pandas as pd

# LightGBM is optional: when it cannot be imported the module falls back to
# ``lgb = None`` and callers must check for that.
try:
    import lightgbm as lgb
except Exception:
    lgb = None

from infra.database import get_db, MetaLearningRecord
# ── Meta-Feature Extraction ───────────────────────────────────────────────────
def extract_meta_features(profile: dict) -> dict:
    """Compute the meta-feature vector for a dataset profile.

    Reads the ``rows``, ``cols``, ``num_cols``, ``cat_cols``,
    ``column_stats``, ``missing_pct`` and ``imbalance`` keys of *profile*.
    Missing keys and explicit ``None`` values are treated as "no information"
    (zero rows, one column, no columns of a given kind) instead of raising.

    Args:
        profile: Dataset profile dictionary.

    Returns:
        A dict with the keys ``n_rows``, ``n_cols``, ``num_ratio``,
        ``cat_ratio``, ``binary_ratio``, ``datetime_ratio``,
        ``continuous_ratio``, ``missing_pct`` and ``is_imbalanced``.
        All ratios are relative to ``n_cols`` and rounded to 4 decimals.
    """
    n_rows = profile.get("rows") or 0
    # A missing, None or zero column count would break every ratio below,
    # so fall back to 1 as the denominator.
    n_cols = profile.get("cols") or 1
    col_stats = profile.get("column_stats") or {}

    # Structural counts.
    n_num = len(profile.get("num_cols") or [])
    n_cat = len(profile.get("cat_cols") or [])

    # Semantic counts. Only the semantic types that feed an output feature
    # are counted; malformed (non-dict) column entries are ignored.
    semantic_types = [
        stat.get("semantic_type", "")
        for stat in col_stats.values()
        if isinstance(stat, dict)
    ]

    def _ratio(count: int) -> float:
        return round(count / n_cols, 4)

    # NOTE: the key set is kept exactly as before because these vectors are
    # persisted by save_meta_record() and reused as training features.
    return {
        "n_rows": n_rows,
        "n_cols": n_cols,
        "num_ratio": _ratio(n_num),
        "cat_ratio": _ratio(n_cat),
        "binary_ratio": _ratio(semantic_types.count("Binary")),
        "datetime_ratio": _ratio(semantic_types.count("DateTime")),
        "continuous_ratio": _ratio(semantic_types.count("Continuous")),
        "missing_pct": profile.get("missing_pct") or 0,
        "is_imbalanced": 1 if "High" in str(profile.get("imbalance", "")) else 0,
    }
class MetaLearner:
    """Rank candidate models for a dataset using past leaderboard results.

    A LightGBM regressor is fitted on rows of (dataset meta-features, model
    name) -> score built from stored ``MetaLearningRecord`` leaderboards.
    While the learner is untrained, or when its estimated confidence is below
    60%, rankings come from the rule-based heuristics in ``get_heuristics``.

    Every ranking result reports ``confidence`` as a percentage (0-100).
    """

    def __init__(self):
        # LightGBM is optional; without it the learner never trains and every
        # request is answered by the heuristics.
        self.model = (
            lgb.LGBMRegressor(
                n_estimators=100, learning_rate=0.1, random_state=42
            )
            if lgb is not None
            else None
        )
        self.is_trained = False
        self.min_records = 10
        self.val_error = 1.0  # High error initially
        # Column order seen at fit time, used to align prediction frames.
        self.feature_columns = None
        # Number of (record, model) rows the current model was fitted on.
        self.n_trials = 0

    def prepare_data(self, records):
        """Flatten stored records into a training frame.

        Args:
            records: Iterable of dicts with the JSON strings
                ``meta_features_json`` and ``leaderboard_json``.

        Returns:
            ``(X, y)`` where ``X`` holds the meta-features plus a categorical
            ``model_type`` column and ``y`` the scores, or
            ``(empty DataFrame, None)`` when no usable row exists.
            Records with unparsable JSON are skipped, as are leaderboard
            entries without a model name or with a non-numeric score (a
            missing ``score`` key still counts as 0).
        """
        rows = []
        for record in records:
            try:
                meta = json.loads(record.get("meta_features_json") or "{}")
                leaderboard = json.loads(record.get("leaderboard_json") or "[]")
            except (AttributeError, TypeError, ValueError):
                continue
            if not isinstance(meta, dict) or not isinstance(leaderboard, list):
                continue
            for entry in leaderboard:
                if not isinstance(entry, dict):
                    continue
                model_name = entry.get("model")
                if not isinstance(model_name, str) or not model_name:
                    continue
                try:
                    score = float(entry.get("score", 0))
                except (TypeError, ValueError):
                    # e.g. an explicit null score; would become a NaN label.
                    continue
                if not np.isfinite(score):
                    continue
                row = dict(meta)
                row["model_type"] = model_name
                row["score"] = score
                rows.append(row)

        if not rows:
            return pd.DataFrame(), None

        df = pd.DataFrame(rows)
        df["model_type"] = df["model_type"].astype("category")
        return df.drop(columns=["score"]), df["score"]

    def train(self):
        """Fit the regressor on all stored records.

        Does nothing when LightGBM is unavailable, when fewer than
        ``min_records`` records exist, or when no usable training row can be
        built. On success sets ``is_trained``, ``feature_columns``,
        ``n_trials`` and ``val_error``.
        """
        with get_db() as db:
            raw_records = db.query(MetaLearningRecord).all()
            # Copy to plain dicts while the session is still open.
            records = [
                {
                    "meta_features_json": r.meta_features_json,
                    "leaderboard_json": r.leaderboard_json,
                }
                for r in raw_records
            ]

        if self.model is None or len(records) < self.min_records:
            return

        X, y = self.prepare_data(records)
        if X.empty:
            return

        self.model.fit(X, y)
        self.feature_columns = list(X.columns)
        self.n_trials = len(X)

        # In-sample mean absolute error (no hold-out split is made, so this
        # is an optimistic estimate). The division by 100 presumably assumes
        # scores on a 0-100 scale -- TODO confirm against the producers of
        # leaderboard scores.
        preds = self.model.predict(X)
        error = float(np.mean(np.abs(preds - y))) / 100.0
        self.val_error = error if np.isfinite(error) else 1.0
        self.is_trained = True

    def predict_rankings(self, profile: dict, model_pool: List[str]) -> Dict:
        """Predict a score for each model in *model_pool* and rank them.

        Falls back to ``get_heuristics`` when the learner is untrained, the
        pool is empty, or the confidence (``1 - val_error``, clamped to
        [0, 1]) is below 0.6.

        Returns:
            Dict with ``rankings`` (list of ``{"model", "pred_score"}`` sorted
            best first), ``confidence`` (percent), ``source`` and ``reason``.
        """
        if not self.is_trained or self.model is None:
            return self.get_heuristics(profile, model_pool)
        if not model_pool:
            return self.get_heuristics(profile, model_pool, reason="Empty model pool")

        # Confidence logic: 1.0 - val_error. A missing or NaN error counts as
        # "no confidence" (min()/max() would otherwise turn NaN into 1.0).
        try:
            error = float(self.val_error)
        except (TypeError, ValueError):
            error = 1.0
        if not np.isfinite(error):
            error = 1.0
        confidence = max(0.0, min(1.0, 1.0 - error))
        confidence_pct = round(confidence * 100, 1)

        # Switch to heuristics if confidence is too low. Checked before any
        # prediction work, and reported in percent like the trained path.
        if confidence < 0.6:
            return self.get_heuristics(
                profile, model_pool, confidence_pct, "Low Meta-Confidence"
            )

        features = extract_meta_features(profile)
        X_pred = pd.DataFrame(
            [{**features, "model_type": m} for m in model_pool]
        )
        # Align with the columns used at fit time so that stored records with
        # a different feature schema do not break prediction.
        columns = getattr(self, "feature_columns", None)
        if columns:
            X_pred = X_pred.reindex(columns=columns)
        X_pred["model_type"] = X_pred["model_type"].astype("category")
        preds = self.model.predict(X_pred)

        results = [
            {"model": m, "pred_score": round(float(score), 2)}
            for m, score in zip(model_pool, preds)
        ]
        results.sort(key=lambda item: item["pred_score"], reverse=True)

        n_trials = getattr(self, "n_trials", 0)
        return {
            "rankings": results,
            "confidence": confidence_pct,
            "source": "LightGBM Meta-Learner",
            "reason": f"Meta-learner confident ({round(confidence*100)}%) based on {n_trials} historical trials.",
        }

    def get_heuristics(
        self, profile: dict, model_pool: List[str], confidence=0, reason="Cold Start"
    ) -> Dict:
        """Rank *model_pool* with fixed row-count rules.

        Every model starts at 70.0; the three models preferred for the
        dataset size get +15, +10 and +5 in order of preference.

        Args:
            profile: Dataset profile; only ``rows`` is read (missing or
                ``None`` counts as 0).
            model_pool: Candidate model names.
            confidence: Confidence to report, in percent.
            reason: Short explanation appended to ``"Initial guess. "``.
        """
        rows = profile.get("rows") or 0

        # Simple ordering based on rows
        if rows > 10000:
            pivot = ["XGBoost", "LightGBM", "Random Forest"]
        elif rows < 500:
            pivot = ["Logistic Regression", "Ridge", "Random Forest"]
        else:
            pivot = ["Random Forest", "XGBoost", "SVM"]

        rankings = []
        for m in model_pool or []:
            score = 70.0  # Default base
            if m in pivot:
                score += (len(pivot) - pivot.index(m)) * 5
            rankings.append({"model": m, "pred_score": score})
        # Stable sort: ties keep the order given in model_pool.
        rankings.sort(key=lambda item: item["pred_score"], reverse=True)

        return {
            "rankings": rankings,
            "confidence": confidence,
            "source": "Rule-based Heuristics",
            "reason": f"Initial guess. {reason}",
        }
# Module-level singleton used by zero_shot_recommend() and save_meta_record().
# It is created untrained; MetaLearner.train() fits it from stored records.
meta_engine = MetaLearner()
def _default_model_pool(profile: dict) -> List[str]:
"""Pick a reasonable candidate pool when the API does not specify models."""
cols = profile.get("columns") or []
n = max(len(cols), 1)
cat_ratio = len(profile.get("cat_cols") or []) / n
rows = profile.get("rows") or 0
if cat_ratio >= 0.25 or rows < 5000:
pool = ["Logistic Regression", "Random Forest", "Gradient Boosting", "XGBoost"]
if rows < 5000:
pool.append("SVM")
return pool
return [
"Linear Regression",
"Ridge Regression",
"Lasso Regression",
"Random Forest",
"Gradient Boosting",
"XGBoost",
]
def get_cross_dataset_insights(profile: dict) -> Dict[str, Any]:
    """Summarize past training runs and compare them with the current profile.

    Looks at the 500 most recent ``MetaLearningRecord`` rows (newest first).

    Args:
        profile: Dataset profile passed to ``extract_meta_features``.

    Returns:
        With no history: ``historical_runs`` (0), a ``message`` and
        ``your_dataset``. Otherwise: ``historical_runs``,
        ``unique_jobs_recorded``, ``task_mix``, ``most_common_winner``
        (``{"model": None, "count": 0}`` when no record names a best model),
        ``your_dataset`` with a ``size_vs_history`` comparison, and ``hint``.
    """
    mf = extract_meta_features(profile)
    rows_cur = mf["n_rows"]
    cols_cur = mf["n_cols"]

    with get_db() as db:
        records = (
            db.query(MetaLearningRecord)
            .order_by(MetaLearningRecord.created_at.desc())
            .limit(500)
            .all()
        )
        # Copy the needed columns while the session is open, as train() does,
        # so nothing below touches ORM objects after the session block.
        history = [
            (r.id, r.best_model, r.task_type, r.meta_features_json)
            for r in records
        ]

    if not history:
        return {
            "historical_runs": 0,
            "message": "No historical training runs yet. Finish at least one job to unlock cross-dataset insights.",
            "your_dataset": {"rows": rows_cur, "columns": cols_cur},
        }

    model_counts = Counter(best for _, best, _, _ in history if best)
    task_counts = Counter(task for _, _, task, _ in history if task)

    # Records may exist without any best_model recorded (it is stored as ""
    # when unknown), in which case there is no winner to report.
    winners = model_counts.most_common(1)
    top_model, top_model_n = winners[0] if winners else (None, 0)

    rows_hist = []
    cols_hist = []
    for _, _, _, raw_features in history:
        try:
            fj = json.loads(raw_features)
            rows_hist.append(int(fj.get("n_rows", 0)))
            cols_hist.append(int(fj.get("n_cols", 0)))
        except (AttributeError, TypeError, ValueError):
            # Unparsable JSON, a non-dict payload or non-numeric sizes.
            continue

    def _band(val, values: List[int]) -> str:
        """Place *val* relative to the min/max of *values*."""
        if val is None or not values:
            return "unknown"
        lo, hi = min(values), max(values)
        if val < lo:
            return "smaller than most past runs"
        if val > hi:
            return "larger than most past runs"
        return "within the range seen in past runs"

    return {
        "historical_runs": len(history),
        "unique_jobs_recorded": len({record_id for record_id, _, _, _ in history}),
        "task_mix": dict(task_counts),
        "most_common_winner": {"model": top_model, "count": top_model_n},
        "your_dataset": {
            "rows": rows_cur,
            "columns": cols_cur,
            "size_vs_history": {
                "rows": _band(rows_cur, rows_hist),
                "columns": _band(cols_cur, cols_hist),
            },
        },
        "hint": "Winners reflect your machine's past AutoML jobs on similar-sized data; they are hints, not guarantees.",
    }
def zero_shot_recommend(profile: dict, model_pool: Optional[List[str]] = None) -> dict:
    """Entry point for the recommendation engine.

    Args:
        profile: Dataset profile.
        model_pool: Candidate model names; when empty or ``None`` a default
            pool is chosen from the profile.

    Returns:
        The ranking dict produced by ``meta_engine.predict_rankings``.
    """
    pool = model_pool if model_pool else _default_model_pool(profile)
    if not meta_engine.is_trained:
        # Lazy training is best-effort, as in save_meta_record(): a database
        # or fitting error must not fail the recommendation, because
        # predict_rankings() can still answer from the heuristics.
        try:
            meta_engine.train()
        except Exception:
            logging.getLogger(__name__).exception(
                "Meta-learner training failed; falling back to heuristics"
            )
    return meta_engine.predict_rankings(profile, pool)
def save_meta_record(profile: dict, results: dict):
    """Persist a meta-learning record and retrain the meta-learner.

    Args:
        profile: Dataset profile; its meta-features are stored as JSON.
        results: Training results. Reads ``best_model``, ``score``,
            ``is_classification``, ``metric_name`` and ``leaderboard``,
            each with a default when absent.

    The retrain is best-effort: a failure is logged and does not affect the
    record that was already committed.
    """
    mf = extract_meta_features(profile)
    with get_db() as db:
        record = MetaLearningRecord(
            meta_features_json=json.dumps(mf),
            best_model=results.get("best_model", ""),
            best_score=results.get("score", 0),
            task_type=(
                "classification" if results.get("is_classification") else "regression"
            ),
            metric_name=results.get("metric_name", "Score"),
            leaderboard_json=json.dumps(results.get("leaderboard", [])),
        )
        db.add(record)
        db.commit()

    # Trigger retrain. Errors are logged rather than silently dropped so a
    # permanently failing retrain is visible to operators.
    try:
        meta_engine.train()
    except Exception:
        logging.getLogger(__name__).exception(
            "Meta-learner retrain failed after saving a record"
        )