# app/pipeline.py
import os
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import numpy as np

logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)

# ---------------------------
# Safety / helpers
# ---------------------------

# Regex denylist applied to model-generated code before it is exec'd.
# NOTE(review): a denylist is best-effort only — it cannot be a real
# sandbox; stronger isolation needs a separate process/container.
DISALLOWED_PATTERNS = [
    r"__\w+__",
    r"\bimport\s+os\b",
    r"\bimport\s+sys\b",
    r"\bimport\s+subprocess\b",
    r"\bimport\s+socket\b",
    r"\bimport\s+requests\b",
    r"\bopen\s*\(",
    r"\beval\s*\(",
    r"\bexec\s*\(",
    r"\bcompile\s*\(",
    r"\bsystem\s*\(",
    r"\bPopen\b",
    r"\bsh\b",
]


def code_is_safe(code: str) -> Tuple[bool, Optional[str]]:
    """Return (True, None) if *code* matches no disallowed pattern,
    otherwise (False, human-readable reason)."""
    for pat in DISALLOWED_PATTERNS:
        if re.search(pat, code):
            return False, f"disallowed pattern: {pat}"
    return True, None


def to_json_serializable(obj):
    """Best-effort conversion of numpy scalars/arrays and NaN-like values
    to JSON-native Python types; other objects pass through unchanged.

    Fix: pd.isna() on list/tuple input returns an array whose truth value
    is ambiguous, so the original raised ValueError; the NaN check is now
    guarded and only short-circuits for genuine scalar NA values.
    """
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    try:
        if pd.isna(obj):
            return None
    except (TypeError, ValueError):
        # Array-like input: pd.isna returned an array — treat as not-NA.
        pass
    return obj


def simple_schema(df: pd.DataFrame) -> Dict[str, Any]:
    """Compact schema summary: per-column name/dtype/unique count plus row count."""
    return {
        "columns": [
            {"name": c, "dtype": str(df[c].dtype), "n_unique": int(df[c].nunique(dropna=True))}
            for c in df.columns
        ],
        "n_rows": int(len(df)),
    }


# ---------------------------
# Ingest / preprocess / sampling
# ---------------------------


def ingest_file(path: str, sheet: Optional[str] = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Load a CSV/TXT or Excel file into a DataFrame.

    Returns (df, metadata) where metadata records the file type, the
    available sheet names (Excel only) and the sheet actually read.
    Raises ValueError for unsupported extensions.
    """
    ext = os.path.splitext(path)[1].lower()
    metadata: Dict[str, Any] = {"file_type": ext, "sheet_names": None, "selected_sheet": None}
    if ext in (".csv", ".txt"):
        df = pd.read_csv(path)
        metadata["selected_sheet"] = "csv"
    elif ext in (".xls", ".xlsx"):
        xls = pd.ExcelFile(path)
        sheets = xls.sheet_names
        metadata["sheet_names"] = sheets
        # Fall back to the first sheet when the requested one is absent.
        chosen = sheet if sheet and sheet in sheets else sheets[0]
        metadata["selected_sheet"] = chosen
        df = pd.read_excel(xls, sheet_name=chosen)
    else:
        raise ValueError("Unsupported file type: " + ext)
    metadata["file_type"] = ext
    return df, metadata


def preprocess_df(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Light cleaning pass: strip whitespace on object columns, coerce
    numeric-looking object columns, and impute nulls (median for numeric,
    mode for object).

    Returns (cleaned_df, {"actions": [...], "schema": {...}}).
    """
    actions: List[str] = []
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]
    object_cols = df.select_dtypes(include="object").columns.tolist()
    for c in object_cols:
        try:
            # Strip only non-null cells; NaN is preserved for imputation below.
            df[c] = df[c].where(df[c].isna(), df[c].astype(str).str.strip())
        except Exception:
            # Best-effort: leave the column untouched if stripping fails.
            pass
    for c in df.columns:
        if df[c].dtype == object:
            coerced = pd.to_numeric(df[c], errors="coerce")
            non_na = coerced.notna().sum()
            # Coerce only when at least half the rows parse as numbers.
            if non_na >= max(1, 0.5 * len(df)):
                df[c] = coerced
                actions.append(f"coerced {c} -> numeric")
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    for c in num_cols:
        median = df[c].median()
        if pd.isna(median):
            median = 0
        df[c] = df[c].fillna(median)
        actions.append(f"filled numeric {c} nulls with median {median}")
    for c in object_cols:
        # Columns coerced to numeric above have no nulls left, so this
        # fillna is a no-op for them.
        try:
            mode = df[c].mode().iloc[0] if not df[c].mode().empty else ""
        except Exception:
            mode = ""
        df[c] = df[c].fillna(mode)
        actions.append(f"filled object {c} nulls with mode '{mode}'")
    schema = simple_schema(df)
    return df, {"actions": actions, "schema": schema}


