# LabOpsDashboard / app.py
# (Hugging Face Space file; original upload note: "Update app.py",
#  commit 4be142c, ~25.1 kB)
"""
LabOps Log Analyzer Dashboard with CSV file upload, PDF generation, and Salesforce integration
"""
import gradio as gr
import pandas as pd
from datetime import datetime, timedelta
import logging
import plotly.express as px
from sklearn.ensemble import IsolationForest
from transformers import pipeline
import torch
from concurrent.futures import ThreadPoolExecutor
from simple_salesforce import Salesforce
import os
import json
# Configure logging
# INFO-level with timestamps; all modules below log through the root logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Salesforce configuration
# Connect once at import time using credentials from environment variables.
# On any failure (missing env vars, bad credentials, network error) fall back
# to sf = None so the rest of the app degrades gracefully instead of crashing.
try:
    sf = Salesforce(
        username=os.getenv('SF_USERNAME'),
        password=os.getenv('SF_PASSWORD'),
        security_token=os.getenv('SF_SECURITY_TOKEN'),
        domain='login'  # production login endpoint ('test' would target a sandbox)
    )
    logging.info("Salesforce connection established")
except Exception as e:
    logging.error(f"Failed to connect to Salesforce: {str(e)}")
    sf = None
# Try to import reportlab
# PDF export is optional: when reportlab is not installed the dashboard still
# runs, but generate_pdf_content() returns None and no PDF is offered.
try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
    from reportlab.lib.styles import getSampleStyleSheet
    reportlab_available = True
    logging.info("reportlab module successfully imported")
except ImportError:
    logging.warning("reportlab module not found. PDF generation disabled.")
    reportlab_available = False
# Preload Hugging Face model
# Loading the summarization pipeline once at startup avoids per-request model
# load latency. Unlike the optional integrations above, a failure here is
# fatal (re-raised) because the summary and insights steps depend on it.
logging.info("Preloading Hugging Face model...")
try:
    device = 0 if torch.cuda.is_available() else -1  # 0 = first GPU, -1 = CPU
    summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device=device,
        max_length=50,  # cap generated summary length (tokens)
        min_length=10,
        num_beams=4     # beam search width for higher-quality summaries
    )
    logging.info(f"Hugging Face model preloaded on {'GPU' if device == 0 else 'CPU'}")
except Exception as e:
    logging.error(f"Failed to preload model: {str(e)}")
    raise e
# Fetch valid picklist values from Salesforce
def get_picklist_values(field_name):
    """Return the active picklist values for *field_name* on SmartLog__c.

    Returns an empty list when Salesforce is unavailable, the field is not
    present on the object, or the describe call fails.
    """
    if sf is None:
        return []
    try:
        described = sf.SmartLog__c.describe()
        match = next((f for f in described['fields'] if f['name'] == field_name), None)
        if match is None:
            return []
        return [entry['value'] for entry in match['picklistValues'] if entry['active']]
    except Exception as e:
        logging.error(f"Failed to fetch picklist values for {field_name}: {str(e)}")
        return []
