phase2withfrontend / workflow.md

I’m with you. Let’s wire the whole flow so uploads are snapshotted, deduped, and only re-run the AI pipeline when needed. We’ll keep your HF dataset store as the “remote S3,” maintain local snapshots, and call the strict Python Executor API to render charts from Agent 2’s python_snippet.

Overview

  • Service A: Storage + Snapshotter (extends your HF uploader)
    • Handles uploads, computes fingerprint (sha256, shape, schema), dedup across renames, keeps a manifest of snapshots and artifacts, pushes to HF.
    • If identical snapshot already exists, it returns the previous AI artifacts without re-running.
    • If changed, it creates a new snapshot and triggers the pipeline.
  • Service B: Analytics Orchestrator (Gemini + Qdrant + Chart Plan + Execution)
    • Runs Agent 1 (Profiler) and Agent 2 (Viz Planner) and stores artifacts (profile.json, plan.json, figures.json).
    • Calls the Strict Python Executor API (/execute) to run the python_snippet against the snapshot dataset.
  • Service C: Strict Python Executor (already done above)
    • Executes python_snippet with df + plan and returns Plotly figure JSON.

Data and UX notes

  • Renamed same file: dedup by content sha256 → reuse snapshot and all AI artifacts.
  • Same shape (rows/cols) but data changed: new snapshot → re-run pipeline.
  • Completely different dataset: new dataset key or new snapshot under a different key.
  • Frontend “click columns”:
    • An API returns preprocessed slices (value counts, stats, crosstabs) for the clicked columns and highlights those in the charts.
    • We serve preprocessed data from the snapshot to avoid recomputing.
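The rename/changed-data rules above all reduce to one property: the fingerprint is content-addressed, so the filename never enters it. A quick sketch:

```python
import hashlib

# Content-addressed dedup: the filename never enters the fingerprint, so a
# renamed upload hashes identically, while any value change yields a new hash.
original = hashlib.sha256(b"id,price\n1,9.99\n").hexdigest()
renamed  = hashlib.sha256(b"id,price\n1,9.99\n").hexdigest()   # same bytes, "new" name
edited   = hashlib.sha256(b"id,price\n1,10.99\n").hexdigest()  # same shape, new values
print(original == renamed, original == edited)  # True False
```

This is why the "same shape but data changed" case correctly produces a new snapshot: shape is recorded for display, but identity is decided by the sha256 alone.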

Service A: Storage + Snapshotter (FastAPI)

  • Replaces/extends your uploader with fingerprinting, dedup, and snapshot registry.
  • Still uses CommitScheduler to push to HF.

requirements.txt

  • fastapi, uvicorn, pydantic, pandas, pyarrow, orjson, huggingface_hub, requests, python-multipart

app/storage_main.py

import os, io, json, uuid, time, hashlib, asyncio
from pathlib import Path
from typing import Any, Dict, Optional, List
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from huggingface_hub import CommitScheduler
import pandas as pd

# ---------------- Config ----------------
HF_TOKEN    = os.environ.get("HF_TOKEN")
BACKUP_REPO = os.environ.get("HF_REPO", "triflix/database")
UPLOAD_DIR  = Path(os.environ.get("UPLOAD_DIR", "/tmp/uploadedfiles"))
DATA_ROOT   = Path(os.environ.get("DATA_ROOT", "/data"))  # local canonical storage
ARTIFACTS   = DATA_ROOT / "artifacts"                     # per-snapshot artifacts
REGISTRY    = DATA_ROOT / "manifest.json"                 # global manifest

UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
DATA_ROOT.mkdir(parents=True, exist_ok=True)
ARTIFACTS.mkdir(parents=True, exist_ok=True)
if not REGISTRY.exists():
    REGISTRY.write_text(json.dumps({"snapshots": {}, "by_sha": {}, "by_dataset": {}}, indent=2))

scheduler = CommitScheduler(
    repo_id      = BACKUP_REPO,
    repo_type    = "dataset",
    folder_path  = DATA_ROOT,    # push entire /data tree (datasets + artifacts)
    path_in_repo = "data",
    every        = 60*24*365,   # minutes; effectively manual, commits are pushed via scheduler.trigger()
    token        = HF_TOKEN,
)

app = FastAPI(title="Storage + Snapshotter")

# --------------- Models -----------------
class SnapshotMeta(BaseModel):
    snapshot_id: str
    dataset_key: str
    file_sha256: str
    file_name: str
    fmt: str
    n_rows: int
    n_cols: int
    columns: List[str]
    dtypes: Dict[str, str]
    sample_hash: str
    local_path: str
    created_at: float
    artifacts: Dict[str, str] = Field(default_factory=dict)  # profile.json, plan.json, figures.json

# --------------- Helpers ----------------
def load_manifest() -> Dict[str, Any]:
    try:
        return json.loads(REGISTRY.read_text())
    except Exception:
        return {"snapshots": {}, "by_sha": {}, "by_dataset": {}}

def save_manifest(man: Dict[str, Any]):
    tmp = REGISTRY.with_suffix(".tmp.json")
    tmp.write_text(json.dumps(man, indent=2))
    tmp.replace(REGISTRY)

def sha256_of_bytes(b: bytes) -> str:
    h = hashlib.sha256()
    h.update(b)
    return h.hexdigest()

def sha256_of_file(path: Path) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1024*1024), b""):
            h.update(chunk)
    return h.hexdigest()

def sniff_fmt(filename: str) -> str:
    lower = filename.lower()
    if lower.endswith(".csv"): return "csv"
    if lower.endswith(".parquet") or lower.endswith(".pq"): return "parquet"
    raise ValueError("Unsupported file type (csv/parquet only)")

def fingerprint_dataset(path: Path, fmt: str, sample_n: int = 5) -> Dict[str, Any]:
    if fmt == "csv":
        # Count physical lines quickly without loading the full DF into memory.
        # Note: this over-counts if quoted fields contain embedded newlines.
        with path.open("rb") as f:
            n_rows = sum(1 for _ in f) - 1  # minus header
        df_head = pd.read_csv(path, nrows=sample_n)
        # tail: read all but keep last sample_n (OK for mid-size; switch to chunks for very large)
        df_tail = pd.read_csv(path).tail(sample_n)
        cols = list(df_head.columns)
        # Infer dtypes from the head sample instead of re-reading the file per column
        dtypes = {c: str(df_head[c].dtype) for c in cols}
        n_cols = len(cols)
    else:  # parquet
        df = pd.read_parquet(path)
        n_rows, n_cols = df.shape
        cols = list(df.columns)
        dtypes = {c: str(df[c].dtype) for c in cols}
        df_head = df.head(sample_n)
        df_tail = df.tail(sample_n)

    # sample hash from head+tail to catch trivial renames
    sample_blob = json.dumps({
        "head": df_head.to_dict(orient="records"),
        "tail": df_tail.to_dict(orient="records")
    }, ensure_ascii=False, default=str).encode("utf-8")
    sample_hash = sha256_of_bytes(sample_blob)

    return {
        "n_rows": int(n_rows),
        "n_cols": int(n_cols),
        "columns": cols,
        "dtypes": dtypes,
        "sample_hash": sample_hash
    }

def ensure_snapshot_dirs(snapshot_id: str) -> Path:
    sdir = DATA_ROOT / "snapshots" / snapshot_id
    sdir.mkdir(parents=True, exist_ok=True)
    (sdir / "artifacts").mkdir(parents=True, exist_ok=True)
    return sdir

# --------------- Upload + Snapshot logic ----------------
@app.on_event("shutdown")
def _stop_scheduler():
    scheduler.stop()

@app.post("/upload")
async def upload(
    file: UploadFile = File(...),
    dataset_key: Optional[str] = Form(None),
    trigger_pipeline: bool = Form(True),
):
    """
    - Saves file locally under /data/uploads
    - Computes sha256, shape, schema, sample hash
    - Dedups across renames using sha256
    - Returns snapshot + optionally triggers pipeline by HTTP call (deferred to Orchestrator)
    """
    try:
        # 1) Save temp
        tmp = UPLOAD_DIR / f"{uuid.uuid4()}_{file.filename}"
        content = await file.read()
        tmp.write_bytes(content)

        # 2) File meta
        fmt = sniff_fmt(file.filename)
        fsha = sha256_of_file(tmp)

        # 3) Dedup by sha
        man = load_manifest()
        existing_snapshot_id = man["by_sha"].get(fsha)

        if not dataset_key:
            # Use filename stem as default dataset key
            dataset_key = Path(file.filename).stem.lower().replace(" ", "_")

        if existing_snapshot_id:
            snap = man["snapshots"][existing_snapshot_id]
            # Also ensure dataset_key list contains this snapshot
            byds = man["by_dataset"].setdefault(dataset_key, [])
            if existing_snapshot_id not in byds:
                byds.append(existing_snapshot_id)
                save_manifest(man)
            # Cleanup temp (we already have canonical file)
            try: tmp.unlink()
            except OSError: pass

            # Return existing snapshot (reuse artifacts)
            return {
                "status": "reused",
                "reason": "identical_content_sha",
                "dataset_key": dataset_key,
                "snapshot_id": existing_snapshot_id,
                "artifacts": snap.get("artifacts", {}),
                "meta": snap
            }

        # 4) New snapshot path
        snapshot_id = fsha[:32]
        sdir = ensure_snapshot_dirs(snapshot_id)
        dest = sdir / file.filename
        tmp.replace(dest)

        # 5) Fingerprint
        fp = fingerprint_dataset(dest, fmt)
        meta = SnapshotMeta(
            snapshot_id=snapshot_id,
            dataset_key=dataset_key,
            file_sha256=fsha,
            file_name=file.filename,
            fmt=fmt,
            n_rows=fp["n_rows"],
            n_cols=fp["n_cols"],
            columns=fp["columns"],
            dtypes=fp["dtypes"],
            sample_hash=fp["sample_hash"],
            local_path=str(dest),
            created_at=time.time(),
            artifacts={}
        ).model_dump()

        # 6) Update registry
        man["snapshots"][snapshot_id] = meta
        man["by_sha"][fsha] = snapshot_id
        man["by_dataset"].setdefault(dataset_key, []).append(snapshot_id)
        save_manifest(man)

        # 7) Push to HF (async)
        await asyncio.to_thread(scheduler.trigger)

        # 8) Optionally call the Orchestrator to prepare artifacts
        if trigger_pipeline:
            # POST to orchestrator (configure ORCH_URL env)
            orch = os.environ.get("ORCH_URL")
            if orch:
                import requests
                try:
                    requests.post(f"{orch}/prepare", json={
                        "dataset_key": dataset_key,
                        "snapshot_id": snapshot_id,
                        "local_path": meta["local_path"]
                    }, timeout=5)
                except Exception:
                    pass

        return {
            "status": "snapshot_created",
            "dataset_key": dataset_key,
            "snapshot_id": snapshot_id,
            "meta": meta
        }

    except ValueError as e:
        # e.g. unsupported file type from sniff_fmt -> client error, not 500
        raise HTTPException(400, f"Upload failed: {e}")
    except Exception as e:
        raise HTTPException(500, f"Upload failed: {e}")

@app.get("/datasets/{dataset_key}/snapshots")
def list_snapshots(dataset_key: str):
    man = load_manifest()
    ids = man["by_dataset"].get(dataset_key, [])
    return {"dataset_key": dataset_key, "snapshots": [man["snapshots"][i] for i in ids]}

@app.get("/snapshots/{snapshot_id}")
def get_snapshot(snapshot_id: str):
    man = load_manifest()
    snap = man["snapshots"].get(snapshot_id)
    if not snap:
        raise HTTPException(404, "Snapshot not found")
    return snap

Key rules implemented

  • Exact same file (sha256) → “reused” snapshot and artifacts. This covers rename uploads too.
  • Otherwise we create a new snapshot (even if rows/cols match but values changed).
  • Everything is recorded in /data/manifest.json; data lives under /data/snapshots/{snapshot_id}/.
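The manifest logic can be boiled down to an in-memory sketch (the real service persists this structure to /data/manifest.json and also stores shape, schema, and artifacts):

```python
import hashlib

# In-memory sketch of the dedup rule: the manifest maps content sha256 to a
# snapshot_id, so a renamed but byte-identical upload is reused instantly.
manifest = {"snapshots": {}, "by_sha": {}, "by_dataset": {}}

def register(content: bytes, filename: str, dataset_key: str) -> dict:
    sha = hashlib.sha256(content).hexdigest()
    if sha in manifest["by_sha"]:
        return {"status": "reused", "snapshot_id": manifest["by_sha"][sha]}
    sid = sha[:32]  # same convention as the service: snapshot_id = sha prefix
    manifest["snapshots"][sid] = {"file_name": filename, "file_sha256": sha}
    manifest["by_sha"][sha] = sid
    manifest["by_dataset"].setdefault(dataset_key, []).append(sid)
    return {"status": "snapshot_created", "snapshot_id": sid}

first  = register(b"a,b\n1,2\n", "sales.csv", "sales")
second = register(b"a,b\n1,2\n", "sales_renamed.csv", "sales")  # same bytes
print(first["status"], second["status"])  # snapshot_created reused
```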

Service B: Analytics Orchestrator (FastAPI)

  • Runs Agent 1 + Agent 2, stores artifacts, and calls Strict Python Executor.

requirements.txt

  • fastapi, uvicorn, pydantic, pandas, numpy, pyarrow, plotly, google-generativeai, qdrant-client, orjson, requests

app/orchestrator_main.py

import os, json, time
from typing import Any, Dict, Optional
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd

import google.generativeai as genai
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

# Reuse your LLM/Qdrant helpers from earlier response or import them as a module
# For brevity, define minimal here

GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    raise RuntimeError("Set GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
TEXT_MODEL = os.environ.get("GEMINI_TEXT_MODEL", "gemini-1.5-pro")
EMBED_MODEL = "models/embedding-001"
EMBED_DIM = 768

QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
ARTIFACTS_ROOT = Path(os.environ.get("ARTIFACTS_ROOT", "/data/artifacts"))
SNAPSHOTS_ROOT = Path(os.environ.get("SNAPSHOTS_ROOT", "/data/snapshots"))
EXEC_URL = os.environ.get("EXEC_URL", "http://python-exec-sandbox:8000")

app = FastAPI(title="Analytics Orchestrator")

def qdrant():
    c = QdrantClient(url=QDRANT_URL)
    # Ensure collection
    try:
        c.get_collection("dataset_profiles")
    except Exception:
        c.create_collection("dataset_profiles",
            vectors_config=VectorParams(size=EMBED_DIM, distance=Distance.COSINE))
    return c

def embed_text(text: str, task: str) -> list[float]:
    return genai.embed_content(model=EMBED_MODEL, content=text, task_type=task)["embedding"]

def df_schema_summary(df: pd.DataFrame) -> Dict[str, Any]:
    cols = []
    for c in df.columns:
        s = df[c]
        cols.append({
            "name": c,
            "dtype": str(s.dtype),
            "unique_count": int(s.nunique(dropna=True)),
            "missing": int(s.isna().sum()),
            "missing_pct": float(s.isna().mean())
        })
    return {
        "shape": {"rows": int(df.shape[0]), "columns": int(df.shape[1])},
        "duplicates": int(df.duplicated().sum()),
        "columns": cols
    }

def profile_text(dataset_name: str, meta: Dict[str, Any]) -> str:
    lines = [
        f"Dataset: {dataset_name}",
        f"Shape: {meta['shape']['rows']}x{meta['shape']['columns']}",
        f"Duplicate rows: {meta['duplicates']}"
    ]
    for c in meta["columns"]:
        lines.append(f"- {c['name']} ({c['dtype']}) unique={c['unique_count']} missing={c['missing']}({round(100*c['missing_pct'],1)}%)")
    return "\n".join(lines)

def agent1(df: pd.DataFrame, dataset_name: str) -> Dict[str, Any]:
    meta = df_schema_summary(df)
    ctx = profile_text(dataset_name, meta)
    model = genai.GenerativeModel(model_name=TEXT_MODEL,
                                  generation_config={"temperature": 0.2, "response_mime_type": "application/json"})
    prompt = """
You are Agent 1 (Profiler).
Return STRICT JSON:
{
  "dataset_name": "string",
  "shape": {"rows": int, "columns": int},
  "duplicates": int,
  "columns": [{"name": "string", "dtype": "string", "unique_count": int, "missing": int, "missing_pct": float, "notes": "string"}],
  "summary": "short summary",
  "data_quality": {
     "high_missing_columns": [],
     "high_cardinality_columns": [],
     "potential_id_columns": [],
     "warnings": []
  }
}
Be concise. Use only provided columns.
"""
    parts = [
        {"role": "user", "parts": [
            prompt,
            f"Dataset name: {dataset_name}",
            "Compact profile:",
            ctx,
            "Schema JSON:",
            json.dumps(meta, ensure_ascii=False)
        ]}
    ]
    resp = model.generate_content(parts)
    out = json.loads(resp.text)

    # Store in Qdrant
    c = qdrant()
    vec = embed_text(ctx, "retrieval_document")
    # Qdrant point IDs must be unsigned ints or UUIDs, not arbitrary strings
    import uuid
    c.upsert("dataset_profiles", points=[{
        "id": str(uuid.uuid4()),
        "vector": vec,
        "payload": {"dataset_name": dataset_name, "profile": out, "created_at": int(time.time())}
    }])
    return out

def agent2(agent1_out: Dict[str, Any], df: pd.DataFrame) -> Dict[str, Any]:
    schema = [{"name": c["name"], "dtype": c["dtype"], "unique_count": c["unique_count"], "missing_pct": c["missing_pct"]}
              for c in agent1_out["columns"]]
    model = genai.GenerativeModel(model_name=TEXT_MODEL,
                                  generation_config={"temperature": 0.2, "response_mime_type": "application/json"})
    prompt = """
You are Agent 2 (Visualization Planner).
Produce EXACTLY 4 bar charts (count or mean/sum by category).
STRICT JSON:
{
  "charts": [
    {"id":"bar1","kind":"bar","x":"col","y":"count_or_numeric","agg":"count|mean|sum","top_k":10,"title":"t","description":"d"},
    {...},{...},{...}
  ]
}
Only use columns in schema. Avoid high-cardinality free-text.
"""
    parts = [{"role":"user","parts":[
        prompt,
        "Agent 1 output:", json.dumps(agent1_out, ensure_ascii=False),
        "Schema:", json.dumps({"schema": schema}, ensure_ascii=False)
    ]}]
    resp = model.generate_content(parts)
    plan = json.loads(resp.text)

    # Attach python_snippet (same safe snippet as before)
    plan["python_snippet"] = r'''
import json
import pandas as pd
import plotly.express as px
def _safe_bar(df, spec):
    x = spec["x"]; y = spec.get("y","count"); agg = spec.get("agg","count"); k=int(spec.get("top_k",10))
    if x not in df.columns: raise ValueError(f"x not found: {x}")
    if agg=="count" or y=="count":
        vc = df[x].astype("object").value_counts(dropna=False).head(k)
        fig = px.bar(x=[str(i) for i in vc.index], y=vc.values, labels={"x":x,"y":"count"}, title=spec.get("title",""))
        fig.update_xaxes(type="category"); return fig
    else:
        if y not in df.columns: raise ValueError(f"y not found: {y}")
        grouped = df.groupby(x, dropna=False)[y].agg(agg).sort_values(ascending=False).head(k)
        fig = px.bar(x=[str(i) for i in grouped.index], y=grouped.values, labels={"x":x,"y":f"{agg}({y})"}, title=spec.get("title",""))
        fig.update_xaxes(type="category"); return fig
def execute(df, plan):
    figs = []
    for spec in plan.get("charts", []):
        if spec.get("kind")=="bar": figs.append(_safe_bar(df, spec))
    return {"figures":[f.to_plotly_json() for f in figs], "charts_meta": plan.get("charts", [])}
result = execute(df, plan)
'''
    return plan

class PrepareReq(BaseModel):
    dataset_key: str
    snapshot_id: str
    local_path: str

@app.post("/prepare")
def prepare(req: PrepareReq):
    # If artifacts already exist, return them
    sdir = SNAPSHOTS_ROOT / req.snapshot_id
    if not sdir.exists():
        raise HTTPException(404, "Snapshot not found on disk")
    adir = sdir / "artifacts"
    prof_path = adir / "profile.json"
    plan_path = adir / "plan.json"
    figs_path = adir / "figures.json"

    if prof_path.exists() and plan_path.exists() and figs_path.exists():
        return {"status": "reused", "snapshot_id": req.snapshot_id,
                "artifacts": {"profile": str(prof_path), "plan": str(plan_path), "figures": str(figs_path)}}

    # Run Agents
    fmt = "csv" if req.local_path.lower().endswith(".csv") else "parquet"
    df = pd.read_csv(req.local_path) if fmt=="csv" else pd.read_parquet(req.local_path)
    a1 = agent1(df, req.dataset_key)
    a2 = agent2(a1, df)

    # Execute charts via Strict Executor
    import requests
    exec_req = {
        "code": a2["python_snippet"],
        "variables": {"plan": a2},
        "datasets": [{"alias": "df", "path": req.local_path, "fmt": fmt, "read_kwargs": {}}],
        "limits": {"cpu_seconds": 10, "memory_mb": 1024, "wall_seconds": 20}
    }
    exec_resp = requests.post(f"{EXEC_URL}/execute", json=exec_req, timeout=60).json()
    if not exec_resp.get("ok"):
        raise HTTPException(500, f"Executor failed: {exec_resp.get('stderr')}")

    # Persist artifacts
    adir.mkdir(parents=True, exist_ok=True)
    prof_path.write_text(json.dumps(a1, indent=2))
    plan_to_save = dict(a2); plan_to_save["python_snippet"] = "<omitted>"
    plan_path.write_text(json.dumps(plan_to_save, indent=2))
    figs_path.write_text(json.dumps(exec_resp["result"], indent=2))

    return {"status": "created", "snapshot_id": req.snapshot_id,
            "artifacts": {"profile": str(prof_path), "plan": str(plan_path), "figures": str(figs_path)}}

@app.get("/snapshots/{snapshot_id}/artifacts")
def artifacts(snapshot_id: str):
    sdir = SNAPSHOTS_ROOT / snapshot_id / "artifacts"
    prof, plan, figs = sdir / "profile.json", sdir / "plan.json", sdir / "figures.json"
    if not (prof.exists() and plan.exists() and figs.exists()):
        raise HTTPException(404, "Artifacts not ready")
    return {"profile": json.loads(prof.read_text()),
            "plan": json.loads(plan.read_text()),
            "figures": json.loads(figs.read_text())}

@app.get("/snapshots/{snapshot_id}/preprocess")
def preprocess(snapshot_id: str, columns: str):
    cols = [c.strip() for c in columns.split(",") if c.strip()]
    sdir = SNAPSHOTS_ROOT / snapshot_id
    files = list(sdir.glob("*.*"))
    if not files:
        raise HTTPException(404, "Snapshot file missing")
    path = str(files[0])
    df = pd.read_csv(path) if path.lower().endswith(".csv") else pd.read_parquet(path)

    out: Dict[str, Any] = {"columns": cols, "frames": {}, "stats": {}}
    if len(cols) == 1:
        c = cols[0]
        if c not in df.columns: raise HTTPException(400, f"Column not found: {c}")
        if str(df[c].dtype).startswith(("float","int")):
            desc = df[c].describe().to_dict()
            out["stats"][c] = {k: float(v) for k, v in desc.items()}
        vc = df[c].astype("object").value_counts(dropna=False).head(50)
        out["frames"][f"value_counts_{c}"] = {"index": [str(i) for i in vc.index], "values": vc.values.tolist()}
    elif len(cols) == 2:
        a, b = cols
        for col in cols:
            if col not in df.columns: raise HTTPException(400, f"Column not found: {col}")
        ct = pd.crosstab(df[a].astype("object"), df[b].astype("object")).head(50)
        out["frames"]["crosstab"] = {"index": [str(i) for i in ct.index], "columns": [str(i) for i in ct.columns], "values": ct.values.tolist()}
    else:
        # Fallback: per-column value_counts
        for c in cols[:3]:
            if c not in df.columns: raise HTTPException(400, f"Column not found: {c}")
            vc = df[c].astype("object").value_counts(dropna=False).head(20)
            out["frames"][f"value_counts_{c}"] = {"index": [str(i) for i in vc.index], "values": vc.values.tolist()}
    return out

What happens now

  • Upload pipeline:

    • POST /upload (Storage): saves to /data/snapshots/{snapshot_id}/file.ext, fingerprints, dedups by sha256, pushes to the HF repo, and returns snapshot info.
    • If new snapshot, it optionally calls Orchestrator /prepare (async fire-and-forget). Otherwise, it returns “reused” and the artifacts that are already there.
  • Orchestrator:

    • /prepare loads the local file, runs Agent 1 and Agent 2, calls the Strict Executor /execute to get Plotly figure JSON, and saves artifacts under /data/snapshots/{snapshot_id}/artifacts/.
    • /snapshots/{id}/artifacts returns the saved profile + chart plan + figures JSON for frontend.
    • /snapshots/{id}/preprocess?columns=... returns preprocessed slices/stats to support your “click columns to show data” UX.
  • Strict Python Executor:

    • Already in place; we’re calling it with df alias and plan to render figures.
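The executor contract can be illustrated locally. This is only a sketch of the calling convention, not the sandbox itself: the real service adds resource limits and import filtering, and the Plotly figure step is omitted here.

```python
import pandas as pd

# Mimic the Strict Executor contract: the sandbox exec()s the snippet with
# `df` and `plan` bound in scope, then reads a `result` variable back out.
snippet = """
counts = df[plan["x"]].value_counts().head(plan["top_k"])
result = {"labels": [str(i) for i in counts.index], "values": counts.tolist()}
"""
df = pd.DataFrame({"city": ["a", "a", "b", "c", "a", "b"]})
scope = {"df": df, "plan": {"x": "city", "top_k": 2}}
exec(snippet, scope)
print(scope["result"])  # {'labels': ['a', 'b'], 'values': [3, 2]}
```

In the real flow, `snippet` is Agent 2's python_snippet, `plan` is its full JSON output, and `result` carries the serialized Plotly figures back over HTTP.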

Docker Compose (3 services + Qdrant) docker-compose.yml

version: "3.9"
services:
  qdrant:
    image: qdrant/qdrant:latest
    ports: ["6333:6333"]
    volumes:
      - qdrant_data:/qdrant/storage

  python-exec-sandbox:
    build: ./python-exec-sandbox
    ports: ["8000:8000"]
    environment:
      - STORAGE_ROOT=/data
    volumes:
      - exec_data:/data
    # recommended in prod:
    # network_mode: "none"
    # read_only: true

  storage:
    build: ./storage
    ports: ["8010:8000"]
    environment:
      - HF_TOKEN=${HF_TOKEN}
      - HF_REPO=triflix/database
      - DATA_ROOT=/data
      - ORCH_URL=http://orchestrator:8000
    volumes:
      - data_root:/data

  orchestrator:
    build: ./orchestrator
    ports: ["8020:8000"]
    environment:
      - GEMINI_API_KEY=${GEMINI_API_KEY}
      - QDRANT_URL=http://qdrant:6333
      - EXEC_URL=http://python-exec-sandbox:8000
      - ARTIFACTS_ROOT=/data/artifacts
      - SNAPSHOTS_ROOT=/data/snapshots
    volumes:
      - data_root:/data
    depends_on:
      - qdrant
      - python-exec-sandbox

volumes:
  data_root:
  exec_data:
  qdrant_data:

Front-end flow

  • Upload file to Storage /upload with dataset_key:
    • If identical, response: status=reused + artifacts already available.
    • If new, status=snapshot_created; the Orchestrator starts preparing artifacts; poll /snapshots/{id}/artifacts until ready.
  • Render charts:
    • Use artifacts.figures (Plotly figure dicts) directly in the frontend.
  • “Click columns”:
    • Call Orchestrator /snapshots/{id}/preprocess?columns=colA,colB
    • Show value counts/crosstab/data slice; highlight clicked categories in your UI.
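The shapes the frontend gets back from /preprocess look like this (illustrative data; the endpoint caps value counts at 50 rows):

```python
import pandas as pd

# One clicked column -> value counts; two clicked columns -> a crosstab,
# both serialized as plain lists so the frontend can render them directly.
df = pd.DataFrame({"dept": ["hr", "it", "it"], "site": ["x", "x", "y"]})

vc = df["dept"].astype("object").value_counts(dropna=False).head(50)
one_col = {"index": [str(i) for i in vc.index], "values": vc.values.tolist()}

ct = pd.crosstab(df["dept"].astype("object"), df["site"].astype("object")).head(50)
two_cols = {"index": [str(i) for i in ct.index],
            "columns": [str(i) for i in ct.columns],
            "values": ct.values.tolist()}
print(one_col, two_cols)
```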

Edge cases fully covered

  • Rename same file: same sha256 => reuse previous snapshot and artifacts.
  • Same shape (rows/cols) but different values: sha differs => new snapshot => new run.
  • Different dataset entirely: new dataset_key or separate snapshot series.
  • Repeat uploads of an identical file: instant reuse, no pipeline cost.
  • Re-running the AI cycle (new plan, new executed code):
    • Agent 2 can be re-run at any time to generate a new chart plan (e.g., with different parameters), and you can call /execute on the sandbox again. Store the results as additional artifacts (plan_v2.json, figures_v2.json) if needed. The orchestrator can expose a plan-override endpoint if you want to drive this.
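For the versioned-artifact idea, a hypothetical helper (name and convention are illustrative, not part of the services above) could pick the next free plan_vN.json slot:

```python
from pathlib import Path
import tempfile

# Hypothetical helper for the plan_v2.json / figures_v2.json convention:
# return the first unused versioned filename inside a snapshot's artifacts dir.
def next_versioned(adir: Path, stem: str, ext: str = ".json") -> Path:
    if not (adir / f"{stem}{ext}").exists():
        return adir / f"{stem}{ext}"
    n = 2
    while (adir / f"{stem}_v{n}{ext}").exists():
        n += 1
    return adir / f"{stem}_v{n}{ext}"

with tempfile.TemporaryDirectory() as d:
    adir = Path(d)
    first = next_versioned(adir, "plan").name    # no file yet -> plan.json
    (adir / first).write_text("{}")
    second = next_versioned(adir, "plan").name   # plan.json taken -> plan_v2.json
print(first, second)
```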

Do you want me to:

  • Add a /upload_json variant for small DataFrames (base64 or records) so we skip file IO?
  • Add binary quantization in Qdrant to scale profiles?
  • Add JWT auth to Storage/Orchestrator/Executor?
  • Add “plan override” endpoint that takes a new chart plan and executes it against an existing snapshot?