Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import matplotlib.pyplot as plt | |
| import io | |
| from PIL import Image | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| from datetime import datetime | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.utils import ImageReader | |
| from reportlab.lib.colors import red, black | |
| import requests | |
| from simple_salesforce import Salesforce | |
| import gradio as gr # Added to fix NameError | |
# Set up logging: one root configuration at DEBUG level, shared by the module logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

# Load environment variables (from a local .env file when present).
load_dotenv()
HF_TOKEN = os.environ.get("HF_TOKEN")
SF_USERNAME = os.environ.get("SF_USERNAME")
SF_PASSWORD = os.environ.get("SF_PASSWORD")
SF_SECURITY_TOKEN = os.environ.get("SF_SECURITY_TOKEN")
SF_INSTANCE_URL = os.environ.get(
    "SF_INSTANCE_URL",
    "https://budgetoverrunriskestimator-dev-ed.develop.my.salesforce.com",
)

# Validate environment variables.
if HF_TOKEN:
    logger.info("Hugging Face token loaded successfully.")
else:
    logger.error("Hugging Face token not set. Please add HF_TOKEN to .env file or Space Secrets.")

# `sf` stays None whenever credentials are missing or the login fails; the
# functions below test `if not sf` before touching Salesforce.
sf = None
if SF_USERNAME and SF_PASSWORD and SF_SECURITY_TOKEN:
    # Initialize Salesforce connection
    try:
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            instance_url=SF_INSTANCE_URL,
        )
    except Exception as e:
        logger.error(f"Failed to connect to Salesforce: {str(e)}")
    else:
        logger.info("Salesforce connection established successfully.")
else:
    logger.error("Salesforce credentials incomplete. Please set SF_USERNAME, SF_PASSWORD, and SF_SECURITY_TOKEN in .env.")
# Custom function to format numbers in Indian style (e.g., 10000000 as 1,00,00,000.00)
def format_indian_number(number):
    """Format a number as a rupee amount with Indian digit grouping.

    The value is rounded to two decimals. The last three integer digits form
    one group and every group to the left of it has two digits
    (e.g. 1234567.891 -> "₹12,34,567.89").

    Args:
        number: Anything accepted by ``float()`` (int, float, numeric string).

    Returns:
        The formatted string prefixed with "₹". A minus sign, when present,
        is placed right after the currency symbol ("₹-1,000.00"). Returns
        "₹0.00" when the value cannot be converted or is not finite.
    """
    try:
        text = f"{float(number):.2f}"
        # "nan"/"inf" contain no "." so the unpacking raises ValueError here.
        integer_part, decimal_part = text.split(".")
    except (ValueError, TypeError) as e:
        logger.error(f"Error formatting number {number}: {str(e)}")
        return "₹0.00"

    # Split the sign off first: grouping must only ever see digits, otherwise
    # "-" is counted as a digit and values such as -100 come out as "-,100".
    sign = "-" if integer_part.startswith("-") else ""
    digits = integer_part.lstrip("-")

    head, tail = digits[:-3], digits[-3:]
    pairs = []
    while head:
        pairs.append(head[-2:])
        head = head[:-2]
    grouped = ",".join(pairs[::-1] + [tail])
    return f"₹{sign}{grouped}.{decimal_part}"
# Function to fetch budget data from Salesforce
def fetch_budget_from_salesforce(project_id):
    """Load planned/actual cost rows for a project from Salesforce.

    Queries ``Project_Budget_Risk__c`` for records whose ``Project_Name__c``
    equals ``project_id``.

    Args:
        project_id: Value matched against ``Project_Name__c``. It comes from a
            free-text UI field, so it is quoted with ``format_soql`` rather
            than interpolated into the query string.

    Returns:
        A ``(dataframe, error)`` pair. On success ``dataframe`` has the
        columns ``Planned_Cost`` and ``Actual_Spend`` (missing values become
        0) and ``error`` is None. On failure ``dataframe`` is None and
        ``error`` is a user-facing message.
    """
    # Local import keeps this function self-contained; format_soql escapes
    # quotes/backslashes so user input cannot alter the SOQL statement.
    from simple_salesforce import format_soql

    if not sf:
        return None, "Error: Salesforce connection not available."
    try:
        query = format_soql(
            "SELECT Planned_Cost__c, Actual_Spend_To_Date__c "
            "FROM Project_Budget_Risk__c "
            "WHERE Project_Name__c = {}",
            project_id,
        )
        records = sf.query(query)['records']
        if not records:
            return None, "Error: No budget data found for the given project ID."
        rows = [
            {
                'Planned_Cost': record['Planned_Cost__c'] or 0,
                'Actual_Spend': record['Actual_Spend_To_Date__c'] or 0,
            }
            for record in records
        ]
        return pd.DataFrame(rows), None
    except Exception as e:
        logger.error(f"Error fetching data from Salesforce: {str(e)}")
        return None, f"Error fetching data: {str(e)}"
