# LabOps Dashboard — Gradio app (non-code page header removed)
import random
from datetime import datetime, timedelta

import gradio as gr
import matplotlib.pyplot as plt
import pandas as pd
# Simulate sample data for lab devices (mimicking Salesforce custom objects)
def generate_sample_data():
    """Create fake device and log tables mimicking Salesforce custom objects.

    Returns:
        tuple: (devices, logs) as pandas DataFrames. ``devices`` has 10 rows
        with columns Device_ID / Lab / Equipment_Type / Status. ``logs`` has
        one row per device per day from 2025-01-01 through 2025-06-30 with
        columns Device_ID / Log_Timestamp / Usage_Count / Status / AMC_Expiry.
    """
    lab_sites = ["Lab_A", "Lab_B", "Lab_C"]
    gear_kinds = ["Microscope", "Centrifuge", "UV_Sterilizer"]
    status_options = ["Operational", "Down", "Maintenance"]

    # Ten randomly configured devices spread across the labs.
    device_rows = [
        {
            "Device_ID": f"Device_{idx + 1}",
            "Lab": random.choice(lab_sites),
            "Equipment_Type": random.choice(gear_kinds),
            "Status": random.choice(status_options),
        }
        for idx in range(10)
    ]

    # One log entry per device per day over a broad range (H1 2025).
    first_day = datetime(2025, 1, 1)
    last_day = datetime(2025, 6, 30)
    total_days = (last_day - first_day).days + 1
    log_rows = []
    for row in device_rows:
        for offset in range(total_days):
            day = first_day + timedelta(days=offset)
            log_rows.append({
                "Device_ID": row["Device_ID"],
                "Log_Timestamp": day.strftime("%Y-%m-%d %H:%M:%S"),
                "Usage_Count": random.randint(0, 50),
                "Status": random.choice(status_options),
                "AMC_Expiry": (day + timedelta(days=random.randint(10, 90))).strftime("%Y-%m-%d"),
            })

    return pd.DataFrame(device_rows), pd.DataFrame(log_rows)
