mnoorchenar committed on
Commit
bb35191
·
1 Parent(s): a698773

Update 2026-03-26 17:41:53

Browse files
app.py CHANGED
@@ -30,6 +30,17 @@ def _mlflow_client():
30
 
31
  # ── Seed demo data on first launch ────────────────────────────────────────────
32
 
 
 
 
 
 
 
 
 
 
 
 
33
  def _seed_demo():
34
  """Pre-populate a few MLflow runs so the dashboard looks great immediately."""
35
  client = _mlflow_client()
@@ -77,7 +88,8 @@ def _seed_demo():
77
  pass
78
 
79
 
80
- # Seed in background so startup isn't delayed
 
81
  threading.Thread(target=_seed_demo, daemon=True).start()
82
 
83
 
@@ -224,16 +236,15 @@ def api_pipeline_execute(pipeline_id):
224
  except ValueError as e:
225
  return jsonify({"error": str(e)}), 400
226
 
227
- # Use Apache Airflow. Falls back to the built-in engine only if Airflow
228
- # is not importable (i.e. not installed at all — should not happen in prod).
229
- try:
230
- from mlops.airflow_runner import trigger_pipeline
231
- exec_id = trigger_pipeline(pipeline_id, context=context, dag=dag)
232
- return jsonify({"exec_id": exec_id, "status": "queued", "engine": "airflow"})
233
- except ImportError:
234
- app.logger.warning("Airflow not installed β€” using built-in DAG engine")
235
- except Exception as af_err:
236
- app.logger.warning(f"Airflow trigger failed, falling back to built-in engine: {af_err}")
237
 
238
  exec_id = execute_dag(dag, context)
239
  return jsonify({"exec_id": exec_id, "status": "queued", "engine": "builtin"})
 
30
 
31
  # ── Seed demo data on first launch ────────────────────────────────────────────
32
 
33
+ def _warm_imports():
34
+ """Pre-import heavy ML libraries so the first pipeline run is instant."""
35
+ try:
36
+ import sklearn, sklearn.ensemble, sklearn.preprocessing # noqa: F401
37
+ import mlflow, mlflow.sklearn # noqa: F401
38
+ from mlops.datasets import load_dataset
39
+ load_dataset("Iris Flowers") # primes sklearn's data cache
40
+ except Exception:
41
+ pass
42
+
43
+
44
  def _seed_demo():
45
  """Pre-populate a few MLflow runs so the dashboard looks great immediately."""
46
  client = _mlflow_client()
 
88
  pass
89
 
90
 
91
+ # Warm imports and seed demo data in background so startup isn't delayed
92
+ threading.Thread(target=_warm_imports, daemon=True).start()
93
  threading.Thread(target=_seed_demo, daemon=True).start()
94
 
95
 
 
236
  except ValueError as e:
237
  return jsonify({"error": str(e)}), 400
238
 
239
+ # Built-in engine is the default — zero scheduler latency, runs immediately.
240
+ # Set USE_AIRFLOW=true in the environment to hand off to Apache Airflow instead.
241
+ if os.environ.get("USE_AIRFLOW", "").lower() in ("1", "true"):
242
+ try:
243
+ from mlops.airflow_runner import trigger_pipeline
244
+ exec_id = trigger_pipeline(pipeline_id, context=context, dag=dag)
245
+ return jsonify({"exec_id": exec_id, "status": "queued", "engine": "airflow"})
246
+ except Exception as af_err:
247
+ app.logger.warning(f"Airflow trigger failed, falling back to built-in engine: {af_err}")
 
248
 
249
  exec_id = execute_dag(dag, context)
250
  return jsonify({"exec_id": exec_id, "status": "queued", "engine": "builtin"})
pipelines/dag_engine.py CHANGED
@@ -141,7 +141,7 @@ def _run_dag(exec_id: str, dag: DAG, context: dict):
141
  progress = int(100 * (step_idx + 1) / total)
142
  _upd(progress=progress)
143
 
144
- time.sleep(0.4) # small delay so the UI can animate
145
 
146
  _upd(status="completed", progress=100,
147
  completed_at=datetime.utcnow().isoformat())
 
