# varshakolanu's picture
# Create app.py
# d68584a verified
from flask import Flask, request, jsonify
import os
import json
import requests # For making HTTP requests to Salesforce
from model import process_vendor_logs # Import your scoring logic from model.py
# Flask application object; the route decorators further down register on it.
app = Flask(__name__)

# --- Configuration ---
# Credentials are read from environment variables (configure them as secrets
# in the Hugging Face Space rather than hard-coding them here).
SALESFORCE_CONSUMER_KEY = os.getenv('SALESFORCE_CONSUMER_KEY')
SALESFORCE_CONSUMER_SECRET = os.getenv('SALESFORCE_CONSUMER_SECRET')
# Must match the callback URL configured on the Salesforce Connected App.
SALESFORCE_CALLBACK_URL = os.getenv('SALESFORCE_CALLBACK_URL')

# Production login endpoints; swap in the sandbox URLs when targeting a sandbox.
SALESFORCE_TOKEN_URL = "https://login.salesforce.com/services/oauth2/token"
SALESFORCE_AUTH_URL = "https://login.salesforce.com/services/oauth2/authorize"

# Filled in at runtime from the token response (see the /callback route).
SALESFORCE_INSTANCE_URL = None

# Minimal in-memory OAuth state, for demonstration only: it is lost whenever
# the process restarts. Use a real datastore (e.g., PostgreSQL) in production.
auth_code = access_token = refresh_token = None
# --- Helper Functions ---
def get_salesforce_access_token(code, timeout=10):
    """
    Exchanges an authorization code for Salesforce OAuth tokens.

    Args:
        code: The authorization code received from Salesforce.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        A tuple (access_token, refresh_token, instance_url), or
        (None, None, None) on any error (network failure, non-200 response,
        or a response body that is not valid JSON).
    """
    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'client_id': SALESFORCE_CONSUMER_KEY,
        'client_secret': SALESFORCE_CONSUMER_SECRET,
        'redirect_uri': SALESFORCE_CALLBACK_URL
    }
    try:
        # Without a timeout a stalled connection would block the worker forever.
        response = requests.post(SALESFORCE_TOKEN_URL, data=token_data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Error getting access token: {exc}")  # Log the error
        return None, None, None

    if response.status_code != 200:
        print(f"Error getting access token: {response.text}")  # Log the error
        return None, None, None

    try:
        token_response = response.json()
    except ValueError as exc:  # Body was not valid JSON
        print(f"Error getting access token: {exc}")  # Log the error
        return None, None, None

    return (
        token_response.get('access_token'),
        token_response.get('refresh_token'),
        token_response.get('instance_url'),
    )
def refresh_salesforce_access_token(refresh_token, timeout=10):
    """
    Obtains a new access token using a refresh token.

    Args:
        refresh_token: The refresh token from Salesforce.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        A tuple (access_token, refresh_token, instance_url), or
        (None, None, None) on any error. When the response does not contain
        a new refresh token, the one passed in is returned so the caller
        does not lose it.
    """
    token_data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': SALESFORCE_CONSUMER_KEY,
        'client_secret': SALESFORCE_CONSUMER_SECRET,
        'redirect_uri': SALESFORCE_CALLBACK_URL #important for refresh
    }
    try:
        # Without a timeout a stalled connection would block the worker forever.
        response = requests.post(SALESFORCE_TOKEN_URL, data=token_data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Error refreshing access token: {exc}")  # Log the error
        return None, None, None

    if response.status_code != 200:
        print(f"Error refreshing access token: {response.text}")  # Log the error
        return None, None, None

    try:
        token_response = response.json()
    except ValueError as exc:  # Body was not valid JSON
        print(f"Error refreshing access token: {exc}")  # Log the error
        return None, None, None

    return (
        token_response.get('access_token'),
        # Keep the current refresh token unless the server issued a new one.
        token_response.get('refresh_token') or refresh_token,
        token_response.get('instance_url'),
    )
def query_salesforce(soql_query, access_token, instance_url, timeout=30):
    """
    Executes a SOQL query against Salesforce and collects every result page.

    Args:
        soql_query: The SOQL query string.
        access_token: The Salesforce access token.
        instance_url: The Salesforce instance URL.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list of records from Salesforce (possibly empty), or None on error
        (network failure, non-200 response, or invalid JSON on any page).
    """
    headers = {'Authorization': f'Bearer {access_token}'}
    url = f"{instance_url}/services/data/v59.0/query/"  # Use API version v59.0
    # Passing the query via `params` lets requests percent-encode it, so
    # characters such as '&', '+', '%' or '#' in the SOQL cannot corrupt the URL.
    params = {'q': soql_query}
    records = []
    try:
        while url:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            if response.status_code != 200:
                print(f"Error querying Salesforce: {response.text}")  # Log the error
                return None
            payload = response.json()
            records.extend(payload.get('records') or [])
            # Follow the server-provided path to the next batch, if there is one.
            next_path = payload.get('nextRecordsUrl')
            url = f"{instance_url}{next_path}" if next_path else None
            params = None  # The continuation URL already identifies the query
    except (requests.RequestException, ValueError) as exc:
        print(f"Error querying Salesforce: {exc}")  # Log the error
        return None
    return records
def send_data_to_salesforce(sobject_name, data, access_token, instance_url, timeout=30):
    """
    Sends data to Salesforce to create a new record.

    Args:
        sobject_name: The Salesforce object name (e.g., 'Subcontractor_Performance_Score__c').
        data: A dictionary containing the data to send.
        access_token: The Salesforce access token.
        instance_url: The Salesforce instance URL.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ID of the created record, or None on error (network failure,
        non-2xx response, or a response body that is not valid JSON).
    """
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    url = f"{instance_url}/services/data/v59.0/sobjects/{sobject_name}/"  # Use API version v59.0
    try:
        # Without a timeout a stalled connection would block the worker forever.
        response = requests.post(url, headers=headers, json=data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Error sending data to Salesforce ({sobject_name}): {exc}")  # Log error
        return None

    if not 200 <= response.status_code < 300:  # Anything but success (201 Created)
        print(f"Error sending data to Salesforce ({sobject_name}): {response.text}")  # Log error
        return None

    try:
        return response.json().get('id')
    except ValueError as exc:  # Body was not valid JSON
        print(f"Error sending data to Salesforce ({sobject_name}): {exc}")  # Log error
        return None
# --- Flask Routes ---
@app.route('/')
def index():
    """
    Landing page: returns a plain-text greeting followed by usage steps.
    """
    welcome_lines = (
        "Welcome to the Subcontractor Performance Scorer API!\n",
        "1. To authorize with Salesforce, visit /authorize_salesforce.\n",
        "2. After authorization, vendor logs will be processed automatically (in a real implementation).\n",
        "3. Scores will be sent to Salesforce (in a real implementation).\n",
    )
    return "".join(welcome_lines)
@app.route('/authorize_salesforce')
def authorize_salesforce():
    """
    Builds the Salesforce authorization URL and returns it as JSON.

    The client is expected to open the returned URL; this route does not
    issue an HTTP redirect itself.

    Returns:
        200 with {"authorization_url": ...}, or 500 with an error message
        when the consumer key or callback URL is not configured.
    """
    from urllib.parse import urlencode

    if not SALESFORCE_CONSUMER_KEY or not SALESFORCE_CALLBACK_URL:
        # Otherwise the URL would contain the literal text "None".
        return jsonify({'error': 'Salesforce consumer key or callback URL is not configured.'}), 500

    # urlencode percent-escapes each value; the callback URL in particular
    # contains ':' and '/' characters that must be escaped in a query string.
    query_string = urlencode({
        'response_type': 'code',
        'client_id': SALESFORCE_CONSUMER_KEY,
        'redirect_uri': SALESFORCE_CALLBACK_URL,
        'scope': 'api',  # Add other scopes as needed (e.g., 'refresh_token')
    })
    authorization_url = f"{SALESFORCE_AUTH_URL}?{query_string}"
    return jsonify({"authorization_url": authorization_url})  # Return as JSON for easier handling
@app.route('/callback')
def callback():
    """
    Handles the Salesforce callback after authorization. Exchanges the authorization code
    for an access token and refresh token.

    Returns:
        400 when no `code` query parameter is present, 500 when the token
        exchange fails, 200 on success.
    """
    global auth_code, access_token, refresh_token, SALESFORCE_INSTANCE_URL
    code = request.args.get('code')
    if not code:
        return jsonify({'error': 'Authorization code not provided.'}), 400
    auth_code = code  # Store the auth code

    # Exchange into locals first: a failed exchange must not wipe out tokens
    # and instance URL that an earlier, successful authorization stored.
    new_access_token, new_refresh_token, new_instance_url = get_salesforce_access_token(code)
    if not new_access_token:
        return jsonify({'error': 'Failed to obtain access token.'}), 500

    access_token = new_access_token
    refresh_token = new_refresh_token
    SALESFORCE_INSTANCE_URL = new_instance_url
    return jsonify({'message': 'Successfully authorized with Salesforce! Access token and refresh token obtained.'}), 200
@app.route('/process_logs')  # You might trigger this from a Salesforce outbound message or scheduled job.
def process_logs():
    """
    Retrieves vendor logs from Salesforce, processes them using the scoring model,
    and sends the results to Salesforce.

    Returns:
        401 when no access token is available, 500 when the instance URL is
        missing, the log query fails or returns nothing, a score cannot be
        sent, or an unexpected exception occurs; 200 otherwise.
    """
    global access_token, refresh_token, SALESFORCE_INSTANCE_URL

    # No token yet: try exchanging the stored authorization code, if any.
    if not access_token and auth_code:
        access_token, refresh_token, SALESFORCE_INSTANCE_URL = get_salesforce_access_token(auth_code)
    if not access_token:
        return jsonify({'error': 'Not authorized with Salesforce. Please visit /authorize_salesforce first.'}), 401
    if not SALESFORCE_INSTANCE_URL:
        return jsonify({'error': 'Salesforce instance URL is missing.'}), 500

    try:
        # 1. Fetch vendor logs from Salesforce (replace with your actual SOQL query)
        soql_query = "SELECT Id, Vendor__c, Work_Completion_Details__c, Delay_Reports__c, Incident_Logs__c, Log_Date__c FROM Vendor_Log__c"
        vendor_logs = query_salesforce(soql_query, access_token, SALESFORCE_INSTANCE_URL)
        if not vendor_logs:
            return jsonify({'error': 'Failed to retrieve vendor logs from Salesforce or no logs found.'}), 500

        # 2. Process the vendor logs using the model
        scores_for_salesforce = process_vendor_logs(vendor_logs)  # Call your model.py function
        if not scores_for_salesforce:
            return jsonify({'warning': 'No scores to send to Salesforce.'}), 200  # No error, but no data sent

        # 3. Send the scores to Salesforce
        all_updates_successful = True
        for score_data in scores_for_salesforce:
            # Map the score_data to the fields in your Subcontractor_Performance_Score__c object.
            # Custom field API names carry the '__c' (double underscore) suffix,
            # matching the other fields used on this object.
            salesforce_data = {
                'Vendor__c': score_data['vendor_id'],  # You might need to look up the Salesforce ID
                'Month__c': score_data['month'],
                'Quality_Score__c': score_data['quality'],
                'Timeliness_Score__c': score_data['timeliness'],
                'Safety_Score__c': score_data['safety'],
                'Communication_Score__c': score_data['communication'],
                'Final_Score__c': score_data['final_score'],
                'Alert_Flag__c': score_data['alert_flag'],
                'Certification_URL__c': score_data['certificate_url'],
                'Trend_Deviation__c': score_data['trend_deviation']
            }
            record_id = send_data_to_salesforce('Subcontractor_Performance_Score__c', salesforce_data, access_token, SALESFORCE_INSTANCE_URL)
            if not record_id:
                # Keep going so one bad record does not block the rest.
                all_updates_successful = False
                print(f"Failed to send score for vendor {score_data['vendor_id']} to Salesforce.")

        if all_updates_successful:
            return jsonify({'message': 'Successfully processed vendor logs and sent scores to Salesforce.'}), 200
        return jsonify({'error': 'Failed to send some scores to Salesforce. Check logs for details.'}), 500
    except Exception as e:
        # Top-level boundary for the route: report any failure as a 500.
        return jsonify({'error': f'An error occurred: {e}'}), 500
@app.route('/refresh_token')
def refresh_token_route():
    """
    Endpoint to refresh the Salesforce access token. This would be called periodically
    (e.g., by a scheduled task) in a real application.

    Returns:
        400 when no refresh token is stored, 500 when the refresh fails,
        200 on success.
    """
    global access_token, refresh_token, SALESFORCE_INSTANCE_URL
    if not refresh_token:
        return jsonify({'error': 'No refresh token available.'}), 400

    # Unpack into locals so a failed refresh leaves the stored state untouched.
    new_access_token, new_refresh_token, new_instance_url = refresh_salesforce_access_token(refresh_token)
    if not new_access_token:
        return jsonify({'error': 'Failed to refresh access token.'}), 500

    access_token = new_access_token
    # Only replace values the response actually supplied; a refresh response
    # may omit the refresh token, and the stored one must then be kept.
    if new_refresh_token:
        refresh_token = new_refresh_token
    if new_instance_url:
        SALESFORCE_INSTANCE_URL = new_instance_url
    return jsonify({'message': 'Access token refreshed successfully.'}), 200
if __name__ == '__main__':
    # Listen on the port given by the PORT environment variable (default 8080).
    port = int(os.environ.get("PORT", 8080))
    # The interactive debugger allows arbitrary code execution, so it must
    # not be enabled unconditionally on a server bound to all interfaces.
    # Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=port, debug=debug_enabled)