| import re |
|
|
| import numpy as np |
| import pandas as pd |
|
|
|
|
class GsmAnalysis:
    """Static lookup tables used by the GSM capacity analysis.

    Attributes:
        hf_rate_coef: Maps a half-rate usage percentage bucket to a traffic
            capacity coefficient.
        erlangB_table: Maps a number of traffic channels (1-200) to offered
            traffic in Erlangs (Erlang-B table).
    """

    # Half-rate penetration (%) -> capacity multiplier.
    # NOTE(review): 100 -> 1.4 breaks the otherwise monotonic trend
    # (99 -> 2.0); confirm this entry is intentional and not a typo.
    hf_rate_coef = {
        10: 1.1,
        20: 1.2,
        40: 1.4,
        60: 1.6,
        70: 1.7,
        80: 1.8,
        99: 2.0,
        100: 1.4,
    }
    # Number of channels (N) -> offered Erlangs. The values appear to match
    # the classic Erlang-B table at ~2% grade of service — TODO confirm.
    erlangB_table = {
        1: 0.0204,
        2: 0.2234,
        3: 0.6022,
        4: 1.092,
        5: 1.657,
        6: 2.276,
        7: 2.935,
        8: 3.627,
        9: 4.345,
        10: 5.084,
        11: 5.841,
        12: 6.614,
        13: 7.401,
        14: 8.2,
        15: 9.009,
        16: 9.828,
        17: 10.66,
        18: 11.49,
        19: 12.33,
        20: 13.18,
        21: 14.04,
        22: 14.9,
        23: 15.76,
        24: 16.63,
        25: 17.5,
        26: 18.38,
        27: 19.26,
        28: 20.15,
        29: 21.04,
        30: 21.93,
        31: 22.83,
        32: 23.72,
        33: 24.63,
        34: 25.53,
        35: 26.43,
        36: 27.34,
        37: 28.25,
        38: 29.17,
        39: 30.08,
        40: 31,
        41: 31.91,
        42: 32.84,
        43: 33.76,
        44: 34.68,
        45: 35.61,
        46: 36.53,
        47: 37.46,
        48: 38.39,
        49: 39.32,
        50: 40.25,
        51: 41.19,
        52: 42.12,
        53: 43.06,
        54: 44,
        55: 44.93,
        56: 45.88,
        57: 46.81,
        58: 47.75,
        59: 48.7,
        60: 49.64,
        61: 50.59,
        62: 51.53,
        63: 52.48,
        64: 53.43,
        65: 54.38,
        66: 55.32,
        67: 56.27,
        68: 57.22,
        69: 58.18,
        70: 59.13,
        71: 60.08,
        72: 61.04,
        73: 61.99,
        74: 62.94,
        75: 63.9,
        76: 64.86,
        77: 65.81,
        78: 66.77,
        79: 67.73,
        80: 68.69,
        81: 69.64,
        82: 70.61,
        83: 71.57,
        84: 72.53,
        85: 73.49,
        86: 74.45,
        87: 75.41,
        88: 76.38,
        89: 77.34,
        90: 78.3,
        91: 79.27,
        92: 80.23,
        93: 81.2,
        94: 82.16,
        95: 83.13,
        96: 84.09,
        97: 85.06,
        98: 86.03,
        99: 87,
        100: 87.97,
        101: 88.94,
        102: 89.91,
        103: 90.88,
        104: 91.85,
        105: 92.82,
        106: 93.79,
        107: 94.76,
        108: 95.73,
        109: 96.71,
        110: 97.68,
        111: 98.65,
        112: 99.63,
        113: 100.6,
        114: 101.57,
        115: 102.54,
        116: 103.52,
        117: 104.49,
        118: 105.47,
        119: 106.44,
        120: 107.42,
        121: 108.4,
        122: 109.37,
        123: 110.35,
        124: 111.32,
        125: 112.3,
        126: 113.28,
        127: 114.25,
        128: 115.23,
        129: 116.21,
        130: 117.19,
        131: 118.17,
        132: 119.15,
        133: 120.12,
        134: 121.1,
        135: 122.08,
        136: 123.07,
        137: 124.04,
        138: 125.02,
        # 139-150 carry more decimal places than the rest of the table;
        # presumably computed rather than transcribed — verify source.
        139: 126.01341,
        140: 127.00918,
        141: 127.96752,
        142: 128.98152,
        143: 129.92152,
        144: 130.88534,
        145: 131.96461,
        146: 132.89897,
        147: 133.86373,
        148: 134.82569,
        149: 135.76295,
        150: 136.82988,
        151: 137.79,
        152: 138.77,
        153: 139.75,
        154: 140.74,
        155: 141.72,
        156: 142.7,
        157: 143.69,
        158: 144.67,
        159: 145.66,
        160: 146.64,
        161: 147.63,
        162: 148.61,
        163: 149.6,
        164: 150.58,
        165: 151.57,
        166: 152.55,
        167: 153.54,
        168: 154.53,
        169: 155.51,
        170: 156.5,
        171: 157.48,
        172: 158.47,
        173: 159.46,
        174: 160.44,
        175: 161.43,
        176: 162.42,
        177: 163.41,
        178: 164.39,
        179: 165.38,
        180: 166.37,
        181: 167.36,
        182: 168.35,
        183: 169.33,
        184: 170.32,
        185: 171.31,
        186: 172.3,
        187: 173.29,
        188: 174.28,
        189: 175.27,
        190: 176.26,
        191: 177.25,
        192: 178.24,
        193: 179.23,
        194: 180.22,
        195: 181.21,
        196: 182.2,
        197: 183.19,
        198: 184.18,
        199: 185.17,
        200: 186.16,
    }