141
  progress = int(100 * (step_idx + 1) / total)
142
  _upd(progress=progress)
143
 
144
+ time.sleep(0.1) # small delay so the UI can animate
145
 
146
  _upd(status="completed", progress=100,
147
  completed_at=datetime.utcnow().isoformat())
pipelines/pipeline_defs.py CHANGED
@@ -1,5 +1,4 @@
1
  """Pre-built ML pipeline DAG definitions."""
2
- import time
3
  import numpy as np
4
  from pipelines.dag_engine import DAG, Task
5
 
@@ -18,21 +17,18 @@ def _load_data(ctx, _results):
18
  def _validate_data(ctx, results):
19
  log = ctx.get("_log")
20
  if log: log("Checking schema, nulls, and feature ranges…")
21
- time.sleep(0.2)
22
  if log: log("No nulls found Β· All feature ranges valid")
23
  return "Schema OK Β· No nulls detected Β· Feature ranges valid"
24
 
25
  def _preprocess(ctx, results):
26
  log = ctx.get("_log")
27
  if log: log("Fitting StandardScaler on training split…")
28
- time.sleep(0.3)
29
  if log: log("80/20 stratified train/test split applied")
30
  return "StandardScaler fitted Β· Train/test split 80/20"
31
 
32
  def _feature_engineering(ctx, results):
33
  log = ctx.get("_log")
34
  if log: log("Evaluating polynomial and interaction features…")
35
- time.sleep(0.2)
36
  if log: log("No additional features needed Β· all originals retained")
37
  return "Polynomial features skipped Β· All features retained"
38
 
@@ -65,79 +61,64 @@ def _train_model(ctx, results):
65
  def _evaluate_model(ctx, results):
66
  log = ctx.get("_log")
67
  if log: log("Computing accuracy / RΒ² on hold-out set…")
68
- time.sleep(0.2)
69
  if log: log("5-fold cross-validation passed")
70
  return "Accuracy / RΒ² computed Β· Cross-val 5-fold done"
71
 
72
  def _generate_report(ctx, results):
73
  log = ctx.get("_log")
74
  if log: log("Writing evaluation artefacts to MLflow…")
75
- time.sleep(0.15)
76
  return "HTML report generated Β· Metrics written to mlflow"
77
 
78
  def _register_model(ctx, _results):
79
  log = ctx.get("_log")
80
  if log: log("Pushing model artifact to MLflow Model Registry…")
81
- time.sleep(0.1)
82
  return "Model artifact registered in MLflow Model Registry"
83
 
84
  def _deploy_staging(ctx, _results):
85
  log = ctx.get("_log")
86
  if log: log("Transitioning model version to Staging…")
87
- time.sleep(0.2)
88
  if log: log("REST endpoint ready")
89
  return "Model transitioned to Staging Β· REST endpoint ready"
90
 
91
  # ── Retraining pipeline tasks ──────────────────────────────────────────────────
92
 
93
  def _check_drift(ctx, _):
94
- time.sleep(0.2)
95
  drift = round(np.random.uniform(0.01, 0.08), 4)
96
  return f"PSI={drift} Β· {'Drift detected β€” retraining triggered' if drift > 0.05 else 'No drift Β· pipeline skipped'}"
97
 
98
  def _fetch_new_data(ctx, _):
99
- time.sleep(0.3)
100
  n = np.random.randint(200, 800)
101
  return f"{n} new labelled samples fetched from data store"
102
 
103
  def _merge_datasets(ctx, _):
104
- time.sleep(0.2)
105
  return "New data merged with historical Β· duplicates removed"
106
 
107
  def _retrain_champion(ctx, _):
108
- time.sleep(0.4)
109
  acc = round(np.random.uniform(0.88, 0.97), 4)
110
  return f"Champion model retrained Β· new accuracy={acc}"
111
 
112
  def _ab_test(ctx, _):
113
- time.sleep(0.2)
114
  return "A/B test scheduled Β· 10% traffic split for 24 h"
115
 
116
  def _promote_production(ctx, _):
