# ai1 / app.py
# lokeshloki143's picture
# Update app.py
# 5cfa01e verified
import os
import logging
from datetime import datetime
from flask import Flask, request, render_template, jsonify
from pydantic import BaseModel, ValidationError
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import base64
from io import BytesIO
from simple_salesforce import Salesforce
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from dotenv import load_dotenv
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Set up logging to capture errors and debug information.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Salesforce / SendGrid settings are read exclusively from the environment.
# Credentials deliberately have NO in-source fallback: a non-empty default
# makes the "missing variable" validation below unreachable and exposes the
# secret to anyone who can read this file.
# NOTE(review): earlier revisions of this file carried literal credentials as
# defaults; those values should be treated as compromised and rotated.
SF_USERNAME = os.getenv("SF_USERNAME")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_DOMAIN = os.getenv("SF_DOMAIN", "login")
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")

# Validate required environment variables (SENDGRID_API_KEY is optional and
# only disables e-mail alerts when absent).
required_vars = {
    'SF_USERNAME': SF_USERNAME,
    'SF_PASSWORD': SF_PASSWORD,
    'SF_SECURITY_TOKEN': SF_SECURITY_TOKEN,
}
missing_vars = [var for var, value in required_vars.items() if not value]
if missing_vars:
    logger.error(f"Missing required environment variables: {', '.join(missing_vars)}")
    raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
if not SENDGRID_API_KEY:
    logger.warning("SENDGRID_API_KEY not set. Email alerts will be disabled.")

# Connect to Salesforce. A failed login is tolerated at import time: `sf` is
# set to None and the functions that need it check for that.
try:
    sf = Salesforce(
        username=SF_USERNAME,
        password=SF_PASSWORD,
        security_token=SF_SECURITY_TOKEN,
        domain=SF_DOMAIN,
        version='63.0',
    )
    logger.info("Successfully connected to Salesforce")
except Exception as e:
    logger.error(f"Failed to connect to Salesforce: {str(e)}")
    sf = None
# VendorLog model to match Salesforce data
class VendorLog(BaseModel):
    """One Vendor_Log__c record in the shape used by the scoring pipeline.

    Instances are built in fetch_vendor_logs() from a Salesforce query row;
    that function substitutes placeholder values for empty fields before
    constructing the model.
    """
    vendorLogId: str  # Id of the Vendor_Log__c record
    vendorId: str  # filled from the record's Name field (not the vendor Id)
    vendorRecordId: str  # Vendor__c lookup value of the log record
    workDetails: str  # Work_Completion_Percentage__c rendered as text
    qualityReport: str  # Quality_Percentage__c rendered as text; parsed with float() when scoring
    incidentLog: str  # Incident_Severity__c, or "None" when the field is empty
    workCompletionDate: str  # Work_Completion_Date__c, or "N/A" when empty
    actualCompletionDate: str  # Actual_Completion_Date__c, or "N/A" when empty
    vendorLogName: str  # Name of the Vendor_Log__c record
    delayDays: int  # Delay_Days__c; scoring treats >0 as late and <0 as early
    project: str  # Project__c, or "Unknown" when empty
    communicationFrequency: int  # Communication_Frequency__c; 0 when empty
