Spaces:
Runtime error
Runtime error
| from flask import Flask, request, jsonify | |
| import os | |
| import json | |
| import requests # For making HTTP requests to Salesforce | |
| from model import process_vendor_logs # Import your scoring logic from model.py | |
# Flask application object used by the rest of this module.
app = Flask(__name__)

# --- Configuration ---
# Credentials are read from the environment so they never live in the source;
# each value is None when its variable is unset.
SALESFORCE_CONSUMER_KEY = os.getenv('SALESFORCE_CONSUMER_KEY')
SALESFORCE_CONSUMER_SECRET = os.getenv('SALESFORCE_CONSUMER_SECRET')
# Must match the callback URL configured on the Salesforce Connected App.
SALESFORCE_CALLBACK_URL = os.getenv('SALESFORCE_CALLBACK_URL')

# Production login endpoints; swap in the sandbox URLs when targeting a sandbox.
SALESFORCE_TOKEN_URL = "https://login.salesforce.com/services/oauth2/token"
SALESFORCE_AUTH_URL = "https://login.salesforce.com/services/oauth2/authorize"

# Filled in at runtime from the token response.
SALESFORCE_INSTANCE_URL = None

# Minimal in-memory credential storage for demonstration purposes: values are
# lost whenever the process restarts. Use a real datastore in production.
auth_code = access_token = refresh_token = None
| # --- Helper Functions --- | |
def get_salesforce_access_token(code):
    """
    Exchanges an authorization code for Salesforce OAuth credentials.

    Args:
        code: The authorization code received from Salesforce.

    Returns:
        A tuple (access_token, refresh_token, instance_url). Individual
        entries are None when the token response omits them. Returns
        (None, None, None) on any error (non-200 status, network failure,
        or a response body that is not valid JSON).
    """
    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'client_id': SALESFORCE_CONSUMER_KEY,
        'client_secret': SALESFORCE_CONSUMER_SECRET,
        'redirect_uri': SALESFORCE_CALLBACK_URL,
    }
    try:
        # A timeout keeps an unresponsive token endpoint from blocking the
        # calling request forever (requests waits indefinitely by default).
        response = requests.post(SALESFORCE_TOKEN_URL, data=token_data, timeout=30)
        if response.status_code != 200:
            print(f"Error getting access token: {response.text}")  # Log the error
            return None, None, None
        token_response = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failures and malformed JSON follow the same error contract
        # as a non-200 response instead of propagating to the caller.
        print(f"Error getting access token: {exc}")  # Log the error
        return None, None, None

    return (
        token_response.get('access_token'),
        token_response.get('refresh_token'),
        token_response.get('instance_url'),
    )
def refresh_salesforce_access_token(refresh_token):
    """
    Obtains a new access token from Salesforce using a refresh token.

    Args:
        refresh_token: The refresh token previously issued by Salesforce.

    Returns:
        A tuple (access_token, refresh_token, instance_url). When the
        response does not carry a new refresh token, the one passed in is
        returned so the caller does not lose it. Returns (None, None, None)
        on any error (non-200 status, network failure, or a response body
        that is not valid JSON).
    """
    token_data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': SALESFORCE_CONSUMER_KEY,
        'client_secret': SALESFORCE_CONSUMER_SECRET,
        'redirect_uri': SALESFORCE_CALLBACK_URL,
    }
    try:
        # A timeout keeps an unresponsive token endpoint from blocking the
        # calling request forever (requests waits indefinitely by default).
        response = requests.post(SALESFORCE_TOKEN_URL, data=token_data, timeout=30)
        if response.status_code != 200:
            print(f"Error refreshing access token: {response.text}")  # Log the error
            return None, None, None
        token_response = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failures and malformed JSON follow the same error contract
        # as a non-200 response instead of propagating to the caller.
        print(f"Error refreshing access token: {exc}")  # Log the error
        return None, None, None

    # Keep the current refresh token when the response omits a replacement;
    # returning None here would make callers discard a still-valid token.
    next_refresh_token = token_response.get('refresh_token') or refresh_token
    return (
        token_response.get('access_token'),
        next_refresh_token,
        token_response.get('instance_url'),
    )
def query_salesforce(soql_query, access_token, instance_url):
    """
    Executes a SOQL query against the Salesforce REST API (v59.0).

    Args:
        soql_query: The SOQL query string.
        access_token: The Salesforce access token.
        instance_url: The Salesforce instance URL.

    Returns:
        The value of the 'records' key from the JSON response, or None on
        error (non-200 status, network failure, or a non-JSON body).
    """
    # NOTE(review): only the first response is read; if Salesforce pages large
    # result sets, later batches are not fetched here — confirm whether needed.
    headers = {'Authorization': f'Bearer {access_token}'}
    url = f"{instance_url}/services/data/v59.0/query/"
    try:
        # Passing the query via `params` lets requests percent-encode it, so
        # characters such as '&', '+', '#' or '%' inside the SOQL text cannot
        # corrupt the URL. The timeout prevents an indefinite hang.
        response = requests.get(
            url,
            headers=headers,
            params={'q': soql_query},
            timeout=30,
        )
        if response.status_code != 200:
            print(f"Error querying Salesforce: {response.text}")  # Log the error
            return None
        return response.json().get('records')
    except (requests.RequestException, ValueError) as exc:
        print(f"Error querying Salesforce: {exc}")  # Log the error
        return None
