File size: 4,622 Bytes
b67668b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | from __future__ import annotations
from typing import Any, List
import pandas as pd
from pydantic import BaseModel, Field, ValidationError
import math
class FilterSpec(BaseModel):
    """A single filter condition on one column, as consumed by execute_query."""

    # Name of the column the condition applies to.
    column: str
    # Operator name; the field description lists eq, neq, contains, in, gte, lte.
    # The type is a plain str, so other values are not rejected by validation here.
    op: str = Field(..., description="one of: eq, neq, contains, in, gte, lte")
    # Operand compared against the column's values; deliberately untyped (Any).
    value: Any
class QuerySpec(BaseModel):
    """Structured query plan: columns to select, filters, de-duplication and a row limit."""

    # Columns to keep in the result; defaults to an empty list
    # (execute_query keeps all columns when this is empty).
    select: List[str] = Field(default_factory=list)
    # Filter conditions to apply; defaults to an empty list.
    filters: List[FilterSpec] = Field(default_factory=list)
    # Whether duplicate rows are dropped from the result.
    distinct: bool = True
    # Maximum number of rows returned.
    limit: int = 50
# System message sent to the model by plan_query_with_llm. It instructs the
# model to answer with JSON only, in the shape of QuerySpec.
SYSTEM_PROMPT = """You are a data query planner.
You receive:
- user_question
- available_columns
- sample_values (small)
Return ONLY valid JSON for QuerySpec with:
- select: columns needed
- filters: list of {column, op, value}
- distinct: true/false
- limit: integer <= 200
Rules:
- Prefer deterministic, simple filters.
- If the user asks for a department like "Artificial Intelligence", filter Department equals that exact string.
- If user says AI department, treat it as Department in ["Artificial Intelligence","AI/ML"] unless user explicitly excludes AI/ML.
- Do NOT invent columns.
"""
def _json_safe(obj):
    """Recursively replace NaN/inf floats with None so the payload is valid JSON.

    Dicts, lists and tuples are walked recursively and rebuilt (as a plain
    dict, list or tuple respectively); every other value is returned
    unchanged.

    Args:
        obj: Any value, e.g. the list of records from ``DataFrame.to_dict``.

    Returns:
        A structure of the same shape in which every non-finite float
        (NaN, +inf, -inf) has been replaced by ``None``.
    """
    if isinstance(obj, float):
        # math.isfinite is False for NaN and for both infinities.
        return obj if math.isfinite(obj) else None
    if isinstance(obj, dict):
        return {key: _json_safe(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [_json_safe(item) for item in obj]
    if isinstance(obj, tuple):
        # json.dumps serialises tuples as arrays, so a NaN nested in a tuple
        # would otherwise leak into the output as the invalid token NaN.
        return tuple(_json_safe(item) for item in obj)
    return obj
def _openai_client(api_key: str):
    """Build an OpenAI client authenticated with *api_key*.

    The ``openai`` package is imported inside the function rather than at
    module level, so importing this module does not itself require it.
    """
    import openai

    return openai.OpenAI(api_key=api_key)
def plan_query_with_llm(user_question: str, df: pd.DataFrame, api_key: str, model: str = "gpt-4.1-mini") -> QuerySpec:
    """Ask the model to translate a natural-language question into a QuerySpec.

    The model receives the question, the DataFrame's column labels and the
    first 8 rows as sample values, together with SYSTEM_PROMPT, which asks
    for a JSON-only answer.

    Args:
        user_question: The question to plan a query for.
        df: Data the query will run against; only its columns and first
            8 rows are sent to the model.
        api_key: API key used to construct the OpenAI client.
        model: Model name passed to ``client.responses.create``.

    Returns:
        The validated QuerySpec, with ``limit`` clamped to the range 1..200.

    Raises:
        ValueError: If the model output does not validate as QuerySpec JSON.
            The pydantic ValidationError is chained as ``__cause__``.
    """
    import json  # local import: json is not among the module-level imports

    sample_df = df.head(8)
    sample_df = sample_df.where(pd.notna(sample_df), None)
    # _json_safe scrubs any NaN/inf that is still present after where().
    sample = _json_safe(sample_df.to_dict(orient="records"))

    payload = {
        "user_question": user_question,
        "available_columns": list(df.columns),
        "sample_values": sample,
    }

    client = _openai_client(api_key)
    resp = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": SYSTEM_PROMPT},
            # default=str: sample cells that are not JSON-native (for example
            # timestamps) are sent as their string form instead of making
            # json.dumps raise TypeError.
            {"role": "user", "content": json.dumps(payload, default=str)},
        ],
        temperature=0,
        max_output_tokens=600,
    )
    text = resp.output_text

    # Keep the try body to the one call that can raise ValidationError.
    try:
        spec = QuerySpec.model_validate_json(text)
    except ValidationError as e:
        raise ValueError(
            f"Could not parse model output as QuerySpec JSON. Raw output:\n{text}\n\nError:\n{e}"
        ) from e

    # The prompt asks for limit <= 200; enforce it regardless of what came back.
    spec.limit = max(1, min(int(spec.limit), 200))
    return spec