# Build the shared sample dataset once at import time; the dashboard
# callbacks below read these module-level frames.
devices_df, logs_df = generate_sample_data()
def process_dashboard_data(lab_filter, equipment_type_filter, start_date, end_date):
    """
    Process device and log data based on filters and return dashboard components.

    Args:
        lab_filter (str): Selected lab (e.g., Lab_A or All).
        equipment_type_filter (str): Selected equipment type (e.g., Microscope or All).
        start_date (str): Start date in YYYY-MM-DD format ("" or None = full range).
        end_date (str): End date in YYYY-MM-DD format ("" or None = full range).

    Returns:
        tuple: (device cards text, daily trend Figure or None, uptime Figure or None,
                anomaly alert text, report text, report file name or None).
        On validation failure the first element is an "Error: ..." message and
        the remaining slots are None / "".
    """
    try:
        # Validate and parse date inputs; fall back to the full sample range.
        if start_date and end_date:
            try:
                start_date = start_date.strip()
                end_date = end_date.strip()
                start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
                end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")
                if start_date_dt > end_date_dt:
                    return "Error: Start date must be before end date.", None, None, None, "", None
                # The generated sample data only covers H1 2025.
                data_start = datetime(2025, 1, 1)
                data_end = datetime(2025, 6, 30)
                if start_date_dt < data_start or end_date_dt > data_end:
                    return f"Error: Dates must be between 2025-01-01 and 2025-06-30. Received: Start={start_date}, End={end_date}", None, None, None, "", None
            except ValueError:
                return f"Error: Invalid date format. Use YYYY-MM-DD (e.g., 2025-05-01). Received: Start={start_date}, End={end_date}", None, None, None, "", None
        else:
            start_date_dt = datetime(2025, 1, 1)
            end_date_dt = datetime(2025, 6, 30)
        print(f"Input dates: Start={start_date}, End={end_date}")  # Debug log

        # Apply lab / equipment filters to the device table.
        filtered_devices = devices_df.copy()
        if lab_filter != "All":
            filtered_devices = filtered_devices[filtered_devices["Lab"] == lab_filter]
        if equipment_type_filter != "All":
            filtered_devices = filtered_devices[filtered_devices["Equipment_Type"] == equipment_type_filter]

        # .copy() so the column assignments below land on a real frame, not a
        # slice view (the original triggered pandas' SettingWithCopyWarning).
        filtered_logs = logs_df[logs_df["Device_ID"].isin(filtered_devices["Device_ID"])].copy()

        # Restrict logs to the requested date window; parse timestamps once
        # and reuse the Log_Date column for the daily aggregation below.
        filtered_logs["Log_Date"] = pd.to_datetime(filtered_logs["Log_Timestamp"]).dt.date
        filtered_logs = filtered_logs[
            (filtered_logs["Log_Date"] >= start_date_dt.date()) &
            (filtered_logs["Log_Date"] <= end_date_dt.date())
        ]
        print(f"Filtered logs count: {len(filtered_logs)}")  # Debug log

        # Device Cards: one summary line per device in the filtered set.
        device_cards = "Device Cards:\n"
        if filtered_devices.empty:
            device_cards += "No devices match the selected filters.\n"
        for _, device in filtered_devices.iterrows():
            device_logs = filtered_logs[filtered_logs["Device_ID"] == device["Device_ID"]]
            usage_count = device_logs["Usage_Count"].sum() if not device_logs.empty else 0
            last_log = device_logs["Log_Timestamp"].max() if not device_logs.empty else "No logs"
            device_cards += (
                f"Device: {device['Device_ID']}, Lab: {device['Lab']}, Type: {device['Equipment_Type']}, "
                f"Status: {device['Status']}, Usage Count: {usage_count}, Last Log: {last_log}\n"
            )

        # Daily Log Trends. Use plt.subplots (not plt.figure/plt.gcf global
        # state) so concurrent requests cannot grab each other's figure, and
        # return None — not a string — when empty: gr.Plot cannot render str.
        daily_trend_plot = None
        if not filtered_logs.empty:
            daily_trends = filtered_logs.groupby("Log_Date")["Usage_Count"].sum().reset_index()
            fig, ax = plt.subplots(figsize=(8, 4))
            ax.plot(daily_trends["Log_Date"], daily_trends["Usage_Count"], marker="o", color="#1f77b4")
            ax.set_title("Daily Log Trends")
            ax.set_xlabel("Date")
            ax.set_ylabel("Total Usage Count")
            ax.tick_params(axis="x", rotation=45)
            fig.tight_layout()
            daily_trend_plot = fig

        # Weekly Uptime %: share of "Operational" log entries per ISO week.
        uptime_plot = None
        if not filtered_logs.empty:
            filtered_logs["Week"] = pd.to_datetime(filtered_logs["Log_Timestamp"]).dt.isocalendar().week
            uptime_data = filtered_logs.groupby("Week")["Status"].value_counts().unstack(fill_value=0)
            # Every row has exactly one status, so the per-week total is >= 1.
            status_total = (
                uptime_data.get("Operational", 0)
                + uptime_data.get("Down", 0)
                + uptime_data.get("Maintenance", 0)
            )
            uptime_data["Uptime_%"] = uptime_data.get("Operational", 0) / status_total * 100
            fig, ax = plt.subplots(figsize=(8, 4))
            ax.bar(uptime_data.index, uptime_data["Uptime_%"], color="#ff7f0e")
            ax.set_title("Weekly Uptime %")
            ax.set_xlabel("Week")
            ax.set_ylabel("Uptime %")
            fig.tight_layout()
            uptime_plot = fig

        # Anomaly Alerts: flag log entries using more than 2x the mean usage.
        anomaly_alerts = "Anomaly Alerts:\n"
        if not filtered_logs.empty:
            avg_usage = filtered_logs["Usage_Count"].mean()
            anomalies = filtered_logs[filtered_logs["Usage_Count"] > 2 * avg_usage]
            if anomalies.empty:
                anomaly_alerts += "No anomalies detected.\n"
            for _, log in anomalies.iterrows():
                anomaly_alerts += (
                    f"Device: {log['Device_ID']}, Timestamp: {log['Log_Timestamp']}, "
                    f"Usage Spike: {log['Usage_Count']} (Avg: {avg_usage:.2f})\n"
                )
        else:
            anomaly_alerts += "No data available for anomaly detection.\n"

        # Text report: cards + anomalies + AMC contracts expiring in 14 days.
        report = "LabOps Dashboard Report:\n"
        report += f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        report += device_cards + "\n"
        report += anomaly_alerts + "\n"
        report += "AMC Reminders:\n"
        if not filtered_logs.empty:
            # NOTE(review): compares against the real "now" while the sample
            # AMC dates live in 2025 — most rows may qualify; confirm intent.
            amc_expiring = filtered_logs[pd.to_datetime(filtered_logs["AMC_Expiry"]) <= (datetime.now() + timedelta(days=14))]
            if amc_expiring.empty:
                report += "No AMC expiries within the next 14 days.\n"
            for _, log in amc_expiring.iterrows():
                report += f"Device: {log['Device_ID']}, AMC Expiry: {log['AMC_Expiry']}\n"
        else:
            report += "No data available for AMC reminders.\n"

        return device_cards, daily_trend_plot, uptime_plot, anomaly_alerts, report, "labops_report.txt"
    except Exception as e:
        # Surface unexpected failures to the UI instead of crashing the app.
        return f"Error: {str(e)}", None, None, None, "", None
