# lab3_pipeline.py
"""
Full pipeline for Lab 3 (items 3.1-3.8).
Save this file in the project's src folder (TimeSeriesHomework/src/lab3_pipeline.py).
"""
import os
import math
import time
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# statsmodels and the core statistical tests
try:
    from statsmodels.tsa.stattools import adfuller, kpss
    from statsmodels.tsa.seasonal import seasonal_decompose
    from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
    from statsmodels.stats.diagnostic import acorr_ljungbox
    from statsmodels.tsa.statespace.sarimax import SARIMAX
    from statsmodels.tsa.api import VAR
    STATSMODELS_AVAILABLE = True
except Exception as e:
    STATSMODELS_AVAILABLE = False
    print("warning: statsmodels not available:", e)

# optional heavy deps
try:
    import pmdarima as pm
    PM_AVAILABLE = True
except Exception:
    PM_AVAILABLE = False

try:
    from arch import arch_model
    ARCH_AVAILABLE = True
except Exception:
    ARCH_AVAILABLE = False

try:
    from prophet import Prophet
    PROPHET_AVAILABLE = True
except Exception:
    PROPHET_AVAILABLE = False

# sklearn
try:
    from sklearn.model_selection import TimeSeriesSplit
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import r2_score
    SKLEARN_AVAILABLE = True
except Exception:
    SKLEARN_AVAILABLE = False

# scipy (Box-Cox, Shapiro)
try:
    from scipy.stats import boxcox, boxcox_normmax, shapiro
    SCIPY_AVAILABLE = True
except Exception:
    SCIPY_AVAILABLE = False

try:
    from tbats import TBATS
    TBATS_AVAILABLE = True
except ImportError:
    TBATS_AVAILABLE = False


# -------------------------------------------------------------------------
# Metrics
# -------------------------------------------------------------------------
def mae(y_true, y_pred):
    return np.mean(np.abs(y_true - y_pred))


def rmse(y_true, y_pred):
    return math.sqrt(np.mean((y_true - y_pred) ** 2))


def mape(y_true, y_pred):
    return np.mean(np.abs((y_true - y_pred) / (y_true + 1e-9))) * 100.0


def smape(y_true, y_pred):
    return 100.0 * np.mean(
        2.0 * np.abs(y_pred - y_true) / (np.abs(y_true) + np.abs(y_pred) + 1e-9))


def rmsle(y_true, y_pred):
    return math.sqrt(np.mean((np.log1p(y_pred) - np.log1p(y_true)) ** 2))


def mase(y_true, y_pred, naive_ref):
    # naive_ref: series used to compute the naive difference (e.g. the train series)
    denom = np.mean(np.abs(np.diff(naive_ref)))
    if denom == 0:
        return np.nan
    return np.mean(np.abs(y_true - y_pred)) / denom
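
# Illustrative sketch (not part of the lab requirements): how the metric helpers
# above are meant to be called. The toy arrays below are made up for the example;
# in the pipeline, y_true/y_pred come from the test split and a model's forecast,
# and the MASE reference is the training series.
def _example_metrics():
    y_true = np.array([10.0, 12.0, 11.0, 13.0])
    y_pred = np.array([9.5, 12.5, 10.0, 14.0])
    train_ref = np.array([8.0, 9.0, 10.0, 11.0, 10.0])  # hypothetical training series
    print("MAE  =", mae(y_true, y_pred))
    print("RMSE =", rmse(y_true, y_pred))
    print("MAPE =", mape(y_true, y_pred), "%")
    print("MASE =", mase(y_true, y_pred, train_ref))  # scaled by the in-sample naive error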

# -------------------------------------------------------------------------
# IO & preprocessing utilities
# -------------------------------------------------------------------------
def load_data(path: str, timestamp_col: str = 'timestamp',
              tz: Optional[str] = None) -> pd.DataFrame:
    if path.endswith('.parquet'):
        df = pd.read_parquet(path)
    else:
        df = pd.read_csv(path)
    if timestamp_col not in df.columns:
        raise ValueError(f"timestamp column '{timestamp_col}' not found")
    df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce')
    if tz is not None:
        try:
            df[timestamp_col] = df[timestamp_col].dt.tz_localize(tz)
        except Exception:
            try:
                df[timestamp_col] = df[timestamp_col].dt.tz_convert(tz)
            except Exception:
                pass
    df = df.sort_values(timestamp_col).drop_duplicates(subset=[timestamp_col])
    df = df.set_index(timestamp_col)
    return df


def resample_and_interpolate(df: pd.DataFrame, freq: str = 'D',
                             method: str = 'linear') -> pd.DataFrame:
    dfr = df.resample(freq).asfreq()
    if method == 'linear':
        return dfr.interpolate(method='linear')
    elif method == 'ffill':
        return dfr.ffill()
    else:
        return dfr.ffill().interpolate()


# -------------------------------------------------------------------------
# Transformations and stationarity selection
# -------------------------------------------------------------------------
def test_stationarity_pair(series: pd.Series) -> Dict[str, Dict[str, Any]]:
    """Return the ADF and KPSS test results."""
    if not STATSMODELS_AVAILABLE:
        raise ImportError("statsmodels required for stationarity tests")
    s = series.dropna()
    if len(s) < 3:
        return {'adf': {'pvalue': np.nan}, 'kpss': {'pvalue': np.nan}}
    adf_res = adfuller(s, autolag='AIC', regression='c')
    kpss_res = kpss(s, nlags='auto')
    return {'adf': {'stat': adf_res[0], 'pvalue': adf_res[1]},
            'kpss': {'stat': kpss_res[0], 'pvalue': kpss_res[1]}}
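
# Sketch of how the ADF/KPSS pair is read (the same convention is used by
# try_transformations_and_choose below): ADF rejects a unit root when its p-value
# is small, KPSS rejects stationarity when its p-value is small. The helper name
# and the 0.05 threshold are illustrative, not part of the lab statement.
def _interpret_stationarity(series: pd.Series, alpha: float = 0.05) -> str:
    tests = test_stationarity_pair(series)
    adf_p, kpss_p = tests['adf']['pvalue'], tests['kpss']['pvalue']
    if adf_p < alpha and kpss_p > alpha:
        return "stationary (both tests agree)"
    if adf_p >= alpha and kpss_p <= alpha:
        return "non-stationary (both tests agree)"
    return "tests disagree -- consider differencing or a transformation"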

def try_transformations_and_choose(y_train: pd.Series, seasonal_period: int = 7):
    """
    Try a set of transformations:
      - none
      - log (if all values > 0)
      - boxcox (if all values > 0 and scipy is available)
      - diff(1), diff(s), diff(1).diff(s)
    Choose the combination that minimises the ADF/KPSS conflict:
    we want ADF.pvalue < 0.05 and KPSS.pvalue > 0.05.
    Returns: transformed_series, meta dict (applied transformation, differencing, p-values).
    """
    candidates = []
    # original
    candidates.append(('none', y_train))
    # log
    if (y_train > 0).all():
        candidates.append(('log', np.log(y_train)))
    # boxcox
    if (y_train > 0).all() and SCIPY_AVAILABLE:
        try:
            lam = boxcox_normmax(y_train.dropna(), brack=(-2, 2))
            bc, _ = apply_boxcox(y_train, lmbda=lam)
            candidates.append((f'boxcox_{lam:.4f}', bc))
        except Exception:
            pass

    # differenced versions: diff(1) of the original or of each transformed series
    final_candidates = []
    for name, ser in candidates:
        ser_clean = ser.dropna()
        final_candidates.append((name, 0, ser_clean))  # 0 differences
        if len(ser_clean) > 3:
            final_candidates.append((name, 1, ser_clean.diff(1).dropna()))
        if seasonal_period and len(ser_clean) > seasonal_period + 3:
            final_candidates.append((name, seasonal_period,
                                     ser_clean.diff(seasonal_period).dropna()))
            final_candidates.append(
                (name, ('1+s', seasonal_period),
                 ser_clean.diff(1).diff(seasonal_period).dropna()))

    # Evaluate candidates
    scored = []
    for cand in final_candidates:
        tag = cand[0]
        d = cand[1]
        ser = cand[2]
        if ser.dropna().shape[0] < 10:
            continue
        try:
            tests = test_stationarity_pair(ser)
            # score: lower is better. We want ADF.p < 0.05 and KPSS.p > 0.05.
            adf_p = tests['adf']['pvalue'] if tests['adf']['pvalue'] is not None else 1.0
            kpss_p = tests['kpss']['pvalue'] if tests['kpss']['pvalue'] is not None else 0.0
            # penalty for a bad ADF (want it small) and a bad KPSS (want it big)
            score = adf_p + (1.0 - kpss_p)
            scored.append({'tag': tag, 'diff': d, 'score': score,
                           'adf_p': adf_p, 'kpss_p': kpss_p, 'series': ser})
        except Exception:
            continue

    if not scored:
        return y_train, {'method': 'none', 'lambda': None}
    scored = sorted(scored, key=lambda x: x['score'])
    best = scored[0]
    meta = {'method': best['tag'], 'diff': best['diff'],
            'adf_p': best['adf_p'], 'kpss_p': best['kpss_p']}
    return best['series'], meta


def apply_boxcox(series: pd.Series, lmbda: Optional[float] = None):
    if not SCIPY_AVAILABLE:
        raise ImportError("scipy required for boxcox")
    s = series.dropna()
    if (s <= 0).any():
        raise ValueError("Box-Cox requires positive values")
    if lmbda is None:
        lmbda = boxcox_normmax(s, brack=(-2, 2))
    transformed = boxcox(s, lmbda)
    return pd.Series(index=s.index, data=transformed), float(lmbda)


# -------------------------------------------------------------------------
# Feature engineering
# -------------------------------------------------------------------------
def make_lags(df: pd.DataFrame, col: str, lags: List[int]):
    for l in lags:
        df[f'{col}_lag_{l}'] = df[col].shift(l)
    return df


def make_rolls(df: pd.DataFrame, col: str, windows: List[int]):
    for w in windows:
        df[f'{col}_roll_mean_{w}'] = df[col].rolling(window=w, min_periods=1).mean()
        df[f'{col}_roll_std_{w}'] = df[col].rolling(window=w, min_periods=1).std()
        df[f'{col}_roll_min_{w}'] = df[col].rolling(window=w, min_periods=1).min()
        df[f'{col}_roll_max_{w}'] = df[col].rolling(window=w, min_periods=1).max()
    return df


def make_time_features(df: pd.DataFrame):
    idx = df.index
    df['dayofweek'] = idx.dayofweek
    df['month'] = idx.month
    df['is_weekend'] = idx.dayofweek >= 5
    df['sin_week'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
    df['cos_month'] = np.cos(2 * np.pi * (df['month'] - 1) / 12)
    return df


# -------------------------------------------------------------------------
# Splits, CV and strategies
# -------------------------------------------------------------------------
def chronological_split(df: pd.DataFrame, frac_train=0.7, frac_val=0.15):
    n = len(df)
    i_train = int(n * frac_train)
    i_val = i_train + int(n * frac_val)
    train = df.iloc[:i_train].copy()
    val = df.iloc[i_train:i_val].copy()
    test = df.iloc[i_val:].copy()
    # Check that the dates are evenly spaced
    all_data = pd.concat([train, val, test])
    date_diff = (all_data.index[1:] - all_data.index[:-1]).value_counts()
    if len(date_diff) > 1:
        print(f"Warning: mixed intervals between dates detected: {date_diff.index.tolist()}")
    return train, val, test


def expanding_window_cv(X: pd.DataFrame, y: pd.Series, model_fit_predict,
                        initial_train_size: int, h: int, n_splits: int = 5):
    """Expanding window: [0:t] -> [t+1:t+h]"""
    n = len(X)
    step = (n - initial_train_size - h) // n_splits if n_splits > 0 else h
    step = max(1, step)
    metrics = []
    for i in range(n_splits):
        end_train = initial_train_size + i * step
        train_X, train_y = X.iloc[:end_train], y.iloc[:end_train]
        test_X, test_y = X.iloc[end_train:end_train + h], y.iloc[end_train:end_train + h]
        y_pred = model_fit_predict(train_X, train_y, h)
        metrics.append({'fold': i,
                        'mae': mae(test_y.values, y_pred),
                        'rmse': rmse(test_y.values, y_pred)})
    return pd.DataFrame(metrics)
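
# Sketch of the callable contract expected by expanding_window_cv (and
# rolling_window_cv below): model_fit_predict(train_X, train_y, h) must return an
# array of h predictions. The naive forecaster below only illustrates the signature.
def _naive_fit_predict(train_X: pd.DataFrame, train_y: pd.Series, h: int) -> np.ndarray:
    # Repeat the last observed training value h times.
    return np.full(h, train_y.iloc[-1])

# Usage (illustrative):
#   cv_scores = expanding_window_cv(X, y, _naive_fit_predict,
#                                   initial_train_size=len(X) // 2, h=7, n_splits=5)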
[t+1:t+h]""" n = len(X) step = (n - window - h) // n_splits if n_splits > 0 else h metrics = [] for i in range(n_splits): start = i * step end = start + window train_X, train_y = X.iloc[start:end], y.iloc[start:end] test_X, test_y = X.iloc[end:end + h], y.iloc[end:end + h] y_pred = model_fit_predict(train_X, train_y, h) metrics.append({'fold': i, 'mae': mae(test_y.values, y_pred), 'rmse': rmse(test_y.values, y_pred)}) return pd.DataFrame(metrics) # Strategies: recursive, direct, hybrid def forecast_recursive_arima(fit_res, steps: int, last_date: pd.Timestamp = None, freq: str = 'D'): """Wrapper for SARIMAX results with proper date index""" if hasattr(fit_res, "get_forecast"): fc = fit_res.get_forecast(steps=steps) mean = np.asarray(fc.predicted_mean) try: conf = fc.conf_int() low = np.asarray(conf.iloc[:, 0]) high = np.asarray(conf.iloc[:, 1]) except Exception: low = np.full(len(mean), np.nan) high = np.full(len(mean), np.nan) # Создаем правильный индекс if last_date is not None: dates = create_forecast_index(last_date, steps, freq) mean = pd.Series(mean, index=dates) low = pd.Series(low, index=dates) high = pd.Series(high, index=dates) return mean, (low, high) else: mean = fit_res.forecast(steps=steps) if last_date is not None: dates = create_forecast_index(last_date, steps, freq) mean = pd.Series(mean, index=dates) return mean, (None, None) # Direct strategy for SARIMAX: fit separate models for each horizon def forecast_direct_arima(train_series: pd.Series, h: int, order=(1, 0, 0)): if not STATSMODELS_AVAILABLE: raise ImportError("statsmodels required") # create shifted target for forecasting h steps ahead df = train_series.to_frame("y") df['y_target_h'] = df['y'].shift(-h) df = df.dropna() # naive approach: use previous value as predictor (simple) last = train_series.iloc[-1] return np.full(h, last) # ------------------------------------------------------------------------- # Models training wrapper # ------------------------------------------------------------------------- def fit_sarimax_simple(series: pd.Series, order=(1, 0, 0), seasonal_order=(0, 0, 0, 0), **kwargs): if not STATSMODELS_AVAILABLE: raise ImportError("statsmodels required") m = SARIMAX(series.dropna(), order=order, seasonal_order=seasonal_order, enforce_stationarity=False, enforce_invertibility=False) res = m.fit(disp=False, **kwargs) return res def forecast_sarimax(fit_res, steps: int, alpha: float = 0.05) -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray]]: """ Делает прогноз из обученного SARIMAX-результата. Возвращает (mean, (lower, upper)) — numpy arrays длины steps. 
""" try: if hasattr(fit_res, "get_forecast"): fc = fit_res.get_forecast(steps=steps) mean = np.asarray(fc.predicted_mean) # Проверяем на NaN if np.any(np.isnan(mean)): # Fallback: используем простой forecast mean = fit_res.forecast(steps=steps) mean = np.asarray(mean) try: conf = fc.conf_int(alpha=alpha) lower = np.asarray(conf.iloc[:, 0]) upper = np.asarray(conf.iloc[:, 1]) # Проверяем доверительные интервалы на NaN if np.any(np.isnan(lower)) or np.any(np.isnan(upper)): lower = np.full(len(mean), np.nan) upper = np.full(len(mean), np.nan) except Exception: lower = np.full(len(mean), np.nan) upper = np.full(len(mean), np.nan) return mean, (lower, upper) else: # fallback на forecast mean = fit_res.forecast(steps=steps) mean = np.asarray(mean) lower = np.full(len(mean), np.nan) upper = np.full(len(mean), np.nan) return mean, (lower, upper) except Exception as e: # Если все методы не сработали, возвращаем массив NaN print(f"Warning: SARIMAX forecast failed: {e}") mean = np.full(steps, np.nan) lower = np.full(steps, np.nan) upper = np.full(steps, np.nan) return mean, (lower, upper) def fit_auto_arima(series: pd.Series, seasonal=False, m=1, **kwargs): if not PM_AVAILABLE: raise ImportError("pmdarima not installed") model = pm.auto_arima(series.dropna(), seasonal=seasonal, m=m, error_action='ignore', suppress_warnings=True, **kwargs) return model def fit_var(df: pd.DataFrame, maxlags=15): if not STATSMODELS_AVAILABLE: raise ImportError("statsmodels required") model = VAR(df.dropna()) sel = model.select_order(maxlags=maxlags) best = 1 try: so = sel.selected_orders for k in ('aic', 'bic', 'fpe', 'hqic'): if so.get(k) is not None: best = int(so[k]) break except Exception: best = 1 res = model.fit(maxlags=best) return res def fit_garch_on_residuals(residuals, p=1, q=1): if not ARCH_AVAILABLE: raise ImportError("arch not installed") am = arch_model(residuals, vol='Garch', p=p, q=q, dist='normal') r = am.fit(disp='off') return r # ------------------------------------------------------------------------- # Diagnostics and tests # ------------------------------------------------------------------------- def ljung_box_test(resid: np.ndarray, lags: List[int] = [10]): if not STATSMODELS_AVAILABLE: raise ImportError("statsmodels required") res = acorr_ljungbox(resid, lags=lags, return_df=True) return res def shapiro_test(resid: np.ndarray): if not SCIPY_AVAILABLE: raise ImportError("scipy required") stat, p = shapiro(resid) return {'stat': stat, 'pvalue': p} def simple_dm_test(e1: np.ndarray, e2: np.ndarray): """ Простая реализация Diebold-Mariano теста по разности квадратических ошибок. Возвращает t-stat и p-value (двухсторонний). Примечание: это упрощённая версия, без HAC коррекции. """ # use squared error loss d = (e1 - e2) n = len(d) dbar = np.mean(d) sd = np.var(d, ddof=1) denom = math.sqrt(sd / n) if sd > 0 else np.nan if denom == 0 or np.isnan(denom): return {'stat': np.nan, 'pvalue': np.nan} tstat = dbar / denom # two-sided pval from Student's t approx from scipy.stats import t as student_t pval = 2 * (1 - student_t.cdf(abs(tstat), df=n - 1)) return {'stat': float(tstat), 'pvalue': float(pval)} # ------------------------------------------------------------------------- # Report generation (HTML) # ------------------------------------------------------------------------- def generate_report_html(out_path: str, plots: List[plt.Figure], tables: Dict[str, pd.DataFrame], title="Lab3 Report"): import base64 from io import BytesIO html_parts = [f""" {title}

{title}

"""] # Таблицы for name, df in tables.items(): html_parts.append(f"

# -------------------------------------------------------------------------
# Report generation (HTML)
# -------------------------------------------------------------------------
def generate_report_html(out_path: str, plots: List[plt.Figure],
                         tables: Dict[str, pd.DataFrame], title="Lab3 Report"):
    import base64
    from io import BytesIO

    html_parts = [f"""<html>
<head><meta charset="utf-8"><title>{title}</title></head>
<body>
<h1>{title}</h1>
"""]

    # Tables
    for name, df in tables.items():
        html_parts.append(f"<h2>{name}</h2>")
        html_parts.append('<div>')
        html_parts.append(df.to_html(classes='table table-striped', border=0, index=True))
        html_parts.append('</div>')

    # Plots embedded as base64 images
    for i, fig in enumerate(plots):
        # Save the figure to an in-memory buffer
        buf = BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight', dpi=100)
        buf.seek(0)
        # Encode as base64
        img_data = base64.b64encode(buf.read()).decode('utf-8')
        html_parts.append(f'<h3>Figure {i + 1}</h3>')
        html_parts.append(f'<img src="data:image/png;base64,{img_data}" alt="Figure {i + 1}"/>')
        # Close the figure to free memory
        plt.close(fig)

    html_parts.append("</body></html>")
    with open(out_path, 'w', encoding='utf-8') as f:
        f.write("\n".join(html_parts))
    print("Report saved to", out_path)
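
# Illustrative usage of generate_report_html (file name and data are made up):
# build one small table and one figure, then write a standalone HTML report.
def _example_report():
    table = pd.DataFrame({'model': ['naive'], 'MAE': [1.23]})
    fig, ax = plt.subplots()
    ax.plot([1, 2, 3], [1, 4, 9])
    generate_report_html('example_report.html', [fig], {'demo_table': table},
                         title="Demo report")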


# -------------------------------------------------------------------------
# Main runner that orchestrates everything
# -------------------------------------------------------------------------
def evaluate_with_cv(models_dict, X, y, cv_method='expanding', n_splits=5):
    """Evaluate models with cross-validation."""
    cv_results = {}
    for name, model_func in models_dict.items():
        if cv_method == 'expanding':
            cv_scores = expanding_window_cv(X, y, model_func,
                                            initial_train_size=len(X) // 2,
                                            h=30, n_splits=n_splits)
        else:
            cv_scores = rolling_window_cv(X, y, model_func,
                                          window=len(X) // 2,
                                          h=30, n_splits=n_splits)
        cv_results[name] = cv_scores
    return cv_results


def run_pipeline(data_path: str, timestamp_col: str, target_col: str,
                 out_report: str = 'lab3_report.html', freq: str = 'D'):
    """
    Main entry point of the pipeline.
    """
    print("Loading", data_path)
    df = load_data(data_path, timestamp_col)
    if target_col not in df.columns:
        numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
        if not numeric_cols:
            raise ValueError("No numeric columns found")
        target_col = numeric_cols[0]
        print("Target not found, using", target_col)

    series = df[target_col].astype('float').copy()
    series = resample_and_interpolate(series.to_frame(), freq=freq).iloc[:, 0]
    print("Series length after resample:", len(series))

    # 3.1 Preprocessing & transformation selection
    transformed, meta = try_transformations_and_choose(series, seasonal_period=7)
    print("Chosen transform:", meta)

    # 3.2 Feature engineering
    df_all = transformed.to_frame(name=target_col)
    df_all = make_time_features(df_all)
    df_all = make_lags(df_all, target_col, [1, 2, 7, 30])
    df_all = make_rolls(df_all, target_col, [7, 30])
    # drop rows with NaN introduced by the lag features
    df_all = df_all.dropna()

    train, val, test = chronological_split(df_all, frac_train=0.7, frac_val=0.15)
    y_train = train[target_col]
    y_val = val[target_col]
    y_test = test[target_col]
    print("Sizes train/val/test:", len(y_train), len(y_val), len(y_test))

    # 3.3 Models: benchmarks + SARIMAX + optional auto_arima + VAR
    results = []  # each element: dict(model, h, pred (pd.Series), extra)
    horizons = [1, 7, 30]

    # Infer the frequency used for the forecast indexes
    try:
        inferred_freq = pd.infer_freq(y_train.index) or freq
    except Exception:
        inferred_freq = freq

    # Benchmarks
    for h in horizons:
        pred_values = np.full(h, y_train.iloc[-1])
        pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
        results.append({
            'model': 'naive',
            'h': h,
            'pred': pd.Series(pred_values, index=pred_dates)
        })
        if len(y_train) >= 7:
            seasonal_pred = seasonal_naive_forecast(y_train, season=7, steps=h)
            seasonal_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
            results.append({
                'model': 'seasonal_naive',
                'h': h,
                'pred': pd.Series(seasonal_pred, index=seasonal_dates)
            })

    # SES/Holt (simple forecasting for 1-step and iterated for multi-step)
    try:
        from statsmodels.tsa.holtwinters import SimpleExpSmoothing, ExponentialSmoothing
        # SES as a simple baseline
        ses = SimpleExpSmoothing(y_train.dropna()).fit(optimized=True)
        for h in horizons:
            pred = ses.forecast(h)
            pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
            results.append({
                'model': 'SES',
                'h': h,
                'pred': pd.Series(np.asarray(pred), index=pred_dates)
            })
    except Exception as e:
        print("SES skipped:", e)
    # SARIMAX baseline
    if STATSMODELS_AVAILABLE:
        try:
            # Check the data is suitable for SARIMAX
            if len(y_train.dropna()) > 10 and y_train.var() > 1e-6:  # enough points, non-degenerate variance
                sar = fit_sarimax_simple(y_train, order=(1, 1, 1))
                # Check that the model converged
                if hasattr(sar, 'mle_retvals') and sar.mle_retvals.get('converged', False):
                    for h in horizons:
                        mean, (lower, upper) = forecast_sarimax(sar, steps=h)
                        # Check the forecasts are not all NaN
                        if not np.all(np.isnan(mean)):
                            pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                            results.append({
                                'model': 'SARIMAX(1,1,1)',
                                'h': h,
                                'pred': pd.Series(mean, index=pred_dates)
                            })
                        else:
                            print(f"SARIMAX returned all NaN for horizon {h}")
                else:
                    print("SARIMAX model did not converge")
            else:
                print("Insufficient data for SARIMAX")
        except Exception as e:
            print("SARIMAX failed:", e)

    # pmdarima auto_arima
    if PM_AVAILABLE:
        try:
            auto = fit_auto_arima(y_train, seasonal=False)
            for h in horizons:
                p = auto.predict(n_periods=h)
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'auto_arima',
                    'h': h,
                    'pred': pd.Series(np.asarray(p), index=pred_dates)
                })
        except Exception as e:
            print("auto_arima failed:", e)

    # VAR if the data is multivariate
    if STATSMODELS_AVAILABLE and df.select_dtypes(include=[np.number]).shape[1] >= 2:
        try:
            num_df = df.select_dtypes(include=[np.number]).dropna()
            var_res = fit_var(num_df, maxlags=5)
            fut = var_res.forecast(var_res.endog[-var_res.k_ar:], steps=30)
            # fut is an array of shape (30, k);
            # wrap it as per-horizon predictions for the first variable
            for h in [30]:
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'VAR',
                    'h': h,
                    'pred': pd.Series(fut[:h, 0], index=pred_dates)
                })
        except Exception as e:
            print("VAR failed:", e)

    # TBATS model
    if TBATS_AVAILABLE:
        try:
            tbats_model = TBATS(seasonal_periods=[7, 30], use_arma_errors=True)
            tbats_fitted = tbats_model.fit(y_train)
            for h in horizons:
                tbats_pred = tbats_fitted.forecast(steps=h)
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'TBATS',
                    'h': h,
                    'pred': pd.Series(tbats_pred, index=pred_dates)
                })
        except Exception as e:
            print("TBATS failed:", e)

    # Prophet model
    if PROPHET_AVAILABLE:
        try:
            prophet_df = y_train.reset_index()
            prophet_df.columns = ['ds', 'y']
            prophet_model = Prophet()
            prophet_model.fit(prophet_df)
            future = prophet_model.make_future_dataframe(periods=max(horizons), freq=inferred_freq)
            forecast = prophet_model.predict(future)
            for h in horizons:
                # take the first h rows after the end of the training data
                prophet_pred = forecast['yhat'].iloc[len(prophet_df):len(prophet_df) + h].values
                pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
                results.append({
                    'model': 'Prophet',
                    'h': h,
                    'pred': pd.Series(prophet_pred, index=pred_dates)
                })
        except Exception as e:
            print("Prophet failed:", e)

    # GARCH on the SARIMAX residuals
    if ARCH_AVAILABLE and 'sar' in locals():
        try:
            garch_model = fit_garch_on_residuals(sar.resid, p=1, q=1)
            # A volatility forecast could be added to the analysis here
        except Exception as e:
            print("GARCH failed:", e)
    # 3.6 Diagnostics come later for the top models
    # 3.7 Evaluate on the test set
    eval_rows = []
    plots = []
    for rec in results:
        model_name = rec['model']
        h = rec['h']
        pred = rec['pred']

        # Align the forecasts with the test data by timestamp
        if hasattr(pred, 'index'):
            # Forecasts that carry a proper date index
            aligned_pred = pred
            # Compare against the first h points of the test set
            y_true_aligned = y_test.iloc[:min(h, len(y_test))]
        else:
            # Forecasts without an index (legacy format)
            pred_values = np.asarray(pred).ravel()
            aligned_pred = pd.Series(pred_values, index=y_test.index[:len(pred_values)])
            y_true_aligned = y_test.iloc[:len(pred_values)]

        if len(y_true_aligned) == 0:
            continue

        # Trim the forecast to the length of the test data
        aligned_pred = aligned_pred.iloc[:len(y_true_aligned)]

        # Compute metrics
        row = {
            'model': model_name,
            'h': h,
            'MAE': mae(y_true_aligned.values, aligned_pred.values),
            'RMSE': rmse(y_true_aligned.values, aligned_pred.values),
            'MAPE': mape(y_true_aligned.values, aligned_pred.values),
            'SMAPE': smape(y_true_aligned.values, aligned_pred.values)
        }
        # MASE: use the naive in-sample reference
        row['MASE'] = mase(y_true_aligned.values, aligned_pred.values, y_train.values)
        # R2 where possible
        try:
            row['R2'] = float(1 - np.sum((y_true_aligned.values - aligned_pred.values) ** 2) /
                              np.sum((y_true_aligned.values - np.mean(y_true_aligned.values)) ** 2))
        except Exception:
            row['R2'] = np.nan
        eval_rows.append(row)

        # Visualisation
        fig, ax = plt.subplots(figsize=(8, 3))
        # Show more history for context
        context_points = min(200, len(y_train))
        ax.plot(y_train.index[-context_points:], y_train.values[-context_points:],
                label='train', alpha=0.7)
        if len(val) > 0:
            ax.plot(val.index, val[target_col].values, label='val', alpha=0.7)
        ax.plot(y_test.index, y_test.values, label='test', alpha=0.7)
        # Forecast with its proper dates
        ax.plot(aligned_pred.index, aligned_pred.values,
                label=f'pred_{model_name}_h{h}', linewidth=2)
        ax.legend()
        plots.append(fig)

    eval_df = pd.DataFrame(eval_rows)

    # Diagnostics for the top-3 models by RMSE
    diag_tables = {}
    try:
        top3 = eval_df.sort_values('RMSE').head(3)['model'].tolist()
    except Exception:
        top3 = []
    for m in top3:
        # find the fitted residuals if the model was SARIMAX etc.
        if m.startswith('SARIMAX'):
            try:
                resid = sar.resid.dropna()
                lb = acorr_ljungbox(resid, lags=[10], return_df=True)
                diag_tables[f'ljungbox_{m}'] = lb
                if SCIPY_AVAILABLE:
                    sh = shapiro(resid)
                    diag_tables[f'shapiro_{m}'] = pd.DataFrame([{'stat': sh[0], 'pvalue': sh[1]}])
            except Exception:
                pass

    # Diebold-Mariano pairwise test for the top-2 models (if available)
    dm_table = None
    try:
        if len(eval_df) >= 2:
            sorted_models = eval_df.sort_values('RMSE')
            if len(sorted_models) >= 2:
                m1 = sorted_models.iloc[0]['model']
                m2 = sorted_models.iloc[1]['model']
                # pick their predictions at h=1 (if they exist)
                pred1 = None
                pred2 = None
                for rec in results:
                    if rec['model'] == m1 and rec['h'] == 1:
                        pred1 = rec['pred']
                    if rec['model'] == m2 and rec['h'] == 1:
                        pred2 = rec['pred']
                if pred1 is not None and pred2 is not None:
                    # align lengths with the test data
                    y_true = y_test.values[:min(len(pred1), len(y_test))]
                    e1 = (y_true - pred1.values[:len(y_true)]) ** 2
                    e2 = (y_true - pred2.values[:len(y_true)]) ** 2
                    dm = simple_dm_test(e1, e2)
                    dm_table = pd.DataFrame(
                        [{'model1': m1, 'model2': m2,
                          'dm_stat': dm['stat'], 'pvalue': dm['pvalue']}])
    except Exception:
        dm_table = None

    # Generate the report
    tables = {'evaluation': eval_df}
    if diag_tables:
        tables.update(diag_tables)
    if dm_table is not None:
        tables['dm_test'] = dm_table
    generate_report_html(out_report, plots, tables, title="Lab3 Full Report")
    print("Pipeline finished. Report:", out_report)
    # Ensure we have at least some predictions
    if not results:
        print("All models returned NaN. Falling back to a simple naive forecast.")
        for h in horizons:
            pred_values = np.full(h, y_train.iloc[-1] if len(y_train) > 0 else 0)
            pred_dates = create_forecast_index(y_train.index[-1], h, inferred_freq)
            results.append({
                'model': 'fallback_naive',
                'h': h,
                'pred': pd.Series(pred_values, index=pred_dates)
            })

    # Cross-validation comparison of the main models (expanding window)
    cv_results = evaluate_with_cv({
        'SARIMAX': lambda X, y, h: forecast_recursive(fit_sarimax_simple(y), y, h),
        'AutoARIMA': lambda X, y, h: forecast_recursive(fit_auto_arima(y), y, h)
    }, df_all.drop(columns=[target_col]), df_all[target_col])


# -------------------------
# helpers used in the pipeline but defined later
# -------------------------
def seasonal_naive_forecast(series: pd.Series, season: int, steps: int):
    last = series.iloc[-season:]
    reps = int(np.ceil(steps / season))
    arr = np.tile(last.values, reps)[:steps]
    return arr


def create_forecast_index(last_train_date: pd.Timestamp, steps: int,
                          freq: str = 'D') -> pd.DatetimeIndex:
    """Build a proper datetime index for the forecasts."""
    try:
        # If freq == 'auto', try to infer the frequency
        if freq == 'auto':
            freq = pd.infer_freq(pd.DatetimeIndex([last_train_date])) or 'D'
        # Start the index right after the last training date
        if isinstance(last_train_date, pd.Timestamp):
            start_date = last_train_date + pd.Timedelta(days=1)
        else:
            start_date = last_train_date + pd.DateOffset(days=1)
        return pd.date_range(start=start_date, periods=steps, freq=freq)
    except Exception as e:
        print(f"Warning: could not create a proper date index: {e}")
        # Fallback: integer index
        return pd.RangeIndex(start=0, stop=steps)


def forecast_recursive(model, series, steps, freq='D'):
    """Recursive forecasting strategy."""
    predictions = []
    current_series = series.copy()
    for _ in range(steps):
        # statsmodels results expose get_forecast/forecast; pmdarima models expose predict
        if hasattr(model, 'get_forecast'):
            pred = model.forecast(steps=1)
        else:
            pred = model.predict(n_periods=1)
        pred_value = float(np.asarray(pred).ravel()[0])
        predictions.append(pred_value)
        # Append the prediction so the next iteration can see it
        # (note: the fitted model itself is not re-estimated here)
        current_series = pd.concat(
            [current_series,
             pd.Series([pred_value], index=[current_series.index[-1] + pd.Timedelta(days=1)])])
    return np.array(predictions)


def forecast_direct(train_series, test_features, model_factory, steps):
    """Direct strategy: a separate model for each horizon."""
    predictions = []
    for h in range(1, steps + 1):
        # Build the target shifted by h steps
        y_h = train_series.shift(-h).dropna()
        X_h = train_series.iloc[:len(y_h)]
        # Fit a model for horizon h
        model = model_factory()
        model.fit(X_h.values.reshape(-1, 1), y_h.values)
        # Forecast for horizon h
        pred = model.predict(train_series.values[-1:].reshape(1, -1))
        predictions.append(pred[0])
    return np.array(predictions)
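
# Illustrative entry point (not required by the lab text). The CSV path and the
# column names below are placeholders -- adjust them to your dataset before running:
#     python src/lab3_pipeline.py
if __name__ == "__main__":
    run_pipeline(
        data_path="data/sample_series.csv",   # hypothetical path
        timestamp_col="timestamp",
        target_col="value",                   # hypothetical target column
        out_report="lab3_report.html",
        freq="D",
    )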