# Hugging Face Space file-viewer header (non-code residue, kept as a comment):
# RathodHarish's picture — "Update app.py" — commit 28b9a07 (verified) — raw / history / blame — 17 kB
import io
import sys
import tempfile
from datetime import datetime, timedelta

import gradio as gr
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
from fpdf import FPDF
# ---------------------------------------------------------------------------
# Collect library versions once at startup for the debug output. Each lookup
# gets its own try/except so one failing library (e.g. a missing attribute or
# import) does not hide the versions of the remaining libraries — the
# original single try aborted on the first failure.
# ---------------------------------------------------------------------------
debug_msg = "Library Versions:\n"
for _label, _getter in [
    ("Python", lambda: sys.version),
    ("Gradio", lambda: gr.__version__),
    ("Pandas", lambda: pd.__version__),
    ("Matplotlib", lambda: matplotlib.__version__),  # needs top-level `import matplotlib`
    # NOTE(review): pyfpdf exposes FPDF_VERSION at module level; confirm the
    # attribute is reachable via the FPDF class in the installed version.
    ("FPDF", lambda: FPDF.FPDF_VERSION),
]:
    try:
        debug_msg += f"{_label}: {_getter()}\n"
    except Exception as e:
        debug_msg += f"Error checking {_label} version: {str(e)}\n"

# Global DataFrame holding the most recently uploaded CSV; empty until upload.
df = pd.DataFrame()
def upload_csv(file):
    """Load an uploaded CSV into the global ``df`` and build the filter UI state.

    Parameters
    ----------
    file : file-like or None
        File object supplied by the Gradio ``File`` component.

    Returns
    -------
    tuple
        An 11-tuple matching the ``csv_input.change`` outputs:
        (lab choices, type choices, date-range choices, debug text,
        lab value, type value, date-range value,
        device_cards, plot_daily, plot_uptime, anomaly_text).
    """
    global df
    debug_msg_local = debug_msg + "\nStarting CSV upload process...\n"
    # Assume timestamps are unusable until proven otherwise. This also fixes a
    # latent NameError: the flag was previously assigned only inside the
    # conversion try-block but read unconditionally further down.
    timestamps_invalid = True
    try:
        if file is None:
            return ["All"], ["All"], ["All"], f"{debug_msg_local}No file uploaded. Please upload a CSV file.", "All", "All", "All", None, None, None, None
        # Read the CSV file
        debug_msg_local += "Reading CSV file...\n"
        df = pd.read_csv(file)
        if df.empty:
            return ["All"], ["All"], ["All"], f"{debug_msg_local}The uploaded CSV file is empty.", "All", "All", "All", None, None, None, None
        # Debug: show the CSV content and column names.
        debug_msg_local += f"CSV Columns: {', '.join(df.columns)}\nRaw CSV Content:\n{df.to_string()}\n\n"
        # Validate the schema before touching any column.
        required_columns = {'DeviceID', 'Lab', 'Type', 'Timestamp', 'Status', 'UsageCount'}
        if not required_columns.issubset(df.columns):
            missing_cols = required_columns - set(df.columns)
            return ["All"], ["All"], ["All"], f"{debug_msg_local}Error: CSV is missing required columns: {', '.join(missing_cols)}", "All", "All", "All", None, None, None, None
        # Debug: check data types and sample values.
        debug_msg_local += f"Data Types:\n{df.dtypes}\n\nSample Values:\n{df.head().to_string()}\n\n"
        # Reject files whose Lab/Type columns carry no usable values at all.
        if df['Lab'].dropna().empty:
            debug_msg_local += "Error: Lab column is empty or contains only NaN values.\n"
        if df['Type'].dropna().empty:
            debug_msg_local += "Error: Type column is empty or contains only NaN values.\n"
        if df['Lab'].dropna().empty or df['Type'].dropna().empty:
            return ["All"], ["All"], ["All"], debug_msg_local, "All", "All", "All", None, None, None, None
        # Convert Timestamp to datetime; unparseable values become NaT.
        debug_msg_local += "Converting Timestamp column...\n"
        try:
            df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce')
            debug_msg_local += f"Timestamps after conversion:\n{df['Timestamp'].to_string()}\n\n"
            timestamps_invalid = df['Timestamp'].isna().all()
            if timestamps_invalid:
                debug_msg_local += "Warning: All Timestamp values are invalid or unparseable. Date range filtering will be disabled.\n"
        except Exception as e:
            return ["All"], ["All"], ["All"], f"{debug_msg_local}Error: Failed to parse Timestamp column: {str(e)}", "All", "All", "All", None, None, None, None
        # Build dropdown choice lists: 'All' first, then sorted unique values.
        debug_msg_local += "Extracting unique values for dropdowns...\n"
        labs = ['All'] + sorted([str(lab) for lab in df['Lab'].dropna().unique()])
        types = ['All'] + sorted([str(v) for v in df['Type'].dropna().unique()])
        debug_msg_local += f"Lab options: {', '.join(labs)}\nType options: {', '.join(types)}\n\n"
        # Derive the selectable date range from the parsed timestamps.
        if timestamps_invalid:
            date_ranges = ['All']
            debug_msg_local += "Date range dropdown disabled due to invalid timestamps.\n"
        else:
            min_date = df['Timestamp'].min()
            max_date = df['Timestamp'].max()
            if pd.isna(min_date) or pd.isna(max_date):
                date_ranges = ['All']
                debug_msg_local += "Warning: Could not determine date range due to invalid timestamps.\n"
            else:
                min_date_str = min_date.strftime('%Y-%m-%d')
                max_date_str = max_date.strftime('%Y-%m-%d')
                date_ranges = ['All', f"{min_date_str} to {max_date_str}"]
                debug_msg_local += f"Date Range: {min_date_str} to {max_date_str}\n"
        # Render an initial dashboard with the default ("All") filters so the
        # user sees results immediately after uploading.
        debug_msg_local += "Triggering initial visualization with default filters...\n"
        try:
            device_cards, plot_daily, plot_uptime, anomaly_text, filter_msg = filter_and_visualize("All", "All", "All")
            debug_msg_local += f"Initial Filter Result: {filter_msg}\n"
        except Exception as e:
            debug_msg_local += f"Initial Filter Error: {str(e)}\n"
            device_cards, plot_daily, plot_uptime, anomaly_text = None, None, None, None
        # NOTE(review): plain lists are returned to Dropdown outputs; newer
        # Gradio versions need gr.update(choices=...) to refresh the choices —
        # verify against the installed Gradio version.
        return labs, types, date_ranges, debug_msg_local, "All", "All", "All", device_cards, plot_daily, plot_uptime, anomaly_text
    except Exception as e:
        return ["All"], ["All"], ["All"], f"{debug_msg_local}Failed to process CSV: {str(e)}", "All", "All", "All", None, None, None, None
