Add CIQ 3G Generator with WBTS/WCEL sheet generation, refactor CIQ 2G to extract shared site parsing logic with MCC/MNC parameters, implement TRX sheet builder with BCCH/TRX frequency parsing and MAIO assignment, add BTS sheet builder with template name detection and sector ID mapping, and create MAL sheet builder with mobile allocation frequency extraction from CIQ brut Excel
1e7ca72
| import io | |
| import re | |
| from dataclasses import dataclass | |
| from typing import Optional | |
| import pandas as pd | |
| def _parse_int(value: object) -> Optional[int]: | |
| v = pd.to_numeric(value, errors="coerce") | |
| if pd.isna(v): | |
| return None | |
| return int(v) | |
| def _base_site_name_from_sites(sites: object) -> str: | |
| if not isinstance(sites, str): | |
| return "" | |
| s = sites.strip() | |
| for suffix in ["_3G", "_3g"]: | |
| if s.endswith(suffix): | |
| return s[: -len(suffix)] | |
| return s | |
def read_ciq_3g_brut(ciq_file) -> pd.DataFrame:
    """Load the raw CIQ 3G workbook and normalise its headers and ``Sites``.

    Args:
        ciq_file: Anything ``pd.read_excel`` accepts. If the object exposes
            ``seek`` it is rewound to the start before reading, so an
            already-consumed upload buffer can be read again.

    Returns:
        The workbook as a DataFrame whose column labels are stripped strings
        and whose ``Sites`` column is a stripped pandas ``"string"`` column
        with missing entries represented as ``pd.NA``.

    Raises:
        ValueError: If the workbook has no ``Sites`` column.
    """
    if hasattr(ciq_file, "seek"):
        ciq_file.seek(0)

    frame = pd.read_excel(ciq_file, engine="calamine")
    # Header cells may be non-string or padded with spaces in the workbook.
    frame.columns = frame.columns.astype(str).str.strip()
    if "Sites" not in frame.columns:
        raise ValueError("CIQ 3G brut is missing required column: Sites")

    site_values = frame["Sites"]
    site_values = site_values.where(site_values.notna(), pd.NA)
    frame["Sites"] = site_values.astype("string").str.strip()
    return frame
| def _band_from_cell_name(cell_name: object) -> str: | |
| if not isinstance(cell_name, str): | |
| return "" | |
| s = cell_name.upper() | |
| if "_U900" in s or s.endswith("U900"): | |
| return "U900" | |
| if "_U2100" in s or s.endswith("U2100"): | |
| return "U2100" | |
| return "" | |
| def _band_label(band: str) -> str: | |
| if band == "U900": | |
| return "U900 (U9)" | |
| if band == "U2100": | |
| return "U2100 (U21)" | |
| return band | |
| def _cell_number_from_cell_name(cell_name: object) -> Optional[int]: | |
| if not isinstance(cell_name, str): | |
| return None | |
| m = re.search(r"_(\d+)_U(?:900|2100)\b", cell_name.upper()) | |
| if not m: | |
| return None | |
| try: | |
| v = int(m.group(1)) | |
| except ValueError: | |
| return None | |
| return v if v > 0 else None | |
def _sector_id_from_cell_name(cell_name: object) -> int:
    """Derive the sector identifier (1..3) from a cell name.

    The cell number found in the name is folded onto three sectors, so cell
    numbers 1, 4, 7, ... give sector 1; 2, 5, 8, ... give sector 2; and
    3, 6, 9, ... give sector 3.

    Args:
        cell_name: Value of ``NOM_CELLULE``.

    Returns:
        The sector identifier in the range 1..3.

    Raises:
        ValueError: If no cell number can be extracted from ``cell_name``.
    """
    number = _cell_number_from_cell_name(cell_name)
    if number is not None:
        return (number - 1) % 3 + 1
    raise ValueError(f"Cannot derive SectorID from NOM_CELLULE='{cell_name}'")