# Cache picklist values at startup
# Query Salesforce once for the org's valid picklist entries; when the query
# fails (or sf is None) fall back to hard-coded defaults so uploads still work.
status_values = get_picklist_values('Status__c') or ["Active", "Inactive", "Pending"]
log_type_values = get_picklist_values('Log_Type__c') or ["Smart Log", "Cell Analysis", "UV Verification"]
logging.info(f"Valid Status__c values: {status_values}")
logging.info(f"Valid Log_Type__c values: {log_type_values}")
# Map invalid picklist values to valid ones
# Lower-cased values commonly found in raw CSV exports are normalized to the
# org's picklist entries before insertion (used by save_to_salesforce).
picklist_mapping = {
    'Status__c': {
        'normal': 'Active',
        'error': 'Inactive',
        'warning': 'Pending',
        'ok': 'Active',
        'failed': 'Inactive'
    },
    'Log_Type__c': {
        'maint': 'Smart Log',
        'error': 'Cell Analysis',
        'ops': 'UV Verification',
        'maintenance': 'Smart Log',
        'cell': 'Cell Analysis',
        'uv': 'UV Verification'
    }
}
# Fetch folder ID for "LabOps Reports"
def get_folder_id(folder_name):
    """Look up the Salesforce Id of the report folder named *folder_name*.

    Returns None when Salesforce is unavailable, the folder does not exist,
    or the SOQL query fails. folder_name is supplied internally (a constant),
    not from user input.
    """
    if sf is None:
        return None
    try:
        soql = f"SELECT Id FROM Folder WHERE Name = '{folder_name}' AND Type = 'Report'"
        response = sf.query(soql)
        if response['totalSize'] == 0:
            logging.error(f"Folder '{folder_name}' not found in Salesforce.")
            return None
        found_id = response['records'][0]['Id']
        logging.info(f"Found folder ID for '{folder_name}': {found_id}")
        return found_id
    except Exception as e:
        logging.error(f"Failed to fetch folder ID for '{folder_name}': {str(e)}")
        return None
# Cache the folder ID at startup
# Resolved once so report creation does not repeat the SOQL lookup per upload.
LABOPS_REPORTS_FOLDER_ID = get_folder_id('LabOps Reports')
# Create Salesforce reports (Usage and AMC Reminders)
def create_salesforce_reports(df):
    """Create two reports in the 'LabOps Reports' folder via the Analytics REST API.

    * A SUMMARY usage report: Active records this month, grouped by device,
      summing usage hours and downtime.
    * A TABULAR AMC-reminders report: Active records whose AMC date falls
      within the next 30 days.

    ``df`` is accepted for interface symmetry with the other pipeline steps
    but is not used: the reports are defined purely by Salesforce metadata.
    Returns a human-readable status string in all cases (never raises).
    """
    if sf is None:
        return "Salesforce connection not available."
    if not LABOPS_REPORTS_FOLDER_ID:
        return "Cannot create reports: 'LabOps Reports' folder not found in Salesforce."
    try:
        # Compute the timestamp once. The original called datetime.now()
        # separately for each report's name and developerName, so the two
        # could straddle a second boundary and end up inconsistent.
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # Usage Report (Summary Report)
        usage_report_metadata = {
            "reportMetadata": {
                "name": f"SmartLog_Usage_Report_{stamp}",
                "developerName": f"SmartLog_Usage_Report_{stamp}",
                "reportType": {
                    "type": "CustomObject",
                    "value": "SmartLog__c"
                },
                "reportFormat": "SUMMARY",
                "reportBooleanFilter": None,
                "reportFilters": [
                    {
                        "column": "SmartLog__c.Status__c",
                        "operator": "equals",
                        "value": "Active"
                    },
                    {
                        "column": "SmartLog__c.Timestamp__c",
                        "operator": "greaterOrEqual",
                        "value": "THIS_MONTH"
                    }
                ],
                # "s!" prefix = SUM aggregate in Analytics report metadata.
                "aggregates": ["s!SmartLog__c.Usage_Hours__c", "s!SmartLog__c.Downtime__c"],
                "groupingsDown": [
                    {
                        "name": "Device_Id__c",
                        "field": "SmartLog__c.Device_Id__c",
                        "sortOrder": "Asc",
                        "sortAggregate": None,
                        "dateGranularity": "None"
                    }
                ],
                "detailColumns": [
                    "SmartLog__c.Device_Id__c",
                    "SmartLog__c.Log_Type__c",
                    "SmartLog__c.Status__c",
                    "SmartLog__c.Timestamp__c",
                    "SmartLog__c.Usage_Hours__c",
                    "SmartLog__c.Downtime__c",
                    "SmartLog__c.AMC_Date__c"
                ],
                "folderId": LABOPS_REPORTS_FOLDER_ID,
                "currency": None
            }
        }
        usage_result = sf.restful('analytics/reports', method='POST', json=usage_report_metadata)
        usage_report_id = usage_result['id']
        logging.info(f"Usage Report created: {usage_report_id}")
        # AMC Reminders Report (Tabular Report)
        amc_report_metadata = {
            "reportMetadata": {
                "name": f"SmartLog_AMC_Reminders_{stamp}",
                "developerName": f"SmartLog_AMC_Reminders_{stamp}",
                "reportType": {
                    "type": "CustomObject",
                    "value": "SmartLog__c"
                },
                "reportFormat": "TABULAR",
                "reportBooleanFilter": None,
                "reportFilters": [
                    {
                        "column": "SmartLog__c.Status__c",
                        "operator": "equals",
                        "value": "Active"
                    },
                    {
                        "column": "SmartLog__c.AMC_Date__c",
                        "operator": "greaterOrEqual",
                        "value": "TODAY"
                    },
                    {
                        "column": "SmartLog__c.AMC_Date__c",
                        "operator": "lessOrEqual",
                        "value": "NEXT_N_DAYS:30"
                    }
                ],
                "detailColumns": [
                    "SmartLog__c.Device_Id__c",
                    "SmartLog__c.AMC_Date__c",
                    "SmartLog__c.Status__c"
                ],
                "folderId": LABOPS_REPORTS_FOLDER_ID,
                "currency": None
            }
        }
        amc_result = sf.restful('analytics/reports', method='POST', json=amc_report_metadata)
        amc_report_id = amc_result['id']
        logging.info(f"AMC Reminders Report created: {amc_report_id}")
        return f"Usage Report ID: {usage_report_id}, AMC Reminders Report ID: {amc_report_id}"
    except Exception as e:
        logging.error(f"Failed to create Salesforce reports: {str(e)}")
        return f"Failed to create reports: {str(e)}"
