ai1 / app.py
varshakolanu's picture
Update app.py
0ec6129 verified
raw
history blame
15.4 kB
from flask import Flask, request, render_template, jsonify, Response
from pydantic import BaseModel, ValidationError
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import base64
import os
import logging
from datetime import datetime
from simple_salesforce import Salesforce
import json
# Set up logging to capture errors and debug information
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)


def _require_env(name: str) -> str:
    """Return the value of environment variable *name*, failing fast if unset.

    Raises:
        ValueError: if the variable is missing or empty.
    """
    value = os.getenv(name)
    if not value:
        logger.error(f"{name} environment variable not set")
        raise ValueError(f"{name} environment variable not set")
    return value


# Salesforce credentials.
# SECURITY: secrets must come from the environment only. Hard-coded fallback
# values in source control are readable by anyone with access to the
# repository, so any value that was ever committed here should be rotated.
SF_USERNAME = _require_env("SF_USERNAME")
SF_PASSWORD = _require_env("SF_PASSWORD")
SF_SECURITY_TOKEN = _require_env("SF_SECURITY_TOKEN")
# Not a secret: "login" targets production orgs, "test" targets sandboxes.
SF_DOMAIN = os.getenv("SF_DOMAIN", "login")

# Verify API key is set (used as the bearer token expected by /score)
API_KEY = _require_env("HUGGINGFACE_API_KEY")

# Connect to Salesforce once at import time; every handler shares this client.
try:
    sf = Salesforce(
        username=SF_USERNAME,
        password=SF_PASSWORD,
        security_token=SF_SECURITY_TOKEN,
        domain=SF_DOMAIN,
    )
    logger.info("Successfully connected to Salesforce")
except Exception as e:
    # Log for the operator, then re-raise: the app cannot work without the
    # connection, so startup must abort.
    logger.error(f"Failed to connect to Salesforce: {str(e)}")
    raise
# VendorLog model to match Salesforce data
class VendorLog(BaseModel):
    """Validated vendor log, built from a Vendor_Log__c row or a /score request body."""

    # Salesforce Id of the Vendor_Log__c record.
    vendorLogId: str
    # Display identifier; the Salesforce fetch fills this from the record Name.
    vendorId: str
    # Value of the Vendor__c field on the log record.
    vendorRecordId: str
    # Work completion percentage as text; calculate_scores parses it as a number.
    workDetails: str
    # Quality percentage as text; calculate_scores parses it as a number.
    qualityReport: str
    # Incident severity label (e.g. "None", "Low", "Medium", "High").
    incidentLog: str
    # Completion dates as text; the Salesforce fetch substitutes "N/A" when empty.
    workCompletionDate: str
    actualCompletionDate: str
    # Record Name of the vendor log.
    vendorLogName: str
    # Number of days of delay; drives the timeliness score.
    delayDays: int
    # Value of the Project__c field on the log record.
    project: str
# Store vendor logs for display.
# Module-level, in-memory list: each entry is a dict holding the log fields
# plus a 'scores' dict and an 'extracted' flag, appended by the route handlers
# and passed to the index.html template. Nothing here is persisted, so the
# list starts empty every time the process starts.
vendor_logs: list = []
def fetch_vendor_logs_from_salesforce():
    """Query every Vendor_Log__c record and convert each into a VendorLog.

    Records with an empty Vendor__c are skipped with a warning. Empty fields
    are replaced by placeholder values ("Unknown", "0.0", "None", "N/A", 0).

    Returns:
        A list of VendorLog instances, in query order.

    Raises:
        Any exception from the Salesforce client or model validation, after
        it has been logged.
    """
    soql = """
    SELECT Id, Name, Vendor__c, Work_Completion_Percentage__c, Quality_Percentage__c, Incident_Severity__c,
    Work_Completion_Date__c, Actual_Completion_Date__c, Delay_Days__c, Project__c
    FROM Vendor_Log__c
    """
    try:
        response = sf.query_all(soql)
        parsed_logs = []
        for row in response['records']:
            # A log without a vendor cannot be attributed to anyone; skip it.
            if not row['Vendor__c']:
                logger.warning(f"Skipping Vendor_Log__c record with ID {row['Id']} due to missing Vendor__c")
                continue

            field_values = {
                'vendorLogId': row['Id'] or "Unknown",
                'vendorId': row['Name'] or "Unknown",
                'vendorRecordId': row['Vendor__c'],
                # Percentages are carried as text on the model.
                'workDetails': str(row['Work_Completion_Percentage__c'] or "0.0"),
                'qualityReport': str(row['Quality_Percentage__c'] or "0.0"),
                'incidentLog': row['Incident_Severity__c'] or "None",
                'workCompletionDate': row['Work_Completion_Date__c'] or "N/A",
                'actualCompletionDate': row['Actual_Completion_Date__c'] or "N/A",
                'vendorLogName': row['Name'] or "Unknown",
                'delayDays': int(row['Delay_Days__c'] or 0),
                'project': row['Project__c'] or "Unknown",
            }
            parsed_logs.append(VendorLog(**field_values))
        return parsed_logs
    except Exception as exc:
        logger.error(f"Error fetching vendor logs from Salesforce: {str(exc)}")
        raise
