triflix committed ad8abe4 (verified) · parent 347b431

Create workflow.md

I’m with you. Let’s wire the whole flow so uploads are snapshotted, deduped, and only re-run the AI pipeline when needed. We’ll keep your HF dataset store as the “remote S3,” maintain local snapshots, and call the strict Python Executor API to render charts from Agent 2’s python_snippet.

Overview
- Service A: Storage + Snapshotter (extends your HF uploader)
  - Handles uploads, computes a fingerprint (sha256, shape, schema), dedups across renames, keeps a manifest of snapshots and artifacts, and pushes to HF.
  - If an identical snapshot already exists, it returns the previous AI artifacts without re-running.
  - If the data changed, it creates a new snapshot and triggers the pipeline.
- Service B: Analytics Orchestrator (Gemini + Qdrant + Chart Plan + Execution)
  - Runs Agent 1 (Profiler) and Agent 2 (Viz Planner) and stores artifacts (profile.json, plan.json, figures.json).
  - Calls the Strict Python Executor API (/execute) to run the python_snippet against the snapshot dataset.
- Service C: Strict Python Executor (already done above)
  - Executes the python_snippet with df + plan and returns Plotly figure JSON.

Data and UX notes
- Renamed same file: dedup by content sha256 → reuse the snapshot and all AI artifacts.
- Same shape (rows/cols) but data changed: new snapshot → re-run the pipeline.
- Completely different dataset: new dataset key, or a new snapshot under a different key.
- Frontend “click columns”:
  - An API returns preprocessed slices (value counts, stats, crosstabs) for the clicked columns and highlights those in the charts.
  - We serve preprocessed data from the snapshot to avoid recomputing.

These rules boil down to one dedup decision, sketched right after this list.
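
A minimal sketch of that decision, assuming a manifest shaped like the one Service A keeps below; the function name and return shape are illustrative only:

```python
# Illustrative only: how the rules above map to an action for an incoming file.
# `manifest` mirrors the {"by_sha": {...}} index kept in /data/manifest.json below.
from typing import Any, Dict

def decide_action(manifest: Dict[str, Any], file_sha256: str) -> Dict[str, str]:
    existing = manifest.get("by_sha", {}).get(file_sha256)
    if existing:
        # Same bytes (even if the file was renamed): reuse the snapshot and its AI artifacts.
        return {"action": "reuse", "snapshot_id": existing}
    # Anything else -- new values, even with an identical row/column count -- is a new snapshot.
    return {"action": "new_snapshot", "snapshot_id": file_sha256[:32]}

if __name__ == "__main__":
    man = {"by_sha": {"aaa111": "aaa111"}}
    print(decide_action(man, "aaa111"))  # {'action': 'reuse', ...}
    print(decide_action(man, "bbb222"))  # {'action': 'new_snapshot', ...}
```
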
Service A: Storage + Snapshotter (FastAPI)
- Replaces/extends your uploader with fingerprinting, dedup, and a snapshot registry.
- Still uses CommitScheduler to push to HF.

requirements.txt
- fastapi, uvicorn, pydantic, pandas, pyarrow, orjson, huggingface_hub, requests, python-multipart (requests for the orchestrator trigger, python-multipart for form uploads)

app/storage_main.py
```python
import os, io, json, uuid, time, hashlib, asyncio
from pathlib import Path
from typing import Any, Dict, Optional, List
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from huggingface_hub import CommitScheduler
import pandas as pd

# ---------------- Config ----------------
HF_TOKEN = os.environ.get("HF_TOKEN")
BACKUP_REPO = os.environ.get("HF_REPO", "triflix/database")
UPLOAD_DIR = Path(os.environ.get("UPLOAD_DIR", "/tmp/uploadedfiles"))
DATA_ROOT = Path(os.environ.get("DATA_ROOT", "/data"))   # local canonical storage
ARTIFACTS = DATA_ROOT / "artifacts"                      # per-snapshot artifacts
REGISTRY = DATA_ROOT / "manifest.json"                   # global manifest

UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
DATA_ROOT.mkdir(parents=True, exist_ok=True)
ARTIFACTS.mkdir(parents=True, exist_ok=True)
if not REGISTRY.exists():
    REGISTRY.write_text(json.dumps({"snapshots": {}, "by_sha": {}, "by_dataset": {}}, indent=2))

scheduler = CommitScheduler(
    repo_id=BACKUP_REPO,
    repo_type="dataset",
    folder_path=DATA_ROOT,    # push entire /data tree (datasets + artifacts)
    path_in_repo="data",
    every=60 * 24 * 365,      # effectively manual: we call scheduler.trigger() ourselves on upload
    token=HF_TOKEN,
)

app = FastAPI(title="Storage + Snapshotter")

# --------------- Models -----------------
class SnapshotMeta(BaseModel):
    snapshot_id: str
    dataset_key: str
    file_sha256: str
    file_name: str
    fmt: str
    n_rows: int
    n_cols: int
    columns: List[str]
    dtypes: Dict[str, str]
    sample_hash: str
    local_path: str
    created_at: float
    artifacts: Dict[str, str] = Field(default_factory=dict)  # profile.json, plan.json, figures.json

# --------------- Helpers ----------------
def load_manifest() -> Dict[str, Any]:
    try:
        return json.loads(REGISTRY.read_text())
    except Exception:
        return {"snapshots": {}, "by_sha": {}, "by_dataset": {}}

def save_manifest(man: Dict[str, Any]):
    tmp = REGISTRY.with_suffix(".tmp.json")
    tmp.write_text(json.dumps(man, indent=2))
    tmp.replace(REGISTRY)

def sha256_of_bytes(b: bytes) -> str:
    h = hashlib.sha256()
    h.update(b)
    return h.hexdigest()

def sha256_of_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()

def sniff_fmt(filename: str) -> str:
    lower = filename.lower()
    if lower.endswith(".csv"): return "csv"
    if lower.endswith(".parquet") or lower.endswith(".pq"): return "parquet"
    raise ValueError("Unsupported file type (csv/parquet only)")

def fingerprint_dataset(path: Path, fmt: str, sample_n: int = 5) -> Dict[str, Any]:
    if fmt == "csv":
        # Count rows quickly without loading the full DF into memory
        with path.open("rb") as f:
            n_rows = sum(1 for _ in f) - 1  # minus header
        df_head = pd.read_csv(path, nrows=sample_n)
        # tail: read all but keep last 5 (OK for mid-size; switch to chunks for very large)
        df_tail = pd.read_csv(path).tail(sample_n)
        cols = list(df_head.columns)
        dtypes = {c: str(df_head[c].dtype) for c in cols}  # inferred from the head sample
        n_cols = len(cols)
    else:  # parquet
        df = pd.read_parquet(path)
        n_rows, n_cols = df.shape
        cols = list(df.columns)
        dtypes = {c: str(df[c].dtype) for c in cols}
        df_head = df.head(sample_n)
        df_tail = df.tail(sample_n)

    # sample hash from head+tail to catch trivial renames
    sample_blob = json.dumps({
        "head": df_head.to_dict(orient="records"),
        "tail": df_tail.to_dict(orient="records")
    }, ensure_ascii=False, default=str).encode("utf-8")
    sample_hash = sha256_of_bytes(sample_blob)

    return {
        "n_rows": int(n_rows),
        "n_cols": int(n_cols),
        "columns": cols,
        "dtypes": dtypes,
        "sample_hash": sample_hash
    }

def ensure_snapshot_dirs(snapshot_id: str) -> Path:
    sdir = DATA_ROOT / "snapshots" / snapshot_id
    sdir.mkdir(parents=True, exist_ok=True)
    (sdir / "artifacts").mkdir(parents=True, exist_ok=True)
    return sdir

# --------------- Upload + Snapshot logic ----------------
@app.on_event("shutdown")
def _stop_scheduler():
    scheduler.stop()

@app.post("/upload")
async def upload(
    file: UploadFile = File(...),
    dataset_key: Optional[str] = Form(None),
    trigger_pipeline: bool = Form(True),
):
    """
    - Saves the file under /data/snapshots/<snapshot_id>/ (staged via UPLOAD_DIR)
    - Computes sha256, shape, schema, sample hash
    - Dedups across renames using sha256
    - Returns snapshot info and optionally triggers the pipeline via HTTP (deferred to the Orchestrator)
    """
    try:
        # 1) Save temp
        tmp = UPLOAD_DIR / f"{uuid.uuid4()}_{file.filename}"
        content = await file.read()
        tmp.write_bytes(content)

        # 2) File meta
        fmt = sniff_fmt(file.filename)
        fsha = sha256_of_file(tmp)

        # 3) Dedup by sha
        man = load_manifest()
        existing_snapshot_id = man["by_sha"].get(fsha)

        if not dataset_key:
            # Use filename stem as default dataset key
            dataset_key = Path(file.filename).stem.lower().replace(" ", "_")

        if existing_snapshot_id:
            snap = man["snapshots"][existing_snapshot_id]
            # Also ensure the dataset_key list contains this snapshot
            byds = man["by_dataset"].setdefault(dataset_key, [])
            if existing_snapshot_id not in byds:
                byds.append(existing_snapshot_id)
                save_manifest(man)
            # Cleanup temp (we already have the canonical file)
            try:
                tmp.unlink()
            except Exception:
                pass

            # Return existing snapshot (reuse artifacts)
            return {
                "status": "reused",
                "reason": "identical_content_sha",
                "dataset_key": dataset_key,
                "snapshot_id": existing_snapshot_id,
                "artifacts": snap.get("artifacts", {}),
                "meta": snap
            }

        # 4) New snapshot path
        snapshot_id = fsha[:32]
        sdir = ensure_snapshot_dirs(snapshot_id)
        dest = sdir / file.filename
        tmp.replace(dest)

        # 5) Fingerprint
        fp = fingerprint_dataset(dest, fmt)
        meta = SnapshotMeta(
            snapshot_id=snapshot_id,
            dataset_key=dataset_key,
            file_sha256=fsha,
            file_name=file.filename,
            fmt=fmt,
            n_rows=fp["n_rows"],
            n_cols=fp["n_cols"],
            columns=fp["columns"],
            dtypes=fp["dtypes"],
            sample_hash=fp["sample_hash"],
            local_path=str(dest),
            created_at=time.time(),
            artifacts={}
        ).model_dump()

        # 6) Update registry
        man["snapshots"][snapshot_id] = meta
        man["by_sha"][fsha] = snapshot_id
        man["by_dataset"].setdefault(dataset_key, []).append(snapshot_id)
        save_manifest(man)

        # 7) Push to HF (async)
        await asyncio.to_thread(scheduler.trigger)

        # 8) Optionally call the Orchestrator to prepare artifacts
        if trigger_pipeline:
            # POST to the orchestrator (configure ORCH_URL env)
            orch = os.environ.get("ORCH_URL")
            if orch:
                import requests
                try:
                    requests.post(f"{orch}/prepare", json={
                        "dataset_key": dataset_key,
                        "snapshot_id": snapshot_id,
                        "local_path": meta["local_path"]
                    }, timeout=5)
                except Exception:
                    pass

        return {
            "status": "snapshot_created",
            "dataset_key": dataset_key,
            "snapshot_id": snapshot_id,
            "meta": meta
        }

    except Exception as e:
        raise HTTPException(500, f"Upload failed: {e}")

@app.get("/datasets/{dataset_key}/snapshots")
def list_snapshots(dataset_key: str):
    man = load_manifest()
    ids = man["by_dataset"].get(dataset_key, [])
    return {"dataset_key": dataset_key, "snapshots": [man["snapshots"][i] for i in ids]}

@app.get("/snapshots/{snapshot_id}")
def get_snapshot(snapshot_id: str):
    man = load_manifest()
    snap = man["snapshots"].get(snapshot_id)
    if not snap:
        raise HTTPException(404, "Snapshot not found")
    return snap
```

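As a quick sanity check of the dedup rule, uploading the same bytes twice (even under a different name) should come back as reused. An illustrative client sketch, assuming the storage service is mapped to localhost:8010 as in the compose file further down, with placeholder file names:

```python
# Illustrative: upload the same content twice; the second call is deduped by sha256.
# Assumes the storage service is reachable on localhost:8010 (compose mapping below).
import requests

def upload(path: str) -> dict:
    with open(path, "rb") as f:
        r = requests.post("http://localhost:8010/upload",
                          files={"file": f},
                          data={"dataset_key": "sales"},  # placeholder dataset key
                          timeout=60)
    r.raise_for_status()
    return r.json()

first = upload("sales.csv")          # placeholder file
again = upload("sales_renamed.csv")  # same bytes, different name
print(first["status"])               # "snapshot_created" on the first upload
print(again["status"])               # "reused" -- no pipeline re-run
```
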
Key rules implemented
- Exact same file content (same sha256) → “reused” snapshot and artifacts; this covers renamed uploads too.
- Otherwise we create a new snapshot (even if rows/cols match but the values changed).
- Everything is recorded in /data/manifest.json; data lives under /data/snapshots/<snapshot_id>/ (an illustrative manifest entry follows).

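To make the registry concrete, a single entry in /data/manifest.json could look roughly like this; every value below is made up, and the field names simply follow the SnapshotMeta model above:

```python
# Made-up example of the manifest structure (values are illustrative).
example_manifest = {
    "snapshots": {
        "3f2a9c0e1b2c3d4e5f60718293a4b5c6": {
            "snapshot_id": "3f2a9c0e1b2c3d4e5f60718293a4b5c6",
            "dataset_key": "sales",
            "file_sha256": "3f2a9c0e1b2c3d4e5f60718293a4b5c6...",
            "file_name": "sales.csv",
            "fmt": "csv",
            "n_rows": 1200,
            "n_cols": 8,
            "columns": ["region", "amount", "order_date"],
            "dtypes": {"region": "object", "amount": "float64", "order_date": "object"},
            "sample_hash": "9b1d7c...",
            "local_path": "/data/snapshots/3f2a9c0e1b2c3d4e5f60718293a4b5c6/sales.csv",
            "created_at": 1717000000.0,
            "artifacts": {"profile": ".../artifacts/profile.json"},
        }
    },
    "by_sha": {"3f2a9c0e1b2c3d4e5f60718293a4b5c6...": "3f2a9c0e1b2c3d4e5f60718293a4b5c6"},
    "by_dataset": {"sales": ["3f2a9c0e1b2c3d4e5f60718293a4b5c6"]},
}
```
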
Service B: Analytics Orchestrator (FastAPI)
- Runs Agent 1 + Agent 2, stores artifacts, and calls the Strict Python Executor.

requirements.txt
- fastapi, uvicorn, pydantic, pandas, numpy, plotly, google-generativeai, qdrant-client, orjson, requests (requests is used to call the executor)

app/orchestrator_main.py
```python
import os, json, time, uuid
from typing import Any, Dict, Optional
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd

import google.generativeai as genai
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Reuse your LLM/Qdrant helpers from the earlier response or import them as a module.
# For brevity, minimal versions are defined here.

GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    raise RuntimeError("Set GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
TEXT_MODEL = os.environ.get("GEMINI_TEXT_MODEL", "gemini-1.5-pro")
EMBED_MODEL = "models/embedding-001"
EMBED_DIM = 768

QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
ARTIFACTS_ROOT = Path(os.environ.get("ARTIFACTS_ROOT", "/data/artifacts"))
SNAPSHOTS_ROOT = Path(os.environ.get("SNAPSHOTS_ROOT", "/data/snapshots"))
EXEC_URL = os.environ.get("EXEC_URL", "http://python-exec-sandbox:8000")

app = FastAPI(title="Analytics Orchestrator")

def qdrant():
    c = QdrantClient(url=QDRANT_URL)
    # Ensure the collection exists
    try:
        c.get_collection("dataset_profiles")
    except Exception:
        c.create_collection("dataset_profiles",
                            vectors_config=VectorParams(size=EMBED_DIM, distance=Distance.COSINE))
    return c

def embed_text(text: str, task: str) -> list[float]:
    return genai.embed_content(model=EMBED_MODEL, content=text, task_type=task)["embedding"]

def df_schema_summary(df: pd.DataFrame) -> Dict[str, Any]:
    cols = []
    for c in df.columns:
        s = df[c]
        cols.append({
            "name": c,
            "dtype": str(s.dtype),
            "unique_count": int(s.nunique(dropna=True)),
            "missing": int(s.isna().sum()),
            "missing_pct": float(s.isna().mean())
        })
    return {
        "shape": {"rows": int(df.shape[0]), "columns": int(df.shape[1])},
        "duplicates": int(df.duplicated().sum()),
        "columns": cols
    }

def profile_text(dataset_name: str, meta: Dict[str, Any]) -> str:
    lines = [
        f"Dataset: {dataset_name}",
        f"Shape: {meta['shape']['rows']}x{meta['shape']['columns']}",
        f"Duplicate rows: {meta['duplicates']}"
    ]
    for c in meta["columns"]:
        lines.append(f"- {c['name']} ({c['dtype']}) unique={c['unique_count']} missing={c['missing']}({round(100*c['missing_pct'],1)}%)")
    return "\n".join(lines)

def agent1(df: pd.DataFrame, dataset_name: str) -> Dict[str, Any]:
    meta = df_schema_summary(df)
    ctx = profile_text(dataset_name, meta)
    model = genai.GenerativeModel(model_name=TEXT_MODEL,
                                  generation_config={"temperature": 0.2, "response_mime_type": "application/json"})
    prompt = """
You are Agent 1 (Profiler).
Return STRICT JSON:
{
  "dataset_name": "string",
  "shape": {"rows": int, "columns": int},
  "duplicates": int,
  "columns": [{"name": "string", "dtype": "string", "unique_count": int, "missing": int, "missing_pct": float, "notes": "string"}],
  "summary": "short summary",
  "data_quality": {
    "high_missing_columns": [],
    "high_cardinality_columns": [],
    "potential_id_columns": [],
    "warnings": []
  }
}
Be concise. Use only provided columns.
"""
    parts = [
        {"role": "user", "parts": [
            prompt,  # the instructions must actually be sent with the request
            f"Dataset name: {dataset_name}",
            "Compact profile:",
            ctx,
            "Schema JSON:",
            json.dumps(meta, ensure_ascii=False)
        ]}
    ]
    resp = model.generate_content(parts)
    out = json.loads(resp.text)

    # Store in Qdrant (point ids must be unsigned ints or UUIDs, so keep the timestamp in the payload)
    c = qdrant()
    vec = embed_text(ctx, "retrieval_document")
    c.upsert("dataset_profiles", points=[PointStruct(
        id=str(uuid.uuid4()),
        vector=vec,
        payload={"dataset_name": dataset_name, "profile": out, "created_at": int(time.time())}
    )])
    return out

def agent2(agent1_out: Dict[str, Any], df: pd.DataFrame) -> Dict[str, Any]:
    schema = [{"name": c["name"], "dtype": c["dtype"], "unique_count": c["unique_count"], "missing_pct": c["missing_pct"]}
              for c in agent1_out["columns"]]
    model = genai.GenerativeModel(model_name=TEXT_MODEL,
                                  generation_config={"temperature": 0.2, "response_mime_type": "application/json"})
    prompt = """
You are Agent 2 (Visualization Planner).
Produce EXACTLY 4 bar charts (count or mean/sum by category).
STRICT JSON:
{
  "charts": [
    {"id":"bar1","kind":"bar","x":"col","y":"count_or_numeric","agg":"count|mean|sum","top_k":10,"title":"t","description":"d"},
    {...},{...},{...}
  ]
}
Only use columns in schema. Avoid high-cardinality free-text.
"""
    parts = [{"role": "user", "parts": [
        prompt,  # the instructions must actually be sent with the request
        "Agent 1 output:", json.dumps(agent1_out, ensure_ascii=False),
        "Schema:", json.dumps({"schema": schema}, ensure_ascii=False)
    ]}]
    resp = model.generate_content(parts)
    plan = json.loads(resp.text)

    # Attach python_snippet (same safe snippet as before)
    plan["python_snippet"] = r'''
import json
import pandas as pd
import plotly.express as px

def _safe_bar(df, spec):
    x = spec["x"]; y = spec.get("y", "count"); agg = spec.get("agg", "count"); k = int(spec.get("top_k", 10))
    if x not in df.columns: raise ValueError(f"x not found: {x}")
    if agg == "count" or y == "count":
        vc = df[x].astype("object").value_counts(dropna=False).head(k)
        fig = px.bar(x=[str(i) for i in vc.index], y=vc.values, labels={"x": x, "y": "count"}, title=spec.get("title", ""))
        fig.update_xaxes(type="category"); return fig
    else:
        if y not in df.columns: raise ValueError(f"y not found: {y}")
        grouped = df.groupby(x, dropna=False)[y].agg(agg).sort_values(ascending=False).head(k)
        fig = px.bar(x=[str(i) for i in grouped.index], y=grouped.values, labels={"x": x, "y": f"{agg}({y})"}, title=spec.get("title", ""))
        fig.update_xaxes(type="category"); return fig

def execute(df, plan):
    figs = []
    for spec in plan.get("charts", []):
        if spec.get("kind") == "bar": figs.append(_safe_bar(df, spec))
    return {"figures": [f.to_plotly_json() for f in figs], "charts_meta": plan.get("charts", [])}

result = execute(df, plan)
'''
    return plan

class PrepareReq(BaseModel):
    dataset_key: str
    snapshot_id: str
    local_path: str

@app.post("/prepare")
def prepare(req: PrepareReq):
    # If artifacts already exist, return them
    sdir = SNAPSHOTS_ROOT / req.snapshot_id
    if not sdir.exists():
        raise HTTPException(404, "Snapshot not found on disk")
    adir = sdir / "artifacts"
    prof_path = adir / "profile.json"
    plan_path = adir / "plan.json"
    figs_path = adir / "figures.json"

    if prof_path.exists() and plan_path.exists() and figs_path.exists():
        return {"status": "reused", "snapshot_id": req.snapshot_id,
                "artifacts": {"profile": str(prof_path), "plan": str(plan_path), "figures": str(figs_path)}}

    # Run Agents
    fmt = "csv" if req.local_path.lower().endswith(".csv") else "parquet"
    df = pd.read_csv(req.local_path) if fmt == "csv" else pd.read_parquet(req.local_path)
    a1 = agent1(df, req.dataset_key)
    a2 = agent2(a1, df)

    # Execute charts via the Strict Executor
    import requests
    exec_req = {
        "code": a2["python_snippet"],
        "variables": {"plan": a2},
        "datasets": [{"alias": "df", "path": req.local_path, "fmt": fmt, "read_kwargs": {}}],
        "limits": {"cpu_seconds": 10, "memory_mb": 1024, "wall_seconds": 20}
    }
    exec_resp = requests.post(f"{EXEC_URL}/execute", json=exec_req, timeout=60).json()
    if not exec_resp.get("ok"):
        raise HTTPException(500, f"Executor failed: {exec_resp.get('stderr')}")

    # Persist artifacts
    adir.mkdir(parents=True, exist_ok=True)
    prof_path.write_text(json.dumps(a1, indent=2))
    plan_to_save = dict(a2); plan_to_save["python_snippet"] = "<omitted>"
    plan_path.write_text(json.dumps(plan_to_save, indent=2))
    figs_path.write_text(json.dumps(exec_resp["result"], indent=2))

    return {"status": "created", "snapshot_id": req.snapshot_id,
            "artifacts": {"profile": str(prof_path), "plan": str(plan_path), "figures": str(figs_path)}}

@app.get("/snapshots/{snapshot_id}/artifacts")
def artifacts(snapshot_id: str):
    sdir = SNAPSHOTS_ROOT / snapshot_id / "artifacts"
    prof, plan, figs = sdir / "profile.json", sdir / "plan.json", sdir / "figures.json"
    if not (prof.exists() and plan.exists() and figs.exists()):
        raise HTTPException(404, "Artifacts not ready")
    return {"profile": json.loads(prof.read_text()),
            "plan": json.loads(plan.read_text()),
            "figures": json.loads(figs.read_text())}

@app.get("/snapshots/{snapshot_id}/preprocess")
def preprocess(snapshot_id: str, columns: str):
    cols = [c.strip() for c in columns.split(",") if c.strip()]
    sdir = SNAPSHOTS_ROOT / snapshot_id
    files = list(sdir.glob("*.*"))
    if not files:
        raise HTTPException(404, "Snapshot file missing")
    path = str(files[0])
    df = pd.read_csv(path) if path.lower().endswith(".csv") else pd.read_parquet(path)

    out: Dict[str, Any] = {"columns": cols, "frames": {}, "stats": {}}
    if len(cols) == 1:
        c = cols[0]
        if c not in df.columns: raise HTTPException(400, f"Column not found: {c}")
        if str(df[c].dtype).startswith(("float", "int")):
            desc = df[c].describe().to_dict()
            out["stats"][c] = {k: float(v) for k, v in desc.items()}
        vc = df[c].astype("object").value_counts(dropna=False).head(50)
        out["frames"][f"value_counts_{c}"] = {"index": [str(i) for i in vc.index], "values": vc.values.tolist()}
    elif len(cols) == 2:
        a, b = cols
        for col in cols:
            if col not in df.columns: raise HTTPException(400, f"Column not found: {col}")
        ct = pd.crosstab(df[a].astype("object"), df[b].astype("object")).head(50)
        out["frames"]["crosstab"] = {"index": [str(i) for i in ct.index], "columns": [str(i) for i in ct.columns], "values": ct.values.tolist()}
    else:
        # Fallback: per-column value_counts
        for c in cols[:3]:
            vc = df[c].astype("object").value_counts(dropna=False).head(20)
            out["frames"][f"value_counts_{c}"] = {"index": [str(i) for i in vc.index], "values": vc.values.tolist()}
    return out
```

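Because figures.json stores plain Plotly figure dicts (the output of to_plotly_json()), a consumer can rebuild the figures without going through the orchestrator. A minimal sketch, with an illustrative file path:

```python
# Rebuild Plotly figures from a saved figures.json artifact (path is illustrative).
import json
import plotly.graph_objects as go

with open("/data/snapshots/<snapshot_id>/artifacts/figures.json") as fh:
    payload = json.load(fh)

for i, fig_dict in enumerate(payload["figures"]):
    fig = go.Figure(fig_dict)          # figure dict -> Figure object
    fig.write_html(f"chart_{i}.html")  # or fig.show() in a notebook
```
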
What happens now
- Upload pipeline:
  - POST /upload (Storage): saves to /data/snapshots/<snapshot_id>/file.ext, fingerprints, dedups by sha256, pushes to the HF repo, and returns snapshot info.
  - If a new snapshot was created, it optionally calls the Orchestrator /prepare (async fire-and-forget). Otherwise it returns “reused” along with the artifacts that already exist.
- Orchestrator:
  - /prepare loads the local file, runs Agent 1 and Agent 2, calls the Strict Executor /execute to get Plotly figure JSON, and saves artifacts under /data/snapshots/<snapshot_id>/artifacts/.
  - /snapshots/{id}/artifacts returns the saved profile + chart plan + figure JSON for the frontend.
  - /snapshots/{id}/preprocess?columns=... returns preprocessed slices/stats to support your “click columns to show data” UX.
- Strict Python Executor:
  - Already in place; we call it with the df alias and the plan to render figures (a caller-side sketch of the whole flow follows this list).

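Put together, the caller-side version of this flow might look like the sketch below; it assumes the host port mappings from the compose file that follows (storage on 8010, orchestrator on 8020) and a placeholder CSV name:

```python
# Caller-side flow (illustrative): upload, then poll the orchestrator for artifacts.
# Assumes the compose mapping below: storage on :8010, orchestrator on :8020.
import time
import requests

STORAGE = "http://localhost:8010"
ORCH = "http://localhost:8020"

with open("sales.csv", "rb") as f:  # placeholder file
    up = requests.post(f"{STORAGE}/upload", files={"file": ("sales.csv", f)},
                       data={"dataset_key": "sales"}, timeout=60).json()
snap_id = up["snapshot_id"]

if up["status"] == "reused" and up.get("artifacts"):
    print("Artifacts already available:", list(up["artifacts"].keys()))
else:
    # New snapshot: the orchestrator is preparing profile/plan/figures in the background.
    while True:
        r = requests.get(f"{ORCH}/snapshots/{snap_id}/artifacts", timeout=30)
        if r.status_code == 200:
            artifacts = r.json()
            break
        time.sleep(2)
    print("Figures ready:", len(artifacts["figures"]["figures"]))
```
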
Docker Compose (3 services + Qdrant)
docker-compose.yml
```yaml
version: "3.9"
services:
  qdrant:
    image: qdrant/qdrant:latest
    ports: ["6333:6333"]
    volumes:
      - qdrant_data:/qdrant/storage

  python-exec-sandbox:
    build: ./python-exec-sandbox
    ports: ["8000:8000"]
    environment:
      - STORAGE_ROOT=/data
    volumes:
      # share the dataset volume so the /data/snapshots paths sent by the orchestrator resolve here
      - data_root:/data
    # recommended in prod:
    # network_mode: "none"
    # read_only: true

  storage:
    build: ./storage
    ports: ["8010:8000"]
    environment:
      - HF_TOKEN=${HF_TOKEN}
      - HF_REPO=triflix/database
      - DATA_ROOT=/data
      - ORCH_URL=http://orchestrator:8000
    volumes:
      - data_root:/data

  orchestrator:
    build: ./orchestrator
    ports: ["8020:8000"]
    environment:
      - GEMINI_API_KEY=${GEMINI_API_KEY}
      - QDRANT_URL=http://qdrant:6333
      - EXEC_URL=http://python-exec-sandbox:8000
      - ARTIFACTS_ROOT=/data/artifacts
      - SNAPSHOTS_ROOT=/data/snapshots
    volumes:
      - data_root:/data
    depends_on:
      - qdrant
      - python-exec-sandbox

volumes:
  data_root:
  qdrant_data:
```

Front-end flow
- Upload the file to Storage /upload with a dataset_key:
  - If the content is identical, the response is status=reused with the artifacts already available.
  - If it is new, status=snapshot_created; the Orchestrator starts preparing artifacts; poll /snapshots/{id}/artifacts until ready.
- Render charts:
  - Use artifacts.figures (Plotly figure dicts) directly in the frontend.
- “Click columns”:
  - Call the Orchestrator /snapshots/{id}/preprocess?columns=colA,colB (an example call follows this list).
  - Show the value counts / crosstab / data slice and highlight the clicked categories in your UI.

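For the “click columns” interaction, the frontend (or a thin proxy) calls the preprocess endpoint shown earlier. An illustrative call, with placeholder snapshot id and column names:

```python
# Illustrative "click columns" request against the orchestrator (port 8020 per the
# compose file above); snapshot id and column names are placeholders.
import requests

snapshot_id = "3f2a9c..."  # placeholder
r = requests.get(
    f"http://localhost:8020/snapshots/{snapshot_id}/preprocess",
    params={"columns": "region,amount"},
    timeout=30,
)
r.raise_for_status()
slice_ = r.json()
# One column  -> slice_["frames"]["value_counts_<col>"] (+ slice_["stats"] for numerics)
# Two columns -> slice_["frames"]["crosstab"] with index / columns / values
print(list(slice_["frames"].keys()))
```
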
Edge cases fully covered
- Renamed same file: same sha256 => reuse the previous snapshot and artifacts.
- Same shape (rows/cols) but different values: sha differs => new snapshot => new run.
- Completely different dataset: new dataset_key or a separate snapshot series.
- Repeat uploads of an identical file: instant reuse, no pipeline cost.
- “Cycle that executes AI-generated code and produces another output”:
  - Agent 2 can be re-run at any time to generate new chart plans (e.g., with different parameters), and you can call /execute on the sandbox again. Store these as additional artifacts (plan_v2.json, figures_v2.json) if needed. The orchestrator can expose a plan-override endpoint to drive this; a sketch follows.

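If you want that re-run cycle now, a plan-override endpoint on the orchestrator could look roughly like the sketch below. It is not part of the code above: it assumes it lives in orchestrator_main.py, reuses that module's app, SNAPSHOTS_ROOT and EXEC_URL, and talks to the executor with the same request shape as /prepare.

```python
# Hypothetical addition to orchestrator_main.py (reuses its app, SNAPSHOTS_ROOT, EXEC_URL).
# Executes a caller-supplied chart plan against an existing snapshot and versions the output.
import json
import requests
from fastapi import HTTPException
from pydantic import BaseModel

class PlanOverrideReq(BaseModel):
    plan: dict            # same shape as Agent 2 output, including "python_snippet"
    version: str = "v2"   # suffix for the stored artifact, e.g. figures_v2.json

@app.post("/snapshots/{snapshot_id}/execute_plan")
def execute_plan(snapshot_id: str, req: PlanOverrideReq):
    sdir = SNAPSHOTS_ROOT / snapshot_id
    files = [p for p in sdir.glob("*.*") if p.is_file()]
    if not files:
        raise HTTPException(404, "Snapshot file missing")
    path = str(files[0])
    fmt = "csv" if path.lower().endswith(".csv") else "parquet"
    exec_req = {
        "code": req.plan["python_snippet"],
        "variables": {"plan": req.plan},
        "datasets": [{"alias": "df", "path": path, "fmt": fmt, "read_kwargs": {}}],
        "limits": {"cpu_seconds": 10, "memory_mb": 1024, "wall_seconds": 20},
    }
    resp = requests.post(f"{EXEC_URL}/execute", json=exec_req, timeout=60).json()
    if not resp.get("ok"):
        raise HTTPException(500, f"Executor failed: {resp.get('stderr')}")
    out = sdir / "artifacts" / f"figures_{req.version}.json"
    out.write_text(json.dumps(resp["result"], indent=2))
    return {"status": "created", "figures_path": str(out)}
```
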
Do you want me to:
- Add a /upload_json variant for small DataFrames (base64 or records) so we skip file IO?
- Add binary quantization in Qdrant to scale profiles?
- Add JWT auth to Storage/Orchestrator/Executor?
- Add a “plan override” endpoint that takes a new chart plan and executes it against an existing snapshot?