# Save results to Salesforce SmartLog__c
def save_to_salesforce(df, summary, anomalies, amc_reminders, insights):
    """Insert up to 100 rows of *df* into SmartLog__c via the Bulk API.

    Picklist fields (Status__c, Log_Type__c) are normalized through
    picklist_mapping; rows whose values cannot be mapped are skipped with a
    warning. The ``summary``/``anomalies``/``amc_reminders``/``insights``
    arguments are accepted for interface symmetry but are not persisted.
    Returns a human-readable status string in all cases (never raises).
    """
    if sf is None:
        return "Salesforce connection not available."
    try:
        records = []
        current_date = datetime.now()
        next_30_days = current_date + timedelta(days=30)
        for _, row in df.head(100).iterrows():  # cap rows to limit API usage
            # Validate and map picklist values
            status = str(row['status'])
            log_type = str(row['log_type'])
            # Map Status__c
            if status not in status_values:
                status = picklist_mapping['Status__c'].get(status.lower(), status_values[0] if status_values else None)
                if status is None:
                    logging.warning(f"Skipping record with invalid Status__c: {row['status']}")
                    continue
            # Map Log_Type__c
            if log_type not in log_type_values:
                log_type = picklist_mapping['Log_Type__c'].get(log_type.lower(), log_type_values[0] if log_type_values else None)
                if log_type is None:
                    logging.warning(f"Skipping record with invalid Log_Type__c: {row['log_type']}")
                    continue
            # Ensure AMC_Date__c is in correct format (YYYY-MM-DD or None)
            amc_date_str = row['amc_date'].strftime('%Y-%m-%d') if pd.notna(row['amc_date']) else None
            if amc_date_str:
                amc_date = datetime.strptime(amc_date_str, '%Y-%m-%d')
                # Log if this record qualifies for AMC Reminders
                if status == "Active" and current_date.date() <= amc_date.date() <= next_30_days.date():
                    logging.info(f"Record qualifies for AMC Reminders: Device ID {row['device_id']}, AMC Date {amc_date_str}")
            record = {
                'Device_Id__c': str(row['device_id'])[:50],  # field length limit
                'Log_Type__c': log_type,
                'Status__c': status,
                'Timestamp__c': row['timestamp'].isoformat() if pd.notna(row['timestamp']) else None,
                'Usage_Hours__c': float(row['usage_hours']) if pd.notna(row['usage_hours']) else 0.0,
                'Downtime__c': float(row['downtime']) if pd.notna(row['downtime']) else 0.0,
                'AMC_Date__c': amc_date_str
            }
            records.append(record)
        # Bulk insert to reduce API calls
        if records:
            sf.bulk.SmartLog__c.insert(records)
            logging.info(f"Saved {len(records)} records to Salesforce")
            return f"Saved {len(records)} records to Salesforce."
        # Bug fix: the original fell through and implicitly returned None
        # when every row was skipped; return an explicit status instead.
        return "No valid records to save to Salesforce."
    except Exception as e:
        logging.error(f"Failed to save to Salesforce: {str(e)}")
        return f"Failed to save to Salesforce: {str(e)}"
