DASHBOARDS-2 / app.py
muhdefri's picture
Update app.py
4e609e6 verified
import streamlit as st
import pandas as pd
import plotly.express as px
import glob
import re
import os
# ======================
# PAGE CONFIG
# ======================
st.set_page_config(
page_title="HF KPI Dashboard",
layout="wide"
)
# ======================
# TAB NAVIGATION
# ======================
tab1, tab2 = st.tabs([
"πŸ“Š Dashboard KPI Monitoring Daily",
"πŸ“ˆ Dashboard Capacity & Utilization"
])
# ======================
# LOAD DATA (SHARED)
# ======================
@st.cache_data
def load_data():
files = (
glob.glob("src/LTE_CELL_DAILY *.csv") +
glob.glob("src/LTE_CELL_DAILY *.csv.gz")
)
if not files:
return None
dfs = []
for f in files:
tmp = pd.read_csv(f, compression="infer")
if "DATE_ID" in tmp.columns:
tmp["DATE_ID"] = pd.to_datetime(
tmp["DATE_ID"].astype(str).str.strip(),
format="%m/%d/%Y",
errors="coerce"
)
if tmp["DATE_ID"].isna().all():
m = re.search(r"(\d{8})", os.path.basename(f))
if m:
tmp["DATE_ID"] = pd.to_datetime(m.group(1), format="%Y%m%d")
dfs.append(tmp)
df = pd.concat(dfs, ignore_index=True)
df = df.dropna(subset=["DATE_ID"])
df["DATE_ID"] = df["DATE_ID"].dt.normalize()
return df
df_raw = load_data()
if df_raw is None or df_raw.empty:
st.error("Data CSV / CSV.GZ tidak ditemukan")
st.stop()
# ======================
# KPI ALIAS
# ======================
COLUMN_ALIAS = {
"Intra-Frequency Handover Out Success Rate":
"Intra_Frequency_HO_Out_Success_Rate"
}
for src, dst in COLUMN_ALIAS.items():
if src in df_raw.columns:
df_raw[dst] = df_raw[src]
# ======================
# FILTER (GLOBAL)
# ======================
with tab1:
st.title("DASHBOARDS KPI MONITORING DAILY")
st.caption("πŸ“Š Dashboard By Muhammad Defri")
st.subheader("Filter")
selected_sites = st.multiselect(
"Site ID",
sorted(df_raw["ERBS"].dropna().unique())
)
if not selected_sites:
st.warning("⚠️ Input Site ID")
st.stop()
df = df_raw[df_raw["ERBS"].isin(selected_sites)]
col_start, col_end = st.columns(2)
with col_start:
start = st.date_input(
"Start Date",
df["DATE_ID"].min().date(),
min_value=df_raw["DATE_ID"].min().date(),
max_value=df_raw["DATE_ID"].max().date()
)
with col_end:
end = st.date_input(
"End Date",
df_raw["DATE_ID"].max().date(),
min_value=start,
max_value=df_raw["DATE_ID"].max().date()
)
df = df[
(df["DATE_ID"] >= pd.to_datetime(start)) &
(df["DATE_ID"] < pd.to_datetime(end) + pd.Timedelta(days=1))
]
# ======================
# MAP BAND & SECTOR (UNCHANGED)
# ======================
def map_band(cell):
if pd.isna(cell):
return "OTHER"
cell = str(cell).upper()
if re.search(r"(MT|IT)", cell): return "L900"
if re.search(r"(ML|IL|RL)", cell): return "L1800"
if re.search(r"(MR|RR|IR)", cell): return "L2100"
if re.search(r"(ME|MF|MV|IE|IF|IV|VE|VF|VV)", cell): return "L2300"
return "OTHER"
def map_sector(cell):
if pd.isna(cell):
return "SEC_OTHER"
cell = str(cell).upper().strip()
m = re.search(r"(RR|RL|IR|IL)(\d{2})$", cell)
if m:
sec = int(m.group(2)) // 10
return f"SEC{sec}" if sec in (1, 2, 3) else "SEC_OTHER"
m = re.search(r"(\d{1,2})$", cell)
if m:
sec = ((int(m.group(1)) - 1) % 3) + 1
return f"SEC{sec}"
return "SEC_OTHER"
df["BAND"] = df["EUTRANCELLFDD"].apply(map_band)
df["SECTOR"] = df["EUTRANCELLFDD"].apply(map_sector)
available_bands = ["L900", "L1800", "L2100", "L2300"]
selected_bands = st.multiselect("Band", available_bands, default=available_bands)
df = df[df["BAND"].isin(selected_bands)]
# ======================
# GLOBAL DATE AXIS
# ======================
X_MIN = pd.to_datetime(start)
X_MAX = pd.to_datetime(end) + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)
FULL_DATES = pd.DataFrame({
"DATE_ID": pd.date_range(start=X_MIN, end=pd.to_datetime(end), freq="D")
})
DTICK = "D1"
# ======================
# AUTO UNIT
# ======================
def auto_unit(d, kpi):
d = d.copy()
max_val = d[kpi].max()
if pd.isna(max_val):
return d.round(2), "GB"
if max_val >= 1024:
d[kpi] = (d[kpi] / 1024).round(2)
return d, "TB"
d[kpi] = d[kpi].round(2)
return d, "GB"
# ======================
# SECTOR CHART (UNCHANGED)
# ======================
def sector_chart(df, sector, kpi, title, is_pct=False):
base = df[df["SECTOR"] == sector]
if base.empty or kpi not in base.columns:
return None
frames = []
for cell in sorted(base["EUTRANCELLFDD"].unique()):
g = base[base["EUTRANCELLFDD"] == cell][["DATE_ID", kpi]]
g = FULL_DATES.merge(g, on="DATE_ID", how="left")
g["EUTRANCELLFDD"] = cell
frames.append(g)
d = pd.concat(frames, ignore_index=True)
if ("TRAFFIC" in kpi.upper()) or ("PAYLOAD" in kpi.upper()):
d, unit = auto_unit(d, kpi)
fig = px.area(d, x="DATE_ID", y=kpi, color="EUTRANCELLFDD",
title=f"{title} ({unit})")
else:
fig = px.line(d, x="DATE_ID", y=kpi,
color="EUTRANCELLFDD",
title=title, markers=True)
fig.update_layout(
height=420,
dragmode=False,
xaxis=dict(tickformat="%d-%b", tickangle=-45),
legend=dict(orientation="h", y=-0.35, x=0.5, xanchor="center")
)
fig.update_xaxes(
range=[X_MIN, X_MAX],
tickmode="linear",
dtick=DTICK,
fixedrange=True
)
if is_pct:
fig.update_yaxes(range=[0, 100])
return fig
KPI_LIST = [
("Radio_Network_Availability_Rate","Availability",True),
("Session_Abnormal_Release","Session Abnormal Release",False),
("RRC_Setup_Success_Rate_Service","RRC Setup Success Rate",True),
("ERAB_Setup_Success_Rate_All","ERAB Setup Success Rate",True),
("Session_Setup_Success_Rate","Session Setup Success Rate",True),
("Intra_Frequency_HO_Out_Success_Rate","Intra-Frequency HO Out",True),
("inter_freq_HO","Inter Frequency HO",False),
("UL_RSSI_dbm","UL RSSI (dBm)",False),
("Average_CQI_nonHOME","Average CQI",False),
("SE_2","Spectral Efficiency",False),
("Total_Traffic_Volume_new","Total Traffic Volume New",False),
("CA_PAYLOAD_GB","CA Payload (GB)",False),
]
for kpi, title, is_pct in KPI_LIST:
st.markdown("---")
c1, c2, c3 = st.columns(3)
for col, sec in zip([c1, c2, c3], ["SEC1","SEC2","SEC3"]):
with col:
fig = sector_chart(df, sec, kpi, f"{title} - {sec}", is_pct)
if fig:
st.plotly_chart(
fig,
use_container_width=True,
config={"displayModeBar": False, "scrollZoom": False}
)
# ======================
# DASHBOARD 2
# ======================
with tab2:
st.title("DASHBOARD CAPACITY & UTILIZATION")
st.caption("πŸ“ˆ Clean β€’ Enterprise β€’ Management Ready")
KPI_LIST_2 = [
"DL_Resource_Block_Utilizing_Rate",
"UL_Resource_Block_Utilizing_Rate",
"User_Uplink_Average_Throughput",
"DL_PDCP_User_Throughput",
"Cell_Downlink_Average_Throughput",
"Cell_Uplink_Average_Throughput",
"LTE_CSFB_SR",
"Maximum_User_Number_RrcConn",
"PRB_MAX",
"AU_MAX",
"Max DL Cell Downlink Throughput",
"Max UL Cell Downlink Throughput"
]
def simple_kpi_chart(df, kpi):
if kpi not in df.columns:
return None
d = df.groupby("DATE_ID")[kpi].mean().reset_index()
d = FULL_DATES.merge(d, on="DATE_ID", how="left")
fig = px.area(d, x="DATE_ID", y=kpi, title=kpi)
fig.update_layout(
height=420,
dragmode=False,
xaxis=dict(tickformat="%d-%b"),
)
fig.update_xaxes(
range=[X_MIN, X_MAX],
tickmode="linear",
dtick=DTICK,
fixedrange=True
)
return fig
for i in range(0, len(KPI_LIST_2), 3):
cols = st.columns(3)
for col, kpi in zip(cols, KPI_LIST_2[i:i+3]):
with col:
fig = simple_kpi_chart(df, kpi)
if fig:
st.plotly_chart(
fig,
use_container_width=True,
config={"displayModeBar": False, "scrollZoom": False}
)