import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
import requests
import json
import numpy as np
import math
import base64

# =================== CONFIG =====================
st.set_page_config(
    page_title="Advanced Fatigue Analytics",  # FIX: was misspelled "Anfalytics"
    page_icon="🛡️",  # Safety icon (restored from mojibake)
    layout="wide",
    initial_sidebar_state="expanded",
)

# =================== LOGO =====================
logo_path = "btech.png"  # logo file


def get_base64(file_path):
    """Return the base64-encoded contents of *file_path* as a str."""
    with open(file_path, "rb") as f:
        data = f.read()
    return base64.b64encode(data).decode()


try:
    logo_base64 = get_base64(logo_path)
    # NOTE(review): the original <img> markup was lost when this file was
    # sanitized (the f-string was left empty) — TODO restore the intended
    # tag attributes/styling.
    logo_html = f'<img src="data:image/png;base64,{logo_base64}" alt="logo">'
except FileNotFoundError:
    st.warning(f"Logo file '{logo_path}' not found. Using placeholder text.")
    logo_html = '<div>BTECH</div>'

# =================== GLOBAL CSS =====================
# NOTE(review): the original <style> block was stripped during sanitization —
# TODO restore the project CSS here. (The earlier commented-out duplicate of
# this section was removed as dead code.)
st.markdown("""
""", unsafe_allow_html=True)

