import io
import re
from dataclasses import dataclass
from typing import Optional

import pandas as pd

from utils.utils_vars import UtilsVars

# Normalized column names that must be present in the dump's "BTS" sheet.
REQUIRED_DUMP_BTS_COLS = ["BSC", "BCF", "BTS", "usedMobileAllocation"]

# Column order of the exported BTS sheet.
BTS_EXPORT_COLUMNS = [
    "site",
    "bscid",
    "cellId",
    "bcfId",
    "btsId",
    "Check",
    "bsIdentityCodeNCC",
    "bsIdentityCodeBCC",
    "locationAreaIdLAC",
    "locationAreaIdMCC",
    "locationAreaIdMNC",
    "usedMobileAllocation",
    "malId",
    "name",
    "template_name",
    "sectorId",
]


def _normalize_col(col: object) -> str:
    """Return ``str(col)`` with every non-ASCII-alphanumeric character removed."""
    return re.sub(r"[^0-9A-Za-z]", "", str(col))


def _clean_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* whose column labels are normalized.

    The input frame is left untouched.
    """
    df = df.copy()
    df.columns = [_normalize_col(c) for c in df.columns]
    return df


def _read_dump_bts_required_columns(dump_file) -> pd.DataFrame:
    """Read the required columns of the dump's ``BTS`` sheet as numbers.

    The sheet is read twice: first only its header (to map normalized names
    back to the labels actually present), then the data restricted to the
    required columns. In both reads the first sheet row is skipped, so the
    header is taken from the second row.

    Args:
        dump_file: Path or file-like object accepted by ``pd.read_excel``.
            File-like objects are rewound before each read.

    Returns:
        A frame with exactly ``REQUIRED_DUMP_BTS_COLS`` (in that order), each
        converted with ``pd.to_numeric(errors="coerce")`` so unparsable cells
        become NaN.

    Raises:
        ValueError: If a required column is absent after normalization.
    """
    if hasattr(dump_file, "seek"):
        dump_file.seek(0)
    hdr = pd.read_excel(
        dump_file,
        sheet_name="BTS",
        engine="calamine",
        skiprows=[0],
        nrows=0,
    )

    # Map normalized name -> original label; the first occurrence wins when
    # several labels normalize to the same name.
    normalized_to_original: dict[str, str] = {}
    for original in hdr.columns:
        normalized = _normalize_col(original)
        if normalized:
            normalized_to_original.setdefault(normalized, original)

    missing = [c for c in REQUIRED_DUMP_BTS_COLS if c not in normalized_to_original]
    if missing:
        raise ValueError(
            f"Dump sheet 'BTS' is missing required columns after cleanup: {missing}. "
            f"Found columns (normalized): {sorted(normalized_to_original.keys())[:50]}"
        )

    usecols = [normalized_to_original[c] for c in REQUIRED_DUMP_BTS_COLS]

    if hasattr(dump_file, "seek"):
        dump_file.seek(0)
    df = pd.read_excel(
        dump_file,
        sheet_name="BTS",
        engine="calamine",
        skiprows=[0],
        usecols=usecols,
    )

    # Work on an explicit copy: assigning columns onto a column-subset frame
    # can otherwise trigger pandas' SettingWithCopyWarning.
    df = _clean_columns(df)[REQUIRED_DUMP_BTS_COLS].copy()
    for col in REQUIRED_DUMP_BTS_COLS:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    return df


@dataclass(frozen=True)
class _PlannedSite:
    """Immutable record describing one site taken from the CIQ."""

    site_name: str  # value of the CIQ "Sites" column
    site_number: int  # leading digits of the site name, 0 when there are none
    bsc: int  # BSC id the site belongs to
    bsc_name: str  # BSC name from the CIQ, "" when not provided
    name: str  # exported site name
    configuration: str  # per-band TRX configuration summary
    assigned_bcf: Optional[int]  # None until a BCF id has been allocated
    needed_bts_ids: tuple[int, ...]  # BTS ids derived from the allocated BCF
# Frequency token found in a cell name -> band label used in this module.
_BAND_BY_FREQUENCY_TOKEN = {"900": "G9", "1800": "G18"}

# (band, sector) -> offset added to a site's BCF id to obtain the BTS id.
_BTS_OFFSET_BY_BAND_SECTOR = {
    ("G9", 1): 1,
    ("G9", 2): 2,
    ("G9", 3): 3,
    ("G18", 1): 4,
    ("G18", 2): 5,
    ("G18", 3): 6,
}

# BCF ids tried, in order, when allocating a BCF to a site.
_BCF_CANDIDATES = range(10, 4401, 10)

# Ids bcf + 1 .. bcf + N are reserved (as BTS and MAL ids) for each BCF.
_BTS_SLOTS_PER_BCF = 6


def _parse_site_number(site: object) -> int:
    """Return the leading integer of a site name, or 0 when there is none."""
    if not isinstance(site, str):
        return 0
    match = re.match(r"^(\d+)", site.strip())
    return int(match.group(1)) if match else 0


def _read_ciq_df(ciq_file) -> pd.DataFrame:
    """Load the CIQ workbook and add the derived columns used downstream.

    Adds ``site_number``, ``band`` and ``sector``; derives ``BSC ID`` from
    ``Nom BSC`` when only the latter is present; coerces ``BSC ID`` and
    ``Nbre_TRE_DR`` to numbers.

    Args:
        ciq_file: Path or file-like object accepted by ``pd.read_excel``.
            File-like objects are rewound first.

    Raises:
        ValueError: If the ``Sites`` column is missing.
    """
    if hasattr(ciq_file, "seek"):
        ciq_file.seek(0)
    df = pd.read_excel(ciq_file, engine="calamine")
    df.columns = df.columns.astype(str).str.strip()

    if "Sites" not in df.columns:
        raise ValueError("CIQ brut is missing required column: Sites")

    df["Sites"] = df["Sites"].where(df["Sites"].notna(), pd.NA)
    df["Sites"] = df["Sites"].astype("string").str.strip()
    df["site_number"] = df["Sites"].apply(_parse_site_number)

    # If "BSC ID" is not provided but "Nom BSC" exists, derive the id from
    # the name using the inverted (name -> id) UtilsVars.bsc_name mapping.
    if "BSC ID" not in df.columns and "Nom BSC" in df.columns:
        bsc_name_to_id = {v: k for k, v in UtilsVars.bsc_name.items()}
        df["BSC ID"] = df["Nom BSC"].map(bsc_name_to_id)

    for numeric_col in ("BSC ID", "Nbre_TRE_DR"):
        if numeric_col in df.columns:
            df[numeric_col] = pd.to_numeric(df[numeric_col], errors="coerce")

    if "NOM_CELLULE" in df.columns:
        bands_sectors = df["NOM_CELLULE"].apply(_extract_band_and_sector)
        df["band"] = bands_sectors.apply(lambda x: x[0])
        df["sector"] = bands_sectors.apply(lambda x: x[1])
    else:
        df["band"] = None
        df["sector"] = None

    return df


def _extract_band_and_sector(cell_name: object) -> tuple[Optional[str], Optional[int]]:
    """Derive ``(band, sector)`` from a cell name.

    The name is split on ``_``; the first ``<digits>_<900|1800>`` pair gives
    the sector and the band (``"G9"`` / ``"G18"``). When no such pair exists
    but the name ends with ``_900`` or ``_1800``, the band is returned with a
    ``None`` sector. Anything else (including non-strings) gives
    ``(None, None)``. Surrounding whitespace is ignored for every check.
    """
    if not isinstance(cell_name, str):
        return None, None

    name = cell_name.strip()
    parts = name.split("_")
    for sector_part, band_part in zip(parts, parts[1:]):
        if sector_part.isdigit() and band_part in _BAND_BY_FREQUENCY_TOKEN:
            return _BAND_BY_FREQUENCY_TOKEN[band_part], int(sector_part)

    # Fallbacks use the stripped name too, so trailing whitespace in the
    # cell does not hide the band.
    if name.endswith("_900"):
        return "G9", None
    if name.endswith("_1800"):
        return "G18", None
    return None, None


def _build_configuration(site_rows: pd.DataFrame) -> str:
    """Summarize a site's TRX counts per band, e.g. ``"G9-222, G18-444"``.

    For each band (G9 then G18) the ``Nbre_TRE_DR`` values of the rows that
    have one are concatenated in sector order, keeping the first row per
    sector. Bands without any value are omitted.
    """
    rows = site_rows.copy()
    rows["sector"] = pd.to_numeric(rows.get("sector"), errors="coerce")
    rows["Nbre_TRE_DR"] = pd.to_numeric(rows.get("Nbre_TRE_DR"), errors="coerce")

    configs: list[str] = []
    for band in ("G9", "G18"):
        sub = rows[rows["band"] == band]
        if sub.empty:
            continue
        sub = (
            sub.dropna(subset=["Nbre_TRE_DR"])
            .drop_duplicates(subset=["sector"], keep="first")
            .sort_values(by=["sector"], na_position="last")
        )
        digits = "".join(str(int(v)) for v in sub["Nbre_TRE_DR"])
        if digits:
            configs.append(f"{band}-{digits}")
    return ", ".join(configs)


def _needed_bts_ids_from_site_rows(
    bcf: int, site_rows: pd.DataFrame
) -> tuple[int, ...]:
    """Return the sorted BTS ids a site needs when it is given BCF *bcf*.

    Each row with a known band and a sector in 1..3 contributes
    ``bcf + offset``; rows with a missing or unknown band/sector are ignored.
    """
    bands = site_rows.get("band")
    sectors = site_rows.get("sector")
    if bands is None or sectors is None:
        return ()

    ids: set[int] = set()
    for band, sector in zip(bands, sectors):
        if band not in {"G9", "G18"}:
            continue
        if not isinstance(sector, (int, float)) or pd.isna(sector):
            continue
        offset = _BTS_OFFSET_BY_BAND_SECTOR.get((band, int(sector)))
        if offset is not None:
            ids.add(bcf + offset)
    return tuple(sorted(ids))


def _require_columns(df: pd.DataFrame, required: list[str], context: str = "") -> None:
    """Raise ``ValueError`` naming the *required* columns absent from *df*.

    *context* is inserted verbatim after "required columns" in the message.
    """
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"CIQ brut is missing required columns{context}: {missing}")


def _parse_ciq_sites(ciq_df: pd.DataFrame) -> list[_PlannedSite]:
    """Build one ``_PlannedSite`` (without BCF) per distinct CIQ site.

    Returns:
        Sites sorted by ``(bsc, site_number, site_name)``.

    Raises:
        ValueError: If required columns are missing or a site has no BSC id.
    """
    required = [
        "Sites",
        "NOM_CELLULE",
        "Nbre_TRE_DR",
        "Nom BSC",
        "BSC ID",
        "band",
        "sector",
        "site_number",
    ]
    _require_columns(ciq_df, required)

    df = ciq_df[required].copy()
    sites: list[_PlannedSite] = []
    for site_name, site_rows in df.groupby("Sites", dropna=False):
        # Skip the missing-name group and blank names.
        if not isinstance(site_name, str) or not site_name.strip():
            continue

        bsc_series = site_rows["BSC ID"].dropna()
        if bsc_series.empty:
            raise ValueError(f"Missing BSC ID for site '{site_name}'")
        bsc = int(bsc_series.iloc[0])

        bsc_name_series = site_rows["Nom BSC"].dropna()
        bsc_name = str(bsc_name_series.iloc[0]) if not bsc_name_series.empty else ""

        site_number = int(site_rows["site_number"].dropna().iloc[0])

        sites.append(
            _PlannedSite(
                site_name=site_name,
                site_number=site_number,
                bsc=bsc,
                bsc_name=bsc_name,
                name=f"{site_name}_NA",
                configuration=_build_configuration(site_rows),
                assigned_bcf=None,
                needed_bts_ids=(),
            )
        )

    return sorted(sites, key=lambda s: (s.bsc, s.site_number, s.site_name))


def _int_id_set(frame: pd.DataFrame, column: str) -> set[int]:
    """Return the numeric, non-missing values of *column* as a set of ints."""
    values = pd.to_numeric(frame[column], errors="coerce").dropna()
    return set(values.astype(int).tolist())


def _assign_bcfs(
    dump_bts: pd.DataFrame, planned_sites: list[_PlannedSite], ciq_df: pd.DataFrame
) -> list[_PlannedSite]:
    """Allocate a free BCF id to every planned site.

    Sites are processed per BSC, ordered by ``(site_number, site_name)``.
    A candidate BCF is accepted when it is not used on that BSC in the dump
    and none of the ids ``bcf + 1 .. bcf + 6`` is already used as a BTS id or
    as a ``usedMobileAllocation`` value. Accepted ids are reserved so later
    sites on the same BSC cannot reuse them.

    Returns:
        New ``_PlannedSite`` records with ``assigned_bcf`` and
        ``needed_bts_ids`` filled in, sorted by
        ``(bsc, site_number, site_name)``.

    Raises:
        ValueError: If a site has no CIQ rows, no row with a usable
            band/sector, or no free BCF candidate.
    """
    dump_bts = dump_bts.dropna(subset=["BSC"])

    sites_by_bsc: dict[int, list[_PlannedSite]] = {}
    for planned in planned_sites:
        sites_by_bsc.setdefault(planned.bsc, []).append(planned)

    slot_offsets = range(1, _BTS_SLOTS_PER_BCF + 1)
    assigned: list[_PlannedSite] = []

    for bsc, sites_in_bsc in sites_by_bsc.items():
        # Rows with a missing BSC were dropped above, so the cast is safe.
        sub_dump = dump_bts[dump_bts["BSC"].astype(int) == int(bsc)]
        used_bcfs = _int_id_set(sub_dump, "BCF")
        used_bts = _int_id_set(sub_dump, "BTS")
        used_mal = _int_id_set(sub_dump, "usedMobileAllocation")

        for site in sorted(sites_in_bsc, key=lambda s: (s.site_number, s.site_name)):
            site_rows = ciq_df[ciq_df["Sites"] == site.site_name]
            if site_rows.empty:
                raise ValueError(f"No CIQ rows found for site '{site.site_name}'")

            # The needed offsets do not depend on the candidate, so compute
            # them once (relative to BCF 0) instead of once per candidate.
            needed_offsets = _needed_bts_ids_from_site_rows(0, site_rows)
            if not needed_offsets:
                # No candidate could ever be accepted for such a site.
                raise ValueError(
                    f"No valid band/sector rows found for site "
                    f"'{site.site_name}' on BSC {bsc}; cannot assign a BCF"
                )

            assigned_bcf: Optional[int] = None
            for cand in _BCF_CANDIDATES:
                if cand in used_bcfs:
                    continue
                if any(
                    (cand + i) in used_bts or (cand + i) in used_mal
                    for i in slot_offsets
                ):
                    continue
                assigned_bcf = cand
                break

            if assigned_bcf is None:
                raise ValueError(
                    f"No available BCF found for site '{site.site_name}' on BSC {bsc}"
                )

            # Reserve the BCF and its whole id range for this BSC.
            reserved_ids = [assigned_bcf + i for i in slot_offsets]
            used_bcfs.add(assigned_bcf)
            used_bts.update(reserved_ids)
            used_mal.update(reserved_ids)

            assigned.append(
                _PlannedSite(
                    site_name=site.site_name,
                    site_number=site.site_number,
                    bsc=site.bsc,
                    bsc_name=site.bsc_name,
                    name=site.name,
                    configuration=site.configuration,
                    assigned_bcf=int(assigned_bcf),
                    needed_bts_ids=tuple(assigned_bcf + off for off in needed_offsets),
                )
            )

    return sorted(assigned, key=lambda s: (s.bsc, s.site_number, s.site_name))


def _load_and_assign(dump_file, ciq_file) -> tuple[pd.DataFrame, list[_PlannedSite]]:
    """Read both workbooks and return ``(ciq_df, sites with BCFs assigned)``."""
    dump_bts = _read_dump_bts_required_columns(dump_file)
    ciq_df = _read_ciq_df(ciq_file)
    planned_sites = _parse_ciq_sites(ciq_df)
    return ciq_df, _assign_bcfs(dump_bts, planned_sites, ciq_df)


def build_bcf_sheet(dump_file, ciq_file) -> pd.DataFrame:
    """Build the BCF sheet from a dump workbook and a CIQ workbook."""
    _, assigned_sites = _load_and_assign(dump_file, ciq_file)
    return _build_bcf_sheet_from_assigned_sites(assigned_sites)


def _build_bcf_sheet_from_assigned_sites(
    assigned_sites: list[_PlannedSite],
) -> pd.DataFrame:
    """Return one BCF-sheet row per assigned site, numbered from 1."""
    rows = [
        {
            "S. No.": serial,
            "Site Number": site.site_number,
            "BSC": site.bsc,
            "BSC Name": site.bsc_name,
            "BCF": site.assigned_bcf,
            "name": site.name,
            "Configuration": site.configuration,
        }
        for serial, site in enumerate(assigned_sites, start=1)
    ]
    return pd.DataFrame(rows)


def _sector_id_from_band_sector(band: object, sector: object) -> int:
    """Map ``(band, sector)`` to a sector id: 1-3 for G9, 4-6 for G18.

    Raises:
        ValueError: If the band is unknown, the sector is missing (``None``,
            NaN or ``pd.NA``) or the sector is not 1, 2 or 3.
    """
    if band not in {"G9", "G18"}:
        raise ValueError(f"Invalid band '{band}'")
    if sector is None or pd.isna(sector):
        raise ValueError("Missing sector")
    sec = int(sector)
    if sec not in {1, 2, 3}:
        raise ValueError(f"Invalid sector '{sec}'")
    return sec if band == "G9" else sec + 3


def _template_name_from_freq(freq: object) -> str:
    """Return ``"GSM1800"`` / ``"GSM900"`` when *freq* mentions that band.

    Otherwise the string form of *freq* is returned (``""`` for ``None``).
    """
    text = str(freq) if freq is not None else ""
    upper = text.upper()
    # "1800" is tested first, as in the original ordering.
    if "1800" in upper:
        return "GSM1800"
    if "900" in upper:
        return "GSM900"
    return text


def _template_name_from_band(band: object) -> str:
    """Return the template name for a band label, falling back to its text."""
    if band == "G9":
        return "GSM900"
    if band == "G18":
        return "GSM1800"
    return _template_name_from_freq(band)


def _frequency_band_in_use_from_band(band: object) -> int:
    """Return 0 for ``"G9"`` and 1 for ``"G18"``; raise ``ValueError`` otherwise."""
    if band == "G9":
        return 0
    if band == "G18":
        return 1
    raise ValueError(f"Invalid band '{band}'")


def _parse_trx_frequencies(value: object) -> list[str]:
    """Return every run of digits found in *value*, as strings.

    ``None`` and NaN give an empty list. An integer-valued float (a numeric
    cell read into a float column, e.g. ``62.0``) is treated as the integer
    it represents so its ``.0`` suffix is not parsed as an extra frequency.
    """
    if value is None or (isinstance(value, float) and pd.isna(value)):
        return []
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    return re.findall(r"\d+", str(value))


def _int_or_none(value: object) -> Optional[int]:
    """Coerce a cell to ``int``; return ``None`` when it is not numeric."""
    number = pd.to_numeric(value, errors="coerce")
    return None if pd.isna(number) else int(number)


def _iter_assigned_rows(ciq_df: pd.DataFrame, assigned_sites: list[_PlannedSite]):
    """Yield ``(row, site)`` for each CIQ row whose site has an assigned BCF."""
    assigned_by_site = {s.site_name: s for s in assigned_sites}
    selected = ciq_df[ciq_df["Sites"].isin(list(assigned_by_site))]
    for _, row in selected.iterrows():
        site = assigned_by_site.get(row["Sites"])
        if site is None or site.assigned_bcf is None:
            continue
        yield row, site


def _build_trx_sheet_from_assigned_sites(
    ciq_df: pd.DataFrame, assigned_sites: list[_PlannedSite]
) -> pd.DataFrame:
    """Build the TRX sheet.

    Each CIQ row produces one row for its BCCH frequency followed by one row
    per remaining frequency listed in ``TRX`` (the BCCH value is removed from
    that list). Rows are sorted by site, BTS, BCCH-first and list position,
    then ``TRX`` is numbered 1..N over the whole sheet.

    Raises:
        ValueError: If required CIQ columns are missing or a row has an
            invalid band/sector.
    """
    required = ["Sites", "CI", "band", "sector", "BCCH", "TRX", "BCC"]
    _require_columns(ciq_df, required, " for TRX sheet")

    # channel{i}Type values written on the BCCH row, index = channel number.
    bcch_types = [4, 8, 6, 2, 2, 2, 2, 2]

    rows = []
    for r, site in _iter_assigned_rows(ciq_df, assigned_sites):
        band = r.get("band")
        sector_id = _sector_id_from_band_sector(band, r.get("sector"))
        bts_id = int(site.assigned_bcf) + int(sector_id)
        bcch = _int_or_none(r.get("BCCH"))

        base = {
            "site": int(site.site_number),
            "bscid": int(site.bsc),
            "cellId": _int_or_none(r.get("CI")),
            "bcfId": int(site.assigned_bcf),
            "btsId": int(bts_id),
            "tsc": _int_or_none(r.get("BCC")),
            "FrequencyBandinUse": int(_frequency_band_in_use_from_band(band)),
        }

        bcch_row = dict(base)
        bcch_row["TRX"] = None  # numbered after sorting
        bcch_row["initialFrequency"] = bcch
        bcch_row["_sort_type"] = 0  # BCCH row sorts before the other rows
        bcch_row["_sort_maio"] = -1
        for i, channel_type in enumerate(bcch_types):
            bcch_row[f"channel{i}Maio"] = None
            bcch_row[f"channel{i}Type"] = channel_type
        rows.append(bcch_row)

        trx_list = _parse_trx_frequencies(r.get("TRX"))
        if bcch is not None:
            bcch_str = str(bcch)
            trx_list = [x for x in trx_list if x != bcch_str]

        for maio, freq in enumerate(trx_list):
            tr_row = dict(base)
            tr_row["TRX"] = None
            tr_row["initialFrequency"] = int(freq)
            tr_row["_sort_type"] = 1
            tr_row["_sort_maio"] = int(maio)
            for i in range(8):
                tr_row[f"channel{i}Maio"] = maio
                tr_row[f"channel{i}Type"] = 3 if i == 0 else 2
            rows.append(tr_row)

    df_trx = pd.DataFrame(rows)
    if df_trx.empty:
        return df_trx

    ordered_cols = [
        "site",
        "bscid",
        "cellId",
        "bcfId",
        "btsId",
        "TRX",
        "tsc",
        "FrequencyBandinUse",
        "initialFrequency",
    ]
    for i in range(8):
        ordered_cols.append(f"channel{i}Maio")
        ordered_cols.append(f"channel{i}Type")

    df_trx = df_trx.sort_values(
        by=["site", "btsId", "_sort_type", "_sort_maio"], kind="stable"
    )
    df_trx["TRX"] = range(1, len(df_trx) + 1)
    return df_trx[ordered_cols]


def build_bts_sheet(dump_file, ciq_file, mcc: int = 610, mnc: int = 2) -> pd.DataFrame:
    """Build the BTS sheet from a dump workbook and a CIQ workbook.

    Args:
        dump_file: Dump workbook (path or file-like).
        ciq_file: CIQ workbook (path or file-like).
        mcc: Value written to ``locationAreaIdMCC``.
        mnc: Value written to ``locationAreaIdMNC``.
    """
    ciq_df, assigned_sites = _load_and_assign(dump_file, ciq_file)
    return _build_bts_sheet_from_assigned_sites(
        ciq_df, assigned_sites, mcc=mcc, mnc=mnc
    )


def _build_bts_sheet_from_assigned_sites(
    ciq_df: pd.DataFrame, assigned_sites: list[_PlannedSite], mcc: int, mnc: int
) -> pd.DataFrame:
    """Build the BTS sheet: one row per CIQ row of an assigned site.

    The BTS id is the site's BCF plus the sector id; the same value is used
    for ``usedMobileAllocation`` and ``malId``. Rows are ordered by
    ``(site, sectorId)`` with columns in ``BTS_EXPORT_COLUMNS`` order.

    Raises:
        ValueError: If required CIQ columns are missing or a row has an
            invalid band/sector.
    """
    required = [
        "Sites",
        "NOM_CELLULE",
        "CI",
        "LAC",
        "Frequence",
        "NCC",
        "BCC",
        "band",
        "sector",
    ]
    _require_columns(ciq_df, required, " for BTS sheet")

    rows = []
    for r, site in _iter_assigned_rows(ciq_df, assigned_sites):
        band = r.get("band")
        sector_id = _sector_id_from_band_sector(band, r.get("sector"))
        bts_id = int(site.assigned_bcf) + int(sector_id)

        rows.append(
            {
                "site": int(site.site_number),
                "bscid": int(site.bsc),
                "cellId": _int_or_none(r.get("CI")),
                "bcfId": int(site.assigned_bcf),
                "btsId": int(bts_id),
                "Check": int(sector_id),
                "bsIdentityCodeNCC": _int_or_none(r.get("NCC")),
                "bsIdentityCodeBCC": _int_or_none(r.get("BCC")),
                "locationAreaIdLAC": _int_or_none(r.get("LAC")),
                "locationAreaIdMCC": int(mcc),
                "locationAreaIdMNC": int(mnc),
                "usedMobileAllocation": int(bts_id),
                "malId": int(bts_id),
                "name": f"{str(r.get('NOM_CELLULE'))}_NA",
                "template_name": _template_name_from_band(band),
                "sectorId": int(sector_id),
            }
        )

    df_bts = pd.DataFrame(rows)
    if not df_bts.empty:
        df_bts = df_bts[BTS_EXPORT_COLUMNS].sort_values(
            by=["site", "sectorId"], kind="stable"
        )
    return df_bts


def _build_mal_sheet_from_assigned_sites(
    ciq_df: pd.DataFrame, assigned_sites: list[_PlannedSite]
) -> pd.DataFrame:
    """Build the MAL sheet: one row per CIQ row of an assigned site.

    ``frequency`` holds the comma-joined frequencies parsed from ``TRX`` and
    ``frequency1`` .. ``frequency6`` the first six of them (``None`` when
    fewer are listed). Rows are ordered by ``(site, btsId)``.

    Raises:
        ValueError: If required CIQ columns are missing or a row has an
            invalid band/sector.
    """
    required = ["Sites", "CI", "band", "sector", "BCCH", "TRX"]
    _require_columns(ciq_df, required, " for MAL sheet")

    rows = []
    for r, site in _iter_assigned_rows(ciq_df, assigned_sites):
        band = r.get("band")
        sector_id = _sector_id_from_band_sector(band, r.get("sector"))
        bts_id = int(site.assigned_bcf) + int(sector_id)

        trx_list = _parse_trx_frequencies(r.get("TRX"))
        freq_str = ", ".join(trx_list)

        row = {
            "site": int(site.site_number),
            "siteId": int(site.site_number),
            "bscid": int(site.bsc),
            "cellId": _int_or_none(r.get("CI")),
            "bcfId": int(site.assigned_bcf),
            "btsId": int(bts_id),
            "frequencyBandInUse": _frequency_band_in_use_from_band(band),
            "malId": int(bts_id),
            "initial frequency": _int_or_none(r.get("BCCH")),
            "frequency": freq_str if freq_str else None,
        }
        for i in range(1, 7):
            row[f"frequency{i}"] = trx_list[i - 1] if len(trx_list) >= i else None
        rows.append(row)

    df_mal = pd.DataFrame(rows)
    if df_mal.empty:
        return df_mal

    ordered_cols = [
        "site",
        "siteId",
        "bscid",
        "cellId",
        "bcfId",
        "btsId",
        "frequencyBandInUse",
        "malId",
        "initial frequency",
        "frequency",
        "frequency1",
        "frequency2",
        "frequency3",
        "frequency4",
        "frequency5",
        "frequency6",
    ]
    return df_mal[ordered_cols].sort_values(by=["site", "btsId"], kind="stable")


def generate_ciq_2g_excel(
    dump_file, ciq_file, mcc: int = 610, mnc: int = 2
) -> tuple[dict[str, pd.DataFrame], bytes]:
    """Generate every output sheet and the corresponding Excel workbook.

    Args:
        dump_file: Dump workbook (path or file-like).
        ciq_file: CIQ workbook (path or file-like).
        mcc: Value written to ``locationAreaIdMCC`` in the BTS sheet.
        mnc: Value written to ``locationAreaIdMNC`` in the BTS sheet.

    Returns:
        ``(sheets, workbook_bytes)`` where ``sheets`` maps sheet name to its
        DataFrame and ``workbook_bytes`` is the xlsx file written with the
        ``xlsxwriter`` engine, one worksheet per entry and no index column.
    """
    ciq_df, assigned_sites = _load_and_assign(dump_file, ciq_file)

    df_bcf = _build_bcf_sheet_from_assigned_sites(assigned_sites)
    df_bts = _build_bts_sheet_from_assigned_sites(
        ciq_df, assigned_sites, mcc=mcc, mnc=mnc
    )
    df_mal = _build_mal_sheet_from_assigned_sites(ciq_df, assigned_sites)
    df_trx = _build_trx_sheet_from_assigned_sites(ciq_df, assigned_sites)

    # Sheets derived from the BTS sheet stay empty when it is empty.
    df_bts_min = pd.DataFrame()
    df_hoc = pd.DataFrame()
    df_poc = pd.DataFrame()
    df_plmn_permitted = pd.DataFrame()

    if not df_bts.empty:
        id_cols = ["site", "bscid", "cellId", "bcfId", "btsId"]
        df_bts_min = df_bts[id_cols].rename(columns={"site": "Site"})

        base = df_bts[id_cols + ["template_name"]].rename(columns={"site": "Site"})

        df_hoc = base.copy()
        df_hoc.insert(5, "hocId", 1)
        df_hoc = df_hoc[
            ["Site", "bscid", "cellId", "bcfId", "btsId", "hocId", "template_name"]
        ]

        df_poc = base.copy()
        df_poc.insert(5, "pocId", 1)
        df_poc = df_poc[
            ["Site", "bscid", "cellId", "bcfId", "btsId", "pocId", "template_name"]
        ]

        # Eight rows per BTS, with template_name running 1..8 within each BTS.
        base_plmn = df_bts[["bscid", "cellId", "bcfId", "btsId"]].rename(
            columns={"bscid": "BSCId"}
        )
        df_plmn_permitted = base_plmn.loc[base_plmn.index.repeat(8)].reset_index(
            drop=True
        )
        df_plmn_permitted["template_name"] = list(range(1, 9)) * len(base_plmn)
        df_plmn_permitted["plmnPermitted"] = "List;1;1;1;1;1;1;1;1"
        df_plmn_permitted = df_plmn_permitted[
            ["BSCId", "cellId", "bcfId", "btsId", "template_name", "plmnPermitted"]
        ]

    sheets: dict[str, pd.DataFrame] = {
        "BCF": df_bcf,
        "BTS": df_bts,
        "BTS_GPRS": df_bts_min,
        "BTS_AMR": df_bts_min,
        "HOC": df_hoc,
        "POC": df_poc,
        "MAL": df_mal,
        "BTS_PLMNPERMITTED": df_plmn_permitted,
        "TRX": df_trx,
    }

    bytes_io = io.BytesIO()
    with pd.ExcelWriter(bytes_io, engine="xlsxwriter") as writer:
        for sheet_name, df in sheets.items():
            df.to_excel(writer, sheet_name=sheet_name, index=False)

    return sheets, bytes_io.getvalue()