TimeSeriesHomework1_2 / src /lab2_functions.py
Kolesnikov Dmitry
feat: Вторая лабораторка
eaf6e74
"""
Функции для лабораторной работы №2: Прогнозирование временных рядов
"""
import numpy as np
import pandas as pd
from typing import List, Tuple, Dict, Optional
from scipy import stats
from scipy.stats import boxcox, boxcox_normmax
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller, kpss
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
import warnings
warnings.filterwarnings('ignore')
def calculate_mape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
"""Вычисляет MAPE (Mean Absolute Percentage Error)"""
y_true = np.array(y_true)
y_pred = np.array(y_pred)
mask = y_true != 0
if mask.sum() == 0:
return np.nan
return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100
def create_advanced_features(df: pd.DataFrame, target: str, timestamp_col: str = 'timestamp') -> pd.DataFrame:
"""
Расширенный feature engineering:
- Временные признаки (день недели, месяц, квартал)
- Циклические признаки через sin/cos
- Лаги: lag_1, lag_7, lag_30
- Скользящие статистики: mean, std, min, max по окнам 7, 30, 90
"""
df = df.copy()
df = df.set_index(timestamp_col).sort_index()
# Временные признаки
df['day_of_week'] = df.index.dayofweek
df['month'] = df.index.month
df['quarter'] = df.index.quarter
df['day_of_month'] = df.index.day
df['week_of_year'] = df.index.isocalendar().week
# Циклические признаки
df['day_of_week_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
df['day_of_week_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)
df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
# Лаги
for lag in [1, 7, 30]:
df[f'{target}_lag_{lag}'] = df[target].shift(lag)
# Скользящие статистики
windows = [7, 30, 90]
for w in windows:
df[f'{target}_rolling_mean_{w}'] = df[target].rolling(window=w, min_periods=1).mean()
df[f'{target}_rolling_std_{w}'] = df[target].rolling(window=w, min_periods=1).std()
df[f'{target}_rolling_min_{w}'] = df[target].rolling(window=w, min_periods=1).min()
df[f'{target}_rolling_max_{w}'] = df[target].rolling(window=w, min_periods=1).max()
# Коэффициент вариации (волатильность)
for w in [7, 30]:
rolling_mean = df[f'{target}_rolling_mean_{w}']
rolling_std = df[f'{target}_rolling_std_{w}']
df[f'{target}_rolling_cv_{w}'] = rolling_std / (rolling_mean + 1e-8)
return df.reset_index()
def apply_boxcox_transform(series: pd.Series, lambda_param: Optional[float] = None) -> Tuple[pd.Series, float]:
"""
Применяет преобразование Бокса-Кокса.
Если lambda_param не указан, подбирает оптимальный.
"""
series_positive = series[series > 0]
if len(series_positive) == 0:
raise ValueError("Все значения должны быть положительными для преобразования Бокса-Кокса")
if lambda_param is None:
# Автоматический подбор lambda
lambda_param = boxcox_normmax(series_positive.values)
transformed_values, fitted_lambda = boxcox(series_positive.values, lmbda=lambda_param)
# Создаём новый Series с теми же индексами
result = pd.Series(index=series.index, dtype=float)
result.loc[series > 0] = transformed_values
return result, fitted_lambda
def inverse_boxcox_transform(transformed_series: pd.Series, lambda_param: float) -> pd.Series:
"""Обратное преобразование Бокса-Кокса"""
if lambda_param == 0:
return np.exp(transformed_series)
else:
return (lambda_param * transformed_series + 1) ** (1 / lambda_param)
def inverse_transformations(
forecast: np.ndarray,
last_train_values_transformed: np.ndarray,
transform_info: Dict
) -> np.ndarray:
"""
Применяет обратные преобразования к прогнозу.
Порядок обратного преобразования должен быть обратным порядку прямого:
Прямое: transformation -> diff_order -> seasonal_diff
Обратное: seasonal_diff -> diff_order -> transformation
forecast: прогноз в преобразованном пространстве (после всех преобразований)
last_train_values_transformed: последние значения обучающей выборки в преобразованном пространстве (после всех преобразований)
transform_info: информация о применённых преобразованиях (может содержать промежуточные значения)
"""
result = forecast.copy()
diff_order = transform_info.get('diff_order', 0)
seasonal_diff = transform_info.get('seasonal_diff')
# Получаем промежуточные значения из transform_info, если они есть
last_values_after_diff = transform_info.get('last_values_after_diff', None)
last_values_after_transform = transform_info.get('last_values_after_transform', None)
# 1. Обратное сезонное дифференцирование (если было)
if seasonal_diff is not None and seasonal_diff > 0:
# Нужны последние seasonal_diff значений после transformation и diff, но до seasonal_diff
if last_values_after_diff is not None and len(last_values_after_diff) >= seasonal_diff:
last_seasonal = last_values_after_diff[-seasonal_diff:]
elif len(last_train_values_transformed) >= seasonal_diff:
# Fallback: используем последние значения (хотя это не совсем правильно)
last_seasonal = last_train_values_transformed[-seasonal_diff:]
else:
last_seasonal = last_train_values_transformed if len(last_train_values_transformed) > 0 else np.array([0])
for i in range(len(result)):
if i < len(last_seasonal):
result[i] = result[i] + last_seasonal[i]
else:
# Используем предыдущие прогнозы
result[i] = result[i] + result[i - seasonal_diff]
# 2. Обратное обычное дифференцирование (если было)
for _ in range(diff_order):
# Нужны последние diff_order значений после transformation, но до diff
if last_values_after_transform is not None and len(last_values_after_transform) > 0:
last_val = last_values_after_transform[-1]
elif len(last_train_values_transformed) > 0:
# Fallback
last_val = last_train_values_transformed[-1]
else:
last_val = 0
for i in range(len(result)):
if i == 0:
result[i] = result[i] + last_val
else:
result[i] = result[i] + result[i - 1]
# 3. Обратное преобразование для стабилизации дисперсии
if transform_info.get('transformation') == 'log':
result = np.exp(result)
elif transform_info.get('transformation') == 'boxcox':
lambda_param = transform_info.get('lambda')
if lambda_param is not None:
if lambda_param == 0:
result = np.exp(result)
else:
result = (lambda_param * result + 1) ** (1 / lambda_param)
return result
def apply_transformations(
series: pd.Series,
transformation: str = 'none',
lambda_param: Optional[float] = None,
diff_order: int = 0,
seasonal_diff: Optional[int] = None
) -> Tuple[pd.Series, Dict]:
"""
Применяет цепочку преобразований к ряду.
transformation: 'none', 'log', 'boxcox'
diff_order: порядок обычного дифференцирования
seasonal_diff: период сезонного дифференцирования
Возвращает преобразованный ряд и словарь с информацией о преобразованиях,
включая промежуточные значения для обратного преобразования.
"""
result = series.copy()
info = {'transformation': transformation, 'lambda': None, 'diff_order': diff_order, 'seasonal_diff': seasonal_diff}
# Преобразование для стабилизации дисперсии
if transformation == 'log':
if (result <= 0).any():
raise ValueError("Для лог-трансформации все значения должны быть положительными")
result = np.log(result)
elif transformation == 'boxcox':
result, lambda_param = apply_boxcox_transform(result, lambda_param)
info['lambda'] = lambda_param
# Сохраняем значения после transformation (для обратного diff)
result_after_transform = result.copy()
# Обычное дифференцирование
for _ in range(diff_order):
result = result.diff()
# Сохраняем значения после diff (для обратного seasonal_diff)
result_after_diff = result.copy()
# Сезонное дифференцирование
if seasonal_diff is not None and seasonal_diff > 0:
result = result.diff(periods=seasonal_diff)
# Сохраняем промежуточные значения для обратного преобразования
info['last_values_after_transform'] = result_after_transform.values[-max(diff_order, 1):] if len(result_after_transform) > 0 else np.array([])
info['last_values_after_diff'] = result_after_diff.values[-max(seasonal_diff if seasonal_diff else 1, 1):] if len(result_after_diff) > 0 else np.array([])
return result.dropna(), info
def recursive_forecast(
model_func,
train_data: pd.Series,
horizon: int,
alpha: Optional[float] = None,
**model_kwargs
) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]:
"""
Рекурсивная стратегия прогнозирования:
Одна модель → итеративное использование прогнозов
Возвращает прогнозы и опционально доверительные интервалы (lower, upper)
"""
forecasts = []
conf_lower = [] if alpha is not None else None
conf_upper = [] if alpha is not None else None
current_data = train_data.copy()
# Определяем тип индекса для правильного добавления новых значений
is_datetime = pd.api.types.is_datetime64_any_dtype(current_data.index)
for h in range(horizon):
# Обучаем модель на текущих данных
model = model_func(current_data, **model_kwargs)
# Прогнозируем на 1 шаг вперёд
if alpha is not None:
try:
forecast_result = model.forecast(steps=1, alpha=alpha)
if isinstance(forecast_result, tuple):
forecast_value = forecast_result[0][0] if len(forecast_result[0]) > 0 else forecast_result[0]
if len(forecast_result) > 1:
conf_lower.append(forecast_result[1][0] if len(forecast_result[1]) > 0 else forecast_result[1])
conf_upper.append(forecast_result[2][0] if len(forecast_result[2]) > 0 else forecast_result[2])
else:
forecast_value = forecast_result[0] if hasattr(forecast_result, '__getitem__') else float(forecast_result)
except:
# Если доверительные интервалы не поддерживаются, используем обычный прогноз
forecast = model.forecast(steps=1)
forecast_value = forecast[0] if hasattr(forecast, '__getitem__') else float(forecast)
else:
forecast = model.forecast(steps=1)
forecast_value = forecast[0] if hasattr(forecast, '__getitem__') else float(forecast)
forecasts.append(forecast_value)
# Добавляем прогноз к данным для следующей итерации
if is_datetime:
# Для DatetimeIndex используем частоту или инференс
try:
freq = pd.infer_freq(current_data.index) or 'D'
# Используем pd.date_range для создания следующей даты
last_date = current_data.index[-1]
next_dates = pd.date_range(start=last_date, periods=2, freq=freq)
if len(next_dates) >= 2:
next_idx = next_dates[1] # Берём вторую дату (первая = last_date)
else:
# Fallback
next_idx = len(current_data)
is_datetime = False
except:
# Если не удалось определить частоту, используем числовой индекс
try:
# Пробуем простой способ через Timedelta
next_idx = current_data.index[-1] + pd.Timedelta(days=1)
except:
next_idx = len(current_data)
is_datetime = False
else:
# Для числового индекса просто увеличиваем на 1
next_idx = len(current_data)
if is_datetime:
current_data = pd.concat([current_data, pd.Series([forecast_value], index=[next_idx])])
else:
# Используем числовой индекс
current_data = pd.concat([current_data, pd.Series([forecast_value], index=[next_idx])])
result = np.array(forecasts)
if alpha is not None and conf_lower and conf_upper:
return result, (np.array(conf_lower), np.array(conf_upper))
return result, None
def direct_forecast(
model_func,
train_data: pd.Series,
horizon: int,
alpha: Optional[float] = None,
**model_kwargs
) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]:
"""
Прямая стратегия прогнозирования:
Отдельная модель для каждого шага t+1, ..., t+h
Возвращает прогнозы и опционально доверительные интервалы (lower, upper)
"""
forecasts = []
conf_lower = [] if alpha is not None else None
conf_upper = [] if alpha is not None else None
for h in range(1, horizon + 1):
# Обучаем отдельную модель для шага h
model = model_func(train_data, **model_kwargs)
# Прогнозируем на h шагов вперёд и берём последний
if alpha is not None:
try:
forecast_result = model.forecast(steps=h, alpha=alpha)
if isinstance(forecast_result, tuple):
forecast_value = forecast_result[0][-1] if len(forecast_result[0]) > 0 else forecast_result[0]
if len(forecast_result) > 1:
conf_lower.append(forecast_result[1][-1] if len(forecast_result[1]) > 0 else forecast_result[1])
conf_upper.append(forecast_result[2][-1] if len(forecast_result[2]) > 0 else forecast_result[2])
else:
forecast_value = forecast_result[-1]
except:
forecast = model.forecast(steps=h)
forecast_value = forecast[-1]
else:
forecast = model.forecast(steps=h)
forecast_value = forecast[-1]
forecasts.append(forecast_value)
result = np.array(forecasts)
if alpha is not None and conf_lower and conf_upper:
return result, (np.array(conf_lower), np.array(conf_upper))
return result, None
def hybrid_forecast(
model_func,
train_data: pd.Series,
horizon: int,
recursive_steps: int = None,
alpha: Optional[float] = None,
**model_kwargs
) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, np.ndarray]]]:
"""
Гибридная стратегия:
Рекурсивная для ближайших шагов, прямая — для дальних
Возвращает прогнозы и опционально доверительные интервалы (lower, upper)
"""
if recursive_steps is None:
recursive_steps = max(1, horizon // 2)
forecasts = []
conf_lower = [] if alpha is not None else None
conf_upper = [] if alpha is not None else None
# Рекурсивная часть
recursive_result = recursive_forecast(model_func, train_data, recursive_steps, alpha=alpha, **model_kwargs)
if isinstance(recursive_result, tuple):
recursive_forecasts, recursive_conf = recursive_result
if recursive_conf is not None:
conf_lower.extend(recursive_conf[0])
conf_upper.extend(recursive_conf[1])
else:
recursive_forecasts = recursive_result
forecasts.extend(recursive_forecasts)
# Прямая часть для оставшихся шагов
if horizon > recursive_steps:
# Используем последние данные + рекурсивные прогнозы
is_datetime = pd.api.types.is_datetime64_any_dtype(train_data.index)
if is_datetime:
try:
freq = pd.infer_freq(train_data.index) or 'D'
# Используем pd.date_range для создания дат начиная с последней даты + 1 период
last_date = train_data.index[-1]
extended_index = pd.date_range(
start=last_date,
periods=len(recursive_forecasts) + 1,
freq=freq
)[1:] # Берём все даты кроме первой (которая равна last_date)
except:
# Fallback на числовой индекс
try:
# Пробуем через date_range с periods
last_date = train_data.index[-1]
extended_index = pd.date_range(
start=last_date,
periods=len(recursive_forecasts) + 1,
freq='D'
)[1:] # Берём все даты кроме первой
except:
extended_index = range(len(train_data), len(train_data) + len(recursive_forecasts))
else:
extended_index = range(len(train_data), len(train_data) + len(recursive_forecasts))
extended_data = pd.concat([
train_data,
pd.Series(recursive_forecasts, index=extended_index)
])
remaining_horizon = horizon - recursive_steps
direct_result = direct_forecast(model_func, extended_data, remaining_horizon, alpha=alpha, **model_kwargs)
if isinstance(direct_result, tuple):
direct_forecasts, direct_conf = direct_result
if direct_conf is not None:
conf_lower.extend(direct_conf[0])
conf_upper.extend(direct_conf[1])
else:
direct_forecasts = direct_result
forecasts.extend(direct_forecasts)
result = np.array(forecasts[:horizon])
if alpha is not None and conf_lower and conf_upper:
return result, (np.array(conf_lower[:horizon]), np.array(conf_upper[:horizon]))
return result, None
def create_exponential_smoothing_model(
train_data: pd.Series,
trend: Optional[str] = None,
seasonal: Optional[str] = None,
seasonal_periods: Optional[int] = None,
optimized: bool = True
) -> ExponentialSmoothing:
"""Создаёт и обучает модель экспоненциального сглаживания"""
try:
model = ExponentialSmoothing(
train_data,
trend=trend,
seasonal=seasonal,
seasonal_periods=seasonal_periods,
initialization_method='estimated' if optimized else 'simple'
)
fitted_model = model.fit(optimized=optimized)
return fitted_model
except Exception as e:
raise ValueError(f"Ошибка при создании модели: {e}")
def evaluate_forecast(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
"""Вычисляет метрики качества прогноза"""
y_true = np.array(y_true)
y_pred = np.array(y_pred)
mae = mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(mean_squared_error(y_true, y_pred))
mape = calculate_mape(y_true, y_pred)
return {
'MAE': mae,
'RMSE': rmse,
'MAPE': mape
}
def naive_forecast(train_data: pd.Series, horizon: int) -> np.ndarray:
"""Наивный прогноз: y[t+h] = y[t]"""
last_value = train_data.iloc[-1]
return np.full(horizon, last_value)
def time_series_cv_sliding_window(
model_func,
data: pd.Series,
train_size: int,
test_size: int,
horizon: int,
step: int = 1,
**model_kwargs
) -> List[Dict]:
"""
Кросс-валидация со скользящим окном (фиксированная длина обучения)
"""
results = []
n = len(data)
for i in range(0, n - train_size - test_size + 1, step):
train_end = i + train_size
test_end = min(train_end + test_size, n)
train_data = data.iloc[i:train_end]
test_data = data.iloc[train_end:test_end]
try:
model = model_func(train_data, **model_kwargs)
forecast = model.forecast(steps=min(horizon, len(test_data)))
metrics = evaluate_forecast(test_data.values[:len(forecast)], forecast)
metrics['fold'] = len(results) + 1
metrics['train_start'] = train_data.index[0]
metrics['train_end'] = train_data.index[-1]
metrics['test_start'] = test_data.index[0]
metrics['test_end'] = test_data.index[-1]
results.append(metrics)
except Exception as e:
print(f"Ошибка в фолде {len(results) + 1}: {e}")
return results
def time_series_cv_expanding_window(
model_func,
data: pd.Series,
initial_train_size: int,
test_size: int,
horizon: int,
step: int = 1,
**model_kwargs
) -> List[Dict]:
"""
Кросс-валидация с расширяющимся окном (обучение растёт со временем)
"""
results = []
n = len(data)
for i in range(initial_train_size, n - test_size + 1, step):
train_end = i
test_end = min(train_end + test_size, n)
train_data = data.iloc[:train_end]
test_data = data.iloc[train_end:test_end]
try:
model = model_func(train_data, **model_kwargs)
forecast = model.forecast(steps=min(horizon, len(test_data)))
metrics = evaluate_forecast(test_data.values[:len(forecast)], forecast)
metrics['fold'] = len(results) + 1
metrics['train_start'] = train_data.index[0]
metrics['train_end'] = train_data.index[-1]
metrics['test_start'] = test_data.index[0]
metrics['test_end'] = test_data.index[-1]
results.append(metrics)
except Exception as e:
print(f"Ошибка в фолде {len(results) + 1}: {e}")
return results
def diagnose_model_residuals(residuals: np.ndarray, lags: int = 10) -> Dict:
"""
Диагностика остатков модели:
- Тест Льюнга-Бокса на автокорреляцию
- Проверка нормальности (Shapiro-Wilk)
- Q-Q plot данные
"""
residuals_clean = residuals[~np.isnan(residuals)]
if len(residuals_clean) < 3:
return {'error': 'Недостаточно данных для диагностики'}
results = {}
# Тест Льюнга-Бокса
try:
lb_stat, lb_pvalue = acorr_ljungbox(residuals_clean, lags=min(lags, len(residuals_clean) - 1), return_df=False)
results['ljung_box'] = {
'statistic': float(lb_stat[-1]) if len(lb_stat) > 0 else None,
'pvalue': float(lb_pvalue[-1]) if len(lb_pvalue) > 0 else None,
'lags': lags
}
except Exception as e:
results['ljung_box'] = {'error': str(e)}
# Тест Шапиро-Уилка на нормальность
try:
if len(residuals_clean) <= 5000: # Ограничение для Shapiro-Wilk
shapiro_stat, shapiro_pvalue = stats.shapiro(residuals_clean)
results['shapiro_wilk'] = {
'statistic': float(shapiro_stat),
'pvalue': float(shapiro_pvalue)
}
else:
# Для больших выборок используем тест нормальности из scipy
k2_stat, k2_pvalue = stats.normaltest(residuals_clean)
results['normality_test'] = {
'statistic': float(k2_stat),
'pvalue': float(k2_pvalue),
'test': 'normaltest'
}
except Exception as e:
results['normality_test'] = {'error': str(e)}
# Статистики остатков
results['residual_stats'] = {
'mean': float(np.mean(residuals_clean)),
'std': float(np.std(residuals_clean)),
'min': float(np.min(residuals_clean)),
'max': float(np.max(residuals_clean)),
'count': len(residuals_clean)
}
# Проверка стационарности остатков
try:
adf_stat, adf_pvalue, _, _, _, _ = adfuller(residuals_clean)
kpss_stat, kpss_pvalue, _, _ = kpss(residuals_clean)
results['stationarity'] = {
'adf': {'statistic': float(adf_stat), 'pvalue': float(adf_pvalue)},
'kpss': {'statistic': float(kpss_stat), 'pvalue': float(kpss_pvalue)}
}
except Exception as e:
results['stationarity'] = {'error': str(e)}
return results