Spaces:
No application file
No application file
| # lab3_pipeline.py | |
| """ | |
| Полный pipeline для ЛР №3 (выполнение пунктов 3.1-3.8). | |
| Сохраняйте файл в папке src проекта (TimeSeriesHomework/src/lab3_pipeline.py). | |
| """ | |
| import os | |
| import math | |
| import time | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| # statsmodels и основные тесты | |
| try: | |
| from statsmodels.tsa.stattools import adfuller, kpss | |
| from statsmodels.tsa.seasonal import seasonal_decompose | |
| from statsmodels.graphics.tsaplots import plot_acf, plot_pacf | |
| from statsmodels.stats.diagnostic import acorr_ljungbox | |
| from statsmodels.tsa.statespace.sarimax import SARIMAX | |
| from statsmodels.tsa.api import VAR | |
| STATSMODELS_AVAILABLE = True | |
| except Exception as e: | |
| STATSMODELS_AVAILABLE = False | |
| print("warning: statsmodels not available:", e) | |
| # optional heavy deps | |
| try: | |
| import pmdarima as pm | |
| PM_AVAILABLE = True | |
| except Exception: | |
| PM_AVAILABLE = False | |
| try: | |
| from arch import arch_model | |
| ARCH_AVAILABLE = True | |
| except Exception: | |
| ARCH_AVAILABLE = False | |
| try: | |
| from prophet import Prophet | |
| PROPHET_AVAILABLE = True | |
| except Exception: | |
| PROPHET_AVAILABLE = False | |
| # sklearn | |
| try: | |
| from sklearn.model_selection import TimeSeriesSplit | |
| from sklearn.linear_model import LinearRegression | |
| from sklearn.metrics import r2_score | |
| SKLEARN_AVAILABLE = True | |
| except Exception: | |
| SKLEARN_AVAILABLE = False | |
| # scipy (Box-Cox, Shapiro) | |
| try: | |
| from scipy.stats import boxcox, boxcox_normmax, shapiro | |
| SCIPY_AVAILABLE = True | |
| except Exception: | |
| SCIPY_AVAILABLE = False | |
| try: | |
| from tbats import TBATS | |
| TBATS_AVAILABLE = True | |
| except ImportError: | |
| TBATS_AVAILABLE = False | |
| # ------------------------------------------------------------------------- | |
| # Metrics | |
| # ------------------------------------------------------------------------- | |
def mae(y_true, y_pred):
    """Mean absolute error of predictions against truth."""
    abs_err = np.abs(y_true - y_pred)
    return np.mean(abs_err)
def rmse(y_true, y_pred):
    """Root mean squared error."""
    sq_err = (y_true - y_pred) ** 2
    return math.sqrt(np.mean(sq_err))
def mape(y_true, y_pred):
    """Mean absolute percentage error (%); 1e-9 guards zero denominators."""
    rel_err = np.abs((y_true - y_pred) / (y_true + 1e-9))
    return np.mean(rel_err) * 100.0
def smape(y_true, y_pred):
    """Symmetric MAPE (%); the 1e-9 term guards an all-zero denominator."""
    num = 2.0 * np.abs(y_pred - y_true)
    den = np.abs(y_true) + np.abs(y_pred) + 1e-9
    return 100.0 * np.mean(num / den)
def rmsle(y_true, y_pred):
    """Root mean squared logarithmic error (inputs must be > -1)."""
    log_diff = np.log1p(y_pred) - np.log1p(y_true)
    return math.sqrt(np.mean(log_diff ** 2))
def mase(y_true, y_pred, naive_ref):
    """Mean absolute scaled error.

    naive_ref: series whose one-step naive forecast errors provide the
    scaling denominator (typically the training series). Returns NaN when
    the reference series is constant (zero denominator).
    """
    scale = np.mean(np.abs(np.diff(naive_ref)))
    if scale == 0:
        return np.nan
    return np.mean(np.abs(y_true - y_pred)) / scale
| # ------------------------------------------------------------------------- | |
| # IO & preprocessing utilities | |
| # ------------------------------------------------------------------------- | |
def load_data(path: str, timestamp_col: str = 'timestamp', tz: Optional[str] = None) -> pd.DataFrame:
    """Read a CSV/parquet file and return it indexed by its timestamp column.

    Rows are sorted chronologically and duplicate timestamps dropped
    (first occurrence wins). If ``tz`` is given, a best-effort
    localize-then-convert is attempted; failures are silently ignored.
    """
    reader = pd.read_parquet if path.endswith('.parquet') else pd.read_csv
    df = reader(path)
    if timestamp_col not in df.columns:
        raise ValueError(f"timestamp column '{timestamp_col}' not found")
    df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce')
    if tz is not None:
        # Try localize first (naive timestamps), then convert (aware ones).
        for make_tz in (lambda s: s.dt.tz_localize(tz), lambda s: s.dt.tz_convert(tz)):
            try:
                df[timestamp_col] = make_tz(df[timestamp_col])
                break
            except Exception:
                continue
    df = (df.sort_values(timestamp_col)
            .drop_duplicates(subset=[timestamp_col])
            .set_index(timestamp_col))
    return df
