import streamlit as st import pandas as pd import plotly.express as px import plotly.graph_objects as go from datetime import datetime, timedelta import requests import json import numpy as np import math import base64 # =================== CONFIG ===================== st.set_page_config( page_title="Fatigue Analyzer - Advanced Fatigue Anfalytics", page_icon="πŸ›‘οΈ", # Safety icon layout="wide", initial_sidebar_state="expanded" ) # =================== LOGO ===================== logo_path = "btech.png" # File logo def get_base64(file_path): with open(file_path, "rb") as f: data = f.read() return base64.b64encode(data).decode() try: logo_base64 = get_base64(logo_path) logo_html = f'' except FileNotFoundError: st.warning(f"Logo file '{logo_path}' not found. Using placeholder text.") logo_html = '
BTECH
' # # =================== GLOBAL CSS ===================== # st.markdown(""" # # """, unsafe_allow_html=True) # # =================== HEADER ===================== # st.markdown(f""" #
#
#

Safety Analysis and AI - Advanced Fatigue Analysis

#

Proactive Safety Intelligence for Mining Operations

#
# #
# """, unsafe_allow_html=True) # =================== GLOBAL CSS ===================== st.markdown(""" """, unsafe_allow_html=True) # =================== HEADER ===================== st.markdown(f"""

Safety Analysis and AI - Advanced Fatigue Analysis

Proactive Safety Intelligence for Mining Operations

