"""Data loading, cleaning, and filtering helpers for the BI dashboard."""

from __future__ import annotations

from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from typing import Dict, Iterable, List, Mapping, Optional, Tuple

import pandas as pd

from utils import (
    ColumnTypes,
    PREVIEW_ROWS,
    coerce_datetime_columns,
    ensure_unique_columns,
    infer_column_types,
    is_supported_file,
)

SAMPLE_DATA_DIR = Path(__file__).resolve().parent / "data"
SAMPLE_DESCRIPTIONS = {
    "train.csv": "Weekly Walmart sales with markdowns and holidays (training set).",
    "test.csv": "Companion test set without weekly sales labels.",
    "features.csv": "Store-level features such as markdowns, CPI, unemployment.",
    "stores.csv": "Store metadata including type and size.",
}


@dataclass(frozen=True)
class DatasetBundle:
    """Container storing the dataset and metadata required by the UI."""

    dataframe: pd.DataFrame
    column_types: ColumnTypes
    source_name: str


def load_dataset(file_obj) -> DatasetBundle:
    """Load the provided uploaded file into a pandas DataFrame.

    Parameters
    ----------
    file_obj:
        File-like object produced by the Gradio upload widget.

    Returns
    -------
    DatasetBundle
        Loaded dataset alongside inferred column metadata.

    Raises
    ------
    ValueError
        If the file cannot be read or uses an unsupported format.
    """
    if file_obj is None:
        raise ValueError("Please upload a CSV or Excel file.")

    file_name = getattr(file_obj, "name", None)
    original_name = getattr(file_obj, "orig_name", file_name)

    if not original_name or not is_supported_file(original_name):
        raise ValueError("Unsupported file type. Please upload a CSV or Excel file.")

    path_candidate = Path(str(file_name)) if file_name else None
    dataframe: Optional[pd.DataFrame] = None

    try:
        if path_candidate and path_candidate.exists():
            dataframe = _read_from_path(path_candidate, original_name)
        else:
            dataframe = _read_from_buffer(file_obj, original_name)
    except Exception as exc:  # pragma: no cover - defensive conversion
        raise ValueError(f"Unable to load dataset: {exc}") from exc

    if dataframe is None:
        raise ValueError("Failed to load dataset. The file may be empty or corrupted.")

    dataframe = ensure_unique_columns(dataframe)
    dataframe, datetime_cols = coerce_datetime_columns(dataframe)
    column_types = infer_column_types(dataframe)

    # Ensure newly detected datetime columns are included in metadata
    column_types = ColumnTypes(
        numeric=column_types.numeric,
        categorical=column_types.categorical,
        datetime=tuple(sorted(set(column_types.datetime + tuple(datetime_cols)))),
    )

    return DatasetBundle(
        dataframe=dataframe,
        column_types=column_types,
        source_name=Path(original_name).name,
    )


def _read_from_path(path: Path, original_name: str) -> pd.DataFrame:
    """Read a dataset from disk."""
    suffix = path.suffix.lower()
    if suffix == ".csv":
        return pd.read_csv(path)
    if suffix in {".xlsx", ".xls"}:
        return pd.read_excel(path)
    raise ValueError(f"Unsupported file extension in {original_name}.")


def _read_from_buffer(file_obj, original_name: str) -> pd.DataFrame:
    """Read a dataset from an in-memory buffer."""
    # Fall back to empty bytes when the object does not expose ``read``.
    bytes_data = getattr(file_obj, "read", lambda: b"")()
    if not bytes_data:
        raise ValueError(f"The uploaded file '{original_name}' is empty.")

    buffer = BytesIO(bytes_data)
    lowered = original_name.lower()
    if lowered.endswith(".csv"):
        return pd.read_csv(buffer)
    if lowered.endswith((".xlsx", ".xls")):
        return pd.read_excel(buffer)

    raise ValueError("Only CSV and Excel files are supported.")


def dataset_overview(df: pd.DataFrame) -> Dict[str, object]:
    """Return basic information about the dataset."""
    info = {
        "Rows": int(df.shape[0]),
        "Columns": int(df.shape[1]),
        "Memory Usage (MB)": round(df.memory_usage(deep=True).sum() / (1024**2), 2),
    }
    dtypes = pd.DataFrame({"Column": df.columns, "Type": df.dtypes.astype(str)})
    return {"info": info, "dtypes": dtypes}


def dataset_preview(df: pd.DataFrame, rows: int = PREVIEW_ROWS) -> Dict[str, pd.DataFrame]:
    """Return head and tail previews of the dataset."""
    return {
        "head": df.head(rows),
        "tail": df.tail(rows),
    }


def numeric_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Compute descriptive statistics for numeric columns."""
    numeric_df = df.select_dtypes(include=["number"])
    if numeric_df.empty:
        return pd.DataFrame()

    summary = pd.DataFrame(
        {
            "count": numeric_df.count(),
            "mean": numeric_df.mean(),
            "median": numeric_df.median(),
            "std": numeric_df.std(),
            "min": numeric_df.min(),
            "25%": numeric_df.quantile(0.25),
            "75%": numeric_df.quantile(0.75),
            "max": numeric_df.max(),
        }
    )

    summary.index.name = "column"
    return summary.round(3)


def categorical_summary(df: pd.DataFrame, top_values: int = 5) -> pd.DataFrame:
    """Compute summary statistics for categorical columns."""
    # ``datetimetz`` covers every timezone, not just UTC.
    categorical_cols = df.select_dtypes(exclude=["number", "datetime", "datetimetz"])
    if categorical_cols.empty:
        return pd.DataFrame()

    rows: List[Dict[str, object]] = []
    for column in categorical_cols:
        series = categorical_cols[column]
        mode_series = series.mode(dropna=True)
        mode_value = mode_series.iloc[0] if not mode_series.empty else None
        counts = series.value_counts(dropna=True).head(top_values)
        top_repr = ", ".join(f"{idx} ({count})" for idx, count in counts.items())
        rows.append(
            {
                "column": column,
                "unique_values": int(series.nunique(dropna=True)),
                "mode": mode_value,
                "mode_count": int(counts.iloc[0]) if not counts.empty else 0,
                f"top_{top_values}": top_repr,
            }
        )

    return pd.DataFrame(rows)


def missing_value_report(df: pd.DataFrame) -> pd.DataFrame:
    """Return the count and percentage of missing values per column."""
    missing_counts = df.isna().sum()
    if missing_counts.sum() == 0:
        return pd.DataFrame(columns=["column", "missing_count", "missing_pct"])

    missing_pct = (missing_counts / len(df)) * 100
    report = pd.DataFrame(
        {
            "column": missing_counts.index,
            "missing_count": missing_counts.values,
            "missing_pct": missing_pct.values,
        }
    )
    return report.sort_values(by="missing_pct", ascending=False).reset_index(drop=True).round({"missing_pct": 2})


def correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Compute the correlation matrix for numeric columns."""
    numeric_df = df.select_dtypes(include=["number"])
    if numeric_df.empty or numeric_df.shape[1] < 2:
        return pd.DataFrame()
    corr = numeric_df.corr()
    return corr.round(3)


def filter_dataframe(
    df: pd.DataFrame,
    numeric_filters: Mapping[str, Tuple[Optional[float], Optional[float]]],
    categorical_filters: Mapping[str, Iterable[str]],
    date_filters: Mapping[str, Tuple[Optional[str], Optional[str]]],
) -> pd.DataFrame:
    """Filter the dataset according to the provided filter definitions."""
    filtered = df.copy()

    for column, bounds in numeric_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        lower, upper = bounds
        # Build one boolean mask so both bounds are evaluated against the same
        # index, rather than chaining lookups on a frame that shrinks mid-loop.
        series = filtered[column]
        mask = pd.Series(True, index=filtered.index)
        if lower is not None:
            mask &= series >= lower
        if upper is not None:
            mask &= series <= upper
        filtered = filtered[mask]

    for column, values in categorical_filters.items():
        if column not in filtered.columns:
            continue
        values = list(values)
        if not values:
            continue
        filtered = filtered[filtered[column].isin(values)]

    for column, bounds in date_filters.items():
        if column not in filtered.columns or bounds is None:
            continue
        start, end = bounds
        # Coerce once; unparseable values become NaT and are dropped by either
        # bound because NaT comparisons evaluate to False.
        series = pd.to_datetime(filtered[column], errors="coerce")
        mask = pd.Series(True, index=filtered.index)
        if start:
            mask &= series >= pd.to_datetime(start)
        if end:
            mask &= series <= pd.to_datetime(end)
        filtered = filtered[mask]

    return filtered


def filter_metadata(df: pd.DataFrame, column_types: ColumnTypes, categorical_limit: int = 200) -> Dict[str, object]:
    """Pre-compute useful metadata for rendering filter controls."""
    metadata: Dict[str, object] = {"numeric": {}, "categorical": {}, "datetime": {}}

    for column in column_types.numeric:
        series = df[column].dropna()
        if series.empty:
            continue
        metadata["numeric"][column] = {
            "min": float(series.min()),
            "max": float(series.max()),
        }

    for column in column_types.categorical:
        series = df[column].dropna().astype(str)
        unique_values = series.unique().tolist()
        if len(unique_values) > categorical_limit:
            unique_values = unique_values[:categorical_limit]
        metadata["categorical"][column] = unique_values

    for column in column_types.datetime:
        series = pd.to_datetime(df[column], errors="coerce")
        series = series.dropna()
        if series.empty:
            continue
        metadata["datetime"][column] = {
            "min": series.min().date(),
            "max": series.max().date(),
        }

    return metadata


def sample_dataset_options() -> Dict[str, str]:
    """Return available bundled datasets and their descriptions."""
    options: Dict[str, str] = {}
    if not SAMPLE_DATA_DIR.exists():
        return options

    for path in sorted(SAMPLE_DATA_DIR.iterdir()):
        if not path.is_file():
            continue
        if path.suffix.lower() not in {".csv", ".xlsx", ".xls"}:
            continue
        description = SAMPLE_DESCRIPTIONS.get(path.name, f"Sample dataset sourced from '{path.name}'.")
        options[path.name] = description
    return options


def load_sample_dataset(selection: str) -> DatasetBundle:
    """Load a dataset bundled inside the local data directory."""
    if not selection:
        raise ValueError("Please select a sample dataset from the dropdown.")

    path = SAMPLE_DATA_DIR / selection
    if not path.exists():
        raise ValueError(
            f"Sample dataset '{selection}' was not found in the 'data/' directory. "
            "Ensure the file exists and try again."
        )

    dataframe = _read_from_path(path, selection)
    dataframe = ensure_unique_columns(dataframe)
    dataframe, datetime_cols = coerce_datetime_columns(dataframe)
    column_types = infer_column_types(dataframe)
    column_types = ColumnTypes(
        numeric=column_types.numeric,
        categorical=column_types.categorical,
        datetime=tuple(sorted(set(column_types.datetime + tuple(datetime_cols)))),
    )

    return DatasetBundle(
        dataframe=dataframe,
        column_types=column_types,
        source_name=selection,
    )
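

# ---------------------------------------------------------------------------
# Manual smoke test. A minimal illustrative sketch rather than part of the
# module's API: it assumes at least one bundled sample file exists under
# ``data/`` and simply exercises the loading and summary helpers end to end.
if __name__ == "__main__":
    available = sample_dataset_options()
    if not available:
        print(f"No sample datasets found under {SAMPLE_DATA_DIR}.")
    else:
        first = next(iter(available))  # pick any bundled sample
        bundle = load_sample_dataset(first)
        print(f"Loaded '{bundle.source_name}'")
        print(dataset_overview(bundle.dataframe)["info"])
        print(numeric_summary(bundle.dataframe).head())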