# LTE busy-hour capacity analysis pipeline.
| import numpy as np | |
| import pandas as pd | |
| from queries.process_lte import process_lte_data | |
| from utils.convert_to_excel import save_dataframe | |
| from utils.kpi_analysis_utils import ( | |
| LteCapacity, | |
| analyze_prb_usage, | |
| cell_availability_analysis, | |
| create_dfs_per_kpi, | |
| create_hourly_date, | |
| kpi_naming_cleaning, | |
| ) | |
# Column order of the final per-sector analysis sheet.
# Band suffixes: l800 / l1800 / l2300 / l2600 / l1800s (second L1800 carrier,
# matched via the "_<sector>S_L1800" cell-name pattern in dfs_per_band_cell).
LTE_ANALYSIS_COLUMNS = [
    "code",
    "code_sector",
    "Region",
    "site_config_band",
    "Longitude",
    "Latitude",
    # Cell name per band (NaN when the band is not deployed on the sector).
    "LNCEL_name_l800",
    "LNCEL_name_l1800",
    "LNCEL_name_l2300",
    "LNCEL_name_l2600",
    "LNCEL_name_l1800s",
    # Busy-hour PRB usage per band; "_2nd" columns carry the secondary PRB counter.
    "avg_prb_usage_bh_l800",
    "avg_prb_usage_bh_l1800",
    "avg_prb_usage_bh_l2300",
    "avg_prb_usage_bh_l2600",
    "avg_prb_usage_bh_l1800s",
    "avg_prb_usage_bh_l800_2nd",
    "avg_prb_usage_bh_l1800_2nd",
    "avg_prb_usage_bh_l2300_2nd",
    "avg_prb_usage_bh_l2600_2nd",
    "avg_prb_usage_bh_l1800s_2nd",
    # Average active UEs (DL) per band.
    "avg_act_ues_l800",
    "avg_act_ues_l1800",
    "avg_act_ues_l2300",
    "avg_act_ues_l2600",
    "avg_act_ues_l1800s",
    # Average DL / UL cell throughput per band.
    "avg_dl_thp_l800",
    "avg_dl_thp_l1800",
    "avg_dl_thp_l2300",
    "avg_dl_thp_l2600",
    "avg_dl_thp_l1800s",
    "avg_ul_thp_l800",
    "avg_ul_thp_l1800",
    "avg_ul_thp_l2300",
    "avg_ul_thp_l2600",
    "avg_ul_thp_l1800s",
    # Derived analysis columns produced by lte_analysis_logic.
    "num_congested_cells",
    "num_cells",
    "num_cell_with_kpi",
    "num_down_or_no_kpi_cells",
    "prb_diff_between_cells",
    "load_balance_required",
    "congestion_comment",
    "final_comments",
]
# Columns kept from the LTE dump (FDD + TDD) when building the cell database.
LTE_DATABASE_COLUMNS = [
    "code",
    "Region",
    "site_config_band",
    "final_name",
    "Longitude",
    "Latitude",
]
# Columns expected in the cleaned BH report before pivoting per KPI.
KPI_COLUMNS = [
    "date",
    "LNCEL_name",
    "Cell_Avail_excl_BLU",
    "E_UTRAN_Avg_PRB_usage_per_TTI_DL",
    "DL_PRB_Util_p_TTI_Lev_10",
    "Avg_PDCP_cell_thp_UL",
    "Avg_PDCP_cell_thp_DL",
    "Avg_act_UEs_DL",
]
# Per-cell aggregated KPI columns merged onto the cell database.
PRB_COLUMNS = [
    "LNCEL_name",
    "avg_prb_usage_bh",
    "avg_prb_usage_bh_2nd",
    "avg_act_ues",
    "avg_dl_thp",
    "avg_ul_thp",
]
def lte_analysis_logic(
    df: pd.DataFrame,
    prb_usage_threshold: int,
    prb_diff_between_cells_threshold: int,
) -> pd.DataFrame:
    """Derive congestion counters, comments and recommended actions per sector row.

    Args:
        df: Wide per-site/sector-group frame produced by ``dfs_per_band_cell``
            (one ``LNCEL_name_*`` / ``avg_prb_usage_bh_*`` column set per band).
        prb_usage_threshold: PRB usage at/above which a cell counts as congested.
        prb_diff_between_cells_threshold: PRB spread between co-located cells
            above which load balancing is flagged.

    Returns:
        A copy of ``df`` with the analysis columns added and rows that have no
        cell name on any band removed.
    """
    # Band-suffixed column groups used repeatedly below.
    prb_cols = [
        "avg_prb_usage_bh_l800",
        "avg_prb_usage_bh_l1800",
        "avg_prb_usage_bh_l2300",
        "avg_prb_usage_bh_l2600",
        "avg_prb_usage_bh_l1800s",
    ]
    name_cols = [
        "LNCEL_name_l800",
        "LNCEL_name_l1800",
        "LNCEL_name_l2300",
        "LNCEL_name_l2600",
        "LNCEL_name_l1800s",
    ]
    out = df.copy()
    # Cells whose busy-hour PRB usage reaches the congestion threshold.
    out["num_congested_cells"] = (out[prb_cols] >= prb_usage_threshold).sum(axis=1)
    # Cells deployed on the sector (non-null name) vs. cells that reported KPIs.
    out["num_cells"] = out[name_cols].count(axis=1)
    out["num_cell_with_kpi"] = out[prb_cols].count(axis=1)
    out["num_down_or_no_kpi_cells"] = out["num_cells"] - out["num_cell_with_kpi"]
    # Max PRB spread between co-located cells. Vectorized max/min skip NaNs;
    # the previous row-wise Python max()/min() gave order-dependent results
    # whenever a band had no KPI (NaN comparisons are always False).
    out["prb_diff_between_cells"] = out[prb_cols].max(axis=1) - out[prb_cols].min(axis=1)
    # Load balance required when the spread exceeds the threshold.
    out["load_balance_required"] = np.where(
        out["prb_diff_between_cells"] > prb_diff_between_cells_threshold, "Yes", "No"
    )
    # Band to add next if the site needs a new layer.
    out["next_band"] = out["site_config_band"].map(LteCapacity.next_band_mapping)
    # Congestion classification:
    #   no congested cells, no down cells   -> "No Congestion"
    #   no congested cells, some down cells -> "No congestion but Down cell"
    #   congested cells and down cells      -> "Congestion but Colocated Down Cell"
    #   otherwise (congested, all cells up) -> "Need Action"
    conditions = [
        (out["num_congested_cells"] == 0) & (out["num_down_or_no_kpi_cells"] == 0),
        (out["num_congested_cells"] == 0) & (out["num_down_or_no_kpi_cells"] > 0),
        (out["num_congested_cells"] > 0) & (out["num_down_or_no_kpi_cells"] > 0),
    ]
    choices = [
        "No Congestion",
        "No congestion but Down cell",
        "Congestion but Colocated Down Cell",
    ]
    out["congestion_comment"] = np.select(conditions, choices, default="Need Action")
    # Action for "Need Action" sites: tune load balancing when the PRB spread
    # is large (load_balance_required == "Yes"), otherwise add a new layer.
    # (The original inline comment said "Yes" for both branches; the second
    # branch actually matches load_balance_required == "No".)
    conditions = [
        (out["load_balance_required"] == "Yes")
        & (out["congestion_comment"] == "Need Action"),
        (out["load_balance_required"] == "No")
        & (out["congestion_comment"] == "Need Action"),
    ]
    choices = [
        "Load Balancing parameter tuning required",
        "Add Layer",
    ]
    out["actions"] = np.select(conditions, choices, default=out["congestion_comment"])
    # Final comment: expand "Add Layer" with the concrete band, else keep action.
    out["final_comments"] = out.apply(
        lambda row: (
            f"Add {row['next_band']}"
            if row["actions"] == "Add Layer"
            else row["actions"]
        ),
        axis=1,
    )
    # First non-null cell name across bands identifies the sector.
    out["sector"] = (
        out["LNCEL_name_l800"]
        .combine_first(out["LNCEL_name_l1800"])
        .combine_first(out["LNCEL_name_l2300"])
        .combine_first(out["LNCEL_name_l2600"])
        .combine_first(out["LNCEL_name_l1800s"])
    )
    # Drop sector-group rows with no cell at all; .copy() prevents
    # chained-assignment warnings on the writes below.
    out = out[out["sector"].notna()].copy()
    # Sector id 1/2/3 parsed from the "_<n>_" token in the cell name
    # (float result because the np.select default is NaN).
    out["sector_id"] = np.select(
        [
            out["sector"].str.contains("_1_"),
            out["sector"].str.contains("_2_"),
            out["sector"].str.contains("_3_"),
        ],
        [1, 2, 3],
        default=np.nan,
    )
    # "<code>_<sector_id>" key; regex=False keeps "." literal so the float
    # artifact ".0" is stripped without "." acting as a regex wildcard.
    out["code_sector"] = (
        out["code"].astype(str) + "_" + out["sector_id"].astype(str)
    ).str.replace(".0", "", regex=False)
    return out
