Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| from datetime import datetime | |
| from flask import Flask, request, render_template, jsonify | |
| from pydantic import BaseModel, ValidationError | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| import base64 | |
| from io import BytesIO | |
| from simple_salesforce import Salesforce | |
| from sendgrid import SendGridAPIClient | |
| from sendgrid.helpers.mail import Mail | |
| from dotenv import load_dotenv | |
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Set up logging to capture errors and debug information.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Salesforce credentials come from the environment only. They deliberately have
# no in-code fallback: secrets embedded in source are readable by anyone with
# access to the file, and a non-empty default makes the "missing variable"
# check below impossible to trigger.
# NOTE(review): any credentials that were previously committed here should be
# treated as compromised and rotated.
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_DOMAIN = os.getenv("SF_DOMAIN", "login")
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")

# Validate required environment variables. SENDGRID_API_KEY is optional:
# without it the app still runs, only email alerts are disabled.
required_vars = {
    'SF_USERNAME': SF_USERNAME,
    'SF_PASSWORD': SF_PASSWORD,
    'SF_SECURITY_TOKEN': SF_SECURITY_TOKEN,
}
missing_vars = [var for var, value in required_vars.items() if not value]
if missing_vars:
    logger.error(f"Missing required environment variables: {', '.join(missing_vars)}")
    raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

if not SENDGRID_API_KEY:
    logger.warning("SENDGRID_API_KEY not set. Email alerts will be disabled.")

# Connect to Salesforce. A failed login is logged rather than raised so the app
# can still start; every function that needs the connection checks `sf` first.
try:
    sf = Salesforce(
        username=SF_USERNAME,
        password=SF_PASSWORD,
        security_token=SF_SECURITY_TOKEN,
        domain=SF_DOMAIN,
        version='63.0',
    )
    logger.info("Successfully connected to Salesforce")
except Exception as e:
    logger.error(f"Failed to connect to Salesforce: {str(e)}")
    sf = None
# VendorLog model to match Salesforce data
class VendorLog(BaseModel):
    """Validated snapshot of one Vendor_Log__c record.

    Instances are built by fetch_vendor_logs(), which substitutes placeholder
    values ("Unknown", "N/A", "None", "0.0", 0) for empty Salesforce fields, so
    every attribute is always populated.
    """

    vendorLogId: str  # Id of the Vendor_Log__c record
    vendorId: str  # filled from the record's Name field by fetch_vendor_logs()
    vendorRecordId: str  # value of the record's Vendor__c field
    workDetails: str  # Work_Completion_Percentage__c rendered as a string
    qualityReport: str  # Quality_Percentage__c as a string; calculate_scores() parses it with float()
    incidentLog: str  # Incident_Severity__c; the literal string "None" when no incident is recorded
    workCompletionDate: str  # Work_Completion_Date__c, or "N/A"
    actualCompletionDate: str  # Actual_Completion_Date__c, or "N/A"
    vendorLogName: str  # the record's Name field (same source as vendorId)
    delayDays: int  # Delay_Days__c; calculate_scores() treats negative values as early completion
    project: str  # Project__c, or "Unknown"
    communicationFrequency: int  # Communication_Frequency__c, 0 when empty
# Store vendor logs for display.
# Module-level list of plain-dict log entries; it is written by get_dashboard()
# and generate_scores(). Nothing in the visible code reads it back.
vendor_logs = []
def _build_vendor_log(record):
    """Convert one Vendor_Log__c query row into a VendorLog, defaulting empty fields."""
    return VendorLog(
        vendorLogId=record['Id'] or "Unknown",
        vendorId=record['Name'] or "Unknown",
        vendorRecordId=record['Vendor__c'],
        workDetails=str(record['Work_Completion_Percentage__c'] or "0.0"),
        qualityReport=str(record['Quality_Percentage__c'] or "0.0"),
        incidentLog=record['Incident_Severity__c'] or "None",
        workCompletionDate=record['Work_Completion_Date__c'] or "N/A",
        actualCompletionDate=record['Actual_Completion_Date__c'] or "N/A",
        vendorLogName=record['Name'] or "Unknown",
        delayDays=int(record['Delay_Days__c'] or 0),
        project=record['Project__c'] or "Unknown",
        communicationFrequency=int(record['Communication_Frequency__c'] or 0),
    )


