Spaces:
Running
Running
| import numpy as np | |
| import pandas as pd | |
| from utils.kpi_analysis_utils import ( | |
| analyze_lcg_utilization, | |
| combine_comments, | |
| create_daily_date, | |
| create_dfs_per_kpi, | |
| kpi_naming_cleaning, | |
| ) | |
| from utils.utils_vars import get_physical_db | |
# Maps the combined "<lcg threshold comments>, <number of LCGs>" string built by
# combine_comments() in lcg_kpi_analysis to a final human-readable verdict.
# Keys must match the exact strings produced there — do not reformat them.
lcg_comments_mapping = {
    "2": "No Congestion",
    "1": "No Congestion",
    # Fixed typo in the displayed verdict: "upgrage" -> "upgrade".
    "lcg1 exceeded threshold, lcg2 exceeded threshold, 2": "Need BB SU upgrade",
    "lcg1 exceeded threshold, 2": "Need LCG balancing",
    "lcg1 exceeded threshold, 1": "Need BB SU upgrade",
    "lcg2 exceeded threshold, 2": "Need LCG balancing",
}
# Columns required in the input CSV export; validated and selected in
# load_and_process_lcg_data before the analysis runs.
KPI_COLUMNS = [
    "date",
    "WBTS_name",
    "lcg_id",
    "BB_SU_LCG_MAX_R",  # per-LCG baseband SU max utilization ratio KPI
]
# Column order of the intermediate kpi_df produced by lcg_kpi_analysis;
# used to select/order columns after all analysis columns have been added.
LCG_ANALYSIS_COLUMNS = [
    "WBTS_name",
    "lcg1_utilisation",
    "avg_lcg1",
    "max_lcg1",
    "number_of_days_with_lcg1_exceeded",
    "lcg1_comment",
    "lcg2_utilisation",
    "avg_lcg2",
    "max_lcg2",
    "number_of_days_with_lcg2_exceeded",
    "lcg2_comment",
    "difference_between_lcgs",
    "difference_between_lcgs_comment",
    "lcg_comment",
    "number_of_lcg",
    "final_comments",
]
def lcg_kpi_analysis(
    df,
    num_last_days,
    num_threshold_days,
    lcg_utilization_threshold,
    difference_between_lcgs,
) -> list[pd.DataFrame]:
    """
    Analyze LCG capacity data.

    Args:
        df: DataFrame containing LCG capacity data, restricted to KPI_COLUMNS
            (date, WBTS_name, lcg_id, BB_SU_LCG_MAX_R)
        num_last_days: Number of days for analysis
        num_threshold_days: Minimum days above threshold to flag for upgrade
        lcg_utilization_threshold: Utilization threshold percentage for flagging
        difference_between_lcgs: Difference between LCGs for flagging

    Returns:
        A two-element list [lcg_analysis_df, kpi_df]: the per-site summary
        (with code, Region, and coordinates merged in) and the full KPI frame
        ordered by LCG_ANALYSIS_COLUMNS.
    """
    # Split input rows per LCG so each group can be pivoted and analyzed
    # independently.
    lcg1_df = df[df["lcg_id"] == 1]
    lcg2_df = df[df["lcg_id"] == 2]
    # Pivot to one column per date, one row per site, per KPI name.
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,  # KPI value columns start after date/WBTS_name
    )
    pivoted_lcg1_df = create_dfs_per_kpi(
        df=lcg1_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    pivoted_lcg2_df = create_dfs_per_kpi(
        df=lcg2_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    # BB_SU_LCG_MAX_R to have all site with LCG 1 and/ or LCG 2
    BB_SU_LCG_MAX_R_df = pivoted_kpi_dfs["BB_SU_LCG_MAX_R"]
    pivoted_lcg1_df = pivoted_lcg1_df["BB_SU_LCG_MAX_R"]
    pivoted_lcg2_df = pivoted_lcg2_df["BB_SU_LCG_MAX_R"]
    # Rename the KPI level so the two per-LCG frames stay distinguishable
    # after the concat below.
    pivoted_lcg1_df = pivoted_lcg1_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg1_utilisation"}
    )
    pivoted_lcg2_df = pivoted_lcg2_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg2_utilisation"}
    )
    # Compute avg/max/days-exceeded/comment columns per site over the last
    # num_last_days days (adds the avg_lcgN, max_lcgN, ... columns used below).
    pivoted_lcg1_df = analyze_lcg_utilization(
        df=pivoted_lcg1_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg1",
    )
    pivoted_lcg2_df = analyze_lcg_utilization(
        df=pivoted_lcg2_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg2",
    )
    # Side-by-side join on the site index; sites missing one LCG get NaN
    # in that LCG's columns.
    kpi_df = pd.concat(
        [
            BB_SU_LCG_MAX_R_df,
            pivoted_lcg1_df,
            pivoted_lcg2_df,
        ],
        axis=1,
    )
    kpi_df = kpi_df.reset_index()
    # Number of available lcgs
    # kpi_df = pd.merge(kpi_df, available_lcgs_df, on="WBTS_name", how="left")
    # Absolute gap between the two LCG averages (max - min == |lcg1 - lcg2|).
    kpi_df["difference_between_lcgs"] = kpi_df[["avg_lcg1", "avg_lcg2"]].apply(
        lambda row: max(row) - min(row), axis=1
    )
    # flag if difference between lcg1 and lcg2 is above threshold
    kpi_df["difference_between_lcgs_comment"] = np.where(
        kpi_df["difference_between_lcgs"] > difference_between_lcgs,
        "difference between lcgs exceeded threshold",
        None,
    )
    # Combine per-LCG threshold comments into one string column.
    kpi_df = combine_comments(
        kpi_df,
        "lcg1_comment",
        "lcg2_comment",
        # "difference_between_lcgs_comment",
        new_column="lcg_comment",
    )
    # Replace if "lcg_comment" contains "nan" and ", nan" and "nan, " with None
    kpi_df["lcg_comment"] = kpi_df["lcg_comment"].replace("nan", None)
    # Remove "nan" from comma-separated strings (combine_comments stringifies
    # missing comments as the literal "nan").
    kpi_df["lcg_comment"] = (
        kpi_df["lcg_comment"].str.replace(r"\bnan\b,?\s?", "", regex=True).str.strip()
    )
    # 2 if both LCG averages exist, 1 if exactly one does, else 0.
    kpi_df["number_of_lcg"] = np.where(
        kpi_df["avg_lcg1"].notna() & kpi_df["avg_lcg2"].notna(),
        2,
        np.where(kpi_df["avg_lcg1"].notna() | kpi_df["avg_lcg2"].notna(), 1, 0),
    )
    # Build the "<lcg_comment>, <number_of_lcg>" key consumed by
    # lcg_comments_mapping below.
    kpi_df = combine_comments(
        kpi_df,
        "lcg_comment",
        "number_of_lcg",
        new_column="final_comments",
    )
    # Map known comment combinations to final verdicts; unknown combinations
    # pass through unchanged.
    kpi_df["final_comments"] = kpi_df["final_comments"].apply(
        lambda x: lcg_comments_mapping.get(x, x)
    )
    kpi_df = kpi_df[LCG_ANALYSIS_COLUMNS]
    lcg_analysis_df = kpi_df.copy()
    # Narrow the summary to the reporting columns (drops the raw per-date
    # utilisation columns).
    lcg_analysis_df = lcg_analysis_df[
        [
            "WBTS_name",
            "avg_lcg1",
            "max_lcg1",
            "number_of_days_with_lcg1_exceeded",
            "lcg1_comment",
            "avg_lcg2",
            "max_lcg2",
            "number_of_days_with_lcg2_exceeded",
            "lcg2_comment",
            "difference_between_lcgs",
            "final_comments",
        ]
    ]
    # Flatten the pivot's two-level column index to plain column names.
    # NOTE(review): assumes the pivot helpers produce MultiIndex columns —
    # confirm against create_dfs_per_kpi.
    lcg_analysis_df = lcg_analysis_df.droplevel(level=1, axis=1)
    # Remove row if code less than 5 characters
    lcg_analysis_df = lcg_analysis_df[lcg_analysis_df["WBTS_name"].str.len() >= 5]
    # Site code is the numeric prefix of "code_region_..." style WBTS names;
    # non-numeric prefixes become 0.
    lcg_analysis_df["code"] = lcg_analysis_df["WBTS_name"].str.split("_").str[0]
    lcg_analysis_df["code"] = (
        pd.to_numeric(lcg_analysis_df["code"], errors="coerce").fillna(0).astype(int)
    )
    # Region is the second underscore-separated token; missing -> "UNKNOWN".
    lcg_analysis_df["Region"] = (
        lcg_analysis_df["WBTS_name"].str.split("_").str[1:2].str.join("_")
    )
    lcg_analysis_df["Region"] = lcg_analysis_df["Region"].fillna("UNKNOWN")
    # move code to the first column
    lcg_analysis_df = lcg_analysis_df[
        ["code", "Region"]
        + [col for col in lcg_analysis_df if col != "code" and col != "Region"]
    ]
    # Load physical database
    physical_db: pd.DataFrame = get_physical_db()
    # Convert code_sector to code
    physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0]
    # remove duplicates
    physical_db = physical_db.drop_duplicates(subset="code")
    # keep only code and longitude and latitude
    physical_db = physical_db[["code", "Longitude", "Latitude"]]
    physical_db["code"] = (
        pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int)
    )
    # Attach coordinates; sites absent from the physical db keep NaN
    # Longitude/Latitude.
    lcg_analysis_df = pd.merge(
        lcg_analysis_df,
        physical_db,
        on="code",
        how="left",
    )
    return [lcg_analysis_df, kpi_df]
