""" Функции для лабораторной работы №2: Прогнозирование временных рядов """ import numpy as np import pandas as pd from typing import List, Tuple, Dict, Optional from scipy import stats from scipy.stats import boxcox, boxcox_normmax from statsmodels.tsa.holtwinters import ExponentialSmoothing from statsmodels.stats.diagnostic import acorr_ljungbox from statsmodels.tsa.stattools import adfuller, kpss from sklearn.model_selection import TimeSeriesSplit from sklearn.metrics import mean_absolute_error, mean_squared_error import warnings warnings.filterwarnings('ignore') def calculate_mape(y_true: np.ndarray, y_pred: np.ndarray) -> float: """Вычисляет MAPE (Mean Absolute Percentage Error)""" y_true = np.array(y_true) y_pred = np.array(y_pred) mask = y_true != 0 if mask.sum() == 0: return np.nan return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100 def create_advanced_features(df: pd.DataFrame, target: str, timestamp_col: str = 'timestamp') -> pd.DataFrame: """ Расширенный feature engineering: - Временные признаки (день недели, месяц, квартал) - Циклические признаки через sin/cos - Лаги: lag_1, lag_7, lag_30 - Скользящие статистики: mean, std, min, max по окнам 7, 30, 90 """ df = df.copy() df = df.set_index(timestamp_col).sort_index() # Временные признаки df['day_of_week'] = df.index.dayofweek df['month'] = df.index.month df['quarter'] = df.index.quarter df['day_of_month'] = df.index.day df['week_of_year'] = df.index.isocalendar().week # Циклические признаки df['day_of_week_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7) df['day_of_week_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7) df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12) df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12) # Лаги for lag in [1, 7, 30]: df[f'{target}_lag_{lag}'] = df[target].shift(lag) # Скользящие статистики windows = [7, 30, 90] for w in windows: df[f'{target}_rolling_mean_{w}'] = df[target].rolling(window=w, min_periods=1).mean() df[f'{target}_rolling_std_{w}'] = df[target].rolling(window=w, min_periods=1).std() df[f'{target}_rolling_min_{w}'] = df[target].rolling(window=w, min_periods=1).min() df[f'{target}_rolling_max_{w}'] = df[target].rolling(window=w, min_periods=1).max() # Коэффициент вариации (волатильность) for w in [7, 30]: rolling_mean = df[f'{target}_rolling_mean_{w}'] rolling_std = df[f'{target}_rolling_std_{w}'] df[f'{target}_rolling_cv_{w}'] = rolling_std / (rolling_mean + 1e-8) return df.reset_index() def apply_boxcox_transform(series: pd.Series, lambda_param: Optional[float] = None) -> Tuple[pd.Series, float]: """ Применяет преобразование Бокса-Кокса. Если lambda_param не указан, подбирает оптимальный. """ series_positive = series[series > 0] if len(series_positive) == 0: raise ValueError("Все значения должны быть положительными для преобразования Бокса-Кокса") if lambda_param is None: # Автоматический подбор lambda lambda_param = boxcox_normmax(series_positive.values) transformed_values, fitted_lambda = boxcox(series_positive.values, lmbda=lambda_param) # Создаём новый Series с теми же индексами result = pd.Series(index=series.index, dtype=float) result.loc[series > 0] = transformed_values return result, fitted_lambda def inverse_boxcox_transform(transformed_series: pd.Series, lambda_param: float) -> pd.Series: """Обратное преобразование Бокса-Кокса""" if lambda_param == 0: return np.exp(transformed_series) else: return (lambda_param * transformed_series + 1) ** (1 / lambda_param) def inverse_transformations( forecast: np.ndarray, last_train_values_transformed: np.ndarray, transform_info: Dict ) -> np.ndarray: """ Применяет обратные преобразования к прогнозу. Порядок обратного преобразования должен быть обратным порядку прямого: Прямое: transformation -> diff_order -> seasonal_diff Обратное: seasonal_diff -> diff_order -> transformation forecast: прогноз в преобразованном пространстве (после всех преобразований) last_train_values_transformed: последние значения обучающей выборки в преобразованном пространстве (после всех преобразований) transform_info: информация о применённых преобразованиях (может содержать промежуточные значения) """ result = forecast.copy() diff_order = transform_info.get('diff_order', 0) seasonal_diff = transform_info.get('seasonal_diff') # Получаем промежуточные значения из transform_info, если они есть last_values_after_diff = transform_info.get('last_values_after_diff', None) last_values_after_transform = transform_info.get('last_values_after_transform', None) # 1. Обратное сезонное дифференцирование (если было) if seasonal_diff is not None and seasonal_diff > 0: # Нужны последние seasonal_diff значений после transformation и diff, но до seasonal_diff if last_values_after_diff is not None and len(last_values_after_diff) >= seasonal_diff: last_seasonal = last_values_after_diff[-seasonal_diff:] elif len(last_train_values_transformed) >= seasonal_diff: # Fallback: используем последние значения (хотя это не совсем правильно) last_seasonal = last_train_values_transformed[-seasonal_diff:] else: last_seasonal = last_train_values_transformed if len(last_train_values_transformed) > 0 else np.array([0]) for i in range(len(result)): if i < len(last_seasonal): result[i] = result[i] + last_seasonal[i] else: # Используем предыдущие прогнозы result[i] = result[i] + result[i - seasonal_diff] # 2. Обратное обычное дифференцирование (если было) for _ in range(diff_order): # Нужны последние diff_order значений после transformation, но до diff if last_values_after_transform is not None and len(last_values_after_transform) > 0: last_val = last_values_after_transform[-1] elif len(last_train_values_transformed) > 0: # Fallback last_val = last_train_values_transformed[-1] else: last_val = 0 for i in range(len(result)): if i == 0: result[i] = result[i] + last_val else: result[i] = result[i] + result[i - 1] # 3. Обратное преобразование для стабилизации дисперсии if transform_info.get('transformation') == 'log': result = np.exp(result) elif transform_info.get('transformation') == 'boxcox': lambda_param = transform_info.get('lambda') if lambda_param is not None: if lambda_param == 0: result = np.exp(result) else: result = (lambda_param * result + 1) ** (1 / lambda_param) return result def apply_transformations( series: pd.Series, transformation: str = 'none', lambda_param: Optional[float] = None, diff_order: int = 0, seasonal_diff: Optional[int] = None ) -> Tuple[pd.Series, Dict]: """ Применяет цепочку преобразований к ряду. transformation: 'none', 'log', 'boxcox' diff_order: порядок обычного дифференцирования seasonal_diff: период сезонного дифференцирования Возвращает преобразованный ряд и словарь с информацией о преобразованиях, включая промежуточные значения для обратного преобразования. """ result = series.copy() info = {'transformation': transformation, 'lambda': None, 'diff_order': diff_order, 'seasonal_diff': seasonal_diff} # Преобразование для стабилизации дисперсии if transformation == 'log': if (result <= 0).any(): raise ValueError("Для лог-трансформации все значения должны быть положительными") result = np.log(result) elif transformation == 'boxcox': result, lambda_param = apply_boxcox_transform(result, lambda_param) info['lambda'] = lambda_param # Сохраняем значения после transformation (для обратного diff) result_after_transform = result.copy() # Обычное дифференцирование for _ in range(diff_order): result = result.diff() # Сохраняем значения после diff (для обратного seasonal_diff) result_after_diff = result.copy() # Сезонное дифференцирование if seasonal_diff is not None and seasonal_diff > 0: result = result.diff(periods=seasonal_diff) # Сохраняем промежуточные значения для обратного преобразования info['last_values_after_transform'] = result_after_transform.values[-max(diff_order, 1):] if len(result_after_transform) > 0 else np.array([]) info['last_values_after_diff'] = result_after_diff.values[-max(seasonal_diff if seasonal_diff else 1, 1):] if len(result_after_diff) > 0 else np.array([]) return result.dropna(), info def recursive_forecast( model_func, train_data: pd.Series, horizon: int, alpha: Optional[float] = None, **model_kwargs ) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]: """ Рекурсивная стратегия прогнозирования: Одна модель → итеративное использование прогнозов Возвращает прогнозы и опционально доверительные интервалы (lower, upper) """ forecasts = [] conf_lower = [] if alpha is not None else None conf_upper = [] if alpha is not None else None current_data = train_data.copy() # Определяем тип индекса для правильного добавления новых значений is_datetime = pd.api.types.is_datetime64_any_dtype(current_data.index) for h in range(horizon): # Обучаем модель на текущих данных model = model_func(current_data, **model_kwargs) # Прогнозируем на 1 шаг вперёд if alpha is not None: try: forecast_result = model.forecast(steps=1, alpha=alpha) if isinstance(forecast_result, tuple): forecast_value = forecast_result[0][0] if len(forecast_result[0]) > 0 else forecast_result[0] if len(forecast_result) > 1: conf_lower.append(forecast_result[1][0] if len(forecast_result[1]) > 0 else forecast_result[1]) conf_upper.append(forecast_result[2][0] if len(forecast_result[2]) > 0 else forecast_result[2]) else: forecast_value = forecast_result[0] if hasattr(forecast_result, '__getitem__') else float(forecast_result) except: # Если доверительные интервалы не поддерживаются, используем обычный прогноз forecast = model.forecast(steps=1) forecast_value = forecast[0] if hasattr(forecast, '__getitem__') else float(forecast) else: forecast = model.forecast(steps=1) forecast_value = forecast[0] if hasattr(forecast, '__getitem__') else float(forecast) forecasts.append(forecast_value) # Добавляем прогноз к данным для следующей итерации if is_datetime: # Для DatetimeIndex используем частоту или инференс try: freq = pd.infer_freq(current_data.index) or 'D' # Используем pd.date_range для создания следующей даты last_date = current_data.index[-1] next_dates = pd.date_range(start=last_date, periods=2, freq=freq) if len(next_dates) >= 2: next_idx = next_dates[1] # Берём вторую дату (первая = last_date) else: # Fallback next_idx = len(current_data) is_datetime = False except: # Если не удалось определить частоту, используем числовой индекс try: # Пробуем простой способ через Timedelta next_idx = current_data.index[-1] + pd.Timedelta(days=1) except: next_idx = len(current_data) is_datetime = False else: # Для числового индекса просто увеличиваем на 1 next_idx = len(current_data) if is_datetime: current_data = pd.concat([current_data, pd.Series([forecast_value], index=[next_idx])]) else: # Используем числовой индекс current_data = pd.concat([current_data, pd.Series([forecast_value], index=[next_idx])]) result = np.array(forecasts) if alpha is not None and conf_lower and conf_upper: return result, (np.array(conf_lower), np.array(conf_upper)) return result, None def direct_forecast( model_func, train_data: pd.Series, horizon: int, alpha: Optional[float] = None, **model_kwargs ) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]: """ Прямая стратегия прогнозирования: Отдельная модель для каждого шага t+1, ..., t+h Возвращает прогнозы и опционально доверительные интервалы (lower, upper) """ forecasts = [] conf_lower = [] if alpha is not None else None conf_upper = [] if alpha is not None else None for h in range(1, horizon + 1): # Обучаем отдельную модель для шага h model = model_func(train_data, **model_kwargs) # Прогнозируем на h шагов вперёд и берём последний if alpha is not None: try: forecast_result = model.forecast(steps=h, alpha=alpha) if isinstance(forecast_result, tuple): forecast_value = forecast_result[0][-1] if len(forecast_result[0]) > 0 else forecast_result[0] if len(forecast_result) > 1: conf_lower.append(forecast_result[1][-1] if len(forecast_result[1]) > 0 else forecast_result[1]) conf_upper.append(forecast_result[2][-1] if len(forecast_result[2]) > 0 else forecast_result[2]) else: forecast_value = forecast_result[-1] except: forecast = model.forecast(steps=h) forecast_value = forecast[-1] else: forecast = model.forecast(steps=h) forecast_value = forecast[-1] forecasts.append(forecast_value) result = np.array(forecasts) if alpha is not None and conf_lower and conf_upper: return result, (np.array(conf_lower), np.array(conf_upper)) return result, None def hybrid_forecast( model_func, train_data: pd.Series, horizon: int, recursive_steps: int = None, alpha: Optional[float] = None, **model_kwargs ) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]: """ Гибридная стратегия: Рекурсивная для ближайших шагов, прямая — для дальних Возвращает прогнозы и опционально доверительные интервалы (lower, upper) """ if recursive_steps is None: recursive_steps = max(1, horizon // 2) forecasts = [] conf_lower = [] if alpha is not None else None conf_upper = [] if alpha is not None else None # Рекурсивная часть recursive_result = recursive_forecast(model_func, train_data, recursive_steps, alpha=alpha, **model_kwargs) if isinstance(recursive_result, tuple): recursive_forecasts, recursive_conf = recursive_result if recursive_conf is not None: conf_lower.extend(recursive_conf[0]) conf_upper.extend(recursive_conf[1]) else: recursive_forecasts = recursive_result forecasts.extend(recursive_forecasts) # Прямая часть для оставшихся шагов if horizon > recursive_steps: # Используем последние данные + рекурсивные прогнозы is_datetime = pd.api.types.is_datetime64_any_dtype(train_data.index) if is_datetime: try: freq = pd.infer_freq(train_data.index) or 'D' # Используем pd.date_range для создания дат начиная с последней даты + 1 период last_date = train_data.index[-1] extended_index = pd.date_range( start=last_date, periods=len(recursive_forecasts) + 1, freq=freq )[1:] # Берём все даты кроме первой (которая равна last_date) except: # Fallback на числовой индекс try: # Пробуем через date_range с periods last_date = train_data.index[-1] extended_index = pd.date_range( start=last_date, periods=len(recursive_forecasts) + 1, freq='D' )[1:] # Берём все даты кроме первой except: extended_index = range(len(train_data), len(train_data) + len(recursive_forecasts)) else: extended_index = range(len(train_data), len(train_data) + len(recursive_forecasts)) extended_data = pd.concat([ train_data, pd.Series(recursive_forecasts, index=extended_index) ]) remaining_horizon = horizon - recursive_steps direct_result = direct_forecast(model_func, extended_data, remaining_horizon, alpha=alpha, **model_kwargs) if isinstance(direct_result, tuple): direct_forecasts, direct_conf = direct_result if direct_conf is not None: conf_lower.extend(direct_conf[0]) conf_upper.extend(direct_conf[1]) else: direct_forecasts = direct_result forecasts.extend(direct_forecasts) result = np.array(forecasts[:horizon]) if alpha is not None and conf_lower and conf_upper: return result, (np.array(conf_lower[:horizon]), np.array(conf_upper[:horizon])) return result, None def create_exponential_smoothing_model( train_data: pd.Series, trend: Optional[str] = None, seasonal: Optional[str] = None, seasonal_periods: Optional[int] = None, optimized: bool = True ) -> ExponentialSmoothing: """Создаёт и обучает модель экспоненциального сглаживания""" try: model = ExponentialSmoothing( train_data, trend=trend, seasonal=seasonal, seasonal_periods=seasonal_periods, initialization_method='estimated' if optimized else 'simple' ) fitted_model = model.fit(optimized=optimized) return fitted_model except Exception as e: raise ValueError(f"Ошибка при создании модели: {e}") def evaluate_forecast(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]: """Вычисляет метрики качества прогноза""" y_true = np.array(y_true) y_pred = np.array(y_pred) mae = mean_absolute_error(y_true, y_pred) rmse = np.sqrt(mean_squared_error(y_true, y_pred)) mape = calculate_mape(y_true, y_pred) return { 'MAE': mae, 'RMSE': rmse, 'MAPE': mape } def naive_forecast(train_data: pd.Series, horizon: int) -> np.ndarray: """Наивный прогноз: y[t+h] = y[t]""" last_value = train_data.iloc[-1] return np.full(horizon, last_value) def time_series_cv_sliding_window( model_func, data: pd.Series, train_size: int, test_size: int, horizon: int, step: int = 1, **model_kwargs ) -> List[Dict]: """ Кросс-валидация со скользящим окном (фиксированная длина обучения) """ results = [] n = len(data) for i in range(0, n - train_size - test_size + 1, step): train_end = i + train_size test_end = min(train_end + test_size, n) train_data = data.iloc[i:train_end] test_data = data.iloc[train_end:test_end] try: model = model_func(train_data, **model_kwargs) forecast = model.forecast(steps=min(horizon, len(test_data))) metrics = evaluate_forecast(test_data.values[:len(forecast)], forecast) metrics['fold'] = len(results) + 1 metrics['train_start'] = train_data.index[0] metrics['train_end'] = train_data.index[-1] metrics['test_start'] = test_data.index[0] metrics['test_end'] = test_data.index[-1] results.append(metrics) except Exception as e: print(f"Ошибка в фолде {len(results) + 1}: {e}") return results def time_series_cv_expanding_window( model_func, data: pd.Series, initial_train_size: int, test_size: int, horizon: int, step: int = 1, **model_kwargs ) -> List[Dict]: """ Кросс-валидация с расширяющимся окном (обучение растёт со временем) """ results = [] n = len(data) for i in range(initial_train_size, n - test_size + 1, step): train_end = i test_end = min(train_end + test_size, n) train_data = data.iloc[:train_end] test_data = data.iloc[train_end:test_end] try: model = model_func(train_data, **model_kwargs) forecast = model.forecast(steps=min(horizon, len(test_data))) metrics = evaluate_forecast(test_data.values[:len(forecast)], forecast) metrics['fold'] = len(results) + 1 metrics['train_start'] = train_data.index[0] metrics['train_end'] = train_data.index[-1] metrics['test_start'] = test_data.index[0] metrics['test_end'] = test_data.index[-1] results.append(metrics) except Exception as e: print(f"Ошибка в фолде {len(results) + 1}: {e}") return results def diagnose_model_residuals(residuals: np.ndarray, lags: int = 10) -> Dict: """ Диагностика остатков модели: - Тест Льюнга-Бокса на автокорреляцию - Проверка нормальности (Shapiro-Wilk) - Q-Q plot данные """ residuals_clean = residuals[~np.isnan(residuals)] if len(residuals_clean) < 3: return {'error': 'Недостаточно данных для диагностики'} results = {} # Тест Льюнга-Бокса try: lb_stat, lb_pvalue = acorr_ljungbox(residuals_clean, lags=min(lags, len(residuals_clean) - 1), return_df=False) results['ljung_box'] = { 'statistic': float(lb_stat[-1]) if len(lb_stat) > 0 else None, 'pvalue': float(lb_pvalue[-1]) if len(lb_pvalue) > 0 else None, 'lags': lags } except Exception as e: results['ljung_box'] = {'error': str(e)} # Тест Шапиро-Уилка на нормальность try: if len(residuals_clean) <= 5000: # Ограничение для Shapiro-Wilk shapiro_stat, shapiro_pvalue = stats.shapiro(residuals_clean) results['shapiro_wilk'] = { 'statistic': float(shapiro_stat), 'pvalue': float(shapiro_pvalue) } else: # Для больших выборок используем тест нормальности из scipy k2_stat, k2_pvalue = stats.normaltest(residuals_clean) results['normality_test'] = { 'statistic': float(k2_stat), 'pvalue': float(k2_pvalue), 'test': 'normaltest' } except Exception as e: results['normality_test'] = {'error': str(e)} # Статистики остатков results['residual_stats'] = { 'mean': float(np.mean(residuals_clean)), 'std': float(np.std(residuals_clean)), 'min': float(np.min(residuals_clean)), 'max': float(np.max(residuals_clean)), 'count': len(residuals_clean) } # Проверка стационарности остатков try: adf_stat, adf_pvalue, _, _, _, _ = adfuller(residuals_clean) kpss_stat, kpss_pvalue, _, _ = kpss(residuals_clean) results['stationarity'] = { 'adf': {'statistic': float(adf_stat), 'pvalue': float(adf_pvalue)}, 'kpss': {'statistic': float(kpss_stat), 'pvalue': float(kpss_pvalue)} } except Exception as e: results['stationarity'] = {'error': str(e)} return results