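"""Multi-Device LabOps Dashboard (Gradio app).

Upload a CSV of device logs, filter by lab, equipment type and date range,
visualize daily log trends and weekly uptime, flag anomalies, and export a
PDF report. Requires gradio, pandas, matplotlib and numpy; fpdf2 is optional
and only needed for the PDF export.
"""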
import gradio as gr
import pandas as pd
import matplotlib
matplotlib.use("Agg")  # headless backend for server environments
import matplotlib.pyplot as plt
import numpy as np
import io
import tempfile
from datetime import datetime, timedelta
import sys
import traceback
from PIL import Image  # installed as a Gradio dependency; used to hand plots to gr.Image
# Try to import fpdf2, but allow the app to run without it
try:
    import fpdf  # the fpdf2 package installs the "fpdf" module
    from fpdf import FPDF
    FPDF_AVAILABLE = True
    print("FPDF2 successfully loaded.")  # Debug log to confirm fpdf2 installation
except ImportError:
    FPDF_AVAILABLE = False
    FPDF = None
    print("FPDF2 not installed. PDF download feature will be disabled.")
# Check library versions for debugging
debug_msg = "Library Versions:\n"
try:
    debug_msg += f"Python: {sys.version}\n"
    debug_msg += f"Gradio: {gr.__version__}\n"
    debug_msg += f"Pandas: {pd.__version__}\n"
    debug_msg += f"Matplotlib: {matplotlib.__version__}\n"
    debug_msg += f"NumPy: {np.__version__}\n"
    if FPDF_AVAILABLE:
        debug_msg += f"FPDF2: {fpdf.__version__}\n"
    else:
        debug_msg += "FPDF2: Not installed (PDF download feature disabled)\n"
except Exception as e:
    debug_msg += f"Error checking library versions: {str(e)}\n"

# Global DataFrame to store the CSV data
df = pd.DataFrame()
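
# Expected CSV layout (illustrative sample row, not real data):
# DeviceID,Lab,Type,Timestamp,Status,UsageCount
# DEV-001,Lab A,Centrifuge,2024-01-15 09:30:00,Up,42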
def upload_csv(file):
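    """Load the uploaded CSV into the global DataFrame and refresh the UI.

    Returns eight values, matching the outputs wired to csv_input.change():
    dropdown updates for Lab / Type / Date Range, the status message, and the
    initial device table, daily-trend plot, uptime plot and anomaly text.
    """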
    global df
    debug_msg_local = debug_msg + "\nStarting CSV upload process...\n"

    def empty_outputs(message):
        # Fallback outputs used whenever the upload cannot be processed:
        # reset all three dropdowns and clear the visual components.
        reset = gr.update(choices=["All"], value="All")
        return reset, reset, reset, message, None, None, None, None

    try:
        if file is None:
            debug_msg_local += "No file uploaded. Please upload a CSV file.\n"
            print(debug_msg_local)  # Log to console for debugging
            return empty_outputs(debug_msg_local)
        # Read the CSV file with encoding handling
        debug_msg_local += "Reading CSV file...\n"
        try:
            df = pd.read_csv(file, encoding='utf-8')
        except UnicodeDecodeError:
            debug_msg_local += "Error: CSV file encoding is not UTF-8. Trying latin1 encoding...\n"
            df = pd.read_csv(file, encoding='latin1')
        except Exception as e:
            debug_msg_local += f"Error reading CSV file: {str(e)}\n{traceback.format_exc()}\n"
            print(debug_msg_local)
            return empty_outputs(debug_msg_local)
        if df.empty:
            debug_msg_local += "The uploaded CSV file is empty.\n"
            print(debug_msg_local)
            return empty_outputs(debug_msg_local)
        # Debug: Show the CSV column names (limit verbosity)
        debug_msg_local += f"CSV Columns: {', '.join(df.columns)}\n"
        # Define required columns
        required_columns = {'DeviceID', 'Lab', 'Type', 'Timestamp', 'Status', 'UsageCount'}
        if not required_columns.issubset(df.columns):
            missing_cols = required_columns - set(df.columns)
            debug_msg_local += f"Error: CSV is missing required columns: {', '.join(missing_cols)}\n"
            print(debug_msg_local)
            return empty_outputs(debug_msg_local)
        # Debug: Check data types and sample values (limit to 5 rows)
        debug_msg_local += f"Data Types:\n{df.dtypes.to_string()}\n"
        debug_msg_local += f"Sample Values (first 5 rows):\n{df.head(5).to_string()}\n"
        # Check for empty or all-NaN columns
        if df['Lab'].dropna().empty:
            debug_msg_local += "Error: Lab column is empty or contains only NaN values.\n"
        if df['Type'].dropna().empty:
            debug_msg_local += "Error: Type column is empty or contains only NaN values.\n"
        if df['Lab'].dropna().empty or df['Type'].dropna().empty:
            print(debug_msg_local)
            return empty_outputs(debug_msg_local)
        # Convert Timestamp to datetime with a specific format and fallback
        debug_msg_local += "Converting Timestamp column...\n"
        try:
            # Try parsing with a common format first, keeping the raw strings for the fallback
            raw_timestamps = df['Timestamp']
            parsed = pd.to_datetime(raw_timestamps, format='%Y-%m-%d %H:%M:%S', errors='coerce')
            # If parsing fails for some rows, retry those rows without a specific format
            if parsed.isna().any():
                debug_msg_local += "Some timestamps failed to parse with format '%Y-%m-%d %H:%M:%S'. Falling back to generic parsing...\n"
                parsed = parsed.fillna(pd.to_datetime(raw_timestamps, errors='coerce'))
            df['Timestamp'] = parsed
            timestamps_invalid = df['Timestamp'].isna().all()
            if timestamps_invalid:
                debug_msg_local += "Warning: All Timestamp values are invalid or unparseable. Date range filtering will be disabled.\n"
        except Exception as e:
            debug_msg_local += f"Error parsing Timestamp column: {str(e)}\n{traceback.format_exc()}\n"
            print(debug_msg_local)
            return empty_outputs(debug_msg_local)
        # Extract unique values for dropdowns
        debug_msg_local += "Extracting unique values for dropdowns...\n"
        labs = ['All'] + sorted([str(lab) for lab in df['Lab'].dropna().unique()])
        types = ['All'] + sorted([str(v) for v in df['Type'].dropna().unique()])
        debug_msg_local += f"Lab options: {', '.join(labs)}\nType options: {', '.join(types)}\n"
        # Extract date range for filter
        if timestamps_invalid:
            date_ranges = ['All']
            debug_msg_local += "Date range dropdown disabled due to invalid timestamps.\n"
        else:
            min_date = df['Timestamp'].min()
            max_date = df['Timestamp'].max()
            if pd.isna(min_date) or pd.isna(max_date):
                date_ranges = ['All']
                debug_msg_local += "Warning: Could not determine date range due to invalid timestamps.\n"
            else:
                min_date_str = min_date.strftime('%Y-%m-%d')
                max_date_str = max_date.strftime('%Y-%m-%d')
                date_ranges = ['All', f"{min_date_str} to {max_date_str}"]
                debug_msg_local += f"Date Range: {min_date_str} to {max_date_str}\n"
        # Automatically trigger filter_and_visualize after upload with default filters
        debug_msg_local += "Triggering initial visualization with default filters...\n"
        try:
            device_cards, plot_daily, plot_uptime, anomaly_text, filter_msg = filter_and_visualize("All", "All", "All")
            debug_msg_local += f"Initial Filter Result: {filter_msg}\n"
        except Exception as e:
            debug_msg_local += f"Initial Filter Error: {str(e)}\n{traceback.format_exc()}\n"
            device_cards, plot_daily, plot_uptime, anomaly_text = None, None, None, None
        # Truncate debug message to prevent Gradio rendering issues
        debug_msg_local = debug_msg_local[:5000]  # Limit to 5000 characters
        print(debug_msg_local)
        return (
            gr.update(choices=labs, value="All"),
            gr.update(choices=types, value="All"),
            gr.update(choices=date_ranges, value="All"),
            debug_msg_local,
            device_cards,
            plot_daily,
            plot_uptime,
            anomaly_text,
        )
    except Exception as e:
        debug_msg_local += f"Failed to process CSV: {str(e)}\n{traceback.format_exc()}\n"
        debug_msg_local = debug_msg_local[:5000]  # Limit to 5000 characters
        print(debug_msg_local)
        return empty_outputs(debug_msg_local)
def filter_and_visualize(selected_lab, selected_type, selected_date_range):
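    """Apply the selected filters and build the dashboard views.

    Returns the device table, the daily-trend and weekly-uptime plots (as PIL
    images), the anomaly text, and a status/debug message. A row is flagged as
    an anomaly when UsageCount exceeds 80 or Status is 'Down'.
    """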
    global df
    error_msg = "Starting filter and visualize process...\n"
    try:
        if df.empty:
            error_msg += "No data available.\n"
            print(error_msg)
            return None, None, None, None, error_msg
        # Debug: Log the filter parameters
        error_msg += f"Applying filters: Lab={selected_lab}, Type={selected_type}, Date Range={selected_date_range}\n"
        # Filter the DataFrame
        filtered_df = df.copy()
        error_msg += f"Initial DataFrame: {len(filtered_df)} rows\n"
        if selected_lab != "All":
            filtered_df = filtered_df[filtered_df["Lab"] == selected_lab]
            error_msg += f"After Lab filter ({selected_lab}): {len(filtered_df)} rows\n"
        if selected_type != "All":
            filtered_df = filtered_df[filtered_df["Type"] == selected_type]
            error_msg += f"After Type filter ({selected_type}): {len(filtered_df)} rows\n"
        if selected_date_range != "All" and selected_date_range != "No data available." and not df['Timestamp'].isna().all():
            try:
                start_date, end_date = selected_date_range.split(" to ")
                start_date = pd.to_datetime(start_date)
                end_date = pd.to_datetime(end_date) + timedelta(days=1)  # Include end date
                filtered_df = filtered_df[(filtered_df["Timestamp"] >= start_date) & (filtered_df["Timestamp"] < end_date)]
                error_msg += f"After Date Range filter ({start_date} to {end_date}): {len(filtered_df)} rows\n"
            except Exception as e:
                error_msg += f"Error parsing date range: {str(e)}\n{traceback.format_exc()}\n"
        if filtered_df.empty:
            error_msg += "No data matches the selected filters.\n"
            print(error_msg)
            return None, None, None, None, error_msg
        # Debug: Log the filtered DataFrame (limit verbosity)
        error_msg += f"Filtered DataFrame (first 5 rows):\n{filtered_df.head(5).to_string()}\n"
        # Device Cards (as a table)
        device_cards = filtered_df[['DeviceID', 'Lab', 'Type', 'UsageCount', 'Timestamp']].sort_values(by='Timestamp', ascending=False)
        # Daily Log Trends (Line Chart)
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Daily Log Trends.\n"
                plt.figure(figsize=(8, 4))
                plt.title("Daily Log Trends - No Data (Invalid Timestamps)")
                plt.xlabel("Date")
                plt.ylabel("Number of Logs")
                plot_daily = io.BytesIO()
                plt.savefig(plot_daily, format="png", bbox_inches="tight")
                plt.close()
                plot_daily.seek(0)
                plot_daily = Image.open(plot_daily)  # gr.Image expects a PIL image, array, or filepath
            else:
                daily_logs = filtered_df.groupby(filtered_df['Timestamp'].dt.date).size()
                if daily_logs.empty:
                    error_msg += "Warning: No data for Daily Log Trends.\n"
                    plt.figure(figsize=(8, 4))
                    plt.title("Daily Log Trends - No Data")
                    plt.xlabel("Date")
                    plt.ylabel("Number of Logs")
                    plot_daily = io.BytesIO()
                    plt.savefig(plot_daily, format="png", bbox_inches="tight")
                    plt.close()
                    plot_daily.seek(0)
                    plot_daily = Image.open(plot_daily)
                else:
                    plt.figure(figsize=(8, 4))
                    daily_logs.plot(kind='line', marker='o', color='blue')
                    plt.title("Daily Log Trends")
                    plt.xlabel("Date")
                    plt.ylabel("Number of Logs")
                    plt.xticks(rotation=45)
                    plot_daily = io.BytesIO()
                    plt.savefig(plot_daily, format="png", bbox_inches="tight")
                    plt.close()
                    plot_daily.seek(0)
                    plot_daily = Image.open(plot_daily)
        except Exception as e:
            error_msg += f"Error generating Daily Log Trends: {str(e)}\n{traceback.format_exc()}\n"
            plt.figure(figsize=(8, 4))
            plt.title("Daily Log Trends - Error")
            plt.xlabel("Date")
            plt.ylabel("Number of Logs")
            plot_daily = io.BytesIO()
            plt.savefig(plot_daily, format="png", bbox_inches="tight")
            plt.close()
            plot_daily.seek(0)
            plot_daily = Image.open(plot_daily)
        # Weekly Uptime % (Bar Chart)
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Weekly Uptime.\n"
                plt.figure(figsize=(8, 4))
                plt.title("Weekly Uptime % - No Data (Invalid Timestamps)")
                plt.xlabel("Date")
                plt.ylabel("Uptime %")
                plot_uptime = io.BytesIO()
                plt.savefig(plot_uptime, format="png", bbox_inches="tight")
                plt.close()
                plot_uptime.seek(0)
                plot_uptime = Image.open(plot_uptime)
            else:
                end_date = filtered_df['Timestamp'].max()
                start_date = end_date - timedelta(days=7)
                weekly_df = filtered_df[(filtered_df['Timestamp'] >= start_date) & (filtered_df['Timestamp'] <= end_date)]
                if weekly_df.empty:
                    error_msg += "Warning: No data for Weekly Uptime % (date range too narrow).\n"
                    plt.figure(figsize=(8, 4))
                    plt.title("Weekly Uptime % - No Data")
                    plt.xlabel("Date")
                    plt.ylabel("Uptime %")
                    plot_uptime = io.BytesIO()
                    plt.savefig(plot_uptime, format="png", bbox_inches="tight")
                    plt.close()
                    plot_uptime.seek(0)
                    plot_uptime = Image.open(plot_uptime)
                else:
                    uptime = weekly_df.groupby(weekly_df['Timestamp'].dt.date)['Status'].apply(lambda x: (x == 'Up').mean() * 100)
                    plt.figure(figsize=(8, 4))
                    uptime.plot(kind='bar', color='green')
                    plt.title("Weekly Uptime %")
                    plt.xlabel("Date")
                    plt.ylabel("Uptime %")
                    plt.xticks(rotation=45)
                    plot_uptime = io.BytesIO()
                    plt.savefig(plot_uptime, format="png", bbox_inches="tight")
                    plt.close()
                    plot_uptime.seek(0)
                    plot_uptime = Image.open(plot_uptime)
        except Exception as e:
            error_msg += f"Error generating Weekly Uptime %: {str(e)}\n{traceback.format_exc()}\n"
            plt.figure(figsize=(8, 4))
            plt.title("Weekly Uptime % - Error")
            plt.xlabel("Date")
            plt.ylabel("Uptime %")
            plot_uptime = io.BytesIO()
            plt.savefig(plot_uptime, format="png", bbox_inches="tight")
            plt.close()
            plot_uptime.seek(0)
            plot_uptime = Image.open(plot_uptime)
        # Anomaly Alerts (Text)
        try:
            anomalies = filtered_df[(filtered_df['UsageCount'] > 80) | (filtered_df['Status'] == 'Down')]
            if anomalies.empty:
                anomaly_text = "No anomalies detected."
            else:
                anomaly_text = "Anomalies Detected:\n" + anomalies[['DeviceID', 'Lab', 'Type', 'Status', 'UsageCount']].to_string(index=False)
        except Exception as e:
            error_msg += f"Error generating Anomaly Alerts: {str(e)}\n{traceback.format_exc()}\n"
            anomaly_text = "Error generating anomaly alerts."
        error_msg = error_msg[:5000]  # Limit to 5000 characters
        print(error_msg)
        return device_cards, plot_daily, plot_uptime, anomaly_text, f"{error_msg}Filters applied successfully."
    except Exception as e:
        error_msg += f"Unexpected error in filter_and_visualize: {str(e)}\n{traceback.format_exc()}\n"
        plt.figure(figsize=(8, 4))
        plt.title("Daily Log Trends - Error")
        plt.xlabel("Date")
        plt.ylabel("Number of Logs")
        plot_daily = io.BytesIO()
        plt.savefig(plot_daily, format="png", bbox_inches="tight")
        plt.close()
        plot_daily.seek(0)
        plot_daily = Image.open(plot_daily)
        plt.figure(figsize=(8, 4))
        plt.title("Weekly Uptime % - Error")
        plt.xlabel("Date")
        plt.ylabel("Uptime %")
        plot_uptime = io.BytesIO()
        plt.savefig(plot_uptime, format="png", bbox_inches="tight")
        plt.close()
        plot_uptime.seek(0)
        plot_uptime = Image.open(plot_uptime)
        error_msg = error_msg[:5000]  # Limit to 5000 characters
        print(error_msg)
        return None, plot_daily, plot_uptime, "Error generating anomaly alerts.", error_msg
def download_pdf(selected_lab, selected_type, selected_date_range):
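    """Build a PDF report of the filtered rows and return a file path for gr.File.

    Returns None when fpdf2 is unavailable, no data is loaded, or no rows match
    the current filters.
    """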
    global df
    try:
        if not FPDF_AVAILABLE:
            print("PDF download feature disabled: fpdf2 module not installed.")
            return None
        if df.empty:
            return None
        filtered_df = df.copy()
        if selected_lab != "All":
            filtered_df = filtered_df[filtered_df["Lab"] == selected_lab]
        if selected_type != "All":
            filtered_df = filtered_df[filtered_df["Type"] == selected_type]
        if selected_date_range != "All" and selected_date_range != "No data available." and not df['Timestamp'].isna().all():
            start_date, end_date = selected_date_range.split(" to ")
            start_date = pd.to_datetime(start_date)
            end_date = pd.to_datetime(end_date) + timedelta(days=1)
            filtered_df = filtered_df[(filtered_df["Timestamp"] >= start_date) & (filtered_df["Timestamp"] < end_date)]
        if filtered_df.empty:
            return None
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Helvetica", size=12)  # core font; "Arial" is only an alias in fpdf2
        pdf.cell(200, 10, txt="LabOps Dashboard Report", ln=True, align='C')
        pdf.ln(10)
        for index, row in filtered_df.iterrows():
            line = f"{row['Timestamp']} | {row['DeviceID']} | {row['Lab']} | {row['Type']} | {row['Status']} | {row['UsageCount']}"
            pdf.multi_cell(0, 10, txt=line)
        # Write to a temporary file so gr.File can serve it by path
        output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
        output_file.close()
        pdf.output(output_file.name)
        return output_file.name
    except Exception as e:
        print(f"Error in download_pdf: {str(e)}\n{traceback.format_exc()}")
        return None
# Build the Gradio interface
try:
    with gr.Blocks() as demo:
        gr.Markdown("🧪 **Multi-Device LabOps Dashboard**\nMonitor smart lab devices, visualize logs, and generate PDF reports.")
        with gr.Row():
            csv_input = gr.File(label="Upload Device Logs CSV", file_types=[".csv"])
        with gr.Row():
            lab_dropdown = gr.Dropdown(label="Filter by Lab", choices=["All"], value="All")
            type_dropdown = gr.Dropdown(label="Filter by Equipment Type", choices=["All"], value="All")
            date_dropdown = gr.Dropdown(label="Filter by Date Range", choices=["All"], value="All")
        with gr.Row():
            submit_btn = gr.Button("Submit Filters")
        with gr.Row():
            device_cards = gr.DataFrame(label="Device Cards (Usage, Last Log)")
            plot_daily = gr.Image(label="Daily Log Trends")
            plot_uptime = gr.Image(label="Weekly Uptime %")
            anomaly_output = gr.Textbox(label="Anomaly Alerts")
        with gr.Row():
            download_btn = gr.Button("Download PDF Report", visible=FPDF_AVAILABLE)  # Hidden when fpdf2 is not installed
            pdf_file = gr.File(label="Download PDF", visible=FPDF_AVAILABLE)
            error_box = gr.Textbox(label="Status/Error Message", visible=True, interactive=False)
        # Connect the components
        csv_input.change(
            fn=upload_csv,
            inputs=csv_input,
            outputs=[lab_dropdown, type_dropdown, date_dropdown, error_box, device_cards, plot_daily, plot_uptime, anomaly_output]
        )
        submit_btn.click(
            fn=filter_and_visualize,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=[device_cards, plot_daily, plot_uptime, anomaly_output, error_box]
        )
        download_btn.click(
            fn=download_pdf,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=pdf_file
        )
    demo.launch()
except Exception as e:
    print(f"Error launching Gradio interface: {str(e)}\n{traceback.format_exc()}")