OMCP / src /cp_utils.py
cameroncameron's picture
Upload 4 files
6782585 verified
"""cp_utils.py
Utilities for evaluating changepoint credibility and performing
semi-/supervised classification on changepoints.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from typing import Tuple
from statsmodels.tsa.stattools import adfuller
from prophet import Prophet
from sklearn.semi_supervised import SelfTrainingClassifier
from xgboost import XGBClassifier
# 🔧 新增:添加CatBoost支持
try:
from catboost import CatBoostClassifier
CATBOOST_AVAILABLE = True
except ImportError:
CATBOOST_AVAILABLE = False
print("⚠️ CatBoost not installed. Install with: pip install catboost")
# 1. 评估单栋楼的候选 changepoints —— ProphetDelta + 结构性指标
def _validate_cp_metrics(residual: pd.Series, idx: int, win: int = 6
) -> Tuple[bool, float, float]:
"""在 idx±win 窗口内计算 (slope, adf_p) 并判断是否显著"""
lo = max(0, idx - win)
hi = min(len(residual), idx + win + 1)
seg = residual.iloc[lo:hi].dropna()
if seg.size < 4:
return False, np.nan, np.nan
slope = np.polyfit(range(len(seg)), seg, 1)[0]
p_val = adfuller(seg)[1]
is_valid = (abs(slope) > 0.1) and (p_val > 0.05)
return is_valid, float(slope), float(p_val)
def building_score_changepoints(
summary_df: pd.DataFrame,
filled_df: pd.DataFrame,
building_name: str,
model: str = "rbf",
penalty: float | None = None,
window_size: int = 6,
usage_col: str = "FilledUse",
date_col: str = "Date",
cp_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
"""Return table with Prophet delta & structure metrics for one building."""
from rupture_utils import detect_changepoints
records: list[dict] = []
utilities = (
summary_df.loc[summary_df["BuildingName"] == building_name,
"CommodityCode"].unique()
)
for util in utilities:
df_util = (
filled_df[
(filled_df["BuildingName"] == building_name)
& (filled_df["CommodityCode"] == util)
]
.sort_values(date_col)
.reset_index(drop=True)
)
if df_util[usage_col].isna().any() or len(df_util) < 24:
continue
if cp_df is None:
cp_df = detect_changepoints(
df_util[[date_col, usage_col]].rename(
columns={date_col: "timestamp", usage_col: "value"}),
algo="pelt",
model=model,
pen=penalty or 1.0,
)
if cp_df.empty or cp_df["changepoint"].sum() == 0:
continue
df_p = df_util[[date_col, usage_col]].rename(
columns={date_col: "ds", usage_col: "y"})
m_tmp = Prophet(yearly_seasonality=False, weekly_seasonality=False,
daily_seasonality=False)
m_tmp.fit(df_p)
residual = df_p["y"] - m_tmp.predict(df_p)["yhat"]
validated_dates: list[pd.Timestamp] = []
metrics_map: dict[pd.Timestamp, Tuple[float, float]] = {}
for d in pd.to_datetime(cp_df.loc[cp_df["changepoint"] == 1,
"timestamp"]):
if d not in df_util[date_col].values:
continue
idx = df_util.index[df_util[date_col] == d][0]
is_valid, slope, p_val = _validate_cp_metrics(
residual, idx, win=window_size
)
if is_valid:
validated_dates.append(d)
metrics_map[d] = (slope, p_val)
if not validated_dates:
continue
m = Prophet(changepoints=validated_dates, yearly_seasonality=False)
m.fit(df_p)
deltas = m.params["delta"].mean(axis=0)
for cp, delta in zip(m.changepoints, deltas):
d = pd.to_datetime(cp)
slope, p_val = metrics_map.get(d, (np.nan, np.nan))
records.append(
{
"Building Name": building_name,
"CommodityCode": util,
"Changepoint Date": d,
"ProphetDelta": float(delta),
"slope": float(slope),
"adf_p_value": float(p_val),
}
)
result_df = pd.DataFrame(records)
if not result_df.empty:
result_df["AbsDelta"] = result_df["ProphetDelta"].abs()
return result_df
# 2. 伪标签打标
def label_changepoints_by_structure_signal(
df: pd.DataFrame,
slope_thresh: float = 0.1,
p_thresh: float = 0.05,
) -> pd.DataFrame:
"""Assign pseudo-labels Real / Noise / Unknown based on structure
signals."""
def _assign(row):
s, p = row["slope"], row["adf_p_value"]
if pd.isna(s) or pd.isna(p):
return "Unknown"
if (abs(s) > slope_thresh) and (p > p_thresh):
return "Real"
if (abs(s) < slope_thresh * 0.5) and (p < p_thresh * 0.5):
return "Noise"
return "Unknown"
out = df.copy()
out["Label"] = out.apply(_assign, axis=1)
return out
# 3. 时序衍生特征
def extract_changepoint_features(
cp_df: pd.DataFrame,
filled_df: pd.DataFrame,
usage_col: str = "FilledUse",
date_col: str = "Date",
mean_win: int = 6,
) -> pd.DataFrame:
"""Derive mean diff/ratio and temporal context features for each cp,
并合并holidaycount特征(如有)"""
cp_df = cp_df.copy()
# 🔧 Fix: Ensure proper data types for TimeIndex and Season columns
cp_df["TimeIndex"] = cp_df["Changepoint Date"].dt.month.astype('int64')
# 🔧 Fix: Convert Season to categorical codes to avoid string/numeric
# dtype conflicts
season_mapping = {
6: 0, 7: 0, 8: 0, # Summer = 0
12: 1, 1: 1, 2: 1, # Winter = 1
}
# Other = 2
season_col = cp_df["TimeIndex"].map(season_mapping).fillna(2)
cp_df["Season"] = season_col.astype('int64')
min_dates = filled_df.groupby("BuildingName")[date_col].min().to_dict()
for i, row in cp_df.iterrows():
bld = row["Building Name"]
cp_date = row["Changepoint Date"]
df_bld = (
filled_df[filled_df["BuildingName"] == bld]
.sort_values(date_col)
.reset_index(drop=True)
)
if cp_date not in df_bld[date_col].values:
continue
idx = df_bld.index[df_bld[date_col] == cp_date][0]
before_vals = df_bld[usage_col].iloc[max(0, idx - mean_win): idx]
after_vals = df_bld[usage_col].iloc[idx + 1: idx + mean_win + 1]
before_mean = before_vals.mean() if len(before_vals) else np.nan
after_mean = after_vals.mean() if len(after_vals) else np.nan
diff = after_mean - before_mean if np.isfinite(before_mean) and np.isfinite(after_mean) else np.nan
ratio = after_mean / before_mean if np.isfinite(before_mean) and before_mean != 0 else np.nan
cp_df.at[i, "ΔMeanBefore"] = before_mean
cp_df.at[i, "ΔMeanAfter"] = after_mean
cp_df.at[i, "ΔMeanDiff"] = diff
cp_df.at[i, "ΔMeanRatio"] = ratio
cp_df.at[i, "TimeSinceStart"] = (cp_date - min_dates.get(bld, cp_date)).days
# 🔧 Fix: Ensure all numeric columns have consistent dtypes
numeric_cols = ["ΔMeanBefore", "ΔMeanAfter", "ΔMeanDiff", "ΔMeanRatio",
"TimeSinceStart", "TimeIndex", "Season"]
for col in numeric_cols:
if col in cp_df.columns:
cp_df[col] = pd.to_numeric(cp_df[col], errors='coerce')
if "holidaycount" in filled_df.columns:
# 只保留合并所需的列,避免重复
holiday_df = filled_df[["BuildingName", date_col, "holidaycount"]].drop_duplicates()
holiday_df = holiday_df.rename(
columns={"BuildingName": "Building Name", date_col: "Changepoint Date"}
)
# 🔧 Fix: Ensure holidaycount is numeric
holiday_df["holidaycount"] = pd.to_numeric(
holiday_df["holidaycount"], errors='coerce'
).fillna(0)
cp_df = cp_df.merge(
holiday_df, on=["Building Name", "Changepoint Date"], how="left"
)
# Fill any missing holidaycount values with 0
cp_df["holidaycount"] = cp_df["holidaycount"].fillna(0)
return cp_df
# 4. 半监督模型 (Self-Training XGBoost)
def run_semi_supervised_cp_model(
base_df: pd.DataFrame,
k_best: int = 10,
feature_cols: list[str] | None = None,
xgb_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
"""Return preds_df (with Predicted) and simple stats."""
if feature_cols is None:
feature_cols = [
"AbsDelta",
"slope",
"ΔMeanDiff",
"ΔMeanRatio",
"TimeSinceStart",
'holidaycount'
]
if xgb_params is None:
xgb_params = {
"max_depth": 3,
"learning_rate": 0.1,
"n_estimators": 200,
"subsample": 0.8,
"colsample_bytree": 0.8,
"objective": "binary:logistic",
"eval_metric": "logloss",
"verbosity": 0,
}
df = base_df.copy()
y = np.full(len(df), -1, dtype=int)
y[df["Label"] == "Real"] = 1
y[df["Label"] == "Noise"] = 0
X = df[feature_cols].fillna(0).values
base_clf = XGBClassifier(**xgb_params)
clf = SelfTrainingClassifier(
base_estimator=base_clf,
criterion="k_best",
k_best=k_best
)
unique_labels_in_y = np.unique(y[y != -1])
if len(unique_labels_in_y) < 2 and len(unique_labels_in_y) > 0:
print(f"Initial pseudo-labels only contain one class: "
f"{unique_labels_in_y}. Self-training may not be effective "
f"or may fail. Predictions might be skewed or based on "
f"initial labels only.")
try:
clf.fit(X, y)
trans = clf.transduction_
except ValueError as e:
print(f"SelfTrainingClassifier.fit error: {e}. This might be "
f"due to homogenous initial labels (e.g., all 'Real' "
f"or all 'Noise').")
trans = y.copy()
elif len(unique_labels_in_y) == 0:
print("No initial pseudo-labels (Real/Noise) found. "
"Self-training cannot proceed. All will be 'Unknown'.")
trans = y
else:
clf.fit(X, y)
trans = clf.transduction_
# 🔧 Fix: More robust dtype handling for np.select
# Ensure trans is integer type and handle any potential issues
trans = np.asarray(trans, dtype=int)
# 🔧 Fix: Alternative approach - using pandas map for safer type handling
# Create mapping dict and use pandas functionality instead of np.select
label_map = {1: "Real", 0: "Noise", -1: "Unknown"}
# Convert to pandas Series for safer dtype handling
trans_series = pd.Series(trans)
predicted_labels = trans_series.map(label_map).fillna("Unknown")
# Assign to dataframe with explicit dtype specification
df = df.copy() # Ensure we work with a clean copy
df["Predicted"] = predicted_labels.astype(str)
stats = df["Predicted"].value_counts(dropna=False).to_dict()
stats["k_best"] = k_best
return df, stats
# 🔧 新增:CatBoost版本的半监督模型
def run_semi_supervised_cp_model_catboost(
base_df: pd.DataFrame,
k_best: int = 10,
feature_cols: list[str] | None = None,
catboost_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
"""
CatBoost版本的半监督变点分类模型
Args:
base_df: 包含特征和标签的数据框
k_best: SelfTrainingClassifier的k_best参数
feature_cols: 特征列名列表
catboost_params: CatBoost参数字典
Returns:
预测结果数据框和统计信息
"""
if not CATBOOST_AVAILABLE:
raise ImportError("CatBoost not available. Install with: "
"pip install catboost")
if feature_cols is None:
feature_cols = [
"AbsDelta",
"slope",
"ΔMeanDiff",
"ΔMeanRatio",
"TimeSinceStart",
'holidaycount'
]
if catboost_params is None:
catboost_params = {
"depth": 3,
"learning_rate": 0.1,
"iterations": 200,
"colsample_bylevel": 0.8,
"loss_function": "Logloss",
"eval_metric": "Logloss",
"verbose": False,
"allow_writing_files": False,
"bootstrap_type": "Bayesian", # Better for small samples
}
df = base_df.copy()
y = np.full(len(df), -1, dtype=int)
y[df["Label"] == "Real"] = 1
y[df["Label"] == "Noise"] = 0
X = df[feature_cols].fillna(0).values
# 🎯 使用CatBoost替代XGBoost
base_clf = CatBoostClassifier(**catboost_params)
clf = SelfTrainingClassifier(
base_estimator=base_clf,
criterion="k_best",
k_best=k_best
)
unique_labels_in_y = np.unique(y[y != -1])
if len(unique_labels_in_y) < 2 and len(unique_labels_in_y) > 0:
print(f"Initial pseudo-labels only contain one class: "
f"{unique_labels_in_y}. Self-training may not be effective "
f"or may fail. Predictions might be skewed or based on "
f"initial labels only.")
try:
clf.fit(X, y)
trans = clf.transduction_
except ValueError as e:
print(f"SelfTrainingClassifier.fit error: {e}. This might be "
f"due to homogenous initial labels (e.g., all 'Real' "
f"or all 'Noise').")
trans = y.copy()
elif len(unique_labels_in_y) == 0:
print("No initial pseudo-labels (Real/Noise) found. "
"Self-training cannot proceed. All will be 'Unknown'.")
trans = y
else:
clf.fit(X, y)
trans = clf.transduction_
# 🔧 Fix: Same robust dtype handling as XGBoost version
trans = np.asarray(trans, dtype=int)
# Use pandas map for safer type handling
label_map = {1: "Real", 0: "Noise", -1: "Unknown"}
trans_series = pd.Series(trans)
predicted_labels = trans_series.map(label_map).fillna("Unknown")
# Assign to dataframe with explicit dtype specification
df = df.copy()
df["Predicted"] = predicted_labels.astype(str)
stats = df["Predicted"].value_counts(dropna=False).to_dict()
stats["k_best"] = k_best
stats["model_type"] = "CatBoost"
return df, stats
# 🔧 新增:统一的模型选择函数
def run_semi_supervised_cp_model_unified(
base_df: pd.DataFrame,
k_best: int = 10,
feature_cols: list[str] | None = None,
model_type: str = "xgboost",
model_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
"""
统一的半监督变点分类模型接口,支持XGBoost和CatBoost
Args:
base_df: 包含特征和标签的数据框
k_best: SelfTrainingClassifier的k_best参数
feature_cols: 特征列名列表
model_type: 模型类型,"xgboost" 或 "catboost"
model_params: 模型参数字典
Returns:
预测结果数据框和统计信息
"""
if model_type.lower() == "catboost":
return run_semi_supervised_cp_model_catboost(
base_df, k_best, feature_cols, model_params
)
elif model_type.lower() == "xgboost":
return run_semi_supervised_cp_model(
base_df, k_best, feature_cols, model_params
)
else:
raise ValueError(f"Unsupported model_type: {model_type}. "
f"Choose from 'xgboost' or 'catboost'")