# OMCP / src /rupture_utils.py
# cameroncameron's picture
# Upload 4 files
# 6782585 verified
from sklearn.preprocessing import StandardScaler
import ruptures as rpt
import pandas as pd
import streamlit as st
def detect_changepoints(
    df: pd.DataFrame,
    algo: str = "pelt",
    model: str = "rbf",
    pen: float = 1.0,
    *,
    width: int = 10,
    n_bkps: int = 5,
) -> pd.DataFrame:
    """Flag change points in the ``value`` column of *df*.

    The series is optionally standardized, passed to a ``ruptures``
    detector, and the detected breakpoints are written to a new 0/1
    ``changepoint`` column on a copy of the frame whose index has been
    reset to ``0..n-1``. The caller's frame is not modified.

    Args:
        df: Frame containing a ``value`` column with the series to analyse.
        algo: Detector to use: ``"pelt"`` (``rpt.Pelt``, driven by *pen*)
            or ``"window"`` (``rpt.Window``, driven by *width* and *n_bkps*).
        model: Cost model name forwarded to the ``ruptures`` detector. For
            ``"rbf"``, ``"l2"`` and ``"normal"`` the series is standardized
            first.
        pen: Penalty passed to ``Pelt.predict``; unused by ``"window"``.
        width: Window width passed to ``rpt.Window``; unused by ``"pelt"``.
        n_bkps: Number of breakpoints requested from ``Window.predict``;
            unused by ``"pelt"``.

    Returns:
        A copy of *df* (index reset) with an added integer ``changepoint``
        column: 1 on the row at position ``bkp - 1`` for every reported
        breakpoint except the final one, 0 elsewhere. An input with no rows
        yields an empty frame with the extra column and no detector call.

    Raises:
        ValueError: If *algo* is neither ``"pelt"`` nor ``"window"``.
        KeyError: If *df* has no ``value`` column.
    """
    # Fail fast on an unsupported algorithm, before any scaling or fitting.
    if algo not in ("pelt", "window"):
        raise ValueError("Unknown algo")

    # reset_index returns a new frame, so the caller's data is untouched and
    # breakpoint positions can be used directly as row labels below.
    df_out = df.reset_index(drop=True)
    y = df_out["value"].values  # raw series
    df_out["changepoint"] = 0

    # Nothing to detect on an empty series; the scaler and the detectors
    # would otherwise raise on zero samples.
    if y.size == 0:
        return df_out

    # Step 1: standardize for the scale-sensitive cost models and keep the
    # 2-D (n_samples, n_features) shape; other models get the raw 1-D array.
    if model in ("rbf", "l2", "normal"):
        signal = StandardScaler().fit_transform(y.reshape(-1, 1))
    else:
        signal = y

    # Step 2: run the selected detector.
    if algo == "pelt":
        result = rpt.Pelt(model=model).fit(signal).predict(pen=pen)
    else:
        detector = rpt.Window(width=width, model=model).fit(signal)
        result = detector.predict(n_bkps=n_bkps)

    # Step 3: mark the change points. The last entry of `result` is the end
    # of the series, so it is skipped. Each remaining breakpoint is a segment
    # end position counted from 1, hence the row at `bkp - 1` is flagged.
    # The bounds check keeps `.loc` from ever appending a new row.
    n_rows = len(df_out)
    marked_rows = [bkp - 1 for bkp in result[:-1] if 0 < bkp <= n_rows]
    df_out.loc[marked_rows, "changepoint"] = 1

    st.write("Changepoint Index:", df_out[df_out["changepoint"] == 1])
    print("Input sequence:", y)
    print("Changepoint detected:", result)
    return df_out