# Summarize logs
def summarize_logs(df, progress=gr.Progress()):
    """Produce a one-line AI summary of the uploaded logs.

    Builds a short prompt from the device count and the most-used device,
    then runs it through the preloaded BART summarizer. Returns an error
    string (never raises) when summarization fails.
    """
    progress(0.1, "Generating summary report...")
    try:
        device_count = df["device_id"].nunique()
        top_device = "N/A" if df.empty else df.groupby("device_id")["usage_hours"].sum().idxmax()
        prompt = f"Maintenance logs: {device_count} devices. Most used: {top_device}."
        result = summarizer(prompt, max_length=50, min_length=10, do_sample=False)
        logging.info("Summary generated successfully")
        return result[0]["summary_text"]
    except Exception as e:
        logging.error(f"Summary generation failed: {str(e)}")
        return f"Failed to generate summary: {str(e)}"
# Anomaly detection
def detect_anomalies(df, progress=gr.Progress()):
    """Flag unusual (usage_hours, downtime) pairs with an IsolationForest.

    Samples at most 1000 rows for speed, fits an IsolationForest with 10%
    expected contamination, and returns a formatted string listing up to 5
    anomalous rows (or an explanatory message). Never raises.

    Bug fix: the original assigned an 'anomaly' column onto the caller's
    DataFrame (or onto a .sample() view, triggering pandas'
    SettingWithCopyWarning) while sibling analysis steps read the same frame
    from worker threads; predictions are now kept in a local array and the
    input frame is never modified.
    """
    progress(0.4, "Detecting anomalies...")
    try:
        if "usage_hours" not in df.columns or "downtime" not in df.columns:
            return "Anomaly detection requires 'usage_hours' and 'downtime' columns."
        sample = df.sample(n=1000, random_state=42) if len(df) > 1000 else df
        features = sample[["usage_hours", "downtime"]].fillna(0)
        iso_forest = IsolationForest(contamination=0.1, random_state=42, n_jobs=-1)
        labels = iso_forest.fit_predict(features)  # -1 marks an outlier
        anomalies = sample.loc[labels == -1, ["device_id", "usage_hours", "downtime", "timestamp"]]
        if anomalies.empty:
            return "No anomalies detected."
        anomaly_lines = ["Detected Anomalies:"]
        for _, row in anomalies.head(5).iterrows():
            anomaly_lines.append(
                f"- Device ID: {row['device_id']}, Usage Hours: {row['usage_hours']}, "
                f"Downtime: {row['downtime']}, Timestamp: {row['timestamp']}"
            )
        return "\n".join(anomaly_lines)
    except Exception as e:
        logging.error(f"Anomaly detection failed: {str(e)}")
        return f"Anomaly detection failed: {str(e)}"