def resample_and_interpolate(df: pd.DataFrame, freq: str = 'D', method: str = 'linear') -> pd.DataFrame:
    """Resample *df* onto a regular grid and fill the created gaps.

    method='linear' -> linear interpolation;
    method='ffill'  -> forward fill;
    anything else   -> forward fill, then interpolate.

    Uses DataFrame.ffill() instead of the deprecated fillna(method='ffill').
    """
    dfr = df.resample(freq).asfreq()
    if method == 'linear':
        return dfr.interpolate(method='linear')
    if method == 'ffill':
        return dfr.ffill()
    return dfr.ffill().interpolate()
| # ------------------------------------------------------------------------- | |
| # Transformations and stationarity selection | |
| # ------------------------------------------------------------------------- | |
def test_stationarity_pair(series: pd.Series) -> Dict[str, Dict[str, Any]]:
    """Run ADF and KPSS stationarity tests on the NaN-free series.

    Returns {'adf': {'stat', 'pvalue'}, 'kpss': {'stat', 'pvalue'}}.
    With fewer than 3 observations both entries are NaN-filled, now
    including the 'stat' key (previously omitted, which made the
    short-series return shape inconsistent with the normal one).
    Raises ImportError when statsmodels is unavailable.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required for stationarity tests")
    s = series.dropna()
    if len(s) < 3:
        return {'adf': {'stat': np.nan, 'pvalue': np.nan},
                'kpss': {'stat': np.nan, 'pvalue': np.nan}}
    adf_res = adfuller(s, autolag='AIC', regression='c')
    kpss_res = kpss(s, nlags='auto')
    return {'adf': {'stat': adf_res[0], 'pvalue': adf_res[1]},
            'kpss': {'stat': kpss_res[0], 'pvalue': kpss_res[1]}}
def try_transformations_and_choose(y_train: pd.Series, seasonal_period: int = 7):
    """
    Try a set of transformations:
      - none
      - log (only if all values > 0)
      - Box-Cox (only if all values > 0 and scipy is available)
      - plus diff(1), diff(seasonal) and diff(1)+diff(seasonal) of each base
    and pick the combination that best reconciles ADF and KPSS:
    we want ADF.pvalue < 0.05 and KPSS.pvalue > 0.05, so the score
    adf_p + (1 - kpss_p) is minimised (lower is better).

    Returns (transformed_series, meta); meta records the chosen tag, the
    differencing spec and both p-values.

    NOTE(review): the Box-Cox lambda is only embedded in the tag string and
    the success-path meta carries no 'lambda' key (only the no-candidate
    fallback does) -- callers cannot invert the transform from meta alone.
    """
    candidates = []
    # original, untransformed series
    candidates.append(('none', y_train))
    # log transform, valid only for strictly positive series
    if (y_train > 0).all():
        candidates.append(('log', np.log(y_train)))
    # Box-Cox with lambda estimated by boxcox_normmax
    if (y_train > 0).all() and SCIPY_AVAILABLE:
        try:
            lam = boxcox_normmax(y_train.dropna(), brack=(-2, 2))
            bc, _ = apply_boxcox(y_train, lmbda=lam)
            candidates.append((f'boxcox_{lam:.4f}', bc))
        except Exception:
            pass
    # Differenced versions: diff(1) and seasonal diffs of every base candidate
    final_candidates = []
    for name, ser in candidates:
        ser_clean = ser.dropna()
        final_candidates.append((name, 0, ser_clean))  # 0 differences
        if len(ser_clean) > 3:
            final_candidates.append((name, 1, ser_clean.diff(1).dropna()))
        if seasonal_period and len(ser_clean) > seasonal_period + 3:
            final_candidates.append((name, seasonal_period, ser_clean.diff(seasonal_period).dropna()))
            final_candidates.append(
                (name, ('1+s', seasonal_period), ser_clean.diff(1).diff(seasonal_period).dropna()))
    # Evaluate every candidate with the ADF/KPSS pair
    scored = []
    for cand in final_candidates:
        tag = cand[0]
        d = cand[1]
        ser = cand[2]
        if ser.dropna().shape[0] < 10:
            continue  # too short for meaningful stationarity tests
        try:
            tests = test_stationarity_pair(ser)
            # score: lower is better. We want ADF.p < 0.05 and KPSS.p > 0.05.
            adf_p = tests['adf']['pvalue'] if tests['adf']['pvalue'] is not None else 1.0
            kpss_p = tests['kpss']['pvalue'] if tests['kpss']['pvalue'] is not None else 0.0
            # penalty for bad ADF (want small) and bad KPSS (want big)
            score = (adf_p) + (1.0 - kpss_p)
            scored.append({'tag': tag, 'diff': d, 'score': score, 'adf_p': adf_p, 'kpss_p': kpss_p, 'series': ser})
        except Exception:
            continue
    if not scored:
        # nothing could be scored: return the input untouched
        return y_train, {'method': 'none', 'lambda': None}
    scored = sorted(scored, key=lambda x: x['score'])
    best = scored[0]
    meta = {'method': best['tag'], 'diff': best['diff'], 'adf_p': best['adf_p'], 'kpss_p': best['kpss_p']}
    return best['series'], meta
def apply_boxcox(series: pd.Series, lmbda: Optional[float] = None):
    """Box-Cox transform the NaN-free part of *series*.

    When lmbda is None it is estimated with boxcox_normmax.
    Returns (transformed Series keeping the original index, lambda).
    Raises ValueError on non-positive values, ImportError without scipy.
    """
    if not SCIPY_AVAILABLE:
        raise ImportError("scipy required for boxcox")
    clean = series.dropna()
    if (clean <= 0).any():
        raise ValueError("Box-Cox requires positive values")
    lam = boxcox_normmax(clean, brack=(-2, 2)) if lmbda is None else lmbda
    values = boxcox(clean, lam)
    return pd.Series(index=clean.index, data=values), float(lam)
| # ------------------------------------------------------------------------- | |
| # Feature engineering | |
| # ------------------------------------------------------------------------- | |
def make_lags(df: pd.DataFrame, col: str, lags: List[int]):
    """Add shifted copies of *col* as '<col>_lag_<l>' columns (mutates df)."""
    for lag in lags:
        df[f'{col}_lag_{lag}'] = df[col].shift(lag)
    return df
def make_rolls(df: pd.DataFrame, col: str, windows: List[int]):
    """Add rolling mean/std/min/max of *col* per window (mutates df).

    min_periods=1 lets the windows shrink at the start instead of producing
    NaNs (std is still NaN for the very first row).
    """
    for win in windows:
        rolling = df[col].rolling(window=win, min_periods=1)
        df[f'{col}_roll_mean_{win}'] = rolling.mean()
        df[f'{col}_roll_std_{win}'] = rolling.std()
        df[f'{col}_roll_min_{win}'] = rolling.min()
        df[f'{col}_roll_max_{win}'] = rolling.max()
    return df
def make_time_features(df: pd.DataFrame):
    """Add calendar features derived from the DatetimeIndex (mutates df).

    Columns added: dayofweek, month, is_weekend, plus two cyclic encodings
    (weekly sine, monthly cosine).
    """
    index = df.index
    df['dayofweek'] = index.dayofweek
    df['month'] = index.month
    df['is_weekend'] = index.dayofweek >= 5
    df['sin_week'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
    df['cos_month'] = np.cos(2 * np.pi * (df['month'] - 1) / 12)
    return df
| # ------------------------------------------------------------------------- | |
| # Splits, CV and strategies | |
| # ------------------------------------------------------------------------- | |
def chronological_split(df: pd.DataFrame, frac_train=0.7, frac_val=0.15):
    """Chronological train/val/test split (no shuffling).

    Returns copies covering the first frac_train share, the next frac_val
    share, and the remainder. Prints a warning (kept in Russian, as before)
    when consecutive index values are not evenly spaced.
    """
    n = len(df)
    train_end = int(n * frac_train)
    val_end = train_end + int(n * frac_val)
    parts = (df.iloc[:train_end].copy(),
             df.iloc[train_end:val_end].copy(),
             df.iloc[val_end:].copy())
    # Check that the dates form a single regular grid.
    combined = pd.concat(parts)
    spacing = (combined.index[1:] - combined.index[:-1]).value_counts()
    if len(spacing) > 1:
        print(f"Предупреждение: обнаружены разные интервалы между датами: {spacing.index.tolist()}")
    return parts
def expanding_window_cv(X: pd.DataFrame, y: pd.Series, model_fit_predict, initial_train_size: int, h: int,
                        n_splits: int = 5):
    """Expanding-window CV: train on [0:t], test on [t:t+h].

    model_fit_predict(train_X, train_y, h) must return an array of h
    forecasts. Returns a DataFrame with one MAE/RMSE row per fold.

    The fold step is clamped to >= 1 so degenerate sizes
    (n <= initial_train_size + h) can no longer produce a zero or negative
    stride (which previously made every fold identical or moved backwards);
    folds that would run past the end of the series are skipped.
    """
    n = len(X)
    raw_step = (n - initial_train_size - h) // n_splits if n_splits > 0 else h
    step = max(1, raw_step)
    rows = []
    for fold in range(n_splits):
        end_train = initial_train_size + fold * step
        train_X, train_y = X.iloc[:end_train], y.iloc[:end_train]
        test_y = y.iloc[end_train:end_train + h]
        if len(test_y) == 0:
            break  # ran past the end of the series
        y_pred = np.asarray(model_fit_predict(train_X, train_y, h))[:len(test_y)]
        rows.append({'fold': fold,
                     'mae': mae(test_y.values, y_pred),
                     'rmse': rmse(test_y.values, y_pred)})
    return pd.DataFrame(rows)
def rolling_window_cv(X: pd.DataFrame, y: pd.Series, model_fit_predict, window: int, h: int, n_splits: int = 5):
    """Rolling-window CV: train on [t-w:t], test on [t:t+h].

    Same contract as expanding_window_cv but with a fixed-size training
    window that slides forward. The stride is clamped to >= 1 so degenerate
    sizes (n <= window + h) cannot yield a zero or negative step; folds
    that would run past the end of the series are skipped.
    """
    n = len(X)
    raw_step = (n - window - h) // n_splits if n_splits > 0 else h
    step = max(1, raw_step)
    rows = []
    for fold in range(n_splits):
        start = fold * step
        end = start + window
        train_X, train_y = X.iloc[start:end], y.iloc[start:end]
        test_y = y.iloc[end:end + h]
        if len(test_y) == 0:
            break  # slid past the end of the series
        y_pred = np.asarray(model_fit_predict(train_X, train_y, h))[:len(test_y)]
        rows.append({'fold': fold,
                     'mae': mae(test_y.values, y_pred),
                     'rmse': rmse(test_y.values, y_pred)})
    return pd.DataFrame(rows)
| # Strategies: recursive, direct, hybrid | |
def forecast_recursive_arima(fit_res, steps: int, last_date: pd.Timestamp = None, freq: str = 'D'):
    """Forecast *steps* ahead from a fitted statsmodels-style result.

    Prefers get_forecast() (provides confidence intervals); falls back to
    forecast(). When last_date is given, outputs are wrapped into Series
    indexed by create_forecast_index(last_date, steps, freq).
    Returns (mean, (lower, upper)); the interval is (None, None) on the
    fallback path and NaN-filled when conf_int() fails.
    """
    if not hasattr(fit_res, "get_forecast"):
        point = fit_res.forecast(steps=steps)
        if last_date is not None:
            idx = create_forecast_index(last_date, steps, freq)
            point = pd.Series(point, index=idx)
        return point, (None, None)

    fc = fit_res.get_forecast(steps=steps)
    point = np.asarray(fc.predicted_mean)
    try:
        ci = fc.conf_int()
        lower = np.asarray(ci.iloc[:, 0])
        upper = np.asarray(ci.iloc[:, 1])
    except Exception:
        lower = np.full(len(point), np.nan)
        upper = np.full(len(point), np.nan)
    if last_date is not None:
        idx = create_forecast_index(last_date, steps, freq)
        point = pd.Series(point, index=idx)
        lower = pd.Series(lower, index=idx)
        upper = pd.Series(upper, index=idx)
    return point, (lower, upper)
| # Direct strategy for SARIMAX: fit separate models for each horizon | |
def forecast_direct_arima(train_series: pd.Series, h: int, order=(1, 0, 0)):
    """Placeholder "direct" strategy: returns a naive flat forecast.

    A true direct strategy would fit one model per horizon on a target
    shifted by h; that was never implemented here, so the dead code that
    built (and then discarded) the shifted-target frame has been removed.
    The `order` parameter is kept for interface compatibility but unused.
    Raises ImportError when statsmodels is unavailable (as before).
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    # Naive approach: repeat the last observed value h times.
    last = train_series.iloc[-1]
    return np.full(h, last)