""", unsafe_allow_html=True) # # ... (Kode selanjutnya disalin dari bagian bawah file Anda, misalnya LOAD DATA ke bawah) # # =================== LOAD DATA ====================== @st.cache_data def load_data(): try: # ================================== # 1. LOAD CSV & NORMALIZE COLUMNS # ================================== df = pd.read_csv("data.csv") original_columns = df.columns.tolist() # Normalize: lower, strip, underscore df.columns = ( df.columns.astype(str) .str.strip() .str.lower() .str.replace(r"\s+", "_", regex=True) ) # ================================== # 2. AUTO-DETECT COLUMNS (case-insensitive) # ================================== col_operator = next((c for c in df.columns if "operator" in c or "driver" in c), None) col_shift = next((c for c in df.columns if "shift" in c), None) # βœ… FIX: Search for normalized "parent_fleet", NOT original "Parent Fleet" col_fleet_type = next((c for c in df.columns if "parent_fleet" in c), None) col_fleet_no = next((c for c in df.columns if "fleet_number" in c), None) # ================================== # 3. DERIVE COLUMNS # ================================== # Unit Number if col_fleet_no: df["unit_no"] = df[col_fleet_no].astype(str).str.split("-", n=1).str[-1].str.strip() else: df["unit_no"] = "UNKNOWN" # Speed col_speed = None for orig in original_columns: norm = orig.lower().replace(" ", "_") if "(in_km/hour).1" in norm or "speed" in norm: if norm in df.columns: col_speed = norm break if not col_speed: col_speed = next((c for c in df.columns if "speed" in c), None) # Time time_cols = [c for c in df.columns if "gmt" in c and "wita" in c] if len(time_cols) >= 2: df["start"] = pd.to_datetime(df[time_cols[0]], errors="coerce") df["end"] = pd.to_datetime(df[time_cols[1]], errors="coerce") elif len(time_cols) == 1: df["start"] = pd.to_datetime(df[time_cols[0]], errors="coerce") df["end"] = df["start"] + pd.Timedelta(minutes=1) else: df["start"] = pd.NaT df["end"] = pd.NaT # Time features if not df["start"].isna().all(): df["hour"] = df["start"].dt.hour df["date"] = df["start"].dt.date df["day_of_week"] = df["start"].dt.day_name() # df["week"], df["month"], df["year"] β€” optional, not used in filters else: df["hour"] = 0 df["date"] = None # Shift as int if col_shift: df[col_shift] = pd.to_numeric(df[col_shift], errors="coerce").astype("Int64") # βœ… FIX: CREATE site & group_model HERE (not in sidebar!) if col_fleet_type: # Split ONCE on first '-', keep FULL left part (e.g., "Amsterdam - CAT789" β†’ "AMSTERDAM") split = df[col_fleet_type].astype(str).str.split("-", n=1, expand=True) df["site"] = split[0].str.strip().str.upper() df["group_model"] = split[1].str.strip().fillna("UNKNOWN").replace("", "UNKNOWN") else: df["site"] = "UNKNOWN" df["group_model"] = "UNKNOWN" return df, col_operator, col_shift, col_fleet_type, col_speed, col_fleet_no except Exception as e: st.error(f"Error loading data: {e}") return pd.DataFrame(), None, None, None, None, None # ================================== # CALL load_data() # ================================== df, col_operator, col_shift, col_fleet_type, col_speed, col_fleet_no = load_data() df_original_full = df.copy() if df.empty: st.stop() st.success("Data Loaded Successfully") df_full_report = df.copy() # =================== FILTERS (Sidebar) ===================== filter_dict = {} st.sidebar.markdown( """
Filter if Need Specific Conditions
""", unsafe_allow_html=True ) with st.sidebar.form("filters_form"): # ---------------- Date Range ---------------- if 'date' in df.columns and not df['date'].isna().all(): min_date = pd.to_datetime(df['date']).min().date() max_date = pd.to_datetime(df['date']).max().date() date_range = st.date_input("Select Date Range", (min_date, max_date)) filter_dict['date_range'] = date_range else: filter_dict['date_range'] = (None, None) # βœ… FIXED: Use df['site'] & df['group_model'] (already created in load_data) # ---------------- Site Filter ---------------- all_sites = sorted(df['site'].dropna().unique()) selected_site = st.selectbox( "Filter Site", options=[None] + all_sites, format_func=lambda x: "All" if x is None else x ) filter_dict['site'] = selected_site # ---------------- Group Model Filter βœ… NOW WORKING ---------------- all_models = sorted(df['group_model'].dropna().unique()) selected_model = st.selectbox( "Filter Group Model", options=[None] + all_models, format_func=lambda x: "All" if x is None else x ) filter_dict['group_model'] = selected_model # ---------------- Shift ---------------- if col_shift: shifts = sorted(df[col_shift].dropna().unique()) selected_shift = st.selectbox( f"Select {col_shift.replace('_', ' ').title()}", options=[None] + shifts, format_func=lambda x: "All" if x is None else f"Shift {x}" ) filter_dict['shift'] = selected_shift else: filter_dict['shift'] = None # ---------------- Operator ---------------- if col_operator: ops = sorted(df[col_operator].dropna().unique()) selected_op = st.selectbox( f"Select {col_operator.replace('_', ' ').title()}", options=[None] + ops, format_func=lambda x: "All" if x is None else x ) filter_dict['operator'] = selected_op else: filter_dict['operator'] = None # ---------------- Hour ---------------- if 'hour' in df.columns and not df['hour'].isna().all(): hours = sorted(df['hour'].dropna().unique()) hour_range = st.slider("Select Hour Range", int(min(hours)), int(max(hours)), (int(min(hours)), int(max(hours)))) filter_dict['hour_range'] = hour_range else: filter_dict['hour_range'] = (0, 23) # ---------------- Unit No ---------------- if 'unit_no' in df.columns: units = sorted(df['unit_no'].dropna().unique()) selected_unit = st.selectbox("Select Unit Number", [None] + units, format_func=lambda x: "All" if x is None else x) filter_dict['unit_no'] = selected_unit else: filter_dict['unit_no'] = None # ---------------- Submit ---------------- apply_filters = st.form_submit_button("Apply Filters") # =================== APPLY FILTERS ===================== if apply_filters: # Filter Date Range if filter_dict.get('date_range'): start_date, end_date = filter_dict['date_range'] df = df[(df['date'] >= start_date) & (df['date'] <= end_date)] # Filter Site if filter_dict.get('site') is not None: df = df[df['site'] == filter_dict['site']] # Filter Group Model if filter_dict.get('group_model') is not None: df = df[df['group_model'] == filter_dict['group_model']] # Filter Shift if filter_dict.get('shift') is not None: df = df[df[col_shift] == filter_dict['shift']] # Filter Operator if filter_dict.get('operator') is not None: df = df[df[col_operator] == filter_dict['operator']] # Filter Hour Range if filter_dict.get('hour_range'): hr_start, hr_end = filter_dict['hour_range'] df = df[(df['hour'] >= hr_start) & (df['hour'] <= hr_end)] # Filter Unit No if filter_dict.get('unit_no') is not None: df = df[df[col_fleet_no] == filter_dict['unit_no']] # Sisanya dari kode Anda (Visualisasi, dll.) tetap sama # Objective 1 # ===================== GLOBAL FUNCTION: Hour Category Labels ===================== def hour_range_label_full(hour): if not (0 <= hour < 24): return 'Unknown' if 6 <= hour < 9: return 'Shift 1 Morning Early (6-9)' elif 9 <= hour < 12: return 'Shift 1 Morning Late (9-12)' elif 12 <= hour < 15: return 'Shift 1 Afternoon Early (12-15)' elif 15 <= hour < 18: return 'Shift 1 Afternoon Late (15-18)' elif 18 <= hour < 21: return 'Shift 2 Evening Early (18-21)' elif 21 <= hour < 24: return 'Shift 2 Evening Late (21-24)' elif 0 <= hour < 3: return 'Shift 2 Dawn Early (0-3)' elif 3 <= hour < 6: return 'Shift 2 Dawn Late (3-6)' return 'Unknown' # ===================== MAIN VISUALIZATION ===================== st.subheader("OBJECTIVE 1: Want to see fatigue patterns across different shifts?") if 'start' in df.columns and not df.empty: try: # --- Data Preparation --- df_local = df.copy() if not pd.api.types.is_datetime64_any_dtype(df_local['start']): df_local['start'] = pd.to_datetime(df_local['start'], errors='coerce') df_local = df_local.dropna(subset=['start']) df_local['hour'] = df_local['start'].dt.hour # --- COLOR MAP: KUNING-ORANGE (Shift 1), BIRU (Shift 2) --- color_map_full = { 'Shift 1 Morning Early (6-9)': '#FFEB3B', # Yellow 300 'Shift 1 Morning Late (9-12)': '#FFC107', # Amber 300 'Shift 1 Afternoon Early (12-15)': '#FF9800', # Orange 300 'Shift 1 Afternoon Late (15-18)': '#F57C00', # Deep Orange 300 'Shift 2 Evening Early (18-21)': '#42A5F5', # Light Blue 300 'Shift 2 Evening Late (21-24)': '#1976D2', # Blue 300 'Shift 2 Dawn Early (0-3)': '#0288D1', # Cyan 300 'Shift 2 Dawn Late (3-6)': '#01579B', # Blue 800 } # --- Define intervals in analog-clock order (12β†’3β†’6β†’9) --- intervals_shift1 = [(12, 15), (15, 18), (6, 9), (9, 12)] labels_shift1 = [ 'Shift 1 Afternoon Early (12-15)', 'Shift 1 Afternoon Late (15-18)', 'Shift 1 Morning Early (6-9)', 'Shift 1 Morning Late (9-12)', ] intervals_shift2 = [(0, 3), (3, 6), (18, 21), (21, 24)] labels_shift2 = [ 'Shift 2 Dawn Early (0-3)', 'Shift 2 Dawn Late (3-6)', 'Shift 2 Evening Early (18-21)', 'Shift 2 Evening Late (21-24)', ] # --- Compute frequencies --- def compute_counts(intervals): counts = [] for start_h, end_h in intervals: cnt = df_local[(df_local['hour'] >= start_h) & (df_local['hour'] < end_h)].shape[0] counts.append(cnt) return counts freq_shift1 = compute_counts(intervals_shift1) freq_shift2 = compute_counts(intervals_shift2) # --- Polar geometry --- theta_midpoints = [45, 135, 225, 315] # centers of 90Β° segments bar_width = [90] * 4 angular_tick_vals = [0, 90, 180, 270] # fixed angle positions # βœ… CUSTOM TICK LABELS PER SHIFT (sesuai permintaan Anda) angular_tick_text_shift1 = ["12", "15", "6/18", "9"] # 0Β°, 90Β°, 180Β°, 270Β° angular_tick_text_shift2 = ["24", "3", "18/6", "21"] # 0Β°=24, 90Β°=3, 180Β°=6, 270Β°=21 # --- Independent radial scales --- max_r1 = max(freq_shift1) if freq_shift1 and max(freq_shift1) > 0 else 1 max_r2 = max(freq_shift2) if freq_shift2 and max(freq_shift2) > 0 else 1 # ============== FIGURE: SHIFT 1 (KUNING-ORANGE) ============== fig1 = go.Figure() fig1.add_trace(go.Barpolar( r=freq_shift1, theta=theta_midpoints, width=bar_width, marker_color=[color_map_full.get(lbl, '#FFEB3B') for lbl in labels_shift1], marker_line_color="black", marker_line_width=1.5, opacity=0.93, hovertemplate="%{text}
Fatigue Incidents: %{r}", text=labels_shift1, )) fig1.update_layout( title=dict(text="Shift 1 (06:00–18:00)", font=dict(size=18, color="#FF9800", family="Segoe UI")), polar=dict( bgcolor="rgba(255,248,225,0.7)", angularaxis=dict( rotation=90, # 12 at top direction="clockwise", tickmode='array', tickvals=angular_tick_vals, ticktext=angular_tick_text_shift1, # βœ… 12, 15, 6, 9 tickfont=dict(size=14, color="#5D4037", weight="bold"), showline=True, linewidth=1.2, linecolor="#FFD54F", ), radialaxis=dict( visible=True, showticklabels=True, tickfont=dict(size=11), angle=45, gridcolor="#FFE082", gridwidth=0.8, range=[0, max_r1 * 1.15], ) ), showlegend=False, height=550, width=550, margin=dict(t=65, b=40, l=40, r=40), font=dict(family="Segoe UI, -apple-system, sans-serif"), ) # ============== FIGURE: SHIFT 2 (BIRU) ============== fig2 = go.Figure() fig2.add_trace(go.Barpolar( r=freq_shift2, theta=theta_midpoints, width=bar_width, marker_color=[color_map_full.get(lbl, '#42A5F5') for lbl in labels_shift2], marker_line_color="black", marker_line_width=1.5, opacity=0.93, hovertemplate="%{text}
Fatigue Incidents: %{r}", text=labels_shift2, )) fig2.update_layout( title=dict(text="Shift 2 (18:00–06:00)", font=dict(size=18, color="#1976D2", family="Segoe UI")), polar=dict( bgcolor="rgba(230,245,255,0.7)", angularaxis=dict( rotation=90, direction="clockwise", tickmode='array', tickvals=angular_tick_vals, ticktext=angular_tick_text_shift2, # βœ… 24, 3, 6, 21 tickfont=dict(size=14, color="#0D47A1", weight="bold"), showline=True, linewidth=1.2, linecolor="#64B5F6", ), radialaxis=dict( visible=True, showticklabels=True, tickfont=dict(size=11), angle=45, gridcolor="#BBDEFB", gridwidth=0.8, range=[0, max_r2 * 1.15], # βœ… SKALA INDEPENDEN ) ), showlegend=False, height=550, width=550, margin=dict(t=65, b=40, l=40, r=40), font=dict(family="Segoe UI, -apple-system, sans-serif"), ) # ============== EXPLANATION β€” URUTAN KRONOLOGIS, REMARK TETAP ============== st.markdown("""

