import io import re from dataclasses import dataclass from typing import Optional import pandas as pd def _parse_int(value: object) -> Optional[int]: v = pd.to_numeric(value, errors="coerce") if pd.isna(v): return None return int(v) def _base_site_name_from_sites(sites: object) -> str: if not isinstance(sites, str): return "" s = sites.strip() for suffix in ["_3G", "_3g"]: if s.endswith(suffix): return s[: -len(suffix)] return s def read_ciq_3g_brut(ciq_file) -> pd.DataFrame: if hasattr(ciq_file, "seek"): ciq_file.seek(0) df = pd.read_excel(ciq_file, engine="calamine") df.columns = df.columns.astype(str).str.strip() if "Sites" not in df.columns: raise ValueError("CIQ 3G brut is missing required column: Sites") df["Sites"] = df["Sites"].where(df["Sites"].notna(), pd.NA) df["Sites"] = df["Sites"].astype("string").str.strip() return df def _band_from_cell_name(cell_name: object) -> str: if not isinstance(cell_name, str): return "" s = cell_name.upper() if "_U900" in s or s.endswith("U900"): return "U900" if "_U2100" in s or s.endswith("U2100"): return "U2100" return "" def _band_label(band: str) -> str: if band == "U900": return "U900 (U9)" if band == "U2100": return "U2100 (U21)" return band def _cell_number_from_cell_name(cell_name: object) -> Optional[int]: if not isinstance(cell_name, str): return None m = re.search(r"_(\d+)_U(?:900|2100)\b", cell_name.upper()) if not m: return None try: v = int(m.group(1)) except ValueError: return None return v if v > 0 else None def _sector_id_from_cell_name(cell_name: object) -> int: cell_no = _cell_number_from_cell_name(cell_name) if cell_no is None: raise ValueError(f"Cannot derive SectorID from NOM_CELLULE='{cell_name}'") return ((int(cell_no) - 1) % 3) + 1 def _tcell_from_band_and_sector(band: str, sector_id: int) -> int: if band == "U900": return sector_id + 2 # 1->3, 2->4, 3->5 # U2100 tcell_map = {1: 0, 2: 1, 3: 3} if sector_id not in tcell_map: raise ValueError(f"Invalid SectorID '{sector_id}' for Tcell") return tcell_map[sector_id] def build_wcel_sheet(ciq_df: pd.DataFrame) -> pd.DataFrame: required = [ "Sites", "NodeB_ID", "NOM_CELLULE", "CELLID", "SAC", "LAC", "RAC", "FREQUENCE", "PSCRAMBCODE", "RNC_id", ] missing = [c for c in required if c not in ciq_df.columns] if missing: raise ValueError(f"CIQ 3G brut is missing required columns for WCEL: {missing}") rows = [] for site_key, group in ciq_df.groupby(["NodeB_ID", "RNC_id"], dropna=False): nodeb_id_raw, rnc_id_raw = site_key nodeb_id = _parse_int(nodeb_id_raw) rnc_id = _parse_int(rnc_id_raw) if nodeb_id is None or rnc_id is None: continue tmp = group.copy() tmp["_band"] = tmp["NOM_CELLULE"].apply(_band_from_cell_name) # U2100 LcrId grouping by UARFCN (FREQUENCE) u2100 = tmp[tmp["_band"] == "U2100"].copy() u2100_uarfcns = sorted( pd.to_numeric(u2100["FREQUENCE"], errors="coerce") .dropna() .astype(int) .unique() ) u2100_base_by_uarfcn = { uarfcn: 1 + 3 * idx for idx, uarfcn in enumerate(u2100_uarfcns) } for _, r in tmp.iterrows(): band = r.get("_band") if band not in {"U900", "U2100"}: continue uarfcn = _parse_int(r.get("FREQUENCE")) if uarfcn is None: continue sector_id = _sector_id_from_cell_name(r.get("NOM_CELLULE")) if band == "U900": lcr_id = 9 + sector_id # 10..12 else: base = u2100_base_by_uarfcn.get(uarfcn) if base is None: # Should not happen, but keep safe base = 1 lcr_id = base + (sector_id - 1) cid = _parse_int(r.get("CELLID")) lac = _parse_int(r.get("LAC")) rac = _parse_int(r.get("RAC")) sac = _parse_int(r.get("SAC")) name = f"{str(r.get('NOM_CELLULE'))}_NA" rows.append( { "Site": nodeb_id, "RncId": rnc_id, "WBTSId": nodeb_id, "LcrId": int(lcr_id), "Band": _band_label(band), "CId": cid, "LAC": lac, "name": name, "PriScrCode": _parse_int(r.get("PSCRAMBCODE")), "PWSMCellGroup": int(sector_id), "RAC": rac, "SAC": sac, "Tcell": _tcell_from_band_and_sector(band, int(sector_id)), "UARFCN": int(uarfcn), "SectorID": int(sector_id), } ) df_wcel = pd.DataFrame(rows) if df_wcel.empty: return df_wcel ordered = [ "Site", "RncId", "WBTSId", "LcrId", "Band", "CId", "LAC", "name", "PriScrCode", "PWSMCellGroup", "RAC", "SAC", "Tcell", "UARFCN", "SectorID", ] df_wcel = df_wcel[ordered].sort_values(by=["Site", "LcrId"], kind="stable") return df_wcel def build_wbts_sheet( ciq_df: pd.DataFrame, year_suffix: str, bands: str ) -> pd.DataFrame: required = ["Sites", "NodeB_ID", "RNC_id"] missing = [c for c in required if c not in ciq_df.columns] if missing: raise ValueError(f"CIQ 3G brut is missing required columns for WBTS: {missing}") rows = [] for sites, group in ciq_df.groupby("Sites", dropna=False): if sites is None or (isinstance(sites, float) and pd.isna(sites)): continue sites_str = str(sites).strip() if not sites_str: continue nodeb_ids = pd.to_numeric(group["NodeB_ID"], errors="coerce").dropna().unique() if len(nodeb_ids) == 0: raise ValueError(f"Missing NodeB_ID for site '{sites_str}'") nodeb_id = int(nodeb_ids[0]) rnc_ids = pd.to_numeric(group["RNC_id"], errors="coerce").dropna().unique() if len(rnc_ids) == 0: raise ValueError(f"Missing RNC_id for site '{sites_str}'") rnc_id = int(rnc_ids[0]) base_name = _base_site_name_from_sites(sites_str) name = f"{base_name}_{year_suffix}_{bands}_NA" wbts_name = f"{sites_str}_NA" rows.append( { "S": nodeb_id, "Name": name, "RncId": rnc_id, "WBTSId": nodeb_id, "name": wbts_name, "WBTSName": wbts_name, } ) df_wbts = pd.DataFrame(rows) if not df_wbts.empty: df_wbts = df_wbts[ ["S", "Name", "RncId", "WBTSId", "name", "WBTSName"] ].sort_values(by=["S"], kind="stable") return df_wbts def generate_ciq_3g_excel( ciq_file, year_suffix: str = "25", bands: str = "G9G18U9U21L8L18L26", ) -> tuple[dict[str, pd.DataFrame], bytes]: ciq_df = read_ciq_3g_brut(ciq_file) df_wbts = build_wbts_sheet(ciq_df, year_suffix=year_suffix, bands=bands) df_wcel = build_wcel_sheet(ciq_df) sheets: dict[str, pd.DataFrame] = { "WBTS": df_wbts, "WCEL": df_wcel, } bytes_io = io.BytesIO() with pd.ExcelWriter(bytes_io, engine="xlsxwriter") as writer: for sheet_name, df in sheets.items(): df.to_excel(writer, sheet_name=sheet_name, index=False) return sheets, bytes_io.getvalue()