Spaces:
No application file
No application file
"""cp_utils.py

Utilities for evaluating changepoint credibility and performing
semi-/supervised classification on changepoints.
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from typing import Tuple
from statsmodels.tsa.stattools import adfuller
from prophet import Prophet
from sklearn.semi_supervised import SelfTrainingClassifier
from xgboost import XGBClassifier

# Optional dependency: CatBoost support.  The availability flag lets the
# CatBoost-based model raise a clear ImportError at call time instead of
# breaking module import when catboost is not installed.
try:
    from catboost import CatBoostClassifier
    CATBOOST_AVAILABLE = True
except ImportError:
    CATBOOST_AVAILABLE = False
    print("⚠️ CatBoost not installed. Install with: pip install catboost")
# 1. Score a single building's candidate changepoints — Prophet delta +
#    structural metrics.
def _validate_cp_metrics(residual: pd.Series, idx: int, win: int = 6,
                         slope_thresh: float = 0.1, p_thresh: float = 0.05,
                         ) -> Tuple[bool, float, float]:
    """Compute (slope, adf_p) in the ``idx ± win`` window and judge
    structural significance.

    A candidate is considered significant when the local residual trend is
    steep (|slope| > ``slope_thresh``) AND the ADF test fails to reject
    non-stationarity (p > ``p_thresh``), i.e. the residual drifts rather
    than mean-reverts around the candidate.

    Parameters
    ----------
    residual : residual series (observed minus Prophet fit), positional.
    idx : positional index of the candidate changepoint in ``residual``.
    win : half-width of the validation window.
    slope_thresh : minimum |slope| for significance (was hard-coded 0.1;
        now parameterized, default preserves previous behavior).
    p_thresh : ADF p-value floor for "drifting" (was hard-coded 0.05).

    Returns
    -------
    (is_valid, slope, adf_p); ``(False, nan, nan)`` when the window holds
    fewer than 4 non-NaN points — too short for a meaningful fit/test.
    """
    lo = max(0, idx - win)
    hi = min(len(residual), idx + win + 1)
    seg = residual.iloc[lo:hi].dropna()
    if seg.size < 4:
        return False, np.nan, np.nan
    # First-order polynomial fit over the window: local trend slope.
    slope = np.polyfit(range(len(seg)), seg, 1)[0]
    p_val = adfuller(seg)[1]
    is_valid = (abs(slope) > slope_thresh) and (p_val > p_thresh)
    return is_valid, float(slope), float(p_val)
def building_score_changepoints(
    summary_df: pd.DataFrame,
    filled_df: pd.DataFrame,
    building_name: str,
    model: str = "rbf",
    penalty: float | None = None,
    window_size: int = 6,
    usage_col: str = "FilledUse",
    date_col: str = "Date",
    cp_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Return table with Prophet delta & structure metrics for one building.

    For each utility (CommodityCode) of ``building_name``:

    1. Detect candidate changepoints with PELT (unless a pre-computed
       ``cp_df`` is supplied, in which case it is used for ALL utilities).
    2. Fit a trend-only Prophet model and validate each candidate against
       the residual series via ``_validate_cp_metrics``.
    3. Refit Prophet with the validated dates pinned as changepoints and
       record the mean rate delta at each one.

    Parameters
    ----------
    summary_df : table mapping BuildingName -> CommodityCode.
    filled_df : gap-filled usage history keyed by BuildingName/CommodityCode.
    building_name : building to score.
    model : ruptures cost model passed to detection (e.g. "rbf").
    penalty : PELT penalty; defaults to 1.0 when None.
    window_size : half-width of the residual validation window.
    usage_col, date_col : column names in ``filled_df``.
    cp_df : optional pre-computed changepoint table with "timestamp" and
        "changepoint" columns.

    Returns
    -------
    DataFrame with one row per validated changepoint (Building Name,
    CommodityCode, Changepoint Date, ProphetDelta, slope, adf_p_value,
    AbsDelta); empty when nothing validates.
    """
    from rupture_utils import detect_changepoints

    records: list[dict] = []
    utilities = (
        summary_df.loc[summary_df["BuildingName"] == building_name,
                       "CommodityCode"].unique()
    )
    for util in utilities:
        df_util = (
            filled_df[
                (filled_df["BuildingName"] == building_name)
                & (filled_df["CommodityCode"] == util)
            ]
            .sort_values(date_col)
            .reset_index(drop=True)
        )
        # Require a complete series of at least 24 observations.
        if df_util[usage_col].isna().any() or len(df_util) < 24:
            continue
        # BUG FIX: the detection result used to be written back into the
        # ``cp_df`` argument, so the first utility's changepoints were
        # silently reused for every later utility.  Keep per-utility
        # detection results in a loop-local variable instead.
        if cp_df is not None:
            cps = cp_df
        else:
            cps = detect_changepoints(
                df_util[[date_col, usage_col]].rename(
                    columns={date_col: "timestamp", usage_col: "value"}),
                algo="pelt",
                model=model,
                pen=penalty or 1.0,
            )
        if cps.empty or cps["changepoint"].sum() == 0:
            continue
        df_p = df_util[[date_col, usage_col]].rename(
            columns={date_col: "ds", usage_col: "y"})
        # Trend-only Prophet fit; its residuals expose structural shifts.
        m_tmp = Prophet(yearly_seasonality=False, weekly_seasonality=False,
                        daily_seasonality=False)
        m_tmp.fit(df_p)
        residual = df_p["y"] - m_tmp.predict(df_p)["yhat"]
        validated_dates: list[pd.Timestamp] = []
        metrics_map: dict[pd.Timestamp, Tuple[float, float]] = {}
        for d in pd.to_datetime(cps.loc[cps["changepoint"] == 1,
                                        "timestamp"]):
            # Skip candidates that don't line up with an observed date.
            if d not in df_util[date_col].values:
                continue
            idx = df_util.index[df_util[date_col] == d][0]
            is_valid, slope, p_val = _validate_cp_metrics(
                residual, idx, win=window_size
            )
            if is_valid:
                validated_dates.append(d)
                metrics_map[d] = (slope, p_val)
        if not validated_dates:
            continue
        # Refit with validated dates pinned so Prophet estimates a rate
        # delta exactly at each of them.
        m = Prophet(changepoints=validated_dates, yearly_seasonality=False)
        m.fit(df_p)
        deltas = m.params["delta"].mean(axis=0)
        for cp, delta in zip(m.changepoints, deltas):
            d = pd.to_datetime(cp)
            slope, p_val = metrics_map.get(d, (np.nan, np.nan))
            records.append(
                {
                    "Building Name": building_name,
                    "CommodityCode": util,
                    "Changepoint Date": d,
                    "ProphetDelta": float(delta),
                    "slope": float(slope),
                    "adf_p_value": float(p_val),
                }
            )
    result_df = pd.DataFrame(records)
    if not result_df.empty:
        result_df["AbsDelta"] = result_df["ProphetDelta"].abs()
    return result_df
