Spaces:
No application file
No application file
| # TimeSeriesHomework/src/lab3_functions.py | |
| """ | |
| Набор вспомогательных функций для ЛР3: | |
| - обёртки для SARIMAX, auto_arima (pmdarima), VAR, GARCH (arch) и т.п. | |
| - forecast helpers | |
| - простые метрики | |
| Файл не использует абсолютных путей и предназначен для импорта в проекте. | |
| """ | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| from typing import Tuple, Dict, Any, Optional, List | |
| import numpy as np | |
| import pandas as pd | |
| # optional heavy deps | |
| try: | |
| import pmdarima as pm | |
| PM_AVAILABLE = True | |
| except Exception: | |
| PM_AVAILABLE = False | |
| try: | |
| from statsmodels.tsa.statespace.sarimax import SARIMAX | |
| from statsmodels.tsa.api import VAR | |
| STATSMODELS_AVAILABLE = True | |
| except Exception: | |
| STATSMODELS_AVAILABLE = False | |
| try: | |
| from arch import arch_model | |
| ARCH_AVAILABLE = True | |
| except Exception: | |
| ARCH_AVAILABLE = False | |
| # sklearn metrics used for convenience (optional) | |
| try: | |
| from sklearn.metrics import mean_absolute_error, mean_squared_error | |
| SKLEARN_AVAILABLE = True | |
| except Exception: | |
| SKLEARN_AVAILABLE = False | |
def is_pandas_series(x: Any) -> bool:
    """Tell whether *x* is a ``pandas.Series`` (subclasses included).

    Args:
        x: Any object.

    Returns:
        True if ``x`` is an instance of ``pd.Series``, otherwise False.
    """
    return isinstance(x, pd.Series)
def mae_rmse(y_true, y_pred) -> Dict[str, float]:
    """Compute MAE and RMSE between actual and predicted values.

    The metrics are computed with NumPy only, so the result and the error
    behaviour no longer depend on whether scikit-learn happens to be
    installed. Inputs are validated explicitly instead of relying on NumPy
    broadcasting, which would silently produce wrong numbers for mismatched
    shapes (for example a length-n vector against a length-1 vector).

    Args:
        y_true: Array-like of observed values (scalar, 1-D or 2-D).
        y_pred: Array-like of predicted values with the same shape as
            ``y_true``. A column vector of shape ``(n, 1)`` is treated as a
            flat vector of length ``n``.

    Returns:
        A dict ``{"MAE": float, "RMSE": float}``; both metrics are averaged
        over all elements.

    Raises:
        ValueError: If the inputs are empty, have different shapes, cannot be
            converted to float, or contain NaN / infinite values.
    """
    actual = np.atleast_1d(np.asarray(y_true, dtype=float))
    predicted = np.atleast_1d(np.asarray(y_pred, dtype=float))

    # A single-column 2-D array carries the same data as a flat vector;
    # flatten it so (n,) vs (n, 1) compares element-wise instead of
    # broadcasting to an (n, n) matrix.
    if actual.ndim == 2 and actual.shape[1] == 1:
        actual = actual[:, 0]
    if predicted.ndim == 2 and predicted.shape[1] == 1:
        predicted = predicted[:, 0]

    if actual.shape != predicted.shape:
        raise ValueError(
            f"mae_rmse: размеры y_true {actual.shape} и y_pred {predicted.shape} не совпадают."
        )
    if actual.size == 0:
        raise ValueError("mae_rmse: переданы пустые массивы.")
    if not (np.isfinite(actual).all() and np.isfinite(predicted).all()):
        raise ValueError("mae_rmse: входные данные содержат NaN или бесконечные значения.")

    errors = actual - predicted
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(np.mean(errors ** 2)))
    return {"MAE": mae, "RMSE": rmse}
def fit_auto_arima(series: pd.Series, seasonal: bool = False, m: int = 1, **kwargs):
    """Select and fit an ARIMA model with ``pmdarima.auto_arima``.

    Args:
        series: Input data; anything that is not a ``pd.Series`` is wrapped
            in one. Missing values are dropped before fitting.
        seasonal: Forwarded to ``auto_arima`` as ``seasonal``.
        m: Forwarded to ``auto_arima`` as ``m``.
        **kwargs: Extra keyword arguments forwarded to ``auto_arima``.

    Returns:
        The object returned by ``pm.auto_arima``.

    Raises:
        ImportError: If pmdarima could not be imported.
        ValueError: If the series is empty after dropping missing values.
    """
    if not PM_AVAILABLE:
        raise ImportError("pmdarima не установлен. Установите pmdarima (pip install pmdarima).")

    data = series if is_pandas_series(series) else pd.Series(series)
    data = data.dropna()
    if data.empty:
        raise ValueError("Пустая серия передана в fit_auto_arima.")

    # error_action / suppress_warnings keep the search quiet when individual
    # candidate models fail to fit.
    return pm.auto_arima(
        data,
        seasonal=seasonal,
        m=m,
        error_action="ignore",
        suppress_warnings=True,
        **kwargs,
    )