def fetch_vendor_logs():
    """Fetch the latest Vendor_Log__c record for each vendor based on LastModifiedDate.

    Returns:
        A list of dicts with keys ``vendor_id``, ``vendor_name``,
        ``contact_email`` and ``log`` (a VendorLog). Vendors without any log
        record, or whose latest record cannot be converted, are omitted.
        Returns an empty list when Salesforce is unavailable or a query fails.
    """
    if not sf:
        logger.error("Salesforce connection not available")
        return []
    try:
        # Fetch all vendors
        vendors = sf.query_all("SELECT Id, Vendor_Name__c, Contact_Email__c FROM Vendor__c")['records']
        vendor_data = []
        for vendor in vendors:
            vendor_id = vendor['Id']
            # Salesforce IDs are 15 or 18 alphanumeric characters. Checking this
            # also keeps the value safe to interpolate into the SOQL text below.
            if len(vendor_id) not in (15, 18) or not vendor_id.isalnum():
                logger.warning(f"Invalid Vendor__c ID format: {vendor_id}")
                continue

            # Fetch the latest Vendor_Log__c record for this vendor
            query = f"""
                SELECT Id, Name, Vendor__c, Project__c, Work_Completion_Percentage__c, Quality_Percentage__c,
                       Incident_Severity__c, Delay_Days__c, Communication_Frequency__c,
                       Work_Completion_Date__c, Actual_Completion_Date__c
                FROM Vendor_Log__c
                WHERE Vendor__c = '{vendor_id}'
                ORDER BY LastModifiedDate DESC
                LIMIT 1
            """
            log = sf.query(query)['records']
            if not log:
                logger.debug(f"No Vendor_Log__c found for Vendor__c: {vendor_id}")
                continue

            record = log[0]
            if not record['Vendor__c']:
                logger.warning(f"Skipping Vendor_Log__c record with ID {record['Id']} due to missing Vendor__c")
                continue

            # A single malformed record must not discard every other vendor's
            # data, so conversion errors skip only the offending record.
            try:
                vendor_log = _build_vendor_log(record)
            except (ValidationError, ValueError, TypeError, KeyError) as e:
                logger.error(f"Skipping malformed Vendor_Log__c record {record['Id']}: {str(e)}")
                continue

            vendor_data.append({
                'vendor_id': vendor_id,
                'vendor_name': vendor['Vendor_Name__c'] or 'Unknown Vendor',
                'contact_email': vendor['Contact_Email__c'],
                'log': vendor_log,
            })
        logger.info(f"Fetched {len(vendor_data)} vendor logs")
        return vendor_data
    except Exception as e:
        # Query/API failures still abort the whole fetch, as before.
        logger.error(f"Error fetching vendor logs: {str(e)}")
        return []
def calculate_scores(vendor_data):
    """Derive the four component scores and the final score for every vendor.

    Args:
        vendor_data: Iterable of dicts with keys ``vendor_id``, ``vendor_name``,
            ``contact_email`` and ``log``; ``log`` exposes ``qualityReport``,
            ``delayDays``, ``incidentLog`` and ``communicationFrequency``.

    Returns:
        A list with one result dict per input entry, carrying the rounded
        scores, the alert flag and the raw inputs the scores were based on.
    """
    # Severity -> safety score; any severity not listed here scores 40.
    safety_by_severity = {"None": 100, 'Low': 90, 'Medium': 70}

    results = []
    for entry in vendor_data:
        vendor_id = entry['vendor_id']
        vendor_name = entry['vendor_name']
        contact_email = entry['contact_email']
        log = entry['log']

        quality_score = float(log.qualityReport)
        delay_days = log.delayDays
        incident_severity = log.incidentLog
        communication_frequency = log.communicationFrequency

        # Timeliness: start at 100, lose 5 per day late, gain 2 per day early,
        # then clamp into the range 0..120.
        if delay_days > 0:
            raw_timeliness = 100 - delay_days * 5
        elif delay_days < 0:
            raw_timeliness = 100 + (-delay_days) * 2
        else:
            raw_timeliness = 100
        timeliness_score = max(0, min(raw_timeliness, 120))

        safety_score = safety_by_severity.get(incident_severity, 40)

        if communication_frequency >= 5:
            communication_score = 90
        elif communication_frequency >= 3:
            communication_score = 70
        else:
            communication_score = 50

        # Final score is the plain average of the four components.
        final_score = (timeliness_score + quality_score + safety_score + communication_score) / 4

        results.append({
            'vendor_id': vendor_id,
            'vendor_name': vendor_name,
            'contact_email': contact_email,
            'log': log,
            'timeliness_score': round(timeliness_score, 2),
            'quality_score': round(quality_score, 2),
            'safety_score': round(safety_score, 2),
            'communication_score': round(communication_score, 2),
            'final_score': round(final_score, 2),
            'alert_flag': final_score < 40,
            'delay_days': delay_days,
            'incident_severity': incident_severity,
            'communication_frequency': communication_frequency,
        })

    logger.info(f"Calculated scores for {len(results)} vendors")
    return results
