Praneshrajan15's picture
Deploy DataForge playground API
eed1cab verified
"""Detector for functional-dependency violations in tabular data.
Given a declared functional dependency X -> Y (where X is a set of
determinant columns and Y is a dependent column), this detector groups
rows by X and flags any group where Y takes more than one distinct value.
Week 1 scope: declared FDs only (from the schema YAML). Automatic FD
mining is deferred to a later milestone.
The detector is **pure**: no LLM calls, no I/O, no side effects.
"""
from __future__ import annotations
from dataforge.detectors.base import Issue, Schema, Severity
from dataforge.table import TableLike, cell_value, column_names, row_count
class FDViolationDetector:
"""Detects rows that violate declared functional dependencies.
For each FD ``determinant -> dependent`` in the schema, groups the
DataFrame by the determinant columns and checks that each group has
exactly one unique value in the dependent column. All rows in a
violating group are flagged.
Requires a ``Schema`` with ``functional_dependencies`` to do anything;
returns an empty list if no schema or no FDs are provided.
Example:
>>> import pandas as pd
>>> from dataforge.detectors.base import FunctionalDependency, Schema
>>> detector = FDViolationDetector()
>>> df = pd.DataFrame({
... "zip": ["10001", "10001", "90210"],
... "city": ["NY", "Manhattan", "LA"],
... })
>>> schema = Schema(functional_dependencies=[
... FunctionalDependency(determinant=["zip"], dependent="city"),
... ])
>>> issues = detector.detect(df, schema)
>>> len(issues)
2
"""
def detect(self, df: TableLike, schema: Schema | None = None) -> list[Issue]:
"""Detect FD-violation issues in the DataFrame.
Args:
df: The input DataFrame to analyze.
schema: Schema containing declared functional dependencies.
If None or no FDs declared, returns an empty list.
Returns:
A list of Issue objects for rows violating declared FDs.
"""
if schema is None or not schema.functional_dependencies:
return []
issues: list[Issue] = []
for fd in schema.functional_dependencies:
fd_issues = self._check_fd(df, fd.determinant, fd.dependent)
issues.extend(fd_issues)
return issues
def _check_fd(
self,
df: TableLike,
determinant: tuple[str, ...],
dependent: str,
) -> list[Issue]:
"""Check a single functional dependency X -> Y.
Args:
df: The DataFrame to check.
determinant: List of determinant column names (X).
dependent: The dependent column name (Y).
Returns:
Issues for all rows in groups that violate the FD.
"""
determinant_columns = list(determinant)
# Verify all columns exist in the DataFrame.
all_cols = [*determinant_columns, dependent]
available_columns = set(column_names(df))
for col in all_cols:
if col not in available_columns:
return []
groups: dict[tuple[str, ...], list[int]] = {}
for row in range(row_count(df)):
group_key = tuple(cell_value(df, row, column) for column in determinant_columns)
if any(value == "" for value in group_key):
continue
groups.setdefault(group_key, []).append(row)
if not groups:
return []
issues: list[Issue] = []
for group_key, row_indices in groups.items():
unique_deps: list[str] = []
for row in row_indices:
value = cell_value(df, row, dependent)
if value == "" or value in unique_deps:
continue
unique_deps.append(value)
if len(unique_deps) <= 1:
continue
det_desc = self._format_determinant(determinant, group_key)
unique_str = ", ".join(repr(str(v)) for v in unique_deps)
for idx in row_indices:
actual_val = cell_value(df, idx, dependent)
reason = (
f"Functional dependency {determinant} -> {dependent} "
f"violated: {det_desc} maps to multiple values: "
f"{{{unique_str}}}"
)
issues.append(
Issue(
row=int(idx),
column=dependent,
issue_type="fd_violation",
severity=Severity.UNSAFE,
confidence=0.95,
actual=actual_val,
reason=reason,
)
)
return issues
@staticmethod
def _format_determinant(determinant: tuple[str, ...], group_key: object) -> str:
"""Format the determinant key for human-readable output.
Args:
determinant: List of determinant column names.
group_key: The group key (scalar or tuple).
Returns:
A formatted string like ``zip_code='10001'``.
"""
if len(determinant) == 1:
return f"{determinant[0]}='{group_key}'"
# Composite key: group_key is a tuple.
if isinstance(group_key, tuple):
parts = [f"{col}='{val}'" for col, val in zip(determinant, group_key, strict=True)]
return ", ".join(parts)
return f"{determinant}='{group_key}'"