Spaces:
Running
Running
| import torch | |
| import numpy as np | |
| from scipy import stats | |
| from scipy.signal import find_peaks | |
| import random | |
| from statsmodels.tsa.stattools import acf | |
| from scipy.ndimage import gaussian_filter1d | |
| from scipy import optimize | |
class TimeSeriesProcessor:
    """
    Utility class for converting data between numpy arrays and torch tensors
    while remembering the original device/dtype so conversions can round-trip.
    """

    @staticmethod
    def to_numpy(data):
        """
        Convert a torch tensor to a numpy array, preserving device/dtype info.

        Args:
            data: A torch.Tensor or any other object.

        Returns:
            Tuple of (numpy array or the original object unchanged,
            was-torch flag, original device or None, original dtype or None).
        """
        if isinstance(data, torch.Tensor):
            # detach() drops the autograd graph; cpu() is required before numpy()
            return data.detach().cpu().numpy(), True, data.device, data.dtype
        return data, False, None, None

    @staticmethod
    def to_torch(data_np, is_torch, device=None, dtype=None):
        """
        Convert a numpy array back to a torch tensor if the original was one.

        Args:
            data_np: Numpy array (or other object) to convert back.
            is_torch: Flag returned by ``to_numpy`` indicating the original type.
            device: Target device (as returned by ``to_numpy``).
            dtype: Target dtype (as returned by ``to_numpy``).

        Returns:
            torch.Tensor on the original device/dtype, or ``data_np`` unchanged.
        """
        if is_torch:
            return torch.tensor(data_np, device=device, dtype=dtype)
        return data_np
