# Source provenance (was Hugging Face page chrome accidentally included in the paste;
# kept as comments so the file parses as Python):
#   RathodHarish — "Update app.py" — commit 898eeba (verified)
#   raw / history / blame — 19.9 kB
import io
import sys
import traceback
from datetime import datetime, timedelta

import gradio as gr
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Try to import FPDF, but allow the app to run without it.
# BUG FIX: the PyPI package is named "fpdf2" but it installs an importable
# module named "fpdf"; the original `from fpdf2 import FPDF` always raised
# ImportError, so the PDF feature was permanently disabled.
try:
    from fpdf import FPDF
    FPDF_AVAILABLE = True
    print("FPDF2 successfully loaded.")  # Debug log to confirm fpdf2 installation
except ImportError:
    FPDF_AVAILABLE = False
    FPDF = None
    print("FPDF2 not installed. PDF download feature will be disabled.")
# Collect library versions once at startup for debugging. Any failure here is
# folded into the message instead of crashing the app.
debug_msg = "Library Versions:\n"
try:
    debug_msg += f"Python: {sys.version}\n"
    debug_msg += f"Gradio: {gr.__version__}\n"
    debug_msg += f"Pandas: {pd.__version__}\n"
    # BUG FIX: the bare name `matplotlib` was used without being imported
    # (only `matplotlib.pyplot as plt` was), so this line always raised
    # NameError and truncated the version report. `import matplotlib` is now
    # at the top of the file.
    debug_msg += f"Matplotlib: {matplotlib.__version__}\n"
    debug_msg += f"NumPy: {np.__version__}\n"
    if FPDF_AVAILABLE:
        # NOTE(review): the version attribute usually lives on the fpdf
        # *module*, not the FPDF class — if this raises AttributeError it is
        # caught below; confirm against the installed fpdf2 version.
        debug_msg += f"FPDF2: {FPDF.__version__}\n"
    else:
        debug_msg += "FPDF2: Not installed (PDF download feature disabled)\n"
except Exception as e:
    debug_msg += f"Error checking library versions: {str(e)}\n"
