# Initial HF Space app (commit 84f224f)
from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable
import pandas as pd
# Lowercase substrings that suggest a column name refers to timestamps;
# consumed by detect_timestamp_column's name-based detection pass.
TIMESTAMP_NAME_HINTS = ("timestamp", "time", "date", "datetime", "ds")
def _resolve_file_path(file_obj: Any) -> Path:
if file_obj is None:
raise ValueError("No file provided.")
if isinstance(file_obj, (str, Path)):
return Path(file_obj)
if isinstance(file_obj, dict):
maybe_path = file_obj.get("path") or file_obj.get("name")
if maybe_path:
return Path(maybe_path)
maybe_name = getattr(file_obj, "name", None)
if maybe_name:
return Path(maybe_name)
raise ValueError("Unsupported file object type.")
def load_timeseries_from_file(file_obj: Any) -> pd.DataFrame:
    """Read an uploaded CSV/Excel file into a DataFrame.

    Column names are stripped of surrounding whitespace and, when a timestamp
    column can be detected, that column is converted to datetimes with
    unparseable entries coerced to NaT.

    Raises:
        ValueError: for an unsupported extension or an empty file.
    """
    path = _resolve_file_path(file_obj)
    suffix = path.suffix.lower()
    # Dispatch on extension instead of an if/elif chain.
    readers = {".csv": pd.read_csv, ".xlsx": pd.read_excel, ".xls": pd.read_excel}
    reader = readers.get(suffix)
    if reader is None:
        raise ValueError(f"Unsupported file extension: {suffix}")
    frame = reader(path)
    if frame.empty:
        raise ValueError("Uploaded file is empty.")
    frame.columns = [str(name).strip() for name in frame.columns]
    ts_column = detect_timestamp_column(frame)
    if ts_column:
        frame[ts_column] = pd.to_datetime(frame[ts_column], errors="coerce")
    return frame
def detect_timestamp_column(df: pd.DataFrame) -> str | None:
    """Best-effort detection of the column holding timestamps.

    Search order:
      1. Any column already of a datetime dtype.
      2. Columns whose name contains a known hint, if >= 70% of values parse.
      3. Any non-numeric column where >= 90% of values parse as datetimes.

    Returns the column name, or None when nothing qualifies.
    """
    datetime_cols = [c for c in df.columns if pd.api.types.is_datetime64_any_dtype(df[c])]
    if datetime_cols:
        return datetime_cols[0]

    for candidate in df.columns:
        normalized_name = str(candidate).strip().lower()
        if not any(hint in normalized_name for hint in TIMESTAMP_NAME_HINTS):
            continue
        converted = pd.to_datetime(df[candidate], errors="coerce")
        if converted.notna().mean() >= 0.7:
            return candidate

    for candidate in df.columns:
        values = df[candidate]
        if pd.api.types.is_numeric_dtype(values):
            continue
        converted = pd.to_datetime(values, errors="coerce")
        if converted.notna().mean() >= 0.9:
            return candidate

    return None
def detect_numeric_columns(df: pd.DataFrame, exclude: Iterable[str] | None = None) -> list[str]:
excluded = set(exclude or [])
numeric_cols: list[str] = []
for col in df.columns:
if col in excluded:
continue
if pd.api.types.is_numeric_dtype(df[col]):
numeric_cols.append(col)
return numeric_cols
def infer_frequency(df: pd.DataFrame, timestamp_col: str | None) -> str | None:
if not timestamp_col or timestamp_col not in df.columns:
return None
timestamps = pd.to_datetime(df[timestamp_col], errors="coerce").dropna().sort_values()
timestamps = timestamps.drop_duplicates()
if len(timestamps) < 3:
return None
inferred = pd.infer_freq(timestamps)
if inferred:
return inferred
deltas = timestamps.diff().dropna()
if deltas.empty:
return None
mode_delta = deltas.mode()
if mode_delta.empty:
return None
try:
return pd.tseries.frequencies.to_offset(mode_delta.iloc[0]).freqstr
except ValueError:
return None
def validate_timeseries(
    df: pd.DataFrame,
    timestamp_col: str | None = None,
    target_cols: list[str] | None = None,
    min_length: int = 20,
) -> dict[str, Any]:
    """Validate a DataFrame for forecasting and return a structured report.

    The report dict contains "is_valid", "errors", "warnings", the resolved
    timestamp/target columns, per-target missing-value counts, and the
    inferred frequency (None when one could not be determined).
    """
    report: dict[str, Any] = {
        "is_valid": True,
        "errors": [],
        "warnings": [],
        "timestamp_column": timestamp_col,
        "target_columns": target_cols or [],
        "missing_summary": {},
        "inferred_frequency": None,
    }
    # Aliases into the report so appends below read cleanly.
    errors: list[str] = report["errors"]
    warnings: list[str] = report["warnings"]

    if df.empty:
        errors.append("Dataset is empty.")
        report["is_valid"] = False
        return report

    timestamp_col = timestamp_col or detect_timestamp_column(df)
    report["timestamp_column"] = timestamp_col

    if not timestamp_col:
        errors.append("Could not detect a timestamp column.")
    elif timestamp_col not in df.columns:
        errors.append(f"Timestamp column '{timestamp_col}' not found.")
    else:
        parsed = pd.to_datetime(df[timestamp_col], errors="coerce")
        invalid_rate = 1.0 - parsed.notna().mean()
        if invalid_rate > 0:
            warnings.append(
                f"{invalid_rate:.1%} of timestamp values could not be parsed."
            )
        frequency = infer_frequency(df, timestamp_col)
        report["inferred_frequency"] = frequency
        if not frequency:
            warnings.append(
                "Could not infer a regular frequency; forecasting still runs with best effort."
            )
        else:
            ordered = parsed.dropna().sort_values().drop_duplicates()
            # More than one distinct gap size means the series is irregular.
            if len(ordered) >= 3 and ordered.diff().dropna().nunique() > 1:
                warnings.append(
                    "Timestamp intervals are irregular; model accuracy may degrade."
                )

    if target_cols is None:
        target_cols = detect_numeric_columns(
            df, exclude=[timestamp_col] if timestamp_col else None
        )
    report["target_columns"] = target_cols

    if not target_cols:
        errors.append("No numeric target columns found.")
    else:
        for col in target_cols:
            if col not in df.columns:
                errors.append(f"Target column '{col}' not found.")
                continue
            missing = int(df[col].isna().sum())
            report["missing_summary"][col] = missing
            if missing > 0:
                warnings.append(
                    f"Target '{col}' contains {missing} missing values (interpolation suggested)."
                )

    if len(df) < min_length:
        warnings.append(
            f"Series length ({len(df)}) is short; {min_length}+ points is recommended."
        )

    if errors:
        report["is_valid"] = False
    return report