# Function to process uploaded file for line items
def process_uploaded_file(file):
    """Read an uploaded budget CSV and total its planned and actual costs.

    Args:
        file: A path or file-like object accepted by ``pandas.read_csv``, or
            None when nothing was uploaded.

    Returns:
        ``(planned_cost, actual_spend, line_items)``. The totals are plain
        Python floats (not numpy scalars, which ``json`` cannot always
        serialise) and ``line_items`` is the list of row dicts. Returns
        ``(0, 0, [])`` when ``file`` is None or the file cannot be read or
        summed.

    Raises:
        ValueError: If the file has more than 200 rows or lacks the
            ``Planned_Cost`` / ``Actual_Spend`` columns. These are user
            mistakes whose message must reach the caller instead of being
            silently replaced by zero totals.
    """
    if file is None:
        return 0, 0, []
    try:
        df = pd.read_csv(file)
    except Exception as e:
        logger.error(f"Error processing uploaded file: {str(e)}")
        return 0, 0, []

    if len(df) > 200:
        raise ValueError("File exceeds 200 line items. Please upload a file with 200 or fewer line items.")
    missing = [column for column in ('Planned_Cost', 'Actual_Spend') if column not in df.columns]
    if missing:
        raise ValueError(f"Uploaded file is missing required column(s): {', '.join(missing)}.")

    try:
        planned_cost = float(df['Planned_Cost'].sum())
        actual_spend = float(df['Actual_Spend'].sum())
    except (TypeError, ValueError) as e:
        # Non-numeric cells: keep the original best-effort fallback.
        logger.error(f"Error processing uploaded file: {str(e)}")
        return 0, 0, []
    return planned_cost, actual_spend, df.to_dict('records')
# Function to cross-check indices with 3rd-party sources
def cross_check_indices(material_cost_index, labor_index):
    """Check that both indices are numeric and lie in the range 0-300.

    No external source is contacted here; the check is a plain range test.

    Returns:
        A status string: a confirmation when both values are in range, a
        warning when either is outside it, or an error message when a value
        cannot be converted with ``float()``.
    """
    try:
        material_value = float(material_cost_index)
        labor_value = float(labor_index)
    except (ValueError, TypeError) as e:
        logger.error(f"Error validating indices: {str(e)}")
        return "Error: Invalid indices provided."

    both_in_range = 0 <= material_value <= 300 and 0 <= labor_value <= 300
    if both_in_range:
        return "Indices within expected range."
    return "Warning: Material Cost Index or Labor Index out of expected range (0-300)."
# Function to generate a bar chart
def generate_bar_plot(planned_cost_inr, actual_spend_inr, forecast_cost_inr):
    """Render the planned / actual / forecast bar chart as PNG.

    Args:
        planned_cost_inr: Planned cost shown as the first bar.
        actual_spend_inr: Actual spend shown as the second bar.
        forecast_cost_inr: Forecasted cost shown as the third bar.

    Returns:
        ``(image, png_buffer)``: a PIL image for the UI and a separate
        ``io.BytesIO`` holding the same PNG bytes (rewound to position 0) for
        the PDF report. Returns ``(None, None)`` if rendering fails.
    """
    fig = None
    try:
        fig, ax = plt.subplots(figsize=(8, 6))
        categories = ['Planned Cost', 'Actual Spend', 'Forecasted Cost']
        values = [planned_cost_inr, actual_spend_inr, forecast_cost_inr]
        bars = ax.bar(categories, values, color=['#1f77b4', '#ff7f0e', '#2ca02c'])
        ax.set_title("Budget Overview", fontsize=14, pad=15)
        ax.set_ylabel("Amount (₹)", fontsize=12)
        ax.tick_params(axis='x', rotation=45)
        ax.grid(True, axis='y', linestyle='--', alpha=0.7)
        # Label each bar with its formatted amount just above the bar top.
        for bar in bars:
            height = bar.get_height()
            ax.text(
                bar.get_x() + bar.get_width() / 2, height,
                format_indian_number(height), ha='center', va='bottom', fontsize=10
            )

        # Save through the figure object, not pyplot's global "current
        # figure", so a concurrent request cannot be rendered by mistake.
        buf_gradio = io.BytesIO()
        fig.savefig(buf_gradio, format='png', bbox_inches='tight', dpi=100)
        # Render once and copy the bytes for the PDF instead of drawing twice.
        buf_pdf = io.BytesIO(buf_gradio.getvalue())
        buf_gradio.seek(0)
        gradio_image = Image.open(buf_gradio)
        return gradio_image, buf_pdf
    except Exception as e:
        logger.error(f"Error generating bar plot: {str(e)}")
        return None, None
    finally:
        # Always release the figure, including on failure; pyplot keeps every
        # unclosed figure alive.
        if fig is not None:
            plt.close(fig)
