I’m with you. Let’s wire the whole flow so uploads are snapshotted, deduped, and the AI pipeline only re-runs when needed. We’ll keep your HF dataset store as the “remote S3,” maintain local snapshots, and call the strict Python Executor API to render charts from Agent 2’s python_snippet.

Overview
- Service A: Storage + Snapshotter (extends your HF uploader)
  - Handles uploads, computes a fingerprint (sha256, shape, schema), dedups across renames, keeps a manifest of snapshots and artifacts, and pushes to HF.
  - If an identical snapshot already exists, it returns the previous AI artifacts without re-running anything.
  - If the data changed, it creates a new snapshot and triggers the pipeline.
- Service B: Analytics Orchestrator (Gemini + Qdrant + Chart Plan + Execution)
  - Runs Agent 1 (Profiler) and Agent 2 (Viz Planner) and stores artifacts (profile.json, plan.json, figures.json).
  - Calls the Strict Python Executor API (/execute) to run the python_snippet against the snapshot dataset.
- Service C: Strict Python Executor (already done above)
  - Executes python_snippet with df + plan and returns Plotly figure JSON.

Data and UX notes (a toy illustration follows this list)
- Renamed same file: dedup by content sha256 → reuse the snapshot and all AI artifacts.
- Same shape (rows/cols) but data changed: new snapshot → re-run the pipeline.
- Completely different dataset: new dataset key, or a new snapshot under a different key.
- Frontend “click columns”:
  - An API returns preprocessed slices (value counts, stats, crosstabs) for the clicked columns and highlights those in the charts.
  - We serve preprocessed data from the snapshot to avoid recomputing.
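To make the dedup rules concrete, here is a toy illustration of the content-hash behavior (standalone bytes, not the real service code):

```python
import hashlib

data = b"col_a,col_b\n1,2\n3,4\n"

# The filename plays no part in the fingerprint, so a renamed re-upload of
# byte-identical content produces the same sha256 -> snapshot + artifact reuse.
sha_original = hashlib.sha256(data).hexdigest()
sha_renamed = hashlib.sha256(data).hexdigest()
assert sha_original == sha_renamed

# A single changed value (same rows/cols) yields a different hash -> new snapshot.
edited = b"col_a,col_b\n1,2\n3,5\n"
assert hashlib.sha256(edited).hexdigest() != sha_original
```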
Service A: Storage + Snapshotter (FastAPI)
- Replaces/extends your uploader with fingerprinting, dedup, and a snapshot registry.
- Still uses CommitScheduler to push to HF.

requirements.txt
- fastapi, uvicorn, pydantic, pandas, pyarrow, orjson, huggingface_hub, requests, python-multipart (python-multipart is required by FastAPI for Form/File uploads; requests is used to notify the Orchestrator)

app/storage_main.py
```python
import os, json, uuid, time, hashlib, asyncio
from pathlib import Path
from typing import Any, Dict, Optional, List

from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from pydantic import BaseModel, Field
from huggingface_hub import CommitScheduler
import pandas as pd

# ---------------- Config ----------------
HF_TOKEN = os.environ.get("HF_TOKEN")
BACKUP_REPO = os.environ.get("HF_REPO", "triflix/database")
UPLOAD_DIR = Path(os.environ.get("UPLOAD_DIR", "/tmp/uploadedfiles"))
DATA_ROOT = Path(os.environ.get("DATA_ROOT", "/data"))  # local canonical storage
ARTIFACTS = DATA_ROOT / "artifacts"                     # per-snapshot artifacts
REGISTRY = DATA_ROOT / "manifest.json"                  # global manifest

UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
DATA_ROOT.mkdir(parents=True, exist_ok=True)
ARTIFACTS.mkdir(parents=True, exist_ok=True)
if not REGISTRY.exists():
    REGISTRY.write_text(json.dumps({"snapshots": {}, "by_sha": {}, "by_dataset": {}}, indent=2))

# `every` is in minutes; one year effectively disables periodic pushes, so
# commits only happen when we call scheduler.trigger() after a new snapshot.
scheduler = CommitScheduler(
    repo_id=BACKUP_REPO,
    repo_type="dataset",
    folder_path=DATA_ROOT,  # push entire /data tree (datasets + artifacts)
    path_in_repo="data",
    every=60 * 24 * 365,
    token=HF_TOKEN,
)

app = FastAPI(title="Storage + Snapshotter")

# --------------- Models -----------------
class SnapshotMeta(BaseModel):
    snapshot_id: str
    dataset_key: str
    file_sha256: str
    file_name: str
    fmt: str
    n_rows: int
    n_cols: int
    columns: List[str]
    dtypes: Dict[str, str]
    sample_hash: str
    local_path: str
    created_at: float
    artifacts: Dict[str, str] = Field(default_factory=dict)  # profile.json, plan.json, figures.json

# --------------- Helpers ----------------
def load_manifest() -> Dict[str, Any]:
    try:
        return json.loads(REGISTRY.read_text())
    except Exception:
        return {"snapshots": {}, "by_sha": {}, "by_dataset": {}}

def save_manifest(man: Dict[str, Any]):
    # Write-then-rename so a crash can't leave a half-written manifest
    tmp = REGISTRY.with_suffix(".tmp.json")
    tmp.write_text(json.dumps(man, indent=2))
    tmp.replace(REGISTRY)

def sha256_of_bytes(b: bytes) -> str:
    h = hashlib.sha256()
    h.update(b)
    return h.hexdigest()

def sha256_of_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()

def sniff_fmt(filename: str) -> str:
    lower = filename.lower()
    if lower.endswith(".csv"):
        return "csv"
    if lower.endswith(".parquet") or lower.endswith(".pq"):
        return "parquet"
    raise ValueError("Unsupported file type (csv/parquet only)")

def fingerprint_dataset(path: Path, fmt: str, sample_n: int = 5) -> Dict[str, Any]:
    if fmt == "csv":
        # Approximate row count by counting newlines (off when fields contain
        # quoted embedded newlines), without loading the full DF into memory
        with path.open("rb") as f:
            n_rows = sum(1 for _ in f) - 1  # minus header
        df_head = pd.read_csv(path, nrows=sample_n)
        # tail: reads the full file (OK for mid-size; switch to chunks for very large CSVs)
        df_tail = pd.read_csv(path).tail(sample_n)
        cols = list(df_head.columns)
        # Infer dtypes from the head sample once, instead of re-reading the CSV per column
        dtypes = {c: str(df_head[c].dtype) for c in cols}
        n_cols = len(cols)
    else:  # parquet
        df = pd.read_parquet(path)
        n_rows, n_cols = df.shape
        cols = list(df.columns)
        dtypes = {c: str(df[c].dtype) for c in cols}
        df_head = df.head(sample_n)
        df_tail = df.tail(sample_n)

    # sample hash from head+tail to catch trivial renames
    sample_blob = json.dumps({
        "head": df_head.to_dict(orient="records"),
        "tail": df_tail.to_dict(orient="records"),
    }, ensure_ascii=False, default=str).encode("utf-8")
    sample_hash = sha256_of_bytes(sample_blob)

    return {
        "n_rows": int(n_rows),
        "n_cols": int(n_cols),
        "columns": cols,
        "dtypes": dtypes,
        "sample_hash": sample_hash,
    }

def ensure_snapshot_dirs(snapshot_id: str) -> Path:
    sdir = DATA_ROOT / "snapshots" / snapshot_id
    sdir.mkdir(parents=True, exist_ok=True)
    (sdir / "artifacts").mkdir(parents=True, exist_ok=True)
    return sdir

# --------------- Upload + Snapshot logic ----------------
@app.on_event("shutdown")
def _stop_scheduler():
    scheduler.stop()

@app.post("/upload")
async def upload(
    file: UploadFile = File(...),
    dataset_key: Optional[str] = Form(None),
    trigger_pipeline: bool = Form(True),
):
    """
    - Saves the upload to a temp file, then moves it under /data/snapshots/<snapshot_id>/
    - Computes sha256, shape, schema, sample hash
    - Dedups across renames using sha256
    - Returns snapshot info + optionally triggers the pipeline via HTTP (deferred to Orchestrator)
    """
    try:
        # 1) Save temp
        tmp = UPLOAD_DIR / f"{uuid.uuid4()}_{file.filename}"
        content = await file.read()
        tmp.write_bytes(content)

        # 2) File meta
        fmt = sniff_fmt(file.filename)
        fsha = sha256_of_file(tmp)

        # 3) Dedup by sha
        man = load_manifest()
        existing_snapshot_id = man["by_sha"].get(fsha)
        if not dataset_key:
            # Use filename stem as default dataset key
            dataset_key = Path(file.filename).stem.lower().replace(" ", "_")

        if existing_snapshot_id:
            snap = man["snapshots"][existing_snapshot_id]
            # Also ensure the dataset_key list contains this snapshot
            byds = man["by_dataset"].setdefault(dataset_key, [])
            if existing_snapshot_id not in byds:
                byds.append(existing_snapshot_id)
                save_manifest(man)
            # Cleanup temp (we already have the canonical file)
            try:
                tmp.unlink()
            except Exception:
                pass
            # Return the existing snapshot (reuse artifacts)
            return {
                "status": "reused",
                "reason": "identical_content_sha",
                "dataset_key": dataset_key,
"snapshot_id": existing_snapshot_id, "artifacts": snap.get("artifacts", {}), "meta": snap } # 4) New snapshot path snapshot_id = fsha[:32] sdir = ensure_snapshot_dirs(snapshot_id) dest = sdir / file.filename tmp.replace(dest) # 5) Fingerprint fp = fingerprint_dataset(dest, fmt) meta = SnapshotMeta( snapshot_id=snapshot_id, dataset_key=dataset_key, file_sha256=fsha, file_name=file.filename, fmt=fmt, n_rows=fp["n_rows"], n_cols=fp["n_cols"], columns=fp["columns"], dtypes=fp["dtypes"], sample_hash=fp["sample_hash"], local_path=str(dest), created_at=time.time(), artifacts={} ).model_dump() # 6) Update registry man["snapshots"][snapshot_id] = meta man["by_sha"][fsha] = snapshot_id man["by_dataset"].setdefault(dataset_key, []).append(snapshot_id) save_manifest(man) # 7) Push to HF (async) await asyncio.to_thread(scheduler.trigger) # 8) Optionally call the Orchestrator to prepare artifacts if trigger_pipeline: # POST to orchestrator (configure ORCH_URL env) orch = os.environ.get("ORCH_URL") if orch: import requests try: requests.post(f"{orch}/prepare", json={ "dataset_key": dataset_key, "snapshot_id": snapshot_id, "local_path": meta["local_path"] }, timeout=5) except Exception: pass return { "status": "snapshot_created", "dataset_key": dataset_key, "snapshot_id": snapshot_id, "meta": meta } except Exception as e: raise HTTPException(500, f"Upload failed: {e}") @app.get("/datasets/{dataset_key}/snapshots") def list_snapshots(dataset_key: str): man = load_manifest() ids = man["by_dataset"].get(dataset_key, []) return {"dataset_key": dataset_key, "snapshots": [man["snapshots"][i] for i in ids]} @app.get("/snapshots/{snapshot_id}") def get_snapshot(snapshot_id: str): man = load_manifest() snap = man["snapshots"].get(snapshot_id) if not snap: raise HTTPException(404, "Snapshot not found") return snap ``` Key rules implemented - Exact same file (sha256) → “reused” snapshot and artifacts. This covers rename uploads too. - Otherwise we create a new snapshot (even if rows/cols match but values changed). - Everything is recorded in /data/manifest.json; data is under /data/snapshots//. Service B: Analytics Orchestrator (FastAPI) - Runs Agent 1 + Agent 2, stores artifacts, and calls Strict Python Executor. 
Service B: Analytics Orchestrator (FastAPI)
- Runs Agent 1 + Agent 2, stores artifacts, and calls the Strict Python Executor.

requirements.txt
- fastapi, uvicorn, pydantic, pandas, numpy, plotly, google-generativeai, qdrant-client, orjson, requests (requests is used for the Executor call)

app/orchestrator_main.py
```python
import os, json, time, uuid
from typing import Any, Dict
from pathlib import Path

import pandas as pd
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import google.generativeai as genai
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Reuse your LLM/Qdrant helpers from the earlier response or import them as a
# module; minimal versions are defined here for brevity.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    raise RuntimeError("Set GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)

TEXT_MODEL = os.environ.get("GEMINI_TEXT_MODEL", "gemini-1.5-pro")
EMBED_MODEL = "models/embedding-001"
EMBED_DIM = 768

QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
ARTIFACTS_ROOT = Path(os.environ.get("ARTIFACTS_ROOT", "/data/artifacts"))
SNAPSHOTS_ROOT = Path(os.environ.get("SNAPSHOTS_ROOT", "/data/snapshots"))
EXEC_URL = os.environ.get("EXEC_URL", "http://python-exec-sandbox:8000")

app = FastAPI(title="Analytics Orchestrator")

def qdrant() -> QdrantClient:
    c = QdrantClient(url=QDRANT_URL)
    # Ensure the collection exists
    try:
        c.get_collection("dataset_profiles")
    except Exception:
        c.create_collection(
            "dataset_profiles",
            vectors_config=VectorParams(size=EMBED_DIM, distance=Distance.COSINE),
        )
    return c

def embed_text(text: str, task: str) -> list[float]:
    return genai.embed_content(model=EMBED_MODEL, content=text, task_type=task)["embedding"]

def df_schema_summary(df: pd.DataFrame) -> Dict[str, Any]:
    cols = []
    for c in df.columns:
        s = df[c]
        cols.append({
            "name": c,
            "dtype": str(s.dtype),
            "unique_count": int(s.nunique(dropna=True)),
            "missing": int(s.isna().sum()),
            "missing_pct": float(s.isna().mean()),
        })
    return {
        "shape": {"rows": int(df.shape[0]), "columns": int(df.shape[1])},
        "duplicates": int(df.duplicated().sum()),
        "columns": cols,
    }

def profile_text(dataset_name: str, meta: Dict[str, Any]) -> str:
    lines = [
        f"Dataset: {dataset_name}",
        f"Shape: {meta['shape']['rows']}x{meta['shape']['columns']}",
        f"Duplicate rows: {meta['duplicates']}",
    ]
    for c in meta["columns"]:
        lines.append(
            f"- {c['name']} ({c['dtype']}) unique={c['unique_count']} "
            f"missing={c['missing']}({round(100 * c['missing_pct'], 1)}%)"
        )
    return "\n".join(lines)

def agent1(df: pd.DataFrame, dataset_name: str) -> Dict[str, Any]:
    meta = df_schema_summary(df)
    ctx = profile_text(dataset_name, meta)
    model = genai.GenerativeModel(
        model_name=TEXT_MODEL,
        generation_config={"temperature": 0.2, "response_mime_type": "application/json"},
    )
    prompt = """
You are Agent 1 (Profiler). Return STRICT JSON:
{
  "dataset_name": "string",
  "shape": {"rows": int, "columns": int},
  "duplicates": int,
  "columns": [{"name": "string", "dtype": "string", "unique_count": int, "missing": int, "missing_pct": float, "notes": "string"}],
  "summary": "short summary",
  "data_quality": {
    "high_missing_columns": [],
    "high_cardinality_columns": [],
    "potential_id_columns": [],
    "warnings": []
  }
}
Be concise. Use only provided columns.
""" parts = [ {"role": "user", "parts": [ f"Dataset name: {dataset_name}", "Compact profile:", ctx, "Schema JSON:", json.dumps(meta, ensure_ascii=False) ]} ] resp = model.generate_content(parts) out = json.loads(resp.text) # Store in Qdrant c = qdrant() vec = embed_text(ctx, "retrieval_document") c.upsert("dataset_profiles", points=[{"id": f"{dataset_name}-{int(time.time())}", "vector": vec, "payload": {"dataset_name": dataset_name, "profile": out}}]) return out def agent2(agent1_out: Dict[str, Any], df: pd.DataFrame) -> Dict[str, Any]: schema = [{"name": c["name"], "dtype": c["dtype"], "unique_count": c["unique_count"], "missing_pct": c["missing_pct"]} for c in agent1_out["columns"]] model = genai.GenerativeModel(model_name=TEXT_MODEL, generation_config={"temperature": 0.2, "response_mime_type": "application/json"}) prompt = """ You are Agent 2 (Visualization Planner). Produce EXACTLY 4 bar charts (count or mean/sum by category). STRICT JSON: { "charts": [ {"id":"bar1","kind":"bar","x":"col","y":"count_or_numeric","agg":"count|mean|sum","top_k":10,"title":"t","description":"d"}, {...},{...},{...} ] } Only use columns in schema. Avoid high-cardinality free-text. """ parts = [{"role":"user","parts":[ "Agent 1 output:", json.dumps(agent1_out, ensure_ascii=False), "Schema:", json.dumps({"schema": schema}, ensure_ascii=False) ]}] resp = model.generate_content(parts) plan = json.loads(resp.text) # Attach python_snippet (same safe snippet as before) plan["python_snippet"] = r''' import json import pandas as pd import plotly.express as px def _safe_bar(df, spec): x = spec["x"]; y = spec.get("y","count"); agg = spec.get("agg","count"); k=int(spec.get("top_k",10)) if x not in df.columns: raise ValueError(f"x not found: {x}") if agg=="count" or y=="count": vc = df[x].astype("object").value_counts(dropna=False).head(k) fig = px.bar(x=[str(i) for i in vc.index], y=vc.values, labels={"x":x,"y":"count"}, title=spec.get("title","")) fig.update_xaxes(type="category"); return fig else: if y not in df.columns: raise ValueError(f"y not found: {y}") grouped = df.groupby(x, dropna=False)[y].agg(agg).sort_values(ascending=False).head(k) fig = px.bar(x=[str(i) for i in grouped.index], y=grouped.values, labels={"x":x,"y":f"{agg}({y})"}, title=spec.get("title","")) fig.update_xaxes(type="category"); return fig def execute(df, plan): figs = [] for spec in plan.get("charts", []): if spec.get("kind")=="bar": figs.append(_safe_bar(df, spec)) return {"figures":[f.to_plotly_json() for f in figs], "charts_meta": plan.get("charts", [])} result = execute(df, plan) ''' return plan class PrepareReq(BaseModel): dataset_key: str snapshot_id: str local_path: str @app.post("/prepare") def prepare(req: PrepareReq): # If artifacts already exist, return them sdir = SNAPSHOTS_ROOT / req.snapshot_id if not sdir.exists(): raise HTTPException(404, "Snapshot not found on disk") adir = sdir / "artifacts" prof_path = adir / "profile.json" plan_path = adir / "plan.json" figs_path = adir / "figures.json" if prof_path.exists() and plan_path.exists() and figs_path.exists(): return {"status": "reused", "snapshot_id": req.snapshot_id, "artifacts": {"profile": str(prof_path), "plan": str(plan_path), "figures": str(figs_path)}} # Run Agents fmt = "csv" if req.local_path.lower().endswith(".csv") else "parquet" df = pd.read_csv(req.local_path) if fmt=="csv" else pd.read_parquet(req.local_path) a1 = agent1(df, req.dataset_key) a2 = agent2(a1, df) # Execute charts via Strict Executor import requests exec_req = { "code": a2["python_snippet"], 
"variables": {"plan": a2}, "datasets": [{"alias": "df", "path": req.local_path, "fmt": fmt, "read_kwargs": {}}], "limits": {"cpu_seconds": 10, "memory_mb": 1024, "wall_seconds": 20} } exec_resp = requests.post(f"{EXEC_URL}/execute", json=exec_req, timeout=60).json() if not exec_resp.get("ok"): raise HTTPException(500, f"Executor failed: {exec_resp.get('stderr')}") # Persist artifacts adir.mkdir(parents=True, exist_ok=True) prof_path.write_text(json.dumps(a1, indent=2)) plan_to_save = dict(a2); plan_to_save["python_snippet"] = "" plan_path.write_text(json.dumps(plan_to_save, indent=2)) figs_path.write_text(json.dumps(exec_resp["result"], indent=2)) return {"status": "created", "snapshot_id": req.snapshot_id, "artifacts": {"profile": str(prof_path), "plan": str(plan_path), "figures": str(figs_path)}} @app.get("/snapshots/{snapshot_id}/artifacts") def artifacts(snapshot_id: str): sdir = SNAPSHOTS_ROOT / snapshot_id / "artifacts" prof, plan, figs = sdir / "profile.json", sdir / "plan.json", sdir / "figures.json" if not (prof.exists() and plan.exists() and figs.exists()): raise HTTPException(404, "Artifacts not ready") return {"profile": json.loads(prof.read_text()), "plan": json.loads(plan.read_text()), "figures": json.loads(figs.read_text())} @app.get("/snapshots/{snapshot_id}/preprocess") def preprocess(snapshot_id: str, columns: str): cols = [c.strip() for c in columns.split(",") if c.strip()] sdir = SNAPSHOTS_ROOT / snapshot_id files = list(sdir.glob("*.*")) if not files: raise HTTPException(404, "Snapshot file missing") path = str(files[0]) df = pd.read_csv(path) if path.lower().endswith(".csv") else pd.read_parquet(path) out: Dict[str, Any] = {"columns": cols, "frames": {}, "stats": {}} if len(cols) == 1: c = cols[0] if c not in df.columns: raise HTTPException(400, f"Column not found: {c}") if str(df[c].dtype).startswith(("float","int")): desc = df[c].describe().to_dict() out["stats"][c] = {k: float(v) for k, v in desc.items()} vc = df[c].astype("object").value_counts(dropna=False).head(50) out["frames"][f"value_counts_{c}"] = {"index": [str(i) for i in vc.index], "values": vc.values.tolist()} elif len(cols) == 2: a, b = cols for col in cols: if col not in df.columns: raise HTTPException(400, f"Column not found: {col}") ct = pd.crosstab(df[a].astype("object"), df[b].astype("object")).head(50) out["frames"]["crosstab"] = {"index": [str(i) for i in ct.index], "columns": [str(i) for i in ct.columns], "values": ct.values.tolist()} else: # Fallback: per-column value_counts for c in cols[:3]: vc = df[c].astype("object").value_counts(dropna=False).head(20) out["frames"][f"value_counts_{c}"] = {"index": [str(i) for i in vc.index], "values": vc.values.tolist()} return out ``` What happens now - Upload pipeline: - POST /upload (Storage): saves to /data/snapshots//file.ext, fingerprints, dedups by sha256, pushes to HF repo, and returns snapshot info. - If new snapshot, it optionally calls Orchestrator /prepare (async fire-and-forget). Otherwise, it returns “reused” and the artifacts that are already there. - Orchestrator: - /prepare loads the local file, runs Agent 1 and Agent 2, calls the Strict Executor /execute to get Plotly figures JSON, and saves artifacts under /data/snapshots//artifacts/. - /snapshots/{id}/artifacts returns the saved profile + chart plan + figures JSON for frontend. - /snapshots/{id}/preprocess?columns=... returns preprocessed slices/stats to support your “click columns to show data” UX. 
Docker Compose (3 services + Qdrant)

docker-compose.yml
```yaml
version: "3.9"
services:
  qdrant:
    image: qdrant/qdrant:latest
    ports: ["6333:6333"]
    volumes:
      - qdrant_data:/qdrant/storage

  python-exec-sandbox:
    build: ./python-exec-sandbox
    ports: ["8000:8000"]
    environment:
      - STORAGE_ROOT=/data
    volumes:
      # share data_root so the executor can read snapshot files at the
      # local_path the orchestrator sends (a private volume would break this)
      - data_root:/data
    # recommended in prod:
    # network_mode: "none"
    # read_only: true

  storage:
    build: ./storage
    ports: ["8010:8000"]
    environment:
      - HF_TOKEN=${HF_TOKEN}
      - HF_REPO=triflix/database
      - DATA_ROOT=/data
      - ORCH_URL=http://orchestrator:8000
    volumes:
      - data_root:/data

  orchestrator:
    build: ./orchestrator
    ports: ["8020:8000"]
    environment:
      - GEMINI_API_KEY=${GEMINI_API_KEY}
      - QDRANT_URL=http://qdrant:6333
      - EXEC_URL=http://python-exec-sandbox:8000
      - ARTIFACTS_ROOT=/data/artifacts
      - SNAPSHOTS_ROOT=/data/snapshots
    volumes:
      - data_root:/data
    depends_on:
      - qdrant
      - python-exec-sandbox

volumes:
  data_root:
  qdrant_data:
```

Front-end flow
- Upload the file to Storage /upload with a dataset_key:
  - If identical, the response is status=reused and the artifacts are already available.
  - If new, status=snapshot_created; the Orchestrator starts preparing artifacts; poll /snapshots/{id}/artifacts until ready.
- Render charts:
  - Use artifacts.figures (Plotly figure dicts) directly in the frontend.
- “Click columns”:
  - Call Orchestrator /snapshots/{id}/preprocess?columns=colA,colB
  - Show value counts / crosstab / data slices; highlight clicked categories in your UI.

Edge cases fully covered
- Renamed same file: same sha256 → reuse the previous snapshot and artifacts.
- Same shape (rows/cols) but different values: sha differs → new snapshot → new run.
- Completely different dataset: new dataset_key or a separate snapshot series.
- Repeat uploads of an identical file: instant reuse, no pipeline cost.
- Your “cycle that executed code of AI output and another output” question: Agent 2 can be re-run anytime to generate new chart plans (e.g., with different params), and you can /execute again on the sandbox. Store these as additional artifacts (plan_v2.json, figures_v2.json) if needed. The orchestrator can accept a plan-override endpoint if you want to drive this (a sketch follows the questions below).

Do you want me to:
- Add a /upload_json variant for small DataFrames (base64 or records) so we skip file IO?
- Add binary quantization in Qdrant to scale profiles?
- Add JWT auth to Storage/Orchestrator/Executor?
- Add a “plan override” endpoint that takes a new chart plan and executes it against an existing snapshot?
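If you do want the plan-override endpoint, here is a rough sketch to react to (hypothetical route and names; it would live in app/orchestrator_main.py and reuse that module's imports, and DEFAULT_BAR_SNIPPET is an assumed constant holding the same safe bar-chart snippet Agent 2 attaches):

```python
# Hypothetical addition to app/orchestrator_main.py, not part of the code above.
class PlanOverrideReq(BaseModel):
    plan: Dict[str, Any]      # hand-edited or re-generated chart plan
    version_tag: str = "v2"   # artifacts saved as figures_<tag>.json

@app.post("/snapshots/{snapshot_id}/execute_plan")
def execute_plan(snapshot_id: str, req: PlanOverrideReq):
    sdir = SNAPSHOTS_ROOT / snapshot_id
    files = [p for p in sdir.glob("*.*") if p.is_file()]
    if not files:
        raise HTTPException(404, "Snapshot file missing")
    path = str(files[0])
    fmt = "csv" if path.lower().endswith(".csv") else "parquet"

    plan = dict(req.plan)
    plan.setdefault("python_snippet", DEFAULT_BAR_SNIPPET)  # assumed module constant

    exec_resp = requests.post(f"{EXEC_URL}/execute", json={
        "code": plan["python_snippet"],
        "variables": {"plan": plan},
        "datasets": [{"alias": "df", "path": path, "fmt": fmt, "read_kwargs": {}}],
        "limits": {"cpu_seconds": 10, "memory_mb": 1024, "wall_seconds": 20},
    }, timeout=60).json()
    if not exec_resp.get("ok"):
        raise HTTPException(500, f"Executor failed: {exec_resp.get('stderr')}")

    # Versioned artifact alongside the originals (figures_v2.json, figures_v3.json, ...)
    adir = sdir / "artifacts"
    adir.mkdir(parents=True, exist_ok=True)
    (adir / f"figures_{req.version_tag}.json").write_text(json.dumps(exec_resp["result"], indent=2))
    return {"status": "created", "version_tag": req.version_tag, "figures": exec_resp["result"]}
```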