File size: 3,061 Bytes
4be2d4d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
์‹œ๊ณ„์—ด ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์œ ํ‹ธ๋ฆฌํ‹ฐ
"""
import numpy as np
from scipy.interpolate import PchipInterpolator
from joblib import Parallel, delayed

def calculate_time_derivative(x, dt=None, smooth=False):
    """
    ์‹œํ€€์Šค ๋ฐ์ดํ„ฐ์˜ ์‹œ๊ฐ„ ๋„ํ•จ์ˆ˜ ๊ณ„์‚ฐ (์ค‘์•™ ์ฐจ๋ถ„๋ฒ•)
    dt: ๋‹จ์ผ ๊ฐ’ ๋˜๋Š” ๊ฐ ์‹œ์ ๊ฐ„ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์„ ๋‹ด์€ ๋ฐฐ์—ด
    """
    if len(x) <= 1:
        return np.zeros_like(x)
    
    dx = np.zeros_like(x)
    
    # dt๊ฐ€ ๋ฐฐ์—ด์ธ์ง€ ํ™•์ธ
    is_dt_array = isinstance(dt, (list, np.ndarray)) and len(dt) > 1
    
    # ๋‚ด๋ถ€ ํฌ์ธํŠธ
    if x.shape[0] > 2:
        if is_dt_array:
            for i in range(1, len(x)-1):
                dt_prev = dt[i-1]
                dt_next = dt[i] if i < len(dt) else 1.0
                total_dt = dt_prev + dt_next
                
                # ๊ฐ€์ค‘ ์ค‘์•™ ์ฐจ๋ถ„๋ฒ•
                if total_dt > 0:
                    w_prev = dt_next / total_dt  # ์ด์ „ ๊ฐ’ ๊ฐ€์ค‘์น˜
                    w_next = dt_prev / total_dt  # ๋‹ค์Œ ๊ฐ’ ๊ฐ€์ค‘์น˜
                    dx[i] = (w_next * (x[i+1] - x[i]) / dt_next - 
                             w_prev * (x[i] - x[i-1]) / dt_prev)
        else:
            dt_val = 1.0 if dt is None else dt
            dx[1:-1] = (x[2:] - x[:-2]) / (2.0 * dt_val)
    
    # ๊ฒฝ๊ณ„ ํฌ์ธํŠธ
    if is_dt_array:
        # ์‹œ์ž‘์ 
        dt_first = dt[0] if len(dt) > 0 else 1.0
        dx[0] = (x[1] - x[0]) / dt_first
        
        # ๋์ 
        dt_last = dt[-1] if len(dt) > 0 else 1.0
        dx[-1] = (x[-1] - x[-2]) / dt_last
    else:
        # ์ผ์ •ํ•œ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ
        dt_val = 1.0 if dt is None else dt
        dx[0] = (x[1] - x[0]) / dt_val
        dx[-1] = (x[-1] - x[-2]) / dt_val
    
    # ์„ ํƒ์  ์Šค๋ฌด๋”ฉ (๋…ธ์ด์ฆˆ ๊ฐ์†Œ)
    if smooth and len(x) > 3:
        kernel = np.array([0.25, 0.5, 0.25])
        dx[1:-1] = np.convolve(dx, kernel, mode='same')[1:-1]
        
    return dx

def hermite_cubic_spline(data, n_interpolation_points=10, time_points=None):
    """
    SciPy์˜ PCHIP ๋ณด๊ฐ„๊ธฐ๋ฅผ ์‚ฌ์šฉํ•œ ์ตœ์ ํ™”๋œ ์Šคํ”Œ๋ผ์ธ ๋ณด๊ฐ„
    """
    # ์‹œ๊ฐ„์ถ• ์„ค์ •
    if time_points is not None:
        # ์‹ค์ œ ๋ˆ„์  ์‹œ๊ฐ„ ์‚ฌ์šฉ
        original_times = time_points
        # ๊ท ์ผํ•œ ๊ฐ„๊ฒฉ์œผ๋กœ ๋ณด๊ฐ„ ์ง€์  ์ƒ์„ฑ
        total_time = original_times[-1] - original_times[0]
        interp_times = np.linspace(original_times[0], original_times[-1], 
                                   int(total_time * n_interpolation_points))
    else:
        # ๊ท ์ผํ•œ ์ธ๋ฑ์Šค ์‚ฌ์šฉ
        original_times = np.arange(len(data))
        interp_times = np.linspace(0, len(data)-1, (len(data)-1)*n_interpolation_points + 1)
    
    # ํ—ฌํผ ํ•จ์ˆ˜ ์ •์˜
    def interpolate_column(col):
        return PchipInterpolator(original_times, data[:, col])(interp_times)
    
    # ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ
    interpolated_data = Parallel(n_jobs=-1)(
        delayed(interpolate_column)(col)
        for col in range(data.shape[1])
    )
    
    return np.column_stack(interpolated_data), interp_times