Spaces:
Sleeping
Sleeping
File size: 3,061 Bytes
4be2d4d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | """
์๊ณ์ด ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ ํธ๋ฆฌํฐ
"""
import numpy as np
from scipy.interpolate import PchipInterpolator
from joblib import Parallel, delayed
def calculate_time_derivative(x, dt=None, smooth=False):
"""
์ํ์ค ๋ฐ์ดํฐ์ ์๊ฐ ๋ํจ์ ๊ณ์ฐ (์ค์ ์ฐจ๋ถ๋ฒ)
dt: ๋จ์ผ ๊ฐ ๋๋ ๊ฐ ์์ ๊ฐ ์๊ฐ ๊ฐ๊ฒฉ์ ๋ด์ ๋ฐฐ์ด
"""
if len(x) <= 1:
return np.zeros_like(x)
dx = np.zeros_like(x)
# dt๊ฐ ๋ฐฐ์ด์ธ์ง ํ์ธ
is_dt_array = isinstance(dt, (list, np.ndarray)) and len(dt) > 1
# ๋ด๋ถ ํฌ์ธํธ
if x.shape[0] > 2:
if is_dt_array:
for i in range(1, len(x)-1):
dt_prev = dt[i-1]
dt_next = dt[i] if i < len(dt) else 1.0
total_dt = dt_prev + dt_next
# ๊ฐ์ค ์ค์ ์ฐจ๋ถ๋ฒ
if total_dt > 0:
w_prev = dt_next / total_dt # ์ด์ ๊ฐ ๊ฐ์ค์น
w_next = dt_prev / total_dt # ๋ค์ ๊ฐ ๊ฐ์ค์น
dx[i] = (w_next * (x[i+1] - x[i]) / dt_next -
w_prev * (x[i] - x[i-1]) / dt_prev)
else:
dt_val = 1.0 if dt is None else dt
dx[1:-1] = (x[2:] - x[:-2]) / (2.0 * dt_val)
# ๊ฒฝ๊ณ ํฌ์ธํธ
if is_dt_array:
# ์์์
dt_first = dt[0] if len(dt) > 0 else 1.0
dx[0] = (x[1] - x[0]) / dt_first
# ๋์
dt_last = dt[-1] if len(dt) > 0 else 1.0
dx[-1] = (x[-1] - x[-2]) / dt_last
else:
# ์ผ์ ํ ์๊ฐ ๊ฐ๊ฒฉ
dt_val = 1.0 if dt is None else dt
dx[0] = (x[1] - x[0]) / dt_val
dx[-1] = (x[-1] - x[-2]) / dt_val
# ์ ํ์ ์ค๋ฌด๋ฉ (๋
ธ์ด์ฆ ๊ฐ์)
if smooth and len(x) > 3:
kernel = np.array([0.25, 0.5, 0.25])
dx[1:-1] = np.convolve(dx, kernel, mode='same')[1:-1]
return dx
def hermite_cubic_spline(data, n_interpolation_points=10, time_points=None):
"""
SciPy์ PCHIP ๋ณด๊ฐ๊ธฐ๋ฅผ ์ฌ์ฉํ ์ต์ ํ๋ ์คํ๋ผ์ธ ๋ณด๊ฐ
"""
# ์๊ฐ์ถ ์ค์
if time_points is not None:
# ์ค์ ๋์ ์๊ฐ ์ฌ์ฉ
original_times = time_points
# ๊ท ์ผํ ๊ฐ๊ฒฉ์ผ๋ก ๋ณด๊ฐ ์ง์ ์์ฑ
total_time = original_times[-1] - original_times[0]
interp_times = np.linspace(original_times[0], original_times[-1],
int(total_time * n_interpolation_points))
else:
# ๊ท ์ผํ ์ธ๋ฑ์ค ์ฌ์ฉ
original_times = np.arange(len(data))
interp_times = np.linspace(0, len(data)-1, (len(data)-1)*n_interpolation_points + 1)
# ํฌํผ ํจ์ ์ ์
def interpolate_column(col):
return PchipInterpolator(original_times, data[:, col])(interp_times)
# ๋ณ๋ ฌ ์ฒ๋ฆฌ
interpolated_data = Parallel(n_jobs=-1)(
delayed(interpolate_column)(col)
for col in range(data.shape[1])
)
return np.column_stack(interpolated_data), interp_times |