"""Flask service that scores subcontractor vendor logs (scoring is mocked
pending a Hugging Face model), renders PDF performance reports, and syncs
results to Salesforce."""
from flask import Flask, request, Response, jsonify, render_template
| from pydantic import BaseModel |
| from reportlab.lib.pagesizes import letter |
| from reportlab.pdfgen import canvas |
| import base64 |
import io
import os
| import logging |
| import traceback |
| import requests |
| from datetime import datetime |
| from simple_salesforce import Salesforce |
|
|
| |
# Configure application-wide logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
| app = Flask(__name__) |
|
|
| |
# Salesforce credentials -- read from the environment so secrets stay out of
# source control
SF_USERNAME = os.environ.get("SF_USERNAME", "")
SF_PASSWORD = os.environ.get("SF_PASSWORD", "")
SF_SECURITY_TOKEN = os.environ.get("SF_SECURITY_TOKEN", "")
SF_DOMAIN = os.environ.get("SF_DOMAIN", "login")  # "login" for production, "test" for sandbox
|
|
| |
# Hugging Face Inference API settings (placeholders -- substitute your model and token)
HUGGING_FACE_API_URL = "https://api-inference.huggingface.co/models/your-model-name"
HUGGING_FACE_API_TOKEN = "your-hugging-face-api-token"
|
|
| |
# Connect to Salesforce once at startup; fail fast if the connection cannot be made
try:
    sf = Salesforce(
| username=SF_USERNAME, |
| password=SF_PASSWORD, |
| security_token=SF_SECURITY_TOKEN, |
| domain=SF_DOMAIN |
| ) |
| logger.info("Successfully connected to Salesforce") |
| except Exception as e: |
| logger.error(f"Failed to connect to Salesforce: {str(e)}") |
    raise ValueError("Failed to connect to Salesforce") from e
|
|
| |
class VendorLog(BaseModel):
    """Schema for an incoming vendor log payload."""
    vendorLogId: str
| vendorId: str |
| workDetails: str |
| qualityReport: str |
| incidentLog: str |
| workCompletionDate: str |
| actualCompletionDate: str |
| vendorLogName: str |
| delayDays: int |
| project: str |
|
|
| |
# In-memory cache of processed vendor logs (cleared and rebuilt by the dashboard)
vendor_logs = []
|
|
| def fetch_vendor_logs_from_salesforce(): |
| """Fetch vendor logs from Salesforce to send to Hugging Face.""" |
| try: |
| query = """ |
| SELECT Id, Vendor_Log_Id__c, Vendor_Id__c, Work_Details__c, Quality_Report__c, |
| Incident_Log__c, Work_Completion_Date__c, Actual_Completion_Date__c, |
| Vendor_Log_Name__c, Delay_Days__c, Project__c |
| FROM Vendor_Log__c |
| WHERE CreatedDate = THIS_MONTH |
| """ |
| result = sf.query(query) |
| logs = [] |
| for record in result['records']: |
| log = VendorLog( |
| vendorLogId=record['Vendor_Log_Id__c'], |
| vendorId=record['Vendor_Id__c'], |
| workDetails=record['Work_Details__c'], |
| qualityReport=record['Quality_Report__c'], |
| incidentLog=record['Incident_Log__c'], |
| workCompletionDate=record['Work_Completion_Date__c'], |
| actualCompletionDate=record['Actual_Completion_Date__c'], |
| vendorLogName=record['Vendor_Log_Name__c'], |
                delayDays=int(record['Delay_Days__c'] or 0),  # SOQL numbers come back as floats
| project=record['Project__c'] |
| ) |
| logs.append(log) |
| return logs |
| except Exception as e: |
| logger.error(f"Error fetching vendor logs from Salesforce: {str(e)}") |
| raise |
|
|
| def calculate_scores_with_hugging_face(log: VendorLog): |
| """Send data to Hugging Face and get scores (mocked for now).""" |
| try: |
| |
        # Payload that would be sent to the Hugging Face model
        payload = {
| "workDetails": log.workDetails, |
| "qualityReport": log.qualityReport, |
| "incidentLog": log.incidentLog, |
| "delayDays": log.delayDays |
| } |
| |
| |
| |
| |
| |
|
|
| |
        # Mocked scoring heuristics until the model is wired up:
        #   - quality is parsed from report text such as "95% quality"
        #   - timeliness is banded by how many days late the work finished
        #   - safety maps the incident severity label to a fixed score
        scores = {
            'qualityScore': float(log.qualityReport.replace('% quality', '')),
            'timelinessScore': 100.0 if log.delayDays <= 0 else 80.0 if log.delayDays <= 3 else 60.0 if log.delayDays <= 7 else 40.0,
            'safetyScore': {'None': 100.0, 'Low': 80.0, 'Minor': 80.0, 'Medium': 50.0, 'High': 20.0}.get(log.incidentLog, 100.0),
            'communicationScore': 0.0,
            'finalScore': 0.0
        }
        # Communication is proxied as a weighted blend of the other three scores
        scores['communicationScore'] = (scores['qualityScore'] * 0.33 + scores['timelinessScore'] * 0.33 + scores['safetyScore'] * 0.33)
        scores['finalScore'] = (scores['qualityScore'] + scores['timelinessScore'] + scores['safetyScore'] + scores['communicationScore']) / 4

        # Round everything to two decimal places for presentation
        for key in scores:
            scores[key] = round(scores[key], 2)
|
|
| return scores |
| except Exception as e: |
| logger.error(f"Error calculating scores with Hugging Face: {str(e)}") |
| raise |
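

# Worked example of the mocked scoring above: a log with
# qualityReport="95% quality", delayDays=2 and incidentLog="None" yields
# qualityScore 95.0, timelinessScore 80.0, safetyScore 100.0,
# communicationScore (95 + 80 + 100) * 0.33 = 90.75, and
# finalScore (95 + 80 + 100 + 90.75) / 4 = 91.44 after rounding.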
|
|
def get_feedback(score: float, metric: str) -> str:
    """Return human-readable feedback for a score on the given metric."""
    try:
| if score >= 90: |
| return "Excellent: Maintain this standard" |
| elif score >= 70: |
| return "Good: Keep up the good work" |
| elif score >= 50: |
| if metric == 'Timeliness': |
| return "Needs Improvement: Maintain schedules to complete tasks on time" |
| elif metric == 'Safety': |
| return "Needs Improvement: Implement stricter safety protocols" |
| elif metric == 'Quality': |
| return "Needs Improvement: Focus on improving work quality" |
| else: |
| return "Needs Improvement: Enhance coordination with project teams" |
| else: |
| if metric == 'Timeliness': |
| return "Poor: Significant delays detected" |
| elif metric == 'Safety': |
| return "Poor: Critical safety issues identified" |
| elif metric == 'Quality': |
| return "Poor: Quality standards not met" |
| else: |
| return "Poor: Communication issues detected" |
| except Exception as e: |
| logger.error(f"Error generating feedback: {str(e)}") |
| raise |
|
|
def generate_pdf(vendor_id: str, vendor_log_name: str, scores: dict):
    """Render the performance report to PDF and return its bytes."""
    try:
        # Draw into an in-memory buffer rather than a temp file on disk, so
        # concurrent requests cannot clobber each other's report files
        buffer = io.BytesIO()
        c = canvas.Canvas(buffer, pagesize=letter)
        c.setFont('Helvetica', 12)
        c.drawString(100, 750, 'Subcontractor Performance Report')
        c.drawString(100, 730, f'Vendor ID: {vendor_id}')
        c.drawString(100, 710, f'Vendor Log Name: {vendor_log_name}')
        c.drawString(100, 690, f'Quality Score: {scores["qualityScore"]}% ({get_feedback(scores["qualityScore"], "Quality")})')
        c.drawString(100, 670, f'Timeliness Score: {scores["timelinessScore"]}% ({get_feedback(scores["timelinessScore"], "Timeliness")})')
        c.drawString(100, 650, f'Safety Score: {scores["safetyScore"]}% ({get_feedback(scores["safetyScore"], "Safety")})')
        c.drawString(100, 630, f'Communication Score: {scores["communicationScore"]}% ({get_feedback(scores["communicationScore"], "Communication")})')
        c.drawString(100, 610, f'Final Score: {scores["finalScore"]}%')
        c.save()
        return buffer.getvalue()
    except Exception as e:
        logger.error(f"Error generating PDF: {str(e)}")
        raise
|
|
def determine_alert_flag(final_score: float, all_logs: list):
    """Flag a score that is failing (under 50) or is the lowest among all logs."""
    try:
        if not all_logs:
            return False
        if final_score < 50:
            return True
        lowest_score = min(log['scores']['finalScore'] for log in all_logs)
| return final_score == lowest_score |
| except Exception as e: |
| logger.error(f"Error determining alert flag: {str(e)}") |
| raise |
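

# Example: among cached logs with final scores [91.44, 72.0, 55.0], only the
# 55.0 log is flagged (lowest score); any score under 50 is always flagged.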
|
|
| @app.route('/score', methods=['POST']) |
| def score_vendor(): |
| try: |
| |
        # Require an Authorization header (the token itself is not validated here)
        authorization = request.headers.get('Authorization')
        if not authorization:
            return jsonify({'error': 'Authorization header missing'}), 401
|
|
| |
        # Parse the JSON body
        data = request.get_json()
        if not data:
            return jsonify({'error': 'Invalid request data'}), 400
|
|
| |
        # Validate the payload against the VendorLog schema
        log = VendorLog(**data)
        logger.info(f"Received Vendor Log: {log}")
|
|
| |
        # Confirm the Salesforce session opened at startup is still available
        if not sf.session_id:
            return jsonify({'error': 'Salesforce session invalid'}), 401
|
|
| |
        # Score the log, then render and base64-encode the PDF report
        scores = calculate_scores_with_hugging_face(log)
        pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores)
        pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
|
|
| |
        # Flag the vendor if this score is failing or the lowest cached so far
        alert_flag = determine_alert_flag(scores['finalScore'], vendor_logs)
|
|
| |
        # Cache the processed log in memory for the dashboard
        vendor_logs.append({
| 'vendorLogId': log.vendorLogId, |
| 'vendorId': log.vendorId, |
| 'vendorLogName': log.vendorLogName, |
| 'workDetails': log.workDetails, |
| 'qualityReport': log.qualityReport, |
| 'incidentLog': log.incidentLog, |
| 'workCompletionDate': log.workCompletionDate, |
| 'actualCompletionDate': log.actualCompletionDate, |
| 'delayDays': log.delayDays, |
| 'project': log.project, |
| 'scores': scores, |
| 'extracted': True |
| }) |
|
|
| |
        # Persist the computed scores to Salesforce
        try:
            sf.Subcontractor_Performance_Score__c.create({
| 'Vendor_ID__c': log.vendorId, |
| 'Month__c': datetime.now().strftime('%Y-%m-%d'), |
| 'Quality_Score__c': scores['qualityScore'], |
| 'Timeliness_Score__c': scores['timelinessScore'], |
| 'Safety_Score__c': scores['safetyScore'], |
| 'Communication_Score__c': scores['communicationScore'], |
| 'Final_Score__c': scores['finalScore'], |
                'Certification_URL__c': pdf_base64,  # base64 PDF stored directly in the URL field
| 'Alert_Flag__c': alert_flag |
| }) |
| logger.info(f"Successfully saved scores to Salesforce for Vendor Log: {log.vendorLogId}") |
        except Exception as e:
            # Log and continue: a failed Salesforce save should not block the response
            logger.error(f"Error saving to Salesforce: {str(e)}")
| |
|
|
| |
        # Return the scores plus the base64 PDF to the caller
        return jsonify({
| 'vendorLogId': log.vendorLogId, |
| 'vendorId': log.vendorId, |
| 'vendorLogName': log.vendorLogName, |
| 'qualityScore': scores['qualityScore'], |
| 'timelinessScore': scores['timelinessScore'], |
| 'safetyScore': scores['safetyScore'], |
| 'communicationScore': scores['communicationScore'], |
| 'finalScore': scores['finalScore'], |
| 'pdfContent': pdf_base64, |
| 'alert': alert_flag |
| }), 200 |
| except Exception as e: |
| logger.error(f"Error in /score endpoint: {str(e)}") |
| return jsonify({'error': f"Error processing vendor log: {str(e)}"}), 500 |
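

# Hypothetical invocation of the /score endpoint above (all values are
# illustrative, not taken from real data):
#
#   curl -X POST http://localhost:7860/score \
#     -H "Authorization: Bearer <token>" \
#     -H "Content-Type: application/json" \
#     -d '{"vendorLogId": "VL-001", "vendorId": "V-001",
#          "vendorLogName": "May Electrical", "workDetails": "Electrical rough-in",
#          "qualityReport": "95% quality", "incidentLog": "None",
#          "workCompletionDate": "2025-05-01", "actualCompletionDate": "2025-05-03",
#          "delayDays": 2, "project": "Tower A"}'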
|
|
| @app.route('/', methods=['GET']) |
| def get_dashboard(): |
| try: |
| |
        # Pull the month's scores from Salesforce
        # NOTE: the month filter is hardcoded for the demo data; parameterize it for real use
        query = """
            SELECT Vendor_ID__c, Month__c, Quality_Score__c, Timeliness_Score__c,
                   Safety_Score__c, Communication_Score__c, Final_Score__c, Certification_URL__c,
                   Alert_Flag__c
            FROM Subcontractor_Performance_Score__c
            WHERE Month__c = '2025-05-01'
        """
        result = sf.query(query)
        # Rebuild the in-memory cache from the query results
        vendor_logs.clear()
| for record in result['records']: |
| vendor_logs.append({ |
| 'vendorId': record['Vendor_ID__c'], |
| 'vendorLogName': f"Vendor {record['Vendor_ID__c']}", |
| 'scores': { |
| 'qualityScore': record['Quality_Score__c'], |
| 'timelinessScore': record['Timeliness_Score__c'], |
| 'safetyScore': record['Safety_Score__c'], |
| 'communicationScore': record['Communication_Score__c'], |
| 'finalScore': record['Final_Score__c'] |
| }, |
| 'extracted': True |
| }) |
|
|
| |
        # Headline KPIs for the dashboard cards
        total_vendors = len(vendor_logs)
        performance_alerts = sum(1 for log in vendor_logs if determine_alert_flag(log['scores']['finalScore'], vendor_logs))
        top_performers = sum(1 for log in vendor_logs if log['scores']['finalScore'] >= 90)
        improving_vendors = sum(1 for log in vendor_logs if log['scores']['finalScore'] >= 70)
|
|
| |
        # Rank logs by final score for the dashboard tables
        sorted_logs = sorted(vendor_logs, key=lambda x: x['scores']['finalScore'], reverse=True)
        top_logs_data = sorted_logs[:5]
        top_performing_logs = sorted_logs[:4]
        alert_logs = [log for log in vendor_logs if determine_alert_flag(log['scores']['finalScore'], vendor_logs)][:3]
|
|
| |
        # Decorate the top logs with trend and status metadata for the template
        top_logs = []
        for idx, log in enumerate(top_logs_data, 1):
| scores = log['scores'] |
| alert_flag = determine_alert_flag(scores['finalScore'], vendor_logs) |
| trend = "trend-up" if scores['finalScore'] >= 90 else "trend-down" if scores['finalScore'] < 70 else "trend-flat" |
| trend_symbol = "↗" if trend == "trend-up" else "↘" if trend == "trend-down" else "—" |
| status_class = "status-good" if not alert_flag else "status-alert" |
| status_text = "Good" if not alert_flag else "Alert" |
| top_logs.append({ |
| 'idx': idx, |
| 'vendorLogName': log['vendorLogName'], |
| 'scores': scores, |
| 'trend': trend, |
| 'trend_symbol': trend_symbol, |
| 'status_class': status_class, |
| 'status_text': status_text |
| }) |
|
|
| |
| return render_template('dashboard.html', |
| total_vendors=total_vendors, |
| performance_alerts=performance_alerts, |
| percent_alerts=round(performance_alerts/total_vendors*100, 1) if total_vendors else 0, |
| top_performers=top_performers, |
| percent_top=round(top_performers/total_vendors*100, 1) if total_vendors else 0, |
| improving_vendors=improving_vendors, |
| percent_improving=round(improving_vendors/total_vendors*100, 1) if total_vendors else 0, |
| top_logs=top_logs, |
| alert_logs=alert_logs, |
| top_performing_logs=top_performing_logs, |
| vendor_logs=vendor_logs, |
| sorted_logs=sorted_logs |
| ) |
| except Exception as e: |
| error_trace = traceback.format_exc() |
| logger.error(f"Error in / endpoint: {str(e)}\nStack trace:\n{error_trace}") |
| return jsonify({'error': f"Error generating dashboard: {str(e)}"}), 500 |
|
|
| @app.route('/document', methods=['GET']) |
| def get_document(): |
| try: |
| return render_template('document.html') |
| except Exception as e: |
| error_trace = traceback.format_exc() |
| logger.error(f"Error in /document endpoint: {str(e)}\nStack trace:\n{error_trace}") |
| return jsonify({'error': f"Error generating document: {str(e)}"}), 500 |
|
|
| if __name__ == "__main__": |
| app.run(host="0.0.0.0", port=7860, debug=True) |