def _render_plot(title, xlabel, ylabel, draw=None):
    """Render a matplotlib figure to an in-memory PNG and return the buffer.

    ``draw`` is an optional zero-argument callable that plots data onto the
    current figure; when omitted an empty placeholder chart is produced.
    Extracted because the figure/savefig/close/seek boilerplate was repeated
    seven times in the original function.
    """
    plt.figure(figsize=(8, 4))
    if draw is not None:
        draw()
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    buf = io.BytesIO()
    plt.savefig(buf, format="png", bbox_inches="tight")
    plt.close()
    buf.seek(0)
    return buf

def filter_and_visualize(selected_lab, selected_type, selected_date_range):
    """Filter the global ``df`` by lab/type/date and build dashboard outputs.

    Returns
    -------
    tuple
        (device_cards DataFrame, daily-trend PNG buffer,
        weekly-uptime PNG buffer, anomaly text, status/debug message).
        The first four entries are None when there is no matching data.
    """
    global df
    error_msg = "Starting filter and visualize process...\n"
    try:
        if df.empty:
            return None, None, None, None, f"{error_msg}No data available."
        error_msg += f"Applying filters: Lab={selected_lab}, Type={selected_type}, Date Range={selected_date_range}\n"
        filtered_df = df.copy()
        error_msg += f"Initial DataFrame: {len(filtered_df)} rows\n"
        if selected_lab != "All":
            filtered_df = filtered_df[filtered_df["Lab"] == selected_lab]
            error_msg += f"After Lab filter ({selected_lab}): {len(filtered_df)} rows\n"
        if selected_type != "All":
            filtered_df = filtered_df[filtered_df["Type"] == selected_type]
            error_msg += f"After Type filter ({selected_type}): {len(filtered_df)} rows\n"
        # Date filtering only applies when a real range is selected and at
        # least one timestamp parsed successfully.
        if selected_date_range != "All" and selected_date_range != "No data available." and not df['Timestamp'].isna().all():
            try:
                start_date, end_date = selected_date_range.split(" to ")
                start_date = pd.to_datetime(start_date)
                end_date = pd.to_datetime(end_date) + timedelta(days=1)  # make the end date inclusive
                filtered_df = filtered_df[(filtered_df["Timestamp"] >= start_date) & (filtered_df["Timestamp"] < end_date)]
                error_msg += f"After Date Range filter ({start_date} to {end_date}): {len(filtered_df)} rows\n"
            except Exception as e:
                error_msg += f"Error parsing date range: {str(e)}\n"
        if filtered_df.empty:
            return None, None, None, None, f"{error_msg}No data matches the selected filters."
        error_msg += f"Filtered DataFrame:\n{filtered_df.to_string()}\n"
        # Device cards: most recent log entries first.
        device_cards = filtered_df[['DeviceID', 'Lab', 'Type', 'UsageCount', 'Timestamp']].sort_values(by='Timestamp', ascending=False)
        # --- Daily Log Trends (line chart of log count per day) ---
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Daily Log Trends.\n"
                plot_daily = _render_plot("Daily Log Trends - No Data (Invalid Timestamps)", "Date", "Number of Logs")
            else:
                daily_logs = filtered_df.groupby(filtered_df['Timestamp'].dt.date).size()
                if daily_logs.empty:
                    error_msg += "Warning: No data for Daily Log Trends.\n"
                    plot_daily = _render_plot("Daily Log Trends - No Data", "Date", "Number of Logs")
                else:
                    def _draw_daily():
                        daily_logs.plot(kind='line', marker='o', color='blue')
                        plt.xticks(rotation=45)
                    plot_daily = _render_plot("Daily Log Trends", "Date", "Number of Logs", _draw_daily)
        except Exception as e:
            error_msg += f"Error generating Daily Log Trends: {str(e)}\n"
            plot_daily = _render_plot("Daily Log Trends - Error", "Date", "Number of Logs")
        # --- Weekly Uptime % (bar chart over the last 7 days of data) ---
        try:
            if df['Timestamp'].isna().all():
                error_msg += "Warning: All timestamps are invalid. Skipping Weekly Uptime.\n"
                plot_uptime = _render_plot("Weekly Uptime % - No Data (Invalid Timestamps)", "Date", "Uptime %")
            else:
                end_date = filtered_df['Timestamp'].max()
                start_date = end_date - timedelta(days=7)
                weekly_df = filtered_df[(filtered_df['Timestamp'] >= start_date) & (filtered_df['Timestamp'] <= end_date)]
                if weekly_df.empty:
                    error_msg += "Warning: No data for Weekly Uptime % (date range too narrow).\n"
                    plot_uptime = _render_plot("Weekly Uptime % - No Data", "Date", "Uptime %")
                else:
                    # Uptime = share of 'Up' status records per day, in percent.
                    uptime = weekly_df.groupby(weekly_df['Timestamp'].dt.date)['Status'].apply(lambda x: (x == 'Up').mean() * 100)
                    def _draw_uptime():
                        uptime.plot(kind='bar', color='green')
                        plt.xticks(rotation=45)
                    plot_uptime = _render_plot("Weekly Uptime %", "Date", "Uptime %", _draw_uptime)
        except Exception as e:
            error_msg += f"Error generating Weekly Uptime %: {str(e)}\n"
            plot_uptime = _render_plot("Weekly Uptime % - Error", "Date", "Uptime %")
        # --- Anomaly alerts: heavy usage (>80) or devices reported Down ---
        try:
            anomalies = filtered_df[(filtered_df['UsageCount'] > 80) | (filtered_df['Status'] == 'Down')]
            if anomalies.empty:
                anomaly_text = "No anomalies detected."
            else:
                anomaly_text = "Anomalies Detected:\n" + anomalies[['DeviceID', 'Lab', 'Type', 'Status', 'UsageCount']].to_string(index=False)
        except Exception as e:
            error_msg += f"Error generating Anomaly Alerts: {str(e)}\n"
            anomaly_text = "Error generating anomaly alerts."
        return device_cards, plot_daily, plot_uptime, anomaly_text, f"{error_msg}Filters applied successfully."
    except Exception as e:
        error_msg += f"Unexpected error in filter_and_visualize: {str(e)}\n"
        # Fall back to placeholder charts so the UI still receives images.
        plot_daily = _render_plot("Daily Log Trends - Error", "Date", "Number of Logs")
        plot_uptime = _render_plot("Weekly Uptime % - Error", "Date", "Uptime %")
        return None, plot_daily, plot_uptime, "Error generating anomaly alerts.", error_msg
