"""Utility helpers for the Business Intelligence dashboard.""" from __future__ import annotations from dataclasses import dataclass from typing import Dict, Iterable, List, Tuple import pandas as pd SUPPORTED_FILE_TYPES: Tuple[str, ...] = (".csv", ".xlsx", ".xls") """Allowed file extensions for uploads.""" PREVIEW_ROWS: int = 5 """Default number of rows to display in dataset previews.""" @dataclass(frozen=True) class ColumnTypes: """Container describing inferred column groupings.""" numeric: Tuple[str, ...] categorical: Tuple[str, ...] datetime: Tuple[str, ...] def is_supported_file(filename: str | None) -> bool: """Return True when the provided filename uses a supported extension.""" if not filename: return False lowered = filename.lower() return any(lowered.endswith(ext) for ext in SUPPORTED_FILE_TYPES) def coerce_datetime_columns(df: pd.DataFrame, threshold: float = 0.6) -> Tuple[pd.DataFrame, Tuple[str, ...]]: """Attempt to parse object columns as datetimes when enough values can be converted. Parameters ---------- df: Input DataFrame to mutate in-place. threshold: Minimum fraction of non-null values that must successfully convert for the column to be promoted to datetime. Returns ------- tuple Mutated DataFrame and the tuple of datetime column names. 
""" datetime_cols: List[str] = list( df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns ) object_cols = df.select_dtypes(include=["object"]).columns for col in object_cols: series = df[col] non_null_ratio = series.notna().mean() if non_null_ratio == 0 or non_null_ratio < threshold: continue converted = pd.to_datetime(series, errors="coerce", utc=False) success_ratio = converted.notna().mean() if success_ratio >= threshold: df[col] = converted datetime_cols.append(col) return df, tuple(sorted(set(datetime_cols))) def infer_column_types(df: pd.DataFrame) -> ColumnTypes: """Infer high-level data types for the provided DataFrame's columns.""" numeric_cols = tuple(df.select_dtypes(include=["number"]).columns) datetime_cols = tuple(df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns) categorical_cols: List[str] = [] for col in df.columns: if col in numeric_cols or col in datetime_cols: continue categorical_cols.append(col) return ColumnTypes(numeric=numeric_cols, categorical=tuple(categorical_cols), datetime=datetime_cols) def clamp_numeric(value: float, minimum: float, maximum: float) -> float: """Clamp *value* into the closed range [minimum, maximum].""" return max(minimum, min(maximum, value)) def ensure_unique_columns(df: pd.DataFrame) -> pd.DataFrame: """Rename duplicate columns to maintain uniqueness.""" if df.columns.is_unique: return df new_columns: List[str] = [] seen: Dict[str, int] = {} for col in df.columns: count = seen.get(col, 0) if count == 0: new_columns.append(col) else: new_columns.append(f"{col}_{count}") seen[col] = count + 1 df = df.copy() df.columns = new_columns return df def shorten_text(value: str, max_length: int = 80) -> str: """Truncate long text values for cleaner display.""" if len(value) <= max_length: return value return f"{value[: max_length - 3]}..." 
def safe_column_subset(columns: Iterable[str], allowed: Iterable[str]) -> List[str]:
    """Filter *columns* down to the names present in *allowed*, keeping order."""
    # Materialize the allow-list once for O(1) membership tests.
    permitted = set(allowed)
    result: List[str] = []
    for name in columns:
        if name in permitted:
            result.append(name)
    return result