| from __future__ import annotations | |
| from pathlib import Path | |
| from typing import Any, Iterable | |
| import pandas as pd | |
# Lowercase substrings used to guess which column holds timestamps; matched
# as substrings against normalized (stripped, lowercased) column names.
TIMESTAMP_NAME_HINTS = ("timestamp", "time", "date", "datetime", "ds")
| def _resolve_file_path(file_obj: Any) -> Path: | |
| if file_obj is None: | |
| raise ValueError("No file provided.") | |
| if isinstance(file_obj, (str, Path)): | |
| return Path(file_obj) | |
| if isinstance(file_obj, dict): | |
| maybe_path = file_obj.get("path") or file_obj.get("name") | |
| if maybe_path: | |
| return Path(maybe_path) | |
| maybe_name = getattr(file_obj, "name", None) | |
| if maybe_name: | |
| return Path(maybe_name) | |
| raise ValueError("Unsupported file object type.") | |
def load_timeseries_from_file(file_obj: Any) -> pd.DataFrame:
    """Read an uploaded CSV/Excel file into a DataFrame.

    Column names are stripped of surrounding whitespace, and the detected
    timestamp column (if any) is converted to datetime with unparseable
    values coerced to NaT.

    Raises:
        ValueError: for unsupported extensions or an empty file.
    """
    path = _resolve_file_path(file_obj)
    ext = path.suffix.lower()
    readers = {".csv": pd.read_csv, ".xlsx": pd.read_excel, ".xls": pd.read_excel}
    reader = readers.get(ext)
    if reader is None:
        raise ValueError(f"Unsupported file extension: {ext}")
    df = reader(path)
    if df.empty:
        raise ValueError("Uploaded file is empty.")
    df.columns = [str(col).strip() for col in df.columns]
    ts_col = detect_timestamp_column(df)
    if ts_col:
        df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce")
    return df
def detect_timestamp_column(df: pd.DataFrame) -> str | None:
    """Best-effort guess of which column holds timestamps.

    Preference order:
      1. an existing datetime-dtype column;
      2. a column whose name contains a timestamp hint and whose values
         parse as datetimes at a rate of at least 70%;
      3. any non-numeric column parsing as datetimes at 90% or better.

    Returns None when nothing qualifies.
    """
    # Pass 1: already datetime-typed.
    typed = [c for c in df.columns if pd.api.types.is_datetime64_any_dtype(df[c])]
    if typed:
        return typed[0]
    # Pass 2: name-based hints, validated by parse success rate.
    for candidate in df.columns:
        lowered = str(candidate).strip().lower()
        if not any(hint in lowered for hint in TIMESTAMP_NAME_HINTS):
            continue
        if pd.to_datetime(df[candidate], errors="coerce").notna().mean() >= 0.7:
            return candidate
    # Pass 3: any non-numeric column that is overwhelmingly datetime-like.
    for candidate in df.columns:
        values = df[candidate]
        if pd.api.types.is_numeric_dtype(values):
            continue
        if pd.to_datetime(values, errors="coerce").notna().mean() >= 0.9:
            return candidate
    return None
def detect_numeric_columns(df: pd.DataFrame, exclude: Iterable[str] | None = None) -> list[str]:
    """Return the names of numeric-dtype columns, skipping any listed in *exclude*."""
    skip = set(exclude or [])
    return [
        col
        for col in df.columns
        if col not in skip and pd.api.types.is_numeric_dtype(df[col])
    ]
