from __future__ import annotations

from pathlib import Path
from typing import Any, Iterable

import pandas as pd

# Substrings that mark a column name as a likely timestamp column.
TIMESTAMP_NAME_HINTS = ("timestamp", "time", "date", "datetime", "ds")


def _resolve_file_path(file_obj: Any) -> Path:
    """Turn a path-like value, dict, or object with a ``name`` into a ``Path``.

    Args:
        file_obj: A ``str``/``Path``, a dict carrying a ``"path"`` or ``"name"``
            entry, or any object exposing a truthy ``name`` attribute.

    Returns:
        The resolved file path.

    Raises:
        ValueError: If ``file_obj`` is ``None`` or no path can be extracted.
    """
    if file_obj is None:
        raise ValueError("No file provided.")
    if isinstance(file_obj, (str, Path)):
        return Path(file_obj)
    if isinstance(file_obj, dict):
        maybe_path = file_obj.get("path") or file_obj.get("name")
        if maybe_path:
            return Path(maybe_path)
    maybe_name = getattr(file_obj, "name", None)
    if maybe_name:
        return Path(maybe_name)
    raise ValueError("Unsupported file object type.")


def load_timeseries_from_file(file_obj: Any) -> pd.DataFrame:
    """Load a CSV or Excel file into a DataFrame and parse its timestamp column.

    Column names are converted to stripped strings. If a timestamp column is
    detected, it is converted with ``pd.to_datetime`` (unparseable values
    become ``NaT``).

    Args:
        file_obj: Anything accepted by ``_resolve_file_path``.

    Returns:
        The loaded DataFrame.

    Raises:
        ValueError: If the extension is not ``.csv``/``.xlsx``/``.xls`` or the
            loaded frame is empty.
    """
    file_path = _resolve_file_path(file_obj)
    suffix = file_path.suffix.lower()

    if suffix == ".csv":
        df = pd.read_csv(file_path)
    elif suffix in {".xlsx", ".xls"}:
        df = pd.read_excel(file_path)
    else:
        raise ValueError(f"Unsupported file extension: {suffix}")

    if df.empty:
        raise ValueError("Uploaded file is empty.")

    df.columns = [str(col).strip() for col in df.columns]
    timestamp_col = detect_timestamp_column(df)
    if timestamp_col:
        df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors="coerce")
    return df


def detect_timestamp_column(df: pd.DataFrame) -> str | None:
    """Return the name of the most likely timestamp column, or ``None``.

    Three passes are made over the columns, in order of confidence:

    1. Any column that already has a datetime dtype.
    2. Any column whose lowercased name contains one of
       ``TIMESTAMP_NAME_HINTS`` and whose values parse as datetimes for at
       least 70% of rows.
    3. Any non-numeric column whose values parse as datetimes for at least
       90% of rows.
    """
    for col in df.columns:
        if pd.api.types.is_datetime64_any_dtype(df[col]):
            return col

    for col in df.columns:
        normalized = str(col).strip().lower()
        if any(hint in normalized for hint in TIMESTAMP_NAME_HINTS):
            parsed = pd.to_datetime(df[col], errors="coerce")
            if parsed.notna().mean() >= 0.7:
                return col

    for col in df.columns:
        series = df[col]
        # Numeric columns are skipped here: without a name hint they are
        # treated as values, not as timestamps.
        if pd.api.types.is_numeric_dtype(series):
            continue
        parsed = pd.to_datetime(series, errors="coerce")
        if parsed.notna().mean() >= 0.9:
            return col

    return None


def detect_numeric_columns(
    df: pd.DataFrame, exclude: Iterable[str] | None = None
) -> list[str]:
    """Return the names of numeric-dtype columns, skipping those in ``exclude``."""
    excluded = set(exclude or [])
    return [
        col
        for col in df.columns
        if col not in excluded and pd.api.types.is_numeric_dtype(df[col])
    ]


def infer_frequency(df: pd.DataFrame, timestamp_col: str | None) -> str | None:
    """Infer a pandas frequency string for ``timestamp_col``.

    ``pd.infer_freq`` is tried first on the sorted, de-duplicated, parseable
    timestamps. If it finds nothing, the most common gap between consecutive
    timestamps is converted to an offset instead.

    Returns:
        A frequency string, or ``None`` when the column is missing, fewer
        than three distinct timestamps are available, or no frequency can be
        derived.
    """
    if not timestamp_col or timestamp_col not in df.columns:
        return None

    timestamps = pd.to_datetime(df[timestamp_col], errors="coerce").dropna().sort_values()
    timestamps = timestamps.drop_duplicates()
    if len(timestamps) < 3:
        return None

    inferred = pd.infer_freq(timestamps)
    if inferred:
        return inferred

    deltas = timestamps.diff().dropna()
    if deltas.empty:
        return None
    mode_delta = deltas.mode()
    if mode_delta.empty:
        return None
    try:
        return pd.tseries.frequencies.to_offset(mode_delta.iloc[0]).freqstr
    except ValueError:
        return None