def sample_df(df: pd.DataFrame, n: int = 5) -> Dict[str, Any]:
    """Return head/tail/random record samples (for LLM prompting).

    Fix: the small-frame branch used an unseeded shuffle; it is now seeded
    with the same random_state so sampling is deterministic in all cases.
    """
    head = df.head(n).to_dict(orient="records")
    tail = df.tail(n).to_dict(orient="records")
    if len(df) > n:
        rnd = df.sample(n=n, random_state=42).to_dict(orient="records")
    else:
        rnd = df.sample(frac=1.0, random_state=42).to_dict(orient="records")
    return {"head": head, "tail": tail, "random": rnd}


# ---------------------------
# Deterministic chart generator (fallback)
# ---------------------------


def _pick_categorical(df: pd.DataFrame) -> Optional[str]:
    """Pick the first column usable as a category axis, or None.

    NOTE(review): the cardinality test `nunique() < max(50, 0.5 * len(df))`
    accepts most columns on large frames; kept as-is to preserve existing
    column choices.
    """
    for c in df.columns:
        if df[c].dtype == object or df[c].nunique() < max(50, 0.5 * len(df)):
            return c
    return None


def _pick_numeric(df: pd.DataFrame, exclude: Optional[List[str]] = None) -> Optional[str]:
    """Pick the first numeric column not in *exclude*, else the first
    column that is at least partially numeric-coercible, else None.

    Fix: the original declared a mutable default argument (`exclude=[]`),
    a shared-state hazard; None is now the default and normalized locally.
    """
    excluded = exclude if exclude is not None else []
    for c in df.select_dtypes(include=[np.number]).columns:
        if c not in excluded:
            return c
    for c in df.columns:
        try:
            coerced = pd.to_numeric(df[c], errors="coerce")
            if coerced.notna().sum() > 0:
                return c
        except Exception:
            continue
    return None


def _pick_datetime_or_index(df: pd.DataFrame) -> Optional[str]:
    """Pick the first datetime column, else the first column with any
    date-parseable values, else None."""
    for c in df.columns:
        if np.issubdtype(df[c].dtype, np.datetime64):
            return c
    for c in df.columns:
        try:
            parsed = pd.to_datetime(df[c], errors="coerce")
            if parsed.notna().sum() > 0:
                return c
        except Exception:
            continue
    return None


def _box_aggregate(df: pd.DataFrame, cat_col: Optional[str], num_col: str) -> List[Dict[str, Any]]:
    """Compute quartile summaries of *num_col*, globally (cat_col None)
    or per category; groups/series with no data are skipped."""
    out: List[Dict[str, Any]] = []
    if cat_col is None:
        series = df[num_col].dropna()
        if len(series) == 0:
            return out
        q1 = float(series.quantile(0.25))
        median = float(series.quantile(0.5))
        q3 = float(series.quantile(0.75))
        out.append({"category": None, "q1": q1, "median": median, "q3": q3})
        return out
    for name, group in df.groupby(cat_col):
        ser = group[num_col].dropna()
        if len(ser) == 0:
            continue
        q1 = float(ser.quantile(0.25))
        median = float(ser.quantile(0.5))
        q3 = float(ser.quantile(0.75))
        out.append({"category": to_json_serializable(name), "q1": q1, "median": median, "q3": q3})
    return out


