Spaces:
Running
Running
| """Detector for decimal-shift anomalies in numeric columns. | |
| Identifies values that are exact powers-of-10 multiples (10x, 100x, 0.1x, | |
| 0.01x, etc.) of the column's central tendency. This is the canonical | |
| "decimal point was moved" data-entry error pattern. | |
| The detector is **pure**: no LLM calls, no I/O, no side effects. | |
| """ | |
| from __future__ import annotations | |
| import math | |
| from typing import TYPE_CHECKING | |
| import numpy as np | |
| import pandas as pd | |
| from dataforge.detectors.base import Issue, Schema, Severity | |
| if TYPE_CHECKING: | |
| pass | |
| # Minimum non-null numeric values required for meaningful statistics. | |
| _MIN_COLUMN_SIZE = 5 | |
| # Powers of 10 to check. Positive = value is N× too large; | |
| # negative = value is N× too small. | |
| _SHIFT_POWERS = (-3, -2, -1, 1, 2, 3) | |
| # How close ratio must be to a power of 10 (in log10 space). | |
| # 0.15 means we accept ratios within 10^±0.15 ≈ 0.71× – 1.41× of the | |
| # exact power. Tight enough to avoid false positives on natural variance. | |
| _LOG_TOLERANCE = 0.15 | |
| def _try_float(value: object) -> float | None: | |
| """Attempt to parse a value as float, returning None on failure. | |
| Args: | |
| value: Any value (string, int, float, None, …). | |
| Returns: | |
| The float value or None if parsing fails. | |
| """ | |
| if value is None or (isinstance(value, float) and math.isnan(value)): | |
| return None | |
| try: | |
| return float(str(value)) | |
| except (ValueError, TypeError): | |
| return None | |
| class DecimalShiftDetector: | |
| """Detects values that are power-of-10 multiples of the column distribution. | |
| For each numeric column, computes the median and checks every value | |
| to see if ``value / median`` is close to 10^k for k in {-3, -2, -1, | |
| 1, 2, 3}. Flagged values get an ``expected`` field with the corrected | |
| value (``value / 10^k``). | |
| Requires at least 5 non-null numeric values per column. Columns with | |
| zero or near-zero median are handled gracefully. | |
| Example: | |
| >>> import pandas as pd | |
| >>> detector = DecimalShiftDetector() | |
| >>> df = pd.DataFrame({"price": [100.0, 105.0, 98.0, 1020.0, 103.0]}) | |
| >>> issues = detector.detect(df) | |
| >>> issues[0].row | |
| 3 | |
| """ | |
| def detect(self, df: pd.DataFrame, schema: Schema | None = None) -> list[Issue]: | |
| """Detect decimal-shift issues in the DataFrame. | |
| Args: | |
| df: The input DataFrame to analyze. | |
| schema: Optional declared schema (unused by this detector). | |
| Returns: | |
| A list of Issue objects for values that appear to be shifted | |
| by a power of 10 relative to the column distribution. | |
| """ | |
| issues: list[Issue] = [] | |
| for col_name in df.columns: | |
| col_issues = self._check_column(df, str(col_name)) | |
| issues.extend(col_issues) | |
| return issues | |
| def _check_column(self, df: pd.DataFrame, col_name: str) -> list[Issue]: | |
| """Check a single column for decimal-shift outliers. | |
| Args: | |
| df: The DataFrame containing the column. | |
| col_name: Name of the column to check. | |
| Returns: | |
| Issues found in this column. | |
| """ | |
| # Parse all values to float, keeping track of original indices. | |
| parsed: list[tuple[int, float, str]] = [] | |
| for row_idx, val in enumerate(df[col_name].tolist()): | |
| fval = _try_float(val) | |
| if fval is not None: | |
| parsed.append((row_idx, fval, str(val))) | |
| if len(parsed) < _MIN_COLUMN_SIZE: | |
| return [] | |
| values = np.array([v for _, v, _ in parsed]) | |
| median = float(np.median(values)) | |
| # If median is zero or very close, we cannot compute meaningful ratios. | |
| if abs(median) < 1e-10: | |
| return [] | |
| issues: list[Issue] = [] | |
| for row_idx, fval, str_val in parsed: | |
| if abs(fval) < 1e-10: | |
| continue | |
| ratio = fval / median | |
| if abs(ratio) < 1e-10: | |
| continue | |
| log_ratio = math.log10(abs(ratio)) | |
| best_power: int | None = None | |
| best_distance = float("inf") | |
| for power in _SHIFT_POWERS: | |
| distance = abs(log_ratio - power) | |
| if distance < _LOG_TOLERANCE and distance < best_distance: | |
| best_distance = distance | |
| best_power = power | |
| if best_power is not None: | |
| correction_factor = 10.0**best_power | |
| expected_val = fval / correction_factor | |
| # Confidence: closer to exact power → higher confidence. | |
| confidence = round(min(0.95, max(0.70, 1.0 - best_distance * 2.0)), 2) | |
| if best_power > 0: | |
| reason = ( | |
| f"Value {fval:g} in column '{col_name}' appears to be " | |
| f"~{int(correction_factor)}x the typical value " | |
| f"(median ~{median:g})" | |
| ) | |
| else: | |
| reason = ( | |
| f"Value {fval:g} in column '{col_name}' appears to be " | |
| f"~{1.0 / correction_factor:g}x too small compared to " | |
| f"the typical value (median ~{median:g})" | |
| ) | |
| issues.append( | |
| Issue( | |
| row=row_idx, | |
| column=col_name, | |
| issue_type="decimal_shift", | |
| severity=Severity.REVIEW, | |
| confidence=confidence, | |
| expected=f"{expected_val:g}", | |
| actual=str_val.strip(), | |
| reason=reason, | |
| ) | |
| ) | |
| return issues | |