# Function to generate a pie chart for risk distribution
def generate_pie_chart_data(cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor):
    """Build a Chart.js-style pie configuration for the four risk factors.

    Each factor is converted with ``float()``, scaled by 100 and clamped at 0.
    When all four slices are 0 an even 25/25/25/25 split is used instead.

    Returns:
        A plain dict with ``type``, ``data`` and ``options`` keys. If a factor
        cannot be converted to float, a reduced single-slice "Error" dict
        (without ``options``) is returned.
    """
    try:
        labels = ['Cost Deviation', 'Material Cost', 'Labor Cost', 'Scope Change']
        values = [
            max(float(cost_deviation_factor) * 100, 0),
            max(float(material_cost_factor) * 100, 0),
            max(float(labor_cost_factor) * 100, 0),
            max(float(scope_change_factor) * 100, 0)
        ]
        if sum(values) == 0:
            values = [25, 25, 25, 25]
        return {
            "type": "pie",
            "data": {
                "labels": labels,
                "datasets": [{
                    "label": "Risk Distribution",
                    "data": values,
                    "backgroundColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"],
                    "borderColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"],
                    "borderWidth": 1
                }]
            },
            "options": {
                # Python booleans: the JavaScript spelling `true` is an
                # undefined name here and raised NameError on every call.
                "responsive": True,
                "plugins": {
                    "legend": {
                        "position": "top"
                    },
                    "title": {
                        "display": True,
                        "text": "Risk Factor Distribution"
                    }
                }
            }
        }
    except (ValueError, TypeError) as e:
        logger.error(f"Error generating pie chart data: {str(e)}")
        return {
            "type": "pie",
            "data": {
                "labels": ["Error"],
                "datasets": [{"label": "Error", "data": [100], "backgroundColor": ["#FF0000"]}]
            }
        }
# Function to generate a gauge chart
def generate_gauge_chart(risk_percentage, category):
    """Build a Chart.js-style radar configuration showing the risk level.

    Args:
        risk_percentage: Risk value; converted with ``float()`` and plotted
            on a radial scale fixed to 0-100.
        category: Budget category name, used in the dataset label and title.

    Returns:
        A plain dict with ``type``, ``data`` and ``options`` keys. If
        ``risk_percentage`` cannot be converted to float, a reduced "Error"
        dict (without ``options``) is returned.
    """
    try:
        risk_percentage = float(risk_percentage)
        return {
            "type": "radar",
            "data": {
                "labels": ["Risk Level"],
                "datasets": [{
                    "label": f"Risk for {category} (%)",
                    "data": [risk_percentage],
                    "backgroundColor": "rgba(255, 99, 132, 0.2)",
                    "borderColor": "rgba(255, 99, 132, 1)",
                    "borderWidth": 1,
                    "pointBackgroundColor": "rgba(255, 99, 132, 1)"
                }]
            },
            "options": {
                # Python booleans: the JavaScript spelling `true` is an
                # undefined name here and raised NameError on every call.
                "responsive": True,
                "scales": {
                    "r": {
                        "min": 0,
                        "max": 100,
                        "ticks": {
                            "stepSize": 20
                        }
                    }
                },
                "plugins": {
                    "legend": {
                        "position": "top"
                    },
                    "title": {
                        "display": True,
                        "text": f"Risk Level Dashboard for {category}"
                    }
                }
            }
        }
    except (ValueError, TypeError) as e:
        logger.error(f"Error generating gauge chart: {str(e)}")
        return {
            "type": "radar",
            "data": {
                "labels": ["Error"],
                "datasets": [{"label": "Error", "data": [0], "backgroundColor": ["#FF0000"]}]
            }
        }