# Module-level store of processed vendor log entries (plain dicts) kept for
# display. It is written by the get_dashboard() and generate_scores() request
# handlers, which declare it `global`.
vendor_logs = []
def fetch_vendor_logs():
    """Return the most recently modified Vendor_Log__c entry of every vendor.

    Each element of the result is a dict with the keys ``vendor_id``,
    ``vendor_name``, ``contact_email`` and ``log`` (a ``VendorLog``). Vendors
    that have no log record are left out. An empty list is returned when no
    Salesforce connection exists or when any step of the fetch raises.
    """
    if not sf:
        logger.error("Salesforce connection not available")
        return []

    try:
        vendor_rows = sf.query_all("SELECT Id, Vendor_Name__c, Contact_Email__c FROM Vendor__c")['records']
        collected = []
        for vendor_row in vendor_rows:
            vendor_id = vendor_row['Id']

            # A Salesforce record Id is expected to be 15 or 18 characters.
            if len(vendor_id) not in (15, 18):
                logger.warning(f"Invalid Vendor__c ID format: {vendor_id}")
                continue

            # Ask only for the newest log (by LastModifiedDate) of this vendor.
            query = f"""
SELECT Id, Name, Vendor__c, Project__c, Work_Completion_Percentage__c, Quality_Percentage__c,
Incident_Severity__c, Delay_Days__c, Communication_Frequency__c,
Work_Completion_Date__c, Actual_Completion_Date__c
FROM Vendor_Log__c
WHERE Vendor__c = '{vendor_id}'
ORDER BY LastModifiedDate DESC
LIMIT 1
"""
            matches = sf.query(query)['records']
            if not matches:
                logger.debug(f"No Vendor_Log__c found for Vendor__c: {vendor_id}")
                continue

            newest = matches[0]
            if not newest['Vendor__c']:
                logger.warning(f"Skipping Vendor_Log__c record with ID {newest['Id']} due to missing Vendor__c")
                continue

            # Empty Salesforce fields are replaced with display placeholders.
            parsed_log = VendorLog(
                vendorLogId=newest['Id'] or "Unknown",
                vendorId=newest['Name'] or "Unknown",
                vendorRecordId=newest['Vendor__c'],
                workDetails=str(newest['Work_Completion_Percentage__c'] or "0.0"),
                qualityReport=str(newest['Quality_Percentage__c'] or "0.0"),
                incidentLog=newest['Incident_Severity__c'] or "None",
                workCompletionDate=newest['Work_Completion_Date__c'] or "N/A",
                actualCompletionDate=newest['Actual_Completion_Date__c'] or "N/A",
                vendorLogName=newest['Name'] or "Unknown",
                delayDays=int(newest['Delay_Days__c'] or 0),
                project=newest['Project__c'] or "Unknown",
                communicationFrequency=int(newest['Communication_Frequency__c'] or 0),
            )
            collected.append({
                'vendor_id': vendor_id,
                'vendor_name': vendor_row['Vendor_Name__c'] or 'Unknown Vendor',
                'contact_email': vendor_row['Contact_Email__c'],
                'log': parsed_log,
            })

        logger.info(f"Fetched {len(collected)} vendor logs")
        return collected
    except Exception as e:
        logger.error(f"Error fetching vendor logs: {str(e)}")
        return []
def calculate_scores(vendor_data):
    """Compute the four component scores and the final score per vendor.

    ``vendor_data`` is a list of dicts with the keys ``vendor_id``,
    ``vendor_name``, ``contact_email`` and ``log``. The ``log`` object must
    expose ``qualityReport``, ``delayDays``, ``incidentLog`` and
    ``communicationFrequency``.

    Returns a list with one dict per input entry holding the rounded scores,
    the ``alert_flag`` (final score below 40) and the raw inputs used.
    """
    # Safety score for the known severities; anything else scores 40.
    safety_by_severity = {"None": 100, 'Low': 90, 'Medium': 70}

    scored = []
    for entry in vendor_data:
        vendor_id = entry['vendor_id']
        vendor_name = entry['vendor_name']
        contact_email = entry['contact_email']
        record = entry['log']

        quality = float(record.qualityReport)
        days_late = record.delayDays
        severity = record.incidentLog
        contacts = record.communicationFrequency

        # Timeliness: start at 100, lose 5 per day late, gain 2 per day early
        # (days_late is negative when early), then clamp to the 0..120 range.
        per_day = 5 if days_late > 0 else 2
        timeliness = max(0, min(100 - per_day * days_late, 120))

        safety = safety_by_severity.get(severity, 40)

        if contacts >= 5:
            communication = 90
        elif contacts >= 3:
            communication = 70
        else:
            communication = 50

        # Final score is the plain average of the four components.
        final = (timeliness + quality + safety + communication) / 4

        scored.append({
            'vendor_id': vendor_id,
            'vendor_name': vendor_name,
            'contact_email': contact_email,
            'log': record,
            'timeliness_score': round(timeliness, 2),
            'quality_score': round(quality, 2),
            'safety_score': round(safety, 2),
            'communication_score': round(communication, 2),
            'final_score': round(final, 2),
            'alert_flag': final < 40,
            'delay_days': days_late,
            'incident_severity': severity,
            'communication_frequency': contacts,
        })

    logger.info(f"Calculated scores for {len(scored)} vendors")
    return scored