def format_validation_report(report: dict[str, Any]) -> str:
    """Render a validation report dict as human-readable multi-line text."""
    timestamp_col = report.get("timestamp_column")
    target_cols = report.get("target_columns", [])
    lines = [
        f"Status: {'Valid' if report.get('is_valid') else 'Invalid'}",
        f"Timestamp column: {timestamp_col or 'Not detected'}",
        f"Targets: {', '.join(target_cols) if target_cols else 'None'}",
        f"Inferred frequency: {report.get('inferred_frequency') or 'Unknown'}",
    ]
    # Append each non-empty message section under its heading.
    for heading, messages in (
        ("Errors:", report.get("errors", [])),
        ("Warnings:", report.get("warnings", [])),
    ):
        if messages:
            lines.append(heading)
            lines.extend(f"- {msg}" for msg in messages)
    return "\n".join(lines)
def preprocess_for_model(
    df: pd.DataFrame,
    model_name: str,
    prediction_length: int,
    timestamp_col: str | None = None,
    target_cols: list[str] | None = None,
    interpolate_missing: bool = True,
) -> dict[str, Any]:
    """Clean and shape a DataFrame into the payload expected by a forecaster.

    Coerces timestamps and targets, drops rows with unparseable timestamps,
    sorts chronologically, optionally fills missing target values, and
    returns a dict with a datetime-indexed context frame plus metadata.

    Raises:
        ValueError: when no timestamp column is available, no numeric target
            columns exist, or NaNs remain in the targets after cleaning.
    """
    timestamp_col = timestamp_col or detect_timestamp_column(df)
    if not timestamp_col:
        raise ValueError("Timestamp column is required for preprocessing.")

    if not target_cols:
        target_cols = detect_numeric_columns(df, exclude=[timestamp_col])
    if not target_cols:
        raise ValueError("At least one numeric target column is required.")

    cleaned = df.copy()
    cleaned[timestamp_col] = pd.to_datetime(cleaned[timestamp_col], errors="coerce")
    cleaned = cleaned.dropna(subset=[timestamp_col]).sort_values(timestamp_col)

    for name in target_cols:
        cleaned[name] = pd.to_numeric(cleaned[name], errors="coerce")
    if interpolate_missing:
        # Interpolate interior gaps, then pad the edges in both directions.
        cleaned[target_cols] = cleaned[target_cols].interpolate(limit_direction="both")
        cleaned[target_cols] = cleaned[target_cols].ffill().bfill()
    if cleaned[target_cols].isna().any().any():
        raise ValueError("Target columns still contain NaN values after preprocessing.")

    return {
        "model_name": model_name,
        "context": cleaned[[timestamp_col, *target_cols]].set_index(timestamp_col),
        "timestamp_col": timestamp_col,
        "target_cols": target_cols,
        "frequency": infer_frequency(cleaned, timestamp_col),
        "prediction_length": prediction_length,
        "is_multivariate": len(target_cols) > 1,
    }
def generate_future_index(
last_timestamp: pd.Timestamp,
prediction_length: int,
frequency: str | None,
) -> pd.DatetimeIndex:
freq = frequency or "D"
try:
return pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq=freq)[1:]
except ValueError:
return pd.date_range(start=last_timestamp, periods=prediction_length + 1, freq="D")[1:]