Spaces:
Sleeping
Sleeping
| from flask import Flask, request, render_template, jsonify, Response | |
| from pydantic import BaseModel, ValidationError | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| import base64 | |
| import os | |
| import logging | |
| from datetime import datetime | |
| from simple_salesforce import Salesforce | |
| import json | |
# Set up logging to capture errors and debug information
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Salesforce credentials
# NOTE(review): hard-coded fallback credentials and security token are committed
# to source — rotate these secrets and drop the defaults so the app fails fast
# when the environment variables are unset.
SF_USERNAME = os.getenv("SF_USERNAME", "scores@app.com")
SF_PASSWORD = os.getenv("SF_PASSWORD", "Internal@1")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN", "NbUKcTx45azba5HEdntE9YAh")
# "login" presumably targets production; "test" would target a sandbox — confirm
# against the simple_salesforce docs.
SF_DOMAIN = os.getenv("SF_DOMAIN", "login")

# Verify API key is set — it is used as the bearer token for the scoring endpoint.
API_KEY = os.getenv("HUGGINGFACE_API_KEY")
if not API_KEY:
    logger.error("HUGGINGFACE_API_KEY environment variable not set")
    raise ValueError("HUGGINGFACE_API_KEY environment variable not set")

# Connect to Salesforce at import time; any failure aborts application startup.
try:
    sf = Salesforce(
        username=SF_USERNAME,
        password=SF_PASSWORD,
        security_token=SF_SECURITY_TOKEN,
        domain=SF_DOMAIN
    )
    logger.info("Successfully connected to Salesforce")
except Exception as e:
    logger.error(f"Failed to connect to Salesforce: {str(e)}")
    raise
# VendorLog model to match Salesforce data
class VendorLog(BaseModel):
    """Normalized view of a Salesforce Vendor_Log__c record.

    Populated by fetch_vendor_logs_from_salesforce() (or directly from the
    /score request body). workDetails and qualityReport carry percentage
    values as strings; calculate_scores() strips the optional
    '% completed' / '% quality' suffixes and parses them back to floats.
    """
    vendorLogId: str            # Salesforce Vendor_Log__c record Id
    vendorId: str               # set from the record's Name field by the fetcher
    vendorRecordId: str         # Vendor__c lookup Id (required; fetcher skips records without it)
    workDetails: str            # work-completion percentage, e.g. "85.0"
    qualityReport: str          # quality percentage, e.g. "90.0"
    incidentLog: str            # incident severity: 'None'/'Low'/'Minor'/'Medium'/'High'
    workCompletionDate: str     # planned completion date or "N/A"
    actualCompletionDate: str   # actual completion date or "N/A"
    vendorLogName: str          # record Name (duplicates vendorId as populated by the fetcher)
    delayDays: int              # days late; <= 0 means on time
    project: str                # Project__c value or "Unknown"

# Store vendor logs for display — in-memory cache of processed logs (dicts with
# a nested 'scores' dict), read by the dashboard and by determine_alert_flag().
vendor_logs = []
def fetch_vendor_logs_from_salesforce():
    """Query every Vendor_Log__c record and map each to a VendorLog model.

    Records missing the required Vendor__c lookup are skipped with a warning.
    Any query or mapping failure is logged and re-raised.
    """
    try:
        soql = """
        SELECT Id, Name, Vendor__c, Work_Completion_Percentage__c, Quality_Percentage__c, Incident_Severity__c,
        Work_Completion_Date__c, Actual_Completion_Date__c, Delay_Days__c, Project__c
        FROM Vendor_Log__c
        """
        response = sf.query_all(soql)
        parsed = []
        for rec in response['records']:
            vendor_ref = rec['Vendor__c']
            if not vendor_ref:
                logger.warning(f"Skipping Vendor_Log__c record with ID {rec['Id']} due to missing Vendor__c")
                continue
            # Null-ish Salesforce fields fall back to safe display defaults.
            parsed.append(VendorLog(
                vendorLogId=rec['Id'] or "Unknown",
                vendorId=rec['Name'] or "Unknown",
                vendorRecordId=vendor_ref,
                workDetails=str(rec['Work_Completion_Percentage__c'] or "0.0"),
                qualityReport=str(rec['Quality_Percentage__c'] or "0.0"),
                incidentLog=rec['Incident_Severity__c'] or "None",
                workCompletionDate=rec['Work_Completion_Date__c'] or "N/A",
                actualCompletionDate=rec['Actual_Completion_Date__c'] or "N/A",
                vendorLogName=rec['Name'] or "Unknown",
                delayDays=int(rec['Delay_Days__c'] or 0),
                project=rec['Project__c'] or "Unknown"
            ))
        return parsed
    except Exception as e:
        logger.error(f"Error fetching vendor logs from Salesforce: {str(e)}")
        raise