# Global DataFrame holding the uploaded CSV data. Starts empty; populated by
# upload_csv() and read by filter_and_visualize() and download_pdf().
df = pd.DataFrame()
def upload_csv(file):
    """Load an uploaded device-log CSV into the global ``df`` and prime the UI.

    Validates the schema, parses the ``Timestamp`` column, derives dropdown
    choices, and runs an initial visualization with default ("All") filters.

    Returns an 11-tuple matching the ``csv_input.change`` outputs:
    (lab choices, type choices, date-range choices, debug text,
     lab value, type value, date value,
     device_cards, plot_daily, plot_uptime, anomaly_text).
    On any failure a default tuple (["All"] choices, "All" values, None
    visuals) is returned together with the accumulated debug message —
    this function never raises to the UI.
    """
    global df
    debug_msg_local = debug_msg + "\nStarting CSV upload process...\n"
    try:
        if file is None:
            debug_msg_local += "No file uploaded. Please upload a CSV file.\n"
            print(debug_msg_local)  # Log to console for debugging
            return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
        # Read the CSV with encoding handling: UTF-8 first, latin1 fallback.
        debug_msg_local += "Reading CSV file...\n"
        try:
            df = pd.read_csv(file, encoding='utf-8')
        except UnicodeDecodeError:
            debug_msg_local += "Error: CSV file encoding is not UTF-8. Trying latin1 encoding...\n"
            df = pd.read_csv(file, encoding='latin1')
        except Exception as e:
            debug_msg_local += f"Error reading CSV file: {str(e)}\n{traceback.format_exc()}\n"
            print(debug_msg_local)
            return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
        if df.empty:
            debug_msg_local += "The uploaded CSV file is empty.\n"
            print(debug_msg_local)
            return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
        # Debug: show the CSV column names (limit verbosity).
        debug_msg_local += f"CSV Columns: {', '.join(df.columns)}\n"
        # Validate the schema the rest of the dashboard relies on.
        required_columns = {'DeviceID', 'Lab', 'Type', 'Timestamp', 'Status', 'UsageCount'}
        if not required_columns.issubset(df.columns):
            missing_cols = required_columns - set(df.columns)
            debug_msg_local += f"Error: CSV is missing required columns: {', '.join(missing_cols)}\n"
            print(debug_msg_local)
            return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
        # Debug: dtypes and a small sample (limit to 5 rows).
        debug_msg_local += f"Data Types:\n{df.dtypes.to_string()}\n"
        debug_msg_local += f"Sample Values (first 5 rows):\n{df.head(5).to_string()}\n"
        # Reject files whose Lab/Type columns carry no usable values.
        if df['Lab'].dropna().empty:
            debug_msg_local += "Error: Lab column is empty or contains only NaN values.\n"
        if df['Type'].dropna().empty:
            debug_msg_local += "Error: Type column is empty or contains only NaN values.\n"
        if df['Lab'].dropna().empty or df['Type'].dropna().empty:
            print(debug_msg_local)
            return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
        # Convert Timestamp to datetime with a specific format and a generic
        # fallback.
        debug_msg_local += "Converting Timestamp column...\n"
        try:
            # BUG FIX: the original wrote the coerced result straight back into
            # df['Timestamp'] before the fallback, so values that failed the
            # strict format were already NaT and the generic re-parse could
            # never recover them. Parse into a temporary series and only
            # assign once the best result is known.
            parsed = pd.to_datetime(df['Timestamp'], format='%Y-%m-%d %H:%M:%S', errors='coerce')
            if parsed.isna().any():
                debug_msg_local += "Some timestamps failed to parse with format '%Y-%m-%d %H:%M:%S'. Falling back to generic parsing...\n"
                parsed = pd.to_datetime(df['Timestamp'], errors='coerce')
            df['Timestamp'] = parsed
            # True when no row produced a valid datetime; disables date filtering.
            timestamps_invalid = df['Timestamp'].isna().all()
            if timestamps_invalid:
                debug_msg_local += "Warning: All Timestamp values are invalid or unparseable. Date range filtering will be disabled.\n"
        except Exception as e:
            debug_msg_local += f"Error parsing Timestamp column: {str(e)}\n{traceback.format_exc()}\n"
            print(debug_msg_local)
            return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
        # Extract unique values for the Lab/Type dropdowns.
        debug_msg_local += "Extracting unique values for dropdowns...\n"
        labs = ['All'] + sorted([str(lab) for lab in df['Lab'].dropna().unique()])
        types = ['All'] + sorted([str(v) for v in df['Type'].dropna().unique()])
        debug_msg_local += f"Lab options: {', '.join(labs)}\nType options: {', '.join(types)}\n"
        # Derive the date-range dropdown choices from the parsed timestamps.
        if timestamps_invalid:
            date_ranges = ['All']
            debug_msg_local += "Date range dropdown disabled due to invalid timestamps.\n"
        else:
            min_date = df['Timestamp'].min()
            max_date = df['Timestamp'].max()
            if pd.isna(min_date) or pd.isna(max_date):
                date_ranges = ['All']
                debug_msg_local += "Warning: Could not determine date range due to invalid timestamps.\n"
            else:
                min_date_str = min_date.strftime('%Y-%m-%d')
                max_date_str = max_date.strftime('%Y-%m-%d')
                date_ranges = ['All', f"{min_date_str} to {max_date_str}"]
                debug_msg_local += f"Date Range: {min_date_str} to {max_date_str}\n"
        # Automatically trigger filter_and_visualize after upload with default
        # filters so the dashboard is populated immediately.
        debug_msg_local += "Triggering initial visualization with default filters...\n"
        try:
            device_cards, plot_daily, plot_uptime, anomaly_text, filter_msg = filter_and_visualize("All", "All", "All")
            debug_msg_local += f"Initial Filter Result: {filter_msg}\n"
        except Exception as e:
            debug_msg_local += f"Initial Filter Error: {str(e)}\n{traceback.format_exc()}\n"
            device_cards, plot_daily, plot_uptime, anomaly_text = None, None, None, None
        # Truncate debug message to prevent Gradio rendering issues.
        debug_msg_local = debug_msg_local[:5000]  # Limit to 5000 characters
        print(debug_msg_local)
        return labs, types, date_ranges, debug_msg_local, "All", "All", "All", device_cards, plot_daily, plot_uptime, anomaly_text
    except Exception as e:
        # Last-resort guard: report the failure in the debug box instead of
        # crashing the Gradio callback.
        debug_msg_local += f"Failed to process CSV: {str(e)}\n{traceback.format_exc()}\n"
        debug_msg_local = debug_msg_local[:5000]  # Limit to 5000 characters
        print(debug_msg_local)
        return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