def infer_frequency(df: pd.DataFrame, timestamp_col: str | None) -> str | None:
    """Infer a pandas frequency string from the timestamp column.

    Parses, deduplicates, and sorts the timestamps, then asks pandas for a
    regular frequency. When pandas cannot infer one, falls back to the modal
    gap between consecutive timestamps. Returns None when fewer than three
    distinct valid timestamps exist or no usable answer is found.
    """
    if not timestamp_col or timestamp_col not in df.columns:
        return None
    ts = (
        pd.to_datetime(df[timestamp_col], errors="coerce")
        .dropna()
        .sort_values()
        .drop_duplicates()
    )
    if len(ts) < 3:
        return None
    freq = pd.infer_freq(ts)
    if freq:
        return freq
    # Fallback: the most common inter-timestamp delta, expressed as an offset.
    gaps = ts.diff().dropna()
    if gaps.empty:
        return None
    modal = gaps.mode()
    if modal.empty:
        return None
    try:
        return pd.tseries.frequencies.to_offset(modal.iloc[0]).freqstr
    except ValueError:
        return None
def _check_timestamp(df: pd.DataFrame, timestamp_col: str | None, report: dict[str, Any]) -> None:
    """Validate the timestamp column, filling the report's errors/warnings and inferred_frequency."""
    if not timestamp_col:
        report["errors"].append("Could not detect a timestamp column.")
        return
    if timestamp_col not in df.columns:
        report["errors"].append(f"Timestamp column '{timestamp_col}' not found.")
        return
    ts = pd.to_datetime(df[timestamp_col], errors="coerce")
    invalid_rate = 1.0 - ts.notna().mean()
    if invalid_rate > 0:
        report["warnings"].append(
            f"{invalid_rate:.1%} of timestamp values could not be parsed."
        )
    inferred = infer_frequency(df, timestamp_col)
    report["inferred_frequency"] = inferred
    if not inferred:
        report["warnings"].append(
            "Could not infer a regular frequency; forecasting still runs with best effort."
        )
    else:
        # Even with an inferred frequency, warn if the raw gaps are not uniform.
        sorted_ts = ts.dropna().sort_values().drop_duplicates()
        if len(sorted_ts) >= 3:
            diffs = sorted_ts.diff().dropna()
            if diffs.nunique() > 1:
                report["warnings"].append(
                    "Timestamp intervals are irregular; model accuracy may degrade."
                )


def _check_targets(df: pd.DataFrame, target_cols: list[str], report: dict[str, Any]) -> None:
    """Validate target columns, filling the report's errors/warnings and missing_summary."""
    if not target_cols:
        report["errors"].append("No numeric target columns found.")
        return
    for col in target_cols:
        if col not in df.columns:
            report["errors"].append(f"Target column '{col}' not found.")
            continue
        missing = int(df[col].isna().sum())
        report["missing_summary"][col] = missing
        if missing > 0:
            report["warnings"].append(
                f"Target '{col}' contains {missing} missing values (interpolation suggested)."
            )


def validate_timeseries(
    df: pd.DataFrame,
    timestamp_col: str | None = None,
    target_cols: list[str] | None = None,
    min_length: int = 20,
) -> dict[str, Any]:
    """Validate a time-series DataFrame before forecasting.

    Auto-detects the timestamp column and numeric targets when not given,
    then runs timestamp, target, and length checks.

    Args:
        df: the dataset to validate.
        timestamp_col: timestamp column name; auto-detected when None.
        target_cols: numeric target column names; auto-detected when None.
        min_length: minimum recommended series length (shorter only warns).

    Returns:
        A report dict with keys: is_valid, errors, warnings, timestamp_column,
        target_columns, missing_summary, inferred_frequency.
    """
    report: dict[str, Any] = {
        "is_valid": True,
        "errors": [],
        "warnings": [],
        "timestamp_column": timestamp_col,
        "target_columns": target_cols or [],
        "missing_summary": {},
        "inferred_frequency": None,
    }
    if df.empty:
        report["errors"].append("Dataset is empty.")
        report["is_valid"] = False
        return report
    timestamp_col = timestamp_col or detect_timestamp_column(df)
    report["timestamp_column"] = timestamp_col
    _check_timestamp(df, timestamp_col, report)
    if target_cols is None:
        target_cols = detect_numeric_columns(df, exclude=[timestamp_col] if timestamp_col else None)
    report["target_columns"] = target_cols
    _check_targets(df, target_cols, report)
    if len(df) < min_length:
        report["warnings"].append(
            f"Series length ({len(df)}) is short; {min_length}+ points is recommended."
        )
    if report["errors"]:
        report["is_valid"] = False
    return report