def calculate_scores(log: "VendorLog"):
    """Derive the four performance scores for one vendor log.

    Parameters:
        log: object exposing workDetails ('NN' or 'NN% completed'),
             qualityReport ('NN' or 'NN% quality'), delayDays (int) and
             incidentLog (severity string).

    Returns:
        dict with keys qualityScore, timelinessScore, safetyScore and
        communicationScore, each rounded to 2 decimal places.

    Raises:
        ValueError: when workDetails or qualityReport cannot be parsed.
    """
    try:
        # Strip the optional textual suffixes, leaving the numeric percentage.
        # work_completion_percentage is parsed only to validate the field's
        # format — no score currently uses it.
        work_completion_percentage = float(log.workDetails.replace('% completed', ''))
        quality_percentage = float(log.qualityReport.replace('% quality', ''))
        # Quality Score: directly the reported quality percentage.
        quality_score = quality_percentage
        # Timeliness Score: banded by delay days (<=0, <=3, <=7, otherwise).
        timeliness_score = 100.0 if log.delayDays <= 0 else 80.0 if log.delayDays <= 3 else 60.0 if log.delayDays <= 7 else 40.0
        # Safety Score: based on incident severity.
        # NOTE(review): unknown severities default to a perfect 100.0 — confirm
        # that is intended (e.g. a 'Critical' value would score 100).
        severity_map = {'None': 100.0, 'Low': 80.0, 'Minor': 80.0, 'Medium': 50.0, 'High': 20.0}
        safety_score = severity_map.get(log.incidentLog, 100.0)
        # Communication Score: mean of the other three scores.
        # Fix: the previous 0.33 weights summed to 0.99, deflating the value by
        # 1% relative to the documented "weighted average".
        communication_score = (quality_score + timeliness_score + safety_score) / 3.0
        return {
            'qualityScore': round(quality_score, 2),
            'timelinessScore': round(timeliness_score, 2),
            'safetyScore': round(safety_score, 2),
            'communicationScore': round(communication_score, 2)
        }
    except ValueError as e:
        logger.error(f"Error parsing workDetails or qualityReport in calculate_scores: {str(e)}")
        raise ValueError(f"Invalid data format for workDetails ({log.workDetails}) or qualityReport ({log.qualityReport})")
    except Exception as e:
        logger.error(f"Error calculating scores: {str(e)}")
        raise
def get_feedback(score: float, metric: str) -> str:
    """Return a human-readable feedback line for a score on a given metric.

    Bands: >=90 Excellent, >=70 Good, >=50 Needs Improvement, else Poor.
    The two lower bands tailor the message to 'Timeliness', 'Safety' or
    'Quality'; any other metric gets the communication-oriented fallback.
    """
    try:
        if score >= 90:
            return "Excellent: Maintain this standard"
        if score >= 70:
            return "Good: Keep up the good work"
        if score >= 50:
            improvement_messages = {
                'Timeliness': "Needs Improvement: Maintain schedules to complete tasks on time",
                'Safety': "Needs Improvement: Implement stricter safety protocols",
                'Quality': "Needs Improvement: Focus on improving work quality",
            }
            return improvement_messages.get(metric, "Needs Improvement: Enhance coordination with project teams")
        poor_messages = {
            'Timeliness': "Poor: Significant delays detected",
            'Safety': "Poor: Critical safety issues identified",
            'Quality': "Poor: Quality standards not met",
        }
        return poor_messages.get(metric, "Poor: Communication issues detected")
    except Exception as e:
        logger.error(f"Error generating feedback: {str(e)}")
        raise
