Spaces:
Sleeping
Sleeping
| # app/pipeline.py | |
| import os | |
| import json | |
| import logging | |
| import re | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import pandas as pd | |
| import numpy as np | |
logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)
# ---------------------------
# Safety / helpers
# ---------------------------
# Regex denylist applied to model-generated code before it is exec'd.
# NOTE: this is a best-effort filter, not a sandbox. Model code runs with
# `pd`, `np` and `df` in scope, so besides the obvious OS/process/network
# primitives, the pandas/numpy file and pickle I/O entry points are denied too.
DISALLOWED_PATTERNS = [
    r"__\w+__", r"\bimport\s+os\b", r"\bimport\s+sys\b", r"\bimport\s+subprocess\b",
    r"\bimport\s+socket\b", r"\bimport\s+requests\b", r"\bopen\s*\(", r"\beval\s*\(",
    r"\bexec\s*\(", r"\bcompile\s*\(", r"\bsystem\s*\(", r"\bPopen\b", r"\bsh\b",
    # pandas readers can read arbitrary local files/URLs; read_pickle also
    # executes arbitrary code while unpickling.
    r"\bread_(?:csv|table|excel|pickle|json|html|xml|sql\w*|parquet|feather|orc|hdf|sas|spss|stata|fwf|clipboard)\b",
    # pandas writers that persist data to a caller-chosen path.
    r"\.to_(?:csv|excel|pickle|parquet|feather|hdf|sql|stata|clipboard|orc|xml)\b",
    # pandas' internal I/O helpers (e.g. pd.io.common) open files directly.
    r"\.io\b",
    # numpy file I/O (np.load can unpickle), raw memory maps and ctypes access.
    r"\bnp\.(?:load|loadtxt|save\w*|genfromtxt|fromfile|memmap|ctypeslib|lib)\b",
    r"\.tofile\b",
]
def code_is_safe(code: str, patterns: Optional[List[str]] = None) -> Tuple[bool, Optional[str]]:
    """Screen model-generated code against a regex denylist.

    Args:
        code: Source text to check. Non-string values (possible when the value
            comes straight from model JSON) are rejected instead of raising.
        patterns: Regexes to test; defaults to the module's DISALLOWED_PATTERNS.

    Returns:
        ``(True, None)`` when no pattern matches, otherwise ``(False, reason)``
        where reason names the first matching pattern.
    """
    if not isinstance(code, str):
        return False, "code is not a string"
    for pat in DISALLOWED_PATTERNS if patterns is None else patterns:
        if re.search(pat, code):
            return False, f"disallowed pattern: {pat}"
    return True, None
