"""Drift engine: four atomic, idempotent DDL operations. Each apply_* function mutates ``conn`` in place inside a DuckDB ``BEGIN; ... COMMIT`` pair and returns a machine-readable changelog string. Humans consume the string via the :class:`read_changelog` tool; the rubric consults a separate drift-acknowledgement flag on the runtime state, not the string itself. Idempotency is enforced via a post-condition schema probe: once the drift has been applied (the target column / enum value is in the expected post-state), a second call short-circuits with the same changelog string. This matters because the environment's drift-trigger check runs every step and needs to be safe to retry. """ from __future__ import annotations from typing import TYPE_CHECKING, Any if TYPE_CHECKING: import duckdb # DuckDB auto-commits DDL and forbids mixing multi-statement transactions # with schema alterations across commit boundaries. Each drift operation # therefore executes its statements sequentially on the default # auto-commit connection; individual DML statements (UPDATEs) are # internally atomic at the statement level, which is sufficient for the # fixture mutation the env needs. If a drift operation raises mid-way we # tear down and re-seed the DuckDB via ScenarioSpec.materialize — there's # no long-lived on-disk state to roll back. 
def _quote_ident(name: str) -> str:
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled so a name containing ``"`` cannot
    terminate the identifier early.
    """
    return '"' + str(name).replace('"', '""') + '"'


def _sql_literal(value: Any) -> str:
    """Return ``value`` as a single-quoted SQL string literal.

    Embedded single quotes are doubled so a value such as ``won't_fix``
    yields valid SQL instead of a syntax error.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _table_columns(conn: duckdb.DuckDBPyConnection, table: str) -> list[str]:
    """Return the column names of ``table`` in the order the PRAGMA reports them."""
    rows = conn.execute(f"PRAGMA table_info({_sql_literal(table)})").fetchall()
    # PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)
    return [row[1] for row in rows]


def _table_exists(conn: duckdb.DuckDBPyConnection, table: str) -> bool:
    """Return True when ``information_schema.tables`` lists a table named ``table``."""
    row = conn.execute(
        "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?", [table]
    ).fetchone()
    return bool(row and row[0])


# =============================================================================
# Column rename
# =============================================================================


def apply_column_rename(conn: duckdb.DuckDBPyConnection, payload: dict[str, Any]) -> str:
    """Rename a column. Payload: ``{"table": str, "old": str, "new": str}``.

    Returns:
        ``rename:<table>.<old>-><new>`` after renaming, or
        ``rename_already_applied:<table>.<old>-><new>`` when ``old`` is gone
        and ``new`` is already present.

    Raises:
        ValueError: neither ``old`` nor ``new`` exists on the table.
    """
    table = payload["table"]
    old = payload["old"]
    new = payload["new"]

    cols = _table_columns(conn, table)
    if old not in cols:
        if new in cols:
            return f"rename_already_applied:{table}.{old}->{new}"
        raise ValueError(f"column_rename: {table}.{old} missing (cols={cols})")

    conn.execute(
        f"ALTER TABLE {_quote_ident(table)} "
        f"RENAME COLUMN {_quote_ident(old)} TO {_quote_ident(new)}"
    )
    return f"rename:{table}.{old}->{new}"


# =============================================================================
# Date format change (iso_string → epoch_ms)
# =============================================================================


