"""
LabOps Log Analyzer Dashboard with CSV file upload, PDF generation, and Salesforce integration
"""
import gradio as gr
import pandas as pd
from datetime import datetime, timedelta
import logging
import plotly.express as px
from sklearn.ensemble import IsolationForest
from transformers import pipeline
import torch
from concurrent.futures import ThreadPoolExecutor
from simple_salesforce import Salesforce
import os
import json
from xml.sax.saxutils import escape as xml_escape

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Salesforce configuration.
# Credentials come from the environment; on any failure `sf` is set to None and
# every Salesforce-facing function below degrades to a "not available" message.
try:
    sf = Salesforce(
        username=os.getenv('SF_USERNAME'),
        password=os.getenv('SF_PASSWORD'),
        security_token=os.getenv('SF_SECURITY_TOKEN'),
        domain='login'
    )
    logging.info("Salesforce connection established")
except Exception as e:
    logging.error(f"Failed to connect to Salesforce: {str(e)}")
    sf = None

# Try to import reportlab (optional dependency; PDF export is skipped without it)
try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
    from reportlab.lib.styles import getSampleStyleSheet
    reportlab_available = True
    logging.info("reportlab module successfully imported")
except ImportError:
    logging.warning("reportlab module not found. PDF generation disabled.")
    reportlab_available = False

# Preload Hugging Face model (a failure here is fatal: the app cannot summarize)
logging.info("Preloading Hugging Face model...")
try:
    device = 0 if torch.cuda.is_available() else -1
    summarizer = pipeline(
        "summarization",
        model="facebook/bart-large-cnn",
        device=device,
        max_length=50,
        min_length=10,
        num_beams=4
    )
    logging.info(f"Hugging Face model preloaded on {'GPU' if device == 0 else 'CPU'}")
except Exception as e:
    logging.error(f"Failed to preload model: {str(e)}")
    raise


# Fetch valid picklist values from Salesforce
def get_picklist_values(field_name):
    """Return the active picklist values of ``SmartLog__c.<field_name>``.

    Returns an empty list when Salesforce is unavailable, the field is not
    found in the object description, or the describe call fails.
    """
    if sf is None:
        return []
    try:
        obj_desc = sf.SmartLog__c.describe()
        for field in obj_desc['fields']:
            if field['name'] == field_name:
                return [value['value'] for value in field['picklistValues'] if value['active']]
        return []
    except Exception as e:
        logging.error(f"Failed to fetch picklist values for {field_name}: {str(e)}")
        return []


# Cache picklist values at startup (fall back to defaults when the lookup is empty)
status_values = get_picklist_values('Status__c') or ["Active", "Inactive", "Pending"]
log_type_values = get_picklist_values('Log_Type__c') or ["Smart Log", "Cell Analysis", "UV Verification"]
logging.info(f"Valid Status__c values: {status_values}")
logging.info(f"Valid Log_Type__c values: {log_type_values}")

# Map invalid picklist values to valid ones
picklist_mapping = {
    'Status__c': {
        'normal': 'Active',
        'error': 'Inactive',
        'warning': 'Pending',
        'ok': 'Active',
        'failed': 'Inactive'
    },
    'Log_Type__c': {
        'maint': 'Smart Log',
        'error': 'Cell Analysis',
        'ops': 'UV Verification',
        'maintenance': 'Smart Log',
        'cell': 'Cell Analysis',
        'uv': 'UV Verification'
    }
}


# Fetch folder ID for "LabOps Reports"
def get_folder_id(folder_name):
    """Look up the Id of the Salesforce report folder called ``folder_name``.

    Returns the folder Id string, or None when Salesforce is unavailable, the
    folder does not exist, or the query fails.
    """
    if sf is None:
        return None
    try:
        # Escape backslashes and single quotes so the name cannot break out of
        # the SOQL string literal.
        safe_name = str(folder_name).replace("\\", "\\\\").replace("'", "\\'")
        query = f"SELECT Id FROM Folder WHERE Name = '{safe_name}' AND Type = 'Report'"
        result = sf.query(query)
        if result['totalSize'] > 0:
            folder_id = result['records'][0]['Id']
            logging.info(f"Found folder ID for '{folder_name}': {folder_id}")
            return folder_id
        else:
            logging.error(f"Folder '{folder_name}' not found in Salesforce.")
            return None
    except Exception as e:
        logging.error(f"Failed to fetch folder ID for '{folder_name}': {str(e)}")
        return None