# =================== HEADER =====================
# NOTE(review): the original header markup was stripped during sanitization;
# the visible text content is preserved below — TODO restore styling.
st.markdown(f"""
<div>
  {logo_html}
  <h1>Advanced Fatigue Analysis</h1>
  <p>Proactive Safety Intelligence for Mining Operations</p>
</div>
""", unsafe_allow_html=True)
""", unsafe_allow_html=True) # # ... (Kode selanjutnya disalin dari bagian bawah file Anda, misalnya LOAD DATA ke bawah) # # =================== LOAD DATA ====================== @st.cache_data def load_data(): try: # ================================== # 1. LOAD CSV & NORMALIZE COLUMNS # ================================== df = pd.read_csv("data.csv") original_columns = df.columns.tolist() # Normalize: lower, strip, underscore df.columns = ( df.columns.astype(str) .str.strip() .str.lower() .str.replace(r"\s+", "_", regex=True) ) # ================================== # 2. AUTO-DETECT COLUMNS (case-insensitive) # ================================== col_operator = next((c for c in df.columns if "operator" in c or "driver" in c), None) col_shift = next((c for c in df.columns if "shift" in c), None) # βœ… FIX: Search for normalized "parent_fleet", NOT original "Parent Fleet" col_fleet_type = next((c for c in df.columns if "parent_fleet" in c), None) col_fleet_no = next((c for c in df.columns if "fleet_number" in c), None) # ================================== # 3. 
DERIVE COLUMNS # ================================== # Unit Number if col_fleet_no: df["unit_no"] = df[col_fleet_no].astype(str).str.split("-", n=1).str[-1].str.strip() else: df["unit_no"] = "UNKNOWN" # Speed col_speed = None for orig in original_columns: norm = orig.lower().replace(" ", "_") if "(in_km/hour).1" in norm or "speed" in norm: if norm in df.columns: col_speed = norm break if not col_speed: col_speed = next((c for c in df.columns if "speed" in c), None) # Time time_cols = [c for c in df.columns if "gmt" in c and "wita" in c] if len(time_cols) >= 2: df["start"] = pd.to_datetime(df[time_cols[0]], errors="coerce") df["end"] = pd.to_datetime(df[time_cols[1]], errors="coerce") elif len(time_cols) == 1: df["start"] = pd.to_datetime(df[time_cols[0]], errors="coerce") df["end"] = df["start"] + pd.Timedelta(minutes=1) else: df["start"] = pd.NaT df["end"] = pd.NaT # Time features if not df["start"].isna().all(): df["hour"] = df["start"].dt.hour df["date"] = df["start"].dt.date df["day_of_week"] = df["start"].dt.day_name() # df["week"], df["month"], df["year"] β€” optional, not used in filters else: df["hour"] = 0 df["date"] = None # Shift as int if col_shift: df[col_shift] = pd.to_numeric(df[col_shift], errors="coerce").astype("Int64") # βœ… FIX: CREATE site & group_model HERE (not in sidebar!) 
if col_fleet_type: # Split ONCE on first '-', keep FULL left part (e.g., "Amsterdam - CAT789" β†’ "AMSTERDAM") split = df[col_fleet_type].astype(str).str.split("-", n=1, expand=True) df["site"] = split[0].str.strip().str.upper() df["group_model"] = split[1].str.strip().fillna("UNKNOWN").replace("", "UNKNOWN") else: df["site"] = "UNKNOWN" df["group_model"] = "UNKNOWN" return df, col_operator, col_shift, col_fleet_type, col_speed, col_fleet_no except Exception as e: st.error(f"Error loading data: {e}") return pd.DataFrame(), None, None, None, None, None # ================================== # CALL load_data() # ================================== df, col_operator, col_shift, col_fleet_type, col_speed, col_fleet_no = load_data() df_original_full = df.copy() if df.empty: st.stop() st.success("Data Loaded Successfully") df_full_report = df.copy() # =================== FILTERS (Sidebar) ===================== filter_dict = {} st.sidebar.markdown( """
Filter if Need Specific Conditions
""", unsafe_allow_html=True ) with st.sidebar.form("filters_form"): # ---------------- Date Range ---------------- if 'date' in df.columns and not df['date'].isna().all(): min_date = pd.to_datetime(df['date']).min().date() max_date = pd.to_datetime(df['date']).max().date() date_range = st.date_input("Select Date Range", (min_date, max_date)) filter_dict['date_range'] = date_range else: filter_dict['date_range'] = (None, None) # βœ… FIXED: Use df['site'] & df['group_model'] (already created in load_data) # ---------------- Site Filter ---------------- all_sites = sorted(df['site'].dropna().unique()) selected_site = st.selectbox( "Filter Site", options=[None] + all_sites, format_func=lambda x: "All" if x is None else x ) filter_dict['site'] = selected_site # ---------------- Group Model Filter βœ… NOW WORKING ---------------- all_models = sorted(df['group_model'].dropna().unique()) # Define display names for specific values display_map = { "OB HAULLER": "OB HAULER", "HAULING COAL": "COAL HAULING" } # Create display options display_options = [display_map.get(model, model) for model in all_models] # Create reverse map to get original value back reverse_map = {v: k for k, v in display_map.items()} # Create selectbox with display names selected_display = st.selectbox( "Filter Group Model", options=[None] + display_options, format_func=lambda x: "All" if x is None else x ) # Map back to original value for filtering selected_model = reverse_map.get(selected_display, selected_display) if selected_display else None filter_dict['group_model'] = selected_model # ---------------- Shift ---------------- if col_shift: shifts = sorted(df[col_shift].dropna().unique()) selected_shift = st.selectbox( f"Select {col_shift.replace('_', ' ').title()}", options=[None] + shifts, format_func=lambda x: "All" if x is None else f"Shift {x}" ) filter_dict['shift'] = selected_shift else: filter_dict['shift'] = None # ---------------- Operator ---------------- if col_operator: ops = 
with st.sidebar.form("filters_form"):
    # ---------------- Date Range ----------------
    if 'date' in df.columns and not df['date'].isna().all():
        min_date = pd.to_datetime(df['date']).min().date()
        max_date = pd.to_datetime(df['date']).max().date()
        date_range = st.date_input("Select Date Range", (min_date, max_date))
        filter_dict['date_range'] = date_range
    else:
        filter_dict['date_range'] = (None, None)

    # ---------------- Site Filter (df['site'] created in load_data) ----------------
    all_sites = sorted(df['site'].dropna().unique())
    selected_site = st.selectbox(
        "Filter Site",
        options=[None] + all_sites,
        format_func=lambda x: "All" if x is None else x
    )
    filter_dict['site'] = selected_site

    # ---------------- Group Model Filter ----------------
    all_models = sorted(df['group_model'].dropna().unique())
    # Display names differ from the stored values for these two models
    display_map = {
        "OB HAULLER": "OB HAULER",
        "HAULING COAL": "COAL HAULING"
    }
    display_options = [display_map.get(model, model) for model in all_models]
    # Reverse map to recover the stored value from the display name
    reverse_map = {v: k for k, v in display_map.items()}
    selected_display = st.selectbox(
        "Filter Group Model",
        options=[None] + display_options,
        format_func=lambda x: "All" if x is None else x
    )
    selected_model = reverse_map.get(selected_display, selected_display) if selected_display else None
    filter_dict['group_model'] = selected_model

    # ---------------- Shift ----------------
    if col_shift:
        shifts = sorted(df[col_shift].dropna().unique())
        selected_shift = st.selectbox(
            f"Select {col_shift.replace('_', ' ').title()}",
            options=[None] + shifts,
            format_func=lambda x: "All" if x is None else f"Shift {x}"
        )
        filter_dict['shift'] = selected_shift
    else:
        filter_dict['shift'] = None

    # ---------------- Operator ----------------
    if col_operator:
        ops = sorted(df[col_operator].dropna().unique())
        selected_op = st.selectbox(
            f"Select {col_operator.replace('_', ' ').title()}",
            options=[None] + ops,
            format_func=lambda x: "All" if x is None else x
        )
        filter_dict['operator'] = selected_op
    else:
        filter_dict['operator'] = None

    # ---------------- Hour ----------------
    if 'hour' in df.columns and not df['hour'].isna().all():
        hours = sorted(df['hour'].dropna().unique())
        hour_range = st.slider(
            "Select Hour Range",
            int(min(hours)), int(max(hours)),
            (int(min(hours)), int(max(hours)))
        )
        filter_dict['hour_range'] = hour_range
    else:
        filter_dict['hour_range'] = (0, 23)

    # ---------------- Unit No ----------------
    if 'unit_no' in df.columns:
        units = sorted(df['unit_no'].dropna().unique())
        selected_unit = st.selectbox(
            "Select Unit Number", [None] + units,
            format_func=lambda x: "All" if x is None else x
        )
        filter_dict['unit_no'] = selected_unit
    else:
        filter_dict['unit_no'] = None

    # ---------------- Submit ----------------
    apply_filters = st.form_submit_button("Apply Filters")

# =================== APPLY FILTERS =====================
if apply_filters:
    # Filter Date Range
    # FIX: st.date_input returns a 1-tuple while the user is mid-selection,
    # and the fallback is (None, None) — both crashed the unconditional
    # unpack/compare. Only filter on a complete, non-None range.
    date_range_val = filter_dict.get('date_range') or ()
    if len(date_range_val) == 2 and all(d is not None for d in date_range_val):
        start_date, end_date = date_range_val
        df = df[(df['date'] >= start_date) & (df['date'] <= end_date)]

    # Filter Site
    if filter_dict.get('site') is not None:
        df = df[df['site'] == filter_dict['site']]

    # Filter Group Model
    if filter_dict.get('group_model') is not None:
        df = df[df['group_model'] == filter_dict['group_model']]

    # UI display mapping (rendering only — data remains unchanged)
    group_model_display = {
        'OB HAULLER': 'OB HAULER',
        'HAULING COAL': 'COAL HAULING'
    }

    # Filter Shift
    if filter_dict.get('shift') is not None:
        df = df[df[col_shift] == filter_dict['shift']]

    # Filter Operator
    if filter_dict.get('operator') is not None:
        df = df[df[col_operator] == filter_dict['operator']]

    # Filter Hour Range
    if filter_dict.get('hour_range'):
        hr_start, hr_end = filter_dict['hour_range']
        df = df[(df['hour'] >= hr_start) & (df['hour'] <= hr_end)]

    # Filter Unit No
    # FIX: the selectbox offers values from the derived df['unit_no'] (the part
    # after '-'), but the filter compared them against the full fleet number
    # column, so any selection yielded an empty frame. Compare against
    # df['unit_no'] instead.
    if filter_dict.get('unit_no') is not None:
        df = df[df['unit_no'] == filter_dict['unit_no']]


# Objective 1
# ===================== GLOBAL FUNCTION: Hour Category Labels =====================
def hour_range_label_full(hour):
    """Map an hour-of-day (0-23) to its shift/time-block label."""
    if not (0 <= hour < 24):
        return 'Unknown'
    if 6 <= hour < 9:
        return 'Shift 1 Morning Early (6-9)'
    elif 9 <= hour < 12:
        return 'Shift 1 Morning Late (9-12)'
    elif 12 <= hour < 15:
        return 'Shift 1 Afternoon Early (12-15)'
    elif 15 <= hour < 18:
        return 'Shift 1 Afternoon Late (15-18)'
    elif 18 <= hour < 21:
        return 'Shift 2 Evening Early (18-21)'
    elif 21 <= hour < 24:
        return 'Shift 2 Evening Late (21-24)'
    elif 0 <= hour < 3:
        return 'Shift 2 Dawn Early (0-3)'
    elif 3 <= hour < 6:
        return 'Shift 2 Dawn Late (3-6)'
    return 'Unknown'


# ===================== MAIN VISUALIZATION =====================
st.subheader("OBJECTIVE 1: Want to see fatigue patterns across different shifts?")

if 'start' in df.columns and not df.empty:
    try:
        # --- Data Preparation ---
        df_local = df.copy()
        if not pd.api.types.is_datetime64_any_dtype(df_local['start']):
            df_local['start'] = pd.to_datetime(df_local['start'], errors='coerce')
        df_local = df_local.dropna(subset=['start'])
        df_local['hour'] = df_local['start'].dt.hour

        # --- COLOR MAP: yellow-orange (Shift 1), blue (Shift 2) ---
        color_map_full = {
            'Shift 1 Morning Early (6-9)':    '#FFEB3B',  # Yellow
            'Shift 1 Morning Late (9-12)':    '#FFC107',  # Amber
            'Shift 1 Afternoon Early (12-15)': '#FF9800',  # Orange
            'Shift 1 Afternoon Late (15-18)': '#F57C00',  # Deep Orange
            'Shift 2 Evening Early (18-21)':  '#42A5F5',  # Light Blue
            'Shift 2 Evening Late (21-24)':   '#1976D2',  # Blue
            'Shift 2 Dawn Early (0-3)':       '#0288D1',  # Cyan
            'Shift 2 Dawn Late (3-6)':        '#01579B',  # Dark Blue
        }

        # --- Intervals in analog-clock order (12->3->6->9) ---
        intervals_shift1 = [(12, 15), (15, 18), (6, 9), (9, 12)]
        labels_shift1 = [
            'Shift 1 Afternoon Early (12-15)',
            'Shift 1 Afternoon Late (15-18)',
            'Shift 1 Morning Early (6-9)',
            'Shift 1 Morning Late (9-12)',
        ]
        intervals_shift2 = [(0, 3), (3, 6), (18, 21), (21, 24)]
        labels_shift2 = [
            'Shift 2 Dawn Early (0-3)',
            'Shift 2 Dawn Late (3-6)',
            'Shift 2 Evening Early (18-21)',
            'Shift 2 Evening Late (21-24)',
        ]

        # --- Frequencies per interval ---
        def compute_counts(intervals):
            counts = []
            for start_h, end_h in intervals:
                cnt = df_local[(df_local['hour'] >= start_h) & (df_local['hour'] < end_h)].shape[0]
                counts.append(cnt)
            return counts

        freq_shift1 = compute_counts(intervals_shift1)
        freq_shift2 = compute_counts(intervals_shift2)

        # --- Polar geometry ---
        theta_midpoints = [45, 135, 225, 315]  # centers of 90-degree segments
        bar_width = [90] * 4
        angular_tick_vals = [0, 90, 180, 270]  # fixed angle positions

        # Custom tick labels per shift
        angular_tick_text_shift1 = ["12", "15", "6/18", "9"]   # 0/90/180/270 deg
        angular_tick_text_shift2 = ["24", "3", "18/6", "21"]   # 0=24, 90=3, 180=6, 270=21

        # --- Independent radial scales per shift ---
        max_r1 = max(freq_shift1) if freq_shift1 and max(freq_shift1) > 0 else 1
        max_r2 = max(freq_shift2) if freq_shift2 and max(freq_shift2) > 0 else 1

        # NOTE(review): hovertemplate markup was stripped by sanitization;
        # reconstructed with plotly's <br> line break — confirm styling.
        # ============== FIGURE: SHIFT 1 (yellow-orange) ==============
        fig1 = go.Figure()
        fig1.add_trace(go.Barpolar(
            r=freq_shift1,
            theta=theta_midpoints,
            width=bar_width,
            marker_color=[color_map_full.get(lbl, '#FFEB3B') for lbl in labels_shift1],
            marker_line_color="black",
            marker_line_width=1.5,
            opacity=0.93,
            hovertemplate="%{text}<br>Fatigue Incidents: %{r}",
            text=labels_shift1,
        ))
        fig1.update_layout(
            title=dict(text="Shift 1 (06:00–18:00)",
                       font=dict(size=18, color="#FF9800", family="Segoe UI")),
            polar=dict(
                bgcolor="rgba(255,248,225,0.7)",
                angularaxis=dict(
                    rotation=90,            # 12 at top
                    direction="clockwise",
                    tickmode='array',
                    tickvals=angular_tick_vals,
                    ticktext=angular_tick_text_shift1,
                    tickfont=dict(size=14, color="#5D4037", weight="bold"),
                    showline=True, linewidth=1.2, linecolor="#FFD54F",
                ),
                radialaxis=dict(
                    visible=True, showticklabels=True,
                    tickfont=dict(size=11), angle=45,
                    gridcolor="#FFE082", gridwidth=0.8,
                    range=[0, max_r1 * 1.15],
                )
            ),
            showlegend=False, height=550, width=550,
            margin=dict(t=65, b=40, l=40, r=40),
            font=dict(family="Segoe UI, -apple-system, sans-serif"),
        )

        # ============== FIGURE: SHIFT 2 (blue) ==============
        fig2 = go.Figure()
        fig2.add_trace(go.Barpolar(
            r=freq_shift2,
            theta=theta_midpoints,
            width=bar_width,
            marker_color=[color_map_full.get(lbl, '#42A5F5') for lbl in labels_shift2],
            marker_line_color="black",
            marker_line_width=1.5,
            opacity=0.93,
            hovertemplate="%{text}<br>Fatigue Incidents: %{r}",
            text=labels_shift2,
        ))
        fig2.update_layout(
            title=dict(text="Shift 2 (18:00–06:00)",
                       font=dict(size=18, color="#1976D2", family="Segoe UI")),
            polar=dict(
                bgcolor="rgba(230,245,255,0.7)",
                angularaxis=dict(
                    rotation=90,
                    direction="clockwise",
                    tickmode='array',
                    tickvals=angular_tick_vals,
                    ticktext=angular_tick_text_shift2,
                    tickfont=dict(size=14, color="#0D47A1", weight="bold"),
                    showline=True, linewidth=1.2, linecolor="#64B5F6",
                ),
                radialaxis=dict(
                    visible=True, showticklabels=True,
                    tickfont=dict(size=11), angle=45,
                    gridcolor="#BBDEFB", gridwidth=0.8,
                    range=[0, max_r2 * 1.15],  # independent scale
                )
            ),
            showlegend=False, height=550, width=550,
            margin=dict(t=65, b=40, l=40, r=40),
            font=dict(family="Segoe UI, -apple-system, sans-serif"),
        )

        # ============== EXPLANATION ==============
        # NOTE(review): the original HTML table markup was stripped during
        # sanitization; the text content is preserved as a markdown table.
        st.markdown("""