def to_json_serializable(obj):
    """Convert a pandas/numpy value into something ``json.dumps`` accepts.

    - Missing scalars (None, NaN, NaT, pd.NA) become ``None`` -- numpy floats
      included, so NaN never leaks out as a non-standard JSON token.
    - numpy bool/int/float scalars become the matching Python type.
    - Datetime-like values become ISO-8601 strings.
    - numpy arrays, Series/Index, lists and tuples become lists, and dicts keep
      their keys; elements/values are converted recursively.
    - Anything else is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: to_json_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (np.ndarray, pd.Series, pd.Index)):
        obj = obj.tolist()
    if isinstance(obj, (list, tuple)):
        return [to_json_serializable(v) for v in obj]
    # Only ask pd.isna about scalars: for containers it returns an array whose
    # truth value is ambiguous and would raise ValueError.
    if pd.api.types.is_scalar(obj) and pd.isna(obj):
        return None
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.datetime64):
        return pd.Timestamp(obj).isoformat()
    # Timestamp/datetime/date/time -- same duck-typing the line chart uses.
    if hasattr(obj, "isoformat"):
        return obj.isoformat()
    return obj
def simple_schema(df: pd.DataFrame) -> Dict[str, Any]:
    """Summarise *df* as ``{"columns": [{name, dtype, n_unique}, ...], "n_rows": int}``.

    Columns are read by position so that duplicate column labels (which
    whitespace-stripping of headers can produce) each get their own entry;
    label-based ``df[name]`` would return a DataFrame there and ``.dtype`` would fail.
    ``n_unique`` ignores missing values.
    """
    columns = []
    for position, name in enumerate(df.columns):
        series = df.iloc[:, position]
        columns.append(
            {"name": name, "dtype": str(series.dtype), "n_unique": int(series.nunique(dropna=True))}
        )
    return {"columns": columns, "n_rows": int(len(df))}
# ---------------------------
# Ingest / preprocess / sampling
# ---------------------------
def ingest_file(path: str, sheet: Optional[str] = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Load a CSV/TXT or XLS/XLSX file into a DataFrame.

    Args:
        path: File path; its extension (case-insensitive) selects the reader.
        sheet: For Excel files, the sheet to load. When it is falsy or not in
            the workbook, the first sheet is used.

    Returns:
        ``(df, metadata)``; metadata holds ``file_type`` (lower-cased extension),
        ``sheet_names`` (Excel only, otherwise None) and ``selected_sheet``
        (``"csv"`` for delimited text).

    Raises:
        ValueError: if the extension is not supported.
    """
    ext = os.path.splitext(path)[1].lower()
    metadata: Dict[str, Any] = {"file_type": ext, "sheet_names": None, "selected_sheet": None}
    if ext in (".csv", ".txt"):
        df = pd.read_csv(path)
        metadata["selected_sheet"] = "csv"
    elif ext in (".xls", ".xlsx"):
        # The context manager releases the workbook's file handle; a bare
        # ExcelFile keeps it open until garbage collection.
        with pd.ExcelFile(path) as xls:
            sheets = xls.sheet_names
            metadata["sheet_names"] = sheets
            chosen = sheet if sheet and sheet in sheets else sheets[0]
            metadata["selected_sheet"] = chosen
            df = pd.read_excel(xls, sheet_name=chosen)
    else:
        raise ValueError("Unsupported file type: " + ext)
    return df, metadata