def fit_sarimax(series: pd.Series, order: Tuple[int, int, int] = (1, 0, 0),
                seasonal_order: Tuple[int, int, int, int] = (0, 0, 0, 0),
                enforce_stationarity: bool = False, enforce_invertibility: bool = True, **fit_kwargs):
    """Fit a statsmodels ``SARIMAX`` model and return the fit result.

    Args:
        series: Input data; anything that is not a ``pd.Series`` is wrapped
            in one. Missing values are dropped before fitting.
        order: Passed to ``SARIMAX`` as ``order``.
        seasonal_order: Passed to ``SARIMAX`` as ``seasonal_order``.
        enforce_stationarity: Passed through to ``SARIMAX``.
        enforce_invertibility: Passed through to ``SARIMAX``.
        **fit_kwargs: Extra keyword arguments forwarded to ``fit()``.

    Returns:
        The object returned by ``SARIMAX(...).fit(disp=False, ...)``.

    Raises:
        ImportError: If statsmodels could not be imported.
        ValueError: If the series is empty after dropping missing values.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels не установлен. Установите statsmodels.")

    data = series if is_pandas_series(series) else pd.Series(series)
    data = data.dropna()
    if data.empty:
        raise ValueError("Пустая серия передана в fit_sarimax.")

    sarimax_model = SARIMAX(
        data,
        order=order,
        seasonal_order=seasonal_order,
        enforce_stationarity=enforce_stationarity,
        enforce_invertibility=enforce_invertibility,
    )
    # disp=False silences the optimizer's console output.
    return sarimax_model.fit(disp=False, **fit_kwargs)
def forecast_sarimax(fit_res, steps: int, alpha: float = 0.05) -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """Produce a forecast from a fitted result object.

    If ``fit_res`` exposes ``get_forecast`` it is used, and the confidence
    bounds come from ``conf_int(alpha=alpha)``. Otherwise the function falls
    back to ``fit_res.forecast(steps=steps)``, which yields no interval.

    Args:
        fit_res: Fitted result object (duck-typed: needs ``get_forecast`` or
            ``forecast``).
        steps: Number of steps to forecast.
        alpha: Significance level passed to ``conf_int``.

    Returns:
        ``(mean, (lower, upper))`` as NumPy arrays. ``lower`` and ``upper``
        are filled with NaN when no interval could be obtained.

    Raises:
        ValueError: If the fallback ``forecast`` path fails for any reason.
    """
    if not hasattr(fit_res, "get_forecast"):
        # Fallback path: point forecast only, bounds are unknown.
        try:
            point = np.asarray(fit_res.forecast(steps=steps))
            lower = np.full(len(point), np.nan)
            upper = np.full(len(point), np.nan)
            return point, (lower, upper)
        except Exception as e:
            raise ValueError(f"Не удалось получить прогноз из объекта результата: {e}")

    forecast_obj = fit_res.get_forecast(steps=steps)
    point = np.asarray(forecast_obj.predicted_mean)
    try:
        bounds = forecast_obj.conf_int(alpha=alpha)
        lower, upper = np.asarray(bounds.iloc[:, 0]), np.asarray(bounds.iloc[:, 1])
    except Exception:
        # Best effort: keep the point forecast even if the interval is unavailable.
        lower = np.full(len(point), np.nan)
        upper = np.full(len(point), np.nan)
    return point, (lower, upper)
def fit_var(df: pd.DataFrame, maxlags: int = 15):
    """Fit a statsmodels ``VAR`` model with an automatically chosen lag.

    Rows containing missing values are dropped. The lag is taken from
    ``select_order(maxlags).selected_orders``, using the first criterion of
    ``aic``, ``bic``, ``fpe``, ``hqic`` that has a value. If none can be
    read, or the chosen lag is below 1, lag 1 is used.

    Args:
        df: DataFrame of the series to model.
        maxlags: Upper bound passed to ``select_order``.

    Returns:
        The object returned by ``VAR(...).fit(maxlags=<chosen lag>)``.

    Raises:
        ImportError: If statsmodels could not be imported.
        ValueError: If ``df`` is not a DataFrame or has fewer than 3 rows
            after dropping missing values.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels не установлен. Установите statsmodels.")
    if not isinstance(df, pd.DataFrame):
        raise ValueError("fit_var ожидает pd.DataFrame с несколькими числовыми колонками.")

    frame = df.dropna()
    if frame.shape[0] < 3:
        raise ValueError("Недостаточно наблюдений для VAR.")

    var_model = VAR(frame)
    order_selection = var_model.select_order(maxlags=maxlags)

    lag = None
    try:
        if hasattr(order_selection, "selected_orders"):
            chosen = order_selection.selected_orders
            # Lazily probe the criteria in priority order; stop at the first hit.
            probes = (chosen.get(name, None) for name in ("aic", "bic", "fpe", "hqic"))
            first_hit = next((value for value in probes if value is not None), None)
            if first_hit is not None:
                lag = int(first_hit)
    except Exception:
        # Any problem reading the selection falls back to the default lag below.
        lag = None

    if lag is None or lag < 1:
        lag = 1
    return var_model.fit(maxlags=lag)