def download_pdf(selected_lab, selected_type, selected_date_range):
    """Build a PDF report of the filtered logs and return a file path.

    Applies the same Lab/Type/date-range filters as ``filter_and_visualize``,
    writes one line per log record, and returns the path of a temporary
    ``.pdf`` file. The path (rather than an in-memory buffer) is returned
    because ``gr.File`` outputs expect a filepath, and passing a BytesIO to
    ``pdf.output()`` is not reliably supported across fpdf versions.
    Returns None when there is no data to report or on any error.
    """
    global df
    try:
        if df.empty:
            return None
        filtered_df = df.copy()
        if selected_lab != "All":
            filtered_df = filtered_df[filtered_df["Lab"] == selected_lab]
        if selected_type != "All":
            filtered_df = filtered_df[filtered_df["Type"] == selected_type]
        if selected_date_range != "All" and selected_date_range != "No data available." and not df['Timestamp'].isna().all():
            try:
                start_date, end_date = selected_date_range.split(" to ")
                start_date = pd.to_datetime(start_date)
                end_date = pd.to_datetime(end_date) + timedelta(days=1)  # inclusive end date
                filtered_df = filtered_df[(filtered_df["Timestamp"] >= start_date) & (filtered_df["Timestamp"] < end_date)]
            except Exception as e:
                # Mirror filter_and_visualize: skip an unparseable range
                # instead of aborting the whole report.
                print(f"Error parsing date range in download_pdf: {str(e)}")
        if filtered_df.empty:
            return None
        pdf = FPDF()
        pdf.add_page()
        # NOTE(review): the core Arial font is Latin-1 only; non-Latin device
        # names would need a Unicode font via add_font().
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt="LabOps Dashboard Report", ln=True, align='C')
        pdf.ln(10)
        for _, row in filtered_df.iterrows():
            line = f"{row['Timestamp']} | {row['DeviceID']} | {row['Lab']} | {row['Type']} | {row['Status']} | {row['UsageCount']}"
            pdf.multi_cell(0, 10, txt=line)
        # Write to a named temporary file and hand Gradio the path.
        tmp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
        tmp.close()
        pdf.output(tmp.name)
        return tmp.name
    except Exception as e:
        print(f"Error in download_pdf: {str(e)}")
        return None