def dfs_per_band_cell(df: pd.DataFrame) -> pd.DataFrame:
    """Pivot per-cell KPI rows into one wide row per site per sector group.

    For each sector group ("1", "2", "3") and each band pattern (e.g.
    "_1_L800"), cells whose LNCEL_name contains the pattern are merged onto
    the deduplicated site table with band-suffixed column names
    (LNCEL_name_l800, avg_prb_usage_bh_l800, ...). The three per-group frames
    are stacked vertically, so every site contributes up to three rows.
    """
    site_cols = ["code", "Region", "site_config_band", "Longitude", "Latitude"]
    base_sites = df[site_cols].drop_duplicates()

    def band_patterns(group: str):
        # (LNCEL_name substring, column suffix) pairs for one sector group.
        # The "S" variant is the second L1800 carrier (e.g. "_1S_L1800").
        return [
            (f"_{group}_L800", "l800"),
            (f"_{group}_L1800", "l1800"),
            (f"_{group}_L2300", "l2300"),
            (f"_{group}_L2600", "l2600"),
            (f"_{group}S_L1800", "l1800s"),
        ]

    per_group_frames = []
    for group in ("1", "2", "3"):
        wide = base_sites.copy()
        for pattern, suffix in band_patterns(group):
            # Rows belonging to this band within this sector group.
            band_rows = df[df["LNCEL_name"].str.contains(pattern)]
            # Rename up front so the merge never produces _x/_y suffixes.
            renames = {
                "LNCEL_name": f"LNCEL_name_{suffix}",
                "avg_prb_usage_bh": f"avg_prb_usage_bh_{suffix}",
                "avg_prb_usage_bh_2nd": f"avg_prb_usage_bh_{suffix}_2nd",
                "avg_act_ues": f"avg_act_ues_{suffix}",
                "avg_dl_thp": f"avg_dl_thp_{suffix}",
                "avg_ul_thp": f"avg_ul_thp_{suffix}",
            }
            slim = band_rows[["code"] + list(renames)].rename(columns=renames)
            wide = wide.merge(slim, on="code", how="left")
        per_group_frames.append(wide)
    # Stack the three sector-group frames; column names are identical across
    # groups, so this aligns cleanly.
    return pd.concat(per_group_frames, axis=0, ignore_index=True)