# Function to generate a PDF report
def generate_pdf(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation, bar_chart_image):
    """Write a one-page PDF summary of the prediction to the working directory.

    The page has a bold title, fifteen "label: value" lines spaced 20 points
    apart (drawn in red when ``status`` is "Critical", black otherwise) and,
    when ``bar_chart_image`` is truthy, the chart as a 500x300 image below
    the text.

    Returns:
        The path of the written file, ``budget_report_<timestamp>.pdf``, or
        None if anything fails.
    """
    try:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        pdf_path = f"budget_report_{timestamp}.pdf"
        _, page_height = letter
        pdf = canvas.Canvas(pdf_path, pagesize=letter)

        # Title
        pdf.setFont("Helvetica-Bold", 16)
        pdf.drawString(50, page_height - 50, "Budget Overrun Risk Report")

        # Body text; colour signals the status.
        pdf.setFont("Helvetica", 12)
        if status == "Critical":
            pdf.setFillColor(red)
        else:
            pdf.setFillColor(black)

        report_lines = (
            f"Category: {category}",
            f"Project Phase: {project_phase}",
            f"Material Cost Index: {material_cost_index}",
            f"Labor Index: {labor_index}",
            f"Indices Validation: {indices_validation}",
            f"Scope Change Impact: {scope_change_impact}%",
            f"Planned Cost: {format_indian_number(planned_cost_inr)}",
            f"Actual Spend: {format_indian_number(actual_spend_inr)}",
            f"Forecasted Cost: {format_indian_number(forecast_cost_inr)}",
            f"Total Risk: {total_risk}",
            f"Risk Percentage: {risk_percentage}%",
            f"Status: {status}",
            f"Insights: {insights}",
            f"Top Causes: {top_causes}",
            f"Alert: {alert_message}",
        )
        cursor_y = page_height - 100
        for text in report_lines:
            pdf.drawString(50, cursor_y, text)
            cursor_y -= 20
        # Extra gap between the last text line and the chart.
        cursor_y -= 20

        if bar_chart_image:
            pdf.drawImage(ImageReader(bar_chart_image), 50, cursor_y - 300, width=500, height=300)
        pdf.showPage()
        pdf.save()
        return pdf_path
    except Exception as e:
        logger.error(f"Error generating PDF: {str(e)}")
        return None
# Function to generate an Excel file
def generate_excel(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation):
    """Write the prediction as a single-row Excel sheet named "Results".

    Columns G, H and I (planned, actual and forecast cost) get a rupee number
    format.

    Returns:
        The path of the written file, ``prediction_results_<timestamp>.xlsx``,
        or None if anything fails.
    """
    try:
        # (header, value) pairs in sheet column order, A through O.
        columns = [
            ("Category", category),
            ("Project Phase", project_phase),
            ("Material Cost Index", material_cost_index),
            ("Labor Index", labor_index),
            ("Indices Validation", indices_validation),
            ("Scope Change Impact (%)", scope_change_impact),
            ("Planned Cost (INR)", planned_cost_inr),
            ("Actual Spend (INR)", actual_spend_inr),
            ("Forecasted Cost (INR)", forecast_cost_inr),
            ("Total Risk", total_risk),
            ("Risk Percentage (%)", risk_percentage),
            ("Insights", insights),
            ("Status", status),
            ("Top Causes", top_causes),
            ("Alert", alert_message),
        ]
        frame = pd.DataFrame({header: [value] for header, value in columns})

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        excel_path = f"prediction_results_{timestamp}.xlsx"
        with pd.ExcelWriter(excel_path, engine='xlsxwriter') as writer:
            frame.to_excel(writer, index=False, sheet_name='Results')
            results_sheet = writer.sheets['Results']
            rupee_format = writer.book.add_format({'num_format': '[₹]#,##,##,##0.00'})
            for column_range in ('G:G', 'H:H', 'I:I'):
                results_sheet.set_column(column_range, None, rupee_format)
        return excel_path
    except Exception as e:
        logger.error(f"Error generating Excel: {str(e)}")
        return None
# Function to store results in Salesforce
def store_results_in_salesforce(project_id, planned_cost_inr, actual_spend_inr, forecast_cost_inr, risk_percentage, insights, status, top_causes, category, project_phase, pdf_path):
    """Upsert the prediction into Salesforce and attach the PDF report.

    The ``Project_Budget_Risk__c`` record whose ``Project_Name__c`` equals
    ``project_id`` is updated if one exists, otherwise a new record is
    created. When ``pdf_path`` points at an existing file, it is uploaded as
    a ContentVersion and linked to that record.

    Returns:
        A human-readable status message; failures are reported in the
        message rather than raised.
    """
    # Local imports keep this function self-contained.
    import base64
    from simple_salesforce import format_soql

    if not sf:
        return "Error: Salesforce connection not available."
    try:
        record = {
            'Project_Name__c': project_id,
            'Budget_Category__c': category,
            # float() turns numpy scalars (e.g. int64 from a pandas sum) into
            # values the JSON encoder used for the REST call can serialise.
            'Planned_Cost__c': float(planned_cost_inr),
            'Actual_Spend_To_Date__c': float(actual_spend_inr),
            'Forecast_Final_Cost__c': float(forecast_cost_inr),
            'Overrun_Risk_Score__c': risk_percentage,
            'AI_Insights__c': insights,
            'Status__c': status,
            'Top_Causes__c': top_causes,
            'Project_Phase__c': project_phase
        }
        # format_soql quotes the value so user input cannot alter the query.
        query = format_soql(
            "SELECT Id FROM Project_Budget_Risk__c WHERE Project_Name__c = {}",
            project_id,
        )
        existing = sf.query(query)['records']
        if existing:
            record_id = existing[0]['Id']
            sf.Project_Budget_Risk__c.update(record_id, record)
        else:
            # Keep the new record's id: it is needed below to link the PDF
            # (previously undefined on this path, so the link step failed).
            record_id = sf.Project_Budget_Risk__c.create(record)['id']

        if pdf_path and os.path.exists(pdf_path):
            with open(pdf_path, 'rb') as pdf_file:
                # VersionData is a base64 field; hex text is not a valid payload.
                encoded_pdf = base64.b64encode(pdf_file.read()).decode('ascii')
            sf_file = sf.ContentVersion.create({
                'Title': f"Budget Report {project_id} {datetime.now().strftime('%Y%m%d_%H%M%S')}",
                'PathOnClient': pdf_path,
                'VersionData': encoded_pdf
            })
            file_id = sf_file['id']
            # ContentDocumentLink needs the parent ContentDocument id, not the
            # ContentVersion id returned by create().
            version_rows = sf.query(format_soql(
                "SELECT ContentDocumentId FROM ContentVersion WHERE Id = {}",
                file_id,
            ))['records']
            sf.ContentDocumentLink.create({
                'ContentDocumentId': version_rows[0]['ContentDocumentId'],
                'LinkedEntityId': record_id,
                'ShareType': 'V'
            })
            return f"Results stored in Salesforce with PDF ID: {file_id}"
        return "Results stored in Salesforce (no PDF uploaded)."
    except Exception as e:
        logger.error(f"Error storing results in Salesforce: {str(e)}")
        return f"Error storing results in Salesforce: {str(e)}"
