"""Multi-Device LabOps Dashboard.

A Gradio app that ingests a CSV of smart-lab device logs, lets the user
filter by Lab / equipment Type / date range, renders usage and uptime
charts, flags anomalies, and (when fpdf2 is installed) exports a PDF
report.

Expected CSV columns: DeviceID, Lab, Type, Timestamp, Status, UsageCount.
"""

import io
import sys
import tempfile
import traceback
from datetime import datetime, timedelta

import gradio as gr
import matplotlib  # imported explicitly so matplotlib.__version__ is resolvable below
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Try to import fpdf2, but allow the app to run without it.
# NOTE: the PyPI package is named "fpdf2" but it installs the module "fpdf".
try:
    import fpdf
    from fpdf import FPDF

    FPDF_AVAILABLE = True
    print("FPDF2 successfully loaded.")  # Debug log to confirm fpdf2 installation
except ImportError:
    FPDF_AVAILABLE = False
    fpdf = None
    FPDF = None
    print("FPDF2 not installed. PDF download feature will be disabled.")

# UsageCount above this value is reported as an anomaly.
USAGE_ANOMALY_THRESHOLD = 80

# Debug messages are truncated to this length to avoid Gradio rendering issues.
MAX_DEBUG_LEN = 5000

# Check library versions for debugging.
debug_msg = "Library Versions:\n"
try:
    debug_msg += f"Python: {sys.version}\n"
    debug_msg += f"Gradio: {gr.__version__}\n"
    debug_msg += f"Pandas: {pd.__version__}\n"
    debug_msg += f"Matplotlib: {matplotlib.__version__}\n"
    debug_msg += f"NumPy: {np.__version__}\n"
    if FPDF_AVAILABLE:
        # The version lives on the module, not the FPDF class.
        debug_msg += f"FPDF2: {getattr(fpdf, '__version__', 'unknown')}\n"
    else:
        debug_msg += "FPDF2: Not installed (PDF download feature disabled)\n"
except Exception as e:
    debug_msg += f"Error checking library versions: {str(e)}\n"

# Global DataFrame holding the uploaded CSV data.
df = pd.DataFrame()


def _fig_to_png():
    """Serialize the current matplotlib figure to a PNG BytesIO buffer and close it."""
    buf = io.BytesIO()
    plt.savefig(buf, format="png", bbox_inches="tight")
    plt.close()
    buf.seek(0)
    return buf


def _empty_plot(title, xlabel, ylabel):
    """Render an empty placeholder chart (used for no-data / error states)."""
    plt.figure(figsize=(8, 4))
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    return _fig_to_png()


def _apply_filters(source_df, selected_lab, selected_type, selected_date_range):
    """Apply the Lab / Type / date-range filters to a copy of source_df.

    Returns (filtered DataFrame, list of human-readable log messages).
    The date-range filter is best-effort: a malformed range is logged and skipped.
    """
    msgs = []
    filtered = source_df.copy()
    msgs.append(f"Initial DataFrame: {len(filtered)} rows")
    if selected_lab != "All":
        filtered = filtered[filtered["Lab"] == selected_lab]
        msgs.append(f"After Lab filter ({selected_lab}): {len(filtered)} rows")
    if selected_type != "All":
        filtered = filtered[filtered["Type"] == selected_type]
        msgs.append(f"After Type filter ({selected_type}): {len(filtered)} rows")
    if (selected_date_range not in ("All", "No data available.")
            and not source_df['Timestamp'].isna().all()):
        try:
            start_date, end_date = selected_date_range.split(" to ")
            start_date = pd.to_datetime(start_date)
            # Add one day so the end date itself is included in the range.
            end_date = pd.to_datetime(end_date) + timedelta(days=1)
            filtered = filtered[(filtered["Timestamp"] >= start_date)
                                & (filtered["Timestamp"] < end_date)]
            msgs.append(f"After Date Range filter ({start_date} to {end_date}): "
                        f"{len(filtered)} rows")
        except Exception as e:
            msgs.append(f"Error parsing date range: {str(e)}\n{traceback.format_exc()}")
    return filtered, msgs


