Spaces:
Running on Zero
Running on Zero
| """Per-dataset advisory ``DataCard`` (F003, Slice S2). | |
| After ingestion, profile the resulting SQLite table deterministically with DuckDB | |
| ``SUMMARIZE`` and assemble a Pydantic ``DataCard`` of *editable assumptions*: the | |
| model-free profile plus a plain-language description/gotcha layer (from an | |
| injectable ``describe_fn``, or a deterministic template fallback when none is | |
| given) plus the user's optional hints. This is the cheap, research-backed accuracy | |
| lever for a small model (ADR 0007 / ADR 0009). | |
| The card is **advisory** — ``DESCRIBE``/``SAMPLE``/``QUERY`` remains the | |
| load-bearing path; the card never replaces them. ``propose_data_card`` runs once | |
| on upload (off the per-query latency path); ``apply_card_edits`` is the confirm | |
| step (user edits win, the deterministic profile is never overwritten); | |
| ``render_data_context`` is a PURE helper F004 will use to inject the card later — | |
| it does NOT touch ``get_system_prompt`` or the trained prompt path here. | |
| ``DataCard`` lives in THIS module, NOT root ``models.py``: it is ``serve``-only and | |
| must not widen the lightweight contract that ``sql_environment`` imports. The | |
| module is dependency-light (no ``trl``/``torch``/``transformers`` on import). | |
| """ | |
| from collections.abc import Callable | |
| from pathlib import Path | |
| import duckdb | |
| from pydantic import BaseModel, Field, field_validator | |
| try: | |
| from .sql_ident import is_valid_identifier, quote_ident | |
| except ImportError: # pragma: no cover - flat-layout / direct-run fallback | |
| from sql_ident import is_valid_identifier, quote_ident # type: ignore[no-redef] | |
| # How many distinct sample values to surface per column (advisory only). | |
| _SAMPLE_LIMIT = 5 | |
| # A column at or below this distinct count is treated as a likely coded/categorical | |
| # value by the deterministic fallback describer. | |
| _LOW_CARDINALITY = 10 | |
| # Module-private alias onto the shared, single-source identifier quoter | |
| # (``server/sql_ident.py``). Kept so existing call sites (and any local readers) | |
| # can keep using the by-convention-private name while the logic lives once. | |
| _quote_ident = quote_ident | |
| def _validate_table_name(table: str) -> str: | |
| """Fail-closed validation of a public-boundary table name. | |
| Raises ``ValueError`` unless ``table`` matches ``^[A-Za-z0-9_]+$`` (the env's | |
| identifier contract), so injection attempts via the table argument never reach | |
| SQL execution. | |
| """ | |
| if not is_valid_identifier(table): | |
| raise ValueError(f"Invalid table name: {table!r}.") | |
| return table | |
| class ColumnProfile(BaseModel): | |
| """Deterministic, model-free profile of one column (DuckDB SUMMARIZE).""" | |
| name: str = Field(..., description="Original (pre-normalization) column name.") | |
| safe_name: str = Field( | |
| ..., description="SQL-safe column name as written to SQLite." | |
| ) | |
| sqlite_type: str = Field( | |
| ..., description="Declared SQLite affinity, e.g. INTEGER/REAL/TEXT." | |
| ) | |
| duckdb_type: str = Field( | |
| ..., description="DuckDB sniffed type, e.g. BIGINT/DOUBLE/VARCHAR." | |
| ) | |
| null_pct: float = Field(..., description="Percentage of NULL values, 0.0–100.0.") | |
| cardinality: int = Field( | |
| ..., description="Approximate distinct count (SUMMARIZE approx_unique)." | |
| ) | |
| min: str | None = Field(default=None, description="Min value as string, or None.") | |
| max: str | None = Field(default=None, description="Max value as string, or None.") | |
| sample_values: list[str] = Field( | |
| default_factory=list, | |
| description="A few distinct sample values (advisory).", | |
| ) | |
| class ColumnCard(BaseModel): | |
| """One column's editable assumptions = profile + advisory layer + user hint.""" | |
| profile: ColumnProfile = Field( | |
| ..., description="The deterministic profile (not user-editable)." | |
| ) | |
| description: str = Field( | |
| default="", | |
| description="Plain-language meaning (advisory; model- or template-generated).", | |
| ) | |
| gotcha: str = Field(default="", description="Coded-values/units caveat (advisory).") | |
| user_hint: str = Field( | |
| default="", | |
| description="Optional per-column hint the user supplies/edits (user wins).", | |
| ) | |
| class DataCard(BaseModel): | |
| """Per-dataset advisory context. Round-trips losslessly to/from JSON.""" | |
| db_id: str = Field( | |
| ..., description="Normalized db_id (matches the SQLite dir/file name)." | |
| ) | |
| table: str = Field(..., description="Table name inside the SQLite DB.") | |
| table_description: str = Field( | |
| default="", | |
| description="One-line plain-language description of the dataset (advisory).", | |
| ) | |
| user_hint: str = Field( | |
| default="", | |
| description="The user's optional one-line 'What's in this data?' hint.", | |
| ) | |
| row_count: int = Field(..., description="Number of rows ingested.") | |
| columns: list[ColumnCard] = Field( | |
| ..., description="Per-column editable assumptions, in column order." | |
| ) | |
| advisory: bool = Field( | |
| default=True, | |
| description="Always True — the card is advisory, not ground truth.", | |
| ) | |
| def _advisory_always_true(cls, _value: bool) -> bool: | |
| # The card is advisory by contract; never let a caller flip it off. | |
| return True | |
| # Injection contract for the (optional) LLM description pass. Off the per-query | |
| # latency path; runs once on upload; deterministic fallback when None. | |
| # (profiles, table_description_seed) -> {column_safe_name: {"description", "gotcha"}} | |
| # A "__table__" key MAY be returned to supply the table-level description. | |
| DescribeFn = Callable[[list[ColumnProfile], str], dict[str, dict[str, str]]] | |
| _TABLE_DESCRIBE_KEY = "__table__" | |
| def _sqlite_affinities(source: str | Path, table: str) -> dict[str, str]: | |
| """Map each column to its declared SQLite affinity via PRAGMA table_info.""" | |
| import sqlite3 | |
| conn = sqlite3.connect(f"file:{Path(source)}?mode=ro", uri=True) | |
| try: | |
| rows = conn.execute(f"PRAGMA table_info({_quote_ident(table)})").fetchall() | |
| finally: | |
| conn.close() | |
| # PRAGMA columns: (cid, name, type, notnull, dflt_value, pk) | |
| return {row[1]: (row[2] or "") for row in rows} | |
| def _read_table_df( | |
| source: str | Path, table: str, affinities: dict[str, str] | |
| ) -> "pd.DataFrame": # noqa: F821 | |
| """Read a whole SQLite table into a pandas DataFrame via the stdlib driver. | |
| Stays fully local/offline (ADR 0009): no DuckDB ``sqlite`` extension and no | |
| ``INSTALL``/``LOAD`` network call. DuckDB summarizes the resulting in-memory | |
| DataFrame with no extension at all. | |
| pandas widens a SQLite INTEGER column that contains a NULL to ``float64`` (so | |
| ``1`` reads as ``1.0``), which would make DuckDB report a DOUBLE min/max. Cast | |
| such columns back to nullable ``Int64`` using the declared SQLite affinity so | |
| the profile faithfully reflects the stored integer type (parity with the old | |
| ATTACH path). | |
| """ | |
| import sqlite3 | |
| import pandas as pd | |
| conn = sqlite3.connect(f"file:{Path(source)}?mode=ro", uri=True) | |
| try: | |
| df = pd.read_sql_query(f"SELECT * FROM {_quote_ident(table)}", conn) | |
| finally: | |
| conn.close() | |
| for column in df.columns: | |
| if "INT" not in affinities.get(column, "").upper(): | |
| continue | |
| if not str(df[column].dtype).startswith("float"): | |
| continue | |
| non_null = df[column].dropna() | |
| if non_null.empty or not (non_null == non_null.round()).all(): | |
| continue | |
| df[column] = df[column].astype("Int64") | |
| return df | |
| def _sample_values( | |
| con: duckdb.DuckDBPyConnection, relation: str, column: str | |
| ) -> list[str]: | |
| """A few distinct non-null sample values for a column, as strings (advisory). | |
| Ordered so the profile (and the cached card built from it) is deterministic | |
| across calls — an unordered ``DISTINCT ... LIMIT`` returns rows in an | |
| unstable order. ``relation`` is a DuckDB relation name (the registered df). | |
| """ | |
| col = _quote_ident(column) | |
| rows = con.execute( | |
| f"SELECT DISTINCT {col} FROM {_quote_ident(relation)} " | |
| f"WHERE {col} IS NOT NULL ORDER BY {col} LIMIT {_SAMPLE_LIMIT}" | |
| ).fetchall() | |
| return [str(row[0]) for row in rows] | |
| def profile_table(source: str | Path, table: str = "data") -> list[ColumnProfile]: | |
| """Profile a SQLite table with DuckDB ``SUMMARIZE`` (deterministic, no model). | |
| Reads per column: min/max (as strings), null_pct (0.0–100.0), approx distinct | |
| count, a few distinct sample values, plus both the declared SQLite affinity and | |
| the DuckDB sniffed type. ``source`` is a path to the ``.sqlite`` file. | |
| The table is read through the stdlib ``sqlite3`` driver into a pandas | |
| DataFrame, then summarized in-memory by DuckDB — no DuckDB ``sqlite`` | |
| extension and no network ``INSTALL`` (stays local/offline per ADR 0009). | |
| """ | |
| _validate_table_name(table) | |
| affinities = _sqlite_affinities(source, table) | |
| df = _read_table_df(source, table, affinities) | |
| con = duckdb.connect() | |
| try: | |
| con.register("source_df", df) | |
| summary = con.execute("SUMMARIZE source_df").fetchall() | |
| summary_cols = [desc[0] for desc in con.description] | |
| profiles: list[ColumnProfile] = [] | |
| for raw in summary: | |
| row = dict(zip(summary_cols, raw)) | |
| name = row["column_name"] | |
| null_pct = row.get("null_percentage") | |
| cardinality = row.get("approx_unique") | |
| profiles.append( | |
| ColumnProfile( | |
| name=name, | |
| safe_name=name, | |
| sqlite_type=affinities.get(name, ""), | |
| duckdb_type=str(row.get("column_type", "")), | |
| null_pct=float(null_pct) if null_pct is not None else 0.0, | |
| cardinality=int(cardinality) if cardinality is not None else 0, | |
| min=None if row.get("min") is None else str(row["min"]), | |
| max=None if row.get("max") is None else str(row["max"]), | |
| sample_values=_sample_values(con, "source_df", name), | |
| ) | |
| ) | |
| return profiles | |
| finally: | |
| con.close() | |
| def _is_int_like(profile: ColumnProfile) -> bool: | |
| """True if the column reads as an integer (SQLite INTEGER or DuckDB *INT).""" | |
| sqlite_type = profile.sqlite_type.upper() | |
| duckdb_type = profile.duckdb_type.upper() | |
| return "INT" in sqlite_type or "INT" in duckdb_type | |
| def _fallback_describe( | |
| profiles: list[ColumnProfile], table_description_seed: str | |
| ) -> dict[str, dict[str, str]]: | |
| """Deterministic, model-free describe with the same shape as ``DescribeFn``. | |
| Templates a plain-language description and a gotcha per column from the | |
| profile alone — e.g. a low-cardinality integer column is flagged as a likely | |
| coded/categorical value. Stable across calls (no model, no randomness). | |
| """ | |
| out: dict[str, dict[str, str]] = {} | |
| for profile in profiles: | |
| int_like = _is_int_like(profile) | |
| low_card = profile.cardinality <= _LOW_CARDINALITY | |
| if int_like and low_card: | |
| description = ( | |
| f"Integer column '{profile.safe_name}' with only " | |
| f"{profile.cardinality} distinct values — looks like a " | |
| f"coded/categorical value." | |
| ) | |
| gotcha = ( | |
| "Low-cardinality integer: likely a coded/categorical value " | |
| "(verify the code meanings with SAMPLE)." | |
| ) | |
| elif int_like: | |
| description = f"Integer column '{profile.safe_name}'." | |
| gotcha = "" | |
| elif ( | |
| "DOUBLE" in profile.duckdb_type.upper() | |
| or "REAL" in profile.sqlite_type.upper() | |
| ): | |
| description = f"Numeric column '{profile.safe_name}'." | |
| gotcha = "" | |
| else: | |
| description = ( | |
| f"Text column '{profile.safe_name}' with " | |
| f"{profile.cardinality} distinct values." | |
| ) | |
| gotcha = "" | |
| out[profile.safe_name] = {"description": description, "gotcha": gotcha} | |
| seed = table_description_seed.strip() | |
| table_text = ( | |
| f"Dataset: {seed}." if seed else "Dataset profiled from the uploaded CSV." | |
| ) | |
| out[_TABLE_DESCRIBE_KEY] = {"description": table_text, "gotcha": ""} | |
| return out | |
| def _resolve_column_key( | |
| key: str, by_safe: dict[str, str], by_original: dict[str, str] | |
| ) -> str: | |
| """Resolve a column hint/edit key to its safe_name, accepting either form. | |
| A key may be the original CSV header (``"Order Status"``) OR the safe_name | |
| (``"order_status"``). Raises ``KeyError`` if it matches neither, so a typo / | |
| stale header is never silently dropped. | |
| """ | |
| if key in by_safe: | |
| return key | |
| if key in by_original: | |
| return by_original[key] | |
| raise KeyError(f"Unknown column key {key!r}: not an original header or safe_name.") | |
| def propose_data_card( | |
| source: str | Path, | |
| *, | |
| db_id: str, | |
| table: str = "data", | |
| describe_fn: DescribeFn | None = None, | |
| user_hint: str = "", | |
| column_hints: dict[str, str] | None = None, | |
| column_mapping: dict[str, str] | None = None, | |
| ) -> DataCard: | |
| """Assemble a ``DataCard`` of editable assumptions. Runs ONCE on upload. | |
| Combines ``profile_table(...)`` with a description layer (``describe_fn`` if | |
| supplied, else the deterministic ``_fallback_describe``) and the user's hints. | |
| Does NOT call any model unless ``describe_fn`` is supplied (so the default | |
| path is fully testable without an LLM and stays off the per-query latency path). | |
| ``column_mapping`` is the original→safe header map (e.g. from | |
| ``IngestResult.column_mapping``). When supplied, each profile records its | |
| ORIGINAL header as ``name`` so the card round-trips back to source columns, and | |
| ``column_hints`` keys may be the original header OR the safe_name. A hint key | |
| matching neither raises ``KeyError`` (never silently dropped). | |
| """ | |
| _validate_table_name(table) | |
| profiles = profile_table(source, table) | |
| mapping = column_mapping or {} | |
| safe_to_original = {safe: original for original, safe in mapping.items()} | |
| if safe_to_original: | |
| profiles = [ | |
| profile.model_copy( | |
| update={"name": safe_to_original.get(profile.safe_name, profile.name)} | |
| ) | |
| for profile in profiles | |
| ] | |
| seed = user_hint or db_id | |
| describe = describe_fn or _fallback_describe | |
| described = describe(profiles, seed) | |
| by_safe = {profile.safe_name: profile for profile in profiles} | |
| by_original = {profile.name: profile.safe_name for profile in profiles} | |
| resolved_hints: dict[str, str] = {} | |
| for key, value in (column_hints or {}).items(): | |
| resolved_hints[_resolve_column_key(key, by_safe, by_original)] = value | |
| columns: list[ColumnCard] = [] | |
| for profile in profiles: | |
| layer = described.get(profile.safe_name, {}) | |
| columns.append( | |
| ColumnCard( | |
| profile=profile, | |
| description=layer.get("description", ""), | |
| gotcha=layer.get("gotcha", ""), | |
| user_hint=resolved_hints.get(profile.safe_name, ""), | |
| ) | |
| ) | |
| table_layer = described.get(_TABLE_DESCRIBE_KEY, {}) | |
| row_count = _row_count(source, table) | |
| return DataCard( | |
| db_id=db_id, | |
| table=table, | |
| table_description=table_layer.get("description", ""), | |
| user_hint=user_hint, | |
| row_count=row_count, | |
| columns=columns, | |
| ) | |
| def _row_count(source: str | Path, table: str) -> int: | |
| """Count rows in the ingested table (read-only). | |
| Fail-closed: validates the table name against the env's identifier contract | |
| and quotes the identifier so an injection attempt raises rather than executes. | |
| """ | |
| import sqlite3 | |
| _validate_table_name(table) | |
| conn = sqlite3.connect(f"file:{Path(source)}?mode=ro", uri=True) | |
| try: | |
| return int( | |
| conn.execute(f"SELECT COUNT(*) FROM {_quote_ident(table)}").fetchone()[0] | |
| ) | |
| finally: | |
| conn.close() | |
| def apply_card_edits(card: DataCard, edits: dict) -> DataCard: | |
| """Confirm step. Return a NEW ``DataCard`` with user edits merged over proposals. | |
| User values win. The deterministic ``profile`` is never overwritten; unknown | |
| *top-level* keys are silently ignored. Column edit keys may be the ORIGINAL | |
| header OR the safe_name; a column key matching NEITHER raises ``KeyError`` (a | |
| stale/typo'd header is never silently dropped). ``edits`` shape:: | |
| {"table_description": str, "user_hint": str, | |
| "columns": {<original-header-or-safe_name>: | |
| {"description": str, "gotcha": str, "user_hint": str}}} | |
| """ | |
| updated = card.model_copy(deep=True) | |
| if isinstance(edits.get("table_description"), str): | |
| updated.table_description = edits["table_description"] | |
| if isinstance(edits.get("user_hint"), str): | |
| updated.user_hint = edits["user_hint"] | |
| column_edits = edits.get("columns") | |
| if isinstance(column_edits, dict): | |
| by_safe = {col.profile.safe_name: col for col in updated.columns} | |
| by_original = { | |
| col.profile.name: col.profile.safe_name for col in updated.columns | |
| } | |
| for key, column_edit in column_edits.items(): | |
| safe_name = _resolve_column_key(key, by_safe, by_original) | |
| target = by_safe[safe_name] | |
| if not isinstance(column_edit, dict): | |
| continue # malformed edit payload — ignore the value, key was valid | |
| for editable in ("description", "gotcha", "user_hint"): | |
| value = column_edit.get(editable) | |
| if isinstance(value, str): | |
| setattr(target, editable, value) | |
| return updated | |
| def render_data_context(card: DataCard) -> str: | |
| """PURE helper: render the card as a short fenced advisory 'Data context' block. | |
| Does NOT touch ``get_system_prompt`` or the trained prompt path — F004 wires | |
| injection later (ADR 0007 caveat: the trained policy never saw this block). | |
| Marks the block as user-provided hints that may be incomplete or wrong. | |
| """ | |
| lines: list[str] = [ | |
| "```data-context", | |
| f"Data context for table '{card.table}' " | |
| "(user-provided hints — may be incomplete or wrong; " | |
| "verify with DESCRIBE/SAMPLE):", | |
| ] | |
| if card.table_description: | |
| lines.append(f"- {card.table_description}") | |
| if card.user_hint: | |
| lines.append(f"- User hint: {card.user_hint}") | |
| for column in card.columns: | |
| detail = column.user_hint or column.description | |
| if not detail and not column.gotcha: | |
| continue | |
| parts = [f"- {column.profile.safe_name}"] | |
| if detail: | |
| parts.append(detail) | |
| if column.gotcha: | |
| parts.append(f"(gotcha: {column.gotcha})") | |
| lines.append(": ".join(parts[:2]) + (f" {parts[2]}" if len(parts) > 2 else "")) | |
| lines.append("```") | |
| return "\n".join(lines) | |
| def _sidecar_path(root: str | Path, db_id: str) -> Path: | |
| return Path(root) / db_id / f"{db_id}.datacard.json" | |
| def save_data_card(card: DataCard, root: str | Path) -> Path: | |
| """Persist the card as a JSON sidecar at ``<root>/<db_id>/<db_id>.datacard.json``. | |
| Uses ``model_dump_json`` for a lossless round-trip. Returns the sidecar path. | |
| """ | |
| path = _sidecar_path(root, card.db_id) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| path.write_text(card.model_dump_json(), encoding="utf-8") | |
| return path | |
| def load_data_card(root: str | Path, db_id: str) -> DataCard | None: | |
| """Load a cached card sidecar if present (``model_validate_json``), else None. | |
| Missing-file behavior is explicit: a non-existent sidecar returns ``None`` | |
| (never raises). After a re-ingest with ``if_exists="replace"`` invalidates the | |
| sidecar, this returns ``None`` so a stale card is never served. | |
| """ | |
| path = _sidecar_path(root, db_id) | |
| if not path.exists(): | |
| return None | |
| return DataCard.model_validate_json(path.read_text(encoding="utf-8")) | |