File size: 3,802 Bytes
a48d292
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f81a8b5
a48d292
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Utility helpers for the Business Intelligence dashboard."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple

import pandas as pd


# Extensions are matched case-insensitively (is_supported_file lowercases first).
SUPPORTED_FILE_TYPES: Tuple[str, ...] = (".csv", ".xlsx", ".xls")
"""Allowed file extensions for uploads."""

# Kept small so previews stay readable in the dashboard UI.
PREVIEW_ROWS: int = 5
"""Default number of rows to display in dataset previews."""


@dataclass(frozen=True)
class ColumnTypes:
    """Container describing inferred column groupings.

    Produced by infer_column_types(); frozen so instances are immutable
    (and therefore hashable) once inference has run.
    """

    # Labels of columns with a numeric dtype.
    numeric: Tuple[str, ...]
    # Labels of columns that are neither numeric nor datetime.
    categorical: Tuple[str, ...]
    # Labels of columns with a datetime64 dtype (naive or UTC).
    datetime: Tuple[str, ...]


def is_supported_file(filename: str | None) -> bool:
    """Return True when the provided filename uses a supported extension.

    Comparison is case-insensitive; None and the empty string are rejected.
    """
    if not filename:
        return False
    # str.endswith accepts a tuple of suffixes, so no explicit loop is needed.
    return filename.lower().endswith(SUPPORTED_FILE_TYPES)


def coerce_datetime_columns(df: pd.DataFrame, threshold: float = 0.6) -> Tuple[pd.DataFrame, Tuple[str, ...]]:
    """Attempt to parse object columns as datetimes when enough values can be converted.

    Parameters
    ----------
    df:
        Input DataFrame to mutate in-place.
    threshold:
        Minimum fraction of non-null values that must successfully convert
        for the column to be promoted to datetime.

    Returns
    -------
    tuple
        Mutated DataFrame and the sorted tuple of datetime column names
        (pre-existing datetime columns included).
    """
    datetime_cols: List[str] = list(
        df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns
    )

    object_cols = df.select_dtypes(include=["object"]).columns
    for col in object_cols:
        series = df[col]
        non_null_count = int(series.notna().sum())
        # Skip all-null columns and columns that are mostly null overall.
        if non_null_count == 0 or non_null_count / len(series) < threshold:
            continue
        converted = pd.to_datetime(series, errors="coerce", utc=False)
        # BUG FIX: measure parse success against the NON-NULL values only,
        # as the docstring promises. The previous code divided by the total
        # row count, so nulls were counted as parse failures and sparse but
        # perfectly parseable columns were never promoted.
        success_ratio = int(converted.notna().sum()) / non_null_count
        if success_ratio >= threshold:
            df[col] = converted
            datetime_cols.append(col)

    return df, tuple(sorted(set(datetime_cols)))


def infer_column_types(df: pd.DataFrame) -> ColumnTypes:
    """Infer high-level data types for the provided DataFrame's columns."""
    numeric_cols = tuple(df.select_dtypes(include=["number"]).columns)
    datetime_cols = tuple(
        df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns
    )

    # Everything that is neither numeric nor datetime falls back to
    # categorical, preserving the DataFrame's original column order.
    claimed = set(numeric_cols) | set(datetime_cols)
    categorical_cols = tuple(col for col in df.columns if col not in claimed)

    return ColumnTypes(numeric=numeric_cols, categorical=categorical_cols, datetime=datetime_cols)


def clamp_numeric(value: float, minimum: float, maximum: float) -> float:
    """Clamp *value* into the closed range [minimum, maximum]."""
    # Pull down to the ceiling first, then up to the floor; applying the
    # floor last means minimum wins if the range is degenerate (min > max),
    # exactly like max(minimum, min(maximum, value)).
    result = value
    if result > maximum:
        result = maximum
    if result < minimum:
        result = minimum
    return result


def ensure_unique_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename duplicate columns to maintain uniqueness.

    Repeated labels receive an ``_<n>`` suffix. If the columns are already
    unique the original DataFrame is returned unchanged; otherwise a copy
    with rewritten column labels is returned.
    """
    if df.columns.is_unique:
        return df

    new_columns: List[str] = []
    used: set = set()
    seen: Dict[str, int] = {}
    for col in df.columns:
        count = seen.get(col, 0)
        candidate = col if count == 0 else f"{col}_{count}"
        # BUG FIX: a generated suffix could collide with a label that already
        # exists (e.g. ["a", "a_1", "a"] previously became ["a", "a_1", "a_1"],
        # still duplicated). Probe until the candidate name is actually unused.
        while candidate in used:
            count += 1
            candidate = f"{col}_{count}"
        seen[col] = count + 1
        used.add(candidate)
        new_columns.append(candidate)

    df = df.copy()
    df.columns = new_columns
    return df


def shorten_text(value: str, max_length: int = 80) -> str:
    """Truncate long text values for cleaner display.

    Strings longer than *max_length* are cut and suffixed with ``...`` so the
    result is at most *max_length* characters (or just ``...`` when
    *max_length* is smaller than the ellipsis itself).
    """
    if len(value) <= max_length:
        return value
    # BUG FIX: clamp the keep-length at zero. With max_length < 3 the old
    # slice index went negative (value[:max_length - 3]), so e.g.
    # shorten_text("abcdef", 2) returned "abcde..." — LONGER than the input.
    keep = max(max_length - 3, 0)
    return f"{value[:keep]}..."


def safe_column_subset(columns: Iterable[str], allowed: Iterable[str]) -> List[str]:
    """Return a list of *columns* that exist inside *allowed*.

    Order and duplicates of *columns* are preserved; membership is checked
    against a set built once from *allowed* for O(1) lookups.
    """
    permitted = set(allowed)
    kept: List[str] = []
    for col in columns:
        if col in permitted:
            kept.append(col)
    return kept