# AMC reminders (identify records for display)
def check_amc_reminders(df, current_date, progress=gr.Progress()):
    """List devices whose AMC date falls within 30 days of *current_date*.

    Returns a formatted string with up to 5 reminders, or an explanatory
    message. Never raises.

    Bug fix: the original converted the 'amc_date' column and added a
    'days_to_amc' column on the shared input frame while sibling analysis
    steps read it concurrently from worker threads; all intermediates are
    now local Series and the input frame is never modified.
    """
    progress(0.6, "Checking AMC reminders...")
    try:
        if "device_id" not in df.columns or "amc_date" not in df.columns:
            return "AMC reminders require 'device_id' and 'amc_date' columns."
        amc_dates = pd.to_datetime(df["amc_date"], errors='coerce')
        current_date = pd.to_datetime(current_date)
        days_to_amc = (amc_dates - current_date).dt.days
        due = (days_to_amc >= 0) & (days_to_amc <= 30)
        reminders = df.loc[due, ["device_id"]].assign(amc_date=amc_dates[due])
        if reminders.empty:
            return "No AMC reminders due within the next 30 days."
        reminder_lines = ["Upcoming AMC Reminders:"]
        for _, row in reminders.head(5).iterrows():
            reminder_lines.append(f"- Device ID: {row['device_id']}, AMC Date: {row['amc_date']}")
        return "\n".join(reminder_lines)
    except Exception as e:
        logging.error(f"AMC reminder generation failed: {str(e)}")
        return f"AMC reminder generation failed: {str(e)}"
# Dashboard insights
def generate_dashboard_insights(df, progress=gr.Progress()):
    """Produce a one-line AI 'insights' blurb from device count and mean usage.

    Runs a short prompt through the preloaded summarizer; returns an error
    string (never raises) on failure.
    """
    progress(0.8, "Generating dashboard insights...")
    try:
        device_count = df["device_id"].nunique()
        mean_usage = df["usage_hours"].mean() if "usage_hours" in df.columns else 0
        prompt = f"Insights: {device_count} devices, avg usage {mean_usage:.2f} hours."
        return summarizer(prompt, max_length=50, min_length=10, do_sample=False)[0]["summary_text"]
    except Exception as e:
        logging.error(f"Dashboard insights generation failed: {str(e)}")
        return f"Dashboard insights generation failed: {str(e)}"
# Create usage chart
def create_usage_chart(df, progress=gr.Progress()):
    """Build a bar chart of total usage hours for the top 5 devices.

    Returns a plotly Figure, or None if chart creation fails.
    """
    progress(0.9, "Creating usage chart...")
    try:
        totals = df.groupby("device_id")["usage_hours"].sum().reset_index()
        if len(totals) > 5:
            totals = totals.nlargest(5, "usage_hours")  # keep the busiest devices
        palette = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4']
        fig = px.bar(
            totals,
            x="device_id",
            y="usage_hours",
            title="Usage Hours per Device",
            labels={"device_id": "Device ID", "usage_hours": "Usage Hours"},
            color="device_id",
            color_discrete_sequence=palette,
        )
        fig.update_layout(
            title_font_size=16,
            margin=dict(l=20, r=20, t=40, b=20),
            plot_bgcolor="white",
            paper_bgcolor="white",
            font=dict(size=12),
        )
        return fig
    except Exception as e:
        logging.error(f"Failed to create usage chart: {str(e)}")
        return None