def generate_pdf(vendor_id: str, vendor_log_name: str, scores: dict):
    """Render a one-page performance report PDF and return its raw bytes.

    Fix: the previous version wrote a predictably named file into the working
    directory and leaked it if anything raised between save() and os.remove();
    ReportLab's Canvas accepts a file-like object, so render entirely in memory.

    Parameters:
        vendor_id: vendor identifier shown in the report (and in the old filename).
        vendor_log_name: vendor log name shown in the report.
        scores: dict with qualityScore/timelinessScore/safetyScore/communicationScore.
    """
    from io import BytesIO  # local import keeps this fix self-contained
    try:
        buffer = BytesIO()
        c = canvas.Canvas(buffer, pagesize=letter)
        c.setFont('Helvetica', 12)
        c.drawString(100, 750, 'Subcontractor Performance Report')
        c.drawString(100, 730, f'Vendor ID: {vendor_id}')
        c.drawString(100, 710, f'Vendor Log Name: {vendor_log_name}')
        c.drawString(100, 690, f'Quality Score: {scores["qualityScore"]}% ({get_feedback(scores["qualityScore"], "Quality")})')
        c.drawString(100, 670, f'Timeliness Score: {scores["timelinessScore"]}% ({get_feedback(scores["timelinessScore"], "Timeliness")})')
        c.drawString(100, 650, f'Safety Score: {scores["safetyScore"]}% ({get_feedback(scores["safetyScore"], "Safety")})')
        c.drawString(100, 630, f'Communication Score: {scores["communicationScore"]}% ({get_feedback(scores["communicationScore"], "Communication")})')
        c.save()
        return buffer.getvalue()
    except Exception as e:
        logger.error(f"Error generating PDF: {str(e)}")
        raise
def determine_alert_flag(scores: dict, all_logs: list):
    """Decide whether a vendor's scores warrant an alert.

    Alerts when the four-score average is below 50, or when it ties the lowest
    average among the already-processed logs in all_logs.

    Fix: the empty-list early return previously ran BEFORE the absolute
    threshold check, so the very first vendor processed could never alert even
    with an average far below 50.

    Parameters:
        scores: dict with qualityScore/timelinessScore/safetyScore/communicationScore.
        all_logs: previously processed log dicts, each carrying a 'scores' dict.

    Returns:
        bool — True when an alert should be raised.
    """
    try:
        avg_score = (scores['qualityScore'] + scores['timelinessScore'] + scores['safetyScore'] + scores['communicationScore']) / 4
        # Absolute threshold applies regardless of how many logs exist yet.
        if avg_score < 50:
            return True
        if not all_logs:
            return False
        lowest_avg = min(
            (log['scores']['qualityScore'] + log['scores']['timelinessScore'] + log['scores']['safetyScore'] + log['scores']['communicationScore']) / 4
            for log in all_logs
        )
        return avg_score == lowest_avg
    except Exception as e:
        logger.error(f"Error determining alert flag: {str(e)}")
        raise
