Spaces:
No application file
No application file
| from sklearn.preprocessing import StandardScaler | |
| import ruptures as rpt | |
| import pandas as pd | |
| import streamlit as st | |
| def detect_changepoints( | |
| df: pd.DataFrame, | |
| algo: str = "pelt", | |
| model: str = "rbf", | |
| pen: float = 1.0, | |
| ) -> pd.DataFrame: | |
| # 0. 确保索引从 0 开始,避免后续按位置赋值出错 | |
| df = df.reset_index(drop=True) | |
| y = df["value"].values # 原始序列 | |
| # Step 1: 标准化(对 rbf/l2 强烈建议) | |
| if model in ["rbf", "l2", "normal"]: | |
| # 先标准化再保证二维形状 (n_samples, n_features) | |
| y_scaled = StandardScaler().fit_transform(y.reshape(-1, 1)) | |
| X = y_scaled # shape (n,1) | |
| else: | |
| X = y # 1-D 数组即可 | |
| # Step 2: 检测变点 | |
| if algo == "pelt": | |
| algo_obj = rpt.Pelt(model=model).fit(X) | |
| result = algo_obj.predict(pen=pen) | |
| elif algo == "window": | |
| algo_obj = rpt.Window(width=10, model=model).fit(X) | |
| result = algo_obj.predict(n_bkps=5) | |
| else: | |
| raise ValueError("Unknown algo") | |
| # Step 3: 标注变点 | |
| df_out = df.copy() | |
| df_out["changepoint"] = 0 | |
| for idx in result[:-1]: # 最后一个是序列终点 | |
| # ruptures 返回的 idx 是段结束位置(从 1 开始计数); | |
| # 直接按位置写入即可 | |
| if idx - 1 < len(df_out): | |
| df_out.loc[idx - 1, "changepoint"] = 1 | |
| st.write("Changepoint Index:", df_out[df_out["changepoint"] == 1]) | |
| print("Input sequence:", df["value"].values) | |
| print("Changepoint detected:", result) | |
| return df_out |