| def _tcell_from_band_and_sector(band: str, sector_id: int) -> int: | |
| if band == "U900": | |
| return sector_id + 2 # 1->3, 2->4, 3->5 | |
| # U2100 | |
| tcell_map = {1: 0, 2: 1, 3: 3} | |
| if sector_id not in tcell_map: | |
| raise ValueError(f"Invalid SectorID '{sector_id}' for Tcell") | |
| return tcell_map[sector_id] | |
def build_wcel_sheet(ciq_df: pd.DataFrame) -> pd.DataFrame:
    """Build the WCEL sheet (one row per U900/U2100 cell) from raw CIQ 3G data.

    Rows are grouped by ``(NodeB_ID, RNC_id)``. A group is skipped when either
    identifier is not numeric; a cell is skipped when no band can be detected
    in ``NOM_CELLULE`` or when ``FREQUENCE`` is not numeric.

    LcrId numbering:
        * U900 cells get ``9 + SectorID`` (10..12).
        * U2100 cells get ``base + SectorID - 1`` where ``base`` is 1, 4, 7,
          ... for the site's distinct U2100 UARFCNs in ascending order.

    Args:
        ciq_df: Raw CIQ 3G data containing at least the columns ``Sites``,
            ``NodeB_ID``, ``NOM_CELLULE``, ``CELLID``, ``SAC``, ``LAC``,
            ``RAC``, ``FREQUENCE``, ``PSCRAMBCODE`` and ``RNC_id``.

    Returns:
        A DataFrame with the WCEL columns, stably sorted by ``Site`` then
        ``LcrId``. When no cell qualifies, an empty DataFrame that still
        carries the WCEL columns is returned so an exported sheet keeps its
        header row.

    Raises:
        ValueError: If a required column is missing, or (from the sector
            helper) if a sector cannot be derived from a cell name.
    """
    required = [
        "Sites",
        "NodeB_ID",
        "NOM_CELLULE",
        "CELLID",
        "SAC",
        "LAC",
        "RAC",
        "FREQUENCE",
        "PSCRAMBCODE",
        "RNC_id",
    ]
    missing = [c for c in required if c not in ciq_df.columns]
    if missing:
        raise ValueError(f"CIQ 3G brut is missing required columns for WCEL: {missing}")

    ordered = [
        "Site",
        "RncId",
        "WBTSId",
        "LcrId",
        "Band",
        "CId",
        "LAC",
        "name",
        "PriScrCode",
        "PWSMCellGroup",
        "RAC",
        "SAC",
        "Tcell",
        "UARFCN",
        "SectorID",
    ]

    rows = []
    for site_key, group in ciq_df.groupby(["NodeB_ID", "RNC_id"], dropna=False):
        nodeb_id_raw, rnc_id_raw = site_key
        nodeb_id = _parse_int(nodeb_id_raw)
        rnc_id = _parse_int(rnc_id_raw)
        if nodeb_id is None or rnc_id is None:
            continue

        tmp = group.copy()
        tmp["_band"] = tmp["NOM_CELLULE"].apply(_band_from_cell_name)

        # Each distinct U2100 carrier of the site gets its own block of three
        # LcrIds, allocated in ascending UARFCN order: 1-3, 4-6, 7-9, ...
        u2100_freqs = tmp[tmp["_band"] == "U2100"]["FREQUENCE"]
        u2100_uarfcns = sorted(
            pd.to_numeric(u2100_freqs, errors="coerce").dropna().astype(int).unique()
        )
        u2100_base_by_uarfcn = {
            uarfcn: 1 + 3 * idx for idx, uarfcn in enumerate(u2100_uarfcns)
        }

        for _, r in tmp.iterrows():
            band = r.get("_band")
            if band not in {"U900", "U2100"}:
                continue

            uarfcn = _parse_int(r.get("FREQUENCE"))
            if uarfcn is None:
                continue

            sector_id = _sector_id_from_cell_name(r.get("NOM_CELLULE"))

            if band == "U900":
                lcr_id = 9 + sector_id  # 10..12
            else:
                # Every U2100 UARFCN seen here was collected above; the
                # default of 1 is only a defensive fallback.
                lcr_id = u2100_base_by_uarfcn.get(uarfcn, 1) + (sector_id - 1)

            rows.append(
                {
                    "Site": nodeb_id,
                    "RncId": rnc_id,
                    "WBTSId": nodeb_id,
                    "LcrId": lcr_id,
                    "Band": _band_label(band),
                    "CId": _parse_int(r.get("CELLID")),
                    "LAC": _parse_int(r.get("LAC")),
                    "name": f"{str(r.get('NOM_CELLULE'))}_NA",
                    "PriScrCode": _parse_int(r.get("PSCRAMBCODE")),
                    "PWSMCellGroup": sector_id,
                    "RAC": _parse_int(r.get("RAC")),
                    "SAC": _parse_int(r.get("SAC")),
                    "Tcell": _tcell_from_band_and_sector(band, sector_id),
                    "UARFCN": uarfcn,
                    "SectorID": sector_id,
                }
            )

    if not rows:
        # Keep the schema even without data so the sheet still has headers.
        return pd.DataFrame(columns=ordered)

    return pd.DataFrame(rows)[ordered].sort_values(by=["Site", "LcrId"], kind="stable")
