Spaces:
Sleeping
Sleeping
"""AutoMLOps — ML Experiment Tracking & Pipeline Orchestration Platform."""
| import os | |
| import json | |
| import threading | |
| from datetime import datetime | |
| import mlflow | |
| import mlflow.sklearn | |
| from flask import Flask, render_template, request, jsonify, redirect, url_for | |
| from mlops.datasets import DATASETS | |
| from mlops.algorithms import algorithms_for_json | |
| from mlops.trainer import ( | |
| training_jobs, automl_jobs, | |
| start_training, start_automl, | |
| ) | |
| from pipelines.dag_engine import pipeline_executions, execute_dag | |
| from pipelines.pipeline_defs import get_pipeline, PIPELINE_BUILDERS | |
app = Flask(__name__)

# ── MLflow setup ────────────────────────────────────────────────────────────
# Both the fluent mlflow.* API and the explicit clients below talk to the same
# SQLite database (a relative path, so it lives in the working directory).
TRACKING_URI = "sqlite:///mlflow.db"
mlflow.set_tracking_uri(uri=TRACKING_URI)


def _mlflow_client():
    """Return a new MlflowClient bound to ``TRACKING_URI``."""
    client_cls = mlflow.tracking.MlflowClient
    return client_cls(tracking_uri=TRACKING_URI)
# ── Warm-up and demo-data seeding on first launch ───────────────────────────
def _warm_imports():
    """Pre-import heavy ML libraries so the first pipeline run is instant.

    Best-effort only: a failure here merely means the first run is slower, so
    it is logged and then ignored instead of being raised.
    """
    try:
        # Importing the submodules also imports the top-level sklearn package;
        # mlflow / mlflow.sklearn are already imported at module level.
        import sklearn.ensemble  # noqa: F401
        import sklearn.preprocessing  # noqa: F401
        from mlops.datasets import load_dataset
        load_dataset("Iris Flowers")  # primes sklearn's data cache
    except Exception:
        app.logger.warning("Import warm-up failed; first run may be slow",
                           exc_info=True)