def lte_database_for_capacity(dump_path: str):
    """Build the LTE cell database (FDD + TDD) used by the capacity analysis.

    Loads the network dump via ``process_lte_data``, keeps only
    ``LTE_DATABASE_COLUMNS`` and renames ``final_name`` to ``LNCEL_name``
    so the result can be merged with the BH KPI frames.
    """
    processed = process_lte_data(dump_path)
    # The first two frames are the FDD and TDD cell dumps respectively.
    trimmed = [frame[LTE_DATABASE_COLUMNS] for frame in (processed[0], processed[1])]
    combined = pd.concat(trimmed, axis=0)
    # Align the name column with the BH report's LNCEL_name.
    return combined.rename(columns={"final_name": "LNCEL_name"})
def _append_max_and_avg(kpi_df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Add ``max_<name>`` / ``avg_<name>`` columns computed over the raw KPI columns.

    The column list is snapshotted first so the average is taken over the
    original per-date values only. The previous in-place version added the
    max column and then averaged over ALL columns, so ``avg_*`` wrongly
    included the max and was inflated.
    """
    value_cols = list(kpi_df.columns)
    kpi_df[f"max_{name}"] = kpi_df[value_cols].max(axis=1)
    kpi_df[f"avg_{name}"] = kpi_df[value_cols].mean(axis=1)
    return kpi_df


def lte_bh_dfs_per_kpi(
    dump_path: str,
    df: pd.DataFrame,
    number_of_kpi_days: int = 7,
    availability_threshold: int = 95,
    prb_usage_threshold: int = 80,
    prb_diff_between_cells_threshold: int = 20,
    number_of_threshold_days: int = 3,
    main_prb_to_use: str = "",
) -> list:
    """Build the busy-hour KPI table and the per-sector LTE capacity analysis.

    Args:
        dump_path: Path to the network dump used to build the cell database.
        df: Cleaned BH report restricted to ``KPI_COLUMNS``.
        number_of_kpi_days: Number of last days included in the analysis.
        availability_threshold: Minimum required cell availability.
        prb_usage_threshold: PRB usage at/above which a cell is congested.
        prb_diff_between_cells_threshold: Max allowed PRB spread between cells.
        number_of_threshold_days: Days used for threshold-crossing counts.
        main_prb_to_use: Display name of the PRB counter treated as primary;
            the non-primary counter's columns get a "_2nd" suffix.

    Returns:
        ``[bh_kpi_df, lte_analysis_df]`` — the merged per-cell BH KPI frame and
        the renamed per-sector analysis frame.
    """
    # One pivoted (per-date x per-cell) frame per KPI.
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="LNCEL_name",
        kpi_columns_from=2,
    )
    cell_availability_df = cell_availability_analysis(
        df=pivoted_kpi_dfs["Cell_Avail_excl_BLU"],
        days=number_of_kpi_days,
        availability_threshold=availability_threshold,
    )
    # Whichever PRB counter matches main_prb_to_use gets the unsuffixed
    # columns; the other keeps a "_2nd" suffix.
    prb_usage_df = analyze_prb_usage(
        df=pivoted_kpi_dfs["E_UTRAN_Avg_PRB_usage_per_TTI_DL"],
        number_of_kpi_days=number_of_kpi_days,
        prb_usage_threshold=prb_usage_threshold,
        analysis_type="BH",
        number_of_threshold_days=number_of_threshold_days,
        suffix="" if main_prb_to_use == "E-UTRAN Avg PRB usage per TTI DL" else "_2nd",
    )
    prb_lev10_usage_df = analyze_prb_usage(
        df=pivoted_kpi_dfs["DL_PRB_Util_p_TTI_Lev_10"],
        number_of_kpi_days=number_of_kpi_days,
        prb_usage_threshold=prb_usage_threshold,
        analysis_type="BH",
        number_of_threshold_days=number_of_threshold_days,
        suffix="" if main_prb_to_use == "DL PRB Util p TTI Lev_10" else "_2nd",
    )
    # Max/avg per cell over the KPI period (mutates the pivoted frames in place,
    # as before, but the average no longer includes the max column).
    act_ues_df = _append_max_and_avg(pivoted_kpi_dfs["Avg_act_UEs_DL"], "act_ues")
    dl_thp_df = _append_max_and_avg(pivoted_kpi_dfs["Avg_PDCP_cell_thp_DL"], "dl_thp")
    ul_thp_df = _append_max_and_avg(pivoted_kpi_dfs["Avg_PDCP_cell_thp_UL"], "ul_thp")
    # Merge all KPI frames side by side on the cell index.
    bh_kpi_df = pd.concat(
        [
            cell_availability_df,
            prb_lev10_usage_df,
            prb_usage_df,
            act_ues_df,
            dl_thp_df,
            ul_thp_df,
        ],
        axis=1,
    )
    bh_kpi_df = bh_kpi_df.reset_index()
    prb_df = bh_kpi_df[PRB_COLUMNS]
    # Drop rows whose cell name is empty or a stray short token (e.g. "1").
    prb_df = prb_df[prb_df["LNCEL_name"].str.len() > 3]
    # Flatten the second column level — presumably the pivoted date level.
    # NOTE(review): confirm against create_dfs_per_kpi's output column shape.
    prb_df = prb_df.droplevel(level=1, axis=1)
    lte_db = lte_database_for_capacity(dump_path)
    # Left merge keeps cells that exist in the dump but sent no BH KPIs (NaN KPIs).
    db_and_prb = pd.merge(lte_db, prb_df, on="LNCEL_name", how="left")
    db_and_prb = db_and_prb[db_and_prb["LNCEL_name"].str.len() > 3]
    lte_analysis_df = dfs_per_band_cell(db_and_prb)
    lte_analysis_df = lte_analysis_logic(
        lte_analysis_df,
        prb_usage_threshold,
        prb_diff_between_cells_threshold,
    )
    lte_analysis_df = lte_analysis_df[LTE_ANALYSIS_COLUMNS]
    # Shorten column names for the final report.
    lte_analysis_df = lte_analysis_df.rename(
        columns={
            "LNCEL_name_l800": "name_l800",
            "LNCEL_name_l1800": "name_l1800",
            "LNCEL_name_l2300": "name_l2300",
            "LNCEL_name_l2600": "name_l2600",
            "LNCEL_name_l1800s": "name_l1800s",
            "avg_prb_usage_bh_l800": "prb_l800",
            "avg_prb_usage_bh_l1800": "prb_l1800",
            "avg_prb_usage_bh_l2300": "prb_l2300",
            "avg_prb_usage_bh_l2600": "prb_l2600",
            "avg_prb_usage_bh_l1800s": "prb_l1800s",
            "avg_prb_usage_bh_l800_2nd": "prb_l800_2nd",
            "avg_prb_usage_bh_l1800_2nd": "prb_l1800_2nd",
            "avg_prb_usage_bh_l2300_2nd": "prb_l2300_2nd",
            "avg_prb_usage_bh_l2600_2nd": "prb_l2600_2nd",
            "avg_prb_usage_bh_l1800s_2nd": "prb_l1800s_2nd",
            "avg_act_ues_l800": "act_ues_l800",
            "avg_act_ues_l1800": "act_ues_l1800",
            "avg_act_ues_l2300": "act_ues_l2300",
            "avg_act_ues_l2600": "act_ues_l2600",
            "avg_act_ues_l1800s": "act_ues_l1800s",
            "avg_dl_thp_l800": "dl_thp_l800",
            "avg_dl_thp_l1800": "dl_thp_l1800",
            "avg_dl_thp_l2300": "dl_thp_l2300",
            "avg_dl_thp_l2600": "dl_thp_l2600",
            "avg_dl_thp_l1800s": "dl_thp_l1800s",
            "avg_ul_thp_l800": "ul_thp_l800",
            "avg_ul_thp_l1800": "ul_thp_l1800",
            "avg_ul_thp_l2300": "ul_thp_l2300",
            "avg_ul_thp_l2600": "ul_thp_l2600",
            "avg_ul_thp_l1800s": "ul_thp_l1800s",
        }
    )
    return [bh_kpi_df, lte_analysis_df]
def process_lte_bh_report(
    dump_path: str,
    bh_report_path: str,
    num_last_days: int,
    num_threshold_days: int,
    availability_threshold: float,
    prb_usage_threshold: float,
    prb_diff_between_cells_threshold: float,
    main_prb_to_use: str,
) -> list:
    """
    Process LTE Busy Hour report and perform capacity analysis.

    Args:
        dump_path: Path to the network dump used to build the cell database
        bh_report_path: Path to BH report CSV file (semicolon-delimited)
        num_last_days: Number of last days for analysis
        num_threshold_days: Number of days for threshold calculation
        availability_threshold: Minimum required availability
        prb_usage_threshold: Maximum allowed PRB usage
        prb_diff_between_cells_threshold: Maximum allowed PRB usage difference between cells
        main_prb_to_use: Name of the PRB counter treated as primary; the
            other counter's columns get a "_2nd" suffix
    Returns:
        List of DataFrames from lte_bh_dfs_per_kpi: [bh_kpi_df, lte_analysis_df]
    """
    # Reset any result cached by a previous run on the shared class attribute.
    LteCapacity.final_results = None
    # lte_db_dfs = lte_database_for_capacity(dump_path)
    # Read BH report (semicolon-delimited export).
    df = pd.read_csv(bh_report_path, delimiter=";")
    # Presumably normalizes raw KPI headers to the underscore names in
    # KPI_COLUMNS — confirm against kpi_naming_cleaning.
    df = kpi_naming_cleaning(df)
    # print(df.columns)
    df = create_hourly_date(df)
    # Keep only the columns the per-KPI pivoting expects.
    df = df[KPI_COLUMNS]
    pivoted_kpi_dfs = lte_bh_dfs_per_kpi(
        dump_path=dump_path,
        df=df,
        number_of_kpi_days=num_last_days,
        availability_threshold=availability_threshold,
        prb_usage_threshold=prb_usage_threshold,
        prb_diff_between_cells_threshold=prb_diff_between_cells_threshold,
        number_of_threshold_days=num_threshold_days,
        main_prb_to_use=main_prb_to_use,
    )
    # save_dataframe(pivoted_kpi_dfs, "LTE_BH_Report.csv")
    return pivoted_kpi_dfs