def _parse_percentage(text: str, suffix: str) -> float:
    """Parse a percentage such as "85", "85%" or "85<suffix>" into a float.

    Raises:
        ValueError: if what remains after removing *suffix* and a trailing
            percent sign is not a number.
    """
    cleaned = text.replace(suffix, '').strip()
    return float(cleaned.rstrip('%'))


def calculate_scores(log: "VendorLog"):
    """Compute the four performance scores (0-100) for one vendor log.

    Args:
        log: object exposing ``workDetails``, ``qualityReport``, ``delayDays``
            and ``incidentLog``.

    Returns:
        dict with ``qualityScore``, ``timelinessScore``, ``safetyScore`` and
        ``communicationScore``, each rounded to two decimals.

    Raises:
        ValueError: if ``workDetails`` or ``qualityReport`` is not numeric.
    """
    try:
        # Extract numeric values by removing the suffixes. The completion
        # percentage is not used in any score; it is parsed only so that a
        # malformed value is rejected.
        _parse_percentage(log.workDetails, '% completed')
        quality_percentage = _parse_percentage(log.qualityReport, '% quality')

        # Quality Score: Directly use the quality percentage
        quality_score = quality_percentage

        # Timeliness Score: Based on delay days
        if log.delayDays <= 0:
            timeliness_score = 100.0
        elif log.delayDays <= 3:
            timeliness_score = 80.0
        elif log.delayDays <= 7:
            timeliness_score = 60.0
        else:
            timeliness_score = 40.0

        # Safety Score: Based on incident severity. Matching ignores case and
        # surrounding whitespace so "high" is not mistaken for an unknown
        # label. Unrecognized labels keep the existing default of 100.0.
        severity_map = {'none': 100.0, 'low': 80.0, 'minor': 80.0, 'medium': 50.0, 'high': 20.0}
        safety_score = severity_map.get(log.incidentLog.strip().lower(), 100.0)

        # Communication Score: plain mean of the other three scores. Weights
        # of 0.33 each sum to 0.99 and would cap a perfect vendor at 99.
        communication_score = (quality_score + timeliness_score + safety_score) / 3

        return {
            'qualityScore': round(quality_score, 2),
            'timelinessScore': round(timeliness_score, 2),
            'safetyScore': round(safety_score, 2),
            'communicationScore': round(communication_score, 2)
        }
    except ValueError as e:
        logger.error(f"Error parsing workDetails or qualityReport in calculate_scores: {str(e)}")
        raise ValueError(
            f"Invalid data format for workDetails ({log.workDetails}) or qualityReport ({log.qualityReport})"
        ) from e
    except Exception as e:
        logger.error(f"Error calculating scores: {str(e)}")
        raise
def get_feedback(score: float, metric: str) -> str:
    """Return the feedback sentence for *score* on the given *metric*.

    Bands: >= 90 "Excellent", >= 70 "Good", >= 50 "Needs Improvement",
    otherwise "Poor". The two lower bands have wording specific to
    'Timeliness', 'Safety' and 'Quality'; any other metric gets the
    communication wording.

    Raises:
        Any exception raised while comparing *score*, after it is logged.
    """
    try:
        # The two top bands do not depend on the metric.
        if score >= 90:
            return "Excellent: Maintain this standard"
        if score >= 70:
            return "Good: Keep up the good work"

        if score >= 50:
            per_metric = (
                ('Timeliness', "Needs Improvement: Maintain schedules to complete tasks on time"),
                ('Safety', "Needs Improvement: Implement stricter safety protocols"),
                ('Quality', "Needs Improvement: Focus on improving work quality"),
            )
            fallback = "Needs Improvement: Enhance coordination with project teams"
        else:
            per_metric = (
                ('Timeliness', "Poor: Significant delays detected"),
                ('Safety', "Poor: Critical safety issues identified"),
                ('Quality', "Poor: Quality standards not met"),
            )
            fallback = "Poor: Communication issues detected"

        for metric_name, message in per_metric:
            if metric == metric_name:
                return message
        return fallback
    except Exception as exc:
        logger.error(f"Error generating feedback: {str(exc)}")
        raise