def send_data_to_salesforce(sobject_name, data, access_token, instance_url):
    """
    Creates a new Salesforce record via the REST API (v59.0).

    Args:
        sobject_name: The Salesforce object name (e.g., 'Subcontractor_Performance_Score__c').
        data: A dictionary containing the field values to send.
        access_token: The Salesforce access token.
        instance_url: The Salesforce instance URL.

    Returns:
        The 'id' value from the JSON response, or None on error (non-2xx
        status, network failure, or a non-JSON body).
    """
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    url = f"{instance_url}/services/data/v59.0/sobjects/{sobject_name}/"
    try:
        # The timeout prevents one stalled request from blocking the whole
        # batch of score uploads indefinitely.
        response = requests.post(url, headers=headers, json=data, timeout=30)
        if not 200 <= response.status_code < 300:
            print(f"Error sending data to Salesforce ({sobject_name}): {response.text}")  # Log error
            return None
        return response.json().get('id')
    except (requests.RequestException, ValueError) as exc:
        # Treat transport errors like any other failed create so the caller's
        # per-record bookkeeping keeps working.
        print(f"Error sending data to Salesforce ({sobject_name}): {exc}")  # Log error
        return None
| # --- Flask Routes --- | |
def index():
    """
    Returns the plain-text welcome message with usage instructions.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm it is registered with the Flask app.
    """
    message_lines = [
        "Welcome to the Subcontractor Performance Scorer API!\n",
        "1. To authorize with Salesforce, visit /authorize_salesforce.\n",
        "2. After authorization, vendor logs will be processed automatically (in a real implementation).\n",
        "3. Scores will be sent to Salesforce (in a real implementation).\n",
    ]
    return "".join(message_lines)
