import io import zipfile from datetime import datetime, timedelta from pathlib import Path import numpy as np import pandas as pd import plotly.express as px import streamlit as st from utils.convert_to_excel import convert_dfs, save_dataframe from utils.utils_vars import get_physical_db def read_uploaded_file(uploaded_file): """Read uploaded file, handling both ZIP and CSV formats. Args: uploaded_file: Uploaded file object from Streamlit Returns: pd.DataFrame: DataFrame containing the data from the uploaded file """ if uploaded_file.name.endswith(".zip"): with zipfile.ZipFile(io.BytesIO(uploaded_file.getvalue())) as z: # Get the first CSV file in the zip csv_files = [f for f in z.namelist() if f.lower().endswith(".csv")] if not csv_files: raise ValueError("No CSV file found in the ZIP archive") with z.open(csv_files[0]) as f: return pd.read_csv(f, encoding="latin1", sep=";", low_memory=False) elif uploaded_file.name.endswith(".csv"): return pd.read_csv(uploaded_file, encoding="latin1", sep=";", low_memory=False) else: raise ValueError("Unsupported file format. 
Please upload a ZIP or CSV file.") class TraficAnalysis: last_period_df: pd.DataFrame = None ############### PROCESSING ############### def extract_code(name): name = name.replace(" ", "_") if isinstance(name, str) else None if name and len(name) >= 10: try: return int(name.split("_")[0]) except ValueError: return None return None def preprocess_2g(df: pd.DataFrame) -> pd.DataFrame: df = df[df["BCF name"].str.len() >= 10].copy() df["2g_data_trafic"] = ((df["TRAFFIC_PS DL"] + df["PS_UL_Load"]) / 1000).round(1) df.rename(columns={"2G_Carried Traffic": "2g_voice_trafic"}, inplace=True) df["code"] = df["BCF name"].apply(extract_code) df["code"] = pd.to_numeric(df["code"], errors="coerce") df = df[df["code"].notna()] df["code"] = df["code"].astype(int) date_format = ( "%m.%d.%Y %H:%M:%S" if len(df["PERIOD_START_TIME"].iat[0]) > 10 else "%m.%d.%Y" ) df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format) df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str) if "TCH availability ratio" in df.columns: df["2g_tch_avail"] = pd.to_numeric( df["TCH availability ratio"], errors="coerce" ) agg_dict = { "2g_data_trafic": "sum", "2g_voice_trafic": "sum", } if "2g_tch_avail" in df.columns: agg_dict["2g_tch_avail"] = "mean" df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict) return df def preprocess_3g(df: pd.DataFrame) -> pd.DataFrame: df = df[df["WBTS name"].str.len() >= 10].copy() df["code"] = df["WBTS name"].apply(extract_code) df["code"] = pd.to_numeric(df["code"], errors="coerce") df = df[df["code"].notna()] df["code"] = df["code"].astype(int) date_format = ( "%m.%d.%Y %H:%M:%S" if len(df["PERIOD_START_TIME"].iat[0]) > 10 else "%m.%d.%Y" ) df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format) df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str) df.rename( columns={ "Total CS traffic - Erl": "3g_voice_trafic", "Total_Data_Traffic": "3g_data_trafic", }, inplace=True, ) kpi_col = None for col in 
df.columns: if "cell availability" in str(col).lower(): kpi_col = col break if kpi_col is not None: df["3g_cell_avail"] = pd.to_numeric(df[kpi_col], errors="coerce") agg_dict = { "3g_voice_trafic": "sum", "3g_data_trafic": "sum", } if "3g_cell_avail" in df.columns: agg_dict["3g_cell_avail"] = "mean" df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict) return df def preprocess_lte(df: pd.DataFrame) -> pd.DataFrame: df = df[df["LNBTS name"].str.len() >= 10].copy() df["lte_data_trafic"] = ( df["4G/LTE DL Traffic Volume (GBytes)"] + df["4G/LTE UL Traffic Volume (GBytes)"] ) df["code"] = df["LNBTS name"].apply(extract_code) df["code"] = pd.to_numeric(df["code"], errors="coerce") df = df[df["code"].notna()] df["code"] = df["code"].astype(int) date_format = ( "%m.%d.%Y %H:%M:%S" if len(df["PERIOD_START_TIME"].iat[0]) > 10 else "%m.%d.%Y" ) df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format) df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str) if "Cell Avail excl BLU" in df.columns: df["lte_cell_avail"] = pd.to_numeric(df["Cell Avail excl BLU"], errors="coerce") agg_dict = {"lte_data_trafic": "sum"} if "lte_cell_avail" in df.columns: agg_dict["lte_cell_avail"] = "mean" df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict) return df ############################## ANALYSIS ################ def merge_and_compare(df_2g, df_3g, df_lte, pre_range, post_range, last_period_range): # Load physical database physical_db = get_physical_db() physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0] physical_db["code"] = ( pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int) ) physical_db = physical_db[["code", "Longitude", "Latitude", "City"]] physical_db = physical_db.drop_duplicates(subset="code") df = pd.merge(df_2g, df_3g, on=["date", "ID", "code"], how="outer") df = pd.merge(df, df_lte, on=["date", "ID", "code"], how="outer") # print(df) for col in [ "2g_data_trafic", 
"2g_voice_trafic", "3g_voice_trafic", "3g_data_trafic", "lte_data_trafic", ]: if col not in df: df[col] = 0 kpi_masks = {} for kpi_col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]: if kpi_col in df.columns: kpi_masks[kpi_col] = df[kpi_col].notna() df.fillna(0, inplace=True) for kpi_col, mask in kpi_masks.items(): df.loc[~mask, kpi_col] = np.nan df["total_voice_trafic"] = df["2g_voice_trafic"] + df["3g_voice_trafic"] df["total_data_trafic"] = ( df["2g_data_trafic"] + df["3g_data_trafic"] + df["lte_data_trafic"] ) df = pd.merge(df, physical_db, on=["code"], how="left") # Assign period based on date range pre_start, pre_end = pd.to_datetime(pre_range[0]), pd.to_datetime(pre_range[1]) post_start, post_end = pd.to_datetime(post_range[0]), pd.to_datetime(post_range[1]) last_period_start, last_period_end = pd.to_datetime( last_period_range[0] ), pd.to_datetime(last_period_range[1]) last_period = df[ (df["date"] >= last_period_start) & (df["date"] <= last_period_end) ] def assign_period(date): if pre_start <= date <= pre_end: return "pre" elif post_start <= date <= post_end: return "post" else: return "other" df["period"] = df["date"].apply(assign_period) comparison = df[df["period"].isin(["pre", "post"])] sum_pivot = ( comparison.groupby(["code", "period"])[ ["total_voice_trafic", "total_data_trafic"] ] .sum() .unstack() ) sum_pivot.columns = [f"{metric}_{period}" for metric, period in sum_pivot.columns] sum_pivot = sum_pivot.reset_index() # Differences sum_pivot["total_voice_trafic_diff"] = ( sum_pivot["total_voice_trafic_post"] - sum_pivot["total_voice_trafic_pre"] ) sum_pivot["total_data_trafic_diff"] = ( sum_pivot["total_data_trafic_post"] - sum_pivot["total_data_trafic_pre"] ) for metric in ["total_voice_trafic", "total_data_trafic"]: sum_pivot[f"{metric}_diff_pct"] = ( (sum_pivot.get(f"{metric}_post", 0) - sum_pivot.get(f"{metric}_pre", 0)) / sum_pivot.get(f"{metric}_pre", 1) ) * 100 # Reorder sum_pivot columns: voice before data, pre before post sum_order 
= [ "code", "total_voice_trafic_pre", "total_voice_trafic_post", "total_voice_trafic_diff", "total_voice_trafic_diff_pct", "total_data_trafic_pre", "total_data_trafic_post", "total_data_trafic_diff", "total_data_trafic_diff_pct", ] sum_existing_cols = [col for col in sum_order if col in sum_pivot.columns] sum_remaining_cols = [ col for col in sum_pivot.columns if col not in sum_existing_cols ] sum_pivot = sum_pivot[sum_existing_cols + sum_remaining_cols] avg_pivot = ( comparison.groupby(["code", "period"])[ ["total_voice_trafic", "total_data_trafic"] ] .mean() .unstack() ) avg_pivot.columns = [f"{metric}_{period}" for metric, period in avg_pivot.columns] avg_pivot = avg_pivot.reset_index() # Differences avg_pivot["total_voice_trafic_diff"] = ( avg_pivot["total_voice_trafic_post"] - avg_pivot["total_voice_trafic_pre"] ) avg_pivot["total_data_trafic_diff"] = ( avg_pivot["total_data_trafic_post"] - avg_pivot["total_data_trafic_pre"] ) for metric in ["total_voice_trafic", "total_data_trafic"]: avg_pivot[f"{metric}_diff_pct"] = ( (avg_pivot.get(f"{metric}_post", 0) - avg_pivot.get(f"{metric}_pre", 0)) / avg_pivot.get(f"{metric}_pre", 1) ) * 100 # rename avg_pivot columns avg_pivot = avg_pivot.rename( columns={ "total_voice_trafic_pre": "avg_voice_trafic_pre", "total_voice_trafic_post": "avg_voice_trafic_post", "total_voice_trafic_diff": "avg_voice_trafic_diff", "total_voice_trafic_diff_pct": "avg_voice_trafic_diff_pct", "total_data_trafic_pre": "avg_data_trafic_pre", "total_data_trafic_post": "avg_data_trafic_post", "total_data_trafic_diff": "avg_data_trafic_diff", "total_data_trafic_diff_pct": "avg_data_trafic_diff_pct", } ) # Reorder avg_pivot columns: voice before data, pre before post avg_order = [ "code", "avg_voice_trafic_pre", "avg_voice_trafic_post", "avg_voice_trafic_diff", "avg_voice_trafic_diff_pct", "avg_data_trafic_pre", "avg_data_trafic_post", "avg_data_trafic_diff", "avg_data_trafic_diff_pct", ] avg_existing_cols = [col for col in avg_order if col in 
avg_pivot.columns] avg_remaining_cols = [ col for col in avg_pivot.columns if col not in avg_existing_cols ] avg_pivot = avg_pivot[avg_existing_cols + avg_remaining_cols] return df, last_period, sum_pivot.round(2), avg_pivot.round(2) def analyze_2g_availability(df: pd.DataFrame, sla_2g: float): avail_col = "2g_tch_avail" if avail_col not in df.columns or "period" not in df.columns: return None, None df_2g = df[df[avail_col].notna()].copy() df_2g = df_2g[df_2g["period"].isin(["pre", "post"])] if df_2g.empty: return None, None site_pivot = df_2g.groupby(["code", "period"])[avail_col].mean().unstack() site_pivot = site_pivot.rename( columns={"pre": "tch_avail_pre", "post": "tch_avail_post"} ) if "tch_avail_pre" not in site_pivot.columns: site_pivot["tch_avail_pre"] = pd.NA if "tch_avail_post" not in site_pivot.columns: site_pivot["tch_avail_post"] = pd.NA site_pivot["tch_avail_diff"] = ( site_pivot["tch_avail_post"] - site_pivot["tch_avail_pre"] ) site_pivot["pre_ok_vs_sla"] = site_pivot["tch_avail_pre"] >= sla_2g site_pivot["post_ok_vs_sla"] = site_pivot["tch_avail_post"] >= sla_2g site_pivot = site_pivot.reset_index() summary_rows = [] for period_label, col_name in [ ("pre", "tch_avail_pre"), ("post", "tch_avail_post"), ]: series = site_pivot[col_name].dropna() total_cells = series.shape[0] if total_cells == 0: summary_rows.append( { "period": period_label, "cells": 0, "avg_availability": pd.NA, "median_availability": pd.NA, "p05_availability": pd.NA, "p95_availability": pd.NA, "min_availability": pd.NA, "max_availability": pd.NA, "cells_ge_sla": 0, "cells_lt_sla": 0, "pct_cells_ge_sla": pd.NA, } ) continue cells_ge_sla = (series >= sla_2g).sum() cells_lt_sla = (series < sla_2g).sum() summary_rows.append( { "period": period_label, "cells": int(total_cells), "avg_availability": series.mean(), "median_availability": series.median(), "p05_availability": series.quantile(0.05), "p95_availability": series.quantile(0.95), "min_availability": series.min(), 
"max_availability": series.max(), "cells_ge_sla": int(cells_ge_sla), "cells_lt_sla": int(cells_lt_sla), "pct_cells_ge_sla": cells_ge_sla / total_cells * 100, } ) summary_df = pd.DataFrame(summary_rows) return summary_df, site_pivot def analyze_3g_availability(df: pd.DataFrame, sla_3g: float): avail_col = "3g_cell_avail" if avail_col not in df.columns or "period" not in df.columns: return None, None df_3g = df[df[avail_col].notna()].copy() df_3g = df_3g[df_3g["period"].isin(["pre", "post"])] if df_3g.empty: return None, None site_pivot = df_3g.groupby(["code", "period"])[avail_col].mean().unstack() site_pivot = site_pivot.rename( columns={"pre": "cell_avail_pre", "post": "cell_avail_post"} ) if "cell_avail_pre" not in site_pivot.columns: site_pivot["cell_avail_pre"] = pd.NA if "cell_avail_post" not in site_pivot.columns: site_pivot["cell_avail_post"] = pd.NA site_pivot["cell_avail_diff"] = ( site_pivot["cell_avail_post"] - site_pivot["cell_avail_pre"] ) site_pivot["pre_ok_vs_sla"] = site_pivot["cell_avail_pre"] >= sla_3g site_pivot["post_ok_vs_sla"] = site_pivot["cell_avail_post"] >= sla_3g site_pivot = site_pivot.reset_index() summary_rows = [] for period_label, col_name in [ ("pre", "cell_avail_pre"), ("post", "cell_avail_post"), ]: series = site_pivot[col_name].dropna() total_cells = series.shape[0] if total_cells == 0: summary_rows.append( { "period": period_label, "cells": 0, "avg_availability": pd.NA, "median_availability": pd.NA, "p05_availability": pd.NA, "p95_availability": pd.NA, "min_availability": pd.NA, "max_availability": pd.NA, "cells_ge_sla": 0, "cells_lt_sla": 0, "pct_cells_ge_sla": pd.NA, } ) continue cells_ge_sla = (series >= sla_3g).sum() cells_lt_sla = (series < sla_3g).sum() summary_rows.append( { "period": period_label, "cells": int(total_cells), "avg_availability": series.mean(), "median_availability": series.median(), "p05_availability": series.quantile(0.05), "p95_availability": series.quantile(0.95), "min_availability": series.min(), 
"max_availability": series.max(), "cells_ge_sla": int(cells_ge_sla), "cells_lt_sla": int(cells_lt_sla), "pct_cells_ge_sla": cells_ge_sla / total_cells * 100, } ) summary_df = pd.DataFrame(summary_rows) return summary_df, site_pivot def analyze_lte_availability(df: pd.DataFrame, sla_lte: float): avail_col = "lte_cell_avail" if avail_col not in df.columns or "period" not in df.columns: return None, None df_lte = df[df[avail_col].notna()].copy() df_lte = df_lte[df_lte["period"].isin(["pre", "post"])] if df_lte.empty: return None, None site_pivot = df_lte.groupby(["code", "period"])[avail_col].mean().unstack() site_pivot = site_pivot.rename( columns={"pre": "lte_avail_pre", "post": "lte_avail_post"} ) if "lte_avail_pre" not in site_pivot.columns: site_pivot["lte_avail_pre"] = pd.NA if "lte_avail_post" not in site_pivot.columns: site_pivot["lte_avail_post"] = pd.NA site_pivot["lte_avail_diff"] = ( site_pivot["lte_avail_post"] - site_pivot["lte_avail_pre"] ) site_pivot["pre_ok_vs_sla"] = site_pivot["lte_avail_pre"] >= sla_lte site_pivot["post_ok_vs_sla"] = site_pivot["lte_avail_post"] >= sla_lte site_pivot = site_pivot.reset_index() summary_rows = [] for period_label, col_name in [ ("pre", "lte_avail_pre"), ("post", "lte_avail_post"), ]: series = site_pivot[col_name].dropna() total_cells = series.shape[0] if total_cells == 0: summary_rows.append( { "period": period_label, "cells": 0, "avg_availability": pd.NA, "median_availability": pd.NA, "p05_availability": pd.NA, "p95_availability": pd.NA, "min_availability": pd.NA, "max_availability": pd.NA, "cells_ge_sla": 0, "cells_lt_sla": 0, "pct_cells_ge_sla": pd.NA, } ) continue cells_ge_sla = (series >= sla_lte).sum() cells_lt_sla = (series < sla_lte).sum() summary_rows.append( { "period": period_label, "cells": int(total_cells), "avg_availability": series.mean(), "median_availability": series.median(), "p05_availability": series.quantile(0.05), "p95_availability": series.quantile(0.95), "min_availability": series.min(), 
"max_availability": series.max(), "cells_ge_sla": int(cells_ge_sla), "cells_lt_sla": int(cells_lt_sla), "pct_cells_ge_sla": cells_ge_sla / total_cells * 100, } ) summary_df = pd.DataFrame(summary_rows) return summary_df, site_pivot def analyze_multirat_availability( df: pd.DataFrame, sla_2g: float, sla_3g: float, sla_lte: float ): if "period" not in df.columns: return None rat_cols = [] if "2g_tch_avail" in df.columns: rat_cols.append("2g_tch_avail") if "3g_cell_avail" in df.columns: rat_cols.append("3g_cell_avail") if "lte_cell_avail" in df.columns: rat_cols.append("lte_cell_avail") if not rat_cols: return None agg_dict = {col: "mean" for col in rat_cols} df_pre = df[df["period"] == "pre"] df_post = df[df["period"] == "post"] pre = df_pre.groupby("code", as_index=False).agg(agg_dict) post = df_post.groupby("code", as_index=False).agg(agg_dict) rename_map_pre = { "2g_tch_avail": "2g_avail_pre", "3g_cell_avail": "3g_avail_pre", "lte_cell_avail": "lte_avail_pre", } rename_map_post = { "2g_tch_avail": "2g_avail_post", "3g_cell_avail": "3g_avail_post", "lte_cell_avail": "lte_avail_post", } pre = pre.rename(columns=rename_map_pre) post = post.rename(columns=rename_map_post) multi = pd.merge(pre, post, on="code", how="outer") # Add post-period total traffic per site if not df_post.empty and { "total_voice_trafic", "total_data_trafic", }.issubset(df_post.columns): post_traffic = ( df_post.groupby("code", as_index=False)[ ["total_voice_trafic", "total_data_trafic"] ] .sum() .rename( columns={ "total_voice_trafic": "post_total_voice_trafic", "total_data_trafic": "post_total_data_trafic", } ) ) multi = pd.merge(multi, post_traffic, on="code", how="left") if "City" in df.columns: city_df = df[["code", "City"]].drop_duplicates("code") multi = pd.merge(multi, city_df, on="code", how="left") # Compute OK/Not OK flags vs SLA on post-period def _ok_flag(series: pd.Series, sla: float) -> pd.Series: if series.name not in multi.columns: return pd.Series([pd.NA] * len(multi), 
index=multi.index) ok = multi[series.name] >= sla ok = ok.where(multi[series.name].notna(), pd.NA) return ok if "2g_avail_post" in multi.columns: multi["ok_2g_post"] = _ok_flag(multi["2g_avail_post"], sla_2g) if "3g_avail_post" in multi.columns: multi["ok_3g_post"] = _ok_flag(multi["3g_avail_post"], sla_3g) if "lte_avail_post" in multi.columns: multi["ok_lte_post"] = _ok_flag(multi["lte_avail_post"], sla_lte) def classify_row(row): rats_status = [] for rat, col in [ ("2G", "ok_2g_post"), ("3G", "ok_3g_post"), ("LTE", "ok_lte_post"), ]: if col in row and not pd.isna(row[col]): rats_status.append((rat, bool(row[col]))) if not rats_status: return "No RAT data" bad_rats = [rat for rat, ok in rats_status if not ok] if not bad_rats: return "OK all RAT" if len(bad_rats) == 1: return f"Degraded {bad_rats[0]} only" return "Degraded multi-RAT (" + ",".join(bad_rats) + ")" multi["post_multirat_status"] = multi.apply(classify_row, axis=1) # Order columns for readability ordered_cols = ["code"] if "City" in multi.columns: ordered_cols.append("City") for col in [ "2g_avail_pre", "2g_avail_post", "3g_avail_pre", "3g_avail_post", "lte_avail_pre", "lte_avail_post", "post_total_voice_trafic", "post_total_data_trafic", "ok_2g_post", "ok_3g_post", "ok_lte_post", "post_multirat_status", ]: if col in multi.columns: ordered_cols.append(col) remaining_cols = [c for c in multi.columns if c not in ordered_cols] multi = multi[ordered_cols + remaining_cols] return multi def analyze_persistent_availability( df: pd.DataFrame, multi_rat_df: pd.DataFrame, sla_2g: float, sla_3g: float, sla_lte: float, min_consecutive_days: int = 3, ) -> pd.DataFrame: if df is None or df.empty: return pd.DataFrame() if "date" not in df.columns or "code" not in df.columns: return pd.DataFrame() work_df = df.copy() work_df["date_only"] = work_df["date"].dt.date site_stats = {} def _update_stats(rat_key_prefix: str, grouped: pd.DataFrame, sla: float) -> None: if grouped.empty: return for code, group in 
grouped.groupby("code"): group = group.sort_values("date_only") dates = pd.to_datetime(group["date_only"]).tolist() below_flags = (group["value"] < sla).tolist() max_streak = 0 current_streak = 0 total_below = 0 last_date = None for flag, current_date in zip(below_flags, dates): if flag: total_below += 1 if ( last_date is not None and current_date == last_date + timedelta(days=1) and current_streak > 0 ): current_streak += 1 else: current_streak = 1 if current_streak > max_streak: max_streak = current_streak else: current_streak = 0 last_date = current_date stats = site_stats.setdefault( code, { "code": code, "max_streak_2g": 0, "max_streak_3g": 0, "max_streak_lte": 0, "below_days_2g": 0, "below_days_3g": 0, "below_days_lte": 0, }, ) stats[f"max_streak_{rat_key_prefix}"] = max_streak stats[f"below_days_{rat_key_prefix}"] = total_below for rat_col, rat_key, sla in [ ("2g_tch_avail", "2g", sla_2g), ("3g_cell_avail", "3g", sla_3g), ("lte_cell_avail", "lte", sla_lte), ]: if rat_col in work_df.columns: g = ( work_df.dropna(subset=[rat_col]) .groupby(["code", "date_only"])[rat_col] .mean() .reset_index() ) g = g.rename(columns={rat_col: "value"}) _update_stats(rat_key, g, sla) if not site_stats: return pd.DataFrame() rows = [] for code, s in site_stats.items(): max_2g = s.get("max_streak_2g", 0) max_3g = s.get("max_streak_3g", 0) max_lte = s.get("max_streak_lte", 0) below_2g = s.get("below_days_2g", 0) below_3g = s.get("below_days_3g", 0) below_lte = s.get("below_days_lte", 0) persistent_2g = max_2g >= min_consecutive_days if max_2g else False persistent_3g = max_3g >= min_consecutive_days if max_3g else False persistent_lte = max_lte >= min_consecutive_days if max_lte else False total_below_any = below_2g + below_3g + below_lte persistent_any = persistent_2g or persistent_3g or persistent_lte rats_persistent_count = sum( [persistent_2g is True, persistent_3g is True, persistent_lte is True] ) rows.append( { "code": code, "persistent_issue_2g": persistent_2g, 
"persistent_issue_3g": persistent_3g, "persistent_issue_lte": persistent_lte, "max_consecutive_days_2g": max_2g, "max_consecutive_days_3g": max_3g, "max_consecutive_days_lte": max_lte, "total_below_days_2g": below_2g, "total_below_days_3g": below_3g, "total_below_days_lte": below_lte, "total_below_days_any": total_below_any, "persistent_issue_any": persistent_any, "persistent_rats_count": rats_persistent_count, } ) result = pd.DataFrame(rows) result = result[result["persistent_issue_any"] == True] if result.empty: return result if multi_rat_df is not None and not multi_rat_df.empty: cols_to_merge = [ c for c in [ "code", "City", "post_total_voice_trafic", "post_total_data_trafic", "post_multirat_status", ] if c in multi_rat_df.columns ] if cols_to_merge: result = pd.merge( result, multi_rat_df[cols_to_merge].drop_duplicates("code"), on="code", how="left", ) if "post_total_data_trafic" not in result.columns: result["post_total_data_trafic"] = 0.0 result["criticity_score"] = ( result["post_total_data_trafic"].fillna(0) * 1.0 + result["total_below_days_any"].fillna(0) * 100.0 + result["persistent_rats_count"].fillna(0) * 1000.0 ) result = result.sort_values( by=["criticity_score", "total_below_days_any"], ascending=[False, False] ) return result def monthly_data_analysis(df: pd.DataFrame) -> pd.DataFrame: df["date"] = pd.to_datetime(df["date"]) # Create column 'YYYY-MM' for grouping by month while keeping the year df["month_year"] = df["date"].dt.to_period("M").astype(str) # Pivot : lines = code, columns = month_year, values = sum voice_trafic = df.pivot_table( index="code", columns="month_year", values="total_voice_trafic", aggfunc="sum", fill_value=0, ) # Sort columns chronologically voice_trafic = voice_trafic.reindex(sorted(voice_trafic.columns), axis=1) data_trafic = df.pivot_table( index="code", columns="month_year", values="total_data_trafic", aggfunc="sum", fill_value=0, ) # Sort columns chronologically data_trafic = 
data_trafic.reindex(sorted(data_trafic.columns), axis=1) # Display result return voice_trafic, data_trafic ############################## UI ######################### st.title("📊 Global Trafic Analysis - 2G / 3G / LTE") doc_col, image_col = st.columns(2) with doc_col: st.write( """ The report analyzes 2G / 3G / LTE traffic : - 2G Traffic Report in CSV format (required columns : BCF name, PERIOD_START_TIME, TRAFFIC_PS DL, PS_UL_Load) - 3G Traffic Report in CSV format (required columns : WBTS name, PERIOD_START_TIME, Total CS traffic - Erl, Total_Data_Traffic) - LTE Traffic Report in CSV format (required columns : LNBTS name, PERIOD_START_TIME, 4G/LTE DL Traffic Volume (GBytes), 4G/LTE UL Traffic Volume (GBytes)) """ ) # with image_col: # st.image("./assets/trafic_analysis.png", width=250) upload_2g_col, upload_3g_col, upload_lte_col = st.columns(3) with upload_2g_col: two_g_file = st.file_uploader("Upload 2G Traffic Report", type=["csv", "zip"]) with upload_3g_col: three_g_file = st.file_uploader("Upload 3G Traffic Report", type=["csv", "zip"]) with upload_lte_col: lte_file = st.file_uploader("Upload LTE Traffic Report", type=["csv", "zip"]) pre_range_col, post_range_col = st.columns(2) with pre_range_col: pre_range = st.date_input("Pre-period (from - to)", []) with post_range_col: post_range = st.date_input("Post-period (from - to)", []) last_period_range_col, number_of_top_trafic_sites_col = st.columns(2) with last_period_range_col: last_period_range = st.date_input("Last period (from - to)", []) with number_of_top_trafic_sites_col: number_of_top_trafic_sites = st.number_input( "Number of top traffic sites", value=25 ) sla_2g_col, sla_3g_col, sla_lte_col = st.columns(3) with sla_2g_col: sla_2g = st.number_input("2G TCH availability SLA (%)", value=98.0) with sla_3g_col: sla_3g = st.number_input("3G Cell availability SLA (%)", value=98.0) with sla_lte_col: sla_lte = st.number_input("LTE Cell availability SLA (%)", value=98.0) if len(pre_range) != 2 or 
len(post_range) != 2: st.warning("⚠️ Please select 2 dates for each period (pre and post).") st.stop() if not all([two_g_file, three_g_file, lte_file]): st.info("Please upload all 3 reports and select the comparison periods.") st.stop() # Warning if pre and post periode are the same if pre_range == post_range: st.warning("⚠️ Pre and post periode are the same.") st.stop() # Warning if pre and post are overlapping if pre_range[0] < post_range[0] and pre_range[1] > post_range[1]: st.warning(" Pre and post periode are overlapping.") st.stop() run_analysis = st.button(" Run Analysis") if run_analysis: df_2g = read_uploaded_file(two_g_file) df_3g = read_uploaded_file(three_g_file) df_lte = read_uploaded_file(lte_file) df_2g_clean = preprocess_2g(df_2g) df_3g_clean = preprocess_3g(df_3g) df_lte_clean = preprocess_lte(df_lte) full_df, last_period, sum_pre_post_analysis, avg_pre_post_analysis = ( merge_and_compare( df_2g_clean, df_3g_clean, df_lte_clean, pre_range, post_range, last_period_range, ) ) monthly_voice_df, monthly_data_df = monthly_data_analysis(full_df) st.session_state["full_df"] = full_df st.session_state["last_period"] = last_period st.session_state["sum_pre_post_analysis"] = sum_pre_post_analysis st.session_state["avg_pre_post_analysis"] = avg_pre_post_analysis st.session_state["monthly_voice_df"] = monthly_voice_df st.session_state["monthly_data_df"] = monthly_data_df if "full_df" in st.session_state: full_df = st.session_state["full_df"] last_period = st.session_state["last_period"] sum_pre_post_analysis = st.session_state["sum_pre_post_analysis"] avg_pre_post_analysis = st.session_state["avg_pre_post_analysis"] monthly_voice_df = st.session_state["monthly_voice_df"] monthly_data_df = st.session_state["monthly_data_df"] full_df["week"] = full_df["date"].dt.isocalendar().week full_df["year"] = full_df["date"].dt.isocalendar().year analysis_df = full_df analysis_last_period = last_period if "City" in full_df.columns: available_cities = 
full_df["City"].dropna().unique() if len(available_cities) > 0: selected_cities = st.multiselect( "Filter analysis by City (optional)", sorted(available_cities), ) if selected_cities: analysis_df = full_df[full_df["City"].isin(selected_cities)].copy() analysis_last_period = last_period[ last_period["City"].isin(selected_cities) ].copy() # Display Summary st.success(" Analysis completed") st.subheader(" Summary Analysis Pre / Post") st.dataframe(sum_pre_post_analysis) summary_2g_avail, site_2g_avail = analyze_2g_availability(analysis_df, sla_2g) if summary_2g_avail is not None: st.subheader("2G - TCH Availability vs SLA") st.write(f"SLA target 2G TCH availability: {sla_2g}%") st.dataframe(summary_2g_avail.round(2)) st.subheader("2G - TCH Availability by site (worst 25 by post-period)") worst_sites_2g = site_2g_avail.sort_values("tch_avail_post").head(25) st.dataframe(worst_sites_2g.round(2)) else: st.info( "2G TCH availability KPI not found in input report or no data for selected periods." ) summary_3g_avail, site_3g_avail = analyze_3g_availability(analysis_df, sla_3g) if summary_3g_avail is not None: st.subheader("3G - Cell Availability vs SLA") st.write(f"SLA target 3G Cell availability: {sla_3g}%") st.dataframe(summary_3g_avail.round(2)) st.subheader("3G - Cell Availability by site (worst 25 by post-period)") worst_sites_3g = site_3g_avail.sort_values("cell_avail_post").head(25) st.dataframe(worst_sites_3g.round(2)) else: st.info( "3G Cell Availability KPI not found in input report or no data for selected periods." 
) summary_lte_avail, site_lte_avail = analyze_lte_availability(analysis_df, sla_lte) if summary_lte_avail is not None: st.subheader("LTE - Cell Availability vs SLA") st.write(f"SLA target LTE Cell availability: {sla_lte}%") st.dataframe(summary_lte_avail.round(2)) st.subheader("LTE - Cell Availability by site (worst 25 by post-period)") worst_sites_lte = site_lte_avail.sort_values("lte_avail_post").head(25) st.dataframe(worst_sites_lte.round(2)) else: st.info( "LTE Cell Availability KPI not found in input report or no data for selected periods." ) # Multi-RAT availability view multi_rat_df = analyze_multirat_availability(analysis_df, sla_2g, sla_3g, sla_lte) if multi_rat_df is not None: st.subheader("Multi-RAT Availability by site (post-period)") st.dataframe(multi_rat_df.round(2)) worst_2g = None if ( "2g_avail_post" in multi_rat_df.columns and "ok_2g_post" in multi_rat_df.columns and "post_total_data_trafic" in multi_rat_df.columns ): tmp = multi_rat_df[ (multi_rat_df["ok_2g_post"] == False) & multi_rat_df["post_total_data_trafic"].notna() ].copy() if not tmp.empty: worst_2g = tmp.sort_values( "post_total_data_trafic", ascending=False ).head(number_of_top_trafic_sites) worst_3g = None if ( "3g_avail_post" in multi_rat_df.columns and "ok_3g_post" in multi_rat_df.columns and "post_total_data_trafic" in multi_rat_df.columns ): tmp = multi_rat_df[ (multi_rat_df["ok_3g_post"] == False) & multi_rat_df["post_total_data_trafic"].notna() ].copy() if not tmp.empty: worst_3g = tmp.sort_values( "post_total_data_trafic", ascending=False ).head(number_of_top_trafic_sites) worst_lte = None if ( "lte_avail_post" in multi_rat_df.columns and "ok_lte_post" in multi_rat_df.columns and "post_total_data_trafic" in multi_rat_df.columns ): tmp = multi_rat_df[ (multi_rat_df["ok_lte_post"] == False) & multi_rat_df["post_total_data_trafic"].notna() ].copy() if not tmp.empty: worst_lte = tmp.sort_values( "post_total_data_trafic", ascending=False ).head(number_of_top_trafic_sites) 
# ------------------------------------------------------------------
# Worst sites per RAT (post-period): high traffic but low availability.
# One tab per radio technology; each shows the pre-computed worst_* table
# or an informational message when nothing qualified.
# ------------------------------------------------------------------
st.subheader(
    f"Worst high-traffic & low-availability sites by RAT (top {number_of_top_trafic_sites}, post-period)"
)
tab_2g, tab_3g, tab_lte = st.tabs(["2G", "3G", "LTE"])
with tab_2g:
    if worst_2g is not None and not worst_2g.empty:
        st.dataframe(worst_2g.round(2))
    else:
        st.info(
            "No 2G sites with low availability and significant traffic in post-period."
        )