def store_scores_in_salesforce(log: VendorLog, scores: dict, pdf_content: bytes, alert_flag: bool):
    """Persist a vendor's computed scores and PDF report to Salesforce.

    Performs four order-dependent steps: (1) create the
    Subcontractor_Performance_Score__c record, (2) upload the PDF as a
    ContentVersion attached to it, (3) resolve the ContentDocumentId and build
    a download URL, (4) write that URL back onto the score record. Any failure
    is logged and re-raised; earlier steps are NOT rolled back, so a partial
    failure can leave a score record without a PDF link.

    Parameters:
        log: the vendor log being scored (supplies record Ids and names).
        scores: dict with qualityScore/timelinessScore/safetyScore/communicationScore.
        pdf_content: raw PDF bytes from generate_pdf().
        alert_flag: result of determine_alert_flag().
    """
    try:
        # Step 1: Create the Subcontractor_Performance_Score__c record
        score_record = sf.Subcontractor_Performance_Score__c.create({
            'Vendor_Log__c': log.vendorLogId,
            'Vendor__c': log.vendorRecordId,
            'Quality_Score__c': scores['qualityScore'],
            'Timeliness_Score__c': scores['timelinessScore'],
            'Safety_Score__c': scores['safetyScore'],
            'Communication_Score__c': scores['communicationScore'],
            'Alert_Flag__c': alert_flag
        })
        score_record_id = score_record['id']
        logger.info(f"Successfully created Subcontractor_Performance_Score__c record with ID: {score_record_id}")
        # Step 2: Upload the PDF as a ContentVersion (VersionData must be base64 text)
        pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
        content_version = sf.ContentVersion.create({
            'Title': f'Performance_Report_{log.vendorId}',
            'PathOnClient': f'report_{log.vendorId}.pdf',
            'VersionData': pdf_base64,
            'FirstPublishLocationId': score_record_id
        })
        logger.info(f"Successfully uploaded PDF as ContentVersion for Vendor Log ID: {log.vendorLogId}")
        # Step 3: Get the ContentDocumentId and construct a URL to the file.
        # NOTE(review): the Id is interpolated into SOQL via an f-string; it comes
        # from Salesforce itself so injection risk appears low, but parameterize
        # or validate if this ever takes external input.
        content_version_id = content_version['id']
        content_version_record = sf.query(f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{content_version_id}'")
        content_document_id = content_version_record['records'][0]['ContentDocumentId']
        pdf_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/document/download/{content_document_id}"
        # Step 4: Update the Subcontractor_Performance_Score__c record with the PDF URL
        sf.Subcontractor_Performance_Score__c.update(score_record_id, {
            'PDF_Link__c': pdf_url
        })
        logger.info(f"Successfully updated Subcontractor_Performance_Score__c record with PDF URL: {pdf_url}")
    except Exception as e:
        logger.error(f"Error storing scores in Salesforce: {str(e)}")
        raise
@app.route('/score', methods=['POST'])
def score_vendor():
    """POST /score — score a single vendor log supplied as a JSON body.

    Requires an 'Authorization: Bearer <HUGGINGFACE_API_KEY>' header. Validates
    the payload against VendorLog, computes scores, renders the PDF report,
    persists everything to Salesforce, caches the log for the dashboard, and
    returns the scores plus the base64-encoded PDF.

    Fix: this view (whose own error logs name the "/score endpoint") was never
    registered with Flask — the route decorator was missing, so the app served
    404 for every request.
    """
    try:
        # Bearer-token check against the configured API key.
        # NOTE(review): plain '!=' is not a constant-time comparison; consider
        # hmac.compare_digest for the token check.
        auth_header = request.headers.get('Authorization')
        if auth_header != f'Bearer {API_KEY}':
            return jsonify({'detail': 'Invalid API key'}), 401
        # Parse and validate the request body
        if not request.is_json:
            return jsonify({'detail': 'Request body must be JSON'}), 400
        data = request.get_json()
        try:
            log = VendorLog(**data)
        except ValidationError as e:
            logger.error(f"Validation error in /score endpoint: {str(e)}")
            return jsonify({'detail': f"Invalid request data: {str(e)}"}), 400
        logger.info(f"Received Vendor Log: {log}")
        # Calculate scores, generate PDF, and store in Salesforce
        scores = calculate_scores(log)
        pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores)
        pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
        alert_flag = determine_alert_flag(scores, vendor_logs)
        store_scores_in_salesforce(log, scores, pdf_content, alert_flag)
        # Cache the processed log in memory so the dashboard can render it.
        vendor_logs.append({
            'vendorLogId': log.vendorLogId,
            'vendorId': log.vendorId,
            'vendorLogName': log.vendorLogName,
            'workDetails': log.workDetails,
            'qualityReport': log.qualityReport,
            'incidentLog': log.incidentLog,
            'workCompletionDate': log.workCompletionDate,
            'actualCompletionDate': log.actualCompletionDate,
            'delayDays': log.delayDays,
            'project': log.project,
            'scores': scores,
            'extracted': True
        })
        return jsonify({
            'vendorLogId': log.vendorLogId,
            'vendorId': log.vendorId,
            'vendorLogName': log.vendorLogName,
            'qualityScore': scores['qualityScore'],
            'timelinessScore': scores['timelinessScore'],
            'safetyScore': scores['safetyScore'],
            'communicationScore': scores['communicationScore'],
            'pdfContent': pdf_base64,
            'alert': alert_flag
        })
    except ValueError as e:
        logger.error(f"ValueError in /score endpoint: {str(e)}")
        return jsonify({'detail': str(e)}), 400
    except Exception as e:
        logger.error(f"Error in /score endpoint: {str(e)}")
        return jsonify({'detail': f"Error processing vendor log: {str(e)}"}), 500