def _chart_png(title, xlabel, ylabel, draw=None):
    """Render a matplotlib chart to an in-memory PNG and return the buffer.

    draw, when given, is a zero-argument callable that plots the data series
    on the current figure; when None an empty placeholder chart (title and
    axis labels only) is produced. Returns an io.BytesIO rewound to 0.
    """
    plt.figure(figsize=(8, 4))
    if draw is not None:
        draw()
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    buf = io.BytesIO()
    # BUG FIX: the original had the corrupted keyword `bbox RACinches="tight"`
    # on the daily-trend savefig call, which is a syntax error and prevented
    # the whole module from importing.
    plt.savefig(buf, format="png", bbox_inches="tight")
    plt.close()
    buf.seek(0)
    return buf


def filter_and_visualize(selected_lab, selected_type, selected_date_range):
    """Filter the global ``df`` and build the dashboard outputs.

    Applies Lab, Type, and "YYYY-MM-DD to YYYY-MM-DD" date-range filters
    (each skipped when "All"), then produces:
    device-cards table, daily-log-trend PNG, weekly-uptime PNG, anomaly text,
    and a status/debug message — returned as a 5-tuple. Failures degrade to
    placeholder charts / None values with the error folded into the message;
    this function never raises to the UI.
    """
    global df
    error_msg = "Starting filter and visualize process...\n"
    try:
        if df.empty:
            error_msg += "No data available.\n"
            print(error_msg)
            return None, None, None, None, error_msg
        # Debug: log the filter parameters.
        error_msg += f"Applying filters: Lab={selected_lab}, Type={selected_type}, Date Range={selected_date_range}\n"
        # Apply each filter in turn, logging the surviving row count.
        filtered_df = df.copy()
        error_msg += f"Initial DataFrame: {len(filtered_df)} rows\n"
        if selected_lab != "All":
            filtered_df = filtered_df[filtered_df["Lab"] == selected_lab]
            error_msg += f"After Lab filter ({selected_lab}): {len(filtered_df)} rows\n"
        if selected_type != "All":
            filtered_df = filtered_df[filtered_df["Type"] == selected_type]
            error_msg += f"After Type filter ({selected_type}): {len(filtered_df)} rows\n"
        if selected_date_range != "All" and selected_date_range != "No data available." and not df['Timestamp'].isna().all():
            try:
                start_date, end_date = selected_date_range.split(" to ")
                start_date = pd.to_datetime(start_date)
                end_date = pd.to_datetime(end_date) + timedelta(days=1)  # Include end date
                filtered_df = filtered_df[(filtered_df["Timestamp"] >= start_date) & (filtered_df["Timestamp"] < end_date)]
                error_msg += f"After Date Range filter ({start_date} to {end_date}): {len(filtered_df)} rows\n"
            except Exception as e:
                # Best-effort: a malformed range is logged and ignored.
                error_msg += f"Error parsing date range: {str(e)}\n{traceback.format_exc()}\n"
        if filtered_df.empty:
            error_msg += "No data matches the selected filters.\n"
            print(error_msg)
            return None, None, None, None, error_msg
        # Debug: log the filtered DataFrame (limit verbosity).
        error_msg += f"Filtered DataFrame (first 5 rows):\n{filtered_df.head(5).to_string()}\n"
        # Device cards table: most recent log entries first.
        device_cards = filtered_df[['DeviceID', 'Lab', 'Type', 'UsageCount', 'Timestamp']].sort_values(by='Timestamp', ascending=False)

        # Daily Log Trends (line chart): log count per calendar day.
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Daily Log Trends.\n"
                plot_daily = _chart_png("Daily Log Trends - No Data (Invalid Timestamps)", "Date", "Number of Logs")
            else:
                daily_logs = filtered_df.groupby(filtered_df['Timestamp'].dt.date).size()
                if daily_logs.empty:
                    error_msg += "Warning: No data for Daily Log Trends.\n"
                    plot_daily = _chart_png("Daily Log Trends - No Data", "Date", "Number of Logs")
                else:
                    def _draw_daily():
                        daily_logs.plot(kind='line', marker='o', color='blue')
                        plt.xticks(rotation=45)
                    plot_daily = _chart_png("Daily Log Trends", "Date", "Number of Logs", _draw_daily)
        except Exception as e:
            error_msg += f"Error generating Daily Log Trends: {str(e)}\n{traceback.format_exc()}\n"
            plot_daily = _chart_png("Daily Log Trends - Error", "Date", "Number of Logs")

        # Weekly Uptime % (bar chart): share of 'Up' statuses per day over the
        # last 7 days of the filtered data.
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Weekly Uptime.\n"
                plot_uptime = _chart_png("Weekly Uptime % - No Data (Invalid Timestamps)", "Date", "Uptime %")
            else:
                end_date = filtered_df['Timestamp'].max()
                start_date = end_date - timedelta(days=7)
                weekly_df = filtered_df[(filtered_df['Timestamp'] >= start_date) & (filtered_df['Timestamp'] <= end_date)]
                if weekly_df.empty:
                    error_msg += "Warning: No data for Weekly Uptime % (date range too narrow).\n"
                    plot_uptime = _chart_png("Weekly Uptime % - No Data", "Date", "Uptime %")
                else:
                    uptime = weekly_df.groupby(weekly_df['Timestamp'].dt.date)['Status'].apply(lambda x: (x == 'Up').mean() * 100)
                    def _draw_uptime():
                        uptime.plot(kind='bar', color='green')
                        plt.xticks(rotation=45)
                    plot_uptime = _chart_png("Weekly Uptime %", "Date", "Uptime %", _draw_uptime)
        except Exception as e:
            error_msg += f"Error generating Weekly Uptime %: {str(e)}\n{traceback.format_exc()}\n"
            plot_uptime = _chart_png("Weekly Uptime % - Error", "Date", "Uptime %")

        # Anomaly alerts: high usage (>80) or any device reported Down.
        try:
            anomalies = filtered_df[(filtered_df['UsageCount'] > 80) | (filtered_df['Status'] == 'Down')]
            if anomalies.empty:
                anomaly_text = "No anomalies detected."
            else:
                anomaly_text = "Anomalies Detected:\n" + anomalies[['DeviceID', 'Lab', 'Type', 'Status', 'UsageCount']].to_string(index=False)
        except Exception as e:
            error_msg += f"Error generating Anomaly Alerts: {str(e)}\n{traceback.format_exc()}\n"
            anomaly_text = "Error generating anomaly alerts."
        error_msg = error_msg[:5000]  # Limit to 5000 characters
        print(error_msg)
        return device_cards, plot_daily, plot_uptime, anomaly_text, f"{error_msg}Filters applied successfully."
    except Exception as e:
        # Last-resort guard: return error placeholder charts instead of raising.
        error_msg += f"Unexpected error in filter_and_visualize: {str(e)}\n{traceback.format_exc()}\n"
        plot_daily = _chart_png("Daily Log Trends - Error", "Date", "Number of Logs")
        plot_uptime = _chart_png("Weekly Uptime % - Error", "Date", "Uptime %")
        error_msg = error_msg[:5000]  # Limit to 5000 characters
        print(error_msg)
        return None, plot_daily, plot_uptime, "Error generating anomaly alerts.", error_msg