| # ------------------------------------------------------------------------- | |
| # Models training wrapper | |
| # ------------------------------------------------------------------------- | |
def fit_sarimax_simple(series: pd.Series, order=(1, 0, 0), seasonal_order=(0, 0, 0, 0), **kwargs):
    """Fit a SARIMAX model on the NaN-free series and return the results.

    Stationarity/invertibility enforcement is disabled so optimisation does
    not fail on borderline parameter values. Extra kwargs go to fit().
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    model = SARIMAX(
        series.dropna(),
        order=order,
        seasonal_order=seasonal_order,
        enforce_stationarity=False,
        enforce_invertibility=False,
    )
    return model.fit(disp=False, **kwargs)
def forecast_sarimax(fit_res, steps: int, alpha: float = 0.05) -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
    """
    Produce a forecast from a fitted SARIMAX result.

    Returns (mean, (lower, upper)) -- numpy arrays of length *steps*.
    Confidence bounds are NaN-filled whenever they cannot be computed;
    if every strategy fails, all three arrays are fully NaN.
    """
    try:
        if hasattr(fit_res, "get_forecast"):
            fc = fit_res.get_forecast(steps=steps)
            mean = np.asarray(fc.predicted_mean)
            # Guard against NaN point forecasts from get_forecast()
            if np.any(np.isnan(mean)):
                # Fallback: use the simpler forecast() API
                mean = fit_res.forecast(steps=steps)
                mean = np.asarray(mean)
            try:
                conf = fc.conf_int(alpha=alpha)
                lower = np.asarray(conf.iloc[:, 0])
                upper = np.asarray(conf.iloc[:, 1])
                # Discard partially-NaN confidence intervals entirely
                if np.any(np.isnan(lower)) or np.any(np.isnan(upper)):
                    lower = np.full(len(mean), np.nan)
                    upper = np.full(len(mean), np.nan)
            except Exception:
                lower = np.full(len(mean), np.nan)
                upper = np.full(len(mean), np.nan)
            return mean, (lower, upper)
        else:
            # Result object lacks get_forecast: fall back to forecast(),
            # which yields point forecasts only (no intervals)
            mean = fit_res.forecast(steps=steps)
            mean = np.asarray(mean)
            lower = np.full(len(mean), np.nan)
            upper = np.full(len(mean), np.nan)
            return mean, (lower, upper)
    except Exception as e:
        # Every strategy failed: warn and return NaN arrays of the right length
        print(f"Warning: SARIMAX forecast failed: {e}")
        mean = np.full(steps, np.nan)
        lower = np.full(steps, np.nan)
        upper = np.full(steps, np.nan)
        return mean, (lower, upper)
def fit_auto_arima(series: pd.Series, seasonal=False, m=1, **kwargs):
    """Run pmdarima.auto_arima on the NaN-free series and return the model.

    Order-search errors are ignored and warnings suppressed; extra kwargs
    are forwarded to auto_arima. Raises ImportError without pmdarima.
    """
    if not PM_AVAILABLE:
        raise ImportError("pmdarima not installed")
    clean = series.dropna()
    return pm.auto_arima(clean, seasonal=seasonal, m=m,
                         error_action='ignore', suppress_warnings=True, **kwargs)
def fit_var(df: pd.DataFrame, maxlags=15):
    """Fit a VAR, choosing the lag order by the first available criterion.

    Tries AIC, then BIC, FPE and HQIC from select_order(); falls back to
    lag 1 when none is available or selection itself fails.
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    model = VAR(df.dropna())
    selection = model.select_order(maxlags=maxlags)
    chosen = 1
    try:
        orders = selection.selected_orders
        for criterion in ('aic', 'bic', 'fpe', 'hqic'):
            if orders.get(criterion) is not None:
                chosen = int(orders[criterion])
                break
    except Exception:
        chosen = 1
    return model.fit(maxlags=chosen)