class Embedding:
    """
    Embedding methods that lift a time series of shape (seq_length, N) to a
    target dimension: delay embeddings, zero padding, and sinusoidal
    positional signals.
    """

    @staticmethod
    def estimate_TDM_tau(data, acorr_threshold=1 / np.e):
        """
        Estimate the time delay tau via the autocorrelation threshold method.

        Args:
            data: Input data tensor/array of shape (seq_length, N)
            acorr_threshold: Autocorrelation threshold (default 1/e)

        Returns:
            int: Maximum estimated tau across all dimensions
        """
        data_np, _, _, _ = TimeSeriesProcessor.to_numpy(data)
        seq_length, n_dims = data_np.shape
        tau_vals = np.zeros(n_dims, dtype=int)
        for dim in range(n_dims):
            # Autocorrelation of the mean-centered series up to half the length
            autocorr_vals = acf(data_np[:, dim] - np.mean(data_np[:, dim]),
                                nlags=seq_length // 2)
            # First lag (after lag 0) where autocorrelation drops below threshold
            below_threshold = np.where(autocorr_vals[1:] < acorr_threshold)[0]
            if len(below_threshold) > 0:
                tau_vals[dim] = below_threshold[0] + 1  # +1: index is into [1:]
            else:
                tau_vals[dim] = 1  # fallback when nothing drops below threshold
        return int(np.max(tau_vals))

    @staticmethod
    def estimate_pos_tau(data, max_lag=None, min_lag=None):
        """
        Estimate the dominant autocorrelation time for positional embedding.

        Args:
            data: Input data tensor/array of shape (seq_length, N)
            max_lag: Maximum lag to consider (default: seq_length - 1)
            min_lag: Minimum lag to consider (default: seq_length // 10)

        Returns:
            int: Maximum autocorrelation time across dimensions
        """
        data_np, _, _, _ = TimeSeriesProcessor.to_numpy(data)
        seq_length, n_dims = data_np.shape
        if max_lag is None:
            max_lag = seq_length - 1
        if min_lag is None:
            min_lag = seq_length // 10
        tau_vals = np.zeros(n_dims, dtype=int)
        for dim in range(n_dims):
            # data_np is already numpy here; the original re-checked for torch
            ts = data_np[:, dim]
            autocorr_vals = acf(ts - np.mean(ts), nlags=max_lag)
            # Prefer the strongest autocorrelation peak beyond min_lag
            # (peak indices always lie inside autocorr_vals, so no upper check)
            peaks, _ = find_peaks(autocorr_vals)
            valid_peaks = [i for i in peaks if i > min_lag]
            if valid_peaks:
                peak_values = autocorr_vals[valid_peaks]
                tau_vals[dim] = valid_peaks[int(np.argmax(peak_values))]
            else:
                # No peak found: fall back to the lag of the maximum beyond min_lag
                start_idx = min_lag + 1
                tau_vals[dim] = start_idx + int(np.argmax(autocorr_vals[start_idx:]))
        return int(np.max(tau_vals))

    @staticmethod
    def delay_embedding(data, model_dim, tau=None):
        """
        Standard delay embedding: appends delayed copies of the last coordinate.

        Args:
            data: Input data tensor of shape (seq_length, N)
            model_dim: Target dimension
            tau: Time delay (if None, estimated via ``estimate_TDM_tau``)

        Returns:
            Delay embedded data of shape
            (seq_length - needed_dims * tau, model_dim)
        """
        seq_length, N_data = data.shape
        needed_dims = model_dim - N_data
        if needed_dims <= 0:
            return data
        processed_data = data.clone()
        if tau is None:
            tau = Embedding.estimate_TDM_tau(processed_data)
        # The last column is the one that gets delayed copies appended
        ts = processed_data[:, -1].clone()
        start_idx = needed_dims * tau
        # If the delays would consume the whole series, shrink tau
        if start_idx >= seq_length:
            tau = max(1, seq_length // (needed_dims + 1))
            start_idx = needed_dims * tau
        result = processed_data[start_idx:].clone()
        # Append progressively more delayed versions of the last coordinate
        for i in range(1, needed_dims + 1):
            delayed = ts[start_idx - i * tau:seq_length - i * tau].unsqueeze(1)
            result = torch.cat([result, delayed], dim=1)
        return result

    @staticmethod
    def delay_embedding_random(data, model_dim, upper_tau=10, lower_tau=3):
        """
        Delay embedding of the first coordinate with randomly drawn tau values.

        Args:
            data: Input data tensor of shape (seq_length, N)
            model_dim: Target dimension
            upper_tau: Upper bound (inclusive) for random tau values
            lower_tau: Lower bound (inclusive) for random tau values

        Returns:
            Random delay embedded data of shape (seq_length - max_tau, model_dim)
        """
        seq_length, N_data = data.shape
        needed_dims = model_dim - N_data
        if needed_dims <= 0:
            return data
        processed_data = data.clone()
        # One random delay per extra dimension
        taus = [random.randint(lower_tau, upper_tau) for _ in range(needed_dims)]
        max_tau = max(taus)
        # The first column is the one that gets delayed copies appended
        ts = processed_data[:, 0].clone()
        result = processed_data[max_tau:].clone()
        for i in range(needed_dims):
            delayed = ts[max_tau - taus[i]:seq_length - taus[i]].unsqueeze(1)
            result = torch.cat([result, delayed], dim=1)
        return result

    @staticmethod
    def zero_embedding(data, model_dim):
        """
        Zero embedding: appends zero columns to reach the target dimension.

        Args:
            data: Input data tensor of shape (seq_length, N)
            model_dim: Target dimension

        Returns:
            Tensor of shape (seq_length, model_dim), unchanged if N >= model_dim
        """
        seq_length, N_data = data.shape
        needed_dims = model_dim - N_data
        if needed_dims > 0:
            zeros = torch.zeros(seq_length, needed_dims,
                                device=data.device, dtype=data.dtype)
            data = torch.cat([data, zeros], dim=1)
        return data

    @staticmethod
    def positional_embedding(data, model_dim, tau=None):
        """
        Positional embedding: appends phase-shifted sinusoids whose period is
        the (estimated) autocorrelation time.

        Args:
            data: Input data tensor of shape (seq_length, N)
            model_dim: Target dimension
            tau: Optional fixed period. If None, estimated via ``estimate_pos_tau``.

        Returns:
            Tensor of shape (seq_length, model_dim) with sinusoids appended
        """
        seq_length, N_data = data.shape
        needed_dims = model_dim - N_data
        if needed_dims <= 0:
            return data
        # torch.linspace(0, x, 1) already yields [0.0], so one expression
        # covers the original's needed_dims == 1 special case
        shifts = torch.linspace(0, np.pi / 2, needed_dims, device=data.device)
        tau_val = tau if tau is not None else Embedding.estimate_pos_tau(data)
        t = torch.arange(1, seq_length + 1, dtype=data.dtype, device=data.device)
        result = data.clone()
        for shift in shifts:
            pos_feature = torch.sin(2 * np.pi / tau_val * t + shift).unsqueeze(1)
            result = torch.cat([result, pos_feature], dim=1)
        return result

    @staticmethod
    def apply_embedding(data, model_dim, method="pos_embedding", **kwargs):
        """
        Apply the selected embedding method to the data.

        Args:
            data: Input data tensor of shape (seq_length, N)
            model_dim: Target dimension
            method: One of 'pos_embedding', 'zero_embedding',
                'delay_embedding', 'delay_embedding_random'
            **kwargs: Extra parameters forwarded to the chosen method

        Returns:
            Embedded data

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method == "pos_embedding":
            return Embedding.positional_embedding(data, model_dim, **kwargs)
        elif method == "zero_embedding":
            return Embedding.zero_embedding(data, model_dim)
        elif method == "delay_embedding":
            return Embedding.delay_embedding(data, model_dim, **kwargs)
        elif method == "delay_embedding_random":
            return Embedding.delay_embedding_random(data, model_dim, **kwargs)
        else:
            raise ValueError(f"Unsupported embedding method: {method}")
class BoxCoxTransformer:
    """
    Applies a per-dimension Box-Cox transformation for variance stabilization.

    The work is done by the static ``transform`` / ``inverse_transform`` pair;
    ``__init__`` only stores a default lambda range for instance-based use.
    """

    def __init__(self, lambda_range=(-2, 2)):
        """
        Initialize BoxCoxTransformer.

        Args:
            lambda_range: Range for lambda parameter search
        """
        self.lambda_range = lambda_range
        # Placeholder for fitted parameters (not populated by the static API)
        self.params = None

    @staticmethod
    def transform(data, lambda_range=(-2, 2)):
        """
        Apply a Box-Cox transformation to every dimension of the data.

        Args:
            data: Input data tensor of shape (seq_length, N)
            lambda_range: (low, high) clamp applied to the fitted lambda to
                prevent numerical issues

        Returns:
            Tuple of (transformed data, list of (lambda, offset) pairs per
            dimension for ``inverse_transform``)
        """
        data_np, is_torch, device, dtype = TimeSeriesProcessor.to_numpy(data)
        seq_length, n_dims = data_np.shape
        transformed_data = np.zeros_like(data_np)
        box_cox_params = []
        lam_lo, lam_hi = lambda_range
        for dim in range(n_dims):
            # Shift so every value is strictly positive (Box-Cox requirement)
            if np.min(data_np[:, dim]) <= 0:
                offset = abs(np.min(data_np[:, dim])) + 1.2
            else:
                offset = 1.2
            data_shifted = data_np[:, dim] + offset
            try:
                # Maximum-likelihood estimate of lambda
                transformed, lambda_param = stats.boxcox(data_shifted)
                # Clamp lambda to the requested range (previously hard-coded
                # to [-2, 2], silently ignoring the lambda_range argument)
                lambda_param = max(min(lambda_param, lam_hi), lam_lo)
                # Recompute with the clamped lambda for consistency
                if abs(lambda_param) < 1e-8:
                    # lambda -> 0 limit of Box-Cox is the logarithm
                    transformed = np.log(data_shifted)
                else:
                    transformed = (data_shifted ** lambda_param - 1) / lambda_param
                transformed_data[:, dim] = transformed
            except Exception:
                # Fit failed (e.g. constant data): keep the dimension unchanged
                # and record an identity transform. offset must be 0.0 here —
                # the stored data was NOT shifted, and the original non-zero
                # offset made inverse_transform subtract a shift that was
                # never applied.
                transformed_data[:, dim] = data_np[:, dim]
                lambda_param = 1.0
                offset = 0.0
            box_cox_params.append((lambda_param, offset))
        return (TimeSeriesProcessor.to_torch(transformed_data, is_torch, device, dtype),
                box_cox_params)

    @staticmethod
    def inverse_transform(data, box_cox_params):
        """
        Invert the Box-Cox transformation produced by ``transform``.

        Args:
            data: Transformed data tensor of shape (seq_length, N)
            box_cox_params: List of (lambda, offset) pairs from ``transform``

        Returns:
            Data on the original scale
        """
        data_np, is_torch, device, dtype = TimeSeriesProcessor.to_numpy(data)
        seq_length, n_dims = data_np.shape
        inverse_data = np.zeros_like(data_np)
        for dim in range(min(n_dims, len(box_cox_params))):
            lambda_param, offset = box_cox_params[dim]
            if abs(lambda_param) < 1e-8:
                # Inverse of the logarithmic (lambda = 0) branch
                inverse_data[:, dim] = np.exp(data_np[:, dim]) - offset
            elif abs(lambda_param - 1.0) < 1e-8:
                # Identity marker written by the failure branch of transform
                # (offset is 0.0 there, so this is a no-op)
                inverse_data[:, dim] = data_np[:, dim] - offset
            else:
                base = lambda_param * data_np[:, dim] + 1
                # Clip to keep the base positive and avoid complex numbers
                base = np.maximum(base, 1e-10)
                inverse_data[:, dim] = base ** (1 / lambda_param) - offset
        return TimeSeriesProcessor.to_torch(inverse_data, is_torch, device, dtype)
class Detrending:
    """
    Exponential detrending of time series: fits a * t**b + c to each
    dimension, removes it, and can restore it on forecasted data.
    """

    @staticmethod
    def exp_model(t, params):
        """
        Exponential (power-law) trend model.

        Args:
            t: Time points (array-like)
            params: Model parameters [a, b, c]

        Returns:
            Model values a * t**b + c
        """
        a, b, c = params
        return a * (t ** b) + c

    @staticmethod
    def fit_objective(params, data):
        """
        Sum-of-squared-errors objective for the exponential trend fit.

        Args:
            params: Model parameters [a, b, c]
            data: 1-D data array to fit

        Returns:
            Sum of squared residuals between data and the model
        """
        t = np.arange(1, len(data) + 1)
        predicted = Detrending.exp_model(t, params)
        return np.sum((data - predicted) ** 2)

    @staticmethod
    def apply_detrending(data):
        """
        Fit and subtract an exponential trend from every dimension.

        Args:
            data: Input data tensor of shape (seq_length, N)

        Returns:
            Tuple of (detrended data, list of per-dimension [a, b, c]
            parameters for ``apply_detrending_inverse``)
        """
        data_np, is_torch, device, dtype = TimeSeriesProcessor.to_numpy(data)
        seq_length, n_dims = data_np.shape
        detrended_data = np.zeros_like(data_np)
        detrending_params = []
        # Time axis is identical for every dimension; hoist it out of the loop
        t = np.arange(1, seq_length + 1)
        for dim in range(n_dims):
            series = data_np[:, dim]
            # Objective bound to this dimension's series (used immediately
            # within the iteration, so lambda late-binding is not an issue)
            objective = lambda params: Detrending.fit_objective(params, series)
            # Start from a flat model anchored at the first observed value
            initial_params = [0.0, 1.0, series[0]]
            # Keep the exponent in a sane range; a and c are unconstrained
            bounds = [(None, None), (0.1, 3.0), (None, None)]
            result = optimize.minimize(
                objective,
                initial_params,
                method='L-BFGS-B',
                bounds=bounds,
                options={
                    'maxiter': 1000,
                    'gtol': 1e-6,
                    'maxfun': 1500,
                    'maxcor': 10,
                },
            )
            # Round for stable, reproducible stored parameters
            optimal_params = np.round(result.x, 3)
            trend = Detrending.exp_model(t, optimal_params)
            detrended_data[:, dim] = series - trend
            detrending_params.append(optimal_params)
        return (TimeSeriesProcessor.to_torch(detrended_data, is_torch, device, dtype),
                detrending_params)

    @staticmethod
    def apply_detrending_inverse(context, data, detrending_params):
        """
        Restore the exponential trend on forecasted data.

        Args:
            context: Original context data (used only for its length, to place
                the forecast on the time axis after the context window)
            data: Forecasted (detrended) data of shape (forecast_length, N)
            detrending_params: Per-dimension parameters from ``apply_detrending``

        Returns:
            Forecasted data with the trend added back
        """
        data_np, is_torch, device, dtype = TimeSeriesProcessor.to_numpy(data)
        context_np, _, _, _ = TimeSeriesProcessor.to_numpy(context)
        forecast_length, n_dims = data_np.shape
        context_length = len(context_np)
        # Forecast time points continue directly after the context window
        t = np.arange(context_length + 1, context_length + forecast_length + 1)
        for dim in range(min(n_dims, len(detrending_params))):
            trend = Detrending.exp_model(t, detrending_params[dim])
            data_np[:, dim] = data_np[:, dim] + trend
        return TimeSeriesProcessor.to_torch(data_np, is_torch, device, dtype)
def estimate_initial_condition(initial_x, context_embedded):
    """
    Estimate a full initial condition from a partial observation by copying
    the unobserved coordinates from the most similar context state.

    Args:
        initial_x: Partial initial condition of shape (N_partial,)
        context_embedded: Context data of shape (seq_length, N)

    Returns:
        Complete initial condition of shape (N,)
    """
    T, N = context_embedded.shape
    N_partial = initial_x.shape[0]
    assert N_partial <= N, "Initial condition dimension must be <= embedding dimension"
    # Squared Euclidean distance to every context state over the observed
    # coordinates, computed in one vectorized pass (was a Python loop over T)
    distances = torch.sum((context_embedded[:, :N_partial] - initial_x) ** 2, dim=1)
    closest_t = torch.argmin(distances)
    # Keep the observed part, borrow the rest from the closest context state
    return torch.cat([initial_x, context_embedded[closest_t, N_partial:]])