117
- time.sleep(0.15)
118
  return "Champion model promoted to Production Β· old version archived"
119
 
120
  # ── Data pipeline tasks ────────────────────────────────────────────────────────
121
 
122
  def _ingest_raw(ctx, _):
123
- time.sleep(0.2)
124
  return "Raw data ingested from source"
125
 
126
  def _clean_data(ctx, _):
127
- time.sleep(0.3)
128
  removed = np.random.randint(5, 40)
129
  return f"{removed} anomalous rows removed Β· missing values imputed"
130
 
131
  def _encode_features(ctx, _):
132
- time.sleep(0.2)
133
  return "Categorical features one-hot encoded Β· ordinals label-encoded"
134
 
135
  def _scale_features(ctx, _):
136
- time.sleep(0.2)
137
  return "Numeric features scaled with StandardScaler"
138
 
139
  def _save_processed(ctx, _):
140
- time.sleep(0.1)
141
  return "Processed dataset saved to feature store"
142
 
143
 
 
1
  """Pre-built ML pipeline DAG definitions."""
 
2
  import numpy as np
3
  from pipelines.dag_engine import DAG, Task
4
 
 
17
  def _validate_data(ctx, results):
18
  log = ctx.get("_log")
19
  if log: log("Checking schema, nulls, and feature ranges…")
 
20
  if log: log("No nulls found Β· All feature ranges valid")
21
  return "Schema OK Β· No nulls detected Β· Feature ranges valid"
22
 
23
  def _preprocess(ctx, results):
24
  log = ctx.get("_log")
25
  if log: log("Fitting StandardScaler on training split…")
 
26
  if log: log("80/20 stratified train/test split applied")
27
  return "StandardScaler fitted Β· Train/test split 80/20"
28
 
29
  def _feature_engineering(ctx, results):
30
  log = ctx.get("_log")
31
  if log: log("Evaluating polynomial and interaction features…")
 
32
  if log: log("No additional features needed Β· all originals retained")
33
  return "Polynomial features skipped Β· All features retained"
34
 
 
61
  def _evaluate_model(ctx, results):
62
  log = ctx.get("_log")
63
  if log: log("Computing accuracy / RΒ² on hold-out set…")
 
64
  if log: log("5-fold cross-validation passed")
65
  return "Accuracy / RΒ² computed Β· Cross-val 5-fold done"
66
 
67
  def _generate_report(ctx, results):
68
  log = ctx.get("_log")
69
  if log: log("Writing evaluation artefacts to MLflow…")
 
70
  return "HTML report generated Β· Metrics written to mlflow"
71
 
72
  def _register_model(ctx, _results):
73
  log = ctx.get("_log")
74
  if log: log("Pushing model artifact to MLflow Model Registry…")
 
75
  return "Model artifact registered in MLflow Model Registry"
76
 
77
  def _deploy_staging(ctx, _results):
78
  log = ctx.get("_log")
79
  if log: log("Transitioning model version to Staging…")
 
80
  if log: log("REST endpoint ready")
81
  return "Model transitioned to Staging Β· REST endpoint ready"
82
 
83
  # ── Retraining pipeline tasks ──────────────────────────────────────────────────
84
 
85
  def _check_drift(ctx, _):
 
86
  drift = round(np.random.uniform(0.01, 0.08), 4)
87
  return f"PSI={drift} Β· {'Drift detected β€” retraining triggered' if drift > 0.05 else 'No drift Β· pipeline skipped'}"
88
 
89
  def _fetch_new_data(ctx, _):
 
90
  n = np.random.randint(200, 800)
91
  return f"{n} new labelled samples fetched from data store"
92
 
93
  def _merge_datasets(ctx, _):
 
94
  return "New data merged with historical Β· duplicates removed"
95
 
96
  def _retrain_champion(ctx, _):
 
97
  acc = round(np.random.uniform(0.88, 0.97), 4)
98
  return f"Champion model retrained Β· new accuracy={acc}"
99
 
100
  def _ab_test(ctx, _):
 
101
  return "A/B test scheduled Β· 10% traffic split for 24 h"
102
 
103
  def _promote_production(ctx, _):
 
