Praneshrajan15's picture
feat: initial playground deployment
5143557 verified
"""Detector for decimal-shift anomalies in numeric columns.
Identifies values that are approximately power-of-10 multiples (10x, 100x,
0.1x, 0.01x, etc.) of the column's median. This is the canonical
"decimal point was moved" data-entry error pattern.
The detector is **pure**: no LLM calls, no I/O, no side effects.
"""
from __future__ import annotations
import math
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
from dataforge.detectors.base import Issue, Schema, Severity
if TYPE_CHECKING:
pass
# Minimum number of values that parse as numbers a column must contain
# before its median is used as a reference; smaller columns are skipped.
_MIN_COLUMN_SIZE = 5
# Exponents k of the 10^k shifts to look for. Positive k = value is 10^k
# times too large; negative k = value is 10^|k| times too small.
_SHIFT_POWERS = (-3, -2, -1, 1, 2, 3)
# Maximum distance, in log10 space, between log10(|value / median|) and an
# exponent in _SHIFT_POWERS for the value to be flagged.
# 0.15 means we accept ratios within 10^±0.15 ≈ 0.71× – 1.41× of the
# exact power. Intended to be tight enough to avoid false positives on
# natural variance.
_LOG_TOLERANCE = 0.15
def _try_float(value: object) -> float | None:
    """Attempt to parse a value as a finite float, returning None on failure.

    The value is converted via ``float(str(value))``, so anything whose
    string form is a valid float literal is accepted (ints, floats, numeric
    strings with surrounding whitespace, ...).

    Non-finite results (NaN, +inf, -inf) are treated as "not a number":
    they carry no magnitude to compare against a median, and a single NaN
    fed into a median makes the whole statistic NaN.

    Args:
        value: Any value (string, int, float, None, ...).

    Returns:
        The finite float value, or None if the value is None, cannot be
        parsed, or parses to NaN / infinity.
    """
    if value is None:
        return None
    try:
        parsed = float(str(value))
    except (ValueError, TypeError):
        return None
    # Covers float NaN, the strings "nan"/"inf", non-float NaN scalars and
    # integers too large for a double (which parse to inf).
    return parsed if math.isfinite(parsed) else None
class DecimalShiftDetector:
    """Detects values that are power-of-10 multiples of the column distribution.

    For each column, the values that parse as finite numbers are collected,
    their median is computed, and every such value is checked to see if
    ``value / median`` is close to 10^k for k in {-3, -2, -1, 1, 2, 3}.
    Flagged values get an ``expected`` field with the corrected value
    (``value / 10^k``).

    Requires at least 5 non-null numeric values per column. Columns with
    zero or near-zero median are skipped.

    Columns are visited by position, so non-string and duplicate column
    labels are supported; issues report the label as ``str(label)``.

    Example:
        >>> import pandas as pd
        >>> detector = DecimalShiftDetector()
        >>> df = pd.DataFrame({"price": [100.0, 105.0, 98.0, 1020.0, 103.0]})
        >>> issues = detector.detect(df)
        >>> issues[0].row
        3
    """

    def detect(self, df: pd.DataFrame, schema: Schema | None = None) -> list[Issue]:
        """Detect decimal-shift issues in the DataFrame.

        Args:
            df: The input DataFrame to analyze.
            schema: Optional declared schema (unused by this detector).

        Returns:
            A list of Issue objects for values that appear to be shifted
            by a power of 10 relative to the column distribution.
        """
        issues: list[Issue] = []
        # Select by position rather than by label: looking a column up by
        # ``str(label)`` fails for non-string labels (e.g. the default
        # integer labels), and a duplicated label selects a DataFrame
        # instead of a single column.
        for position, label in enumerate(df.columns):
            issues.extend(self._check_series(df.iloc[:, position], str(label)))
        return issues

    def _check_column(self, df: pd.DataFrame, col_name: str) -> list[Issue]:
        """Check a single column, looked up by label, for decimal-shift outliers.

        Args:
            df: The DataFrame containing the column.
            col_name: Label of the column to check; must select exactly one
                column of ``df``.

        Returns:
            Issues found in this column.
        """
        return self._check_series(df[col_name], col_name)

    def _check_series(self, series: pd.Series, col_name: str) -> list[Issue]:
        """Check one column's values for decimal-shift outliers.

        Args:
            series: The column's values.
            col_name: Column name to report in the resulting issues.

        Returns:
            Issues found in this column; ``row`` is the 0-based position
            of the value within ``series``.
        """
        # Parse all values to float, keeping track of original positions
        # and the original text of each value.
        parsed: list[tuple[int, float, str]] = []
        for row_idx, val in enumerate(series.tolist()):
            fval = _try_float(val)
            # Defensive: ignore non-finite values. A single NaN would make
            # the median NaN and suppress every finding in the column.
            if fval is None or not math.isfinite(fval):
                continue
            parsed.append((row_idx, fval, str(val)))

        if len(parsed) < _MIN_COLUMN_SIZE:
            return []

        median = float(np.median([v for _, v, _ in parsed]))
        # If median is zero or very close, we cannot compute meaningful ratios.
        if abs(median) < 1e-10:
            return []

        issues: list[Issue] = []
        for row_idx, fval, str_val in parsed:
            # Zero (or a vanishing ratio) has no defined log10.
            if abs(fval) < 1e-10:
                continue
            ratio = fval / median
            if abs(ratio) < 1e-10:
                continue

            # Sign is ignored: only the magnitude of the ratio is compared
            # against the candidate powers of 10.
            log_ratio = math.log10(abs(ratio))

            # Pick the closest candidate power that lies within tolerance.
            best_power: int | None = None
            best_distance = float("inf")
            for power in _SHIFT_POWERS:
                distance = abs(log_ratio - power)
                if distance < _LOG_TOLERANCE and distance < best_distance:
                    best_distance = distance
                    best_power = power
            if best_power is None:
                continue

            correction_factor = 10.0**best_power
            expected_val = fval / correction_factor
            # Confidence: closer to exact power → higher confidence,
            # clamped to the range [0.70, 0.95].
            confidence = round(min(0.95, max(0.70, 1.0 - best_distance * 2.0)), 2)

            if best_power > 0:
                reason = (
                    f"Value {fval:g} in column '{col_name}' appears to be "
                    f"~{int(correction_factor)}x the typical value "
                    f"(median ~{median:g})"
                )
            else:
                reason = (
                    f"Value {fval:g} in column '{col_name}' appears to be "
                    f"~{1.0 / correction_factor:g}x too small compared to "
                    f"the typical value (median ~{median:g})"
                )

            issues.append(
                Issue(
                    row=row_idx,
                    column=col_name,
                    issue_type="decimal_shift",
                    severity=Severity.REVIEW,
                    confidence=confidence,
                    # NOTE(review): ':g' keeps 6 significant digits, so
                    # longer values are rounded here — confirm this is
                    # acceptable for whatever consumes `expected`.
                    expected=f"{expected_val:g}",
                    actual=str_val.strip(),
                    reason=reason,
                )
            )
        return issues