def fit_garch_on_residuals(residuals, p=1, q=1):
    """Fit a GARCH(p, q) with normal innovations on model residuals."""
    if not ARCH_AVAILABLE:
        raise ImportError("arch not installed")
    spec = arch_model(residuals, vol='Garch', p=p, q=q, dist='normal')
    return spec.fit(disp='off')
| # ------------------------------------------------------------------------- | |
| # Diagnostics and tests | |
| # ------------------------------------------------------------------------- | |
def ljung_box_test(resid: np.ndarray, lags: Optional[List[int]] = None):
    """Ljung-Box autocorrelation test on residuals; returns a DataFrame.

    *lags* defaults to [10]; the default is created per call instead of
    using a mutable default argument (the previous `lags=[10]` was shared
    across calls and could be mutated by callers).
    """
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required")
    if lags is None:
        lags = [10]
    return acorr_ljungbox(resid, lags=lags, return_df=True)
def shapiro_test(resid: np.ndarray):
    """Shapiro-Wilk normality test; returns {'stat', 'pvalue'}."""
    if not SCIPY_AVAILABLE:
        raise ImportError("scipy required")
    statistic, pvalue = shapiro(resid)
    return {'stat': statistic, 'pvalue': pvalue}
def simple_dm_test(e1: np.ndarray, e2: np.ndarray):
    """
    Simplified Diebold-Mariano test on a loss differential.

    Treats e1 and e2 as per-observation losses, tests whether their mean
    difference is zero and returns {'stat', 'pvalue'} (two-sided, Student-t
    approximation, no HAC correction). NaNs are returned when the loss
    differential has zero variance.
    """
    diff = e1 - e2
    n_obs = len(diff)
    mean_diff = np.mean(diff)
    var_diff = np.var(diff, ddof=1)
    se = math.sqrt(var_diff / n_obs) if var_diff > 0 else np.nan
    if se == 0 or np.isnan(se):
        return {'stat': np.nan, 'pvalue': np.nan}
    t_stat = mean_diff / se
    # two-sided p-value from a Student-t approximation
    from scipy.stats import t as student_t
    p_value = 2 * (1 - student_t.cdf(abs(t_stat), df=n_obs - 1))
    return {'stat': float(t_stat), 'pvalue': float(p_value)}
