Spaces:
Sleeping
Sleeping
| """ | |
| Automated Data-Analysis Pipeline with Agent Prompts + Gemini (google-genai) | |
| Changes applied: | |
| - Use GEMINI_API_KEY from environment (no hardcoded key) | |
| - Stronger, model-proof PROMPTS that forbid plotting and require `result` assignment | |
| - Extended DISALLOWED_PATTERNS to block plotting libraries and plotting methods | |
| - Validation step after planning: drop model-provided code that lacks `result` or uses plotting tokens; record execution_errors | |
| - Execution still performs safety checks and falls back to deterministic generators when needed | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import argparse | |
| import random | |
| import re | |
| import logging | |
| from typing import Any, Dict, List, Tuple, Optional | |
| import pandas as pd | |
| import numpy as np | |
| # google-genai | |
| from google import genai | |
| from google.genai import types | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("pipeline") | |
| # --------------------------- | |
| # Agent system prompts (strict, plotting banned) | |
| # --------------------------- | |
# System prompts for each pipeline agent, keyed by agent name.
# Only "classification" and "planning" are sent to the model by process_file;
# the others document the contract of the locally implemented stages.
PROMPTS = {
    "file_ingestion": (
        "You are a file ingestion agent. Detect file type; if Excel enumerate sheets and pick the specified sheet or default to the first. "
        "Load the chosen sheet into a pandas DataFrame and return only metadata (no narrative): "
        '{"file_type":"<.csv|.xlsx|...>", "sheet_names":[...], "selected_sheet":"..."}'
    ),
    "preprocessing": (
        "You are a preprocessing agent. Clean and normalize the dataset deterministically. "
        "Operations allowed: trim strings, coerce numeric columns with pandas.to_numeric, fill numeric NaNs with median, fill object NaNs with mode, "
        "generate one-line schema summary. RETURN JSON only: {\"actions\": [...], \"schema\": {\"columns\":[{\"name\":\"...\",\"dtype\":\"...\",\"n_unique\":N},...], \"n_rows\":N}}. "
        "Do NOT print or return any code, diagrams, or explanations."
    ),
    "sampling": (
        "You are a sampling agent. From the cleaned dataframe produce three JSON arrays: head(5), tail(5), random(5). "
        "Return JSON: {\"head\": [...], \"tail\": [...], \"random\": [...]} where each array contains row dicts. Do NOT include extra fields."
    ),
    "classification": (
        "You are a classification agent. Examine provided samples and schema. Identify dataset domain (one-word) and propose at least SIX visualization tasks. "
        "Each task must be a JSON object: {\"task_id\":\"tN\",\"chart_type\":\"pie|bar|line|scatter|histogram|boxplot\",\"target_columns\":[...],"
        "\"aggregation\": null|\"count\"|\"sum\"|\"mean\",\"reasoning\":\"one-sentence\"}. "
        "Return JSON exactly: {\"domain\":\"...\",\"tasks\":[...]} and nothing else. Do NOT include code. Do NOT recommend plotting libraries."
    ),
    "planning": (
        "You are a planning agent. Input: the classification JSON + schema + small samples. Produce at least SIX task entries. "
        "For each task output a Python/pandas code snippet that uses ONLY pandas and numpy (and the dataframe variable `df`) and assigns the final result to a variable named `result`. "
        "REQUIREMENTS for the code string: "
        " - Must NOT import or reference matplotlib, seaborn, plotly, altair, bokeh, or any plotting functions. "
        " - Must NOT call pandas plotting methods (e.g. .plot(), .hist() wrapper that uses matplotlib). "
        " - Must NOT use eval/exec/compile or open(). "
        " - Allowed names: df, pd, np, len, sum, min, max, round, sorted. "
        " - The code must produce `result` as a list of dictionaries ready for JSON serialization (use .to_dict(orient='records') or list comprehension). "
        " - Return JSON exactly: {\"tasks\":[ {\"task_id\":\"t1\",\"chart_type\":\"pie\",\"target_columns\":[\"colA\"],"
        # Single quotes are written unescaped: a backslash-quote is not a legal
        # JSON escape, so showing it in the example invites unparsable output.
        "\"aggregation\":\"count\",\"reasoning\":\"...\",\"code\":\"result = df.groupby('colA').size().reset_index(name='value').to_dict(orient='records')\" }, ... ] }"
    ),
    "execution": (
        "You are an execution agent. You will run model-provided code in a restricted execution environment WITHOUT plotting libraries. "
        "The executor expects the code to assign a variable named `result` containing a list of dicts. "
        "Rules: do not rely on plotting functions. Use pandas/numpy for aggregation and numeric work only. "
        "Schema expectations per chart type (examples only): "
        " Pie -> [{\"name\":\"...\",\"value\":number}], "
        " Bar -> [{\"label\":\"...\",\"metric1\":number, ...}], "
        " Line -> [{\"x\":...,\"y\":...}] (x may be ISO string), "
        " Scatter -> [{\"x\":number,\"y\":number}], "
        " Histogram -> [{\"bin\":\"start-end\",\"count\":number}], "
        " Boxplot -> [{\"category\":\"...\",\"q1\":number,\"median\":number,\"q3\":number}]. "
        "Return nothing else; the pipeline will read `result` after execution. If you must provide example code show it only as a code string and follow the allowed-names rule."
    ),
    "output": (
        "You are an output agent. Aggregate final chart JSON objects into a single JSON object with keys: "
        '"pie","bar","line","scatter","histogram","boxplot". Each key maps to an array (may be empty). Output JSON only.'
    ),
}
| # --------------------------- | |
| # Utility and safety helpers | |
| # --------------------------- | |
# Regex denylist applied (case-insensitively) to model-generated code.
# NOTE: this is a coarse filter, not a sandbox; it only rejects known-bad text.
DISALLOWED_PATTERNS = [
    r"__\w+__",  # dunder
    r"\bimport\s+os\b",
    r"\bimport\s+sys\b",
    r"\bimport\s+subprocess\b",
    r"\bimport\s+socket\b",
    r"\bimport\s+requests\b",
    r"\bopen\s*\(",
    r"\beval\s*\(",
    r"\bexec\s*\(",
    r"\bcompile\s*\(",
    r"\bsystem\s*\(",
    r"\bPopen\b",
    r"\bsh\b",
    # plotting libraries / functions
    r"\bmatplotlib\b",
    r"\bseaborn\b",
    r"\bplotly\b",
    r"\baltair\b",
    r"\bbokeh\b",
    r"\.plot\s*\(",
    r"\.hist\s*\(",
    r"\.boxplot\s*\(",
    r"\bpyplot\b",
    r"\bplt\b",
    # Any import statement (`import x` / `from x import y`) at the start of the
    # code, of a line, or after `;`. The per-module patterns above miss forms
    # such as `from os import system`; snippets are expected to use only the
    # pre-bound names, so no import is ever legitimate.
    r"(?:^|[;\n])\s*(?:from\s+[\w.]+\s+)?import\s+\w",
]

# Compiled once at import time instead of on every code_is_safe() call.
_DISALLOWED_REGEXES = [(pat, re.compile(pat, flags=re.I)) for pat in DISALLOWED_PATTERNS]


def code_is_safe(code: str) -> Tuple[bool, Optional[str]]:
    """Check a code snippet against the denylist.

    Args:
        code: Source text of the snippet to inspect.

    Returns:
        ``(True, None)`` when no pattern matches, otherwise
        ``(False, "disallowed pattern: <regex>")`` naming the first match.
    """
    for pat, regex in _DISALLOWED_REGEXES:
        if regex.search(code):
            return False, f"disallowed pattern: {pat}"
    return True, None
def ensure_datetime_series(s: pd.Series) -> pd.Series:
    """Return *s* as a datetime series, parsing it when necessary.

    A series that already has a datetime dtype (naive or tz-aware) is returned
    unchanged. Anything else is parsed with ``pd.to_datetime``; unparseable
    entries become ``NaT``.
    """
    # The pandas helper understands tz-aware and extension dtypes, which
    # np.issubdtype rejects with a TypeError.
    if pd.api.types.is_datetime64_any_dtype(s.dtype):
        return s
    try:
        return pd.to_datetime(s, errors="coerce")
    except Exception:
        # Best effort: retry on the string representation of the values.
        return pd.to_datetime(s.astype(str), errors="coerce")
def simple_schema(df: pd.DataFrame) -> Dict[str, Any]:
    """Summarise *df* as ``{"columns": [...], "n_rows": N}``.

    Each column entry carries its name, the string form of its dtype and the
    number of distinct non-null values.
    """
    column_info = []
    for col in df.columns:
        series = df[col]
        column_info.append(
            {
                "name": col,
                "dtype": str(series.dtype),
                "n_unique": int(series.nunique(dropna=True)),
            }
        )
    return {"columns": column_info, "n_rows": int(len(df))}
def to_json_serializable(obj):
    """Convert NumPy/pandas values into plain JSON-friendly Python values.

    Conversions performed:
      * ``np.ndarray``            -> list (via ``tolist()``)
      * missing scalars (None, NaN, NaT, pd.NA) -> ``None``
      * ``np.bool_`` / ``np.integer`` / ``np.floating`` -> bool / int / float
      * ``np.datetime64`` and objects with ``isoformat()`` -> ISO-8601 string
    Any other object (including lists and dicts) is returned unchanged.
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # pd.isna() returns an array for containers, which cannot be used in a
    # boolean test; only ask it about scalars. This runs before the NumPy
    # float branch so that np.float64("nan") also maps to None.
    if pd.api.types.is_scalar(obj) and pd.isna(obj):
        return None
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.datetime64):
        return pd.Timestamp(obj).isoformat()
    # Timestamps / datetimes / dates: json cannot encode them, and returning
    # them unchanged from a json `default=` hook ends in an encoder error.
    if hasattr(obj, "isoformat"):
        return obj.isoformat()
    return obj