def upload_csv(file):
    """Load and validate an uploaded CSV, then render an initial view.

    Returns 8 values matching the csv_input.change outputs:
      (lab dropdown update, type dropdown update, date dropdown update,
       status/debug text, device table, daily-trend PNG, uptime PNG, anomaly text)
    """
    global df
    debug_msg_local = debug_msg + "\nStarting CSV upload process...\n"

    def _failure(msg):
        # Uniform failure path: reset every dropdown and clear all outputs.
        msg = msg[:MAX_DEBUG_LEN]
        print(msg)  # Log to console for debugging
        return (gr.update(choices=["All"], value="All"),
                gr.update(choices=["All"], value="All"),
                gr.update(choices=["All"], value="All"),
                msg, None, None, None, None)

    try:
        if file is None:
            debug_msg_local += "No file uploaded. Please upload a CSV file.\n"
            return _failure(debug_msg_local)

        # Read the CSV file with encoding fallback (utf-8, then latin1).
        debug_msg_local += "Reading CSV file...\n"
        try:
            df = pd.read_csv(file, encoding='utf-8')
        except UnicodeDecodeError:
            debug_msg_local += ("Error: CSV file encoding is not UTF-8. "
                                "Trying latin1 encoding...\n")
            df = pd.read_csv(file, encoding='latin1')
        except Exception as e:
            debug_msg_local += f"Error reading CSV file: {str(e)}\n{traceback.format_exc()}\n"
            return _failure(debug_msg_local)

        if df.empty:
            debug_msg_local += "The uploaded CSV file is empty.\n"
            return _failure(debug_msg_local)

        # Debug: show the CSV column names (limit verbosity).
        debug_msg_local += f"CSV Columns: {', '.join(df.columns)}\n"

        # Validate required columns.
        required_columns = {'DeviceID', 'Lab', 'Type', 'Timestamp', 'Status', 'UsageCount'}
        if not required_columns.issubset(df.columns):
            missing_cols = required_columns - set(df.columns)
            debug_msg_local += (f"Error: CSV is missing required columns: "
                                f"{', '.join(missing_cols)}\n")
            return _failure(debug_msg_local)

        # Debug: check data types and sample values (limit to 5 rows).
        debug_msg_local += f"Data Types:\n{df.dtypes.to_string()}\n"
        debug_msg_local += f"Sample Values (first 5 rows):\n{df.head(5).to_string()}\n"

        # Reject empty / all-NaN Lab or Type columns.
        if df['Lab'].dropna().empty:
            debug_msg_local += "Error: Lab column is empty or contains only NaN values.\n"
        if df['Type'].dropna().empty:
            debug_msg_local += "Error: Type column is empty or contains only NaN values.\n"
        if df['Lab'].dropna().empty or df['Type'].dropna().empty:
            return _failure(debug_msg_local)

        # Convert Timestamp to datetime with a specific format and fallback.
        debug_msg_local += "Converting Timestamp column...\n"
        timestamps_invalid = True  # assume invalid until parsing proves otherwise
        try:
            df['Timestamp'] = pd.to_datetime(df['Timestamp'],
                                             format='%Y-%m-%d %H:%M:%S',
                                             errors='coerce')
            # If parsing failed for some rows, retry with generic parsing.
            if df['Timestamp'].isna().any():
                debug_msg_local += ("Some timestamps failed to parse with format "
                                    "'%Y-%m-%d %H:%M:%S'. Falling back to generic "
                                    "parsing...\n")
                df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
            timestamps_invalid = df['Timestamp'].isna().all()
            if timestamps_invalid:
                debug_msg_local += ("Warning: All Timestamp values are invalid or "
                                    "unparseable. Date range filtering will be disabled.\n")
        except Exception as e:
            debug_msg_local += (f"Error parsing Timestamp column: {str(e)}\n"
                                f"{traceback.format_exc()}\n")
            return _failure(debug_msg_local)

        # Extract unique values for the dropdowns.
        debug_msg_local += "Extracting unique values for dropdowns...\n"
        labs = ['All'] + sorted(str(lab) for lab in df['Lab'].dropna().unique())
        types = ['All'] + sorted(str(v) for v in df['Type'].dropna().unique())
        debug_msg_local += f"Lab options: {', '.join(labs)}\nType options: {', '.join(types)}\n"

        # Build the date-range dropdown choices.
        if timestamps_invalid:
            date_ranges = ['All']
            debug_msg_local += "Date range dropdown disabled due to invalid timestamps.\n"
        else:
            min_date = df['Timestamp'].min()
            max_date = df['Timestamp'].max()
            if pd.isna(min_date) or pd.isna(max_date):
                date_ranges = ['All']
                debug_msg_local += ("Warning: Could not determine date range due to "
                                    "invalid timestamps.\n")
            else:
                min_date_str = min_date.strftime('%Y-%m-%d')
                max_date_str = max_date.strftime('%Y-%m-%d')
                date_ranges = ['All', f"{min_date_str} to {max_date_str}"]
                debug_msg_local += f"Date Range: {min_date_str} to {max_date_str}\n"

        # Automatically trigger an initial visualization with default filters.
        debug_msg_local += "Triggering initial visualization with default filters...\n"
        try:
            device_cards, plot_daily, plot_uptime, anomaly_text, filter_msg = \
                filter_and_visualize("All", "All", "All")
            debug_msg_local += f"Initial Filter Result: {filter_msg}\n"
        except Exception as e:
            debug_msg_local += (f"Initial Filter Error: {str(e)}\n"
                                f"{traceback.format_exc()}\n")
            device_cards, plot_daily, plot_uptime, anomaly_text = None, None, None, None

        debug_msg_local = debug_msg_local[:MAX_DEBUG_LEN]
        print(debug_msg_local)
        return (gr.update(choices=labs, value="All"),
                gr.update(choices=types, value="All"),
                gr.update(choices=date_ranges, value="All"),
                debug_msg_local, device_cards, plot_daily, plot_uptime, anomaly_text)
    except Exception as e:
        debug_msg_local += f"Failed to process CSV: {str(e)}\n{traceback.format_exc()}\n"
        return _failure(debug_msg_local)