def generate_chart_data_from_spec(df: pd.DataFrame, spec: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]:
    """Deterministically build a chart payload from a task spec.

    spec keys: chart_type, target_columns (list), aggregation (optional).
    Missing columns are auto-picked from the frame.

    Fix: the original indexed cols[0]/cols[1] after fallback pickers that
    can return None, crashing with IndexError/KeyError on frames with no
    usable columns; every branch now degrades to an empty payload instead.
    """
    chart_type = spec.get("chart_type")
    cols = spec.get("target_columns", [])
    agg = spec.get("aggregation", None)
    df_local = df.copy()

    if chart_type == "pie":
        if len(cols) == 0:
            col = _pick_categorical(df_local)
            cols = [col] if col else []
        if len(cols) == 0:
            return "pie", []
        if len(cols) == 1:
            col = cols[0]
            series = df_local[col].astype(str).value_counts(dropna=True)
            return "pie", [{"name": k, "value": int(v)} for k, v in series.items()]
        cat, val = cols[0], cols[1]
        grouped = df_local.groupby(cat)[val].sum().reset_index()
        return "pie", [{"name": r[cat], "value": to_json_serializable(r[val])} for r in grouped.to_dict(orient="records")]

    if chart_type == "bar":
        if len(cols) == 0:
            label = _pick_categorical(df_local)
            cols = [label] if label else []
        if len(cols) == 0:
            return "bar", []
        if len(cols) == 1:
            label = cols[0]
            series = df_local[label].astype(str).value_counts().reset_index()
            series.columns = [label, "count"]
            return "bar", [{"label": r[label], "count": int(r["count"])} for r in series.to_dict(orient="records")]
        label, metric = cols[0], cols[1]
        if agg == "mean":
            grouped = df_local.groupby(label)[metric].mean().reset_index()
        else:
            # sum for None/""/"sum" and for any unrecognized aggregation.
            grouped = df_local.groupby(label)[metric].sum().reset_index()
        return "bar", [{"label": r[label], metric: to_json_serializable(r[metric])} for r in grouped.to_dict(orient="records")]

    if chart_type == "line":
        if len(cols) < 2:
            y = _pick_numeric(df_local)
            x = _pick_datetime_or_index(df_local)
            cols = [x, y]
        xcol, ycol = cols[0], cols[1]
        if ycol is None or ycol not in df_local.columns:
            return "line", []
        # Unknown x column => synthetic positional index on the x axis.
        s_x = pd.to_datetime(df_local[xcol], errors="coerce") if xcol in df_local.columns else pd.Series(range(len(df_local)))
        series = pd.DataFrame({"x": s_x, "y": df_local[ycol]})
        try:
            series = series.sort_values("x").reset_index(drop=True)
        except Exception:
            pass
        out = []
        for r in series.to_dict(orient="records"):
            x_val = r["x"].isoformat() if hasattr(r["x"], "isoformat") else r["x"]
            out.append({"x": to_json_serializable(x_val), "y": to_json_serializable(r["y"])})
        return "line", out

    if chart_type == "scatter":
        if len(cols) < 2:
            x = _pick_numeric(df_local)
            y = _pick_numeric(df_local, exclude=[x]) if x else None
            cols = [x, y]
        xcol, ycol = cols[0], cols[1]
        if xcol is None or ycol is None:
            return "scatter", []
        return "scatter", [{"x": to_json_serializable(r[xcol]), "y": to_json_serializable(r[ycol])} for r in df_local[[xcol, ycol]].to_dict(orient="records")]

    if chart_type == "histogram":
        col = cols[0] if cols else _pick_numeric(df_local)
        if col is None:
            return "histogram", []
        series = df_local[col].dropna()
        counts, bin_edges = np.histogram(series, bins=10)
        out = []
        for i in range(len(counts)):
            out.append({"bin": f"{float(bin_edges[i]):.6g}-{float(bin_edges[i+1]):.6g}", "count": int(counts[i])})
        return "histogram", out

    if chart_type == "boxplot":
        if len(cols) == 0:
            num = _pick_numeric(df_local)
            if num is None:
                return "boxplot", []
            return "boxplot", _box_aggregate(df_local, None, num)
        if len(cols) == 1:
            return "boxplot", _box_aggregate(df_local, None, cols[0])
        return "boxplot", _box_aggregate(df_local, cols[0], cols[1])

    return chart_type or "unknown", []
# ---------------------------
# Gemini wrapper (optional)
# ---------------------------

# getLogger is idempotent: this is the same "pipeline" logger configured
# at the top of the module.
logger = logging.getLogger("pipeline")