**⚠️ Clockwise Time Mapping (Analog Layout)**

| Time Block | Shift 1 (Day) | Shift 2 (Night) |
|---|---|---|
| 1st Block | 06 → 09 | 18 → 21 |
| 2nd Block | 09 → 12 | 21 → 24 (Alertness Decline) |
| 3rd Block | 12 → 15 | 24 → 03 (Circadian Nadir) |
| 4th Block | 15 → 18 | 03 → 06 |

*Scale is independent per shift — bar length shows relative risk within the shift.*
""", unsafe_allow_html=True)

        # ============== RENDER CHARTS SIDE BY SIDE ==============
        col1, col2 = st.columns(2)
        with col1:
            st.plotly_chart(fig1, use_container_width=True, config={'displayModeBar': False})
        with col2:
            st.plotly_chart(fig2, use_container_width=True, config={'displayModeBar': False})

        # ============== FOOTNOTE ==============
        st.caption(
            " *Safety Insight*: Highest fatigue risk occurs during **24→06** (Shift 2) — aligns with circadian trough (Czeisler, 1999). "
        )

    except Exception as e:
        st.error(f"⚠️ Rendering error: {e}")
        st.code(f"{type(e).__name__}: {str(e)}", language="python")
else:
    st.info("⏳ Awaiting data... Ensure column `'start'` contains valid timestamps (e.g., '2025-06-15 14:30:00').")
# =================== OBJECTIVE 2 ======================
st.subheader("OBJECTIVE 2: How does operator energy fluctuate from start to finish of each shift?")

if 'start' in df.columns and not df.empty:
    try:
        df_local = df.copy()
        df_local['hour'] = df_local['start'].dt.hour
        df_local['date'] = df_local['start'].dt.normalize()

        # Bucket each row into its shift/time-block category
        df_local['hour_category'] = df_local['hour'].apply(hour_range_label_full)

        color_map = {
            'Shift 1 Morning Early (6-9)':    '#FFEB3B',
            'Shift 1 Morning Late (9-12)':    '#FFC107',
            'Shift 1 Afternoon Early (12-15)': '#FF9800',
            'Shift 1 Afternoon Late (15-18)': '#F57C00',
            'Shift 2 Evening Early (18-21)':  '#42A5F5',
            'Shift 2 Evening Late (21-24)':   '#1976D2',
            'Shift 2 Dawn Early (0-3)':       '#0288D1',
            'Shift 2 Dawn Late (3-6)':        '#01579B',
            'Unknown':                        '#E0E0E0'
        }

        # Fatigue count per day and hour category
        daily_by_cat = df_local.groupby(['date', 'hour_category']).size().reset_index(name='fatigue_count')

        # FIX: removed the daily_dominant_cat computation that was labelled
        # "for Objective 3" — it was never read anywhere (Objective 3 derives
        # its own dominant shift from the shift column).

        # Re-index onto the full date x category grid so every day shows all bars
        all_dates = pd.date_range(start=daily_by_cat['date'].min(),
                                  end=daily_by_cat['date'].max(), freq='D')
        all_cats = list(color_map.keys())
        full_index = pd.MultiIndex.from_product([all_dates, all_cats],
                                                names=['date', 'hour_category'])
        daily_by_cat = (daily_by_cat.set_index(['date', 'hour_category'])
                        .reindex(full_index, fill_value=0)
                        .reset_index())

        # Week bucketing for the background shading/labels
        daily_by_cat['day_of_week_num'] = daily_by_cat['date'].dt.dayofweek
        daily_by_cat['week_start'] = daily_by_cat['date'] - pd.to_timedelta(
            daily_by_cat['day_of_week_num'], unit='d')
        daily_by_cat['week_label'] = daily_by_cat['week_start'].dt.strftime('Week %U')

        fig = px.bar(
            daily_by_cat,
            x='date',
            y='fatigue_count',
            color='hour_category',
            title="Daily Fatigue Alerts by Detailed Hour Category",
            color_discrete_map=color_map,
            labels={'fatigue_count': 'Fatigue Alerts', 'date': 'Date'},
            hover_data={'fatigue_count': True, 'week_label': True}
        )
        fig.update_layout(
            barmode='stack',
            xaxis_title="Date",
            yaxis_title="Fatigue Alerts",
            height=400,
            legend_title="Hour Category"
        )

        # Alternating weekly background rectangles + week annotations
        unique_weeks = daily_by_cat['week_start'].unique()
        shapes = []
        week_labels = []
        bg_colors = ['#f0e6ff', '#e6f0ff', '#e6fff0', '#fff0e6',
                     '#ffe6e6', '#f0ffe6', '#e6e6ff']
        for i, week in enumerate(sorted(unique_weeks)):
            week_days = daily_by_cat[daily_by_cat['week_start'] == week]['date']
            if len(week_days) > 0:
                start_date = week_days.min()
                end_date = week_days.max()
                shapes.append(dict(
                    type="rect", xref="x", yref="paper",
                    x0=start_date, x1=end_date, y0=0, y1=1,
                    fillcolor=bg_colors[i % len(bg_colors)],
                    opacity=0.2, layer="below", line_width=0,
                ))
                week_labels.append(dict(
                    xref='x', yref='paper',
                    x=start_date + (end_date - start_date) / 2, y=1.02,
                    text=f"Week {week.strftime('%U')}",
                    showarrow=False, font=dict(size=10),
                    xanchor='center', yanchor='bottom'
                ))

        fig.update_layout(shapes=shapes, annotations=week_labels)
        st.plotly_chart(fig, use_container_width=True)

    except Exception as e:
        st.error(f"⚠️ Error in Daily Fatigue by Detailed Hour Category: {e}")
else:
    st.info("ℹ️ Insufficient time data to display this visualization.")
# =================== OBJECTIVE 3: Daily Roster Insight per Week (Scatter Plot) =====================
st.subheader("OBJECTIVE 3: Looking for patterns in your team’s weekly roster?")

if not df.empty and col_operator in df.columns and col_shift and col_shift in df.columns:
    try:
        df['date'] = pd.to_datetime(df['date'])

        # Total events per day
        daily_totals = df.groupby('date').size().reset_index(name='total_count')

        # Dominant (most frequent) shift per day
        dominant_shift = df.groupby('date')[col_shift].agg(
            lambda x: x.value_counts().idxmax()).reset_index()
        dominant_shift.rename(columns={col_shift: 'dominant_shift'}, inplace=True)

        daily_analysis = daily_totals.merge(dominant_shift, on='date', how='left')
        daily_analysis['week_start'] = daily_analysis['date'] - pd.to_timedelta(
            daily_analysis['date'].dt.weekday, unit='d')

        summary = []
        weekly_groups = daily_analysis.groupby('week_start')

        for week_start, week_data in weekly_groups:
            week_data_sorted = week_data.sort_values('date').reset_index(drop=True)

            for idx, row in week_data_sorted.iterrows():
                current_date = row['date']
                current_shift = row['dominant_shift']
                current_count = row['total_count']

                # Look up the calendar-adjacent days (by date, not by index,
                # so gaps in the data do not shift the comparison)
                prev_date = current_date - pd.Timedelta(days=1)
                next_date = current_date + pd.Timedelta(days=1)

                prev_row = week_data_sorted[week_data_sorted['date'] == prev_date]
                prev_shift = prev_row['dominant_shift'].iloc[0] if not prev_row.empty else None

                next_row = week_data_sorted[week_data_sorted['date'] == next_date]
                next_shift = next_row['dominant_shift'].iloc[0] if not next_row.empty else None

                # Roster remark based on shift changes within the same week:
                # - Start of Roster: previous day exists and has a different shift
                # - End of Roster: next day exists and has a different shift
                # - Neither: an adjacent day exists with the same shift
                # - Unknown: no adjacent days in the week (or shift is NaN)
                # FIX: collapsed the two redundant trailing branches that both
                # produced "Unknown" (one of them was unreachable).
                if pd.isna(current_shift):
                    remark = "Unknown"
                elif prev_shift is not None and prev_shift != current_shift:
                    remark = "Start of Roster"
                elif next_shift is not None and next_shift != current_shift:
                    remark = "End of Roster"
                elif ((prev_shift is not None and prev_shift == current_shift)
                      or (next_shift is not None and next_shift == current_shift)):
                    remark = "Neither Start nor End of Roster"
                else:
                    remark = "Unknown"

                # Most frequent operator on that day (from the filtered df)
                df_orig_for_date = df[df['date'] == current_date]
                if not df_orig_for_date.empty:
                    peak_nik_counts = df_orig_for_date[col_operator].value_counts()
                    peak_nik = peak_nik_counts.index[0] if not peak_nik_counts.empty else "N/A"
                else:
                    peak_nik = "N/A"

                summary.append({
                    'week_start': week_start,
                    'date': current_date,
                    'day_name': current_date.strftime('%A'),
                    'total_count': current_count,
                    'shift_category': current_shift,
                    'remark': remark,
                    'operator': peak_nik
                })

        summary_df = pd.DataFrame(summary)

        if not summary_df.empty:
            color_map_remark = {
                'Start of Roster': '#ffcccc',                   # light red
                'End of Roster': '#cce5ff',                     # light blue
                'Neither Start nor End of Roster': '#fff2cc',   # light yellow
                'Unknown': '#c0c0c0'                            # light gray
            }

            # ===== Scatter plot, colored by roster remark =====
            fig = px.scatter(
                summary_df,
                x='date',
                y='remark',
                color='remark',
                color_discrete_map=color_map_remark,
                size='total_count',
                hover_data=['shift_category', 'operator', 'total_count'],
                title="Daily Roster Status by Date and Trend",
                category_orders={'remark': ['Start of Roster', 'End of Roster',
                                            'Neither Start nor End of Roster', 'Unknown']}
            )
            fig.update_layout(height=450, xaxis_title="Date", yaxis_title="Roster Status")
            st.plotly_chart(fig, use_container_width=True)

            # ===== Table =====
            table_df = summary_df.rename(columns={
                'week_start': 'Week Start',
                'day_name': 'Day',
                'date': 'Date',
                'total_count': 'Event Count',
                'shift_category': 'Dominant Shift',
                'remark': 'Roster Status',
                'operator': 'Operator'
            })

            def highlight_remark(row):
                colors = {
                    'Start of Roster': 'background-color: #ffcccc',
                    'End of Roster': 'background-color: #cce5ff',
                    'Neither Start nor End of Roster': 'background-color: #fff2cc',
                    'Unknown': 'background-color: #c0c0c0'
                }
                return [colors.get(row['Roster Status'], '') for _ in row]

            st.dataframe(table_df.style.apply(highlight_remark, axis=1),
                         use_container_width=True)
        else:
            st.info("ℹ️ No daily data to analyze.")

    except Exception as e:
        st.error(f"Error in Daily Roster Insight: {e}")
else:
    if col_shift is None:
        st.info("ℹ️ Shift column not found, cannot display Daily Roster Insight.")
    elif col_shift not in df.columns:
        st.info(f"ℹ️ Column '{col_shift}' not found in the filtered data, cannot display Daily Roster Insight.")
    else:
        st.info("ℹ️ Insufficient data (date, operator, or shift column not found) to display daily roster insight.")
st.subheader("OBJECTIVE 4: How is the Fatigue Event Risk Map per Operator?")
# NOTE: duplicate `import math` / `import plotly.express as px` removed —
# both are already imported at the top of the file.

try:
    # 1. PREPROCESS & COPY DF
    df_local = df.copy()
    df_local['date_only'] = df_local['start'].dt.normalize()
    df_local['week_number'] = df_local['date_only'].dt.isocalendar().week
    df_local['week_label'] = "Week " + df_local['week_number'].astype(str)

    # Unit cleanup: keep the part after the first '-'
    df_local['unit_no'] = (
        df_local[col_fleet_no]
        .astype(str)
        .str.split("-", n=1).str[-1].str.strip()
    )

    if 'id' not in df_local.columns:
        st.error("❌ Column 'id' not found!")
        st.stop()

    # FIX: fail with a clear message when no speed column was auto-detected,
    # instead of a raw groupby(None) exception dump.
    if col_speed is None:
        st.warning("⚠️ No speed column detected — cannot build the risk map.")
        st.stop()

    # 2. KEEP ONLY THE LAST 8 WEEKS
    df_local['week_num_int'] = df_local['week_number'].astype(int)
    unique_weeks = sorted(df_local['week_num_int'].unique())
    selected_last8 = unique_weeks[-8:] if len(unique_weeks) >= 8 else unique_weeks
    df_8w = df_local[df_local['week_num_int'].isin(selected_last8)].copy()

    # 3. FREQUENCY PER OPERATOR PER WEEK (distinct event ids)
    weekly_freq = (
        df_8w.groupby([col_operator, 'week_label'])['id']
        .nunique()
        .reset_index(name='weekly_frequency')
    )

    # 4. SUMMARY FREQUENCY, CEILED AVERAGE
    freq_summary = (
        weekly_freq
        .groupby(col_operator)['weekly_frequency']
        .agg(['sum', 'mean', 'count'])
        .reset_index()
        .rename(columns={
            'sum': 'frequency_by_shift',
            'mean': 'avg_frequency',
            'count': 'frequency_by_weeks'
        })
    )
    freq_summary['avg_frequency'] = freq_summary['avg_frequency'].apply(lambda x: math.ceil(x))

    # 5. AVERAGE SPEED PER OPERATOR
    speed_summary = (
        df_8w.groupby(col_operator)[col_speed]
        .mean()
        .reset_index(name='avg_speed')
    )

    # 6. MERGE
    risk_matrix = freq_summary.merge(speed_summary, on=col_operator, how='left')
    risk_matrix = risk_matrix.rename(columns={col_operator: "Operator Name"})

    # 7. QUADRANT ASSIGNMENT (thresholds: freq 2.5, speed 20 km/h)
    def assign_quadrant(row):
        if row['avg_frequency'] >= 2.5 and row['avg_speed'] >= 20:
            return "Quadrant I – Prevent at Source"
        elif row['avg_frequency'] < 2.5 and row['avg_speed'] >= 20:
            return "Quadrant II – Detect & Monitor"
        elif row['avg_frequency'] >= 2.5 and row['avg_speed'] < 20:
            return "Quadrant III – Monitor"
        else:
            return "Quadrant IV – Low Control"

    risk_matrix['quadrant'] = risk_matrix.apply(assign_quadrant, axis=1)

    quadrant_count = risk_matrix['quadrant'].value_counts().reindex([
        "Quadrant I – Prevent at Source",
        "Quadrant II – Detect & Monitor",
        "Quadrant III – Monitor",
        "Quadrant IV – Low Control"
    ], fill_value=0)

    # 8. SCATTER PLOT
    fig = px.scatter(
        risk_matrix,
        x='avg_frequency',
        y='avg_speed',
        hover_name="Operator Name",
        title="Operator Risk Matrix: Frequency vs Speed",
        size=[12] * len(risk_matrix),
        size_max=15
    )

    max_x = risk_matrix['avg_frequency'].max() + 1
    max_y = risk_matrix['avg_speed'].max() + 1

    # 9. QUADRANT SHADING
    fig.add_shape(type="rect", x0=2.5, x1=max_x, y0=20, y1=max_y,
                  fillcolor="rgba(255,0,0,0.25)", line_width=0)     # I
    fig.add_shape(type="rect", x0=0, x1=2.5, y0=20, y1=max_y,
                  fillcolor="rgba(255,150,50,0.25)", line_width=0)  # II
    fig.add_shape(type="rect", x0=2.5, x1=max_x, y0=0, y1=20,
                  fillcolor="rgba(255,200,200,0.25)", line_width=0)  # III
    fig.add_shape(type="rect", x0=0, x1=2.5, y0=0, y1=20,
                  fillcolor="rgba(0,120,255,0.15)", line_width=0)   # IV

    # Threshold lines
    fig.add_vline(x=2.5, line_dash="dash", line_color="black")
    fig.add_hline(y=20, line_dash="dash", line_color="black")

    # 10. COUNT ANNOTATIONS PER QUADRANT
    fig.add_annotation(
        x=2.5 + (max_x - 2.5) / 2, y=20 + (max_y - 20) / 2,
        text=f"{quadrant_count['Quadrant I – Prevent at Source']}",
        showarrow=False, font=dict(size=20, color="red")
    )
    fig.add_annotation(
        x=2.5 / 2, y=20 + (max_y - 20) / 2,
        text=f"{quadrant_count['Quadrant II – Detect & Monitor']}",
        showarrow=False, font=dict(size=20, color="orange")
    )
    fig.add_annotation(
        x=2.5 + (max_x - 2.5) / 2, y=0 + (20 - 0) / 2,
        text=f"{quadrant_count['Quadrant III – Monitor']}",
        showarrow=False, font=dict(size=20, color="darkred")
    )
    fig.add_annotation(
        x=2.5 / 2, y=0 + (20 - 0) / 2,
        text=f"{quadrant_count['Quadrant IV – Low Control']}",
        showarrow=False, font=dict(size=20, color="blue")
    )

    # 11. QUADRANT LABELS
    # FIX: the <br> line breaks inside the annotation texts were lost to
    # sanitization — restored (plotly annotations use <br> for new lines).
    fig.add_annotation(x=4, y=max_y - 2, text="Quadrant I<br>Prevent at Source",
                       showarrow=False, font=dict(size=12))
    fig.add_annotation(x=1.25, y=max_y - 2, text="Quadrant II<br>Detect & Monitor",
                       showarrow=False, font=dict(size=12))
    fig.add_annotation(x=4, y=5, text="Quadrant III<br>Monitor",
                       showarrow=False, font=dict(size=12))
    fig.add_annotation(x=1.25, y=5, text="Quadrant IV<br>Low Control",
                       showarrow=False, font=dict(size=12))

    fig.update_xaxes(dtick=1)
    fig.update_layout(
        xaxis_title="Average Frequency (Ceil)",
        yaxis_title="Average Speed (km/h)",
        height=650
    )
    st.plotly_chart(fig, use_container_width=True)

    # 12. SUMMARY TABLE
    st.subheader("Operator Hazard Summary Table (8 Weeks Observed)")
    table_display = (
        risk_matrix[[
            "Operator Name",
            "frequency_by_shift",
            "avg_frequency",
            "frequency_by_weeks",
            "avg_speed",
            "quadrant"
        ]]
        .rename(columns={
            "frequency_by_shift": "Frequency by Shift",
            "avg_frequency": "Avg Frequency",
            "frequency_by_weeks": "Frequency by Weeks",
            "avg_speed": "Avg Speed",
            "quadrant": "Quadrant"
        })
    )
    st.dataframe(
        table_display.sort_values("Avg Frequency", ascending=False),
        use_container_width=True
    )

except Exception as e:
    st.error(f"⚠️ Error Risk Map Objective 4: {e}")
    st.exception(e)
Please check your data.") st.stop() df_op["year_week"] = df_op["start"].dt.strftime("%Y-W%U") # Fuzzy match fleet names fleet_clean = df_op[col_fleet_type].str.strip().str.upper() df_op["is_ob"] = fleet_clean.str.contains(r"OB HAULLER", na=False) df_op["is_coal"] = fleet_clean.str.contains(r"HAULING COAL", na=False) ob_data = df_op[df_op["is_ob"]] coal_data = df_op[df_op["is_coal"]] def get_top10_with_slope(data): if data.empty: return pd.DataFrame() if col_operator not in data.columns: st.error(f"Operator column '{col_operator}' not found in data subset.") return pd.DataFrame() weekly = data.groupby([col_operator, "year_week"]).size().reset_index(name="weekly_sum") metrics = [] for nik, grp in weekly.groupby(col_operator): if pd.isna(nik): continue grp = grp.sort_values("year_week") counts = grp["weekly_sum"].values weeks = np.arange(len(counts)) weekly_avg = counts.mean() total_events = counts.sum() n_weeks = len(counts) if n_weeks >= 2: x_mean = weeks.mean() y_mean = counts.mean() numerator = np.sum((weeks - x_mean) * (counts - y_mean)) denominator = np.sum((weeks - x_mean) ** 2) slope = numerator / denominator if denominator != 0 else 0.0 else: slope = 0.0 # One Time Event metrics.append({ col_operator: nik, "weekly_avg": weekly_avg, "slope": slope, "total_events": total_events, "n_weeks": n_weeks }) if not metrics: return pd.DataFrame() return pd.DataFrame(metrics).nlargest(10, "weekly_avg") top_ob = get_top10_with_slope(ob_data) top_coal = get_top10_with_slope(coal_data) def get_all_operators_with_slope(data): if data.empty: return pd.DataFrame() if col_operator not in data.columns: return pd.DataFrame() weekly = data.groupby([col_operator, "year_week"]).size().reset_index(name="weekly_sum") metrics = [] for nik, grp in weekly.groupby(col_operator): if pd.isna(nik): continue grp = grp.sort_values("year_week") counts = grp["weekly_sum"].values weeks = np.arange(len(counts)) weekly_avg = counts.mean() total_events = counts.sum() n_weeks = len(counts) if n_weeks 
>= 2: slope = np.cov(weeks, counts)[0, 1] / np.var(weeks) if np.var(weeks) != 0 else 0.0 else: slope = 0.0 metrics.append({ col_operator: nik, "weekly_avg": weekly_avg, "slope": slope, "total_events": total_events, "n_weeks": n_weeks }) return pd.DataFrame(metrics) if metrics else pd.DataFrame() all_ob = get_all_operators_with_slope(ob_data) all_coal = get_all_operators_with_slope(coal_data) # =============================================================== # LEGEND β€” UPDATED: Stable β†’ One Time Event, Gray β†’ Yellow # =============================================================== st.subheader("Legend of Frequency Trends") st.markdown("""
Worsening Trends (Positive Slope):
Very High Worsening (≥1.5)
High Worsening (1.0–1.5)
Moderate Worsening (0.5–1.0)
Slight Worsening (0–0.5)
Note: Positive slope indicates increasing fatigue event frequency over weeks.
Improving Trends (Negative Slope):
Excellent Improvement (≤−1.5)
Great Improvement (−1.5 to −1.0)
Good Improvement (−1.0 to −0.5)
Slight Improvement (−0.5 to 0)
Note: Negative slope reflects a consistent decline in fatigue events.
One-Time Events (Zero Slope):
One Time Event (0)
Note: Slope = 0 by definition when data exists for only one week — trend assessment is not applicable.
""", unsafe_allow_html=True) # βœ… Function definition at correct indentation level def plot_chart(data, title): if data.empty: fig = go.Figure() fig.add_annotation( text="No Data", x=0.5, y=0.5, showarrow=False, font_size=16 ) fig.update_layout( height=350, title=dict(text=title, x=0.5, xanchor='center') # ← Ini wajib ) return fig data_sorted = data.sort_values('weekly_avg', ascending=False) def get_color(slope): if slope == 0: return "#FFD700" # βœ… Yellow for One Time Event elif slope > 0: if slope < 0.5: return "#ffcdd2" elif slope < 1.0: return "#ef9a9a" elif slope < 1.5: return "#e57373" else: return "#d32f2f" else: # slope < 0 if slope > -0.5: return "#c8e6c9" elif slope > -1.0: return "#a5d6a7" elif slope > -1.5: return "#81c784" else: return "#388e3c" colors = [get_color(s) for s in data_sorted["slope"]] bar_trace = go.Bar( x=data_sorted[col_operator].astype(str), y=data_sorted["weekly_avg"], marker=dict(color=colors, line=dict(width=2, color="rgba(0,0,0,0.2)")), text=[f"{v:.1f}" for v in data_sorted["weekly_avg"]], textposition="outside", hovertemplate=( "%{x}
" + "Weekly Avg: %{y:.2f}
" + "Trend Slope: %{customdata[0]:+.3f}
" + "Total Events: %{customdata[1]}
" + "Weeks Active: %{customdata[2]}
" + "" ), customdata=np.stack([data_sorted["slope"], data_sorted["total_events"], data_sorted["n_weeks"]], axis=-1) ) fig = go.Figure(bar_trace) fig.update_layout( title=dict(text=f"{title}", x=0.5, xanchor='center'), # ← Ini wajib height=450, margin=dict(l=50, r=20, t=60, b=120), xaxis_title="Operator Name", yaxis_title="Weekly Avg Events", font=dict(family="Segoe UI", size=12), bargap=0.3, plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", xaxis=dict(tickangle=45) ) return fig # =============================================================== # CHARTS # =============================================================== # =============================================================== # CHARTS # =============================================================== col1, col2 = st.columns(2) with col1: st.plotly_chart( plot_chart(top_ob, "OB HAULER Operator Hazard Profile"), use_container_width=True, # ← Ini penting! config={'displayModeBar': False} ) with col2: st.plotly_chart( plot_chart(top_coal, "COAL HAULING Operator Hazard Profile"), use_container_width=True, # ← Ini penting! 
config={'displayModeBar': False} ) # =============================================================== # AI INSIGHTS β€” DIPERBAIKI: Risk Summary jadi 1 box + 3 list # =============================================================== col_insight1, col_insight2 = st.columns(2) with col_insight1: if not top_ob.empty: st.markdown("### OB HAULER Analysis") ob_worsening = len(top_ob[top_ob['slope'] > 0]) ob_improving = len(top_ob[top_ob['slope'] < 0]) ob_one_time = len(top_ob[top_ob['slope'] == 0]) ob_avg_risk = top_ob['weekly_avg'].mean() ob_max_risk = top_ob['weekly_avg'].max() ob_insights = [] if ob_worsening > ob_improving: ob_insights.append(f"{ob_worsening} out of 10 top risk operators are showing worsening trends.") else: ob_insights.append(f"{ob_improving} out of 10 top risk operators are showing improvement.") if ob_one_time > 0: ob_insights.append(f"{ob_one_time} operators are classified as One Time Event (single-week activity).") else: ob_insights.append("No operators classified as One Time Event.") ob_insights.append(f"Average risk: {ob_avg_risk:.2f} events/week (max: {ob_max_risk:.2f}).") st.markdown(f"""
Hazard Summary
""", unsafe_allow_html=True) else: st.info("No OB HAULER data for analysis.") with col_insight2: if not top_coal.empty: st.markdown("### HAULING COAL Analysis") coal_worsening = len(top_coal[top_coal['slope'] > 0]) coal_improving = len(top_coal[top_coal['slope'] < 0]) coal_one_time = len(top_coal[top_coal['slope'] == 0]) coal_avg_risk = top_coal['weekly_avg'].mean() coal_max_risk = top_coal['weekly_avg'].max() coal_insights = [] if coal_worsening > coal_improving: coal_insights.append(f"{coal_worsening} out of 10 top risk operators are showing worsening trends.") else: coal_insights.append(f"{coal_improving} out of 10 top risk operators are showing improvement.") if coal_one_time > 0: coal_insights.append(f"{coal_one_time} operators are classified as One Time Event (single-week activity).") else: coal_insights.append("No operators classified as One Time Event.") coal_insights.append(f"Average risk: {coal_avg_risk:.2f} events/week (max: {coal_max_risk:.2f}).") st.markdown(f"""
Hazard Summary
""", unsafe_allow_html=True) else: st.info("No HAULING COAL data for analysis.") # =============================================================== # RECOMMENDATIONS β€” DIPERBARUI: 3 list per fleet, sesuai 3 poin Risk Summary # =============================================================== col_rec1, col_rec2 = st.columns(2) with col_rec1: if not top_ob.empty: w = len(top_ob[top_ob['slope'] > 0]) ot = len(top_ob[top_ob['slope'] == 0]) avg = top_ob['weekly_avg'].mean() max_risk = top_ob['weekly_avg'].max() # 3 rekomendasi, paralel dengan 3 poin Risk Summary rec_list = [] # 1. Trend-driven action if w > 5: rec_list.append("Conduct targeted fatigue risk assessments for operators with worsening trends (slope > 0).") elif w > 0 and w <= 5: rec_list.append("Monitor worsening-trend operators weekly and schedule supervisor check-ins.") elif w == 0 and len(top_ob[top_ob['slope'] < 0]) > 0: rec_list.append("Recognize improving operators β€” consider sharing best practices internally.") else: rec_list.append("Maintain current monitoring for stable trend profile.") # 2. One-Time Event follow-up if ot > 0: rec_list.append(f"Re-engage {ot} One Time Event operators to verify data completeness and activity status.") else: rec_list.append("Trend analysis is reliable β€” all operators have multi-week activity.") # 3. Benchmark & sustain if avg > 8: rec_list.append("Initiate immediate review of shift scheduling and rest-break compliance.") elif avg > 5: rec_list.append("Conduct monthly fatigue KPI review using cohort average as baseline.") else: rec_list.append("Sustain current protocols β€” risk level is within acceptable range.") # Ensure exactly 3 items while len(rec_list) < 3: rec_list.append("β€”") st.markdown("### OB HAULER Recommendations") st.markdown(f"""
Action Plan
""", unsafe_allow_html=True) else: st.info("No OB HAULER recommendations.") with col_rec2: if not top_coal.empty: w = len(top_coal[top_coal['slope'] > 0]) ot = len(top_coal[top_coal['slope'] == 0]) avg = top_coal['weekly_avg'].mean() max_risk = top_coal['weekly_avg'].max() rec_list = [] # 1. Trend-driven action if w > 5: rec_list.append("Conduct targeted fatigue risk assessments for operators with worsening trends (slope > 0).") elif w > 0 and w <= 5: rec_list.append("Monitor worsening-trend operators weekly and schedule supervisor check-ins.") elif w == 0 and len(top_coal[top_coal['slope'] < 0]) > 0: rec_list.append("Recognize improving operators β€” consider sharing best practices internally.") else: rec_list.append("Maintain current monitoring for stable trend profile.") # 2. One-Time Event follow-up if ot > 0: rec_list.append(f"Re-engage {ot} One Time Event operators to verify data completeness and activity status.") else: rec_list.append("Trend analysis is reliable β€” all operators have multi-week activity.") # 3. Benchmark & sustain if avg > 8: rec_list.append("Initiate immediate review of shift scheduling and rest-break compliance.") elif avg > 5: rec_list.append("Conduct monthly fatigue KPI review using cohort average as baseline.") else: rec_list.append("Sustain current protocols β€” risk level is within acceptable range.") while len(rec_list) < 3: rec_list.append("β€”") st.markdown("### HAULING COAL Recommendations") st.markdown(f"""
Action Plan
""", unsafe_allow_html=True) else: st.info("No HAULING COAL recommendations.") except Exception as e: st.error(f"Error in Top 10 Operator analysis: {str(e)}") st.exception(e) # # =================== OBJECTIVE 6: Automated Insights & AI Recommendations ===================== # st.subheader("OBJECTIVE 6: Instant Insights & Recommendations") # # Membagi tampilan menjadi dua kolom # col_insights, col_recs = st.columns(2) # # ===================================================================== # # πŸ”Ή KOLOM KIRI β€” INSIGHTS BY ADVANCED ANALYTICS # # ===================================================================== # with col_insights: # st.subheader("Insights by Advanced Analytics") # # ===================== 1. Critical Hour Analysis ===================== # critical_hours = [2, 3, 4, 5] # critical_alerts = df[df['hour'].isin(critical_hours)] # critical_pct = (len(critical_alerts) / len(df)) * 100 if len(df) > 0 else 0 # st.markdown(f"**Critical Hour Risk (3-6 AM)**") # bg_color = ( # "#ffcccc" if critical_pct > 50 else # "#ffebcc" if critical_pct > 25 else # "#ffffcc" if critical_pct > 10 else # "#e6ffe6" # ) # st.markdown( # f'
' # f'Critical Hour Alerts: {len(critical_alerts)} ({critical_pct:.1f}% of total alerts)
', # unsafe_allow_html=True # ) # if critical_pct > 10: # st.warning( # f"High risk: {critical_pct:.1f}% of fatigue alerts occur during critical hours (3-6 AM). " # f"This is a known circadian dip period." # ) # else: # st.info( # f"{critical_pct:.1f}% of alerts occur during critical hours. This is within acceptable range." # ) # # ===================== 2. High-Speed Fatigue Analysis ===================== # if col_speed and col_speed in df.columns: # high_speed_threshold = 20 # high_speed_fatigue = df[df[col_speed] >= high_speed_threshold] # high_speed_pct = (len(high_speed_fatigue) / len(df)) * 100 if len(df) > 0 else 0 # st.markdown(f"**High-Speed Fatigue Risk (Speed > {high_speed_threshold} km/h)**") # st.markdown( # f""" #
{len(high_speed_fatigue)}
#
↑ {high_speed_pct:.1f}% of total alerts
# """, # unsafe_allow_html=True # ) # if high_speed_pct > 20: # st.warning( # f"High risk: {high_speed_pct:.1f}% of fatigue alerts occur at high speeds. " # f"This increases accident severity potential." # ) # else: # st.info( # f"{high_speed_pct:.1f}% of alerts occur at high speeds. This is within acceptable range." # ) # else: # st.info("Speed data not available for High-Speed Fatigue Analysis.") # # ===================== 3. Shift Pattern Analysis ===================== # if col_shift and col_shift in df.columns: # shift_counts = df[col_shift].value_counts() # st.markdown(f"**Shift Pattern Risk**") # for shift_val in shift_counts.index: # shift_pct = (shift_counts[shift_val] / len(df)) * 100 # st.markdown( # f""" #
{shift_counts[shift_val]}
#
↑ {shift_pct:.1f}% of total alerts
# """, # unsafe_allow_html=True # ) # if shift_pct > 50: # st.warning( # f"Shift {shift_val} has disproportionately high alerts ({shift_pct:.1f}%). " # f"Review shift scheduling and workload." # ) # else: # st.info( # f"Shift {shift_val} alert distribution is acceptable ({shift_pct:.1f}%)." # ) # else: # st.info("Shift data not available for Shift Pattern Analysis.") # # ===================== 4. Operator Risk Profiling ===================== # if col_operator and col_operator in df.columns: # operator_alerts = df[col_operator].value_counts() # top_risk_operators = operator_alerts.head(5) # st.markdown("**High-Risk Operator Identification**") # colors = ["#d32f2f", "#e57373", "#ef9a9a", "#ffcdd2", "#ffe1e4"] # for idx, (op_name, count) in enumerate(top_risk_operators.items()): # op_pct = (count / len(df)) * 100 # color = colors[idx] if idx < len(colors) else colors[-1] # st.markdown( # f"**Operator:** {op_name} \n**Alerts:** {count}" # ) # st.markdown( # f"Share: " # f"{op_pct:.1f}% of total alerts", # unsafe_allow_html=True # ) # if op_pct > 5: # st.warning( # f"Operator {op_name} has high fatigue risk ({op_pct:.1f}%). " # f"Consider coaching or rest plan." # ) # else: # st.info( # f"Operator {op_name} fatigue risk is within acceptable range ({op_pct:.1f}%)." # ) # else: # st.info("Operator data not available for Operator Risk Profiling.") # # ===================================================================== # # πŸ”Ή KOLOM KANAN β€” AI RECOMMENDATIONS (PER INSIGHT + PER OPERATOR) # # ===================================================================== # with col_recs: # st.subheader("Recommendations") # ai_recommendations = [] # # 1. 
Critical Hour Insight β†’ AI Rec # if "hour" in df.columns and not df.empty: # peak_hour = df["hour"].value_counts().idxmax() # critical_hours = [2, 3, 4, 5] # if peak_hour in critical_hours: # ai_recommendations.append({ # "action": "Deploy enhanced fatigue monitoring systems during 3-6 AM.", # "data_point": f"Critical Hour Alerts: {len(critical_alerts)} ({critical_pct:.1f}%)", # "reasoning": "High percentage of alerts during circadian low period." # }) # else: # ai_recommendations.append({ # "action": "Monitor fatigue patterns around peak hour (Hour {peak_hour}).", # "data_point": f"Peak Hour: {peak_hour}:00 β€” {df['hour'].value_counts()[peak_hour]} alerts", # "reasoning": "This hour shows highest fatigue occurrence." # }) # # 2. High-Speed Insight β†’ AI Rec # if col_speed and col_speed in df.columns and not df.empty: # high_speed_threshold = 20 # high_speed_fatigue = df[df[col_speed] >= high_speed_threshold] # high_speed_pct = (len(high_speed_fatigue) / len(df)) * 100 if len(df) > 0 else 0 # if high_speed_pct > 20: # ai_recommendations.append({ # "action": "Implement speed-reduction protocols during fatigue-prone hours.", # "data_point": f"High-Speed Alerts: {len(high_speed_fatigue)} ({high_speed_pct:.1f}%)", # "reasoning": "High-speed alerts increase accident severity potential." # }) # else: # ai_recommendations.append({ # "action": "Maintain current speed monitoring β€” risk level is acceptable.", # "data_point": f"High-Speed Alerts: {len(high_speed_fatigue)} ({high_speed_pct:.1f}%)", # "reasoning": "Current high-speed fatigue rate is within acceptable range." # }) # # 3. 
Shift Pattern Insight β†’ AI Rec # if col_shift and col_shift in df.columns and not df.empty: # worst_shift = df[col_shift].value_counts().idxmax() # shift_pct = (df[col_shift].value_counts()[worst_shift] / len(df)) * 100 # if shift_pct > 50: # ai_recommendations.append({ # "action": "Review shift rotation schedules for Shift {worst_shift}.", # "data_point": f"Shift {worst_shift}: {df[col_shift].value_counts()[worst_shift]} alerts ({shift_pct:.1f}%)", # "reasoning": "Disproportionately high fatigue alerts indicate scheduling imbalance." # }) # else: # ai_recommendations.append({ # "action": "Continue monitoring all shifts β€” no dominant risk identified.", # "data_point": f"Shift {worst_shift}: {df[col_shift].value_counts()[worst_shift]} alerts ({shift_pct:.1f}%)", # "reasoning": "Shift distribution is balanced." # }) # # 4. Operator Risk Profiling β†’ AI Rec for EACH of Top 5 Operators # if col_operator and col_operator in df.columns and not df.empty: # top_operators = df[col_operator].value_counts().head(5) # for op_name, count in top_operators.items(): # op_pct = (count / len(df)) * 100 # if op_pct > 5: # ai_recommendations.append({ # "action": f"Coaching or mandatory rest for Operator {op_name}.", # "data_point": f"Operator {op_name}: {count} alerts ({op_pct:.1f}%)", # "reasoning": f"Operator has high fatigue alerts β€” requires individual intervention." # }) # else: # ai_recommendations.append({ # "action": f"Continue general monitoring for Operator {op_name}.", # "data_point": f"Operator {op_name}: {count} alerts ({op_pct:.1f}%)", # "reasoning": f"Risk is within acceptable range β€” no urgent action needed." 
# }) # # Render each recommendation as a card # for rec in ai_recommendations: # # Highlight percentages in red # data_point_colored = rec['data_point'].replace( # f"({rec['data_point'].split('(')[-1]}", # f"({rec['data_point'].split('(')[-1]}" # ).replace(")", ")") # reasoning_colored = rec['reasoning'].replace( # f"({rec['reasoning'].split('(')[-1]}", # f"({rec['reasoning'].split('(')[-1]}" # ).replace(")", ")") # st.markdown( # f""" #
#
# AI Recommendation #
#
# Action: {rec['action']} #
#
# Data Point: {data_point_colored} #
#
# AI Reasoning: {reasoning_colored} #
#
# """, # unsafe_allow_html=True # ) # if not ai_recommendations: # st.info( # "No specific data points available for AI recommendations. " # "Ensure relevant columns are present (hour, shift, operator, duration, speed)." # ) # # ================= FOOTER =========================== # st.markdown("---") # st.markdown( # '', # unsafe_allow_html=True # ) # # =================== OBJECTIVE 6: Automated Insights & AI Recommendations ===================== # st.subheader("OBJECTIVE 6: Instant Insights & Recommendations") # # Membagi tampilan menjadi dua kolom # col_insights, col_recs = st.columns(2) # # ===================================================================== # # πŸ”Ή KOLOM KIRI β€” INSIGHTS BY ADVANCED ANALYTICS # # ===================================================================== # with col_insights: # st.subheader("Insights by Advanced Analytics") # # ===================== 1. Critical Hour Analysis ===================== # critical_hours = [2, 3, 4, 5] # critical_alerts = df[df['hour'].isin(critical_hours)] # critical_pct = (len(critical_alerts) / len(df)) * 100 if len(df) > 0 else 0 # st.markdown(f"**Critical Hour Risk (3-6 AM)**") # bg_color = ( # "#ffcccc" if critical_pct > 50 else # "#ffebcc" if critical_pct > 25 else # "#ffffcc" if critical_pct > 10 else # "#e6ffe6" # ) # st.markdown( # f'
' # f'Critical Hour Alerts: {len(critical_alerts)} ({critical_pct:.1f}% of total alerts)
', # unsafe_allow_html=True # ) # if critical_pct > 10: # st.warning( # f"High risk: {critical_pct:.1f}% of fatigue alerts occur during critical hours (3-6 AM). " # f"This is a known circadian dip period." # ) # else: # st.info( # f"{critical_pct:.1f}% of alerts occur during critical hours. This is within acceptable range." # ) # # ===================== 2. High-Speed Fatigue Analysis ===================== # if col_speed and col_speed in df.columns: # high_speed_threshold = 20 # high_speed_fatigue = df[df[col_speed] >= high_speed_threshold] # high_speed_pct = (len(high_speed_fatigue) / len(df)) * 100 if len(df) > 0 else 0 # st.markdown(f"**High-Speed Fatigue Risk (Speed > {high_speed_threshold} km/h)**") # st.markdown( # f""" #
{len(high_speed_fatigue)}
#
↑ {high_speed_pct:.1f}% of total alerts
# """, # unsafe_allow_html=True # ) # if high_speed_pct > 20: # st.warning( # f"High risk: {high_speed_pct:.1f}% of fatigue alerts occur at high speeds. " # f"This increases accident severity potential." # ) # else: # st.info( # f"{high_speed_pct:.1f}% of alerts occur at high speeds. This is within acceptable range." # ) # else: # st.info("Speed data not available for High-Speed Fatigue Analysis.") # # ===================== 3. Shift Pattern Analysis ===================== # if col_shift and col_shift in df.columns: # shift_counts = df[col_shift].value_counts() # st.markdown(f"**Shift Pattern Risk**") # for shift_val in shift_counts.index: # shift_pct = (shift_counts[shift_val] / len(df)) * 100 # st.markdown( # f""" #
{shift_counts[shift_val]}
#
↑ {shift_pct:.1f}% of total alerts
# """, # unsafe_allow_html=True # ) # if shift_pct > 50: # st.warning( # f"Shift {shift_val} has disproportionately high alerts ({shift_pct:.1f}%). " # f"Review shift scheduling and workload." # ) # else: # st.info( # f"Shift {shift_val} alert distribution is acceptable ({shift_pct:.1f}%)." # ) # else: # st.info("Shift data not available for Shift Pattern Analysis.") # # ===================== 4. Operator Risk Profiling ===================== # if col_operator and col_operator in df.columns: # operator_alerts = df[col_operator].value_counts() # top_risk_operators = operator_alerts.head(5) # st.markdown("**High-Risk Operator Identification**") # colors = ["#d32f2f", "#e57373", "#ef9a9a", "#ffcdd2", "#ffe1e4"] # for idx, (op_name, count) in enumerate(top_risk_operators.items()): # op_pct = (count / len(df)) * 100 # color = colors[idx] if idx < len(colors) else colors[-1] # st.markdown( # f"**Operator:** {op_name} \n**Alerts:** {count}" # ) # st.markdown( # f"Share: " # f"{op_pct:.1f}% of total alerts", # unsafe_allow_html=True # ) # if op_pct > 5: # st.warning( # f"Operator {op_name} has high fatigue risk ({op_pct:.1f}%). " # f"Consider coaching or rest plan." # ) # else: # st.info( # f"Operator {op_name} fatigue risk is within acceptable range ({op_pct:.1f}%)." # ) # else: # st.info("Operator data not available for Operator Risk Profiling.") # # ===================================================================== # # πŸ”Ή KOLOM KANAN β€” AI RECOMMENDATIONS # # ===================================================================== # with col_recs: # st.subheader("Recommendations") # ai_recommendations = [] # # 1. 
Critical Hour Insight β†’ AI Rec # if "hour" in df.columns and not df.empty: # peak_hour = df["hour"].value_counts().idxmax() # critical_hours = [2, 3, 4, 5] # if peak_hour in critical_hours: # ai_recommendations.append({ # "type": "critical_hour", # "action": "Deploy enhanced fatigue monitoring systems during 3-6 AM.", # "data_point": f"Critical Hour Alerts: {len(critical_alerts)} ({critical_pct:.1f}%)", # "reasoning": "High percentage of alerts during circadian low period." # }) # else: # ai_recommendations.append({ # "type": "critical_hour", # "action": "Monitor fatigue patterns around peak hour (Hour {peak_hour}).", # "data_point": f"Peak Hour: {peak_hour}:00 β€” {df['hour'].value_counts()[peak_hour]} alerts", # "reasoning": "This hour shows highest fatigue occurrence." # }) # # 2. High-Speed Insight β†’ AI Rec # if col_speed and col_speed in df.columns and not df.empty: # high_speed_threshold = 20 # high_speed_fatigue = df[df[col_speed] >= high_speed_threshold] # high_speed_pct = (len(high_speed_fatigue) / len(df)) * 100 if len(df) > 0 else 0 # if high_speed_pct > 20: # ai_recommendations.append({ # "type": "high_speed", # "action": "Implement speed-reduction protocols during fatigue-prone hours.", # "data_point": f"High-Speed Alerts: {len(high_speed_fatigue)} ({high_speed_pct:.1f}%)", # "reasoning": "High-speed alerts increase accident severity potential." # }) # else: # ai_recommendations.append({ # "type": "high_speed", # "action": "Maintain current speed monitoring β€” risk level is acceptable.", # "data_point": f"High-Speed Alerts: {len(high_speed_fatigue)} ({high_speed_pct:.1f}%)", # "reasoning": "Current high-speed fatigue rate is within acceptable range." # }) # # 3. 
Shift Pattern Insight β†’ AI Rec # if col_shift and col_shift in df.columns and not df.empty: # worst_shift = df[col_shift].value_counts().idxmax() # shift_pct = (df[col_shift].value_counts()[worst_shift] / len(df)) * 100 # if shift_pct > 50: # ai_recommendations.append({ # "type": "shift_pattern", # "action": "Review shift rotation schedules for Shift {worst_shift}.", # "data_point": f"Shift {worst_shift}: {df[col_shift].value_counts()[worst_shift]} alerts ({shift_pct:.1f}%)", # "reasoning": "Disproportionately high fatigue alerts indicate scheduling imbalance." # }) # else: # ai_recommendations.append({ # "type": "shift_pattern", # "action": "Continue monitoring all shifts β€” no dominant risk identified.", # "data_point": f"Shift {worst_shift}: {df[col_shift].value_counts()[worst_shift]} alerts ({shift_pct:.1f}%)", # "reasoning": "Shift distribution is balanced." # }) # # 4. Operator Risk Profiling β†’ Simple Recommendations (No AI Reasoning, No Box) # if col_operator and col_operator in df.columns and not df.empty: # top_operators = df[col_operator].value_counts().head(5) # for op_name, count in top_operators.items(): # op_pct = (count / len(df)) * 100 # if op_pct > 5: # ai_recommendations.append({ # "type": "operator", # "action": f"Coaching or mandatory rest for Operator {op_name}.", # "data_point": f"Operator {op_name}: {count} alerts ({op_pct:.1f}%)" # }) # else: # ai_recommendations.append({ # "type": "operator", # "action": f"Continue general monitoring for Operator {op_name}.", # "data_point": f"Operator {op_name}: {count} alerts ({op_pct:.1f}%)" # }) # # Render each recommendation based on type # for rec in ai_recommendations: # if rec["type"] == "operator": # # Simple format: Action + Data Point only # data_point_colored = rec['data_point'].replace( # f"({rec['data_point'].split('(')[-1]}", # f"({rec['data_point'].split('(')[-1]}" # ).replace(")", ")") # st.markdown( # f""" #
# Action: {rec['action']}
# Data Point: {data_point_colored} #
# """, # unsafe_allow_html=True # ) # else: # # Standard format with AI Reasoning and box # data_point_colored = rec['data_point'].replace( # f"({rec['data_point'].split('(')[-1]}", # f"({rec['data_point'].split('(')[-1]}" # ).replace(")", ")") # reasoning_colored = rec['reasoning'].replace( # f"({rec['reasoning'].split('(')[-1]}", # f"({rec['reasoning'].split('(')[-1]}" # ).replace(")", ")") # st.markdown( # f""" #
#
# AI Recommendation #
#
# Action: {rec['action']} #
#
# Data Point: {data_point_colored} #
#
# AI Reasoning: {reasoning_colored} #
#
# """, # unsafe_allow_html=True # ) # if not ai_recommendations: # st.info( # "No specific data points available for AI recommendations. " # "Ensure relevant columns are present (hour, shift, operator, duration, speed)." # ) # # ================= FOOTER =========================== # st.markdown("---") # st.markdown( # '', # unsafe_allow_html=True # ) # =================== OBJECTIVE 6: Automated Insights & AI Recommendations ===================== st.subheader("OBJECTIVE 6: Instant Insights & Recommendations") # Membagi tampilan menjadi dua kolom col_insights, col_recs = st.columns(2) # ===================================================================== # πŸ”Ή KOLOM KIRI β€” INSIGHTS BY ADVANCED ANALYTICS (TANPA SEMUA KOTAK BIRU) # ===================================================================== with col_insights: st.subheader("Insights by Advanced Analytics") # ===================== 1. Critical Hour Analysis ===================== critical_hours = [2, 3, 4, 5] critical_alerts = df[df['hour'].isin(critical_hours)] critical_pct = (len(critical_alerts) / len(df)) * 100 if len(df) > 0 else 0 st.markdown(f"**Critical Hour Risk (3-6 AM)**") bg_color = ( "#ffcccc" if critical_pct > 50 else "#ffebcc" if critical_pct > 25 else "#ffffcc" if critical_pct > 10 else "#e6ffe6" ) st.markdown( f'
' f'Critical Hour Alerts: {len(critical_alerts)} ({critical_pct:.1f}% of total alerts)
', unsafe_allow_html=True ) if critical_pct > 10: st.warning( f"High risk: {critical_pct:.1f}% of fatigue alerts occur during critical hours (3-6 AM). " f"This is a known circadian dip period." ) else: st.info( f"{critical_pct:.1f}% of alerts occur during critical hours. This is within acceptable range." ) # ===================== 2. High-Speed Fatigue Analysis ===================== if col_speed and col_speed in df.columns: high_speed_threshold = 20 high_speed_fatigue = df[df[col_speed] >= high_speed_threshold] high_speed_pct = (len(high_speed_fatigue) / len(df)) * 100 if len(df) > 0 else 0 st.markdown(f"**High-Speed Fatigue Risk (Speed > {high_speed_threshold} km/h)**") st.markdown( f"""
{len(high_speed_fatigue)}
↑ {high_speed_pct:.1f}% of total alerts
""", unsafe_allow_html=True ) if high_speed_pct > 20: st.warning( f"High risk: {high_speed_pct:.1f}% of fatigue alerts occur at high speeds. " f"This increases accident severity potential." ) else: st.info( f"{high_speed_pct:.1f}% of alerts occur at high speeds. This is within acceptable range." ) else: st.info("Speed data not available for High-Speed Fatigue Analysis.") # ===================== 3. Shift Pattern Analysis ===================== if col_shift and col_shift in df.columns: shift_counts = df[col_shift].value_counts() st.markdown(f"**Shift Pattern Risk**") for shift_val in shift_counts.index: shift_pct = (shift_counts[shift_val] / len(df)) * 100 st.markdown( f"""
{shift_counts[shift_val]}
↑ {shift_pct:.1f}% of total alerts
""", unsafe_allow_html=True ) if shift_pct > 50: st.warning( f"Shift {shift_val} has disproportionately high alerts ({shift_pct:.1f}%). " f"Review shift scheduling and workload." ) else: st.info( f"Shift {shift_val} alert distribution is acceptable ({shift_pct:.1f}%)." ) else: st.info("Shift data not available for Shift Pattern Analysis.") # ===================== 4. Operator Risk Profiling ===================== if col_operator and col_operator in df.columns: operator_alerts = df[col_operator].value_counts() top_risk_operators = operator_alerts.head(5) st.markdown("**High-Risk Operator Identification**") colors = ["#d32f2f", "#e57373", "#ef9a9a", "#ffcdd2", "#ffe1e4"] for idx, (op_name, count) in enumerate(top_risk_operators.items()): op_pct = (count / len(df)) * 100 color = colors[idx] if idx < len(colors) else colors[-1] st.markdown( f"**Operator:** {op_name} \n**Alerts:** {count}" ) st.markdown( f"Share: " f"{op_pct:.1f}% of total alerts", unsafe_allow_html=True ) if op_pct > 5: st.warning( f"Operator {op_name} has high fatigue risk ({op_pct:.1f}%). " f"Consider coaching or rest plan." ) else: # ❌ HILANGKAN TEKS "is within acceptable range" DAN KOTAK BIRU # Hanya tampilkan nama operator + alert count β€” tanpa tambahan teks st.markdown( f"Operator {op_name}: {count} alerts ({op_pct:.1f}%)", unsafe_allow_html=True ) else: st.info("Operator data not available for Operator Risk Profiling.") # ===================================================================== # πŸ”Ή KOLOM KANAN β€” AI RECOMMENDATIONS (TIDAK BERUBAH) # ===================================================================== with col_recs: st.subheader("Recommendations") ai_recommendations = [] # 1. 
Critical Hour Insight β†’ AI Rec if "hour" in df.columns and not df.empty: peak_hour = df["hour"].value_counts().idxmax() critical_hours = [2, 3, 4, 5] if peak_hour in critical_hours: ai_recommendations.append({ "type": "critical_hour", "action": "Deploy enhanced fatigue monitoring systems during 3-6 AM.", "data_point": f"Critical Hour Alerts: {len(critical_alerts)} ({critical_pct:.1f}%)", "reasoning": "High percentage of alerts during circadian low period." }) else: ai_recommendations.append({ "type": "critical_hour", "action": "Monitor fatigue patterns around peak hour (Hour {peak_hour}).", "data_point": f"Peak Hour: {peak_hour}:00 β€” {df['hour'].value_counts()[peak_hour]} alerts", "reasoning": "This hour shows highest fatigue occurrence." }) # 2. High-Speed Insight β†’ AI Rec if col_speed and col_speed in df.columns and not df.empty: high_speed_threshold = 20 high_speed_fatigue = df[df[col_speed] >= high_speed_threshold] high_speed_pct = (len(high_speed_fatigue) / len(df)) * 100 if len(df) > 0 else 0 if high_speed_pct > 20: ai_recommendations.append({ "type": "high_speed", "action": "Implement speed-reduction protocols during fatigue-prone hours.", "data_point": f"High-Speed Alerts: {len(high_speed_fatigue)} ({high_speed_pct:.1f}%)", "reasoning": "High-speed alerts increase accident severity potential." }) else: ai_recommendations.append({ "type": "high_speed", "action": "Maintain current speed monitoring β€” risk level is acceptable.", "data_point": f"High-Speed Alerts: {len(high_speed_fatigue)} ({high_speed_pct:.1f}%)", "reasoning": "Current high-speed fatigue rate is within acceptable range." }) # 3. 
Shift Pattern Insight β†’ AI Rec if col_shift and col_shift in df.columns and not df.empty: worst_shift = df[col_shift].value_counts().idxmax() shift_pct = (df[col_shift].value_counts()[worst_shift] / len(df)) * 100 if shift_pct > 50: ai_recommendations.append({ "type": "shift_pattern", "action": "Review shift rotation schedules for Shift {worst_shift}.", "data_point": f"Shift {worst_shift}: {df[col_shift].value_counts()[worst_shift]} alerts ({shift_pct:.1f}%)", "reasoning": "Disproportionately high fatigue alerts indicate scheduling imbalance." }) else: ai_recommendations.append({ "type": "shift_pattern", "action": "Continue monitoring all shifts β€” no dominant risk identified.", "data_point": f"Shift {worst_shift}: {df[col_shift].value_counts()[worst_shift]} alerts ({shift_pct:.1f}%)", "reasoning": "Shift distribution is balanced." }) # 4. Operator Risk Profiling β†’ Simple Recommendations (No AI Reasoning, No Box) if col_operator and col_operator in df.columns and not df.empty: top_operators = df[col_operator].value_counts().head(5) for op_name, count in top_operators.items(): op_pct = (count / len(df)) * 100 if op_pct > 5: ai_recommendations.append({ "type": "operator", "action": f"Coaching or mandatory rest for Operator {op_name}.", "data_point": f"Operator {op_name}: {count} alerts ({op_pct:.1f}%)" }) else: ai_recommendations.append({ "type": "operator", "action": f"Continue general monitoring for Operator {op_name}.", "data_point": f"Operator {op_name}: {count} alerts ({op_pct:.1f}%)" }) # Render each recommendation based on type for rec in ai_recommendations: if rec["type"] == "operator": # Simple format: Action + Data Point only data_point_colored = rec['data_point'].replace( f"({rec['data_point'].split('(')[-1]}", f"({rec['data_point'].split('(')[-1]}" ).replace(")", ")") st.markdown( f"""
Action: {rec['action']}
Data Point: {data_point_colored}
""", unsafe_allow_html=True ) else: # Standard format with AI Reasoning and box data_point_colored = rec['data_point'].replace( f"({rec['data_point'].split('(')[-1]}", f"({rec['data_point'].split('(')[-1]}" ).replace(")", ")") reasoning_colored = rec['reasoning'].replace( f"({rec['reasoning'].split('(')[-1]}", f"({rec['reasoning'].split('(')[-1]}" ).replace(")", ")") st.markdown( f"""
AI Recommendation
Action: {rec['action']}
Data Point: {data_point_colored}
AI Reasoning: {reasoning_colored}
""", unsafe_allow_html=True ) if not ai_recommendations: st.info( "No specific data points available for AI recommendations. " "Ensure relevant columns are present (hour, shift, operator, duration, speed)." ) # ================= FOOTER =========================== st.markdown("---") st.markdown( '', unsafe_allow_html=True )