|
|
|
|
class GsmCapacity:
    """Holds GSM capacity analysis state and the final-comment classification.

    Attributes:
        final_results: Placeholder for the final analysis results; set later
            by code outside this view.
        operational_neighbours_df: Placeholder for a neighbours DataFrame;
            set later by code outside this view.
        final_comment_mapping: Maps a combined per-KPI comment string
            (presumably produced by joining individual KPI comments with
            ", " — verify against caller) to a single final category label.
    """

    final_results = None
    operational_neighbours_df = None
    # NOTE(review): the key ending in "..., SDCCH blocking exceeded threshold,
    # TX issues" maps to "...without Operational issues" while every other
    # TX-issues combination maps to "with ... operational issues" — confirm
    # this asymmetry is intentional. Category strings also mix capitalization
    # ("High Utilization" vs "High utilization"); downstream consumers may be
    # case-sensitive.
    final_comment_mapping = {
        "Availability and TX issues": "Operational issues with no congestion",
        "Availability issues": "Operational issues with no congestion",
        "TX issues": "Operational issues with no congestion",
        "Operational is OK": "Operational is OK with no congestion",
        "Tch utilization exceeded threshold, Availability and TX issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, Availability issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, TX issues": "High utilization with Operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "High Utilization with Congestion without Operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, Availability issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability issues": "High utilization with Congestion and operational issues",
        "Tch utilization exceeded threshold, TCH blocking exceeded threshold, TX issues": "High utilization with Congestion and operational issues",
        "Down Site": "Down Cell",
        "SDCCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "TCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Operational is OK": "Congestion without Operational issues",
        "Tch utilization exceeded threshold, Operational is OK": "High utilization without Congestion and Operational issues",
        "SDCCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "SDCCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "SDCCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability and TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, Availability issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, SDCCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
        "TCH blocking exceeded threshold, TX issues": "Congestion with Operational issues",
    }
|
|
|
|
def combine_comments(df: pd.DataFrame, *columns: str, new_column: str) -> pd.DataFrame:
    """
    Combine comments from multiple columns into one comma-separated column.

    ``None``, NaN and empty-string cells are skipped, so missing comments
    do not leak a literal ``"nan"`` into the combined text.

    Args:
        df: DataFrame containing comment columns
        *columns: Variable number of column names containing comments
        new_column: Name for the new combined comments column

    Returns:
        DataFrame with a new column containing combined comments
    """
    result_df = df.copy()
    # pd.notna() guards against NaN: float('nan') is truthy, so a bare
    # `if x` filter would join the string "nan" into the comment.
    result_df[new_column] = result_df[list(columns)].apply(
        lambda row: ", ".join(str(x) for x in row if pd.notna(x) and x),
        axis=1,
    )

    # Trim stray leading/trailing separators, then collapse empty items.
    result_df[new_column] = result_df[new_column].str.replace(
        r"^[,\s]+|[,\s]+$", "", regex=True
    )

    result_df[new_column] = result_df[new_column].str.replace(
        r",\s*,", ", ", regex=True
    )
    return result_df
