Spaces:
No application file
No application file
| """ | |
| Функции для лабораторной работы №2: Прогнозирование временных рядов | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from typing import List, Tuple, Dict, Optional | |
| from scipy import stats | |
| from scipy.stats import boxcox, boxcox_normmax | |
| from statsmodels.tsa.holtwinters import ExponentialSmoothing | |
| from statsmodels.stats.diagnostic import acorr_ljungbox | |
| from statsmodels.tsa.stattools import adfuller, kpss | |
| from sklearn.model_selection import TimeSeriesSplit | |
| from sklearn.metrics import mean_absolute_error, mean_squared_error | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| def calculate_mape(y_true: np.ndarray, y_pred: np.ndarray) -> float: | |
| """Вычисляет MAPE (Mean Absolute Percentage Error)""" | |
| y_true = np.array(y_true) | |
| y_pred = np.array(y_pred) | |
| mask = y_true != 0 | |
| if mask.sum() == 0: | |
| return np.nan | |
| return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100 | |
| def create_advanced_features(df: pd.DataFrame, target: str, timestamp_col: str = 'timestamp') -> pd.DataFrame: | |
| """ | |
| Расширенный feature engineering: | |
| - Временные признаки (день недели, месяц, квартал) | |
| - Циклические признаки через sin/cos | |
| - Лаги: lag_1, lag_7, lag_30 | |
| - Скользящие статистики: mean, std, min, max по окнам 7, 30, 90 | |
| """ | |
| df = df.copy() | |
| df = df.set_index(timestamp_col).sort_index() | |
| # Временные признаки | |
| df['day_of_week'] = df.index.dayofweek | |
| df['month'] = df.index.month | |
| df['quarter'] = df.index.quarter | |
| df['day_of_month'] = df.index.day | |
| df['week_of_year'] = df.index.isocalendar().week | |
| # Циклические признаки | |
| df['day_of_week_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7) | |
| df['day_of_week_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7) | |
| df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12) | |
| df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12) | |
| # Лаги | |
| for lag in [1, 7, 30]: | |
| df[f'{target}_lag_{lag}'] = df[target].shift(lag) | |
| # Скользящие статистики | |
| windows = [7, 30, 90] | |
| for w in windows: | |
| df[f'{target}_rolling_mean_{w}'] = df[target].rolling(window=w, min_periods=1).mean() | |
| df[f'{target}_rolling_std_{w}'] = df[target].rolling(window=w, min_periods=1).std() | |
| df[f'{target}_rolling_min_{w}'] = df[target].rolling(window=w, min_periods=1).min() | |
| df[f'{target}_rolling_max_{w}'] = df[target].rolling(window=w, min_periods=1).max() | |
| # Коэффициент вариации (волатильность) | |
| for w in [7, 30]: | |
| rolling_mean = df[f'{target}_rolling_mean_{w}'] | |
| rolling_std = df[f'{target}_rolling_std_{w}'] | |
| df[f'{target}_rolling_cv_{w}'] = rolling_std / (rolling_mean + 1e-8) | |
| return df.reset_index() | |
| def apply_boxcox_transform(series: pd.Series, lambda_param: Optional[float] = None) -> Tuple[pd.Series, float]: | |
| """ | |
| Применяет преобразование Бокса-Кокса. | |
| Если lambda_param не указан, подбирает оптимальный. | |
| """ | |
| series_positive = series[series > 0] | |
| if len(series_positive) == 0: | |
| raise ValueError("Все значения должны быть положительными для преобразования Бокса-Кокса") | |
| if lambda_param is None: | |
| # Автоматический подбор lambda | |
| lambda_param = boxcox_normmax(series_positive.values) | |
| transformed_values, fitted_lambda = boxcox(series_positive.values, lmbda=lambda_param) | |
| # Создаём новый Series с теми же индексами | |
| result = pd.Series(index=series.index, dtype=float) | |
| result.loc[series > 0] = transformed_values | |
| return result, fitted_lambda | |
| def inverse_boxcox_transform(transformed_series: pd.Series, lambda_param: float) -> pd.Series: | |
| """Обратное преобразование Бокса-Кокса""" | |
| if lambda_param == 0: | |
| return np.exp(transformed_series) | |
| else: | |
| return (lambda_param * transformed_series + 1) ** (1 / lambda_param) | |
| def inverse_transformations( | |
| forecast: np.ndarray, | |
| last_train_values_transformed: np.ndarray, | |
| transform_info: Dict | |
| ) -> np.ndarray: | |
| """ | |
| Применяет обратные преобразования к прогнозу. | |
| Порядок обратного преобразования должен быть обратным порядку прямого: | |
| Прямое: transformation -> diff_order -> seasonal_diff | |
| Обратное: seasonal_diff -> diff_order -> transformation | |
| forecast: прогноз в преобразованном пространстве (после всех преобразований) | |
| last_train_values_transformed: последние значения обучающей выборки в преобразованном пространстве (после всех преобразований) | |
| transform_info: информация о применённых преобразованиях (может содержать промежуточные значения) | |
| """ | |
| result = forecast.copy() | |
| diff_order = transform_info.get('diff_order', 0) | |
| seasonal_diff = transform_info.get('seasonal_diff') | |
| # Получаем промежуточные значения из transform_info, если они есть | |
| last_values_after_diff = transform_info.get('last_values_after_diff', None) | |
| last_values_after_transform = transform_info.get('last_values_after_transform', None) | |
| # 1. Обратное сезонное дифференцирование (если было) | |
| if seasonal_diff is not None and seasonal_diff > 0: | |
| # Нужны последние seasonal_diff значений после transformation и diff, но до seasonal_diff | |
| if last_values_after_diff is not None and len(last_values_after_diff) >= seasonal_diff: | |
| last_seasonal = last_values_after_diff[-seasonal_diff:] | |
| elif len(last_train_values_transformed) >= seasonal_diff: | |
| # Fallback: используем последние значения (хотя это не совсем правильно) | |
| last_seasonal = last_train_values_transformed[-seasonal_diff:] | |
| else: | |
| last_seasonal = last_train_values_transformed if len(last_train_values_transformed) > 0 else np.array([0]) | |
| for i in range(len(result)): | |
| if i < len(last_seasonal): | |
| result[i] = result[i] + last_seasonal[i] | |
| else: | |
| # Используем предыдущие прогнозы | |
| result[i] = result[i] + result[i - seasonal_diff] | |
| # 2. Обратное обычное дифференцирование (если было) | |
| for _ in range(diff_order): | |
| # Нужны последние diff_order значений после transformation, но до diff | |
| if last_values_after_transform is not None and len(last_values_after_transform) > 0: | |
| last_val = last_values_after_transform[-1] | |
| elif len(last_train_values_transformed) > 0: | |
| # Fallback | |
| last_val = last_train_values_transformed[-1] | |
| else: | |
| last_val = 0 | |
| for i in range(len(result)): | |
| if i == 0: | |
| result[i] = result[i] + last_val | |
| else: | |
| result[i] = result[i] + result[i - 1] | |
| # 3. Обратное преобразование для стабилизации дисперсии | |
| if transform_info.get('transformation') == 'log': | |
| result = np.exp(result) | |
| elif transform_info.get('transformation') == 'boxcox': | |
| lambda_param = transform_info.get('lambda') | |
| if lambda_param is not None: | |
| if lambda_param == 0: | |
| result = np.exp(result) | |
| else: | |
| result = (lambda_param * result + 1) ** (1 / lambda_param) | |
| return result | |
| def apply_transformations( | |
| series: pd.Series, | |
| transformation: str = 'none', | |
| lambda_param: Optional[float] = None, | |
| diff_order: int = 0, | |
| seasonal_diff: Optional[int] = None | |
| ) -> Tuple[pd.Series, Dict]: | |
| """ | |
| Применяет цепочку преобразований к ряду. | |
| transformation: 'none', 'log', 'boxcox' | |
| diff_order: порядок обычного дифференцирования | |
| seasonal_diff: период сезонного дифференцирования | |
| Возвращает преобразованный ряд и словарь с информацией о преобразованиях, | |
| включая промежуточные значения для обратного преобразования. | |
| """ | |
| result = series.copy() | |
| info = {'transformation': transformation, 'lambda': None, 'diff_order': diff_order, 'seasonal_diff': seasonal_diff} | |
| # Преобразование для стабилизации дисперсии | |
| if transformation == 'log': | |
| if (result <= 0).any(): | |
| raise ValueError("Для лог-трансформации все значения должны быть положительными") | |
| result = np.log(result) | |
| elif transformation == 'boxcox': | |
| result, lambda_param = apply_boxcox_transform(result, lambda_param) | |
| info['lambda'] = lambda_param | |
| # Сохраняем значения после transformation (для обратного diff) | |
| result_after_transform = result.copy() | |
| # Обычное дифференцирование | |
| for _ in range(diff_order): | |
| result = result.diff() | |
| # Сохраняем значения после diff (для обратного seasonal_diff) | |
| result_after_diff = result.copy() | |
| # Сезонное дифференцирование | |
| if seasonal_diff is not None and seasonal_diff > 0: | |
| result = result.diff(periods=seasonal_diff) | |
| # Сохраняем промежуточные значения для обратного преобразования | |
| info['last_values_after_transform'] = result_after_transform.values[-max(diff_order, 1):] if len(result_after_transform) > 0 else np.array([]) | |
| info['last_values_after_diff'] = result_after_diff.values[-max(seasonal_diff if seasonal_diff else 1, 1):] if len(result_after_diff) > 0 else np.array([]) | |
| return result.dropna(), info | |
| def recursive_forecast( | |
| model_func, | |
| train_data: pd.Series, | |
| horizon: int, | |
| alpha: Optional[float] = None, | |
| **model_kwargs | |
| ) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]: | |
| """ | |
| Рекурсивная стратегия прогнозирования: | |
| Одна модель → итеративное использование прогнозов | |
| Возвращает прогнозы и опционально доверительные интервалы (lower, upper) | |
| """ | |
| forecasts = [] | |
| conf_lower = [] if alpha is not None else None | |
| conf_upper = [] if alpha is not None else None | |
| current_data = train_data.copy() | |
| # Определяем тип индекса для правильного добавления новых значений | |
| is_datetime = pd.api.types.is_datetime64_any_dtype(current_data.index) | |
| for h in range(horizon): | |
| # Обучаем модель на текущих данных | |
| model = model_func(current_data, **model_kwargs) | |
| # Прогнозируем на 1 шаг вперёд | |
| if alpha is not None: | |
| try: | |
| forecast_result = model.forecast(steps=1, alpha=alpha) | |
| if isinstance(forecast_result, tuple): | |
| forecast_value = forecast_result[0][0] if len(forecast_result[0]) > 0 else forecast_result[0] | |
| if len(forecast_result) > 1: | |
| conf_lower.append(forecast_result[1][0] if len(forecast_result[1]) > 0 else forecast_result[1]) | |
| conf_upper.append(forecast_result[2][0] if len(forecast_result[2]) > 0 else forecast_result[2]) | |
| else: | |
| forecast_value = forecast_result[0] if hasattr(forecast_result, '__getitem__') else float(forecast_result) | |
| except: | |
| # Если доверительные интервалы не поддерживаются, используем обычный прогноз | |
| forecast = model.forecast(steps=1) | |
| forecast_value = forecast[0] if hasattr(forecast, '__getitem__') else float(forecast) | |
| else: | |
| forecast = model.forecast(steps=1) | |
| forecast_value = forecast[0] if hasattr(forecast, '__getitem__') else float(forecast) | |
| forecasts.append(forecast_value) | |
| # Добавляем прогноз к данным для следующей итерации | |
| if is_datetime: | |
| # Для DatetimeIndex используем частоту или инференс | |
| try: | |
| freq = pd.infer_freq(current_data.index) or 'D' | |
| # Используем pd.date_range для создания следующей даты | |
| last_date = current_data.index[-1] | |
| next_dates = pd.date_range(start=last_date, periods=2, freq=freq) | |
| if len(next_dates) >= 2: | |
| next_idx = next_dates[1] # Берём вторую дату (первая = last_date) | |
| else: | |
| # Fallback | |
| next_idx = len(current_data) | |
| is_datetime = False | |
| except: | |
| # Если не удалось определить частоту, используем числовой индекс | |
| try: | |
| # Пробуем простой способ через Timedelta | |
| next_idx = current_data.index[-1] + pd.Timedelta(days=1) | |
| except: | |
| next_idx = len(current_data) | |
| is_datetime = False | |
| else: | |
| # Для числового индекса просто увеличиваем на 1 | |
| next_idx = len(current_data) | |
| if is_datetime: | |
| current_data = pd.concat([current_data, pd.Series([forecast_value], index=[next_idx])]) | |
| else: | |
| # Используем числовой индекс | |
| current_data = pd.concat([current_data, pd.Series([forecast_value], index=[next_idx])]) | |
| result = np.array(forecasts) | |
| if alpha is not None and conf_lower and conf_upper: | |
| return result, (np.array(conf_lower), np.array(conf_upper)) | |
| return result, None | |
| def direct_forecast( | |
| model_func, | |
| train_data: pd.Series, | |
| horizon: int, | |
| alpha: Optional[float] = None, | |
| **model_kwargs | |
| ) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]: | |
| """ | |
| Прямая стратегия прогнозирования: | |
| Отдельная модель для каждого шага t+1, ..., t+h | |
| Возвращает прогнозы и опционально доверительные интервалы (lower, upper) | |
| """ | |
| forecasts = [] | |
| conf_lower = [] if alpha is not None else None | |
| conf_upper = [] if alpha is not None else None | |
| for h in range(1, horizon + 1): | |
| # Обучаем отдельную модель для шага h | |
| model = model_func(train_data, **model_kwargs) | |
| # Прогнозируем на h шагов вперёд и берём последний | |
| if alpha is not None: | |
| try: | |
| forecast_result = model.forecast(steps=h, alpha=alpha) | |
| if isinstance(forecast_result, tuple): | |
| forecast_value = forecast_result[0][-1] if len(forecast_result[0]) > 0 else forecast_result[0] | |
| if len(forecast_result) > 1: | |
| conf_lower.append(forecast_result[1][-1] if len(forecast_result[1]) > 0 else forecast_result[1]) | |
| conf_upper.append(forecast_result[2][-1] if len(forecast_result[2]) > 0 else forecast_result[2]) | |
| else: | |
| forecast_value = forecast_result[-1] | |
| except: | |
| forecast = model.forecast(steps=h) | |
| forecast_value = forecast[-1] | |
| else: | |
| forecast = model.forecast(steps=h) | |
| forecast_value = forecast[-1] | |
| forecasts.append(forecast_value) | |
| result = np.array(forecasts) | |
| if alpha is not None and conf_lower and conf_upper: | |
| return result, (np.array(conf_lower), np.array(conf_upper)) | |
| return result, None | |
| def hybrid_forecast( | |
| model_func, | |
| train_data: pd.Series, | |
| horizon: int, | |
| recursive_steps: int = None, | |
| alpha: Optional[float] = None, | |
| **model_kwargs | |
| ) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]: | |
| """ | |
| Гибридная стратегия: | |
| Рекурсивная для ближайших шагов, прямая — для дальних | |
| Возвращает прогнозы и опционально доверительные интервалы (lower, upper) | |
| """ | |
| if recursive_steps is None: | |
| recursive_steps = max(1, horizon // 2) | |
| forecasts = [] | |
| conf_lower = [] if alpha is not None else None | |
| conf_upper = [] if alpha is not None else None | |
| # Рекурсивная часть | |
| recursive_result = recursive_forecast(model_func, train_data, recursive_steps, alpha=alpha, **model_kwargs) | |
| if isinstance(recursive_result, tuple): | |
| recursive_forecasts, recursive_conf = recursive_result | |
| if recursive_conf is not None: | |
| conf_lower.extend(recursive_conf[0]) | |
| conf_upper.extend(recursive_conf[1]) | |
| else: | |
| recursive_forecasts = recursive_result | |
| forecasts.extend(recursive_forecasts) | |
| # Прямая часть для оставшихся шагов | |
| if horizon > recursive_steps: | |
| # Используем последние данные + рекурсивные прогнозы | |
| is_datetime = pd.api.types.is_datetime64_any_dtype(train_data.index) | |
| if is_datetime: | |
| try: | |
| freq = pd.infer_freq(train_data.index) or 'D' | |
| # Используем pd.date_range для создания дат начиная с последней даты + 1 период | |
| last_date = train_data.index[-1] | |
| extended_index = pd.date_range( | |
| start=last_date, | |
| periods=len(recursive_forecasts) + 1, | |
| freq=freq | |
| )[1:] # Берём все даты кроме первой (которая равна last_date) | |
| except: | |
| # Fallback на числовой индекс | |
| try: | |
| # Пробуем через date_range с periods | |
| last_date = train_data.index[-1] | |
| extended_index = pd.date_range( | |
| start=last_date, | |
| periods=len(recursive_forecasts) + 1, | |
| freq='D' | |
| )[1:] # Берём все даты кроме первой | |
| except: | |
| extended_index = range(len(train_data), len(train_data) + len(recursive_forecasts)) | |
| else: | |
| extended_index = range(len(train_data), len(train_data) + len(recursive_forecasts)) | |
| extended_data = pd.concat([ | |
| train_data, | |
| pd.Series(recursive_forecasts, index=extended_index) | |
| ]) | |
| remaining_horizon = horizon - recursive_steps | |
| direct_result = direct_forecast(model_func, extended_data, remaining_horizon, alpha=alpha, **model_kwargs) | |
| if isinstance(direct_result, tuple): | |
| direct_forecasts, direct_conf = direct_result | |
| if direct_conf is not None: | |
| conf_lower.extend(direct_conf[0]) | |
| conf_upper.extend(direct_conf[1]) | |
| else: | |
| direct_forecasts = direct_result | |
| forecasts.extend(direct_forecasts) | |
| result = np.array(forecasts[:horizon]) | |
| if alpha is not None and conf_lower and conf_upper: | |
| return result, (np.array(conf_lower[:horizon]), np.array(conf_upper[:horizon])) | |
| return result, None | |
| def create_exponential_smoothing_model( | |
| train_data: pd.Series, | |
| trend: Optional[str] = None, | |
| seasonal: Optional[str] = None, | |
| seasonal_periods: Optional[int] = None, | |
| optimized: bool = True | |
| ) -> ExponentialSmoothing: | |
| """Создаёт и обучает модель экспоненциального сглаживания""" | |
| try: | |
| model = ExponentialSmoothing( | |
| train_data, | |
| trend=trend, | |
| seasonal=seasonal, | |
| seasonal_periods=seasonal_periods, | |
| initialization_method='estimated' if optimized else 'simple' | |
| ) | |
| fitted_model = model.fit(optimized=optimized) | |
| return fitted_model | |
| except Exception as e: | |
| raise ValueError(f"Ошибка при создании модели: {e}") | |
| def evaluate_forecast(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]: | |
| """Вычисляет метрики качества прогноза""" | |
| y_true = np.array(y_true) | |
| y_pred = np.array(y_pred) | |
| mae = mean_absolute_error(y_true, y_pred) | |
| rmse = np.sqrt(mean_squared_error(y_true, y_pred)) | |
| mape = calculate_mape(y_true, y_pred) | |
| return { | |
| 'MAE': mae, | |
| 'RMSE': rmse, | |
| 'MAPE': mape | |
| } | |
| def naive_forecast(train_data: pd.Series, horizon: int) -> np.ndarray: | |
| """Наивный прогноз: y[t+h] = y[t]""" | |
| last_value = train_data.iloc[-1] | |
| return np.full(horizon, last_value) | |
| def time_series_cv_sliding_window( | |
| model_func, | |
| data: pd.Series, | |
| train_size: int, | |
| test_size: int, | |
| horizon: int, | |
| step: int = 1, | |
| **model_kwargs | |
| ) -> List[Dict]: | |
| """ | |
| Кросс-валидация со скользящим окном (фиксированная длина обучения) | |
| """ | |
| results = [] | |
| n = len(data) | |
| for i in range(0, n - train_size - test_size + 1, step): | |
| train_end = i + train_size | |
| test_end = min(train_end + test_size, n) | |
| train_data = data.iloc[i:train_end] | |
| test_data = data.iloc[train_end:test_end] | |
| try: | |
| model = model_func(train_data, **model_kwargs) | |
| forecast = model.forecast(steps=min(horizon, len(test_data))) | |
| metrics = evaluate_forecast(test_data.values[:len(forecast)], forecast) | |
| metrics['fold'] = len(results) + 1 | |
| metrics['train_start'] = train_data.index[0] | |
| metrics['train_end'] = train_data.index[-1] | |
| metrics['test_start'] = test_data.index[0] | |
| metrics['test_end'] = test_data.index[-1] | |
| results.append(metrics) | |
| except Exception as e: | |
| print(f"Ошибка в фолде {len(results) + 1}: {e}") | |
| return results | |
| def time_series_cv_expanding_window( | |
| model_func, | |
| data: pd.Series, | |
| initial_train_size: int, | |
| test_size: int, | |
| horizon: int, | |
| step: int = 1, | |
| **model_kwargs | |
| ) -> List[Dict]: | |
| """ | |
| Кросс-валидация с расширяющимся окном (обучение растёт со временем) | |
| """ | |
| results = [] | |
| n = len(data) | |
| for i in range(initial_train_size, n - test_size + 1, step): | |
| train_end = i | |
| test_end = min(train_end + test_size, n) | |
| train_data = data.iloc[:train_end] | |
| test_data = data.iloc[train_end:test_end] | |
| try: | |
| model = model_func(train_data, **model_kwargs) | |
| forecast = model.forecast(steps=min(horizon, len(test_data))) | |
| metrics = evaluate_forecast(test_data.values[:len(forecast)], forecast) | |
| metrics['fold'] = len(results) + 1 | |
| metrics['train_start'] = train_data.index[0] | |
| metrics['train_end'] = train_data.index[-1] | |
| metrics['test_start'] = test_data.index[0] | |
| metrics['test_end'] = test_data.index[-1] | |
| results.append(metrics) | |
| except Exception as e: | |
| print(f"Ошибка в фолде {len(results) + 1}: {e}") | |
| return results | |
| def diagnose_model_residuals(residuals: np.ndarray, lags: int = 10) -> Dict: | |
| """ | |
| Диагностика остатков модели: | |
| - Тест Льюнга-Бокса на автокорреляцию | |
| - Проверка нормальности (Shapiro-Wilk) | |
| - Q-Q plot данные | |
| """ | |
| residuals_clean = residuals[~np.isnan(residuals)] | |
| if len(residuals_clean) < 3: | |
| return {'error': 'Недостаточно данных для диагностики'} | |
| results = {} | |
| # Тест Льюнга-Бокса | |
| try: | |
| lb_stat, lb_pvalue = acorr_ljungbox(residuals_clean, lags=min(lags, len(residuals_clean) - 1), return_df=False) | |
| results['ljung_box'] = { | |
| 'statistic': float(lb_stat[-1]) if len(lb_stat) > 0 else None, | |
| 'pvalue': float(lb_pvalue[-1]) if len(lb_pvalue) > 0 else None, | |
| 'lags': lags | |
| } | |
| except Exception as e: | |
| results['ljung_box'] = {'error': str(e)} | |
| # Тест Шапиро-Уилка на нормальность | |
| try: | |
| if len(residuals_clean) <= 5000: # Ограничение для Shapiro-Wilk | |
| shapiro_stat, shapiro_pvalue = stats.shapiro(residuals_clean) | |
| results['shapiro_wilk'] = { | |
| 'statistic': float(shapiro_stat), | |
| 'pvalue': float(shapiro_pvalue) | |
| } | |
| else: | |
| # Для больших выборок используем тест нормальности из scipy | |
| k2_stat, k2_pvalue = stats.normaltest(residuals_clean) | |
| results['normality_test'] = { | |
| 'statistic': float(k2_stat), | |
| 'pvalue': float(k2_pvalue), | |
| 'test': 'normaltest' | |
| } | |
| except Exception as e: | |
| results['normality_test'] = {'error': str(e)} | |
| # Статистики остатков | |
| results['residual_stats'] = { | |
| 'mean': float(np.mean(residuals_clean)), | |
| 'std': float(np.std(residuals_clean)), | |
| 'min': float(np.min(residuals_clean)), | |
| 'max': float(np.max(residuals_clean)), | |
| 'count': len(residuals_clean) | |
| } | |
| # Проверка стационарности остатков | |
| try: | |
| adf_stat, adf_pvalue, _, _, _, _ = adfuller(residuals_clean) | |
| kpss_stat, kpss_pvalue, _, _ = kpss(residuals_clean) | |
| results['stationarity'] = { | |
| 'adf': {'statistic': float(adf_stat), 'pvalue': float(adf_pvalue)}, | |
| 'kpss': {'statistic': float(kpss_stat), 'pvalue': float(kpss_pvalue)} | |
| } | |
| except Exception as e: | |
| results['stationarity'] = {'error': str(e)} | |
| return results | |