| # ------------------------------------------------------------------------- | |
| # Report generation (HTML) | |
| # ------------------------------------------------------------------------- | |
def generate_report_html(out_path: str, plots: List[plt.Figure], tables: Dict[str, pd.DataFrame], title="Lab3 Report"):
    """Write a standalone HTML report containing all tables and ALL figures.

    Tables are rendered with DataFrame.to_html; each figure is embedded as
    a base64-encoded PNG and closed afterwards to release memory.

    Bug fix: a stray ``break`` at the end of the figure loop previously
    stopped after the first plot, so only one figure ever reached the
    report; it has been removed.
    """
    import base64
    from io import BytesIO
    html_parts = [f"""
<html>
<head>
<meta charset='utf-8'>
<title>{title}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; background-color: white; color: black; }}
table {{ border-collapse: collapse; width: 100%; margin: 10px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
img {{ max-width: 100%; height: auto; margin: 10px 0; }}
.table-container {{ overflow-x: auto; }}
</style>
</head>
<body>
<h1>{title}</h1>
"""]
    # Tables
    for name, df in tables.items():
        html_parts.append(f"<h2>{name}</h2>")
        html_parts.append('<div class="table-container">')
        html_parts.append(df.to_html(classes='table table-striped', border=0, index=True))
        html_parts.append('</div>')
    # Figures, embedded inline as base64 PNGs
    for i, fig in enumerate(plots):
        buf = BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight', dpi=100)
        buf.seek(0)
        img_data = base64.b64encode(buf.read()).decode('utf-8')
        html_parts.append(f'<h3>Figure {i + 1}</h3>')
        html_parts.append(f'<img src="data:image/png;base64,{img_data}" alt="Figure {i + 1}">')
        # Close the figure to free memory; do NOT break -- embed every figure.
        plt.close(fig)
    html_parts.append("</body></html>")
    with open(out_path, 'w', encoding='utf-8') as f:
        f.write("\n".join(html_parts))
    print("Report saved to", out_path)
| # ------------------------------------------------------------------------- | |
| # Main runner that orchestrates everything | |
| # ------------------------------------------------------------------------- | |
def evaluate_with_cv(models_dict, X, y, cv_method='expanding', n_splits=5):
    """Score every model in models_dict with time-series cross-validation.

    cv_method 'expanding' uses expanding_window_cv (initial train size =
    half the data); any other value uses rolling_window_cv (window = half
    the data). The horizon is fixed at 30 steps.
    Returns {model_name: per-fold metrics DataFrame}.
    """
    half = len(X) // 2
    scores = {}
    for name, fit_predict in models_dict.items():
        if cv_method == 'expanding':
            folds = expanding_window_cv(X, y, fit_predict,
                                        initial_train_size=half,
                                        h=30, n_splits=n_splits)
        else:
            folds = rolling_window_cv(X, y, fit_predict,
                                      window=half, h=30, n_splits=n_splits)
        scores[name] = folds
    return scores
