"""Usage-gap analysis / filling helpers and LLM weather-variable selection.

The module has two parts:

* ``analyze_and_fill_usage`` / ``fill_usage_with_sequence_check_strict_mean``
  classify missing months per (building, commodity) and fill them.
* Helpers that build a prompt for, talk to, and parse the answer of a local
  Ollama model used to pick weather variables.
"""
import logging

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

_KEY_COLUMNS = ["BuildingName", "CommodityCode"]
_SUMMARY_COLUMNS = _KEY_COLUMNS + [
    "MissingType",
    "SequenceMissingRanges",
    "RandomMissingDates",
    "TotalMonths",
    "RandomMissingMonths",
    "SequenceMissingMonths",
    "RandomMissingRatio",
    "SequenceMissingRatio",
]
_POST_COLUMNS = [
    "PostTotalMonths",
    "PostRandomMissingMonths",
    "PostSequenceMissingMonths",
    "PostRandomMissingRatio",
    "PostSequenceMissingRatio",
]


def _standardize_building_column(df: pd.DataFrame) -> pd.DataFrame:
    """Make sure *df* has a ``BuildingName`` column (accepts ``Building Name``).

    Mutates and returns *df*; callers pass a private copy.
    """
    if "BuildingName" in df.columns:
        return df
    if "Building Name" in df.columns:
        df["BuildingName"] = df["Building Name"]
        return df
    raise ValueError("Neither 'BuildingName' nor 'Building Name' column found in usage data")


def _month_starts(dates: pd.Series) -> pd.Series:
    """Map every timestamp to midnight on the first day of its month."""
    return dates.dt.to_period("M").dt.to_timestamp()


def _missing_months(first_month, last_month, observed_months: pd.Series) -> tuple:
    """Return ``(full_month_index, missing_month_index)`` for the given span."""
    full_idx = pd.date_range(first_month, last_month, freq="MS")
    present = pd.DatetimeIndex(observed_months.unique())
    return full_idx, full_idx[~full_idx.isin(present)]


def _group_missing_runs(missing: pd.DatetimeIndex, gap_threshold: int) -> list:
    """Split missing months into runs.

    Two neighbouring missing months belong to the same run when they are at
    most *gap_threshold* days apart. Runs of length 1 are "random" gaps, longer
    runs are "sequence" gaps.
    """
    if missing.empty:
        return []
    months = missing.to_series()
    # The first month has no predecessor; the sentinel only fills the NaN.
    day_gaps = months.diff().dt.days.fillna(9999)
    run_id = (day_gaps > gap_threshold).cumsum()
    return [run for _, run in months.groupby(run_id)]


def _fill_start_from_ranges(seq_missing_ranges, cutoff_dt, min_fill_gap_months):
    """Compute the fill start date from a ``SequenceMissingRanges`` string.

    Only sequence gaps that *start* on or after *cutoff_dt* and span at least
    *min_fill_gap_months* months are considered. If one exists, the month after
    the earliest such gap is returned; otherwise *cutoff_dt* is returned.
    """
    if not seq_missing_ranges or pd.isna(seq_missing_ranges):
        return cutoff_dt

    qualifying = []
    for chunk in str(seq_missing_ranges).split("; "):
        if "to" not in chunk:
            continue
        try:
            first, last = chunk.replace("From ", "").split(" to ")
            sd, ed = pd.to_datetime(first), pd.to_datetime(last)
            gap = (ed.to_period("M") - sd.to_period("M")).n + 1
        except (ValueError, TypeError, AttributeError):
            # Unparseable range: skip it and keep looking at the others.
            continue
        if sd >= cutoff_dt and gap >= min_fill_gap_months:
            qualifying.append((sd, ed))

    if not qualifying:
        return cutoff_dt
    _, earliest_end = min(qualifying, key=lambda item: item[0])
    return earliest_end + pd.offsets.MonthBegin(1)