def _seed_demo():
    """Pre-populate a few MLflow runs so the dashboard looks great immediately.

    Seeding is skipped when any active experiment already contains a run, so
    restarting the app against an existing database does not duplicate the
    demo runs. Each demo run is logged with the tag ``demo="true"``. Failures
    are logged and never propagate to the caller.
    """
    client = _mlflow_client()
    try:
        # search_runs(experiment_ids=[]) matches no runs at all, so the
        # experiments have to be listed explicitly for this check to work.
        exp_ids = [e.experiment_id for e in client.search_experiments()]
        if exp_ids and client.search_runs(experiment_ids=exp_ids,
                                          max_results=1):
            return  # already seeded
    except Exception:
        app.logger.warning("Could not check for existing runs; seeding anyway",
                           exc_info=True)

    demo_runs = [
        ("Iris Flowers", "Ensemble / Boosting", "Random Forest", "classification",
         {"accuracy": 0.9667, "f1_score": 0.9664, "precision": 0.9672, "recall": 0.9667}),
        ("Iris Flowers", "Ensemble / Boosting", "XGBoost", "classification",
         {"accuracy": 0.9600, "f1_score": 0.9598, "precision": 0.9601, "recall": 0.9600}),
        ("Iris Flowers", "Linear Models", "Logistic Regression", "classification",
         {"accuracy": 0.9467, "f1_score": 0.9463, "precision": 0.9472, "recall": 0.9467}),
        ("Wine Quality", "Ensemble / Boosting", "LightGBM", "classification",
         {"accuracy": 0.9722, "f1_score": 0.9720, "precision": 0.9725, "recall": 0.9722}),
        ("Wine Quality", "Neural Networks", "MLP (Medium)", "classification",
         {"accuracy": 0.9444, "f1_score": 0.9441, "precision": 0.9449, "recall": 0.9444}),
        ("Breast Cancer", "Support Vector Machines", "SVC (RBF Kernel)", "classification",
         {"accuracy": 0.9737, "f1_score": 0.9736, "precision": 0.9741, "recall": 0.9737}),
        ("Breast Cancer", "Ensemble / Boosting", "Gradient Boosting", "classification",
         {"accuracy": 0.9561, "f1_score": 0.9558, "precision": 0.9565, "recall": 0.9561}),
        ("Diabetes Progression", "Ensemble / Boosting", "XGBoost Regressor", "regression",
         {"r2_score": 0.4823, "mae": 44.12, "mse": 3124.5, "rmse": 55.90}),
        ("Diabetes Progression", "Linear Models", "Ridge Regression", "regression",
         {"r2_score": 0.4612, "mae": 45.87, "mse": 3258.3, "rmse": 57.08}),
        ("California Housing", "Ensemble / Boosting", "LightGBM Regressor", "regression",
         {"r2_score": 0.8341, "mae": 0.3124, "mse": 0.2871, "rmse": 0.5358}),
    ]
    for ds, cat, alg, task, metrics in demo_runs:
        try:
            # One experiment per dataset, created on first use.
            exp = client.get_experiment_by_name(ds)
            exp_id = exp.experiment_id if exp else mlflow.create_experiment(ds)
            with mlflow.start_run(experiment_id=exp_id,
                                  run_name=f"{alg} — {ds}"):
                mlflow.set_tags({"algorithm": alg, "category": cat,
                                 "dataset": ds, "task_type": task, "demo": "true"})
                mlflow.log_params({"algorithm": alg, "category": cat, "dataset": ds})
                mlflow.log_metrics(metrics)
        except Exception:
            # Keep seeding the remaining demo runs even if one fails.
            app.logger.warning("Failed to seed demo run %r on %r", alg, ds,
                               exc_info=True)
# Run the import warm-up and the demo seeding on daemon threads so that
# importing this module (and therefore server startup) never waits on them.
_warm_thread = threading.Thread(target=_warm_imports, daemon=True)
_seed_thread = threading.Thread(target=_seed_demo, daemon=True)
_warm_thread.start()
_seed_thread.start()
# ────────────────────────────────────────────────────────────────────────────
# PAGE ROUTES (3 pages: Pipeline Studio · AutoML · Model Registry)
# ────────────────────────────────────────────────────────────────────────────
def _pipeline_context():
    """Build the template context shared by the Pipeline Studio page.

    Returns a dict with:
        dags: JSON string mapping each pipeline id to its built DAG's dict.
        datasets: DATASETS with the ``loader`` entry removed from every config.
    """
    dag_map = {}
    for pipeline_id, build in PIPELINE_BUILDERS.items():
        dag_map[pipeline_id] = build().to_dict()

    public_datasets = {}
    for ds_name, ds_cfg in DATASETS.items():
        public_datasets[ds_name] = {
            key: val for key, val in ds_cfg.items() if key != "loader"
        }

    return {"dags": json.dumps(dag_map), "datasets": public_datasets}
def index():
    """Render the Pipeline Studio page."""
    context = _pipeline_context()
    return render_template("pipeline.html", **context)
# Legacy URL: /pipeline stays reachable as a permanent (301) redirect to /.
def pipeline():
    """Permanently redirect to the ``index`` endpoint."""
    destination = url_for("index")
    return redirect(destination, code=301)