# Build the 7-tuple predict_risk returns when it stops early with a message.
def _risk_error(message):
    return message, None, None, None, None, None, None


# Derive the four heuristic risk factors from the raw costs and indices.
def _heuristic_risk_inputs(planned_cost_inr, actual_spend_inr, material_cost_index, labor_index, scope_change_impact):
    cost_deviation_factor = (actual_spend_inr - planned_cost_inr) / planned_cost_inr if planned_cost_inr > 0 else 0
    material_cost_factor = (material_cost_index - 100) / 100 if material_cost_index > 100 else 0
    labor_cost_factor = (labor_index - 100) / 100 if labor_index > 100 else 0
    scope_change_factor = scope_change_impact / 100
    return cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor


# Prediction function
def predict_risk(username, file, project_id, category, material_cost_index, labor_index, scope_change_impact, project_phase):
    """Run the full budget-overrun prediction for one UI submission.

    Steps: check that ``username`` has the Salesforce profile "Finance",
    load costs (from Salesforce when no file is uploaded and a project id is
    given, otherwise from the uploaded CSV), compute the risk (Hugging Face
    API when ``HF_TOKEN`` is set, heuristic formula otherwise or on any API
    failure), build charts and reports, and store the result in Salesforce
    when a project id is given.

    Returns:
        A 7-tuple ``(summary_text, bar_chart_image, pie_chart_data,
        gauge_chart_data, pdf_path, excel_path, alert_html)``. On a
        validation or access error only the first element is set (the error
        message) and the other six are None.
    """
    # Local import keeps this function self-contained.
    from simple_salesforce import format_soql

    # Validate inputs
    if not username:
        logger.error("Username is empty.")
        return _risk_error("Error: Salesforce username is required.")

    # Validate user role via Salesforce
    # NOTE(review): access is decided from the typed username alone; no
    # credential check is visible in this file - confirm this is intended.
    if not sf:
        return _risk_error("Error: Salesforce connection not available.")
    try:
        # format_soql quotes the value so user input cannot alter the query.
        user_query = format_soql("SELECT Profile.Name FROM User WHERE Username = {}", username)
        user_records = sf.query(user_query)['records']
        # A user row can carry a null Profile; treat that as "not Finance".
        profile = (user_records[0].get('Profile') or {}) if user_records else {}
        if profile.get('Name') != 'Finance':
            logger.warning(f"Access denied for user {username}: Not a Finance role.")
            return _risk_error("Access Denied: This app is restricted to finance roles only.")
    except Exception as e:
        logger.error(f"Error validating user {username}: {str(e)}")
        return _risk_error(f"Error validating user: {str(e)}")

    # Fetch data from Salesforce if no file is uploaded
    if file is None and project_id:
        df, error = fetch_budget_from_salesforce(project_id)
        if error:
            return _risk_error(error)
    else:
        df = None

    # Process uploaded file or use Salesforce data
    try:
        if df is not None:
            planned_cost_inr = df['Planned_Cost'].sum()
            actual_spend_inr = df['Actual_Spend'].sum()
        else:
            planned_cost_inr, actual_spend_inr, _ = process_uploaded_file(file)
        # Plain floats: pandas sums are numpy scalars, and numpy integers
        # cannot be serialised into the JSON payloads sent further down.
        planned_cost_inr = float(planned_cost_inr)
        actual_spend_inr = float(actual_spend_inr)
    except Exception as e:
        logger.error(f"Error processing data: {str(e)}")
        return _risk_error(f"Error processing data: {str(e)}")

    # Validate numeric inputs
    try:
        material_cost_index = float(material_cost_index) if material_cost_index else 0
        labor_index = float(labor_index) if labor_index else 0
        scope_change_impact = float(scope_change_impact) if scope_change_impact else 0
    except (ValueError, TypeError):
        logger.error("Invalid input: Material Cost Index, Labor Index, or Scope Change Impact must be numeric.")
        return _risk_error("Error: All numeric inputs must be valid numbers.")

    logger.debug(f"Starting prediction: planned_cost_inr={planned_cost_inr}, actual_spend_inr={actual_spend_inr}, "
                 f"category={category}, material_cost_index={material_cost_index}, labor_index={labor_index}, "
                 f"scope_change_impact={scope_change_impact}, project_phase={project_phase}")

    # Cross-check indices
    indices_validation = cross_check_indices(material_cost_index, labor_index)

    # Risk calculation with Hugging Face API; `factors` stays None until a
    # complete, numeric answer has been obtained.
    factors = None
    risk_percentage = 0
    if HF_TOKEN:
        try:
            api_url = "https://api.huggingface.co/models/budget-overrun-risk"
            headers = {"Authorization": f"Bearer {HF_TOKEN}"}
            payload = {
                "planned_cost": planned_cost_inr,
                "actual_spend": actual_spend_inr,
                "material_cost_index": material_cost_index,
                "labor_index": labor_index,
                "scope_change_impact": scope_change_impact
            }
            # Without a timeout a stalled endpoint would hang the request forever.
            response = requests.post(api_url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
            result = response.json()
            # Coerce here so a malformed answer triggers the fallback instead
            # of crashing in the comparisons below.
            risk_percentage = float(result['risk_percentage'])
            factors = tuple(
                float(result.get(key, 0))
                for key in ('cost_deviation_factor', 'material_cost_factor',
                            'labor_cost_factor', 'scope_change_factor')
            )
        except Exception as e:
            logger.error(f"Hugging Face API call failed: {str(e)}. Falling back to heuristic formula.")
            factors = None
    else:
        logger.warning("No HF_TOKEN provided. Using heuristic formula for risk calculation.")

    if factors is None:
        factors = _heuristic_risk_inputs(planned_cost_inr, actual_spend_inr, material_cost_index, labor_index, scope_change_impact)
        risk_percentage = calculate_heuristic_risk(*factors)
    cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor = factors

    total_risk = 1 if risk_percentage > 50 else 0
    forecast_cost_inr = planned_cost_inr * (1 + risk_percentage / 100)
    insights = "High risk of overrun" if total_risk == 1 else "Low risk of overrun"
    status = "Critical" if total_risk == 1 else "Healthy"

    # Identify causes (every triggered cause is listed; padded to at least 3 entries)
    causes = []
    if cost_deviation_factor > 0:
        causes.append(f"Budget Overrun (Deviation: {round(cost_deviation_factor * 100, 2)}%)")
    if material_cost_index > 120:
        causes.append(f"Material Cost Index Deviation (Index: {material_cost_index})")
    if labor_index > 150:
        causes.append(f"Labor Index Deviation (Index: {labor_index})")
    if scope_change_impact > 0:
        causes.append(f"Scope Change Impact ({scope_change_impact}%)")
    while len(causes) < 3:
        causes.append("N/A")
    top_causes = ", ".join(causes)

    # Generate alert
    deviation = ((forecast_cost_inr - planned_cost_inr) / planned_cost_inr * 100) if planned_cost_inr > 0 else 0
    alert_message = "Alert: Forecasted cost exceeds planned cost by more than 10%. Notify finance and engineering teams." if deviation > 10 else "No alert triggered."
    alert_style = "background-color: #ffcccc; padding: 10px; border: 1px solid red; border-radius: 5px;" if deviation > 10 else "background-color: #ccffcc; padding: 10px; border: 1px solid green; border-radius: 5px;"

    # Generate visualizations
    bar_chart_image, bar_chart_image_pdf = generate_bar_plot(planned_cost_inr, actual_spend_inr, forecast_cost_inr)
    pie_chart_data = generate_pie_chart_data(cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor)
    gauge_chart_data = generate_gauge_chart(risk_percentage, category)

    # Generate reports
    pdf_file = generate_pdf(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation, bar_chart_image_pdf)
    excel_file = generate_excel(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation)

    # Store results in Salesforce
    if project_id:
        sf_result = store_results_in_salesforce(project_id, planned_cost_inr, actual_spend_inr, forecast_cost_inr, risk_percentage, insights, status, top_causes, category, project_phase, pdf_file)
    else:
        sf_result = "No project ID provided; results not stored in Salesforce."

    # Format output
    risk_level = "High" if total_risk == 1 else "Low"
    output_text = (
        f"Risk Summary\n"
        f"----------------------------------------\n"
        f"Risk Level: {risk_level}\n"
        f"Risk Percentage: {risk_percentage}%\n"
        f"Status: {status}\n"
        f"Insights: {insights} due to {top_causes.lower()}.\n\n"
        f"Project Details\n"
        f"----------------------------------------\n"
        f"Category: {category}\n"
        f"Project Phase: {project_phase}\n"
        f"Material Cost Index: {material_cost_index}\n"
        f"Labor Index: {labor_index}\n"
        f"Indices Validation: {indices_validation}\n"
        f"Scope Change Impact: {scope_change_impact}%\n\n"
        f"Forecast Chart\n"
        f"----------------------------------------\n"
        f"[Bar chart displayed below]\n\n"
        f"Detailed Metrics\n"
        f"----------------------------------------\n"
        f"Total Risk: {total_risk}\n"
        f"Planned Cost: {format_indian_number(planned_cost_inr)}\n"
        f"Actual Spend: {format_indian_number(actual_spend_inr)}\n"
        f"Forecasted Cost: {format_indian_number(forecast_cost_inr)}\n"
        f"Top Causes: {top_causes}\n"
        f"Salesforce Storage: {sf_result}\n"
        f"Local PDF Report: [Download link below]\n"
        f"Excel Report: [Download link below]"
    )
    return output_text, bar_chart_image, pie_chart_data, gauge_chart_data, pdf_file, excel_file, f"<div style='{alert_style}'>{alert_message}</div>"
# Helper function for heuristic risk calculation
def calculate_heuristic_risk(cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor):
    """Combine four risk factors into a single 0-100 risk percentage.

    Each factor is converted with ``float()``, scaled by 100 and capped at
    100, then weighted: 0.4 for cost deviation and 0.2 each for material
    cost, labor cost and scope change. The weighted sum is clamped to
    [0, 100] and rounded to two decimals.

    Returns:
        The risk percentage, or 0 if a factor cannot be converted to float.
    """
    weighted_factors = (
        (0.4, cost_deviation_factor),
        (0.2, material_cost_factor),
        (0.2, labor_cost_factor),
        (0.2, scope_change_factor),
    )
    try:
        score = 0.0
        for weight, factor in weighted_factors:
            capped_percent = min(float(factor) * 100, 100)
            score += weight * capped_percent
    except (ValueError, TypeError) as e:
        logger.error(f"Error calculating heuristic risk: {str(e)}")
        return 0
    clamped = max(0, min(score, 100))
    return round(clamped, 2)
# Function to update explanations
def update_material_cost_explanation(category):
    """Return the Markdown help text for the Material Cost Index field.

    The text names ``category`` and an example material for it; categories
    without a specific example use "key materials".
    """
    examples_by_category = {
        "Civil": "cement",
        "Plumbing": "pipes and fittings",
        "Electrical": "wiring and conduits",
        "Mechanical": "HVAC equipment and ducting",
        "Finishing": "tiles and paint",
        "Others": "key materials",
    }
    if category in examples_by_category:
        material = examples_by_category[category]
    else:
        material = "key materials"

    sentences = [
        f"**Material Cost Index**: This tracks the cost trend of primary materials for {category} projects (e.g., {material}) ",
        "compared to a baseline (100 = average cost in a reference year). Higher values indicate rising material costs, ",
        "increasing project expenses. A value above 120 flags a potential risk. Example: If the index is 130, material costs ",
        "are 30% higher than the baseline.",
    ]
    return "".join(sentences)
def update_labor_explanation(category):
    """Return the Markdown help text for the Labor Index field.

    The text names ``category`` and an example trade for it; categories
    without a specific example use "specialized labor".
    """
    trades_by_category = {
        "Civil": "construction workers",
        "Plumbing": "plumbers and pipefitters",
        "Electrical": "electricians",
        "Mechanical": "HVAC technicians",
        "Finishing": "painters and tilers",
        "Others": "specialized labor",
    }
    if category in trades_by_category:
        labor = trades_by_category[category]
    else:
        labor = "specialized labor"

    sentences = [
        f"**Labor Index**: This tracks the cost trend of labor for {category} projects (e.g., {labor}) compared to a baseline ",
        "(100 = average cost in a reference year). Higher values indicate rising labor costs, increasing project expenses. ",
        "A value above 150 flags a potential risk. Example: If the index is 160, labor costs are 60% higher than the baseline.",
    ]
    return "".join(sentences)
# Custom CSS for the submit button (matched through elem_id="submit-button").
custom_css = """
#submit-button {
    background-color: #FFD700 !important;
    color: #333 !important;
    width: 150px !important;
    height: 40px !important;
    border: none !important;
    border-radius: 5px !important;
    font-size: 16px !important;
    display: flex !important;
    align-items: center !important;
    justify-content: center !important;
}
#submit-button:hover {
    background-color: #E6C200 !important;
}
"""
# Gradio interface: inputs in the left column, results in the right column.
with gr.Blocks(title="Budget Overrun Risk Estimator", css=custom_css) as demo:
    gr.Markdown("# Budget Overrun Risk Estimator")
    gr.Markdown("Upload a CSV file or provide a Project ID to fetch budget line items from Salesforce. All numeric fields are required.")
    with gr.Row():
        with gr.Column():
            username_input = gr.Textbox(label="Salesforce Username", placeholder="Enter your Salesforce username")
            project_id_input = gr.Textbox(label="Project ID (Optional)", placeholder="Enter Project ID to fetch data from Salesforce")
            file_input = gr.File(label="Upload Budget Line Items (CSV, Optional if Project ID provided)", file_types=[".csv"])
            category_input = gr.Dropdown(label="Category", choices=["Civil", "Electrical", "Plumbing", "Mechanical", "Finishing", "Others"], value="Plumbing")
            material_cost_input = gr.Textbox(label="Material Cost Index", placeholder="Enter material cost index (e.g., 120)")
            material_cost_explanation = gr.Markdown(update_material_cost_explanation("Plumbing"))
            labor_index_input = gr.Textbox(label="Labor Index", placeholder="Enter labor index (e.g., 130)")
            labor_index_explanation = gr.Markdown(update_labor_explanation("Plumbing"))
            scope_change_input = gr.Textbox(label="Scope Change Impact (%)", placeholder="Enter scope change impact as a percentage (e.g., 10 for 10%)")
            project_phase_input = gr.Dropdown(label="Project Phase", choices=["Planning", "Execution", "Closure"], value="Planning")
            with gr.Row():
                clear_button = gr.Button("Clear")
                submit_button = gr.Button("Submit", elem_id="submit-button")
        with gr.Column():
            gr.Markdown("## Dashboard")
            # The gauge and pie helpers return plain Chart.js-style dicts.
            # gr.Plot only accepts matplotlib/plotly/altair/bokeh figure
            # objects, so the dicts are shown through gr.JSON instead.
            gauge_chart_output = gr.JSON(label="Risk Level Dashboard")
            gr.Markdown("## Prediction Results")
            output_text = gr.Textbox(label="Prediction Results", lines=20, max_lines=30)
            gr.Markdown("## Forecast Chart")
            bar_chart_output = gr.Image(label="Budget Overview (Bar Chart)")
            gr.Markdown("## Risk Distribution")
            pie_chart_output = gr.JSON(label="Risk Factor Distribution (Pie Chart)")
            gr.Markdown("## Alerts")
            alert_output = gr.HTML(label="Alert Notification")
            output_pdf = gr.File(label="Download Local PDF Report")
            output_excel = gr.File(label="Download Excel Report")

    # Keep both explanation texts in sync with the selected category.
    category_input.change(
        fn=update_material_cost_explanation,
        inputs=category_input,
        outputs=material_cost_explanation
    )
    category_input.change(
        fn=update_labor_explanation,
        inputs=category_input,
        outputs=labor_index_explanation
    )

    # Reset every input and output. The tuple must have exactly one value per
    # component in `outputs` (15): file/image/chart components reset to None,
    # text and HTML components to "".
    clear_button.click(
        fn=lambda: (
            "", "", None, "Plumbing", "", "", "", "Planning",  # inputs
            "", None, None, None, None, None, "",              # outputs
        ),
        outputs=[
            username_input, project_id_input, file_input, category_input, material_cost_input, labor_index_input,
            scope_change_input, project_phase_input, output_text, bar_chart_output,
            pie_chart_output, gauge_chart_output, output_pdf, output_excel, alert_output
        ]
    )
    submit_button.click(
        fn=predict_risk,
        inputs=[username_input, file_input, project_id_input, category_input, material_cost_input, labor_index_input, scope_change_input, project_phase_input],
        outputs=[output_text, bar_chart_output, pie_chart_output, gauge_chart_output, output_pdf, output_excel, alert_output]
    )
# Launch the app when the file is executed as a script.
if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "auth_message": "Please log in with your Salesforce credentials.",
        "allowed_paths": ["/home/user/app"],
        "ssr_mode": False,
    }
    try:
        demo.launch(**launch_options)
    except Exception as e:
        # A failed start-up is logged rather than re-raised.
        logger.error(f"Failed to launch Gradio app: {str(e)}")