rawdataapi / app /pipeline.py
triflix's picture
Rename pipeline.py to app/pipeline.py
ca70b65 verified
# app/pipeline.py
import os
import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)
# ---------------------------
# Safety / helpers
# ---------------------------
DISALLOWED_PATTERNS = [
r"__\w+__", r"\bimport\s+os\b", r"\bimport\s+sys\b", r"\bimport\s+subprocess\b",
r"\bimport\s+socket\b", r"\bimport\s+requests\b", r"\bopen\s*\(", r"\beval\s*\(",
r"\bexec\s*\(", r"\bcompile\s*\(", r"\bsystem\s*\(", r"\bPopen\b", r"\bsh\b",
]
def code_is_safe(code: str) -> Tuple[bool, Optional[str]]:
for pat in DISALLOWED_PATTERNS:
if re.search(pat, code):
return False, f"disallowed pattern: {pat}"
return True, None
def to_json_serializable(obj):
if isinstance(obj, (np.integer,)):
return int(obj)
if isinstance(obj, (np.floating,)):
return float(obj)
if isinstance(obj, (np.ndarray,)):
return obj.tolist()
if pd.isna(obj):
return None
return obj
def simple_schema(df: pd.DataFrame) -> Dict[str, Any]:
return {
"columns": [
{"name": c, "dtype": str(df[c].dtype), "n_unique": int(df[c].nunique(dropna=True))}
for c in df.columns
],
"n_rows": int(len(df)),
}
# ---------------------------
# Ingest / preprocess / sampling
# ---------------------------
def ingest_file(path: str, sheet: Optional[str] = None) -> Tuple[pd.DataFrame, Dict[str, Any]]:
ext = os.path.splitext(path)[1].lower()
metadata = {"file_type": ext, "sheet_names": None, "selected_sheet": None}
if ext in [".csv", ".txt"]:
df = pd.read_csv(path)
metadata["selected_sheet"] = "csv"
elif ext in [".xls", ".xlsx"]:
xls = pd.ExcelFile(path)
sheets = xls.sheet_names
metadata["sheet_names"] = sheets
chosen = sheet if sheet and sheet in sheets else sheets[0]
metadata["selected_sheet"] = chosen
df = pd.read_excel(xls, sheet_name=chosen)
else:
raise ValueError("Unsupported file type: " + ext)
metadata["file_type"] = ext
return df, metadata
def preprocess_df(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
actions = []
df = df.copy()
df.columns = [str(c).strip() for c in df.columns]
object_cols = df.select_dtypes(include="object").columns.tolist()
for c in object_cols:
try:
df[c] = df[c].where(df[c].isna(), df[c].astype(str).str.strip())
except Exception:
pass
for c in df.columns:
if df[c].dtype == object:
coerced = pd.to_numeric(df[c], errors="coerce")
non_na = coerced.notna().sum()
if non_na >= max(1, 0.5 * len(df)):
df[c] = coerced
actions.append(f"coerced {c} -> numeric")
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
for c in num_cols:
median = df[c].median()
if pd.isna(median):
median = 0
df[c] = df[c].fillna(median)
actions.append(f"filled numeric {c} nulls with median {median}")
for c in object_cols:
try:
mode = df[c].mode().iloc[0] if not df[c].mode().empty else ""
except Exception:
mode = ""
df[c] = df[c].fillna(mode)
actions.append(f"filled object {c} nulls with mode '{mode}'")
schema = simple_schema(df)
return df, {"actions": actions, "schema": schema}
def sample_df(df: pd.DataFrame, n: int = 5) -> Dict[str, Any]:
head = df.head(n).to_dict(orient="records")
tail = df.tail(n).to_dict(orient="records")
rnd = df.sample(n=n, random_state=42).to_dict(orient="records") if len(df) > n else df.sample(frac=1.0).to_dict(orient="records")
return {"head": head, "tail": tail, "random": rnd}
# ---------------------------
# Deterministic chart generator (fallback)
# ---------------------------
def _pick_categorical(df: pd.DataFrame) -> Optional[str]:
for c in df.columns:
if df[c].dtype == object or df[c].nunique() < max(50, 0.5 * len(df)):
return c
return None
def _pick_numeric(df: pd.DataFrame, exclude: List[str] = []) -> Optional[str]:
for c in df.select_dtypes(include=[np.number]).columns:
if c not in exclude:
return c
for c in df.columns:
try:
coerced = pd.to_numeric(df[c], errors="coerce")
if coerced.notna().sum() > 0:
return c
except Exception:
continue
return None
def _pick_datetime_or_index(df: pd.DataFrame) -> Optional[str]:
for c in df.columns:
if np.issubdtype(df[c].dtype, np.datetime64):
return c
for c in df.columns:
try:
parsed = pd.to_datetime(df[c], errors="coerce")
if parsed.notna().sum() > 0:
return c
except Exception:
continue
return None
def _box_aggregate(df: pd.DataFrame, cat_col: Optional[str], num_col: str) -> List[Dict[str, Any]]:
out = []
if cat_col is None:
series = df[num_col].dropna()
if len(series) == 0:
return out
q1 = float(series.quantile(0.25))
median = float(series.quantile(0.5))
q3 = float(series.quantile(0.75))
out.append({"category": None, "q1": q1, "median": median, "q3": q3})
return out
for name, group in df.groupby(cat_col):
ser = group[num_col].dropna()
if len(ser) == 0:
continue
q1 = float(ser.quantile(0.25))
median = float(ser.quantile(0.5))
q3 = float(ser.quantile(0.75))
out.append({"category": to_json_serializable(name), "q1": q1, "median": median, "q3": q3})
return out
def generate_chart_data_from_spec(df: pd.DataFrame, spec: Dict[str, Any]) -> Tuple[str, List[Dict[str, Any]]]:
chart_type = spec.get("chart_type")
cols = spec.get("target_columns", [])
agg = spec.get("aggregation", None)
df_local = df.copy()
if chart_type == "pie":
if len(cols) == 0:
col = _pick_categorical(df_local)
cols = [col] if col else []
if len(cols) == 1:
col = cols[0]
series = df_local[col].astype(str).value_counts(dropna=True)
return "pie", [{"name": k, "value": int(v)} for k, v in series.items()]
else:
cat, val = cols[0], cols[1]
grouped = df_local.groupby(cat)[val].sum().reset_index()
return "pie", [{"name": r[cat], "value": to_json_serializable(r[val])} for r in grouped.to_dict(orient="records")]
if chart_type == "bar":
if len(cols) == 0:
label = _pick_categorical(df_local)
cols = [label] if label else []
if len(cols) == 1:
label = cols[0]
series = df_local[label].astype(str).value_counts().reset_index()
series.columns = [label, "count"]
return "bar", [{"label": r[label], "count": int(r["count"])} for r in series.to_dict(orient="records")]
else:
label, metric = cols[0], cols[1]
if agg in (None, "", "sum"):
grouped = df_local.groupby(label)[metric].sum().reset_index()
elif agg == "mean":
grouped = df_local.groupby(label)[metric].mean().reset_index()
else:
grouped = df_local.groupby(label)[metric].sum().reset_index()
return "bar", [{"label": r[label], metric: to_json_serializable(r[metric])} for r in grouped.to_dict(orient="records")]
if chart_type == "line":
if len(cols) < 2:
y = _pick_numeric(df_local)
x = _pick_datetime_or_index(df_local)
cols = [x, y]
xcol, ycol = cols[0], cols[1]
s_x = pd.to_datetime(df_local[xcol], errors="coerce") if xcol in df_local.columns else pd.Series(range(len(df_local)))
series = pd.DataFrame({"x": s_x, "y": df_local[ycol]})
try:
series = series.sort_values("x").reset_index(drop=True)
except Exception:
pass
out = []
for r in series.to_dict(orient="records"):
x_val = r["x"].isoformat() if hasattr(r["x"], "isoformat") else r["x"]
out.append({"x": to_json_serializable(x_val), "y": to_json_serializable(r["y"])})
return "line", out
if chart_type == "scatter":
if len(cols) < 2:
x = _pick_numeric(df_local)
y = _pick_numeric(df_local, exclude=[x]) if x else None
cols = [x, y]
xcol, ycol = cols[0], cols[1]
return "scatter", [{"x": to_json_serializable(r[xcol]), "y": to_json_serializable(r[ycol])} for r in df_local[[xcol, ycol]].to_dict(orient="records")]
if chart_type == "histogram":
col = cols[0] if cols else _pick_numeric(df_local)
series = df_local[col].dropna()
counts, bin_edges = np.histogram(series, bins=10)
out = []
for i in range(len(counts)):
out.append({"bin": f"{float(bin_edges[i]):.6g}-{float(bin_edges[i+1]):.6g}", "count": int(counts[i])})
return "histogram", out
if chart_type == "boxplot":
if len(cols) == 0:
num = _pick_numeric(df_local)
return "boxplot", _box_aggregate(df_local, None, num)
if len(cols) == 1:
num = cols[0]
return "boxplot", _box_aggregate(df_local, None, num)
cat, num = cols[0], cols[1]
return "boxplot", _box_aggregate(df_local, cat, num)
return chart_type or "unknown", []
# ---------------------------
# Gemini wrapper (optional)
# ---------------------------
def gemini_generate_json(model: str, system_instruction: str, user_content: str, require_json: bool = True) -> Any:
"""
Attempts to call google-genai if GEMINI_API_KEY is provided.
Falls back to returning a safe marker that indicates a deterministic fallback should be used.
"""
api_key = "AIzaSyDfy0E-9b2XjoYHrHX2C1nVLHWyrWUFkMs"
if not api_key:
logger.info("GEMINI_API_KEY not set; using deterministic fallbacks.")
return {"__use_fallback__": True}
try:
# import lazily
from google import genai
from google.genai import types
except Exception as e:
logger.exception("google-genai not available: %s", e)
return {"__use_fallback__": True}
client = genai.Client(api_key=api_key)
contents = [
types.Content(
role="user",
parts=[types.Part.from_text(text=user_content)],
)
]
config = types.GenerateContentConfig(
thinking_config=types.ThinkingConfig(thinking_budget=0),
response_mime_type="application/json",
system_instruction=[types.Part.from_text(text=system_instruction)],
)
full_text = ""
try:
for chunk in client.models.generate_content_stream(model=model, contents=contents, config=config):
if hasattr(chunk, "text") and chunk.text:
full_text += chunk.text
elif (
chunk.candidates
and chunk.candidates[0].content
and chunk.candidates[0].content.parts
and chunk.candidates[0].content.parts[0].text
):
full_text += chunk.candidates[0].content.parts[0].text
full_text = full_text.strip()
if require_json:
try:
return json.loads(full_text)
except Exception:
return {"__raw_text__": full_text}
return full_text
except Exception as e:
logger.exception("genai call failed: %s", e)
return {"__use_fallback__": True}
# ---------------------------
# Controller
# ---------------------------
DEFAULT_MODEL = "gemini-2.5-flash-lite"
def ensure_six_tasks(tasks: List[Dict[str, Any]], df: pd.DataFrame) -> List[Dict[str, Any]]:
existing_types = [t.get("chart_type") for t in tasks]
candidates = ["pie", "bar", "line", "scatter", "histogram", "boxplot"]
out = tasks[:]
for ct in candidates:
if len(out) >= 6:
break
if ct not in existing_types:
if ct == "pie":
col = _pick_categorical(df) or df.columns[0]
out.append({"chart_type": "pie", "target_columns": [col], "aggregation": "count", "reasoning": "fallback pie"})
elif ct == "bar":
label = _pick_categorical(df) or df.columns[0]
num = _pick_numeric(df) or df.columns[0]
out.append({"chart_type": "bar", "target_columns": [label, num], "aggregation": "sum", "reasoning": "fallback bar"})
elif ct == "line":
y = _pick_numeric(df) or df.columns[0]
x = _pick_datetime_or_index(df) or df.index.name or "index"
if x == "index":
out.append({"chart_type": "line", "target_columns": [x, y], "aggregation": None, "reasoning": "fallback line on index"})
else:
out.append({"chart_type": "line", "target_columns": [x, y], "aggregation": None, "reasoning": "fallback line"})
elif ct == "scatter":
x = _pick_numeric(df)
y = _pick_numeric(df, exclude=[x]) or x
out.append({"chart_type": "scatter", "target_columns": [x, y], "aggregation": None, "reasoning": "fallback scatter"})
elif ct == "histogram":
num = _pick_numeric(df) or df.columns[0]
out.append({"chart_type": "histogram", "target_columns": [num], "aggregation": None, "reasoning": "fallback histogram"})
elif ct == "boxplot":
num = _pick_numeric(df) or df.columns[0]
cat = _pick_categorical(df)
if cat:
out.append({"chart_type": "boxplot", "target_columns": [cat, num], "aggregation": None, "reasoning": "fallback boxplot by category"})
else:
out.append({"chart_type": "boxplot", "target_columns": [num], "aggregation": None, "reasoning": "fallback boxplot global"})
return out
def process_file(path: str, sheet: Optional[str] = None, model: str = DEFAULT_MODEL) -> Dict[str, Any]:
df, meta = ingest_file(path, sheet)
pre_df, preprocess_meta = preprocess_df(df)
samples = sample_df(pre_df, n=5)
classification_input = json.dumps({"samples": samples, "schema": simple_schema(pre_df), "meta": meta})
classification_output = gemini_generate_json(
model=model,
system_instruction="You are a classification agent. Identify domain and chart tasks.",
user_content=classification_input,
require_json=True,
)
if not isinstance(classification_output, dict) or "tasks" not in classification_output:
fallback = {
"domain": "unknown",
"tasks": [
{"chart_type": "pie", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0]], "aggregation": "count", "reasoning": "fallback"},
{"chart_type": "bar", "target_columns": [_pick_categorical(pre_df) or pre_df.columns[0], _pick_numeric(pre_df) or pre_df.columns[0]], "aggregation": "sum", "reasoning": "fallback"},
],
}
classification_output = fallback
planning_input = json.dumps({"classification": classification_output, "schema": simple_schema(pre_df), "samples": samples})
planning_output = gemini_generate_json(
model=model,
system_instruction="You are a planning agent. Produce tasks with code assigning `result` variable.",
user_content=planning_input,
require_json=True,
)
tasks = []
if isinstance(planning_output, dict) and "tasks" in planning_output:
tasks = planning_output["tasks"]
else:
tasks = classification_output.get("tasks", [])
tasks = ensure_six_tasks(tasks, pre_df)
final = {"pie": [], "bar": [], "line": [], "scatter": [], "histogram": [], "boxplot": []}
execution_errors = []
for idx, task in enumerate(tasks):
chart_type = task.get("chart_type")
code_snippet = task.get("code")
executed = False
if code_snippet:
safe, reason = code_is_safe(code_snippet)
if not safe:
logger.warning("Rejected unsafe code snippet: %s", reason)
else:
allowed_globals = {
"__builtins__": {"None": None, "True": True, "False": False, "len": len, "min": min, "max": max, "sum": sum, "sorted": sorted, "round": round},
"pd": pd, "np": np, "df": pre_df.copy(),
}
local_vars = {}
try:
exec(code_snippet, allowed_globals, local_vars)
result = None
if "result" in local_vars:
result = local_vars["result"]
elif "output" in local_vars:
result = local_vars["output"]
else:
single_expr = (("\n" not in code_snippet) and ("=" not in code_snippet) and ("return" not in code_snippet) and (not code_snippet.strip().startswith("def ")))
if single_expr:
try:
result = eval(code_snippet, allowed_globals, {})
except Exception:
result = None
result_json = None
if isinstance(result, pd.DataFrame):
result_json = [{k: to_json_serializable(v) for k, v in r.items()} for r in result.to_dict(orient="records")]
elif isinstance(result, list):
norm = []
for r in result:
if isinstance(r, dict):
norm.append({k: to_json_serializable(v) for k, v in r.items()})
else:
norm.append(to_json_serializable(r))
result_json = norm
elif isinstance(result, dict):
result_json = [{k: to_json_serializable(v) for k, v in result.items()}]
else:
result_json = None
if result_json is not None:
normalized = []
for item in result_json:
if isinstance(item, dict):
normalized.append(item)
else:
normalized.append({"value": to_json_serializable(item)})
if chart_type in final:
final[chart_type].extend(normalized)
else:
final.setdefault(chart_type, []).extend(normalized)
executed = True
if not executed:
execution_errors.append({"task_index": idx, "reason": "result not list-of-dicts or missing", "code": code_snippet})
except Exception as e:
logger.exception("Model code execution failed for task %s: %s", idx, str(e))
execution_errors.append({"task_index": idx, "reason": "exception during exec/eval", "exception": str(e), "code": code_snippet})
if not executed:
ct, res = generate_chart_data_from_spec(pre_df, task)
final.setdefault(ct, []).extend(res)
for k in final:
if isinstance(final[k], list):
final[k] = final[k][:200]
output_payload = {
"metadata": {
"source_file": os.path.basename(path),
"ingestion_meta": meta,
"preprocess_meta": preprocess_meta,
"classification": classification_output if isinstance(classification_output, dict) else {"raw": classification_output},
"planning_meta": planning_output if isinstance(planning_output, dict) else {"raw": planning_output},
"execution_errors": execution_errors,
},
"charts": final,
}
return output_payload