def filter_and_visualize(selected_lab, selected_type, selected_date_range):
    """Filter the global df and build the dashboard views.

    Returns (device table, daily-trend PNG buffer, uptime PNG buffer,
    anomaly text, status/debug text).
    """
    global df
    error_msg = "Starting filter and visualize process...\n"
    try:
        if df.empty:
            error_msg += "No data available.\n"
            print(error_msg)
            return None, None, None, None, error_msg

        error_msg += (f"Applying filters: Lab={selected_lab}, Type={selected_type}, "
                      f"Date Range={selected_date_range}\n")

        filtered_df, filter_msgs = _apply_filters(df, selected_lab, selected_type,
                                                  selected_date_range)
        error_msg += "\n".join(filter_msgs) + "\n"

        if filtered_df.empty:
            error_msg += "No data matches the selected filters.\n"
            print(error_msg)
            return None, None, None, None, error_msg

        # Debug: log the filtered DataFrame (limit verbosity).
        error_msg += (f"Filtered DataFrame (first 5 rows):\n"
                      f"{filtered_df.head(5).to_string()}\n")

        # Device cards: most recent log first.
        device_cards = (filtered_df[['DeviceID', 'Lab', 'Type', 'UsageCount', 'Timestamp']]
                        .sort_values(by='Timestamp', ascending=False))

        # Daily Log Trends (line chart).
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Daily Log Trends.\n"
                plot_daily = _empty_plot("Daily Log Trends - No Data (Invalid Timestamps)",
                                         "Date", "Number of Logs")
            else:
                daily_logs = filtered_df.groupby(filtered_df['Timestamp'].dt.date).size()
                if daily_logs.empty:
                    error_msg += "Warning: No data for Daily Log Trends.\n"
                    plot_daily = _empty_plot("Daily Log Trends - No Data",
                                             "Date", "Number of Logs")
                else:
                    plt.figure(figsize=(8, 4))
                    daily_logs.plot(kind='line', marker='o', color='blue')
                    plt.title("Daily Log Trends")
                    plt.xlabel("Date")
                    plt.ylabel("Number of Logs")
                    plt.xticks(rotation=45)
                    plot_daily = _fig_to_png()
        except Exception as e:
            error_msg += (f"Error generating Daily Log Trends: {str(e)}\n"
                          f"{traceback.format_exc()}\n")
            plot_daily = _empty_plot("Daily Log Trends - Error", "Date", "Number of Logs")

        # Weekly Uptime % (bar chart) over the last 7 days of filtered data.
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Weekly Uptime.\n"
                plot_uptime = _empty_plot("Weekly Uptime % - No Data (Invalid Timestamps)",
                                          "Date", "Uptime %")
            else:
                end_date = filtered_df['Timestamp'].max()
                start_date = end_date - timedelta(days=7)
                weekly_df = filtered_df[(filtered_df['Timestamp'] >= start_date)
                                        & (filtered_df['Timestamp'] <= end_date)]
                if weekly_df.empty:
                    error_msg += ("Warning: No data for Weekly Uptime % "
                                  "(date range too narrow).\n")
                    plot_uptime = _empty_plot("Weekly Uptime % - No Data", "Date", "Uptime %")
                else:
                    # Uptime per day = share of logs with Status == 'Up'.
                    uptime = (weekly_df.groupby(weekly_df['Timestamp'].dt.date)['Status']
                              .apply(lambda x: (x == 'Up').mean() * 100))
                    plt.figure(figsize=(8, 4))
                    uptime.plot(kind='bar', color='green')
                    plt.title("Weekly Uptime %")
                    plt.xlabel("Date")
                    plt.ylabel("Uptime %")
                    plt.xticks(rotation=45)
                    plot_uptime = _fig_to_png()
        except Exception as e:
            error_msg += (f"Error generating Weekly Uptime %: {str(e)}\n"
                          f"{traceback.format_exc()}\n")
            plot_uptime = _empty_plot("Weekly Uptime % - Error", "Date", "Uptime %")

        # Anomaly alerts: heavy usage or any device reported Down.
        try:
            anomalies = filtered_df[(filtered_df['UsageCount'] > USAGE_ANOMALY_THRESHOLD)
                                    | (filtered_df['Status'] == 'Down')]
            if anomalies.empty:
                anomaly_text = "No anomalies detected."
            else:
                anomaly_text = ("Anomalies Detected:\n"
                                + anomalies[['DeviceID', 'Lab', 'Type', 'Status',
                                             'UsageCount']].to_string(index=False))
        except Exception as e:
            error_msg += (f"Error generating Anomaly Alerts: {str(e)}\n"
                          f"{traceback.format_exc()}\n")
            anomaly_text = "Error generating anomaly alerts."

        error_msg = error_msg[:MAX_DEBUG_LEN]
        print(error_msg)
        return (device_cards, plot_daily, plot_uptime, anomaly_text,
                f"{error_msg}Filters applied successfully.")
    except Exception as e:
        error_msg += (f"Unexpected error in filter_and_visualize: {str(e)}\n"
                      f"{traceback.format_exc()}\n")
        plot_daily = _empty_plot("Daily Log Trends - Error", "Date", "Number of Logs")
        plot_uptime = _empty_plot("Weekly Uptime % - Error", "Date", "Uptime %")
        error_msg = error_msg[:MAX_DEBUG_LEN]
        print(error_msg)
        return None, plot_daily, plot_uptime, "Error generating anomaly alerts.", error_msg