def generate_pdf(vendor_id: str, vendor_log_name: str, scores: dict):
    """Render the one-page performance report and return it as PDF bytes.

    The document is drawn into an in-memory buffer rather than a file named
    after ``vendor_id`` in the working directory. That removes three problems:
    a caller-supplied ``vendor_id`` can no longer influence a filesystem path,
    simultaneous requests for the same vendor cannot overwrite each other's
    file, and no stray file is left behind if rendering fails.

    Args:
        vendor_id: identifier printed on the report.
        vendor_log_name: vendor log name printed on the report.
        scores: dict with ``qualityScore``, ``timelinessScore``,
            ``safetyScore`` and ``communicationScore``.

    Returns:
        The PDF document as ``bytes``.

    Raises:
        Any exception raised while rendering, after it has been logged.
    """
    import io  # local import so this function does not depend on file-level imports

    try:
        buffer = io.BytesIO()
        c = canvas.Canvas(buffer, pagesize=letter)
        c.setFont('Helvetica', 12)

        report_lines = [
            'Subcontractor Performance Report',
            f'Vendor ID: {vendor_id}',
            f'Vendor Log Name: {vendor_log_name}',
        ]
        # One line per metric: "<Label> Score: <value>% (<feedback>)".
        for label, key in (
            ('Quality', 'qualityScore'),
            ('Timeliness', 'timelinessScore'),
            ('Safety', 'safetyScore'),
            ('Communication', 'communicationScore'),
        ):
            value = scores[key]
            report_lines.append(f'{label} Score: {value}% ({get_feedback(value, label)})')

        # Lines start at y=750 and step down 20 points each.
        y = 750
        for text in report_lines:
            c.drawString(100, y, text)
            y -= 20

        c.save()
        return buffer.getvalue()
    except Exception as e:
        logger.error(f"Error generating PDF: {str(e)}")
        raise
def determine_alert_flag(scores: dict, all_logs: list):
    """Decide whether a freshly scored vendor log should raise an alert.

    An alert is raised when either:
      * the mean of the four scores is below 50, or
      * the mean is less than or equal to the lowest mean among ``all_logs``
        (the logs scored before this one).

    The threshold is checked first so a failing vendor is flagged even when
    no earlier logs exist. The comparison uses ``<=`` because the candidate
    is not yet part of ``all_logs``: a strictly lower mean is a new lowest
    performer, which an equality test would miss.

    Args:
        scores: dict with ``qualityScore``, ``timelinessScore``,
            ``safetyScore`` and ``communicationScore``.
        all_logs: previously scored entries, each a dict with a ``'scores'``
            dict of the same shape.

    Returns:
        ``True`` if the log should be flagged, otherwise ``False``.

    Raises:
        Any exception (e.g. ``KeyError`` for a missing score), after logging.
    """

    def _mean(entry_scores: dict) -> float:
        return (
            entry_scores['qualityScore']
            + entry_scores['timelinessScore']
            + entry_scores['safetyScore']
            + entry_scores['communicationScore']
        ) / 4

    try:
        avg_score = _mean(scores)
        if avg_score < 50:
            return True
        if not all_logs:
            # Nothing to compare against and the threshold was not breached.
            return False
        lowest_avg = min(_mean(entry['scores']) for entry in all_logs)
        return avg_score <= lowest_avg
    except Exception as e:
        logger.error(f"Error determining alert flag: {str(e)}")
        raise
