File size: 5,748 Bytes
5143557
 
 
 
 
 
 
 
 
 
 
 
eed1cab
5143557
 
eed1cab
5143557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eed1cab
5143557
 
 
 
 
 
 
 
 
 
 
 
eed1cab
5143557
 
 
 
 
eed1cab
5143557
 
 
 
 
 
 
 
 
 
 
eed1cab
5143557
 
 
 
 
 
 
eed1cab
5143557
 
eed1cab
5143557
 
 
 
 
 
 
eed1cab
5143557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eed1cab
5143557
 
 
 
 
eed1cab
5143557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Detector for decimal-shift anomalies in numeric columns.

Identifies values that are exact powers-of-10 multiples (10x, 100x, 0.1x,
0.01x, etc.) of the column's central tendency.  This is the canonical
"decimal point was moved" data-entry error pattern.

The detector is **pure**: no LLM calls, no I/O, no side effects.
"""

from __future__ import annotations

import math
from statistics import median

from dataforge.detectors.base import Issue, Schema, Severity
from dataforge.table import TableLike, column_names, column_values

# Minimum number of non-null, parseable numeric values a column must contain
# before it is analyzed.  Columns with fewer values are skipped, since a
# median over so few points is not a meaningful reference.
_MIN_COLUMN_SIZE: int = 5

# Exponents k of the 10^k shifts to test for.  Positive k = value is 10^k×
# too large; negative k = value is 10^|k|× too small.  0 is not included,
# so values close to the median are never flagged.
_SHIFT_POWERS: tuple[int, ...] = (-3, -2, -1, 1, 2, 3)

# How close log10(|value / median|) must be to one of the exponents above.
# 0.15 means we accept ratios within 10^±0.15 ≈ 0.71× – 1.41× of the
# exact power.  Tight enough to avoid false positives on natural variance.
# Because the exponents are at least 1 apart and the tolerance is below 0.5,
# a ratio can match at most one exponent.
_LOG_TOLERANCE: float = 0.15


def _try_float(value: object) -> float | None:
    """Attempt to parse a value as float, returning None on failure.

    Args:
        value: Any value (string, int, float, None, …).

    Returns:
        The float value or None if parsing fails.
    """
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    try:
        return float(str(value))
    except (ValueError, TypeError):
        return None


class DecimalShiftDetector:
    """Detects values that are power-of-10 multiples of the column distribution.

    For each numeric column, computes the median and checks every value
    to see if ``value / median`` is close to 10^k for k in {-3, -2, -1,
    1, 2, 3}.  Flagged values get an ``expected`` field with the corrected
    value (``value / 10^k``).

    Requires at least 5 non-null numeric values per column.  Columns whose
    median is zero or near zero (``|median| < 1e-10``) are skipped, because
    no meaningful ratio can be formed against them; individual values that
    are zero or near zero are skipped for the same reason.

    Example:
        >>> import pandas as pd
        >>> detector = DecimalShiftDetector()
        >>> df = pd.DataFrame({"price": [100.0, 105.0, 98.0, 1020.0, 103.0]})
        >>> issues = detector.detect(df)
        >>> issues[0].row
        3
    """

    # Significant digits used when rendering the corrected value.  The plain
    # ``g`` format keeps only 6, which silently rounds corrections such as
    # 123456.7 -> "123457" and switches to exponent notation from 1e6 on.
    # 12 digits preserve realistic data while still hiding the binary
    # floating-point noise produced by dividing by 0.1 / 0.01 / 0.001
    # (e.g. 0.3 / 0.1 == 2.9999999999999996 still renders as "3").
    _EXPECTED_SIG_DIGITS = 12

    def detect(self, df: TableLike, schema: Schema | None = None) -> list[Issue]:
        """Detect decimal-shift issues in the DataFrame.

        Every column reported by ``column_names`` is checked independently;
        columns without enough parseable numeric values yield no issues.

        Args:
            df: The input DataFrame to analyze.
            schema: Optional declared schema (unused by this detector).

        Returns:
            A list of Issue objects for values that appear to be shifted
            by a power of 10 relative to the column distribution, in
            column order and, within a column, in row order.
        """
        issues: list[Issue] = []

        for col_name in column_names(df):
            issues.extend(self._check_column(df, str(col_name)))

        return issues

    def _check_column(self, df: TableLike, col_name: str) -> list[Issue]:
        """Check a single column for decimal-shift outliers.

        Args:
            df: The DataFrame containing the column.
            col_name: Name of the column to check.

        Returns:
            Issues found in this column.  Empty when the column has fewer
            than ``_MIN_COLUMN_SIZE`` parseable values or its median is
            (near) zero.
        """
        # Parse all values to float, keeping track of original indices and
        # the original text (reported back as ``actual``).
        parsed: list[tuple[int, float, str]] = []
        for row_idx, val in enumerate(column_values(df, col_name)):
            fval = _try_float(val)
            if fval is not None:
                parsed.append((row_idx, fval, str(val)))

        if len(parsed) < _MIN_COLUMN_SIZE:
            return []

        center = float(median([v for _, v, _ in parsed]))

        # If median is zero or very close, we cannot compute meaningful ratios.
        if abs(center) < 1e-10:
            return []

        issues: list[Issue] = []
        for row_idx, fval, str_val in parsed:
            # A (near-)zero value has no defined order of magnitude.
            if abs(fval) < 1e-10:
                continue

            ratio = fval / center
            # Guard log10 against underflow to (near) zero.
            if abs(ratio) < 1e-10:
                continue

            # NOTE: abs() means only the magnitude is compared; a value whose
            # sign differs from the median's can still match, and ``expected``
            # keeps the value's own sign.
            log_ratio = math.log10(abs(ratio))

            # Pick the exponent closest to log_ratio among those within
            # tolerance (None when no exponent is close enough).
            best_power: int | None = None
            best_distance = float("inf")

            for power in _SHIFT_POWERS:
                distance = abs(log_ratio - power)
                if distance < _LOG_TOLERANCE and distance < best_distance:
                    best_distance = distance
                    best_power = power

            if best_power is None:
                continue

            correction_factor = 10.0**best_power
            expected_val = fval / correction_factor

            # Confidence: closer to exact power → higher confidence,
            # clamped to the range [0.70, 0.95].
            confidence = round(min(0.95, max(0.70, 1.0 - best_distance * 2.0)), 2)

            if best_power > 0:
                reason = (
                    f"Value {fval:g} in column '{col_name}' appears to be "
                    f"~{int(correction_factor)}x the typical value "
                    f"(median ~{center:g})"
                )
            else:
                reason = (
                    f"Value {fval:g} in column '{col_name}' appears to be "
                    f"~{1.0 / correction_factor:g}x too small compared to "
                    f"the typical value (median ~{center:g})"
                )

            issues.append(
                Issue(
                    row=row_idx,
                    column=col_name,
                    issue_type="decimal_shift",
                    severity=Severity.REVIEW,
                    confidence=confidence,
                    # Render with enough significant digits that the
                    # suggested correction does not lose data.
                    expected=f"{expected_val:.{self._EXPECTED_SIG_DIGITS}g}",
                    actual=str_val.strip(),
                    reason=reason,
                )
            )

        return issues