def download_pdf(selected_lab, selected_type, selected_date_range):
    """Generate a PDF report of the filtered logs; return its file path (or None).

    gr.File outputs expect a file path, so the PDF is written to a temp file.
    NOTE(review): the built-in Arial font is latin-1 only; non-latin characters
    in the CSV may raise inside fpdf2 — caught by the blanket handler below.
    """
    global df
    try:
        if not FPDF_AVAILABLE:
            print("PDF download feature disabled: fpdf2 module not installed.")
            return None
        if df.empty:
            return None

        filtered_df, _ = _apply_filters(df, selected_lab, selected_type,
                                        selected_date_range)
        if filtered_df.empty:
            return None

        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt="LabOps Dashboard Report", ln=True, align='C')
        pdf.ln(10)
        for _, row in filtered_df.iterrows():
            line = (f"{row['Timestamp']} | {row['DeviceID']} | {row['Lab']} | "
                    f"{row['Type']} | {row['Status']} | {row['UsageCount']}")
            pdf.multi_cell(0, 10, txt=line)

        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
        tmp.close()
        pdf.output(tmp.name)
        return tmp.name
    except Exception as e:
        print(f"Error in download_pdf: {str(e)}\n{traceback.format_exc()}")
        return None


# Build the Gradio interface.
try:
    with gr.Blocks() as demo:
        gr.Markdown("🧪 **Multi-Device LabOps Dashboard**\n"
                    "Monitor smart lab devices, visualize logs, and generate PDF reports.")
        with gr.Row():
            csv_input = gr.File(label="Upload Device Logs CSV", file_types=[".csv"])
        with gr.Row():
            lab_dropdown = gr.Dropdown(label="Filter by Lab", choices=["All"], value="All")
            type_dropdown = gr.Dropdown(label="Filter by Equipment Type",
                                        choices=["All"], value="All")
            date_dropdown = gr.Dropdown(label="Filter by Date Range",
                                        choices=["All"], value="All")
        with gr.Row():
            submit_btn = gr.Button("Submit Filters")
        with gr.Row():
            device_cards = gr.DataFrame(label="Device Cards (Usage, Last Log)")
            plot_daily = gr.Image(label="Daily Log Trends")
            plot_uptime = gr.Image(label="Weekly Uptime %")
        anomaly_output = gr.Textbox(label="Anomaly Alerts")
        with gr.Row():
            download_btn = gr.Button("Download PDF Report", visible=True)
            pdf_output = gr.File(label="Download PDF")
        error_box = gr.Textbox(label="Status/Error Message", visible=True,
                               interactive=False)

        # Connect the components. upload_csv returns gr.update objects for the
        # dropdowns so each component appears exactly once in the outputs list.
        csv_input.change(
            fn=upload_csv,
            inputs=csv_input,
            outputs=[lab_dropdown, type_dropdown, date_dropdown, error_box,
                     device_cards, plot_daily, plot_uptime, anomaly_output],
        )
        submit_btn.click(
            fn=filter_and_visualize,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=[device_cards, plot_daily, plot_uptime, anomaly_output, error_box],
        )
        download_btn.click(
            fn=download_pdf,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=pdf_output,
        )
    demo.launch()
except Exception as e:
    print(f"Error launching Gradio interface: {str(e)}\n{traceback.format_exc()}")