# Generate PDF content
def generate_pdf_content(summary, preview, anomalies, amc_reminders, insights):
    """Render the analysis sections into a timestamped PDF file.

    Returns the generated file path, or None when reportlab is unavailable
    or the build fails.
    """
    if not reportlab_available:
        return None
    try:
        pdf_path = f"analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
        doc = SimpleDocTemplate(pdf_path, pagesize=letter)
        styles = getSampleStyleSheet()

        def safe_paragraph(text, style):
            # Encode newlines as <br/> so reportlab preserves line breaks.
            return Paragraph(str(text).replace('\n', '<br/>'), style) if text else Paragraph("", style)

        story = [
            Paragraph("LabOps Log Analysis Report", styles['Title']),
            Paragraph(f"Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", styles['Normal']),
            Spacer(1, 12),
        ]
        sections = [
            ("Summary Report", summary or "No summary available."),
            ("Log Preview", preview or "No preview available."),
            ("Anomaly Detection", anomalies or "No anomalies detected."),
            ("AMC Reminders", amc_reminders or "No AMC reminders."),
            ("Dashboard Insights", insights or "No insights generated."),
        ]
        # A spacer follows every section except the last, as in the original.
        for index, (heading, body) in enumerate(sections):
            story.append(Paragraph(heading, styles['Heading2']))
            story.append(safe_paragraph(body, styles['Normal']))
            if index < len(sections) - 1:
                story.append(Spacer(1, 12))
        doc.build(story)
        logging.info(f"PDF generated at {pdf_path}")
        return pdf_path
    except Exception as e:
        logging.error(f"Failed to generate PDF: {str(e)}")
        return None
# Main Gradio function
async def process_logs(file_obj, progress=gr.Progress()):
    """Orchestrate the full analysis pipeline for one uploaded CSV.

    Always returns a 9-tuple in the order wired to the Gradio outputs:
    (summary, preview, chart, anomalies, amc_reminders, insights,
     pdf_file, salesforce_result, report_result).
    """
    try:
        progress(0, "Starting file processing...")
        # Guard: nothing uploaded -> placeholder text for every output.
        if not file_obj:
            return "No file uploaded.", "No data to preview.", None, "No anomalies detected.", "No AMC reminders.", "No insights generated.", None, "No Salesforce data saved.", "No report created."
        file_name = file_obj.name
        logging.info(f"Processing file: {file_name}")
        if not file_name.endswith(".csv"):
            return "Please upload a CSV file.", "", None, "", "", "", None, "", ""
        # Expected CSV schema; explicit dtypes keep memory low on big files.
        required_columns = ["device_id", "log_type", "status", "timestamp", "usage_hours", "downtime", "amc_date"]
        dtypes = {
            "device_id": "string",
            "log_type": "string",
            "status": "string",
            "usage_hours": "float32",
            "downtime": "float32",
            "amc_date": "string"
        }
        df = pd.read_csv(file_obj, dtype=dtypes)
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return f"Missing columns: {missing_columns}", None, None, None, None, None, None, None, None
        # Parse date columns once, up front; unparseable values become NaT.
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors='coerce')
        df["amc_date"] = pd.to_datetime(df["amc_date"], errors='coerce')
        if df.empty:
            return "No data available.", None, None, None, None, None, None, None, None
        # Fan the independent analysis steps out across worker threads.
        # NOTE(review): the workers share this single DataFrame, and some
        # steps add columns to it in place — confirm those races are benign.
        with ThreadPoolExecutor() as executor:
            future_summary = executor.submit(summarize_logs, df)
            future_anomalies = executor.submit(detect_anomalies, df)
            future_amc = executor.submit(check_amc_reminders, df, datetime.now())
            future_insights = executor.submit(generate_dashboard_insights, df)
            future_chart = executor.submit(create_usage_chart, df)
            future_reports = executor.submit(create_salesforce_reports, df)
            # .result() blocks until each step completes.
            summary = f"Step 1: Summary Report\n{future_summary.result()}"
            anomalies = f"Anomaly Detection\n{future_anomalies.result()}"
            amc_reminders = f"AMC Reminders\n{future_amc.result()}"
            insights = f"Dashboard Insights (AI)\n{future_insights.result()}"
            chart = future_chart.result()
            report_result = future_reports.result()
        # Plain-text preview of the first five rows for the dashboard.
        preview_lines = ["Step 2: Log Preview (First 5 Rows)"]
        for idx, row in df.head(5).iterrows():
            preview_lines.append(
                f"Row {idx + 1}: Device ID: {row['device_id']}, "
                f"Log Type: {row['log_type']}, Status: {row['status']}, "
                f"Timestamp: {row['timestamp']}, Usage Hours: {row['usage_hours']}, "
                f"Downtime: {row['downtime']}, AMC Date: {row['amc_date']}"
            )
        preview = "\n".join(preview_lines)
        # Persist results and render the PDF after analysis completes.
        salesforce_result = save_to_salesforce(df, summary, anomalies, amc_reminders, insights)
        pdf_file = generate_pdf_content(summary, preview, anomalies, amc_reminders, insights)
        progress(1.0, "Done!")
        return summary, preview, chart, anomalies, amc_reminders, insights, pdf_file, salesforce_result, report_result
    except Exception as e:
        logging.error(f"Failed to process file: {str(e)}")
        return f"Error: {str(e)}", None, None, None, None, None, None, None, None