def generate_feedback(scores):
"""Generate creative feedback based on scores."""
final_score = scores['final_score']
timeliness_score = scores['timeliness_score']
quality_score = scores['quality_score']
safety_score = scores['safety_score']
communication_score = scores['communication_score']
feedback = f"Performance Review for {scores['vendor_name']} (Final Score: {final_score}%)\n\n"
if final_score >= 80:
feedback += "Outstanding Performance! Your dedication to excellence is evident across all metrics. "
feedback += "We encourage you to maintain this high standard and continue to lead by example."
elif final_score >= 60:
feedback += "Good Effort! You are performing well overall, with opportunities for growth. "
feedback += "Please focus on the areas highlighted below to enhance your performance further."
else:
feedback += "Action Required: Your performance is below the expected threshold. "
feedback += "We are committed to supporting your improvement—please review the feedback below and let’s collaborate on next steps."
feedback += "\n\nDetailed Feedback:\n"
feedback += f"- Timeliness ({timeliness_score}%): "
if timeliness_score >= 100:
feedback += "Exceptional! Completing tasks ahead of schedule reflects your strong commitment to deadlines."
elif timeliness_score >= 80:
feedback += "Well done on maintaining timely deliveries."
else:
feedback += f"Delays of {scores['delay_days']} days have impacted your score. We recommend improving scheduling practices."
feedback += f"\n- Quality ({quality_score}%): "
if quality_score >= 80:
feedback += "Excellent quality of work! Your deliverables consistently meet high standards."
else:
feedback += "There is room for improvement in quality. Focus on delivering consistent, high-quality results."
feedback += f"\n- Safety ({safety_score}%): "
if safety_score >= 80:
feedback += "Strong safety record! Continue prioritizing safety in all operations."
else:
feedback += f"Incidents (Severity: {scores['incident_severity'] or 'None'}) have affected your score. Please strengthen safety protocols."
feedback += f"\n- Communication ({communication_score}%): "
if communication_score >= 80:
feedback += "Effective communication! Your regular updates ensure smooth collaboration."
else:
feedback += f"Communication frequency ({scores['communication_frequency']} times) needs improvement. Aim for more consistent updates."
return feedback
def _render_report_pdf(scores, display_name):
    """Render one vendor's performance report and return the PDF as bytes."""
    # Stdlib helper imported locally because the module header does not import it.
    import textwrap

    # Text starts at x=100 on a 612pt-wide letter page. 80 columns of 12pt
    # Helvetica is an approximate limit (the font is proportional) that keeps
    # typical English text inside the right edge.
    wrap_columns = 80

    with BytesIO() as buffer:
        c = canvas.Canvas(buffer, pagesize=letter)
        c.setFont("Helvetica", 12)
        c.drawString(100, 750, f"Subcontractor Performance Report - {display_name}")
        c.drawString(100, 730, f"Generated on: {datetime.now().strftime('%Y-%m-%d')}")
        c.drawString(100, 700, f"Final Score: {scores['final_score']}%")
        c.drawString(100, 680, f"Timeliness Score: {scores['timeliness_score']}%")
        c.drawString(100, 660, f"Quality Score: {scores['quality_score']}%")
        c.drawString(100, 640, f"Safety Score: {scores['safety_score']}%")
        c.drawString(100, 620, f"Communication Score: {scores['communication_score']}%")
        c.drawString(100, 600, f"Alert Flag: {'Yes' if scores['alert_flag'] else 'No'}")

        # Add feedback, wrapping long sentences so they are not drawn past the
        # page edge (drawString itself never wraps).
        y_position = 570
        for paragraph in generate_feedback(scores).split('\n'):
            # wrap() returns [] for an empty string; keep blank lines as spacing.
            for line in textwrap.wrap(paragraph, width=wrap_columns) or ['']:
                c.drawString(100, y_position, line)
                y_position -= 15
                if y_position < 50:
                    c.showPage()
                    # Re-select the font explicitly for the new page.
                    c.setFont("Helvetica", 12)
                    y_position = 750
        c.save()
        return buffer.getvalue()