def _version_summary(client, v):
    """Build the Model Registry template row for one model version *v*.

    Metrics come from the version's source run, rounded to 4 decimals; they
    are left empty when the run cannot be read.
    """
    metrics = {}
    if v.run_id:
        try:
            run = client.get_run(v.run_id)
            metrics = {k: round(val, 4) for k, val in run.data.metrics.items()}
        except Exception:
            app.logger.debug("Could not load run %s for model version %s",
                             v.run_id, v.version, exc_info=True)
    # creation_timestamp is divided by 1000, i.e. treated as epoch milliseconds.
    if v.creation_timestamp:
        created_at = (datetime.fromtimestamp(v.creation_timestamp / 1000)
                      .strftime("%Y-%m-%d %H:%M"))
    else:
        created_at = "—"
    return {
        "version": v.version,
        "stage": v.current_stage,
        "run_id": v.run_id[:8] if v.run_id else "—",
        "metrics": metrics,
        "created_at": created_at,
    }


def models():
    """Render the Model Registry page.

    Lists every registered model with the versions returned by
    ``get_latest_versions``, newest version first, so ``latest_stage`` is the
    stage of the highest version. A model whose versions cannot be fetched is
    still listed (with no versions) instead of failing the whole page.
    """
    client = _mlflow_client()
    try:
        registered = client.search_registered_models()
    except Exception:
        app.logger.warning("Could not list registered models", exc_info=True)
        registered = []

    model_list = []
    for m in registered:
        try:
            versions = client.get_latest_versions(m.name)
        except Exception:
            app.logger.warning("Could not fetch versions of model %r", m.name,
                               exc_info=True)
            versions = []
        # get_latest_versions does not promise an order; sort newest first.
        versions = sorted(versions, key=lambda v: int(v.version), reverse=True)
        ver_list = [_version_summary(client, v) for v in versions]
        model_list.append({
            "name": m.name,
            "description": m.description or "—",
            "versions": ver_list,
            "latest_stage": ver_list[0]["stage"] if ver_list else "None",
        })
    return render_template("models.html", models=model_list)
def automl():
    """Render the AutoML page with all datasets and the algorithm catalogue."""
    algos = algorithms_for_json()
    return render_template("automl.html", datasets=DATASETS, algorithms=algos)
# ────────────────────────────────────────────────────────────────────────────
# API — TRAINING
# ────────────────────────────────────────────────────────────────────────────
def api_train():
    """Queue a single training job.

    Expects a JSON object with ``dataset``, ``algorithm``, ``category`` and
    ``task_type`` plus optional ``params``. Returns ``{"job_id", "status"}``,
    or a 400 JSON error naming exactly the fields that are missing.
    """
    data = request.get_json(force=True)
    # JSON null / arrays / scalars would otherwise crash the lookups below.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    required = ["dataset", "algorithm", "category", "task_type"]
    missing = [k for k in required if k not in data]
    if missing:
        return jsonify({"error": f"Missing fields: {missing}"}), 400
    job_id = start_training(
        dataset_name=data["dataset"],
        algorithm_name=data["algorithm"],
        algorithm_category=data["category"],
        task_type=data["task_type"],
        custom_params=data.get("params"),
    )
    return jsonify({"job_id": job_id, "status": "queued"})
def api_run_status(job_id):
    """Return the tracked state of training job *job_id*, or a 404 JSON error."""
    state = training_jobs.get(job_id)
    if state:
        return jsonify(state)
    return jsonify({"error": "Job not found"}), 404
def api_runs():
    """Return up to 200 MLflow runs as JSON, newest first.

    Query parameters:
        experiment: only runs of the experiment with this name; an unknown
            name yields an empty list. When omitted, all active experiments
            are searched.
        task: only runs whose ``task_type`` tag equals this value.
    """
    client = _mlflow_client()
    exp_filter = request.args.get("experiment")
    task_filter = request.args.get("task")
    try:
        if exp_filter:
            exp = client.get_experiment_by_name(exp_filter)
            exp_ids = [exp.experiment_id] if exp else []
        else:
            # search_runs(experiment_ids=[]) matches no runs, so "no filter"
            # must enumerate the experiments explicitly.
            exp_ids = [e.experiment_id for e in client.search_experiments()]
        if exp_ids:
            runs = client.search_runs(
                experiment_ids=exp_ids,
                max_results=200,
                order_by=["start_time DESC"],
            )
        else:
            runs = []
    except Exception:
        app.logger.warning("Could not query MLflow runs", exc_info=True)
        runs = []

    result = []
    for r in runs:
        tags = r.data.tags
        if task_filter and tags.get("task_type") != task_filter:
            continue
        result.append({
            "run_id": r.info.run_id,
            "algorithm": tags.get("algorithm", "—"),
            "category": tags.get("category", "—"),
            "dataset": tags.get("dataset", "—"),
            "task_type": tags.get("task_type", "classification"),
            "metrics": {k: round(v, 4) for k, v in r.data.metrics.items()},
            "status": r.info.status,
            "start_time": r.info.start_time,
        })
    return jsonify(result)