! ⚠️ Clockwise Time Mapping (Analog Layout)

Time Block Shift 1 (Day) Shift 2 (Night)
1st Block 06 β†’ 09 18 β†’ 21 (Shift Start)
2nd Block 09 β†’ 12 21 β†’ 24 (Alertness Decline)
3rd Block 12 β†’ 15 24 β†’ 03 (Circadian Nadir)
4th Block 15 β†’ 18 03 β†’ 06

Scale is independent per shift β€” bar length shows relative risk within the shift.

""", unsafe_allow_html=True) # ============== RENDER CHARTS HORIZONTALLY (NO OVERLAP) ============== col1, col2 = st.columns(2) with col1: st.plotly_chart(fig1, use_container_width=True, config={'displayModeBar': False}) with col2: st.plotly_chart(fig2, use_container_width=True, config={'displayModeBar': False}) # ============== FOOTNOTE (SEMINAR-READY) ============== st.caption( " *Safety Insight*: Highest fatigue risk occurs during **24β†’06** (Shift 2) β€” aligns with circadian trough (Czeisler, 1999). " ) except Exception as e: st.error(f"⚠️ Rendering error: {e}") st.code(f"{type(e).__name__}: {str(e)}", language="python") else: st.info("⏳ Awaiting data... Ensure column `'start'` contains valid timestamps (e.g., '2025-06-15 14:30:00').") #Objective # #Objective 1 st.subheader("OBJECTIVE 2: How does operator energy fluctuate from start to finish of each shift?") if 'start' in df.columns and not df.empty: try: df_local = df.copy() df_local['hour'] = df_local['start'].dt.hour df_local['date'] = df_local['start'].dt.normalize() # Kategorisasi jam menggunakan fungsi global df_local['hour_category'] = df_local['hour'].apply(hour_range_label_full) color_map = { 'Shift 1 Morning Early (6-9)': '#FFEB3B', 'Shift 1 Morning Late (9-12)': '#FFC107', 'Shift 1 Afternoon Early (12-15)':'#FF9800', 'Shift 1 Afternoon Late (15-18)': '#F57C00', 'Shift 2 Evening Early (18-21)': '#42A5F5', 'Shift 2 Evening Late (21-24)': '#1976D2', 'Shift 2 Dawn Early (0-3)': '#0288D1', 'Shift 2 Dawn Late (3-6)': '#01579B', 'Unknown': '#E0E0E0' } # Hitung jumlah fatigue per hari dan kategori jam daily_by_cat = df_local.groupby(['date', 'hour_category']).size().reset_index(name='fatigue_count') # --- TAMBAHAN: Ambil dominant hour_category per hari untuk Objective 3 --- # Kita gunakan data df_local yang sudah memiliki hour_category daily_dominant_cat = df_local.groupby('date')['hour_category'].agg( lambda x: x.value_counts().idxmax() ).reset_index() daily_dominant_cat.rename(columns={'hour_category': 'dominant_hour_category'}, inplace=True) # --- END TAMBAHAN --- all_dates = pd.date_range(start=daily_by_cat['date'].min(), end=daily_by_cat['date'].max(), freq='D') all_cats = list(color_map.keys()) full_index = pd.MultiIndex.from_product([all_dates, all_cats], names=['date', 'hour_category']) daily_by_cat = daily_by_cat.set_index(['date', 'hour_category']).reindex(full_index, fill_value=0).reset_index() daily_by_cat['day_of_week_num'] = daily_by_cat['date'].dt.dayofweek daily_by_cat['week_start'] = daily_by_cat['date'] - pd.to_timedelta(daily_by_cat['day_of_week_num'], unit='d') daily_by_cat['week_label'] = daily_by_cat['week_start'].dt.strftime('Week %U') fig = px.bar( daily_by_cat, x='date', y='fatigue_count', color='hour_category', title="Daily Fatigue Alerts by Detailed Hour Category", color_discrete_map=color_map, labels={'fatigue_count': 'Fatigue Alerts', 'date': 'Date'}, hover_data={'fatigue_count': True, 'week_label': True} ) fig.update_layout( barmode='stack', xaxis_title="Date", yaxis_title="Fatigue Alerts", height=400, legend_title="Hour Category" ) unique_weeks = daily_by_cat['week_start'].unique() shapes = [] week_labels = [] bg_colors = ['#f0e6ff', '#e6f0ff', '#e6fff0', '#fff0e6', '#ffe6e6', '#f0ffe6', '#e6e6ff'] for i, week in enumerate(sorted(unique_weeks)): week_days = daily_by_cat[daily_by_cat['week_start'] == week]['date'] if len(week_days) > 0: start_date = week_days.min() end_date = week_days.max() shapes.append(dict( type="rect", xref="x", yref="paper", x0=start_date, x1=end_date, y0=0, y1=1, fillcolor=bg_colors[i % len(bg_colors)], opacity=0.2, layer="below", line_width=0, )) week_labels.append( dict( xref='x', yref='paper', x=start_date + (end_date - start_date) / 2, y=1.02, text=f"Week {week.strftime('%U')}", showarrow=False, font=dict(size=10), xanchor='center', yanchor='bottom' ) ) fig.update_layout(shapes=shapes, annotations=week_labels) st.plotly_chart(fig, use_container_width=True) except Exception as e: st.error(f"⚠️ Error in Daily Fatigue by Detailed Hour Category: {e}") else: st.info("ℹ️ Insufficient time data to display this visualization.") # =================== OBJECTIVE 3: Daily Roster Insight per Week (Scatter Plot) ===================== # =================== OBJECTIVE 3: Daily Roster Insight per Week (Scatter Plot) ===================== st.subheader("OBJECTIVE 3: Looking for patterns in your team’s weekly roster?") if not df.empty and col_operator in df.columns and col_shift and col_shift in df.columns: try: df['date'] = pd.to_datetime(df['date']) # Hitung total event per hari daily_totals = df.groupby('date').size().reset_index(name='total_count') # Ambil dominant shift per hari dominant_shift = df.groupby('date')[col_shift].agg(lambda x: x.value_counts().idxmax()).reset_index() dominant_shift.rename(columns={col_shift: 'dominant_shift'}, inplace=True) daily_analysis = daily_totals.merge(dominant_shift, on='date', how='left') daily_analysis['week_start'] = daily_analysis['date'] - pd.to_timedelta(daily_analysis['date'].dt.weekday, unit='d') summary = [] weekly_groups = daily_analysis.groupby('week_start') for week_start, week_data in weekly_groups: # Urutkan data berdasarkan tanggal dalam minggu ini week_data_sorted = week_data.sort_values('date').reset_index(drop=True) for idx, row in week_data_sorted.iterrows(): current_date = row['date'] current_shift = row['dominant_shift'] current_count = row['total_count'] # --- CARI DATA DI HARI SEBELUM DAN SESUDAH (BERDASARKAN TANGGAL, BUKAN INDEKS) --- prev_date = current_date - pd.Timedelta(days=1) next_date = current_date + pd.Timedelta(days=1) # Cari shift di hari sebelumnya prev_row = week_data_sorted[week_data_sorted['date'] == prev_date] prev_shift = prev_row['dominant_shift'].iloc[0] if not prev_row.empty else None # Cari shift di hari berikutnya next_row = week_data_sorted[week_data_sorted['date'] == next_date] next_shift = next_row['dominant_shift'].iloc[0] if not next_row.empty else None # ---- LOGIKA REMARK BERDASARKAN PERUBAHAN SHIFT DALAM MINGGU YANG SAMA---- # Awal Roster: Ada data di hari sebelumnya (prev_date) dalam minggu, dan shift-nya berbeda # Akhir Roster: Ada data di hari berikutnya (next_date) dalam minggu, dan shift-nya berbeda # Bukan Awal/Akhir: Ada data di hari sebelumnya ATAU berikutnya, dan shift-nya sama # Unknown: Tidak ada data di hari sebelumnya (prev_date) DAN tidak ada data di hari berikutnya (next_date) dalam minggu yang sama if pd.isna(current_shift): remark = "Unknown" elif prev_shift is not None and prev_shift != current_shift: remark = "Start of Roster" elif next_shift is not None and next_shift != current_shift: remark = "End of Roster" elif (prev_shift is not None and prev_shift == current_shift) or (next_shift is not None and next_shift == current_shift): remark = "Neither Start nor End of Roster" elif prev_shift is None and next_shift is None: remark = "Unknown" else: remark = "Unknown" # --- Operator dari data df (YANG SUDAH DIFILTER) --- df_orig_for_date = df[df['date']==current_date] # Gunakan df yang difilter if not df_orig_for_date.empty: peak_nik_counts = df_orig_for_date[col_operator].value_counts() peak_nik = peak_nik_counts.index[0] if not peak_nik_counts.empty else "N/A" else: peak_nik = "N/A" summary.append({ 'week_start': week_start, 'date': current_date, 'day_name': current_date.strftime('%A'), 'total_count': current_count, 'shift_category': current_shift, 'remark': remark, 'operator': peak_nik }) summary_df = pd.DataFrame(summary) if not summary_df.empty: # Buat color map untuk remark (sesuai permintaan Anda) color_map_remark = { 'Start of Roster': '#ffcccc', # Merah muda 'End of Roster': '#cce5ff', # Biru muda 'Neither Start nor End of Roster': '#fff2cc', # Kuning muda 'Unknown': '#c0c0c0' # Abu-abu muda } # ===== SCATTER PLOT (WARNA BERDASARKAN remark) ===== fig = px.scatter( summary_df, x='date', y='remark', color='remark', # Warna berdasarkan remark (satu-satunya kolom di sumbu Y) color_discrete_map=color_map_remark, # Gunakan color_map_remark size='total_count', hover_data=['shift_category', 'operator', 'total_count'], title="Daily Roster Status by Date and Trend", category_orders={'remark': ['Start of Roster', 'End of Roster', 'Neither Start nor End of Roster', 'Unknown']} ) fig.update_layout(height=450, xaxis_title="Date", yaxis_title="Roster Status") st.plotly_chart(fig, use_container_width=True) # ===== TABEL ===== table_df = summary_df.rename(columns={ 'week_start':'Week Start', 'day_name':'Day', 'date':'Date', 'total_count':'Event Count', 'shift_category':'Dominant Shift', 'remark':'Roster Status', 'operator':'Operator' }) def highlight_remark(row): colors = { 'Start of Roster':'background-color: #ffcccc', 'End of Roster':'background-color: #cce5ff', 'Neither Start nor End of Roster':'background-color: #fff2cc', 'Unknown':'background-color: #c0c0c0' } return [colors.get(row['Roster Status'], '') for _ in row] st.dataframe(table_df.style.apply(highlight_remark, axis=1), use_container_width=True) else: st.info("ℹ️ No daily data to analyze.") except Exception as e: st.error(f"Error in Daily Roster Insight: {e}") else: if col_shift is None: st.info("ℹ️ Shift column not found, cannot display Daily Roster Insight.") elif col_shift not in df.columns: st.info(f"ℹ️ Column '{col_shift}' not found in the filtered data, cannot display Daily Roster Insight.") else: st.info("ℹ️ Insufficient data (date, operator, or shift column not found) to display daily roster insight.") # import plotly.express as px # from datetime import datetime st.subheader("OBJECTIVE 4: How is the Fatigue Event Risk Map per Operator?") import math import plotly.express as px try: # ============================ # 1. PREPROCESS & COPY DF # ============================ df_local = df.copy() df_local['date_only'] = df_local['start'].dt.normalize() df_local['week_number'] = df_local['date_only'].dt.isocalendar().week df_local['week_label'] = "Week " + df_local['week_number'].astype(str) # Unit cleanup df_local['unit_no'] = ( df_local[col_fleet_no] .astype(str) .str.split("-", n=1).str[-1].str.strip() ) if 'id' not in df_local.columns: st.error("❌ Column 'id' not found!") st.stop() # ============================ # 2. FILTER 8 MINGGU TERAKHIR # ============================ df_local['week_num_int'] = df_local['week_number'].astype(int) unique_weeks = sorted(df_local['week_num_int'].unique()) selected_last8 = unique_weeks[-8:] if len(unique_weeks) >= 8 else unique_weeks df_8w = df_local[df_local['week_num_int'].isin(selected_last8)].copy() # ===================================== # 3. FREQUENCY PER OPERATOR PER MINGGU # ===================================== weekly_freq = ( df_8w.groupby([col_operator, 'week_label'])['id'] .nunique() .reset_index(name='weekly_frequency') ) # ============================================ # 4. SUMMARY FREQUENCY & CEIL AVERAGE FREQ # ============================================ freq_summary = ( weekly_freq .groupby(col_operator)['weekly_frequency'] .agg(['sum', 'mean', 'count']) .reset_index() .rename(columns={ 'sum': 'frequency_by_shift', 'mean': 'avg_frequency', 'count': 'frequency_by_weeks' }) ) freq_summary['avg_frequency'] = freq_summary['avg_frequency'].apply(lambda x: math.ceil(x)) # ================================ # 5. RATA-RATA SPEED PER OPERATOR # ================================ speed_summary = ( df_8w.groupby(col_operator)[col_speed] .mean() .reset_index(name='avg_speed') ) # ===================== # 6. GABUNGKAN DATA # ===================== risk_matrix = freq_summary.merge(speed_summary, on=col_operator, how='left') risk_matrix = risk_matrix.rename(columns={col_operator: "Operator Name"}) # ================================ # 7. Tentukan Quadrant untuk Count # ================================ def assign_quadrant(row): if row['avg_frequency'] >= 2.5 and row['avg_speed'] >= 20: return "Quadrant I – Prevent at Source" elif row['avg_frequency'] < 2.5 and row['avg_speed'] >= 20: return "Quadrant II – Detect & Monitor" elif row['avg_frequency'] >= 2.5 and row['avg_speed'] < 20: return "Quadrant III – Monitor" else: return "Quadrant IV – Low Control" risk_matrix['quadrant'] = risk_matrix.apply(assign_quadrant, axis=1) quadrant_count = risk_matrix['quadrant'].value_counts().reindex([ "Quadrant I – Prevent at Source", "Quadrant II – Detect & Monitor", "Quadrant III – Monitor", "Quadrant IV – Low Control" ], fill_value=0) # ================================ # 8. VISUAL SCATTER PLOT # ================================ fig = px.scatter( risk_matrix, x='avg_frequency', y='avg_speed', hover_name="Operator Name", title="Operator Risk Matrix: Frequency vs Speed", size=[12] * len(risk_matrix), size_max=15 ) max_x = risk_matrix['avg_frequency'].max() + 1 max_y = risk_matrix['avg_speed'].max() + 1 # ================================ # 9. Quadrant Coloring # ================================ fig.add_shape(type="rect", x0=2.5, x1=max_x, y0=20, y1=max_y, fillcolor="rgba(255,0,0,0.25)", line_width=0) # I fig.add_shape(type="rect", x0=0, x1=2.5, y0=20, y1=max_y, fillcolor="rgba(255,150,50,0.25)", line_width=0) # II fig.add_shape(type="rect", x0=2.5, x1=max_x, y0=0, y1=20, fillcolor="rgba(255,200,200,0.25)", line_width=0) # III fig.add_shape(type="rect", x0=0, x1=2.5, y0=0, y1=20, fillcolor="rgba(0,120,255,0.15)", line_width=0) # IV # Garis batas fig.add_vline(x=2.5, line_dash="dash", line_color="black") fig.add_hline(y=20, line_dash="dash", line_color="black") # ================================ # 10. Tampilkan Count di Quadrant # ================================ fig.add_annotation( x=2.5 + (max_x-2.5)/2, y=20 + (max_y-20)/2, text=f"{quadrant_count['Quadrant I – Prevent at Source']}", showarrow=False, font=dict(size=20, color="red") ) fig.add_annotation( x=2.5/2, y=20 + (max_y-20)/2, text=f"{quadrant_count['Quadrant II – Detect & Monitor']}", showarrow=False, font=dict(size=20, color="orange") ) fig.add_annotation( x=2.5 + (max_x-2.5)/2, y=0 + (20-0)/2, text=f"{quadrant_count['Quadrant III – Monitor']}", showarrow=False, font=dict(size=20, color="darkred") ) fig.add_annotation( x=2.5/2, y=0 + (20-0)/2, text=f"{quadrant_count['Quadrant IV – Low Control']}", showarrow=False, font=dict(size=20, color="blue") ) # ================================ # 11. Label Quadrant # ================================ fig.add_annotation(x=4, y=max_y-2, text="Quadrant I
Prevent at Source", showarrow=False, font=dict(size=12)) fig.add_annotation(x=1.25, y=max_y-2, text="Quadrant II
Detect & Monitor", showarrow=False, font=dict(size=12)) fig.add_annotation(x=4, y=5, text="Quadrant III
Monitor", showarrow=False, font=dict(size=12)) fig.add_annotation(x=1.25, y=5, text="Quadrant IV
Low Control", showarrow=False, font=dict(size=12)) fig.update_xaxes(dtick=1) fig.update_layout( xaxis_title="Average Frequency (Ceil)", yaxis_title="Average Speed (km/h)", height=650 ) st.plotly_chart(fig, use_container_width=True) # ================================ # 12. DISPLAY TABLE # ================================ st.subheader("Operator Hazard Summary Table (8 Weeks Observed)") table_display = ( risk_matrix[[ "Operator Name", "frequency_by_shift", "avg_frequency", "frequency_by_weeks", "avg_speed", "quadrant" ]] .rename(columns={ "frequency_by_shift": "Frequency by Shift", "avg_frequency": "Avg Frequency", "frequency_by_weeks": "Frequency by Weeks", "avg_speed": "Avg Speed" }) ) st.dataframe( table_display.sort_values("Avg Frequency", ascending=False), use_container_width=True ) except Exception as e: st.error(f"⚠️ Error Risk Map Objective 4: {e}") st.exception(e) # ... (kode sebelumnya tetap sama) ... # ... (kode sebelumnya tetap sama) ... # ... (kode sebelumnya tetap sama) ... # =================== OBJECTIVE 5: Operator Fatigue Risk Gradient Dashboard ===================== # ... (kode sebelumnya tetap sama) ... st.subheader("OBJECTIVE 5:See your team’s Fatigue Hazard Profile!") # Custom CSS β€” tetap seperti sebelumnya (sudah sesuai preferensi) st.markdown(""" """, unsafe_allow_html=True) # =============================================================== # LOGIC UTAMA # =============================================================== if df.empty: st.info("No data available after applying filters.") else: try: required = [col_operator, col_fleet_type, "start"] if not all(c in df.columns for c in required if c is not None): st.warning("Required columns (operator, fleet_type, start) are missing.") st.stop() df_op = df[[col_operator, col_fleet_type, "start"]].dropna() if df_op.empty: st.info("No operator data after filtering.") st.stop() if col_operator is None: st.error("Operator column could not be auto-detected. Please check your data.") st.stop() df_op["year_week"] = df_op["start"].dt.strftime("%Y-W%U") # Fuzzy match fleet names fleet_clean = df_op[col_fleet_type].str.strip().str.upper() df_op["is_ob"] = fleet_clean.str.contains(r"OB HAULLER", na=False) df_op["is_coal"] = fleet_clean.str.contains(r"HAULING COAL", na=False) ob_data = df_op[df_op["is_ob"]] coal_data = df_op[df_op["is_coal"]] def get_top10_with_slope(data): if data.empty: return pd.DataFrame() if col_operator not in data.columns: st.error(f"Operator column '{col_operator}' not found in data subset.") return pd.DataFrame() weekly = data.groupby([col_operator, "year_week"]).size().reset_index(name="weekly_sum") metrics = [] for nik, grp in weekly.groupby(col_operator): if pd.isna(nik): continue grp = grp.sort_values("year_week") counts = grp["weekly_sum"].values weeks = np.arange(len(counts)) weekly_avg = counts.mean() total_events = counts.sum() n_weeks = len(counts) if n_weeks >= 2: x_mean = weeks.mean() y_mean = counts.mean() numerator = np.sum((weeks - x_mean) * (counts - y_mean)) denominator = np.sum((weeks - x_mean) ** 2) slope = numerator / denominator if denominator != 0 else 0.0 else: slope = 0.0 # One Time Event metrics.append({ col_operator: nik, "weekly_avg": weekly_avg, "slope": slope, "total_events": total_events, "n_weeks": n_weeks }) if not metrics: return pd.DataFrame() return pd.DataFrame(metrics).nlargest(10, "weekly_avg") top_ob = get_top10_with_slope(ob_data) top_coal = get_top10_with_slope(coal_data) def get_all_operators_with_slope(data): if data.empty: return pd.DataFrame() if col_operator not in data.columns: return pd.DataFrame() weekly = data.groupby([col_operator, "year_week"]).size().reset_index(name="weekly_sum") metrics = [] for nik, grp in weekly.groupby(col_operator): if pd.isna(nik): continue grp = grp.sort_values("year_week") counts = grp["weekly_sum"].values weeks = np.arange(len(counts)) weekly_avg = counts.mean() total_events = counts.sum() n_weeks = len(counts) if n_weeks >= 2: slope = np.cov(weeks, counts)[0, 1] / np.var(weeks) if np.var(weeks) != 0 else 0.0 else: slope = 0.0 metrics.append({ col_operator: nik, "weekly_avg": weekly_avg, "slope": slope, "total_events": total_events, "n_weeks": n_weeks }) return pd.DataFrame(metrics) if metrics else pd.DataFrame() all_ob = get_all_operators_with_slope(ob_data) all_coal = get_all_operators_with_slope(coal_data) # =============================================================== # LEGEND β€” UPDATED: Stable β†’ One Time Event, Gray β†’ Yellow # =============================================================== st.subheader("Hazard Gradient Legend") st.markdown("""
Worsening Trends (Positive Slope):
Very High Worsening (β‰₯1.5)
High Worsening (1.0–1.5)
Moderate Worsening (0.5–1.0)
Slight Worsening (0–0.5)
Improving Trends (Negative Slope):
Excellent Improvement (β‰€βˆ’1.5)
Great Improvement (βˆ’1.5 to βˆ’1.0)
Good Improvement (βˆ’1.0 to βˆ’0.5)
Slight Improvement (βˆ’0.5 to 0)
One-Time Events (Zero Slope):
One Time Event (0)

