File size: 15,878 Bytes
6973475
 
 
 
 
 
 
 
0438355
6973475
0438355
 
6973475
 
 
 
 
 
 
9c191b0
6973475
 
 
 
 
 
 
 
 
 
 
 
bb35191
 
 
 
 
 
 
 
 
 
 
6973475
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bb35191
 
6973475
 
 
 
0438355
6973475
 
0438355
 
 
 
 
 
6973475
 
0438355
 
 
6973475
 
0438355
6973475
 
0438355
6973475
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84e5a7a
6973475
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
edc9558
6973475
 
 
 
edc9558
fb9037e
 
 
 
 
 
 
 
 
edc9558
 
 
6973475
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84e5a7a
6973475
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""AutoMLOps β€” ML Experiment Tracking & Pipeline Orchestration Platform."""
import os
import json
import threading
from datetime import datetime

import mlflow
import mlflow.sklearn
from flask import Flask, render_template, request, jsonify, redirect, url_for

from mlops.datasets import DATASETS
from mlops.algorithms import algorithms_for_json
from mlops.trainer import (
    training_jobs, automl_jobs,
    start_training, start_automl,
)
from pipelines.dag_engine import pipeline_executions, execute_dag
from pipelines.pipeline_defs import get_pipeline, PIPELINE_BUILDERS

app = Flask(__name__)

# ── MLflow setup ───────────────────────────────────────────────────────────────
TRACKING_URI = "sqlite:///mlflow.db"
mlflow.set_tracking_uri(TRACKING_URI)


def _mlflow_client():
    return mlflow.tracking.MlflowClient(tracking_uri=TRACKING_URI)


# ── Seed demo data on first launch ────────────────────────────────────────────

def _warm_imports():
    """Pre-import heavy ML libraries so the first pipeline run is instant."""
    try:
        import sklearn, sklearn.ensemble, sklearn.preprocessing  # noqa: F401
        import mlflow, mlflow.sklearn                            # noqa: F401
        from mlops.datasets import load_dataset
        load_dataset("Iris Flowers")   # primes sklearn's data cache
    except Exception:
        pass


def _seed_demo():
    """Pre-populate a few MLflow runs so the dashboard looks great immediately."""
    client = _mlflow_client()
    try:
        existing = client.search_runs(experiment_ids=[], max_results=1)
        if existing:
            return  # already seeded
    except Exception:
        pass

    demo_runs = [
        ("Iris Flowers",     "Ensemble / Boosting", "Random Forest",       "classification",
         {"accuracy": 0.9667, "f1_score": 0.9664, "precision": 0.9672, "recall": 0.9667}),
        ("Iris Flowers",     "Ensemble / Boosting", "XGBoost",             "classification",
         {"accuracy": 0.9600, "f1_score": 0.9598, "precision": 0.9601, "recall": 0.9600}),
        ("Iris Flowers",     "Linear Models",       "Logistic Regression", "classification",
         {"accuracy": 0.9467, "f1_score": 0.9463, "precision": 0.9472, "recall": 0.9467}),
        ("Wine Quality",     "Ensemble / Boosting", "LightGBM",            "classification",
         {"accuracy": 0.9722, "f1_score": 0.9720, "precision": 0.9725, "recall": 0.9722}),
        ("Wine Quality",     "Neural Networks",     "MLP (Medium)",        "classification",
         {"accuracy": 0.9444, "f1_score": 0.9441, "precision": 0.9449, "recall": 0.9444}),
        ("Breast Cancer",    "Support Vector Machines", "SVC (RBF Kernel)","classification",
         {"accuracy": 0.9737, "f1_score": 0.9736, "precision": 0.9741, "recall": 0.9737}),
        ("Breast Cancer",    "Ensemble / Boosting", "Gradient Boosting",   "classification",
         {"accuracy": 0.9561, "f1_score": 0.9558, "precision": 0.9565, "recall": 0.9561}),
        ("Diabetes Progression", "Ensemble / Boosting", "XGBoost Regressor","regression",
         {"r2_score": 0.4823, "mae": 44.12, "mse": 3124.5, "rmse": 55.90}),
        ("Diabetes Progression", "Linear Models",   "Ridge Regression",    "regression",
         {"r2_score": 0.4612, "mae": 45.87, "mse": 3258.3, "rmse": 57.08}),
        ("California Housing","Ensemble / Boosting","LightGBM Regressor",  "regression",
         {"r2_score": 0.8341, "mae": 0.3124, "mse": 0.2871, "rmse": 0.5358}),
    ]

    for ds, cat, alg, task, metrics in demo_runs:
        try:
            exp = client.get_experiment_by_name(ds)
            exp_id = exp.experiment_id if exp else mlflow.create_experiment(ds)
            with mlflow.start_run(experiment_id=exp_id,
                                  run_name=f"{alg} β€” {ds}") as run:
                mlflow.set_tags({"algorithm": alg, "category": cat,
                                 "dataset": ds, "task_type": task, "demo": "true"})
                mlflow.log_params({"algorithm": alg, "category": cat, "dataset": ds})
                mlflow.log_metrics(metrics)
        except Exception:
            pass


# Warm imports and seed demo data in background so startup isn't delayed
threading.Thread(target=_warm_imports, daemon=True).start()
threading.Thread(target=_seed_demo, daemon=True).start()


# ══════════════════════════════════════════════════════════════════════════════
#  PAGE ROUTES  (3 pages: Pipeline Studio Β· AutoML Β· Model Registry)
# ══════════════════════════════════════════════════════════════════════════════

def _pipeline_context():
    """Shared context for the Pipeline Studio page."""
    dags = {pid: builder().to_dict() for pid, builder in PIPELINE_BUILDERS.items()}
    datasets_safe = {name: {k: v for k, v in cfg.items() if k != "loader"}
                     for name, cfg in DATASETS.items()}
    return dict(dags=json.dumps(dags), datasets=datasets_safe)


@app.route("/")
def index():
    return render_template("pipeline.html", **_pipeline_context())


# Keep /pipeline working as a permanent redirect to /
@app.route("/pipeline")
def pipeline():
    return redirect(url_for("index"), code=301)


@app.route("/models")
def models():
    client = _mlflow_client()
    try:
        registered = client.search_registered_models()
    except Exception:
        registered = []
    model_list = []
    for m in registered:
        versions = client.get_latest_versions(m.name)
        ver_list = []
        for v in versions:
            run = None
            metrics = {}
            try:
                run  = client.get_run(v.run_id)
                metrics = {k: round(val, 4) for k, val in run.data.metrics.items()}
            except Exception:
                pass
            ver_list.append({
                "version":    v.version,
                "stage":      v.current_stage,
                "run_id":     v.run_id[:8] if v.run_id else "β€”",
                "metrics":    metrics,
                "created_at": datetime.fromtimestamp(v.creation_timestamp / 1000)
                              .strftime("%Y-%m-%d %H:%M")
                              if v.creation_timestamp else "β€”",
            })
        model_list.append({
            "name":        m.name,
            "description": m.description or "β€”",
            "versions":    ver_list,
            "latest_stage": ver_list[0]["stage"] if ver_list else "None",
        })
    return render_template("models.html", models=model_list)


@app.route("/automl")
def automl():
    return render_template("automl.html",
                           datasets=DATASETS,
                           algorithms=algorithms_for_json())


# ══════════════════════════════════════════════════════════════════════════════
#  API β€” TRAINING
# ══════════════════════════════════════════════════════════════════════════════

@app.route("/api/train", methods=["POST"])
def api_train():
    data = request.get_json(force=True)
    required = ["dataset", "algorithm", "category", "task_type"]
    if not all(k in data for k in required):
        return jsonify({"error": f"Missing fields: {required}"}), 400
    job_id = start_training(
        dataset_name=data["dataset"],
        algorithm_name=data["algorithm"],
        algorithm_category=data["category"],
        task_type=data["task_type"],
        custom_params=data.get("params"),
    )
    return jsonify({"job_id": job_id, "status": "queued"})


@app.route("/api/run/<job_id>/status")
def api_run_status(job_id):
    job = training_jobs.get(job_id)
    if not job:
        return jsonify({"error": "Job not found"}), 404
    return jsonify(job)


@app.route("/api/runs")
def api_runs():
    client = _mlflow_client()
    exp_filter = request.args.get("experiment")
    task_filter = request.args.get("task")
    try:
        exp_ids = []
        if exp_filter:
            exp = client.get_experiment_by_name(exp_filter)
            if exp:
                exp_ids = [exp.experiment_id]
        runs = client.search_runs(
            experiment_ids=exp_ids or [],
            max_results=200,
            order_by=["start_time DESC"],
        )
    except Exception:
        runs = []
    result = []
    for r in runs:
        if task_filter and r.data.tags.get("task_type") != task_filter:
            continue
        m = r.data.metrics
        result.append({
            "run_id":    r.info.run_id,
            "algorithm": r.data.tags.get("algorithm", "β€”"),
            "category":  r.data.tags.get("category",  "β€”"),
            "dataset":   r.data.tags.get("dataset",   "β€”"),
            "task_type": r.data.tags.get("task_type", "classification"),
            "metrics":   {k: round(v, 4) for k, v in m.items()},
            "status":    r.info.status,
            "start_time": r.info.start_time,
        })
    return jsonify(result)


# ══════════════════════════════════════════════════════════════════════════════
#  API β€” PIPELINE
# ══════════════════════════════════════════════════════════════════════════════

@app.route("/api/pipeline/<pipeline_id>/execute", methods=["POST"])
def api_pipeline_execute(pipeline_id):
    context = request.get_json(force=True) or {}
    try:
        dag = get_pipeline(pipeline_id)
    except ValueError as e:
        return jsonify({"error": str(e)}), 400

    # Apache Airflow is the primary engine; built-in DAG engine is the fallback.
    try:
        from mlops.airflow_runner import trigger_pipeline
        exec_id = trigger_pipeline(pipeline_id, context=context, dag=dag)
        return jsonify({"exec_id": exec_id, "status": "queued", "engine": "airflow"})
    except ImportError:
        app.logger.warning("Airflow not installed β€” using built-in DAG engine")
    except Exception as af_err:
        app.logger.warning(f"Airflow trigger failed, using built-in engine: {af_err}")

    exec_id = execute_dag(dag, context)
    return jsonify({"exec_id": exec_id, "status": "queued", "engine": "builtin"})


@app.route("/api/pipeline/status/<exec_id>")
def api_pipeline_status(exec_id):
    state = pipeline_executions.get(exec_id)
    if not state:
        return jsonify({"error": "Execution not found"}), 404
    return jsonify(state)


@app.route("/api/pipeline/<pipeline_id>/dag")
def api_pipeline_dag(pipeline_id):
    try:
        dag = get_pipeline(pipeline_id)
    except ValueError as e:
        return jsonify({"error": str(e)}), 400
    return jsonify(dag.to_dict())


# ══════════════════════════════════════════════════════════════════════════════
#  API β€” MODEL REGISTRY
# ══════════════════════════════════════════════════════════════════════════════

@app.route("/api/models/register", methods=["POST"])
def api_models_register():
    data   = request.get_json(force=True)
    run_id = data.get("run_id")
    name   = data.get("name")
    if not run_id or not name:
        return jsonify({"error": "run_id and name required"}), 400
    try:
        client = _mlflow_client()
        run    = client.get_run(run_id)
        model_uri = f"runs:/{run_id}/model"
        result = mlflow.register_model(model_uri, name)
        return jsonify({"name": result.name, "version": result.version,
                        "status": "registered"})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500


@app.route("/api/models/<name>/<version>/stage", methods=["POST"])
def api_model_stage(name, version):
    data  = request.get_json(force=True)
    stage = data.get("stage", "Staging")
    valid = {"Staging", "Production", "Archived", "None"}
    if stage not in valid:
        return jsonify({"error": f"stage must be one of {valid}"}), 400
    try:
        client = _mlflow_client()
        client.transition_model_version_stage(name=name, version=version,
                                              stage=stage, archive_existing_versions=False)
        return jsonify({"name": name, "version": version, "stage": stage})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500


# ══════════════════════════════════════════════════════════════════════════════
#  API β€” AUTO-ML
# ══════════════════════════════════════════════════════════════════════════════

@app.route("/api/automl", methods=["POST"])
def api_automl():
    data = request.get_json(force=True)
    if "dataset" not in data or "task_type" not in data:
        return jsonify({"error": "dataset and task_type required"}), 400
    job_id = start_automl(
        dataset_name=data["dataset"],
        task_type=data["task_type"],
        optimize_metric=data.get("metric", "accuracy"),
        max_runs=int(data.get("max_runs", 20)),
    )
    return jsonify({"job_id": job_id, "status": "queued"})


@app.route("/api/automl/status/<job_id>")
def api_automl_status(job_id):
    job = automl_jobs.get(job_id)
    if not job:
        return jsonify({"error": "Job not found"}), 404
    return jsonify(job)


# ══════════════════════════════════════════════════════════════════════════════
#  API β€” META
# ══════════════════════════════════════════════════════════════════════════════

@app.route("/api/algorithms")
def api_algorithms():
    task = request.args.get("task", "classification")
    try:
        return jsonify(algorithms_for_json(task))
    except ValueError as e:
        return jsonify({"error": str(e)}), 400


@app.route("/api/datasets")
def api_datasets():
    result = {
        name: {k: v for k, v in cfg.items() if k != "loader"}
        for name, cfg in DATASETS.items()
    }
    return jsonify(result)




# ── Entry point ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=False)