def load_and_process_lcg_data(
    uploaded_file,
    num_last_days,
    num_threshold_days,
    lcg_utilization_threshold,
    difference_between_lcgs,
) -> list[pd.DataFrame]:
    """Load, validate, and process an uploaded CSV for LCG capacity analysis.

    Args:
        uploaded_file: Path or file-like object of a ';'-delimited CSV export.
        num_last_days: Number of days for analysis.
        num_threshold_days: Minimum days above threshold to flag for upgrade.
        lcg_utilization_threshold: Utilization threshold percentage for flagging.
        difference_between_lcgs: Difference between LCGs for flagging.

    Returns:
        The two-element list [lcg_analysis_df, kpi_df] produced by
        lcg_kpi_analysis. (Return annotation fixed: the original claimed a
        single DataFrame.)

    Raises:
        ValueError: If the uploaded file is empty or required columns are
            missing after cleaning.
    """
    try:
        df = pd.read_csv(uploaded_file, delimiter=";")
        if df.empty:
            raise ValueError("Uploaded file is empty")
        df = kpi_naming_cleaning(df)
        df = create_daily_date(df)
        # Validate required columns before narrowing the frame, so the error
        # names every missing column at once.
        missing_cols = [col for col in KPI_COLUMNS if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {', '.join(missing_cols)}")
        df = df[KPI_COLUMNS]
        # Process the data
        return lcg_kpi_analysis(
            df,
            num_last_days,
            num_threshold_days,
            lcg_utilization_threshold,
            difference_between_lcgs,
        )
    except Exception as e:
        # Surface the error in the UI, then re-raise for the caller.
        # NOTE(review): `st` is not imported in the visible portion of this
        # file — confirm `import streamlit as st` exists at the top, otherwise
        # this handler itself raises NameError.
        error_msg = f"Error processing LCG data: {str(e)}"
        st.error(error_msg)
        raise