def generate_feedback(scores):
    """Compose the written performance review for a single vendor.

    Args:
        scores: One result dict from calculate_scores(). The five score keys
            and ``vendor_name`` are always read; ``delay_days``,
            ``incident_severity`` and ``communication_frequency`` are read only
            when the matching component score is below 80.

    Returns:
        A multi-line string: a headline, an overall verdict, then one bullet
        per component score.
    """
    overall = scores['final_score']
    timeliness = scores['timeliness_score']
    quality = scores['quality_score']
    safety = scores['safety_score']
    communication = scores['communication_score']

    parts = [f"Performance Review for {scores['vendor_name']} (Final Score: {overall}%)\n\n"]

    # Overall verdict
    if overall >= 80:
        parts.append("Outstanding Performance! Your dedication to excellence is evident across all metrics. ")
        parts.append("We encourage you to maintain this high standard and continue to lead by example.")
    elif overall >= 60:
        parts.append("Good Effort! You are performing well overall, with opportunities for growth. ")
        parts.append("Please focus on the areas highlighted below to enhance your performance further.")
    else:
        parts.append("Action Required: Your performance is below the expected threshold. ")
        parts.append("We are committed to supporting your improvement—please review the feedback below and let’s collaborate on next steps.")

    parts.append("\n\nDetailed Feedback:\n")

    # Timeliness
    parts.append(f"- Timeliness ({timeliness}%): ")
    if timeliness >= 100:
        parts.append("Exceptional! Completing tasks ahead of schedule reflects your strong commitment to deadlines.")
    elif timeliness >= 80:
        parts.append("Well done on maintaining timely deliveries.")
    else:
        parts.append(f"Delays of {scores['delay_days']} days have impacted your score. We recommend improving scheduling practices.")

    # Quality
    parts.append(f"\n- Quality ({quality}%): ")
    parts.append(
        "Excellent quality of work! Your deliverables consistently meet high standards."
        if quality >= 80
        else "There is room for improvement in quality. Focus on delivering consistent, high-quality results."
    )

    # Safety
    parts.append(f"\n- Safety ({safety}%): ")
    if safety >= 80:
        parts.append("Strong safety record! Continue prioritizing safety in all operations.")
    else:
        parts.append(f"Incidents (Severity: {scores['incident_severity'] or 'None'}) have affected your score. Please strengthen safety protocols.")

    # Communication
    parts.append(f"\n- Communication ({communication}%): ")
    if communication >= 80:
        parts.append("Effective communication! Your regular updates ensure smooth collaboration.")
    else:
        parts.append(f"Communication frequency ({scores['communication_frequency']} times) needs improvement. Aim for more consistent updates.")

    return "".join(parts)
# Maximum characters per drawn line. At Helvetica 12pt starting at x=100 on a
# letter-size page (612pt wide) this keeps typical text inside the right margin.
_REPORT_WRAP_COLUMNS = 75


def _render_report_pdf(scores):
    """Render one vendor's performance report in memory and return the PDF bytes."""
    import textwrap  # local import keeps this block self-contained

    with BytesIO() as buffer:
        c = canvas.Canvas(buffer, pagesize=letter)
        c.setFont("Helvetica", 12)
        c.drawString(100, 750, f"Subcontractor Performance Report - {scores['vendor_name']}")
        c.drawString(100, 730, f"Generated on: {datetime.now().strftime('%Y-%m-%d')}")
        c.drawString(100, 700, f"Final Score: {scores['final_score']}%")
        c.drawString(100, 680, f"Timeliness Score: {scores['timeliness_score']}%")
        c.drawString(100, 660, f"Quality Score: {scores['quality_score']}%")
        c.drawString(100, 640, f"Safety Score: {scores['safety_score']}%")
        c.drawString(100, 620, f"Communication Score: {scores['communication_score']}%")
        c.drawString(100, 600, f"Alert Flag: {'Yes' if scores['alert_flag'] else 'No'}")

        # Add feedback. drawString() does not wrap, so each feedback paragraph
        # is split into page-width lines first; otherwise long sentences run
        # off the right edge of the page.
        y_position = 570
        for paragraph in generate_feedback(scores).split('\n'):
            # wrap() returns [] for an empty paragraph; keep it as a blank line.
            for line in textwrap.wrap(paragraph, width=_REPORT_WRAP_COLUMNS) or ['']:
                c.drawString(100, y_position, line)
                y_position -= 15
                if y_position < 50:
                    c.showPage()
                    # Re-apply the font explicitly for the new page.
                    c.setFont("Helvetica", 12)
                    y_position = 750
        c.save()
        return buffer.getvalue()