def generate_pdf(vendor_scores):
    """Generate a PDF for each vendor and upload it as a Salesforce attachment.

    For every entry of ``vendor_scores`` (as produced by calculate_scores) a
    report is rendered in memory and attached to the vendor's
    Subcontractor_Performance_Score__c record for the current month.

    Returns a dict mapping vendor id to the attachment download URL. A vendor
    maps to ``''`` when the upload raised, and is absent when no score record
    exists for the month. An empty dict is returned when there is no
    Salesforce connection (nothing could be uploaded, so nothing is rendered).
    """
    if not sf:
        logger.error("Salesforce connection not available")
        return {}

    # Score records are keyed on the first day of the current month; this is
    # the same for every vendor, so compute it once.
    month = datetime.today().replace(day=1).date().strftime('%Y-%m-%d')

    pdf_urls = {}
    for scores in vendor_scores:
        vendor_id = scores['vendor_id']
        vendor_name = scores['vendor_name'].replace(' ', '_')
        pdf_content = _render_report_pdf(scores, vendor_name)

        # Upload PDF as an attachment to Subcontractor_Performance_Score__c
        try:
            query = f"""
SELECT Id
FROM Subcontractor_Performance_Score__c
WHERE Vendor__c = '{vendor_id}'
AND Month__c = '{month}'
LIMIT 1
"""
            score_record = sf.query(query)['records']
            if not score_record:
                logger.error(f"No Subcontractor_Performance_Score__c record found for Vendor__c {vendor_id} for month {month}")
                continue
            attachment = {
                'Name': f"{vendor_name}_Performance_Report_{month}.pdf",
                'Body': base64.b64encode(pdf_content).decode('utf-8'),
                'ParentId': score_record[0]['Id'],
            }
            attachment_id = sf.Attachment.create(attachment)['id']
            # Construct the attachment URL using the connected Salesforce instance
            pdf_url = f"https://{sf.sf_instance}/servlet/servlet.FileDownload?file={attachment_id}"
            pdf_urls[vendor_id] = pdf_url
            logger.info(f"Uploaded PDF for Vendor__c {vendor_id}: {pdf_url}")
        except Exception as e:
            # Best effort: one failed upload must not stop the other vendors.
            logger.error(f"Failed to upload PDF for Vendor__c {vendor_id}: {str(e)}")
            pdf_urls[vendor_id] = ''
    return pdf_urls
def send_email_alert(vendor_scores, pdf_urls):
    """E-mail every vendor whose ``alert_flag`` is set, via SendGrid.

    Does nothing (beyond an info log) when SENDGRID_API_KEY is unset. Vendors
    without a contact e-mail are skipped with a warning. A failure to send one
    message is logged and does not stop the remaining messages.
    """
    if not SENDGRID_API_KEY:
        logger.info("Skipping email alerts due to missing SENDGRID_API_KEY")
        return

    client = SendGridAPIClient(SENDGRID_API_KEY)
    flagged = (entry for entry in vendor_scores if entry['alert_flag'])
    for entry in flagged:
        recipient = entry['contact_email']
        if not recipient:
            logger.warning(f"No contact email provided for vendor {entry['vendor_name']}, skipping email alert")
            continue

        name = entry['vendor_name']
        report_link = pdf_urls.get(entry['vendor_id'], '')
        body = f"""
Dear {name},
Your recent performance score is {entry['final_score']}%, which is below the acceptable threshold of 40%.
Please review your performance report for detailed feedback and take necessary actions to improve.
Performance Report: {report_link}
Best regards,
Subcontractor Performance Team
"""
        message = Mail(
            from_email='performancescores.com@gmail.com',  # Replace with your verified SendGrid sender email
            to_emails=recipient,
            subject=f"Performance Alert for {name}",
            plain_text_content=body,
        )
        try:
            response = client.send(message)
            logger.info(f"Email sent to {recipient}, status: {response.status_code}")
        except Exception as e:
            logger.error(f"Failed to send email to {recipient}: {str(e)}")