with tab_3g:
    if worst_3g is not None and not worst_3g.empty:
        st.dataframe(worst_3g.round(2))
    else:
        st.info(
            "No 3G sites with low availability and significant traffic in post-period."
        )
with tab_lte:
    if worst_lte is not None and not worst_lte.empty:
        st.dataframe(worst_lte.round(2))
    else:
        st.info(
            "No LTE sites with low availability and significant traffic in post-period."
        )

# ------------------------------------------------------------------
# Persistent availability issues: sites below SLA for N consecutive days.
# ------------------------------------------------------------------
st.subheader("Persistent availability issues and critical sites")
min_persistent_days = st.number_input(
    "Minimum consecutive days below SLA to flag persistent issue",
    min_value=2,
    max_value=30,
    value=3,
    step=1,
)
persistent_df = analyze_persistent_availability(
    analysis_df, multi_rat_df, sla_2g, sla_3g, sla_lte, int(min_persistent_days)
)
if persistent_df is not None and not persistent_df.empty:
    top_critical_n = st.number_input(
        "Number of top critical sites to display",
        min_value=5,
        max_value=200,
        value=25,
        step=5,
    )
    st.dataframe(persistent_df.head(top_critical_n).round(2))
else:
    st.info(
        "No persistent availability issues detected with current parameters."
    )