def analyze_and_fill_usage(
    usage_data: pd.DataFrame,
    gap_threshold: int = 62,
    fill_earliest_cutoff: str = "2013-01-01",
    min_fill_gap_months: int = 9,
    rolling_window_size: int = 4,
    rolling_centered: bool = True,
    sequence_fill_method: str = "mean",
    post_missing_threshold: float = 0.1,
) -> pd.DataFrame:
    """Classify missing months per (building, commodity) and derive fill flags.

    No time series is filled here; only summary statistics are returned.

    Args:
        usage_data: Needs ``StartDate``, ``CommodityCode`` and either
            ``BuildingName`` or ``Building Name``. ``EndDate`` is optional.
        gap_threshold: Maximum day distance between two missing months for
            them to count as one sequence gap.
        fill_earliest_cutoff: Default ``FillStartDate``; also the earliest
            start a sequence gap may have to move ``FillStartDate``.
        min_fill_gap_months: Minimum length of a sequence gap that moves
            ``FillStartDate`` to the month after the gap.
        rolling_window_size: Accepted for interface compatibility; unused.
        rolling_centered: Accepted for interface compatibility; unused.
        sequence_fill_method: Accepted for interface compatibility; unused.
        post_missing_threshold: ``NotGonnaUse`` is 1 when either post-fill
            missing ratio exceeds this value.

    Returns:
        One row per (BuildingName, CommodityCode) with the missing-month
        statistics, ``FillStartDate``, the ``Post*`` statistics computed from
        ``FillStartDate`` onwards (NaN when there is no data on or after it)
        and ``NotGonnaUse``. An empty frame with those columns is returned
        when there are no groups.

    Raises:
        ValueError: If no building-name column is present.
    """
    df = usage_data.copy()
    df["StartDate"] = pd.to_datetime(df["StartDate"])
    if "EndDate" in df.columns:
        # Kept from the original pipeline; the shifted value is not read below.
        df["EndDate"] = pd.to_datetime(df["EndDate"]) - pd.Timedelta(days=2)
    df = _standardize_building_column(df)

    records = []
    start_dates_per_record = []  # raw StartDate values, parallel to `records`
    for (bld, util), grp in df.groupby(_KEY_COLUMNS):
        start_dates = grp["StartDate"].dropna()
        if start_dates.empty:
            continue
        observed = _month_starts(start_dates)
        full_idx, missing = _missing_months(observed.min(), observed.max(), observed)

        seq_ranges, rand_dates = [], []
        seq_months = 0
        for run in _group_missing_runs(missing, gap_threshold):
            if len(run) > 1:
                seq_ranges.append(
                    f"From {run.min().strftime('%Y-%m')} to {run.max().strftime('%Y-%m')}"
                )
                seq_months += len(run)
            else:
                rand_dates.append(run.iloc[0].strftime("%Y-%m"))
        rand_months = len(rand_dates)

        if missing.empty:
            mtype = "No Missing"
        elif rand_dates and seq_ranges:
            mtype = "Both"
        elif rand_dates:
            mtype = "Random"
        else:
            mtype = "Sequence"

        total_months = len(full_idx)  # always >= 1 for a non-empty group
        records.append(
            {
                "BuildingName": bld,
                "CommodityCode": util,
                "MissingType": mtype,
                "SequenceMissingRanges": "; ".join(seq_ranges),
                "RandomMissingDates": "; ".join(rand_dates),
                "TotalMonths": total_months,
                "RandomMissingMonths": rand_months,
                "SequenceMissingMonths": seq_months,
                "RandomMissingRatio": rand_months / total_months,
                "SequenceMissingRatio": seq_months / total_months,
            }
        )
        start_dates_per_record.append(start_dates)

    if not records:
        # Without groups the merge below would fail on the missing key columns.
        return pd.DataFrame(
            columns=_SUMMARY_COLUMNS + ["FillStartDate"] + _POST_COLUMNS + ["NotGonnaUse"]
        )

    summary_df = pd.DataFrame(records)

    # ---------------------------------------------------------------
    # FillStartDate
    # ---------------------------------------------------------------
    cutoff_dt = pd.to_datetime(fill_earliest_cutoff)
    fill_starts = []
    for bld, ranges_text in zip(
        summary_df["BuildingName"], summary_df["SequenceMissingRanges"]
    ):
        try:
            fill_starts.append(
                _fill_start_from_ranges(ranges_text, cutoff_dt, min_fill_gap_months)
            )
        except Exception:
            # Deliberate best-effort: never let one row abort the analysis;
            # fall back to the cutoff and leave a trace for debugging.
            logger.exception("get_fill_start exception for %s", bld)
            fill_starts.append(cutoff_dt)
    summary_df["FillStartDate"] = pd.to_datetime(fill_starts)

    # ---------------------------------------------------------------
    # Missing ratios from FillStartDate onwards
    # ---------------------------------------------------------------
    post_recs = []
    for bld, util, fsd, start_dates in zip(
        summary_df["BuildingName"],
        summary_df["CommodityCode"],
        summary_df["FillStartDate"],
        start_dates_per_record,
    ):
        if pd.isna(fsd):
            continue
        kept = start_dates[start_dates >= fsd]
        if kept.empty:
            continue
        observed = _month_starts(kept)
        idx2, miss2 = _missing_months(
            fsd.to_period("M").to_timestamp(), observed.max(), observed
        )
        runs = _group_missing_runs(miss2, gap_threshold)
        rm2 = sum(1 for run in runs if len(run) == 1)
        sm2 = sum(len(run) for run in runs if len(run) > 1)
        post_total = len(idx2)
        post_recs.append(
            {
                "BuildingName": bld,
                "CommodityCode": util,
                "PostTotalMonths": post_total,
                "PostRandomMissingMonths": rm2,
                "PostSequenceMissingMonths": sm2,
                "PostRandomMissingRatio": rm2 / post_total if post_total else 0,
                "PostSequenceMissingRatio": sm2 / post_total if post_total else 0,
            }
        )

    if post_recs:
        post_df = pd.DataFrame(post_recs)
    else:
        # Keep the Post* columns present even when no group has post data.
        post_df = pd.DataFrame(columns=_KEY_COLUMNS + _POST_COLUMNS)

    # Left merge: groups without post data keep NaN in the Post* columns.
    summary_df = summary_df.merge(post_df, on=_KEY_COLUMNS, how="left")

    # NaN ratios compare as False, so such groups get NotGonnaUse == 0.
    summary_df["NotGonnaUse"] = (
        (summary_df["PostRandomMissingRatio"] > post_missing_threshold)
        | (summary_df["PostSequenceMissingRatio"] > post_missing_threshold)
    ).astype(int)

    return summary_df