# Define Gradio interface
with gr.Blocks(title="LabOps Dashboard") as demo:
    gr.Markdown("# LabOps Dashboard")
    gr.Markdown("Monitor smart lab devices, view usage trends, uptime, anomalies, and export reports.")
    gr.Markdown("**Note**: Use the calendar picker to select dates in YYYY-MM-DD format (e.g., 2025-05-01 to 2025-05-30). Dates must be between 2025-01-01 and 2025-06-30. If the calendar picker doesn't appear, enter dates manually.")

    # Filters. value="All" is required: an unset Dropdown yields None, and
    # None != "All" would make the first update filter on Lab == None
    # (matching nothing) before the user touches the dropdowns.
    gr.Markdown("## Filters")
    lab_filter = gr.Dropdown(
        choices=["All"] + list(devices_df["Lab"].unique()),
        value="All",
        label="Lab Site",
    )
    equipment_type_filter = gr.Dropdown(
        choices=["All"] + list(devices_df["Equipment_Type"].unique()),
        value="All",
        label="Equipment Type",
    )
    start_date = gr.Textbox(
        label="Start Date (YYYY-MM-DD)",
        placeholder="Select or enter date (e.g., 2025-05-01)",
        elem_id="start_date_picker"
    )
    end_date = gr.Textbox(
        label="End Date (YYYY-MM-DD)",
        placeholder="Select or enter date (e.g., 2025-05-30)",
        elem_id="end_date_picker"
    )

    # Custom JavaScript to enable the browser-native date picker with range
    # constraints. NOTE(review): many Gradio versions sanitize <script> tags
    # inside gr.HTML, so this may never execute — verify in the target
    # version (the js= argument of gr.Blocks is the supported alternative).
    gr.HTML("""
    <script>
    document.addEventListener('DOMContentLoaded', function() {
        const startPicker = document.getElementById('start_date_picker');
        const endPicker = document.getElementById('end_date_picker');
        if (startPicker && endPicker) {
            startPicker.type = 'date';
            endPicker.type = 'date';
            startPicker.min = '2025-01-01';
            startPicker.max = '2025-06-30';
            endPicker.min = '2025-01-01';
            endPicker.max = '2025-06-30';
        } else {
            console.error('Date picker elements not found');
        }
    });
    </script>
    """)

    # Dashboard Components
    gr.Markdown("## Device Cards")
    device_cards_output = gr.Textbox(label="Device Status", lines=10, interactive=False)
    gr.Markdown("## Daily Log Trends")
    daily_trend_plot = gr.Plot(label="Daily Usage Trends")
    gr.Markdown("## Weekly Uptime %")
    uptime_plot = gr.Plot(label="Weekly Uptime")
    gr.Markdown("## Anomaly Alerts")
    anomaly_alerts_output = gr.Textbox(label="Anomaly Alerts", lines=5, interactive=False)

    # Export Report
    gr.Markdown("## Export Report")
    report_output = gr.Textbox(label="Report Preview", lines=10, interactive=False)
    download_button = gr.File(label="Download Report as Text")

    # Update dashboard on button click.
    def update_dashboard(lab, equipment, start_date, end_date):
        """Run the data pipeline and persist the report for download."""
        device_cards, daily_trend, uptime, anomalies, report, report_file = process_dashboard_data(
            lab, equipment, start_date, end_date
        )
        if report_file:
            # Write the report text to disk so gr.File can serve it.
            with open(report_file, "w", encoding="utf-8") as f:
                f.write(report)
        return device_cards, daily_trend, uptime, anomalies, report, report_file

    gr.Button("Update Dashboard").click(
        fn=update_dashboard,
        inputs=[lab_filter, equipment_type_filter, start_date, end_date],
        outputs=[device_cards_output, daily_trend_plot, uptime_plot, anomaly_alerts_output, report_output, download_button]
    )

# Launch the app
demo.launch()