# Cache the folder ID at startup
LABOPS_REPORTS_FOLDER_ID = get_folder_id('LabOps Reports')


# Create Salesforce reports (Usage and AMC Reminders)
def create_salesforce_reports(df):
    """Create the Usage (summary) and AMC Reminders (tabular) reports in Salesforce.

    ``df`` is accepted for interface compatibility but is not read: the reports
    are defined purely by filters over ``SmartLog__c``.

    Returns a status string with both report Ids on success, or a message
    describing why the reports could not be created.
    """
    if sf is None:
        return "Salesforce connection not available."
    if not LABOPS_REPORTS_FOLDER_ID:
        return "Cannot create reports: 'LabOps Reports' folder not found in Salesforce."
    try:
        # Usage Report (Summary Report)
        # One timestamp per report so `name` and `developerName` always agree.
        usage_name = f"SmartLog_Usage_Report_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        usage_report_metadata = {
            "reportMetadata": {
                "name": usage_name,
                "developerName": usage_name,
                "reportType": {
                    "type": "CustomObject",
                    "value": "SmartLog__c"
                },
                "reportFormat": "SUMMARY",
                "reportBooleanFilter": None,
                "reportFilters": [
                    {
                        "column": "SmartLog__c.Status__c",
                        "operator": "equals",
                        "value": "Active"
                    },
                    {
                        "column": "SmartLog__c.Timestamp__c",
                        "operator": "greaterOrEqual",
                        "value": "THIS_MONTH"
                    }
                ],
                "aggregates": ["s!SmartLog__c.Usage_Hours__c", "s!SmartLog__c.Downtime__c"],
                "groupingsDown": [
                    {
                        "name": "Device_Id__c",
                        "field": "SmartLog__c.Device_Id__c",
                        "sortOrder": "Asc",
                        "sortAggregate": None,
                        "dateGranularity": "None"
                    }
                ],
                "detailColumns": [
                    "SmartLog__c.Device_Id__c",
                    "SmartLog__c.Log_Type__c",
                    "SmartLog__c.Status__c",
                    "SmartLog__c.Timestamp__c",
                    "SmartLog__c.Usage_Hours__c",
                    "SmartLog__c.Downtime__c",
                    "SmartLog__c.AMC_Date__c"
                ],
                "folderId": LABOPS_REPORTS_FOLDER_ID,
                "currency": None
            }
        }
        usage_result = sf.restful('analytics/reports', method='POST', json=usage_report_metadata)
        usage_report_id = usage_result['id']
        logging.info(f"Usage Report created: {usage_report_id}")

        # AMC Reminders Report (Tabular Report)
        amc_name = f"SmartLog_AMC_Reminders_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        amc_report_metadata = {
            "reportMetadata": {
                "name": amc_name,
                "developerName": amc_name,
                "reportType": {
                    "type": "CustomObject",
                    "value": "SmartLog__c"
                },
                "reportFormat": "TABULAR",
                "reportBooleanFilter": None,
                "reportFilters": [
                    {
                        "column": "SmartLog__c.Status__c",
                        "operator": "equals",
                        "value": "Active"
                    },
                    {
                        "column": "SmartLog__c.AMC_Date__c",
                        "operator": "greaterOrEqual",
                        "value": "TODAY"
                    },
                    {
                        "column": "SmartLog__c.AMC_Date__c",
                        "operator": "lessOrEqual",
                        "value": "NEXT_N_DAYS:30"
                    }
                ],
                "detailColumns": [
                    "SmartLog__c.Device_Id__c",
                    "SmartLog__c.AMC_Date__c",
                    "SmartLog__c.Status__c"
                ],
                "folderId": LABOPS_REPORTS_FOLDER_ID,
                "currency": None
            }
        }
        amc_result = sf.restful('analytics/reports', method='POST', json=amc_report_metadata)
        amc_report_id = amc_result['id']
        logging.info(f"AMC Reminders Report created: {amc_report_id}")

        return f"Usage Report ID: {usage_report_id}, AMC Reminders Report ID: {amc_report_id}"
    except Exception as e:
        logging.error(f"Failed to create Salesforce reports: {str(e)}")
        return f"Failed to create reports: {str(e)}"