# ------------------------------------------------------------------
# Site drill-down: per-site traffic and availability time series, plus a
# table of days where the site's daily mean availability fell below SLA.
# ------------------------------------------------------------------
if not analysis_df.empty:
    st.subheader("Site drill-down: traffic and availability over time")
    sites_df = (
        analysis_df[["code", "City"]]
        .drop_duplicates()
        .sort_values(by=["City", "code"])
    )
    # Human-readable label "City_code", falling back to the bare code when
    # the City is missing; map labels back to site codes for filtering.
    site_options = sites_df.apply(
        lambda row: (
            f"{row['City']}_{row['code']}"
            if pd.notna(row["City"])
            else str(row["code"])
        ),
        axis=1,
    )
    site_map = dict(zip(site_options, sites_df["code"]))
    selected_site_label = st.selectbox(
        "Select a site for detailed view", options=site_options
    )
    selected_code = site_map.get(selected_site_label)
    site_detail_df = analysis_df[analysis_df["code"] == selected_code].copy()
    if not site_detail_df.empty:
        site_detail_df = site_detail_df.sort_values("date")

        # Traffic time series (long format so px.line colors by metric).
        traffic_cols = [
            col
            for col in ["total_voice_trafic", "total_data_trafic"]
            if col in site_detail_df.columns
        ]
        if traffic_cols:
            traffic_long = site_detail_df[["date"] + traffic_cols].melt(
                id_vars="date",
                value_vars=traffic_cols,
                var_name="metric",
                value_name="value",
            )
            fig_traffic = px.line(
                traffic_long,
                x="date",
                y="value",
                color="metric",
            )
            st.plotly_chart(fig_traffic)

        # Availability time series, one colored line per RAT present.
        avail_cols = []
        rename_map = {}
        if "2g_tch_avail" in site_detail_df.columns:
            avail_cols.append("2g_tch_avail")
            rename_map["2g_tch_avail"] = "2G"
        if "3g_cell_avail" in site_detail_df.columns:
            avail_cols.append("3g_cell_avail")
            rename_map["3g_cell_avail"] = "3G"
        if "lte_cell_avail" in site_detail_df.columns:
            avail_cols.append("lte_cell_avail")
            rename_map["lte_cell_avail"] = "LTE"
        if avail_cols:
            avail_df = site_detail_df[["date"] + avail_cols].copy()
            avail_df = avail_df.rename(columns=rename_map)
            value_cols = [c for c in avail_df.columns if c != "date"]
            avail_long = avail_df.melt(
                id_vars="date",
                value_vars=value_cols,
                var_name="RAT",
                value_name="availability",
            )
            fig_avail = px.line(
                avail_long,
                x="date",
                y="availability",
                color="RAT",
            )
            st.plotly_chart(fig_avail)

        # Daily averages per RAT for this site; collect the SLA violations.
        site_detail_df["date_only"] = site_detail_df["date"].dt.date
        degraded_rows_site = []
        for rat_col, rat_name, sla_value in [
            ("2g_tch_avail", "2G", sla_2g),
            ("3g_cell_avail", "3G", sla_3g),
            ("lte_cell_avail", "LTE", sla_lte),
        ]:
            if rat_col in site_detail_df.columns:
                daily_site = (
                    site_detail_df.groupby("date_only")[rat_col].mean().dropna()
                )
                mask = daily_site < sla_value
                for d, val in daily_site[mask].items():
                    degraded_rows_site.append(
                        {
                            "RAT": rat_name,
                            "date": d,
                            "avg_availability": val,
                            "SLA": sla_value,
                        }
                    )
        if degraded_rows_site:
            degraded_site_df = pd.DataFrame(degraded_rows_site)
            st.dataframe(degraded_site_df.round(2))

