Dineshpopuri's picture
Update app.py
a086af7 verified
import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import random
# Simulate sample data for lab devices (mimicking Salesforce custom objects)
def generate_sample_data(num_devices=10, start_date=None, end_date=None, seed=None):
    """Build simulated device and daily-log tables for the dashboard.

    One log row is produced per device per day, from ``start_date`` to
    ``end_date`` inclusive. Calling the function with no arguments gives the
    same shape of data as before: 10 devices, 2025-01-01 through 2025-06-30.

    Args:
        num_devices (int): Number of devices to simulate.
        start_date (datetime | None): First log day; defaults to 2025-01-01.
        end_date (datetime | None): Last log day (inclusive); defaults to
            2025-06-30.
        seed (int | None): When given, a private ``random.Random(seed)`` is
            used so the output is reproducible. When omitted, the module-level
            ``random`` generator is used, as in the original implementation.

    Returns:
        tuple[pandas.DataFrame, pandas.DataFrame]: ``(devices, logs)``.
        ``devices`` has columns Device_ID, Lab, Equipment_Type, Status.
        ``logs`` has columns Device_ID, Log_Timestamp, Usage_Count, Status,
        AMC_Expiry. Both frames keep these columns even when they are empty.
    """
    labs = ["Lab_A", "Lab_B", "Lab_C"]
    equipment_types = ["Microscope", "Centrifuge", "UV_Sterilizer"]
    statuses = ["Operational", "Down", "Maintenance"]
    device_columns = ["Device_ID", "Lab", "Equipment_Type", "Status"]
    log_columns = ["Device_ID", "Log_Timestamp", "Usage_Count", "Status", "AMC_Expiry"]

    if start_date is None:
        start_date = datetime(2025, 1, 1)
    if end_date is None:
        end_date = datetime(2025, 6, 30)

    # The `random` module and a `random.Random` instance expose the same
    # choice()/randint() API, so either can serve as the generator.
    rng = random if seed is None else random.Random(seed)

    devices = [
        {
            "Device_ID": f"Device_{index + 1}",
            "Lab": rng.choice(labs),
            "Equipment_Type": rng.choice(equipment_types),
            "Status": rng.choice(statuses),
        }
        for index in range(num_devices)
    ]

    # The list of log days is identical for every device, so compute it once.
    log_days = []
    day = start_date
    while day <= end_date:
        log_days.append(day)
        day += timedelta(days=1)

    logs = []
    for device in devices:
        for day in log_days:
            logs.append({
                "Device_ID": device["Device_ID"],
                "Log_Timestamp": day.strftime("%Y-%m-%d %H:%M:%S"),
                "Usage_Count": rng.randint(0, 50),
                # Log status is drawn independently of the device's own status.
                "Status": rng.choice(statuses),
                "AMC_Expiry": (day + timedelta(days=rng.randint(10, 90))).strftime("%Y-%m-%d"),
            })

    # Passing explicit columns keeps the schema intact when a list is empty.
    return (
        pd.DataFrame(devices, columns=device_columns),
        pd.DataFrame(logs, columns=log_columns),
    )
# Build the simulated tables once at import time; the dashboard code below
# reads these two module-level frames on every request.
(devices_df, logs_df) = generate_sample_data()
# Date span covered by the simulated logs; user-supplied dates must fall inside it.
_DATA_START = datetime(2025, 1, 1)
_DATA_END = datetime(2025, 6, 30)
# Dropdown values that mean "do not filter" (an untouched dropdown may yield None).
_NO_FILTER = (None, "", "All")
_STATUS_COLUMNS = ["Operational", "Down", "Maintenance"]
_AMC_WINDOW_DAYS = 14


def _parse_date_range(start_date, end_date):
    """Validate the two date inputs; a blank bound falls back to the data bound.

    Returns ``(start_dt, end_dt, start_text, end_text)`` and raises
    ``ValueError`` carrying the user-facing message on invalid input.
    """
    start_text = (start_date or "").strip()
    end_text = (end_date or "").strip()
    try:
        start_dt = datetime.strptime(start_text, "%Y-%m-%d") if start_text else _DATA_START
        end_dt = datetime.strptime(end_text, "%Y-%m-%d") if end_text else _DATA_END
    except ValueError:
        raise ValueError(
            "Invalid date format. Use YYYY-MM-DD (e.g., 2025-05-01). "
            f"Received: Start={start_text}, End={end_text}"
        ) from None
    if start_dt > end_dt:
        raise ValueError("Start date must be before end date.")
    if start_dt < _DATA_START or end_dt > _DATA_END:
        raise ValueError(
            f"Dates must be between {_DATA_START:%Y-%m-%d} and {_DATA_END:%Y-%m-%d}. "
            f"Received: Start={start_text}, End={end_text}"
        )
    return start_dt, end_dt, start_text, end_text