def validate_timeseries(
    df: pd.DataFrame,
    timestamp_col: str | None = None,
    target_cols: list[str] | None = None,
    min_length: int = 20,
) -> dict[str, Any]:
    """Check that ``df`` is usable as a time series and report the findings.

    Args:
        df: The dataset to validate.
        timestamp_col: Timestamp column name; auto-detected when omitted.
        target_cols: Target column names; when ``None``, every numeric column
            other than the timestamp column is used.
        min_length: Row count below which a "short series" warning is added.

    Returns:
        A report dict with the keys ``is_valid``, ``errors``, ``warnings``,
        ``timestamp_column``, ``target_columns``, ``missing_summary`` and
        ``inferred_frequency``. ``is_valid`` is ``False`` whenever ``errors``
        is non-empty.
    """
    report: dict[str, Any] = {
        "is_valid": True,
        "errors": [],
        "warnings": [],
        "timestamp_column": timestamp_col,
        "target_columns": target_cols or [],
        "missing_summary": {},
        "inferred_frequency": None,
    }

    if df.empty:
        report["errors"].append("Dataset is empty.")
        report["is_valid"] = False
        return report

    timestamp_col = timestamp_col or detect_timestamp_column(df)
    report["timestamp_column"] = timestamp_col

    if not timestamp_col:
        report["errors"].append("Could not detect a timestamp column.")
    elif timestamp_col not in df.columns:
        report["errors"].append(f"Timestamp column '{timestamp_col}' not found.")
    else:
        ts = pd.to_datetime(df[timestamp_col], errors="coerce")
        invalid_rate = 1.0 - ts.notna().mean()
        if invalid_rate > 0:
            report["warnings"].append(
                f"{invalid_rate:.1%} of timestamp values could not be parsed."
            )

        inferred = infer_frequency(df, timestamp_col)
        report["inferred_frequency"] = inferred
        if not inferred:
            report["warnings"].append(
                "Could not infer a regular frequency; forecasting still runs with best effort."
            )
        else:
            sorted_ts = ts.dropna().sort_values().drop_duplicates()
            if len(sorted_ts) >= 3:
                # Calendar frequencies (monthly, quarterly, business-day, ...)
                # have varying raw gaps (28/30/31 days, weekend skips) yet are
                # perfectly regular. Only flag irregularity when pandas cannot
                # recognise a regular frequency, i.e. when the reported
                # frequency came from the most-common-gap fallback.
                recognised_regular = pd.infer_freq(sorted_ts) is not None
                diffs = sorted_ts.diff().dropna()
                if not recognised_regular and diffs.nunique() > 1:
                    report["warnings"].append(
                        "Timestamp intervals are irregular; model accuracy may degrade."
                    )

    if target_cols is None:
        target_cols = detect_numeric_columns(
            df, exclude=[timestamp_col] if timestamp_col else None
        )
        report["target_columns"] = target_cols

    if not target_cols:
        report["errors"].append("No numeric target columns found.")
    else:
        for col in target_cols:
            if col not in df.columns:
                report["errors"].append(f"Target column '{col}' not found.")
                continue
            missing = int(df[col].isna().sum())
            report["missing_summary"][col] = missing
            if missing > 0:
                report["warnings"].append(
                    f"Target '{col}' contains {missing} missing values (interpolation suggested)."
                )

    if len(df) < min_length:
        report["warnings"].append(
            f"Series length ({len(df)}) is short; {min_length}+ points is recommended."
        )

    if report["errors"]:
        report["is_valid"] = False
    return report


def format_validation_report(report: dict[str, Any]) -> str:
    """Render a report from ``validate_timeseries`` as multi-line plain text."""
    lines: list[str] = []
    status = "Valid" if report.get("is_valid") else "Invalid"
    lines.append(f"Status: {status}")

    timestamp_col = report.get("timestamp_column")
    target_cols = report.get("target_columns", [])
    freq = report.get("inferred_frequency") or "Unknown"
    lines.append(f"Timestamp column: {timestamp_col or 'Not detected'}")
    lines.append(f"Targets: {', '.join(target_cols) if target_cols else 'None'}")
    lines.append(f"Inferred frequency: {freq}")

    errors = report.get("errors", [])
    warnings = report.get("warnings", [])
    if errors:
        lines.append("Errors:")
        lines.extend(f"- {msg}" for msg in errors)
    if warnings:
        lines.append("Warnings:")
        lines.extend(f"- {msg}" for msg in warnings)
    return "\n".join(lines)