def format_validation_report(report: dict[str, Any]) -> str:
    """Render a validation report dict as human-readable multi-line text.

    Missing keys fall back to sensible placeholders ('Not detected',
    'None', 'Unknown'); error/warning sections are omitted when empty.
    """
    targets = report.get("target_columns", [])
    out: list[str] = [
        f"Status: {'Valid' if report.get('is_valid') else 'Invalid'}",
        f"Timestamp column: {report.get('timestamp_column') or 'Not detected'}",
        f"Targets: {', '.join(targets) if targets else 'None'}",
        f"Inferred frequency: {report.get('inferred_frequency') or 'Unknown'}",
    ]
    for heading, key in (("Errors:", "errors"), ("Warnings:", "warnings")):
        messages = report.get(key, [])
        if messages:
            out.append(heading)
            out.extend(f"- {msg}" for msg in messages)
    return "\n".join(out)
def preprocess_for_model(
    df: pd.DataFrame,
    model_name: str,
    prediction_length: int,
    timestamp_col: str | None = None,
    target_cols: list[str] | None = None,
    interpolate_missing: bool = True,
) -> dict[str, Any]:
    """Clean a DataFrame and package it into the payload a forecasting model expects.

    Steps: resolve timestamp/target columns (auto-detecting when absent),
    coerce types, drop rows lacking a valid timestamp, sort chronologically,
    optionally fill target gaps (interpolate, then ffill/bfill), and build a
    timestamp-indexed context frame.

    Raises:
        ValueError: when no timestamp or numeric targets can be resolved, or
            when targets still contain NaN after preprocessing.
    """
    timestamp_col = timestamp_col or detect_timestamp_column(df)
    if not timestamp_col:
        raise ValueError("Timestamp column is required for preprocessing.")
    if not target_cols:
        target_cols = detect_numeric_columns(df, exclude=[timestamp_col])
        if not target_cols:
            raise ValueError("At least one numeric target column is required.")
    working = df.copy()
    working[timestamp_col] = pd.to_datetime(working[timestamp_col], errors="coerce")
    working = working.dropna(subset=[timestamp_col]).sort_values(timestamp_col)
    for col in target_cols:
        working[col] = pd.to_numeric(working[col], errors="coerce")
    if interpolate_missing:
        working[target_cols] = working[target_cols].interpolate(limit_direction="both")
        working[target_cols] = working[target_cols].ffill().bfill()
    if working[target_cols].isna().any().any():
        raise ValueError("Target columns still contain NaN values after preprocessing.")
    context = working[[timestamp_col, *target_cols]].set_index(timestamp_col)
    return {
        "model_name": model_name,
        "context": context,
        "timestamp_col": timestamp_col,
        "target_cols": target_cols,
        "frequency": infer_frequency(working, timestamp_col),
        "prediction_length": prediction_length,
        "is_multivariate": len(target_cols) > 1,
    }
def generate_future_index(
    last_timestamp: pd.Timestamp,
    prediction_length: int,
    frequency: str | None,
) -> pd.DatetimeIndex:
    """Build the datetime index for the forecast horizon.

    Produces *prediction_length* stamps strictly after *last_timestamp*,
    spaced by *frequency*. A None or invalid frequency falls back to daily.
    """
    primary = frequency or "D"
    try:
        # periods + 1 because last_timestamp itself is included by date_range.
        horizon = pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq=primary)
    except ValueError:
        # Unknown/invalid frequency string: retry with daily spacing.
        horizon = pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq="D")
    return horizon[1:]