def update_salesforce(vendor_scores, pdf_urls):
    """Create or update this month's Subcontractor_Performance_Score__c rows.

    For each vendor the record for the first day of the current month is
    looked up; it is updated when found and created (with ``Vendor__c`` set)
    otherwise. Query and write failures are logged per vendor and skipped.
    Returns None; does nothing when there is no Salesforce connection.
    """
    if not sf:
        logger.error("Salesforce connection not available")
        return

    for entry in vendor_scores:
        vendor_id = entry['vendor_id']
        month_str = datetime.today().replace(day=1).date().strftime('%Y-%m-%d')
        lookup = f"""
SELECT Id
FROM Subcontractor_Performance_Score__c
WHERE Vendor__c = '{vendor_id}'
AND Month__c = '{month_str}'
LIMIT 1
"""
        try:
            found = sf.query(lookup)['records']
        except Exception as e:
            logger.error(f"Failed to query Subcontractor_Performance_Score__c for Vendor__c {vendor_id}: {str(e)}")
            continue

        # Fields shared by the update and the create payload.
        payload = {
            'Month__c': month_str,
            'Quality_Score__c': entry['quality_score'],
            'Timeliness_Score__c': entry['timeliness_score'],
            'Safety_Score__c': entry['safety_score'],
            'Communication_Score__c': entry['communication_score'],
            'Alert_Flag__c': entry['alert_flag'],
            'PDF_Link__c': pdf_urls.get(vendor_id, ''),
        }

        try:
            if not found:
                # A new record additionally needs its vendor lookup.
                sf.Subcontractor_Performance_Score__c.create({**payload, 'Vendor__c': vendor_id})
                logger.info(f"Created Subcontractor_Performance_Score__c for Vendor__c {vendor_id}")
            else:
                sf.Subcontractor_Performance_Score__c.update(found[0]['Id'], payload)
                logger.info(f"Updated Subcontractor_Performance_Score__c for Vendor__c {vendor_id}")
        except Exception as e:
            logger.error(f"Failed to update Salesforce for Vendor__c {vendor_id}: {str(e)}")
def _current_ist_timestamp():
    """Return the current time formatted like '12:43 PM IST on Monday, May 19, 2025'."""
    # Imported locally: the module header only imports `datetime` itself.
    from datetime import timedelta, timezone

    # The dashboard labels its timestamp as IST, i.e. UTC+05:30.
    ist = timezone(timedelta(hours=5, minutes=30))
    return datetime.now(ist).strftime("%I:%M %p IST on %A, %B %d, %Y")


def _build_dashboard_entry(scores, pdf_urls):
    """Flatten one calculate_scores() result into the dict shown on the dashboard."""
    log = scores['log']
    return {
        'vendorLogId': log.vendorLogId,
        'vendorId': log.vendorId,
        'vendor_name': scores['vendor_name'],
        'contact_email': scores['contact_email'],
        'workDetails': log.workDetails,
        'qualityReport': log.qualityReport,
        'incidentLog': log.incidentLog,
        'workCompletionDate': log.workCompletionDate,
        'actualCompletionDate': log.actualCompletionDate,
        'delayDays': log.delayDays,
        'project': log.project,
        'communication_frequency': log.communicationFrequency,
        'scores': {
            'timelinessScore': scores['timeliness_score'],
            'qualityScore': scores['quality_score'],
            'safetyScore': scores['safety_score'],
            'communicationScore': scores['communication_score'],
            'finalScore': scores['final_score'],
        },
        'extracted': True,
        'pdfLink': pdf_urls.get(scores['vendor_id'], ''),
        'createdDate': datetime.now().isoformat(),
    }


