TimeSeriesHomework1_2 / src /lab3_pipeline.py
Kolesnikov Dmitry
feat: Красивые графики
6e8ed89
# lab3_pipeline.py
"""
Полный pipeline для ЛР №3 (выполнение пунктов 3.1-3.8).
Сохраняйте файл в папке src проекта (TimeSeriesHomework/src/lab3_pipeline.py).
"""
import os
import math
import time
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# statsmodels и основные тесты
try:
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.api import VAR
STATSMODELS_AVAILABLE = True
except Exception as e:
STATSMODELS_AVAILABLE = False
print("warning: statsmodels not available:", e)
# optional heavy deps
try:
import pmdarima as pm
PM_AVAILABLE = True
except Exception:
PM_AVAILABLE = False
try:
from arch import arch_model
ARCH_AVAILABLE = True
except Exception:
ARCH_AVAILABLE = False
try:
from prophet import Prophet
PROPHET_AVAILABLE = True
except Exception:
PROPHET_AVAILABLE = False
# sklearn
try:
from sklearn.model_selection import TimeSeriesSplit
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
SKLEARN_AVAILABLE = True
except Exception:
SKLEARN_AVAILABLE = False
# scipy (Box-Cox, Shapiro)
try:
from scipy.stats import boxcox, boxcox_normmax, shapiro
SCIPY_AVAILABLE = True
except Exception:
SCIPY_AVAILABLE = False
try:
from tbats import TBATS
TBATS_AVAILABLE = True
except ImportError:
TBATS_AVAILABLE = False
# -------------------------------------------------------------------------
# Metrics
# -------------------------------------------------------------------------
def mae(y_true, y_pred):
    """Mean absolute error between ``y_true`` and ``y_pred``."""
    residuals = y_true - y_pred
    return np.mean(np.abs(residuals))
def rmse(y_true, y_pred):
    """Root mean squared error, returned as a Python float."""
    squared_errors = (y_true - y_pred) ** 2
    mean_squared = np.mean(squared_errors)
    return math.sqrt(mean_squared)
def mape(y_true, y_pred):
    """Mean absolute percentage error in percent.

    A tiny constant (1e-9) is added to ``y_true`` in the denominator to avoid
    division by an exact zero.
    """
    relative_errors = (y_true - y_pred) / (y_true + 1e-9)
    return np.mean(np.abs(relative_errors)) * 100.0
def smape(y_true, y_pred):
    """Symmetric mean absolute percentage error in percent (range 0-200).

    Uses ``2|pred - true| / (|true| + |pred|)`` with 1e-9 added to the
    denominator so that two exact zeros do not divide by zero.
    """
    numerator = 2.0 * np.abs(y_pred - y_true)
    denominator = np.abs(y_true) + np.abs(y_pred) + 1e-9
    return 100.0 * np.mean(numerator / denominator)
def rmsle(y_true, y_pred):
    """Root mean squared logarithmic error, computed on ``log1p`` of both arrays."""
    log_gap = np.log1p(y_pred) - np.log1p(y_true)
    return math.sqrt(np.mean(log_gap ** 2))
def mase(y_true, y_pred, naive_ref):
    """Mean absolute scaled error.

    The forecast MAE is divided by the mean absolute one-step change of
    ``naive_ref`` (e.g. the training series), i.e. the in-sample MAE of a
    naive forecast. Returns NaN when that scale is exactly zero.
    """
    naive_scale = np.mean(np.abs(np.diff(naive_ref)))
    if naive_scale == 0:
        return np.nan
    forecast_mae = np.mean(np.abs(y_true - y_pred))
    return forecast_mae / naive_scale
# -------------------------------------------------------------------------
# IO & preprocessing utilities
# -------------------------------------------------------------------------
def load_data(path: str, timestamp_col: str = 'timestamp', tz: Optional[str] = None) -> pd.DataFrame:
    """
    Load a CSV or parquet file and index it by a parsed timestamp column.

    - Files whose name ends in ``.parquet`` (any case) are read with
      ``pd.read_parquet``; everything else with ``pd.read_csv``.
    - Timestamps that cannot be parsed become NaT. Those rows are dropped,
      because a NaT index entry cannot be placed on the time axis.
    - If ``tz`` is given, naive timestamps are localized to it and tz-aware
      ones are converted to it. On failure a warning is printed and the
      timestamps are left unchanged.
    - Rows are sorted with a stable sort, so for duplicated timestamps the
      first row in file order is kept.

    :param path: path to a CSV or parquet file.
    :param timestamp_col: name of the column holding the timestamps.
    :param tz: optional timezone name.
    :return: DataFrame indexed by ``timestamp_col``, sorted and de-duplicated.
    :raises ValueError: if ``timestamp_col`` is not a column of the file.
    """
    if str(path).lower().endswith('.parquet'):
        df = pd.read_parquet(path)
    else:
        df = pd.read_csv(path)
    if timestamp_col not in df.columns:
        raise ValueError(f"timestamp column '{timestamp_col}' not found")
    df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce')

    unparsed = df[timestamp_col].isna()
    if unparsed.any():
        print(f"Dropping {int(unparsed.sum())} rows with unparseable timestamps")
        df = df.loc[~unparsed].copy()

    if tz is not None:
        try:
            stamps = df[timestamp_col]
            if stamps.dt.tz is None:
                df[timestamp_col] = stamps.dt.tz_localize(tz)
            else:
                df[timestamp_col] = stamps.dt.tz_convert(tz)
        except Exception as e:  # best effort: keep the timestamps as they are
            print(f"Warning: could not apply timezone {tz!r}: {e}")

    # mergesort is stable -> drop_duplicates keeps the first row in file order
    df = df.sort_values(timestamp_col, kind='mergesort').drop_duplicates(subset=[timestamp_col])
    return df.set_index(timestamp_col)
def resample_and_interpolate(df: pd.DataFrame, freq: str = 'D', method: str = 'linear') -> pd.DataFrame:
    """
    Put ``df`` on a regular ``freq`` grid and fill the gaps.

    The grid is built with ``resample(freq).asfreq()``, so only observations
    that fall exactly on a grid timestamp are kept.

    :param method: 'linear' -> linear interpolation; 'ffill' -> forward fill;
        anything else -> forward fill followed by interpolation.
    """
    regular = df.resample(freq).asfreq()
    if method == 'linear':
        return regular.interpolate(method='linear')
    if method == 'ffill':
        # .ffill() replaces fillna(method='ffill'), deprecated since pandas 2.1
        return regular.ffill()
    return regular.ffill().interpolate()