# ------------------------------------------------------------------
# City drill-down: same views as the site drill-down, aggregated per City
# (traffic summed per date, availability averaged per day).
# ------------------------------------------------------------------
if "City" in analysis_df.columns and analysis_df["City"].notna().any():
    st.subheader("City drill-down: traffic and availability over time")
    cities_df = (
        analysis_df[["City"]].dropna().drop_duplicates().sort_values(by="City")
    )
    selected_city = st.selectbox(
        "Select a City for aggregated view",
        options=cities_df["City"].tolist(),
    )
    city_detail_df = analysis_df[analysis_df["City"] == selected_city].copy()
    if not city_detail_df.empty:
        city_detail_df = city_detail_df.sort_values("date")

        # City-level traffic: sum across all sites of the city per date.
        traffic_cols_city = [
            col
            for col in ["total_voice_trafic", "total_data_trafic"]
            if col in city_detail_df.columns
        ]
        if traffic_cols_city:
            city_traffic = (
                city_detail_df.groupby("date")[traffic_cols_city]
                .sum()
                .reset_index()
            )
            traffic_long_city = city_traffic.melt(
                id_vars="date",
                value_vars=traffic_cols_city,
                var_name="metric",
                value_name="value",
            )
            fig_traffic_city = px.line(
                traffic_long_city,
                x="date",
                y="value",
                color="metric",
            )
            st.plotly_chart(
                fig_traffic_city,
                key=f"traffic_city_{selected_city}",
            )

        # City-level availability per RAT (raw per-record values).
        avail_cols_city = []
        rename_map_city = {}
        if "2g_tch_avail" in city_detail_df.columns:
            avail_cols_city.append("2g_tch_avail")
            rename_map_city["2g_tch_avail"] = "2G"
        if "3g_cell_avail" in city_detail_df.columns:
            avail_cols_city.append("3g_cell_avail")
            rename_map_city["3g_cell_avail"] = "3G"
        if "lte_cell_avail" in city_detail_df.columns:
            avail_cols_city.append("lte_cell_avail")
            rename_map_city["lte_cell_avail"] = "LTE"
        if avail_cols_city:
            avail_city_df = city_detail_df[["date"] + avail_cols_city].copy()
            avail_city_df = avail_city_df.rename(columns=rename_map_city)
            value_cols_city = [c for c in avail_city_df.columns if c != "date"]
            avail_long_city = avail_city_df.melt(
                id_vars="date",
                value_vars=value_cols_city,
                var_name="RAT",
                value_name="availability",
            )
            fig_avail_city = px.line(
                avail_long_city,
                x="date",
                y="availability",
                color="RAT",
            )
            st.plotly_chart(
                fig_avail_city,
                key=f"avail_city_{selected_city}",
            )

        # Daily mean availability per RAT for this city; flag SLA violations.
        city_detail_df["date_only"] = city_detail_df["date"].dt.date
        degraded_rows_city = []
        for rat_col, rat_name, sla_value in [
            ("2g_tch_avail", "2G", sla_2g),
            ("3g_cell_avail", "3G", sla_3g),
            ("lte_cell_avail", "LTE", sla_lte),
        ]:
            if rat_col in city_detail_df.columns:
                daily_city = (
                    city_detail_df.groupby("date_only")[rat_col]
                    .mean()
                    .dropna()
                )
                mask_city = daily_city < sla_value
                for d, val in daily_city[mask_city].items():
                    degraded_rows_city.append(
                        {
                            "RAT": rat_name,
                            "date": d,
                            "avg_availability": val,
                            "SLA": sla_value,
                        }
                    )
        if degraded_rows_city:
            degraded_city_df = pd.DataFrame(degraded_rows_city)
            st.dataframe(degraded_city_df.round(2))