@app.route('/')
def get_dashboard():
    """GET / — render the dashboard of all vendor logs.

    Fetches every Vendor_Log__c from Salesforce, scores any log not already in
    the in-memory cache (generating its PDF and persisting results to
    Salesforce as a side effect), then renders index.html.

    Fixes: the route decorator was missing (the view's own error logs name the
    "/ endpoint" but it was never registered), and last_updated was a stale
    hard-coded string ("03:44 PM IST on Saturday, May 17, 2025") despite the
    comment calling it the current timestamp — it is now computed per request.
    """
    try:
        global vendor_logs
        fetched_logs = fetch_vendor_logs_from_salesforce()
        for log in fetched_logs:
            # Only process logs not already cached, keyed by Salesforce record Id.
            if not any(existing_log['vendorLogId'] == log.vendorLogId for existing_log in vendor_logs):
                scores = calculate_scores(log)
                pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores)
                alert_flag = determine_alert_flag(scores, vendor_logs)
                store_scores_in_salesforce(log, scores, pdf_content, alert_flag)
                vendor_logs.append({
                    'vendorLogId': log.vendorLogId,
                    'vendorId': log.vendorId,
                    'vendorLogName': log.vendorLogName,
                    'workDetails': log.workDetails,
                    'qualityReport': log.qualityReport,
                    'incidentLog': log.incidentLog,
                    'workCompletionDate': log.workCompletionDate,
                    'actualCompletionDate': log.actualCompletionDate,
                    'delayDays': log.delayDays,
                    'project': log.project,
                    'scores': scores,
                    'extracted': True
                })
        # Current timestamp for display (server-local time).
        last_updated = datetime.now().strftime('%I:%M %p on %A, %B %d, %Y')
        return render_template('index.html', vendor_logs=vendor_logs, last_updated=last_updated)
    except Exception as e:
        logger.error(f"Error in / endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating dashboard: {str(e)}"}), 500
@app.route('/generate', methods=['POST'])
def generate_scores():
    """POST /generate — rebuild all scores from scratch.

    Clears the in-memory cache, re-fetches every Vendor_Log__c, re-scores each
    (persisting scores and PDFs to Salesforce), and returns a success status.
    Note that alert flags are computed against the cache as it refills, so
    results depend on processing order (unchanged from the original logic).

    Fix: the route decorator was missing — the view's own error logs name the
    "/generate endpoint" but it was never registered with Flask.
    NOTE(review): confirm the expected HTTP method; POST assumed since this
    endpoint mutates Salesforce state.
    """
    try:
        global vendor_logs
        fetched_logs = fetch_vendor_logs_from_salesforce()
        vendor_logs = []
        for log in fetched_logs:
            scores = calculate_scores(log)
            pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores)
            alert_flag = determine_alert_flag(scores, vendor_logs)
            store_scores_in_salesforce(log, scores, pdf_content, alert_flag)
            vendor_logs.append({
                'vendorLogId': log.vendorLogId,
                'vendorId': log.vendorId,
                'vendorLogName': log.vendorLogName,
                'workDetails': log.workDetails,
                'qualityReport': log.qualityReport,
                'incidentLog': log.incidentLog,
                'workCompletionDate': log.workCompletionDate,
                'actualCompletionDate': log.actualCompletionDate,
                'delayDays': log.delayDays,
                'project': log.project,
                'scores': scores,
                'extracted': True
            })
        return jsonify({'status': 'success'})
    except Exception as e:
        logger.error(f"Error in /generate endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating scores: {str(e)}"}), 500
# Entry point: run the Flask dev server on all interfaces, port 7860
# (the conventional Hugging Face Spaces port).
# NOTE(review): debug=True enables the Werkzeug interactive debugger and
# reloader — a remote-code-execution risk if ever exposed publicly; disable
# outside local development.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=True)