| """ | |
| Time Series & Forecasting Tools | |
| Tools for time series analysis, forecasting, seasonality detection, and feature engineering. | |
| """ | |
| import polars as pl | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional | |
| from pathlib import Path | |
| import sys | |
| import os | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| # Add parent directory to path for imports | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| # Lazy imports - only import when needed to avoid blocking app startup | |
| # from statsmodels.tsa.arima.model import ARIMA | |
| # from statsmodels.tsa.statespace.sarimax import SARIMAX | |
| # from statsmodels.tsa.holtwinters import ExponentialSmoothing | |
| # from statsmodels.tsa.seasonal import seasonal_decompose, STL | |
| # from statsmodels.graphics.tsaplots import plot_acf, plot_pacf | |
| # from prophet import Prophet | |
| import pandas as pd | |
| from ..utils.polars_helpers import load_dataframe, save_dataframe | |
| from ..utils.validation import validate_file_exists, validate_file_format, validate_dataframe, validate_column_exists | |
def forecast_time_series(
    file_path: str,
    time_col: str,
    target_col: str,
    forecast_horizon: int = 30,
    method: str = "prophet",
    seasonal_period: Optional[int] = None,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Forecast a time series using ARIMA, SARIMA, Prophet, or Exponential Smoothing.

    Args:
        file_path: Path to time series dataset
        time_col: Time/date column name
        target_col: Target variable to forecast
        forecast_horizon: Number of periods to forecast ahead
        method: Forecasting method ('arima', 'sarima', 'prophet', 'exponential_smoothing')
        seasonal_period: Seasonal period (e.g., 7 for weekly, 12 for monthly)
        output_path: Path to save forecast results

    Returns:
        Dictionary with forecast values and metrics
    """
    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, time_col)
    validate_column_exists(df, target_col)

    # Sort by time
    df = df.sort(time_col)

    # Lazy import of time series libraries
    try:
        if method == "prophet":
            from prophet import Prophet
        elif method in ["arima", "sarima"]:
            from statsmodels.tsa.arima.model import ARIMA
            from statsmodels.tsa.statespace.sarimax import SARIMAX
        elif method == "exponential_smoothing":
            from statsmodels.tsa.holtwinters import ExponentialSmoothing
    except ImportError as e:
        return {
            'status': 'error',
            'message': f"Required library not installed for {method}: {str(e)}"
        }

    print(f"📈 Forecasting with {method} (horizon={forecast_horizon})...")

    # Convert to pandas for the time series libraries and ensure a datetime time column
    df_pd = df.to_pandas()
    df_pd[time_col] = pd.to_datetime(df_pd[time_col])

    if method == "prophet":
        # Prophet requires 'ds' and 'y' columns
        prophet_df = pd.DataFrame({
            'ds': df_pd[time_col],
            'y': df_pd[target_col]
        })
        model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False)
        model.fit(prophet_df)

        # Create future dataframe and predict
        future = model.make_future_dataframe(periods=forecast_horizon)
        forecast = model.predict(future)

        # Extract forecast values with uncertainty intervals
        forecast_values = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(forecast_horizon)

        result = {
            'method': 'prophet',
            'forecast': forecast_values.to_dict('records'),
            'model_components': {
                'trend': forecast['trend'].tail(forecast_horizon).tolist(),
                'weekly': forecast.get('weekly', pd.Series([0] * forecast_horizon)).tail(forecast_horizon).tolist()
            }
        }

    elif method == "arima":
        # ARIMA model with a fixed (p, d, q) order - simplified, no automatic order selection
        ts_data = df_pd.set_index(time_col)[target_col]
        model = ARIMA(ts_data, order=(1, 1, 1))
        fitted_model = model.fit()

        # Forecast (the generated future index assumes daily frequency)
        forecast = fitted_model.forecast(steps=forecast_horizon)
        forecast_index = pd.date_range(start=ts_data.index[-1], periods=forecast_horizon + 1, freq='D')[1:]

        result = {
            'method': 'arima',
            'order': '(1,1,1)',
            'forecast': [{'date': str(date), 'value': float(val)} for date, val in zip(forecast_index, forecast)],
            'aic': float(fitted_model.aic),
            'bic': float(fitted_model.bic)
        }

    elif method == "sarima":
        if not seasonal_period:
            seasonal_period = 7  # Default: weekly seasonality for daily data

        ts_data = df_pd.set_index(time_col)[target_col]

        # SARIMA model
        model = SARIMAX(ts_data, order=(1, 1, 1), seasonal_order=(1, 1, 1, seasonal_period))
        fitted_model = model.fit(disp=False)

        # Forecast (the generated future index assumes daily frequency)
        forecast = fitted_model.forecast(steps=forecast_horizon)
        forecast_index = pd.date_range(start=ts_data.index[-1], periods=forecast_horizon + 1, freq='D')[1:]

        result = {
            'method': 'sarima',
            'order': '(1,1,1)',
            'seasonal_order': f'(1,1,1,{seasonal_period})',
            'forecast': [{'date': str(date), 'value': float(val)} for date, val in zip(forecast_index, forecast)],
            'aic': float(fitted_model.aic)
        }

    elif method == "exponential_smoothing":
        ts_data = df_pd.set_index(time_col)[target_col]

        # Holt-Winters exponential smoothing: additive trend, additive seasonality only if a period is given
        model = ExponentialSmoothing(
            ts_data,
            seasonal_periods=seasonal_period,
            trend='add',
            seasonal='add' if seasonal_period else None
        )
        fitted_model = model.fit()

        # Forecast (the generated future index assumes daily frequency)
        forecast = fitted_model.forecast(steps=forecast_horizon)
        forecast_index = pd.date_range(start=ts_data.index[-1], periods=forecast_horizon + 1, freq='D')[1:]

        result = {
            'method': 'exponential_smoothing',
            'forecast': [{'date': str(date), 'value': float(val)} for date, val in zip(forecast_index, forecast)]
        }

    else:
        raise ValueError(f"Unsupported method: {method}")

    # Save forecast
    if output_path:
        forecast_df = pl.DataFrame(result['forecast'])
        save_dataframe(forecast_df, output_path)
        print(f"💾 Forecast saved to: {output_path}")

    result['status'] = 'success'
    result['forecast_horizon'] = forecast_horizon
    result['output_path'] = output_path

    return result
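
# Illustrative usage (a minimal sketch; the file paths and column names below are
# hypothetical and not shipped with this module):
#
#   result = forecast_time_series(
#       file_path="data/daily_sales.csv",
#       time_col="date",
#       target_col="revenue",
#       forecast_horizon=14,
#       method="sarima",
#       seasonal_period=7,              # weekly cycle in daily data
#       output_path="data/revenue_forecast.csv",
#   )
#   if result.get("status") == "success":
#       print(result["forecast"][:3])   # first three forecasted points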
def detect_seasonality_trends(
    file_path: str,
    time_col: str,
    target_col: str,
    period: Optional[int] = None,
    method: str = "stl",
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Detect seasonality and trends in a time series using STL or classical decomposition.

    Args:
        file_path: Path to time series dataset
        time_col: Time/date column
        target_col: Target variable
        period: Seasonal period (None = auto-detect)
        method: Decomposition method ('stl', 'classical')
        output_path: Path to save decomposition results

    Returns:
        Dictionary with trend, seasonal, and residual components
    """
    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, time_col)
    validate_column_exists(df, target_col)

    # Sort by time
    df = df.sort(time_col)

    # Lazy import of time series libraries
    try:
        if method == "stl":
            from statsmodels.tsa.seasonal import STL
        else:
            from statsmodels.tsa.seasonal import seasonal_decompose
    except ImportError as e:
        return {
            'status': 'error',
            'message': f"Required library not installed: {str(e)}"
        }

    print(f"🔍 Detecting seasonality and trends using {method}...")

    # Convert to pandas
    df_pd = df.to_pandas()
    ts_data = df_pd.set_index(time_col)[target_col]

    # Auto-detect period using FFT if not provided
    if period is None:
        from scipy.fft import fft
        from scipy.signal import find_peaks

        # Remove trend with a centered rolling mean, then fill edge NaNs
        window = max(2, min(len(ts_data) // 10, 30))
        detrended = ts_data - ts_data.rolling(window=window, center=True).mean()
        detrended = detrended.bfill().ffill()

        # FFT of the detrended series
        fft_vals = np.abs(fft(detrended.values))
        freqs = np.fft.fftfreq(len(fft_vals))

        # Find spectral peaks in the positive-frequency half
        peaks, _ = find_peaks(fft_vals[:len(fft_vals) // 2], height=np.max(fft_vals) * 0.1)
        if len(peaks) > 0:
            # Use the tallest peak as the dominant frequency
            dominant_freq = freqs[peaks[np.argmax(fft_vals[peaks])]]
            period = int(1 / abs(dominant_freq)) if dominant_freq != 0 else 7
        else:
            period = 7  # Default weekly

        print(f"📊 Auto-detected period: {period}")

    # Perform decomposition
    if method == "stl":
        # STL decomposition (more robust); smoother window lengths must be odd
        stl = STL(ts_data, period=period, seasonal=period * 2 + 1, trend=period * 4 + 1)
        result_decomp = stl.fit()
        trend = result_decomp.trend
        seasonal = result_decomp.seasonal
        residual = result_decomp.resid
    else:
        # Classical additive decomposition
        result_decomp = seasonal_decompose(ts_data, model='additive', period=period)
        trend = result_decomp.trend
        seasonal = result_decomp.seasonal
        residual = result_decomp.resid

    # Seasonality strength: 1 - Var(residual) / Var(seasonal + residual)
    var_resid = np.var(residual.dropna())
    var_seasonal_resid = np.var((seasonal + residual).dropna())
    seasonality_strength = 1 - (var_resid / var_seasonal_resid) if var_seasonal_resid > 0 else 0

    # Trend strength: 1 - Var(residual) / Var(trend + residual), i.e. the deseasonalized series
    var_trend_resid = np.var((ts_data - seasonal).dropna())
    trend_strength = 1 - (var_resid / var_trend_resid) if var_trend_resid > 0 else 0

    # Autocorrelation analysis
    from statsmodels.tsa.stattools import acf
    acf_values = acf(ts_data.dropna(), nlags=min(40, len(ts_data) // 2))

    # Create decomposition dataframe
    decomp_df = pl.DataFrame({
        'time': df[time_col].to_list(),
        'original': ts_data.values,
        'trend': trend.fillna(0).values,
        'seasonal': seasonal.fillna(0).values,
        'residual': residual.fillna(0).values
    })

    # Save if output path provided
    if output_path:
        save_dataframe(decomp_df, output_path)
        print(f"💾 Decomposition saved to: {output_path}")

    return {
        'status': 'success',
        'method': method,
        'detected_period': period,
        'seasonality_strength': float(seasonality_strength),
        'trend_strength': float(trend_strength),
        'interpretation': {
            'seasonality': 'strong' if seasonality_strength > 0.6 else 'moderate' if seasonality_strength > 0.3 else 'weak',
            'trend': 'strong' if trend_strength > 0.6 else 'moderate' if trend_strength > 0.3 else 'weak'
        },
        'autocorrelation': acf_values[:min(10, len(acf_values))].tolist(),
        'output_path': output_path
    }
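
# Illustrative usage (a sketch only; the path and columns are hypothetical). With period
# left as None the seasonal period is auto-detected from the FFT of the detrended series;
# strength values near 1 indicate strong seasonality/trend and are bucketed as
# weak / moderate / strong under 'interpretation':
#
#   decomposition = detect_seasonality_trends(
#       file_path="data/daily_sales.csv",
#       time_col="date",
#       target_col="revenue",
#       method="stl",
#       output_path="data/sales_decomposition.csv",
#   )
#   print(decomposition["detected_period"], decomposition["interpretation"])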
def create_time_series_features(
    file_path: str,
    time_col: str,
    target_col: str,
    lag_periods: Optional[List[int]] = None,
    rolling_windows: Optional[List[int]] = None,
    add_holiday_features: bool = True,
    country: str = "US",
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Create comprehensive time series features including lags, rolling statistics, and calendar features.

    Args:
        file_path: Path to time series dataset
        time_col: Time/date column
        target_col: Target variable
        lag_periods: Lag periods to create (e.g., [1, 7, 30])
        rolling_windows: Rolling window sizes (e.g., [7, 14, 30])
        add_holiday_features: Add holiday indicators
        country: Country for the holiday calendar
        output_path: Path to save dataset with new features

    Returns:
        Dictionary with feature engineering results
    """
    # Load data
    df = load_dataframe(file_path)
    validate_dataframe(df)
    validate_column_exists(df, time_col)
    validate_column_exists(df, target_col)

    # Sort by time
    df = df.sort(time_col)

    print("⏰ Creating time series features...")

    # Convert to pandas for easier datetime handling
    df_pd = df.to_pandas()
    df_pd[time_col] = pd.to_datetime(df_pd[time_col])
    df_pd = df_pd.set_index(time_col)

    created_features = []

    # Lag features
    if lag_periods is None:
        lag_periods = [1, 7, 14, 30]
    for lag in lag_periods:
        df_pd[f'{target_col}_lag_{lag}'] = df_pd[target_col].shift(lag)
        created_features.append(f'{target_col}_lag_{lag}')

    # Rolling window features
    if rolling_windows is None:
        rolling_windows = [7, 14, 30]
    for window in rolling_windows:
        df_pd[f'{target_col}_rolling_mean_{window}'] = df_pd[target_col].rolling(window=window).mean()
        df_pd[f'{target_col}_rolling_std_{window}'] = df_pd[target_col].rolling(window=window).std()
        df_pd[f'{target_col}_rolling_min_{window}'] = df_pd[target_col].rolling(window=window).min()
        df_pd[f'{target_col}_rolling_max_{window}'] = df_pd[target_col].rolling(window=window).max()
        created_features.extend([
            f'{target_col}_rolling_mean_{window}',
            f'{target_col}_rolling_std_{window}',
            f'{target_col}_rolling_min_{window}',
            f'{target_col}_rolling_max_{window}'
        ])

    # Exponential moving averages
    df_pd[f'{target_col}_ema_7'] = df_pd[target_col].ewm(span=7).mean()
    df_pd[f'{target_col}_ema_30'] = df_pd[target_col].ewm(span=30).mean()
    created_features.extend([f'{target_col}_ema_7', f'{target_col}_ema_30'])

    # Calendar features
    df_pd['year'] = df_pd.index.year
    df_pd['month'] = df_pd.index.month
    df_pd['day'] = df_pd.index.day
    df_pd['dayofweek'] = df_pd.index.dayofweek
    df_pd['dayofyear'] = df_pd.index.dayofyear
    df_pd['quarter'] = df_pd.index.quarter
    df_pd['is_weekend'] = (df_pd.index.dayofweek >= 5).astype(int)
    df_pd['is_month_start'] = df_pd.index.is_month_start.astype(int)
    df_pd['is_month_end'] = df_pd.index.is_month_end.astype(int)

    # Cyclical sin/cos encoding so that adjacent periods (e.g., December and January) stay close
    df_pd['month_sin'] = np.sin(2 * np.pi * df_pd['month'] / 12)
    df_pd['month_cos'] = np.cos(2 * np.pi * df_pd['month'] / 12)
    df_pd['day_sin'] = np.sin(2 * np.pi * df_pd['day'] / 31)
    df_pd['day_cos'] = np.cos(2 * np.pi * df_pd['day'] / 31)
    df_pd['dayofweek_sin'] = np.sin(2 * np.pi * df_pd['dayofweek'] / 7)
    df_pd['dayofweek_cos'] = np.cos(2 * np.pi * df_pd['dayofweek'] / 7)

    created_features.extend([
        'year', 'month', 'day', 'dayofweek', 'dayofyear', 'quarter',
        'is_weekend', 'is_month_start', 'is_month_end',
        'month_sin', 'month_cos', 'day_sin', 'day_cos',
        'dayofweek_sin', 'dayofweek_cos'
    ])

    # Holiday features
    if add_holiday_features:
        try:
            import holidays

            country_holidays = holidays.country_holidays(country)
            df_pd['is_holiday'] = df_pd.index.map(lambda x: 1 if x in country_holidays else 0)

            # Days until the next holiday (compare as Timestamps to avoid date/Timestamp mismatches)
            holiday_dates = sorted(
                pd.Timestamp(d) for d in country_holidays if pd.Timestamp(d) >= df_pd.index.min()
            )
            df_pd['days_to_next_holiday'] = df_pd.index.map(
                lambda x: min([(hol - x).days for hol in holiday_dates if hol >= x], default=365)
            )
            created_features.extend(['is_holiday', 'days_to_next_holiday'])
        except Exception as e:
            print(f"⚠️ Could not add holiday features: {str(e)}")

    # Convert back to polars
    df_pd = df_pd.reset_index()
    df_result = pl.from_pandas(df_pd)

    # Save if output path provided
    if output_path:
        save_dataframe(df_result, output_path)
        print(f"💾 Dataset with time series features saved to: {output_path}")

    return {
        'status': 'success',
        'features_created': len(created_features),
        'feature_names': created_features,
        'lag_periods': lag_periods,
        'rolling_windows': rolling_windows,
        'holiday_features_added': add_holiday_features,
        'output_path': output_path
    }
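
# Illustrative usage (a sketch; paths and column names are hypothetical). Lag and rolling
# features contain NaNs for the earliest rows until enough history accumulates, so
# downstream models typically drop or impute those rows:
#
#   features = create_time_series_features(
#       file_path="data/daily_sales.csv",
#       time_col="date",
#       target_col="revenue",
#       lag_periods=[1, 7, 28],
#       rolling_windows=[7, 28],
#       add_holiday_features=True,
#       country="US",
#       output_path="data/sales_with_features.csv",
#   )
#   print(features["features_created"], features["feature_names"][:5])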