def _select_devices(lab_filter, equipment_type_filter):
    """Return the rows of ``devices_df`` matching the lab / equipment filters."""
    selected = devices_df
    if lab_filter not in _NO_FILTER:
        selected = selected[selected["Lab"] == lab_filter]
    if equipment_type_filter not in _NO_FILTER:
        selected = selected[selected["Equipment_Type"] == equipment_type_filter]
    return selected


def _select_logs(device_ids, start_dt, end_dt):
    """Return logs of the given devices within [start_dt, end_dt] (whole days)."""
    # Work on a copy so adding the parsed-timestamp column never writes
    # through to (or warns about) the shared module-level frame.
    logs = logs_df[logs_df["Device_ID"].isin(device_ids)].copy()
    logs["Log_Time"] = pd.to_datetime(logs["Log_Timestamp"])
    # Half-open upper bound keeps every timestamp on the end date itself.
    in_range = (logs["Log_Time"] >= start_dt) & (logs["Log_Time"] < end_dt + timedelta(days=1))
    return logs[in_range]


def _build_device_cards(devices, logs):
    """Render one summary line per device (total usage and latest log)."""
    lines = ["Device Cards:"]
    if devices.empty:
        lines.append("No devices match the selected filters.")
    # Aggregate once instead of re-filtering the logs for every device.
    per_device = logs.groupby("Device_ID")
    usage_totals = per_device["Usage_Count"].sum()
    last_logs = per_device["Log_Timestamp"].max()
    for device in devices.itertuples(index=False):
        lines.append(
            f"Device: {device.Device_ID}, Lab: {device.Lab}, Type: {device.Equipment_Type}, "
            f"Status: {device.Status}, Usage Count: {usage_totals.get(device.Device_ID, 0)}, "
            f"Last Log: {last_logs.get(device.Device_ID, 'No logs')}"
        )
    return "\n".join(lines) + "\n"


def _message_figure(message):
    """Return a blank figure showing ``message`` (used when there is no data)."""
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.axis("off")
    ax.text(0.5, 0.5, message, ha="center", va="center", wrap=True)
    # Unregister from pyplot so figures do not accumulate across requests;
    # the Figure object itself stays usable by the caller.
    plt.close(fig)
    return fig


def _plot_daily_trend(logs):
    """Return a line chart of total usage per calendar day."""
    if logs.empty:
        return _message_figure("No data available for Daily Log Trends in the selected range.")
    daily_usage = logs.groupby(logs["Log_Time"].dt.date)["Usage_Count"].sum()
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(list(daily_usage.index), daily_usage.to_numpy(), marker="o", color="#1f77b4")
    ax.set_title("Daily Log Trends")
    ax.set_xlabel("Date")
    ax.set_ylabel("Total Usage Count")
    ax.tick_params(axis="x", labelrotation=45)
    fig.tight_layout()
    plt.close(fig)
    return fig


def _plot_weekly_uptime(logs):
    """Return a bar chart of the share of 'Operational' logs per ISO week."""
    if logs.empty:
        return _message_figure("No data available for Weekly Uptime % in the selected range.")
    weeks = logs["Log_Time"].dt.isocalendar().week.astype(int).rename("Week")
    status_counts = (
        logs.groupby([weeks, "Status"]).size().unstack(fill_value=0)
        # Guarantee all three status columns exist even if one never occurs.
        .reindex(columns=_STATUS_COLUMNS, fill_value=0)
    )
    uptime_pct = status_counts["Operational"] / status_counts.sum(axis=1) * 100
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.bar(list(uptime_pct.index), uptime_pct.to_numpy(), color="#ff7f0e")
    ax.set_title("Weekly Uptime %")
    ax.set_xlabel("Week")
    ax.set_ylabel("Uptime %")
    fig.tight_layout()
    plt.close(fig)
    return fig