104
  return "Champion model promoted to Production Β· old version archived"
105
 
106
  # ── Data pipeline tasks ────────────────────────────────────────────────────────
107
 
108
  def _ingest_raw(ctx, _):
 
109
  return "Raw data ingested from source"
110
 
111
  def _clean_data(ctx, _):
 
112
  removed = np.random.randint(5, 40)
113
  return f"{removed} anomalous rows removed Β· missing values imputed"
114
 
115
  def _encode_features(ctx, _):
 
116
  return "Categorical features one-hot encoded Β· ordinals label-encoded"
117
 
118
  def _scale_features(ctx, _):
 
119
  return "Numeric features scaled with StandardScaler"
120
 
121
  def _save_processed(ctx, _):
 
122
  return "Processed dataset saved to feature store"
123
 
124
 
templates/pipeline.html CHANGED
@@ -175,42 +175,6 @@
175
  .cfg-row-k { color: var(--text-muted); white-space: nowrap; padding-right: 8px; }
176
  .cfg-row-v { color: var(--text-primary); font-weight: 500; text-align: right; word-break: break-word; max-width: 62%; font-size: .77rem; }
177
 
178
- /* ── Steps progress bar ───────────────────────────────────────────────────── */
179
- .ps-steps {
180
- flex-shrink: 0; display: none;
181
- padding: 5px 16px; min-height: 38px;
182
- background: var(--bg-secondary);
183
- border-bottom: 1px solid var(--border-color);
184
- overflow-x: auto; align-items: center; gap: 4px;
185
- scrollbar-width: none;
186
- }
187
- .ps-steps::-webkit-scrollbar { display: none; }
188
- .ps-steps.visible { display: flex; }
189
-
190
- .step-pill {
191
- display: inline-flex; align-items: center; gap: 5px;
192
- padding: 3px 9px; border-radius: 20px; flex-shrink: 0;
193
- font-size: .7rem; font-weight: 500; white-space: nowrap;
194
- border: 1px solid var(--border-color);
195
- background: var(--bg-tertiary); color: var(--text-secondary);
196
- transition: background .2s, border-color .2s, color .2s;
197
- }
198
- .step-pill.s-running {
199
- border-color: var(--warning); background: rgba(245,158,11,.12); color: var(--warning);
200
- animation: pill-pulse 1.4s ease-in-out infinite;
201
- }
202
- .step-pill.s-success {
203
- border-color: rgba(34,197,94,.35); background: rgba(34,197,94,.08); color: var(--success);
204
- }
205
- .step-pill.s-failed {
206
- border-color: rgba(239,68,68,.35); background: rgba(239,68,68,.08); color: var(--danger);
207
- }
208
- .step-sep { color: var(--border-color); font-size: .65rem; flex-shrink: 0; user-select: none; }
209
- @keyframes pill-pulse {
210
- 0%,100% { box-shadow: none; }
211
- 50% { box-shadow: 0 0 0 3px rgba(245,158,11,.15); }
212
- }
213
-
214
  /* ── Terminal ─────────────────────────────────────────────────────────────── */