def generate_pdf(vendor_scores):
    """Generate a PDF for each vendor and upload it as a Salesforce attachment.

    The PDF is attached to the vendor's Subcontractor_Performance_Score__c
    record for the current month.

    NOTE(review): that record must already exist. update_salesforce() is what
    creates it, and the visible callers invoke generate_pdf() first, so on the
    first run of a month the lookup below finds nothing and no PDF is uploaded.
    Confirm whether that ordering is intended.
    NOTE(review): Month__c is compared as a quoted string; confirm the field is
    not a Date field, which SOQL expects unquoted.

    Args:
        vendor_scores: Result dicts from calculate_scores().

    Returns:
        A dict mapping vendor_id to the attachment download URL, or to '' when
        the upload failed. Vendors with no score record for the month get no
        entry at all.
    """
    if not sf:
        logger.error("Salesforce connection not available; PDF reports cannot be uploaded")
        return {scores['vendor_id']: '' for scores in vendor_scores}

    pdf_urls = {}
    # First day of the current month, computed once so every vendor in this
    # run is filed under the same month.
    month = datetime.today().replace(day=1).date().strftime('%Y-%m-%d')

    for scores in vendor_scores:
        vendor_id = scores['vendor_id']
        # Spaces are replaced only for the attachment file name.
        file_safe_name = scores['vendor_name'].replace(' ', '_')

        pdf_content = _render_report_pdf(scores)

        # Upload PDF as an attachment to Subcontractor_Performance_Score__c
        try:
            query = f"""
                SELECT Id
                FROM Subcontractor_Performance_Score__c
                WHERE Vendor__c = '{vendor_id}'
                AND Month__c = '{month}'
                LIMIT 1
            """
            score_record = sf.query(query)['records']
            if not score_record:
                logger.error(f"No Subcontractor_Performance_Score__c record found for Vendor__c {vendor_id} for month {month}")
                continue

            attachment = {
                'Name': f"{file_safe_name}_Performance_Report_{month}.pdf",
                'Body': base64.b64encode(pdf_content).decode('utf-8'),
                'ParentId': score_record[0]['Id'],
            }
            attachment_id = sf.Attachment.create(attachment)['id']

            # Construct the attachment URL using the connected Salesforce instance
            pdf_url = f"https://{sf.sf_instance}/servlet/servlet.FileDownload?file={attachment_id}"
            pdf_urls[vendor_id] = pdf_url
            logger.info(f"Uploaded PDF for Vendor__c {vendor_id}: {pdf_url}")
        except Exception as e:
            # Best effort: a failed upload is recorded as an empty link so the
            # remaining vendors are still processed.
            logger.error(f"Failed to upload PDF for Vendor__c {vendor_id}: {str(e)}")
            pdf_urls[vendor_id] = ''
    return pdf_urls
def send_email_alert(vendor_scores, pdf_urls):
    """Email every vendor whose alert flag is set, via SendGrid.

    Does nothing (beyond an info log) when SENDGRID_API_KEY is not configured.
    Flagged vendors without a contact email are skipped with a warning, and a
    failed send is logged without stopping the remaining emails.

    Args:
        vendor_scores: Result dicts from calculate_scores().
        pdf_urls: Mapping of vendor_id to report URL; a missing entry is
            rendered as an empty string in the email body.
    """
    if not SENDGRID_API_KEY:
        logger.info("Skipping email alerts due to missing SENDGRID_API_KEY")
        return

    client = SendGridAPIClient(SENDGRID_API_KEY)

    # Lazily walk only the flagged vendors.
    for entry in (item for item in vendor_scores if item['alert_flag']):
        contact_email = entry['contact_email']
        if contact_email:
            vendor_name = entry['vendor_name']
            pdf_url = pdf_urls.get(entry['vendor_id'], '')
            body = f"""
Dear {vendor_name},
Your recent performance score is {entry['final_score']}%, which is below the acceptable threshold of 40%.
Please review your performance report for detailed feedback and take necessary actions to improve.
Performance Report: {pdf_url}
Best regards,
Subcontractor Performance Team
"""
            message = Mail(
                from_email='performancescores.com@gmail.com',  # Replace with your verified SendGrid sender email
                to_emails=contact_email,
                subject=f"Performance Alert for {vendor_name}",
                plain_text_content=body,
            )
            try:
                response = client.send(message)
                logger.info(f"Email sent to {contact_email}, status: {response.status_code}")
            except Exception as e:
                logger.error(f"Failed to send email to {contact_email}: {str(e)}")
        else:
            logger.warning(f"No contact email provided for vendor {entry['vendor_name']}, skipping email alert")