def fill_usage_with_sequence_check_strict_mean(
    usage_data: pd.DataFrame,
    summary_df: pd.DataFrame,
    method: str = "mean",
    force: bool = False,
    fill_earliest_cutoff: str = "1900-01-01",
) -> pd.DataFrame:
    """Fill missing months of ``usage_data`` guided by ``summary_df``.

    Args:
        usage_data: Raw usage table with ``StartDate``, ``CommodityCode``,
            ``Use`` and either ``BuildingName`` or ``Building Name``.
        summary_df: Result of ``analyze_and_fill_usage`` (needs
            ``FillStartDate`` and ``NotGonnaUse``).
        method: ``"median"`` fills with the median of the observed monthly
            sums; any other value fills with their mean.
        force: When True, ``NotGonnaUse`` is ignored, a missing
            ``FillStartDate`` falls back to *fill_earliest_cutoff*, and a
            group with no data on/after its start falls back to its earliest
            month.
        fill_earliest_cutoff: Fallback start used only when ``force`` is True
            and ``FillStartDate`` is NaT.

    Returns:
        A frame with the columns ``Date``, ``FilledUse``, ``BuildingName``
        and ``CommodityCode`` (in that order), one row per month and group.
        When nothing qualifies, an empty frame whose columns are ordered
        ``BuildingName``, ``CommodityCode``, ``Date``, ``FilledUse``.

    Raises:
        ValueError: If no building-name column is present.
    """
    df = usage_data.copy()
    df["StartDate"] = _month_starts(pd.to_datetime(df["StartDate"]))
    df = _standardize_building_column(df)

    fallback_start = pd.to_datetime(fill_earliest_cutoff)
    use_median = method == "median"

    all_records = []
    for _, row in summary_df.iterrows():
        bld = row["BuildingName"]
        util = row["CommodityCode"]
        fsd = row["FillStartDate"]
        drop = row["NotGonnaUse"]

        # Gates 1 & 2: strict mode skips flagged groups and missing starts.
        if pd.isna(fsd):
            if not force:
                continue
            fsd = fallback_start
        elif drop == 1 and not force:
            continue

        pair_mask = (df["BuildingName"] == bld) & (df["CommodityCode"] == util)
        grp = df[pair_mask & (df["StartDate"] >= fsd)]

        # Gate 3: nothing on/after the start date.
        if grp.empty:
            if not force:
                continue
            # Forced mode: restart from the earliest month of this pair.
            grp = df[pair_mask]
            if grp.empty:
                continue
            fsd = grp["StartDate"].min()

        all_months = pd.date_range(fsd, grp["StartDate"].max(), freq="MS")
        # NOTE(review): sum() turns a month whose only values are null into 0,
        # so such months are treated as observed zeros, not filled - confirm.
        monthly = grp.groupby("StartDate")["Use"].sum().reindex(all_months)
        observed = monthly.dropna()
        fill_val = observed.median() if use_median else observed.mean()

        filled = monthly.fillna(fill_val).reset_index()
        filled.columns = ["Date", "FilledUse"]
        filled["BuildingName"] = bld
        filled["CommodityCode"] = util
        all_records.append(filled)

    if not all_records:
        return pd.DataFrame(columns=["BuildingName", "CommodityCode", "Date", "FilledUse"])
    return pd.concat(all_records, ignore_index=True)