# 2. Pseudo-labelling
def label_changepoints_by_structure_signal(
    df: pd.DataFrame,
    slope_thresh: float = 0.1,
    p_thresh: float = 0.05,
) -> pd.DataFrame:
    """Assign pseudo-labels Real / Noise / Unknown from structure signals.

    "Real": steep local slope AND high ADF p-value (drifting residual).
    "Noise": slope and p-value both well inside the thresholds.
    Everything else — including rows with missing metrics — is "Unknown".
    Returns a labelled copy; the input frame is not modified.
    """
    out = df.copy()
    abs_slope = out["slope"].abs()
    p_val = out["adf_p_value"]
    # NaN comparisons evaluate to False, so rows with missing metrics fall
    # through to the "Unknown" default — same as the row-wise original.
    is_real = (abs_slope > slope_thresh) & (p_val > p_thresh)
    is_noise = (abs_slope < slope_thresh * 0.5) & (p_val < p_thresh * 0.5)
    out["Label"] = np.select([is_real, is_noise], ["Real", "Noise"],
                             default="Unknown")
    return out
# 3. Time-series derived features
def extract_changepoint_features(
    cp_df: pd.DataFrame,
    filled_df: pd.DataFrame,
    usage_col: str = "FilledUse",
    date_col: str = "Date",
    mean_win: int = 6,
) -> pd.DataFrame:
    """Derive mean diff/ratio and temporal-context features for each
    changepoint, and merge in the ``holidaycount`` feature when present.

    Parameters
    ----------
    cp_df : changepoint table with "Building Name" and "Changepoint Date"
        columns (the latter must be datetime — ``.dt`` is used on it).
    filled_df : per-building usage history with "BuildingName" and date_col.
    usage_col : usage column in ``filled_df`` used for the window means.
    date_col : date column in ``filled_df``.
    mean_win : number of observations averaged before/after each changepoint.

    Returns
    -------
    A copy of ``cp_df`` with ΔMeanBefore/After/Diff/Ratio, TimeSinceStart,
    TimeIndex, Season (and optionally holidaycount) columns added.  Rows
    whose changepoint date is absent from the building's history keep NaN
    feature values.
    """
    cp_df = cp_df.copy()
    # Ensure a stable integer dtype for the TimeIndex (calendar month).
    cp_df["TimeIndex"] = cp_df["Changepoint Date"].dt.month.astype('int64')
    # Encode Season as integer codes to avoid string/numeric dtype conflicts.
    season_mapping = {
        6: 0, 7: 0, 8: 0,  # Summer = 0
        12: 1, 1: 1, 2: 1,  # Winter = 1
    }
    # Other = 2
    season_col = cp_df["TimeIndex"].map(season_mapping).fillna(2)
    cp_df["Season"] = season_col.astype('int64')
    # Earliest observation date per building, for the TimeSinceStart feature.
    min_dates = filled_df.groupby("BuildingName")[date_col].min().to_dict()
    for i, row in cp_df.iterrows():
        bld = row["Building Name"]
        cp_date = row["Changepoint Date"]
        df_bld = (
            filled_df[filled_df["BuildingName"] == bld]
            .sort_values(date_col)
            .reset_index(drop=True)
        )
        # Skip changepoints that don't line up with an observed date;
        # their feature cells stay NaN.
        if cp_date not in df_bld[date_col].values:
            continue
        idx = df_bld.index[df_bld[date_col] == cp_date][0]
        # Windows exclude the changepoint observation itself.
        before_vals = df_bld[usage_col].iloc[max(0, idx - mean_win): idx]
        after_vals = df_bld[usage_col].iloc[idx + 1: idx + mean_win + 1]
        before_mean = before_vals.mean() if len(before_vals) else np.nan
        after_mean = after_vals.mean() if len(after_vals) else np.nan
        diff = after_mean - before_mean if np.isfinite(before_mean) and np.isfinite(after_mean) else np.nan
        ratio = after_mean / before_mean if np.isfinite(before_mean) and before_mean != 0 else np.nan
        cp_df.at[i, "ΔMeanBefore"] = before_mean
        cp_df.at[i, "ΔMeanAfter"] = after_mean
        cp_df.at[i, "ΔMeanDiff"] = diff
        cp_df.at[i, "ΔMeanRatio"] = ratio
        cp_df.at[i, "TimeSinceStart"] = (cp_date - min_dates.get(bld, cp_date)).days
    # Ensure all derived feature columns have consistent numeric dtypes
    # (coerce any bad values to NaN).
    numeric_cols = ["ΔMeanBefore", "ΔMeanAfter", "ΔMeanDiff", "ΔMeanRatio",
                    "TimeSinceStart", "TimeIndex", "Season"]
    for col in numeric_cols:
        if col in cp_df.columns:
            cp_df[col] = pd.to_numeric(cp_df[col], errors='coerce')
    if "holidaycount" in filled_df.columns:
        # Keep only the columns needed for the merge to avoid duplicates.
        holiday_df = filled_df[["BuildingName", date_col, "holidaycount"]].drop_duplicates()
        holiday_df = holiday_df.rename(
            columns={"BuildingName": "Building Name", date_col: "Changepoint Date"}
        )
        # Ensure holidaycount is numeric before merging.
        holiday_df["holidaycount"] = pd.to_numeric(
            holiday_df["holidaycount"], errors='coerce'
        ).fillna(0)
        cp_df = cp_df.merge(
            holiday_df, on=["Building Name", "Changepoint Date"], how="left"
        )
        # Fill any missing holidaycount values with 0
        cp_df["holidaycount"] = cp_df["holidaycount"].fillna(0)
    return cp_df
