import gradio as gr import pandas as pd import matplotlib.pyplot as plt from datetime import datetime, timedelta import random # Simulate sample data for lab devices (mimicking Salesforce custom objects) def generate_sample_data(): labs = ["Lab_A", "Lab_B", "Lab_C"] equipment_types = ["Microscope", "Centrifuge", "UV_Sterilizer"] devices = [] logs = [] # Generate devices for i in range(10): devices.append({ "Device_ID": f"Device_{i+1}", "Lab": random.choice(labs), "Equipment_Type": random.choice(equipment_types), "Status": random.choice(["Operational", "Down", "Maintenance"]) }) # Generate logs for a broad date range (Jan 1, 2025 to Jun 30, 2025) start_date = datetime(2025, 1, 1) end_date = datetime(2025, 6, 30) for device in devices: current_date = start_date while current_date <= end_date: logs.append({ "Device_ID": device["Device_ID"], "Log_Timestamp": current_date.strftime("%Y-%m-%d %H:%M:%S"), "Usage_Count": random.randint(0, 50), "Status": random.choice(["Operational", "Down", "Maintenance"]), "AMC_Expiry": (current_date + timedelta(days=random.randint(10, 90))).strftime("%Y-%m-%d") }) current_date += timedelta(days=1) return pd.DataFrame(devices), pd.DataFrame(logs) # Initialize sample data devices_df, logs_df = generate_sample_data() def process_dashboard_data(lab_filter, equipment_type_filter, start_date, end_date): """ Process device and log data based on filters and return dashboard components. Args: lab_filter (str): Selected lab (e.g., Lab_A or All). equipment_type_filter (str): Selected equipment type (e.g., Microscope or All). start_date (str): Start date in YYYY-MM-DD format. end_date (str): End date in YYYY-MM-DD format. Returns: tuple: Device cards text, daily trend plot, uptime plot, anomaly alerts, report text, report file. """ try: # Validate and parse date inputs if start_date and end_date: try: start_date = start_date.strip() if start_date else "" end_date = end_date.strip() if end_date else "" start_date_dt = datetime.strptime(start_date, "%Y-%m-%d") end_date_dt = datetime.strptime(end_date, "%Y-%m-%d") if start_date_dt > end_date_dt: return "Error: Start date must be before end date.", None, None, None, "", None # Check if dates are within sample data range data_start = datetime(2025, 1, 1) data_end = datetime(2025, 6, 30) if start_date_dt < data_start or end_date_dt > data_end: return f"Error: Dates must be between 2025-01-01 and 2025-06-30. Received: Start={start_date}, End={end_date}", None, None, None, "", None except ValueError: return f"Error: Invalid date format. Use YYYY-MM-DD (e.g., 2025-05-01). Received: Start={start_date}, End={end_date}", None, None, None, "", None else: start_date_dt = datetime(2025, 1, 1) end_date_dt = datetime(2025, 6, 30) print(f"Input dates: Start={start_date}, End={end_date}") # Debug log # Apply filters filtered_devices = devices_df.copy() if lab_filter != "All": filtered_devices = filtered_devices[filtered_devices["Lab"] == lab_filter] if equipment_type_filter != "All": filtered_devices = filtered_devices[filtered_devices["Equipment_Type"] == equipment_type_filter] filtered_logs = logs_df[logs_df["Device_ID"].isin(filtered_devices["Device_ID"])] if start_date_dt and end_date_dt: filtered_logs["Log_Date"] = pd.to_datetime(filtered_logs["Log_Timestamp"]).dt.date start_date_str = start_date_dt.date() end_date_str = end_date_dt.date() filtered_logs = filtered_logs[ (filtered_logs["Log_Date"] >= start_date_str) & (filtered_logs["Log_Date"] <= end_date_str) ] print(f"Filtered logs count: {len(filtered_logs)}") # Debug log # Device Cards device_cards = "Device Cards:\n" if filtered_devices.empty: device_cards += "No devices match the selected filters.\n" for _, device in filtered_devices.iterrows(): device_logs = filtered_logs[filtered_logs["Device_ID"] == device["Device_ID"]] usage_count = device_logs["Usage_Count"].sum() if not device_logs.empty else 0 last_log = device_logs["Log_Timestamp"].max() if not device_logs.empty else "No logs" device_cards += ( f"Device: {device['Device_ID']}, Lab: {device['Lab']}, Type: {device['Equipment_Type']}, " f"Status: {device['Status']}, Usage Count: {usage_count}, Last Log: {last_log}\n" ) # Daily Log Trends (Matplotlib Plot) daily_trend_plot = None if not filtered_logs.empty: filtered_logs["Date"] = pd.to_datetime(filtered_logs["Log_Timestamp"]).dt.date daily_trends = filtered_logs.groupby("Date")["Usage_Count"].sum().reset_index() plt.figure(figsize=(8, 4)) plt.plot(daily_trends["Date"], daily_trends["Usage_Count"], marker="o", color="#1f77b4") plt.title("Daily Log Trends") plt.xlabel("Date") plt.ylabel("Total Usage Count") plt.xticks(rotation=45) plt.tight_layout() daily_trend_plot = plt.gcf() else: daily_trend_plot = "No data available for Daily Log Trends in the selected range." # Weekly Uptime % (Matplotlib Plot) uptime_plot = None if not filtered_logs.empty: filtered_logs["Week"] = pd.to_datetime(filtered_logs["Log_Timestamp"]).dt.isocalendar().week uptime_data = filtered_logs.groupby("Week")["Status"].value_counts().unstack(fill_value=0) uptime_data["Uptime_%"] = uptime_data.get("Operational", 0) / ( uptime_data.get("Operational", 0) + uptime_data.get("Down", 0) + uptime_data.get("Maintenance", 0) ) * 100 plt.figure(figsize=(8, 4)) plt.bar(uptime_data.index, uptime_data["Uptime_%"], color="#ff7f0e") plt.title("Weekly Uptime %") plt.xlabel("Week") plt.ylabel("Uptime %") plt.tight_layout() uptime_plot = plt.gcf() else: uptime_plot = "No data available for Weekly Uptime % in the selected range." # Anomaly Alerts (Usage spikes: >2x average usage) anomaly_alerts = "Anomaly Alerts:\n" if not filtered_logs.empty: avg_usage = filtered_logs["Usage_Count"].mean() anomalies = filtered_logs[filtered_logs["Usage_Count"] > 2 * avg_usage] if anomalies.empty: anomaly_alerts += "No anomalies detected.\n" for _, log in anomalies.iterrows(): anomaly_alerts += ( f"Device: {log['Device_ID']}, Timestamp: {log['Log_Timestamp']}, " f"Usage Spike: {log['Usage_Count']} (Avg: {avg_usage:.2f})\n" ) else: anomaly_alerts += "No data available for anomaly detection.\n" # AMC Reminders (simulated) report = "LabOps Dashboard Report:\n" report += f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" report += device_cards + "\n" report += anomaly_alerts + "\n" report += "AMC Reminders:\n" if not filtered_logs.empty: amc_expiring = filtered_logs[pd.to_datetime(filtered_logs["AMC_Expiry"]) <= (datetime.now() + timedelta(days=14))] if amc_expiring.empty: report += "No AMC expiries within the next 14 days.\n" for _, log in amc_expiring.iterrows(): report += ( f"Device: {log['Device_ID']}, AMC Expiry: {log['AMC_Expiry']}\n" ) else: report += "No data available for AMC reminders.\n" return device_cards, daily_trend_plot, uptime_plot, anomaly_alerts, report, "labops_report.txt" except Exception as e: return f"Error: {str(e)}", None, None, None, "", None # Define Gradio interface with gr.Blocks(title="LabOps Dashboard") as demo: gr.Markdown("# LabOps Dashboard") gr.Markdown("Monitor smart lab devices, view usage trends, uptime, anomalies, and export reports.") gr.Markdown("**Note**: Use the calendar picker to select dates in YYYY-MM-DD format (e.g., 2025-05-01 to 2025-05-30). Dates must be between 2025-01-01 and 2025-06-30. If the calendar picker doesn't appear, enter dates manually.") # Filters gr.Markdown("## Filters") lab_filter = gr.Dropdown(choices=["All"] + list(devices_df["Lab"].unique()), label="Lab Site") equipment_type_filter = gr.Dropdown(choices=["All"] + list(devices_df["Equipment_Type"].unique()), label="Equipment Type") start_date = gr.Textbox( label="Start Date (YYYY-MM-DD)", placeholder="Select or enter date (e.g., 2025-05-01)", elem_id="start_date_picker" ) end_date = gr.Textbox( label="End Date (YYYY-MM-DD)", placeholder="Select or enter date (e.g., 2025-05-30)", elem_id="end_date_picker" ) # Custom JavaScript to enable browser-native date picker with constraints gr.HTML(""" """) # Dashboard Components gr.Markdown("## Device Cards") device_cards_output = gr.Textbox(label="Device Status", lines=10, interactive=False) gr.Markdown("## Daily Log Trends") daily_trend_plot = gr.Plot(label="Daily Usage Trends") gr.Markdown("## Weekly Uptime %") uptime_plot = gr.Plot(label="Weekly Uptime") gr.Markdown("## Anomaly Alerts") anomaly_alerts_output = gr.Textbox(label="Anomaly Alerts", lines=5, interactive=False) # Export Report gr.Markdown("## Export Report") report_output = gr.Textbox(label="Report Preview", lines=10, interactive=False) download_button = gr.File(label="Download Report as Text") # Update dashboard on filter change def update_dashboard(lab, equipment, start_date, end_date): device_cards, daily_trend, uptime, anomalies, report, report_file = process_dashboard_data( lab, equipment, start_date, end_date ) if report_file: with open(report_file, "w") as f: f.write(report) return device_cards, daily_trend, uptime, anomalies, report, report_file gr.Button("Update Dashboard").click( fn=update_dashboard, inputs=[lab_filter, equipment_type_filter, start_date, end_date], outputs=[device_cards_output, daily_trend_plot, uptime_plot, anomaly_alerts_output, report_output, download_button] ) # Launch the app demo.launch()