def run_pipeline(data_path: str, timestamp_col: str, target_col: str,
                 out_report: str = 'lab3_report.html', freq: str = 'D'):
    """
    Main entry point: runs the whole lab pipeline (items 3.1-3.8).

    Steps: load & resample -> transformation selection -> feature
    engineering -> chronological split -> benchmark/statistical models ->
    test-set evaluation & plots -> diagnostics and DM test -> HTML report
    -> best-effort cross-validation.

    Fixes vs. the previous revision:
    * the empty-results fallback called ``st.warning`` although streamlit
      is never imported (NameError at runtime); it prints instead;
    * the bare ``except:`` around frequency inference now catches
      ``Exception`` only;
    * the final cross-validation step is wrapped in try/except so a missing
      optional dependency (e.g. pmdarima) cannot crash a run whose report
      has already been written.
    """
    print("Loading", data_path)
    df = load_data(data_path, timestamp_col)
    if target_col not in df.columns:
        # Fall back to the first numeric column when the target is absent.
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        if not numeric_cols:
            raise ValueError("No numeric columns found")
        target_col = numeric_cols[0]
        print("Target not found, using", target_col)
    series = df[target_col].astype('float').copy()
    series = resample_and_interpolate(series.to_frame(), freq=freq).iloc[:, 0]
    print("Series length after resample:", len(series))
    # 3.1 Preprocessing & transformation selection
    transformed, meta = try_transformations_and_choose(series, seasonal_period=7)
    print("Chosen transform:", meta)
    # 3.2 Feature engineering. NOTE(review): features and all downstream
    # evaluation use the transformed (possibly differenced) series; no
    # inverse transform is applied before metrics are computed.
    df_all = transformed.to_frame(name=target_col)
    df_all = make_time_features(df_all)
    df_all = make_lags(df_all, target_col, [1, 2, 7, 30])
    df_all = make_rolls(df_all, target_col, [7, 30])
    # drop rows made NaN by the lag features
    df_all = df_all.dropna()
    train, val, test = chronological_split(df_all, frac_train=0.7, frac_val=0.15)
    y_train = train[target_col]
    y_val = val[target_col]
    y_test = test[target_col]
    print("Sizes train/val/test:", len(y_train), len(y_val), len(y_test))
    # 3.3 Models: benchmarks + SARIMAX + optional auto_arima/VAR/TBATS/Prophet
    results = []  # each elem: dict(model, h, pred (pd.Series with date index))
    horizons = [1, 7, 30]
    # Frequency used to build forecast date indexes
    try:
        inferred_freq = pd.infer_freq(y_train.index) or freq
    except Exception:  # infer_freq can raise on irregular indexes
        inferred_freq = freq
    # Benchmarks: naive (last value) and seasonal naive
    for h in horizons:
        pred_values = np.full(h, y_train.iloc[-1])
        pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
        results.append({
            'model': 'naive',
            'h': h,
            'pred': pd.Series(pred_values, index=pred_dates)
        })
        if len(y_train) >= 7:
            seasonal_pred = seasonal_naive_forecast(y_train, season=7, steps=h)
            seasonal_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
            results.append({
                'model': 'seasonal_naive',
                'h': h,
                'pred': pd.Series(seasonal_pred, index=seasonal_dates)
            })
    # SES: simple exponential smoothing baseline
    try:
        from statsmodels.tsa.holtwinters import SimpleExpSmoothing, ExponentialSmoothing
        ses = SimpleExpSmoothing(y_train.dropna()).fit(optimized=True)
        for h in horizons:
            pred = ses.forecast(h)
            pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
            results.append({
                'model': 'SES',
                'h': h,
                'pred': pd.Series(pred, index=pred_dates)
            })
    except Exception as e:
        print("SES skipped:", e)
    # SARIMAX baseline
    if STATSMODELS_AVAILABLE:
        try:
            # Require enough points and non-degenerate variance
            if len(y_train.dropna()) > 10 and y_train.var() > 1e-6:
                sar = fit_sarimax_simple(y_train, order=(1, 1, 1))
                # Keep forecasts only from a converged fit
                if hasattr(sar, 'mle_retvals') and sar.mle_retvals.get('converged', False):
                    for h in horizons:
                        mean, (lower, upper) = forecast_sarimax(sar, steps=h)
                        if not np.all(np.isnan(mean)):
                            pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                            results.append({
                                'model': 'SARIMAX(1,1,1)',
                                'h': h,
                                'pred': pd.Series(mean, index=pred_dates)
                            })
                        else:
                            print(f"SARIMAX returned all NaN for horizon {h}")
                else:
                    print("SARIMAX model did not converge")
            else:
                print("Insufficient data for SARIMAX")
        except Exception as e:
            print("SARIMAX failed:", e)
    # pmdarima auto_arima
    if PM_AVAILABLE:
        try:
            auto = fit_auto_arima(y_train, seasonal=False)
            for h in horizons:
                p = auto.predict(n_periods=h)
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'auto_arima',
                    'h': h,
                    'pred': pd.Series(p, index=pred_dates)
                })
        except Exception as e:
            print("auto_arima failed:", e)
    # VAR when the raw frame is multivariate
    if STATSMODELS_AVAILABLE and df.select_dtypes(include=[np.number]).shape[1] >= 2:
        try:
            num_df = df.select_dtypes(include=[np.number]).dropna()
            var_res = fit_var(num_df, maxlags=5)
            fut = var_res.forecast(var_res.endog[-var_res.k_ar:], steps=30)
            # fut has shape (30, k); keep the first variable only
            for h in [30]:
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'VAR',
                    'h': h,
                    'pred': pd.Series(fut[:h, 0], index=pred_dates)
                })
        except Exception as e:
            print("VAR failed:", e)
    # TBATS (weekly + monthly seasonality)
    if TBATS_AVAILABLE:
        try:
            tbats_model = TBATS(seasonal_periods=[7, 30], use_arma_errors=True)
            tbats_fitted = tbats_model.fit(y_train)
            for h in horizons:
                tbats_pred = tbats_fitted.forecast(steps=h)
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'TBATS',
                    'h': h,
                    'pred': pd.Series(tbats_pred, index=pred_dates)
                })
        except Exception as e:
            print("TBATS failed:", e)
    # Prophet
    if PROPHET_AVAILABLE:
        try:
            prophet_df = y_train.reset_index()
            prophet_df.columns = ['ds', 'y']
            prophet_model = Prophet()
            prophet_model.fit(prophet_df)
            future = prophet_model.make_future_dataframe(periods=max(horizons), freq=inferred_freq)
            forecast = prophet_model.predict(future)
            for h in horizons:
                # tail(h) keeps the last h rows of the fitted frame
                prophet_pred = forecast.tail(h)['yhat'].values
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'Prophet',
                    'h': h,
                    'pred': pd.Series(prophet_pred, index=pred_dates)
                })
        except Exception as e:
            print("Prophet failed:", e)
    # GARCH on SARIMAX residuals (volatility analysis only; the fitted model
    # is not used for point forecasts)
    if ARCH_AVAILABLE and 'sar' in locals():
        try:
            garch_model = fit_garch_on_residuals(sar.resid, p=1, q=1)
        except Exception as e:
            print("GARCH failed:", e)
    # 3.7 Evaluate every stored forecast on the test set
    eval_rows = []
    plots = []
    for rec in results:
        model_name = rec['model']
        h = rec['h']
        pred = rec['pred']
        if hasattr(pred, 'index'):
            aligned_pred = pred
            # compare against the first h test points
            y_true_aligned = y_test.iloc[:min(h, len(y_test))]
        else:
            # legacy format: raw array without a date index
            pred_values = np.asarray(pred).ravel()
            aligned_pred = pd.Series(pred_values, index=y_test.index[:len(pred_values)])
            y_true_aligned = y_test.iloc[:len(pred_values)]
        if len(y_true_aligned) == 0:
            continue
        # truncate the forecast to the available test length
        aligned_pred = aligned_pred.iloc[:len(y_true_aligned)]
        row = {
            'model': model_name,
            'h': h,
            'MAE': mae(y_true_aligned.values, aligned_pred.values),
            'RMSE': rmse(y_true_aligned.values, aligned_pred.values),
            'MAPE': mape(y_true_aligned.values, aligned_pred.values),
            'SMAPE': smape(y_true_aligned.values, aligned_pred.values)
        }
        # MASE: scaled by the in-sample naive errors of the training series
        row['MASE'] = mase(y_true_aligned.values, aligned_pred.values, y_train.values)
        try:
            row['R2'] = float((1 - np.sum((y_true_aligned.values - aligned_pred.values) ** 2) / np.sum(
                (y_true_aligned.values - np.mean(y_true_aligned.values)) ** 2)))
        except Exception:
            row['R2'] = np.nan
        eval_rows.append(row)
        # Plot train tail + val + test + this forecast
        fig, ax = plt.subplots(figsize=(8, 3))
        context_points = min(200, len(y_train))
        ax.plot(y_train.index[-context_points:], y_train.values[-context_points:],
                label='train', alpha=0.7)
        if len(val) > 0:
            ax.plot(val.index, val.values, label='val', alpha=0.7)
        ax.plot(y_test.index, y_test.values, label='test', alpha=0.7)
        ax.plot(aligned_pred.index, aligned_pred.values,
                label=f'pred_{model_name}_h{h}', linewidth=2)
        ax.legend()
        plots.append(fig)
    eval_df = pd.DataFrame(eval_rows)
    # 3.6 Diagnostics for the top-3 models by RMSE
    diag_tables = {}
    try:
        top3 = eval_df.sort_values('RMSE').head(3)['model'].tolist()
    except Exception:
        top3 = []
    for m in top3:
        # residual diagnostics are only available for the SARIMAX fit
        if m.startswith('SARIMAX'):
            try:
                resid = sar.resid.dropna()
                lb = acorr_ljungbox(resid, lags=[10], return_df=True)
                diag_tables[f'ljungbox_{m}'] = lb
                if SCIPY_AVAILABLE:
                    sh = shapiro(resid)
                    diag_tables[f'shapiro_{m}'] = pd.DataFrame([{'stat': sh[0], 'pvalue': sh[1]}])
            except Exception:
                pass
    # Diebold-Mariano test between the two best models at horizon 1
    dm_table = None
    try:
        if len(eval_df) >= 2:
            sorted_models = eval_df.sort_values('RMSE')
            if len(sorted_models) >= 2:
                m1 = sorted_models.iloc[0]['model']
                m2 = sorted_models.iloc[1]['model']
                pred1 = None
                pred2 = None
                for rec in results:
                    if rec['model'] == m1 and rec['h'] == 1:
                        pred1 = rec['pred']
                    if rec['model'] == m2 and rec['h'] == 1:
                        pred2 = rec['pred']
                if pred1 is not None and pred2 is not None:
                    # align both squared-error series with the test set
                    y_true = y_test.values[:min(len(pred1), len(y_test))]
                    e1 = (y_true - pred1.values[:len(y_true)]) ** 2
                    e2 = (y_true - pred2.values[:len(y_true)]) ** 2
                    dm = simple_dm_test(e1, e2)
                    dm_table = pd.DataFrame(
                        [{'model1': m1, 'model2': m2, 'dm_stat': dm['stat'], 'pvalue': dm['pvalue']}])
    except Exception:
        dm_table = None
    # 3.8 Generate the HTML report
    tables = {'evaluation': eval_df}
    if diag_tables:
        tables.update(diag_tables)
    if dm_table is not None:
        tables['dm_test'] = dm_table
    generate_report_html(out_report, plots, tables, title="Lab3 Full Report")
    print("Pipeline finished. Report:", out_report)
    # Ensure we have at least some predictions
    if not results:
        # fix: this branch previously called st.warning, but streamlit is
        # never imported in this module
        print("Все модели вернули NaN. Использую простой наивный прогноз.")
        for h in horizons:
            pred_values = np.full(h, y_train.iloc[-1] if len(y_train) > 0 else 0)
            pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
            results.append({
                'model': 'fallback_naive',
                'h': h,
                'pred': pd.Series(pred_values, index=pred_dates)
            })
    # Best-effort cross-validation of the two statistical models; failures
    # (e.g. pmdarima missing) must not crash a pipeline whose report is done.
    try:
        cv_results = evaluate_with_cv({
            'SARIMAX': lambda X, y, h: forecast_recursive(fit_sarimax_simple(y), y, h),
            'AutoARIMA': lambda X, y, h: forecast_recursive(fit_auto_arima(y), y, h)
        }, df_all.drop(columns=[target_col]), df_all[target_col])
    except Exception as e:
        print("Cross-validation step failed:", e)
        cv_results = {}