def store_scores_in_salesforce(log: VendorLog, scores: dict, pdf_content: bytes, alert_flag: bool):
    """Persist one scoring result and its PDF report to Salesforce.

    Performs four calls in order: create the
    Subcontractor_Performance_Score__c record, upload the PDF as a
    ContentVersion published to that record, look up the resulting
    ContentDocumentId, and write a download URL back to the score record's
    PDF_Link__c field.

    Args:
        log: the vendor log that was scored.
        scores: dict with the four score values.
        pdf_content: raw bytes of the generated report.
        alert_flag: value stored in Alert_Flag__c.

    Raises:
        Any exception from the Salesforce client, after it has been logged.
        Steps that completed before the failure are not rolled back.
    """
    try:
        # Step 1: Create the Subcontractor_Performance_Score__c record
        score_fields = {
            'Vendor_Log__c': log.vendorLogId,
            'Vendor__c': log.vendorRecordId,
            'Quality_Score__c': scores['qualityScore'],
            'Timeliness_Score__c': scores['timelinessScore'],
            'Safety_Score__c': scores['safetyScore'],
            'Communication_Score__c': scores['communicationScore'],
            'Alert_Flag__c': alert_flag
        }
        created_score = sf.Subcontractor_Performance_Score__c.create(score_fields)
        score_record_id = created_score['id']
        logger.info(f"Successfully created Subcontractor_Performance_Score__c record with ID: {score_record_id}")

        # Step 2: Upload the PDF as a ContentVersion (sent base64-encoded)
        encoded_pdf = base64.b64encode(pdf_content).decode('utf-8')
        version_fields = {
            'Title': f'Performance_Report_{log.vendorId}',
            'PathOnClient': f'report_{log.vendorId}.pdf',
            'VersionData': encoded_pdf,
            'FirstPublishLocationId': score_record_id
        }
        created_version = sf.ContentVersion.create(version_fields)
        logger.info(f"Successfully uploaded PDF as ContentVersion for Vendor Log ID: {log.vendorLogId}")

        # Step 3: Get the ContentDocumentId and construct a URL to the file
        content_version_id = created_version['id']
        lookup = sf.query(f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{content_version_id}'")
        content_document_id = lookup['records'][0]['ContentDocumentId']
        pdf_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/document/download/{content_document_id}"

        # Step 4: Update the Subcontractor_Performance_Score__c record with the PDF URL
        sf.Subcontractor_Performance_Score__c.update(score_record_id, {'PDF_Link__c': pdf_url})
        logger.info(f"Successfully updated Subcontractor_Performance_Score__c record with PDF URL: {pdf_url}")
    except Exception as exc:
        logger.error(f"Error storing scores in Salesforce: {str(exc)}")
        raise
@app.route('/score', methods=['POST'])
def score_vendor():
    """Score a single vendor log posted as JSON.

    Requires an ``Authorization: Bearer <API_KEY>`` header. The body must be
    a JSON object matching ``VendorLog``. On success the scores are stored in
    Salesforce, the log is added to the in-memory list, and the scores plus
    the base64-encoded PDF are returned.

    Responses:
        200: scores, ``pdfContent`` (base64) and ``alert``.
        400: body is not JSON, not a JSON object, or fails validation.
        401: missing or wrong API key.
        500: any other failure.
    """
    import hmac  # local import so this handler does not depend on file-level imports

    try:
        # Compare the bearer token in constant time so response timing does
        # not reveal how much of the key matched. Encoding to bytes lets
        # compare_digest accept non-ASCII header values.
        auth_header = request.headers.get('Authorization') or ''
        expected_header = f'Bearer {API_KEY}'
        if not hmac.compare_digest(auth_header.encode('utf-8'), expected_header.encode('utf-8')):
            return jsonify({'detail': 'Invalid API key'}), 401

        # Parse and validate the request body
        if not request.is_json:
            return jsonify({'detail': 'Request body must be JSON'}), 400
        # silent=True returns None on malformed JSON instead of raising, so a
        # bad payload is reported as a client error rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'detail': 'Request body must be a JSON object'}), 400
        try:
            log = VendorLog(**data)
        except ValidationError as e:
            logger.error(f"Validation error in /score endpoint: {str(e)}")
            return jsonify({'detail': f"Invalid request data: {str(e)}"}), 400
        logger.info(f"Received Vendor Log: {log}")

        # Calculate scores, generate PDF, and store in Salesforce
        scores = calculate_scores(log)
        pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores)
        pdf_base64 = base64.b64encode(pdf_content).decode('utf-8')
        alert_flag = determine_alert_flag(scores, vendor_logs)
        store_scores_in_salesforce(log, scores, pdf_content, alert_flag)

        # Store the log in memory (only after the Salesforce write succeeded)
        vendor_logs.append({
            'vendorLogId': log.vendorLogId,
            'vendorId': log.vendorId,
            'vendorLogName': log.vendorLogName,
            'workDetails': log.workDetails,
            'qualityReport': log.qualityReport,
            'incidentLog': log.incidentLog,
            'workCompletionDate': log.workCompletionDate,
            'actualCompletionDate': log.actualCompletionDate,
            'delayDays': log.delayDays,
            'project': log.project,
            'scores': scores,
            'extracted': True
        })

        return jsonify({
            'vendorLogId': log.vendorLogId,
            'vendorId': log.vendorId,
            'vendorLogName': log.vendorLogName,
            'qualityScore': scores['qualityScore'],
            'timelinessScore': scores['timelinessScore'],
            'safetyScore': scores['safetyScore'],
            'communicationScore': scores['communicationScore'],
            'pdfContent': pdf_base64,
            'alert': alert_flag
        })
    except ValueError as e:
        # Raised by calculate_scores for non-numeric percentages.
        logger.error(f"ValueError in /score endpoint: {str(e)}")
        return jsonify({'detail': str(e)}), 400
    except Exception as e:
        logger.error(f"Error in /score endpoint: {str(e)}")
        return jsonify({'detail': f"Error processing vendor log: {str(e)}"}), 500