def apply_date_format_change(conn: duckdb.DuckDBPyConnection, payload: dict[str, Any]) -> str:
    """Convert a column from ISO strings to epoch milliseconds (BIGINT).

    Payload: ``{"table": str, "col": str, "from": "iso_string", "to": "epoch_ms"}``.
    Only the one direction is supported for now; the payload still carries
    from/to for forward-compatibility and audit. Both default to the
    supported direction when omitted.

    Returns:
        ``date_format:<table>.<col>:iso_string->epoch_ms`` after converting,
        or ``date_format_already_applied:<table>.<col>`` when the column's
        reported type already contains ``INT``.

    Raises:
        NotImplementedError: any from/to pair other than iso_string→epoch_ms.
        ValueError: the column is not listed in ``information_schema.columns``.
    """
    table = payload["table"]
    col = payload["col"]
    from_fmt = payload.get("from", "iso_string")
    to_fmt = payload.get("to", "epoch_ms")
    if (from_fmt, to_fmt) != ("iso_string", "epoch_ms"):
        raise NotImplementedError(
            f"date_format_change only supports iso_string→epoch_ms, got {from_fmt}→{to_fmt}"
        )

    type_row = conn.execute(
        "SELECT data_type FROM information_schema.columns WHERE table_name = ? AND column_name = ?",
        [table, col],
    ).fetchone()
    if type_row is None:
        # The column list is only needed for the error message, so it is
        # fetched here rather than on every call.
        cols = _table_columns(conn, table)
        raise ValueError(f"date_format_change: {table}.{col} missing (cols={cols})")

    # Idempotent: once the column has an integer type, consider it applied.
    # A substring test on "INT" covers BIGINT as well as the other integer
    # widths. NOTE(review): it also matches INTERVAL — confirm that is never
    # a valid source type for this drift.
    if "INT" in type_row[0].upper():
        return f"date_format_already_applied:{table}.{col}"

    q_table = _quote_ident(table)
    q_col = _quote_ident(col)
    q_tmp = _quote_ident(f"{col}_epoch_ms")

    # Build the converted values in a scratch column, then swap it into place
    # under the original name.
    conn.execute(f"ALTER TABLE {q_table} ADD COLUMN {q_tmp} BIGINT")
    conn.execute(
        f"UPDATE {q_table} SET {q_tmp} = "
        f"CAST(EXTRACT(EPOCH FROM CAST({q_col} AS TIMESTAMP)) * 1000 AS BIGINT)"
    )
    conn.execute(f"ALTER TABLE {q_table} DROP COLUMN {q_col}")
    conn.execute(f"ALTER TABLE {q_table} RENAME COLUMN {q_tmp} TO {q_col}")
    return f"date_format:{table}.{col}:iso_string->epoch_ms"


# =============================================================================
# Enum rule change (split `old_value` into N new values)
# =============================================================================


def apply_enum_rule_change(conn: duckdb.DuckDBPyConnection, payload: dict[str, Any]) -> str:
    """Split one enum value into several.

    Payload: ``{"table": str, "col": str, "old_value": str, "new_values": list[str]}``.
    Rows holding ``old_value`` are re-distributed deterministically into
    ``new_values`` (``rowid`` modulo ``len(new_values)``) so the split is
    reproducible.

    Returns:
        ``enum_rule:<table>.<col>:<old_value>-><v1>+<v2>+...`` after the
        split, or ``enum_rule_already_applied:...`` when no row holds
        ``old_value`` and at least one row holds one of ``new_values``.

    Raises:
        ValueError: ``new_values`` is empty.
    """
    table = payload["table"]
    col = payload["col"]
    old_value = payload["old_value"]
    new_values: list[str] = list(payload["new_values"])
    if not new_values:
        raise ValueError("enum_rule_change: new_values must be non-empty")

    q_table = _quote_ident(table)
    q_col = _quote_ident(col)

    count_row = conn.execute(
        f"SELECT COUNT(*) FROM {q_table} WHERE {q_col} = ?", [old_value]
    ).fetchone()
    count_old = count_row[0] if count_row is not None else 0

    # Idempotent: if old_value has already been drained AND any of the
    # new_values is present, treat as applied.
    if count_old == 0:
        placeholders = ",".join("?" * len(new_values))
        has_new_row = conn.execute(
            f"SELECT COUNT(*) FROM {q_table} WHERE {q_col} IN ({placeholders})",
            new_values,
        ).fetchone()
        has_new = has_new_row[0] if has_new_row is not None else 0
        if has_new > 0:
            # NOTE(review): this string embeds the list repr of new_values,
            # whereas the applied string below joins them with "+". Kept
            # as-is because callers may match on it.
            return f"enum_rule_already_applied:{table}.{col}:{old_value}->{new_values}"

    # Deterministic split by rowid mod N. The values are embedded as escaped
    # literals so that one containing a single quote still yields valid SQL.
    bucket_count = len(new_values)
    case_branches = " ".join(
        f"WHEN mod(rid, {bucket_count}) = {i} THEN {_sql_literal(v)}"
        for i, v in enumerate(new_values)
    )
    conn.execute(
        f"CREATE TEMP TABLE _enum_remap AS "
        f"SELECT rowid AS rid, "
        f"CASE {case_branches} END AS new_val "
        f"FROM {q_table} WHERE {q_col} = ?",
        [old_value],
    )
    try:
        conn.execute(
            f"UPDATE {q_table} SET {q_col} = _enum_remap.new_val "
            f"FROM _enum_remap WHERE _enum_remap.rid = {q_table}.rowid"
        )
    finally:
        # Always remove the scratch table; a leftover would make every
        # retry fail at CREATE TEMP TABLE.
        conn.execute("DROP TABLE IF EXISTS _enum_remap")
    return f"enum_rule:{table}.{col}:{old_value}->{'+'.join(new_values)}"