def authorize_salesforce():
    """
    Builds the Salesforce OAuth authorization URL and returns it as JSON.

    The caller is expected to open the returned URL; this function does not
    issue an HTTP redirect itself.

    Returns:
        A JSON response of the form {"authorization_url": "<url>"}.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm it is registered with the Flask app.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import urlencode

    # urlencode percent-encodes each value, so a callback URL containing
    # '://', '/', '?' or '&' is carried intact as a single query parameter.
    # NOTE(review): only the 'api' scope is requested; confirm whether
    # 'refresh_token' should be added, since other code here uses refresh tokens.
    query_string = urlencode({
        'response_type': 'code',
        'client_id': SALESFORCE_CONSUMER_KEY,
        'redirect_uri': SALESFORCE_CALLBACK_URL,
        'scope': 'api',
    })
    authorization_url = f"{SALESFORCE_AUTH_URL}?{query_string}"
    return jsonify({"authorization_url": authorization_url})
def callback():
    """
    Handles the Salesforce OAuth callback.

    Reads the 'code' query parameter, exchanges it for tokens, and stores the
    access token, refresh token and instance URL in module-level state.

    Returns:
        (JSON, 400) when no code is supplied, (JSON, 500) when the exchange
        fails, and (JSON, 200) on success.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm it is registered with the Flask app.
    """
    global auth_code, access_token, refresh_token, SALESFORCE_INSTANCE_URL

    code = request.args.get('code')
    if not code:
        return jsonify({'error': 'Authorization code not provided.'}), 400

    auth_code = code  # Store the auth code

    new_access_token, new_refresh_token, new_instance_url = get_salesforce_access_token(code)
    if not new_access_token:
        # Leave any previously stored credentials untouched: a failed (or
        # bogus) callback must not wipe out a working session.
        return jsonify({'error': 'Failed to obtain access token.'}), 500

    access_token = new_access_token
    refresh_token = new_refresh_token
    SALESFORCE_INSTANCE_URL = new_instance_url
    return jsonify({'message': 'Successfully authorized with Salesforce! Access token and refresh token obtained.'}), 200
| # You might trigger this from a Salesforce outbound message or scheduled job. | |
def process_logs():
    """
    Fetches vendor logs from Salesforce, scores them, and uploads the scores.

    Steps:
        1. Ensure an access token and instance URL are available.
        2. Query Vendor_Log__c records.
        3. Run them through process_vendor_logs (imported from model.py).
        4. Create one Subcontractor_Performance_Score__c record per score.

    Returns:
        (JSON, 401) when not authorized, (JSON, 500) on missing instance URL,
        query failure/empty result, partial upload failure or unexpected
        exception, and (JSON, 200) on success or when there is nothing to send.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm it is registered with the Flask app.
    """
    global access_token, refresh_token, SALESFORCE_INSTANCE_URL

    if not access_token:
        if auth_code:
            # Attempt to exchange the stored authorization code for tokens.
            access_token, refresh_token, SALESFORCE_INSTANCE_URL = get_salesforce_access_token(auth_code)
        else:
            return jsonify({'error': 'Not authorized with Salesforce. Please visit /authorize_salesforce first.'}), 401
    if not access_token:
        return jsonify({'error': 'Not authorized with Salesforce. Please visit /authorize_salesforce first.'}), 401
    if not SALESFORCE_INSTANCE_URL:
        return jsonify({'error': 'Salesforce instance URL is missing.'}), 500

    try:
        # 1. Fetch vendor logs from Salesforce (replace with your actual SOQL query)
        soql_query = "SELECT Id, Vendor__c, Work_Completion_Details__c, Delay_Reports__c, Incident_Logs__c, Log_Date__c FROM Vendor_Log__c"
        vendor_logs = query_salesforce(soql_query, access_token, SALESFORCE_INSTANCE_URL)
        if not vendor_logs:
            return jsonify({'error': 'Failed to retrieve vendor logs from Salesforce or no logs found.'}), 500

        # 2. Process the vendor logs using the model
        scores_for_salesforce = process_vendor_logs(vendor_logs)
        if not scores_for_salesforce:
            # Not an error: there is simply nothing to upload.
            return jsonify({'warning': 'No scores to send to Salesforce.'}), 200

        # 3. Send the scores to Salesforce
        all_updates_successful = True
        for score_data in scores_for_salesforce:
            # Every key uses the double-underscore '__c' suffix, matching the
            # other custom field names used in this function; the
            # single-underscore variants ('Month_c', ...) were inconsistent
            # with that convention.
            salesforce_data = {
                'Vendor__c': score_data['vendor_id'],  # You might need to look up the Salesforce ID
                'Month__c': score_data['month'],
                'Quality_Score__c': score_data['quality'],
                'Timeliness_Score__c': score_data['timeliness'],
                'Safety_Score__c': score_data['safety'],
                'Communication_Score__c': score_data['communication'],
                'Final_Score__c': score_data['final_score'],
                'Alert_Flag__c': score_data['alert_flag'],
                'Certification_URL__c': score_data['certificate_url'],
                'Trend_Deviation__c': score_data['trend_deviation'],
            }
            record_id = send_data_to_salesforce(
                'Subcontractor_Performance_Score__c',
                salesforce_data,
                access_token,
                SALESFORCE_INSTANCE_URL,
            )
            if not record_id:
                # Keep going so one bad record does not block the rest.
                all_updates_successful = False
                print(f"Failed to send score for vendor {score_data['vendor_id']} to Salesforce.")

        if all_updates_successful:
            return jsonify({'message': 'Successfully processed vendor logs and sent scores to Salesforce.'}), 200
        return jsonify({'error': 'Failed to send some scores to Salesforce. Check logs for details.'}), 500
    except Exception as e:
        # Top-level boundary for this handler: report the failure as a 500.
        return jsonify({'error': f'An error occurred: {e}'}), 500
def refresh_token_route():
    """
    Refreshes the stored Salesforce access token using the stored refresh token.

    Module-level credentials are only updated when the refresh succeeds.

    Returns:
        (JSON, 400) when no refresh token is stored, (JSON, 500) when the
        refresh fails, and (JSON, 200) on success.

    NOTE(review): no route decorator is visible on this function in this
    view of the file — confirm it is registered with the Flask app.
    """
    global access_token, refresh_token, SALESFORCE_INSTANCE_URL

    if not refresh_token:
        return jsonify({'error': 'No refresh token available.'}), 400

    # Unpack into locals first: assigning straight into the globals would
    # replace a valid instance URL with None whenever the refresh fails.
    new_access_token, new_refresh_token, new_instance_url = refresh_salesforce_access_token(refresh_token)
    if not new_access_token:
        return jsonify({'error': 'Failed to refresh access token.'}), 500

    access_token = new_access_token
    if new_refresh_token:
        # Only replace the refresh token when a new one was actually issued;
        # otherwise the existing token remains the one to use.
        refresh_token = new_refresh_token
    if new_instance_url:
        SALESFORCE_INSTANCE_URL = new_instance_url
    return jsonify({'message': 'Access token refreshed successfully.'}), 200
if __name__ == '__main__':
    # Listen on the port given by the PORT environment variable (default 8080).
    port = int(os.environ.get("PORT", 8080))
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution; with host 0.0.0.0 that would be reachable by
    # anyone. Keep it off unless explicitly requested via FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=port, debug=debug_enabled)