| # ------------------------- | |
| # helpers used in the pipeline but defined later | |
| # ------------------------- | |
def seasonal_naive_forecast(series: pd.Series, season: int, steps: int):
    """Repeat the last full season of *series* to cover *steps* points."""
    season_vals = series.iloc[-season:].values
    n_repeats = int(np.ceil(steps / season))
    return np.tile(season_vals, n_repeats)[:steps]
def create_forecast_index(last_train_date: pd.Timestamp, steps: int, freq: str = 'D') -> pd.DatetimeIndex:
    """Build the date index for *steps* forecasts following *last_train_date*.

    The index starts one *freq* step after the last training date. (The
    previous version always advanced the start by one calendar day, which
    was wrong for any non-daily frequency, e.g. hourly data jumped a whole
    day ahead.) freq='auto' attempts frequency inference and falls back to
    daily. On any failure a RangeIndex is returned so the caller can still
    attach the forecasts to something.
    """
    try:
        if freq == 'auto':
            freq = pd.infer_freq(pd.DatetimeIndex([last_train_date])) or 'D'
        # Generate steps+1 points anchored at the last training date and
        # drop the anchor itself -> exactly `steps` future dates spaced by freq.
        full_range = pd.date_range(start=last_train_date, periods=steps + 1, freq=freq)
        return full_range[1:]
    except Exception as e:
        print(f"Warning: could not create proper date index: {e}")
        # Fallback: plain positional index
        return pd.RangeIndex(start=0, stop=steps)