# ===============================================================
# LLM-based Weather Variable Selection Functions
# ===============================================================

# Building type -> rule-based weather variables.
weather_influence_map = {
    "Office": ["temp_mean", "temp_std", "CDD_sum", "clouds_all_mean"],
    "Instructional": ["temp_mean", "temp_std", "CDD_sum", "humidity_mean"],
    "Residential": ["temp_mean", "HDD_sum", "CDD_sum", "humidity_mean"],
    "Health": ["temp_mean", "HDD_sum", "CDD_sum", "humidity_mean"],
    "Library": ["temp_mean", "temp_std", "humidity_mean", "clouds_all_mean"],
    "Dining": ["temp_mean", "rain_sum", "CDD_sum"],
    "Recreation": ["temp_mean", "rain_sum", "wind_speed_mean"],
    "Assembly or Theater": ["temp_mean", "wind_speed_mean", "rain_sum"],
    "Affiliate": ["temp_mean", "CDD_sum", "rain_sum"],
    "Parking Structure": [],
    "Infrastructure": [],
    "Container": [],
    "Mixed": ["temp_mean", "CDD_sum", "humidity_mean"],
    "Other": ["temp_mean", "CDD_sum"],
}

# Checked in insertion order; the first type with a matching keyword wins.
_BUILDING_TYPE_KEYWORDS = {
    "Instructional": ["Teaching", "Classroom", "School", "Lecture Hall", "Academic", "Education"],
    "Residential": ["Residential", "Apartment", "Dormitory", "Housing", "Student"],
    "Office": ["Office", "Office Building", "Administrative", "Admin"],
    "Health": ["Hospital", "Medical", "Clinic", "Health"],
    "Dining": ["Canteen", "Restaurant", "Dining", "Food", "Kitchen"],
    "Recreation": ["Fitness", "Sports", "Entertainment", "Recreation", "Gym"],
    "Library": ["Library"],
    "Assembly or Theater": ["Theater", "Auditorium", "Performance", "Assembly"],
    "Affiliate": ["Affiliate"],
}