def _build_anomaly_alerts(logs):
    """List log rows whose usage exceeds twice the mean usage of ``logs``."""
    lines = ["Anomaly Alerts:"]
    if logs.empty:
        lines.append("No data available for anomaly detection.")
        return "\n".join(lines) + "\n"
    avg_usage = logs["Usage_Count"].mean()
    spikes = logs[logs["Usage_Count"] > 2 * avg_usage]
    if spikes.empty:
        lines.append("No anomalies detected.")
    lines.extend(
        f"Device: {device_id}, Timestamp: {timestamp}, "
        f"Usage Spike: {usage} (Avg: {avg_usage:.2f})"
        for device_id, timestamp, usage in zip(
            spikes["Device_ID"], spikes["Log_Timestamp"], spikes["Usage_Count"]
        )
    )
    return "\n".join(lines) + "\n"


def _build_amc_reminders(logs, now):
    """List AMC expiries falling between today and today + 14 days."""
    lines = ["AMC Reminders:"]
    if logs.empty:
        lines.append("No data available for AMC reminders.")
        return "\n".join(lines) + "\n"
    today = pd.Timestamp(now).normalize()
    window_end = today + pd.Timedelta(days=_AMC_WINDOW_DAYS)
    expiry = pd.to_datetime(logs["AMC_Expiry"])
    # Lower bound added: without it every already-expired row was reported as
    # "expiring within the next 14 days". Duplicates are collapsed so a device
    # is not repeated for each log row that carries the same expiry date.
    due = logs.loc[
        (expiry >= today) & (expiry <= window_end), ["Device_ID", "AMC_Expiry"]
    ].drop_duplicates()
    if due.empty:
        lines.append(f"No AMC expiries within the next {_AMC_WINDOW_DAYS} days.")
    lines.extend(
        f"Device: {device_id}, AMC Expiry: {expiry_date}"
        for device_id, expiry_date in zip(due["Device_ID"], due["AMC_Expiry"])
    )
    return "\n".join(lines) + "\n"


def process_dashboard_data(lab_filter, equipment_type_filter, start_date, end_date):
    """
    Process device and log data based on filters and return dashboard components.

    Reads the module-level ``devices_df`` and ``logs_df`` frames; neither is
    modified.

    Args:
        lab_filter (str): Selected lab (e.g., Lab_A or All). None or an empty
            string is treated like "All".
        equipment_type_filter (str): Selected equipment type (e.g., Microscope
            or All). None or an empty string is treated like "All".
        start_date (str): Start date in YYYY-MM-DD format. Blank means the
            first day of the sample data (2025-01-01).
        end_date (str): End date in YYYY-MM-DD format. Blank means the last
            day of the sample data (2025-06-30).

    Returns:
        tuple: ``(device_cards, daily_trend_plot, uptime_plot, anomaly_alerts,
        report, report_file)``. The two plots are matplotlib figures (a
        placeholder figure carrying a message when no logs match).
        ``report_file`` is the file name the caller should write ``report``
        to; this function does not write it. On any error the tuple is
        ``("Error: <message>", None, None, None, "", None)``.
    """
    try:
        start_dt, end_dt, start_date, end_date = _parse_date_range(start_date, end_date)
        print(f"Input dates: Start={start_date}, End={end_date}")  # Debug log

        devices = _select_devices(lab_filter, equipment_type_filter)
        logs = _select_logs(devices["Device_ID"], start_dt, end_dt)
        print(f"Filtered logs count: {len(logs)}")  # Debug log

        device_cards = _build_device_cards(devices, logs)
        daily_trend_plot = _plot_daily_trend(logs)
        uptime_plot = _plot_weekly_uptime(logs)
        anomaly_alerts = _build_anomaly_alerts(logs)

        now = datetime.now()
        report = (
            "LabOps Dashboard Report:\n"
            f"Generated: {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            + device_cards + "\n"
            + anomaly_alerts + "\n"
            + _build_amc_reminders(logs, now)
        )
        return device_cards, daily_trend_plot, uptime_plot, anomaly_alerts, report, "labops_report.txt"
    except Exception as exc:
        # UI boundary: surface any failure (including validation errors raised
        # above) as text in the first output instead of crashing the callback.
        return f"Error: {exc}", None, None, None, "", None
