|
|
"""Utility helpers for the Business Intelligence dashboard.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from dataclasses import dataclass |
|
|
from typing import Dict, Iterable, List, Tuple |
|
|
|
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
# Allowed file extensions for uploads (lowercase; compared case-insensitively
# by is_supported_file).
SUPPORTED_FILE_TYPES: Tuple[str, ...] = (".csv", ".xlsx", ".xls")

# Default number of rows to display in dataset previews.
PREVIEW_ROWS: int = 5
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class ColumnTypes:
    """Container describing inferred column groupings.

    Produced by :func:`infer_column_types`. Frozen so instances are
    immutable and safely shareable.
    """

    # Columns with a numeric dtype (pandas ``select_dtypes(include=["number"])``).
    numeric: Tuple[str, ...]
    # Everything that is neither numeric nor datetime (strings, bools, ...).
    categorical: Tuple[str, ...]
    # Columns with a datetime64[ns] / datetime64[ns, UTC] dtype.
    datetime: Tuple[str, ...]
|
|
|
|
|
|
|
|
def is_supported_file(filename: str | None) -> bool: |
|
|
"""Return True when the provided filename uses a supported extension.""" |
|
|
if not filename: |
|
|
return False |
|
|
lowered = filename.lower() |
|
|
return any(lowered.endswith(ext) for ext in SUPPORTED_FILE_TYPES) |
|
|
|
|
|
|
|
|
def coerce_datetime_columns(df: pd.DataFrame, threshold: float = 0.6) -> Tuple[pd.DataFrame, Tuple[str, ...]]:
    """Promote object columns to datetime when enough values can be parsed.

    Parameters
    ----------
    df:
        Input DataFrame; promoted columns are written back in place.
    threshold:
        Minimum fraction of the column's values that must successfully
        convert for the column to be promoted to datetime.

    Returns
    -------
    tuple
        The (mutated) DataFrame and a sorted tuple of datetime column names.
    """
    # Seed the result with columns that already carry a datetime dtype.
    found: List[str] = list(
        df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns
    )

    for name in df.select_dtypes(include=["object"]).columns:
        values = df[name]
        present = values.notna().mean()
        # Skip all-null columns, and columns too sparse to ever clear the bar.
        if present == 0 or present < threshold:
            continue
        parsed = pd.to_datetime(values, errors="coerce", utc=False)
        # Note: the success ratio is measured over ALL rows, so original
        # nulls count against the column as well.
        if parsed.notna().mean() >= threshold:
            df[name] = parsed
            found.append(name)

    return df, tuple(sorted(set(found)))
|
|
|
|
|
|
|
|
def infer_column_types(df: pd.DataFrame) -> ColumnTypes:
    """Infer high-level data types for the provided DataFrame's columns."""
    numeric = tuple(df.select_dtypes(include=["number"]).columns)
    datetimes = tuple(
        df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns
    )

    # Whatever is neither numeric nor datetime is treated as categorical.
    # Iterating df.columns keeps the original column order.
    claimed = set(numeric) | set(datetimes)
    categorical = tuple(col for col in df.columns if col not in claimed)

    return ColumnTypes(numeric=numeric, categorical=categorical, datetime=datetimes)
|
|
|
|
|
|
|
|
def clamp_numeric(value: float, minimum: float, maximum: float) -> float:
    """Clamp *value* into the closed range [minimum, maximum]."""
    # Equivalent to max(minimum, min(maximum, value)): cap from above first,
    # then floor from below (so the floor wins if minimum > maximum).
    capped = maximum if maximum <= value else value
    return minimum if minimum >= capped else capped
|
|
|
|
|
|
|
|
def ensure_unique_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename duplicate columns to maintain uniqueness.

    Duplicates are renamed ``col``, ``col_1``, ``col_2``, ... . If a generated
    name would collide with a column that already exists (e.g. columns
    ``["a", "a", "a_1"]``), the counter keeps advancing until the name is
    free — the previous implementation produced duplicate names in that case.

    Returns
    -------
    pd.DataFrame
        The original DataFrame when columns are already unique, otherwise a
        copy with deduplicated column names.
    """
    if df.columns.is_unique:
        return df

    new_columns: List[str] = []
    seen: Dict[str, int] = {}
    used: set = set()
    for col in df.columns:
        count = seen.get(col, 0)
        candidate = col if count == 0 else f"{col}_{count}"
        # Keep bumping the suffix until the name is genuinely unused; this
        # guards against collisions with pre-existing "<col>_<n>" columns.
        while candidate in used:
            count += 1
            candidate = f"{col}_{count}"
        seen[col] = count + 1
        used.add(candidate)
        new_columns.append(candidate)

    df = df.copy()
    df.columns = new_columns
    return df
|
|
|
|
|
|
|
|
def shorten_text(value: str, max_length: int = 80) -> str:
    """Truncate long text values for cleaner display.

    The result is guaranteed to be at most *max_length* characters. When the
    limit is too small to fit an ellipsis (``max_length < 3``), the text is
    hard-truncated instead — the previous implementation's negative slice
    could return a string LONGER than the limit in that case.
    """
    if len(value) <= max_length:
        return value
    if max_length < 3:
        # No room for "..."; clamp to avoid negative-slice surprises.
        return value[: max(0, max_length)]
    return f"{value[: max_length - 3]}..."
|
|
|
|
|
|
|
|
def safe_column_subset(columns: Iterable[str], allowed: Iterable[str]) -> List[str]:
    """Return a list of *columns* that exist inside *allowed*."""
    # Materialize the allow-list once for O(1) membership tests,
    # preserving the caller's column order in the result.
    permitted = set(allowed)
    kept: List[str] = []
    for col in columns:
        if col in permitted:
            kept.append(col)
    return kept
|
|
|