# 4. Semi-supervised model (Self-Training XGBoost)
def run_semi_supervised_cp_model(
    base_df: pd.DataFrame,
    k_best: int = 10,
    feature_cols: list[str] | None = None,
    xgb_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
    """Return preds_df (with Predicted) and simple stats.

    Seeds a SelfTrainingClassifier (XGBoost base estimator) with the
    pseudo-labels in ``base_df["Label"]`` ("Real" -> 1, "Noise" -> 0,
    anything else unlabeled), transduces labels for the remaining rows and
    maps the result back to "Real"/"Noise"/"Unknown" strings.
    """
    if feature_cols is None:
        feature_cols = [
            "AbsDelta",
            "slope",
            "ΔMeanDiff",
            "ΔMeanRatio",
            "TimeSinceStart",
            'holidaycount'
        ]
    if xgb_params is None:
        xgb_params = {
            "max_depth": 3,
            "learning_rate": 0.1,
            "n_estimators": 200,
            "subsample": 0.8,
            "colsample_bytree": 0.8,
            "objective": "binary:logistic",
            "eval_metric": "logloss",
            "verbosity": 0,
        }
    preds = base_df.copy()
    # -1 marks the unlabeled rows for the self-training scheme.
    seed_labels = np.full(len(preds), -1, dtype=int)
    seed_labels[preds["Label"] == "Real"] = 1
    seed_labels[preds["Label"] == "Noise"] = 0
    features = preds[feature_cols].fillna(0).values
    self_trainer = SelfTrainingClassifier(
        base_estimator=XGBClassifier(**xgb_params),
        criterion="k_best",
        k_best=k_best
    )
    seeded_classes = np.unique(seed_labels[seed_labels != -1])
    if len(seeded_classes) == 0:
        print("No initial pseudo-labels (Real/Noise) found. "
              "Self-training cannot proceed. All will be 'Unknown'.")
        transduced = seed_labels
    elif len(seeded_classes) == 1:
        print(f"Initial pseudo-labels only contain one class: "
              f"{seeded_classes}. Self-training may not be effective "
              f"or may fail. Predictions might be skewed or based on "
              f"initial labels only.")
        try:
            self_trainer.fit(features, seed_labels)
            transduced = self_trainer.transduction_
        except ValueError as exc:
            print(f"SelfTrainingClassifier.fit error: {exc}. This might be "
                  f"due to homogenous initial labels (e.g., all 'Real' "
                  f"or all 'Noise').")
            transduced = seed_labels.copy()
    else:
        self_trainer.fit(features, seed_labels)
        transduced = self_trainer.transduction_
    # Normalise the transduction to plain ints before mapping to strings;
    # going through a pandas Series keeps the dtype handling safe.
    transduced = np.asarray(transduced, dtype=int)
    code_to_name = {1: "Real", 0: "Noise", -1: "Unknown"}
    preds["Predicted"] = (
        pd.Series(transduced).map(code_to_name).fillna("Unknown").astype(str)
    )
    stats = preds["Predicted"].value_counts(dropna=False).to_dict()
    stats["k_best"] = k_best
    return preds, stats
# New: CatBoost-based variant of the semi-supervised model.
def run_semi_supervised_cp_model_catboost(
    base_df: pd.DataFrame,
    k_best: int = 10,
    feature_cols: list[str] | None = None,
    catboost_params: dict | None = None,
) -> Tuple[pd.DataFrame, dict]:
    """
    CatBoost variant of the semi-supervised changepoint classification model.

    Args:
        base_df: DataFrame holding the features and the "Label" column.
        k_best: k_best parameter for SelfTrainingClassifier.
        feature_cols: list of feature column names.
        catboost_params: CatBoost parameter dict.

    Returns:
        Prediction DataFrame (with "Predicted") and a stats dict.

    Raises:
        ImportError: when the optional catboost dependency is missing.
    """
    if not CATBOOST_AVAILABLE:
        raise ImportError("CatBoost not available. Install with: "
                          "pip install catboost")
    if feature_cols is None:
        feature_cols = [
            "AbsDelta",
            "slope",
            "ΔMeanDiff",
            "ΔMeanRatio",
            "TimeSinceStart",
            'holidaycount'
        ]
    if catboost_params is None:
        catboost_params = {
            "depth": 3,
            "learning_rate": 0.1,
            "iterations": 200,
            "colsample_bylevel": 0.8,
            "loss_function": "Logloss",
            "eval_metric": "Logloss",
            "verbose": False,
            "allow_writing_files": False,
            "bootstrap_type": "Bayesian",  # Better for small samples
        }
    df = base_df.copy()
    # -1 marks the unlabeled rows for the self-training scheme.
    y = np.full(len(df), -1, dtype=int)
    y[df["Label"] == "Real"] = 1
    y[df["Label"] == "Noise"] = 0
    X = df[feature_cols].fillna(0).values
    # Use CatBoost instead of XGBoost as the base estimator.
    base_clf = CatBoostClassifier(**catboost_params)
    clf = SelfTrainingClassifier(
        base_estimator=base_clf,
        criterion="k_best",
        k_best=k_best
    )
    unique_labels_in_y = np.unique(y[y != -1])
    if len(unique_labels_in_y) < 2 and len(unique_labels_in_y) > 0:
        print(f"Initial pseudo-labels only contain one class: "
              f"{unique_labels_in_y}. Self-training may not be effective "
              f"or may fail. Predictions might be skewed or based on "
              f"initial labels only.")
        try:
            clf.fit(X, y)
            trans = clf.transduction_
        except ValueError as e:
            print(f"SelfTrainingClassifier.fit error: {e}. This might be "
                  f"due to homogenous initial labels (e.g., all 'Real' "
                  f"or all 'Noise').")
            trans = y.copy()
    elif len(unique_labels_in_y) == 0:
        print("No initial pseudo-labels (Real/Noise) found. "
              "Self-training cannot proceed. All will be 'Unknown'.")
        trans = y
    else:
        clf.fit(X, y)
        trans = clf.transduction_
    # Same robust dtype handling as the XGBoost version.
    trans = np.asarray(trans, dtype=int)
    # Use a pandas map for safer dtype handling than np.select.
    label_map = {1: "Real", 0: "Noise", -1: "Unknown"}
    trans_series = pd.Series(trans)
    predicted_labels = trans_series.map(label_map).fillna("Unknown")
    # Assign to the dataframe with an explicit string dtype.
    df = df.copy()
    df["Predicted"] = predicted_labels.astype(str)
    stats = df["Predicted"].value_counts(dropna=False).to_dict()
    stats["k_best"] = k_best
    stats["model_type"] = "CatBoost"
    return df, stats
| # 🔧 新增:统一的模型选择函数 | |
| def run_semi_supervised_cp_model_unified( | |
| base_df: pd.DataFrame, | |
| k_best: int = 10, | |
| feature_cols: list[str] | None = None, | |
| model_type: str = "xgboost", | |
| model_params: dict | None = None, | |
| ) -> Tuple[pd.DataFrame, dict]: | |
| """ | |
| 统一的半监督变点分类模型接口,支持XGBoost和CatBoost | |
| Args: | |
| base_df: 包含特征和标签的数据框 | |
| k_best: SelfTrainingClassifier的k_best参数 | |
| feature_cols: 特征列名列表 | |
| model_type: 模型类型,"xgboost" 或 "catboost" | |
| model_params: 模型参数字典 | |
| Returns: | |
| 预测结果数据框和统计信息 | |
| """ | |
| if model_type.lower() == "catboost": | |
| return run_semi_supervised_cp_model_catboost( | |
| base_df, k_best, feature_cols, model_params | |
| ) | |
| elif model_type.lower() == "xgboost": | |
| return run_semi_supervised_cp_model( | |
| base_df, k_best, feature_cols, model_params | |
| ) | |
| else: | |
| raise ValueError(f"Unsupported model_type: {model_type}. " | |
| f"Choose from 'xgboost' or 'catboost'") |