def gemini_generate_json(model: str, system_instruction: str, user_content: str, require_json: bool = True) -> Any:
    """
    Attempts to call google-genai if GEMINI_API_KEY is provided.
    Falls back to returning a safe marker that indicates a deterministic
    fallback should be used.

    Fix (security): the API key was hard-coded in source — a leaked
    credential; it is now read from the GEMINI_API_KEY environment
    variable, as the existing check and log message always intended.
    """
    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        logger.info("GEMINI_API_KEY not set; using deterministic fallbacks.")
        return {"__use_fallback__": True}
    try:
        # import lazily so the dependency stays optional
        from google import genai
        from google.genai import types
    except Exception as e:
        logger.exception("google-genai not available: %s", e)
        return {"__use_fallback__": True}

    client = genai.Client(api_key=api_key)
    contents = [
        types.Content(
            role="user",
            parts=[types.Part.from_text(text=user_content)],
        )
    ]
    config = types.GenerateContentConfig(
        thinking_config=types.ThinkingConfig(thinking_budget=0),
        response_mime_type="application/json",
        system_instruction=[types.Part.from_text(text=system_instruction)],
    )
    full_text = ""
    try:
        for chunk in client.models.generate_content_stream(model=model, contents=contents, config=config):
            # Prefer the convenience .text; fall back to the first part.
            if hasattr(chunk, "text") and chunk.text:
                full_text += chunk.text
            elif (
                chunk.candidates
                and chunk.candidates[0].content
                and chunk.candidates[0].content.parts
                and chunk.candidates[0].content.parts[0].text
            ):
                full_text += chunk.candidates[0].content.parts[0].text
        full_text = full_text.strip()
        if require_json:
            try:
                return json.loads(full_text)
            except Exception:
                # Model returned non-JSON despite the mime-type hint.
                return {"__raw_text__": full_text}
        return full_text
    except Exception as e:
        logger.exception("genai call failed: %s", e)
        return {"__use_fallback__": True}


# ---------------------------
# Controller
# ---------------------------

DEFAULT_MODEL = "gemini-2.5-flash-lite"


