import pandas as pd
import ruptures as rpt
import streamlit as st
from sklearn.preprocessing import StandardScaler


def detect_changepoints(
    df: pd.DataFrame,
    algo: str = "pelt",
    model: str = "rbf",
    pen: float = 1.0,
    *,
    width: int = 10,
    n_bkps: int = 5,
) -> pd.DataFrame:
    """Flag changepoints in the ``"value"`` column of *df* using ruptures.

    The series is optionally standardized, segmented with the selected
    ruptures search method, and returned as a copy of *df* (with a fresh
    0-based index) carrying an extra integer ``"changepoint"`` column.
    A row is marked ``1`` when it is the last row of a detected segment,
    i.e. the row at position ``breakpoint - 1``; every other row is ``0``.

    Besides returning the frame, the function reports its findings through
    ``st.write`` (the flagged rows) and ``print`` (the raw series and the
    breakpoint list returned by ruptures).

    Args:
        df: Input frame; must contain a ``"value"`` column.
        algo: Search method, either ``"pelt"`` or ``"window"``.
        model: ruptures cost model name. For ``"rbf"``, ``"l2"`` and
            ``"normal"`` the series is standardized before fitting.
        pen: Penalty passed to ``Pelt.predict``. Not used when
            ``algo == "window"``.
        width: Window width for ``algo == "window"``. Ignored otherwise.
        n_bkps: Number of breakpoints requested for ``algo == "window"``.
            Ignored otherwise.

    Returns:
        A copy of *df* with its index reset and a ``"changepoint"`` column
        added (or overwritten if it already existed).

    Raises:
        ValueError: If *algo* is neither ``"pelt"`` nor ``"window"``.
        KeyError: If *df* has no ``"value"`` column.
    """
    # Fail fast on a bad method name, before any scaling or fitting work.
    if algo not in ("pelt", "window"):
        raise ValueError("Unknown algo")

    # Reset to a 0-based index so the positions reported by ruptures can be
    # used directly as row labels when marking changepoints below.
    df = df.reset_index(drop=True)
    y = df["value"].values  # raw series

    if len(y) == 0:
        # Nothing to segment. StandardScaler rejects zero-sample input, so
        # skip fitting entirely and report no breakpoints.
        result = []
    else:
        # Step 1: standardize for the scale-sensitive cost models.
        if model in ("rbf", "l2", "normal"):
            # Scaler needs a 2-D (n_samples, n_features) array.
            X = StandardScaler().fit_transform(y.reshape(-1, 1))
        else:
            X = y  # a 1-D array is passed through unchanged

        # Step 2: run the selected search method.
        if algo == "pelt":
            result = rpt.Pelt(model=model).fit(X).predict(pen=pen)
        else:  # algo == "window" (validated above)
            detector = rpt.Window(width=width, model=model).fit(X)
            result = detector.predict(n_bkps=n_bkps)

    # Step 3: mark changepoints on a copy so the caller's frame is untouched.
    df_out = df.copy()
    df_out["changepoint"] = 0

    # The last entry of `result` is the end of the series, not a changepoint.
    # Each remaining entry is a segment end; mark the row at `end - 1`.
    # The bounds check keeps an unexpected value from enlarging the frame.
    n_rows = len(df_out)
    marked = [end - 1 for end in result[:-1] if 0 <= end - 1 < n_rows]
    if marked:
        df_out.loc[marked, "changepoint"] = 1

    st.write("Changepoint Index:", df_out[df_out["changepoint"] == 1])
    print("Input sequence:", df["value"].values)
    print("Changepoint detected:", result)

    return df_out