Note: Applies when an operator has data in only one week β€” slope is set to 0 by definition.
""", unsafe_allow_html=True) # =============================================================== # PLOT FUNCTION β€” UPDATED: color for slope=0 is now #FFD700 # =============================================================== def plot_chart(data, title): if data.empty: fig = go.Figure() fig.add_annotation( text="No Data", x=0.5, y=0.5, showarrow=False, font_size=16 ) fig.update_layout(height=350, title=dict(text=title, x=0.5)) return fig data_sorted = data.sort_values('weekly_avg', ascending=False) def get_color(slope): if slope == 0: return "#FFD700" # βœ… Kuning untuk One Time Event elif slope > 0: if slope < 0.5: return "#ffcdd2" elif slope < 1.0: return "#ef9a9a" elif slope < 1.5: return "#e57373" else: return "#d32f2f" else: # slope < 0 if slope > -0.5: return "#c8e6c9" elif slope > -1.0: return "#a5d6a7" elif slope > -1.5: return "#81c784" else: return "#388e3c" colors = [get_color(s) for s in data_sorted["slope"]] bar_trace = go.Bar( x=data_sorted[col_operator].astype(str), y=data_sorted["weekly_avg"], marker=dict( color=colors, line=dict(width=2, color="rgba(0,0,0,0.2)") ), text=[f"{v:.1f}" for v in data_sorted["weekly_avg"]], textposition="outside", hovertemplate=( "%{x}
" + "Weekly Avg: %{y:.2f}
" + "Trend Slope: %{customdata[0]:+.3f}
" + "Total Events: %{customdata[1]}
" + "Weeks Active: %{customdata[2]}
" + "" ), customdata=np.stack([data_sorted["slope"], data_sorted["total_events"], data_sorted["n_weeks"]], axis=-1) ) fig = go.Figure(bar_trace) fig.update_layout( title=dict(text=f"{title}", x=0.5), height=450, margin=dict(l=50, r=20, t=60, b=120), xaxis_title="Operator Name", yaxis_title="Weekly Avg Events", font=dict(family="Segoe UI", size=12), bargap=0.3, plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", xaxis=dict(tickangle=45) ) return fig # =============================================================== # CHARTS # =============================================================== col1, col2 = st.columns(2) with col1: st.plotly_chart(plot_chart(top_ob, "OB HAULER Operators (Hazard Gradient)"), use_container_width=True) with col2: st.plotly_chart(plot_chart(top_coal, "HAULING COAL Operators (Hazard Gradient)"), use_container_width=True) # =============================================================== # AI INSIGHTS β€” tetap dalam bahasa Inggris, tanpa emoticon # =============================================================== col_insight1, col_insight2 = st.columns(2) with col_insight1: if not top_ob.empty: st.markdown("### OB HAULER Analysis") ob_worsening = len(top_ob[top_ob['slope'] > 0]) ob_improving = len(top_ob[top_ob['slope'] < 0]) ob_one_time = len(top_ob[top_ob['slope'] == 0]) ob_avg_risk = top_ob['weekly_avg'].mean() ob_max_risk = top_ob['weekly_avg'].max() ob_insights = [] if ob_worsening > ob_improving: ob_insights.append(f"{ob_worsening} out of 10 top risk operators are showing worsening trends.") else: ob_insights.append(f"{ob_improving} out of 10 top risk operators are showing improvement.") if ob_one_time > 0: ob_insights.append(f"{ob_one_time} operators are classified as One Time Event (single-week activity).") ob_insights.append(f"Average risk: {ob_avg_risk:.2f} events/week (max: {ob_max_risk:.2f}).") for insight in ob_insights: st.markdown(f"""
Hazard Summary

