| """Detector for functional-dependency violations in tabular data. |
| |
| Given a declared functional dependency X -> Y (where X is a set of |
| determinant columns and Y is a dependent column), this detector groups |
| rows by X and flags any group where Y takes more than one distinct value. |
| |
| Week 1 scope: declared FDs only (from the schema YAML). Automatic FD |
| mining is deferred to a later milestone. |
| |
| The detector is **pure**: no LLM calls, no I/O, no side effects. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataforge.detectors.base import Issue, Schema, Severity |
| from dataforge.table import TableLike, cell_value, column_names, row_count |
|
|
|
|
| class FDViolationDetector: |
| """Detects rows that violate declared functional dependencies. |
| |
| For each FD ``determinant -> dependent`` in the schema, groups the |
| DataFrame by the determinant columns and checks that each group has |
| exactly one unique value in the dependent column. All rows in a |
| violating group are flagged. |
| |
| Requires a ``Schema`` with ``functional_dependencies`` to do anything; |
| returns an empty list if no schema or no FDs are provided. |
| |
| Example: |
| >>> import pandas as pd |
| >>> from dataforge.detectors.base import FunctionalDependency, Schema |
| >>> detector = FDViolationDetector() |
| >>> df = pd.DataFrame({ |
| ... "zip": ["10001", "10001", "90210"], |
| ... "city": ["NY", "Manhattan", "LA"], |
| ... }) |
| >>> schema = Schema(functional_dependencies=[ |
| ... FunctionalDependency(determinant=["zip"], dependent="city"), |
| ... ]) |
| >>> issues = detector.detect(df, schema) |
| >>> len(issues) |
| 2 |
| """ |
|
|
| def detect(self, df: TableLike, schema: Schema | None = None) -> list[Issue]: |
| """Detect FD-violation issues in the DataFrame. |
| |
| Args: |
| df: The input DataFrame to analyze. |
| schema: Schema containing declared functional dependencies. |
| If None or no FDs declared, returns an empty list. |
| |
| Returns: |
| A list of Issue objects for rows violating declared FDs. |
| """ |
| if schema is None or not schema.functional_dependencies: |
| return [] |
|
|
| issues: list[Issue] = [] |
|
|
| for fd in schema.functional_dependencies: |
| fd_issues = self._check_fd(df, fd.determinant, fd.dependent) |
| issues.extend(fd_issues) |
|
|
| return issues |
|
|
| def _check_fd( |
| self, |
| df: TableLike, |
| determinant: tuple[str, ...], |
| dependent: str, |
| ) -> list[Issue]: |
| """Check a single functional dependency X -> Y. |
| |
| Args: |
| df: The DataFrame to check. |
| determinant: List of determinant column names (X). |
| dependent: The dependent column name (Y). |
| |
| Returns: |
| Issues for all rows in groups that violate the FD. |
| """ |
| determinant_columns = list(determinant) |
|
|
| |
| all_cols = [*determinant_columns, dependent] |
| available_columns = set(column_names(df)) |
| for col in all_cols: |
| if col not in available_columns: |
| return [] |
|
|
| groups: dict[tuple[str, ...], list[int]] = {} |
| for row in range(row_count(df)): |
| group_key = tuple(cell_value(df, row, column) for column in determinant_columns) |
| if any(value == "" for value in group_key): |
| continue |
| groups.setdefault(group_key, []).append(row) |
|
|
| if not groups: |
| return [] |
|
|
| issues: list[Issue] = [] |
| for group_key, row_indices in groups.items(): |
| unique_deps: list[str] = [] |
| for row in row_indices: |
| value = cell_value(df, row, dependent) |
| if value == "" or value in unique_deps: |
| continue |
| unique_deps.append(value) |
| if len(unique_deps) <= 1: |
| continue |
|
|
| det_desc = self._format_determinant(determinant, group_key) |
| unique_str = ", ".join(repr(str(v)) for v in unique_deps) |
|
|
| for idx in row_indices: |
| actual_val = cell_value(df, idx, dependent) |
| reason = ( |
| f"Functional dependency {determinant} -> {dependent} " |
| f"violated: {det_desc} maps to multiple values: " |
| f"{{{unique_str}}}" |
| ) |
| issues.append( |
| Issue( |
| row=int(idx), |
| column=dependent, |
| issue_type="fd_violation", |
| severity=Severity.UNSAFE, |
| confidence=0.95, |
| actual=actual_val, |
| reason=reason, |
| ) |
| ) |
|
|
| return issues |
|
|
| @staticmethod |
| def _format_determinant(determinant: tuple[str, ...], group_key: object) -> str: |
| """Format the determinant key for human-readable output. |
| |
| Args: |
| determinant: List of determinant column names. |
| group_key: The group key (scalar or tuple). |
| |
| Returns: |
| A formatted string like ``zip_code='10001'``. |
| """ |
| if len(determinant) == 1: |
| return f"{determinant[0]}='{group_key}'" |
|
|
| |
| if isinstance(group_key, tuple): |
| parts = [f"{col}='{val}'" for col, val in zip(determinant, group_key, strict=True)] |
| return ", ".join(parts) |
|
|
| return f"{determinant}='{group_key}'" |
|
|