def download_pdf(selected_lab, selected_type, selected_date_range):
    """Generate a PDF report of the filtered device logs.

    Applies the same Lab/Type/date-range filters as filter_and_visualize,
    then writes one line per log row to a PDF. Returns the path of a
    temporary ``.pdf`` file (the form Gradio's File output expects), or
    None when fpdf is unavailable, no data matches, or generation fails.
    """
    global df
    try:
        if not FPDF_AVAILABLE:
            print("PDF download feature disabled: fpdf2 module not installed.")
            return None
        if df.empty:
            return None
        filtered_df = df.copy()
        if selected_lab != "All":
            filtered_df = filtered_df[filtered_df["Lab"] == selected_lab]
        if selected_type != "All":
            filtered_df = filtered_df[filtered_df["Type"] == selected_type]
        if selected_date_range != "All" and selected_date_range != "No data available." and not df['Timestamp'].isna().all():
            start_date, end_date = selected_date_range.split(" to ")
            start_date = pd.to_datetime(start_date)
            end_date = pd.to_datetime(end_date) + timedelta(days=1)  # include the end date
            filtered_df = filtered_df[(filtered_df["Timestamp"] >= start_date) & (filtered_df["Timestamp"] < end_date)]
        if filtered_df.empty:
            return None
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt="LabOps Dashboard Report", ln=True, align='C')
        pdf.ln(10)
        # One pipe-delimited line per log row.
        for index, row in filtered_df.iterrows():
            line = f"{row['Timestamp']} | {row['DeviceID']} | {row['Lab']} | {row['Type']} | {row['Status']} | {row['UsageCount']}"
            pdf.multi_cell(0, 10, txt=line)
        # BUG FIX: the original passed an io.BytesIO to pdf.output() and
        # returned the buffer; fpdf's output(name) expects a file path, and
        # Gradio's File output expects a path string, so the download failed.
        # Write to a named temp file and return its path instead.
        import tempfile
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
        tmp.close()
        pdf.output(tmp.name)
        return tmp.name
    except Exception as e:
        print(f"Error in download_pdf: {str(e)}\n{traceback.format_exc()}")
        return None