{insight}

""", unsafe_allow_html=True) else: st.info("No OB HAULER data for analysis.") with col_insight2: if not top_coal.empty: st.markdown("### HAULING COAL Analysis") coal_worsening = len(top_coal[top_coal['slope'] > 0]) coal_improving = len(top_coal[top_coal['slope'] < 0]) coal_one_time = len(top_coal[top_coal['slope'] == 0]) coal_avg_risk = top_coal['weekly_avg'].mean() coal_max_risk = top_coal['weekly_avg'].max() coal_insights = [] if coal_worsening > coal_improving: coal_insights.append(f"{coal_worsening} out of 10 top risk operators are showing worsening trends.") else: coal_insights.append(f"{coal_improving} out of 10 top risk operators are showing improvement.") if coal_one_time > 0: coal_insights.append(f"{coal_one_time} operators are classified as One Time Event (single-week activity).") coal_insights.append(f"Average risk: {coal_avg_risk:.2f} events/week (max: {coal_max_risk:.2f}).") for insight in coal_insights: st.markdown(f"""
Hazard Summary

{insight}

""", unsafe_allow_html=True) else: st.info("No HAULING COAL data for analysis.") # =============================================================== # RECOMMENDATIONS # =============================================================== col_rec1, col_rec2 = st.columns(2) def generate_recommendations(top_ob, top_coal): rec = {} if not top_ob.empty: w = len(top_ob[top_ob['slope'] > 0]) ot = len(top_ob[top_ob['slope'] == 0]) avg = top_ob['weekly_avg'].mean() if w > 5: r = "Prioritize fatigue intervention for operators with worsening trends." reason = "High proportion of deteriorating operators signals emerging fatigue risks." elif ot > 4: r = "Validate data completeness β€” high One Time Event count may indicate reporting gaps." reason = "Operators with single-week data cannot yield reliable trend analysis." elif avg > 8: r = "Review scheduling and rest protocols to reduce event frequency." reason = "Elevated average event rate increases cumulative fatigue exposure." else: r = "Maintain current protocols with targeted monitoring." reason = "Risk profile is stable; focus on sustaining safe practices." rec['ob'] = r rec['ob_reason'] = reason if not top_coal.empty: w = len(top_coal[top_coal['slope'] > 0]) ot = len(top_coal[top_coal['slope'] == 0]) avg = top_coal['weekly_avg'].mean() if w > 5: r = "Prioritize fatigue intervention for operators with worsening trends." reason = "High proportion of deteriorating operators signals emerging fatigue risks." elif ot > 4: r = "Validate data completeness β€” high One Time Event count may indicate reporting gaps." reason = "Operators with single-week data cannot yield reliable trend analysis." elif avg > 8: r = "Review scheduling and rest protocols to reduce event frequency." reason = "Elevated average event rate increases cumulative fatigue exposure." else: r = "Maintain current protocols with targeted monitoring." reason = "Risk profile is stable; focus on sustaining safe practices." rec['coal'] = r rec['coal_reason'] = reason return rec ai_rec = generate_recommendations(top_ob, top_coal) with col_rec1: if 'ob' in ai_rec: st.markdown("### OB HAULER Recommendations") st.markdown(f"""
Action Plan
{ai_rec['ob']}
AI Reasoning: {ai_rec['ob_reason']}
""", unsafe_allow_html=True) else: st.info("No OB HAULER recommendations.") with col_rec2: if 'coal' in ai_rec: st.markdown("### HAULING COAL Recommendations") st.markdown(f"""
Action Plan
{ai_rec['coal']}
AI Reasoning: {ai_rec['coal_reason']}
""", unsafe_allow_html=True) else: st.info("No HAULING COAL recommendations.") except Exception as e: st.error(f"Error in Top 10 Operator analysis: {str(e)}") st.exception(e) # optionally show full traceback during dev import re # Tambahkan ini jika belum ada # =================== OBJECTIVE 6: Automated Insights & AI Recommendations ===================== st.subheader("OBJECTIVE 6: Instant Insights & Recommendations") # Membagi tampilan menjadi dua kolom col_insights, col_recs = st.columns(2) # Kolom kiri: Insights by Advanced Analytics with col_insights: st.subheader("Insights by Advanced Analytics") # 1. Critical Hour Analysis (2-5 AM) critical_hours = [2, 3, 4, 5] critical_alerts = df[df['hour'].isin(critical_hours)] critical_pct = (len(critical_alerts) / len(df)) * 100 if len(df) > 0 else 0 st.markdown(f"**Critical Hour Risk (3-6 AM)**") bg_color = "#ffcccc" if critical_pct > 50 else "#ffebcc" if critical_pct > 25 else "#ffffcc" if critical_pct > 10 else "#e6ffe6" # Tampilkan jumlah dan persentase dalam satu div st.markdown( f"""
{len(critical_alerts)}
↑ {critical_pct:.1f}% of total alerts
""", unsafe_allow_html=True ) # Gunakan st.markdown untuk warning/info dengan persentase berwarna merah tebal if critical_pct > 10: st.markdown( f"⚠️ High risk: {critical_pct:.1f}% of fatigue alerts occur during critical hours (3-6 AM). This is a known circadian dip period.", unsafe_allow_html=True ) else: st.markdown( f"βœ… {critical_pct:.1f}% of alerts occur during critical hours. This is within acceptable range.", unsafe_allow_html=True ) # 2. High-Speed Fatigue Analysis (Environmental Risk) if col_speed and col_speed in df.columns: high_speed_threshold = 20 high_speed_fatigue = df[df[col_speed] >= high_speed_threshold] if high_speed_threshold > 0 else pd.DataFrame() high_speed_pct = (len(high_speed_fatigue) / len(df)) * 100 if len(df) > 0 else 0 st.markdown(f"**High-Speed Fatigue Risk (Speed > {high_speed_threshold:.0f} km/h)**") # Tampilkan jumlah dan persentase dalam satu div st.markdown( f"""
{len(high_speed_fatigue)}
↑ {high_speed_pct:.1f}% of total alerts
""", unsafe_allow_html=True ) # Gunakan st.markdown untuk warning/info dengan persentase berwarna merah tebal if high_speed_pct > 20: st.markdown( f"⚠️ High risk: {high_speed_pct:.1f}% of fatigue alerts occur at high speeds. This increases accident severity potential.", unsafe_allow_html=True ) else: st.markdown( f"βœ… {high_speed_pct:.1f}% of alerts occur at high speeds. This is within acceptable range.", unsafe_allow_html=True ) else: st.info("Speed data not available for High-Speed Fatigue Analysis.") # 3. Shift Pattern Analysis if col_shift and col_shift in df.columns: shift_counts = df[col_shift].value_counts() st.markdown(f"**Shift Pattern Risk**") for shift_val in shift_counts.index: shift_pct = (shift_counts[shift_val] / len(df)) * 100 # Tampilkan jumlah dan persentase dalam satu div dengan warna latar belakang seragam st.markdown( f"""
Shift {shift_val}
{shift_counts[shift_val]}
↑ {shift_pct:.1f}% of total alerts
""", unsafe_allow_html=True ) # Gunakan st.markdown untuk warning/info dengan persentase berwarna merah tebal if shift_pct > 50: st.markdown( f"⚠️ Shift {shift_val} has disproportionately high alerts ({shift_pct:.1f}%). Review shift scheduling and workload.", unsafe_allow_html=True ) else: st.markdown( f"βœ… Shift {shift_val} alert distribution is acceptable ({shift_pct:.1f}%).", unsafe_allow_html=True ) else: st.info("Shift data not available for Shift Pattern Analysis.") # 4. Operator Risk Profiling if col_operator and col_operator in df.columns: operator_alerts = df[col_operator].value_counts() top_risk_operators = operator_alerts.head(5) # Top 5 operators by alerts st.markdown("**High-Risk Operator Identification**") # Warna berdasarkan ranking 1–5 colors = ["#d32f2f", "#e57373", "#ef9a9a", "#ffcdd2", "#ffe1e4"] for idx, (op_name, count) in enumerate(top_risk_operators.items()): op_pct = (count / len(df)) * 100 color = colors[idx] if idx < len(colors) else colors[-1] # Teks normal untuk nama dan jumlah alert st.markdown(f"**Operator:** {op_name} \n**Alerts:** {count}") # Hanya 'Share' yang berwarna sesuai ranking st.markdown( f"Share: " f"{op_pct:.1f}% of total alerts", unsafe_allow_html=True ) # Risk message (tetap gunakan component Streamlit agar konsisten) if op_pct > 5: st.warning(f"Operator {op_name} has high fatigue risk ({op_pct:.1f}%). Consider coaching or rest plan.") else: st.info(f"Operator {op_name} fatigue risk is within acceptable range ({op_pct:.1f}%).") else: st.info("Operator data not available for Operator Risk Profiling.") # Kolom kanan: AI Recommendations with col_recs: st.subheader("Recommendations") ai_recs = [] insights_found = [] # Peak hour if "hour" in df.columns and not df.empty: peak_hour = df["hour"].value_counts().idxmax() critical_hours = [2, 3, 4, 5] if peak_hour in critical_hours: insights_found.append(f" Most fatigue risk occurs at **{peak_hour}:00** β€” during critical circadian low period (3-6 AM). Consider enhanced monitoring.") else: insights_found.append(f"Most fatigue risk occurs at **{peak_hour}:00** β€” likely due to circadian drop.") # Risk shift if col_shift and not df.empty: worst_shift = df[col_shift].value_counts().idxmax() insights_found.append(f" Highest fatigue recorded in **Shift {worst_shift}** β€” review scheduling & workload.") # Worst operator if col_operator and not df.empty: worst_operator = df[col_operator].value_counts().idxmax() insights_found.append(f" Operator at highest risk: **{worst_operator}** β€” suggested coaching or rest plan.") # Duration risk if "duration_sec" in df.columns and not df.empty: avg_duration = df["duration_sec"].mean() if not pd.isna(avg_duration) and avg_duration > 10: insights_found.append(" Long fatigue event duration suggests slow response β€” improve alerting training.") # Generate recommendations based on found insights if insights_found: if any("circadian low" in i.lower() for i in insights_found): ai_recs.append({ "recommendation": "Deploy enhanced fatigue monitoring systems (e.g., EOR) specifically during 3-6 AM shifts.", "data_point": f"Critical Hour Alerts: {len(critical_alerts)} ({critical_pct:.1f}% of total alerts)", "reason": "High percentage of alerts occurring during the known circadian low period (3-6 AM) indicates increased risk during these hours." }) if any("shift" in i.lower() for i in insights_found): ai_recs.append({ "recommendation": "Review shift rotation schedules to minimize consecutive high-risk shifts.", "data_point": f"Shift {worst_shift} Alerts: {df[col_shift].value_counts()[worst_shift]} ({(df[col_shift].value_counts()[worst_shift] / len(df)) * 100:.1f}% of total alerts)", "reason": f"The identified high-risk shift ({worst_shift}) has the highest number of fatigue alerts, suggesting scheduling or workload issues." }) if any("operator" in i.lower() for i in insights_found): ai_recs.append({ "recommendation": "Initiate individual coaching or mandatory rest periods for high-risk operators.", "data_point": f"Operator {worst_operator} Alerts: {df[col_operator].value_counts()[worst_operator]} ({(df[col_operator].value_counts()[worst_operator] / len(df)) * 100:.1f}% of total alerts)", "reason": f"The identified high-risk operator ({worst_operator}) has the highest number of fatigue alerts, indicating a need for targeted intervention." }) if any("duration" in i.lower() for i in insights_found): ai_recs.append({ "recommendation": "Review and improve alert response protocols and training.", "data_point": f"Average Fatigue Event Duration: {avg_duration:.2f} seconds", "reason": "Long average duration suggests potential delays in response time or alert acknowledgment, requiring protocol review." }) if any("high-speed" in i.lower() for i in insights_found): ai_recs.append({ "recommendation": "Implement speed management strategies in conjunction with fatigue monitoring.", "data_point": f"High-Speed Fatigue Events: {len(high_speed_fatigue)} ({high_speed_pct:.1f}% of total alerts)", "reason": "A significant percentage of alerts occur at high speeds, increasing accident severity risk. Speed control is crucial." }) if not ai_recs: ai_recs.append({ "recommendation": "Data quality is sufficient. Focus on implementing recommendations from Objectives 1-5.", "data_point": "General Data Quality Check", "reason": "No specific high-impact insights were automatically identified from the aggregated data in this section." }) for rec in ai_recs: # Ambil data_point dan ganti teks persentase di dalamnya menjadi warna merah data_point_text = rec['data_point'] # Ganti pola persentase (X.X%) dengan span warna merah # Cari pola seperti "1.6%", "21.2%", dll. data_point_colored = re.sub(r'(\d+\.?\d*%)', r'\1', data_point_text) # Ambil reason dan lakukan hal yang sama reason_text = rec['reason'] reason_colored = re.sub(r'(\d+\.?\d*%)', r'\1', reason_text) # Tampilkan rekomendasi dengan teks persentase berwarna merah st.markdown(f"""
AI Recommendation
Action: {rec['recommendation']}
Data Point: {data_point_colored}
AI Reasoning: {reason_colored}
""", unsafe_allow_html=True) else: st.info("No specific data points available for AI recommendations. Ensure relevant columns (hour, shift, operator, duration, speed) are present and populated.") # ================= FOOTER =========================== st.markdown("---") st.markdown('', unsafe_allow_html=True)