analyst-buddy / server /data_card.py
hjerpe's picture
F006/F008: serve Qwen models + model switcher (vanilla-first)
656f91e verified
Raw
History Blame Contribute Delete
20.2 kB
"""Per-dataset advisory ``DataCard`` (F003, Slice S2).
After ingestion, profile the resulting SQLite table deterministically with DuckDB
``SUMMARIZE`` and assemble a Pydantic ``DataCard`` of *editable assumptions*: the
model-free profile plus a plain-language description/gotcha layer (from an
injectable ``describe_fn``, or a deterministic template fallback when none is
given) plus the user's optional hints. This is the cheap, research-backed accuracy
lever for a small model (ADR 0007 / ADR 0009).
The card is **advisory** — ``DESCRIBE``/``SAMPLE``/``QUERY`` remains the
load-bearing path; the card never replaces them. ``propose_data_card`` runs once
on upload (off the per-query latency path); ``apply_card_edits`` is the confirm
step (user edits win, the deterministic profile is never overwritten);
``render_data_context`` is a PURE helper F004 will use to inject the card later —
it does NOT touch ``get_system_prompt`` or the trained prompt path here.
``DataCard`` lives in THIS module, NOT root ``models.py``: it is ``serve``-only and
must not widen the lightweight contract that ``sql_environment`` imports. The
module is dependency-light (no ``trl``/``torch``/``transformers`` on import).
"""
from collections.abc import Callable
from pathlib import Path
import duckdb
from pydantic import BaseModel, Field, field_validator
try:
from .sql_ident import is_valid_identifier, quote_ident
except ImportError: # pragma: no cover - flat-layout / direct-run fallback
from sql_ident import is_valid_identifier, quote_ident # type: ignore[no-redef]
# How many distinct sample values to surface per column (advisory only).
_SAMPLE_LIMIT = 5
# A column at or below this distinct count is treated as a likely coded/categorical
# value by the deterministic fallback describer.
_LOW_CARDINALITY = 10
# Module-private alias onto the shared, single-source identifier quoter
# (``server/sql_ident.py``). Kept so existing call sites (and any local readers)
# can keep using the by-convention-private name while the logic lives once.
_quote_ident = quote_ident
def _validate_table_name(table: str) -> str:
"""Fail-closed validation of a public-boundary table name.
Raises ``ValueError`` unless ``table`` matches ``^[A-Za-z0-9_]+$`` (the env's
identifier contract), so injection attempts via the table argument never reach
SQL execution.
"""
if not is_valid_identifier(table):
raise ValueError(f"Invalid table name: {table!r}.")
return table
class ColumnProfile(BaseModel):
"""Deterministic, model-free profile of one column (DuckDB SUMMARIZE)."""
name: str = Field(..., description="Original (pre-normalization) column name.")
safe_name: str = Field(
..., description="SQL-safe column name as written to SQLite."
)
sqlite_type: str = Field(
..., description="Declared SQLite affinity, e.g. INTEGER/REAL/TEXT."
)
duckdb_type: str = Field(
..., description="DuckDB sniffed type, e.g. BIGINT/DOUBLE/VARCHAR."
)
null_pct: float = Field(..., description="Percentage of NULL values, 0.0–100.0.")
cardinality: int = Field(
..., description="Approximate distinct count (SUMMARIZE approx_unique)."
)
min: str | None = Field(default=None, description="Min value as string, or None.")
max: str | None = Field(default=None, description="Max value as string, or None.")
sample_values: list[str] = Field(
default_factory=list,
description="A few distinct sample values (advisory).",
)
class ColumnCard(BaseModel):
"""One column's editable assumptions = profile + advisory layer + user hint."""
profile: ColumnProfile = Field(
..., description="The deterministic profile (not user-editable)."
)
description: str = Field(
default="",
description="Plain-language meaning (advisory; model- or template-generated).",
)
gotcha: str = Field(default="", description="Coded-values/units caveat (advisory).")
user_hint: str = Field(
default="",
description="Optional per-column hint the user supplies/edits (user wins).",
)
class DataCard(BaseModel):
"""Per-dataset advisory context. Round-trips losslessly to/from JSON."""
db_id: str = Field(
..., description="Normalized db_id (matches the SQLite dir/file name)."
)
table: str = Field(..., description="Table name inside the SQLite DB.")
table_description: str = Field(
default="",
description="One-line plain-language description of the dataset (advisory).",
)
user_hint: str = Field(
default="",
description="The user's optional one-line 'What's in this data?' hint.",
)
row_count: int = Field(..., description="Number of rows ingested.")
columns: list[ColumnCard] = Field(
..., description="Per-column editable assumptions, in column order."
)
advisory: bool = Field(
default=True,
description="Always True — the card is advisory, not ground truth.",
)
@field_validator("advisory")
@classmethod
def _advisory_always_true(cls, _value: bool) -> bool:
# The card is advisory by contract; never let a caller flip it off.
return True
# Injection contract for the (optional) LLM description pass. Off the per-query
# latency path; runs once on upload; deterministic fallback when None.
# (profiles, table_description_seed) -> {column_safe_name: {"description", "gotcha"}}
# A "__table__" key MAY be returned to supply the table-level description.
DescribeFn = Callable[[list[ColumnProfile], str], dict[str, dict[str, str]]]
_TABLE_DESCRIBE_KEY = "__table__"
def _sqlite_affinities(source: str | Path, table: str) -> dict[str, str]:
"""Map each column to its declared SQLite affinity via PRAGMA table_info."""
import sqlite3
conn = sqlite3.connect(f"file:{Path(source)}?mode=ro", uri=True)
try:
rows = conn.execute(f"PRAGMA table_info({_quote_ident(table)})").fetchall()
finally:
conn.close()
# PRAGMA columns: (cid, name, type, notnull, dflt_value, pk)
return {row[1]: (row[2] or "") for row in rows}
def _read_table_df(
source: str | Path, table: str, affinities: dict[str, str]
) -> "pd.DataFrame": # noqa: F821
"""Read a whole SQLite table into a pandas DataFrame via the stdlib driver.
Stays fully local/offline (ADR 0009): no DuckDB ``sqlite`` extension and no
``INSTALL``/``LOAD`` network call. DuckDB summarizes the resulting in-memory
DataFrame with no extension at all.
pandas widens a SQLite INTEGER column that contains a NULL to ``float64`` (so
``1`` reads as ``1.0``), which would make DuckDB report a DOUBLE min/max. Cast
such columns back to nullable ``Int64`` using the declared SQLite affinity so
the profile faithfully reflects the stored integer type (parity with the old
ATTACH path).
"""
import sqlite3
import pandas as pd
conn = sqlite3.connect(f"file:{Path(source)}?mode=ro", uri=True)
try:
df = pd.read_sql_query(f"SELECT * FROM {_quote_ident(table)}", conn)
finally:
conn.close()
for column in df.columns:
if "INT" not in affinities.get(column, "").upper():
continue
if not str(df[column].dtype).startswith("float"):
continue
non_null = df[column].dropna()
if non_null.empty or not (non_null == non_null.round()).all():
continue
df[column] = df[column].astype("Int64")
return df
def _sample_values(
con: duckdb.DuckDBPyConnection, relation: str, column: str
) -> list[str]:
"""A few distinct non-null sample values for a column, as strings (advisory).
Ordered so the profile (and the cached card built from it) is deterministic
across calls — an unordered ``DISTINCT ... LIMIT`` returns rows in an
unstable order. ``relation`` is a DuckDB relation name (the registered df).
"""
col = _quote_ident(column)
rows = con.execute(
f"SELECT DISTINCT {col} FROM {_quote_ident(relation)} "
f"WHERE {col} IS NOT NULL ORDER BY {col} LIMIT {_SAMPLE_LIMIT}"
).fetchall()
return [str(row[0]) for row in rows]
def profile_table(source: str | Path, table: str = "data") -> list[ColumnProfile]:
"""Profile a SQLite table with DuckDB ``SUMMARIZE`` (deterministic, no model).
Reads per column: min/max (as strings), null_pct (0.0–100.0), approx distinct
count, a few distinct sample values, plus both the declared SQLite affinity and
the DuckDB sniffed type. ``source`` is a path to the ``.sqlite`` file.
The table is read through the stdlib ``sqlite3`` driver into a pandas
DataFrame, then summarized in-memory by DuckDB — no DuckDB ``sqlite``
extension and no network ``INSTALL`` (stays local/offline per ADR 0009).
"""
_validate_table_name(table)
affinities = _sqlite_affinities(source, table)
df = _read_table_df(source, table, affinities)
con = duckdb.connect()
try:
con.register("source_df", df)
summary = con.execute("SUMMARIZE source_df").fetchall()
summary_cols = [desc[0] for desc in con.description]
profiles: list[ColumnProfile] = []
for raw in summary:
row = dict(zip(summary_cols, raw))
name = row["column_name"]
null_pct = row.get("null_percentage")
cardinality = row.get("approx_unique")
profiles.append(
ColumnProfile(
name=name,
safe_name=name,
sqlite_type=affinities.get(name, ""),
duckdb_type=str(row.get("column_type", "")),
null_pct=float(null_pct) if null_pct is not None else 0.0,
cardinality=int(cardinality) if cardinality is not None else 0,
min=None if row.get("min") is None else str(row["min"]),
max=None if row.get("max") is None else str(row["max"]),
sample_values=_sample_values(con, "source_df", name),
)
)
return profiles
finally:
con.close()
def _is_int_like(profile: ColumnProfile) -> bool:
"""True if the column reads as an integer (SQLite INTEGER or DuckDB *INT)."""
sqlite_type = profile.sqlite_type.upper()
duckdb_type = profile.duckdb_type.upper()
return "INT" in sqlite_type or "INT" in duckdb_type
def _fallback_describe(
profiles: list[ColumnProfile], table_description_seed: str
) -> dict[str, dict[str, str]]:
"""Deterministic, model-free describe with the same shape as ``DescribeFn``.
Templates a plain-language description and a gotcha per column from the
profile alone — e.g. a low-cardinality integer column is flagged as a likely
coded/categorical value. Stable across calls (no model, no randomness).
"""
out: dict[str, dict[str, str]] = {}
for profile in profiles:
int_like = _is_int_like(profile)
low_card = profile.cardinality <= _LOW_CARDINALITY
if int_like and low_card:
description = (
f"Integer column '{profile.safe_name}' with only "
f"{profile.cardinality} distinct values — looks like a "
f"coded/categorical value."
)
gotcha = (
"Low-cardinality integer: likely a coded/categorical value "
"(verify the code meanings with SAMPLE)."
)
elif int_like:
description = f"Integer column '{profile.safe_name}'."
gotcha = ""
elif (
"DOUBLE" in profile.duckdb_type.upper()
or "REAL" in profile.sqlite_type.upper()
):
description = f"Numeric column '{profile.safe_name}'."
gotcha = ""
else:
description = (
f"Text column '{profile.safe_name}' with "
f"{profile.cardinality} distinct values."
)
gotcha = ""
out[profile.safe_name] = {"description": description, "gotcha": gotcha}
seed = table_description_seed.strip()
table_text = (
f"Dataset: {seed}." if seed else "Dataset profiled from the uploaded CSV."
)
out[_TABLE_DESCRIBE_KEY] = {"description": table_text, "gotcha": ""}
return out
def _resolve_column_key(
key: str, by_safe: dict[str, str], by_original: dict[str, str]
) -> str:
"""Resolve a column hint/edit key to its safe_name, accepting either form.
A key may be the original CSV header (``"Order Status"``) OR the safe_name
(``"order_status"``). Raises ``KeyError`` if it matches neither, so a typo /
stale header is never silently dropped.
"""
if key in by_safe:
return key
if key in by_original:
return by_original[key]
raise KeyError(f"Unknown column key {key!r}: not an original header or safe_name.")
def propose_data_card(
source: str | Path,
*,
db_id: str,
table: str = "data",
describe_fn: DescribeFn | None = None,
user_hint: str = "",
column_hints: dict[str, str] | None = None,
column_mapping: dict[str, str] | None = None,
) -> DataCard:
"""Assemble a ``DataCard`` of editable assumptions. Runs ONCE on upload.
Combines ``profile_table(...)`` with a description layer (``describe_fn`` if
supplied, else the deterministic ``_fallback_describe``) and the user's hints.
Does NOT call any model unless ``describe_fn`` is supplied (so the default
path is fully testable without an LLM and stays off the per-query latency path).
``column_mapping`` is the original→safe header map (e.g. from
``IngestResult.column_mapping``). When supplied, each profile records its
ORIGINAL header as ``name`` so the card round-trips back to source columns, and
``column_hints`` keys may be the original header OR the safe_name. A hint key
matching neither raises ``KeyError`` (never silently dropped).
"""
_validate_table_name(table)
profiles = profile_table(source, table)
mapping = column_mapping or {}
safe_to_original = {safe: original for original, safe in mapping.items()}
if safe_to_original:
profiles = [
profile.model_copy(
update={"name": safe_to_original.get(profile.safe_name, profile.name)}
)
for profile in profiles
]
seed = user_hint or db_id
describe = describe_fn or _fallback_describe
described = describe(profiles, seed)
by_safe = {profile.safe_name: profile for profile in profiles}
by_original = {profile.name: profile.safe_name for profile in profiles}
resolved_hints: dict[str, str] = {}
for key, value in (column_hints or {}).items():
resolved_hints[_resolve_column_key(key, by_safe, by_original)] = value
columns: list[ColumnCard] = []
for profile in profiles:
layer = described.get(profile.safe_name, {})
columns.append(
ColumnCard(
profile=profile,
description=layer.get("description", ""),
gotcha=layer.get("gotcha", ""),
user_hint=resolved_hints.get(profile.safe_name, ""),
)
)
table_layer = described.get(_TABLE_DESCRIBE_KEY, {})
row_count = _row_count(source, table)
return DataCard(
db_id=db_id,
table=table,
table_description=table_layer.get("description", ""),
user_hint=user_hint,
row_count=row_count,
columns=columns,
)
def _row_count(source: str | Path, table: str) -> int:
"""Count rows in the ingested table (read-only).
Fail-closed: validates the table name against the env's identifier contract
and quotes the identifier so an injection attempt raises rather than executes.
"""
import sqlite3
_validate_table_name(table)
conn = sqlite3.connect(f"file:{Path(source)}?mode=ro", uri=True)
try:
return int(
conn.execute(f"SELECT COUNT(*) FROM {_quote_ident(table)}").fetchone()[0]
)
finally:
conn.close()
def apply_card_edits(card: DataCard, edits: dict) -> DataCard:
"""Confirm step. Return a NEW ``DataCard`` with user edits merged over proposals.
User values win. The deterministic ``profile`` is never overwritten; unknown
*top-level* keys are silently ignored. Column edit keys may be the ORIGINAL
header OR the safe_name; a column key matching NEITHER raises ``KeyError`` (a
stale/typo'd header is never silently dropped). ``edits`` shape::
{"table_description": str, "user_hint": str,
"columns": {<original-header-or-safe_name>:
{"description": str, "gotcha": str, "user_hint": str}}}
"""
updated = card.model_copy(deep=True)
if isinstance(edits.get("table_description"), str):
updated.table_description = edits["table_description"]
if isinstance(edits.get("user_hint"), str):
updated.user_hint = edits["user_hint"]
column_edits = edits.get("columns")
if isinstance(column_edits, dict):
by_safe = {col.profile.safe_name: col for col in updated.columns}
by_original = {
col.profile.name: col.profile.safe_name for col in updated.columns
}
for key, column_edit in column_edits.items():
safe_name = _resolve_column_key(key, by_safe, by_original)
target = by_safe[safe_name]
if not isinstance(column_edit, dict):
continue # malformed edit payload — ignore the value, key was valid
for editable in ("description", "gotcha", "user_hint"):
value = column_edit.get(editable)
if isinstance(value, str):
setattr(target, editable, value)
return updated
def render_data_context(card: DataCard) -> str:
"""PURE helper: render the card as a short fenced advisory 'Data context' block.
Does NOT touch ``get_system_prompt`` or the trained prompt path — F004 wires
injection later (ADR 0007 caveat: the trained policy never saw this block).
Marks the block as user-provided hints that may be incomplete or wrong.
"""
lines: list[str] = [
"```data-context",
f"Data context for table '{card.table}' "
"(user-provided hints — may be incomplete or wrong; "
"verify with DESCRIBE/SAMPLE):",
]
if card.table_description:
lines.append(f"- {card.table_description}")
if card.user_hint:
lines.append(f"- User hint: {card.user_hint}")
for column in card.columns:
detail = column.user_hint or column.description
if not detail and not column.gotcha:
continue
parts = [f"- {column.profile.safe_name}"]
if detail:
parts.append(detail)
if column.gotcha:
parts.append(f"(gotcha: {column.gotcha})")
lines.append(": ".join(parts[:2]) + (f" {parts[2]}" if len(parts) > 2 else ""))
lines.append("```")
return "\n".join(lines)
def _sidecar_path(root: str | Path, db_id: str) -> Path:
return Path(root) / db_id / f"{db_id}.datacard.json"
def save_data_card(card: DataCard, root: str | Path) -> Path:
"""Persist the card as a JSON sidecar at ``<root>/<db_id>/<db_id>.datacard.json``.
Uses ``model_dump_json`` for a lossless round-trip. Returns the sidecar path.
"""
path = _sidecar_path(root, card.db_id)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(card.model_dump_json(), encoding="utf-8")
return path
def load_data_card(root: str | Path, db_id: str) -> DataCard | None:
"""Load a cached card sidecar if present (``model_validate_json``), else None.
Missing-file behavior is explicit: a non-existent sidecar returns ``None``
(never raises). After a re-ingest with ``if_exists="replace"`` invalidates the
sidecar, this returns ``None`` so a stale card is never served.
"""
path = _sidecar_path(root, db_id)
if not path.exists():
return None
return DataCard.model_validate_json(path.read_text(encoding="utf-8"))