# ------------------------------------------------------------------
# Temporal availability analysis - daily network-wide averages per RAT.
# ------------------------------------------------------------------
if any(
    col in analysis_df.columns
    for col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]
):
    temp_df = analysis_df.copy()
    temp_df["date_only"] = temp_df["date"].dt.date
    agg_dict = {}
    if "2g_tch_avail" in temp_df.columns:
        agg_dict["2g_tch_avail"] = "mean"
    if "3g_cell_avail" in temp_df.columns:
        agg_dict["3g_cell_avail"] = "mean"
    if "lte_cell_avail" in temp_df.columns:
        agg_dict["lte_cell_avail"] = "mean"
    daily_avail = (
        temp_df.groupby("date_only", as_index=False).agg(agg_dict)
        if agg_dict
        else pd.DataFrame()
    )
    if not daily_avail.empty:
        rename_map = {}
        if "2g_tch_avail" in daily_avail.columns:
            rename_map["2g_tch_avail"] = "2G"
        if "3g_cell_avail" in daily_avail.columns:
            rename_map["3g_cell_avail"] = "3G"
        if "lte_cell_avail" in daily_avail.columns:
            rename_map["lte_cell_avail"] = "LTE"
        daily_avail = daily_avail.rename(columns=rename_map)
        value_cols = [c for c in daily_avail.columns if c != "date_only"]
        if value_cols:
            daily_melt = daily_avail.melt(
                id_vars="date_only",
                value_vars=value_cols,
                var_name="RAT",
                value_name="availability",
            )
            st.subheader("Daily average availability per RAT")
            fig = px.line(
                daily_melt,
                x="date_only",
                y="availability",
                color="RAT",
                markers=True,
            )
            st.plotly_chart(fig)

        # Days where a RAT's network-wide daily average dipped below SLA.
        degraded_rows = []
        for rat_name, sla_value in [
            ("2G", sla_2g),
            ("3G", sla_3g),
            ("LTE", sla_lte),
        ]:
            if rat_name in daily_avail.columns:
                series = daily_avail[rat_name]
                mask = series < sla_value
                for d, val in zip(
                    daily_avail.loc[mask, "date_only"], series[mask]
                ):
                    degraded_rows.append(
                        {
                            "RAT": rat_name,
                            "date": d,
                            "avg_availability": val,
                            "SLA": sla_value,
                        }
                    )
        if degraded_rows:
            degraded_df = pd.DataFrame(degraded_rows)
            st.subheader("Days with average availability below SLA")
            st.dataframe(degraded_df.round(2))

