Siddhesh Patil
Initial commit - Self-Correcting Data Validation Agent
b67668b
from __future__ import annotations
from typing import Any, List
import pandas as pd
from pydantic import BaseModel, Field, ValidationError
import math
class FilterSpec(BaseModel):
column: str
op: str = Field(..., description="one of: eq, neq, contains, in, gte, lte")
value: Any
class QuerySpec(BaseModel):
select: List[str] = Field(default_factory=list)
filters: List[FilterSpec] = Field(default_factory=list)
distinct: bool = True
limit: int = 50
SYSTEM_PROMPT = """You are a data query planner.
You receive:
- user_question
- available_columns
- sample_values (small)
Return ONLY valid JSON for QuerySpec with:
- select: columns needed
- filters: list of {column, op, value}
- distinct: true/false
- limit: integer <= 200
Rules:
- Prefer deterministic, simple filters.
- If the user asks for a department like "Artificial Intelligence", filter Department equals that exact string.
- If user says AI department, treat it as Department in ["Artificial Intelligence","AI/ML"] unless user explicitly excludes AI/ML.
- Do NOT invent columns.
"""
def _json_safe(obj):
"""Recursively convert NaN/inf to None so payload becomes valid JSON."""
if obj is None:
return None
if isinstance(obj, float):
if math.isnan(obj) or math.isinf(obj):
return None
return obj
if isinstance(obj, dict):
return {k: _json_safe(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_json_safe(v) for v in obj]
return obj
def _openai_client(api_key: str):
from openai import OpenAI
return OpenAI(api_key=api_key)
def plan_query_with_llm(user_question: str, df: pd.DataFrame, api_key: str, model: str = "gpt-4.1-mini") -> QuerySpec:
cols = list(df.columns)
sample_df = df.head(8).copy()
sample_df = sample_df.where(pd.notna(sample_df), None)
sample = _json_safe(sample_df.to_dict(orient="records"))
client = _openai_client(api_key)
import json # add at top of file if not present
payload = {
"user_question": user_question,
"available_columns": cols,
"sample_values": sample,
}
resp = client.responses.create(
model=model,
input=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(payload)},
],
temperature=0,
max_output_tokens=600,
)
text = resp.output_text
try:
spec = QuerySpec.model_validate_json(text)
spec.limit = max(1, min(int(spec.limit), 200))
return spec
except ValidationError as e:
raise ValueError(f"Could not parse model output as QuerySpec JSON. Raw output:\\n{text}\\n\\nError:\\n{e}")
def execute_query(spec: QuerySpec, df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
for f in spec.filters:
col = f.column
if col not in out.columns:
continue
op = f.op
val = f.value
if op == "eq":
if val is None:
out = out[out[col].isna()]
else:
out = out[out[col] == val]
elif op == "neq":
if val is None:
out = out[out[col].notna()]
else:
out = out[out[col] != val]
elif op == "contains":
out = out[out[col].astype(str).str.contains(str(val), case=False, na=False)]
elif op == "in":
if not isinstance(val, list):
val = [val]
out = out[out[col].isin(val)]
elif op == "gte":
out = out[pd.to_numeric(out[col], errors="coerce") >= float(val)]
elif op == "lte":
out = out[pd.to_numeric(out[col], errors="coerce") <= float(val)]
if spec.select:
safe_select = [c for c in spec.select if c in out.columns]
out = out[safe_select]
if spec.distinct:
out = out.drop_duplicates()
return out.head(spec.limit)
def summarize_results_with_llm(user_question: str, result_df: pd.DataFrame, api_key: str, model: str = "gpt-4.1-mini") -> str:
client = _openai_client(api_key)
safe_df = result_df.copy().where(pd.notna(result_df), None)
preview = _json_safe(safe_df.to_dict(orient="records"))
import json # add at top if not present
payload = {"question": user_question, "results": preview}
resp = client.responses.create(
model=model,
input=[
{"role": "system", "content": "You are a helpful analyst. Summarize results concisely and accurately."},
{"role": "user", "content": json.dumps(payload)},
],
temperature=0.2,
max_output_tokens=500,
)
return resp.output_text