def preprocess_for_model(
    df: pd.DataFrame,
    model_name: str,
    prediction_length: int,
    timestamp_col: str | None = None,
    target_cols: list[str] | None = None,
    interpolate_missing: bool = True,
) -> dict[str, Any]:
    """Build the model input context from a raw DataFrame.

    The input frame is copied, not modified. Rows with unparseable timestamps
    are dropped, rows are sorted by timestamp, and target columns are coerced
    to numeric. With ``interpolate_missing`` set, gaps in the targets are
    interpolated and then forward/backward filled.

    Args:
        df: Source data.
        model_name: Passed through unchanged into the result.
        prediction_length: Passed through unchanged into the result.
        timestamp_col: Timestamp column name; auto-detected when omitted.
        target_cols: Target column names; when ``None`` or empty, every
            numeric column other than the timestamp column is used.
        interpolate_missing: Whether to fill missing target values.

    Returns:
        A dict with ``model_name``, ``context`` (targets indexed by
        timestamp), ``timestamp_col``, ``target_cols``, ``frequency``,
        ``prediction_length`` and ``is_multivariate``.

    Raises:
        ValueError: If no timestamp column or no target column is available,
            or if targets still contain NaN after preprocessing.
    """
    timestamp_col = timestamp_col or detect_timestamp_column(df)
    if not timestamp_col:
        raise ValueError("Timestamp column is required for preprocessing.")

    if target_cols is None or len(target_cols) == 0:
        target_cols = detect_numeric_columns(df, exclude=[timestamp_col])
    if not target_cols:
        raise ValueError("At least one numeric target column is required.")

    prepared = df.copy()
    prepared[timestamp_col] = pd.to_datetime(prepared[timestamp_col], errors="coerce")
    prepared = prepared.dropna(subset=[timestamp_col])
    prepared = prepared.sort_values(timestamp_col)

    for col in target_cols:
        prepared[col] = pd.to_numeric(prepared[col], errors="coerce")

    if interpolate_missing:
        prepared[target_cols] = prepared[target_cols].interpolate(limit_direction="both")
        prepared[target_cols] = prepared[target_cols].ffill().bfill()

    if prepared[target_cols].isna().any().any():
        raise ValueError("Target columns still contain NaN values after preprocessing.")

    frequency = infer_frequency(prepared, timestamp_col)
    context_df = prepared[[timestamp_col, *target_cols]].copy()
    context_df = context_df.set_index(timestamp_col)

    return {
        "model_name": model_name,
        "context": context_df,
        "timestamp_col": timestamp_col,
        "target_cols": target_cols,
        "frequency": frequency,
        "prediction_length": prediction_length,
        "is_multivariate": len(target_cols) > 1,
    }


def _future_steps(
    last_timestamp: pd.Timestamp, prediction_length: int, freq: str
) -> pd.DatetimeIndex:
    """Return the ``prediction_length`` steps of ``freq`` after ``last_timestamp``."""
    anchor = pd.Timestamp(last_timestamp)
    candidates = pd.date_range(start=anchor, periods=prediction_length + 1, freq=freq)
    # date_range starts at ``anchor`` only when it lies on the frequency;
    # otherwise it rolls forward, so the first element is already a future
    # step and must be kept. Slice indexing keeps the index's ``freq``.
    skip = 1 if len(candidates) and candidates[0] <= anchor else 0
    return candidates[skip : skip + prediction_length]


def generate_future_index(
    last_timestamp: pd.Timestamp,
    prediction_length: int,
    frequency: str | None,
) -> pd.DatetimeIndex:
    """Build the timestamps of the forecast horizon.

    Args:
        last_timestamp: Final observed timestamp; never part of the result.
        prediction_length: Number of future steps to generate.
        frequency: pandas frequency string; daily (``"D"``) when ``None`` or
            when the string is rejected by pandas.

    Returns:
        The ``prediction_length`` timestamps strictly after ``last_timestamp``.
    """
    freq = frequency or "D"
    try:
        return _future_steps(last_timestamp, prediction_length, freq)
    except ValueError:
        return _future_steps(last_timestamp, prediction_length, "D")