""" 시계열 데이터 처리 유틸리티 """ import numpy as np from scipy.interpolate import PchipInterpolator from joblib import Parallel, delayed def calculate_time_derivative(x, dt=None, smooth=False): """ 시퀀스 데이터의 시간 도함수 계산 (중앙 차분법) dt: 단일 값 또는 각 시점간 시간 간격을 담은 배열 """ if len(x) <= 1: return np.zeros_like(x) dx = np.zeros_like(x) # dt가 배열인지 확인 is_dt_array = isinstance(dt, (list, np.ndarray)) and len(dt) > 1 # 내부 포인트 if x.shape[0] > 2: if is_dt_array: for i in range(1, len(x)-1): dt_prev = dt[i-1] dt_next = dt[i] if i < len(dt) else 1.0 total_dt = dt_prev + dt_next # 가중 중앙 차분법 if total_dt > 0: w_prev = dt_next / total_dt # 이전 값 가중치 w_next = dt_prev / total_dt # 다음 값 가중치 dx[i] = (w_next * (x[i+1] - x[i]) / dt_next - w_prev * (x[i] - x[i-1]) / dt_prev) else: dt_val = 1.0 if dt is None else dt dx[1:-1] = (x[2:] - x[:-2]) / (2.0 * dt_val) # 경계 포인트 if is_dt_array: # 시작점 dt_first = dt[0] if len(dt) > 0 else 1.0 dx[0] = (x[1] - x[0]) / dt_first # 끝점 dt_last = dt[-1] if len(dt) > 0 else 1.0 dx[-1] = (x[-1] - x[-2]) / dt_last else: # 일정한 시간 간격 dt_val = 1.0 if dt is None else dt dx[0] = (x[1] - x[0]) / dt_val dx[-1] = (x[-1] - x[-2]) / dt_val # 선택적 스무딩 (노이즈 감소) if smooth and len(x) > 3: kernel = np.array([0.25, 0.5, 0.25]) dx[1:-1] = np.convolve(dx, kernel, mode='same')[1:-1] return dx def hermite_cubic_spline(data, n_interpolation_points=10, time_points=None): """ SciPy의 PCHIP 보간기를 사용한 최적화된 스플라인 보간 """ # 시간축 설정 if time_points is not None: # 실제 누적 시간 사용 original_times = time_points # 균일한 간격으로 보간 지점 생성 total_time = original_times[-1] - original_times[0] interp_times = np.linspace(original_times[0], original_times[-1], int(total_time * n_interpolation_points)) else: # 균일한 인덱스 사용 original_times = np.arange(len(data)) interp_times = np.linspace(0, len(data)-1, (len(data)-1)*n_interpolation_points + 1) # 헬퍼 함수 정의 def interpolate_column(col): return PchipInterpolator(original_times, data[:, col])(interp_times) # 병렬 처리 interpolated_data = Parallel(n_jobs=-1)( delayed(interpolate_column)(col) for col in range(data.shape[1]) ) return np.column_stack(interpolated_data), interp_times