Add a combined multi-RAT sheet to the KPI health-check Excel export with time-based site aggregation: normalize time keys for hourly/daily granularity (detecting `period_start`/`date_only` columns), build a unified sheet that merges 2G/3G/LTE/TWAMP KPIs by `site_code` and time using RAT-prefixed columns, extract and merge geographic coordinates from all RATs, and fall back to per-RAT sheets when combined-sheet generation fails.
4a2530d | import pandas as pd | |
| from panel_app.convert_to_excel_panel import write_dfs_to_excel | |
| def _normalize_time_key( | |
| df: pd.DataFrame, granularity: str | |
| ) -> tuple[str, pd.Series] | None: | |
| if df is None or df.empty: | |
| return None | |
| g = str(granularity or "Daily").strip().lower() | |
| is_hourly = g.startswith("hour") or g.startswith("h") | |
| if is_hourly: | |
| time_col = "period_start" if "period_start" in df.columns else "date_only" | |
| t = pd.to_datetime(df.get(time_col), errors="coerce").dt.floor("h") | |
| return time_col, t | |
| time_col = "date_only" if "date_only" in df.columns else "period_start" | |
| t = pd.to_datetime(df.get(time_col), errors="coerce").dt.date | |
| return time_col, t | |
def _build_all_tech_sheet(
    daily_by_rat: dict[str, pd.DataFrame],
    granularity: str,
) -> tuple[str, pd.DataFrame] | None:
    """Merge per-RAT KPI frames into one wide sheet keyed by site and time.

    Returns ``(sheet_name, dataframe)`` — e.g. ``("Daily_All", df)`` or
    ``("Hourly_All", df)`` — or ``None`` when no input frame yields a usable
    (site_code, time) key. KPI columns are prefixed with their RAT name
    (``2G_``, ``3G_``, ...); shared metadata (City/Longitude/Latitude) is
    taken from the first RAT that carries it for each site_code.
    """
    if not daily_by_rat or not isinstance(daily_by_rat, dict):
        return None
    g = str(granularity or "Daily").strip().lower()
    prefix = "Hourly" if (g.startswith("hour") or g.startswith("h")) else "Daily"
    # Fixed RAT ordering keeps the prefixed column groups in a predictable
    # order; unknown keys are used as-is when none of the known RATs exist.
    ordered_rats = ["2G", "3G", "LTE", "TWAMP"]
    present = [r for r in ordered_rats if r in daily_by_rat]
    if not present:
        present = [str(r) for r in daily_by_rat.keys()]
    time_col = None  # time-column name taken from the first usable RAT
    keys = []  # per-RAT (site_code, time) key pairs
    coords_parts = []  # per-RAT site metadata (City/Longitude/Latitude)
    # Pass 1: collect the union of (site_code, time) keys and coordinates.
    for rat in present:
        df = daily_by_rat.get(rat)
        if not isinstance(df, pd.DataFrame) or df.empty:
            continue
        nt = _normalize_time_key(df, granularity)
        if nt is None:
            continue
        tc, tkey = nt
        if time_col is None:
            time_col = tc
        # site_code is coerced to numeric and rows lacking a valid site or
        # time are dropped. Assumes site codes are integer-like — TODO confirm.
        tmp = pd.DataFrame(
            {"site_code": pd.to_numeric(df.get("site_code"), errors="coerce"), tc: tkey}
        )
        tmp = tmp.dropna(subset=["site_code", tc]).copy()
        tmp["site_code"] = tmp["site_code"].astype(int)
        keys.append(tmp[["site_code", tc]])
        cols = [
            c for c in ["site_code", "City", "Longitude", "Latitude"] if c in df.columns
        ]
        if cols:
            cp = df[cols].copy()
            cp["site_code"] = pd.to_numeric(cp["site_code"], errors="coerce")
            cp = cp.dropna(subset=["site_code"]).copy()
            cp["site_code"] = cp["site_code"].astype(int)
            coords_parts.append(cp)
    if not keys or time_col is None:
        return None
    # One row per unique (site_code, time) pair across all RATs.
    base = pd.concat(keys, ignore_index=True).drop_duplicates(
        subset=["site_code", time_col]
    )
    coords = None
    if coords_parts:
        # drop_duplicates keeps the first occurrence, so RATs earlier in
        # `present` win when several carry coordinates for the same site.
        coords_all = pd.concat(coords_parts, ignore_index=True)
        coords_all = coords_all.drop_duplicates(subset=["site_code"])
        keep = [
            c
            for c in ["site_code", "City", "Longitude", "Latitude"]
            if c in coords_all.columns
        ]
        coords = coords_all[keep].copy() if keep else None
    if isinstance(coords, pd.DataFrame) and not coords.empty:
        base = pd.merge(base, coords, on="site_code", how="left")
    # Stable row identifier "<time>_<site_code>".
    base["ID"] = base[time_col].astype(str) + "_" + base["site_code"].astype(str)
    # Columns that are shared metadata, not KPIs — excluded from prefixing.
    meta_cols = {
        "site_code",
        "period_start",
        "date_only",
        "Longitude",
        "Latitude",
        "City",
        "RAT",
        "ID",
    }
    out = base
    # Pass 2: left-join each RAT's KPI columns onto the key frame, renamed
    # with a "<RAT>_" prefix so same-named KPIs from different RATs coexist.
    for rat in present:
        df = daily_by_rat.get(rat)
        if not isinstance(df, pd.DataFrame) or df.empty:
            continue
        nt = _normalize_time_key(df, granularity)
        if nt is None:
            continue
        tc, tkey = nt
        tmp = df.copy()
        tmp["site_code"] = pd.to_numeric(tmp.get("site_code"), errors="coerce")
        tmp = tmp.dropna(subset=["site_code"]).copy()
        tmp["site_code"] = tmp["site_code"].astype(int)
        # Index-aligned assignment: normalized key replaces the raw column.
        tmp[tc] = tkey
        tmp = tmp.dropna(subset=[tc]).copy()
        kpi_cols = [c for c in tmp.columns if c not in meta_cols]
        keep_cols = ["site_code", tc] + kpi_cols
        tmp2 = tmp[keep_cols].copy()
        rename = {c: f"{rat}_{c}" for c in kpi_cols}
        tmp2 = tmp2.rename(columns=rename)
        out = pd.merge(
            out,
            tmp2,
            left_on=["site_code", time_col],
            right_on=["site_code", tc],
            how="left",
        )
        # When this RAT used the alternate time column, drop the extra
        # right-side key column the merge carried in.
        if tc != time_col and tc in out.columns:
            out = out.drop(columns=[tc], errors="ignore")
    # Identifier/metadata columns first, KPI columns after.
    first_cols = [
        c
        for c in ["ID", time_col, "site_code", "City", "Longitude", "Latitude"]
        if c in out.columns
    ]
    rest = [c for c in out.columns if c not in first_cols]
    out = out[first_cols + rest]
    # Best-effort sort; unsortable/mixed time values must not break export.
    try:
        out = out.sort_values(by=[time_col, "site_code"], ascending=[True, True])
    except Exception:
        pass
    return f"{prefix}_All", out