def _resolve_picklist(field_name, raw_value, valid_values):
    """Map ``raw_value`` onto a valid picklist value for ``field_name``.

    A value already in ``valid_values`` is returned unchanged. Otherwise the
    lower-cased value is looked up in ``picklist_mapping``; unknown values fall
    back to the first valid value, or None when ``valid_values`` is empty.
    """
    if raw_value in valid_values:
        return raw_value
    fallback = valid_values[0] if valid_values else None
    return picklist_mapping[field_name].get(raw_value.lower(), fallback)


# Save results to Salesforce SmartLog__c
def save_to_salesforce(df, summary, anomalies, amc_reminders, insights):
    """Bulk-insert up to the first 100 rows of ``df`` as ``SmartLog__c`` records.

    ``summary``, ``anomalies``, ``amc_reminders`` and ``insights`` are accepted
    for interface compatibility but are not stored.

    Expects ``timestamp`` and ``amc_date`` to already be datetime columns
    (``process_logs`` converts them before calling).

    Returns a status string describing how many records were saved or why the
    save failed.
    """
    if sf is None:
        return "Salesforce connection not available."
    try:
        records = []
        today = datetime.now().date()
        window_end = today + timedelta(days=30)

        for _, row in df.head(100).iterrows():
            # Validate and map picklist values
            status = _resolve_picklist('Status__c', str(row['status']), status_values)
            if status is None:
                logging.warning(f"Skipping record with invalid Status__c: {row['status']}")
                continue

            log_type = _resolve_picklist('Log_Type__c', str(row['log_type']), log_type_values)
            if log_type is None:
                logging.warning(f"Skipping record with invalid Log_Type__c: {row['log_type']}")
                continue

            # Ensure AMC_Date__c is in correct format
            amc_date_str = None
            if pd.notna(row['amc_date']):
                amc_date_str = row['amc_date'].strftime('%Y-%m-%d')
                # Log if this record qualifies for AMC Reminders
                if status == "Active" and today <= row['amc_date'].date() <= window_end:
                    logging.info(
                        f"Record qualifies for AMC Reminders: Device ID {row['device_id']}, AMC Date {amc_date_str}"
                    )

            records.append({
                'Device_Id__c': str(row['device_id'])[:50],
                'Log_Type__c': log_type,
                'Status__c': status,
                'Timestamp__c': row['timestamp'].isoformat() if pd.notna(row['timestamp']) else None,
                'Usage_Hours__c': float(row['usage_hours']) if pd.notna(row['usage_hours']) else 0.0,
                'Downtime__c': float(row['downtime']) if pd.notna(row['downtime']) else 0.0,
                'AMC_Date__c': amc_date_str
            })

        # Bulk insert to reduce API calls
        if records:
            results = sf.bulk.SmartLog__c.insert(records)
            # The bulk API reports per-record outcomes; surface rejected rows
            # instead of claiming every record was saved.
            failures = [
                res for res in (results or [])
                if isinstance(res, dict) and not res.get('success', True)
            ]
            if failures:
                logging.warning(f"{len(failures)} of {len(records)} records were rejected by Salesforce: {failures[:5]}")
                saved = len(records) - len(failures)
                return f"Saved {saved} of {len(records)} records to Salesforce ({len(failures)} failed)."
        logging.info(f"Saved {len(records)} records to Salesforce")
        return f"Saved {len(records)} records to Salesforce."
    except Exception as e:
        logging.error(f"Failed to save to Salesforce: {str(e)}")
        return f"Failed to save to Salesforce: {str(e)}"


# Summarize logs
def summarize_logs(df, progress=gr.Progress()):
    """Produce a short model-generated summary (device count, most-used device).

    Returns the summary text, or a failure message if anything raises.
    """
    progress(0.1, "Generating summary report...")
    try:
        total_devices = df["device_id"].nunique()
        most_used = df.groupby("device_id")["usage_hours"].sum().idxmax() if not df.empty else "N/A"
        prompt = f"Maintenance logs: {total_devices} devices. Most used: {most_used}."
        summary = summarizer(prompt, max_length=50, min_length=10, do_sample=False)[0]["summary_text"]
        logging.info("Summary generated successfully")
        return summary
    except Exception as e:
        logging.error(f"Summary generation failed: {str(e)}")
        return f"Failed to generate summary: {str(e)}"


