File size: 1,564 Bytes
6782585
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from sklearn.preprocessing import StandardScaler
import ruptures as rpt
import pandas as pd
import streamlit as st

def detect_changepoints(
    df: pd.DataFrame,
    algo: str = "pelt",
    model: str = "rbf",
    pen: float = 1.0,
    *,
    width: int = 10,
    n_bkps: int = 5,
) -> pd.DataFrame:
    """Flag change points in the ``value`` column of *df*.

    The series is optionally standardised, handed to a ``ruptures`` detector,
    and every detected breakpoint is marked in a new 0/1 ``changepoint``
    column on a copy of the input. The input frame itself is not modified.

    Side effects: for non-empty input, the flagged rows are shown through
    ``st.write`` and the raw series plus the raw detector output are printed
    to stdout.

    Args:
        df: Frame with a ``value`` column holding the series. Its index is
            discarded and replaced by 0..n-1 in the returned frame.
        algo: ``"pelt"`` (penalised search, driven by ``pen``) or
            ``"window"`` (sliding window, driven by ``width``/``n_bkps``).
        model: Cost model name forwarded to ``ruptures``. For ``"rbf"``,
            ``"l2"`` and ``"normal"`` the series is standardised first.
        pen: Penalty passed to ``Pelt.predict``; unused by ``"window"``.
        width: Window width passed to ``rpt.Window``; unused by ``"pelt"``.
        n_bkps: Number of breakpoints requested from ``Window.predict``;
            unused by ``"pelt"``.

    Returns:
        A copy of *df* (index reset) with an extra ``changepoint`` column:
        1 on the last row of each detected segment except the final one,
        0 everywhere else. An empty input yields an empty annotated copy.

    Raises:
        ValueError: If ``algo`` is neither ``"pelt"`` nor ``"window"``.
        KeyError: If *df* has no ``value`` column.
    """
    # Fail fast: reject an unsupported algorithm before touching the data.
    if algo not in ("pelt", "window"):
        raise ValueError("Unknown algo")

    # Reset the index to 0..n-1 so the label-based writes below coincide
    # with row positions.
    df = df.reset_index(drop=True)
    y = df["value"].values  # raw series

    df_out = df.copy()
    df_out["changepoint"] = 0

    # Nothing to detect on an empty series; the scaler/detector would only
    # raise on it, so hand back the (empty) annotated copy right away.
    if len(df_out) == 0:
        return df_out

    # Step 1: standardise for the scale-sensitive cost models and make sure
    # the signal is 2-D, i.e. (n_samples, n_features).
    if model in ("rbf", "l2", "normal"):
        signal = StandardScaler().fit_transform(y.reshape(-1, 1))
    else:
        signal = y  # a 1-D array is passed through unchanged

    # Step 2: run the selected detector.
    if algo == "pelt":
        result = rpt.Pelt(model=model).fit(signal).predict(pen=pen)
    else:  # "window" -- validated above
        detector = rpt.Window(width=width, model=model).fit(signal)
        result = detector.predict(n_bkps=n_bkps)

    # Step 3: mark the change points. The detector reports segment END
    # positions (1-based) and its last entry is the end of the series, so
    # that entry is dropped and each remaining one is shifted to a 0-based
    # row. The bounds guard keeps an unexpected out-of-range entry from
    # being written.
    n_rows = len(df_out)
    marks = [bkp - 1 for bkp in result[:-1] if 0 < bkp <= n_rows]
    if marks:
        df_out.loc[marks, "changepoint"] = 1

    st.write("Changepoint Index:", df_out[df_out["changepoint"] == 1])

    print("Input sequence:", y)
    print("Changepoint detected:", result)

    return df_out