def ensure_six_tasks(tasks: List[Dict[str, Any]], df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Pad *tasks* with deterministic fallback specs until all six chart
    types are represented (capped at six tasks total)."""
    existing_types = [t.get("chart_type") for t in tasks]
    candidates = ["pie", "bar", "line", "scatter", "histogram", "boxplot"]
    out = tasks[:]
    for ct in candidates:
        if len(out) >= 6:
            break
        if ct in existing_types:
            continue
        if ct == "pie":
            col = _pick_categorical(df) or df.columns[0]
            out.append({"chart_type": "pie", "target_columns": [col], "aggregation": "count", "reasoning": "fallback pie"})
        elif ct == "bar":
            label = _pick_categorical(df) or df.columns[0]
            num = _pick_numeric(df) or df.columns[0]
            out.append({"chart_type": "bar", "target_columns": [label, num], "aggregation": "sum", "reasoning": "fallback bar"})
        elif ct == "line":
            y = _pick_numeric(df) or df.columns[0]
            x = _pick_datetime_or_index(df) or df.index.name or "index"
            if x == "index":
                out.append({"chart_type": "line", "target_columns": [x, y], "aggregation": None, "reasoning": "fallback line on index"})
            else:
                out.append({"chart_type": "line", "target_columns": [x, y], "aggregation": None, "reasoning": "fallback line"})
        elif ct == "scatter":
            # Fix: guard against frames with no numeric column at all; the
            # original could append [None, None] target columns here while
            # every sibling branch falls back to df.columns[0].
            x = _pick_numeric(df) or df.columns[0]
            y = _pick_numeric(df, exclude=[x]) or x
            out.append({"chart_type": "scatter", "target_columns": [x, y], "aggregation": None, "reasoning": "fallback scatter"})
        elif ct == "histogram":
            num = _pick_numeric(df) or df.columns[0]
            out.append({"chart_type": "histogram", "target_columns": [num], "aggregation": None, "reasoning": "fallback histogram"})
        elif ct == "boxplot":
            num = _pick_numeric(df) or df.columns[0]
            cat = _pick_categorical(df)
            if cat:
                out.append({"chart_type": "boxplot", "target_columns": [cat, num], "aggregation": None, "reasoning": "fallback boxplot by category"})
            else:
                out.append({"chart_type": "boxplot", "target_columns": [num], "aggregation": None, "reasoning": "fallback boxplot global"})
    return out


def _normalize_result(result: Any) -> Optional[List[Any]]:
    """Convert an exec() result (DataFrame / list / dict) into a list of
    JSON-serializable items, or None when the shape is unusable."""
    if isinstance(result, pd.DataFrame):
        return [{k: to_json_serializable(v) for k, v in r.items()} for r in result.to_dict(orient="records")]
    if isinstance(result, list):
        norm: List[Any] = []
        for r in result:
            if isinstance(r, dict):
                norm.append({k: to_json_serializable(v) for k, v in r.items()})
            else:
                norm.append(to_json_serializable(r))
        return norm
    if isinstance(result, dict):
        return [{k: to_json_serializable(v) for k, v in result.items()}]
    return None


def process_file(path: str, sheet: Optional[str] = None, model: str = DEFAULT_MODEL) -> Dict[str, Any]:
    """End-to-end pipeline: ingest -> preprocess -> classify (LLM or
    deterministic fallback) -> plan tasks -> execute model code or the
    deterministic chart generator -> assemble the chart payload.

    Returns {"metadata": {...}, "charts": {chart_type: [records...]}}.
    """
    df, meta = ingest_file(path, sheet)
    pre_df, preprocess_meta = preprocess_df(df)
    samples = sample_df(pre_df, n=5)

    classification_input = json.dumps({"samples": samples, "schema": simple_schema(pre_df), "meta": meta})
    classification_output = gemini_generate_json(
        model=model,
        system_instruction="You are a classification agent. Identify domain and chart tasks.",
        user_content=classification_input,
        require_json=True,
    )
    if not isinstance(classification_output, dict) or "tasks" not in classification_output:
        # Deterministic minimal task list when the model is unavailable or
        # returned an unexpected shape.
        classification_output = {
            "domain": "unknown",
            "tasks": [
                {"chart_type": "pie", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0]], "aggregation": "count", "reasoning": "fallback"},
                {"chart_type": "bar", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0], _pick_numeric(pre_df) or pre_df.columns[0]], "aggregation": "sum", "reasoning": "fallback"},
            ],
        }

    planning_input = json.dumps({"classification": classification_output, "schema": simple_schema(pre_df), "samples": samples})
    planning_output = gemini_generate_json(
        model=model,
        system_instruction="You are a planning agent. Produce tasks with code assigning `result` variable.",
        user_content=planning_input,
        require_json=True,
    )
    if isinstance(planning_output, dict) and "tasks" in planning_output:
        tasks = planning_output["tasks"]
    else:
        tasks = classification_output.get("tasks", [])
    tasks = ensure_six_tasks(tasks, pre_df)

    final: Dict[str, List[Dict[str, Any]]] = {"pie": [], "bar": [], "line": [], "scatter": [], "histogram": [], "boxplot": []}
    execution_errors: List[Dict[str, Any]] = []

    for idx, task in enumerate(tasks):
        chart_type = task.get("chart_type")
        code_snippet = task.get("code")
        executed = False
        if code_snippet:
            safe, reason = code_is_safe(code_snippet)
            if not safe:
                logger.warning("Rejected unsafe code snippet: %s", reason)
            else:
                # Restricted exec environment for model-generated code.
                # NOTE(review): exec behind a regex denylist is not a real
                # sandbox; run in a separate process for strong isolation.
                allowed_globals = {
                    "__builtins__": {"None": None, "True": True, "False": False, "len": len, "min": min, "max": max, "sum": sum, "sorted": sorted, "round": round},
                    "pd": pd,
                    "np": np,
                    "df": pre_df.copy(),
                }
                local_vars: Dict[str, Any] = {}
                try:
                    exec(code_snippet, allowed_globals, local_vars)
                    result = None
                    if "result" in local_vars:
                        result = local_vars["result"]
                    elif "output" in local_vars:
                        result = local_vars["output"]
                    else:
                        # A single bare expression: evaluate it directly.
                        single_expr = (
                            "\n" not in code_snippet
                            and "=" not in code_snippet
                            and "return" not in code_snippet
                            and not code_snippet.strip().startswith("def ")
                        )
                        if single_expr:
                            try:
                                result = eval(code_snippet, allowed_globals, {})
                            except Exception:
                                result = None
                    result_json = _normalize_result(result)
                    if result_json is not None:
                        normalized = [
                            item if isinstance(item, dict) else {"value": to_json_serializable(item)}
                            for item in result_json
                        ]
                        final.setdefault(chart_type, []).extend(normalized)
                        executed = True
                    if not executed:
                        execution_errors.append({"task_index": idx, "reason": "result not list-of-dicts or missing", "code": code_snippet})
                except Exception as e:
                    logger.exception("Model code execution failed for task %s: %s", idx, str(e))
                    execution_errors.append({"task_index": idx, "reason": "exception during exec/eval", "exception": str(e), "code": code_snippet})
        if not executed:
            # Deterministic fallback when no model code ran successfully.
            ct, res = generate_chart_data_from_spec(pre_df, task)
            final.setdefault(ct, []).extend(res)

    # Cap each chart's payload to keep the response small.
    for k in final:
        if isinstance(final[k], list):
            final[k] = final[k][:200]

    return {
        "metadata": {
            "source_file": os.path.basename(path),
            "ingestion_meta": meta,
            "preprocess_meta": preprocess_meta,
            "classification": classification_output if isinstance(classification_output, dict) else {"raw": classification_output},
            "planning_meta": planning_output if isinstance(planning_output, dict) else {"raw": planning_output},
            "execution_errors": execution_errors,
        },
        "charts": final,
    }