| import json |
| import numpy as np |
| import pandas as pd |
| from collections import Counter |
| from typing import List, Dict, Optional, Any |
|
|
| try: |
| import lightgbm as lgb |
| except Exception: |
| lgb = None |
| from infra.database import get_db, MetaLearningRecord |
|
|
| |
|
|
|
|
def extract_meta_features(profile: dict) -> dict:
    """Compute an enhanced meta-feature vector from a dataset profile.

    Args:
        profile: Dataset profile. Keys read here: ``rows``, ``cols``,
            ``num_cols``, ``cat_cols``, ``column_stats`` (mapping of column
            name to a dict carrying ``semantic_type``), ``missing_pct`` and
            ``imbalance``. Keys that are absent or hold ``None`` fall back to
            neutral defaults instead of raising.

    Returns:
        A flat dict with ``n_rows``, ``n_cols``, the ``*_ratio`` features
        (each rounded to 4 decimals), ``missing_pct`` and ``is_imbalanced``
        (1 when the ``imbalance`` text contains "High", else 0).
    """
    n_rows = profile.get("rows") or 0
    # A zero, missing or None column count is clamped to 1 so that every
    # ratio below stays defined.
    n_cols = profile.get("cols") or 1

    n_num = len(profile.get("num_cols") or [])
    n_cat = len(profile.get("cat_cols") or [])

    col_stats = profile.get("column_stats") or {}
    # Only the semantic type of each column matters; entries that are not
    # dicts carry no type information and are skipped.
    semantic_types = [
        stat.get("semantic_type", "")
        for stat in col_stats.values()
        if isinstance(stat, dict)
    ]

    def _ratio(count):
        return round(count / n_cols, 4)

    return {
        "n_rows": n_rows,
        "n_cols": n_cols,
        "num_ratio": _ratio(n_num),
        "cat_ratio": _ratio(n_cat),
        "binary_ratio": _ratio(semantic_types.count("Binary")),
        "datetime_ratio": _ratio(semantic_types.count("DateTime")),
        "continuous_ratio": _ratio(semantic_types.count("Continuous")),
        "missing_pct": profile.get("missing_pct", 0),
        "is_imbalanced": 1 if "High" in str(profile.get("imbalance", "")) else 0,
    }