def infer_building_type(text: str) -> str:
    """Infer the building type from a free-text description.

    Matching is a case-insensitive substring search. ``"Mixed"`` is returned
    when nothing matches or when *text* is not a string (e.g. None / NaN).
    """
    if not isinstance(text, str):
        return "Mixed"
    text_lower = text.lower()
    for btype, keywords in _BUILDING_TYPE_KEYWORDS.items():
        if any(keyword.lower() in text_lower for keyword in keywords):
            return btype
    return "Mixed"


def construct_weather_prompt_static(
    user_description: str,
    detected_type: str,
    suggested_vars: list,
    gross_area: float,
    avg_space_sqft: float,
    workpoint_count: int,
    floor_count: int,
) -> str:
    """Build the static weather-variable selection prompt for the LLM.

    An empty *suggested_vars* is rendered as ``(none)``.
    """
    scenario_note = "\n".join(
        [
            "",
            "Weather-Scenario (z-score offset, user will pick one):",
            "• Normal → 0 σ offset (historical monthly mean)",
            "• Hot → +1 σ on temp_mean & CDD_sum, −0.5 σ on humidity_mean",
            "• ColdWet → −1 σ on temp_mean, +1 σ on HDD_sum & humidity_mean",
            "• WindyCloudy → +1 σ on wind_speed_mean & clouds_all_mean",
            "LLM only needs to recommend variables; offsets are applied downstream.",
            "",
        ]
    )
    suggested = ", ".join(suggested_vars) or "(none)"
    candidates = (
        "temp_mean · temp_std · HDD_sum · CDD_sum · rain_sum · "
        "clouds_all_mean · humidity_mean · wind_speed_mean"
    )
    return "\n".join(
        [
            "",
            "You are an expert in building energy modeling and changepoint detection.",
            scenario_note,
            "Building Description (current use only):",
            f"{user_description}",
            "",
            f"Inferred Operation Type: {detected_type}",
            "",
            "Structural Information:",
            f"- Building Gross Area: {gross_area:,.0f} sq ft",
            f"- Average Space Size: {avg_space_sqft:,.0f} sq ft",
            f"- Total Workpoint Count: {workpoint_count}",
            f"- Floor Count: {floor_count}",
            "",
            f"Rule-based Suggested Variables: {suggested}",
            "",
            "Candidate Weather Variables:",
            candidates,
            "",
            "Tasks:",
            "1. Select 3–5 variables that best capture energy-use behaviour "
            "under the current configuration.",
            "2. Briefly justify each choice.",
            "3. Return a markdown table with columns: Selected Variable | Reason.",
            "",
        ]
    )


def chat_with_ollama(
    messages: list,
    model: str = "mistral",
    url: str = "http://localhost:11434/api/chat",
    timeout: float = 30,
) -> str:
    """Send *messages* to an Ollama chat endpoint and return the reply text.

    Raises:
        Exception: On transport/HTTP errors or an unexpected response shape;
            the underlying error is chained as ``__cause__``.
    """
    # Imported lazily so the analysis helpers work without `requests`.
    import requests

    try:
        response = requests.post(
            url,
            json={"model": model, "messages": messages, "stream": False},
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()["message"]["content"]
    except requests.exceptions.RequestException as e:
        raise Exception(f"Ollama API error: {str(e)}") from e
    except (KeyError, TypeError) as e:
        raise Exception("Invalid response format from Ollama API") from e


def parse_selected_vars(md: str) -> list:
    """Extract the first-column values from a markdown table.

    Header rows (first cell starting with ``Selected``) and separator rows
    (cells made only of ``-``, ``:`` and spaces) are skipped. Rows may be
    indented. Returns an empty list for empty input.
    """
    if not md:
        return []
    selected = []
    for raw_row in md.strip().splitlines():
        row = raw_row.strip()
        if not row.startswith("|"):
            continue
        first = row.split("|")[1].strip()
        if not first or first.startswith("Selected"):
            continue
        if set(first) <= set("-: "):
            # Separator row such as |---| or |:------:|
            continue
        selected.append(first)
    return selected