# Anomaly detection
def detect_anomalies(df, progress=gr.Progress()):
    """Flag outlier rows on (usage_hours, downtime) with an IsolationForest.

    At most 1000 rows (a fixed-seed sample) are analysed and the first five
    anomalies are listed. ``df`` is not modified: this function runs in a
    worker thread alongside other readers of the same DataFrame.

    Returns a human-readable report string.
    """
    progress(0.4, "Detecting anomalies...")
    try:
        if "usage_hours" not in df.columns or "downtime" not in df.columns:
            return "Anomaly detection requires 'usage_hours' and 'downtime' columns."
        sample = df.sample(n=1000, random_state=42) if len(df) > 1000 else df
        features = sample[["usage_hours", "downtime"]].fillna(0)
        iso_forest = IsolationForest(contamination=0.1, random_state=42, n_jobs=-1)
        labels = iso_forest.fit_predict(features)
        # -1 marks an outlier; select without writing a column into the shared frame.
        anomalies = sample.loc[labels == -1, ["device_id", "usage_hours", "downtime", "timestamp"]]
        if anomalies.empty:
            return "No anomalies detected."
        anomaly_lines = ["Detected Anomalies:"]
        for _, row in anomalies.head(5).iterrows():
            anomaly_lines.append(
                f"- Device ID: {row['device_id']}, Usage Hours: {row['usage_hours']}, "
                f"Downtime: {row['downtime']}, Timestamp: {row['timestamp']}"
            )
        return "\n".join(anomaly_lines)
    except Exception as e:
        logging.error(f"Anomaly detection failed: {str(e)}")
        return f"Anomaly detection failed: {str(e)}"


# AMC reminders (identify records for display)
def check_amc_reminders(df, current_date, progress=gr.Progress()):
    """List devices whose AMC date falls within 30 days of ``current_date``.

    The comparison is done on calendar dates (both sides normalised to
    midnight), so an AMC due today counts as 0 days away and is included.
    ``df`` is not modified.

    Returns a human-readable report string listing at most five reminders.
    """
    progress(0.6, "Checking AMC reminders...")
    try:
        if "device_id" not in df.columns or "amc_date" not in df.columns:
            return "AMC reminders require 'device_id' and 'amc_date' columns."
        amc_dates = pd.to_datetime(df["amc_date"], errors='coerce')
        # Drop the time-of-day; otherwise "today at midnight" minus "now" is
        # negative and today's AMC would be skipped.
        today = pd.to_datetime(current_date).normalize()
        days_to_amc = (amc_dates.dt.normalize() - today).dt.days
        due = (days_to_amc >= 0) & (days_to_amc <= 30)  # unparseable dates (NaT) compare False
        reminders = pd.DataFrame({"device_id": df["device_id"], "amc_date": amc_dates})[due]
        if reminders.empty:
            return "No AMC reminders due within the next 30 days."
        reminder_lines = ["Upcoming AMC Reminders:"]
        for _, row in reminders.head(5).iterrows():
            reminder_lines.append(f"- Device ID: {row['device_id']}, AMC Date: {row['amc_date']}")
        return "\n".join(reminder_lines)
    except Exception as e:
        logging.error(f"AMC reminder generation failed: {str(e)}")
        return f"AMC reminder generation failed: {str(e)}"


# Dashboard insights
def generate_dashboard_insights(df, progress=gr.Progress()):
    """Produce a short model-generated insight (device count, average usage).

    Returns the insight text, or a failure message if anything raises.
    """
    progress(0.8, "Generating dashboard insights...")
    try:
        total_devices = df["device_id"].nunique()
        avg_usage = df["usage_hours"].mean() if "usage_hours" in df.columns else 0
        prompt = f"Insights: {total_devices} devices, avg usage {avg_usage:.2f} hours."
        insights = summarizer(prompt, max_length=50, min_length=10, do_sample=False)[0]["summary_text"]
        return insights
    except Exception as e:
        logging.error(f"Dashboard insights generation failed: {str(e)}")
        return f"Dashboard insights generation failed: {str(e)}"


# Create usage chart
def create_usage_chart(df, progress=gr.Progress()):
    """Build a bar chart of total usage hours for the five most-used devices.

    Returns a Plotly figure, or None if the chart could not be built.
    """
    progress(0.9, "Creating usage chart...")
    try:
        usage_data = df.groupby("device_id")["usage_hours"].sum().reset_index()
        if len(usage_data) > 5:
            usage_data = usage_data.nlargest(5, "usage_hours")
        # Five colours for up to five bars, so no two devices share a colour.
        custom_colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEEAD']
        fig = px.bar(
            usage_data,
            x="device_id",
            y="usage_hours",
            title="Usage Hours per Device",
            labels={"device_id": "Device ID", "usage_hours": "Usage Hours"},
            color="device_id",
            color_discrete_sequence=custom_colors
        )
        fig.update_layout(
            title_font_size=16,
            margin=dict(l=20, r=20, t=40, b=20),
            plot_bgcolor="white",
            paper_bgcolor="white",
            font=dict(size=12)
        )
        return fig
    except Exception as e:
        logging.error(f"Failed to create usage chart: {str(e)}")
        return None