@app.route('/', methods=['GET'])
def get_dashboard():
    """Render the dashboard after running the full scoring pipeline.

    Fetches the latest vendor logs, scores them, generates and uploads the
    PDFs, sends alert e-mails, writes the scores to Salesforce, and renders
    ``index.html`` with ``processed_logs`` (a list of
    ``(log_entry, alert_flag, final_score)`` tuples) and ``last_updated``.
    With no vendor logs the template is rendered with an empty list. Any
    exception yields a JSON error body with HTTP status 500.
    """
    global vendor_logs
    try:
        # Reflect the actual render time instead of a fixed literal.
        last_updated = _current_ist_timestamp()

        vendor_data = fetch_vendor_logs()
        if not vendor_data:
            logger.warning("No vendor logs found")
            return render_template('index.html', processed_logs=[], last_updated=last_updated)

        vendor_scores = calculate_scores(vendor_data)
        pdf_urls = generate_pdf(vendor_scores)
        send_email_alert(vendor_scores, pdf_urls)
        update_salesforce(vendor_scores, pdf_urls)

        # Prepare processed_logs for display; final_score is the displayed average.
        processed_logs = [
            (_build_dashboard_entry(scores, pdf_urls), scores['alert_flag'], scores['final_score'])
            for scores in vendor_scores
        ]
        # Replace the stored entries rather than appending, so the module-level
        # list does not grow with duplicates on every page load.
        vendor_logs = [entry for entry, _, _ in processed_logs]

        logger.info(f"Rendering dashboard with {len(processed_logs)} processed logs")
        return render_template('index.html', processed_logs=processed_logs, last_updated=last_updated)
    except Exception as e:
        logger.error(f"Error in / endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating dashboard: {str(e)}"}), 500
@app.route('/generate', methods=['POST'])
def generate_scores():
    """Run the scoring pipeline on demand and refresh the stored log entries.

    Responds with ``{'status': 'success'}`` on success, with HTTP 400 and a
    ``detail`` message when no vendor logs could be fetched, and with HTTP 500
    and a ``detail`` message when any step raises.
    """
    global vendor_logs
    try:
        fetched = fetch_vendor_logs()
        if not fetched:
            return jsonify({'detail': 'No vendor logs found to process'}), 400

        scored = calculate_scores(fetched)
        links = generate_pdf(scored)
        send_email_alert(scored, links)
        update_salesforce(scored, links)

        # Start from an empty list so earlier entries are discarded.
        vendor_logs = []
        for item in scored:
            record = item['log']
            score_block = {
                'timelinessScore': item['timeliness_score'],
                'qualityScore': item['quality_score'],
                'safetyScore': item['safety_score'],
                'communicationScore': item['communication_score'],
                'finalScore': item['final_score'],
            }
            entry = {
                'vendorLogId': record.vendorLogId,
                'vendorId': record.vendorId,
                'vendor_name': item['vendor_name'],
                'contact_email': item['contact_email'],
                'workDetails': record.workDetails,
                'qualityReport': record.qualityReport,
                'incidentLog': record.incidentLog,
                'workCompletionDate': record.workCompletionDate,
                'actualCompletionDate': record.actualCompletionDate,
                'delayDays': record.delayDays,
                'project': record.project,
                'communication_frequency': record.communicationFrequency,
                'scores': score_block,
                'extracted': True,
                'pdfLink': links.get(item['vendor_id'], ''),
                'createdDate': datetime.now().isoformat(),
            }
            vendor_logs.append(entry)

        return jsonify({'status': 'success'})
    except Exception as e:
        logger.error(f"Error in /generate endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating scores: {str(e)}"}), 500
if __name__ == "__main__":
    # The server listens on all interfaces, so debug mode must not be on by
    # default: Flask's debug mode enables the interactive debugger, which lets
    # a remote client execute arbitrary Python. Opt in explicitly for local
    # development by setting FLASK_DEBUG=1 (or true/yes/on).
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)