215
  .ps-term {
216
  flex-shrink: 0; height: 34px; overflow: hidden;
@@ -275,9 +239,6 @@
275
  </button>
276
  </div>
277
 
278
- <!-- ── Steps progress bar ────────────────────────────────────────────── -->
279
- <div class="ps-steps" id="ps-steps"></div>
280
-
281
  <!-- ── Main area ──────────────────────────────────────────────────────── -->
282
  <div class="ps-main">
283
 
@@ -364,7 +325,6 @@ document.addEventListener('DOMContentLoaded', () =>
364
  function switchPipeline(id, btn) {
365
  if (pollIv) { clearInterval(pollIv); pollIv = null; }
366
  cur = id; execId = null; tstates = {}; selNode = null;
367
- _hideSteps();
368
  closeConfig(false);
369
  document.querySelectorAll('.ps-tab').forEach(b => b.classList.remove('active'));
370
  btn.classList.add('active');
@@ -618,38 +578,6 @@ async function onTtChange(tt) {
618
  }
619
  function onCatChange(cat) { pCtx.category=cat; _fillAlgos(cat); }
620
 
621
- // ── Steps bar ─────────────────────────────────────────────────────────────────
622
- function _stepsOrder(dag) {
623
- return Object.values(dag.tasks)
624
- .sort((a, b) => a.layer !== b.layer ? a.layer - b.layer : a.task_id.localeCompare(b.task_id));
625
- }
626
-
627
- function _renderSteps(dag, tstates) {
628
- const el = document.getElementById('ps-steps');
629
- el.innerHTML = '';
630
- _stepsOrder(dag).forEach((t, i) => {
631
- if (i > 0) {
632
- const sep = document.createElement('span');
633
- sep.className = 'step-sep'; sep.textContent = 'β€Ί';
634
- el.appendChild(sep);
635
- }
636
- const st = (tstates[t.task_id] || {}).status || 'pending';
637
- const pill = document.createElement('div');
638
- pill.className = `step-pill s-${st}`;
639
- pill.textContent = `${t.icon} ${t.name}`;
640
- el.appendChild(pill);
641
- });
642
- }
643
-
644
- function _showSteps(dag) {
645
- _renderSteps(dag, {});
646
- document.getElementById('ps-steps').classList.add('visible');
647
- }
648
-
649
- function _hideSteps() {
650
- document.getElementById('ps-steps').classList.remove('visible');
651
- }
652
-
653
  // ── Execute pipeline ──────────────────────────────────────────────────────────
654
  async function runPipeline() {
655
  const runBtn = document.getElementById('ps-run-btn');
@@ -657,7 +585,6 @@ async function runPipeline() {
657
  document.getElementById('ps-btn-icon').className = 'spinner';
658
  document.getElementById('ps-btn-txt').textContent = 'Running…';
659
  _setBadge('running');
660
- _showSteps(DAGS[cur]);
661
  _openTerm();
662
 
663
  const ctx = {};
@@ -703,7 +630,6 @@ function _poll() {
703
 
704
  document.getElementById('term-pct').textContent = exec.progress!=null?exec.progress+'%':'';
705
  renderDAG(DAGS[cur], tstates);
706
- _renderSteps(DAGS[cur], tstates);
707
  _updateCfgStatus();
708
 
709
  if (exec.status==='completed') {
 
175
  .cfg-row-k { color: var(--text-muted); white-space: nowrap; padding-right: 8px; }
176
  .cfg-row-v { color: var(--text-primary); font-weight: 500; text-align: right; word-break: break-word; max-width: 62%; font-size: .77rem; }
177
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
  /* ── Terminal ─────────────────────────────────────────────────────────────── */
179
  .ps-term {
180
  flex-shrink: 0; height: 34px; overflow: hidden;
 
239
  </button>
240
  </div>
241
 
 
 
 
242
  <!-- ── Main area ──────────────────────────────────────────────────────── -->
243
  <div class="ps-main">
244
 
 
325
  function switchPipeline(id, btn) {
326
  if (pollIv) { clearInterval(pollIv); pollIv = null; }
327
  cur = id; execId = null; tstates = {}; selNode = null;
 
328
  closeConfig(false);
329
  document.querySelectorAll('.ps-tab').forEach(b => b.classList.remove('active'));
330
  btn.classList.add('active');
 
578
  }
579
  function onCatChange(cat) { pCtx.category=cat; _fillAlgos(cat); }
580
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
581
  // ── Execute pipeline ──────────────────────────────────────────────────────────
582
  async function runPipeline() {
583
  const runBtn = document.getElementById('ps-run-btn');
 
585
  document.getElementById('ps-btn-icon').className = 'spinner';
586
  document.getElementById('ps-btn-txt').textContent = 'Running…';
587
  _setBadge('running');
 
588
  _openTerm();
589
 
590
  const ctx = {};
 
630
 
631
  document.getElementById('term-pct').textContent = exec.progress!=null?exec.progress+'%':'';
632
  renderDAG(DAGS[cur], tstates);
 
633
  _updateCfgStatus();
634
 
635
  if (exec.status==='completed') {