def build_wbts_sheet(
    ciq_df: pd.DataFrame, year_suffix: str, bands: str
) -> pd.DataFrame:
    """Build the WBTS sheet (one row per site) from raw CIQ 3G data.

    Rows are grouped by ``Sites``. Groups whose site value is missing
    (``None``, NaN or ``pd.NA``) or blank are skipped. For every remaining
    site the first numeric ``NodeB_ID`` and the first numeric ``RNC_id`` of
    the group are used.

    Args:
        ciq_df: Raw CIQ 3G data containing at least the columns ``Sites``,
            ``NodeB_ID`` and ``RNC_id``.
        year_suffix: Text inserted into the ``Name`` column after the base
            site name.
        bands: Text inserted into the ``Name`` column after ``year_suffix``.

    Returns:
        A DataFrame with the columns ``S``, ``Name``, ``RncId``, ``WBTSId``,
        ``name`` and ``WBTSName``, stably sorted by ``S``. When no site
        qualifies, an empty DataFrame that still carries those columns is
        returned so an exported sheet keeps its header row.

    Raises:
        ValueError: If a required column is missing, or if a site has no
            numeric ``NodeB_ID`` or no numeric ``RNC_id``.
    """
    required = ["Sites", "NodeB_ID", "RNC_id"]
    missing = [c for c in required if c not in ciq_df.columns]
    if missing:
        raise ValueError(f"CIQ 3G brut is missing required columns for WBTS: {missing}")

    columns = ["S", "Name", "RncId", "WBTSId", "name", "WBTSName"]

    rows = []
    for sites, group in ciq_df.groupby("Sites", dropna=False):
        # pd.isna covers None, float NaN and pd.NA. The latter is the group
        # key produced by a pandas "string" column and would otherwise be
        # rendered as a bogus site called "<NA>".
        if pd.isna(sites):
            continue

        sites_str = str(sites).strip()
        if not sites_str:
            continue

        nodeb_ids = pd.to_numeric(group["NodeB_ID"], errors="coerce").dropna().unique()
        if len(nodeb_ids) == 0:
            raise ValueError(f"Missing NodeB_ID for site '{sites_str}'")
        nodeb_id = int(nodeb_ids[0])

        rnc_ids = pd.to_numeric(group["RNC_id"], errors="coerce").dropna().unique()
        if len(rnc_ids) == 0:
            raise ValueError(f"Missing RNC_id for site '{sites_str}'")
        rnc_id = int(rnc_ids[0])

        base_name = _base_site_name_from_sites(sites_str)
        wbts_name = f"{sites_str}_NA"

        rows.append(
            {
                "S": nodeb_id,
                "Name": f"{base_name}_{year_suffix}_{bands}_NA",
                "RncId": rnc_id,
                "WBTSId": nodeb_id,
                "name": wbts_name,
                "WBTSName": wbts_name,
            }
        )

    if not rows:
        # Keep the schema even without data so the sheet still has headers.
        return pd.DataFrame(columns=columns)

    return pd.DataFrame(rows)[columns].sort_values(by=["S"], kind="stable")
def generate_ciq_3g_excel(
    ciq_file,
    year_suffix: str = "25",
    bands: str = "G9G18U9U21L8L18L26",
) -> tuple[dict[str, pd.DataFrame], bytes]:
    """Generate the CIQ 3G workbook (WBTS and WCEL sheets) from a raw CIQ file.

    Args:
        ciq_file: Raw CIQ 3G Excel input, passed to ``read_ciq_3g_brut``.
        year_suffix: Forwarded to ``build_wbts_sheet``.
        bands: Forwarded to ``build_wbts_sheet``.

    Returns:
        A pair ``(sheets, content)`` where ``sheets`` maps the sheet names
        ``"WBTS"`` and ``"WCEL"`` to their DataFrames and ``content`` is the
        serialized Excel workbook holding those sheets, written without the
        index column.
    """
    raw = read_ciq_3g_brut(ciq_file)
    sheets: dict[str, pd.DataFrame] = {
        "WBTS": build_wbts_sheet(raw, year_suffix=year_suffix, bands=bands),
        "WCEL": build_wcel_sheet(raw),
    }

    buffer = io.BytesIO()
    with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
        for title, frame in sheets.items():
            frame.to_excel(writer, sheet_name=title, index=False)

    # Read the buffer only after the writer has been closed and flushed.
    return sheets, buffer.getvalue()