| """Detector for decimal-shift anomalies in numeric columns. |
| |
| Identifies values that are approximately a power-of-10 multiple (10x, 100x, |
| 0.1x, 0.01x, etc.) of the column's median. This is the canonical |
| "decimal point was moved" data-entry error pattern. |
| |
| The detector is **pure**: no LLM calls, no I/O, no side effects. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| from typing import TYPE_CHECKING |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from dataforge.detectors.base import Issue, Schema, Severity |
|
|
| if TYPE_CHECKING: |
| pass |
|
|
| |
# Minimum number of parseable numeric values a column must contain before it
# is analyzed; columns with fewer values are skipped.
_MIN_COLUMN_SIZE = 5


# Exponents k tested as candidate shifts: a value is compared against
# median * 10^k for each k listed here.
_SHIFT_POWERS = (-3, -2, -1, 1, 2, 3)


# Maximum allowed |log10(|value / median|) - k| for a value to be treated as
# shifted by 10^k.
_LOG_TOLERANCE = 0.15
|
|
|
|
| def _try_float(value: object) -> float | None: |
| """Attempt to parse a value as float, returning None on failure. |
| |
| Args: |
| value: Any value (string, int, float, None, …). |
| |
| Returns: |
| The float value or None if parsing fails. |
| """ |
| if value is None or (isinstance(value, float) and math.isnan(value)): |
| return None |
| try: |
| return float(str(value)) |
| except (ValueError, TypeError): |
| return None |
|
|
|
|
class DecimalShiftDetector:
    """Detects values that are power-of-10 multiples of the column distribution.

    For each column, the parseable numeric values are collected, their median
    is computed, and every value is checked to see if ``value / median`` is
    close to 10^k for k in {-3, -2, -1, 1, 2, 3}.  Flagged values get an
    ``expected`` field with the corrected value (``value / 10^k``).

    Requires at least 5 non-null numeric values per column.  Columns whose
    median is zero or near zero are skipped, as are individual values that
    are zero or near zero.

    Example:
        >>> import pandas as pd
        >>> detector = DecimalShiftDetector()
        >>> df = pd.DataFrame({"price": [100.0, 105.0, 98.0, 1020.0, 103.0]})
        >>> issues = detector.detect(df)
        >>> issues[0].row
        3
    """

    # Magnitudes below this are treated as zero (no meaningful ratio exists).
    _EPSILON = 1e-10

    def detect(self, df: pd.DataFrame, schema: Schema | None = None) -> list[Issue]:
        """Detect decimal-shift issues in the DataFrame.

        Args:
            df: The input DataFrame to analyze.
            schema: Optional declared schema (unused by this detector).

        Returns:
            A list of Issue objects for values that appear to be shifted
            by a power of 10 relative to the column distribution.  ``row``
            is the positional row index and ``column`` the stringified
            column label.
        """
        issues: list[Issue] = []
        for position, label in enumerate(df.columns):
            # Select by position rather than by ``df[str(label)]``: a label
            # lookup raises KeyError for non-string labels (e.g. default
            # integer columns) and yields a DataFrame for duplicated labels.
            raw_values = df.iloc[:, position].tolist()
            issues.extend(self._check_values(raw_values, str(label)))
        return issues

    def _check_column(self, df: pd.DataFrame, col_name: str) -> list[Issue]:
        """Check a single column, looked up by label, for decimal-shift outliers.

        Args:
            df: The DataFrame containing the column.
            col_name: Name of the column to check.

        Returns:
            Issues found in this column.
        """
        return self._check_values(df[col_name].tolist(), col_name)

    def _check_values(self, raw_values: list[object], col_name: str) -> list[Issue]:
        """Check one column's raw cell values for decimal-shift outliers.

        Args:
            raw_values: The column's cells in row order.
            col_name: Column name used in the reported issues.

        Returns:
            Issues found among ``raw_values``; empty when the column has too
            few numeric values or a (near-)zero median.
        """
        # (positional row index, parsed value, original text) per numeric cell.
        parsed: list[tuple[int, float, str]] = []
        for row_idx, raw in enumerate(raw_values):
            fval = _try_float(raw)
            # Non-finite values would poison the median (NaN) or can never be
            # a power-of-10 shift (inf), so they are not observations.
            if fval is not None and math.isfinite(fval):
                parsed.append((row_idx, fval, str(raw)))

        if len(parsed) < _MIN_COLUMN_SIZE:
            return []

        median = float(np.median(np.array([fval for _, fval, _ in parsed])))
        # A ratio against a (near-)zero median is meaningless.
        if abs(median) < self._EPSILON:
            return []

        issues: list[Issue] = []
        for row_idx, fval, str_val in parsed:
            if abs(fval) < self._EPSILON:
                continue
            ratio = fval / median
            if abs(ratio) < self._EPSILON:
                continue

            shift = self._nearest_shift(math.log10(abs(ratio)))
            if shift is None:
                continue
            power, distance = shift
            issues.append(
                self._build_issue(row_idx, col_name, fval, str_val, median, power, distance)
            )
        return issues

    @staticmethod
    def _nearest_shift(log_ratio: float) -> tuple[int, float] | None:
        """Return the closest candidate power and its distance, if within tolerance.

        Args:
            log_ratio: ``log10(|value / median|)``.

        Returns:
            ``(power, distance)`` for the candidate power nearest to
            ``log_ratio``, or None when even the nearest one is not strictly
            within ``_LOG_TOLERANCE``.
        """
        power = min(_SHIFT_POWERS, key=lambda candidate: abs(log_ratio - candidate))
        distance = abs(log_ratio - power)
        if distance < _LOG_TOLERANCE:
            return power, distance
        return None

    @staticmethod
    def _build_issue(
        row_idx: int,
        col_name: str,
        fval: float,
        str_val: str,
        median: float,
        power: int,
        distance: float,
    ) -> Issue:
        """Build the Issue describing a value shifted by ``10**power``.

        Args:
            row_idx: Positional row index of the value.
            col_name: Column name to report.
            fval: The parsed numeric value.
            str_val: The value's original text.
            median: Median of the column's numeric values.
            power: The detected power-of-10 shift.
            distance: Distance in log10 space between the observed ratio and
                ``power``.

        Returns:
            A ``decimal_shift`` Issue with REVIEW severity.
        """
        correction_factor = 10.0**power
        expected_val = fval / correction_factor

        # The closer the ratio is to an exact power of ten, the higher the
        # confidence; clamped to the range [0.70, 0.95].
        confidence = round(min(0.95, max(0.70, 1.0 - distance * 2.0)), 2)

        if power > 0:
            reason = (
                f"Value {fval:g} in column '{col_name}' appears to be "
                f"~{int(correction_factor)}x the typical value "
                f"(median ~{median:g})"
            )
        else:
            reason = (
                f"Value {fval:g} in column '{col_name}' appears to be "
                f"~{1.0 / correction_factor:g}x too small compared to "
                f"the typical value (median ~{median:g})"
            )

        return Issue(
            row=row_idx,
            column=col_name,
            issue_type="decimal_shift",
            severity=Severity.REVIEW,
            confidence=confidence,
            # Plain ``:g`` keeps only 6 significant digits, which would round
            # the proposed correction (1234567.8 / 10 -> "123457").  Twelve
            # digits preserve the value while still hiding float-division
            # noise such as 1.4999999999999998.
            expected=f"{expected_val:.12g}",
            actual=str_val.strip(),
            reason=reason,
        )
|
|