def build_export_bytes(
    datasets_df: pd.DataFrame | None,
    rules_df: pd.DataFrame | None,
    summary_df: pd.DataFrame | None,
    status_df: pd.DataFrame | None,
    daily_by_rat: dict[str, pd.DataFrame] | None = None,
    granularity: str = "Daily",
    multirat_summary_df: pd.DataFrame | None = None,
    top_anomalies_df: pd.DataFrame | None = None,
    complaint_multirat_df: pd.DataFrame | None = None,
    complaint_top_anomalies_df: pd.DataFrame | None = None,
    ops_queue_df: pd.DataFrame | None = None,
    delta_df: pd.DataFrame | None = None,
    profile: dict | None = None,
) -> bytes:
    """Assemble the KPI health-check workbook and return its bytes.

    Fixed sheets come first (Datasets, KPI_Rules, Site_Summary,
    Site_KPI_Status), then the combined multi-RAT sheet built by
    ``_build_all_tech_sheet`` — or one sheet per RAT when the combined
    build returns None — then the summary/anomaly/ops/delta sheets.
    Oversized frames are split across ``_p1``, ``_p2``, ... sheets.
    When *profile* is given, prep timing and sheet count are recorded
    into it before delegating to ``write_dfs_to_excel``.
    """
    if profile is not None:
        profile["export_prep_seconds"] = 0.0
        profile["excel_total_seconds"] = 0.0
    prep_started = pd.Timestamp.utcnow() if profile is not None else None

    def _as_df(obj):
        # Non-DataFrame inputs (None included) become empty placeholder sheets.
        return obj if isinstance(obj, pd.DataFrame) else pd.DataFrame()

    dfs = [_as_df(datasets_df), _as_df(rules_df), _as_df(summary_df), _as_df(status_df)]
    sheet_names = ["Datasets", "KPI_Rules", "Site_Summary", "Site_KPI_Status"]

    # Excel's per-sheet limit is 1,048,576 rows; one is reserved for the header.
    max_data_rows = 1048575

    def _append_chunked(frame: pd.DataFrame, base_name: str) -> None:
        # Sheet names are capped at Excel's 31-character limit.
        if len(frame) <= max_data_rows:
            dfs.append(frame)
            sheet_names.append(base_name[:31])
            return
        for part, start in enumerate(range(0, len(frame), max_data_rows), start=1):
            dfs.append(frame.iloc[start : start + max_data_rows].copy())
            sheet_names.append(f"{base_name}_p{part}"[:31])

    if daily_by_rat and isinstance(daily_by_rat, dict):
        g = str(granularity or "Daily").strip().lower()
        prefix = "Hourly" if (g.startswith("hour") or g.startswith("h")) else "Daily"
        combined = _build_all_tech_sheet(daily_by_rat, granularity)
        if combined is not None:
            sheet_base, df_all = combined
            _append_chunked(df_all, sheet_base)
        else:
            # Fallback: one sheet per RAT when the combined build failed.
            for rat, df in daily_by_rat.items():
                if isinstance(df, pd.DataFrame):
                    _append_chunked(df, f"{prefix}_All_{str(rat)}")

    dfs += [
        _as_df(multirat_summary_df),
        _as_df(top_anomalies_df),
        _as_df(complaint_multirat_df),
        _as_df(complaint_top_anomalies_df),
        _as_df(ops_queue_df),
        _as_df(delta_df),
    ]
    sheet_names += [
        "MultiRAT_Summary",
        "Top_Anomalies",
        "Complaint_MultiRAT",
        "Complaint_Top_Anomalies",
        "Ops_Queue",
        "Delta",
    ]

    if profile is not None:
        if prep_started is not None:
            elapsed = (pd.Timestamp.utcnow() - prep_started).total_seconds()
            profile["export_prep_seconds"] = float(elapsed)
        profile["sheet_count"] = int(len(sheet_names))
    return write_dfs_to_excel(dfs, sheet_names, index=False, profile=profile)