"""Detector for decimal-shift anomalies in numeric columns.

Identifies values that are approximately a power of 10 (10x, 100x, 0.1x,
0.01x, etc.) times the column's central tendency (its median). This is the
canonical "decimal point was moved" data-entry error pattern.

The detector is **pure**: no LLM calls, no I/O, no side effects.
"""
from __future__ import annotations
import math
from statistics import median
from dataforge.detectors.base import Issue, Schema, Severity
from dataforge.table import TableLike, column_names, column_values
# Minimum non-null numeric values required for meaningful statistics.
_MIN_COLUMN_SIZE = 5
# Powers of 10 to check. Positive = value is N× too large;
# negative = value is N× too small.
_SHIFT_POWERS = (-3, -2, -1, 1, 2, 3)
# How close ratio must be to a power of 10 (in log10 space).
# 0.15 means we accept ratios within 10^±0.15 ≈ 0.71× – 1.41× of the
# exact power. Tight enough to avoid false positives on natural variance.
_LOG_TOLERANCE = 0.15
def _try_float(value: object) -> float | None:
"""Attempt to parse a value as float, returning None on failure.
Args:
value: Any value (string, int, float, None, …).
Returns:
The float value or None if parsing fails.
"""
if value is None or (isinstance(value, float) and math.isnan(value)):
return None
try:
return float(str(value))
except (ValueError, TypeError):
return None
class DecimalShiftDetector:
    """Detects values that are power-of-10 multiples of the column distribution.

    For each numeric column, computes the median and checks every value
    to see if ``value / median`` is close to 10^k for k in {-3, -2, -1,
    1, 2, 3}. Flagged values get an ``expected`` field with the corrected
    value (``value / 10^k``), rendered with up to 15 significant digits so
    the digits of the original entry are not truncated.

    Requires at least 5 non-null numeric values per column. Columns whose
    median is zero or near zero are skipped, because no meaningful ratio
    can be computed against them.

    Example:
        >>> import pandas as pd
        >>> detector = DecimalShiftDetector()
        >>> df = pd.DataFrame({"price": [100.0, 105.0, 98.0, 1020.0, 103.0]})
        >>> issues = detector.detect(df)
        >>> issues[0].row
        3
    """

    def detect(self, df: TableLike, schema: Schema | None = None) -> list[Issue]:
        """Detect decimal-shift issues in the DataFrame.

        Args:
            df: The input DataFrame to analyze.
            schema: Optional declared schema (unused by this detector).

        Returns:
            A list of Issue objects for values that appear to be shifted
            by a power of 10 relative to the column distribution.
        """
        issues: list[Issue] = []
        for col_name in column_names(df):
            issues.extend(self._check_column(df, str(col_name)))
        return issues

    @staticmethod
    def _format_expected(value: float) -> str:
        """Render a corrected value without dropping significant digits.

        Plain ``:g`` keeps only 6 significant digits, which truncates the
        proposed correction (10234.56 -> "10234.6") or switches it to
        scientific notation (1234567 -> "1.23457e+06"). Using 15 significant
        digits keeps every digit a double can faithfully round-trip while
        still stripping trailing zeros, so short values format exactly as
        they did with ``:g``.

        Args:
            value: The corrected numeric value.

        Returns:
            The value formatted with up to 15 significant digits.
        """
        return f"{value:.15g}"

    def _check_column(self, df: TableLike, col_name: str) -> list[Issue]:
        """Check a single column for decimal-shift outliers.

        Args:
            df: The DataFrame containing the column.
            col_name: Name of the column to check.

        Returns:
            Issues found in this column. Empty when the column has too few
            numeric values or its median is (near) zero.
        """
        # Parse all values to float, keeping track of original row indices
        # and the original textual form (reported back as ``actual``).
        parsed: list[tuple[int, float, str]] = []
        for row_idx, val in enumerate(column_values(df, col_name)):
            fval = _try_float(val)
            if fval is not None:
                parsed.append((row_idx, fval, str(val)))

        if len(parsed) < _MIN_COLUMN_SIZE:
            return []

        center = float(median([v for _, v, _ in parsed]))

        # If median is zero or very close, we cannot compute meaningful ratios.
        if abs(center) < 1e-10:
            return []

        issues: list[Issue] = []
        for row_idx, fval, str_val in parsed:
            # Zero-ish values have no defined log-ratio; skip them.
            if abs(fval) < 1e-10:
                continue

            ratio = fval / center
            if abs(ratio) < 1e-10:
                continue

            # Work in log10 space so "N powers of 10 away" becomes a plain
            # distance to an integer exponent. The sign of the ratio is
            # ignored: only the magnitude is compared.
            log_ratio = math.log10(abs(ratio))

            # Pick the closest candidate exponent that is within tolerance.
            best_power: int | None = None
            best_distance = float("inf")
            for power in _SHIFT_POWERS:
                distance = abs(log_ratio - power)
                if distance < _LOG_TOLERANCE and distance < best_distance:
                    best_distance = distance
                    best_power = power

            if best_power is None:
                continue

            correction_factor = 10.0**best_power
            expected_val = fval / correction_factor

            # Confidence: closer to exact power → higher confidence,
            # clamped to the [0.70, 0.95] range.
            confidence = round(min(0.95, max(0.70, 1.0 - best_distance * 2.0)), 2)

            if best_power > 0:
                reason = (
                    f"Value {fval:g} in column '{col_name}' appears to be "
                    f"~{int(correction_factor)}x the typical value "
                    f"(median ~{center:g})"
                )
            else:
                reason = (
                    f"Value {fval:g} in column '{col_name}' appears to be "
                    f"~{1.0 / correction_factor:g}x too small compared to "
                    f"the typical value (median ~{center:g})"
                )

            issues.append(
                Issue(
                    row=row_idx,
                    column=col_name,
                    issue_type="decimal_shift",
                    severity=Severity.REVIEW,
                    confidence=confidence,
                    expected=self._format_expected(expected_val),
                    actual=str_val.strip(),
                    reason=reason,
                )
            )

        return issues
|