# Cache the last-period slice on the class so the top-sites / map sections
# below (and later reruns) can reuse it.
TraficAnalysis.last_period_df = analysis_last_period

#######################################################################################################
if TraficAnalysis.last_period_df is not None:
    df = TraficAnalysis.last_period_df

    # Top sites by total data traffic during the last period.
    top_sites = (
        df.groupby(["code", "City"])["total_data_trafic"]
        .sum()
        .sort_values(ascending=False)
    )
    top_sites = top_sites.head(number_of_top_trafic_sites)
    st.subheader(f"Top {number_of_top_trafic_sites} sites by data traffic")
    chart_col, data_col = st.columns(2)
    with data_col:
        st.dataframe(top_sites.sort_values(ascending=True))
    # Horizontal bar chart labelled "City_code" per site.
    fig = px.bar(
        top_sites.reset_index(),
        y=top_sites.reset_index()[["City", "code"]].agg(
            lambda x: "_".join(map(str, x)), axis=1
        ),
        x="total_data_trafic",
        title=f"Top {number_of_top_trafic_sites} sites by data traffic",
        orientation="h",
        text="total_data_trafic",
        text_auto=True,
    )
    with chart_col:
        st.plotly_chart(fig)

    # Top sites by total voice traffic during the last period.
    top_sites_voice = (
        df.groupby(["code", "City"])["total_voice_trafic"]
        .sum()
        .sort_values(ascending=False)
    )
    top_sites_voice = top_sites_voice.head(number_of_top_trafic_sites)
    st.subheader(f"Top {number_of_top_trafic_sites} sites by voice traffic")
    chart_col, data_col = st.columns(2)
    with data_col:
        st.dataframe(top_sites_voice.sort_values(ascending=True))
    fig = px.bar(
        top_sites_voice.reset_index(),
        y=top_sites_voice.reset_index()[["City", "code"]].agg(
            lambda x: "_".join(map(str, x)), axis=1
        ),
        x="total_voice_trafic",
        title=f"Top {number_of_top_trafic_sites} sites by voice traffic",
        orientation="h",
        text="total_voice_trafic",
        text_auto=True,
    )
    with chart_col:
        st.plotly_chart(fig)

    #####################################################
    # Geographic maps: bubble size scales linearly between min_size and
    # max_size with the site's total traffic.
    min_size = 5
    max_size = 40

    # Custom blue->red color scale: start from a clearly visible blue.
    custom_blue_red = [
        [0.0, "#4292c6"],  # light blue
        [0.2, "#2171b5"],
        [0.4, "#084594"],  # dark blue
        [0.6, "#cb181d"],  # Strong red
        [0.8, "#a50f15"],  # Darker red
        [1.0, "#67000d"],  # Very dark red
    ]

    # --- Map of data traffic during the last period ---
    df_data = (
        df.groupby(["code", "City", "Latitude", "Longitude"])["total_data_trafic"]
        .sum()
        .reset_index()
    )
    st.subheader("Map of data trafic during last period")
    traffic_data_min = df_data["total_data_trafic"].min()
    traffic_data_max = df_data["total_data_trafic"].max()
    # Vectorized linear scaling; guard against a zero range (single site or
    # uniform traffic) which previously caused a division by zero.
    data_span = traffic_data_max - traffic_data_min
    if data_span:
        df_data["bubble_size"] = min_size + (max_size - min_size) * (
            df_data["total_data_trafic"] - traffic_data_min
        ) / data_span
    else:
        df_data["bubble_size"] = (min_size + max_size) / 2
    fig = px.scatter_map(
        df_data,
        lat="Latitude",
        lon="Longitude",
        color="total_data_trafic",
        size="bubble_size",
        color_continuous_scale=custom_blue_red,
        size_max=max_size,
        zoom=10,
        height=600,
        title="Data traffic distribution",
        hover_data={"code": True, "total_data_trafic": True},
        hover_name="code",
        text=[str(x) for x in df_data["code"]],
    )
    fig.update_layout(
        # px.scatter_map is MapLibre-based: the style lives under layout.map,
        # not layout.mapbox (the old mapbox_style key was silently ignored).
        map_style="open-street-map",
        coloraxis_colorbar=dict(title="Total Data Traffic (MB)"),
        coloraxis=dict(cmin=traffic_data_min, cmax=traffic_data_max),
        font=dict(size=10, color="black"),
    )
    st.plotly_chart(fig)

    ########################################################################################
    # --- Map of voice traffic during the last period ---
    df_voice = (
        df.groupby(["code", "City", "Latitude", "Longitude"])["total_voice_trafic"]
        .sum()
        .reset_index()
    )
    st.subheader("Map of voice trafic during last period")
    traffic_voice_min = df_voice["total_voice_trafic"].min()
    traffic_voice_max = df_voice["total_voice_trafic"].max()
    voice_span = traffic_voice_max - traffic_voice_min
    if voice_span:
        df_voice["bubble_size"] = min_size + (max_size - min_size) * (
            df_voice["total_voice_trafic"] - traffic_voice_min
        ) / voice_span
    else:
        df_voice["bubble_size"] = (min_size + max_size) / 2
    fig = px.scatter_map(
        df_voice,
        lat="Latitude",
        lon="Longitude",
        color="total_voice_trafic",
        size="bubble_size",
        color_continuous_scale=custom_blue_red,
        size_max=max_size,
        zoom=10,
        height=600,
        title="Voice traffic distribution",
        hover_data={"code": True, "total_voice_trafic": True},
        hover_name="code",
        text=[str(x) for x in df_voice["code"]],
    )
    fig.update_layout(
        map_style="open-street-map",
        # NOTE(review): voice traffic looks Erlang-based upstream — confirm
        # the "(MB)" unit in this colorbar label is intended.
        coloraxis_colorbar=dict(title="Total Voice Traffic (MB)"),
        coloraxis=dict(cmin=traffic_voice_min, cmax=traffic_voice_max),
        font=dict(size=10, color="black"),
    )
    st.plotly_chart(fig)

    # ------------------------------------------------------------------
    # Export: gather availability DataFrames (fallback to empty frames when
    # a KPI was missing and the corresponding local was never created).
    # ------------------------------------------------------------------
    summary_frames = []
    if "summary_2g_avail" in locals() and summary_2g_avail is not None:
        tmp = summary_2g_avail.copy()
        tmp["RAT"] = "2G"
        summary_frames.append(tmp)
    if "summary_3g_avail" in locals() and summary_3g_avail is not None:
        tmp = summary_3g_avail.copy()
        tmp["RAT"] = "3G"
        summary_frames.append(tmp)
    if "summary_lte_avail" in locals() and summary_lte_avail is not None:
        tmp = summary_lte_avail.copy()
        tmp["RAT"] = "LTE"
        summary_frames.append(tmp)
    if summary_frames:
        availability_summary_all = pd.concat(summary_frames, ignore_index=True)
    else:
        availability_summary_all = pd.DataFrame()

    export_site_2g = (
        site_2g_avail
        if "site_2g_avail" in locals() and site_2g_avail is not None
        else pd.DataFrame()
    )
    export_site_3g = (
        site_3g_avail
        if "site_3g_avail" in locals() and site_3g_avail is not None
        else pd.DataFrame()
    )
    export_site_lte = (
        site_lte_avail
        if "site_lte_avail" in locals() and site_lte_avail is not None
        else pd.DataFrame()
    )

    # Multi-RAT and persistent-issue analyses recomputed on the full dataset
    # for the export (the on-screen version used the filtered analysis_df).
    export_multi_rat_base = analyze_multirat_availability(
        full_df, sla_2g, sla_3g, sla_lte
    )
    if export_multi_rat_base is not None:
        export_multi_rat = export_multi_rat_base
    else:
        export_multi_rat = pd.DataFrame()
    export_persistent = pd.DataFrame()
    if export_multi_rat_base is not None:
        export_persistent_tmp = analyze_persistent_availability(
            full_df, export_multi_rat_base, sla_2g, sla_3g, sla_lte
        )
        if export_persistent_tmp is not None:
            export_persistent = export_persistent_tmp

    # One Excel workbook, one sheet per DataFrame (names positional).
    final_dfs = convert_dfs(
        [
            full_df,
            sum_pre_post_analysis,
            avg_pre_post_analysis,
            monthly_voice_df,
            monthly_data_df,
            availability_summary_all,
            export_site_2g,
            export_site_3g,
            export_site_lte,
            export_multi_rat,
            export_persistent,
        ],
        [
            "Global_Trafic_Analysis",
            "Sum_pre_post_analysis",
            "Avg_pre_post_analysis",
            "Monthly_voice_analysis",
            "Monthly_data_analysis",
            "Availability_Summary_All_RAT",
            "TwoG_Availability_By_Site",
            "ThreeG_Availability_By_Site",
            "LTE_Availability_By_Site",
            "MultiRAT_Availability_By_Site",
            "Top_Critical_Sites",
        ],
    )

    # 📥 Download button. Timestamp is formatted explicitly: the bare
    # str(datetime.now()) used before contains spaces and colons, which are
    # invalid in Windows file names.
    st.download_button(
        on_click="ignore",
        type="primary",
        label="Download the Analysis Report",
        data=final_dfs,
        file_name=(
            "Global_Trafic_Analysis_Report_"
            f"{datetime.now():%Y-%m-%d_%H-%M-%S}.xlsx"
        ),
        mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )