ostock-backend / model /src /data /time_utils.py
johnaness's picture
Deploy OStock FastAPI backend to HF Space (Docker SDK, port 7860)
4be2d4d
"""
์‹œ๊ณ„์—ด ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์œ ํ‹ธ๋ฆฌํ‹ฐ
"""
import numpy as np
from scipy.interpolate import PchipInterpolator
from joblib import Parallel, delayed
def calculate_time_derivative(x, dt=None, smooth=False):
"""
์‹œํ€€์Šค ๋ฐ์ดํ„ฐ์˜ ์‹œ๊ฐ„ ๋„ํ•จ์ˆ˜ ๊ณ„์‚ฐ (์ค‘์•™ ์ฐจ๋ถ„๋ฒ•)
dt: ๋‹จ์ผ ๊ฐ’ ๋˜๋Š” ๊ฐ ์‹œ์ ๊ฐ„ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์„ ๋‹ด์€ ๋ฐฐ์—ด
"""
if len(x) <= 1:
return np.zeros_like(x)
dx = np.zeros_like(x)
# dt๊ฐ€ ๋ฐฐ์—ด์ธ์ง€ ํ™•์ธ
is_dt_array = isinstance(dt, (list, np.ndarray)) and len(dt) > 1
# ๋‚ด๋ถ€ ํฌ์ธํŠธ
if x.shape[0] > 2:
if is_dt_array:
for i in range(1, len(x)-1):
dt_prev = dt[i-1]
dt_next = dt[i] if i < len(dt) else 1.0
total_dt = dt_prev + dt_next
# ๊ฐ€์ค‘ ์ค‘์•™ ์ฐจ๋ถ„๋ฒ•
if total_dt > 0:
w_prev = dt_next / total_dt # ์ด์ „ ๊ฐ’ ๊ฐ€์ค‘์น˜
w_next = dt_prev / total_dt # ๋‹ค์Œ ๊ฐ’ ๊ฐ€์ค‘์น˜
dx[i] = (w_next * (x[i+1] - x[i]) / dt_next -
w_prev * (x[i] - x[i-1]) / dt_prev)
else:
dt_val = 1.0 if dt is None else dt
dx[1:-1] = (x[2:] - x[:-2]) / (2.0 * dt_val)
# ๊ฒฝ๊ณ„ ํฌ์ธํŠธ
if is_dt_array:
# ์‹œ์ž‘์ 
dt_first = dt[0] if len(dt) > 0 else 1.0
dx[0] = (x[1] - x[0]) / dt_first
# ๋์ 
dt_last = dt[-1] if len(dt) > 0 else 1.0
dx[-1] = (x[-1] - x[-2]) / dt_last
else:
# ์ผ์ •ํ•œ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ
dt_val = 1.0 if dt is None else dt
dx[0] = (x[1] - x[0]) / dt_val
dx[-1] = (x[-1] - x[-2]) / dt_val
# ์„ ํƒ์  ์Šค๋ฌด๋”ฉ (๋…ธ์ด์ฆˆ ๊ฐ์†Œ)
if smooth and len(x) > 3:
kernel = np.array([0.25, 0.5, 0.25])
dx[1:-1] = np.convolve(dx, kernel, mode='same')[1:-1]
return dx
def hermite_cubic_spline(data, n_interpolation_points=10, time_points=None):
"""
SciPy์˜ PCHIP ๋ณด๊ฐ„๊ธฐ๋ฅผ ์‚ฌ์šฉํ•œ ์ตœ์ ํ™”๋œ ์Šคํ”Œ๋ผ์ธ ๋ณด๊ฐ„
"""
# ์‹œ๊ฐ„์ถ• ์„ค์ •
if time_points is not None:
# ์‹ค์ œ ๋ˆ„์  ์‹œ๊ฐ„ ์‚ฌ์šฉ
original_times = time_points
# ๊ท ์ผํ•œ ๊ฐ„๊ฒฉ์œผ๋กœ ๋ณด๊ฐ„ ์ง€์  ์ƒ์„ฑ
total_time = original_times[-1] - original_times[0]
interp_times = np.linspace(original_times[0], original_times[-1],
int(total_time * n_interpolation_points))
else:
# ๊ท ์ผํ•œ ์ธ๋ฑ์Šค ์‚ฌ์šฉ
original_times = np.arange(len(data))
interp_times = np.linspace(0, len(data)-1, (len(data)-1)*n_interpolation_points + 1)
# ํ—ฌํผ ํ•จ์ˆ˜ ์ •์˜
def interpolate_column(col):
return PchipInterpolator(original_times, data[:, col])(interp_times)
# ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ
interpolated_data = Parallel(n_jobs=-1)(
delayed(interpolate_column)(col)
for col in range(data.shape[1])
)
return np.column_stack(interpolated_data), interp_times