@app.route('/', methods=['GET'])
def get_dashboard():
    """Render the dashboard, scoring any Salesforce logs not seen before.

    Fetches all vendor logs from Salesforce. Every log whose id is not yet in
    the in-memory list is scored, stored back to Salesforce and appended.
    The template receives the full list and the current time in IST.

    Responses:
        200: rendered ``index.html``.
        500: JSON error detail if anything fails.
    """
    # Local import so the timestamp does not depend on file-level imports.
    from datetime import datetime, timedelta, timezone

    global vendor_logs
    try:
        fetched_logs = fetch_vendor_logs_from_salesforce()

        # Build the id set once instead of scanning the list for every log.
        known_ids = {entry['vendorLogId'] for entry in vendor_logs}
        for log in fetched_logs:
            if log.vendorLogId in known_ids:
                continue
            scores = calculate_scores(log)
            pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores)
            alert_flag = determine_alert_flag(scores, vendor_logs)
            store_scores_in_salesforce(log, scores, pdf_content, alert_flag)
            vendor_logs.append({
                'vendorLogId': log.vendorLogId,
                'vendorId': log.vendorId,
                'vendorLogName': log.vendorLogName,
                'workDetails': log.workDetails,
                'qualityReport': log.qualityReport,
                'incidentLog': log.incidentLog,
                'workCompletionDate': log.workCompletionDate,
                'actualCompletionDate': log.actualCompletionDate,
                'delayDays': log.delayDays,
                'project': log.project,
                'scores': scores,
                'extracted': True
            })
            known_ids.add(log.vendorLogId)

        # Current timestamp for display, in India Standard Time (UTC+05:30),
        # formatted like "03:44 PM IST on Saturday, May 17, 2025".
        ist = timezone(timedelta(hours=5, minutes=30))
        last_updated = datetime.now(ist).strftime("%I:%M %p IST on %A, %B %d, %Y")

        return render_template('index.html', vendor_logs=vendor_logs, last_updated=last_updated)
    except Exception as e:
        logger.error(f"Error in / endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating dashboard: {str(e)}"}), 500
@app.route('/generate', methods=['POST'])
def generate_scores():
    """Re-score every vendor log from Salesforce and replace the in-memory list.

    The new list is built locally and swapped in only after every log has
    been processed. If any step fails, the previous list stays in place
    instead of being left empty or partially filled. Salesforce records
    created before the failure are not rolled back.

    Responses:
        200: ``{'status': 'success'}``.
        500: JSON error detail if anything fails.
    """
    global vendor_logs
    try:
        fetched_logs = fetch_vendor_logs_from_salesforce()
        rebuilt_logs = []
        for log in fetched_logs:
            scores = calculate_scores(log)
            pdf_content = generate_pdf(log.vendorId, log.vendorLogName, scores)
            # Compare against the logs scored so far in this run.
            alert_flag = determine_alert_flag(scores, rebuilt_logs)
            store_scores_in_salesforce(log, scores, pdf_content, alert_flag)
            rebuilt_logs.append({
                'vendorLogId': log.vendorLogId,
                'vendorId': log.vendorId,
                'vendorLogName': log.vendorLogName,
                'workDetails': log.workDetails,
                'qualityReport': log.qualityReport,
                'incidentLog': log.incidentLog,
                'workCompletionDate': log.workCompletionDate,
                'actualCompletionDate': log.actualCompletionDate,
                'delayDays': log.delayDays,
                'project': log.project,
                'scores': scores,
                'extracted': True
            })
        # Publish the complete result in one step.
        vendor_logs = rebuilt_logs
        return jsonify({'status': 'success'})
    except Exception as e:
        logger.error(f"Error in /generate endpoint: {str(e)}")
        return jsonify({'detail': f"Error generating scores: {str(e)}"}), 500
if __name__ == "__main__":
    # SECURITY: the server listens on all interfaces, and Flask's debug mode
    # enables an interactive debugger that can execute arbitrary code. Debug
    # is therefore off unless FLASK_DEBUG is explicitly set to a truthy value.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)