| # --------------------------- | |
| # Pipeline agents (local) | |
| # --------------------------- | |
def ingest_file(path: str, sheet: Optional[str] = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Load a CSV/text or Excel file into a DataFrame.

    Args:
        path: File to read; the type is decided by its extension.
        sheet: Excel sheet to load. When it is missing or not present in the
            workbook, the first sheet is used.

    Returns:
        ``(df, metadata)`` where metadata has the keys ``file_type``,
        ``sheet_names`` (Excel only, else None) and ``selected_sheet``
        (the literal ``"csv"`` for CSV/text files).

    Raises:
        ValueError: If the extension is not .csv, .txt, .xls or .xlsx.
    """
    ext = os.path.splitext(path)[1].lower()
    metadata: Dict[str, Any] = {"file_type": ext, "sheet_names": None, "selected_sheet": None}

    if ext in (".csv", ".txt"):
        df = pd.read_csv(path)
        metadata["selected_sheet"] = "csv"
    elif ext in (".xls", ".xlsx"):
        # The context manager closes the workbook handle once the sheet is read.
        with pd.ExcelFile(path) as xls:
            sheets = xls.sheet_names
            chosen = sheet if sheet and sheet in sheets else sheets[0]
            df = pd.read_excel(xls, sheet_name=chosen)
        metadata["sheet_names"] = sheets
        metadata["selected_sheet"] = chosen
    else:
        raise ValueError("Unsupported file type: " + ext)

    return df, metadata
def preprocess_df(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Clean a copy of *df* and describe what was done.

    Steps, in order: strip column names, strip whitespace in object columns,
    cast object columns to numeric when at least half of the rows convert,
    fill numeric nulls with the median (0 if the median is undefined), and
    fill remaining object nulls with the mode ("" if there is none).

    Returns:
        ``(cleaned_df, {"actions": [...], "schema": simple_schema(cleaned_df)})``.
    """
    actions: List[str] = []
    df = df.copy()

    # Strip column names
    df.columns = [str(c).strip() for c in df.columns]

    # Trim whitespace in object columns, leaving nulls as nulls.
    for c in df.select_dtypes(include="object").columns.tolist():
        try:
            df[c] = df[c].where(df[c].isna(), df[c].astype(str).str.strip())
        except Exception:
            # Best effort: keep the column untouched, but leave a trace.
            logging.getLogger("pipeline").debug("could not trim column %r", c, exc_info=True)

    # Numeric inference
    for c in df.columns:
        if df[c].dtype == object:
            coerced = pd.to_numeric(df[c], errors="coerce")
            non_na = coerced.notna().sum()
            if non_na >= max(1, 0.5 * len(df)):  # at least 50% convertible
                df[c] = coerced
                actions.append(f"coerced {c} -> numeric")

    # Fill numeric nulls with median
    for c in df.select_dtypes(include=[np.number]).columns.tolist():
        median = df[c].median()
        if pd.isna(median):
            median = 0
        df[c] = df[c].fillna(median)
        actions.append(f"filled numeric {c} nulls with median {median}")

    # Fill object nulls with mode. The object columns are re-selected here so
    # that columns just coerced to numeric are not reported as object fills.
    for c in df.select_dtypes(include="object").columns.tolist():
        try:
            modes = df[c].mode()
            mode = modes.iloc[0] if not modes.empty else ""
        except Exception:
            mode = ""
        df[c] = df[c].fillna(mode)
        actions.append(f"filled object {c} nulls with mode '{mode}'")

    schema = simple_schema(df)
    return df, {"actions": actions, "schema": schema}
def sample_df(df: pd.DataFrame, n: int = 5) -> Dict[str, Any]:
    """Return the first *n*, last *n* and *n* random rows as record dicts.

    When the frame has at most *n* rows the "random" sample is a shuffle of
    every row. Both sampling branches use a fixed seed so that repeated runs
    on the same data yield the same sample.
    """
    head = df.head(n).to_dict(orient="records")
    tail = df.tail(n).to_dict(orient="records")
    if len(df) <= n:
        # Seeded like the branch below; an unseeded shuffle made small inputs
        # non-reproducible.
        rnd = df.sample(frac=1.0, random_state=42).to_dict(orient="records")
    else:
        rnd = df.sample(n=n, random_state=42).to_dict(orient="records")
    return {"head": head, "tail": tail, "random": rnd}
| # --------------------------- | |
| # Gemini / genai interactions | |
| # --------------------------- | |
def gemini_generate_json(model: str, system_instruction: str, user_content: str, require_json: bool = True) -> Any:
    """Stream a Gemini response and return it as parsed JSON or raw text.

    The request is sent through ``client.models.generate_content_stream`` with
    the given system prompt, a single user message, a thinking budget of 0 and
    a JSON response MIME type. The streamed text pieces are concatenated and
    stripped.

    Returns:
        With ``require_json`` true: the parsed JSON, or
        ``{"__raw_text__": <text>}`` when parsing fails. Otherwise the text.

    Raises:
        EnvironmentError: If ``GEMINI_API_KEY`` is not set in the environment.
    """
    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        raise EnvironmentError("GEMINI_API_KEY not set in environment.")
    client = genai.Client(api_key=api_key)

    user_turn = types.Content(
        role="user",
        parts=[types.Part.from_text(text=user_content)],
    )
    config = types.GenerateContentConfig(
        thinking_config=types.ThinkingConfig(thinking_budget=0),
        response_mime_type="application/json",
        system_instruction=[types.Part.from_text(text=system_instruction)],
    )

    pieces = []
    stream = client.models.generate_content_stream(model=model, contents=[user_turn], config=config)
    for chunk in stream:
        # Prefer the chunk-level text; otherwise fall back to the first part
        # of the first candidate when it carries text.
        if hasattr(chunk, "text") and chunk.text:
            pieces.append(chunk.text)
            continue
        if (
            chunk.candidates
            and chunk.candidates[0].content
            and chunk.candidates[0].content.parts
            and chunk.candidates[0].content.parts[0].text
        ):
            pieces.append(chunk.candidates[0].content.parts[0].text)

    full_text = "".join(pieces).strip()
    if not require_json:
        return full_text
    try:
        return json.loads(full_text)
    except Exception:
        # Hand the raw text back so the caller can inspect what went wrong.
        return {"__raw_text__": full_text}
| # --------------------------- | |
| # Task execution (local, deterministic) | |
| # --------------------------- | |
def generate_chart_data_from_spec(df: pd.DataFrame, spec: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]:
    """Build chart rows for a task spec without running any model code.

    Args:
        df: Source data.
        spec: Task dict with ``chart_type``, ``target_columns`` (list or None)
            and ``aggregation`` ("count", "sum", "mean" or None).

    Returns:
        ``(chart_type, rows)``. ``rows`` is empty when the chart type is
        unknown or when a required column is missing from *df*; specs may come
        from a model, so unknown columns are tolerated rather than raised.
        An unknown/missing chart type is echoed back (``"unknown"`` if falsy).
    """
    chart_type = spec.get("chart_type")
    cols = list(spec.get("target_columns") or [])  # tolerate an explicit null
    agg = spec.get("aggregation", None)
    df_local = df.copy()

    def _known(*names: Any) -> bool:
        """True when every name is an existing column of the frame."""
        try:
            return all(n is not None and n in df_local.columns for n in names)
        except TypeError:  # unhashable value supplied as a column name
            return False

    if chart_type == "pie":
        # target_columns: [category_col] or [category_col, value_col]
        if not cols:
            cat = _pick_categorical(df_local)
            cols = [cat] if cat else []
        if not cols or not _known(*cols[:2]):
            return "pie", []
        if len(cols) == 1:
            counts = df_local[cols[0]].astype(str).value_counts(dropna=True)
            return "pie", [{"name": k, "value": int(v)} for k, v in counts.items()]
        cat, val = cols[0], cols[1]
        grouped = df_local.groupby(cat)[val].sum().reset_index()
        out = [{"name": r[cat], "value": to_json_serializable(r[val])} for r in grouped.to_dict(orient="records")]
        return "pie", out

    if chart_type == "bar":
        # target_columns: [label_col, metric_col] or [label_col] with count
        if not cols:
            label = _pick_categorical(df_local)
            cols = [label] if label else []
        if not cols or not _known(*cols[:2]):
            return "bar", []
        if len(cols) == 1:
            counts = df_local[cols[0]].astype(str).value_counts()
            return "bar", [{"label": k, "count": int(v)} for k, v in counts.items()]
        label, metric = cols[0], cols[1]
        per_label = df_local.groupby(label)[metric]
        if agg == "mean":
            reduced = per_label.mean()
        elif agg == "count":
            reduced = per_label.count()
        else:  # None, "", "sum" and anything unrecognised
            reduced = per_label.sum()
        grouped = reduced.reset_index()
        out = [{"label": r[label], metric: to_json_serializable(r[metric])} for r in grouped.to_dict(orient="records")]
        return "bar", out

    if chart_type == "line":
        # target_columns: [x_col, y_col]
        if len(cols) < 2:
            # first numeric column as y, first date-like column (if any) as x
            y = _pick_numeric(df_local)
            x = _pick_datetime_or_index(df_local)
            cols = [x, y]
        xcol, ycol = cols[0], cols[1]
        if not _known(ycol):
            return "line", []
        if not _known(xcol):
            # No usable x column: use row position, aligned to the frame's
            # own index so the two columns line up.
            s_x = pd.Series(range(len(df_local)), index=df_local.index)
        elif pd.api.types.is_numeric_dtype(df_local[xcol]):
            # Keep numeric x values as numbers; datetime parsing would read
            # them as epoch offsets.
            s_x = df_local[xcol]
        else:
            s_x = ensure_datetime_series(df_local[xcol])
        series = pd.DataFrame({"x": s_x, "y": df_local[ycol]})
        try:
            series = series.sort_values("x").reset_index(drop=True)
        except Exception:
            pass  # unsortable x values: keep the original row order

        def _x_value(v: Any) -> Any:
            if hasattr(v, "isoformat") and not pd.isna(v):
                return to_json_serializable(v.isoformat())
            return to_json_serializable(v)

        out = [{"x": _x_value(r["x"]), "y": to_json_serializable(r["y"])} for r in series.to_dict(orient="records")]
        return "line", out

    if chart_type == "scatter":
        # target_columns: [x_col, y_col]
        if len(cols) < 2:
            x = _pick_numeric(df_local)
            y = _pick_numeric(df_local, exclude=[x]) if x else None
            cols = [x, y]
        xcol, ycol = cols[0], cols[1]
        if not _known(xcol, ycol):
            return "scatter", []
        out = [
            {"x": to_json_serializable(a), "y": to_json_serializable(b)}
            for a, b in zip(df_local[xcol], df_local[ycol])
        ]
        return "scatter", out

    if chart_type == "histogram":
        # target_columns: [numeric_col]
        col = cols[0] if cols else _pick_numeric(df_local)
        if not _known(col):
            return "histogram", []
        values = pd.to_numeric(df_local[col], errors="coerce").dropna()
        values = values[np.isfinite(values)]  # np.histogram rejects infinite ranges
        if values.empty:
            return "histogram", []
        counts, bin_edges = np.histogram(values, bins=10)
        out = [
            {"bin": f"{float(lo):.6g}-{float(hi):.6g}", "count": int(cnt)}
            for lo, hi, cnt in zip(bin_edges[:-1], bin_edges[1:], counts)
        ]
        return "histogram", out

    if chart_type == "boxplot":
        # target_columns: [category_col, numeric_col] or [numeric_col] (global box)
        if not cols:
            cat, num = None, _pick_numeric(df_local)
        elif len(cols) == 1:
            cat, num = None, cols[0]
        else:
            cat, num = cols[0], cols[1]
        if not _known(num) or (cat is not None and not _known(cat)):
            return "boxplot", []
        return "boxplot", _box_aggregate(df_local, cat, num)

    # fallback: return empty
    return chart_type or "unknown", []
| def _pick_categorical(df: pd.DataFrame) -> Optional[str]: | |
| for c in df.columns: | |
| if df[c].dtype == object or df[c].nunique() < max(50, 0.5 * len(df)): | |
| return c | |
| return None | |
| def _pick_numeric(df: pd.DataFrame, exclude: List[str] = []) -> Optional[str]: | |
| for c in df.select_dtypes(include=[np.number]).columns: | |
| if c not in exclude: | |
| return c | |
| # try coercion | |
| for c in df.columns: | |
| try: | |
| coerced = pd.to_numeric(df[c], errors="coerce") | |
| if coerced.notna().sum() > 0: | |
| return c | |
| except Exception: | |
| continue | |
| return None | |
| def _pick_datetime_or_index(df: pd.DataFrame) -> Optional[str]: | |
| for c in df.columns: | |
| if np.issubdtype(df[c].dtype, np.datetime64): | |
| return c | |
| # try to parse string columns | |
| for c in df.columns: | |
| try: | |
| parsed = pd.to_datetime(df[c], errors="coerce") | |
| if parsed.notna().sum() > 0: | |
| return c | |
| except Exception: | |
| continue | |
| return None | |
| def _box_aggregate(df: pd.DataFrame, cat_col: Optional[str], num_col: str) -> List[Dict[str, Any]]: | |
| out = [] | |
| if cat_col is None: | |
| series = df[num_col].dropna() | |
| q1 = float(series.quantile(0.25)) | |
| median = float(series.quantile(0.5)) | |
| q3 = float(series.quantile(0.75)) | |
| out.append({"category": None, "q1": q1, "median": median, "q3": q3}) | |
| return out | |
| for name, group in df.groupby(cat_col): | |
| ser = group[num_col].dropna() | |
| if len(ser) == 0: | |
| continue | |
| q1 = float(ser.quantile(0.25)) | |
| median = float(ser.quantile(0.5)) | |
| q3 = float(ser.quantile(0.75)) | |
| out.append({"category": to_json_serializable(name), "q1": q1, "median": median, "q3": q3}) | |
| return out | |
| # --------------------------- | |
| # Main pipeline controller | |
| # --------------------------- | |
def ensure_six_tasks(tasks: List[Dict[str, Any]], df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Pad *tasks* with deterministic fallback specs until there are six.

    Chart types are considered in the order pie, bar, line, scatter,
    histogram, boxplot; a type is added only when no incoming task already
    uses it. The input list is not modified; a new list is returned.
    """
    present_types = [entry.get("chart_type") for entry in tasks]
    padded = list(tasks)

    for kind in ("pie", "bar", "line", "scatter", "histogram", "boxplot"):
        if len(padded) >= 6:
            break
        if kind in present_types:
            continue

        aggregation = None
        if kind == "pie":
            columns = [_pick_categorical(df) or df.columns[0]]
            aggregation = "count"
            why = "fallback pie"
        elif kind == "bar":
            label_col = _pick_categorical(df) or df.columns[0]
            value_col = _pick_numeric(df) or df.columns[0]
            columns = [label_col, value_col]
            aggregation = "sum"
            why = "fallback bar"
        elif kind == "line":
            y_col = _pick_numeric(df) or df.columns[0]
            x_col = _pick_datetime_or_index(df) or df.index.name or "index"
            columns = [x_col, y_col]
            why = "fallback line on index" if x_col == "index" else "fallback line"
        elif kind == "scatter":
            x_col = _pick_numeric(df)
            # Reuse x when there is no second numeric column.
            y_col = _pick_numeric(df, exclude=[x_col]) or x_col
            columns = [x_col, y_col]
            why = "fallback scatter"
        elif kind == "histogram":
            columns = [_pick_numeric(df) or df.columns[0]]
            why = "fallback histogram"
        else:  # boxplot
            value_col = _pick_numeric(df) or df.columns[0]
            group_col = _pick_categorical(df)
            if group_col:
                columns = [group_col, value_col]
                why = "fallback boxplot by category"
            else:
                columns = [value_col]
                why = "fallback boxplot global"

        padded.append(
            {
                "chart_type": kind,
                "target_columns": columns,
                "aggregation": aggregation,
                "reasoning": why,
            }
        )
    return padded
def _normalize_chart_result(result: Any) -> Optional[List[Dict[str, Any]]]:
    """Turn a snippet's result into a list of JSON-safe dicts, or None if unusable."""
    if isinstance(result, pd.DataFrame):
        rows = result.to_dict(orient="records")
    elif isinstance(result, list):
        # Bare values are allowed and wrapped under a "value" key.
        rows = [r if isinstance(r, dict) else {"value": r} for r in result]
    elif isinstance(result, dict):
        rows = [result]
    else:
        # primitive or None -> invalid for a chart payload
        return None
    return [{k: to_json_serializable(v) for k, v in row.items()} for row in rows]


def _run_task_code(code_snippet: str, df: pd.DataFrame) -> Any:
    """Execute a model snippet with restricted builtins and return its `result`.

    SECURITY NOTE: this runs model-generated code with exec(). The reduced
    builtins and the regex denylist are a mitigation, not a sandbox: `pd` and
    `np` themselves expose file and network capable functions. Do not treat
    this as safe for hostile input without real process isolation.
    """
    namespace: Dict[str, Any] = {
        "__builtins__": {
            "None": None,
            "True": True,
            "False": False,
            "len": len,
            "min": min,
            "max": max,
            "sum": sum,
            "sorted": sorted,
            "round": round,
        },
        "pd": pd,
        "np": np,
        "df": df.copy(),
    }
    # One dict serves as both globals and locals. With two separate dicts the
    # code runs as if inside a class body, so names assigned by the snippet are
    # invisible to its own lambdas, comprehensions and nested functions.
    exec(code_snippet, namespace)
    if "result" in namespace:
        return namespace["result"]
    if "output" in namespace:
        return namespace["output"]
    # No explicit result: evaluate single-expression snippets directly.
    single_expr = (
        ("\n" not in code_snippet)
        and ("=" not in code_snippet)
        and ("return" not in code_snippet)
        and (not code_snippet.strip().startswith("def "))
    )
    if single_expr:
        try:
            return eval(code_snippet, namespace)
        except Exception:
            return None
    return None


def process_file(path: str, sheet: Optional[str] = None, model: str = "gemini-2.5-flash-lite") -> Dict[str, Any]:
    """Run the full pipeline on one file and return the chart payload.

    Stages: ingest -> preprocess -> sample -> classification (model) ->
    planning (model) -> code validation -> execution with a deterministic
    fallback per task.

    Args:
        path: CSV/text or Excel file to analyse.
        sheet: Optional Excel sheet name.
        model: Gemini model id used for the classification and planning calls.

    Returns:
        ``{"metadata": {...}, "charts": {chart_type: [rows, ...]}}`` where each
        chart list is capped at 200 rows and ``metadata["execution_errors"]``
        lists every dropped snippet or failed task.
    """
    # ingest
    df, meta = ingest_file(path, sheet)
    pre_df, preprocess_meta = preprocess_df(df)
    samples = sample_df(pre_df, n=5)
    schema = simple_schema(pre_df)

    # prepare payload for classification agent
    # default=str keeps timestamps and other non-JSON values in the samples
    # from aborting the run before the model is even called.
    classification_input = json.dumps({"samples": samples, "schema": schema, "meta": meta}, default=str)
    classification_output = gemini_generate_json(
        model=model,
        system_instruction=PROMPTS["classification"],
        user_content=classification_input,
        require_json=True,
    )
    # If classification_output is raw or malformed, fallback to naive classification
    if not isinstance(classification_output, dict) or "tasks" not in classification_output:
        classification_output = {
            "domain": "unknown",
            "tasks": [
                {"chart_type": "pie", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0]], "aggregation": "count", "reasoning": "fallback"},
                {"chart_type": "bar", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0], _pick_numeric(pre_df) or pre_df.columns[0]], "aggregation": "sum", "reasoning": "fallback"},
            ],
        }

    # planning agent: ask for code snippets & structured tasks
    planning_input = json.dumps({"classification": classification_output, "schema": schema, "samples": samples}, default=str)
    planning_output = gemini_generate_json(
        model=model,
        system_instruction=PROMPTS["planning"],
        user_content=planning_input,
        require_json=True,
    )

    # planning_output expected form: {"tasks": [ {chart_type,..., code: "..."} ]}
    if isinstance(planning_output, dict) and "tasks" in planning_output:
        raw_tasks = planning_output["tasks"]
    else:
        # If model didn't produce tasks array, use classification tasks
        raw_tasks = classification_output.get("tasks", [])
    if not isinstance(raw_tasks, list):
        raw_tasks = []
    # Model output is untrusted: ignore entries that are not task objects.
    tasks = [t for t in raw_tasks if isinstance(t, dict)]

    # Execution errors list (populated during validation/execution)
    execution_errors: List[Dict[str, Any]] = []

    # Validate model-provided code before execution:
    # - require 'result' assignment inside code
    # - drop code that contains plotting tokens or disallowed patterns
    plotting_disallowed_re = re.compile(r"(matplotlib|seaborn|plotly|altair|bokeh|\.plot\s*\(|\.hist\s*\(|\.boxplot\s*\(|plt\b|pyplot\b)", flags=re.I)
    for i, t in enumerate(tasks):
        code = t.get("code", "") or ""
        if not code:
            continue
        drop_reason = None
        if not isinstance(code, str):
            drop_reason = "code is not a string - code dropped"
        elif "result" not in code:
            drop_reason = "missing 'result' assignment - code dropped"
        elif plotting_disallowed_re.search(code):
            drop_reason = "plotting functions not allowed - code dropped"
        else:
            safe, reason = code_is_safe(code)
            if not safe:
                drop_reason = f"disallowed pattern - code dropped: {reason}"
        if drop_reason is not None:
            t.pop("code", None)
            execution_errors.append({"task_index": i, "task_id": t.get("task_id"), "reason": drop_reason})

    # Ensure at least 6 tasks
    tasks = ensure_six_tasks(tasks, pre_df)

    # Execute tasks
    final: Dict[str, List[Dict[str, Any]]] = {"pie": [], "bar": [], "line": [], "scatter": [], "histogram": [], "boxplot": []}
    for idx, task in enumerate(tasks):
        chart_type = task.get("chart_type")
        code_snippet = task.get("code")  # optional
        executed = False

        if code_snippet:
            safe, reason = code_is_safe(code_snippet)
            if not safe:
                logger.warning("Rejected unsafe code snippet at execution time: %s", reason)
            else:
                try:
                    rows = _normalize_chart_result(_run_task_code(code_snippet, pre_df))
                    if rows is None:
                        execution_errors.append({"task_index": idx, "reason": "result not list-of-dicts or missing after exec", "code": code_snippet})
                    else:
                        final.setdefault(chart_type, []).extend(rows)
                        executed = True
                except Exception as e:
                    logger.exception("Model code execution failed for task %s: %s", idx, str(e))
                    execution_errors.append({"task_index": idx, "reason": "exception during exec/eval", "exception": str(e), "code": code_snippet})

        if not executed:
            # Deterministic fallback. A single bad spec (e.g. a column the
            # model invented) must not abort the remaining tasks.
            try:
                ct, res = generate_chart_data_from_spec(pre_df, task)
                final.setdefault(ct, []).extend(res)
            except Exception as e:
                logger.exception("Deterministic fallback failed for task %s: %s", idx, str(e))
                execution_errors.append({"task_index": idx, "reason": "deterministic fallback failed", "exception": str(e)})

    # Ensure lists are trimmed reasonably
    for k in final:
        if isinstance(final[k], list):
            final[k] = final[k][:200]

    output_payload = {
        "metadata": {
            "source_file": os.path.basename(path),
            "ingestion_meta": meta,
            "preprocess_meta": preprocess_meta,
            "classification": classification_output if isinstance(classification_output, dict) else {"raw": classification_output},
            "planning_meta": planning_output if isinstance(planning_output, dict) else {"raw": planning_output},
            "execution_errors": execution_errors,
        },
        "charts": final,
    }
    return output_payload
| # --------------------------- | |
| # CLI | |
| # --------------------------- | |
def main():
    """Command-line entry point: analyse one file and print the JSON payload."""
    cli = argparse.ArgumentParser(
        description="Automated analysis pipeline that outputs frontend-ready chart JSON."
    )
    cli.add_argument("path", type=str, help="Path to CSV or Excel file")
    cli.add_argument("--sheet", type=str, default=None, help="Sheet name if Excel")
    cli.add_argument("--model", type=str, default="gemini-2.5-flash-lite", help="Gemini model id")
    options = cli.parse_args()

    payload = process_file(options.path, sheet=options.sheet, model=options.model)
    # Print final JSON to stdout; values json cannot encode natively go
    # through to_json_serializable.
    rendered = json.dumps(payload, indent=2, default=to_json_serializable)
    print(rendered)


if __name__ == "__main__":
    main()