|
|
|
|
class MetaLearner:
    """Rank candidate models for a dataset using past leaderboards.

    A LightGBM regressor is fitted on rows of (dataset meta-features, model
    name) -> score built from stored ``MetaLearningRecord`` entries. While the
    regressor is untrained, or its error is too high, rule-based heuristics
    are returned instead.
    """

    # Pool spellings that should receive the same heuristic boost as the
    # shorter name used in the pivot lists of ``get_heuristics``.
    _HEURISTIC_ALIASES = {"Ridge Regression": "Ridge"}

    def __init__(self):
        # ``lgb`` is None when lightgbm could not be imported; the learner
        # then never trains and always answers with heuristics.
        self.model = (
            lgb.LGBMRegressor(n_estimators=100, learning_rate=0.1, random_state=42)
            if lgb is not None
            else None
        )
        self.is_trained = False
        self.min_records = 10  # stored records required before training
        self.val_error = 1.0  # scaled mean absolute error; 1.0 -> confidence 0
        self.feature_columns = None  # column order seen at fit time
        self.n_trials = 0  # number of (dataset, model) rows used for fitting

    def prepare_data(self, records):
        """Flatten stored records into a feature frame and a score series.

        Args:
            records: Iterable of dicts with ``meta_features_json`` and
                ``leaderboard_json`` string fields.

        Returns:
            ``(X, y)`` where ``X`` holds the meta-features plus a categorical
            ``model_type`` column and ``y`` the numeric scores, or
            ``(empty DataFrame, None)`` when no usable row exists.
        """
        rows = []
        for record in records:
            try:
                features = json.loads(record.get("meta_features_json") or "{}")
                leaderboard = json.loads(record.get("leaderboard_json") or "[]")
            except (AttributeError, TypeError, ValueError):
                # Malformed record or invalid JSON: skip it, keep the rest.
                continue
            if not isinstance(features, dict) or not isinstance(leaderboard, list):
                continue

            for entry in leaderboard:
                if not isinstance(entry, dict):
                    continue
                model_name = entry.get("model")
                if not isinstance(model_name, str):
                    # A row without a model name cannot inform a per-model
                    # score prediction.
                    continue
                row = dict(features)
                row["model_type"] = model_name
                row["score"] = entry.get("score", 0)
                rows.append(row)

        if not rows:
            return pd.DataFrame(), None

        df = pd.DataFrame(rows)
        # Rows whose score is null or not numeric are dropped rather than
        # handed to the regressor as labels.
        df["score"] = pd.to_numeric(df["score"], errors="coerce")
        df = df.dropna(subset=["score"]).reset_index(drop=True)
        if df.empty:
            return pd.DataFrame(), None

        df["model_type"] = df["model_type"].astype("category")
        return df.drop(columns=["score"]), df["score"]

    def train(self):
        """Fit the regressor on every stored record, if there are enough.

        Returns without training when fewer than ``min_records`` records
        exist, LightGBM is unavailable, or no usable training row is found.
        """
        with get_db() as db:
            raw_records = db.query(MetaLearningRecord).all()
            records = [
                {
                    "meta_features_json": r.meta_features_json,
                    "leaderboard_json": r.leaderboard_json,
                }
                for r in raw_records
            ]

        if len(records) < self.min_records or self.model is None:
            return

        X, y = self.prepare_data(records)
        if X.empty:
            return

        self.model.fit(X, y)
        self.feature_columns = list(X.columns)
        self.n_trials = len(X)
        self.is_trained = True

        # NOTE(review): the error is measured on the rows the model was just
        # fitted on, so it is an optimistic estimate. The division by 100
        # presumably assumes scores on a 0-100 scale - confirm with producers
        # of ``leaderboard_json``.
        preds = self.model.predict(X)
        self.val_error = float(np.mean(np.abs(preds - y))) / 100.0

    def predict_rankings(self, profile: dict, model_pool: List[str]) -> Dict:
        """Predict expected scores for each model and return a ranked list.

        Falls back to ``get_heuristics`` when the learner is untrained, the
        pool is empty, or confidence (``1 - val_error`` clamped to [0, 1]) is
        below 0.6.

        Returns:
            Dict with ``rankings`` (best first), ``confidence`` as a
            percentage, ``source`` and ``reason``.
        """
        if not self.is_trained or not model_pool:
            return self.get_heuristics(profile, model_pool or [])

        confidence = max(0, min(1, 1.0 - self.val_error))
        if confidence < 0.6:
            # Checked before predicting so no model call is wasted; the value
            # is reported as a percentage, like the trained path below.
            return self.get_heuristics(
                profile,
                model_pool,
                round(confidence * 100, 1),
                "Low Meta-Confidence",
            )

        features = extract_meta_features(profile)
        X_pred = pd.DataFrame(
            [{**features, "model_type": name} for name in model_pool]
        )
        X_pred["model_type"] = X_pred["model_type"].astype("category")

        # Present exactly the columns (and order) used at fit time; features
        # absent from this profile become NaN.
        feature_columns = getattr(self, "feature_columns", None)
        if feature_columns:
            X_pred = X_pred.reindex(columns=feature_columns)

        preds = self.model.predict(X_pred)
        results = [
            {"model": name, "pred_score": round(float(score), 2)}
            for name, score in zip(model_pool, preds)
        ]
        results.sort(key=lambda item: item["pred_score"], reverse=True)

        n_trials = getattr(self, "n_trials", 0)
        return {
            "rankings": results,
            "confidence": round(confidence * 100, 1),
            "source": "LightGBM Meta-Learner",
            "reason": f"Meta-learner confident ({round(confidence*100)}%) based on {n_trials} historical trials.",
        }

    def get_heuristics(
        self, profile: dict, model_pool: List[str], confidence=0, reason="Cold Start"
    ) -> Dict:
        """Fallback heuristics (the existing rules).

        Every model starts at 70.0; models on the size-dependent pivot list
        get +15 / +10 / +5 by their position in that list.

        Args:
            profile: Dataset profile; only ``rows`` is read.
            model_pool: Candidate model names to rank.
            confidence: Value echoed back in the result.
            reason: Text appended to the ``reason`` field.
        """
        rows = profile.get("rows") or 0

        if rows > 10000:
            pivot = ["XGBoost", "LightGBM", "Random Forest"]
        elif rows < 500:
            pivot = ["Logistic Regression", "Ridge", "Random Forest"]
        else:
            pivot = ["Random Forest", "XGBoost", "SVM"]

        rankings = []
        for name in model_pool:
            score = 70.0
            pivot_name = self._HEURISTIC_ALIASES.get(name, name)
            if pivot_name in pivot:
                score += (len(pivot) - pivot.index(pivot_name)) * 5
            rankings.append({"model": name, "pred_score": score})

        rankings.sort(key=lambda item: item["pred_score"], reverse=True)

        return {
            "rankings": rankings,
            "confidence": confidence,
            "source": "Rule-based Heuristics",
            "reason": f"Initial guess. {reason}",
        }
