Spaces:
Sleeping
Sleeping
| import datetime | |
| import logging | |
| import sys | |
| import uuid | |
| from pathlib import Path | |
| import csv | |
| import pandas as pd | |
| import gradio as gr | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| from simple_salesforce import Salesforce | |
| import base64 | |
| import os | |
| import locale | |
| import numpy as np | |
| try: | |
| locale.setlocale(locale.LC_ALL, 'en_IN.UTF-8') | |
| except locale.Error: | |
| locale.setlocale(locale.LC_ALL, '') | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[logging.FileHandler('app_log.txt'), logging.StreamHandler(sys.stdout)] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| SALESFORCE_USERNAME = "vaneshdevarapalli866@agentforce.com" | |
| SALESFORCE_PASSWORD = "vanesh@331" | |
| SALESFORCE_SECURITY_TOKEN = "VRUVbBOdG0s9Q4xy0W6DB1Y6b" | |
| def connect_to_salesforce(): | |
| try: | |
| sf_instance = Salesforce( | |
| username=SALESFORCE_USERNAME, | |
| password=SALESFORCE_PASSWORD, | |
| security_token=SALESFORCE_SECURITY_TOKEN, | |
| ) | |
| logger.info("Connected to Salesforce successfully.") | |
| return sf_instance | |
| except Exception as e: | |
| logger.error(f"Salesforce connection failed: {e}") | |
| raise | |
| try: | |
| sf = connect_to_salesforce() | |
| except Exception as e: | |
| logger.error(f"Failed to initialize Salesforce connection: {e}") | |
| sf = None | |
| equipment_choices = [ | |
| "Bulldozer", "Crane", "Excavator", "Loader", "Forklift", | |
| "Backhoe", "Grader", "Scraper", "Dump Truck", "Roller" | |
| ] | |
| project_choices = [ | |
| "Project Alpha", "Project Beta", "Project Gamma", "Project Delta", "Project Epsilon", | |
| "Project Zeta", "Project Theta", "Project Sigma", "Project Omega", "Project Phoenix" | |
| ] | |
| ai_suggestion_choices = ["Move", "Repair", "Replace"] # Removed "Pause Rent" | |
| def validate_date(last_maint): | |
| try: | |
| # Check if the provided date is in the future | |
| if last_maint and last_maint != "N/A": | |
| entered_date = datetime.datetime.strptime(last_maint, "%Y-%m-%d") | |
| current_date = datetime.datetime.today() | |
| if entered_date > current_date: | |
| raise ValueError(f"Last Maintenance Date cannot be in the future. You entered: {last_maint}") | |
| return last_maint | |
| except ValueError as ve: | |
| raise ValueError(f"Invalid date format for Last Maintenance. Please use YYYY-MM-DD. Error: {ve}") | |
| def call_ai_model(usage, idle, freq, cost, last): | |
| try: | |
| usage = float(usage) if usage is not None and not np.isnan(usage) else 0.0 | |
| idle = float(idle) if idle is not None and not np.isnan(idle) else 0.0 | |
| freq = float(freq) if freq is not None and not np.isnan(freq) else 0.0 | |
| cost = float(cost) if cost is not None and not np.isnan(cost) else 0.0 | |
| total = usage + idle | |
| utilization_ratio = usage / total if total > 0 else 0.0 | |
| utilization_percent = utilization_ratio * 100 | |
| # Determine suggestion (no "Pause Rent") | |
| if utilization_percent < 60: | |
| sug = "Move" | |
| elif utilization_percent < 80: | |
| sug = "Repair" | |
| else: | |
| sug = "Replace" | |
| base_conf = utilization_ratio | |
| if idle > usage: | |
| diff_ratio = (idle - usage) / total | |
| base_conf -= diff_ratio * 0.5 | |
| freq_factor = min(freq / 10.0, 1.0) | |
| base_conf *= (0.7 + 0.3 * freq_factor) | |
| confidence = max(0.05, min(base_conf, 1.0)) | |
| # Round before return | |
| return sug, round(confidence, 2), round(utilization_percent, 2) | |
| except Exception as e: | |
| logger.error(f"Error in call_ai_model: {e}") | |
| raise ValueError(f"AI model computation failed: {str(e)}") | |
| def process_equipment_utilization(equip, proj, use_h, idle_h, move_f, cost_h, last_maint, ai_sug): | |
| try: | |
| if not sf: | |
| raise ValueError("Salesforce connection is not initialized. Please check credentials and try again.") | |
| # Validate the Last Maintenance date | |
| last_maint = validate_date(last_maint) | |
| # Always run AI model to get suggestion, confidence, score | |
| ai_sug_generated, conf, score = call_ai_model(use_h, idle_h, move_f, cost_h, last_maint) | |
| # Use manual suggestion if provided, else AI suggestion | |
| suggestion_to_use = ai_sug if ai_sug else ai_sug_generated | |
| for field, value in [("Usage Hours", use_h), ("Idle Hours", idle_h), | |
| ("Movement Frequency", move_f), ("Cost per Hour", cost_h), | |
| ("Confidence", conf), ("Utilization Score", score)]: | |
| if value is None or np.isnan(value): | |
| raise ValueError(f"Invalid value for {field}: {value}. Must be a valid number.") | |
| if last_maint is None or pd.isna(last_maint): | |
| last_maint = "N/A" | |
| summary = { | |
| "Equipment Name": equip, | |
| "Project": proj, | |
| "Usage Hours": use_h, | |
| "Idle Hours": idle_h, | |
| "Suggestion": suggestion_to_use, | |
| "Confidence": conf * 100, # Store as percentage (0-100) | |
| "Utilization Score": score, | |
| "Cost per Hour": cost_h, | |
| "Last Maintenance": last_maint or "N/A" | |
| } | |
| record_data = { | |
| "Equipment_Name__c": equip, | |
| "Project_Name__c": proj, | |
| "Usage_Hours__c": float(use_h), | |
| "Idle_Hours__c": float(idle_h), | |
| "AI_Suggestion__c": suggestion_to_use, | |
| "Suggestion_Confidence__c": float(conf * 100), | |
| "Utilization_Score__c": float(score), | |
| "Cost_per_Hour__c": float(cost_h), | |
| "Report_Link__c": "Pending", | |
| "Last_Maintenance__c": last_maint if last_maint != "N/A" else None, | |
| "Dashboard_Flag__c": False | |
| } | |
| for key, value in record_data.items(): | |
| if isinstance(value, float) and np.isnan(value): | |
| raise ValueError(f"Field {key} contains NaN, which is not allowed in Salesforce requests.") | |
| logger.info(f"Sending record data to Salesforce: {record_data}") | |
| resp = sf.Equipment_Utilization_Record__c.create(record_data) | |
| if not resp.get("success"): | |
| raise ValueError(f"Failed to create Salesforce record: {resp}") | |
| rec_id = resp.get("id") | |
| if not rec_id: | |
| raise ValueError("Salesforce record creation succeeded, but no record ID was returned.") | |
| safe_equip = equip.replace(" ", "_") | |
| safe_proj = proj.replace(" ", "_") | |
| uid = uuid.uuid4().hex[:8] | |
| pdf_path = Path(f"static/reports/report_{safe_equip}_{safe_proj}_{uid}.pdf") | |
| pdf_path.parent.mkdir(parents=True, exist_ok=True) | |
| c = canvas.Canvas(str(pdf_path), pagesize=letter) | |
| c.setFont("Helvetica-Bold", 14) | |
| title_str = f"Equipment Utilization Report - {equip} ({proj})" | |
| c.drawString(100, 750, title_str) | |
| c.setFont("Helvetica", 12) | |
| c.drawString(100, 730, f"Record ID: {rec_id}") | |
| y = 710 | |
| for k, v in summary.items(): | |
| c.drawString(100, y, f"{k}: {v}") | |
| y -= 20 | |
| c.save() | |
| encoded = base64.b64encode(pdf_path.read_bytes()).decode() | |
| cv = sf.ContentVersion.create({ | |
| "Title": "UtilReport", | |
| "PathOnClient": os.path.basename(str(pdf_path)), | |
| "VersionData": encoded, | |
| "FirstPublishLocationId": rec_id | |
| }) | |
| if not cv.get("success"): | |
| raise ValueError(f"Failed to upload PDF to Salesforce: {cv}") | |
| pdf_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{cv['id']}" | |
| sf.Equipment_Utilization_Record__c.update(rec_id, {"Report_Link__c": pdf_url}) | |
| return { | |
| "Salesforce_Record_Id": rec_id, | |
| "Summary": summary, | |
| "Report_Link": pdf_url, | |
| "Report_File_Path": str(pdf_path) | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in process_equipment_utilization: {e}") | |
| raise ValueError(f"Processing failed: {str(e)}") | |
| def format_output(result): | |
| summary = result.get("Summary", {}) | |
| cost_val = summary.get("Cost per Hour", 0) | |
| try: | |
| cost_str = locale.currency(cost_val, grouping=True) | |
| except: | |
| cost_str = f"βΉ{cost_val:,.2f}" | |
| conf_pct = summary.get("Confidence", 0) # Already percentage (0-100) | |
| util_score = summary.get("Utilization Score", 0) # Already percentage (0-100) | |
| lines = [ | |
| f" β’ AI Suggestion: {summary.get('Suggestion', 'N/A')}", | |
| f" β’ Suggestion Confidence: {conf_pct:.2f}%", | |
| f" β’ Utilization Score: {util_score:.2f}%", | |
| f" β’ Equipment Name: {summary.get('Equipment Name', 'N/A')}", | |
| f" β’ Project: {summary.get('Project', 'N/A')}", | |
| f" β’ Usage Hours: {summary.get('Usage Hours', 0):.2f}", | |
| f" β’ Idle Hours: {summary.get('Idle Hours', 0):.2f}", | |
| f" β’ Cost per Hour: {cost_str}", | |
| f" β’ Last Maintenance: {summary.get('Last Maintenance', 'N/A')}" | |
| ] | |
| return "\n".join(lines) | |
| def format_batch_output(records): | |
| lines = [] | |
| for i, rec in enumerate(records, 1): | |
| summary = rec.get("Summary", {}) | |
| # Detailed record output format | |
| record_details = [ | |
| f"Record {i}:", | |
| f" β’ Record ID: {rec['Salesforce_Record_Id']}", | |
| f" β’ Equipment Name: {summary.get('Equipment Name', 'N/A')}", | |
| f" β’ Project: {summary.get('Project', 'N/A')}", | |
| f" β’ Usage Hours: {summary.get('Usage Hours', 0):.2f}", | |
| f" β’ Idle Hours: {summary.get('Idle Hours', 0):.2f}", | |
| f" β’ Suggestion: {summary.get('Suggestion', 'N/A')}", | |
| f" β’ Suggestion Confidence: {summary.get('Confidence', 0):.2f}%", | |
| f" β’ Utilization Score: {summary.get('Utilization Score', 0):.2f}%", | |
| f" β’ Cost per Hour: βΉ{summary.get('Cost per Hour', 0):,.2f}", | |
| f" β’ Last Maintenance: {summary.get('Last Maintenance', 'N/A')}" | |
| ] | |
| lines.append("\n".join(record_details)) | |
| lines.append("\n---\n") # Separator between records | |
| return "\n".join(lines) | |
| def generate_batch_pdf(records, batch_uid): | |
| pdf_path = Path(f"static/reports/batch_report_{batch_uid}.pdf") | |
| pdf_path.parent.mkdir(parents=True, exist_ok=True) | |
| c = canvas.Canvas(str(pdf_path), pagesize=letter) | |
| c.setFont("Helvetica-Bold", 14) | |
| y = 760 | |
| for idx, record in enumerate(records, 1): | |
| if y < 100: | |
| c.showPage() | |
| c.setFont("Helvetica-Bold", 14) | |
| c.drawString(100, 770, "Equipment Utilization Batch Report") | |
| c.setFont("Helvetica", 12) | |
| y = 750 | |
| if y == 760: | |
| c.drawString(100, y, "Equipment Utilization Batch Report") | |
| y -= 20 | |
| c.setFont("Helvetica", 12) | |
| c.drawString(100, y, f"Record {idx}: {record['Summary']['Equipment Name']}") | |
| y -= 20 | |
| c.drawString(100, y, f"Salesforce Record ID: {record['Salesforce_Record_Id']}") | |
| y -= 20 | |
| for key, value in record["Summary"].items(): | |
| c.drawString(100, y, f"{key}: {value}") | |
| y -= 20 | |
| if y < 100: | |
| c.showPage() | |
| c.setFont("Helvetica-Bold", 14) | |
| c.drawString(100, 770, "Equipment Utilization Batch Report") | |
| c.setFont("Helvetica", 12) | |
| y = 750 | |
| y -= 10 | |
| c.drawString(100, y, "-" * 50) | |
| y -= 20 | |
| c.save() | |
| return str(pdf_path) | |
| def manual_input(equipment, project, usage, idle, freq, cost, last, ai_suggestion): | |
| try: | |
| if not equipment or equipment not in equipment_choices: | |
| raise ValueError("Please select a valid Equipment Name.") | |
| if not project or project not in project_choices: | |
| raise ValueError("Please select a valid Project Name.") | |
| if usage is None or usage < 0 or np.isnan(usage): | |
| raise ValueError("Usage Hours must be a non-negative number.") | |
| if idle is None or idle < 0 or np.isnan(idle): | |
| raise ValueError("Idle Hours must be a non-negative number.") | |
| if freq is None or freq < 0 or np.isnan(freq): | |
| raise ValueError("Movement Frequency must be a non-negative number.") | |
| if cost is None or cost < 0 or np.isnan(cost): | |
| raise ValueError("Cost per Hour must be a non-negative number.") | |
| last_val = last or "N/A" | |
| res = process_equipment_utilization(equipment, project, usage, idle, freq, cost, last_val, ai_suggestion) | |
| formatted = format_output(res) | |
| return formatted, res.get("Report_File_Path") # Only PDF output here now | |
| except Exception as e: | |
| logger.error(f"Error in manual_input: {e}") | |
| return f"Error: {str(e)}", None | |
| def batch_upload(csv_file): | |
| try: | |
| if csv_file is None: | |
| return "", None | |
| df = pd.read_csv(csv_file.name) | |
| required_columns = ['equipment_name', 'project_name', 'usage_hours', 'idle_hours', 'movement_frequency', 'cost_per_hour'] | |
| missing_columns = [col for col in required_columns if col not in df.columns] | |
| if missing_columns: | |
| raise ValueError(f"CSV file is missing required columns: {', '.join(missing_columns)}") | |
| NUMERIC_FIELDS = ['usage_hours', 'idle_hours', 'movement_frequency', 'cost_per_hour'] | |
| for col in NUMERIC_FIELDS: | |
| if df[col].isna().any(): | |
| raise ValueError(f"Column '{col}' contains missing or invalid values (e.g., NaN). Please ensure all values are valid numbers.") | |
| try: | |
| df[col] = df[col].astype(float) | |
| except ValueError as e: | |
| raise ValueError(f"Column '{col}' contains non-numeric values: {str(e)}") | |
| MACHINERY_FIELDS = ['equipment_name', 'project_name'] | |
| for col in MACHINERY_FIELDS: | |
| if col not in df.columns: | |
| raise ValueError(f"Missing required column: {col}") | |
| if df[col].isna().any(): | |
| raise ValueError(f"Column '{col}' contains missing values. Please ensure all rows have valid equipment and project names.") | |
| if 'last_maintenance' in df.columns: | |
| df['last_maintenance'] = df['last_maintenance'].apply(lambda x: "N/A" if pd.isna(x) else str(x)) | |
| if 'ai_suggestion' in df.columns: | |
| df['ai_suggestion'] = df['ai_suggestion'].fillna('') | |
| df['ai_suggestion'] = df['ai_suggestion'].apply( | |
| lambda x: x if x in ai_suggestion_choices else '' | |
| ) | |
| else: | |
| df['ai_suggestion'] = '' | |
| records = [] | |
| for idx, row in df.iterrows(): | |
| try: | |
| last_maint = row.get('last_maintenance', 'N/A') | |
| if pd.isna(last_maint): | |
| last_maint = "N/A" | |
| rec = process_equipment_utilization( | |
| row['equipment_name'], row['project_name'], | |
| row['usage_hours'], row['idle_hours'], | |
| row['movement_frequency'], row['cost_per_hour'], | |
| last_maint, row.get('ai_suggestion', '') | |
| ) | |
| records.append(rec) | |
| except Exception as e: | |
| logger.error(f"Error processing row {idx + 2}: {e}") | |
| raise ValueError(f"Error processing row {idx + 2}: {str(e)}") | |
| batch_uid = uuid.uuid4().hex[:8] | |
| batch_pdf_path = generate_batch_pdf(records, batch_uid) | |
| formatted_text = format_batch_output(records) | |
| return formatted_text, batch_pdf_path | |
| except Exception as e: | |
| logger.error(f"Error in batch_upload: {e}") | |
| return f"Error: {str(e)}", None | |
| with gr.Blocks() as app: | |
| gr.Markdown("## π Equipment Utilization Record Uploader", elem_id="app-title") | |
| with gr.Tabs(): | |
| with gr.TabItem("Manual Input"): | |
| with gr.Group(): | |
| equipment_dropdown = gr.Dropdown(equipment_choices, label="π§ Equipment Name") | |
| project_dropdown = gr.Dropdown(project_choices, label="ποΈ Project Name") | |
| ai_dropdown = gr.Dropdown([""] + ai_suggestion_choices, label="π§ AI Suggestion") | |
| with gr.Group(): | |
| with gr.Row(): | |
| usage = gr.Number(label="β±οΈ Usage Hours", value=0, minimum=0) | |
| idle = gr.Number(label="π Idle Hours", value=0, minimum=0) | |
| with gr.Row(): | |
| freq = gr.Number(label="π Movement Frequency", value=0, minimum=0) | |
| cost = gr.Number(label="π° Cost per Hour", value=0, minimum=0) | |
| last = gr.Textbox(label="π οΈ Last Maintenance Date (YYYY-MM-DD)", placeholder="Optional") | |
| submit_btn = gr.Button("π Submit", variant="primary") | |
| clear_btn = gr.Button("π§Ή Clear") | |
| result_txt = gr.Markdown(elem_id="result-box") | |
| report_file = gr.File(label="π Download PDF Report") | |
| submit_btn.click( | |
| fn=manual_input, | |
| inputs=[equipment_dropdown, project_dropdown, usage, idle, freq, cost, last, ai_dropdown], | |
| outputs=[result_txt, report_file] # Only PDF output here now | |
| ) | |
| clear_btn.click(lambda: ("", None), None, [result_txt, report_file]) | |
| with gr.TabItem("CSV Upload"): | |
| with gr.Group(): | |
| csv_file = gr.File(label="π Upload CSV file", file_types=[".csv"]) | |
| with gr.Row(): | |
| upload_btn = gr.Button("π Upload", variant="primary") | |
| clear_btn = gr.Button("π§Ή Clear") | |
| csv_output = gr.Markdown(label="π Batch Upload Results", elem_id="result-box") | |
| batch_pdf = gr.File(label="π Download Batch PDF Report", file_types=[".pdf"]) | |
| upload_btn.click( | |
| fn=batch_upload, | |
| inputs=csv_file, | |
| outputs=[csv_output, batch_pdf] | |
| ) | |
| clear_btn.click( | |
| lambda: (None, "", None), # Reset file input, output markdown, and pdf file output | |
| None, | |
| [csv_file, csv_output, batch_pdf] | |
| ) | |
| app.css = """ | |
| .gradio-container { background-color: #ffffff !important; } | |
| #app-title { text-align: center !important; } | |
| .gradio-container .gr-group { background-color: #d3d3d3 !important; padding: 20px; border: 3px solid #d3d3d3 !important; border-radius: 10px; } | |
| #result-box { border: 3px solid #d3d3d3 !important; border-radius: 10px; padding: 10px; background: #f9f9f9; white-space: pre-line; } | |
| """ | |
| if __name__ == "__main__": | |
| app.launch() | |