Spaces:
Build error
Build error
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from scipy.interpolate import interp1d | |
| from scipy.signal import savgol_filter, correlate, find_peaks | |
| def numpy_to_native(data): | |
| if isinstance(data, (np.int64, np.int32)): | |
| return int(data) | |
| elif isinstance(data, (np.float64, np.float32)): | |
| return float(data) | |
| elif isinstance(data, np.ndarray): | |
| return data.tolist() | |
| elif isinstance(data, dict): | |
| return {k: numpy_to_native(v) for k, v in data.items()} | |
| elif isinstance(data, list): | |
| return [numpy_to_native(v) for v in data] | |
| else: | |
| return data | |
| def process_signals(gz_signal, upsample_factor, window_size=40, poly_order=2, peak_distance=2, peak_prominence=1): | |
| smoothed_signal = savgol_filter(gz_signal, window_size, poly_order,mode='interp') | |
| upsampled_smoothed_signal = upsample_signal(smoothed_signal, upsample_factor) | |
| autocorr = correlate(upsampled_smoothed_signal, upsampled_smoothed_signal, mode='full') | |
| autocorr = autocorr[autocorr.size // 2:] | |
| peaks, _ = find_peaks(autocorr, distance=peak_distance, prominence=peak_prominence) | |
| return upsampled_smoothed_signal, peaks | |
| def fill_missing_values(data, window_size=10): | |
| for col in data.columns: | |
| if data[col].isna().any(): | |
| data[col] = data[col].rolling(window=window_size, min_periods=1, center=True).median() | |
| return data | |
| def upsample_signal(signal, upsample_factor): | |
| x = np.arange(signal.size) | |
| interpolator = interp1d(x, signal, kind='quadratic') | |
| x_upsampled = np.linspace(0, signal.size - 1, signal.size * upsample_factor) | |
| return interpolator(x_upsampled) | |
| def upsample_signal_v2(signal, upsample_factor): | |
| x = np.arange(signal.size) | |
| # Calculate the second-order derivative | |
| second_derivative = np.diff(signal, n=2) | |
| # Count the number of non-zero second-order derivatives | |
| non_zero_second_derivatives = np.count_nonzero(second_derivative) | |
| # Choose the interpolation method adaptively | |
| if non_zero_second_derivatives > signal.size * 0.5: | |
| kind = 'quadratic' | |
| elif non_zero_second_derivatives > signal.size * 0.25: | |
| kind = 'cubic' | |
| else: | |
| kind = 'linear' | |
| interpolator = interp1d(x, signal, kind=kind) | |
| x_upsampled = np.linspace(0, signal.size - 1, signal.size * upsample_factor) | |
| return interpolator(x_upsampled) |