def forecast_recursive(model, series, steps, freq='D'):
    """Iterated ("recursive") multi-step forecast driver.

    NOTE(review): the fitted *model* is never refitted or updated with the
    appended predictions, so for statsmodels result objects each iteration
    re-issues the same 1-step forecast; `current_series` is extended but
    never fed back into the model. Works as intended only for models whose
    predict()/forecast() advances internal state -- confirm per model type.
    The `freq` parameter is accepted but unused (a daily step is assumed
    when extending the series).
    """
    predictions = []
    current_series = series.copy()
    for _ in range(steps):
        # pmdarima-style models expose predict(n_periods=...);
        # statsmodels results expose forecast(steps=...)
        if hasattr(model, 'predict'):
            pred = model.predict(n_periods=1)
        else:
            pred = model.forecast(steps=1)
        predictions.append(pred[0])
        # Extend the series with the new prediction for the next iteration
        # (assumes a daily index -- see NOTE above)
        current_series = pd.concat(
            [current_series, pd.Series([pred[0]], index=[current_series.index[-1] + pd.Timedelta(days=1)])])
    return np.array(predictions)
def forecast_direct(train_series, test_features, model_factory, steps):
    """Direct multi-step strategy: one dedicated model per horizon.

    For each horizon h the target is the series shifted h steps back, the
    single feature is the current value, and the prediction is made from
    the last observed value. test_features is accepted for interface
    compatibility but unused. Returns an array of length *steps*.
    """
    horizon_preds = []
    last_value = train_series.values[-1:].reshape(1, -1)
    for horizon in range(1, steps + 1):
        # Target shifted so X_t is paired with y_{t+horizon}
        target = train_series.shift(-horizon).dropna()
        features = train_series.iloc[:len(target)]
        # A fresh model per horizon
        estimator = model_factory()
        estimator.fit(features.values.reshape(-1, 1), target.values)
        horizon_preds.append(estimator.predict(last_value)[0])
    return np.array(horizon_preds)