# Define Gradio interface

# Runs in the browser once the page has loaded. `elem_id` is placed on the
# component's wrapper element, so the script looks up the <input> inside it and
# switches that to a native date picker limited to the sample-data range.
# NOTE(review): relies on the `js` event argument (Gradio 4+) and on Textbox
# rendering an <input> when lines == max_lines == 1 — confirm against the
# Gradio version pinned for this app.
DATE_PICKER_JS = """
() => {
    const configure = (elemId) => {
        const field = document.querySelector(`#${elemId} input`);
        if (!field) {
            console.error('Date picker elements not found');
            return;
        }
        field.type = 'date';
        field.min = '2025-01-01';
        field.max = '2025-06-30';
    };
    configure('start_date_picker');
    configure('end_date_picker');
}
"""

with gr.Blocks(title="LabOps Dashboard") as demo:
    gr.Markdown("# LabOps Dashboard")
    gr.Markdown("Monitor smart lab devices, view usage trends, uptime, anomalies, and export reports.")
    gr.Markdown("**Note**: Use the calendar picker to select dates in YYYY-MM-DD format (e.g., 2025-05-01 to 2025-05-30). Dates must be between 2025-01-01 and 2025-06-30. If the calendar picker doesn't appear, enter dates manually.")

    # Filters
    gr.Markdown("## Filters")
    # An explicit default keeps the callback from receiving None when the user
    # never touches a dropdown; sorting gives a stable option order.
    lab_filter = gr.Dropdown(
        choices=["All"] + sorted(devices_df["Lab"].unique()),
        value="All",
        label="Lab Site",
    )
    equipment_type_filter = gr.Dropdown(
        choices=["All"] + sorted(devices_df["Equipment_Type"].unique()),
        value="All",
        label="Equipment Type",
    )
    start_date = gr.Textbox(
        label="Start Date (YYYY-MM-DD)",
        placeholder="Select or enter date (e.g., 2025-05-01)",
        lines=1,
        max_lines=1,
        elem_id="start_date_picker",
    )
    end_date = gr.Textbox(
        label="End Date (YYYY-MM-DD)",
        placeholder="Select or enter date (e.g., 2025-05-30)",
        lines=1,
        max_lines=1,
        elem_id="end_date_picker",
    )

    # Dashboard Components
    gr.Markdown("## Device Cards")
    device_cards_output = gr.Textbox(label="Device Status", lines=10, interactive=False)
    gr.Markdown("## Daily Log Trends")
    daily_trend_plot = gr.Plot(label="Daily Usage Trends")
    gr.Markdown("## Weekly Uptime %")
    uptime_plot = gr.Plot(label="Weekly Uptime")
    gr.Markdown("## Anomaly Alerts")
    anomaly_alerts_output = gr.Textbox(label="Anomaly Alerts", lines=5, interactive=False)

    # Export Report
    gr.Markdown("## Export Report")
    report_output = gr.Textbox(label="Report Preview", lines=10, interactive=False)
    download_button = gr.File(label="Download Report as Text")

    # Update dashboard on filter change
    def update_dashboard(lab, equipment, start_date, end_date):
        """Run the dashboard computation and write the report file for download."""
        device_cards, daily_trend, uptime, anomalies, report, report_file = process_dashboard_data(
            lab, equipment, start_date, end_date
        )
        # report_file is None when processing failed; nothing is written then.
        if report_file:
            with open(report_file, "w", encoding="utf-8") as f:
                f.write(report)
        return device_cards, daily_trend, uptime, anomalies, report, report_file

    update_button = gr.Button("Update Dashboard")
    update_button.click(
        fn=update_dashboard,
        inputs=[lab_filter, equipment_type_filter, start_date, end_date],
        outputs=[device_cards_output, daily_trend_plot, uptime_plot, anomaly_alerts_output, report_output, download_button],
    )

    # A <script> tag placed in gr.HTML is not run by the page, so the date
    # picker setup is attached to the page-load event instead.
    demo.load(fn=None, inputs=None, outputs=None, js=DATE_PICKER_JS)

# Launch the app only when executed as a script, so importing this module
# (e.g. from tests) does not start a server.
if __name__ == "__main__":
    demo.launch()