def preprocess_df(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Clean a raw DataFrame for charting; the input frame is not modified.

    Steps, in order:
      1. convert column labels to str and strip surrounding whitespace;
      2. strip whitespace from values of object columns (missing values kept);
      3. convert an object column to numeric when at least half of the rows
         (and at least one) parse as numbers -- unparseable cells become NaN;
      4. fill numeric nulls with the column median (0 when the median is NaN);
      5. fill nulls of the still-object columns with their mode ("" if none).

    Returns:
        ``(clean_df, {"actions": [...human-readable log...], "schema": simple_schema(clean_df)})``.
    """
    actions: List[str] = []
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]

    object_cols = df.select_dtypes(include="object").columns.tolist()
    for c in object_cols:
        try:
            df[c] = df[c].where(df[c].isna(), df[c].astype(str).str.strip())
        except Exception as exc:  # best effort: leave the column unstripped
            logger.debug("could not strip values of column %s: %s", c, exc)

    for c in object_cols:
        coerced = pd.to_numeric(df[c], errors="coerce")
        if coerced.notna().sum() >= max(1, 0.5 * len(df)):
            df[c] = coerced
            actions.append(f"coerced {c} -> numeric")

    for c in df.select_dtypes(include=[np.number]).columns.tolist():
        median = df[c].median()
        if pd.isna(median):
            median = 0
        df[c] = df[c].fillna(median)
        actions.append(f"filled numeric {c} nulls with median {median}")

    # Recompute the object columns: those coerced to numeric above were already
    # median-filled and must not also get a (misleading) mode-fill action.
    for c in df.select_dtypes(include="object").columns.tolist():
        try:
            modes = df[c].mode()
            mode = modes.iloc[0] if not modes.empty else ""
        except Exception:  # e.g. unhashable cell values; fall back to ""
            mode = ""
        df[c] = df[c].fillna(mode)
        actions.append(f"filled object {c} nulls with mode '{mode}'")

    return df, {"actions": actions, "schema": simple_schema(df)}
def sample_df(df: pd.DataFrame, n: int = 5) -> Dict[str, Any]:
    """Return the first, last and a random ``n`` rows of *df* as record dicts.

    Both sampling branches use a fixed seed, so the same frame always yields
    the same prompt. When *df* has ``n`` rows or fewer, the "random" entry
    holds every row in a shuffled order.
    """
    if len(df) > n:
        random_rows = df.sample(n=n, random_state=42)
    else:
        random_rows = df.sample(frac=1.0, random_state=42)
    return {
        "head": df.head(n).to_dict(orient="records"),
        "tail": df.tail(n).to_dict(orient="records"),
        "random": random_rows.to_dict(orient="records"),
    }
| # --------------------------- | |
| # Deterministic chart generator (fallback) | |
| # --------------------------- | |
| def _pick_categorical(df: pd.DataFrame) -> Optional[str]: | |
| for c in df.columns: | |
| if df[c].dtype == object or df[c].nunique() < max(50, 0.5 * len(df)): | |
| return c | |
| return None | |
| def _pick_numeric(df: pd.DataFrame, exclude: List[str] = []) -> Optional[str]: | |
| for c in df.select_dtypes(include=[np.number]).columns: | |
| if c not in exclude: | |
| return c | |
| for c in df.columns: | |
| try: | |
| coerced = pd.to_numeric(df[c], errors="coerce") | |
| if coerced.notna().sum() > 0: | |
| return c | |
| except Exception: | |
| continue | |
| return None | |
| def _pick_datetime_or_index(df: pd.DataFrame) -> Optional[str]: | |
| for c in df.columns: | |
| if np.issubdtype(df[c].dtype, np.datetime64): | |
| return c | |
| for c in df.columns: | |
| try: | |
| parsed = pd.to_datetime(df[c], errors="coerce") | |
| if parsed.notna().sum() > 0: | |
| return c | |
| except Exception: | |
| continue | |
| return None | |
| def _box_aggregate(df: pd.DataFrame, cat_col: Optional[str], num_col: str) -> List[Dict[str, Any]]: | |
| out = [] | |
| if cat_col is None: | |
| series = df[num_col].dropna() | |
| if len(series) == 0: | |
| return out | |
| q1 = float(series.quantile(0.25)) | |
| median = float(series.quantile(0.5)) | |
| q3 = float(series.quantile(0.75)) | |
| out.append({"category": None, "q1": q1, "median": median, "q3": q3}) | |
| return out | |
| for name, group in df.groupby(cat_col): | |
| ser = group[num_col].dropna() | |
| if len(ser) == 0: | |
| continue | |
| q1 = float(ser.quantile(0.25)) | |
| median = float(ser.quantile(0.5)) | |
| q3 = float(ser.quantile(0.75)) | |
| out.append({"category": to_json_serializable(name), "q1": q1, "median": median, "q3": q3}) | |
| return out | |
# Aggregation names accepted in chart specs, mapped to pandas GroupBy.agg names.
# Anything not listed (including None/"") falls back to "sum".
_CHART_AGGREGATIONS = {
    "sum": "sum",
    "total": "sum",
    "mean": "mean",
    "avg": "mean",
    "average": "mean",
    "median": "median",
    "min": "min",
    "max": "max",
    "count": "count",
}


def _spec_columns(raw: Any) -> List[Any]:
    """Normalise a spec's ``target_columns`` into a list (None -> [], "a" -> ["a"])."""
    if raw is None:
        return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    return [raw]


def _has_columns(df: pd.DataFrame, cols: List[Any]) -> bool:
    """Return True when every entry of *cols* is a column label present in *df*."""
    try:
        return all(c is not None and c in df.columns for c in cols)
    except TypeError:  # unhashable label, e.g. a nested list from model output
        return False


def _grouped(df: pd.DataFrame, label: Any, metric: Any, agg: Any) -> pd.Series:
    """Aggregate *metric* per *label* with the spec's aggregation (default sum)."""
    key = agg.strip().lower() if isinstance(agg, str) else None
    return df.groupby(label)[metric].agg(_CHART_AGGREGATIONS.get(key, "sum"))


def _pie_data(df: pd.DataFrame, cols: List[Any], agg: Any) -> List[Dict[str, Any]]:
    """Pie slices: value counts of one column, or an aggregated metric per category."""
    if not cols:
        picked = _pick_categorical(df)
        cols = [picked] if picked is not None else []
    if not cols or not _has_columns(df, cols[:2]):
        return []
    if len(cols) == 1:
        counts = df[cols[0]].astype(str).value_counts(dropna=True)
        return [{"name": k, "value": int(v)} for k, v in counts.items()]
    totals = _grouped(df, cols[0], cols[1], agg)
    return [{"name": to_json_serializable(k), "value": to_json_serializable(v)} for k, v in totals.items()]


def _bar_data(df: pd.DataFrame, cols: List[Any], agg: Any) -> List[Dict[str, Any]]:
    """Bars: value counts of one column, or an aggregated metric (keyed by its name) per label."""
    if not cols:
        picked = _pick_categorical(df)
        cols = [picked] if picked is not None else []
    if not cols or not _has_columns(df, cols[:2]):
        return []
    if len(cols) == 1:
        counts = df[cols[0]].astype(str).value_counts()
        return [{"label": k, "count": int(v)} for k, v in counts.items()]
    label, metric = cols[0], cols[1]
    totals = _grouped(df, label, metric, agg)
    return [{"label": to_json_serializable(k), metric: to_json_serializable(v)} for k, v in totals.items()]


def _line_data(df: pd.DataFrame, cols: List[Any]) -> List[Dict[str, Any]]:
    """Line points sorted by x; x is a parsed date, a numeric column, or the row position."""
    if len(cols) < 2:
        cols = [_pick_datetime_or_index(df), _pick_numeric(df)]
    xcol, ycol = cols[0], cols[1]
    if not _has_columns(df, [ycol]):
        return []
    if not _has_columns(df, [xcol]):
        # e.g. the "index" placeholder: plot against row position, aligned to
        # df's own index so the y values line up.
        x = pd.Series(range(len(df)), index=df.index)
    elif pd.api.types.is_numeric_dtype(df[xcol]):
        # Years, sequence numbers, ... are plotted as-is; pd.to_datetime would
        # turn them into epoch-nanosecond timestamps near 1970.
        x = df[xcol]
    else:
        x = pd.to_datetime(df[xcol], errors="coerce")
    points = pd.DataFrame({"x": x, "y": df[ycol]})
    try:
        points = points.sort_values("x").reset_index(drop=True)
    except TypeError:  # unorderable values: keep the original row order
        pass
    out = []
    for x_val, y_val in zip(points["x"], points["y"]):
        if pd.isna(x_val):
            x_val = None  # unparseable date -> null rather than the string "NaT"
        elif hasattr(x_val, "isoformat"):
            x_val = x_val.isoformat()
        out.append({"x": to_json_serializable(x_val), "y": to_json_serializable(y_val)})
    return out


def _scatter_data(df: pd.DataFrame, cols: List[Any]) -> List[Dict[str, Any]]:
    """Scatter points pairing two columns row by row."""
    if len(cols) < 2:
        x = _pick_numeric(df)
        y = _pick_numeric(df, exclude=[x]) if x else None
        cols = [x, y]
    xcol, ycol = cols[0], cols[1]
    if not _has_columns(df, [xcol, ycol]):
        return []
    return [
        {"x": to_json_serializable(a), "y": to_json_serializable(b)}
        for a, b in zip(df[xcol], df[ycol])
    ]


def _histogram_data(df: pd.DataFrame, cols: List[Any], bins: int) -> List[Dict[str, Any]]:
    """Equal-width histogram of one column; text and infinite values are ignored."""
    col = cols[0] if cols else _pick_numeric(df)
    if not _has_columns(df, [col]):
        return []
    values = pd.to_numeric(df[col], errors="coerce").dropna().astype(float)
    # np.histogram cannot auto-detect a range that includes +/-inf.
    values = values[np.isfinite(values)]
    counts, edges = np.histogram(values, bins=bins)
    return [
        {"bin": f"{float(lo):.6g}-{float(hi):.6g}", "count": int(n)}
        for n, lo, hi in zip(counts, edges[:-1], edges[1:])
    ]


def _boxplot_data(df: pd.DataFrame, cols: List[Any]) -> List[Dict[str, Any]]:
    """Quartiles of a numeric column, overall (0-1 columns) or per category (2 columns)."""
    if not cols:
        cat, num = None, _pick_numeric(df)
    elif len(cols) == 1:
        cat, num = None, cols[0]
    else:
        cat, num = cols[0], cols[1]
    if not _has_columns(df, [num] if cat is None else [cat, num]):
        return []
    if not pd.api.types.is_numeric_dtype(df[num]):
        # quantile() fails on text; treat unparseable cells as missing instead.
        df = df.copy()
        df[num] = pd.to_numeric(df[num], errors="coerce")
    return _box_aggregate(df, cat, num)


def generate_chart_data_from_spec(df: pd.DataFrame, spec: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]:
    """Build chart-ready data for *spec* deterministically (no model call).

    Recognised spec keys: ``chart_type`` (pie/bar/line/scatter/histogram/
    boxplot); ``target_columns`` (a list; a single label or None is
    normalised); ``aggregation`` for pie/bar with two columns (sum, mean,
    median, min, max, count; aliases total/avg/average; anything else sums);
    and for histograms an optional positive-int ``bins`` (default 10). Missing
    target columns are chosen with the _pick_* heuristics.

    Returns:
        ``(chart_type, points)``. When a required column is absent from *df*
        the points list is empty instead of raising KeyError, so one bad model
        spec cannot abort the pipeline. Unrecognised types return
        ``(chart_type, [])``, or ``("unknown", [])`` when the type is missing or
        not a string.
    """
    chart_type = spec.get("chart_type")
    cols = _spec_columns(spec.get("target_columns", []))
    agg = spec.get("aggregation", None)
    if chart_type == "pie":
        return "pie", _pie_data(df, cols, agg)
    if chart_type == "bar":
        return "bar", _bar_data(df, cols, agg)
    if chart_type == "line":
        return "line", _line_data(df, cols)
    if chart_type == "scatter":
        return "scatter", _scatter_data(df, cols)
    if chart_type == "histogram":
        bins = spec.get("bins")
        if isinstance(bins, bool) or not isinstance(bins, int) or bins < 1:
            bins = 10
        return "histogram", _histogram_data(df, cols, bins)
    if chart_type == "boxplot":
        return "boxplot", _boxplot_data(df, cols)
    return (chart_type if isinstance(chart_type, str) and chart_type else "unknown"), []
# ---------------------------
# Gemini wrapper (optional)
# ---------------------------
def gemini_generate_json(model: str, system_instruction: str, user_content: str, require_json: bool = True) -> Any:
    """Call Gemini via google-genai and return its (optionally parsed) response.

    The API key is read from the ``GEMINI_API_KEY`` environment variable;
    credentials must never be committed to source.

    Returns:
        - ``{"__use_fallback__": True}`` when no key is configured, google-genai
          cannot be imported, or the request fails -- callers then use their
          deterministic fallbacks;
        - with ``require_json=True``: the decoded JSON value, or
          ``{"__raw_text__": text}`` when the reply is not valid JSON;
        - with ``require_json=False``: the stripped reply text.
    """
    api_key = os.environ.get("GEMINI_API_KEY", "").strip()
    if not api_key:
        logger.info("GEMINI_API_KEY not set; using deterministic fallbacks.")
        return {"__use_fallback__": True}
    try:
        # Imported lazily so the pipeline still runs without the optional package.
        from google import genai
        from google.genai import types
    except Exception as e:
        logger.exception("google-genai not available: %s", e)
        return {"__use_fallback__": True}
    try:
        # Client construction is inside the try so a bad key/config also falls back.
        client = genai.Client(api_key=api_key)
        contents = [
            types.Content(
                role="user",
                parts=[types.Part.from_text(text=user_content)],
            )
        ]
        config_kwargs: Dict[str, Any] = {
            "thinking_config": types.ThinkingConfig(thinking_budget=0),
            "system_instruction": [types.Part.from_text(text=system_instruction)],
        }
        if require_json:
            # Only force a JSON MIME type when the caller is going to parse JSON.
            config_kwargs["response_mime_type"] = "application/json"
        config = types.GenerateContentConfig(**config_kwargs)
        pieces: List[str] = []
        for chunk in client.models.generate_content_stream(model=model, contents=contents, config=config):
            text = chunk.text if hasattr(chunk, "text") else None
            if not text and (
                chunk.candidates
                and chunk.candidates[0].content
                and chunk.candidates[0].content.parts
            ):
                text = chunk.candidates[0].content.parts[0].text
            if text:
                pieces.append(text)
    except Exception as e:
        logger.exception("genai call failed: %s", e)
        return {"__use_fallback__": True}
    full_text = "".join(pieces).strip()
    if not require_json:
        return full_text
    try:
        return json.loads(full_text)
    except ValueError:  # json.JSONDecodeError: reply was not valid JSON
        return {"__raw_text__": full_text}
# ---------------------------
# Controller
# ---------------------------
# Gemini model name used by the controller when the caller does not pass one.
DEFAULT_MODEL: str = "gemini-2.5-flash-lite"
def ensure_six_tasks(tasks: List[Dict[str, Any]], df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Top up *tasks* with deterministic fallback chart specs until six are present.

    Model-supplied tasks are kept in order. Entries that are not dicts are
    dropped, and a *tasks* value that is not a list is treated as empty. Then,
    for each of pie, bar, line, scatter, histogram and boxplot that is not
    already requested, a fallback spec built from the column-picking
    heuristics is appended, stopping as soon as six tasks are present (lists
    with six or more valid tasks come back unchanged). The input list is not
    modified.
    """
    out = [t for t in tasks if isinstance(t, dict)] if isinstance(tasks, list) else []
    # A list rather than a set: chart_type comes from model JSON and may be unhashable.
    existing_types = [t.get("chart_type") for t in out]
    # Last-resort column when a heuristic finds nothing (None for a column-less frame).
    first_col = df.columns[0] if len(df.columns) else None
    for ct in ("pie", "bar", "line", "scatter", "histogram", "boxplot"):
        if len(out) >= 6:
            break
        if ct in existing_types:
            continue
        if ct == "pie":
            col = _pick_categorical(df) or first_col
            out.append({"chart_type": "pie", "target_columns": [col], "aggregation": "count", "reasoning": "fallback pie"})
        elif ct == "bar":
            label = _pick_categorical(df) or first_col
            num = _pick_numeric(df) or first_col
            out.append({"chart_type": "bar", "target_columns": [label, num], "aggregation": "sum", "reasoning": "fallback bar"})
        elif ct == "line":
            y = _pick_numeric(df) or first_col
            x = _pick_datetime_or_index(df) or df.index.name or "index"
            reasoning = "fallback line on index" if x == "index" else "fallback line"
            out.append({"chart_type": "line", "target_columns": [x, y], "aggregation": None, "reasoning": reasoning})
        elif ct == "scatter":
            # Same first-column fallback as the other chart types, so the spec
            # never carries [None, None].
            x = _pick_numeric(df) or first_col
            y = _pick_numeric(df, exclude=[x]) or x
            out.append({"chart_type": "scatter", "target_columns": [x, y], "aggregation": None, "reasoning": "fallback scatter"})
        elif ct == "histogram":
            num = _pick_numeric(df) or first_col
            out.append({"chart_type": "histogram", "target_columns": [num], "aggregation": None, "reasoning": "fallback histogram"})
        elif ct == "boxplot":
            num = _pick_numeric(df) or first_col
            cat = _pick_categorical(df)
            if cat:
                out.append({"chart_type": "boxplot", "target_columns": [cat, num], "aggregation": None, "reasoning": "fallback boxplot by category"})
            else:
                out.append({"chart_type": "boxplot", "target_columns": [num], "aggregation": None, "reasoning": "fallback boxplot global"})
    return out
def _run_model_code(code: str, df: pd.DataFrame) -> Any:
    """Exec model-generated *code* in a restricted namespace and return its result.

    The code sees ``pd``, ``np``, a private copy of *df* and a few builtins. Its
    result is the ``result`` (else ``output``) variable it assigns; failing
    that, a single-line expression is evaluated and its value returned. Returns
    None when nothing usable is produced; exceptions from ``exec`` propagate.

    SECURITY NOTE(review): this is not a sandbox. Restricted builtins plus the
    regex denylist in code_is_safe() only raise the bar, and there is no time
    or memory limit (``while True: pass`` would hang the worker). Untrusted model
    output should run in a separate, resource-limited process if exposed.
    """
    allowed_globals = {
        "__builtins__": {"None": None, "True": True, "False": False, "len": len, "min": min, "max": max, "sum": sum, "sorted": sorted, "round": round},
        "pd": pd, "np": np, "df": df.copy(),
    }
    local_vars: Dict[str, Any] = {}
    exec(code, allowed_globals, local_vars)
    if "result" in local_vars:
        return local_vars["result"]
    if "output" in local_vars:
        return local_vars["output"]
    is_single_expr = (
        "\n" not in code
        and "=" not in code
        and "return" not in code
        and not code.strip().startswith("def ")
    )
    if not is_single_expr:
        return None
    try:
        return eval(code, allowed_globals, {})
    except Exception:  # not a usable expression after all
        return None


def _result_to_records(result: Any) -> Optional[List[Dict[str, Any]]]:
    """Normalise a model-code result into a list of dicts, or None if unusable.

    DataFrames become their records, a dict becomes a one-item list, and list
    items that are not dicts are wrapped as ``{"value": item}``. All values go
    through to_json_serializable().
    """
    if isinstance(result, pd.DataFrame):
        rows: List[Any] = [
            {k: to_json_serializable(v) for k, v in r.items()}
            for r in result.to_dict(orient="records")
        ]
    elif isinstance(result, list):
        rows = [
            {k: to_json_serializable(v) for k, v in r.items()} if isinstance(r, dict) else to_json_serializable(r)
            for r in result
        ]
    elif isinstance(result, dict):
        rows = [{k: to_json_serializable(v) for k, v in result.items()}]
    else:
        return None
    return [r if isinstance(r, dict) else {"value": r} for r in rows]


def process_file(
    path: str,
    sheet: Optional[str] = None,
    model: str = DEFAULT_MODEL,
    max_points_per_chart: int = 200,
) -> Dict[str, Any]:
    """Run the whole pipeline on one file and return chart data plus metadata.

    Steps: ingest -> preprocess -> sample -> classification call -> planning
    call -> for each task, run its model code (if present and accepted by
    code_is_safe()) or else the deterministic generator -> truncate.

    Args:
        path: CSV/TXT/XLS/XLSX file to process.
        sheet: Excel sheet to load (first sheet when absent or unknown).
        model: Model name passed to gemini_generate_json().
        max_points_per_chart: Each chart's data list is truncated to this length.

    Returns:
        ``{"metadata": {...}, "charts": {chart_type: [points...]}}``. Per-task
        failures are recorded in ``metadata["execution_errors"]`` instead of
        aborting the run.

    Raises:
        ValueError: from ingest_file() for unsupported file types.
    """
    df, meta = ingest_file(path, sheet)
    pre_df, preprocess_meta = preprocess_df(df)
    samples = sample_df(pre_df, n=5)
    schema = simple_schema(pre_df)

    # default=str: sample rows can hold Timestamps (e.g. Excel date columns),
    # which json.dumps cannot encode; their string form is fine for a prompt.
    classification_input = json.dumps({"samples": samples, "schema": schema, "meta": meta}, default=str)
    classification_output = gemini_generate_json(
        model=model,
        system_instruction="You are a classification agent. Identify domain and chart tasks.",
        user_content=classification_input,
        require_json=True,
    )
    if not isinstance(classification_output, dict) or not isinstance(classification_output.get("tasks"), list):
        classification_output = {
            "domain": "unknown",
            "tasks": [
                {"chart_type": "pie", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0]], "aggregation": "count", "reasoning": "fallback"},
                {"chart_type": "bar", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0], _pick_numeric(pre_df) or pre_df.columns[0]], "aggregation": "sum", "reasoning": "fallback"},
            ],
        }

    planning_input = json.dumps({"classification": classification_output, "schema": schema, "samples": samples}, default=str)
    planning_output = gemini_generate_json(
        model=model,
        system_instruction="You are a planning agent. Produce tasks with code assigning `result` variable.",
        user_content=planning_input,
        require_json=True,
    )
    if isinstance(planning_output, dict) and isinstance(planning_output.get("tasks"), list):
        tasks = planning_output["tasks"]
    else:
        tasks = classification_output["tasks"]
    # Model output may contain non-object entries; only dict specs can be run.
    tasks = ensure_six_tasks([t for t in tasks if isinstance(t, dict)], pre_df)

    final: Dict[Any, List[Any]] = {"pie": [], "bar": [], "line": [], "scatter": [], "histogram": [], "boxplot": []}
    execution_errors: List[Dict[str, Any]] = []
    for idx, task in enumerate(tasks):
        chart_type = task.get("chart_type")
        code_snippet = task.get("code")
        executed = False
        if code_snippet:
            if isinstance(code_snippet, str):
                safe, reason = code_is_safe(code_snippet)
            else:
                safe, reason = False, "code is not a string"
            if not safe:
                logger.warning("Rejected unsafe code snippet: %s", reason)
                execution_errors.append({"task_index": idx, "reason": f"rejected code: {reason}", "code": code_snippet})
            else:
                try:
                    records = _result_to_records(_run_model_code(code_snippet, pre_df))
                    if records is None:
                        execution_errors.append({"task_index": idx, "reason": "result not list-of-dicts or missing", "code": code_snippet})
                    else:
                        final.setdefault(chart_type, []).extend(records)
                        executed = True
                except Exception as e:
                    logger.exception("Model code execution failed for task %s: %s", idx, str(e))
                    execution_errors.append({"task_index": idx, "reason": "exception during exec/eval", "exception": str(e), "code": code_snippet})
        if not executed:
            # One malformed spec must not abort the whole run.
            try:
                ct, res = generate_chart_data_from_spec(pre_df, task)
                final.setdefault(ct, []).extend(res)
            except Exception as e:
                logger.exception("Fallback chart generation failed for task %s: %s", idx, str(e))
                execution_errors.append({"task_index": idx, "reason": "exception during fallback chart generation", "exception": str(e)})

    for key, points in final.items():
        if isinstance(points, list):
            final[key] = points[:max_points_per_chart]

    return {
        "metadata": {
            "source_file": os.path.basename(path),
            "ingestion_meta": meta,
            "preprocess_meta": preprocess_meta,
            "classification": classification_output if isinstance(classification_output, dict) else {"raw": classification_output},
            "planning_meta": planning_output if isinstance(planning_output, dict) else {"raw": planning_output},
            "execution_errors": execution_errors,
        },
        "charts": final,
    }