|
|
|
|
| |
# Module-level MetaLearner instance; zero_shot_recommend and save_meta_record
# in this file train and query it.
meta_engine = MetaLearner()
|
|
|
|
| def _default_model_pool(profile: dict) -> List[str]: |
| """Pick a reasonable candidate pool when the API does not specify models.""" |
| cols = profile.get("columns") or [] |
| n = max(len(cols), 1) |
| cat_ratio = len(profile.get("cat_cols") or []) / n |
| rows = profile.get("rows") or 0 |
| if cat_ratio >= 0.25 or rows < 5000: |
| pool = ["Logistic Regression", "Random Forest", "Gradient Boosting", "XGBoost"] |
| if rows < 5000: |
| pool.append("SVM") |
| return pool |
| return [ |
| "Linear Regression", |
| "Ridge Regression", |
| "Lasso Regression", |
| "Random Forest", |
| "Gradient Boosting", |
| "XGBoost", |
| ] |
|
|
|
|
def get_cross_dataset_insights(profile: dict) -> Dict[str, Any]:
    """
    Summarize historical training runs (meta_learning) and compare to the current profile.

    Reads up to the 500 most recent ``MetaLearningRecord`` rows.

    Returns:
        With no history: ``historical_runs`` = 0, a ``message`` and the
        current dataset size. Otherwise: run counts, the task-type mix, the
        most frequent winning model (``model`` is None and ``count`` 0 when no
        record names a winner) and how the current row/column counts compare
        with the range seen in history.
    """
    mf = extract_meta_features(profile)
    rows_cur = mf["n_rows"]
    cols_cur = mf["n_cols"]

    with get_db() as db:
        records = (
            db.query(MetaLearningRecord)
            .order_by(MetaLearningRecord.created_at.desc())
            .limit(500)
            .all()
        )
        # Copy the plain values while the session is open so nothing below
        # depends on the records staying usable after it closes.
        history = [
            (r.id, r.best_model, r.task_type, r.meta_features_json)
            for r in records
        ]

    if not history:
        return {
            "historical_runs": 0,
            "message": "No historical training runs yet. Finish at least one job to unlock cross-dataset insights.",
            "your_dataset": {"rows": rows_cur, "columns": cols_cur},
        }

    model_counts = Counter(best for _, best, _, _ in history if best)
    task_counts = Counter(task for _, _, task, _ in history if task)
    # Records may carry an empty best_model; most_common(1) is then an empty
    # list and must not be indexed.
    if model_counts:
        top_model, top_model_n = model_counts.most_common(1)[0]
    else:
        top_model, top_model_n = None, 0

    rows_hist = []
    cols_hist = []
    for _, _, _, features_json in history:
        try:
            fj = json.loads(features_json)
            n_rows_past = int(fj.get("n_rows", 0))
            n_cols_past = int(fj.get("n_cols", 0))
        except (TypeError, ValueError, AttributeError):
            # Null/invalid JSON, a non-dict payload or non-numeric sizes.
            continue
        rows_hist.append(n_rows_past)
        cols_hist.append(n_cols_past)

    def _band(val: int, values: List[int]) -> str:
        """Place ``val`` relative to the min/max of ``values``."""
        if not values:
            return "unknown"
        lo, hi = min(values), max(values)
        if val < lo:
            return "smaller than most past runs"
        if val > hi:
            return "larger than most past runs"
        return "within the range seen in past runs"

    return {
        "historical_runs": len(history),
        "unique_jobs_recorded": len({record_id for record_id, _, _, _ in history}),
        "task_mix": dict(task_counts),
        "most_common_winner": {"model": top_model, "count": top_model_n},
        "your_dataset": {
            "rows": rows_cur,
            "columns": cols_cur,
            "size_vs_history": {
                "rows": _band(rows_cur, rows_hist),
                "columns": _band(cols_cur, cols_hist),
            },
        },
        "hint": "Winners reflect your machine's past AutoML jobs on similar-sized data; they are hints, not guarantees.",
    }
|
|
|
|
def zero_shot_recommend(profile: dict, model_pool: Optional[List[str]] = None) -> dict:
    """Entry point for the recommendation engine.

    Args:
        profile: Dataset profile to recommend models for.
        model_pool: Candidate model names; when empty or None a default pool
            is derived from the profile.

    Returns:
        The ranking dict produced by ``meta_engine.predict_rankings``.
    """
    import logging

    pool = model_pool if model_pool else _default_model_pool(profile)
    if not meta_engine.is_trained:
        try:
            meta_engine.train()
        except Exception:
            # Training is best-effort here: if it fails the engine stays
            # untrained and predict_rankings answers with heuristics.
            logging.getLogger(__name__).exception(
                "Meta-learner training failed; using heuristic rankings"
            )
    return meta_engine.predict_rankings(profile, pool)
|
|
|
|
def save_meta_record(profile: dict, results: dict):
    """Persist a meta-learning record.

    Stores the profile's meta-features together with the outcome of a
    training job, then retrains the shared meta-learner on a best-effort
    basis.

    Args:
        profile: Dataset profile the job ran on.
        results: Job outcome. Keys read here: ``best_model``, ``score``,
            ``is_classification``, ``metric_name`` and ``leaderboard``.
    """
    import logging

    mf = extract_meta_features(profile)
    task_type = "classification" if results.get("is_classification") else "regression"

    with get_db() as db:
        record = MetaLearningRecord(
            meta_features_json=json.dumps(mf),
            best_model=results.get("best_model", ""),
            best_score=results.get("score", 0),
            task_type=task_type,
            metric_name=results.get("metric_name", "Score"),
            leaderboard_json=json.dumps(results.get("leaderboard", [])),
        )
        db.add(record)
        db.commit()

    try:
        meta_engine.train()
    except Exception:
        # The record is already committed; a retraining failure must not fail
        # the save, but it is logged instead of vanishing silently.
        logging.getLogger(__name__).exception(
            "Meta-learner retraining failed after saving a record"
        )
|
|