# Build the Gradio interface: upload → filter dropdowns → dashboard outputs.
try:
    with gr.Blocks() as demo:
        gr.Markdown("🧪 **Multi-Device LabOps Dashboard**\nMonitor smart lab devices, visualize logs, and generate PDF reports.")
        with gr.Row():
            csv_input = gr.File(label="Upload Device Logs CSV", file_types=[".csv"])
        with gr.Row():
            # Dropdowns start with only "All"; upload_csv repopulates them.
            lab_dropdown = gr.Dropdown(label="Filter by Lab", choices=["All"], value="All")
            type_dropdown = gr.Dropdown(label="Filter by Equipment Type", choices=["All"], value="All")
            date_dropdown = gr.Dropdown(label="Filter by Date Range", choices=["All"], value="All")
        with gr.Row():
            submit_btn = gr.Button("Submit Filters")
        with gr.Row():
            device_cards = gr.DataFrame(label="Device Cards (Usage, Last Log)")
            plot_daily = gr.Image(label="Daily Log Trends")
            plot_uptime = gr.Image(label="Weekly Uptime %")
            anomaly_output = gr.Textbox(label="Anomaly Alerts")
        with gr.Row():
            download_btn = gr.Button("Download PDF Report", visible=True)  # Always visible since fpdf2 should be installed
            error_box = gr.Textbox(label="Status/Error Message", visible=True, interactive=False)
        # Connect the components.
        # NOTE(review): lab/type/date dropdowns appear TWICE in this outputs
        # list (slots 1-3 receive choice lists, slots 5-7 receive "All"
        # values, matching upload_csv's 11-tuple). Some Gradio versions
        # reject duplicate output components, and returning a plain list to a
        # Dropdown may set its value rather than its choices — verify against
        # the installed Gradio version (gr.update(choices=..., value=...) is
        # the conventional form).
        csv_input.change(
            fn=upload_csv,
            inputs=csv_input,
            outputs=[lab_dropdown, type_dropdown, date_dropdown, error_box, lab_dropdown, type_dropdown, date_dropdown, device_cards, plot_daily, plot_uptime, anomaly_output]
        )
        submit_btn.click(
            fn=filter_and_visualize,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=[device_cards, plot_daily, plot_uptime, anomaly_output, error_box]
        )
        # NOTE(review): the gr.File here is constructed inline inside the
        # event wiring, so it is never rendered in the layout — confirm the
        # produced download link is actually visible to the user.
        download_btn.click(
            fn=download_pdf,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=gr.File(label="Download PDF")
        )
    demo.launch()
except Exception as e:
    print(f"Error launching Gradio interface: {str(e)}\n{traceback.format_exc()}")