import os import logging import matplotlib.pyplot as plt import io from PIL import Image import pandas as pd from dotenv import load_dotenv from datetime import datetime from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas from reportlab.lib.utils import ImageReader from reportlab.lib.colors import red, black import requests from simple_salesforce import Salesforce import gradio as gr # Added to fix NameError # Set up logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Load environment variables load_dotenv() HF_TOKEN = os.getenv("HF_TOKEN") SF_USERNAME = os.getenv("SF_USERNAME") SF_PASSWORD = os.getenv("SF_PASSWORD") SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN") SF_INSTANCE_URL = os.getenv("SF_INSTANCE_URL", "https://budgetoverrunriskestimator-dev-ed.develop.my.salesforce.com") # Validate environment variables if not HF_TOKEN: logger.error("Hugging Face token not set. Please add HF_TOKEN to .env file or Space Secrets.") else: logger.info("Hugging Face token loaded successfully.") if not all([SF_USERNAME, SF_PASSWORD, SF_SECURITY_TOKEN]): logger.error("Salesforce credentials incomplete. Please set SF_USERNAME, SF_PASSWORD, and SF_SECURITY_TOKEN in .env.") sf = None else: # Initialize Salesforce connection try: sf = Salesforce( username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN, instance_url=SF_INSTANCE_URL ) logger.info("Salesforce connection established successfully.") except Exception as e: logger.error(f"Failed to connect to Salesforce: {str(e)}") sf = None # Custom function to format numbers in Indian style (e.g., 100000000 as 1,00,00,000.00) def format_indian_number(number): try: number = float(number) integer_part, decimal_part = f"{number:.2f}".split(".") integer_part = integer_part[::-1] formatted = "" for i, digit in enumerate(integer_part): if i == 3: formatted += "," elif i > 3 and (i - 3) % 2 == 0: formatted += "," formatted += digit integer_part = formatted[::-1] return f"₹{integer_part}.{decimal_part}" except (ValueError, TypeError) as e: logger.error(f"Error formatting number {number}: {str(e)}") return "₹0.00" # Function to fetch budget data from Salesforce def fetch_budget_from_salesforce(project_id): if not sf: return None, "Error: Salesforce connection not available." try: query = f""" SELECT Planned_Cost__c, Actual_Spend_To_Date__c FROM Project_Budget_Risk__c WHERE Project_Name__c = '{project_id}' """ result = sf.query(query) records = result['records'] if not records: return None, "Error: No budget data found for the given project ID." data = [] for record in records: data.append({ 'Planned_Cost': record['Planned_Cost__c'] or 0, 'Actual_Spend': record['Actual_Spend_To_Date__c'] or 0 }) df = pd.DataFrame(data) return df, None except Exception as e: logger.error(f"Error fetching data from Salesforce: {str(e)}") return None, f"Error fetching data: {str(e)}" # Function to process uploaded file for line items def process_uploaded_file(file): if file is None: return 0, 0, [] try: df = pd.read_csv(file) if len(df) > 200: raise ValueError("File exceeds 200 line items. Please upload a file with 200 or fewer line items.") planned_cost = df['Planned_Cost'].sum() actual_spend = df['Actual_Spend'].sum() line_items = df.to_dict('records') return planned_cost, actual_spend, line_items except Exception as e: logger.error(f"Error processing uploaded file: {str(e)}") return 0, 0, [] # Function to cross-check indices with 3rd-party sources def cross_check_indices(material_cost_index, labor_index): try: material_cost_index = float(material_cost_index) labor_index = float(labor_index) if not (0 <= material_cost_index <= 300 and 0 <= labor_index <= 300): return "Warning: Material Cost Index or Labor Index out of expected range (0-300)." return "Indices within expected range." except (ValueError, TypeError) as e: logger.error(f"Error validating indices: {str(e)}") return "Error: Invalid indices provided." # Function to generate a bar chart def generate_bar_plot(planned_cost_inr, actual_spend_inr, forecast_cost_inr): try: fig, ax = plt.subplots(figsize=(8, 6)) categories = ['Planned Cost', 'Actual Spend', 'Forecasted Cost'] values = [planned_cost_inr, actual_spend_inr, forecast_cost_inr] bars = ax.bar(categories, values, color=['#1f77b4', '#ff7f0e', '#2ca02c']) ax.set_title("Budget Overview", fontsize=14, pad=15) ax.set_ylabel("Amount (₹)", fontsize=12) ax.tick_params(axis='x', rotation=45) ax.grid(True, axis='y', linestyle='--', alpha=0.7) for bar in bars: height = bar.get_height() ax.text( bar.get_x() + bar.get_width() / 2, height, format_indian_number(height), ha='center', va='bottom', fontsize=10 ) buf_gradio = io.BytesIO() plt.savefig(buf_gradio, format='png', bbox_inches='tight', dpi=100) buf_gradio.seek(0) gradio_image = Image.open(buf_gradio) buf_pdf = io.BytesIO() plt.savefig(buf_pdf, format='png', bbox_inches='tight', dpi=100) buf_pdf.seek(0) plt.close() return gradio_image, buf_pdf except Exception as e: logger.error(f"Error generating bar plot: {str(e)}") return None, None # Function to generate a pie chart for risk distribution def generate_pie_chart_data(cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor): try: labels = ['Cost Deviation', 'Material Cost', 'Labor Cost', 'Scope Change'] values = [ max(float(cost_deviation_factor) * 100, 0), max(float(material_cost_factor) * 100, 0), max(float(labor_cost_factor) * 100, 0), max(float(scope_change_factor) * 100, 0) ] total = sum(values) if total == 0: values = [25, 25, 25, 25] return { "type": "pie", "data": { "labels": labels, "datasets": [{ "label": "Risk Distribution", "data": values, "backgroundColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"], "borderColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"], "borderWidth": 1 }] }, "options": { "responsive": true, "plugins": { "legend": { "position": "top" }, "title": { "display": true, "text": "Risk Factor Distribution" } } } } except (ValueError, TypeError) as e: logger.error(f"Error generating pie chart data: {str(e)}") return { "type": "pie", "data": { "labels": ["Error"], "datasets": [{"label": "Error", "data": [100], "backgroundColor": ["#FF0000"]}] } } # Function to generate a gauge chart def generate_gauge_chart(risk_percentage, category): try: risk_percentage = float(risk_percentage) return { "type": "radar", "data": { "labels": ["Risk Level"], "datasets": [{ "label": f"Risk for {category} (%)", "data": [risk_percentage], "backgroundColor": "rgba(255, 99, 132, 0.2)", "borderColor": "rgba(255, 99, 132, 1)", "borderWidth": 1, "pointBackgroundColor": "rgba(255, 99, 132, 1)" }] }, "options": { "responsive": true, "scales": { "r": { "min": 0, "max": 100, "ticks": { "stepSize": 20 } } }, "plugins": { "legend": { "position": "top" }, "title": { "display": true, "text": f"Risk Level Dashboard for {category}" } } } } except (ValueError, TypeError) as e: logger.error(f"Error generating gauge chart: {str(e)}") return { "type": "radar", "data": { "labels": ["Error"], "datasets": [{"label": "Error", "data": [0], "backgroundColor": ["#FF0000"]}] } } # Function to generate a PDF report def generate_pdf(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation, bar_chart_image): try: pdf_path = f"budget_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf" c = canvas.Canvas(pdf_path, pagesize=letter) width, height = letter c.setFont("Helvetica-Bold", 16) c.drawString(50, height - 50, "Budget Overrun Risk Report") c.setFont("Helvetica", 12) y_position = height - 100 text_color = red if status == "Critical" else black c.setFillColor(text_color) c.drawString(50, y_position, f"Category: {category}") y_position -= 20 c.drawString(50, y_position, f"Project Phase: {project_phase}") y_position -= 20 c.drawString(50, y_position, f"Material Cost Index: {material_cost_index}") y_position -= 20 c.drawString(50, y_position, f"Labor Index: {labor_index}") y_position -= 20 c.drawString(50, y_position, f"Indices Validation: {indices_validation}") y_position -= 20 c.drawString(50, y_position, f"Scope Change Impact: {scope_change_impact}%") y_position -= 20 c.drawString(50, y_position, f"Planned Cost: {format_indian_number(planned_cost_inr)}") y_position -= 20 c.drawString(50, y_position, f"Actual Spend: {format_indian_number(actual_spend_inr)}") y_position -= 20 c.drawString(50, y_position, f"Forecasted Cost: {format_indian_number(forecast_cost_inr)}") y_position -= 20 c.drawString(50, y_position, f"Total Risk: {total_risk}") y_position -= 20 c.drawString(50, y_position, f"Risk Percentage: {risk_percentage}%") y_position -= 20 c.drawString(50, y_position, f"Status: {status}") y_position -= 20 c.drawString(50, y_position, f"Insights: {insights}") y_position -= 20 c.drawString(50, y_position, f"Top Causes: {top_causes}") y_position -= 20 c.drawString(50, y_position, f"Alert: {alert_message}") y_position -= 40 if bar_chart_image: chart_reader = ImageReader(bar_chart_image) c.drawImage(chart_reader, 50, y_position - 300, width=500, height=300) c.showPage() c.save() return pdf_path except Exception as e: logger.error(f"Error generating PDF: {str(e)}") return None # Function to generate an Excel file def generate_excel(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation): try: data = { "Category": [category], "Project Phase": [project_phase], "Material Cost Index": [material_cost_index], "Labor Index": [labor_index], "Indices Validation": [indices_validation], "Scope Change Impact (%)": [scope_change_impact], "Planned Cost (INR)": [planned_cost_inr], "Actual Spend (INR)": [actual_spend_inr], "Forecasted Cost (INR)": [forecast_cost_inr], "Total Risk": [total_risk], "Risk Percentage (%)": [risk_percentage], "Insights": [insights], "Status": [status], "Top Causes": [top_causes], "Alert": [alert_message] } df = pd.DataFrame(data) excel_path = f"prediction_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" with pd.ExcelWriter(excel_path, engine='xlsxwriter') as writer: df.to_excel(writer, index=False, sheet_name='Results') workbook = writer.book worksheet = writer.sheets['Results'] number_format = workbook.add_format({'num_format': '[₹]#,##,##,##0.00'}) worksheet.set_column('G:G', None, number_format) worksheet.set_column('H:H', None, number_format) worksheet.set_column('I:I', None, number_format) return excel_path except Exception as e: logger.error(f"Error generating Excel: {str(e)}") return None # Function to store results in Salesforce def store_results_in_salesforce(project_id, planned_cost_inr, actual_spend_inr, forecast_cost_inr, risk_percentage, insights, status, top_causes, category, project_phase, pdf_path): if not sf: return "Error: Salesforce connection not available." try: record = { 'Project_Name__c': project_id, 'Budget_Category__c': category, 'Planned_Cost__c': planned_cost_inr, 'Actual_Spend_To_Date__c': actual_spend_inr, 'Forecast_Final_Cost__c': forecast_cost_inr, 'Overrun_Risk_Score__c': risk_percentage, 'AI_Insights__c': insights, 'Status__c': status, 'Top_Causes__c': top_causes, 'Project_Phase__c': project_phase } query = f"SELECT Id FROM Project_Budget_Risk__c WHERE Project_Name__c = '{project_id}'" result = sf.query(query) if result['records']: record_id = result['records'][0]['Id'] sf.Project_Budget_Risk__c.update(record_id, record) else: sf.Project_Budget_Risk__c.create(record) if pdf_path and os.path.exists(pdf_path): with open(pdf_path, 'rb') as pdf_file: sf_file = sf.ContentVersion.create({ 'Title': f"Budget Report {project_id} {datetime.now().strftime('%Y%m%d_%H%M%S')}", 'PathOnClient': pdf_path, 'VersionData': pdf_file.read().hex() }) file_id = sf_file['id'] sf.ContentDocumentLink.create({ 'ContentDocumentId': file_id, 'LinkedEntityId': record_id, 'ShareType': 'V' }) return f"Results stored in Salesforce with PDF ID: {file_id}" return "Results stored in Salesforce (no PDF uploaded)." except Exception as e: logger.error(f"Error storing results in Salesforce: {str(e)}") return f"Error storing results in Salesforce: {str(e)}" # Prediction function def predict_risk(username, file, project_id, category, material_cost_index, labor_index, scope_change_impact, project_phase): # Validate inputs if not username: logger.error("Username is empty.") return "Error: Salesforce username is required.", None, None, None, None, None, None # Validate user role via Salesforce if not sf: return "Error: Salesforce connection not available.", None, None, None, None, None, None try: user_query = f"SELECT Profile.Name FROM User WHERE Username = '{username}'" user_result = sf.query(user_query) if not user_result['records'] or user_result['records'][0]['Profile']['Name'] != 'Finance': logger.warning(f"Access denied for user {username}: Not a Finance role.") return "Access Denied: This app is restricted to finance roles only.", None, None, None, None, None, None except Exception as e: logger.error(f"Error validating user {username}: {str(e)}") return f"Error validating user: {str(e)}", None, None, None, None, None, None # Fetch data from Salesforce if no file is uploaded if file is None and project_id: df, error = fetch_budget_from_salesforce(project_id) if error: return error, None, None, None, None, None, None else: df = None # Process uploaded file or use Salesforce data try: if df is not None: planned_cost_inr = df['Planned_Cost'].sum() actual_spend_inr = df['Actual_Spend'].sum() line_items = df.to_dict('records') else: planned_cost_inr, actual_spend_inr, line_items = process_uploaded_file(file) except Exception as e: logger.error(f"Error processing data: {str(e)}") return f"Error processing data: {str(e)}", None, None, None, None, None, None # Validate numeric inputs try: material_cost_index = float(material_cost_index) if material_cost_index else 0 labor_index = float(labor_index) if labor_index else 0 scope_change_impact = float(scope_change_impact) if scope_change_impact else 0 except ValueError: logger.error("Invalid input: Material Cost Index, Labor Index, or Scope Change Impact must be numeric.") return "Error: All numeric inputs must be valid numbers.", None, None, None, None, None, None logger.debug(f"Starting prediction: planned_cost_inr={planned_cost_inr}, actual_spend_inr={actual_spend_inr}, " f"category={category}, material_cost_index={material_cost_index}, labor_index={labor_index}, " f"scope_change_impact={scope_change_impact}, project_phase={project_phase}") # Cross-check indices indices_validation = cross_check_indices(material_cost_index, labor_index) # Risk calculation with Hugging Face API if HF_TOKEN: try: api_url = "https://api.huggingface.co/models/budget-overrun-risk" headers = {"Authorization": f"Bearer {HF_TOKEN}"} payload = { "planned_cost": planned_cost_inr, "actual_spend": actual_spend_inr, "material_cost_index": material_cost_index, "labor_index": labor_index, "scope_change_impact": scope_change_impact } response = requests.post(api_url, json=payload, headers=headers) response.raise_for_status() result = response.json() risk_percentage = result['risk_percentage'] cost_deviation_factor = result.get('cost_deviation_factor', 0) material_cost_factor = result.get('material_cost_factor', 0) labor_cost_factor = result.get('labor_cost_factor', 0) scope_change_factor = result.get('scope_change_factor', 0) except Exception as e: logger.error(f"Hugging Face API call failed: {str(e)}. Falling back to heuristic formula.") cost_deviation_factor = (actual_spend_inr - planned_cost_inr) / planned_cost_inr if planned_cost_inr > 0 else 0 material_cost_factor = (material_cost_index - 100) / 100 if material_cost_index > 100 else 0 labor_cost_factor = (labor_index - 100) / 100 if labor_index > 100 else 0 scope_change_factor = scope_change_impact / 100 risk_percentage = calculate_heuristic_risk(cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor) else: logger.warning("No HF_TOKEN provided. Using heuristic formula for risk calculation.") cost_deviation_factor = (actual_spend_inr - planned_cost_inr) / planned_cost_inr if planned_cost_inr > 0 else 0 material_cost_factor = (material_cost_index - 100) / 100 if material_cost_index > 100 else 0 labor_cost_factor = (labor_index - 100) / 100 if labor_index > 100 else 0 scope_change_factor = scope_change_impact / 100 risk_percentage = calculate_heuristic_risk(cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor) total_risk = 1 if risk_percentage > 50 else 0 forecast_cost_inr = planned_cost_inr * (1 + risk_percentage / 100) insights = "High risk of overrun" if total_risk == 1 else "Low risk of overrun" status = "Critical" if total_risk == 1 else "Healthy" # Identify top 3 causes causes = [] if cost_deviation_factor > 0: causes.append(f"Budget Overrun (Deviation: {round(cost_deviation_factor * 100, 2)}%)") if material_cost_index > 120: causes.append(f"Material Cost Index Deviation (Index: {material_cost_index})") if labor_index > 150: causes.append(f"Labor Index Deviation (Index: {labor_index})") if scope_change_impact > 0: causes.append(f"Scope Change Impact ({scope_change_impact}%)") while len(causes) < 3: causes.append("N/A") top_causes = ", ".join(causes) # Generate alert deviation = ((forecast_cost_inr - planned_cost_inr) / planned_cost_inr * 100) if planned_cost_inr > 0 else 0 alert_message = "Alert: Forecasted cost exceeds planned cost by more than 10%. Notify finance and engineering teams." if deviation > 10 else "No alert triggered." alert_style = "background-color: #ffcccc; padding: 10px; border: 1px solid red; border-radius: 5px;" if deviation > 10 else "background-color: #ccffcc; padding: 10px; border: 1px solid green; border-radius: 5px;" # Generate visualizations bar_chart_image, bar_chart_image_pdf = generate_bar_plot(planned_cost_inr, actual_spend_inr, forecast_cost_inr) pie_chart_data = generate_pie_chart_data(cost_deviation_factor, material_cost_factor, labor_cost_factor, scope_change_factor) gauge_chart_data = generate_gauge_chart(risk_percentage, category) # Generate reports pdf_file = generate_pdf(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation, bar_chart_image_pdf) excel_file = generate_excel(planned_cost_inr, actual_spend_inr, forecast_cost_inr, total_risk, risk_percentage, insights, status, top_causes, category, project_phase, material_cost_index, labor_index, scope_change_impact, alert_message, indices_validation) # Store results in Salesforce if project_id: sf_result = store_results_in_salesforce(project_id, planned_cost_inr, actual_spend_inr, forecast_cost_inr, risk_percentage, insights, status, top_causes, category, project_phase, pdf_file) else: sf_result = "No project ID provided; results not stored in Salesforce." # Format output risk_level = "High" if total_risk == 1 else "Low" output_text = ( f"Risk Summary\n" f"----------------------------------------\n" f"Risk Level: {risk_level}\n" f"Risk Percentage: {risk_percentage}%\n" f"Status: {status}\n" f"Insights: {insights} due to {top_causes.lower()}.\n\n" f"Project Details\n" f"----------------------------------------\n" f"Category: {category}\n" f"Project Phase: {project_phase}\n" f"Material Cost Index: {material_cost_index}\n" f"Labor Index: {labor_index}\n" f"Indices Validation: {indices_validation}\n" f"Scope Change Impact: {scope_change_impact}%\n\n" f"Forecast Chart\n" f"----------------------------------------\n" f"[Bar chart displayed below]\n\n" f"Detailed Metrics\n" f"----------------------------------------\n" f"Total Risk: {total_risk}\n" f"Planned Cost: {format_indian_number(planned_cost_inr)}\n" f"Actual Spend: {format_indian_number(actual_spend_inr)}\n" f"Forecasted Cost: {format_indian_number(forecast_cost_inr)}\n" f"Top Causes: {top_causes}\n" f"Salesforce Storage: {sf_result}\n" f"Local PDF Report: [Download link below]\n" f"Excel Report: [Download link below]" ) return output_text, bar_chart_image, pie_chart_data, gauge_chart_data, pdf_file, excel_file, f"