"""cp_utils.py

Utilities for evaluating changepoint credibility and performing
semi-/supervised classification on changepoints.
"""
from __future__ import annotations

import numpy as np
import pandas as pd
from typing import Tuple
from statsmodels.tsa.stattools import adfuller
from prophet import Prophet
from sklearn.semi_supervised import SelfTrainingClassifier
from xgboost import XGBClassifier

# Optional CatBoost support. CATBOOST_AVAILABLE lets the CatBoost-based
# classifier fail with a clear error instead of breaking module import.
try:
    from catboost import CatBoostClassifier
    CATBOOST_AVAILABLE = True
except ImportError:
    CATBOOST_AVAILABLE = False
    print("⚠️ CatBoost not installed. Install with: pip install catboost")


# 1. Score one building's candidate changepoints: Prophet delta + structural
#    metrics.


def _validate_cp_metrics(
    residual: pd.Series,
    idx: int,
    win: int = 6,
    slope_thresh: float = 0.1,
    p_thresh: float = 0.05,
) -> Tuple[bool, float, float]:
    """Compute (slope, ADF p-value) of ``residual`` in ``idx ± win`` and judge it.

    Args:
        residual: Residual series; the window is taken positionally (``iloc``).
        idx: Position of the candidate changepoint in ``residual``.
        win: Half-width of the window around ``idx``.
        slope_thresh: Minimum absolute linear-trend slope for a valid point.
        p_thresh: The ADF p-value must exceed this (stationarity is *not*
            established) for a valid point.

    Returns:
        ``(is_valid, slope, p_value)``. ``(False, nan, nan)`` when fewer than
        four non-missing values fall in the window; ``p_value`` is NaN (and
        ``is_valid`` False) when ``adfuller`` cannot be computed on the window.
    """
    lo = max(0, idx - win)
    hi = min(len(residual), idx + win + 1)
    seg = residual.iloc[lo:hi].dropna()
    if seg.size < 4:
        return False, np.nan, np.nan

    slope = float(np.polyfit(np.arange(seg.size), seg.to_numpy(dtype=float), 1)[0])
    try:
        p_val = float(adfuller(seg)[1])
    except (ValueError, np.linalg.LinAlgError):
        # adfuller rejects e.g. constant or too-short windows; treat such a
        # window as not significant instead of aborting the whole building.
        return False, slope, np.nan

    is_valid = (abs(slope) > slope_thresh) and (p_val > p_thresh)
    return is_valid, slope, p_val