# Build the Gradio interface.
# NOTE(review): wiring concerns to verify against the installed Gradio version:
#   - upload_csv's outputs list names the lab/type/date dropdowns twice (once
#     for choices, once for value); some Gradio versions reject duplicate
#     output components in one listener.
#   - plot_daily/plot_uptime are io.BytesIO PNG buffers fed to gr.Image,
#     which typically expects a filepath, numpy array, or PIL image.
try:
    with gr.Blocks() as demo:
        gr.Markdown("🧪 **Multi-Device LabOps Dashboard**\nMonitor smart lab devices, visualize logs, and generate PDF reports.")
        # CSV upload row: the sole data source for the dashboard.
        with gr.Row():
            csv_input = gr.File(label="Upload Device Logs CSV", file_types=[".csv"])
        # Filter controls; choices are populated by upload_csv after upload.
        with gr.Row():
            lab_dropdown = gr.Dropdown(label="Filter by Lab", choices=["All"], value="All")
            type_dropdown = gr.Dropdown(label="Filter by Equipment Type", choices=["All"], value="All")
            date_dropdown = gr.Dropdown(label="Filter by Date Range", choices=["All"], value="All")
        with gr.Row():
            submit_btn = gr.Button("Submit Filters")
        # Dashboard outputs: table, two charts, and an anomaly summary.
        with gr.Row():
            device_cards = gr.DataFrame(label="Device Cards (Usage, Last Log)")
            plot_daily = gr.Image(label="Daily Log Trends")
            plot_uptime = gr.Image(label="Weekly Uptime %")
            anomaly_output = gr.Textbox(label="Anomaly Alerts")
        with gr.Row():
            download_btn = gr.Button("Download PDF Report")
            error_box = gr.Textbox(label="Status/Error Message", visible=True, interactive=False)
        # Connect the components.
        # Upload refreshes the dropdowns, debug text, and initial dashboard.
        csv_input.change(
            fn=upload_csv,
            inputs=csv_input,
            outputs=[lab_dropdown, type_dropdown, date_dropdown, error_box, lab_dropdown, type_dropdown, date_dropdown, device_cards, plot_daily, plot_uptime, anomaly_output]
        )
        # Re-filter on demand with the currently selected dropdown values.
        submit_btn.click(
            fn=filter_and_visualize,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=[device_cards, plot_daily, plot_uptime, anomaly_output, error_box]
        )
        # PDF export of the currently filtered data.
        download_btn.click(
            fn=download_pdf,
            inputs=[lab_dropdown, type_dropdown, date_dropdown],
            outputs=gr.File(label="Download PDF")
        )
    demo.launch()
except Exception as e:
    print(f"Error launching Gradio interface: {str(e)}")