# =============================================================================
# Field deprecation (replace inline string col with FK lookup)
# =============================================================================


def apply_field_deprecation(conn: duckdb.DuckDBPyConnection, payload: dict[str, Any]) -> str:
    """Replace an inline string column with an id column pointing at a lookup table.

    Payload: ``{"orig": (table, col), "lookup": (table, id_col, name_col)}``.

    - Creates the lookup table (if missing) and seeds it with distinct
      non-NULL values observed on ``orig.col`` that it does not already hold.
    - Adds ``orig.<lookup_table>_<lookup_id_col>`` (BIGINT) with a FK-style
      backfill; no foreign-key constraint is declared.
    - Drops ``orig.col``.

    Returns:
        ``field_deprecation:<orig_table>.<orig_col>->...`` after applying, or
        ``field_deprecation_already_applied:<orig_table>.<orig_col>`` when
        ``orig.col`` is gone and the new id column is present.

    Raises:
        ValueError: ``orig.col`` is missing and the new id column is absent.
    """
    orig_table, orig_col = payload["orig"]
    lookup_table, lookup_id_col, lookup_name_col = payload["lookup"]
    new_fk_col = f"{lookup_table}_{lookup_id_col}"  # e.g. "users_id"

    orig_cols = _table_columns(conn, orig_table)
    if orig_col not in orig_cols:
        if new_fk_col in orig_cols:
            return f"field_deprecation_already_applied:{orig_table}.{orig_col}"
        raise ValueError(f"field_deprecation: {orig_table}.{orig_col} missing (cols={orig_cols})")

    q_orig_table = _quote_ident(orig_table)
    q_orig_col = _quote_ident(orig_col)
    q_lookup = _quote_ident(lookup_table)
    q_id = _quote_ident(lookup_id_col)
    q_name = _quote_ident(lookup_name_col)
    q_fk = _quote_ident(new_fk_col)

    if not _table_exists(conn, lookup_table):
        conn.execute(
            f"CREATE TABLE {q_lookup} ("
            f" {q_id} BIGINT PRIMARY KEY,"
            f" {q_name} VARCHAR"
            ");"
        )

    # New ids continue after the current maximum. The NOT IN subquery filters
    # out NULL names: with a NULL in the list, ``v NOT IN (...)`` is never
    # true and no value would be seeded at all.
    conn.execute(
        f"INSERT INTO {q_lookup} ({q_id}, {q_name}) "
        f"SELECT ROW_NUMBER() OVER (ORDER BY v) + "
        f"COALESCE((SELECT MAX({q_id}) FROM {q_lookup}), 0), v "
        f"FROM (SELECT DISTINCT {q_orig_col} AS v FROM {q_orig_table}) "
        f"WHERE v IS NOT NULL "
        f"  AND v NOT IN (SELECT {q_name} FROM {q_lookup} WHERE {q_name} IS NOT NULL);"
    )

    conn.execute(f"ALTER TABLE {q_orig_table} ADD COLUMN {q_fk} BIGINT")
    conn.execute(
        f"UPDATE {q_orig_table} SET {q_fk} = lookup.{q_id} "
        f"FROM {q_lookup} lookup "
        f"WHERE lookup.{q_name} = {q_orig_table}.{q_orig_col}"
    )
    conn.execute(f"ALTER TABLE {q_orig_table} DROP COLUMN {q_orig_col}")
    return (
        f"field_deprecation:{orig_table}.{orig_col}->"
        f"{orig_table}.{new_fk_col}→{lookup_table}.{lookup_name_col}"
    )


# =============================================================================
# Dispatcher
# =============================================================================

# Maps a drift kind to the function that applies it.
DRIFT_HANDLERS = {
    "column_rename": apply_column_rename,
    "date_format": apply_date_format_change,
    "enum_rule": apply_enum_rule_change,
    "field_deprecation": apply_field_deprecation,
}


def apply_drift(conn: duckdb.DuckDBPyConnection, kind: str, payload: dict[str, Any]) -> str:
    """Apply the drift operation registered under ``kind`` and return its changelog string.

    Raises:
        ValueError: ``kind`` is not a key of ``DRIFT_HANDLERS``.
    """
    if kind not in DRIFT_HANDLERS:
        raise ValueError(f"unknown drift kind={kind!r}; known: {sorted(DRIFT_HANDLERS)}")
    return DRIFT_HANDLERS[kind](conn, payload)


__all__ = [
    "DRIFT_HANDLERS",
    "apply_column_rename",
    "apply_date_format_change",
    "apply_drift",
    "apply_enum_rule_change",
    "apply_field_deprecation",
]