# -------------------------------------------------------------------------
# Transformations and stationarity selection
# -------------------------------------------------------------------------
def test_stationarity_pair(series: pd.Series) -> Dict[str, Dict[str, Any]]:
    """
    Run the ADF and KPSS stationarity tests on ``series`` (NaNs dropped).

    ADF uses a constant term and AIC lag selection; KPSS uses ``nlags='auto'``.

    :return: ``{'adf': {'stat', 'pvalue'}, 'kpss': {'stat', 'pvalue'}}``. With
        fewer than 3 observations only NaN p-values are returned.
    :raises ImportError: if statsmodels is not available.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required for stationarity tests")
    clean = series.dropna()
    if len(clean) < 3:
        return {'adf': {'pvalue': np.nan}, 'kpss': {'pvalue': np.nan}}
    adf_stat, adf_pvalue = adfuller(clean, autolag='AIC', regression='c')[:2]
    kpss_stat, kpss_pvalue = kpss(clean, nlags='auto')[:2]
    return {
        'adf': {'stat': adf_stat, 'pvalue': adf_pvalue},
        'kpss': {'stat': kpss_stat, 'pvalue': kpss_pvalue},
    }
def try_transformations_and_choose(y_train: pd.Series, seasonal_period: int = 7):
    """
    Choose the transformation + differencing that looks most stationary.

    Base candidates:
    - none
    - log (only if all values are > 0)
    - Box-Cox (only if all values are > 0 and scipy is available)

    Each base candidate is combined with: no differencing, diff(1), diff(s),
    and diff(1).diff(s), where s = ``seasonal_period``. The seasonal variants
    are used only if the series is longer than s + 3.

    Every variant with at least 10 observations is scored as
    ``ADF p-value + (1 - KPSS p-value)``; lower is better (we want ADF p < 0.05
    and KPSS p > 0.05). Missing or NaN p-values count as the worst case
    (ADF p = 1, KPSS p = 0), so they can never win and cannot corrupt the
    ranking. Ties keep candidate order, i.e. simpler transformations win.

    :param y_train: series to transform.
    :param seasonal_period: lag for seasonal differencing; 0/None disables it.
    :return: ``(transformed_series, meta)``. ``meta`` always has 'method' (tag),
        'diff' (0, 1, s or ('1+s', s)) and 'lambda' (Box-Cox lambda or None).
        When a variant was scored it also has 'adf_p' and 'kpss_p'. If nothing
        could be scored, ``y_train`` is returned unchanged.
    """
    def _pvalue_or(value, worst):
        # None / NaN p-values (e.g. a failed test) are treated as the worst case.
        if value is None:
            return worst
        try:
            return worst if np.isnan(value) else float(value)
        except TypeError:
            return worst

    candidates = [('none', y_train, None)]
    all_positive = bool((y_train > 0).all())
    if all_positive:
        candidates.append(('log', np.log(y_train), None))
    if all_positive and SCIPY_AVAILABLE:
        try:
            lam = boxcox_normmax(y_train.dropna(), brack=(-2, 2))
            bc, lam = apply_boxcox(y_train, lmbda=lam)
            candidates.append((f'boxcox_{lam:.4f}', bc, lam))
        except Exception as e:  # Box-Cox is optional; other candidates remain
            print("Box-Cox skipped:", e)

    # Differenced variants of every base candidate
    variants = []
    for tag, ser, lam in candidates:
        clean = ser.dropna()
        variants.append((tag, 0, clean, lam))
        if len(clean) > 3:
            variants.append((tag, 1, clean.diff(1).dropna(), lam))
        if seasonal_period and len(clean) > seasonal_period + 3:
            variants.append((tag, seasonal_period, clean.diff(seasonal_period).dropna(), lam))
            variants.append((tag, ('1+s', seasonal_period),
                             clean.diff(1).diff(seasonal_period).dropna(), lam))

    scored = []
    for tag, d, ser, lam in variants:
        if ser.dropna().shape[0] < 10:
            continue
        try:
            tests = test_stationarity_pair(ser)
        except Exception:
            # A variant the tests cannot handle is simply not a candidate.
            continue
        adf_p = _pvalue_or(tests['adf'].get('pvalue'), 1.0)
        kpss_p = _pvalue_or(tests['kpss'].get('pvalue'), 0.0)
        scored.append({'tag': tag, 'diff': d, 'lambda': lam, 'score': adf_p + (1.0 - kpss_p),
                       'adf_p': adf_p, 'kpss_p': kpss_p, 'series': ser})

    if not scored:
        return y_train, {'method': 'none', 'diff': 0, 'lambda': None}

    best = min(scored, key=lambda cand: cand['score'])  # first minimum == stable sort
    meta = {'method': best['tag'], 'diff': best['diff'], 'lambda': best['lambda'],
            'adf_p': best['adf_p'], 'kpss_p': best['kpss_p']}
    return best['series'], meta
def apply_boxcox(series: pd.Series, lmbda: Optional[float] = None):
    """
    Box-Cox transform ``series`` (NaNs dropped).

    If ``lmbda`` is None it is estimated with ``boxcox_normmax`` over the
    bracket (-2, 2).

    :return: ``(transformed_series, lambda_as_float)``; the transformed series
        keeps the index of the non-NaN input values.
    :raises ImportError: if scipy is not available.
    :raises ValueError: if any non-NaN value is <= 0.
    """
    if not SCIPY_AVAILABLE:
        raise ImportError("scipy required for boxcox")
    clean = series.dropna()
    if (clean <= 0).any():
        raise ValueError("Box-Cox requires positive values")
    lam = boxcox_normmax(clean, brack=(-2, 2)) if lmbda is None else lmbda
    return pd.Series(data=boxcox(clean, lam), index=clean.index), float(lam)
# -------------------------------------------------------------------------
# Feature engineering
# -------------------------------------------------------------------------
def make_lags(df: pd.DataFrame, col: str, lags: List[int]):
    """Add ``{col}_lag_{k}`` columns (``df[col]`` shifted by k) in place and return ``df``."""
    source = df[col]
    for lag in lags:
        df[f'{col}_lag_{lag}'] = source.shift(lag)
    return df
def make_rolls(df: pd.DataFrame, col: str, windows: List[int]):
    """
    Add rolling mean/std/min/max columns of ``df[col]`` in place and return ``df``.

    For every window w the columns ``{col}_roll_{stat}_{w}`` are added in the
    order mean, std, min, max. ``min_periods=1`` means the first rows use a
    shorter window (their std is NaN for a single value).
    """
    for w in windows:
        roller = df[col].rolling(window=w, min_periods=1)
        for stat in ('mean', 'std', 'min', 'max'):
            df[f'{col}_roll_{stat}_{w}'] = getattr(roller, stat)()
    return df
def make_time_features(df: pd.DataFrame):
    """
    Add calendar features from the DatetimeIndex of ``df`` in place and return ``df``.

    Columns: dayofweek (pandas convention, 0 = Monday), month, is_weekend
    (dayofweek >= 5), sin_week (sine of the weekday on a 7-day circle) and
    cos_month (cosine of the month on a 12-month circle).
    """
    calendar = df.index
    df['dayofweek'] = calendar.dayofweek
    df['month'] = calendar.month
    df['is_weekend'] = calendar.dayofweek >= 5
    week_angle = 2 * np.pi * df['dayofweek'] / 7
    month_angle = 2 * np.pi * (df['month'] - 1) / 12
    df['sin_week'] = np.sin(week_angle)
    df['cos_month'] = np.cos(month_angle)
    return df
# -------------------------------------------------------------------------
# Splits, CV and strategies
# -------------------------------------------------------------------------
def chronological_split(df: pd.DataFrame, frac_train=0.7, frac_val=0.15):
    """
    Split ``df`` in time order into train / validation / test copies.

    The first ``int(n * frac_train)`` rows go to train, the next
    ``int(n * frac_val)`` to validation and the remainder to test. A warning
    is printed when consecutive index steps are not all equal.
    """
    total = len(df)
    train_end = int(total * frac_train)
    val_end = train_end + int(total * frac_val)
    parts = (df.iloc[:train_end].copy(),
             df.iloc[train_end:val_end].copy(),
             df.iloc[val_end:].copy())
    # Check that the dates are evenly spaced
    joined_index = pd.concat(parts).index
    step_counts = (joined_index[1:] - joined_index[:-1]).value_counts()
    if len(step_counts) > 1:
        print(f"Предупреждение: обнаружены разные интервалы между датами: {step_counts.index.tolist()}")
    train, val, test = parts
    return train, val, test
def expanding_window_cv(X: pd.DataFrame, y: pd.Series, model_fit_predict, initial_train_size: int, h: int,
                        n_splits: int = 5):
    """
    Expanding-window CV: train on ``[0, t)``, test on ``[t, t + h)``.

    Fold cut points are ``initial_train_size + i * step`` with
    ``step = max(1, (len(X) - initial_train_size - h) // n_splits)``. The lower
    bound of 1 avoids repeating identical folds when there is little room.
    Folds whose test window would run past the end of the data are not
    evaluated, so fewer than ``n_splits`` rows may be returned.

    :param model_fit_predict: callable ``(train_X, train_y, h) -> h predictions``.
    :return: DataFrame with columns fold, mae, rmse (possibly empty).
    """
    n = len(X)
    step = max(1, (n - initial_train_size - h) // n_splits) if n_splits > 0 else h
    rows = []
    for fold in range(n_splits):
        end_train = initial_train_size + fold * step
        if end_train + h > n:
            break
        train_X, train_y = X.iloc[:end_train], y.iloc[:end_train]
        test_y = y.iloc[end_train:end_train + h]
        y_pred = np.asarray(model_fit_predict(train_X, train_y, h))[:len(test_y)]
        rows.append({'fold': fold, 'mae': mae(test_y.values, y_pred), 'rmse': rmse(test_y.values, y_pred)})
    return pd.DataFrame(rows, columns=['fold', 'mae', 'rmse'])
def rolling_window_cv(X: pd.DataFrame, y: pd.Series, model_fit_predict, window: int, h: int, n_splits: int = 5):
    """
    Rolling-window CV: train on ``[s, s + window)``, test on the next ``h`` points.

    Window starts are ``i * step`` with
    ``step = max(1, (len(X) - window - h) // n_splits)``. The lower bound of 1
    avoids repeating identical folds. Folds whose test window would run past
    the end of the data are not evaluated.

    :param model_fit_predict: callable ``(train_X, train_y, h) -> h predictions``.
    :return: DataFrame with columns fold, mae, rmse (possibly empty).
    """
    n = len(X)
    step = max(1, (n - window - h) // n_splits) if n_splits > 0 else h
    rows = []
    for fold in range(n_splits):
        start = fold * step
        end = start + window
        if end + h > n:
            break
        train_X, train_y = X.iloc[start:end], y.iloc[start:end]
        test_y = y.iloc[end:end + h]
        y_pred = np.asarray(model_fit_predict(train_X, train_y, h))[:len(test_y)]
        rows.append({'fold': fold, 'mae': mae(test_y.values, y_pred), 'rmse': rmse(test_y.values, y_pred)})
    return pd.DataFrame(rows, columns=['fold', 'mae', 'rmse'])
# Strategies: recursive, direct, hybrid
def forecast_recursive_arima(fit_res, steps: int, last_date: pd.Timestamp = None, freq: str = 'D'):
    """
    Multi-step forecast from a fitted SARIMAX-like result, optionally date-indexed.

    With ``get_forecast`` available, returns ``(mean, (low, high))`` where the
    interval comes from ``conf_int()``. ``conf_int`` returns a DataFrame for
    pandas input and a 2-D ndarray for numpy input; both are handled. The
    interval is NaN if it cannot be computed. Otherwise ``forecast`` is used
    and the interval is ``(None, None)``.

    If ``last_date`` is given, the outputs are wrapped in Series indexed by
    ``create_forecast_index(last_date, steps, freq)``. Values are converted to
    arrays first: wrapping a Series returned by ``forecast`` directly would
    re-align it on its own index and yield NaNs.
    """
    if hasattr(fit_res, "get_forecast"):
        fc = fit_res.get_forecast(steps=steps)
        mean = np.asarray(fc.predicted_mean)
        try:
            conf = np.asarray(fc.conf_int(), dtype=float)
            low, high = conf[:, 0], conf[:, 1]
        except Exception:
            low = np.full(len(mean), np.nan)
            high = np.full(len(mean), np.nan)
        if last_date is not None:
            dates = create_forecast_index(last_date, steps, freq)
            mean = pd.Series(mean, index=dates)
            low = pd.Series(low, index=dates)
            high = pd.Series(high, index=dates)
        return mean, (low, high)

    mean = fit_res.forecast(steps=steps)
    if last_date is not None:
        dates = create_forecast_index(last_date, steps, freq)
        mean = pd.Series(np.asarray(mean), index=dates)
    return mean, (None, None)
# Direct-strategy placeholder for SARIMAX (currently a flat naive forecast).
def forecast_direct_arima(train_series: pd.Series, h: int, order=(1, 0, 0)):
    """
    Return an ``h``-step forecast equal to the last observed value.

    Placeholder for a direct strategy: no per-horizon models are fitted yet,
    and ``order`` is ignored (kept for interface compatibility). The previous
    version built a shifted target frame that was never used.

    :raises ImportError: if statsmodels is not available.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    return np.full(h, train_series.iloc[-1])
# -------------------------------------------------------------------------
# Models training wrapper
# -------------------------------------------------------------------------
def fit_sarimax_simple(series: pd.Series, order=(1, 0, 0), seasonal_order=(0, 0, 0, 0), **kwargs):
    """
    Fit a SARIMAX(order, seasonal_order) model on ``series`` with NaNs dropped.

    Stationarity and invertibility constraints are disabled. Extra keyword
    arguments are forwarded to ``fit``, which always runs with ``disp=False``.

    :raises ImportError: if statsmodels is not available.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    clean = series.dropna()
    model = SARIMAX(clean, order=order, seasonal_order=seasonal_order,
                    enforce_stationarity=False, enforce_invertibility=False)
    return model.fit(disp=False, **kwargs)
def forecast_sarimax(fit_res, steps: int, alpha: float = 0.05) -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """
    Forecast ``steps`` points from a fitted SARIMAX-like result.

    Uses ``get_forecast`` when available: the mean plus a ``(1 - alpha)``
    interval. Falls back to ``forecast`` (mean only) when ``get_forecast`` is
    missing or its mean contains NaNs. The interval is all-NaN if it cannot be
    computed or contains any NaN. ``conf_int`` returns a DataFrame for pandas
    input but a 2-D ndarray for numpy input; both are handled via ``np.asarray``.

    :return: ``(mean, (lower, upper))`` as numpy arrays. If forecasting fails
        entirely, a warning is printed and all three are NaN arrays of length
        ``steps``.
    """
    try:
        if not hasattr(fit_res, "get_forecast"):
            mean = np.asarray(fit_res.forecast(steps=steps))
            return mean, (np.full(len(mean), np.nan), np.full(len(mean), np.nan))

        fc = fit_res.get_forecast(steps=steps)
        mean = np.asarray(fc.predicted_mean)
        if np.any(np.isnan(mean)):
            mean = np.asarray(fit_res.forecast(steps=steps))

        lower = upper = None
        try:
            conf = np.asarray(fc.conf_int(alpha=alpha), dtype=float)
            lower, upper = conf[:, 0], conf[:, 1]
        except Exception:
            pass  # interval is optional; replaced by NaN below
        if lower is None or np.any(np.isnan(lower)) or np.any(np.isnan(upper)):
            lower = np.full(len(mean), np.nan)
            upper = np.full(len(mean), np.nan)
        return mean, (lower, upper)
    except Exception as e:
        print(f"Warning: SARIMAX forecast failed: {e}")
        return np.full(steps, np.nan), (np.full(steps, np.nan), np.full(steps, np.nan))
def fit_auto_arima(series: pd.Series, seasonal=False, m=1, **kwargs):
    """
    Select and fit an ARIMA model with ``pmdarima.auto_arima`` on ``series`` (NaNs dropped).

    Failing candidate models are ignored (``error_action='ignore'``) and
    warnings are suppressed. Extra keyword arguments are forwarded.

    :raises ImportError: if pmdarima is not installed.
    """
    if not PM_AVAILABLE:
        raise ImportError("pmdarima not installed")
    options = dict(seasonal=seasonal, m=m, error_action='ignore', suppress_warnings=True)
    return pm.auto_arima(series.dropna(), **options, **kwargs)
def fit_var(df: pd.DataFrame, maxlags=15):
    """
    Fit a VAR on ``df`` (rows with NaN dropped) with an information-criterion lag order.

    Criteria are tried in the order aic, bic, fpe, hqic, and the first one
    with a selection wins. The lag order is at least 1, which is also the
    fallback when selection fails. A 0-lag VAR has no dynamics, and callers
    slicing ``res.endog[-res.k_ar:]`` would silently get the whole sample.

    :param maxlags: upper bound passed to ``select_order``.
    :return: fitted VAR results.
    :raises ImportError: if statsmodels is not available.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    model = VAR(df.dropna())
    selection = model.select_order(maxlags=maxlags)
    best = 1
    try:
        selected = selection.selected_orders
        for criterion in ('aic', 'bic', 'fpe', 'hqic'):
            if selected.get(criterion) is not None:
                best = int(selected[criterion])
                break
    except Exception as e:
        print("VAR lag selection failed, using 1 lag:", e)
        best = 1
    return model.fit(maxlags=max(best, 1))
def fit_garch_on_residuals(residuals, p=1, q=1):
    """
    Fit a GARCH(p, q) volatility model with normal errors on ``residuals``.

    :return: the fitted ``arch`` result (fitted with ``disp='off'``).
    :raises ImportError: if the ``arch`` package is not installed.
    """
    if not ARCH_AVAILABLE:
        raise ImportError("arch not installed")
    volatility_model = arch_model(residuals, vol='Garch', p=p, q=q, dist='normal')
    return volatility_model.fit(disp='off')
# -------------------------------------------------------------------------
# Diagnostics and tests
# -------------------------------------------------------------------------
def ljung_box_test(resid: np.ndarray, lags: Optional[List[int]] = None):
    """
    Ljung-Box autocorrelation test on ``resid``.

    :param lags: lags to test; defaults to ``[10]``. A ``None`` default is used
        instead of a shared mutable list literal.
    :return: DataFrame returned by ``acorr_ljungbox(..., return_df=True)``.
    :raises ImportError: if statsmodels is not available.
    """
    if lags is None:
        lags = [10]
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    return acorr_ljungbox(resid, lags=lags, return_df=True)
def shapiro_test(resid: np.ndarray):
    """
    Shapiro-Wilk normality test on ``resid``.

    :return: ``{'stat': W statistic, 'pvalue': p-value}``.
    :raises ImportError: if scipy is not available.
    """
    if not SCIPY_AVAILABLE:
        raise ImportError("scipy required")
    result = shapiro(resid)
    return {'stat': result[0], 'pvalue': result[1]}
def simple_dm_test(e1: np.ndarray, e2: np.ndarray):
    """
    Simplified Diebold-Mariano test on the loss differential ``e1 - e2``.

    ``e1`` and ``e2`` are per-period losses of two forecasts (e.g. squared
    errors, as passed by the pipeline). The statistic is
    ``mean(d) / sqrt(var(d, ddof=1) / n)`` and the two-sided p-value comes from
    a Student t distribution with ``n - 1`` degrees of freedom. There is no
    HAC (autocorrelation) correction.

    :return: ``{'stat': float, 'pvalue': float}``. Both are NaN when the
        differential has zero or undefined variance.
    """
    diff = e1 - e2
    n_obs = len(diff)
    mean_diff = np.mean(diff)
    var_diff = np.var(diff, ddof=1)
    scale = math.sqrt(var_diff / n_obs) if var_diff > 0 else np.nan
    if np.isnan(scale) or scale == 0:
        return {'stat': np.nan, 'pvalue': np.nan}
    t_stat = mean_diff / scale
    # scipy is imported lazily so that degenerate inputs never need it
    from scipy.stats import t as student_t
    p_value = 2 * (1 - student_t.cdf(abs(t_stat), df=n_obs - 1))
    return {'stat': float(t_stat), 'pvalue': float(p_value)}
# -------------------------------------------------------------------------
# Report generation (HTML)
# -------------------------------------------------------------------------
def generate_report_html(out_path: str, plots: List[plt.Figure], tables: Dict[str, pd.DataFrame], title="Lab3 Report"):
    """
    Write a self-contained HTML report: every table first, then every figure.

    Figures are rendered to PNG and embedded inline as base64 data URIs, so
    the report is a single file. Every figure is closed after rendering, even
    if rendering fails, to free memory. The previous version stopped after
    the first figure and left the rest open. ``title`` and table names are
    HTML-escaped; cell contents are escaped by ``DataFrame.to_html``.

    :param out_path: destination file, written as UTF-8.
    :param plots: matplotlib figures to embed, in order.
    :param tables: heading -> DataFrame, rendered in insertion order.
    :param title: page title and top-level heading.
    """
    import base64
    import html
    from io import BytesIO

    safe_title = html.escape(str(title))
    html_parts = [f"""
<html>
<head>
<meta charset='utf-8'>
<title>{safe_title}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background-color: white; color: black; }}
table {{ border-collapse: collapse; width: 100%; margin: 10px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
img {{ max-width: 100%; height: auto; margin: 10px 0; }}
.table-container {{ overflow-x: auto; }}
</style>
</head>
<body>
<h1>{safe_title}</h1>
"""]
    # Tables
    for name, df in tables.items():
        html_parts.append(f"<h2>{html.escape(str(name))}</h2>")
        html_parts.append('<div class="table-container">')
        html_parts.append(df.to_html(classes='table table-striped', border=0, index=True))
        html_parts.append('</div>')
    # Figures, embedded as base64 PNG
    for i, fig in enumerate(plots):
        buf = BytesIO()
        try:
            fig.savefig(buf, format='png', bbox_inches='tight', dpi=100)
        finally:
            plt.close(fig)  # release the figure even if rendering failed
        img_data = base64.b64encode(buf.getvalue()).decode('utf-8')
        html_parts.append(f'<h3>Figure {i + 1}</h3>')
        html_parts.append(f'<img src="data:image/png;base64,{img_data}" alt="Figure {i + 1}">')
    html_parts.append("</body></html>")
    with open(out_path, 'w', encoding='utf-8') as f:
        f.write("\n".join(html_parts))
    print("Report saved to", out_path)
# -------------------------------------------------------------------------
# Main runner that orchestrates everything
# -------------------------------------------------------------------------
def evaluate_with_cv(models_dict, X, y, cv_method='expanding', n_splits=5, h: int = 30):
    """
    Cross-validate every model of ``models_dict`` on ``(X, y)``.

    The initial training size (expanding) or window length (rolling) is half
    of ``len(X)``.

    :param models_dict: name -> callable ``(train_X, train_y, h) -> h predictions``.
    :param cv_method: 'expanding' uses ``expanding_window_cv``; any other value
        uses ``rolling_window_cv``.
    :param n_splits: number of folds.
    :param h: forecast horizon of every fold. The default 30 is the previously
        hard-coded value.
    :return: dict name -> per-fold metrics DataFrame.
    """
    half = len(X) // 2
    cv_results = {}
    for name, fit_predict in models_dict.items():
        if cv_method == 'expanding':
            cv_results[name] = expanding_window_cv(X, y, fit_predict, initial_train_size=half,
                                                   h=h, n_splits=n_splits)
        else:
            cv_results[name] = rolling_window_cv(X, y, fit_predict, window=half,
                                                 h=h, n_splits=n_splits)
    return cv_results
def _forecast_record(model_name: str, h: int, values, origin, freq: str) -> Dict[str, Any]:
    """Wrap the first ``h`` forecast values into a result record indexed by future dates.

    Values go through ``np.asarray`` first: several libraries return a pandas
    Series with their own index, and ``pd.Series(series, index=dates)`` would
    re-align on that index and silently produce NaNs.
    """
    values = np.asarray(values, dtype=float).ravel()[:h]
    return {'model': model_name, 'h': h,
            'pred': pd.Series(values, index=create_forecast_index(origin, h, freq))}


def _benchmark_forecasts(y_hist: pd.Series, horizons: List[int], freq: str) -> List[Dict[str, Any]]:
    """Naive, seasonal-naive (period 7) and SES forecasts from the end of ``y_hist``."""
    origin = y_hist.index[-1]
    records = []
    for h in horizons:
        records.append(_forecast_record('naive', h, np.full(h, y_hist.iloc[-1]), origin, freq))
        if len(y_hist) >= 7:
            records.append(_forecast_record(
                'seasonal_naive', h, seasonal_naive_forecast(y_hist, season=7, steps=h), origin, freq))
    try:
        from statsmodels.tsa.holtwinters import SimpleExpSmoothing
        ses = SimpleExpSmoothing(y_hist.dropna()).fit(optimized=True)
        for h in horizons:
            records.append(_forecast_record('SES', h, ses.forecast(h), origin, freq))
    except Exception as e:
        print("SES skipped:", e)
    return records


def _sarimax_forecasts(y_hist: pd.Series, horizons: List[int], freq: str):
    """Fit SARIMAX(1,1,1) on ``y_hist`` and forecast every horizon.

    :return: ``(fitted result or None, forecast records)``. The fitted result is
        returned even if the optimizer did not converge, so its residuals can
        still be inspected.
    """
    records = []
    if not STATSMODELS_AVAILABLE:
        return None, records
    sar = None
    try:
        # need enough points and a non-degenerate variance
        if not (len(y_hist.dropna()) > 10 and y_hist.var() > 1e-6):
            print("Insufficient data for SARIMAX")
            return None, records
        sar = fit_sarimax_simple(y_hist, order=(1, 1, 1))
        if not (hasattr(sar, 'mle_retvals') and sar.mle_retvals.get('converged', False)):
            print("SARIMAX model did not converge")
            return sar, records
        for h in horizons:
            mean, _ = forecast_sarimax(sar, steps=h)
            if np.all(np.isnan(mean)):
                print(f"SARIMAX returned all NaN for horizon {h}")
                continue
            records.append(_forecast_record('SARIMAX(1,1,1)', h, mean, y_hist.index[-1], freq))
    except Exception as e:
        print("SARIMAX failed:", e)
    return sar, records


def _var_forecasts(df: pd.DataFrame, target_col: str, y_hist: pd.Series, meta: Dict[str, Any],
                   resample_freq: str, freq: str, h: int) -> List[Dict[str, Any]]:
    """VAR forecast of the target from all numeric columns, fitted only up to the end of ``y_hist``.

    Skipped when there are fewer than two numeric columns, or when the target
    was transformed/differenced in step 3.1. The VAR works on the raw columns,
    so its forecasts would not be on the scale of the evaluated series.
    """
    numeric = df.select_dtypes(include=[np.number])
    if not STATSMODELS_AVAILABLE or numeric.shape[1] < 2:
        return []
    if meta.get('method') != 'none' or meta.get('diff', 0) != 0:
        print("VAR skipped: target was transformed, raw-scale VAR forecasts are not comparable")
        return []
    try:
        # target first -> column 0 of the forecast is the target
        ordered = [target_col] + [c for c in numeric.columns if c != target_col]
        num_df = resample_and_interpolate(numeric[ordered].astype(float), freq=resample_freq)
        num_df = num_df.loc[:y_hist.index[-1]].dropna()  # no test-period data in the fit
        var_res = fit_var(num_df, maxlags=5)
        fut = var_res.forecast(var_res.endog[-var_res.k_ar:], steps=h)
        return [_forecast_record('VAR', h, fut[:h, 0], y_hist.index[-1], freq)]
    except Exception as e:
        print("VAR failed:", e)
        return []


def _optional_model_forecasts(y_hist: pd.Series, horizons: List[int], freq: str) -> List[Dict[str, Any]]:
    """auto_arima, TBATS and Prophet forecasts, each only if its package is installed."""
    origin = y_hist.index[-1]
    records = []
    if PM_AVAILABLE:
        try:
            auto = fit_auto_arima(y_hist, seasonal=False)
            for h in horizons:
                records.append(_forecast_record('auto_arima', h, auto.predict(n_periods=h), origin, freq))
        except Exception as e:
            print("auto_arima failed:", e)
    if TBATS_AVAILABLE:
        try:
            tbats_fitted = TBATS(seasonal_periods=[7, 30], use_arma_errors=True).fit(y_hist)
            for h in horizons:
                records.append(_forecast_record('TBATS', h, tbats_fitted.forecast(steps=h), origin, freq))
        except Exception as e:
            print("TBATS failed:", e)
    if PROPHET_AVAILABLE:
        try:
            prophet_model = Prophet()
            prophet_model.fit(pd.DataFrame({'ds': y_hist.index, 'y': y_hist.values}))
            n_future = max(horizons)
            future = prophet_model.make_future_dataframe(periods=n_future, freq=freq)
            # predict() covers history + future; the first h future rows are the h-step forecast
            yhat_future = prophet_model.predict(future)['yhat'].to_numpy()[-n_future:]
            for h in horizons:
                records.append(_forecast_record('Prophet', h, yhat_future[:h], origin, freq))
        except Exception as e:
            print("Prophet failed:", e)
    return records


def _garch_tables(sar) -> Dict[str, pd.DataFrame]:
    """Fit GARCH(1,1) on the SARIMAX residuals and return its parameters as a report table."""
    if not ARCH_AVAILABLE or sar is None:
        return {}
    try:
        garch_res = fit_garch_on_residuals(sar.resid, p=1, q=1)
        return {'garch_params_SARIMAX': garch_res.params.to_frame(name='value')}
    except Exception as e:
        print("GARCH failed:", e)
        return {}


def _evaluate_forecasts(results: List[Dict[str, Any]], y_hist: pd.Series, y_train: pd.Series,
                        y_val: pd.Series, y_test: pd.Series):
    """Score every forecast on the first ``h`` test points and draw one figure per forecast.

    Forecasts start right after ``y_hist`` (train + validation), i.e. at the
    first test observation. They are therefore aligned with ``y_test`` by
    position and re-indexed on the test dates.

    :return: ``(evaluation DataFrame, list of figures)``.
    """
    columns = ['model', 'h', 'MAE', 'RMSE', 'MAPE', 'SMAPE', 'MASE', 'R2']
    rows, plots = [], []
    context_points = min(200, len(y_train))
    naive_ref = y_hist.to_numpy(dtype=float)
    for rec in results:
        pred_values = np.asarray(rec['pred'], dtype=float).ravel()
        k = min(len(pred_values), len(y_test))
        if k == 0:
            continue
        y_true = y_test.iloc[:k]
        pred = pd.Series(pred_values[:k], index=y_true.index)
        t, p = y_true.to_numpy(dtype=float), pred.to_numpy()
        ss_tot = np.sum((t - np.mean(t)) ** 2)
        rows.append({
            'model': rec['model'], 'h': rec['h'],
            'MAE': mae(t, p), 'RMSE': rmse(t, p), 'MAPE': mape(t, p), 'SMAPE': smape(t, p),
            # MASE scale: in-sample naive errors of the data the models were fitted on
            'MASE': mase(t, p, naive_ref),
            # R2 is undefined for a constant target (e.g. a single test point)
            'R2': float(1 - np.sum((t - p) ** 2) / ss_tot) if ss_tot > 0 else np.nan,
        })

        fig, ax = plt.subplots(figsize=(8, 3))
        ax.plot(y_train.index[-context_points:], y_train.values[-context_points:], label='train', alpha=0.7)
        if len(y_val) > 0:
            ax.plot(y_val.index, y_val.values, label='val', alpha=0.7)
        ax.plot(y_test.index, y_test.values, label='test', alpha=0.7)
        ax.plot(pred.index, pred.values, label=f"pred_{rec['model']}_h{rec['h']}", linewidth=2)
        ax.set_title(f"{rec['model']}, h={rec['h']}")
        ax.legend(loc='best', fontsize='small')
        fig.tight_layout()
        plots.append(fig)
    return pd.DataFrame(rows, columns=columns), plots


def _residual_diagnostics(eval_df: pd.DataFrame, sar) -> Dict[str, pd.DataFrame]:
    """Ljung-Box and Shapiro-Wilk tables of the SARIMAX residuals if SARIMAX is in the top-3 by RMSE."""
    tables = {}
    if sar is None or eval_df.empty:
        return tables
    top3 = eval_df.sort_values('RMSE')['model'].drop_duplicates().head(3).tolist()
    for m in top3:
        if not m.startswith('SARIMAX'):
            continue
        try:
            resid = sar.resid.dropna()
            tables[f'ljungbox_{m}'] = acorr_ljungbox(resid, lags=[10], return_df=True)
            if SCIPY_AVAILABLE:
                sh = shapiro(resid)
                tables[f'shapiro_{m}'] = pd.DataFrame([{'stat': sh[0], 'pvalue': sh[1]}])
        except Exception as e:
            print(f"Diagnostics for {m} failed:", e)
    return tables


def _dm_table(eval_df: pd.DataFrame, results: List[Dict[str, Any]], y_test: pd.Series) -> Optional[pd.DataFrame]:
    """Diebold-Mariano test (squared-error loss) of the two best distinct models at the longest horizon.

    The longest horizon is used because a 1-step forecast gives a single error
    per model, for which the statistic is undefined.
    """
    if eval_df.empty:
        return None
    h_max = eval_df['h'].max()
    ranked = eval_df[eval_df['h'] == h_max].sort_values('RMSE')['model'].drop_duplicates().tolist()
    if len(ranked) < 2:
        return None
    m1, m2 = ranked[0], ranked[1]
    preds = {rec['model']: np.asarray(rec['pred'], dtype=float).ravel()
             for rec in results if rec['h'] == h_max and rec['model'] in (m1, m2)}
    k = min(len(preds[m1]), len(preds[m2]), len(y_test))
    y_true = y_test.to_numpy(dtype=float)[:k]
    dm = simple_dm_test((y_true - preds[m1][:k]) ** 2, (y_true - preds[m2][:k]) ** 2)
    return pd.DataFrame([{'model1': m1, 'model2': m2, 'h': int(h_max),
                          'dm_stat': dm['stat'], 'pvalue': dm['pvalue']}])


def _cv_summary(df_all: pd.DataFrame, target_col: str) -> Optional[pd.DataFrame]:
    """Expanding-window CV (``evaluate_with_cv``) of SARIMAX and auto_arima; mean MAE/RMSE per model.

    Models whose package is unavailable are skipped. A failure is reported and
    returns None instead of aborting the pipeline.
    """
    models = {}
    if STATSMODELS_AVAILABLE:
        models['SARIMAX'] = lambda X, y, h: forecast_sarimax(fit_sarimax_simple(y), steps=h)[0]
    if PM_AVAILABLE:
        models['AutoARIMA'] = lambda X, y, h: np.asarray(fit_auto_arima(y).predict(n_periods=h))
    if not models:
        return None
    try:
        cv_results = evaluate_with_cv(models, df_all.drop(columns=[target_col]), df_all[target_col])
    except Exception as e:
        print("Cross-validation failed:", e)
        return None
    rows = [{'model': name, 'folds': len(scores),
             'mae_mean': scores['mae'].mean(), 'rmse_mean': scores['rmse'].mean()}
            for name, scores in cv_results.items() if len(scores) > 0]
    return pd.DataFrame(rows) if rows else None


def run_pipeline(data_path: str, timestamp_col: str, target_col: str,
                 out_report: str = 'lab3_report.html', freq: str = 'D'):
    """
    Main entry point of the Lab 3 pipeline (steps 3.1-3.8).

    Steps:
    1. Load and resample the data.
    2. Choose a stationarity-inducing transformation.
    3. Build features and split chronologically into train/val/test.
    4. Fit the benchmark and statistical models.
    5. Evaluate them on the test set.
    6. Run residual diagnostics, the Diebold-Mariano test and cross-validation.
    7. Write an HTML report.

    Models are fitted on train + validation, so the forecast origin is the last
    observation before the test period and an h-step forecast covers exactly
    the first h test points. All metrics are on the scale of the transformed
    series chosen in step 3.1.

    :param data_path: CSV or parquet file.
    :param timestamp_col: name of the timestamp column.
    :param target_col: target column; if missing, the first numeric column is used.
    :param out_report: path of the HTML report.
    :param freq: pandas frequency for resampling (and forecast dates if it
        cannot be inferred).
    :raises ValueError: if there is no numeric column or no data left to fit on.
    """
    print("Loading", data_path)
    df = load_data(data_path, timestamp_col)
    if target_col not in df.columns:
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        if not numeric_cols:
            raise ValueError("No numeric columns found")
        target_col = numeric_cols[0]
        print("Target not found, using", target_col)
    series = df[target_col].astype('float').copy()
    series = resample_and_interpolate(series.to_frame(), freq=freq).iloc[:, 0]
    print("Series length after resample:", len(series))

    # 3.1 Preprocessing & transformation selection
    transformed, meta = try_transformations_and_choose(series, seasonal_period=7)
    print("Chosen transform:", meta)

    # 3.2 Feature engineering
    df_all = transformed.to_frame(name=target_col)
    df_all = make_time_features(df_all)
    df_all = make_lags(df_all, target_col, [1, 2, 7, 30])
    df_all = make_rolls(df_all, target_col, [7, 30])
    df_all = df_all.dropna()  # drop warm-up rows without lag features
    train, val, test = chronological_split(df_all, frac_train=0.7, frac_val=0.15)
    y_train, y_val, y_test = train[target_col], val[target_col], test[target_col]
    print("Sizes train/val/test:", len(y_train), len(y_val), len(y_test))

    # Fit on everything before the test period so forecasts line up with y_test.
    y_hist = pd.concat([y_train, y_val])
    if y_hist.empty:
        raise ValueError("Not enough data left after feature engineering to fit models")
    horizons = [1, 7, 30]
    try:
        forecast_freq = pd.infer_freq(y_hist.index) or freq
    except (TypeError, ValueError):  # non-datetime index or fewer than 3 dates
        forecast_freq = freq

    # 3.3 Models: benchmarks, SARIMAX, VAR and optional packages
    results = _benchmark_forecasts(y_hist, horizons, forecast_freq)
    sar, sarimax_records = _sarimax_forecasts(y_hist, horizons, forecast_freq)
    results.extend(sarimax_records)
    results.extend(_var_forecasts(df, target_col, y_hist, meta, freq, forecast_freq, h=max(horizons)))
    results.extend(_optional_model_forecasts(y_hist, horizons, forecast_freq))
    garch_tables = _garch_tables(sar)

    # 3.7 Evaluation on the test set
    eval_df, plots = _evaluate_forecasts(results, y_hist, y_train, y_val, y_test)

    # 3.6 Diagnostics, Diebold-Mariano and cross-validation
    diag_tables = _residual_diagnostics(eval_df, sar)
    try:
        dm_table = _dm_table(eval_df, results, y_test)
    except Exception as e:
        print("Diebold-Mariano test failed:", e)
        dm_table = None
    cv_table = _cv_summary(df_all, target_col)

    # Report
    tables = {'evaluation': eval_df}
    tables.update(diag_tables)
    tables.update(garch_tables)
    if dm_table is not None:
        tables['dm_test'] = dm_table
    if cv_table is not None:
        tables['cv_summary'] = cv_table
    generate_report_html(out_report, plots, tables, title="Lab3 Full Report")
    print("Pipeline finished. Report:", out_report)
# -------------------------
# helpers used in the pipeline but defined later
# -------------------------
def seasonal_naive_forecast(series: pd.Series, season: int, steps: int):
    """Repeat the last ``season`` observations of ``series`` until ``steps`` values are produced."""
    last_cycle = series.iloc[-season:].values
    n_cycles = int(np.ceil(steps / season))
    repeated = np.tile(last_cycle, n_cycles)
    return repeated[:steps]
def create_forecast_index(last_train_date: pd.Timestamp, steps: int, freq: str = 'D') -> pd.DatetimeIndex:
    """
    Build the date index of a forecast that starts one period after ``last_train_date``.

    The first date is ``last_train_date + to_offset(freq)``, so spacing follows
    ``freq``: hourly data starts one hour later, monthly data one month later.
    The previous version always added one day. ``freq='auto'`` cannot be
    inferred from a single timestamp and falls back to daily ('D').

    If the index cannot be built (e.g. ``last_train_date`` is not a date or
    ``freq`` is invalid), a warning is printed and ``RangeIndex(0, steps)`` is
    returned.
    """
    from pandas.tseries.frequencies import to_offset
    try:
        if freq == 'auto':
            freq = 'D'
        offset = to_offset(freq)
        return pd.date_range(start=last_train_date + offset, periods=steps, freq=offset)
    except Exception as e:
        print(f"Warning: could not create proper date index: {e}")
        # Fallback: numeric index
        return pd.RangeIndex(start=0, stop=steps)
def forecast_recursive(model, series, steps, freq='D'):
    """
    Recursive (iterated one-step) forecast of ``steps`` points from a fitted model.

    For linear ARIMA-type models, feeding each one-step point forecast back as
    an observation and forecasting again gives the same point forecasts as the
    model's own multi-step forecast. So the whole horizon is requested at
    once: ``model.forecast(steps=...)`` for statsmodels-style results, otherwise
    ``model.predict(n_periods=...)`` (pmdarima style). The previous version
    never fed predictions back into the model, so every step repeated the same
    one-step value.

    :param model: fitted model exposing ``forecast(steps)`` or ``predict(n_periods)``.
    :param series: training series (unused; kept for interface compatibility).
    :param steps: forecast horizon.
    :param freq: unused; kept for interface compatibility.
    :return: 1-D float array of length ``steps`` (empty for ``steps <= 0``).
    """
    if steps <= 0:
        return np.array([])
    if hasattr(model, 'forecast'):
        preds = model.forecast(steps=steps)
    else:
        preds = model.predict(n_periods=steps)
    # np.asarray drops any pandas index, so positions are used, not labels
    return np.asarray(preds, dtype=float).ravel()[:steps]
def forecast_direct(train_series, test_features, model_factory, steps):
    """
    Direct strategy: fit one model per horizon h = 1..steps.

    For horizon h, a fresh ``model_factory()`` is trained to map the value at
    time t to the value at t + h, then applied to the last observed value.
    ``test_features`` is not used.

    :return: numpy array with one prediction per horizon.
    """
    last_value = train_series.values[-1:].reshape(1, -1)
    forecasts = []
    for horizon in range(1, steps + 1):
        # Target shifted h steps ahead, paired with the value h steps earlier
        target = train_series.shift(-horizon).dropna()
        features = train_series.iloc[:len(target)].values.reshape(-1, 1)
        estimator = model_factory()
        estimator.fit(features, target.values)
        forecasts.append(estimator.predict(last_value)[0])
    return np.array(forecasts)