Add multi-RAT combined sheet generation to the KPI health check Excel export

- Aggregate sites over time, normalizing the time key for hourly/daily
  granularity by detecting period_start vs. date_only columns.
- Build a unified sheet merging 2G/3G/LTE/TWAMP KPIs by site_code and time,
  prefixing KPI columns with their RAT.
- Extract and merge geographic coordinates from all RATs.
- Fall back to per-RAT sheets when combined generation fails.

4a2530d
import pandas as pd
from panel_app.convert_to_excel_panel import write_dfs_to_excel


def _normalize_time_key(
    df: pd.DataFrame, granularity: str
) -> tuple[str, pd.Series] | None:
    """Return the detected time column and a normalized time key for ``df``.

    Hourly granularity prefers ``period_start`` (floored to the hour); daily
    granularity prefers ``date_only`` (reduced to a calendar date).
    """
    if df is None or df.empty:
        return None
    g = str(granularity or "Daily").strip().lower()
    is_hourly = g.startswith("h")
    if is_hourly:
        time_col = "period_start" if "period_start" in df.columns else "date_only"
    else:
        time_col = "date_only" if "date_only" in df.columns else "period_start"
    if time_col not in df.columns:
        # Neither period_start nor date_only exists; no usable time key.
        return None
    if is_hourly:
        t = pd.to_datetime(df[time_col], errors="coerce").dt.floor("h")
    else:
        t = pd.to_datetime(df[time_col], errors="coerce").dt.date
    return time_col, t
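

# A minimal sketch of the normalization on hypothetical frames:
#
#   _normalize_time_key(
#       pd.DataFrame({"period_start": ["2024-01-01 10:37"]}), "Hourly"
#   )
#   # -> ("period_start", Series([Timestamp("2024-01-01 10:00:00")]))
#
#   _normalize_time_key(pd.DataFrame({"date_only": ["2024-01-01 10:37"]}), "Daily")
#   # -> ("date_only", Series([datetime.date(2024, 1, 1)]))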


def _build_all_tech_sheet(
    daily_by_rat: dict[str, pd.DataFrame],
    granularity: str,
) -> tuple[str, pd.DataFrame] | None:
    """Merge per-RAT KPI frames into one wide sheet keyed by site and time.

    KPI columns are prefixed with their RAT (e.g. ``LTE_``). Returns the
    sheet name and the merged frame, or ``None`` when no input is usable.
    """
    if not daily_by_rat or not isinstance(daily_by_rat, dict):
        return None
    g = str(granularity or "Daily").strip().lower()
    prefix = "Hourly" if g.startswith("h") else "Daily"
    # Merge RATs in a stable order; fall back to whatever keys are present.
    ordered_rats = ["2G", "3G", "LTE", "TWAMP"]
    present = [r for r in ordered_rats if r in daily_by_rat]
    if not present:
        present = [str(r) for r in daily_by_rat.keys()]
    time_col = None
    keys = []
    coords_parts = []
    # First pass: collect (site_code, time) keys and any coordinate columns.
    for rat in present:
        df = daily_by_rat.get(rat)
        if not isinstance(df, pd.DataFrame) or df.empty:
            continue
        nt = _normalize_time_key(df, granularity)
        if nt is None:
            continue
        tc, tkey = nt
        if time_col is None:
            time_col = tc
        tmp = pd.DataFrame(
            {"site_code": pd.to_numeric(df.get("site_code"), errors="coerce"), tc: tkey}
        )
        tmp = tmp.dropna(subset=["site_code", tc]).copy()
        tmp["site_code"] = tmp["site_code"].astype(int)
        # Align on the first-seen time column name so pd.concat stacks the
        # keys even when RATs expose different time columns.
        keys.append(tmp[["site_code", tc]].rename(columns={tc: time_col}))
        cols = [
            c for c in ["site_code", "City", "Longitude", "Latitude"] if c in df.columns
        ]
        if cols:
            cp = df[cols].copy()
            cp["site_code"] = pd.to_numeric(cp["site_code"], errors="coerce")
            cp = cp.dropna(subset=["site_code"]).copy()
            cp["site_code"] = cp["site_code"].astype(int)
            coords_parts.append(cp)
    if not keys or time_col is None:
        return None
    # The union of all (site_code, time) pairs across RATs forms the spine
    # onto which each RAT's KPIs are joined.
    base = pd.concat(keys, ignore_index=True).drop_duplicates(
        subset=["site_code", time_col]
    )
    # Coordinates: keep the first RAT's values seen per site.
    coords = None
    if coords_parts:
        coords_all = pd.concat(coords_parts, ignore_index=True)
        coords_all = coords_all.drop_duplicates(subset=["site_code"])
        keep = [
            c
            for c in ["site_code", "City", "Longitude", "Latitude"]
            if c in coords_all.columns
        ]
        coords = coords_all[keep].copy() if keep else None
    if isinstance(coords, pd.DataFrame) and not coords.empty:
        base = pd.merge(base, coords, on="site_code", how="left")
    # Stable per-row ID: "<time>_<site_code>".
    base["ID"] = base[time_col].astype(str) + "_" + base["site_code"].astype(str)
    # Shared metadata columns; everything else in a RAT frame is treated as
    # a KPI and gets a "<RAT>_" prefix below.
    meta_cols = {
        "site_code",
        "period_start",
        "date_only",
        "Longitude",
        "Latitude",
        "City",
        "RAT",
        "ID",
    }
    out = base
    # Second pass: left-join each RAT's KPI columns onto the key spine,
    # prefixing them with the RAT name (e.g. "LTE_<kpi>").
    for rat in present:
        df = daily_by_rat.get(rat)
        if not isinstance(df, pd.DataFrame) or df.empty:
            continue
        nt = _normalize_time_key(df, granularity)
        if nt is None:
            continue
        tc, tkey = nt
        tmp = df.copy()
        tmp["site_code"] = pd.to_numeric(tmp.get("site_code"), errors="coerce")
        tmp = tmp.dropna(subset=["site_code"]).copy()
        tmp["site_code"] = tmp["site_code"].astype(int)
        tmp[tc] = tkey
        tmp = tmp.dropna(subset=[tc]).copy()
        kpi_cols = [c for c in tmp.columns if c not in meta_cols]
        tmp2 = tmp[["site_code", tc] + kpi_cols].rename(
            columns={c: f"{rat}_{c}" for c in kpi_cols}
        )
        out = pd.merge(
            out,
            tmp2,
            left_on=["site_code", time_col],
            right_on=["site_code", tc],
            how="left",
        )
        # Drop the right-hand time column when it differs from the spine's.
        if tc != time_col and tc in out.columns:
            out = out.drop(columns=[tc], errors="ignore")
    # Put identity and location columns first, KPI columns after.
    first_cols = [
        c
        for c in ["ID", time_col, "site_code", "City", "Longitude", "Latitude"]
        if c in out.columns
    ]
    rest = [c for c in out.columns if c not in first_cols]
    out = out[first_cols + rest]
    try:
        # Sorting can fail on mixed/NaN time keys; an unsorted frame is fine.
        out = out.sort_values(by=[time_col, "site_code"], ascending=[True, True])
    except Exception:
        pass
    return f"{prefix}_All", out
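

# Resulting layout (hypothetical columns): one row per (site_code, time)
# pair observed in any RAT, e.g.
#   ID              date_only   site_code  City   2G_cssr  LTE_thp_dl
#   2024-01-01_101  2024-01-01  101        Cairo  99.1     42.0
# with NaN wherever a RAT has no data for that site/time pair.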


def build_export_bytes(
    datasets_df: pd.DataFrame | None,
    rules_df: pd.DataFrame | None,
    summary_df: pd.DataFrame | None,
    status_df: pd.DataFrame | None,
    daily_by_rat: dict[str, pd.DataFrame] | None = None,
    granularity: str = "Daily",
    multirat_summary_df: pd.DataFrame | None = None,
    top_anomalies_df: pd.DataFrame | None = None,
    complaint_multirat_df: pd.DataFrame | None = None,
    complaint_top_anomalies_df: pd.DataFrame | None = None,
    ops_queue_df: pd.DataFrame | None = None,
    delta_df: pd.DataFrame | None = None,
    profile: dict | None = None,
) -> bytes:
    """Assemble all report sheets and return the Excel workbook as bytes.

    When ``daily_by_rat`` is given, a combined multi-RAT sheet is built,
    falling back to per-RAT sheets if the combined build fails. Timing and
    sheet-count metrics are recorded in ``profile`` when one is supplied.
    """
    if profile is not None:
        profile["export_prep_seconds"] = 0.0
        profile["excel_total_seconds"] = 0.0
    t_prep0 = pd.Timestamp.utcnow() if profile is not None else None
    def _df_or_empty(df: pd.DataFrame | None) -> pd.DataFrame:
        # Guard against None (or anything that is not a DataFrame) so every
        # sheet slot always receives a frame.
        return df if isinstance(df, pd.DataFrame) else pd.DataFrame()

    dfs = [
        _df_or_empty(datasets_df),
        _df_or_empty(rules_df),
        _df_or_empty(summary_df),
        _df_or_empty(status_df),
    ]
    sheet_names = [
        "Datasets",
        "KPI_Rules",
        "Site_Summary",
        "Site_KPI_Status",
    ]
    # Excel caps sheets at 1,048,576 rows; reserve one row for the header.
    max_data_rows = 1048575

    def _append_sheet(base: str, df_sheet: pd.DataFrame) -> None:
        # Append a frame as one sheet, splitting into _p1, _p2, ... parts
        # when it exceeds the row limit; names respect Excel's 31-char cap.
        if len(df_sheet) <= max_data_rows:
            dfs.append(df_sheet)
            sheet_names.append(base[:31])
            return
        for part, start in enumerate(
            range(0, len(df_sheet), max_data_rows), start=1
        ):
            dfs.append(df_sheet.iloc[start : start + max_data_rows].copy())
            sheet_names.append(f"{base}_p{part}"[:31])

    if daily_by_rat and isinstance(daily_by_rat, dict):
        combined = _build_all_tech_sheet(daily_by_rat, granularity)
        if combined is not None:
            sheet_base, df_all = combined
            _append_sheet(sheet_base, df_all)
        else:
            # Fallback: one sheet per RAT when the combined build fails.
            g = str(granularity or "Daily").strip().lower()
            prefix = "Hourly" if g.startswith("h") else "Daily"
            for rat, df in daily_by_rat.items():
                if not isinstance(df, pd.DataFrame):
                    continue
                _append_sheet(f"{prefix}_All_{rat}", df)
    dfs.extend(
        [
            _df_or_empty(multirat_summary_df),
            _df_or_empty(top_anomalies_df),
            _df_or_empty(complaint_multirat_df),
            _df_or_empty(complaint_top_anomalies_df),
            _df_or_empty(ops_queue_df),
            _df_or_empty(delta_df),
        ]
    )
sheet_names.extend(
[
"MultiRAT_Summary",
"Top_Anomalies",
"Complaint_MultiRAT",
"Complaint_Top_Anomalies",
"Ops_Queue",
"Delta",
]
)
    if profile is not None:
        t_prep1 = pd.Timestamp.utcnow()
        if t_prep0 is not None:
            profile["export_prep_seconds"] = float((t_prep1 - t_prep0).total_seconds())
        profile["sheet_count"] = int(len(sheet_names))
    # write_dfs_to_excel receives the profile dict so it can fill in its own
    # timing (the "excel_total_seconds" slot initialized above).
    return write_dfs_to_excel(dfs, sheet_names, index=False, profile=profile)
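

if __name__ == "__main__":
    # Minimal usage sketch with hypothetical data; real frames come from the
    # KPI health check pipeline. The column names ("cssr", "thp_dl") and
    # values below are illustrative only.
    daily = {
        "2G": pd.DataFrame(
            {"site_code": [101], "date_only": ["2024-01-01"], "cssr": [99.1]}
        ),
        "LTE": pd.DataFrame(
            {"site_code": [101], "date_only": ["2024-01-01"], "thp_dl": [42.0]}
        ),
    }
    profile: dict = {}
    payload = build_export_bytes(
        datasets_df=None,
        rules_df=None,
        summary_df=None,
        status_df=None,
        daily_by_rat=daily,
        granularity="Daily",
        profile=profile,
    )
    print(f"{profile.get('sheet_count')} sheets, {len(payload)} bytes")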