# ────────────────────────────────────────────────────────────────────────────
# API — PIPELINE
# ────────────────────────────────────────────────────────────────────────────
def api_pipeline_execute(pipeline_id):
    """Start pipeline *pipeline_id* and return its execution id.

    The JSON object body (JSON ``null`` counts as ``{}``) is passed to the
    pipeline as its context. Apache Airflow (``mlops.airflow_runner``) is tried
    first. If it cannot be imported or the trigger fails, the built-in DAG
    engine runs the pipeline instead. Returns ``{"exec_id", "status",
    "engine"}``, or a 400 JSON error for an unknown pipeline id or a body that
    is not a JSON object.
    """
    context = request.get_json(force=True) or {}
    if not isinstance(context, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    try:
        dag = get_pipeline(pipeline_id)
    except ValueError as e:
        return jsonify({"error": str(e)}), 400

    # Apache Airflow is the primary engine; built-in DAG engine is the fallback.
    try:
        from mlops.airflow_runner import trigger_pipeline
    except ImportError:
        app.logger.warning("Airflow not installed — using built-in DAG engine")
    else:
        try:
            exec_id = trigger_pipeline(pipeline_id, context=context, dag=dag)
        except Exception as af_err:
            app.logger.warning(
                "Airflow trigger failed, using built-in engine: %s", af_err)
        else:
            return jsonify({"exec_id": exec_id, "status": "queued",
                            "engine": "airflow"})

    exec_id = execute_dag(dag, context)
    return jsonify({"exec_id": exec_id, "status": "queued", "engine": "builtin"})
def api_pipeline_status(exec_id):
    """Return the state of pipeline execution *exec_id*, or a 404 JSON error."""
    execution = pipeline_executions.get(exec_id)
    if execution:
        return jsonify(execution)
    return jsonify({"error": "Execution not found"}), 404
def api_pipeline_dag(pipeline_id):
    """Return pipeline *pipeline_id*'s DAG as JSON (400 if the id is rejected)."""
    try:
        graph = get_pipeline(pipeline_id)
    except ValueError as err:
        return jsonify({"error": str(err)}), 400
    else:
        return jsonify(graph.to_dict())
# ────────────────────────────────────────────────────────────────────────────
# API — MODEL REGISTRY
# ────────────────────────────────────────────────────────────────────────────
def api_models_register():
    """Register the ``model`` artifact of an MLflow run in the Model Registry.

    Expects a JSON object with ``run_id`` and ``name``. Returns the registered
    ``name`` and ``version``. On a bad body it returns 400. On an MLflow error
    it returns the error message with the HTTP status MLflow maps that error
    to (e.g. 404 for an unknown run); any other failure returns 500.
    """
    from mlflow.exceptions import MlflowException

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    run_id = data.get("run_id")
    name = data.get("name")
    if not run_id or not name:
        return jsonify({"error": "run_id and name required"}), 400
    try:
        # Called only to fail fast with a clear error when the run is unknown.
        _mlflow_client().get_run(run_id)
        result = mlflow.register_model(f"runs:/{run_id}/model", name)
    except MlflowException as exc:
        return jsonify({"error": str(exc)}), exc.get_http_status_code()
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    return jsonify({"name": result.name, "version": result.version,
                    "status": "registered"})
def api_model_stage(name, version):
    """Transition version *version* of registered model *name* to a new stage.

    Expects a JSON object with an optional ``stage`` (default ``"Staging"``).
    It must be one of Staging, Production, Archived or None. Other versions
    already in that stage are left as they are. Returns ``{"name", "version",
    "stage"}``. A bad body or stage gives 400. An MLflow error gives the
    status MLflow maps it to; any other failure gives 500.
    """
    from mlflow.exceptions import MlflowException

    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    stage = data.get("stage", "Staging")
    # A tuple, not a set: the error message lists the stages in a stable order,
    # and unhashable JSON values (lists/objects) are rejected instead of raising.
    valid = ("Staging", "Production", "Archived", "None")
    if stage not in valid:
        return jsonify({"error": f"stage must be one of {list(valid)}"}), 400
    try:
        _mlflow_client().transition_model_version_stage(
            name=name, version=version, stage=stage,
            archive_existing_versions=False,
        )
    except MlflowException as exc:
        return jsonify({"error": str(exc)}), exc.get_http_status_code()
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    return jsonify({"name": name, "version": version, "stage": stage})
# ────────────────────────────────────────────────────────────────────────────
# API — AUTO-ML
# ────────────────────────────────────────────────────────────────────────────
def api_automl():
    """Queue an AutoML search over one dataset.

    Expects a JSON object with ``dataset`` and ``task_type``. Optional fields
    are ``metric`` (default ``"accuracy"``) and ``max_runs`` (default 20,
    which must be a positive integer). Returns ``{"job_id", "status"}`` or a
    400 JSON error.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    if "dataset" not in data or "task_type" not in data:
        return jsonify({"error": "dataset and task_type required"}), 400
    try:
        max_runs = int(data.get("max_runs", 20))
    except (TypeError, ValueError, OverflowError):
        # e.g. "abc", null, a list, or Infinity — reject instead of a 500.
        return jsonify({"error": "max_runs must be an integer"}), 400
    if max_runs < 1:
        return jsonify({"error": "max_runs must be at least 1"}), 400
    job_id = start_automl(
        dataset_name=data["dataset"],
        task_type=data["task_type"],
        optimize_metric=data.get("metric", "accuracy"),
        max_runs=max_runs,
    )
    return jsonify({"job_id": job_id, "status": "queued"})
def api_automl_status(job_id):
    """Return the tracked state of AutoML job *job_id*, or a 404 JSON error."""
    state = automl_jobs.get(job_id)
    if state:
        return jsonify(state)
    return jsonify({"error": "Job not found"}), 404
# ────────────────────────────────────────────────────────────────────────────
# API — META
# ────────────────────────────────────────────────────────────────────────────
def api_algorithms():
    """Return the algorithm catalogue for ``?task=`` (default classification).

    A ValueError (e.g. raised by ``algorithms_for_json`` for an unsupported
    task) becomes a 400 JSON error.
    """
    task_type = request.args.get("task", "classification")
    try:
        payload = jsonify(algorithms_for_json(task_type))
    except ValueError as err:
        return jsonify({"error": str(err)}), 400
    return payload
def api_datasets():
    """Return every dataset config as JSON, without its ``loader`` entry."""
    public = {}
    for ds_name, ds_cfg in DATASETS.items():
        public[ds_name] = {key: val for key, val in ds_cfg.items()
                           if key != "loader"}
    return jsonify(public)
# ── Entry point ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Serve on all interfaces, port 7860, with Flask debug mode off.
    # NOTE(review): no @app.route(...) decorators are visible on the view
    # functions in this copy of the file — confirm the routes are registered,
    # otherwise every URL answers 404.
    app.run(port=7860, host="0.0.0.0", debug=False)