File size: 2,234 Bytes
dce68a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c22bf49
dce68a7
 
 
 
 
 
 
 
 
 
 
 
 
c22bf49
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from __future__ import annotations

from typing import Any

from env.actions import is_missing


def _is_numeric_value(value: Any, dtype: str) -> bool:
    """Report whether ``value`` can be parsed as the numeric type named by ``dtype``.

    Args:
        value: The cell value to inspect. It is converted with ``str()``
            before parsing, so the check is performed on its text form.
        dtype: ``"int"`` or ``"float"``. Any other name yields ``False``.

    Returns:
        ``True`` when the value is not missing (per ``is_missing``) and its
        string form is accepted by ``int()`` / ``float()`` respectively;
        ``False`` otherwise.
    """
    if is_missing(value):
        return False

    # Select the parser for the requested dtype; unknown dtypes never validate.
    if dtype == "int":
        parse = int
    elif dtype == "float":
        parse = float
    else:
        return False

    try:
        parse(str(value))
    except (TypeError, ValueError):
        return False
    return True


def _compute_consistency(dataset: list[dict], column_infos: list) -> float:
    """Return the fraction of consistency checks that the dataset passes.

    Two kinds of checks are tallied:

    * For a column whose ``dtype`` is ``"int"`` or ``"float"``, every row
      contributes one check, which passes when ``_is_numeric_value`` accepts
      the cell (missing cells therefore count as failures).
    * For any other column, the whole column contributes a single check,
      which passes when no two non-missing values differ only by letter
      case. Columns with no non-missing values contribute nothing.

    Args:
        dataset: Rows as dicts; cells are looked up with ``row.get(name)``.
        column_infos: Objects exposing ``name`` and ``dtype`` attributes.

    Returns:
        ``passed / total`` as a float, or ``1.0`` when either argument is
        empty or no check was performed.
    """
    if not (dataset and column_infos):
        return 1.0

    passed = 0
    checked = 0

    for column in column_infos:
        cells = [record.get(column.name) for record in dataset]

        if column.dtype in {"int", "float"}:
            # Numeric columns: one check per cell.
            for cell in cells:
                checked += 1
                if _is_numeric_value(cell, column.dtype):
                    passed += 1
            continue

        # Non-numeric columns: one check per column on casing variants.
        present = [str(cell) for cell in cells if not is_missing(cell)]
        if not present:
            continue

        spellings_by_key = {}
        for text in present:
            spellings_by_key.setdefault(text.lower(), set()).add(text)

        checked += 1
        if all(len(spellings) <= 1 for spellings in spellings_by_key.values()):
            passed += 1

    if not checked:
        return 1.0
    return passed / checked


def compute_quality_score(dataset: list[dict], column_infos: list, original_issues_count: int) -> float:
    """Score a dataset's quality as a weighted blend of three ratios.

    The score is ``0.4 * completeness + 0.3 * uniqueness + 0.3 * consistency``,
    clamped to ``[0.01, 0.99]`` and rounded to four decimal places, where:

    * completeness is the share of cells that are not ``None``, ``""`` or
      ``"not_available"``;
    * uniqueness is the share of rows that are distinct when compared by
      the string form of their sorted items;
    * consistency is whatever ``_compute_consistency`` returns.

    Args:
        dataset: Rows as dicts.
        column_infos: Column descriptors, forwarded to ``_compute_consistency``.
        original_issues_count: When this is ``0`` the function short-circuits
            and returns ``0.99`` without inspecting the dataset.

    Returns:
        The clamped, rounded score.
    """
    if original_issues_count == 0:
        return 0.99

    # Count the cells each row actually holds so the denominator matches what
    # the missing-cell scan iterates over. Multiplying the row count by the
    # width of the first row divided by zero when that row was an empty dict
    # and could push completeness outside [0, 1] for rows of differing width.
    total_cells = sum(len(row) for row in dataset)
    missing_cells = sum(
        1
        for row in dataset
        for value in row.values()
        if value is None or value == "" or value == "not_available"
    )
    # With no cells at all there is nothing missing: treat as fully complete.
    completeness = 1.0 - missing_cells / total_cells if total_cells else 1.0

    total_rows = len(dataset)
    # Rows are compared via the string form of their sorted items so that
    # key order does not matter and unhashable values do not break the set.
    unique_rows = len({str(sorted(row.items())) for row in dataset})
    uniqueness = unique_rows / total_rows if total_rows > 0 else 1.0

    consistency = _compute_consistency(dataset, column_infos)

    score = 0.4 * completeness + 0.3 * uniqueness + 0.3 * consistency
    # Keep the result strictly inside (0, 1).
    return round(max(0.01, min(0.99, score)), 4)