File size: 3,802 Bytes
a48d292 f81a8b5 a48d292 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
"""Utility helpers for the Business Intelligence dashboard."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple
import pandas as pd
SUPPORTED_FILE_TYPES: Tuple[str, ...] = (".csv", ".xlsx", ".xls")
"""Allowed file extensions for uploads."""
PREVIEW_ROWS: int = 5
"""Default number of rows to display in dataset previews."""
@dataclass(frozen=True)
class ColumnTypes:
    """Container describing inferred column groupings."""

    # Columns with numeric dtypes (selected via select_dtypes(include=["number"])).
    numeric: Tuple[str, ...]
    # Columns that are neither numeric nor datetime — treated as categorical.
    categorical: Tuple[str, ...]
    # Columns with datetime dtypes.
    datetime: Tuple[str, ...]
def is_supported_file(filename: str | None) -> bool:
    """Return True when the provided filename uses a supported extension."""
    if not filename:
        return False
    # str.endswith accepts a tuple of suffixes, checking all of them in one call.
    return filename.lower().endswith(SUPPORTED_FILE_TYPES)
def coerce_datetime_columns(df: pd.DataFrame, threshold: float = 0.6) -> Tuple[pd.DataFrame, Tuple[str, ...]]:
    """Attempt to parse object columns as datetimes when enough values can be converted.

    Parameters
    ----------
    df:
        Input DataFrame to mutate in-place.
    threshold:
        Minimum fraction of non-null values that must successfully convert
        for the column to be promoted to datetime.

    Returns
    -------
    tuple
        Mutated DataFrame and the tuple of datetime column names.
    """
    # Use the generic "datetime"/"datetimetz" selectors so tz-aware columns in
    # ANY timezone are recognized; the literal "datetime64[ns, UTC]" string
    # only matched UTC-localized columns.
    datetime_cols: List[str] = list(
        df.select_dtypes(include=["datetime", "datetimetz"]).columns
    )
    object_cols = df.select_dtypes(include=["object"]).columns
    for col in object_cols:
        series = df[col]
        non_null_ratio = series.notna().mean()
        # Skip all-null columns and columns too sparse to ever reach the threshold.
        if non_null_ratio == 0 or non_null_ratio < threshold:
            continue
        converted = pd.to_datetime(series, errors="coerce", utc=False)
        # Success is measured over all rows, so original nulls count against it.
        success_ratio = converted.notna().mean()
        if success_ratio >= threshold:
            df[col] = converted
            datetime_cols.append(col)
    return df, tuple(sorted(set(datetime_cols)))
def infer_column_types(df: pd.DataFrame) -> ColumnTypes:
    """Infer high-level data types for the provided DataFrame's columns.

    Numeric and datetime columns are detected by dtype; every remaining
    column is treated as categorical.
    """
    numeric_cols = tuple(df.select_dtypes(include=["number"]).columns)
    # "datetime"/"datetimetz" cover naive and tz-aware columns alike; the old
    # literal dtype strings missed tz-aware columns outside UTC, which were
    # then misclassified as categorical.
    datetime_cols = tuple(df.select_dtypes(include=["datetime", "datetimetz"]).columns)
    # Set for O(1) membership tests; original column order is preserved below.
    non_categorical = set(numeric_cols) | set(datetime_cols)
    categorical_cols = tuple(col for col in df.columns if col not in non_categorical)
    return ColumnTypes(numeric=numeric_cols, categorical=categorical_cols, datetime=datetime_cols)
def clamp_numeric(value: float, minimum: float, maximum: float) -> float:
    """Clamp *value* into the closed range [minimum, maximum]."""
    # Apply the upper bound first, then the lower bound — the same order of
    # operations as max(minimum, min(maximum, value)).
    result = value
    if result > maximum:
        result = maximum
    if result < minimum:
        result = minimum
    return result
def ensure_unique_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename duplicate columns to maintain uniqueness.

    The first occurrence of a name is kept as-is; later duplicates get a
    ``_<n>`` suffix. If a suffixed candidate collides with a column that
    already exists (e.g. input columns ``["a", "a_1", "a"]``), the counter
    keeps advancing until the name is free — the previous implementation
    could still emit duplicates in that case.

    Returns the original DataFrame unchanged when columns are already
    unique, otherwise a copy with uniquified column names.
    """
    if df.columns.is_unique:
        return df
    new_columns: List[str] = []
    used: set = set()
    counts: Dict[str, int] = {}
    for col in df.columns:
        n = counts.get(col, 0)
        candidate = col if n == 0 else f"{col}_{n}"
        # Advance past any name already assigned or occupied by another column.
        while candidate in used:
            n += 1
            candidate = f"{col}_{n}"
        counts[col] = n + 1
        used.add(candidate)
        new_columns.append(candidate)
    df = df.copy()
    df.columns = new_columns
    return df
def shorten_text(value: str, max_length: int = 80) -> str:
    """Truncate long text values for cleaner display.

    Text longer than *max_length* is cut and suffixed with ``...`` so the
    result is exactly *max_length* characters. For ``max_length < 3`` the
    ellipsis cannot fit, so the value is simply sliced — the previous
    implementation computed ``value[: max_length - 3]`` with a negative
    index there and returned strings LONGER than the limit.
    """
    if len(value) <= max_length:
        return value
    if max_length < 3:
        return value[:max_length]
    return f"{value[: max_length - 3]}..."
def safe_column_subset(columns: Iterable[str], allowed: Iterable[str]) -> List[str]:
    """Return a list of *columns* that exist inside *allowed*."""
    # Materialize the allowed names once for O(1) membership tests.
    permitted = frozenset(allowed)
    return list(filter(permitted.__contains__, columns))
|