TimeSeriesHomework1_2 / src /lab3_functions.py
Kolesnikov Dmitry
feat: Sarima и VAR для третей лабораторки
b34a74f
# TimeSeriesHomework/src/lab3_functions.py
"""
Набор вспомогательных функций для ЛР3:
- обёртки для SARIMAX, auto_arima (pmdarima), VAR, GARCH (arch) и т.п.
- forecast helpers
- простые метрики
Файл не использует абсолютных путей и предназначен для импорта в проекте.
"""
import warnings
warnings.filterwarnings("ignore")
from typing import Tuple, Dict, Any, Optional, List
import numpy as np
import pandas as pd
# optional heavy deps
try:
import pmdarima as pm
PM_AVAILABLE = True
except Exception:
PM_AVAILABLE = False
try:
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.api import VAR
STATSMODELS_AVAILABLE = True
except Exception:
STATSMODELS_AVAILABLE = False
try:
from arch import arch_model
ARCH_AVAILABLE = True
except Exception:
ARCH_AVAILABLE = False
# sklearn metrics used for convenience (optional)
try:
from sklearn.metrics import mean_absolute_error, mean_squared_error
SKLEARN_AVAILABLE = True
except Exception:
SKLEARN_AVAILABLE = False
def is_pandas_series(x: Any) -> bool:
return isinstance(x, (pd.Series,))
def mae_rmse(y_true, y_pred) -> Dict[str, float]:
y_true = np.array(y_true)
y_pred = np.array(y_pred)
if SKLEARN_AVAILABLE:
mae = float(mean_absolute_error(y_true, y_pred))
rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
else:
mae = float(np.mean(np.abs(y_true - y_pred)))
rmse = float(np.sqrt(np.mean((y_true - y_pred) ** 2)))
return {"MAE": mae, "RMSE": rmse}
def fit_auto_arima(series: pd.Series, seasonal: bool = False, m: int = 1, **kwargs):
"""
Подбор ARIMA через pmdarima.auto_arima. Возвращает обученную модель pmdarima.
"""
if not PM_AVAILABLE:
raise ImportError("pmdarima не установлен. Установите pmdarima (pip install pmdarima).")
if not is_pandas_series(series):
series = pd.Series(series)
series_clean = series.dropna()
if series_clean.empty:
raise ValueError("Пустая серия передана в fit_auto_arima.")
model = pm.auto_arima(series_clean, seasonal=seasonal, m=m, error_action="ignore", suppress_warnings=True, **kwargs)
return model
def fit_sarimax(series: pd.Series, order: Tuple[int, int, int] = (1, 0, 0),
seasonal_order: Tuple[int, int, int, int] = (0, 0, 0, 0),
enforce_stationarity: bool = False, enforce_invertibility: bool = True, **fit_kwargs):
"""
Обучает SARIMAX (statsmodels). Возвращает результат fit() (SARIMAXResults).
"""
if not STATSMODELS_AVAILABLE:
raise ImportError("statsmodels не установлен. Установите statsmodels.")
if not is_pandas_series(series):
series = pd.Series(series)
series_clean = series.dropna()
if series_clean.empty:
raise ValueError("Пустая серия передана в fit_sarimax.")
model = SARIMAX(series_clean, order=order, seasonal_order=seasonal_order,
enforce_stationarity=enforce_stationarity, enforce_invertibility=enforce_invertibility)
res = model.fit(disp=False, **fit_kwargs)
return res
def forecast_sarimax(fit_res, steps: int, alpha: float = 0.05) -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""
Делает прогноз из обученного SARIMAX-результата (res.get_forecast).
Возвращает (mean, (lower, upper)) — numpy arrays длины steps.
"""
if hasattr(fit_res, "get_forecast"):
fc = fit_res.get_forecast(steps=steps)
mean = np.asarray(fc.predicted_mean)
try:
conf = fc.conf_int(alpha=alpha)
lower = np.asarray(conf.iloc[:, 0])
upper = np.asarray(conf.iloc[:, 1])
except Exception:
lower = np.full(len(mean), np.nan)
upper = np.full(len(mean), np.nan)
return mean, (lower, upper)
else:
# fallback на forecast
try:
f = fit_res.forecast(steps=steps)
mean = np.asarray(f)
lower = np.full(len(mean), np.nan)
upper = np.full(len(mean), np.nan)
return mean, (lower, upper)
except Exception as e:
raise ValueError(f"Не удалось получить прогноз из объекта результата: {e}")
def fit_var(df: pd.DataFrame, maxlags: int = 15):
"""
Обучает VAR на multivariate dataframe (pandas DataFrame). Возвращает fitted VARResults.
"""
if not STATSMODELS_AVAILABLE:
raise ImportError("statsmodels не установлен. Установите statsmodels.")
if not isinstance(df, pd.DataFrame):
raise ValueError("fit_var ожидает pd.DataFrame с несколькими числовыми колонками.")
df_clean = df.dropna()
if df_clean.shape[0] < 3:
raise ValueError("Недостаточно наблюдений для VAR.")
model = VAR(df_clean)
sel = model.select_order(maxlags=maxlags)
best_lag = None
try:
if hasattr(sel, "selected_orders"):
so = sel.selected_orders
for k in ("aic", "bic", "fpe", "hqic"):
val = so.get(k, None)
if val is not None:
best_lag = int(val)
break
except Exception:
best_lag = None
if best_lag is None or best_lag < 1:
best_lag = 1
fitted = model.fit(maxlags=best_lag)
return fitted
def forecast_var(fitted_var, steps: int) -> pd.DataFrame:
"""
Multi-step forecasting for VARResults. Возвращает DataFrame прогнозов (columns = variables).
"""
try:
forecast = fitted_var.forecast(fitted_var.endog[-fitted_var.k_ar:], steps=steps)
cols = fitted_var.names
idx = range(1, steps + 1)
return pd.DataFrame(forecast, columns=cols, index=idx)
except Exception as e:
raise ValueError(f"Ошибка при прогнозе VAR: {e}")
def fit_garch(series: pd.Series, p: int = 1, q: int = 1):
"""
Обучает GARCH(p,q) (arch package). Возвращает объект результата fit() из arch.
"""
if not ARCH_AVAILABLE:
raise ImportError("arch не установлен. Установите arch (pip install arch).")
if not is_pandas_series(series):
series = pd.Series(series)
series_clean = series.dropna()
if series_clean.empty:
raise ValueError("Пустая серия передана в fit_garch.")
am = arch_model(series_clean, vol="Garch", p=p, q=q, dist="normal")
res = am.fit(disp="off")
return res
def safe_summary(obj) -> str:
try:
return str(obj.summary())
except Exception:
return repr(obj)
# краткий тест при запуске модуля напрямую
if __name__ == "__main__":
print("lab3_functions: доступные функции:",
[n for n in dir() if n.startswith("fit_") or n.startswith("forecast_")])