""" CSV ingest and auto-clean pipeline for time-series data. Provides delimiter detection, date/numeric column suggestion, numeric cleaning (currency, commas, percentages, parenthesised negatives), duplicate and missing-value handling, frequency detection, and calendar-feature extraction. """ import csv import io import re import warnings from dataclasses import dataclass, field from datetime import timedelta import numpy as np import pandas as pd # --------------------------------------------------------------------------- # Dataclasses # --------------------------------------------------------------------------- @dataclass class CleaningReport: """Summary produced by :func:`clean_dataframe`.""" rows_before: int = 0 rows_after: int = 0 duplicates_found: int = 0 duplicates_action: str = "" missing_before: dict = field(default_factory=dict) missing_after: dict = field(default_factory=dict) parsing_warnings: list = field(default_factory=list) @dataclass class FrequencyInfo: """Result of :func:`detect_frequency`.""" label: str = "Unknown" median_delta: timedelta = timedelta(0) is_regular: bool = False # --------------------------------------------------------------------------- # Delimiter detection # --------------------------------------------------------------------------- def detect_delimiter(file_bytes: bytes) -> str: """Return the most likely CSV delimiter for *file_bytes*. Uses :class:`csv.Sniffer` on the first 8 KB of text. Falls back to a comma if the sniffer cannot decide. """ try: sample = file_bytes[:8192].decode("utf-8", errors="replace") dialect = csv.Sniffer().sniff(sample) return dialect.delimiter except csv.Error: return "," # --------------------------------------------------------------------------- # Reading uploads # --------------------------------------------------------------------------- def read_csv_upload(uploaded_file) -> tuple[pd.DataFrame, str]: """Read a Streamlit ``UploadedFile`` and return ``(df, delimiter)``. The file position is rewound so the object can be re-read later if needed. """ raw = uploaded_file.getvalue() delimiter = detect_delimiter(raw) text = raw.decode("utf-8", errors="replace") df = pd.read_csv(io.StringIO(text), sep=delimiter) # Rewind in case the caller wants to read again uploaded_file.seek(0) return df, delimiter # --------------------------------------------------------------------------- # Column suggestion helpers # --------------------------------------------------------------------------- _DATE_NAME_TOKENS = re.compile(r"(date|time|year|month|day|period)", re.IGNORECASE) def suggest_date_columns(df: pd.DataFrame) -> list[str]: """Return column names that are likely to contain date/time values. Checks are applied in order: 1. Column already has a datetime dtype. 2. :func:`pd.to_datetime` succeeds on the first non-null values. 3. The column *name* contains a date-related keyword. """ candidates: list[str] = [] for col in df.columns: name_has_token = bool(_DATE_NAME_TOKENS.search(str(col))) # 1. Already datetime if pd.api.types.is_datetime64_any_dtype(df[col]): if col not in candidates: candidates.append(col) continue # 2. Parseable as datetime (check a sample of non-null values) sample = df[col].dropna().head(20) if not sample.empty and (name_has_token or not pd.api.types.is_numeric_dtype(df[col])): try: with warnings.catch_warnings(): warnings.simplefilter("ignore", UserWarning) parsed = pd.to_datetime(sample, errors="coerce") if parsed.notna().mean() >= 0.8: if col not in candidates: candidates.append(col) continue except (ValueError, TypeError, OverflowError): pass # 3. Column name heuristic if name_has_token: if col not in candidates: candidates.append(col) return candidates def suggest_numeric_columns(df: pd.DataFrame) -> list[str]: """Return columns that are numeric or could be cleaned to numeric. A non-numeric column qualifies if, after stripping common formatting characters (currency symbols, commas, ``%``, parentheses), at least half of its non-null values can be converted to a number. """ candidates: list[str] = [] for col in df.columns: if pd.api.types.is_numeric_dtype(df[col]): candidates.append(col) continue # Attempt lightweight cleaning on a sample sample = df[col].dropna().head(50).astype(str) if sample.empty: continue cleaned = ( sample .str.replace(r"[\$\u20ac\u00a3,% ]", "", regex=True) .str.replace(r"^\((.+)\)$", r"-\1", regex=True) ) numeric = pd.to_numeric(cleaned, errors="coerce") if numeric.notna().sum() >= max(1, len(sample) * 0.5): candidates.append(col) return candidates # --------------------------------------------------------------------------- # Numeric cleaning # --------------------------------------------------------------------------- def clean_numeric_series(series: pd.Series) -> pd.Series: """Clean a series into proper numeric values. Handles: * Currency symbols: ``$``, ``EUR`` (U+20AC), ``GBP`` (U+00A3) * Thousands separators (commas) * Percentage signs * Parenthesised negatives, e.g. ``(123)`` becomes ``-123`` """ s = series.astype(str) # Strip currency symbols, commas, percent signs, and whitespace s = s.str.replace(r"[\$\u20ac\u00a3,%\s]", "", regex=True) # Convert accounting-style negatives: (123.45) -> -123.45 s = s.str.replace(r"^\((.+)\)$", r"-\1", regex=True) return pd.to_numeric(s, errors="coerce") # --------------------------------------------------------------------------- # Full cleaning pipeline # --------------------------------------------------------------------------- def clean_dataframe( df: pd.DataFrame, date_col: str, y_cols: list[str], dup_action: str = "keep_last", missing_action: str = "interpolate", ) -> tuple[pd.DataFrame, CleaningReport]: """Run the full cleaning pipeline and return ``(cleaned_df, report)``. Parameters ---------- df: Input dataframe (will not be mutated). date_col: Name of the column to parse as dates. y_cols: Names of the value columns to clean to numeric. dup_action: How to handle duplicate dates: ``"keep_first"``, ``"keep_last"``, or ``"drop_all"``. missing_action: How to handle missing values in *y_cols*: ``"interpolate"``, ``"ffill"``, or ``"drop"``. """ df = df.copy() report = CleaningReport() report.rows_before = len(df) # --- Parse date column ------------------------------------------------ try: df[date_col] = pd.to_datetime(df[date_col]) except Exception as exc: # noqa: BLE001 report.parsing_warnings.append( f"Date parsing issue in column '{date_col}': {exc}" ) # Coerce individually so partial failures become NaT df[date_col] = pd.to_datetime(df[date_col], errors="coerce") nat_count = int(df[date_col].isna().sum()) if nat_count > 0: report.parsing_warnings.append( f"{nat_count} value(s) in '{date_col}' could not be parsed as dates." ) df = df.dropna(subset=[date_col]) # --- Clean numeric columns -------------------------------------------- for col in y_cols: if not pd.api.types.is_numeric_dtype(df[col]): df[col] = clean_numeric_series(df[col]) # Record missing values *before* imputation report.missing_before = { col: int(df[col].isna().sum()) for col in y_cols } # --- Handle duplicates on date column --------------------------------- dup_mask = df.duplicated(subset=[date_col], keep=False) report.duplicates_found = int(dup_mask.sum()) report.duplicates_action = dup_action if report.duplicates_found > 0: if dup_action == "keep_first": df = df.drop_duplicates(subset=[date_col], keep="first") elif dup_action == "keep_last": df = df.drop_duplicates(subset=[date_col], keep="last") elif dup_action == "drop_all": df = df[~dup_mask] # --- Sort by date ----------------------------------------------------- df = df.sort_values(date_col).reset_index(drop=True) # --- Handle missing values -------------------------------------------- if missing_action == "interpolate": df[y_cols] = df[y_cols].interpolate(method="linear", limit_direction="both") elif missing_action == "ffill": df[y_cols] = df[y_cols].ffill().bfill() elif missing_action == "drop": df = df.dropna(subset=y_cols) report.missing_after = { col: int(df[col].isna().sum()) for col in y_cols } report.rows_after = len(df) return df, report # --------------------------------------------------------------------------- # Frequency detection # --------------------------------------------------------------------------- def detect_frequency(df: pd.DataFrame, date_col: str) -> FrequencyInfo: """Classify the time-series frequency based on median time delta. Returns a :class:`FrequencyInfo` with a human-readable label, the computed median delta, and whether the series is *regular* (the standard deviation of deltas is less than 20 % of the median). """ dates = df[date_col].dropna().sort_values() if len(dates) < 2: return FrequencyInfo(label="Unknown", median_delta=timedelta(0), is_regular=False) deltas = dates.diff().dropna() median_delta = deltas.median() # Regularity: std < 20% of median std_delta = deltas.std() is_regular = bool(std_delta <= median_delta * 0.2) if median_delta > timedelta(0) else False # Classify by median days days = median_delta.days if days <= 1: label = "Daily" elif 5 <= days <= 9: label = "Weekly" elif 25 <= days <= 35: label = "Monthly" elif 85 <= days <= 100: label = "Quarterly" elif 350 <= days <= 380: label = "Yearly" else: label = "Irregular" return FrequencyInfo(label=label, median_delta=median_delta, is_regular=is_regular) # --------------------------------------------------------------------------- # Calendar feature extraction # --------------------------------------------------------------------------- def detect_long_format( df: pd.DataFrame, date_col: str, ) -> tuple[bool, str | None, str | None]: """Heuristic: detect whether *df* is in long (stacked) format. Returns ``(is_long, group_col, value_col)``. A DataFrame is flagged as *long* when the date column contains duplicate values **and** there is at least one string/object column among the remaining columns (the likely group identifier). """ if date_col not in df.columns: return False, None, None dates = df[date_col] if dates.nunique() >= len(dates): # Every date is unique → wide format return False, None, None remaining = [c for c in df.columns if c != date_col] # Find first string/object column → candidate group column group_col: str | None = None for c in remaining: if df[c].dtype == object or pd.api.types.is_string_dtype(df[c]): group_col = c break if group_col is None: return False, None, None # Find first numeric column (excluding the group column) → candidate value value_col: str | None = None for c in remaining: if c == group_col: continue if pd.api.types.is_numeric_dtype(df[c]): value_col = c break if value_col is None: return False, None, None return True, group_col, value_col def pivot_long_to_wide( df: pd.DataFrame, date_col: str, group_col: str, value_col: str, ) -> pd.DataFrame: """Pivot a long-format DataFrame to wide format. Parameters ---------- df: Long-format dataframe. date_col: Column with date values (becomes the index/row key). group_col: Column whose unique values become the new column headers. value_col: Column with the numeric values to spread. Returns ------- pd.DataFrame Wide dataframe with *date_col* as a regular column and one column per unique value in *group_col*. """ wide = df.pivot_table( index=date_col, columns=group_col, values=value_col, aggfunc="first", ) # Flatten MultiIndex column names to plain strings wide.columns = [str(c) for c in wide.columns] wide = wide.reset_index() return wide # --------------------------------------------------------------------------- # Calendar feature extraction # --------------------------------------------------------------------------- def add_time_features(df: pd.DataFrame, date_col: str) -> pd.DataFrame: """Add calendar columns derived from *date_col*. New columns: ``year``, ``quarter``, ``month``, ``day_of_week``. The dataframe is returned (not copied) with new columns appended. """ dt = df[date_col].dt df["year"] = dt.year df["quarter"] = dt.quarter df["month"] = dt.month df["day_of_week"] = dt.dayofweek return df