# Gradio Interface
# Built at import time so `iface` exists for the launch block below.
try:
    logging.info("Initializing Gradio interface...")
    # Custom CSS tightens spacing inside the results dashboard.
    with gr.Blocks(css="""
    .dashboard-container {border: 1px solid #e0e0e0; padding: 10px; border-radius: 5px;}
    .dashboard-title {font-size: 24px; font-weight: bold; margin-bottom: 5px;}
    .dashboard-section {margin-bottom: 20px;}
    .dashboard-section h3 {font-size: 18px; margin-bottom: 2px;}
    .dashboard-section p {margin: 1px 0; line-height: 1.2;}
    .dashboard-section ul {margin: 2px 0; padding-left: 20px;}
    """) as iface:
        gr.Markdown("<h1>LabOps Log Analyzer Dashboard (Hugging Face AI)</h1>")
        gr.Markdown("Upload a CSV file to analyze and generate Salesforce reports.")
        with gr.Row():
            # Left column: upload controls.
            with gr.Column(scale=1):
                file_input = gr.File(label="Upload Logs (CSV)", file_types=[".csv"])
                submit_button = gr.Button("Analyze", variant="primary")
            # Right column: analysis results dashboard.
            with gr.Column(scale=2):
                with gr.Group(elem_classes="dashboard-container"):
                    gr.Markdown("<div class='dashboard-title'>Analysis Results</div>")
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 1: Summary Report")
                        summary_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 2: Log Preview")
                        preview_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 3: Usage Chart")
                        chart_output = gr.Plot()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 4: Anomaly Detection")
                        anomaly_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 5: AMC Reminders")
                        amc_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 6: Insights (AI)")
                        insights_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Salesforce Integration")
                        salesforce_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Salesforce Reports")
                        report_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Download Report")
                        pdf_output = gr.File(label="Download Analysis Report as PDF")
        # Wire the button to process_logs; output order must match the
        # 9-tuple that process_logs returns.
        submit_button.click(
            fn=process_logs,
            inputs=[file_input],
            outputs=[
                summary_output,
                preview_output,
                chart_output,
                anomaly_output,
                amc_output,
                insights_output,
                pdf_output,
                salesforce_output,
                report_output
            ]
        )
    logging.info("Gradio interface initialized successfully")
except Exception as e:
    logging.error(f"Failed to initialize Gradio interface: {str(e)}")
    raise e
if __name__ == "__main__":
    # Entry point: serve the dashboard on all interfaces at port 7860.
    try:
        logging.info("Launching Gradio interface...")
        iface.launch(server_name="0.0.0.0", server_port=7860, debug=True, share=False)
        logging.info("Gradio interface launched successfully")
    except Exception as launch_error:
        logging.error(f"Failed to launch Gradio interface: {str(launch_error)}")
        print(f"Error launching app: {str(launch_error)}")
        raise launch_error