import streamlit as st import pandas as pd import plotly.express as px import glob import re import os # ====================== # PAGE CONFIG # ====================== st.set_page_config( page_title="HF KPI Dashboard", layout="wide" ) # ====================== # TAB NAVIGATION # ====================== tab1, tab2 = st.tabs([ "📊 Dashboard KPI Monitoring Daily", "📈 Dashboard Capacity & Utilization" ]) # ====================== # LOAD DATA (SHARED) # ====================== @st.cache_data def load_data(): files = ( glob.glob("src/LTE_CELL_DAILY *.csv") + glob.glob("src/LTE_CELL_DAILY *.csv.gz") ) if not files: return None dfs = [] for f in files: tmp = pd.read_csv(f, compression="infer") if "DATE_ID" in tmp.columns: tmp["DATE_ID"] = pd.to_datetime( tmp["DATE_ID"].astype(str).str.strip(), format="%m/%d/%Y", errors="coerce" ) if tmp["DATE_ID"].isna().all(): m = re.search(r"(\d{8})", os.path.basename(f)) if m: tmp["DATE_ID"] = pd.to_datetime(m.group(1), format="%Y%m%d") dfs.append(tmp) df = pd.concat(dfs, ignore_index=True) df = df.dropna(subset=["DATE_ID"]) df["DATE_ID"] = df["DATE_ID"].dt.normalize() return df df_raw = load_data() if df_raw is None or df_raw.empty: st.error("Data CSV / CSV.GZ tidak ditemukan") st.stop() # ====================== # KPI ALIAS # ====================== COLUMN_ALIAS = { "Intra-Frequency Handover Out Success Rate": "Intra_Frequency_HO_Out_Success_Rate" } for src, dst in COLUMN_ALIAS.items(): if src in df_raw.columns: df_raw[dst] = df_raw[src] # ====================== # FILTER (GLOBAL) # ====================== with tab1: st.title("DASHBOARDS KPI MONITORING DAILY") st.caption("📊 Dashboard By Muhammad Defri") st.subheader("Filter") selected_sites = st.multiselect( "Site ID", sorted(df_raw["ERBS"].dropna().unique()) ) if not selected_sites: st.warning("⚠️ Input Site ID") st.stop() df = df_raw[df_raw["ERBS"].isin(selected_sites)] col_start, col_end = st.columns(2) with col_start: start = st.date_input( "Start Date", df["DATE_ID"].min().date(), min_value=df_raw["DATE_ID"].min().date(), max_value=df_raw["DATE_ID"].max().date() ) with col_end: end = st.date_input( "End Date", df_raw["DATE_ID"].max().date(), min_value=start, max_value=df_raw["DATE_ID"].max().date() ) df = df[ (df["DATE_ID"] >= pd.to_datetime(start)) & (df["DATE_ID"] < pd.to_datetime(end) + pd.Timedelta(days=1)) ] # ====================== # MAP BAND & SECTOR (UNCHANGED) # ====================== def map_band(cell): if pd.isna(cell): return "OTHER" cell = str(cell).upper() if re.search(r"(MT|IT)", cell): return "L900" if re.search(r"(ML|IL|RL)", cell): return "L1800" if re.search(r"(MR|RR|IR)", cell): return "L2100" if re.search(r"(ME|MF|MV|IE|IF|IV|VE|VF|VV)", cell): return "L2300" return "OTHER" def map_sector(cell): if pd.isna(cell): return "SEC_OTHER" cell = str(cell).upper().strip() m = re.search(r"(RR|RL|IR|IL)(\d{2})$", cell) if m: sec = int(m.group(2)) // 10 return f"SEC{sec}" if sec in (1, 2, 3) else "SEC_OTHER" m = re.search(r"(\d{1,2})$", cell) if m: sec = ((int(m.group(1)) - 1) % 3) + 1 return f"SEC{sec}" return "SEC_OTHER" df["BAND"] = df["EUTRANCELLFDD"].apply(map_band) df["SECTOR"] = df["EUTRANCELLFDD"].apply(map_sector) available_bands = ["L900", "L1800", "L2100", "L2300"] selected_bands = st.multiselect("Band", available_bands, default=available_bands) df = df[df["BAND"].isin(selected_bands)] # ====================== # GLOBAL DATE AXIS # ====================== X_MIN = pd.to_datetime(start) X_MAX = pd.to_datetime(end) + pd.Timedelta(days=1) - pd.Timedelta(seconds=1) FULL_DATES = pd.DataFrame({ "DATE_ID": pd.date_range(start=X_MIN, end=pd.to_datetime(end), freq="D") }) DTICK = "D1" # ====================== # AUTO UNIT # ====================== def auto_unit(d, kpi): d = d.copy() max_val = d[kpi].max() if pd.isna(max_val): return d.round(2), "GB" if max_val >= 1024: d[kpi] = (d[kpi] / 1024).round(2) return d, "TB" d[kpi] = d[kpi].round(2) return d, "GB" # ====================== # SECTOR CHART (UNCHANGED) # ====================== def sector_chart(df, sector, kpi, title, is_pct=False): base = df[df["SECTOR"] == sector] if base.empty or kpi not in base.columns: return None frames = [] for cell in sorted(base["EUTRANCELLFDD"].unique()): g = base[base["EUTRANCELLFDD"] == cell][["DATE_ID", kpi]] g = FULL_DATES.merge(g, on="DATE_ID", how="left") g["EUTRANCELLFDD"] = cell frames.append(g) d = pd.concat(frames, ignore_index=True) if ("TRAFFIC" in kpi.upper()) or ("PAYLOAD" in kpi.upper()): d, unit = auto_unit(d, kpi) fig = px.area(d, x="DATE_ID", y=kpi, color="EUTRANCELLFDD", title=f"{title} ({unit})") else: fig = px.line(d, x="DATE_ID", y=kpi, color="EUTRANCELLFDD", title=title, markers=True) fig.update_layout( height=420, dragmode=False, xaxis=dict(tickformat="%d-%b", tickangle=-45), legend=dict(orientation="h", y=-0.35, x=0.5, xanchor="center") ) fig.update_xaxes( range=[X_MIN, X_MAX], tickmode="linear", dtick=DTICK, fixedrange=True ) if is_pct: fig.update_yaxes(range=[0, 100]) return fig KPI_LIST = [ ("Radio_Network_Availability_Rate","Availability",True), ("Session_Abnormal_Release","Session Abnormal Release",False), ("RRC_Setup_Success_Rate_Service","RRC Setup Success Rate",True), ("ERAB_Setup_Success_Rate_All","ERAB Setup Success Rate",True), ("Session_Setup_Success_Rate","Session Setup Success Rate",True), ("Intra_Frequency_HO_Out_Success_Rate","Intra-Frequency HO Out",True), ("inter_freq_HO","Inter Frequency HO",False), ("UL_RSSI_dbm","UL RSSI (dBm)",False), ("Average_CQI_nonHOME","Average CQI",False), ("SE_2","Spectral Efficiency",False), ("Total_Traffic_Volume_new","Total Traffic Volume New",False), ("CA_PAYLOAD_GB","CA Payload (GB)",False), ] for kpi, title, is_pct in KPI_LIST: st.markdown("---") c1, c2, c3 = st.columns(3) for col, sec in zip([c1, c2, c3], ["SEC1","SEC2","SEC3"]): with col: fig = sector_chart(df, sec, kpi, f"{title} - {sec}", is_pct) if fig: st.plotly_chart( fig, use_container_width=True, config={"displayModeBar": False, "scrollZoom": False} ) # ====================== # DASHBOARD 2 # ====================== with tab2: st.title("DASHBOARD CAPACITY & UTILIZATION") st.caption("📈 Clean • Enterprise • Management Ready") KPI_LIST_2 = [ "DL_Resource_Block_Utilizing_Rate", "UL_Resource_Block_Utilizing_Rate", "User_Uplink_Average_Throughput", "DL_PDCP_User_Throughput", "Cell_Downlink_Average_Throughput", "Cell_Uplink_Average_Throughput", "LTE_CSFB_SR", "Maximum_User_Number_RrcConn", "PRB_MAX", "AU_MAX", "Max DL Cell Downlink Throughput", "Max UL Cell Downlink Throughput" ] def simple_kpi_chart(df, kpi): if kpi not in df.columns: return None d = df.groupby("DATE_ID")[kpi].mean().reset_index() d = FULL_DATES.merge(d, on="DATE_ID", how="left") fig = px.area(d, x="DATE_ID", y=kpi, title=kpi) fig.update_layout( height=420, dragmode=False, xaxis=dict(tickformat="%d-%b"), ) fig.update_xaxes( range=[X_MIN, X_MAX], tickmode="linear", dtick=DTICK, fixedrange=True ) return fig for i in range(0, len(KPI_LIST_2), 3): cols = st.columns(3) for col, kpi in zip(cols, KPI_LIST_2[i:i+3]): with col: fig = simple_kpi_chart(df, kpi) if fig: st.plotly_chart( fig, use_container_width=True, config={"displayModeBar": False, "scrollZoom": False} )