def update_salesforce(vendor_scores, pdf_urls):
    """Create or update each vendor's monthly Subcontractor_Performance_Score__c record.

    For every vendor the record for the first day of the current month is
    looked up; it is updated when found and created otherwise. Failures are
    logged per vendor and do not stop the remaining vendors.

    Args:
        vendor_scores: Result dicts from calculate_scores().
        pdf_urls: Mapping of vendor_id to report URL ('' when absent).
    """
    if not sf:
        logger.error("Salesforce connection not available")
        return

    for entry in vendor_scores:
        vendor_id = entry['vendor_id']
        month_str = datetime.today().replace(day=1).date().strftime('%Y-%m-%d')
        lookup = f"""
        SELECT Id
        FROM Subcontractor_Performance_Score__c
        WHERE Vendor__c = '{vendor_id}'
        AND Month__c = '{month_str}'
        LIMIT 1
        """
        try:
            matches = sf.query(lookup)['records']
        except Exception as e:
            logger.error(f"Failed to query Subcontractor_Performance_Score__c for Vendor__c {vendor_id}: {str(e)}")
            continue

        # Fields shared by both the update and the create payload.
        fields = {
            'Month__c': month_str,
            'Quality_Score__c': entry['quality_score'],
            'Timeliness_Score__c': entry['timeliness_score'],
            'Safety_Score__c': entry['safety_score'],
            'Communication_Score__c': entry['communication_score'],
            'Alert_Flag__c': entry['alert_flag'],
            'PDF_Link__c': pdf_urls.get(vendor_id, ''),
        }

        try:
            if matches:
                sf.Subcontractor_Performance_Score__c.update(matches[0]['Id'], fields)
                action = "Updated"
            else:
                # Only a new record needs the vendor reference set.
                sf.Subcontractor_Performance_Score__c.create({**fields, 'Vendor__c': vendor_id})
                action = "Created"
            logger.info(f"{action} Subcontractor_Performance_Score__c for Vendor__c {vendor_id}")
        except Exception as e:
            logger.error(f"Failed to update Salesforce for Vendor__c {vendor_id}: {str(e)}")
def _current_ist_timestamp():
    """Return the current time formatted for the dashboard, in IST (UTC+05:30)."""
    from datetime import timedelta, timezone  # local import keeps this block self-contained

    ist = timezone(timedelta(hours=5, minutes=30))
    return datetime.now(ist).strftime("%I:%M %p IST on %A, %B %d, %Y")