def building_score_changepoints(
    summary_df: pd.DataFrame,
    filled_df: pd.DataFrame,
    building_name: str,
    model: str = "rbf",
    penalty: float | None = None,
    window_size: int = 6,
    usage_col: str = "FilledUse",
    date_col: str = "Date",
    cp_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Return Prophet deltas and structural metrics for one building.

    For every ``CommodityCode`` listed for ``building_name`` in
    ``summary_df``, the matching rows of ``filled_df`` are sorted by
    ``date_col``. Series with any missing ``usage_col`` value or fewer than
    24 rows are skipped. Candidate changepoints are kept only if
    ``_validate_cp_metrics`` accepts them on the residual of a Prophet fit
    with all seasonalities disabled. A second Prophet model with those
    explicit changepoints then supplies one ``delta`` per changepoint.

    Args:
        summary_df: Must contain ``BuildingName`` and ``CommodityCode``.
        filled_df: Must contain ``BuildingName``, ``CommodityCode``,
            ``date_col`` and ``usage_col``.
        building_name: Building to score.
        model: Cost model forwarded to ``detect_changepoints``.
        penalty: PELT penalty forwarded to ``detect_changepoints``;
            ``None`` means 1.0.
        window_size: Half-width of the residual validation window.
        usage_col: Usage value column in ``filled_df``.
        date_col: Date column in ``filled_df``.
        cp_df: Optional precomputed candidates with ``timestamp`` and
            ``changepoint`` (1 = candidate) columns. When given it is used
            as-is for every utility of the building; when ``None`` the
            candidates are detected separately for each utility.

    Returns:
        One row per Prophet changepoint with columns ``Building Name``,
        ``CommodityCode``, ``Changepoint Date``, ``ProphetDelta``, ``slope``,
        ``adf_p_value`` and ``AbsDelta``; an empty DataFrame without columns
        when nothing validates.
    """
    from rupture_utils import detect_changepoints

    records: list[dict] = []
    utilities = summary_df.loc[
        summary_df["BuildingName"] == building_name, "CommodityCode"
    ].unique()

    for util in utilities:
        df_util = (
            filled_df[
                (filled_df["BuildingName"] == building_name)
                & (filled_df["CommodityCode"] == util)
            ]
            .sort_values(date_col)
            .reset_index(drop=True)
        )
        if df_util[usage_col].isna().any() or len(df_util) < 24:
            continue

        # Candidates must come from *this* utility's series: keeping one
        # detection result across iterations would apply the first
        # utility's changepoints to every later utility.
        if cp_df is not None:
            util_cp = cp_df
        else:
            util_cp = detect_changepoints(
                df_util[[date_col, usage_col]].rename(
                    columns={date_col: "timestamp", usage_col: "value"}
                ),
                algo="pelt",
                model=model,
                # `penalty or 1.0` would silently replace an explicit 0.0.
                pen=penalty if penalty is not None else 1.0,
            )
        if util_cp.empty or util_cp["changepoint"].sum() == 0:
            continue

        df_p = df_util[[date_col, usage_col]].rename(
            columns={date_col: "ds", usage_col: "y"}
        )
        # Fit without seasonal components; its residual is what the
        # structural checks inspect.
        trend_model = Prophet(
            yearly_seasonality=False,
            weekly_seasonality=False,
            daily_seasonality=False,
        )
        trend_model.fit(df_p)
        residual = df_p["y"] - trend_model.predict(df_p)["yhat"]

        validated_dates: list[pd.Timestamp] = []
        metrics_map: dict[pd.Timestamp, Tuple[float, float]] = {}
        candidates = pd.to_datetime(
            util_cp.loc[util_cp["changepoint"] == 1, "timestamp"]
        )
        for d in candidates:
            hits = df_util.index[df_util[date_col] == d]
            if len(hits) == 0:
                continue
            # df_util has a fresh RangeIndex, so the label is also the
            # position of this date in `residual`.
            is_valid, slope, p_val = _validate_cp_metrics(
                residual, hits[0], win=window_size
            )
            if is_valid:
                validated_dates.append(d)
                metrics_map[d] = (slope, p_val)

        if not validated_dates:
            continue

        m = Prophet(changepoints=validated_dates, yearly_seasonality=False)
        m.fit(df_p)
        # Mean over axis 0 of the delta parameter -> one value per
        # changepoint (assumes Prophet's (draws, changepoints) layout).
        deltas = m.params["delta"].mean(axis=0)

        for cp, delta in zip(m.changepoints, deltas):
            d = pd.to_datetime(cp)
            slope, p_val = metrics_map.get(d, (np.nan, np.nan))
            records.append(
                {
                    "Building Name": building_name,
                    "CommodityCode": util,
                    "Changepoint Date": d,
                    "ProphetDelta": float(delta),
                    "slope": float(slope),
                    "adf_p_value": float(p_val),
                }
            )

    result_df = pd.DataFrame(records)
    if not result_df.empty:
        result_df["AbsDelta"] = result_df["ProphetDelta"].abs()
    return result_df


# 2. Pseudo-labelling


def label_changepoints_by_structure_signal(
    df: pd.DataFrame,
    slope_thresh: float = 0.1,
    p_thresh: float = 0.05,
) -> pd.DataFrame:
    """Assign pseudo-labels Real / Noise / Unknown from structural signals.

    A row is labelled

    * ``Unknown`` if ``slope`` or ``adf_p_value`` is missing,
    * ``Real`` if ``|slope| > slope_thresh`` and ``adf_p_value > p_thresh``,
    * ``Noise`` if ``|slope| < slope_thresh / 2`` and
      ``adf_p_value < p_thresh / 2``,
    * ``Unknown`` otherwise.

    Returns:
        A copy of ``df`` with an added ``Label`` column.
    """
    out = df.copy()
    if out.empty:
        # Nothing to label; this also covers the column-less frame that
        # building_score_changepoints returns when no changepoint validates.
        out["Label"] = pd.Series(dtype=object)
        return out

    slope = out["slope"].to_numpy(dtype=float, na_value=np.nan)
    p_val = out["adf_p_value"].to_numpy(dtype=float, na_value=np.nan)
    known = ~(np.isnan(slope) | np.isnan(p_val))
    abs_slope = np.abs(slope)
    real = known & (abs_slope > slope_thresh) & (p_val > p_thresh)
    noise = known & (abs_slope < slope_thresh * 0.5) & (p_val < p_thresh * 0.5)
    out["Label"] = np.where(real, "Real", np.where(noise, "Noise", "Unknown"))
    return out
# 3. Time-series derived features


def _window_mean_features(
    values: pd.Series, idx: int, mean_win: int
) -> Tuple[float, float, float, float]:
    """Return (before_mean, after_mean, diff, ratio) around position ``idx``.

    Up to ``mean_win`` values on each side are averaged; the value at
    ``idx`` itself is excluded from both windows.
    """
    before_vals = values.iloc[max(0, idx - mean_win): idx]
    after_vals = values.iloc[idx + 1: idx + mean_win + 1]
    before_mean = before_vals.mean() if len(before_vals) else np.nan
    after_mean = after_vals.mean() if len(after_vals) else np.nan
    both_finite = np.isfinite(before_mean) and np.isfinite(after_mean)
    diff = after_mean - before_mean if both_finite else np.nan
    # The ratio only needs a usable denominator; a NaN numerator propagates.
    ratio = (
        after_mean / before_mean
        if np.isfinite(before_mean) and before_mean != 0
        else np.nan
    )
    return before_mean, after_mean, diff, ratio


def _attach_holidaycount(
    cp_df: pd.DataFrame, filled_df: pd.DataFrame, date_col: str
) -> pd.DataFrame:
    """Left-merge a numeric ``holidaycount`` from ``filled_df`` onto ``cp_df``.

    Counts are keyed by (building, date). Repeated (building, date) rows in
    ``filled_df`` (for example one per utility) are collapsed with ``max``,
    so the merge cannot duplicate changepoint rows. Missing counts become 0.
    """
    holiday_df = filled_df[["BuildingName", date_col, "holidaycount"]].rename(
        columns={"BuildingName": "Building Name", date_col: "Changepoint Date"}
    )
    holiday_df["holidaycount"] = pd.to_numeric(
        holiday_df["holidaycount"], errors="coerce"
    ).fillna(0)
    holiday_df = holiday_df.groupby(
        ["Building Name", "Changepoint Date"], as_index=False, dropna=False
    )["holidaycount"].max()
    # Replace rather than collide with an existing column, which would
    # otherwise produce holidaycount_x / holidaycount_y.
    merged = cp_df.drop(columns="holidaycount", errors="ignore").merge(
        holiday_df, on=["Building Name", "Changepoint Date"], how="left"
    )
    merged["holidaycount"] = merged["holidaycount"].fillna(0)
    return merged


def extract_changepoint_features(
    cp_df: pd.DataFrame,
    filled_df: pd.DataFrame,
    usage_col: str = "FilledUse",
    date_col: str = "Date",
    mean_win: int = 6,
) -> pd.DataFrame:
    """Derive mean-shift and temporal-context features for each changepoint.

    For every row of ``cp_df``, the usage series for the same building is
    taken from ``filled_df`` and sorted by ``date_col``. When both frames
    carry ``CommodityCode``, the series is also restricted to the same
    utility. Around the row whose date equals ``Changepoint Date`` it
    computes:

    * ``ΔMeanBefore`` and ``ΔMeanAfter``: means of up to ``mean_win``
      values on each side, with the changepoint itself excluded;
    * ``ΔMeanDiff`` and ``ΔMeanRatio``: their difference and ratio;
    * ``TimeSinceStart``: days since the building's first date in
      ``filled_df``.

    Rows whose date is not found keep NaN for these five features.

    Also adds ``TimeIndex`` (calendar month) and ``Season`` (0 = Jun-Aug,
    1 = Dec-Feb, 2 = other). When ``filled_df`` has a ``holidaycount``
    column, that count is merged in as well.

    Returns:
        A new DataFrame; ``cp_df`` is not modified. When ``holidaycount`` is
        merged the result has a fresh RangeIndex.
    """
    cp_df = cp_df.copy()
    cp_df["Changepoint Date"] = pd.to_datetime(cp_df["Changepoint Date"])

    cp_df["TimeIndex"] = cp_df["Changepoint Date"].dt.month.astype("int64")
    # Integer season codes avoid string/numeric dtype conflicts downstream.
    season_mapping = {
        6: 0, 7: 0, 8: 0,    # summer
        12: 1, 1: 1, 2: 1,   # winter
    }                        # everything else -> 2
    cp_df["Season"] = (
        cp_df["TimeIndex"].map(season_mapping).fillna(2).astype("int64")
    )

    by_commodity = (
        "CommodityCode" in cp_df.columns and "CommodityCode" in filled_df.columns
    )
    min_dates = filled_df.groupby("BuildingName")[date_col].min().to_dict()
    series_cache: dict = {}

    def _series_for(bld, util) -> pd.DataFrame:
        # Filter/sort each (building, utility) series once, not once per row.
        key = (bld, util)
        if key not in series_cache:
            mask = filled_df["BuildingName"] == bld
            if by_commodity:
                mask = mask & (filled_df["CommodityCode"] == util)
            series_cache[key] = (
                filled_df.loc[mask].sort_values(date_col).reset_index(drop=True)
            )
        return series_cache[key]

    n_rows = len(cp_df)
    feature_names = (
        "ΔMeanBefore", "ΔMeanAfter", "ΔMeanDiff", "ΔMeanRatio", "TimeSinceStart"
    )
    # Pre-filled with NaN so every feature column exists even if no
    # changepoint date is found in filled_df.
    features = {name: [np.nan] * n_rows for name in feature_names}
    utilities = cp_df["CommodityCode"] if by_commodity else [None] * n_rows

    rows = zip(cp_df["Building Name"], utilities, cp_df["Changepoint Date"])
    for pos, (bld, util, cp_date) in enumerate(rows):
        series_df = _series_for(bld, util)
        hits = np.flatnonzero((series_df[date_col] == cp_date).to_numpy())
        if hits.size == 0:
            continue
        before_mean, after_mean, diff, ratio = _window_mean_features(
            series_df[usage_col], int(hits[0]), mean_win
        )
        features["ΔMeanBefore"][pos] = before_mean
        features["ΔMeanAfter"][pos] = after_mean
        features["ΔMeanDiff"][pos] = diff
        features["ΔMeanRatio"][pos] = ratio
        features["TimeSinceStart"][pos] = (
            cp_date - min_dates.get(bld, cp_date)
        ).days

    for name, column in features.items():
        cp_df[name] = column

    # Keep all numeric features on consistent numeric dtypes.
    for col in (*feature_names, "TimeIndex", "Season"):
        cp_df[col] = pd.to_numeric(cp_df[col], errors="coerce")

    if "holidaycount" in filled_df.columns:
        cp_df = _attach_holidaycount(cp_df, filled_df, date_col)
    return cp_df


# 4. Semi-supervised model (self-training over a gradient-boosted classifier)

_DEFAULT_CP_FEATURES = (
    "AbsDelta", "slope", "ΔMeanDiff", "ΔMeanRatio", "TimeSinceStart",
    "holidaycount",
)
_CODE_TO_LABEL = {1: "Real", 0: "Noise", -1: "Unknown"}


def _make_self_training(base_clf, k_best: int):
    """Wrap ``base_clf`` in a k_best SelfTrainingClassifier on any sklearn."""
    try:
        # scikit-learn >= 1.6 names the wrapped model `estimator`.
        return SelfTrainingClassifier(
            estimator=base_clf, criterion="k_best", k_best=k_best
        )
    except TypeError:
        # Older releases only accept the (now deprecated) `base_estimator`.
        return SelfTrainingClassifier(
            base_estimator=base_clf, criterion="k_best", k_best=k_best
        )


def _self_train_labels(
    base_df: pd.DataFrame,
    k_best: int,
    feature_cols: list[str],
    make_estimator,
) -> Tuple[pd.DataFrame, dict]:
    """Self-train on the Real/Noise pseudo-labels and return (preds_df, stats).

    ``make_estimator`` is a zero-argument callable returning the base
    classifier. It is only called when at least one labelled row exists.
    Rows labelled anything other than Real/Noise are treated as unlabelled
    (-1). Missing feature values are filled with 0.
    """
    df = base_df.copy()
    y = np.full(len(df), -1, dtype=int)
    y[df["Label"].eq("Real").to_numpy()] = 1
    y[df["Label"].eq("Noise").to_numpy()] = 0
    X = df[feature_cols].fillna(0).values

    unique_labels_in_y = np.unique(y[y != -1])
    if len(unique_labels_in_y) == 0:
        print("No initial pseudo-labels (Real/Noise) found. "
              "Self-training cannot proceed. All will be 'Unknown'.")
        trans = y
    elif len(unique_labels_in_y) < 2:
        print(f"Initial pseudo-labels only contain one class: "
              f"{unique_labels_in_y}. Self-training may not be effective "
              f"or may fail. Predictions might be skewed or based on "
              f"initial labels only.")
        clf = _make_self_training(make_estimator(), k_best)
        try:
            clf.fit(X, y)
            trans = clf.transduction_
        except ValueError as e:
            # Single-class fits can be rejected by the base classifier;
            # fall back to the initial pseudo-labels.
            print(f"SelfTrainingClassifier.fit error: {e}. This might be "
                  f"due to homogenous initial labels (e.g., all 'Real' "
                  f"or all 'Noise').")
            trans = y.copy()
    else:
        clf = _make_self_training(make_estimator(), k_best)
        clf.fit(X, y)
        trans = clf.transduction_

    trans = np.asarray(trans, dtype=int)
    predicted = pd.Series(trans).map(_CODE_TO_LABEL).fillna("Unknown").astype(str)
    # Assign positionally: base_df may carry a non-default index, and
    # index alignment against the RangeIndex above would yield NaN labels.
    df["Predicted"] = predicted.to_numpy()

    stats = df["Predicted"].value_counts(dropna=False).to_dict()
    stats["k_best"] = k_best
    return df, stats


def run_semi_supervised_cp_model(
    base_df: pd.DataFrame,
    k_best: int = 10,
    feature_cols: list[str] | None = None,
    xgb_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
    """Self-training XGBoost classifier over pseudo-labelled changepoints.

    Args:
        base_df: Must contain ``Label`` (Real / Noise / other) and the
            feature columns.
        k_best: ``k_best`` for the self-training loop.
        feature_cols: Feature columns; defaults to AbsDelta, slope,
            ΔMeanDiff, ΔMeanRatio, TimeSinceStart and holidaycount.
        xgb_params: Keyword arguments for ``XGBClassifier``.

    Returns:
        ``(preds_df, stats)``: a copy of ``base_df`` with a ``Predicted``
        column, plus label counts and ``k_best``.
    """
    if feature_cols is None:
        feature_cols = list(_DEFAULT_CP_FEATURES)
    if xgb_params is None:
        xgb_params = {
            "max_depth": 3,
            "learning_rate": 0.1,
            "n_estimators": 200,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "objective": "binary:logistic",
            "eval_metric": "logloss",
            "verbosity": 0,
        }
    return _self_train_labels(
        base_df, k_best, feature_cols, lambda: XGBClassifier(**xgb_params)
    )


# CatBoost variant of the semi-supervised model


def run_semi_supervised_cp_model_catboost(
    base_df: pd.DataFrame,
    k_best: int = 10,
    feature_cols: list[str] | None = None,
    catboost_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
    """Self-training CatBoost classifier over pseudo-labelled changepoints.

    Args:
        base_df: Must contain ``Label`` and the feature columns.
        k_best: ``k_best`` for the self-training loop.
        feature_cols: Feature columns (same default as the XGBoost version).
        catboost_params: Keyword arguments for ``CatBoostClassifier``.

    Returns:
        ``(preds_df, stats)`` as in the XGBoost version, with
        ``stats["model_type"] == "CatBoost"``.

    Raises:
        ImportError: If CatBoost is not installed.
    """
    if not CATBOOST_AVAILABLE:
        raise ImportError("CatBoost not available. Install with: "
                          "pip install catboost")
    if feature_cols is None:
        feature_cols = list(_DEFAULT_CP_FEATURES)
    if catboost_params is None:
        catboost_params = {
            "depth": 3,
            "learning_rate": 0.1,
            "iterations": 200,
            "colsample_bylevel": 0.8,
            "loss_function": "Logloss",
            "eval_metric": "Logloss",
            "verbose": False,
            "allow_writing_files": False,
            "bootstrap_type": "Bayesian",  # chosen for small samples
        }
    df, stats = _self_train_labels(
        base_df, k_best, feature_cols,
        lambda: CatBoostClassifier(**catboost_params),
    )
    stats["model_type"] = "CatBoost"
    return df, stats


# Unified model selection


def run_semi_supervised_cp_model_unified(
    base_df: pd.DataFrame,
    k_best: int = 10,
    feature_cols: list[str] | None = None,
    model_type: str = "xgboost",
    model_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
    """Dispatch to the XGBoost or CatBoost semi-supervised classifier.

    Args:
        base_df: Frame with features and ``Label``.
        k_best: ``k_best`` for the self-training loop.
        feature_cols: Feature columns (``None`` = model default).
        model_type: ``"xgboost"`` or ``"catboost"`` (case-insensitive).
        model_params: Parameters for the chosen model.

    Returns:
        ``(preds_df, stats)`` from the selected implementation.

    Raises:
        ValueError: For any other ``model_type``.
    """
    kind = model_type.lower()
    if kind == "catboost":
        return run_semi_supervised_cp_model_catboost(
            base_df, k_best, feature_cols, model_params
        )
    if kind == "xgboost":
        return run_semi_supervised_cp_model(
            base_df, k_best, feature_cols, model_params
        )
    raise ValueError(f"Unsupported model_type: {model_type}. "
                     f"Choose from 'xgboost' or 'catboost'")