def forecast_var(fitted_var, steps: int) -> pd.DataFrame:
    """Multi-step forecast from a fitted VAR result.

    The last ``k_ar`` rows of ``fitted_var.endog`` are passed to
    ``fitted_var.forecast`` as the starting history.

    Args:
        fitted_var: Fitted result exposing ``forecast``, ``endog``, ``k_ar``
            and ``names``.
        steps: Number of steps to forecast.

    Returns:
        DataFrame of forecasts with one column per entry of
        ``fitted_var.names`` and an index running from 1 to ``steps``.

    Raises:
        ValueError: If anything fails while producing the forecast.
    """
    try:
        # Attributes are read in the same order as a single nested call would
        # read them, so a missing attribute reports the same name.
        make_forecast = fitted_var.forecast
        observed = fitted_var.endog
        history = observed[-fitted_var.k_ar:]
        predicted = make_forecast(history, steps=steps)
        return pd.DataFrame(
            predicted,
            columns=fitted_var.names,
            index=range(1, steps + 1),
        )
    except Exception as e:
        raise ValueError(f"Ошибка при прогнозе VAR: {e}")
def fit_garch(series: pd.Series, p: int = 1, q: int = 1):
    """Fit a GARCH(p, q) model with the ``arch`` package.

    The model is built with ``vol="Garch"`` and ``dist="normal"``.

    Args:
        series: Input data; anything that is not a ``pd.Series`` is wrapped
            in one. Missing values are dropped before fitting.
        p: Passed to ``arch_model`` as ``p``.
        q: Passed to ``arch_model`` as ``q``.

    Returns:
        The object returned by ``arch_model(...).fit(disp="off")``.

    Raises:
        ImportError: If the arch package could not be imported.
        ValueError: If the series is empty after dropping missing values.
    """
    if not ARCH_AVAILABLE:
        raise ImportError("arch не установлен. Установите arch (pip install arch).")

    data = series if is_pandas_series(series) else pd.Series(series)
    data = data.dropna()
    if data.empty:
        raise ValueError("Пустая серия передана в fit_garch.")

    garch_spec = arch_model(data, vol="Garch", p=p, q=q, dist="normal")
    # disp="off" suppresses the optimizer's iteration log.
    return garch_spec.fit(disp="off")
def safe_summary(obj) -> str:
    """Return ``str(obj.summary())``, or ``repr(obj)`` if that fails.

    Args:
        obj: Any object; it is expected, but not required, to have a
            ``summary()`` method.

    Returns:
        The summary text, or the object's ``repr`` when ``summary`` is
        missing or raises.
    """
    try:
        text = str(obj.summary())
    except Exception:
        # No usable summary(): fall back to the plain representation.
        text = repr(obj)
    return text
# Quick smoke check when the module is executed directly: list the public
# fit_* / forecast_* helpers defined at module level.
if __name__ == "__main__":
    available_helpers = [name for name in dir() if name.startswith(("fit_", "forecast_"))]
    print("lab3_functions: доступные функции:", available_helpers)