# Registered on "/" to match the endpoint named in this view's error log.
@app.route('/')
def get_dashboard():
    """Run the scoring pipeline and render the dashboard page.

    Fetches the latest vendor logs, scores them, uploads the PDF reports,
    sends alert emails and upserts the monthly score records, then renders
    ``index.html``.

    Returns:
        The rendered template, given ``processed_logs`` (a list of
        ``(log_entry, alert_flag, final_score)`` tuples) and ``last_updated``;
        or a ``(json, 500)`` pair when any step raises.
    """
    global vendor_logs
    try:
        # Computed per request rather than frozen as a literal, so the page
        # shows when the data was actually refreshed.
        last_updated = _current_ist_timestamp()

        vendor_data = fetch_vendor_logs()
        if not vendor_data:
            logger.warning("No vendor logs found")
            return render_template('index.html', processed_logs=[], last_updated=last_updated)

        vendor_scores = calculate_scores(vendor_data)
        pdf_urls = generate_pdf(vendor_scores)
        send_email_alert(vendor_scores, pdf_urls)
        update_salesforce(vendor_scores, pdf_urls)

        # Prepare processed_logs for display
        processed_logs = []
        fresh_entries = []
        for scores in vendor_scores:
            log = scores['log']
            log_entry = {
                'vendorLogId': log.vendorLogId,
                'vendorId': log.vendorId,
                'vendor_name': scores['vendor_name'],
                'contact_email': scores['contact_email'],
                'workDetails': log.workDetails,
                'qualityReport': log.qualityReport,
                'incidentLog': log.incidentLog,
                'workCompletionDate': log.workCompletionDate,
                'actualCompletionDate': log.actualCompletionDate,
                'delayDays': log.delayDays,
                'project': log.project,
                'communication_frequency': log.communicationFrequency,
                'scores': {
                    'timelinessScore': scores['timeliness_score'],
                    'qualityScore': scores['quality_score'],
                    'safetyScore': scores['safety_score'],
                    'communicationScore': scores['communication_score'],
                    'finalScore': scores['final_score'],
                },
                'extracted': True,
                'pdfLink': pdf_urls.get(scores['vendor_id'], ''),
                'createdDate': datetime.now().isoformat(),
            }
            # final_score doubles as the displayed average score.
            processed_logs.append((log_entry, scores['alert_flag'], scores['final_score']))
            fresh_entries.append(log_entry)

        # Replace the stored entries instead of appending to them; appending
        # made the module-level list grow on every page load.
        vendor_logs = fresh_entries

        logger.info(f"Rendering dashboard with {len(processed_logs)} processed logs")
        return render_template('index.html', processed_logs=processed_logs, last_updated=last_updated)
    except Exception as e:
        logger.error(f"Error in / endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating dashboard: {str(e)}"}), 500
# Registered on "/generate" to match the endpoint named in this view's error log.
# NOTE(review): the HTTP method the client uses is not visible in this file, so
# both GET and POST are accepted; restrict to POST once the caller is confirmed,
# since this endpoint sends emails and writes to Salesforce.
@app.route('/generate', methods=['GET', 'POST'])
def generate_scores():
    """Run the scoring pipeline and report the outcome as JSON.

    Fetches the latest vendor logs, scores them, uploads the PDF reports,
    sends alert emails and upserts the monthly score records, then replaces
    the module-level ``vendor_logs`` with the freshly built entries.

    Returns:
        ``{'status': 'success'}`` on success; a ``(json, 400)`` pair when no
        vendor logs are available; a ``(json, 500)`` pair when any step raises.
    """
    global vendor_logs
    try:
        vendor_data = fetch_vendor_logs()
        if not vendor_data:
            return jsonify({'detail': 'No vendor logs found to process'}), 400

        vendor_scores = calculate_scores(vendor_data)
        pdf_urls = generate_pdf(vendor_scores)
        send_email_alert(vendor_scores, pdf_urls)
        update_salesforce(vendor_scores, pdf_urls)

        fresh_entries = []
        for scores in vendor_scores:
            log = scores['log']
            fresh_entries.append({
                'vendorLogId': log.vendorLogId,
                'vendorId': log.vendorId,
                'vendor_name': scores['vendor_name'],
                'contact_email': scores['contact_email'],
                'workDetails': log.workDetails,
                'qualityReport': log.qualityReport,
                'incidentLog': log.incidentLog,
                'workCompletionDate': log.workCompletionDate,
                'actualCompletionDate': log.actualCompletionDate,
                'delayDays': log.delayDays,
                'project': log.project,
                'communication_frequency': log.communicationFrequency,
                'scores': {
                    'timelinessScore': scores['timeliness_score'],
                    'qualityScore': scores['quality_score'],
                    'safetyScore': scores['safety_score'],
                    'communicationScore': scores['communication_score'],
                    'finalScore': scores['final_score'],
                },
                'extracted': True,
                'pdfLink': pdf_urls.get(scores['vendor_id'], ''),
                'createdDate': datetime.now().isoformat(),
            })
        # Swap in the complete list only after every entry was built, so a
        # failure part-way through leaves the previous entries intact.
        vendor_logs = fresh_entries
        return jsonify({'status': 'success'})
    except Exception as e:
        logger.error(f"Error in /generate endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating scores: {str(e)}"}), 500
if __name__ == "__main__":
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary Python, so it must not be on by default while binding to all
    # interfaces (0.0.0.0). Opt in for local development with FLASK_DEBUG=1.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)