# Generate PDF content
def generate_pdf_content(summary, preview, anomalies, amc_reminders, insights):
    """Write the analysis sections to a timestamped PDF in the working directory.

    Returns the PDF path, or None when reportlab is unavailable or the build
    fails.
    """
    if not reportlab_available:
        return None
    try:
        pdf_path = f"analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
        doc = SimpleDocTemplate(pdf_path, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        def safe_paragraph(text, style):
            """Build a Paragraph from plain text, keeping its line breaks."""
            if not text:
                return Paragraph("", style)
            # Paragraph parses XML-like markup: escape data characters such as
            # '&' and '<' first, then turn newlines into explicit breaks.
            markup = xml_escape(str(text)).replace('\n', '<br/>')
            return Paragraph(markup, style)

        story.append(Paragraph("LabOps Log Analysis Report", styles['Title']))
        story.append(Paragraph(f"Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", styles['Normal']))
        story.append(Spacer(1, 12))

        story.append(Paragraph("Summary Report", styles['Heading2']))
        story.append(safe_paragraph(summary or "No summary available.", styles['Normal']))
        story.append(Spacer(1, 12))

        story.append(Paragraph("Log Preview", styles['Heading2']))
        story.append(safe_paragraph(preview or "No preview available.", styles['Normal']))
        story.append(Spacer(1, 12))

        story.append(Paragraph("Anomaly Detection", styles['Heading2']))
        story.append(safe_paragraph(anomalies or "No anomalies detected.", styles['Normal']))
        story.append(Spacer(1, 12))

        story.append(Paragraph("AMC Reminders", styles['Heading2']))
        story.append(safe_paragraph(amc_reminders or "No AMC reminders.", styles['Normal']))
        story.append(Spacer(1, 12))

        story.append(Paragraph("Dashboard Insights", styles['Heading2']))
        story.append(safe_paragraph(insights or "No insights generated.", styles['Normal']))

        doc.build(story)
        logging.info(f"PDF generated at {pdf_path}")
        return pdf_path
    except Exception as e:
        logging.error(f"Failed to generate PDF: {str(e)}")
        return None


# Main Gradio function
async def process_logs(file_obj, progress=gr.Progress()):
    """Analyse an uploaded CSV and return the nine dashboard outputs.

    Output order matches the ``outputs=[...]`` list wired to the Analyze
    button: summary, preview, chart, anomalies, AMC reminders, insights,
    PDF path, Salesforce save status, Salesforce report status.

    NOTE(review): this coroutine performs blocking work (model inference,
    Salesforce calls) directly; confirm whether it should be a plain function
    or offload to a thread so the server's event loop is not stalled.
    """
    try:
        progress(0, "Starting file processing...")
        if not file_obj:
            return (
                "No file uploaded.", "No data to preview.", None, "No anomalies detected.",
                "No AMC reminders.", "No insights generated.", None,
                "No Salesforce data saved.", "No report created."
            )

        # The upload may be a temp-file wrapper (has .name) or a plain path
        # string; reading by path works for both.
        file_name = getattr(file_obj, "name", file_obj)
        logging.info(f"Processing file: {file_name}")
        if not str(file_name).lower().endswith(".csv"):
            return "Please upload a CSV file.", "", None, "", "", "", None, "", ""

        required_columns = ["device_id", "log_type", "status", "timestamp", "usage_hours", "downtime", "amc_date"]
        dtypes = {
            "device_id": "string",
            "log_type": "string",
            "status": "string",
            "usage_hours": "float32",
            "downtime": "float32",
            "amc_date": "string"
        }
        df = pd.read_csv(file_name, dtype=dtypes)
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return f"Missing columns: {missing_columns}", None, None, None, None, None, None, None, None

        df["timestamp"] = pd.to_datetime(df["timestamp"], errors='coerce')
        df["amc_date"] = pd.to_datetime(df["amc_date"], errors='coerce')
        if df.empty:
            return "No data available.", None, None, None, None, None, None, None, None

        # The analysis steps are independent readers of `df`, so run them concurrently.
        with ThreadPoolExecutor() as executor:
            future_summary = executor.submit(summarize_logs, df)
            future_anomalies = executor.submit(detect_anomalies, df)
            future_amc = executor.submit(check_amc_reminders, df, datetime.now())
            future_insights = executor.submit(generate_dashboard_insights, df)
            future_chart = executor.submit(create_usage_chart, df)
            future_reports = executor.submit(create_salesforce_reports, df)

            summary = f"Step 1: Summary Report\n{future_summary.result()}"
            anomalies = f"Anomaly Detection\n{future_anomalies.result()}"
            amc_reminders = f"AMC Reminders\n{future_amc.result()}"
            insights = f"Dashboard Insights (AI)\n{future_insights.result()}"
            chart = future_chart.result()
            report_result = future_reports.result()

        preview_lines = ["Step 2: Log Preview (First 5 Rows)"]
        for idx, row in df.head(5).iterrows():
            preview_lines.append(
                f"Row {idx + 1}: Device ID: {row['device_id']}, "
                f"Log Type: {row['log_type']}, Status: {row['status']}, "
                f"Timestamp: {row['timestamp']}, Usage Hours: {row['usage_hours']}, "
                f"Downtime: {row['downtime']}, AMC Date: {row['amc_date']}"
            )
        preview = "\n".join(preview_lines)

        salesforce_result = save_to_salesforce(df, summary, anomalies, amc_reminders, insights)
        pdf_file = generate_pdf_content(summary, preview, anomalies, amc_reminders, insights)

        progress(1.0, "Done!")
        return (
            summary, preview, chart, anomalies, amc_reminders, insights,
            pdf_file, salesforce_result, report_result
        )
    except Exception as e:
        logging.error(f"Failed to process file: {str(e)}")
        return f"Error: {str(e)}", None, None, None, None, None, None, None, None


# Gradio Interface
try:
    logging.info("Initializing Gradio interface...")
    with gr.Blocks(css="""
        .dashboard-container {border: 1px solid #e0e0e0; padding: 10px; border-radius: 5px;}
        .dashboard-title {font-size: 24px; font-weight: bold; margin-bottom: 5px;}
        .dashboard-section {margin-bottom: 20px;}
        .dashboard-section h3 {font-size: 18px; margin-bottom: 2px;}
        .dashboard-section p {margin: 1px 0; line-height: 1.2;}
        .dashboard-section ul {margin: 2px 0; padding-left: 20px;}
    """) as iface:
        # NOTE(review): the HTML tags around the two headings below were lost
        # from the source; the markup here is reconstructed - confirm it.
        gr.Markdown("<h1>LabOps Log Analyzer Dashboard (Hugging Face AI)</h1>")
        gr.Markdown("Upload a CSV file to analyze and generate Salesforce reports.")

        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(label="Upload Logs (CSV)", file_types=[".csv"])
                submit_button = gr.Button("Analyze", variant="primary")

            with gr.Column(scale=2):
                with gr.Group(elem_classes="dashboard-container"):
                    gr.Markdown("<div class='dashboard-title'>Analysis Results</div>")
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 1: Summary Report")
                        summary_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 2: Log Preview")
                        preview_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 3: Usage Chart")
                        chart_output = gr.Plot()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 4: Anomaly Detection")
                        anomaly_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 5: AMC Reminders")
                        amc_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Step 6: Insights (AI)")
                        insights_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Salesforce Integration")
                        salesforce_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Salesforce Reports")
                        report_output = gr.Markdown()
                    with gr.Group(elem_classes="dashboard-section"):
                        gr.Markdown("### Download Report")
                        pdf_output = gr.File(label="Download Analysis Report as PDF")

        submit_button.click(
            fn=process_logs,
            inputs=[file_input],
            outputs=[
                summary_output,
                preview_output,
                chart_output,
                anomaly_output,
                amc_output,
                insights_output,
                pdf_output,
                salesforce_output,
                report_output
            ]
        )

    logging.info("Gradio interface initialized successfully")
except Exception as e:
    logging.error(f"Failed to initialize Gradio interface: {str(e)}")
    raise

if __name__ == "__main__":
    try:
        logging.info("Launching Gradio interface...")
        iface.launch(server_name="0.0.0.0", server_port=7860, debug=True, share=False)
        logging.info("Gradio interface launched successfully")
    except Exception as e:
        logging.error(f"Failed to launch Gradio interface: {str(e)}")
        print(f"Error launching app: {str(e)}")
        raise