|
|
|
|
def summarize_fails_comments(comment):
    """Condense a comment string's ``rrc_fail_*`` mentions into a summary.

    Args:
        comment: Free-text comment, possibly ``None``/NaN/blank.

    Returns:
        A string like ``"cpu, mem fails"`` listing the unique fail reasons
        in alphabetical order, or ``""`` when none are present.
    """
    # Treat None, NaN and whitespace-only strings as "no comment".
    if not comment or pd.isna(comment) or comment.strip() == "":
        return ""

    fail_reasons = re.findall(r"rrc_fail_([a-z_]+)", comment)
    if not fail_reasons:
        return ""

    unique_reasons = sorted(set(fail_reasons))
    return ", ".join(unique_reasons) + " fails"
|
|
|
|
def kpi_naming_cleaning(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean KPI column names by replacing special characters and standardizing format.

    Spaces, slashes, parentheses, commas, dashes, periods and apostrophes
    become underscores; runs of underscores collapse to a single one; "%"
    becomes "perc"; trailing underscores are stripped.

    Args:
        df: DataFrame with KPI column names to clean

    Returns:
        DataFrame with cleaned column names
    """
    name_df: pd.DataFrame = df.copy()
    name_df.columns = name_df.columns.str.replace("[ /(),-.']", "_", regex=True)
    # Collapse ANY run of underscores in a single pass. The previous chained
    # literal "___"/"__" replacements left residue for long runs (e.g. five
    # consecutive specials produced "a__b" instead of "a_b").
    name_df.columns = name_df.columns.str.replace(r"_{2,}", "_", regex=True)
    name_df.columns = name_df.columns.str.replace("%", "perc", regex=False)
    name_df.columns = name_df.columns.str.rstrip("_")
    return name_df
|
|
|
|
def create_daily_date(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build an ISO-style "date" column from PERIOD_START_TIME.

    PERIOD_START_TIME is expected as "MM.DD.YYYY"; it is rebuilt into a
    "YYYY-MM-DD" string and the source column is dropped.

    Args:
        df: DataFrame containing a PERIOD_START_TIME column.

    Returns:
        Copy of ``df`` with a "date" column and PERIOD_START_TIME removed.
    """
    out: pd.DataFrame = df.copy()
    # Split "MM.DD.YYYY" into its three parts and reorder to ISO format.
    parts = out["PERIOD_START_TIME"].str.split(".", expand=True)
    out["date"] = parts[2] + "-" + parts[0] + "-" + parts[1]

    out = out.drop(columns=["PERIOD_START_TIME"])
    return out
|
|
|
|
def create_hourly_date(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build "datetime" and "date" columns from an hourly PERIOD_START_TIME.

    PERIOD_START_TIME is expected as "MM.DD.YYYY HH:MM"; it is rebuilt into
    "YYYY-MM-DD HH:MM" ("datetime") plus "YYYY-MM-DD" ("date").

    Args:
        df: DataFrame containing a PERIOD_START_TIME column.

    Returns:
        Copy of ``df`` with "hour", "datetime" and "date" columns, with the
        helper columns and PERIOD_START_TIME removed.
    """
    # Work on a copy: the previous version bound `df` directly and mutated
    # the caller's DataFrame (inconsistent with create_daily_date).
    date_df: pd.DataFrame = df.copy()
    date_df[["date_t", "hour"]] = date_df["PERIOD_START_TIME"].str.split(
        " ", expand=True
    )
    date_df[["mois", "jour", "annee"]] = date_df["date_t"].str.split(".", expand=True)
    date_df["datetime"] = (
        date_df["annee"]
        + "-"
        + date_df["mois"]
        + "-"
        + date_df["jour"]
        + " "
        + date_df["hour"]
    )

    date_df["date"] = date_df["annee"] + "-" + date_df["mois"] + "-" + date_df["jour"]

    # Drop the intermediate split columns and the raw timestamp.
    date_df = date_df.drop(
        ["annee", "mois", "jour", "date_t", "PERIOD_START_TIME"], axis=1
    )
    return date_df
|
|
|
|
def create_dfs_per_kpi(
    df: pd.DataFrame = None,
    pivot_date_column: str = "date",
    pivot_name_column: str = "BTS_name",
    kpi_columns_from: int = None,
) -> dict:
    """
    Create one pivoted DataFrame (cells x dates) per KPI column.

    Args:
        df: DataFrame containing KPI data in long format.
        pivot_date_column: Column used for the pivot's columns (dates).
        pivot_name_column: Column used for the pivot's index (cell names).
        kpi_columns_from: Positional index of the first KPI column; every
            column from this position onward is treated as a KPI.

    Returns:
        Dict mapping each KPI name to its pivoted DataFrame, whose columns
        are a (KPI, Date) MultiIndex.
        (The previous annotation claimed ``pd.DataFrame``, but a dict of
        DataFrames is what has always been returned.)
    """
    kpi_columns = df.columns[kpi_columns_from:]

    pivoted_kpi_dfs = {}

    for kpi in kpi_columns:
        temp_df = df[[pivot_date_column, pivot_name_column, kpi]].copy()
        # Duplicate (cell, date) rows would make pivot() raise; keep first.
        temp_df = temp_df.drop_duplicates(
            subset=[pivot_name_column, pivot_date_column], keep="first"
        )
        temp_df = temp_df.reset_index()

        pivot_df = temp_df.pivot(
            index=pivot_name_column, columns=pivot_date_column, values=kpi
        )
        # Prefix the date columns with the KPI name so frames can later be
        # concatenated side by side without column collisions.
        pivot_df.columns = pd.MultiIndex.from_product([[kpi], pivot_df.columns])
        pivot_df.columns.names = ["KPI", "Date"]

        pivoted_kpi_dfs[kpi] = pivot_df

    return pivoted_kpi_dfs
|
|
|
|
def cell_availability_analysis(
    df: pd.DataFrame,
    days: int = 7,
    availability_threshold: int = 95,
    analysis_type: str = "daily",
) -> pd.DataFrame:
    """Categorize cells by their availability over a trailing window.

    Missing samples are treated as 0% availability. The mean over the last
    ``days`` columns drives the status categorization.

    Args:
        df: Pivoted DataFrame whose trailing columns hold daily availability.
        days: Size of the trailing window to average over.
        availability_threshold: Availability (%) at or below which a day is
            counted as degraded and an average as unstable.
        analysis_type: Label embedded in the output column names.

    Returns:
        Copy of ``df`` with average, degraded-day count and status comment.
    """
    out: pd.DataFrame = df.copy().fillna(0)
    window: pd.DataFrame = out.iloc[:, -days:]
    tag = analysis_type.lower()

    avg_col = f"Average_cell_availability_{tag}"
    out[avg_col] = window.mean(axis=1).round(2)

    # Count the days at or below the availability threshold.
    out[f"number_of_days_exceeding_availability_threshold_{tag}"] = window.le(
        availability_threshold
    ).sum(axis=1)

    def _status(avg: float) -> str:
        # A zero (or missing) average means the cell never reported as up.
        if avg == 0 or pd.isnull(avg):
            return "Down Site"
        if 0 < avg <= 70:
            return "critical instability"
        if 70 < avg <= availability_threshold:
            return "instability"
        return "Availability OK"

    out[f"availability_comment_{tag}"] = out[avg_col].apply(_status)
    return out
|
|
|
|
def analyze_tch_abis_fails(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    analysis_type: str,
    number_of_threshold_days: int,
    tch_abis_fails_threshold: int,
) -> pd.DataFrame:
    """Summarize TCH Abis failures per cell and flag persistent offenders.

    Args:
        df: Pivoted DataFrame whose trailing columns are daily KPI values.
        number_of_kpi_days: Size of the trailing window of day columns.
        analysis_type: Label (e.g. "daily") embedded in output column names.
        number_of_threshold_days: Minimum breach days required to flag.
        tch_abis_fails_threshold: Per-day level counted as a breach.

    Returns:
        Copy of ``df`` with avg/max/breach-count/comment columns appended.
    """
    out: pd.DataFrame = df.copy()
    window: pd.DataFrame = out.iloc[:, -number_of_kpi_days:]
    tag = analysis_type.lower()

    out[f"avg_tch_abis_fail_{tag}"] = window.mean(axis=1).round(2)
    out[f"max_tch_abis_fail_{tag}"] = window.max(axis=1)

    breach_col = f"number_of_days_with_tch_abis_fail_exceeded_{tag}"
    # How many days in the window reached the failure threshold.
    out[breach_col] = window.ge(tch_abis_fails_threshold).sum(axis=1)

    out[f"tch_abis_fail_{tag}_comment"] = np.where(
        out[breach_col] >= number_of_threshold_days,
        "tch abis fail exceeded threshold",
        None,
    )
    return out
|
|
|
|
def analyze_tch_call_blocking(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    analysis_type: str,
    number_of_threshold_days: int,
    tch_blocking_threshold: int,
) -> pd.DataFrame:
    """Summarize TCH call blocking per cell and flag persistent offenders.

    Args:
        df: Pivoted DataFrame whose trailing columns are daily KPI values.
        number_of_kpi_days: Size of the trailing window of day columns.
        analysis_type: Label (e.g. "daily") embedded in output column names.
        number_of_threshold_days: Minimum breach days required to flag.
        tch_blocking_threshold: Per-day blocking level counted as a breach.

    Returns:
        Copy of ``df`` with avg/max/breach-count/comment columns appended.
    """
    out: pd.DataFrame = df.copy()
    window: pd.DataFrame = out.iloc[:, -number_of_kpi_days:]
    tag = analysis_type.lower()

    out[f"avg_tch_call_blocking_{tag}"] = window.mean(axis=1).round(2)
    out[f"max_tch_call_blocking_{tag}"] = window.max(axis=1)

    breach_col = f"number_of_days_with_tch_blocking_exceeded_{tag}"
    # How many days in the window reached the blocking threshold.
    out[breach_col] = window.ge(tch_blocking_threshold).sum(axis=1)

    out[f"tch_call_blocking_{tag}_comment"] = np.where(
        out[breach_col] >= number_of_threshold_days,
        "TCH blocking exceeded threshold",
        None,
    )
    return out
|
|
|
|
def analyze_sdcch_call_blocking(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    sdcch_blocking_threshold: int,
    analysis_type: str,
    number_of_threshold_days: int,
) -> pd.DataFrame:
    """Summarize SDCCH blocking per cell and flag persistent offenders.

    Args:
        df: Pivoted DataFrame whose trailing columns are daily KPI values.
        number_of_kpi_days: Size of the trailing window of day columns.
        sdcch_blocking_threshold: Per-day blocking level counted as a breach.
        analysis_type: Label (e.g. "daily") embedded in output column names.
        number_of_threshold_days: Minimum breach days required to flag.

    Returns:
        Copy of ``df`` with avg/max/breach-count/comment columns appended.
    """
    out: pd.DataFrame = df.copy()
    window: pd.DataFrame = out.iloc[:, -number_of_kpi_days:]
    tag = analysis_type.lower()

    out[f"avg_sdcch_real_blocking_{tag}"] = window.mean(axis=1).round(2)
    out[f"max_sdcch_real_blocking_{tag}"] = window.max(axis=1)

    breach_col = f"number_of_days_with_sdcch_blocking_exceeded_{tag}"
    # How many days in the window reached the blocking threshold.
    out[breach_col] = window.ge(sdcch_blocking_threshold).sum(axis=1)

    out[f"sdcch_real_blocking_{tag}_comment"] = np.where(
        out[breach_col] >= number_of_threshold_days,
        "SDCCH blocking exceeded threshold",
        None,
    )
    return out
|
|
|
|
class LteCapacity:
    """Holds LTE capacity analysis state and band-expansion recommendations.

    Attributes:
        final_results: Placeholder for the final analysis results; set later
            by code outside this view.
        next_band_mapping: Maps a cell's currently deployed band combination
            (slash-separated LTE band labels) to the recommended next
            expansion step.
    """

    final_results = None

    # NOTE(review): "FDD H// colocated site" contains a double slash —
    # confirm it is not a typo in the recommendation text.
    next_band_mapping = {
        "L1800": "L800",
        "L800": "L1800",
        "L1800/L800": "L2600",
        "L1800/L2300/L800": "L2600",
        "L2300/L800": "L2600",
        "L1800/L2600/L800": "New site/Dual Beam",
        "L1800/L2300/L2600/L800": "New site/Dual Beam",
        "L2300": "FDD H// colocated site",
    }
|
|
|
|
def analyze_prb_usage(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    prb_usage_threshold: int,
    analysis_type: str,
    number_of_threshold_days: int,
    suffix: str = "",
) -> pd.DataFrame:
    """Summarize PRB usage per cell and flag persistent high utilization.

    Args:
        df: Pivoted DataFrame whose trailing columns are daily KPI values.
        number_of_kpi_days: Size of the trailing window of day columns.
        prb_usage_threshold: Per-day usage level counted as a breach.
        analysis_type: Label (e.g. "daily") embedded in output column names.
        number_of_threshold_days: Minimum breach days required to flag.
        suffix: Extra tag appended to output column names (e.g. per link
            direction) to keep repeated runs from colliding.

    Returns:
        Copy of ``df`` with avg/max/breach-count/comment columns appended.
    """
    out = df.copy()
    window: pd.DataFrame = out.iloc[:, -number_of_kpi_days:]
    tag = f"{analysis_type.lower()}{suffix}"

    out[f"avg_prb_usage_{tag}"] = window.mean(axis=1).round(2)
    out[f"max_prb_usage_{tag}"] = window.max(axis=1)

    breach_col = f"number_of_days_with_prb_usage_exceeded_{tag}"
    # How many days in the window reached the usage threshold.
    out[breach_col] = window.ge(prb_usage_threshold).sum(axis=1)

    out[f"prb_usage_{tag}_comment"] = np.where(
        out[breach_col] >= number_of_threshold_days,
        "PRB usage exceeded threshold",
        None,
    )
    return out
|
|
|
|
def analyze_fails_kpi(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    number_of_threshold_days: int,
    kpi_threshold: int,
    kpi_column_name: str,
) -> pd.DataFrame:
    """Summarize a generic failure KPI per cell and flag persistent breaches.

    Args:
        df: Pivoted DataFrame whose trailing columns are daily KPI values.
        number_of_kpi_days: Size of the trailing window of day columns.
        number_of_threshold_days: Minimum breach days required to flag.
        kpi_threshold: Per-day KPI level counted as a breach.
        kpi_column_name: KPI label embedded in the output column names.

    Returns:
        Copy of ``df`` with avg/max/breach-count/comment columns appended.
    """
    out: pd.DataFrame = df.copy()
    window: pd.DataFrame = out.iloc[:, -number_of_kpi_days:]

    out[f"avg_{kpi_column_name}"] = window.mean(axis=1).round(2)
    out[f"max_{kpi_column_name}"] = window.max(axis=1)

    breach_col = f"number_of_days_with_{kpi_column_name}_exceeded"
    # How many days in the window reached the KPI threshold.
    out[breach_col] = window.ge(kpi_threshold).sum(axis=1)

    out[f"{kpi_column_name}_comment"] = np.where(
        out[breach_col] >= number_of_threshold_days,
        f"{kpi_column_name} exceeded threshold",
        None,
    )
    return out
|
|
|
|
def analyze_lcg_utilization(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    number_of_threshold_days: int,
    kpi_threshold: int,
    kpi_column_name: str,
) -> pd.DataFrame:
    """Summarize LCG utilization per cell and flag persistent breaches.

    Args:
        df: Pivoted DataFrame whose trailing columns are daily KPI values.
        number_of_kpi_days: Size of the trailing window of day columns.
        number_of_threshold_days: Minimum breach days required to flag.
        kpi_threshold: Per-day KPI level counted as a breach.
        kpi_column_name: KPI label embedded in the output column names.

    Returns:
        Copy of ``df`` with avg/max/breach-count/comment columns appended.
    """
    # The body was a byte-for-byte duplicate of analyze_fails_kpi; delegate
    # to it so the two cannot drift apart.
    return analyze_fails_kpi(
        df,
        number_of_kpi_days=number_of_kpi_days,
        number_of_threshold_days=number_of_threshold_days,
        kpi_threshold=kpi_threshold,
        kpi_column_name=kpi_column_name,
    )
|
|