Spaces:
Runtime error
Runtime error
File size: 10,919 Bytes
d68584a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
from flask import Flask, request, jsonify
import os
import json
import requests # For making HTTP requests to Salesforce
from model import process_vendor_logs # Import your scoring logic from model.py
app = Flask(__name__)
# --- Configuration ---
# These should be set as environment variables in your Hugging Face Space for security
SALESFORCE_CONSUMER_KEY = os.environ.get('SALESFORCE_CONSUMER_KEY')
SALESFORCE_CONSUMER_SECRET = os.environ.get('SALESFORCE_CONSUMER_SECRET')
SALESFORCE_CALLBACK_URL = os.environ.get('SALESFORCE_CALLBACK_URL') # Needs to match your Connected App
SALESFORCE_TOKEN_URL = "https://login.salesforce.com/services/oauth2/token" # Or your sandbox URL
SALESFORCE_AUTH_URL = "https://login.salesforce.com/services/oauth2/authorize" # Or your sandbox URL
# Placeholder for your Salesforce instance URL. This will be obtained dynamically
SALESFORCE_INSTANCE_URL = None
# Hugging Face Space doesn't persist data, so use a very basic in-memory storage for demonstration.
# Replace with a database (e.g., PostgreSQL) for a production environment.
auth_code = None
access_token = None
refresh_token = None
# --- Helper Functions ---
def get_salesforce_access_token(code):
"""
Exchanges an authorization code for an access token and refresh token from Salesforce.
Args:
code: The authorization code received from Salesforce.
Returns:
A tuple containing the access token and refresh token, or (None, None) on error.
"""
token_data = {
'grant_type': 'authorization_code',
'code': code,
'client_id': SALESFORCE_CONSUMER_KEY,
'client_secret': SALESFORCE_CONSUMER_SECRET,
'redirect_uri': SALESFORCE_CALLBACK_URL
}
response = requests.post(SALESFORCE_TOKEN_URL, data=token_data)
if response.status_code == 200:
token_response = response.json()
return token_response.get('access_token'), token_response.get('refresh_token'), token_response.get('instance_url')
else:
print(f"Error getting access token: {response.text}") # Log the error
return None, None, None
def refresh_salesforce_access_token(refresh_token):
"""
Refreshes the access token using the refresh token.
Args:
refresh_token: The refresh token from Salesforce.
Returns:
A tuple containing the new access token and refresh token, or (None, None) on error.
"""
token_data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': SALESFORCE_CONSUMER_KEY,
'client_secret': SALESFORCE_CONSUMER_SECRET,
'redirect_uri': SALESFORCE_CALLBACK_URL #important for refresh
}
response = requests.post(SALESFORCE_TOKEN_URL, data=token_data)
if response.status_code == 200:
token_response = response.json()
return token_response.get('access_token'), token_response.get('refresh_token'), token_response.get('instance_url')
else:
print(f"Error refreshing access token: {response.text}") # Log the error
return None, None, None
def query_salesforce(soql_query, access_token, instance_url):
"""
Executes a SOQL query against Salesforce.
Args:
soql_query: The SOQL query string.
access_token: The Salesforce access token.
instance_url: The Salesforce instance URL.
Returns:
A list of records from Salesforce, or None on error.
"""
headers = {'Authorization': f'Bearer {access_token}'}
url = f"{instance_url}/services/data/v59.0/query/?q={soql_query}" # Use API version v59.0
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json().get('records')
else:
print(f"Error querying Salesforce: {response.text}") # Log the error
return None
def send_data_to_salesforce(sobject_name, data, access_token, instance_url):
"""
Sends data to Salesforce to create a new record.
Args:
sobject_name: The Salesforce object name (e.g., 'Subcontractor_Performance_Score__c').
data: A dictionary containing the data to send.
access_token: The Salesforce access token.
instance_url: The Salesforce instance URL.
Returns:
The ID of the created record, or None on error.
"""
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
url = f"{instance_url}/services/data/v59.0/sobjects/{sobject_name}/" # Use API version v59.0
response = requests.post(url, headers=headers, json=data)
if 200 <= response.status_code < 300: # Successful creation (201 Created)
return response.json().get('id')
else:
print(f"Error sending data to Salesforce ({sobject_name}): {response.text}") # Log error
return None
# --- Flask Routes ---
@app.route('/')
def index():
"""
Displays a welcome message and instructions.
"""
return ("Welcome to the Subcontractor Performance Scorer API!\n"
"1. To authorize with Salesforce, visit /authorize_salesforce.\n"
"2. After authorization, vendor logs will be processed automatically (in a real implementation).\n"
"3. Scores will be sent to Salesforce (in a real implementation).\n")
@app.route('/authorize_salesforce')
def authorize_salesforce():
"""
Redirects the user to the Salesforce authorization URL to obtain an authorization code.
"""
authorization_url = (
f"{SALESFORCE_AUTH_URL}?"
"response_type=code&"
f"client_id={SALESFORCE_CONSUMER_KEY}&"
f"redirect_uri={SALESFORCE_CALLBACK_URL}&"
"scope=api" # Add other scopes as needed (e.g., 'refresh_token')
)
return jsonify({"authorization_url": authorization_url}) # Return as JSON for easier handling
@app.route('/callback')
def callback():
"""
Handles the Salesforce callback after authorization. Exchanges the authorization code
for an access token and refresh token.
"""
global auth_code, access_token, refresh_token, SALESFORCE_INSTANCE_URL
code = request.args.get('code')
if not code:
return jsonify({'error': 'Authorization code not provided.'}), 400
auth_code = code # Store the auth code
access_token, refresh_token, SALESFORCE_INSTANCE_URL = get_salesforce_access_token(code)
if access_token:
return jsonify({'message': 'Successfully authorized with Salesforce! Access token and refresh token obtained.'}), 200
else:
return jsonify({'error': 'Failed to obtain access token.'}), 500
@app.route('/process_logs') # You might trigger this from a Salesforce outbound message or scheduled job.
def process_logs():
"""
Retrieves vendor logs from Salesforce, processes them using the scoring model,
and sends the results to Salesforce.
"""
global access_token, refresh_token, SALESFORCE_INSTANCE_URL
if not access_token:
if auth_code:
access_token, refresh_token, SALESFORCE_INSTANCE_URL = get_salesforce_access_token(auth_code)
else:
return jsonify({'error': 'Not authorized with Salesforce. Please visit /authorize_salesforce first.'}), 401
if not access_token:
return jsonify({'error': 'Not authorized with Salesforce. Please visit /authorize_salesforce first.'}), 401
if not SALESFORCE_INSTANCE_URL:
return jsonify({'error': 'Salesforce instance URL is missing.'}), 500
try:
# 1. Fetch vendor logs from Salesforce (replace with your actual SOQL query)
soql_query = "SELECT Id, Vendor__c, Work_Completion_Details__c, Delay_Reports__c, Incident_Logs__c, Log_Date__c FROM Vendor_Log__c"
vendor_logs = query_salesforce(soql_query, access_token, SALESFORCE_INSTANCE_URL)
if not vendor_logs:
return jsonify({'error': 'Failed to retrieve vendor logs from Salesforce or no logs found.'}), 500
# 2. Process the vendor logs using the model
scores_for_salesforce = process_vendor_logs(vendor_logs) # Call your model.py function
if not scores_for_salesforce:
return jsonify({'warning': 'No scores to send to Salesforce.'}), 200 # No error, but no data sent
# 3. Send the scores to Salesforce
all_updates_successful = True
for score_data in scores_for_salesforce:
# Map the score_data to the fields in your Subcontractor_Performance_Score__c object
salesforce_data = {
'Vendor__c': score_data['vendor_id'], # You might need to look up the Salesforce ID
'Month_c': score_data['month'],
'Quality_Score__c': score_data['quality'],
'Timeliness_Score__c': score_data['timeliness'],
'Safety_Score__c': score_data['safety'],
'Communication_Score__c': score_data['communication'],
'Final_Score_c': score_data['final_score'],
'Alert_Flag_c': score_data['alert_flag'],
'Certification_URL_c': score_data['certificate_url'], #add this
'Trend_Deviation__c': score_data['trend_deviation']
}
record_id = send_data_to_salesforce('Subcontractor_Performance_Score__c', salesforce_data, access_token, SALESFORCE_INSTANCE_URL)
if not record_id:
all_updates_successful = False
print(f"Failed to send score for vendor {score_data['vendor_id']} to Salesforce.")
if all_updates_successful:
return jsonify({'message': 'Successfully processed vendor logs and sent scores to Salesforce.'}), 200
else:
return jsonify({'error': 'Failed to send some scores to Salesforce. Check logs for details.'}), 500
except Exception as e:
return jsonify({'error': f'An error occurred: {e}'}), 500
@app.route('/refresh_token')
def refresh_token_route():
"""
Endpoint to refresh the Salesforce access token. This would be called periodically
(e.g., by a scheduled task) in a real application.
"""
global access_token, refresh_token, SALESFORCE_INSTANCE_URL
if not refresh_token:
return jsonify({'error': 'No refresh token available.'}), 400
new_access_token, new_refresh_token, SALESFORCE_INSTANCE_URL = refresh_salesforce_access_token(refresh_token)
if new_access_token:
access_token = new_access_token
refresh_token = new_refresh_token # Update the refresh token, as it might change.
return jsonify({'message': 'Access token refreshed successfully.'}), 200
else:
return jsonify({'error': 'Failed to refresh access token.'}), 500
if __name__ == '__main__':
# The port is automatically configured by Hugging Face Spaces
port = int(os.environ.get("PORT", 8080))
app.run(host='0.0.0.0', port=port, debug=True)
|