def _filter_mask(series, op, val):
    """Return the boolean row mask for one filter, or None for an unknown op."""
    if op == "eq":
        # None means "is missing": NaN never compares equal to anything.
        return series.isna() if val is None else series == val
    if op == "neq":
        return series.notna() if val is None else series != val
    if op == "contains":
        # regex=False: the value is a literal substring. Treating it as a
        # pattern makes values such as "C++" or "(" raise re.error and makes
        # "." match every row.
        return series.astype(str).str.contains(str(val), case=False, na=False, regex=False)
    if op == "in":
        return series.isin(val if isinstance(val, list) else [val])
    if op in ("gte", "lte"):
        # Non-numeric cells become NaN and therefore never satisfy the bound.
        numeric = pd.to_numeric(series, errors="coerce")
        bound = float(val)
        return numeric >= bound if op == "gte" else numeric <= bound
    return None


def execute_query(spec: QuerySpec, df: pd.DataFrame) -> pd.DataFrame:
    """Apply a QuerySpec to a DataFrame and return the matching rows.

    Filters are applied in order, each narrowing the previous result. A
    filter that names a column not present in the data, or an operator other
    than eq, neq, contains, in, gte, lte, is skipped rather than raising.

    Args:
        spec: The query plan. Only ``filters``, ``select``, ``distinct`` and
            ``limit`` are read.
        df: Source data; it is copied and never modified.

    Returns:
        A new DataFrame holding at most ``spec.limit`` rows. When
        ``spec.select`` is non-empty, only the requested columns that exist
        are kept (each once, in request order). When ``spec.distinct`` is
        true, duplicate rows are dropped.

    Raises:
        ValueError, TypeError: If a gte/lte value cannot be converted to float.
    """
    out = df.copy()

    for f in spec.filters:
        if f.column not in out.columns:
            continue
        mask = _filter_mask(out[f.column], f.op, f.value)
        if mask is not None:
            out = out[mask]

    if spec.select:
        # dict.fromkeys de-duplicates while keeping request order, so a
        # repeated name does not yield duplicate columns in the result.
        safe_select = [c for c in dict.fromkeys(spec.select) if c in out.columns]
        out = out[safe_select]

    if spec.distinct:
        out = out.drop_duplicates()

    return out.head(spec.limit)
def summarize_results_with_llm(user_question: str, result_df: pd.DataFrame, api_key: str, model: str = "gpt-4.1-mini") -> str:
    """Ask the model for a concise summary of a query result.

    Every row of ``result_df`` is serialised and sent along with the
    question, so callers should pass an already-limited result.

    Args:
        user_question: The question the result answers.
        result_df: Rows to summarise; it is not modified.
        api_key: API key used to construct the OpenAI client.
        model: Model name passed to ``client.responses.create``.

    Returns:
        The model's text output.
    """
    import json  # local import: json is not among the module-level imports

    # where() returns a new frame, so result_df itself is left untouched.
    safe_df = result_df.where(pd.notna(result_df), None)
    # _json_safe scrubs any NaN/inf that is still present after where().
    preview = _json_safe(safe_df.to_dict(orient="records"))
    payload = {"question": user_question, "results": preview}

    client = _openai_client(api_key)
    resp = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": "You are a helpful analyst. Summarize results concisely and accurately."},
            # default=str: result cells that are not JSON-native (for example
            # timestamps) are sent as their string form instead of making
            # json.dumps raise TypeError.
            {"role": "user", "content": json.dumps(payload, default=str)},
        ],
        temperature=0.2,
        max_output_tokens=500,
    )
    return resp.output_text
|