"""Core models and protocol for the DataForge detector subsystem."""
from __future__ import annotations
import enum
from typing import Literal, Protocol
from pydantic import BaseModel, Field
from dataforge.table import TableLike
from dataforge.verifier.schema import (
AggregateDependency,
DomainBound,
FunctionalDependency,
Schema,
)
# Public API of this module. Besides the names defined here, it re-exports
# the schema types imported from dataforge.verifier.schema
# (AggregateDependency, DomainBound, FunctionalDependency, Schema).
__all__ = [
    "AggregateDependency",
    "Detector",
    "DomainBound",
    "FunctionalDependency",
    "Issue",
    "IssueTypeLiteral",
    "Schema",
    "Severity",
]
class Severity(enum.Enum):
    """Three-tier severity for data-quality issues.

    Ordering: SAFE < REVIEW < UNSAFE (higher = more severe).

    - SAFE: likely benign; can be auto-applied in bulk without human review.
    - REVIEW: ambiguous; should appear in the profile table for human triage.
    - UNSAFE: structural error; blocks automated repair without explicit approval.

    Only the four ordering operators are defined here; equality and hashing
    are the ones inherited from ``enum.Enum``. Comparing against anything
    that is not a ``Severity`` returns ``NotImplemented`` from the operator
    method.

    See DECISIONS.md entry "Issue severity tiers" for the rationale behind
    choosing exactly 3 levels.
    """

    SAFE = "safe"
    REVIEW = "review"
    UNSAFE = "unsafe"

    def _rank(self) -> int:
        """Return this member's position on the severity scale (0 = least severe)."""
        # Single source of truth for the ordering: every comparison operator
        # below goes through this table, so adding a tier is a one-line change.
        # The table is built at call time because the members do not exist yet
        # while the class body is executing.
        return {Severity.SAFE: 0, Severity.REVIEW: 1, Severity.UNSAFE: 2}[self]

    def __lt__(self, other: object) -> bool:
        """Enable ordering so SAFE < REVIEW < UNSAFE."""
        if not isinstance(other, Severity):
            return NotImplemented
        return self._rank() < other._rank()

    def __le__(self, other: object) -> bool:
        """Enable ordering so SAFE <= REVIEW <= UNSAFE."""
        if not isinstance(other, Severity):
            return NotImplemented
        return self._rank() <= other._rank()

    def __gt__(self, other: object) -> bool:
        """Enable ordering so UNSAFE > REVIEW > SAFE."""
        if not isinstance(other, Severity):
            return NotImplemented
        return self._rank() > other._rank()

    def __ge__(self, other: object) -> bool:
        """Enable ordering so UNSAFE >= REVIEW >= SAFE."""
        if not isinstance(other, Severity):
            return NotImplemented
        return self._rank() >= other._rank()
# Closed vocabulary of issue types; the `issue_type` field of `Issue` is
# annotated with this alias. Extend this Literal as new detectors ship.
IssueTypeLiteral = Literal["type_mismatch", "decimal_shift", "fd_violation"]
class Issue(BaseModel):
    """One data-quality finding, pinned to a single (row, column) cell.

    The model is declared frozen through ``model_config``.

    Args:
        row: Zero-indexed row number in the DataFrame (must be >= 0).
        column: Name of the column where the issue was detected (non-empty).
        issue_type: Machine-readable issue category (closed vocabulary).
        severity: Three-tier severity classification.
        confidence: Detector's confidence in the finding (0.0 to 1.0).
        expected: What the value should be, if known; None for
            detection-only findings.
        actual: The actual value found in the cell.
        reason: Human-readable explanation of the issue (non-empty).

    Example:
        >>> issue = Issue(
        ...     row=3, column="price", issue_type="decimal_shift",
        ...     severity=Severity.REVIEW, confidence=0.92,
        ...     expected="102.0", actual="1020.0",
        ...     reason="Value 1020.0 appears to be ~10x the typical value",
        ... )
    """

    # Model-wide configuration, kept next to the class header so it is seen
    # before the field list.
    model_config = {"frozen": True}

    # Location of the finding.
    row: int = Field(ge=0, description="Zero-indexed row number")
    column: str = Field(min_length=1, description="Column name")

    # Classification of the finding.
    issue_type: IssueTypeLiteral = Field(description="Machine-readable issue category")
    severity: Severity = Field(description="Three-tier severity")
    confidence: float = Field(ge=0.0, le=1.0, description="Detector confidence")

    # Evidence: cell values are carried as strings.
    expected: str | None = Field(default=None, description="Expected value (if known)")
    actual: str = Field(description="Actual value found in the cell")
    reason: str = Field(min_length=1, description="Human-readable explanation")
class Detector(Protocol):
    """Structural interface that every detector has to satisfy.

    Any class exposing a compatible ``detect`` method conforms; no
    inheritance from this protocol is needed. The contract is that a
    detector behaves as a pure function over tabular data: it is given a
    table plus an optional Schema and hands back a list of Issue objects.
    No LLM calls, no disk I/O, no side effects.

    Example:
        >>> class MyDetector:
        ...     def detect(
        ...         self, df: TableLike, schema: Schema | None = None
        ...     ) -> list[Issue]:
        ...         return []
    """

    def detect(
        self,
        df: TableLike,
        schema: Schema | None = None,
    ) -> list[Issue]:
        """Find data-quality issues in the given table.

        Args:
            df: The input table to analyze.
            schema: Optional declared schema with column types and
                constraints.

        Returns:
            A list of Issue objects describing the detected anomalies.
        """
        ...  # pragma: no cover
|